| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060 |
- // +build !DISABLE_QUIC
- /*
- * Copyright (c) 2018, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- /*
- Package quic wraps github.com/lucas-clemente/quic-go with net.Listener and
- net.Conn types that provide a drop-in replacement for net.TCPConn.
- Each QUIC session has exactly one stream, which is the equivilent of a TCP
- stream.
- Conns returned from Accept will have an established QUIC session and are
- configured to perform a deferred AcceptStream on the first Read or Write.
- Conns returned from Dial have an established QUIC session and stream. Dial
- accepts a Context input which may be used to cancel the dial.
- Conns mask or translate qerr.PeerGoingAway to io.EOF as appropriate.
- QUIC idle timeouts and keep alives are tuned to mitigate aggressive UDP NAT
- timeouts on mobile data networks while accounting for the fact that mobile
- devices in standby/sleep may not be able to initiate the keep alive.
- */
- package quic
- import (
- "context"
- "crypto/tls"
- "fmt"
- "io"
- "net"
- "net/http"
- "sync"
- "sync/atomic"
- "time"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/h2quic"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/qerr"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
- ietf_quic "github.com/Psiphon-Labs/quic-go"
- "github.com/Psiphon-Labs/quic-go/http3"
- )
- const (
- SERVER_HANDSHAKE_TIMEOUT = 30 * time.Second
- SERVER_IDLE_TIMEOUT = 5 * time.Minute
- CLIENT_IDLE_TIMEOUT = 30 * time.Second
- )
- // Enabled indicates if QUIC functionality is enabled.
- func Enabled() bool {
- return true
- }
- const ietfQUICDraft24VersionNumber = 0xff000018
- var supportedVersionNumbers = map[string]uint32{
- protocol.QUIC_VERSION_GQUIC39: uint32(gquic.VersionGQUIC39),
- protocol.QUIC_VERSION_GQUIC43: uint32(gquic.VersionGQUIC43),
- protocol.QUIC_VERSION_GQUIC44: uint32(gquic.VersionGQUIC44),
- protocol.QUIC_VERSION_OBFUSCATED: uint32(gquic.VersionGQUIC43),
- protocol.QUIC_VERSION_IETF_DRAFT24: ietfQUICDraft24VersionNumber,
- }
- func isObfuscated(quicVersion string) bool {
- return quicVersion == protocol.QUIC_VERSION_OBFUSCATED
- }
- func isIETFVersion(versionNumber uint32) bool {
- return versionNumber == ietfQUICDraft24VersionNumber
- }
- func getALPN(versionNumber uint32) string {
- return "h3-24"
- }
- // quic_test overrides the server idle timeout.
- var serverIdleTimeout = SERVER_IDLE_TIMEOUT
- // Listener is a net.Listener.
- type Listener struct {
- *muxListener
- }
- // Listen creates a new Listener.
- func Listen(
- logger common.Logger,
- address string,
- obfuscationKey string) (net.Listener, error) {
- certificate, privateKey, err := common.GenerateWebServerCertificate(
- values.GetHostName())
- if err != nil {
- return nil, errors.Trace(err)
- }
- tlsCertificate, err := tls.X509KeyPair(
- []byte(certificate), []byte(privateKey))
- if err != nil {
- return nil, errors.Trace(err)
- }
- addr, err := net.ResolveUDPAddr("udp", address)
- if err != nil {
- return nil, errors.Trace(err)
- }
- udpConn, err := net.ListenUDP("udp", addr)
- if err != nil {
- return nil, errors.Trace(err)
- }
- seed, err := prng.NewSeed()
- if err != nil {
- udpConn.Close()
- return nil, errors.Trace(err)
- }
- obfuscatedPacketConn, err := NewObfuscatedPacketConn(udpConn, true, obfuscationKey, seed)
- if err != nil {
- udpConn.Close()
- return nil, errors.Trace(err)
- }
- // Note that, due to nature of muxListener, full accepts may happen before
- // return and caller calls Accept.
- listener, err := newMuxListener(logger, obfuscatedPacketConn, tlsCertificate)
- if err != nil {
- obfuscatedPacketConn.Close()
- return nil, errors.Trace(err)
- }
- return &Listener{muxListener: listener}, nil
- }
- // Accept returns a net.Conn that wraps a single QUIC session and stream. The
- // stream establishment is deferred until the first Read or Write, allowing
- // Accept to be called in a fast loop while goroutines spawned to handle each
- // net.Conn will perform the blocking AcceptStream.
- func (listener *Listener) Accept() (net.Conn, error) {
- session, err := listener.muxListener.Accept()
- if err != nil {
- return nil, errors.Trace(err)
- }
- return &Conn{
- session: session,
- deferredAcceptStream: true,
- }, nil
- }
- // Dial establishes a new QUIC session and stream to the server specified by
- // address.
- //
- // packetConn is used as the underlying packet connection for QUIC. The dial
- // may be cancelled by ctx; packetConn will be closed if the dial is
- // cancelled or fails.
- //
- // Keep alive and idle timeout functionality in QUIC is disabled as these
- // aspects are expected to be handled at a higher level.
- func Dial(
- ctx context.Context,
- packetConn net.PacketConn,
- remoteAddr *net.UDPAddr,
- quicSNIAddress string,
- negotiateQUICVersion string,
- obfuscationKey string,
- obfuscationPaddingSeed *prng.Seed) (net.Conn, error) {
- if negotiateQUICVersion == "" {
- return nil, errors.TraceNew("missing version")
- }
- versionNumber, ok := supportedVersionNumbers[negotiateQUICVersion]
- if !ok {
- return nil, errors.Tracef("unsupported version: %s", negotiateQUICVersion)
- }
- // Fail if the destination port is invalid. Network operations should fail
- // quickly in this case, but IETF quic-go has been observed to timeout,
- // instead of failing quickly, in the case of invalid destination port 0.
- if remoteAddr.Port <= 0 || remoteAddr.Port >= 65536 {
- return nil, errors.Tracef("invalid destination port: %d", remoteAddr.Port)
- }
- if isObfuscated(negotiateQUICVersion) {
- var err error
- packetConn, err = NewObfuscatedPacketConn(
- packetConn, false, obfuscationKey, obfuscationPaddingSeed)
- if err != nil {
- return nil, errors.Trace(err)
- }
- }
- session, err := dialQUIC(
- ctx,
- packetConn,
- remoteAddr,
- quicSNIAddress,
- versionNumber)
- if err != nil {
- packetConn.Close()
- return nil, errors.Trace(err)
- }
- type dialResult struct {
- conn *Conn
- err error
- }
- resultChannel := make(chan dialResult)
- go func() {
- stream, err := session.OpenStream()
- if err != nil {
- session.Close()
- resultChannel <- dialResult{err: err}
- return
- }
- resultChannel <- dialResult{
- conn: &Conn{
- packetConn: packetConn,
- session: session,
- stream: stream,
- },
- }
- }()
- var conn *Conn
- select {
- case result := <-resultChannel:
- conn, err = result.conn, result.err
- case <-ctx.Done():
- err = ctx.Err()
- // Interrupt the goroutine
- session.Close()
- <-resultChannel
- }
- if err != nil {
- packetConn.Close()
- return nil, errors.Trace(err)
- }
- return conn, nil
- }
- // Conn is a net.Conn and psiphon/common.Closer.
- type Conn struct {
- packetConn net.PacketConn
- session quicSession
- deferredAcceptStream bool
- acceptMutex sync.Mutex
- acceptErr error
- stream quicStream
- readMutex sync.Mutex
- writeMutex sync.Mutex
- isClosed int32
- }
- func (conn *Conn) doDeferredAcceptStream() error {
- conn.acceptMutex.Lock()
- defer conn.acceptMutex.Unlock()
- if conn.stream != nil {
- return nil
- }
- if conn.acceptErr != nil {
- return conn.acceptErr
- }
- stream, err := conn.session.AcceptStream()
- if err != nil {
- conn.session.Close()
- conn.acceptErr = errors.Trace(err)
- return conn.acceptErr
- }
- conn.stream = stream
- return nil
- }
- func (conn *Conn) Read(b []byte) (int, error) {
- if conn.deferredAcceptStream {
- err := conn.doDeferredAcceptStream()
- if err != nil {
- return 0, errors.Trace(err)
- }
- }
- // Add mutex to provide full net.Conn concurrency semantics.
- // https://github.com/lucas-clemente/quic-go/blob/9cc23135d0477baf83aa4715de39ae7070039cb2/stream.go#L64
- // "Read() and Write() may be called concurrently, but multiple calls to
- // "Read() or Write() individually must be synchronized manually."
- conn.readMutex.Lock()
- defer conn.readMutex.Unlock()
- n, err := conn.stream.Read(b)
- if conn.session.isErrorIndicatingClosed(err) {
- _ = conn.Close()
- err = io.EOF
- }
- return n, err
- }
- func (conn *Conn) Write(b []byte) (int, error) {
- if conn.deferredAcceptStream {
- err := conn.doDeferredAcceptStream()
- if err != nil {
- return 0, errors.Trace(err)
- }
- }
- conn.writeMutex.Lock()
- defer conn.writeMutex.Unlock()
- n, err := conn.stream.Write(b)
- if conn.session.isErrorIndicatingClosed(err) {
- _ = conn.Close()
- if n == len(b) {
- err = nil
- }
- }
- return n, err
- }
- func (conn *Conn) Close() error {
- err := conn.session.Close()
- if conn.packetConn != nil {
- err1 := conn.packetConn.Close()
- if err == nil {
- err = err1
- }
- }
- atomic.StoreInt32(&conn.isClosed, 1)
- return err
- }
- func (conn *Conn) IsClosed() bool {
- return atomic.LoadInt32(&conn.isClosed) == 1
- }
- func (conn *Conn) LocalAddr() net.Addr {
- return conn.session.LocalAddr()
- }
- func (conn *Conn) RemoteAddr() net.Addr {
- return conn.session.RemoteAddr()
- }
- func (conn *Conn) SetDeadline(t time.Time) error {
- if conn.deferredAcceptStream {
- err := conn.doDeferredAcceptStream()
- if err != nil {
- return errors.Trace(err)
- }
- }
- return conn.stream.SetDeadline(t)
- }
- func (conn *Conn) SetReadDeadline(t time.Time) error {
- if conn.deferredAcceptStream {
- err := conn.doDeferredAcceptStream()
- if err != nil {
- return errors.Trace(err)
- }
- }
- return conn.stream.SetReadDeadline(t)
- }
- func (conn *Conn) SetWriteDeadline(t time.Time) error {
- if conn.deferredAcceptStream {
- err := conn.doDeferredAcceptStream()
- if err != nil {
- return errors.Trace(err)
- }
- }
- return conn.stream.SetWriteDeadline(t)
- }
- // QUICTransporter implements the psiphon.transporter interface, used in
- // psiphon.MeekConn for HTTP requests, which requires a RoundTripper and
- // CloseIdleConnections.
- type QUICTransporter struct {
- quicRoundTripper
- noticeEmitter func(string)
- udpDialer func(ctx context.Context) (net.PacketConn, *net.UDPAddr, error)
- quicSNIAddress string
- negotiateQUICVersion string
- packetConn atomic.Value
- mutex sync.Mutex
- ctx context.Context
- }
- // NewQUICTransporter creates a new QUICTransporter.
- func NewQUICTransporter(
- ctx context.Context,
- noticeEmitter func(string),
- udpDialer func(ctx context.Context) (net.PacketConn, *net.UDPAddr, error),
- quicSNIAddress string,
- negotiateQUICVersion string) (*QUICTransporter, error) {
- versionNumber, ok := supportedVersionNumbers[negotiateQUICVersion]
- if !ok {
- return nil, errors.Tracef("unsupported version: %s", negotiateQUICVersion)
- }
- t := &QUICTransporter{
- noticeEmitter: noticeEmitter,
- udpDialer: udpDialer,
- quicSNIAddress: quicSNIAddress,
- negotiateQUICVersion: negotiateQUICVersion,
- ctx: ctx,
- }
- if isIETFVersion(versionNumber) {
- t.quicRoundTripper = &http3.RoundTripper{Dial: t.dialIETFQUIC}
- } else {
- t.quicRoundTripper = &h2quic.RoundTripper{Dial: t.dialgQUIC}
- }
- return t, nil
- }
- func (t *QUICTransporter) SetRequestContext(ctx context.Context) {
- // Note: can't use sync.Value since underlying type of ctx changes.
- t.mutex.Lock()
- defer t.mutex.Unlock()
- t.ctx = ctx
- }
- // CloseIdleConnections wraps QUIC RoundTripper.Close, which provides the
- // necessary functionality for psiphon.transporter as used by
- // psiphon.MeekConn. Note that, unlike http.Transport.CloseIdleConnections,
- // the connections are closed regardless of idle status.
- func (t *QUICTransporter) CloseIdleConnections() {
- // This operation doesn't prevent a concurrent http3.client.dial from
- // establishing a new packet conn; we also rely on the request context to
- // fully interrupt and stop a http3.RoundTripper.
- t.closePacketConn()
- t.quicRoundTripper.Close()
- }
- func (t *QUICTransporter) closePacketConn() {
- packetConn := t.packetConn.Load()
- if p, ok := packetConn.(net.PacketConn); ok {
- p.Close()
- }
- }
- func (t *QUICTransporter) dialIETFQUIC(
- _, _ string, _ *tls.Config, _ *ietf_quic.Config) (ietf_quic.Session, error) {
- session, err := t.dialQUIC()
- if err != nil {
- return nil, errors.Trace(err)
- }
- return session.(*ietfQUICSession).Session, nil
- }
- func (t *QUICTransporter) dialgQUIC(
- _, _ string, _ *tls.Config, _ *gquic.Config) (gquic.Session, error) {
- session, err := t.dialQUIC()
- if err != nil {
- return nil, errors.Trace(err)
- }
- return session.(*gQUICSession).Session, nil
- }
- func (t *QUICTransporter) dialQUIC() (retSession quicSession, retErr error) {
- defer func() {
- if retErr != nil && t.noticeEmitter != nil {
- t.noticeEmitter(fmt.Sprintf("QUICTransporter.dialQUIC failed: %s", retErr))
- }
- }()
- if t.negotiateQUICVersion == "" {
- return nil, errors.TraceNew("missing version")
- }
- versionNumber, ok := supportedVersionNumbers[t.negotiateQUICVersion]
- if !ok {
- return nil, errors.Tracef("unsupported version: %s", t.negotiateQUICVersion)
- }
- t.mutex.Lock()
- ctx := t.ctx
- t.mutex.Unlock()
- if ctx == nil {
- ctx = context.Background()
- }
- packetConn, remoteAddr, err := t.udpDialer(ctx)
- if err != nil {
- return nil, errors.Trace(err)
- }
- session, err := dialQUIC(
- ctx,
- packetConn,
- remoteAddr,
- t.quicSNIAddress,
- versionNumber)
- if err != nil {
- packetConn.Close()
- return nil, errors.Trace(err)
- }
- // dialQUIC uses quic-go.DialContext as we must create our own UDP sockets to
- // set properties such as BIND_TO_DEVICE. However, when DialContext is used,
- // quic-go does not take responsibiity for closing the underlying packetConn
- // when the QUIC session is closed.
- //
- // We track the most recent packetConn in QUICTransporter and close it:
- // - when CloseIdleConnections is called, as it is by psiphon.MeekConn when
- // it is closing;
- // - here in dialFunc, with the assumption that only one concurrent QUIC
- // session is used per h2quic.RoundTripper.
- //
- // This code also assume no concurrent calls to dialFunc, as otherwise a race
- // condition exists between closePacketConn and Store.
- t.closePacketConn()
- t.packetConn.Store(packetConn)
- return session, nil
- }
- // The following code provides support for using both gQUIC and IETF QUIC,
- // which are implemented in two different branches (now forks) of quic-go.
- //
- // dialQUIC uses the appropriate quic-go and returns quicSession which wraps
- // either a ietf_quic.Session or gquic.Session.
- //
- // muxPacketConn provides a multiplexing listener that directs packets to
- // either a ietf_quic.Listener or a gquic.Listener based on the content of the
- // packet.
- type quicListener interface {
- Close() error
- Accept() (quicSession, error)
- }
- type quicSession interface {
- io.Closer
- LocalAddr() net.Addr
- RemoteAddr() net.Addr
- AcceptStream() (quicStream, error)
- OpenStream() (quicStream, error)
- isErrorIndicatingClosed(err error) bool
- }
- type quicStream interface {
- io.Reader
- io.Writer
- io.Closer
- SetReadDeadline(t time.Time) error
- SetWriteDeadline(t time.Time) error
- SetDeadline(t time.Time) error
- }
- type quicRoundTripper interface {
- http.RoundTripper
- Close() error
- }
- type ietfQUICListener struct {
- ietf_quic.Listener
- }
- func (l *ietfQUICListener) Accept() (quicSession, error) {
- // A specific context is not provided since the interface needs to match the
- // gquic-go API, which lacks context support.
- session, err := l.Listener.Accept(context.Background())
- if err != nil {
- return nil, errors.Trace(err)
- }
- return &ietfQUICSession{Session: session}, nil
- }
- type ietfQUICSession struct {
- ietf_quic.Session
- }
- func (s *ietfQUICSession) AcceptStream() (quicStream, error) {
- // A specific context is not provided since the interface needs to match the
- // gquic-go API, which lacks context support.
- //
- // TODO: once gQUIC support is retired, this context may be used in place
- // of the deferredAcceptStream mechanism.
- stream, err := s.Session.AcceptStream(context.Background())
- if err != nil {
- return nil, errors.Trace(err)
- }
- return stream, nil
- }
- func (s *ietfQUICSession) OpenStream() (quicStream, error) {
- return s.Session.OpenStream()
- }
- func (s *ietfQUICSession) isErrorIndicatingClosed(err error) bool {
- if err == nil {
- return false
- }
- errStr := err.Error()
- // The target error is of type *qerr.QuicError, but is not exported.
- return errStr == "Application error 0x0" ||
- errStr == "NO_ERROR: No recent network activity"
- }
- type gQUICListener struct {
- gquic.Listener
- }
- func (l *gQUICListener) Accept() (quicSession, error) {
- session, err := l.Listener.Accept()
- if err != nil {
- return nil, errors.Trace(err)
- }
- return &gQUICSession{Session: session}, nil
- }
- type gQUICSession struct {
- gquic.Session
- }
- func (s *gQUICSession) AcceptStream() (quicStream, error) {
- stream, err := s.Session.AcceptStream()
- if err != nil {
- return nil, errors.Trace(err)
- }
- return stream, nil
- }
- func (s *gQUICSession) OpenStream() (quicStream, error) {
- return s.Session.OpenStream()
- }
- func (s *gQUICSession) isErrorIndicatingClosed(err error) bool {
- if err == nil {
- return false
- }
- if quicErr, ok := err.(*qerr.QuicError); ok {
- switch quicErr.ErrorCode {
- case qerr.PeerGoingAway, qerr.NetworkIdleTimeout:
- return true
- }
- }
- return false
- }
- func dialQUIC(
- ctx context.Context,
- packetConn net.PacketConn,
- remoteAddr *net.UDPAddr,
- quicSNIAddress string,
- versionNumber uint32) (quicSession, error) {
- if isIETFVersion(versionNumber) {
- quicConfig := &ietf_quic.Config{
- HandshakeTimeout: time.Duration(1<<63 - 1),
- IdleTimeout: CLIENT_IDLE_TIMEOUT,
- KeepAlive: true,
- Versions: []ietf_quic.VersionNumber{
- ietf_quic.VersionNumber(versionNumber)},
- }
- deadline, ok := ctx.Deadline()
- if ok {
- quicConfig.HandshakeTimeout = time.Until(deadline)
- }
- dialSession, err := ietf_quic.DialContext(
- ctx,
- packetConn,
- remoteAddr,
- quicSNIAddress,
- &tls.Config{
- InsecureSkipVerify: true,
- NextProtos: []string{getALPN(versionNumber)},
- },
- quicConfig)
- if err != nil {
- return nil, errors.Trace(err)
- }
- return &ietfQUICSession{Session: dialSession}, nil
- } else {
- quicConfig := &gquic.Config{
- HandshakeTimeout: time.Duration(1<<63 - 1),
- IdleTimeout: CLIENT_IDLE_TIMEOUT,
- KeepAlive: true,
- Versions: []gquic.VersionNumber{
- gquic.VersionNumber(versionNumber)},
- }
- deadline, ok := ctx.Deadline()
- if ok {
- quicConfig.HandshakeTimeout = time.Until(deadline)
- }
- dialSession, err := gquic.DialContext(
- ctx,
- packetConn,
- remoteAddr,
- quicSNIAddress,
- &tls.Config{
- InsecureSkipVerify: true,
- },
- quicConfig)
- if err != nil {
- return nil, errors.Trace(err)
- }
- return &gQUICSession{Session: dialSession}, nil
- }
- }
- const (
- muxPacketQueueSize = 128
- muxPacketBufferSize = 1452 // quic-go.MaxReceivePacketSize
- )
- type packet struct {
- addr net.Addr
- size int
- data []byte
- }
- // muxPacketConn delivers packets to a specific quic-go listener.
- type muxPacketConn struct {
- localAddr net.Addr
- listener *muxListener
- packets chan *packet
- }
- func newMuxPacketConn(localAddr net.Addr, listener *muxListener) *muxPacketConn {
- return &muxPacketConn{
- localAddr: localAddr,
- listener: listener,
- packets: make(chan *packet, muxPacketQueueSize),
- }
- }
- func (conn *muxPacketConn) ReadFrom(b []byte) (int, net.Addr, error) {
- select {
- case p := <-conn.packets:
- // If b is too short, the packet is truncated. This won't happen as long as
- // muxPacketBufferSize matches quic-go.MaxReceivePacketSize.
- copy(b, p.data[0:p.size])
- n := p.size
- addr := p.addr
- // Clear and replace packet buffer.
- p.size = 0
- conn.listener.packets <- p
- return n, addr, nil
- case <-conn.listener.stopBroadcast:
- return 0, nil, io.EOF
- }
- }
- func (conn *muxPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
- return conn.listener.conn.WriteTo(b, addr)
- }
- func (conn *muxPacketConn) Close() error {
- // This Close won't unblock Read/Write operations or propagate the Close
- // signal up to muxListener. The correct way to shutdown is to call
- // muxListener.Close.
- return nil
- }
- func (conn *muxPacketConn) LocalAddr() net.Addr {
- return conn.localAddr
- }
- func (conn *muxPacketConn) SetDeadline(t time.Time) error {
- return errors.TraceNew("not supported")
- }
- func (conn *muxPacketConn) SetReadDeadline(t time.Time) error {
- return errors.TraceNew("not supported")
- }
- func (conn *muxPacketConn) SetWriteDeadline(t time.Time) error {
- return errors.TraceNew("not supported")
- }
- // muxListener is a multiplexing packet conn listener which relays packets to
- // multiple quic-go listeners.
- type muxListener struct {
- logger common.Logger
- isClosed int32
- runWaitGroup *sync.WaitGroup
- stopBroadcast chan struct{}
- conn *ObfuscatedPacketConn
- packets chan *packet
- acceptedSessions chan quicSession
- ietfQUICConn *muxPacketConn
- ietfQUICListener quicListener
- gQUICConn *muxPacketConn
- gQUICListener quicListener
- }
- func newMuxListener(
- logger common.Logger,
- conn *ObfuscatedPacketConn,
- tlsCertificate tls.Certificate) (*muxListener, error) {
- listener := &muxListener{
- logger: logger,
- runWaitGroup: new(sync.WaitGroup),
- stopBroadcast: make(chan struct{}),
- conn: conn,
- packets: make(chan *packet, muxPacketQueueSize),
- acceptedSessions: make(chan quicSession, 2), // 1 per listener
- }
- // All packet relay buffers are allocated in advance.
- for i := 0; i < muxPacketQueueSize; i++ {
- listener.packets <- &packet{data: make([]byte, muxPacketBufferSize)}
- }
- listener.ietfQUICConn = newMuxPacketConn(conn.LocalAddr(), listener)
- tlsConfig := &tls.Config{
- Certificates: []tls.Certificate{tlsCertificate},
- NextProtos: []string{getALPN(ietfQUICDraft24VersionNumber)},
- }
- ietfQUICConfig := &ietf_quic.Config{
- HandshakeTimeout: SERVER_HANDSHAKE_TIMEOUT,
- IdleTimeout: serverIdleTimeout,
- MaxIncomingStreams: 1,
- MaxIncomingUniStreams: -1,
- KeepAlive: true,
- }
- il, err := ietf_quic.Listen(listener.ietfQUICConn, tlsConfig, ietfQUICConfig)
- if err != nil {
- return nil, errors.Trace(err)
- }
- listener.ietfQUICListener = &ietfQUICListener{Listener: il}
- listener.gQUICConn = newMuxPacketConn(conn.LocalAddr(), listener)
- tlsConfig = &tls.Config{
- Certificates: []tls.Certificate{tlsCertificate},
- }
- gQUICConfig := &gquic.Config{
- HandshakeTimeout: SERVER_HANDSHAKE_TIMEOUT,
- IdleTimeout: serverIdleTimeout,
- MaxIncomingStreams: 1,
- MaxIncomingUniStreams: -1,
- KeepAlive: true,
- }
- gl, err := gquic.Listen(listener.gQUICConn, tlsConfig, gQUICConfig)
- if err != nil {
- listener.ietfQUICListener.Close()
- return nil, errors.Trace(err)
- }
- listener.gQUICListener = &gQUICListener{Listener: gl}
- listener.runWaitGroup.Add(3)
- go listener.relayPackets()
- go listener.relayAcceptedSessions(listener.gQUICListener)
- go listener.relayAcceptedSessions(listener.ietfQUICListener)
- return listener, nil
- }
- func (listener *muxListener) relayPackets() {
- defer listener.runWaitGroup.Done()
- for {
- var p *packet
- select {
- case p = <-listener.packets:
- case <-listener.stopBroadcast:
- return
- }
- // Read network packets. The DPI functionality of the obfuscation layer
- // identifies the type of QUIC, gQUIC or IETF, in addition to identifying
- // and processing obfuscation. This type information determines which
- // quic-go receives the packet.
- //
- // Network errors are not relayed to quic-go, as it will shut down the
- // server on any error returned from ReadFrom, even net.Error.Temporary()
- // errors.
- var isIETF bool
- var err error
- p.size, p.addr, isIETF, err = listener.conn.readFromWithType(p.data)
- if err != nil {
- if listener.logger != nil {
- message := "readFromWithType failed"
- if e, ok := err.(net.Error); ok && e.Temporary() {
- listener.logger.WithTraceFields(
- common.LogFields{"error": err}).Debug(message)
- } else {
- listener.logger.WithTraceFields(
- common.LogFields{"error": err}).Warning(message)
- }
- }
- // TODO: propagate non-temporary errors to Accept?
- listener.packets <- p
- continue
- }
- // Send the packet to the correct quic-go. The packet is dropped if the
- // target quic-go packet queue is full.
- if isIETF {
- select {
- case listener.ietfQUICConn.packets <- p:
- default:
- listener.packets <- p
- }
- } else {
- select {
- case listener.gQUICConn.packets <- p:
- default:
- listener.packets <- p
- }
- }
- }
- }
- func (listener *muxListener) relayAcceptedSessions(l quicListener) {
- defer listener.runWaitGroup.Done()
- for {
- session, err := l.Accept()
- if err != nil {
- if listener.logger != nil {
- message := "Accept failed"
- if e, ok := err.(net.Error); ok && e.Temporary() {
- listener.logger.WithTraceFields(
- common.LogFields{"error": err}).Debug(message)
- } else {
- listener.logger.WithTraceFields(
- common.LogFields{"error": err}).Warning(message)
- }
- }
- // TODO: propagate non-temporary errors to Accept?
- select {
- case <-listener.stopBroadcast:
- return
- default:
- }
- continue
- }
- select {
- case listener.acceptedSessions <- session:
- case <-listener.stopBroadcast:
- return
- }
- }
- }
- func (listener *muxListener) Accept() (quicSession, error) {
- select {
- case conn := <-listener.acceptedSessions:
- return conn, nil
- case <-listener.stopBroadcast:
- return nil, errors.TraceNew("closed")
- }
- }
- func (listener *muxListener) Close() error {
- // Ensure close channel only called once.
- if !atomic.CompareAndSwapInt32(&listener.isClosed, 0, 1) {
- return nil
- }
- close(listener.stopBroadcast)
- var retErr error
- err := listener.gQUICListener.Close()
- if err != nil && retErr == nil {
- retErr = errors.Trace(err)
- }
- err = listener.ietfQUICListener.Close()
- if err != nil && retErr == nil {
- retErr = errors.Trace(err)
- }
- err = listener.conn.Close()
- if err != nil && retErr == nil {
- retErr = errors.Trace(err)
- }
- listener.runWaitGroup.Wait()
- return retErr
- }
- func (listener *muxListener) Addr() net.Addr {
- return listener.conn.LocalAddr()
- }
|