server.go 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071
  1. package quic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. tls "github.com/Psiphon-Labs/psiphon-tls"
  11. "github.com/Psiphon-Labs/quic-go/internal/handshake"
  12. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  13. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  14. "github.com/Psiphon-Labs/quic-go/internal/qtls"
  15. "github.com/Psiphon-Labs/quic-go/internal/utils"
  16. "github.com/Psiphon-Labs/quic-go/internal/wire"
  17. "github.com/Psiphon-Labs/quic-go/logging"
  18. )
  19. // ErrServerClosed is returned by the Listener or EarlyListener's Accept method after a call to Close.
  20. var ErrServerClosed = errors.New("quic: server closed")
  21. // packetHandler handles packets
  22. type packetHandler interface {
  23. handlePacket(receivedPacket)
  24. shutdown()
  25. destroy(error)
  26. getPerspective() protocol.Perspective
  27. }
  28. type packetHandlerManager interface {
  29. Get(protocol.ConnectionID) (packetHandler, bool)
  30. GetByResetToken(protocol.StatelessResetToken) (packetHandler, bool)
  31. AddWithConnID(protocol.ConnectionID, protocol.ConnectionID, func() (packetHandler, bool)) bool
  32. Close(error)
  33. connRunner
  34. }
  35. type quicConn interface {
  36. EarlyConnection
  37. earlyConnReady() <-chan struct{}
  38. handlePacket(receivedPacket)
  39. GetVersion() protocol.VersionNumber
  40. getPerspective() protocol.Perspective
  41. run() error
  42. destroy(error)
  43. shutdown()
  44. }
  45. type zeroRTTQueue struct {
  46. packets []receivedPacket
  47. expiration time.Time
  48. }
  49. type rejectedPacket struct {
  50. receivedPacket
  51. hdr *wire.Header
  52. }
  53. // A Listener of QUIC
  54. type baseServer struct {
  55. disableVersionNegotiation bool
  56. acceptEarlyConns bool
  57. tlsConf *tls.Config
  58. config *Config
  59. conn rawConn
  60. tokenGenerator *handshake.TokenGenerator
  61. maxTokenAge time.Duration
  62. connIDGenerator ConnectionIDGenerator
  63. connHandler packetHandlerManager
  64. onClose func()
  65. receivedPackets chan receivedPacket
  66. nextZeroRTTCleanup time.Time
  67. zeroRTTQueues map[protocol.ConnectionID]*zeroRTTQueue // only initialized if acceptEarlyConns == true
  68. // set as a member, so they can be set in the tests
  69. newConn func(
  70. sendConn,
  71. connRunner,
  72. protocol.ConnectionID, /* original dest connection ID */
  73. *protocol.ConnectionID, /* retry src connection ID */
  74. protocol.ConnectionID, /* client dest connection ID */
  75. protocol.ConnectionID, /* destination connection ID */
  76. protocol.ConnectionID, /* source connection ID */
  77. ConnectionIDGenerator,
  78. protocol.StatelessResetToken,
  79. *Config,
  80. *tls.Config,
  81. *handshake.TokenGenerator,
  82. bool, /* client address validated by an address validation token */
  83. *logging.ConnectionTracer,
  84. uint64,
  85. utils.Logger,
  86. protocol.VersionNumber,
  87. ) quicConn
  88. closeOnce sync.Once
  89. errorChan chan struct{} // is closed when the server is closed
  90. closeErr error
  91. running chan struct{} // closed as soon as run() returns
  92. versionNegotiationQueue chan receivedPacket
  93. invalidTokenQueue chan rejectedPacket
  94. connectionRefusedQueue chan rejectedPacket
  95. retryQueue chan rejectedPacket
  96. connQueue chan quicConn
  97. connQueueLen int32 // to be used as an atomic
  98. tracer *logging.Tracer
  99. logger utils.Logger
  100. }
  101. // A Listener listens for incoming QUIC connections.
  102. // It returns connections once the handshake has completed.
  103. type Listener struct {
  104. baseServer *baseServer
  105. }
  106. // Accept returns new connections. It should be called in a loop.
  107. func (l *Listener) Accept(ctx context.Context) (Connection, error) {
  108. return l.baseServer.Accept(ctx)
  109. }
  110. // Close closes the listener.
  111. // Accept will return ErrServerClosed as soon as all connections in the accept queue have been accepted.
  112. // QUIC handshakes that are still in flight will be rejected with a CONNECTION_REFUSED error.
  113. // The effect of closing the listener depends on how it was created:
  114. // * if it was created using Transport.Listen, already established connections will be unaffected
  115. // * if it was created using the Listen convenience method, all established connection will be closed immediately
  116. func (l *Listener) Close() error {
  117. return l.baseServer.Close()
  118. }
  119. // Addr returns the local network address that the server is listening on.
  120. func (l *Listener) Addr() net.Addr {
  121. return l.baseServer.Addr()
  122. }
  123. // An EarlyListener listens for incoming QUIC connections, and returns them before the handshake completes.
  124. // For connections that don't use 0-RTT, this allows the server to send 0.5-RTT data.
  125. // This data is encrypted with forward-secure keys, however, the client's identity has not yet been verified.
  126. // For connection using 0-RTT, this allows the server to accept and respond to streams that the client opened in the
  127. // 0-RTT data it sent. Note that at this point during the handshake, the live-ness of the
  128. // client has not yet been confirmed, and the 0-RTT data could have been replayed by an attacker.
  129. type EarlyListener struct {
  130. baseServer *baseServer
  131. }
  132. // Accept returns a new connections. It should be called in a loop.
  133. func (l *EarlyListener) Accept(ctx context.Context) (EarlyConnection, error) {
  134. return l.baseServer.accept(ctx)
  135. }
  136. // Close the server. All active connections will be closed.
  137. func (l *EarlyListener) Close() error {
  138. return l.baseServer.Close()
  139. }
  140. // Addr returns the local network addr that the server is listening on.
  141. func (l *EarlyListener) Addr() net.Addr {
  142. return l.baseServer.Addr()
  143. }
  144. // ListenAddr creates a QUIC server listening on a given address.
  145. // See Listen for more details.
  146. func ListenAddr(addr string, tlsConf *tls.Config, config *Config) (*Listener, error) {
  147. conn, err := listenUDP(addr)
  148. if err != nil {
  149. return nil, err
  150. }
  151. return (&Transport{
  152. Conn: conn,
  153. createdConn: true,
  154. isSingleUse: true,
  155. }).Listen(tlsConf, config)
  156. }
  157. // ListenAddrEarly works like ListenAddr, but it returns connections before the handshake completes.
  158. func ListenAddrEarly(addr string, tlsConf *tls.Config, config *Config) (*EarlyListener, error) {
  159. conn, err := listenUDP(addr)
  160. if err != nil {
  161. return nil, err
  162. }
  163. return (&Transport{
  164. Conn: conn,
  165. createdConn: true,
  166. isSingleUse: true,
  167. }).ListenEarly(tlsConf, config)
  168. }
  169. func listenUDP(addr string) (*net.UDPConn, error) {
  170. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  171. if err != nil {
  172. return nil, err
  173. }
  174. return net.ListenUDP("udp", udpAddr)
  175. }
  176. // Listen listens for QUIC connections on a given net.PacketConn.
  177. // If the PacketConn satisfies the OOBCapablePacketConn interface (as a net.UDPConn does),
  178. // ECN and packet info support will be enabled. In this case, ReadMsgUDP and WriteMsgUDP
  179. // will be used instead of ReadFrom and WriteTo to read/write packets.
  180. // A single net.PacketConn can only be used for a single call to Listen.
  181. //
  182. // The tls.Config must not be nil and must contain a certificate configuration.
  183. // Furthermore, it must define an application control (using NextProtos).
  184. // The quic.Config may be nil, in that case the default values will be used.
  185. //
  186. // This is a convenience function. More advanced use cases should instantiate a Transport,
  187. // which offers configuration options for a more fine-grained control of the connection establishment,
  188. // including reusing the underlying UDP socket for outgoing QUIC connections.
  189. // When closing a listener created with Listen, all established QUIC connections will be closed immediately.
  190. func Listen(conn net.PacketConn, tlsConf *tls.Config, config *Config) (*Listener, error) {
  191. tr := &Transport{Conn: conn, isSingleUse: true}
  192. return tr.Listen(tlsConf, config)
  193. }
  194. // ListenEarly works like Listen, but it returns connections before the handshake completes.
  195. func ListenEarly(conn net.PacketConn, tlsConf *tls.Config, config *Config) (*EarlyListener, error) {
  196. tr := &Transport{Conn: conn, isSingleUse: true}
  197. return tr.ListenEarly(tlsConf, config)
  198. }
  199. func newServer(
  200. conn rawConn,
  201. connHandler packetHandlerManager,
  202. connIDGenerator ConnectionIDGenerator,
  203. tlsConf *tls.Config,
  204. config *Config,
  205. tracer *logging.Tracer,
  206. onClose func(),
  207. tokenGeneratorKey TokenGeneratorKey,
  208. maxTokenAge time.Duration,
  209. disableVersionNegotiation bool,
  210. acceptEarly bool,
  211. ) *baseServer {
  212. s := &baseServer{
  213. conn: conn,
  214. tlsConf: tlsConf,
  215. config: config,
  216. tokenGenerator: handshake.NewTokenGenerator(tokenGeneratorKey),
  217. maxTokenAge: maxTokenAge,
  218. connIDGenerator: connIDGenerator,
  219. connHandler: connHandler,
  220. connQueue: make(chan quicConn),
  221. errorChan: make(chan struct{}),
  222. running: make(chan struct{}),
  223. receivedPackets: make(chan receivedPacket, protocol.MaxServerUnprocessedPackets),
  224. versionNegotiationQueue: make(chan receivedPacket, 4),
  225. invalidTokenQueue: make(chan rejectedPacket, 4),
  226. connectionRefusedQueue: make(chan rejectedPacket, 4),
  227. retryQueue: make(chan rejectedPacket, 8),
  228. newConn: newConnection,
  229. tracer: tracer,
  230. logger: utils.DefaultLogger.WithPrefix("server"),
  231. acceptEarlyConns: acceptEarly,
  232. disableVersionNegotiation: disableVersionNegotiation,
  233. onClose: onClose,
  234. }
  235. if acceptEarly {
  236. s.zeroRTTQueues = map[protocol.ConnectionID]*zeroRTTQueue{}
  237. }
  238. go s.run()
  239. go s.runSendQueue()
  240. s.logger.Debugf("Listening for %s connections on %s", conn.LocalAddr().Network(), conn.LocalAddr().String())
  241. return s
  242. }
  243. func (s *baseServer) run() {
  244. defer close(s.running)
  245. for {
  246. select {
  247. case <-s.errorChan:
  248. return
  249. default:
  250. }
  251. select {
  252. case <-s.errorChan:
  253. return
  254. case p := <-s.receivedPackets:
  255. if bufferStillInUse := s.handlePacketImpl(p); !bufferStillInUse {
  256. p.buffer.Release()
  257. }
  258. }
  259. }
  260. }
  261. func (s *baseServer) runSendQueue() {
  262. for {
  263. select {
  264. case <-s.running:
  265. return
  266. case p := <-s.versionNegotiationQueue:
  267. s.maybeSendVersionNegotiationPacket(p)
  268. case p := <-s.invalidTokenQueue:
  269. s.maybeSendInvalidToken(p)
  270. case p := <-s.connectionRefusedQueue:
  271. s.sendConnectionRefused(p)
  272. case p := <-s.retryQueue:
  273. s.sendRetry(p)
  274. }
  275. }
  276. }
  277. // Accept returns connections that already completed the handshake.
  278. // It is only valid if acceptEarlyConns is false.
  279. func (s *baseServer) Accept(ctx context.Context) (Connection, error) {
  280. return s.accept(ctx)
  281. }
  282. func (s *baseServer) accept(ctx context.Context) (quicConn, error) {
  283. select {
  284. case <-ctx.Done():
  285. return nil, ctx.Err()
  286. case conn := <-s.connQueue:
  287. atomic.AddInt32(&s.connQueueLen, -1)
  288. return conn, nil
  289. case <-s.errorChan:
  290. return nil, s.closeErr
  291. }
  292. }
  293. func (s *baseServer) Close() error {
  294. s.close(ErrServerClosed, true)
  295. return nil
  296. }
  297. func (s *baseServer) close(e error, notifyOnClose bool) {
  298. s.closeOnce.Do(func() {
  299. s.closeErr = e
  300. close(s.errorChan)
  301. <-s.running
  302. if notifyOnClose {
  303. s.onClose()
  304. }
  305. })
  306. }
  307. // Addr returns the server's network address
  308. func (s *baseServer) Addr() net.Addr {
  309. return s.conn.LocalAddr()
  310. }
  311. func (s *baseServer) handlePacket(p receivedPacket) {
  312. select {
  313. case s.receivedPackets <- p:
  314. default:
  315. s.logger.Debugf("Dropping packet from %s (%d bytes). Server receive queue full.", p.remoteAddr, p.Size())
  316. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  317. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropDOSPrevention)
  318. }
  319. }
  320. }
  321. func (s *baseServer) handlePacketImpl(p receivedPacket) bool /* is the buffer still in use? */ {
  322. if !s.nextZeroRTTCleanup.IsZero() && p.rcvTime.After(s.nextZeroRTTCleanup) {
  323. defer s.cleanupZeroRTTQueues(p.rcvTime)
  324. }
  325. if wire.IsVersionNegotiationPacket(p.data) {
  326. s.logger.Debugf("Dropping Version Negotiation packet.")
  327. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  328. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropUnexpectedPacket)
  329. }
  330. return false
  331. }
  332. // Short header packets should never end up here in the first place
  333. if !wire.IsLongHeaderPacket(p.data[0]) {
  334. panic(fmt.Sprintf("misrouted packet: %#v", p.data))
  335. }
  336. v, err := wire.ParseVersion(p.data)
  337. // drop the packet if we failed to parse the protocol version
  338. if err != nil {
  339. s.logger.Debugf("Dropping a packet with an unknown version")
  340. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  341. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropUnexpectedPacket)
  342. }
  343. return false
  344. }
  345. // send a Version Negotiation Packet if the client is speaking a different protocol version
  346. if !protocol.IsSupportedVersion(s.config.Versions, v) {
  347. if s.disableVersionNegotiation {
  348. return false
  349. }
  350. if p.Size() < protocol.MinUnknownVersionPacketSize {
  351. s.logger.Debugf("Dropping a packet with an unsupported version number %d that is too small (%d bytes)", v, p.Size())
  352. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  353. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropUnexpectedPacket)
  354. }
  355. return false
  356. }
  357. return s.enqueueVersionNegotiationPacket(p)
  358. }
  359. if wire.Is0RTTPacket(p.data) {
  360. if !s.acceptEarlyConns {
  361. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  362. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketType0RTT, p.Size(), logging.PacketDropUnexpectedPacket)
  363. }
  364. return false
  365. }
  366. return s.handle0RTTPacket(p)
  367. }
  368. // If we're creating a new connection, the packet will be passed to the connection.
  369. // The header will then be parsed again.
  370. hdr, _, _, err := wire.ParsePacket(p.data)
  371. if err != nil {
  372. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  373. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropHeaderParseError)
  374. }
  375. s.logger.Debugf("Error parsing packet: %s", err)
  376. return false
  377. }
  378. // [Psiphon]
  379. // Both Obfuscated QUIC and QUIC over WebRTC SRTP adjust the max
  380. // packet size to allow for additional overhead. Enforce a reduced
  381. // minimum initial packet size when the adjustment results in a max
  382. // packet size less than protocol.MinInitialPacketSize.
  383. minInitialPacketSize := protocol.ByteCount(protocol.MinInitialPacketSize)
  384. if s.config.ServerMaxPacketSizeAdjustment != nil {
  385. maxPacketSize := protocol.ByteCount(protocol.InitialPacketSizeIPv4)
  386. if host, _, err := net.SplitHostPort(p.remoteAddr.String()); err == nil {
  387. if IP := net.ParseIP(host); IP != nil && IP.To4() == nil {
  388. maxPacketSize = protocol.InitialPacketSizeIPv6
  389. }
  390. }
  391. adjustment := protocol.ByteCount(s.config.ServerMaxPacketSizeAdjustment(p.remoteAddr))
  392. maxPacketSize -= adjustment
  393. if maxPacketSize < 0 {
  394. maxPacketSize = 0
  395. }
  396. if maxPacketSize < minInitialPacketSize {
  397. minInitialPacketSize = maxPacketSize
  398. }
  399. }
  400. if hdr.Type == protocol.PacketTypeInitial && p.Size() < minInitialPacketSize {
  401. s.logger.Debugf("Dropping a packet that is too small to be a valid Initial (%d bytes)", p.Size())
  402. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  403. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeInitial, p.Size(), logging.PacketDropUnexpectedPacket)
  404. }
  405. return false
  406. }
  407. if hdr.Type != protocol.PacketTypeInitial {
  408. // Drop long header packets.
  409. // There's little point in sending a Stateless Reset, since the client
  410. // might not have received the token yet.
  411. s.logger.Debugf("Dropping long header packet of type %s (%d bytes)", hdr.Type, len(p.data))
  412. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  413. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropUnexpectedPacket)
  414. }
  415. return false
  416. }
  417. s.logger.Debugf("<- Received Initial packet.")
  418. if err := s.handleInitialImpl(p, hdr); err != nil {
  419. s.logger.Errorf("Error occurred handling initial packet: %s", err)
  420. }
  421. // Don't put the packet buffer back.
  422. // handleInitialImpl deals with the buffer.
  423. return true
  424. }
  425. func (s *baseServer) handle0RTTPacket(p receivedPacket) bool {
  426. connID, err := wire.ParseConnectionID(p.data, 0)
  427. if err != nil {
  428. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  429. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketType0RTT, p.Size(), logging.PacketDropHeaderParseError)
  430. }
  431. return false
  432. }
  433. // check again if we might have a connection now
  434. if handler, ok := s.connHandler.Get(connID); ok {
  435. handler.handlePacket(p)
  436. return true
  437. }
  438. if q, ok := s.zeroRTTQueues[connID]; ok {
  439. if len(q.packets) >= protocol.Max0RTTQueueLen {
  440. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  441. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketType0RTT, p.Size(), logging.PacketDropDOSPrevention)
  442. }
  443. return false
  444. }
  445. q.packets = append(q.packets, p)
  446. return true
  447. }
  448. if len(s.zeroRTTQueues) >= protocol.Max0RTTQueues {
  449. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  450. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketType0RTT, p.Size(), logging.PacketDropDOSPrevention)
  451. }
  452. return false
  453. }
  454. queue := &zeroRTTQueue{packets: make([]receivedPacket, 1, 8)}
  455. queue.packets[0] = p
  456. expiration := p.rcvTime.Add(protocol.Max0RTTQueueingDuration)
  457. queue.expiration = expiration
  458. if s.nextZeroRTTCleanup.IsZero() || s.nextZeroRTTCleanup.After(expiration) {
  459. s.nextZeroRTTCleanup = expiration
  460. }
  461. s.zeroRTTQueues[connID] = queue
  462. return true
  463. }
  464. func (s *baseServer) cleanupZeroRTTQueues(now time.Time) {
  465. // Iterate over all queues to find those that are expired.
  466. // This is ok since we're placing a pretty low limit on the number of queues.
  467. var nextCleanup time.Time
  468. for connID, q := range s.zeroRTTQueues {
  469. if q.expiration.After(now) {
  470. if nextCleanup.IsZero() || nextCleanup.After(q.expiration) {
  471. nextCleanup = q.expiration
  472. }
  473. continue
  474. }
  475. for _, p := range q.packets {
  476. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  477. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketType0RTT, p.Size(), logging.PacketDropDOSPrevention)
  478. }
  479. p.buffer.Release()
  480. }
  481. delete(s.zeroRTTQueues, connID)
  482. if s.logger.Debug() {
  483. s.logger.Debugf("Removing 0-RTT queue for %s.", connID)
  484. }
  485. }
  486. s.nextZeroRTTCleanup = nextCleanup
  487. }
  488. // [Psiphon]
  489. type stubCryptoSetup struct {
  490. initialOpener handshake.LongHeaderOpener
  491. }
  492. var errNotSupported = errors.New("not supported")
  493. func (s *stubCryptoSetup) StartHandshake() error {
  494. return nil
  495. }
  496. func (s *stubCryptoSetup) Close() error {
  497. return errNotSupported
  498. }
  499. func (s *stubCryptoSetup) ChangeConnectionID(protocol.ConnectionID) {
  500. }
  501. func (s *stubCryptoSetup) GetSessionTicket() ([]byte, error) {
  502. return nil, errNotSupported
  503. }
  504. func (s *stubCryptoSetup) HandleMessage([]byte, protocol.EncryptionLevel) error {
  505. return nil
  506. }
  507. func (s *stubCryptoSetup) NextEvent() handshake.Event {
  508. return handshake.Event{}
  509. }
  510. func (s *stubCryptoSetup) SetLargest1RTTAcked(protocol.PacketNumber) error {
  511. return errNotSupported
  512. }
  513. func (s *stubCryptoSetup) DiscardInitialKeys() {
  514. }
  515. func (s *stubCryptoSetup) SetHandshakeConfirmed() {
  516. }
  517. func (s *stubCryptoSetup) ConnectionState() handshake.ConnectionState {
  518. return handshake.ConnectionState{}
  519. }
  520. // [Psiphon]
  521. func (s *stubCryptoSetup) TLSConnectionMetrics() tls.ConnectionMetrics {
  522. return tls.ConnectionMetrics{}
  523. }
  524. func (s *stubCryptoSetup) GetInitialOpener() (handshake.LongHeaderOpener, error) {
  525. return s.initialOpener, nil
  526. }
  527. func (s *stubCryptoSetup) GetHandshakeOpener() (handshake.LongHeaderOpener, error) {
  528. return nil, errNotSupported
  529. }
  530. func (s *stubCryptoSetup) Get0RTTOpener() (handshake.LongHeaderOpener, error) {
  531. return nil, errNotSupported
  532. }
  533. func (s *stubCryptoSetup) Get1RTTOpener() (handshake.ShortHeaderOpener, error) {
  534. return nil, errNotSupported
  535. }
  536. func (s *stubCryptoSetup) GetInitialSealer() (handshake.LongHeaderSealer, error) {
  537. return nil, errNotSupported
  538. }
  539. func (s *stubCryptoSetup) GetHandshakeSealer() (handshake.LongHeaderSealer, error) {
  540. return nil, errNotSupported
  541. }
  542. func (s *stubCryptoSetup) Get0RTTSealer() (handshake.LongHeaderSealer, error) {
  543. return nil, errNotSupported
  544. }
  545. func (s *stubCryptoSetup) Get1RTTSealer() (handshake.ShortHeaderSealer, error) {
  546. return nil, errNotSupported
  547. }
  548. // [Psiphon]
  549. // verifyClientHelloRandom unpacks an Initial packet, extracts the CRYPTO
  550. // frame, and calls Config.VerifyClientHelloRandom.
  551. func (s *baseServer) verifyClientHelloRandom(p receivedPacket, hdr *wire.Header) error {
  552. // TODO: support QUICv2
  553. versionNumber := protocol.Version1
  554. _, initialOpener := handshake.NewInitialAEAD(
  555. hdr.DestConnectionID, protocol.PerspectiveServer, versionNumber)
  556. cs := &stubCryptoSetup{
  557. initialOpener: initialOpener,
  558. }
  559. // Make a copy of the packet data since this unpacking modifies it and the
  560. // original packet data must be retained for subsequent processing.
  561. data := append([]byte(nil), p.data...)
  562. unpacker := newPacketUnpacker(cs, 0)
  563. unpacked, err := unpacker.UnpackLongHeader(hdr, p.rcvTime, data, versionNumber)
  564. if err != nil {
  565. return fmt.Errorf("verifyClientHelloRandom: UnpackLongHeader: %w", err)
  566. }
  567. parser := wire.NewFrameParser(s.config.EnableDatagrams)
  568. d := unpacked.data
  569. for len(d) > 0 {
  570. l, frame, err := parser.ParseNext(d, protocol.EncryptionInitial, versionNumber)
  571. if err != nil {
  572. return fmt.Errorf("verifyClientHelloRandom: ParseNext: %w", err)
  573. }
  574. if frame == nil {
  575. return errors.New("verifyClientHelloRandom: missing CRYPTO frame")
  576. }
  577. d = d[l:]
  578. cryptoFrame, ok := frame.(*wire.CryptoFrame)
  579. if !ok {
  580. continue
  581. }
  582. if cryptoFrame.Offset != 0 {
  583. return errors.New("verifyClientHelloRandom: unexpected CRYPTO frame offset")
  584. }
  585. random, err := qtls.ReadClientHelloRandom(cryptoFrame.Data)
  586. if err != nil {
  587. return fmt.Errorf("verifyClientHelloRandom: ReadClientHelloRandom: %w", err)
  588. }
  589. if !s.config.VerifyClientHelloRandom(p.remoteAddr, random) {
  590. return fmt.Errorf("verifyClientHelloRandom: VerifyClientHelloRandom failed")
  591. }
  592. break
  593. }
  594. return nil
  595. }
  596. // validateToken returns false if:
  597. // - address is invalid
  598. // - token is expired
  599. // - token is null
  600. func (s *baseServer) validateToken(token *handshake.Token, addr net.Addr) bool {
  601. if token == nil {
  602. return false
  603. }
  604. if !token.ValidateRemoteAddr(addr) {
  605. return false
  606. }
  607. if !token.IsRetryToken && time.Since(token.SentTime) > s.maxTokenAge {
  608. return false
  609. }
  610. if token.IsRetryToken && time.Since(token.SentTime) > s.config.maxRetryTokenAge() {
  611. return false
  612. }
  613. return true
  614. }
  615. func (s *baseServer) handleInitialImpl(p receivedPacket, hdr *wire.Header) error {
  616. if len(hdr.Token) == 0 && hdr.DestConnectionID.Len() < protocol.MinConnectionIDLenInitial {
  617. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  618. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeInitial, p.Size(), logging.PacketDropUnexpectedPacket)
  619. }
  620. p.buffer.Release()
  621. return errors.New("too short connection ID")
  622. }
  623. // [Psiphon]
  624. // Drop any Initial packet that fails verifyClientHelloRandom.
  625. if s.config.VerifyClientHelloRandom != nil {
  626. err := s.verifyClientHelloRandom(p, hdr)
  627. if err != nil {
  628. p.buffer.Release()
  629. return err
  630. }
  631. }
  632. // The server queues packets for a while, and we might already have established a connection by now.
  633. // This results in a second check in the connection map.
  634. // That's ok since it's not the hot path (it's only taken by some Initial and 0-RTT packets).
  635. if handler, ok := s.connHandler.Get(hdr.DestConnectionID); ok {
  636. handler.handlePacket(p)
  637. return nil
  638. }
  639. var (
  640. token *handshake.Token
  641. retrySrcConnID *protocol.ConnectionID
  642. )
  643. origDestConnID := hdr.DestConnectionID
  644. if len(hdr.Token) > 0 {
  645. tok, err := s.tokenGenerator.DecodeToken(hdr.Token)
  646. if err == nil {
  647. if tok.IsRetryToken {
  648. origDestConnID = tok.OriginalDestConnectionID
  649. retrySrcConnID = &tok.RetrySrcConnectionID
  650. }
  651. token = tok
  652. }
  653. }
  654. clientAddrIsValid := s.validateToken(token, p.remoteAddr)
  655. if token != nil && !clientAddrIsValid {
  656. // For invalid and expired non-retry tokens, we don't send an INVALID_TOKEN error.
  657. // We just ignore them, and act as if there was no token on this packet at all.
  658. // This also means we might send a Retry later.
  659. if !token.IsRetryToken {
  660. token = nil
  661. } else {
  662. // For Retry tokens, we send an INVALID_ERROR if
  663. // * the token is too old, or
  664. // * the token is invalid, in case of a retry token.
  665. select {
  666. case s.invalidTokenQueue <- rejectedPacket{receivedPacket: p, hdr: hdr}:
  667. default:
  668. // drop packet if we can't send out the INVALID_TOKEN packets fast enough
  669. p.buffer.Release()
  670. }
  671. return nil
  672. }
  673. }
  674. if token == nil && s.config.RequireAddressValidation(p.remoteAddr) {
  675. // Retry invalidates all 0-RTT packets sent.
  676. delete(s.zeroRTTQueues, hdr.DestConnectionID)
  677. select {
  678. case s.retryQueue <- rejectedPacket{receivedPacket: p, hdr: hdr}:
  679. default:
  680. // drop packet if we can't send out Retry packets fast enough
  681. p.buffer.Release()
  682. }
  683. return nil
  684. }
  685. if queueLen := atomic.LoadInt32(&s.connQueueLen); queueLen >= protocol.MaxAcceptQueueSize {
  686. s.logger.Debugf("Rejecting new connection. Server currently busy. Accept queue length: %d (max %d)", queueLen, protocol.MaxAcceptQueueSize)
  687. select {
  688. case s.connectionRefusedQueue <- rejectedPacket{receivedPacket: p, hdr: hdr}:
  689. default:
  690. // drop packet if we can't send out the CONNECTION_REFUSED fast enough
  691. p.buffer.Release()
  692. }
  693. return nil
  694. }
  695. connID, err := s.connIDGenerator.GenerateConnectionID()
  696. if err != nil {
  697. return err
  698. }
  699. s.logger.Debugf("Changing connection ID to %s.", connID)
  700. var conn quicConn
  701. tracingID := nextConnTracingID()
  702. if added := s.connHandler.AddWithConnID(hdr.DestConnectionID, connID, func() (packetHandler, bool) {
  703. config := s.config
  704. if s.config.GetConfigForClient != nil {
  705. conf, err := s.config.GetConfigForClient(&ClientHelloInfo{RemoteAddr: p.remoteAddr})
  706. if err != nil {
  707. s.logger.Debugf("Rejecting new connection due to GetConfigForClient callback")
  708. return nil, false
  709. }
  710. config = populateConfig(conf)
  711. }
  712. var tracer *logging.ConnectionTracer
  713. if config.Tracer != nil {
  714. // Use the same connection ID that is passed to the client's GetLogWriter callback.
  715. connID := hdr.DestConnectionID
  716. if origDestConnID.Len() > 0 {
  717. connID = origDestConnID
  718. }
  719. tracer = config.Tracer(context.WithValue(context.Background(), ConnectionTracingKey, tracingID), protocol.PerspectiveServer, connID)
  720. }
  721. conn = s.newConn(
  722. newSendConn(s.conn, p.remoteAddr, p.info, s.logger),
  723. s.connHandler,
  724. origDestConnID,
  725. retrySrcConnID,
  726. hdr.DestConnectionID,
  727. hdr.SrcConnectionID,
  728. connID,
  729. s.connIDGenerator,
  730. s.connHandler.GetStatelessResetToken(connID),
  731. config,
  732. s.tlsConf,
  733. s.tokenGenerator,
  734. clientAddrIsValid,
  735. tracer,
  736. tracingID,
  737. s.logger,
  738. hdr.Version,
  739. )
  740. conn.handlePacket(p)
  741. if q, ok := s.zeroRTTQueues[hdr.DestConnectionID]; ok {
  742. for _, p := range q.packets {
  743. conn.handlePacket(p)
  744. }
  745. delete(s.zeroRTTQueues, hdr.DestConnectionID)
  746. }
  747. return conn, true
  748. }); !added {
  749. select {
  750. case s.connectionRefusedQueue <- rejectedPacket{receivedPacket: p, hdr: hdr}:
  751. default:
  752. // drop packet if we can't send out the CONNECTION_REFUSED fast enough
  753. p.buffer.Release()
  754. }
  755. return nil
  756. }
  757. go conn.run()
  758. go s.handleNewConn(conn)
  759. if conn == nil {
  760. p.buffer.Release()
  761. return nil
  762. }
  763. return nil
  764. }
  765. func (s *baseServer) handleNewConn(conn quicConn) {
  766. connCtx := conn.Context()
  767. if s.acceptEarlyConns {
  768. // wait until the early connection is ready, the handshake fails, or the server is closed
  769. select {
  770. case <-s.errorChan:
  771. conn.destroy(&qerr.TransportError{ErrorCode: ConnectionRefused})
  772. return
  773. case <-conn.earlyConnReady():
  774. case <-connCtx.Done():
  775. return
  776. }
  777. } else {
  778. // wait until the handshake is complete (or fails)
  779. select {
  780. case <-s.errorChan:
  781. conn.destroy(&qerr.TransportError{ErrorCode: ConnectionRefused})
  782. return
  783. case <-conn.HandshakeComplete():
  784. case <-connCtx.Done():
  785. return
  786. }
  787. }
  788. atomic.AddInt32(&s.connQueueLen, 1)
  789. select {
  790. case s.connQueue <- conn:
  791. // blocks until the connection is accepted
  792. case <-connCtx.Done():
  793. atomic.AddInt32(&s.connQueueLen, -1)
  794. // don't pass connections that were already closed to Accept()
  795. }
  796. }
  797. func (s *baseServer) sendRetry(p rejectedPacket) {
  798. if err := s.sendRetryPacket(p); err != nil {
  799. s.logger.Debugf("Error sending Retry packet: %s", err)
  800. }
  801. }
  802. func (s *baseServer) sendRetryPacket(p rejectedPacket) error {
  803. hdr := p.hdr
  804. // Log the Initial packet now.
  805. // If no Retry is sent, the packet will be logged by the connection.
  806. (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger)
  807. srcConnID, err := s.connIDGenerator.GenerateConnectionID()
  808. if err != nil {
  809. return err
  810. }
  811. token, err := s.tokenGenerator.NewRetryToken(p.remoteAddr, hdr.DestConnectionID, srcConnID)
  812. if err != nil {
  813. return err
  814. }
  815. replyHdr := &wire.ExtendedHeader{}
  816. replyHdr.Type = protocol.PacketTypeRetry
  817. replyHdr.Version = hdr.Version
  818. replyHdr.SrcConnectionID = srcConnID
  819. replyHdr.DestConnectionID = hdr.SrcConnectionID
  820. replyHdr.Token = token
  821. if s.logger.Debug() {
  822. s.logger.Debugf("Changing connection ID to %s.", srcConnID)
  823. s.logger.Debugf("-> Sending Retry")
  824. replyHdr.Log(s.logger)
  825. }
  826. buf := getPacketBuffer()
  827. defer buf.Release()
  828. buf.Data, err = replyHdr.Append(buf.Data, hdr.Version)
  829. if err != nil {
  830. return err
  831. }
  832. // append the Retry integrity tag
  833. tag := handshake.GetRetryIntegrityTag(buf.Data, hdr.DestConnectionID, hdr.Version)
  834. buf.Data = append(buf.Data, tag[:]...)
  835. if s.tracer != nil && s.tracer.SentPacket != nil {
  836. s.tracer.SentPacket(p.remoteAddr, &replyHdr.Header, protocol.ByteCount(len(buf.Data)), nil)
  837. }
  838. _, err = s.conn.WritePacket(buf.Data, p.remoteAddr, p.info.OOB(), 0, protocol.ECNUnsupported)
  839. return err
  840. }
  841. func (s *baseServer) maybeSendInvalidToken(p rejectedPacket) {
  842. defer p.buffer.Release()
  843. // Only send INVALID_TOKEN if we can unprotect the packet.
  844. // This makes sure that we won't send it for packets that were corrupted.
  845. hdr := p.hdr
  846. sealer, opener := handshake.NewInitialAEAD(hdr.DestConnectionID, protocol.PerspectiveServer, hdr.Version)
  847. data := p.data[:hdr.ParsedLen()+hdr.Length]
  848. extHdr, err := unpackLongHeader(opener, hdr, data, hdr.Version)
  849. // Only send INVALID_TOKEN if we can unprotect the packet.
  850. // This makes sure that we won't send it for packets that were corrupted.
  851. if err != nil {
  852. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  853. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeInitial, p.Size(), logging.PacketDropHeaderParseError)
  854. }
  855. return
  856. }
  857. hdrLen := extHdr.ParsedLen()
  858. if _, err := opener.Open(data[hdrLen:hdrLen], data[hdrLen:], extHdr.PacketNumber, data[:hdrLen]); err != nil {
  859. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  860. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeInitial, p.Size(), logging.PacketDropPayloadDecryptError)
  861. }
  862. return
  863. }
  864. if s.logger.Debug() {
  865. s.logger.Debugf("Client sent an invalid retry token. Sending INVALID_TOKEN to %s.", p.remoteAddr)
  866. }
  867. if err := s.sendError(p.remoteAddr, hdr, sealer, qerr.InvalidToken, p.info); err != nil {
  868. s.logger.Debugf("Error sending INVALID_TOKEN error: %s", err)
  869. }
  870. }
  871. func (s *baseServer) sendConnectionRefused(p rejectedPacket) {
  872. defer p.buffer.Release()
  873. sealer, _ := handshake.NewInitialAEAD(p.hdr.DestConnectionID, protocol.PerspectiveServer, p.hdr.Version)
  874. if err := s.sendError(p.remoteAddr, p.hdr, sealer, qerr.ConnectionRefused, p.info); err != nil {
  875. s.logger.Debugf("Error sending CONNECTION_REFUSED error: %s", err)
  876. }
  877. }
  878. // sendError sends the error as a response to the packet received with header hdr
  879. func (s *baseServer) sendError(remoteAddr net.Addr, hdr *wire.Header, sealer handshake.LongHeaderSealer, errorCode qerr.TransportErrorCode, info packetInfo) error {
  880. b := getPacketBuffer()
  881. defer b.Release()
  882. ccf := &wire.ConnectionCloseFrame{ErrorCode: uint64(errorCode)}
  883. replyHdr := &wire.ExtendedHeader{}
  884. replyHdr.Type = protocol.PacketTypeInitial
  885. replyHdr.Version = hdr.Version
  886. replyHdr.SrcConnectionID = hdr.DestConnectionID
  887. replyHdr.DestConnectionID = hdr.SrcConnectionID
  888. replyHdr.PacketNumberLen = protocol.PacketNumberLen4
  889. replyHdr.Length = 4 /* packet number len */ + ccf.Length(hdr.Version) + protocol.ByteCount(sealer.Overhead())
  890. var err error
  891. b.Data, err = replyHdr.Append(b.Data, hdr.Version)
  892. if err != nil {
  893. return err
  894. }
  895. payloadOffset := len(b.Data)
  896. b.Data, err = ccf.Append(b.Data, hdr.Version)
  897. if err != nil {
  898. return err
  899. }
  900. _ = sealer.Seal(b.Data[payloadOffset:payloadOffset], b.Data[payloadOffset:], replyHdr.PacketNumber, b.Data[:payloadOffset])
  901. b.Data = b.Data[0 : len(b.Data)+sealer.Overhead()]
  902. pnOffset := payloadOffset - int(replyHdr.PacketNumberLen)
  903. sealer.EncryptHeader(
  904. b.Data[pnOffset+4:pnOffset+4+16],
  905. &b.Data[0],
  906. b.Data[pnOffset:payloadOffset],
  907. )
  908. replyHdr.Log(s.logger)
  909. wire.LogFrame(s.logger, ccf, true)
  910. if s.tracer != nil && s.tracer.SentPacket != nil {
  911. s.tracer.SentPacket(remoteAddr, &replyHdr.Header, protocol.ByteCount(len(b.Data)), []logging.Frame{ccf})
  912. }
  913. _, err = s.conn.WritePacket(b.Data, remoteAddr, info.OOB(), 0, protocol.ECNUnsupported)
  914. return err
  915. }
  916. func (s *baseServer) enqueueVersionNegotiationPacket(p receivedPacket) (bufferInUse bool) {
  917. select {
  918. case s.versionNegotiationQueue <- p:
  919. return true
  920. default:
  921. // it's fine to not send version negotiation packets when we are busy
  922. }
  923. return false
  924. }
  925. func (s *baseServer) maybeSendVersionNegotiationPacket(p receivedPacket) {
  926. defer p.buffer.Release()
  927. v, err := wire.ParseVersion(p.data)
  928. if err != nil {
  929. s.logger.Debugf("failed to parse version for sending version negotiation packet: %s", err)
  930. return
  931. }
  932. _, src, dest, err := wire.ParseArbitraryLenConnectionIDs(p.data)
  933. if err != nil { // should never happen
  934. s.logger.Debugf("Dropping a packet with an unknown version for which we failed to parse connection IDs")
  935. if s.tracer != nil && s.tracer.DroppedPacket != nil {
  936. s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropUnexpectedPacket)
  937. }
  938. return
  939. }
  940. s.logger.Debugf("Client offered version %s, sending Version Negotiation", v)
  941. data := wire.ComposeVersionNegotiation(dest, src, s.config.Versions)
  942. if s.tracer != nil && s.tracer.SentVersionNegotiationPacket != nil {
  943. s.tracer.SentVersionNegotiationPacket(p.remoteAddr, src, dest, s.config.Versions)
  944. }
  945. if _, err := s.conn.WritePacket(data, p.remoteAddr, p.info.OOB(), 0, protocol.ECNUnsupported); err != nil {
  946. s.logger.Debugf("Error sending Version Negotiation: %s", err)
  947. }
  948. }