/* * Copyright (c) 2015, Psiphon Inc. * All rights reserved. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * */ package psiphon import ( "bytes" "context" "crypto/rand" golangtls "crypto/tls" "encoding/base64" "encoding/json" "errors" "fmt" "io" "net" "net/http" "net/url" "strings" "sync" "sync/atomic" "time" "github.com/Psiphon-Inc/goarista/monotime" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/nacl/box" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tls" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy" "golang.org/x/net/http2" ) // MeekConn is based on meek-client.go from Tor and Psiphon: // // https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go // CC0 1.0 Universal // // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go const ( MEEK_PROTOCOL_VERSION = 3 MEEK_COOKIE_MAX_PADDING = 32 MAX_SEND_PAYLOAD_LENGTH = 65536 FULL_RECEIVE_BUFFER_LENGTH = 4194304 READ_PAYLOAD_CHUNK_LENGTH = 65536 LIMITED_FULL_RECEIVE_BUFFER_LENGTH = 131072 LIMITED_READ_PAYLOAD_CHUNK_LENGTH = 4096 MIN_POLL_INTERVAL = 100 * time.Millisecond MIN_POLL_INTERVAL_JITTER = 0.3 MAX_POLL_INTERVAL = 5 * time.Second MAX_POLL_INTERVAL_JITTER = 0.1 POLL_INTERVAL_MULTIPLIER = 1.5 POLL_INTERVAL_JITTER = 0.1 MEEK_ROUND_TRIP_RETRY_DEADLINE = 5 * time.Second MEEK_ROUND_TRIP_RETRY_MIN_DELAY = 50 * time.Millisecond MEEK_ROUND_TRIP_RETRY_MAX_DELAY = 1000 * time.Millisecond MEEK_ROUND_TRIP_RETRY_MULTIPLIER = 2 MEEK_ROUND_TRIP_TIMEOUT = 20 * time.Second ) // MeekConfig specifies the behavior of a MeekConn type MeekConfig struct { // LimitBufferSizes indicates whether to use smaller buffers to // conserve memory. LimitBufferSizes bool // DialAddress is the actual network address to dial to establish a // connection to the meek server. This may be either a fronted or // direct address. The address must be in the form "host:port", // where host may be a domain name or IP address. DialAddress string // UseHTTPS indicates whether to use HTTPS (true) or HTTP (false). UseHTTPS bool // TLSProfile specifies the TLS profile to use for all underlying // TLS connections created by this meek connection. Valid values // are the possible values for CustomTLSConfig.TLSProfile. // TLSProfile will be used only when DialConfig.UseIndistinguishableTLS // is set in the DialConfig passed in to DialMeek. TLSProfile string // UseObfuscatedSessionTickets indicates whether to use obfuscated // session tickets. Assumes UseHTTPS is true. UseObfuscatedSessionTickets bool // SNIServerName is the value to place in the TLS SNI server_name // field when HTTPS is used. SNIServerName string // HostHeader is the value to place in the HTTP request Host header. HostHeader string // TransformedHostName records whether a hostname transformation is // in effect. This value is used for stats reporting. TransformedHostName bool // ClientTunnelProtocol is the protocol the client is using. It's // included in the meek cookie for optional use by the server, in // cases where the server cannot unambiguously determine the // tunnel protocol. ClientTunnelProtocol string // The following values are used to create the obfuscated meek cookie. PsiphonServerAddress string SessionID string MeekCookieEncryptionPublicKey string MeekObfuscatedKey string } // MeekConn is a network connection that tunnels TCP over HTTP and supports "fronting". Meek sends // client->server flow in HTTP request bodies and receives server->client flow in HTTP response bodies. // Polling is used to achieve full duplex TCP. // // Fronting is an obfuscation technique in which the connection // to a web server, typically a CDN, is indistinguishable from any other HTTPS connection to the generic // "fronting domain" -- the HTTP Host header is used to route the requests to the actual destination. // See https://trac.torproject.org/projects/tor/wiki/doc/meek for more details. // // MeekConn also operates in unfronted mode, in which plain HTTP connections are made without routing // through a CDN. type MeekConn struct { url *url.URL additionalHeaders http.Header cookie *http.Cookie cachedTLSDialer *cachedTLSDialer transport transporter mutex sync.Mutex isClosed bool runCtx context.Context stopRunning context.CancelFunc relayWaitGroup *sync.WaitGroup fullReceiveBufferLength int readPayloadChunkLength int emptyReceiveBuffer chan *bytes.Buffer partialReceiveBuffer chan *bytes.Buffer fullReceiveBuffer chan *bytes.Buffer emptySendBuffer chan *bytes.Buffer partialSendBuffer chan *bytes.Buffer fullSendBuffer chan *bytes.Buffer } // transporter is implemented by both http.Transport and upstreamproxy.ProxyAuthTransport. type transporter interface { CloseIdleConnections() RoundTrip(req *http.Request) (resp *http.Response, err error) } // DialMeek returns an initialized meek connection. A meek connection is // an HTTP session which does not depend on an underlying socket connection (although // persistent HTTP connections are used for performance). This function does not // wait for the connection to be "established" before returning. A goroutine // is spawned which will eventually start HTTP polling. // When frontingAddress is not "", fronting is used. This option assumes caller has // already checked server entry capabilities. func DialMeek( ctx context.Context, meekConfig *MeekConfig, dialConfig *DialConfig) (meek *MeekConn, err error) { runCtx, stopRunning := context.WithCancel(context.Background()) cleanupStopRunning := true cleanupCachedTLSDialer := true var cachedTLSDialer *cachedTLSDialer // Cleanup in error cases defer func() { if cleanupStopRunning { stopRunning() } if cleanupCachedTLSDialer && cachedTLSDialer != nil { cachedTLSDialer.close() } }() // Configure transport: HTTP or HTTPS var scheme string var transport transporter var additionalHeaders http.Header var proxyUrl func(*http.Request) (*url.URL, error) if meekConfig.UseHTTPS { // Custom TLS dialer: // // 1. ignores the HTTP request address and uses the fronting domain // 2. optionally disables SNI -- SNI breaks fronting when used with certain CDNs. // 3. skips verifying the server cert. // // Reasoning for #3: // // With a TLS MiM attack in place, and server certs verified, we'll fail to connect because the client // will refuse to connect. That's not a successful outcome. // // With a MiM attack in place, and server certs not verified, we'll fail to connect if the MiM is actively // targeting Psiphon and classifying the HTTP traffic by Host header or payload signature. // // However, in the case of a passive MiM that's just recording traffic or an active MiM that's targeting // something other than Psiphon, the client will connect. This is a successful outcome. // // What is exposed to the MiM? The Host header does not contain a Psiphon server IP address, just an // unrelated, randomly generated domain name which cannot be used to block direct connections. The // Psiphon server IP is sent over meek, but it's in the encrypted cookie. // // The payload (user traffic) gets its confidentiality and integrity from the underlying SSH protocol. // So, nothing is leaked to the MiM apart from signatures which could be used to classify the traffic // as Psiphon to possibly block it; but note that not revealing that the client is Psiphon is outside // our threat model; we merely seek to evade mass blocking by taking steps that require progressively // more effort to block. // // There is a subtle attack remaining: an adversary that can MiM some CDNs but not others (and so can // classify Psiphon traffic on some CDNs but not others) may throttle non-MiM CDNs so that our server // selection always chooses tunnels to the MiM CDN (without any server cert verification, we won't // exclusively connect to non-MiM CDNs); then the adversary kills the underlying TCP connection after // some short period. This is mitigated by the "impaired" protocol classification mechanism. scheme = "https" tlsConfig := &CustomTLSConfig{ DialAddr: meekConfig.DialAddress, Dial: NewTCPDialer(dialConfig), SNIServerName: meekConfig.SNIServerName, SkipVerify: true, UseIndistinguishableTLS: dialConfig.UseIndistinguishableTLS, TLSProfile: meekConfig.TLSProfile, TrustedCACertificatesFilename: dialConfig.TrustedCACertificatesFilename, } if meekConfig.UseObfuscatedSessionTickets { tlsConfig.ObfuscatedSessionTicketKey = meekConfig.MeekObfuscatedKey } tlsDialer := NewCustomTLSDialer(tlsConfig) // Pre-dial one TLS connection in order to inspect the negotiated // application protocol. Then we create an HTTP/2 or HTTP/1.1 transport // depending on which protocol was negotiated. The TLS dialer // is assumed to negotiate only "h2" or "http/1.1"; or not negotiate // an application protocol. // // We cannot rely on net/http's HTTP/2 support since it's only // activated when http.Transport.DialTLS returns a golang crypto/tls.Conn; // e.g., https://github.com/golang/go/blob/c8aec4095e089ff6ac50d18e97c3f46561f14f48/src/net/http/transport.go#L1040 // // The pre-dialed connection is stored in a cachedTLSDialer, which will // return the cached pre-dialed connection to its first Dial caller, and // use the tlsDialer for all other Dials. // // cachedTLSDialer.close() must be called on all exits paths from this // function and in meek.Close() to ensure the cached conn is closed in // any case where no Dial call is made. // // The pre-dial must be interruptible so that DialMeek doesn't block and // hang/delay a shutdown or end of establishment. So the pre-dial uses // the Controller's PendingConns, not the MeekConn PendingConns. For this // purpose, a special preDialer is configured. // // Only one pre-dial attempt is made; there are no retries. This differs // from roundTrip, which retries and may redial for each retry. Retries // at the pre-dial phase are less useful since there's no active session // to preserve, and establishment will simply try another server. Note // that the underlying TCPDial may still try multiple IP addreses when // the destination is a domain and ir resolves to multiple IP adresses. // The pre-dial is made within the parent dial context, so that DialMeek // may be interrupted. Subsequent dials are made within the meek round trip // request context. Since http.DialTLS doesn't take a context argument // (yet; as of Go 1.9 this issue is still open: https://github.com/golang/go/issues/21526), // cachedTLSDialer is used as a conduit to send the request context. // meekConn.roundTrip sets its request context into cachedTLSDialer, and // cachedTLSDialer.dial uses that context. // As DialAddr is set in the CustomTLSConfig, no address is required here. preConn, err := tlsDialer(ctx, "tcp", "") if err != nil { return nil, common.ContextError(err) } isHTTP2 := false if tlsConn, ok := preConn.(*tls.Conn); ok { state := tlsConn.ConnectionState() if state.NegotiatedProtocolIsMutual && state.NegotiatedProtocol == "h2" { isHTTP2 = true } } cachedTLSDialer = newCachedTLSDialer(preConn, tlsDialer) if isHTTP2 { NoticeInfo("negotiated HTTP/2 for %s", meekConfig.DialAddress) transport = &http2.Transport{ DialTLS: func(network, addr string, _ *golangtls.Config) (net.Conn, error) { return cachedTLSDialer.dial(network, addr) }, } } else { transport = &http.Transport{ DialTLS: func(network, addr string) (net.Conn, error) { return cachedTLSDialer.dial(network, addr) }, } } } else { scheme = "http" // The dialer ignores address that http.Transport will pass in (derived // from the HTTP request URL) and always dials meekConfig.DialAddress. dialer := func(ctx context.Context, network, _ string) (net.Conn, error) { return NewTCPDialer(dialConfig)(ctx, network, meekConfig.DialAddress) } // For HTTP, and when the meekConfig.DialAddress matches the // meekConfig.HostHeader, we let http.Transport handle proxying. // http.Transport will put the the HTTP server address in the HTTP // request line. In this one case, we can use an HTTP proxy that does // not offer CONNECT support. if strings.HasPrefix(dialConfig.UpstreamProxyUrl, "http://") && (meekConfig.DialAddress == meekConfig.HostHeader || meekConfig.DialAddress == meekConfig.HostHeader+":80") { url, err := url.Parse(dialConfig.UpstreamProxyUrl) if err != nil { return nil, common.ContextError(err) } proxyUrl = http.ProxyURL(url) // Here, the dialer must use the address that http.Transport // passes in (which will be proxy address). copyDialConfig := new(DialConfig) *copyDialConfig = *dialConfig copyDialConfig.UpstreamProxyUrl = "" dialer = NewTCPDialer(copyDialConfig) } httpTransport := &http.Transport{ Proxy: proxyUrl, DialContext: dialer, } if proxyUrl != nil { // Wrap transport with a transport that can perform HTTP proxy auth negotiation transport, err = upstreamproxy.NewProxyAuthTransport(httpTransport, dialConfig.CustomHeaders) if err != nil { return nil, common.ContextError(err) } } else { transport = httpTransport } } url := &url.URL{ Scheme: scheme, Host: meekConfig.HostHeader, Path: "/", } if meekConfig.UseHTTPS { host, _, err := net.SplitHostPort(meekConfig.DialAddress) if err != nil { return nil, common.ContextError(err) } additionalHeaders = map[string][]string{ "X-Psiphon-Fronting-Address": {host}, } } else { if proxyUrl == nil { additionalHeaders = dialConfig.CustomHeaders } } cookie, err := makeMeekCookie(meekConfig) if err != nil { return nil, common.ContextError(err) } // The main loop of a MeekConn is run in the relay() goroutine. // A MeekConn implements net.Conn concurrency semantics: // "Multiple goroutines may invoke methods on a Conn simultaneously." // // Read() calls and relay() are synchronized by exchanging control of a single // receiveBuffer (bytes.Buffer). This single buffer may be: // - in the emptyReceiveBuffer channel when it is available and empty; // - in the partialReadBuffer channel when it is available and contains data; // - in the fullReadBuffer channel when it is available and full of data; // - "checked out" by relay or Read when they are are writing to or reading from the // buffer, respectively. // relay() will obtain the buffer from either the empty or partial channel but block when // the buffer is full. Read will obtain the buffer from the partial or full channel when // there is data to read but block when the buffer is empty. // Write() calls and relay() are synchronized in a similar way, using a single // sendBuffer. meek = &MeekConn{ url: url, additionalHeaders: additionalHeaders, cookie: cookie, cachedTLSDialer: cachedTLSDialer, transport: transport, isClosed: false, runCtx: runCtx, stopRunning: stopRunning, relayWaitGroup: new(sync.WaitGroup), fullReceiveBufferLength: FULL_RECEIVE_BUFFER_LENGTH, readPayloadChunkLength: READ_PAYLOAD_CHUNK_LENGTH, emptyReceiveBuffer: make(chan *bytes.Buffer, 1), partialReceiveBuffer: make(chan *bytes.Buffer, 1), fullReceiveBuffer: make(chan *bytes.Buffer, 1), emptySendBuffer: make(chan *bytes.Buffer, 1), partialSendBuffer: make(chan *bytes.Buffer, 1), fullSendBuffer: make(chan *bytes.Buffer, 1), } // stopRunning and cachedTLSDialer will now be closed in meek.Close() cleanupStopRunning = false cleanupCachedTLSDialer = false meek.emptyReceiveBuffer <- new(bytes.Buffer) meek.emptySendBuffer <- new(bytes.Buffer) meek.relayWaitGroup.Add(1) if meekConfig.LimitBufferSizes { meek.fullReceiveBufferLength = LIMITED_FULL_RECEIVE_BUFFER_LENGTH meek.readPayloadChunkLength = LIMITED_READ_PAYLOAD_CHUNK_LENGTH } go meek.relay() return meek, nil } type cachedTLSDialer struct { usedCachedConn int32 cachedConn net.Conn requestContext atomic.Value dialer Dialer } func newCachedTLSDialer(cachedConn net.Conn, dialer Dialer) *cachedTLSDialer { return &cachedTLSDialer{ cachedConn: cachedConn, dialer: dialer, } } func (c *cachedTLSDialer) setRequestContext(requestContext context.Context) { c.requestContext.Store(requestContext) } func (c *cachedTLSDialer) dial(network, addr string) (net.Conn, error) { if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) { conn := c.cachedConn c.cachedConn = nil return conn, nil } ctx := c.requestContext.Load().(context.Context) if ctx == nil { ctx = context.Background() } return c.dialer(ctx, network, addr) } func (c *cachedTLSDialer) close() { if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) { c.cachedConn.Close() c.cachedConn = nil } } // Close terminates the meek connection. Close waits for the relay processing goroutine // to stop and releases HTTP transport resources. // A mutex is required to support net.Conn concurrency semantics. func (meek *MeekConn) Close() (err error) { meek.mutex.Lock() isClosed := meek.isClosed meek.isClosed = true meek.mutex.Unlock() if !isClosed { meek.stopRunning() if meek.cachedTLSDialer != nil { meek.cachedTLSDialer.close() } meek.relayWaitGroup.Wait() meek.transport.CloseIdleConnections() } return nil } // IsClosed implements the Closer iterface. The return value // indicates whether the MeekConn has been closed. func (meek *MeekConn) IsClosed() bool { meek.mutex.Lock() isClosed := meek.isClosed meek.mutex.Unlock() return isClosed } // Read reads data from the connection. // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported. func (meek *MeekConn) Read(buffer []byte) (n int, err error) { if meek.IsClosed() { return 0, common.ContextError(errors.New("meek connection is closed")) } // Block until there is received data to consume var receiveBuffer *bytes.Buffer select { case receiveBuffer = <-meek.partialReceiveBuffer: case receiveBuffer = <-meek.fullReceiveBuffer: case <-meek.runCtx.Done(): return 0, common.ContextError(errors.New("meek connection has closed")) } n, err = receiveBuffer.Read(buffer) meek.replaceReceiveBuffer(receiveBuffer) return n, err } // Write writes data to the connection. // net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported. func (meek *MeekConn) Write(buffer []byte) (n int, err error) { if meek.IsClosed() { return 0, common.ContextError(errors.New("meek connection is closed")) } // Repeats until all n bytes are written n = len(buffer) for len(buffer) > 0 { // Block until there is capacity in the send buffer var sendBuffer *bytes.Buffer select { case sendBuffer = <-meek.emptySendBuffer: case sendBuffer = <-meek.partialSendBuffer: case <-meek.runCtx.Done(): return 0, common.ContextError(errors.New("meek connection has closed")) } writeLen := MAX_SEND_PAYLOAD_LENGTH - sendBuffer.Len() if writeLen > 0 { if writeLen > len(buffer) { writeLen = len(buffer) } _, err = sendBuffer.Write(buffer[:writeLen]) buffer = buffer[writeLen:] } meek.replaceSendBuffer(sendBuffer) } return n, err } // LocalAddr is a stub implementation of net.Conn.LocalAddr func (meek *MeekConn) LocalAddr() net.Addr { return nil } // RemoteAddr is a stub implementation of net.Conn.RemoteAddr func (meek *MeekConn) RemoteAddr() net.Addr { return nil } // SetDeadline is a stub implementation of net.Conn.SetDeadline func (meek *MeekConn) SetDeadline(t time.Time) error { return common.ContextError(errors.New("not supported")) } // SetReadDeadline is a stub implementation of net.Conn.SetReadDeadline func (meek *MeekConn) SetReadDeadline(t time.Time) error { return common.ContextError(errors.New("not supported")) } // SetWriteDeadline is a stub implementation of net.Conn.SetWriteDeadline func (meek *MeekConn) SetWriteDeadline(t time.Time) error { return common.ContextError(errors.New("not supported")) } func (meek *MeekConn) replaceReceiveBuffer(receiveBuffer *bytes.Buffer) { switch { case receiveBuffer.Len() == 0: meek.emptyReceiveBuffer <- receiveBuffer case receiveBuffer.Len() >= meek.fullReceiveBufferLength: meek.fullReceiveBuffer <- receiveBuffer default: meek.partialReceiveBuffer <- receiveBuffer } } func (meek *MeekConn) replaceSendBuffer(sendBuffer *bytes.Buffer) { switch { case sendBuffer.Len() == 0: meek.emptySendBuffer <- sendBuffer case sendBuffer.Len() >= MAX_SEND_PAYLOAD_LENGTH: meek.fullSendBuffer <- sendBuffer default: meek.partialSendBuffer <- sendBuffer } } // relay sends and receives tunneled traffic (payload). An HTTP request is // triggered when data is in the write queue or at a polling interval. // There's a geometric increase, up to a maximum, in the polling interval when // no data is exchanged. Only one HTTP request is in flight at a time. func (meek *MeekConn) relay() { // Note: meek.Close() calls here in relay() are made asynchronously // (using goroutines) since Close() will wait on this WaitGroup. defer meek.relayWaitGroup.Done() interval := common.JitterDuration( MIN_POLL_INTERVAL, MIN_POLL_INTERVAL_JITTER) timeout := time.NewTimer(interval) defer timeout.Stop() for { timeout.Reset(interval) // Block until there is payload to send or it is time to poll var sendBuffer *bytes.Buffer select { case sendBuffer = <-meek.partialSendBuffer: case sendBuffer = <-meek.fullSendBuffer: case <-timeout.C: // In the polling case, send an empty payload case <-meek.runCtx.Done(): // Drop through to second Done() check } // Check Done() again, to ensure it takes precedence select { case <-meek.runCtx.Done(): return default: } sendPayloadSize := 0 if sendBuffer != nil { sendPayloadSize = sendBuffer.Len() } // roundTrip will replace sendBuffer (by calling replaceSendBuffer). This is // a compromise to conserve memory. Using a second buffer here, we could copy // sendBuffer and immediately replace it, unblocking meekConn.Write() and // allowing more upstream payload to immediately enqueue. Instead, the request // payload is read directly from sendBuffer, including retries. Only once the // server has acknowledged the request payload is sendBuffer replaced. This // still allows meekConn.Write() to unblock before the round trip response is // read. receivedPayloadSize, err := meek.roundTrip(sendBuffer) if err != nil { select { case <-meek.runCtx.Done(): // In this case, meek.roundTrip encountered Done(). Exit without logging error. return default: } NoticeAlert("%s", common.ContextError(err)) go meek.Close() return } // Calculate polling interval. When data is received, // immediately request more. Otherwise, schedule next // poll with exponential back off. Jitter and coin // flips are used to avoid trivial, static traffic // timing patterns. if receivedPayloadSize > 0 || sendPayloadSize > 0 { interval = 0 } else if interval == 0 { interval = common.JitterDuration( MIN_POLL_INTERVAL, MIN_POLL_INTERVAL_JITTER) } else { if common.FlipCoin() { interval = common.JitterDuration( interval, POLL_INTERVAL_JITTER) } else { interval = common.JitterDuration( time.Duration(float64(interval)*POLL_INTERVAL_MULTIPLIER), POLL_INTERVAL_JITTER) } if interval >= MAX_POLL_INTERVAL { interval = common.JitterDuration( MAX_POLL_INTERVAL, MAX_POLL_INTERVAL_JITTER) } } } } // readCloseSignaller is an io.ReadCloser wrapper for an io.Reader // that is passed, as the request body, to http.Transport.RoundTrip. // readCloseSignaller adds the AwaitClosed call, which is used // to schedule recycling the buffer underlying the reader only after // RoundTrip has called Close and will no longer use the buffer. // See: https://golang.org/pkg/net/http/#RoundTripper type readCloseSignaller struct { context context.Context reader io.Reader closed chan struct{} } func NewReadCloseSignaller( context context.Context, reader io.Reader) *readCloseSignaller { return &readCloseSignaller{ context: context, reader: reader, closed: make(chan struct{}, 1), } } func (r *readCloseSignaller) Read(p []byte) (int, error) { return r.reader.Read(p) } func (r *readCloseSignaller) Close() error { select { case r.closed <- *new(struct{}): default: } return nil } func (r *readCloseSignaller) AwaitClosed() bool { select { case <-r.context.Done(): case <-r.closed: return true } return false } // roundTrip configures and makes the actual HTTP POST request func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) { // Retries are made when the round trip fails. This adds resiliency // to connection interruption and intermittent failures. // // At least one retry is always attempted, and retries continue // while still within a brief deadline -- 5 seconds, currently the // deadline for an actively probed SSH connection to timeout. There // is a brief delay between retries, allowing for intermittent // failure states to resolve. // // Failure may occur at various stages of the HTTP request: // // 1. Before the request begins. In this case, the entire request // may be rerun. // // 2. While sending the request payload. In this case, the client // must resend its request payload. The server will not have // relayed its partially received request payload. // // 3. After sending the request payload but before receiving // a response. The client cannot distinguish between case 2 and // this case, case 3. The client resends its payload and the // server detects this and skips relaying the request payload. // // 4. While reading the response payload. The client will omit its // request payload when retrying, as the server has already // acknowledged it. The client will also indicate to the server // the amount of response payload already received, and the // server will skip resending the indicated amount of response // payload. // // Retries are indicated to the server by adding a Range header, // which includes the response payload resend position. defer func() { // Ensure sendBuffer is replaced, even in error code paths. if sendBuffer != nil { sendBuffer.Truncate(0) meek.replaceSendBuffer(sendBuffer) } }() retries := uint(0) retryDeadline := monotime.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE) retryDelay := MEEK_ROUND_TRIP_RETRY_MIN_DELAY serverAcknowledgedRequestPayload := false receivedPayloadSize := int64(0) for try := 0; ; try++ { // Omit the request payload when retrying after receiving a // partial server response. var signaller *readCloseSignaller var requestBody io.ReadCloser contentLength := 0 if !serverAcknowledgedRequestPayload && sendBuffer != nil { // sendBuffer will be replaced once the data is no longer needed, // when RoundTrip calls Close on the Body; this allows meekConn.Write() // to unblock and start buffering data for the next roung trip while // still reading the current round trip response. signaller provides // the hook for awaiting RoundTrip's call to Close. signaller = NewReadCloseSignaller(meek.runCtx, bytes.NewReader(sendBuffer.Bytes())) requestBody = signaller contentLength = sendBuffer.Len() } var request *http.Request request, err := http.NewRequest("POST", meek.url.String(), requestBody) if err != nil { // Don't retry when can't initialize a Request return 0, common.ContextError(err) } // Content-Length won't be set automatically due to the underlying // type of requestBody. if contentLength > 0 { request.ContentLength = int64(contentLength) } // - meek.stopRunning() will abort a round trip in flight // - round trip will abort if it exceeds MEEK_ROUND_TRIP_TIMEOUT requestContext, cancelFunc := context.WithTimeout( meek.runCtx, MEEK_ROUND_TRIP_TIMEOUT) defer cancelFunc() // Ensure TLS dials are made within the current request context. if meek.cachedTLSDialer != nil { meek.cachedTLSDialer.setRequestContext(requestContext) } request = request.WithContext(requestContext) meek.addAdditionalHeaders(request) request.Header.Set("Content-Type", "application/octet-stream") request.AddCookie(meek.cookie) expectedStatusCode := http.StatusOK // When retrying, add a Range header to indicate how much // of the response was already received. if try > 0 { expectedStatusCode = http.StatusPartialContent request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize)) } response, err := meek.transport.RoundTrip(request) // Wait for RoundTrip to call Close on the request body, when // there is one. This is necessary to ensure it's safe to // subsequently replace sendBuffer in both the success and // error cases. if signaller != nil { if !signaller.AwaitClosed() { // AwaitClosed encountered Done(). Abort immediately. Do not // replace sendBuffer, as we cannot be certain RoundTrip is // done with it. MeekConn.Write will exit on Done and not hang // awaiting sendBuffer. sendBuffer = nil return 0, common.ContextError(errors.New("meek connection has closed")) } } if err != nil { select { case <-meek.runCtx.Done(): // Exit without retrying and without logging error. return 0, common.ContextError(err) default: } NoticeAlert("meek round trip failed: %s", err) // ...continue to retry } if err == nil { if response.StatusCode != expectedStatusCode && // Certain http servers return 200 OK where we expect 206, so accept that. !(expectedStatusCode == http.StatusPartialContent && response.StatusCode == http.StatusOK) { // Don't retry when the status code is incorrect response.Body.Close() return 0, common.ContextError( fmt.Errorf( "unexpected status code: %d instead of %d", response.StatusCode, expectedStatusCode)) } // Update meek session cookie for _, c := range response.Cookies() { if meek.cookie.Name == c.Name { meek.cookie.Value = c.Value break } } // Received the response status code, so the server // must have received the request payload. serverAcknowledgedRequestPayload = true // sendBuffer is now no longer required for retries, and the // buffer may be replaced; this allows meekConn.Write() to unblock // and start buffering data for the next round trip while still // reading the current round trip response. if sendBuffer != nil { // Assumes signaller.AwaitClosed is called above, so // sendBuffer will no longer be accessed by RoundTrip. sendBuffer.Truncate(0) meek.replaceSendBuffer(sendBuffer) sendBuffer = nil } readPayloadSize, err := meek.readPayload(response.Body) response.Body.Close() // receivedPayloadSize is the number of response // payload bytes received and relayed. A retry can // resume after this position. receivedPayloadSize += readPayloadSize if err != nil { NoticeAlert("meek read payload failed: %s", err) // ...continue to retry } else { // Round trip completed successfully break } } // Release context resources now. cancelFunc() // Either the request failed entirely, or there was a failure // streaming the response payload. Always retry once. Then // retry if time remains; when the next delay exceeds the time // remaining until the deadline, do not retry. now := monotime.Now() if retries >= 1 && (now.After(retryDeadline) || retryDeadline.Sub(now) <= retryDelay) { return 0, common.ContextError(err) } retries += 1 delayTimer := time.NewTimer(retryDelay) select { case <-delayTimer.C: case <-meek.runCtx.Done(): delayTimer.Stop() return 0, common.ContextError(err) } // Increase the next delay, to back off and avoid excessive // activity in conditions such as no network connectivity. retryDelay *= MEEK_ROUND_TRIP_RETRY_MULTIPLIER if retryDelay >= MEEK_ROUND_TRIP_RETRY_MAX_DELAY { retryDelay = MEEK_ROUND_TRIP_RETRY_MAX_DELAY } } return receivedPayloadSize, nil } // Add additional headers to the HTTP request using the same method we use for adding // custom headers to HTTP proxy requests. func (meek *MeekConn) addAdditionalHeaders(request *http.Request) { for name, value := range meek.additionalHeaders { // hack around special case of "Host" header // https://golang.org/src/net/http/request.go#L474 // using URL.Opaque, see URL.RequestURI() https://golang.org/src/net/url/url.go#L915 if name == "Host" { if len(value) > 0 { if request.URL.Opaque == "" { request.URL.Opaque = request.URL.Scheme + "://" + request.Host + request.URL.RequestURI() } request.Host = value[0] } } else { request.Header[name] = value } } } // readPayload reads the HTTP response in chunks, making the read buffer available // to MeekConn.Read() calls after each chunk; the intention is to allow bytes to // flow back to the reader as soon as possible instead of buffering the entire payload. // // When readPayload returns an error, the totalSize output is remains valid -- it's the // number of payload bytes successfully read and relayed. func (meek *MeekConn) readPayload( receivedPayload io.ReadCloser) (totalSize int64, err error) { defer receivedPayload.Close() totalSize = 0 for { reader := io.LimitReader(receivedPayload, int64(meek.readPayloadChunkLength)) // Block until there is capacity in the receive buffer var receiveBuffer *bytes.Buffer select { case receiveBuffer = <-meek.emptyReceiveBuffer: case receiveBuffer = <-meek.partialReceiveBuffer: case <-meek.runCtx.Done(): return 0, nil } // Note: receiveBuffer size may exceed meek.fullReceiveBufferLength by up to the size // of one received payload. The meek.fullReceiveBufferLength value is just a guideline. n, err := receiveBuffer.ReadFrom(reader) meek.replaceReceiveBuffer(receiveBuffer) totalSize += n if err != nil { return totalSize, common.ContextError(err) } if n == 0 { break } } return totalSize, nil } // makeCookie creates the cookie to be sent with initial meek HTTP request. // The purpose of the cookie is to send the following to the server: // ServerAddress -- the Psiphon Server address the meek server should relay to // SessionID -- the Psiphon session ID (used by meek server to relay geolocation // information obtained from the CDN through to the Psiphon Server) // MeekProtocolVersion -- tells the meek server that this client understands // the latest protocol. // The server will create a session using these values and send the session ID // back to the client via Set-Cookie header. Client must use that value with // all consequent HTTP requests // In unfronted meek mode, the cookie is visible over the adversary network, so the // cookie is encrypted and obfuscated. func makeMeekCookie(meekConfig *MeekConfig) (cookie *http.Cookie, err error) { // Make the JSON data serverAddress := meekConfig.PsiphonServerAddress cookieData := &protocol.MeekCookieData{ ServerAddress: serverAddress, SessionID: meekConfig.SessionID, MeekProtocolVersion: MEEK_PROTOCOL_VERSION, ClientTunnelProtocol: meekConfig.ClientTunnelProtocol, } serializedCookie, err := json.Marshal(cookieData) if err != nil { return nil, common.ContextError(err) } // Encrypt the JSON data // NaCl box is used for encryption. The peer public key comes from the server entry. // Nonce is always all zeros, and is not sent in the cookie (the server also uses an all-zero nonce). // http://nacl.cace-project.eu/box.html: // "There is no harm in having the same nonce for different messages if the {sender, receiver} sets are // different. This is true even if the sets overlap. For example, a sender can use the same nonce for two // different messages if the messages are sent to two different public keys." var nonce [24]byte var publicKey [32]byte decodedPublicKey, err := base64.StdEncoding.DecodeString(meekConfig.MeekCookieEncryptionPublicKey) if err != nil { return nil, common.ContextError(err) } copy(publicKey[:], decodedPublicKey) ephemeralPublicKey, ephemeralPrivateKey, err := box.GenerateKey(rand.Reader) if err != nil { return nil, common.ContextError(err) } box := box.Seal(nil, serializedCookie, &nonce, &publicKey, ephemeralPrivateKey) encryptedCookie := make([]byte, 32+len(box)) copy(encryptedCookie[0:32], ephemeralPublicKey[0:32]) copy(encryptedCookie[32:], box) // Obfuscate the encrypted data obfuscator, err := common.NewClientObfuscator( &common.ObfuscatorConfig{Keyword: meekConfig.MeekObfuscatedKey, MaxPadding: MEEK_COOKIE_MAX_PADDING}) if err != nil { return nil, common.ContextError(err) } obfuscatedCookie := obfuscator.SendSeedMessage() seedLen := len(obfuscatedCookie) obfuscatedCookie = append(obfuscatedCookie, encryptedCookie...) obfuscator.ObfuscateClientToServer(obfuscatedCookie[seedLen:]) // Format the HTTP cookie // The format is =, which is intended to match common cookie formats. A := int('A') Z := int('Z') // letterIndex is integer in range [int('A'), int('Z')] letterIndex, err := common.MakeSecureRandomInt(Z - A + 1) if err != nil { return nil, common.ContextError(err) } return &http.Cookie{ Name: string(byte(A + letterIndex)), Value: base64.StdEncoding.EncodeToString(obfuscatedCookie)}, nil }