session.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490
  1. package quic
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "reflect"
  11. "sync"
  12. "time"
  13. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  14. "github.com/Psiphon-Labs/quic-go/internal/congestion"
  15. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  16. "github.com/Psiphon-Labs/quic-go/internal/handshake"
  17. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  18. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  19. "github.com/Psiphon-Labs/quic-go/internal/utils"
  20. "github.com/Psiphon-Labs/quic-go/internal/wire"
  21. "github.com/Psiphon-Labs/quic-go/quictrace"
  22. )
// unpacker turns the raw bytes of a received packet into an unpackedPacket.
// Unpack receives the already parsed header, the time the packet was received
// and the packet bytes. The session treats the handshake.ErrKeysDropped,
// handshake.ErrKeysNotYetAvailable and wire.ErrInvalidReservedBits errors
// specially (see handleSinglePacket).
type unpacker interface {
	Unpack(hdr *wire.Header, rcvTime time.Time, data []byte) (*unpackedPacket, error)
}
// streamGetter gives access to the receive and the send side of a stream,
// looked up by stream ID.
// NOTE(review): the method names suggest that a not-yet-existing stream is
// opened on demand; confirm against the implementation.
type streamGetter interface {
	GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
	GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
}
// streamManager is the set of stream operations the session relies on.
// The session stores the value returned by newStreamsMap in this interface
// (see preSetup).
type streamManager interface {
	// Lookup of the send / receive side of a stream by ID.
	GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
	GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
	// Opening of locally initiated streams. The Sync variants take a context.
	OpenStream() (Stream, error)
	OpenUniStream() (SendStream, error)
	OpenStreamSync(context.Context) (Stream, error)
	OpenUniStreamSync(context.Context) (SendStream, error)
	// Accepting of streams initiated by the peer.
	AcceptStream(context.Context) (Stream, error)
	AcceptUniStream(context.Context) (ReceiveStream, error)
	DeleteStream(protocol.StreamID) error
	// Updates driven by the peer: its transport parameters and MAX_STREAMS frames.
	UpdateLimits(*handshake.TransportParameters) error
	HandleMaxStreamsFrame(*wire.MaxStreamsFrame) error
	CloseWithError(error)
}
// cryptoStreamHandler is the session's view of the crypto setup.
// RunHandshake is started in its own goroutine by run(), and Close is called
// when the run loop terminates.
type cryptoStreamHandler interface {
	RunHandshake()
	// ChangeConnectionID is called with the new destination connection ID
	// after a valid Retry packet was received (see handleRetryPacket).
	ChangeConnectionID(protocol.ConnectionID)
	SetLargest1RTTAcked(protocol.PacketNumber)
	io.Closer
	ConnectionState() tls.ConnectionState
}
// receivedPacket is a packet as it is handed to the session, before its
// header has been parsed.
type receivedPacket struct {
	remoteAddr net.Addr
	// rcvTime is passed on to the unpacker and to handleUnpackedPacket.
	rcvTime time.Time
	// data holds the raw bytes. It may contain several coalesced QUIC packets,
	// which handlePacketImpl splits apart.
	data []byte

	// buffer is the packet buffer that handlePacketImpl and handleSinglePacket
	// Split, Decrement and MaybeRelease while processing the packet.
	// NOTE(review): presumably it owns the memory data points into; confirm.
	buffer *packetBuffer
}
  57. func (p *receivedPacket) Clone() *receivedPacket {
  58. return &receivedPacket{
  59. remoteAddr: p.remoteAddr,
  60. rcvTime: p.rcvTime,
  61. data: p.data,
  62. buffer: p.buffer,
  63. }
  64. }
// sessionRunner is the set of callbacks through which a session registers
// and unregisters its connection IDs and stateless reset tokens. The
// constructors hand these methods to the connIDManager and connIDGenerator.
type sessionRunner interface {
	// Add registers the handler for a connection ID and returns a 16 byte
	// value (the constructors use it as the connIDGenerator's add callback).
	// NOTE(review): presumably the stateless reset token for that ID; confirm.
	Add(protocol.ConnectionID, packetHandler) [16]byte
	Retire(protocol.ConnectionID)
	Remove(protocol.ConnectionID)
	ReplaceWithClosed(protocol.ConnectionID, packetHandler)
	AddResetToken([16]byte, packetHandler)
	RemoveResetToken([16]byte)
	RetireResetToken([16]byte)
}
// handshakeRunner bundles the callbacks that the crypto setup invokes on the
// session. Each exported method of handshakeRunner forwards to the function
// stored in the field of the same (lower-case) name, so all four fields must
// be set before the runner is used.
type handshakeRunner struct {
	onReceivedParams    func([]byte)
	onError             func(error)
	dropKeys            func(protocol.EncryptionLevel)
	onHandshakeComplete func()
}
  80. func (r *handshakeRunner) OnReceivedParams(b []byte) { r.onReceivedParams(b) }
  81. func (r *handshakeRunner) OnError(e error) { r.onError(e) }
  82. func (r *handshakeRunner) DropKeys(el protocol.EncryptionLevel) { r.dropKeys(el) }
  83. func (r *handshakeRunner) OnHandshakeComplete() { r.onHandshakeComplete() }
// closeError is the value sent over session.closeChan to make the run loop
// terminate.
type closeError struct {
	// err is the error that run() returns once the loop has terminated.
	err error
	// NOTE(review): presumably remote marks a close initiated by the peer and
	// immediate a close without sending a CONNECTION_CLOSE; both are consumed
	// outside of this part of the file, confirm there.
	remote    bool
	immediate bool
}
// errCloseForRecreating is the sentinel error used when a session is closed
// only in order to be created again.
var errCloseForRecreating = errors.New("closing session in order to recreate it")
// session is the implementation of a QUIC session. It satisfies the Session,
// EarlySession and streamSender interfaces.
type session struct {
	// Destination connection ID used during the handshake.
	// Used to check source connection ID on incoming packets.
	handshakeDestConnID protocol.ConnectionID
	// if the server sends a Retry, this is the connection ID we used initially
	origDestConnID protocol.ConnectionID
	// srcConnIDLen is the length of our source connection ID. It is needed to
	// parse incoming packets (see handlePacketImpl).
	srcConnIDLen int

	perspective    protocol.Perspective
	initialVersion protocol.VersionNumber // if version negotiation is performed, this is the version we initially tried
	version        protocol.VersionNumber
	config         *Config

	conn      connection
	sendQueue *sendQueue

	streamsMap      streamManager
	connIDManager   *connIDManager
	connIDGenerator *connIDGenerator

	rttStats *congestion.RTTStats

	cryptoStreamManager   *cryptoStreamManager
	sentPacketHandler     ackhandler.SentPacketHandler
	receivedPacketHandler ackhandler.ReceivedPacketHandler
	retransmissionQueue   *retransmissionQueue
	framer                framer
	windowUpdateQueue     *windowUpdateQueue
	connFlowController    flowcontrol.ConnectionFlowController
	tokenStoreKey         string                    // only set for the client
	tokenGenerator        *handshake.TokenGenerator // only set for the server

	unpacker    unpacker
	frameParser wire.FrameParser
	packer      packer

	cryptoStreamHandler cryptoStreamHandler

	// receivedPackets and sendingScheduled wake up the run loop.
	receivedPackets  chan *receivedPacket
	sendingScheduled chan struct{}

	closeOnce sync.Once
	// closeChan is used to notify the run loop that it should terminate
	closeChan chan closeError

	// ctx is cancelled when run() returns.
	ctx       context.Context
	ctxCancel context.CancelFunc
	// handshakeCtx is cancelled by handleHandshakeComplete.
	handshakeCtx       context.Context
	handshakeCtxCancel context.CancelFunc

	undecryptablePackets []*receivedPacket

	// clientHelloWritten is only set for the client. run() waits for it
	// before the first packet is scheduled for sending.
	clientHelloWritten <-chan struct{}

	earlySessionReadyChan chan struct{}
	// handshakeCompleteChan is set to nil by handleHandshakeComplete, so that
	// the run loop never selects it a second time.
	handshakeCompleteChan chan struct{} // is closed when the handshake completes
	handshakeComplete     bool

	receivedRetry       bool
	receivedFirstPacket bool

	// sessionCreationTime is the reference point for the handshake timeout.
	sessionCreationTime time.Time
	// The idle timeout is set based on the max of the time we received the last packet...
	lastPacketReceivedTime time.Time
	// ... and the time we sent a new ack-eliciting packet after receiving a packet.
	firstAckElicitingPacketAfterIdleSentTime time.Time
	// pacingDeadline is the time when the next packet should be sent
	pacingDeadline time.Time

	peerParams *handshake.TransportParameters

	timer *utils.Timer
	// keepAlivePingSent stores whether a keep alive PING is in flight.
	// It is reset as soon as we receive a packet from the peer.
	keepAlivePingSent bool
	keepAliveInterval time.Duration

	// traceCallback is only set if config.QuicTracer is set (see preSetup).
	traceCallback func(quictrace.Event)

	logID  string
	logger utils.Logger
}
  154. var _ Session = &session{}
  155. var _ EarlySession = &session{}
  156. var _ streamSender = &session{}
// newSession creates a session with the server perspective.
//
// The returned session is fully wired up but not running yet; the caller is
// expected to call run() on it.
// NOTE(review): presumably this is a variable (and not a function) for the
// same reason as newClientSession, i.e. so that tests can replace it.
var newSession = func(
	conn connection,
	runner sessionRunner,
	origDestConnID protocol.ConnectionID,
	clientDestConnID protocol.ConnectionID,
	destConnID protocol.ConnectionID,
	srcConnID protocol.ConnectionID,
	statelessResetToken [16]byte,
	conf *Config,
	tlsConf *tls.Config,
	tokenGenerator *handshake.TokenGenerator,
	logger utils.Logger,
	v protocol.VersionNumber,
) quicSession {
	s := &session{
		conn:                  conn,
		config:                conf,
		handshakeDestConnID:   destConnID,
		srcConnIDLen:          srcConnID.Len(),
		tokenGenerator:        tokenGenerator,
		perspective:           protocol.PerspectiveServer,
		handshakeCompleteChan: make(chan struct{}),
		logger:                logger,
		version:               v,
	}
	// Log under the original destination connection ID if there is one,
	// otherwise under the current destination connection ID.
	if origDestConnID != nil {
		s.logID = origDestConnID.String()
	} else {
		s.logID = destConnID.String()
	}
	// The connection ID manager and generator report to the runner through
	// these callbacks; frames they produce go through queueControlFrame.
	s.connIDManager = newConnIDManager(
		destConnID,
		func(token [16]byte) { runner.AddResetToken(token, s) },
		runner.RemoveResetToken,
		runner.RetireResetToken,
		s.queueControlFrame,
	)
	s.connIDGenerator = newConnIDGenerator(
		srcConnID,
		clientDestConnID,
		func(connID protocol.ConnectionID) [16]byte { return runner.Add(connID, s) },
		runner.Remove,
		runner.Retire,
		runner.ReplaceWithClosed,
		s.queueControlFrame,
	)
	// preSetup has to run before anything below: it creates rttStats,
	// traceCallback, framer, retransmissionQueue and receivedPacketHandler.
	s.preSetup()
	s.sentPacketHandler = ackhandler.NewSentPacketHandler(0, s.rttStats, s.traceCallback, s.logger)
	initialStream := newCryptoStream()
	handshakeStream := newCryptoStream()
	oneRTTStream := newPostHandshakeCryptoStream(s.framer)
	// The transport parameters we hand to the crypto setup.
	params := &handshake.TransportParameters{
		InitialMaxStreamDataBidiLocal:  protocol.InitialMaxStreamData,
		InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData,
		InitialMaxStreamDataUni:        protocol.InitialMaxStreamData,
		InitialMaxData:                 protocol.InitialMaxData,
		IdleTimeout:                    s.config.IdleTimeout,
		MaxBidiStreamNum:               protocol.StreamNum(s.config.MaxIncomingStreams),
		MaxUniStreamNum:                protocol.StreamNum(s.config.MaxIncomingUniStreams),
		MaxAckDelay:                    protocol.MaxAckDelayInclGranularity,
		AckDelayExponent:               protocol.AckDelayExponent,
		DisableMigration:               true,
		StatelessResetToken:            &statelessResetToken,
		OriginalConnectionID:           origDestConnID,
		ActiveConnectionIDLimit:        protocol.MaxActiveConnectionIDs,
	}
	cs := handshake.NewCryptoSetupServer(
		initialStream,
		handshakeStream,
		oneRTTStream,
		clientDestConnID,
		conn.RemoteAddr(),
		params,
		&handshakeRunner{
			onReceivedParams: s.processTransportParameters,
			onError:          s.closeLocal,
			dropKeys:         s.dropEncryptionLevel,
			onHandshakeComplete: func() {
				// Retire the connection ID the client chose, then wake up
				// the run loop by closing handshakeCompleteChan.
				runner.Retire(clientDestConnID)
				close(s.handshakeCompleteChan)
			},
		},
		tlsConf,
		s.rttStats,
		logger,
	)
	s.cryptoStreamHandler = cs
	s.packer = newPacketPacker(
		srcConnID,
		s.connIDManager.Get,
		initialStream,
		handshakeStream,
		s.sentPacketHandler,
		s.retransmissionQueue,
		s.RemoteAddr(),
		cs,
		s.framer,
		s.receivedPacketHandler,
		s.perspective,
		s.version,
	)
	s.unpacker = newPacketUnpacker(cs, s.version)
	s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, oneRTTStream)
	return s
}
// newClientSession creates a session with the client perspective.
//
// The returned session is fully wired up but not running yet; the caller is
// expected to call run() on it.
//
// It is declared as a variable so that it can be mocked in the tests.
var newClientSession = func(
	conn connection,
	runner sessionRunner,
	destConnID protocol.ConnectionID,
	srcConnID protocol.ConnectionID,
	conf *Config,
	tlsConf *tls.Config,
	initialPacketNumber protocol.PacketNumber,
	initialVersion protocol.VersionNumber,
	logger utils.Logger,
	v protocol.VersionNumber,
) quicSession {
	s := &session{
		conn:                  conn,
		config:                conf,
		handshakeDestConnID:   destConnID,
		srcConnIDLen:          srcConnID.Len(),
		perspective:           protocol.PerspectiveClient,
		handshakeCompleteChan: make(chan struct{}),
		logID:                 destConnID.String(),
		logger:                logger,
		initialVersion:        initialVersion,
		version:               v,
	}
	// The connection ID manager and generator report to the runner through
	// these callbacks; frames they produce go through queueControlFrame.
	s.connIDManager = newConnIDManager(
		destConnID,
		func(token [16]byte) { runner.AddResetToken(token, s) },
		runner.RemoveResetToken,
		runner.RetireResetToken,
		s.queueControlFrame,
	)
	s.connIDGenerator = newConnIDGenerator(
		srcConnID,
		nil,
		func(connID protocol.ConnectionID) [16]byte { return runner.Add(connID, s) },
		runner.Remove,
		runner.Retire,
		runner.ReplaceWithClosed,
		s.queueControlFrame,
	)
	// preSetup has to run before anything below: it creates rttStats,
	// traceCallback, framer, retransmissionQueue and receivedPacketHandler.
	s.preSetup()
	s.sentPacketHandler = ackhandler.NewSentPacketHandler(initialPacketNumber, s.rttStats, s.traceCallback, s.logger)
	initialStream := newCryptoStream()
	handshakeStream := newCryptoStream()
	oneRTTStream := newPostHandshakeCryptoStream(s.framer)
	// The transport parameters we hand to the crypto setup. Unlike the
	// server, the client sets neither a stateless reset token nor an
	// original connection ID.
	params := &handshake.TransportParameters{
		InitialMaxStreamDataBidiRemote: protocol.InitialMaxStreamData,
		InitialMaxStreamDataBidiLocal:  protocol.InitialMaxStreamData,
		InitialMaxStreamDataUni:        protocol.InitialMaxStreamData,
		InitialMaxData:                 protocol.InitialMaxData,
		IdleTimeout:                    s.config.IdleTimeout,
		MaxBidiStreamNum:               protocol.StreamNum(s.config.MaxIncomingStreams),
		MaxUniStreamNum:                protocol.StreamNum(s.config.MaxIncomingUniStreams),
		MaxAckDelay:                    protocol.MaxAckDelayInclGranularity,
		AckDelayExponent:               protocol.AckDelayExponent,
		DisableMigration:               true,
		ActiveConnectionIDLimit:        protocol.MaxActiveConnectionIDs,
	}
	cs, clientHelloWritten := handshake.NewCryptoSetupClient(
		initialStream,
		handshakeStream,
		oneRTTStream,
		destConnID,
		conn.RemoteAddr(),
		params,
		&handshakeRunner{
			onReceivedParams:    s.processTransportParameters,
			onError:             s.closeLocal,
			dropKeys:            s.dropEncryptionLevel,
			onHandshakeComplete: func() { close(s.handshakeCompleteChan) },
		},
		tlsConf,
		s.rttStats,
		logger,
	)
	// run() waits on this channel before it schedules the first packet.
	s.clientHelloWritten = clientHelloWritten
	s.cryptoStreamHandler = cs
	s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, oneRTTStream)
	s.unpacker = newPacketUnpacker(cs, s.version)
	s.packer = newPacketPacker(
		srcConnID,
		s.connIDManager.Get,
		initialStream,
		handshakeStream,
		s.sentPacketHandler,
		s.retransmissionQueue,
		s.RemoteAddr(),
		cs,
		s.framer,
		s.receivedPacketHandler,
		s.perspective,
		s.version,
	)
	// Tokens are stored under the server name if one is configured, and
	// under the remote address otherwise.
	if len(tlsConf.ServerName) > 0 {
		s.tokenStoreKey = tlsConf.ServerName
	} else {
		s.tokenStoreKey = conn.RemoteAddr().String()
	}
	// Use a token stored for this key, if there is one. Pop removes it from
	// the store.
	if s.config.TokenStore != nil {
		if token := s.config.TokenStore.Pop(s.tokenStoreKey); token != nil {
			s.packer.SetToken(token.data)
		}
	}
	return s
}
// preSetup initialises the parts of the session that are identical for the
// client and the server. It expects conn, config, perspective, version and
// logger to be set already, and has to be called before the sent packet
// handler, the crypto setup and the packer are created, since those are
// constructed from values created here.
func (s *session) preSetup() {
	s.sendQueue = newSendQueue(s.conn)
	s.retransmissionQueue = newRetransmissionQueue(s.version)
	s.frameParser = wire.NewFrameParser(s.version)
	s.rttStats = &congestion.RTTStats{}
	s.receivedPacketHandler = ackhandler.NewReceivedPacketHandler(s.rttStats, s.logger, s.version)
	s.connFlowController = flowcontrol.NewConnectionFlowController(
		protocol.InitialMaxData,
		protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
		s.onHasConnectionWindowUpdate,
		s.rttStats,
		s.logger,
	)
	s.earlySessionReadyChan = make(chan struct{})
	// The streams map has to exist before the framer, and the framer before
	// the window update queue: each one is constructed from the previous one.
	s.streamsMap = newStreamsMap(
		s,
		s.newFlowController,
		uint64(s.config.MaxIncomingStreams),
		uint64(s.config.MaxIncomingUniStreams),
		s.perspective,
		s.version,
	)
	s.framer = newFramer(s.streamsMap, s.version)
	s.receivedPackets = make(chan *receivedPacket, protocol.MaxSessionUnprocessedPackets)
	// closeChan is buffered with capacity 1. run() relies on that when it
	// puts a close error back into the channel.
	s.closeChan = make(chan closeError, 1)
	s.sendingScheduled = make(chan struct{}, 1)
	s.undecryptablePackets = make([]*receivedPacket, 0, protocol.MaxUndecryptablePackets)
	s.ctx, s.ctxCancel = context.WithCancel(context.Background())
	s.handshakeCtx, s.handshakeCtxCancel = context.WithCancel(context.Background())
	s.timer = utils.NewTimer()
	// Both the handshake timeout and the idle timeout start counting now.
	now := time.Now()
	s.lastPacketReceivedTime = now
	s.sessionCreationTime = now
	s.windowUpdateQueue = newWindowUpdateQueue(s.streamsMap, s.connFlowController, s.framer.QueueControlFrame)
	// Tracing is optional. origDestConnID is read when an event is traced,
	// not when the callback is created.
	if s.config.QuicTracer != nil {
		s.traceCallback = func(ev quictrace.Event) {
			s.config.QuicTracer.Trace(s.origDestConnID, ev)
		}
	}
}
// [Psiphon]
//
// Backport https://github.com/lucas-clemente/quic-go/commit/079279b9cf4cb5dafc8b7f673a2e7e47a4b6a06e:
// > session.maybeResetTimer() and session.run() were using slightly
// > different definitions of when a keep-alive PING should be sent. Under
// > certain conditions, this would make us repeatedly set a timer for the
// > keep-alive, but on timer expiration no keep-alive would be sent.
//
// This changes session.run and session.maybeResetTimer. As we don't yet have
// https://github.com/lucas-clemente/quic-go/commit/27549c56656665859354255d3912f6428bfcb9f0,
// "use the minimum of the two peers' max_idle_timeouts", s.config.IdleTimeout is used
// in place of s.idleTimeout.

// run runs the session main loop.
//
// It starts the handshake and the send queue in their own goroutines and then
// loops until a closeError arrives on closeChan. Every iteration re-arms the
// timer, waits for an event (close, timer, scheduled sending, received packet,
// handshake completion), and then handles loss detection, keep-alive,
// handshake / idle timeouts and pacing before it sends packets.
//
// run cancels the session context when it returns, and returns the error
// carried by the closeError that terminated the loop.
func (s *session) run() error {
	defer s.ctxCancel()

	go s.cryptoStreamHandler.RunHandshake()
	go func() {
		if err := s.sendQueue.Run(); err != nil {
			s.closeLocal(err)
		}
	}()

	// The client has nothing to send before the ClientHello was written.
	if s.perspective == protocol.PerspectiveClient {
		select {
		case <-s.clientHelloWritten:
			s.scheduleSending()
		case closeErr := <-s.closeChan:
			// put the close error back into the channel, so that the run loop can receive it
			s.closeChan <- closeErr
		}
	}

	var closeErr closeError

runLoop:
	for {
		// Close immediately if requested
		select {
		case closeErr = <-s.closeChan:
			break runLoop
		case <-s.handshakeCompleteChan:
			s.handleHandshakeComplete()
		default:
		}

		s.maybeResetTimer()

		select {
		case closeErr = <-s.closeChan:
			break runLoop
		case <-s.timer.Chan():
			s.timer.SetRead()
			// We do all the interesting stuff after the select statement, so
			// nothing to see here.
		case <-s.sendingScheduled:
			// We do all the interesting stuff after the select statement, so
			// nothing to see here.
		case p := <-s.receivedPackets:
			// Only reset the timers if this packet was actually processed.
			// This avoids modifying any state when handling undecryptable packets,
			// which could be injected by an attacker.
			if wasProcessed := s.handlePacketImpl(p); !wasProcessed {
				continue
			}
		case <-s.handshakeCompleteChan:
			s.handleHandshakeComplete()
		}

		now := time.Now()
		if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) {
			// This could cause packets to be retransmitted.
			// Check it before trying to send packets.
			if err := s.sentPacketHandler.OnLossDetectionTimeout(); err != nil {
				s.closeLocal(err)
			}
		}

		var pacingDeadline time.Time
		if s.pacingDeadline.IsZero() { // the timer didn't have a pacing deadline set
			pacingDeadline = s.sentPacketHandler.TimeUntilSend()
		}
		if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) {
			// send a PING frame since there is no activity in the session
			s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.")
			s.framer.QueueControlFrame(&wire.PingFrame{})
			s.keepAlivePingSent = true
		} else if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.HandshakeTimeout {
			s.destroyImpl(qerr.TimeoutError("Handshake did not complete in time"))
			continue
		} else if s.handshakeComplete && now.Sub(s.idleTimeoutStartTime()) >= s.config.IdleTimeout {
			s.destroyImpl(qerr.TimeoutError("No recent network activity"))
			continue
		} else if !pacingDeadline.IsZero() && now.Before(pacingDeadline) {
			// If we get to this point before the pacing deadline, we should wait until that deadline.
			// This can happen when scheduleSending is called, or a packet is received.
			// Set the timer and restart the run loop.
			s.pacingDeadline = pacingDeadline
			continue
		}

		// The two timeout checks below repeat the ones in the chain above.
		// They only have an effect if the keep-alive branch was taken: every
		// other branch either restarted the loop or already evaluated both
		// conditions to false for the same value of now.
		if !s.handshakeComplete && now.Sub(s.sessionCreationTime) >= s.config.HandshakeTimeout {
			s.destroyImpl(qerr.TimeoutError("Handshake did not complete in time"))
			continue
		}
		if s.handshakeComplete && now.Sub(s.idleTimeoutStartTime()) >= s.config.IdleTimeout {
			s.destroyImpl(qerr.TimeoutError("No recent network activity"))
			continue
		}

		if err := s.sendPackets(); err != nil {
			s.closeLocal(err)
		}
	}

	// [Psiphon]
	// Stop timer to immediately release resources
	s.timer.Reset(time.Time{})

	s.handleCloseError(closeErr)
	s.logger.Infof("Connection %s closed.", s.logID)
	s.cryptoStreamHandler.Close()
	s.sendQueue.Close()
	return closeErr.err
}
// earlySessionReady returns the channel that callers block on until the
// early session can be used. The function itself does not block.
// NOTE(review): the code that closes earlySessionReadyChan is not part of
// this section of the file.
func (s *session) earlySessionReady() <-chan struct{} {
	return s.earlySessionReadyChan
}
// HandshakeComplete returns the handshake context. It is cancelled by
// handleHandshakeComplete, so its Done channel signals the completion of the
// handshake.
func (s *session) HandshakeComplete() context.Context {
	return s.handshakeCtx
}
// Context returns the session context. It is cancelled when the run loop
// returns.
func (s *session) Context() context.Context {
	return s.ctx
}
// ConnectionState returns the TLS connection state as reported by the crypto
// stream handler.
func (s *session) ConnectionState() tls.ConnectionState {
	return s.cryptoStreamHandler.ConnectionState()
}
  534. // Time when the next keep-alive packet should be sent.
  535. // It returns a zero time if no keep-alive should be sent.
  536. func (s *session) nextKeepAliveTime() time.Time {
  537. if !s.config.KeepAlive || s.keepAlivePingSent || s.firstAckElicitingPacketAfterIdleSentTime.IsZero() {
  538. return time.Time{}
  539. }
  540. return s.lastPacketReceivedTime.Add(s.keepAliveInterval / 2)
  541. }
  542. func (s *session) maybeResetTimer() {
  543. var deadline time.Time
  544. if !s.handshakeComplete {
  545. deadline = s.sessionCreationTime.Add(s.config.HandshakeTimeout)
  546. } else {
  547. if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() {
  548. deadline = keepAliveTime
  549. } else {
  550. deadline = s.idleTimeoutStartTime().Add(s.config.IdleTimeout)
  551. }
  552. }
  553. if ackAlarm := s.receivedPacketHandler.GetAlarmTimeout(); !ackAlarm.IsZero() {
  554. deadline = utils.MinTime(deadline, ackAlarm)
  555. }
  556. if lossTime := s.sentPacketHandler.GetLossDetectionTimeout(); !lossTime.IsZero() {
  557. deadline = utils.MinTime(deadline, lossTime)
  558. }
  559. if !s.pacingDeadline.IsZero() {
  560. deadline = utils.MinTime(deadline, s.pacingDeadline)
  561. }
  562. s.timer.Reset(deadline)
  563. }
  564. func (s *session) idleTimeoutStartTime() time.Time {
  565. return utils.MaxTime(s.lastPacketReceivedTime, s.firstAckElicitingPacketAfterIdleSentTime)
  566. }
  567. func (s *session) handleHandshakeComplete() {
  568. s.handshakeComplete = true
  569. s.handshakeCompleteChan = nil // prevent this case from ever being selected again
  570. s.handshakeCtxCancel()
  571. s.connIDGenerator.SetHandshakeComplete()
  572. s.sentPacketHandler.SetHandshakeComplete()
  573. // The client completes the handshake first (after sending the CFIN).
  574. // We need to make sure it learns about the server completing the handshake,
  575. // in order to stop retransmitting handshake packets.
  576. // They will stop retransmitting handshake packets when receiving the first 1-RTT packet.
  577. if s.perspective == protocol.PerspectiveServer {
  578. token, err := s.tokenGenerator.NewToken(s.conn.RemoteAddr())
  579. if err != nil {
  580. s.closeLocal(err)
  581. }
  582. s.queueControlFrame(&wire.NewTokenFrame{Token: token})
  583. }
  584. }
// handlePacketImpl handles a received UDP datagram, which may contain several
// coalesced QUIC packets. The packets are parsed and handled one after the
// other. Parsing stops at the first packet that cannot be parsed or that
// carries a different destination connection ID than the packet before it.
//
// It returns true if at least one of the contained packets was processed
// successfully.
func (s *session) handlePacketImpl(rp *receivedPacket) bool {
	var counter uint8
	var lastConnID protocol.ConnectionID
	var processed bool
	data := rp.data
	p := rp
	for len(data) > 0 {
		// Every packet after the first one gets its own receivedPacket, which
		// starts out with the not yet parsed rest of the datagram.
		if counter > 0 {
			p = p.Clone()
			p.data = data
		}

		hdr, packetData, rest, err := wire.ParsePacket(p.data, s.srcConnIDLen)
		if err != nil {
			s.logger.Debugf("error parsing packet: %s", err)
			break
		}

		if counter > 0 && !hdr.DestConnectionID.Equal(lastConnID) {
			s.logger.Debugf("coalesced packet has different destination connection ID: %s, expected %s", hdr.DestConnectionID, lastConnID)
			break
		}
		lastConnID = hdr.DestConnectionID

		// NOTE(review): presumably Split registers one more user of the
		// shared buffer for the additional packet, matching the Decrement in
		// handleSinglePacket; confirm against packetBuffer.
		if counter > 0 {
			p.buffer.Split()
		}
		counter++

		// only log if this is actually a coalesced packet
		if s.logger.Debug() && (counter > 1 || len(rest) > 0) {
			s.logger.Debugf("Parsed a coalesced packet. Part %d: %d bytes. Remaining: %d bytes.", counter, len(packetData), len(rest))
		}
		p.data = packetData
		if wasProcessed := s.handleSinglePacket(p, hdr); wasProcessed {
			processed = true
		}
		data = rest
	}
	// All clones share the buffer of rp, so this acts on the buffer of the
	// whole datagram.
	p.buffer.MaybeRelease()
	return processed
}
  623. func (s *session) handleSinglePacket(p *receivedPacket, hdr *wire.Header) bool /* was the packet successfully processed */ {
  624. var wasQueued bool
  625. defer func() {
  626. // Put back the packet buffer if the packet wasn't queued for later decryption.
  627. if !wasQueued {
  628. p.buffer.Decrement()
  629. }
  630. }()
  631. if hdr.Type == protocol.PacketTypeRetry {
  632. return s.handleRetryPacket(hdr)
  633. }
  634. // The server can change the source connection ID with the first Handshake packet.
  635. // After this, all packets with a different source connection have to be ignored.
  636. if s.receivedFirstPacket && hdr.IsLongHeader && !hdr.SrcConnectionID.Equal(s.handshakeDestConnID) {
  637. s.logger.Debugf("Dropping %s packet with unexpected source connection ID: %s (expected %s)", hdr.PacketType(), hdr.SrcConnectionID, s.handshakeDestConnID)
  638. return false
  639. }
  640. // drop 0-RTT packets
  641. if hdr.Type == protocol.PacketType0RTT {
  642. return false
  643. }
  644. packet, err := s.unpacker.Unpack(hdr, p.rcvTime, p.data)
  645. if err != nil {
  646. switch err {
  647. case handshake.ErrKeysDropped:
  648. s.logger.Debugf("Dropping %s packet because we already dropped the keys.", hdr.PacketType())
  649. case handshake.ErrKeysNotYetAvailable:
  650. // Sealer for this encryption level not yet available.
  651. // Try again later.
  652. wasQueued = true
  653. s.tryQueueingUndecryptablePacket(p)
  654. case wire.ErrInvalidReservedBits:
  655. s.closeLocal(qerr.Error(qerr.ProtocolViolation, err.Error()))
  656. default:
  657. // This might be a packet injected by an attacker.
  658. // Drop it.
  659. s.logger.Debugf("Dropping %s packet that could not be unpacked. Error: %s", hdr.PacketType(), err)
  660. }
  661. return false
  662. }
  663. if s.logger.Debug() {
  664. s.logger.Debugf("<- Reading packet %#x (%d bytes) for connection %s, %s", packet.packetNumber, len(p.data), hdr.DestConnectionID, packet.encryptionLevel)
  665. packet.hdr.Log(s.logger)
  666. }
  667. if err := s.handleUnpackedPacket(packet, p.rcvTime); err != nil {
  668. s.closeLocal(err)
  669. return false
  670. }
  671. return true
  672. }
  673. func (s *session) handleRetryPacket(hdr *wire.Header) bool /* was this a valid Retry */ {
  674. if s.perspective == protocol.PerspectiveServer {
  675. s.logger.Debugf("Ignoring Retry.")
  676. return false
  677. }
  678. if s.receivedFirstPacket {
  679. s.logger.Debugf("Ignoring Retry, since we already received a packet.")
  680. return false
  681. }
  682. (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger)
  683. if !hdr.OrigDestConnectionID.Equal(s.handshakeDestConnID) {
  684. s.logger.Debugf("Ignoring spoofed Retry. Original Destination Connection ID: %s, expected: %s", hdr.OrigDestConnectionID, s.handshakeDestConnID)
  685. return false
  686. }
  687. if hdr.SrcConnectionID.Equal(s.handshakeDestConnID) {
  688. s.logger.Debugf("Ignoring Retry, since the server didn't change the Source Connection ID.")
  689. return false
  690. }
  691. // If a token is already set, this means that we already received a Retry from the server.
  692. // Ignore this Retry packet.
  693. if s.receivedRetry {
  694. s.logger.Debugf("Ignoring Retry, since a Retry was already received.")
  695. return false
  696. }
  697. s.logger.Debugf("<- Received Retry")
  698. s.logger.Debugf("Switching destination connection ID to: %s", hdr.SrcConnectionID)
  699. s.origDestConnID = s.handshakeDestConnID
  700. newDestConnID := hdr.SrcConnectionID
  701. s.receivedRetry = true
  702. if err := s.sentPacketHandler.ResetForRetry(); err != nil {
  703. s.closeLocal(err)
  704. return false
  705. }
  706. s.handshakeDestConnID = newDestConnID
  707. s.cryptoStreamHandler.ChangeConnectionID(newDestConnID)
  708. s.packer.SetToken(hdr.Token)
  709. s.connIDManager.ChangeInitialConnID(newDestConnID)
  710. s.scheduleSending()
  711. return true
  712. }
  713. func (s *session) handleUnpackedPacket(packet *unpackedPacket, rcvTime time.Time) error {
  714. if len(packet.data) == 0 {
  715. return qerr.Error(qerr.ProtocolViolation, "empty packet")
  716. }
  717. // The server can change the source connection ID with the first Handshake packet.
  718. if s.perspective == protocol.PerspectiveClient && !s.receivedFirstPacket && packet.hdr.IsLongHeader && !packet.hdr.SrcConnectionID.Equal(s.handshakeDestConnID) {
  719. cid := packet.hdr.SrcConnectionID
  720. s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", cid)
  721. s.handshakeDestConnID = cid
  722. s.connIDManager.ChangeInitialConnID(cid)
  723. }
  724. s.receivedFirstPacket = true
  725. s.lastPacketReceivedTime = rcvTime
  726. s.firstAckElicitingPacketAfterIdleSentTime = time.Time{}
  727. s.keepAlivePingSent = false
  728. // Only used for tracing.
  729. // If we're not tracing, this slice will always remain empty.
  730. var frames []wire.Frame
  731. var transportState *quictrace.TransportState
  732. r := bytes.NewReader(packet.data)
  733. var isAckEliciting bool
  734. for {
  735. frame, err := s.frameParser.ParseNext(r, packet.encryptionLevel)
  736. if err != nil {
  737. return err
  738. }
  739. if frame == nil {
  740. break
  741. }
  742. if ackhandler.IsFrameAckEliciting(frame) {
  743. isAckEliciting = true
  744. }
  745. if s.traceCallback != nil {
  746. frames = append(frames, frame)
  747. }
  748. if err := s.handleFrame(frame, packet.packetNumber, packet.encryptionLevel); err != nil {
  749. return err
  750. }
  751. }
  752. if s.traceCallback != nil {
  753. transportState = s.sentPacketHandler.GetStats()
  754. s.traceCallback(quictrace.Event{
  755. Time: rcvTime,
  756. EventType: quictrace.PacketReceived,
  757. TransportState: transportState,
  758. EncryptionLevel: packet.encryptionLevel,
  759. PacketNumber: packet.packetNumber,
  760. PacketSize: protocol.ByteCount(len(packet.data)),
  761. Frames: frames,
  762. })
  763. }
  764. s.receivedPacketHandler.ReceivedPacket(packet.packetNumber, packet.encryptionLevel, rcvTime, isAckEliciting)
  765. return nil
  766. }
  767. func (s *session) handleFrame(f wire.Frame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
  768. var err error
  769. wire.LogFrame(s.logger, f, false)
  770. switch frame := f.(type) {
  771. case *wire.CryptoFrame:
  772. err = s.handleCryptoFrame(frame, encLevel)
  773. case *wire.StreamFrame:
  774. err = s.handleStreamFrame(frame)
  775. case *wire.AckFrame:
  776. err = s.handleAckFrame(frame, pn, encLevel)
  777. case *wire.ConnectionCloseFrame:
  778. s.handleConnectionCloseFrame(frame)
  779. case *wire.ResetStreamFrame:
  780. err = s.handleResetStreamFrame(frame)
  781. case *wire.MaxDataFrame:
  782. s.handleMaxDataFrame(frame)
  783. case *wire.MaxStreamDataFrame:
  784. err = s.handleMaxStreamDataFrame(frame)
  785. case *wire.MaxStreamsFrame:
  786. err = s.handleMaxStreamsFrame(frame)
  787. case *wire.DataBlockedFrame:
  788. case *wire.StreamDataBlockedFrame:
  789. case *wire.StreamsBlockedFrame:
  790. case *wire.StopSendingFrame:
  791. err = s.handleStopSendingFrame(frame)
  792. case *wire.PingFrame:
  793. case *wire.PathChallengeFrame:
  794. s.handlePathChallengeFrame(frame)
  795. case *wire.PathResponseFrame:
  796. // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
  797. err = errors.New("unexpected PATH_RESPONSE frame")
  798. case *wire.NewTokenFrame:
  799. err = s.handleNewTokenFrame(frame)
  800. case *wire.NewConnectionIDFrame:
  801. err = s.handleNewConnectionIDFrame(frame)
  802. case *wire.RetireConnectionIDFrame:
  803. err = s.handleRetireConnectionIDFrame(frame)
  804. default:
  805. err = fmt.Errorf("unexpected frame type: %s", reflect.ValueOf(&frame).Elem().Type().Name())
  806. }
  807. return err
  808. }
  809. // handlePacket is called by the server with a new packet
  810. func (s *session) handlePacket(p *receivedPacket) {
  811. // Discard packets once the amount of queued packets is larger than
  812. // the channel size, protocol.MaxSessionUnprocessedPackets
  813. select {
  814. case s.receivedPackets <- p:
  815. default:
  816. }
  817. }
  818. func (s *session) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
  819. var e error
  820. if frame.IsApplicationError {
  821. e = qerr.ApplicationError(frame.ErrorCode, frame.ReasonPhrase)
  822. } else {
  823. e = qerr.Error(frame.ErrorCode, frame.ReasonPhrase)
  824. }
  825. s.closeRemote(e)
  826. }
  827. func (s *session) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel) error {
  828. encLevelChanged, err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel)
  829. if err != nil {
  830. return err
  831. }
  832. if encLevelChanged {
  833. s.tryDecryptingQueuedPackets()
  834. }
  835. return nil
  836. }
  837. func (s *session) handleStreamFrame(frame *wire.StreamFrame) error {
  838. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  839. if err != nil {
  840. return err
  841. }
  842. if str == nil {
  843. // Stream is closed and already garbage collected
  844. // ignore this StreamFrame
  845. return nil
  846. }
  847. return str.handleStreamFrame(frame)
  848. }
  849. func (s *session) handleMaxDataFrame(frame *wire.MaxDataFrame) {
  850. s.connFlowController.UpdateSendWindow(frame.ByteOffset)
  851. }
  852. func (s *session) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
  853. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  854. if err != nil {
  855. return err
  856. }
  857. if str == nil {
  858. // stream is closed and already garbage collected
  859. return nil
  860. }
  861. str.handleMaxStreamDataFrame(frame)
  862. return nil
  863. }
  864. func (s *session) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) error {
  865. return s.streamsMap.HandleMaxStreamsFrame(frame)
  866. }
  867. func (s *session) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  868. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  869. if err != nil {
  870. return err
  871. }
  872. if str == nil {
  873. // stream is closed and already garbage collected
  874. return nil
  875. }
  876. return str.handleResetStreamFrame(frame)
  877. }
  878. func (s *session) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
  879. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  880. if err != nil {
  881. return err
  882. }
  883. if str == nil {
  884. // stream is closed and already garbage collected
  885. return nil
  886. }
  887. str.handleStopSendingFrame(frame)
  888. return nil
  889. }
  890. func (s *session) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
  891. s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data})
  892. }
  893. func (s *session) handleNewTokenFrame(frame *wire.NewTokenFrame) error {
  894. if s.perspective == protocol.PerspectiveServer {
  895. return qerr.Error(qerr.ProtocolViolation, "Received NEW_TOKEN frame from the client.")
  896. }
  897. if s.config.TokenStore != nil {
  898. s.config.TokenStore.Put(s.tokenStoreKey, &ClientToken{data: frame.Token})
  899. }
  900. return nil
  901. }
  902. func (s *session) handleNewConnectionIDFrame(f *wire.NewConnectionIDFrame) error {
  903. return s.connIDManager.Add(f)
  904. }
  905. func (s *session) handleRetireConnectionIDFrame(f *wire.RetireConnectionIDFrame) error {
  906. return s.connIDGenerator.Retire(f.SequenceNumber)
  907. }
  908. func (s *session) handleAckFrame(frame *wire.AckFrame, pn protocol.PacketNumber, encLevel protocol.EncryptionLevel) error {
  909. if err := s.sentPacketHandler.ReceivedAck(frame, pn, encLevel, s.lastPacketReceivedTime); err != nil {
  910. return err
  911. }
  912. if encLevel == protocol.Encryption1RTT {
  913. s.receivedPacketHandler.IgnoreBelow(s.sentPacketHandler.GetLowestPacketNotConfirmedAcked())
  914. s.cryptoStreamHandler.SetLargest1RTTAcked(frame.LargestAcked())
  915. }
  916. return nil
  917. }
  918. // closeLocal closes the session and send a CONNECTION_CLOSE containing the error
  919. func (s *session) closeLocal(e error) {
  920. s.closeOnce.Do(func() {
  921. if e == nil {
  922. s.logger.Infof("Closing session.")
  923. } else {
  924. s.logger.Errorf("Closing session with error: %s", e)
  925. }
  926. s.closeChan <- closeError{err: e, immediate: false, remote: false}
  927. })
  928. }
  929. // destroy closes the session without sending the error on the wire
  930. func (s *session) destroy(e error) {
  931. s.destroyImpl(e)
  932. <-s.ctx.Done()
  933. }
  934. func (s *session) destroyImpl(e error) {
  935. s.closeOnce.Do(func() {
  936. if nerr, ok := e.(net.Error); ok && nerr.Timeout() {
  937. s.logger.Errorf("Destroying session %s: %s", s.connIDManager.Get(), e)
  938. } else {
  939. s.logger.Errorf("Destroying session %s with error: %s", s.connIDManager.Get(), e)
  940. }
  941. s.closeChan <- closeError{err: e, immediate: true, remote: false}
  942. })
  943. }
  944. // closeForRecreating closes the session in order to recreate it immediately afterwards
  945. // It returns the first packet number that should be used in the new session.
  946. func (s *session) closeForRecreating() protocol.PacketNumber {
  947. s.destroy(errCloseForRecreating)
  948. nextPN, _ := s.sentPacketHandler.PeekPacketNumber(protocol.EncryptionInitial)
  949. return nextPN
  950. }
  951. func (s *session) closeRemote(e error) {
  952. s.closeOnce.Do(func() {
  953. s.logger.Errorf("Peer closed session with error: %s", e)
  954. s.closeChan <- closeError{err: e, immediate: true, remote: true}
  955. })
  956. }
  957. // Close the connection. It sends a qerr.NoError.
  958. // It waits until the run loop has stopped before returning
  959. func (s *session) Close() error {
  960. s.closeLocal(nil)
  961. <-s.ctx.Done()
  962. return nil
  963. }
  964. func (s *session) CloseWithError(code protocol.ApplicationErrorCode, desc string) error {
  965. s.closeLocal(qerr.ApplicationError(qerr.ErrorCode(code), desc))
  966. <-s.ctx.Done()
  967. return nil
  968. }
  969. func (s *session) handleCloseError(closeErr closeError) {
  970. if closeErr.err == nil {
  971. closeErr.err = qerr.ApplicationError(0, "")
  972. }
  973. var quicErr *qerr.QuicError
  974. var ok bool
  975. if quicErr, ok = closeErr.err.(*qerr.QuicError); !ok {
  976. quicErr = qerr.ToQuicError(closeErr.err)
  977. }
  978. s.streamsMap.CloseWithError(quicErr)
  979. s.connIDManager.Close()
  980. // If this is a remote close we're done here
  981. if closeErr.remote {
  982. s.connIDGenerator.ReplaceWithClosed(newClosedRemoteSession(s.perspective))
  983. return
  984. }
  985. if closeErr.immediate {
  986. s.connIDGenerator.RemoveAll()
  987. return
  988. }
  989. connClosePacket, err := s.sendConnectionClose(quicErr)
  990. if err != nil {
  991. s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err)
  992. }
  993. cs := newClosedLocalSession(s.conn, connClosePacket, s.perspective, s.logger)
  994. s.connIDGenerator.ReplaceWithClosed(cs)
  995. }
  996. func (s *session) dropEncryptionLevel(encLevel protocol.EncryptionLevel) {
  997. s.sentPacketHandler.DropPackets(encLevel)
  998. s.receivedPacketHandler.DropPackets(encLevel)
  999. }
  1000. func (s *session) processTransportParameters(data []byte) {
  1001. var params *handshake.TransportParameters
  1002. var err error
  1003. switch s.perspective {
  1004. case protocol.PerspectiveClient:
  1005. params, err = s.processTransportParametersForClient(data)
  1006. case protocol.PerspectiveServer:
  1007. params, err = s.processTransportParametersForServer(data)
  1008. }
  1009. if err != nil {
  1010. s.closeLocal(err)
  1011. return
  1012. }
  1013. s.logger.Debugf("Received Transport Parameters: %s", params)
  1014. s.peerParams = params
  1015. s.keepAliveInterval = utils.MinDuration(params.IdleTimeout/2, protocol.MaxKeepAliveInterval)
  1016. if err := s.streamsMap.UpdateLimits(params); err != nil {
  1017. s.closeLocal(err)
  1018. return
  1019. }
  1020. s.packer.HandleTransportParameters(params)
  1021. s.frameParser.SetAckDelayExponent(params.AckDelayExponent)
  1022. s.connFlowController.UpdateSendWindow(params.InitialMaxData)
  1023. s.rttStats.SetMaxAckDelay(params.MaxAckDelay)
  1024. s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit)
  1025. if params.StatelessResetToken != nil {
  1026. s.connIDManager.SetStatelessResetToken(*params.StatelessResetToken)
  1027. }
  1028. // On the server side, the early session is ready as soon as we processed
  1029. // the client's transport parameters.
  1030. close(s.earlySessionReadyChan)
  1031. }
  1032. func (s *session) processTransportParametersForClient(data []byte) (*handshake.TransportParameters, error) {
  1033. params := &handshake.TransportParameters{}
  1034. if err := params.Unmarshal(data, s.perspective.Opposite()); err != nil {
  1035. return nil, err
  1036. }
  1037. // check the Retry token
  1038. if !params.OriginalConnectionID.Equal(s.origDestConnID) {
  1039. return nil, qerr.Error(qerr.TransportParameterError, fmt.Sprintf("expected original_connection_id to equal %s, is %s", s.origDestConnID, params.OriginalConnectionID))
  1040. }
  1041. // We don't support connection migration yet, so we don't have any use for the preferred_address.
  1042. if params.PreferredAddress != nil {
  1043. s.logger.Debugf("Server sent preferred_address. Retiring the preferred_address connection ID.")
  1044. // Retire the connection ID.
  1045. s.framer.QueueControlFrame(&wire.RetireConnectionIDFrame{SequenceNumber: 1})
  1046. }
  1047. return params, nil
  1048. }
  1049. func (s *session) processTransportParametersForServer(data []byte) (*handshake.TransportParameters, error) {
  1050. params := &handshake.TransportParameters{}
  1051. if err := params.Unmarshal(data, s.perspective.Opposite()); err != nil {
  1052. return nil, err
  1053. }
  1054. return params, nil
  1055. }
  1056. func (s *session) sendPackets() error {
  1057. s.pacingDeadline = time.Time{}
  1058. sendMode := s.sentPacketHandler.SendMode()
  1059. if sendMode == ackhandler.SendNone { // shortcut: return immediately if there's nothing to send
  1060. return nil
  1061. }
  1062. numPackets := s.sentPacketHandler.ShouldSendNumPackets()
  1063. var numPacketsSent int
  1064. sendLoop:
  1065. for {
  1066. switch sendMode {
  1067. case ackhandler.SendNone:
  1068. break sendLoop
  1069. case ackhandler.SendAck:
  1070. // If we already sent packets, and the send mode switches to SendAck,
  1071. // we've just become congestion limited.
  1072. // There's no need to try to send an ACK at this moment.
  1073. if numPacketsSent > 0 {
  1074. return nil
  1075. }
  1076. // We can at most send a single ACK only packet.
  1077. // There will only be a new ACK after receiving new packets.
  1078. // SendAck is only returned when we're congestion limited, so we don't need to set the pacingt timer.
  1079. return s.maybeSendAckOnlyPacket()
  1080. case ackhandler.SendPTOInitial:
  1081. if err := s.sendProbePacket(protocol.EncryptionInitial); err != nil {
  1082. return err
  1083. }
  1084. numPacketsSent++
  1085. case ackhandler.SendPTOHandshake:
  1086. if err := s.sendProbePacket(protocol.EncryptionHandshake); err != nil {
  1087. return err
  1088. }
  1089. numPacketsSent++
  1090. case ackhandler.SendPTOAppData:
  1091. if err := s.sendProbePacket(protocol.Encryption1RTT); err != nil {
  1092. return err
  1093. }
  1094. numPacketsSent++
  1095. case ackhandler.SendAny:
  1096. sentPacket, err := s.sendPacket()
  1097. if err != nil {
  1098. return err
  1099. }
  1100. if !sentPacket {
  1101. break sendLoop
  1102. }
  1103. numPacketsSent++
  1104. default:
  1105. return fmt.Errorf("BUG: invalid send mode %d", sendMode)
  1106. }
  1107. if numPacketsSent >= numPackets {
  1108. break
  1109. }
  1110. sendMode = s.sentPacketHandler.SendMode()
  1111. }
  1112. // Only start the pacing timer if we sent as many packets as we were allowed.
  1113. // There will probably be more to send when calling sendPacket again.
  1114. if numPacketsSent == numPackets {
  1115. s.pacingDeadline = s.sentPacketHandler.TimeUntilSend()
  1116. }
  1117. return nil
  1118. }
  1119. func (s *session) maybeSendAckOnlyPacket() error {
  1120. packet, err := s.packer.MaybePackAckPacket()
  1121. if err != nil {
  1122. return err
  1123. }
  1124. if packet == nil {
  1125. return nil
  1126. }
  1127. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(s.retransmissionQueue))
  1128. s.sendPackedPacket(packet)
  1129. return nil
  1130. }
  1131. func (s *session) sendProbePacket(encLevel protocol.EncryptionLevel) error {
  1132. // Queue probe packets until we actually send out a packet,
  1133. // or until there are no more packets to queue.
  1134. var packet *packedPacket
  1135. for {
  1136. if wasQueued := s.sentPacketHandler.QueueProbePacket(encLevel); !wasQueued {
  1137. break
  1138. }
  1139. var err error
  1140. packet, err = s.packer.MaybePackProbePacket(encLevel)
  1141. if err != nil {
  1142. return err
  1143. }
  1144. if packet != nil {
  1145. break
  1146. }
  1147. }
  1148. if packet == nil {
  1149. switch encLevel {
  1150. case protocol.EncryptionInitial:
  1151. s.retransmissionQueue.AddInitial(&wire.PingFrame{})
  1152. case protocol.EncryptionHandshake:
  1153. s.retransmissionQueue.AddHandshake(&wire.PingFrame{})
  1154. case protocol.Encryption1RTT:
  1155. s.retransmissionQueue.AddAppData(&wire.PingFrame{})
  1156. default:
  1157. panic("unexpected encryption level")
  1158. }
  1159. var err error
  1160. packet, err = s.packer.MaybePackProbePacket(encLevel)
  1161. if err != nil {
  1162. return err
  1163. }
  1164. }
  1165. if packet == nil {
  1166. return fmt.Errorf("session BUG: couldn't pack %s probe packet", encLevel)
  1167. }
  1168. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(s.retransmissionQueue))
  1169. s.sendPackedPacket(packet)
  1170. return nil
  1171. }
  1172. func (s *session) sendPacket() (bool, error) {
  1173. if isBlocked, offset := s.connFlowController.IsNewlyBlocked(); isBlocked {
  1174. s.framer.QueueControlFrame(&wire.DataBlockedFrame{DataLimit: offset})
  1175. }
  1176. s.windowUpdateQueue.QueueAll()
  1177. packet, err := s.packer.PackPacket()
  1178. if err != nil || packet == nil {
  1179. return false, err
  1180. }
  1181. s.sentPacketHandler.SentPacket(packet.ToAckHandlerPacket(s.retransmissionQueue))
  1182. s.sendPackedPacket(packet)
  1183. return true, nil
  1184. }
  1185. func (s *session) sendPackedPacket(packet *packedPacket) {
  1186. if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && packet.IsAckEliciting() {
  1187. s.firstAckElicitingPacketAfterIdleSentTime = time.Now()
  1188. }
  1189. if s.traceCallback != nil {
  1190. frames := make([]wire.Frame, 0, len(packet.frames))
  1191. for _, f := range packet.frames {
  1192. frames = append(frames, f.Frame)
  1193. }
  1194. s.traceCallback(quictrace.Event{
  1195. Time: time.Now(),
  1196. EventType: quictrace.PacketSent,
  1197. TransportState: s.sentPacketHandler.GetStats(),
  1198. EncryptionLevel: packet.EncryptionLevel(),
  1199. PacketNumber: packet.header.PacketNumber,
  1200. PacketSize: protocol.ByteCount(len(packet.raw)),
  1201. Frames: frames,
  1202. })
  1203. }
  1204. s.logPacket(packet)
  1205. s.connIDManager.SentPacket()
  1206. s.sendQueue.Send(packet)
  1207. }
  1208. func (s *session) sendConnectionClose(quicErr *qerr.QuicError) ([]byte, error) {
  1209. // don't send application errors in Initial or Handshake packets
  1210. if quicErr.IsApplicationError() && !s.handshakeComplete {
  1211. quicErr = qerr.UserCanceledError
  1212. }
  1213. var reason string
  1214. // don't send details of crypto errors
  1215. if !quicErr.IsCryptoError() {
  1216. reason = quicErr.ErrorMessage
  1217. }
  1218. packet, err := s.packer.PackConnectionClose(&wire.ConnectionCloseFrame{
  1219. IsApplicationError: quicErr.IsApplicationError(),
  1220. ErrorCode: quicErr.ErrorCode,
  1221. FrameType: quicErr.FrameType,
  1222. ReasonPhrase: reason,
  1223. })
  1224. if err != nil {
  1225. return nil, err
  1226. }
  1227. s.logPacket(packet)
  1228. return packet.raw, s.conn.Write(packet.raw)
  1229. }
  1230. func (s *session) logPacket(packet *packedPacket) {
  1231. if !s.logger.Debug() {
  1232. // We don't need to allocate the slices for calling the format functions
  1233. return
  1234. }
  1235. s.logger.Debugf("-> Sending packet 0x%x (%d bytes) for connection %s, %s", packet.header.PacketNumber, len(packet.raw), s.logID, packet.EncryptionLevel())
  1236. packet.header.Log(s.logger)
  1237. if packet.ack != nil {
  1238. wire.LogFrame(s.logger, packet.ack, true)
  1239. }
  1240. for _, frame := range packet.frames {
  1241. wire.LogFrame(s.logger, frame.Frame, true)
  1242. }
  1243. }
  1244. // AcceptStream returns the next stream openend by the peer
  1245. func (s *session) AcceptStream(ctx context.Context) (Stream, error) {
  1246. return s.streamsMap.AcceptStream(ctx)
  1247. }
  1248. func (s *session) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
  1249. return s.streamsMap.AcceptUniStream(ctx)
  1250. }
  1251. // OpenStream opens a stream
  1252. func (s *session) OpenStream() (Stream, error) {
  1253. return s.streamsMap.OpenStream()
  1254. }
  1255. func (s *session) OpenStreamSync(ctx context.Context) (Stream, error) {
  1256. return s.streamsMap.OpenStreamSync(ctx)
  1257. }
  1258. func (s *session) OpenUniStream() (SendStream, error) {
  1259. return s.streamsMap.OpenUniStream()
  1260. }
  1261. func (s *session) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
  1262. return s.streamsMap.OpenUniStreamSync(ctx)
  1263. }
  1264. func (s *session) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
  1265. var initialSendWindow protocol.ByteCount
  1266. if s.peerParams != nil {
  1267. if id.Type() == protocol.StreamTypeUni {
  1268. initialSendWindow = s.peerParams.InitialMaxStreamDataUni
  1269. } else {
  1270. if id.InitiatedBy() == s.perspective {
  1271. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiRemote
  1272. } else {
  1273. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiLocal
  1274. }
  1275. }
  1276. }
  1277. return flowcontrol.NewStreamFlowController(
  1278. id,
  1279. s.connFlowController,
  1280. protocol.InitialMaxStreamData,
  1281. protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
  1282. initialSendWindow,
  1283. s.onHasStreamWindowUpdate,
  1284. s.rttStats,
  1285. s.logger,
  1286. )
  1287. }
  1288. // scheduleSending signals that we have data for sending
  1289. func (s *session) scheduleSending() {
  1290. select {
  1291. case s.sendingScheduled <- struct{}{}:
  1292. default:
  1293. }
  1294. }
  1295. func (s *session) tryQueueingUndecryptablePacket(p *receivedPacket) {
  1296. if s.handshakeComplete {
  1297. s.logger.Debugf("Received undecryptable packet from %s after the handshake (%d bytes)", p.remoteAddr.String(), len(p.data))
  1298. return
  1299. }
  1300. if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets {
  1301. s.logger.Infof("Dropping undecrytable packet (%d bytes). Undecryptable packet queue full.", len(p.data))
  1302. return
  1303. }
  1304. s.logger.Infof("Queueing packet (%d bytes) for later decryption", len(p.data))
  1305. s.undecryptablePackets = append(s.undecryptablePackets, p)
  1306. }
  1307. func (s *session) tryDecryptingQueuedPackets() {
  1308. for _, p := range s.undecryptablePackets {
  1309. s.handlePacket(p)
  1310. }
  1311. s.undecryptablePackets = s.undecryptablePackets[:0]
  1312. }
  1313. func (s *session) queueControlFrame(f wire.Frame) {
  1314. s.framer.QueueControlFrame(f)
  1315. s.scheduleSending()
  1316. }
  1317. func (s *session) onHasStreamWindowUpdate(id protocol.StreamID) {
  1318. s.windowUpdateQueue.AddStream(id)
  1319. s.scheduleSending()
  1320. }
  1321. func (s *session) onHasConnectionWindowUpdate() {
  1322. s.windowUpdateQueue.AddConnection()
  1323. s.scheduleSending()
  1324. }
  1325. func (s *session) onHasStreamData(id protocol.StreamID) {
  1326. s.framer.AddActiveStream(id)
  1327. s.scheduleSending()
  1328. }
  1329. func (s *session) onStreamCompleted(id protocol.StreamID) {
  1330. if err := s.streamsMap.DeleteStream(id); err != nil {
  1331. s.closeLocal(err)
  1332. }
  1333. }
  1334. func (s *session) LocalAddr() net.Addr {
  1335. return s.conn.LocalAddr()
  1336. }
  1337. func (s *session) RemoteAddr() net.Addr {
  1338. return s.conn.RemoteAddr()
  1339. }
  1340. func (s *session) getPerspective() protocol.Perspective {
  1341. return s.perspective
  1342. }
  1343. func (s *session) GetVersion() protocol.VersionNumber {
  1344. return s.version
  1345. }