quic.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060
  1. // +build !DISABLE_QUIC
  2. /*
  3. * Copyright (c) 2018, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. /*
  21. Package quic wraps github.com/lucas-clemente/quic-go with net.Listener and
  22. net.Conn types that provide a drop-in replacement for net.TCPConn.
  23. Each QUIC session has exactly one stream, which is the equivalent of a TCP
  24. stream.
  25. Conns returned from Accept will have an established QUIC session and are
  26. configured to perform a deferred AcceptStream on the first Read or Write.
  27. Conns returned from Dial have an established QUIC session and stream. Dial
  28. accepts a Context input which may be used to cancel the dial.
  29. Conns mask or translate qerr.PeerGoingAway to io.EOF as appropriate.
  30. QUIC idle timeouts and keep alives are tuned to mitigate aggressive UDP NAT
  31. timeouts on mobile data networks while accounting for the fact that mobile
  32. devices in standby/sleep may not be able to initiate the keep alive.
  33. */
  34. package quic
  35. import (
  36. "context"
  37. "crypto/tls"
  38. "fmt"
  39. "io"
  40. "net"
  41. "net/http"
  42. "sync"
  43. "sync/atomic"
  44. "time"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/h2quic"
  51. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/qerr"
  52. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
  53. ietf_quic "github.com/Psiphon-Labs/quic-go"
  54. "github.com/Psiphon-Labs/quic-go/http3"
  55. )
  56. const (
  57. SERVER_HANDSHAKE_TIMEOUT = 30 * time.Second
  58. SERVER_IDLE_TIMEOUT = 5 * time.Minute
  59. CLIENT_IDLE_TIMEOUT = 30 * time.Second
  60. )
  61. // Enabled indicates if QUIC functionality is enabled.
  62. func Enabled() bool {
  63. return true
  64. }
  65. const ietfQUICDraft24VersionNumber = 0xff000018
  66. var supportedVersionNumbers = map[string]uint32{
  67. protocol.QUIC_VERSION_GQUIC39: uint32(gquic.VersionGQUIC39),
  68. protocol.QUIC_VERSION_GQUIC43: uint32(gquic.VersionGQUIC43),
  69. protocol.QUIC_VERSION_GQUIC44: uint32(gquic.VersionGQUIC44),
  70. protocol.QUIC_VERSION_OBFUSCATED: uint32(gquic.VersionGQUIC43),
  71. protocol.QUIC_VERSION_IETF_DRAFT24: ietfQUICDraft24VersionNumber,
  72. }
  73. func isObfuscated(quicVersion string) bool {
  74. return quicVersion == protocol.QUIC_VERSION_OBFUSCATED
  75. }
  76. func isIETFVersion(versionNumber uint32) bool {
  77. return versionNumber == ietfQUICDraft24VersionNumber
  78. }
  79. func getALPN(versionNumber uint32) string {
  80. return "h3-24"
  81. }
  82. // quic_test overrides the server idle timeout.
  83. var serverIdleTimeout = SERVER_IDLE_TIMEOUT
  84. // Listener is a net.Listener.
  85. type Listener struct {
  86. *muxListener
  87. }
  88. // Listen creates a new Listener.
  89. func Listen(
  90. logger common.Logger,
  91. address string,
  92. obfuscationKey string) (net.Listener, error) {
  93. certificate, privateKey, err := common.GenerateWebServerCertificate(
  94. values.GetHostName())
  95. if err != nil {
  96. return nil, errors.Trace(err)
  97. }
  98. tlsCertificate, err := tls.X509KeyPair(
  99. []byte(certificate), []byte(privateKey))
  100. if err != nil {
  101. return nil, errors.Trace(err)
  102. }
  103. addr, err := net.ResolveUDPAddr("udp", address)
  104. if err != nil {
  105. return nil, errors.Trace(err)
  106. }
  107. udpConn, err := net.ListenUDP("udp", addr)
  108. if err != nil {
  109. return nil, errors.Trace(err)
  110. }
  111. seed, err := prng.NewSeed()
  112. if err != nil {
  113. udpConn.Close()
  114. return nil, errors.Trace(err)
  115. }
  116. obfuscatedPacketConn, err := NewObfuscatedPacketConn(udpConn, true, obfuscationKey, seed)
  117. if err != nil {
  118. udpConn.Close()
  119. return nil, errors.Trace(err)
  120. }
  121. // Note that, due to nature of muxListener, full accepts may happen before
  122. // return and caller calls Accept.
  123. listener, err := newMuxListener(logger, obfuscatedPacketConn, tlsCertificate)
  124. if err != nil {
  125. obfuscatedPacketConn.Close()
  126. return nil, errors.Trace(err)
  127. }
  128. return &Listener{muxListener: listener}, nil
  129. }
  130. // Accept returns a net.Conn that wraps a single QUIC session and stream. The
  131. // stream establishment is deferred until the first Read or Write, allowing
  132. // Accept to be called in a fast loop while goroutines spawned to handle each
  133. // net.Conn will perform the blocking AcceptStream.
  134. func (listener *Listener) Accept() (net.Conn, error) {
  135. session, err := listener.muxListener.Accept()
  136. if err != nil {
  137. return nil, errors.Trace(err)
  138. }
  139. return &Conn{
  140. session: session,
  141. deferredAcceptStream: true,
  142. }, nil
  143. }
  144. // Dial establishes a new QUIC session and stream to the server specified by
  145. // address.
  146. //
  147. // packetConn is used as the underlying packet connection for QUIC. The dial
  148. // may be cancelled by ctx; packetConn will be closed if the dial is
  149. // cancelled or fails.
  150. //
  151. // Keep alive and idle timeout functionality in QUIC is disabled as these
  152. // aspects are expected to be handled at a higher level.
  153. func Dial(
  154. ctx context.Context,
  155. packetConn net.PacketConn,
  156. remoteAddr *net.UDPAddr,
  157. quicSNIAddress string,
  158. negotiateQUICVersion string,
  159. obfuscationKey string,
  160. obfuscationPaddingSeed *prng.Seed) (net.Conn, error) {
  161. if negotiateQUICVersion == "" {
  162. return nil, errors.TraceNew("missing version")
  163. }
  164. versionNumber, ok := supportedVersionNumbers[negotiateQUICVersion]
  165. if !ok {
  166. return nil, errors.Tracef("unsupported version: %s", negotiateQUICVersion)
  167. }
  168. // Fail if the destination port is invalid. Network operations should fail
  169. // quickly in this case, but IETF quic-go has been observed to timeout,
  170. // instead of failing quickly, in the case of invalid destination port 0.
  171. if remoteAddr.Port <= 0 || remoteAddr.Port >= 65536 {
  172. return nil, errors.Tracef("invalid destination port: %d", remoteAddr.Port)
  173. }
  174. if isObfuscated(negotiateQUICVersion) {
  175. var err error
  176. packetConn, err = NewObfuscatedPacketConn(
  177. packetConn, false, obfuscationKey, obfuscationPaddingSeed)
  178. if err != nil {
  179. return nil, errors.Trace(err)
  180. }
  181. }
  182. session, err := dialQUIC(
  183. ctx,
  184. packetConn,
  185. remoteAddr,
  186. quicSNIAddress,
  187. versionNumber)
  188. if err != nil {
  189. packetConn.Close()
  190. return nil, errors.Trace(err)
  191. }
  192. type dialResult struct {
  193. conn *Conn
  194. err error
  195. }
  196. resultChannel := make(chan dialResult)
  197. go func() {
  198. stream, err := session.OpenStream()
  199. if err != nil {
  200. session.Close()
  201. resultChannel <- dialResult{err: err}
  202. return
  203. }
  204. resultChannel <- dialResult{
  205. conn: &Conn{
  206. packetConn: packetConn,
  207. session: session,
  208. stream: stream,
  209. },
  210. }
  211. }()
  212. var conn *Conn
  213. select {
  214. case result := <-resultChannel:
  215. conn, err = result.conn, result.err
  216. case <-ctx.Done():
  217. err = ctx.Err()
  218. // Interrupt the goroutine
  219. session.Close()
  220. <-resultChannel
  221. }
  222. if err != nil {
  223. packetConn.Close()
  224. return nil, errors.Trace(err)
  225. }
  226. return conn, nil
  227. }
  228. // Conn is a net.Conn and psiphon/common.Closer.
  229. type Conn struct {
  230. packetConn net.PacketConn
  231. session quicSession
  232. deferredAcceptStream bool
  233. acceptMutex sync.Mutex
  234. acceptErr error
  235. stream quicStream
  236. readMutex sync.Mutex
  237. writeMutex sync.Mutex
  238. isClosed int32
  239. }
  240. func (conn *Conn) doDeferredAcceptStream() error {
  241. conn.acceptMutex.Lock()
  242. defer conn.acceptMutex.Unlock()
  243. if conn.stream != nil {
  244. return nil
  245. }
  246. if conn.acceptErr != nil {
  247. return conn.acceptErr
  248. }
  249. stream, err := conn.session.AcceptStream()
  250. if err != nil {
  251. conn.session.Close()
  252. conn.acceptErr = errors.Trace(err)
  253. return conn.acceptErr
  254. }
  255. conn.stream = stream
  256. return nil
  257. }
  258. func (conn *Conn) Read(b []byte) (int, error) {
  259. if conn.deferredAcceptStream {
  260. err := conn.doDeferredAcceptStream()
  261. if err != nil {
  262. return 0, errors.Trace(err)
  263. }
  264. }
  265. // Add mutex to provide full net.Conn concurrency semantics.
  266. // https://github.com/lucas-clemente/quic-go/blob/9cc23135d0477baf83aa4715de39ae7070039cb2/stream.go#L64
  267. // "Read() and Write() may be called concurrently, but multiple calls to
  268. // "Read() or Write() individually must be synchronized manually."
  269. conn.readMutex.Lock()
  270. defer conn.readMutex.Unlock()
  271. n, err := conn.stream.Read(b)
  272. if conn.session.isErrorIndicatingClosed(err) {
  273. _ = conn.Close()
  274. err = io.EOF
  275. }
  276. return n, err
  277. }
  278. func (conn *Conn) Write(b []byte) (int, error) {
  279. if conn.deferredAcceptStream {
  280. err := conn.doDeferredAcceptStream()
  281. if err != nil {
  282. return 0, errors.Trace(err)
  283. }
  284. }
  285. conn.writeMutex.Lock()
  286. defer conn.writeMutex.Unlock()
  287. n, err := conn.stream.Write(b)
  288. if conn.session.isErrorIndicatingClosed(err) {
  289. _ = conn.Close()
  290. if n == len(b) {
  291. err = nil
  292. }
  293. }
  294. return n, err
  295. }
  296. func (conn *Conn) Close() error {
  297. err := conn.session.Close()
  298. if conn.packetConn != nil {
  299. err1 := conn.packetConn.Close()
  300. if err == nil {
  301. err = err1
  302. }
  303. }
  304. atomic.StoreInt32(&conn.isClosed, 1)
  305. return err
  306. }
  307. func (conn *Conn) IsClosed() bool {
  308. return atomic.LoadInt32(&conn.isClosed) == 1
  309. }
  310. func (conn *Conn) LocalAddr() net.Addr {
  311. return conn.session.LocalAddr()
  312. }
  313. func (conn *Conn) RemoteAddr() net.Addr {
  314. return conn.session.RemoteAddr()
  315. }
  316. func (conn *Conn) SetDeadline(t time.Time) error {
  317. if conn.deferredAcceptStream {
  318. err := conn.doDeferredAcceptStream()
  319. if err != nil {
  320. return errors.Trace(err)
  321. }
  322. }
  323. return conn.stream.SetDeadline(t)
  324. }
  325. func (conn *Conn) SetReadDeadline(t time.Time) error {
  326. if conn.deferredAcceptStream {
  327. err := conn.doDeferredAcceptStream()
  328. if err != nil {
  329. return errors.Trace(err)
  330. }
  331. }
  332. return conn.stream.SetReadDeadline(t)
  333. }
  334. func (conn *Conn) SetWriteDeadline(t time.Time) error {
  335. if conn.deferredAcceptStream {
  336. err := conn.doDeferredAcceptStream()
  337. if err != nil {
  338. return errors.Trace(err)
  339. }
  340. }
  341. return conn.stream.SetWriteDeadline(t)
  342. }
  343. // QUICTransporter implements the psiphon.transporter interface, used in
  344. // psiphon.MeekConn for HTTP requests, which requires a RoundTripper and
  345. // CloseIdleConnections.
  346. type QUICTransporter struct {
  347. quicRoundTripper
  348. noticeEmitter func(string)
  349. udpDialer func(ctx context.Context) (net.PacketConn, *net.UDPAddr, error)
  350. quicSNIAddress string
  351. negotiateQUICVersion string
  352. packetConn atomic.Value
  353. mutex sync.Mutex
  354. ctx context.Context
  355. }
  356. // NewQUICTransporter creates a new QUICTransporter.
  357. func NewQUICTransporter(
  358. ctx context.Context,
  359. noticeEmitter func(string),
  360. udpDialer func(ctx context.Context) (net.PacketConn, *net.UDPAddr, error),
  361. quicSNIAddress string,
  362. negotiateQUICVersion string) (*QUICTransporter, error) {
  363. versionNumber, ok := supportedVersionNumbers[negotiateQUICVersion]
  364. if !ok {
  365. return nil, errors.Tracef("unsupported version: %s", negotiateQUICVersion)
  366. }
  367. t := &QUICTransporter{
  368. noticeEmitter: noticeEmitter,
  369. udpDialer: udpDialer,
  370. quicSNIAddress: quicSNIAddress,
  371. negotiateQUICVersion: negotiateQUICVersion,
  372. ctx: ctx,
  373. }
  374. if isIETFVersion(versionNumber) {
  375. t.quicRoundTripper = &http3.RoundTripper{Dial: t.dialIETFQUIC}
  376. } else {
  377. t.quicRoundTripper = &h2quic.RoundTripper{Dial: t.dialgQUIC}
  378. }
  379. return t, nil
  380. }
  381. func (t *QUICTransporter) SetRequestContext(ctx context.Context) {
  382. // Note: can't use sync.Value since underlying type of ctx changes.
  383. t.mutex.Lock()
  384. defer t.mutex.Unlock()
  385. t.ctx = ctx
  386. }
  387. // CloseIdleConnections wraps QUIC RoundTripper.Close, which provides the
  388. // necessary functionality for psiphon.transporter as used by
  389. // psiphon.MeekConn. Note that, unlike http.Transport.CloseIdleConnections,
  390. // the connections are closed regardless of idle status.
  391. func (t *QUICTransporter) CloseIdleConnections() {
  392. // This operation doesn't prevent a concurrent http3.client.dial from
  393. // establishing a new packet conn; we also rely on the request context to
  394. // fully interrupt and stop a http3.RoundTripper.
  395. t.closePacketConn()
  396. t.quicRoundTripper.Close()
  397. }
  398. func (t *QUICTransporter) closePacketConn() {
  399. packetConn := t.packetConn.Load()
  400. if p, ok := packetConn.(net.PacketConn); ok {
  401. p.Close()
  402. }
  403. }
  404. func (t *QUICTransporter) dialIETFQUIC(
  405. _, _ string, _ *tls.Config, _ *ietf_quic.Config) (ietf_quic.Session, error) {
  406. session, err := t.dialQUIC()
  407. if err != nil {
  408. return nil, errors.Trace(err)
  409. }
  410. return session.(*ietfQUICSession).Session, nil
  411. }
  412. func (t *QUICTransporter) dialgQUIC(
  413. _, _ string, _ *tls.Config, _ *gquic.Config) (gquic.Session, error) {
  414. session, err := t.dialQUIC()
  415. if err != nil {
  416. return nil, errors.Trace(err)
  417. }
  418. return session.(*gQUICSession).Session, nil
  419. }
  420. func (t *QUICTransporter) dialQUIC() (retSession quicSession, retErr error) {
  421. defer func() {
  422. if retErr != nil && t.noticeEmitter != nil {
  423. t.noticeEmitter(fmt.Sprintf("QUICTransporter.dialQUIC failed: %s", retErr))
  424. }
  425. }()
  426. if t.negotiateQUICVersion == "" {
  427. return nil, errors.TraceNew("missing version")
  428. }
  429. versionNumber, ok := supportedVersionNumbers[t.negotiateQUICVersion]
  430. if !ok {
  431. return nil, errors.Tracef("unsupported version: %s", t.negotiateQUICVersion)
  432. }
  433. t.mutex.Lock()
  434. ctx := t.ctx
  435. t.mutex.Unlock()
  436. if ctx == nil {
  437. ctx = context.Background()
  438. }
  439. packetConn, remoteAddr, err := t.udpDialer(ctx)
  440. if err != nil {
  441. return nil, errors.Trace(err)
  442. }
  443. session, err := dialQUIC(
  444. ctx,
  445. packetConn,
  446. remoteAddr,
  447. t.quicSNIAddress,
  448. versionNumber)
  449. if err != nil {
  450. packetConn.Close()
  451. return nil, errors.Trace(err)
  452. }
  453. // dialQUIC uses quic-go.DialContext as we must create our own UDP sockets to
  454. // set properties such as BIND_TO_DEVICE. However, when DialContext is used,
  455. // quic-go does not take responsibiity for closing the underlying packetConn
  456. // when the QUIC session is closed.
  457. //
  458. // We track the most recent packetConn in QUICTransporter and close it:
  459. // - when CloseIdleConnections is called, as it is by psiphon.MeekConn when
  460. // it is closing;
  461. // - here in dialFunc, with the assumption that only one concurrent QUIC
  462. // session is used per h2quic.RoundTripper.
  463. //
  464. // This code also assume no concurrent calls to dialFunc, as otherwise a race
  465. // condition exists between closePacketConn and Store.
  466. t.closePacketConn()
  467. t.packetConn.Store(packetConn)
  468. return session, nil
  469. }
  470. // The following code provides support for using both gQUIC and IETF QUIC,
  471. // which are implemented in two different branches (now forks) of quic-go.
  472. //
  473. // dialQUIC uses the appropriate quic-go and returns quicSession which wraps
  474. // either a ietf_quic.Session or gquic.Session.
  475. //
  476. // muxListener provides a multiplexing listener that directs packets to
  477. // either a ietf_quic.Listener or a gquic.Listener based on the content of the
  478. // packet.
  479. type quicListener interface {
  480. Close() error
  481. Accept() (quicSession, error)
  482. }
  483. type quicSession interface {
  484. io.Closer
  485. LocalAddr() net.Addr
  486. RemoteAddr() net.Addr
  487. AcceptStream() (quicStream, error)
  488. OpenStream() (quicStream, error)
  489. isErrorIndicatingClosed(err error) bool
  490. }
  491. type quicStream interface {
  492. io.Reader
  493. io.Writer
  494. io.Closer
  495. SetReadDeadline(t time.Time) error
  496. SetWriteDeadline(t time.Time) error
  497. SetDeadline(t time.Time) error
  498. }
  499. type quicRoundTripper interface {
  500. http.RoundTripper
  501. Close() error
  502. }
  503. type ietfQUICListener struct {
  504. ietf_quic.Listener
  505. }
  506. func (l *ietfQUICListener) Accept() (quicSession, error) {
  507. // A specific context is not provided since the interface needs to match the
  508. // gquic-go API, which lacks context support.
  509. session, err := l.Listener.Accept(context.Background())
  510. if err != nil {
  511. return nil, errors.Trace(err)
  512. }
  513. return &ietfQUICSession{Session: session}, nil
  514. }
  515. type ietfQUICSession struct {
  516. ietf_quic.Session
  517. }
  518. func (s *ietfQUICSession) AcceptStream() (quicStream, error) {
  519. // A specific context is not provided since the interface needs to match the
  520. // gquic-go API, which lacks context support.
  521. //
  522. // TODO: once gQUIC support is retired, this context may be used in place
  523. // of the deferredAcceptStream mechanism.
  524. stream, err := s.Session.AcceptStream(context.Background())
  525. if err != nil {
  526. return nil, errors.Trace(err)
  527. }
  528. return stream, nil
  529. }
  530. func (s *ietfQUICSession) OpenStream() (quicStream, error) {
  531. return s.Session.OpenStream()
  532. }
  533. func (s *ietfQUICSession) isErrorIndicatingClosed(err error) bool {
  534. if err == nil {
  535. return false
  536. }
  537. errStr := err.Error()
  538. // The target error is of type *qerr.QuicError, but is not exported.
  539. return errStr == "Application error 0x0" ||
  540. errStr == "NO_ERROR: No recent network activity"
  541. }
  542. type gQUICListener struct {
  543. gquic.Listener
  544. }
  545. func (l *gQUICListener) Accept() (quicSession, error) {
  546. session, err := l.Listener.Accept()
  547. if err != nil {
  548. return nil, errors.Trace(err)
  549. }
  550. return &gQUICSession{Session: session}, nil
  551. }
  552. type gQUICSession struct {
  553. gquic.Session
  554. }
  555. func (s *gQUICSession) AcceptStream() (quicStream, error) {
  556. stream, err := s.Session.AcceptStream()
  557. if err != nil {
  558. return nil, errors.Trace(err)
  559. }
  560. return stream, nil
  561. }
  562. func (s *gQUICSession) OpenStream() (quicStream, error) {
  563. return s.Session.OpenStream()
  564. }
  565. func (s *gQUICSession) isErrorIndicatingClosed(err error) bool {
  566. if err == nil {
  567. return false
  568. }
  569. if quicErr, ok := err.(*qerr.QuicError); ok {
  570. switch quicErr.ErrorCode {
  571. case qerr.PeerGoingAway, qerr.NetworkIdleTimeout:
  572. return true
  573. }
  574. }
  575. return false
  576. }
  577. func dialQUIC(
  578. ctx context.Context,
  579. packetConn net.PacketConn,
  580. remoteAddr *net.UDPAddr,
  581. quicSNIAddress string,
  582. versionNumber uint32) (quicSession, error) {
  583. if isIETFVersion(versionNumber) {
  584. quicConfig := &ietf_quic.Config{
  585. HandshakeTimeout: time.Duration(1<<63 - 1),
  586. IdleTimeout: CLIENT_IDLE_TIMEOUT,
  587. KeepAlive: true,
  588. Versions: []ietf_quic.VersionNumber{
  589. ietf_quic.VersionNumber(versionNumber)},
  590. }
  591. deadline, ok := ctx.Deadline()
  592. if ok {
  593. quicConfig.HandshakeTimeout = time.Until(deadline)
  594. }
  595. dialSession, err := ietf_quic.DialContext(
  596. ctx,
  597. packetConn,
  598. remoteAddr,
  599. quicSNIAddress,
  600. &tls.Config{
  601. InsecureSkipVerify: true,
  602. NextProtos: []string{getALPN(versionNumber)},
  603. },
  604. quicConfig)
  605. if err != nil {
  606. return nil, errors.Trace(err)
  607. }
  608. return &ietfQUICSession{Session: dialSession}, nil
  609. } else {
  610. quicConfig := &gquic.Config{
  611. HandshakeTimeout: time.Duration(1<<63 - 1),
  612. IdleTimeout: CLIENT_IDLE_TIMEOUT,
  613. KeepAlive: true,
  614. Versions: []gquic.VersionNumber{
  615. gquic.VersionNumber(versionNumber)},
  616. }
  617. deadline, ok := ctx.Deadline()
  618. if ok {
  619. quicConfig.HandshakeTimeout = time.Until(deadline)
  620. }
  621. dialSession, err := gquic.DialContext(
  622. ctx,
  623. packetConn,
  624. remoteAddr,
  625. quicSNIAddress,
  626. &tls.Config{
  627. InsecureSkipVerify: true,
  628. },
  629. quicConfig)
  630. if err != nil {
  631. return nil, errors.Trace(err)
  632. }
  633. return &gQUICSession{Session: dialSession}, nil
  634. }
  635. }
  636. const (
  637. muxPacketQueueSize = 128
  638. muxPacketBufferSize = 1452 // quic-go.MaxReceivePacketSize
  639. )
  640. type packet struct {
  641. addr net.Addr
  642. size int
  643. data []byte
  644. }
  645. // muxPacketConn delivers packets to a specific quic-go listener.
  646. type muxPacketConn struct {
  647. localAddr net.Addr
  648. listener *muxListener
  649. packets chan *packet
  650. }
  651. func newMuxPacketConn(localAddr net.Addr, listener *muxListener) *muxPacketConn {
  652. return &muxPacketConn{
  653. localAddr: localAddr,
  654. listener: listener,
  655. packets: make(chan *packet, muxPacketQueueSize),
  656. }
  657. }
  658. func (conn *muxPacketConn) ReadFrom(b []byte) (int, net.Addr, error) {
  659. select {
  660. case p := <-conn.packets:
  661. // If b is too short, the packet is truncated. This won't happen as long as
  662. // muxPacketBufferSize matches quic-go.MaxReceivePacketSize.
  663. copy(b, p.data[0:p.size])
  664. n := p.size
  665. addr := p.addr
  666. // Clear and replace packet buffer.
  667. p.size = 0
  668. conn.listener.packets <- p
  669. return n, addr, nil
  670. case <-conn.listener.stopBroadcast:
  671. return 0, nil, io.EOF
  672. }
  673. }
  674. func (conn *muxPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
  675. return conn.listener.conn.WriteTo(b, addr)
  676. }
  677. func (conn *muxPacketConn) Close() error {
  678. // This Close won't unblock Read/Write operations or propagate the Close
  679. // signal up to muxListener. The correct way to shutdown is to call
  680. // muxListener.Close.
  681. return nil
  682. }
  683. func (conn *muxPacketConn) LocalAddr() net.Addr {
  684. return conn.localAddr
  685. }
  686. func (conn *muxPacketConn) SetDeadline(t time.Time) error {
  687. return errors.TraceNew("not supported")
  688. }
  689. func (conn *muxPacketConn) SetReadDeadline(t time.Time) error {
  690. return errors.TraceNew("not supported")
  691. }
  692. func (conn *muxPacketConn) SetWriteDeadline(t time.Time) error {
  693. return errors.TraceNew("not supported")
  694. }
  695. // muxListener is a multiplexing packet conn listener which relays packets to
  696. // multiple quic-go listeners.
  697. type muxListener struct {
  698. logger common.Logger
  699. isClosed int32
  700. runWaitGroup *sync.WaitGroup
  701. stopBroadcast chan struct{}
  702. conn *ObfuscatedPacketConn
  703. packets chan *packet
  704. acceptedSessions chan quicSession
  705. ietfQUICConn *muxPacketConn
  706. ietfQUICListener quicListener
  707. gQUICConn *muxPacketConn
  708. gQUICListener quicListener
  709. }
  710. func newMuxListener(
  711. logger common.Logger,
  712. conn *ObfuscatedPacketConn,
  713. tlsCertificate tls.Certificate) (*muxListener, error) {
  714. listener := &muxListener{
  715. logger: logger,
  716. runWaitGroup: new(sync.WaitGroup),
  717. stopBroadcast: make(chan struct{}),
  718. conn: conn,
  719. packets: make(chan *packet, muxPacketQueueSize),
  720. acceptedSessions: make(chan quicSession, 2), // 1 per listener
  721. }
  722. // All packet relay buffers are allocated in advance.
  723. for i := 0; i < muxPacketQueueSize; i++ {
  724. listener.packets <- &packet{data: make([]byte, muxPacketBufferSize)}
  725. }
  726. listener.ietfQUICConn = newMuxPacketConn(conn.LocalAddr(), listener)
  727. tlsConfig := &tls.Config{
  728. Certificates: []tls.Certificate{tlsCertificate},
  729. NextProtos: []string{getALPN(ietfQUICDraft24VersionNumber)},
  730. }
  731. ietfQUICConfig := &ietf_quic.Config{
  732. HandshakeTimeout: SERVER_HANDSHAKE_TIMEOUT,
  733. IdleTimeout: serverIdleTimeout,
  734. MaxIncomingStreams: 1,
  735. MaxIncomingUniStreams: -1,
  736. KeepAlive: true,
  737. }
  738. il, err := ietf_quic.Listen(listener.ietfQUICConn, tlsConfig, ietfQUICConfig)
  739. if err != nil {
  740. return nil, errors.Trace(err)
  741. }
  742. listener.ietfQUICListener = &ietfQUICListener{Listener: il}
  743. listener.gQUICConn = newMuxPacketConn(conn.LocalAddr(), listener)
  744. tlsConfig = &tls.Config{
  745. Certificates: []tls.Certificate{tlsCertificate},
  746. }
  747. gQUICConfig := &gquic.Config{
  748. HandshakeTimeout: SERVER_HANDSHAKE_TIMEOUT,
  749. IdleTimeout: serverIdleTimeout,
  750. MaxIncomingStreams: 1,
  751. MaxIncomingUniStreams: -1,
  752. KeepAlive: true,
  753. }
  754. gl, err := gquic.Listen(listener.gQUICConn, tlsConfig, gQUICConfig)
  755. if err != nil {
  756. listener.ietfQUICListener.Close()
  757. return nil, errors.Trace(err)
  758. }
  759. listener.gQUICListener = &gQUICListener{Listener: gl}
  760. listener.runWaitGroup.Add(3)
  761. go listener.relayPackets()
  762. go listener.relayAcceptedSessions(listener.gQUICListener)
  763. go listener.relayAcceptedSessions(listener.ietfQUICListener)
  764. return listener, nil
  765. }
  766. func (listener *muxListener) relayPackets() {
  767. defer listener.runWaitGroup.Done()
  768. for {
  769. var p *packet
  770. select {
  771. case p = <-listener.packets:
  772. case <-listener.stopBroadcast:
  773. return
  774. }
  775. // Read network packets. The DPI functionality of the obfuscation layer
  776. // identifies the type of QUIC, gQUIC or IETF, in addition to identifying
  777. // and processing obfuscation. This type information determines which
  778. // quic-go receives the packet.
  779. //
  780. // Network errors are not relayed to quic-go, as it will shut down the
  781. // server on any error returned from ReadFrom, even net.Error.Temporary()
  782. // errors.
  783. var isIETF bool
  784. var err error
  785. p.size, p.addr, isIETF, err = listener.conn.readFromWithType(p.data)
  786. if err != nil {
  787. if listener.logger != nil {
  788. message := "readFromWithType failed"
  789. if e, ok := err.(net.Error); ok && e.Temporary() {
  790. listener.logger.WithTraceFields(
  791. common.LogFields{"error": err}).Debug(message)
  792. } else {
  793. listener.logger.WithTraceFields(
  794. common.LogFields{"error": err}).Warning(message)
  795. }
  796. }
  797. // TODO: propagate non-temporary errors to Accept?
  798. listener.packets <- p
  799. continue
  800. }
  801. // Send the packet to the correct quic-go. The packet is dropped if the
  802. // target quic-go packet queue is full.
  803. if isIETF {
  804. select {
  805. case listener.ietfQUICConn.packets <- p:
  806. default:
  807. listener.packets <- p
  808. }
  809. } else {
  810. select {
  811. case listener.gQUICConn.packets <- p:
  812. default:
  813. listener.packets <- p
  814. }
  815. }
  816. }
  817. }
  818. func (listener *muxListener) relayAcceptedSessions(l quicListener) {
  819. defer listener.runWaitGroup.Done()
  820. for {
  821. session, err := l.Accept()
  822. if err != nil {
  823. if listener.logger != nil {
  824. message := "Accept failed"
  825. if e, ok := err.(net.Error); ok && e.Temporary() {
  826. listener.logger.WithTraceFields(
  827. common.LogFields{"error": err}).Debug(message)
  828. } else {
  829. listener.logger.WithTraceFields(
  830. common.LogFields{"error": err}).Warning(message)
  831. }
  832. }
  833. // TODO: propagate non-temporary errors to Accept?
  834. select {
  835. case <-listener.stopBroadcast:
  836. return
  837. default:
  838. }
  839. continue
  840. }
  841. select {
  842. case listener.acceptedSessions <- session:
  843. case <-listener.stopBroadcast:
  844. return
  845. }
  846. }
  847. }
  848. func (listener *muxListener) Accept() (quicSession, error) {
  849. select {
  850. case conn := <-listener.acceptedSessions:
  851. return conn, nil
  852. case <-listener.stopBroadcast:
  853. return nil, errors.TraceNew("closed")
  854. }
  855. }
  856. func (listener *muxListener) Close() error {
  857. // Ensure close channel only called once.
  858. if !atomic.CompareAndSwapInt32(&listener.isClosed, 0, 1) {
  859. return nil
  860. }
  861. close(listener.stopBroadcast)
  862. var retErr error
  863. err := listener.gQUICListener.Close()
  864. if err != nil && retErr == nil {
  865. retErr = errors.Trace(err)
  866. }
  867. err = listener.ietfQUICListener.Close()
  868. if err != nil && retErr == nil {
  869. retErr = errors.Trace(err)
  870. }
  871. err = listener.conn.Close()
  872. if err != nil && retErr == nil {
  873. retErr = errors.Trace(err)
  874. }
  875. listener.runWaitGroup.Wait()
  876. return retErr
  877. }
  878. func (listener *muxListener) Addr() net.Addr {
  879. return listener.conn.LocalAddr()
  880. }