// session.go (44 KB)
  1. package quic
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "reflect"
  11. "sync"
  12. "time"
  13. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  14. "github.com/Psiphon-Labs/quic-go/internal/congestion"
  15. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  16. "github.com/Psiphon-Labs/quic-go/internal/handshake"
  17. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  18. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  19. "github.com/Psiphon-Labs/quic-go/internal/utils"
  20. "github.com/Psiphon-Labs/quic-go/internal/wire"
  21. "github.com/Psiphon-Labs/quic-go/quictrace"
  22. )
// unpacker turns the raw bytes of a single received packet into an
// unpackedPacket.
//
// Unpack receives the already parsed header, the time the packet was received
// and the packet bytes. In this file its errors are inspected by
// handleSinglePacket, which distinguishes handshake.ErrKeysDropped,
// handshake.ErrKeysNotYetAvailable and wire.ErrInvalidReservedBits from all
// other errors.
type unpacker interface {
	Unpack(hdr *wire.Header, rcvTime time.Time, data []byte) (*unpackedPacket, error)
}
// streamGetter looks up the receive or send side of a stream by its stream ID.
//
// NOTE(review): the method names suggest that a stream which does not exist yet
// is opened implicitly; confirm against the implementation, which is not part
// of this view.
type streamGetter interface {
	GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
	GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
}
  30. type streamManager interface {
  31. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  32. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  33. OpenStream() (Stream, error)
  34. OpenUniStream() (SendStream, error)
  35. OpenStreamSync(context.Context) (Stream, error)
  36. OpenUniStreamSync(context.Context) (SendStream, error)
  37. AcceptStream(context.Context) (Stream, error)
  38. AcceptUniStream(context.Context) (ReceiveStream, error)
  39. DeleteStream(protocol.StreamID) error
  40. UpdateLimits(*handshake.TransportParameters) error
  41. HandleMaxStreamsFrame(*wire.MaxStreamsFrame) error
  42. CloseWithError(error)
  43. }
// cryptoStreamHandler is the part of the crypto setup that the session drives
// directly.
//
// In this file the session uses it as follows:
//   - RunHandshake is started in its own goroutine at the beginning of run.
//   - ChangeConnectionID is called with the new destination connection ID when
//     a valid Retry packet is handled.
//   - Close (io.Closer) is called when the run loop terminates.
//   - ConnectionState is exposed through session.ConnectionState.
//
// SetLargest1RTTAcked has no caller within this view.
type cryptoStreamHandler interface {
	RunHandshake()
	ChangeConnectionID(protocol.ConnectionID)
	SetLargest1RTTAcked(protocol.PacketNumber)
	io.Closer
	ConnectionState() tls.ConnectionState
}
// receivedPacket is a datagram handed to the session together with its
// receive metadata.
type receivedPacket struct {
	// remoteAddr is the address associated with the packet.
	remoteAddr net.Addr
	// rcvTime is the receive time; it is passed on to the unpacker and recorded
	// as the session's last-packet-received time once the packet is handled.
	rcvTime time.Time
	// data holds the packet bytes. handlePacketImpl narrows it to a single
	// (possibly coalesced) packet before handing it to handleSinglePacket.
	data []byte
	// buffer is the packet buffer backing data. The session calls Split,
	// Decrement and MaybeRelease on it.
	// NOTE(review): presumably a reference-counted buffer; confirm in packetBuffer.
	buffer *packetBuffer
}
  57. func (p *receivedPacket) Clone() *receivedPacket {
  58. return &receivedPacket{
  59. remoteAddr: p.remoteAddr,
  60. rcvTime: p.rcvTime,
  61. data: p.data,
  62. buffer: p.buffer,
  63. }
  64. }
// sessionRunner is what a session uses to register and unregister the
// connection IDs and reset tokens under which it wants to be reachable.
//
// In this file its methods are only wired up as callbacks for the connection
// ID manager and the connection ID generator; additionally, the server-side
// session calls Retire for the client's destination connection ID when the
// handshake completes.
//
// NOTE(review): the [16]byte returned by Add is presumably the stateless reset
// token for the added connection ID; confirm against the implementation.
type sessionRunner interface {
	Add(protocol.ConnectionID, packetHandler) [16]byte
	Retire(protocol.ConnectionID)
	Remove(protocol.ConnectionID)
	ReplaceWithClosed(protocol.ConnectionID, packetHandler)
	AddResetToken([16]byte, packetHandler)
	RemoveResetToken([16]byte)
	RetireResetToken([16]byte)
}
  74. type handshakeRunner struct {
  75. onReceivedParams func([]byte)
  76. onError func(error)
  77. dropKeys func(protocol.EncryptionLevel)
  78. onHandshakeComplete func()
  79. }
  80. func (r *handshakeRunner) OnReceivedParams(b []byte) { r.onReceivedParams(b) }
  81. func (r *handshakeRunner) OnError(e error) { r.onError(e) }
  82. func (r *handshakeRunner) DropKeys(el protocol.EncryptionLevel) { r.dropKeys(el) }
  83. func (r *handshakeRunner) OnHandshakeComplete() { r.onHandshakeComplete() }
// closeError is the value sent on the session's closeChan to make the run loop
// terminate.
type closeError struct {
	// err is the error that run returns once the loop has terminated.
	err error
	// remote and immediate are consumed by code outside this view.
	// NOTE(review): presumably remote marks a close initiated by the peer and
	// immediate a close without sending a CONNECTION_CLOSE; confirm in
	// handleCloseError.
	remote    bool
	immediate bool
}
// errCloseForRecreating is a sentinel error. Going by its message it is used
// when a session is closed only so that it can be recreated; the code using it
// is outside this view.
var errCloseForRecreating = errors.New("closing session in order to recreate it")
// session is the implementation of a QUIC session. It is used for both the
// client and the server perspective (see newClientSession and newSession).
// Most of its state is read and written from the run loop.
type session struct {
	// Destination connection ID used during the handshake.
	// Used to check source connection ID on incoming packets.
	// It is replaced when a Retry is accepted and, on the client, when the
	// first packet from the server carries a different source connection ID.
	handshakeDestConnID protocol.ConnectionID
	// if the server sends a Retry, this is the connection ID we used initially
	origDestConnID protocol.ConnectionID
	// srcConnIDLen is the connection ID length passed to wire.ParsePacket when
	// parsing incoming packets.
	srcConnIDLen int

	perspective    protocol.Perspective
	initialVersion protocol.VersionNumber // if version negotiation is performed, this is the version we initially tried
	version        protocol.VersionNumber
	config         *Config

	conn      connection
	sendQueue *sendQueue

	streamsMap      streamManager
	connIDManager   *connIDManager
	connIDGenerator *connIDGenerator

	rttStats *congestion.RTTStats

	cryptoStreamManager   *cryptoStreamManager
	sentPacketHandler     ackhandler.SentPacketHandler
	receivedPacketHandler ackhandler.ReceivedPacketHandler
	retransmissionQueue   *retransmissionQueue
	framer                framer
	windowUpdateQueue     *windowUpdateQueue
	connFlowController    flowcontrol.ConnectionFlowController
	tokenStoreKey         string                    // only set for the client
	tokenGenerator        *handshake.TokenGenerator // only set for the server

	unpacker    unpacker
	frameParser wire.FrameParser
	packer      packer

	cryptoStreamHandler cryptoStreamHandler

	// receivedPackets is the buffered queue of packets that the run loop
	// consumes.
	receivedPackets chan *receivedPacket
	// sendingScheduled (capacity 1) wakes up the run loop.
	sendingScheduled chan struct{}

	closeOnce sync.Once
	// closeChan is used to notify the run loop that it should terminate
	closeChan chan closeError

	// ctx is cancelled when run returns.
	ctx       context.Context
	ctxCancel context.CancelFunc
	// handshakeCtx is cancelled by handleHandshakeComplete.
	handshakeCtx       context.Context
	handshakeCtxCancel context.CancelFunc

	undecryptablePackets []*receivedPacket

	// clientHelloWritten is only set for the client; its run loop waits for it
	// before entering the main loop.
	clientHelloWritten    <-chan struct{}
	earlySessionReadyChan chan struct{}
	handshakeCompleteChan chan struct{} // is closed when the handshake completes
	// handshakeComplete is set by handleHandshakeComplete.
	handshakeComplete bool

	receivedRetry       bool
	receivedFirstPacket bool

	// sessionCreationTime is the reference point for the handshake timeout.
	sessionCreationTime time.Time
	// The idle timeout is set based on the max of the time we received the last packet...
	lastPacketReceivedTime time.Time
	// ... and the time we sent a new ack-eliciting packet after receiving a packet.
	firstAckElicitingPacketAfterIdleSentTime time.Time
	// pacingDeadline is the time when the next packet should be sent
	pacingDeadline time.Time

	peerParams *handshake.TransportParameters

	timer *utils.Timer
	// keepAlivePingSent stores whether a keep alive PING is in flight.
	// It is reset as soon as we receive a packet from the peer.
	keepAlivePingSent bool
	// keepAliveInterval: the run loop and maybeResetTimer act on half of this
	// value. It is assigned outside this view.
	keepAliveInterval time.Duration

	// traceCallback is only set if the config has a QuicTracer.
	traceCallback func(quictrace.Event)

	logID  string
	logger utils.Logger
}
  154. var _ Session = &session{}
  155. var _ EarlySession = &session{}
  156. var _ streamSender = &session{}
  157. var newSession = func(
  158. conn connection,
  159. runner sessionRunner,
  160. origDestConnID protocol.ConnectionID,
  161. clientDestConnID protocol.ConnectionID,
  162. destConnID protocol.ConnectionID,
  163. srcConnID protocol.ConnectionID,
  164. statelessResetToken [16]byte,
  165. conf *Config,
  166. tlsConf *tls.Config,
  167. tokenGenerator *handshake.TokenGenerator,
  168. logger utils.Logger,
  169. v protocol.VersionNumber,
  170. ) quicSession {
  171. s := &session{
  172. conn: conn,
  173. config: conf,
  174. handshakeDestConnID: destConnID,
  175. srcConnIDLen: srcConnID.Len(),
  176. tokenGenerator: tokenGenerator,
  177. perspective: protocol.PerspectiveServer,
  178. handshakeCompleteChan: make(chan struct{}),
  179. logger: logger,
  180. version: v,
  181. }
  182. if origDestConnID != nil {
  183. s.logID = origDestConnID.String()
  184. } else {
  185. s.logID = destConnID.String()
  186. }
  187. s.connIDManager = newConnIDManager(
  188. destConnID,
  189. func(token [16]byte) { runner.AddResetToken(token, s) },
  190. runner.RemoveResetToken,
  191. runner.RetireResetToken,
  192. s.queueControlFrame,
  193. )
  194. s.connIDGenerator = newConnIDGenerator(
  195. srcConnID,
  196. clientDestConnID,
  197. func(connID protocol.ConnectionID) [16]byte { return runner.Add(connID, s) },
  198. runner.Remove,
  199. runner.Retire,
  200. runner.ReplaceWithClosed,
  201. s.queueControlFrame,
  202. )
  203. s.preSetup()
  204. s.sentPacketHandler = ackhandler.NewSentPacketHandler(0, s.rttStats, s.traceCallback, s.logger)
  205. initialStream := newCryptoStream()
  206. handshakeStream := newCryptoStream()
  207. oneRTTStream := newPostHandshakeCryptoStream(s.framer)
  208. params := &handshake.TransportParameters{
  209. InitialMaxStreamDataBidiLocal: protocol.InitialMaxStreamData,
  210. InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData,
  211. InitialMaxStreamDataUni: protocol.InitialMaxStreamData,
  212. InitialMaxData: protocol.InitialMaxData,
  213. IdleTimeout: s.config.IdleTimeout,
  214. MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams),
  215. MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams),
  216. MaxAckDelay: protocol.MaxAckDelayInclGranularity,
  217. AckDelayExponent: protocol.AckDelayExponent,
  218. DisableMigration: true,
  219. StatelessResetToken: &statelessResetToken,
  220. OriginalConnectionID: origDestConnID,
  221. ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs,
  222. }
  223. cs := handshake.NewCryptoSetupServer(
  224. initialStream,
  225. handshakeStream,
  226. oneRTTStream,
  227. clientDestConnID,
  228. conn.RemoteAddr(),
  229. params,
  230. &handshakeRunner{
  231. onReceivedParams: s.processTransportParameters,
  232. onError: s.closeLocal,
  233. dropKeys: s.dropEncryptionLevel,
  234. onHandshakeComplete: func() {
  235. runner.Retire(clientDestConnID)
  236. close(s.handshakeCompleteChan)
  237. },
  238. },
  239. tlsConf,
  240. s.rttStats,
  241. logger,
  242. )
  243. s.cryptoStreamHandler = cs
  244. s.packer = newPacketPacker(
  245. srcConnID,
  246. s.connIDManager.Get,
  247. initialStream,
  248. handshakeStream,
  249. s.sentPacketHandler,
  250. s.retransmissionQueue,
  251. s.RemoteAddr(),
  252. cs,
  253. s.framer,
  254. s.receivedPacketHandler,
  255. s.perspective,
  256. s.version,
  257. )
  258. s.unpacker = newPacketUnpacker(cs, s.version)
  259. s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, oneRTTStream)
  260. return s
  261. }
  262. // declare this as a variable, such that we can it mock it in the tests
  263. var newClientSession = func(
  264. conn connection,
  265. runner sessionRunner,
  266. destConnID protocol.ConnectionID,
  267. srcConnID protocol.ConnectionID,
  268. conf *Config,
  269. tlsConf *tls.Config,
  270. initialPacketNumber protocol.PacketNumber,
  271. initialVersion protocol.VersionNumber,
  272. logger utils.Logger,
  273. v protocol.VersionNumber,
  274. ) quicSession {
  275. s := &session{
  276. conn: conn,
  277. config: conf,
  278. handshakeDestConnID: destConnID,
  279. srcConnIDLen: srcConnID.Len(),
  280. perspective: protocol.PerspectiveClient,
  281. handshakeCompleteChan: make(chan struct{}),
  282. logID: destConnID.String(),
  283. logger: logger,
  284. initialVersion: initialVersion,
  285. version: v,
  286. }
  287. s.connIDManager = newConnIDManager(
  288. destConnID,
  289. func(token [16]byte) { runner.AddResetToken(token, s) },
  290. runner.RemoveResetToken,
  291. runner.RetireResetToken,
  292. s.queueControlFrame,
  293. )
  294. s.connIDGenerator = newConnIDGenerator(
  295. srcConnID,
  296. nil,
  297. func(connID protocol.ConnectionID) [16]byte { return runner.Add(connID, s) },
  298. runner.Remove,
  299. runner.Retire,
  300. runner.ReplaceWithClosed,
  301. s.queueControlFrame,
  302. )
  303. s.preSetup()
  304. s.sentPacketHandler = ackhandler.NewSentPacketHandler(initialPacketNumber, s.rttStats, s.traceCallback, s.logger)
  305. initialStream := newCryptoStream()
  306. handshakeStream := newCryptoStream()
  307. oneRTTStream := newPostHandshakeCryptoStream(s.framer)
  308. params := &handshake.TransportParameters{
  309. InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData,
  310. InitialMaxStreamDataBidiLocal: protocol.InitialMaxStreamData,
  311. InitialMaxStreamDataUni: protocol.InitialMaxStreamData,
  312. InitialMaxData: protocol.InitialMaxData,
  313. IdleTimeout: s.config.IdleTimeout,
  314. MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams),
  315. MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams),
  316. MaxAckDelay: protocol.MaxAckDelayInclGranularity,
  317. AckDelayExponent: protocol.AckDelayExponent,
  318. DisableMigration: true,
  319. ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs,
  320. }
  321. cs, clientHelloWritten := handshake.NewCryptoSetupClient(
  322. initialStream,
  323. handshakeStream,
  324. oneRTTStream,
  325. destConnID,
  326. conn.RemoteAddr(),
  327. params,
  328. &handshakeRunner{
  329. onReceivedParams: s.processTransportParameters,
  330. onError: s.closeLocal,
  331. dropKeys: s.dropEncryptionLevel,
  332. onHandshakeComplete: func() { close(s.handshakeCompleteChan) },
  333. },
  334. tlsConf,
  335. s.rttStats,
  336. logger,
  337. )
  338. s.clientHelloWritten = clientHelloWritten
  339. s.cryptoStreamHandler = cs
  340. s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, oneRTTStream)
  341. s.unpacker = newPacketUnpacker(cs, s.version)
  342. s.packer = newPacketPacker(
  343. srcConnID,
  344. s.connIDManager.Get,
  345. initialStream,
  346. handshakeStream,
  347. s.sentPacketHandler,
  348. s.retransmissionQueue,
  349. s.RemoteAddr(),
  350. cs,
  351. s.framer,
  352. s.receivedPacketHandler,
  353. s.perspective,
  354. s.version,
  355. )
  356. if len(tlsConf.ServerName) > 0 {
  357. s.tokenStoreKey = tlsConf.ServerName
  358. } else {
  359. s.tokenStoreKey = conn.RemoteAddr().String()
  360. }
  361. if s.config.TokenStore != nil {
  362. if token := s.config.TokenStore.Pop(s.tokenStoreKey); token != nil {
  363. s.packer.SetToken(token.data)
  364. }
  365. }
  366. return s
  367. }
  368. func (s *session) preSetup() {
  369. s.sendQueue = newSendQueue(s.conn)
  370. s.retransmissionQueue = newRetransmissionQueue(s.version)
  371. s.frameParser = wire.NewFrameParser(s.version)
  372. s.rttStats = &congestion.RTTStats{}
  373. s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
  374. s.connFlowController = flowcontrol.NewConnectionFlowController(
  375. protocol.InitialMaxData,
  376. protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
  377. s.onHasConnectionWindowUpdate,
  378. s.rttStats,
  379. s.logger,
  380. )
  381. s.earlySessionReadyChan = make(chan struct{})
  382. s.streamsMap = newStreamsMap(
  383. s,
  384. s.newFlowController,
  385. uint64(s.config.MaxIncomingStreams),
  386. uint64(s.config.MaxIncomingUniStreams),
  387. s.perspective,
  388. s.version,
  389. )
  390. s.framer = newFramer(s.streamsMap, s.version)
  391. s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets)
  392. s.closeChan = make(chan closeError, 1)
  393. s.sendingScheduled = make(chan struct{}, 1)
  394. s.undecryptablePackets = make([]*receivedPacket, 0, protocol.MaxUndecryptablePackets)
  395. s.ctx, s.ctxCancel = context.WithCancel(context.Background())
  396. s.handshakeCtx, s.handshakeCtxCancel = context.WithCancel(context.Background())
  397. s.timer = utils.NewTimer()
  398. now := time.Now()
  399. s.lastPacketReceivedTime = now
  400. s.sessionCreationTime = now
  401. s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
  402. if s.config.QuicTracer != nil {
  403. s.traceCallback = func(ev quictrace.Event) {
  404. s.config.QuicTracer.Trace(s.origDestConnID, ev)
  405. }
  406. }
  407. }
// run is the session's main loop. It starts the handshake and the send queue
// in their own goroutines and then processes close requests, timer expiry,
// sending requests, received packets and handshake completion until a
// closeError arrives on closeChan.
//
// The session context is cancelled when run returns. The returned error is
// the err field of the closeError that terminated the loop.
func (s *session) run() error {
	defer s.ctxCancel()

	go s.cryptoStreamHandler.RunHandshake()
	go func() {
		if err := s.sendQueue.Run(); err != nil {
			s.closeLocal(err)
		}
	}()

	// The client doesn't enter the loop before the ClientHello was written
	// (or the session was closed).
	if s.perspective == protocol.PerspectiveClient {
		select {
		case <-s.clientHelloWritten:
			s.scheduleSending()
		case closeErr := <-s.closeChan:
			// put the close error back into the channel, so that the run loop can receive it
			s.closeChan <- closeErr
		}
	}

	var closeErr closeError

runLoop:
	for {
		// Close immediately if requested
		select {
		case closeErr = <-s.closeChan:
			break runLoop
		case <-s.handshakeCompleteChan:
			s.handleHandshakeComplete()
		default:
		}

		s.maybeResetTimer()

		select {
		case closeErr = <-s.closeChan:
			break runLoop
		case <-s.timer.Chan():
			s.timer.SetRead()
			// We do all the interesting stuff after the select statement, so
			// nothing to see here.
		case <-s.sendingScheduled:
			// We do all the interesting stuff after the select statement, so
			// nothing to see here.
		case p := <-s.receivedPackets:
			// Only reset the timers if this packet was actually processed.
			// This avoids modifying any state when handling undecryptable packets,
			// which could be injected by an attacker.
			if wasProcessed := s.handlePacketImpl(p); !wasProcessed {
				continue
			}
		case <-s.handshakeCompleteChan:
			// handleHandshakeComplete sets the channel to nil, so this case
			// is selected at most once.
			s.handleHandshakeComplete()
		}

		now := time.Now()
		if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) {
			// This could cause packets to be retransmitted.
			// Check it before trying to send packets.
			if err := s.sentPacketHandler.OnLossDetectionTimeout(); err != nil {
				s.closeLocal(err)
			}
		}

		var pacingDeadline time.Time
		if s.pacingDeadline.IsZero() { // the timer didn't have a pacing deadline set
			pacingDeadline = s.sentPacketHandler.TimeUntilSend()
		}
		if s.config.KeepAlive && !s.keepAlivePingSent && s.handshakeComplete && s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && time.Since(s.lastPacketReceivedTime) >= s.keepAliveInterval/2 {
			// send a PING frame since there is no activity in the session
			s.logger.Debugf("Sending a keep-alive ping to keep the connection alive.")
			s.framer.QueueControlFrame(&wire.PingFrame{})
			s.keepAlivePingSent = true
		} else if !pacingDeadline.IsZero() && now.Before(pacingDeadline) {
			// If we get to this point before the pacing deadline, we should wait until that deadline.
			// This can happen when scheduleSending is called, or a packet is received.
			// Set the timer and restart the run loop.
			s.pacingDeadline = pacingDeadline
			continue
		}

		// Both timeouts destroy the session and then restart the loop, which
		// is where a pending close request is picked up.
		if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.HandshakeTimeout {
			s.destroyImpl(qerr.TimeoutError("Handshake did not complete in time"))
			continue
		}
		if s.handshakeComplete && now.Sub(s.idleTimeoutStartTime()) >= s.config.IdleTimeout {
			s.destroyImpl(qerr.TimeoutError("No recent network activity"))
			continue
		}

		if err := s.sendPackets(); err != nil {
			s.closeLocal(err)
		}
	}

	// [Psiphon]
	// Stop timer to immediately release resources
	s.timer.Reset(time.Time{})

	s.handleCloseError(closeErr)
	s.logger.Infof("Connection %s closed.", s.logID)
	s.cryptoStreamHandler.Close()
	s.sendQueue.Close()
	return closeErr.err
}
// earlySessionReady returns the session's earlySessionReadyChan as a
// receive-only channel. The method itself does not block; callers wait by
// receiving from the returned channel.
// NOTE(review): presumably the channel is closed once the early session can be
// used; the code closing it is outside this view.
func (s *session) earlySessionReady() <-chan struct{} {
	return s.earlySessionReadyChan
}
// HandshakeComplete returns the session's handshake context. That context is
// cancelled by handleHandshakeComplete, i.e. once the run loop has processed
// the completion of the handshake.
func (s *session) HandshakeComplete() context.Context {
	return s.handshakeCtx
}
// Context returns the session's context. It is cancelled when the run loop
// returns.
func (s *session) Context() context.Context {
	return s.ctx
}
// ConnectionState returns the TLS connection state as reported by the
// session's crypto stream handler.
func (s *session) ConnectionState() tls.ConnectionState {
	return s.cryptoStreamHandler.ConnectionState()
}
  516. func (s *session) maybeResetTimer() {
  517. var deadline time.Time
  518. if s.config.KeepAlive && s.handshakeComplete && !s.keepAlivePingSent {
  519. deadline = s.idleTimeoutStartTime().Add(s.keepAliveInterval / 2)
  520. } else {
  521. deadline = s.idleTimeoutStartTime().Add(s.config.IdleTimeout)
  522. }
  523. if ackAlarm := s.receivedPacketHandler.GetAlarmTimeout(); !ackAlarm.IsZero() {
  524. deadline = utils.MinTime(deadline, ackAlarm)
  525. }
  526. if lossTime := s.sentPacketHandler.GetLossDetectionTimeout(); !lossTime.IsZero() {
  527. deadline = utils.MinTime(deadline, lossTime)
  528. }
  529. if !s.handshakeComplete {
  530. handshakeDeadline := s.sessionCreationTime.Add(s.config.HandshakeTimeout)
  531. deadline = utils.MinTime(deadline, handshakeDeadline)
  532. }
  533. if !s.pacingDeadline.IsZero() {
  534. deadline = utils.MinTime(deadline, s.pacingDeadline)
  535. }
  536. s.timer.Reset(deadline)
  537. }
  538. func (s *session) idleTimeoutStartTime() time.Time {
  539. return utils.MaxTime(s.lastPacketReceivedTime, s.firstAckElicitingPacketAfterIdleSentTime)
  540. }
  541. func (s *session) handleHandshakeComplete() {
  542. s.handshakeComplete = true
  543. s.handshakeCompleteChan = nil // prevent this case from ever being selected again
  544. s.handshakeCtxCancel()
  545. s.connIDGenerator.SetHandshakeComplete()
  546. s.sentPacketHandler.SetHandshakeComplete()
  547. // The client completes the handshake first (after sending the CFIN).
  548. // We need to make sure it learns about the server completing the handshake,
  549. // in order to stop retransmitting handshake packets.
  550. // They will stop retransmitting handshake packets when receiving the first 1-RTT packet.
  551. if s.perspective == protocol.PerspectiveServer {
  552. token, err := s.tokenGenerator.NewToken(s.conn.RemoteAddr())
  553. if err != nil {
  554. s.closeLocal(err)
  555. }
  556. s.queueControlFrame(&wire.NewTokenFrame{Token: token})
  557. }
  558. }
  559. func (s *session) handlePacketImpl(rp *receivedPacket) bool {
  560. var counter uint8
  561. var lastConnID protocol.ConnectionID
  562. var processed bool
  563. data := rp.data
  564. p := rp
  565. for len(data) > 0 {
  566. if counter > 0 {
  567. p = p.Clone()
  568. p.data = data
  569. }
  570. hdr, packetData, rest, err := wire.ParsePacket(p.data, s.srcConnIDLen)
  571. if err != nil {
  572. s.logger.Debugf("error parsing packet: %s", err)
  573. break
  574. }
  575. if counter > 0 && !hdr.DestConnectionID.Equal(lastConnID) {
  576. s.logger.Debugf("coalesced packet has different destination connection ID: %s, expected %s", hdr.DestConnectionID, lastConnID)
  577. break
  578. }
  579. lastConnID = hdr.DestConnectionID
  580. if counter > 0 {
  581. p.buffer.Split()
  582. }
  583. counter++
  584. // only log if this actually a coalesced packet
  585. if s.logger.Debug() && (counter > 1 || len(rest) > 0) {
  586. s.logger.Debugf("Parsed a coalesced packet. Part %d: %d bytes. Remaining: %d bytes.", counter, len(packetData), len(rest))
  587. }
  588. p.data = packetData
  589. if wasProcessed := s.handleSinglePacket(p, hdr); wasProcessed {
  590. processed = true
  591. }
  592. data = rest
  593. }
  594. p.buffer.MaybeRelease()
  595. return processed
  596. }
  597. func (s *session) handleSinglePacket(p *receivedPacket, hdr *wire.Header) bool /* was the packet successfully processed */ {
  598. var wasQueued bool
  599. defer func() {
  600. // Put back the packet buffer if the packet wasn't queued for later decryption.
  601. if !wasQueued {
  602. p.buffer.Decrement()
  603. }
  604. }()
  605. if hdr.Type == protocol.PacketTypeRetry {
  606. return s.handleRetryPacket(hdr)
  607. }
  608. // The server can change the source connection ID with the first Handshake packet.
  609. // After this, all packets with a different source connection have to be ignored.
  610. if s.receivedFirstPacket && hdr.IsLongHeader && !hdr.SrcConnectionID.Equal(s.handshakeDestConnID) {
  611. s.logger.Debugf("Dropping %s packet with unexpected source connection ID: %s (expected %s)", hdr.PacketType(), hdr.SrcConnectionID, s.handshakeDestConnID)
  612. return false
  613. }
  614. // drop 0-RTT packets
  615. if hdr.Type == protocol.PacketType0RTT {
  616. return false
  617. }
  618. packet, err := s.unpacker.Unpack(hdr, p.rcvTime, p.data)
  619. if err != nil {
  620. switch err {
  621. case handshake.ErrKeysDropped:
  622. s.logger.Debugf("Dropping %s packet because we already dropped the keys.", hdr.PacketType())
  623. case handshake.ErrKeysNotYetAvailable:
  624. // Sealer for this encryption level not yet available.
  625. // Try again later.
  626. wasQueued = true
  627. s.tryQueueingUndecryptablePacket(p)
  628. case wire.ErrInvalidReservedBits:
  629. s.closeLocal(qerr.Error(qerr.ProtocolViolation, err.Error()))
  630. default:
  631. // This might be a packet injected by an attacker.
  632. // Drop it.
  633. s.logger.Debugf("Dropping %s packet that could not be unpacked. Error: %s", hdr.PacketType(), err)
  634. }
  635. return false
  636. }
  637. if s.logger.Debug() {
  638. s.logger.Debugf("<- Reading packet %#x (%d bytes) for connection %s, %s", packet.packetNumber, len(p.data), hdr.DestConnectionID, packet.encryptionLevel)
  639. packet.hdr.Log(s.logger)
  640. }
  641. if err := s.handleUnpackedPacket(packet, p.rcvTime); err != nil {
  642. s.closeLocal(err)
  643. return false
  644. }
  645. return true
  646. }
  647. func (s *session) handleRetryPacket(hdr *wire.Header) bool /* was this a valid Retry */ {
  648. if s.perspective == protocol.PerspectiveServer {
  649. s.logger.Debugf("Ignoring Retry.")
  650. return false
  651. }
  652. if s.receivedFirstPacket {
  653. s.logger.Debugf("Ignoring Retry, since we already received a packet.")
  654. return false
  655. }
  656. (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger)
  657. if !hdr.OrigDestConnectionID.Equal(s.handshakeDestConnID) {
  658. s.logger.Debugf("Ignoring spoofed Retry. Original Destination Connection ID: %s, expected: %s", hdr.OrigDestConnectionID, s.handshakeDestConnID)
  659. return false
  660. }
  661. if hdr.SrcConnectionID.Equal(s.handshakeDestConnID) {
  662. s.logger.Debugf("Ignoring Retry, since the server didn't change the Source Connection ID.")
  663. return false
  664. }
  665. // If a token is already set, this means that we already received a Retry from the server.
  666. // Ignore this Retry packet.
  667. if s.receivedRetry {
  668. s.logger.Debugf("Ignoring Retry, since a Retry was already received.")
  669. return false
  670. }
  671. s.logger.Debugf("<- Received Retry")
  672. s.logger.Debugf("Switching destination connection ID to: %s", hdr.SrcConnectionID)
  673. s.origDestConnID = s.handshakeDestConnID
  674. newDestConnID := hdr.SrcConnectionID
  675. s.receivedRetry = true
  676. if err := s.sentPacketHandler.ResetForRetry(); err != nil {
  677. s.closeLocal(err)
  678. return false
  679. }
  680. s.handshakeDestConnID = newDestConnID
  681. s.cryptoStreamHandler.ChangeConnectionID(newDestConnID)
  682. s.packer.SetToken(hdr.Token)
  683. s.connIDManager.ChangeInitialConnID(newDestConnID)
  684. s.scheduleSending()
  685. return true
  686. }
  687. func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time) error {
  688. if len(packet.data) == 0 {
  689. return qerr.Error(qerr.ProtocolViolation, "empty packet")
  690. }
  691. // The server can change the source connection ID with the first Handshake packet.
  692. if s.perspective == protocol.PerspectiveClient && !s.receivedFirstPacket && packet.hdr.IsLongHeader && !packet.hdr.SrcConnectionID.Equal(s.handshakeDestConnID) {
  693. cid := packet.hdr.SrcConnectionID
  694. s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", cid)
  695. s.handshakeDestConnID = cid
  696. s.connIDManager.ChangeInitialConnID(cid)
  697. }
  698. s.receivedFirstPacket = true
  699. s.lastPacketReceivedTime = rcvTime
  700. s.firstAckElicitingPacketAfterIdleSentTime = time.Time{}
  701. s.keepAlivePingSent = false
  702. // Only used for tracing.
  703. // If we're not tracing, this slice will always remain empty.
  704. var frames []wire.Frame
  705. var transportState *quictrace.TransportState
  706. r := bytes.NewReader(packet.data)
  707. var isAckEliciting bool
  708. for {
  709. frame, err := s.frameParser.ParseNext(r, packet.encryptionLevel)
  710. if err != nil {
  711. return err
  712. }
  713. if frame == nil {
  714. break
  715. }
  716. if ackhandler.IsFrameAckEliciting(frame) {
  717. isAckEliciting = true
  718. }
  719. if s.traceCallback != nil {
  720. frames = append(frames, frame)
  721. }
  722. if err := s.handleFrame(frame, packet.packetNumber, packet.encryptionLevel); err != nil {
  723. return err
  724. }
  725. }
  726. if s.traceCallback != nil {
  727. transportState = s.sentPacketHandler.GetStats()
  728. s.traceCallback(quictrace.Event{
  729. Time: rcvTime,
  730. EventType: quictrace.PacketReceived,
  731. TransportState: transportState,
  732. EncryptionLevel: packet.encryptionLevel,
  733. PacketNumber: packet.packetNumber,
  734. PacketSize: protocol.ByteCount(len(packet.data)),
  735. Frames: frames,
  736. })
  737. }
  738. s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, packet.encryptionLevel, rcvTime, isAckEliciting)
  739. return nil
  740. }
  741. func (s *session) handleFrame(f wire.Frame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
  742. var err error
  743. wire.LogFrame(s.logger, f, false)
  744. switch frame := f.(type) {
  745. case *wire.CryptoFrame:
  746. err = s.handleCryptoFrame(frame, encLevel)
  747. case *wire.StreamFrame:
  748. err = s.handleStreamFrame(frame)
  749. case *wire.AckFrame:
  750. err = s.handleAckFrame(frame, pn, encLevel)
  751. case *wire.ConnectionCloseFrame:
  752. s.handleConnectionCloseFrame(frame)
  753. case *wire.ResetStreamFrame:
  754. err = s.handleResetStreamFrame(frame)
  755. case *wire.MaxDataFrame:
  756. s.handleMaxDataFrame(frame)
  757. case *wire.MaxStreamDataFrame:
  758. err = s.handleMaxStreamDataFrame(frame)
  759. case *wire.MaxStreamsFrame:
  760. err = s.handleMaxStreamsFrame(frame)
  761. case *wire.DataBlockedFrame:
  762. case *wire.StreamDataBlockedFrame:
  763. case *wire.StreamsBlockedFrame:
  764. case *wire.StopSendingFrame:
  765. err = s.handleStopSendingFrame(frame)
  766. case *wire.PingFrame:
  767. case *wire.PathChallengeFrame:
  768. s.handlePathChallengeFrame(frame)
  769. case *wire.PathResponseFrame:
  770. // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
  771. err = errors.New("unexpected PATH_RESPONSE frame")
  772. case *wire.NewTokenFrame:
  773. err = s.handleNewTokenFrame(frame)
  774. case *wire.NewConnectionIDFrame:
  775. err = s.handleNewConnectionIDFrame(frame)
  776. case *wire.RetireConnectionIDFrame:
  777. err = s.handleRetireConnectionIDFrame(frame)
  778. default:
  779. err = fmt.Errorf("unexpected frame type: %s", reflect.ValueOf(&frame).Elem().Type().Name())
  780. }
  781. return err
  782. }
// handlePacket is called by the server with a new packet.
// It queues the packet on s.receivedPackets without ever blocking.
func (s *session) handlePacket(p *receivedPacket) {
	// Discard packets once the amount of queued packets is larger than
	// the channel size, protocol.MaxSessionUnprocessedPackets
	select {
	case s.receivedPackets <- p:
	default:
		// The channel cannot take the packet right now: drop it silently.
	}
}
  792. func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
  793. var e error
  794. if frame.IsApplicationError {
  795. e = qerr.ApplicationError(frame.ErrorCode, frame.ReasonPhrase)
  796. } else {
  797. e = qerr.Error(frame.ErrorCode, frame.ReasonPhrase)
  798. }
  799. s.closeRemote(e)
  800. }
  801. func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error {
  802. encLevelChanged, err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel)
  803. if err != nil {
  804. return err
  805. }
  806. if encLevelChanged {
  807. s.tryDecryptingQueuedPackets()
  808. }
  809. return nil
  810. }
  811. func (s *session) handleStreamFrame(frame *wire.StreamFrame) error {
  812. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  813. if err != nil {
  814. return err
  815. }
  816. if str == nil {
  817. // Stream is closed and already garbage collected
  818. // ignore this StreamFrame
  819. return nil
  820. }
  821. return str.handleStreamFrame(frame)
  822. }
  823. func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) {
  824. s.connFlowController.UpdateSendWindow(frame.ByteOffset)
  825. }
  826. func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
  827. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  828. if err != nil {
  829. return err
  830. }
  831. if str == nil {
  832. // stream is closed and already garbage collected
  833. return nil
  834. }
  835. str.handleMaxStreamDataFrame(frame)
  836. return nil
  837. }
  838. func (s *session) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) error {
  839. return s.streamsMap.HandleMaxStreamsFrame(frame)
  840. }
  841. func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  842. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  843. if err != nil {
  844. return err
  845. }
  846. if str == nil {
  847. // stream is closed and already garbage collected
  848. return nil
  849. }
  850. return str.handleResetStreamFrame(frame)
  851. }
  852. func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
  853. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  854. if err != nil {
  855. return err
  856. }
  857. if str == nil {
  858. // stream is closed and already garbage collected
  859. return nil
  860. }
  861. str.handleStopSendingFrame(frame)
  862. return nil
  863. }
  864. func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
  865. s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data})
  866. }
  867. func (s *session) handleNewTokenFrame(frame *wire.NewTokenFrame) error {
  868. if s.perspective == protocol.PerspectiveServer {
  869. return qerr.Error(qerr.ProtocolViolation, "Received NEW_TOKEN frame from the client.")
  870. }
  871. if s.config.TokenStore != nil {
  872. s.config.TokenStore.Put(s.tokenStoreKey, &ClientToken{data: frame.Token})
  873. }
  874. return nil
  875. }
  876. func (s *session) handleNewConnectionIDFrame(f *wire.NewConnectionIDFrame) error {
  877. return s.connIDManager.Add(f)
  878. }
  879. func (s *session) handleRetireConnectionIDFrame(f *wire.RetireConnectionIDFrame) error {
  880. return s.connIDGenerator.Retire(f.SequenceNumber)
  881. }
  882. func (s *session) handleAckFrame(frame *wire.AckFrame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
  883. if err := s.sentPacketHandler.ReceivedAck(frame, pn, encLevel, s.lastPacketReceivedTime); err != nil {
  884. return err
  885. }
  886. if encLevel == protocol.Encryption1RTT {
  887. s.receivedPacketHandler.IgnoreBelow(s.sentPacketHandler.GetLowestPacketNotConfirmedAcked())
  888. s.cryptoStreamHandler.SetLargest1RTTAcked(frame.LargestAcked())
  889. }
  890. return nil
  891. }
  892. // closeLocal closes the session and send a CONNECTION_CLOSE containing the error
  893. func (s *session) closeLocal(e error) {
  894. s.closeOnce.Do(func() {
  895. if e == nil {
  896. s.logger.Infof("Closing session.")
  897. } else {
  898. s.logger.Errorf("Closing session with error: %s", e)
  899. }
  900. s.closeChan <- closeError{err: e, immediate: false, remote: false}
  901. })
  902. }
// destroy closes the session without sending the error on the wire.
// It requests the close via destroyImpl and then blocks until the session's
// context is done.
func (s *session) destroy(e error) {
	s.destroyImpl(e)
	<-s.ctx.Done()
}
  908. func (s *session) destroyImpl(e error) {
  909. s.closeOnce.Do(func() {
  910. if nerr, ok := e.(net.Error); ok && nerr.Timeout() {
  911. s.logger.Errorf("Destroying session %s: %s", s.connIDManager.Get(), e)
  912. } else {
  913. s.logger.Errorf("Destroying session %s with error: %s", s.connIDManager.Get(), e)
  914. }
  915. s.closeChan <- closeError{err: e, immediate: true, remote: false}
  916. })
  917. }
  918. // closeForRecreating closes the session in order to recreate it immediately afterwards
  919. // It returns the first packet number that should be used in the new session.
  920. func (s *session) closeForRecreating() protocol.PacketNumber {
  921. s.destroy(errCloseForRecreating)
  922. nextPN, _ := s.sentPacketHandler.PeekPacketNumber(protocol.EncryptionInitial)
  923. return nextPN
  924. }
  925. func (s *session) closeRemote(e error) {
  926. s.closeOnce.Do(func() {
  927. s.logger.Errorf("Peer closed session with error: %s", e)
  928. s.closeChan <- closeError{err: e, immediate: true, remote: true}
  929. })
  930. }
// Close the connection. It sends a qerr.NoError.
// It waits until the run loop has stopped before returning
// (it blocks until the session's context is done).
// The returned error is always nil.
func (s *session) Close() error {
	s.closeLocal(nil)
	<-s.ctx.Done()
	return nil
}
  938. func (s *session) CloseWithError(code protocol.ApplicationErrorCode, desc string) error {
  939. s.closeLocal(qerr.ApplicationError(qerr.ErrorCode(code), desc))
  940. <-s.ctx.Done()
  941. return nil
  942. }
  943. func (s *session) handleCloseError(closeErr closeError) {
  944. if closeErr.err == nil {
  945. closeErr.err = qerr.ApplicationError(0, "")
  946. }
  947. var quicErr *qerr.QuicError
  948. var ok bool
  949. if quicErr, ok = closeErr.err.(*qerr.QuicError); !ok {
  950. quicErr = qerr.ToQuicError(closeErr.err)
  951. }
  952. s.streamsMap.CloseWithError(quicErr)
  953. s.connIDManager.Close()
  954. // If this is a remote close we're done here
  955. if closeErr.remote {
  956. s.connIDGenerator.ReplaceWithClosed(newClosedRemoteSession(s.perspective))
  957. return
  958. }
  959. if closeErr.immediate {
  960. s.connIDGenerator.RemoveAll()
  961. return
  962. }
  963. connClosePacket, err := s.sendConnectionClose(quicErr)
  964. if err != nil {
  965. s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err)
  966. }
  967. cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger)
  968. s.connIDGenerator.ReplaceWithClosed(cs)
  969. }
// dropEncryptionLevel calls DropPackets for the given encryption level on
// both the sent packet handler and the received packet handler.
func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
	s.sentPacketHandler.DropPackets(encLevel)
	s.receivedPacketHandler.DropPackets(encLevel)
}
  974. func (s *session) processTransportParameters(data []byte) {
  975. var params *handshake.TransportParameters
  976. var err error
  977. switch s.perspective {
  978. case protocol.PerspectiveClient:
  979. params, err = s.processTransportParametersForClient(data)
  980. case protocol.PerspectiveServer:
  981. params, err = s.processTransportParametersForServer(data)
  982. }
  983. if err != nil {
  984. s.closeLocal(err)
  985. return
  986. }
  987. s.logger.Debugf("Received Transport Parameters: %s", params)
  988. s.peerParams = params
  989. s.keepAliveInterval = utils.MinDuration(params.IdleTimeout/2, protocol.MaxKeepAliveInterval)
  990. if err := s.streamsMap.UpdateLimits(params); err != nil {
  991. s.closeLocal(err)
  992. return
  993. }
  994. s.packer.HandleTransportParameters(params)
  995. s.frameParser.SetAckDelayExponent(params.AckDelayExponent)
  996. s.connFlowController.UpdateSendWindow(params.InitialMaxData)
  997. s.rttStats.SetMaxAckDelay(params.MaxAckDelay)
  998. s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit)
  999. if params.StatelessResetToken != nil {
  1000. s.connIDManager.SetStatelessResetToken(*params.StatelessResetToken)
  1001. }
  1002. // On the server side, the early session is ready as soon as we processed
  1003. // the client's transport parameters.
  1004. close(s.earlySessionReadyChan)
  1005. }
  1006. func (s *session) processTransportParametersForClient(data []byte) (*handshake.TransportParameters, error) {
  1007. params := &handshake.TransportParameters{}
  1008. if err := params.Unmarshal(data, s.perspective.Opposite()); err != nil {
  1009. return nil, err
  1010. }
  1011. // check the Retry token
  1012. if !params.OriginalConnectionID.Equal(s.origDestConnID) {
  1013. return nil, qerr.Error(qerr.TransportParameterError, fmt.Sprintf("expected original_connection_id to equal %s, is %s", s.origDestConnID, params.OriginalConnectionID))
  1014. }
  1015. // We don't support connection migration yet, so we don't have any use for the preferred_address.
  1016. if params.PreferredAddress != nil {
  1017. s.logger.Debugf("Server sent preferred_address. Retiring the preferred_address connection ID.")
  1018. // Retire the connection ID.
  1019. s.framer.QueueControlFrame(&wire.RetireConnectionIDFrame{SequenceNumber: 1})
  1020. }
  1021. return params, nil
  1022. }
  1023. func (s *session) processTransportParametersForServer(data []byte) (*handshake.TransportParameters, error) {
  1024. params := &handshake.TransportParameters{}
  1025. if err := params.Unmarshal(data, s.perspective.Opposite()); err != nil {
  1026. return nil, err
  1027. }
  1028. return params, nil
  1029. }
// sendPackets sends packets for as long as the sent packet handler's send
// mode and packet allowance permit, and sets the pacing deadline if the
// whole allowance was used.
// It returns the first error reported while sending a packet, or an error
// if the sent packet handler reports a send mode this function doesn't know.
func (s *session) sendPackets() error {
	// Clear the pacing deadline; it is only set again at the end of this function.
	s.pacingDeadline = time.Time{}
	sendMode := s.sentPacketHandler.SendMode()
	if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send
		return nil
	}
	// Upper bound on the number of packets sent by this call.
	numPackets := s.sentPacketHandler.ShouldSendNumPackets()
	var numPacketsSent int
sendLoop:
	for {
		switch sendMode {
		case ackhandler.SendNone:
			break sendLoop
		case ackhandler.SendAck:
			// If we already sent packets, and the send mode switches to SendAck,
			// we've just become congestion limited.
			// There's no need to try to send an ACK at this moment.
			if numPacketsSent > 0 {
				return nil
			}
			// We can at most send a single ACK only packet.
			// There will only be a new ACK after receiving new packets.
			// SendAck is only returned when we're congestion limited, so we don't need to set the pacing timer.
			return s.maybeSendAckOnlyPacket()
		case ackhandler.SendPTOInitial:
			if err := s.sendProbePacket(protocol.EncryptionInitial); err != nil {
				return err
			}
			numPacketsSent++
		case ackhandler.SendPTOHandshake:
			if err := s.sendProbePacket(protocol.EncryptionHandshake); err != nil {
				return err
			}
			numPacketsSent++
		case ackhandler.SendPTOAppData:
			if err := s.sendProbePacket(protocol.Encryption1RTT); err != nil {
				return err
			}
			numPacketsSent++
		case ackhandler.SendAny:
			sentPacket, err := s.sendPacket()
			if err != nil {
				return err
			}
			// Nothing was packed, so there is nothing more to send right now.
			if !sentPacket {
				break sendLoop
			}
			numPacketsSent++
		default:
			return fmt.Errorf("BUG: invalid send mode %d", sendMode)
		}
		// This break sits outside the switch, so it leaves the for loop.
		if numPacketsSent >= numPackets {
			break
		}
		// Ask again: the send mode is re-evaluated after every packet sent.
		sendMode = s.sentPacketHandler.SendMode()
	}
	// Only start the pacing timer if we sent as many packets as we were allowed.
	// There will probably be more to send when calling sendPacket again.
	if numPacketsSent == numPackets {
		s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
	}
	return nil
}
  1093. func (s *session) maybeSendAckOnlyPacket() error {
  1094. packet, err := s.packer.MaybePackAckPacket()
  1095. if err != nil {
  1096. return err
  1097. }
  1098. if packet == nil {
  1099. return nil
  1100. }
  1101. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(s.retransmissionQueue))
  1102. s.sendPackedPacket(packet)
  1103. return nil
  1104. }
  1105. func (s *session) sendProbePacket(encLevel protocol.EncryptionLevel) error {
  1106. // Queue probe packets until we actually send out a packet,
  1107. // or until there are no more packets to queue.
  1108. var packet *packedPacket
  1109. for {
  1110. if wasQueued := s.sentPacketHandler.QueueProbePacket(encLevel); !wasQueued {
  1111. break
  1112. }
  1113. var err error
  1114. packet, err = s.packer.MaybePackProbePacket(encLevel)
  1115. if err != nil {
  1116. return err
  1117. }
  1118. if packet != nil {
  1119. break
  1120. }
  1121. }
  1122. if packet == nil {
  1123. switch encLevel {
  1124. case protocol.EncryptionInitial:
  1125. s.retransmissionQueue.AddInitial(&wire.PingFrame{})
  1126. case protocol.EncryptionHandshake:
  1127. s.retransmissionQueue.AddHandshake(&wire.PingFrame{})
  1128. case protocol.Encryption1RTT:
  1129. s.retransmissionQueue.AddAppData(&wire.PingFrame{})
  1130. default:
  1131. panic("unexpected encryption level")
  1132. }
  1133. var err error
  1134. packet, err = s.packer.MaybePackProbePacket(encLevel)
  1135. if err != nil {
  1136. return err
  1137. }
  1138. }
  1139. if packet == nil {
  1140. return fmt.Errorf("session BUG: couldn't pack %s probe packet", encLevel)
  1141. }
  1142. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(s.retransmissionQueue))
  1143. s.sendPackedPacket(packet)
  1144. return nil
  1145. }
  1146. func (s *session) sendPacket() (bool, error) {
  1147. if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
  1148. s.framer.QueueControlFrame(&wire.DataBlockedFrame{DataLimit: offset})
  1149. }
  1150. s.windowUpdateQueue.QueueAll()
  1151. packet, err := s.packer.PackPacket()
  1152. if err != nil || packet == nil {
  1153. return false, err
  1154. }
  1155. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(s.retransmissionQueue))
  1156. s.sendPackedPacket(packet)
  1157. return true, nil
  1158. }
  1159. func (s *session) sendPackedPacket(packet *packedPacket) {
  1160. if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() {
  1161. s.firstAckElicitingPacketAfterIdleSentTime = time.Now()
  1162. }
  1163. if s.traceCallback != nil {
  1164. frames := make([]wire.Frame, 0, len(packet.frames))
  1165. for _, f := range packet.frames {
  1166. frames = append(frames, f.Frame)
  1167. }
  1168. s.traceCallback(quictrace.Event{
  1169. Time: time.Now(),
  1170. EventType: quictrace.PacketSent,
  1171. TransportState: s.sentPacketHandler.GetStats(),
  1172. EncryptionLevel: packet.EncryptionLevel(),
  1173. PacketNumber: packet.header.PacketNumber,
  1174. PacketSize: protocol.ByteCount(len(packet.raw)),
  1175. Frames: frames,
  1176. })
  1177. }
  1178. s.logPacket(packet)
  1179. s.connIDManager.SentPacket()
  1180. s.sendQueue.Send(packet)
  1181. }
  1182. func (s *session) sendConnectionClose(quicErr *qerr.QuicError) ([]byte, error) {
  1183. // don't send application errors in Initial or Handshake packets
  1184. if quicErr.IsApplicationError() && !s.handshakeComplete {
  1185. quicErr = qerr.UserCanceledError
  1186. }
  1187. var reason string
  1188. // don't send details of crypto errors
  1189. if !quicErr.IsCryptoError() {
  1190. reason = quicErr.ErrorMessage
  1191. }
  1192. packet, err := s.packer.PackConnectionClose(&wire.ConnectionCloseFrame{
  1193. IsApplicationError: quicErr.IsApplicationError(),
  1194. ErrorCode: quicErr.ErrorCode,
  1195. FrameType: quicErr.FrameType,
  1196. ReasonPhrase: reason,
  1197. })
  1198. if err != nil {
  1199. return nil, err
  1200. }
  1201. s.logPacket(packet)
  1202. return packet.raw, s.conn.Write(packet.raw)
  1203. }
  1204. func (s *session) logPacket(packet *packedPacket) {
  1205. if !s.logger.Debug() {
  1206. // We don't need to allocate the slices for calling the format functions
  1207. return
  1208. }
  1209. s.logger.Debugf("-> Sending packet 0x%x (%d bytes) for connection %s, %s", packet.header.PacketNumber, len(packet.raw), s.logID, packet.EncryptionLevel())
  1210. packet.header.Log(s.logger)
  1211. if packet.ack != nil {
  1212. wire.LogFrame(s.logger, packet.ack, true)
  1213. }
  1214. for _, frame := range packet.frames {
  1215. wire.LogFrame(s.logger, frame.Frame, true)
  1216. }
  1217. }
  1218. // AcceptStream returns the next stream openend by the peer
  1219. func (s *session) AcceptStream(ctx context.Context) (Stream, error) {
  1220. return s.streamsMap.AcceptStream(ctx)
  1221. }
  1222. func (s *session) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
  1223. return s.streamsMap.AcceptUniStream(ctx)
  1224. }
  1225. // OpenStream opens a stream
  1226. func (s *session) OpenStream() (Stream, error) {
  1227. return s.streamsMap.OpenStream()
  1228. }
  1229. func (s *session) OpenStreamSync(ctx context.Context) (Stream, error) {
  1230. return s.streamsMap.OpenStreamSync(ctx)
  1231. }
  1232. func (s *session) OpenUniStream() (SendStream, error) {
  1233. return s.streamsMap.OpenUniStream()
  1234. }
  1235. func (s *session) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
  1236. return s.streamsMap.OpenUniStreamSync(ctx)
  1237. }
  1238. func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
  1239. var initialSendWindow protocol.ByteCount
  1240. if s.peerParams != nil {
  1241. if id.Type() == protocol.StreamTypeUni {
  1242. initialSendWindow = s.peerParams.InitialMaxStreamDataUni
  1243. } else {
  1244. if id.InitiatedBy() == s.perspective {
  1245. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiRemote
  1246. } else {
  1247. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiLocal
  1248. }
  1249. }
  1250. }
  1251. return flowcontrol.NewStreamFlowController(
  1252. id,
  1253. s.connFlowController,
  1254. protocol.InitialMaxStreamData,
  1255. protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
  1256. initialSendWindow,
  1257. s.onHasStreamWindowUpdate,
  1258. s.rttStats,
  1259. s.logger,
  1260. )
  1261. }
// scheduleSending signals that we have data for sending.
// The signal is a non-blocking send on s.sendingScheduled.
func (s *session) scheduleSending() {
	select {
	case s.sendingScheduled <- struct{}{}:
	default:
		// The channel cannot take another signal right now; nothing to do.
	}
}
  1269. func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket) {
  1270. if s.handshakeComplete {
  1271. s.logger.Debugf("Received undecryptable packet from %s after the handshake (%d bytes)", p.remoteAddr.String(), len(p.data))
  1272. return
  1273. }
  1274. if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets {
  1275. s.logger.Infof("Dropping undecrytable packet (%d bytes). Undecryptable packet queue full.", len(p.data))
  1276. return
  1277. }
  1278. s.logger.Infof("Queueing packet (%d bytes) for later decryption", len(p.data))
  1279. s.undecryptablePackets = append(s.undecryptablePackets, p)
  1280. }
  1281. func (s *session) tryDecryptingQueuedPackets() {
  1282. for _, p := range s.undecryptablePackets {
  1283. s.handlePacket(p)
  1284. }
  1285. s.undecryptablePackets = s.undecryptablePackets[:0]
  1286. }
// queueControlFrame queues a control frame with the framer and signals that
// there is data to send.
func (s *session) queueControlFrame(f wire.Frame) {
	s.framer.QueueControlFrame(f)
	s.scheduleSending()
}
// onHasStreamWindowUpdate registers the stream with the window update queue
// and signals that there is data to send.
func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) {
	s.windowUpdateQueue.AddStream(id)
	s.scheduleSending()
}
// onHasConnectionWindowUpdate registers the connection with the window
// update queue and signals that there is data to send.
func (s *session) onHasConnectionWindowUpdate() {
	s.windowUpdateQueue.AddConnection()
	s.scheduleSending()
}
// onHasStreamData marks the stream as active in the framer and signals that
// there is data to send.
func (s *session) onHasStreamData(id protocol.StreamID) {
	s.framer.AddActiveStream(id)
	s.scheduleSending()
}
  1303. func (s *session) onStreamCompleted(id protocol.StreamID) {
  1304. if err := s.streamsMap.DeleteStream(id); err != nil {
  1305. s.closeLocal(err)
  1306. }
  1307. }
  1308. func (s *session) LocalAddr() net.Addr {
  1309. return s.conn.LocalAddr()
  1310. }
  1311. func (s *session) RemoteAddr() net.Addr {
  1312. return s.conn.RemoteAddr()
  1313. }
// getPerspective returns the perspective stored in the session.
func (s *session) getPerspective() protocol.Perspective {
	return s.perspective
}
// GetVersion returns the version number stored in the session.
func (s *session) GetVersion() protocol.VersionNumber {
	return s.version
}