frontedHTTP.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. package psiphon
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  12. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  13. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  14. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  15. "github.com/cespare/xxhash"
  16. )
  17. // frontedHTTPClientInstance contains the fronted HTTP dial parameters required
  18. // to create a net/http.Client, which is configured to use domain fronting.
  19. // frontedHTTPClientInstance implements HTTP client dial replay.
  20. type frontedHTTPClientInstance struct {
  21. frontedHTTPDialParameters *frontedHTTPDialParameters
  22. networkID string
  23. replayEnabled bool
  24. replayRetainFailedProbability float64
  25. replayUpdateFrequency time.Duration
  26. mutex sync.Mutex
  27. lastStoreReplay time.Time
  28. }
  29. // newFrontedHTTPClientInstance creates a new frontedHTTPClientInstance.
  30. // newFrontedHTTPClientInstance does not perform any network operations; the
  31. // new frontedHTTPClientInstance is initialized when used for a round
  32. // trip.
  33. func newFrontedHTTPClientInstance(
  34. config *Config,
  35. tunnel *Tunnel,
  36. frontingSpecs parameters.FrontingSpecs,
  37. selectedFrontingProviderID func(string),
  38. useDeviceBinder,
  39. skipVerify,
  40. disableSystemRootCAs,
  41. payloadSecure bool,
  42. ) (*frontedHTTPClientInstance, error) {
  43. if len(frontingSpecs) == 0 {
  44. return nil, errors.TraceNew("no fronting specs")
  45. }
  46. // This function duplicates some code from NewInproxyBrokerClientInstance.
  47. //
  48. // TODO: merge common functionality?
  49. p := config.GetParameters().Get()
  50. defer p.Close()
  51. // Shuffle fronting specs, for random load balancing. Fronting specs with
  52. // available dial parameter replay data are preferred.
  53. permutedIndexes := prng.Perm(len(frontingSpecs))
  54. shuffledFrontingSpecs := make(parameters.FrontingSpecs, len(frontingSpecs))
  55. for i, index := range permutedIndexes {
  56. shuffledFrontingSpecs[i] = frontingSpecs[index]
  57. }
  58. frontingSpecs = shuffledFrontingSpecs
  59. // Replay fronted HTTP dial parameters.
  60. var spec *parameters.FrontingSpec
  61. var dialParams *frontedHTTPDialParameters
  62. // Replay is disabled when the TTL, FrontedHTTPClientReplayDialParametersTTL,
  63. // is 0.
  64. now := time.Now()
  65. ttl := p.Duration(parameters.FrontedHTTPClientReplayDialParametersTTL)
  66. networkID := config.GetNetworkID()
  67. // Replay is disabled if there is an active tunnel.
  68. replayEnabled := tunnel == nil &&
  69. ttl > 0 &&
  70. !config.DisableReplay &&
  71. prng.FlipWeightedCoin(p.Float(parameters.FrontedHTTPClientReplayDialParametersProbability))
  72. if replayEnabled {
  73. selectFirstCandidate := false
  74. var err error
  75. spec, dialParams, err =
  76. SelectCandidateWithNetworkReplayParameters[parameters.FrontingSpec, frontedHTTPDialParameters](
  77. networkID,
  78. selectFirstCandidate,
  79. frontingSpecs,
  80. func(spec *parameters.FrontingSpec) string { return spec.FrontingProviderID },
  81. func(spec *parameters.FrontingSpec, dialParams *frontedHTTPDialParameters) bool {
  82. // Replay the successful fronting spec, if present, by
  83. // comparing its hash with that of the candidate.
  84. return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
  85. bytes.Equal(dialParams.LastUsedFrontingSpecHash, hashFrontingSpec(spec))
  86. })
  87. if err != nil {
  88. NoticeWarning("SelectCandidateWithNetworkReplayParameters failed: %v", errors.Trace(err))
  89. // Continue without replay
  90. }
  91. }
  92. // Select the first fronting spec in the shuffle when replay is not enabled
  93. // or in case SelectCandidateWithNetworkReplayParameters fails.
  94. if spec == nil {
  95. spec = frontingSpecs[0]
  96. }
  97. // Generate new fronted HTTP dial parameters if not replaying. Later,
  98. // isReplay is used to report the replay metric.
  99. isReplay := dialParams != nil
  100. if !isReplay {
  101. var err error
  102. dialParams, err = makeFrontedHTTPDialParameters(
  103. config,
  104. p,
  105. tunnel,
  106. spec,
  107. selectedFrontingProviderID,
  108. useDeviceBinder,
  109. skipVerify,
  110. disableSystemRootCAs,
  111. payloadSecure)
  112. if err != nil {
  113. return nil, errors.Trace(err)
  114. }
  115. } else {
  116. err := dialParams.prepareDialConfigs(
  117. config,
  118. p,
  119. isReplay,
  120. tunnel,
  121. useDeviceBinder,
  122. skipVerify,
  123. disableSystemRootCAs,
  124. payloadSecure)
  125. if err != nil {
  126. return nil, errors.Trace(err)
  127. }
  128. }
  129. return &frontedHTTPClientInstance{
  130. networkID: networkID,
  131. frontedHTTPDialParameters: dialParams,
  132. replayEnabled: replayEnabled,
  133. replayRetainFailedProbability: p.Float(parameters.FrontedHTTPClientReplayRetainFailedProbability),
  134. replayUpdateFrequency: p.Duration(parameters.FrontedHTTPClientReplayUpdateFrequency),
  135. }, nil
  136. }
  137. // RoundTrip implements the http.RoundTripper interface. RoundTrip makes a
  138. // domain fronted request to the meek server.
  139. //
  140. // Resources are cleaned up when the response body is closed.
  141. func (f *frontedHTTPClientInstance) RoundTrip(request *http.Request) (*http.Response, error) {
  142. // This function duplicates some code from InproxyBrokerRoundTripper.RoundTrip,
  143. // which has a more thorough implementation.
  144. //
  145. // TODO: merge implementations or common functionality?
  146. // Use MeekConn to domain front requests.
  147. conn, err := DialMeek(
  148. request.Context(),
  149. f.frontedHTTPDialParameters.FrontedMeekDialParameters.meekConfig,
  150. f.frontedHTTPDialParameters.FrontedMeekDialParameters.dialConfig)
  151. if err != nil {
  152. if request.Context().Err() != context.Canceled {
  153. // DialMeek performs an initial TLS handshake. Clear replay
  154. // parameters on error, excluding a cancelled context as
  155. // happens on shutdown.
  156. f.frontedHTTPClientRoundTripperFailed()
  157. }
  158. return nil, errors.Trace(err)
  159. }
  160. response, err := conn.RoundTrip(request)
  161. if err != nil {
  162. if request.Context().Err() != context.Canceled {
  163. // Clear replay parameters on other round trip errors, including
  164. // TLS failures and client-side timeouts, but excluding a cancelled
  165. // context as happens on shutdown.
  166. f.frontedHTTPClientRoundTripperFailed()
  167. }
  168. return nil, errors.Trace(err)
  169. }
  170. // Do not read the response body into memory all at once because it may
  171. // be large. Instead allow the caller to stream the response.
  172. body := newMeekHTTPResponseReadCloser(conn, response.Body)
  173. // Clear replay parameters if there are any errors while reading from the
  174. // response body.
  175. response.Body = newFrontedHTTPClientResponseReadCloser(f, body)
  176. // HTTP status codes other than 200 may indicate success depending on the
  177. // semantics of the operation. E.g., resumeable downloads are considered
  178. // successful if the HTTP server returns 200, 206, 304, 412, or 416.
  179. //
  180. // TODO: have the caller determine success and failure cases because this
  181. // is not always determined by the HTTP status code; e.g., HTTP server
  182. // returns 200 but payload signature check fails.
  183. if response.StatusCode == http.StatusOK ||
  184. response.StatusCode == http.StatusPartialContent ||
  185. response.StatusCode == http.StatusRequestedRangeNotSatisfiable ||
  186. response.StatusCode == http.StatusPreconditionFailed ||
  187. response.StatusCode == http.StatusNotModified {
  188. f.frontedHTTPClientRoundTripperSucceeded()
  189. } else {
  190. // TODO: do not clear replay parameters on temporary round tripper
  191. // failures, see InproxyBrokerRoundTripper.RoundTrip.
  192. f.frontedHTTPClientRoundTripperFailed()
  193. }
  194. return response, nil
  195. }
  196. // meekHTTPResponseReadCloser wraps an http.Response.Body received over a
  197. // frontedHTTPClientInstance in RoundTrip and exposes an io.ReadCloser.
  198. // Replay parameters are cleared if there are any errors while reading from
  199. // the response body.
  200. type frontedHTTPClientResponseReadCloser struct {
  201. client *frontedHTTPClientInstance
  202. responseBody io.ReadCloser
  203. }
  204. // newFrontedHTTPClientResponseReadCloser creates a frontedHTTPClientResponseReadCloser.
  205. func newFrontedHTTPClientResponseReadCloser(
  206. client *frontedHTTPClientInstance,
  207. responseBody io.ReadCloser) *frontedHTTPClientResponseReadCloser {
  208. return &frontedHTTPClientResponseReadCloser{
  209. client: client,
  210. responseBody: responseBody,
  211. }
  212. }
  213. // Read implements the io.Reader interface.
  214. func (f *frontedHTTPClientResponseReadCloser) Read(p []byte) (n int, err error) {
  215. n, err = f.responseBody.Read(p)
  216. if err != nil {
  217. f.client.frontedHTTPClientRoundTripperFailed()
  218. }
  219. return n, err
  220. }
  221. // Read implements the io.Closer interface.
  222. func (f *frontedHTTPClientResponseReadCloser) Close() error {
  223. return f.responseBody.Close()
  224. }
  225. // frontedHTTPClientRoundTripperSucceeded stores the current dial parameters
  226. // for replay.
  227. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperSucceeded() {
  228. // Note: duplicates code in BrokerClientRoundTripperSucceeded.
  229. f.mutex.Lock()
  230. defer f.mutex.Unlock()
  231. now := time.Now()
  232. if f.replayEnabled && now.Sub(f.lastStoreReplay) > f.replayUpdateFrequency {
  233. f.frontedHTTPDialParameters.LastUsedTimestamp = time.Now()
  234. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  235. err := SetNetworkReplayParameters[frontedHTTPDialParameters](
  236. f.networkID, replayID, f.frontedHTTPDialParameters)
  237. if err != nil {
  238. NoticeWarning("SetNetworkReplayParameters failed: %v", errors.Trace(err))
  239. // Continue without persisting replay changes.
  240. } else {
  241. f.lastStoreReplay = now
  242. }
  243. }
  244. }
  245. // frontedHTTPClientRoundTripperFailed clears replay parameters.
  246. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperFailed() {
  247. // Note: duplicates code in BrokerClientRoundTripperFailed.
  248. f.mutex.Lock()
  249. defer f.mutex.Unlock()
  250. // Delete any persistent replay dial parameters. Unlike with the success
  251. // case, consecutive, repeated deletes shouldn't write to storage, so
  252. // they are not avoided.
  253. if f.replayEnabled &&
  254. !prng.FlipWeightedCoin(f.replayRetainFailedProbability) {
  255. // Limitation: there's a race condition with multiple
  256. // frontedHTTPClientInstances writing to the replay datastore, such as
  257. // in the case where there's a feedback upload running concurrently
  258. // with a server list download; this delete could potentially clobber a
  259. // concurrent fresh replay store after a success.
  260. //
  261. // TODO: add an additional storage key distinguisher for each instance?
  262. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  263. err := DeleteNetworkReplayParameters[frontedHTTPDialParameters](
  264. f.networkID, replayID)
  265. if err != nil {
  266. NoticeWarning("DeleteNetworkReplayParameters failed: %v", errors.Trace(err))
  267. // Continue without resetting replay.
  268. }
  269. }
  270. }
  271. // hashFrontingSpec hashes the fronting spec. The hash is used to detect when
  272. // fronting spec tactics have changed.
  273. func hashFrontingSpec(spec *parameters.FrontingSpec) []byte {
  274. var hash [8]byte
  275. binary.BigEndian.PutUint64(
  276. hash[:],
  277. uint64(xxhash.Sum64String(fmt.Sprintf("%+v", spec))))
  278. return hash[:]
  279. }
  280. // frontedHTTPDialParameters represents a selected fronting transport and dial
  281. // parameters.
  282. //
  283. // frontedHTTPDialParameters is used to configure dialers; as a persistent
  284. // record to store successful dial parameters for replay; and to report dial
  285. // stats in notices and Psiphon API calls.
  286. //
  287. // frontedHTTPDialParameters is similar to tunnel DialParameters, but is
  288. // specific to fronted HTTP. It should be used for all fronted HTTP dials,
  289. // apart from the tunnel DialParameters cases.
  290. type frontedHTTPDialParameters struct {
  291. isReplay bool `json:"-"`
  292. LastUsedTimestamp time.Time
  293. LastUsedFrontingSpecHash []byte
  294. FrontedMeekDialParameters *FrontedMeekDialParameters
  295. }
  296. // makeFrontedHTTPDialParameters creates a new frontedHTTPDialParameters for
  297. // configuring a fronted HTTP client, including selecting a fronting transport
  298. // and all the various protocol attributes.
  299. //
  300. // payloadSecure must only be set if all HTTP plaintext payloads sent through
  301. // the returned net/http.Client will be wrapped in their own transport security
  302. // layer, which permits skipping of server certificate verification.
  303. func makeFrontedHTTPDialParameters(
  304. config *Config,
  305. p parameters.ParametersAccessor,
  306. tunnel *Tunnel,
  307. frontingSpec *parameters.FrontingSpec,
  308. selectedFrontingProviderID func(string),
  309. useDeviceBinder,
  310. skipVerify,
  311. disableSystemRootCAs,
  312. payloadSecure bool) (*frontedHTTPDialParameters, error) {
  313. currentTimestamp := time.Now()
  314. dialParams := &frontedHTTPDialParameters{
  315. LastUsedTimestamp: currentTimestamp,
  316. LastUsedFrontingSpecHash: hashFrontingSpec(frontingSpec),
  317. }
  318. var err error
  319. dialParams.FrontedMeekDialParameters, err = makeFrontedMeekDialParameters(
  320. config,
  321. p,
  322. tunnel,
  323. parameters.FrontingSpecs{frontingSpec},
  324. selectedFrontingProviderID,
  325. useDeviceBinder,
  326. skipVerify,
  327. disableSystemRootCAs,
  328. payloadSecure,
  329. )
  330. if err != nil {
  331. return nil, errors.Trace(err)
  332. }
  333. // Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
  334. err = dialParams.prepareDialConfigs(
  335. config,
  336. p,
  337. false,
  338. tunnel,
  339. skipVerify,
  340. disableSystemRootCAs,
  341. useDeviceBinder,
  342. payloadSecure)
  343. if err != nil {
  344. return nil, errors.Trace(err)
  345. }
  346. return dialParams, nil
  347. }
  348. // prepareDialConfigs is called for both new and replayed dial parameters.
  349. func (dialParams *frontedHTTPDialParameters) prepareDialConfigs(
  350. config *Config,
  351. p parameters.ParametersAccessor,
  352. isReplay bool,
  353. tunnel *Tunnel,
  354. useDeviceBinder,
  355. skipVerify,
  356. disableSystemRootCAs,
  357. payloadSecure bool) error {
  358. dialParams.isReplay = isReplay
  359. if isReplay {
  360. // Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
  361. err := dialParams.FrontedMeekDialParameters.prepareDialConfigs(
  362. config, p, tunnel, nil, useDeviceBinder, skipVerify,
  363. disableSystemRootCAs, payloadSecure)
  364. if err != nil {
  365. return errors.Trace(err)
  366. }
  367. }
  368. return nil
  369. }
  370. // GetMetrics implements the common.MetricsSource interface and returns log
  371. // fields detailing the fronted HTTP dial parameters.
  372. func (dialParams *frontedHTTPDialParameters) GetMetrics() common.LogFields {
  373. logFields := dialParams.FrontedMeekDialParameters.GetMetrics("")
  374. isReplay := "0"
  375. if dialParams.isReplay {
  376. isReplay = "1"
  377. }
  378. logFields["is_replay"] = isReplay
  379. return logFields
  380. }