connection.go 81 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482
  1. package quic
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "reflect"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. tls "github.com/Psiphon-Labs/psiphon-tls"
  14. "github.com/Psiphon-Labs/quic-go/internal/ackhandler"
  15. "github.com/Psiphon-Labs/quic-go/internal/flowcontrol"
  16. "github.com/Psiphon-Labs/quic-go/internal/handshake"
  17. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  18. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  19. "github.com/Psiphon-Labs/quic-go/internal/utils"
  20. "github.com/Psiphon-Labs/quic-go/internal/wire"
  21. "github.com/Psiphon-Labs/quic-go/logging"
  22. )
  23. type unpacker interface {
  24. UnpackLongHeader(hdr *wire.Header, data []byte) (*unpackedPacket, error)
  25. UnpackShortHeader(rcvTime time.Time, data []byte) (protocol.PacketNumber, protocol.PacketNumberLen, protocol.KeyPhaseBit, []byte, error)
  26. }
  27. type streamManager interface {
  28. GetOrOpenSendStream(protocol.StreamID) (sendStreamI, error)
  29. GetOrOpenReceiveStream(protocol.StreamID) (receiveStreamI, error)
  30. OpenStream() (Stream, error)
  31. OpenUniStream() (SendStream, error)
  32. OpenStreamSync(context.Context) (Stream, error)
  33. OpenUniStreamSync(context.Context) (SendStream, error)
  34. AcceptStream(context.Context) (Stream, error)
  35. AcceptUniStream(context.Context) (ReceiveStream, error)
  36. DeleteStream(protocol.StreamID) error
  37. UpdateLimits(*wire.TransportParameters)
  38. HandleMaxStreamsFrame(*wire.MaxStreamsFrame)
  39. CloseWithError(error)
  40. ResetFor0RTT()
  41. UseResetMaps()
  42. }
  43. type cryptoStreamHandler interface {
  44. StartHandshake(context.Context) error
  45. ChangeConnectionID(protocol.ConnectionID)
  46. SetLargest1RTTAcked(protocol.PacketNumber) error
  47. SetHandshakeConfirmed()
  48. GetSessionTicket() ([]byte, error)
  49. NextEvent() handshake.Event
  50. DiscardInitialKeys()
  51. HandleMessage([]byte, protocol.EncryptionLevel) error
  52. io.Closer
  53. ConnectionState() handshake.ConnectionState
  54. // [Psiphon]
  55. TLSConnectionMetrics() tls.ConnectionMetrics
  56. }
  57. type receivedPacket struct {
  58. buffer *packetBuffer
  59. remoteAddr net.Addr
  60. rcvTime time.Time
  61. data []byte
  62. ecn protocol.ECN
  63. info packetInfo // only valid if the contained IP address is valid
  64. }
  65. func (p *receivedPacket) Size() protocol.ByteCount { return protocol.ByteCount(len(p.data)) }
  66. func (p *receivedPacket) Clone() *receivedPacket {
  67. return &receivedPacket{
  68. remoteAddr: p.remoteAddr,
  69. rcvTime: p.rcvTime,
  70. data: p.data,
  71. buffer: p.buffer,
  72. ecn: p.ecn,
  73. info: p.info,
  74. }
  75. }
  76. type connRunner interface {
  77. Add(protocol.ConnectionID, packetHandler) bool
  78. Retire(protocol.ConnectionID)
  79. Remove(protocol.ConnectionID)
  80. ReplaceWithClosed([]protocol.ConnectionID, []byte)
  81. AddResetToken(protocol.StatelessResetToken, packetHandler)
  82. RemoveResetToken(protocol.StatelessResetToken)
  83. }
  84. type closeError struct {
  85. err error
  86. remote bool
  87. immediate bool
  88. }
  89. type errCloseForRecreating struct {
  90. nextPacketNumber protocol.PacketNumber
  91. nextVersion protocol.Version
  92. }
  93. func (e *errCloseForRecreating) Error() string {
  94. return "closing connection in order to recreate it"
  95. }
  96. var connTracingID atomic.Uint64 // to be accessed atomically
  97. func nextConnTracingID() ConnectionTracingID { return ConnectionTracingID(connTracingID.Add(1)) }
  98. // A Connection is a QUIC connection
  99. type connection struct {
  100. // Destination connection ID used during the handshake.
  101. // Used to check source connection ID on incoming packets.
  102. handshakeDestConnID protocol.ConnectionID
  103. // Set for the client. Destination connection ID used on the first Initial sent.
  104. origDestConnID protocol.ConnectionID
  105. retrySrcConnID *protocol.ConnectionID // only set for the client (and if a Retry was performed)
  106. srcConnIDLen int
  107. perspective protocol.Perspective
  108. version protocol.Version
  109. config *Config
  110. conn sendConn
  111. sendQueue sender
  112. streamsMap streamManager
  113. connIDManager *connIDManager
  114. connIDGenerator *connIDGenerator
  115. rttStats *utils.RTTStats
  116. cryptoStreamManager *cryptoStreamManager
  117. sentPacketHandler ackhandler.SentPacketHandler
  118. receivedPacketHandler ackhandler.ReceivedPacketHandler
  119. retransmissionQueue *retransmissionQueue
  120. framer *framer
  121. connFlowController flowcontrol.ConnectionFlowController
  122. tokenStoreKey string // only set for the client
  123. tokenGenerator *handshake.TokenGenerator // only set for the server
  124. unpacker unpacker
  125. frameParser wire.FrameParser
  126. packer packer
  127. mtuDiscoverer mtuDiscoverer // initialized when the transport parameters are received
  128. maxPayloadSizeEstimate atomic.Uint32
  129. initialStream *cryptoStream
  130. handshakeStream *cryptoStream
  131. oneRTTStream *cryptoStream // only set for the server
  132. cryptoStreamHandler cryptoStreamHandler
  133. receivedPackets chan receivedPacket
  134. sendingScheduled chan struct{}
  135. closeOnce sync.Once
  136. // closeChan is used to notify the run loop that it should terminate
  137. closeChan chan closeError
  138. ctx context.Context
  139. ctxCancel context.CancelCauseFunc
  140. handshakeCompleteChan chan struct{}
  141. undecryptablePackets []receivedPacket // undecryptable packets, waiting for a change in encryption level
  142. undecryptablePacketsToProcess []receivedPacket
  143. earlyConnReadyChan chan struct{}
  144. sentFirstPacket bool
  145. droppedInitialKeys bool
  146. handshakeComplete bool
  147. handshakeConfirmed bool
  148. receivedRetry bool
  149. versionNegotiated bool
  150. receivedFirstPacket bool
  151. // the minimum of the max_idle_timeout values advertised by both endpoints
  152. idleTimeout time.Duration
  153. creationTime time.Time
  154. // The idle timeout is set based on the max of the time we received the last packet...
  155. lastPacketReceivedTime time.Time
  156. // ... and the time we sent a new ack-eliciting packet after receiving a packet.
  157. firstAckElicitingPacketAfterIdleSentTime time.Time
  158. // pacingDeadline is the time when the next packet should be sent
  159. pacingDeadline time.Time
  160. peerParams *wire.TransportParameters
  161. timer connectionTimer
  162. // keepAlivePingSent stores whether a keep alive PING is in flight.
  163. // It is reset as soon as we receive a packet from the peer.
  164. keepAlivePingSent bool
  165. keepAliveInterval time.Duration
  166. datagramQueue *datagramQueue
  167. connStateMutex sync.Mutex
  168. connState ConnectionState
  169. logID string
  170. tracer *logging.ConnectionTracer
  171. logger utils.Logger
  172. }
  173. var (
  174. _ Connection = &connection{}
  175. _ EarlyConnection = &connection{}
  176. _ streamSender = &connection{}
  177. )
  178. var newConnection = func(
  179. ctx context.Context,
  180. ctxCancel context.CancelCauseFunc,
  181. conn sendConn,
  182. runner connRunner,
  183. origDestConnID protocol.ConnectionID,
  184. retrySrcConnID *protocol.ConnectionID,
  185. clientDestConnID protocol.ConnectionID,
  186. destConnID protocol.ConnectionID,
  187. srcConnID protocol.ConnectionID,
  188. connIDGenerator ConnectionIDGenerator,
  189. statelessResetter *statelessResetter,
  190. conf *Config,
  191. tlsConf *tls.Config,
  192. tokenGenerator *handshake.TokenGenerator,
  193. clientAddressValidated bool,
  194. tracer *logging.ConnectionTracer,
  195. logger utils.Logger,
  196. v protocol.Version,
  197. ) quicConn {
  198. s := &connection{
  199. ctx: ctx,
  200. ctxCancel: ctxCancel,
  201. conn: conn,
  202. config: conf,
  203. handshakeDestConnID: destConnID,
  204. srcConnIDLen: srcConnID.Len(),
  205. tokenGenerator: tokenGenerator,
  206. oneRTTStream: newCryptoStream(),
  207. perspective: protocol.PerspectiveServer,
  208. tracer: tracer,
  209. logger: logger,
  210. version: v,
  211. }
  212. if origDestConnID.Len() > 0 {
  213. s.logID = origDestConnID.String()
  214. } else {
  215. s.logID = destConnID.String()
  216. }
  217. s.connIDManager = newConnIDManager(
  218. destConnID,
  219. func(token protocol.StatelessResetToken) { runner.AddResetToken(token, s) },
  220. runner.RemoveResetToken,
  221. s.queueControlFrame,
  222. )
  223. s.connIDGenerator = newConnIDGenerator(
  224. srcConnID,
  225. &clientDestConnID,
  226. func(connID protocol.ConnectionID) { runner.Add(connID, s) },
  227. statelessResetter,
  228. runner.Remove,
  229. runner.Retire,
  230. runner.ReplaceWithClosed,
  231. s.queueControlFrame,
  232. connIDGenerator,
  233. )
  234. s.preSetup()
  235. // [Psiphon]
  236. initialMaxDatagramSize := protocol.ByteCount(s.config.InitialPacketSize)
  237. if conf.ServerMaxPacketSizeAdjustment != nil {
  238. maxPacketSizeAdjustment := protocol.ByteCount(conf.ServerMaxPacketSizeAdjustment(s.RemoteAddr()))
  239. if initialMaxDatagramSize > maxPacketSizeAdjustment {
  240. initialMaxDatagramSize -= maxPacketSizeAdjustment
  241. }
  242. }
  243. s.sentPacketHandler, s.receivedPacketHandler = ackhandler.NewAckHandler(
  244. 0,
  245. // [Psiphon]
  246. // protocol.ByteCount(s.config.InitialPacketSize),
  247. initialMaxDatagramSize,
  248. s.rttStats,
  249. clientAddressValidated,
  250. s.conn.capabilities().ECN,
  251. s.perspective,
  252. s.tracer,
  253. s.logger,
  254. )
  255. // [Psiphon]
  256. // s.maxPayloadSizeEstimate.Store(uint32(estimateMaxPayloadSize(protocol.ByteCount(s.config.InitialPacketSize))))
  257. s.maxPayloadSizeEstimate.Store(uint32(estimateMaxPayloadSize(initialMaxDatagramSize)))
  258. statelessResetToken := statelessResetter.GetStatelessResetToken(srcConnID)
  259. params := &wire.TransportParameters{
  260. InitialMaxStreamDataBidiLocal: protocol.ByteCount(s.config.InitialStreamReceiveWindow),
  261. InitialMaxStreamDataBidiRemote: protocol.ByteCount(s.config.InitialStreamReceiveWindow),
  262. InitialMaxStreamDataUni: protocol.ByteCount(s.config.InitialStreamReceiveWindow),
  263. InitialMaxData: protocol.ByteCount(s.config.InitialConnectionReceiveWindow),
  264. MaxIdleTimeout: s.config.MaxIdleTimeout,
  265. MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams),
  266. MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams),
  267. MaxAckDelay: protocol.MaxAckDelayInclGranularity,
  268. AckDelayExponent: protocol.AckDelayExponent,
  269. MaxUDPPayloadSize: protocol.MaxPacketBufferSize,
  270. DisableActiveMigration: true,
  271. StatelessResetToken: &statelessResetToken,
  272. OriginalDestinationConnectionID: origDestConnID,
  273. // For interoperability with quic-go versions before May 2023, this value must be set to a value
  274. // different from protocol.DefaultActiveConnectionIDLimit.
  275. // If set to the default value, it will be omitted from the transport parameters, which will make
  276. // old quic-go versions interpret it as 0, instead of the default value of 2.
  277. // See https://github.com/quic-go/quic-go/pull/3806.
  278. ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs,
  279. InitialSourceConnectionID: srcConnID,
  280. RetrySourceConnectionID: retrySrcConnID,
  281. }
  282. if s.config.EnableDatagrams {
  283. params.MaxDatagramFrameSize = wire.MaxDatagramSize
  284. } else {
  285. params.MaxDatagramFrameSize = protocol.InvalidByteCount
  286. }
  287. if s.tracer != nil && s.tracer.SentTransportParameters != nil {
  288. s.tracer.SentTransportParameters(params)
  289. }
  290. cs := handshake.NewCryptoSetupServer(
  291. clientDestConnID,
  292. conn.LocalAddr(),
  293. conn.RemoteAddr(),
  294. params,
  295. tlsConf,
  296. conf.Allow0RTT,
  297. s.rttStats,
  298. tracer,
  299. logger,
  300. s.version,
  301. )
  302. s.cryptoStreamHandler = cs
  303. s.packer = newPacketPacker(srcConnID, s.connIDManager.Get, s.initialStream, s.handshakeStream, s.sentPacketHandler, s.retransmissionQueue, cs, s.framer, s.receivedPacketHandler, s.datagramQueue, s.perspective)
  304. s.unpacker = newPacketUnpacker(cs, s.srcConnIDLen)
  305. s.cryptoStreamManager = newCryptoStreamManager(s.initialStream, s.handshakeStream, s.oneRTTStream)
  306. return s
  307. }
  308. // declare this as a variable, such that we can it mock it in the tests
  309. var newClientConnection = func(
  310. ctx context.Context,
  311. conn sendConn,
  312. runner connRunner,
  313. destConnID protocol.ConnectionID,
  314. srcConnID protocol.ConnectionID,
  315. connIDGenerator ConnectionIDGenerator,
  316. statelessResetter *statelessResetter,
  317. conf *Config,
  318. tlsConf *tls.Config,
  319. initialPacketNumber protocol.PacketNumber,
  320. enable0RTT bool,
  321. hasNegotiatedVersion bool,
  322. tracer *logging.ConnectionTracer,
  323. logger utils.Logger,
  324. v protocol.Version,
  325. ) quicConn {
  326. s := &connection{
  327. conn: conn,
  328. config: conf,
  329. origDestConnID: destConnID,
  330. handshakeDestConnID: destConnID,
  331. srcConnIDLen: srcConnID.Len(),
  332. perspective: protocol.PerspectiveClient,
  333. logID: destConnID.String(),
  334. logger: logger,
  335. tracer: tracer,
  336. versionNegotiated: hasNegotiatedVersion,
  337. version: v,
  338. }
  339. s.connIDManager = newConnIDManager(
  340. destConnID,
  341. func(token protocol.StatelessResetToken) { runner.AddResetToken(token, s) },
  342. runner.RemoveResetToken,
  343. s.queueControlFrame,
  344. )
  345. s.connIDGenerator = newConnIDGenerator(
  346. srcConnID,
  347. nil,
  348. func(connID protocol.ConnectionID) { runner.Add(connID, s) },
  349. statelessResetter,
  350. runner.Remove,
  351. runner.Retire,
  352. runner.ReplaceWithClosed,
  353. s.queueControlFrame,
  354. connIDGenerator,
  355. )
  356. s.ctx, s.ctxCancel = context.WithCancelCause(ctx)
  357. s.preSetup()
  358. // [Psiphon]
  359. initialMaxDatagramSize := protocol.ByteCount(s.config.InitialPacketSize)
  360. maxPacketSizeAdjustment := protocol.ByteCount(conf.ClientMaxPacketSizeAdjustment)
  361. if initialMaxDatagramSize > maxPacketSizeAdjustment {
  362. initialMaxDatagramSize -= maxPacketSizeAdjustment
  363. }
  364. s.sentPacketHandler, s.receivedPacketHandler = ackhandler.NewAckHandler(
  365. initialPacketNumber,
  366. // [Psiphon]
  367. // protocol.ByteCount(s.config.InitialPacketSize),
  368. initialMaxDatagramSize,
  369. s.rttStats,
  370. false, // has no effect
  371. s.conn.capabilities().ECN,
  372. s.perspective,
  373. s.tracer,
  374. s.logger,
  375. )
  376. // [Psiphon]
  377. // s.maxPayloadSizeEstimate.Store(uint32(estimateMaxPayloadSize(protocol.ByteCount(s.config.InitialPacketSize))))
  378. s.maxPayloadSizeEstimate.Store(uint32(estimateMaxPayloadSize(initialMaxDatagramSize)))
  379. oneRTTStream := newCryptoStream()
  380. params := &wire.TransportParameters{
  381. InitialMaxStreamDataBidiRemote: protocol.ByteCount(s.config.InitialStreamReceiveWindow),
  382. InitialMaxStreamDataBidiLocal: protocol.ByteCount(s.config.InitialStreamReceiveWindow),
  383. InitialMaxStreamDataUni: protocol.ByteCount(s.config.InitialStreamReceiveWindow),
  384. InitialMaxData: protocol.ByteCount(s.config.InitialConnectionReceiveWindow),
  385. MaxIdleTimeout: s.config.MaxIdleTimeout,
  386. MaxBidiStreamNum: protocol.StreamNum(s.config.MaxIncomingStreams),
  387. MaxUniStreamNum: protocol.StreamNum(s.config.MaxIncomingUniStreams),
  388. MaxAckDelay: protocol.MaxAckDelayInclGranularity,
  389. MaxUDPPayloadSize: protocol.MaxPacketBufferSize,
  390. AckDelayExponent: protocol.AckDelayExponent,
  391. DisableActiveMigration: true,
  392. // For interoperability with quic-go versions before May 2023, this value must be set to a value
  393. // different from protocol.DefaultActiveConnectionIDLimit.
  394. // If set to the default value, it will be omitted from the transport parameters, which will make
  395. // old quic-go versions interpret it as 0, instead of the default value of 2.
  396. // See https://github.com/quic-go/quic-go/pull/3806.
  397. ActiveConnectionIDLimit: protocol.MaxActiveConnectionIDs,
  398. InitialSourceConnectionID: srcConnID,
  399. }
  400. if s.config.EnableDatagrams {
  401. params.MaxDatagramFrameSize = wire.MaxDatagramSize
  402. } else {
  403. params.MaxDatagramFrameSize = protocol.InvalidByteCount
  404. }
  405. if s.tracer != nil && s.tracer.SentTransportParameters != nil {
  406. s.tracer.SentTransportParameters(params)
  407. }
  408. cs := handshake.NewCryptoSetupClient(
  409. destConnID,
  410. params,
  411. tlsConf,
  412. // [Psiphon]
  413. conf.ClientHelloSeed,
  414. conf.GetClientHelloRandom,
  415. enable0RTT,
  416. s.rttStats,
  417. tracer,
  418. logger,
  419. s.version,
  420. )
  421. s.cryptoStreamHandler = cs
  422. s.cryptoStreamManager = newCryptoStreamManager(s.initialStream, s.handshakeStream, oneRTTStream)
  423. s.unpacker = newPacketUnpacker(cs, s.srcConnIDLen)
  424. s.packer = newPacketPacker(srcConnID, s.connIDManager.Get, s.initialStream, s.handshakeStream, s.sentPacketHandler, s.retransmissionQueue, cs, s.framer, s.receivedPacketHandler, s.datagramQueue, s.perspective)
  425. if len(tlsConf.ServerName) > 0 {
  426. s.tokenStoreKey = tlsConf.ServerName
  427. } else {
  428. s.tokenStoreKey = conn.RemoteAddr().String()
  429. }
  430. if s.config.TokenStore != nil {
  431. if token := s.config.TokenStore.Pop(s.tokenStoreKey); token != nil {
  432. s.packer.SetToken(token.data)
  433. }
  434. }
  435. return s
  436. }
  437. func (s *connection) preSetup() {
  438. s.initialStream = newCryptoStream()
  439. s.handshakeStream = newCryptoStream()
  440. s.sendQueue = newSendQueue(s.conn)
  441. s.retransmissionQueue = newRetransmissionQueue()
  442. s.frameParser = *wire.NewFrameParser(s.config.EnableDatagrams)
  443. s.rttStats = &utils.RTTStats{}
  444. s.connFlowController = flowcontrol.NewConnectionFlowController(
  445. protocol.ByteCount(s.config.InitialConnectionReceiveWindow),
  446. protocol.ByteCount(s.config.MaxConnectionReceiveWindow),
  447. func(size protocol.ByteCount) bool {
  448. if s.config.AllowConnectionWindowIncrease == nil {
  449. return true
  450. }
  451. return s.config.AllowConnectionWindowIncrease(s, uint64(size))
  452. },
  453. s.rttStats,
  454. s.logger,
  455. )
  456. s.earlyConnReadyChan = make(chan struct{})
  457. s.streamsMap = newStreamsMap(
  458. s.ctx,
  459. s,
  460. s.queueControlFrame,
  461. s.newFlowController,
  462. uint64(s.config.MaxIncomingStreams),
  463. uint64(s.config.MaxIncomingUniStreams),
  464. s.perspective,
  465. )
  466. s.framer = newFramer(s.connFlowController)
  467. s.receivedPackets = make(chan receivedPacket, protocol.MaxConnUnprocessedPackets)
  468. s.closeChan = make(chan closeError, 1)
  469. s.sendingScheduled = make(chan struct{}, 1)
  470. s.handshakeCompleteChan = make(chan struct{})
  471. now := time.Now()
  472. s.lastPacketReceivedTime = now
  473. s.creationTime = now
  474. s.datagramQueue = newDatagramQueue(s.scheduleSending, s.logger)
  475. s.connState.Version = s.version
  476. }
  477. // run the connection main loop
  478. func (s *connection) run() error {
  479. var closeErr closeError
  480. defer func() { s.ctxCancel(closeErr.err) }()
  481. defer func() {
  482. // Drain queued packets that will never be processed.
  483. for {
  484. select {
  485. case p, ok := <-s.receivedPackets:
  486. if !ok {
  487. return
  488. }
  489. p.buffer.Decrement()
  490. p.buffer.MaybeRelease()
  491. default:
  492. return
  493. }
  494. }
  495. }()
  496. s.timer = *newTimer()
  497. if err := s.cryptoStreamHandler.StartHandshake(s.ctx); err != nil {
  498. return err
  499. }
  500. if err := s.handleHandshakeEvents(time.Now()); err != nil {
  501. return err
  502. }
  503. go func() {
  504. if err := s.sendQueue.Run(); err != nil {
  505. s.destroyImpl(err)
  506. }
  507. }()
  508. if s.perspective == protocol.PerspectiveClient {
  509. s.scheduleSending() // so the ClientHello actually gets sent
  510. }
  511. var sendQueueAvailable <-chan struct{}
  512. runLoop:
  513. for {
  514. if s.framer.QueuedTooManyControlFrames() {
  515. s.closeLocal(&qerr.TransportError{ErrorCode: InternalError})
  516. }
  517. // Close immediately if requested
  518. select {
  519. case closeErr = <-s.closeChan:
  520. break runLoop
  521. default:
  522. }
  523. s.maybeResetTimer()
  524. var processedUndecryptablePacket bool
  525. if len(s.undecryptablePacketsToProcess) > 0 {
  526. queue := s.undecryptablePacketsToProcess
  527. s.undecryptablePacketsToProcess = nil
  528. for _, p := range queue {
  529. if processed := s.handlePacketImpl(p); processed {
  530. processedUndecryptablePacket = true
  531. }
  532. // Don't set timers and send packets if the packet made us close the connection.
  533. select {
  534. case closeErr = <-s.closeChan:
  535. break runLoop
  536. default:
  537. }
  538. }
  539. }
  540. // If we processed any undecryptable packets, jump to the resetting of the timers directly.
  541. if !processedUndecryptablePacket {
  542. select {
  543. case closeErr = <-s.closeChan:
  544. break runLoop
  545. case <-s.timer.Chan():
  546. s.timer.SetRead()
  547. // We do all the interesting stuff after the switch statement, so
  548. // nothing to see here.
  549. case <-s.sendingScheduled:
  550. // We do all the interesting stuff after the switch statement, so
  551. // nothing to see here.
  552. case <-sendQueueAvailable:
  553. case firstPacket := <-s.receivedPackets:
  554. wasProcessed := s.handlePacketImpl(firstPacket)
  555. // Don't set timers and send packets if the packet made us close the connection.
  556. select {
  557. case closeErr = <-s.closeChan:
  558. break runLoop
  559. default:
  560. }
  561. if s.handshakeComplete {
  562. // Now process all packets in the receivedPackets channel.
  563. // Limit the number of packets to the length of the receivedPackets channel,
  564. // so we eventually get a chance to send out an ACK when receiving a lot of packets.
  565. numPackets := len(s.receivedPackets)
  566. receiveLoop:
  567. for i := 0; i < numPackets; i++ {
  568. select {
  569. case p := <-s.receivedPackets:
  570. if processed := s.handlePacketImpl(p); processed {
  571. wasProcessed = true
  572. }
  573. select {
  574. case closeErr = <-s.closeChan:
  575. break runLoop
  576. default:
  577. }
  578. default:
  579. break receiveLoop
  580. }
  581. }
  582. }
  583. // Only reset the timers if this packet was actually processed.
  584. // This avoids modifying any state when handling undecryptable packets,
  585. // which could be injected by an attacker.
  586. if !wasProcessed {
  587. continue
  588. }
  589. }
  590. }
  591. now := time.Now()
  592. if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) {
  593. // This could cause packets to be retransmitted.
  594. // Check it before trying to send packets.
  595. if err := s.sentPacketHandler.OnLossDetectionTimeout(now); err != nil {
  596. s.closeLocal(err)
  597. }
  598. }
  599. if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) {
  600. // send a PING frame since there is no activity in the connection
  601. s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.")
  602. s.framer.QueueControlFrame(&wire.PingFrame{})
  603. s.keepAlivePingSent = true
  604. } else if !s.handshakeComplete && now.Sub(s.creationTime) >= s.config.handshakeTimeout() {
  605. s.destroyImpl(qerr.ErrHandshakeTimeout)
  606. continue
  607. } else {
  608. idleTimeoutStartTime := s.idleTimeoutStartTime()
  609. if (!s.handshakeComplete && now.Sub(idleTimeoutStartTime) >= s.config.HandshakeIdleTimeout) ||
  610. (s.handshakeComplete && now.After(s.nextIdleTimeoutTime())) {
  611. s.destroyImpl(qerr.ErrIdleTimeout)
  612. continue
  613. }
  614. }
  615. if s.sendQueue.WouldBlock() {
  616. // The send queue is still busy sending out packets.
  617. // Wait until there's space to enqueue new packets.
  618. sendQueueAvailable = s.sendQueue.Available()
  619. continue
  620. }
  621. if err := s.triggerSending(now); err != nil {
  622. s.closeLocal(err)
  623. }
  624. if s.sendQueue.WouldBlock() {
  625. sendQueueAvailable = s.sendQueue.Available()
  626. } else {
  627. sendQueueAvailable = nil
  628. }
  629. }
  630. s.cryptoStreamHandler.Close()
  631. s.sendQueue.Close() // close the send queue before sending the CONNECTION_CLOSE
  632. s.handleCloseError(&closeErr)
  633. if s.tracer != nil && s.tracer.Close != nil {
  634. if e := (&errCloseForRecreating{}); !errors.As(closeErr.err, &e) {
  635. s.tracer.Close()
  636. }
  637. }
  638. s.logger.Infof("Connection %s closed.", s.logID)
  639. s.timer.Stop()
  640. return closeErr.err
  641. }
  642. // blocks until the early connection can be used
  643. func (s *connection) earlyConnReady() <-chan struct{} {
  644. return s.earlyConnReadyChan
  645. }
  646. func (s *connection) HandshakeComplete() <-chan struct{} {
  647. return s.handshakeCompleteChan
  648. }
  649. func (s *connection) Context() context.Context {
  650. return s.ctx
  651. }
  652. func (s *connection) supportsDatagrams() bool {
  653. return s.peerParams.MaxDatagramFrameSize > 0
  654. }
  655. func (s *connection) ConnectionState() ConnectionState {
  656. s.connStateMutex.Lock()
  657. defer s.connStateMutex.Unlock()
  658. cs := s.cryptoStreamHandler.ConnectionState()
  659. s.connState.TLS = cs.ConnectionState
  660. s.connState.Used0RTT = cs.Used0RTT
  661. s.connState.GSO = s.conn.capabilities().GSO
  662. return s.connState
  663. }
  664. // [Psiphon]
  665. func (s *connection) TLSConnectionMetrics() tls.ConnectionMetrics {
  666. s.connStateMutex.Lock()
  667. defer s.connStateMutex.Unlock()
  668. return s.cryptoStreamHandler.TLSConnectionMetrics()
  669. }
  670. // Time when the connection should time out
  671. func (s *connection) nextIdleTimeoutTime() time.Time {
  672. idleTimeout := max(s.idleTimeout, s.rttStats.PTO(true)*3)
  673. return s.idleTimeoutStartTime().Add(idleTimeout)
  674. }
  675. // Time when the next keep-alive packet should be sent.
  676. // It returns a zero time if no keep-alive should be sent.
  677. func (s *connection) nextKeepAliveTime() time.Time {
  678. if s.config.KeepAlivePeriod == 0 || s.keepAlivePingSent {
  679. return time.Time{}
  680. }
  681. keepAliveInterval := max(s.keepAliveInterval, s.rttStats.PTO(true)*3/2)
  682. return s.lastPacketReceivedTime.Add(keepAliveInterval)
  683. }
  684. func (s *connection) maybeResetTimer() {
  685. var deadline time.Time
  686. if !s.handshakeComplete {
  687. deadline = s.creationTime.Add(s.config.handshakeTimeout())
  688. if t := s.idleTimeoutStartTime().Add(s.config.HandshakeIdleTimeout); t.Before(deadline) {
  689. deadline = t
  690. }
  691. } else {
  692. if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() {
  693. deadline = keepAliveTime
  694. } else {
  695. deadline = s.nextIdleTimeoutTime()
  696. }
  697. }
  698. s.timer.SetTimer(
  699. deadline,
  700. s.receivedPacketHandler.GetAlarmTimeout(),
  701. s.sentPacketHandler.GetLossDetectionTimeout(),
  702. s.pacingDeadline,
  703. )
  704. }
  705. func (s *connection) idleTimeoutStartTime() time.Time {
  706. startTime := s.lastPacketReceivedTime
  707. if t := s.firstAckElicitingPacketAfterIdleSentTime; t.After(startTime) {
  708. startTime = t
  709. }
  710. return startTime
  711. }
  712. func (s *connection) handleHandshakeComplete(now time.Time) error {
  713. defer close(s.handshakeCompleteChan)
  714. // Once the handshake completes, we have derived 1-RTT keys.
  715. // There's no point in queueing undecryptable packets for later decryption anymore.
  716. s.undecryptablePackets = nil
  717. s.connIDManager.SetHandshakeComplete()
  718. s.connIDGenerator.SetHandshakeComplete()
  719. if s.tracer != nil && s.tracer.ChoseALPN != nil {
  720. s.tracer.ChoseALPN(s.cryptoStreamHandler.ConnectionState().NegotiatedProtocol)
  721. }
  722. // The server applies transport parameters right away, but the client side has to wait for handshake completion.
  723. // During a 0-RTT connection, the client is only allowed to use the new transport parameters for 1-RTT packets.
  724. if s.perspective == protocol.PerspectiveClient {
  725. s.applyTransportParameters()
  726. return nil
  727. }
  728. // All these only apply to the server side.
  729. if err := s.handleHandshakeConfirmed(now); err != nil {
  730. return err
  731. }
  732. ticket, err := s.cryptoStreamHandler.GetSessionTicket()
  733. if err != nil {
  734. return err
  735. }
  736. if ticket != nil { // may be nil if session tickets are disabled via tls.Config.SessionTicketsDisabled
  737. s.oneRTTStream.Write(ticket)
  738. for s.oneRTTStream.HasData() {
  739. s.queueControlFrame(s.oneRTTStream.PopCryptoFrame(protocol.MaxPostHandshakeCryptoFrameSize))
  740. }
  741. }
  742. token, err := s.tokenGenerator.NewToken(s.conn.RemoteAddr())
  743. if err != nil {
  744. return err
  745. }
  746. s.queueControlFrame(&wire.NewTokenFrame{Token: token})
  747. s.queueControlFrame(&wire.HandshakeDoneFrame{})
  748. return nil
  749. }
  750. func (s *connection) handleHandshakeConfirmed(now time.Time) error {
  751. if err := s.dropEncryptionLevel(protocol.EncryptionHandshake, now); err != nil {
  752. return err
  753. }
  754. s.handshakeConfirmed = true
  755. s.cryptoStreamHandler.SetHandshakeConfirmed()
  756. if !s.config.DisablePathMTUDiscovery && s.conn.capabilities().DF {
  757. s.mtuDiscoverer.Start(now)
  758. }
  759. return nil
  760. }
  761. func (s *connection) handlePacketImpl(rp receivedPacket) bool {
  762. s.sentPacketHandler.ReceivedBytes(rp.Size(), rp.rcvTime)
  763. if wire.IsVersionNegotiationPacket(rp.data) {
  764. s.handleVersionNegotiationPacket(rp)
  765. return false
  766. }
  767. var counter uint8
  768. var lastConnID protocol.ConnectionID
  769. var processed bool
  770. data := rp.data
  771. p := rp
  772. for len(data) > 0 {
  773. if counter > 0 {
  774. p = *(p.Clone())
  775. p.data = data
  776. destConnID, err := wire.ParseConnectionID(p.data, s.srcConnIDLen)
  777. if err != nil {
  778. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  779. s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), logging.PacketDropHeaderParseError)
  780. }
  781. s.logger.Debugf("error parsing packet, couldn't parse connection ID: %s", err)
  782. break
  783. }
  784. if destConnID != lastConnID {
  785. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  786. s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), logging.PacketDropUnknownConnectionID)
  787. }
  788. s.logger.Debugf("coalesced packet has different destination connection ID: %s, expected %s", destConnID, lastConnID)
  789. break
  790. }
  791. }
  792. if wire.IsLongHeaderPacket(p.data[0]) {
  793. hdr, packetData, rest, err := wire.ParsePacket(p.data)
  794. if err != nil {
  795. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  796. dropReason := logging.PacketDropHeaderParseError
  797. if err == wire.ErrUnsupportedVersion {
  798. dropReason = logging.PacketDropUnsupportedVersion
  799. }
  800. s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), dropReason)
  801. }
  802. s.logger.Debugf("error parsing packet: %s", err)
  803. break
  804. }
  805. lastConnID = hdr.DestConnectionID
  806. if hdr.Version != s.version {
  807. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  808. s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedVersion)
  809. }
  810. s.logger.Debugf("Dropping packet with version %x. Expected %x.", hdr.Version, s.version)
  811. break
  812. }
  813. if counter > 0 {
  814. p.buffer.Split()
  815. }
  816. counter++
  817. // only log if this actually a coalesced packet
  818. if s.logger.Debug() && (counter > 1 || len(rest) > 0) {
  819. s.logger.Debugf("Parsed a coalesced packet. Part %d: %d bytes. Remaining: %d bytes.", counter, len(packetData), len(rest))
  820. }
  821. p.data = packetData
  822. if wasProcessed := s.handleLongHeaderPacket(p, hdr); wasProcessed {
  823. processed = true
  824. }
  825. data = rest
  826. } else {
  827. if counter > 0 {
  828. p.buffer.Split()
  829. }
  830. if wasProcessed := s.handleShortHeaderPacket(p); wasProcessed {
  831. processed = true
  832. }
  833. break
  834. }
  835. }
  836. p.buffer.MaybeRelease()
  837. return processed
  838. }
  839. func (s *connection) handleShortHeaderPacket(p receivedPacket) bool {
  840. var wasQueued bool
  841. defer func() {
  842. // Put back the packet buffer if the packet wasn't queued for later decryption.
  843. if !wasQueued {
  844. p.buffer.Decrement()
  845. }
  846. }()
  847. destConnID, err := wire.ParseConnectionID(p.data, s.srcConnIDLen)
  848. if err != nil {
  849. s.tracer.DroppedPacket(logging.PacketType1RTT, protocol.InvalidPacketNumber, protocol.ByteCount(len(p.data)), logging.PacketDropHeaderParseError)
  850. return false
  851. }
  852. pn, pnLen, keyPhase, data, err := s.unpacker.UnpackShortHeader(p.rcvTime, p.data)
  853. if err != nil {
  854. wasQueued = s.handleUnpackError(err, p, logging.PacketType1RTT)
  855. return false
  856. }
  857. if s.logger.Debug() {
  858. s.logger.Debugf("<- Reading packet %d (%d bytes) for connection %s, 1-RTT", pn, p.Size(), destConnID)
  859. wire.LogShortHeader(s.logger, destConnID, pn, pnLen, keyPhase)
  860. }
  861. if s.receivedPacketHandler.IsPotentiallyDuplicate(pn, protocol.Encryption1RTT) {
  862. s.logger.Debugf("Dropping (potentially) duplicate packet.")
  863. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  864. s.tracer.DroppedPacket(logging.PacketType1RTT, pn, p.Size(), logging.PacketDropDuplicate)
  865. }
  866. return false
  867. }
  868. var log func([]logging.Frame)
  869. if s.tracer != nil && s.tracer.ReceivedShortHeaderPacket != nil {
  870. log = func(frames []logging.Frame) {
  871. s.tracer.ReceivedShortHeaderPacket(
  872. &logging.ShortHeader{
  873. DestConnectionID: destConnID,
  874. PacketNumber: pn,
  875. PacketNumberLen: pnLen,
  876. KeyPhase: keyPhase,
  877. },
  878. p.Size(),
  879. p.ecn,
  880. frames,
  881. )
  882. }
  883. }
  884. if err := s.handleUnpackedShortHeaderPacket(destConnID, pn, data, p.ecn, p.rcvTime, log); err != nil {
  885. s.closeLocal(err)
  886. return false
  887. }
  888. return true
  889. }
  890. func (s *connection) handleLongHeaderPacket(p receivedPacket, hdr *wire.Header) bool /* was the packet successfully processed */ {
  891. var wasQueued bool
  892. defer func() {
  893. // Put back the packet buffer if the packet wasn't queued for later decryption.
  894. if !wasQueued {
  895. p.buffer.Decrement()
  896. }
  897. }()
  898. if hdr.Type == protocol.PacketTypeRetry {
  899. return s.handleRetryPacket(hdr, p.data, p.rcvTime)
  900. }
  901. // The server can change the source connection ID with the first Handshake packet.
  902. // After this, all packets with a different source connection have to be ignored.
  903. if s.receivedFirstPacket && hdr.Type == protocol.PacketTypeInitial && hdr.SrcConnectionID != s.handshakeDestConnID {
  904. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  905. s.tracer.DroppedPacket(logging.PacketTypeInitial, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropUnknownConnectionID)
  906. }
  907. s.logger.Debugf("Dropping Initial packet (%d bytes) with unexpected source connection ID: %s (expected %s)", p.Size(), hdr.SrcConnectionID, s.handshakeDestConnID)
  908. return false
  909. }
  910. // drop 0-RTT packets, if we are a client
  911. if s.perspective == protocol.PerspectiveClient && hdr.Type == protocol.PacketType0RTT {
  912. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  913. s.tracer.DroppedPacket(logging.PacketType0RTT, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropUnexpectedPacket)
  914. }
  915. return false
  916. }
  917. packet, err := s.unpacker.UnpackLongHeader(hdr, p.data)
  918. if err != nil {
  919. wasQueued = s.handleUnpackError(err, p, logging.PacketTypeFromHeader(hdr))
  920. return false
  921. }
  922. if s.logger.Debug() {
  923. s.logger.Debugf("<- Reading packet %d (%d bytes) for connection %s, %s", packet.hdr.PacketNumber, p.Size(), hdr.DestConnectionID, packet.encryptionLevel)
  924. packet.hdr.Log(s.logger)
  925. }
  926. if pn := packet.hdr.PacketNumber; s.receivedPacketHandler.IsPotentiallyDuplicate(pn, packet.encryptionLevel) {
  927. s.logger.Debugf("Dropping (potentially) duplicate packet.")
  928. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  929. s.tracer.DroppedPacket(logging.PacketTypeFromHeader(hdr), pn, p.Size(), logging.PacketDropDuplicate)
  930. }
  931. return false
  932. }
  933. if err := s.handleUnpackedLongHeaderPacket(packet, p.ecn, p.rcvTime, p.Size()); err != nil {
  934. s.closeLocal(err)
  935. return false
  936. }
  937. return true
  938. }
  939. func (s *connection) handleUnpackError(err error, p receivedPacket, pt logging.PacketType) (wasQueued bool) {
  940. switch err {
  941. case handshake.ErrKeysDropped:
  942. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  943. s.tracer.DroppedPacket(pt, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropKeyUnavailable)
  944. }
  945. s.logger.Debugf("Dropping %s packet (%d bytes) because we already dropped the keys.", pt, p.Size())
  946. case handshake.ErrKeysNotYetAvailable:
  947. // Sealer for this encryption level not yet available.
  948. // Try again later.
  949. s.tryQueueingUndecryptablePacket(p, pt)
  950. return true
  951. case wire.ErrInvalidReservedBits:
  952. s.closeLocal(&qerr.TransportError{
  953. ErrorCode: qerr.ProtocolViolation,
  954. ErrorMessage: err.Error(),
  955. })
  956. case handshake.ErrDecryptionFailed:
  957. // This might be a packet injected by an attacker. Drop it.
  958. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  959. s.tracer.DroppedPacket(pt, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropPayloadDecryptError)
  960. }
  961. s.logger.Debugf("Dropping %s packet (%d bytes) that could not be unpacked. Error: %s", pt, p.Size(), err)
  962. default:
  963. var headerErr *headerParseError
  964. if errors.As(err, &headerErr) {
  965. // This might be a packet injected by an attacker. Drop it.
  966. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  967. s.tracer.DroppedPacket(pt, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropHeaderParseError)
  968. }
  969. s.logger.Debugf("Dropping %s packet (%d bytes) for which we couldn't unpack the header. Error: %s", pt, p.Size(), err)
  970. } else {
  971. // This is an error returned by the AEAD (other than ErrDecryptionFailed).
  972. // For example, a PROTOCOL_VIOLATION due to key updates.
  973. s.closeLocal(err)
  974. }
  975. }
  976. return false
  977. }
  978. func (s *connection) handleRetryPacket(hdr *wire.Header, data []byte, rcvTime time.Time) bool /* was this a valid Retry */ {
  979. if s.perspective == protocol.PerspectiveServer {
  980. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  981. s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket)
  982. }
  983. s.logger.Debugf("Ignoring Retry.")
  984. return false
  985. }
  986. if s.receivedFirstPacket {
  987. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  988. s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket)
  989. }
  990. s.logger.Debugf("Ignoring Retry, since we already received a packet.")
  991. return false
  992. }
  993. destConnID := s.connIDManager.Get()
  994. if hdr.SrcConnectionID == destConnID {
  995. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  996. s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), logging.PacketDropUnexpectedPacket)
  997. }
  998. s.logger.Debugf("Ignoring Retry, since the server didn't change the Source Connection ID.")
  999. return false
  1000. }
  1001. // If a token is already set, this means that we already received a Retry from the server.
  1002. // Ignore this Retry packet.
  1003. if s.receivedRetry {
  1004. s.logger.Debugf("Ignoring Retry, since a Retry was already received.")
  1005. return false
  1006. }
  1007. tag := handshake.GetRetryIntegrityTag(data[:len(data)-16], destConnID, hdr.Version)
  1008. if !bytes.Equal(data[len(data)-16:], tag[:]) {
  1009. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  1010. s.tracer.DroppedPacket(logging.PacketTypeRetry, protocol.InvalidPacketNumber, protocol.ByteCount(len(data)), logging.PacketDropPayloadDecryptError)
  1011. }
  1012. s.logger.Debugf("Ignoring spoofed Retry. Integrity Tag doesn't match.")
  1013. return false
  1014. }
  1015. newDestConnID := hdr.SrcConnectionID
  1016. s.receivedRetry = true
  1017. s.sentPacketHandler.ResetForRetry(rcvTime)
  1018. s.handshakeDestConnID = newDestConnID
  1019. s.retrySrcConnID = &newDestConnID
  1020. s.cryptoStreamHandler.ChangeConnectionID(newDestConnID)
  1021. s.packer.SetToken(hdr.Token)
  1022. s.connIDManager.ChangeInitialConnID(newDestConnID)
  1023. if s.logger.Debug() {
  1024. s.logger.Debugf("<- Received Retry:")
  1025. (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger)
  1026. s.logger.Debugf("Switching destination connection ID to: %s", hdr.SrcConnectionID)
  1027. }
  1028. if s.tracer != nil && s.tracer.ReceivedRetry != nil {
  1029. s.tracer.ReceivedRetry(hdr)
  1030. }
  1031. s.scheduleSending()
  1032. return true
  1033. }
  1034. func (s *connection) handleVersionNegotiationPacket(p receivedPacket) {
  1035. if s.perspective == protocol.PerspectiveServer || // servers never receive version negotiation packets
  1036. s.receivedFirstPacket || s.versionNegotiated { // ignore delayed / duplicated version negotiation packets
  1037. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  1038. s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropUnexpectedPacket)
  1039. }
  1040. return
  1041. }
  1042. src, dest, supportedVersions, err := wire.ParseVersionNegotiationPacket(p.data)
  1043. if err != nil {
  1044. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  1045. s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropHeaderParseError)
  1046. }
  1047. s.logger.Debugf("Error parsing Version Negotiation packet: %s", err)
  1048. return
  1049. }
  1050. for _, v := range supportedVersions {
  1051. if v == s.version {
  1052. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  1053. s.tracer.DroppedPacket(logging.PacketTypeVersionNegotiation, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropUnexpectedVersion)
  1054. }
  1055. // The Version Negotiation packet contains the version that we offered.
  1056. // This might be a packet sent by an attacker, or it was corrupted.
  1057. return
  1058. }
  1059. }
  1060. s.logger.Infof("Received a Version Negotiation packet. Supported Versions: %s", supportedVersions)
  1061. if s.tracer != nil && s.tracer.ReceivedVersionNegotiationPacket != nil {
  1062. s.tracer.ReceivedVersionNegotiationPacket(dest, src, supportedVersions)
  1063. }
  1064. newVersion, ok := protocol.ChooseSupportedVersion(s.config.Versions, supportedVersions)
  1065. if !ok {
  1066. s.destroyImpl(&VersionNegotiationError{
  1067. Ours: s.config.Versions,
  1068. Theirs: supportedVersions,
  1069. })
  1070. s.logger.Infof("No compatible QUIC version found.")
  1071. return
  1072. }
  1073. if s.tracer != nil && s.tracer.NegotiatedVersion != nil {
  1074. s.tracer.NegotiatedVersion(newVersion, s.config.Versions, supportedVersions)
  1075. }
  1076. s.logger.Infof("Switching to QUIC version %s.", newVersion)
  1077. nextPN, _ := s.sentPacketHandler.PeekPacketNumber(protocol.EncryptionInitial)
  1078. s.destroyImpl(&errCloseForRecreating{
  1079. nextPacketNumber: nextPN,
  1080. nextVersion: newVersion,
  1081. })
  1082. }
  1083. func (s *connection) handleUnpackedLongHeaderPacket(
  1084. packet *unpackedPacket,
  1085. ecn protocol.ECN,
  1086. rcvTime time.Time,
  1087. packetSize protocol.ByteCount, // only for logging
  1088. ) error {
  1089. if !s.receivedFirstPacket {
  1090. s.receivedFirstPacket = true
  1091. if !s.versionNegotiated && s.tracer != nil && s.tracer.NegotiatedVersion != nil {
  1092. var clientVersions, serverVersions []protocol.Version
  1093. switch s.perspective {
  1094. case protocol.PerspectiveClient:
  1095. clientVersions = s.config.Versions
  1096. case protocol.PerspectiveServer:
  1097. serverVersions = s.config.Versions
  1098. }
  1099. s.tracer.NegotiatedVersion(s.version, clientVersions, serverVersions)
  1100. }
  1101. // The server can change the source connection ID with the first Handshake packet.
  1102. if s.perspective == protocol.PerspectiveClient && packet.hdr.SrcConnectionID != s.handshakeDestConnID {
  1103. cid := packet.hdr.SrcConnectionID
  1104. s.logger.Debugf("Received first packet. Switching destination connection ID to: %s", cid)
  1105. s.handshakeDestConnID = cid
  1106. s.connIDManager.ChangeInitialConnID(cid)
  1107. }
  1108. // We create the connection as soon as we receive the first packet from the client.
  1109. // We do that before authenticating the packet.
  1110. // That means that if the source connection ID was corrupted,
  1111. // we might have created a connection with an incorrect source connection ID.
  1112. // Once we authenticate the first packet, we need to update it.
  1113. if s.perspective == protocol.PerspectiveServer {
  1114. if packet.hdr.SrcConnectionID != s.handshakeDestConnID {
  1115. s.handshakeDestConnID = packet.hdr.SrcConnectionID
  1116. s.connIDManager.ChangeInitialConnID(packet.hdr.SrcConnectionID)
  1117. }
  1118. if s.tracer != nil && s.tracer.StartedConnection != nil {
  1119. s.tracer.StartedConnection(
  1120. s.conn.LocalAddr(),
  1121. s.conn.RemoteAddr(),
  1122. packet.hdr.SrcConnectionID,
  1123. packet.hdr.DestConnectionID,
  1124. )
  1125. }
  1126. }
  1127. }
  1128. if s.perspective == protocol.PerspectiveServer && packet.encryptionLevel == protocol.EncryptionHandshake &&
  1129. !s.droppedInitialKeys {
  1130. // On the server side, Initial keys are dropped as soon as the first Handshake packet is received.
  1131. // See Section 4.9.1 of RFC 9001.
  1132. if err := s.dropEncryptionLevel(protocol.EncryptionInitial, rcvTime); err != nil {
  1133. return err
  1134. }
  1135. }
  1136. s.lastPacketReceivedTime = rcvTime
  1137. s.firstAckElicitingPacketAfterIdleSentTime = time.Time{}
  1138. s.keepAlivePingSent = false
  1139. var log func([]logging.Frame)
  1140. if s.tracer != nil && s.tracer.ReceivedLongHeaderPacket != nil {
  1141. log = func(frames []logging.Frame) {
  1142. s.tracer.ReceivedLongHeaderPacket(packet.hdr, packetSize, ecn, frames)
  1143. }
  1144. }
  1145. isAckEliciting, err := s.handleFrames(packet.data, packet.hdr.DestConnectionID, packet.encryptionLevel, log, rcvTime)
  1146. if err != nil {
  1147. return err
  1148. }
  1149. return s.receivedPacketHandler.ReceivedPacket(packet.hdr.PacketNumber, ecn, packet.encryptionLevel, rcvTime, isAckEliciting)
  1150. }
  1151. func (s *connection) handleUnpackedShortHeaderPacket(
  1152. destConnID protocol.ConnectionID,
  1153. pn protocol.PacketNumber,
  1154. data []byte,
  1155. ecn protocol.ECN,
  1156. rcvTime time.Time,
  1157. log func([]logging.Frame),
  1158. ) error {
  1159. s.lastPacketReceivedTime = rcvTime
  1160. s.firstAckElicitingPacketAfterIdleSentTime = time.Time{}
  1161. s.keepAlivePingSent = false
  1162. isAckEliciting, err := s.handleFrames(data, destConnID, protocol.Encryption1RTT, log, rcvTime)
  1163. if err != nil {
  1164. return err
  1165. }
  1166. return s.receivedPacketHandler.ReceivedPacket(pn, ecn, protocol.Encryption1RTT, rcvTime, isAckEliciting)
  1167. }
  1168. func (s *connection) handleFrames(
  1169. data []byte,
  1170. destConnID protocol.ConnectionID,
  1171. encLevel protocol.EncryptionLevel,
  1172. log func([]logging.Frame),
  1173. rcvTime time.Time,
  1174. ) (isAckEliciting bool, _ error) {
  1175. // Only used for tracing.
  1176. // If we're not tracing, this slice will always remain empty.
  1177. var frames []logging.Frame
  1178. if log != nil {
  1179. frames = make([]logging.Frame, 0, 4)
  1180. }
  1181. handshakeWasComplete := s.handshakeComplete
  1182. var handleErr error
  1183. for len(data) > 0 {
  1184. l, frame, err := s.frameParser.ParseNext(data, encLevel, s.version)
  1185. if err != nil {
  1186. return false, err
  1187. }
  1188. data = data[l:]
  1189. if frame == nil {
  1190. break
  1191. }
  1192. if ackhandler.IsFrameAckEliciting(frame) {
  1193. isAckEliciting = true
  1194. }
  1195. if log != nil {
  1196. frames = append(frames, toLoggingFrame(frame))
  1197. }
  1198. // An error occurred handling a previous frame.
  1199. // Don't handle the current frame.
  1200. if handleErr != nil {
  1201. continue
  1202. }
  1203. if err := s.handleFrame(frame, encLevel, destConnID, rcvTime); err != nil {
  1204. if log == nil {
  1205. return false, err
  1206. }
  1207. // If we're logging, we need to keep parsing (but not handling) all frames.
  1208. handleErr = err
  1209. }
  1210. }
  1211. if log != nil {
  1212. log(frames)
  1213. if handleErr != nil {
  1214. return false, handleErr
  1215. }
  1216. }
  1217. // Handle completion of the handshake after processing all the frames.
  1218. // This ensures that we correctly handle the following case on the server side:
  1219. // We receive a Handshake packet that contains the CRYPTO frame that allows us to complete the handshake,
  1220. // and an ACK serialized after that CRYPTO frame. In this case, we still want to process the ACK frame.
  1221. if !handshakeWasComplete && s.handshakeComplete {
  1222. if err := s.handleHandshakeComplete(rcvTime); err != nil {
  1223. return false, err
  1224. }
  1225. }
  1226. return
  1227. }
  1228. func (s *connection) handleFrame(
  1229. f wire.Frame,
  1230. encLevel protocol.EncryptionLevel,
  1231. destConnID protocol.ConnectionID,
  1232. rcvTime time.Time,
  1233. ) error {
  1234. var err error
  1235. wire.LogFrame(s.logger, f, false)
  1236. switch frame := f.(type) {
  1237. case *wire.CryptoFrame:
  1238. err = s.handleCryptoFrame(frame, encLevel, rcvTime)
  1239. case *wire.StreamFrame:
  1240. err = s.handleStreamFrame(frame, rcvTime)
  1241. case *wire.AckFrame:
  1242. err = s.handleAckFrame(frame, encLevel, rcvTime)
  1243. case *wire.ConnectionCloseFrame:
  1244. s.handleConnectionCloseFrame(frame)
  1245. case *wire.ResetStreamFrame:
  1246. err = s.handleResetStreamFrame(frame, rcvTime)
  1247. case *wire.MaxDataFrame:
  1248. s.handleMaxDataFrame(frame)
  1249. case *wire.MaxStreamDataFrame:
  1250. err = s.handleMaxStreamDataFrame(frame)
  1251. case *wire.MaxStreamsFrame:
  1252. s.handleMaxStreamsFrame(frame)
  1253. case *wire.DataBlockedFrame:
  1254. case *wire.StreamDataBlockedFrame:
  1255. err = s.handleStreamDataBlockedFrame(frame)
  1256. case *wire.StreamsBlockedFrame:
  1257. case *wire.StopSendingFrame:
  1258. err = s.handleStopSendingFrame(frame)
  1259. case *wire.PingFrame:
  1260. case *wire.PathChallengeFrame:
  1261. s.handlePathChallengeFrame(frame)
  1262. case *wire.PathResponseFrame:
  1263. // since we don't send PATH_CHALLENGEs, we don't expect PATH_RESPONSEs
  1264. err = &qerr.TransportError{
  1265. ErrorCode: qerr.ProtocolViolation,
  1266. ErrorMessage: "unexpected PATH_RESPONSE frame",
  1267. }
  1268. case *wire.NewTokenFrame:
  1269. err = s.handleNewTokenFrame(frame)
  1270. case *wire.NewConnectionIDFrame:
  1271. err = s.handleNewConnectionIDFrame(frame)
  1272. case *wire.RetireConnectionIDFrame:
  1273. err = s.handleRetireConnectionIDFrame(frame, destConnID)
  1274. case *wire.HandshakeDoneFrame:
  1275. err = s.handleHandshakeDoneFrame(rcvTime)
  1276. case *wire.DatagramFrame:
  1277. err = s.handleDatagramFrame(frame)
  1278. default:
  1279. err = fmt.Errorf("unexpected frame type: %s", reflect.ValueOf(&frame).Elem().Type().Name())
  1280. }
  1281. return err
  1282. }
  1283. // handlePacket is called by the server with a new packet
  1284. func (s *connection) handlePacket(p receivedPacket) {
  1285. // Discard packets once the amount of queued packets is larger than
  1286. // the channel size, protocol.MaxConnUnprocessedPackets
  1287. select {
  1288. case s.receivedPackets <- p:
  1289. default:
  1290. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  1291. s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropDOSPrevention)
  1292. }
  1293. }
  1294. }
  1295. func (s *connection) handleConnectionCloseFrame(frame *wire.ConnectionCloseFrame) {
  1296. if frame.IsApplicationError {
  1297. s.closeRemote(&qerr.ApplicationError{
  1298. Remote: true,
  1299. ErrorCode: qerr.ApplicationErrorCode(frame.ErrorCode),
  1300. ErrorMessage: frame.ReasonPhrase,
  1301. })
  1302. return
  1303. }
  1304. s.closeRemote(&qerr.TransportError{
  1305. Remote: true,
  1306. ErrorCode: qerr.TransportErrorCode(frame.ErrorCode),
  1307. FrameType: frame.FrameType,
  1308. ErrorMessage: frame.ReasonPhrase,
  1309. })
  1310. }
  1311. func (s *connection) handleCryptoFrame(frame *wire.CryptoFrame, encLevel protocol.EncryptionLevel, rcvTime time.Time) error {
  1312. if err := s.cryptoStreamManager.HandleCryptoFrame(frame, encLevel); err != nil {
  1313. return err
  1314. }
  1315. for {
  1316. data := s.cryptoStreamManager.GetCryptoData(encLevel)
  1317. if data == nil {
  1318. break
  1319. }
  1320. if err := s.cryptoStreamHandler.HandleMessage(data, encLevel); err != nil {
  1321. return err
  1322. }
  1323. }
  1324. return s.handleHandshakeEvents(rcvTime)
  1325. }
  1326. func (s *connection) handleHandshakeEvents(now time.Time) error {
  1327. for {
  1328. ev := s.cryptoStreamHandler.NextEvent()
  1329. var err error
  1330. switch ev.Kind {
  1331. case handshake.EventNoEvent:
  1332. return nil
  1333. case handshake.EventHandshakeComplete:
  1334. // Don't call handleHandshakeComplete yet.
  1335. // It's advantageous to process ACK frames that might be serialized after the CRYPTO frame first.
  1336. s.handshakeComplete = true
  1337. case handshake.EventReceivedTransportParameters:
  1338. err = s.handleTransportParameters(ev.TransportParameters)
  1339. case handshake.EventRestoredTransportParameters:
  1340. s.restoreTransportParameters(ev.TransportParameters)
  1341. close(s.earlyConnReadyChan)
  1342. case handshake.EventReceivedReadKeys:
  1343. // Queue all packets for decryption that have been undecryptable so far.
  1344. s.undecryptablePacketsToProcess = s.undecryptablePackets
  1345. s.undecryptablePackets = nil
  1346. case handshake.EventDiscard0RTTKeys:
  1347. err = s.dropEncryptionLevel(protocol.Encryption0RTT, now)
  1348. case handshake.EventWriteInitialData:
  1349. _, err = s.initialStream.Write(ev.Data)
  1350. case handshake.EventWriteHandshakeData:
  1351. _, err = s.handshakeStream.Write(ev.Data)
  1352. }
  1353. if err != nil {
  1354. return err
  1355. }
  1356. }
  1357. }
  1358. func (s *connection) handleStreamFrame(frame *wire.StreamFrame, rcvTime time.Time) error {
  1359. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  1360. if err != nil {
  1361. return err
  1362. }
  1363. if str == nil { // stream was already closed and garbage collected
  1364. return nil
  1365. }
  1366. return str.handleStreamFrame(frame, rcvTime)
  1367. }
  1368. func (s *connection) handleMaxDataFrame(frame *wire.MaxDataFrame) {
  1369. s.connFlowController.UpdateSendWindow(frame.MaximumData)
  1370. }
  1371. func (s *connection) handleMaxStreamDataFrame(frame *wire.MaxStreamDataFrame) error {
  1372. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  1373. if err != nil {
  1374. return err
  1375. }
  1376. if str == nil {
  1377. // stream is closed and already garbage collected
  1378. return nil
  1379. }
  1380. str.updateSendWindow(frame.MaximumStreamData)
  1381. return nil
  1382. }
  1383. func (s *connection) handleStreamDataBlockedFrame(frame *wire.StreamDataBlockedFrame) error {
  1384. // We don't need to do anything in response to a STREAM_DATA_BLOCKED frame,
  1385. // but we need to make sure that the stream ID is valid.
  1386. _, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  1387. return err
  1388. }
  1389. func (s *connection) handleMaxStreamsFrame(frame *wire.MaxStreamsFrame) {
  1390. s.streamsMap.HandleMaxStreamsFrame(frame)
  1391. }
  1392. func (s *connection) handleResetStreamFrame(frame *wire.ResetStreamFrame, rcvTime time.Time) error {
  1393. str, err := s.streamsMap.GetOrOpenReceiveStream(frame.StreamID)
  1394. if err != nil {
  1395. return err
  1396. }
  1397. if str == nil {
  1398. // stream is closed and already garbage collected
  1399. return nil
  1400. }
  1401. return str.handleResetStreamFrame(frame, rcvTime)
  1402. }
  1403. func (s *connection) handleStopSendingFrame(frame *wire.StopSendingFrame) error {
  1404. str, err := s.streamsMap.GetOrOpenSendStream(frame.StreamID)
  1405. if err != nil {
  1406. return err
  1407. }
  1408. if str == nil {
  1409. // stream is closed and already garbage collected
  1410. return nil
  1411. }
  1412. str.handleStopSendingFrame(frame)
  1413. return nil
  1414. }
  1415. func (s *connection) handlePathChallengeFrame(frame *wire.PathChallengeFrame) {
  1416. s.queueControlFrame(&wire.PathResponseFrame{Data: frame.Data})
  1417. }
  1418. func (s *connection) handleNewTokenFrame(frame *wire.NewTokenFrame) error {
  1419. if s.perspective == protocol.PerspectiveServer {
  1420. return &qerr.TransportError{
  1421. ErrorCode: qerr.ProtocolViolation,
  1422. ErrorMessage: "received NEW_TOKEN frame from the client",
  1423. }
  1424. }
  1425. if s.config.TokenStore != nil {
  1426. s.config.TokenStore.Put(s.tokenStoreKey, &ClientToken{data: frame.Token})
  1427. }
  1428. return nil
  1429. }
  1430. func (s *connection) handleNewConnectionIDFrame(f *wire.NewConnectionIDFrame) error {
  1431. return s.connIDManager.Add(f)
  1432. }
  1433. func (s *connection) handleRetireConnectionIDFrame(f *wire.RetireConnectionIDFrame, destConnID protocol.ConnectionID) error {
  1434. return s.connIDGenerator.Retire(f.SequenceNumber, destConnID)
  1435. }
  1436. func (s *connection) handleHandshakeDoneFrame(rcvTime time.Time) error {
  1437. if s.perspective == protocol.PerspectiveServer {
  1438. return &qerr.TransportError{
  1439. ErrorCode: qerr.ProtocolViolation,
  1440. ErrorMessage: "received a HANDSHAKE_DONE frame",
  1441. }
  1442. }
  1443. if !s.handshakeConfirmed {
  1444. return s.handleHandshakeConfirmed(rcvTime)
  1445. }
  1446. return nil
  1447. }
  1448. func (s *connection) handleAckFrame(frame *wire.AckFrame, encLevel protocol.EncryptionLevel, rcvTime time.Time) error {
  1449. acked1RTTPacket, err := s.sentPacketHandler.ReceivedAck(frame, encLevel, s.lastPacketReceivedTime)
  1450. if err != nil {
  1451. return err
  1452. }
  1453. if !acked1RTTPacket {
  1454. return nil
  1455. }
  1456. // On the client side: If the packet acknowledged a 1-RTT packet, this confirms the handshake.
  1457. // This is only possible if the ACK was sent in a 1-RTT packet.
  1458. // This is an optimization over simply waiting for a HANDSHAKE_DONE frame, see section 4.1.2 of RFC 9001.
  1459. if s.perspective == protocol.PerspectiveClient && !s.handshakeConfirmed {
  1460. if err := s.handleHandshakeConfirmed(rcvTime); err != nil {
  1461. return err
  1462. }
  1463. }
  1464. return s.cryptoStreamHandler.SetLargest1RTTAcked(frame.LargestAcked())
  1465. }
  1466. func (s *connection) handleDatagramFrame(f *wire.DatagramFrame) error {
  1467. if f.Length(s.version) > wire.MaxDatagramSize {
  1468. return &qerr.TransportError{
  1469. ErrorCode: qerr.ProtocolViolation,
  1470. ErrorMessage: "DATAGRAM frame too large",
  1471. }
  1472. }
  1473. s.datagramQueue.HandleDatagramFrame(f)
  1474. return nil
  1475. }
  1476. // closeLocal closes the connection and send a CONNECTION_CLOSE containing the error
  1477. func (s *connection) closeLocal(e error) {
  1478. s.closeOnce.Do(func() {
  1479. if e == nil {
  1480. s.logger.Infof("Closing connection.")
  1481. } else {
  1482. s.logger.Errorf("Closing connection with error: %s", e)
  1483. }
  1484. s.closeChan <- closeError{err: e, immediate: false, remote: false}
  1485. })
  1486. }
  1487. // destroy closes the connection without sending the error on the wire
  1488. func (s *connection) destroy(e error) {
  1489. s.destroyImpl(e)
  1490. <-s.ctx.Done()
  1491. }
  1492. func (s *connection) destroyImpl(e error) {
  1493. s.closeOnce.Do(func() {
  1494. if nerr, ok := e.(net.Error); ok && nerr.Timeout() {
  1495. s.logger.Errorf("Destroying connection: %s", e)
  1496. } else {
  1497. s.logger.Errorf("Destroying connection with error: %s", e)
  1498. }
  1499. s.closeChan <- closeError{err: e, immediate: true, remote: false}
  1500. })
  1501. }
  1502. func (s *connection) closeRemote(e error) {
  1503. s.closeOnce.Do(func() {
  1504. s.logger.Errorf("Peer closed connection with error: %s", e)
  1505. s.closeChan <- closeError{err: e, immediate: true, remote: true}
  1506. })
  1507. }
  1508. func (s *connection) CloseWithError(code ApplicationErrorCode, desc string) error {
  1509. s.closeLocal(&qerr.ApplicationError{
  1510. ErrorCode: code,
  1511. ErrorMessage: desc,
  1512. })
  1513. <-s.ctx.Done()
  1514. return nil
  1515. }
  1516. func (s *connection) closeWithTransportError(code TransportErrorCode) {
  1517. s.closeLocal(&qerr.TransportError{ErrorCode: code})
  1518. <-s.ctx.Done()
  1519. }
  1520. func (s *connection) handleCloseError(closeErr *closeError) {
  1521. e := closeErr.err
  1522. if e == nil {
  1523. e = &qerr.ApplicationError{}
  1524. } else {
  1525. defer func() {
  1526. closeErr.err = e
  1527. }()
  1528. }
  1529. var (
  1530. statelessResetErr *StatelessResetError
  1531. versionNegotiationErr *VersionNegotiationError
  1532. recreateErr *errCloseForRecreating
  1533. applicationErr *ApplicationError
  1534. transportErr *TransportError
  1535. )
  1536. switch {
  1537. case errors.Is(e, qerr.ErrIdleTimeout),
  1538. errors.Is(e, qerr.ErrHandshakeTimeout),
  1539. errors.As(e, &statelessResetErr),
  1540. errors.As(e, &versionNegotiationErr),
  1541. errors.As(e, &recreateErr),
  1542. errors.As(e, &applicationErr),
  1543. errors.As(e, &transportErr):
  1544. case closeErr.immediate:
  1545. e = closeErr.err
  1546. default:
  1547. e = &qerr.TransportError{
  1548. ErrorCode: qerr.InternalError,
  1549. ErrorMessage: e.Error(),
  1550. }
  1551. }
  1552. s.streamsMap.CloseWithError(e)
  1553. if s.datagramQueue != nil {
  1554. s.datagramQueue.CloseWithError(e)
  1555. }
  1556. // In rare instances, the connection ID manager might switch to a new connection ID
  1557. // when sending the CONNECTION_CLOSE frame.
  1558. // The connection ID manager removes the active stateless reset token from the packet
  1559. // handler map when it is closed, so we need to make sure that this happens last.
  1560. defer s.connIDManager.Close()
  1561. if s.tracer != nil && s.tracer.ClosedConnection != nil && !errors.As(e, &recreateErr) {
  1562. s.tracer.ClosedConnection(e)
  1563. }
  1564. // If this is a remote close we're done here
  1565. if closeErr.remote {
  1566. s.connIDGenerator.ReplaceWithClosed(nil)
  1567. return
  1568. }
  1569. if closeErr.immediate {
  1570. s.connIDGenerator.RemoveAll()
  1571. return
  1572. }
  1573. // Don't send out any CONNECTION_CLOSE if this is an error that occurred
  1574. // before we even sent out the first packet.
  1575. if s.perspective == protocol.PerspectiveClient && !s.sentFirstPacket {
  1576. s.connIDGenerator.RemoveAll()
  1577. return
  1578. }
  1579. connClosePacket, err := s.sendConnectionClose(e)
  1580. if err != nil {
  1581. s.logger.Debugf("Error sending CONNECTION_CLOSE: %s", err)
  1582. }
  1583. s.connIDGenerator.ReplaceWithClosed(connClosePacket)
  1584. }
  1585. func (s *connection) dropEncryptionLevel(encLevel protocol.EncryptionLevel, now time.Time) error {
  1586. if s.tracer != nil && s.tracer.DroppedEncryptionLevel != nil {
  1587. s.tracer.DroppedEncryptionLevel(encLevel)
  1588. }
  1589. s.sentPacketHandler.DropPackets(encLevel, now)
  1590. s.receivedPacketHandler.DropPackets(encLevel)
  1591. //nolint:exhaustive // only Initial and 0-RTT need special treatment
  1592. switch encLevel {
  1593. case protocol.EncryptionInitial:
  1594. s.droppedInitialKeys = true
  1595. s.cryptoStreamHandler.DiscardInitialKeys()
  1596. case protocol.Encryption0RTT:
  1597. s.streamsMap.ResetFor0RTT()
  1598. s.framer.Handle0RTTRejection()
  1599. return s.connFlowController.Reset()
  1600. }
  1601. return s.cryptoStreamManager.Drop(encLevel)
  1602. }
  1603. // is called for the client, when restoring transport parameters saved for 0-RTT
  1604. func (s *connection) restoreTransportParameters(params *wire.TransportParameters) {
  1605. if s.logger.Debug() {
  1606. s.logger.Debugf("Restoring Transport Parameters: %s", params)
  1607. }
  1608. s.peerParams = params
  1609. s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit)
  1610. s.connFlowController.UpdateSendWindow(params.InitialMaxData)
  1611. s.streamsMap.UpdateLimits(params)
  1612. s.connStateMutex.Lock()
  1613. s.connState.SupportsDatagrams = s.supportsDatagrams()
  1614. s.connStateMutex.Unlock()
  1615. }
  1616. func (s *connection) handleTransportParameters(params *wire.TransportParameters) error {
  1617. if s.tracer != nil && s.tracer.ReceivedTransportParameters != nil {
  1618. s.tracer.ReceivedTransportParameters(params)
  1619. }
  1620. if err := s.checkTransportParameters(params); err != nil {
  1621. return &qerr.TransportError{
  1622. ErrorCode: qerr.TransportParameterError,
  1623. ErrorMessage: err.Error(),
  1624. }
  1625. }
  1626. if s.perspective == protocol.PerspectiveClient && s.peerParams != nil && s.ConnectionState().Used0RTT && !params.ValidForUpdate(s.peerParams) {
  1627. return &qerr.TransportError{
  1628. ErrorCode: qerr.ProtocolViolation,
  1629. ErrorMessage: "server sent reduced limits after accepting 0-RTT data",
  1630. }
  1631. }
  1632. s.peerParams = params
  1633. // On the client side we have to wait for handshake completion.
  1634. // During a 0-RTT connection, we are only allowed to use the new transport parameters for 1-RTT packets.
  1635. if s.perspective == protocol.PerspectiveServer {
  1636. s.applyTransportParameters()
  1637. // On the server side, the early connection is ready as soon as we processed
  1638. // the client's transport parameters.
  1639. close(s.earlyConnReadyChan)
  1640. }
  1641. s.connStateMutex.Lock()
  1642. s.connState.SupportsDatagrams = s.supportsDatagrams()
  1643. s.connStateMutex.Unlock()
  1644. return nil
  1645. }
  1646. func (s *connection) checkTransportParameters(params *wire.TransportParameters) error {
  1647. if s.logger.Debug() {
  1648. s.logger.Debugf("Processed Transport Parameters: %s", params)
  1649. }
  1650. // check the initial_source_connection_id
  1651. if params.InitialSourceConnectionID != s.handshakeDestConnID {
  1652. return fmt.Errorf("expected initial_source_connection_id to equal %s, is %s", s.handshakeDestConnID, params.InitialSourceConnectionID)
  1653. }
  1654. if s.perspective == protocol.PerspectiveServer {
  1655. return nil
  1656. }
  1657. // check the original_destination_connection_id
  1658. if params.OriginalDestinationConnectionID != s.origDestConnID {
  1659. return fmt.Errorf("expected original_destination_connection_id to equal %s, is %s", s.origDestConnID, params.OriginalDestinationConnectionID)
  1660. }
  1661. if s.retrySrcConnID != nil { // a Retry was performed
  1662. if params.RetrySourceConnectionID == nil {
  1663. return errors.New("missing retry_source_connection_id")
  1664. }
  1665. if *params.RetrySourceConnectionID != *s.retrySrcConnID {
  1666. return fmt.Errorf("expected retry_source_connection_id to equal %s, is %s", s.retrySrcConnID, *params.RetrySourceConnectionID)
  1667. }
  1668. } else if params.RetrySourceConnectionID != nil {
  1669. return errors.New("received retry_source_connection_id, although no Retry was performed")
  1670. }
  1671. return nil
  1672. }
  1673. func (s *connection) applyTransportParameters() {
  1674. params := s.peerParams
  1675. // Our local idle timeout will always be > 0.
  1676. s.idleTimeout = s.config.MaxIdleTimeout
  1677. // If the peer advertised an idle timeout, take the minimum of the values.
  1678. if params.MaxIdleTimeout > 0 {
  1679. s.idleTimeout = min(s.idleTimeout, params.MaxIdleTimeout)
  1680. }
  1681. s.keepAliveInterval = min(s.config.KeepAlivePeriod, s.idleTimeout/2)
  1682. s.streamsMap.UpdateLimits(params)
  1683. s.frameParser.SetAckDelayExponent(params.AckDelayExponent)
  1684. s.connFlowController.UpdateSendWindow(params.InitialMaxData)
  1685. s.rttStats.SetMaxAckDelay(params.MaxAckDelay)
  1686. s.connIDGenerator.SetMaxActiveConnIDs(params.ActiveConnectionIDLimit)
  1687. if params.StatelessResetToken != nil {
  1688. s.connIDManager.SetStatelessResetToken(*params.StatelessResetToken)
  1689. }
  1690. // We don't support connection migration yet, so we don't have any use for the preferred_address.
  1691. if params.PreferredAddress != nil {
  1692. // Retire the connection ID.
  1693. s.connIDManager.AddFromPreferredAddress(params.PreferredAddress.ConnectionID, params.PreferredAddress.StatelessResetToken)
  1694. }
  1695. maxPacketSize := protocol.ByteCount(protocol.MaxPacketBufferSize)
  1696. if params.MaxUDPPayloadSize > 0 && params.MaxUDPPayloadSize < maxPacketSize {
  1697. maxPacketSize = params.MaxUDPPayloadSize
  1698. }
  1699. // [Psiphon] SECTION BEGIN
  1700. // Adjust the max packet sizes to allow for obfuscation overhead.
  1701. maxPacketSizeAdjustment := protocol.ByteCount(0)
  1702. if s.config.ServerMaxPacketSizeAdjustment != nil {
  1703. maxPacketSizeAdjustment = protocol.ByteCount(s.config.ServerMaxPacketSizeAdjustment(s.conn.RemoteAddr()))
  1704. } else {
  1705. maxPacketSizeAdjustment = protocol.ByteCount(s.config.ClientMaxPacketSizeAdjustment)
  1706. }
  1707. if maxPacketSize > maxPacketSizeAdjustment {
  1708. maxPacketSize -= maxPacketSizeAdjustment
  1709. }
  1710. initialMaxPacketSize := protocol.ByteCount(s.config.InitialPacketSize)
  1711. if initialMaxPacketSize > maxPacketSizeAdjustment {
  1712. initialMaxPacketSize -= maxPacketSizeAdjustment
  1713. }
  1714. // [Psiphon] SECTION END
  1715. s.mtuDiscoverer = newMTUDiscoverer(
  1716. s.rttStats,
  1717. // [Psiphon]
  1718. // protocol.ByteCount(s.config.InitialPacketSize),
  1719. initialMaxPacketSize,
  1720. maxPacketSize,
  1721. s.onMTUIncreased,
  1722. s.tracer,
  1723. )
  1724. }
  1725. func (s *connection) triggerSending(now time.Time) error {
  1726. s.pacingDeadline = time.Time{}
  1727. sendMode := s.sentPacketHandler.SendMode(now)
  1728. //nolint:exhaustive // No need to handle pacing limited here.
  1729. switch sendMode {
  1730. case ackhandler.SendAny:
  1731. return s.sendPackets(now)
  1732. case ackhandler.SendNone:
  1733. return nil
  1734. case ackhandler.SendPacingLimited:
  1735. deadline := s.sentPacketHandler.TimeUntilSend()
  1736. if deadline.IsZero() {
  1737. deadline = deadlineSendImmediately
  1738. }
  1739. s.pacingDeadline = deadline
  1740. // Allow sending of an ACK if we're pacing limit.
  1741. // This makes sure that a peer that is mostly receiving data (and thus has an inaccurate cwnd estimate)
  1742. // sends enough ACKs to allow its peer to utilize the bandwidth.
  1743. fallthrough
  1744. case ackhandler.SendAck:
  1745. // We can at most send a single ACK only packet.
  1746. // There will only be a new ACK after receiving new packets.
  1747. // SendAck is only returned when we're congestion limited, so we don't need to set the pacing timer.
  1748. return s.maybeSendAckOnlyPacket(now)
  1749. case ackhandler.SendPTOInitial, ackhandler.SendPTOHandshake, ackhandler.SendPTOAppData:
  1750. if err := s.sendProbePacket(sendMode, now); err != nil {
  1751. return err
  1752. }
  1753. if s.sendQueue.WouldBlock() {
  1754. s.scheduleSending()
  1755. return nil
  1756. }
  1757. return s.triggerSending(now)
  1758. default:
  1759. return fmt.Errorf("BUG: invalid send mode %d", sendMode)
  1760. }
  1761. }
  1762. func (s *connection) sendPackets(now time.Time) error {
  1763. // Path MTU Discovery
  1764. // Can't use GSO, since we need to send a single packet that's larger than our current maximum size.
  1765. // Performance-wise, this doesn't matter, since we only send a very small (<10) number of
  1766. // MTU probe packets per connection.
  1767. if s.handshakeConfirmed && s.mtuDiscoverer != nil && s.mtuDiscoverer.ShouldSendProbe(now) {
  1768. ping, size := s.mtuDiscoverer.GetPing(now)
  1769. p, buf, err := s.packer.PackMTUProbePacket(ping, size, s.version)
  1770. if err != nil {
  1771. return err
  1772. }
  1773. ecn := s.sentPacketHandler.ECNMode(true)
  1774. s.logShortHeaderPacket(p.DestConnID, p.Ack, p.Frames, p.StreamFrames, p.PacketNumber, p.PacketNumberLen, p.KeyPhase, ecn, buf.Len(), false)
  1775. s.registerPackedShortHeaderPacket(p, ecn, now)
  1776. s.sendQueue.Send(buf, 0, ecn)
  1777. // There's (likely) more data to send. Loop around again.
  1778. s.scheduleSending()
  1779. return nil
  1780. }
  1781. if offset := s.connFlowController.GetWindowUpdate(now); offset > 0 {
  1782. s.framer.QueueControlFrame(&wire.MaxDataFrame{MaximumData: offset})
  1783. }
  1784. if cf := s.cryptoStreamManager.GetPostHandshakeData(protocol.MaxPostHandshakeCryptoFrameSize); cf != nil {
  1785. s.queueControlFrame(cf)
  1786. }
  1787. if !s.handshakeConfirmed {
  1788. packet, err := s.packer.PackCoalescedPacket(false, s.maxPacketSize(), now, s.version)
  1789. if err != nil || packet == nil {
  1790. return err
  1791. }
  1792. s.sentFirstPacket = true
  1793. if err := s.sendPackedCoalescedPacket(packet, s.sentPacketHandler.ECNMode(packet.IsOnlyShortHeaderPacket()), now); err != nil {
  1794. return err
  1795. }
  1796. sendMode := s.sentPacketHandler.SendMode(now)
  1797. if sendMode == ackhandler.SendPacingLimited {
  1798. s.resetPacingDeadline()
  1799. } else if sendMode == ackhandler.SendAny {
  1800. s.pacingDeadline = deadlineSendImmediately
  1801. }
  1802. return nil
  1803. }
  1804. if s.conn.capabilities().GSO {
  1805. return s.sendPacketsWithGSO(now)
  1806. }
  1807. return s.sendPacketsWithoutGSO(now)
  1808. }
  1809. func (s *connection) sendPacketsWithoutGSO(now time.Time) error {
  1810. for {
  1811. buf := getPacketBuffer()
  1812. ecn := s.sentPacketHandler.ECNMode(true)
  1813. if _, err := s.appendOneShortHeaderPacket(buf, s.maxPacketSize(), ecn, now); err != nil {
  1814. if err == errNothingToPack {
  1815. buf.Release()
  1816. return nil
  1817. }
  1818. return err
  1819. }
  1820. s.sendQueue.Send(buf, 0, ecn)
  1821. if s.sendQueue.WouldBlock() {
  1822. return nil
  1823. }
  1824. sendMode := s.sentPacketHandler.SendMode(now)
  1825. if sendMode == ackhandler.SendPacingLimited {
  1826. s.resetPacingDeadline()
  1827. return nil
  1828. }
  1829. if sendMode != ackhandler.SendAny {
  1830. return nil
  1831. }
  1832. // Prioritize receiving of packets over sending out more packets.
  1833. if len(s.receivedPackets) > 0 {
  1834. s.pacingDeadline = deadlineSendImmediately
  1835. return nil
  1836. }
  1837. }
  1838. }
  1839. func (s *connection) sendPacketsWithGSO(now time.Time) error {
  1840. buf := getLargePacketBuffer()
  1841. maxSize := s.maxPacketSize()
  1842. ecn := s.sentPacketHandler.ECNMode(true)
  1843. for {
  1844. var dontSendMore bool
  1845. size, err := s.appendOneShortHeaderPacket(buf, maxSize, ecn, now)
  1846. if err != nil {
  1847. if err != errNothingToPack {
  1848. return err
  1849. }
  1850. if buf.Len() == 0 {
  1851. buf.Release()
  1852. return nil
  1853. }
  1854. dontSendMore = true
  1855. }
  1856. if !dontSendMore {
  1857. sendMode := s.sentPacketHandler.SendMode(now)
  1858. if sendMode == ackhandler.SendPacingLimited {
  1859. s.resetPacingDeadline()
  1860. }
  1861. if sendMode != ackhandler.SendAny {
  1862. dontSendMore = true
  1863. }
  1864. }
  1865. // Don't send more packets in this batch if they require a different ECN marking than the previous ones.
  1866. nextECN := s.sentPacketHandler.ECNMode(true)
  1867. // Append another packet if
  1868. // 1. The congestion controller and pacer allow sending more
  1869. // 2. The last packet appended was a full-size packet
  1870. // 3. The next packet will have the same ECN marking
  1871. // 4. We still have enough space for another full-size packet in the buffer
  1872. if !dontSendMore && size == maxSize && nextECN == ecn && buf.Len()+maxSize <= buf.Cap() {
  1873. continue
  1874. }
  1875. s.sendQueue.Send(buf, uint16(maxSize), ecn)
  1876. if dontSendMore {
  1877. return nil
  1878. }
  1879. if s.sendQueue.WouldBlock() {
  1880. return nil
  1881. }
  1882. // Prioritize receiving of packets over sending out more packets.
  1883. if len(s.receivedPackets) > 0 {
  1884. s.pacingDeadline = deadlineSendImmediately
  1885. return nil
  1886. }
  1887. ecn = nextECN
  1888. buf = getLargePacketBuffer()
  1889. }
  1890. }
  1891. func (s *connection) resetPacingDeadline() {
  1892. deadline := s.sentPacketHandler.TimeUntilSend()
  1893. if deadline.IsZero() {
  1894. deadline = deadlineSendImmediately
  1895. }
  1896. s.pacingDeadline = deadline
  1897. }
  1898. func (s *connection) maybeSendAckOnlyPacket(now time.Time) error {
  1899. if !s.handshakeConfirmed {
  1900. ecn := s.sentPacketHandler.ECNMode(false)
  1901. packet, err := s.packer.PackCoalescedPacket(true, s.maxPacketSize(), now, s.version)
  1902. if err != nil {
  1903. return err
  1904. }
  1905. if packet == nil {
  1906. return nil
  1907. }
  1908. return s.sendPackedCoalescedPacket(packet, ecn, now)
  1909. }
  1910. ecn := s.sentPacketHandler.ECNMode(true)
  1911. p, buf, err := s.packer.PackAckOnlyPacket(s.maxPacketSize(), now, s.version)
  1912. if err != nil {
  1913. if err == errNothingToPack {
  1914. return nil
  1915. }
  1916. return err
  1917. }
  1918. s.logShortHeaderPacket(p.DestConnID, p.Ack, p.Frames, p.StreamFrames, p.PacketNumber, p.PacketNumberLen, p.KeyPhase, ecn, buf.Len(), false)
  1919. s.registerPackedShortHeaderPacket(p, ecn, now)
  1920. s.sendQueue.Send(buf, 0, ecn)
  1921. return nil
  1922. }
  1923. func (s *connection) sendProbePacket(sendMode ackhandler.SendMode, now time.Time) error {
  1924. var encLevel protocol.EncryptionLevel
  1925. //nolint:exhaustive // We only need to handle the PTO send modes here.
  1926. switch sendMode {
  1927. case ackhandler.SendPTOInitial:
  1928. encLevel = protocol.EncryptionInitial
  1929. case ackhandler.SendPTOHandshake:
  1930. encLevel = protocol.EncryptionHandshake
  1931. case ackhandler.SendPTOAppData:
  1932. encLevel = protocol.Encryption1RTT
  1933. default:
  1934. return fmt.Errorf("connection BUG: unexpected send mode: %d", sendMode)
  1935. }
  1936. // Queue probe packets until we actually send out a packet,
  1937. // or until there are no more packets to queue.
  1938. var packet *coalescedPacket
  1939. for {
  1940. if wasQueued := s.sentPacketHandler.QueueProbePacket(encLevel); !wasQueued {
  1941. break
  1942. }
  1943. var err error
  1944. packet, err = s.packer.MaybePackProbePacket(encLevel, s.maxPacketSize(), now, s.version)
  1945. if err != nil {
  1946. return err
  1947. }
  1948. if packet != nil {
  1949. break
  1950. }
  1951. }
  1952. if packet == nil {
  1953. s.retransmissionQueue.AddPing(encLevel)
  1954. var err error
  1955. packet, err = s.packer.MaybePackProbePacket(encLevel, s.maxPacketSize(), now, s.version)
  1956. if err != nil {
  1957. return err
  1958. }
  1959. }
  1960. if packet == nil || (len(packet.longHdrPackets) == 0 && packet.shortHdrPacket == nil) {
  1961. return fmt.Errorf("connection BUG: couldn't pack %s probe packet", encLevel)
  1962. }
  1963. return s.sendPackedCoalescedPacket(packet, s.sentPacketHandler.ECNMode(packet.IsOnlyShortHeaderPacket()), now)
  1964. }
  1965. // appendOneShortHeaderPacket appends a new packet to the given packetBuffer.
  1966. // If there was nothing to pack, the returned size is 0.
  1967. func (s *connection) appendOneShortHeaderPacket(buf *packetBuffer, maxSize protocol.ByteCount, ecn protocol.ECN, now time.Time) (protocol.ByteCount, error) {
  1968. startLen := buf.Len()
  1969. p, err := s.packer.AppendPacket(buf, maxSize, now, s.version)
  1970. if err != nil {
  1971. return 0, err
  1972. }
  1973. size := buf.Len() - startLen
  1974. s.logShortHeaderPacket(p.DestConnID, p.Ack, p.Frames, p.StreamFrames, p.PacketNumber, p.PacketNumberLen, p.KeyPhase, ecn, size, false)
  1975. s.registerPackedShortHeaderPacket(p, ecn, now)
  1976. return size, nil
  1977. }
  1978. func (s *connection) registerPackedShortHeaderPacket(p shortHeaderPacket, ecn protocol.ECN, now time.Time) {
  1979. if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && (len(p.StreamFrames) > 0 || ackhandler.HasAckElicitingFrames(p.Frames)) {
  1980. s.firstAckElicitingPacketAfterIdleSentTime = now
  1981. }
  1982. largestAcked := protocol.InvalidPacketNumber
  1983. if p.Ack != nil {
  1984. largestAcked = p.Ack.LargestAcked()
  1985. }
  1986. s.sentPacketHandler.SentPacket(now, p.PacketNumber, largestAcked, p.StreamFrames, p.Frames, protocol.Encryption1RTT, ecn, p.Length, p.IsPathMTUProbePacket)
  1987. s.connIDManager.SentPacket()
  1988. }
  1989. func (s *connection) sendPackedCoalescedPacket(packet *coalescedPacket, ecn protocol.ECN, now time.Time) error {
  1990. s.logCoalescedPacket(packet, ecn)
  1991. for _, p := range packet.longHdrPackets {
  1992. if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && p.IsAckEliciting() {
  1993. s.firstAckElicitingPacketAfterIdleSentTime = now
  1994. }
  1995. largestAcked := protocol.InvalidPacketNumber
  1996. if p.ack != nil {
  1997. largestAcked = p.ack.LargestAcked()
  1998. }
  1999. s.sentPacketHandler.SentPacket(now, p.header.PacketNumber, largestAcked, p.streamFrames, p.frames, p.EncryptionLevel(), ecn, p.length, false)
  2000. if s.perspective == protocol.PerspectiveClient && p.EncryptionLevel() == protocol.EncryptionHandshake &&
  2001. !s.droppedInitialKeys {
  2002. // On the client side, Initial keys are dropped as soon as the first Handshake packet is sent.
  2003. // See Section 4.9.1 of RFC 9001.
  2004. if err := s.dropEncryptionLevel(protocol.EncryptionInitial, now); err != nil {
  2005. return err
  2006. }
  2007. }
  2008. }
  2009. if p := packet.shortHdrPacket; p != nil {
  2010. if s.firstAckElicitingPacketAfterIdleSentTime.IsZero() && p.IsAckEliciting() {
  2011. s.firstAckElicitingPacketAfterIdleSentTime = now
  2012. }
  2013. largestAcked := protocol.InvalidPacketNumber
  2014. if p.Ack != nil {
  2015. largestAcked = p.Ack.LargestAcked()
  2016. }
  2017. s.sentPacketHandler.SentPacket(now, p.PacketNumber, largestAcked, p.StreamFrames, p.Frames, protocol.Encryption1RTT, ecn, p.Length, p.IsPathMTUProbePacket)
  2018. }
  2019. s.connIDManager.SentPacket()
  2020. s.sendQueue.Send(packet.buffer, 0, ecn)
  2021. return nil
  2022. }
  2023. func (s *connection) sendConnectionClose(e error) ([]byte, error) {
  2024. var packet *coalescedPacket
  2025. var err error
  2026. var transportErr *qerr.TransportError
  2027. var applicationErr *qerr.ApplicationError
  2028. if errors.As(e, &transportErr) {
  2029. packet, err = s.packer.PackConnectionClose(transportErr, s.maxPacketSize(), s.version)
  2030. } else if errors.As(e, &applicationErr) {
  2031. packet, err = s.packer.PackApplicationClose(applicationErr, s.maxPacketSize(), s.version)
  2032. } else {
  2033. packet, err = s.packer.PackConnectionClose(&qerr.TransportError{
  2034. ErrorCode: qerr.InternalError,
  2035. ErrorMessage: fmt.Sprintf("connection BUG: unspecified error type (msg: %s)", e.Error()),
  2036. }, s.maxPacketSize(), s.version)
  2037. }
  2038. if err != nil {
  2039. return nil, err
  2040. }
  2041. ecn := s.sentPacketHandler.ECNMode(packet.IsOnlyShortHeaderPacket())
  2042. s.logCoalescedPacket(packet, ecn)
  2043. return packet.buffer.Data, s.conn.Write(packet.buffer.Data, 0, ecn)
  2044. }
  2045. // [Psiphon]
  2046. //
  2047. // Upstream implementation kept for reference.
  2048. //
  2049. // func (s *connection) maxPacketSize() protocol.ByteCount {
  2050. // if s.mtuDiscoverer == nil {
  2051. // // Use the configured packet size on the client side.
  2052. // // If the server sends a max_udp_payload_size that's smaller than this size, we can ignore this:
  2053. // // Apparently the server still processed the (fully padded) Initial packet anyway.
  2054. // if s.perspective == protocol.PerspectiveClient {
  2055. // return protocol.ByteCount(s.config.InitialPacketSize)
  2056. // }
  2057. // // On the server side, there's no downside to using 1200 bytes until we received the client's transport
  2058. // // parameters:
  2059. // // * If the first packet didn't contain the entire ClientHello, all we can do is ACK that packet. We don't
  2060. // // need a lot of bytes for that.
  2061. // // * If it did, we will have processed the transport parameters and initialized the MTU discoverer.
  2062. // return protocol.MinInitialPacketSize
  2063. // }
  2064. // return s.mtuDiscoverer.CurrentSize()
  2065. // }
  2066. func (s *connection) maxPacketSize() protocol.ByteCount {
  2067. // TODO: internal/congestion.pacer continues to use
  2068. // initialMaxDatagramSize = protocol.InitialPacketSize.
  2069. // This seems inconsistent because newCubicSender passes a variable
  2070. // initialMaxDatagramSize, but newPacer still relies on the global constant.
  2071. // However, cubicSender has SetMaxDatagramSize, which updates both
  2072. // cubicSender and newPacer when the max packet size increases (e.g., after MTU discovery).
  2073. // This could be a minor bug in quic-go, as newPacer should ideally use
  2074. // the variable initialMaxDatagramSize passed to newCubicSender.
  2075. // Possible fixes include modifying the newPacer constructor or explicitly
  2076. // calling c.pacer.SetMaxDatagramSize(initialMaxDatagramSize) in newCubicSender.
  2077. if s.mtuDiscoverer == nil {
  2078. // Use the configured packet size on the client side.
  2079. // If the server sends a max_udp_payload_size that's smaller than this size, we can ignore this:
  2080. // Apparently the server still processed the (fully padded) Initial packet anyway.
  2081. if s.perspective == protocol.PerspectiveClient {
  2082. packetSizeAdjustment := protocol.ByteCount(s.config.ClientMaxPacketSizeAdjustment)
  2083. initialMaxPacketSize := protocol.ByteCount(s.config.InitialPacketSize)
  2084. if initialMaxPacketSize > packetSizeAdjustment {
  2085. initialMaxPacketSize -= packetSizeAdjustment
  2086. }
  2087. return initialMaxPacketSize
  2088. }
  2089. // On the server side, there's no downside to using 1200 bytes until we received the client's transport
  2090. // parameters:
  2091. // * If the first packet didn't contain the entire ClientHello, all we can do is ACK that packet. We don't
  2092. // need a lot of bytes for that.
  2093. // * If it did, we will have processed the transport parameters and initialized the MTU discoverer.
  2094. packetSizeAdjustment := protocol.ByteCount(0)
  2095. if s.config.ServerMaxPacketSizeAdjustment != nil {
  2096. packetSizeAdjustment = protocol.ByteCount(s.config.ServerMaxPacketSizeAdjustment(s.conn.RemoteAddr()))
  2097. }
  2098. initialPacketSize := protocol.ByteCount(protocol.MinInitialPacketSize)
  2099. if initialPacketSize > packetSizeAdjustment {
  2100. initialPacketSize -= packetSizeAdjustment
  2101. }
  2102. return initialPacketSize
  2103. }
  2104. return s.mtuDiscoverer.CurrentSize()
  2105. }
  2106. // AcceptStream returns the next stream openend by the peer
  2107. func (s *connection) AcceptStream(ctx context.Context) (Stream, error) {
  2108. return s.streamsMap.AcceptStream(ctx)
  2109. }
  2110. func (s *connection) AcceptUniStream(ctx context.Context) (ReceiveStream, error) {
  2111. return s.streamsMap.AcceptUniStream(ctx)
  2112. }
  2113. // OpenStream opens a stream
  2114. func (s *connection) OpenStream() (Stream, error) {
  2115. return s.streamsMap.OpenStream()
  2116. }
  2117. func (s *connection) OpenStreamSync(ctx context.Context) (Stream, error) {
  2118. return s.streamsMap.OpenStreamSync(ctx)
  2119. }
  2120. func (s *connection) OpenUniStream() (SendStream, error) {
  2121. return s.streamsMap.OpenUniStream()
  2122. }
  2123. func (s *connection) OpenUniStreamSync(ctx context.Context) (SendStream, error) {
  2124. return s.streamsMap.OpenUniStreamSync(ctx)
  2125. }
  2126. func (s *connection) newFlowController(id protocol.StreamID) flowcontrol.StreamFlowController {
  2127. initialSendWindow := s.peerParams.InitialMaxStreamDataUni
  2128. if id.Type() == protocol.StreamTypeBidi {
  2129. if id.InitiatedBy() == s.perspective {
  2130. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiRemote
  2131. } else {
  2132. initialSendWindow = s.peerParams.InitialMaxStreamDataBidiLocal
  2133. }
  2134. }
  2135. return flowcontrol.NewStreamFlowController(
  2136. id,
  2137. s.connFlowController,
  2138. protocol.ByteCount(s.config.InitialStreamReceiveWindow),
  2139. protocol.ByteCount(s.config.MaxStreamReceiveWindow),
  2140. initialSendWindow,
  2141. s.rttStats,
  2142. s.logger,
  2143. )
  2144. }
  2145. // scheduleSending signals that we have data for sending
  2146. func (s *connection) scheduleSending() {
  2147. select {
  2148. case s.sendingScheduled <- struct{}{}:
  2149. default:
  2150. }
  2151. }
  2152. // tryQueueingUndecryptablePacket queues a packet for which we're missing the decryption keys.
  2153. // The logging.PacketType is only used for logging purposes.
  2154. func (s *connection) tryQueueingUndecryptablePacket(p receivedPacket, pt logging.PacketType) {
  2155. if s.handshakeComplete {
  2156. panic("shouldn't queue undecryptable packets after handshake completion")
  2157. }
  2158. if len(s.undecryptablePackets)+1 > protocol.MaxUndecryptablePackets {
  2159. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  2160. s.tracer.DroppedPacket(pt, protocol.InvalidPacketNumber, p.Size(), logging.PacketDropDOSPrevention)
  2161. }
  2162. s.logger.Infof("Dropping undecryptable packet (%d bytes). Undecryptable packet queue full.", p.Size())
  2163. return
  2164. }
  2165. s.logger.Infof("Queueing packet (%d bytes) for later decryption", p.Size())
  2166. if s.tracer != nil && s.tracer.BufferedPacket != nil {
  2167. s.tracer.BufferedPacket(pt, p.Size())
  2168. }
  2169. s.undecryptablePackets = append(s.undecryptablePackets, p)
  2170. }
  2171. func (s *connection) queueControlFrame(f wire.Frame) {
  2172. s.framer.QueueControlFrame(f)
  2173. s.scheduleSending()
  2174. }
  2175. func (s *connection) onHasConnectionData() { s.scheduleSending() }
  2176. func (s *connection) onHasStreamData(id protocol.StreamID, str sendStreamI) {
  2177. s.framer.AddActiveStream(id, str)
  2178. s.scheduleSending()
  2179. }
  2180. func (s *connection) onHasStreamControlFrame(id protocol.StreamID, str streamControlFrameGetter) {
  2181. s.framer.AddStreamWithControlFrames(id, str)
  2182. s.scheduleSending()
  2183. }
  2184. func (s *connection) onStreamCompleted(id protocol.StreamID) {
  2185. if err := s.streamsMap.DeleteStream(id); err != nil {
  2186. s.closeLocal(err)
  2187. }
  2188. s.framer.RemoveActiveStream(id)
  2189. }
  2190. func (s *connection) onMTUIncreased(mtu protocol.ByteCount) {
  2191. s.maxPayloadSizeEstimate.Store(uint32(estimateMaxPayloadSize(mtu)))
  2192. s.sentPacketHandler.SetMaxDatagramSize(mtu)
  2193. }
  2194. func (s *connection) SendDatagram(p []byte) error {
  2195. if !s.supportsDatagrams() {
  2196. return errors.New("datagram support disabled")
  2197. }
  2198. f := &wire.DatagramFrame{DataLenPresent: true}
  2199. // The payload size estimate is conservative.
  2200. // Under many circumstances we could send a few more bytes.
  2201. maxDataLen := min(
  2202. f.MaxDataLen(s.peerParams.MaxDatagramFrameSize, s.version),
  2203. protocol.ByteCount(s.maxPayloadSizeEstimate.Load()),
  2204. )
  2205. if protocol.ByteCount(len(p)) > maxDataLen {
  2206. return &DatagramTooLargeError{MaxDatagramPayloadSize: int64(maxDataLen)}
  2207. }
  2208. f.Data = make([]byte, len(p))
  2209. copy(f.Data, p)
  2210. return s.datagramQueue.Add(f)
  2211. }
  2212. func (s *connection) ReceiveDatagram(ctx context.Context) ([]byte, error) {
  2213. if !s.config.EnableDatagrams {
  2214. return nil, errors.New("datagram support disabled")
  2215. }
  2216. return s.datagramQueue.Receive(ctx)
  2217. }
  2218. func (s *connection) LocalAddr() net.Addr { return s.conn.LocalAddr() }
  2219. func (s *connection) RemoteAddr() net.Addr { return s.conn.RemoteAddr() }
  2220. func (s *connection) NextConnection(ctx context.Context) (Connection, error) {
  2221. // The handshake might fail after the server rejected 0-RTT.
  2222. // This could happen if the Finished message is malformed or never received.
  2223. select {
  2224. case <-ctx.Done():
  2225. return nil, context.Cause(ctx)
  2226. case <-s.Context().Done():
  2227. case <-s.HandshakeComplete():
  2228. s.streamsMap.UseResetMaps()
  2229. }
  2230. return s, nil
  2231. }
  2232. // estimateMaxPayloadSize estimates the maximum payload size for short header packets.
  2233. // It is not very sophisticated: it just subtracts the size of header (assuming the maximum
  2234. // connection ID length), and the size of the encryption tag.
  2235. func estimateMaxPayloadSize(mtu protocol.ByteCount) protocol.ByteCount {
  2236. return mtu - 1 /* type byte */ - 20 /* maximum connection ID length */ - 16 /* tag size */
  2237. }