server.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. package http3
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "net/http"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/Psiphon-Labs/quic-go"
  17. "github.com/Psiphon-Labs/quic-go/internal/handshake"
  18. "github.com/Psiphon-Labs/quic-go/internal/protocol"
  19. "github.com/Psiphon-Labs/quic-go/internal/utils"
  20. "github.com/Psiphon-Labs/quic-go/quicvarint"
  21. "github.com/marten-seemann/qpack"
  22. )
  23. // allows mocking of quic.Listen and quic.ListenAddr
  24. var (
  25. quicListen = quic.ListenEarly
  26. quicListenAddr = quic.ListenAddrEarly
  27. )
  28. const (
  29. nextProtoH3Draft29 = "h3-29"
  30. nextProtoH3 = "h3"
  31. )
  32. const (
  33. streamTypeControlStream = 0
  34. streamTypePushStream = 1
  35. streamTypeQPACKEncoderStream = 2
  36. streamTypeQPACKDecoderStream = 3
  37. )
  38. func versionToALPN(v protocol.VersionNumber) string {
  39. if v == protocol.Version1 {
  40. return nextProtoH3
  41. }
  42. if v == protocol.VersionTLS || v == protocol.VersionDraft29 {
  43. return nextProtoH3Draft29
  44. }
  45. return ""
  46. }
  47. // contextKey is a value for use with context.WithValue. It's used as
  48. // a pointer so it fits in an interface{} without allocation.
  49. type contextKey struct {
  50. name string
  51. }
  52. func (k *contextKey) String() string { return "quic-go/http3 context value " + k.name }
  53. // ServerContextKey is a context key. It can be used in HTTP
  54. // handlers with Context.Value to access the server that
  55. // started the handler. The associated value will be of
  56. // type *http3.Server.
  57. var ServerContextKey = &contextKey{"http3-server"}
  58. type requestError struct {
  59. err error
  60. streamErr errorCode
  61. connErr errorCode
  62. }
  63. func newStreamError(code errorCode, err error) requestError {
  64. return requestError{err: err, streamErr: code}
  65. }
  66. func newConnError(code errorCode, err error) requestError {
  67. return requestError{err: err, connErr: code}
  68. }
  69. // Server is a HTTP/3 server.
  70. type Server struct {
  71. *http.Server
  72. // By providing a quic.Config, it is possible to set parameters of the QUIC connection.
  73. // If nil, it uses reasonable default values.
  74. QuicConfig *quic.Config
  75. // Enable support for HTTP/3 datagrams.
  76. // If set to true, QuicConfig.EnableDatagram will be set.
  77. // See https://www.ietf.org/archive/id/draft-schinazi-masque-h3-datagram-02.html.
  78. EnableDatagrams bool
  79. port uint32 // used atomically
  80. mutex sync.Mutex
  81. listeners map[*quic.EarlyListener]struct{}
  82. closed utils.AtomicBool
  83. loggerOnce sync.Once
  84. logger utils.Logger
  85. }
  86. // ListenAndServe listens on the UDP address s.Addr and calls s.Handler to handle HTTP/3 requests on incoming connections.
  87. func (s *Server) ListenAndServe() error {
  88. if s.Server == nil {
  89. return errors.New("use of http3.Server without http.Server")
  90. }
  91. return s.serveImpl(s.TLSConfig, nil)
  92. }
  93. // ListenAndServeTLS listens on the UDP address s.Addr and calls s.Handler to handle HTTP/3 requests on incoming connections.
  94. func (s *Server) ListenAndServeTLS(certFile, keyFile string) error {
  95. var err error
  96. certs := make([]tls.Certificate, 1)
  97. certs[0], err = tls.LoadX509KeyPair(certFile, keyFile)
  98. if err != nil {
  99. return err
  100. }
  101. // We currently only use the cert-related stuff from tls.Config,
  102. // so we don't need to make a full copy.
  103. config := &tls.Config{
  104. Certificates: certs,
  105. }
  106. return s.serveImpl(config, nil)
  107. }
  108. // Serve an existing UDP connection.
  109. // It is possible to reuse the same connection for outgoing connections.
  110. // Closing the server does not close the packet conn.
  111. func (s *Server) Serve(conn net.PacketConn) error {
  112. return s.serveImpl(s.TLSConfig, conn)
  113. }
  114. func (s *Server) serveImpl(tlsConf *tls.Config, conn net.PacketConn) error {
  115. if s.closed.Get() {
  116. return http.ErrServerClosed
  117. }
  118. if s.Server == nil {
  119. return errors.New("use of http3.Server without http.Server")
  120. }
  121. s.loggerOnce.Do(func() {
  122. s.logger = utils.DefaultLogger.WithPrefix("server")
  123. })
  124. // The tls.Config we pass to Listen needs to have the GetConfigForClient callback set.
  125. // That way, we can get the QUIC version and set the correct ALPN value.
  126. baseConf := &tls.Config{
  127. GetConfigForClient: func(ch *tls.ClientHelloInfo) (*tls.Config, error) {
  128. // determine the ALPN from the QUIC version used
  129. proto := nextProtoH3Draft29
  130. if qconn, ok := ch.Conn.(handshake.ConnWithVersion); ok {
  131. if qconn.GetQUICVersion() == protocol.Version1 {
  132. proto = nextProtoH3
  133. }
  134. }
  135. config := tlsConf
  136. if tlsConf.GetConfigForClient != nil {
  137. getConfigForClient := tlsConf.GetConfigForClient
  138. var err error
  139. conf, err := getConfigForClient(ch)
  140. if err != nil {
  141. return nil, err
  142. }
  143. if conf != nil {
  144. config = conf
  145. }
  146. }
  147. if config == nil {
  148. return nil, nil
  149. }
  150. config = config.Clone()
  151. config.NextProtos = []string{proto}
  152. return config, nil
  153. },
  154. }
  155. var ln quic.EarlyListener
  156. var err error
  157. quicConf := s.QuicConfig
  158. if quicConf == nil {
  159. quicConf = &quic.Config{}
  160. } else {
  161. quicConf = s.QuicConfig.Clone()
  162. }
  163. if s.EnableDatagrams {
  164. quicConf.EnableDatagrams = true
  165. }
  166. if conn == nil {
  167. ln, err = quicListenAddr(s.Addr, baseConf, quicConf)
  168. } else {
  169. ln, err = quicListen(conn, baseConf, quicConf)
  170. }
  171. if err != nil {
  172. return err
  173. }
  174. s.addListener(&ln)
  175. defer s.removeListener(&ln)
  176. for {
  177. sess, err := ln.Accept(context.Background())
  178. if err != nil {
  179. return err
  180. }
  181. go s.handleConn(sess)
  182. }
  183. }
  184. // We store a pointer to interface in the map set. This is safe because we only
  185. // call trackListener via Serve and can track+defer untrack the same pointer to
  186. // local variable there. We never need to compare a Listener from another caller.
  187. func (s *Server) addListener(l *quic.EarlyListener) {
  188. s.mutex.Lock()
  189. if s.listeners == nil {
  190. s.listeners = make(map[*quic.EarlyListener]struct{})
  191. }
  192. s.listeners[l] = struct{}{}
  193. s.mutex.Unlock()
  194. }
  195. func (s *Server) removeListener(l *quic.EarlyListener) {
  196. s.mutex.Lock()
  197. delete(s.listeners, l)
  198. s.mutex.Unlock()
  199. }
  200. func (s *Server) handleConn(sess quic.EarlySession) {
  201. decoder := qpack.NewDecoder(nil)
  202. // send a SETTINGS frame
  203. str, err := sess.OpenUniStream()
  204. if err != nil {
  205. s.logger.Debugf("Opening the control stream failed.")
  206. return
  207. }
  208. buf := &bytes.Buffer{}
  209. quicvarint.Write(buf, streamTypeControlStream) // stream type
  210. (&settingsFrame{Datagram: s.EnableDatagrams}).Write(buf)
  211. str.Write(buf.Bytes())
  212. go s.handleUnidirectionalStreams(sess)
  213. // Process all requests immediately.
  214. // It's the client's responsibility to decide which requests are eligible for 0-RTT.
  215. for {
  216. str, err := sess.AcceptStream(context.Background())
  217. if err != nil {
  218. s.logger.Debugf("Accepting stream failed: %s", err)
  219. return
  220. }
  221. go func() {
  222. rerr := s.handleRequest(sess, str, decoder, func() {
  223. sess.CloseWithError(quic.ApplicationErrorCode(errorFrameUnexpected), "")
  224. })
  225. if rerr.err != nil || rerr.streamErr != 0 || rerr.connErr != 0 {
  226. s.logger.Debugf("Handling request failed: %s", err)
  227. if rerr.streamErr != 0 {
  228. str.CancelWrite(quic.StreamErrorCode(rerr.streamErr))
  229. }
  230. if rerr.connErr != 0 {
  231. var reason string
  232. if rerr.err != nil {
  233. reason = rerr.err.Error()
  234. }
  235. sess.CloseWithError(quic.ApplicationErrorCode(rerr.connErr), reason)
  236. }
  237. return
  238. }
  239. str.Close()
  240. }()
  241. }
  242. }
  243. func (s *Server) handleUnidirectionalStreams(sess quic.EarlySession) {
  244. for {
  245. str, err := sess.AcceptUniStream(context.Background())
  246. if err != nil {
  247. s.logger.Debugf("accepting unidirectional stream failed: %s", err)
  248. return
  249. }
  250. go func(str quic.ReceiveStream) {
  251. streamType, err := quicvarint.Read(&byteReaderImpl{str})
  252. if err != nil {
  253. s.logger.Debugf("reading stream type on stream %d failed: %s", str.StreamID(), err)
  254. return
  255. }
  256. // We're only interested in the control stream here.
  257. switch streamType {
  258. case streamTypeControlStream:
  259. case streamTypeQPACKEncoderStream, streamTypeQPACKDecoderStream:
  260. // Our QPACK implementation doesn't use the dynamic table yet.
  261. // TODO: check that only one stream of each type is opened.
  262. return
  263. case streamTypePushStream: // only the server can push
  264. sess.CloseWithError(quic.ApplicationErrorCode(errorStreamCreationError), "")
  265. return
  266. default:
  267. str.CancelRead(quic.StreamErrorCode(errorStreamCreationError))
  268. return
  269. }
  270. f, err := parseNextFrame(str)
  271. if err != nil {
  272. sess.CloseWithError(quic.ApplicationErrorCode(errorFrameError), "")
  273. return
  274. }
  275. sf, ok := f.(*settingsFrame)
  276. if !ok {
  277. sess.CloseWithError(quic.ApplicationErrorCode(errorMissingSettings), "")
  278. return
  279. }
  280. if !sf.Datagram {
  281. return
  282. }
  283. // If datagram support was enabled on our side as well as on the client side,
  284. // we can expect it to have been negotiated both on the transport and on the HTTP/3 layer.
  285. // Note: ConnectionState() will block until the handshake is complete (relevant when using 0-RTT).
  286. if s.EnableDatagrams && !sess.ConnectionState().SupportsDatagrams {
  287. sess.CloseWithError(quic.ApplicationErrorCode(errorSettingsError), "missing QUIC Datagram support")
  288. }
  289. }(str)
  290. }
  291. }
  292. func (s *Server) maxHeaderBytes() uint64 {
  293. if s.Server.MaxHeaderBytes <= 0 {
  294. return http.DefaultMaxHeaderBytes
  295. }
  296. return uint64(s.Server.MaxHeaderBytes)
  297. }
  298. func (s *Server) handleRequest(sess quic.Session, str quic.Stream, decoder *qpack.Decoder, onFrameError func()) requestError {
  299. frame, err := parseNextFrame(str)
  300. if err != nil {
  301. return newStreamError(errorRequestIncomplete, err)
  302. }
  303. hf, ok := frame.(*headersFrame)
  304. if !ok {
  305. return newConnError(errorFrameUnexpected, errors.New("expected first frame to be a HEADERS frame"))
  306. }
  307. if hf.Length > s.maxHeaderBytes() {
  308. return newStreamError(errorFrameError, fmt.Errorf("HEADERS frame too large: %d bytes (max: %d)", hf.Length, s.maxHeaderBytes()))
  309. }
  310. headerBlock := make([]byte, hf.Length)
  311. if _, err := io.ReadFull(str, headerBlock); err != nil {
  312. return newStreamError(errorRequestIncomplete, err)
  313. }
  314. hfs, err := decoder.DecodeFull(headerBlock)
  315. if err != nil {
  316. // TODO: use the right error code
  317. return newConnError(errorGeneralProtocolError, err)
  318. }
  319. req, err := requestFromHeaders(hfs)
  320. if err != nil {
  321. // TODO: use the right error code
  322. return newStreamError(errorGeneralProtocolError, err)
  323. }
  324. req.RemoteAddr = sess.RemoteAddr().String()
  325. req.Body = newRequestBody(str, onFrameError)
  326. if s.logger.Debug() {
  327. s.logger.Infof("%s %s%s, on stream %d", req.Method, req.Host, req.RequestURI, str.StreamID())
  328. } else {
  329. s.logger.Infof("%s %s%s", req.Method, req.Host, req.RequestURI)
  330. }
  331. ctx := str.Context()
  332. ctx = context.WithValue(ctx, ServerContextKey, s)
  333. ctx = context.WithValue(ctx, http.LocalAddrContextKey, sess.LocalAddr())
  334. req = req.WithContext(ctx)
  335. r := newResponseWriter(str, s.logger)
  336. defer func() {
  337. if !r.usedDataStream() {
  338. r.Flush()
  339. }
  340. }()
  341. handler := s.Handler
  342. if handler == nil {
  343. handler = http.DefaultServeMux
  344. }
  345. var panicked bool
  346. func() {
  347. defer func() {
  348. if p := recover(); p != nil {
  349. // Copied from net/http/server.go
  350. const size = 64 << 10
  351. buf := make([]byte, size)
  352. buf = buf[:runtime.Stack(buf, false)]
  353. s.logger.Errorf("http: panic serving: %v\n%s", p, buf)
  354. panicked = true
  355. }
  356. }()
  357. handler.ServeHTTP(r, req)
  358. }()
  359. if !r.usedDataStream() {
  360. if panicked {
  361. r.WriteHeader(500)
  362. } else {
  363. r.WriteHeader(200)
  364. }
  365. // If the EOF was read by the handler, CancelRead() is a no-op.
  366. str.CancelRead(quic.StreamErrorCode(errorNoError))
  367. }
  368. return requestError{}
  369. }
  370. // Close the server immediately, aborting requests and sending CONNECTION_CLOSE frames to connected clients.
  371. // Close in combination with ListenAndServe() (instead of Serve()) may race if it is called before a UDP socket is established.
  372. func (s *Server) Close() error {
  373. s.closed.Set(true)
  374. s.mutex.Lock()
  375. defer s.mutex.Unlock()
  376. var err error
  377. for ln := range s.listeners {
  378. if cerr := (*ln).Close(); cerr != nil && err == nil {
  379. err = cerr
  380. }
  381. }
  382. return err
  383. }
  384. // CloseGracefully shuts down the server gracefully. The server sends a GOAWAY frame first, then waits for either timeout to trigger, or for all running requests to complete.
  385. // CloseGracefully in combination with ListenAndServe() (instead of Serve()) may race if it is called before a UDP socket is established.
  386. func (s *Server) CloseGracefully(timeout time.Duration) error {
  387. // TODO: implement
  388. return nil
  389. }
  390. // SetQuicHeaders can be used to set the proper headers that announce that this server supports QUIC.
  391. // The values that are set depend on the port information from s.Server.Addr, and currently look like this (if Addr has port 443):
  392. // Alt-Svc: quic=":443"; ma=2592000; v="33,32,31,30"
  393. func (s *Server) SetQuicHeaders(hdr http.Header) error {
  394. port := atomic.LoadUint32(&s.port)
  395. if port == 0 {
  396. // Extract port from s.Server.Addr
  397. _, portStr, err := net.SplitHostPort(s.Server.Addr)
  398. if err != nil {
  399. return err
  400. }
  401. portInt, err := net.LookupPort("tcp", portStr)
  402. if err != nil {
  403. return err
  404. }
  405. port = uint32(portInt)
  406. atomic.StoreUint32(&s.port, port)
  407. }
  408. // This code assumes that we will use protocol.SupportedVersions if no quic.Config is passed.
  409. supportedVersions := protocol.SupportedVersions
  410. if s.QuicConfig != nil && len(s.QuicConfig.Versions) > 0 {
  411. supportedVersions = s.QuicConfig.Versions
  412. }
  413. altSvc := make([]string, 0, len(supportedVersions))
  414. for _, version := range supportedVersions {
  415. v := versionToALPN(version)
  416. if len(v) > 0 {
  417. altSvc = append(altSvc, fmt.Sprintf(`%s=":%d"; ma=2592000`, v, port))
  418. }
  419. }
  420. hdr.Add("Alt-Svc", strings.Join(altSvc, ","))
  421. return nil
  422. }
  423. // ListenAndServeQUIC listens on the UDP network address addr and calls the
  424. // handler for HTTP/3 requests on incoming connections. http.DefaultServeMux is
  425. // used when handler is nil.
  426. func ListenAndServeQUIC(addr, certFile, keyFile string, handler http.Handler) error {
  427. server := &Server{
  428. Server: &http.Server{
  429. Addr: addr,
  430. Handler: handler,
  431. },
  432. }
  433. return server.ListenAndServeTLS(certFile, keyFile)
  434. }
  435. // ListenAndServe listens on the given network address for both, TLS and QUIC
  436. // connetions in parallel. It returns if one of the two returns an error.
  437. // http.DefaultServeMux is used when handler is nil.
  438. // The correct Alt-Svc headers for QUIC are set.
  439. func ListenAndServe(addr, certFile, keyFile string, handler http.Handler) error {
  440. // Load certs
  441. var err error
  442. certs := make([]tls.Certificate, 1)
  443. certs[0], err = tls.LoadX509KeyPair(certFile, keyFile)
  444. if err != nil {
  445. return err
  446. }
  447. // We currently only use the cert-related stuff from tls.Config,
  448. // so we don't need to make a full copy.
  449. config := &tls.Config{
  450. Certificates: certs,
  451. }
  452. // Open the listeners
  453. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  454. if err != nil {
  455. return err
  456. }
  457. udpConn, err := net.ListenUDP("udp", udpAddr)
  458. if err != nil {
  459. return err
  460. }
  461. defer udpConn.Close()
  462. tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
  463. if err != nil {
  464. return err
  465. }
  466. tcpConn, err := net.ListenTCP("tcp", tcpAddr)
  467. if err != nil {
  468. return err
  469. }
  470. defer tcpConn.Close()
  471. tlsConn := tls.NewListener(tcpConn, config)
  472. defer tlsConn.Close()
  473. // Start the servers
  474. httpServer := &http.Server{
  475. Addr: addr,
  476. TLSConfig: config,
  477. }
  478. quicServer := &Server{
  479. Server: httpServer,
  480. }
  481. if handler == nil {
  482. handler = http.DefaultServeMux
  483. }
  484. httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  485. quicServer.SetQuicHeaders(w.Header())
  486. handler.ServeHTTP(w, r)
  487. })
  488. hErr := make(chan error)
  489. qErr := make(chan error)
  490. go func() {
  491. hErr <- httpServer.Serve(tlsConn)
  492. }()
  493. go func() {
  494. qErr <- quicServer.Serve(udpConn)
  495. }()
  496. select {
  497. case err := <-hErr:
  498. quicServer.Close()
  499. return err
  500. case err := <-qErr:
  501. // Cannot close the HTTP server or wait for requests to complete properly :/
  502. return err
  503. }
  504. }