streams_map.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package quic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  9. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  10. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  11. "github.com/Psiphon-Labs/quic-go/internal/wire"
  12. )
  13. type streamError struct {
  14. message string
  15. nums []protocol.StreamNum
  16. }
  17. func (e streamError) Error() string {
  18. return e.message
  19. }
  20. func convertStreamError(err error, stype protocol.StreamType, pers protocol.Perspective) error {
  21. strError, ok := err.(streamError)
  22. if !ok {
  23. return err
  24. }
  25. ids := make([]interface{}, len(strError.nums))
  26. for i, num := range strError.nums {
  27. ids[i] = num.StreamID(stype, pers)
  28. }
  29. return fmt.Errorf(strError.Error(), ids...)
  30. }
  31. type streamOpenErr struct{ error }
  32. var _ net.Error = &streamOpenErr{}
  33. func (streamOpenErr) Timeout() bool { return false }
  34. func (e streamOpenErr) Unwrap() error { return e.error }
  35. func (e streamOpenErr) Temporary() bool {
  36. // In older versions of quic-go, the stream limit error was documented to be a net.Error.Temporary.
  37. // This function was since deprecated, but we keep the existing behavior.
  38. return errors.Is(e, &StreamLimitReachedError{})
  39. }
  40. // StreamLimitReachedError is returned from Connection.OpenStream and Connection.OpenUniStream
  41. // when it is not possible to open a new stream because the number of opens streams reached
  42. // the peer's stream limit.
  43. type StreamLimitReachedError struct{}
  44. func (e StreamLimitReachedError) Error() string { return "too many open streams" }
  45. type streamsMap struct {
  46. ctx context.Context // not used for cancellations, but carries the values associated with the connection
  47. perspective protocol.Perspective
  48. maxIncomingBidiStreams uint64
  49. maxIncomingUniStreams uint64
  50. sender streamSender
  51. queueControlFrame func(wire.Frame)
  52. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController
  53. mutex sync.Mutex
  54. outgoingBidiStreams *outgoingStreamsMap[streamI]
  55. outgoingUniStreams *outgoingStreamsMap[sendStreamI]
  56. incomingBidiStreams *incomingStreamsMap[streamI]
  57. incomingUniStreams *incomingStreamsMap[receiveStreamI]
  58. reset bool
  59. }
  60. var _ streamManager = &streamsMap{}
  61. func newStreamsMap(
  62. ctx context.Context,
  63. sender streamSender,
  64. queueControlFrame func(wire.Frame),
  65. newFlowController func(protocol.StreamID) flowcontrol.StreamFlowController,
  66. maxIncomingBidiStreams uint64,
  67. maxIncomingUniStreams uint64,
  68. perspective protocol.Perspective,
  69. ) *streamsMap {
  70. m := &streamsMap{
  71. ctx: ctx,
  72. perspective: perspective,
  73. queueControlFrame: queueControlFrame,
  74. newFlowController: newFlowController,
  75. maxIncomingBidiStreams: maxIncomingBidiStreams,
  76. maxIncomingUniStreams: maxIncomingUniStreams,
  77. sender: sender,
  78. }
  79. m.initMaps()
  80. return m
  81. }
  82. func (m *streamsMap) initMaps() {
  83. m.outgoingBidiStreams = newOutgoingStreamsMap(
  84. protocol.StreamTypeBidi,
  85. func(num protocol.StreamNum) streamI {
  86. id := num.StreamID(protocol.StreamTypeBidi, m.perspective)
  87. return newStream(m.ctx, id, m.sender, m.newFlowController(id))
  88. },
  89. m.queueControlFrame,
  90. )
  91. m.incomingBidiStreams = newIncomingStreamsMap(
  92. protocol.StreamTypeBidi,
  93. func(num protocol.StreamNum) streamI {
  94. id := num.StreamID(protocol.StreamTypeBidi, m.perspective.Opposite())
  95. return newStream(m.ctx, id, m.sender, m.newFlowController(id))
  96. },
  97. m.maxIncomingBidiStreams,
  98. m.queueControlFrame,
  99. )
  100. m.outgoingUniStreams = newOutgoingStreamsMap(
  101. protocol.StreamTypeUni,
  102. func(num protocol.StreamNum) sendStreamI {
  103. id := num.StreamID(protocol.StreamTypeUni, m.perspective)
  104. return newSendStream(m.ctx, id, m.sender, m.newFlowController(id))
  105. },
  106. m.queueControlFrame,
  107. )
  108. m.incomingUniStreams = newIncomingStreamsMap(
  109. protocol.StreamTypeUni,
  110. func(num protocol.StreamNum) receiveStreamI {
  111. id := num.StreamID(protocol.StreamTypeUni, m.perspective.Opposite())
  112. return newReceiveStream(id, m.sender, m.newFlowController(id))
  113. },
  114. m.maxIncomingUniStreams,
  115. m.queueControlFrame,
  116. )
  117. }
  118. func (m *streamsMap) OpenStream() (Stream, error) {
  119. m.mutex.Lock()
  120. reset := m.reset
  121. mm := m.outgoingBidiStreams
  122. m.mutex.Unlock()
  123. if reset {
  124. return nil, Err0RTTRejected
  125. }
  126. str, err := mm.OpenStream()
  127. return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
  128. }
  129. func (m *streamsMap) OpenStreamSync(ctx context.Context) (Stream, error) {
  130. m.mutex.Lock()
  131. reset := m.reset
  132. mm := m.outgoingBidiStreams
  133. m.mutex.Unlock()
  134. if reset {
  135. return nil, Err0RTTRejected
  136. }
  137. str, err := mm.OpenStreamSync(ctx)
  138. return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
  139. }
  140. func (m *streamsMap) OpenUniStream() (SendStream, error) {
  141. m.mutex.Lock()
  142. reset := m.reset
  143. mm := m.outgoingUniStreams
  144. m.mutex.Unlock()
  145. if reset {
  146. return nil, Err0RTTRejected
  147. }
  148. str, err := mm.OpenStream()
  149. return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective)
  150. }
  151. func (m *streamsMap) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
  152. m.mutex.Lock()
  153. reset := m.reset
  154. mm := m.outgoingUniStreams
  155. m.mutex.Unlock()
  156. if reset {
  157. return nil, Err0RTTRejected
  158. }
  159. str, err := mm.OpenStreamSync(ctx)
  160. return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective)
  161. }
  162. func (m *streamsMap) AcceptStream(ctx context.Context) (Stream, error) {
  163. m.mutex.Lock()
  164. reset := m.reset
  165. mm := m.incomingBidiStreams
  166. m.mutex.Unlock()
  167. if reset {
  168. return nil, Err0RTTRejected
  169. }
  170. str, err := mm.AcceptStream(ctx)
  171. return str, convertStreamError(err, protocol.StreamTypeBidi, m.perspective.Opposite())
  172. }
  173. func (m *streamsMap) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
  174. m.mutex.Lock()
  175. reset := m.reset
  176. mm := m.incomingUniStreams
  177. m.mutex.Unlock()
  178. if reset {
  179. return nil, Err0RTTRejected
  180. }
  181. str, err := mm.AcceptStream(ctx)
  182. return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective.Opposite())
  183. }
  184. func (m *streamsMap) DeleteStream(id protocol.StreamID) error {
  185. num := id.StreamNum()
  186. switch id.Type() {
  187. case protocol.StreamTypeUni:
  188. if id.InitiatedBy() == m.perspective {
  189. return convertStreamError(m.outgoingUniStreams.DeleteStream(num), protocol.StreamTypeUni, m.perspective)
  190. }
  191. return convertStreamError(m.incomingUniStreams.DeleteStream(num), protocol.StreamTypeUni, m.perspective.Opposite())
  192. case protocol.StreamTypeBidi:
  193. if id.InitiatedBy() == m.perspective {
  194. return convertStreamError(m.outgoingBidiStreams.DeleteStream(num), protocol.StreamTypeBidi, m.perspective)
  195. }
  196. return convertStreamError(m.incomingBidiStreams.DeleteStream(num), protocol.StreamTypeBidi, m.perspective.Opposite())
  197. }
  198. panic("")
  199. }
  200. func (m *streamsMap) GetOrOpenReceiveStream(id protocol.StreamID) (receiveStreamI, error) {
  201. str, err := m.getOrOpenReceiveStream(id)
  202. if err != nil {
  203. return nil, &qerr.TransportError{
  204. ErrorCode: qerr.StreamStateError,
  205. ErrorMessage: err.Error(),
  206. }
  207. }
  208. return str, nil
  209. }
  210. func (m *streamsMap) getOrOpenReceiveStream(id protocol.StreamID) (receiveStreamI, error) {
  211. num := id.StreamNum()
  212. switch id.Type() {
  213. case protocol.StreamTypeUni:
  214. if id.InitiatedBy() == m.perspective {
  215. // an outgoing unidirectional stream is a send stream, not a receive stream
  216. return nil, fmt.Errorf("peer attempted to open receive stream %d", id)
  217. }
  218. str, err := m.incomingUniStreams.GetOrOpenStream(num)
  219. return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective)
  220. case protocol.StreamTypeBidi:
  221. var str receiveStreamI
  222. var err error
  223. if id.InitiatedBy() == m.perspective {
  224. str, err = m.outgoingBidiStreams.GetStream(num)
  225. } else {
  226. str, err = m.incomingBidiStreams.GetOrOpenStream(num)
  227. }
  228. return str, convertStreamError(err, protocol.StreamTypeBidi, id.InitiatedBy())
  229. }
  230. panic("")
  231. }
  232. func (m *streamsMap) GetOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
  233. str, err := m.getOrOpenSendStream(id)
  234. if err != nil {
  235. return nil, &qerr.TransportError{
  236. ErrorCode: qerr.StreamStateError,
  237. ErrorMessage: err.Error(),
  238. }
  239. }
  240. return str, nil
  241. }
  242. func (m *streamsMap) getOrOpenSendStream(id protocol.StreamID) (sendStreamI, error) {
  243. num := id.StreamNum()
  244. switch id.Type() {
  245. case protocol.StreamTypeUni:
  246. if id.InitiatedBy() == m.perspective {
  247. str, err := m.outgoingUniStreams.GetStream(num)
  248. return str, convertStreamError(err, protocol.StreamTypeUni, m.perspective)
  249. }
  250. // an incoming unidirectional stream is a receive stream, not a send stream
  251. return nil, fmt.Errorf("peer attempted to open send stream %d", id)
  252. case protocol.StreamTypeBidi:
  253. var str sendStreamI
  254. var err error
  255. if id.InitiatedBy() == m.perspective {
  256. str, err = m.outgoingBidiStreams.GetStream(num)
  257. } else {
  258. str, err = m.incomingBidiStreams.GetOrOpenStream(num)
  259. }
  260. return str, convertStreamError(err, protocol.StreamTypeBidi, id.InitiatedBy())
  261. }
  262. panic("")
  263. }
  264. func (m *streamsMap) HandleMaxStreamsFrame(f *wire.MaxStreamsFrame) {
  265. switch f.Type {
  266. case protocol.StreamTypeUni:
  267. m.outgoingUniStreams.SetMaxStream(f.MaxStreamNum)
  268. case protocol.StreamTypeBidi:
  269. m.outgoingBidiStreams.SetMaxStream(f.MaxStreamNum)
  270. }
  271. }
  272. func (m *streamsMap) UpdateLimits(p *wire.TransportParameters) {
  273. m.outgoingBidiStreams.UpdateSendWindow(p.InitialMaxStreamDataBidiRemote)
  274. m.outgoingBidiStreams.SetMaxStream(p.MaxBidiStreamNum)
  275. m.outgoingUniStreams.UpdateSendWindow(p.InitialMaxStreamDataUni)
  276. m.outgoingUniStreams.SetMaxStream(p.MaxUniStreamNum)
  277. }
  278. func (m *streamsMap) CloseWithError(err error) {
  279. m.outgoingBidiStreams.CloseWithError(err)
  280. m.outgoingUniStreams.CloseWithError(err)
  281. m.incomingBidiStreams.CloseWithError(err)
  282. m.incomingUniStreams.CloseWithError(err)
  283. }
  284. // ResetFor0RTT resets is used when 0-RTT is rejected. In that case, the streams maps are
  285. // 1. closed with an Err0RTTRejected, making calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream return that error.
  286. // 2. reset to their initial state, such that we can immediately process new incoming stream data.
  287. // Afterwards, calls to Open{Uni}Stream{Sync} / Accept{Uni}Stream will continue to return the error,
  288. // until UseResetMaps() has been called.
  289. func (m *streamsMap) ResetFor0RTT() {
  290. m.mutex.Lock()
  291. defer m.mutex.Unlock()
  292. m.reset = true
  293. m.CloseWithError(Err0RTTRejected)
  294. m.initMaps()
  295. }
  296. func (m *streamsMap) UseResetMaps() {
  297. m.mutex.Lock()
  298. m.reset = false
  299. m.mutex.Unlock()
  300. }