/*
* Copyright (c) 2023, Psiphon Inc.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package psiphon
import (
"bytes"
"context"
"encoding/binary"
std_errors "errors"
"fmt"
"io"
"net"
"net/http"
"net/netip"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/resolver"
"github.com/cespare/xxhash"
"golang.org/x/net/bpf"
)
// InproxyBrokerClientManager manages an InproxyBrokerClientInstance, an
// in-proxy broker client, and its associated broker dial parameters, that
// may be shared by multiple client dials or proxy instances. There is no
// explicit close operation for the managed InproxyBrokerClientInstance.
//
// Once used, the current InproxyBrokerClientInstance and its broker client
// are left actively connected to the broker, to minimize transport round
// trips for additional requests.
//
// The InproxyBrokerClientManager and its components implement a replay system
// for broker client dials. As one broker client is shared across multiple
// client in-proxy dials, the broker dial parameters are replayed
// independently from tunnel dial parameters.
//
// The NewInproxyBrokerClientInstance layer provides a fixed association
// between a broker client and its broker dial parameters, ensuring that
// in-proxy success/failure callbacks reference the correct replay parameters
// when setting or clearing replay.
//
// A new InproxyBrokerClientInstance, including the broker dial parameters and
// broker client, is instantiated when the active network ID changes, using
// tactics for the new network.
type InproxyBrokerClientManager struct {
// config supplies the current network ID in reset and is passed through to
// each new InproxyBrokerClientInstance.
config *Config
// isProxy is passed through to NewInproxyBrokerClientInstance, where it
// selects proxy rather than client broker specs and compartment IDs.
isProxy bool
// mutex is held by every method that reads or replaces networkID and
// brokerClientInstance; reset assumes its caller already holds it.
mutex sync.Mutex
// networkID is the network ID the current brokerClientInstance was created
// for; it is "" when there is no current instance.
networkID string
// brokerClientInstance is the current, shared instance. It is nil until
// the first GetBrokerClient call and after a reset that fails.
brokerClientInstance *InproxyBrokerClientInstance
}
// NewInproxyBrokerClientManager returns an InproxyBrokerClientManager bound
// to the given config; isProxy is recorded and later passed to each
// InproxyBrokerClientInstance the manager creates.
//
// No network operations are performed here and no
// InproxyBrokerClientInstance is created: the managed instance is set up
// lazily, on demand, when GetBrokerClient is called.
func NewInproxyBrokerClientManager(
	config *Config, isProxy bool) *InproxyBrokerClientManager {

	// networkID and brokerClientInstance are intentionally left at their
	// zero values until a broker client is first requested.
	return &InproxyBrokerClientManager{
		config:  config,
		isProxy: isProxy,
	}
}
// TacticsApplied implements the TacticsAppliedReceiver interface. It is
// called when tactics have changed, and replaces the current broker client
// instance so that potentially changed parameters take effect.
//
// Returns nil when no broker client instance exists yet; otherwise returns
// the (traced) result of resetting the broker client.
func (b *InproxyBrokerClientManager) TacticsApplied() error {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.brokerClientInstance != nil {

		// TODO: as a future enhancement, don't reset when the tactics
		// brokerSpecs.Hash() is unchanged?

		return errors.Trace(b.reset())
	}

	// Nothing to reset: b.brokerClientInstance is only initialized on
	// demand, and that initialization will pick up the new tactics.
	return nil
}
// GetBrokerClient returns the current, shared broker client and its
// corresponding dial parameters (for metrics logging). When there is no
// current broker client, or when networkID differs from the network ID
// associated with the current broker client, a new broker client instance is
// initialized first.
//
// On failure to initialize a new instance, both returned pointers are nil
// and the error is non-nil.
func (b *InproxyBrokerClientManager) GetBrokerClient(
	networkID string) (*inproxy.BrokerClient, *InproxyBrokerDialParameters, error) {

	b.mutex.Lock()
	defer b.mutex.Unlock()

	needsReset := b.brokerClientInstance == nil || b.networkID != networkID
	if needsReset {
		if err := b.reset(); err != nil {
			return nil, nil, errors.Trace(err)
		}
	}

	// The instance's brokerClient is wired up to refer back to the same
	// instance's brokerDialParams/roundTripper, etc., so the pair returned
	// here always belongs together.
	instance := b.brokerClientInstance
	return instance.brokerClient, instance.brokerDialParams, nil
}
// resetBrokerClientOnRoundTripperFailed replaces the current broker client
// instance after brokerClientInstance reports a round tripper failure. The
// request is ignored, and nil returned, when brokerClientInstance is no
// longer the manager's current instance.
func (b *InproxyBrokerClientManager) resetBrokerClientOnRoundTripperFailed(
	brokerClientInstance *InproxyBrokerClientInstance) error {

	b.mutex.Lock()
	defer b.mutex.Unlock()

	signalIsFromCurrentInstance := b.brokerClientInstance == brokerClientInstance
	if signalIsFromCurrentInstance {
		return errors.Trace(b.reset())
	}

	// A signal from a non-current instance may occur when multiple in-flight
	// round trips fail in close proximity; the first failure has already
	// triggered the reset.
	return nil
}
// reset closes and discards any current broker client instance and then
// creates a replacement for the network ID currently reported by the config.
// The caller must hold b.mutex.
//
// If creating the replacement fails, the error is returned and the manager
// is left with no current instance and an empty network ID.
func (b *InproxyBrokerClientManager) reset() error {

	if previous := b.brokerClientInstance; previous != nil {

		// Closing the existing broker client closes all underlying network
		// connections, interrupting any in-flight requests. This is also
		// what happens in the resetBrokerClientOnRoundTripperFailed case,
		// where the round tripper is expected to have permanently failed.
		previous.Close()
	}

	// Drop the existing broker client up front, so that it is removed even
	// if NewInproxyBrokerClientInstance fails; for example, when its spec is
	// no longer available in tactics.
	b.brokerClientInstance, b.networkID = nil, ""

	currentNetworkID := b.config.GetNetworkID()

	replacement, err := NewInproxyBrokerClientInstance(
		b.config, b, currentNetworkID, b.isProxy)
	if err != nil {
		return errors.Trace(err)
	}

	b.brokerClientInstance, b.networkID = replacement, currentNetworkID

	return nil
}
// InproxyBrokerClientInstance pairs an inproxy.BrokerClient instance with an
// implementation of the inproxy.BrokerDialCoordinator interface and the
// associated, underlying broker dial parameters. InproxyBrokerClientInstance
// implements broker client dial replay.
//
// All fields other than lastStoreReplay are set once, in
// NewInproxyBrokerClientInstance.
type InproxyBrokerClientInstance struct {
// config is used to look up the resolver after a successful round trip.
config *Config
// brokerClientManager is signaled, via
// resetBrokerClientOnRoundTripperFailed, when the round tripper fails.
brokerClientManager *InproxyBrokerClientManager
// networkID is the network this instance was created for; it is also used
// as part of the replay storage key.
networkID string
// brokerClientPrivateKey is either parsed from
// config.InproxyProxySessionPrivateKey (proxies, when configured) or
// freshly generated.
brokerClientPrivateKey inproxy.SessionPrivateKey
// brokerClient is created by inproxy.NewBrokerClient with this instance as
// its dial coordinator.
brokerClient *inproxy.BrokerClient
// brokerPublicKey and brokerRootObfuscationSecret are parsed from the
// selected broker spec.
brokerPublicKey inproxy.SessionPublicKey
brokerRootObfuscationSecret inproxy.ObfuscationSecret
// brokerDialParams are the new or replayed dial parameters for the
// selected broker.
brokerDialParams *InproxyBrokerDialParameters
// replayEnabled gates storing and deleting replay parameters in the
// round tripper success/failure callbacks.
replayEnabled bool
// isReplay records whether brokerDialParams were loaded from replay
// storage rather than newly generated.
isReplay bool
// roundTripper is the single round tripper used for the lifetime of this
// instance.
roundTripper *InproxyBrokerRoundTripper
// personalCompartmentIDs and commonCompartmentIDs are the lists produced by
// prepareCompartmentIDs.
personalCompartmentIDs []inproxy.ID
commonCompartmentIDs []inproxy.ID
// The following timeout, delay, and jitter values are snapshots of the
// corresponding tactics parameters, taken when the instance is created.
sessionHandshakeTimeout time.Duration
announceRequestTimeout time.Duration
announceDelay time.Duration
announceDelayJitter float64
answerRequestTimeout time.Duration
offerRequestTimeout time.Duration
offerRetryDelay time.Duration
offerRetryJitter float64
relayedPacketRequestTimeout time.Duration
// replayRetainFailedProbability is the probability that stored replay
// parameters are kept, rather than deleted, after a round tripper failure.
replayRetainFailedProbability float64
// replayUpdateFrequency is the minimum time between replay stores after
// successful round trips.
replayUpdateFrequency time.Duration
// mutex is held for the duration of the BrokerClientRoundTripperSucceeded
// and BrokerClientRoundTripperFailed callbacks.
mutex sync.Mutex
// lastStoreReplay is the time of the last successful replay store; the
// zero value means nothing has been stored by this instance yet.
lastStoreReplay time.Time
}
// NewInproxyBrokerClientInstance creates a new InproxyBrokerClientInstance
// for the specified network ID. NewInproxyBrokerClientInstance does not
// perform any network operations; the new InproxyBrokerClientInstance is
// initialized when used for a round trip.
//
// The steps are:
//   - prepare the common/personal compartment ID lists;
//   - select the candidate broker specs from tactics, based on isProxy and
//     whether personal pairing mode is configured;
//   - pick one broker spec, preferring one with unexpired replay dial
//     parameters when replay is enabled;
//   - generate new broker dial parameters, or prepare the replayed ones;
//   - load the broker key material and the broker client session key;
//   - create the inproxy.BrokerClient, with the new instance acting as its
//     inproxy.BrokerDialCoordinator.
//
// brokerClientManager is the manager that will be signaled when the round
// tripper fails. isProxy selects proxy rather than client parameters.
//
// An error is returned when compartment IDs cannot be prepared, when no
// broker specs are available, when dial parameters cannot be created or
// prepared, when key material is invalid, or when the broker client cannot
// be created.
func NewInproxyBrokerClientInstance(
	config *Config,
	brokerClientManager *InproxyBrokerClientManager,
	networkID string,
	isProxy bool) (*InproxyBrokerClientInstance, error) {

	p := config.GetParameters().Get()
	defer p.Close()

	// Select common or personal compartment IDs.

	commonCompartmentIDs, personalCompartmentIDs, err := prepareCompartmentIDs(config, p, isProxy)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Select the broker to use, optionally favoring brokers with replay data.
	// In the InproxyBrokerSpecs calls, the first non-empty tactics parameter
	// list is used.
	//
	// Optional broker specs may be used to specify broker(s) dedicated to
	// personal pairing, a configuration which can be used to reserve more
	// capacity for personal pairing, given the simple rendezvous scheme below.

	var brokerSpecs parameters.InproxyBrokerSpecsValue
	if isProxy {
		if config.IsInproxyPersonalPairingMode() {
			brokerSpecs = p.InproxyBrokerSpecs(
				parameters.InproxyProxyPersonalPairingBrokerSpecs,
				parameters.InproxyPersonalPairingBrokerSpecs,
				parameters.InproxyProxyBrokerSpecs,
				parameters.InproxyBrokerSpecs)
		} else {
			brokerSpecs = p.InproxyBrokerSpecs(
				parameters.InproxyProxyBrokerSpecs,
				parameters.InproxyBrokerSpecs)
		}
	} else {
		if config.IsInproxyPersonalPairingMode() {
			brokerSpecs = p.InproxyBrokerSpecs(
				parameters.InproxyClientPersonalPairingBrokerSpecs,
				parameters.InproxyPersonalPairingBrokerSpecs,
				parameters.InproxyClientBrokerSpecs,
				parameters.InproxyBrokerSpecs)
		} else {
			brokerSpecs = p.InproxyBrokerSpecs(
				parameters.InproxyClientBrokerSpecs,
				parameters.InproxyBrokerSpecs)
		}
	}
	if len(brokerSpecs) == 0 {
		return nil, errors.TraceNew("no broker specs")
	}

	// To ensure personal compartment ID client/proxy rendezvous at same
	// broker, simply pick the first configured broker.
	//
	// Limitations: there's no failover or load balancing for the personal
	// compartment ID case; and this logic assumes that the broker spec
	// tactics are the same for the client and proxy.

	if len(personalCompartmentIDs) > 0 {
		brokerSpecs = brokerSpecs[:1]
	}

	now := time.Now()

	// Prefer a broker with replay data.

	// Replay is disabled when the TTL, InproxyReplayBrokerDialParametersTTL,
	// is 0.
	ttl := p.Duration(parameters.InproxyReplayBrokerDialParametersTTL)

	replayEnabled := ttl > 0 &&
		!config.DisableReplay &&
		prng.FlipWeightedCoin(p.Float(parameters.InproxyReplayBrokerDialParametersProbability))

	brokerSpec, brokerDialParams, err :=
		ShuffleAndGetNetworkReplayParameters[parameters.InproxyBrokerSpec, InproxyBrokerDialParameters](
			networkID,
			replayEnabled,
			brokerSpecs,
			func(spec *parameters.InproxyBrokerSpec) string { return spec.BrokerPublicKey },
			func(spec *parameters.InproxyBrokerSpec, dialParams *InproxyBrokerDialParameters) bool {
				// Replay data is only valid while within the TTL and while
				// the broker spec it was recorded for is unchanged.
				return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
					bytes.Equal(dialParams.LastUsedBrokerSpecHash, hashBrokerSpec(spec))
			})
	if err != nil {
		NoticeWarning("ShuffleAndGetNetworkReplayParameters failed: %v", errors.Trace(err))

		// When there's an error, try to continue, using a random broker spec
		// and no replay dial parameters.
		//
		// Intn(n) selects from [0, n), so the argument must be the full
		// length of the list for every broker spec, including the last, to
		// be eligible. len(brokerSpecs) > 0 is guaranteed by the check above.
		brokerSpec = brokerSpecs[prng.Intn(len(brokerSpecs))]

		// Any dial parameters returned alongside the error are not
		// meaningful and would not correspond to the randomly selected spec.
		brokerDialParams = nil
	}

	// Generate new broker dial parameters if not replaying. Later, isReplay
	// is used to report the replay metric.

	isReplay := brokerDialParams != nil

	if !isReplay {
		brokerDialParams, err = MakeInproxyBrokerDialParameters(config, p, networkID, brokerSpec)
		if err != nil {
			return nil, errors.Trace(err)
		}
	} else {
		// The broker spec is not part of the stored record, so it must be
		// re-attached before the replayed parameters can be used.
		brokerDialParams.brokerSpec = brokerSpec
		err := brokerDialParams.prepareDialConfigs(config, p, networkID, true, nil)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	// Load broker key material.

	brokerPublicKey, err := inproxy.SessionPublicKeyFromString(brokerSpec.BrokerPublicKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	brokerRootObfuscationSecret, err := inproxy.ObfuscationSecretFromString(brokerSpec.BrokerRootObfuscationSecret)
	if err != nil {
		return nil, errors.Trace(err)
	}

	roundTripper := NewInproxyBrokerRoundTripper(p, brokerDialParams)

	// Clients always generate an ephemeral session key pair. Proxies may opt
	// to use a long-lived key pair for proxied traffic attribution.

	var brokerClientPrivateKey inproxy.SessionPrivateKey
	if isProxy && config.InproxyProxySessionPrivateKey != "" {
		brokerClientPrivateKey, err = inproxy.SessionPrivateKeyFromString(config.InproxyProxySessionPrivateKey)
		if err != nil {
			return nil, errors.Trace(err)
		}
	} else {
		brokerClientPrivateKey, err = inproxy.GenerateSessionPrivateKey()
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	// InproxyBrokerClientInstance implements the
	// inproxy.BrokerDialCoordinator interface and passes itself to
	// inproxy.NewBrokerClient in order to provide the round tripper, key
	// material, compartment IDs, timeouts, and other configuration to the
	// in-proxy broker client.
	//
	// Timeouts are not replayed, but snapshots are stored in the
	// InproxyBrokerClientInstance for efficient lookup.

	b := &InproxyBrokerClientInstance{
		config:                      config,
		brokerClientManager:         brokerClientManager,
		networkID:                   networkID,
		brokerClientPrivateKey:      brokerClientPrivateKey,
		brokerPublicKey:             brokerPublicKey,
		brokerRootObfuscationSecret: brokerRootObfuscationSecret,
		brokerDialParams:            brokerDialParams,
		replayEnabled:               replayEnabled,
		isReplay:                    isReplay,
		roundTripper:                roundTripper,
		personalCompartmentIDs:      personalCompartmentIDs,
		commonCompartmentIDs:        commonCompartmentIDs,

		sessionHandshakeTimeout:       p.Duration(parameters.InproxySessionHandshakeRoundTripTimeout),
		announceRequestTimeout:        p.Duration(parameters.InproxyProxyAnnounceRequestTimeout),
		announceDelay:                 p.Duration(parameters.InproxyProxyAnnounceDelay),
		announceDelayJitter:           p.Float(parameters.InproxyProxyAnnounceDelayJitter),
		answerRequestTimeout:          p.Duration(parameters.InproxyProxyAnswerRequestTimeout),
		offerRequestTimeout:           p.Duration(parameters.InproxyClientOfferRequestTimeout),
		offerRetryDelay:               p.Duration(parameters.InproxyClientOfferRetryDelay),
		offerRetryJitter:              p.Float(parameters.InproxyClientOfferRetryJitter),
		relayedPacketRequestTimeout:   p.Duration(parameters.InproxyClientRelayedPacketRequestTimeout),
		replayRetainFailedProbability: p.Float(parameters.InproxyReplayBrokerRetainFailedProbability),
		replayUpdateFrequency:         p.Duration(parameters.InproxyReplayBrokerUpdateFrequency),
	}

	// Initialize broker client. This will start with a fresh broker session.
	//
	// When resetBrokerClientOnRoundTripperFailed is invoked due to a failure
	// at the transport level -- TLS or domain fronting --
	// NewInproxyBrokerClientInstance is invoked, resetting both the broker
	// client round tripper and the broker session. As a future enhancement,
	// consider distinguishing between transport and session errors and
	// retaining a valid established session when only the transport needs to
	// be reset/retried.

	b.brokerClient, err = inproxy.NewBrokerClient(b)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return b, nil
}
// prepareCompartmentIDs returns the common and personal compartment ID
// lists, in that order, to be sent to the in-proxy broker.
//
// Personal compartment IDs are loaded from the tunnel-core config; these
// are set by the external app based on user input/configuration of IDs
// generated by or obtained from personal proxies. Both clients and proxies
// send personal compartment IDs to the in-proxy broker. For clients, when
// personal compartment IDs are configured, no common compartment IDs are
// prepared, ensuring matches with only proxies that supply the
// corresponding personal compartment IDs.
//
// Common compartment IDs are obtained from tactics and merged with
// previously learned IDs stored in the local datastore. When new IDs are
// obtained from tactics, the merged list is written back to the datastore.
// This allows for schemes where common compartment IDs are distributed to
// sets of clients, then removed from distribution, and still used to match
// proxies to those sets of clients. Only clients send common compartment
// IDs to the in-proxy broker. Proxies are automatically assigned to common
// compartments by the broker.
//
// Maximum compartment ID list lengths are enforced to ensure broker request
// sizes don't grow unbounded.
//
// Limitation: currently, in max length trimming, new common compartment IDs
// take precedence over older IDs.
//
// An error is returned only when an ID string cannot be parsed; datastore
// load/store failures are logged and otherwise ignored.
func prepareCompartmentIDs(
	config *Config,
	p parameters.ParametersAccessor,
	isProxy bool) ([]inproxy.ID, []inproxy.ID, error) {

	maxListLength := p.Int(parameters.InproxyMaxCompartmentIDListLength)

	// Personal compartment IDs.

	personalIDStrings := config.InproxyClientPersonalCompartmentIDs
	if isProxy {
		personalIDStrings = config.InproxyProxyPersonalCompartmentIDs
	}

	personalCompartmentIDs, err := inproxy.IDsFromStrings(personalIDStrings)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	if len(personalCompartmentIDs) > maxListLength {

		// Trim the list. It's not expected that user-configured personal
		// compartment ID lists will exceed the max length.
		//
		// TODO: shuffle before trimming? Prioritize previous matches?

		personalCompartmentIDs = personalCompartmentIDs[:maxListLength]
	}

	// Common compartment IDs are prepared only for clients that have no
	// personal compartment IDs; everyone else gets a nil common list.

	if isProxy || len(personalCompartmentIDs) > 0 {
		return nil, personalCompartmentIDs, nil
	}

	tacticsIDs := p.InproxyCompartmentIDs(parameters.InproxyCommonCompartmentIDs)

	knownIDs, err := LoadInproxyCommonCompartmentIDs()
	if err != nil {
		NoticeWarning("LoadInproxyCommonCompartmentIDs failed: %v", errors.Trace(err))
		// Continue with only the tactics common compartment IDs.
	}

	// Collect the tactics IDs that are not yet in the known list.

	unseenIDs := make([]string, 0, len(tacticsIDs))
	for _, id := range tacticsIDs {
		// TODO: faster lookup?
		if common.Contains(knownIDs, id) {
			continue
		}
		unseenIDs = append(unseenIDs, id)
	}

	if len(unseenIDs) > 0 {

		// New IDs are placed ahead of the previously known IDs.
		mergedIDs := append(unseenIDs, knownIDs...)

		// Locally store more than InproxyMaxCompartmentIDListLength known
		// common compartment IDs, in case the request limit parameter is
		// increased in the future. The persisted list length is still
		// limited, to cap local memory and disk impact.

		persistedLimit := 500 // ~16K
		if maxListLength > persistedLimit {
			persistedLimit = maxListLength
		}
		if len(mergedIDs) > persistedLimit {
			mergedIDs = mergedIDs[:persistedLimit]
		}

		if err := StoreInproxyCommonCompartmentIDs(mergedIDs); err != nil {
			NoticeWarning("StoreInproxyCommonCompartmentIDs failed: %v", errors.Trace(err))
			// Continue without persisting new common compartment IDs.
		}

		knownIDs = mergedIDs
	}

	commonCompartmentIDs, err := inproxy.IDsFromStrings(knownIDs)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	if len(commonCompartmentIDs) > maxListLength {
		// TODO: shuffle before trimming? Prioritize previous matches?
		commonCompartmentIDs = commonCompartmentIDs[:maxListLength]
	}

	return commonCompartmentIDs, personalCompartmentIDs, nil
}
// Close closes the broker client round tripper, including all of its
// underlying network connections, which will interrupt any in-flight round
// trips. The round tripper's close error, if any, is returned traced.
func (b *InproxyBrokerClientInstance) Close() error {
	return errors.Trace(b.roundTripper.Close())
}
// NetworkID implements the inproxy.BrokerDialCoordinator interface. It
// returns the network ID that this instance was created for.
func (b *InproxyBrokerClientInstance) NetworkID() string {
return b.networkID
}
// NetworkType implements the inproxy.BrokerDialCoordinator interface. It
// derives the in-proxy network type from this instance's network ID.
func (b *InproxyBrokerClientInstance) NetworkType() inproxy.NetworkType {
return getInproxyNetworkType(GetNetworkType(b.networkID))
}
// CommonCompartmentIDs implements the inproxy.BrokerDialCoordinator
// interface. It returns the common compartment ID list prepared when this
// instance was created; the list may be empty. The slice is not copied.
func (b *InproxyBrokerClientInstance) CommonCompartmentIDs() []inproxy.ID {
return b.commonCompartmentIDs
}
// PersonalCompartmentIDs implements the inproxy.BrokerDialCoordinator
// interface. It returns the personal compartment ID list prepared when this
// instance was created; the list may be empty. The slice is not copied.
func (b *InproxyBrokerClientInstance) PersonalCompartmentIDs() []inproxy.ID {
return b.personalCompartmentIDs
}
// BrokerClientPrivateKey implements the inproxy.BrokerDialCoordinator
// interface. It returns the session private key selected when this instance
// was created: either the configured proxy session key or a generated one.
func (b *InproxyBrokerClientInstance) BrokerClientPrivateKey() inproxy.SessionPrivateKey {
return b.brokerClientPrivateKey
}
// BrokerPublicKey implements the inproxy.BrokerDialCoordinator interface. It
// returns the public key parsed from the selected broker spec.
func (b *InproxyBrokerClientInstance) BrokerPublicKey() inproxy.SessionPublicKey {
return b.brokerPublicKey
}
// BrokerRootObfuscationSecret implements the inproxy.BrokerDialCoordinator
// interface. It returns the root obfuscation secret parsed from the selected
// broker spec.
func (b *InproxyBrokerClientInstance) BrokerRootObfuscationSecret() inproxy.ObfuscationSecret {
return b.brokerRootObfuscationSecret
}
// BrokerClientRoundTripper implements the inproxy.BrokerDialCoordinator
// interface. It returns this instance's single round tripper; the returned
// error is always nil.
func (b *InproxyBrokerClientInstance) BrokerClientRoundTripper() (inproxy.RoundTripper, error) {
// Returns the same round tripper for the lifetime of the
// inproxy.BrokerDialCoordinator, ensuring all requests for one in-proxy
// dial or proxy relay use the same broker, as is necessary due to the
// broker state for the proxy announce/answer, client broker/server
// relay, etc.
return b.roundTripper, nil
}
// BrokerClientRoundTripperSucceeded implements the
// inproxy.BrokerDialCoordinator interface. It is the success callback for a
// broker round trip: when replay is enabled and enough time has passed since
// the last store, the broker dial parameters are (re)stored for replay; and
// the resolver cache entry for the fronting dial address is verified or
// extended.
//
// roundTripper must be the round tripper obtained from
// BrokerClientRoundTripper; otherwise an error notice is emitted and nothing
// else is done.
func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperSucceeded(roundTripper inproxy.RoundTripper) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	// Passing in the round tripper obtained from BrokerClientRoundTripper is
	// just used for a sanity check in this implementation, since each
	// InproxyBrokerClientInstance has exactly one round tripper.
	succeededRoundTripper, isExpectedType := roundTripper.(*InproxyBrokerRoundTripper)
	if !isExpectedType || succeededRoundTripper != b.roundTripper {
		NoticeError("BrokerClientRoundTripperSucceeded: roundTripper instance mismatch")
		return
	}

	// Set replay or extend the broker dial parameters replay TTL after a
	// success. With tunnel dial parameters, the replay TTL is extended after
	// every successful tunnel connection. Since there are potentially more
	// and more frequent broker round trips per tunnel dial, the TTL is only
	// extended after some target duration has elapsed, to avoid excessive
	// datastore writes.

	now := time.Now()
	storeIsDue := b.replayEnabled &&
		now.Sub(b.lastStoreReplay) > b.replayUpdateFrequency

	if storeIsDue {
		b.brokerDialParams.LastUsedTimestamp = time.Now()

		brokerPublicKey := b.brokerDialParams.brokerSpec.BrokerPublicKey
		err := SetNetworkReplayParameters[InproxyBrokerDialParameters](
			b.networkID, brokerPublicKey, b.brokerDialParams)

		if err == nil {
			b.lastStoreReplay = now
		} else {
			NoticeWarning("StoreBrokerDialParameters failed: %v", errors.Trace(err))
			// Continue without persisting replay changes.
		}
	}

	// Verify/extend the resolver cache entry for any resolved domain after a
	// success.
	//
	// Limitation: currently this re-extends regardless of how long ago the
	// DNS resolve happened.

	if dnsResolver := b.config.GetResolver(); dnsResolver != nil {
		dnsResolver.VerifyCacheExtension(b.brokerDialParams.FrontingDialAddress)
	}
}
// BrokerClientRoundTripperFailed implements the
// inproxy.BrokerDialCoordinator interface. It is the failure callback for a
// broker round trip: when replay is enabled, any stored replay dial
// parameters are deleted, unless randomly retained according to the
// retain-failed probability; and the manager is signaled to replace this
// instance with a new one.
//
// roundTripper must be the round tripper obtained from
// BrokerClientRoundTripper; otherwise an error notice is emitted and nothing
// else is done.
func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperFailed(roundTripper inproxy.RoundTripper) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	// Passing in the round tripper obtained from BrokerClientRoundTripper is
	// just used for a sanity check in this implementation, since each
	// InproxyBrokerClientInstance has exactly one round tripper.
	failedRoundTripper, isExpectedType := roundTripper.(*InproxyBrokerRoundTripper)
	if !isExpectedType || failedRoundTripper != b.roundTripper {
		NoticeError("BrokerClientRoundTripperFailed: roundTripper instance mismatch")
		return
	}

	// Delete any persistent replay dial parameters. Unlike with the success
	// case, consecutive, repeated deletes are not expected to write to
	// storage, so no attempt is made to avoid them.

	discardReplay := b.replayEnabled &&
		!prng.FlipWeightedCoin(b.replayRetainFailedProbability)

	if discardReplay {

		// Limitation: there's a race condition with multiple
		// InproxyBrokerClientInstances writing to the replay datastore for
		// the same broker, such as in the case where there's a dual-mode
		// in-proxy client and proxy; this delete could potentially clobber a
		// concurrent fresh replay store after a success.
		//
		// TODO: add an additional storage key distinguisher for each instance?

		brokerPublicKey := b.brokerDialParams.brokerSpec.BrokerPublicKey
		err := DeleteNetworkReplayParameters[InproxyBrokerDialParameters](
			b.networkID, brokerPublicKey)
		if err != nil {
			NoticeWarning("DeleteBrokerDialParameters failed: %v", errors.Trace(err))
			// Continue without resetting replay.
		}
	}

	// Signal the InproxyBrokerClientManager to create a new
	// InproxyBrokerClientInstance, with new dial parameters and a new round
	// tripper, after a failure.
	//
	// This InproxyBrokerClientInstance doesn't change its own dial
	// parameters or round tripper, to ensure that any concurrent usage
	// retains affinity with the same parameters and broker.
	//
	// Limitation: a transport-level failure may unnecessarily reset the
	// broker session state; see comment in NewInproxyBrokerClientInstance.

	if err := b.brokerClientManager.resetBrokerClientOnRoundTripperFailed(b); err != nil {
		NoticeWarning("reset broker client failed: %v", errors.Trace(err))
		// Continue with old broker client instance.
	}
}
// AnnounceRequestTimeout implements the inproxy.BrokerDialCoordinator
// interface. It returns the InproxyProxyAnnounceRequestTimeout tactics value
// captured when this instance was created.
func (b *InproxyBrokerClientInstance) AnnounceRequestTimeout() time.Duration {
return b.announceRequestTimeout
}
// SessionHandshakeRoundTripTimeout implements the
// inproxy.BrokerDialCoordinator interface. It returns the
// InproxySessionHandshakeRoundTripTimeout tactics value captured when this
// instance was created.
func (b *InproxyBrokerClientInstance) SessionHandshakeRoundTripTimeout() time.Duration {
return b.sessionHandshakeTimeout
}
// AnnounceDelay implements the inproxy.BrokerDialCoordinator interface. It
// returns the InproxyProxyAnnounceDelay tactics value captured when this
// instance was created.
func (b *InproxyBrokerClientInstance) AnnounceDelay() time.Duration {
return b.announceDelay
}
// AnnounceDelayJitter implements the inproxy.BrokerDialCoordinator
// interface. It returns the InproxyProxyAnnounceDelayJitter tactics value
// captured when this instance was created.
func (b *InproxyBrokerClientInstance) AnnounceDelayJitter() float64 {
return b.announceDelayJitter
}
// AnswerRequestTimeout implements the inproxy.BrokerDialCoordinator
// interface. It returns the InproxyProxyAnswerRequestTimeout tactics value
// captured when this instance was created.
func (b *InproxyBrokerClientInstance) AnswerRequestTimeout() time.Duration {
return b.answerRequestTimeout
}
// OfferRequestTimeout implements the inproxy.BrokerDialCoordinator
// interface. It returns the InproxyClientOfferRequestTimeout tactics value
// captured when this instance was created.
func (b *InproxyBrokerClientInstance) OfferRequestTimeout() time.Duration {
return b.offerRequestTimeout
}
// OfferRetryDelay implements the inproxy.BrokerDialCoordinator interface. It
// returns the InproxyClientOfferRetryDelay tactics value captured when this
// instance was created.
func (b *InproxyBrokerClientInstance) OfferRetryDelay() time.Duration {
return b.offerRetryDelay
}
// OfferRetryJitter implements the inproxy.BrokerDialCoordinator interface.
// It returns the InproxyClientOfferRetryJitter tactics value captured when
// this instance was created.
func (b *InproxyBrokerClientInstance) OfferRetryJitter() float64 {
return b.offerRetryJitter
}
// RelayedPacketRequestTimeout implements the inproxy.BrokerDialCoordinator
// interface. It returns the InproxyClientRelayedPacketRequestTimeout tactics
// value captured when this instance was created.
func (b *InproxyBrokerClientInstance) RelayedPacketRequestTimeout() time.Duration {
return b.relayedPacketRequestTimeout
}
// InproxyBrokerDialParameters represents a selected broker transport and dial
// parameters.
//
// InproxyBrokerDialParameters is used to configure dialers; as a persistent
// record to store successful dial parameters for replay; and to report dial
// stats in notices and Psiphon API calls.
//
// InproxyBrokerDialParameters is similar to tunnel DialParameters, but is
// specific to the in-proxy broker dial phase.
//
// Unexported fields, and fields tagged `json:"-"`, are excluded from the
// JSON encoding and so must be re-established after loading a replay record.
type InproxyBrokerDialParameters struct {
// brokerSpec is the broker spec these parameters were selected for; it is
// re-attached by NewInproxyBrokerClientInstance when replaying.
brokerSpec *parameters.InproxyBrokerSpec `json:"-"`
// isReplay is set by prepareDialConfigs.
isReplay bool `json:"-"`
// LastUsedTimestamp and LastUsedBrokerSpecHash are checked against the
// replay TTL and the current broker spec hash to decide whether a stored
// record may be replayed.
LastUsedTimestamp time.Time
LastUsedBrokerSpecHash []byte
NetworkLatencyMultiplier float64
BrokerTransport string
// DialAddress is FrontingDialAddress with port 443 added when
// FrontingDialAddress has no port.
DialAddress string
FrontingProviderID string
FrontingDialAddress string
// SNIServerName is "" when SNI is to be disabled/omitted.
SNIServerName string
// TransformedHostName is true when SNIServerName was replaced with a
// selected host name.
TransformedHostName bool
VerifyServerName string
VerifyPins []string
HostHeader string
ResolvedIPAddress atomic.Value `json:"-"`
TLSProfile string
TLSVersion string
RandomizedTLSProfileSeed *prng.Seed
NoDefaultTLSSessionID bool
TLSFragmentClientHello bool
// SelectedUserAgent indicates that UserAgent was selected by
// selectUserAgentIfUnset and is to be set as the User-Agent header.
SelectedUserAgent bool
UserAgent string
BPFProgramName string
BPFProgramInstructions []bpf.RawInstruction
FragmentorSeed *prng.Seed
// ResolveParameters is set only when FrontingDialAddress does not parse as
// an IP address.
ResolveParameters *resolver.ResolveParameters
// dialConfig and meekConfig are presumably populated by
// prepareDialConfigs -- TODO confirm.
dialConfig *DialConfig `json:"-"`
meekConfig *MeekConfig `json:"-"`
}
// MakeInproxyBrokerDialParameters creates a new InproxyBrokerDialParameters
// for the given broker spec, selecting fresh values for the fronting, SNI,
// TLS, User-Agent, BPF, fragmentor, and resolver dial components, and then
// preparing the dial configs via prepareDialConfigs.
//
// p supplies the tactics parameters used for selection. networkID is passed
// through to prepareDialConfigs.
//
// An error is returned when an upstream proxy is configured; when fronting
// parameter selection fails; when the selected transport is fronted HTTP,
// which is unsupported; when there is no equivalent tunnel protocol for the
// broker transport; when TLS profile selection fails; when a fragmentor seed
// cannot be generated; when a domain must be resolved and no resolver is
// available or resolve parameters cannot be made; or when prepareDialConfigs
// fails.
func MakeInproxyBrokerDialParameters(
	config *Config,
	p parameters.ParametersAccessor,
	networkID string,
	brokerSpec *parameters.InproxyBrokerSpec) (*InproxyBrokerDialParameters, error) {

	// This function duplicates some code from MakeDialParameters and
	// makeFrontedHTTPClient. To simplify the logic, the Replay
	// tactic flags for individual dial components are ignored.
	//
	// TODO: merge common functionality?

	if config.UseUpstreamProxy() {
		return nil, errors.TraceNew("upstream proxy unsupported")
	}

	currentTimestamp := time.Now()

	// Select new broker dial parameters

	brokerDialParams := &InproxyBrokerDialParameters{
		brokerSpec:             brokerSpec,
		LastUsedTimestamp:      currentTimestamp,
		LastUsedBrokerSpecHash: hashBrokerSpec(brokerSpec),
	}

	// Network latency multiplier

	brokerDialParams.NetworkLatencyMultiplier = prng.ExpFloat64Range(
		p.Float(parameters.NetworkLatencyMultiplierMin),
		p.Float(parameters.NetworkLatencyMultiplierMax),
		p.Float(parameters.NetworkLatencyMultiplierLambda))

	// Select fronting configuration

	var err error

	brokerDialParams.FrontingProviderID,
		brokerDialParams.BrokerTransport,
		brokerDialParams.FrontingDialAddress,
		brokerDialParams.SNIServerName,
		brokerDialParams.VerifyServerName,
		brokerDialParams.VerifyPins,
		brokerDialParams.HostHeader,
		err = brokerDialParams.brokerSpec.BrokerFrontingSpecs.SelectParameters()
	if err != nil {
		return nil, errors.Trace(err)
	}

	// At this time, for the broker client, the transport is limited to
	// fronted HTTPS.
	//
	// As a future enhancement, allow HTTP for the in-proxy broker case, skip
	// selecting TLS tactics and select HTTP tactics such as
	// HTTPTransformerParameters.

	if brokerDialParams.BrokerTransport == protocol.FRONTING_TRANSPORT_HTTP {
		return nil, errors.TraceNew("unsupported fronting transport")
	}

	// Determine and use the equivalent tunnel protocol for tactics
	// selections. For example, for the broker transport FRONTED-HTTPS, use
	// the tactics for FRONTED-MEEK-OSSH.

	equivilentTunnelProtocol, err := protocol.EquivilentTunnelProtocol(brokerDialParams.BrokerTransport)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// FrontSpec.Addresses may include a port; default to 443 if none.

	if _, _, err := net.SplitHostPort(brokerDialParams.FrontingDialAddress); err == nil {
		brokerDialParams.DialAddress = brokerDialParams.FrontingDialAddress
	} else {
		brokerDialParams.DialAddress = net.JoinHostPort(brokerDialParams.FrontingDialAddress, "443")
	}

	// SNI configuration
	//
	// For a FrontingSpec, an SNI value of "" indicates to disable/omit SNI, so
	// never transform in that case.

	if brokerDialParams.SNIServerName != "" {
		if p.WeightedCoinFlip(parameters.TransformHostNameProbability) {
			brokerDialParams.SNIServerName = selectHostName(equivilentTunnelProtocol, p)
			brokerDialParams.TransformedHostName = true
		}
	}

	// TLS configuration
	//
	// The requireTLS13 flag is set to true in order to use only modern TLS
	// fingerprints which should support HTTP/2 in the ALPN.
	//
	// TODO: TLS padding, NoDefaultTLSSessionID

	brokerDialParams.TLSProfile,
		brokerDialParams.TLSVersion,
		brokerDialParams.RandomizedTLSProfileSeed,
		err = SelectTLSProfile(false, true, true, brokerDialParams.FrontingProviderID, p)
	if err != nil {
		// Without this check the error would be silently overwritten by a
		// later assignment to err, and the dial parameters would be used
		// with whatever TLS profile values were returned alongside it.
		return nil, errors.Trace(err)
	}

	brokerDialParams.NoDefaultTLSSessionID = p.WeightedCoinFlip(
		parameters.NoDefaultTLSSessionIDProbability)

	// ClientHello fragmentation is only considered when there is an SNI
	// value and that value is a domain name rather than an IP address.
	if brokerDialParams.SNIServerName != "" && net.ParseIP(brokerDialParams.SNIServerName) == nil {
		tlsFragmentorLimitProtocols := p.TunnelProtocols(parameters.TLSFragmentClientHelloLimitProtocols)
		if len(tlsFragmentorLimitProtocols) == 0 || common.Contains(tlsFragmentorLimitProtocols, equivilentTunnelProtocol) {
			brokerDialParams.TLSFragmentClientHello = p.WeightedCoinFlip(parameters.TLSFragmentClientHelloProbability)
		}
	}

	// User Agent configuration

	dialCustomHeaders := makeDialCustomHeaders(config, p)
	brokerDialParams.SelectedUserAgent, brokerDialParams.UserAgent = selectUserAgentIfUnset(p, dialCustomHeaders)

	// BPF configuration

	if ClientBPFEnabled() &&
		protocol.TunnelProtocolMayUseClientBPF(equivilentTunnelProtocol) {

		if p.WeightedCoinFlip(parameters.BPFClientTCPProbability) {
			brokerDialParams.BPFProgramName = ""
			brokerDialParams.BPFProgramInstructions = nil
			ok, name, rawInstructions := p.BPFProgram(parameters.BPFClientTCPProgram)
			if ok {
				brokerDialParams.BPFProgramName = name
				brokerDialParams.BPFProgramInstructions = rawInstructions
			}
		}
	}

	// Fragmentor configuration

	brokerDialParams.FragmentorSeed, err = prng.NewSeed()
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Resolver configuration
	//
	// The custom resolver is wired up only when there is a domain to be
	// resolved; GetMetrics will log resolver metrics when the resolver is set.

	if net.ParseIP(brokerDialParams.FrontingDialAddress) == nil {

		resolver := config.GetResolver()
		if resolver == nil {
			return nil, errors.TraceNew("missing resolver")
		}

		brokerDialParams.ResolveParameters, err = resolver.MakeResolveParameters(
			p, brokerDialParams.FrontingProviderID, brokerDialParams.FrontingDialAddress)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	// Initialize Dial/MeekConfigs to be passed to the corresponding dialers.

	err = brokerDialParams.prepareDialConfigs(config, p, networkID, false, dialCustomHeaders)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return brokerDialParams, nil
}
// prepareDialConfigs is called for both new and replayed broker dial
// parameters. It records isReplay, resets ResolvedIPAddress, and initializes
// the dialConfig and meekConfig fields that are passed to the corresponding
// dialers.
//
// When dialCustomHeaders is nil, the custom headers are generated from config
// and p. An error is returned when the broker transport has no equivalent
// tunnel protocol or when no resolver is configured.
func (brokerDialParams *InproxyBrokerDialParameters) prepareDialConfigs(
	config *Config,
	p parameters.ParametersAccessor,
	networkID string,
	isReplay bool,
	dialCustomHeaders http.Header) error {

	brokerDialParams.isReplay = isReplay

	equivilentTunnelProtocol, err := protocol.EquivilentTunnelProtocol(brokerDialParams.BrokerTransport)
	if err != nil {
		return errors.Trace(err)
	}

	// Custom headers and User Agent

	if dialCustomHeaders == nil {
		dialCustomHeaders = makeDialCustomHeaders(config, p)
	}
	if brokerDialParams.SelectedUserAgent {

		// Limitation: if config.CustomHeaders adds a User-Agent between
		// replays, it may be ignored due to replaying a selected User-Agent.
		dialCustomHeaders.Set("User-Agent", brokerDialParams.UserAgent)
	}

	// Fragmentor

	fragmentorConfig := fragmentor.NewUpstreamConfig(
		p, equivilentTunnelProtocol, brokerDialParams.FragmentorSeed)

	// Resolver
	//
	// DialConfig.ResolveIP is required and called even when the destination
	// is an IP address.

	resolver := config.GetResolver()
	if resolver == nil {
		return errors.TraceNew("missing resolver")
	}

	resolveIP := func(ctx context.Context, hostname string) ([]net.IP, error) {
		IPs, err := resolver.ResolveIP(
			ctx, networkID, brokerDialParams.ResolveParameters, hostname)
		return IPs, errors.Trace(err)
	}

	// DialConfig

	// Clear any resolved IP address recorded by a previous dial; the
	// ResolvedIPCallback below records the address for the next dial.
	brokerDialParams.ResolvedIPAddress.Store("")

	brokerDialParams.dialConfig = &DialConfig{
		DiagnosticID:                  brokerDialParams.brokerSpec.BrokerPublicKey,
		CustomHeaders:                 dialCustomHeaders,
		BPFProgramInstructions:        brokerDialParams.BPFProgramInstructions,
		DeviceBinder:                  config.deviceBinder,
		IPv6Synthesizer:               config.IPv6Synthesizer,
		ResolveIP:                     resolveIP,
		TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
		FragmentorConfig:              fragmentorConfig,
		ResolvedIPCallback: func(IPAddress string) {
			brokerDialParams.ResolvedIPAddress.Store(IPAddress)
		},
	}

	// MeekDialConfig
	//
	// The broker round trips use MeekModeWrappedPlaintextRoundTrip without
	// meek cookies, so meek obfuscation is not configured. The in-proxy
	// broker session payloads have their own obfuscation layer.

	addPsiphonFrontingHeader := false
	if brokerDialParams.FrontingProviderID != "" {
		addPsiphonFrontingHeader = common.Contains(
			p.LabeledTunnelProtocols(
				parameters.AddFrontingProviderPsiphonFrontingHeader,
				brokerDialParams.FrontingProviderID),
			equivilentTunnelProtocol)
	}

	brokerDialParams.meekConfig = &MeekConfig{
		Mode:         MeekModeWrappedPlaintextRoundTrip,
		DiagnosticID: brokerDialParams.FrontingProviderID,
		Parameters:   config.GetParameters(),
		DialAddress:  brokerDialParams.DialAddress,
		TLSProfile:   brokerDialParams.TLSProfile,

		// The selected TLSFragmentClientHello value is reported by
		// GetMetrics as inproxy_broker_tls_fragmented, so it must also be
		// applied to the dial.
		TLSFragmentClientHello: brokerDialParams.TLSFragmentClientHello,

		NoDefaultTLSSessionID:    brokerDialParams.NoDefaultTLSSessionID,
		RandomizedTLSProfileSeed: brokerDialParams.RandomizedTLSProfileSeed,
		SNIServerName:            brokerDialParams.SNIServerName,
		AddPsiphonFrontingHeader: addPsiphonFrontingHeader,
		VerifyServerName:         brokerDialParams.VerifyServerName,
		VerifyPins:               brokerDialParams.VerifyPins,
		HostHeader:               brokerDialParams.HostHeader,
		TransformedHostName:      brokerDialParams.TransformedHostName,
		NetworkLatencyMultiplier: brokerDialParams.NetworkLatencyMultiplier,
		AdditionalHeaders:        config.MeekAdditionalHeaders,
	}

	switch brokerDialParams.BrokerTransport {
	case protocol.FRONTING_TRANSPORT_HTTPS:
		brokerDialParams.meekConfig.UseHTTPS = true
	case protocol.FRONTING_TRANSPORT_QUIC:
		brokerDialParams.meekConfig.UseQUIC = true
	}

	return nil
}
// GetMetrics implements the common.MetricsSource interface and returns log
// fields detailing the broker dial parameters.
func (brokerDialParams *InproxyBrokerDialParameters) GetMetrics() common.LogFields {

	// Boolean parameters are logged as "0"/"1" strings.
	boolField := func(value bool) string {
		if value {
			return "1"
		}
		return "0"
	}

	// Note: as the broker client transport is currently limited to domain
	// fronted HTTPS, the fronting-related parameters are included
	// unconditionally.
	logFields := common.LogFields{
		"inproxy_broker_transport":             brokerDialParams.BrokerTransport,
		"inproxy_broker_is_replay":             boolField(brokerDialParams.isReplay),
		"inproxy_broker_fronting_provider_id":  brokerDialParams.FrontingProviderID,
		"inproxy_broker_dial_address":          brokerDialParams.FrontingDialAddress,
		"inproxy_broker_host_header":           brokerDialParams.HostHeader,
		"inproxy_broker_transformed_host_name": boolField(brokerDialParams.TransformedHostName),
	}

	// The remaining fields are logged only when set.

	if resolvedIPAddress := brokerDialParams.ResolvedIPAddress.Load().(string); resolvedIPAddress != "" {
		logFields["inproxy_broker_resolved_ip_address"] = resolvedIPAddress
	}

	if sniServerName := brokerDialParams.SNIServerName; sniServerName != "" {
		logFields["inproxy_broker_sni_server_name"] = sniServerName
	}

	if userAgent := brokerDialParams.UserAgent; userAgent != "" {
		logFields["inproxy_broker_user_agent"] = userAgent
	}

	// TLS fields apply only to the HTTPS fronting transport.
	if brokerDialParams.BrokerTransport == protocol.FRONTING_TRANSPORT_HTTPS {
		if tlsProfile := brokerDialParams.TLSProfile; tlsProfile != "" {
			logFields["inproxy_broker_tls_profile"] = tlsProfile
		}
		logFields["inproxy_broker_tls_version"] = brokerDialParams.TLSVersion
		logFields["inproxy_broker_tls_fragmented"] = boolField(brokerDialParams.TLSFragmentClientHello)
	}

	if bpfProgramName := brokerDialParams.BPFProgramName; bpfProgramName != "" {
		logFields["inproxy_broker_client_bpf"] = bpfProgramName
	}

	// TODO: get fragmentor metrics, if any, from MeekConn.

	resolveParams := brokerDialParams.ResolveParameters
	if resolveParams == nil {
		// No resolver parameters were set, so there are no DNS fields to log.
		return logFields
	}

	// See comment for dialParams.ResolveParameters handling in
	// getBaseAPIParameters.

	if resolveParams.PreresolvedIPAddress != "" {
		dialDomain, _, _ := net.SplitHostPort(brokerDialParams.DialAddress)
		if resolveParams.PreresolvedDomain == dialDomain {
			logFields["inproxy_broker_dns_preresolved"] = resolveParams.PreresolvedIPAddress
		}
	}

	if resolveParams.PreferAlternateDNSServer {
		logFields["inproxy_broker_dns_preferred"] = resolveParams.AlternateDNSServer
	}

	if resolveParams.ProtocolTransformName != "" {
		logFields["inproxy_broker_dns_transform"] = resolveParams.ProtocolTransformName
	}

	firstAttemptWithAnswer := resolveParams.GetFirstAttemptWithAnswer()
	logFields["inproxy_broker_dns_attempt"] = strconv.Itoa(firstAttemptWithAnswer)

	return logFields
}
// hashBrokerSpec returns an 8 byte, big-endian xxhash digest of the "%+v"
// formatting of the broker spec. The digest is used to detect when broker
// spec tactics have changed.
func hashBrokerSpec(spec *parameters.InproxyBrokerSpec) []byte {
	formattedSpec := fmt.Sprintf("%+v", spec)
	digest := make([]byte, 8)
	binary.BigEndian.PutUint64(digest, xxhash.Sum64String(formattedSpec))
	return digest
}
// InproxyBrokerRoundTripper is a broker request round trip transport
// implemented using MeekConn in MeekModeWrappedPlaintextRoundTrip mode,
// utilizing MeekConn's domain fronting capabilities and using persistent and
// multiplexed connections, via HTTP/2, to support multiple concurrent
// in-flight round trips.
//
// InproxyBrokerRoundTripper implements the inproxy.RoundTripper interface.
type InproxyBrokerRoundTripper struct {
// brokerDialParams supplies the meekConfig and dialConfig passed to
// DialMeek, and the DialAddress used in the request URL.
brokerDialParams *InproxyBrokerDialParameters
// runCtx is canceled, via stopRunning, by Close in order to interrupt any
// in-progress DialMeek or RoundTrip.
runCtx context.Context
stopRunning context.CancelFunc
// dial is swapped from 0 to 1, with an atomic compare-and-swap, by
// whichever of the first RoundTrip call or Close gets there first.
dial int32
// dialCompleted is closed after DialMeek returns, or by Close when no
// DialMeek was started. dialErr and conn are written before dialCompleted
// is closed and are read only after it is closed.
dialCompleted chan struct{}
dialErr error
conn *MeekConn
// failureThreshold, when positive, is the round trip duration at or below
// which a non-200 response status code is classified as a
// RoundTripperFailedError.
failureThreshold time.Duration
}
// NewInproxyBrokerRoundTripper creates a new InproxyBrokerRoundTripper. The
// initial DialMeek is deferred until the first call to RoundTrip, so
// NewInproxyBrokerRoundTripper does not perform any network operations.
//
// The input brokerDialParams dial parameter and config fields must not be
// modified after NewInproxyBrokerRoundTripper is called.
func NewInproxyBrokerRoundTripper(
	p parameters.ParametersAccessor,
	brokerDialParams *InproxyBrokerDialParameters) *InproxyBrokerRoundTripper {

	rt := &InproxyBrokerRoundTripper{
		brokerDialParams: brokerDialParams,
		dialCompleted:    make(chan struct{}),
	}

	// Snapshot the status code failure threshold from the current parameters.
	rt.failureThreshold = p.Duration(
		parameters.InproxyBrokerRoundTripStatusCodeFailureThreshold)

	// The run context is canceled only by Close.
	rt.runCtx, rt.stopRunning = context.WithCancel(context.Background())

	return rt
}
// Close interrupts any in-flight request and closes the underlying
// MeekConn, if one was established. As with MeekConn.Close, any Close errors
// from underlying conns are not propagated; Close always returns nil.
func (rt *InproxyBrokerRoundTripper) Close() error {

	// Interrupt any DialMeek or RoundTrip.
	rt.stopRunning()

	if !atomic.CompareAndSwapInt32(&rt.dial, 0, 1) {

		// A RoundTrip caller has already kicked off DialMeek. Await any
		// ongoing DialMeek or RoundTrip (stopRunning should interrupt either
		// one quickly), and then close any established MeekConn.
		<-rt.dialCompleted
		if conn := rt.conn; conn != nil {
			_ = conn.Close()
		}
		return nil
	}

	// RoundTrip has not yet been called or has not yet kicked off DialMeek,
	// so there is no MeekConn to close. Prevent any future DialMeek by
	// signaling dialCompleted and fail any future round trip attempt by
	// setting dialErr.
	rt.dialErr = errors.TraceNew("closed")
	close(rt.dialCompleted)

	return nil
}
// RoundTrip transports a request to the broker endpoint and returns a
// response.
//
// ctx cancels the round trip; it is merged with the round tripper's run
// context, so Close also cancels it. roundTripDelay, when positive, is slept
// before any network operation. roundTripTimeout, when positive, starts after
// the delay and bounds both the DialMeek performed by this call, if any, and
// the HTTP round trip. requestPayload is sent as the POST body.
//
// On success, the response body is returned. Errors wrapped as
// inproxy.RoundTripperFailedError are logged with NoticeWarning before
// returning.
func (rt *InproxyBrokerRoundTripper) RoundTrip(
ctx context.Context,
roundTripDelay time.Duration,
roundTripTimeout time.Duration,
requestPayload []byte) (_ []byte, retErr error) {
defer func() {
// Log any error which results in invoking BrokerClientRoundTripperFailed.
var failedError *inproxy.RoundTripperFailedError
if std_errors.As(retErr, &failedError) {
NoticeWarning("RoundTripperFailedError: %v", retErr)
}
}()
// Cancel DialMeek or MeekConn.RoundTrip when:
// - Close is called
// - the input context is done
ctx, cancelFunc := common.MergeContextCancel(ctx, rt.runCtx)
defer cancelFunc()
// Apply any round trip delay. Currently, this is used to apply an
// announce request delay post-waitToShareSession, pre-network round
// trip, and cancelable by the above merged context.
if roundTripDelay > 0 {
common.SleepWithContext(ctx, roundTripDelay)
}
// Apply the round trip timeout after any delay is complete.
//
// This timeout includes any TLS handshake network round trips, as
// performed by the initial DialMeek and may be performed subsequently by
// net/http via MeekConn.RoundTrip. These extra round trips should be
// accounted for in the difference between client-side request
// timeouts, such as InproxyProxyAnswerRequestTimeout, and broker-side
// handler timeouts, such as InproxyBrokerProxyAnnounceTimeout, with the
// former allowing more time for network round trips.
requestCtx := ctx
if roundTripTimeout > 0 {
var requestCancelFunc context.CancelFunc
requestCtx, requestCancelFunc = context.WithTimeout(ctx, roundTripTimeout)
defer requestCancelFunc()
}
// The first RoundTrip caller will perform the DialMeek step, which
// establishes the TLS transport connection to the fronted endpoint.
// Following callers will await that DialMeek or share an established
// connection.
//
// To accommodate using custom utls fingerprints, with varying ALPNs, with
// net/http, DialMeek completes a full TLS handshake before instantiating
// the appropriate http.Transport or http2.Transport. Until that first
// DialMeek completes, and unlike standard net/http round trips,
// InproxyBrokerRoundTripper won't spawn distinct TLS persistent
// connections for concurrent round trips. After DialMeek, concurrent
// round trips over HTTP/2 connections may simply share the one TLS
// connection, while concurrent round trips over HTTP connections may
// spawn additional TLS persistent connections.
//
// There is no retry here if DialMeek fails, as higher levels will invoke
// BrokerClientRoundTripperFailed on failure, clear any replay, select
// new dial parameters, and retry.
if atomic.CompareAndSwapInt32(&rt.dial, 0, 1) {
// DialMeek hasn't been called yet.
conn, err := DialMeek(
requestCtx,
rt.brokerDialParams.meekConfig,
rt.brokerDialParams.dialConfig)
if err != nil && ctx.Err() != context.Canceled {
// DialMeek performs an initial TLS handshake. DialMeek errors,
// excluding a cancelled context as happens on shutdown, are
// classified as RoundTripperFailedErrors, which will invoke
// BrokerClientRoundTripperFailed, resetting the round tripper
// and clearing replay parameters.
err = inproxy.NewRoundTripperFailedError(err)
}
// Publish the dial result before closing dialCompleted; concurrent
// RoundTrip callers and Close read conn and dialErr only after
// dialCompleted is closed.
rt.conn = conn
rt.dialErr = err
close(rt.dialCompleted)
if err != nil {
return nil, errors.Trace(rt.dialErr)
}
} else {
// Await any ongoing DialMeek run by a concurrent RoundTrip caller.
select {
case <-rt.dialCompleted:
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
}
if rt.dialErr != nil {
// There is no NewRoundTripperFailedError wrapping here, as the
// DialMeek caller will wrap its error and
// BrokerClientRoundTripperFailed will be invoked already.
return nil, errors.Trace(rt.dialErr)
}
}
// At this point, rt.conn is an established MeekConn.
// Note that the network address portion of the URL will be ignored by
// MeekConn in favor of the MeekDialConfig, while the path will be used.
url := fmt.Sprintf(
"https://%s/%s",
rt.brokerDialParams.DialAddress,
inproxy.BrokerEndPointName)
request, err := http.NewRequestWithContext(
requestCtx, "POST", url, bytes.NewBuffer(requestPayload))
if err != nil {
return nil, errors.Trace(err)
}
// Time the round trip; the duration determines how a non-200 response is
// classified below.
startTime := time.Now()
response, err := rt.conn.RoundTrip(request)
roundTripDuration := time.Since(startTime)
if err == nil {
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
err = fmt.Errorf(
"unexpected response status code %d after %v",
response.StatusCode,
roundTripDuration)
// Depending on the round trip duration, this case is treated as a
// temporary round tripper failure, since we received a response
// from the CDN, secured with TLS and VerifyPins, or from the broker
// itself. One common scenario is the CDN returning a temporary
// timeout error, as can happen when CDN timeouts and broker
// timeouts are misaligned, especially for long-polling requests.
//
// In this scenario, we can reuse the existing round tripper and
// it may be counterproductive to return a RoundTripperFailedError
// which will trigger a clearing of any broker dial replay
// parameters as well as resetting the round tripper.
//
// When the round trip duration is sufficiently short, much
// shorter than expected round trip timeouts, this is still
// classified as a RoundTripperFailedError error, as it is more
// likely due to a more serious issue between the CDN and broker.
if rt.failureThreshold > 0 &&
roundTripDuration <= rt.failureThreshold {
err = inproxy.NewRoundTripperFailedError(err)
}
}
} else if ctx.Err() != context.Canceled {
// Other round trip errors, including TLS failures and client-side
// timeouts, but excluding a cancelled context as happens on
// shutdown, are classified as RoundTripperFailedErrors.
err = inproxy.NewRoundTripperFailedError(err)
}
if err != nil {
return nil, errors.Trace(err)
}
// A failure to read the full response body is also classified as a
// RoundTripperFailedError.
responsePayload, err := io.ReadAll(response.Body)
if err != nil {
err = inproxy.NewRoundTripperFailedError(err)
return nil, errors.Trace(err)
}
return responsePayload, nil
}
// InproxyWebRTCDialInstance is the network state and dial parameters for a
// single WebRTC client or proxy connection.
//
// InproxyWebRTCDialInstance implements the inproxy.WebRTCDialCoordinator
// interface, which provides the WebRTC dial configuration and support to the
// in-proxy package.
type InproxyWebRTCDialInstance struct {
// config provides the resolver, device binder, and IPv6 synthesizer used
// by the dial support methods.
config *Config
// networkID keys the NAT state manager lookups and is passed to the
// resolver.
networkID string
natStateManager *InproxyNATStateManager
// stunDialParameters and webRTCDialParameters are either supplied by the
// caller or, for proxies passing nil, generated by
// NewInproxyWebRTCDialInstance.
stunDialParameters *InproxySTUNDialParameters
webRTCDialParameters *InproxyWebRTCDialParameters
// The following fields are snapshots of tactics parameters taken in
// NewInproxyWebRTCDialInstance; they are not replayed.
discoverNAT bool
disableSTUN bool
disablePortMapping bool
disableInboundForMobileNetworks bool
disableIPv6ICECandidates bool
discoverNATTimeout time.Duration
webRTCAnswerTimeout time.Duration
awaitDataChannelTimeout time.Duration
proxyDestinationDialTimeout time.Duration
}
// NewInproxyWebRTCDialInstance creates a new InproxyWebRTCDialInstance.
//
// The caller provides STUN and WebRTC dial parameters that are either newly
// generated or replayed. Proxies may optionally pass in nil for either
// stunDialParameters or webRTCDialParameters, and new parameters will be
// generated.
func NewInproxyWebRTCDialInstance(
	config *Config,
	networkID string,
	isProxy bool,
	natStateManager *InproxyNATStateManager,
	stunDialParameters *InproxySTUNDialParameters,
	webRTCDialParameters *InproxyWebRTCDialParameters) (*InproxyWebRTCDialInstance, error) {

	p := config.GetParameters().Get()
	defer p.Close()

	if isProxy {

		// Auto-generate any omitted STUN or WebRTC dial parameters. There's
		// no replay in this case.
		var err error

		if stunDialParameters == nil {
			stunDialParameters, err = MakeInproxySTUNDialParameters(config, p, isProxy)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}

		if webRTCDialParameters == nil {
			webRTCDialParameters, err = MakeInproxyWebRTCDialParameters(p)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
	}

	// Parameters such as disabling certain operations and operation timeouts
	// are not replayed, but snapshots are stored in the
	// InproxyWebRTCDialInstance for efficient lookup.
	//
	// Start with the parameters common to clients and proxies; the
	// role-specific parameters are applied below.
	instance := &InproxyWebRTCDialInstance{
		config:               config,
		networkID:            networkID,
		natStateManager:      natStateManager,
		stunDialParameters:   stunDialParameters,
		webRTCDialParameters: webRTCDialParameters,

		// discoverNAT is ignored by proxies, which always attempt discovery.
		// webRTCAnswerTimeout and proxyDestinationDialTimeout are used only
		// by proxies.

		discoverNAT:                     p.WeightedCoinFlip(parameters.InproxyClientDiscoverNATProbability),
		disableSTUN:                     p.Bool(parameters.InproxyDisableSTUN),
		disablePortMapping:              p.Bool(parameters.InproxyDisablePortMapping),
		disableInboundForMobileNetworks: p.Bool(parameters.InproxyDisableInboundForMobileNetworks),
		disableIPv6ICECandidates:        p.Bool(parameters.InproxyDisableIPv6ICECandidates),
		webRTCAnswerTimeout:             p.Duration(parameters.InproxyWebRTCAnswerTimeout),
		proxyDestinationDialTimeout:     p.Duration(parameters.InproxyProxyDestinationDialTimeout),
	}

	// Each operation is disabled when either the common parameter or the
	// role-specific parameter is set.
	if isProxy {
		instance.disableSTUN = instance.disableSTUN ||
			p.Bool(parameters.InproxyProxyDisableSTUN)
		instance.disablePortMapping = instance.disablePortMapping ||
			p.Bool(parameters.InproxyProxyDisablePortMapping)
		instance.disableInboundForMobileNetworks = instance.disableInboundForMobileNetworks ||
			p.Bool(parameters.InproxyProxyDisableInboundForMobileNetworks)
		instance.disableIPv6ICECandidates = instance.disableIPv6ICECandidates ||
			p.Bool(parameters.InproxyProxyDisableIPv6ICECandidates)
		instance.discoverNATTimeout = p.Duration(
			parameters.InproxyProxyDiscoverNATTimeout)
		instance.awaitDataChannelTimeout = p.Duration(
			parameters.InproxyProxyWebRTCAwaitDataChannelTimeout)
	} else {
		instance.disableSTUN = instance.disableSTUN ||
			p.Bool(parameters.InproxyClientDisableSTUN)
		instance.disablePortMapping = instance.disablePortMapping ||
			p.Bool(parameters.InproxyClientDisablePortMapping)
		instance.disableInboundForMobileNetworks = instance.disableInboundForMobileNetworks ||
			p.Bool(parameters.InproxyClientDisableInboundForMobileNetworks)
		instance.disableIPv6ICECandidates = instance.disableIPv6ICECandidates ||
			p.Bool(parameters.InproxyClientDisableIPv6ICECandidates)
		instance.discoverNATTimeout = p.Duration(
			parameters.InproxyClientDiscoverNATTimeout)
		instance.awaitDataChannelTimeout = p.Duration(
			parameters.InproxyClientWebRTCAwaitDataChannelTimeout)
	}

	return instance, nil
}
// NetworkID returns the network ID the instance was created with.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) NetworkID() string {
return w.networkID
}
// NetworkType converts the result of GetNetworkType, for the instance's
// network ID, to an inproxy.NetworkType using getInproxyNetworkType.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) NetworkType() inproxy.NetworkType {
return getInproxyNetworkType(GetNetworkType(w.networkID))
}
// ClientRootObfuscationSecret returns the RootObfuscationSecret from the
// WebRTC dial parameters.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) ClientRootObfuscationSecret() inproxy.ObfuscationSecret {
return w.webRTCDialParameters.RootObfuscationSecret
}
// DoDTLSRandomization returns the DoDTLSRandomization flag from the WebRTC
// dial parameters.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DoDTLSRandomization() bool {
return w.webRTCDialParameters.DoDTLSRandomization
}
// DataChannelTrafficShapingParameters returns the
// DataChannelTrafficShapingParameters from the WebRTC dial parameters.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DataChannelTrafficShapingParameters() *inproxy.DataChannelTrafficShapingParameters {
return w.webRTCDialParameters.DataChannelTrafficShapingParameters
}
// STUNServerAddress returns the selected STUN server address from the STUN
// dial parameters: the RFC5780 server address when RFC5780 is true, and the
// regular STUN server address otherwise.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) STUNServerAddress(RFC5780 bool) string {
	stunParams := w.stunDialParameters
	if !RFC5780 {
		return stunParams.STUNServerAddress
	}
	return stunParams.STUNServerAddressRFC5780
}
// STUNServerAddressResolved returns the resolved address most recently
// recorded by ResolveAddress for the selected STUN server: the RFC5780
// server when RFC5780 is true, and the regular STUN server otherwise. The
// empty string is returned when no resolved address has been recorded.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) STUNServerAddressResolved(RFC5780 bool) string {

	// Return the values stored by ResolveAddress, not the configured,
	// unresolved STUN server addresses that STUNServerAddress returns.
	resolvedValue := &w.stunDialParameters.STUNServerResolvedIPAddress
	if RFC5780 {
		resolvedValue = &w.stunDialParameters.STUNServerRFC5780ResolvedIPAddress
	}

	// Load returns nil until a value is stored, so use the comma-ok form
	// rather than panicking on a failed type assertion.
	resolvedAddress, _ := resolvedValue.Load().(string)
	return resolvedAddress
}
// STUNServerAddressSucceeded is invoked after a successful STUN operation
// with the given server address. The RFC5780 flag is not used.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) STUNServerAddressSucceeded(RFC5780 bool, address string) {

	// There is no STUN dial parameter replay action taken here.
	//
	// For client tunnel dials, STUN dial parameter replay is currently
	// managed by DialParameters, and DialParameters.InproxySTUNDialParameters
	// are replayed only when the entire dial succeeds. Note that, even if
	// the STUN step fails and there are no STUN ICE candidates, the
	// subsequent WebRTC connection may still proceed and be successful; in
	// this case, the failed STUN dial parameters may be replayed.
	//
	// For proxies, there is no STUN dial parameter replay.
	//
	// As a future enhancement, consider independent and shared replay of
	// working STUN servers, similar to how broker client dial parameters are
	// replayed independent of overall dials and proxy relays, and shared
	// between local client and proxy instances.

	// Verify/extend the resolver cache entry for any resolved domain after a
	// success.
	if r := w.config.GetResolver(); r != nil {
		r.VerifyCacheExtension(address)
	}
}
// STUNServerAddressFailed is a no-op; both inputs are ignored.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) STUNServerAddressFailed(RFC5780 bool, address string) {
// Currently there is no independent replay for STUN dial parameters. See
// comment in STUNServerAddressSucceeded.
}
// DiscoverNAT returns the result of the InproxyClientDiscoverNATProbability
// coin flip made when the instance was created.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DiscoverNAT() bool {
return w.discoverNAT
}
// DisableSTUN returns the disable STUN flag snapshotted when the instance was
// created; it is set when either the common or the role-specific (client or
// proxy) tactics parameter is set.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DisableSTUN() bool {
return w.disableSTUN
}
// DisablePortMapping returns the disable port mapping flag snapshotted when
// the instance was created; it is set when either the common or the
// role-specific (client or proxy) tactics parameter is set.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DisablePortMapping() bool {
return w.disablePortMapping
}
// DisableInboundForMobileNetworks returns the disable inbound for mobile
// networks flag snapshotted when the instance was created; it is set when
// either the common or the role-specific (client or proxy) tactics parameter
// is set.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DisableInboundForMobileNetworks() bool {
return w.disableInboundForMobileNetworks
}
// DisableIPv6ICECandidates returns the disable IPv6 ICE candidates flag
// snapshotted when the instance was created; it is set when either the
// common or the role-specific (client or proxy) tactics parameter is set.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DisableIPv6ICECandidates() bool {
return w.disableIPv6ICECandidates
}
// NATType returns the NAT type that the NAT state manager reports for the
// instance's network ID.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) NATType() inproxy.NATType {
return w.natStateManager.getNATType(w.networkID)
}
// SetNATType passes the NAT type to the NAT state manager, keyed by the
// instance's network ID.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) SetNATType(natType inproxy.NATType) {
w.natStateManager.setNATType(w.networkID, natType)
}
// PortMappingTypes returns the port mapping types that the NAT state manager
// reports for the instance's network ID.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) PortMappingTypes() inproxy.PortMappingTypes {
return w.natStateManager.getPortMappingTypes(w.networkID)
}
// SetPortMappingTypes passes the port mapping types to the NAT state manager,
// keyed by the instance's network ID.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) SetPortMappingTypes(portMappingTypes inproxy.PortMappingTypes) {
w.natStateManager.setPortMappingTypes(w.networkID, portMappingTypes)
}
// ResolveAddress resolves the input address using the Psiphon resolver and
// returns the resolved address. An error is returned when no resolver is
// configured or when the resolve fails.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) ResolveAddress(ctx context.Context, network, address string) (string, error) {

	// Use the Psiphon resolver to resolve addresses.
	psiphonResolver := w.config.GetResolver()
	if psiphonResolver == nil {
		return "", errors.TraceNew("missing resolver")
	}

	// Identify when the address to be resolved is one of the configured STUN
	// servers, and, in those cases, use/replay any STUN dial parameters
	// ResolveParameters; and record the resolved IP address for metrics.
	//
	// In the in-proxy proxy case, ResolveAddress is invoked for the upstream,
	// 2nd hop dial as well as for STUN server addresses.
	//
	// Limitation: there's no ResolveParameters, including no preresolved DNS
	// tactics, for 2nd hop dials.

	stunParams := w.stunDialParameters
	matchesSTUNServer := address == stunParams.STUNServerAddress
	matchesSTUNServerRFC5780 := address == stunParams.STUNServerAddressRFC5780

	var resolveParams *resolver.ResolveParameters
	if matchesSTUNServer || matchesSTUNServerRFC5780 {
		resolveParams = stunParams.ResolveParameters
	}

	resolvedAddress, err := psiphonResolver.ResolveAddress(
		ctx, w.networkID, resolveParams, network, address)
	if err != nil {
		return "", errors.Trace(err)
	}

	// Invoke the resolved IP callbacks only when the input is not the
	// resolved IP address (this differs from the meek
	// DialConfig.ResolvedIPCallback case).
	if resolvedAddress == address {
		return resolvedAddress, nil
	}

	// When the address matches both STUN servers, only the regular STUN
	// server's resolved address is recorded.
	switch {
	case matchesSTUNServer:
		stunParams.STUNServerResolvedIPAddress.Store(resolvedAddress)
	case matchesSTUNServerRFC5780:
		stunParams.STUNServerRFC5780ResolvedIPAddress.Store(resolvedAddress)
	}

	return resolvedAddress, nil
}
// UDPListen returns a new UDP socket created by newInproxyUDPConn using the
// instance's config.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) UDPListen(ctx context.Context) (net.PacketConn, error) {
// Create a new inproxyUDPConn for use as the in-proxy STUN and/or WebRTC
// UDP socket.
conn, err := newInproxyUDPConn(ctx, w.config)
if err != nil {
return nil, errors.Trace(err)
}
return conn, nil
}
// UDPConn returns a new UDP conn, created by NewUDPConn, for the specified
// remote address, which must be an IP address destination.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) UDPConn(
	ctx context.Context, network, remoteAddress string) (net.PacketConn, error) {

	// Create a new UDPConn bound to the specified remote address. This UDP
	// conn is used, by the inproxy package, to determine the local address
	// of the active interface the OS will select for the specified remote
	// destination.

	// Only IP address destinations are supported. This ResolveIP is wired up
	// only because NewUDPConn requires a non-nil resolver.
	parseIPOnly := func(_ context.Context, hostname string) ([]net.IP, error) {
		if IP := net.ParseIP(hostname); IP != nil {
			return []net.IP{IP}, nil
		}
		return nil, errors.TraceNew("not supported")
	}

	udpDialConfig := &DialConfig{
		DeviceBinder:    w.config.deviceBinder,
		IPv6Synthesizer: w.config.IPv6Synthesizer,
		ResolveIP:       parseIPOnly,
	}

	udpConn, _, err := NewUDPConn(ctx, network, true, "", remoteAddress, udpDialConfig)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return udpConn, nil
}
// BindToDevice binds the socket with the given file descriptor using the
// configured device binder. When no device binder is configured,
// BindToDevice is a no-op and returns nil.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) BindToDevice(fileDescriptor int) error {

	// Use config.deviceBinder, with wired up logging, not
	// config.DeviceBinder; other tunnel-core dials do this indirectly via
	// psiphon.DialConfig.
	binder := w.config.deviceBinder
	if binder == nil {
		return nil
	}

	_, err := binder.BindToDevice(fileDescriptor)
	return errors.Trace(err)
}
// ProxyUpstreamDial dials the given network address with a net.Dialer whose
// sockets have additional socket options and, when a device binder is
// configured, BindToDevice applied.
func (w *InproxyWebRTCDialInstance) ProxyUpstreamDial(
	ctx context.Context, network, address string) (net.Conn, error) {

	// This implementation of ProxyUpstreamDial applies additional socket
	// options and BindToDevice as required, but is otherwise a stock dialer.
	//
	// TODO: Use custom UDP and TCP dialers, and wire up TCP/UDP-level
	// tactics, including BPF and the custom resolver, which may be enabled
	// for the proxy's ISP or geolocation. Orchestrating preresolved DNS
	// requires additional information from the broker, the
	// FrontingProviderID, to be applied to any
	// DNSResolverPreresolvedIPAddressCIDRs proxy tactics. In addition,
	// replay the selected upstream dial tactics parameters.

	configureSocket := func(_, _ string, rawConn syscall.RawConn) error {

		// bindErr carries any BindToDevice failure out of the Control
		// callback, which has no return value.
		var bindErr error

		controlErr := rawConn.Control(func(fd uintptr) {
			socketFD := int(fd)

			setAdditionalSocketOptions(socketFD)

			if w.config.deviceBinder == nil {
				return
			}
			if _, err := w.config.deviceBinder.BindToDevice(socketFD); err != nil {
				bindErr = errors.Tracef("BindToDevice failed: %s", err)
			}
		})

		// A BindToDevice failure takes precedence over any Control error.
		if bindErr != nil {
			return errors.Trace(bindErr)
		}
		return errors.Trace(controlErr)
	}

	dialer := net.Dialer{Control: configureSocket}

	upstreamConn, err := dialer.DialContext(ctx, network, address)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return upstreamConn, nil
}
// DiscoverNATTimeout returns the discover NAT timeout snapshotted, from the
// client or proxy tactics parameter, when the instance was created.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) DiscoverNATTimeout() time.Duration {
return w.discoverNATTimeout
}
// WebRTCAnswerTimeout returns the InproxyWebRTCAnswerTimeout tactics
// parameter value snapshotted when the instance was created.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) WebRTCAnswerTimeout() time.Duration {
return w.webRTCAnswerTimeout
}
// WebRTCAwaitDataChannelTimeout returns the await data channel timeout
// snapshotted, from the client or proxy tactics parameter, when the instance
// was created.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) WebRTCAwaitDataChannelTimeout() time.Duration {
return w.awaitDataChannelTimeout
}
// ProxyDestinationDialTimeout returns the InproxyProxyDestinationDialTimeout
// tactics parameter value snapshotted when the instance was created.
//
// Implements the inproxy.WebRTCDialCoordinator interface.
func (w *InproxyWebRTCDialInstance) ProxyDestinationDialTimeout() time.Duration {
return w.proxyDestinationDialTimeout
}
// InproxySTUNDialParameters is a set of STUN dial parameters.
// InproxySTUNDialParameters is compatible with DialParameters JSON
// marshaling. For client in-proxy tunnel dials, DialParameters will manage
// STUN dial parameter selection and replay.
//
// When an instance of InproxySTUNDialParameters is unmarshaled from JSON,
// Prepare must be called to initialize the instance for use.
type InproxySTUNDialParameters struct {
// ResolveParameters is set only when at least one selected STUN server
// address is not a bare IP address (see MakeInproxySTUNDialParameters).
ResolveParameters *resolver.ResolveParameters
// STUNServerAddress and STUNServerAddressRFC5780 are the selected STUN
// server addresses; either may be empty when no candidates are
// configured.
STUNServerAddress string
STUNServerAddressRFC5780 string
// The resolved addresses are excluded from JSON marshaling. Prepare
// initializes each to the empty string, and
// InproxyWebRTCDialInstance.ResolveAddress stores the resolved address
// when it differs from the input address.
STUNServerResolvedIPAddress atomic.Value `json:"-"`
STUNServerRFC5780ResolvedIPAddress atomic.Value `json:"-"`
}
// MakeInproxySTUNDialParameters generates new STUN dial parameters from the
// given tactics parameters.
//
// isProxy selects between the proxy and client STUN server address lists. A
// STUN server address, and an RFC5780 STUN server address, is randomly
// selected from each non-empty list. The returned parameters have been
// initialized with Prepare.
//
// An error is returned when resolver parameters are required but there is no
// resolver, or when the resolver parameters cannot be created.
func MakeInproxySTUNDialParameters(
	config *Config,
	p parameters.ParametersAccessor,
	isProxy bool) (*InproxySTUNDialParameters, error) {

	var stunServerAddresses, stunServerAddressesRFC5780 []string
	if isProxy {
		stunServerAddresses = p.Strings(
			parameters.InproxyProxySTUNServerAddresses, parameters.InproxySTUNServerAddresses)
		stunServerAddressesRFC5780 = p.Strings(
			parameters.InproxyProxySTUNServerAddressesRFC5780, parameters.InproxySTUNServerAddressesRFC5780)
	} else {
		stunServerAddresses = p.Strings(
			parameters.InproxyClientSTUNServerAddresses, parameters.InproxySTUNServerAddresses)
		stunServerAddressesRFC5780 = p.Strings(
			parameters.InproxyClientSTUNServerAddressesRFC5780, parameters.InproxySTUNServerAddressesRFC5780)
	}

	// Empty STUN server address lists are not an error condition. When used
	// for WebRTC, the STUN ICE candidate gathering will be skipped but the
	// WebRTC connection may still be established via other candidate types.

	var stunServerAddress, stunServerAddressRFC5780 string

	if len(stunServerAddresses) > 0 {
		stunServerAddress = stunServerAddresses[prng.Range(0, len(stunServerAddresses)-1)]
	}

	if len(stunServerAddressesRFC5780) > 0 {
		stunServerAddressRFC5780 =
			stunServerAddressesRFC5780[prng.Range(0, len(stunServerAddressesRFC5780)-1)]
	}

	// Create DNS resolver dial parameters to use when resolving STUN server
	// domain addresses. Instantiate only when there is a domain to be
	// resolved; when recording DNS fields, GetMetrics will assume that a nil
	// InproxySTUNDialParameters.ResolveParameters implies no resolve was
	// attempted.

	var resolveParameters *resolver.ResolveParameters

	if (stunServerAddress != "" && net.ParseIP(stunServerAddress) == nil) ||
		(stunServerAddressRFC5780 != "" && net.ParseIP(stunServerAddressRFC5780) == nil) {

		// As in the other resolver users in this file, a missing resolver
		// is reported as an error rather than dereferenced.
		r := config.GetResolver()
		if r == nil {
			return nil, errors.TraceNew("missing resolver")
		}

		// No DNSResolverPreresolvedIPAddressCIDRs will be selected since no
		// fronting provider ID is specified.
		//
		// It would be possible to overload the meaning of the fronting
		// provider ID field by using a string derived from STUN server
		// address as the key.
		//
		// However, preresolved STUN configuration can already be achieved
		// with IP addresses in the STUNServerAddresses tactics parameters.
		// This approach results in slightly different metrics log fields vs.
		// preresolved.

		var err error
		resolveParameters, err = r.MakeResolveParameters(p, "", "")
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	dialParams := &InproxySTUNDialParameters{
		ResolveParameters:        resolveParameters,
		STUNServerAddress:        stunServerAddress,
		STUNServerAddressRFC5780: stunServerAddressRFC5780,
	}

	dialParams.Prepare()

	return dialParams, nil
}
// Prepare readies an InproxySTUNDialParameters for use by storing an empty
// string in both resolved IP address values. Any InproxySTUNDialParameters
// instance unmarshaled from JSON should have Prepare called before use, as
// the resolved IP address fields are excluded from JSON.
func (dialParams *InproxySTUNDialParameters) Prepare() {

	// The empty string represents "no resolved IP address recorded".
	const unresolved = ""

	dialParams.STUNServerResolvedIPAddress.Store(unresolved)
	dialParams.STUNServerRFC5780ResolvedIPAddress.Store(unresolved)
}
// IsValidClientReplay checks that the selected STUN servers remain configured
// STUN server candidates for in-proxy clients.
//
// Each non-empty selected address must still appear in the corresponding
// candidate list. The candidate lists are looked up with the same parameter
// names that MakeInproxySTUNDialParameters uses for clients -- the
// client-specific name followed by the common name -- so that an address
// selected from the common list is not rejected on replay. An empty selected
// address is always considered valid.
func (dialParams *InproxySTUNDialParameters) IsValidClientReplay(
	p parameters.ParametersAccessor) bool {

	if dialParams.STUNServerAddress != "" &&
		!common.Contains(
			p.Strings(
				parameters.InproxyClientSTUNServerAddresses,
				parameters.InproxySTUNServerAddresses),
			dialParams.STUNServerAddress) {

		return false
	}

	if dialParams.STUNServerAddressRFC5780 != "" &&
		!common.Contains(
			p.Strings(
				parameters.InproxyClientSTUNServerAddressesRFC5780,
				parameters.InproxySTUNServerAddressesRFC5780),
			dialParams.STUNServerAddressRFC5780) {

		return false
	}

	return true
}
// GetMetrics implements the common.MetricsSource interface and returns log
// fields detailing the STUN dial parameters.
//
// The selected STUN server addresses are always logged. A resolved IP address
// field is logged only when a non-empty resolved address has been stored. DNS
// fields are logged only when ResolveParameters is non-nil.
//
// GetMetrics does not panic when Prepare has not been called; in that case
// the resolved IP address values are unset and are treated as empty.
func (dialParams *InproxySTUNDialParameters) GetMetrics() common.LogFields {

	// There is no is_replay-type field added here; replay is handled at a
	// higher level, and, for client in-proxy tunnel dials, is part of the
	// main tunnel dial parameters.

	logFields := make(common.LogFields)

	logFields["inproxy_webrtc_stun_server"] = dialParams.STUNServerAddress

	// The checked type assertion yields "" when no value has been stored
	// (atomic.Value.Load returns nil), rather than panicking.
	resolvedIPAddress, _ := dialParams.STUNServerResolvedIPAddress.Load().(string)
	if resolvedIPAddress != "" {
		logFields["inproxy_webrtc_stun_server_resolved_ip_address"] = resolvedIPAddress
	}

	// TODO: log RFC5780 selection only if used?

	logFields["inproxy_webrtc_stun_server_RFC5780"] = dialParams.STUNServerAddressRFC5780

	resolvedIPAddress, _ = dialParams.STUNServerRFC5780ResolvedIPAddress.Load().(string)
	if resolvedIPAddress != "" {
		logFields["inproxy_webrtc_stun_server_RFC5780_resolved_ip_address"] = resolvedIPAddress
	}

	if dialParams.ResolveParameters != nil {

		// See comment in getBaseAPIParameters regarding
		// dialParams.ResolveParameters handling. As noted in
		// MakeInproxySTUNDialParameters, no preresolved parameters are set,
		// so none are checked for logging.
		//
		// Limitation: the potential use of single ResolveParameters to
		// resolve multiple, different STUN server domains can skew the
		// meaning of GetFirstAttemptWithAnswer.

		if dialParams.ResolveParameters.PreferAlternateDNSServer {
			logFields["inproxy_webrtc_dns_preferred"] = dialParams.ResolveParameters.AlternateDNSServer
		}

		if dialParams.ResolveParameters.ProtocolTransformName != "" {
			logFields["inproxy_webrtc_dns_transform"] = dialParams.ResolveParameters.ProtocolTransformName
		}

		logFields["inproxy_webrtc_dns_attempt"] = strconv.Itoa(
			dialParams.ResolveParameters.GetFirstAttemptWithAnswer())
	}

	return logFields
}
// InproxyWebRTCDialParameters is a set of WebRTC obfuscation dial parameters.
// InproxyWebRTCDialParameters is compatible with DialParameters JSON
// marshaling. For client in-proxy tunnel dials, DialParameters will manage
// WebRTC dial parameter selection and replay.
type InproxyWebRTCDialParameters struct {
// RootObfuscationSecret is the secret produced by
// inproxy.GenerateRootObfuscationSecret in MakeInproxyWebRTCDialParameters.
RootObfuscationSecret inproxy.ObfuscationSecret
// DataChannelTrafficShapingParameters is always non-nil when populated by
// MakeInproxyWebRTCDialParameters; it points to a zero value unless traffic
// shaping was selected by the tactics coin flip.
DataChannelTrafficShapingParameters *inproxy.DataChannelTrafficShapingParameters
// DoDTLSRandomization records the result of the
// InproxyDTLSRandomizationProbability coin flip.
DoDTLSRandomization bool
}
// MakeInproxyWebRTCDialParameters generates new InproxyWebRTCDialParameters:
// a fresh root obfuscation secret, data channel traffic shaping parameters
// taken from tactics when the shaping coin flip succeeds (and left as the
// zero value otherwise), and the DTLS randomization coin flip result.
func MakeInproxyWebRTCDialParameters(
	p parameters.ParametersAccessor) (*InproxyWebRTCDialParameters, error) {

	secret, err := inproxy.GenerateRootObfuscationSecret()
	if err != nil {
		return nil, errors.Trace(err)
	}

	dialParams := &InproxyWebRTCDialParameters{
		RootObfuscationSecret:               secret,
		DataChannelTrafficShapingParameters: new(inproxy.DataChannelTrafficShapingParameters),
	}

	// The traffic shaping parameters pointer is always non-nil; it is only
	// populated from tactics when shaping is selected.
	if p.WeightedCoinFlip(parameters.InproxyDataChannelTrafficShapingProbability) {
		*dialParams.DataChannelTrafficShapingParameters =
			inproxy.DataChannelTrafficShapingParameters(
				p.InproxyDataChannelTrafficShapingParameters(
					parameters.InproxyDataChannelTrafficShapingParameters))
	}

	dialParams.DoDTLSRandomization = p.WeightedCoinFlip(
		parameters.InproxyDTLSRandomizationProbability)

	return dialParams, nil
}
// GetMetrics implements the common.MetricsSource interface. It always returns
// an empty, non-nil set of log fields.
func (dialParams *InproxyWebRTCDialParameters) GetMetrics() common.LogFields {

	// No is_replay-type field is added here: replay is handled at a higher
	// level and, for client in-proxy tunnel dials, is part of the main tunnel
	// dial parameters.
	//
	// Currently, all WebRTC metrics are delivered via
	// inproxy.ClientConn/WebRTCConn GetMetrics.

	return make(common.LogFields)
}
// InproxyNATStateManager manages the NAT-related network topology state for
// the current network, caching the discovered network NAT type and supported
// port mapping types, if any.
//
// Cached values are associated with the network ID recorded at the last
// reset; the get/set methods ignore requests for any other network ID.
type InproxyNATStateManager struct {
// config supplies the current network ID via GetNetworkID.
config *Config
// mutex guards networkID, natType, and portMappingTypes.
mutex sync.Mutex
// networkID is the network ID captured at the last reset.
networkID string
// natType is the cached NAT type; inproxy.NATTypeUnknown after a reset.
natType inproxy.NATType
// portMappingTypes is the cached set of port mapping types; empty after a
// reset.
portMappingTypes inproxy.PortMappingTypes
}
// NewInproxyNATStateManager creates a new InproxyNATStateManager for the
// given config. The new manager starts with an unknown NAT type and no port
// mapping types, and is reset so that it records the current network ID.
func NewInproxyNATStateManager(config *Config) *InproxyNATStateManager {

	manager := new(InproxyNATStateManager)
	manager.config = config
	manager.natType = inproxy.NATTypeUnknown
	manager.portMappingTypes = inproxy.PortMappingTypes{}

	manager.reset()

	return manager
}
// TacticsApplied implements the TacticsAppliedReceiver interface. It is
// invoked when tactics have changed and discards the cached NAT state so that
// potentially changed parameters are applied. TacticsApplied always returns
// nil.
func (s *InproxyNATStateManager) TacticsApplied() error {

	// Drop all cached NAT state; it will be rediscovered as needed.
	s.reset()

	return nil
}
// reset records the current network ID, as reported by the config, and clears
// the cached NAT type and port mapping types.
func (s *InproxyNATStateManager) reset() {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.networkID = s.config.GetNetworkID()

	s.portMappingTypes = inproxy.PortMappingTypes{}
	s.natType = inproxy.NATTypeUnknown
}
// getNATType returns the cached NAT type when networkID matches the network
// ID recorded at the last reset, and inproxy.NATTypeUnknown otherwise.
func (s *InproxyNATStateManager) getNATType(
	networkID string) inproxy.NATType {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.networkID == networkID {
		return s.natType
	}

	// The cache is for a different network.
	return inproxy.NATTypeUnknown
}
// setNATType caches natType when networkID matches the network ID recorded at
// the last reset. Values reported for any other network ID are dropped.
func (s *InproxyNATStateManager) setNATType(
	networkID string, natType inproxy.NATType) {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.networkID == networkID {
		s.natType = natType
	}
}
// getPortMappingTypes returns the cached port mapping types when networkID
// matches the network ID recorded at the last reset, and an empty
// inproxy.PortMappingTypes otherwise.
func (s *InproxyNATStateManager) getPortMappingTypes(
	networkID string) inproxy.PortMappingTypes {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.networkID == networkID {
		return s.portMappingTypes
	}

	// The cache is for a different network.
	return inproxy.PortMappingTypes{}
}
// setPortMappingTypes caches portMappingTypes when networkID matches the
// network ID recorded at the last reset. Values reported for any other
// network ID are dropped.
func (s *InproxyNATStateManager) setPortMappingTypes(
	networkID string, portMappingTypes inproxy.PortMappingTypes) {

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.networkID == networkID {
		s.portMappingTypes = portMappingTypes
	}
}
// inproxyUDPConn is based on NewUDPConn and includes the write timeout
// workaround from common.WriteTimeoutUDPConn.
//
// inproxyUDPConn expands the NewUDPConn IPv6Synthesizer to support many
// destination addresses, as the inproxyUDPConn will be used to send/receive
// packets between many remote destination addresses.
//
// inproxyUDPConn implements the net.PacketConn interface.
type inproxyUDPConn struct {
// udpConn is the underlying UDP socket that all operations delegate to.
udpConn *net.UDPConn
// ipv6Synthesizer is taken from the config; when nil, no address
// synthesis is performed and the lookup maps are not allocated.
ipv6Synthesizer IPv6Synthesizer
// synthesizerMutex guards ipv4ToIPv6 and ipv6ToIPv4.
synthesizerMutex sync.Mutex
// ipv4ToIPv6 maps an IPv4 destination address to its synthesized IPv6
// address; populated by WriteTo.
ipv4ToIPv6 map[netip.Addr]net.IP
// ipv6ToIPv4 maps a synthesized IPv6 address back to the original IPv4
// address; populated by WriteTo and consulted by ReadFrom.
ipv6ToIPv4 map[netip.Addr]net.IP
}
// newInproxyUDPConn creates a new inproxyUDPConn backed by an "unconnected"
// UDP socket. Additional socket options are applied to the socket and, when
// config.deviceBinder is set, the socket is bound to a device before use.
//
// The IPv6 synthesizer lookup tables are allocated only when
// config.IPv6Synthesizer is set.
//
// On failure, a traced error is returned and no socket is left open.
func newInproxyUDPConn(ctx context.Context, config *Config) (net.PacketConn, error) {

	listen := &net.ListenConfig{
		Control: func(_, _ string, c syscall.RawConn) error {

			// controlErr carries any failure out of the RawConn.Control
			// callback, which has no return value of its own.
			var controlErr error

			err := c.Control(func(fd uintptr) {

				socketFD := int(fd)

				setAdditionalSocketOptions(socketFD)

				// Use config.deviceBinder, with wired up logging, not
				// config.DeviceBinder; other tunnel-core dials do this
				// indirectly via psiphon.DialConfig.
				if config.deviceBinder != nil {
					_, bindErr := config.deviceBinder.BindToDevice(socketFD)
					if bindErr != nil {
						controlErr = errors.Tracef("BindToDevice failed: %s", bindErr)
						return
					}
				}
			})

			if controlErr != nil {
				return errors.Trace(controlErr)
			}

			return errors.Trace(err)
		},
	}

	// Create an "unconnected" UDP socket for use with WriteTo and listening
	// on all interfaces. See the limitation comment in NewUDPConn regarding
	// its equivalent mode.

	packetConn, err := listen.ListenPacket(ctx, "udp", "")
	if err != nil {
		return nil, errors.Trace(err)
	}

	udpConn, ok := packetConn.(*net.UDPConn)
	if !ok {
		// Close the socket rather than leaking it; the type error is the
		// failure that is reported.
		_ = packetConn.Close()
		return nil, errors.Tracef("unexpected conn type: %T", packetConn)
	}

	conn := &inproxyUDPConn{
		udpConn:         udpConn,
		ipv6Synthesizer: config.IPv6Synthesizer,
	}

	if conn.ipv6Synthesizer != nil {
		conn.ipv4ToIPv6 = make(map[netip.Addr]net.IP)
		conn.ipv6ToIPv4 = make(map[netip.Addr]net.IP)
	}

	return conn, nil
}
// inproxyUDPAddrFromAddrPort converts a netip.AddrPort to a newly allocated
// *net.UDPAddr carrying the same IP address bytes and port. Any IPv6 zone in
// addrPort is not carried over.
func inproxyUDPAddrFromAddrPort(addrPort netip.AddrPort) *net.UDPAddr {
	udpAddr := new(net.UDPAddr)
	udpAddr.IP = addrPort.Addr().AsSlice()
	udpAddr.Port = int(addrPort.Port())
	return udpAddr
}
// ReadFrom reads a packet from the underlying UDP socket. When an IPv6
// synthesizer is configured and the source address is a previously
// synthesized IPv6 address, the original IPv4 address is returned in its
// place. I/O errors from the UDPConn are returned unwrapped.
func (conn *inproxyUDPConn) ReadFrom(p []byte) (int, net.Addr, error) {

	// Without a synthesizer there is nothing to reverse; delegate directly.
	// Do not wrap any I/O err returned by UDPConn.
	if conn.ipv6Synthesizer == nil {
		return conn.udpConn.ReadFrom(p)
	}

	// net.UDPConn.ReadFrom currently allocates a &UDPAddr{} per call, so the
	// &net.UDPAddr{} allocated below costs no more than the standard code
	// path.
	//
	// TODO: avoid all address allocations in both ReadFrom and WriteTo by:
	//
	// - changing ipvXToIPvY to map[netip.AddrPort]*net.UDPAddr
	// - using a similar lookup for the non-synthesizer code path
	//
	// Such a scheme would work only if the caller is guaranteed to not mutate
	// the returned net.Addr.

	n, source, err := conn.udpConn.ReadFromUDPAddrPort(p)

	// The source address is reversed before err is returned.
	//
	// Reversing the IPv6 synthesizer yields the original IPv4 address, as
	// expected by the caller, including pion/webrtc. This logic assumes that
	// no synthesized IPv6 address will conflict with any real IPv6 address.

	sourceIP := source.Addr()

	var originalIP net.IP
	if sourceIP.Is6() {
		conn.synthesizerMutex.Lock()
		originalIP = conn.ipv6ToIPv4[sourceIP]
		conn.synthesizerMutex.Unlock()
	}

	// No synthesized mapping: report the source address as received.
	if originalIP == nil {
		originalIP = sourceIP.AsSlice()
	}

	from := &net.UDPAddr{IP: originalIP, Port: int(source.Port())}

	// Do not wrap any I/O err returned by UDPConn.
	return n, from, err
}
// WriteTo writes a packet to addr via the underlying UDP socket, after
// setting a write deadline of common.UDP_PACKET_WRITE_TIMEOUT from now.
//
// When an IPv6 synthesizer is configured, addr must be a *net.UDPAddr. For
// IPv4 destinations, the packet is sent to a synthesized IPv6 address when
// one is available, and to the original address otherwise. I/O errors from
// the UDPConn are returned unwrapped.
func (conn *inproxyUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {

	// See common.WriteTimeoutUDPConn.
	deadline := time.Now().Add(common.UDP_PACKET_WRITE_TIMEOUT)
	if err := conn.udpConn.SetWriteDeadline(deadline); err != nil {
		return 0, errors.Trace(err)
	}

	if conn.ipv6Synthesizer == nil {
		// Do not wrap any I/O err returned by UDPConn.
		return conn.udpConn.WriteTo(b, addr)
	}

	// When configured, attempt to synthesize an IPv6 address from an IPv4
	// address for compatibility on DNS64/NAT64 networks.
	//
	// Synthesized addresses are stored in lookup tables, both for reuse on
	// subsequent writes to the same destination and for reversing the
	// conversion on reads.
	//
	// If synthesize fails, fall back to trying the original address.
	//
	// The map key is a netip.Addr and the input address is assumed to be a
	// *net.UDPAddr. This allows for more efficient lookups vs. a string key
	// and parsing the input via addr.String()/net.SplitHostPort().

	destination, isUDPAddr := addr.(*net.UDPAddr)
	if !isUDPAddr {
		return 0, errors.Tracef("unexpected addr type: %T", addr)
	}

	// Only IPv4 destinations are candidates for synthesis.
	if destination.IP.To4() == nil {
		// Do not wrap any I/O err returned by UDPConn.
		return conn.udpConn.WriteTo(b, addr)
	}

	key, valid := netip.AddrFromSlice(destination.IP)
	if !valid {
		return 0, errors.Tracef("invalid addr")
	}

	conn.synthesizerMutex.Lock()
	ipv6, cached := conn.ipv4ToIPv6[key]
	conn.synthesizerMutex.Unlock()

	if !cached {

		// The synthesizer is invoked without holding the mutex.
		ipv6 = nil
		if synthesized := conn.ipv6Synthesizer.IPv6Synthesize(destination.IP.String()); synthesized != "" {
			ipv6 = net.ParseIP(synthesized)
		}

		if ipv6 != nil {
			reverseKey, _ := netip.AddrFromSlice(ipv6)

			conn.synthesizerMutex.Lock()
			conn.ipv4ToIPv6[key] = ipv6
			conn.ipv6ToIPv4[reverseKey] = destination.IP
			conn.synthesizerMutex.Unlock()
		}
	}

	if ipv6 == nil {
		// Nothing was synthesized; use the original address.
		// Do not wrap any I/O err returned by UDPConn.
		return conn.udpConn.WriteTo(b, addr)
	}

	// Declared as a value, not a pointer, with the intent of avoiding an
	// extra heap allocation per write.
	target := net.UDPAddr{IP: ipv6, Port: destination.Port}

	return conn.udpConn.WriteTo(b, &target)
}
// Close closes the underlying UDP socket.
func (conn *inproxyUDPConn) Close() error {

	// The UDPConn error is passed through as-is, without wrapping.
	err := conn.udpConn.Close()
	return err
}
// LocalAddr returns the local address of the underlying UDP socket.
func (conn *inproxyUDPConn) LocalAddr() net.Addr {
	localAddr := conn.udpConn.LocalAddr()
	return localAddr
}
// SetDeadline sets the read and write deadlines on the underlying UDP socket.
// Note that WriteTo sets its own write deadline on each call.
func (conn *inproxyUDPConn) SetDeadline(t time.Time) error {

	// The UDPConn error is passed through as-is, without wrapping.
	err := conn.udpConn.SetDeadline(t)
	return err
}
// SetReadDeadline sets the read deadline on the underlying UDP socket.
func (conn *inproxyUDPConn) SetReadDeadline(t time.Time) error {

	// The UDPConn error is passed through as-is, without wrapping.
	err := conn.udpConn.SetReadDeadline(t)
	return err
}
// SetWriteDeadline sets the write deadline on the underlying UDP socket. Note
// that WriteTo sets its own write deadline on each call.
func (conn *inproxyUDPConn) SetWriteDeadline(t time.Time) error {

	// The UDPConn error is passed through as-is, without wrapping.
	err := conn.udpConn.SetWriteDeadline(t)
	return err
}
// getInproxyNetworkType maps a legacy string network type to the
// corresponding inproxy package network type. "WIFI" and "MOBILE" are
// recognized; any other input yields inproxy.NetworkTypeUnknown.
func getInproxyNetworkType(networkType string) inproxy.NetworkType {

	// There is no VPN type conversion; clients and proxies will skip/fail
	// in-proxy operations on non-Psiphon VPN networks.

	if networkType == "WIFI" {
		return inproxy.NetworkTypeWiFi
	}
	if networkType == "MOBILE" {
		return inproxy.NetworkTypeMobile
	}
	return inproxy.NetworkTypeUnknown
}