server.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. package quic
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/Psiphon-Labs/quic-go/internal/handshake"
  14. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  15. "github.com/Psiphon-Labs/quic-go/internal/qerr"
  16. "github.com/Psiphon-Labs/quic-go/internal/utils"
  17. "github.com/Psiphon-Labs/quic-go/internal/wire"
  18. )
  19. // packetHandler handles packets
  20. type packetHandler interface {
  21. handlePacket(*receivedPacket)
  22. io.Closer
  23. destroy(error)
  24. getPerspective() protocol.Perspective
  25. }
  26. type unknownPacketHandler interface {
  27. handlePacket(*receivedPacket)
  28. setCloseError(error)
  29. }
  30. type packetHandlerManager interface {
  31. io.Closer
  32. SetServer(unknownPacketHandler)
  33. CloseServer()
  34. sessionRunner
  35. AddIfNotTaken(protocol.ConnectionID, packetHandler) bool
  36. GetStatelessResetToken(protocol.ConnectionID) [16]byte
  37. }
  38. type quicSession interface {
  39. EarlySession
  40. earlySessionReady() <-chan struct{}
  41. handlePacket(*receivedPacket)
  42. GetVersion() protocol.VersionNumber
  43. getPerspective() protocol.Perspective
  44. run() error
  45. destroy(error)
  46. closeForRecreating() protocol.PacketNumber
  47. closeRemote(error)
  48. }
  49. // A Listener of QUIC
  50. type baseServer struct {
  51. mutex sync.Mutex
  52. acceptEarlySessions bool
  53. tlsConf *tls.Config
  54. config *Config
  55. conn net.PacketConn
  56. // If the server is started with ListenAddr, we create a packet conn.
  57. // If it is started with Listen, we take a packet conn as a parameter.
  58. createdPacketConn bool
  59. tokenGenerator *handshake.TokenGenerator
  60. sessionHandler packetHandlerManager
  61. // set as a member, so they can be set in the tests
  62. newSession func(connection, sessionRunner, protocol.ConnectionID /* original connection ID */, protocol.ConnectionID /* client dest connection ID */, protocol.ConnectionID /* destination connection ID */, protocol.ConnectionID /* source connection ID */, [16]byte, *Config, *tls.Config, *handshake.TokenGenerator, utils.Logger, protocol.VersionNumber) quicSession
  63. serverError error
  64. errorChan chan struct{}
  65. closed bool
  66. sessionQueue chan quicSession
  67. sessionQueueLen int32 // to be used as an atomic
  68. logger utils.Logger
  69. }
  70. var _ Listener = &baseServer{}
  71. var _ unknownPacketHandler = &baseServer{}
  72. type earlyServer struct{ *baseServer }
  73. var _ EarlyListener = &earlyServer{}
  74. func (s *earlyServer) Accept(ctx context.Context) (EarlySession, error) {
  75. return s.baseServer.accept(ctx)
  76. }
  77. // ListenAddr creates a QUIC server listening on a given address.
  78. // The tls.Config must not be nil and must contain a certificate configuration.
  79. // The quic.Config may be nil, in that case the default values will be used.
  80. func ListenAddr(addr string, tlsConf *tls.Config, config *Config) (Listener, error) {
  81. return listenAddr(addr, tlsConf, config, false)
  82. }
  83. // ListenAddrEarly works like ListenAddr, but it returns sessions before the handshake completes.
  84. func ListenAddrEarly(addr string, tlsConf *tls.Config, config *Config) (EarlyListener, error) {
  85. s, err := listenAddr(addr, tlsConf, config, true)
  86. if err != nil {
  87. return nil, err
  88. }
  89. return &earlyServer{s}, nil
  90. }
  91. func listenAddr(addr string, tlsConf *tls.Config, config *Config, acceptEarly bool) (*baseServer, error) {
  92. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  93. if err != nil {
  94. return nil, err
  95. }
  96. conn, err := net.ListenUDP("udp", udpAddr)
  97. if err != nil {
  98. return nil, err
  99. }
  100. serv, err := listen(conn, tlsConf, config, acceptEarly)
  101. if err != nil {
  102. return nil, err
  103. }
  104. serv.createdPacketConn = true
  105. return serv, nil
  106. }
  107. // Listen listens for QUIC connections on a given net.PacketConn.
  108. // A single net.PacketConn only be used for a single call to Listen.
  109. // The PacketConn can be used for simultaneous calls to Dial.
  110. // QUIC connection IDs are used for demultiplexing the different connections.
  111. // The tls.Config must not be nil and must contain a certificate configuration.
  112. // Furthermore, it must define an application control (using NextProtos).
  113. // The quic.Config may be nil, in that case the default values will be used.
  114. func Listen(conn net.PacketConn, tlsConf *tls.Config, config *Config) (Listener, error) {
  115. return listen(conn, tlsConf, config, false)
  116. }
  117. // ListenEarly works like Listen, but it returns sessions before the handshake completes.
  118. func ListenEarly(conn net.PacketConn, tlsConf *tls.Config, config *Config) (EarlyListener, error) {
  119. s, err := listen(conn, tlsConf, config, true)
  120. if err != nil {
  121. return nil, err
  122. }
  123. s.acceptEarlySessions = true
  124. return &earlyServer{s}, nil
  125. }
  126. func listen(conn net.PacketConn, tlsConf *tls.Config, config *Config, acceptEarly bool) (*baseServer, error) {
  127. if tlsConf == nil {
  128. return nil, errors.New("quic: tls.Config not set")
  129. }
  130. config = populateServerConfig(config)
  131. for _, v := range config.Versions {
  132. if !protocol.IsValidVersion(v) {
  133. return nil, fmt.Errorf("%s is not a valid QUIC version", v)
  134. }
  135. }
  136. sessionHandler, err := getMultiplexer().AddConn(conn, config.ConnectionIDLength, config.StatelessResetKey)
  137. if err != nil {
  138. return nil, err
  139. }
  140. tokenGenerator, err := handshake.NewTokenGenerator()
  141. if err != nil {
  142. return nil, err
  143. }
  144. s := &baseServer{
  145. conn: conn,
  146. tlsConf: tlsConf,
  147. config: config,
  148. tokenGenerator: tokenGenerator,
  149. sessionHandler: sessionHandler,
  150. sessionQueue: make(chan quicSession),
  151. errorChan: make(chan struct{}),
  152. newSession: newSession,
  153. logger: utils.DefaultLogger.WithPrefix("server"),
  154. acceptEarlySessions: acceptEarly,
  155. }
  156. sessionHandler.SetServer(s)
  157. s.logger.Debugf("Listening for %s connections on %s", conn.LocalAddr().Network(), conn.LocalAddr().String())
  158. return s, nil
  159. }
  160. var defaultAcceptToken = func(clientAddr net.Addr, token *Token) bool {
  161. if token == nil {
  162. return false
  163. }
  164. validity := protocol.TokenValidity
  165. if token.IsRetryToken {
  166. validity = protocol.RetryTokenValidity
  167. }
  168. if time.Now().After(token.SentTime.Add(validity)) {
  169. return false
  170. }
  171. var sourceAddr string
  172. if udpAddr, ok := clientAddr.(*net.UDPAddr); ok {
  173. sourceAddr = udpAddr.IP.String()
  174. } else {
  175. sourceAddr = clientAddr.String()
  176. }
  177. return sourceAddr == token.RemoteAddr
  178. }
  179. // populateServerConfig populates fields in the quic.Config with their default values, if none are set
  180. // it may be called with nil
  181. func populateServerConfig(config *Config) *Config {
  182. if config == nil {
  183. config = &Config{}
  184. }
  185. versions := config.Versions
  186. if len(versions) == 0 {
  187. versions = protocol.SupportedVersions
  188. }
  189. verifyToken := defaultAcceptToken
  190. if config.AcceptToken != nil {
  191. verifyToken = config.AcceptToken
  192. }
  193. handshakeTimeout := protocol.DefaultHandshakeTimeout
  194. if config.HandshakeTimeout != 0 {
  195. handshakeTimeout = config.HandshakeTimeout
  196. }
  197. idleTimeout := protocol.DefaultIdleTimeout
  198. if config.IdleTimeout != 0 {
  199. idleTimeout = config.IdleTimeout
  200. }
  201. maxReceiveStreamFlowControlWindow := config.MaxReceiveStreamFlowControlWindow
  202. if maxReceiveStreamFlowControlWindow == 0 {
  203. maxReceiveStreamFlowControlWindow = protocol.DefaultMaxReceiveStreamFlowControlWindow
  204. }
  205. maxReceiveConnectionFlowControlWindow := config.MaxReceiveConnectionFlowControlWindow
  206. if maxReceiveConnectionFlowControlWindow == 0 {
  207. maxReceiveConnectionFlowControlWindow = protocol.DefaultMaxReceiveConnectionFlowControlWindow
  208. }
  209. maxIncomingStreams := config.MaxIncomingStreams
  210. if maxIncomingStreams == 0 {
  211. maxIncomingStreams = protocol.DefaultMaxIncomingStreams
  212. } else if maxIncomingStreams < 0 {
  213. maxIncomingStreams = 0
  214. }
  215. maxIncomingUniStreams := config.MaxIncomingUniStreams
  216. if maxIncomingUniStreams == 0 {
  217. maxIncomingUniStreams = protocol.DefaultMaxIncomingUniStreams
  218. } else if maxIncomingUniStreams < 0 {
  219. maxIncomingUniStreams = 0
  220. }
  221. connIDLen := config.ConnectionIDLength
  222. if connIDLen == 0 {
  223. connIDLen = protocol.DefaultConnectionIDLength
  224. }
  225. return &Config{
  226. Versions: versions,
  227. HandshakeTimeout: handshakeTimeout,
  228. IdleTimeout: idleTimeout,
  229. AcceptToken: verifyToken,
  230. KeepAlive: config.KeepAlive,
  231. MaxReceiveStreamFlowControlWindow: maxReceiveStreamFlowControlWindow,
  232. MaxReceiveConnectionFlowControlWindow: maxReceiveConnectionFlowControlWindow,
  233. MaxIncomingStreams: maxIncomingStreams,
  234. MaxIncomingUniStreams: maxIncomingUniStreams,
  235. ConnectionIDLength: connIDLen,
  236. StatelessResetKey: config.StatelessResetKey,
  237. QuicTracer: config.QuicTracer,
  238. }
  239. }
  240. // Accept returns sessions that already completed the handshake.
  241. // It is only valid if acceptEarlySessions is false.
  242. func (s *baseServer) Accept(ctx context.Context) (Session, error) {
  243. return s.accept(ctx)
  244. }
  245. func (s *baseServer) accept(ctx context.Context) (quicSession, error) {
  246. select {
  247. case <-ctx.Done():
  248. return nil, ctx.Err()
  249. case sess := <-s.sessionQueue:
  250. atomic.AddInt32(&s.sessionQueueLen, -1)
  251. return sess, nil
  252. case <-s.errorChan:
  253. return nil, s.serverError
  254. }
  255. }
  256. // Close the server
  257. func (s *baseServer) Close() error {
  258. s.mutex.Lock()
  259. defer s.mutex.Unlock()
  260. if s.closed {
  261. return nil
  262. }
  263. s.sessionHandler.CloseServer()
  264. if s.serverError == nil {
  265. s.serverError = errors.New("server closed")
  266. }
  267. var err error
  268. // If the server was started with ListenAddr, we created the packet conn.
  269. // We need to close it in order to make the go routine reading from that conn return.
  270. if s.createdPacketConn {
  271. err = s.sessionHandler.Close()
  272. }
  273. s.closed = true
  274. close(s.errorChan)
  275. return err
  276. }
  277. func (s *baseServer) setCloseError(e error) {
  278. s.mutex.Lock()
  279. defer s.mutex.Unlock()
  280. if s.closed {
  281. return
  282. }
  283. s.closed = true
  284. s.serverError = e
  285. close(s.errorChan)
  286. }
  287. // Addr returns the server's network address
  288. func (s *baseServer) Addr() net.Addr {
  289. return s.conn.LocalAddr()
  290. }
  291. func (s *baseServer) handlePacket(p *receivedPacket) {
  292. go func() {
  293. if shouldReleaseBuffer := s.handlePacketImpl(p); !shouldReleaseBuffer {
  294. p.buffer.Release()
  295. }
  296. }()
  297. }
  298. func (s *baseServer) handlePacketImpl(p *receivedPacket) bool /* was the packet passed on to a session */ {
  299. if len(p.data) < protocol.MinInitialPacketSize {
  300. s.logger.Debugf("Dropping a packet that is too small to be a valid Initial (%d bytes)", len(p.data))
  301. return false
  302. }
  303. // If we're creating a new session, the packet will be passed to the session.
  304. // The header will then be parsed again.
  305. hdr, _, _, err := wire.ParsePacket(p.data, s.config.ConnectionIDLength)
  306. if err != nil {
  307. s.logger.Debugf("Error parsing packet: %s", err)
  308. return false
  309. }
  310. // Short header packets should never end up here in the first place
  311. if !hdr.IsLongHeader {
  312. return false
  313. }
  314. // send a Version Negotiation Packet if the client is speaking a different protocol version
  315. if !protocol.IsSupportedVersion(s.config.Versions, hdr.Version) {
  316. s.sendVersionNegotiationPacket(p, hdr)
  317. return false
  318. }
  319. if hdr.IsLongHeader && hdr.Type != protocol.PacketTypeInitial {
  320. // Drop long header packets.
  321. // There's litte point in sending a Stateless Reset, since the client
  322. // might not have received the token yet.
  323. return false
  324. }
  325. s.logger.Debugf("<- Received Initial packet.")
  326. sess, err := s.handleInitialImpl(p, hdr)
  327. if err != nil {
  328. s.logger.Errorf("Error occurred handling initial packet: %s", err)
  329. return false
  330. }
  331. // A retry was done, or the connection attempt was rejected,
  332. // or if the Initial was a duplicate.
  333. if sess == nil {
  334. return false
  335. }
  336. // Don't put the packet buffer back if a new session was created.
  337. // The session will handle the packet and take of that.
  338. return true
  339. }
  340. func (s *baseServer) handleInitialImpl(p *receivedPacket, hdr *wire.Header) (quicSession, error) {
  341. if len(hdr.Token) == 0 && hdr.DestConnectionID.Len() < protocol.MinConnectionIDLenInitial {
  342. return nil, errors.New("too short connection ID")
  343. }
  344. var token *Token
  345. var origDestConnectionID protocol.ConnectionID
  346. if len(hdr.Token) > 0 {
  347. c, err := s.tokenGenerator.DecodeToken(hdr.Token)
  348. if err == nil {
  349. token = &Token{
  350. IsRetryToken: c.IsRetryToken,
  351. RemoteAddr: c.RemoteAddr,
  352. SentTime: c.SentTime,
  353. }
  354. origDestConnectionID = c.OriginalDestConnectionID
  355. }
  356. }
  357. if !s.config.AcceptToken(p.remoteAddr, token) {
  358. // Log the Initial packet now.
  359. // If no Retry is sent, the packet will be logged by the session.
  360. (&wire.ExtendedHeader{Header: *hdr}).Log(s.logger)
  361. return nil, s.sendRetry(p.remoteAddr, hdr)
  362. }
  363. if queueLen := atomic.LoadInt32(&s.sessionQueueLen); queueLen >= protocol.MaxAcceptQueueSize {
  364. s.logger.Debugf("Rejecting new connection. Server currently busy. Accept queue length: %d (max %d)", queueLen, protocol.MaxAcceptQueueSize)
  365. return nil, s.sendServerBusy(p.remoteAddr, hdr)
  366. }
  367. connID, err := protocol.GenerateConnectionID(s.config.ConnectionIDLength)
  368. if err != nil {
  369. return nil, err
  370. }
  371. s.logger.Debugf("Changing connection ID to %s.", connID)
  372. sess := s.createNewSession(
  373. p.remoteAddr,
  374. origDestConnectionID,
  375. hdr.DestConnectionID,
  376. hdr.SrcConnectionID,
  377. connID,
  378. hdr.Version,
  379. )
  380. if sess != nil {
  381. sess.handlePacket(p)
  382. }
  383. return sess, nil
  384. }
  385. func (s *baseServer) createNewSession(
  386. remoteAddr net.Addr,
  387. origDestConnID protocol.ConnectionID,
  388. clientDestConnID protocol.ConnectionID,
  389. destConnID protocol.ConnectionID,
  390. srcConnID protocol.ConnectionID,
  391. version protocol.VersionNumber,
  392. ) quicSession {
  393. sess := s.newSession(
  394. &conn{pconn: s.conn, currentAddr: remoteAddr},
  395. s.sessionHandler,
  396. origDestConnID,
  397. clientDestConnID,
  398. destConnID,
  399. srcConnID,
  400. s.sessionHandler.GetStatelessResetToken(srcConnID),
  401. s.config,
  402. s.tlsConf,
  403. s.tokenGenerator,
  404. s.logger,
  405. version,
  406. )
  407. added := s.sessionHandler.AddIfNotTaken(clientDestConnID, sess)
  408. // We're already keeping track of this connection ID.
  409. // This might happen if we receive two copies of the Initial at the same time.
  410. if !added {
  411. // [Psiphon]
  412. // Stop timer to release resources
  413. if s, ok := sess.(*session); ok {
  414. s.timer.Reset(time.Time{})
  415. }
  416. return nil
  417. }
  418. s.sessionHandler.Add(srcConnID, sess)
  419. go sess.run()
  420. go s.handleNewSession(sess)
  421. return sess
  422. }
  423. func (s *baseServer) handleNewSession(sess quicSession) {
  424. sessCtx := sess.Context()
  425. if s.acceptEarlySessions {
  426. // wait until the early session is ready (or the handshake fails)
  427. select {
  428. case <-sess.earlySessionReady():
  429. case <-sessCtx.Done():
  430. return
  431. }
  432. } else {
  433. // wait until the handshake is complete (or fails)
  434. select {
  435. case <-sess.HandshakeComplete().Done():
  436. case <-sessCtx.Done():
  437. return
  438. }
  439. }
  440. atomic.AddInt32(&s.sessionQueueLen, 1)
  441. select {
  442. case s.sessionQueue <- sess:
  443. // blocks until the session is accepted
  444. case <-sessCtx.Done():
  445. atomic.AddInt32(&s.sessionQueueLen, -1)
  446. // don't pass sessions that were already closed to Accept()
  447. }
  448. }
  449. func (s *baseServer) sendRetry(remoteAddr net.Addr, hdr *wire.Header) error {
  450. token, err := s.tokenGenerator.NewRetryToken(remoteAddr, hdr.DestConnectionID)
  451. if err != nil {
  452. return err
  453. }
  454. connID, err := protocol.GenerateConnectionID(s.config.ConnectionIDLength)
  455. if err != nil {
  456. return err
  457. }
  458. replyHdr := &wire.ExtendedHeader{}
  459. replyHdr.IsLongHeader = true
  460. replyHdr.Type = protocol.PacketTypeRetry
  461. replyHdr.Version = hdr.Version
  462. replyHdr.SrcConnectionID = connID
  463. replyHdr.DestConnectionID = hdr.SrcConnectionID
  464. replyHdr.OrigDestConnectionID = hdr.DestConnectionID
  465. replyHdr.Token = token
  466. s.logger.Debugf("Changing connection ID to %s.", connID)
  467. s.logger.Debugf("-> Sending Retry")
  468. replyHdr.Log(s.logger)
  469. buf := &bytes.Buffer{}
  470. if err := replyHdr.Write(buf, hdr.Version); err != nil {
  471. return err
  472. }
  473. if _, err := s.conn.WriteTo(buf.Bytes(), remoteAddr); err != nil {
  474. s.logger.Debugf("Error sending Retry: %s", err)
  475. }
  476. return nil
  477. }
  478. func (s *baseServer) sendServerBusy(remoteAddr net.Addr, hdr *wire.Header) error {
  479. sealer, _ := handshake.NewInitialAEAD(hdr.DestConnectionID, protocol.PerspectiveServer)
  480. packetBuffer := getPacketBuffer()
  481. defer packetBuffer.Release()
  482. buf := bytes.NewBuffer(packetBuffer.Slice[:0])
  483. ccf := &wire.ConnectionCloseFrame{ErrorCode: qerr.ServerBusy}
  484. replyHdr := &wire.ExtendedHeader{}
  485. replyHdr.IsLongHeader = true
  486. replyHdr.Type = protocol.PacketTypeInitial
  487. replyHdr.Version = hdr.Version
  488. replyHdr.SrcConnectionID = hdr.DestConnectionID
  489. replyHdr.DestConnectionID = hdr.SrcConnectionID
  490. replyHdr.PacketNumberLen = protocol.PacketNumberLen4
  491. replyHdr.Length = 4 /* packet number len */ + ccf.Length(hdr.Version) + protocol.ByteCount(sealer.Overhead())
  492. if err := replyHdr.Write(buf, hdr.Version); err != nil {
  493. return err
  494. }
  495. payloadOffset := buf.Len()
  496. if err := ccf.Write(buf, hdr.Version); err != nil {
  497. return err
  498. }
  499. raw := buf.Bytes()
  500. _ = sealer.Seal(raw[payloadOffset:payloadOffset], raw[payloadOffset:], replyHdr.PacketNumber, raw[:payloadOffset])
  501. raw = raw[0 : buf.Len()+sealer.Overhead()]
  502. pnOffset := payloadOffset - int(replyHdr.PacketNumberLen)
  503. sealer.EncryptHeader(
  504. raw[pnOffset+4:pnOffset+4+16],
  505. &raw[0],
  506. raw[pnOffset:payloadOffset],
  507. )
  508. replyHdr.Log(s.logger)
  509. wire.LogFrame(s.logger, ccf, true)
  510. if _, err := s.conn.WriteTo(raw, remoteAddr); err != nil {
  511. s.logger.Debugf("Error rejecting connection: %s", err)
  512. }
  513. return nil
  514. }
  515. func (s *baseServer) sendVersionNegotiationPacket(p *receivedPacket, hdr *wire.Header) {
  516. s.logger.Debugf("Client offered version %s, sending Version Negotiation", hdr.Version)
  517. data, err := wire.ComposeVersionNegotiation(hdr.SrcConnectionID, hdr.DestConnectionID, s.config.Versions)
  518. if err != nil {
  519. s.logger.Debugf("Error composing Version Negotiation: %s", err)
  520. return
  521. }
  522. if _, err := s.conn.WriteTo(data, p.remoteAddr); err != nil {
  523. s.logger.Debugf("Error sending Version Negotiation: %s", err)
  524. }
  525. }