tunnel.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "encoding/base64"
  24. "encoding/json"
  25. "errors"
  26. "fmt"
  27. "io"
  28. "net"
  29. "net/url"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/Psiphon-Inc/goarista/monotime"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  38. regen "github.com/zach-klippenstein/goregen"
  39. )
  // Tunneler specifies the interface required by components that use a tunnel.
  // Components which use this interface may be serviced by a single Tunnel instance,
  // or a Controller which manages a pool of tunnels, or any other object which
  // implements Tunneler.
  type Tunneler interface {
  	// Dial creates a tunneled connection.
  	//
  	// alwaysTunnel indicates that the connection should always be tunneled. If this
  	// is not set, the connection may be made directly, depending on split tunnel
  	// classification, when that feature is supported and active.
  	//
  	// downstreamConn is an optional parameter which specifies a connection to be
  	// explicitly closed when the Dialed connection is closed. For instance, this
  	// is used to close downstreamConn App<->LocalProxy connections when the related
  	// LocalProxy<->SshPortForward connections close.
  	Dial(remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error)
  	// DirectDial returns a connection to remoteAddr.
  	// NOTE(review): presumably this connection bypasses the tunnel; no
  	// implementation is visible in this part of the file -- confirm.
  	DirectDial(remoteAddr string) (conn net.Conn, err error)
  	// SignalComponentFailure notifies the Tunneler that an associated
  	// component has failed. The Tunnel implementation responds by closing
  	// the tunnel.
  	SignalComponentFailure()
  }
  // TunnelOwner specifies the interface required by Tunnel to notify its
  // owner when it has failed. The owner may, as in the case of the Controller,
  // remove the tunnel from its list of active tunnels.
  type TunnelOwner interface {
  	// SignalSeededNewSLOK notifies the owner of a newly seeded SLOK.
  	// NOTE(review): the caller and the exact trigger are not visible in this
  	// part of the file -- confirm against operateTunnel.
  	SignalSeededNewSLOK()
  	// SignalTunnelFailure notifies the owner that the given tunnel has failed.
  	SignalTunnelFailure(tunnel *Tunnel)
  }
  // Tunnel is a connection to a Psiphon server. An established
  // tunnel includes a network connection to the specified server
  // and an SSH session built on top of that transport.
  //
  // A Tunnel is created in the connected state by ConnectTunnel, becomes
  // usable after Activate, and is torn down by Close.
  type Tunnel struct {
  	// mutex is held while reading or writing the isActivated, isDiscarded
  	// and isClosed flags, and while Activate publishes its state.
  	mutex *sync.Mutex
  	config *Config
  	// isActivated is set by Activate once establishment has completed.
  	isActivated bool
  	// isDiscarded records the isDiscarded argument of the most recent Close.
  	isDiscarded bool
  	// isClosed is set by the first call to Close.
  	isClosed bool
  	sessionId string
  	serverEntry *protocol.ServerEntry
  	// serverContext is assigned by Activate; it remains nil when
  	// config.DisableApi is set.
  	serverContext *ServerContext
  	// protocol is the tunnel protocol that was selected for ConnectTunnel.
  	protocol string
  	// conn is the monitored network connection underlying sshClient.
  	conn *common.ActivityMonitoredConn
  	sshClient *ssh.Client
  	sshServerRequests <-chan *ssh.Request
  	// operateWaitGroup, operateCtx and stopOperate are created by Activate
  	// for the operateTunnel goroutine; Close calls stopOperate and then waits
  	// on operateWaitGroup.
  	operateWaitGroup *sync.WaitGroup
  	operateCtx context.Context
  	stopOperate context.CancelFunc
  	// signalPortForwardFailure has a buffer of one; senders use non-blocking
  	// sends to report port forward dial and I/O failures.
  	signalPortForwardFailure chan struct{}
  	// NOTE(review): presumably maintained by operateTunnel; not referenced in
  	// the visible part of the file.
  	totalPortForwardFailures int
  	// adjustedEstablishStartTime is supplied to ConnectTunnel and is the
  	// reference point for establishDuration.
  	adjustedEstablishStartTime monotime.Time
  	// establishDuration and establishedTime are recorded by Activate.
  	establishDuration time.Duration
  	establishedTime monotime.Time
  	dialStats *TunnelDialStats
  	// newClientVerificationPayload has a buffer of one, allowing
  	// SetClientVerificationPayload to enqueue a single payload without blocking.
  	newClientVerificationPayload chan string
  }
  // TunnelDialStats records additional dial config that is sent to the server for stats
  // recording. This data is used to analyze which configuration settings are successful
  // in various circumvention contexts, and includes meek dial params and upstream proxy
  // params.
  // For upstream proxy, only proxy type and custom header names are recorded; proxy
  // address and custom header values are considered PII.
  type TunnelDialStats struct {
  	// SSH client version parameters.
  	// NOTE(review): each Selected* flag presumably indicates that the paired
  	// value was explicitly selected for this dial -- confirm in dialSsh.
  	SelectedSSHClientVersion bool
  	SSHClientVersion string
  	// Upstream proxy parameters; header names only, never header values.
  	UpstreamProxyType string
  	UpstreamProxyCustomHeaderNames []string
  	// Meek dial parameters.
  	MeekDialAddress string
  	MeekResolvedIPAddress string
  	MeekSNIServerName string
  	MeekHostHeader string
  	MeekTransformedHostName bool
  	// User agent parameters.
  	SelectedUserAgent bool
  	UserAgent string
  	// TLS profile parameters.
  	SelectedTLSProfile bool
  	TLSProfile string
  }
  114. // ConnectTunnel first makes a network transport connection to the
  115. // Psiphon server and then establishes an SSH client session on top of
  116. // that transport. The SSH server is authenticated using the public
  117. // key in the server entry.
  118. // Depending on the server's capabilities, the connection may use
  119. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  120. // HTTP (meek protocol).
  121. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  122. // the a random supported protocol is used.
  123. //
  124. // Call Activate on a connected tunnel to complete its establishment
  125. // before using.
  126. //
  127. // Tunnel establishment is split into two phases: connection, and
  128. // activation. The Controller will run many ConnectTunnel calls
  129. // concurrently and then, to avoid unnecessary overhead from making
  130. // handshake requests and starting operateTunnel from tunnels which
  131. // may be discarded, call Activate on connected tunnels sequentially
  132. // as necessary.
  133. //
  134. func ConnectTunnel(
  135. ctx context.Context,
  136. config *Config,
  137. sessionId string,
  138. serverEntry *protocol.ServerEntry,
  139. selectedProtocol string,
  140. adjustedEstablishStartTime monotime.Time) (*Tunnel, error) {
  141. if !serverEntry.SupportsProtocol(selectedProtocol) {
  142. return nil, common.ContextError(fmt.Errorf("server does not support selected protocol"))
  143. }
  144. // Build transport layers and establish SSH connection. Note that
  145. // dialConn and monitoredConn are the same network connection.
  146. dialResult, err := dialSsh(
  147. ctx, config, serverEntry, selectedProtocol, sessionId)
  148. if err != nil {
  149. return nil, common.ContextError(err)
  150. }
  151. // The tunnel is now connected
  152. return &Tunnel{
  153. mutex: new(sync.Mutex),
  154. config: config,
  155. sessionId: sessionId,
  156. serverEntry: serverEntry,
  157. protocol: selectedProtocol,
  158. conn: dialResult.monitoredConn,
  159. sshClient: dialResult.sshClient,
  160. sshServerRequests: dialResult.sshRequests,
  161. // A buffer allows at least one signal to be sent even when the receiver is
  162. // not listening. Senders should not block.
  163. signalPortForwardFailure: make(chan struct{}, 1),
  164. adjustedEstablishStartTime: adjustedEstablishStartTime,
  165. dialStats: dialResult.dialStats,
  166. // Buffer allows SetClientVerificationPayload to submit one new payload
  167. // without blocking or dropping it.
  168. newClientVerificationPayload: make(chan string, 1),
  169. }, nil
  170. }
  171. // Activate completes the tunnel establishment, performing the handshake
  172. // request and starting operateTunnel, the worker that monitors the tunnel
  173. // and handles periodic management.
  174. func (tunnel *Tunnel) Activate(
  175. ctx context.Context,
  176. tunnelOwner TunnelOwner) error {
  177. // Create a new Psiphon API server context for this tunnel. This includes
  178. // performing a handshake request. If the handshake fails, this activation
  179. // fails.
  180. var serverContext *ServerContext
  181. if !tunnel.config.DisableApi {
  182. NoticeInfo("starting server context for %s", tunnel.serverEntry.IpAddress)
  183. // Call NewServerContext in a goroutine, as it blocks on a network operation,
  184. // the handshake request, and would block shutdown. If the shutdown signal is
  185. // received, close the tunnel, which will interrupt the handshake request
  186. // that may be blocking NewServerContext.
  187. //
  188. // Timeout after PsiphonApiServerTimeoutSeconds. NewServerContext may not
  189. // return if the tunnel network connection is unstable during the handshake
  190. // request. At this point, there is no operateTunnel monitor that will detect
  191. // this condition with SSH keep alives.
  192. if *tunnel.config.PsiphonApiServerTimeoutSeconds > 0 {
  193. var cancelFunc context.CancelFunc
  194. ctx, cancelFunc = context.WithTimeout(
  195. ctx, time.Second*time.Duration(*tunnel.config.PsiphonApiServerTimeoutSeconds))
  196. defer cancelFunc()
  197. }
  198. type newServerContextResult struct {
  199. serverContext *ServerContext
  200. err error
  201. }
  202. resultChannel := make(chan newServerContextResult)
  203. go func() {
  204. serverContext, err := NewServerContext(tunnel)
  205. resultChannel <- newServerContextResult{
  206. serverContext: serverContext,
  207. err: err,
  208. }
  209. }()
  210. var result newServerContextResult
  211. select {
  212. case result = <-resultChannel:
  213. case <-ctx.Done():
  214. result.err = ctx.Err()
  215. // Interrupt the goroutine
  216. tunnel.Close(true)
  217. <-resultChannel
  218. }
  219. if result.err != nil {
  220. return common.ContextError(
  221. fmt.Errorf("error starting server context for %s: %s",
  222. tunnel.serverEntry.IpAddress, result.err))
  223. }
  224. serverContext = result.serverContext
  225. }
  226. tunnel.mutex.Lock()
  227. // It may happen that the tunnel gets closed while Activate is running.
  228. // In this case, abort here, to ensure that the operateTunnel goroutine
  229. // will not be launched after Close is called.
  230. if tunnel.isClosed {
  231. return common.ContextError(errors.New("tunnel is closed"))
  232. }
  233. tunnel.isActivated = true
  234. tunnel.serverContext = serverContext
  235. // establishDuration is the elapsed time between the controller starting tunnel
  236. // establishment and this tunnel being established. The reported value represents
  237. // how long the user waited between starting the client and having a usable tunnel;
  238. // or how long between the client detecting an unexpected tunnel disconnect and
  239. // completing automatic reestablishment.
  240. //
  241. // This time period may include time spent unsuccessfully connecting to other
  242. // servers. Time spent waiting for network connectivity is excluded.
  243. tunnel.establishDuration = monotime.Since(tunnel.adjustedEstablishStartTime)
  244. tunnel.establishedTime = monotime.Now()
  245. // Use the Background context instead of the controller run context, as tunnels
  246. // are terminated when the controller calls tunnel.Close.
  247. tunnel.operateCtx, tunnel.stopOperate = context.WithCancel(context.Background())
  248. tunnel.operateWaitGroup = new(sync.WaitGroup)
  249. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
  250. // stats updates.
  251. tunnel.operateWaitGroup.Add(1)
  252. go tunnel.operateTunnel(tunnelOwner)
  253. tunnel.mutex.Unlock()
  254. return nil
  255. }
  256. // Close stops operating the tunnel and closes the underlying connection.
  257. // Supports multiple and/or concurrent calls to Close().
  258. // When isDiscarded is set, operateTunnel will not attempt to send final
  259. // status requests.
  260. func (tunnel *Tunnel) Close(isDiscarded bool) {
  261. tunnel.mutex.Lock()
  262. tunnel.isDiscarded = isDiscarded
  263. isActivated := tunnel.isActivated
  264. isClosed := tunnel.isClosed
  265. tunnel.isClosed = true
  266. tunnel.mutex.Unlock()
  267. if !isClosed {
  268. // Signal operateTunnel to stop before closing the tunnel -- this
  269. // allows a final status request to be made in the case of an orderly
  270. // shutdown.
  271. // A timer is set, so if operateTunnel takes too long to stop, the
  272. // tunnel is closed, which will interrupt any slow final status request.
  273. if isActivated {
  274. afterFunc := time.AfterFunc(TUNNEL_OPERATE_SHUTDOWN_TIMEOUT, func() { tunnel.conn.Close() })
  275. tunnel.stopOperate()
  276. tunnel.operateWaitGroup.Wait()
  277. afterFunc.Stop()
  278. }
  279. tunnel.sshClient.Close()
  280. // tunnel.conn.Close() may get called multiple times, which is allowed.
  281. tunnel.conn.Close()
  282. err := tunnel.sshClient.Wait()
  283. if err != nil {
  284. NoticeAlert("close tunnel ssh error: %s", err)
  285. }
  286. }
  287. }
  288. // IsActivated returns the tunnel's activated flag.
  289. func (tunnel *Tunnel) IsActivated() bool {
  290. tunnel.mutex.Lock()
  291. defer tunnel.mutex.Unlock()
  292. return tunnel.isActivated
  293. }
  294. // IsDiscarded returns the tunnel's discarded flag.
  295. func (tunnel *Tunnel) IsDiscarded() bool {
  296. tunnel.mutex.Lock()
  297. defer tunnel.mutex.Unlock()
  298. return tunnel.isDiscarded
  299. }
  300. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  301. // This function blocks awaiting a response. Only one request may be in-flight
  302. // at once; a concurrent SendAPIRequest will block until an active request
  303. // receives its response (or the SSH connection is terminated).
  304. func (tunnel *Tunnel) SendAPIRequest(
  305. name string, requestPayload []byte) ([]byte, error) {
  306. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  307. name, true, requestPayload)
  308. if err != nil {
  309. return nil, common.ContextError(err)
  310. }
  311. if !ok {
  312. return nil, common.ContextError(errors.New("API request rejected"))
  313. }
  314. return responsePayload, nil
  315. }
  316. // Dial establishes a port forward connection through the tunnel
  317. // This Dial doesn't support split tunnel, so alwaysTunnel is not referenced
  318. func (tunnel *Tunnel) Dial(
  319. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
  320. if !tunnel.IsActivated() {
  321. return nil, common.ContextError(errors.New("tunnel is not activated"))
  322. }
  323. type tunnelDialResult struct {
  324. sshPortForwardConn net.Conn
  325. err error
  326. }
  327. // Note: there is no dial context since SSH port forward dials cannot
  328. // be interrupted directly. Closing the tunnel will interrupt the dials.
  329. // A timeout is set to unblock this function, but the goroutine may
  330. // not exit until the tunnel is closed.
  331. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  332. resultChannel := make(chan *tunnelDialResult, 1)
  333. if *tunnel.config.TunnelPortForwardDialTimeoutSeconds > 0 {
  334. afterFunc := time.AfterFunc(time.Duration(*tunnel.config.TunnelPortForwardDialTimeoutSeconds)*time.Second, func() {
  335. resultChannel <- &tunnelDialResult{nil, errors.New("tunnel dial timeout")}
  336. })
  337. defer afterFunc.Stop()
  338. }
  339. go func() {
  340. sshPortForwardConn, err := tunnel.sshClient.Dial("tcp", remoteAddr)
  341. resultChannel <- &tunnelDialResult{sshPortForwardConn, err}
  342. }()
  343. result := <-resultChannel
  344. if result.err != nil {
  345. // TODO: conditional on type of error or error message?
  346. select {
  347. case tunnel.signalPortForwardFailure <- *new(struct{}):
  348. default:
  349. }
  350. return nil, common.ContextError(result.err)
  351. }
  352. conn = &TunneledConn{
  353. Conn: result.sshPortForwardConn,
  354. tunnel: tunnel,
  355. downstreamConn: downstreamConn}
  356. return tunnel.wrapWithTransferStats(conn), nil
  357. }
  358. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  359. if !tunnel.IsActivated() {
  360. return nil, common.ContextError(errors.New("tunnel is not activated"))
  361. }
  362. channel, requests, err := tunnel.sshClient.OpenChannel(
  363. protocol.PACKET_TUNNEL_CHANNEL_TYPE, nil)
  364. if err != nil {
  365. // TODO: conditional on type of error or error message?
  366. select {
  367. case tunnel.signalPortForwardFailure <- *new(struct{}):
  368. default:
  369. }
  370. return nil, common.ContextError(err)
  371. }
  372. go ssh.DiscardRequests(requests)
  373. conn := newChannelConn(channel)
  374. // wrapWithTransferStats will track bytes transferred for the
  375. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  376. //
  377. // Since the data in the channel is not HTTP or TLS, no domain bytes
  378. // counting is expected.
  379. //
  380. // transferstats are also used to determine that there's been recent
  381. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  382. return tunnel.wrapWithTransferStats(conn), nil
  383. }
  384. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  385. // Tunnel does not have a serverContext when DisableApi is set. We still use
  386. // transferstats.Conn to count bytes transferred for monitoring tunnel
  387. // quality.
  388. var regexps *transferstats.Regexps
  389. if tunnel.serverContext != nil {
  390. regexps = tunnel.serverContext.StatsRegexps()
  391. }
  392. return transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
  393. }
  394. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  395. // This will terminate the tunnel.
  396. func (tunnel *Tunnel) SignalComponentFailure() {
  397. NoticeAlert("tunnel received component failure signal")
  398. tunnel.Close(false)
  399. }
  400. // SetClientVerificationPayload triggers a client verification request, for this
  401. // tunnel, with the specified verifiction payload. If the tunnel is not yet established,
  402. // the payload/request is enqueued. If a payload/request is already eneueued, the
  403. // new payload is dropped.
  404. func (tunnel *Tunnel) SetClientVerificationPayload(clientVerificationPayload string) {
  405. select {
  406. case tunnel.newClientVerificationPayload <- clientVerificationPayload:
  407. default:
  408. }
  409. }
  // TunneledConn implements net.Conn and wraps a port forward connection.
  // It is used to hook into Read and Write to observe I/O errors and
  // report these errors back to the tunnel monitor as port forward failures.
  // TunneledConn optionally tracks a peer connection to be explicitly closed
  // when the TunneledConn is closed.
  type TunneledConn struct {
  	// Conn is the wrapped port forward connection; methods that are not
  	// overridden are promoted from it.
  	net.Conn
  	// tunnel receives the port forward failure signals.
  	tunnel *Tunnel
  	// downstreamConn is the optional peer connection closed by Close;
  	// it may be nil.
  	downstreamConn net.Conn
  }
  420. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  421. n, err = conn.Conn.Read(buffer)
  422. if err != nil && err != io.EOF {
  423. // Report new failure. Won't block; assumes the receiver
  424. // has a sufficient buffer for the threshold number of reports.
  425. // TODO: conditional on type of error or error message?
  426. select {
  427. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  428. default:
  429. }
  430. }
  431. return
  432. }
  433. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  434. n, err = conn.Conn.Write(buffer)
  435. if err != nil && err != io.EOF {
  436. // Same as TunneledConn.Read()
  437. select {
  438. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  439. default:
  440. }
  441. }
  442. return
  443. }
  444. func (conn *TunneledConn) Close() error {
  445. if conn.downstreamConn != nil {
  446. conn.downstreamConn.Close()
  447. }
  448. return conn.Conn.Close()
  449. }
  450. var errProtocolNotSupported = errors.New("server does not support required protocol(s)")
  451. // selectProtocol is a helper that picks the tunnel protocol
  452. func selectProtocol(
  453. config *Config,
  454. serverEntry *protocol.ServerEntry,
  455. excludeMeek bool) (selectedProtocol string, err error) {
  456. // TODO: properly handle protocols (e.g. FRONTED-MEEK-OSSH) vs. capabilities (e.g., {FRONTED-MEEK, OSSH})
  457. // for now, the code is simply assuming that MEEK capabilities imply OSSH capability.
  458. if config.TunnelProtocol != "" {
  459. if !serverEntry.SupportsProtocol(config.TunnelProtocol) ||
  460. (excludeMeek && protocol.TunnelProtocolUsesMeek(config.TunnelProtocol)) {
  461. return "", errProtocolNotSupported
  462. }
  463. selectedProtocol = config.TunnelProtocol
  464. } else {
  465. // Pick at random from the supported protocols. This ensures that we'll eventually
  466. // try all possible protocols. Depending on network configuration, it may be the
  467. // case that some protocol is only available through multi-capability servers,
  468. // and a simpler ranked preference of protocols could lead to that protocol never
  469. // being selected.
  470. candidateProtocols := serverEntry.GetSupportedProtocols(excludeMeek)
  471. if len(candidateProtocols) == 0 {
  472. return "", errProtocolNotSupported
  473. }
  474. index, err := common.MakeSecureRandomInt(len(candidateProtocols))
  475. if err != nil {
  476. return "", common.ContextError(err)
  477. }
  478. selectedProtocol = candidateProtocols[index]
  479. }
  480. return selectedProtocol, nil
  481. }
  482. // selectFrontingParameters is a helper which selects/generates meek fronting
  483. // parameters where the server entry provides multiple options or patterns.
  484. func selectFrontingParameters(
  485. serverEntry *protocol.ServerEntry) (frontingAddress, frontingHost string, err error) {
  486. if len(serverEntry.MeekFrontingAddressesRegex) > 0 {
  487. // Generate a front address based on the regex.
  488. frontingAddress, err = regen.Generate(serverEntry.MeekFrontingAddressesRegex)
  489. if err != nil {
  490. return "", "", common.ContextError(err)
  491. }
  492. } else {
  493. // Randomly select, for this connection attempt, one front address for
  494. // fronting-capable servers.
  495. if len(serverEntry.MeekFrontingAddresses) == 0 {
  496. return "", "", common.ContextError(errors.New("MeekFrontingAddresses is empty"))
  497. }
  498. index, err := common.MakeSecureRandomInt(len(serverEntry.MeekFrontingAddresses))
  499. if err != nil {
  500. return "", "", common.ContextError(err)
  501. }
  502. frontingAddress = serverEntry.MeekFrontingAddresses[index]
  503. }
  504. if len(serverEntry.MeekFrontingHosts) > 0 {
  505. index, err := common.MakeSecureRandomInt(len(serverEntry.MeekFrontingHosts))
  506. if err != nil {
  507. return "", "", common.ContextError(err)
  508. }
  509. frontingHost = serverEntry.MeekFrontingHosts[index]
  510. } else {
  511. // Backwards compatibility case
  512. frontingHost = serverEntry.MeekFrontingHost
  513. }
  514. return
  515. }
  516. func doMeekTransformHostName(config *Config) bool {
  517. switch config.TransformHostNames {
  518. case TRANSFORM_HOST_NAMES_ALWAYS:
  519. return true
  520. case TRANSFORM_HOST_NAMES_NEVER:
  521. return false
  522. }
  523. return common.FlipCoin()
  524. }
  525. // initMeekConfig is a helper that creates a MeekConfig suitable for the
  526. // selected meek tunnel protocol.
  527. func initMeekConfig(
  528. config *Config,
  529. serverEntry *protocol.ServerEntry,
  530. selectedProtocol,
  531. sessionId string) (*MeekConfig, error) {
  532. // The meek protocol always uses OSSH
  533. psiphonServerAddress := fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshObfuscatedPort)
  534. var dialAddress string
  535. useHTTPS := false
  536. useObfuscatedSessionTickets := false
  537. var SNIServerName, hostHeader string
  538. transformedHostName := false
  539. switch selectedProtocol {
  540. case protocol.TUNNEL_PROTOCOL_FRONTED_MEEK:
  541. frontingAddress, frontingHost, err := selectFrontingParameters(serverEntry)
  542. if err != nil {
  543. return nil, common.ContextError(err)
  544. }
  545. dialAddress = fmt.Sprintf("%s:443", frontingAddress)
  546. useHTTPS = true
  547. if !serverEntry.MeekFrontingDisableSNI {
  548. SNIServerName = frontingAddress
  549. if doMeekTransformHostName(config) {
  550. SNIServerName = common.GenerateHostName()
  551. transformedHostName = true
  552. }
  553. }
  554. hostHeader = frontingHost
  555. case protocol.TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP:
  556. frontingAddress, frontingHost, err := selectFrontingParameters(serverEntry)
  557. if err != nil {
  558. return nil, common.ContextError(err)
  559. }
  560. dialAddress = fmt.Sprintf("%s:80", frontingAddress)
  561. hostHeader = frontingHost
  562. case protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK:
  563. dialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  564. hostname := serverEntry.IpAddress
  565. if doMeekTransformHostName(config) {
  566. hostname = common.GenerateHostName()
  567. transformedHostName = true
  568. }
  569. if serverEntry.MeekServerPort == 80 {
  570. hostHeader = hostname
  571. } else {
  572. hostHeader = fmt.Sprintf("%s:%d", hostname, serverEntry.MeekServerPort)
  573. }
  574. case protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
  575. protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET:
  576. dialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  577. useHTTPS = true
  578. if selectedProtocol == protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK_SESSION_TICKET {
  579. useObfuscatedSessionTickets = true
  580. }
  581. SNIServerName = serverEntry.IpAddress
  582. if doMeekTransformHostName(config) {
  583. SNIServerName = common.GenerateHostName()
  584. transformedHostName = true
  585. }
  586. if serverEntry.MeekServerPort == 443 {
  587. hostHeader = serverEntry.IpAddress
  588. } else {
  589. hostHeader = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  590. }
  591. default:
  592. return nil, common.ContextError(errors.New("unexpected selectedProtocol"))
  593. }
  594. // The underlying TLS will automatically disable SNI for IP address server name
  595. // values; we have this explicit check here so we record the correct value for stats.
  596. if net.ParseIP(SNIServerName) != nil {
  597. SNIServerName = ""
  598. }
  599. // Pin the TLS profile for the entire meek connection.
  600. selectedTLSProfile := SelectTLSProfile(
  601. config.UseIndistinguishableTLS,
  602. useObfuscatedSessionTickets,
  603. true,
  604. config.TrustedCACertificatesFilename != "")
  605. return &MeekConfig{
  606. LimitBufferSizes: config.LimitMeekBufferSizes,
  607. DialAddress: dialAddress,
  608. UseHTTPS: useHTTPS,
  609. TLSProfile: selectedTLSProfile,
  610. UseObfuscatedSessionTickets: useObfuscatedSessionTickets,
  611. SNIServerName: SNIServerName,
  612. HostHeader: hostHeader,
  613. TransformedHostName: transformedHostName,
  614. PsiphonServerAddress: psiphonServerAddress,
  615. SessionID: sessionId,
  616. ClientTunnelProtocol: selectedProtocol,
  617. MeekCookieEncryptionPublicKey: serverEntry.MeekCookieEncryptionPublicKey,
  618. MeekObfuscatedKey: serverEntry.MeekObfuscatedKey,
  619. }, nil
  620. }
// dialResult bundles the values produced by a successful dialSsh call.
type dialResult struct {
	// dialConn is the base transport conn returned by DialMeek or DialTCP.
	// It may be used to close the underlying network connection, but not
	// for I/O.
	dialConn net.Conn
	// monitoredConn is the ActivityMonitoredConn wrapped around dialConn.
	monitoredConn *common.ActivityMonitoredConn
	// sshClient is the SSH client established over the layered conn.
	sshClient *ssh.Client
	// sshRequests is the requests channel returned by ssh.NewClientConn.
	sshRequests <-chan *ssh.Request
	// dialStats holds the dial parameters gathered while dialing.
	dialStats *TunnelDialStats
}
  628. // dialSsh is a helper that builds the transport layers and establishes the SSH connection.
  629. // When additional dial configuration is used, DialStats are recorded and returned.
  630. //
  631. // The net.Conn return value is the value to be removed from pendingConns; additional
  632. // layering (ThrottledConn, ActivityMonitoredConn) is applied, but this return value is the
  633. // base dial conn. The *ActivityMonitoredConn return value is the layered conn passed into
  634. // the ssh.Client.
  635. func dialSsh(
  636. ctx context.Context,
  637. config *Config,
  638. serverEntry *protocol.ServerEntry,
  639. selectedProtocol,
  640. sessionId string) (*dialResult, error) {
  641. if *config.TunnelConnectTimeoutSeconds > 0 {
  642. var cancelFunc context.CancelFunc
  643. ctx, cancelFunc = context.WithTimeout(
  644. ctx, time.Second*time.Duration(*config.TunnelConnectTimeoutSeconds))
  645. defer cancelFunc()
  646. }
  647. // The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
  648. // So depending on which protocol is used, multiple layers are initialized.
  649. // Note: when SSHClientVersion is "", a default is supplied by the ssh package:
  650. // https://godoc.org/golang.org/x/crypto/ssh#ClientConfig
  651. var selectedSSHClientVersion bool
  652. SSHClientVersion := ""
  653. useObfuscatedSsh := false
  654. var directTCPDialAddress string
  655. var meekConfig *MeekConfig
  656. var err error
  657. switch selectedProtocol {
  658. case protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH:
  659. useObfuscatedSsh = true
  660. directTCPDialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshObfuscatedPort)
  661. case protocol.TUNNEL_PROTOCOL_SSH:
  662. selectedSSHClientVersion = true
  663. SSHClientVersion = pickSSHClientVersion()
  664. directTCPDialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshPort)
  665. default:
  666. useObfuscatedSsh = true
  667. meekConfig, err = initMeekConfig(config, serverEntry, selectedProtocol, sessionId)
  668. if err != nil {
  669. return nil, common.ContextError(err)
  670. }
  671. }
  672. // Set User Agent when using meek or an upstream HTTP proxy
  673. var selectedUserAgent bool
  674. dialCustomHeaders := config.CustomHeaders
  675. var upstreamProxyType string
  676. if config.UpstreamProxyUrl != "" {
  677. // Note: UpstreamProxyUrl will be validated in the dial
  678. proxyURL, err := url.Parse(config.UpstreamProxyUrl)
  679. if err == nil {
  680. upstreamProxyType = proxyURL.Scheme
  681. }
  682. }
  683. if meekConfig != nil || upstreamProxyType == "http" {
  684. dialCustomHeaders, selectedUserAgent = UserAgentIfUnset(dialCustomHeaders)
  685. }
  686. // Use an asynchronous callback to record the resolved IP address when
  687. // dialing a domain name. Note that DialMeek doesn't immediately
  688. // establish any HTTPS connections, so the resolved IP address won't be
  689. // reported until during/after ssh session establishment (the ssh traffic
  690. // is meek payload). So don't Load() the IP address value until after that
  691. // has completed to ensure a result.
  692. var resolvedIPAddress atomic.Value
  693. resolvedIPAddress.Store("")
  694. setResolvedIPAddress := func(IPAddress string) {
  695. resolvedIPAddress.Store(IPAddress)
  696. }
  697. dialConfig := &DialConfig{
  698. UpstreamProxyUrl: config.UpstreamProxyUrl,
  699. CustomHeaders: dialCustomHeaders,
  700. DeviceBinder: config.DeviceBinder,
  701. DnsServerGetter: config.DnsServerGetter,
  702. IPv6Synthesizer: config.IPv6Synthesizer,
  703. UseIndistinguishableTLS: config.UseIndistinguishableTLS,
  704. TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
  705. DeviceRegion: config.DeviceRegion,
  706. ResolvedIPCallback: setResolvedIPAddress,
  707. }
  708. // Gather dial parameters for diagnostic logging and stats reporting
  709. dialStats := &TunnelDialStats{}
  710. if selectedSSHClientVersion {
  711. dialStats.SelectedSSHClientVersion = true
  712. dialStats.SSHClientVersion = SSHClientVersion
  713. }
  714. if selectedUserAgent {
  715. dialStats.SelectedUserAgent = true
  716. dialStats.UserAgent = dialConfig.CustomHeaders.Get("User-Agent")
  717. }
  718. if upstreamProxyType != "" {
  719. dialStats.UpstreamProxyType = upstreamProxyType
  720. }
  721. if len(dialConfig.CustomHeaders) > 0 {
  722. dialStats.UpstreamProxyCustomHeaderNames = make([]string, 0)
  723. for name := range dialConfig.CustomHeaders {
  724. if selectedUserAgent && name == "User-Agent" {
  725. continue
  726. }
  727. dialStats.UpstreamProxyCustomHeaderNames = append(dialStats.UpstreamProxyCustomHeaderNames, name)
  728. }
  729. }
  730. if meekConfig != nil {
  731. // Note: dialStats.MeekResolvedIPAddress isn't set until the dial begins,
  732. // so it will always be blank in NoticeConnectingServer.
  733. dialStats.MeekDialAddress = meekConfig.DialAddress
  734. dialStats.MeekResolvedIPAddress = ""
  735. dialStats.MeekSNIServerName = meekConfig.SNIServerName
  736. dialStats.MeekHostHeader = meekConfig.HostHeader
  737. dialStats.MeekTransformedHostName = meekConfig.TransformedHostName
  738. dialStats.SelectedTLSProfile = true
  739. dialStats.TLSProfile = meekConfig.TLSProfile
  740. }
  741. NoticeConnectingServer(
  742. serverEntry.IpAddress,
  743. serverEntry.Region,
  744. selectedProtocol,
  745. dialStats)
  746. // Create the base transport: meek or direct connection
  747. var dialConn net.Conn
  748. if meekConfig != nil {
  749. dialConn, err = DialMeek(ctx, meekConfig, dialConfig)
  750. if err != nil {
  751. return nil, common.ContextError(err)
  752. }
  753. } else {
  754. dialConn, err = DialTCP(ctx, directTCPDialAddress, dialConfig)
  755. if err != nil {
  756. return nil, common.ContextError(err)
  757. }
  758. }
  759. // If dialConn is not a Closer, tunnel failure detection may be slower
  760. _, ok := dialConn.(common.Closer)
  761. if !ok {
  762. NoticeAlert("tunnel.dialSsh: dialConn is not a Closer")
  763. }
  764. cleanupConn := dialConn
  765. defer func() {
  766. // Cleanup on error
  767. if cleanupConn != nil {
  768. cleanupConn.Close()
  769. }
  770. }()
  771. // Activity monitoring is used to measure tunnel duration
  772. monitoredConn, err := common.NewActivityMonitoredConn(dialConn, 0, false, nil, nil)
  773. if err != nil {
  774. return nil, common.ContextError(err)
  775. }
  776. // Apply throttling (if configured)
  777. throttledConn := common.NewThrottledConn(monitoredConn, config.RateLimits)
  778. // Add obfuscated SSH layer
  779. var sshConn net.Conn = throttledConn
  780. if useObfuscatedSsh {
  781. sshConn, err = common.NewObfuscatedSshConn(
  782. common.OBFUSCATION_CONN_MODE_CLIENT, throttledConn, serverEntry.SshObfuscatedKey)
  783. if err != nil {
  784. return nil, common.ContextError(err)
  785. }
  786. }
  787. // Now establish the SSH session over the conn transport
  788. expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
  789. if err != nil {
  790. return nil, common.ContextError(err)
  791. }
  792. sshCertChecker := &ssh.CertChecker{
  793. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  794. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  795. return common.ContextError(errors.New("unexpected host public key"))
  796. }
  797. return nil
  798. },
  799. }
  800. sshPasswordPayload := &protocol.SSHPasswordPayload{
  801. SessionId: sessionId,
  802. SshPassword: serverEntry.SshPassword,
  803. ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
  804. }
  805. payload, err := json.Marshal(sshPasswordPayload)
  806. if err != nil {
  807. return nil, common.ContextError(err)
  808. }
  809. sshClientConfig := &ssh.ClientConfig{
  810. User: serverEntry.SshUsername,
  811. Auth: []ssh.AuthMethod{
  812. ssh.Password(string(payload)),
  813. },
  814. HostKeyCallback: sshCertChecker.CheckHostKey,
  815. ClientVersion: SSHClientVersion,
  816. }
  817. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  818. // in a timeout to ensure it won't hang. We've encountered firewalls
  819. // that allow the TCP handshake to complete but then send a RST to the
  820. // server-side and nothing to the client-side, and if that happens
  821. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  822. // closes the conn, which interrupts it.
  823. // Note: TCP handshake timeouts are provided by TCPConn, and session
  824. // timeouts *after* ssh establishment are provided by the ssh keep alive
  825. // in operate tunnel.
  826. type sshNewClientResult struct {
  827. sshClient *ssh.Client
  828. sshRequests <-chan *ssh.Request
  829. err error
  830. }
  831. resultChannel := make(chan sshNewClientResult)
  832. // Call NewClientConn in a goroutine, as it blocks on SSH handshake network
  833. // operations, and would block canceling or shutdown. If the parent context
  834. // is canceled, close the net.Conn underlying SSH, which will interrupt the
  835. // SSH handshake that may be blocking NewClientConn.
  836. go func() {
  837. // The following is adapted from ssh.Dial(), here using a custom conn
  838. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  839. sshAddress := ""
  840. sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
  841. sshConn, sshAddress, sshClientConfig)
  842. var sshClient *ssh.Client
  843. if err == nil {
  844. // sshRequests is handled by operateTunnel.
  845. // ssh.NewClient also expects to handle the sshRequests
  846. // value from ssh.NewClientConn and will spawn a goroutine
  847. // to handle the <-chan *ssh.Request, so we must provide
  848. // a closed channel to ensure that goroutine halts instead
  849. // of hanging on a nil channel.
  850. noRequests := make(chan *ssh.Request)
  851. close(noRequests)
  852. sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)
  853. }
  854. resultChannel <- sshNewClientResult{sshClient, sshRequests, err}
  855. }()
  856. var result sshNewClientResult
  857. select {
  858. case result = <-resultChannel:
  859. case <-ctx.Done():
  860. result.err = ctx.Err()
  861. // Interrupt the goroutine
  862. sshConn.Close()
  863. <-resultChannel
  864. }
  865. if result.err != nil {
  866. return nil, common.ContextError(result.err)
  867. }
  868. // Update dial parameters determined during dial
  869. if dialStats != nil && meekConfig != nil {
  870. dialStats.MeekResolvedIPAddress = resolvedIPAddress.Load().(string)
  871. }
  872. NoticeConnectedServer(
  873. serverEntry.IpAddress,
  874. serverEntry.Region,
  875. selectedProtocol,
  876. dialStats)
  877. cleanupConn = nil
  878. // Note: dialConn may be used to close the underlying network connection
  879. // but should not be used to perform I/O as that would interfere with SSH
  880. // (and also bypasses throttling).
  881. return &dialResult{
  882. dialConn: dialConn,
  883. monitoredConn: monitoredConn,
  884. sshClient: result.sshClient,
  885. sshRequests: result.sshRequests,
  886. dialStats: dialStats},
  887. nil
  888. }
  889. func makeRandomPeriod(min, max time.Duration) time.Duration {
  890. period, err := common.MakeRandomPeriod(min, max)
  891. if err != nil {
  892. NoticeAlert("MakeRandomPeriod failed: %s", err)
  893. // Proceed without random period
  894. period = max
  895. }
  896. return period
  897. }
  898. // operateTunnel monitors the health of the tunnel and performs
  899. // periodic work.
  900. //
  901. // BytesTransferred and TotalBytesTransferred notices are emitted
  902. // for live reporting and diagnostics reporting, respectively.
  903. //
  904. // Status requests are sent to the Psiphon API to report bytes
  905. // transferred.
  906. //
  907. // Periodic SSH keep alive packets are sent to ensure the underlying
  908. // TCP connection isn't terminated by NAT, or other network
  909. // interference -- or test if it has been terminated while the device
  910. // has been asleep. When a keep alive times out, the tunnel is
  911. // considered failed.
  912. //
  913. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  914. // server responsiveness when a port forward failure is detected: a
  915. // failed dial or failed read/write. This keep alive has a shorter
  916. // timeout.
  917. //
  918. // Note that port forward failures may be due to non-failure conditions.
  919. // For example, when the user inputs an invalid domain name and
  920. // resolution is done by the ssh server; or trying to connect to a
  921. // non-white-listed port; and the error message in these cases is not
  922. // distinguishable from a a true server error (a common error message,
  923. // "ssh: rejected: administratively prohibited (open failed)", may be
  924. // returned for these cases but also if the server has run out of
  925. // ephemeral ports, for example).
  926. //
  927. // SSH keep alives are not sent when the tunnel has been recently
  928. // active (not only does tunnel activity obviate the necessity of a keep
  929. // alive, testing has shown that keep alives may time out for "busy"
  930. // tunnels, especially over meek protocol and other high latency
  931. // conditions).
  932. //
  933. // "Recently active" is defined has having received payload bytes. Sent
  934. // bytes are not considered as testing has shown bytes may appear to
  935. // send when certain NAT devices have interfered with the tunnel, while
  936. // no bytes are received. In a pathological case, with DNS implemented
  937. // as tunneled UDP, a browser may wait excessively for a domain name to
  938. // resolve, while no new port forward is attempted which would otherwise
  939. // result in a tunnel failure detection.
  940. //
  941. // TODO: change "recently active" to include having received any
  942. // SSH protocol messages from the server, not just user payload?
  943. //
  944. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  945. defer tunnel.operateWaitGroup.Done()
  946. lastBytesReceivedTime := monotime.Now()
  947. lastTotalBytesTransferedTime := monotime.Now()
  948. totalSent := int64(0)
  949. totalReceived := int64(0)
  950. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  951. defer noticeBytesTransferredTicker.Stop()
  952. // The next status request and ssh keep alive times are picked at random,
  953. // from a range, to make the resulting traffic less fingerprintable,
  954. // Note: not using Tickers since these are not fixed time periods.
  955. nextStatusRequestPeriod := func() time.Duration {
  956. return makeRandomPeriod(
  957. PSIPHON_API_STATUS_REQUEST_PERIOD_MIN,
  958. PSIPHON_API_STATUS_REQUEST_PERIOD_MAX)
  959. }
  960. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  961. defer statsTimer.Stop()
  962. // Schedule an almost-immediate status request to deliver any unreported
  963. // persistent stats.
  964. unreported := CountUnreportedPersistentStats()
  965. if unreported > 0 {
  966. NoticeInfo("Unreported persistent stats: %d", unreported)
  967. statsTimer.Reset(makeRandomPeriod(
  968. PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MIN,
  969. PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MAX))
  970. }
  971. nextSshKeepAlivePeriod := func() time.Duration {
  972. return makeRandomPeriod(
  973. TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
  974. TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX)
  975. }
  976. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  977. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  978. if tunnel.config.DisablePeriodicSshKeepAlive {
  979. sshKeepAliveTimer.Stop()
  980. } else {
  981. defer sshKeepAliveTimer.Stop()
  982. }
  983. // Perform network requests in separate goroutines so as not to block
  984. // other operations.
  985. requestsWaitGroup := new(sync.WaitGroup)
  986. requestsWaitGroup.Add(1)
  987. signalStatusRequest := make(chan struct{})
  988. go func() {
  989. defer requestsWaitGroup.Done()
  990. for range signalStatusRequest {
  991. sendStats(tunnel)
  992. }
  993. }()
  994. requestsWaitGroup.Add(1)
  995. signalSshKeepAlive := make(chan time.Duration)
  996. sshKeepAliveError := make(chan error, 1)
  997. go func() {
  998. defer requestsWaitGroup.Done()
  999. for timeout := range signalSshKeepAlive {
  1000. err := sendSshKeepAlive(tunnel.sshClient, tunnel.conn, timeout)
  1001. if err != nil {
  1002. select {
  1003. case sshKeepAliveError <- err:
  1004. default:
  1005. }
  1006. }
  1007. }
  1008. }()
  1009. requestsWaitGroup.Add(1)
  1010. signalStopClientVerificationRequests := make(chan struct{})
  1011. go func() {
  1012. defer requestsWaitGroup.Done()
  1013. clientVerificationRequestSuccess := true
  1014. clientVerificationPayload := ""
  1015. failCount := 0
  1016. for {
  1017. // TODO: use reflect.SelectCase?
  1018. if clientVerificationRequestSuccess == true {
  1019. failCount = 0
  1020. select {
  1021. case clientVerificationPayload = <-tunnel.newClientVerificationPayload:
  1022. case <-signalStopClientVerificationRequests:
  1023. return
  1024. }
  1025. } else {
  1026. // If sendClientVerification failed to send the payload we
  1027. // will retry after a delay. Will use a new payload instead
  1028. // if that arrives in the meantime.
  1029. // If failures count is more than PSIPHON_API_CLIENT_VERIFICATION_REQUEST_MAX_RETRIES
  1030. // stop retrying for this tunnel.
  1031. failCount += 1
  1032. if failCount > PSIPHON_API_CLIENT_VERIFICATION_REQUEST_MAX_RETRIES {
  1033. return
  1034. }
  1035. timer := time.NewTimer(PSIPHON_API_CLIENT_VERIFICATION_REQUEST_RETRY_PERIOD)
  1036. doReturn := false
  1037. select {
  1038. case <-timer.C:
  1039. case clientVerificationPayload = <-tunnel.newClientVerificationPayload:
  1040. case <-signalStopClientVerificationRequests:
  1041. doReturn = true
  1042. }
  1043. timer.Stop()
  1044. if doReturn {
  1045. return
  1046. }
  1047. }
  1048. clientVerificationRequestSuccess = sendClientVerification(tunnel, clientVerificationPayload)
  1049. }
  1050. }()
  1051. shutdown := false
  1052. var err error
  1053. for !shutdown && err == nil {
  1054. select {
  1055. case <-noticeBytesTransferredTicker.C:
  1056. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1057. tunnel.serverEntry.IpAddress)
  1058. if received > 0 {
  1059. lastBytesReceivedTime = monotime.Now()
  1060. }
  1061. totalSent += sent
  1062. totalReceived += received
  1063. if lastTotalBytesTransferedTime.Add(TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD).Before(monotime.Now()) {
  1064. NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
  1065. lastTotalBytesTransferedTime = monotime.Now()
  1066. }
  1067. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  1068. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  1069. NoticeBytesTransferred(tunnel.serverEntry.IpAddress, sent, received)
  1070. }
  1071. case <-statsTimer.C:
  1072. select {
  1073. case signalStatusRequest <- *new(struct{}):
  1074. default:
  1075. }
  1076. statsTimer.Reset(nextStatusRequestPeriod())
  1077. case <-sshKeepAliveTimer.C:
  1078. if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(monotime.Now()) {
  1079. select {
  1080. case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAlivePeriodicTimeoutSeconds) * time.Second:
  1081. default:
  1082. }
  1083. }
  1084. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1085. case <-tunnel.signalPortForwardFailure:
  1086. // Note: no mutex on portForwardFailureTotal; only referenced here
  1087. tunnel.totalPortForwardFailures++
  1088. NoticeInfo("port forward failures for %s: %d",
  1089. tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
  1090. // If the underlying Conn has closed (meek and other plugin protocols may close
  1091. // themselves in certain error conditions), the tunnel has certainly failed.
  1092. // Otherwise, probe with an SSH keep alive.
  1093. if tunnel.conn.IsClosed() {
  1094. err = errors.New("underlying conn is closed")
  1095. } else {
  1096. if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(monotime.Now()) {
  1097. select {
  1098. case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAliveProbeTimeoutSeconds) * time.Second:
  1099. default:
  1100. }
  1101. }
  1102. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1103. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1104. }
  1105. }
  1106. case err = <-sshKeepAliveError:
  1107. case serverRequest := <-tunnel.sshServerRequests:
  1108. if serverRequest != nil {
  1109. err := HandleServerRequest(tunnelOwner, tunnel, serverRequest.Type, serverRequest.Payload)
  1110. if err == nil {
  1111. serverRequest.Reply(true, nil)
  1112. } else {
  1113. NoticeAlert("HandleServerRequest for %s failed: %s", serverRequest.Type, err)
  1114. serverRequest.Reply(false, nil)
  1115. }
  1116. }
  1117. case <-tunnel.operateCtx.Done():
  1118. shutdown = true
  1119. }
  1120. }
  1121. close(signalSshKeepAlive)
  1122. close(signalStatusRequest)
  1123. close(signalStopClientVerificationRequests)
  1124. requestsWaitGroup.Wait()
  1125. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1126. sent, received := transferstats.ReportRecentBytesTransferredForServer(tunnel.serverEntry.IpAddress)
  1127. totalSent += sent
  1128. totalReceived += received
  1129. // Always emit a final NoticeTotalBytesTransferred
  1130. NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
  1131. // Tunnel does not have a serverContext when DisableApi is set.
  1132. if tunnel.serverContext != nil && !tunnel.IsDiscarded() {
  1133. // The stats for this tunnel will be reported via the next successful
  1134. // status request.
  1135. // Since client clocks are unreliable, we report the server's timestamp from
  1136. // the handshake response as the absolute tunnel start time. This time
  1137. // will be slightly earlier than the actual tunnel activation time, as the
  1138. // client has to receive and parse the response and activate the tunnel.
  1139. tunnelStartTime := tunnel.serverContext.serverHandshakeTimestamp
  1140. // For the tunnel duration calculation, we use the local clock. The start time
  1141. // is tunnel.establishedTime as recorded when the tunnel was established. For the
  1142. // end time, we do not use the current time as we may now be long past the
  1143. // actual termination time of the tunnel. For example, the host or device may
  1144. // have resumed after a long sleep (it's not clear that the monotonic clock service
  1145. // used to measure elapsed time will or will not stop during device sleep). Instead,
  1146. // we use the last data received time as the estimated tunnel end time.
  1147. //
  1148. // One potential issue with using the last received time is receiving data
  1149. // after an extended sleep because the device sleep occurred with data still in
  1150. // the OS socket read buffer. This is not expected to happen on Android, as the
  1151. // OS will wake a process when it has TCP data available to read. (For this reason,
  1152. // the actual long sleep issue is only with an idle tunnel; in this case the client
  1153. // is responsible for sending SSH keep alives but a device sleep will delay the
  1154. // golang SSH keep alive timer.)
  1155. //
  1156. // Idle tunnels will only read data when a SSH keep alive is sent. As a result,
  1157. // the last-received-time scheme can undercount tunnel durations by up to
  1158. // TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX for idle tunnels.
  1159. tunnelDuration := tunnel.conn.GetLastActivityMonotime().Sub(tunnel.establishedTime)
  1160. // tunnelDuration can be < 0 when tunnel.establishedTime is recorded after the
  1161. // last tunnel.conn.Read() succeeds. In that case, the last read would be the
  1162. // handshake response, so the tunnel had, essentially, no duration.
  1163. if tunnelDuration < 0 {
  1164. tunnelDuration = 0
  1165. }
  1166. err := RecordTunnelStat(
  1167. tunnel.serverContext.sessionId,
  1168. tunnel.serverContext.tunnelNumber,
  1169. tunnel.serverEntry.IpAddress,
  1170. fmt.Sprintf("%d", tunnel.establishDuration),
  1171. tunnelStartTime,
  1172. fmt.Sprintf("%d", tunnelDuration),
  1173. totalSent,
  1174. totalReceived)
  1175. if err != nil {
  1176. NoticeAlert("RecordTunnelStats failed: %s", common.ContextError(err))
  1177. }
  1178. }
  1179. if err == nil {
  1180. NoticeInfo("shutdown operate tunnel")
  1181. // Send a final status request in order to report any outstanding
  1182. // domain bytes transferred stats as well as to report session stats
  1183. // as soon as possible.
  1184. // This request will be interrupted when the tunnel is closed after
  1185. // TUNNEL_OPERATE_SHUTDOWN_TIMEOUT.
  1186. sendStats(tunnel)
  1187. } else {
  1188. NoticeAlert("operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
  1189. tunnelOwner.SignalTunnelFailure(tunnel)
  1190. }
  1191. }
  1192. // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
  1193. // on the specified SSH connections and returns true of the request succeeds
  1194. // within a specified timeout. If the request fails, the associated conn is
  1195. // closed, which will terminate the associated tunnel.
  1196. func sendSshKeepAlive(
  1197. sshClient *ssh.Client, conn net.Conn, timeout time.Duration) error {
  1198. // Note: there is no request context since SSH requests cannot be
  1199. // interrupted directly. Closing the tunnel will interrupt the request.
  1200. // A timeout is set to unblock this function, but the goroutine may
  1201. // not exit until the tunnel is closed.
  1202. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  1203. errChannel := make(chan error, 1)
  1204. if timeout > 0 {
  1205. afterFunc := time.AfterFunc(timeout, func() {
  1206. errChannel <- errors.New("timed out")
  1207. })
  1208. defer afterFunc.Stop()
  1209. }
  1210. go func() {
  1211. // Random padding to frustrate fingerprinting
  1212. randomPadding, err := common.MakeSecureRandomPadding(0, TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  1213. if err != nil {
  1214. NoticeAlert("MakeSecureRandomPadding failed: %s", err)
  1215. // Proceed without random padding
  1216. randomPadding = make([]byte, 0)
  1217. }
  1218. // Note: reading a reply is important for last-received-time tunnel
  1219. // duration calculation.
  1220. _, _, err = sshClient.SendRequest("keepalive@openssh.com", true, randomPadding)
  1221. errChannel <- err
  1222. }()
  1223. err := <-errChannel
  1224. if err != nil {
  1225. sshClient.Close()
  1226. conn.Close()
  1227. }
  1228. return common.ContextError(err)
  1229. }
  1230. // sendStats is a helper for sending session stats to the server.
  1231. func sendStats(tunnel *Tunnel) bool {
  1232. // Tunnel does not have a serverContext when DisableApi is set
  1233. if tunnel.serverContext == nil {
  1234. return true
  1235. }
  1236. // Skip when tunnel is discarded
  1237. if tunnel.IsDiscarded() {
  1238. return true
  1239. }
  1240. err := tunnel.serverContext.DoStatusRequest(tunnel)
  1241. if err != nil {
  1242. NoticeAlert("DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  1243. }
  1244. return err == nil
  1245. }
  1246. // sendClientVerification is a helper for sending a client verification request
  1247. // to the server.
  1248. func sendClientVerification(tunnel *Tunnel, clientVerificationPayload string) bool {
  1249. // Tunnel does not have a serverContext when DisableApi is set
  1250. if tunnel.serverContext == nil {
  1251. return true
  1252. }
  1253. // Skip when tunnel is discarded
  1254. if tunnel.IsDiscarded() {
  1255. return true
  1256. }
  1257. err := tunnel.serverContext.DoClientVerificationRequest(clientVerificationPayload, tunnel.serverEntry.IpAddress)
  1258. if err != nil {
  1259. NoticeAlert("DoClientVerificationRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  1260. }
  1261. return err == nil
  1262. }