|
|
@@ -20,9 +20,10 @@
|
|
|
package inproxy
|
|
|
|
|
|
import (
|
|
|
+ "bytes"
|
|
|
"context"
|
|
|
+ "encoding/binary"
|
|
|
"fmt"
|
|
|
- "math"
|
|
|
"net"
|
|
|
"strconv"
|
|
|
"sync"
|
|
|
@@ -31,6 +32,8 @@ import (
|
|
|
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
|
|
|
+ inproxy_dtls "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy/dtls"
|
|
|
+ "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
|
|
|
"github.com/pion/datachannel"
|
|
|
"github.com/pion/ice/v2"
|
|
|
"github.com/pion/sdp/v3"
|
|
|
@@ -41,13 +44,16 @@ import (
|
|
|
const (
|
|
|
dataChannelBufferedAmountLowThreshold uint64 = 512 * 1024
|
|
|
dataChannelMaxBufferedAmount uint64 = 1024 * 1024
|
|
|
+ dataChannelMaxMessageSize = 65536
|
|
|
)
|
|
|
|
|
|
// WebRTCConn is a WebRTC connection between two peers, with a data channel
|
|
|
// used to relay streams or packets between them. WebRTCConn implements the
|
|
|
// net.Conn interface.
|
|
|
type WebRTCConn struct {
|
|
|
- config *WebRTCConfig
|
|
|
+ config *WebRTCConfig
|
|
|
+ trafficShapingParameters *DataChannelTrafficShapingParameters
|
|
|
+
|
|
|
mutex sync.Mutex
|
|
|
udpConn net.PacketConn
|
|
|
portMapper *portMapper
|
|
|
@@ -59,11 +65,21 @@ type WebRTCConn struct {
|
|
|
dataChannelOpenedSignal chan struct{}
|
|
|
dataChannelOpenedOnce sync.Once
|
|
|
dataChannelWriteBufferSignal chan struct{}
|
|
|
- messageMutex sync.Mutex
|
|
|
- messageBuffer []byte
|
|
|
- messageOffset int
|
|
|
- messageLength int
|
|
|
- messageError error
|
|
|
+ decoyDone bool
|
|
|
+
|
|
|
+ readMutex sync.Mutex
|
|
|
+ readBuffer []byte
|
|
|
+ readOffset int
|
|
|
+ readLength int
|
|
|
+ readError error
|
|
|
+ peerPaddingDone bool
|
|
|
+
|
|
|
+ writeMutex sync.Mutex
|
|
|
+ trafficShapingPRNG *prng.PRNG
|
|
|
+ trafficShapingBuffer *bytes.Buffer
|
|
|
+ paddedMessageCount int
|
|
|
+ decoyMessageCount int
|
|
|
+ trafficShapingDone bool
|
|
|
}
|
|
|
|
|
|
// WebRTCConfig specifies the configuration for a WebRTC dial.
|
|
|
@@ -84,6 +100,9 @@ type WebRTCConfig struct {
|
|
|
// DoDTLSRandomization indicates whether to perform DTLS randomization.
|
|
|
DoDTLSRandomization bool
|
|
|
|
|
|
+ // TrafficShapingParameters indicates whether and how to perform data channel traffic shaping.
|
|
|
+ TrafficShapingParameters *DataChannelTrafficShapingParameters
|
|
|
+
|
|
|
// ReliableTransport indicates whether to configure the WebRTC data
|
|
|
// channel to use reliable transport. Set ReliableTransport when proxying
|
|
|
// a TCP stream, and unset it when proxying a UDP packets flow with its
|
|
|
@@ -141,35 +160,6 @@ func newWebRTCConn(
|
|
|
return nil, nil, nil, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
- // Facilitate DTLS Client/ServerHello randomization. The client decides
|
|
|
- // whether to do DTLS randomization and generates and the proxy receives
|
|
|
- // ClientRootObfuscationSecret, so the client can orchestrate replay on
|
|
|
- // both ends of the connection by reusing an obfuscation secret. Derive a
|
|
|
- // secret specific to DTLS. SetDTLSSeed will futher derive a secure PRNG
|
|
|
- // seed specific to either the client or proxy end of the connection
|
|
|
- // (so each peer's randomization will be distinct).
|
|
|
- //
|
|
|
- // To avoid forking many pion repos in order to pass the seed through to
|
|
|
- // the DTLS implementation, SetDTLSSeed populates a cache that's keyed by
|
|
|
- // the UDP conn.
|
|
|
- //
|
|
|
- // TODO: pion/dtls is not forked yet, so this is a no-op at this time.
|
|
|
-
|
|
|
- if config.DoDTLSRandomization {
|
|
|
-
|
|
|
- dtlsObfuscationSecret, err := deriveObfuscationSecret(
|
|
|
- config.ClientRootObfuscationSecret, "in-proxy-DTLS-seed")
|
|
|
- if err != nil {
|
|
|
- return nil, nil, nil, errors.Trace(err)
|
|
|
- }
|
|
|
-
|
|
|
- deadline, _ := ctx.Deadline()
|
|
|
- err = SetDTLSSeed(udpConn, dtlsObfuscationSecret, isOffer, time.Until(deadline))
|
|
|
- if err != nil {
|
|
|
- return nil, nil, nil, errors.Trace(err)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// Initialize WebRTC
|
|
|
|
|
|
// There is no explicit anti-probing measures for the proxy side of the
|
|
|
@@ -193,18 +183,176 @@ func newWebRTCConn(
|
|
|
// implementation: "DataChannel.readLoop goroutine leak",
|
|
|
// https://github.com/pion/webrtc/issues/2098.
|
|
|
|
|
|
+ // UDPMux Limitations:
|
|
|
+ //
|
|
|
+ // For Psiphon, DialParameters.UDPListen will call
|
|
|
+ // https://pkg.go.dev/net#ListenUDP with an unspecified IP address, in
|
|
|
+ // order to listen on all available interfaces, both IPv4 and IPv6.
|
|
|
+ // However, using webrtc.NewICEUDPMux and a UDP conn with an unspecifed
|
|
|
+ // IP address results in this log warning: "UDPMuxDefault should not
|
|
|
+ // listening on unspecified address, use NewMultiUDPMuxFromPort instead".
|
|
|
+ //
|
|
|
+ // With NewICEUDPMux and an unspecified IP address, pion currently
|
|
|
+ // enumerates local, active interfaces and derives a list of listening
|
|
|
+ // addresses, combining each interface's IP addresses with the assigned
|
|
|
+ // port:
|
|
|
+ // https://github.com/pion/ice/blob/8c5b0991ef3bb070e47afda96faf090e8bf94be6/net.go#L35.
|
|
|
+ // While this works ok in many cases, this PR,
|
|
|
+ // https://github.com/pion/ice/pull/475, indicates the nature of the
|
|
|
+ // issue with UDPMuxDefault:
|
|
|
+ //
|
|
|
+ // > When we have multiple host candidates and been mux to a single port,
|
|
|
+ // > if these candidates share a same conn (either tcp or udp), they
|
|
|
+ // > might read other's [messages causing failure].
|
|
|
+ //
|
|
|
+ // This PR, https://github.com/pion/ice/pull/473, also describes the issue:
|
|
|
+ //
|
|
|
+ // > When using UDPMux and UniversalUDPMux, it is possible that a
|
|
|
+ // > registerConnForAddress() could be called twice or more for the same
|
|
|
+ // > remote candidate (endpoint) by different candidates. E.g., when
|
|
|
+ // > different HOST candidates ping the same remote candidate, the
|
|
|
+ // > udpMuxedConn gets stored once. The second candidate will never
|
|
|
+ // > receive a response. This is also the case when a single socket is
|
|
|
+ // > used for gathering SRFLX and HOST candidates.
|
|
|
+ //
|
|
|
+ // PR 475 introduced MultiUDPMuxDefault to address the issue. However, at
|
|
|
+ // this time, https://github.com/pion/ice/releases/tag/v2.3.6, there's an
|
|
|
+ // open bug with MultiUDPMuxDefault
|
|
|
+ // https://github.com/pion/ice/issues/507: "Multi UDP Mux can't works
|
|
|
+ // when remote also enables Multi UDP Mux". Running the test program
|
|
|
+ // attached to the bug confirms that no data channel is established;
|
|
|
+ // while switching the test code to use NewICEUDPMux results in a
|
|
|
+ // successful data channel connection. Since we need to use a Mux API on
|
|
|
+ // both clients and proxies, we can't yet use MultiUDPMux.
|
|
|
+ //
|
|
|
+ // Another limitation and issue with NewICEUDPMux is that its enumeration
|
|
|
+ // of all local interfaces and IPs includes many IPv6 addresses for
|
|
|
+ // certain interfaces. For example, on macOS,
|
|
|
+ // https://apple.stackexchange.com/a/371661, there are "secured" IPv6
|
|
|
+ // addresses and many "temporary" IPv6 addresses, with all but one
|
|
|
+ // temporary address being "deprecated". Instead of a full enumeration,
|
|
|
+ // we should select only the non-deprecated temporary IPv6 address --
|
|
|
+ // both for performance (avoid excess STUN requests) and privacy.
|
|
|
+ //
|
|
|
+ // Go has a proposal to expose the necessary IPv6 address information:
|
|
|
+ // https://github.com/golang/go/issues/42694. However, as of Android SDK
|
|
|
+ // 30, Go's net.InterfaceAddrs doesn't work at all:
|
|
|
+ // https://github.com/pion/transport/issues/228,
|
|
|
+ // https://github.com/golang/go/issues/40569.
|
|
|
+ //
|
|
|
+ // Note that it's not currently possible to
|
|
|
+ // webrtc.SettingEngine.SetIPFilter to limit IPv6 selection to a single
|
|
|
+ // candidate; that IP filter is not passed through to localInterfaces in
|
|
|
+ // the NewUDPMuxDefault case. And even if it were, there's no guarantee
|
|
|
+ // that the the first IPv6 address passed to the filter would be the
|
|
|
+ // non-deprecated temporary address.
|
|
|
+ //
|
|
|
+ // TODO: get interface IP addresses using native code, apply proper IPv6
|
|
|
+ // filtering, and pass in to pion.
|
|
|
+
|
|
|
+ udpMux := webrtc.NewICEUDPMux(&webrtcLogger{logger: config.Logger}, udpConn)
|
|
|
+
|
|
|
settingEngine := webrtc.SettingEngine{}
|
|
|
settingEngine.DetachDataChannels()
|
|
|
settingEngine.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
|
|
|
- settingEngine.SetICEUDPMux(webrtc.NewICEUDPMux(&webrtcLogger{logger: config.Logger}, udpConn))
|
|
|
-
|
|
|
- // Set this behavior to like common web browser WebRTC stacks.
|
|
|
+ settingEngine.SetICEUDPMux(udpMux)
|
|
|
+ // Set this behavior to look like common web browser WebRTC stacks.
|
|
|
settingEngine.SetDTLSInsecureSkipHelloVerify(true)
|
|
|
|
|
|
+ // TODO: set settingEngine.SetDTLSConnectContextMaker?
|
|
|
+
|
|
|
webRTCAPI := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine))
|
|
|
|
|
|
dataChannelLabel := "in-proxy-data-channel"
|
|
|
|
|
|
+ // Initialize data channel obfuscation
|
|
|
+
|
|
|
+ config.Logger.WithTraceFields(common.LogFields{
|
|
|
+ "dtls_randomization": config.DoDTLSRandomization,
|
|
|
+ "data_channel_traffic_shaping": config.TrafficShapingParameters != nil,
|
|
|
+ }).Info("data_channel_obfuscation")
|
|
|
+
|
|
|
+ // Facilitate DTLS Client/ServerHello randomization. The client decides
|
|
|
+ // whether to do DTLS randomization and generates and the proxy receives
|
|
|
+ // ClientRootObfuscationSecret, so the client can orchestrate replay on
|
|
|
+ // both ends of the connection by reusing an obfuscation secret. Derive a
|
|
|
+ // secret specific to DTLS. SetDTLSSeed will futher derive a secure PRNG
|
|
|
+ // seed specific to either the client or proxy end of the connection
|
|
|
+ // (so each peer's randomization will be distinct).
|
|
|
+ //
|
|
|
+ // To avoid forking many pion repos in order to pass the seed through to
|
|
|
+ // the DTLS implementation, SetDTLSSeed populates a cache that's keyed by
|
|
|
+ // the UDP conn.
|
|
|
+ //
|
|
|
+ // Either SetDTLSSeed or SetNoDTLSSeed should be set for each conn, as the
|
|
|
+ // pion/dtl fork treats no-seed as an error, as a check against the local
|
|
|
+ // address lookup mechanism.
|
|
|
+
|
|
|
+ deadline, _ := ctx.Deadline()
|
|
|
+ dtlsSeedTTL := time.Until(deadline)
|
|
|
+
|
|
|
+ if config.DoDTLSRandomization {
|
|
|
+
|
|
|
+ dtlsObfuscationSecret, err := deriveObfuscationSecret(
|
|
|
+ config.ClientRootObfuscationSecret, "in-proxy-DTLS-seed")
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ baseSeed := prng.Seed(dtlsObfuscationSecret)
|
|
|
+
|
|
|
+ // We don't specify a listen address, so the UDP conn listens on all
|
|
|
+ // interfaces. Internally, pion/ice expands the UDPConn's LocalAddr
|
|
|
+ // to a concrete IP address per interface. We must set DTLS seeds for
|
|
|
+ // each address.
|
|
|
+ for _, localAddr := range udpMux.GetListenAddresses() {
|
|
|
+ err := inproxy_dtls.SetDTLSSeed(
|
|
|
+ localAddr, &baseSeed, isOffer, dtlsSeedTTL)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ for _, localAddr := range udpMux.GetListenAddresses() {
|
|
|
+ inproxy_dtls.SetNoDTLSSeed(localAddr, dtlsSeedTTL)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Configure traffic shaping, which adds random padding and decoy messages
|
|
|
+ // to data channel message flows.
|
|
|
+
|
|
|
+ var trafficShapingPRNG *prng.PRNG
|
|
|
+ trafficShapingBuffer := new(bytes.Buffer)
|
|
|
+ paddedMessageCount := 0
|
|
|
+ decoyMessageCount := 0
|
|
|
+
|
|
|
+ if config.TrafficShapingParameters != nil {
|
|
|
+
|
|
|
+ trafficShapingContext := "in-proxy-data-channel-traffic-shaping-offer"
|
|
|
+ if !isOffer {
|
|
|
+ trafficShapingContext = "in-proxy-data-channel-traffic-shaping-answer"
|
|
|
+ }
|
|
|
+
|
|
|
+ trafficShapingObfuscationSecret, err := deriveObfuscationSecret(
|
|
|
+ config.ClientRootObfuscationSecret, trafficShapingContext)
|
|
|
+ if err != nil {
|
|
|
+ return nil, nil, nil, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ seed := prng.Seed(trafficShapingObfuscationSecret)
|
|
|
+ trafficShapingPRNG = prng.NewPRNGWithSeed(&seed)
|
|
|
+
|
|
|
+ paddedMessageCount = trafficShapingPRNG.Range(
|
|
|
+ config.TrafficShapingParameters.MinPaddedMessages,
|
|
|
+ config.TrafficShapingParameters.MaxPaddedMessages)
|
|
|
+
|
|
|
+ decoyMessageCount = trafficShapingPRNG.Range(
|
|
|
+ config.TrafficShapingParameters.MinDecoyMessages,
|
|
|
+ config.TrafficShapingParameters.MaxDecoyMessages)
|
|
|
+ }
|
|
|
+
|
|
|
// NAT traversal setup
|
|
|
|
|
|
// When DisableInboundForMobleNetworks is set, skip both STUN and port
|
|
|
@@ -280,7 +428,8 @@ func newWebRTCConn(
|
|
|
}
|
|
|
|
|
|
conn := &WebRTCConn{
|
|
|
- config: config,
|
|
|
+ config: config,
|
|
|
+
|
|
|
udpConn: udpConn,
|
|
|
portMapper: portMapper,
|
|
|
closedSignal: make(chan struct{}),
|
|
|
@@ -293,7 +442,12 @@ func newWebRTCConn(
|
|
|
// https://github.com/pion/webrtc/blob/dce970438344727af9c9965f88d958c55d32e64d/datachannel.go#L19.
|
|
|
// This read buffer must be as large as the maximum message size or
|
|
|
// else a read may fail with io.ErrShortBuffer.
|
|
|
- messageBuffer: make([]byte, math.MaxUint16),
|
|
|
+ readBuffer: make([]byte, dataChannelMaxMessageSize),
|
|
|
+
|
|
|
+ trafficShapingPRNG: trafficShapingPRNG,
|
|
|
+ trafficShapingBuffer: trafficShapingBuffer,
|
|
|
+ paddedMessageCount: paddedMessageCount,
|
|
|
+ decoyMessageCount: decoyMessageCount,
|
|
|
}
|
|
|
defer func() {
|
|
|
if retErr != nil {
|
|
|
@@ -566,9 +720,23 @@ func (conn *WebRTCConn) Close() error {
|
|
|
|
|
|
func (conn *WebRTCConn) Read(p []byte) (int, error) {
|
|
|
|
|
|
+ for {
|
|
|
+
|
|
|
+ n, err := conn.readMessage(p)
|
|
|
+ if err != nil || n > 0 {
|
|
|
+ return n, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // A decoy message was read; discard and read again.
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *WebRTCConn) readMessage(p []byte) (int, error) {
|
|
|
+
|
|
|
// Don't hold this lock, or else concurrent Writes will be blocked.
|
|
|
conn.mutex.Lock()
|
|
|
dataChannelConn := conn.dataChannelConn
|
|
|
+ decoyDone := conn.decoyDone
|
|
|
conn.mutex.Unlock()
|
|
|
|
|
|
if dataChannelConn == nil {
|
|
|
@@ -582,28 +750,77 @@ func (conn *WebRTCConn) Read(p []byte) (int, error) {
|
|
|
// dataChannelConn.Read returns an error; the error value is stored and
|
|
|
// returned with the Read call that consumes the end of the message buffer.
|
|
|
|
|
|
- conn.messageMutex.Lock()
|
|
|
- defer conn.messageMutex.Unlock()
|
|
|
+ conn.readMutex.Lock()
|
|
|
+ defer conn.readMutex.Unlock()
|
|
|
+
|
|
|
+ if conn.readOffset == conn.readLength {
|
|
|
+ n, err := dataChannelConn.Read(conn.readBuffer)
|
|
|
+ conn.readOffset = 0
|
|
|
+ conn.readLength = n
|
|
|
+ conn.readError = err
|
|
|
|
|
|
- if conn.messageOffset == conn.messageLength {
|
|
|
- n, err := dataChannelConn.Read(conn.messageBuffer)
|
|
|
- conn.messageOffset = 0
|
|
|
- conn.messageLength = n
|
|
|
- conn.messageError = err
|
|
|
+ // Skip over padding.
|
|
|
+
|
|
|
+ if n > 0 && !conn.peerPaddingDone {
|
|
|
+
|
|
|
+ paddingSize, n := binary.Varint(conn.readBuffer[0:conn.readLength])
|
|
|
+ if (paddingSize == 0 && n <= 0) || paddingSize >= int64(conn.readLength) {
|
|
|
+ return 0, errors.TraceNew("invalid padding")
|
|
|
+ }
|
|
|
+
|
|
|
+ if paddingSize < 0 {
|
|
|
+
|
|
|
+ // When the padding header indicates a padding size of -1, the
|
|
|
+ // peer is indicating that padding is done. Subsequent
|
|
|
+ // messages will have no padding header or padding bytes.
|
|
|
+
|
|
|
+ conn.peerPaddingDone = true
|
|
|
+ conn.readOffset += n
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ conn.readOffset += n + int(paddingSize)
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- n := copy(p, conn.messageBuffer[conn.messageOffset:conn.messageLength])
|
|
|
- conn.messageOffset += n
|
|
|
+ n := copy(p, conn.readBuffer[conn.readOffset:conn.readLength])
|
|
|
+ conn.readOffset += n
|
|
|
|
|
|
var err error
|
|
|
- if conn.messageOffset == conn.messageLength {
|
|
|
- err = conn.messageError
|
|
|
+ if conn.readOffset == conn.readLength {
|
|
|
+ err = conn.readError
|
|
|
+ }
|
|
|
+
|
|
|
+ // When decoy messages are enabled, periodically response to an incoming
|
|
|
+ // messages with an immediate outbound decoy message. This is similar to
|
|
|
+ // the design here:
|
|
|
+ // https://github.com/Psiphon-Labs/psiphon-tunnel-core/blob/c4f6a593a645db4479a7032a9e97d3c0b905cdfc/psiphon/common/quic/obfuscator.go#L361-L409
|
|
|
+ //
|
|
|
+ // writeMessage handles conn.decoyMessageCount, which is syncronized with
|
|
|
+ // conn.WriteMutex, as well as other specific logic. Here we just signal
|
|
|
+ // writeMessage based on the read event.
|
|
|
+ //
|
|
|
+ // When the data channel already has buffered writes in excess of a decoy
|
|
|
+ // message size, the writeMessage skips the decoy message and returns
|
|
|
+ // without blocking, so Read calls will not block.
|
|
|
+
|
|
|
+ if !decoyDone {
|
|
|
+ _, _ = conn.writeMessage(nil, true)
|
|
|
}
|
|
|
|
|
|
return n, errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
func (conn *WebRTCConn) Write(p []byte) (int, error) {
|
|
|
+ return conn.writeMessage(p, false)
|
|
|
+}
|
|
|
+
|
|
|
+func (conn *WebRTCConn) writeMessage(p []byte, decoy bool) (int, error) {
|
|
|
+
|
|
|
+ if p != nil && decoy {
|
|
|
+ return 0, errors.TraceNew("invalid write parameters")
|
|
|
+ }
|
|
|
|
|
|
// Don't hold this lock, or else concurrent Reads will be blocked.
|
|
|
conn.mutex.Lock()
|
|
|
@@ -616,6 +833,119 @@ func (conn *WebRTCConn) Write(p []byte) (int, error) {
|
|
|
return 0, errors.TraceNew("not connected")
|
|
|
}
|
|
|
|
|
|
+ // Only proceed with a decoy message when no pending writes are buffered.
|
|
|
+ //
|
|
|
+ // This check is made before acquiring conn.writeMutex so that, in most
|
|
|
+ // cases, writeMessage won't block Read calls when a concurrent Write is
|
|
|
+ // holding conn.writeMutex and potentially blocking on flow control.
|
|
|
+ // There's still a chance that this test passes, and a concurrent Write
|
|
|
+ // arrives at the same time.
|
|
|
+
|
|
|
+ if decoy && bufferedAmount > 0 {
|
|
|
+ return 0, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ conn.writeMutex.Lock()
|
|
|
+ defer conn.writeMutex.Unlock()
|
|
|
+
|
|
|
+ writeSize := len(p)
|
|
|
+
|
|
|
+ // Determine padding size and padding header size.
|
|
|
+
|
|
|
+ doPadding := false
|
|
|
+ paddingSize := 0
|
|
|
+ var paddingHeader [binary.MaxVarintLen32]byte
|
|
|
+ paddingHeaderSize := 0
|
|
|
+
|
|
|
+ if decoy {
|
|
|
+
|
|
|
+ if conn.decoyMessageCount < 1 {
|
|
|
+ return 0, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if !conn.trafficShapingPRNG.FlipWeightedCoin(
|
|
|
+ conn.config.TrafficShapingParameters.DecoyMessageProbability) {
|
|
|
+ return 0, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ conn.decoyMessageCount -= 1
|
|
|
+
|
|
|
+ decoySize := conn.trafficShapingPRNG.Range(
|
|
|
+ conn.config.TrafficShapingParameters.MinDecoySize,
|
|
|
+ conn.config.TrafficShapingParameters.MaxDecoySize)
|
|
|
+
|
|
|
+ // When sending a decoy message, the entire message is padding.
|
|
|
+
|
|
|
+ doPadding = true
|
|
|
+ paddingSize = decoySize
|
|
|
+
|
|
|
+ if conn.decoyMessageCount == 0 {
|
|
|
+
|
|
|
+ // Set the shared flag that readMessage uses to stop invoking
|
|
|
+ // writeMessage for decoy events.
|
|
|
+
|
|
|
+ conn.mutex.Lock()
|
|
|
+ conn.decoyDone = true
|
|
|
+ conn.mutex.Unlock()
|
|
|
+ }
|
|
|
+
|
|
|
+ } else if conn.paddedMessageCount > 0 {
|
|
|
+
|
|
|
+ // Add padding to a normal write.
|
|
|
+
|
|
|
+ conn.paddedMessageCount -= 1
|
|
|
+
|
|
|
+ doPadding = true
|
|
|
+ paddingSize = prng.Range(
|
|
|
+ conn.config.TrafficShapingParameters.MinPaddingSize,
|
|
|
+ conn.config.TrafficShapingParameters.MaxPaddingSize)
|
|
|
+
|
|
|
+ } else if conn.decoyMessageCount > 0 {
|
|
|
+
|
|
|
+ // Padding normal messages is done, but there are still outstanding
|
|
|
+ // decoy messages, so add a padding header indicating padding size 0
|
|
|
+ // to this normal message.
|
|
|
+
|
|
|
+ doPadding = true
|
|
|
+ paddingSize = 0
|
|
|
+
|
|
|
+ } else if !conn.trafficShapingDone {
|
|
|
+
|
|
|
+ // Padding normal messages is done and all decoy messages are sent, so
|
|
|
+ // send a special padding header with padding size -1, signaling the
|
|
|
+ // peer that no additional padding will be performed and no
|
|
|
+ // subsequent messages will contain a padding header.
|
|
|
+
|
|
|
+ doPadding = true
|
|
|
+ paddingSize = -1
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if doPadding {
|
|
|
+
|
|
|
+ // Reduce, if necessary, to stay within the maximum data channel
|
|
|
+ // message size. This is not expected to happen for the io.Copy use
|
|
|
+ // case, with 32K message size, plus reasonable padding sizes.
|
|
|
+
|
|
|
+ if writeSize+binary.MaxVarintLen32+paddingSize > dataChannelMaxMessageSize {
|
|
|
+ paddingSize -= (writeSize + binary.MaxVarintLen32 + paddingSize) - dataChannelMaxMessageSize
|
|
|
+ if paddingSize < 0 {
|
|
|
+ paddingSize = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Add padding overhead to total writeSize before the flow control check.
|
|
|
+
|
|
|
+ writeSize += paddingSize
|
|
|
+
|
|
|
+ paddingHeaderSize = binary.PutVarint(paddingHeader[:], int64(paddingSize))
|
|
|
+ writeSize += paddingHeaderSize
|
|
|
+ }
|
|
|
+
|
|
|
+ if writeSize > dataChannelMaxMessageSize {
|
|
|
+ return 0, errors.TraceNew("write too large")
|
|
|
+ }
|
|
|
+
|
|
|
// Flow control is required to ensure that Write calls don't result in
|
|
|
// unbounded buffering in pion/webrtc. Use similar logic and the same
|
|
|
// buffer size thresholds as the pion sample code.
|
|
|
@@ -632,7 +962,7 @@ func (conn *WebRTCConn) Write(p []byte) (int, error) {
|
|
|
|
|
|
// If the pion write buffer is too full, wait for a signal that sufficient
|
|
|
// write data has been consumed before writing more.
|
|
|
- if !isClosed && bufferedAmount+uint64(len(p)) > dataChannelMaxBufferedAmount {
|
|
|
+ if !isClosed && bufferedAmount+uint64(writeSize) > dataChannelMaxBufferedAmount {
|
|
|
select {
|
|
|
case <-conn.dataChannelWriteBufferSignal:
|
|
|
case <-conn.closedSignal:
|
|
|
@@ -640,12 +970,40 @@ func (conn *WebRTCConn) Write(p []byte) (int, error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Limitation: if len(p) > 65536, the dataChannelConn.Write wil fail. In
|
|
|
- // practise, this is not expected to happen with typical use cases such
|
|
|
- // as io.Copy, which uses a 32K buffer.
|
|
|
+ if conn.trafficShapingDone {
|
|
|
|
|
|
- n, err := dataChannelConn.Write(p)
|
|
|
- return n, errors.Trace(err)
|
|
|
+ // When traffic shaping is done, p is written directly without the
|
|
|
+ // additional trafficShapingBuffer copy.
|
|
|
+
|
|
|
+ // Limitation: if len(p) > 65536, the dataChannelConn.Write will fail. In
|
|
|
+ // practise, this is not expected to happen with typical use cases such
|
|
|
+ // as io.Copy, which uses a 32K buffer.
|
|
|
+ n, err := dataChannelConn.Write(p)
|
|
|
+
|
|
|
+ return n, errors.Trace(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ conn.trafficShapingBuffer.Reset()
|
|
|
+ conn.trafficShapingBuffer.Write(paddingHeader[:paddingHeaderSize])
|
|
|
+ if paddingSize > 0 {
|
|
|
+ conn.trafficShapingBuffer.Write(prng.Bytes(paddingSize))
|
|
|
+ }
|
|
|
+ conn.trafficShapingBuffer.Write(p)
|
|
|
+
|
|
|
+ // Limitation: see above; len(p) + padding must be <= 65536.
|
|
|
+ _, err := dataChannelConn.Write(conn.trafficShapingBuffer.Bytes())
|
|
|
+
|
|
|
+ if conn.paddedMessageCount == 0 && conn.decoyMessageCount == 0 && paddingSize == -1 {
|
|
|
+
|
|
|
+ // Set flag indicating -1 padding size was sent and release traffic
|
|
|
+ // shaping resources.
|
|
|
+
|
|
|
+ conn.trafficShapingDone = true
|
|
|
+ conn.trafficShapingPRNG = nil
|
|
|
+ conn.trafficShapingBuffer = nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return len(p), errors.Trace(err)
|
|
|
}
|
|
|
|
|
|
func (conn *WebRTCConn) LocalAddr() net.Addr {
|