broker.go 65 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. std_errors "errors"
  23. "net"
  24. "strconv"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/consistent"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  32. "github.com/cespare/xxhash"
  33. lrucache "github.com/cognusion/go-cache-lru"
  34. "github.com/fxamacker/cbor/v2"
  35. )
const (

	// BrokerMaxRequestBodySize is the maximum request size, that should be
	// enforced by the provided broker transport.
	BrokerMaxRequestBodySize = 65536

	// BrokerEndPointName is the standard name for referencing an endpoint
	// that services broker requests.
	BrokerEndPointName = "inproxy-broker"

	// NOTE(review): brokerProxyAnnounceTimeout and brokerClientOfferTimeout
	// are presumably the defaults applied when the corresponding configured
	// timeout is 0; their use is outside the visible part of this file --
	// confirm.
	brokerProxyAnnounceTimeout = 2 * time.Minute
	brokerClientOfferTimeout   = 10 * time.Second

	// brokerPendingServerReportsTTL is the pending server reports cache TTL
	// that NewBroker uses when BrokerConfig.PendingServerReportsTTL is not
	// set.
	brokerPendingServerReportsTTL = 60 * time.Second

	// brokerPendingServerReportsMaxSize is the size limit NewBroker passes
	// to the pending server reports LRU cache.
	brokerPendingServerReportsMaxSize = 100000

	// brokerMetricName is the metric name under which broker events are
	// logged via Logger.LogMetric.
	brokerMetricName = "inproxy_broker"
)
// LookupGeoIP is a callback for providing GeoIP lookup service. It accepts
// an IP address, in string form, and returns the corresponding GeoIP data.
type LookupGeoIP func(IP string) common.GeoIPData
// ExtendTransportTimeout is a callback that extends the timeout for a
// server-side broker transport handler, facilitating request-specific
// timeouts including long-polling for proxy announcements.
//
// The callback is supplied by the HandleSessionPacket caller and is passed
// through to the individual API request handlers.
type ExtendTransportTimeout func(timeout time.Duration)
// GetTacticsPayload is a callback which returns the appropriate tactics
// payload for the specified client/proxy GeoIP data and API parameters.
//
// The return values are the tactics payload, the new tactics tag, and an
// error. handleProxyAnnounce treats a non-nil payload together with a
// non-empty tag as new tactics that must be delivered before the proxy is
// enqueued; a payload may also be returned when there are no new tactics.
type GetTacticsPayload func(
	common.GeoIPData, common.APIParameters) ([]byte, string, error)
  59. // Broker is the in-proxy broker component, which matches clients and proxies
  60. // and provides WebRTC signaling functionalty.
  61. //
  62. // Both clients and proxies send requests to the broker to obtain matches and
  63. // exchange WebRTC SDPs. Broker does not implement a transport or obfuscation
  64. // layer; instead that is provided by the HandleSessionPacket caller. A
  65. // typical implementation would provide a domain fronted web server which
  66. // runs a Broker and calls Broker.HandleSessionPacket to handle web requests
  67. // encapsulating secure session packets.
  68. type Broker struct {
  69. config *BrokerConfig
  70. brokerID ID
  71. initiatorSessions *InitiatorSessions
  72. responderSessions *ResponderSessions
  73. matcher *Matcher
  74. pendingServerReports *lrucache.Cache
  75. proxyQualityState *ProxyQualityState
  76. knownServerInitiatorIDs sync.Map
  77. commonCompartmentsMutex sync.Mutex
  78. commonCompartments *consistent.Consistent
  79. proxyAnnounceTimeout int64
  80. clientOfferTimeout int64
  81. clientOfferPersonalTimeout int64
  82. pendingServerReportsTTL int64
  83. maxRequestTimeouts atomic.Value
  84. maxCompartmentIDs int64
  85. enableProxyQualityMutex sync.Mutex
  86. enableProxyQuality atomic.Bool
  87. }
// BrokerConfig specifies the configuration for a Broker.
type BrokerConfig struct {

	// Logger is used to log events.
	Logger common.Logger

	// CommonCompartmentIDs is a list of common compartment IDs to apply to
	// proxies that announce without personal compartment ID. Common
	// compartment IDs are managed by Psiphon and distributed to clients via
	// tactics or embedded in OSLs. Clients must supply a valid compartment
	// ID to match with a proxy.
	//
	// A BrokerConfig must supply at least one compartment ID, or
	// SetCommonCompartmentIDs must be called with at least one compartment
	// ID before calling Start.
	//
	// When only one, single common compartment ID is configured, it can serve
	// as an (obfuscation) secret that clients must obtain, via tactics, to
	// enable in-proxy participation.
	CommonCompartmentIDs []ID

	// AllowProxy is a callback which can indicate whether a proxy with the
	// given GeoIP data is allowed to match with common compartment ID
	// clients. Proxies with personal compartment IDs are always allowed.
	AllowProxy func(common.GeoIPData) bool

	// PrioritizeProxy is a callback which can indicate whether proxy
	// announcements from proxies with the specified in-proxy protocol
	// version, GeoIPData, and APIParameters should be prioritized in the
	// matcher queue. Priority proxy announcements match ahead of other proxy
	// announcements, regardless of announcement age/deadline. Priority
	// status takes precedence over preferred NAT matching. Prioritization
	// applies only to common compartment IDs and not personal pairing mode.
	PrioritizeProxy func(int, common.GeoIPData, common.APIParameters) bool

	// AllowClient is a callback which can indicate whether a client with the
	// given GeoIP data is allowed to match with common compartment ID
	// proxies. Clients are always allowed to match based on personal
	// compartment ID.
	AllowClient func(common.GeoIPData) bool

	// AllowDomainFrontedDestinations is a callback which can indicate whether
	// a client with the given GeoIP data is allowed to specify a proxied
	// destination for a domain fronted protocol. When false, only direct
	// address destinations are allowed.
	//
	// While tactics may be set to instruct clients to use only direct
	// server tunnel protocols, with IP address destinations, this callback
	// adds server-side enforcement.
	AllowDomainFrontedDestinations func(common.GeoIPData) bool

	// LookupGeoIP provides GeoIP lookup service.
	LookupGeoIP LookupGeoIP

	// APIParameterValidator is a callback that validates base API metrics.
	APIParameterValidator common.APIParameterValidator

	// APIParameterLogFieldFormatter is a callback that formats base API
	// metrics.
	APIParameterLogFieldFormatter common.APIParameterLogFieldFormatter

	// GetTacticsPayload provides a tactics lookup service.
	GetTacticsPayload GetTacticsPayload

	// IsValidServerEntryTag is a callback which checks if the specified
	// server entry tag is on the list of valid and active Psiphon server
	// entry tags.
	IsValidServerEntryTag func(serverEntryTag string) bool

	// IsLoadLimiting is a callback which checks if the broker process is in a
	// load limiting state, where consumed resources, including allocated
	// system memory and CPU load, exceed determined thresholds. When load
	// limiting is indicated, the broker will attempt to reduce load by
	// immediately rejecting either proxy announces or client offers,
	// depending on the state of the corresponding queues.
	IsLoadLimiting func() bool

	// PrivateKey is the broker's secure session long term private key.
	PrivateKey SessionPrivateKey

	// ObfuscationRootSecret is the broker's secure session long term
	// obfuscation key.
	ObfuscationRootSecret ObfuscationSecret

	// ServerEntrySignaturePublicKey is the key used to verify Psiphon server
	// entry signatures.
	ServerEntrySignaturePublicKey string

	// These timeout parameters may be used to override defaults.
	ProxyAnnounceTimeout       time.Duration
	ClientOfferTimeout         time.Duration
	ClientOfferPersonalTimeout time.Duration
	PendingServerReportsTTL    time.Duration

	// Announcement queue limit configuration.
	MatcherAnnouncementLimitEntryCount    int
	MatcherAnnouncementRateLimitQuantity  int
	MatcherAnnouncementRateLimitInterval  time.Duration
	MatcherAnnouncementNonlimitedProxyIDs []ID

	// Offer queue limit configuration.
	MatcherOfferLimitEntryCount   int
	MatcherOfferRateLimitQuantity int
	MatcherOfferRateLimitInterval time.Duration

	// MaxCompartmentIDs specifies the maximum number of compartment IDs that
	// can be included, per list, in one request. If 0, the package-level
	// default value, MaxCompartmentIDs, is used.
	MaxCompartmentIDs int
}
  177. // NewBroker initializes a new Broker.
  178. func NewBroker(config *BrokerConfig) (*Broker, error) {
  179. // initiatorSessions are secure sessions initiated by the broker and used
  180. // to send BrokerServerReports to servers. The servers will be
  181. // configured to establish sessions only with brokers with specified
  182. // public keys.
  183. initiatorSessions := NewInitiatorSessions(config.PrivateKey)
  184. // responderSessions are secure sessions initiated by clients and proxies
  185. // and used to send requests to the broker. Clients and proxies are
  186. // configured to establish sessions only with specified broker public keys.
  187. responderSessions, err := NewResponderSessions(
  188. config.PrivateKey, config.ObfuscationRootSecret)
  189. if err != nil {
  190. return nil, errors.Trace(err)
  191. }
  192. // The broker ID is the broker's session public key in Curve25519 form.
  193. publicKey, err := config.PrivateKey.GetPublicKey()
  194. if err != nil {
  195. return nil, errors.Trace(err)
  196. }
  197. brokerID, err := publicKey.ToCurve25519()
  198. if err != nil {
  199. return nil, errors.Trace(err)
  200. }
  201. proxyQuality := NewProxyQuality()
  202. b := &Broker{
  203. config: config,
  204. brokerID: ID(brokerID),
  205. initiatorSessions: initiatorSessions,
  206. responderSessions: responderSessions,
  207. matcher: NewMatcher(&MatcherConfig{
  208. Logger: config.Logger,
  209. AnnouncementLimitEntryCount: config.MatcherAnnouncementLimitEntryCount,
  210. AnnouncementRateLimitQuantity: config.MatcherAnnouncementRateLimitQuantity,
  211. AnnouncementRateLimitInterval: config.MatcherAnnouncementRateLimitInterval,
  212. AnnouncementNonlimitedProxyIDs: config.MatcherAnnouncementNonlimitedProxyIDs,
  213. OfferLimitEntryCount: config.MatcherOfferLimitEntryCount,
  214. OfferRateLimitQuantity: config.MatcherOfferRateLimitQuantity,
  215. OfferRateLimitInterval: config.MatcherOfferRateLimitInterval,
  216. ProxyQualityState: proxyQuality,
  217. IsLoadLimiting: config.IsLoadLimiting,
  218. }),
  219. proxyQualityState: proxyQuality,
  220. proxyAnnounceTimeout: int64(config.ProxyAnnounceTimeout),
  221. clientOfferTimeout: int64(config.ClientOfferTimeout),
  222. clientOfferPersonalTimeout: int64(config.ClientOfferPersonalTimeout),
  223. pendingServerReportsTTL: int64(config.PendingServerReportsTTL),
  224. maxCompartmentIDs: int64(common.ValueOrDefault(config.MaxCompartmentIDs, MaxCompartmentIDs)),
  225. }
  226. b.pendingServerReports = lrucache.NewWithLRU(
  227. common.ValueOrDefault(config.PendingServerReportsTTL, brokerPendingServerReportsTTL),
  228. 1*time.Minute,
  229. brokerPendingServerReportsMaxSize)
  230. if len(config.CommonCompartmentIDs) > 0 {
  231. err = b.initializeCommonCompartmentIDHashing(config.CommonCompartmentIDs)
  232. if err != nil {
  233. return nil, errors.Trace(err)
  234. }
  235. }
  236. return b, nil
  237. }
  238. func (b *Broker) Start() error {
  239. if !b.isCommonCompartmentIDHashingInitialized() {
  240. return errors.TraceNew("missing common compartment IDs")
  241. }
  242. return errors.Trace(b.matcher.Start())
  243. }
// Stop stops the broker's matcher.
func (b *Broker) Stop() {
	b.matcher.Stop()
}
  247. // SetCommonCompartmentIDs sets a new list of common compartment IDs,
  248. // replacing the previous configuration.
  249. func (b *Broker) SetCommonCompartmentIDs(commonCompartmentIDs []ID) error {
  250. // TODO: initializeCommonCompartmentIDHashing is called regardless whether
  251. // commonCompartmentIDs changes the previous configuration. To avoid the
  252. // overhead of consistent hashing initialization in
  253. // initializeCommonCompartmentIDHashing, add a mechanism to first quickly
  254. // check for changes?
  255. return errors.Trace(b.initializeCommonCompartmentIDHashing(commonCompartmentIDs))
  256. }
  257. // SetTimeouts sets new timeout values, replacing the previous configuration.
  258. // New timeout values do not apply to currently active announcement or offer
  259. // requests.
  260. func (b *Broker) SetTimeouts(
  261. proxyAnnounceTimeout time.Duration,
  262. clientOfferTimeout time.Duration,
  263. clientOfferPersonalTimeout time.Duration,
  264. pendingServerReportsTTL time.Duration,
  265. maxRequestTimeouts map[string]time.Duration) {
  266. atomic.StoreInt64(&b.proxyAnnounceTimeout, int64(proxyAnnounceTimeout))
  267. atomic.StoreInt64(&b.clientOfferTimeout, int64(clientOfferTimeout))
  268. atomic.StoreInt64(&b.clientOfferPersonalTimeout, int64(clientOfferPersonalTimeout))
  269. atomic.StoreInt64(&b.pendingServerReportsTTL, int64(pendingServerReportsTTL))
  270. b.maxRequestTimeouts.Store(maxRequestTimeouts)
  271. }
  272. // SetLimits sets new queue limit values, replacing the previous
  273. // configuration. New limits are only partially applied to existing queue
  274. // states; see Matcher.SetLimits.
  275. func (b *Broker) SetLimits(
  276. matcherAnnouncementLimitEntryCount int,
  277. matcherAnnouncementRateLimitQuantity int,
  278. matcherAnnouncementRateLimitInterval time.Duration,
  279. matcherAnnouncementNonlimitedProxyIDs []ID,
  280. matcherOfferLimitEntryCount int,
  281. matcherOfferRateLimitQuantity int,
  282. matcherOfferRateLimitInterval time.Duration,
  283. maxCompartmentIDs int) {
  284. b.matcher.SetLimits(
  285. matcherAnnouncementLimitEntryCount,
  286. matcherAnnouncementRateLimitQuantity,
  287. matcherAnnouncementRateLimitInterval,
  288. matcherAnnouncementNonlimitedProxyIDs,
  289. matcherOfferLimitEntryCount,
  290. matcherOfferRateLimitQuantity,
  291. matcherOfferRateLimitInterval)
  292. atomic.StoreInt64(
  293. &b.maxCompartmentIDs,
  294. int64(common.ValueOrDefault(maxCompartmentIDs, MaxCompartmentIDs)))
  295. }
  296. func (b *Broker) SetProxyQualityParameters(
  297. enable bool,
  298. qualityTTL time.Duration,
  299. pendingFailedMatchDeadline time.Duration,
  300. failedMatchThreshold int) {
  301. // enableProxyQuality is an atomic for fast lookups in request handlers;
  302. // an additional mutex is used here to ensure the Swap/Flush combination
  303. // is also atomic.
  304. b.enableProxyQualityMutex.Lock()
  305. wasEnabled := b.enableProxyQuality.Swap(enable)
  306. if wasEnabled && !enable {
  307. // Flush quality state, since otherwise the quality TTL can retain
  308. // quality data which may be unexpectedly reactivated when enable is
  309. // toggled on again.
  310. b.proxyQualityState.Flush()
  311. }
  312. b.enableProxyQualityMutex.Unlock()
  313. b.proxyQualityState.SetParameters(
  314. qualityTTL,
  315. pendingFailedMatchDeadline,
  316. failedMatchThreshold)
  317. }
  318. // HandleSessionPacket handles a session packet from a client or proxy and
  319. // provides a response packet. The packet is part of a secure session and may
  320. // be a session handshake message, an expired session reset token, or a
  321. // session-wrapped request payload. Request payloads are routed to API
  322. // request endpoints.
  323. //
  324. // The caller is expected to provide a transport obfuscation layer, such as
  325. // domain fronted HTTPs. The session has an obfuscation layer that ensures
  326. // that packets are fully random, randomly padded, and cannot be replayed.
  327. // This makes session packets suitable to embed as plaintext in some
  328. // transports.
  329. //
  330. // The caller is responsible for rate limiting and enforcing timeouts and
  331. // maximum payload size checks.
  332. //
  333. // Secure sessions support multiplexing concurrent requests, as long as the
  334. // provided transport, for example HTTP/2, supports this as well.
  335. //
  336. // The input ctx should be canceled if the client/proxy disconnects from the
  337. // transport while HandleSessionPacket is running, since long-polling proxy
  338. // announcement requests will otherwise remain blocked until eventual
  339. // timeout; net/http does this.
  340. //
  341. // When HandleSessionPacket returns an error, the transport provider should
  342. // apply anti-probing mechanisms, as the client/proxy may be a prober or
  343. // scanner.
  344. func (b *Broker) HandleSessionPacket(
  345. ctx context.Context,
  346. extendTransportTimeout ExtendTransportTimeout,
  347. transportLogFields common.LogFields,
  348. brokerClientIP string,
  349. geoIPData common.GeoIPData,
  350. inPacket []byte) ([]byte, error) {
  351. // handleUnwrappedRequest handles requests after session unwrapping.
  352. // responderSessions.HandlePacket handles both session establishment and
  353. // request unwrapping, and invokes handleUnwrappedRequest once a session
  354. // is established and a valid request unwrapped.
  355. handleUnwrappedRequest := func(initiatorID ID, unwrappedRequestPayload []byte) ([]byte, error) {
  356. recordType, err := peekRecordPreambleType(unwrappedRequestPayload)
  357. if err != nil {
  358. return nil, errors.Trace(err)
  359. }
  360. var responsePayload []byte
  361. switch recordType {
  362. case recordTypeAPIProxyAnnounceRequest:
  363. responsePayload, err = b.handleProxyAnnounce(
  364. ctx,
  365. extendTransportTimeout,
  366. transportLogFields,
  367. brokerClientIP,
  368. geoIPData,
  369. initiatorID,
  370. unwrappedRequestPayload)
  371. if err != nil {
  372. return nil, errors.Trace(err)
  373. }
  374. case recordTypeAPIProxyAnswerRequest:
  375. responsePayload, err = b.handleProxyAnswer(
  376. ctx,
  377. extendTransportTimeout,
  378. transportLogFields,
  379. brokerClientIP,
  380. geoIPData,
  381. initiatorID,
  382. unwrappedRequestPayload)
  383. if err != nil {
  384. return nil, errors.Trace(err)
  385. }
  386. case recordTypeAPIClientOfferRequest:
  387. responsePayload, err = b.handleClientOffer(
  388. ctx,
  389. extendTransportTimeout,
  390. transportLogFields,
  391. brokerClientIP,
  392. geoIPData,
  393. initiatorID,
  394. unwrappedRequestPayload)
  395. if err != nil {
  396. return nil, errors.Trace(err)
  397. }
  398. case recordTypeAPIServerProxyQualityRequest:
  399. responsePayload, err = b.handleServerProxyQuality(
  400. ctx,
  401. extendTransportTimeout,
  402. transportLogFields,
  403. brokerClientIP,
  404. geoIPData,
  405. initiatorID,
  406. unwrappedRequestPayload)
  407. if err != nil {
  408. return nil, errors.Trace(err)
  409. }
  410. case recordTypeAPIClientRelayedPacketRequest:
  411. responsePayload, err = b.handleClientRelayedPacket(
  412. ctx,
  413. extendTransportTimeout,
  414. transportLogFields,
  415. geoIPData,
  416. initiatorID,
  417. unwrappedRequestPayload)
  418. if err != nil {
  419. return nil, errors.Trace(err)
  420. }
  421. default:
  422. return nil, errors.Tracef("unexpected API record type %v", recordType)
  423. }
  424. return responsePayload, nil
  425. }
  426. // HandlePacket returns both a packet and an error in the expired session
  427. // reset token case. Log the error here, clear it, and return the
  428. // packet to be relayed back to the broker client.
  429. outPacket, err := b.responderSessions.HandlePacket(
  430. inPacket, handleUnwrappedRequest)
  431. if err != nil {
  432. if outPacket == nil {
  433. return nil, errors.Trace(err)
  434. }
  435. b.config.Logger.WithTraceFields(common.LogFields{"error": err}).Warning(
  436. "HandlePacket returned packet and error")
  437. }
  438. return outPacket, nil
  439. }
  440. // handleProxyAnnounce receives a proxy announcement, awaits a matching
  441. // client, and returns the client offer in the response. handleProxyAnnounce
  442. // has a long timeout so this request can idle until a matching client
  443. // arrives.
  444. func (b *Broker) handleProxyAnnounce(
  445. ctx context.Context,
  446. extendTransportTimeout ExtendTransportTimeout,
  447. transportLogFields common.LogFields,
  448. proxyIP string,
  449. geoIPData common.GeoIPData,
  450. initiatorID ID,
  451. requestPayload []byte) (retResponse []byte, retErr error) {
  452. startTime := time.Now()
  453. var logFields common.LogFields
  454. var isPriority bool
  455. var newTacticsTag string
  456. var clientOffer *MatchOffer
  457. var matchMetrics *MatchMetrics
  458. var timedOut bool
  459. var limitedErr error
  460. // As a future enhancement, a broker could initiate its own test
  461. // connection to the proxy to verify its effectiveness, including
  462. // simulating a symmetric NAT client.
  463. // Each announcement represents availability for a single client matching.
  464. // Proxies with multiple client availability will send multiple requests.
  465. //
  466. // The announcement request and response could be extended to allow the
  467. // proxy to specify availability for multiple clients in the request, and
  468. // multiple client offers returned in the response.
  469. //
  470. // If, as we expect, proxies run on home ISPs have limited upstream
  471. // bandwidth, they will support only a couple of concurrent clients, and
  472. // the simple single-client-announcment model may be sufficient. Also, if
  473. // the transport is HTTP/2, multiple requests can be multiplexed over a
  474. // single connection (and session) in any case.
  475. // The proxy ID is an implicit parameter: it's the proxy's session public
  476. // key. As part of the session handshake, the proxy has proven that it
  477. // has the corresponding private key. Proxy IDs are logged to attribute
  478. // traffic to a specific proxy.
  479. proxyID := initiatorID
  480. // Generate a connection ID. This ID is used to associate proxy
  481. // announcments, client offers, and proxy answers, as well as associating
  482. // Psiphon tunnels with in-proxy pairings.
  483. connectionID, err := MakeID()
  484. if err != nil {
  485. return nil, errors.Trace(err)
  486. }
  487. // Always log the outcome.
  488. defer func() {
  489. if logFields == nil {
  490. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  491. }
  492. logFields["broker_event"] = "proxy-announce"
  493. logFields["broker_id"] = b.brokerID
  494. logFields["proxy_id"] = proxyID
  495. logFields["is_priority"] = isPriority
  496. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  497. logFields["connection_id"] = connectionID
  498. if newTacticsTag != "" {
  499. logFields["new_tactics_tag"] = newTacticsTag
  500. }
  501. if clientOffer != nil {
  502. // Log the target Psiphon server ID (diagnostic ID). The presence
  503. // of this field indicates that a match was made.
  504. logFields["destination_server_id"] = clientOffer.DestinationServerID
  505. logFields["use_media_streams"] = clientOffer.UseMediaStreams
  506. }
  507. if timedOut {
  508. logFields["timed_out"] = true
  509. }
  510. if retErr != nil {
  511. logFields["error"] = retErr.Error()
  512. } else if limitedErr != nil {
  513. logFields["error"] = limitedErr.Error()
  514. }
  515. logFields.Add(transportLogFields)
  516. logFields.Add(matchMetrics.GetMetrics())
  517. b.config.Logger.LogMetric(brokerMetricName, logFields)
  518. }()
  519. announceRequest, err := UnmarshalProxyAnnounceRequest(requestPayload)
  520. if err != nil {
  521. return nil, errors.Trace(err)
  522. }
  523. var apiParams common.APIParameters
  524. apiParams, logFields, err = announceRequest.ValidateAndGetParametersAndLogFields(
  525. int(atomic.LoadInt64(&b.maxCompartmentIDs)),
  526. b.config.APIParameterValidator,
  527. b.config.APIParameterLogFieldFormatter,
  528. geoIPData)
  529. if err != nil {
  530. return nil, errors.Trace(err)
  531. }
  532. hasPersonalCompartmentIDs := len(announceRequest.PersonalCompartmentIDs) > 0
  533. // Return MustUpgrade when the proxy's protocol version is less than the
  534. // minimum required.
  535. if announceRequest.Metrics.ProtocolVersion < minimumProxyProtocolVersion {
  536. responsePayload, err := MarshalProxyAnnounceResponse(
  537. &ProxyAnnounceResponse{
  538. NoMatch: true,
  539. MustUpgrade: true,
  540. })
  541. if err != nil {
  542. return nil, errors.Trace(err)
  543. }
  544. return responsePayload, nil
  545. }
  546. // Fetch new tactics for the proxy, if required, using the tactics tag
  547. // that should be included with the API parameters. A tacticsPayload may
  548. // be returned when there are no new tactics, and this is relayed back to
  549. // the proxy, after matching, so that it can extend the TTL for its
  550. // existing, cached tactics. In the case where tactics have changed,
  551. // don't enqueue the proxy announcement and return no-match so that the
  552. // proxy can store and apply the new tactics before announcing again.
  553. var tacticsPayload []byte
  554. if announceRequest.CheckTactics {
  555. tacticsPayload, newTacticsTag, err =
  556. b.config.GetTacticsPayload(geoIPData, apiParams)
  557. if err != nil {
  558. return nil, errors.Trace(err)
  559. }
  560. if tacticsPayload != nil && newTacticsTag != "" {
  561. responsePayload, err := MarshalProxyAnnounceResponse(
  562. &ProxyAnnounceResponse{
  563. TacticsPayload: tacticsPayload,
  564. NoMatch: true,
  565. })
  566. if err != nil {
  567. return nil, errors.Trace(err)
  568. }
  569. return responsePayload, nil
  570. }
  571. }
  572. // AllowProxy may be used to disallow proxies from certain geolocations,
  573. // such as censored locations, from announcing. Proxies with personal
  574. // compartment IDs are always allowed, as they will be used only by
  575. // clients specifically configured to use them.
  576. if !hasPersonalCompartmentIDs &&
  577. !b.config.AllowProxy(geoIPData) {
  578. return nil, errors.TraceNew("proxy disallowed")
  579. }
  580. // Assign this proxy to a common compartment ID, unless it has specified a
  581. // dedicated, personal compartment ID. Assignment uses consistent hashing
  582. // keyed with the proxy ID, in an effort to keep proxies consistently
  583. // assigned to the same compartment.
  584. var commonCompartmentIDs []ID
  585. if !hasPersonalCompartmentIDs {
  586. compartmentID, err := b.selectCommonCompartmentID(proxyID)
  587. if err != nil {
  588. return nil, errors.Trace(err)
  589. }
  590. commonCompartmentIDs = []ID{compartmentID}
  591. }
  592. // Determine whether to enqueue the proxy announcement in the priority
  593. // queue. To be prioritized, a proxy, identified by its ID and ASN, must
  594. // have a recent quality tunnel recorded in the quality state. In
  595. // addition, when the PrioritizeProxy callback is set, invoke this
  596. // additional condition, which can filter by proxy geolocation and other
  597. // properties.
  598. //
  599. // There is no prioritization for personal pairing announcements.
  600. // Potential future enhancements:
  601. //
  602. // - For a proxy with unknown quality (neither reported quality tunnels,
  603. // nor known failed matches), prioritize with some low probability to
  604. // give unknown proxies a chance to qualify? This could be limited, for
  605. // example, to proxies in the same ASN as other quality proxies. To
  606. // implement this, ProxyQualityState would need to record proxy IDs
  607. // with failed matches; and proxy ASNs would need to be input to
  608. // ProxyQualityState.
  609. //
  610. // - Consider using the Psiphon server region, as given in the signed
  611. // server entry, as part of the prioritization logic.
  612. if !hasPersonalCompartmentIDs && b.enableProxyQuality.Load() {
  613. // Here, no specific client ASN is specified for HasQuality. As long
  614. // as a proxy has a quality tunnel for any client ASN, it is
  615. // prioritized. In the matching process, an attempt is made to match
  616. // using HasQuality using the client ASN. See Matcher.matchOffer.
  617. isPriority = b.proxyQualityState.HasQuality(proxyID, geoIPData.ASN, "")
  618. }
  619. if isPriority && b.config.PrioritizeProxy != nil {
  620. // Note that, in the psiphon/server package, inproxyBrokerPrioritizeProxy
  621. // is always wired up, and, as currently implemented, the default value for
  622. // the InproxyBrokerMatcherPrioritizeProxiesFilter tactics parameter
  623. // results in PrioritizeProxy always returning false. Some filter,
  624. // even just a wildcard match, must be configured in order to prioritize.
  625. // Limitation: Of the two return values from
  626. // ValidateAndGetParametersAndLogFields, apiParams and logFields,
  627. // only logFields contains fields such as max_clients
  628. // and *_bytes_per_second, and so these cannot be part of any
  629. // filtering performed by the PrioritizeProxy callback.
  630. //
  631. // TODO: include the additional fields in logFields. Since the
  632. // logFields return value is the output of server.getRequestLogFields
  633. // processing, it's not safe to use it directly. In addition,
  634. // filtering by fields such as max_clients and *_bytes_per_second
  635. // calls for range filtering, which is not yet supported in the
  636. // psiphon/server.MeekServer PrioritizeProxy provider.
  637. isPriority = b.config.PrioritizeProxy(
  638. int(announceRequest.Metrics.ProtocolVersion), geoIPData, apiParams)
  639. }
  640. // Await client offer.
  641. timeout := common.ValueOrDefault(
  642. time.Duration(atomic.LoadInt64(&b.proxyAnnounceTimeout)),
  643. brokerProxyAnnounceTimeout)
  644. // Adjust the timeout to respect any shorter maximum request timeouts for
  645. // the fronting provider.
  646. timeout = b.adjustRequestTimeout(logFields, timeout)
  647. announceCtx, cancelFunc := context.WithTimeout(ctx, timeout)
  648. defer cancelFunc()
  649. extendTransportTimeout(timeout)
  650. // Note that matcher.Announce assumes a monotonically increasing
  651. // announceCtx.Deadline input for each successive call.
  652. clientOffer, matchMetrics, err = b.matcher.Announce(
  653. announceCtx,
  654. proxyIP,
  655. &MatchAnnouncement{
  656. Properties: MatchProperties{
  657. IsPriority: isPriority,
  658. ProtocolVersion: announceRequest.Metrics.ProtocolVersion,
  659. CommonCompartmentIDs: commonCompartmentIDs,
  660. PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
  661. GeoIPData: geoIPData,
  662. NetworkType: GetNetworkType(announceRequest.Metrics.BaseAPIParameters),
  663. NATType: announceRequest.Metrics.NATType,
  664. PortMappingTypes: announceRequest.Metrics.PortMappingTypes,
  665. },
  666. ProxyID: initiatorID,
  667. ProxyMetrics: announceRequest.Metrics,
  668. ConnectionID: connectionID,
  669. })
  670. if err != nil {
  671. var limitError *MatcherLimitError
  672. limited := std_errors.As(err, &limitError)
  673. timeout := announceCtx.Err() == context.DeadlineExceeded
  674. if !limited && !timeout {
  675. return nil, errors.Trace(err)
  676. }
  677. // A no-match response is sent in the case of a timeout awaiting a
  678. // match. The faster-failing rate or entry limiting case also results
  679. // in a response, rather than an error return from handleProxyAnnounce,
  680. // so that the proxy doesn't receive a 404 and flag its BrokerClient as
  681. // having failed.
  682. //
  683. // When the timeout and limit case coincide, limit takes precedence in
  684. // the response.
  685. if timeout && !limited {
  686. // Note: the respective proxy and broker timeouts,
  687. // InproxyBrokerProxyAnnounceTimeout and
  688. // InproxyProxyAnnounceRequestTimeout in tactics, should be
  689. // configured so that the broker will timeout first and have an
  690. // opportunity to send this response before the proxy times out.
  691. timedOut = true
  692. } else {
  693. // Record the specific limit error in the proxy-announce broker event.
  694. limitedErr = err
  695. }
  696. responsePayload, err := MarshalProxyAnnounceResponse(
  697. &ProxyAnnounceResponse{
  698. TacticsPayload: tacticsPayload,
  699. Limited: limited,
  700. NoMatch: timeout && !limited,
  701. })
  702. if err != nil {
  703. return nil, errors.Trace(err)
  704. }
  705. return responsePayload, nil
  706. }
  707. // Select the protocol version. The matcher has already checked
  708. // negotiateProtocolVersion, so failure is not expected.
  709. negotiatedProtocolVersion, ok := negotiateProtocolVersion(
  710. announceRequest.Metrics.ProtocolVersion,
  711. clientOffer.Properties.ProtocolVersion,
  712. clientOffer.UseMediaStreams)
  713. if !ok {
  714. return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
  715. }
  716. // Respond with the client offer. The proxy will follow up with an answer
  717. // request, which is relayed to the client, and then the WebRTC dial begins.
  718. // Limitation: as part of the client's tunnel establishment horse race, a
  719. // client may abort an in-proxy dial at any point. If the overall dial is
  720. // past the SDP exchange and aborted during the WebRTC connection
  721. // establishment, the client may leave the proxy's Proxy.proxyOneClient
  722. // dangling until timeout. Consider adding a signal from the client to
  723. // the proxy, relayed by the broker, that a dial is aborted.
  724. responsePayload, err := MarshalProxyAnnounceResponse(
  725. &ProxyAnnounceResponse{
  726. TacticsPayload: tacticsPayload,
  727. ConnectionID: connectionID,
  728. SelectedProtocolVersion: negotiatedProtocolVersion,
  729. ClientOfferSDP: clientOffer.ClientOfferSDP,
  730. ClientRootObfuscationSecret: clientOffer.ClientRootObfuscationSecret,
  731. DoDTLSRandomization: clientOffer.DoDTLSRandomization,
  732. UseMediaStreams: clientOffer.UseMediaStreams,
  733. TrafficShapingParameters: clientOffer.TrafficShapingParameters,
  734. NetworkProtocol: clientOffer.NetworkProtocol,
  735. DestinationAddress: clientOffer.DestinationAddress,
  736. })
  737. if err != nil {
  738. return nil, errors.Trace(err)
  739. }
  740. // Set the "failed match" trigger, which will progress towards clearing
  741. // the quality state for this proxyID unless quality tunnels are reported
  742. // soon enough after matches. This includes failure, by the proxy, to
	// return a proxy answer, as well as any tunnel failures after that.
  744. //
  745. // Failures are expected even for good quality proxies, due to cases such
  746. // as the in-proxy protocol losing the client tunnel establishment horse
  747. // race. There is a threshold number of failed matches that must be
  748. // reached before a quality state is cleared.
  749. b.proxyQualityState.Matched(proxyID, geoIPData.ASN)
  750. return responsePayload, nil
  751. }
// handleClientOffer receives a client offer, awaits a matching proxy, and
// returns the proxy answer. handleClientOffer has a shorter timeout than
// handleProxyAnnounce since the client has supplied an SDP with STUN hole
// punches which will expire; and, in general, the client is trying to
// connect immediately and is also trying other candidates.
//
// On success, the returned payload is a marshaled ClientOfferResponse
// carrying the proxy answer SDP, the negotiated protocol version, and the
// initial broker/server relay packet.
//
// Several non-success outcomes are reported to the client as a response
// rather than as an error return: a destination that fails validation
// (Limited), a client protocol version below the minimum
// (NoMatch + MustUpgrade), a timeout awaiting a match (NoMatch), and any
// other matcher.Offer failure (Limited). An error is returned for request
// unmarshal/validation failures, a disallowed client, and failures in
// marshaling, relay initiation, or protocol version negotiation.
//
// A "client-offer" broker event is always logged via the deferred function,
// regardless of outcome. The initiatorID parameter is not referenced in the
// body of this handler.
func (b *Broker) handleClientOffer(
	ctx context.Context,
	extendTransportTimeout ExtendTransportTimeout,
	transportLogFields common.LogFields,
	clientIP string,
	geoIPData common.GeoIPData,
	initiatorID ID,
	requestPayload []byte) (retResponse []byte, retErr error) {

	// As a future enhancement, consider having proxies send offer SDPs with
	// announcements and clients long poll to await a match and then provide
	// an answer. This order of operations would make sense if client demand
	// is high and proxy supply is lower.
	//
	// Also see comment in Proxy.proxyOneClient for other alternative
	// approaches.

	// The client's session public key is ephemeral and is not logged.

	startTime := time.Now()

	// The following variables are captured by the deferred logging function
	// below and are populated as the handler progresses; whatever has been
	// set by the time the handler returns determines which fields are
	// logged.
	var logFields common.LogFields
	// Note: from this point on, the identifier serverParams refers to this
	// local variable, which shadows the type of the same name.
	var serverParams *serverParams
	var clientMatchOffer *MatchOffer
	var proxyMatchAnnouncement *MatchAnnouncement
	var proxyAnswer *MatchAnswer
	var matchMetrics *MatchMetrics
	var timedOut bool
	var limitedErr error

	// Always log the outcome.
	defer func() {
		// logFields is nil when the handler returned before request
		// validation produced log fields; in that case, start from the
		// formatter's output for the GeoIP data alone.
		if logFields == nil {
			logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
		}
		logFields["broker_event"] = "client-offer"
		logFields["broker_id"] = b.brokerID
		if serverParams != nil {
			logFields["destination_server_id"] = serverParams.serverID
		}
		logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
		if proxyAnswer != nil {

			// The presence of these fields indicate that a match was made,
			// the proxy delivered an answer, and the client was still
			// waiting for it.

			logFields["connection_id"] = proxyAnswer.ConnectionID
			logFields["client_nat_type"] = clientMatchOffer.Properties.NATType
			logFields["client_port_mapping_types"] = clientMatchOffer.Properties.PortMappingTypes
			logFields["proxy_nat_type"] = proxyMatchAnnouncement.Properties.NATType
			logFields["proxy_port_mapping_types"] = proxyMatchAnnouncement.Properties.PortMappingTypes
			logFields["preferred_nat_match"] =
				clientMatchOffer.Properties.IsPreferredNATMatch(&proxyMatchAnnouncement.Properties)

			// TODO: also log proxy ice_candidate_types and has_IPv6; for the
			// client, these values are added by ValidateAndGetLogFields.
		}
		if timedOut {
			logFields["timed_out"] = true
		}
		// retErr is the named result, so this reflects the error actually
		// returned to the caller. When a response was returned instead of an
		// error, any recorded limitedErr is logged in its place.
		if retErr != nil {
			logFields["error"] = retErr.Error()
		} else if limitedErr != nil {
			logFields["error"] = limitedErr.Error()
		}
		logFields.Add(transportLogFields)
		// NOTE(review): matchMetrics is nil on every path that returns
		// before matcher.Offer is called; presumably GetMetrics tolerates a
		// nil receiver -- confirm.
		logFields.Add(matchMetrics.GetMetrics())
		b.config.Logger.LogMetric(brokerMetricName, logFields)
	}()

	offerRequest, err := UnmarshalClientOfferRequest(requestPayload)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// The filtered SDP is the request SDP with any invalid (bogon, unexpected
	// GeoIP) ICE candidates filtered out. In some cases, clients cannot
	// avoid submitting invalid candidates (see comment in
	// processSDPAddresses), so all invalid candidates are removed and the
	// remaining SDP is used. Filtered candidate information is logged in
	// logFields.
	//
	// In personal pairing mode, RFC 1918/4193 private IP addresses are
	// permitted in exchanged SDPs and not filtered out.

	var filteredSDP []byte
	filteredSDP, logFields, err = offerRequest.ValidateAndGetLogFields(
		int(atomic.LoadInt64(&b.maxCompartmentIDs)),
		b.config.LookupGeoIP,
		b.config.APIParameterValidator,
		b.config.APIParameterLogFieldFormatter,
		geoIPData)
	if err != nil {
		return nil, errors.Trace(err)
	}

	hasPersonalCompartmentIDs := len(offerRequest.PersonalCompartmentIDs) > 0

	// Substitute the filtered SDP into the session description that will be
	// passed on to the matcher.
	offerSDP := offerRequest.ClientOfferSDP
	offerSDP.SDP = string(filteredSDP)

	// AllowClient may be used to disallow clients from certain geolocations
	// from offering. Clients are always allowed to match proxies with shared
	// personal compartment IDs.

	if !hasPersonalCompartmentIDs &&
		!b.config.AllowClient(geoIPData) {

		return nil, errors.TraceNew("client disallowed")
	}

	// Validate that the proxy destination specified by the client is a valid
	// dial address for a signed Psiphon server entry. This ensures a client
	// can't misuse a proxy to connect to arbitrary destinations.

	serverParams, err = b.validateDestination(
		geoIPData,
		offerRequest.PackedDestinationServerEntry,
		offerRequest.NetworkProtocol,
		offerRequest.DestinationAddress)
	if err != nil {

		// Record the specific error in the client-offer broker event.
		limitedErr = err

		// Return a response. This avoids returning a broker-client-resetting
		// 404 in cases including "invalid server entry tag", where a
		// legitimate client submits an unpruned, decommissioned server entry.

		responsePayload, err := MarshalClientOfferResponse(
			&ClientOfferResponse{Limited: true})
		if err != nil {
			return nil, errors.Trace(err)
		}

		return responsePayload, nil
	}

	// Return MustUpgrade when the client's protocol version is less than the
	// minimum required.

	if offerRequest.Metrics.ProtocolVersion < minimumClientProtocolVersion {
		responsePayload, err := MarshalClientOfferResponse(
			&ClientOfferResponse{
				NoMatch:     true,
				MustUpgrade: true,
			})
		if err != nil {
			return nil, errors.Trace(err)
		}

		return responsePayload, nil
	}

	// Enqueue the client offer and await a proxy matching and subsequent
	// proxy answer.

	// The Client Offer timeout may be configured with a shorter value in
	// personal pairing mode, to facilitate a faster no-match result and
	// resulting broker rotation.
	var timeout time.Duration
	if hasPersonalCompartmentIDs {
		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferPersonalTimeout))
	} else {
		timeout = time.Duration(atomic.LoadInt64(&b.clientOfferTimeout))
	}
	timeout = common.ValueOrDefault(timeout, brokerClientOfferTimeout)

	// Adjust the timeout to respect any shorter maximum request timeouts for
	// the fronting provider.
	timeout = b.adjustRequestTimeout(logFields, timeout)

	offerCtx, cancelFunc := context.WithTimeout(ctx, timeout)
	defer cancelFunc()
	extendTransportTimeout(timeout)

	clientMatchOffer = &MatchOffer{
		Properties: MatchProperties{
			ProtocolVersion:        offerRequest.Metrics.ProtocolVersion,
			CommonCompartmentIDs:   offerRequest.CommonCompartmentIDs,
			PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
			GeoIPData:              geoIPData,
			NetworkType:            GetNetworkType(offerRequest.Metrics.BaseAPIParameters),
			NATType:                offerRequest.Metrics.NATType,
			PortMappingTypes:       offerRequest.Metrics.PortMappingTypes,
		},
		ClientOfferSDP:              offerSDP,
		ClientRootObfuscationSecret: offerRequest.ClientRootObfuscationSecret,
		DoDTLSRandomization:         offerRequest.DoDTLSRandomization,
		UseMediaStreams:             offerRequest.UseMediaStreams,
		TrafficShapingParameters:    offerRequest.TrafficShapingParameters,
		NetworkProtocol:             offerRequest.NetworkProtocol,
		DestinationAddress:          offerRequest.DestinationAddress,
		DestinationServerID:         serverParams.serverID,
	}

	proxyAnswer, proxyMatchAnnouncement, matchMetrics, err = b.matcher.Offer(
		offerCtx,
		clientIP,
		clientMatchOffer)
	if err != nil {

		var limitError *MatcherLimitError
		limited := std_errors.As(err, &limitError)

		// Within this block, the boolean timeout shadows the outer timeout
		// duration.
		timeout := offerCtx.Err() == context.DeadlineExceeded

		// A no-match response is sent in the case of a timeout awaiting a
		// match. The faster-failing rate or entry limiting case also results
		// in a response, rather than an error return from handleClientOffer,
		// so that the client doesn't receive a 404 and flag its BrokerClient
		// as having failed.
		//
		// When the timeout and limit cases coincide, limit takes precedence in
		// the response.

		if timeout {

			// Record the time out outcome in the client-offer broker event.
			//
			// Note: the respective client and broker timeouts,
			// InproxyBrokerClientOfferTimeout and
			// InproxyClientOfferRequestTimeout in tactics, should be configured
			// so that the broker will timeout first and have an opportunity to
			// send this response before the client times out.

			timedOut = true
		}

		if limited {

			// Record the specific limit error in the client-offer broker event.

			limitedErr = err
		}

		if !limited && !timeout {

			// If matcher.Offer failed for some other reason, default to
			// returning Limited in the response. As currently implemented,
			// when clients receive the Limited flag, they will fail the
			// in-proxy dial without retrying, but retain their broker client.
			//
			// While the matcher.Offer failure scenarios may include an
			// internal error, they also include the "no answer" case where a
			// proxy fails to produce an answer. For the "no answer" case,
			// Limited is preferred over returning NoMatch, which can trigger
			// a broker client cycle.
			//
			// TODO: add a new flag to signal to the client that it may retry
			// in the "no answer" case.

			limited = true
			limitedErr = err
		}

		// At this point at least one of limited and timeout is true, so
		// NoMatch is set only for a timeout that did not coincide with a
		// limit.
		responsePayload, err := MarshalClientOfferResponse(
			&ClientOfferResponse{
				Limited: limited,
				NoMatch: timeout && !limited,
			})
		if err != nil {
			return nil, errors.Trace(err)
		}

		return responsePayload, nil
	}

	// Log the type of compartment matching that occurred. As
	// PersonalCompartmentIDs are user-generated and shared, actual matching
	// values are not logged as they may link users.

	// TODO: log matching common compartment IDs?

	matchedCommonCompartments := HaveCommonIDs(
		proxyMatchAnnouncement.Properties.CommonCompartmentIDs,
		clientMatchOffer.Properties.CommonCompartmentIDs)

	matchedPersonalCompartments := HaveCommonIDs(
		proxyMatchAnnouncement.Properties.PersonalCompartmentIDs,
		clientMatchOffer.Properties.PersonalCompartmentIDs)

	// Initiate a BrokerServerReport, which sends important information
	// about the connection, including the original client IP, plus other
	// values to be logged with server_tunnel, to the server. The report is
	// sent through a secure session established between the broker and the
	// server, relayed by the client.
	//
	// The first relay message will be embedded in the Psiphon handshake. The
	// broker may already have an established session with the server. In
	// this case, only that initial message is required. The
	// BrokerServerReport is a one-way message, which avoids extra untunneled
	// client/broker traffic.
	//
	// Limitations, due to the one-way message:
	// - the broker can't actively clean up pendingServerReports as
	//   tunnels are established and must rely on cache expiry.
	// - the broker doesn't learn that the server accepted the report, and
	//   so cannot log a final connection status or signal the proxy to
	//   disconnect the client in any misuse cases.
	//
	// As a future enhancement, consider adding a _tunneled_ client relay
	// of a server response acknowledging the broker report.

	relayPacket, err := b.initiateRelayedServerReport(
		serverParams,
		proxyAnswer.ConnectionID,
		&BrokerServerReport{
			ProxyID:                     proxyAnswer.ProxyID,
			ConnectionID:                proxyAnswer.ConnectionID,
			MatchedCommonCompartments:   matchedCommonCompartments,
			MatchedPersonalCompartments: matchedPersonalCompartments,
			ClientNATType:               clientMatchOffer.Properties.NATType,
			ClientPortMappingTypes:      clientMatchOffer.Properties.PortMappingTypes,
			ClientIP:                    clientIP,
			ProxyIP:                     proxyAnswer.ProxyIP,
			// ProxyMetrics includes proxy NAT and port mapping types.
			ProxyMetrics:    proxyMatchAnnouncement.ProxyMetrics,
			ProxyIsPriority: proxyMatchAnnouncement.Properties.IsPriority,
		})
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Select the protocol version. The matcher has already checked
	// negotiateProtocolVersion, so failure is not expected.

	negotiatedProtocolVersion, ok := negotiateProtocolVersion(
		proxyMatchAnnouncement.Properties.ProtocolVersion,
		offerRequest.Metrics.ProtocolVersion,
		offerRequest.UseMediaStreams)
	if !ok {
		return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
	}

	// Respond with the proxy answer and initial broker/server session packet.

	responsePayload, err := MarshalClientOfferResponse(
		&ClientOfferResponse{
			ConnectionID:            proxyAnswer.ConnectionID,
			SelectedProtocolVersion: negotiatedProtocolVersion,
			ProxyAnswerSDP:          proxyAnswer.ProxyAnswerSDP,
			RelayPacketToServer:     relayPacket,
		})
	if err != nil {
		return nil, errors.Trace(err)
	}

	return responsePayload, nil
}
  1052. // handleProxyAnswer receives a proxy answer and delivers it to the waiting
  1053. // client.
  1054. func (b *Broker) handleProxyAnswer(
  1055. ctx context.Context,
  1056. extendTransportTimeout ExtendTransportTimeout,
  1057. transportLogFields common.LogFields,
  1058. proxyIP string,
  1059. geoIPData common.GeoIPData,
  1060. initiatorID ID,
  1061. requestPayload []byte) (retResponse []byte, retErr error) {
  1062. startTime := time.Now()
  1063. var logFields common.LogFields
  1064. var proxyAnswer *MatchAnswer
  1065. var answerError string
  1066. // The proxy ID is an implicit parameter: it's the proxy's session public
  1067. // key.
  1068. proxyID := initiatorID
  1069. // Always log the outcome.
  1070. defer func() {
  1071. if logFields == nil {
  1072. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1073. }
  1074. logFields["broker_event"] = "proxy-answer"
  1075. logFields["broker_id"] = b.brokerID
  1076. logFields["proxy_id"] = proxyID
  1077. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1078. if proxyAnswer != nil {
  1079. logFields["connection_id"] = proxyAnswer.ConnectionID
  1080. }
  1081. if answerError != "" {
  1082. // This is a proxy-reported error that occurred while creating the answer.
  1083. logFields["answer_error"] = answerError
  1084. }
  1085. if retErr != nil {
  1086. logFields["error"] = retErr.Error()
  1087. }
  1088. logFields.Add(transportLogFields)
  1089. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1090. }()
  1091. answerRequest, err := UnmarshalProxyAnswerRequest(requestPayload)
  1092. if err != nil {
  1093. return nil, errors.Trace(err)
  1094. }
  1095. // The filtered SDP is the request SDP with any invalid (bogon, unexpected
  1096. // GeoIP) ICE candidates filtered out. In some cases, proxies cannot
  1097. // avoid submitting invalid candidates (see comment in
  1098. // processSDPAddresses), so all invalid candidates are removed and the
  1099. // remaining SDP is used. Filtered candidate information is logged in
  1100. // logFields.
  1101. //
  1102. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  1103. // permitted in exchanged SDPs and not filtered out.
  1104. hasPersonalCompartmentIDs, err := b.matcher.AnnouncementHasPersonalCompartmentIDs(
  1105. initiatorID, answerRequest.ConnectionID)
  1106. if err != nil {
  1107. return nil, errors.Trace(err)
  1108. }
  1109. var filteredSDP []byte
  1110. filteredSDP, logFields, err = answerRequest.ValidateAndGetLogFields(
  1111. b.config.LookupGeoIP,
  1112. b.config.APIParameterValidator,
  1113. b.config.APIParameterLogFieldFormatter,
  1114. geoIPData,
  1115. hasPersonalCompartmentIDs)
  1116. if err != nil {
  1117. return nil, errors.Trace(err)
  1118. }
  1119. answerSDP := answerRequest.ProxyAnswerSDP
  1120. answerSDP.SDP = string(filteredSDP)
  1121. if answerRequest.AnswerError != "" {
  1122. // The proxy failed to create an answer.
  1123. answerError = answerRequest.AnswerError
  1124. b.matcher.AnswerError(initiatorID, answerRequest.ConnectionID)
  1125. } else {
  1126. // Deliver the answer to the client.
  1127. // Note that neither ProxyID nor ProxyIP is returned to the client.
  1128. // These fields are used internally in the matcher.
  1129. proxyAnswer = &MatchAnswer{
  1130. ProxyIP: proxyIP,
  1131. ProxyID: initiatorID,
  1132. ConnectionID: answerRequest.ConnectionID,
  1133. ProxyAnswerSDP: answerSDP,
  1134. }
  1135. err = b.matcher.Answer(proxyAnswer)
  1136. if err != nil {
  1137. return nil, errors.Trace(err)
  1138. }
  1139. }
  1140. // There is no data in this response, it's simply an acknowledgement that
  1141. // the answer was received. Upon receiving the response, the proxy should
  1142. // begin the WebRTC dial operation.
  1143. responsePayload, err := MarshalProxyAnswerResponse(
  1144. &ProxyAnswerResponse{})
  1145. if err != nil {
  1146. return nil, errors.Trace(err)
  1147. }
  1148. return responsePayload, nil
  1149. }
  1150. // handleServerProxyQuality receives, from servers, proxy tunnel quality and
  1151. // records that in the proxy quality state that is used to prioritize
  1152. // well-performing proxies.
  1153. func (b *Broker) handleServerProxyQuality(
  1154. ctx context.Context,
  1155. extendTransportTimeout ExtendTransportTimeout,
  1156. transportLogFields common.LogFields,
  1157. proxyIP string,
  1158. geoIPData common.GeoIPData,
  1159. initiatorID ID,
  1160. requestPayload []byte) (retResponse []byte, retErr error) {
  1161. startTime := time.Now()
  1162. var logFields common.LogFields
  1163. // Only known, trusted Psiphon server initiators are allowed to send proxy
  1164. // quality requests. knownServerInitiatorIDs is populated with the
  1165. // Curve25519 public keys -- initiator IDs -- corresponding to the
  1166. // session public keys found in signed Psiphon server entries.
  1167. //
  1168. // Currently, knownServerInitiatorIDs is populated with destination server
  1169. // entries received in client offers, so the broker must first receive a
  1170. // client offer before a given server is trusted, which means
  1171. // that "invalid initiator" errors may occur, and some quality requests
  1172. // may be dropped, in some expected situations, including a broker restart.
  1173. // serverID is the server entry diagnostic ID of the server.
  1174. serverIDValue, ok := b.knownServerInitiatorIDs.Load(initiatorID)
  1175. if !ok {
  1176. return nil, errors.TraceNew("invalid initiator")
  1177. }
  1178. serverID := serverIDValue.(string)
  1179. // Always log the outcome.
  1180. defer func() {
  1181. // Typically, a server will send the same proxy quality request to all
  1182. // brokers. For the one "broadcast" request, server-proxy-quality is
  1183. // logged by each broker, as an indication that every server/broker
  1184. // request pair is successful.
  1185. //
  1186. // TODO: log more details from ServerProxyQualityRequest.QualityCounts?
  1187. if logFields == nil {
  1188. logFields = common.LogFields{}
  1189. }
  1190. logFields["broker_event"] = "server-proxy-quality"
  1191. logFields["broker_id"] = b.brokerID
  1192. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1193. logFields["server_id"] = serverID
  1194. if retErr != nil {
  1195. logFields["error"] = retErr.Error()
  1196. }
  1197. logFields.Add(transportLogFields)
  1198. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1199. }()
  1200. qualityRequest, err := UnmarshalServerProxyQualityRequest(requestPayload)
  1201. if err != nil {
  1202. return nil, errors.Trace(err)
  1203. }
  1204. logFields, err = qualityRequest.ValidateAndGetLogFields()
  1205. if err != nil {
  1206. return nil, errors.Trace(err)
  1207. }
  1208. // Add the quality counts into the existing proxy quality state.
  1209. //
  1210. // The counts are ignored when proxy quality is disabled, but an
  1211. // anknowledgement is still returned to the server.
  1212. if b.enableProxyQuality.Load() {
  1213. for proxyKey, counts := range qualityRequest.QualityCounts {
  1214. b.proxyQualityState.AddQuality(proxyKey, counts)
  1215. }
  1216. }
  1217. // There is no data in this response, it's simply an acknowledgement that
  1218. // the request was received.
  1219. responsePayload, err := MarshalServerProxyQualityResponse(
  1220. &ServerProxyQualityResponse{})
  1221. if err != nil {
  1222. return nil, errors.Trace(err)
  1223. }
  1224. return responsePayload, nil
  1225. }
  1226. // handleClientRelayedPacket facilitates broker/server sessions. The initial
  1227. // packet from the broker is sent to the client in the ClientOfferResponse.
  1228. // The client sends that to the server in the Psiphon handshake. If the
  1229. // session was already established, the relay ends there. Otherwise, the
  1230. // client receives any packet sent back by the server and that server packet
  1231. // is then delivered to the broker in a ClientRelayedPacketRequest. If the
  1232. // session needs to be [re-]negotiated, there are additional
  1233. // ClientRelayedPacket round trips until the session is established and the
  1234. // BrokerServerReport is securely exchanged between the broker and server.
  1235. func (b *Broker) handleClientRelayedPacket(
  1236. ctx context.Context,
  1237. extendTransportTimeout ExtendTransportTimeout,
  1238. transportLogFields common.LogFields,
  1239. geoIPData common.GeoIPData,
  1240. initiatorID ID,
  1241. requestPayload []byte) (retResponse []byte, retErr error) {
  1242. startTime := time.Now()
  1243. var logFields common.LogFields
  1244. var relayedPacketRequest *ClientRelayedPacketRequest
  1245. var serverID string
  1246. // Always log the outcome.
  1247. defer func() {
  1248. if logFields == nil {
  1249. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1250. }
  1251. logFields["broker_event"] = "client-relayed-packet"
  1252. logFields["broker_id"] = b.brokerID
  1253. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1254. if relayedPacketRequest != nil {
  1255. logFields["connection_id"] = relayedPacketRequest.ConnectionID
  1256. }
  1257. if serverID != "" {
  1258. logFields["destination_server_id"] = serverID
  1259. }
  1260. if retErr != nil {
  1261. logFields["error"] = retErr.Error()
  1262. }
  1263. logFields.Add(transportLogFields)
  1264. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1265. }()
  1266. relayedPacketRequest, err := UnmarshalClientRelayedPacketRequest(requestPayload)
  1267. if err != nil {
  1268. return nil, errors.Trace(err)
  1269. }
  1270. logFields, err = relayedPacketRequest.ValidateAndGetLogFields(
  1271. b.config.APIParameterValidator,
  1272. b.config.APIParameterLogFieldFormatter,
  1273. geoIPData)
  1274. if err != nil {
  1275. return nil, errors.Trace(err)
  1276. }
  1277. // The relay state is associated with the connection ID.
  1278. strConnectionID := string(relayedPacketRequest.ConnectionID[:])
  1279. entry, ok := b.pendingServerReports.Get(strConnectionID)
  1280. if !ok {
  1281. // The relay state is not found; it may have been evicted from the
  1282. // cache. The client will receive a generic error in this case and
  1283. // should stop relaying. Assuming the server is configured to require
  1284. // a BrokerServerReport, the tunnel will be terminated, so the
  1285. // client should also abandon the dial.
  1286. return nil, errors.TraceNew("no pending report")
  1287. }
  1288. pendingServerReport := entry.(*pendingServerReport)
  1289. serverID = pendingServerReport.serverID
  1290. // When the broker tried to use an existing session that was expired on the
  1291. // server, the server will respond here with a signed session reset token. The
  1292. // broker resets the session and starts to establish a new session.
  1293. //
  1294. // The non-waiting session establishment mode is used for broker/server
  1295. // sessions: if multiple clients concurrently try to relay new sessions,
  1296. // all establishments will happen in parallel without forcing any clients
  1297. // to wait for one client to lead the establishment. The last established
  1298. // session will be retained for reuse.
  1299. //
  1300. // If there is an error, the relayed packet is invalid. Drop the packet
  1301. // and return an error to be logged. Do _not_ reset the session,
  1302. // otherwise a malicious client could interrupt a valid broker/server
  1303. // session with a malformed packet.
  1304. // Next is given a nil ctx since we're not waiting for any other client to
  1305. // establish the session.
  1306. out, _, err := pendingServerReport.roundTrip.Next(
  1307. nil, relayedPacketRequest.PacketFromServer)
  1308. if err != nil {
  1309. return nil, errors.Trace(err)
  1310. }
  1311. if out == nil {
  1312. // The BrokerServerReport is a one-way message, As a result, the relay
  1313. // never ends with broker receiving a response; it's either
  1314. // (re)handshaking or sending the one-way report.
  1315. return nil, errors.TraceNew("unexpected nil packet")
  1316. }
  1317. // Return the next broker packet for the client to relay to the server.
  1318. // When it receives a nil PacketToServer, the client will stop relaying.
  1319. responsePayload, err := MarshalClientRelayedPacketResponse(
  1320. &ClientRelayedPacketResponse{
  1321. PacketToServer: out,
  1322. })
  1323. if err != nil {
  1324. return nil, errors.Trace(err)
  1325. }
  1326. return responsePayload, nil
  1327. }
  1328. func (b *Broker) adjustRequestTimeout(
  1329. logFields common.LogFields, timeout time.Duration) time.Duration {
  1330. // Adjust long-polling request timeouts to respect any maximum request
  1331. // timeout supported by the provider fronting the request.
  1332. //
  1333. // Limitation: the client is trusted to provide the correct fronting
  1334. // provider ID.
  1335. maxRequestTimeouts, ok := b.maxRequestTimeouts.Load().(map[string]time.Duration)
  1336. if !ok || maxRequestTimeouts == nil {
  1337. return timeout
  1338. }
  1339. frontingProviderID, ok := logFields["fronting_provider_id"].(string)
  1340. if !ok {
  1341. return timeout
  1342. }
  1343. maxRequestTimeout, ok := maxRequestTimeouts[frontingProviderID]
  1344. if !ok || maxRequestTimeout <= 0 || timeout <= maxRequestTimeout {
  1345. return timeout
  1346. }
  1347. return maxRequestTimeout
  1348. }
// pendingServerReport is the relay state for one in-flight
// BrokerServerReport delivery. initiateRelayedServerReport creates it and
// stores it in the pendingServerReports cache under the connection ID, and
// handleClientRelayedPacket looks it up to continue the round trip.
type pendingServerReport struct {
	// serverID identifies the destination server; it is logged as
	// destination_server_id for client-relayed-packet events.
	serverID string

	// serverReport is the report whose marshaled payload was handed to
	// roundTrip when the relay was initiated.
	serverReport *BrokerServerReport

	// roundTrip is the broker's side of the session exchange; each call to
	// its Next method yields the next packet for the client to relay.
	roundTrip *InitiatorRoundTrip
}
  1354. func (b *Broker) initiateRelayedServerReport(
  1355. serverParams *serverParams,
  1356. connectionID ID,
  1357. serverReport *BrokerServerReport) ([]byte, error) {
  1358. reportPayload, err := MarshalBrokerServerReport(serverReport)
  1359. if err != nil {
  1360. return nil, errors.Trace(err)
  1361. }
  1362. // Force a new, concurrent session establishment with the server even if
  1363. // another handshake is already in progess, relayed by some other client.
  1364. // This ensures clients don't block waiting for other client relays
  1365. // through other tunnels. The last established session will be retained
  1366. // for reuse.
  1367. waitToShareSession := false
  1368. roundTrip, err := b.initiatorSessions.NewRoundTrip(
  1369. serverParams.sessionPublicKey,
  1370. serverParams.sessionRootObfuscationSecret,
  1371. waitToShareSession,
  1372. reportPayload)
  1373. if err != nil {
  1374. return nil, errors.Trace(err)
  1375. }
  1376. relayPacket, _, err := roundTrip.Next(nil, nil)
  1377. if err != nil {
  1378. return nil, errors.Trace(err)
  1379. }
  1380. strConnectionID := string(connectionID[:])
  1381. b.pendingServerReports.Set(
  1382. strConnectionID,
  1383. &pendingServerReport{
  1384. serverID: serverParams.serverID,
  1385. serverReport: serverReport,
  1386. roundTrip: roundTrip,
  1387. },
  1388. time.Duration(atomic.LoadInt64(&b.pendingServerReportsTTL)))
  1389. return relayPacket, nil
  1390. }
// serverParams holds the values validateDestination extracts from a
// validated server entry. initiateRelayedServerReport uses the key material
// to start the broker/server session round trip.
type serverParams struct {
	// serverID is the server entry's diagnostic ID, used for logging.
	serverID string

	// sessionPublicKey is parsed from the server entry's
	// InproxySessionPublicKey.
	sessionPublicKey SessionPublicKey

	// sessionRootObfuscationSecret is parsed from the server entry's
	// InproxySessionRootObfuscationSecret.
	sessionRootObfuscationSecret ObfuscationSecret
}
  1396. // validateDestination checks that the client's specified proxy dial
  1397. // destination is valid destination address for a tunnel protocol in the
  1398. // specified signed and valid Psiphon server entry.
  1399. func (b *Broker) validateDestination(
  1400. geoIPData common.GeoIPData,
  1401. packedDestinationServerEntry []byte,
  1402. networkProtocol NetworkProtocol,
  1403. destinationAddress string) (*serverParams, error) {
  1404. var packedServerEntry protocol.PackedServerEntryFields
  1405. err := cbor.Unmarshal(packedDestinationServerEntry, &packedServerEntry)
  1406. if err != nil {
  1407. return nil, errors.Trace(err)
  1408. }
  1409. serverEntryFields, err := protocol.DecodePackedServerEntryFields(packedServerEntry)
  1410. if err != nil {
  1411. return nil, errors.Trace(err)
  1412. }
  1413. // Strip any unsigned fields, which could be forged by the client. In
  1414. // particular, this includes the server entry tag, which, in some cases,
  1415. // is locally populated by a client for its own reference.
  1416. serverEntryFields.RemoveUnsignedFields()
  1417. // Check that the server entry is signed by Psiphon. Otherwise a client
  1418. // could manufacture a server entry corresponding to an arbitrary dial
  1419. // destination.
  1420. err = serverEntryFields.VerifySignature(
  1421. b.config.ServerEntrySignaturePublicKey)
  1422. if err != nil {
  1423. return nil, errors.Trace(err)
  1424. }
  1425. // The server entry tag must be set and signed by Psiphon, as local,
  1426. // client derived tags are unsigned and untrusted.
  1427. serverEntryTag := serverEntryFields.GetTag()
  1428. if serverEntryTag == "" {
  1429. return nil, errors.TraceNew("missing server entry tag")
  1430. }
  1431. // Check that the server entry tag is on a list of active and valid
  1432. // Psiphon server entry tags. This ensures that an obsolete entry for a
  1433. // pruned server cannot by misused by a client to proxy to what's no
  1434. // longer a Psiphon server.
  1435. if !b.config.IsValidServerEntryTag(serverEntryTag) {
  1436. return nil, errors.TraceNew("invalid server entry tag")
  1437. }
  1438. serverID := serverEntryFields.GetDiagnosticID()
  1439. serverEntry, err := serverEntryFields.GetServerEntry()
  1440. if err != nil {
  1441. return nil, errors.Trace(err)
  1442. }
  1443. // Validate the dial host (IP or domain) and port matches a tunnel
  1444. // protocol offered by the server entry.
  1445. destHost, destPort, err := net.SplitHostPort(destinationAddress)
  1446. if err != nil {
  1447. return nil, errors.Trace(err)
  1448. }
  1449. destPortNum, err := strconv.Atoi(destPort)
  1450. if err != nil {
  1451. return nil, errors.Trace(err)
  1452. }
  1453. // For domain fronted cases, since we can't verify the Host header, access
  1454. // is strictly to limited to targeted clients. Clients should use tactics
  1455. // to avoid disallowed domain dial address cases, but here the broker
  1456. // enforces it.
  1457. //
  1458. // TODO: this issue could be further mitigated with a server
  1459. // acknowledgement of the broker's report, with no acknowledgement
  1460. // followed by signaling the proxy to terminate client connection.
  1461. // This assumes that any domain dial is for domain fronting.
  1462. isDomain := net.ParseIP(destHost) == nil
  1463. if isDomain && !b.config.AllowDomainFrontedDestinations(geoIPData) {
  1464. return nil, errors.TraceNew("domain fronted destinations disallowed")
  1465. }
  1466. // The server entry must include an in-proxy tunnel protocol capability
  1467. // and corresponding dial port number. In-proxy capacity may be set for
  1468. // only a subset of all Psiphon servers, to limited the number of servers
  1469. // a proxy can observe and enumerate. Well-behaved clients will not send
  1470. // any server entries lacking this capability, but here the broker
  1471. // enforces it.
  1472. if !serverEntry.IsValidInproxyDialAddress(networkProtocol.String(), destHost, destPortNum) {
  1473. return nil, errors.TraceNew("invalid destination address")
  1474. }
  1475. // Extract and return the key material to be used for the secure session
  1476. // and BrokerServer exchange between the broker and the Psiphon server
  1477. // corresponding to this server entry.
  1478. params := &serverParams{
  1479. serverID: serverID,
  1480. }
  1481. params.sessionPublicKey, err = SessionPublicKeyFromString(
  1482. serverEntry.InproxySessionPublicKey)
  1483. if err != nil {
  1484. return nil, errors.Trace(err)
  1485. }
  1486. params.sessionRootObfuscationSecret, err = ObfuscationSecretFromString(
  1487. serverEntry.InproxySessionRootObfuscationSecret)
  1488. if err != nil {
  1489. return nil, errors.Trace(err)
  1490. }
  1491. // Record that this server is known and trusted ServerProxyQualityRequest
  1492. // sender. The serverID is stored for logging in handleServerProxyQuality.
  1493. //
  1494. // There is no expiry for knownServerInitiatorIDs entries, and they will
  1495. // clear only if the broker is restarted (which is the same lifetime as
  1496. // ServerEntrySignaturePublicKey).
  1497. //
  1498. // Limitation: in time, the above IsValidServerEntryTag check could become
  1499. // false for a retired server, while its entry remains in
  1500. // knownServerInitiatorIDs. However, unlike the case of a recycled
  1501. // Psiphon server IP being used as a proxy destination, it's safer to
  1502. // assume that a retired server's session private key does not become
  1503. // exposed.
  1504. serverInitiatorID, err := params.sessionPublicKey.ToCurve25519()
  1505. if err != nil {
  1506. return nil, errors.Trace(err)
  1507. }
  1508. // For hosts running a single psiphond with multiple server entries, there
  1509. // will be multiple possible serverIDs for one serverInitiatorID. Don't
  1510. // overwrite any existing entry; keep the first observed serverID for
  1511. // more stable logging.
  1512. _, _ = b.knownServerInitiatorIDs.LoadOrStore(ID(serverInitiatorID), serverID)
  1513. return params, nil
  1514. }
  1515. func (b *Broker) isCommonCompartmentIDHashingInitialized() bool {
  1516. b.commonCompartmentsMutex.Lock()
  1517. defer b.commonCompartmentsMutex.Unlock()
  1518. return b.commonCompartments != nil
  1519. }
  1520. func (b *Broker) initializeCommonCompartmentIDHashing(
  1521. commonCompartmentIDs []ID) error {
  1522. b.commonCompartmentsMutex.Lock()
  1523. defer b.commonCompartmentsMutex.Unlock()
  1524. // At least one common compartment ID is required. At a minimum, one ID
  1525. // will be used and distributed to clients via tactics, limiting matching
  1526. // to those clients targeted to receive that tactic parameters.
  1527. if len(commonCompartmentIDs) == 0 {
  1528. return errors.TraceNew("missing common compartment IDs")
  1529. }
  1530. // The consistent package doesn't allow duplicate members.
  1531. checkDup := make(map[ID]bool, len(commonCompartmentIDs))
  1532. for _, compartmentID := range commonCompartmentIDs {
  1533. if checkDup[compartmentID] {
  1534. return errors.TraceNew("duplicate common compartment IDs")
  1535. }
  1536. checkDup[compartmentID] = true
  1537. }
  1538. // Proxies without personal compartment IDs are randomly assigned to the
  1539. // set of common, Psiphon-specified, compartment IDs. These common
  1540. // compartment IDs are then distributed to targeted clients through
  1541. // tactics or embedded in OSLs, to limit access to proxies.
  1542. //
  1543. // Use consistent hashing in an effort to keep a consistent assignment of
  1544. // proxies (as specified by proxy ID, which covers all announcements for
  1545. // a single proxy). This is more of a concern for long-lived, permanent
  1546. // proxies that are not behind any NAT.
  1547. //
  1548. // Even with consistent hashing, a subset of proxies will still change
  1549. // assignment when CommonCompartmentIDs changes.
  1550. consistentMembers := make([]consistent.Member, len(commonCompartmentIDs))
  1551. for i, compartmentID := range commonCompartmentIDs {
  1552. consistentMembers[i] = consistentMember(compartmentID.String())
  1553. }
  1554. b.commonCompartments = consistent.New(
  1555. consistentMembers,
  1556. consistent.Config{
  1557. PartitionCount: len(consistentMembers),
  1558. ReplicationFactor: 1,
  1559. Load: 1,
  1560. Hasher: xxhasher{},
  1561. })
  1562. return nil
  1563. }
  1564. // xxhasher wraps github.com/cespare/xxhash.Sum64 in the interface expected by
  1565. // github.com/buraksezer/consistent. xxhash is a high quality hash function
  1566. // used in github.com/buraksezer/consistent examples.
  1567. type xxhasher struct{}
  1568. func (h xxhasher) Sum64(data []byte) uint64 {
  1569. return xxhash.Sum64(data)
  1570. }
  1571. // consistentMember wraps the string type with the interface expected by
  1572. // github.com/buraksezer/consistent.
  1573. type consistentMember string
  1574. func (m consistentMember) String() string {
  1575. return string(m)
  1576. }
  1577. func (b *Broker) selectCommonCompartmentID(proxyID ID) (ID, error) {
  1578. b.commonCompartmentsMutex.Lock()
  1579. defer b.commonCompartmentsMutex.Unlock()
  1580. compartmentID, err := IDFromString(
  1581. b.commonCompartments.LocateKey(proxyID[:]).String())
  1582. if err != nil {
  1583. return compartmentID, errors.Trace(err)
  1584. }
  1585. return compartmentID, nil
  1586. }