send_stream.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package gquic
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/flowcontrol"
  8. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/protocol"
  9. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/utils"
  10. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/internal/wire"
  11. )
// sendStreamI is the package-internal view of a send stream: the public
// SendStream API plus the hooks used to drive the stream from received
// frames and to drain its pending data into STREAM frames.
type sendStreamI interface {
	SendStream
	// handleStopSendingFrame processes a STOP_SENDING frame received for this stream.
	handleStopSendingFrame(*wire.StopSendingFrame)
	// hasData reports whether data is currently queued for writing.
	hasData() bool
	// popStreamFrame returns the next STREAM frame to send, limited to maxBytes,
	// and whether more data remains queued afterwards.
	popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool)
	// closeForShutdown closes the stream abruptly, making Write return the given error.
	closeForShutdown(error)
	// handleMaxStreamDataFrame applies a flow control window update received for this stream.
	handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
}
// sendStream is the sending half of a stream. A blocked Write parks on
// writeChan while popStreamFrame drains dataForWriting into STREAM frames.
type sendStream struct {
	// mutex is held by the methods of this type while they read or update
	// the write state below.
	mutex sync.Mutex
	// ctx is returned by Context(); ctxCancel is invoked when the stream is
	// closed, canceled, or closed for shutdown.
	ctx context.Context
	ctxCancel context.CancelFunc
	streamID protocol.StreamID
	// sender is notified about pending data, completion, and control frames.
	sender streamSender
	// writeOffset is the number of bytes handed out in STREAM frames so far.
	writeOffset protocol.ByteCount
	// cancelWriteErr is the error Write returns after the write side was canceled.
	cancelWriteErr error
	// closeForShutdownErr is the error Write returns after closeForShutdown.
	closeForShutdownErr error
	closedForShutdown bool // set when closeForShutdown() is called
	finishedWriting bool // set once Close() is called
	canceledWrite bool // set when CancelWrite() is called, or a STOP_SENDING frame is received
	finSent bool // set when a STREAM frame with the FIN bit has been popped for sending
	// dataForWriting is the not-yet-framed remainder of the buffer passed to
	// the in-progress Write; nil when nothing is pending.
	dataForWriting []byte
	// writeChan wakes up a blocked Write; see signalWrite.
	writeChan chan struct{}
	// deadline is the write deadline; the zero value means no deadline.
	deadline time.Time
	flowController flowcontrol.StreamFlowController
	version protocol.VersionNumber
}
  39. var _ SendStream = &sendStream{}
  40. var _ sendStreamI = &sendStream{}
  41. func newSendStream(
  42. streamID protocol.StreamID,
  43. sender streamSender,
  44. flowController flowcontrol.StreamFlowController,
  45. version protocol.VersionNumber,
  46. ) *sendStream {
  47. s := &sendStream{
  48. streamID: streamID,
  49. sender: sender,
  50. flowController: flowController,
  51. writeChan: make(chan struct{}, 1),
  52. version: version,
  53. }
  54. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  55. return s
  56. }
  57. func (s *sendStream) StreamID() protocol.StreamID {
  58. return s.streamID // same for receiveStream and sendStream
  59. }
  60. func (s *sendStream) Write(p []byte) (int, error) {
  61. s.mutex.Lock()
  62. defer s.mutex.Unlock()
  63. if s.finishedWriting {
  64. return 0, fmt.Errorf("write on closed stream %d", s.streamID)
  65. }
  66. if s.canceledWrite {
  67. return 0, s.cancelWriteErr
  68. }
  69. if s.closeForShutdownErr != nil {
  70. return 0, s.closeForShutdownErr
  71. }
  72. if !s.deadline.IsZero() && !time.Now().Before(s.deadline) {
  73. return 0, errDeadline
  74. }
  75. if len(p) == 0 {
  76. return 0, nil
  77. }
  78. s.dataForWriting = p
  79. var (
  80. deadlineTimer *utils.Timer
  81. bytesWritten int
  82. notifiedSender bool
  83. )
  84. for {
  85. bytesWritten = len(p) - len(s.dataForWriting)
  86. deadline := s.deadline
  87. if !deadline.IsZero() {
  88. if !time.Now().Before(deadline) {
  89. s.dataForWriting = nil
  90. return bytesWritten, errDeadline
  91. }
  92. if deadlineTimer == nil {
  93. deadlineTimer = utils.NewTimer()
  94. }
  95. deadlineTimer.Reset(deadline)
  96. }
  97. if s.dataForWriting == nil || s.canceledWrite || s.closedForShutdown {
  98. break
  99. }
  100. s.mutex.Unlock()
  101. if !notifiedSender {
  102. s.sender.onHasStreamData(s.streamID) // must be called without holding the mutex
  103. notifiedSender = true
  104. }
  105. if deadline.IsZero() {
  106. <-s.writeChan
  107. } else {
  108. select {
  109. case <-s.writeChan:
  110. case <-deadlineTimer.Chan():
  111. deadlineTimer.SetRead()
  112. }
  113. }
  114. s.mutex.Lock()
  115. }
  116. // [Psiphon]
  117. // Stop timer to immediately release resources
  118. if deadlineTimer != nil {
  119. deadlineTimer.Reset(time.Time{})
  120. }
  121. if s.closeForShutdownErr != nil {
  122. return bytesWritten, s.closeForShutdownErr
  123. } else if s.cancelWriteErr != nil {
  124. return bytesWritten, s.cancelWriteErr
  125. }
  126. return bytesWritten, nil
  127. }
  128. // popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
  129. // maxBytes is the maximum length this frame (including frame header) will have.
  130. func (s *sendStream) popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool /* has more data to send */) {
  131. completed, frame, hasMoreData := s.popStreamFrameImpl(maxBytes)
  132. if completed {
  133. s.sender.onStreamCompleted(s.streamID)
  134. }
  135. return frame, hasMoreData
  136. }
  137. func (s *sendStream) popStreamFrameImpl(maxBytes protocol.ByteCount) (bool /* completed */, *wire.StreamFrame, bool /* has more data to send */) {
  138. s.mutex.Lock()
  139. defer s.mutex.Unlock()
  140. if s.closeForShutdownErr != nil {
  141. return false, nil, false
  142. }
  143. frame := &wire.StreamFrame{
  144. StreamID: s.streamID,
  145. Offset: s.writeOffset,
  146. DataLenPresent: true,
  147. }
  148. maxDataLen := frame.MaxDataLen(maxBytes, s.version)
  149. if maxDataLen == 0 { // a STREAM frame must have at least one byte of data
  150. return false, nil, s.dataForWriting != nil
  151. }
  152. frame.Data, frame.FinBit = s.getDataForWriting(maxDataLen)
  153. if len(frame.Data) == 0 && !frame.FinBit {
  154. // this can happen if:
  155. // - popStreamFrame is called but there's no data for writing
  156. // - there's data for writing, but the stream is stream-level flow control blocked
  157. // - there's data for writing, but the stream is connection-level flow control blocked
  158. if s.dataForWriting == nil {
  159. return false, nil, false
  160. }
  161. if isBlocked, offset := s.flowController.IsNewlyBlocked(); isBlocked {
  162. s.sender.queueControlFrame(&wire.StreamBlockedFrame{
  163. StreamID: s.streamID,
  164. Offset: offset,
  165. })
  166. return false, nil, false
  167. }
  168. return false, nil, true
  169. }
  170. if frame.FinBit {
  171. s.finSent = true
  172. }
  173. return frame.FinBit, frame, s.dataForWriting != nil
  174. }
  175. func (s *sendStream) hasData() bool {
  176. s.mutex.Lock()
  177. hasData := len(s.dataForWriting) > 0
  178. s.mutex.Unlock()
  179. return hasData
  180. }
  181. func (s *sendStream) getDataForWriting(maxBytes protocol.ByteCount) ([]byte, bool /* should send FIN */) {
  182. if s.dataForWriting == nil {
  183. return nil, s.finishedWriting && !s.finSent
  184. }
  185. if s.streamID != s.version.CryptoStreamID() {
  186. maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize())
  187. }
  188. if maxBytes == 0 {
  189. return nil, false
  190. }
  191. var ret []byte
  192. if protocol.ByteCount(len(s.dataForWriting)) > maxBytes {
  193. ret = make([]byte, int(maxBytes))
  194. copy(ret, s.dataForWriting[:maxBytes])
  195. s.dataForWriting = s.dataForWriting[maxBytes:]
  196. } else {
  197. ret = make([]byte, len(s.dataForWriting))
  198. copy(ret, s.dataForWriting)
  199. s.dataForWriting = nil
  200. s.signalWrite()
  201. }
  202. s.writeOffset += protocol.ByteCount(len(ret))
  203. s.flowController.AddBytesSent(protocol.ByteCount(len(ret)))
  204. return ret, s.finishedWriting && s.dataForWriting == nil && !s.finSent
  205. }
  206. func (s *sendStream) Close() error {
  207. s.mutex.Lock()
  208. if s.canceledWrite {
  209. s.mutex.Unlock()
  210. return fmt.Errorf("Close called for canceled stream %d", s.streamID)
  211. }
  212. s.finishedWriting = true
  213. s.mutex.Unlock()
  214. s.sender.onHasStreamData(s.streamID) // need to send the FIN, must be called without holding the mutex
  215. s.ctxCancel()
  216. return nil
  217. }
  218. func (s *sendStream) CancelWrite(errorCode protocol.ApplicationErrorCode) error {
  219. s.mutex.Lock()
  220. completed, err := s.cancelWriteImpl(errorCode, fmt.Errorf("Write on stream %d canceled with error code %d", s.streamID, errorCode))
  221. s.mutex.Unlock()
  222. if completed {
  223. s.sender.onStreamCompleted(s.streamID) // must be called without holding the mutex
  224. }
  225. return err
  226. }
  227. // must be called after locking the mutex
  228. func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, writeErr error) (bool /*completed */, error) {
  229. if s.canceledWrite {
  230. return false, nil
  231. }
  232. if s.finishedWriting {
  233. return false, fmt.Errorf("CancelWrite for closed stream %d", s.streamID)
  234. }
  235. s.canceledWrite = true
  236. s.cancelWriteErr = writeErr
  237. s.signalWrite()
  238. s.sender.queueControlFrame(&wire.RstStreamFrame{
  239. StreamID: s.streamID,
  240. ByteOffset: s.writeOffset,
  241. ErrorCode: errorCode,
  242. })
  243. // TODO(#991): cancel retransmissions for this stream
  244. s.ctxCancel()
  245. return true, nil
  246. }
  247. func (s *sendStream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
  248. if completed := s.handleStopSendingFrameImpl(frame); completed {
  249. s.sender.onStreamCompleted(s.streamID)
  250. }
  251. }
  252. func (s *sendStream) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) {
  253. s.mutex.Lock()
  254. hasStreamData := s.dataForWriting != nil
  255. s.mutex.Unlock()
  256. s.flowController.UpdateSendWindow(frame.ByteOffset)
  257. if hasStreamData {
  258. s.sender.onHasStreamData(s.streamID)
  259. }
  260. }
  261. // must be called after locking the mutex
  262. func (s *sendStream) handleStopSendingFrameImpl(frame *wire.StopSendingFrame) bool /*completed*/ {
  263. s.mutex.Lock()
  264. defer s.mutex.Unlock()
  265. writeErr := streamCanceledError{
  266. errorCode: frame.ErrorCode,
  267. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  268. }
  269. errorCode := errorCodeStopping
  270. if !s.version.UsesIETFFrameFormat() {
  271. errorCode = errorCodeStoppingGQUIC
  272. }
  273. completed, _ := s.cancelWriteImpl(errorCode, writeErr)
  274. return completed
  275. }
  276. func (s *sendStream) Context() context.Context {
  277. return s.ctx
  278. }
  279. func (s *sendStream) SetWriteDeadline(t time.Time) error {
  280. s.mutex.Lock()
  281. s.deadline = t
  282. s.mutex.Unlock()
  283. s.signalWrite()
  284. return nil
  285. }
  286. // CloseForShutdown closes a stream abruptly.
  287. // It makes Write unblock (and return the error) immediately.
  288. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  289. func (s *sendStream) closeForShutdown(err error) {
  290. s.mutex.Lock()
  291. s.closedForShutdown = true
  292. s.closeForShutdownErr = err
  293. s.mutex.Unlock()
  294. s.signalWrite()
  295. s.ctxCancel()
  296. }
  297. func (s *sendStream) getWriteOffset() protocol.ByteCount {
  298. return s.writeOffset
  299. }
  300. // signalWrite performs a non-blocking send on the writeChan
  301. func (s *sendStream) signalWrite() {
  302. select {
  303. case s.writeChan <- struct{}{}:
  304. default:
  305. }
  306. }