broker.go 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. std_errors "errors"
  23. "net"
  24. "strconv"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/consistent"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  32. "github.com/cespare/xxhash"
  33. lrucache "github.com/cognusion/go-cache-lru"
  34. "github.com/fxamacker/cbor/v2"
  35. )
  const (

  	// BrokerMaxRequestBodySize is the maximum request size, that should be
  	// enforced by the provided broker transport.
  	BrokerMaxRequestBodySize = 65536

  	// BrokerEndPointName is the standard name for referencing an endpoint
  	// that services broker requests.
  	BrokerEndPointName = "inproxy-broker"

  	// NOTE(review): brokerProxyAnnounceTimeout and brokerClientOfferTimeout
  	// are presumably the defaults applied when the corresponding configured
  	// timeouts are 0; their use is outside this part of the file -- confirm.
  	brokerProxyAnnounceTimeout = 2 * time.Minute
  	brokerClientOfferTimeout = 10 * time.Second

  	// brokerPendingServerReportsTTL is the default passed, via
  	// common.ValueOrDefault, as the entry TTL of the pending server reports
  	// cache when BrokerConfig.PendingServerReportsTTL is not set.
  	brokerPendingServerReportsTTL = 60 * time.Second

  	// brokerPendingServerReportsMaxSize is the size limit given to the
  	// pending server reports LRU cache created in NewBroker.
  	brokerPendingServerReportsMaxSize = 100000

  	// brokerMetricName is the metric name used when broker events are logged
  	// with Logger.LogMetric.
  	brokerMetricName = "inproxy_broker"
  )
  // LookupGeoIP is a callback for providing GeoIP lookup service. It takes an
  // IP address, in string form, and returns the GeoIP data for that address.
  type LookupGeoIP func(IP string) common.GeoIPData

  // ExtendTransportTimeout is a callback that extends the timeout for a
  // server-side broker transport handler, facilitating request-specific
  // timeouts including long-polling for proxy announcements.
  type ExtendTransportTimeout func(timeout time.Duration)

  // GetTacticsPayload is a callback which returns the appropriate tactics
  // payload for the specified client/proxy GeoIP data and API parameters.
  //
  // The return values are the tactics payload, a new tactics tag, and an
  // error. A payload may be returned together with an empty tag; the proxy
  // announce handler treats only a non-nil payload with a non-empty tag as
  // changed tactics.
  type GetTacticsPayload func(
  	common.GeoIPData, common.APIParameters) ([]byte, string, error)
  // Broker is the in-proxy broker component, which matches clients and proxies
  // and provides WebRTC signaling functionality.
  //
  // Both clients and proxies send requests to the broker to obtain matches and
  // exchange WebRTC SDPs. Broker does not implement a transport or obfuscation
  // layer; instead that is provided by the HandleSessionPacket caller. A
  // typical implementation would provide a domain fronted web server which
  // runs a Broker and calls Broker.HandleSessionPacket to handle web requests
  // encapsulating secure session packets.
  type Broker struct {

  	// config is the configuration supplied to NewBroker.
  	config *BrokerConfig

  	// brokerID is the broker's session public key in Curve25519 form.
  	brokerID ID

  	// initiatorSessions are secure sessions initiated by the broker and used
  	// to send BrokerServerReports to servers.
  	initiatorSessions *InitiatorSessions

  	// responderSessions are secure sessions initiated by clients and proxies
  	// and used to send requests to the broker.
  	responderSessions *ResponderSessions

  	// matcher is created in NewBroker and started/stopped by Start/Stop.
  	matcher *Matcher

  	// pendingServerReports is an LRU cache, with entry TTL, created in
  	// NewBroker.
  	pendingServerReports *lrucache.Cache

  	// proxyQualityState is the same ProxyQualityState instance that is
  	// passed to the matcher in its MatcherConfig.
  	proxyQualityState *ProxyQualityState

  	knownServerInitiatorIDs sync.Map

  	// NOTE(review): commonCompartmentsMutex presumably guards
  	// commonCompartments, which is set up by
  	// initializeCommonCompartmentIDHashing -- confirm against that function.
  	commonCompartmentsMutex sync.Mutex
  	commonCompartments *consistent.Consistent

  	// The following timeout/TTL values are time.Duration values held as
  	// int64 and are written with atomic.StoreInt64 in SetTimeouts.
  	proxyAnnounceTimeout int64
  	clientOfferTimeout int64
  	clientOfferPersonalTimeout int64
  	pendingServerReportsTTL int64

  	// maxRequestTimeouts holds the map[string]time.Duration value stored by
  	// SetTimeouts. Nothing is stored in NewBroker.
  	maxRequestTimeouts atomic.Value

  	// maxCompartmentIDs is read and written with atomic operations; see
  	// SetLimits.
  	maxCompartmentIDs int64

  	// enableProxyQuality is an atomic for fast lookups in request handlers;
  	// enableProxyQualityMutex additionally makes the Swap/Flush combination
  	// in SetProxyQualityParameters atomic.
  	enableProxyQualityMutex sync.Mutex
  	enableProxyQuality atomic.Bool
  }
  // BrokerConfig specifies the configuration for a Broker.
  type BrokerConfig struct {

  	// Logger is used to log events.
  	Logger common.Logger

  	// CommonCompartmentIDs is a list of common compartment IDs to apply to
  	// proxies that announce without personal compartment ID. Common
  	// compartment IDs are managed by Psiphon and distributed to clients via
  	// tactics or embedded in OSLs. Clients must supply a valid compartment
  	// ID to match with a proxy.
  	//
  	// A BrokerConfig must supply at least one compartment ID, or
  	// SetCommonCompartmentIDs must be called with at least one compartment
  	// ID before calling Start.
  	//
  	// When only one, single common compartment ID is configured, it can serve
  	// as an (obfuscation) secret that clients must obtain, via tactics, to
  	// enable in-proxy participation.
  	CommonCompartmentIDs []ID

  	// AllowProxy is a callback which can indicate whether a proxy with the
  	// given GeoIP data is allowed to match with common compartment ID
  	// clients. Proxies with personal compartment IDs are always allowed.
  	AllowProxy func(common.GeoIPData) bool

  	// PrioritizeProxy is a callback which can indicate whether proxy
  	// announcements from proxies with the specified in-proxy protocol
  	// version, GeoIPData, and APIParameters should be prioritized in the
  	// matcher queue. Priority proxy announcements match ahead of other proxy
  	// announcements, regardless of announcement age/deadline. Priority
  	// status takes precedence over preferred NAT matching. Prioritization
  	// applies only to common compartment IDs and not personal pairing mode.
  	PrioritizeProxy func(int, common.GeoIPData, common.APIParameters) bool

  	// AllowClient is a callback which can indicate whether a client with the
  	// given GeoIP data is allowed to match with common compartment ID
  	// proxies. Clients are always allowed to match based on personal
  	// compartment ID.
  	AllowClient func(common.GeoIPData) bool

  	// AllowDomainFrontedDestinations is a callback which can indicate whether
  	// a client with the given GeoIP data is allowed to specify a proxied
  	// destination for a domain fronted protocol. When false, only direct
  	// address destinations are allowed.
  	//
  	// While tactics may be set to instruct clients to use only direct
  	// server tunnel protocols, with IP address destinations, this callback
  	// adds server-side enforcement.
  	AllowDomainFrontedDestinations func(common.GeoIPData) bool

  	// AllowMatch is a callback which can indicate whether a proxy and client
  	// pair, with the given, respective GeoIP data, is allowed to match
  	// together. Pairs are always allowed to match based on personal
  	// compartment ID.
  	AllowMatch func(common.GeoIPData, common.GeoIPData) bool

  	// LookupGeoIP provides GeoIP lookup service.
  	LookupGeoIP LookupGeoIP

  	// APIParameterValidator is a callback that validates base API metrics.
  	APIParameterValidator common.APIParameterValidator

  	// APIParameterLogFieldFormatter is a callback that formats base API
  	// metrics.
  	APIParameterLogFieldFormatter common.APIParameterLogFieldFormatter

  	// GetTacticsPayload provides a tactics lookup service.
  	GetTacticsPayload GetTacticsPayload

  	// IsValidServerEntryTag is a callback which checks if the specified
  	// server entry tag is on the list of valid and active Psiphon server
  	// entry tags.
  	IsValidServerEntryTag func(serverEntryTag string) bool

  	// IsLoadLimiting is a callback which checks if the broker process is in a
  	// load limiting state, where consumed resources, including allocated
  	// system memory and CPU load, exceed determined thresholds. When load
  	// limiting is indicated, the broker will attempt to reduce load by
  	// immediately rejecting either proxy announces or client offers,
  	// depending on the state of the corresponding queues.
  	IsLoadLimiting func() bool

  	// PrivateKey is the broker's secure session long term private key.
  	PrivateKey SessionPrivateKey

  	// ObfuscationRootSecret is the broker's secure session long term
  	// obfuscation key.
  	ObfuscationRootSecret ObfuscationSecret

  	// ServerEntrySignaturePublicKey is the key used to verify Psiphon server
  	// entry signatures.
  	ServerEntrySignaturePublicKey string

  	// These timeout parameters may be used to override defaults. When
  	// PendingServerReportsTTL is 0, NewBroker uses
  	// brokerPendingServerReportsTTL for the pending server reports cache.
  	ProxyAnnounceTimeout time.Duration
  	ClientOfferTimeout time.Duration
  	ClientOfferPersonalTimeout time.Duration
  	PendingServerReportsTTL time.Duration

  	// Announcement queue limit configuration. These values are passed
  	// through to the MatcherConfig in NewBroker.
  	MatcherAnnouncementLimitEntryCount int
  	MatcherAnnouncementRateLimitQuantity int
  	MatcherAnnouncementRateLimitInterval time.Duration
  	MatcherAnnouncementNonlimitedProxyIDs []ID

  	// Offer queue limit configuration. These values are passed through to
  	// the MatcherConfig in NewBroker.
  	MatcherOfferLimitEntryCount int
  	MatcherOfferRateLimitQuantity int
  	MatcherOfferRateLimitInterval time.Duration

  	// MaxCompartmentIDs specifies the maximum number of compartment IDs that
  	// can be included, per list, in one request. If 0, the package-level
  	// value MaxCompartmentIDs is used.
  	MaxCompartmentIDs int
  }
  // BrokerLoggedEvent is an error type wrapping an underlying error for which
  // the broker has already emitted a log event. The outer server layer may
  // check for this type to avoid logging HandleSessionPacket errors a second
  // time.
  type BrokerLoggedEvent struct {
  	err error
  }

  // NewBrokerLoggedEvent wraps err, the already-logged underlying error, in a
  // BrokerLoggedEvent.
  func NewBrokerLoggedEvent(err error) *BrokerLoggedEvent {
  	event := new(BrokerLoggedEvent)
  	event.err = err
  	return event
  }

  // Error implements the error interface and returns the message of the
  // underlying error.
  func (e *BrokerLoggedEvent) Error() string {
  	message := e.err.Error()
  	return message
  }
  195. // NewBroker initializes a new Broker.
  196. func NewBroker(config *BrokerConfig) (*Broker, error) {
  197. // initiatorSessions are secure sessions initiated by the broker and used
  198. // to send BrokerServerReports to servers. The servers will be
  199. // configured to establish sessions only with brokers with specified
  200. // public keys.
  201. initiatorSessions := NewInitiatorSessions(config.PrivateKey)
  202. // responderSessions are secure sessions initiated by clients and proxies
  203. // and used to send requests to the broker. Clients and proxies are
  204. // configured to establish sessions only with specified broker public keys.
  205. responderSessions, err := NewResponderSessions(
  206. config.PrivateKey, config.ObfuscationRootSecret)
  207. if err != nil {
  208. return nil, errors.Trace(err)
  209. }
  210. // The broker ID is the broker's session public key in Curve25519 form.
  211. publicKey, err := config.PrivateKey.GetPublicKey()
  212. if err != nil {
  213. return nil, errors.Trace(err)
  214. }
  215. brokerID, err := publicKey.ToCurve25519()
  216. if err != nil {
  217. return nil, errors.Trace(err)
  218. }
  219. proxyQuality := NewProxyQuality()
  220. b := &Broker{
  221. config: config,
  222. brokerID: ID(brokerID),
  223. initiatorSessions: initiatorSessions,
  224. responderSessions: responderSessions,
  225. matcher: NewMatcher(&MatcherConfig{
  226. Logger: config.Logger,
  227. AnnouncementLimitEntryCount: config.MatcherAnnouncementLimitEntryCount,
  228. AnnouncementRateLimitQuantity: config.MatcherAnnouncementRateLimitQuantity,
  229. AnnouncementRateLimitInterval: config.MatcherAnnouncementRateLimitInterval,
  230. AnnouncementNonlimitedProxyIDs: config.MatcherAnnouncementNonlimitedProxyIDs,
  231. OfferLimitEntryCount: config.MatcherOfferLimitEntryCount,
  232. OfferRateLimitQuantity: config.MatcherOfferRateLimitQuantity,
  233. OfferRateLimitInterval: config.MatcherOfferRateLimitInterval,
  234. ProxyQualityState: proxyQuality,
  235. IsLoadLimiting: config.IsLoadLimiting,
  236. AllowMatch: config.AllowMatch,
  237. }),
  238. proxyQualityState: proxyQuality,
  239. proxyAnnounceTimeout: int64(config.ProxyAnnounceTimeout),
  240. clientOfferTimeout: int64(config.ClientOfferTimeout),
  241. clientOfferPersonalTimeout: int64(config.ClientOfferPersonalTimeout),
  242. pendingServerReportsTTL: int64(config.PendingServerReportsTTL),
  243. maxCompartmentIDs: int64(common.ValueOrDefault(config.MaxCompartmentIDs, MaxCompartmentIDs)),
  244. }
  245. b.pendingServerReports = lrucache.NewWithLRU(
  246. common.ValueOrDefault(config.PendingServerReportsTTL, brokerPendingServerReportsTTL),
  247. 1*time.Minute,
  248. brokerPendingServerReportsMaxSize)
  249. if len(config.CommonCompartmentIDs) > 0 {
  250. err = b.initializeCommonCompartmentIDHashing(config.CommonCompartmentIDs)
  251. if err != nil {
  252. return nil, errors.Trace(err)
  253. }
  254. }
  255. return b, nil
  256. }
  257. func (b *Broker) Start() error {
  258. if !b.isCommonCompartmentIDHashingInitialized() {
  259. return errors.TraceNew("missing common compartment IDs")
  260. }
  261. return errors.Trace(b.matcher.Start())
  262. }
  263. func (b *Broker) Stop() {
  264. b.matcher.Stop()
  265. }
  266. // SetCommonCompartmentIDs sets a new list of common compartment IDs,
  267. // replacing the previous configuration.
  268. func (b *Broker) SetCommonCompartmentIDs(commonCompartmentIDs []ID) error {
  269. // TODO: initializeCommonCompartmentIDHashing is called regardless whether
  270. // commonCompartmentIDs changes the previous configuration. To avoid the
  271. // overhead of consistent hashing initialization in
  272. // initializeCommonCompartmentIDHashing, add a mechanism to first quickly
  273. // check for changes?
  274. return errors.Trace(b.initializeCommonCompartmentIDHashing(commonCompartmentIDs))
  275. }
  276. // SetTimeouts sets new timeout values, replacing the previous configuration.
  277. // New timeout values do not apply to currently active announcement or offer
  278. // requests.
  279. func (b *Broker) SetTimeouts(
  280. proxyAnnounceTimeout time.Duration,
  281. clientOfferTimeout time.Duration,
  282. clientOfferPersonalTimeout time.Duration,
  283. pendingServerReportsTTL time.Duration,
  284. maxRequestTimeouts map[string]time.Duration) {
  285. atomic.StoreInt64(&b.proxyAnnounceTimeout, int64(proxyAnnounceTimeout))
  286. atomic.StoreInt64(&b.clientOfferTimeout, int64(clientOfferTimeout))
  287. atomic.StoreInt64(&b.clientOfferPersonalTimeout, int64(clientOfferPersonalTimeout))
  288. atomic.StoreInt64(&b.pendingServerReportsTTL, int64(pendingServerReportsTTL))
  289. b.maxRequestTimeouts.Store(maxRequestTimeouts)
  290. }
  291. // SetLimits sets new queue limit values, replacing the previous
  292. // configuration. New limits are only partially applied to existing queue
  293. // states; see Matcher.SetLimits.
  294. func (b *Broker) SetLimits(
  295. matcherAnnouncementLimitEntryCount int,
  296. matcherAnnouncementRateLimitQuantity int,
  297. matcherAnnouncementRateLimitInterval time.Duration,
  298. matcherAnnouncementNonlimitedProxyIDs []ID,
  299. matcherOfferLimitEntryCount int,
  300. matcherOfferRateLimitQuantity int,
  301. matcherOfferRateLimitInterval time.Duration,
  302. maxCompartmentIDs int) {
  303. b.matcher.SetLimits(
  304. matcherAnnouncementLimitEntryCount,
  305. matcherAnnouncementRateLimitQuantity,
  306. matcherAnnouncementRateLimitInterval,
  307. matcherAnnouncementNonlimitedProxyIDs,
  308. matcherOfferLimitEntryCount,
  309. matcherOfferRateLimitQuantity,
  310. matcherOfferRateLimitInterval)
  311. atomic.StoreInt64(
  312. &b.maxCompartmentIDs,
  313. int64(common.ValueOrDefault(maxCompartmentIDs, MaxCompartmentIDs)))
  314. }
  315. func (b *Broker) SetProxyQualityParameters(
  316. enable bool,
  317. qualityTTL time.Duration,
  318. pendingFailedMatchDeadline time.Duration,
  319. failedMatchThreshold int) {
  320. // enableProxyQuality is an atomic for fast lookups in request handlers;
  321. // an additional mutex is used here to ensure the Swap/Flush combination
  322. // is also atomic.
  323. b.enableProxyQualityMutex.Lock()
  324. wasEnabled := b.enableProxyQuality.Swap(enable)
  325. if wasEnabled && !enable {
  326. // Flush quality state, since otherwise the quality TTL can retain
  327. // quality data which may be unexpectedly reactivated when enable is
  328. // toggled on again.
  329. b.proxyQualityState.Flush()
  330. }
  331. b.enableProxyQualityMutex.Unlock()
  332. b.proxyQualityState.SetParameters(
  333. qualityTTL,
  334. pendingFailedMatchDeadline,
  335. failedMatchThreshold)
  336. }
  337. // HandleSessionPacket handles a session packet from a client or proxy and
  338. // provides a response packet. The packet is part of a secure session and may
  339. // be a session handshake message, an expired session reset token, or a
  340. // session-wrapped request payload. Request payloads are routed to API
  341. // request endpoints.
  342. //
  343. // The caller is expected to provide a transport obfuscation layer, such as
  344. // domain fronted HTTPs. The session has an obfuscation layer that ensures
  345. // that packets are fully random, randomly padded, and cannot be replayed.
  346. // This makes session packets suitable to embed as plaintext in some
  347. // transports.
  348. //
  349. // The caller is responsible for rate limiting and enforcing timeouts and
  350. // maximum payload size checks.
  351. //
  352. // Secure sessions support multiplexing concurrent requests, as long as the
  353. // provided transport, for example HTTP/2, supports this as well.
  354. //
  355. // The input ctx should be canceled if the client/proxy disconnects from the
  356. // transport while HandleSessionPacket is running, since long-polling proxy
  357. // announcement requests will otherwise remain blocked until eventual
  358. // timeout; net/http does this.
  359. //
  360. // When HandleSessionPacket returns an error, the transport provider should
  361. // apply anti-probing mechanisms, as the client/proxy may be a prober or
  362. // scanner.
  363. func (b *Broker) HandleSessionPacket(
  364. ctx context.Context,
  365. extendTransportTimeout ExtendTransportTimeout,
  366. transportLogFields common.LogFields,
  367. brokerClientIP string,
  368. geoIPData common.GeoIPData,
  369. inPacket []byte) ([]byte, error) {
  370. // handleUnwrappedRequest handles requests after session unwrapping.
  371. // responderSessions.HandlePacket handles both session establishment and
  372. // request unwrapping, and invokes handleUnwrappedRequest once a session
  373. // is established and a valid request unwrapped.
  374. handleUnwrappedRequest := func(initiatorID ID, unwrappedRequestPayload []byte) ([]byte, error) {
  375. recordType, err := peekRecordPreambleType(unwrappedRequestPayload)
  376. if err != nil {
  377. return nil, errors.Trace(err)
  378. }
  379. var responsePayload []byte
  380. switch recordType {
  381. case recordTypeAPIProxyAnnounceRequest:
  382. responsePayload, err = b.handleProxyAnnounce(
  383. ctx,
  384. extendTransportTimeout,
  385. transportLogFields,
  386. brokerClientIP,
  387. geoIPData,
  388. initiatorID,
  389. unwrappedRequestPayload)
  390. if err != nil {
  391. return nil, errors.Trace(err)
  392. }
  393. case recordTypeAPIProxyAnswerRequest:
  394. responsePayload, err = b.handleProxyAnswer(
  395. ctx,
  396. extendTransportTimeout,
  397. transportLogFields,
  398. brokerClientIP,
  399. geoIPData,
  400. initiatorID,
  401. unwrappedRequestPayload)
  402. if err != nil {
  403. return nil, errors.Trace(err)
  404. }
  405. case recordTypeAPIClientOfferRequest:
  406. responsePayload, err = b.handleClientOffer(
  407. ctx,
  408. extendTransportTimeout,
  409. transportLogFields,
  410. brokerClientIP,
  411. geoIPData,
  412. initiatorID,
  413. unwrappedRequestPayload)
  414. if err != nil {
  415. return nil, errors.Trace(err)
  416. }
  417. case recordTypeAPIServerProxyQualityRequest:
  418. responsePayload, err = b.handleServerProxyQuality(
  419. ctx,
  420. extendTransportTimeout,
  421. transportLogFields,
  422. brokerClientIP,
  423. geoIPData,
  424. initiatorID,
  425. unwrappedRequestPayload)
  426. if err != nil {
  427. return nil, errors.Trace(err)
  428. }
  429. case recordTypeAPIClientRelayedPacketRequest:
  430. responsePayload, err = b.handleClientRelayedPacket(
  431. ctx,
  432. extendTransportTimeout,
  433. transportLogFields,
  434. geoIPData,
  435. initiatorID,
  436. unwrappedRequestPayload)
  437. if err != nil {
  438. return nil, errors.Trace(err)
  439. }
  440. default:
  441. return nil, errors.Tracef("unexpected API record type %v", recordType)
  442. }
  443. return responsePayload, nil
  444. }
  445. // HandlePacket returns both a packet and an error in the expired session
  446. // reset token case. Log the error here, clear it, and return the
  447. // packet to be relayed back to the broker client.
  448. outPacket, err := b.responderSessions.HandlePacket(
  449. inPacket, handleUnwrappedRequest)
  450. if err != nil {
  451. if outPacket == nil {
  452. return nil, errors.Trace(err)
  453. }
  454. b.config.Logger.WithTraceFields(common.LogFields{"error": err}).Warning(
  455. "HandlePacket returned packet and error")
  456. }
  457. return outPacket, nil
  458. }
  459. // handleProxyAnnounce receives a proxy announcement, awaits a matching
  460. // client, and returns the client offer in the response. handleProxyAnnounce
  461. // has a long timeout so this request can idle until a matching client
  462. // arrives.
  463. func (b *Broker) handleProxyAnnounce(
  464. ctx context.Context,
  465. extendTransportTimeout ExtendTransportTimeout,
  466. transportLogFields common.LogFields,
  467. proxyIP string,
  468. geoIPData common.GeoIPData,
  469. initiatorID ID,
  470. requestPayload []byte) (retResponse []byte, retErr error) {
  471. startTime := time.Now()
  472. var logFields common.LogFields
  473. var isPriority bool
  474. var newTacticsTag string
  475. var clientOffer *MatchOffer
  476. var matchMetrics *MatchMetrics
  477. var timedOut bool
  478. var limitedErr error
  479. // As a future enhancement, a broker could initiate its own test
  480. // connection to the proxy to verify its effectiveness, including
  481. // simulating a symmetric NAT client.
  482. // Each announcement represents availability for a single client matching.
  483. // Proxies with multiple client availability will send multiple requests.
  484. //
  485. // The announcement request and response could be extended to allow the
  486. // proxy to specify availability for multiple clients in the request, and
  487. // multiple client offers returned in the response.
  488. //
  489. // If, as we expect, proxies run on home ISPs have limited upstream
  490. // bandwidth, they will support only a couple of concurrent clients, and
  491. // the simple single-client-announcment model may be sufficient. Also, if
  492. // the transport is HTTP/2, multiple requests can be multiplexed over a
  493. // single connection (and session) in any case.
  494. // The proxy ID is an implicit parameter: it's the proxy's session public
  495. // key. As part of the session handshake, the proxy has proven that it
  496. // has the corresponding private key. Proxy IDs are logged to attribute
  497. // traffic to a specific proxy.
  498. proxyID := initiatorID
  499. // Generate a connection ID. This ID is used to associate proxy
  500. // announcments, client offers, and proxy answers, as well as associating
  501. // Psiphon tunnels with in-proxy pairings.
  502. connectionID, err := MakeID()
  503. if err != nil {
  504. return nil, errors.Trace(err)
  505. }
  506. // Always log the outcome.
  507. defer func() {
  508. if logFields == nil {
  509. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  510. }
  511. logFields["broker_event"] = "proxy-announce"
  512. logFields["broker_id"] = b.brokerID
  513. logFields["proxy_id"] = proxyID
  514. logFields["is_priority"] = isPriority
  515. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  516. logFields["connection_id"] = connectionID
  517. if newTacticsTag != "" {
  518. logFields["new_tactics_tag"] = newTacticsTag
  519. }
  520. if clientOffer != nil {
  521. // Log the target Psiphon server ID (diagnostic ID). The presence
  522. // of this field indicates that a match was made.
  523. logFields["destination_server_id"] = clientOffer.DestinationServerID
  524. logFields["use_media_streams"] = clientOffer.UseMediaStreams
  525. }
  526. if timedOut {
  527. logFields["timed_out"] = true
  528. }
  529. if retErr != nil {
  530. logFields["error"] = retErr.Error()
  531. } else if limitedErr != nil {
  532. logFields["error"] = limitedErr.Error()
  533. }
  534. logFields.Add(transportLogFields)
  535. logFields.Add(matchMetrics.GetMetrics())
  536. b.config.Logger.LogMetric(brokerMetricName, logFields)
  537. if retErr != nil {
  538. retErr = NewBrokerLoggedEvent(retErr)
  539. }
  540. }()
  541. announceRequest, err := UnmarshalProxyAnnounceRequest(requestPayload)
  542. if err != nil {
  543. return nil, errors.Trace(err)
  544. }
  545. var apiParams common.APIParameters
  546. apiParams, logFields, err = announceRequest.ValidateAndGetParametersAndLogFields(
  547. int(atomic.LoadInt64(&b.maxCompartmentIDs)),
  548. b.config.APIParameterValidator,
  549. b.config.APIParameterLogFieldFormatter,
  550. geoIPData)
  551. if err != nil {
  552. return nil, errors.Trace(err)
  553. }
  554. hasPersonalCompartmentIDs := len(announceRequest.PersonalCompartmentIDs) > 0
  555. // Return MustUpgrade when the proxy's protocol version is less than the
  556. // minimum required.
  557. if announceRequest.Metrics.ProtocolVersion < minimumProxyProtocolVersion {
  558. responsePayload, err := MarshalProxyAnnounceResponse(
  559. &ProxyAnnounceResponse{
  560. NoMatch: true,
  561. MustUpgrade: true,
  562. })
  563. if err != nil {
  564. return nil, errors.Trace(err)
  565. }
  566. return responsePayload, nil
  567. }
  568. // Fetch new tactics for the proxy, if required, using the tactics tag
  569. // that should be included with the API parameters. A tacticsPayload may
  570. // be returned when there are no new tactics, and this is relayed back to
  571. // the proxy, after matching, so that it can extend the TTL for its
  572. // existing, cached tactics. In the case where tactics have changed,
  573. // don't enqueue the proxy announcement and return no-match so that the
  574. // proxy can store and apply the new tactics before announcing again.
  575. var tacticsPayload []byte
  576. if announceRequest.CheckTactics {
  577. tacticsPayload, newTacticsTag, err =
  578. b.config.GetTacticsPayload(geoIPData, apiParams)
  579. if err != nil {
  580. return nil, errors.Trace(err)
  581. }
  582. if tacticsPayload != nil && newTacticsTag != "" {
  583. responsePayload, err := MarshalProxyAnnounceResponse(
  584. &ProxyAnnounceResponse{
  585. TacticsPayload: tacticsPayload,
  586. NoMatch: true,
  587. })
  588. if err != nil {
  589. return nil, errors.Trace(err)
  590. }
  591. return responsePayload, nil
  592. }
  593. }
  594. // AllowProxy may be used to disallow proxies from certain geolocations,
  595. // such as censored locations, from announcing. Proxies with personal
  596. // compartment IDs are always allowed, as they will be used only by
  597. // clients specifically configured to use them.
  598. if !hasPersonalCompartmentIDs &&
  599. !b.config.AllowProxy(geoIPData) {
  600. return nil, errors.TraceNew("proxy disallowed")
  601. }
  602. // Assign this proxy to a common compartment ID, unless it has specified a
  603. // dedicated, personal compartment ID. Assignment uses consistent hashing
  604. // keyed with the proxy ID, in an effort to keep proxies consistently
  605. // assigned to the same compartment.
  606. var commonCompartmentIDs []ID
  607. if !hasPersonalCompartmentIDs {
  608. compartmentID, err := b.selectCommonCompartmentID(proxyID)
  609. if err != nil {
  610. return nil, errors.Trace(err)
  611. }
  612. commonCompartmentIDs = []ID{compartmentID}
  613. }
  614. // Determine whether to enqueue the proxy announcement in the priority
  615. // queue. To be prioritized, a proxy, identified by its ID and ASN, must
  616. // have a recent quality tunnel recorded in the quality state. In
  617. // addition, when the PrioritizeProxy callback is set, invoke this
  618. // additional condition, which can filter by proxy geolocation and other
  619. // properties.
  620. //
  621. // There is no prioritization for personal pairing announcements.
  622. // Potential future enhancements:
  623. //
  624. // - For a proxy with unknown quality (neither reported quality tunnels,
  625. // nor known failed matches), prioritize with some low probability to
  626. // give unknown proxies a chance to qualify? This could be limited, for
  627. // example, to proxies in the same ASN as other quality proxies. To
  628. // implement this, ProxyQualityState would need to record proxy IDs
  629. // with failed matches; and proxy ASNs would need to be input to
  630. // ProxyQualityState.
  631. //
  632. // - Consider using the Psiphon server region, as given in the signed
  633. // server entry, as part of the prioritization logic.
  634. if !hasPersonalCompartmentIDs && b.enableProxyQuality.Load() {
  635. // Here, no specific client ASN is specified for HasQuality. As long
  636. // as a proxy has a quality tunnel for any client ASN, it is
  637. // prioritized. In the matching process, an attempt is made to match
  638. // using HasQuality using the client ASN. See Matcher.matchOffer.
  639. isPriority = b.proxyQualityState.HasQuality(proxyID, geoIPData.ASN, "")
  640. }
  641. if isPriority && b.config.PrioritizeProxy != nil {
  642. // Note that, in the psiphon/server package, inproxyBrokerPrioritizeProxy
  643. // is always wired up, and, as currently implemented, the default value for
  644. // the InproxyBrokerMatcherPrioritizeProxiesFilter tactics parameter
  645. // results in PrioritizeProxy always returning false. Some filter,
  646. // even just a wildcard match, must be configured in order to prioritize.
  647. // Limitation: Of the two return values from
  648. // ValidateAndGetParametersAndLogFields, apiParams and logFields,
  649. // only logFields contains fields such as max_clients
  650. // and *_bytes_per_second, and so these cannot be part of any
  651. // filtering performed by the PrioritizeProxy callback.
  652. //
  653. // TODO: include the additional fields in logFields. Since the
  654. // logFields return value is the output of server.getRequestLogFields
  655. // processing, it's not safe to use it directly. In addition,
  656. // filtering by fields such as max_clients and *_bytes_per_second
  657. // calls for range filtering, which is not yet supported in the
  658. // psiphon/server.MeekServer PrioritizeProxy provider.
  659. isPriority = b.config.PrioritizeProxy(
  660. int(announceRequest.Metrics.ProtocolVersion), geoIPData, apiParams)
  661. }
  662. // Await client offer.
  663. timeout := common.ValueOrDefault(
  664. time.Duration(atomic.LoadInt64(&b.proxyAnnounceTimeout)),
  665. brokerProxyAnnounceTimeout)
  666. // Adjust the timeout to respect any shorter maximum request timeouts for
  667. // the fronting provider.
  668. timeout = b.adjustRequestTimeout(logFields, timeout)
  669. announceCtx, cancelFunc := context.WithTimeout(ctx, timeout)
  670. defer cancelFunc()
  671. extendTransportTimeout(timeout)
  672. // Note that matcher.Announce assumes a monotonically increasing
  673. // announceCtx.Deadline input for each successive call.
  674. clientOffer, matchMetrics, err = b.matcher.Announce(
  675. announceCtx,
  676. proxyIP,
  677. &MatchAnnouncement{
  678. Properties: MatchProperties{
  679. IsPriority: isPriority,
  680. ProtocolVersion: announceRequest.Metrics.ProtocolVersion,
  681. CommonCompartmentIDs: commonCompartmentIDs,
  682. PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
  683. GeoIPData: geoIPData,
  684. NetworkType: GetNetworkType(announceRequest.Metrics.BaseAPIParameters),
  685. NATType: announceRequest.Metrics.NATType,
  686. PortMappingTypes: announceRequest.Metrics.PortMappingTypes,
  687. },
  688. ProxyID: initiatorID,
  689. ProxyMetrics: announceRequest.Metrics,
  690. ConnectionID: connectionID,
  691. })
  692. if err != nil {
  693. var limitError *MatcherLimitError
  694. limited := std_errors.As(err, &limitError)
  695. timeout := announceCtx.Err() == context.DeadlineExceeded
  696. if !limited && !timeout {
  697. return nil, errors.Trace(err)
  698. }
  699. // A no-match response is sent in the case of a timeout awaiting a
  700. // match. The faster-failing rate or entry limiting case also results
  701. // in a response, rather than an error return from handleProxyAnnounce,
  702. // so that the proxy doesn't receive a 404 and flag its BrokerClient as
  703. // having failed.
  704. //
  705. // When the timeout and limit case coincide, limit takes precedence in
  706. // the response.
  707. if timeout && !limited {
  708. // Note: the respective proxy and broker timeouts,
  709. // InproxyBrokerProxyAnnounceTimeout and
  710. // InproxyProxyAnnounceRequestTimeout in tactics, should be
  711. // configured so that the broker will timeout first and have an
  712. // opportunity to send this response before the proxy times out.
  713. timedOut = true
  714. } else {
  715. // Record the specific limit error in the proxy-announce broker event.
  716. limitedErr = err
  717. }
  718. responsePayload, err := MarshalProxyAnnounceResponse(
  719. &ProxyAnnounceResponse{
  720. TacticsPayload: tacticsPayload,
  721. Limited: limited,
  722. NoMatch: timeout && !limited,
  723. })
  724. if err != nil {
  725. return nil, errors.Trace(err)
  726. }
  727. return responsePayload, nil
  728. }
  729. // Select the protocol version. The matcher has already checked
  730. // negotiateProtocolVersion, so failure is not expected.
  731. negotiatedProtocolVersion, ok := negotiateProtocolVersion(
  732. announceRequest.Metrics.ProtocolVersion,
  733. clientOffer.Properties.ProtocolVersion,
  734. clientOffer.UseMediaStreams)
  735. if !ok {
  736. return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
  737. }
  738. // Respond with the client offer. The proxy will follow up with an answer
  739. // request, which is relayed to the client, and then the WebRTC dial begins.
  740. // Limitation: as part of the client's tunnel establishment horse race, a
  741. // client may abort an in-proxy dial at any point. If the overall dial is
  742. // past the SDP exchange and aborted during the WebRTC connection
  743. // establishment, the client may leave the proxy's Proxy.proxyOneClient
  744. // dangling until timeout. Consider adding a signal from the client to
  745. // the proxy, relayed by the broker, that a dial is aborted.
  746. responsePayload, err := MarshalProxyAnnounceResponse(
  747. &ProxyAnnounceResponse{
  748. TacticsPayload: tacticsPayload,
  749. ConnectionID: connectionID,
  750. SelectedProtocolVersion: negotiatedProtocolVersion,
  751. ClientOfferSDP: clientOffer.ClientOfferSDP,
  752. ClientRootObfuscationSecret: clientOffer.ClientRootObfuscationSecret,
  753. DoDTLSRandomization: clientOffer.DoDTLSRandomization,
  754. UseMediaStreams: clientOffer.UseMediaStreams,
  755. TrafficShapingParameters: clientOffer.TrafficShapingParameters,
  756. NetworkProtocol: clientOffer.NetworkProtocol,
  757. DestinationAddress: clientOffer.DestinationAddress,
  758. })
  759. if err != nil {
  760. return nil, errors.Trace(err)
  761. }
  762. // Set the "failed match" trigger, which will progress towards clearing
  763. // the quality state for this proxyID unless quality tunnels are reported
  764. // soon enough after matches. This includes failure, by the proxy, to
  765. // return an proxy answer, as well as any tunnel failures after that.
  766. //
  767. // Failures are expected even for good quality proxies, due to cases such
  768. // as the in-proxy protocol losing the client tunnel establishment horse
  769. // race. There is a threshold number of failed matches that must be
  770. // reached before a quality state is cleared.
  771. b.proxyQualityState.Matched(proxyID, geoIPData.ASN)
  772. return responsePayload, nil
  773. }
  774. // handleClientOffer receives a client offer, awaits a matching client, and
  775. // returns the proxy answer. handleClientOffer has a shorter timeout than
  776. // handleProxyAnnounce since the client has supplied an SDP with STUN hole
  777. // punches which will expire; and, in general, the client is trying to
  778. // connect immediately and is also trying other candidates.
  779. func (b *Broker) handleClientOffer(
  780. ctx context.Context,
  781. extendTransportTimeout ExtendTransportTimeout,
  782. transportLogFields common.LogFields,
  783. clientIP string,
  784. geoIPData common.GeoIPData,
  785. initiatorID ID,
  786. requestPayload []byte) (retResponse []byte, retErr error) {
  787. // As a future enhancement, consider having proxies send offer SDPs with
  788. // announcements and clients long poll to await a match and then provide
  789. // an answer. This order of operations would make sense if client demand
  790. // is high and proxy supply is lower.
  791. //
  792. // Also see comment in Proxy.proxyOneClient for other alternative
  793. // approaches.
  794. // The client's session public key is ephemeral and is not logged.
  795. startTime := time.Now()
  796. var logFields common.LogFields
  797. var serverParams *serverParams
  798. var clientMatchOffer *MatchOffer
  799. var proxyMatchAnnouncement *MatchAnnouncement
  800. var proxyAnswer *MatchAnswer
  801. var matchMetrics *MatchMetrics
  802. var timedOut bool
  803. var limitedErr error
  804. // Always log the outcome.
  805. defer func() {
  806. if logFields == nil {
  807. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  808. }
  809. logFields["broker_event"] = "client-offer"
  810. logFields["broker_id"] = b.brokerID
  811. if serverParams != nil {
  812. logFields["destination_server_id"] = serverParams.serverID
  813. }
  814. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  815. if proxyAnswer != nil {
  816. // The presence of these fields indicate that a match was made,
  817. // the proxy delivered an answer, and the client was still
  818. // waiting for it.
  819. logFields["connection_id"] = proxyAnswer.ConnectionID
  820. logFields["client_nat_type"] = clientMatchOffer.Properties.NATType
  821. logFields["client_port_mapping_types"] = clientMatchOffer.Properties.PortMappingTypes
  822. logFields["proxy_nat_type"] = proxyMatchAnnouncement.Properties.NATType
  823. logFields["proxy_port_mapping_types"] = proxyMatchAnnouncement.Properties.PortMappingTypes
  824. logFields["preferred_nat_match"] =
  825. clientMatchOffer.Properties.IsPreferredNATMatch(&proxyMatchAnnouncement.Properties)
  826. // TODO: also log proxy ice_candidate_types and has_IPv6; for the
  827. // client, these values are added by ValidateAndGetLogFields.
  828. }
  829. if timedOut {
  830. logFields["timed_out"] = true
  831. }
  832. if retErr != nil {
  833. logFields["error"] = retErr.Error()
  834. } else if limitedErr != nil {
  835. logFields["error"] = limitedErr.Error()
  836. }
  837. logFields.Add(transportLogFields)
  838. logFields.Add(matchMetrics.GetMetrics())
  839. b.config.Logger.LogMetric(brokerMetricName, logFields)
  840. if retErr != nil {
  841. retErr = NewBrokerLoggedEvent(retErr)
  842. }
  843. }()
  844. offerRequest, err := UnmarshalClientOfferRequest(requestPayload)
  845. if err != nil {
  846. return nil, errors.Trace(err)
  847. }
  848. // The filtered SDP is the request SDP with any invalid (bogon, unexpected
  849. // GeoIP) ICE candidates filtered out. In some cases, clients cannot
  850. // avoid submitting invalid candidates (see comment in
  851. // processSDPAddresses), so all invalid candidates are removed and the
  852. // remaining SDP is used. Filtered candidate information is logged in
  853. // logFields.
  854. //
  855. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  856. // permitted in exchanged SDPs and not filtered out.
  857. var filteredSDP []byte
  858. filteredSDP, logFields, err = offerRequest.ValidateAndGetLogFields(
  859. int(atomic.LoadInt64(&b.maxCompartmentIDs)),
  860. b.config.LookupGeoIP,
  861. b.config.APIParameterValidator,
  862. b.config.APIParameterLogFieldFormatter,
  863. geoIPData)
  864. if err != nil {
  865. return nil, errors.Trace(err)
  866. }
  867. hasPersonalCompartmentIDs := len(offerRequest.PersonalCompartmentIDs) > 0
  868. offerSDP := offerRequest.ClientOfferSDP
  869. offerSDP.SDP = string(filteredSDP)
  870. // AllowClient may be used to disallow clients from certain geolocations
  871. // from offering. Clients are always allowed to match proxies with shared
  872. // personal compartment IDs.
  873. if !hasPersonalCompartmentIDs &&
  874. !b.config.AllowClient(geoIPData) {
  875. return nil, errors.TraceNew("client disallowed")
  876. }
  877. // Validate that the proxy destination specified by the client is a valid
  878. // dial address for a signed Psiphon server entry. This ensures a client
  879. // can't misuse a proxy to connect to arbitrary destinations.
  880. serverParams, err = b.validateDestination(
  881. geoIPData,
  882. offerRequest.PackedDestinationServerEntry,
  883. offerRequest.NetworkProtocol,
  884. offerRequest.DestinationAddress)
  885. if err != nil {
  886. // Record the specific error in the client-offer broker event.
  887. limitedErr = err
  888. // Return a response. This avoids returning a broker-client-resetting
  889. // 404 in cases including "invalid server entry tag", where a
  890. // legitimate client submits an unpruned, decommissioned server entry.
  891. responsePayload, err := MarshalClientOfferResponse(
  892. &ClientOfferResponse{Limited: true})
  893. if err != nil {
  894. return nil, errors.Trace(err)
  895. }
  896. return responsePayload, nil
  897. }
  898. // Return MustUpgrade when the client's protocol version is less than the
  899. // minimum required.
  900. if offerRequest.Metrics.ProtocolVersion < minimumClientProtocolVersion {
  901. responsePayload, err := MarshalClientOfferResponse(
  902. &ClientOfferResponse{
  903. NoMatch: true,
  904. MustUpgrade: true,
  905. })
  906. if err != nil {
  907. return nil, errors.Trace(err)
  908. }
  909. return responsePayload, nil
  910. }
  911. // Enqueue the client offer and await a proxy matching and subsequent
  912. // proxy answer.
  913. // The Client Offer timeout may be configured with a shorter value in
  914. // personal pairing mode, to facilitate a faster no-match result and
  915. // resulting broker rotation.
  916. var timeout time.Duration
  917. if hasPersonalCompartmentIDs {
  918. timeout = time.Duration(atomic.LoadInt64(&b.clientOfferPersonalTimeout))
  919. } else {
  920. timeout = time.Duration(atomic.LoadInt64(&b.clientOfferTimeout))
  921. }
  922. timeout = common.ValueOrDefault(timeout, brokerClientOfferTimeout)
  923. // Adjust the timeout to respect any shorter maximum request timeouts for
  924. // the fronting provider.
  925. timeout = b.adjustRequestTimeout(logFields, timeout)
  926. offerCtx, cancelFunc := context.WithTimeout(ctx, timeout)
  927. defer cancelFunc()
  928. extendTransportTimeout(timeout)
  929. clientMatchOffer = &MatchOffer{
  930. Properties: MatchProperties{
  931. ProtocolVersion: offerRequest.Metrics.ProtocolVersion,
  932. CommonCompartmentIDs: offerRequest.CommonCompartmentIDs,
  933. PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
  934. GeoIPData: geoIPData,
  935. NetworkType: GetNetworkType(offerRequest.Metrics.BaseAPIParameters),
  936. NATType: offerRequest.Metrics.NATType,
  937. PortMappingTypes: offerRequest.Metrics.PortMappingTypes,
  938. },
  939. ClientOfferSDP: offerSDP,
  940. ClientRootObfuscationSecret: offerRequest.ClientRootObfuscationSecret,
  941. DoDTLSRandomization: offerRequest.DoDTLSRandomization,
  942. UseMediaStreams: offerRequest.UseMediaStreams,
  943. TrafficShapingParameters: offerRequest.TrafficShapingParameters,
  944. NetworkProtocol: offerRequest.NetworkProtocol,
  945. DestinationAddress: offerRequest.DestinationAddress,
  946. DestinationServerID: serverParams.serverID,
  947. }
  948. proxyAnswer, proxyMatchAnnouncement, matchMetrics, err = b.matcher.Offer(
  949. offerCtx,
  950. clientIP,
  951. clientMatchOffer)
  952. if err != nil {
  953. var limitError *MatcherLimitError
  954. limited := std_errors.As(err, &limitError)
  955. timeout := offerCtx.Err() == context.DeadlineExceeded
  956. // A no-match response is sent in the case of a timeout awaiting a
  957. // match. The faster-failing rate or entry limiting case also results
  958. // in a response, rather than an error return from handleClientOffer,
  959. // so that the client doesn't receive a 404 and flag its BrokerClient
  960. // as having failed.
  961. //
  962. // When the timeout and limit cases coincide, limit takes precedence in
  963. // the response.
  964. if timeout {
  965. // Record the time out outcome in the client-offer broker event.
  966. //
  967. // Note: the respective client and broker timeouts,
  968. // InproxyBrokerClientOfferTimeout and
  969. // InproxyClientOfferRequestTimeout in tactics, should be configured
  970. // so that the broker will timeout first and have an opportunity to
  971. // send this response before the client times out.
  972. timedOut = true
  973. }
  974. if limited {
  975. // Record the specific limit error in the client-offer broker event.
  976. limitedErr = err
  977. }
  978. if !limited && !timeout {
  979. // If matcher.Offer failed for some other reason, default to
  980. // returning Limited in the response. As currently implemented,
  981. // when clients receive the Limited flag, they will fail the
  982. // in-proxy dial without retrying, but retain their broker client.
  983. //
  984. // While the matcher.Offer failure scenarios may include an
  985. // internal error, they also include the "no answer" case where a
  986. // proxy fails to produce an answer. For the "no answer" case,
  987. // Limited is preferred over returning NoMatch, which can trigger
  988. // a broker client cycle.
  989. //
  990. // TODO: add a new flag to signal to the client that it may retry
  991. // in the "no answer" case.
  992. limited = true
  993. limitedErr = err
  994. }
  995. responsePayload, err := MarshalClientOfferResponse(
  996. &ClientOfferResponse{
  997. Limited: limited,
  998. NoMatch: timeout && !limited,
  999. })
  1000. if err != nil {
  1001. return nil, errors.Trace(err)
  1002. }
  1003. return responsePayload, nil
  1004. }
  1005. // Log the type of compartment matching that occurred. As
  1006. // PersonalCompartmentIDs are user-generated and shared, actual matching
  1007. // values are not logged as they may link users.
  1008. // TODO: log matching common compartment IDs?
  1009. matchedCommonCompartments := HaveCommonIDs(
  1010. proxyMatchAnnouncement.Properties.CommonCompartmentIDs,
  1011. clientMatchOffer.Properties.CommonCompartmentIDs)
  1012. matchedPersonalCompartments := HaveCommonIDs(
  1013. proxyMatchAnnouncement.Properties.PersonalCompartmentIDs,
  1014. clientMatchOffer.Properties.PersonalCompartmentIDs)
  1015. // Initiate a BrokerServerReport, which sends important information
  1016. // about the connection, including the original client IP, plus other
  1017. // values to be logged with server_tunne, to the server. The report is
  1018. // sent through a secure session established between the broker and the
  1019. // server, relayed by the client.
  1020. //
  1021. // The first relay message will be embedded in the Psiphon handshake. The
  1022. // broker may already have an established session with the server. In
  1023. // this case, only only that initial message is required. The
  1024. // BrokerServerReport is a one-way message, which avoids extra untunneled
  1025. // client/broker traffic.
  1026. //
  1027. // Limitations, due to the one-way message:
  1028. // - the broker can't actively clean up pendingServerReports as
  1029. // tunnels are established and must rely on cache expiry.
  1030. // - the broker doesn't learn that the server accepted the report, and
  1031. // so cannot log a final connection status or signal the proxy to
  1032. // disconnect the client in any misuse cases.
  1033. //
  1034. // As a future enhancement, consider adding a _tunneled_ client relay
  1035. // of a server response acknowledging the broker report.
  1036. relayPacket, err := b.initiateRelayedServerReport(
  1037. serverParams,
  1038. proxyAnswer.ConnectionID,
  1039. &BrokerServerReport{
  1040. ProxyID: proxyAnswer.ProxyID,
  1041. ConnectionID: proxyAnswer.ConnectionID,
  1042. MatchedCommonCompartments: matchedCommonCompartments,
  1043. MatchedPersonalCompartments: matchedPersonalCompartments,
  1044. ClientNATType: clientMatchOffer.Properties.NATType,
  1045. ClientPortMappingTypes: clientMatchOffer.Properties.PortMappingTypes,
  1046. ClientIP: clientIP,
  1047. ProxyIP: proxyAnswer.ProxyIP,
  1048. // ProxyMetrics includes proxy NAT and port mapping types.
  1049. ProxyMetrics: proxyMatchAnnouncement.ProxyMetrics,
  1050. ProxyIsPriority: proxyMatchAnnouncement.Properties.IsPriority,
  1051. })
  1052. if err != nil {
  1053. return nil, errors.Trace(err)
  1054. }
  1055. // Select the protocol version. The matcher has already checked
  1056. // negotiateProtocolVersion, so failure is not expected.
  1057. negotiatedProtocolVersion, ok := negotiateProtocolVersion(
  1058. proxyMatchAnnouncement.Properties.ProtocolVersion,
  1059. offerRequest.Metrics.ProtocolVersion,
  1060. offerRequest.UseMediaStreams)
  1061. if !ok {
  1062. return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
  1063. }
  1064. // Respond with the proxy answer and initial broker/server session packet.
  1065. responsePayload, err := MarshalClientOfferResponse(
  1066. &ClientOfferResponse{
  1067. ConnectionID: proxyAnswer.ConnectionID,
  1068. SelectedProtocolVersion: negotiatedProtocolVersion,
  1069. ProxyAnswerSDP: proxyAnswer.ProxyAnswerSDP,
  1070. RelayPacketToServer: relayPacket,
  1071. })
  1072. if err != nil {
  1073. return nil, errors.Trace(err)
  1074. }
  1075. return responsePayload, nil
  1076. }
  1077. // handleProxyAnswer receives a proxy answer and delivers it to the waiting
  1078. // client.
  1079. func (b *Broker) handleProxyAnswer(
  1080. ctx context.Context,
  1081. extendTransportTimeout ExtendTransportTimeout,
  1082. transportLogFields common.LogFields,
  1083. proxyIP string,
  1084. geoIPData common.GeoIPData,
  1085. initiatorID ID,
  1086. requestPayload []byte) (retResponse []byte, retErr error) {
  1087. startTime := time.Now()
  1088. var logFields common.LogFields
  1089. var proxyAnswer *MatchAnswer
  1090. var answerError string
  1091. // The proxy ID is an implicit parameter: it's the proxy's session public
  1092. // key.
  1093. proxyID := initiatorID
  1094. // Always log the outcome.
  1095. defer func() {
  1096. if logFields == nil {
  1097. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1098. }
  1099. logFields["broker_event"] = "proxy-answer"
  1100. logFields["broker_id"] = b.brokerID
  1101. logFields["proxy_id"] = proxyID
  1102. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1103. if proxyAnswer != nil {
  1104. logFields["connection_id"] = proxyAnswer.ConnectionID
  1105. }
  1106. if answerError != "" {
  1107. // This is a proxy-reported error that occurred while creating the answer.
  1108. logFields["answer_error"] = answerError
  1109. }
  1110. if retErr != nil {
  1111. logFields["error"] = retErr.Error()
  1112. }
  1113. logFields.Add(transportLogFields)
  1114. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1115. if retErr != nil {
  1116. retErr = NewBrokerLoggedEvent(retErr)
  1117. }
  1118. }()
  1119. answerRequest, err := UnmarshalProxyAnswerRequest(requestPayload)
  1120. if err != nil {
  1121. return nil, errors.Trace(err)
  1122. }
  1123. // The filtered SDP is the request SDP with any invalid (bogon, unexpected
  1124. // GeoIP) ICE candidates filtered out. In some cases, proxies cannot
  1125. // avoid submitting invalid candidates (see comment in
  1126. // processSDPAddresses), so all invalid candidates are removed and the
  1127. // remaining SDP is used. Filtered candidate information is logged in
  1128. // logFields.
  1129. //
  1130. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  1131. // permitted in exchanged SDPs and not filtered out.
  1132. hasPersonalCompartmentIDs, err := b.matcher.AnnouncementHasPersonalCompartmentIDs(
  1133. initiatorID, answerRequest.ConnectionID)
  1134. if err != nil {
  1135. return nil, errors.Trace(err)
  1136. }
  1137. var filteredSDP []byte
  1138. filteredSDP, logFields, err = answerRequest.ValidateAndGetLogFields(
  1139. b.config.LookupGeoIP,
  1140. b.config.APIParameterValidator,
  1141. b.config.APIParameterLogFieldFormatter,
  1142. geoIPData,
  1143. hasPersonalCompartmentIDs)
  1144. if err != nil {
  1145. return nil, errors.Trace(err)
  1146. }
  1147. answerSDP := answerRequest.ProxyAnswerSDP
  1148. answerSDP.SDP = string(filteredSDP)
  1149. if answerRequest.AnswerError != "" {
  1150. // The proxy failed to create an answer.
  1151. answerError = answerRequest.AnswerError
  1152. b.matcher.AnswerError(initiatorID, answerRequest.ConnectionID)
  1153. } else {
  1154. // Deliver the answer to the client.
  1155. // Note that neither ProxyID nor ProxyIP is returned to the client.
  1156. // These fields are used internally in the matcher.
  1157. proxyAnswer = &MatchAnswer{
  1158. ProxyIP: proxyIP,
  1159. ProxyID: initiatorID,
  1160. ConnectionID: answerRequest.ConnectionID,
  1161. ProxyAnswerSDP: answerSDP,
  1162. }
  1163. err = b.matcher.Answer(proxyAnswer)
  1164. if err != nil {
  1165. return nil, errors.Trace(err)
  1166. }
  1167. }
  1168. // There is no data in this response, it's simply an acknowledgement that
  1169. // the answer was received. Upon receiving the response, the proxy should
  1170. // begin the WebRTC dial operation.
  1171. responsePayload, err := MarshalProxyAnswerResponse(
  1172. &ProxyAnswerResponse{})
  1173. if err != nil {
  1174. return nil, errors.Trace(err)
  1175. }
  1176. return responsePayload, nil
  1177. }
  1178. // handleServerProxyQuality receives, from servers, proxy tunnel quality and
  1179. // records that in the proxy quality state that is used to prioritize
  1180. // well-performing proxies.
  1181. func (b *Broker) handleServerProxyQuality(
  1182. ctx context.Context,
  1183. extendTransportTimeout ExtendTransportTimeout,
  1184. transportLogFields common.LogFields,
  1185. proxyIP string,
  1186. geoIPData common.GeoIPData,
  1187. initiatorID ID,
  1188. requestPayload []byte) (retResponse []byte, retErr error) {
  1189. startTime := time.Now()
  1190. var logFields common.LogFields
  1191. // Only known, trusted Psiphon server initiators are allowed to send proxy
  1192. // quality requests. knownServerInitiatorIDs is populated with the
  1193. // Curve25519 public keys -- initiator IDs -- corresponding to the
  1194. // session public keys found in signed Psiphon server entries.
  1195. //
  1196. // Currently, knownServerInitiatorIDs is populated with destination server
  1197. // entries received in client offers, so the broker must first receive a
  1198. // client offer before a given server is trusted, which means
  1199. // that "invalid initiator" errors may occur, and some quality requests
  1200. // may be dropped, in some expected situations, including a broker restart.
  1201. // serverID is the server entry diagnostic ID of the server.
  1202. serverIDValue, ok := b.knownServerInitiatorIDs.Load(initiatorID)
  1203. if !ok {
  1204. return nil, errors.TraceNew("invalid initiator")
  1205. }
  1206. serverID := serverIDValue.(string)
  1207. // Always log the outcome.
  1208. defer func() {
  1209. // Typically, a server will send the same proxy quality request to all
  1210. // brokers. For the one "broadcast" request, server-proxy-quality is
  1211. // logged by each broker, as an indication that every server/broker
  1212. // request pair is successful.
  1213. //
  1214. // TODO: log more details from ServerProxyQualityRequest.QualityCounts?
  1215. if logFields == nil {
  1216. logFields = common.LogFields{}
  1217. }
  1218. logFields["broker_event"] = "server-proxy-quality"
  1219. logFields["broker_id"] = b.brokerID
  1220. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1221. logFields["server_id"] = serverID
  1222. if retErr != nil {
  1223. logFields["error"] = retErr.Error()
  1224. }
  1225. logFields.Add(transportLogFields)
  1226. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1227. if retErr != nil {
  1228. retErr = NewBrokerLoggedEvent(retErr)
  1229. }
  1230. }()
  1231. qualityRequest, err := UnmarshalServerProxyQualityRequest(requestPayload)
  1232. if err != nil {
  1233. return nil, errors.Trace(err)
  1234. }
  1235. logFields, err = qualityRequest.ValidateAndGetLogFields()
  1236. if err != nil {
  1237. return nil, errors.Trace(err)
  1238. }
  1239. // Add the quality counts into the existing proxy quality state.
  1240. //
  1241. // The counts are ignored when proxy quality is disabled, but an
  1242. // acknowledgement is still returned to the server.
  1243. if b.enableProxyQuality.Load() {
  1244. for proxyKey, counts := range qualityRequest.QualityCounts {
  1245. b.proxyQualityState.AddQuality(proxyKey, counts)
  1246. }
  1247. }
  1248. // There is no data in this response, it's simply an acknowledgement that
  1249. // the request was received.
  1250. responsePayload, err := MarshalServerProxyQualityResponse(
  1251. &ServerProxyQualityResponse{})
  1252. if err != nil {
  1253. return nil, errors.Trace(err)
  1254. }
  1255. return responsePayload, nil
  1256. }
  1257. // handleClientRelayedPacket facilitates broker/server sessions. The initial
  1258. // packet from the broker is sent to the client in the ClientOfferResponse.
  1259. // The client sends that to the server in the Psiphon handshake. If the
  1260. // session was already established, the relay ends there. Otherwise, the
  1261. // client receives any packet sent back by the server and that server packet
  1262. // is then delivered to the broker in a ClientRelayedPacketRequest. If the
  1263. // session needs to be [re-]negotiated, there are additional
  1264. // ClientRelayedPacket round trips until the session is established and the
  1265. // BrokerServerReport is securely exchanged between the broker and server.
  1266. func (b *Broker) handleClientRelayedPacket(
  1267. ctx context.Context,
  1268. extendTransportTimeout ExtendTransportTimeout,
  1269. transportLogFields common.LogFields,
  1270. geoIPData common.GeoIPData,
  1271. initiatorID ID,
  1272. requestPayload []byte) (retResponse []byte, retErr error) {
  1273. startTime := time.Now()
  1274. var logFields common.LogFields
  1275. var relayedPacketRequest *ClientRelayedPacketRequest
  1276. var serverID string
  1277. // Always log the outcome.
  1278. defer func() {
  1279. if logFields == nil {
  1280. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1281. }
  1282. logFields["broker_event"] = "client-relayed-packet"
  1283. logFields["broker_id"] = b.brokerID
  1284. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1285. if relayedPacketRequest != nil {
  1286. logFields["connection_id"] = relayedPacketRequest.ConnectionID
  1287. }
  1288. if serverID != "" {
  1289. logFields["destination_server_id"] = serverID
  1290. }
  1291. if retErr != nil {
  1292. logFields["error"] = retErr.Error()
  1293. }
  1294. logFields.Add(transportLogFields)
  1295. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1296. if retErr != nil {
  1297. retErr = NewBrokerLoggedEvent(retErr)
  1298. }
  1299. }()
  1300. relayedPacketRequest, err := UnmarshalClientRelayedPacketRequest(requestPayload)
  1301. if err != nil {
  1302. return nil, errors.Trace(err)
  1303. }
  1304. logFields, err = relayedPacketRequest.ValidateAndGetLogFields(
  1305. b.config.APIParameterValidator,
  1306. b.config.APIParameterLogFieldFormatter,
  1307. geoIPData)
  1308. if err != nil {
  1309. return nil, errors.Trace(err)
  1310. }
  1311. // The relay state is associated with the connection ID.
  1312. strConnectionID := string(relayedPacketRequest.ConnectionID[:])
  1313. entry, ok := b.pendingServerReports.Get(strConnectionID)
  1314. if !ok {
  1315. // The relay state is not found; it may have been evicted from the
  1316. // cache. The client will receive a generic error in this case and
  1317. // should stop relaying. Assuming the server is configured to require
  1318. // a BrokerServerReport, the tunnel will be terminated, so the
  1319. // client should also abandon the dial.
  1320. return nil, errors.TraceNew("no pending report")
  1321. }
  1322. pendingServerReport := entry.(*pendingServerReport)
  1323. serverID = pendingServerReport.serverID
  1324. // When the broker tried to use an existing session that was expired on the
  1325. // server, the server will respond here with a signed session reset token. The
  1326. // broker resets the session and starts to establish a new session.
  1327. //
  1328. // The non-waiting session establishment mode is used for broker/server
  1329. // sessions: if multiple clients concurrently try to relay new sessions,
  1330. // all establishments will happen in parallel without forcing any clients
  1331. // to wait for one client to lead the establishment. The last established
  1332. // session will be retained for reuse.
  1333. //
  1334. // If there is an error, the relayed packet is invalid. Drop the packet
  1335. // and return an error to be logged. Do _not_ reset the session,
  1336. // otherwise a malicious client could interrupt a valid broker/server
  1337. // session with a malformed packet.
  1338. // Next is given a nil ctx since we're not waiting for any other client to
  1339. // establish the session.
  1340. out, _, err := pendingServerReport.roundTrip.Next(
  1341. nil, relayedPacketRequest.PacketFromServer)
  1342. if err != nil {
  1343. return nil, errors.Trace(err)
  1344. }
  1345. if out == nil {
  1346. // The BrokerServerReport is a one-way message, As a result, the relay
  1347. // never ends with broker receiving a response; it's either
  1348. // (re)handshaking or sending the one-way report.
  1349. return nil, errors.TraceNew("unexpected nil packet")
  1350. }
  1351. // Return the next broker packet for the client to relay to the server.
  1352. // When it receives a nil PacketToServer, the client will stop relaying.
  1353. responsePayload, err := MarshalClientRelayedPacketResponse(
  1354. &ClientRelayedPacketResponse{
  1355. PacketToServer: out,
  1356. })
  1357. if err != nil {
  1358. return nil, errors.Trace(err)
  1359. }
  1360. return responsePayload, nil
  1361. }
  1362. func (b *Broker) adjustRequestTimeout(
  1363. logFields common.LogFields, timeout time.Duration) time.Duration {
  1364. // Adjust long-polling request timeouts to respect any maximum request
  1365. // timeout supported by the provider fronting the request.
  1366. //
  1367. // Limitation: the client is trusted to provide the correct fronting
  1368. // provider ID.
  1369. maxRequestTimeouts, ok := b.maxRequestTimeouts.Load().(map[string]time.Duration)
  1370. if !ok || maxRequestTimeouts == nil {
  1371. return timeout
  1372. }
  1373. frontingProviderID, ok := logFields["fronting_provider_id"].(string)
  1374. if !ok {
  1375. return timeout
  1376. }
  1377. maxRequestTimeout, ok := maxRequestTimeouts[frontingProviderID]
  1378. if !ok || maxRequestTimeout <= 0 || timeout <= maxRequestTimeout {
  1379. return timeout
  1380. }
  1381. return maxRequestTimeout
  1382. }
  1383. type pendingServerReport struct {
  1384. serverID string
  1385. serverReport *BrokerServerReport
  1386. roundTrip *InitiatorRoundTrip
  1387. }
  1388. func (b *Broker) initiateRelayedServerReport(
  1389. serverParams *serverParams,
  1390. connectionID ID,
  1391. serverReport *BrokerServerReport) ([]byte, error) {
  1392. reportPayload, err := MarshalBrokerServerReport(serverReport)
  1393. if err != nil {
  1394. return nil, errors.Trace(err)
  1395. }
  1396. // Force a new, concurrent session establishment with the server even if
  1397. // another handshake is already in progess, relayed by some other client.
  1398. // This ensures clients don't block waiting for other client relays
  1399. // through other tunnels. The last established session will be retained
  1400. // for reuse.
  1401. waitToShareSession := false
  1402. roundTrip, err := b.initiatorSessions.NewRoundTrip(
  1403. serverParams.sessionPublicKey,
  1404. serverParams.sessionRootObfuscationSecret,
  1405. waitToShareSession,
  1406. reportPayload)
  1407. if err != nil {
  1408. return nil, errors.Trace(err)
  1409. }
  1410. relayPacket, _, err := roundTrip.Next(nil, nil)
  1411. if err != nil {
  1412. return nil, errors.Trace(err)
  1413. }
  1414. strConnectionID := string(connectionID[:])
  1415. b.pendingServerReports.Set(
  1416. strConnectionID,
  1417. &pendingServerReport{
  1418. serverID: serverParams.serverID,
  1419. serverReport: serverReport,
  1420. roundTrip: roundTrip,
  1421. },
  1422. time.Duration(atomic.LoadInt64(&b.pendingServerReportsTTL)))
  1423. return relayPacket, nil
  1424. }
  1425. type serverParams struct {
  1426. serverID string
  1427. sessionPublicKey SessionPublicKey
  1428. sessionRootObfuscationSecret ObfuscationSecret
  1429. }
  1430. // validateDestination checks that the client's specified proxy dial
  1431. // destination is valid destination address for a tunnel protocol in the
  1432. // specified signed and valid Psiphon server entry.
  1433. func (b *Broker) validateDestination(
  1434. geoIPData common.GeoIPData,
  1435. packedDestinationServerEntry []byte,
  1436. networkProtocol NetworkProtocol,
  1437. destinationAddress string) (*serverParams, error) {
  1438. var packedServerEntry protocol.PackedServerEntryFields
  1439. err := cbor.Unmarshal(packedDestinationServerEntry, &packedServerEntry)
  1440. if err != nil {
  1441. return nil, errors.Trace(err)
  1442. }
  1443. serverEntryFields, err := protocol.DecodePackedServerEntryFields(packedServerEntry)
  1444. if err != nil {
  1445. return nil, errors.Trace(err)
  1446. }
  1447. // Strip any unsigned fields, which could be forged by the client. In
  1448. // particular, this includes the server entry tag, which, in some cases,
  1449. // is locally populated by a client for its own reference.
  1450. serverEntryFields.RemoveUnsignedFields()
  1451. // Check that the server entry is signed by Psiphon. Otherwise a client
  1452. // could manufacture a server entry corresponding to an arbitrary dial
  1453. // destination.
  1454. err = serverEntryFields.VerifySignature(
  1455. b.config.ServerEntrySignaturePublicKey)
  1456. if err != nil {
  1457. return nil, errors.Trace(err)
  1458. }
  1459. // The server entry tag must be set and signed by Psiphon, as local,
  1460. // client derived tags are unsigned and untrusted.
  1461. serverEntryTag := serverEntryFields.GetTag()
  1462. if serverEntryTag == "" {
  1463. return nil, errors.TraceNew("missing server entry tag")
  1464. }
  1465. // Check that the server entry tag is on a list of active and valid
  1466. // Psiphon server entry tags. This ensures that an obsolete entry for a
  1467. // pruned server cannot by misused by a client to proxy to what's no
  1468. // longer a Psiphon server.
  1469. if !b.config.IsValidServerEntryTag(serverEntryTag) {
  1470. return nil, errors.TraceNew("invalid server entry tag")
  1471. }
  1472. serverID := serverEntryFields.GetDiagnosticID()
  1473. serverEntry, err := serverEntryFields.GetServerEntry()
  1474. if err != nil {
  1475. return nil, errors.Trace(err)
  1476. }
  1477. // Validate the dial host (IP or domain) and port matches a tunnel
  1478. // protocol offered by the server entry.
  1479. destHost, destPort, err := net.SplitHostPort(destinationAddress)
  1480. if err != nil {
  1481. return nil, errors.Trace(err)
  1482. }
  1483. destPortNum, err := strconv.Atoi(destPort)
  1484. if err != nil {
  1485. return nil, errors.Trace(err)
  1486. }
  1487. // For domain fronted cases, since we can't verify the Host header, access
  1488. // is strictly to limited to targeted clients. Clients should use tactics
  1489. // to avoid disallowed domain dial address cases, but here the broker
  1490. // enforces it.
  1491. //
  1492. // TODO: this issue could be further mitigated with a server
  1493. // acknowledgement of the broker's report, with no acknowledgement
  1494. // followed by signaling the proxy to terminate client connection.
  1495. // This assumes that any domain dial is for domain fronting.
  1496. isDomain := net.ParseIP(destHost) == nil
  1497. if isDomain && !b.config.AllowDomainFrontedDestinations(geoIPData) {
  1498. return nil, errors.TraceNew("domain fronted destinations disallowed")
  1499. }
  1500. // The server entry must include an in-proxy tunnel protocol capability
  1501. // and corresponding dial port number. In-proxy capacity may be set for
  1502. // only a subset of all Psiphon servers, to limited the number of servers
  1503. // a proxy can observe and enumerate. Well-behaved clients will not send
  1504. // any server entries lacking this capability, but here the broker
  1505. // enforces it.
  1506. if !serverEntry.IsValidInproxyDialAddress(networkProtocol.String(), destHost, destPortNum) {
  1507. return nil, errors.TraceNew("invalid destination address")
  1508. }
  1509. // Extract and return the key material to be used for the secure session
  1510. // and BrokerServer exchange between the broker and the Psiphon server
  1511. // corresponding to this server entry.
  1512. params := &serverParams{
  1513. serverID: serverID,
  1514. }
  1515. params.sessionPublicKey, err = SessionPublicKeyFromString(
  1516. serverEntry.InproxySessionPublicKey)
  1517. if err != nil {
  1518. return nil, errors.Trace(err)
  1519. }
  1520. params.sessionRootObfuscationSecret, err = ObfuscationSecretFromString(
  1521. serverEntry.InproxySessionRootObfuscationSecret)
  1522. if err != nil {
  1523. return nil, errors.Trace(err)
  1524. }
  1525. // Record that this server is known and trusted ServerProxyQualityRequest
  1526. // sender. The serverID is stored for logging in handleServerProxyQuality.
  1527. //
  1528. // There is no expiry for knownServerInitiatorIDs entries, and they will
  1529. // clear only if the broker is restarted (which is the same lifetime as
  1530. // ServerEntrySignaturePublicKey).
  1531. //
  1532. // Limitation: in time, the above IsValidServerEntryTag check could become
  1533. // false for a retired server, while its entry remains in
  1534. // knownServerInitiatorIDs. However, unlike the case of a recycled
  1535. // Psiphon server IP being used as a proxy destination, it's safer to
  1536. // assume that a retired server's session private key does not become
  1537. // exposed.
  1538. serverInitiatorID, err := params.sessionPublicKey.ToCurve25519()
  1539. if err != nil {
  1540. return nil, errors.Trace(err)
  1541. }
  1542. // For hosts running a single psiphond with multiple server entries, there
  1543. // will be multiple possible serverIDs for one serverInitiatorID. Don't
  1544. // overwrite any existing entry; keep the first observed serverID for
  1545. // more stable logging.
  1546. _, _ = b.knownServerInitiatorIDs.LoadOrStore(ID(serverInitiatorID), serverID)
  1547. return params, nil
  1548. }
  1549. func (b *Broker) isCommonCompartmentIDHashingInitialized() bool {
  1550. b.commonCompartmentsMutex.Lock()
  1551. defer b.commonCompartmentsMutex.Unlock()
  1552. return b.commonCompartments != nil
  1553. }
  1554. func (b *Broker) initializeCommonCompartmentIDHashing(
  1555. commonCompartmentIDs []ID) error {
  1556. b.commonCompartmentsMutex.Lock()
  1557. defer b.commonCompartmentsMutex.Unlock()
  1558. // At least one common compartment ID is required. At a minimum, one ID
  1559. // will be used and distributed to clients via tactics, limiting matching
  1560. // to those clients targeted to receive that tactic parameters.
  1561. if len(commonCompartmentIDs) == 0 {
  1562. return errors.TraceNew("missing common compartment IDs")
  1563. }
  1564. // The consistent package doesn't allow duplicate members.
  1565. checkDup := make(map[ID]bool, len(commonCompartmentIDs))
  1566. for _, compartmentID := range commonCompartmentIDs {
  1567. if checkDup[compartmentID] {
  1568. return errors.TraceNew("duplicate common compartment IDs")
  1569. }
  1570. checkDup[compartmentID] = true
  1571. }
  1572. // Proxies without personal compartment IDs are randomly assigned to the
  1573. // set of common, Psiphon-specified, compartment IDs. These common
  1574. // compartment IDs are then distributed to targeted clients through
  1575. // tactics or embedded in OSLs, to limit access to proxies.
  1576. //
  1577. // Use consistent hashing in an effort to keep a consistent assignment of
  1578. // proxies (as specified by proxy ID, which covers all announcements for
  1579. // a single proxy). This is more of a concern for long-lived, permanent
  1580. // proxies that are not behind any NAT.
  1581. //
  1582. // Even with consistent hashing, a subset of proxies will still change
  1583. // assignment when CommonCompartmentIDs changes.
  1584. consistentMembers := make([]consistent.Member, len(commonCompartmentIDs))
  1585. for i, compartmentID := range commonCompartmentIDs {
  1586. consistentMembers[i] = consistentMember(compartmentID.String())
  1587. }
  1588. b.commonCompartments = consistent.New(
  1589. consistentMembers,
  1590. consistent.Config{
  1591. PartitionCount: len(consistentMembers),
  1592. ReplicationFactor: 1,
  1593. Load: 1,
  1594. Hasher: xxhasher{},
  1595. })
  1596. return nil
  1597. }
  1598. // xxhasher wraps github.com/cespare/xxhash.Sum64 in the interface expected by
  1599. // github.com/buraksezer/consistent. xxhash is a high quality hash function
  1600. // used in github.com/buraksezer/consistent examples.
  1601. type xxhasher struct{}
  1602. func (h xxhasher) Sum64(data []byte) uint64 {
  1603. return xxhash.Sum64(data)
  1604. }
  1605. // consistentMember wraps the string type with the interface expected by
  1606. // github.com/buraksezer/consistent.
  1607. type consistentMember string
  1608. func (m consistentMember) String() string {
  1609. return string(m)
  1610. }
  1611. func (b *Broker) selectCommonCompartmentID(proxyID ID) (ID, error) {
  1612. b.commonCompartmentsMutex.Lock()
  1613. defer b.commonCompartmentsMutex.Unlock()
  1614. compartmentID, err := IDFromString(
  1615. b.commonCompartments.LocateKey(proxyID[:]).String())
  1616. if err != nil {
  1617. return compartmentID, errors.Trace(err)
  1618. }
  1619. return compartmentID, nil
  1620. }