|
|
@@ -46,6 +46,7 @@ import (
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"net"
|
|
|
+ "net/http"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
@@ -58,6 +59,8 @@ import (
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/h2quic"
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic/gquic-go/qerr"
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
|
|
|
+ ietf_quic "github.com/Psiphon-Labs/quic-go"
|
|
|
+ "github.com/Psiphon-Labs/quic-go/http3"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
@@ -66,12 +69,34 @@ const (
|
|
|
CLIENT_IDLE_TIMEOUT = 30 * time.Second
|
|
|
)
|
|
|
|
|
|
+const ietfQUICDraft24VersionNumber = 0xff000018
|
|
|
+
|
|
|
+var supportedVersionNumbers = map[string]uint32{
|
|
|
+ protocol.QUIC_VERSION_GQUIC39: uint32(gquic.VersionGQUIC39),
|
|
|
+ protocol.QUIC_VERSION_GQUIC43: uint32(gquic.VersionGQUIC43),
|
|
|
+ protocol.QUIC_VERSION_GQUIC44: uint32(gquic.VersionGQUIC44),
|
|
|
+ protocol.QUIC_VERSION_OBFUSCATED: uint32(gquic.VersionGQUIC43),
|
|
|
+ protocol.QUIC_VERSION_IETF_DRAFT24: ietfQUICDraft24VersionNumber,
|
|
|
+}
|
|
|
+
|
|
|
+func isObfuscated(quicVersion string) bool {
|
|
|
+ return quicVersion == protocol.QUIC_VERSION_OBFUSCATED
|
|
|
+}
|
|
|
+
|
|
|
+func isIETFVersion(versionNumber uint32) bool {
|
|
|
+ return versionNumber == ietfQUICDraft24VersionNumber
|
|
|
+}
|
|
|
+
|
|
|
+func getALPN(versionNumber uint32) string {
|
|
|
+ return "h3-24"
|
|
|
+}
|
|
|
+
|
|
|
// quic_test overrides the server idle timeout.
|
|
|
var serverIdleTimeout = SERVER_IDLE_TIMEOUT
|
|
|
|
|
|
// Listener is a net.Listener.
|
|
|
type Listener struct {
|
|
|
- gquic.Listener
|
|
|
+ *muxListener
|
|
|
}
|
|
|
|
|
|
// Listen creates a new Listener.
|
|
|
@@ -92,18 +117,6 @@ func Listen(
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- tlsConfig := &tls.Config{
|
|
|
- Certificates: []tls.Certificate{tlsCertificate},
|
|
|
- }
|
|
|
-
|
|
|
- quicConfig := &gquic.Config{
|
|
|
- HandshakeTimeout: SERVER_HANDSHAKE_TIMEOUT,
|
|
|
- IdleTimeout: serverIdleTimeout,
|
|
|
- MaxIncomingStreams: 1,
|
|
|
- MaxIncomingUniStreams: -1,
|
|
|
- KeepAlive: true,
|
|
|
- }
|
|
|
-
|
|
|
addr, err := net.ResolveUDPAddr("udp", address)
|
|
|
if err != nil {
|
|
|
return nil, errors.Trace(err)
|
|
|
@@ -116,27 +129,26 @@ func Listen(
|
|
|
|
|
|
seed, err := prng.NewSeed()
|
|
|
if err != nil {
|
|
|
+ udpConn.Close()
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- var packetConn net.PacketConn
|
|
|
- packetConn, err = NewObfuscatedPacketConn(
|
|
|
- udpConn, true, obfuscationKey, seed)
|
|
|
+ obfuscatedPacketConn, err := NewObfuscatedPacketConn(udpConn, true, obfuscationKey, seed)
|
|
|
if err != nil {
|
|
|
+ udpConn.Close()
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- // This wrapping must be outermost to ensure that all
|
|
|
- // ReadFrom errors are intercepted and logged.
|
|
|
- packetConn = newLoggingPacketConn(logger, packetConn)
|
|
|
+ // Note that, due to nature of muxListener, full accepts may happen before
|
|
|
+ // return and caller calls Accept.
|
|
|
|
|
|
- quicListener, err := gquic.Listen(
|
|
|
- packetConn, tlsConfig, quicConfig)
|
|
|
+ listener, err := newMuxListener(logger, obfuscatedPacketConn, tlsCertificate)
|
|
|
if err != nil {
|
|
|
+ obfuscatedPacketConn.Close()
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- return &Listener{Listener: quicListener}, nil
|
|
|
+ return &Listener{muxListener: listener}, nil
|
|
|
}
|
|
|
|
|
|
// Accept returns a net.Conn that wraps a single QUIC session and stream. The
|
|
|
@@ -145,7 +157,7 @@ func Listen(
|
|
|
// net.Conn will perform the blocking AcceptStream.
|
|
|
func (listener *Listener) Accept() (net.Conn, error) {
|
|
|
|
|
|
- session, err := listener.Listener.Accept()
|
|
|
+ session, err := listener.muxListener.Accept()
|
|
|
if err != nil {
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
@@ -156,13 +168,6 @@ func (listener *Listener) Accept() (net.Conn, error) {
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
-var supportedVersionNumbers = map[string]gquic.VersionNumber{
|
|
|
- protocol.QUIC_VERSION_GQUIC39: gquic.VersionGQUIC39,
|
|
|
- protocol.QUIC_VERSION_GQUIC43: gquic.VersionGQUIC43,
|
|
|
- protocol.QUIC_VERSION_GQUIC44: gquic.VersionGQUIC44,
|
|
|
- protocol.QUIC_VERSION_OBFUSCATED: gquic.VersionGQUIC43,
|
|
|
-}
|
|
|
-
|
|
|
// Dial establishes a new QUIC session and stream to the server specified by
|
|
|
// address.
|
|
|
//
|
|
|
@@ -181,29 +186,16 @@ func Dial(
|
|
|
obfuscationKey string,
|
|
|
obfuscationPaddingSeed *prng.Seed) (net.Conn, error) {
|
|
|
|
|
|
- var versions []gquic.VersionNumber
|
|
|
-
|
|
|
- if negotiateQUICVersion != "" {
|
|
|
- versionNumber, ok := supportedVersionNumbers[negotiateQUICVersion]
|
|
|
- if !ok {
|
|
|
- return nil, errors.Tracef("unsupported version: %s", negotiateQUICVersion)
|
|
|
- }
|
|
|
- versions = []gquic.VersionNumber{versionNumber}
|
|
|
- }
|
|
|
-
|
|
|
- quicConfig := &gquic.Config{
|
|
|
- HandshakeTimeout: time.Duration(1<<63 - 1),
|
|
|
- IdleTimeout: CLIENT_IDLE_TIMEOUT,
|
|
|
- KeepAlive: true,
|
|
|
- Versions: versions,
|
|
|
+ if negotiateQUICVersion == "" {
|
|
|
+ return nil, errors.TraceNew("missing version")
|
|
|
}
|
|
|
|
|
|
- deadline, ok := ctx.Deadline()
|
|
|
- if ok {
|
|
|
- quicConfig.HandshakeTimeout = time.Until(deadline)
|
|
|
+ versionNumber, ok := supportedVersionNumbers[negotiateQUICVersion]
|
|
|
+ if !ok {
|
|
|
+ return nil, errors.Tracef("unsupported version: %s", negotiateQUICVersion)
|
|
|
}
|
|
|
|
|
|
- if negotiateQUICVersion == protocol.QUIC_VERSION_OBFUSCATED {
|
|
|
+ if isObfuscated(negotiateQUICVersion) {
|
|
|
var err error
|
|
|
packetConn, err = NewObfuscatedPacketConn(
|
|
|
packetConn, false, obfuscationKey, obfuscationPaddingSeed)
|
|
|
@@ -212,13 +204,12 @@ func Dial(
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- session, err := gquic.DialContext(
|
|
|
+ session, err := dialQUIC(
|
|
|
ctx,
|
|
|
packetConn,
|
|
|
remoteAddr,
|
|
|
quicSNIAddress,
|
|
|
- &tls.Config{InsecureSkipVerify: true},
|
|
|
- quicConfig)
|
|
|
+ versionNumber)
|
|
|
if err != nil {
|
|
|
packetConn.Close()
|
|
|
return nil, errors.Trace(err)
|
|
|
@@ -272,13 +263,13 @@ func Dial(
|
|
|
// Conn is a net.Conn and psiphon/common.Closer.
|
|
|
type Conn struct {
|
|
|
packetConn net.PacketConn
|
|
|
- session gquic.Session
|
|
|
+ session quicSession
|
|
|
|
|
|
deferredAcceptStream bool
|
|
|
|
|
|
acceptMutex sync.Mutex
|
|
|
acceptErr error
|
|
|
- stream gquic.Stream
|
|
|
+ stream quicStream
|
|
|
|
|
|
readMutex sync.Mutex
|
|
|
writeMutex sync.Mutex
|
|
|
@@ -321,12 +312,13 @@ func (conn *Conn) Read(b []byte) (int, error) {
|
|
|
|
|
|
// Add mutex to provide full net.Conn concurrency semantics.
|
|
|
// https://github.com/lucas-clemente/quic-go/blob/9cc23135d0477baf83aa4715de39ae7070039cb2/stream.go#L64
|
|
|
- // "Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually."
|
|
|
+ // "Read() and Write() may be called concurrently, but multiple calls to
|
|
|
+ // "Read() or Write() individually must be synchronized manually."
|
|
|
conn.readMutex.Lock()
|
|
|
defer conn.readMutex.Unlock()
|
|
|
|
|
|
n, err := conn.stream.Read(b)
|
|
|
- if isErrorIndicatingClosed(err) {
|
|
|
+ if conn.session.isErrorIndicatingClosed(err) {
|
|
|
_ = conn.Close()
|
|
|
err = io.EOF
|
|
|
}
|
|
|
@@ -346,7 +338,7 @@ func (conn *Conn) Write(b []byte) (int, error) {
|
|
|
defer conn.writeMutex.Unlock()
|
|
|
|
|
|
n, err := conn.stream.Write(b)
|
|
|
- if isErrorIndicatingClosed(err) {
|
|
|
+ if conn.session.isErrorIndicatingClosed(err) {
|
|
|
_ = conn.Close()
|
|
|
if n == len(b) {
|
|
|
err = nil
|
|
|
@@ -415,92 +407,11 @@ func (conn *Conn) SetWriteDeadline(t time.Time) error {
|
|
|
return conn.stream.SetWriteDeadline(t)
|
|
|
}
|
|
|
|
|
|
-func isErrorIndicatingClosed(err error) bool {
|
|
|
- if err != nil {
|
|
|
- if quicErr, ok := err.(*qerr.QuicError); ok {
|
|
|
- switch quicErr.ErrorCode {
|
|
|
- case qerr.PeerGoingAway, qerr.NetworkIdleTimeout:
|
|
|
- return true
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return false
|
|
|
-}
|
|
|
-
|
|
|
-// loggingPacketConn is a workaround for issues in the quic-go server (as of
|
|
|
-// revision ffdfa1).
|
|
|
-//
|
|
|
-// 1. quic-go will shutdown the QUIC server on any error returned from
|
|
|
-// ReadFrom, even net.Error.Temporary() errors.
|
|
|
-//
|
|
|
-// 2. The server shutdown hangs due to a mutex deadlock:
|
|
|
-//
|
|
|
-// sync.(*RWMutex).Lock+0x2c /usr/local/go/src/sync/rwmutex.go:93
|
|
|
-// [...]/lucas-clemente/quic-go.(*packetHandlerMap).CloseServer+0x41 [...]/lucas-clemente/quic-go/packet_handler_map.go:77
|
|
|
-// [...]/lucas-clemente/quic-go.(*server).closeWithMutex+0x37 [...]/lucas-clemente/quic-go/server.go:314
|
|
|
-// [...]/lucas-clemente/quic-go.(*server).closeWithError+0xa2 [...]/lucas-clemente/quic-go/server.go:336
|
|
|
-// [...]/lucas-clemente/quic-go.(*packetHandlerMap).close+0x1da [...]/lucas-clemente/quic-go/packet_handler_map.go:115
|
|
|
-// [...]/lucas-clemente/quic-go.(*packetHandlerMap).listen+0x230 [...]/lucas-clemente/quic-go/packet_handler_map.go:130
|
|
|
-//
|
|
|
-// packetHandlerMap.CloseServer is attempting to lock the same mutex that
|
|
|
-// is already locked in packetHandlerMap.close, which deadlocks. As
|
|
|
-// packetHandlerMap and its mutex are used by all client sessions, this
|
|
|
-// effectively hangs the entire server.
|
|
|
-//
|
|
|
-// loggingPacketConn log ReadFrom errors and returns any usable values or
|
|
|
-// loops and calls ReadFrom again. In practise, due to the nature of UDP
|
|
|
-// sockets, ReadFrom errors are exceptional as they will most likely not occur
|
|
|
-// due to network transmission failures. ObfuscatedPacketConn returns errors
|
|
|
-// that could be due to network transmission failures that corrupt packets;
|
|
|
-// these are marked as net.Error.Temporary() and loggingPacketConn logs these
|
|
|
-// at debug level.
|
|
|
-//
|
|
|
-// loggingPacketConn assumes quic-go revision ffdfa1 behavior and will break
|
|
|
-// other behavior, such as setting deadlines and expecting net.Error.Timeout()
|
|
|
-// errors from ReadFrom.
|
|
|
-type loggingPacketConn struct {
|
|
|
- net.PacketConn
|
|
|
- logger common.Logger
|
|
|
-}
|
|
|
-
|
|
|
-func newLoggingPacketConn(
|
|
|
- logger common.Logger,
|
|
|
- packetConn net.PacketConn) *loggingPacketConn {
|
|
|
-
|
|
|
- return &loggingPacketConn{
|
|
|
- PacketConn: packetConn,
|
|
|
- logger: logger,
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func (conn *loggingPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
|
|
|
-
|
|
|
- for {
|
|
|
- n, addr, err := conn.PacketConn.ReadFrom(p)
|
|
|
-
|
|
|
- if err != nil && conn.logger != nil {
|
|
|
- message := "ReadFrom failed"
|
|
|
- if e, ok := err.(net.Error); ok && e.Temporary() {
|
|
|
- conn.logger.WithTraceFields(
|
|
|
- common.LogFields{"error": err}).Debug(message)
|
|
|
- } else {
|
|
|
- conn.logger.WithTraceFields(
|
|
|
- common.LogFields{"error": err}).Warning(message)
|
|
|
- }
|
|
|
- }
|
|
|
- err = nil
|
|
|
-
|
|
|
- if n > 0 || addr != nil {
|
|
|
- return n, addr, nil
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
// QUICTransporter implements the psiphon.transporter interface, used in
|
|
|
// psiphon.MeekConn for HTTP requests, which requires a RoundTripper and
|
|
|
// CloseIdleConnections.
|
|
|
type QUICTransporter struct {
|
|
|
- *h2quic.RoundTripper
|
|
|
+ quicRoundTripper
|
|
|
noticeEmitter func(string)
|
|
|
udpDialer func(ctx context.Context) (net.PacketConn, *net.UDPAddr, error)
|
|
|
quicSNIAddress string
|
|
|
@@ -517,7 +428,12 @@ func NewQUICTransporter(
|
|
|
noticeEmitter func(string),
|
|
|
udpDialer func(ctx context.Context) (net.PacketConn, *net.UDPAddr, error),
|
|
|
quicSNIAddress string,
|
|
|
- negotiateQUICVersion string) *QUICTransporter {
|
|
|
+ negotiateQUICVersion string) (*QUICTransporter, error) {
|
|
|
+
|
|
|
+ versionNumber, ok := supportedVersionNumbers[negotiateQUICVersion]
|
|
|
+ if !ok {
|
|
|
+ return nil, errors.Tracef("unsupported version: %s", negotiateQUICVersion)
|
|
|
+ }
|
|
|
|
|
|
t := &QUICTransporter{
|
|
|
noticeEmitter: noticeEmitter,
|
|
|
@@ -527,9 +443,13 @@ func NewQUICTransporter(
|
|
|
ctx: ctx,
|
|
|
}
|
|
|
|
|
|
- t.RoundTripper = &h2quic.RoundTripper{Dial: t.dialQUIC}
|
|
|
+ if isIETFVersion(versionNumber) {
|
|
|
+ t.quicRoundTripper = &http3.RoundTripper{Dial: t.dialIETFQUIC}
|
|
|
+ } else {
|
|
|
+ t.quicRoundTripper = &h2quic.RoundTripper{Dial: t.dialgQUIC}
|
|
|
+ }
|
|
|
|
|
|
- return t
|
|
|
+ return t, nil
|
|
|
}
|
|
|
|
|
|
func (t *QUICTransporter) SetRequestContext(ctx context.Context) {
|
|
|
@@ -539,13 +459,18 @@ func (t *QUICTransporter) SetRequestContext(ctx context.Context) {
|
|
|
t.ctx = ctx
|
|
|
}
|
|
|
|
|
|
-// CloseIdleConnections wraps h2quic.RoundTripper.Close, which provides the
|
|
|
+// CloseIdleConnections wraps QUIC RoundTripper.Close, which provides the
|
|
|
// necessary functionality for psiphon.transporter as used by
|
|
|
// psiphon.MeekConn. Note that, unlike http.Transport.CloseIdleConnections,
|
|
|
// the connections are closed regardless of idle status.
|
|
|
func (t *QUICTransporter) CloseIdleConnections() {
|
|
|
+
|
|
|
+ // This operation doesn't prevent a concurrent http3.client.dial from
|
|
|
+ // establishing a new packet conn; we also rely on the request context to
|
|
|
+ // fully interrupt and stop a http3.RoundTripper.
|
|
|
+
|
|
|
t.closePacketConn()
|
|
|
- t.RoundTripper.Close()
|
|
|
+ t.quicRoundTripper.Close()
|
|
|
}
|
|
|
|
|
|
func (t *QUICTransporter) closePacketConn() {
|
|
|
@@ -555,30 +480,39 @@ func (t *QUICTransporter) closePacketConn() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (t *QUICTransporter) dialQUIC(
|
|
|
- _, _ string, _ *tls.Config, _ *gquic.Config) (retSession gquic.Session, retErr error) {
|
|
|
+func (t *QUICTransporter) dialIETFQUIC(
|
|
|
+ _, _ string, _ *tls.Config, _ *ietf_quic.Config) (ietf_quic.Session, error) {
|
|
|
+ session, err := t.dialQUIC()
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ return session.(*ietfQUICSession).Session, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (t *QUICTransporter) dialgQUIC(
|
|
|
+ _, _ string, _ *tls.Config, _ *gquic.Config) (gquic.Session, error) {
|
|
|
+ session, err := t.dialQUIC()
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ return session.(*gQUICSession).Session, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (t *QUICTransporter) dialQUIC() (retSession quicSession, retErr error) {
|
|
|
|
|
|
defer func() {
|
|
|
if retErr != nil && t.noticeEmitter != nil {
|
|
|
- t.noticeEmitter(fmt.Sprintf("dialQUIC failed: %s", retErr))
|
|
|
+ t.noticeEmitter(fmt.Sprintf("QUICTransporter.dialQUIC failed: %s", retErr))
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- var versions []gquic.VersionNumber
|
|
|
-
|
|
|
- if t.negotiateQUICVersion != "" {
|
|
|
- versionNumber, ok := supportedVersionNumbers[t.negotiateQUICVersion]
|
|
|
- if !ok {
|
|
|
- return nil, errors.Tracef("unsupported version: %s", t.negotiateQUICVersion)
|
|
|
- }
|
|
|
- versions = []gquic.VersionNumber{versionNumber}
|
|
|
+ if t.negotiateQUICVersion == "" {
|
|
|
+ return nil, errors.TraceNew("missing version")
|
|
|
}
|
|
|
|
|
|
- quicConfig := &gquic.Config{
|
|
|
- HandshakeTimeout: time.Duration(1<<63 - 1),
|
|
|
- IdleTimeout: CLIENT_IDLE_TIMEOUT,
|
|
|
- KeepAlive: true,
|
|
|
- Versions: versions,
|
|
|
+ versionNumber, ok := supportedVersionNumbers[t.negotiateQUICVersion]
|
|
|
+ if !ok {
|
|
|
+ return nil, errors.Tracef("unsupported version: %s", t.negotiateQUICVersion)
|
|
|
}
|
|
|
|
|
|
t.mutex.Lock()
|
|
|
@@ -593,21 +527,20 @@ func (t *QUICTransporter) dialQUIC(
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- session, err := gquic.DialContext(
|
|
|
+ session, err := dialQUIC(
|
|
|
ctx,
|
|
|
packetConn,
|
|
|
remoteAddr,
|
|
|
t.quicSNIAddress,
|
|
|
- &tls.Config{InsecureSkipVerify: true},
|
|
|
- quicConfig)
|
|
|
+ versionNumber)
|
|
|
if err != nil {
|
|
|
packetConn.Close()
|
|
|
return nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- // We use gquic.DialContext as we must create our own UDP sockets to set
|
|
|
- // properties such as BIND_TO_DEVICE. However, when DialContext is used,
|
|
|
- // gquic does not take responsibiity for closing the underlying packetConn
|
|
|
+ // dialQUIC uses quic-go.DialContext as we must create our own UDP sockets to
|
|
|
+ // set properties such as BIND_TO_DEVICE. However, when DialContext is used,
|
|
|
+ // quic-go does not take responsibiity for closing the underlying packetConn
|
|
|
// when the QUIC session is closed.
|
|
|
//
|
|
|
// We track the most recent packetConn in QUICTransporter and close it:
|
|
|
@@ -624,3 +557,487 @@ func (t *QUICTransporter) dialQUIC(
|
|
|
|
|
|
return session, nil
|
|
|
}
|
|
|
+
|
|
|
+// The following code provides support for using both gQUIC and IETF QUIC,
|
|
|
+// which are implemented in two different branches (now forks) of quic-go.
|
|
|
+//
|
|
|
+// dialQUIC uses the appropriate quic-go and returns quicSession which wraps
|
|
|
+// either a ietf_quic.Session or gquic.Session.
|
|
|
+//
|
|
|
+// muxPacketConn provides a multiplexing listener that directs packets to
|
|
|
+// either a ietf_quic.Listener or a gquic.Listener based on the content of the
|
|
|
+// packet.
|
|
|
+
|
|
|
+type quicListener interface {
|
|
|
+ Close() error
|
|
|
+ Accept() (quicSession, error)
|
|
|
+}
|
|
|
+
|
|
|
+type quicSession interface {
|
|
|
+ io.Closer
|
|
|
+ LocalAddr() net.Addr
|
|
|
+ RemoteAddr() net.Addr
|
|
|
+ AcceptStream() (quicStream, error)
|
|
|
+ OpenStream() (quicStream, error)
|
|
|
+ isErrorIndicatingClosed(err error) bool
|
|
|
+}
|
|
|
+
|
|
|
+type quicStream interface {
|
|
|
+ io.Reader
|
|
|
+ io.Writer
|
|
|
+ io.Closer
|
|
|
+ SetReadDeadline(t time.Time) error
|
|
|
+ SetWriteDeadline(t time.Time) error
|
|
|
+ SetDeadline(t time.Time) error
|
|
|
+}
|
|
|
+
|
|
|
+type quicRoundTripper interface {
|
|
|
+ http.RoundTripper
|
|
|
+ Close() error
|
|
|
+}
|
|
|
+
|
|
|
+type ietfQUICListener struct {
|
|
|
+ ietf_quic.Listener
|
|
|
+}
|
|
|
+
|
|
|
+func (l *ietfQUICListener) Accept() (quicSession, error) {
|
|
|
+ // A specific context is not provided since the interface needs to match the
|
|
|
+ // gquic-go API, which lacks context support.
|
|
|
+ session, err := l.Listener.Accept(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ return &ietfQUICSession{Session: session}, nil
|
|
|
+}
|
|
|
+
|
|
|
+type ietfQUICSession struct {
|
|
|
+ ietf_quic.Session
|
|
|
+}
|
|
|
+
|
|
|
+func (s *ietfQUICSession) AcceptStream() (quicStream, error) {
|
|
|
+ // A specific context is not provided since the interface needs to match the
|
|
|
+ // gquic-go API, which lacks context support.
|
|
|
+ //
|
|
|
+ // TODO: once gQUIC support is retired, this context may be used in place
|
|
|
+ // of the deferredAcceptStream mechanism.
|
|
|
+ stream, err := s.Session.AcceptStream(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ return stream, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (s *ietfQUICSession) OpenStream() (quicStream, error) {
|
|
|
+ return s.Session.OpenStream()
|
|
|
+}
|
|
|
+
|
|
|
+func (s *ietfQUICSession) isErrorIndicatingClosed(err error) bool {
|
|
|
+ if err == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ // The target error is of type *qerr.QuicError, but is not exported.
|
|
|
+ return err.Error() == "Application error 0x0"
|
|
|
+}
|
|
|
+
|
|
|
+type gQUICListener struct {
|
|
|
+ gquic.Listener
|
|
|
+}
|
|
|
+
|
|
|
+func (l *gQUICListener) Accept() (quicSession, error) {
|
|
|
+ session, err := l.Listener.Accept()
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ return &gQUICSession{Session: session}, nil
|
|
|
+}
|
|
|
+
|
|
|
+type gQUICSession struct {
|
|
|
+ gquic.Session
|
|
|
+}
|
|
|
+
|
|
|
+func (s *gQUICSession) AcceptStream() (quicStream, error) {
|
|
|
+ stream, err := s.Session.AcceptStream()
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ return stream, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (s *gQUICSession) OpenStream() (quicStream, error) {
|
|
|
+ return s.Session.OpenStream()
|
|
|
+}
|
|
|
+
|
|
|
+func (s *gQUICSession) isErrorIndicatingClosed(err error) bool {
|
|
|
+ if err == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ if quicErr, ok := err.(*qerr.QuicError); ok {
|
|
|
+ switch quicErr.ErrorCode {
|
|
|
+ case qerr.PeerGoingAway, qerr.NetworkIdleTimeout:
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
+func dialQUIC(
|
|
|
+ ctx context.Context,
|
|
|
+ packetConn net.PacketConn,
|
|
|
+ remoteAddr *net.UDPAddr,
|
|
|
+ quicSNIAddress string,
|
|
|
+ versionNumber uint32) (quicSession, error) {
|
|
|
+
|
|
|
+ if isIETFVersion(versionNumber) {
|
|
|
+
|
|
|
+ quicConfig := &ietf_quic.Config{
|
|
|
+ HandshakeTimeout: time.Duration(1<<63 - 1),
|
|
|
+ IdleTimeout: CLIENT_IDLE_TIMEOUT,
|
|
|
+ KeepAlive: true,
|
|
|
+ Versions: []ietf_quic.VersionNumber{
|
|
|
+ ietf_quic.VersionNumber(versionNumber)},
|
|
|
+ }
|
|
|
+
|
|
|
+ deadline, ok := ctx.Deadline()
|
|
|
+ if ok {
|
|
|
+ quicConfig.HandshakeTimeout = time.Until(deadline)
|
|
|
+ }
|
|
|
+
|
|
|
+ dialSession, err := ietf_quic.DialContext(
|
|
|
+ ctx,
|
|
|
+ packetConn,
|
|
|
+ remoteAddr,
|
|
|
+ quicSNIAddress,
|
|
|
+ &tls.Config{
|
|
|
+ InsecureSkipVerify: true,
|
|
|
+ NextProtos: []string{getALPN(versionNumber)},
|
|
|
+ },
|
|
|
+ quicConfig)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return &ietfQUICSession{Session: dialSession}, nil
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ quicConfig := &gquic.Config{
|
|
|
+ HandshakeTimeout: time.Duration(1<<63 - 1),
|
|
|
+ IdleTimeout: CLIENT_IDLE_TIMEOUT,
|
|
|
+ KeepAlive: true,
|
|
|
+ Versions: []gquic.VersionNumber{
|
|
|
+ gquic.VersionNumber(versionNumber)},
|
|
|
+ }
|
|
|
+
|
|
|
+ deadline, ok := ctx.Deadline()
|
|
|
+ if ok {
|
|
|
+ quicConfig.HandshakeTimeout = time.Until(deadline)
|
|
|
+ }
|
|
|
+
|
|
|
+ dialSession, err := gquic.DialContext(
|
|
|
+ ctx,
|
|
|
+ packetConn,
|
|
|
+ remoteAddr,
|
|
|
+ quicSNIAddress,
|
|
|
+ &tls.Config{
|
|
|
+ InsecureSkipVerify: true,
|
|
|
+ },
|
|
|
+ quicConfig)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return &gQUICSession{Session: dialSession}, nil
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ muxPacketQueueSize = 128
|
|
|
+ muxPacketBufferSize = 1452 // quic-go.MaxReceivePacketSize
|
|
|
+)
|
|
|
+
|
|
|
+type packet struct {
|
|
|
+ addr net.Addr
|
|
|
+ size int
|
|
|
+ data []byte
|
|
|
+}
|
|
|
+
|
|
|
+// muxPacketConn delivers packets to a specific quic-go listener.
|
|
|
+type muxPacketConn struct {
|
|
|
+ localAddr net.Addr
|
|
|
+ listener *muxListener
|
|
|
+ packets chan *packet
|
|
|
+}
|
|
|
+
|
|
|
+func newMuxPacketConn(localAddr net.Addr, listener *muxListener) *muxPacketConn {
|
|
|
+ return &muxPacketConn{
|
|
|
+ localAddr: localAddr,
|
|
|
+ listener: listener,
|
|
|
+ packets: make(chan *packet, muxPacketQueueSize),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *muxPacketConn) ReadFrom(b []byte) (int, net.Addr, error) {
|
|
|
+
|
|
|
+ select {
|
|
|
+ case p := <-conn.packets:
|
|
|
+
|
|
|
+ // If b is too short, the packet is truncated. This won't happen as long as
|
|
|
+ // muxPacketBufferSize matches quic-go.MaxReceivePacketSize.
|
|
|
+ copy(b, p.data[0:p.size])
|
|
|
+ n := p.size
|
|
|
+ addr := p.addr
|
|
|
+
|
|
|
+ // Clear and replace packet buffer.
|
|
|
+ p.size = 0
|
|
|
+ conn.listener.packets <- p
|
|
|
+
|
|
|
+ return n, addr, nil
|
|
|
+ case <-conn.listener.stopBroadcast:
|
|
|
+ return 0, nil, io.EOF
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *muxPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
|
|
|
+ return conn.listener.conn.WriteTo(b, addr)
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *muxPacketConn) Close() error {
|
|
|
+ // This Close won't unblock Read/Write operations or propagate the Close
|
|
|
+ // signal up to muxListener. The correct way to shutdown is to call
|
|
|
+ // muxListener.Close.
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *muxPacketConn) LocalAddr() net.Addr {
|
|
|
+ return conn.localAddr
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *muxPacketConn) SetDeadline(t time.Time) error {
|
|
|
+ return errors.TraceNew("not supported")
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *muxPacketConn) SetReadDeadline(t time.Time) error {
|
|
|
+ return errors.TraceNew("not supported")
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *muxPacketConn) SetWriteDeadline(t time.Time) error {
|
|
|
+ return errors.TraceNew("not supported")
|
|
|
+}
|
|
|
+
|
|
|
+// muxListener is a multiplexing packet conn listener which relays packets to
|
|
|
+// multiple quic-go listeners.
|
|
|
+type muxListener struct {
|
|
|
+ logger common.Logger
|
|
|
+ isClosed int32
|
|
|
+ runWaitGroup *sync.WaitGroup
|
|
|
+ stopBroadcast chan struct{}
|
|
|
+ conn *ObfuscatedPacketConn
|
|
|
+ packets chan *packet
|
|
|
+ acceptedSessions chan quicSession
|
|
|
+ ietfQUICConn *muxPacketConn
|
|
|
+ ietfQUICListener quicListener
|
|
|
+ gQUICConn *muxPacketConn
|
|
|
+ gQUICListener quicListener
|
|
|
+}
|
|
|
+
|
|
|
+func newMuxListener(
|
|
|
+ logger common.Logger,
|
|
|
+ conn *ObfuscatedPacketConn,
|
|
|
+ tlsCertificate tls.Certificate) (*muxListener, error) {
|
|
|
+
|
|
|
+ listener := &muxListener{
|
|
|
+ logger: logger,
|
|
|
+ runWaitGroup: new(sync.WaitGroup),
|
|
|
+ stopBroadcast: make(chan struct{}),
|
|
|
+ conn: conn,
|
|
|
+ packets: make(chan *packet, muxPacketQueueSize),
|
|
|
+ acceptedSessions: make(chan quicSession, 2), // 1 per listener
|
|
|
+ }
|
|
|
+
|
|
|
+ // All packet relay buffers are allocated in advance.
|
|
|
+ for i := 0; i < muxPacketQueueSize; i++ {
|
|
|
+ listener.packets <- &packet{data: make([]byte, muxPacketBufferSize)}
|
|
|
+ }
|
|
|
+
|
|
|
+ listener.ietfQUICConn = newMuxPacketConn(conn.LocalAddr(), listener)
|
|
|
+
|
|
|
+ tlsConfig := &tls.Config{
|
|
|
+ Certificates: []tls.Certificate{tlsCertificate},
|
|
|
+ NextProtos: []string{getALPN(ietfQUICDraft24VersionNumber)},
|
|
|
+ }
|
|
|
+
|
|
|
+ ietfQUICConfig := &ietf_quic.Config{
|
|
|
+ HandshakeTimeout: SERVER_HANDSHAKE_TIMEOUT,
|
|
|
+ IdleTimeout: serverIdleTimeout,
|
|
|
+ MaxIncomingStreams: 1,
|
|
|
+ MaxIncomingUniStreams: -1,
|
|
|
+ KeepAlive: true,
|
|
|
+ }
|
|
|
+
|
|
|
+ il, err := ietf_quic.Listen(listener.ietfQUICConn, tlsConfig, ietfQUICConfig)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ listener.ietfQUICListener = &ietfQUICListener{Listener: il}
|
|
|
+
|
|
|
+ listener.gQUICConn = newMuxPacketConn(conn.LocalAddr(), listener)
|
|
|
+
|
|
|
+ tlsConfig = &tls.Config{
|
|
|
+ Certificates: []tls.Certificate{tlsCertificate},
|
|
|
+ }
|
|
|
+
|
|
|
+ gQUICConfig := &gquic.Config{
|
|
|
+ HandshakeTimeout: SERVER_HANDSHAKE_TIMEOUT,
|
|
|
+ IdleTimeout: serverIdleTimeout,
|
|
|
+ MaxIncomingStreams: 1,
|
|
|
+ MaxIncomingUniStreams: -1,
|
|
|
+ KeepAlive: true,
|
|
|
+ }
|
|
|
+
|
|
|
+ gl, err := gquic.Listen(listener.gQUICConn, tlsConfig, gQUICConfig)
|
|
|
+ if err != nil {
|
|
|
+ listener.ietfQUICListener.Close()
|
|
|
+ return nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ listener.gQUICListener = &gQUICListener{Listener: gl}
|
|
|
+
|
|
|
+ listener.runWaitGroup.Add(3)
|
|
|
+ go listener.relayPackets()
|
|
|
+ go listener.relayAcceptedSessions(listener.gQUICListener)
|
|
|
+ go listener.relayAcceptedSessions(listener.ietfQUICListener)
|
|
|
+
|
|
|
+ return listener, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (listener *muxListener) relayPackets() {
|
|
|
+ defer listener.runWaitGroup.Done()
|
|
|
+
|
|
|
+ for {
|
|
|
+
|
|
|
+ var p *packet
|
|
|
+ select {
|
|
|
+ case p = <-listener.packets:
|
|
|
+ case <-listener.stopBroadcast:
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Read network packets. The DPI functionality of the obfuscation layer
|
|
|
+ // identifies the type of QUIC, gQUIC or IETF, in addition to identifying
|
|
|
+ // and processing obfuscation. This type information determines which
|
|
|
+ // quic-go receives the packet.
|
|
|
+ //
|
|
|
+ // Network errors are not relayed to quic-go, as it will shut down the
|
|
|
+ // server on any error returned from ReadFrom, even net.Error.Temporary()
|
|
|
+ // errors.
|
|
|
+
|
|
|
+ var isIETF bool
|
|
|
+ var err error
|
|
|
+ p.size, p.addr, isIETF, err = listener.conn.readFromWithType(p.data)
|
|
|
+ if err != nil {
|
|
|
+ if listener.logger != nil {
|
|
|
+ message := "readFromWithType failed"
|
|
|
+ if e, ok := err.(net.Error); ok && e.Temporary() {
|
|
|
+ listener.logger.WithTraceFields(
|
|
|
+ common.LogFields{"error": err}).Debug(message)
|
|
|
+ } else {
|
|
|
+ listener.logger.WithTraceFields(
|
|
|
+ common.LogFields{"error": err}).Warning(message)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // TODO: propagate non-temporary errors to Accept?
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // Send the packet to the correct quic-go. The packet is dropped if the
|
|
|
+ // target quic-go packet queue is full.
|
|
|
+
|
|
|
+ if isIETF {
|
|
|
+ select {
|
|
|
+ case listener.ietfQUICConn.packets <- p:
|
|
|
+ default:
|
|
|
+ listener.packets <- p
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ select {
|
|
|
+ case listener.gQUICConn.packets <- p:
|
|
|
+ default:
|
|
|
+ listener.packets <- p
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (listener *muxListener) relayAcceptedSessions(l quicListener) {
|
|
|
+ defer listener.runWaitGroup.Done()
|
|
|
+ for {
|
|
|
+ session, err := l.Accept()
|
|
|
+ if err != nil {
|
|
|
+ if listener.logger != nil {
|
|
|
+ message := "Accept failed"
|
|
|
+ if e, ok := err.(net.Error); ok && e.Temporary() {
|
|
|
+ listener.logger.WithTraceFields(
|
|
|
+ common.LogFields{"error": err}).Debug(message)
|
|
|
+ } else {
|
|
|
+ listener.logger.WithTraceFields(
|
|
|
+ common.LogFields{"error": err}).Warning(message)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // TODO: propagate non-temporary errors to Accept?
|
|
|
+ select {
|
|
|
+ case <-listener.stopBroadcast:
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ select {
|
|
|
+ case listener.acceptedSessions <- session:
|
|
|
+ case <-listener.stopBroadcast:
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (listener *muxListener) Accept() (quicSession, error) {
|
|
|
+ select {
|
|
|
+ case conn := <-listener.acceptedSessions:
|
|
|
+ return conn, nil
|
|
|
+ case <-listener.stopBroadcast:
|
|
|
+ return nil, errors.TraceNew("closed")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (listener *muxListener) Close() error {
|
|
|
+
|
|
|
+ // Ensure close channel only called once.
|
|
|
+ if !atomic.CompareAndSwapInt32(&listener.isClosed, 0, 1) {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ close(listener.stopBroadcast)
|
|
|
+
|
|
|
+ var retErr error
|
|
|
+
|
|
|
+ err := listener.gQUICListener.Close()
|
|
|
+ if err != nil && retErr == nil {
|
|
|
+ retErr = errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = listener.ietfQUICListener.Close()
|
|
|
+ if err != nil && retErr == nil {
|
|
|
+ retErr = errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = listener.conn.Close()
|
|
|
+ if err != nil && retErr == nil {
|
|
|
+ retErr = errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ listener.runWaitGroup.Wait()
|
|
|
+
|
|
|
+ return retErr
|
|
|
+}
|
|
|
+
|
|
|
+func (listener *muxListener) Addr() net.Addr {
|
|
|
+ return listener.conn.LocalAddr()
|
|
|
+}
|