inproxy.go 92 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "encoding/binary"
  24. std_errors "errors"
  25. "fmt"
  26. "io"
  27. "net"
  28. "net/http"
  29. "net/netip"
  30. "strconv"
  31. "sync"
  32. "sync/atomic"
  33. "syscall"
  34. "time"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/resolver"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  42. utls "github.com/Psiphon-Labs/utls"
  43. "github.com/cespare/xxhash"
  44. )
// InproxyBrokerClientManager manages an InproxyBrokerClientInstance, an
// in-proxy broker client, and its associated broker dial parameters, that
// may be shared by multiple client dials or proxy instances. There is no
// explicit close operation for the managed InproxyBrokerClientInstance.
//
// Once used, the current InproxyBrokerClientInstance and its broker client is
// left actively connected to the broker, to minimize transport round trips
// for additional requests.
//
// The InproxyBrokerClientManager and its components implement a replay system
// for broker client dials. As one broker client is shared across multiple
// client in-proxy dials, the broker dial parameters are replayed
// independently from tunnel dial parameters.
//
// The NewInproxyBrokerClientInstance layer provides a fixed association
// between a broker client and its broker dial parameters, ensuring that
// in-proxy success/failure callbacks reference the correct replay parameters
// when setting or clearing replay.
//
// A new InproxyBrokerClientInstance, including the broker dial parameters and
// broker client, is instantiated when the active network ID changes, using
// tactics for the new network.
type InproxyBrokerClientManager struct {

	// config supplies the tactics parameters and the current network ID
	// used when a new InproxyBrokerClientInstance is created.
	config *Config

	// isProxy is set at construction and passed through to each new
	// InproxyBrokerClientInstance; when true, no-match resets are ignored.
	isProxy bool

	// tlsCache is set at construction and is handed to the broker dial
	// parameters when an InproxyBrokerClientInstance is created.
	tlsCache utls.ClientSessionCache

	// mutex is taken by the exported methods before they read or replace
	// the fields below; reset assumes the caller already holds it.
	mutex sync.Mutex

	// brokerSelectCount is zeroed on init, tactics-applied, and
	// network-changed resets, and incremented on round-tripper-failed and
	// no-match resets. It is passed to NewInproxyBrokerClientInstance to
	// index into the broker rotation in personal pairing mode.
	brokerSelectCount int

	// networkID is the network ID the current brokerClientInstance was
	// created for; it is "" when there is no current instance.
	networkID string

	// brokerClientInstance is nil until first initialized on demand by
	// GetBrokerClient, and is nil after a reset that fails to create a
	// replacement.
	brokerClientInstance *InproxyBrokerClientInstance
}
  76. // NewInproxyBrokerClientManager creates a new InproxyBrokerClientManager.
  77. // NewInproxyBrokerClientManager does not perform any network operations; the
  78. // managed InproxyBrokerClientInstance is initialized when used for a round
  79. // trip.
  80. func NewInproxyBrokerClientManager(
  81. config *Config, isProxy bool, tlsCache utls.ClientSessionCache) *InproxyBrokerClientManager {
  82. b := &InproxyBrokerClientManager{
  83. config: config,
  84. isProxy: isProxy,
  85. tlsCache: tlsCache,
  86. }
  87. // b.brokerClientInstance is initialized on demand, when getBrokerClient
  88. // is called.
  89. return b
  90. }
  91. // TacticsApplied implements the TacticsAppliedReceiver interface, and is
  92. // called when tactics have changed, which triggers a broker client reset in
  93. // order to apply potentially changed parameters.
  94. func (b *InproxyBrokerClientManager) TacticsApplied() error {
  95. b.mutex.Lock()
  96. defer b.mutex.Unlock()
  97. // Don't reset when not yet initialized; b.brokerClientInstance is
  98. // initialized only on demand.
  99. if b.brokerClientInstance == nil {
  100. return nil
  101. }
  102. // TODO: as a future future enhancement, don't reset when the tactics
  103. // brokerSpecs.Hash() is unchanged?
  104. return errors.Trace(b.reset(resetBrokerClientReasonTacticsApplied))
  105. }
  106. // NetworkChanged is called when the active network changes, to trigger a
  107. // broker client reset.
  108. func (b *InproxyBrokerClientManager) NetworkChanged() error {
  109. b.mutex.Lock()
  110. defer b.mutex.Unlock()
  111. // Don't reset when not yet initialized; b.brokerClientInstance is
  112. // initialized only on demand.
  113. if b.brokerClientInstance == nil {
  114. return nil
  115. }
  116. return errors.Trace(b.reset(resetBrokerClientReasonNetworkChanged))
  117. }
  118. // GetBrokerClient returns the current, shared broker client and its
  119. // corresponding dial parameters (for metrics logging). If there is no
  120. // current broker client, if the network ID differs from the network ID
  121. // associated with the previous broker client, a new broker client is
  122. // initialized.
  123. func (b *InproxyBrokerClientManager) GetBrokerClient(
  124. networkID string) (*inproxy.BrokerClient, *InproxyBrokerDialParameters, error) {
  125. b.mutex.Lock()
  126. if b.brokerClientInstance == nil || b.networkID != networkID {
  127. err := b.reset(resetBrokerClientReasonInit)
  128. if err != nil {
  129. b.mutex.Unlock()
  130. return nil, nil, errors.Trace(err)
  131. }
  132. }
  133. brokerClientInstance := b.brokerClientInstance
  134. // Release lock before calling brokerClientInstance.HasSuccess. Otherwise,
  135. // there's a potential deadlock that would result from this code path
  136. // locking InproxyBrokerClientManager.mutex then InproxyBrokerClientInstance.mutex,
  137. // while the BrokerClientRoundTripperFailed code path locks in the reverse order.
  138. b.mutex.Unlock()
  139. // Set isReuse, which will record a metric indicating if this broker
  140. // client has already been used for a successful round trip, a case which
  141. // should result in faster overall dials.
  142. //
  143. // Limitations with HasSuccess, and the resulting isReuse metric: in some
  144. // cases, it's possible that the underlying TLS connection is still
  145. // redialed by net/http; or it's possible that the Noise session is
  146. // invalid/expired and must be reestablished; or it can be the case that
  147. // a shared broker client is only partially established at this point in
  148. // time.
  149. //
  150. // Return a shallow copy of the broker dial params in order to record the
  151. // correct isReuse, which varies depending on previous use.
  152. brokerDialParams := *brokerClientInstance.brokerDialParams
  153. brokerDialParams.isReuse = brokerClientInstance.HasSuccess()
  154. // The b.brokerClientInstance.brokerClient is wired up to refer back to
  155. // b.brokerClientInstance.brokerDialParams/roundTripper, etc.
  156. return brokerClientInstance.brokerClient,
  157. &brokerDialParams,
  158. nil
  159. }
  160. func (b *InproxyBrokerClientManager) resetBrokerClientOnRoundTripperFailed(
  161. brokerClientInstance *InproxyBrokerClientInstance) error {
  162. b.mutex.Lock()
  163. defer b.mutex.Unlock()
  164. if b.brokerClientInstance != brokerClientInstance {
  165. // Ignore the reset if the signal comes from the non-current
  166. // brokerClientInstance, which may occur when multiple in-flight
  167. // round trips fail in close proximity.
  168. return nil
  169. }
  170. return errors.Trace(b.reset(resetBrokerClientReasonRoundTripperFailed))
  171. }
  172. func (b *InproxyBrokerClientManager) resetBrokerClientOnNoMatch(
  173. brokerClientInstance *InproxyBrokerClientInstance) error {
  174. // Ignore the no match callback for proxies. For personal pairing, the
  175. // broker rotation scheme has clients moving brokers to find relatively
  176. // static proxies. For common pairing, we want to achieve balanced supply
  177. // across brokers.
  178. //
  179. // Currently, inproxy.BrokerDialCoordinator.BrokerClientNoMatch is only
  180. // wired up for clients, but this check ensures it'll still be ignored in
  181. // case that changes.
  182. if b.isProxy {
  183. return nil
  184. }
  185. if b.brokerClientInstance != brokerClientInstance {
  186. // See comment for same logic in resetBrokerClientOnRoundTripperFailed.
  187. return nil
  188. }
  189. p := b.config.GetParameters().Get()
  190. defer p.Close()
  191. probability := parameters.InproxyClientNoMatchFailoverProbability
  192. if b.config.IsInproxyClientPersonalPairingMode() {
  193. probability = parameters.InproxyClientNoMatchFailoverPersonalProbability
  194. }
  195. if !p.WeightedCoinFlip(probability) {
  196. return nil
  197. }
  198. return errors.Trace(b.reset(resetBrokerClientReasonRoundNoMatch))
  199. }
// resetBrokerClientReason identifies why InproxyBrokerClientManager.reset is
// being invoked. The reason determines how brokerSelectCount is updated and
// whether the new broker client instance is told the reset was due to a no
// match.
type resetBrokerClientReason int

// Values start at 1 (iota + 1), so the zero value of resetBrokerClientReason
// does not correspond to any defined reason.
const (
	// resetBrokerClientReasonInit is used by GetBrokerClient when there is
	// no current broker client or the network ID differs.
	resetBrokerClientReasonInit resetBrokerClientReason = iota + 1

	// resetBrokerClientReasonTacticsApplied is used by TacticsApplied.
	resetBrokerClientReasonTacticsApplied

	// resetBrokerClientReasonNetworkChanged is used by NetworkChanged.
	resetBrokerClientReasonNetworkChanged

	// resetBrokerClientReasonRoundTripperFailed is used by
	// resetBrokerClientOnRoundTripperFailed.
	resetBrokerClientReasonRoundTripperFailed

	// resetBrokerClientReasonRoundNoMatch is used by
	// resetBrokerClientOnNoMatch.
	resetBrokerClientReasonRoundNoMatch
)
  208. func (b *InproxyBrokerClientManager) reset(reason resetBrokerClientReason) error {
  209. // Assumes b.mutex lock is held.
  210. if b.brokerClientInstance != nil {
  211. // Close the existing broker client. This will close all underlying
  212. // network connections, interrupting any in-flight requests. This
  213. // close is invoked in the resetBrokerClientOnRoundTripperFailed
  214. // case, where it's expected that the round tripped has permanently
  215. // failed.
  216. b.brokerClientInstance.Close()
  217. }
  218. // b.brokerSelectCount tracks the number of broker resets and is used to
  219. // iterate over the brokers in a deterministic rotation when running in
  220. // personal pairing mode.
  221. switch reason {
  222. case resetBrokerClientReasonInit,
  223. resetBrokerClientReasonTacticsApplied,
  224. resetBrokerClientReasonNetworkChanged:
  225. b.brokerSelectCount = 0
  226. case resetBrokerClientReasonRoundTripperFailed,
  227. resetBrokerClientReasonRoundNoMatch:
  228. b.brokerSelectCount += 1
  229. }
  230. // Any existing broker client is removed, even if
  231. // NewInproxyBrokerClientInstance fails. This ensures, for example, that
  232. // an existing broker client is removed when its spec is no longer
  233. // available in tactics.
  234. b.networkID = ""
  235. b.brokerClientInstance = nil
  236. networkID := b.config.GetNetworkID()
  237. brokerClientInstance, err := NewInproxyBrokerClientInstance(
  238. b.config,
  239. b,
  240. networkID,
  241. b.isProxy,
  242. b.brokerSelectCount,
  243. reason == resetBrokerClientReasonRoundNoMatch)
  244. if err != nil {
  245. return errors.Trace(err)
  246. }
  247. b.networkID = networkID
  248. b.brokerClientInstance = brokerClientInstance
  249. return nil
  250. }
// InproxyBrokerClientInstance pairs an inproxy.BrokerClient instance with an
// implementation of the inproxy.BrokerDialCoordinator interface and the
// associated, underlying broker dial parameters. InproxyBrokerClientInstance
// implements broker client dial replay.
type InproxyBrokerClientInstance struct {

	// config, brokerClientManager, and networkID are the values passed to
	// NewInproxyBrokerClientInstance.
	config              *Config
	brokerClientManager *InproxyBrokerClientManager
	networkID           string

	// brokerClientPrivateKey is parsed from
	// config.InproxyProxySessionPrivateKey when running as a proxy with
	// that value set; otherwise it is newly generated per instance.
	brokerClientPrivateKey inproxy.SessionPrivateKey

	// brokerClient is the broker client returned by
	// InproxyBrokerClientManager.GetBrokerClient.
	brokerClient *inproxy.BrokerClient

	// brokerPublicKey and brokerRootObfuscationSecret are parsed from the
	// selected broker spec.
	brokerPublicKey             inproxy.SessionPublicKey
	brokerRootObfuscationSecret inproxy.ObfuscationSecret

	// brokerDialParams holds either replayed or newly generated broker
	// dial parameters; GetBrokerClient returns a shallow copy.
	brokerDialParams *InproxyBrokerDialParameters

	// replayEnabled records whether replay was enabled when this instance
	// was created (non-zero TTL, replay not disabled in config, and a
	// successful weighted coin flip).
	replayEnabled bool

	// roundTripper is created from brokerDialParams.
	roundTripper *InproxyBrokerRoundTripper

	// personalCompartmentIDs contains at most one ID; the constructor
	// rejects multiple personal compartment IDs.
	personalCompartmentIDs []inproxy.ID
	commonCompartmentIDs   []inproxy.ID

	// NOTE(review): disableWaitToShareSession is not set in the visible
	// portion of the constructor; confirm its source.
	disableWaitToShareSession bool

	// The following are snapshots of tactics parameters taken at
	// construction. Timeouts are not replayed; they are stored here for
	// efficient lookup.
	sessionHandshakeTimeout       time.Duration
	announceRequestTimeout        time.Duration
	announceDelay                 time.Duration
	announceMaxBackoffDelay       time.Duration
	announceDelayJitter           float64
	answerRequestTimeout          time.Duration
	offerRequestTimeout           time.Duration
	offerRequestPersonalTimeout   time.Duration
	offerRetryDelay               time.Duration
	offerRetryJitter              float64
	relayedPacketRequestTimeout   time.Duration
	dslRequestTimeout             time.Duration
	replayRetainFailedProbability float64
	replayUpdateFrequency         time.Duration
	retryOnFailedPeriod           time.Duration

	// NOTE(review): mutex presumably guards lastStoreReplay and
	// lastSuccess, which appear to record the time of the last stored
	// replay and last successful round trip; confirm against the methods
	// that use them.
	mutex           sync.Mutex
	lastStoreReplay time.Time
	lastSuccess     time.Time
}
  288. // NewInproxyBrokerClientInstance creates a new InproxyBrokerClientInstance.
// NewInproxyBrokerClientInstance does not perform any network operations; the
  290. // new InproxyBrokerClientInstance is initialized when used for a round
  291. // trip.
  292. func NewInproxyBrokerClientInstance(
  293. config *Config,
  294. brokerClientManager *InproxyBrokerClientManager,
  295. networkID string,
  296. isProxy bool,
  297. brokerSelectCount int,
  298. resetReasonNoMatch bool) (*InproxyBrokerClientInstance, error) {
  299. p := config.GetParameters().Get()
  300. defer p.Close()
  301. // Select common or personal compartment IDs. Clients must provide at
  302. // least on compartment ID.
  303. //
  304. // A here check for !isProxy && len(commonCompartmentIDs) == 0 && len
  305. // (personalCompartmentIDs) == 0 is now deferred until
  306. // inproxy.DialClient, to allow broker connections for DSL requests
  307. // without in-proxy compartment IDs.
  308. commonCompartmentIDs, personalCompartmentIDs, err :=
  309. prepareInproxyCompartmentIDs(config, p, isProxy)
  310. if err != nil {
  311. return nil, errors.Trace(err)
  312. }
  313. if len(personalCompartmentIDs) > 1 {
  314. return nil, errors.TraceNew("unexpected multiple personal compartment IDs")
  315. }
  316. // Select the broker to use, optionally favoring brokers with replay data.
  317. // In the InproxyBrokerSpecs calls, the first non-empty tactics parameter
  318. // list is used.
  319. //
  320. // Optional broker specs may be used to specify broker(s) dedicated to
  321. // personal pairing, a configuration which can be used to reserve more
  322. // capacity for personal pairing, given the simple rendezvous scheme below.
  323. brokerSpecs := getInproxyBrokerSpecs(config, p, isProxy)
  324. if len(brokerSpecs) == 0 {
  325. return nil, errors.TraceNew("no broker specs")
  326. }
  327. // Select a broker.
  328. // In common pairing mode, the available brokers are shuffled before
  329. // selection, for random load balancing. Brokers with available dial
  330. // parameter replay data are preferred. When rotating brokers due to a no
  331. // match, the available replay data is ignored to increase the chance of
  332. // selecting a different broker.
  333. //
  334. // In personal pairing mode, arrange for the proxy and client to
  335. // rendezvous at the same broker by shuffling based on the shared
  336. // personal compartment ID. Both the client and proxy will select the
  337. // same initial broker, and fail over to other brokers in the same order.
  338. // By design, clients will move between brokers aggressively, rotating on
  339. // no-match responses and applying a shorter client offer timeout; while
  340. // proxies will remain in place in order to be found. Since rendezvous
  341. // depends on the ordering, each broker is selected in shuffle order;
  342. // dial parameter replay data is used when available but not considered
  343. // in selection ordering. The brokerSelectCount input is used to
  344. // progressively index into the list of shuffled brokers.
  345. //
  346. // Potential future enhancements:
  347. //
  348. // - Use brokerSelectCount in the common pairing case as well, to ensure
  349. // that a no-match reset always selects a different broker; but, unlike
  350. // the personal pairing logic, still prefer brokers with replay rather
  351. // than following a strict shuffle order.
  352. //
  353. // - The common pairing no match broker rotation is intended to partially
  354. // mitigate poor common proxy load balancing that can leave a broker
  355. // with little proxy supply. A more robust mitigation would be to make
  356. // proxies distribute announcements across multiple or even all brokers.
  357. personalPairing := len(personalCompartmentIDs) > 0
  358. // In the following cases, don't shuffle or otherwise mutate the original
  359. // broker spec slice, as it is a tactics parameter.
  360. if personalPairing {
  361. if len(personalCompartmentIDs[0]) < prng.SEED_LENGTH {
  362. // Both inproxy.ID and prng.SEED_LENGTH are 32 bytes.
  363. return nil, errors.TraceNew("unexpected ID length")
  364. }
  365. seed := prng.Seed(personalCompartmentIDs[0][0:prng.SEED_LENGTH])
  366. PRNG := prng.NewPRNGWithSeed(&seed)
  367. permutedIndexes := PRNG.Perm(len(brokerSpecs))
  368. // Minimize rendezvous time by reducing the number of brokers this
  369. // personal compartment ID maps over to. With a reduced number of
  370. // possible brokers, the client and proxy have fewer brokers to check
  371. // after fail overs.
  372. //
  373. // Given that permutedIndexes is a randomized shuffle, each personal
  374. // compartment ID will map to a different set of reduced brokers,
  375. // preserving overall broker load balancing.
  376. //
  377. // InproxyPersonalPairingMaxBrokerSpecCount will be configured high
  378. // enough to also preserve reasonable availability when brokers fail.
  379. // When InproxyPersonalPairingMaxBrokerSpecCount is 0, there is no max.
  380. //
  381. // This scheme depends on the len(personalCompartmentIDs) <= 1
  382. // constraint checked above.
  383. maxBrokerSpecs := p.Int(parameters.InproxyPersonalPairingMaxBrokerSpecCount)
  384. if maxBrokerSpecs > 0 && len(permutedIndexes) > maxBrokerSpecs {
  385. permutedIndexes = permutedIndexes[:maxBrokerSpecs]
  386. }
  387. selectedIndex := permutedIndexes[brokerSelectCount%len(permutedIndexes)]
  388. brokerSpecs = brokerSpecs[selectedIndex : selectedIndex+1]
  389. } else {
  390. permutedIndexes := prng.Perm(len(brokerSpecs))
  391. shuffledBrokerSpecs := make(parameters.InproxyBrokerSpecsValue, len(brokerSpecs))
  392. for i, index := range permutedIndexes {
  393. shuffledBrokerSpecs[i] = brokerSpecs[index]
  394. }
  395. brokerSpecs = shuffledBrokerSpecs
  396. }
  397. selectFirstCandidate := resetReasonNoMatch || personalPairing
  398. // Replay broker dial parameters.
  399. // In selectFirstCandidate cases, SelectCandidateWithNetworkReplayParameters
  400. // will always select the first candidate, returning corresponding replay
  401. // data when available. Otherwise, SelectCandidateWithNetworkReplayParameters
  402. // iterates over the shuffled candidates and returns the first with replay data.
  403. var brokerSpec *parameters.InproxyBrokerSpec
  404. var brokerDialParams *InproxyBrokerDialParameters
  405. // Replay is disabled when the TTL, InproxyReplayBrokerDialParametersTTL,
  406. // is 0.
  407. now := time.Now()
  408. ttl := p.Duration(parameters.InproxyReplayBrokerDialParametersTTL)
  409. replayEnabled := ttl > 0 &&
  410. !config.DisableReplay &&
  411. prng.FlipWeightedCoin(p.Float(parameters.InproxyReplayBrokerDialParametersProbability))
  412. if replayEnabled {
  413. brokerSpec, brokerDialParams, err =
  414. SelectCandidateWithNetworkReplayParameters[parameters.InproxyBrokerSpec, InproxyBrokerDialParameters](
  415. networkID,
  416. selectFirstCandidate,
  417. brokerSpecs,
  418. func(spec *parameters.InproxyBrokerSpec) string { return spec.BrokerPublicKey },
  419. func(spec *parameters.InproxyBrokerSpec, dialParams *InproxyBrokerDialParameters) bool {
  420. // Replay the successful broker spec, if present, by
  421. // comparing its hash with that of the candidate.
  422. return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
  423. bytes.Equal(dialParams.LastUsedBrokerSpecHash, hashBrokerSpec(spec))
  424. })
  425. if err != nil {
  426. NoticeWarning("SelectCandidateWithNetworkReplayParameters failed: %v", errors.Trace(err))
  427. // Continue without replay
  428. }
  429. }
  430. // Select the first broker in the shuffle when replay is not enabled or in
  431. // case SelectCandidateWithNetworkReplayParameters fails.
  432. if brokerSpec == nil {
  433. brokerSpec = brokerSpecs[0]
  434. }
  435. // Generate new broker dial parameters if not replaying. Later, isReplay
  436. // is used to report the replay metric.
  437. isReplay := brokerDialParams != nil
  438. // Handle legacy replay records by discarding replay when required fields
  439. // are missing.
  440. if isReplay && brokerDialParams.FrontedHTTPDialParameters == nil {
  441. isReplay = false
  442. }
  443. if !isReplay {
  444. brokerDialParams, err = MakeInproxyBrokerDialParameters(config, p, networkID, brokerSpec, brokerClientManager.tlsCache)
  445. if err != nil {
  446. return nil, errors.Trace(err)
  447. }
  448. } else {
  449. brokerDialParams.brokerSpec = brokerSpec
  450. err := brokerDialParams.prepareDialConfigs(config, p, true, brokerClientManager.tlsCache)
  451. if err != nil {
  452. return nil, errors.Trace(err)
  453. }
  454. }
  455. // Load broker key material.
  456. brokerPublicKey, err := inproxy.SessionPublicKeyFromString(brokerSpec.BrokerPublicKey)
  457. if err != nil {
  458. return nil, errors.Trace(err)
  459. }
  460. brokerRootObfuscationSecret, err := inproxy.ObfuscationSecretFromString(brokerSpec.BrokerRootObfuscationSecret)
  461. if err != nil {
  462. return nil, errors.Trace(err)
  463. }
  464. roundTripper := NewInproxyBrokerRoundTripper(p, brokerDialParams)
  465. // Clients always generate an ephemeral session key pair. Proxies may opt
  466. // to use a long-lived key pair for proxied traffic attribution.
  467. var brokerClientPrivateKey inproxy.SessionPrivateKey
  468. if isProxy && config.InproxyProxySessionPrivateKey != "" {
  469. brokerClientPrivateKey, err = inproxy.SessionPrivateKeyFromString(config.InproxyProxySessionPrivateKey)
  470. if err != nil {
  471. return nil, errors.Trace(err)
  472. }
  473. } else {
  474. brokerClientPrivateKey, err = inproxy.GenerateSessionPrivateKey()
  475. if err != nil {
  476. return nil, errors.Trace(err)
  477. }
  478. }
  479. // InproxyBrokerClientInstance implements the
  480. // inproxy.BrokerDialCoordinator interface and passes itself to
  481. // inproxy.NewBrokerClient in order to provide the round tripper, key
  482. // material, compartment IDs, timeouts, and other configuration to the
  483. // in-proxy broker client.
  484. //
  485. // Timeouts are not replayed, but snapshots are stored in the
  486. // InproxyBrokerClientInstance for efficient lookup.
  487. b := &InproxyBrokerClientInstance{
  488. config: config,
  489. brokerClientManager: brokerClientManager,
  490. networkID: networkID,
  491. brokerClientPrivateKey: brokerClientPrivateKey,
  492. brokerPublicKey: brokerPublicKey,
  493. brokerRootObfuscationSecret: brokerRootObfuscationSecret,
  494. brokerDialParams: brokerDialParams,
  495. replayEnabled: replayEnabled,
  496. roundTripper: roundTripper,
  497. personalCompartmentIDs: personalCompartmentIDs,
  498. commonCompartmentIDs: commonCompartmentIDs,
  499. sessionHandshakeTimeout: p.Duration(parameters.InproxySessionHandshakeRoundTripTimeout),
  500. announceRequestTimeout: p.Duration(parameters.InproxyProxyAnnounceRequestTimeout),
  501. announceDelay: p.Duration(parameters.InproxyProxyAnnounceDelay),
  502. announceMaxBackoffDelay: p.Duration(parameters.InproxyProxyAnnounceMaxBackoffDelay),
  503. announceDelayJitter: p.Float(parameters.InproxyProxyAnnounceDelayJitter),
  504. answerRequestTimeout: p.Duration(parameters.InproxyProxyAnswerRequestTimeout),
  505. offerRequestTimeout: p.Duration(parameters.InproxyClientOfferRequestTimeout),
  506. offerRequestPersonalTimeout: p.Duration(parameters.InproxyClientOfferRequestPersonalTimeout),
  507. offerRetryDelay: p.Duration(parameters.InproxyClientOfferRetryDelay),
  508. offerRetryJitter: p.Float(parameters.InproxyClientOfferRetryJitter),
  509. relayedPacketRequestTimeout: p.Duration(parameters.InproxyClientRelayedPacketRequestTimeout),
  510. dslRequestTimeout: p.Duration(parameters.InproxyClientDSLRequestTimeout),
  511. replayRetainFailedProbability: p.Float(parameters.InproxyReplayBrokerRetainFailedProbability),
  512. replayUpdateFrequency: p.Duration(parameters.InproxyReplayBrokerUpdateFrequency),
  513. }
  514. if isProxy && !config.IsInproxyProxyPersonalPairingMode() {
  515. // This retry is applied only for proxies and only in common pairing
  516. // mode. See comment in BrokerClientRoundTripperFailed.
  517. b.retryOnFailedPeriod = p.Duration(parameters.InproxyProxyOnBrokerClientFailedRetryPeriod)
  518. }
  519. // Limitation: currently, disableWaitToShareSession is neither replayed
  520. // nor is the selected value reported in metrics. The default tactics
  521. // parameters are considered to be optimal: the in-proxy clients
  522. // disabling wait and proxies using wait. The tactics flag can be used to
  523. // enable wait for clients in case performance is poor or load on
  524. // brokers -- due to simultaneous sessions -- is unexpectedly high.
  525. //
  526. // Note that, for broker dial parameter replay, the isValidReplay function
  527. // currently invalidates replay only when broker specs change, and not
  528. // when tactics in general change; so changes to these
  529. // disableWaitToShareSession parameters would not properly invalidate
  530. // replays in any case.
  531. //
  532. // Potential future enhancements for in-proxy client broker clients
  533. // include using a pool of broker clients, with each one potentially
  534. // using a different broker and/or fronting spec. In this scenario,
  535. // waitToShareSession would be less impactful.
  536. if isProxy {
  537. b.disableWaitToShareSession = p.Bool(parameters.InproxyProxyDisableWaitToShareSession)
  538. } else {
  539. b.disableWaitToShareSession = p.Bool(parameters.InproxyClientDisableWaitToShareSession)
  540. }
  541. // Adjust long-polling request timeouts to respect any maximum request
  542. // timeout supported by the provider fronting the request.
  543. maxRequestTimeout, ok := p.KeyDurations(
  544. parameters.InproxyFrontingProviderClientMaxRequestTimeouts)[brokerDialParams.FrontedHTTPDialParameters.FrontingProviderID]
  545. if ok && maxRequestTimeout > 0 {
  546. if b.announceRequestTimeout > maxRequestTimeout {
  547. b.announceRequestTimeout = maxRequestTimeout
  548. }
  549. if b.offerRequestTimeout > maxRequestTimeout {
  550. b.offerRequestTimeout = maxRequestTimeout
  551. }
  552. if b.offerRequestPersonalTimeout > maxRequestTimeout {
  553. b.offerRequestPersonalTimeout = maxRequestTimeout
  554. }
  555. }
  556. // Initialize broker client. This will start with a fresh broker session.
  557. //
  558. // When resetBrokerClientOnRoundTripperFailed is invoked due to a failure
  559. // at the transport level -- TLS or domain fronting --
  560. // NewInproxyBrokerClientInstance is invoked, resetting both the broker
  561. // client round tripper and the broker session. As a future enhancement,
  562. // consider distinguishing between transport and session errors and
  563. // retaining a valid established session when only the transport needs to
  564. // be reset/retried.
  565. b.brokerClient, err = inproxy.NewBrokerClient(b)
  566. if err != nil {
  567. return nil, errors.Trace(err)
  568. }
  569. // The broker ID is the broker's session public key in Curve25519 form.
  570. brokerID, err := brokerPublicKey.ToCurve25519()
  571. if err != nil {
  572. return nil, errors.Trace(err)
  573. }
  574. NoticeInfo("inproxy: selected broker %s", inproxy.ID(brokerID))
  575. return b, nil
  576. }
  577. func haveInproxyProxyBrokerSpecs(config *Config) bool {
  578. p := config.GetParameters().Get()
  579. defer p.Close()
  580. return len(getInproxyBrokerSpecs(config, p, true)) > 0
  581. }
  582. func haveInproxyClientBrokerSpecs(config *Config) bool {
  583. p := config.GetParameters().Get()
  584. defer p.Close()
  585. return len(getInproxyBrokerSpecs(config, p, false)) > 0
  586. }
  587. func getInproxyBrokerSpecs(
  588. config *Config,
  589. p parameters.ParametersAccessor,
  590. isProxy bool) parameters.InproxyBrokerSpecsValue {
  591. if isProxy {
  592. if config.IsInproxyProxyPersonalPairingMode() {
  593. return p.InproxyBrokerSpecs(
  594. parameters.InproxyProxyPersonalPairingBrokerSpecs,
  595. parameters.InproxyPersonalPairingBrokerSpecs,
  596. parameters.InproxyProxyBrokerSpecs,
  597. parameters.InproxyBrokerSpecs)
  598. } else {
  599. return p.InproxyBrokerSpecs(
  600. parameters.InproxyProxyBrokerSpecs,
  601. parameters.InproxyBrokerSpecs)
  602. }
  603. } else {
  604. if config.IsInproxyClientPersonalPairingMode() {
  605. return p.InproxyBrokerSpecs(
  606. parameters.InproxyClientPersonalPairingBrokerSpecs,
  607. parameters.InproxyPersonalPairingBrokerSpecs,
  608. parameters.InproxyClientBrokerSpecs,
  609. parameters.InproxyBrokerSpecs)
  610. } else {
  611. return p.InproxyBrokerSpecs(
  612. parameters.InproxyClientBrokerSpecs,
  613. parameters.InproxyBrokerSpecs)
  614. }
  615. }
  616. }
  617. func haveInproxyCommonCompartmentIDs(config *Config) bool {
  618. p := config.GetParameters().Get()
  619. defer p.Close()
  620. if len(p.InproxyCompartmentIDs(parameters.InproxyCommonCompartmentIDs)) > 0 {
  621. return true
  622. }
  623. commonCompartmentIDs, _ := LoadInproxyCommonCompartmentIDs()
  624. return len(commonCompartmentIDs) > 0
  625. }
  626. func prepareInproxyCompartmentIDs(
  627. config *Config,
  628. p parameters.ParametersAccessor,
  629. isProxy bool) ([]inproxy.ID, []inproxy.ID, error) {
  630. // Personal compartment IDs are loaded from the tunnel-core config; these
  631. // are set by the external app based on user input/configuration of IDs
  632. // generated by or obtained from personal proxies. Both clients and
  633. // proxies send personal compartment IDs to the in-proxy broker. For
  634. // clients, when personal compartment IDs are configured, no common
  635. // compartment IDs are prepared, ensuring matches with only proxies that
  636. // supply the corresponding personal compartment IDs.
  637. //
  638. // Common compartment IDs are obtained from tactics and merged with
  639. // previously learned IDs stored in the local datastore. When new IDs are
  640. // obtained from tactics, the merged list is written back to the
  641. // datastore. This allows for schemes where common compartment IDs are
  642. // distributed to sets of clients, then removed from distibution, and
  643. // still used to match proxies to those sets of clients. Only clients
  644. // send common compartment IDs to the in-proxy broker. Proxies are
  645. // automatically assigned to common compartments by the broker.
  646. //
  647. // Maximum compartment ID list lengths are enforced to ensure broker
  648. // request sizes don't grow unbounded.
  649. //
  650. // Limitation: currently, in max length trimming, new common compartment
  651. // IDs take precedence over older IDs.
  652. maxCompartmentIDListLength := p.Int(parameters.InproxyMaxCompartmentIDListLength)
  653. // Personal compartment ID limitations:
  654. //
  655. // The broker API messages, ProxyAnnounceRequest and ClientOfferRequest,
  656. // support lists of personal compartment IDs. However, both the proxy and
  657. // the client are currently limited to specifying at most one personal
  658. // compartment ID due to the following limitations:
  659. //
  660. // - On the broker side, the matcher queue implementation supports at most
  661. // one proxy personal compartment ID. See inproxy/Matcher.Announce. The
  662. // broker currently enforces that at most one personal compartment ID
  663. // may be specified per ProxyAnnounceRequest.
  664. //
  665. // - On the proxy/client side, the personal pairing rendezvous logic --
  666. // which aims for proxies and clients to select the same initial broker
  667. // and same order of failover to other brokers -- uses a shuffle that
  668. // assumes both the proxy and client use the same single, personal
  669. // compartment ID
  670. var configPersonalCompartmentIDs []string
  671. if isProxy && len(config.InproxyProxyPersonalCompartmentID) > 0 {
  672. configPersonalCompartmentIDs = []string{config.InproxyProxyPersonalCompartmentID}
  673. } else if !isProxy && len(config.InproxyClientPersonalCompartmentID) > 0 {
  674. configPersonalCompartmentIDs = []string{config.InproxyClientPersonalCompartmentID}
  675. }
  676. personalCompartmentIDs, err := inproxy.IDsFromStrings(configPersonalCompartmentIDs)
  677. if err != nil {
  678. return nil, nil, errors.Trace(err)
  679. }
  680. if len(personalCompartmentIDs) > maxCompartmentIDListLength {
  681. // Trim the list. It's not expected that user-configured personal
  682. // compartment ID lists will exceed the max length.
  683. //
  684. // TODO: shuffle before trimming? Prioritize previous matches?
  685. personalCompartmentIDs = personalCompartmentIDs[:maxCompartmentIDListLength]
  686. }
  687. var commonCompartmentIDs []inproxy.ID
  688. if !isProxy && len(personalCompartmentIDs) == 0 {
  689. tacticsCommonCompartmentIDs := p.InproxyCompartmentIDs(parameters.InproxyCommonCompartmentIDs)
  690. knownCommonCompartmentIDs, err := LoadInproxyCommonCompartmentIDs()
  691. if err != nil {
  692. NoticeWarning("LoadInproxyCommonCompartmentIDs failed: %v", errors.Trace(err))
  693. // Continue with only the tactics common compartment IDs.
  694. }
  695. newCompartmentIDs := make([]string, 0, len(tacticsCommonCompartmentIDs))
  696. for _, compartmentID := range tacticsCommonCompartmentIDs {
  697. // TODO: faster lookup?
  698. if !common.Contains(knownCommonCompartmentIDs, compartmentID) {
  699. newCompartmentIDs = append(newCompartmentIDs, compartmentID)
  700. }
  701. }
  702. if len(newCompartmentIDs) > 0 {
  703. newCompartmentIDs = append(newCompartmentIDs, knownCommonCompartmentIDs...)
  704. // Locally store more than InproxyMaxCompartmentIDListLength known
  705. // common compartment IDs, in case the request limit parameter is
  706. // increased in the future.
  707. // maxPersistedCommonCompartmentIDListLength still limits the
  708. // length of the list to cap local memory and disk impact.
  709. maxPersistedCommonCompartmentIDListLength := 500 // ~16K
  710. if maxCompartmentIDListLength > maxPersistedCommonCompartmentIDListLength {
  711. maxPersistedCommonCompartmentIDListLength = maxCompartmentIDListLength
  712. }
  713. if len(newCompartmentIDs) > maxPersistedCommonCompartmentIDListLength {
  714. newCompartmentIDs = newCompartmentIDs[:maxPersistedCommonCompartmentIDListLength]
  715. }
  716. err := StoreInproxyCommonCompartmentIDs(newCompartmentIDs)
  717. if err != nil {
  718. NoticeWarning("StoreInproxyCommonCompartmentIDs failed: %v", errors.Trace(err))
  719. // Continue without persisting new common compartment IDs.
  720. }
  721. knownCommonCompartmentIDs = newCompartmentIDs
  722. }
  723. commonCompartmentIDs, err = inproxy.IDsFromStrings(knownCommonCompartmentIDs)
  724. if err != nil {
  725. return nil, nil, errors.Trace(err)
  726. }
  727. if len(commonCompartmentIDs) > maxCompartmentIDListLength {
  728. // TODO: shuffle before trimming? Prioritize previous matches?
  729. commonCompartmentIDs = commonCompartmentIDs[:maxCompartmentIDListLength]
  730. }
  731. }
  732. return commonCompartmentIDs, personalCompartmentIDs, nil
  733. }
  734. // HasSuccess indicates whether this broker client instance has completed at
  735. // least one successful round trip.
  736. func (b *InproxyBrokerClientInstance) HasSuccess() bool {
  737. b.mutex.Lock()
  738. defer b.mutex.Unlock()
  739. return !b.lastSuccess.IsZero()
  740. }
  741. // Close closes the broker client round tripper, including closing all
  742. // underlying network connections, which will interrupt any in-flight round
  743. // trips.
  744. func (b *InproxyBrokerClientInstance) Close() error {
  745. // Concurrency note: Close is called from InproxyBrokerClientManager with
  746. // its mutex locked. Close must not lock InproxyBrokerClientInstance's
  747. // mutex, or else there is a risk of deadlock similar to the HasSuccess
  748. // case documented in InproxyBrokerClientManager.GetBrokerClient.
  749. err := b.roundTripper.Close()
  750. return errors.Trace(err)
  751. }
  752. // Implements the inproxy.BrokerDialCoordinator interface.
  753. func (b *InproxyBrokerClientInstance) NetworkID() string {
  754. return b.networkID
  755. }
  756. // Implements the inproxy.BrokerDialCoordinator interface.
  757. func (b *InproxyBrokerClientInstance) NetworkType() inproxy.NetworkType {
  758. return getInproxyNetworkType(GetNetworkType(b.networkID))
  759. }
  760. // Implements the inproxy.BrokerDialCoordinator interface.
  761. func (b *InproxyBrokerClientInstance) CommonCompartmentIDs() []inproxy.ID {
  762. return b.commonCompartmentIDs
  763. }
  764. // Implements the inproxy.BrokerDialCoordinator interface.
  765. func (b *InproxyBrokerClientInstance) PersonalCompartmentIDs() []inproxy.ID {
  766. return b.personalCompartmentIDs
  767. }
  768. // Implements the inproxy.BrokerDialCoordinator interface.
  769. func (b *InproxyBrokerClientInstance) DisableWaitToShareSession() bool {
  770. return b.disableWaitToShareSession
  771. }
  772. // Implements the inproxy.BrokerDialCoordinator interface.
  773. func (b *InproxyBrokerClientInstance) BrokerClientPrivateKey() inproxy.SessionPrivateKey {
  774. return b.brokerClientPrivateKey
  775. }
  776. // Implements the inproxy.BrokerDialCoordinator interface.
  777. func (b *InproxyBrokerClientInstance) BrokerPublicKey() inproxy.SessionPublicKey {
  778. return b.brokerPublicKey
  779. }
  780. // Implements the inproxy.BrokerDialCoordinator interface.
  781. func (b *InproxyBrokerClientInstance) BrokerRootObfuscationSecret() inproxy.ObfuscationSecret {
  782. return b.brokerRootObfuscationSecret
  783. }
  784. // Implements the inproxy.BrokerDialCoordinator interface.
  785. func (b *InproxyBrokerClientInstance) BrokerClientRoundTripper() (inproxy.RoundTripper, error) {
  786. // Returns the same round tripper for the lifetime of the
  787. // inproxy.BrokerDialCoordinator, ensuring all requests for one in-proxy
  788. // dial or proxy relay use the same broker, as is necessary due to the
  789. // broker state for the proxy announce/answer, client broker/server
  790. // relay, etc.
  791. return b.roundTripper, nil
  792. }
  793. // Implements the inproxy.BrokerDialCoordinator interface.
  794. func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperSucceeded(roundTripper inproxy.RoundTripper) {
  795. b.mutex.Lock()
  796. defer b.mutex.Unlock()
  797. if rt, ok := roundTripper.(*InproxyBrokerRoundTripper); !ok || rt != b.roundTripper {
  798. // Passing in the round tripper obtained from BrokerClientRoundTripper
  799. // is just used for sanity check in this implementation, since each
  800. // InproxyBrokerClientInstance has exactly one round tripper.
  801. NoticeError("BrokerClientRoundTripperSucceeded: roundTripper instance mismatch")
  802. return
  803. }
  804. now := time.Now()
  805. b.lastSuccess = now
  806. // Set replay or extend the broker dial parameters replay TTL after a
  807. // success. With tunnel dial parameters, the replay TTL is extended after
  808. // every successful tunnel connection. Since there are potentially more
  809. // and more frequent broker round trips compared to tunnel dials, the TTL
  810. // is only extended after some target duration has elapsed, to avoid
  811. // excessive datastore writes.
  812. if b.replayEnabled && now.Sub(b.lastStoreReplay) > b.replayUpdateFrequency {
  813. b.brokerDialParams.LastUsedTimestamp = time.Now()
  814. err := SetNetworkReplayParameters[InproxyBrokerDialParameters](
  815. b.networkID, b.brokerDialParams.brokerSpec.BrokerPublicKey, b.brokerDialParams)
  816. if err != nil {
  817. NoticeWarning("StoreBrokerDialParameters failed: %v", errors.Trace(err))
  818. // Continue without persisting replay changes.
  819. } else {
  820. b.lastStoreReplay = now
  821. }
  822. }
  823. // Verify/extend the resolver cache entry for any resolved domain after a
  824. // success.
  825. //
  826. // Limitation: currently this re-extends regardless of how long ago the DNS
  827. // resolve happened.
  828. resolver := b.config.GetResolver()
  829. if resolver != nil {
  830. resolver.VerifyCacheExtension(b.brokerDialParams.FrontedHTTPDialParameters.FrontingDialAddress)
  831. }
  832. }
  833. // Implements the inproxy.BrokerDialCoordinator interface.
  834. func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperFailed(roundTripper inproxy.RoundTripper) {
  835. b.mutex.Lock()
  836. defer b.mutex.Unlock()
  837. if rt, ok := roundTripper.(*InproxyBrokerRoundTripper); !ok || rt != b.roundTripper {
  838. // Passing in the round tripper obtained from BrokerClientRoundTripper
  839. // is just used for sanity check in this implementation, since each
  840. // InproxyBrokerClientInstance has exactly one round tripper.
  841. NoticeError("BrokerClientRoundTripperFailed: roundTripper instance mismatch")
  842. return
  843. }
  844. // For common pairing proxies, skip both the replay deletion and the
  845. // InproxyBrokerClientInstance reset for a short duration after a recent
  846. // round trip success. In this case, subsequent broker requests will use
  847. // the existing round tripper, wired up with the same dial parameters and
  848. // fronting provider selection. If the failure was due to a transient
  849. // TLS/TCP network failure, the net/http round tripper should establish a
  850. // new connection on the next request.
  851. //
  852. // This retry is intended to retain proxy affinity with its currently
  853. // selected broker in cases such as broker service upgrades/restarts or
  854. // brief network interruptions, mitigating load balancing issues that
  855. // otherwise occur (e.g., all proxies fail over to other brokers, leaving
  856. // no supply on a restarted broker).
  857. //
  858. // In common pairing mode, clients do not perform this retry and
  859. // immediately reset, as is appropriate for the tunnel establishment
  860. // race. In personal pairing mode, neither proxies nor clients retry and
  861. // instead follow the personal pairing broker selection scheme in an
  862. // effort to rendezvous at the same broker with minimal delay.
  863. //
  864. // A delay before retrying announce requests is appropriate, but there is
  865. // no delay added here since Proxy.proxyOneClient already schedule delays
  866. // between announcements.
  867. //
  868. // Limitation: BrokerClientRoundTripperSucceeded is not invoked -- and no
  869. // recent last success time is set -- for proxies which announce, don't
  870. // match, and then hit the misaligned fronting provider request timeout
  871. // issue. See the ""unexpected response status code" case and comment in
  872. // InproxyBrokerRoundTripper.RoundTrip. This case should be mitigated by
  873. // configuring InproxyFrontingProviderServerMaxRequestTimeouts.
  874. //
  875. // TODO: also retry after initial startup, with no previous success? This
  876. // would further retain random load balancing of proxies newly starting
  877. // at the same time that their initially selected broker is restarted or
  878. // briefly unavailable.
  879. if b.brokerClientManager.isProxy &&
  880. !b.config.IsInproxyProxyPersonalPairingMode() &&
  881. b.retryOnFailedPeriod > 0 &&
  882. !b.lastSuccess.IsZero() &&
  883. time.Since(b.lastSuccess) <= b.retryOnFailedPeriod {
  884. NoticeWarning("BrokerClientRoundTripperFailed: retry roundTripper")
  885. return
  886. }
  887. // Delete any persistent replay dial parameters. Unlike with the success
  888. // case, consecutive, repeated deletes shouldn't write to storage, so
  889. // they are not avoided.
  890. if b.replayEnabled &&
  891. !prng.FlipWeightedCoin(b.replayRetainFailedProbability) {
  892. // Limitation: there's a race condition with multiple
  893. // InproxyBrokerClientInstances writing to the replay datastore for
  894. // the same broker, such as in the case where there's a dual-mode
  895. // in-proxy client and proxy; this delete could potentially clobber a
  896. // concurrent fresh replay store after a success.
  897. //
  898. // TODO: add an additional storage key distinguisher for each instance?
  899. err := DeleteNetworkReplayParameters[InproxyBrokerDialParameters](
  900. b.networkID, b.brokerDialParams.brokerSpec.BrokerPublicKey)
  901. if err != nil {
  902. NoticeWarning("DeleteBrokerDialParameters failed: %v", errors.Trace(err))
  903. // Continue without resetting replay.
  904. }
  905. }
  906. // Remove the TLS session cache entry for the broker's fronting dial address, if present.
  907. // This ensures that the next round trip establishes a new TLS session, avoiding potential issues
  908. // caused by session resumption fingerprint that may have contributed to the round tripper failure.
  909. if hardcodedCache := b.brokerDialParams.FrontedHTTPDialParameters.meekConfig.TLSClientSessionCache; hardcodedCache != nil {
  910. hardcodedCache.RemoveCacheEntry()
  911. }
  912. // Invoke resetBrokerClientOnRoundTripperFailed to signal the
  913. // InproxyBrokerClientManager to create a new
  914. // InproxyBrokerClientInstance, with new dial parameters and a new round
  915. // tripper, after a failure.
  916. //
  917. // This InproxyBrokerClientInstance doesn't change its dial parameters or
  918. // round tripper to ensure that any concurrent usage retains affinity
  919. // with the same parameters and broker.
  920. //
  921. // Limitation: a transport-level failure may unnecessarily reset the
  922. // broker session state; see comment in NewInproxyBrokerClientInstance.
  923. err := b.brokerClientManager.resetBrokerClientOnRoundTripperFailed(b)
  924. if err != nil {
  925. NoticeWarning("reset broker client failed: %v", errors.Trace(err))
  926. // Continue with old broker client instance.
  927. }
  928. }
  929. // Implements the inproxy.BrokerDialCoordinator interface.
  930. func (b *InproxyBrokerClientInstance) BrokerClientNoMatch(roundTripper inproxy.RoundTripper) {
  931. b.mutex.Lock()
  932. defer b.mutex.Unlock()
  933. if rt, ok := roundTripper.(*InproxyBrokerRoundTripper); !ok || rt != b.roundTripper {
  934. // See roundTripper check comment in BrokerClientRoundTripperFailed.
  935. NoticeError("BrokerClientNoMatch: roundTripper instance mismatch")
  936. return
  937. }
  938. // Any persistent replay dial parameters are retained and not deleted,
  939. // since the broker client successfully transacted with the broker.
  940. err := b.brokerClientManager.resetBrokerClientOnNoMatch(b)
  941. if err != nil {
  942. NoticeWarning("reset broker client failed: %v", errors.Trace(err))
  943. // Continue with old broker client instance.
  944. }
  945. }
  946. // Implements the inproxy.BrokerDialCoordinator interface.
  947. func (b *InproxyBrokerClientInstance) MetricsForBrokerRequests() common.LogFields {
  948. return b.brokerDialParams.GetMetricsForBrokerRequests()
  949. }
  950. // Implements the inproxy.BrokerDialCoordinator interface.
  951. func (b *InproxyBrokerClientInstance) AnnounceRequestTimeout() time.Duration {
  952. return b.announceRequestTimeout
  953. }
  954. // Implements the inproxy.BrokerDialCoordinator interface.
  955. func (b *InproxyBrokerClientInstance) SessionHandshakeRoundTripTimeout() time.Duration {
  956. return b.sessionHandshakeTimeout
  957. }
  958. // Implements the inproxy.BrokerDialCoordinator interface.
  959. func (b *InproxyBrokerClientInstance) AnnounceDelay() time.Duration {
  960. return b.announceDelay
  961. }
  962. // Implements the inproxy.BrokerDialCoordinator interface.
  963. func (b *InproxyBrokerClientInstance) AnnounceMaxBackoffDelay() time.Duration {
  964. return b.announceMaxBackoffDelay
  965. }
  966. // Implements the inproxy.BrokerDialCoordinator interface.
  967. func (b *InproxyBrokerClientInstance) AnnounceDelayJitter() float64 {
  968. return b.announceDelayJitter
  969. }
  970. // Implements the inproxy.BrokerDialCoordinator interface.
  971. func (b *InproxyBrokerClientInstance) AnswerRequestTimeout() time.Duration {
  972. return b.answerRequestTimeout
  973. }
  974. // Implements the inproxy.BrokerDialCoordinator interface.
  975. func (b *InproxyBrokerClientInstance) OfferRequestTimeout() time.Duration {
  976. return b.offerRequestTimeout
  977. }
  978. // Implements the inproxy.BrokerDialCoordinator interface.
  979. func (b *InproxyBrokerClientInstance) OfferRequestPersonalTimeout() time.Duration {
  980. return b.offerRequestPersonalTimeout
  981. }
  982. // Implements the inproxy.BrokerDialCoordinator interface.
  983. func (b *InproxyBrokerClientInstance) OfferRetryDelay() time.Duration {
  984. return b.offerRetryDelay
  985. }
  986. // Implements the inproxy.BrokerDialCoordinator interface.
  987. func (b *InproxyBrokerClientInstance) OfferRetryJitter() float64 {
  988. return b.offerRetryJitter
  989. }
  990. // Implements the inproxy.BrokerDialCoordinator interface.
  991. func (b *InproxyBrokerClientInstance) RelayedPacketRequestTimeout() time.Duration {
  992. return b.relayedPacketRequestTimeout
  993. }
  994. // Implements the inproxy.BrokerDialCoordinator interface.
  995. func (b *InproxyBrokerClientInstance) DSLRequestTimeout() time.Duration {
  996. return b.dslRequestTimeout
  997. }
  998. // InproxyBrokerDialParameters represents a selected broker transport and dial
  999. // paramaters.
  1000. //
  1001. // InproxyBrokerDialParameters is used to configure dialers; as a persistent
  1002. // record to store successful dial parameters for replay; and to report dial
  1003. // stats in notices and Psiphon API calls.
  1004. //
  1005. // InproxyBrokerDialParameters is similar to tunnel DialParameters, but is
  1006. // specific to the in-proxy broker dial phase.
  1007. type InproxyBrokerDialParameters struct {
  1008. brokerSpec *parameters.InproxyBrokerSpec `json:"-"`
  1009. isReplay bool `json:"-"`
  1010. isReuse bool `json:"-"`
  1011. LastUsedTimestamp time.Time
  1012. LastUsedBrokerSpecHash []byte
  1013. FrontedHTTPDialParameters *FrontedMeekDialParameters
  1014. }
  1015. // MakeInproxyBrokerDialParameters creates a new InproxyBrokerDialParameters.
  1016. func MakeInproxyBrokerDialParameters(
  1017. config *Config,
  1018. p parameters.ParametersAccessor,
  1019. networkID string,
  1020. brokerSpec *parameters.InproxyBrokerSpec,
  1021. tlsCache utls.ClientSessionCache) (*InproxyBrokerDialParameters, error) {
  1022. if config.UseUpstreamProxy() {
  1023. return nil, errors.TraceNew("upstream proxy unsupported")
  1024. }
  1025. currentTimestamp := time.Now()
  1026. // Select new broker dial parameters
  1027. brokerDialParams := &InproxyBrokerDialParameters{
  1028. brokerSpec: brokerSpec,
  1029. LastUsedTimestamp: currentTimestamp,
  1030. LastUsedBrokerSpecHash: hashBrokerSpec(brokerSpec),
  1031. }
  1032. // FrontedMeekDialParameters
  1033. //
  1034. // The broker round trips use MeekModeWrappedPlaintextRoundTrip without
  1035. // meek cookies, so meek obfuscation is not configured. The in-proxy
  1036. // broker session payloads have their own obfuscation layer.
  1037. payloadSecure := true
  1038. skipVerify := false
  1039. var err error
  1040. brokerDialParams.FrontedHTTPDialParameters, err = makeFrontedMeekDialParameters(
  1041. config,
  1042. p,
  1043. nil,
  1044. brokerSpec.BrokerFrontingSpecs,
  1045. nil,
  1046. true,
  1047. skipVerify,
  1048. config.DisableSystemRootCAs,
  1049. payloadSecure,
  1050. tlsCache,
  1051. )
  1052. if err != nil {
  1053. return nil, errors.Trace(err)
  1054. }
  1055. // Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
  1056. err = brokerDialParams.prepareDialConfigs(
  1057. config,
  1058. p,
  1059. false,
  1060. tlsCache)
  1061. if err != nil {
  1062. return nil, errors.Trace(err)
  1063. }
  1064. return brokerDialParams, nil
  1065. }
  1066. // prepareDialConfigs is called for both new and replayed broker dial parameters.
  1067. func (brokerDialParams *InproxyBrokerDialParameters) prepareDialConfigs(
  1068. config *Config,
  1069. p parameters.ParametersAccessor,
  1070. isReplay bool,
  1071. tlsCache utls.ClientSessionCache) error {
  1072. brokerDialParams.isReplay = isReplay
  1073. // brokerDialParams.isReuse is set only later, as this is a new broker
  1074. // client dial.
  1075. if isReplay {
  1076. // FrontedHTTPDialParameters
  1077. //
  1078. // The broker round trips use MeekModeWrappedPlaintextRoundTrip without
  1079. // meek cookies, so meek obfuscation is not configured. The in-proxy
  1080. // broker session payloads have their own obfuscation layer.
  1081. payloadSecure := true
  1082. skipVerify := false
  1083. err := brokerDialParams.FrontedHTTPDialParameters.prepareDialConfigs(
  1084. config, p, nil, nil, true, skipVerify,
  1085. config.DisableSystemRootCAs, payloadSecure, tlsCache)
  1086. if err != nil {
  1087. return errors.Trace(err)
  1088. }
  1089. }
  1090. return nil
  1091. }
  1092. // GetMetricsForBroker returns broker client dial parameter log fields to be
  1093. // reported to a broker.
  1094. func (brokerDialParams *InproxyBrokerDialParameters) GetMetricsForBrokerRequests() common.LogFields {
  1095. logFields := common.LogFields{}
  1096. // TODO: add additional broker fronting dial parameters to be logged by
  1097. // the broker -- as successful parameters might not otherwise by logged
  1098. // via server_tunnel if the subsequent WebRTC dials fail.
  1099. logFields["fronting_provider_id"] = brokerDialParams.FrontedHTTPDialParameters.FrontingProviderID
  1100. return logFields
  1101. }
  1102. // GetMetrics implements the common.MetricsSource interface and returns log
  1103. // fields detailing the broker dial parameters.
  1104. func (brokerDialParams *InproxyBrokerDialParameters) GetMetrics() common.LogFields {
  1105. logFields := common.LogFields{}
  1106. // Add underlying log fields, which must be renamed to be scoped to the
  1107. // broker.
  1108. logFields.Add(brokerDialParams.FrontedHTTPDialParameters.GetMetrics("inproxy_broker_"))
  1109. logFields["inproxy_broker_transport"] = brokerDialParams.FrontedHTTPDialParameters.FrontingTransport
  1110. isReplay := "0"
  1111. if brokerDialParams.isReplay {
  1112. isReplay = "1"
  1113. }
  1114. logFields["inproxy_broker_is_replay"] = isReplay
  1115. isReuse := "0"
  1116. if brokerDialParams.isReuse {
  1117. isReuse = "1"
  1118. }
  1119. logFields["inproxy_broker_is_reuse"] = isReuse
  1120. // TODO: include tlsConn.GetMetrics tls_did_resume/tls_sent_ticket.
  1121. // Requires a reference to the InproxyBrokerRoundTripper.
  1122. return logFields
  1123. }
  1124. // hashBrokerSpec hashes the broker spec. The hash is used to detect when
  1125. // broker spec tactics have changed.
  1126. func hashBrokerSpec(spec *parameters.InproxyBrokerSpec) []byte {
  1127. var hash [8]byte
  1128. binary.BigEndian.PutUint64(
  1129. hash[:],
  1130. uint64(xxhash.Sum64String(fmt.Sprintf("%+v", spec))))
  1131. return hash[:]
  1132. }
  1133. // InproxyBrokerRoundTripper is a broker request round trip transport
  1134. // implemented using MeekConn in MeekModePlaintextRoundTrip mode, utilizing
  1135. // MeekConn's domain fronting capabilities and using persistent and
  1136. // multiplexed connections, via HTTP/2, to support multiple concurrent
  1137. // in-flight round trips.
  1138. //
  1139. // InproxyBrokerRoundTripper implements the inproxy.RoundTripper interface.
  1140. type InproxyBrokerRoundTripper struct {
  1141. brokerDialParams *InproxyBrokerDialParameters
  1142. runCtx context.Context
  1143. stopRunning context.CancelFunc
  1144. dial int32
  1145. dialCompleted chan struct{}
  1146. dialErr error
  1147. conn *MeekConn
  1148. failureThreshold time.Duration
  1149. }
  1150. // NewInproxyBrokerRoundTripper creates a new InproxyBrokerRoundTripper. The
  1151. // initial DialMeek is defered until the first call to RoundTrip, so
  1152. // NewInproxyBrokerRoundTripper does not perform any network operations.
  1153. //
  1154. // The input brokerDialParams dial parameter and config fields must not
  1155. // modifed after NewInproxyBrokerRoundTripper is called.
  1156. func NewInproxyBrokerRoundTripper(
  1157. p parameters.ParametersAccessor,
  1158. brokerDialParams *InproxyBrokerDialParameters) *InproxyBrokerRoundTripper {
  1159. runCtx, stopRunning := context.WithCancel(context.Background())
  1160. return &InproxyBrokerRoundTripper{
  1161. brokerDialParams: brokerDialParams,
  1162. runCtx: runCtx,
  1163. stopRunning: stopRunning,
  1164. dialCompleted: make(chan struct{}),
  1165. failureThreshold: p.Duration(
  1166. parameters.InproxyBrokerRoundTripStatusCodeFailureThreshold),
  1167. }
  1168. }
  1169. // Close interrupts any in-flight request and closes the underlying
  1170. // MeekConn.
  1171. func (rt *InproxyBrokerRoundTripper) Close() error {
  1172. // Interrupt any DialMeek or RoundTrip.
  1173. rt.stopRunning()
  1174. if atomic.CompareAndSwapInt32(&rt.dial, 0, 1) {
  1175. // RoundTrip has not yet been called or has not yet kicked off
  1176. // DialMeek, so there is no MeekConn to close. Prevent any future
  1177. // DialMeek by signaling dialCompleted and fail any future round trip
  1178. // attempt by setting dialErr.
  1179. rt.dialErr = errors.TraceNew("closed")
  1180. close(rt.dialCompleted)
  1181. } else {
  1182. // Await any ongoing DialMeek or RoundTrip (stopRunning should
  1183. // interrupt either one quickly).
  1184. <-rt.dialCompleted
  1185. if rt.conn != nil {
  1186. _ = rt.conn.Close()
  1187. }
  1188. }
  1189. // As with MeekConn.Close, any Close errors from underlying conns are not
  1190. // propagated.
  1191. return nil
  1192. }
  1193. // RoundTrip transports a request to the broker endpoint and returns a
  1194. // response.
  1195. func (rt *InproxyBrokerRoundTripper) RoundTrip(
  1196. ctx context.Context,
  1197. roundTripDelay time.Duration,
  1198. roundTripTimeout time.Duration,
  1199. requestPayload []byte) (_ []byte, retErr error) {
  1200. defer func() {
  1201. // Log any error which results in invoking BrokerClientRoundTripperFailed.
  1202. var failedError *inproxy.RoundTripperFailedError
  1203. if std_errors.As(retErr, &failedError) {
  1204. NoticeWarning("RoundTripperFailedError: %v", retErr)
  1205. }
  1206. }()
  1207. // Cancel DialMeek or MeekConn.RoundTrip when:
  1208. // - Close is called
  1209. // - the input context is done
  1210. ctx, cancelFunc := common.MergeContextCancel(ctx, rt.runCtx)
  1211. defer cancelFunc()
  1212. // Apply any round trip delay. Currently, this is used to apply an
  1213. // announce request delay post-waitToShareSession, pre-network round
  1214. // trip, and cancelable by the above merged context.
  1215. if roundTripDelay > 0 {
  1216. common.SleepWithContext(ctx, roundTripDelay)
  1217. }
  1218. // Apply the round trip timeout after any delay is complete.
  1219. //
  1220. // This timeout includes any TLS handshake network round trips, as
  1221. // performed by the initial DialMeek and may be performed subsequently by
  1222. // net/http via MeekConn.RoundTrip. These extra round trips should be
  1223. // accounted for in the in the difference between client-side request
  1224. // timeouts, such as InproxyProxyAnswerRequestTimeout, and broker-side
  1225. // handler timeouts, such as InproxyBrokerProxyAnnounceTimeout, with the
  1226. // former allowing more time for network round trips.
  1227. requestCtx := ctx
  1228. if roundTripTimeout > 0 {
  1229. var requestCancelFunc context.CancelFunc
  1230. requestCtx, requestCancelFunc = context.WithTimeout(ctx, roundTripTimeout)
  1231. defer requestCancelFunc()
  1232. }
  1233. // The first RoundTrip caller will perform the DialMeek step, which
  1234. // establishes the TLS trasport connection to the fronted endpoint.
  1235. // Following callers will await that DialMeek or share an established
  1236. // connection.
  1237. //
  1238. // To accomodate using custom utls fingerprints, with varying ALPNs, with
  1239. // net/http, DialMeek completes a full TLS handshake before instantiating
  1240. // the appropriate http.Transport or http2.Transport. Until that first
  1241. // DialMeek completes, and unlike standard net/http round trips,
  1242. // InproxyBrokerRoundTripper won't spawn distinct TLS persistent
  1243. // connections for concurrent round trips. After DialMeek, concurrent
  1244. // round trips over HTTP/2 connections may simply share the one TLS
  1245. // connection, while concurrent round trips over HTTP connections may
  1246. // spawn additional TLS persistent connections.
  1247. //
  1248. // There is no retry here if DialMeek fails, as higher levels will invoke
  1249. // BrokerClientRoundTripperFailed on failure, clear any replay, select
  1250. // new dial parameters, and retry.
  1251. if atomic.CompareAndSwapInt32(&rt.dial, 0, 1) {
  1252. // DialMeek hasn't been called yet.
  1253. conn, err := DialMeek(
  1254. requestCtx,
  1255. rt.brokerDialParams.FrontedHTTPDialParameters.meekConfig,
  1256. rt.brokerDialParams.FrontedHTTPDialParameters.dialConfig)
  1257. if err != nil && ctx.Err() != context.Canceled {
  1258. // DialMeek performs an initial TLS handshake. DialMeek errors,
  1259. // excluding a cancelled context as happens on shutdown, are
  1260. // classified as as RoundTripperFailedErrors, which will invoke
  1261. // BrokerClientRoundTripperFailed, resetting the round tripper
  1262. // and clearing replay parameters.
  1263. err = inproxy.NewRoundTripperFailedError(err)
  1264. }
  1265. rt.conn = conn
  1266. rt.dialErr = err
  1267. close(rt.dialCompleted)
  1268. if err != nil {
  1269. return nil, errors.Trace(rt.dialErr)
  1270. }
  1271. } else {
  1272. // Await any ongoing DialMeek run by a concurrent RoundTrip caller.
  1273. select {
  1274. case <-rt.dialCompleted:
  1275. case <-ctx.Done():
  1276. return nil, errors.Trace(ctx.Err())
  1277. }
  1278. if rt.dialErr != nil {
  1279. // There is no NewRoundTripperFailedError wrapping here, as the
  1280. // DialMeek caller will wrap its error and
  1281. // BrokerClientRoundTripperFailed will be invoked already.
  1282. return nil, errors.Trace(rt.dialErr)
  1283. }
  1284. }
  1285. // At this point, rt.conn is an established MeekConn.
  1286. // Note that the network address portion of the URL will be ignored by
  1287. // MeekConn in favor of the MeekDialConfig, while the path will be used.
  1288. url := fmt.Sprintf(
  1289. "https://%s/%s",
  1290. rt.brokerDialParams.FrontedHTTPDialParameters.DialAddress,
  1291. inproxy.BrokerEndPointName)
  1292. request, err := http.NewRequestWithContext(
  1293. requestCtx, "POST", url, bytes.NewBuffer(requestPayload))
  1294. if err != nil {
  1295. return nil, errors.Trace(err)
  1296. }
  1297. startTime := time.Now()
  1298. response, err := rt.conn.RoundTrip(request)
  1299. roundTripDuration := time.Since(startTime)
  1300. if err == nil {
  1301. defer response.Body.Close()
  1302. if response.StatusCode != http.StatusOK {
  1303. err = fmt.Errorf(
  1304. "unexpected response status code %d after %v",
  1305. response.StatusCode,
  1306. roundTripDuration)
  1307. // Depending on the round trip duration, this case is treated as a
  1308. // temporary round tripper failure, since we received a response
  1309. // from the CDN, secured with TLS and VerifyPins, or from broker
  1310. // itself. One common scenario is the CDN returning a temporary
  1311. // timeout error, as can happen when CDN timeouts and broker
  1312. // timeouts are misaligned, especially for long-polling requests.
  1313. //
  1314. // In this scenario, we can reuse the existing round tripper and
  1315. // it may be counterproductive to return a RoundTripperFailedError
  1316. // which will trigger a clearing of any broker dial replay
  1317. // parameters as well as reseting the round tripper.
  1318. //
  1319. // When the round trip duration is sufficiently short, much
  1320. // shorter than expected round trip timeouts, this is still
  1321. // classified as a RoundTripperFailedError error, as it is more
  1322. // likely due to a more serious issue between the CDN and broker.
  1323. if rt.failureThreshold > 0 &&
  1324. roundTripDuration <= rt.failureThreshold {
  1325. err = inproxy.NewRoundTripperFailedError(err)
  1326. }
  1327. }
  1328. } else if ctx.Err() != context.Canceled {
  1329. // Other round trip errors, including TLS failures and client-side
  1330. // timeouts, but excluding a cancelled context as happens on
  1331. // shutdown, are classified as RoundTripperFailedErrors.
  1332. err = inproxy.NewRoundTripperFailedError(err)
  1333. }
  1334. if err != nil {
  1335. return nil, errors.Trace(err)
  1336. }
  1337. responsePayload, err := io.ReadAll(response.Body)
  1338. if err != nil {
  1339. err = inproxy.NewRoundTripperFailedError(err)
  1340. return nil, errors.Trace(err)
  1341. }
  1342. return responsePayload, nil
  1343. }
  1344. // InproxyWebRTCDialInstance is the network state and dial parameters for a
  1345. // single WebRTC client or proxy connection.
  1346. //
  1347. // InproxyWebRTCDialInstance implements the inproxy.WebRTCDialCoordinator
  1348. // interface, which provides the WebRTC dial configuration and support to the
  1349. // in-proxy package.
  1350. type InproxyWebRTCDialInstance struct {
  1351. config *Config
  1352. networkID string
  1353. natStateManager *InproxyNATStateManager
  1354. stunDialParameters *InproxySTUNDialParameters
  1355. webRTCDialParameters *InproxyWebRTCDialParameters
  1356. discoverNAT bool
  1357. disableSTUN bool
  1358. disablePortMapping bool
  1359. disableInboundForMobileNetworks bool
  1360. disableIPv6ICECandidates bool
  1361. discoverNATTimeout time.Duration
  1362. webRTCAnswerTimeout time.Duration
  1363. webRTCAwaitPortMappingTimeout time.Duration
  1364. awaitReadyToProxyTimeout time.Duration
  1365. proxyDestinationDialTimeout time.Duration
  1366. proxyRelayInactivityTimeout time.Duration
  1367. }
  1368. // NewInproxyWebRTCDialInstance creates a new InproxyWebRTCDialInstance.
  1369. //
  1370. // The caller provides STUN and WebRTC dial parameters that are either newly
  1371. // generated or replayed. Proxies may optionally pass in nil for either
  1372. // stunDialParameters or webRTCDialParameters, and new parameters will be
  1373. // generated.
  1374. func NewInproxyWebRTCDialInstance(
  1375. config *Config,
  1376. networkID string,
  1377. isProxy bool,
  1378. natStateManager *InproxyNATStateManager,
  1379. stunDialParameters *InproxySTUNDialParameters,
  1380. webRTCDialParameters *InproxyWebRTCDialParameters) (*InproxyWebRTCDialInstance, error) {
  1381. p := config.GetParameters().Get()
  1382. defer p.Close()
  1383. if isProxy && stunDialParameters == nil {
  1384. // Auto-generate STUN dial parameters. There's no replay in this case.
  1385. var err error
  1386. stunDialParameters, err = MakeInproxySTUNDialParameters(config, p, isProxy)
  1387. if err != nil {
  1388. return nil, errors.Trace(err)
  1389. }
  1390. }
  1391. if isProxy && webRTCDialParameters == nil {
  1392. // Auto-generate STUN dial parameters. There's no replay in this case.
  1393. var err error
  1394. webRTCDialParameters, err = MakeInproxyWebRTCDialParameters(p)
  1395. if err != nil {
  1396. return nil, errors.Trace(err)
  1397. }
  1398. }
  1399. disableSTUN := p.Bool(parameters.InproxyDisableSTUN)
  1400. disablePortMapping := p.Bool(parameters.InproxyDisablePortMapping)
  1401. disableInboundForMobileNetworks := p.Bool(parameters.InproxyDisableInboundForMobileNetworks)
  1402. disableIPv6ICECandidates := p.Bool(parameters.InproxyDisableIPv6ICECandidates)
  1403. var discoverNATTimeout, awaitReadyToProxyTimeout time.Duration
  1404. if isProxy {
  1405. disableSTUN = disableSTUN || p.Bool(parameters.InproxyProxyDisableSTUN)
  1406. disablePortMapping = disablePortMapping || p.Bool(parameters.InproxyProxyDisablePortMapping)
  1407. disableInboundForMobileNetworks = disableInboundForMobileNetworks ||
  1408. p.Bool(parameters.InproxyProxyDisableInboundForMobileNetworks)
  1409. disableIPv6ICECandidates = disableIPv6ICECandidates ||
  1410. p.Bool(parameters.InproxyProxyDisableIPv6ICECandidates)
  1411. discoverNATTimeout = p.Duration(parameters.InproxyProxyDiscoverNATTimeout)
  1412. awaitReadyToProxyTimeout = p.Duration(parameters.InproxyProxyWebRTCAwaitReadyToProxyTimeout)
  1413. } else {
  1414. disableSTUN = disableSTUN || p.Bool(parameters.InproxyClientDisableSTUN)
  1415. disablePortMapping = disablePortMapping || p.Bool(parameters.InproxyClientDisablePortMapping)
  1416. disableInboundForMobileNetworks = disableInboundForMobileNetworks ||
  1417. p.Bool(parameters.InproxyClientDisableInboundForMobileNetworks)
  1418. disableIPv6ICECandidates = disableIPv6ICECandidates ||
  1419. p.Bool(parameters.InproxyClientDisableIPv6ICECandidates)
  1420. discoverNATTimeout = p.Duration(parameters.InproxyClientDiscoverNATTimeout)
  1421. awaitReadyToProxyTimeout = p.Duration(parameters.InproxyClientWebRTCAwaitReadyToProxyTimeout)
  1422. }
  1423. if clientAPILevelDisableInproxyPortMapping.Load() {
  1424. disablePortMapping = true
  1425. }
  1426. // Parameters such as disabling certain operations and operation timeouts
  1427. // are not replayed, but snapshots are stored in the
  1428. // InproxyWebRTCDialInstance for efficient lookup.
  1429. return &InproxyWebRTCDialInstance{
  1430. config: config,
  1431. networkID: networkID,
  1432. natStateManager: natStateManager,
  1433. stunDialParameters: stunDialParameters,
  1434. webRTCDialParameters: webRTCDialParameters,
  1435. // discoverNAT is ignored by proxies, which always attempt discovery.
  1436. // webRTCAnswerTimeout, proxyDestinationDialTimeout, and
  1437. // proxyRelayInactivityTimeout are used only by proxies.
  1438. discoverNAT: p.WeightedCoinFlip(parameters.InproxyClientDiscoverNATProbability),
  1439. disableSTUN: disableSTUN,
  1440. disablePortMapping: disablePortMapping,
  1441. disableInboundForMobileNetworks: disableInboundForMobileNetworks,
  1442. disableIPv6ICECandidates: disableIPv6ICECandidates,
  1443. discoverNATTimeout: discoverNATTimeout,
  1444. webRTCAnswerTimeout: p.Duration(parameters.InproxyWebRTCAnswerTimeout),
  1445. webRTCAwaitPortMappingTimeout: p.Duration(parameters.InproxyWebRTCAwaitPortMappingTimeout),
  1446. awaitReadyToProxyTimeout: awaitReadyToProxyTimeout,
  1447. proxyDestinationDialTimeout: p.Duration(parameters.InproxyProxyDestinationDialTimeout),
  1448. proxyRelayInactivityTimeout: p.Duration(parameters.InproxyProxyRelayInactivityTimeout),
  1449. }, nil
  1450. }
  1451. // Implements the inproxy.WebRTCDialCoordinator interface.
  1452. func (w *InproxyWebRTCDialInstance) NetworkID() string {
  1453. return w.networkID
  1454. }
  1455. // Implements the inproxy.WebRTCDialCoordinator interface.
  1456. func (w *InproxyWebRTCDialInstance) NetworkType() inproxy.NetworkType {
  1457. return getInproxyNetworkType(GetNetworkType(w.networkID))
  1458. }
  1459. // Implements the inproxy.WebRTCDialCoordinator interface.
  1460. func (w *InproxyWebRTCDialInstance) ClientRootObfuscationSecret() inproxy.ObfuscationSecret {
  1461. return w.webRTCDialParameters.RootObfuscationSecret
  1462. }
  1463. // Implements the inproxy.WebRTCDialCoordinator interface.
  1464. func (w *InproxyWebRTCDialInstance) DoDTLSRandomization() bool {
  1465. return w.webRTCDialParameters.DoDTLSRandomization
  1466. }
  1467. // Implements the inproxy.WebRTCDialCoordinator interface.
  1468. func (w *InproxyWebRTCDialInstance) UseMediaStreams() bool {
  1469. return w.webRTCDialParameters.UseMediaStreams
  1470. }
  1471. // Implements the inproxy.WebRTCDialCoordinator interface.
  1472. func (w *InproxyWebRTCDialInstance) TrafficShapingParameters() *inproxy.TrafficShapingParameters {
  1473. return w.webRTCDialParameters.TrafficShapingParameters
  1474. }
  1475. // Implements the inproxy.WebRTCDialCoordinator interface.
  1476. func (w *InproxyWebRTCDialInstance) STUNServerAddress(RFC5780 bool) string {
  1477. if RFC5780 {
  1478. return w.stunDialParameters.STUNServerAddressRFC5780
  1479. } else {
  1480. return w.stunDialParameters.STUNServerAddress
  1481. }
  1482. }
  1483. // Implements the inproxy.WebRTCDialCoordinator interface.
  1484. func (w *InproxyWebRTCDialInstance) STUNServerAddressResolved(RFC5780 bool) string {
  1485. if RFC5780 {
  1486. return w.stunDialParameters.STUNServerAddressRFC5780
  1487. } else {
  1488. return w.stunDialParameters.STUNServerAddress
  1489. }
  1490. }
  1491. // Implements the inproxy.WebRTCDialCoordinator interface.
  1492. func (w *InproxyWebRTCDialInstance) STUNServerAddressSucceeded(RFC5780 bool, address string) {
  1493. // Currently, for client tunnel dials, STUN dial parameter replay is
  1494. // managed by DialParameters and DialParameters.InproxySTUNDialParameters
  1495. // are replayed only when the entire dial succeeds.
  1496. //
  1497. // Note that, for a client tunnel dial, even if the STUN step fails and
  1498. // there are no STUN ICE candidates, the subsequent WebRTC connection may
  1499. // still proceed and be successful. In this case, the failed STUN dial
  1500. // parameters may be replayed.
  1501. //
  1502. // For proxies, there is no STUN dial parameter replay.
  1503. //
  1504. // As a future enhancement, consider independent and shared replay of
  1505. // working STUN servers, similar to how broker client dial parameters are
  1506. // replayed independent of overall dials and proxy relays, and shared
  1507. // between local client and proxy instances.
  1508. // Verify/extend the resolver cache entry for any resolved domain after a
  1509. // success.
  1510. resolver := w.config.GetResolver()
  1511. if resolver != nil {
  1512. resolver.VerifyCacheExtension(address)
  1513. }
  1514. }
  1515. // Implements the inproxy.WebRTCDialCoordinator interface.
  1516. func (w *InproxyWebRTCDialInstance) STUNServerAddressFailed(RFC5780 bool, address string) {
  1517. // Currently there is no independent replay for STUN dial parameters. See
  1518. // comment in STUNServerAddressSucceeded.
  1519. }
  1520. // Implements the inproxy.WebRTCDialCoordinator interface.
  1521. func (w *InproxyWebRTCDialInstance) DiscoverNAT() bool {
  1522. return w.discoverNAT
  1523. }
  1524. // Implements the inproxy.WebRTCDialCoordinator interface.
  1525. func (w *InproxyWebRTCDialInstance) DisableSTUN() bool {
  1526. return w.disableSTUN
  1527. }
  1528. // Implements the inproxy.WebRTCDialCoordinator interface.
  1529. func (w *InproxyWebRTCDialInstance) DisablePortMapping() bool {
  1530. return w.disablePortMapping
  1531. }
  1532. // Implements the inproxy.WebRTCDialCoordinator interface.
  1533. func (w *InproxyWebRTCDialInstance) DisableInboundForMobileNetworks() bool {
  1534. return w.disableInboundForMobileNetworks
  1535. }
  1536. // Implements the inproxy.WebRTCDialCoordinator interface.
  1537. func (w *InproxyWebRTCDialInstance) DisableIPv6ICECandidates() bool {
  1538. return w.disableIPv6ICECandidates
  1539. }
  1540. // Implements the inproxy.WebRTCDialCoordinator interface.
  1541. func (w *InproxyWebRTCDialInstance) NATType() inproxy.NATType {
  1542. return w.natStateManager.getNATType(w.networkID)
  1543. }
  1544. // Implements the inproxy.WebRTCDialCoordinator interface.
  1545. func (w *InproxyWebRTCDialInstance) SetNATType(natType inproxy.NATType) {
  1546. w.natStateManager.setNATType(w.networkID, natType)
  1547. }
  1548. // Implements the inproxy.WebRTCDialCoordinator interface.
  1549. func (w *InproxyWebRTCDialInstance) PortMappingTypes() inproxy.PortMappingTypes {
  1550. return w.natStateManager.getPortMappingTypes(w.networkID)
  1551. }
  1552. // Implements the inproxy.WebRTCDialCoordinator interface.
  1553. func (w *InproxyWebRTCDialInstance) SetPortMappingTypes(
  1554. portMappingTypes inproxy.PortMappingTypes) {
  1555. w.natStateManager.setPortMappingTypes(w.networkID, portMappingTypes)
  1556. }
  1557. // Implements the inproxy.WebRTCDialCoordinator interface.
  1558. func (w *InproxyWebRTCDialInstance) PortMappingProbe() *inproxy.PortMappingProbe {
  1559. return w.natStateManager.getPortMappingProbe(w.networkID)
  1560. }
  1561. // Implements the inproxy.WebRTCDialCoordinator interface.
  1562. func (w *InproxyWebRTCDialInstance) SetPortMappingProbe(
  1563. portMappingProbe *inproxy.PortMappingProbe) {
  1564. w.natStateManager.setPortMappingProbe(w.networkID, portMappingProbe)
  1565. }
  1566. // Implements the inproxy.WebRTCDialCoordinator interface.
  1567. func (w *InproxyWebRTCDialInstance) ResolveAddress(ctx context.Context, network, address string) (string, error) {
  1568. // Use the Psiphon resolver to resolve addresses.
  1569. r := w.config.GetResolver()
  1570. if r == nil {
  1571. return "", errors.TraceNew("missing resolver")
  1572. }
  1573. // Identify when the address to be resolved is one of the configured STUN
  1574. // servers, and, in those cases, use/replay any STUN dial parameters
  1575. // ResolveParameters; and record the resolved IP address for metrics.
  1576. //
  1577. // In the in-proxy proxy case, ResolveAddress is invoked for the upstream,
  1578. // 2nd hop dial as well as for STUN server addresses.
  1579. //
  1580. // Limitation: there's no ResolveParameters, including no preresolved DNS
  1581. // tactics, for 2nd hop dials.
  1582. isSTUNServerAddress := address == w.stunDialParameters.STUNServerAddress
  1583. isSTUNServerAddressRFC5780 := address == w.stunDialParameters.STUNServerAddressRFC5780
  1584. var resolveParams *resolver.ResolveParameters
  1585. if isSTUNServerAddress || isSTUNServerAddressRFC5780 {
  1586. resolveParams = w.stunDialParameters.ResolveParameters
  1587. }
  1588. resolved, err := r.ResolveAddress(
  1589. ctx, w.networkID, resolveParams, network, address)
  1590. if err != nil {
  1591. return "", errors.Trace(err)
  1592. }
  1593. // Invoke the resolved IP callbacks only when the input is not the
  1594. // resolved IP address (this differs from the meek
  1595. // DialConfig.ResolvedIPCallback case).
  1596. if resolved != address {
  1597. if isSTUNServerAddress {
  1598. w.stunDialParameters.STUNServerResolvedIPAddress.Store(resolved)
  1599. } else if isSTUNServerAddressRFC5780 {
  1600. w.stunDialParameters.STUNServerRFC5780ResolvedIPAddress.Store(resolved)
  1601. }
  1602. }
  1603. return resolved, nil
  1604. }
  1605. // Implements the inproxy.WebRTCDialCoordinator interface.
  1606. func (w *InproxyWebRTCDialInstance) UDPListen(ctx context.Context) (net.PacketConn, error) {
  1607. // Create a new inproxyUDPConn for use as the in-proxy STUN and/ord WebRTC
  1608. // UDP socket.
  1609. conn, err := newInproxyUDPConn(ctx, w.config)
  1610. if err != nil {
  1611. return nil, errors.Trace(err)
  1612. }
  1613. return conn, nil
  1614. }
  1615. // Implements the inproxy.WebRTCDialCoordinator interface.
  1616. func (w *InproxyWebRTCDialInstance) UDPConn(
  1617. ctx context.Context, network, remoteAddress string) (net.PacketConn, error) {
  1618. // Create a new UDPConn bound to the specified remote address. This UDP
  1619. // conn is used, by the inproxy package, to determine the local address
  1620. // of the active interface the OS will select for the specified remote
  1621. // destination.
  1622. //
  1623. // Only IP address destinations are supported. ResolveIP is wired up only
  1624. // because NewUDPConn requires a non-nil resolver.
  1625. dialConfig := &DialConfig{
  1626. DeviceBinder: w.config.deviceBinder,
  1627. IPv6Synthesizer: w.config.IPv6Synthesizer,
  1628. ResolveIP: func(_ context.Context, hostname string) ([]net.IP, error) {
  1629. IP := net.ParseIP(hostname)
  1630. if IP == nil {
  1631. return nil, errors.TraceNew("not supported")
  1632. }
  1633. return []net.IP{IP}, nil
  1634. },
  1635. }
  1636. conn, _, err := NewUDPConn(ctx, network, true, "", remoteAddress, dialConfig)
  1637. if err != nil {
  1638. return nil, errors.Trace(err)
  1639. }
  1640. return conn, nil
  1641. }
  1642. // Implements the inproxy.WebRTCDialCoordinator interface.
  1643. func (w *InproxyWebRTCDialInstance) BindToDevice(fileDescriptor int) error {
  1644. if w.config.deviceBinder == nil {
  1645. return nil
  1646. }
  1647. // Use config.deviceBinder, with wired up logging, not
  1648. // config.DeviceBinder; other tunnel-core dials do this indirectly via
  1649. // psiphon.DialConfig.
  1650. _, err := w.config.deviceBinder.BindToDevice(fileDescriptor)
  1651. return errors.Trace(err)
  1652. }
  1653. func (w *InproxyWebRTCDialInstance) ProxyUpstreamDial(
  1654. ctx context.Context, network, address string) (net.Conn, error) {
  1655. // This implementation of ProxyUpstreamDial applies additional socket
  1656. // options and BindToDevice as required, but is otherwise a stock dialer.
  1657. //
  1658. // TODO: Use custom UDP and TCP dialers, and wire up TCP/UDP-level
  1659. // tactics, including BPF and the custom resolver, which may be enabled
  1660. // for the proxy's ISP or geolocation. Orchestrating preresolved DNS
  1661. // requires additional information from either from the broker, the
  1662. // FrontingProviderID, to be applied to any
  1663. // DNSResolverPreresolvedIPAddressCIDRs proxy tactics. In addition,
  1664. // replay the selected upstream dial tactics parameters.
  1665. splitUpstreamInterfaceName := w.config.InproxyProxySplitUpstreamInterfaceName
  1666. dialer := net.Dialer{
  1667. Control: func(_, _ string, c syscall.RawConn) error {
  1668. var controlErr error
  1669. err := c.Control(func(fd uintptr) {
  1670. socketFD := int(fd)
  1671. setAdditionalSocketOptions(socketFD)
  1672. if w.config.deviceBinder != nil {
  1673. _, err := w.config.deviceBinder.BindToDevice(socketFD)
  1674. if err != nil {
  1675. controlErr = errors.Tracef("BindToDevice failed: %s", err)
  1676. return
  1677. }
  1678. } else if splitUpstreamInterfaceName != "" {
  1679. err := tun.BindToDevice(socketFD, splitUpstreamInterfaceName)
  1680. if err != nil {
  1681. controlErr = errors.Tracef("BindToDevice failed: %s", err)
  1682. return
  1683. }
  1684. }
  1685. })
  1686. if controlErr != nil {
  1687. return errors.Trace(controlErr)
  1688. }
  1689. return errors.Trace(err)
  1690. },
  1691. }
  1692. conn, err := dialer.DialContext(ctx, network, address)
  1693. if err != nil {
  1694. return nil, errors.Trace(err)
  1695. }
  1696. return conn, nil
  1697. }
  1698. // Implements the inproxy.WebRTCDialCoordinator interface.
  1699. func (w *InproxyWebRTCDialInstance) DiscoverNATTimeout() time.Duration {
  1700. return w.discoverNATTimeout
  1701. }
  1702. // Implements the inproxy.WebRTCDialCoordinator interface.
  1703. func (w *InproxyWebRTCDialInstance) WebRTCAnswerTimeout() time.Duration {
  1704. return w.webRTCAnswerTimeout
  1705. }
  1706. // Implements the inproxy.WebRTCDialCoordinator interface.
  1707. func (w *InproxyWebRTCDialInstance) WebRTCAwaitPortMappingTimeout() time.Duration {
  1708. return w.webRTCAwaitPortMappingTimeout
  1709. }
  1710. // Implements the inproxy.WebRTCDialCoordinator interface.
  1711. func (w *InproxyWebRTCDialInstance) WebRTCAwaitReadyToProxyTimeout() time.Duration {
  1712. return w.awaitReadyToProxyTimeout
  1713. }
  1714. // Implements the inproxy.WebRTCDialCoordinator interface.
  1715. func (w *InproxyWebRTCDialInstance) ProxyDestinationDialTimeout() time.Duration {
  1716. return w.proxyDestinationDialTimeout
  1717. }
  1718. // Implements the inproxy.WebRTCDialCoordinator interface.
  1719. func (w *InproxyWebRTCDialInstance) ProxyRelayInactivityTimeout() time.Duration {
  1720. return w.proxyRelayInactivityTimeout
  1721. }
  1722. // InproxySTUNDialParameters is a set of STUN dial parameters.
  1723. // InproxySTUNDialParameters is compatible with DialParameters JSON
  1724. // marshaling. For client in-proxy tunnel dials, DialParameters will manage
  1725. // STUN dial parameter selection and replay.
  1726. //
  1727. // When an instance of InproxySTUNDialParameters is unmarshaled from JSON,
  1728. // Prepare must be called to initialize the instance for use.
  1729. type InproxySTUNDialParameters struct {
  1730. ResolveParameters *resolver.ResolveParameters
  1731. STUNServerAddress string
  1732. STUNServerAddressRFC5780 string
  1733. STUNServerResolvedIPAddress atomic.Value `json:"-"`
  1734. STUNServerRFC5780ResolvedIPAddress atomic.Value `json:"-"`
  1735. }
  1736. // MakeInproxySTUNDialParameters generates new STUN dial parameters from the
  1737. // given tactics parameters.
  1738. func MakeInproxySTUNDialParameters(
  1739. config *Config,
  1740. p parameters.ParametersAccessor,
  1741. isProxy bool) (*InproxySTUNDialParameters, error) {
  1742. var stunServerAddresses, stunServerAddressesRFC5780 []string
  1743. if isProxy {
  1744. stunServerAddresses = p.Strings(
  1745. parameters.InproxyProxySTUNServerAddresses, parameters.InproxySTUNServerAddresses)
  1746. stunServerAddressesRFC5780 = p.Strings(
  1747. parameters.InproxyProxySTUNServerAddressesRFC5780, parameters.InproxySTUNServerAddressesRFC5780)
  1748. } else {
  1749. stunServerAddresses = p.Strings(
  1750. parameters.InproxyClientSTUNServerAddresses, parameters.InproxySTUNServerAddresses)
  1751. stunServerAddressesRFC5780 = p.Strings(
  1752. parameters.InproxyClientSTUNServerAddressesRFC5780, parameters.InproxySTUNServerAddressesRFC5780)
  1753. }
  1754. // Empty STUN server address lists are not an error condition. When used
  1755. // for WebRTC, the STUN ICE candidate gathering will be skipped but the
  1756. // WebRTC connection may still be established via other candidate types.
  1757. var stunServerAddress, stunServerAddressRFC5780 string
  1758. if len(stunServerAddresses) > 0 {
  1759. stunServerAddress = stunServerAddresses[prng.Range(0, len(stunServerAddresses)-1)]
  1760. }
  1761. if len(stunServerAddressesRFC5780) > 0 {
  1762. stunServerAddressRFC5780 =
  1763. stunServerAddressesRFC5780[prng.Range(0, len(stunServerAddressesRFC5780)-1)]
  1764. }
  1765. // Create DNS resolver dial parameters to use when resolving STUN server
  1766. // domain addresses. Instantiate only when there is a domain to be
  1767. // resolved; when recording DNS fields, GetMetrics will assume that a nil
  1768. // InproxySTUNDialParameters.ResolveParameters implies no resolve was
  1769. // attempted.
  1770. var resolveParameters *resolver.ResolveParameters
  1771. if (stunServerAddress != "" && net.ParseIP(stunServerAddress) == nil) ||
  1772. (stunServerAddressRFC5780 != "" && net.ParseIP(stunServerAddressRFC5780) == nil) {
  1773. // No DNSResolverPreresolvedIPAddressCIDRs will be selected since no
  1774. // fronting provider ID is specified.
  1775. //
  1776. // It would be possible to overload the meaning of the fronting
  1777. // provider ID field by using a string derived from STUN server
  1778. // address as the key.
  1779. //
  1780. // However, preresolved STUN configuration can already be achieved
  1781. // with IP addresses in the STUNServerAddresses tactics parameters.
  1782. // This approach results in slightly different metrics log fields vs.
  1783. // preresolved.
  1784. var err error
  1785. resolveParameters, err = config.GetResolver().MakeResolveParameters(p, "", "")
  1786. if err != nil {
  1787. return nil, errors.Trace(err)
  1788. }
  1789. }
  1790. dialParams := &InproxySTUNDialParameters{
  1791. ResolveParameters: resolveParameters,
  1792. STUNServerAddress: stunServerAddress,
  1793. STUNServerAddressRFC5780: stunServerAddressRFC5780,
  1794. }
  1795. dialParams.Prepare()
  1796. return dialParams, nil
  1797. }
  1798. // Prepare initializes an InproxySTUNDialParameters for use. Prepare should be
  1799. // called for any InproxySTUNDialParameters instance unmarshaled from JSON.
  1800. func (dialParams *InproxySTUNDialParameters) Prepare() {
  1801. dialParams.STUNServerResolvedIPAddress.Store("")
  1802. dialParams.STUNServerRFC5780ResolvedIPAddress.Store("")
  1803. }
  1804. // IsValidClientReplay checks that the selected STUN servers remain configured
  1805. // STUN server candidates for in-proxy clients.
  1806. func (dialParams *InproxySTUNDialParameters) IsValidClientReplay(
  1807. p parameters.ParametersAccessor) bool {
  1808. return (dialParams.STUNServerAddress == "" ||
  1809. common.Contains(
  1810. p.Strings(parameters.InproxyClientSTUNServerAddresses),
  1811. dialParams.STUNServerAddress)) &&
  1812. (dialParams.STUNServerAddressRFC5780 == "" ||
  1813. common.Contains(
  1814. p.Strings(parameters.InproxyClientSTUNServerAddressesRFC5780),
  1815. dialParams.STUNServerAddressRFC5780))
  1816. }
  1817. // GetMetrics implements the common.MetricsSource interface and returns log
  1818. // fields detailing the STUN dial parameters.
  1819. func (dialParams *InproxySTUNDialParameters) GetMetrics() common.LogFields {
  1820. // There is no is_replay-type field added here; replay is handled at a
  1821. // higher level, and, for client in-proxy tunnel dials, is part of the
  1822. // main tunnel dial parameters.
  1823. logFields := make(common.LogFields)
  1824. logFields["inproxy_webrtc_stun_server"] = dialParams.STUNServerAddress
  1825. resolvedIPAddress := dialParams.STUNServerResolvedIPAddress.Load().(string)
  1826. if resolvedIPAddress != "" {
  1827. logFields["inproxy_webrtc_stun_server_resolved_ip_address"] = resolvedIPAddress
  1828. }
  1829. // TODO: log RFC5780 selection only if used?
  1830. logFields["inproxy_webrtc_stun_server_RFC5780"] = dialParams.STUNServerAddressRFC5780
  1831. resolvedIPAddress = dialParams.STUNServerRFC5780ResolvedIPAddress.Load().(string)
  1832. if resolvedIPAddress != "" {
  1833. logFields["inproxy_webrtc_stun_server_RFC5780_resolved_ip_address"] = resolvedIPAddress
  1834. }
  1835. if dialParams.ResolveParameters != nil {
  1836. // See comment in getBaseAPIParameters regarding
  1837. // dialParams.ResolveParameters handling. As noted in
  1838. // MakeInproxySTUNDialParameters, no preresolved parameters are set,
  1839. // so none are checked for logging.
  1840. //
  1841. // Limitation: the potential use of single ResolveParameters to
  1842. // resolve multiple, different STUN server domains can skew the
  1843. // meaning of GetFirstAttemptWithAnswer.
  1844. if dialParams.ResolveParameters.PreferAlternateDNSServer {
  1845. logFields["inproxy_webrtc_dns_preferred"] = dialParams.ResolveParameters.AlternateDNSServer
  1846. }
  1847. if dialParams.ResolveParameters.ProtocolTransformName != "" {
  1848. logFields["inproxy_webrtc_dns_transform"] = dialParams.ResolveParameters.ProtocolTransformName
  1849. }
  1850. if dialParams.ResolveParameters.RandomQNameCasingSeed != nil {
  1851. logFields["inproxy_webrtc_dns_qname_random_casing"] = "1"
  1852. }
  1853. if dialParams.ResolveParameters.ResponseQNameMustMatch {
  1854. logFields["inproxy_webrtc_dns_qname_must_match"] = "1"
  1855. }
  1856. logFields["inproxy_webrtc_dns_qname_mismatches"] = strconv.Itoa(
  1857. dialParams.ResolveParameters.GetQNameMismatches())
  1858. logFields["inproxy_webrtc_dns_attempt"] = strconv.Itoa(
  1859. dialParams.ResolveParameters.GetFirstAttemptWithAnswer())
  1860. }
  1861. return logFields
  1862. }
  1863. // InproxyWebRTCDialParameters is a set of WebRTC obfuscation dial parameters.
  1864. // InproxyWebRTCDialParameters is compatible with DialParameters JSON
  1865. // marshaling. For client in-proxy tunnel dials, DialParameters will manage
  1866. // WebRTC dial parameter selection and replay.
  1867. type InproxyWebRTCDialParameters struct {
  1868. RootObfuscationSecret inproxy.ObfuscationSecret
  1869. UseMediaStreams bool
  1870. TrafficShapingParameters *inproxy.TrafficShapingParameters
  1871. DoDTLSRandomization bool
  1872. }
  1873. // MakeInproxyWebRTCDialParameters generates new InproxyWebRTCDialParameters.
  1874. func MakeInproxyWebRTCDialParameters(
  1875. p parameters.ParametersAccessor) (*InproxyWebRTCDialParameters, error) {
  1876. rootObfuscationSecret, err := inproxy.GenerateRootObfuscationSecret()
  1877. if err != nil {
  1878. return nil, errors.Trace(err)
  1879. }
  1880. useMediaStreams := p.WeightedCoinFlip(parameters.InproxyWebRTCMediaStreamsProbability)
  1881. var trafficSharingParams *inproxy.TrafficShapingParameters
  1882. if useMediaStreams {
  1883. if p.WeightedCoinFlip(parameters.InproxyWebRTCMediaStreamsTrafficShapingProbability) {
  1884. t := inproxy.TrafficShapingParameters(
  1885. p.InproxyTrafficShapingParameters(
  1886. parameters.InproxyWebRTCMediaStreamsTrafficShapingParameters))
  1887. trafficSharingParams = &t
  1888. }
  1889. } else {
  1890. if p.WeightedCoinFlip(parameters.InproxyWebRTCDataChannelTrafficShapingProbability) {
  1891. t := inproxy.TrafficShapingParameters(
  1892. p.InproxyTrafficShapingParameters(
  1893. parameters.InproxyWebRTCDataChannelTrafficShapingParameters))
  1894. trafficSharingParams = &t
  1895. }
  1896. }
  1897. doDTLSRandomization := p.WeightedCoinFlip(parameters.InproxyDTLSRandomizationProbability)
  1898. return &InproxyWebRTCDialParameters{
  1899. RootObfuscationSecret: rootObfuscationSecret,
  1900. UseMediaStreams: useMediaStreams,
  1901. TrafficShapingParameters: trafficSharingParams,
  1902. DoDTLSRandomization: doDTLSRandomization,
  1903. }, nil
  1904. }
  1905. // GetMetrics implements the common.MetricsSource interface.
  1906. func (dialParams *InproxyWebRTCDialParameters) GetMetrics() common.LogFields {
  1907. // There is no is_replay-type field added here; replay is handled at a
  1908. // higher level, and, for client in-proxy tunnel dials, is part of the
  1909. // main tunnel dial parameters.
  1910. // Currently, all WebRTC metrics are delivered via
  1911. // inproxy.ClientConn/WebRTCConn GetMetrics.
  1912. return common.LogFields{}
  1913. }
  1914. // InproxyNATStateManager manages the NAT-related network topology state for
  1915. // the current network, caching the discovered network NAT type and supported
  1916. // port mapping types, if any.
  1917. type InproxyNATStateManager struct {
  1918. config *Config
  1919. mutex sync.Mutex
  1920. networkID string
  1921. natType inproxy.NATType
  1922. portMappingTypes inproxy.PortMappingTypes
  1923. portMappingProbe *inproxy.PortMappingProbe
  1924. }
  1925. // NewInproxyNATStateManager creates a new InproxyNATStateManager.
  1926. func NewInproxyNATStateManager(config *Config) *InproxyNATStateManager {
  1927. s := &InproxyNATStateManager{
  1928. config: config,
  1929. natType: inproxy.NATTypeUnknown,
  1930. portMappingTypes: inproxy.PortMappingTypes{},
  1931. }
  1932. s.reset()
  1933. return s
  1934. }
  1935. // TacticsApplied implements the TacticsAppliedReceiver interface, and is
  1936. // called when tactics have changed, which triggers a cached NAT state reset
  1937. // in order to apply potentially changed parameters.
  1938. func (s *InproxyNATStateManager) TacticsApplied() error {
  1939. s.reset()
  1940. return nil
  1941. }
  1942. func (s *InproxyNATStateManager) reset() {
  1943. s.mutex.Lock()
  1944. defer s.mutex.Unlock()
  1945. networkID := s.config.GetNetworkID()
  1946. s.networkID = networkID
  1947. s.natType = inproxy.NATTypeUnknown
  1948. s.portMappingTypes = inproxy.PortMappingTypes{}
  1949. }
  1950. func (s *InproxyNATStateManager) getNATType(
  1951. networkID string) inproxy.NATType {
  1952. s.mutex.Lock()
  1953. defer s.mutex.Unlock()
  1954. if s.networkID != networkID {
  1955. return inproxy.NATTypeUnknown
  1956. }
  1957. return s.natType
  1958. }
  1959. func (s *InproxyNATStateManager) setNATType(
  1960. networkID string, natType inproxy.NATType) {
  1961. s.mutex.Lock()
  1962. defer s.mutex.Unlock()
  1963. if s.networkID != networkID {
  1964. return
  1965. }
  1966. s.natType = natType
  1967. }
  1968. func (s *InproxyNATStateManager) getPortMappingTypes(
  1969. networkID string) inproxy.PortMappingTypes {
  1970. s.mutex.Lock()
  1971. defer s.mutex.Unlock()
  1972. if s.networkID != networkID {
  1973. return inproxy.PortMappingTypes{}
  1974. }
  1975. return s.portMappingTypes
  1976. }
  1977. func (s *InproxyNATStateManager) setPortMappingTypes(
  1978. networkID string,
  1979. portMappingTypes inproxy.PortMappingTypes) {
  1980. s.mutex.Lock()
  1981. defer s.mutex.Unlock()
  1982. if s.networkID != networkID {
  1983. return
  1984. }
  1985. s.portMappingTypes = portMappingTypes
  1986. }
  1987. func (s *InproxyNATStateManager) getPortMappingProbe(
  1988. networkID string) *inproxy.PortMappingProbe {
  1989. s.mutex.Lock()
  1990. defer s.mutex.Unlock()
  1991. if s.networkID != networkID {
  1992. return nil
  1993. }
  1994. return s.portMappingProbe
  1995. }
  1996. func (s *InproxyNATStateManager) setPortMappingProbe(
  1997. networkID string,
  1998. portMappingProbe *inproxy.PortMappingProbe) {
  1999. s.mutex.Lock()
  2000. defer s.mutex.Unlock()
  2001. if s.networkID != networkID {
  2002. return
  2003. }
  2004. s.portMappingProbe = portMappingProbe
  2005. }
  2006. // inproxyUDPConn is based on NewUDPConn and includes the write timeout
  2007. // workaround from common.WriteTimeoutUDPConn.
  2008. //
  2009. // inproxyUDPConn expands the NewUDPConn IPv6Synthesizer to support many
  2010. // destination addresses, as the inproxyUDPConn will be used to send/receive
  2011. // packets between many remote destination addresses.
  2012. //
  2013. // inproxyUDPConn implements the net.PacketConn interface.
  2014. type inproxyUDPConn struct {
  2015. udpConn *net.UDPConn
  2016. ipv6Synthesizer IPv6Synthesizer
  2017. synthesizerMutex sync.Mutex
  2018. ipv4ToIPv6 map[netip.Addr]net.IP
  2019. ipv6ToIPv4 map[netip.Addr]net.IP
  2020. }
  2021. func newInproxyUDPConn(ctx context.Context, config *Config) (net.PacketConn, error) {
  2022. listen := &net.ListenConfig{
  2023. Control: func(_, _ string, c syscall.RawConn) error {
  2024. var controlErr error
  2025. err := c.Control(func(fd uintptr) {
  2026. socketFD := int(fd)
  2027. setAdditionalSocketOptions(socketFD)
  2028. // Use config.deviceBinder, with wired up logging, not
  2029. // config.DeviceBinder; other tunnel-core dials do this
  2030. // indirectly via psiphon.DialConfig.
  2031. if config.deviceBinder != nil {
  2032. _, err := config.deviceBinder.BindToDevice(socketFD)
  2033. if err != nil {
  2034. controlErr = errors.Tracef("BindToDevice failed: %s", err)
  2035. return
  2036. }
  2037. }
  2038. })
  2039. if controlErr != nil {
  2040. return errors.Trace(controlErr)
  2041. }
  2042. return errors.Trace(err)
  2043. },
  2044. }
  2045. // Create an "unconnected" UDP socket for use with WriteTo and listening
  2046. // on all interfaces. See the limitation comment in NewUDPConn regarding
  2047. // its equivilent mode.
  2048. packetConn, err := listen.ListenPacket(ctx, "udp", "")
  2049. if err != nil {
  2050. return nil, errors.Trace(err)
  2051. }
  2052. var ok bool
  2053. udpConn, ok := packetConn.(*net.UDPConn)
  2054. if !ok {
  2055. return nil, errors.Tracef("unexpected conn type: %T", packetConn)
  2056. }
  2057. conn := &inproxyUDPConn{
  2058. udpConn: udpConn,
  2059. ipv6Synthesizer: config.IPv6Synthesizer,
  2060. }
  2061. if conn.ipv6Synthesizer != nil {
  2062. conn.ipv4ToIPv6 = make(map[netip.Addr]net.IP)
  2063. conn.ipv6ToIPv4 = make(map[netip.Addr]net.IP)
  2064. }
  2065. return conn, nil
  2066. }
  2067. func (conn *inproxyUDPConn) ReadFrom(p []byte) (int, net.Addr, error) {
  2068. // net.UDPConn.ReadFrom currently allocates a &UDPAddr{} per call, and so
  2069. // the &net.UDPAddr{} allocations done in the following synthesizer code
  2070. // path are no more than the standard code path.
  2071. //
  2072. // TODO: avoid all address allocations in both ReadFrom and WriteTo by:
  2073. //
  2074. // - changing ipvXToIPvY to map[netip.AddrPort]*net.UDPAddr
  2075. // - using a similar lookup for the non-synthesizer code path
  2076. //
  2077. // Such a scheme would work only if the caller is guaranteed to not mutate
  2078. // the returned net.Addr.
  2079. if conn.ipv6Synthesizer == nil {
  2080. // Do not wrap any I/O err returned by UDPConn
  2081. return conn.udpConn.ReadFrom(p)
  2082. }
  2083. n, addrPort, err := conn.udpConn.ReadFromUDPAddrPort(p)
  2084. // Reverse any synthesized address before returning err.
  2085. // Reverse the IPv6 synthesizer, returning the original IPv4 address
  2086. // as expected by the caller, including pion/webrtc. This logic
  2087. // assumes that no synthesized IPv6 address will conflict with any
  2088. // real IPv6 address.
  2089. var IP net.IP
  2090. ipAddr := addrPort.Addr()
  2091. if ipAddr.Is6() {
  2092. conn.synthesizerMutex.Lock()
  2093. IP, _ = conn.ipv6ToIPv4[ipAddr]
  2094. conn.synthesizerMutex.Unlock()
  2095. }
  2096. if IP == nil {
  2097. IP = ipAddr.AsSlice()
  2098. }
  2099. // Do not wrap any I/O err returned by UDPConn
  2100. return n, &net.UDPAddr{IP: IP, Port: int(addrPort.Port())}, err
  2101. }
  2102. func (conn *inproxyUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
  2103. // See common.WriteTimeoutUDPConn.
  2104. err := conn.udpConn.SetWriteDeadline(
  2105. time.Now().Add(common.UDP_PACKET_WRITE_TIMEOUT))
  2106. if err != nil {
  2107. return 0, errors.Trace(err)
  2108. }
  2109. if conn.ipv6Synthesizer == nil {
  2110. // Do not wrap any I/O err returned by UDPConn
  2111. return conn.udpConn.WriteTo(b, addr)
  2112. }
  2113. // When configured, attempt to synthesize IPv6 addresses from an IPv4
  2114. // addresses for compatibility on DNS64/NAT64 networks.
  2115. //
  2116. // Store any synthesized addresses in a lookup table and reuse for
  2117. // subsequent writes to the same destination as well as reversing the
  2118. // conversion on reads.
  2119. //
  2120. // If synthesize fails, fall back to trying the original address.
  2121. // The netip.Addr type is used as the map key and the input address is
  2122. // assumed to be of the type *net.UDPAddr. This allows for more efficient
  2123. // lookup operations vs. a string key and parsing the input address via
  2124. // addr.String()/net.SplitHostPort().
  2125. udpAddr, ok := addr.(*net.UDPAddr)
  2126. if !ok {
  2127. return 0, errors.Tracef("unexpected addr type: %T", addr)
  2128. }
  2129. // Stack allocate to avoid an extra heap allocation per write.
  2130. var synthesizedAddr net.UDPAddr
  2131. if udpAddr.IP.To4() != nil {
  2132. ip4Addr, ok := netip.AddrFromSlice(udpAddr.IP)
  2133. if !ok {
  2134. return 0, errors.Tracef("invalid addr")
  2135. }
  2136. conn.synthesizerMutex.Lock()
  2137. synthesizedIP, ok := conn.ipv4ToIPv6[ip4Addr]
  2138. conn.synthesizerMutex.Unlock()
  2139. if ok {
  2140. synthesizedAddr = net.UDPAddr{IP: synthesizedIP, Port: udpAddr.Port}
  2141. } else {
  2142. synthesized := conn.ipv6Synthesizer.IPv6Synthesize(udpAddr.IP.String())
  2143. if synthesized != "" {
  2144. synthesizedIP := net.ParseIP(synthesized)
  2145. if synthesizedIP != nil {
  2146. conn.synthesizerMutex.Lock()
  2147. conn.ipv4ToIPv6[ip4Addr] = synthesizedIP
  2148. ipv6Addr, _ := netip.AddrFromSlice(synthesizedIP)
  2149. conn.ipv6ToIPv4[ipv6Addr] = udpAddr.IP
  2150. conn.synthesizerMutex.Unlock()
  2151. synthesizedAddr = net.UDPAddr{IP: synthesizedIP, Port: udpAddr.Port}
  2152. }
  2153. }
  2154. }
  2155. }
  2156. if synthesizedAddr.IP == nil {
  2157. // Do not wrap any I/O err returned by UDPConn
  2158. return conn.udpConn.WriteTo(b, addr)
  2159. }
  2160. return conn.udpConn.WriteTo(b, &synthesizedAddr)
  2161. }
  2162. func (conn *inproxyUDPConn) Close() error {
  2163. // Do not wrap any I/O err returned by UDPConn
  2164. return conn.udpConn.Close()
  2165. }
  2166. func (conn *inproxyUDPConn) LocalAddr() net.Addr {
  2167. // Do not wrap any I/O err returned by UDPConn
  2168. return conn.udpConn.LocalAddr()
  2169. }
  2170. func (conn *inproxyUDPConn) SetDeadline(t time.Time) error {
  2171. // Do not wrap any I/O err returned by UDPConn
  2172. return conn.udpConn.SetDeadline(t)
  2173. }
  2174. func (conn *inproxyUDPConn) SetReadDeadline(t time.Time) error {
  2175. // Do not wrap any I/O err returned by UDPConn
  2176. return conn.udpConn.SetReadDeadline(t)
  2177. }
  2178. func (conn *inproxyUDPConn) SetWriteDeadline(t time.Time) error {
  2179. // Do not wrap any I/O err returned by UDPConn
  2180. return conn.udpConn.SetWriteDeadline(t)
  2181. }
  2182. // getInproxyNetworkType converts a legacy string network type to an inproxy
  2183. // package type.
  2184. func getInproxyNetworkType(networkType string) inproxy.NetworkType {
  2185. // There is no VPN type conversion; clients and proxies will skip/fail
  2186. // in-proxy operations on non-Psiphon VPN networks.
  2187. switch networkType {
  2188. case "WIFI":
  2189. return inproxy.NetworkTypeWiFi
  2190. case "MOBILE":
  2191. return inproxy.NetworkTypeMobile
  2192. case "WIRED":
  2193. return inproxy.NetworkTypeWired
  2194. case "VPN":
  2195. return inproxy.NetworkTypeVPN
  2196. }
  2197. return inproxy.NetworkTypeUnknown
  2198. }