git.feebdaed.xyz Git - 0xmirror/grpc-go.git/commitdiff
transport: Remove buffer copies while writing HTTP/2 Data frames (#8667)
author: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com>
Mon, 3 Nov 2025 09:03:17 +0000 (14:33 +0530)
committer: GitHub <noreply@github.com>
Mon, 3 Nov 2025 09:03:17 +0000 (14:33 +0530)
This PR removes 2 buffer copies while writing data frames to the
underlying net.Conn: one [within
gRPC](https://github.com/grpc/grpc-go/blob/58d4b2b1492dbcfdf26daa7ed93830ebb871faf1/internal/transport/controlbuf.go#L1009-L1022)
and the other [in the
framer](https://cs.opensource.google/go/x/net/+/master:http2/frame.go;l=743;drc=6e243da531559f8c99439dabc7647dec07191f9b).
Care is taken to avoid any extra heap allocations which can affect
performance for smaller payloads.

A [CL](https://go-review.git.corp.google.com/c/net/+/711620) is out for
review which allows using the framer to write frame headers. This PR
duplicates the header writing code as a temporary workaround. This PR
will be merged only after the CL is merged.

## Results

### Small payloads
Performance for small payloads increases slightly due to the removal
of a `defer` statement.
```
$ go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \
   -compression=off -maxConcurrentCalls=120 -trace=off \
   -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}"

$ go run benchmark/benchresult/main.go unary-before unary-after
               Title       Before        After Percentage
            TotalOps      7600878      7653522     0.69%
             SendOps            0            0      NaN%
             RecvOps            0            0      NaN%
            Bytes/op     10007.07     10000.89    -0.07%
           Allocs/op       146.93       146.91     0.00%
             ReqT/op 101345040.00 102046960.00     0.69%
            RespT/op 101345040.00 102046960.00     0.69%
            50th-Lat    833.724µs    830.041µs    -0.44%
            90th-Lat   1.281969ms   1.275336ms    -0.52%
            99th-Lat   2.403961ms   2.360606ms    -1.80%
             Avg-Lat    946.123µs    939.734µs    -0.68%
           GoVersion     go1.24.8     go1.24.8
         GrpcVersion   1.77.0-dev   1.77.0-dev
```

### Large payloads
Local benchmarks show a ~5-10% regression with 1 MB payloads on my dev
machine. The profiles show increased time spent in the copy operation
[inside the buffered
writer](https://github.com/grpc/grpc-go/blob/58d4b2b1492dbcfdf26daa7ed93830ebb871faf1/internal/transport/http_util.go#L334).
Counterintuitively, copying the grpc header and message data into a
larger buffer increased the performance by 4% (compared to master).

To check whether this behaviour (an extra copy increasing performance)
also holds at scale, I ran [the k8s benchmark for 1MB
payloads](https://github.com/grpc/grpc/blob/65c9be86830b0e423dd970c066c69a06a9240298/tools/run_tests/performance/scenario_config.py#L291-L305)
with 100 concurrent streams. Across multiple runs it showed a ~5%
increase in QPS *without* the copies, and adding a copy back reduced
performance.

Load test config file:
[loadtest.yaml](https://github.com/user-attachments/files/23055312/loadtest.yaml)

```
# 30 core client and server
Before
QPS: 498.284 (16.6095/server core)
Latencies (50/90/95/99/99.9%-ile): 233256/275972/281250/291803/298533 us
Server system time: 93.0164
Server user time:   142.533
Client system time: 97.2688
Client user time:   144.542

After
QPS: 526.776 (17.5592/server core)
Latencies (50/90/95/99/99.9%-ile): 211010/263189/270969/280656/288828 us
Server system time: 96.5959
Server user time:   147.668
Client system time: 101.973
Client user time:   150.234

# 8 core client and server
Before
QPS: 291.049 (36.3811/server core)
Latencies (50/90/95/99/99.9%-ile): 294552/685822/903554/1.48399e+06/1.50757e+06 us
Server system time: 49.0355
Server user time:   87.1783
Client system time: 60.1945
Client user time:   103.633

After
QPS: 334.119 (41.7649/server core)
Latencies (50/90/95/99/99.9%-ile): 279395/518849/706327/1.09273e+06/1.11629e+06 us
Server system time: 69.3136
Server user time:   102.549
Client system time: 80.9804
Client user time:   107.103
```

RELEASE NOTES:
* transport: Avoid two buffer copies when writing Data frames.

internal/transport/controlbuf.go
internal/transport/http_util.go
mem/buffer_slice.go
mem/buffer_slice_test.go

index 752d4e8f8562dfc6b9d3d1227a6dd3fcf62b994e..2dcd1e63bdd2f90cde2630cce231fe65475c00c5 100644 (file)
@@ -496,6 +496,16 @@ const (
        serverSide
 )
 
+// maxWriteBufSize is the maximum length (number of elements) the cached
+// writeBuf can grow to. The length depends on the number of buffers
+// contained within the BufferSlice produced by the codec, which is
+// generally small.
+//
+// If a writeBuf larger than this limit is required, it will be allocated
+// and freed after use, rather than being cached. This avoids holding
+// on to large amounts of memory.
+const maxWriteBufSize = 64
+
 // Loopy receives frames from the control buffer.
 // Each frame is handled individually; most of the work done by loopy goes
 // into handling data frames. Loopy maintains a queue of active streams, and each
@@ -530,6 +540,8 @@ type loopyWriter struct {
 
        // Side-specific handlers
        ssGoAwayHandler func(*goAway) (bool, error)
+
+       writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
 }
 
 func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
@@ -962,11 +974,11 @@ func (l *loopyWriter) processData() (bool, error) {
 
        if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
                // Client sends out empty data frame with endStream = true
-               if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
+               if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
                        return false, err
                }
                str.itl.dequeue() // remove the empty data item from stream
-               _ = reader.Close()
+               reader.Close()
                if str.itl.isEmpty() {
                        str.state = empty
                } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -999,25 +1011,20 @@ func (l *loopyWriter) processData() (bool, error) {
        remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
        size := hSize + dSize
 
-       var buf *[]byte
-
-       if hSize != 0 && dSize == 0 {
-               buf = &dataItem.h
-       } else {
-               // Note: this is only necessary because the http2.Framer does not support
-               // partially writing a frame, so the sequence must be materialized into a buffer.
-               // TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
-               pool := l.bufferPool
-               if pool == nil {
-                       // Note that this is only supposed to be nil in tests. Otherwise, stream is
-                       // always initialized with a BufferPool.
-                       pool = mem.DefaultBufferPool()
+       l.writeBuf = l.writeBuf[:0]
+       if hSize > 0 {
+               l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
+       }
+       if dSize > 0 {
+               var err error
+               l.writeBuf, err = reader.Peek(dSize, l.writeBuf)
+               if err != nil {
+                       // This must never happen since the reader must have at least dSize
+                       // bytes.
+                       // Log an error to fail tests.
+                       l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
+                       return false, err
                }
-               buf = pool.Get(size)
-               defer pool.Put(buf)
-
-               copy((*buf)[:hSize], dataItem.h)
-               _, _ = reader.Read((*buf)[hSize:])
        }
 
        // Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1030,7 +1037,14 @@ func (l *loopyWriter) processData() (bool, error) {
        if dataItem.onEachWrite != nil {
                dataItem.onEachWrite()
        }
-       if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
+       err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
+       reader.Discard(dSize)
+       if cap(l.writeBuf) > maxWriteBufSize {
+               l.writeBuf = nil
+       } else {
+               clear(l.writeBuf)
+       }
+       if err != nil {
                return false, err
        }
        str.bytesOutStanding += size
@@ -1038,7 +1052,7 @@ func (l *loopyWriter) processData() (bool, error) {
        dataItem.h = dataItem.h[hSize:]
 
        if remainingBytes == 0 { // All the data from that message was written out.
-               _ = reader.Close()
+               reader.Close()
                str.itl.dequeue()
        }
        if str.itl.isEmpty() {
index 0ba6c7e799415ec622f6f28111ee803f268bc353..6209eb23cdcd027e7e70e4d1effae7bd8fb56b84 100644 (file)
@@ -400,6 +400,7 @@ func (df *parsedDataFrame) StreamEnded() bool {
 type framer struct {
        writer    *bufWriter
        fr        *http2.Framer
+       headerBuf []byte // cached slice for framer headers to reduce heap allocs.
        reader    io.Reader
        dataFrame parsedDataFrame // Cached data frame to avoid heap allocations.
        pool      mem.BufferPool
@@ -443,6 +444,41 @@ func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWr
        return f
 }
 
+// writeData writes a DATA frame.
+//
+// It is the caller's responsibility not to violate the maximum frame size.
+func (f *framer) writeData(streamID uint32, endStream bool, data [][]byte) error {
+       var flags http2.Flags
+       if endStream {
+               flags = http2.FlagDataEndStream
+       }
+       length := uint32(0)
+       for _, d := range data {
+               length += uint32(len(d))
+       }
+       // TODO: Replace the header write with the framer API being added in
+       // https://github.com/golang/go/issues/66655.
+       f.headerBuf = append(f.headerBuf[:0],
+               byte(length>>16),
+               byte(length>>8),
+               byte(length),
+               byte(http2.FrameData),
+               byte(flags),
+               byte(streamID>>24),
+               byte(streamID>>16),
+               byte(streamID>>8),
+               byte(streamID))
+       if _, err := f.writer.Write(f.headerBuf); err != nil {
+               return err
+       }
+       for _, d := range data {
+               if _, err := f.writer.Write(d); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 // readFrame reads a single frame. The returned Frame is only valid
 // until the next call to readFrame.
 func (f *framer) readFrame() (any, error) {
index 9fcb12b989e539b6a6bfe7413afa1608ea97e478..084fb19c6d15c7b474e2ad37a663611bc79d1891 100644 (file)
@@ -19,6 +19,7 @@
 package mem
 
 import (
+       "fmt"
        "io"
 )
 
@@ -126,9 +127,10 @@ func (s BufferSlice) Reader() *Reader {
 }
 
 // Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
-// with other parts systems. It also provides an additional convenience method
-// Remaining(), which returns the number of unread bytes remaining in the slice.
+// with other systems.
+//
 // Buffers will be freed as they are read.
+//
 // A Reader can be constructed from a BufferSlice; alternatively the zero value
 // of a Reader may be used after calling Reset on it.
 type Reader struct {
@@ -285,3 +287,59 @@ nextBuffer:
                }
        }
 }
+
+// Discard skips the next n bytes, returning the number of bytes discarded.
+//
+// It frees buffers as they are fully consumed.
+//
+// If Discard skips fewer than n bytes, it also returns an error.
+func (r *Reader) Discard(n int) (discarded int, err error) {
+       total := n
+       for n > 0 && r.len > 0 {
+               curData := r.data[0].ReadOnlyData()
+               curSize := min(n, len(curData)-r.bufferIdx)
+               n -= curSize
+               r.len -= curSize
+               r.bufferIdx += curSize
+               if r.bufferIdx >= len(curData) {
+                       r.data[0].Free()
+                       r.data = r.data[1:]
+                       r.bufferIdx = 0
+               }
+       }
+       discarded = total - n
+       if n > 0 {
+               return discarded, fmt.Errorf("insufficient bytes in reader")
+       }
+       return discarded, nil
+}
+
+// Peek returns the next n bytes without advancing the reader.
+//
+// Peek appends results to the provided res slice and returns the updated slice.
+// This pattern allows re-using the storage of res if it has sufficient
+// capacity.
+//
+// The returned subslices are views into the underlying buffers and are only
+// valid until the reader is advanced past the corresponding buffer.
+//
+// If Peek returns fewer than n bytes, it also returns an error.
+func (r *Reader) Peek(n int, res [][]byte) ([][]byte, error) {
+       for i := 0; n > 0 && i < len(r.data); i++ {
+               curData := r.data[i].ReadOnlyData()
+               start := 0
+               if i == 0 {
+                       start = r.bufferIdx
+               }
+               curSize := min(n, len(curData)-start)
+               if curSize == 0 {
+                       continue
+               }
+               res = append(res, curData[start:start+curSize])
+               n -= curSize
+       }
+       if n > 0 {
+               return nil, fmt.Errorf("insufficient bytes in reader")
+       }
+       return res, nil
+}
index bb9303f0e9e1db348c3ba243756c367921b514eb..822acbda87da2d13cee4848c07a92ea9f81ff099 100644 (file)
@@ -484,3 +484,193 @@ func (t *testPool) Put(buf *[]byte) {
        }
        delete(t.allocated, buf)
 }
+
+func (s) TestBufferSlice_Iteration(t *testing.T) {
+       tests := []struct {
+               name       string
+               buffers    [][]byte
+               operations func(t *testing.T, c *mem.Reader)
+       }{
+               {
+                       name: "empty",
+                       operations: func(t *testing.T, r *mem.Reader) {
+                               if r.Remaining() != 0 {
+                                       t.Fatalf("Remaining() = %v, want 0", r.Remaining())
+                               }
+                               _, err := r.Peek(1, nil)
+                               if err == nil {
+                                       t.Fatalf("Peek(1) returned error <nil>, want non-nil")
+                               }
+                               discarded, err := r.Discard(1)
+                               if got, want := discarded, 0; got != want {
+                                       t.Fatalf("Discard(1) = %d, want %d", got, want)
+                               }
+                               if err == nil {
+                                       t.Fatalf("Discard(1) returned error <nil>, want non-nil")
+                               }
+                               if r.Remaining() != 0 {
+                                       t.Fatalf("Remaining() after Discard = %v, want 0", r.Remaining())
+                               }
+                       },
+               },
+               {
+                       name:    "single_buffer",
+                       buffers: [][]byte{[]byte("0123456789")},
+                       operations: func(t *testing.T, r *mem.Reader) {
+                               if r.Remaining() != 10 {
+                                       t.Fatalf("Remaining() = %v, want 10", r.Remaining())
+                               }
+
+                               res := make([][]byte, 0, 10)
+                               res, err := r.Peek(5, res)
+                               if err != nil {
+                                       t.Fatalf("Peek(5) return error %v, want <nil>", err)
+                               }
+                               if len(res) != 1 || !bytes.Equal(res[0], []byte("01234")) {
+                                       t.Fatalf("Peek(5) = %v, want [[01234]]", res)
+                               }
+                               if cap(res) != 10 {
+                                       t.Fatalf("Peek(5) did not use the provided slice.")
+                               }
+
+                               discarded, err := r.Discard(5)
+                               if got, want := discarded, 5; got != want {
+                                       t.Fatalf("Discard(5) = %d, want %d", got, want)
+                               }
+                               if err != nil {
+                                       t.Fatalf("Discard(5) return error %v, want <nil>", err)
+                               }
+                               if r.Remaining() != 5 {
+                                       t.Fatalf("Remaining() after Discard(5) = %v, want 5", r.Remaining())
+                               }
+                               res, err = r.Peek(5, res[:0])
+                               if err != nil {
+                                       t.Fatalf("Peek(5) return error %v, want <nil>", err)
+                               }
+                               if len(res) != 1 || !bytes.Equal(res[0], []byte("56789")) {
+                                       t.Fatalf("Peek(5) after Discard(5) = %v, want [[56789]]", res)
+                               }
+
+                               discarded, err = r.Discard(100)
+                               if got, want := discarded, 5; got != want {
+                                       t.Fatalf("Discard(100) = %d, want %d", got, want)
+                               }
+                               if err == nil {
+                                       t.Fatalf("Discard(100) returned error <nil>, want non-nil")
+                               }
+                               if r.Remaining() != 0 {
+                                       t.Fatalf("Remaining() after Discard(100) = %v, want 0", r.Remaining())
+                               }
+                       },
+               },
+               {
+                       name:    "multiple_buffers",
+                       buffers: [][]byte{[]byte("012"), []byte("345"), []byte("6789")},
+                       operations: func(t *testing.T, r *mem.Reader) {
+                               if r.Remaining() != 10 {
+                                       t.Fatalf("Remaining() = %v, want 10", r.Remaining())
+                               }
+
+                               res, err := r.Peek(5, nil)
+                               if err != nil {
+                                       t.Fatalf("Peek(5) return error %v, want <nil>", err)
+                               }
+                               if len(res) != 2 || !bytes.Equal(res[0], []byte("012")) || !bytes.Equal(res[1], []byte("34")) {
+                                       t.Fatalf("Peek(5) = %v, want [[012] [34]]", res)
+                               }
+
+                               discarded, err := r.Discard(5)
+                               if got, want := discarded, 5; got != want {
+                                       t.Fatalf("Discard(5) = %d, want %d", got, want)
+                               }
+                               if err != nil {
+                                       t.Fatalf("Discard(5) return error %v, want <nil>", err)
+                               }
+                               if r.Remaining() != 5 {
+                                       t.Fatalf("Remaining() after Discard(5) = %v, want 5", r.Remaining())
+                               }
+
+                               res, err = r.Peek(5, res[:0])
+                               if err != nil {
+                                       t.Fatalf("Peek(5) return error %v, want <nil>", err)
+                               }
+                               if len(res) != 2 || !bytes.Equal(res[0], []byte("5")) || !bytes.Equal(res[1], []byte("6789")) {
+                                       t.Fatalf("Peek(5) after advance = %v, want [[5] [6789]]", res)
+                               }
+                       },
+               },
+               {
+                       name:    "close",
+                       buffers: [][]byte{[]byte("0123456789")},
+                       operations: func(t *testing.T, r *mem.Reader) {
+                               r.Close()
+                               if r.Remaining() != 0 {
+                                       t.Fatalf("Remaining() after Close = %v, want 0", r.Remaining())
+                               }
+                       },
+               },
+               {
+                       name:    "reset",
+                       buffers: [][]byte{[]byte("0123")},
+                       operations: func(t *testing.T, r *mem.Reader) {
+                               newSlice := mem.BufferSlice{mem.SliceBuffer([]byte("56789"))}
+                               r.Reset(newSlice)
+                               if r.Remaining() != 5 {
+                                       t.Fatalf("Remaining() after Reset = %v, want 5", r.Remaining())
+                               }
+                               res, err := r.Peek(5, nil)
+                               if err != nil {
+                                       t.Fatalf("Peek(5) return error %v, want <nil>", err)
+                               }
+                               if len(res) != 1 || !bytes.Equal(res[0], []byte("56789")) {
+                                       t.Fatalf("Peek(5) after Reset = %v, want [[56789]]", res)
+                               }
+                       },
+               },
+               {
+                       name:    "zero_ops",
+                       buffers: [][]byte{[]byte("01234")},
+                       operations: func(t *testing.T, c *mem.Reader) {
+                               if c.Remaining() != 5 {
+                                       t.Fatalf("Remaining() = %v, want 5", c.Remaining())
+                               }
+                               res, err := c.Peek(0, nil)
+                               if err != nil {
+                                       t.Fatalf("Peek(0) return error %v, want <nil>", err)
+                               }
+                               if len(res) != 0 {
+                                       t.Fatalf("Peek(0) got slices: %v, want empty", res)
+                               }
+                               discarded, err := c.Discard(0)
+                               if err != nil {
+                                       t.Fatalf("Discard(0) return error %v, want <nil>", err)
+                               }
+                               if got, want := discarded, 0; got != want {
+                                       t.Fatalf("Discard(0) = %d, want %d", got, want)
+                               }
+                               if c.Remaining() != 5 {
+                                       t.Fatalf("Remaining() after Discard(0) = %v, want 5", c.Remaining())
+                               }
+                               res, err = c.Peek(2, res[:0])
+                               if err != nil {
+                                       t.Fatalf("Peek(2) return error %v, want <nil>", err)
+                               }
+                               if len(res) != 1 || !bytes.Equal(res[0], []byte("01")) {
+                                       t.Fatalf("Peek(2) after zero ops = %v, want [[01]]", res)
+                               }
+                       },
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       var slice mem.BufferSlice
+                       for _, b := range tt.buffers {
+                               slice = append(slice, mem.SliceBuffer(b))
+                       }
+                       c := slice.Reader()
+                       slice.Free()
+                       defer c.Close()
+                       tt.operations(t, c)
+               })
+       }
+}