frontedHTTPClientInstance.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package psiphon
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  12. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  13. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  14. "github.com/cespare/xxhash"
  15. )
  16. // frontedHTTPClientInstance contains the fronted HTTP dial parameters required
  17. // to create a net/http.Client, which is configured to use domain fronting.
  18. // frontedHTTPClientInstance implements HTTP client dial replay.
  19. type frontedHTTPClientInstance struct {
  20. frontedHTTPDialParameters *frontedHTTPDialParameters
  21. networkID string
  22. replayEnabled bool
  23. replayRetainFailedProbability float64
  24. replayUpdateFrequency time.Duration
  25. mutex sync.Mutex
  26. lastStoreReplay time.Time
  27. }
  28. // newFrontedHTTPClientInstance creates a new frontedHTTPClientInstance.
  29. // newFrontedHTTPClientInstance does not perform any network operations; the
  30. // new frontedHTTPClientInstance is initialized when used for a round
  31. // trip.
  32. func newFrontedHTTPClientInstance(
  33. config *Config,
  34. tunnel *Tunnel,
  35. frontingSpecs parameters.FrontingSpecs,
  36. selectedFrontingProviderID func(string),
  37. useDeviceBinder,
  38. skipVerify,
  39. disableSystemRootCAs,
  40. payloadSecure bool,
  41. ) (*frontedHTTPClientInstance, error) {
  42. // This function duplicates some code from NewInproxyBrokerClientInstance.
  43. //
  44. // TODO: merge common functionality?
  45. p := config.GetParameters().Get()
  46. defer p.Close()
  47. // Shuffle fronting specs, for random load balancing. Fronting specs with
  48. // available dial parameter replay data are preferred.
  49. permutedIndexes := prng.Perm(len(frontingSpecs))
  50. shuffledFrontingSpecs := make(parameters.FrontingSpecs, len(frontingSpecs))
  51. for i, index := range permutedIndexes {
  52. shuffledFrontingSpecs[i] = frontingSpecs[index]
  53. }
  54. frontingSpecs = shuffledFrontingSpecs
  55. // Replay fronted HTTP dial parameters.
  56. var spec *parameters.FrontingSpec
  57. var dialParams *frontedHTTPDialParameters
  58. // Replay is disabled when the TTL, TransferURLReplayDialParametersTTL,
  59. // is 0.
  60. now := time.Now()
  61. ttl := p.Duration(parameters.TransferURLReplayDialParametersTTL)
  62. networkID := config.GetNetworkID()
  63. // Replay is disabled if there is an active tunnel.
  64. replayEnabled := tunnel == nil &&
  65. ttl > 0 &&
  66. !config.DisableReplay &&
  67. prng.FlipWeightedCoin(p.Float(parameters.TransferURLReplayDialParametersProbability))
  68. if replayEnabled {
  69. selectFirstCandidate := false
  70. var err error
  71. spec, dialParams, err =
  72. SelectCandidateWithNetworkReplayParameters[parameters.FrontingSpec, frontedHTTPDialParameters](
  73. networkID,
  74. selectFirstCandidate,
  75. frontingSpecs,
  76. func(spec *parameters.FrontingSpec) string { return spec.FrontingProviderID },
  77. func(spec *parameters.FrontingSpec, dialParams *frontedHTTPDialParameters) bool {
  78. // Replay the successful fronting spec, if present, by
  79. // comparing its hash with that of the candidate.
  80. return dialParams.LastUsedTimestamp.After(now.Add(-ttl)) &&
  81. bytes.Equal(dialParams.LastUsedFrontingSpecHash, hashFrontingSpec(spec))
  82. })
  83. if err != nil {
  84. NoticeWarning("SelectCandidateWithNetworkReplayParameters failed: %v", errors.Trace(err))
  85. // Continue without replay
  86. }
  87. }
  88. // Select the first fronting spec in the shuffle when replay is not enabled
  89. // or in case SelectCandidateWithNetworkReplayParameters fails.
  90. if spec == nil {
  91. spec = frontingSpecs[prng.Intn(len(frontingSpecs)-1)]
  92. }
  93. // Generate new fronted HTTP dial parameters if not replaying. Later,
  94. // isReplay is used to report the replay metric.
  95. isReplay := dialParams != nil
  96. if !isReplay {
  97. var err error
  98. dialParams, err = makeFrontedHTTPDialParameters(
  99. config,
  100. p,
  101. tunnel,
  102. spec,
  103. selectedFrontingProviderID,
  104. useDeviceBinder,
  105. skipVerify,
  106. disableSystemRootCAs,
  107. payloadSecure)
  108. if err != nil {
  109. return nil, errors.Trace(err)
  110. }
  111. } else {
  112. err := dialParams.prepareDialConfig(
  113. config,
  114. p,
  115. isReplay,
  116. tunnel,
  117. useDeviceBinder,
  118. skipVerify,
  119. disableSystemRootCAs,
  120. payloadSecure)
  121. if err != nil {
  122. return nil, errors.Trace(err)
  123. }
  124. }
  125. return &frontedHTTPClientInstance{
  126. networkID: networkID,
  127. frontedHTTPDialParameters: dialParams,
  128. replayEnabled: replayEnabled,
  129. replayRetainFailedProbability: p.Float(parameters.TransferURLReplayRetainFailedProbability),
  130. replayUpdateFrequency: p.Duration(parameters.TransferURLReplayUpdateFrequency),
  131. }, nil
  132. }
  133. // RoundTrip implements the http.RoundTripper interface. RoundTrip makes a
  134. // domain fronted request to the meek server.
  135. //
  136. // Resources are cleaned up when the response body is closed.
  137. func (f *frontedHTTPClientInstance) RoundTrip(request *http.Request) (*http.Response, error) {
  138. // This function duplicates some code from InproxyBrokerRoundTripper.RoundTrip,
  139. // which has a more thorough implementation.
  140. //
  141. // TODO: merge implementations or common functionality?
  142. // Use MeekConn to domain front requests.
  143. conn, err := DialMeek(
  144. request.Context(),
  145. f.frontedHTTPDialParameters.FrontedMeekDialParameters.meekConfig,
  146. f.frontedHTTPDialParameters.FrontedMeekDialParameters.dialConfig)
  147. if err != nil {
  148. if request.Context().Err() != context.Canceled {
  149. // DialMeek performs an initial TLS handshake. Clear replay
  150. // parameters on error, excluding a cancelled context as
  151. // happens on shutdown.
  152. f.frontedHTTPClientRoundTripperFailed()
  153. }
  154. return nil, errors.Trace(err)
  155. }
  156. response, err := conn.RoundTrip(request)
  157. if err != nil {
  158. if request.Context().Err() != context.Canceled {
  159. // Clear replay parameters on other round trip errors, including
  160. // TLS failures and client-side timeouts, but excluding a cancelled
  161. // context as happens on shutdown.
  162. f.frontedHTTPClientRoundTripperFailed()
  163. }
  164. return nil, errors.Trace(err)
  165. }
  166. // Do not read the response body into memory all at once because it may
  167. // be large. Instead allow the caller to stream the response.
  168. body := newMeekHTTPResponseReadCloser(conn, response.Body)
  169. // Clear replay parameters if there are any errors while reading from the
  170. // response body.
  171. response.Body = newFrontedHTTPClientResponseReadCloser(f, body)
  172. if response.StatusCode == http.StatusOK {
  173. f.frontedHTTPClientRoundTripperSucceeded()
  174. } else {
  175. // TODO: do not clear replay parameters on temporary round tripper
  176. // failures, see InproxyBrokerRoundTripper.RoundTrip.
  177. f.frontedHTTPClientRoundTripperFailed()
  178. }
  179. return response, nil
  180. }
  181. // meekHTTPResponseReadCloser wraps an http.Response.Body received over a
  182. // frontedHTTPClientInstance in RoundTrip and exposes an io.ReadCloser.
  183. // Replay parameters are cleared if there are any errors while reading from
  184. // the response body.
  185. type frontedHTTPClientResponseReadCloser struct {
  186. client *frontedHTTPClientInstance
  187. responseBody io.ReadCloser
  188. }
  189. // newFrontedHTTPClientResponseReadCloser creates a frontedHTTPClientResponseReadCloser.
  190. func newFrontedHTTPClientResponseReadCloser(
  191. client *frontedHTTPClientInstance,
  192. responseBody io.ReadCloser) *frontedHTTPClientResponseReadCloser {
  193. return &frontedHTTPClientResponseReadCloser{
  194. client: client,
  195. responseBody: responseBody,
  196. }
  197. }
  198. // Read implements the io.Reader interface.
  199. func (f *frontedHTTPClientResponseReadCloser) Read(p []byte) (n int, err error) {
  200. n, err = f.responseBody.Read(p)
  201. if err != nil {
  202. f.client.frontedHTTPClientRoundTripperFailed()
  203. }
  204. return n, err
  205. }
  206. // Read implements the io.Closer interface.
  207. func (f *frontedHTTPClientResponseReadCloser) Close() error {
  208. return f.responseBody.Close()
  209. }
  210. // frontedHTTPClientRoundTripperSucceeded stores the current dial parameters
  211. // for replay.
  212. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperSucceeded() {
  213. // Note: duplicates code in BrokerClientRoundTripperSucceeded.
  214. f.mutex.Lock()
  215. defer f.mutex.Unlock()
  216. now := time.Now()
  217. if f.replayEnabled && now.Sub(f.lastStoreReplay) > f.replayUpdateFrequency {
  218. f.frontedHTTPDialParameters.LastUsedTimestamp = time.Now()
  219. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  220. err := SetNetworkReplayParameters[frontedHTTPDialParameters](
  221. f.networkID, replayID, f.frontedHTTPDialParameters)
  222. if err != nil {
  223. NoticeWarning("StoreFrontedHTTPDialParameters failed: %v", errors.Trace(err))
  224. // Continue without persisting replay changes.
  225. } else {
  226. f.lastStoreReplay = now
  227. }
  228. }
  229. }
  230. // frontedHTTPClientRoundTripperFailed clears replay parameters.
  231. func (f *frontedHTTPClientInstance) frontedHTTPClientRoundTripperFailed() {
  232. // Note: duplicates code in BrokerClientRoundTripperFailed.
  233. f.mutex.Lock()
  234. defer f.mutex.Unlock()
  235. // Delete any persistent replay dial parameters. Unlike with the success
  236. // case, consecutive, repeated deletes shouldn't write to storage, so
  237. // they are not avoided.
  238. if f.replayEnabled &&
  239. !prng.FlipWeightedCoin(f.replayRetainFailedProbability) {
  240. // Limitation: there's a race condition with multiple
  241. // frontedHTTPClientInstances writing to the replay datastore, such as
  242. // in the case where there's a feedback upload running concurrently
  243. // with a server list download; this delete could potentially clobber a
  244. // concurrent fresh replay store after a success.
  245. //
  246. // TODO: add an additional storage key distinguisher for each instance?
  247. replayID := f.frontedHTTPDialParameters.FrontedMeekDialParameters.FrontingProviderID
  248. err := DeleteNetworkReplayParameters[frontedHTTPDialParameters](
  249. f.networkID, replayID)
  250. if err != nil {
  251. NoticeWarning("DeleteFrontedHTTPDialParameters failed: %v", errors.Trace(err))
  252. // Continue without resetting replay.
  253. }
  254. }
  255. }
  256. // hashFrontingSpec hashes the fronting spec. The hash is used to detect when
  257. // fronting spec tactics have changed.
  258. func hashFrontingSpec(spec *parameters.FrontingSpec) []byte {
  259. var hash [8]byte
  260. binary.BigEndian.PutUint64(
  261. hash[:],
  262. uint64(xxhash.Sum64String(fmt.Sprintf("%+v", spec))))
  263. return hash[:]
  264. }