tunnel.go 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/base64"
  23. "encoding/json"
  24. "errors"
  25. "fmt"
  26. "io"
  27. "net"
  28. "net/url"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "github.com/Psiphon-Inc/crypto/ssh"
  33. "github.com/Psiphon-Inc/goarista/monotime"
  34. regen "github.com/Psiphon-Inc/goregen"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  38. )
  39. // Tunneler specifies the interface required by components that use a tunnel.
  40. // Components which use this interface may be serviced by a single Tunnel instance,
  41. // or a Controller which manages a pool of tunnels, or any other object which
  42. // implements Tunneler.
  43. // alwaysTunnel indicates that the connection should always be tunneled. If this
  44. // is not set, the connection may be made directly, depending on split tunnel
  45. // classification, when that feature is supported and active.
  46. // downstreamConn is an optional parameter which specifies a connection to be
  47. // explictly closed when the Dialed connection is closed. For instance, this
  48. // is used to close downstreamConn App<->LocalProxy connections when the related
  49. // LocalProxy<->SshPortForward connections close.
  50. type Tunneler interface {
  51. Dial(remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error)
  52. SignalComponentFailure()
  53. }
  54. // TunnerOwner specifies the interface required by Tunnel to notify its
  55. // owner when it has failed. The owner may, as in the case of the Controller,
  56. // remove the tunnel from its list of active tunnels.
  57. type TunnelOwner interface {
  58. SignalSeededNewSLOK()
  59. SignalTunnelFailure(tunnel *Tunnel)
  60. }
  61. // Tunnel is a connection to a Psiphon server. An established
  62. // tunnel includes a network connection to the specified server
  63. // and an SSH session built on top of that transport.
  64. type Tunnel struct {
  65. mutex *sync.Mutex
  66. config *Config
  67. untunneledDialConfig *DialConfig
  68. isDiscarded bool
  69. isClosed bool
  70. serverEntry *protocol.ServerEntry
  71. serverContext *ServerContext
  72. protocol string
  73. conn *common.ActivityMonitoredConn
  74. sshClient *ssh.Client
  75. sshServerRequests <-chan *ssh.Request
  76. operateWaitGroup *sync.WaitGroup
  77. shutdownOperateBroadcast chan struct{}
  78. signalPortForwardFailure chan struct{}
  79. totalPortForwardFailures int
  80. establishDuration time.Duration
  81. establishedTime monotime.Time
  82. dialStats *TunnelDialStats
  83. newClientVerificationPayload chan string
  84. }
  85. // TunnelDialStats records additional dial config that is sent to the server for stats
  86. // recording. This data is used to analyze which configuration settings are successful
  87. // in various circumvention contexts, and includes meek dial params and upstream proxy
  88. // params.
  89. // For upstream proxy, only proxy type and custom header names are recorded; proxy
  90. // address and custom header values are considered PII.
  91. type TunnelDialStats struct {
  92. SelectedSSHClientVersion bool
  93. SSHClientVersion string
  94. UpstreamProxyType string
  95. UpstreamProxyCustomHeaderNames []string
  96. MeekDialAddress string
  97. MeekResolvedIPAddress string
  98. MeekSNIServerName string
  99. MeekHostHeader string
  100. MeekTransformedHostName bool
  101. SelectedUserAgent bool
  102. UserAgent string
  103. SelectedTLSProfile bool
  104. TLSProfile string
  105. }
  106. // EstablishTunnel first makes a network transport connection to the
  107. // Psiphon server and then establishes an SSH client session on top of
  108. // that transport. The SSH server is authenticated using the public
  109. // key in the server entry.
  110. // Depending on the server's capabilities, the connection may use
  111. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  112. // HTTP (meek protocol).
  113. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  114. // the a random supported protocol is used.
  115. // untunneledDialConfig is used for untunneled final status requests.
  116. func EstablishTunnel(
  117. config *Config,
  118. untunneledDialConfig *DialConfig,
  119. sessionId string,
  120. pendingConns *common.Conns,
  121. serverEntry *protocol.ServerEntry,
  122. adjustedEstablishStartTime monotime.Time,
  123. tunnelOwner TunnelOwner) (tunnel *Tunnel, err error) {
  124. selectedProtocol, err := selectProtocol(config, serverEntry)
  125. if err != nil {
  126. return nil, common.ContextError(err)
  127. }
  128. // Build transport layers and establish SSH connection. Note that
  129. // dialConn and monitoredConn are the same network connection.
  130. dialResult, err := dialSsh(
  131. config, pendingConns, serverEntry, selectedProtocol, sessionId)
  132. if err != nil {
  133. return nil, common.ContextError(err)
  134. }
  135. // Cleanup on error
  136. defer func() {
  137. if err != nil {
  138. dialResult.sshClient.Close()
  139. dialResult.monitoredConn.Close()
  140. pendingConns.Remove(dialResult.dialConn)
  141. }
  142. }()
  143. // The tunnel is now connected
  144. tunnel = &Tunnel{
  145. mutex: new(sync.Mutex),
  146. config: config,
  147. untunneledDialConfig: untunneledDialConfig,
  148. isClosed: false,
  149. serverEntry: serverEntry,
  150. protocol: selectedProtocol,
  151. conn: dialResult.monitoredConn,
  152. sshClient: dialResult.sshClient,
  153. sshServerRequests: dialResult.sshRequests,
  154. operateWaitGroup: new(sync.WaitGroup),
  155. shutdownOperateBroadcast: make(chan struct{}),
  156. // A buffer allows at least one signal to be sent even when the receiver is
  157. // not listening. Senders should not block.
  158. signalPortForwardFailure: make(chan struct{}, 1),
  159. dialStats: dialResult.dialStats,
  160. // Buffer allows SetClientVerificationPayload to submit one new payload
  161. // without blocking or dropping it.
  162. newClientVerificationPayload: make(chan string, 1),
  163. }
  164. // Create a new Psiphon API server context for this tunnel. This includes
  165. // performing a handshake request. If the handshake fails, this establishment
  166. // fails.
  167. if !config.DisableApi {
  168. NoticeInfo("starting server context for %s", tunnel.serverEntry.IpAddress)
  169. tunnel.serverContext, err = NewServerContext(tunnel, sessionId)
  170. if err != nil {
  171. return nil, common.ContextError(
  172. fmt.Errorf("error starting server context for %s: %s",
  173. tunnel.serverEntry.IpAddress, err))
  174. }
  175. }
  176. // establishDuration is the elapsed time between the controller starting tunnel
  177. // establishment and this tunnel being established. The reported value represents
  178. // how long the user waited between starting the client and having a usable tunnel;
  179. // or how long between the client detecting an unexpected tunnel disconnect and
  180. // completing automatic reestablishment.
  181. //
  182. // This time period may include time spent unsuccessfully connecting to other
  183. // servers. Time spent waiting for network connectivity is excluded.
  184. tunnel.establishDuration = monotime.Since(adjustedEstablishStartTime)
  185. tunnel.establishedTime = monotime.Now()
  186. // Now that network operations are complete, cancel interruptibility
  187. pendingConns.Remove(dialResult.dialConn)
  188. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
  189. tunnel.operateWaitGroup.Add(1)
  190. go tunnel.operateTunnel(tunnelOwner)
  191. return tunnel, nil
  192. }
  193. // Close stops operating the tunnel and closes the underlying connection.
  194. // Supports multiple and/or concurrent calls to Close().
  195. // When isDiscarded is set, operateTunnel will not attempt to send final
  196. // status requests.
  197. func (tunnel *Tunnel) Close(isDiscarded bool) {
  198. tunnel.mutex.Lock()
  199. tunnel.isDiscarded = isDiscarded
  200. isClosed := tunnel.isClosed
  201. tunnel.isClosed = true
  202. tunnel.mutex.Unlock()
  203. if !isClosed {
  204. // Signal operateTunnel to stop before closing the tunnel -- this
  205. // allows a final status request to be made in the case of an orderly
  206. // shutdown.
  207. // A timer is set, so if operateTunnel takes too long to stop, the
  208. // tunnel is closed, which will interrupt any slow final status request.
  209. // In effect, the TUNNEL_OPERATE_SHUTDOWN_TIMEOUT value will take
  210. // precedence over the PSIPHON_API_SERVER_TIMEOUT http.Client.Timeout
  211. // value set in makePsiphonHttpsClient.
  212. timer := time.AfterFunc(TUNNEL_OPERATE_SHUTDOWN_TIMEOUT, func() { tunnel.conn.Close() })
  213. close(tunnel.shutdownOperateBroadcast)
  214. tunnel.operateWaitGroup.Wait()
  215. timer.Stop()
  216. tunnel.sshClient.Close()
  217. // tunnel.conn.Close() may get called multiple times, which is allowed.
  218. tunnel.conn.Close()
  219. err := tunnel.sshClient.Wait()
  220. if err != nil {
  221. NoticeAlert("close tunnel ssh error: %s", err)
  222. }
  223. }
  224. }
  225. // IsDiscarded returns the tunnel's discarded flag.
  226. func (tunnel *Tunnel) IsDiscarded() bool {
  227. tunnel.mutex.Lock()
  228. defer tunnel.mutex.Unlock()
  229. return tunnel.isDiscarded
  230. }
  231. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  232. // This function blocks awaiting a response. Only one request may be in-flight
  233. // at once; a concurrent SendAPIRequest will block until an active request
  234. // receives its response (or the SSH connection is terminated).
  235. func (tunnel *Tunnel) SendAPIRequest(
  236. name string, requestPayload []byte) ([]byte, error) {
  237. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  238. name, true, requestPayload)
  239. if err != nil {
  240. return nil, common.ContextError(err)
  241. }
  242. if !ok {
  243. return nil, common.ContextError(errors.New("API request rejected"))
  244. }
  245. return responsePayload, nil
  246. }
  247. // Dial establishes a port forward connection through the tunnel
  248. // This Dial doesn't support split tunnel, so alwaysTunnel is not referenced
  249. func (tunnel *Tunnel) Dial(
  250. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
  251. type tunnelDialResult struct {
  252. sshPortForwardConn net.Conn
  253. err error
  254. }
  255. resultChannel := make(chan *tunnelDialResult, 2)
  256. if *tunnel.config.TunnelPortForwardDialTimeoutSeconds > 0 {
  257. time.AfterFunc(time.Duration(*tunnel.config.TunnelPortForwardDialTimeoutSeconds)*time.Second, func() {
  258. resultChannel <- &tunnelDialResult{nil, errors.New("tunnel dial timeout")}
  259. })
  260. }
  261. go func() {
  262. sshPortForwardConn, err := tunnel.sshClient.Dial("tcp", remoteAddr)
  263. resultChannel <- &tunnelDialResult{sshPortForwardConn, err}
  264. }()
  265. result := <-resultChannel
  266. if result.err != nil {
  267. // TODO: conditional on type of error or error message?
  268. select {
  269. case tunnel.signalPortForwardFailure <- *new(struct{}):
  270. default:
  271. }
  272. return nil, common.ContextError(result.err)
  273. }
  274. conn = &TunneledConn{
  275. Conn: result.sshPortForwardConn,
  276. tunnel: tunnel,
  277. downstreamConn: downstreamConn}
  278. return tunnel.wrapWithTransferStats(conn), nil
  279. }
  280. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  281. channel, requests, err := tunnel.sshClient.OpenChannel(
  282. protocol.PACKET_TUNNEL_CHANNEL_TYPE, nil)
  283. if err != nil {
  284. // TODO: conditional on type of error or error message?
  285. select {
  286. case tunnel.signalPortForwardFailure <- *new(struct{}):
  287. default:
  288. }
  289. return nil, common.ContextError(err)
  290. }
  291. go ssh.DiscardRequests(requests)
  292. conn := newChannelConn(channel)
  293. // wrapWithTransferStats will track bytes transferred for the
  294. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  295. //
  296. // Since the data in the channel is not HTTP or TLS, no domain bytes
  297. // counting is expected.
  298. //
  299. // transferstats are also used to determine that there's been recent
  300. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  301. return tunnel.wrapWithTransferStats(conn), nil
  302. }
  303. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  304. // Tunnel does not have a serverContext when DisableApi is set. We still use
  305. // transferstats.Conn to count bytes transferred for monitoring tunnel
  306. // quality.
  307. var regexps *transferstats.Regexps
  308. if tunnel.serverContext != nil {
  309. regexps = tunnel.serverContext.StatsRegexps()
  310. }
  311. return transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
  312. }
  313. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  314. // This will terminate the tunnel.
  315. func (tunnel *Tunnel) SignalComponentFailure() {
  316. NoticeAlert("tunnel received component failure signal")
  317. tunnel.Close(false)
  318. }
  319. // SetClientVerificationPayload triggers a client verification request, for this
  320. // tunnel, with the specified verifiction payload. If the tunnel is not yet established,
  321. // the payload/request is enqueued. If a payload/request is already eneueued, the
  322. // new payload is dropped.
  323. func (tunnel *Tunnel) SetClientVerificationPayload(clientVerificationPayload string) {
  324. select {
  325. case tunnel.newClientVerificationPayload <- clientVerificationPayload:
  326. default:
  327. }
  328. }
  329. // TunneledConn implements net.Conn and wraps a port foward connection.
  330. // It is used to hook into Read and Write to observe I/O errors and
  331. // report these errors back to the tunnel monitor as port forward failures.
  332. // TunneledConn optionally tracks a peer connection to be explictly closed
  333. // when the TunneledConn is closed.
  334. type TunneledConn struct {
  335. net.Conn
  336. tunnel *Tunnel
  337. downstreamConn net.Conn
  338. }
  339. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  340. n, err = conn.Conn.Read(buffer)
  341. if err != nil && err != io.EOF {
  342. // Report new failure. Won't block; assumes the receiver
  343. // has a sufficient buffer for the threshold number of reports.
  344. // TODO: conditional on type of error or error message?
  345. select {
  346. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  347. default:
  348. }
  349. }
  350. return
  351. }
  352. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  353. n, err = conn.Conn.Write(buffer)
  354. if err != nil && err != io.EOF {
  355. // Same as TunneledConn.Read()
  356. select {
  357. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  358. default:
  359. }
  360. }
  361. return
  362. }
  363. func (conn *TunneledConn) Close() error {
  364. if conn.downstreamConn != nil {
  365. conn.downstreamConn.Close()
  366. }
  367. return conn.Conn.Close()
  368. }
  369. // selectProtocol is a helper that picks the tunnel protocol
  370. func selectProtocol(
  371. config *Config, serverEntry *protocol.ServerEntry) (selectedProtocol string, err error) {
  372. // TODO: properly handle protocols (e.g. FRONTED-MEEK-OSSH) vs. capabilities (e.g., {FRONTED-MEEK, OSSH})
  373. // for now, the code is simply assuming that MEEK capabilities imply OSSH capability.
  374. if config.TunnelProtocol != "" {
  375. if !serverEntry.SupportsProtocol(config.TunnelProtocol) {
  376. return "", common.ContextError(fmt.Errorf("server does not have required capability"))
  377. }
  378. selectedProtocol = config.TunnelProtocol
  379. } else {
  380. // Pick at random from the supported protocols. This ensures that we'll eventually
  381. // try all possible protocols. Depending on network configuration, it may be the
  382. // case that some protocol is only available through multi-capability servers,
  383. // and a simpler ranked preference of protocols could lead to that protocol never
  384. // being selected.
  385. candidateProtocols := serverEntry.GetSupportedProtocols()
  386. if len(candidateProtocols) == 0 {
  387. return "", common.ContextError(fmt.Errorf("server does not have any supported capabilities"))
  388. }
  389. index, err := common.MakeSecureRandomInt(len(candidateProtocols))
  390. if err != nil {
  391. return "", common.ContextError(err)
  392. }
  393. selectedProtocol = candidateProtocols[index]
  394. }
  395. return selectedProtocol, nil
  396. }
  397. // selectFrontingParameters is a helper which selects/generates meek fronting
  398. // parameters where the server entry provides multiple options or patterns.
  399. func selectFrontingParameters(
  400. serverEntry *protocol.ServerEntry) (frontingAddress, frontingHost string, err error) {
  401. if len(serverEntry.MeekFrontingAddressesRegex) > 0 {
  402. // Generate a front address based on the regex.
  403. frontingAddress, err = regen.Generate(serverEntry.MeekFrontingAddressesRegex)
  404. if err != nil {
  405. return "", "", common.ContextError(err)
  406. }
  407. } else {
  408. // Randomly select, for this connection attempt, one front address for
  409. // fronting-capable servers.
  410. if len(serverEntry.MeekFrontingAddresses) == 0 {
  411. return "", "", common.ContextError(errors.New("MeekFrontingAddresses is empty"))
  412. }
  413. index, err := common.MakeSecureRandomInt(len(serverEntry.MeekFrontingAddresses))
  414. if err != nil {
  415. return "", "", common.ContextError(err)
  416. }
  417. frontingAddress = serverEntry.MeekFrontingAddresses[index]
  418. }
  419. if len(serverEntry.MeekFrontingHosts) > 0 {
  420. index, err := common.MakeSecureRandomInt(len(serverEntry.MeekFrontingHosts))
  421. if err != nil {
  422. return "", "", common.ContextError(err)
  423. }
  424. frontingHost = serverEntry.MeekFrontingHosts[index]
  425. } else {
  426. // Backwards compatibility case
  427. frontingHost = serverEntry.MeekFrontingHost
  428. }
  429. return
  430. }
  431. func doMeekTransformHostName(config *Config) bool {
  432. switch config.TransformHostNames {
  433. case TRANSFORM_HOST_NAMES_ALWAYS:
  434. return true
  435. case TRANSFORM_HOST_NAMES_NEVER:
  436. return false
  437. }
  438. return common.FlipCoin()
  439. }
  440. // initMeekConfig is a helper that creates a MeekConfig suitable for the
  441. // selected meek tunnel protocol.
  442. func initMeekConfig(
  443. config *Config,
  444. serverEntry *protocol.ServerEntry,
  445. selectedProtocol,
  446. sessionId string) (*MeekConfig, error) {
  447. // The meek protocol always uses OSSH
  448. psiphonServerAddress := fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshObfuscatedPort)
  449. var dialAddress string
  450. useHTTPS := false
  451. useObfuscatedSessionTickets := false
  452. var SNIServerName, hostHeader string
  453. transformedHostName := false
  454. switch selectedProtocol {
  455. case protocol.TUNNEL_PROTOCOL_FRONTED_MEEK:
  456. frontingAddress, frontingHost, err := selectFrontingParameters(serverEntry)
  457. if err != nil {
  458. return nil, common.ContextError(err)
  459. }
  460. dialAddress = fmt.Sprintf("%s:443", frontingAddress)
  461. useHTTPS = true
  462. if !serverEntry.MeekFrontingDisableSNI {
  463. SNIServerName = frontingAddress
  464. if doMeekTransformHostName(config) {
  465. SNIServerName = common.GenerateHostName()
  466. transformedHostName = true
  467. }
  468. }
  469. hostHeader = frontingHost
  470. case protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP:
  471. frontingAddress, frontingHost, err := selectFrontingParameters(serverEntry)
  472. if err != nil {
  473. return nil, common.ContextError(err)
  474. }
  475. dialAddress = fmt.Sprintf("%s:80", frontingAddress)
  476. hostHeader = frontingHost
  477. case protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK:
  478. dialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  479. hostname := serverEntry.IpAddress
  480. if doMeekTransformHostName(config) {
  481. hostname = common.GenerateHostName()
  482. transformedHostName = true
  483. }
  484. if serverEntry.MeekServerPort == 80 {
  485. hostHeader = hostname
  486. } else {
  487. hostHeader = fmt.Sprintf("%s:%d", hostname, serverEntry.MeekServerPort)
  488. }
  489. case protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  490. protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET:
  491. dialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  492. useHTTPS = true
  493. if selectedProtocol == protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET {
  494. useObfuscatedSessionTickets = true
  495. }
  496. SNIServerName = serverEntry.IpAddress
  497. if doMeekTransformHostName(config) {
  498. SNIServerName = common.GenerateHostName()
  499. transformedHostName = true
  500. }
  501. if serverEntry.MeekServerPort == 443 {
  502. hostHeader = serverEntry.IpAddress
  503. } else {
  504. hostHeader = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  505. }
  506. default:
  507. return nil, common.ContextError(errors.New("unexpected selectedProtocol"))
  508. }
  509. // The underlying TLS will automatically disable SNI for IP address server name
  510. // values; we have this explicit check here so we record the correct value for stats.
  511. if net.ParseIP(SNIServerName) != nil {
  512. SNIServerName = ""
  513. }
  514. // Pin the TLS profile for the entire meek connection.
  515. selectedTLSProfile := SelectTLSProfile(
  516. config.UseIndistinguishableTLS,
  517. useObfuscatedSessionTickets,
  518. true,
  519. config.TrustedCACertificatesFilename != "")
  520. return &MeekConfig{
  521. DialAddress: dialAddress,
  522. UseHTTPS: useHTTPS,
  523. TLSProfile: selectedTLSProfile,
  524. UseObfuscatedSessionTickets: useObfuscatedSessionTickets,
  525. SNIServerName: SNIServerName,
  526. HostHeader: hostHeader,
  527. TransformedHostName: transformedHostName,
  528. PsiphonServerAddress: psiphonServerAddress,
  529. SessionID: sessionId,
  530. ClientTunnelProtocol: selectedProtocol,
  531. MeekCookieEncryptionPublicKey: serverEntry.MeekCookieEncryptionPublicKey,
  532. MeekObfuscatedKey: serverEntry.MeekObfuscatedKey,
  533. }, nil
  534. }
  535. type dialResult struct {
  536. dialConn net.Conn
  537. monitoredConn *common.ActivityMonitoredConn
  538. sshClient *ssh.Client
  539. sshRequests <-chan *ssh.Request
  540. dialStats *TunnelDialStats
  541. }
  542. // dialSsh is a helper that builds the transport layers and establishes the SSH connection.
  543. // When additional dial configuration is used, DialStats are recorded and returned.
  544. //
  545. // The net.Conn return value is the value to be removed from pendingConns; additional
  546. // layering (ThrottledConn, ActivityMonitoredConn) is applied, but this return value is the
  547. // base dial conn. The *ActivityMonitoredConn return value is the layered conn passed into
  548. // the ssh.Client.
  549. func dialSsh(
  550. config *Config,
  551. pendingConns *common.Conns,
  552. serverEntry *protocol.ServerEntry,
  553. selectedProtocol,
  554. sessionId string) (*dialResult, error) {
  555. // The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
  556. // So depending on which protocol is used, multiple layers are initialized.
  557. // Note: when SSHClientVersion is "", a default is supplied by the ssh package:
  558. // https://godoc.org/golang.org/x/crypto/ssh#ClientConfig
  559. var selectedSSHClientVersion bool
  560. SSHClientVersion := ""
  561. useObfuscatedSsh := false
  562. var directTCPDialAddress string
  563. var meekConfig *MeekConfig
  564. var err error
  565. switch selectedProtocol {
  566. case protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH:
  567. useObfuscatedSsh = true
  568. directTCPDialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshObfuscatedPort)
  569. case protocol.TUNNEL_PROTOCOL_SSH:
  570. selectedSSHClientVersion = true
  571. SSHClientVersion = pickSSHClientVersion()
  572. directTCPDialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshPort)
  573. default:
  574. useObfuscatedSsh = true
  575. meekConfig, err = initMeekConfig(config, serverEntry, selectedProtocol, sessionId)
  576. if err != nil {
  577. return nil, common.ContextError(err)
  578. }
  579. }
  580. // Set User Agent when using meek or an upstream HTTP proxy
  581. var selectedUserAgent bool
  582. dialCustomHeaders := config.CustomHeaders
  583. var upstreamProxyType string
  584. if config.UpstreamProxyUrl != "" {
  585. // Note: UpstreamProxyUrl will be validated in the dial
  586. proxyURL, err := url.Parse(config.UpstreamProxyUrl)
  587. if err == nil {
  588. upstreamProxyType = proxyURL.Scheme
  589. }
  590. }
  591. if meekConfig != nil || upstreamProxyType == "http" {
  592. dialCustomHeaders, selectedUserAgent = UserAgentIfUnset(dialCustomHeaders)
  593. }
  594. // Use an asynchronous callback to record the resolved IP address when
  595. // dialing a domain name. Note that DialMeek doesn't immediately
  596. // establish any HTTPS connections, so the resolved IP address won't be
  597. // reported until during/after ssh session establishment (the ssh traffic
  598. // is meek payload). So don't Load() the IP address value until after that
  599. // has completed to ensure a result.
  600. var resolvedIPAddress atomic.Value
  601. resolvedIPAddress.Store("")
  602. setResolvedIPAddress := func(IPAddress string) {
  603. resolvedIPAddress.Store(IPAddress)
  604. }
  605. dialConfig := &DialConfig{
  606. UpstreamProxyUrl: config.UpstreamProxyUrl,
  607. CustomHeaders: dialCustomHeaders,
  608. ConnectTimeout: time.Duration(*config.TunnelConnectTimeoutSeconds) * time.Second,
  609. PendingConns: pendingConns,
  610. DeviceBinder: config.DeviceBinder,
  611. DnsServerGetter: config.DnsServerGetter,
  612. IPv6Synthesizer: config.IPv6Synthesizer,
  613. UseIndistinguishableTLS: config.UseIndistinguishableTLS,
  614. TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
  615. DeviceRegion: config.DeviceRegion,
  616. ResolvedIPCallback: setResolvedIPAddress,
  617. }
  618. // Gather dial parameters for diagnostic logging and stats reporting
  619. dialStats := &TunnelDialStats{}
  620. if selectedSSHClientVersion {
  621. dialStats.SelectedSSHClientVersion = true
  622. dialStats.SSHClientVersion = SSHClientVersion
  623. }
  624. if selectedUserAgent {
  625. dialStats.SelectedUserAgent = true
  626. dialStats.UserAgent = dialConfig.CustomHeaders.Get("User-Agent")
  627. }
  628. if upstreamProxyType != "" {
  629. dialStats.UpstreamProxyType = upstreamProxyType
  630. dialStats.UpstreamProxyCustomHeaderNames = make([]string, 0)
  631. for name, _ := range dialConfig.CustomHeaders {
  632. if selectedUserAgent && name == "User-Agent" {
  633. continue
  634. }
  635. dialStats.UpstreamProxyCustomHeaderNames = append(dialStats.UpstreamProxyCustomHeaderNames, name)
  636. }
  637. }
  638. if meekConfig != nil {
  639. // Note: dialStats.MeekResolvedIPAddress isn't set until the dial begins,
  640. // so it will always be blank in NoticeConnectingServer.
  641. dialStats.MeekDialAddress = meekConfig.DialAddress
  642. dialStats.MeekResolvedIPAddress = ""
  643. dialStats.MeekSNIServerName = meekConfig.SNIServerName
  644. dialStats.MeekHostHeader = meekConfig.HostHeader
  645. dialStats.MeekTransformedHostName = meekConfig.TransformedHostName
  646. dialStats.SelectedTLSProfile = true
  647. dialStats.TLSProfile = meekConfig.TLSProfile
  648. }
  649. NoticeConnectingServer(
  650. serverEntry.IpAddress,
  651. serverEntry.Region,
  652. selectedProtocol,
  653. dialStats)
  654. // Create the base transport: meek or direct connection
  655. var dialConn net.Conn
  656. if meekConfig != nil {
  657. dialConn, err = DialMeek(meekConfig, dialConfig)
  658. if err != nil {
  659. return nil, common.ContextError(err)
  660. }
  661. } else {
  662. // For some direct connect servers, DialPluginProtocol
  663. // will layer on another obfuscation protocol.
  664. // Use a copy of DialConfig without pendingConns; the
  665. // DialPluginProtocol must supply and manage its own
  666. // for its base network connections.
  667. pluginDialConfig := new(DialConfig)
  668. *pluginDialConfig = *dialConfig
  669. pluginDialConfig.PendingConns = nil
  670. var dialedPlugin bool
  671. dialedPlugin, dialConn, err = DialPluginProtocol(
  672. config,
  673. NewNoticeWriter("DialPluginProtocol"),
  674. pendingConns,
  675. directTCPDialAddress,
  676. dialConfig)
  677. if !dialedPlugin && err != nil {
  678. NoticeInfo("DialPluginProtocol intialization failed: %s", err)
  679. }
  680. if dialedPlugin {
  681. NoticeInfo("using DialPluginProtocol for %s", serverEntry.IpAddress)
  682. } else {
  683. // Standard direct connection.
  684. dialConn, err = DialTCP(directTCPDialAddress, dialConfig)
  685. }
  686. if err != nil {
  687. return nil, common.ContextError(err)
  688. }
  689. }
  690. // If dialConn is not a Closer, tunnel failure detection may be slower
  691. _, ok := dialConn.(common.Closer)
  692. if !ok {
  693. NoticeAlert("tunnel.dialSsh: dialConn is not a Closer")
  694. }
  695. cleanupConn := dialConn
  696. defer func() {
  697. // Cleanup on error
  698. if cleanupConn != nil {
  699. cleanupConn.Close()
  700. pendingConns.Remove(cleanupConn)
  701. }
  702. }()
  703. // Activity monitoring is used to measure tunnel duration
  704. monitoredConn, err := common.NewActivityMonitoredConn(dialConn, 0, false, nil, nil)
  705. if err != nil {
  706. return nil, common.ContextError(err)
  707. }
  708. // Apply throttling (if configured)
  709. throttledConn := common.NewThrottledConn(monitoredConn, config.RateLimits)
  710. // Add obfuscated SSH layer
  711. var sshConn net.Conn = throttledConn
  712. if useObfuscatedSsh {
  713. sshConn, err = common.NewObfuscatedSshConn(
  714. common.OBFUSCATION_CONN_MODE_CLIENT, throttledConn, serverEntry.SshObfuscatedKey)
  715. if err != nil {
  716. return nil, common.ContextError(err)
  717. }
  718. }
  719. // Now establish the SSH session over the conn transport
  720. expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
  721. if err != nil {
  722. return nil, common.ContextError(err)
  723. }
  724. sshCertChecker := &ssh.CertChecker{
  725. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  726. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  727. return common.ContextError(errors.New("unexpected host public key"))
  728. }
  729. return nil
  730. },
  731. }
  732. sshPasswordPayload := &protocol.SSHPasswordPayload{
  733. SessionId: sessionId,
  734. SshPassword: serverEntry.SshPassword,
  735. ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
  736. }
  737. payload, err := json.Marshal(sshPasswordPayload)
  738. if err != nil {
  739. return nil, common.ContextError(err)
  740. }
  741. sshClientConfig := &ssh.ClientConfig{
  742. User: serverEntry.SshUsername,
  743. Auth: []ssh.AuthMethod{
  744. ssh.Password(string(payload)),
  745. },
  746. HostKeyCallback: sshCertChecker.CheckHostKey,
  747. ClientVersion: SSHClientVersion,
  748. }
  749. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  750. // in a timeout to ensure it won't hang. We've encountered firewalls
  751. // that allow the TCP handshake to complete but then send a RST to the
  752. // server-side and nothing to the client-side, and if that happens
  753. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  754. // closes the conn, which interrupts it.
  755. // Note: TCP handshake timeouts are provided by TCPConn, and session
  756. // timeouts *after* ssh establishment are provided by the ssh keep alive
  757. // in operate tunnel.
  758. // TODO: adjust the timeout to account for time-elapsed-from-start
  759. type sshNewClientResult struct {
  760. sshClient *ssh.Client
  761. sshRequests <-chan *ssh.Request
  762. err error
  763. }
  764. resultChannel := make(chan *sshNewClientResult, 2)
  765. if *config.TunnelConnectTimeoutSeconds > 0 {
  766. time.AfterFunc(time.Duration(*config.TunnelConnectTimeoutSeconds)*time.Second, func() {
  767. resultChannel <- &sshNewClientResult{nil, nil, errors.New("ssh dial timeout")}
  768. })
  769. }
  770. go func() {
  771. // The following is adapted from ssh.Dial(), here using a custom conn
  772. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  773. sshAddress := ""
  774. sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
  775. sshConn, sshAddress, sshClientConfig)
  776. var sshClient *ssh.Client
  777. if err == nil {
  778. sshClient = ssh.NewClient(sshClientConn, sshChannels, nil)
  779. }
  780. resultChannel <- &sshNewClientResult{sshClient, sshRequests, err}
  781. }()
  782. result := <-resultChannel
  783. if result.err != nil {
  784. return nil, common.ContextError(result.err)
  785. }
  786. // Update dial parameters determined during dial
  787. if dialStats != nil && meekConfig != nil {
  788. dialStats.MeekResolvedIPAddress = resolvedIPAddress.Load().(string)
  789. }
  790. NoticeConnectedServer(
  791. serverEntry.IpAddress,
  792. serverEntry.Region,
  793. selectedProtocol,
  794. dialStats)
  795. cleanupConn = nil
  796. // Note: dialConn may be used to close the underlying network connection
  797. // but should not be used to perform I/O as that would interfere with SSH
  798. // (and also bypasses throttling).
  799. return &dialResult{
  800. dialConn: dialConn,
  801. monitoredConn: monitoredConn,
  802. sshClient: result.sshClient,
  803. sshRequests: result.sshRequests,
  804. dialStats: dialStats},
  805. nil
  806. }
  807. func makeRandomPeriod(min, max time.Duration) time.Duration {
  808. period, err := common.MakeRandomPeriod(min, max)
  809. if err != nil {
  810. NoticeAlert("MakeRandomPeriod failed: %s", err)
  811. // Proceed without random period
  812. period = max
  813. }
  814. return period
  815. }
  816. // operateTunnel monitors the health of the tunnel and performs
  817. // periodic work.
  818. //
  819. // BytesTransferred and TotalBytesTransferred notices are emitted
  820. // for live reporting and diagnostics reporting, respectively.
  821. //
  822. // Status requests are sent to the Psiphon API to report bytes
  823. // transferred.
  824. //
  825. // Periodic SSH keep alive packets are sent to ensure the underlying
  826. // TCP connection isn't terminated by NAT, or other network
  827. // interference -- or test if it has been terminated while the device
  828. // has been asleep. When a keep alive times out, the tunnel is
  829. // considered failed.
  830. //
  831. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  832. // server responsiveness when a port forward failure is detected: a
  833. // failed dial or failed read/write. This keep alive has a shorter
  834. // timeout.
  835. //
  836. // Note that port foward failures may be due to non-failure conditions.
  837. // For example, when the user inputs an invalid domain name and
  838. // resolution is done by the ssh server; or trying to connect to a
  839. // non-white-listed port; and the error message in these cases is not
  840. // distinguishable from a a true server error (a common error message,
  841. // "ssh: rejected: administratively prohibited (open failed)", may be
  842. // returned for these cases but also if the server has run out of
  843. // ephemeral ports, for example).
  844. //
  845. // SSH keep alives are not sent when the tunnel has been recently
  846. // active (not only does tunnel activity obviate the necessity of a keep
  847. // alive, testing has shown that keep alives may time out for "busy"
  848. // tunnels, especially over meek protocol and other high latency
  849. // conditions).
  850. //
  851. // "Recently active" is defined has having received payload bytes. Sent
  852. // bytes are not considered as testing has shown bytes may appear to
  853. // send when certain NAT devices have interfered with the tunnel, while
  854. // no bytes are received. In a pathological case, with DNS implemented
  855. // as tunneled UDP, a browser may wait excessively for a domain name to
  856. // resolve, while no new port forward is attempted which would otherwise
  857. // result in a tunnel failure detection.
  858. //
  859. // TODO: change "recently active" to include having received any
  860. // SSH protocol messages from the server, not just user payload?
  861. //
  862. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  863. defer tunnel.operateWaitGroup.Done()
  864. lastBytesReceivedTime := monotime.Now()
  865. lastTotalBytesTransferedTime := monotime.Now()
  866. totalSent := int64(0)
  867. totalReceived := int64(0)
  868. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  869. defer noticeBytesTransferredTicker.Stop()
  870. // The next status request and ssh keep alive times are picked at random,
  871. // from a range, to make the resulting traffic less fingerprintable,
  872. // Note: not using Tickers since these are not fixed time periods.
  873. nextStatusRequestPeriod := func() time.Duration {
  874. return makeRandomPeriod(
  875. PSIPHON_API_STATUS_REQUEST_PERIOD_MIN,
  876. PSIPHON_API_STATUS_REQUEST_PERIOD_MAX)
  877. }
  878. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  879. defer statsTimer.Stop()
  880. // Schedule an immediate status request to deliver any unreported
  881. // persistent stats.
  882. // Note: this may not be effective when there's an outstanding
  883. // asynchronous untunneled final status request is holding the
  884. // persistent stats records. It may also conflict with other
  885. // tunnel candidates which attempt to send an immediate request
  886. // before being discarded. For now, we mitigate this with a short,
  887. // random delay.
  888. unreported := CountUnreportedPersistentStats()
  889. if unreported > 0 {
  890. NoticeInfo("Unreported persistent stats: %d", unreported)
  891. statsTimer.Reset(makeRandomPeriod(
  892. PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MIN,
  893. PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MAX))
  894. }
  895. nextSshKeepAlivePeriod := func() time.Duration {
  896. return makeRandomPeriod(
  897. TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
  898. TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX)
  899. }
  900. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  901. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  902. if tunnel.config.DisablePeriodicSshKeepAlive {
  903. sshKeepAliveTimer.Stop()
  904. } else {
  905. defer sshKeepAliveTimer.Stop()
  906. }
  907. // Perform network requests in separate goroutines so as not to block
  908. // other operations.
  909. requestsWaitGroup := new(sync.WaitGroup)
  910. requestsWaitGroup.Add(1)
  911. signalStatusRequest := make(chan struct{})
  912. go func() {
  913. defer requestsWaitGroup.Done()
  914. for _ = range signalStatusRequest {
  915. sendStats(tunnel)
  916. }
  917. }()
  918. requestsWaitGroup.Add(1)
  919. signalSshKeepAlive := make(chan time.Duration)
  920. sshKeepAliveError := make(chan error, 1)
  921. go func() {
  922. defer requestsWaitGroup.Done()
  923. for timeout := range signalSshKeepAlive {
  924. err := sendSshKeepAlive(tunnel.sshClient, tunnel.conn, timeout)
  925. if err != nil {
  926. select {
  927. case sshKeepAliveError <- err:
  928. default:
  929. }
  930. }
  931. }
  932. }()
  933. requestsWaitGroup.Add(1)
  934. signalStopClientVerificationRequests := make(chan struct{})
  935. go func() {
  936. defer requestsWaitGroup.Done()
  937. clientVerificationRequestSuccess := true
  938. clientVerificationPayload := ""
  939. failCount := 0
  940. for {
  941. // TODO: use reflect.SelectCase?
  942. if clientVerificationRequestSuccess == true {
  943. failCount = 0
  944. select {
  945. case clientVerificationPayload = <-tunnel.newClientVerificationPayload:
  946. case <-signalStopClientVerificationRequests:
  947. return
  948. }
  949. } else {
  950. // If sendClientVerification failed to send the payload we
  951. // will retry after a delay. Will use a new payload instead
  952. // if that arrives in the meantime.
  953. // If failures count is more than PSIPHON_API_CLIENT_VERIFICATION_REQUEST_MAX_RETRIES
  954. // stop retrying for this tunnel.
  955. failCount += 1
  956. if failCount > PSIPHON_API_CLIENT_VERIFICATION_REQUEST_MAX_RETRIES {
  957. return
  958. }
  959. timeout := time.After(PSIPHON_API_CLIENT_VERIFICATION_REQUEST_RETRY_PERIOD)
  960. select {
  961. case <-timeout:
  962. case clientVerificationPayload = <-tunnel.newClientVerificationPayload:
  963. case <-signalStopClientVerificationRequests:
  964. return
  965. }
  966. }
  967. clientVerificationRequestSuccess = sendClientVerification(tunnel, clientVerificationPayload)
  968. }
  969. }()
  970. shutdown := false
  971. var err error
  972. for !shutdown && err == nil {
  973. select {
  974. case <-noticeBytesTransferredTicker.C:
  975. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  976. tunnel.serverEntry.IpAddress)
  977. if received > 0 {
  978. lastBytesReceivedTime = monotime.Now()
  979. }
  980. totalSent += sent
  981. totalReceived += received
  982. if lastTotalBytesTransferedTime.Add(TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD).Before(monotime.Now()) {
  983. NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
  984. lastTotalBytesTransferedTime = monotime.Now()
  985. }
  986. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  987. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  988. NoticeBytesTransferred(tunnel.serverEntry.IpAddress, sent, received)
  989. }
  990. case <-statsTimer.C:
  991. select {
  992. case signalStatusRequest <- *new(struct{}):
  993. default:
  994. }
  995. statsTimer.Reset(nextStatusRequestPeriod())
  996. case <-sshKeepAliveTimer.C:
  997. if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(monotime.Now()) {
  998. select {
  999. case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAlivePeriodicTimeoutSeconds) * time.Second:
  1000. default:
  1001. }
  1002. }
  1003. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1004. case <-tunnel.signalPortForwardFailure:
  1005. // Note: no mutex on portForwardFailureTotal; only referenced here
  1006. tunnel.totalPortForwardFailures++
  1007. NoticeInfo("port forward failures for %s: %d",
  1008. tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
  1009. // If the underlying Conn has closed (meek and other plugin protocols may close
  1010. // themselves in certain error conditions), the tunnel has certainly failed.
  1011. // Otherwise, probe with an SSH keep alive.
  1012. if tunnel.conn.IsClosed() {
  1013. err = errors.New("underlying conn is closed")
  1014. } else {
  1015. if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(monotime.Now()) {
  1016. select {
  1017. case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAliveProbeTimeoutSeconds) * time.Second:
  1018. default:
  1019. }
  1020. }
  1021. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1022. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1023. }
  1024. }
  1025. case err = <-sshKeepAliveError:
  1026. case serverRequest := <-tunnel.sshServerRequests:
  1027. if serverRequest != nil {
  1028. err := HandleServerRequest(tunnelOwner, tunnel, serverRequest.Type, serverRequest.Payload)
  1029. if err == nil {
  1030. serverRequest.Reply(true, nil)
  1031. } else {
  1032. NoticeAlert("HandleServerRequest for %s failed: %s", serverRequest.Type, err)
  1033. serverRequest.Reply(false, nil)
  1034. }
  1035. }
  1036. case <-tunnel.shutdownOperateBroadcast:
  1037. shutdown = true
  1038. }
  1039. }
  1040. close(signalSshKeepAlive)
  1041. close(signalStatusRequest)
  1042. close(signalStopClientVerificationRequests)
  1043. requestsWaitGroup.Wait()
  1044. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1045. sent, received := transferstats.ReportRecentBytesTransferredForServer(tunnel.serverEntry.IpAddress)
  1046. totalSent += sent
  1047. totalReceived += received
  1048. // Always emit a final NoticeTotalBytesTransferred
  1049. NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
  1050. // Tunnel does not have a serverContext when DisableApi is set.
  1051. if tunnel.serverContext != nil && !tunnel.IsDiscarded() {
  1052. // The stats for this tunnel will be reported via the next successful
  1053. // status request.
  1054. // Since client clocks are unreliable, we report the server's timestamp from
  1055. // the handshake response as the absolute tunnel start time. This time
  1056. // will be slightly earlier than the actual tunnel activation time, as the
  1057. // client has to receive and parse the response and activate the tunnel.
  1058. tunnelStartTime := tunnel.serverContext.serverHandshakeTimestamp
  1059. // For the tunnel duration calculation, we use the local clock. The start time
  1060. // is tunnel.establishedTime as recorded when the tunnel was established. For the
  1061. // end time, we do not use the current time as we may now be long past the
  1062. // actual termination time of the tunnel. For example, the host or device may
  1063. // have resumed after a long sleep (it's not clear that the monotonic clock service
  1064. // used to measure elapsed time will or will not stop during device sleep). Instead,
  1065. // we use the last data received time as the estimated tunnel end time.
  1066. //
  1067. // One potential issue with using the last received time is receiving data
  1068. // after an extended sleep because the device sleep occured with data still in
  1069. // the OS socket read buffer. This is not expected to happen on Android, as the
  1070. // OS will wake a process when it has TCP data available to read. (For this reason,
  1071. // the actual long sleep issue is only with an idle tunnel; in this case the client
  1072. // is responsible for sending SSH keep alives but a device sleep will delay the
  1073. // golang SSH keep alive timer.)
  1074. //
  1075. // Idle tunnels will only read data when a SSH keep alive is sent. As a result,
  1076. // the last-received-time scheme can undercount tunnel durations by up to
  1077. // TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX for idle tunnels.
  1078. tunnelDuration := tunnel.conn.GetLastActivityMonotime().Sub(tunnel.establishedTime)
  1079. // tunnelDuration can be < 0 when tunnel.establishedTime is recorded after the
  1080. // last tunnel.conn.Read() succeeds. In that case, the last read would be the
  1081. // handshake response, so the tunnel had, essentially, no duration.
  1082. if tunnelDuration < 0 {
  1083. tunnelDuration = 0
  1084. }
  1085. err := RecordTunnelStat(
  1086. tunnel.serverContext.sessionId,
  1087. tunnel.serverContext.tunnelNumber,
  1088. tunnel.serverEntry.IpAddress,
  1089. fmt.Sprintf("%d", tunnel.establishDuration),
  1090. tunnelStartTime,
  1091. fmt.Sprintf("%d", tunnelDuration),
  1092. totalSent,
  1093. totalReceived)
  1094. if err != nil {
  1095. NoticeAlert("RecordTunnelStats failed: %s", common.ContextError(err))
  1096. }
  1097. }
  1098. // Final status request notes:
  1099. //
  1100. // It's highly desirable to send a final status request in order to report
  1101. // domain bytes transferred stats as well as to report tunnel stats as
  1102. // soon as possible. For this reason, we attempt untunneled requests when
  1103. // the tunneled request isn't possible or has failed.
  1104. //
  1105. // In an orderly shutdown (err == nil), the Controller is stopping and
  1106. // everything must be wrapped up quickly. Also, we still have a working
  1107. // tunnel. So we first attempt a tunneled status request (with a short
  1108. // timeout) and then attempt, synchronously -- otherwise the Contoller's
  1109. // runWaitGroup.Wait() will return while a request is still in progress
  1110. // -- untunneled requests (also with short timeouts). Note that in this
  1111. // case the untunneled request will opt out of untunneledPendingConns so
  1112. // that it's not inadvertently canceled by the Controller shutdown
  1113. // sequence (see doUntunneledStatusRequest).
  1114. //
  1115. // If the tunnel has failed, the Controller may continue working. We want
  1116. // to re-establish as soon as possible (so don't want to block on status
  1117. // requests, even for a second). We may have a long time to attempt
  1118. // untunneled requests in the background. And there is no tunnel through
  1119. // which to attempt tunneled requests. So we spawn a goroutine to run the
  1120. // untunneled requests, which are allowed a longer timeout. These requests
  1121. // will be interrupted by the Controller's untunneledPendingConns.CloseAll()
  1122. // in the case of a shutdown.
  1123. if err == nil {
  1124. NoticeInfo("shutdown operate tunnel")
  1125. if !sendStats(tunnel) {
  1126. sendUntunneledStats(tunnel, true)
  1127. }
  1128. } else {
  1129. NoticeAlert("operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
  1130. go sendUntunneledStats(tunnel, false)
  1131. tunnelOwner.SignalTunnelFailure(tunnel)
  1132. }
  1133. }
  1134. // sendSshKeepAlive is a helper which sends a [email protected] request
  1135. // on the specified SSH connections and returns true of the request succeeds
  1136. // within a specified timeout. If the request fails, the associated conn is
  1137. // closed, which will terminate the associated tunnel.
  1138. func sendSshKeepAlive(
  1139. sshClient *ssh.Client, conn net.Conn, timeout time.Duration) error {
  1140. errChannel := make(chan error, 2)
  1141. if timeout > 0 {
  1142. time.AfterFunc(timeout, func() {
  1143. errChannel <- errors.New("timed out")
  1144. })
  1145. }
  1146. go func() {
  1147. // Random padding to frustrate fingerprinting
  1148. randomPadding, err := common.MakeSecureRandomPadding(0, TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  1149. if err != nil {
  1150. NoticeAlert("MakeSecureRandomPadding failed: %s", err)
  1151. // Proceed without random padding
  1152. randomPadding = make([]byte, 0)
  1153. }
  1154. // Note: reading a reply is important for last-received-time tunnel
  1155. // duration calculation.
  1156. _, _, err = sshClient.SendRequest("[email protected]", true, randomPadding)
  1157. errChannel <- err
  1158. }()
  1159. err := <-errChannel
  1160. if err != nil {
  1161. sshClient.Close()
  1162. conn.Close()
  1163. }
  1164. return common.ContextError(err)
  1165. }
  1166. // sendStats is a helper for sending session stats to the server.
  1167. func sendStats(tunnel *Tunnel) bool {
  1168. // Tunnel does not have a serverContext when DisableApi is set
  1169. if tunnel.serverContext == nil {
  1170. return true
  1171. }
  1172. // Skip when tunnel is discarded
  1173. if tunnel.IsDiscarded() {
  1174. return true
  1175. }
  1176. err := tunnel.serverContext.DoStatusRequest(tunnel)
  1177. if err != nil {
  1178. NoticeAlert("DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  1179. }
  1180. return err == nil
  1181. }
  1182. // sendUntunnelStats sends final status requests directly to Psiphon
  1183. // servers after the tunnel has already failed. This is an attempt
  1184. // to retain useful bytes transferred stats.
  1185. func sendUntunneledStats(tunnel *Tunnel, isShutdown bool) {
  1186. // Tunnel does not have a serverContext when DisableApi is set
  1187. if tunnel.serverContext == nil {
  1188. return
  1189. }
  1190. // Skip when tunnel is discarded
  1191. if tunnel.IsDiscarded() {
  1192. return
  1193. }
  1194. err := tunnel.serverContext.TryUntunneledStatusRequest(isShutdown)
  1195. if err != nil {
  1196. NoticeAlert("TryUntunneledStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  1197. }
  1198. }
  1199. // sendClientVerification is a helper for sending a client verification request
  1200. // to the server.
  1201. func sendClientVerification(tunnel *Tunnel, clientVerificationPayload string) bool {
  1202. // Tunnel does not have a serverContext when DisableApi is set
  1203. if tunnel.serverContext == nil {
  1204. return true
  1205. }
  1206. // Skip when tunnel is discarded
  1207. if tunnel.IsDiscarded() {
  1208. return true
  1209. }
  1210. err := tunnel.serverContext.DoClientVerificationRequest(clientVerificationPayload, tunnel.serverEntry.IpAddress)
  1211. if err != nil {
  1212. NoticeAlert("DoClientVerificationRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  1213. }
  1214. return err == nil
  1215. }