broker.go 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124
  1. /*
  2. * Copyright (c) 2023, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package inproxy
  20. import (
  21. "context"
  22. std_errors "errors"
  23. "net"
  24. "strconv"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/consistent"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  32. "github.com/cespare/xxhash"
  33. lrucache "github.com/cognusion/go-cache-lru"
  34. "github.com/fxamacker/cbor/v2"
  35. "golang.org/x/time/rate"
  36. )
  37. const (
  38. // BrokerMaxRequestBodySize is the maximum request size, that should be
  39. // enforced by the provided broker transport.
  40. BrokerMaxRequestBodySize = 65536
  41. // BrokerEndPointName is the standard name for referencing an endpoint
  42. // that services broker requests.
  43. BrokerEndPointName = "inproxy-broker"
  44. brokerProxyAnnounceTimeout = 2 * time.Minute
  45. brokerClientOfferTimeout = 10 * time.Second
  46. brokerPendingServerReportsTTL = 60 * time.Second
  47. brokerPendingServerReportsMaxSize = 100000
  48. brokerMetricName = "inproxy_broker"
  49. brokerRateLimiterReapHistoryFrequencySeconds = 300
  50. brokerRateLimiterMaxCacheEntries = 1000000
  51. )
  52. // LookupGeoIP is a callback for providing GeoIP lookup service.
  53. type LookupGeoIP func(IP string) common.GeoIPData
  54. // ExtendTransportTimeout is a callback that extends the timeout for a
  55. // server-side broker transport handler, facilitating request-specific
  56. // timeouts including long-polling for proxy announcements.
  57. type ExtendTransportTimeout func(timeout time.Duration)
  58. // GetTacticsPayload is a callback which returns the appropriate tactics
  59. // payload for the specified client/proxy GeoIP data and API parameters.
  60. type GetTacticsPayload func(
  61. geoIPData common.GeoIPData,
  62. apiParams common.APIParameters) ([]byte, string, error)
  63. // RelayDSLRequest is a callback that provides DSL request relaying. The
  64. // callback must always return a response payload, even in the case of an
  65. // error. See dsl.Relay.HandleRequest.
  66. type RelayDSLRequest func(
  67. ctx context.Context,
  68. extendTimeout ExtendTransportTimeout,
  69. clientIP string,
  70. clientGeoIPData common.GeoIPData,
  71. requestPayload []byte) ([]byte, error)
  72. // Broker is the in-proxy broker component, which matches clients and proxies
  73. // and provides WebRTC signaling functionalty.
  74. //
  75. // Both clients and proxies send requests to the broker to obtain matches and
  76. // exchange WebRTC SDPs. Broker does not implement a transport or obfuscation
  77. // layer; instead that is provided by the HandleSessionPacket caller. A
  78. // typical implementation would provide a domain fronted web server which
  79. // runs a Broker and calls Broker.HandleSessionPacket to handle web requests
  80. // encapsulating secure session packets.
  81. type Broker struct {
  82. config *BrokerConfig
  83. brokerID ID
  84. initiatorSessions *InitiatorSessions
  85. responderSessions *ResponderSessions
  86. matcher *Matcher
  87. pendingServerReports *lrucache.Cache
  88. proxyQualityState *ProxyQualityState
  89. knownServerInitiatorIDs sync.Map
  90. commonCompartmentsMutex sync.Mutex
  91. commonCompartments *consistent.Consistent
  92. proxyAnnounceTimeout atomic.Int64
  93. clientOfferTimeout atomic.Int64
  94. clientOfferPersonalTimeout atomic.Int64
  95. pendingServerReportsTTL atomic.Int64
  96. maxRequestTimeouts atomic.Value
  97. maxCompartmentIDs atomic.Int64
  98. enableProxyQualityMutex sync.Mutex
  99. enableProxyQuality atomic.Bool
  100. dslRequestRateLimiters *lrucache.Cache
  101. dslRequestRateLimitParams atomic.Value
  102. }
  103. // BrokerConfig specifies the configuration for a Broker.
  104. type BrokerConfig struct {
  105. // Logger is used to log events.
  106. Logger common.Logger
  107. // CommonCompartmentIDs is a list of common compartment IDs to apply to
  108. // proxies that announce without personal compartment ID. Common
  109. // compartment IDs are managed by Psiphon and distributed to clients via
  110. // tactics or embedded in OSLs. Clients must supply a valid compartment
  111. // ID to match with a proxy.
  112. //
  113. // A BrokerConfig must supply at least one compartment ID, or
  114. // SetCompartmentIDs must be called with at least one compartment ID
  115. // before calling Start.
  116. //
  117. // When only one, single common compartment ID is configured, it can serve
  118. // as an (obfuscation) secret that clients must obtain, via tactics, to
  119. // enable in-proxy participation.
  120. CommonCompartmentIDs []ID
  121. // AllowProxy is a callback which can indicate whether a proxy with the
  122. // given GeoIP data is allowed to match with common compartment ID
  123. // clients. Proxies with personal compartment IDs are always allowed.
  124. AllowProxy func(common.GeoIPData) bool
  125. // PrioritizeProxy is a callback which can indicate whether proxy
  126. // announcements from proxies with the specified in-proxy protocol
  127. // version, GeoIPData, and APIParameters should be prioritized in the
  128. // matcher queue. Priority proxy announcements match ahead of other proxy
  129. // announcements, regardless of announcement age/deadline. Priority
  130. // status takes precedence over preferred NAT matching. Prioritization
  131. // applies only to common compartment IDs and not personal pairing mode.
  132. PrioritizeProxy func(int, common.GeoIPData, common.APIParameters) bool
  133. // AllowClient is a callback which can indicate whether a client with the
  134. // given GeoIP data is allowed to match with common compartment ID
  135. // proxies. Clients are always allowed to match based on personal
  136. // compartment ID.
  137. AllowClient func(common.GeoIPData) bool
  138. // AllowDomainFrontedDestinations is a callback which can indicate whether
  139. // a client with the given GeoIP data is allowed to specify a proxied
  140. // destination for a domain fronted protocol. When false, only direct
  141. // address destinations are allowed.
  142. //
  143. // While tactics may may be set to instruct clients to use only direct
  144. // server tunnel protocols, with IP address destinations, this callback
  145. // adds server-side enforcement.
  146. AllowDomainFrontedDestinations func(common.GeoIPData) bool
  147. // AllowMatch is a callback which can indicate whether a proxy and client
  148. // pair, with the given, respective GeoIP data, is allowed to match
  149. // together. Pairs are always allowed to match based on personal
  150. // compartment ID.
  151. AllowMatch func(common.GeoIPData, common.GeoIPData) bool
  152. // LookupGeoIP provides GeoIP lookup service.
  153. LookupGeoIP LookupGeoIP
  154. // APIParameterValidator is a callback that validates base API metrics.
  155. APIParameterValidator common.APIParameterValidator
  156. // APIParameterValidator is a callback that formats base API metrics.
  157. APIParameterLogFieldFormatter common.APIParameterLogFieldFormatter
  158. // GetTacticsPayload provides a tactics lookup service.
  159. GetTacticsPayload GetTacticsPayload
  160. // IsValidServerEntryTag is a callback which checks if the specified
  161. // server entry tag is on the list of valid and active Psiphon server
  162. // entry tags.
  163. IsValidServerEntryTag func(serverEntryTag string) bool
  164. // IsLoadLimiting is a callback which checks if the broker process is in a
  165. // load limiting state, where consumed resources, including allocated
  166. // system memory and CPU load, exceed determined thresholds. When load
  167. // limiting is indicated, the broker will attempt to reduce load by
  168. // immediately rejecting either proxy announces or client offers,
  169. // depending on the state of the corresponding queues.
  170. IsLoadLimiting func() bool
  171. // RelayDSLRequest provides DSL request relay support.
  172. //
  173. // RelayDSLRequest must be set; return an error if no DSL relay is
  174. // configured.
  175. RelayDSLRequest RelayDSLRequest
  176. // PrivateKey is the broker's secure session long term private key.
  177. PrivateKey SessionPrivateKey
  178. // ObfuscationRootSecret broker's secure session long term obfuscation key.
  179. ObfuscationRootSecret ObfuscationSecret
  180. // ServerEntrySignaturePublicKey is the key used to verify Psiphon server
  181. // entry signatures.
  182. ServerEntrySignaturePublicKey string
  183. // These timeout parameters may be used to override defaults.
  184. ProxyAnnounceTimeout time.Duration
  185. ClientOfferTimeout time.Duration
  186. ClientOfferPersonalTimeout time.Duration
  187. PendingServerReportsTTL time.Duration
  188. // Announcement queue limit configuration.
  189. MatcherAnnouncementLimitEntryCount int
  190. MatcherAnnouncementRateLimitQuantity int
  191. MatcherAnnouncementRateLimitInterval time.Duration
  192. MatcherAnnouncementNonlimitedProxyIDs []ID
  193. // Offer queue limit configuration.
  194. MatcherOfferLimitEntryCount int
  195. MatcherOfferRateLimitQuantity int
  196. MatcherOfferRateLimitInterval time.Duration
  197. // DSL request relay rate limit configuration.
  198. DSLRequestRateLimitQuantity int
  199. DSLRequestRateLimitInterval time.Duration
  200. // MaxCompartmentIDs specifies the maximum number of compartment IDs that
  201. // can be included, per list, in one request. If 0, the value
  202. // MaxCompartmentIDs is used.
  203. MaxCompartmentIDs int
  204. }
  205. // BrokerLoggedEvent is an error type which indicates that the broker has
  206. // already logged an event recording the underlying error. This may be used
  207. // by the outer server layer to avoid redundant logs for HandleSessionPacket
  208. // errors.
  209. type BrokerLoggedEvent struct {
  210. err error
  211. }
  212. func NewBrokerLoggedEvent(err error) *BrokerLoggedEvent {
  213. return &BrokerLoggedEvent{err: err}
  214. }
  215. func (e *BrokerLoggedEvent) Error() string {
  216. return e.err.Error()
  217. }
  218. // NewBroker initializes a new Broker.
  219. func NewBroker(config *BrokerConfig) (*Broker, error) {
  220. // initiatorSessions are secure sessions initiated by the broker and used
  221. // to send BrokerServerReports to servers. The servers will be
  222. // configured to establish sessions only with brokers with specified
  223. // public keys.
  224. initiatorSessions := NewInitiatorSessions(config.PrivateKey)
  225. // responderSessions are secure sessions initiated by clients and proxies
  226. // and used to send requests to the broker. Clients and proxies are
  227. // configured to establish sessions only with specified broker public keys.
  228. responderSessions, err := NewResponderSessions(
  229. config.PrivateKey, config.ObfuscationRootSecret)
  230. if err != nil {
  231. return nil, errors.Trace(err)
  232. }
  233. // The broker ID is the broker's session public key in Curve25519 form.
  234. publicKey, err := config.PrivateKey.GetPublicKey()
  235. if err != nil {
  236. return nil, errors.Trace(err)
  237. }
  238. brokerID, err := publicKey.ToCurve25519()
  239. if err != nil {
  240. return nil, errors.Trace(err)
  241. }
  242. proxyQuality := NewProxyQuality()
  243. b := &Broker{
  244. config: config,
  245. brokerID: ID(brokerID),
  246. initiatorSessions: initiatorSessions,
  247. responderSessions: responderSessions,
  248. matcher: NewMatcher(&MatcherConfig{
  249. Logger: config.Logger,
  250. AnnouncementLimitEntryCount: config.MatcherAnnouncementLimitEntryCount,
  251. AnnouncementRateLimitQuantity: config.MatcherAnnouncementRateLimitQuantity,
  252. AnnouncementRateLimitInterval: config.MatcherAnnouncementRateLimitInterval,
  253. AnnouncementNonlimitedProxyIDs: config.MatcherAnnouncementNonlimitedProxyIDs,
  254. OfferLimitEntryCount: config.MatcherOfferLimitEntryCount,
  255. OfferRateLimitQuantity: config.MatcherOfferRateLimitQuantity,
  256. OfferRateLimitInterval: config.MatcherOfferRateLimitInterval,
  257. ProxyQualityState: proxyQuality,
  258. IsLoadLimiting: config.IsLoadLimiting,
  259. AllowMatch: config.AllowMatch,
  260. }),
  261. proxyQualityState: proxyQuality,
  262. dslRequestRateLimiters: lrucache.NewWithLRU(
  263. 0,
  264. time.Duration(brokerRateLimiterReapHistoryFrequencySeconds)*time.Second,
  265. brokerRateLimiterMaxCacheEntries),
  266. }
  267. b.pendingServerReports = lrucache.NewWithLRU(
  268. common.ValueOrDefault(config.PendingServerReportsTTL, brokerPendingServerReportsTTL),
  269. 1*time.Minute,
  270. brokerPendingServerReportsMaxSize)
  271. b.proxyAnnounceTimeout.Store(int64(config.ProxyAnnounceTimeout))
  272. b.clientOfferTimeout.Store(int64(config.ClientOfferTimeout))
  273. b.clientOfferPersonalTimeout.Store(int64(config.ClientOfferPersonalTimeout))
  274. b.pendingServerReportsTTL.Store(int64(config.PendingServerReportsTTL))
  275. b.maxCompartmentIDs.Store(
  276. int64(common.ValueOrDefault(config.MaxCompartmentIDs, MaxCompartmentIDs)))
  277. b.dslRequestRateLimitParams.Store(
  278. &brokerRateLimitParams{
  279. quantity: config.DSLRequestRateLimitQuantity,
  280. interval: config.DSLRequestRateLimitInterval,
  281. })
  282. if len(config.CommonCompartmentIDs) > 0 {
  283. err = b.initializeCommonCompartmentIDHashing(config.CommonCompartmentIDs)
  284. if err != nil {
  285. return nil, errors.Trace(err)
  286. }
  287. }
  288. return b, nil
  289. }
  290. func (b *Broker) Start() error {
  291. if !b.isCommonCompartmentIDHashingInitialized() {
  292. return errors.TraceNew("missing common compartment IDs")
  293. }
  294. return errors.Trace(b.matcher.Start())
  295. }
  296. func (b *Broker) Stop() {
  297. b.matcher.Stop()
  298. }
  299. // SetCommonCompartmentIDs sets a new list of common compartment IDs,
  300. // replacing the previous configuration.
  301. func (b *Broker) SetCommonCompartmentIDs(commonCompartmentIDs []ID) error {
  302. // TODO: initializeCommonCompartmentIDHashing is called regardless whether
  303. // commonCompartmentIDs changes the previous configuration. To avoid the
  304. // overhead of consistent hashing initialization in
  305. // initializeCommonCompartmentIDHashing, add a mechanism to first quickly
  306. // check for changes?
  307. return errors.Trace(b.initializeCommonCompartmentIDHashing(commonCompartmentIDs))
  308. }
  309. // SetTimeouts sets new timeout values, replacing the previous configuration.
  310. // New timeout values do not apply to currently active announcement or offer
  311. // requests.
  312. func (b *Broker) SetTimeouts(
  313. proxyAnnounceTimeout time.Duration,
  314. clientOfferTimeout time.Duration,
  315. clientOfferPersonalTimeout time.Duration,
  316. pendingServerReportsTTL time.Duration,
  317. maxRequestTimeouts map[string]time.Duration) {
  318. b.proxyAnnounceTimeout.Store(int64(proxyAnnounceTimeout))
  319. b.clientOfferTimeout.Store(int64(clientOfferTimeout))
  320. b.clientOfferPersonalTimeout.Store(int64(clientOfferPersonalTimeout))
  321. b.pendingServerReportsTTL.Store(int64(pendingServerReportsTTL))
  322. b.maxRequestTimeouts.Store(maxRequestTimeouts)
  323. }
  324. // SetLimits sets new queue limit values, replacing the previous
  325. // configuration. New limits are only partially applied to existing queue
  326. // states; see Matcher.SetLimits.
  327. func (b *Broker) SetLimits(
  328. matcherAnnouncementLimitEntryCount int,
  329. matcherAnnouncementRateLimitQuantity int,
  330. matcherAnnouncementRateLimitInterval time.Duration,
  331. matcherAnnouncementNonlimitedProxyIDs []ID,
  332. matcherOfferLimitEntryCount int,
  333. matcherOfferRateLimitQuantity int,
  334. matcherOfferRateLimitInterval time.Duration,
  335. maxCompartmentIDs int,
  336. dslRequestRateLimitQuantity int,
  337. dslRequestRateLimitInterval time.Duration) {
  338. b.matcher.SetLimits(
  339. matcherAnnouncementLimitEntryCount,
  340. matcherAnnouncementRateLimitQuantity,
  341. matcherAnnouncementRateLimitInterval,
  342. matcherAnnouncementNonlimitedProxyIDs,
  343. matcherOfferLimitEntryCount,
  344. matcherOfferRateLimitQuantity,
  345. matcherOfferRateLimitInterval)
  346. b.maxCompartmentIDs.Store(
  347. int64(common.ValueOrDefault(maxCompartmentIDs, MaxCompartmentIDs)))
  348. b.dslRequestRateLimitParams.Store(
  349. &brokerRateLimitParams{
  350. quantity: dslRequestRateLimitQuantity,
  351. interval: dslRequestRateLimitInterval,
  352. })
  353. }
  354. func (b *Broker) SetProxyQualityParameters(
  355. enable bool,
  356. qualityTTL time.Duration,
  357. pendingFailedMatchDeadline time.Duration,
  358. failedMatchThreshold int) {
  359. // enableProxyQuality is an atomic for fast lookups in request handlers;
  360. // an additional mutex is used here to ensure the Swap/Flush combination
  361. // is also atomic.
  362. b.enableProxyQualityMutex.Lock()
  363. wasEnabled := b.enableProxyQuality.Swap(enable)
  364. if wasEnabled && !enable {
  365. // Flush quality state, since otherwise the quality TTL can retain
  366. // quality data which may be unexpectedly reactivated when enable is
  367. // toggled on again.
  368. b.proxyQualityState.Flush()
  369. }
  370. b.enableProxyQualityMutex.Unlock()
  371. b.proxyQualityState.SetParameters(
  372. qualityTTL,
  373. pendingFailedMatchDeadline,
  374. failedMatchThreshold)
  375. }
  376. // HandleSessionPacket handles a session packet from a client or proxy and
  377. // provides a response packet. The packet is part of a secure session and may
  378. // be a session handshake message, an expired session reset token, or a
  379. // session-wrapped request payload. Request payloads are routed to API
  380. // request endpoints.
  381. //
  382. // The caller is expected to provide a transport obfuscation layer, such as
  383. // domain fronted HTTPs. The session has an obfuscation layer that ensures
  384. // that packets are fully random, randomly padded, and cannot be replayed.
  385. // This makes session packets suitable to embed as plaintext in some
  386. // transports.
  387. //
  388. // The caller is responsible for rate limiting and enforcing timeouts and
  389. // maximum payload size checks.
  390. //
  391. // Secure sessions support multiplexing concurrent requests, as long as the
  392. // provided transport, for example HTTP/2, supports this as well.
  393. //
  394. // The input ctx should be canceled if the client/proxy disconnects from the
  395. // transport while HandleSessionPacket is running, since long-polling proxy
  396. // announcement requests will otherwise remain blocked until eventual
  397. // timeout; net/http does this.
  398. //
  399. // When HandleSessionPacket returns an error, the transport provider should
  400. // apply anti-probing mechanisms, as the client/proxy may be a prober or
  401. // scanner.
  402. func (b *Broker) HandleSessionPacket(
  403. ctx context.Context,
  404. extendTransportTimeout ExtendTransportTimeout,
  405. transportLogFields common.LogFields,
  406. brokerClientIP string,
  407. geoIPData common.GeoIPData,
  408. inPacket []byte) ([]byte, error) {
  409. // handleUnwrappedRequest handles requests after session unwrapping.
  410. // responderSessions.HandlePacket handles both session establishment and
  411. // request unwrapping, and invokes handleUnwrappedRequest once a session
  412. // is established and a valid request unwrapped.
  413. handleUnwrappedRequest := func(initiatorID ID, unwrappedRequestPayload []byte) ([]byte, error) {
  414. recordType, err := peekRecordPreambleType(unwrappedRequestPayload)
  415. if err != nil {
  416. return nil, errors.Trace(err)
  417. }
  418. var responsePayload []byte
  419. switch recordType {
  420. case recordTypeAPIProxyAnnounceRequest:
  421. responsePayload, err = b.handleProxyAnnounce(
  422. ctx,
  423. extendTransportTimeout,
  424. transportLogFields,
  425. brokerClientIP,
  426. geoIPData,
  427. initiatorID,
  428. unwrappedRequestPayload)
  429. if err != nil {
  430. return nil, errors.Trace(err)
  431. }
  432. case recordTypeAPIProxyAnswerRequest:
  433. responsePayload, err = b.handleProxyAnswer(
  434. ctx,
  435. extendTransportTimeout,
  436. transportLogFields,
  437. brokerClientIP,
  438. geoIPData,
  439. initiatorID,
  440. unwrappedRequestPayload)
  441. if err != nil {
  442. return nil, errors.Trace(err)
  443. }
  444. case recordTypeAPIClientOfferRequest:
  445. responsePayload, err = b.handleClientOffer(
  446. ctx,
  447. extendTransportTimeout,
  448. transportLogFields,
  449. brokerClientIP,
  450. geoIPData,
  451. initiatorID,
  452. unwrappedRequestPayload)
  453. if err != nil {
  454. return nil, errors.Trace(err)
  455. }
  456. case recordTypeAPIServerProxyQualityRequest:
  457. responsePayload, err = b.handleServerProxyQuality(
  458. ctx,
  459. extendTransportTimeout,
  460. transportLogFields,
  461. brokerClientIP,
  462. geoIPData,
  463. initiatorID,
  464. unwrappedRequestPayload)
  465. if err != nil {
  466. return nil, errors.Trace(err)
  467. }
  468. case recordTypeAPIClientRelayedPacketRequest:
  469. responsePayload, err = b.handleClientRelayedPacket(
  470. ctx,
  471. extendTransportTimeout,
  472. transportLogFields,
  473. geoIPData,
  474. initiatorID,
  475. unwrappedRequestPayload)
  476. if err != nil {
  477. return nil, errors.Trace(err)
  478. }
  479. case recordTypeAPIClientDSLRequest:
  480. responsePayload, err = b.handleClientDSL(
  481. ctx,
  482. extendTransportTimeout,
  483. transportLogFields,
  484. brokerClientIP,
  485. geoIPData,
  486. initiatorID,
  487. unwrappedRequestPayload)
  488. if err != nil {
  489. return nil, errors.Trace(err)
  490. }
  491. default:
  492. return nil, errors.Tracef("unexpected API record type %v", recordType)
  493. }
  494. return responsePayload, nil
  495. }
  496. // HandlePacket returns both a packet and an error in the expired session
  497. // reset token case. Log the error here, clear it, and return the
  498. // packet to be relayed back to the broker client.
  499. outPacket, err := b.responderSessions.HandlePacket(
  500. inPacket, handleUnwrappedRequest)
  501. if err != nil {
  502. if outPacket == nil {
  503. return nil, errors.Trace(err)
  504. }
  505. b.config.Logger.WithTraceFields(common.LogFields{"error": err}).Warning(
  506. "HandlePacket returned packet and error")
  507. }
  508. return outPacket, nil
  509. }
  510. // handleProxyAnnounce receives a proxy announcement, awaits a matching
  511. // client, and returns the client offer in the response. handleProxyAnnounce
  512. // has a long timeout so this request can idle until a matching client
  513. // arrives.
  514. func (b *Broker) handleProxyAnnounce(
  515. ctx context.Context,
  516. extendTransportTimeout ExtendTransportTimeout,
  517. transportLogFields common.LogFields,
  518. proxyIP string,
  519. geoIPData common.GeoIPData,
  520. initiatorID ID,
  521. requestPayload []byte) (retResponse []byte, retErr error) {
  522. startTime := time.Now()
  523. var logFields common.LogFields
  524. var isPriority bool
  525. var newTacticsTag string
  526. var clientOffer *MatchOffer
  527. var matchMetrics *MatchMetrics
  528. var timedOut bool
  529. var limitedErr error
  530. // As a future enhancement, a broker could initiate its own test
  531. // connection to the proxy to verify its effectiveness, including
  532. // simulating a symmetric NAT client.
  533. // Each announcement represents availability for a single client matching.
  534. // Proxies with multiple client availability will send multiple requests.
  535. //
  536. // The announcement request and response could be extended to allow the
  537. // proxy to specify availability for multiple clients in the request, and
  538. // multiple client offers returned in the response.
  539. //
  540. // If, as we expect, proxies run on home ISPs have limited upstream
  541. // bandwidth, they will support only a couple of concurrent clients, and
  542. // the simple single-client-announcment model may be sufficient. Also, if
  543. // the transport is HTTP/2, multiple requests can be multiplexed over a
  544. // single connection (and session) in any case.
  545. // The proxy ID is an implicit parameter: it's the proxy's session public
  546. // key. As part of the session handshake, the proxy has proven that it
  547. // has the corresponding private key. Proxy IDs are logged to attribute
  548. // traffic to a specific proxy.
  549. proxyID := initiatorID
  550. // Generate a connection ID. This ID is used to associate proxy
  551. // announcments, client offers, and proxy answers, as well as associating
  552. // Psiphon tunnels with in-proxy pairings.
  553. connectionID, err := MakeID()
  554. if err != nil {
  555. return nil, errors.Trace(err)
  556. }
  557. // Always log the outcome.
  558. defer func() {
  559. if logFields == nil {
  560. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  561. }
  562. logFields["broker_event"] = "proxy-announce"
  563. logFields["broker_id"] = b.brokerID
  564. logFields["proxy_id"] = proxyID
  565. logFields["is_priority"] = isPriority
  566. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  567. logFields["connection_id"] = connectionID
  568. if newTacticsTag != "" {
  569. logFields["new_tactics_tag"] = newTacticsTag
  570. }
  571. if clientOffer != nil {
  572. // Log the target Psiphon server ID (diagnostic ID). The presence
  573. // of this field indicates that a match was made.
  574. logFields["destination_server_id"] = clientOffer.DestinationServerID
  575. logFields["use_media_streams"] = clientOffer.UseMediaStreams
  576. }
  577. if timedOut {
  578. logFields["timed_out"] = true
  579. }
  580. if retErr != nil {
  581. logFields["error"] = retErr.Error()
  582. } else if limitedErr != nil {
  583. logFields["error"] = limitedErr.Error()
  584. }
  585. logFields.Add(transportLogFields)
  586. logFields.Add(matchMetrics.GetMetrics())
  587. b.config.Logger.LogMetric(brokerMetricName, logFields)
  588. if retErr != nil {
  589. retErr = NewBrokerLoggedEvent(retErr)
  590. }
  591. }()
  592. announceRequest, err := UnmarshalProxyAnnounceRequest(requestPayload)
  593. if err != nil {
  594. return nil, errors.Trace(err)
  595. }
  596. var apiParams common.APIParameters
  597. apiParams, logFields, err = announceRequest.ValidateAndGetParametersAndLogFields(
  598. int(b.maxCompartmentIDs.Load()),
  599. b.config.APIParameterValidator,
  600. b.config.APIParameterLogFieldFormatter,
  601. geoIPData)
  602. if err != nil {
  603. return nil, errors.Trace(err)
  604. }
  605. hasPersonalCompartmentIDs := len(announceRequest.PersonalCompartmentIDs) > 0
  606. // Return MustUpgrade when the proxy's protocol version is less than the
  607. // minimum required.
  608. if announceRequest.Metrics.ProtocolVersion < minimumProxyProtocolVersion {
  609. responsePayload, err := MarshalProxyAnnounceResponse(
  610. &ProxyAnnounceResponse{
  611. NoMatch: true,
  612. MustUpgrade: true,
  613. })
  614. if err != nil {
  615. return nil, errors.Trace(err)
  616. }
  617. return responsePayload, nil
  618. }
  619. // Fetch new tactics for the proxy, if required, using the tactics tag
  620. // that should be included with the API parameters. A tacticsPayload may
  621. // be returned when there are no new tactics, and this is relayed back to
  622. // the proxy, after matching, so that it can extend the TTL for its
  623. // existing, cached tactics. In the case where tactics have changed,
  624. // don't enqueue the proxy announcement and return no-match so that the
  625. // proxy can store and apply the new tactics before announcing again.
  626. var tacticsPayload []byte
  627. if announceRequest.CheckTactics {
  628. tacticsPayload, newTacticsTag, err =
  629. b.config.GetTacticsPayload(geoIPData, apiParams)
  630. if err != nil {
  631. return nil, errors.Trace(err)
  632. }
  633. if tacticsPayload != nil && newTacticsTag != "" {
  634. responsePayload, err := MarshalProxyAnnounceResponse(
  635. &ProxyAnnounceResponse{
  636. TacticsPayload: tacticsPayload,
  637. NoMatch: true,
  638. })
  639. if err != nil {
  640. return nil, errors.Trace(err)
  641. }
  642. return responsePayload, nil
  643. }
  644. }
  645. // AllowProxy may be used to disallow proxies from certain geolocations,
  646. // such as censored locations, from announcing. Proxies with personal
  647. // compartment IDs are always allowed, as they will be used only by
  648. // clients specifically configured to use them.
  649. if !hasPersonalCompartmentIDs &&
  650. !b.config.AllowProxy(geoIPData) {
  651. return nil, errors.TraceNew("proxy disallowed")
  652. }
  653. // Assign this proxy to a common compartment ID, unless it has specified a
  654. // dedicated, personal compartment ID. Assignment uses consistent hashing
  655. // keyed with the proxy ID, in an effort to keep proxies consistently
  656. // assigned to the same compartment.
  657. var commonCompartmentIDs []ID
  658. if !hasPersonalCompartmentIDs {
  659. compartmentID, err := b.selectCommonCompartmentID(proxyID)
  660. if err != nil {
  661. return nil, errors.Trace(err)
  662. }
  663. commonCompartmentIDs = []ID{compartmentID}
  664. }
  665. // Determine whether to enqueue the proxy announcement in the priority
  666. // queue. To be prioritized, a proxy, identified by its ID and ASN, must
  667. // have a recent quality tunnel recorded in the quality state. In
  668. // addition, when the PrioritizeProxy callback is set, invoke this
  669. // additional condition, which can filter by proxy geolocation and other
  670. // properties.
  671. //
  672. // There is no prioritization for personal pairing announcements.
  673. // Potential future enhancements:
  674. //
  675. // - For a proxy with unknown quality (neither reported quality tunnels,
  676. // nor known failed matches), prioritize with some low probability to
  677. // give unknown proxies a chance to qualify? This could be limited, for
  678. // example, to proxies in the same ASN as other quality proxies. To
  679. // implement this, ProxyQualityState would need to record proxy IDs
  680. // with failed matches; and proxy ASNs would need to be input to
  681. // ProxyQualityState.
  682. //
  683. // - Consider using the Psiphon server region, as given in the signed
  684. // server entry, as part of the prioritization logic.
  685. if !hasPersonalCompartmentIDs && b.enableProxyQuality.Load() {
  686. // Here, no specific client ASN is specified for HasQuality. As long
  687. // as a proxy has a quality tunnel for any client ASN, it is
  688. // prioritized. In the matching process, an attempt is made to match
  689. // using HasQuality using the client ASN. See Matcher.matchOffer.
  690. isPriority = b.proxyQualityState.HasQuality(proxyID, geoIPData.ASN, "")
  691. }
  692. if isPriority && b.config.PrioritizeProxy != nil {
  693. // Note that, in the psiphon/server package, inproxyBrokerPrioritizeProxy
  694. // is always wired up, and, as currently implemented, the default value for
  695. // the InproxyBrokerMatcherPrioritizeProxiesFilter tactics parameter
  696. // results in PrioritizeProxy always returning false. Some filter,
  697. // even just a wildcard match, must be configured in order to prioritize.
  698. // Limitation: Of the two return values from
  699. // ValidateAndGetParametersAndLogFields, apiParams and logFields,
  700. // only logFields contains fields such as max_clients
  701. // and *_bytes_per_second, and so these cannot be part of any
  702. // filtering performed by the PrioritizeProxy callback.
  703. //
  704. // TODO: include the additional fields in logFields. Since the
  705. // logFields return value is the output of server.getRequestLogFields
  706. // processing, it's not safe to use it directly. In addition,
  707. // filtering by fields such as max_clients and *_bytes_per_second
  708. // calls for range filtering, which is not yet supported in the
  709. // psiphon/server.MeekServer PrioritizeProxy provider.
  710. isPriority = b.config.PrioritizeProxy(
  711. int(announceRequest.Metrics.ProtocolVersion), geoIPData, apiParams)
  712. }
  713. // Await client offer.
  714. timeout := common.ValueOrDefault(
  715. time.Duration(b.proxyAnnounceTimeout.Load()),
  716. brokerProxyAnnounceTimeout)
  717. // Adjust the timeout to respect any shorter maximum request timeouts for
  718. // the fronting provider.
  719. timeout = b.adjustRequestTimeout(logFields, timeout)
  720. announceCtx, cancelFunc := context.WithTimeout(ctx, timeout)
  721. defer cancelFunc()
  722. extendTransportTimeout(timeout)
  723. // Note that matcher.Announce assumes a monotonically increasing
  724. // announceCtx.Deadline input for each successive call.
  725. clientOffer, matchMetrics, err = b.matcher.Announce(
  726. announceCtx,
  727. proxyIP,
  728. &MatchAnnouncement{
  729. Properties: MatchProperties{
  730. IsPriority: isPriority,
  731. ProtocolVersion: announceRequest.Metrics.ProtocolVersion,
  732. CommonCompartmentIDs: commonCompartmentIDs,
  733. PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
  734. GeoIPData: geoIPData,
  735. NetworkType: GetNetworkType(announceRequest.Metrics.BaseAPIParameters),
  736. NATType: announceRequest.Metrics.NATType,
  737. PortMappingTypes: announceRequest.Metrics.PortMappingTypes,
  738. },
  739. ProxyID: initiatorID,
  740. ProxyMetrics: announceRequest.Metrics,
  741. ConnectionID: connectionID,
  742. })
  743. if err != nil {
  744. var limitError *MatcherLimitError
  745. limited := std_errors.As(err, &limitError)
  746. timeout := announceCtx.Err() == context.DeadlineExceeded
  747. if !limited && !timeout {
  748. return nil, errors.Trace(err)
  749. }
  750. // A no-match response is sent in the case of a timeout awaiting a
  751. // match. The faster-failing rate or entry limiting case also results
  752. // in a response, rather than an error return from handleProxyAnnounce,
  753. // so that the proxy doesn't receive a 404 and flag its BrokerClient as
  754. // having failed.
  755. //
  756. // When the timeout and limit case coincide, limit takes precedence in
  757. // the response.
  758. if timeout && !limited {
  759. // Note: the respective proxy and broker timeouts,
  760. // InproxyBrokerProxyAnnounceTimeout and
  761. // InproxyProxyAnnounceRequestTimeout in tactics, should be
  762. // configured so that the broker will timeout first and have an
  763. // opportunity to send this response before the proxy times out.
  764. timedOut = true
  765. } else {
  766. // Record the specific limit error in the proxy-announce broker event.
  767. limitedErr = err
  768. }
  769. responsePayload, err := MarshalProxyAnnounceResponse(
  770. &ProxyAnnounceResponse{
  771. TacticsPayload: tacticsPayload,
  772. Limited: limited,
  773. NoMatch: timeout && !limited,
  774. })
  775. if err != nil {
  776. return nil, errors.Trace(err)
  777. }
  778. return responsePayload, nil
  779. }
  780. // Select the protocol version. The matcher has already checked
  781. // negotiateProtocolVersion, so failure is not expected.
  782. negotiatedProtocolVersion, ok := negotiateProtocolVersion(
  783. announceRequest.Metrics.ProtocolVersion,
  784. clientOffer.Properties.ProtocolVersion,
  785. clientOffer.UseMediaStreams)
  786. if !ok {
  787. return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
  788. }
  789. // Respond with the client offer. The proxy will follow up with an answer
  790. // request, which is relayed to the client, and then the WebRTC dial begins.
  791. // Limitation: as part of the client's tunnel establishment horse race, a
  792. // client may abort an in-proxy dial at any point. If the overall dial is
  793. // past the SDP exchange and aborted during the WebRTC connection
  794. // establishment, the client may leave the proxy's Proxy.proxyOneClient
  795. // dangling until timeout. Consider adding a signal from the client to
  796. // the proxy, relayed by the broker, that a dial is aborted.
  797. responsePayload, err := MarshalProxyAnnounceResponse(
  798. &ProxyAnnounceResponse{
  799. TacticsPayload: tacticsPayload,
  800. ConnectionID: connectionID,
  801. SelectedProtocolVersion: negotiatedProtocolVersion,
  802. ClientOfferSDP: clientOffer.ClientOfferSDP,
  803. ClientRootObfuscationSecret: clientOffer.ClientRootObfuscationSecret,
  804. DoDTLSRandomization: clientOffer.DoDTLSRandomization,
  805. UseMediaStreams: clientOffer.UseMediaStreams,
  806. TrafficShapingParameters: clientOffer.TrafficShapingParameters,
  807. NetworkProtocol: clientOffer.NetworkProtocol,
  808. DestinationAddress: clientOffer.DestinationAddress,
  809. })
  810. if err != nil {
  811. return nil, errors.Trace(err)
  812. }
  813. // Set the "failed match" trigger, which will progress towards clearing
  814. // the quality state for this proxyID unless quality tunnels are reported
  815. // soon enough after matches. This includes failure, by the proxy, to
  816. // return an proxy answer, as well as any tunnel failures after that.
  817. //
  818. // Failures are expected even for good quality proxies, due to cases such
  819. // as the in-proxy protocol losing the client tunnel establishment horse
  820. // race. There is a threshold number of failed matches that must be
  821. // reached before a quality state is cleared.
  822. b.proxyQualityState.Matched(proxyID, geoIPData.ASN)
  823. return responsePayload, nil
  824. }
  825. // handleClientOffer receives a client offer, awaits a matching client, and
  826. // returns the proxy answer. handleClientOffer has a shorter timeout than
  827. // handleProxyAnnounce since the client has supplied an SDP with STUN hole
  828. // punches which will expire; and, in general, the client is trying to
  829. // connect immediately and is also trying other candidates.
  830. func (b *Broker) handleClientOffer(
  831. ctx context.Context,
  832. extendTransportTimeout ExtendTransportTimeout,
  833. transportLogFields common.LogFields,
  834. clientIP string,
  835. geoIPData common.GeoIPData,
  836. initiatorID ID,
  837. requestPayload []byte) (retResponse []byte, retErr error) {
  838. // As a future enhancement, consider having proxies send offer SDPs with
  839. // announcements and clients long poll to await a match and then provide
  840. // an answer. This order of operations would make sense if client demand
  841. // is high and proxy supply is lower.
  842. //
  843. // Also see comment in Proxy.proxyOneClient for other alternative
  844. // approaches.
  845. // The client's session public key is ephemeral and is not logged.
  846. startTime := time.Now()
  847. var logFields common.LogFields
  848. var serverParams *serverParams
  849. var clientMatchOffer *MatchOffer
  850. var proxyMatchAnnouncement *MatchAnnouncement
  851. var proxyAnswer *MatchAnswer
  852. var matchMetrics *MatchMetrics
  853. var timedOut bool
  854. var limitedErr error
  855. // Always log the outcome.
  856. defer func() {
  857. if logFields == nil {
  858. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  859. }
  860. logFields["broker_event"] = "client-offer"
  861. logFields["broker_id"] = b.brokerID
  862. if serverParams != nil {
  863. logFields["destination_server_id"] = serverParams.serverID
  864. }
  865. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  866. if proxyAnswer != nil {
  867. // The presence of these fields indicate that a match was made,
  868. // the proxy delivered an answer, and the client was still
  869. // waiting for it.
  870. logFields["connection_id"] = proxyAnswer.ConnectionID
  871. logFields["client_nat_type"] = clientMatchOffer.Properties.NATType
  872. logFields["client_port_mapping_types"] = clientMatchOffer.Properties.PortMappingTypes
  873. logFields["proxy_nat_type"] = proxyMatchAnnouncement.Properties.NATType
  874. logFields["proxy_port_mapping_types"] = proxyMatchAnnouncement.Properties.PortMappingTypes
  875. logFields["preferred_nat_match"] =
  876. clientMatchOffer.Properties.IsPreferredNATMatch(&proxyMatchAnnouncement.Properties)
  877. // TODO: also log proxy ice_candidate_types and has_IPv6; for the
  878. // client, these values are added by ValidateAndGetLogFields.
  879. }
  880. if timedOut {
  881. logFields["timed_out"] = true
  882. }
  883. if retErr != nil {
  884. logFields["error"] = retErr.Error()
  885. } else if limitedErr != nil {
  886. logFields["error"] = limitedErr.Error()
  887. }
  888. logFields.Add(transportLogFields)
  889. logFields.Add(matchMetrics.GetMetrics())
  890. b.config.Logger.LogMetric(brokerMetricName, logFields)
  891. if retErr != nil {
  892. retErr = NewBrokerLoggedEvent(retErr)
  893. }
  894. }()
  895. offerRequest, err := UnmarshalClientOfferRequest(requestPayload)
  896. if err != nil {
  897. return nil, errors.Trace(err)
  898. }
  899. // The filtered SDP is the request SDP with any invalid (bogon, unexpected
  900. // GeoIP) ICE candidates filtered out. In some cases, clients cannot
  901. // avoid submitting invalid candidates (see comment in
  902. // processSDPAddresses), so all invalid candidates are removed and the
  903. // remaining SDP is used. Filtered candidate information is logged in
  904. // logFields.
  905. //
  906. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  907. // permitted in exchanged SDPs and not filtered out.
  908. var filteredSDP []byte
  909. filteredSDP, logFields, err = offerRequest.ValidateAndGetLogFields(
  910. int(b.maxCompartmentIDs.Load()),
  911. b.config.LookupGeoIP,
  912. b.config.APIParameterValidator,
  913. b.config.APIParameterLogFieldFormatter,
  914. geoIPData)
  915. if err != nil {
  916. return nil, errors.Trace(err)
  917. }
  918. hasPersonalCompartmentIDs := len(offerRequest.PersonalCompartmentIDs) > 0
  919. offerSDP := offerRequest.ClientOfferSDP
  920. offerSDP.SDP = string(filteredSDP)
  921. // AllowClient may be used to disallow clients from certain geolocations
  922. // from offering. Clients are always allowed to match proxies with shared
  923. // personal compartment IDs.
  924. if !hasPersonalCompartmentIDs &&
  925. !b.config.AllowClient(geoIPData) {
  926. return nil, errors.TraceNew("client disallowed")
  927. }
  928. // Validate that the proxy destination specified by the client is a valid
  929. // dial address for a signed Psiphon server entry. This ensures a client
  930. // can't misuse a proxy to connect to arbitrary destinations.
  931. serverParams, err = b.validateDestination(
  932. geoIPData,
  933. offerRequest.PackedDestinationServerEntry,
  934. offerRequest.NetworkProtocol,
  935. offerRequest.DestinationAddress)
  936. if err != nil {
  937. // Record the specific error in the client-offer broker event.
  938. limitedErr = err
  939. // Return a response. This avoids returning a broker-client-resetting
  940. // 404 in cases including "invalid server entry tag", where a
  941. // legitimate client submits an unpruned, decommissioned server entry.
  942. responsePayload, err := MarshalClientOfferResponse(
  943. &ClientOfferResponse{Limited: true})
  944. if err != nil {
  945. return nil, errors.Trace(err)
  946. }
  947. return responsePayload, nil
  948. }
  949. // Return MustUpgrade when the client's protocol version is less than the
  950. // minimum required.
  951. if offerRequest.Metrics.ProtocolVersion < minimumClientProtocolVersion {
  952. responsePayload, err := MarshalClientOfferResponse(
  953. &ClientOfferResponse{
  954. NoMatch: true,
  955. MustUpgrade: true,
  956. })
  957. if err != nil {
  958. return nil, errors.Trace(err)
  959. }
  960. return responsePayload, nil
  961. }
  962. // Enqueue the client offer and await a proxy matching and subsequent
  963. // proxy answer.
  964. // The Client Offer timeout may be configured with a shorter value in
  965. // personal pairing mode, to facilitate a faster no-match result and
  966. // resulting broker rotation.
  967. var timeout time.Duration
  968. if hasPersonalCompartmentIDs {
  969. timeout = time.Duration(b.clientOfferPersonalTimeout.Load())
  970. } else {
  971. timeout = time.Duration(b.clientOfferTimeout.Load())
  972. }
  973. timeout = common.ValueOrDefault(timeout, brokerClientOfferTimeout)
  974. // Adjust the timeout to respect any shorter maximum request timeouts for
  975. // the fronting provider.
  976. timeout = b.adjustRequestTimeout(logFields, timeout)
  977. offerCtx, cancelFunc := context.WithTimeout(ctx, timeout)
  978. defer cancelFunc()
  979. extendTransportTimeout(timeout)
  980. clientMatchOffer = &MatchOffer{
  981. Properties: MatchProperties{
  982. ProtocolVersion: offerRequest.Metrics.ProtocolVersion,
  983. CommonCompartmentIDs: offerRequest.CommonCompartmentIDs,
  984. PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
  985. GeoIPData: geoIPData,
  986. NetworkType: GetNetworkType(offerRequest.Metrics.BaseAPIParameters),
  987. NATType: offerRequest.Metrics.NATType,
  988. PortMappingTypes: offerRequest.Metrics.PortMappingTypes,
  989. },
  990. ClientOfferSDP: offerSDP,
  991. ClientRootObfuscationSecret: offerRequest.ClientRootObfuscationSecret,
  992. DoDTLSRandomization: offerRequest.DoDTLSRandomization,
  993. UseMediaStreams: offerRequest.UseMediaStreams,
  994. TrafficShapingParameters: offerRequest.TrafficShapingParameters,
  995. NetworkProtocol: offerRequest.NetworkProtocol,
  996. DestinationAddress: offerRequest.DestinationAddress,
  997. DestinationServerID: serverParams.serverID,
  998. }
  999. proxyAnswer, proxyMatchAnnouncement, matchMetrics, err = b.matcher.Offer(
  1000. offerCtx,
  1001. clientIP,
  1002. clientMatchOffer)
  1003. if err != nil {
  1004. var limitError *MatcherLimitError
  1005. limited := std_errors.As(err, &limitError)
  1006. timeout := offerCtx.Err() == context.DeadlineExceeded
  1007. // A no-match response is sent in the case of a timeout awaiting a
  1008. // match. The faster-failing rate or entry limiting case also results
  1009. // in a response, rather than an error return from handleClientOffer,
  1010. // so that the client doesn't receive a 404 and flag its BrokerClient
  1011. // as having failed.
  1012. //
  1013. // When the timeout and limit cases coincide, limit takes precedence in
  1014. // the response.
  1015. if timeout {
  1016. // Record the time out outcome in the client-offer broker event.
  1017. //
  1018. // Note: the respective client and broker timeouts,
  1019. // InproxyBrokerClientOfferTimeout and
  1020. // InproxyClientOfferRequestTimeout in tactics, should be configured
  1021. // so that the broker will timeout first and have an opportunity to
  1022. // send this response before the client times out.
  1023. timedOut = true
  1024. }
  1025. if limited {
  1026. // Record the specific limit error in the client-offer broker event.
  1027. limitedErr = err
  1028. }
  1029. if !limited && !timeout {
  1030. // If matcher.Offer failed for some other reason, default to
  1031. // returning Limited in the response. As currently implemented,
  1032. // when clients receive the Limited flag, they will fail the
  1033. // in-proxy dial without retrying, but retain their broker client.
  1034. //
  1035. // While the matcher.Offer failure scenarios may include an
  1036. // internal error, they also include the "no answer" case where a
  1037. // proxy fails to produce an answer. For the "no answer" case,
  1038. // Limited is preferred over returning NoMatch, which can trigger
  1039. // a broker client cycle.
  1040. //
  1041. // TODO: add a new flag to signal to the client that it may retry
  1042. // in the "no answer" case.
  1043. limited = true
  1044. limitedErr = err
  1045. }
  1046. responsePayload, err := MarshalClientOfferResponse(
  1047. &ClientOfferResponse{
  1048. Limited: limited,
  1049. NoMatch: timeout && !limited,
  1050. })
  1051. if err != nil {
  1052. return nil, errors.Trace(err)
  1053. }
  1054. return responsePayload, nil
  1055. }
  1056. // Log the type of compartment matching that occurred. As
  1057. // PersonalCompartmentIDs are user-generated and shared, actual matching
  1058. // values are not logged as they may link users.
  1059. // TODO: log matching common compartment IDs?
  1060. matchedCommonCompartments := HaveCommonIDs(
  1061. proxyMatchAnnouncement.Properties.CommonCompartmentIDs,
  1062. clientMatchOffer.Properties.CommonCompartmentIDs)
  1063. matchedPersonalCompartments := HaveCommonIDs(
  1064. proxyMatchAnnouncement.Properties.PersonalCompartmentIDs,
  1065. clientMatchOffer.Properties.PersonalCompartmentIDs)
  1066. // Initiate a BrokerServerReport, which sends important information
  1067. // about the connection, including the original client IP, plus other
  1068. // values to be logged with server_tunne, to the server. The report is
  1069. // sent through a secure session established between the broker and the
  1070. // server, relayed by the client.
  1071. //
  1072. // The first relay message will be embedded in the Psiphon handshake. The
  1073. // broker may already have an established session with the server. In
  1074. // this case, only only that initial message is required. The
  1075. // BrokerServerReport is a one-way message, which avoids extra untunneled
  1076. // client/broker traffic.
  1077. //
  1078. // Limitations, due to the one-way message:
  1079. // - the broker can't actively clean up pendingServerReports as
  1080. // tunnels are established and must rely on cache expiry.
  1081. // - the broker doesn't learn that the server accepted the report, and
  1082. // so cannot log a final connection status or signal the proxy to
  1083. // disconnect the client in any misuse cases.
  1084. //
  1085. // As a future enhancement, consider adding a _tunneled_ client relay
  1086. // of a server response acknowledging the broker report.
  1087. relayPacket, err := b.initiateRelayedServerReport(
  1088. serverParams,
  1089. proxyAnswer.ConnectionID,
  1090. &BrokerServerReport{
  1091. ProxyID: proxyAnswer.ProxyID,
  1092. ConnectionID: proxyAnswer.ConnectionID,
  1093. MatchedCommonCompartments: matchedCommonCompartments,
  1094. MatchedPersonalCompartments: matchedPersonalCompartments,
  1095. ClientNATType: clientMatchOffer.Properties.NATType,
  1096. ClientPortMappingTypes: clientMatchOffer.Properties.PortMappingTypes,
  1097. ClientIP: clientIP,
  1098. ProxyIP: proxyAnswer.ProxyIP,
  1099. // ProxyMetrics includes proxy NAT and port mapping types.
  1100. ProxyMetrics: proxyMatchAnnouncement.ProxyMetrics,
  1101. ProxyIsPriority: proxyMatchAnnouncement.Properties.IsPriority,
  1102. })
  1103. if err != nil {
  1104. return nil, errors.Trace(err)
  1105. }
  1106. // Select the protocol version. The matcher has already checked
  1107. // negotiateProtocolVersion, so failure is not expected.
  1108. negotiatedProtocolVersion, ok := negotiateProtocolVersion(
  1109. proxyMatchAnnouncement.Properties.ProtocolVersion,
  1110. offerRequest.Metrics.ProtocolVersion,
  1111. offerRequest.UseMediaStreams)
  1112. if !ok {
  1113. return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
  1114. }
  1115. // Respond with the proxy answer and initial broker/server session packet.
  1116. responsePayload, err := MarshalClientOfferResponse(
  1117. &ClientOfferResponse{
  1118. ConnectionID: proxyAnswer.ConnectionID,
  1119. SelectedProtocolVersion: negotiatedProtocolVersion,
  1120. ProxyAnswerSDP: proxyAnswer.ProxyAnswerSDP,
  1121. RelayPacketToServer: relayPacket,
  1122. })
  1123. if err != nil {
  1124. return nil, errors.Trace(err)
  1125. }
  1126. return responsePayload, nil
  1127. }
  1128. // handleProxyAnswer receives a proxy answer and delivers it to the waiting
  1129. // client.
  1130. func (b *Broker) handleProxyAnswer(
  1131. ctx context.Context,
  1132. extendTransportTimeout ExtendTransportTimeout,
  1133. transportLogFields common.LogFields,
  1134. proxyIP string,
  1135. geoIPData common.GeoIPData,
  1136. initiatorID ID,
  1137. requestPayload []byte) (retResponse []byte, retErr error) {
  1138. startTime := time.Now()
  1139. var logFields common.LogFields
  1140. var proxyAnswer *MatchAnswer
  1141. var answerError string
  1142. // The proxy ID is an implicit parameter: it's the proxy's session public
  1143. // key.
  1144. proxyID := initiatorID
  1145. // Always log the outcome.
  1146. defer func() {
  1147. if logFields == nil {
  1148. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1149. }
  1150. logFields["broker_event"] = "proxy-answer"
  1151. logFields["broker_id"] = b.brokerID
  1152. logFields["proxy_id"] = proxyID
  1153. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1154. if proxyAnswer != nil {
  1155. logFields["connection_id"] = proxyAnswer.ConnectionID
  1156. }
  1157. if answerError != "" {
  1158. // This is a proxy-reported error that occurred while creating the answer.
  1159. logFields["answer_error"] = answerError
  1160. }
  1161. if retErr != nil {
  1162. logFields["error"] = retErr.Error()
  1163. }
  1164. logFields.Add(transportLogFields)
  1165. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1166. if retErr != nil {
  1167. retErr = NewBrokerLoggedEvent(retErr)
  1168. }
  1169. }()
  1170. answerRequest, err := UnmarshalProxyAnswerRequest(requestPayload)
  1171. if err != nil {
  1172. return nil, errors.Trace(err)
  1173. }
  1174. // The filtered SDP is the request SDP with any invalid (bogon, unexpected
  1175. // GeoIP) ICE candidates filtered out. In some cases, proxies cannot
  1176. // avoid submitting invalid candidates (see comment in
  1177. // processSDPAddresses), so all invalid candidates are removed and the
  1178. // remaining SDP is used. Filtered candidate information is logged in
  1179. // logFields.
  1180. //
  1181. // In personal pairing mode, RFC 1918/4193 private IP addresses are
  1182. // permitted in exchanged SDPs and not filtered out.
  1183. hasPersonalCompartmentIDs, err := b.matcher.AnnouncementHasPersonalCompartmentIDs(
  1184. initiatorID, answerRequest.ConnectionID)
  1185. if err != nil {
  1186. return nil, errors.Trace(err)
  1187. }
  1188. var filteredSDP []byte
  1189. filteredSDP, logFields, err = answerRequest.ValidateAndGetLogFields(
  1190. b.config.LookupGeoIP,
  1191. b.config.APIParameterValidator,
  1192. b.config.APIParameterLogFieldFormatter,
  1193. geoIPData,
  1194. hasPersonalCompartmentIDs)
  1195. if err != nil {
  1196. return nil, errors.Trace(err)
  1197. }
  1198. answerSDP := answerRequest.ProxyAnswerSDP
  1199. answerSDP.SDP = string(filteredSDP)
  1200. if answerRequest.AnswerError != "" {
  1201. // The proxy failed to create an answer.
  1202. answerError = answerRequest.AnswerError
  1203. b.matcher.AnswerError(initiatorID, answerRequest.ConnectionID)
  1204. } else {
  1205. // Deliver the answer to the client.
  1206. // Note that neither ProxyID nor ProxyIP is returned to the client.
  1207. // These fields are used internally in the matcher.
  1208. proxyAnswer = &MatchAnswer{
  1209. ProxyIP: proxyIP,
  1210. ProxyID: initiatorID,
  1211. ConnectionID: answerRequest.ConnectionID,
  1212. ProxyAnswerSDP: answerSDP,
  1213. }
  1214. err = b.matcher.Answer(proxyAnswer)
  1215. if err != nil {
  1216. return nil, errors.Trace(err)
  1217. }
  1218. }
  1219. // There is no data in this response, it's simply an acknowledgement that
  1220. // the answer was received. Upon receiving the response, the proxy should
  1221. // begin the WebRTC dial operation.
  1222. responsePayload, err := MarshalProxyAnswerResponse(
  1223. &ProxyAnswerResponse{})
  1224. if err != nil {
  1225. return nil, errors.Trace(err)
  1226. }
  1227. return responsePayload, nil
  1228. }
  1229. // handleServerProxyQuality receives, from servers, proxy tunnel quality and
  1230. // records that in the proxy quality state that is used to prioritize
  1231. // well-performing proxies.
  1232. func (b *Broker) handleServerProxyQuality(
  1233. ctx context.Context,
  1234. extendTransportTimeout ExtendTransportTimeout,
  1235. transportLogFields common.LogFields,
  1236. proxyIP string,
  1237. geoIPData common.GeoIPData,
  1238. initiatorID ID,
  1239. requestPayload []byte) (retResponse []byte, retErr error) {
  1240. startTime := time.Now()
  1241. var logFields common.LogFields
  1242. // Only known, trusted Psiphon server initiators are allowed to send proxy
  1243. // quality requests. knownServerInitiatorIDs is populated with the
  1244. // Curve25519 public keys -- initiator IDs -- corresponding to the
  1245. // session public keys found in signed Psiphon server entries.
  1246. //
  1247. // Currently, knownServerInitiatorIDs is populated with destination server
  1248. // entries received in client offers, so the broker must first receive a
  1249. // client offer before a given server is trusted, which means
  1250. // that "invalid initiator" errors may occur, and some quality requests
  1251. // may be dropped, in some expected situations, including a broker restart.
  1252. // serverID is the server entry diagnostic ID of the server.
  1253. serverIDValue, ok := b.knownServerInitiatorIDs.Load(initiatorID)
  1254. if !ok {
  1255. return nil, errors.TraceNew("invalid initiator")
  1256. }
  1257. serverID := serverIDValue.(string)
  1258. // Always log the outcome.
  1259. defer func() {
  1260. // Typically, a server will send the same proxy quality request to all
  1261. // brokers. For the one "broadcast" request, server-proxy-quality is
  1262. // logged by each broker, as an indication that every server/broker
  1263. // request pair is successful.
  1264. //
  1265. // TODO: log more details from ServerProxyQualityRequest.QualityCounts?
  1266. if logFields == nil {
  1267. logFields = common.LogFields{}
  1268. }
  1269. logFields["broker_event"] = "server-proxy-quality"
  1270. logFields["broker_id"] = b.brokerID
  1271. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1272. logFields["server_id"] = serverID
  1273. if retErr != nil {
  1274. logFields["error"] = retErr.Error()
  1275. }
  1276. logFields.Add(transportLogFields)
  1277. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1278. if retErr != nil {
  1279. retErr = NewBrokerLoggedEvent(retErr)
  1280. }
  1281. }()
  1282. qualityRequest, err := UnmarshalServerProxyQualityRequest(requestPayload)
  1283. if err != nil {
  1284. return nil, errors.Trace(err)
  1285. }
  1286. logFields, err = qualityRequest.ValidateAndGetLogFields()
  1287. if err != nil {
  1288. return nil, errors.Trace(err)
  1289. }
  1290. // Add the quality counts into the existing proxy quality state.
  1291. //
  1292. // The counts are ignored when proxy quality is disabled, but an
  1293. // anknowledgement is still returned to the server.
  1294. if b.enableProxyQuality.Load() {
  1295. for proxyKey, counts := range qualityRequest.QualityCounts {
  1296. b.proxyQualityState.AddQuality(proxyKey, counts)
  1297. }
  1298. }
  1299. // There is no data in this response, it's simply an acknowledgement that
  1300. // the request was received.
  1301. responsePayload, err := MarshalServerProxyQualityResponse(
  1302. &ServerProxyQualityResponse{})
  1303. if err != nil {
  1304. return nil, errors.Trace(err)
  1305. }
  1306. return responsePayload, nil
  1307. }
  1308. // handleClientRelayedPacket facilitates broker/server sessions. The initial
  1309. // packet from the broker is sent to the client in the ClientOfferResponse.
  1310. // The client sends that to the server in the Psiphon handshake. If the
  1311. // session was already established, the relay ends there. Otherwise, the
  1312. // client receives any packet sent back by the server and that server packet
  1313. // is then delivered to the broker in a ClientRelayedPacketRequest. If the
  1314. // session needs to be [re-]negotiated, there are additional
  1315. // ClientRelayedPacket round trips until the session is established and the
  1316. // BrokerServerReport is securely exchanged between the broker and server.
  1317. func (b *Broker) handleClientRelayedPacket(
  1318. ctx context.Context,
  1319. extendTransportTimeout ExtendTransportTimeout,
  1320. transportLogFields common.LogFields,
  1321. geoIPData common.GeoIPData,
  1322. initiatorID ID,
  1323. requestPayload []byte) (retResponse []byte, retErr error) {
  1324. startTime := time.Now()
  1325. var logFields common.LogFields
  1326. var relayedPacketRequest *ClientRelayedPacketRequest
  1327. var serverID string
  1328. // Always log the outcome.
  1329. defer func() {
  1330. if logFields == nil {
  1331. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1332. }
  1333. logFields["broker_event"] = "client-relayed-packet"
  1334. logFields["broker_id"] = b.brokerID
  1335. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1336. if relayedPacketRequest != nil {
  1337. logFields["connection_id"] = relayedPacketRequest.ConnectionID
  1338. }
  1339. if serverID != "" {
  1340. logFields["destination_server_id"] = serverID
  1341. }
  1342. if retErr != nil {
  1343. logFields["error"] = retErr.Error()
  1344. }
  1345. logFields.Add(transportLogFields)
  1346. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1347. if retErr != nil {
  1348. retErr = NewBrokerLoggedEvent(retErr)
  1349. }
  1350. }()
  1351. relayedPacketRequest, err := UnmarshalClientRelayedPacketRequest(requestPayload)
  1352. if err != nil {
  1353. return nil, errors.Trace(err)
  1354. }
  1355. logFields, err = relayedPacketRequest.ValidateAndGetLogFields(
  1356. b.config.APIParameterValidator,
  1357. b.config.APIParameterLogFieldFormatter,
  1358. geoIPData)
  1359. if err != nil {
  1360. return nil, errors.Trace(err)
  1361. }
  1362. // The relay state is associated with the connection ID.
  1363. strConnectionID := string(relayedPacketRequest.ConnectionID[:])
  1364. entry, ok := b.pendingServerReports.Get(strConnectionID)
  1365. if !ok {
  1366. // The relay state is not found; it may have been evicted from the
  1367. // cache. The client will receive a generic error in this case and
  1368. // should stop relaying. Assuming the server is configured to require
  1369. // a BrokerServerReport, the tunnel will be terminated, so the
  1370. // client should also abandon the dial.
  1371. return nil, errors.TraceNew("no pending report")
  1372. }
  1373. pendingServerReport := entry.(*pendingServerReport)
  1374. serverID = pendingServerReport.serverID
  1375. // When the broker tried to use an existing session that was expired on the
  1376. // server, the server will respond here with a signed session reset token. The
  1377. // broker resets the session and starts to establish a new session.
  1378. //
  1379. // The non-waiting session establishment mode is used for broker/server
  1380. // sessions: if multiple clients concurrently try to relay new sessions,
  1381. // all establishments will happen in parallel without forcing any clients
  1382. // to wait for one client to lead the establishment. The last established
  1383. // session will be retained for reuse.
  1384. //
  1385. // If there is an error, the relayed packet is invalid. Drop the packet
  1386. // and return an error to be logged. Do _not_ reset the session,
  1387. // otherwise a malicious client could interrupt a valid broker/server
  1388. // session with a malformed packet.
  1389. // Next is given a nil ctx since we're not waiting for any other client to
  1390. // establish the session.
  1391. out, _, err := pendingServerReport.roundTrip.Next(
  1392. nil, relayedPacketRequest.PacketFromServer)
  1393. if err != nil {
  1394. return nil, errors.Trace(err)
  1395. }
  1396. if out == nil {
  1397. // The BrokerServerReport is a one-way message, As a result, the relay
  1398. // never ends with broker receiving a response; it's either
  1399. // (re)handshaking or sending the one-way report.
  1400. return nil, errors.TraceNew("unexpected nil packet")
  1401. }
  1402. // Return the next broker packet for the client to relay to the server.
  1403. // When it receives a nil PacketToServer, the client will stop relaying.
  1404. responsePayload, err := MarshalClientRelayedPacketResponse(
  1405. &ClientRelayedPacketResponse{
  1406. PacketToServer: out,
  1407. })
  1408. if err != nil {
  1409. return nil, errors.Trace(err)
  1410. }
  1411. return responsePayload, nil
  1412. }
  1413. // handleClientDSL relays client DSL requests. The broker's role is to provide
  1414. // a blocking resistant hop through to the DSL backend; DSL requests are not
  1415. // direct components of the in-proxy protocol.
  1416. func (b *Broker) handleClientDSL(
  1417. ctx context.Context,
  1418. extendTransportTimeout ExtendTransportTimeout,
  1419. transportLogFields common.LogFields,
  1420. clientIP string,
  1421. geoIPData common.GeoIPData,
  1422. initiatorID ID,
  1423. requestPayload []byte) (retResponse []byte, retErr error) {
  1424. startTime := time.Now()
  1425. var logFields common.LogFields
  1426. var requestSize, responseSize int
  1427. var dslRelayErr error
  1428. // Always log the outcome.
  1429. defer func() {
  1430. if logFields == nil {
  1431. logFields = b.config.APIParameterLogFieldFormatter("", geoIPData, nil)
  1432. }
  1433. logFields["broker_event"] = "client-dsl"
  1434. logFields["broker_id"] = b.brokerID
  1435. logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
  1436. logFields["request_size"] = requestSize
  1437. logFields["response_size"] = responseSize
  1438. loggedError := false
  1439. if retErr != nil {
  1440. logFields["error"] = retErr.Error()
  1441. loggedError = true
  1442. } else if dslRelayErr != nil {
  1443. logFields["error"] = dslRelayErr.Error()
  1444. loggedError = true
  1445. }
  1446. logFields.Add(transportLogFields)
  1447. b.config.Logger.LogMetric(brokerMetricName, logFields)
  1448. if loggedError {
  1449. retErr = NewBrokerLoggedEvent(retErr)
  1450. }
  1451. }()
  1452. // Rate limit the number of relayed DSL requests. The DSL backend has its
  1453. // own rate limit enforcement, but avoiding excess requests here saves on
  1454. // resources consumed between the relay and backend.
  1455. //
  1456. // Unlike the announce/offer rate limit cases, there's no "limited" error
  1457. // flag returned to the client in this case, since this rate limiter is
  1458. // purely for abuse prevention and is expected to be configured with
  1459. // limits that won't be exceeded by legitimate clients.
  1460. rateLimitParams := b.dslRequestRateLimitParams.Load().(*brokerRateLimitParams)
  1461. err := brokerRateLimit(
  1462. b.dslRequestRateLimiters,
  1463. clientIP,
  1464. rateLimitParams.quantity,
  1465. rateLimitParams.interval)
  1466. if err != nil {
  1467. return nil, errors.Trace(err)
  1468. }
  1469. dslRequest, err := UnmarshalClientDSLRequest(requestPayload)
  1470. if err != nil {
  1471. return nil, errors.Trace(err)
  1472. }
  1473. requestSize = len(dslRequest.RequestPayload)
  1474. // There is no ValidateAndGetLogFields call in this handler as the DSL
  1475. // backend (or relay) validates inputs and logs events.
  1476. dslResponsePayload, err := b.config.RelayDSLRequest(
  1477. ctx,
  1478. extendTransportTimeout,
  1479. clientIP,
  1480. geoIPData,
  1481. dslRequest.RequestPayload)
  1482. if err != nil {
  1483. // RelayDSLRequest will always return a generic error response payload
  1484. // to send to the client, to ensure it retains its broker client
  1485. // round tripper. Any DSL relay errors, including missing
  1486. // configuration, will be logged to the broker_event.
  1487. dslRelayErr = errors.Trace(err)
  1488. }
  1489. responseSize = len(dslResponsePayload)
  1490. responsePayload, err := MarshalClientDSLResponse(
  1491. &ClientDSLResponse{
  1492. ResponsePayload: dslResponsePayload,
  1493. })
  1494. if err != nil {
  1495. return nil, errors.Trace(err)
  1496. }
  1497. return responsePayload, nil
  1498. }
  1499. func (b *Broker) adjustRequestTimeout(
  1500. logFields common.LogFields, timeout time.Duration) time.Duration {
  1501. // Adjust long-polling request timeouts to respect any maximum request
  1502. // timeout supported by the provider fronting the request.
  1503. //
  1504. // Limitation: the client is trusted to provide the correct fronting
  1505. // provider ID.
  1506. maxRequestTimeouts, ok := b.maxRequestTimeouts.Load().(map[string]time.Duration)
  1507. if !ok || maxRequestTimeouts == nil {
  1508. return timeout
  1509. }
  1510. frontingProviderID, ok := logFields["fronting_provider_id"].(string)
  1511. if !ok {
  1512. return timeout
  1513. }
  1514. maxRequestTimeout, ok := maxRequestTimeouts[frontingProviderID]
  1515. if !ok || maxRequestTimeout <= 0 || timeout <= maxRequestTimeout {
  1516. return timeout
  1517. }
  1518. return maxRequestTimeout
  1519. }
  1520. type pendingServerReport struct {
  1521. serverID string
  1522. serverReport *BrokerServerReport
  1523. roundTrip *InitiatorRoundTrip
  1524. }
  1525. func (b *Broker) initiateRelayedServerReport(
  1526. serverParams *serverParams,
  1527. connectionID ID,
  1528. serverReport *BrokerServerReport) ([]byte, error) {
  1529. reportPayload, err := MarshalBrokerServerReport(serverReport)
  1530. if err != nil {
  1531. return nil, errors.Trace(err)
  1532. }
  1533. // Force a new, concurrent session establishment with the server even if
  1534. // another handshake is already in progess, relayed by some other client.
  1535. // This ensures clients don't block waiting for other client relays
  1536. // through other tunnels. The last established session will be retained
  1537. // for reuse.
  1538. waitToShareSession := false
  1539. roundTrip, err := b.initiatorSessions.NewRoundTrip(
  1540. serverParams.sessionPublicKey,
  1541. serverParams.sessionRootObfuscationSecret,
  1542. waitToShareSession,
  1543. reportPayload)
  1544. if err != nil {
  1545. return nil, errors.Trace(err)
  1546. }
  1547. relayPacket, _, err := roundTrip.Next(nil, nil)
  1548. if err != nil {
  1549. return nil, errors.Trace(err)
  1550. }
  1551. strConnectionID := string(connectionID[:])
  1552. b.pendingServerReports.Set(
  1553. strConnectionID,
  1554. &pendingServerReport{
  1555. serverID: serverParams.serverID,
  1556. serverReport: serverReport,
  1557. roundTrip: roundTrip,
  1558. },
  1559. time.Duration(b.pendingServerReportsTTL.Load()))
  1560. return relayPacket, nil
  1561. }
  1562. type serverParams struct {
  1563. serverID string
  1564. sessionPublicKey SessionPublicKey
  1565. sessionRootObfuscationSecret ObfuscationSecret
  1566. }
  1567. // validateDestination checks that the client's specified proxy dial
  1568. // destination is valid destination address for a tunnel protocol in the
  1569. // specified signed and valid Psiphon server entry.
  1570. func (b *Broker) validateDestination(
  1571. geoIPData common.GeoIPData,
  1572. packedDestinationServerEntry []byte,
  1573. networkProtocol NetworkProtocol,
  1574. destinationAddress string) (*serverParams, error) {
  1575. var packedServerEntry protocol.PackedServerEntryFields
  1576. err := cbor.Unmarshal(packedDestinationServerEntry, &packedServerEntry)
  1577. if err != nil {
  1578. return nil, errors.Trace(err)
  1579. }
  1580. serverEntryFields, err := protocol.DecodePackedServerEntryFields(packedServerEntry)
  1581. if err != nil {
  1582. return nil, errors.Trace(err)
  1583. }
  1584. // Strip any unsigned fields, which could be forged by the client. In
  1585. // particular, this includes the server entry tag, which, in some cases,
  1586. // is locally populated by a client for its own reference.
  1587. serverEntryFields.RemoveUnsignedFields()
  1588. // Check that the server entry is signed by Psiphon. Otherwise a client
  1589. // could manufacture a server entry corresponding to an arbitrary dial
  1590. // destination.
  1591. err = serverEntryFields.VerifySignature(
  1592. b.config.ServerEntrySignaturePublicKey)
  1593. if err != nil {
  1594. return nil, errors.Trace(err)
  1595. }
  1596. // The server entry tag must be set and signed by Psiphon, as local,
  1597. // client derived tags are unsigned and untrusted.
  1598. serverEntryTag := serverEntryFields.GetTag()
  1599. if serverEntryTag == "" {
  1600. return nil, errors.TraceNew("missing server entry tag")
  1601. }
  1602. // Check that the server entry tag is on a list of active and valid
  1603. // Psiphon server entry tags. This ensures that an obsolete entry for a
  1604. // pruned server cannot by misused by a client to proxy to what's no
  1605. // longer a Psiphon server.
  1606. if !b.config.IsValidServerEntryTag(serverEntryTag) {
  1607. return nil, errors.TraceNew("invalid server entry tag")
  1608. }
  1609. serverID := serverEntryFields.GetDiagnosticID()
  1610. serverEntry, err := serverEntryFields.GetServerEntry()
  1611. if err != nil {
  1612. return nil, errors.Trace(err)
  1613. }
  1614. // Validate the dial host (IP or domain) and port matches a tunnel
  1615. // protocol offered by the server entry.
  1616. destHost, destPort, err := net.SplitHostPort(destinationAddress)
  1617. if err != nil {
  1618. return nil, errors.Trace(err)
  1619. }
  1620. destPortNum, err := strconv.Atoi(destPort)
  1621. if err != nil {
  1622. return nil, errors.Trace(err)
  1623. }
  1624. // For domain fronted cases, since we can't verify the Host header, access
  1625. // is strictly to limited to targeted clients. Clients should use tactics
  1626. // to avoid disallowed domain dial address cases, but here the broker
  1627. // enforces it.
  1628. //
  1629. // TODO: this issue could be further mitigated with a server
  1630. // acknowledgement of the broker's report, with no acknowledgement
  1631. // followed by signaling the proxy to terminate client connection.
  1632. // This assumes that any domain dial is for domain fronting.
  1633. isDomain := net.ParseIP(destHost) == nil
  1634. if isDomain && !b.config.AllowDomainFrontedDestinations(geoIPData) {
  1635. return nil, errors.TraceNew("domain fronted destinations disallowed")
  1636. }
  1637. // The server entry must include an in-proxy tunnel protocol capability
  1638. // and corresponding dial port number. In-proxy capacity may be set for
  1639. // only a subset of all Psiphon servers, to limited the number of servers
  1640. // a proxy can observe and enumerate. Well-behaved clients will not send
  1641. // any server entries lacking this capability, but here the broker
  1642. // enforces it.
  1643. if !serverEntry.IsValidInproxyDialAddress(networkProtocol.String(), destHost, destPortNum) {
  1644. return nil, errors.TraceNew("invalid destination address")
  1645. }
  1646. // Extract and return the key material to be used for the secure session
  1647. // and BrokerServer exchange between the broker and the Psiphon server
  1648. // corresponding to this server entry.
  1649. params := &serverParams{
  1650. serverID: serverID,
  1651. }
  1652. params.sessionPublicKey, err = SessionPublicKeyFromString(
  1653. serverEntry.InproxySessionPublicKey)
  1654. if err != nil {
  1655. return nil, errors.Trace(err)
  1656. }
  1657. params.sessionRootObfuscationSecret, err = ObfuscationSecretFromString(
  1658. serverEntry.InproxySessionRootObfuscationSecret)
  1659. if err != nil {
  1660. return nil, errors.Trace(err)
  1661. }
  1662. // Record that this server is known and trusted ServerProxyQualityRequest
  1663. // sender. The serverID is stored for logging in handleServerProxyQuality.
  1664. //
  1665. // There is no expiry for knownServerInitiatorIDs entries, and they will
  1666. // clear only if the broker is restarted (which is the same lifetime as
  1667. // ServerEntrySignaturePublicKey).
  1668. //
  1669. // Limitation: in time, the above IsValidServerEntryTag check could become
  1670. // false for a retired server, while its entry remains in
  1671. // knownServerInitiatorIDs. However, unlike the case of a recycled
  1672. // Psiphon server IP being used as a proxy destination, it's safer to
  1673. // assume that a retired server's session private key does not become
  1674. // exposed.
  1675. serverInitiatorID, err := params.sessionPublicKey.ToCurve25519()
  1676. if err != nil {
  1677. return nil, errors.Trace(err)
  1678. }
  1679. // For hosts running a single psiphond with multiple server entries, there
  1680. // will be multiple possible serverIDs for one serverInitiatorID. Don't
  1681. // overwrite any existing entry; keep the first observed serverID for
  1682. // more stable logging.
  1683. _, _ = b.knownServerInitiatorIDs.LoadOrStore(ID(serverInitiatorID), serverID)
  1684. return params, nil
  1685. }
  1686. func (b *Broker) isCommonCompartmentIDHashingInitialized() bool {
  1687. b.commonCompartmentsMutex.Lock()
  1688. defer b.commonCompartmentsMutex.Unlock()
  1689. return b.commonCompartments != nil
  1690. }
  1691. func (b *Broker) initializeCommonCompartmentIDHashing(
  1692. commonCompartmentIDs []ID) error {
  1693. b.commonCompartmentsMutex.Lock()
  1694. defer b.commonCompartmentsMutex.Unlock()
  1695. // At least one common compartment ID is required. At a minimum, one ID
  1696. // will be used and distributed to clients via tactics, limiting matching
  1697. // to those clients targeted to receive that tactic parameters.
  1698. if len(commonCompartmentIDs) == 0 {
  1699. return errors.TraceNew("missing common compartment IDs")
  1700. }
  1701. // The consistent package doesn't allow duplicate members.
  1702. checkDup := make(map[ID]bool, len(commonCompartmentIDs))
  1703. for _, compartmentID := range commonCompartmentIDs {
  1704. if checkDup[compartmentID] {
  1705. return errors.TraceNew("duplicate common compartment IDs")
  1706. }
  1707. checkDup[compartmentID] = true
  1708. }
  1709. // Proxies without personal compartment IDs are randomly assigned to the
  1710. // set of common, Psiphon-specified, compartment IDs. These common
  1711. // compartment IDs are then distributed to targeted clients through
  1712. // tactics or embedded in OSLs, to limit access to proxies.
  1713. //
  1714. // Use consistent hashing in an effort to keep a consistent assignment of
  1715. // proxies (as specified by proxy ID, which covers all announcements for
  1716. // a single proxy). This is more of a concern for long-lived, permanent
  1717. // proxies that are not behind any NAT.
  1718. //
  1719. // Even with consistent hashing, a subset of proxies will still change
  1720. // assignment when CommonCompartmentIDs changes.
  1721. consistentMembers := make([]consistent.Member, len(commonCompartmentIDs))
  1722. for i, compartmentID := range commonCompartmentIDs {
  1723. consistentMembers[i] = consistentMember(compartmentID.String())
  1724. }
  1725. b.commonCompartments = consistent.New(
  1726. consistentMembers,
  1727. consistent.Config{
  1728. PartitionCount: len(consistentMembers),
  1729. ReplicationFactor: 1,
  1730. Load: 1,
  1731. Hasher: xxhasher{},
  1732. })
  1733. return nil
  1734. }
  1735. // xxhasher wraps github.com/cespare/xxhash.Sum64 in the interface expected by
  1736. // github.com/buraksezer/consistent. xxhash is a high quality hash function
  1737. // used in github.com/buraksezer/consistent examples.
  1738. type xxhasher struct{}
  1739. func (h xxhasher) Sum64(data []byte) uint64 {
  1740. return xxhash.Sum64(data)
  1741. }
  // consistentMember is a string that satisfies the Member interface
// expected by github.com/buraksezer/consistent.
type consistentMember string

// String returns the member's underlying string value.
func (member consistentMember) String() string {
	value := string(member)
	return value
}
  1748. func (b *Broker) selectCommonCompartmentID(proxyID ID) (ID, error) {
  1749. b.commonCompartmentsMutex.Lock()
  1750. defer b.commonCompartmentsMutex.Unlock()
  1751. compartmentID, err := IDFromString(
  1752. b.commonCompartments.LocateKey(proxyID[:]).String())
  1753. if err != nil {
  1754. return compartmentID, errors.Trace(err)
  1755. }
  1756. return compartmentID, nil
  1757. }
  // brokerRateLimitParams configures brokerRateLimit: at most quantity
// requests per interval, per IP. A non-positive quantity or interval
// disables rate limiting.
type brokerRateLimitParams struct {
	// quantity is the number of requests allowed per interval. It is also
	// used as the limiter's burst size.
	quantity int

	// interval is the period over which quantity applies.
	interval time.Duration
}
  1762. func brokerRateLimit(
  1763. rateLimiters *lrucache.Cache,
  1764. limitIP string,
  1765. quantity int,
  1766. interval time.Duration) error {
  1767. if quantity <= 0 || interval <= 0 {
  1768. return nil
  1769. }
  1770. var rateLimiter *rate.Limiter
  1771. entry, ok := rateLimiters.Get(limitIP)
  1772. if ok {
  1773. rateLimiter = entry.(*rate.Limiter)
  1774. } else {
  1775. limit := float64(quantity) / interval.Seconds()
  1776. rateLimiter = rate.NewLimiter(rate.Limit(limit), quantity)
  1777. rateLimiters.Set(
  1778. limitIP, rateLimiter, interval)
  1779. }
  1780. if !rateLimiter.Allow() {
  1781. return errors.TraceNew("rate exceeded for IP")
  1782. }
  1783. return nil
  1784. }