frontedHTTP.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. package psiphon
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  12. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  13. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  14. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  15. "github.com/cespare/xxhash"
  16. )
  17. // frontedHTTPClientInstance contains the fronted HTTP dial parameters required
  18. // to create a net/http.Client, which is configured to use domain fronting.
  19. // frontedHTTPClientInstance implements HTTP client dial replay.
  20. type frontedHTTPClientInstance struct {
  21. frontedHTTPDialParameters *frontedHTTPDialParameters
  22. networkID string
  23. replayEnabled bool
  24. replayRetainFailedProbability float64
  25. replayUpdateFrequency time.Duration
  26. mutex sync.Mutex
  27. lastStoreReplay time.Time
  28. }
  29. // newFrontedHTTPClientInstance creates a new frontedHTTPClientInstance.
  30. // newFrontedHTTPClientInstance does not perform any network operations; the
  31. // new frontedHTTPClientInstance is initialized when used for a round
  32. // trip.
  33. func newFrontedHTTPClientInstance(
  34. config *Config,
  35. tunnel *Tunnel,
  36. frontingSpecs parameters.FrontingSpecs,
  37. selectedFrontingProviderID func(string),
  38. useDeviceBinder,
  39. skipVerify,
  40. disableSystemRootCAs,
  41. payloadSecure bool,
  42. ) (*frontedHTTPClientInstance, error) {
  43. // This function duplicates some code from NewInproxyBrokerClientInstance.
  44. //
  45. // TODO: merge common functionality?
  46. p := config.GetParameters().Get()
  47. defer p.Close()
  48. // Shuffle fronting specs, for random load balancing. Fronting specs with
  49. // available dial parameter replay data are preferred.
  50. permutedIndexes := prng.Perm(len(frontingSpecs))
  51. shuffledFrontingSpecs := make(parameters.FrontingSpecs, len(frontingSpecs))
  52. for i, index := range permutedIndexes {
  53. shuffledFrontingSpecs[i] = frontingSpecs[index]
  54. }
  55. frontingSpecs = shuffledFrontingSpecs
  56. // Replay fronted HTTP dial parameters.
  57. var spec *parameters.FrontingSpec
  58. var dialParams *frontedHTTPDialParameters
  59. // Replay is disabled when the TTL, FrontedHTTPClientReplayDialParametersTTL,
  60. // is 0.
  61. now := time.Now()
  62. ttl := p.Duration(parameters.FrontedHTTPClientReplayDialParametersTTL)
  63. networkID := config.GetNetworkID()
  64. // Replay is disabled if there is an active tunnel.
  65. replayEnabled := tunnel == nil &&
  66. ttl > 0 &&
  67. !config.DisableReplay &&
  68. prng.FlipWeightedCoin(p.Float(parameters.FrontedHTTPClientReplayDialParametersProbability))
  69. if replayEnabled {
  70. selectFirstCandidate := false
  71. var err error
  72. spec, dialParams, err =
  73. SelectCandidateWithNetworkReplayParameters[parameters.FrontingSpec, frontedHTTPDialParameters](
  74. networkID,
  75. selectFirstCandidate,
  76. frontingSpecs,
  77. func(spec *parameters.FrontingSpec) string { return spec.FrontingProviderID },
  78. func(spec *parameters.FrontingSpec, dialParams *frontedHTTPDialParameters) bool {
  79. // Replay the successful fronting spec, if present, by
  80. // comparing its hash with that of the candidate.
  81. return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
  82. bytes.Equal(dialParams.LastUsedFrontingSpecHash, hashFrontingSpec(spec))
  83. })
  84. if err != nil {
  85. NoticeWarning("SelectCandidateWithNetworkReplayParameters failed: %v", errors.Trace(err))
  86. // Continue without replay
  87. }
  88. }
  89. // Select the first fronting spec in the shuffle when replay is not enabled
  90. // or in case SelectCandidateWithNetworkReplayParameters fails.
  91. if spec == nil {
  92. spec = frontingSpecs[prng.Intn(len(frontingSpecs)-1)]
  93. }
  94. // Generate new fronted HTTP dial parameters if not replaying. Later,
  95. // isReplay is used to report the replay metric.
  96. isReplay := dialParams != nil
  97. if !isReplay {
  98. var err error
  99. dialParams, err = makeFrontedHTTPDialParameters(
  100. config,
  101. p,
  102. tunnel,
  103. spec,
  104. selectedFrontingProviderID,
  105. useDeviceBinder,
  106. skipVerify,
  107. disableSystemRootCAs,
  108. payloadSecure)
  109. if err != nil {
  110. return nil, errors.Trace(err)
  111. }
  112. } else {
  113. err := dialParams.prepareDialConfigs(
  114. config,
  115. p,
  116. isReplay,
  117. tunnel,
  118. useDeviceBinder,
  119. skipVerify,
  120. disableSystemRootCAs,
  121. payloadSecure)
  122. if err != nil {
  123. return nil, errors.Trace(err)
  124. }
  125. }
  126. return &frontedHTTPClientInstance{
  127. networkID: networkID,
  128. frontedHTTPDialParameters: dialParams,
  129. replayEnabled: replayEnabled,
  130. replayRetainFailedProbability: p.Float(parameters.FrontedHTTPClientReplayRetainFailedProbability),
  131. replayUpdateFrequency: p.Duration(parameters.FrontedHTTPClientReplayUpdateFrequency),
  132. }, nil
  133. }
  134. // RoundTrip implements the http.RoundTripper interface. RoundTrip makes a
  135. // domain fronted request to the meek server.
  136. //
  137. // Resources are cleaned up when the response body is closed.
  138. func (f *frontedHTTPClientInstance) RoundTrip(request *http.Request) (*http.Response, error) {
  139. // This function duplicates some code from InproxyBrokerRoundTripper.RoundTrip,
  140. // which has a more thorough implementation.
  141. //
  142. // TODO: merge implementations or common functionality?
  143. // Use MeekConn to domain front requests.
  144. conn, err := DialMeek(
  145. request.Context(),
  146. f.frontedHTTPDialParameters.FrontedMeekDialParameters.meekConfig,
  147. f.frontedHTTPDialParameters.FrontedMeekDialParameters.dialConfig)
  148. if err != nil {
  149. if request.Context().Err() != context.Canceled {
  150. // DialMeek performs an initial TLS handshake. Clear replay
  151. // parameters on error, excluding a cancelled context as
  152. // happens on shutdown.
  153. f.frontedHTTPClientRoundTripperFailed()
  154. }
  155. return nil, errors.Trace(err)
  156. }
  157. response, err := conn.RoundTrip(request)
  158. if err != nil {
  159. if request.Context().Err() != context.Canceled {
  160. // Clear replay parameters on other round trip errors, including
  161. // TLS failures and client-side timeouts, but excluding a cancelled
  162. // context as happens on shutdown.
  163. f.frontedHTTPClientRoundTripperFailed()
  164. }
  165. return nil, errors.Trace(err)
  166. }
  167. // Do not read the response body into memory all at once because it may
  168. // be large. Instead allow the caller to stream the response.
  169. body := newMeekHTTPResponseReadCloser(conn, response.Body)
  170. // Clear replay parameters if there are any errors while reading from the
  171. // response body.
  172. response.Body = newFrontedHTTPClientResponseReadCloser(f, body)
  173. // HTTP status codes other than 200 may indicate success depending on the
  174. // semantics of the operation. E.g., resumeable downloads are considered
  175. // successful if the HTTP server returns 200, 206, 304, 412, or 416.
  176. //
  177. // TODO: have the caller determine success and failure cases because this
  178. // is not always determined by the HTTP status code; e.g., HTTP server
  179. // returns 200 but payload signature check fails.
  180. if response.StatusCode == http.StatusOK ||
  181. response.StatusCode == http.StatusPartialContent ||
  182. response.StatusCode == http.StatusRequestedRangeNotSatisfiable ||
  183. response.StatusCode == http.StatusPreconditionFailed ||
  184. response.StatusCode == http.StatusNotModified {
  185. f.frontedHTTPClientRoundTripperSucceeded()
  186. } else {
  187. // TODO: do not clear replay parameters on temporary round tripper
  188. // failures, see InproxyBrokerRoundTripper.RoundTrip.
  189. f.frontedHTTPClientRoundTripperFailed()
  190. }
  191. return response, nil
  192. }
  193. // meekHTTPResponseReadCloser wraps an http.Response.Body received over a
  194. // frontedHTTPClientInstance in RoundTrip and exposes an io.ReadCloser.
  195. // Replay parameters are cleared if there are any errors while reading from
  196. // the response body.
  197. type frontedHTTPClientResponseReadCloser struct {
  198. client *frontedHTTPClientInstance
  199. responseBody io.ReadCloser
  200. }
  201. // newFrontedHTTPClientResponseReadCloser creates a frontedHTTPClientResponseReadCloser.
  202. func newFrontedHTTPClientResponseReadCloser(
  203. client *frontedHTTPClientInstance,
  204. responseBody io.ReadCloser) *frontedHTTPClientResponseReadCloser {
  205. return &frontedHTTPClientResponseReadCloser{
  206. client: client,
  207. responseBody: responseBody,
  208. }
  209. }
  210. // Read implements the io.Reader interface.
  211. func (f *frontedHTTPClientResponseReadCloser) Read(p []byte) (n int, err error) {
  212. n, err = f.responseBody.Read(p)
  213. if err != nil {
  214. f.client.frontedHTTPClientRoundTripperFailed()
  215. }
  216. return n, err
  217. }
  218. // Read implements the io.Closer interface.
  219. func (f *frontedHTTPClientResponseReadCloser) Close() error {
  220. return f.responseBody.Close()
  221. }
  222. // frontedHTTPClientRoundTripperSucceeded stores the current dial parameters
  223. // for replay.
  224. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperSucceeded() {
  225. // Note: duplicates code in BrokerClientRoundTripperSucceeded.
  226. f.mutex.Lock()
  227. defer f.mutex.Unlock()
  228. now := time.Now()
  229. if f.replayEnabled && now.Sub(f.lastStoreReplay) > f.replayUpdateFrequency {
  230. f.frontedHTTPDialParameters.LastUsedTimestamp = time.Now()
  231. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  232. err := SetNetworkReplayParameters[frontedHTTPDialParameters](
  233. f.networkID, replayID, f.frontedHTTPDialParameters)
  234. if err != nil {
  235. NoticeWarning("SetNetworkReplayParameters failed: %v", errors.Trace(err))
  236. // Continue without persisting replay changes.
  237. } else {
  238. f.lastStoreReplay = now
  239. }
  240. }
  241. }
  242. // frontedHTTPClientRoundTripperFailed clears replay parameters.
  243. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperFailed() {
  244. // Note: duplicates code in BrokerClientRoundTripperFailed.
  245. f.mutex.Lock()
  246. defer f.mutex.Unlock()
  247. // Delete any persistent replay dial parameters. Unlike with the success
  248. // case, consecutive, repeated deletes shouldn't write to storage, so
  249. // they are not avoided.
  250. if f.replayEnabled &&
  251. !prng.FlipWeightedCoin(f.replayRetainFailedProbability) {
  252. // Limitation: there's a race condition with multiple
  253. // frontedHTTPClientInstances writing to the replay datastore, such as
  254. // in the case where there's a feedback upload running concurrently
  255. // with a server list download; this delete could potentially clobber a
  256. // concurrent fresh replay store after a success.
  257. //
  258. // TODO: add an additional storage key distinguisher for each instance?
  259. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  260. err := DeleteNetworkReplayParameters[frontedHTTPDialParameters](
  261. f.networkID, replayID)
  262. if err != nil {
  263. NoticeWarning("DeleteNetworkReplayParameters failed: %v", errors.Trace(err))
  264. // Continue without resetting replay.
  265. }
  266. }
  267. }
  268. // hashFrontingSpec hashes the fronting spec. The hash is used to detect when
  269. // fronting spec tactics have changed.
  270. func hashFrontingSpec(spec *parameters.FrontingSpec) []byte {
  271. var hash [8]byte
  272. binary.BigEndian.PutUint64(
  273. hash[:],
  274. uint64(xxhash.Sum64String(fmt.Sprintf("%+v", spec))))
  275. return hash[:]
  276. }
  277. // frontedHTTPDialParameters represents a selected fronting transport and dial
  278. // parameters.
  279. //
  280. // frontedHTTPDialParameters is used to configure dialers; as a persistent
  281. // record to store successful dial parameters for replay; and to report dial
  282. // stats in notices and Psiphon API calls.
  283. //
  284. // frontedHTTPDialParameters is similar to tunnel DialParameters, but is
  285. // specific to fronted HTTP. It should be used for all fronted HTTP dials,
  286. // apart from the tunnel DialParameters cases.
  287. type frontedHTTPDialParameters struct {
  288. isReplay bool `json:"-"`
  289. LastUsedTimestamp time.Time
  290. LastUsedFrontingSpecHash []byte
  291. FrontedMeekDialParameters *FrontedMeekDialParameters
  292. }
  293. // makeFrontedHTTPDialParameters creates a new frontedHTTPDialParameters for
  294. // configuring a fronted HTTP client, including selecting a fronting transport
  295. // and all the various protocol attributes.
  296. //
  297. // payloadSecure must only be set if all HTTP plaintext payloads sent through
  298. // the returned net/http.Client will be wrapped in their own transport security
  299. // layer, which permits skipping of server certificate verification.
  300. func makeFrontedHTTPDialParameters(
  301. config *Config,
  302. p parameters.ParametersAccessor,
  303. tunnel *Tunnel,
  304. frontingSpec *parameters.FrontingSpec,
  305. selectedFrontingProviderID func(string),
  306. useDeviceBinder,
  307. skipVerify,
  308. disableSystemRootCAs,
  309. payloadSecure bool) (*frontedHTTPDialParameters, error) {
  310. currentTimestamp := time.Now()
  311. dialParams := &frontedHTTPDialParameters{
  312. LastUsedTimestamp: currentTimestamp,
  313. LastUsedFrontingSpecHash: hashFrontingSpec(frontingSpec),
  314. }
  315. var err error
  316. dialParams.FrontedMeekDialParameters, err = makeFrontedMeekDialParameters(
  317. config,
  318. p,
  319. tunnel,
  320. parameters.FrontingSpecs{frontingSpec},
  321. selectedFrontingProviderID,
  322. useDeviceBinder,
  323. skipVerify,
  324. disableSystemRootCAs,
  325. payloadSecure,
  326. )
  327. if err != nil {
  328. return nil, errors.Trace(err)
  329. }
  330. // Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
  331. err = dialParams.prepareDialConfigs(
  332. config,
  333. p,
  334. false,
  335. tunnel,
  336. skipVerify,
  337. disableSystemRootCAs,
  338. useDeviceBinder,
  339. payloadSecure)
  340. if err != nil {
  341. return nil, errors.Trace(err)
  342. }
  343. return dialParams, nil
  344. }
  345. // prepareDialConfigs is called for both new and replayed dial parameters.
  346. func (dialParams *frontedHTTPDialParameters) prepareDialConfigs(
  347. config *Config,
  348. p parameters.ParametersAccessor,
  349. isReplay bool,
  350. tunnel *Tunnel,
  351. useDeviceBinder,
  352. skipVerify,
  353. disableSystemRootCAs,
  354. payloadSecure bool) error {
  355. dialParams.isReplay = isReplay
  356. if isReplay {
  357. // Initialize Dial/MeekConfigs to be passed to the corresponding dialers.
  358. err := dialParams.FrontedMeekDialParameters.prepareDialConfigs(
  359. config, p, tunnel, nil, useDeviceBinder, skipVerify,
  360. disableSystemRootCAs, payloadSecure)
  361. if err != nil {
  362. return errors.Trace(err)
  363. }
  364. }
  365. return nil
  366. }
  367. // GetMetrics implements the common.MetricsSource interface and returns log
  368. // fields detailing the fronted HTTP dial parameters.
  369. func (dialParams *frontedHTTPDialParameters) GetMetrics() common.LogFields {
  370. logFields := dialParams.FrontedMeekDialParameters.GetMetrics("")
  371. isReplay := "0"
  372. if dialParams.isReplay {
  373. isReplay = "1"
  374. }
  375. logFields["is_replay"] = isReplay
  376. return logFields
  377. }