| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129 |
- /*
- * Copyright (c) 2015, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package psiphon
- import (
- "bytes"
- "context"
- "crypto/rand"
- golangtls "crypto/tls"
- "encoding/base64"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/url"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/Psiphon-Inc/goarista/monotime"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/nacl/box"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tls"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
- "golang.org/x/net/http2"
- )
- // MeekConn is based on meek-client.go from Tor and Psiphon:
- //
- // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
- // CC0 1.0 Universal
- //
- // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
// Tunable parameters for the meek client.
const (
	// Protocol version and cookie padding limit.
	MEEK_PROTOCOL_VERSION   = 3
	MEEK_COOKIE_MAX_PADDING = 32

	// Payload and buffer sizes, in bytes. The LIMITED_ variants are
	// selected when MeekConfig.LimitBufferSizes is set.
	MAX_SEND_PAYLOAD_LENGTH            = 64 * 1024
	FULL_RECEIVE_BUFFER_LENGTH         = 4 * 1024 * 1024
	READ_PAYLOAD_CHUNK_LENGTH          = 64 * 1024
	LIMITED_FULL_RECEIVE_BUFFER_LENGTH = 128 * 1024
	LIMITED_READ_PAYLOAD_CHUNK_LENGTH  = 4 * 1024

	// Polling interval bounds, back off multiplier and jitter factors
	// used by relay().
	MIN_POLL_INTERVAL        = 100 * time.Millisecond
	MIN_POLL_INTERVAL_JITTER = 0.3
	MAX_POLL_INTERVAL        = 5 * time.Second
	MAX_POLL_INTERVAL_JITTER = 0.1
	POLL_INTERVAL_MULTIPLIER = 1.5
	POLL_INTERVAL_JITTER     = 0.1

	// Round trip retry schedule and per-request timeout used by
	// roundTrip().
	MEEK_ROUND_TRIP_RETRY_DEADLINE   = 5 * time.Second
	MEEK_ROUND_TRIP_RETRY_MIN_DELAY  = 50 * time.Millisecond
	MEEK_ROUND_TRIP_RETRY_MAX_DELAY  = time.Second
	MEEK_ROUND_TRIP_RETRY_MULTIPLIER = 2
	MEEK_ROUND_TRIP_TIMEOUT          = 20 * time.Second
)
// MeekConfig holds the parameters that control how DialMeek sets up a
// MeekConn.
type MeekConfig struct {

	// LimitBufferSizes selects the smaller (LIMITED_*) buffer sizes in
	// order to conserve memory.
	LimitBufferSizes bool

	// DialAddress is the network address that is actually dialed to reach
	// the meek server; it may be a fronted or a direct address. It must
	// have the form "host:port", where host is a domain name or an IP
	// address.
	DialAddress string

	// UseHTTPS selects HTTPS when true and plain HTTP when false.
	UseHTTPS bool

	// TLSProfile names the TLS profile applied to every underlying TLS
	// connection made for this meek connection. Valid values are those
	// accepted by CustomTLSConfig.TLSProfile. It only takes effect when
	// DialConfig.UseIndistinguishableTLS is set in the DialConfig given
	// to DialMeek.
	TLSProfile string

	// UseObfuscatedSessionTickets enables obfuscated session tickets.
	// UseHTTPS is assumed to be true.
	UseObfuscatedSessionTickets bool

	// SNIServerName is placed in the TLS SNI server_name field when
	// HTTPS is used.
	SNIServerName string

	// HostHeader is placed in the Host header of each HTTP request.
	HostHeader string

	// TransformedHostName records whether a hostname transformation is
	// in effect. It is used for stats reporting.
	TransformedHostName bool

	// ClientTunnelProtocol is the tunnel protocol in use by the client.
	// It is carried in the meek cookie so that the server may use it
	// when it cannot unambiguously determine the tunnel protocol itself.
	ClientTunnelProtocol string

	// The remaining values are inputs to the obfuscated meek cookie.
	PsiphonServerAddress          string
	SessionID                     string
	MeekCookieEncryptionPublicKey string
	MeekObfuscatedKey             string
}
- // MeekConn is a network connection that tunnels TCP over HTTP and supports "fronting". Meek sends
- // client->server flow in HTTP request bodies and receives server->client flow in HTTP response bodies.
- // Polling is used to achieve full duplex TCP.
- //
- // Fronting is an obfuscation technique in which the connection
- // to a web server, typically a CDN, is indistinguishable from any other HTTPS connection to the generic
- // "fronting domain" -- the HTTP Host header is used to route the requests to the actual destination.
- // See https://trac.torproject.org/projects/tor/wiki/doc/meek for more details.
- //
- // MeekConn also operates in unfronted mode, in which plain HTTP connections are made without routing
- // through a CDN.
- type MeekConn struct {
- url *url.URL
- additionalHeaders http.Header
- cookie *http.Cookie
- cachedTLSDialer *cachedTLSDialer
- transport transporter
- mutex sync.Mutex
- isClosed bool
- runCtx context.Context
- stopRunning context.CancelFunc
- relayWaitGroup *sync.WaitGroup
- fullReceiveBufferLength int
- readPayloadChunkLength int
- emptyReceiveBuffer chan *bytes.Buffer
- partialReceiveBuffer chan *bytes.Buffer
- fullReceiveBuffer chan *bytes.Buffer
- emptySendBuffer chan *bytes.Buffer
- partialSendBuffer chan *bytes.Buffer
- fullSendBuffer chan *bytes.Buffer
- }
// transporter is the subset of HTTP transport behavior that MeekConn relies
// on. Both http.Transport and upstreamproxy.ProxyAuthTransport implement it.
type transporter interface {

	// RoundTrip executes a single HTTP transaction.
	RoundTrip(*http.Request) (*http.Response, error)

	// CloseIdleConnections releases idle persistent connections.
	CloseIdleConnections()
}
- // DialMeek returns an initialized meek connection. A meek connection is
- // an HTTP session which does not depend on an underlying socket connection (although
- // persistent HTTP connections are used for performance). This function does not
- // wait for the connection to be "established" before returning. A goroutine
- // is spawned which will eventually start HTTP polling.
- // When frontingAddress is not "", fronting is used. This option assumes caller has
- // already checked server entry capabilities.
- func DialMeek(
- ctx context.Context,
- meekConfig *MeekConfig,
- dialConfig *DialConfig) (meek *MeekConn, err error) {
- runCtx, stopRunning := context.WithCancel(context.Background())
- cleanupStopRunning := true
- cleanupCachedTLSDialer := true
- var cachedTLSDialer *cachedTLSDialer
- // Cleanup in error cases
- defer func() {
- if cleanupStopRunning {
- stopRunning()
- }
- if cleanupCachedTLSDialer && cachedTLSDialer != nil {
- cachedTLSDialer.close()
- }
- }()
- // Configure transport: HTTP or HTTPS
- var scheme string
- var transport transporter
- var additionalHeaders http.Header
- var proxyUrl func(*http.Request) (*url.URL, error)
- if meekConfig.UseHTTPS {
- // Custom TLS dialer:
- //
- // 1. ignores the HTTP request address and uses the fronting domain
- // 2. optionally disables SNI -- SNI breaks fronting when used with certain CDNs.
- // 3. skips verifying the server cert.
- //
- // Reasoning for #3:
- //
- // With a TLS MiM attack in place, and server certs verified, we'll fail to connect because the client
- // will refuse to connect. That's not a successful outcome.
- //
- // With a MiM attack in place, and server certs not verified, we'll fail to connect if the MiM is actively
- // targeting Psiphon and classifying the HTTP traffic by Host header or payload signature.
- //
- // However, in the case of a passive MiM that's just recording traffic or an active MiM that's targeting
- // something other than Psiphon, the client will connect. This is a successful outcome.
- //
- // What is exposed to the MiM? The Host header does not contain a Psiphon server IP address, just an
- // unrelated, randomly generated domain name which cannot be used to block direct connections. The
- // Psiphon server IP is sent over meek, but it's in the encrypted cookie.
- //
- // The payload (user traffic) gets its confidentiality and integrity from the underlying SSH protocol.
- // So, nothing is leaked to the MiM apart from signatures which could be used to classify the traffic
- // as Psiphon to possibly block it; but note that not revealing that the client is Psiphon is outside
- // our threat model; we merely seek to evade mass blocking by taking steps that require progressively
- // more effort to block.
- //
- // There is a subtle attack remaining: an adversary that can MiM some CDNs but not others (and so can
- // classify Psiphon traffic on some CDNs but not others) may throttle non-MiM CDNs so that our server
- // selection always chooses tunnels to the MiM CDN (without any server cert verification, we won't
- // exclusively connect to non-MiM CDNs); then the adversary kills the underlying TCP connection after
- // some short period. This is mitigated by the "impaired" protocol classification mechanism.
- scheme = "https"
- tlsConfig := &CustomTLSConfig{
- DialAddr: meekConfig.DialAddress,
- Dial: NewTCPDialer(dialConfig),
- SNIServerName: meekConfig.SNIServerName,
- SkipVerify: true,
- UseIndistinguishableTLS: dialConfig.UseIndistinguishableTLS,
- TLSProfile: meekConfig.TLSProfile,
- TrustedCACertificatesFilename: dialConfig.TrustedCACertificatesFilename,
- }
- if meekConfig.UseObfuscatedSessionTickets {
- tlsConfig.ObfuscatedSessionTicketKey = meekConfig.MeekObfuscatedKey
- }
- tlsDialer := NewCustomTLSDialer(tlsConfig)
- // Pre-dial one TLS connection in order to inspect the negotiated
- // application protocol. Then we create an HTTP/2 or HTTP/1.1 transport
- // depending on which protocol was negotiated. The TLS dialer
- // is assumed to negotiate only "h2" or "http/1.1"; or not negotiate
- // an application protocol.
- //
- // We cannot rely on net/http's HTTP/2 support since it's only
- // activated when http.Transport.DialTLS returns a golang crypto/tls.Conn;
- // e.g., https://github.com/golang/go/blob/c8aec4095e089ff6ac50d18e97c3f46561f14f48/src/net/http/transport.go#L1040
- //
- // The pre-dialed connection is stored in a cachedTLSDialer, which will
- // return the cached pre-dialed connection to its first Dial caller, and
- // use the tlsDialer for all other Dials.
- //
- // cachedTLSDialer.close() must be called on all exits paths from this
- // function and in meek.Close() to ensure the cached conn is closed in
- // any case where no Dial call is made.
- //
- // The pre-dial must be interruptible so that DialMeek doesn't block and
- // hang/delay a shutdown or end of establishment. So the pre-dial uses
- // the Controller's PendingConns, not the MeekConn PendingConns. For this
- // purpose, a special preDialer is configured.
- //
- // Only one pre-dial attempt is made; there are no retries. This differs
- // from roundTrip, which retries and may redial for each retry. Retries
- // at the pre-dial phase are less useful since there's no active session
- // to preserve, and establishment will simply try another server. Note
- // that the underlying TCPDial may still try multiple IP addreses when
- // the destination is a domain and ir resolves to multiple IP adresses.
- // The pre-dial is made within the parent dial context, so that DialMeek
- // may be interrupted. Subsequent dials are made within the meek round trip
- // request context. Since http.DialTLS doesn't take a context argument
- // (yet; as of Go 1.9 this issue is still open: https://github.com/golang/go/issues/21526),
- // cachedTLSDialer is used as a conduit to send the request context.
- // meekConn.roundTrip sets its request context into cachedTLSDialer, and
- // cachedTLSDialer.dial uses that context.
- // As DialAddr is set in the CustomTLSConfig, no address is required here.
- preConn, err := tlsDialer(ctx, "tcp", "")
- if err != nil {
- return nil, common.ContextError(err)
- }
- isHTTP2 := false
- if tlsConn, ok := preConn.(*tls.Conn); ok {
- state := tlsConn.ConnectionState()
- if state.NegotiatedProtocolIsMutual &&
- state.NegotiatedProtocol == "h2" {
- isHTTP2 = true
- }
- }
- cachedTLSDialer = newCachedTLSDialer(preConn, tlsDialer)
- if isHTTP2 {
- NoticeInfo("negotiated HTTP/2 for %s", meekConfig.DialAddress)
- transport = &http2.Transport{
- DialTLS: func(network, addr string, _ *golangtls.Config) (net.Conn, error) {
- return cachedTLSDialer.dial(network, addr)
- },
- }
- } else {
- transport = &http.Transport{
- DialTLS: func(network, addr string) (net.Conn, error) {
- return cachedTLSDialer.dial(network, addr)
- },
- }
- }
- } else {
- scheme = "http"
- // The dialer ignores address that http.Transport will pass in (derived
- // from the HTTP request URL) and always dials meekConfig.DialAddress.
- dialer := func(ctx context.Context, network, _ string) (net.Conn, error) {
- return NewTCPDialer(dialConfig)(ctx, network, meekConfig.DialAddress)
- }
- // For HTTP, and when the meekConfig.DialAddress matches the
- // meekConfig.HostHeader, we let http.Transport handle proxying.
- // http.Transport will put the the HTTP server address in the HTTP
- // request line. In this one case, we can use an HTTP proxy that does
- // not offer CONNECT support.
- if strings.HasPrefix(dialConfig.UpstreamProxyUrl, "http://") &&
- (meekConfig.DialAddress == meekConfig.HostHeader ||
- meekConfig.DialAddress == meekConfig.HostHeader+":80") {
- url, err := url.Parse(dialConfig.UpstreamProxyUrl)
- if err != nil {
- return nil, common.ContextError(err)
- }
- proxyUrl = http.ProxyURL(url)
- // Here, the dialer must use the address that http.Transport
- // passes in (which will be proxy address).
- copyDialConfig := new(DialConfig)
- *copyDialConfig = *dialConfig
- copyDialConfig.UpstreamProxyUrl = ""
- dialer = NewTCPDialer(copyDialConfig)
- }
- httpTransport := &http.Transport{
- Proxy: proxyUrl,
- DialContext: dialer,
- }
- if proxyUrl != nil {
- // Wrap transport with a transport that can perform HTTP proxy auth negotiation
- transport, err = upstreamproxy.NewProxyAuthTransport(httpTransport, dialConfig.CustomHeaders)
- if err != nil {
- return nil, common.ContextError(err)
- }
- } else {
- transport = httpTransport
- }
- }
- url := &url.URL{
- Scheme: scheme,
- Host: meekConfig.HostHeader,
- Path: "/",
- }
- if meekConfig.UseHTTPS {
- host, _, err := net.SplitHostPort(meekConfig.DialAddress)
- if err != nil {
- return nil, common.ContextError(err)
- }
- additionalHeaders = map[string][]string{
- "X-Psiphon-Fronting-Address": {host},
- }
- } else {
- if proxyUrl == nil {
- additionalHeaders = dialConfig.CustomHeaders
- }
- }
- cookie, err := makeMeekCookie(meekConfig)
- if err != nil {
- return nil, common.ContextError(err)
- }
- // The main loop of a MeekConn is run in the relay() goroutine.
- // A MeekConn implements net.Conn concurrency semantics:
- // "Multiple goroutines may invoke methods on a Conn simultaneously."
- //
- // Read() calls and relay() are synchronized by exchanging control of a single
- // receiveBuffer (bytes.Buffer). This single buffer may be:
- // - in the emptyReceiveBuffer channel when it is available and empty;
- // - in the partialReadBuffer channel when it is available and contains data;
- // - in the fullReadBuffer channel when it is available and full of data;
- // - "checked out" by relay or Read when they are are writing to or reading from the
- // buffer, respectively.
- // relay() will obtain the buffer from either the empty or partial channel but block when
- // the buffer is full. Read will obtain the buffer from the partial or full channel when
- // there is data to read but block when the buffer is empty.
- // Write() calls and relay() are synchronized in a similar way, using a single
- // sendBuffer.
- meek = &MeekConn{
- url: url,
- additionalHeaders: additionalHeaders,
- cookie: cookie,
- cachedTLSDialer: cachedTLSDialer,
- transport: transport,
- isClosed: false,
- runCtx: runCtx,
- stopRunning: stopRunning,
- relayWaitGroup: new(sync.WaitGroup),
- fullReceiveBufferLength: FULL_RECEIVE_BUFFER_LENGTH,
- readPayloadChunkLength: READ_PAYLOAD_CHUNK_LENGTH,
- emptyReceiveBuffer: make(chan *bytes.Buffer, 1),
- partialReceiveBuffer: make(chan *bytes.Buffer, 1),
- fullReceiveBuffer: make(chan *bytes.Buffer, 1),
- emptySendBuffer: make(chan *bytes.Buffer, 1),
- partialSendBuffer: make(chan *bytes.Buffer, 1),
- fullSendBuffer: make(chan *bytes.Buffer, 1),
- }
- // stopRunning and cachedTLSDialer will now be closed in meek.Close()
- cleanupStopRunning = false
- cleanupCachedTLSDialer = false
- meek.emptyReceiveBuffer <- new(bytes.Buffer)
- meek.emptySendBuffer <- new(bytes.Buffer)
- meek.relayWaitGroup.Add(1)
- if meekConfig.LimitBufferSizes {
- meek.fullReceiveBufferLength = LIMITED_FULL_RECEIVE_BUFFER_LENGTH
- meek.readPayloadChunkLength = LIMITED_READ_PAYLOAD_CHUNK_LENGTH
- }
- go meek.relay()
- return meek, nil
- }
- type cachedTLSDialer struct {
- usedCachedConn int32
- cachedConn net.Conn
- requestContext atomic.Value
- dialer Dialer
- }
- func newCachedTLSDialer(cachedConn net.Conn, dialer Dialer) *cachedTLSDialer {
- return &cachedTLSDialer{
- cachedConn: cachedConn,
- dialer: dialer,
- }
- }
- func (c *cachedTLSDialer) setRequestContext(requestContext context.Context) {
- c.requestContext.Store(requestContext)
- }
- func (c *cachedTLSDialer) dial(network, addr string) (net.Conn, error) {
- if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
- conn := c.cachedConn
- c.cachedConn = nil
- return conn, nil
- }
- ctx := c.requestContext.Load().(context.Context)
- if ctx == nil {
- ctx = context.Background()
- }
- return c.dialer(ctx, network, addr)
- }
- func (c *cachedTLSDialer) close() {
- if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
- c.cachedConn.Close()
- c.cachedConn = nil
- }
- }
- // Close terminates the meek connection. Close waits for the relay processing goroutine
- // to stop and releases HTTP transport resources.
- // A mutex is required to support net.Conn concurrency semantics.
- func (meek *MeekConn) Close() (err error) {
- meek.mutex.Lock()
- isClosed := meek.isClosed
- meek.isClosed = true
- meek.mutex.Unlock()
- if !isClosed {
- meek.stopRunning()
- if meek.cachedTLSDialer != nil {
- meek.cachedTLSDialer.close()
- }
- meek.relayWaitGroup.Wait()
- meek.transport.CloseIdleConnections()
- }
- return nil
- }
- // IsClosed implements the Closer iterface. The return value
- // indicates whether the MeekConn has been closed.
- func (meek *MeekConn) IsClosed() bool {
- meek.mutex.Lock()
- isClosed := meek.isClosed
- meek.mutex.Unlock()
- return isClosed
- }
- // Read reads data from the connection.
- // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
- func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
- if meek.IsClosed() {
- return 0, common.ContextError(errors.New("meek connection is closed"))
- }
- // Block until there is received data to consume
- var receiveBuffer *bytes.Buffer
- select {
- case receiveBuffer = <-meek.partialReceiveBuffer:
- case receiveBuffer = <-meek.fullReceiveBuffer:
- case <-meek.runCtx.Done():
- return 0, common.ContextError(errors.New("meek connection has closed"))
- }
- n, err = receiveBuffer.Read(buffer)
- meek.replaceReceiveBuffer(receiveBuffer)
- return n, err
- }
- // Write writes data to the connection.
- // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
- func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
- if meek.IsClosed() {
- return 0, common.ContextError(errors.New("meek connection is closed"))
- }
- // Repeats until all n bytes are written
- n = len(buffer)
- for len(buffer) > 0 {
- // Block until there is capacity in the send buffer
- var sendBuffer *bytes.Buffer
- select {
- case sendBuffer = <-meek.emptySendBuffer:
- case sendBuffer = <-meek.partialSendBuffer:
- case <-meek.runCtx.Done():
- return 0, common.ContextError(errors.New("meek connection has closed"))
- }
- writeLen := MAX_SEND_PAYLOAD_LENGTH - sendBuffer.Len()
- if writeLen > 0 {
- if writeLen > len(buffer) {
- writeLen = len(buffer)
- }
- _, err = sendBuffer.Write(buffer[:writeLen])
- buffer = buffer[writeLen:]
- }
- meek.replaceSendBuffer(sendBuffer)
- }
- return n, err
- }
- // LocalAddr is a stub implementation of net.Conn.LocalAddr
- func (meek *MeekConn) LocalAddr() net.Addr {
- return nil
- }
- // RemoteAddr is a stub implementation of net.Conn.RemoteAddr
- func (meek *MeekConn) RemoteAddr() net.Addr {
- return nil
- }
- // SetDeadline is a stub implementation of net.Conn.SetDeadline
- func (meek *MeekConn) SetDeadline(t time.Time) error {
- return common.ContextError(errors.New("not supported"))
- }
- // SetReadDeadline is a stub implementation of net.Conn.SetReadDeadline
- func (meek *MeekConn) SetReadDeadline(t time.Time) error {
- return common.ContextError(errors.New("not supported"))
- }
- // SetWriteDeadline is a stub implementation of net.Conn.SetWriteDeadline
- func (meek *MeekConn) SetWriteDeadline(t time.Time) error {
- return common.ContextError(errors.New("not supported"))
- }
- func (meek *MeekConn) replaceReceiveBuffer(receiveBuffer *bytes.Buffer) {
- switch {
- case receiveBuffer.Len() == 0:
- meek.emptyReceiveBuffer <- receiveBuffer
- case receiveBuffer.Len() >= meek.fullReceiveBufferLength:
- meek.fullReceiveBuffer <- receiveBuffer
- default:
- meek.partialReceiveBuffer <- receiveBuffer
- }
- }
- func (meek *MeekConn) replaceSendBuffer(sendBuffer *bytes.Buffer) {
- switch {
- case sendBuffer.Len() == 0:
- meek.emptySendBuffer <- sendBuffer
- case sendBuffer.Len() >= MAX_SEND_PAYLOAD_LENGTH:
- meek.fullSendBuffer <- sendBuffer
- default:
- meek.partialSendBuffer <- sendBuffer
- }
- }
- // relay sends and receives tunneled traffic (payload). An HTTP request is
- // triggered when data is in the write queue or at a polling interval.
- // There's a geometric increase, up to a maximum, in the polling interval when
- // no data is exchanged. Only one HTTP request is in flight at a time.
- func (meek *MeekConn) relay() {
- // Note: meek.Close() calls here in relay() are made asynchronously
- // (using goroutines) since Close() will wait on this WaitGroup.
- defer meek.relayWaitGroup.Done()
- interval := common.JitterDuration(
- MIN_POLL_INTERVAL,
- MIN_POLL_INTERVAL_JITTER)
- timeout := time.NewTimer(interval)
- defer timeout.Stop()
- for {
- timeout.Reset(interval)
- // Block until there is payload to send or it is time to poll
- var sendBuffer *bytes.Buffer
- select {
- case sendBuffer = <-meek.partialSendBuffer:
- case sendBuffer = <-meek.fullSendBuffer:
- case <-timeout.C:
- // In the polling case, send an empty payload
- case <-meek.runCtx.Done():
- // Drop through to second Done() check
- }
- // Check Done() again, to ensure it takes precedence
- select {
- case <-meek.runCtx.Done():
- return
- default:
- }
- sendPayloadSize := 0
- if sendBuffer != nil {
- sendPayloadSize = sendBuffer.Len()
- }
- // roundTrip will replace sendBuffer (by calling replaceSendBuffer). This is
- // a compromise to conserve memory. Using a second buffer here, we could copy
- // sendBuffer and immediately replace it, unblocking meekConn.Write() and
- // allowing more upstream payload to immediately enqueue. Instead, the request
- // payload is read directly from sendBuffer, including retries. Only once the
- // server has acknowledged the request payload is sendBuffer replaced. This
- // still allows meekConn.Write() to unblock before the round trip response is
- // read.
- receivedPayloadSize, err := meek.roundTrip(sendBuffer)
- if err != nil {
- select {
- case <-meek.runCtx.Done():
- // In this case, meek.roundTrip encountered Done(). Exit without logging error.
- return
- default:
- }
- NoticeAlert("%s", common.ContextError(err))
- go meek.Close()
- return
- }
- // Calculate polling interval. When data is received,
- // immediately request more. Otherwise, schedule next
- // poll with exponential back off. Jitter and coin
- // flips are used to avoid trivial, static traffic
- // timing patterns.
- if receivedPayloadSize > 0 || sendPayloadSize > 0 {
- interval = 0
- } else if interval == 0 {
- interval = common.JitterDuration(
- MIN_POLL_INTERVAL,
- MIN_POLL_INTERVAL_JITTER)
- } else {
- if common.FlipCoin() {
- interval = common.JitterDuration(
- interval,
- POLL_INTERVAL_JITTER)
- } else {
- interval = common.JitterDuration(
- time.Duration(float64(interval)*POLL_INTERVAL_MULTIPLIER),
- POLL_INTERVAL_JITTER)
- }
- if interval >= MAX_POLL_INTERVAL {
- interval = common.JitterDuration(
- MAX_POLL_INTERVAL,
- MAX_POLL_INTERVAL_JITTER)
- }
- }
- }
- }
// readCloseSignaller wraps an io.Reader as an io.ReadCloser so that it can
// be passed, as the request body, to http.Transport.RoundTrip. On top of
// io.ReadCloser it offers AwaitClosed, which lets the caller delay recycling
// the buffer underlying the reader until RoundTrip has called Close and will
// no longer use that buffer.
// See: https://golang.org/pkg/net/http/#RoundTripper
type readCloseSignaller struct {
	context context.Context
	reader  io.Reader
	// closed has capacity 1 and receives one value on the first Close.
	closed chan struct{}
}

// NewReadCloseSignaller returns a readCloseSignaller that reads from reader.
// AwaitClosed on the result gives up waiting once context is done.
func NewReadCloseSignaller(
	context context.Context,
	reader io.Reader) *readCloseSignaller {

	signaller := new(readCloseSignaller)
	signaller.context = context
	signaller.reader = reader
	signaller.closed = make(chan struct{}, 1)
	return signaller
}

// Read reads from the wrapped reader.
func (r *readCloseSignaller) Read(p []byte) (int, error) {
	n, err := r.reader.Read(p)
	return n, err
}

// Close signals that the reader will no longer be used. It never blocks:
// when a signal is already pending, further calls have no effect. Close
// always returns nil.
func (r *readCloseSignaller) Close() error {
	select {
	case r.closed <- struct{}{}:
	default:
	}
	return nil
}

// AwaitClosed blocks until Close has been called or the context is done. It
// returns true when it consumed a Close signal and false when it stopped
// because the context is done.
func (r *readCloseSignaller) AwaitClosed() bool {
	select {
	case <-r.closed:
		return true
	case <-r.context.Done():
		return false
	}
}
- // roundTrip configures and makes the actual HTTP POST request
- func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
- // Retries are made when the round trip fails. This adds resiliency
- // to connection interruption and intermittent failures.
- //
- // At least one retry is always attempted, and retries continue
- // while still within a brief deadline -- 5 seconds, currently the
- // deadline for an actively probed SSH connection to timeout. There
- // is a brief delay between retries, allowing for intermittent
- // failure states to resolve.
- //
- // Failure may occur at various stages of the HTTP request:
- //
- // 1. Before the request begins. In this case, the entire request
- // may be rerun.
- //
- // 2. While sending the request payload. In this case, the client
- // must resend its request payload. The server will not have
- // relayed its partially received request payload.
- //
- // 3. After sending the request payload but before receiving
- // a response. The client cannot distinguish between case 2 and
- // this case, case 3. The client resends its payload and the
- // server detects this and skips relaying the request payload.
- //
- // 4. While reading the response payload. The client will omit its
- // request payload when retrying, as the server has already
- // acknowledged it. The client will also indicate to the server
- // the amount of response payload already received, and the
- // server will skip resending the indicated amount of response
- // payload.
- //
- // Retries are indicated to the server by adding a Range header,
- // which includes the response payload resend position.
- defer func() {
- // Ensure sendBuffer is replaced, even in error code paths.
- if sendBuffer != nil {
- sendBuffer.Truncate(0)
- meek.replaceSendBuffer(sendBuffer)
- }
- }()
- retries := uint(0)
- retryDeadline := monotime.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
- retryDelay := MEEK_ROUND_TRIP_RETRY_MIN_DELAY
- serverAcknowledgedRequestPayload := false
- receivedPayloadSize := int64(0)
- for try := 0; ; try++ {
- // Omit the request payload when retrying after receiving a
- // partial server response.
- var signaller *readCloseSignaller
- var requestBody io.ReadCloser
- contentLength := 0
- if !serverAcknowledgedRequestPayload && sendBuffer != nil {
- // sendBuffer will be replaced once the data is no longer needed,
- // when RoundTrip calls Close on the Body; this allows meekConn.Write()
- // to unblock and start buffering data for the next round trip while
- // still reading the current round trip response. signaller provides
- // the hook for awaiting RoundTrip's call to Close.
- signaller = NewReadCloseSignaller(meek.runCtx, bytes.NewReader(sendBuffer.Bytes()))
- requestBody = signaller
- contentLength = sendBuffer.Len()
- }
- var request *http.Request
- request, err := http.NewRequest("POST", meek.url.String(), requestBody)
- if err != nil {
- // Don't retry when can't initialize a Request
- return 0, common.ContextError(err)
- }
- // Content-Length won't be set automatically due to the underlying
- // type of requestBody.
- if contentLength > 0 {
- request.ContentLength = int64(contentLength)
- }
- // - meek.stopRunning() will abort a round trip in flight
- // - round trip will abort if it exceeds MEEK_ROUND_TRIP_TIMEOUT
- requestContext, cancelFunc := context.WithTimeout(
- meek.runCtx,
- MEEK_ROUND_TRIP_TIMEOUT)
- defer cancelFunc()
- // Ensure TLS dials are made within the current request context.
- if meek.cachedTLSDialer != nil {
- meek.cachedTLSDialer.setRequestContext(requestContext)
- }
- request = request.WithContext(requestContext)
- meek.addAdditionalHeaders(request)
- request.Header.Set("Content-Type", "application/octet-stream")
- request.AddCookie(meek.cookie)
- expectedStatusCode := http.StatusOK
- // When retrying, add a Range header to indicate how much
- // of the response was already received.
- if try > 0 {
- expectedStatusCode = http.StatusPartialContent
- request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize))
- }
- response, err := meek.transport.RoundTrip(request)
- // Wait for RoundTrip to call Close on the request body, when
- // there is one. This is necessary to ensure it's safe to
- // subsequently replace sendBuffer in both the success and
- // error cases.
- if signaller != nil {
- if !signaller.AwaitClosed() {
- // AwaitClosed encountered Done(). Abort immediately. Do not
- // replace sendBuffer, as we cannot be certain RoundTrip is
- // done with it. MeekConn.Write will exit on Done and not hang
- // awaiting sendBuffer.
- sendBuffer = nil
- return 0, common.ContextError(errors.New("meek connection has closed"))
- }
- }
- if err != nil {
- select {
- case <-meek.runCtx.Done():
- // Exit without retrying and without logging error.
- return 0, common.ContextError(err)
- default:
- }
- NoticeAlert("meek round trip failed: %s", err)
- // ...continue to retry
- }
- if err == nil {
- if response.StatusCode != expectedStatusCode &&
- // Certain http servers return 200 OK where we expect 206, so accept that.
- !(expectedStatusCode == http.StatusPartialContent && response.StatusCode == http.StatusOK) {
- // Don't retry when the status code is incorrect
- response.Body.Close()
- return 0, common.ContextError(
- fmt.Errorf(
- "unexpected status code: %d instead of %d",
- response.StatusCode, expectedStatusCode))
- }
- // Update meek session cookie
- for _, c := range response.Cookies() {
- if meek.cookie.Name == c.Name {
- meek.cookie.Value = c.Value
- break
- }
- }
- // Received the response status code, so the server
- // must have received the request payload.
- serverAcknowledgedRequestPayload = true
- // sendBuffer is now no longer required for retries, and the
- // buffer may be replaced; this allows meekConn.Write() to unblock
- // and start buffering data for the next round trip while still
- // reading the current round trip response.
- if sendBuffer != nil {
- // Assumes signaller.AwaitClosed is called above, so
- // sendBuffer will no longer be accessed by RoundTrip.
- sendBuffer.Truncate(0)
- meek.replaceSendBuffer(sendBuffer)
- sendBuffer = nil
- }
- readPayloadSize, err := meek.readPayload(response.Body)
- response.Body.Close()
- // receivedPayloadSize is the number of response
- // payload bytes received and relayed. A retry can
- // resume after this position.
- receivedPayloadSize += readPayloadSize
- if err != nil {
- NoticeAlert("meek read payload failed: %s", err)
- // ...continue to retry
- } else {
- // Round trip completed successfully
- break
- }
- }
- // Release context resources now.
- cancelFunc()
- // Either the request failed entirely, or there was a failure
- // streaming the response payload. Always retry once. Then
- // retry if time remains; when the next delay exceeds the time
- // remaining until the deadline, do not retry.
- now := monotime.Now()
- if retries >= 1 &&
- (now.After(retryDeadline) || retryDeadline.Sub(now) <= retryDelay) {
- return 0, common.ContextError(err)
- }
- retries += 1
- delayTimer := time.NewTimer(retryDelay)
- select {
- case <-delayTimer.C:
- case <-meek.runCtx.Done():
- delayTimer.Stop()
- return 0, common.ContextError(err)
- }
- // Increase the next delay, to back off and avoid excessive
- // activity in conditions such as no network connectivity.
- retryDelay *= MEEK_ROUND_TRIP_RETRY_MULTIPLIER
- if retryDelay >= MEEK_ROUND_TRIP_RETRY_MAX_DELAY {
- retryDelay = MEEK_ROUND_TRIP_RETRY_MAX_DELAY
- }
- }
- return receivedPayloadSize, nil
- }
- // addAdditionalHeaders applies the headers configured in meek.additionalHeaders
- // to the HTTP request, using the same method we use for adding custom headers
- // to HTTP proxy requests.
- //
- // "Host" is a special case: net/http takes the host from request.Host rather
- // than from request.Header (https://golang.org/src/net/http/request.go#L474),
- // so the first configured value is assigned to request.Host. Before that
- // override, an empty request.URL.Opaque is filled in from the request's
- // scheme, current host and request URI; see URL.RequestURI()
- // (https://golang.org/src/net/url/url.go#L915). A "Host" entry with no values
- // is ignored. Every other entry replaces the request header of the same name.
- func (meek *MeekConn) addAdditionalHeaders(request *http.Request) {
- 	for headerName, headerValues := range meek.additionalHeaders {
- 		// Ordinary headers are set directly on the header map.
- 		if headerName != "Host" {
- 			request.Header[headerName] = headerValues
- 			continue
- 		}
- 		// Nothing to override with.
- 		if len(headerValues) == 0 {
- 			continue
- 		}
- 		// Capture the original target in URL.Opaque before request.Host is
- 		// replaced; an Opaque value that is already set is left untouched.
- 		if request.URL.Opaque == "" {
- 			request.URL.Opaque = request.URL.Scheme + "://" + request.Host + request.URL.RequestURI()
- 		}
- 		request.Host = headerValues[0]
- 	}
- }
- // readPayload reads the HTTP response body in chunks of at most
- // meek.readPayloadChunkLength bytes, handing the receive buffer back for
- // MeekConn.Read() calls after each chunk; the intention is to let bytes flow
- // back to the reader as soon as possible instead of buffering the entire
- // payload.
- //
- // receivedPayload is closed before readPayload returns.
- //
- // When readPayload returns an error, the totalSize output remains valid -- it's
- // the number of payload bytes successfully read and relayed. When meek.runCtx
- // is done while waiting for a receive buffer, readPayload returns 0 and a nil
- // error.
- func (meek *MeekConn) readPayload(
- 	receivedPayload io.ReadCloser) (totalSize int64, err error) {
- 	defer receivedPayload.Close()
- 	for {
- 		// Block until there is capacity in the receive buffer
- 		var buffer *bytes.Buffer
- 		select {
- 		case buffer = <-meek.emptyReceiveBuffer:
- 		case buffer = <-meek.partialReceiveBuffer:
- 		case <-meek.runCtx.Done():
- 			return 0, nil
- 		}
- 		// Note: the buffer size may exceed meek.fullReceiveBufferLength by up to
- 		// the size of one received payload. The meek.fullReceiveBufferLength
- 		// value is just a guideline.
- 		chunk := io.LimitReader(receivedPayload, int64(meek.readPayloadChunkLength))
- 		chunkSize, readErr := buffer.ReadFrom(chunk)
- 		// Always hand the buffer back, including on error, so that whatever
- 		// was appended is available to readers.
- 		meek.replaceReceiveBuffer(buffer)
- 		totalSize += chunkSize
- 		if readErr != nil {
- 			return totalSize, common.ContextError(readErr)
- 		}
- 		// A chunk that appends nothing marks the end of the payload.
- 		if chunkSize == 0 {
- 			return totalSize, nil
- 		}
- 	}
- }
- // makeMeekCookie creates the cookie to be sent with the initial meek HTTP request.
- // The purpose of the cookie is to send the following to the server:
- // ServerAddress -- the Psiphon Server address the meek server should relay to
- // SessionID -- the Psiphon session ID (used by meek server to relay geolocation
- // information obtained from the CDN through to the Psiphon Server)
- // MeekProtocolVersion -- tells the meek server that this client understands
- // the latest protocol.
- // ClientTunnelProtocol -- copied from meekConfig.ClientTunnelProtocol.
- // The server will create a session using these values and send the session ID
- // back to the client via Set-Cookie header. Client must use that value with
- // all subsequent HTTP requests.
- // In unfronted meek mode, the cookie is visible over the adversary network, so the
- // cookie is encrypted and obfuscated.
- //
- // An error is returned when the cookie data cannot be serialized, when
- // meekConfig.MeekCookieEncryptionPublicKey is not valid base64 or does not
- // decode to exactly 32 bytes, or when key generation, obfuscator
- // initialization, or random number generation fails.
- func makeMeekCookie(meekConfig *MeekConfig) (cookie *http.Cookie, err error) {
- 	// Make the JSON data
- 	cookieData := &protocol.MeekCookieData{
- 		ServerAddress:        meekConfig.PsiphonServerAddress,
- 		SessionID:            meekConfig.SessionID,
- 		MeekProtocolVersion:  MEEK_PROTOCOL_VERSION,
- 		ClientTunnelProtocol: meekConfig.ClientTunnelProtocol,
- 	}
- 	serializedCookie, err := json.Marshal(cookieData)
- 	if err != nil {
- 		return nil, common.ContextError(err)
- 	}
- 	// Encrypt the JSON data
- 	// NaCl box is used for encryption. The peer public key comes from the server entry.
- 	// Nonce is always all zeros, and is not sent in the cookie (the server also uses an all-zero nonce).
- 	// http://nacl.cace-project.eu/box.html:
- 	// "There is no harm in having the same nonce for different messages if the {sender, receiver} sets are
- 	// different. This is true even if the sets overlap. For example, a sender can use the same nonce for two
- 	// different messages if the messages are sent to two different public keys."
- 	var nonce [24]byte
- 	var publicKey [32]byte
- 	decodedPublicKey, err := base64.StdEncoding.DecodeString(meekConfig.MeekCookieEncryptionPublicKey)
- 	if err != nil {
- 		return nil, common.ContextError(err)
- 	}
- 	// copy would silently truncate or zero-pad a key of the wrong size,
- 	// producing a cookie encrypted to the wrong key; reject it instead.
- 	if len(decodedPublicKey) != len(publicKey) {
- 		return nil, common.ContextError(
- 			fmt.Errorf(
- 				"invalid meek cookie encryption public key length: %d",
- 				len(decodedPublicKey)))
- 	}
- 	copy(publicKey[:], decodedPublicKey)
- 	ephemeralPublicKey, ephemeralPrivateKey, err := box.GenerateKey(rand.Reader)
- 	if err != nil {
- 		return nil, common.ContextError(err)
- 	}
- 	// The encrypted cookie is the ephemeral public key followed by the sealed
- 	// box. The local is not named "box", to avoid shadowing the box package.
- 	sealedCookie := box.Seal(nil, serializedCookie, &nonce, &publicKey, ephemeralPrivateKey)
- 	encryptedCookie := make([]byte, 32+len(sealedCookie))
- 	copy(encryptedCookie[0:32], ephemeralPublicKey[0:32])
- 	copy(encryptedCookie[32:], sealedCookie)
- 	// Obfuscate the encrypted data
- 	obfuscator, err := common.NewClientObfuscator(
- 		&common.ObfuscatorConfig{Keyword: meekConfig.MeekObfuscatedKey, MaxPadding: MEEK_COOKIE_MAX_PADDING})
- 	if err != nil {
- 		return nil, common.ContextError(err)
- 	}
- 	// The seed message is sent as-is; only the bytes appended after it are
- 	// passed through the obfuscator.
- 	obfuscatedCookie := obfuscator.SendSeedMessage()
- 	seedLen := len(obfuscatedCookie)
- 	obfuscatedCookie = append(obfuscatedCookie, encryptedCookie...)
- 	obfuscator.ObfuscateClientToServer(obfuscatedCookie[seedLen:])
- 	// Format the HTTP cookie
- 	// The format is <random letter 'A'-'Z'>=<base64 data>, which is intended to match common cookie formats.
- 	A := int('A')
- 	Z := int('Z')
- 	// letterIndex is added to int('A') to select a letter in the range ['A', 'Z']
- 	letterIndex, err := common.MakeSecureRandomInt(Z - A + 1)
- 	if err != nil {
- 		return nil, common.ContextError(err)
- 	}
- 	return &http.Cookie{
- 			Name:  string(byte(A + letterIndex)),
- 			Value: base64.StdEncoding.EncodeToString(obfuscatedCookie)},
- 		nil
- }
|