| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464 |
- //go:build PSIPHON_ENABLE_INPROXY
- /*
- * Copyright (c) 2023, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package inproxy
- import (
- "bytes"
- "context"
- "encoding/binary"
- std_errors "errors"
- "fmt"
- "io"
- "math"
- "net"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- tls "github.com/Psiphon-Labs/psiphon-tls"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- inproxy_dtls "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy/dtls"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values"
- quic_go "github.com/Psiphon-Labs/quic-go"
- "github.com/pion/datachannel"
- "github.com/pion/dtls/v2"
- "github.com/pion/ice/v2"
- "github.com/pion/interceptor"
- pion_logging "github.com/pion/logging"
- "github.com/pion/rtp"
- "github.com/pion/sdp/v3"
- "github.com/pion/stun"
- "github.com/pion/transport/v2"
- "github.com/pion/webrtc/v3"
- "github.com/wlynxg/anet"
- )
- const (
- portMappingAwaitTimeout = 2 * time.Second
- readyToProxyAwaitTimeout = 20 * time.Second
- dataChannelBufferedAmountLowThreshold uint64 = 512 * 1024
- dataChannelMaxBufferedAmount uint64 = 1024 * 1024
- dataChannelMaxMessageSize = 65536
- dataChannelMaxLabelLength = 256
- mediaTrackMaxUDPPayloadLength = 1200
- mediaTrackRTPPacketOverhead = 12 + 16 + 1 // RTP header, SRTP encryption, and Psiphon padding header
- mediaTrackMaxRTPPayloadLength = mediaTrackMaxUDPPayloadLength - mediaTrackRTPPacketOverhead
- mediaTrackMaxIDLength = 256
- // Psiphon uses a fork of github.com/pion/dtls/v2, selected with go mod
- // replace, which has an idential API apart from dtls.IsPsiphon. If
- // dtls.IsPsiphon is undefined, the build is not using the fork.
- //
- // Limitation: this doesn't check that the vendored code is exactly the
- // same code as the fork.
- assertDTLSFork = dtls.IsPsiphon
- // Similarly, check for the fork of github.com/pion/ice/v2.
- assertICEFork = ice.IsPsiphon
- // Note that Psiphon also uses a fork of github.com/pion/webrtc/v3, but it
- // has an API change which will cause builds to fail when not present.
- )
- // webRTCConn is a WebRTC connection between two peers, with a data channel
- // used to relay streams or packets between them. WebRTCConn implements the
- // net.Conn interface.
- type webRTCConn struct {
- config *webRTCConfig
- isOffer bool
- mutex sync.Mutex
- udpConn net.PacketConn
- portMapper *portMapper
- isClosed int32
- closedSignal chan struct{}
- readyToProxySignal chan struct{}
- readyToProxyOnce sync.Once
- peerConnection *webrtc.PeerConnection
- dataChannel *webrtc.DataChannel
- dataChannelConn datachannel.ReadWriteCloser
- dataChannelWriteBufferSignal chan struct{}
- sendMediaTrack *webrtc.TrackLocalStaticRTP
- sendMediaTrackRTP *webrtc.RTPTransceiver
- receiveMediaTrack *webrtc.TrackRemote
- receiveMediaTrackOpenedSignal chan struct{}
- mediaTrackReliabilityLayer *reliableConn
- iceCandidatePairMetrics common.LogFields
- readMutex sync.Mutex
- readBuffer []byte
- readOffset int
- readLength int
- readError error
- peerPaddingDone bool
- receiveMediaTrackPacket *rtp.Packet
- writeMutex sync.Mutex
- trafficShapingPRNG *prng.PRNG
- trafficShapingBuffer *bytes.Buffer
- paddedMessageCount int
- decoyMessageCount int
- trafficShapingDone bool
- sendMediaTrackPacket *rtp.Packet
- sendMediaTrackSequencer rtp.Sequencer
- sendMediaTrackTimestampTick int
- sendMediaTrackFrameSizeRange [2]int
- sendMediaTrackRemainingFrameSize int
- decoyDone atomic.Bool
- paddedMessagesSent int32
- paddedMessagesReceived int32
- decoyMessagesSent int32
- decoyMessagesReceived int32
- }
- // webRTCConfig specifies the configuration for a WebRTC dial.
- type webRTCConfig struct {
- // Logger is used to log events.
- Logger common.Logger
- // EnableDebugLogging indicates whether to log pion/webrtc debug and trace
- // events. When enabled, these events will be logged to the specified
- // Logger at a Debug log level.
- EnableDebugLogging bool
- // WebRTCDialCoordinator specifies specific WebRTC dial strategies and
- // settings; WebRTCDialCoordinator also facilities dial replay by
- // receiving callbacks when individual dial steps succeed or fail.
- WebRTCDialCoordinator WebRTCDialCoordinator
- // ClientRootObfuscationSecret is generated (or replayed) by the client
- // and sent to the proxy and used to drive obfuscation operations.
- ClientRootObfuscationSecret ObfuscationSecret
- // DoDTLSRandomization indicates whether to perform DTLS randomization.
- DoDTLSRandomization bool
- // UseMediaStreams indicates whether to use WebRTC media streams to tunnel
- // traffic. When false, a WebRTC data channel is used to tunnel traffic.
- UseMediaStreams bool
- // TrafficShapingParameters indicates whether and how to perform data
- // channel or media track traffic shaping.
- TrafficShapingParameters *TrafficShapingParameters
- // ReliableTransport indicates whether to configure the WebRTC data
- // channel to use reliable transport. Set ReliableTransport when proxying
- // a TCP stream, and unset it when proxying a UDP packets flow with its
- // own reliability later, such as QUIC.
- ReliableTransport bool
- }
- // newWebRTCConnWithOffer initiates a new WebRTC connection. An offer SDP is
- // returned, to be sent to the peer. After the offer SDP is forwarded and an
- // answer SDP received in response, call SetRemoteSDP with the answer SDP and
- // then call AwaitInitialDataChannel to await the eventual WebRTC connection
- // establishment.
- func newWebRTCConnForOffer(
- ctx context.Context,
- config *webRTCConfig,
- hasPersonalCompartmentIDs bool) (
- *webRTCConn, WebRTCSessionDescription, *webRTCSDPMetrics, error) {
- conn, SDP, metrics, err := newWebRTCConn(
- ctx, config, nil, hasPersonalCompartmentIDs)
- if err != nil {
- return nil, WebRTCSessionDescription{}, nil, errors.Trace(err)
- }
- return conn, *SDP, metrics, nil
- }
- // newWebRTCConnWithAnswer creates a new WebRTC connection initiated by a peer
- // that provided an offer SDP. An answer SDP is returned to be sent to the
- // peer. After the answer SDP is forwarded, call AwaitInitialDataChannel to
- // await the eventual WebRTC connection establishment.
- func newWebRTCConnForAnswer(
- ctx context.Context,
- config *webRTCConfig,
- peerSDP WebRTCSessionDescription,
- hasPersonalCompartmentIDs bool) (
- *webRTCConn, WebRTCSessionDescription, *webRTCSDPMetrics, error) {
- conn, SDP, metrics, err := newWebRTCConn(
- ctx, config, &peerSDP, hasPersonalCompartmentIDs)
- if err != nil {
- return nil, WebRTCSessionDescription{}, nil, errors.Trace(err)
- }
- return conn, *SDP, metrics, nil
- }
- func newWebRTCConn(
- ctx context.Context,
- config *webRTCConfig,
- peerSDP *WebRTCSessionDescription,
- hasPersonalCompartmentIDs bool) (
- retconn *webRTCConn,
- retSDP *WebRTCSessionDescription,
- retMetrics *webRTCSDPMetrics,
- retErr error) {
- isOffer := peerSDP == nil
- udpConn, err := config.WebRTCDialCoordinator.UDPListen(ctx)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- // Initialize WebRTC
- // There is no explicit anti-probing measures for the proxy side of the
- // WebRTC connection, since each proxy "listener" is ephemeral, and since
- // the WebRTC data channel protocol authenticates peers with
- // certificates, so even if a probe were to find an ephemeral proxy
- // listener, the listener can respond the same as a normal WebRTC end
- // point would respond to a peer that doesn't have the correct credentials.
- //
- // pion's Mux API is used, as it enables providing a pre-created UDP
- // socket which is configured with necessary BindToDevice settings. We do
- // not actually multiplex multiple client connections on a single proxy
- // connection. As a proxy creates a new UDP socket and Mux for each
- // client, this currently open issue should not impact our
- // implementation: "Listener doesn't process parallel handshakes",
- // https://github.com/pion/dtls/issues/279.
- //
- // We detach data channels in order to use the standard Read/Write APIs.
- // As detaching avoids using the pion DataChannel read loop, this
- // currently open issue should not impact our
- // implementation: "DataChannel.readLoop goroutine leak",
- // https://github.com/pion/webrtc/issues/2098.
- // UDPMux Limitations:
- //
- // For Psiphon, WebRTCDialCoordinator.UDPListen will call
- // https://pkg.go.dev/net#ListenUDP with an unspecified IP address, in
- // order to listen on all available interfaces, both IPv4 and IPv6.
- // However, using webrtc.NewICEUDPMux and a UDP conn with an unspecified
- // IP address results in this log warning: "UDPMuxDefault should not
- // listening on unspecified address, use NewMultiUDPMuxFromPort instead".
- //
- // With NewICEUDPMux and an unspecified IP address, pion currently
- // enumerates local, active interfaces and derives a list of listening
- // addresses, combining each interface's IP addresses with the assigned
- // port:
- // https://github.com/pion/ice/blob/8c5b0991ef3bb070e47afda96faf090e8bf94be6/net.go#L35.
- // While this works ok in many cases, this PR,
- // https://github.com/pion/ice/pull/475, indicates the nature of the
- // issue with UDPMuxDefault:
- //
- // > When we have multiple host candidates and been mux to a single port,
- // > if these candidates share a same conn (either tcp or udp), they
- // > might read other's [messages causing failure].
- //
- // This PR, https://github.com/pion/ice/pull/473, also describes the issue:
- //
- // > When using UDPMux and UniversalUDPMux, it is possible that a
- // > registerConnForAddress() could be called twice or more for the same
- // > remote candidate (endpoint) by different candidates. E.g., when
- // > different HOST candidates ping the same remote candidate, the
- // > udpMuxedConn gets stored once. The second candidate will never
- // > receive a response. This is also the case when a single socket is
- // > used for gathering SRFLX and HOST candidates.
- //
- // PR 475 introduced MultiUDPMuxDefault to address the issue. However, at
- // this time, https://github.com/pion/ice/releases/tag/v2.3.6, there's an
- // open bug with MultiUDPMuxDefault
- // https://github.com/pion/ice/issues/507: "Multi UDP Mux can't works
- // when remote also enables Multi UDP Mux". Running the test program
- // attached to the bug confirms that no data channel is established;
- // while switching the test code to use NewICEUDPMux results in a
- // successful data channel connection. Since we need to use a Mux API on
- // both clients and proxies, we can't yet use MultiUDPMux.
- //
- // We patch pion/webrtc to add the SetICEUDPMuxSrflx functionality from
- // the currently unmerged https://github.com/pion/webrtc/pull/2298.
- // Without SetICEUDPMuxSrflx, STUN operations don't use the mux.
- //
- // We patch pion/ice gatherCandidatesSrflxUDPMux vendor patch to include
- // only the correct network type (IPv4 or IPv6) address candidates.
- // Without this patch, we observed up to 2x duplicate/redundant STUN
- // candidates.
- //
- // TODO: implement and try using transport.Net UDP dial functions in place
- // of NewICEUDPMux and pre-dialed UDP conn; track all dialed UDP
- // connections to close on WebRTCConn.Close; this approach would require
- // an alternative approach to injecting port mapping candidates, which
- // currently depends on the mux UDP socket being available outside of pion.
- // Another limitation and issue with NewICEUDPMux is that its enumeration
- // of all local interfaces and IPs includes many IPv6 addresses for
- // certain interfaces. For example, on macOS,
- // https://apple.stackexchange.com/a/371661, there are "secured" IPv6
- // addresses and many "temporary" IPv6 addresses, with all but one
- // temporary address being "deprecated". Instead of a full enumeration,
- // we should select only the non-deprecated temporary IPv6 address --
- // both for performance (avoid excess STUN requests) and privacy.
- //
- // Go has a proposal to expose the necessary IPv6 address information:
- // https://github.com/golang/go/issues/42694. However, as of Android SDK
- // 30, Go's net.InterfaceAddrs doesn't work at all:
- // https://github.com/pion/transport/issues/228,
- // https://github.com/golang/go/issues/40569.
- //
- // Note that it's not currently possible to
- // webrtc.SettingEngine.SetIPFilter to limit IPv6 selection to a single
- // candidate; that IP filter is not passed through to localInterfaces in
- // the NewUDPMuxDefault case. And even if it were, there's no guarantee
- // that the the first IPv6 address passed to the filter would be the
- // non-deprecated temporary address.
- //
- // To workaround net.Interface issues, we use SettingEngine.SetNet to plug
- // in an alternative implementation of net.Interface which selects only
- // one IPv4 and one IPv6 active interface and IP address and uses the
- // anet package for Android. See pionNetwork for more details.
- deadline, _ := ctx.Deadline()
- TTL := time.Until(deadline)
- pionLoggerFactory := newPionLoggerFactory(
- config.Logger,
- func() bool { return ctx.Err() != nil },
- config.EnableDebugLogging)
- pionNetwork := newPionNetwork(
- ctx, pionLoggerFactory.NewLogger("net"), config.WebRTCDialCoordinator)
- udpMux := webrtc.NewICEUniversalUDPMux(
- pionLoggerFactory.NewLogger("mux"), udpConn, TTL, pionNetwork)
- settingEngine := webrtc.SettingEngine{
- LoggerFactory: pionLoggerFactory,
- }
- settingEngine.SetNet(pionNetwork)
- settingEngine.DetachDataChannels()
- settingEngine.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
- settingEngine.SetICEUDPMux(udpMux)
- settingEngine.SetICEUDPMuxSrflx(udpMux)
- // Set this behavior to look like common web browser WebRTC stacks.
- settingEngine.SetDTLSInsecureSkipHelloVerify(true)
- settingEngine.EnableSCTPZeroChecksum(true)
- // Timeout, retry, and delay adjustments
- //
- // - Add some jitter to timed operations to avoid a trivial pion timing
- // fingerprint.
- //
- // - Reduce the wait time for STUN and peer reflexive candidates from the
- // default 500ms and 1s.
- //
- // - Reduce keepalives from the default 2s to +/-15s and increase
- // disconnect timeout from the default 5s to 3x15s.
- //
- // TODO:
- //
- // - Configuration via tactics.
- //
- // - While the RFC,
- // https://datatracker.ietf.org/doc/html/rfc5245#section-10, calls for
- // keep alives no less than 15s, implementations such as Chrome send
- // keep alives much more frequently,
- // https://issues.webrtc.org/issues/42221718.
- //
- // - Varying the period bewteen each keepalive, as is done with SSH via
- // SSHKeepAlivePeriodMin/Max, requires changes to pion/dtls.
- //
- // - Some traffic-related timeouts are not yet exposed via settingEngine,
- // including ice.defaultSTUNGatherTimeout, ice.maxBindingRequestTimeout.
- settingEngine.SetDTLSRetransmissionInterval(prng.JitterDuration(100*time.Millisecond, 0.1))
- settingEngine.SetHostAcceptanceMinWait(0)
- settingEngine.SetSrflxAcceptanceMinWait(prng.JitterDuration(100*time.Millisecond, 0.1))
- settingEngine.SetPrflxAcceptanceMinWait(prng.JitterDuration(200*time.Millisecond, 0.1))
- settingEngine.SetICETimeouts(45*time.Second, 0, prng.JitterDuration(15*time.Second, 0.2))
- settingEngine.SetICEMaxBindingRequests(10)
- // Initialize data channel or media streams obfuscation
- config.Logger.WithTraceFields(common.LogFields{
- "dtls_randomization": config.DoDTLSRandomization,
- "data_channel_traffic_shaping": config.TrafficShapingParameters != nil,
- "use_media_streams": config.UseMediaStreams,
- }).Info("webrtc_obfuscation")
- // Facilitate DTLS Client/ServerHello randomization. The client decides
- // whether to do DTLS randomization and generates and the proxy receives
- // ClientRootObfuscationSecret, so the client can orchestrate replay on
- // both ends of the connection by reusing an obfuscation secret. Derive a
- // secret specific to DTLS. SetDTLSSeed will futher derive a secure PRNG
- // seed specific to either the client or proxy end of the connection
- // (so each peer's randomization will be distinct).
- //
- // To avoid forking many pion repos in order to pass the seed through to
- // the DTLS implementation, SetDTLSSeed attaches the seed to the DTLS
- // dial context.
- //
- // Either SetDTLSSeed or SetNoDTLSSeed should be set for each conn, as the
- // pion/dtl fork treats no-seed as an error, as a check against the
- // context value mechanism.
- var dtlsCtx context.Context
- if config.DoDTLSRandomization {
- dtlsObfuscationSecret, err := deriveObfuscationSecret(
- config.ClientRootObfuscationSecret, "in-proxy-DTLS-seed")
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- baseSeed := prng.Seed(dtlsObfuscationSecret)
- dtlsCtx, err = inproxy_dtls.SetDTLSSeed(ctx, &baseSeed, isOffer)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- } else {
- dtlsCtx = inproxy_dtls.SetNoDTLSSeed(ctx)
- }
- settingEngine.SetDTLSConnectContextMaker(func() (context.Context, func()) {
- return context.WithCancel(dtlsCtx)
- })
- // Configure traffic shaping, which adds random padding and decoy messages
- // to data channel message or media track packet flows.
- var trafficShapingPRNG *prng.PRNG
- trafficShapingBuffer := new(bytes.Buffer)
- paddedMessageCount := 0
- decoyMessageCount := 0
- if config.TrafficShapingParameters != nil {
- // TODO: also use pion/dtls.Config.PaddingLengthGenerator?
- trafficShapingContext := "in-proxy-traffic-shaping-offer"
- if !isOffer {
- trafficShapingContext = "in-proxy-traffic-shaping-answer"
- }
- trafficShapingObfuscationSecret, err := deriveObfuscationSecret(
- config.ClientRootObfuscationSecret, trafficShapingContext)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- seed := prng.Seed(trafficShapingObfuscationSecret)
- trafficShapingPRNG = prng.NewPRNGWithSeed(&seed)
- paddedMessageCount = trafficShapingPRNG.Range(
- config.TrafficShapingParameters.MinPaddedMessages,
- config.TrafficShapingParameters.MaxPaddedMessages)
- decoyMessageCount = trafficShapingPRNG.Range(
- config.TrafficShapingParameters.MinDecoyMessages,
- config.TrafficShapingParameters.MaxDecoyMessages)
- }
- // NAT traversal setup
- // When DisableInboundForMobileNetworks is set, skip both STUN and port
- // mapping for mobile networks. Most mobile networks use CGNAT and
- // neither STUN nor port mapping will be effective. It's faster to not
- // wait for something that ultimately won't work.
- disableInbound := config.WebRTCDialCoordinator.DisableInboundForMobileNetworks() &&
- config.WebRTCDialCoordinator.NetworkType() == NetworkTypeMobile
- // Try to establish a port mapping (UPnP-IGD, PCP, or NAT-PMP), using port
- // mapping services previously found and recorded in PortMappingProbe.
- // Note that portMapper may perform additional probes. portMapper.start
- // launches the process of creating a new port mapping and does not
- // block. Port mappings are not part of the WebRTC standard, or supported
- // by pion/webrtc. Instead, if a port mapping is established, it's edited
- // into the SDP as a new host-type ICE candidate.
- portMappingProbe := config.WebRTCDialCoordinator.PortMappingProbe()
- doPortMapping := !disableInbound &&
- !config.WebRTCDialCoordinator.DisablePortMapping() &&
- portMappingProbe != nil
- var portMapper *portMapper
- if doPortMapping {
- localPort := udpConn.LocalAddr().(*net.UDPAddr).Port
- portMapper, err = newPortMapper(config.Logger, portMappingProbe, localPort)
- if err != nil {
- config.Logger.WithTraceFields(common.LogFields{
- "error": err,
- }).Warning("newPortMapper failed")
- // Continue without port mapper
- } else {
- portMapper.start()
- // On early return, portMapper will be closed by the following
- // deferred conn.Close.
- }
- }
- // Select a STUN server for ICE hole punching. The STUN server to be used
- // needs only support bind and not full RFC5780 NAT discovery.
- //
- // Each dial trys only one STUN server; in Psiphon tunnel establishment,
- // other, concurrent in-proxy dials may select alternative STUN servers
- // via WebRTCDialCoordinator. When the STUN server operation is successful,
- // WebRTCDialCoordinator will be signaled so that it may configure the STUN
- // server selection for replay.
- //
- // The STUN server will observe proxy IP addresses. Enumeration is
- // mitigated by using various public STUN servers, including Psiphon STUN
- // servers for proxies in non-censored regions. Proxies are also more
- // ephemeral than Psiphon servers.
- RFC5780 := false
- stunServerAddress := config.WebRTCDialCoordinator.STUNServerAddress(RFC5780)
- // Proceed even when stunServerAddress is "" and !DisableSTUN, as ICE may
- // find other host candidates.
- doSTUN := stunServerAddress != "" && !disableInbound && !config.WebRTCDialCoordinator.DisableSTUN()
- var ICEServers []webrtc.ICEServer
- if doSTUN {
- // stunServerAddress domain names are resolved with the Psiphon custom
- // resolver via pionNetwork.ResolveUDPAddr
- ICEServers = []webrtc.ICEServer{{URLs: []string{"stun:" + stunServerAddress}}}
- }
- conn := &webRTCConn{
- config: config,
- isOffer: isOffer,
- udpConn: udpConn,
- portMapper: portMapper,
- closedSignal: make(chan struct{}),
- readyToProxySignal: make(chan struct{}),
- dataChannelWriteBufferSignal: make(chan struct{}, 1),
- // A data channel uses SCTP and is message oriented. The maximum
- // message size supported by pion/webrtc is 65536:
- // https://github.com/pion/webrtc/blob/dce970438344727af9c9965f88d958c55d32e64d/datachannel.go#L19.
- // This read buffer must be as large as the maximum message size or
- // else a read may fail with io.ErrShortBuffer.
- //
- // For media streams, the largest media track RTP packet payload is
- // no more than mediaTrackMaxRTPPayloadLength.
- readBuffer: make([]byte, max(dataChannelMaxMessageSize, mediaTrackMaxRTPPayloadLength)),
- trafficShapingPRNG: trafficShapingPRNG,
- trafficShapingBuffer: trafficShapingBuffer,
- paddedMessageCount: paddedMessageCount,
- decoyMessageCount: decoyMessageCount,
- }
- defer func() {
- if retErr != nil {
- // Cleanup on early return
- conn.Close()
- // Notify the WebRTCDialCoordinator that the operation failed so
- // that it can clear replay for that STUN server selection.
- //
- // Limitation: the error here may be due to failures unrelated to
- // the STUN server.
- if ctx.Err() == nil && doSTUN {
- config.WebRTCDialCoordinator.STUNServerAddressFailed(RFC5780, stunServerAddress)
- }
- }
- }()
- settingEngine.SetICEBindingRequestHandler(conn.onICEBindingRequest)
- // All settingEngine configuration must be done before calling NewAPI.
- var webRTCAPI *webrtc.API
- if !config.UseMediaStreams {
- webRTCAPI = webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine))
- } else {
- // Additional webRTCAPI setup for media streams support.
- mediaEngine := &webrtc.MediaEngine{}
- err := mediaEngine.RegisterDefaultCodecs()
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- // pion/webrtc interceptors monitor RTP and send additional traffic
- // including NACKs and RTCP. Enable interceptors for the potential
- // obfuscation benefit from exhibiting this additional traffic.
- // webrtc.RegisterDefaultInterceptors calls ConfigureNack,
- // ConfigureRTCPReports, ConfigureTWCCSender. At this time we skip
- // ConfigureNack as this appears to generate excess "duplicated
- // packet" logs and connection instability. From a connection
- // reliability stand point, the underlying QUIC layer provides any
- // necessary resends.
- interceptors := &interceptor.Registry{}
- err = webrtc.ConfigureRTCPReports(interceptors)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- err = webrtc.ConfigureTWCCSender(mediaEngine, interceptors)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- webRTCAPI = webrtc.NewAPI(
- webrtc.WithSettingEngine(settingEngine),
- webrtc.WithMediaEngine(mediaEngine),
- webrtc.WithInterceptorRegistry(interceptors))
- }
- conn.peerConnection, err = webRTCAPI.NewPeerConnection(
- webrtc.Configuration{
- ICEServers: ICEServers,
- })
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- conn.peerConnection.OnConnectionStateChange(conn.onConnectionStateChange)
- conn.peerConnection.OnICECandidate(conn.onICECandidate)
- conn.peerConnection.OnICEConnectionStateChange(conn.onICEConnectionStateChange)
- conn.peerConnection.OnICEGatheringStateChange(conn.onICEGatheringStateChange)
- conn.peerConnection.OnNegotiationNeeded(conn.onNegotiationNeeded)
- conn.peerConnection.OnSignalingStateChange(conn.onSignalingStateChange)
- conn.peerConnection.OnDataChannel(conn.onDataChannel)
- if !config.UseMediaStreams && isOffer {
- // Use a data channel to proxy traffic. The client offer sets the data
- // channel configuration.
- dataChannelInit := &webrtc.DataChannelInit{}
- if !config.ReliableTransport {
- ordered := false
- dataChannelInit.Ordered = &ordered
- maxRetransmits := uint16(0)
- dataChannelInit.MaxRetransmits = &maxRetransmits
- }
- // Generate a random length label, to vary the DATA_CHANNEL_OPEN
- // message length. This length/value is not replayed.
- dataChannelLabel := prng.HexString(prng.Range(1, dataChannelMaxLabelLength))
- dataChannel, err := conn.peerConnection.CreateDataChannel(
- dataChannelLabel, dataChannelInit)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- conn.setDataChannel(dataChannel)
- }
- if config.UseMediaStreams {
- // Use media streams to proxy traffic. Each peer configures one
- // unidirectional media stream track to send its proxied traffic. In
- // WebRTC, a media stream consists of a set of tracks. Configure and
- // use a single video track.
- //
- // This implementation is intended to circumvent the WebRTC data
- // channel blocking described in "Differential Degradation
- // Vulnerabilities in Censorship Circumvention Systems",
- // https://arxiv.org/html/2409.06247v1, section 5.2.
- // Select the media track attributes, which are observable, in
- // plaintext, in the RTP header. Attributes include the payload
- // type/codec and codec timestamp inputs. Attempt to mimic common
- // WebRTC media stream traffic by selecting common codecs and video
- // frame sizes and timestamp ticks. Each peer's track has its own
- // attributes, which is not unusual. This is a basic effort to avoid
- // trivial, stateless or minimal state DPI blocking, unlike more
- // advanced schemes which replace bytes in actual video streams. The
- // client drives attribute selection and replay by specifying
- // ClientRootObfuscationSecret.
- propertiesContext := "in-proxy-media-track-properties-offer"
- if !isOffer {
- propertiesContext = "in-proxy-media-track-properties-answer"
- }
- propertiesObfuscationSecret, err := deriveObfuscationSecret(
- config.ClientRootObfuscationSecret, propertiesContext)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- seed := prng.Seed(propertiesObfuscationSecret)
- propertiesPRNG := prng.NewPRNGWithSeed(&seed)
- // Omit webrtc.MimeTypeH265, which results in the error:
- // "SetRemoteSDP: unable to start track, codec is not supported by remote".
- mimeTypes := []string{webrtc.MimeTypeH264, webrtc.MimeTypeVP8, webrtc.MimeTypeVP9, webrtc.MimeTypeAV1}
- clockRate := 90000 // Standard 90kHz
- frameRates := []int{25, 30, 60} // Common frame rates
- // Select frame sizes from common video modes. Each frame size is
- // selected at random from the given range, and the codec timestamp
- // is advanced when the resulting "frame size" number of proxied
- // bytes is sent.
- //
- // - Low-resolution video (e.g., QCIF): 1–10 KB per frame.
- // - Standard-definition video (480p): 50–200 KB per frame.
- // - High-definition video (720p): 100–500 KB per frame.
- // - Full HD video (1080p): 300 KB – 1 MB per frame.
- // - 4K video: 1–4 MB per frame.
- KB := 1024
- MB := 1024 * 1024
- frameSizeRanges := [][2]int{
- {1 * KB, 10 * KB},
- {50 * KB, 200 * KB},
- {100 * KB, 500 * KB},
- {300 * KB, 1 * MB},
- {1 * MB, 4 * MB}}
- mimeType := mimeTypes[propertiesPRNG.Intn(len(mimeTypes))]
- frameRate := frameRates[propertiesPRNG.Intn(len(frameRates))]
- frameSizeRange := frameSizeRanges[propertiesPRNG.Intn(len(frameSizeRanges))]
- conn.sendMediaTrackTimestampTick = clockRate / frameRate
- conn.sendMediaTrackFrameSizeRange = frameSizeRange
- // Initialize the first frame size. The random frame sizes are not
- // replayed.
- conn.sendMediaTrackRemainingFrameSize = prng.Range(
- conn.sendMediaTrackFrameSizeRange[0], conn.sendMediaTrackFrameSizeRange[1])
- // Generate random IDs, to vary the resulting SDP entry size message
- // length. These lengths/values are not replayed.
- trackID := prng.HexString(prng.Range(1, mediaTrackMaxIDLength))
- trackStreamID := prng.HexString(prng.Range(1, mediaTrackMaxIDLength))
- // Initialize a reusable rtp.Packet struct to avoid an allocation per
- // write. In SRTP, the packet payload is encrypted while the RTP
- // header remains plaintext.
- //
- // Plaintext RTP header fields:
- //
- // - Version is always 2.
- //
- // - Timestamp is initialized here to a random value, as is common,
- // and incremented, after writes, for the next video "frame".
- // Limitation: in states of low tunnel traffic, the video frame and
- // timestamp progression won't look realistic.
- //
- // - PayloadType is the codec and is auto-populated by pion.
- //
- // - SequenceNumber is a packet sequence number and populated by
- // pion's rtp.NewRandomSequencer, which uses the same logic as
- // Chrome's WebRTC implementation.
- //
- // - SSRC a random stream identifier, distinct from the track/stream
- // ID, and is auto-populated by pion.
- conn.sendMediaTrackPacket = &rtp.Packet{
- Header: rtp.Header{
- Version: 2,
- Timestamp: uint32(prng.Int63n(int64(1) << 32)),
- }}
- conn.sendMediaTrackSequencer = rtp.NewRandomSequencer()
- // Add the outbound media track to the SDP that is sent to the peer.
- conn.sendMediaTrack, err = webrtc.NewTrackLocalStaticRTP(
- webrtc.RTPCodecCapability{
- MimeType: mimeType,
- ClockRate: uint32(clockRate),
- },
- trackID,
- trackStreamID)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- conn.sendMediaTrackRTP, err = conn.peerConnection.AddTransceiverFromTrack(
- conn.sendMediaTrack,
- webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendrecv})
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- for _, rtpSender := range conn.peerConnection.GetSenders() {
- // Read incoming packets for this outbound RTP stream. Streams are
- // unidirectional for media payload, but there will be incoming
- // packets, from the peer, for RTCP, NACK, and other control
- // mechanisms. Interceptors are implicitly invoked and the
- // packets are then discarded.
- go func(rtpSender *webrtc.RTPSender) {
- var buffer [1500]byte
- for {
- _, _, err := conn.sendMediaTrackRTP.Sender().Read(buffer[:])
- if err != nil {
- // TODO: log error?
- select {
- case <-conn.closedSignal:
- return
- default:
- }
- }
- }
- }(rtpSender)
- }
- // Initialize the callback that is invoked once we receive an inbound
- // packet from the peer's media stream.
- //
- // Unlike data channels, where webrtc.DataChannel.OnOpen is symmetric
- // and invoked on both peers for a single, bidirectional channel,
- // webrtc.PeerConnection.OnTrack is unidirectional. And, unlike
- // DataChannel.OnOpen, if both peers await OnTrack before proxying,
- // the tunnel will hang. One side must start sending data in order
- // for OnTrack to be invoked on the other side.
- // See: https://github.com/pion/webrtc/issues/989#issuecomment-580424615.
- //
- // This has implications for AwaitReadyToProxy: in the media stream
- // mode, and when not using the media track reliability layer,
- // AwaitReadyToProxy returns when the DTLS handshake has completed,
- // but before any SRTP packets have been received from the peer.
- conn.receiveMediaTrackOpenedSignal = make(chan struct{})
- conn.receiveMediaTrackPacket = &rtp.Packet{}
- conn.peerConnection.OnTrack(conn.onMediaTrack)
- }
- // Prepare to await full ICE completion, including STUN candidates.
- // Trickle ICE is not used, simplifying the broker API. It's expected
- // that most clients and proxies will be behind a NAT, and not have
- // publicly addressable host candidates. TURN is not used. So most
- // candidates will be STUN, or server-reflexive, candidates.
- //
- // Later, the first to complete out of ICE or port mapping is used.
- //
- // TODO: stop waiting if an IPv6 host candidate is found?
- iceComplete := webrtc.GatheringCompletePromise(conn.peerConnection)
- // Create an offer, or input a peer's offer to create an answer.
- if isOffer {
- offer, err := conn.peerConnection.CreateOffer(nil)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- err = conn.peerConnection.SetLocalDescription(offer)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- } else {
- SDP := peerSDP.SDP
- if hasPersonalCompartmentIDs {
- // In personal pairing mode, the peer SDP may include private IP
- // addresses. To avoid unnecessary network traffic, filter out
- // any peer private IP addresses for which there is no
- // corresponding local, active interface.
- errorOnNoCandidates := false
- allowPrivateIPAddressCandidates := true
- filterPrivateIPAddressCandidates := true
- adjustedSDP, _, err := filterSDPAddresses(
- []byte(peerSDP.SDP),
- errorOnNoCandidates,
- nil,
- common.GeoIPData{},
- allowPrivateIPAddressCandidates,
- filterPrivateIPAddressCandidates)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- SDP = string(adjustedSDP)
- }
- pionSessionDescription := webrtc.SessionDescription{
- Type: webrtc.SDPType(peerSDP.Type),
- SDP: SDP,
- }
- err = conn.peerConnection.SetRemoteDescription(pionSessionDescription)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- answer, err := conn.peerConnection.CreateAnswer(nil)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- err = conn.peerConnection.SetLocalDescription(answer)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- }
- // Await either ICE or port mapping completion.
- // As a future enhancement, track which of ICE or port mapping succeeds
- // and is then followed by a failed WebRTC dial; stop trying the method
- // that often fails.
- iceCompleted := false
- portMappingExternalAddr := ""
- if portMapper == nil {
- select {
- case <-iceComplete:
- iceCompleted = true
- case <-ctx.Done():
- return nil, nil, nil, errors.Trace(ctx.Err())
- }
- } else {
- select {
- case <-iceComplete:
- iceCompleted = true
- case portMappingExternalAddr = <-portMapper.portMappingExternalAddress():
- case <-ctx.Done():
- return nil, nil, nil, errors.Trace(ctx.Err())
- }
- // When STUN is skipped and a port mapping is expected to be
- // available, await a port mapping for a short period. In this
- // scenario, pion ICE gathering may complete first, since it's only
- // gathering local host candidates.
- //
- // It remains possible that these local candidates are sufficient, if
- // they are public IPs or private IPs on the same LAN as the peer in
- // the case of personal pairing. For that reason, the await timeout
- // should be no more than a couple of seconds.
- //
- // TODO: also await port mappings when doSTUN, in case there are no
- // STUN candidates; see hasServerReflexive check below; as it stands,
- // in this case, it's more likely that port mapping won the previous
- // select race.
- if iceCompleted && portMappingExternalAddr == "" && !doSTUN && doPortMapping {
- timer := time.NewTimer(
- common.ValueOrDefault(
- config.WebRTCDialCoordinator.WebRTCAwaitPortMappingTimeout(),
- portMappingAwaitTimeout))
- defer timer.Stop()
- select {
- case portMappingExternalAddr = <-portMapper.portMappingExternalAddress():
- case <-timer.C:
- // Continue without port mapping
- case <-ctx.Done():
- return nil, nil, nil, errors.Trace(ctx.Err())
- }
- timer.Stop()
- }
- if portMapper != nil && portMappingExternalAddr == "" {
- // Release any port mapping resources when not using it.
- portMapper.close()
- conn.portMapper = nil
- } else if portMappingExternalAddr != "" {
- // Update responding port mapping types for metrics.
- //
- // Limitation: if there are multiple responding protocol types, it's
- // not known here which was used for this dial.
- config.WebRTCDialCoordinator.SetPortMappingTypes(
- getRespondingPortMappingTypes(config.WebRTCDialCoordinator.NetworkID()))
- }
- }
- config.Logger.WithTraceFields(common.LogFields{
- "ice_completed": iceCompleted,
- "port_mapping": portMappingExternalAddr != "",
- }).Info("webrtc_candidates_gathered")
- // Get the offer or answer, now populated with any ICE candidates.
- localDescription := conn.peerConnection.LocalDescription()
- // Adjust the SDP, removing local network addresses and adding any
- // port mapping candidate. Clients (offer) are permitted to have
- // no ICE candidates but proxies (answer) must have at least one
- //candidate.
- errorOnNoCandidates := !isOffer
- adjustedSDP, metrics, err := prepareSDPAddresses(
- []byte(localDescription.SDP),
- errorOnNoCandidates,
- portMappingExternalAddr,
- config.WebRTCDialCoordinator.DisableIPv6ICECandidates(),
- hasPersonalCompartmentIDs)
- if err != nil {
- return nil, nil, nil, errors.Trace(err)
- }
- // When STUN was attempted, ICE completed, and a STUN server-reflexive
- // candidate is present, notify the WebRTCDialCoordinator so that it can
- // set replay for that STUN server selection.
- if iceCompleted && doSTUN {
- hasServerReflexive := false
- for _, candidateType := range metrics.iceCandidateTypes {
- if candidateType == ICECandidateServerReflexive {
- hasServerReflexive = true
- }
- }
- if hasServerReflexive {
- config.WebRTCDialCoordinator.STUNServerAddressSucceeded(RFC5780, stunServerAddress)
- } else {
- config.WebRTCDialCoordinator.STUNServerAddressFailed(RFC5780, stunServerAddress)
- }
- }
- // The WebRTCConn is prepared, but the data channel is not yet connected.
- // On the offer end, the peer's following answer must be input to
- // SetRemoteSDP. And both ends must call AwaitInitialDataChannel to await
- // the data channel establishment.
- return conn,
- &WebRTCSessionDescription{
- Type: int(localDescription.Type),
- SDP: string(adjustedSDP),
- },
- metrics,
- nil
- }
- func (conn *webRTCConn) setDataChannel(dataChannel *webrtc.DataChannel) {
- // Assumes the caller holds conn.mutex, or is newWebRTCConn, creating the
- // conn.
- conn.dataChannel = dataChannel
- conn.dataChannel.OnOpen(conn.onDataChannelOpen)
- conn.dataChannel.OnClose(conn.onDataChannelClose)
- // Set up flow control (see comment in conn.Write)
- conn.dataChannel.SetBufferedAmountLowThreshold(dataChannelBufferedAmountLowThreshold)
- conn.dataChannel.OnBufferedAmountLow(func() {
- select {
- case conn.dataChannelWriteBufferSignal <- struct{}{}:
- default:
- }
- })
- }
- // SetRemoteSDP takes the answer SDP that is received in response to an offer
- // SDP. SetRemoteSDP initiates the WebRTC connection establishment on the
- // offer end.
- func (conn *webRTCConn) SetRemoteSDP(
- peerSDP WebRTCSessionDescription,
- hasPersonalCompartmentIDs bool) error {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- SDP := peerSDP.SDP
- if hasPersonalCompartmentIDs {
- // In personal pairing mode, the peer SDP may include private IP
- // addresses. To avoid unnecessary network traffic, filter out any
- // peer private IP addresses for which there is no corresponding
- // local, active interface.
- errorOnNoCandidates := false
- allowPrivateIPAddressCandidates := true
- filterPrivateIPAddressCandidates := true
- adjustedSDP, _, err := filterSDPAddresses(
- []byte(peerSDP.SDP),
- errorOnNoCandidates,
- nil,
- common.GeoIPData{},
- allowPrivateIPAddressCandidates,
- filterPrivateIPAddressCandidates)
- if err != nil {
- return errors.Trace(err)
- }
- SDP = string(adjustedSDP)
- }
- pionSessionDescription := webrtc.SessionDescription{
- Type: webrtc.SDPType(peerSDP.Type),
- SDP: SDP,
- }
- err := conn.peerConnection.SetRemoteDescription(pionSessionDescription)
- if err != nil {
- return errors.Trace(err)
- }
- return nil
- }
- // AwaitReadyToProxy returns when the data channel is established, or media
- // streams are ready to send data, or when an error has occured.
- func (conn *webRTCConn) AwaitReadyToProxy(ctx context.Context, connectionID ID) error {
- // Don't lock the mutex, or else necessary operations will deadlock.
- select {
- case <-conn.readyToProxySignal:
- // ICE is complete and DTLS is connected. In data channel mode, the
- // data channel is established using SCTP, which involves a further
- // handshake. In media stream mode, due to its unidirectional nature,
- // there is no equivalent to the the data channel establishment step.
- // See OnTrack comment in newWebRTCConn.
- err := conn.recordSelectedICECandidateStats()
- if err != nil {
- conn.config.Logger.WithTraceFields(common.LogFields{
- "error": err.Error()}).Warning("recordCandidateStats failed")
- // Continue without log
- }
- case <-ctx.Done():
- return errors.Tracef("with ICE candidate pairs %s: %w",
- conn.getICECandidatePairsSummary(),
- ctx.Err())
- case <-conn.closedSignal:
- return errors.TraceNew("connection has closed")
- }
- if conn.config.UseMediaStreams && conn.config.ReliableTransport {
- // The SRTP protocol used in media stream mode doesn't offer
- // reliable/ordered transport, so when that transport property is
- // required, add a reliability layer based on QUIC. This layer is
- // fully established here before returning read-to-proxy.
- err := conn.addRTPReliabilityLayer(ctx)
- if err != nil {
- return errors.Trace(err)
- }
- }
- conn.config.Logger.WithTraceFields(common.LogFields{
- "connectionID": connectionID,
- }).Info("WebRTC tunnel established")
- return nil
- }
- func (conn *webRTCConn) getICECandidatePairsSummary() string {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- stateCounts := map[webrtc.StatsICECandidatePairState]int{}
- statsReport := conn.peerConnection.GetStats()
- for key, stats := range statsReport {
- // Uses the pion StatsReport key formats "candidate:<ID>"
- // and "candidate:<ID>-candidate:<ID>"
- key, found := strings.CutPrefix(key, "candidate:")
- if !found {
- continue
- }
- candidateIDs := strings.Split(key, "-candidate:")
- if len(candidateIDs) != 2 {
- continue
- }
- candidatePairStats, ok := stats.(webrtc.ICECandidatePairStats)
- if !ok {
- continue
- }
- stateCounts[candidatePairStats.State] += 1
- }
- if len(stateCounts) == 0 {
- return "(none)"
- }
- var strs []string
- for state, count := range stateCounts {
- strs = append(strs, fmt.Sprintf("%s(%d)", state, count))
- }
- return strings.Join(strs, ", ")
- }
- func (conn *webRTCConn) recordSelectedICECandidateStats() error {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- statsReport := conn.peerConnection.GetStats()
- foundNominatedPair := false
- for key, stats := range statsReport {
- // Uses the pion StatsReport key formats "candidate:<ID>"
- // and "candidate:<ID>-candidate:<ID>"
- key, found := strings.CutPrefix(key, "candidate:")
- if !found {
- continue
- }
- candidateIDs := strings.Split(key, "-candidate:")
- if len(candidateIDs) != 2 {
- continue
- }
- candidatePairStats, ok := stats.(webrtc.ICECandidatePairStats)
- if !ok ||
- candidatePairStats.State != webrtc.StatsICECandidatePairStateSucceeded ||
- !candidatePairStats.Nominated {
- continue
- }
- localKey := fmt.Sprintf("candidate:%s", candidateIDs[0])
- stats, ok := statsReport[localKey]
- if !ok {
- return errors.TraceNew("missing local ICECandidateStats")
- }
- localCandidateStats, ok := stats.(webrtc.ICECandidateStats)
- if !ok {
- return errors.TraceNew("unexpected local ICECandidateStats")
- }
- remoteKey := fmt.Sprintf("candidate:%s", candidateIDs[1])
- stats, ok = statsReport[remoteKey]
- if !ok {
- return errors.TraceNew("missing remote ICECandidateStats")
- }
- remoteCandidateStats, ok := stats.(webrtc.ICECandidateStats)
- if !ok {
- return errors.TraceNew("unexpected remote ICECandidateStats")
- }
- // Use the same ICE candidate type names as logged in broker logs.
- logCandidateType := func(
- iceCandidateType webrtc.ICECandidateType) string {
- logType := ICECandidateUnknown
- switch iceCandidateType {
- case webrtc.ICECandidateTypeHost:
- logType = ICECandidateHost
- case webrtc.ICECandidateTypeSrflx:
- logType = ICECandidateServerReflexive
- case webrtc.ICECandidateTypePrflx:
- logType = ICECandidatePeerReflexive
- }
- return logType.String()
- }
- conn.iceCandidatePairMetrics = common.LogFields{}
- // TODO: log which of local/remote candidate is initiator
- conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_type"] =
- logCandidateType(localCandidateStats.CandidateType)
- localIP := net.ParseIP(localCandidateStats.IP)
- isIPv6 := "0"
- if localIP != nil && localIP.To4() == nil {
- isIPv6 = "1"
- }
- isPrivate := "0"
- if localIP != nil && localIP.IsPrivate() {
- isPrivate = "1"
- }
- conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_is_IPv6"] =
- isIPv6
- conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_is_private_IP"] =
- isPrivate
- conn.iceCandidatePairMetrics["inproxy_webrtc_local_ice_candidate_port"] =
- localCandidateStats.Port
- conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_type"] =
- logCandidateType(remoteCandidateStats.CandidateType)
- remoteIP := net.ParseIP(remoteCandidateStats.IP)
- isIPv6 = "0"
- if remoteIP != nil && remoteIP.To4() == nil {
- isIPv6 = "1"
- }
- isPrivate = "0"
- if remoteIP != nil && remoteIP.IsPrivate() {
- isPrivate = "1"
- }
- conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_is_IPv6"] =
- isIPv6
- conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_is_private_IP"] =
- isPrivate
- conn.iceCandidatePairMetrics["inproxy_webrtc_remote_ice_candidate_port"] =
- remoteCandidateStats.Port
- foundNominatedPair = true
- break
- }
- if !foundNominatedPair {
- return errors.TraceNew("missing nominated ICECandidateStatsPair")
- }
- return nil
- }
- func (conn *webRTCConn) Close() error {
- if !atomic.CompareAndSwapInt32(&conn.isClosed, 0, 1) {
- return nil
- }
- // Synchronize reading these conn fields, which may be initialized by
- // concurrent callbacks such as onDataChannel and onMediaTrack.
- //
- // To avoid potential deadlocks, don't continue to hold the lock while
- // closing individual components. For example, internally, the quic-go
- // implementation underlying reliableConn can concurrently call through
- // to writeMediaTrackPacket, which attempts to temporarily lock
- // conn.mutex, while reliableConn's quicConn.Close will wait on that
- // write operation.
- conn.mutex.Lock()
- portMapper := conn.portMapper
- sendMediaTrackRTP := conn.sendMediaTrackRTP
- mediaTrackReliabilityLayer := conn.mediaTrackReliabilityLayer
- dataChannelConn := conn.dataChannelConn
- dataChannel := conn.dataChannel
- peerConnection := conn.peerConnection
- udpConn := conn.udpConn
- conn.mutex.Unlock()
- // Signal closing, which will unblock some waiting conditions, before
- // awaiting the close of each component.
- close(conn.closedSignal)
- // Close the udpConn to interrupt any blocking DTLS handshake:
- // https://github.com/pion/webrtc/blob/c1467e4871c78ee3f463b50d858d13dc6f2874a4/dtlstransport.go#L334-L340
- //
- // Limitation: there is no guarantee that pion sends any closing packets
- // before the UDP socket is closed here.
- if udpConn != nil {
- _ = udpConn.Close()
- }
- // Neither sendMediaTrack nor receiveMediaTrack have a Close operation.
- if portMapper != nil {
- portMapper.close()
- }
- if sendMediaTrackRTP != nil {
- _ = sendMediaTrackRTP.Stop()
- }
- if mediaTrackReliabilityLayer != nil {
- _ = mediaTrackReliabilityLayer.Close()
- }
- if dataChannelConn != nil {
- _ = dataChannelConn.Close()
- }
- if dataChannel != nil {
- _ = dataChannel.Close()
- }
- if peerConnection != nil {
- // TODO: use PeerConnection.GracefulClose (requires pion/webrtc 3.2.51)?
- _ = peerConnection.Close()
- }
- return nil
- }
- func (conn *webRTCConn) IsClosed() bool {
- return atomic.LoadInt32(&conn.isClosed) == 1
- }
- func (conn *webRTCConn) Read(p []byte) (int, error) {
- if !conn.config.UseMediaStreams {
- // Data channel mode.
- n, err := conn.readDataChannel(p)
- return n, errors.TraceReader(err)
- }
- if conn.mediaTrackReliabilityLayer != nil {
- // Media stream mode with reliability layer.
- n, err := conn.mediaTrackReliabilityLayer.Read(p)
- return n, errors.TraceReader(err)
- }
- // Media stream mode without reliability layer.
- n, err := conn.readMediaTrack(p)
- return n, errors.TraceReader(err)
- }
- func (conn *webRTCConn) Write(p []byte) (int, error) {
- if !conn.config.UseMediaStreams {
- // Data channel mode.
- n, err := conn.writeDataChannelMessage(p, false)
- return n, errors.Trace(err)
- }
- if conn.mediaTrackReliabilityLayer != nil {
- // Media stream mode with reliability layer.
- n, err := conn.mediaTrackReliabilityLayer.Write(p)
- return n, errors.Trace(err)
- }
- // Media stream mode without reliability layer.
- n, err := conn.writeMediaTrackPacket(p, false)
- return n, errors.Trace(err)
- }
- func (conn *webRTCConn) LocalAddr() net.Addr {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- // This is the local UDP socket address, not the external, public address.
- return conn.udpConn.LocalAddr()
- }
- func (conn *webRTCConn) RemoteAddr() net.Addr {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- // Not supported.
- return nil
- }
- func (conn *webRTCConn) SetDeadline(t time.Time) error {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- return errors.TraceNew("not supported")
- }
- func (conn *webRTCConn) SetReadDeadline(t time.Time) error {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- if conn.IsClosed() {
- return errors.TraceNew("closed")
- }
- if conn.config.UseMediaStreams {
- // This is the same workaround used and documented in
- // mediaTrackPacketConn.SetReadDeadline.
- //
- // As in mediaTrackPacketConn, this assumes that SetReadDeadline is
- // called only in the terminating quic-go case.
- go func() {
- _ = conn.Close()
- }()
- }
- readDeadliner, ok := conn.dataChannelConn.(datachannel.ReadDeadliner)
- if !ok {
- return errors.TraceNew("no data channel")
- }
- return readDeadliner.SetReadDeadline(t)
- }
- func (conn *webRTCConn) SetWriteDeadline(t time.Time) error {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- return errors.TraceNew("not supported")
- }
- // GetMetrics implements the common.MetricsSource interface and returns log
- // fields detailing the WebRTC dial parameters.
- func (conn *webRTCConn) GetMetrics() common.LogFields {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- logFields := make(common.LogFields)
- logFields.Add(conn.iceCandidatePairMetrics)
- randomizeDTLS := "0"
- if conn.config.DoDTLSRandomization {
- randomizeDTLS = "1"
- }
- logFields["inproxy_webrtc_randomize_dtls"] = randomizeDTLS
- useMediaStreams := "0"
- if conn.config.UseMediaStreams {
- useMediaStreams = "1"
- }
- logFields["inproxy_webrtc_use_media_streams"] = useMediaStreams
- logFields["inproxy_webrtc_padded_messages_sent"] = atomic.LoadInt32(&conn.paddedMessagesSent)
- logFields["inproxy_webrtc_padded_messages_received"] = atomic.LoadInt32(&conn.paddedMessagesReceived)
- logFields["inproxy_webrtc_decoy_messages_sent"] = atomic.LoadInt32(&conn.decoyMessagesSent)
- logFields["inproxy_webrtc_decoy_messages_received"] = atomic.LoadInt32(&conn.decoyMessagesReceived)
- return logFields
- }
- func (conn *webRTCConn) onConnectionStateChange(state webrtc.PeerConnectionState) {
- switch state {
- case webrtc.PeerConnectionStateConnected:
- if conn.config.UseMediaStreams {
- // webrtc.PeerConnectionStateConnected is received once the DTLS
- // connection is established. At this point, media track data may
- // be sent. In media stream mode, unblock AwaitForReadyToProxy to
- // allow peers to start sending data. In data channel mode, wait
- // and signal in onDataChannelOpen.
- conn.readyToProxyOnce.Do(func() { close(conn.readyToProxySignal) })
- }
- case webrtc.PeerConnectionStateDisconnected,
- webrtc.PeerConnectionStateFailed,
- webrtc.PeerConnectionStateClosed:
- // Close the WebRTCConn when the connection is no longer connected. Close
- // will lock conn.mutex, so do not aquire the lock here.
- //
- // Currently, ICE Restart is not used, and there is no transition from
- // Disconnected back to Connected.
- conn.Close()
- }
- conn.config.Logger.WithTraceFields(common.LogFields{
- "state": state.String(),
- }).Debug("peer connection state changed")
- }
- func (conn *webRTCConn) onICECandidate(candidate *webrtc.ICECandidate) {
- if candidate == nil {
- return
- }
- conn.config.Logger.WithTraceFields(common.LogFields{
- "candidate": candidate.String(),
- }).Debug("new ICE candidate")
- }
- func (conn *webRTCConn) onICEBindingRequest(m *stun.Message, local, remote ice.Candidate, pair *ice.CandidatePair) bool {
- // SetICEBindingRequestHandler is used to hook onICEBindingRequest into
- // STUN bind events for logging. The return values is always false as
- // this callback makes no adjustments to ICE candidate selection. When
- // the data channel or media track tunnel has already opened, skip
- // logging events, as this callback appears to be invoked for keepalive
- // pings.
- if local == nil || remote == nil {
- return false
- }
- select {
- case <-conn.readyToProxySignal:
- return false
- default:
- }
- conn.config.Logger.WithTraceFields(common.LogFields{
- "local_candidate": local.String(),
- "remote_candidate": remote.String(),
- }).Debug("new ICE STUN binding request")
- return false
- }
- func (conn *webRTCConn) onICEConnectionStateChange(state webrtc.ICEConnectionState) {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- conn.config.Logger.WithTraceFields(common.LogFields{
- "state": state.String(),
- }).Debug("ICE connection state changed")
- }
- func (conn *webRTCConn) onICEGatheringStateChange(state webrtc.ICEGathererState) {
- conn.config.Logger.WithTraceFields(common.LogFields{
- "state": state.String(),
- }).Debug("ICE gathering state changed")
- }
- func (conn *webRTCConn) onNegotiationNeeded() {
- conn.config.Logger.WithTrace().Debug("negotiation needed")
- }
- func (conn *webRTCConn) onSignalingStateChange(state webrtc.SignalingState) {
- conn.config.Logger.WithTraceFields(common.LogFields{
- "state": state.String(),
- }).Debug("signaling state changed")
- }
- func (conn *webRTCConn) onDataChannel(dataChannel *webrtc.DataChannel) {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- conn.setDataChannel(dataChannel)
- conn.config.Logger.WithTraceFields(common.LogFields{
- "label": dataChannel.Label(),
- "ID": dataChannel.ID(),
- }).Debug("new data channel")
- }
- func (conn *webRTCConn) onMediaTrack(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- conn.receiveMediaTrack = track
- close(conn.receiveMediaTrackOpenedSignal)
- conn.config.Logger.WithTraceFields(common.LogFields{
- "ID": track.ID(),
- "payload_type": track.Kind().String(),
- }).Info("media track open")
- }
- func (conn *webRTCConn) onDataChannelOpen() {
- conn.mutex.Lock()
- defer conn.mutex.Unlock()
- dataChannelConn, err := conn.dataChannel.Detach()
- if err == nil {
- conn.dataChannelConn = dataChannelConn
- // TODO: can a data channel be connected, disconnected, and then
- // reestablished in one session?
- conn.readyToProxyOnce.Do(func() { close(conn.readyToProxySignal) })
- }
- conn.config.Logger.WithTraceFields(common.LogFields{
- "detachError": err,
- }).Info("data channel open")
- }
- func (conn *webRTCConn) onDataChannelClose() {
- // Close the WebRTCConn when the data channel is closed. Close will lock
- // conn.mutex, so do not aquire the lock here.
- conn.Close()
- conn.config.Logger.WithTrace().Info("data channel closed")
- }
- func (conn *webRTCConn) readDataChannel(p []byte) (int, error) {
- for {
- n, err := conn.readDataChannelMessage(p)
- if err != nil || n > 0 {
- return n, errors.TraceReader(err)
- }
- // A decoy message was read; discard and read again.
- }
- }
- func (conn *webRTCConn) readDataChannelMessage(p []byte) (int, error) {
- if conn.IsClosed() {
- return 0, errors.TraceNew("closed")
- }
- // Don't hold this lock, or else concurrent Writes will be blocked.
- conn.mutex.Lock()
- dataChannelConn := conn.dataChannelConn
- conn.mutex.Unlock()
- if dataChannelConn == nil {
- return 0, errors.TraceNew("no data channel")
- }
- // The input read buffer, p, may not be the same length as the message
- // read from the data channel. Buffer the read message if another Read
- // call is necessary to consume it. As per https://pkg.go.dev/io#Reader,
- // dataChannelConn bytes read are processed even when
- // dataChannelConn.Read returns an error; the error value is stored and
- // returned with the Read call that consumes the end of the message buffer.
- conn.readMutex.Lock()
- defer conn.readMutex.Unlock()
- if conn.readOffset == conn.readLength {
- n, err := dataChannelConn.Read(conn.readBuffer)
- conn.readOffset = 0
- conn.readLength = n
- conn.readError = err
- if conn.readLength == 0 && conn.readError != nil {
- // No bytes were read, so return the error immediately.
- return 0, errors.TraceReader(err)
- }
- // Skip over padding.
- if conn.readLength > 0 && !conn.peerPaddingDone {
- paddingSize, n := binary.Varint(conn.readBuffer[0:conn.readLength])
- if (paddingSize == 0 && n <= 0) || paddingSize > int64(conn.readLength-n) {
- if conn.readError == nil {
- return 0, errors.Tracef(
- "invalid padding: %d, %d, %d,", n, paddingSize, conn.readLength)
- }
- return 0, errors.Tracef(
- "invalid padding: %d, %d, %d, %w",
- n, paddingSize, conn.readLength, conn.readError)
- }
- if paddingSize < 0 {
- // When the padding header indicates a padding size of -1, the
- // peer is indicating that padding is done. Subsequent
- // messages will have no padding header or padding bytes.
- conn.peerPaddingDone = true
- conn.readOffset += n
- } else {
- conn.readOffset += n + int(paddingSize)
- atomic.AddInt32(&conn.paddedMessagesReceived, 1)
- if conn.readOffset == conn.readLength {
- atomic.AddInt32(&conn.decoyMessagesReceived, 1)
- }
- }
- }
- }
- n := copy(p, conn.readBuffer[conn.readOffset:conn.readLength])
- conn.readOffset += n
- var err error
- if conn.readOffset == conn.readLength {
- err = conn.readError
- }
- // When decoy messages are enabled, periodically respond to an incoming
- // messages with an immediate outbound decoy message. This is similar to
- // the design here:
- // https://github.com/Psiphon-Labs/psiphon-tunnel-core/blob/c4f6a593a645db4479a7032a9e97d3c0b905cdfc/psiphon/common/quic/obfuscator.go#L361-L409
- //
- // writeDataChannelMessage handles conn.decoyMessageCount, which is
- // synchronized with conn.WriteMutex, as well as other specific logic.
- // Here we just signal writeDataChannelMessage based on the read event.
- //
- // When the data channel already has buffered writes in excess of a decoy
- // message size, the writeDataChannelMessage skips the decoy message and
- // returns without blocking, so Read calls will not block.
- if !conn.decoyDone.Load() {
- _, _ = conn.writeDataChannelMessage(nil, true)
- }
- return n, errors.TraceReader(err)
- }
- func (conn *webRTCConn) writeDataChannelMessage(p []byte, decoy bool) (int, error) {
- if p != nil && decoy {
- return 0, errors.TraceNew("invalid write parameters")
- }
- // pion/sctp doesn't handle 0-byte writes correctly, so drop/skip at this level.
- //
- // Testing shows that the SCTP connection stalls after a 0-byte write. In
- // the pion/sctp implementation,
- // https://github.com/pion/sctp/blob/v1.8.8/stream.go#L254-L278 and
- // https://github.com/pion/sctp/blob/v1.8.8/stream.go#L280-L336, it
- // appears that a zero-byte write won't send an SCTP messages but does
- // increment a sequence number.
- if len(p) == 0 && !decoy {
- return 0, nil
- }
- if conn.IsClosed() {
- return 0, errors.TraceNew("closed")
- }
- // Don't hold this lock, or else concurrent Reads will be blocked.
- conn.mutex.Lock()
- dataChannel := conn.dataChannel
- dataChannelConn := conn.dataChannelConn
- conn.mutex.Unlock()
- if dataChannel == nil || dataChannelConn == nil {
- return 0, errors.TraceNew("no data channel")
- }
- bufferedAmount := dataChannel.BufferedAmount()
- // Only proceed with a decoy message when no pending writes are buffered.
- //
- // This check is made before acquiring conn.writeMutex so that, in most
- // cases, writeMessage won't block Read calls when a concurrent Write is
- // holding conn.writeMutex and potentially blocking on flow control.
- // There's still a chance that this test passes, and a concurrent Write
- // arrives at the same time.
- if decoy && bufferedAmount > 0 {
- return 0, nil
- }
- conn.writeMutex.Lock()
- defer conn.writeMutex.Unlock()
- writeSize := len(p)
- // Determine padding size and padding header size.
- doPadding := false
- paddingSize := 0
- var paddingHeader [binary.MaxVarintLen32]byte
- paddingHeaderSize := 0
- if decoy {
- if conn.decoyMessageCount < 1 {
- return 0, nil
- }
- if !conn.trafficShapingPRNG.FlipWeightedCoin(
- conn.config.TrafficShapingParameters.DecoyMessageProbability) {
- return 0, nil
- }
- conn.decoyMessageCount -= 1
- decoySize := conn.trafficShapingPRNG.Range(
- conn.config.TrafficShapingParameters.MinDecoySize,
- conn.config.TrafficShapingParameters.MaxDecoySize)
- // When sending a decoy message, the entire message is padding.
- doPadding = true
- paddingSize = decoySize
- if conn.decoyMessageCount == 0 {
- // Set the shared flag that readMessage uses to stop invoking
- // writeMessage for decoy events.
- conn.decoyDone.Store(true)
- }
- } else if conn.paddedMessageCount > 0 {
- // Add padding to a normal write.
- conn.paddedMessageCount -= 1
- doPadding = true
- paddingSize = prng.Range(
- conn.config.TrafficShapingParameters.MinPaddingSize,
- conn.config.TrafficShapingParameters.MaxPaddingSize)
- } else if conn.decoyMessageCount > 0 {
- // Padding normal messages is done, but there are still outstanding
- // decoy messages, so add a padding header indicating padding size 0
- // to this normal message.
- doPadding = true
- paddingSize = 0
- } else if !conn.trafficShapingDone {
- // Padding normal messages is done and all decoy messages are sent, so
- // send a special padding header with padding size -1, signaling the
- // peer that no additional padding will be performed and no
- // subsequent messages will contain a padding header.
- doPadding = true
- paddingSize = -1
- }
- if doPadding {
- if paddingSize > 0 {
- // Reduce, if necessary, to stay within the maximum data channel
- // message size. This is not expected to happen for the io.Copy use
- // case, with 32K message size, plus reasonable padding sizes.
- if writeSize+binary.MaxVarintLen32+paddingSize > dataChannelMaxMessageSize {
- paddingSize -= (writeSize + binary.MaxVarintLen32 + paddingSize) - dataChannelMaxMessageSize
- if paddingSize < 0 {
- paddingSize = 0
- }
- }
- // Add padding overhead to total writeSize before the flow control check.
- writeSize += paddingSize
- }
- paddingHeaderSize = binary.PutVarint(paddingHeader[:], int64(paddingSize))
- writeSize += paddingHeaderSize
- }
- if writeSize > dataChannelMaxMessageSize {
- return 0, errors.TraceNew("write too large")
- }
- // Flow control is required to ensure that Write calls don't result in
- // unbounded buffering in pion/webrtc. Use similar logic and the same
- // buffer size thresholds as the pion sample code.
- //
- // https://github.com/pion/webrtc/tree/master/examples/data-channels-flow-control#when-do-we-need-it:
- // > Send or SendText methods are called on DataChannel to send data to
- // > the connected peer. The methods return immediately, but it does not
- // > mean the data was actually sent onto the wire. Instead, it is
- // > queued in a buffer until it actually gets sent out to the wire.
- // >
- // > When you have a large amount of data to send, it is an application's
- // > responsibility to control the buffered amount in order not to
- // > indefinitely grow the buffer size to eventually exhaust the memory.
- // If the pion write buffer is too full, wait for a signal that sufficient
- // write data has been consumed before writing more.
- if !conn.IsClosed() && bufferedAmount+uint64(writeSize) > dataChannelMaxBufferedAmount {
- select {
- case <-conn.dataChannelWriteBufferSignal:
- case <-conn.closedSignal:
- return 0, errors.TraceNew("connection has closed")
- }
- }
- if conn.trafficShapingDone {
- // When traffic shaping is done, p is written directly without the
- // additional trafficShapingBuffer copy.
- // Limitation: if len(p) > 65536, the dataChannelConn.Write will fail. In
- // practise, this is not expected to happen with typical use cases such
- // as io.Copy, which uses a 32K buffer.
- n, err := dataChannelConn.Write(p)
- return n, errors.Trace(err)
- }
- conn.trafficShapingBuffer.Reset()
- conn.trafficShapingBuffer.Write(paddingHeader[:paddingHeaderSize])
- if paddingSize > 0 {
- conn.trafficShapingBuffer.Write(prng.Bytes(paddingSize))
- }
- conn.trafficShapingBuffer.Write(p)
- // Limitation: see above; len(p) + padding must be <= 65536.
- _, err := dataChannelConn.Write(conn.trafficShapingBuffer.Bytes())
- if decoy {
- atomic.AddInt32(&conn.decoyMessagesSent, 1)
- } else if doPadding && paddingSize > 0 {
- atomic.AddInt32(&conn.paddedMessagesSent, 1)
- }
- if conn.paddedMessageCount == 0 && conn.decoyMessageCount == 0 && paddingSize == -1 {
- // Set flag indicating -1 padding size was sent and release traffic
- // shaping resources.
- conn.trafficShapingDone = true
- conn.trafficShapingPRNG = nil
- conn.trafficShapingBuffer = nil
- }
- return len(p), errors.Trace(err)
- }
- // GetQUICMaxPacketSizeAdjustment returns the value to be specified in
- // Psiphon's quic-go configuration ClientMaxPacketSizeAdjustment
- // ServerMaxPacketSizeAdjustment fields. Psiphon's quic-go max packet size
- // adjustment reduces the QUIC payload to accomodate overhead from
- // obfuscation, as in Obfuscated QUIC. In the in-proxy case, the same
- // mechanism is used to ensure that QUIC packets fit within the space
- // available for SRTP packet payloads, allowing for the overhead of the RTP
- // packet. Beyond that allowance, the adjustment is tuned to produce SRTP
- // packets that match common SRTP traffic with maximum packet sizes of 1200
- // bytes, excluding IP and UDP headers.
- //
- // INPROXY-QUIC-OSSH must apply GetQUICMaxPacketSizeAdjustment on both the
- // client and server side. In addition, the client must disable
- // DisablePathMTUDiscovery.
- func GetQUICMaxPacketSizeAdjustment() int {
- // Limitations:
- //
- // - For INPROXY-QUIC-OSSH, the second hop egressing from the proxy is
- // identical regardless of whether the 1st hop uses data channel mode
- // or media stream mode. Currently, the INPROXY-QUIC-OSSH server won't
- // be able to distinguish, early enough, between the modes used by the
- // 1st hop. In order to conform with the required adustment for media
- // stream mode, the server must always apply the adjustment. This
- // reduction in QUIC packet size may impact the performance of data
- // channel mode. Furthermore, the lower maximum QUIC packet size is
- // directly observable on the 2nd hop.
- // common/quic.MAX_PRE_DISCOVERY_PACKET_SIZE = 1280
- quicMTU := 1280
- targetMTUAdjustment := quicMTU - mediaTrackMaxUDPPayloadLength
- if targetMTUAdjustment < 0 {
- targetMTUAdjustment = 0
- }
- adjustment := targetMTUAdjustment + mediaTrackRTPPacketOverhead
- if adjustment < 0 {
- adjustment = 0
- }
- return adjustment
- }
- func (conn *webRTCConn) readMediaTrack(p []byte) (int, error) {
- for {
- n, err := conn.readMediaTrackPacket(p)
- if err != nil || n > 0 {
- return n, errors.TraceReader(err)
- }
- // A decoy message was read; discard and read again.
- }
- }
- func (conn *webRTCConn) readMediaTrackPacket(p []byte) (int, error) {
- // Await opening the peer's media track, the OnTrack event. This
- // synchronization is necessary since AwaitReadyToProxy returns before
- // receiving a media track packet from the peer, which triggers OnTrack.
- select {
- case <-conn.receiveMediaTrackOpenedSignal:
- case <-conn.closedSignal:
- return 0, errors.TraceNew("closed")
- }
- if conn.IsClosed() {
- return 0, errors.TraceNew("closed")
- }
- // Don't hold this lock, or else concurrent Writes will be blocked.
- conn.mutex.Lock()
- receiveMediaTrack := conn.receiveMediaTrack
- conn.mutex.Unlock()
- if receiveMediaTrack == nil {
- return 0, errors.TraceNew("no media track")
- }
- conn.readMutex.Lock()
- defer conn.readMutex.Unlock()
- // Use the lower-level Read and Unmarshal functions to avoid per-call allocations
- // performed by the higher-level ReadRTP.
- n, _, err := receiveMediaTrack.Read(conn.readBuffer)
- if err != nil {
- return 0, errors.TraceReader(err)
- }
- err = conn.receiveMediaTrackPacket.Unmarshal(conn.readBuffer[:n])
- if err != nil {
- return 0, errors.Trace(err)
- }
- payload := conn.receiveMediaTrackPacket.Payload
- if len(payload) < 1 {
- return 0, errors.TraceNew("invalid padding")
- }
- // Read the padding header byte, which is always present (see comment in
- // writeMediaTrackPacket).
- paddingSize := int(payload[0])
- if paddingSize == 255 {
- // When the header is 255, this is a decoy packet with no application
- // payload. Discard the entire packet. Return n = 0 bytes read, and
- // the caller will read again.
- return 0, nil
- }
- if len(payload) < 1+paddingSize {
- return 0, errors.Tracef("invalid padding: %d < %d", len(payload), 1+paddingSize)
- }
- payload = payload[1+paddingSize:]
- // Unlike the data channel case, there is no carry over data left in
- // conn.readBuffer between readMediaTrackPacket calls: the entire packet
- // payload must be read in this one call.
- if len(p) < len(payload) {
- return 0, errors.Tracef("read buffer too short: %d < %d", len(p), len(payload))
- }
- copy(p, payload)
- // When decoy messages are enabled, periodically respond to an incoming
- // messages with an immediate outbound decoy message.
- //
- // writeMediaTrackPacket handles conn.decoyMessageCount, which is
- // synchronized with conn.WriteMutex, as well as other specific logic.
- // Here we just signal writeDataChannelMessage based on the read event.
- if !conn.decoyDone.Load() {
- _, _ = conn.writeMediaTrackPacket(nil, true)
- }
- return len(payload), nil
- }
- func (conn *webRTCConn) writeMediaTrackPacket(p []byte, decoy bool) (int, error) {
- if p != nil && decoy {
- return 0, errors.TraceNew("invalid write parameters")
- }
- if conn.IsClosed() {
- return 0, errors.TraceNew("closed")
- }
- // Don't hold this lock, or else concurrent Writes will be blocked.
- conn.mutex.Lock()
- sendMediaTrack := conn.sendMediaTrack
- conn.mutex.Unlock()
- if sendMediaTrack == nil {
- return 0, errors.TraceNew("no media track")
- }
- conn.writeMutex.Lock()
- defer conn.writeMutex.Unlock()
- // Packet writes can't be split.
- maxRTPPayloadLength := mediaTrackMaxRTPPayloadLength
- if len(p) > maxRTPPayloadLength {
- return 0, errors.Tracef("write too large: %d > %d", len(p), maxRTPPayloadLength)
- }
- // Determine padding size and padding header size.
- // Limitation: unlike data channel padding, the header size is fixed, not
- // a varint, and is always sent. This is due to the fixed QUIC max packet
- // size adjustment. To limit the overhead, and because the maximum SRTP
- // payload size is much smaller than the maximum data channel message
- // size, the padding is limited to 254 bytes, represented with a 1 byte
- // header. The value 255 is reserved to signal that the entire packet is
- // a decoy packet.
- conn.trafficShapingBuffer.Reset()
- if decoy {
- if conn.decoyMessageCount < 1 {
- return 0, nil
- }
- if !conn.trafficShapingPRNG.FlipWeightedCoin(
- conn.config.TrafficShapingParameters.DecoyMessageProbability) {
- return 0, nil
- }
- conn.decoyMessageCount -= 1
- // When sending a decoy message, the entire message is padding, and
- // the padding can be up to the full packet size.
- //
- // Note that the actual decoy payload size is decoySize+1, including
- // the padding header.
- decoySize := conn.trafficShapingPRNG.Range(
- conn.config.TrafficShapingParameters.MinDecoySize,
- conn.config.TrafficShapingParameters.MaxDecoySize)
- if decoySize > maxRTPPayloadLength-1 {
- // Ensure there's space for the 1 byte padding header.
- decoySize = maxRTPPayloadLength - 1
- }
- // Set the padding header to 255, which indicates a decoy packet.
- conn.trafficShapingBuffer.WriteByte(255)
- if decoySize > 0 {
- conn.trafficShapingBuffer.Write(prng.Bytes(decoySize))
- }
- if conn.decoyMessageCount == 0 {
- // Set the shared flag that readMessage uses to stop invoking
- // writeMessage for decoy events.
- conn.decoyDone.Store(true)
- }
- } else {
- // Add padding to a normal write.
- paddingSize := 0
- if conn.paddedMessageCount > 0 {
- paddingSize = prng.Range(
- conn.config.TrafficShapingParameters.MinPaddingSize,
- conn.config.TrafficShapingParameters.MaxPaddingSize)
- if paddingSize > 254 {
- // The maximum padding size is 254.
- paddingSize = 254
- }
- if len(p)+1+paddingSize > maxRTPPayloadLength {
- paddingSize -= (len(p) + 1 + paddingSize) - maxRTPPayloadLength
- }
- if paddingSize < 0 {
- paddingSize = 0
- }
- conn.paddedMessageCount -= 1
- }
- conn.trafficShapingBuffer.WriteByte(byte(paddingSize))
- if paddingSize > 0 {
- conn.trafficShapingBuffer.Write(prng.Bytes(paddingSize))
- }
- conn.trafficShapingBuffer.Write(p)
- }
- paddedPayload := conn.trafficShapingBuffer.Bytes()
- // Sanity check, in case there's a bug in the padding logic above; +1 here
- // is the padding header.
- if len(paddedPayload) > maxRTPPayloadLength+1 {
- return 0, errors.Tracef("write too large: %d > %d", len(paddedPayload), maxRTPPayloadLength)
- }
- // Send the RTP packet.
- // Dynamic plaintext RTP header values are set here: the sequence number
- // is set when sending the packet; the timestamp, initialized in
- // newWebRTCConn, is updated once payload equivalent to a complete
- // video "frame" has been sent. See the "Plaintext RTP header fields"
- // comment in newWebRTCConn.
- conn.sendMediaTrackPacket.SequenceNumber = conn.sendMediaTrackSequencer.NextSequenceNumber()
- conn.sendMediaTrackPacket.Payload = paddedPayload
- err := sendMediaTrack.WriteRTP(conn.sendMediaTrackPacket)
- if err != nil {
- return 0, errors.Trace(err)
- }
- conn.sendMediaTrackRemainingFrameSize -= len(paddedPayload)
- if conn.sendMediaTrackRemainingFrameSize <= 0 {
- conn.sendMediaTrackPacket.Timestamp += uint32(conn.sendMediaTrackTimestampTick)
- conn.sendMediaTrackRemainingFrameSize = prng.Range(conn.sendMediaTrackFrameSizeRange[0], conn.sendMediaTrackFrameSizeRange[1])
- }
- return len(p), nil
- }
- func (conn *webRTCConn) addRTPReliabilityLayer(ctx context.Context) error {
- // Add a QUIC layer over the SRTP packet flow to provide reliable delivery
- // and ordering. The proxy runs a QUIC server and the client runs a QUIC
- // client that connects to the proxy's server. As all of the QUIC traffic
- // is encapsulated in the secure SRTP layer.
- // Wrap the RTP track read and write operations in a mediaTrackPacketConn
- // provides the net.PacketConn interface required by quic-go. There is no
- // Close-on-error for mediaTrackPacketConn since it doesn't allocate or use
- // any resources.
- mediaTrackPacketConn := newMediaTrackPacketConn(conn)
- // Use the Psiphon QUIC obfuscated PSK mechanism to facilitate a faster
- // QUIC TLS handshake. QUIC client hello randomization is also
- // initialized, as it will vary the QUIC handshake traffic shape within
- // the SRTP packet flow.
- var obfuscatedPSKKey [32]byte
- obfuscationSecret, err := deriveObfuscationSecret(
- conn.config.ClientRootObfuscationSecret, "in-proxy-RTP-QUIC-reliability-layer")
- if err != nil {
- return errors.Trace(err)
- }
- obfuscationSeed := prng.Seed(obfuscationSecret)
- copy(obfuscatedPSKKey[:], prng.NewPRNGWithSeed(&obfuscationSeed).Bytes(len(obfuscatedPSKKey)))
- // To effectively disable them, quic-go's idle timeouts and keep-alives
- // are initialized to the maximum possible duration. The higher-level
- // WebRTC connection will provide this functionality.
- maxDuration := time.Duration(math.MaxInt64)
- // Set the handshake timeout to align with the ctx deadline. Setting
- // HandshakeIdleTimeout to maxDuration causes the quic-go dial to fail.
- // Assumes ctx has a deadline.
- deadline, _ := ctx.Deadline()
- handshakeIdleTimeout := time.Until(deadline) / 2
- if conn.isOffer {
- // The client is a QUIC client.
- // Initialize the obfuscated PSK.
- sessionCache := common.WrapClientSessionCache(tls.NewLRUClientSessionCache(1), "")
- obfuscatedSessionState, err := tls.NewObfuscatedClientSessionState(
- obfuscatedPSKKey, true, false)
- if err != nil {
- return errors.Trace(err)
- }
- sessionCache.Put(
- "", tls.MakeClientSessionState(
- obfuscatedSessionState.SessionTicket,
- obfuscatedSessionState.Vers,
- obfuscatedSessionState.CipherSuite,
- obfuscatedSessionState.MasterSecret,
- obfuscatedSessionState.CreatedAt,
- obfuscatedSessionState.AgeAdd,
- obfuscatedSessionState.UseBy))
- tlsConfig := &tls.Config{
- InsecureSkipVerify: true,
- InsecureSkipTimeVerify: true,
- NextProtos: []string{"h3"},
- ServerName: values.GetHostName(),
- ClientSessionCache: sessionCache,
- }
- maxPacketSizeAdjustment := GetQUICMaxPacketSizeAdjustment()
- // Set ClientMaxPacketSizeAdjustment to so that quic-go will produce
- // packets with a small enough max size to produce the overall target
- // packet MTU.
- quicConfig := &quic_go.Config{
- HandshakeIdleTimeout: handshakeIdleTimeout,
- MaxIdleTimeout: maxDuration,
- KeepAlivePeriod: maxDuration,
- Versions: []quic_go.Version{0x1},
- ClientHelloSeed: &obfuscationSeed,
- ClientMaxPacketSizeAdjustment: maxPacketSizeAdjustment,
- DisablePathMTUDiscovery: true,
- }
- deadline, ok := ctx.Deadline()
- if ok {
- quicConfig.HandshakeIdleTimeout = time.Until(deadline)
- }
- // Establish the QUIC connection with the server and open a single
- // data stream for relaying traffic.
- //
- // Use DialEarly, in combination with the "established" PSK, for
- // 0-RTT, which potentially allows data to be sent with the
- // handshake; this could include the open stream message from the
- // following OpenStreamSync call. There is no replay concern with
- // 0-RTT here, as the QUIC traffic is encapsualted in the secure SRTP
- // flow.
- quicConn, err := quic_go.DialEarly(
- ctx,
- mediaTrackPacketConn,
- mediaTrackPacketConn.remoteAddr,
- tlsConfig,
- quicConfig)
- if err != nil {
- return errors.Trace(err)
- }
- quicStream, err := quicConn.OpenStreamSync(ctx)
- if err != nil {
- // Ensure any background quic-go goroutines are stopped.
- _ = quicConn.CloseWithError(0, "")
- return errors.Trace(err)
- }
- conn.mediaTrackReliabilityLayer = &reliableConn{
- mediaTrackConn: mediaTrackPacketConn,
- quicConn: quicConn,
- quicStream: quicStream,
- }
- return nil
- } else {
- // The proxy is a QUIC server.
- // Use an ephemeral, self-signed certificate.
- certificate, privateKey, _, err := common.GenerateWebServerCertificate(
- values.GetHostName())
- if err != nil {
- return errors.Trace(err)
- }
- tlsCertificate, err := tls.X509KeyPair([]byte(certificate), []byte(privateKey))
- if err != nil {
- return errors.Trace(err)
- }
- tlsConfig := &tls.Config{
- Certificates: []tls.Certificate{tlsCertificate},
- NextProtos: []string{"h3"},
- }
- tlsConfig.SetSessionTicketKeys([][32]byte{
- obfuscatedPSKKey,
- })
- // Anti-probing via VerifyClientHelloRandom, for passthrough, is not
- // necessary here and is not initialized.
- quicConfig := &quic_go.Config{
- Allow0RTT: true,
- HandshakeIdleTimeout: handshakeIdleTimeout,
- MaxIdleTimeout: maxDuration,
- KeepAlivePeriod: maxDuration,
- MaxIncomingStreams: 1,
- MaxIncomingUniStreams: -1,
- VerifyClientHelloRandom: nil,
- ServerMaxPacketSizeAdjustment: func(addr net.Addr) int {
- return GetQUICMaxPacketSizeAdjustment()
- },
- }
- quicTransport := &quic_go.Transport{
- Conn: mediaTrackPacketConn,
- DisableVersionNegotiationPackets: true,
- }
- quicListener, err := quicTransport.ListenEarly(tlsConfig, quicConfig)
- if err != nil {
- return errors.Trace(err)
- }
- // Accept the single expected QUIC client and its QUIC data stream.
- quicConn, err := quicListener.Accept(ctx)
- if err != nil {
- _ = quicTransport.Close()
- return errors.Trace(err)
- }
- quicStream, err := quicConn.AcceptStream(ctx)
- if err != nil {
- _ = quicConn.CloseWithError(0, "")
- _ = quicTransport.Close()
- return errors.Trace(err)
- }
- // Closing the quic-go Transport/Listener closes all client
- // connections, so retain the Transport for the duration of the
- // overall connection.
- conn.mediaTrackReliabilityLayer = &reliableConn{
- mediaTrackConn: mediaTrackPacketConn,
- quicTransport: quicTransport,
- quicConn: quicConn,
- quicStream: quicStream,
- }
- return nil
- }
- }
- // incrementingIPv6Address provides successive, distinct IPv6 addresses from
- // the 2001:db8::/32 range, reserved for documentation purposes as defined in
- // RFC 3849. It will wrap after 2^96 calls.
- type incrementingIPv6Address struct {
- mutex sync.Mutex
- ip [12]byte
- }
- var uniqueIPv6Address incrementingIPv6Address
- func (inc *incrementingIPv6Address) next() net.IP {
- inc.mutex.Lock()
- defer inc.mutex.Unlock()
- for i := 11; i >= 0; i-- {
- inc.ip[i]++
- if inc.ip[i] != 0 {
- break
- }
- }
- ip := make([]byte, 16)
- copy(ip[0:4], []byte{0x20, 0x01, 0x0d, 0xb8})
- copy(ip[4:16], inc.ip[:])
- return net.IP(ip)
- }
- // mediaTrackPacketConn provides the required net.PacketConn interface for
- // quic-go to use to read and write packets to the RTP media track conn.
- type mediaTrackPacketConn struct {
- webRTCConn *webRTCConn
- localAddr net.Addr
- remoteAddr net.Addr
- isClosed int32
- }
- func newMediaTrackPacketConn(conn *webRTCConn) *mediaTrackPacketConn {
- // Create distinct, artificial local/remote addrs for the synthetic
- // net.PacketConn.
- //
- // For its local operations, quic-go references local/remote addrs for the
- // net.PacketConns it uses. Furthermore, the quic-go server listener
- // currently uses a singleton multiplexer, connMultiplexer, which panics
- // if multiple conns with the same local addr are added. Since this is a
- // singleton, this panic occurs even when using distinct quic-go
- // listeners per conn.
- //
- // No actual network traffic is sent to these artificial addresses.
- ip := uniqueIPv6Address.next()
- localAddr := &net.UDPAddr{IP: ip, Port: 1}
- remoteAddr := &net.UDPAddr{IP: ip, Port: 2}
- return &mediaTrackPacketConn{
- webRTCConn: conn,
- localAddr: localAddr,
- remoteAddr: remoteAddr,
- }
- }
- func (conn *mediaTrackPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
- if atomic.LoadInt32(&conn.isClosed) == 1 {
- return 0, conn.remoteAddr, errors.TraceNew("closed")
- }
- n, err := conn.webRTCConn.readMediaTrack(p)
- return n, conn.remoteAddr, errors.TraceReader(err)
- }
- func (conn *mediaTrackPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
- if atomic.LoadInt32(&conn.isClosed) == 1 {
- return 0, errors.TraceNew("closed")
- }
- n, err := conn.webRTCConn.writeMediaTrackPacket(p, false)
- return n, errors.Trace(err)
- }
- func (conn *mediaTrackPacketConn) Close() error {
- if !atomic.CompareAndSwapInt32(&conn.isClosed, 0, 1) {
- return nil
- }
- return nil
- }
- func (conn *mediaTrackPacketConn) LocalAddr() net.Addr {
- return conn.localAddr
- }
- func (conn *mediaTrackPacketConn) SetDeadline(t time.Time) error {
- return errors.TraceNew("not supported")
- }
- func (conn *mediaTrackPacketConn) SetReadDeadline(t time.Time) error {
- // Workaround:
- //
- // When a quic-go DialEarly fails, it invokes Transport.Close. In turn,
- // Transport.Close calls this SetReadDeadline in order to interrupt any
- // blocked read. The underlying pion/webrtc.TrackRemote has a
- // SetReadDeadline. However, at this time webRTCConn.receiveMediaTrack
- // may be nil, and readMediaTrack may be blocking on
- // receiveMediaTrackOpenedSignal.
- //
- // In addition, as of v2.2.4, pion/transport/v2/packetio.Buffer.Read,
- // which underlies receiveMediaTrack.Read, isn't interrupted when
- // SetReadDeadline is update -- it only checks and applies the read
- // deadline once before blocking.
- //
- // Simply calling webRTCConn.Close unblocks both cases.
- //
- // Invoke in a goroutine to avoid a deadlock that would otherwise occur
- // when webRTCConn.Close is invoked directly, as it will call down to
- // mediaTrackPacketConn.SetReadDeadline via reliableConn.Close. The
- // webRTCConn.Close isClosed check ensures there isn't an endless loop of
- // calls.
- //
- // Assumes that mediaTrackPacketConn.SetReadDeadline is called only in
- // this terminating quic-go case.
- go func() {
- _ = conn.webRTCConn.Close()
- }()
- return nil
- }
- func (conn *mediaTrackPacketConn) SetWriteDeadline(t time.Time) error {
- return errors.TraceNew("not supported")
- }
- // reliableConn provides a reliable/ordered delivery layer on top of the media
- // track RTP conn. This is implemented as a QUIC connection.
- type reliableConn struct {
- mediaTrackConn *mediaTrackPacketConn
- quicTransport *quic_go.Transport
- quicConn quic_go.EarlyConnection
- quicStream quic_go.Stream
- readMutex sync.Mutex
- writeMutex sync.Mutex
- isClosed int32
- }
- func (conn *reliableConn) Read(b []byte) (int, error) {
- if atomic.LoadInt32(&conn.isClosed) == 1 {
- return 0, errors.TraceNew("closed")
- }
- // Add mutex to provide full net.Conn concurrency semantics.
- // https://github.com/lucas-clemente/quic-go/blob/9cc23135d0477baf83aa4715de39ae7070039cb2/stream.go#L64
- // "Read() and Write() may be called concurrently, but multiple calls to
- // "Read() or Write() individually must be synchronized manually."
- conn.readMutex.Lock()
- defer conn.readMutex.Unlock()
- n, err := conn.quicStream.Read(b)
- if quic.IsIETFErrorIndicatingClosed(err) {
- _ = conn.Close()
- err = io.EOF
- }
- return n, errors.TraceReader(err)
- }
- func (conn *reliableConn) Write(b []byte) (int, error) {
- if atomic.LoadInt32(&conn.isClosed) == 1 {
- return 0, errors.TraceNew("closed")
- }
- conn.writeMutex.Lock()
- defer conn.writeMutex.Unlock()
- n, err := conn.quicStream.Write(b)
- if quic.IsIETFErrorIndicatingClosed(err) {
- _ = conn.Close()
- if n == len(b) {
- err = nil
- }
- }
- return n, errors.Trace(err)
- }
- func (conn *reliableConn) Close() error {
- if !atomic.CompareAndSwapInt32(&conn.isClosed, 0, 1) {
- return nil
- }
- // Close mediaTrackConn first, or else quic-go's Close will attempt to
- // Write, which leads to deadlock between webRTCConn.writeMediaTrack and
- // webRTCConn.Close. The graceful QUIC close write will fail, but that's
- // not an issue.
- _ = conn.mediaTrackConn.Close()
- err := conn.quicConn.CloseWithError(0, "")
- if conn.quicTransport != nil {
- conn.quicTransport.Close()
- }
- return errors.Trace(err)
- }
- func (conn *reliableConn) LocalAddr() net.Addr {
- return conn.quicConn.LocalAddr()
- }
- func (conn *reliableConn) RemoteAddr() net.Addr {
- return conn.quicConn.RemoteAddr()
- }
- func (conn *reliableConn) SetDeadline(t time.Time) error {
- return conn.quicStream.SetDeadline(t)
- }
- func (conn *reliableConn) SetReadDeadline(t time.Time) error {
- return conn.quicStream.SetReadDeadline(t)
- }
- func (conn *reliableConn) SetWriteDeadline(t time.Time) error {
- return conn.quicStream.SetWriteDeadline(t)
- }
- // prepareSDPAddresses adjusts the SDP, pruning local network addresses and
- // adding any port mapping as a host candidate.
- func prepareSDPAddresses(
- encodedSDP []byte,
- errorOnNoCandidates bool,
- portMappingExternalAddr string,
- disableIPv6Candidates bool,
- allowPrivateIPAddressCandidates bool) ([]byte, *webRTCSDPMetrics, error) {
- modifiedSDP, metrics, err := processSDPAddresses(
- encodedSDP,
- errorOnNoCandidates,
- portMappingExternalAddr,
- disableIPv6Candidates,
- allowPrivateIPAddressCandidates,
- false,
- nil,
- common.GeoIPData{})
- return modifiedSDP, metrics, errors.Trace(err)
- }
- // filterSDPAddresses checks that the SDP does not contain an empty list of
- // candidates, bogon candidates, or candidates outside of the country and ASN
- // for the specified expectedGeoIPData. Invalid candidates are stripped and a
- // filtered SDP is returned.
- func filterSDPAddresses(
- encodedSDP []byte,
- errorOnNoCandidates bool,
- lookupGeoIP LookupGeoIP,
- expectedGeoIPData common.GeoIPData,
- allowPrivateIPAddressCandidates bool,
- filterPrivateIPAddressCandidates bool) ([]byte, *webRTCSDPMetrics, error) {
- filteredSDP, metrics, err := processSDPAddresses(
- encodedSDP,
- errorOnNoCandidates,
- "",
- false,
- allowPrivateIPAddressCandidates,
- filterPrivateIPAddressCandidates,
- lookupGeoIP,
- expectedGeoIPData)
- return filteredSDP, metrics, errors.Trace(err)
- }
- // webRTCSDPMetrics are network capability metrics values for an SDP.
- type webRTCSDPMetrics struct {
- iceCandidateTypes []ICECandidateType
- hasIPv6 bool
- hasPrivateIP bool
- filteredICECandidates []string
- }
- // processSDPAddresses is based on snowflake/common/util.StripLocalAddresses
- // https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/blob/v2.5.1/common/util/util.go#L70-99
- /*
- This file contains the license for "Snowflake"
- a free software project which provides a WebRTC pluggable transport.
- ================================================================================
- Copyright (c) 2016, Serene Han, Arlo Breault
- Copyright (c) 2019-2020, The Tor Project, Inc
- Redistribution and use in source and binary forms, with or without modification,
- are permitted provided that the following conditions are met:
- * Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation and/or
- other materials provided with the distribution.
- * Neither the names of the copyright owners nor the names of its
- contributors may be used to endorse or promote products derived from this
- software without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
- ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
- ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
- ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- ================================================================================
- */
- func processSDPAddresses(
- encodedSDP []byte,
- errorOnNoCandidates bool,
- portMappingExternalAddr string,
- disableIPv6Candidates bool,
- allowPrivateIPAddressCandidates bool,
- filterPrivateIPAddressCandidates bool,
- lookupGeoIP LookupGeoIP,
- expectedGeoIPData common.GeoIPData) ([]byte, *webRTCSDPMetrics, error) {
- var sessionDescription sdp.SessionDescription
- err := sessionDescription.Unmarshal(encodedSDP)
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- candidateTypes := map[ICECandidateType]bool{}
- hasIPv6 := false
- hasPrivateIP := false
- filteredCandidateReasons := make(map[string]int)
- var portMappingICECandidates []sdp.Attribute
- if portMappingExternalAddr != "" {
- // Prepare ICE candidate attibute pair for the port mapping, modeled
- // after the definition of host candidates.
- host, portStr, err := net.SplitHostPort(portMappingExternalAddr)
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- port, err := strconv.Atoi(portStr)
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- // Only IPv4 port mapping addresses are supported due to the
- // NewCandidateHost limitation noted below. It is expected that port
- // mappings will be IPv4, as NAT and IPv6 is not a typical combination.
- hostIP := net.ParseIP(host)
- if hostIP != nil && hostIP.To4() != nil {
- for _, component := range []webrtc.ICEComponent{webrtc.ICEComponentRTP, webrtc.ICEComponentRTCP} {
- // The candidate ID is generated and the priority and foundation
- // use the default for hosts.
- //
- // Limitation: NewCandidateHost initializes the networkType to
- // NetworkTypeUDP4, and this field is not-exported.
- // https://github.com/pion/ice/blob/6d301287654b05a36248842c278d58d501454bff/candidate_host.go#L27-L64
- iceCandidate, err := ice.NewCandidateHost(&ice.CandidateHostConfig{
- Network: "udp",
- Address: host,
- Port: port,
- Component: uint16(component),
- })
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- portMappingICECandidates = append(
- portMappingICECandidates,
- sdp.Attribute{Key: "candidate", Value: iceCandidate.Marshal()})
- }
- candidateTypes[ICECandidatePortMapping] = true
- }
- }
- candidateCount := len(portMappingICECandidates)
- for _, mediaDescription := range sessionDescription.MediaDescriptions {
- addPortMappingCandidates := len(portMappingICECandidates) > 0
- var attributes []sdp.Attribute
- for _, attribute := range mediaDescription.Attributes {
- // Insert the port mapping candidate either before the
- // first "a=candidate", or before "a=end-of-candidates"(there may
- // be no "a=candidate" attributes).
- if addPortMappingCandidates &&
- (attribute.IsICECandidate() || attribute.Key == sdp.AttrKeyEndOfCandidates) {
- attributes = append(attributes, portMappingICECandidates...)
- addPortMappingCandidates = false
- }
- if attribute.IsICECandidate() {
- candidate, err := ice.UnmarshalCandidate(attribute.Value)
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- candidateIP := net.ParseIP(candidate.Address())
- if candidateIP == nil {
- return nil, nil, errors.TraceNew("unexpected non-IP")
- }
- candidateIsIPv6 := false
- if candidateIP.To4() == nil {
- if disableIPv6Candidates {
- reason := fmt.Sprintf("disabled %s IPv6",
- candidate.Type().String())
- filteredCandidateReasons[reason] += 1
- continue
- }
- candidateIsIPv6 = true
- }
- // Strip non-routable bogons, including RFC 1918/4193 private
- // IP addresses. Same-LAN client/proxy hops are not expected
- // to be useful, and this also avoids unnecessary network traffic.
- //
- // Well-behaved clients and proxies should strip these values;
- // the broker enforces this with filtering.
- //
- // In personal pairing mode, private IP addresses are allowed,
- // as connection may be made between devices the same LAN and
- // not all routers support NAT hairpinning.
- candidateIsPrivateIP := candidateIP.IsPrivate()
- if !GetAllowBogonWebRTCConnections() &&
- !(candidateIsPrivateIP && allowPrivateIPAddressCandidates) &&
- common.IsBogon(candidateIP) {
- version := "IPv4"
- if candidateIsIPv6 {
- version = "IPv6"
- }
- reason := fmt.Sprintf("bogon %s %s",
- candidate.Type().String(), version)
- filteredCandidateReasons[reason] += 1
- continue
- }
- // In personal pairing mode, filter out any private IP
- // addresses for which there is no corresponding local,
- // active interface. This avoids unnecessary network traffic.
- // This filtering option is applied post-broker exchange,
- // with the SDP received, via the broker, from the peer.
- if candidateIsPrivateIP && filterPrivateIPAddressCandidates {
- if !hasInterfaceForPrivateIPAddress(candidateIP) {
- continue
- }
- }
- // The broker will check that clients and proxies specify only
- // candidates that map to the same GeoIP country and ASN as
- // the client/proxy connection to the broker. This limits
- // misuse of candidates to connect to other locations.
- // Legitimate candidates will not all have the exact same IP
- // address, as there could be a mix of IPv4 and IPv6, as well
- // as potentially different NAT paths.
- //
- // In some cases, legitimate clients and proxies may
- // unintentionally submit candidates with mismatching GeoIP.
- // This can occur, for example, when a STUN candidate is only
- // a partial hole punch through double NAT, and when internal
- // network addresses misuse non-private IP ranges (so are
- // technically not bogons). Instead of outright rejecting
- // SDPs containing unexpected GeoIP candidates, they are
- // instead stripped out and the resulting filtered SDP is
- // used.
- if lookupGeoIP != nil {
- candidateGeoIPData := lookupGeoIP(candidate.Address())
- if candidateGeoIPData.Country != expectedGeoIPData.Country ||
- candidateGeoIPData.ASN != expectedGeoIPData.ASN {
- version := "IPv4"
- if candidateIsIPv6 {
- version = "IPv6"
- }
- reason := fmt.Sprintf(
- "unexpected GeoIP %s %s: %s/%s",
- candidate.Type().String(),
- version,
- candidateGeoIPData.Country,
- candidateGeoIPData.ASN)
- filteredCandidateReasons[reason] += 1
- continue
- }
- }
- if candidateIsIPv6 {
- hasIPv6 = true
- }
- if candidateIsPrivateIP {
- hasPrivateIP = true
- }
- // These types are not reported:
- // - CandidateTypeRelay: TURN servers are not used.
- // - CandidateTypePeerReflexive: this candidate type only
- // emerges later in the connection process.
- switch candidate.Type() {
- case ice.CandidateTypeHost:
- candidateTypes[ICECandidateHost] = true
- case ice.CandidateTypeServerReflexive:
- candidateTypes[ICECandidateServerReflexive] = true
- }
- candidateCount += 1
- }
- attributes = append(attributes, attribute)
- }
- mediaDescription.Attributes = attributes
- }
- if errorOnNoCandidates && candidateCount == 0 {
- return nil, nil, errors.TraceNew("no candidates")
- }
- encodedSDP, err = sessionDescription.Marshal()
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- metrics := &webRTCSDPMetrics{
- hasIPv6: hasIPv6,
- hasPrivateIP: hasPrivateIP,
- }
- for candidateType := range candidateTypes {
- metrics.iceCandidateTypes = append(metrics.iceCandidateTypes, candidateType)
- }
- for reason, count := range filteredCandidateReasons {
- metrics.filteredICECandidates = append(metrics.filteredICECandidates,
- fmt.Sprintf("%s: %d", reason, count))
- }
- return encodedSDP, metrics, nil
- }
- type pionLoggerFactory struct {
- logger common.Logger
- stopLogging func() bool
- debugLogging bool
- }
- func newPionLoggerFactory(
- logger common.Logger, stopLogging func() bool, debugLogging bool) *pionLoggerFactory {
- return &pionLoggerFactory{
- logger: logger,
- stopLogging: stopLogging,
- debugLogging: debugLogging,
- }
- }
- func (f *pionLoggerFactory) NewLogger(scope string) pion_logging.LeveledLogger {
- return newPionLogger(scope, f.logger, f.stopLogging, f.debugLogging)
- }
- // pionLogger wraps common.Logger and implements
- // https://pkg.go.dev/github.com/pion/logging#LeveledLogger for passing into
- // pion.
- type pionLogger struct {
- scope string
- logger common.Logger
- stopLogging func() bool
- debugLogging bool
- warnNoPairs int32
- }
- func newPionLogger(
- scope string, logger common.Logger, stopLogging func() bool, debugLogging bool) *pionLogger {
- return &pionLogger{
- scope: scope,
- logger: logger,
- stopLogging: stopLogging,
- debugLogging: debugLogging,
- }
- }
- func (l *pionLogger) Trace(msg string) {
- if l.stopLogging() || !l.debugLogging {
- return
- }
- l.logger.WithTrace().Debug(fmt.Sprintf("webRTC: %s: %s", l.scope, msg))
- }
- func (l *pionLogger) Tracef(format string, args ...interface{}) {
- if l.stopLogging() || !l.debugLogging {
- return
- }
- l.logger.WithTrace().Debug(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
- }
- func (l *pionLogger) Debug(msg string) {
- if l.stopLogging() || !l.debugLogging {
- return
- }
- l.logger.WithTrace().Debug(fmt.Sprintf("[webRTC: %s: %s", l.scope, msg))
- }
- func (l *pionLogger) Debugf(format string, args ...interface{}) {
- if l.stopLogging() || !l.debugLogging {
- return
- }
- l.logger.WithTrace().Debug(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
- }
- func (l *pionLogger) Info(msg string) {
- if l.stopLogging() {
- return
- }
- l.logger.WithTrace().Info(fmt.Sprintf("webRTC: %s: %s", l.scope, msg))
- }
- func (l *pionLogger) Infof(format string, args ...interface{}) {
- if l.stopLogging() {
- return
- }
- l.logger.WithTrace().Info(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
- }
- func (l *pionLogger) Warn(msg string) {
- if l.stopLogging() {
- return
- }
- // To reduce diagnostic log noise, only log this message once per dial attempt.
- if msg == "Failed to ping without candidate pairs. Connection is not possible yet." &&
- !atomic.CompareAndSwapInt32(&l.warnNoPairs, 0, 1) {
- return
- }
- l.logger.WithTrace().Warning(fmt.Sprintf("webRTC: %s: %s", l.scope, msg))
- }
- func (l *pionLogger) Warnf(format string, args ...interface{}) {
- if l.stopLogging() {
- return
- }
- l.logger.WithTrace().Warning(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
- }
- func (l *pionLogger) Error(msg string) {
- if l.stopLogging() {
- return
- }
- l.logger.WithTrace().Error(fmt.Sprintf("webRTC: %s: %s", l.scope, msg))
- }
- func (l *pionLogger) Errorf(format string, args ...interface{}) {
- if l.stopLogging() {
- return
- }
- l.logger.WithTrace().Error(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
- }
- func hasInterfaceForPrivateIPAddress(IP net.IP) bool {
- if !IP.IsPrivate() {
- return false
- }
- // The anet package is used to work around net.Interfaces not working on
- // Android at this time: https://github.com/golang/go/issues/40569.
- //
- // Any errors are silently dropped; the caller will proceed without using
- // the input private IP; and equivalent anet calls are made in
- // pionNetwork.Interfaces, with errors logged.
- netInterfaces, err := anet.Interfaces()
- if err != nil {
- return false
- }
- for _, netInterface := range netInterfaces {
- // Note: don't exclude interfaces with the net.FlagPointToPoint flag,
- // which is set for certain mobile networks
- if netInterface.Flags&net.FlagUp == 0 {
- continue
- }
- addrs, err := anet.InterfaceAddrsByInterface(&netInterface)
- if err != nil {
- continue
- }
- for _, addr := range addrs {
- _, IPNet, err := net.ParseCIDR(addr.String())
- if err != nil {
- continue
- }
- if IPNet.Contains(IP) {
- return true
- }
- }
- }
- return false
- }
- // pionNetwork implements pion/transport.Net.
- //
- // Via the SettingsEngine, pion is configured to use a pionNetwork instance,
- // which providing alternative implementations for various network functions.
- // The Interfaces implementation provides a workaround for Android
- // net.Interfaces issues and reduces the number of IPv6 candidates to avoid
- // excess STUN requests; and the ResolveUDPAddr implementation hooks into the
- // Psiphon custom resolver.
- type pionNetwork struct {
- dialCtx context.Context
- logger pion_logging.LeveledLogger
- webRTCDialCoordinator WebRTCDialCoordinator
- }
- func newPionNetwork(
- dialCtx context.Context,
- logger pion_logging.LeveledLogger,
- webRTCDialCoordinator WebRTCDialCoordinator) *pionNetwork {
- return &pionNetwork{
- dialCtx: dialCtx,
- logger: logger,
- webRTCDialCoordinator: webRTCDialCoordinator,
- }
- }
- func (p *pionNetwork) Interfaces() ([]*transport.Interface, error) {
- // To determine the active IPv4 and IPv6 interfaces, let the OS bind IPv4
- // and IPv6 UDP sockets with a specified external destination address.
- // Then iterate over all interfaces, but return interface info for only
- // the interfaces those sockets were bound to.
- //
- // The destination IPs are the IPs that currently resolve for example.com.
- // No actual traffic to these IPs or example.com is sent, as the UDP
- // sockets are not used to send any packets.
- //
- // This scheme should select just one IPv4 and one IPv6 address, which
- // should be the active, externally routable addresses, and the IPv6
- // address should be the preferred, non-deprecated temporary IPv6 address.
- //
- // In post-ICE gathering processing, processSDPAddresses will also strip
- // all bogon addresses, so there is no explicit bogon check here.
- //
- // Limitations:
- //
- // - The active interface could change between the socket operation and
- // iterating over all interfaces. Higher-level code is expected to
- // react to active network changes.
- //
- // - The public IPs for example.com may not be robust in all routing
- // situations. Alternatively, we could use the configured STUN server
- // as the test destination, but the STUN server domain is not resolved
- // at this point and STUN is not always configured and used.
- //
- // - The results could be cached and reused.
- var defaultIPv4, defaultIPv6 net.IP
- udpConnIPv4, err := p.webRTCDialCoordinator.UDPConn(
- context.Background(), "udp4", "93.184.216.34:3478")
- if err == nil {
- defaultIPv4 = udpConnIPv4.LocalAddr().(*net.UDPAddr).IP
- udpConnIPv4.Close()
- }
- udpConnIPv6, err := p.webRTCDialCoordinator.UDPConn(
- context.Background(), "udp6", "[2606:2800:220:1:248:1893:25c8:1946]:3478")
- if err == nil {
- defaultIPv6 = udpConnIPv6.LocalAddr().(*net.UDPAddr).IP
- udpConnIPv6.Close()
- }
- // The anet package is used to work around net.Interfaces not working on
- // Android at this time: https://github.com/golang/go/issues/40569.
- transportInterfaces := []*transport.Interface{}
- netInterfaces, err := anet.Interfaces()
- if err != nil {
- return nil, errors.Trace(err)
- }
- for _, netInterface := range netInterfaces {
- // Note: don't exclude interfaces with the net.FlagPointToPoint flag,
- // which is set for certain mobile networks
- if (netInterface.Flags&net.FlagUp == 0) ||
- (!GetAllowBogonWebRTCConnections() && (netInterface.Flags&net.FlagLoopback != 0)) {
- continue
- }
- addrs, err := anet.InterfaceAddrsByInterface(&netInterface)
- if err != nil {
- return nil, errors.Trace(err)
- }
- var transportInterface *transport.Interface
- for _, addr := range addrs {
- IP, _, err := net.ParseCIDR(addr.String())
- if err != nil {
- return nil, errors.Trace(err)
- }
- if IP.Equal(defaultIPv4) || IP.Equal(defaultIPv6) ||
- (GetAllowBogonWebRTCConnections() && (netInterface.Flags&net.FlagLoopback != 0)) {
- if transportInterface == nil {
- transportInterface = transport.NewInterface(netInterface)
- }
- transportInterface.AddAddress(addr)
- }
- }
- if transportInterface != nil {
- transportInterfaces = append(transportInterfaces, transportInterface)
- }
- }
- return transportInterfaces, nil
- }
- func (p *pionNetwork) ResolveUDPAddr(network, address string) (retAddr *net.UDPAddr, retErr error) {
- defer func() {
- if retErr != nil {
- // Explicitly log an error since certain pion operations -- e.g.,
- // ICE gathering -- don't propagate all pion/transport.Net errors.
- p.logger.Errorf("pionNetwork.ResolveUDPAddr failed: %v", retErr)
- }
- }()
- // Currently, pion appears to call ResolveUDPAddr with "udp4"/udp6"
- // instead of "ip4"/"ip6", as expected by, e.g., net.Resolver.LookupIP.
- // Convert to "ip4"/"ip6".
- // Specifying v4/v6 ensures that the resolved IP address is the correct
- // type. In the case of STUN servers, the correct type is required in
- // order to create the correct IPv4 or IPv6 whole punch address.
- switch network {
- case "udp4", "tcp4":
- network = "ip4"
- case "udp6", "tcp6":
- network = "ip6"
- default:
- network = "ip"
- }
- // Currently, pion appears to call ResolveUDPAddr with an improperly
- // formatted address, <IPv6>:443 not [<IPv6>]:443; handle this case.
- index := strings.LastIndex(address, ":")
- if index != -1 {
- address = net.JoinHostPort(address[:index], address[index+1:])
- }
- // Use the Psiphon custom resolver to resolve any STUN server domains.
- resolvedAddress, err := p.webRTCDialCoordinator.ResolveAddress(
- p.dialCtx, network, address)
- if err != nil {
- return nil, errors.Trace(err)
- }
- IPStr, portStr, err := net.SplitHostPort(resolvedAddress)
- if err != nil {
- return nil, errors.Trace(err)
- }
- IP := net.ParseIP(IPStr)
- if IP == nil {
- return nil, errors.TraceNew("invalid IP address")
- }
- port, err := strconv.Atoi(portStr)
- if err != nil {
- return nil, errors.Trace(err)
- }
- return &net.UDPAddr{IP: IP, Port: port}, nil
- }
- var errNotSupported = std_errors.New("not supported")
- func (p *pionNetwork) ListenPacket(network string, address string) (net.PacketConn, error) {
- // Explicitly log an error since certain pion operations -- e.g., ICE
- // gathering -- don't propagate all pion/transport.Net errors.
- p.logger.Errorf("unexpected pionNetwork.ListenPacket call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) ListenUDP(network string, locAddr *net.UDPAddr) (transport.UDPConn, error) {
- p.logger.Errorf("unexpected pionNetwork.ListenUDP call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) ListenTCP(network string, laddr *net.TCPAddr) (transport.TCPListener, error) {
- p.logger.Errorf("unexpected pionNetwork.ListenTCP call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) Dial(network, address string) (net.Conn, error) {
- p.logger.Errorf("unexpected pionNetwork.Dial call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) DialUDP(network string, laddr, raddr *net.UDPAddr) (transport.UDPConn, error) {
- p.logger.Errorf("unexpected pionNetwork.DialUDP call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) DialTCP(network string, laddr, raddr *net.TCPAddr) (transport.TCPConn, error) {
- p.logger.Errorf("unexpected pionNetwork.DialTCP call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) ResolveIPAddr(network, address string) (*net.IPAddr, error) {
- p.logger.Errorf("unexpected pionNetwork.ResolveIPAddr call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) ResolveTCPAddr(network, address string) (*net.TCPAddr, error) {
- p.logger.Errorf("unexpected pionNetwork.ResolveTCPAddr call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) InterfaceByIndex(index int) (*transport.Interface, error) {
- p.logger.Errorf("unexpected pionNetwork.InterfaceByIndex call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) InterfaceByName(name string) (*transport.Interface, error) {
- p.logger.Errorf("unexpected pionNetwork.InterfaceByName call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
- func (p *pionNetwork) CreateDialer(dialer *net.Dialer) transport.Dialer {
- return &pionNetworkDialer{pionNetwork: p}
- }
- type pionNetworkDialer struct {
- pionNetwork *pionNetwork
- }
- func (d pionNetworkDialer) Dial(network, address string) (net.Conn, error) {
- d.pionNetwork.logger.Errorf("unexpected pionNetworkDialer.Dial call from %s", stacktrace.GetParentFunctionName())
- return nil, errors.Trace(errNotSupported)
- }
|