net.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // for HTTPSServer.ServeTLS:
  20. /*
  21. Copyright (c) 2012 The Go Authors. All rights reserved.
  22. Redistribution and use in source and binary forms, with or without
  23. modification, are permitted provided that the following conditions are
  24. met:
  25. * Redistributions of source code must retain the above copyright
  26. notice, this list of conditions and the following disclaimer.
  27. * Redistributions in binary form must reproduce the above
  28. copyright notice, this list of conditions and the following disclaimer
  29. in the documentation and/or other materials provided with the
  30. distribution.
  31. * Neither the name of Google Inc. nor the names of its
  32. contributors may be used to endorse or promote products derived from
  33. this software without specific prior written permission.
  34. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  35. "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  36. LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  37. A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  38. OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  39. SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  40. LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  41. DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  42. THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  43. (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  44. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  45. */
  46. package psiphon
  47. import (
  48. "container/list"
  49. "crypto/tls"
  50. "crypto/x509"
  51. "errors"
  52. "fmt"
  53. "io"
  54. "io/ioutil"
  55. "net"
  56. "net/http"
  57. "net/url"
  58. "os"
  59. "reflect"
  60. "sync"
  61. "sync/atomic"
  62. "time"
  63. "github.com/Psiphon-Inc/dns"
  64. "github.com/Psiphon-Inc/ratelimit"
  65. )
  66. const DNS_PORT = 53
  67. // DialConfig contains parameters to determine the behavior
  68. // of a Psiphon dialer (TCPDial, MeekDial, etc.)
  69. type DialConfig struct {
  70. // UpstreamProxyUrl specifies a proxy to connect through.
  71. // E.g., "http://proxyhost:8080"
  72. // "socks5://user:password@proxyhost:1080"
  73. // "socks4a://proxyhost:1080"
  74. // "http://NTDOMAIN\NTUser:password@proxyhost:3375"
  75. //
  76. // Certain tunnel protocols require HTTP CONNECT support
  77. // when a HTTP proxy is specified. If CONNECT is not
  78. // supported, those protocols will not connect.
  79. UpstreamProxyUrl string
  80. // UpstreamProxyCustomHeader is a set of additional arbitrary HTTP headers that are
  81. // added to all HTTP requests made through the upstream proxy specified by UpstreamProxyUrl
  82. // in case of HTTP proxy
  83. UpstreamProxyCustomHeaders http.Header
  84. ConnectTimeout time.Duration
  85. // PendingConns is used to track and interrupt dials in progress.
  86. // Dials may be interrupted using PendingConns.CloseAll(). Once instantiated,
  87. // a conn is added to pendingConns before the network connect begins and
  88. // removed from pendingConns once the connect succeeds or fails.
  89. // May be nil.
  90. PendingConns *Conns
  91. // BindToDevice parameters are used to exclude connections and
  92. // associated DNS requests from VPN routing.
  93. // When DeviceBinder is set, any underlying socket is
  94. // submitted to the device binding servicebefore connecting.
  95. // The service should bind the socket to a device so that it doesn't route
  96. // through a VPN interface. This service is also used to bind UDP sockets used
  97. // for DNS requests, in which case DnsServerGetter is used to get the
  98. // current active untunneled network DNS server.
  99. DeviceBinder DeviceBinder
  100. DnsServerGetter DnsServerGetter
  101. // UseIndistinguishableTLS specifies whether to try to use an
  102. // alternative stack for TLS. From a circumvention perspective,
  103. // Go's TLS has a distinct fingerprint that may be used for blocking.
  104. // Only applies to TLS connections.
  105. UseIndistinguishableTLS bool
  106. // TrustedCACertificatesFilename specifies a file containing trusted
  107. // CA certs. The file contents should be compatible with OpenSSL's
  108. // SSL_CTX_load_verify_locations.
  109. // Only applies to UseIndistinguishableTLS connections.
  110. TrustedCACertificatesFilename string
  111. // DeviceRegion is the reported region the host device is running in.
  112. // When set, this value may be used, pre-connection, to select performance
  113. // or circumvention optimization strategies for the given region.
  114. DeviceRegion string
  115. // ResolvedIPCallback, when set, is called with the IP address that was
  116. // dialed. This is either the specified IP address in the dial address,
  117. // or the resolved IP address in the case where the dial address is a
  118. // domain name.
  119. // The callback may be invoked by a concurrent goroutine.
  120. ResolvedIPCallback func(string)
  121. }
  122. // NetworkConnectivityChecker defines the interface to the external
  123. // HasNetworkConnectivity provider
  124. type NetworkConnectivityChecker interface {
  125. // TODO: change to bool return value once gobind supports that type
  126. HasNetworkConnectivity() int
  127. }
  128. // DeviceBinder defines the interface to the external BindToDevice provider
  129. type DeviceBinder interface {
  130. BindToDevice(fileDescriptor int) error
  131. }
  132. // DnsServerGetter defines the interface to the external GetDnsServer provider
  133. type DnsServerGetter interface {
  134. GetPrimaryDnsServer() string
  135. GetSecondaryDnsServer() string
  136. }
  137. // HostNameTransformer defines the interface for pluggable hostname
  138. // transformation circumvention strategies.
  139. type HostNameTransformer interface {
  140. TransformHostName(hostname string) (string, bool)
  141. }
  142. // IdentityHostNameTransformer is the default HostNameTransformer, which
  143. // returns the hostname unchanged.
  144. type IdentityHostNameTransformer struct{}
  145. func (IdentityHostNameTransformer) TransformHostName(hostname string) (string, bool) {
  146. return hostname, false
  147. }
  148. // TimeoutError implements the error interface
  149. type TimeoutError struct{}
  150. func (TimeoutError) Error() string { return "timed out" }
  151. func (TimeoutError) Timeout() bool { return true }
  152. func (TimeoutError) Temporary() bool { return true }
  153. // Dialer is a custom dialer compatible with http.Transport.Dial.
  154. type Dialer func(string, string) (net.Conn, error)
  155. // Conns is a synchronized list of Conns that is used to coordinate
  156. // interrupting a set of goroutines establishing connections, or
  157. // close a set of open connections, etc.
  158. // Once the list is closed, no more items may be added to the
  159. // list (unless it is reset).
  160. type Conns struct {
  161. mutex sync.Mutex
  162. isClosed bool
  163. conns map[net.Conn]bool
  164. }
  165. func (conns *Conns) Reset() {
  166. conns.mutex.Lock()
  167. defer conns.mutex.Unlock()
  168. conns.isClosed = false
  169. conns.conns = make(map[net.Conn]bool)
  170. }
  171. func (conns *Conns) Add(conn net.Conn) bool {
  172. conns.mutex.Lock()
  173. defer conns.mutex.Unlock()
  174. if conns.isClosed {
  175. return false
  176. }
  177. if conns.conns == nil {
  178. conns.conns = make(map[net.Conn]bool)
  179. }
  180. conns.conns[conn] = true
  181. return true
  182. }
  183. func (conns *Conns) Remove(conn net.Conn) {
  184. conns.mutex.Lock()
  185. defer conns.mutex.Unlock()
  186. delete(conns.conns, conn)
  187. }
  188. func (conns *Conns) CloseAll() {
  189. conns.mutex.Lock()
  190. defer conns.mutex.Unlock()
  191. conns.isClosed = true
  192. for conn, _ := range conns.conns {
  193. conn.Close()
  194. }
  195. conns.conns = make(map[net.Conn]bool)
  196. }
  197. // LRUConns is a concurrency-safe list of net.Conns ordered
  198. // by recent activity. Its purpose is to facilitate closing
  199. // the oldest connection in a set of connections.
  200. //
  201. // New connections added are referenced by a LRUConnsEntry,
  202. // which is used to Touch() active connections, which
  203. // promotes them to the front of the order and to Remove()
  204. // connections that are no longer LRU candidates.
  205. //
  206. // CloseOldest() will remove the oldest connection from the
  207. // list and call net.Conn.Close() on the connection.
  208. //
  209. // After an entry has been removed, LRUConnsEntry Touch()
  210. // and Remove() will have no effect.
  211. type LRUConns struct {
  212. mutex sync.Mutex
  213. list *list.List
  214. }
  215. // NewLRUConns initializes a new LRUConns.
  216. func NewLRUConns() *LRUConns {
  217. return &LRUConns{list: list.New()}
  218. }
  219. // Add inserts a net.Conn as the freshest connection
  220. // in a LRUConns and returns an LRUConnsEntry to be
  221. // used to freshen the connection or remove the connection
  222. // from the LRU list.
  223. func (conns *LRUConns) Add(conn net.Conn) *LRUConnsEntry {
  224. conns.mutex.Lock()
  225. defer conns.mutex.Unlock()
  226. return &LRUConnsEntry{
  227. lruConns: conns,
  228. element: conns.list.PushFront(conn),
  229. }
  230. }
  231. // CloseOldest closes the oldest connection in a
  232. // LRUConns. It calls net.Conn.Close() on the
  233. // connection.
  234. func (conns *LRUConns) CloseOldest() {
  235. conns.mutex.Lock()
  236. oldest := conns.list.Back()
  237. conn, ok := oldest.Value.(net.Conn)
  238. if oldest != nil {
  239. conns.list.Remove(oldest)
  240. }
  241. // Release mutex before closing conn
  242. conns.mutex.Unlock()
  243. if ok {
  244. conn.Close()
  245. }
  246. }
  247. // LRUConnsEntry is an entry in a LRUConns list.
  248. type LRUConnsEntry struct {
  249. lruConns *LRUConns
  250. element *list.Element
  251. }
  252. // Remove deletes the connection referenced by the
  253. // LRUConnsEntry from the associated LRUConns.
  254. // Has no effect if the entry was not initialized
  255. // or previously removed.
  256. func (entry *LRUConnsEntry) Remove() {
  257. if entry.lruConns == nil || entry.element == nil {
  258. return
  259. }
  260. entry.lruConns.mutex.Lock()
  261. defer entry.lruConns.mutex.Unlock()
  262. entry.lruConns.list.Remove(entry.element)
  263. }
  264. // Touch promotes the connection referenced by the
  265. // LRUConnsEntry to the front of the associated LRUConns.
  266. // Has no effect if the entry was not initialized
  267. // or previously removed.
  268. func (entry *LRUConnsEntry) Touch() {
  269. if entry.lruConns == nil || entry.element == nil {
  270. return
  271. }
  272. entry.lruConns.mutex.Lock()
  273. defer entry.lruConns.mutex.Unlock()
  274. entry.lruConns.list.MoveToFront(entry.element)
  275. }
  276. // LocalProxyRelay sends to remoteConn bytes received from localConn,
  277. // and sends to localConn bytes received from remoteConn.
  278. func LocalProxyRelay(proxyType string, localConn, remoteConn net.Conn) {
  279. copyWaitGroup := new(sync.WaitGroup)
  280. copyWaitGroup.Add(1)
  281. go func() {
  282. defer copyWaitGroup.Done()
  283. _, err := io.Copy(localConn, remoteConn)
  284. if err != nil {
  285. err = fmt.Errorf("Relay failed: %s", ContextError(err))
  286. NoticeLocalProxyError(proxyType, err)
  287. }
  288. }()
  289. _, err := io.Copy(remoteConn, localConn)
  290. if err != nil {
  291. err = fmt.Errorf("Relay failed: %s", ContextError(err))
  292. NoticeLocalProxyError(proxyType, err)
  293. }
  294. copyWaitGroup.Wait()
  295. }
  296. // WaitForNetworkConnectivity uses a NetworkConnectivityChecker to
  297. // periodically check for network connectivity. It returns true if
  298. // no NetworkConnectivityChecker is provided (waiting is disabled)
  299. // or when NetworkConnectivityChecker.HasNetworkConnectivity()
  300. // indicates connectivity. It waits and polls the checker once a second.
  301. // If any stop is broadcast, false is returned immediately.
  302. func WaitForNetworkConnectivity(
  303. connectivityChecker NetworkConnectivityChecker, stopBroadcasts ...<-chan struct{}) bool {
  304. if connectivityChecker == nil || 1 == connectivityChecker.HasNetworkConnectivity() {
  305. return true
  306. }
  307. NoticeInfo("waiting for network connectivity")
  308. ticker := time.NewTicker(1 * time.Second)
  309. for {
  310. if 1 == connectivityChecker.HasNetworkConnectivity() {
  311. return true
  312. }
  313. selectCases := make([]reflect.SelectCase, 1+len(stopBroadcasts))
  314. selectCases[0] = reflect.SelectCase{
  315. Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ticker.C)}
  316. for i, stopBroadcast := range stopBroadcasts {
  317. selectCases[i+1] = reflect.SelectCase{
  318. Dir: reflect.SelectRecv, Chan: reflect.ValueOf(stopBroadcast)}
  319. }
  320. chosen, _, ok := reflect.Select(selectCases)
  321. if chosen == 0 && ok {
  322. // Ticker case, so check again
  323. } else {
  324. // Stop case
  325. return false
  326. }
  327. }
  328. }
  329. // ResolveIP uses a custom dns stack to make a DNS query over the
  330. // given TCP or UDP conn. This is used, e.g., when we need to ensure
  331. // that a DNS connection bypasses a VPN interface (BindToDevice) or
  332. // when we need to ensure that a DNS connection is tunneled.
  333. // Caller must set timeouts or interruptibility as required for conn.
  334. func ResolveIP(host string, conn net.Conn) (addrs []net.IP, ttls []time.Duration, err error) {
  335. // Send the DNS query
  336. dnsConn := &dns.Conn{Conn: conn}
  337. defer dnsConn.Close()
  338. query := new(dns.Msg)
  339. query.SetQuestion(dns.Fqdn(host), dns.TypeA)
  340. query.RecursionDesired = true
  341. dnsConn.WriteMsg(query)
  342. // Process the response
  343. response, err := dnsConn.ReadMsg()
  344. if err != nil {
  345. return nil, nil, ContextError(err)
  346. }
  347. addrs = make([]net.IP, 0)
  348. ttls = make([]time.Duration, 0)
  349. for _, answer := range response.Answer {
  350. if a, ok := answer.(*dns.A); ok {
  351. addrs = append(addrs, a.A)
  352. ttl := time.Duration(a.Hdr.Ttl) * time.Second
  353. ttls = append(ttls, ttl)
  354. }
  355. }
  356. return addrs, ttls, nil
  357. }
  358. // MakeUntunneledHttpsClient returns a net/http.Client which is
  359. // configured to use custom dialing features -- including BindToDevice,
  360. // UseIndistinguishableTLS, etc. -- for a specific HTTPS request URL.
  361. // If verifyLegacyCertificate is not nil, it's used for certificate
  362. // verification.
  363. // Because UseIndistinguishableTLS requires a hack to work with
  364. // net/http, MakeUntunneledHttpClient may return a modified request URL
  365. // to be used. Callers should always use this return value to make
  366. // requests, not the input value.
  367. func MakeUntunneledHttpsClient(
  368. dialConfig *DialConfig,
  369. verifyLegacyCertificate *x509.Certificate,
  370. requestUrl string,
  371. requestTimeout time.Duration) (*http.Client, string, error) {
  372. // Change the scheme to "http"; otherwise http.Transport will try to do
  373. // another TLS handshake inside the explicit TLS session. Also need to
  374. // force an explicit port, as the default for "http", 80, won't talk TLS.
  375. urlComponents, err := url.Parse(requestUrl)
  376. if err != nil {
  377. return nil, "", ContextError(err)
  378. }
  379. urlComponents.Scheme = "http"
  380. host, port, err := net.SplitHostPort(urlComponents.Host)
  381. if err != nil {
  382. // Assume there's no port
  383. host = urlComponents.Host
  384. port = ""
  385. }
  386. if port == "" {
  387. port = "443"
  388. }
  389. urlComponents.Host = net.JoinHostPort(host, port)
  390. // Note: IndistinguishableTLS mode doesn't support VerifyLegacyCertificate
  391. useIndistinguishableTLS := dialConfig.UseIndistinguishableTLS && verifyLegacyCertificate == nil
  392. dialer := NewCustomTLSDialer(
  393. // Note: when verifyLegacyCertificate is not nil, some
  394. // of the other CustomTLSConfig is overridden.
  395. &CustomTLSConfig{
  396. Dial: NewTCPDialer(dialConfig),
  397. VerifyLegacyCertificate: verifyLegacyCertificate,
  398. SNIServerName: host,
  399. SkipVerify: false,
  400. UseIndistinguishableTLS: useIndistinguishableTLS,
  401. TrustedCACertificatesFilename: dialConfig.TrustedCACertificatesFilename,
  402. })
  403. transport := &http.Transport{
  404. Dial: dialer,
  405. }
  406. httpClient := &http.Client{
  407. Timeout: requestTimeout,
  408. Transport: transport,
  409. }
  410. return httpClient, urlComponents.String(), nil
  411. }
  412. // MakeTunneledHttpClient returns a net/http.Client which is
  413. // configured to use custom dialing features including tunneled
  414. // dialing and, optionally, UseTrustedCACertificatesForStockTLS.
  415. // Unlike MakeUntunneledHttpsClient and makePsiphonHttpsClient,
  416. // This http.Client uses stock TLS and no scheme transformation
  417. // hack is required.
  418. func MakeTunneledHttpClient(
  419. config *Config,
  420. tunnel *Tunnel,
  421. requestTimeout time.Duration) (*http.Client, error) {
  422. tunneledDialer := func(_, addr string) (conn net.Conn, err error) {
  423. return tunnel.sshClient.Dial("tcp", addr)
  424. }
  425. transport := &http.Transport{
  426. Dial: tunneledDialer,
  427. }
  428. if config.UseTrustedCACertificatesForStockTLS {
  429. if config.TrustedCACertificatesFilename == "" {
  430. return nil, ContextError(errors.New(
  431. "UseTrustedCACertificatesForStockTLS requires TrustedCACertificatesFilename"))
  432. }
  433. rootCAs := x509.NewCertPool()
  434. certData, err := ioutil.ReadFile(config.TrustedCACertificatesFilename)
  435. if err != nil {
  436. return nil, ContextError(err)
  437. }
  438. rootCAs.AppendCertsFromPEM(certData)
  439. transport.TLSClientConfig = &tls.Config{RootCAs: rootCAs}
  440. }
  441. return &http.Client{
  442. Transport: transport,
  443. Timeout: requestTimeout,
  444. }, nil
  445. }
  446. // MakeDownloadHttpClient is a resusable helper that sets up a
  447. // http.Client for use either untunneled or through a tunnel.
  448. // See MakeUntunneledHttpsClient for a note about request URL
  449. // rewritting.
  450. func MakeDownloadHttpClient(
  451. config *Config,
  452. tunnel *Tunnel,
  453. untunneledDialConfig *DialConfig,
  454. requestUrl string,
  455. requestTimeout time.Duration) (*http.Client, string, error) {
  456. var httpClient *http.Client
  457. var err error
  458. if tunnel != nil {
  459. httpClient, err = MakeTunneledHttpClient(config, tunnel, requestTimeout)
  460. if err != nil {
  461. return nil, "", ContextError(err)
  462. }
  463. } else {
  464. httpClient, requestUrl, err = MakeUntunneledHttpsClient(
  465. untunneledDialConfig, nil, requestUrl, requestTimeout)
  466. if err != nil {
  467. return nil, "", ContextError(err)
  468. }
  469. }
  470. return httpClient, requestUrl, nil
  471. }
  472. // ResumeDownload is a resuable helper that downloads requestUrl via the
  473. // httpClient, storing the result in downloadFilename when the download is
  474. // complete. Intermediate, partial downloads state is stored in
  475. // downloadFilename.part and downloadFilename.part.etag.
  476. // Any existing downloadFilename file will be overwritten.
  477. //
  478. // In the case where the remote object has change while a partial download
  479. // is to be resumed, the partial state is reset and resumeDownload fails.
  480. // The caller must restart the download.
  481. //
  482. // When ifNoneMatchETag is specified, no download is made if the remote
  483. // object has the same ETag. ifNoneMatchETag has an effect only when no
  484. // partial download is in progress.
  485. //
  486. func ResumeDownload(
  487. httpClient *http.Client,
  488. requestUrl string,
  489. downloadFilename string,
  490. ifNoneMatchETag string) (int64, string, error) {
  491. partialFilename := fmt.Sprintf("%s.part", downloadFilename)
  492. partialETagFilename := fmt.Sprintf("%s.part.etag", downloadFilename)
  493. file, err := os.OpenFile(partialFilename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
  494. if err != nil {
  495. return 0, "", ContextError(err)
  496. }
  497. defer file.Close()
  498. fileInfo, err := file.Stat()
  499. if err != nil {
  500. return 0, "", ContextError(err)
  501. }
  502. // A partial download should have an ETag which is to be sent with the
  503. // Range request to ensure that the source object is the same as the
  504. // one that is partially downloaded.
  505. var partialETag []byte
  506. if fileInfo.Size() > 0 {
  507. partialETag, err = ioutil.ReadFile(partialETagFilename)
  508. // When the ETag can't be loaded, delete the partial download. To keep the
  509. // code simple, there is no immediate, inline retry here, on the assumption
  510. // that the controller's upgradeDownloader will shortly call DownloadUpgrade
  511. // again.
  512. if err != nil {
  513. os.Remove(partialFilename)
  514. os.Remove(partialETagFilename)
  515. return 0, "", ContextError(
  516. fmt.Errorf("failed to load partial download ETag: %s", err))
  517. }
  518. }
  519. request, err := http.NewRequest("GET", requestUrl, nil)
  520. if err != nil {
  521. return 0, "", ContextError(err)
  522. }
  523. request.Header.Add("Range", fmt.Sprintf("bytes=%d-", fileInfo.Size()))
  524. if partialETag != nil {
  525. // Note: not using If-Range, since not all host servers support it.
  526. // Using If-Match means we need to check for status code 412 and reset
  527. // when the ETag has changed since the last partial download.
  528. request.Header.Add("If-Match", string(partialETag))
  529. } else if ifNoneMatchETag != "" {
  530. // Can't specify both If-Match and If-None-Match. Behavior is undefined.
  531. // https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.26
  532. // So for downloaders that store an ETag and wish to use that to prevent
  533. // redundant downloads, that ETag is sent as If-None-Match in the case
  534. // where a partial download is not in progress. When a partial download
  535. // is in progress, the partial ETag is sent as If-Match: either that's
  536. // a version that was never fully received, or it's no longer current in
  537. // which case the response will be StatusPreconditionFailed, the partial
  538. // download will be discarded, and then the next retry will use
  539. // If-None-Match.
  540. // Note: in this case, fileInfo.Size() == 0
  541. request.Header.Add("If-None-Match", ifNoneMatchETag)
  542. }
  543. response, err := httpClient.Do(request)
  544. // The resumeable download may ask for bytes past the resource range
  545. // since it doesn't store the "completed download" state. In this case,
  546. // the HTTP server returns 416. Otherwise, we expect 206. We may also
  547. // receive 412 on ETag mismatch.
  548. if err == nil &&
  549. (response.StatusCode != http.StatusPartialContent &&
  550. response.StatusCode != http.StatusRequestedRangeNotSatisfiable &&
  551. response.StatusCode != http.StatusPreconditionFailed &&
  552. response.StatusCode != http.StatusNotModified) {
  553. response.Body.Close()
  554. err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
  555. }
  556. if err != nil {
  557. return 0, "", ContextError(err)
  558. }
  559. defer response.Body.Close()
  560. responseETag := response.Header.Get("ETag")
  561. if response.StatusCode == http.StatusPreconditionFailed {
  562. // When the ETag no longer matches, delete the partial download. As above,
  563. // simply failing and relying on the caller's retry schedule.
  564. os.Remove(partialFilename)
  565. os.Remove(partialETagFilename)
  566. return 0, "", ContextError(errors.New("partial download ETag mismatch"))
  567. } else if response.StatusCode == http.StatusNotModified {
  568. // This status code is possible in the "If-None-Match" case. Don't leave
  569. // any partial download in progress. Caller should check that responseETag
  570. // matches ifNoneMatchETag.
  571. os.Remove(partialFilename)
  572. os.Remove(partialETagFilename)
  573. return 0, responseETag, nil
  574. }
  575. // Not making failure to write ETag file fatal, in case the entire download
  576. // succeeds in this one request.
  577. ioutil.WriteFile(partialETagFilename, []byte(responseETag), 0600)
  578. // A partial download occurs when this copy is interrupted. The io.Copy
  579. // will fail, leaving a partial download in place (.part and .part.etag).
  580. n, err := io.Copy(NewSyncFileWriter(file), response.Body)
  581. // From this point, n bytes are indicated as downloaded, even if there is
  582. // an error; the caller may use this to report partial download progress.
  583. if err != nil {
  584. return n, "", ContextError(err)
  585. }
  586. // Ensure the file is flushed to disk. The deferred close
  587. // will be a noop when this succeeds.
  588. err = file.Close()
  589. if err != nil {
  590. return n, "", ContextError(err)
  591. }
  592. // Remove if exists, to enable rename
  593. os.Remove(downloadFilename)
  594. err = os.Rename(partialFilename, downloadFilename)
  595. if err != nil {
  596. return n, "", ContextError(err)
  597. }
  598. os.Remove(partialETagFilename)
  599. return n, responseETag, nil
  600. }
  601. // IPAddressFromAddr is a helper which extracts an IP address
  602. // from a net.Addr or returns "" if there is no IP address.
  603. func IPAddressFromAddr(addr net.Addr) string {
  604. ipAddress := ""
  605. if addr != nil {
  606. host, _, err := net.SplitHostPort(addr.String())
  607. if err == nil {
  608. ipAddress = host
  609. }
  610. }
  611. return ipAddress
  612. }
  613. // HTTPSServer is a wrapper around http.Server which adds the
  614. // ServeTLS function.
  615. type HTTPSServer struct {
  616. http.Server
  617. }
  618. // ServeTLS is a offers the equivalent interface as http.Serve.
  619. // The http package has both ListenAndServe and ListenAndServeTLS higher-
  620. // level interfaces, but only Serve (not TLS) offers a lower-level interface that
  621. // allows the caller to keep a refererence to the Listener, allowing for external
  622. // shutdown. ListenAndServeTLS also requires the TLS cert and key to be in files
  623. // and we avoid that here.
  624. // tcpKeepAliveListener is used in http.ListenAndServeTLS but not exported,
  625. // so we use a copy from https://golang.org/src/net/http/server.go.
  626. func (server *HTTPSServer) ServeTLS(listener net.Listener) error {
  627. tlsListener := tls.NewListener(tcpKeepAliveListener{listener.(*net.TCPListener)}, server.TLSConfig)
  628. return server.Serve(tlsListener)
  629. }
  630. type tcpKeepAliveListener struct {
  631. *net.TCPListener
  632. }
  633. func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
  634. tc, err := ln.AcceptTCP()
  635. if err != nil {
  636. return
  637. }
  638. tc.SetKeepAlive(true)
  639. tc.SetKeepAlivePeriod(3 * time.Minute)
  640. return tc, nil
  641. }
  642. // ActivityMonitoredConn wraps a net.Conn, adding logic to deal with
  643. // events triggered by I/O activity.
  644. //
  645. // When an inactivity timeout is specified, the net.Conn Read() will
  646. // timeout after the specified period of read inactivity. Optionally,
  647. // ActivityMonitoredConn will also consider the connection active when
  648. // data is written to it.
  649. //
  650. // When a LRUConnsEntry is specified, then the LRU entry is promoted on
  651. // either a successful read or write.
  652. //
  653. type ActivityMonitoredConn struct {
  654. net.Conn
  655. inactivityTimeout time.Duration
  656. activeOnWrite bool
  657. startTime int64
  658. lastActivityTime int64
  659. lruEntry *LRUConnsEntry
  660. }
  661. func NewActivityMonitoredConn(
  662. conn net.Conn,
  663. inactivityTimeout time.Duration,
  664. activeOnWrite bool,
  665. lruEntry *LRUConnsEntry) *ActivityMonitoredConn {
  666. if inactivityTimeout > 0 {
  667. conn.SetReadDeadline(time.Now().Add(inactivityTimeout))
  668. }
  669. now := time.Now().UnixNano()
  670. return &ActivityMonitoredConn{
  671. Conn: conn,
  672. inactivityTimeout: inactivityTimeout,
  673. activeOnWrite: activeOnWrite,
  674. startTime: now,
  675. lastActivityTime: now,
  676. lruEntry: lruEntry,
  677. }
  678. }
  679. // GetStartTime gets the time when the ActivityMonitoredConn was
  680. // initialized.
  681. func (conn *ActivityMonitoredConn) GetStartTime() time.Time {
  682. return time.Unix(0, conn.startTime)
  683. }
  684. // GetActiveDuration returns the time elapsed between the initialization
  685. // of the ActivityMonitoredConn and the last Read (or Write when
  686. // activeOnWrite is specified).
  687. func (conn *ActivityMonitoredConn) GetActiveDuration() time.Duration {
  688. return time.Duration(atomic.LoadInt64(&conn.lastActivityTime) - conn.startTime)
  689. }
  690. func (conn *ActivityMonitoredConn) Read(buffer []byte) (int, error) {
  691. n, err := conn.Conn.Read(buffer)
  692. if err == nil {
  693. atomic.StoreInt64(&conn.lastActivityTime, time.Now().UnixNano())
  694. if conn.inactivityTimeout > 0 {
  695. conn.Conn.SetReadDeadline(time.Now().Add(conn.inactivityTimeout))
  696. }
  697. if conn.lruEntry != nil {
  698. conn.lruEntry.Touch()
  699. }
  700. }
  701. return n, err
  702. }
  703. func (conn *ActivityMonitoredConn) Write(buffer []byte) (int, error) {
  704. n, err := conn.Conn.Write(buffer)
  705. if err == nil {
  706. if conn.activeOnWrite {
  707. atomic.StoreInt64(&conn.lastActivityTime, time.Now().UnixNano())
  708. if conn.inactivityTimeout > 0 {
  709. conn.Conn.SetReadDeadline(time.Now().Add(conn.inactivityTimeout))
  710. }
  711. }
  712. if conn.lruEntry != nil {
  713. conn.lruEntry.Touch()
  714. }
  715. }
  716. return n, err
  717. }
  718. // ThrottledConn wraps a net.Conn with read and write rate limiters.
  719. // Rates are specified as bytes per second. Optional unlimited byte
  720. // counts allow for a number of bytes to read or write before
  721. // applying rate limiting. Specify limit values of 0 to set no rate
  722. // limit (unlimited counts are ignored in this case).
  723. // The underlying rate limiter uses the token bucket algorithm to
  724. // calculate delay times for read and write operations.
  725. type ThrottledConn struct {
  726. net.Conn
  727. unlimitedReadBytes int64
  728. limitingReads int32
  729. limitedReader io.Reader
  730. unlimitedWriteBytes int64
  731. limitingWrites int32
  732. limitedWriter io.Writer
  733. }
  734. // NewThrottledConn initializes a new ThrottledConn.
  735. func NewThrottledConn(
  736. conn net.Conn,
  737. unlimitedReadBytes, limitReadBytesPerSecond,
  738. unlimitedWriteBytes, limitWriteBytesPerSecond int64) *ThrottledConn {
  739. // When no limit is specified, the rate limited reader/writer
  740. // is simply the base reader/writer.
  741. var reader io.Reader
  742. if limitReadBytesPerSecond == 0 {
  743. reader = conn
  744. } else {
  745. reader = ratelimit.Reader(conn,
  746. ratelimit.NewBucketWithRate(
  747. float64(limitReadBytesPerSecond), limitReadBytesPerSecond))
  748. }
  749. var writer io.Writer
  750. if limitWriteBytesPerSecond == 0 {
  751. writer = conn
  752. } else {
  753. writer = ratelimit.Writer(conn,
  754. ratelimit.NewBucketWithRate(
  755. float64(limitWriteBytesPerSecond), limitWriteBytesPerSecond))
  756. }
  757. return &ThrottledConn{
  758. Conn: conn,
  759. unlimitedReadBytes: unlimitedReadBytes,
  760. limitingReads: 0,
  761. limitedReader: reader,
  762. unlimitedWriteBytes: unlimitedWriteBytes,
  763. limitingWrites: 0,
  764. limitedWriter: writer,
  765. }
  766. }
  767. func (conn *ThrottledConn) Read(buffer []byte) (int, error) {
  768. // Use the base reader until the unlimited count is exhausted.
  769. if atomic.LoadInt32(&conn.limitingReads) == 0 {
  770. if atomic.AddInt64(&conn.unlimitedReadBytes, -int64(len(buffer))) <= 0 {
  771. atomic.StoreInt32(&conn.limitingReads, 1)
  772. } else {
  773. return conn.Read(buffer)
  774. }
  775. }
  776. return conn.limitedReader.Read(buffer)
  777. }
  778. func (conn *ThrottledConn) Write(buffer []byte) (int, error) {
  779. // Use the base writer until the unlimited count is exhausted.
  780. if atomic.LoadInt32(&conn.limitingWrites) == 0 {
  781. if atomic.AddInt64(&conn.unlimitedWriteBytes, -int64(len(buffer))) <= 0 {
  782. atomic.StoreInt32(&conn.limitingWrites, 1)
  783. } else {
  784. return conn.Write(buffer)
  785. }
  786. }
  787. return conn.limitedWriter.Write(buffer)
  788. }