frontedHTTP.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. package psiphon
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  12. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  13. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  14. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  15. utls "github.com/Psiphon-Labs/utls"
  16. "github.com/cespare/xxhash"
  17. )
  18. // frontedHTTPClientInstance contains the fronted HTTP dial parameters required
  19. // to create a net/http.Client, which is configured to use domain fronting.
  20. // frontedHTTPClientInstance implements HTTP client dial replay.
  21. type frontedHTTPClientInstance struct {
  22. frontedHTTPDialParameters *frontedHTTPDialParameters
  23. networkID string
  24. replayEnabled bool
  25. replayRetainFailedProbability float64
  26. replayUpdateFrequency time.Duration
  27. mutex sync.Mutex
  28. lastStoreReplay time.Time
  29. }
  30. // newFrontedHTTPClientInstance creates a new frontedHTTPClientInstance.
  31. // newFrontedHTTPClientInstance does not perform any network operations; the
  32. // new frontedHTTPClientInstance is initialized when used for a round
  33. // trip.
  34. func newFrontedHTTPClientInstance(
  35. config *Config,
  36. tunnel *Tunnel,
  37. frontingSpecs parameters.FrontingSpecs,
  38. selectedFrontingProviderID func(string),
  39. useDeviceBinder,
  40. skipVerify,
  41. disableSystemRootCAs,
  42. payloadSecure bool,
  43. tlsCache utls.ClientSessionCache,
  44. ) (*frontedHTTPClientInstance, error) {
  45. if len(frontingSpecs) == 0 {
  46. return nil, errors.TraceNew("no fronting specs")
  47. }
  48. // This function duplicates some code from NewInproxyBrokerClientInstance.
  49. //
  50. // TODO: merge common functionality?
  51. p := config.GetParameters().Get()
  52. defer p.Close()
  53. // Shuffle fronting specs, for random load balancing. Fronting specs with
  54. // available dial parameter replay data are preferred.
  55. permutedIndexes := prng.Perm(len(frontingSpecs))
  56. shuffledFrontingSpecs := make(parameters.FrontingSpecs, len(frontingSpecs))
  57. for i, index := range permutedIndexes {
  58. shuffledFrontingSpecs[i] = frontingSpecs[index]
  59. }
  60. frontingSpecs = shuffledFrontingSpecs
  61. // Replay fronted HTTP dial parameters.
  62. var spec *parameters.FrontingSpec
  63. var dialParams *frontedHTTPDialParameters
  64. // Replay is disabled when the TTL, FrontedHTTPClientReplayDialParametersTTL,
  65. // is 0.
  66. now := time.Now()
  67. ttl := p.Duration(parameters.FrontedHTTPClientReplayDialParametersTTL)
  68. networkID := config.GetNetworkID()
  69. // Replay is disabled if there is an active tunnel.
  70. replayEnabled := tunnel == nil &&
  71. ttl > 0 &&
  72. !config.DisableReplay &&
  73. prng.FlipWeightedCoin(p.Float(parameters.FrontedHTTPClientReplayDialParametersProbability))
  74. if replayEnabled {
  75. selectFirstCandidate := false
  76. var err error
  77. spec, dialParams, err =
  78. SelectCandidateWithNetworkReplayParameters[parameters.FrontingSpec, frontedHTTPDialParameters](
  79. networkID,
  80. selectFirstCandidate,
  81. frontingSpecs,
  82. func(spec *parameters.FrontingSpec) string { return spec.FrontingProviderID },
  83. func(spec *parameters.FrontingSpec, dialParams *frontedHTTPDialParameters) bool {
  84. // Replay the successful fronting spec, if present, by
  85. // comparing its hash with that of the candidate.
  86. return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
  87. bytes.Equal(dialParams.LastUsedFrontingSpecHash, hashFrontingSpec(spec))
  88. })
  89. if err != nil {
  90. NoticeWarning("SelectCandidateWithNetworkReplayParameters failed: %v", errors.Trace(err))
  91. // Continue without replay
  92. }
  93. }
  94. // Select the first fronting spec in the shuffle when replay is not enabled
  95. // or in case SelectCandidateWithNetworkReplayParameters fails.
  96. if spec == nil {
  97. spec = frontingSpecs[0]
  98. }
  99. // Generate new fronted HTTP dial parameters if not replaying. Later,
  100. // isReplay is used to report the replay metric.
  101. isReplay := dialParams != nil
  102. if !isReplay {
  103. var err error
  104. dialParams, err = makeFrontedHTTPDialParameters(
  105. config,
  106. p,
  107. tunnel,
  108. spec,
  109. selectedFrontingProviderID,
  110. useDeviceBinder,
  111. skipVerify,
  112. disableSystemRootCAs,
  113. payloadSecure,
  114. tlsCache)
  115. if err != nil {
  116. return nil, errors.Trace(err)
  117. }
  118. } else {
  119. err := dialParams.prepareDialConfigs(
  120. config,
  121. p,
  122. isReplay,
  123. tunnel,
  124. useDeviceBinder,
  125. skipVerify,
  126. disableSystemRootCAs,
  127. payloadSecure,
  128. tlsCache)
  129. if err != nil {
  130. return nil, errors.Trace(err)
  131. }
  132. }
  133. return &frontedHTTPClientInstance{
  134. networkID: networkID,
  135. frontedHTTPDialParameters: dialParams,
  136. replayEnabled: replayEnabled,
  137. replayRetainFailedProbability: p.Float(parameters.FrontedHTTPClientReplayRetainFailedProbability),
  138. replayUpdateFrequency: p.Duration(parameters.FrontedHTTPClientReplayUpdateFrequency),
  139. }, nil
  140. }
  141. // RoundTrip implements the http.RoundTripper interface. RoundTrip makes a
  142. // domain fronted request to the meek server.
  143. //
  144. // Resources are cleaned up when the response body is closed.
  145. func (f *frontedHTTPClientInstance) RoundTrip(request *http.Request) (*http.Response, error) {
  146. // This function duplicates some code from InproxyBrokerRoundTripper.RoundTrip,
  147. // which has a more thorough implementation.
  148. //
  149. // TODO: merge implementations or common functionality?
  150. // Use MeekConn to domain front requests.
  151. conn, err := DialMeek(
  152. request.Context(),
  153. f.frontedHTTPDialParameters.FrontedMeekDialParameters.meekConfig,
  154. f.frontedHTTPDialParameters.FrontedMeekDialParameters.dialConfig)
  155. if err != nil {
  156. if request.Context().Err() != context.Canceled {
  157. // DialMeek performs an initial TLS handshake. Clear replay
  158. // parameters on error, excluding a cancelled context as
  159. // happens on shutdown.
  160. f.frontedHTTPClientRoundTripperFailed()
  161. }
  162. return nil, errors.Trace(err)
  163. }
  164. response, err := conn.RoundTrip(request)
  165. if err != nil {
  166. if request.Context().Err() != context.Canceled {
  167. // Clear replay parameters on other round trip errors, including
  168. // TLS failures and client-side timeouts, but excluding a cancelled
  169. // context as happens on shutdown.
  170. f.frontedHTTPClientRoundTripperFailed()
  171. }
  172. return nil, errors.Trace(err)
  173. }
  174. // Do not read the response body into memory all at once because it may
  175. // be large. Instead allow the caller to stream the response.
  176. body := newMeekHTTPResponseReadCloser(conn, response.Body)
  177. // Clear replay parameters if there are any errors while reading from the
  178. // response body.
  179. response.Body = newFrontedHTTPClientResponseReadCloser(f, body)
  180. // HTTP status codes other than 200 may indicate success depending on the
  181. // semantics of the operation. E.g., resumeable downloads are considered
  182. // successful if the HTTP server returns 200, 206, 304, 412, or 416.
  183. //
  184. // TODO: have the caller determine success and failure cases because this
  185. // is not always determined by the HTTP status code; e.g., HTTP server
  186. // returns 200 but payload signature check fails.
  187. if response.StatusCode == http.StatusOK ||
  188. response.StatusCode == http.StatusPartialContent ||
  189. response.StatusCode == http.StatusRequestedRangeNotSatisfiable ||
  190. response.StatusCode == http.StatusPreconditionFailed ||
  191. response.StatusCode == http.StatusNotModified {
  192. f.frontedHTTPClientRoundTripperSucceeded()
  193. } else {
  194. // TODO: do not clear replay parameters on temporary round tripper
  195. // failures, see InproxyBrokerRoundTripper.RoundTrip.
  196. f.frontedHTTPClientRoundTripperFailed()
  197. }
  198. return response, nil
  199. }
  200. // meekHTTPResponseReadCloser wraps an http.Response.Body received over a
  201. // frontedHTTPClientInstance in RoundTrip and exposes an io.ReadCloser.
  202. // Replay parameters are cleared if there are any errors while reading from
  203. // the response body.
  204. type frontedHTTPClientResponseReadCloser struct {
  205. client *frontedHTTPClientInstance
  206. responseBody io.ReadCloser
  207. }
  208. // newFrontedHTTPClientResponseReadCloser creates a frontedHTTPClientResponseReadCloser.
  209. func newFrontedHTTPClientResponseReadCloser(
  210. client *frontedHTTPClientInstance,
  211. responseBody io.ReadCloser) *frontedHTTPClientResponseReadCloser {
  212. return &frontedHTTPClientResponseReadCloser{
  213. client: client,
  214. responseBody: responseBody,
  215. }
  216. }
  217. // Read implements the io.Reader interface.
  218. func (f *frontedHTTPClientResponseReadCloser) Read(p []byte) (n int, err error) {
  219. n, err = f.responseBody.Read(p)
  220. if err != nil {
  221. f.client.frontedHTTPClientRoundTripperFailed()
  222. }
  223. return n, err
  224. }
  225. // Read implements the io.Closer interface.
  226. func (f *frontedHTTPClientResponseReadCloser) Close() error {
  227. return f.responseBody.Close()
  228. }
  229. // frontedHTTPClientRoundTripperSucceeded stores the current dial parameters
  230. // for replay.
  231. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperSucceeded() {
  232. // Note: duplicates code in BrokerClientRoundTripperSucceeded.
  233. f.mutex.Lock()
  234. defer f.mutex.Unlock()
  235. now := time.Now()
  236. if f.replayEnabled && now.Sub(f.lastStoreReplay) > f.replayUpdateFrequency {
  237. f.frontedHTTPDialParameters.LastUsedTimestamp = time.Now()
  238. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  239. err := SetNetworkReplayParameters[frontedHTTPDialParameters](
  240. f.networkID, replayID, f.frontedHTTPDialParameters)
  241. if err != nil {
  242. NoticeWarning("SetNetworkReplayParameters failed: %v", errors.Trace(err))
  243. // Continue without persisting replay changes.
  244. } else {
  245. f.lastStoreReplay = now
  246. }
  247. }
  248. }
  249. // frontedHTTPClientRoundTripperFailed clears replay parameters.
  250. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperFailed() {
  251. // Note: duplicates code in BrokerClientRoundTripperFailed.
  252. f.mutex.Lock()
  253. defer f.mutex.Unlock()
  254. // Delete any persistent replay dial parameters. Unlike with the success
  255. // case, consecutive, repeated deletes shouldn't write to storage, so
  256. // they are not avoided.
  257. if f.replayEnabled &&
  258. !prng.FlipWeightedCoin(f.replayRetainFailedProbability) {
  259. // Limitation: there's a race condition with multiple
  260. // frontedHTTPClientInstances writing to the replay datastore, such as
  261. // in the case where there's a feedback upload running concurrently
  262. // with a server list download; this delete could potentially clobber a
  263. // concurrent fresh replay store after a success.
  264. //
  265. // TODO: add an additional storage key distinguisher for each instance?
  266. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  267. err := DeleteNetworkReplayParameters[frontedHTTPDialParameters](
  268. f.networkID, replayID)
  269. if err != nil {
  270. NoticeWarning("DeleteNetworkReplayParameters failed: %v", errors.Trace(err))
  271. // Continue without resetting replay.
  272. }
  273. }
  274. }
  275. // hashFrontingSpec hashes the fronting spec. The hash is used to detect when
  276. // fronting spec tactics have changed.
  277. func hashFrontingSpec(spec *parameters.FrontingSpec) []byte {
  278. var hash [8]byte
  279. binary.BigEndian.PutUint64(
  280. hash[:],
  281. uint64(xxhash.Sum64String(fmt.Sprintf("%+v", spec))))
  282. return hash[:]
  283. }
  284. // frontedHTTPDialParameters represents a selected fronting transport and dial
  285. // parameters.
  286. //
  287. // frontedHTTPDialParameters is used to configure dialers; as a persistent
  288. // record to store successful dial parameters for replay; and to report dial
  289. // stats in notices and Psiphon API calls.
  290. //
  291. // frontedHTTPDialParameters is similar to tunnel DialParameters, but is
  292. // specific to fronted HTTP. It should be used for all fronted HTTP dials,
  293. // apart from the tunnel DialParameters cases.
  294. type frontedHTTPDialParameters struct {
  295. isReplay bool `json:"-"`
  296. LastUsedTimestamp time.Time
  297. LastUsedFrontingSpecHash []byte
  298. FrontedMeekDialParameters *FrontedMeekDialParameters
  299. }
  300. // makeFrontedHTTPDialParameters creates a new frontedHTTPDialParameters for
  301. // configuring a fronted HTTP client, including selecting a fronting transport
  302. // and all the various protocol attributes.
  303. //
  304. // payloadSecure must only be set if all HTTP plaintext payloads sent through
  305. // the returned net/http.Client will be wrapped in their own transport security
  306. // layer, which permits skipping of server certificate verification.
  307. func makeFrontedHTTPDialParameters(
  308. config *Config,
  309. p parameters.ParametersAccessor,
  310. tunnel *Tunnel,
  311. frontingSpec *parameters.FrontingSpec,
  312. selectedFrontingProviderID func(string),
  313. useDeviceBinder,
  314. skipVerify,
  315. disableSystemRootCAs,
  316. payloadSecure bool,
  317. tlsCache utls.ClientSessionCache) (*frontedHTTPDialParameters, error) {
  318. currentTimestamp := time.Now()
  319. dialParams := &frontedHTTPDialParameters{
  320. LastUsedTimestamp: currentTimestamp,
  321. LastUsedFrontingSpecHash: hashFrontingSpec(frontingSpec),
  322. }
  323. var err error
  324. dialParams.FrontedMeekDialParameters, err = makeFrontedMeekDialParameters(
  325. config,
  326. p,
  327. tunnel,
  328. parameters.FrontingSpecs{frontingSpec},
  329. selectedFrontingProviderID,
  330. useDeviceBinder,
  331. skipVerify,
  332. disableSystemRootCAs,
  333. payloadSecure,
  334. tlsCache,
  335. )
  336. if err != nil {
  337. return nil, errors.Trace(err)
  338. }
  339. // Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
  340. err = dialParams.prepareDialConfigs(
  341. config,
  342. p,
  343. false,
  344. tunnel,
  345. skipVerify,
  346. disableSystemRootCAs,
  347. useDeviceBinder,
  348. payloadSecure,
  349. tlsCache)
  350. if err != nil {
  351. return nil, errors.Trace(err)
  352. }
  353. return dialParams, nil
  354. }
  355. // prepareDialConfigs is called for both new and replayed dial parameters.
  356. func (dialParams *frontedHTTPDialParameters) prepareDialConfigs(
  357. config *Config,
  358. p parameters.ParametersAccessor,
  359. isReplay bool,
  360. tunnel *Tunnel,
  361. useDeviceBinder,
  362. skipVerify,
  363. disableSystemRootCAs,
  364. payloadSecure bool,
  365. tlsCache utls.ClientSessionCache) error {
  366. dialParams.isReplay = isReplay
  367. if isReplay {
  368. // Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
  369. err := dialParams.FrontedMeekDialParameters.prepareDialConfigs(
  370. config, p, tunnel, nil, useDeviceBinder, skipVerify,
  371. disableSystemRootCAs, payloadSecure, tlsCache)
  372. if err != nil {
  373. return errors.Trace(err)
  374. }
  375. }
  376. return nil
  377. }
  378. // GetMetrics implements the common.MetricsSource interface and returns log
  379. // fields detailing the fronted HTTP dial parameters.
  380. func (dialParams *frontedHTTPDialParameters) GetMetrics() common.LogFields {
  381. logFields := dialParams.FrontedMeekDialParameters.GetMetrics("")
  382. isReplay := "0"
  383. if dialParams.isReplay {
  384. isReplay = "1"
  385. }
  386. logFields["is_replay"] = isReplay
  387. return logFields
  388. }