"github.com/quic-go/quic-go/internal/mocks"
"github.com/quic-go/quic-go/internal/protocol"
+ "github.com/quic-go/quic-go/internal/synctest"
"github.com/quic-go/quic-go/internal/wire"
"github.com/stretchr/testify/assert"
}
func TestReceiveStreamBlockRead(t *testing.T) {
- mockCtrl := gomock.NewController(t)
- mockFC := mocks.NewMockStreamFlowController(mockCtrl)
- mockSender := NewMockStreamSender(mockCtrl)
- str := newReceiveStream(42, mockSender, mockFC)
-
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false, gomock.Any())
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
- errChan := make(chan error, 1)
- go func() {
- frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}}
- time.Sleep(scaleDuration(5 * time.Millisecond))
- errChan <- str.handleStreamFrame(&frame, time.Now())
- }()
+ synctest.Test(t, func(t *testing.T) {
+ mockCtrl := gomock.NewController(t)
+ mockFC := mocks.NewMockStreamFlowController(mockCtrl)
+ mockSender := NewMockStreamSender(mockCtrl)
+ str := newReceiveStream(42, mockSender, mockFC)
+
+ mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false, gomock.Any())
+ mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
+ errChan := make(chan error, 1)
+ now := time.Now()
+ go func() {
+ frame := &wire.StreamFrame{Data: []byte{0xde, 0xad}}
+ time.Sleep(time.Hour)
+ errChan <- str.handleStreamFrame(frame, time.Now())
+ }()
- n, err := (&readerWithTimeout{Reader: str, Timeout: time.Second}).Read(make([]byte, 2))
- require.NoError(t, err)
- require.Equal(t, 2, n)
- require.NoError(t, <-errChan)
+ n, err := (&readerWithTimeout{Reader: str, Timeout: 2 * time.Hour}).Read(make([]byte, 2))
+ require.NoError(t, err)
+ require.Equal(t, 2, n)
+ require.Equal(t, now.Add(time.Hour), time.Now())
+ require.NoError(t, <-errChan)
+ })
}
func TestReceiveStreamReadOverlappingData(t *testing.T) {
now := time.Now()
mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false, now).Times(3)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
- for i := 0; i < 3; i++ {
+ for range 3 {
require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Data: []byte{0xde, 0xad, 0xbe, 0xef}}, now))
}
b := make([]byte, 4)
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)),
mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)),
)
- require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Offset: 4, Data: []byte{'f', 'o', 'o', 'b'}}, now))
- require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Offset: 6, Data: []byte{'o', 'b', 'a', 'r'}}, now))
+ require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Offset: 4, Data: []byte("foob")}, now))
+ require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Offset: 6, Data: []byte("obar")}, now))
b = make([]byte, 6)
n, err = (&readerWithTimeout{Reader: str, Timeout: time.Second}).Read(b)
require.NoError(t, err)
require.Equal(t, 6, n)
- require.Equal(t, []byte{'f', 'o', 'o', 'b', 'a', 'r'}, b)
+ require.Equal(t, []byte("foobar"), b)
}
func TestReceiveStreamFlowControlUpdates(t *testing.T) {
}
func TestReceiveStreamDeadlineRemoval(t *testing.T) {
- mockCtrl := gomock.NewController(t)
- mockFC := mocks.NewMockStreamFlowController(mockCtrl)
- str := newReceiveStream(42, nil, mockFC)
-
- deadline := scaleDuration(20 * time.Millisecond)
- require.NoError(t, str.SetReadDeadline(time.Now().Add(deadline)))
- errChan := make(chan error, 1)
- go func() {
- _, err := (&readerWithTimeout{Reader: str, Timeout: 5 * time.Second}).Read([]byte{0})
- errChan <- err
- }()
- select {
- case err := <-errChan:
- t.Fatalf("read should not have returned yet: %v", err)
- case <-time.After(deadline / 2):
- }
+ synctest.Test(t, func(t *testing.T) {
+ mockCtrl := gomock.NewController(t)
+ mockFC := mocks.NewMockStreamFlowController(mockCtrl)
+ str := newReceiveStream(42, nil, mockFC)
+
+ const deadline = time.Minute
+ require.NoError(t, str.SetReadDeadline(time.Now().Add(deadline)))
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := (&readerWithTimeout{Reader: str, Timeout: 3 * deadline}).Read([]byte{0})
+ errChan <- err
+ }()
+ select {
+ case err := <-errChan:
+ t.Fatalf("read should not have returned yet: %v", err)
+ case <-time.After(deadline / 2):
+ }
- // remove the deadline after a while (but before it expires)
- require.NoError(t, str.SetReadDeadline(time.Time{}))
+ // remove the deadline after a while (but before it expires)
+ require.NoError(t, str.SetReadDeadline(time.Time{}))
- select {
- case err := <-errChan:
- t.Fatalf("read should not have returned yet: %v", err)
- case <-time.After(deadline):
- }
+ // no deadline set: Read should not return at all
+ select {
+ case err := <-errChan:
+ t.Fatalf("read should not have returned yet: %v", err)
+ case <-time.After(2 * deadline):
+ }
- // now set the deadline to the past to make Read return immediately
- require.NoError(t, str.SetReadDeadline(time.Now().Add(-time.Second)))
- select {
- case err := <-errChan:
- require.ErrorIs(t, err, os.ErrDeadlineExceeded)
- case <-time.After(time.Second):
- t.Fatal("timeout")
- }
+ // now set the deadline to the past to make Read return immediately
+ require.NoError(t, str.SetReadDeadline(time.Now().Add(-time.Second)))
+ select {
+ case err := <-errChan:
+ require.ErrorIs(t, err, os.ErrDeadlineExceeded)
+ case <-time.After(time.Second):
+ t.Fatal("timeout")
+ }
+ })
}
func TestReceiveStreamDeadlineExtension(t *testing.T) {
- mockCtrl := gomock.NewController(t)
- mockFC := mocks.NewMockStreamFlowController(mockCtrl)
- str := newReceiveStream(42, nil, mockFC)
-
- deadline := scaleDuration(20 * time.Millisecond)
- require.NoError(t, str.SetReadDeadline(time.Now().Add(deadline)))
- errChan := make(chan error, 1)
- go func() {
- _, err := (&readerWithTimeout{Reader: str, Timeout: 5 * time.Second}).Read([]byte{0})
- errChan <- err
- }()
- select {
- case err := <-errChan:
- t.Fatalf("read should not have returned yet: %v", err)
- case <-time.After(deadline / 2):
- }
+ synctest.Test(t, func(t *testing.T) {
+ mockCtrl := gomock.NewController(t)
+ mockFC := mocks.NewMockStreamFlowController(mockCtrl)
+ str := newReceiveStream(42, nil, mockFC)
+
+ start := time.Now()
+ deadline := 5 * time.Second
+ require.NoError(t, str.SetReadDeadline(time.Now().Add(deadline)))
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := (&readerWithTimeout{Reader: str, Timeout: 2 * deadline}).Read([]byte{0})
+ errChan <- err
+ }()
+ select {
+ case err := <-errChan:
+ t.Fatalf("read should not have returned yet: %v", err)
+ case <-time.After(deadline / 2):
+ }
- // extend the deadline
- require.NoError(t, str.SetReadDeadline(time.Now().Add(deadline)))
- select {
- case err := <-errChan:
- require.ErrorIs(t, err, os.ErrDeadlineExceeded)
- case <-time.After(deadline * 3 / 2):
- t.Fatal("timeout")
- }
+ // extend the deadline
+ require.NoError(t, str.SetReadDeadline(time.Now().Add(deadline)))
+ select {
+ case err := <-errChan:
+ require.ErrorIs(t, err, os.ErrDeadlineExceeded)
+ require.Equal(t, start.Add(deadline*3/2), time.Now())
+ case <-time.After(deadline + time.Nanosecond):
+ t.Fatal("timeout")
+ }
+ })
}
func TestReceiveStreamEOFWithData(t *testing.T) {
}
func TestReceiveStreamCloseForShutdown(t *testing.T) {
- mockCtrl := gomock.NewController(t)
- mockFC := mocks.NewMockStreamFlowController(mockCtrl)
- mockSender := NewMockStreamSender(mockCtrl)
- str := newReceiveStream(42, mockSender, mockFC)
- strWithTimeout := &readerWithTimeout{Reader: str, Timeout: time.Second}
+ synctest.Test(t, func(t *testing.T) {
+ mockCtrl := gomock.NewController(t)
+ mockFC := mocks.NewMockStreamFlowController(mockCtrl)
+ mockSender := NewMockStreamSender(mockCtrl)
+ str := newReceiveStream(42, mockSender, mockFC)
+ strWithTimeout := &readerWithTimeout{Reader: str, Timeout: time.Minute}
+
+ // Test immediate return of reads
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := strWithTimeout.Read([]byte{0})
+ errChan <- err
+ }()
- // Test immediate return of reads
- errChan := make(chan error, 1)
- go func() {
- _, err := strWithTimeout.Read([]byte{0})
- errChan <- err
- }()
+ select {
+ case err := <-errChan:
+ t.Fatalf("read returned before closeForShutdown: %v", err)
+ case <-time.After(time.Second): // short wait to ensure read is blocked
+ }
- select {
- case err := <-errChan:
- t.Fatalf("read returned before closeForShutdown: %v", err)
- case <-time.After(scaleDuration(5 * time.Millisecond)): // short wait to ensure read is blocked
- }
+ str.closeForShutdown(assert.AnError)
+ select {
+ case err := <-errChan:
+ require.ErrorIs(t, err, assert.AnError)
+ case <-time.After(time.Second):
+ t.Fatal("read did not return after closeForShutdown")
+ }
- str.closeForShutdown(assert.AnError)
- select {
- case err := <-errChan:
+ // following calls to Read should return the error
+ n, err := strWithTimeout.Read([]byte{0})
+ require.Zero(t, n)
require.ErrorIs(t, err, assert.AnError)
- case <-time.After(time.Second):
- t.Fatal("read did not return after closeForShutdown")
- }
- // following calls to Read should return the error
- n, err := strWithTimeout.Read([]byte{0})
- require.Zero(t, n)
- require.ErrorIs(t, err, assert.AnError)
-
- // receiving a RESET_STREAM frame after closeForShutdown does nothing
- require.NoError(t, str.handleResetStreamFrame(&wire.ResetStreamFrame{StreamID: 42, ErrorCode: 1234, FinalSize: 42}, time.Now()))
- n, err = strWithTimeout.Read([]byte{0})
- require.Zero(t, n)
- require.ErrorIs(t, err, assert.AnError)
+ // receiving a RESET_STREAM frame after closeForShutdown does nothing
+ require.NoError(t, str.handleResetStreamFrame(&wire.ResetStreamFrame{StreamID: 42, ErrorCode: 1234, FinalSize: 42}, time.Now()))
+ n, err = strWithTimeout.Read([]byte{0})
+ require.Zero(t, n)
+ require.ErrorIs(t, err, assert.AnError)
- // calling CancelRead after closeForShutdown does nothing
- str.CancelRead(1234)
- n, err = strWithTimeout.Read([]byte{0})
- require.Zero(t, n)
- require.ErrorIs(t, err, assert.AnError)
+ // calling CancelRead after closeForShutdown does nothing
+ str.CancelRead(1234)
+ n, err = strWithTimeout.Read([]byte{0})
+ require.Zero(t, n)
+ require.ErrorIs(t, err, assert.AnError)
+ })
}
func TestReceiveStreamCancellation(t *testing.T) {
- mockCtrl := gomock.NewController(t)
- mockFC := mocks.NewMockStreamFlowController(mockCtrl)
- mockSender := NewMockStreamSender(mockCtrl)
- str := newReceiveStream(42, mockSender, mockFC)
- strWithTimeout := &readerWithTimeout{Reader: str, Timeout: time.Second}
-
- mockSender.EXPECT().onHasStreamControlFrame(str.StreamID(), gomock.Any())
- errChan := make(chan error, 1)
- go func() {
- _, err := strWithTimeout.Read([]byte{0})
- errChan <- err
- }()
-
- select {
- case err := <-errChan:
- t.Fatalf("read returned before CancelRead: %v", err)
- case <-time.After(scaleDuration(5 * time.Millisecond)):
- }
-
- str.CancelRead(1234)
- // this queues a STOP_SENDING frame
- f, ok, hasMore := str.getControlFrame(time.Now())
- require.True(t, ok)
- require.Equal(t, &wire.StopSendingFrame{StreamID: 42, ErrorCode: 1234}, f.Frame)
- require.False(t, hasMore)
- require.True(t, mockCtrl.Satisfied())
+ synctest.Test(t, func(t *testing.T) {
+ mockCtrl := gomock.NewController(t)
+ mockFC := mocks.NewMockStreamFlowController(mockCtrl)
+ mockSender := NewMockStreamSender(mockCtrl)
+ str := newReceiveStream(42, mockSender, mockFC)
+ strWithTimeout := &readerWithTimeout{Reader: str, Timeout: 2 * time.Second}
+
+ mockSender.EXPECT().onHasStreamControlFrame(str.StreamID(), gomock.Any())
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := strWithTimeout.Read([]byte{0})
+ errChan <- err
+ }()
- select {
- case err := <-errChan:
- var streamErr *StreamError
- require.ErrorAs(t, err, &streamErr)
- require.Equal(t, StreamError{StreamID: 42, ErrorCode: 1234, Remote: false}, *streamErr)
- case <-time.After(time.Second):
- t.Fatal("Read was not unblocked")
- }
+ select {
+ case err := <-errChan:
+ t.Fatalf("read returned before CancelRead: %v", err)
+ case <-time.After(time.Second):
+ }
- // further Read calls return the error
- n, err := strWithTimeout.Read([]byte{0})
- require.Zero(t, n)
- require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: false})
-
- // calling CancelRead again does nothing
- // especially:
- // 1. no more calls to onHasStreamControlFrame
- // 2. no changes of the error code returned by Read
- str.CancelRead(1234)
- str.CancelRead(4321)
- n, err = strWithTimeout.Read([]byte{0})
- require.Zero(t, n)
- // error code unchanged
- require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: false})
- require.True(t, mockCtrl.Satisfied())
+ str.CancelRead(1234)
+ // this queues a STOP_SENDING frame
+ f, ok, hasMore := str.getControlFrame(time.Now())
+ require.True(t, ok)
+ require.Equal(t, &wire.StopSendingFrame{StreamID: 42, ErrorCode: 1234}, f.Frame)
+ require.False(t, hasMore)
+ require.True(t, mockCtrl.Satisfied())
- // receiving the FIN bit has no effect
- mockFC.EXPECT().Abandon()
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true, gomock.Any()).Times(2)
- mockSender.EXPECT().onStreamCompleted(protocol.StreamID(42))
- // receive two of them, to make sure onStreamCompleted is not called twice
- require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Data: []byte("foobar"), Fin: true}, time.Now()))
- require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Data: []byte("foobar"), Fin: true}, time.Now()))
- require.True(t, mockCtrl.Satisfied())
+ select {
+ case err := <-errChan:
+ var streamErr *StreamError
+ require.ErrorAs(t, err, &streamErr)
+ require.Equal(t, StreamError{StreamID: 42, ErrorCode: 1234, Remote: false}, *streamErr)
+ case <-time.After(time.Second):
+ t.Fatal("Read was not unblocked")
+ }
- // receiving a RESET_STREAM frame after CancelRead has no effect
- mockFC.EXPECT().Abandon()
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true, gomock.Any())
- require.NoError(t, str.handleResetStreamFrame(&wire.ResetStreamFrame{StreamID: 42, ErrorCode: 4321, FinalSize: 42}, time.Now()))
- n, err = strWithTimeout.Read([]byte{0})
- require.Zero(t, n)
- require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: false})
+ // further Read calls return the error
+ n, err := strWithTimeout.Read([]byte{0})
+ require.Zero(t, n)
+ require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: false})
+
+ // calling CancelRead again does nothing
+ // especially:
+ // 1. no more calls to onHasStreamControlFrame
+ // 2. no changes of the error code returned by Read
+ str.CancelRead(1234)
+ str.CancelRead(4321)
+ n, err = strWithTimeout.Read([]byte{0})
+ require.Zero(t, n)
+ // error code unchanged
+ require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: false})
+ require.True(t, mockCtrl.Satisfied())
+
+ // receiving the FIN bit has no effect
+ mockFC.EXPECT().Abandon()
+ mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true, gomock.Any()).Times(2)
+ mockSender.EXPECT().onStreamCompleted(protocol.StreamID(42))
+ // receive two of them, to make sure onStreamCompleted is not called twice
+ require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Data: []byte("foobar"), Fin: true}, time.Now()))
+ require.NoError(t, str.handleStreamFrame(&wire.StreamFrame{Data: []byte("foobar"), Fin: true}, time.Now()))
+ require.True(t, mockCtrl.Satisfied())
+
+ // receiving a RESET_STREAM frame after CancelRead has no effect
+ mockFC.EXPECT().Abandon()
+ mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true, gomock.Any())
+ require.NoError(t, str.handleResetStreamFrame(&wire.ResetStreamFrame{StreamID: 42, ErrorCode: 4321, FinalSize: 42}, time.Now()))
+ n, err = strWithTimeout.Read([]byte{0})
+ require.Zero(t, n)
+ require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: false})
+ })
}
func TestReceiveStreamCancelReadAfterFIN(t *testing.T) {
}
func TestReceiveStreamReset(t *testing.T) {
- mockCtrl := gomock.NewController(t)
- mockFC := mocks.NewMockStreamFlowController(mockCtrl)
- mockSender := NewMockStreamSender(mockCtrl)
- str := newReceiveStream(42, mockSender, mockFC)
- strWithTimeout := &readerWithTimeout{Reader: str, Timeout: time.Second}
+ synctest.Test(t, func(t *testing.T) {
+ mockCtrl := gomock.NewController(t)
+ mockFC := mocks.NewMockStreamFlowController(mockCtrl)
+ mockSender := NewMockStreamSender(mockCtrl)
+ str := newReceiveStream(42, mockSender, mockFC)
+ strWithTimeout := &readerWithTimeout{Reader: str, Timeout: 2 * time.Second}
+
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := strWithTimeout.Read([]byte{0})
+ errChan <- err
+ }()
- errChan := make(chan error, 1)
- go func() {
- _, err := strWithTimeout.Read([]byte{0})
- errChan <- err
- }()
+ select {
+ case err := <-errChan:
+ t.Fatalf("read returned before reset: %v", err)
+ case <-time.After(time.Second):
+ }
- select {
- case err := <-errChan:
- t.Fatalf("read returned before reset: %v", err)
- case <-time.After(scaleDuration(5 * time.Millisecond)):
- }
+ mockSender.EXPECT().onStreamCompleted(protocol.StreamID(42))
+ gomock.InOrder(
+ mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true, gomock.Any()),
+ mockFC.EXPECT().Abandon().MinTimes(1),
+ )
+ require.NoError(t, str.handleResetStreamFrame(
+ &wire.ResetStreamFrame{StreamID: 42, ErrorCode: 1234, FinalSize: 42},
+ time.Now(),
+ ))
- mockSender.EXPECT().onStreamCompleted(protocol.StreamID(42))
- gomock.InOrder(
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true, gomock.Any()),
- mockFC.EXPECT().Abandon().MinTimes(1),
- )
- require.NoError(t, str.handleResetStreamFrame(
- &wire.ResetStreamFrame{StreamID: 42, ErrorCode: 1234, FinalSize: 42},
- time.Now(),
- ))
+ select {
+ case err := <-errChan:
+ require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: true})
+ case <-time.After(time.Second):
+ t.Fatal("Read was not unblocked")
+ }
- select {
- case err := <-errChan:
+ // Test that further calls to Read return the error
+ _, err := strWithTimeout.Read([]byte{0})
+ require.Equal(t, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: true}, err)
+
+ // further RESET_STREAM frames have no effect
+ mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true, gomock.Any())
+ require.NoError(t, str.handleResetStreamFrame(
+ &wire.ResetStreamFrame{StreamID: 42, ErrorCode: 4321, FinalSize: 42},
+ time.Now(),
+ ))
+ n, err := str.Read([]byte{0})
+ require.Zero(t, n)
+ // error code unchanged
require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: true})
- case <-time.After(time.Second):
- t.Fatal("Read was not unblocked")
- }
- // Test that further calls to Read return the error
- _, err := strWithTimeout.Read([]byte{0})
- require.Equal(t, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: true}, err)
-
- // further RESET_STREAM frames have no effect
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true, gomock.Any())
- require.NoError(t, str.handleResetStreamFrame(
- &wire.ResetStreamFrame{StreamID: 42, ErrorCode: 4321, FinalSize: 42},
- time.Now(),
- ))
- n, err := str.Read([]byte{0})
- require.Zero(t, n)
- // error code unchanged
- require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: true})
-
- // CancelRead after a RESET_STREAM frame has no effect
- str.CancelRead(100)
- n, err = str.Read([]byte{0})
- require.Zero(t, n)
- // error code and remote flag unchanged
- require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: true})
+ // CancelRead after a RESET_STREAM frame has no effect
+ str.CancelRead(100)
+ n, err = str.Read([]byte{0})
+ require.Zero(t, n)
+ // error code and remote flag unchanged
+ require.ErrorIs(t, err, &StreamError{StreamID: 42, ErrorCode: 1234, Remote: true})
+ })
}
func TestReceiveStreamResetAfterFINRead(t *testing.T) {