broker.go 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. std_errors "errors"
  23. "fmt"
  24. "net"
  25. "strconv"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "github.com/Psiphon-Labs/consistent"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  33. "github.com/cespare/xxhash"
  34. lrucache "github.com/cognusion/go-cache-lru"
  35. "github.com/fxamacker/cbor/v2"
  36. "golang.org/x/time/rate"
  37. )
  38. const (
  39. // BrokerMaxRequestBodySize is the maximum request size, that should be
  40. // enforced by the provided broker transport.
  41. BrokerMaxRequestBodySize = 65536
  42. // BrokerEndPointName is the standard name for referencing an endpoint
  43. // that services broker requests.
  44. BrokerEndPointName = "inproxy-broker"
  45. brokerProxyAnnounceTimeout = 2 * time.Minute
  46. brokerClientOfferTimeout = 10 * time.Second
  47. brokerPendingServerReportsTTL = 60 * time.Second
  48. brokerPendingServerReportsMaxSize = 100000
  49. brokerMetricName = "inproxy_broker"
  50. brokerRateLimiterReapHistoryFrequencySeconds = 300
  51. brokerRateLimiterMaxCacheEntries = 1000000
  52. )
  53. // LookupGeoIP is a callback for providing GeoIP lookup service.
  54. type LookupGeoIP func(IP string) common.GeoIPData
  55. // ExtendTransportTimeout is a callback that extends the timeout for a
  56. // server-side broker transport handler, facilitating request-specific
  57. // timeouts including long-polling for proxy announcements.
  58. type ExtendTransportTimeout func(timeout time.Duration)
  59. // GetTacticsPayload is a callback which returns the appropriate tactics
  60. // payload for the specified client/proxy GeoIP data and API parameters.
  61. type GetTacticsPayload func(
  62. geoIPData common.GeoIPData,
  63. apiParams common.APIParameters) ([]byte, string, error)
  64. // RelayDSLRequest is a callback that provides DSL request relaying. The
  65. // callback must always return a response payload, even in the case of an
  66. // error. See dsl.Relay.HandleRequest.
  67. type RelayDSLRequest func(
  68. ctx context.Context,
  69. extendTimeout ExtendTransportTimeout,
  70. clientIP string,
  71. clientGeoIPData common.GeoIPData,
  72. requestPayload []byte) ([]byte, error)
  73. // Broker is the in-proxy broker component, which matches clients and proxies
  74. // and provides WebRTC signaling functionalty.
  75. //
  76. // Both clients and proxies send requests to the broker to obtain matches and
  77. // exchange WebRTC SDPs. Broker does not implement a transport or obfuscation
  78. // layer; instead that is provided by the HandleSessionPacket caller. A
  79. // typical implementation would provide a domain fronted web server which
  80. // runs a Broker and calls Broker.HandleSessionPacket to handle web requests
  81. // encapsulating secure session packets.
  82. type Broker struct {
  83. config *BrokerConfig
  84. brokerID ID
  85. initiatorSessions *InitiatorSessions
  86. responderSessions *ResponderSessions
  87. matcher *Matcher
  88. pendingServerReports *lrucache.Cache
  89. proxyQualityState *ProxyQualityState
  90. knownServerInitiatorIDs sync.Map
  91. commonCompartmentsMutex sync.Mutex
  92. commonCompartments *consistent.Consistent
  93. proxyAnnounceTimeout atomic.Int64
  94. clientOfferTimeout atomic.Int64
  95. clientOfferPersonalTimeout atomic.Int64
  96. pendingServerReportsTTL atomic.Int64
  97. maxRequestTimeouts atomic.Value
  98. maxCompartmentIDs atomic.Int64
  99. enableProxyQualityMutex sync.Mutex
  100. enableProxyQuality atomic.Bool
  101. dslRequestRateLimiters *lrucache.Cache
  102. dslRequestRateLimitParams atomic.Value
  103. }
  104. // BrokerConfig specifies the configuration for a Broker.
  105. type BrokerConfig struct {
  106. // Logger is used to log events.
  107. Logger common.Logger
  108. // CommonCompartmentIDs is a list of common compartment IDs to apply to
  109. // proxies that announce without personal compartment ID. Common
  110. // compartment IDs are managed by Psiphon and distributed to clients via
  111. // tactics or embedded in OSLs. Clients must supply a valid compartment
  112. // ID to match with a proxy.
  113. //
  114. // A BrokerConfig must supply at least one compartment ID, or
  115. // SetCompartmentIDs must be called with at least one compartment ID
  116. // before calling Start.
  117. //
  118. // When only one, single common compartment ID is configured, it can serve
  119. // as an (obfuscation) secret that clients must obtain, via tactics, to
  120. // enable in-proxy participation.
  121. CommonCompartmentIDs []ID
  122. // AllowProxy is a callback which can indicate whether a proxy with the
  123. // given GeoIP data is allowed to match with common compartment ID
  124. // clients. Proxies with personal compartment IDs are always allowed.
  125. AllowProxy func(common.GeoIPData) bool
  126. // PrioritizeProxy is a callback which can indicate whether proxy
  127. // announcements from proxies with the specified in-proxy protocol
  128. // version, GeoIPData, and APIParameters should be prioritized in the
  129. // matcher queue. Priority proxy announcements match ahead of other proxy
  130. // announcements, regardless of announcement age/deadline. Priority
  131. // status takes precedence over preferred NAT matching. Prioritization
  132. // applies only to common compartment IDs and not personal pairing mode.
  133. PrioritizeProxy func(int, common.GeoIPData, common.APIParameters) bool
  134. // AllowClient is a callback which can indicate whether a client with the
  135. // given GeoIP data is allowed to match with common compartment ID
  136. // proxies. Clients are always allowed to match based on personal
  137. // compartment ID.
  138. AllowClient func(common.GeoIPData) bool
  139. // AllowDomainFrontedDestinations is a callback which can indicate whether
  140. // a client with the given GeoIP data is allowed to specify a proxied
  141. // destination for a domain fronted protocol. When false, only direct
  142. // address destinations are allowed.
  143. //
  144. // While tactics may may be set to instruct clients to use only direct
  145. // server tunnel protocols, with IP address destinations, this callback
  146. // adds server-side enforcement.
  147. AllowDomainFrontedDestinations func(common.GeoIPData) bool
  148. // AllowMatch is a callback which can indicate whether a proxy and client
  149. // pair, with the given, respective GeoIP data, is allowed to match
  150. // together. Pairs are always allowed to match based on personal
  151. // compartment ID.
  152. AllowMatch func(common.GeoIPData, common.GeoIPData) bool
  153. // LookupGeoIP provides GeoIP lookup service.
  154. LookupGeoIP LookupGeoIP
  155. // APIParameterValidator is a callback that validates base API metrics.
  156. APIParameterValidator common.APIParameterValidator
  157. // APIParameterValidator is a callback that formats base API metrics.
  158. APIParameterLogFieldFormatter common.APIParameterLogFieldFormatter
  159. // GetTacticsPayload provides a tactics lookup service.
  160. GetTacticsPayload GetTacticsPayload
  161. // IsValidServerEntryTag is a callback which checks if the specified
  162. // server entry tag is on the list of valid and active Psiphon server
  163. // entry tags.
  164. IsValidServerEntryTag func(serverEntryTag string) bool
  165. // IsLoadLimiting is a callback which checks if the broker process is in a
  166. // load limiting state, where consumed resources, including allocated
  167. // system memory and CPU load, exceed determined thresholds. When load
  168. // limiting is indicated, the broker will attempt to reduce load by
  169. // immediately rejecting either proxy announces or client offers,
  170. // depending on the state of the corresponding queues.
  171. IsLoadLimiting func() bool
  172. // RelayDSLRequest provides DSL request relay support.
  173. //
  174. // RelayDSLRequest must be set; return an error if no DSL relay is
  175. // configured.
  176. RelayDSLRequest RelayDSLRequest
  177. // PrivateKey is the broker's secure session long term private key.
  178. PrivateKey SessionPrivateKey
  179. // ObfuscationRootSecret broker's secure session long term obfuscation key.
  180. ObfuscationRootSecret ObfuscationSecret
  181. // ServerEntrySignaturePublicKey is the key used to verify Psiphon server
  182. // entry signatures.
  183. ServerEntrySignaturePublicKey string
  184. // These timeout parameters may be used to override defaults.
  185. ProxyAnnounceTimeout time.Duration
  186. ClientOfferTimeout time.Duration
  187. ClientOfferPersonalTimeout time.Duration
  188. PendingServerReportsTTL time.Duration
  189. // Announcement queue limit configuration.
  190. MatcherAnnouncementLimitEntryCount int
  191. MatcherAnnouncementRateLimitQuantity int
  192. MatcherAnnouncementRateLimitInterval time.Duration
  193. MatcherAnnouncementNonlimitedProxyIDs []ID
  194. // Offer queue limit configuration.
  195. MatcherOfferLimitEntryCount int
  196. MatcherOfferRateLimitQuantity int
  197. MatcherOfferRateLimitInterval time.Duration
  198. // DSL request relay rate limit configuration.
  199. DSLRequestRateLimitQuantity int
  200. DSLRequestRateLimitInterval time.Duration
  201. // MaxCompartmentIDs specifies the maximum number of compartment IDs that
  202. // can be included, per list, in one request. If 0, the value
  203. // MaxCompartmentIDs is used.
  204. MaxCompartmentIDs int
  205. }
  206. // BrokerLoggedEvent is an error type which indicates that the broker has
  207. // already logged an event recording the underlying error. This may be used
  208. // by the outer server layer to avoid redundant logs for HandleSessionPacket
  209. // errors.
  210. type BrokerLoggedEvent struct {
  211. err error
  212. }
  213. func NewBrokerLoggedEvent(err error) *BrokerLoggedEvent {
  214. return &BrokerLoggedEvent{err: err}
  215. }
  216. func (e *BrokerLoggedEvent) Error() string {
  217. return e.err.Error()
  218. }
  219. // NewBroker initializes a new Broker.
  220. func NewBroker(config *BrokerConfig) (*Broker, error) {
  221. // initiatorSessions are secure sessions initiated by the broker and used
  222. // to send BrokerServerReports to servers. The servers will be
  223. // configured to establish sessions only with brokers with specified
  224. // public keys.
  225. initiatorSessions := NewInitiatorSessions(config.PrivateKey)
  226. // responderSessions are secure sessions initiated by clients and proxies
  227. // and used to send requests to the broker. Clients and proxies are
  228. // configured to establish sessions only with specified broker public keys.
  229. responderSessions, err := NewResponderSessions(
  230. config.PrivateKey, config.ObfuscationRootSecret)
  231. if err != nil {
  232. return nil, errors.Trace(err)
  233. }
  234. // The broker ID is the broker's session public key in Curve25519 form.
  235. publicKey, err := config.PrivateKey.GetPublicKey()
  236. if err != nil {
  237. return nil, errors.Trace(err)
  238. }
  239. brokerID, err := publicKey.ToCurve25519()
  240. if err != nil {
  241. return nil, errors.Trace(err)
  242. }
  243. proxyQuality := NewProxyQuality()
  244. b := &Broker{
  245. config: config,
  246. brokerID: ID(brokerID),
  247. initiatorSessions: initiatorSessions,
  248. responderSessions: responderSessions,
  249. matcher: NewMatcher(&MatcherConfig{
  250. Logger: config.Logger,
  251. AnnouncementLimitEntryCount: config.MatcherAnnouncementLimitEntryCount,
  252. AnnouncementRateLimitQuantity: config.MatcherAnnouncementRateLimitQuantity,
  253. AnnouncementRateLimitInterval: config.MatcherAnnouncementRateLimitInterval,
  254. AnnouncementNonlimitedProxyIDs: config.MatcherAnnouncementNonlimitedProxyIDs,
  255. OfferLimitEntryCount: config.MatcherOfferLimitEntryCount,
  256. OfferRateLimitQuantity: config.MatcherOfferRateLimitQuantity,
  257. OfferRateLimitInterval: config.MatcherOfferRateLimitInterval,
  258. ProxyQualityState: proxyQuality,
  259. IsLoadLimiting: config.IsLoadLimiting,
  260. AllowMatch: config.AllowMatch,
  261. }),
  262. proxyQualityState: proxyQuality,
  263. dslRequestRateLimiters: lrucache.NewWithLRU(
  264. 0,
  265. time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
  266. brokerRateLimiterMaxCacheEntries),
  267. }
  268. b.pendingServerReports = lrucache.NewWithLRU(
  269. common.ValueOrDefault(config.PendingServerReportsTTL, brokerPendingServerReportsTTL),
  270. 1*time.Minute,
  271. brokerPendingServerReportsMaxSize)
  272. b.proxyAnnounceTimeout.Store(int64(config.ProxyAnnounceTimeout))
  273. b.clientOfferTimeout.Store(int64(config.ClientOfferTimeout))
  274. b.clientOfferPersonalTimeout.Store(int64(config.ClientOfferPersonalTimeout))
  275. b.pendingServerReportsTTL.Store(int64(config.PendingServerReportsTTL))
  276. b.maxCompartmentIDs.Store(
  277. int64(common.ValueOrDefault(config.MaxCompartmentIDs, MaxCompartmentIDs)))
  278. b.dslRequestRateLimitParams.Store(
  279. &brokerRateLimitParams{
  280. quantity: config.DSLRequestRateLimitQuantity,
  281. interval: config.DSLRequestRateLimitInterval,
  282. })
  283. if len(config.CommonCompartmentIDs) > 0 {
  284. err = b.initializeCommonCompartmentIDHashing(config.CommonCompartmentIDs)
  285. if err != nil {
  286. return nil, errors.Trace(err)
  287. }
  288. }
  289. return b, nil
  290. }
  291. func (b *Broker) Start() error {
  292. if !b.isCommonCompartmentIDHashingInitialized() {
  293. return errors.TraceNew("missing common compartment IDs")
  294. }
  295. return errors.Trace(b.matcher.Start())
  296. }
  297. func (b *Broker) Stop() {
  298. b.matcher.Stop()
  299. }
  300. // SetCommonCompartmentIDs sets a new list of common compartment IDs,
  301. // replacing the previous configuration.
  302. func (b *Broker) SetCommonCompartmentIDs(commonCompartmentIDs []ID) error {
  303. // TODO: initializeCommonCompartmentIDHashing is called regardless whether
  304. // commonCompartmentIDs changes the previous configuration. To avoid the
  305. // overhead of consistent hashing initialization in
  306. // initializeCommonCompartmentIDHashing, add a mechanism to first quickly
  307. // check for changes?
  308. return errors.Trace(b.initializeCommonCompartmentIDHashing(commonCompartmentIDs))
  309. }
  310. // SetTimeouts sets new timeout values, replacing the previous configuration.
  311. // New timeout values do not apply to currently active announcement or offer
  312. // requests.
  313. func (b *Broker) SetTimeouts(
  314. proxyAnnounceTimeout time.Duration,
  315. clientOfferTimeout time.Duration,
  316. clientOfferPersonalTimeout time.Duration,
  317. pendingServerReportsTTL time.Duration,
  318. maxRequestTimeouts map[string]time.Duration) {
  319. b.proxyAnnounceTimeout.Store(int64(proxyAnnounceTimeout))
  320. b.clientOfferTimeout.Store(int64(clientOfferTimeout))
  321. b.clientOfferPersonalTimeout.Store(int64(clientOfferPersonalTimeout))
  322. b.pendingServerReportsTTL.Store(int64(pendingServerReportsTTL))
  323. b.maxRequestTimeouts.Store(maxRequestTimeouts)
  324. }
  325. // SetLimits sets new queue limit values, replacing the previous
  326. // configuration. New limits are only partially applied to existing queue
  327. // states; see Matcher.SetLimits.
  328. func (b *Broker) SetLimits(
  329. matcherAnnouncementLimitEntryCount int,
  330. matcherAnnouncementRateLimitQuantity int,
  331. matcherAnnouncementRateLimitInterval time.Duration,
  332. matcherAnnouncementNonlimitedProxyIDs []ID,
  333. matcherOfferLimitEntryCount int,
  334. matcherOfferRateLimitQuantity int,
  335. matcherOfferRateLimitInterval time.Duration,
  336. matcherOfferMinimumDeadline time.Duration,
  337. maxCompartmentIDs int,
  338. dslRequestRateLimitQuantity int,
  339. dslRequestRateLimitInterval time.Duration) {
  340. b.matcher.SetLimits(
  341. matcherAnnouncementLimitEntryCount,
  342. matcherAnnouncementRateLimitQuantity,
  343. matcherAnnouncementRateLimitInterval,
  344. matcherAnnouncementNonlimitedProxyIDs,
  345. matcherOfferLimitEntryCount,
  346. matcherOfferRateLimitQuantity,
  347. matcherOfferRateLimitInterval,
  348. matcherOfferMinimumDeadline)
  349. b.maxCompartmentIDs.Store(
  350. int64(common.ValueOrDefault(maxCompartmentIDs, MaxCompartmentIDs)))
  351. b.dslRequestRateLimitParams.Store(
  352. &brokerRateLimitParams{
  353. quantity: dslRequestRateLimitQuantity,
  354. interval: dslRequestRateLimitInterval,
  355. })
  356. }
  357. func (b *Broker) SetProxyQualityParameters(
  358. enable bool,
  359. qualityTTL time.Duration,
  360. pendingFailedMatchDeadline time.Duration,
  361. failedMatchThreshold int) {
  362. // enableProxyQuality is an atomic for fast lookups in request handlers;
  363. // an additional mutex is used here to ensure the Swap/Flush combination
  364. // is also atomic.
  365. b.enableProxyQualityMutex.Lock()
  366. wasEnabled := b.enableProxyQuality.Swap(enable)
  367. if wasEnabled && !enable {
  368. // Flush quality state, since otherwise the quality TTL can retain
  369. // quality data which may be unexpectedly reactivated when enable is
  370. // toggled on again.
  371. b.proxyQualityState.Flush()
  372. }
  373. b.enableProxyQualityMutex.Unlock()
  374. b.proxyQualityState.SetParameters(
  375. qualityTTL,
  376. pendingFailedMatchDeadline,
  377. failedMatchThreshold)
  378. }
  379. // HandleSessionPacket handles a session packet from a client or proxy and
  380. // provides a response packet. The packet is part of a secure session and may
  381. // be a session handshake message, an expired session reset token, or a
  382. // session-wrapped request payload. Request payloads are routed to API
  383. // request endpoints.
  384. //
  385. // The caller is expected to provide a transport obfuscation layer, such as
  386. // domain fronted HTTPs. The session has an obfuscation layer that ensures
  387. // that packets are fully random, randomly padded, and cannot be replayed.
  388. // This makes session packets suitable to embed as plaintext in some
  389. // transports.
  390. //
  391. // The caller is responsible for rate limiting and enforcing timeouts and
  392. // maximum payload size checks.
  393. //
  394. // Secure sessions support multiplexing concurrent requests, as long as the
  395. // provided transport, for example HTTP/2, supports this as well.
  396. //
  397. // The input ctx should be canceled if the client/proxy disconnects from the
  398. // transport while HandleSessionPacket is running, since long-polling proxy
  399. // announcement requests will otherwise remain blocked until eventual
  400. // timeout; net/http does this.
  401. //
  402. // When HandleSessionPacket returns an error, the transport provider should
  403. // apply anti-probing mechanisms, as the client/proxy may be a prober or
  404. // scanner.
  405. func (b *Broker) HandleSessionPacket(
  406. ctx context.Context,
  407. extendTransportTimeout ExtendTransportTimeout,
  408. transportLogFields common.LogFields,
  409. brokerClientIP string,
  410. geoIPData common.GeoIPData,
  411. inPacket []byte) ([]byte, error) {
  412. // handleUnwrappedRequest handles requests after session unwrapping.
  413. // responderSessions.HandlePacket handles both session establishment and
  414. // request unwrapping, and invokes handleUnwrappedRequest once a session
  415. // is established and a valid request unwrapped.
  416. handleUnwrappedRequest := func(initiatorID ID, unwrappedRequestPayload []byte) ([]byte, error) {
  417. recordType, err := peekRecordPreambleType(unwrappedRequestPayload)
  418. if err != nil {
  419. return nil, errors.Trace(err)
  420. }
  421. var responsePayload []byte
  422. switch recordType {
  423. case recordTypeAPIProxyAnnounceRequest:
  424. responsePayload, err = b.handleProxyAnnounce(
  425. ctx,
  426. extendTransportTimeout,
  427. transportLogFields,
  428. brokerClientIP,
  429. geoIPData,
  430. initiatorID,
  431. unwrappedRequestPayload)
  432. if err != nil {
  433. return nil, errors.Trace(err)
  434. }
  435. case recordTypeAPIProxyAnswerRequest:
  436. responsePayload, err = b.handleProxyAnswer(
  437. ctx,
  438. extendTransportTimeout,
  439. transportLogFields,
  440. brokerClientIP,
  441. geoIPData,
  442. initiatorID,
  443. unwrappedRequestPayload)
  444. if err != nil {
  445. return nil, errors.Trace(err)
  446. }
  447. case recordTypeAPIClientOfferRequest:
  448. responsePayload, err = b.handleClientOffer(
  449. ctx,
  450. extendTransportTimeout,
  451. transportLogFields,
  452. brokerClientIP,
  453. geoIPData,
  454. initiatorID,
  455. unwrappedRequestPayload)
  456. if err != nil {
  457. return nil, errors.Trace(err)
  458. }
  459. case recordTypeAPIServerProxyQualityRequest:
  460. responsePayload, err = b.handleServerProxyQuality(
  461. ctx,
  462. extendTransportTimeout,
  463. transportLogFields,
  464. brokerClientIP,
  465. geoIPData,
  466. initiatorID,
  467. unwrappedRequestPayload)
  468. if err != nil {
  469. return nil, errors.Trace(err)
  470. }
  471. case recordTypeAPIClientRelayedPacketRequest:
  472. responsePayload, err = b.handleClientRelayedPacket(
  473. ctx,
  474. extendTransportTimeout,
  475. transportLogFields,
  476. geoIPData,
  477. initiatorID,
  478. unwrappedRequestPayload)
  479. if err != nil {
  480. return nil, errors.Trace(err)
  481. }
  482. case recordTypeAPIClientDSLRequest:
  483. responsePayload, err = b.handleClientDSL(
  484. ctx,
  485. extendTransportTimeout,
  486. transportLogFields,
  487. brokerClientIP,
  488. geoIPData,
  489. initiatorID,
  490. unwrappedRequestPayload)
  491. if err != nil {
  492. return nil, errors.Trace(err)
  493. }
  494. default:
  495. return nil, errors.Tracef("unexpected API record type %v", recordType)
  496. }
  497. return responsePayload, nil
  498. }
  499. // HandlePacket returns both a packet and an error in the expired session
  500. // reset token case. Log the error here, clear it, and return the
  501. // packet to be relayed back to the broker client.
  502. outPacket, err := b.responderSessions.HandlePacket(
  503. inPacket, handleUnwrappedRequest)
  504. if err != nil {
  505. if outPacket == nil {
  506. return nil, errors.Trace(err)
  507. }
  508. b.config.Logger.WithTraceFields(common.LogFields{"error": err}).Warning(
  509. "HandlePacket returned packet and error")
  510. }
  511. return outPacket, nil
  512. }
  513. // handleProxyAnnounce receives a proxy announcement, awaits a matching
  514. // client, and returns the client offer in the response. handleProxyAnnounce
  515. // has a long timeout so this request can idle until a matching client
  516. // arrives.
  517. func (b *Broker) handleProxyAnnounce(
  518. ctx context.Context,
  519. extendTransportTimeout ExtendTransportTimeout,
  520. transportLogFields common.LogFields,
  521. proxyIP string,
  522. geoIPData common.GeoIPData,
  523. initiatorID ID,
  524. requestPayload []byte) (retResponse []byte, retErr error) {
  525. startTime := time.Now()
  526. var logFields common.LogFields
  527. var isPriority bool
  528. var newTacticsTag string
  529. var clientOffer *MatchOffer
  530. var matchMetrics *MatchMetrics
  531. var timedOut bool
  532. var limitedErr error
  533. // As a future enhancement, a broker could initiate its own test
  534. // connection to the proxy to verify its effectiveness, including
  535. // simulating a symmetric NAT client.
  536. // Each announcement represents availability for a single client matching.
  537. // Proxies with multiple client availability will send multiple requests.
  538. //
  539. // The announcement request and response could be extended to allow the
  540. // proxy to specify availability for multiple clients in the request, and
  541. // multiple client offers returned in the response.
  542. //
  543. // If, as we expect, proxies run on home ISPs have limited upstream
  544. // bandwidth, they will support only a couple of concurrent clients, and
  545. // the simple single-client-announcment model may be sufficient. Also, if
  546. // the transport is HTTP/2, multiple requests can be multiplexed over a
  547. // single connection (and session) in any case.
  548. // The proxy ID is an implicit parameter: it's the proxy's session public
  549. // key. As part of the session handshake, the proxy has proven that it
  550. // has the corresponding private key. Proxy IDs are logged to attribute
  551. // traffic to a specific proxy.
  552. proxyID := initiatorID
  553. // Generate a connection ID. This ID is used to associate proxy
  554. // announcments, client offers, and proxy answers, as well as associating
  555. // Psiphon tunnels with in-proxy pairings.
  556. connectionID, err := MakeID()
  557. if err != nil {
  558. return nil, errors.Trace(err)
  559. }
  560. // Always log the outcome.
  561. defer func() {
  562. if logFields == nil {
  563. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  564. }
  565. logFields["broker_event"] = "proxy-announce"
  566. logFields["broker_id"] = b.brokerID
  567. logFields["proxy_id"] = proxyID
  568. logFields["is_priority"] = isPriority
  569. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  570. logFields["connection_id"] = connectionID
  571. if newTacticsTag != "" {
  572. logFields["new_tactics_tag"] = newTacticsTag
  573. }
  574. if clientOffer != nil {
  575. // Log the target Psiphon server ID (diagnostic ID). The presence
  576. // of this field indicates that a match was made.
  577. logFields["destination_server_id"] = clientOffer.DestinationServerID
  578. logFields["use_media_streams"] = clientOffer.UseMediaStreams
  579. }
  580. if timedOut {
  581. logFields["timed_out"] = true
  582. }
  583. if retErr != nil {
  584. logFields["error"] = retErr.Error()
  585. } else if limitedErr != nil {
  586. logFields["error"] = limitedErr.Error()
  587. }
  588. logFields.Add(transportLogFields)
  589. logFields.Add(matchMetrics.GetMetrics())
  590. b.config.Logger.LogMetric(brokerMetricName, logFields)
  591. if retErr != nil {
  592. retErr = NewBrokerLoggedEvent(retErr)
  593. }
  594. }()
  595. announceRequest, err := UnmarshalProxyAnnounceRequest(requestPayload)
  596. if err != nil {
  597. return nil, errors.Trace(err)
  598. }
  599. var apiParams common.APIParameters
  600. apiParams, logFields, err = announceRequest.ValidateAndGetParametersAndLogFields(
  601. int(b.maxCompartmentIDs.Load()),
  602. b.config.APIParameterValidator,
  603. b.config.APIParameterLogFieldFormatter,
  604. geoIPData)
  605. if err != nil {
  606. return nil, errors.Trace(err)
  607. }
  608. hasPersonalCompartmentIDs := len(announceRequest.PersonalCompartmentIDs) > 0
  609. // Return MustUpgrade when the proxy's protocol version is less than the
  610. // minimum required.
  611. if announceRequest.Metrics.ProtocolVersion < minimumProxyProtocolVersion {
  612. responsePayload, err := MarshalProxyAnnounceResponse(
  613. &ProxyAnnounceResponse{
  614. NoMatch: true,
  615. MustUpgrade: true,
  616. })
  617. if err != nil {
  618. return nil, errors.Trace(err)
  619. }
  620. return responsePayload, nil
  621. }
  622. // Fetch new tactics for the proxy, if required, using the tactics tag
  623. // that should be included with the API parameters. A tacticsPayload may
  624. // be returned when there are no new tactics, and this is relayed back to
  625. // the proxy, after matching, so that it can extend the TTL for its
  626. // existing, cached tactics. In the case where tactics have changed,
  627. // don't enqueue the proxy announcement and return no-match so that the
  628. // proxy can store and apply the new tactics before announcing again.
  629. //
  630. // For PreCheckTactics requests, an immediate no-match response is
  631. // returned even when there are no new tactics.
  632. var tacticsPayload []byte
  633. if announceRequest.CheckTactics || announceRequest.PreCheckTactics {
  634. tacticsPayload, newTacticsTag, err =
  635. b.config.GetTacticsPayload(geoIPData, apiParams)
  636. if err != nil {
  637. return nil, errors.Trace(err)
  638. }
  639. if (tacticsPayload != nil && newTacticsTag != "") ||
  640. announceRequest.PreCheckTactics {
  641. responsePayload, err := MarshalProxyAnnounceResponse(
  642. &ProxyAnnounceResponse{
  643. TacticsPayload: tacticsPayload,
  644. NoMatch: true,
  645. })
  646. if err != nil {
  647. return nil, errors.Trace(err)
  648. }
  649. return responsePayload, nil
  650. }
  651. }
  652. // AllowProxy may be used to disallow proxies from certain geolocations,
  653. // such as censored locations, from announcing. Proxies with personal
  654. // compartment IDs are always allowed, as they will be used only by
  655. // clients specifically configured to use them.
  656. //
  657. // AllowProxy is not enforced until after CheckTactics/PreCheckTactics
  658. // cases, which may return an immediate response. This allows proxies to
  659. // download new tactics that may set AllowProxy, which well-behaved
  660. // proxies can enforce locally as well.
  661. if !hasPersonalCompartmentIDs &&
  662. !b.config.AllowProxy(geoIPData) {
  663. // Send a "limited" response so the proxy backs off.
  664. limitedErr = errors.TraceNew("proxy disallowed")
  665. responsePayload, err := MarshalProxyAnnounceResponse(
  666. &ProxyAnnounceResponse{Limited: true})
  667. if err != nil {
  668. return nil, errors.Trace(err)
  669. }
  670. return responsePayload, nil
  671. }
  672. // Assign this proxy to a common compartment ID, unless it has specified a
  673. // dedicated, personal compartment ID. Assignment uses consistent hashing
  674. // keyed with the proxy ID, in an effort to keep proxies consistently
  675. // assigned to the same compartment.
  676. var commonCompartmentIDs []ID
  677. if !hasPersonalCompartmentIDs {
  678. compartmentID, err := b.selectCommonCompartmentID(proxyID)
  679. if err != nil {
  680. return nil, errors.Trace(err)
  681. }
  682. commonCompartmentIDs = []ID{compartmentID}
  683. }
  684. // Determine whether to enqueue the proxy announcement in the priority
  685. // queue. To be prioritized, a proxy, identified by its ID and ASN, must
  686. // have a recent quality tunnel recorded in the quality state. In
  687. // addition, when the PrioritizeProxy callback is set, invoke this
  688. // additional condition, which can filter by proxy geolocation and other
  689. // properties.
  690. //
  691. // There is no prioritization for personal pairing announcements.
  692. // Potential future enhancements:
  693. //
  694. // - For a proxy with unknown quality (neither reported quality tunnels,
  695. // nor known failed matches), prioritize with some low probability to
  696. // give unknown proxies a chance to qualify? This could be limited, for
  697. // example, to proxies in the same ASN as other quality proxies. To
  698. // implement this, ProxyQualityState would need to record proxy IDs
  699. // with failed matches; and proxy ASNs would need to be input to
  700. // ProxyQualityState.
  701. //
  702. // - Consider using the Psiphon server region, as given in the signed
  703. // server entry, as part of the prioritization logic.
  704. if !hasPersonalCompartmentIDs && b.enableProxyQuality.Load() {
  705. // Here, no specific client ASN is specified for HasQuality. As long
  706. // as a proxy has a quality tunnel for any client ASN, it is
  707. // prioritized. In the matching process, an attempt is made to match
  708. // using HasQuality using the client ASN. See Matcher.matchOffer.
  709. isPriority = b.proxyQualityState.HasQuality(proxyID, geoIPData.ASN, "")
  710. }
  711. if isPriority && b.config.PrioritizeProxy != nil {
  712. // Note that, in the psiphon/server package, inproxyBrokerPrioritizeProxy
  713. // is always wired up, and, as currently implemented, the default value for
  714. // the InproxyBrokerMatcherPrioritizeProxiesFilter tactics parameter
  715. // results in PrioritizeProxy always returning false. Some filter,
  716. // even just a wildcard match, must be configured in order to prioritize.
  717. // Limitation: Of the two return values from
  718. // ValidateAndGetParametersAndLogFields, apiParams and logFields,
  719. // only logFields contains fields such as max_clients
  720. // and *_bytes_per_second, and so these cannot be part of any
  721. // filtering performed by the PrioritizeProxy callback.
  722. //
  723. // TODO: include the additional fields in logFields. Since the
  724. // logFields return value is the output of server.getRequestLogFields
  725. // processing, it's not safe to use it directly. In addition,
  726. // filtering by fields such as max_clients and *_bytes_per_second
  727. // calls for range filtering, which is not yet supported in the
  728. // psiphon/server.MeekServer PrioritizeProxy provider.
  729. isPriority = b.config.PrioritizeProxy(
  730. int(announceRequest.Metrics.ProtocolVersion), geoIPData, apiParams)
  731. }
  732. // Await client offer.
  733. timeout := common.ValueOrDefault(
  734. time.Duration(b.proxyAnnounceTimeout.Load()),
  735. brokerProxyAnnounceTimeout)
  736. // Adjust the timeout to respect any shorter maximum request timeouts for
  737. // the fronting provider.
  738. timeout = b.adjustRequestTimeout(logFields, timeout)
  739. announceCtx, cancelFunc := context.WithTimeout(ctx, timeout)
  740. defer cancelFunc()
  741. extendTransportTimeout(timeout)
  742. // Note that matcher.Announce assumes a monotonically increasing
  743. // announceCtx.Deadline input for each successive call.
  744. clientOffer, matchMetrics, err = b.matcher.Announce(
  745. announceCtx,
  746. proxyIP,
  747. &MatchAnnouncement{
  748. Properties: MatchProperties{
  749. IsPriority: isPriority,
  750. ProtocolVersion: announceRequest.Metrics.ProtocolVersion,
  751. CommonCompartmentIDs: commonCompartmentIDs,
  752. PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
  753. GeoIPData: geoIPData,
  754. NetworkType: GetNetworkType(announceRequest.Metrics.BaseAPIParameters),
  755. NATType: announceRequest.Metrics.NATType,
  756. PortMappingTypes: announceRequest.Metrics.PortMappingTypes,
  757. },
  758. ProxyID: initiatorID,
  759. ProxyMetrics: announceRequest.Metrics,
  760. ConnectionID: connectionID,
  761. })
  762. if err != nil {
  763. var limitError *MatcherLimitError
  764. limited := std_errors.As(err, &limitError)
  765. timeout := announceCtx.Err() == context.DeadlineExceeded
  766. if !limited && !timeout {
  767. return nil, errors.Trace(err)
  768. }
  769. // A no-match response is sent in the case of a timeout awaiting a
  770. // match. The faster-failing rate or entry limiting case also results
  771. // in a response, rather than an error return from handleProxyAnnounce,
  772. // so that the proxy doesn't receive a 404 and flag its BrokerClient as
  773. // having failed.
  774. //
  775. // When the timeout and limit case coincide, limit takes precedence in
  776. // the response.
  777. if timeout && !limited {
  778. // Note: the respective proxy and broker timeouts,
  779. // InproxyBrokerProxyAnnounceTimeout and
  780. // InproxyProxyAnnounceRequestTimeout in tactics, should be
  781. // configured so that the broker will timeout first and have an
  782. // opportunity to send this response before the proxy times out.
  783. timedOut = true
  784. } else {
  785. // Record the specific limit error in the proxy-announce broker event.
  786. limitedErr = err
  787. }
  788. responsePayload, err := MarshalProxyAnnounceResponse(
  789. &ProxyAnnounceResponse{
  790. TacticsPayload: tacticsPayload,
  791. Limited: limited,
  792. NoMatch: timeout && !limited,
  793. })
  794. if err != nil {
  795. return nil, errors.Trace(err)
  796. }
  797. return responsePayload, nil
  798. }
  799. // Select the protocol version. The matcher has already checked
  800. // negotiateProtocolVersion, so failure is not expected.
  801. negotiatedProtocolVersion, ok := negotiateProtocolVersion(
  802. announceRequest.Metrics.ProtocolVersion,
  803. clientOffer.Properties.ProtocolVersion,
  804. clientOffer.UseMediaStreams)
  805. if !ok {
  806. return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
  807. }
  808. // Respond with the client offer. The proxy will follow up with an answer
  809. // request, which is relayed to the client, and then the WebRTC dial begins.
  810. // Limitation: as part of the client's tunnel establishment horse race, a
  811. // client may abort an in-proxy dial at any point. If the overall dial is
  812. // past the SDP exchange and aborted during the WebRTC connection
  813. // establishment, the client may leave the proxy's Proxy.proxyOneClient
  814. // dangling until timeout. Consider adding a signal from the client to
  815. // the proxy, relayed by the broker, that a dial is aborted.
  816. responsePayload, err := MarshalProxyAnnounceResponse(
  817. &ProxyAnnounceResponse{
  818. TacticsPayload: tacticsPayload,
  819. ConnectionID: connectionID,
  820. SelectedProtocolVersion: negotiatedProtocolVersion,
  821. ClientOfferSDP: clientOffer.ClientOfferSDP,
  822. ClientRootObfuscationSecret: clientOffer.ClientRootObfuscationSecret,
  823. DoDTLSRandomization: clientOffer.DoDTLSRandomization,
  824. UseMediaStreams: clientOffer.UseMediaStreams,
  825. TrafficShapingParameters: clientOffer.TrafficShapingParameters,
  826. NetworkProtocol: clientOffer.NetworkProtocol,
  827. DestinationAddress: clientOffer.DestinationAddress,
  828. ClientRegion: clientOffer.Properties.GeoIPData.Country,
  829. })
  830. if err != nil {
  831. return nil, errors.Trace(err)
  832. }
  833. // Set the "failed match" trigger, which will progress towards clearing
  834. // the quality state for this proxyID unless quality tunnels are reported
  835. // soon enough after matches. This includes failure, by the proxy, to
  836. // return an proxy answer, as well as any tunnel failures after that.
  837. //
  838. // Failures are expected even for good quality proxies, due to cases such
  839. // as the in-proxy protocol losing the client tunnel establishment horse
  840. // race. There is a threshold number of failed matches that must be
  841. // reached before a quality state is cleared.
  842. b.proxyQualityState.Matched(proxyID, geoIPData.ASN)
  843. return responsePayload, nil
  844. }
  845. // handleClientOffer receives a client offer, awaits a matching client, and
  846. // returns the proxy answer. handleClientOffer has a shorter timeout than
  847. // handleProxyAnnounce since the client has supplied an SDP with STUN hole
  848. // punches which will expire; and, in general, the client is trying to
  849. // connect immediately and is also trying other candidates.
  850. func (b *Broker) handleClientOffer(
  851. ctx context.Context,
  852. extendTransportTimeout ExtendTransportTimeout,
  853. transportLogFields common.LogFields,
  854. clientIP string,
  855. geoIPData common.GeoIPData,
  856. initiatorID ID,
  857. requestPayload []byte) (retResponse []byte, retErr error) {
  858. // As a future enhancement, consider having proxies send offer SDPs with
  859. // announcements and clients long poll to await a match and then provide
  860. // an answer. This order of operations would make sense if client demand
  861. // is high and proxy supply is lower.
  862. //
  863. // Also see comment in Proxy.proxyOneClient for other alternative
  864. // approaches.
  865. // The client's session public key is ephemeral and is not logged.
  866. startTime := time.Now()
  867. var logFields common.LogFields
  868. var serverParams *serverParams
  869. var clientMatchOffer *MatchOffer
  870. var proxyMatchAnnouncement *MatchAnnouncement
  871. var proxyAnswer *MatchAnswer
  872. var matchMetrics *MatchMetrics
  873. var timedOut bool
  874. var limitedErr error
  875. // Always log the outcome.
  876. defer func() {
  877. if logFields == nil {
  878. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  879. }
  880. logFields["broker_event"] = "client-offer"
  881. logFields["broker_id"] = b.brokerID
  882. if serverParams != nil {
  883. logFields["destination_server_id"] = serverParams.serverID
  884. }
  885. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  886. if proxyAnswer != nil {
  887. // The presence of these fields indicate that a match was made,
  888. // the proxy delivered an answer, and the client was still
  889. // waiting for it.
  890. logFields["connection_id"] = proxyAnswer.ConnectionID
  891. logFields["client_nat_type"] = clientMatchOffer.Properties.NATType
  892. logFields["client_port_mapping_types"] = clientMatchOffer.Properties.PortMappingTypes
  893. logFields["proxy_nat_type"] = proxyMatchAnnouncement.Properties.NATType
  894. logFields["proxy_port_mapping_types"] = proxyMatchAnnouncement.Properties.PortMappingTypes
  895. logFields["preferred_nat_match"] =
  896. clientMatchOffer.Properties.IsPreferredNATMatch(&proxyMatchAnnouncement.Properties)
  897. // TODO: also log proxy ice_candidate_types and has_IPv4/6; for the
  898. // client, these values are added by ValidateAndGetLogFields.
  899. }
  900. if timedOut {
  901. logFields["timed_out"] = true
  902. }
  903. if retErr != nil {
  904. logFields["error"] = retErr.Error()
  905. } else if limitedErr != nil {
  906. logFields["error"] = limitedErr.Error()
  907. }
  908. logFields.Add(transportLogFields)
  909. logFields.Add(matchMetrics.GetMetrics())
  910. b.config.Logger.LogMetric(brokerMetricName, logFields)
  911. if retErr != nil {
  912. retErr = NewBrokerLoggedEvent(retErr)
  913. }
  914. }()
  915. offerRequest, err := UnmarshalClientOfferRequest(requestPayload)
  916. if err != nil {
  917. return nil, errors.Trace(err)
  918. }
  919. // The filtered SDP is the request SDP with any invalid (bogon, unexpected
  920. // GeoIP) ICE candidates filtered out. In some cases, clients cannot
  921. // avoid submitting invalid candidates (see comment in
  922. // processSDPAddresses), so all invalid candidates are removed and the
  923. // remaining SDP is used. Filtered candidate information is logged in
  924. // logFields.
  925. //
  926. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  927. // permitted in exchanged SDPs and not filtered out.
  928. var filteredSDP []byte
  929. filteredSDP, logFields, err = offerRequest.ValidateAndGetLogFields(
  930. int(b.maxCompartmentIDs.Load()),
  931. b.config.LookupGeoIP,
  932. b.config.APIParameterValidator,
  933. b.config.APIParameterLogFieldFormatter,
  934. geoIPData)
  935. if err != nil {
  936. return nil, errors.Trace(err)
  937. }
  938. hasPersonalCompartmentIDs := len(offerRequest.PersonalCompartmentIDs) > 0
  939. offerSDP := offerRequest.ClientOfferSDP
  940. offerSDP.SDP = string(filteredSDP)
  941. // AllowClient may be used to disallow clients from certain geolocations
  942. // from offering. Clients are always allowed to match proxies with shared
  943. // personal compartment IDs.
  944. if !hasPersonalCompartmentIDs &&
  945. !b.config.AllowClient(geoIPData) {
  946. // Send a "limited" response so the client retains its broker client
  947. // and doesn't retry the offer.
  948. limitedErr = errors.TraceNew("client disallowed")
  949. responsePayload, err := MarshalClientOfferResponse(
  950. &ClientOfferResponse{Limited: true})
  951. if err != nil {
  952. return nil, errors.Trace(err)
  953. }
  954. return responsePayload, nil
  955. }
  956. // Validate that the proxy destination specified by the client is a valid
  957. // dial address for a signed Psiphon server entry. This ensures a client
  958. // can't misuse a proxy to connect to arbitrary destinations.
  959. serverParams, err = b.validateDestination(
  960. geoIPData,
  961. offerRequest.PackedDestinationServerEntry,
  962. offerRequest.NetworkProtocol,
  963. offerRequest.DestinationAddress)
  964. if err != nil {
  965. // Record the specific error in the client-offer broker event.
  966. limitedErr = err
  967. // Return a response. This avoids returning a broker-client-resetting
  968. // 404 in cases including "invalid server entry tag", where a
  969. // legitimate client submits an unpruned, decommissioned server entry.
  970. responsePayload, err := MarshalClientOfferResponse(
  971. &ClientOfferResponse{Limited: true})
  972. if err != nil {
  973. return nil, errors.Trace(err)
  974. }
  975. return responsePayload, nil
  976. }
  977. // Return MustUpgrade when the client's protocol version is less than the
  978. // minimum required.
  979. if offerRequest.Metrics.ProtocolVersion < minimumClientProtocolVersion {
  980. responsePayload, err := MarshalClientOfferResponse(
  981. &ClientOfferResponse{
  982. NoMatch: true,
  983. MustUpgrade: true,
  984. })
  985. if err != nil {
  986. return nil, errors.Trace(err)
  987. }
  988. return responsePayload, nil
  989. }
  990. // Enqueue the client offer and await a proxy matching and subsequent
  991. // proxy answer.
  992. // The Client Offer timeout may be configured with a shorter value in
  993. // personal pairing mode, to facilitate a faster no-match result and
  994. // resulting broker rotation.
  995. var timeout time.Duration
  996. if hasPersonalCompartmentIDs {
  997. timeout = time.Duration(b.clientOfferPersonalTimeout.Load())
  998. } else {
  999. timeout = time.Duration(b.clientOfferTimeout.Load())
  1000. }
  1001. timeout = common.ValueOrDefault(timeout, brokerClientOfferTimeout)
  1002. // Adjust the timeout to respect any shorter maximum request timeouts for
  1003. // the fronting provider.
  1004. timeout = b.adjustRequestTimeout(logFields, timeout)
  1005. offerCtx, cancelFunc := context.WithTimeout(ctx, timeout)
  1006. defer cancelFunc()
  1007. extendTransportTimeout(timeout)
  1008. clientMatchOffer = &MatchOffer{
  1009. Properties: MatchProperties{
  1010. ProtocolVersion: offerRequest.Metrics.ProtocolVersion,
  1011. CommonCompartmentIDs: offerRequest.CommonCompartmentIDs,
  1012. PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
  1013. GeoIPData: geoIPData,
  1014. NetworkType: GetNetworkType(offerRequest.Metrics.BaseAPIParameters),
  1015. NATType: offerRequest.Metrics.NATType,
  1016. PortMappingTypes: offerRequest.Metrics.PortMappingTypes,
  1017. },
  1018. ClientOfferSDP: offerSDP,
  1019. ClientRootObfuscationSecret: offerRequest.ClientRootObfuscationSecret,
  1020. DoDTLSRandomization: offerRequest.DoDTLSRandomization,
  1021. UseMediaStreams: offerRequest.UseMediaStreams,
  1022. TrafficShapingParameters: offerRequest.TrafficShapingParameters,
  1023. NetworkProtocol: offerRequest.NetworkProtocol,
  1024. DestinationAddress: offerRequest.DestinationAddress,
  1025. DestinationServerID: serverParams.serverID,
  1026. }
  1027. proxyAnswer, proxyMatchAnnouncement, matchMetrics, err = b.matcher.Offer(
  1028. offerCtx,
  1029. clientIP,
  1030. clientMatchOffer)
  1031. if err != nil {
  1032. var limitError *MatcherLimitError
  1033. limited := std_errors.As(err, &limitError)
  1034. timeout := offerCtx.Err() == context.DeadlineExceeded ||
  1035. std_errors.Is(err, errOfferDropped)
  1036. // A no-match response is sent in the case of a timeout awaiting a
  1037. // match. The faster-failing rate or entry limiting case also results
  1038. // in a response, rather than an error return from handleClientOffer,
  1039. // so that the client doesn't receive a 404 and flag its BrokerClient
  1040. // as having failed.
  1041. //
  1042. // When the timeout and limit cases coincide, limit takes precedence in
  1043. // the response.
  1044. if timeout {
  1045. // Record the time out outcome in the client-offer broker event.
  1046. //
  1047. // Note: the respective client and broker timeouts,
  1048. // InproxyBrokerClientOfferTimeout and
  1049. // InproxyClientOfferRequestTimeout in tactics, should be configured
  1050. // so that the broker will timeout first and have an opportunity to
  1051. // send this response before the client times out.
  1052. //
  1053. // In the errOfferDropped case, the matcher dropped the offer due
  1054. // to age. While this is distinct from a timeout after a
  1055. // completed match, the same timed_out log field is set. The
  1056. // cases can be distinguished based on elapsed_time, as the
  1057. // dropped cases will have an elapsed_time less than
  1058. // InproxyBrokerClientOfferTimeout.
  1059. timedOut = true
  1060. }
  1061. if limited {
  1062. // Record the specific limit error in the client-offer broker event.
  1063. limitedErr = err
  1064. }
  1065. if !limited && !timeout {
  1066. // If matcher.Offer failed for some other reason, default to
  1067. // returning Limited in the response. As currently implemented,
  1068. // when clients receive the Limited flag, they will fail the
  1069. // in-proxy dial without retrying, but retain their broker client.
  1070. //
  1071. // While the matcher.Offer failure scenarios may include an
  1072. // internal error, they also include the "no answer" case where a
  1073. // proxy fails to produce an answer. For the "no answer" case,
  1074. // Limited is preferred over returning NoMatch, which can trigger
  1075. // a broker client cycle.
  1076. //
  1077. // TODO: add a new flag to signal to the client that it may retry
  1078. // in the "no answer" case.
  1079. limited = true
  1080. limitedErr = err
  1081. }
  1082. responsePayload, err := MarshalClientOfferResponse(
  1083. &ClientOfferResponse{
  1084. Limited: limited,
  1085. NoMatch: timeout && !limited,
  1086. })
  1087. if err != nil {
  1088. return nil, errors.Trace(err)
  1089. }
  1090. return responsePayload, nil
  1091. }
  1092. // Log the type of compartment matching that occurred. As
  1093. // PersonalCompartmentIDs are user-generated and shared, actual matching
  1094. // values are not logged as they may link users.
  1095. // TODO: log matching common compartment IDs?
  1096. matchedCommonCompartments := HaveCommonIDs(
  1097. proxyMatchAnnouncement.Properties.CommonCompartmentIDs,
  1098. clientMatchOffer.Properties.CommonCompartmentIDs)
  1099. matchedPersonalCompartments := HaveCommonIDs(
  1100. proxyMatchAnnouncement.Properties.PersonalCompartmentIDs,
  1101. clientMatchOffer.Properties.PersonalCompartmentIDs)
  1102. // Initiate a BrokerServerReport, which sends important information
  1103. // about the connection, including the original client IP, plus other
  1104. // values to be logged with server_tunne, to the server. The report is
  1105. // sent through a secure session established between the broker and the
  1106. // server, relayed by the client.
  1107. //
  1108. // The first relay message will be embedded in the Psiphon handshake. The
  1109. // broker may already have an established session with the server. In
  1110. // this case, only only that initial message is required. The
  1111. // BrokerServerReport is a one-way message, which avoids extra untunneled
  1112. // client/broker traffic.
  1113. //
  1114. // Limitations, due to the one-way message:
  1115. // - the broker can't actively clean up pendingServerReports as
  1116. // tunnels are established and must rely on cache expiry.
  1117. // - the broker doesn't learn that the server accepted the report, and
  1118. // so cannot log a final connection status or signal the proxy to
  1119. // disconnect the client in any misuse cases.
  1120. //
  1121. // As a future enhancement, consider adding a _tunneled_ client relay
  1122. // of a server response acknowledging the broker report.
  1123. relayPacket, err := b.initiateRelayedServerReport(
  1124. serverParams,
  1125. proxyAnswer.ConnectionID,
  1126. &BrokerServerReport{
  1127. ProxyID: proxyAnswer.ProxyID,
  1128. ConnectionID: proxyAnswer.ConnectionID,
  1129. MatchedCommonCompartments: matchedCommonCompartments,
  1130. MatchedPersonalCompartments: matchedPersonalCompartments,
  1131. ClientNATType: clientMatchOffer.Properties.NATType,
  1132. ClientPortMappingTypes: clientMatchOffer.Properties.PortMappingTypes,
  1133. ClientIP: clientIP,
  1134. ProxyIP: proxyAnswer.ProxyIP,
  1135. // ProxyMetrics includes proxy NAT and port mapping types.
  1136. ProxyMetrics: proxyMatchAnnouncement.ProxyMetrics,
  1137. ProxyIsPriority: proxyMatchAnnouncement.Properties.IsPriority,
  1138. })
  1139. if err != nil {
  1140. return nil, errors.Trace(err)
  1141. }
  1142. // Select the protocol version. The matcher has already checked
  1143. // negotiateProtocolVersion, so failure is not expected.
  1144. negotiatedProtocolVersion, ok := negotiateProtocolVersion(
  1145. proxyMatchAnnouncement.Properties.ProtocolVersion,
  1146. offerRequest.Metrics.ProtocolVersion,
  1147. offerRequest.UseMediaStreams)
  1148. if !ok {
  1149. return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
  1150. }
  1151. // Respond with the proxy answer and initial broker/server session packet.
  1152. responsePayload, err := MarshalClientOfferResponse(
  1153. &ClientOfferResponse{
  1154. ConnectionID: proxyAnswer.ConnectionID,
  1155. SelectedProtocolVersion: negotiatedProtocolVersion,
  1156. ProxyAnswerSDP: proxyAnswer.ProxyAnswerSDP,
  1157. RelayPacketToServer: relayPacket,
  1158. })
  1159. if err != nil {
  1160. return nil, errors.Trace(err)
  1161. }
  1162. return responsePayload, nil
  1163. }
  1164. // handleProxyAnswer receives a proxy answer and delivers it to the waiting
  1165. // client.
  1166. func (b *Broker) handleProxyAnswer(
  1167. ctx context.Context,
  1168. extendTransportTimeout ExtendTransportTimeout,
  1169. transportLogFields common.LogFields,
  1170. proxyIP string,
  1171. geoIPData common.GeoIPData,
  1172. initiatorID ID,
  1173. requestPayload []byte) (retResponse []byte, retErr error) {
  1174. startTime := time.Now()
  1175. var logFields common.LogFields
  1176. var proxyAnswer *MatchAnswer
  1177. var answerError string
  1178. // The proxy ID is an implicit parameter: it's the proxy's session public
  1179. // key.
  1180. proxyID := initiatorID
  1181. // Always log the outcome.
  1182. defer func() {
  1183. if logFields == nil {
  1184. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1185. }
  1186. logFields["broker_event"] = "proxy-answer"
  1187. logFields["broker_id"] = b.brokerID
  1188. logFields["proxy_id"] = proxyID
  1189. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1190. if proxyAnswer != nil {
  1191. logFields["connection_id"] = proxyAnswer.ConnectionID
  1192. }
  1193. if answerError != "" {
  1194. // This is a proxy-reported error that occurred while creating the answer.
  1195. logFields["answer_error"] = answerError
  1196. logFields["error"] = fmt.Sprintf("proxy answer error: %s", answerError)
  1197. }
  1198. if retErr != nil {
  1199. // For the error field, retErr takes precedence over answerError
  1200. logFields["error"] = retErr.Error()
  1201. }
  1202. logFields.Add(transportLogFields)
  1203. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1204. if retErr != nil {
  1205. retErr = NewBrokerLoggedEvent(retErr)
  1206. }
  1207. }()
  1208. answerRequest, err := UnmarshalProxyAnswerRequest(requestPayload)
  1209. if err != nil {
  1210. return nil, errors.Trace(err)
  1211. }
  1212. // The filtered SDP is the request SDP with any invalid (bogon, unexpected
  1213. // GeoIP) ICE candidates filtered out. In some cases, proxies cannot
  1214. // avoid submitting invalid candidates (see comment in
  1215. // processSDPAddresses), so all invalid candidates are removed and the
  1216. // remaining SDP is used. Filtered candidate information is logged in
  1217. // logFields.
  1218. //
  1219. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  1220. // permitted in exchanged SDPs and not filtered out.
  1221. hasPersonalCompartmentIDs, err := b.matcher.AnnouncementHasPersonalCompartmentIDs(
  1222. initiatorID, answerRequest.ConnectionID)
  1223. if err != nil {
  1224. if std_errors.Is(err, errNoPendingAnswer) {
  1225. // Return a response. This avoids returning a
  1226. // broker-client-resetting 404 in this case.
  1227. responsePayload, err := MarshalProxyAnswerResponse(
  1228. &ProxyAnswerResponse{NoAwaitingClient: true})
  1229. if err != nil {
  1230. return nil, errors.Trace(err)
  1231. }
  1232. return responsePayload, nil
  1233. }
  1234. return nil, errors.Trace(err)
  1235. }
  1236. var filteredSDP []byte
  1237. filteredSDP, logFields, err = answerRequest.ValidateAndGetLogFields(
  1238. b.config.LookupGeoIP,
  1239. b.config.APIParameterValidator,
  1240. b.config.APIParameterLogFieldFormatter,
  1241. geoIPData,
  1242. hasPersonalCompartmentIDs)
  1243. if err != nil {
  1244. return nil, errors.Trace(err)
  1245. }
  1246. if answerRequest.AnswerError != "" {
  1247. // The proxy failed to create an answer.
  1248. answerError = answerRequest.AnswerError
  1249. b.matcher.AnswerError(initiatorID, answerRequest.ConnectionID)
  1250. } else {
  1251. // Deliver the answer to the client.
  1252. // Note that neither ProxyID nor ProxyIP is returned to the client.
  1253. // These fields are used internally in the matcher.
  1254. answerSDP := answerRequest.ProxyAnswerSDP
  1255. answerSDP.SDP = string(filteredSDP)
  1256. proxyAnswer = &MatchAnswer{
  1257. ProxyIP: proxyIP,
  1258. ProxyID: initiatorID,
  1259. ConnectionID: answerRequest.ConnectionID,
  1260. ProxyAnswerSDP: answerSDP,
  1261. }
  1262. err = b.matcher.Answer(proxyAnswer)
  1263. if err != nil {
  1264. if std_errors.Is(err, errNoPendingAnswer) {
  1265. // Return a response. This avoids returning a
  1266. // broker-client-resetting 404 in this case.
  1267. responsePayload, err := MarshalProxyAnswerResponse(
  1268. &ProxyAnswerResponse{NoAwaitingClient: true})
  1269. if err != nil {
  1270. return nil, errors.Trace(err)
  1271. }
  1272. return responsePayload, nil
  1273. }
  1274. return nil, errors.Trace(err)
  1275. }
  1276. }
  1277. // There is no data in this response, it's simply an acknowledgement that
  1278. // the answer was received. Upon receiving the response, the proxy should
  1279. // begin the WebRTC dial operation.
  1280. responsePayload, err := MarshalProxyAnswerResponse(
  1281. &ProxyAnswerResponse{})
  1282. if err != nil {
  1283. return nil, errors.Trace(err)
  1284. }
  1285. return responsePayload, nil
  1286. }
  1287. // handleServerProxyQuality receives, from servers, proxy tunnel quality and
  1288. // records that in the proxy quality state that is used to prioritize
  1289. // well-performing proxies.
  1290. func (b *Broker) handleServerProxyQuality(
  1291. ctx context.Context,
  1292. extendTransportTimeout ExtendTransportTimeout,
  1293. transportLogFields common.LogFields,
  1294. proxyIP string,
  1295. geoIPData common.GeoIPData,
  1296. initiatorID ID,
  1297. requestPayload []byte) (retResponse []byte, retErr error) {
  1298. startTime := time.Now()
  1299. var logFields common.LogFields
  1300. // Only known, trusted Psiphon server initiators are allowed to send proxy
  1301. // quality requests. knownServerInitiatorIDs is populated with the
  1302. // Curve25519 public keys -- initiator IDs -- corresponding to the
  1303. // session public keys found in signed Psiphon server entries.
  1304. //
  1305. // Currently, knownServerInitiatorIDs is populated with destination server
  1306. // entries received in client offers, so the broker must first receive a
  1307. // client offer before a given server is trusted, which means
  1308. // that "invalid initiator" errors may occur, and some quality requests
  1309. // may be dropped, in some expected situations, including a broker restart.
  1310. // serverID is the server entry diagnostic ID of the server.
  1311. serverIDValue, ok := b.knownServerInitiatorIDs.Load(initiatorID)
  1312. if !ok {
  1313. return nil, errors.TraceNew("invalid initiator")
  1314. }
  1315. serverID := serverIDValue.(string)
  1316. // Always log the outcome.
  1317. defer func() {
  1318. // Typically, a server will send the same proxy quality request to all
  1319. // brokers. For the one "broadcast" request, server-proxy-quality is
  1320. // logged by each broker, as an indication that every server/broker
  1321. // request pair is successful.
  1322. //
  1323. // TODO: log more details from ServerProxyQualityRequest.QualityCounts?
  1324. if logFields == nil {
  1325. logFields = common.LogFields{}
  1326. }
  1327. logFields["broker_event"] = "server-proxy-quality"
  1328. logFields["broker_id"] = b.brokerID
  1329. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1330. logFields["server_id"] = serverID
  1331. if retErr != nil {
  1332. logFields["error"] = retErr.Error()
  1333. }
  1334. logFields.Add(transportLogFields)
  1335. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1336. if retErr != nil {
  1337. retErr = NewBrokerLoggedEvent(retErr)
  1338. }
  1339. }()
  1340. qualityRequest, err := UnmarshalServerProxyQualityRequest(requestPayload)
  1341. if err != nil {
  1342. return nil, errors.Trace(err)
  1343. }
  1344. logFields, err = qualityRequest.ValidateAndGetLogFields()
  1345. if err != nil {
  1346. return nil, errors.Trace(err)
  1347. }
  1348. // Add the quality counts into the existing proxy quality state.
  1349. //
  1350. // The counts are ignored when proxy quality is disabled, but an
  1351. // anknowledgement is still returned to the server.
  1352. if b.enableProxyQuality.Load() {
  1353. for proxyKey, counts := range qualityRequest.QualityCounts {
  1354. b.proxyQualityState.AddQuality(proxyKey, counts)
  1355. }
  1356. }
  1357. // There is no data in this response, it's simply an acknowledgement that
  1358. // the request was received.
  1359. responsePayload, err := MarshalServerProxyQualityResponse(
  1360. &ServerProxyQualityResponse{})
  1361. if err != nil {
  1362. return nil, errors.Trace(err)
  1363. }
  1364. return responsePayload, nil
  1365. }
  1366. // handleClientRelayedPacket facilitates broker/server sessions. The initial
  1367. // packet from the broker is sent to the client in the ClientOfferResponse.
  1368. // The client sends that to the server in the Psiphon handshake. If the
  1369. // session was already established, the relay ends there. Otherwise, the
  1370. // client receives any packet sent back by the server and that server packet
  1371. // is then delivered to the broker in a ClientRelayedPacketRequest. If the
  1372. // session needs to be [re-]negotiated, there are additional
  1373. // ClientRelayedPacket round trips until the session is established and the
  1374. // BrokerServerReport is securely exchanged between the broker and server.
  1375. func (b *Broker) handleClientRelayedPacket(
  1376. ctx context.Context,
  1377. extendTransportTimeout ExtendTransportTimeout,
  1378. transportLogFields common.LogFields,
  1379. geoIPData common.GeoIPData,
  1380. initiatorID ID,
  1381. requestPayload []byte) (retResponse []byte, retErr error) {
  1382. startTime := time.Now()
  1383. var logFields common.LogFields
  1384. var relayedPacketRequest *ClientRelayedPacketRequest
  1385. var serverID string
  1386. // Always log the outcome.
  1387. defer func() {
  1388. if logFields == nil {
  1389. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1390. }
  1391. logFields["broker_event"] = "client-relayed-packet"
  1392. logFields["broker_id"] = b.brokerID
  1393. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1394. if relayedPacketRequest != nil {
  1395. logFields["connection_id"] = relayedPacketRequest.ConnectionID
  1396. }
  1397. if serverID != "" {
  1398. logFields["destination_server_id"] = serverID
  1399. }
  1400. if retErr != nil {
  1401. logFields["error"] = retErr.Error()
  1402. }
  1403. logFields.Add(transportLogFields)
  1404. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1405. if retErr != nil {
  1406. retErr = NewBrokerLoggedEvent(retErr)
  1407. }
  1408. }()
  1409. relayedPacketRequest, err := UnmarshalClientRelayedPacketRequest(requestPayload)
  1410. if err != nil {
  1411. return nil, errors.Trace(err)
  1412. }
  1413. logFields, err = relayedPacketRequest.ValidateAndGetLogFields(
  1414. b.config.APIParameterValidator,
  1415. b.config.APIParameterLogFieldFormatter,
  1416. geoIPData)
  1417. if err != nil {
  1418. return nil, errors.Trace(err)
  1419. }
  1420. // The relay state is associated with the connection ID.
  1421. strConnectionID := string(relayedPacketRequest.ConnectionID[:])
  1422. entry, ok := b.pendingServerReports.Get(strConnectionID)
  1423. if !ok {
  1424. // The relay state is not found; it may have been evicted from the
  1425. // cache. The client will receive a generic error in this case and
  1426. // should stop relaying. Assuming the server is configured to require
  1427. // a BrokerServerReport, the tunnel will be terminated, so the
  1428. // client should also abandon the dial.
  1429. return nil, errors.TraceNew("no pending report")
  1430. }
  1431. pendingServerReport := entry.(*pendingServerReport)
  1432. serverID = pendingServerReport.serverID
  1433. // When the broker tried to use an existing session that was expired on the
  1434. // server, the server will respond here with a signed session reset token. The
  1435. // broker resets the session and starts to establish a new session.
  1436. //
  1437. // The non-waiting session establishment mode is used for broker/server
  1438. // sessions: if multiple clients concurrently try to relay new sessions,
  1439. // all establishments will happen in parallel without forcing any clients
  1440. // to wait for one client to lead the establishment. The last established
  1441. // session will be retained for reuse.
  1442. //
  1443. // If there is an error, the relayed packet is invalid. Drop the packet
  1444. // and return an error to be logged. Do _not_ reset the session,
  1445. // otherwise a malicious client could interrupt a valid broker/server
  1446. // session with a malformed packet.
  1447. // Next is given a nil ctx since we're not waiting for any other client to
  1448. // establish the session.
  1449. out, _, err := pendingServerReport.roundTrip.Next(
  1450. nil, relayedPacketRequest.PacketFromServer)
  1451. if err != nil {
  1452. return nil, errors.Trace(err)
  1453. }
  1454. if out == nil {
  1455. // The BrokerServerReport is a one-way message, As a result, the relay
  1456. // never ends with broker receiving a response; it's either
  1457. // (re)handshaking or sending the one-way report.
  1458. return nil, errors.TraceNew("unexpected nil packet")
  1459. }
  1460. // Return the next broker packet for the client to relay to the server.
  1461. // When it receives a nil PacketToServer, the client will stop relaying.
  1462. responsePayload, err := MarshalClientRelayedPacketResponse(
  1463. &ClientRelayedPacketResponse{
  1464. PacketToServer: out,
  1465. })
  1466. if err != nil {
  1467. return nil, errors.Trace(err)
  1468. }
  1469. return responsePayload, nil
  1470. }
  1471. // handleClientDSL relays client DSL requests. The broker's role is to provide
  1472. // a blocking resistant hop through to the DSL backend; DSL requests are not
  1473. // direct components of the in-proxy protocol.
  1474. func (b *Broker) handleClientDSL(
  1475. ctx context.Context,
  1476. extendTransportTimeout ExtendTransportTimeout,
  1477. transportLogFields common.LogFields,
  1478. clientIP string,
  1479. geoIPData common.GeoIPData,
  1480. initiatorID ID,
  1481. requestPayload []byte) (retResponse []byte, retErr error) {
  1482. startTime := time.Now()
  1483. var logFields common.LogFields
  1484. var requestSize, responseSize int
  1485. var dslRelayErr error
  1486. // Always log the outcome.
  1487. defer func() {
  1488. if logFields == nil {
  1489. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1490. }
  1491. logFields["broker_event"] = "client-dsl"
  1492. logFields["broker_id"] = b.brokerID
  1493. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1494. logFields["request_size"] = requestSize
  1495. logFields["response_size"] = responseSize
  1496. loggedError := false
  1497. if retErr != nil {
  1498. logFields["error"] = retErr.Error()
  1499. loggedError = true
  1500. } else if dslRelayErr != nil {
  1501. logFields["error"] = dslRelayErr.Error()
  1502. loggedError = true
  1503. }
  1504. logFields.Add(transportLogFields)
  1505. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1506. if loggedError {
  1507. retErr = NewBrokerLoggedEvent(retErr)
  1508. }
  1509. }()
  1510. // Rate limit the number of relayed DSL requests. The DSL backend has its
  1511. // own rate limit enforcement, but avoiding excess requests here saves on
  1512. // resources consumed between the relay and backend.
  1513. //
  1514. // Unlike the announce/offer rate limit cases, there's no "limited" error
  1515. // flag returned to the client in this case, since this rate limiter is
  1516. // purely for abuse prevention and is expected to be configured with
  1517. // limits that won't be exceeded by legitimate clients.
  1518. rateLimitParams := b.dslRequestRateLimitParams.Load().(*brokerRateLimitParams)
  1519. err := brokerRateLimit(
  1520. b.dslRequestRateLimiters,
  1521. clientIP,
  1522. rateLimitParams.quantity,
  1523. rateLimitParams.interval)
  1524. if err != nil {
  1525. return nil, errors.Trace(err)
  1526. }
  1527. dslRequest, err := UnmarshalClientDSLRequest(requestPayload)
  1528. if err != nil {
  1529. return nil, errors.Trace(err)
  1530. }
  1531. requestSize = len(dslRequest.RequestPayload)
  1532. // There is no ValidateAndGetLogFields call in this handler as the DSL
  1533. // backend (or relay) validates inputs and logs events.
  1534. dslResponsePayload, err := b.config.RelayDSLRequest(
  1535. ctx,
  1536. extendTransportTimeout,
  1537. clientIP,
  1538. geoIPData,
  1539. dslRequest.RequestPayload)
  1540. if err != nil {
  1541. // RelayDSLRequest will always return a generic error response payload
  1542. // to send to the client, to ensure it retains its broker client
  1543. // round tripper. Any DSL relay errors, including missing
  1544. // configuration, will be logged to the broker_event.
  1545. dslRelayErr = errors.Trace(err)
  1546. }
  1547. responseSize = len(dslResponsePayload)
  1548. responsePayload, err := MarshalClientDSLResponse(
  1549. &ClientDSLResponse{
  1550. ResponsePayload: dslResponsePayload,
  1551. })
  1552. if err != nil {
  1553. return nil, errors.Trace(err)
  1554. }
  1555. return responsePayload, nil
  1556. }
  1557. func (b *Broker) adjustRequestTimeout(
  1558. logFields common.LogFields, timeout time.Duration) time.Duration {
  1559. // Adjust long-polling request timeouts to respect any maximum request
  1560. // timeout supported by the provider fronting the request.
  1561. //
  1562. // Limitation: the client is trusted to provide the correct fronting
  1563. // provider ID.
  1564. maxRequestTimeouts, ok := b.maxRequestTimeouts.Load().(map[string]time.Duration)
  1565. if !ok || maxRequestTimeouts == nil {
  1566. return timeout
  1567. }
  1568. frontingProviderID, ok := logFields["fronting_provider_id"].(string)
  1569. if !ok {
  1570. return timeout
  1571. }
  1572. maxRequestTimeout, ok := maxRequestTimeouts[frontingProviderID]
  1573. if !ok || maxRequestTimeout <= 0 || timeout <= maxRequestTimeout {
  1574. return timeout
  1575. }
  1576. return maxRequestTimeout
  1577. }
// pendingServerReport is the relay state for a BrokerServerReport being
// delivered to a Psiphon server through a client. initiateRelayedServerReport
// stores it in pendingServerReports, keyed by connection ID, and
// handleClientRelayedPacket looks it up to continue the relay.
type pendingServerReport struct {
	// serverID is the destination server entry's diagnostic ID, logged as
	// destination_server_id.
	serverID string
	// serverReport is the report being delivered to the server.
	serverReport *BrokerServerReport
	// roundTrip is the broker/server session round trip whose packets the
	// client relays.
	roundTrip *InitiatorRoundTrip
}
  1583. func (b *Broker) initiateRelayedServerReport(
  1584. serverParams *serverParams,
  1585. connectionID ID,
  1586. serverReport *BrokerServerReport) ([]byte, error) {
  1587. reportPayload, err := MarshalBrokerServerReport(serverReport)
  1588. if err != nil {
  1589. return nil, errors.Trace(err)
  1590. }
  1591. // Force a new, concurrent session establishment with the server even if
  1592. // another handshake is already in progess, relayed by some other client.
  1593. // This ensures clients don't block waiting for other client relays
  1594. // through other tunnels. The last established session will be retained
  1595. // for reuse.
  1596. waitToShareSession := false
  1597. roundTrip, err := b.initiatorSessions.NewRoundTrip(
  1598. serverParams.sessionPublicKey,
  1599. serverParams.sessionRootObfuscationSecret,
  1600. waitToShareSession,
  1601. reportPayload)
  1602. if err != nil {
  1603. return nil, errors.Trace(err)
  1604. }
  1605. relayPacket, _, err := roundTrip.Next(nil, nil)
  1606. if err != nil {
  1607. return nil, errors.Trace(err)
  1608. }
  1609. strConnectionID := string(connectionID[:])
  1610. b.pendingServerReports.Set(
  1611. strConnectionID,
  1612. &pendingServerReport{
  1613. serverID: serverParams.serverID,
  1614. serverReport: serverReport,
  1615. roundTrip: roundTrip,
  1616. },
  1617. time.Duration(b.pendingServerReportsTTL.Load()))
  1618. return relayPacket, nil
  1619. }
// serverParams holds the values validateDestination extracts from a signed,
// validated destination server entry.
type serverParams struct {
	// serverID is the server entry's diagnostic ID.
	serverID string
	// sessionPublicKey and sessionRootObfuscationSecret are the key material
	// for the secure session between the broker and this server.
	sessionPublicKey             SessionPublicKey
	sessionRootObfuscationSecret ObfuscationSecret
}
// validateDestination checks that the client's specified proxy dial
// destination is a valid destination address for a tunnel protocol in the
// specified signed and valid Psiphon server entry.
//
// packedDestinationServerEntry is a CBOR-encoded packed server entry,
// networkProtocol and destinationAddress ("host:port") describe the proxy's
// dial, and geoIPData is used to decide whether domain (fronted) destinations
// are allowed. On success, it returns the server's diagnostic ID and session
// key material. As a side effect, it records the server's initiator ID in
// knownServerInitiatorIDs.
//
// The checks are security-relevant and ordered. Unsigned fields are stripped
// before the signature is verified, and only then are the signed tag and
// dial address trusted.
func (b *Broker) validateDestination(
	geoIPData common.GeoIPData,
	packedDestinationServerEntry []byte,
	networkProtocol NetworkProtocol,
	destinationAddress string) (*serverParams, error) {
	var packedServerEntry protocol.PackedServerEntryFields
	err := cbor.Unmarshal(packedDestinationServerEntry, &packedServerEntry)
	if err != nil {
		return nil, errors.Trace(err)
	}
	serverEntryFields, err := protocol.DecodePackedServerEntryFields(packedServerEntry)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Strip any unsigned fields, which could be forged by the client. In
	// particular, this includes the server entry tag, which, in some cases,
	// is locally populated by a client for its own reference.
	serverEntryFields.RemoveUnsignedFields()
	// Check that the server entry is signed by Psiphon. Otherwise a client
	// could manufacture a server entry corresponding to an arbitrary dial
	// destination.
	err = serverEntryFields.VerifySignature(
		b.config.ServerEntrySignaturePublicKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// The server entry tag must be set and signed by Psiphon, as local,
	// client-derived tags are unsigned and untrusted.
	serverEntryTag := serverEntryFields.GetTag()
	if serverEntryTag == "" {
		return nil, errors.TraceNew("missing server entry tag")
	}
	// Check that the server entry tag is on a list of active and valid
	// Psiphon server entry tags. This ensures that an obsolete entry for a
	// pruned server cannot be misused by a client to proxy to what's no
	// longer a Psiphon server.
	if !b.config.IsValidServerEntryTag(serverEntryTag) {
		return nil, errors.TraceNew("invalid server entry tag")
	}
	serverID := serverEntryFields.GetDiagnosticID()
	serverEntry, err := serverEntryFields.GetServerEntry()
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Validate that the dial host (IP or domain) and port match a tunnel
	// protocol offered by the server entry.
	destHost, destPort, err := net.SplitHostPort(destinationAddress)
	if err != nil {
		return nil, errors.Trace(err)
	}
	destPortNum, err := strconv.Atoi(destPort)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// For domain fronted cases, since we can't verify the Host header, access
	// is strictly limited to targeted clients. Clients should use tactics
	// to avoid disallowed domain dial address cases, but here the broker
	// enforces it.
	//
	// TODO: this issue could be further mitigated with a server
	// acknowledgement of the broker's report, with no acknowledgement
	// followed by signaling the proxy to terminate client connection.
	// This assumes that any domain dial is for domain fronting.
	isDomain := net.ParseIP(destHost) == nil
	if isDomain && !b.config.AllowDomainFrontedDestinations(geoIPData) {
		return nil, errors.TraceNew("domain fronted destinations disallowed")
	}
	// The server entry must include an in-proxy tunnel protocol capability
	// and corresponding dial port number. In-proxy capability may be set for
	// only a subset of all Psiphon servers, to limit the number of servers
	// a proxy can observe and enumerate. Well-behaved clients will not send
	// any server entries lacking this capability, but here the broker
	// enforces it.
	if !serverEntry.IsValidInproxyDialAddress(networkProtocol.String(), destHost, destPortNum) {
		return nil, errors.TraceNew("invalid destination address")
	}
	// Extract and return the key material to be used for the secure session
	// and BrokerServer exchange between the broker and the Psiphon server
	// corresponding to this server entry.
	params := &serverParams{
		serverID: serverID,
	}
	params.sessionPublicKey, err = SessionPublicKeyFromString(
		serverEntry.InproxySessionPublicKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	params.sessionRootObfuscationSecret, err = ObfuscationSecretFromString(
		serverEntry.InproxySessionRootObfuscationSecret)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Record that this server is a known and trusted ServerProxyQualityRequest
	// sender. The serverID is stored for logging in handleServerProxyQuality.
	//
	// There is no expiry for knownServerInitiatorIDs entries, and they will
	// clear only if the broker is restarted (which is the same lifetime as
	// ServerEntrySignaturePublicKey).
	//
	// Limitation: in time, the above IsValidServerEntryTag check could become
	// false for a retired server, while its entry remains in
	// knownServerInitiatorIDs. However, unlike the case of a recycled
	// Psiphon server IP being used as a proxy destination, it's safer to
	// assume that a retired server's session private key does not become
	// exposed.
	serverInitiatorID, err := params.sessionPublicKey.ToCurve25519()
	if err != nil {
		return nil, errors.Trace(err)
	}
	// For hosts running a single psiphond with multiple server entries, there
	// will be multiple possible serverIDs for one serverInitiatorID. Don't
	// overwrite any existing entry; keep the first observed serverID for
	// more stable logging.
	_, _ = b.knownServerInitiatorIDs.LoadOrStore(ID(serverInitiatorID), serverID)
	return params, nil
}
  1744. func (b *Broker) isCommonCompartmentIDHashingInitialized() bool {
  1745. b.commonCompartmentsMutex.Lock()
  1746. defer b.commonCompartmentsMutex.Unlock()
  1747. return b.commonCompartments != nil
  1748. }
  1749. func (b *Broker) initializeCommonCompartmentIDHashing(
  1750. commonCompartmentIDs []ID) error {
  1751. b.commonCompartmentsMutex.Lock()
  1752. defer b.commonCompartmentsMutex.Unlock()
  1753. // At least one common compartment ID is required. At a minimum, one ID
  1754. // will be used and distributed to clients via tactics, limiting matching
  1755. // to those clients targeted to receive that tactic parameters.
  1756. if len(commonCompartmentIDs) == 0 {
  1757. return errors.TraceNew("missing common compartment IDs")
  1758. }
  1759. // The consistent package doesn't allow duplicate members.
  1760. checkDup := make(map[ID]bool, len(commonCompartmentIDs))
  1761. for _, compartmentID := range commonCompartmentIDs {
  1762. if checkDup[compartmentID] {
  1763. return errors.TraceNew("duplicate common compartment IDs")
  1764. }
  1765. checkDup[compartmentID] = true
  1766. }
  1767. // Proxies without personal compartment IDs are randomly assigned to the
  1768. // set of common, Psiphon-specified, compartment IDs. These common
  1769. // compartment IDs are then distributed to targeted clients through
  1770. // tactics or embedded in OSLs, to limit access to proxies.
  1771. //
  1772. // Use consistent hashing in an effort to keep a consistent assignment of
  1773. // proxies (as specified by proxy ID, which covers all announcements for
  1774. // a single proxy). This is more of a concern for long-lived, permanent
  1775. // proxies that are not behind any NAT.
  1776. //
  1777. // Even with consistent hashing, a subset of proxies will still change
  1778. // assignment when CommonCompartmentIDs changes.
  1779. consistentMembers := make([]consistent.Member, len(commonCompartmentIDs))
  1780. for i, compartmentID := range commonCompartmentIDs {
  1781. consistentMembers[i] = consistentMember(compartmentID.String())
  1782. }
  1783. b.commonCompartments = consistent.New(
  1784. consistentMembers,
  1785. consistent.Config{
  1786. PartitionCount: len(consistentMembers),
  1787. ReplicationFactor: 1,
  1788. Load: 1,
  1789. Hasher: xxhasher{},
  1790. })
  1791. return nil
  1792. }
  1793. // xxhasher wraps github.com/cespare/xxhash.Sum64 in the interface expected by
  1794. // github.com/buraksezer/consistent. xxhash is a high quality hash function
  1795. // used in github.com/buraksezer/consistent examples.
  1796. type xxhasher struct{}
  1797. func (h xxhasher) Sum64(data []byte) uint64 {
  1798. return xxhash.Sum64(data)
  1799. }
  1800. // consistentMember wraps the string type with the interface expected by
  1801. // github.com/buraksezer/consistent.
  1802. type consistentMember string
  1803. func (m consistentMember) String() string {
  1804. return string(m)
  1805. }
  1806. func (b *Broker) selectCommonCompartmentID(proxyID ID) (ID, error) {
  1807. b.commonCompartmentsMutex.Lock()
  1808. defer b.commonCompartmentsMutex.Unlock()
  1809. compartmentID, err := IDFromString(
  1810. b.commonCompartments.LocateKey(proxyID[:]).String())
  1811. if err != nil {
  1812. return compartmentID, errors.Trace(err)
  1813. }
  1814. return compartmentID, nil
  1815. }
// brokerRateLimitParams configures a per-IP rate limit enforced by
// brokerRateLimit: up to quantity events per interval. A non-positive
// quantity or interval disables the limit.
type brokerRateLimitParams struct {
	quantity int
	interval time.Duration
}
  1820. func brokerRateLimit(
  1821. rateLimiters *lrucache.Cache,
  1822. limitIP string,
  1823. quantity int,
  1824. interval time.Duration) error {
  1825. if quantity <= 0 || interval <= 0 {
  1826. return nil
  1827. }
  1828. var rateLimiter *rate.Limiter
  1829. entry, ok := rateLimiters.Get(limitIP)
  1830. if ok {
  1831. rateLimiter = entry.(*rate.Limiter)
  1832. } else {
  1833. limit := float64(quantity) / interval.Seconds()
  1834. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  1835. rateLimiters.Set(
  1836. limitIP, rateLimiter, interval)
  1837. }
  1838. if !rateLimiter.Allow() {
  1839. return errors.TraceNew("rate exceeded for IP")
  1840. }
  1841. return nil
  1842. }