tunnel.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "encoding/base64"
  25. "encoding/json"
  26. "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "net"
  31. "sync"
  32. "time"
  33. "github.com/Psiphon-Labs/goarista/monotime"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/marionette"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tapdance"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  45. )
  // Tunneler is the interface required by components that send traffic
  // through a tunnel. Such components may be served by a single Tunnel
  // instance, by a Controller that manages a pool of tunnels, or by any
  // other object that implements Tunneler.
  type Tunneler interface {

  	// Dial creates a tunneled connection to remoteAddr.
  	//
  	// alwaysTunnel requests that the connection always be tunneled. When it
  	// is not set, the connection may instead be made directly, depending on
  	// split tunnel classification, when that feature is supported and
  	// active.
  	//
  	// downstreamConn is optional. When supplied, it is a connection that is
  	// explicitly closed when the dialed connection is closed; for example,
  	// an App<->LocalProxy connection that should close together with its
  	// related LocalProxy<->SshPortForward connection.
  	Dial(remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error)

  	// DirectDial dials remoteAddr. NOTE(review): judging by the name this
  	// bypasses the tunnel; the exact semantics are up to the implementation.
  	DirectDial(remoteAddr string) (conn net.Conn, err error)

  	// SignalComponentFailure reports that a component using the Tunneler
  	// has failed.
  	SignalComponentFailure()
  }
  65. // TunnelOwner specifies the interface required by Tunnel to notify its
  66. // owner when it has failed. The owner may, as in the case of the Controller,
  67. // remove the tunnel from its list of active tunnels.
  68. type TunnelOwner interface {
  69. SignalSeededNewSLOK()
  70. SignalTunnelFailure(tunnel *Tunnel)
  71. }
  72. // Tunnel is a connection to a Psiphon server. An established
  73. // tunnel includes a network connection to the specified server
  74. // and an SSH session built on top of that transport.
  75. type Tunnel struct {
  76. mutex *sync.Mutex
  77. config *Config
  78. isActivated bool
  79. isDiscarded bool
  80. isClosed bool
  81. dialParams *DialParameters
  82. serverContext *ServerContext
  83. conn *common.ActivityMonitoredConn
  84. sshClient *ssh.Client
  85. sshServerRequests <-chan *ssh.Request
  86. operateWaitGroup *sync.WaitGroup
  87. operateCtx context.Context
  88. stopOperate context.CancelFunc
  89. signalPortForwardFailure chan struct{}
  90. totalPortForwardFailures int
  91. adjustedEstablishStartTime monotime.Time
  92. establishDuration time.Duration
  93. establishedTime monotime.Time
  94. }
  95. // getCustomClientParameters helpers wrap the verbose function call chain
  96. // required to get a current snapshot of the ClientParameters customized with
  97. // the dial parameters associated with a tunnel.
  98. func (tunnel *Tunnel) getCustomClientParameters() parameters.ClientParametersAccessor {
  99. return getCustomClientParameters(tunnel.config, tunnel.dialParams)
  100. }
  101. func getCustomClientParameters(
  102. config *Config, dialParams *DialParameters) parameters.ClientParametersAccessor {
  103. return config.GetClientParameters().GetCustom(dialParams.NetworkLatencyMultiplier)
  104. }
  105. // ConnectTunnel first makes a network transport connection to the
  106. // Psiphon server and then establishes an SSH client session on top of
  107. // that transport. The SSH server is authenticated using the public
  108. // key in the server entry.
  109. // Depending on the server's capabilities, the connection may use
  110. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  111. // HTTP (meek protocol).
  112. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  113. // the a random supported protocol is used.
  114. //
  115. // Call Activate on a connected tunnel to complete its establishment
  116. // before using.
  117. //
  118. // Tunnel establishment is split into two phases: connection, and
  119. // activation. The Controller will run many ConnectTunnel calls
  120. // concurrently and then, to avoid unnecessary overhead from making
  121. // handshake requests and starting operateTunnel from tunnels which
  122. // may be discarded, call Activate on connected tunnels sequentially
  123. // as necessary.
  124. //
  125. func ConnectTunnel(
  126. ctx context.Context,
  127. config *Config,
  128. adjustedEstablishStartTime monotime.Time,
  129. dialParams *DialParameters) (*Tunnel, error) {
  130. // Build transport layers and establish SSH connection. Note that
  131. // dialConn and monitoredConn are the same network connection.
  132. dialResult, err := dialTunnel(ctx, config, dialParams)
  133. if err != nil {
  134. return nil, common.ContextError(err)
  135. }
  136. // The tunnel is now connected
  137. return &Tunnel{
  138. mutex: new(sync.Mutex),
  139. config: config,
  140. dialParams: dialParams,
  141. conn: dialResult.monitoredConn,
  142. sshClient: dialResult.sshClient,
  143. sshServerRequests: dialResult.sshRequests,
  144. // A buffer allows at least one signal to be sent even when the receiver is
  145. // not listening. Senders should not block.
  146. signalPortForwardFailure: make(chan struct{}, 1),
  147. adjustedEstablishStartTime: adjustedEstablishStartTime,
  148. }, nil
  149. }
  150. // Activate completes the tunnel establishment, performing the handshake
  151. // request and starting operateTunnel, the worker that monitors the tunnel
  152. // and handles periodic management.
  153. func (tunnel *Tunnel) Activate(
  154. ctx context.Context, tunnelOwner TunnelOwner) (retErr error) {
  155. // Ensure that, unless the context is cancelled, any replayed dial
  156. // parameters are cleared, no longer to be retried, if the tunnel fails to
  157. // activate.
  158. activationSucceeded := false
  159. defer func() {
  160. if !activationSucceeded && ctx.Err() == nil {
  161. tunnel.dialParams.Failed(tunnel.config)
  162. _ = RecordFailedTunnelStat(tunnel.config, tunnel.dialParams, retErr)
  163. }
  164. }()
  165. // Create a new Psiphon API server context for this tunnel. This includes
  166. // performing a handshake request. If the handshake fails, this activation
  167. // fails.
  168. var serverContext *ServerContext
  169. if !tunnel.config.DisableApi {
  170. NoticeInfo(
  171. "starting server context for %s",
  172. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  173. // Call NewServerContext in a goroutine, as it blocks on a network operation,
  174. // the handshake request, and would block shutdown. If the shutdown signal is
  175. // received, close the tunnel, which will interrupt the handshake request
  176. // that may be blocking NewServerContext.
  177. //
  178. // Timeout after PsiphonApiServerTimeoutSeconds. NewServerContext may not
  179. // return if the tunnel network connection is unstable during the handshake
  180. // request. At this point, there is no operateTunnel monitor that will detect
  181. // this condition with SSH keep alives.
  182. timeout := tunnel.getCustomClientParameters().Duration(
  183. parameters.PsiphonAPIRequestTimeout)
  184. if timeout > 0 {
  185. var cancelFunc context.CancelFunc
  186. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  187. defer cancelFunc()
  188. }
  189. type newServerContextResult struct {
  190. serverContext *ServerContext
  191. err error
  192. }
  193. resultChannel := make(chan newServerContextResult)
  194. go func() {
  195. serverContext, err := NewServerContext(tunnel)
  196. resultChannel <- newServerContextResult{
  197. serverContext: serverContext,
  198. err: err,
  199. }
  200. }()
  201. var result newServerContextResult
  202. select {
  203. case result = <-resultChannel:
  204. case <-ctx.Done():
  205. result.err = ctx.Err()
  206. // Interrupt the goroutine
  207. tunnel.Close(true)
  208. <-resultChannel
  209. }
  210. if result.err != nil {
  211. return common.ContextError(
  212. fmt.Errorf("error starting server context for %s: %s",
  213. tunnel.dialParams.ServerEntry.GetDiagnosticID(), result.err))
  214. }
  215. serverContext = result.serverContext
  216. }
  217. // The activation succeeded.
  218. activationSucceeded = true
  219. tunnel.mutex.Lock()
  220. // It may happen that the tunnel gets closed while Activate is running.
  221. // In this case, abort here, to ensure that the operateTunnel goroutine
  222. // will not be launched after Close is called.
  223. if tunnel.isClosed {
  224. return common.ContextError(errors.New("tunnel is closed"))
  225. }
  226. tunnel.isActivated = true
  227. tunnel.serverContext = serverContext
  228. // establishDuration is the elapsed time between the controller starting tunnel
  229. // establishment and this tunnel being established. The reported value represents
  230. // how long the user waited between starting the client and having a usable tunnel;
  231. // or how long between the client detecting an unexpected tunnel disconnect and
  232. // completing automatic reestablishment.
  233. //
  234. // This time period may include time spent unsuccessfully connecting to other
  235. // servers. Time spent waiting for network connectivity is excluded.
  236. tunnel.establishDuration = monotime.Since(tunnel.adjustedEstablishStartTime)
  237. tunnel.establishedTime = monotime.Now()
  238. // Use the Background context instead of the controller run context, as tunnels
  239. // are terminated when the controller calls tunnel.Close.
  240. tunnel.operateCtx, tunnel.stopOperate = context.WithCancel(context.Background())
  241. tunnel.operateWaitGroup = new(sync.WaitGroup)
  242. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
  243. // stats updates.
  244. tunnel.operateWaitGroup.Add(1)
  245. go tunnel.operateTunnel(tunnelOwner)
  246. tunnel.mutex.Unlock()
  247. return nil
  248. }
  249. // Close stops operating the tunnel and closes the underlying connection.
  250. // Supports multiple and/or concurrent calls to Close().
  251. // When isDiscarded is set, operateTunnel will not attempt to send final
  252. // status requests.
  253. func (tunnel *Tunnel) Close(isDiscarded bool) {
  254. tunnel.mutex.Lock()
  255. tunnel.isDiscarded = isDiscarded
  256. isActivated := tunnel.isActivated
  257. isClosed := tunnel.isClosed
  258. tunnel.isClosed = true
  259. tunnel.mutex.Unlock()
  260. if !isClosed {
  261. // Signal operateTunnel to stop before closing the tunnel -- this
  262. // allows a final status request to be made in the case of an orderly
  263. // shutdown.
  264. // A timer is set, so if operateTunnel takes too long to stop, the
  265. // tunnel is closed, which will interrupt any slow final status request.
  266. if isActivated {
  267. timeout := tunnel.getCustomClientParameters().Duration(
  268. parameters.TunnelOperateShutdownTimeout)
  269. afterFunc := time.AfterFunc(
  270. timeout,
  271. func() { tunnel.conn.Close() })
  272. tunnel.stopOperate()
  273. tunnel.operateWaitGroup.Wait()
  274. afterFunc.Stop()
  275. }
  276. tunnel.sshClient.Close()
  277. // tunnel.conn.Close() may get called multiple times, which is allowed.
  278. tunnel.conn.Close()
  279. err := tunnel.sshClient.Wait()
  280. if err != nil {
  281. NoticeAlert("close tunnel ssh error: %s", err)
  282. }
  283. }
  284. }
  285. // IsActivated returns the tunnel's activated flag.
  286. func (tunnel *Tunnel) IsActivated() bool {
  287. tunnel.mutex.Lock()
  288. defer tunnel.mutex.Unlock()
  289. return tunnel.isActivated
  290. }
  291. // IsDiscarded returns the tunnel's discarded flag.
  292. func (tunnel *Tunnel) IsDiscarded() bool {
  293. tunnel.mutex.Lock()
  294. defer tunnel.mutex.Unlock()
  295. return tunnel.isDiscarded
  296. }
  297. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  298. // This function blocks awaiting a response. Only one request may be in-flight
  299. // at once; a concurrent SendAPIRequest will block until an active request
  300. // receives its response (or the SSH connection is terminated).
  301. func (tunnel *Tunnel) SendAPIRequest(
  302. name string, requestPayload []byte) ([]byte, error) {
  303. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  304. name, true, requestPayload)
  305. if err != nil {
  306. return nil, common.ContextError(err)
  307. }
  308. if !ok {
  309. return nil, common.ContextError(errors.New("API request rejected"))
  310. }
  311. return responsePayload, nil
  312. }
  313. // Dial establishes a port forward connection through the tunnel
  314. // This Dial doesn't support split tunnel, so alwaysTunnel is not referenced
  315. func (tunnel *Tunnel) Dial(
  316. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
  317. if !tunnel.IsActivated() {
  318. return nil, common.ContextError(errors.New("tunnel is not activated"))
  319. }
  320. type tunnelDialResult struct {
  321. sshPortForwardConn net.Conn
  322. err error
  323. }
  324. // Note: there is no dial context since SSH port forward dials cannot
  325. // be interrupted directly. Closing the tunnel will interrupt the dials.
  326. // A timeout is set to unblock this function, but the goroutine may
  327. // not exit until the tunnel is closed.
  328. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  329. resultChannel := make(chan *tunnelDialResult, 1)
  330. timeout := tunnel.getCustomClientParameters().Duration(
  331. parameters.TunnelPortForwardDialTimeout)
  332. afterFunc := time.AfterFunc(
  333. timeout,
  334. func() {
  335. resultChannel <- &tunnelDialResult{nil, errors.New("tunnel dial timeout")}
  336. })
  337. defer afterFunc.Stop()
  338. go func() {
  339. sshPortForwardConn, err := tunnel.sshClient.Dial("tcp", remoteAddr)
  340. resultChannel <- &tunnelDialResult{sshPortForwardConn, err}
  341. }()
  342. result := <-resultChannel
  343. if result.err != nil {
  344. // TODO: conditional on type of error or error message?
  345. select {
  346. case tunnel.signalPortForwardFailure <- *new(struct{}):
  347. default:
  348. }
  349. return nil, common.ContextError(result.err)
  350. }
  351. conn = &TunneledConn{
  352. Conn: result.sshPortForwardConn,
  353. tunnel: tunnel,
  354. downstreamConn: downstreamConn}
  355. return tunnel.wrapWithTransferStats(conn), nil
  356. }
  357. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  358. if !tunnel.IsActivated() {
  359. return nil, common.ContextError(errors.New("tunnel is not activated"))
  360. }
  361. channel, requests, err := tunnel.sshClient.OpenChannel(
  362. protocol.PACKET_TUNNEL_CHANNEL_TYPE, nil)
  363. if err != nil {
  364. // TODO: conditional on type of error or error message?
  365. select {
  366. case tunnel.signalPortForwardFailure <- *new(struct{}):
  367. default:
  368. }
  369. return nil, common.ContextError(err)
  370. }
  371. go ssh.DiscardRequests(requests)
  372. conn := newChannelConn(channel)
  373. // wrapWithTransferStats will track bytes transferred for the
  374. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  375. //
  376. // Since the data in the channel is not HTTP or TLS, no domain bytes
  377. // counting is expected.
  378. //
  379. // transferstats are also used to determine that there's been recent
  380. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  381. return tunnel.wrapWithTransferStats(conn), nil
  382. }
  383. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  384. // Tunnel does not have a serverContext when DisableApi is set. We still use
  385. // transferstats.Conn to count bytes transferred for monitoring tunnel
  386. // quality.
  387. var regexps *transferstats.Regexps
  388. if tunnel.serverContext != nil {
  389. regexps = tunnel.serverContext.StatsRegexps()
  390. }
  391. return transferstats.NewConn(
  392. conn, tunnel.dialParams.ServerEntry.IpAddress, regexps)
  393. }
  394. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  395. // This will terminate the tunnel.
  396. func (tunnel *Tunnel) SignalComponentFailure() {
  397. NoticeAlert("tunnel received component failure signal")
  398. tunnel.Close(false)
  399. }
  400. // TunneledConn implements net.Conn and wraps a port forward connection.
  401. // It is used to hook into Read and Write to observe I/O errors and
  402. // report these errors back to the tunnel monitor as port forward failures.
  403. // TunneledConn optionally tracks a peer connection to be explicitly closed
  404. // when the TunneledConn is closed.
  405. type TunneledConn struct {
  406. net.Conn
  407. tunnel *Tunnel
  408. downstreamConn net.Conn
  409. }
  410. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  411. n, err = conn.Conn.Read(buffer)
  412. if err != nil && err != io.EOF {
  413. // Report new failure. Won't block; assumes the receiver
  414. // has a sufficient buffer for the threshold number of reports.
  415. // TODO: conditional on type of error or error message?
  416. select {
  417. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  418. default:
  419. }
  420. }
  421. return
  422. }
  423. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  424. n, err = conn.Conn.Write(buffer)
  425. if err != nil && err != io.EOF {
  426. // Same as TunneledConn.Read()
  427. select {
  428. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  429. default:
  430. }
  431. }
  432. return
  433. }
  434. func (conn *TunneledConn) Close() error {
  435. if conn.downstreamConn != nil {
  436. conn.downstreamConn.Close()
  437. }
  438. return conn.Conn.Close()
  439. }
  440. type dialResult struct {
  441. dialConn net.Conn
  442. monitoredConn *common.ActivityMonitoredConn
  443. sshClient *ssh.Client
  444. sshRequests <-chan *ssh.Request
  445. }
  446. // dialTunnel is a helper that builds the transport layers and establishes the
  447. // SSH connection. When additional dial configuration is used, dial metrics
  448. // are recorded and returned.
  449. //
  450. // The net.Conn return value is the value to be removed from pendingConns;
  451. // additional layering (ThrottledConn, ActivityMonitoredConn) is applied, but
  452. // this return value is the base dial conn. The *ActivityMonitoredConn return
  453. // value is the layered conn passed into the ssh.Client.
  454. func dialTunnel(
  455. ctx context.Context,
  456. config *Config,
  457. dialParams *DialParameters) (_ *dialResult, retErr error) {
  458. // Return immediately when overall context is canceled or timed-out. This
  459. // avoids notice noise.
  460. err := ctx.Err()
  461. if err != nil {
  462. return nil, common.ContextError(err)
  463. }
  464. p := getCustomClientParameters(config, dialParams)
  465. timeout := p.Duration(parameters.TunnelConnectTimeout)
  466. rateLimits := p.RateLimits(parameters.TunnelRateLimits)
  467. obfuscatedSSHMinPadding := p.Int(parameters.ObfuscatedSSHMinPadding)
  468. obfuscatedSSHMaxPadding := p.Int(parameters.ObfuscatedSSHMaxPadding)
  469. livenessTestMinUpstreamBytes := p.Int(parameters.LivenessTestMinUpstreamBytes)
  470. livenessTestMaxUpstreamBytes := p.Int(parameters.LivenessTestMaxUpstreamBytes)
  471. livenessTestMinDownstreamBytes := p.Int(parameters.LivenessTestMinDownstreamBytes)
  472. livenessTestMaxDownstreamBytes := p.Int(parameters.LivenessTestMaxDownstreamBytes)
  473. p.Close()
  474. // Ensure that, unless the base context is cancelled, any replayed dial
  475. // parameters are cleared, no longer to be retried, if the tunnel fails to
  476. // connect.
  477. //
  478. // Limitation: dials that fail to connect due to the server being in a
  479. // load-limiting state are not distinguished and excepted from this
  480. // logic.
  481. dialSucceeded := false
  482. baseCtx := ctx
  483. defer func() {
  484. if !dialSucceeded && baseCtx.Err() == nil {
  485. dialParams.Failed(config)
  486. _ = RecordFailedTunnelStat(config, dialParams, retErr)
  487. }
  488. }()
  489. var cancelFunc context.CancelFunc
  490. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  491. defer cancelFunc()
  492. // DialDuration is the elapsed time for both successful and failed tunnel
  493. // dials. For successful tunnels, it includes any network protocol
  494. // handshake(s), obfuscation protocol handshake(s), SSH handshake, and
  495. // liveness test, when performed.
  496. //
  497. // Note: ensure DialDuration is set before calling any function which logs
  498. // dial_duration.
  499. startDialTime := monotime.Now()
  500. defer func() {
  501. dialParams.DialDuration = monotime.Since(startDialTime)
  502. }()
  503. // Note: dialParams.MeekResolvedIPAddress isn't set until the dial begins,
  504. // so it will always be blank in NoticeConnectingServer.
  505. NoticeConnectingServer(dialParams)
  506. // Create the base transport: meek or direct connection
  507. var dialConn net.Conn
  508. if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  509. dialConn, err = DialMeek(
  510. ctx,
  511. dialParams.GetMeekConfig(),
  512. dialParams.GetDialConfig())
  513. if err != nil {
  514. return nil, common.ContextError(err)
  515. }
  516. } else if protocol.TunnelProtocolUsesQUIC(dialParams.TunnelProtocol) {
  517. packetConn, remoteAddr, err := NewUDPConn(
  518. ctx,
  519. dialParams.DirectDialAddress,
  520. dialParams.GetDialConfig())
  521. if err != nil {
  522. return nil, common.ContextError(err)
  523. }
  524. dialConn, err = quic.Dial(
  525. ctx,
  526. packetConn,
  527. remoteAddr,
  528. dialParams.QUICDialSNIAddress,
  529. dialParams.QUICVersion,
  530. dialParams.ServerEntry.SshObfuscatedKey,
  531. dialParams.ObfuscatedQUICPaddingSeed)
  532. if err != nil {
  533. return nil, common.ContextError(err)
  534. }
  535. } else if protocol.TunnelProtocolUsesMarionette(dialParams.TunnelProtocol) {
  536. dialConn, err = marionette.Dial(
  537. ctx,
  538. NewNetDialer(dialParams.GetDialConfig()),
  539. dialParams.ServerEntry.MarionetteFormat,
  540. dialParams.DirectDialAddress)
  541. if err != nil {
  542. return nil, common.ContextError(err)
  543. }
  544. } else if protocol.TunnelProtocolUsesTapdance(dialParams.TunnelProtocol) {
  545. dialConn, err = tapdance.Dial(
  546. ctx,
  547. config.EmitTapdanceLogs,
  548. config.DataStoreDirectory,
  549. NewNetDialer(dialParams.GetDialConfig()),
  550. dialParams.DirectDialAddress)
  551. if err != nil {
  552. return nil, common.ContextError(err)
  553. }
  554. } else {
  555. dialConn, err = DialTCP(
  556. ctx,
  557. dialParams.DirectDialAddress,
  558. dialParams.GetDialConfig())
  559. if err != nil {
  560. return nil, common.ContextError(err)
  561. }
  562. }
  563. // Some conns report additional metrics. fragmentor.Conns report
  564. // fragmentor configs.
  565. //
  566. // Limitation: for meek, GetMetrics from underlying fragmentor.Conn(s)
  567. // should be called in order to log fragmentor metrics for meek sessions.
  568. if metricsSource, ok := dialConn.(common.MetricsSource); ok {
  569. dialParams.DialConnMetrics = metricsSource
  570. }
  571. // If dialConn is not a Closer, tunnel failure detection may be slower
  572. if _, ok := dialConn.(common.Closer); !ok {
  573. NoticeAlert("tunnel.dialTunnel: dialConn is not a Closer")
  574. }
  575. cleanupConn := dialConn
  576. defer func() {
  577. // Cleanup on error
  578. if cleanupConn != nil {
  579. cleanupConn.Close()
  580. }
  581. }()
  582. // Activity monitoring is used to measure tunnel duration
  583. monitoredConn, err := common.NewActivityMonitoredConn(dialConn, 0, false, nil, nil)
  584. if err != nil {
  585. return nil, common.ContextError(err)
  586. }
  587. // Apply throttling (if configured)
  588. throttledConn := common.NewThrottledConn(
  589. monitoredConn,
  590. rateLimits)
  591. // Add obfuscated SSH layer
  592. var sshConn net.Conn = throttledConn
  593. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  594. obfuscatedSSHConn, err := obfuscator.NewObfuscatedSSHConn(
  595. obfuscator.OBFUSCATION_CONN_MODE_CLIENT,
  596. throttledConn,
  597. dialParams.ServerEntry.SshObfuscatedKey,
  598. dialParams.ObfuscatorPaddingSeed,
  599. &obfuscatedSSHMinPadding,
  600. &obfuscatedSSHMaxPadding)
  601. if err != nil {
  602. return nil, common.ContextError(err)
  603. }
  604. sshConn = obfuscatedSSHConn
  605. dialParams.ObfuscatedSSHConnMetrics = obfuscatedSSHConn
  606. }
  607. // Now establish the SSH session over the conn transport
  608. expectedPublicKey, err := base64.StdEncoding.DecodeString(
  609. dialParams.ServerEntry.SshHostKey)
  610. if err != nil {
  611. return nil, common.ContextError(err)
  612. }
  613. sshCertChecker := &ssh.CertChecker{
  614. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  615. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  616. return common.ContextError(errors.New("unexpected host public key"))
  617. }
  618. return nil
  619. },
  620. }
  621. sshPasswordPayload := &protocol.SSHPasswordPayload{
  622. SessionId: config.SessionID,
  623. SshPassword: dialParams.ServerEntry.SshPassword,
  624. ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
  625. }
  626. payload, err := json.Marshal(sshPasswordPayload)
  627. if err != nil {
  628. return nil, common.ContextError(err)
  629. }
  630. sshClientConfig := &ssh.ClientConfig{
  631. User: dialParams.ServerEntry.SshUsername,
  632. Auth: []ssh.AuthMethod{
  633. ssh.Password(string(payload)),
  634. },
  635. HostKeyCallback: sshCertChecker.CheckHostKey,
  636. ClientVersion: dialParams.SSHClientVersion,
  637. }
  638. sshClientConfig.KEXPRNGSeed = dialParams.SSHKEXSeed
  639. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  640. if config.ObfuscatedSSHAlgorithms != nil {
  641. sshClientConfig.KeyExchanges = []string{config.ObfuscatedSSHAlgorithms[0]}
  642. sshClientConfig.Ciphers = []string{config.ObfuscatedSSHAlgorithms[1]}
  643. sshClientConfig.MACs = []string{config.ObfuscatedSSHAlgorithms[2]}
  644. sshClientConfig.HostKeyAlgorithms = []string{config.ObfuscatedSSHAlgorithms[3]}
  645. } else {
  646. // With Encrypt-then-MAC hash algorithms, packet length is
  647. // transmitted in plaintext, which aids in traffic analysis.
  648. //
  649. // TUNNEL_PROTOCOL_SSH is excepted since its KEX appears in plaintext,
  650. // and the protocol is intended to look like SSH on the wire.
  651. sshClientConfig.NoEncryptThenMACHash = true
  652. }
  653. } else {
  654. // For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize
  655. // its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation
  656. // between two randomized KEXes.
  657. if dialParams.ServerEntry.SshObfuscatedKey != "" {
  658. sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  659. dialParams.ServerEntry.SshObfuscatedKey)
  660. if err != nil {
  661. return nil, common.ContextError(err)
  662. }
  663. }
  664. }
  665. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  666. // in a timeout to ensure it won't hang. We've encountered firewalls
  667. // that allow the TCP handshake to complete but then send a RST to the
  668. // server-side and nothing to the client-side, and if that happens
  669. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  670. // closes the conn, which interrupts it.
  671. // Note: TCP handshake timeouts are provided by TCPConn, and session
  672. // timeouts *after* ssh establishment are provided by the ssh keep alive
  673. // in operate tunnel.
  674. type sshNewClientResult struct {
  675. sshClient *ssh.Client
  676. sshRequests <-chan *ssh.Request
  677. err error
  678. }
  679. resultChannel := make(chan sshNewClientResult)
  680. // Call NewClientConn in a goroutine, as it blocks on SSH handshake network
  681. // operations, and would block canceling or shutdown. If the parent context
  682. // is canceled, close the net.Conn underlying SSH, which will interrupt the
  683. // SSH handshake that may be blocking NewClientConn.
  684. go func() {
  685. // The following is adapted from ssh.Dial(), here using a custom conn
  686. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  687. sshAddress := ""
  688. sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
  689. sshConn, sshAddress, sshClientConfig)
  690. var sshClient *ssh.Client
  691. if err == nil {
  692. // sshRequests is handled by operateTunnel.
  693. // ssh.NewClient also expects to handle the sshRequests
  694. // value from ssh.NewClientConn and will spawn a goroutine
  695. // to handle the <-chan *ssh.Request, so we must provide
  696. // a closed channel to ensure that goroutine halts instead
  697. // of hanging on a nil channel.
  698. noRequests := make(chan *ssh.Request)
  699. close(noRequests)
  700. sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)
  701. if livenessTestMaxUpstreamBytes > 0 || livenessTestMaxDownstreamBytes > 0 {
  702. // When configured, perform a liveness test which sends and
  703. // receives bytes through the tunnel to ensure the tunnel had
  704. // not been blocked upon or shortly after connecting. This
  705. // test is performed concurrently for each establishment
  706. // candidate before selecting a successful tunnel.
  707. //
  708. // Note that the liveness test is subject to the
  709. // TunnelConnectTimeout, which should be adjusted
  710. // accordingly.
  711. var metrics *livenessTestMetrics
  712. metrics, err = performLivenessTest(
  713. sshClient,
  714. livenessTestMinUpstreamBytes, livenessTestMaxUpstreamBytes,
  715. livenessTestMinDownstreamBytes, livenessTestMaxDownstreamBytes,
  716. dialParams.LivenessTestSeed)
  717. // Skip notice when cancelling.
  718. if baseCtx.Err() == nil {
  719. NoticeLivenessTest(
  720. dialParams.ServerEntry.GetDiagnosticID(), metrics, err == nil)
  721. }
  722. }
  723. }
  724. resultChannel <- sshNewClientResult{sshClient, sshRequests, err}
  725. }()
  726. var result sshNewClientResult
  727. select {
  728. case result = <-resultChannel:
  729. case <-ctx.Done():
  730. // Interrupt the goroutine and capture its error context to
  731. // distinguish point of failure.
  732. err := ctx.Err()
  733. sshConn.Close()
  734. result = <-resultChannel
  735. if result.err != nil {
  736. result.err = fmt.Errorf("%s: %s", err, result.err)
  737. } else {
  738. result.err = err
  739. }
  740. }
  741. if result.err != nil {
  742. return nil, common.ContextError(result.err)
  743. }
  744. dialSucceeded = true
  745. NoticeConnectedServer(dialParams)
  746. cleanupConn = nil
  747. // Note: dialConn may be used to close the underlying network connection
  748. // but should not be used to perform I/O as that would interfere with SSH
  749. // (and also bypasses throttling).
  750. return &dialResult{
  751. dialConn: dialConn,
  752. monitoredConn: monitoredConn,
  753. sshClient: result.sshClient,
  754. sshRequests: result.sshRequests},
  755. nil
  756. }
  757. // Fields are exported for JSON encoding in NoticeLivenessTest.
  758. type livenessTestMetrics struct {
  759. Duration string
  760. UpstreamBytes int
  761. SentUpstreamBytes int
  762. DownstreamBytes int
  763. ReceivedDownstreamBytes int
  764. }
  765. func performLivenessTest(
  766. sshClient *ssh.Client,
  767. minUpstreamBytes, maxUpstreamBytes int,
  768. minDownstreamBytes, maxDownstreamBytes int,
  769. livenessTestPRNGSeed *prng.Seed) (*livenessTestMetrics, error) {
  770. metrics := new(livenessTestMetrics)
  771. defer func(startTime monotime.Time) {
  772. metrics.Duration = fmt.Sprintf("%s", monotime.Since(startTime))
  773. }(monotime.Now())
  774. PRNG := prng.NewPRNGWithSeed(livenessTestPRNGSeed)
  775. metrics.UpstreamBytes = PRNG.Range(minUpstreamBytes, maxUpstreamBytes)
  776. metrics.DownstreamBytes = PRNG.Range(minDownstreamBytes, maxDownstreamBytes)
  777. request := &protocol.RandomStreamRequest{
  778. UpstreamBytes: metrics.UpstreamBytes,
  779. DownstreamBytes: metrics.DownstreamBytes,
  780. }
  781. extraData, err := json.Marshal(request)
  782. if err != nil {
  783. return metrics, common.ContextError(err)
  784. }
  785. channel, requests, err := sshClient.OpenChannel(
  786. protocol.RANDOM_STREAM_CHANNEL_TYPE, extraData)
  787. if err != nil {
  788. return metrics, common.ContextError(err)
  789. }
  790. defer channel.Close()
  791. go ssh.DiscardRequests(requests)
  792. // In consideration of memory-constrained environments, use a modest-sized
  793. // copy buffer since many tunnel establishment workers may run the
  794. // liveness test concurrently.
  795. var buffer [8192]byte
  796. if metrics.UpstreamBytes > 0 {
  797. n, err := common.CopyNBuffer(channel, rand.Reader, int64(metrics.UpstreamBytes), buffer[:])
  798. metrics.SentUpstreamBytes = int(n)
  799. if err != nil {
  800. return metrics, common.ContextError(err)
  801. }
  802. }
  803. if metrics.DownstreamBytes > 0 {
  804. n, err := common.CopyNBuffer(ioutil.Discard, channel, int64(metrics.DownstreamBytes), buffer[:])
  805. metrics.ReceivedDownstreamBytes = int(n)
  806. if err != nil {
  807. return metrics, common.ContextError(err)
  808. }
  809. }
  810. return metrics, nil
  811. }
  812. // operateTunnel monitors the health of the tunnel and performs
  813. // periodic work.
  814. //
  815. // BytesTransferred and TotalBytesTransferred notices are emitted
  816. // for live reporting and diagnostics reporting, respectively.
  817. //
  818. // Status requests are sent to the Psiphon API to report bytes
  819. // transferred.
  820. //
  821. // Periodic SSH keep alive packets are sent to ensure the underlying
  822. // TCP connection isn't terminated by NAT, or other network
  823. // interference -- or test if it has been terminated while the device
  824. // has been asleep. When a keep alive times out, the tunnel is
  825. // considered failed.
  826. //
  827. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  828. // server responsiveness when a port forward failure is detected: a
  829. // failed dial or failed read/write. This keep alive has a shorter
  830. // timeout.
  831. //
  832. // Note that port forward failures may be due to non-failure conditions.
  833. // For example, when the user inputs an invalid domain name and
  834. // resolution is done by the ssh server; or trying to connect to a
  835. // non-white-listed port; and the error message in these cases is not
  836. // distinguishable from a a true server error (a common error message,
  837. // "ssh: rejected: administratively prohibited (open failed)", may be
  838. // returned for these cases but also if the server has run out of
  839. // ephemeral ports, for example).
  840. //
  841. // SSH keep alives are not sent when the tunnel has been recently
  842. // active (not only does tunnel activity obviate the necessity of a keep
  843. // alive, testing has shown that keep alives may time out for "busy"
  844. // tunnels, especially over meek protocol and other high latency
  845. // conditions).
  846. //
  847. // "Recently active" is defined has having received payload bytes. Sent
  848. // bytes are not considered as testing has shown bytes may appear to
  849. // send when certain NAT devices have interfered with the tunnel, while
  850. // no bytes are received. In a pathological case, with DNS implemented
  851. // as tunneled UDP, a browser may wait excessively for a domain name to
  852. // resolve, while no new port forward is attempted which would otherwise
  853. // result in a tunnel failure detection.
  854. //
  855. // TODO: change "recently active" to include having received any
  856. // SSH protocol messages from the server, not just user payload?
  857. //
  858. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  859. defer tunnel.operateWaitGroup.Done()
  860. now := monotime.Now()
  861. lastBytesReceivedTime := now
  862. lastTotalBytesTransferedTime := now
  863. totalSent := int64(0)
  864. totalReceived := int64(0)
  865. setDialParamsSucceeded := false
  866. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  867. defer noticeBytesTransferredTicker.Stop()
  868. // The next status request and ssh keep alive times are picked at random,
  869. // from a range, to make the resulting traffic less fingerprintable,
  870. // Note: not using Tickers since these are not fixed time periods.
  871. nextStatusRequestPeriod := func() time.Duration {
  872. p := tunnel.getCustomClientParameters()
  873. return prng.Period(
  874. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMin),
  875. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMax))
  876. }
  877. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  878. defer statsTimer.Stop()
  879. // Schedule an almost-immediate status request to deliver any unreported
  880. // persistent stats.
  881. unreported := CountUnreportedPersistentStats()
  882. if unreported > 0 {
  883. NoticeInfo("Unreported persistent stats: %d", unreported)
  884. p := tunnel.getCustomClientParameters()
  885. statsTimer.Reset(
  886. prng.Period(
  887. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMin),
  888. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMax)))
  889. }
  890. nextSshKeepAlivePeriod := func() time.Duration {
  891. p := tunnel.getCustomClientParameters()
  892. return prng.Period(
  893. p.Duration(parameters.SSHKeepAlivePeriodMin),
  894. p.Duration(parameters.SSHKeepAlivePeriodMax))
  895. }
  896. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  897. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  898. if tunnel.config.DisablePeriodicSshKeepAlive {
  899. sshKeepAliveTimer.Stop()
  900. } else {
  901. defer sshKeepAliveTimer.Stop()
  902. }
  903. // Perform network requests in separate goroutines so as not to block
  904. // other operations.
  905. requestsWaitGroup := new(sync.WaitGroup)
  906. requestsWaitGroup.Add(1)
  907. signalStatusRequest := make(chan struct{})
  908. go func() {
  909. defer requestsWaitGroup.Done()
  910. for range signalStatusRequest {
  911. sendStats(tunnel)
  912. }
  913. }()
  914. requestsWaitGroup.Add(1)
  915. signalSshKeepAlive := make(chan time.Duration)
  916. sshKeepAliveError := make(chan error, 1)
  917. go func() {
  918. defer requestsWaitGroup.Done()
  919. isFirstKeepAlive := true
  920. for timeout := range signalSshKeepAlive {
  921. err := tunnel.sendSshKeepAlive(isFirstKeepAlive, timeout)
  922. if err != nil {
  923. select {
  924. case sshKeepAliveError <- err:
  925. default:
  926. }
  927. }
  928. isFirstKeepAlive = false
  929. }
  930. }()
  931. shutdown := false
  932. var err error
  933. for !shutdown && err == nil {
  934. select {
  935. case <-noticeBytesTransferredTicker.C:
  936. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  937. tunnel.dialParams.ServerEntry.IpAddress)
  938. if received > 0 {
  939. lastBytesReceivedTime = monotime.Now()
  940. }
  941. totalSent += sent
  942. totalReceived += received
  943. p := tunnel.getCustomClientParameters()
  944. noticePeriod := p.Duration(parameters.TotalBytesTransferredNoticePeriod)
  945. replayTargetUpstreamBytes := p.Int(parameters.ReplayTargetUpstreamBytes)
  946. replayTargetDownstreamBytes := p.Int(parameters.ReplayTargetDownstreamBytes)
  947. replayTargetTunnelDuration := p.Duration(parameters.ReplayTargetTunnelDuration)
  948. if lastTotalBytesTransferedTime.Add(noticePeriod).Before(monotime.Now()) {
  949. NoticeTotalBytesTransferred(
  950. tunnel.dialParams.ServerEntry.GetDiagnosticID(), totalSent, totalReceived)
  951. lastTotalBytesTransferedTime = monotime.Now()
  952. }
  953. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  954. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  955. NoticeBytesTransferred(
  956. tunnel.dialParams.ServerEntry.GetDiagnosticID(), sent, received)
  957. }
  958. // Once the tunnel has connected, activated, successfully transmitted the
  959. // targeted number of bytes, and been up for the targeted duration
  960. // (measured from the end of establishment), store its dial parameters for
  961. // subsequent replay.
  962. //
  963. // Even when target bytes and duration are all 0, the tunnel must remain up
  964. // for at least 1 second due to use of noticeBytesTransferredTicker; for
  965. // the same reason the granularity of ReplayTargetTunnelDuration is
  966. // seconds.
  967. if !setDialParamsSucceeded &&
  968. totalSent >= int64(replayTargetUpstreamBytes) &&
  969. totalReceived >= int64(replayTargetDownstreamBytes) &&
  970. monotime.Since(tunnel.establishedTime) >= replayTargetTunnelDuration {
  971. tunnel.dialParams.Succeeded()
  972. setDialParamsSucceeded = true
  973. }
  974. case <-statsTimer.C:
  975. select {
  976. case signalStatusRequest <- *new(struct{}):
  977. default:
  978. }
  979. statsTimer.Reset(nextStatusRequestPeriod())
  980. case <-sshKeepAliveTimer.C:
  981. p := tunnel.getCustomClientParameters()
  982. inactivePeriod := p.Duration(parameters.SSHKeepAlivePeriodicInactivePeriod)
  983. if lastBytesReceivedTime.Add(inactivePeriod).Before(monotime.Now()) {
  984. timeout := p.Duration(parameters.SSHKeepAlivePeriodicTimeout)
  985. select {
  986. case signalSshKeepAlive <- timeout:
  987. default:
  988. }
  989. }
  990. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  991. case <-tunnel.signalPortForwardFailure:
  992. // Note: no mutex on portForwardFailureTotal; only referenced here
  993. tunnel.totalPortForwardFailures++
  994. NoticeInfo("port forward failures for %s: %d",
  995. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  996. tunnel.totalPortForwardFailures)
  997. // If the underlying Conn has closed (meek and other plugin protocols may close
  998. // themselves in certain error conditions), the tunnel has certainly failed.
  999. // Otherwise, probe with an SSH keep alive.
  1000. if tunnel.conn.IsClosed() {
  1001. err = errors.New("underlying conn is closed")
  1002. } else {
  1003. p := tunnel.getCustomClientParameters()
  1004. inactivePeriod := p.Duration(parameters.SSHKeepAliveProbeInactivePeriod)
  1005. if lastBytesReceivedTime.Add(inactivePeriod).Before(monotime.Now()) {
  1006. timeout := p.Duration(parameters.SSHKeepAliveProbeTimeout)
  1007. select {
  1008. case signalSshKeepAlive <- timeout:
  1009. default:
  1010. }
  1011. }
  1012. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1013. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1014. }
  1015. }
  1016. case err = <-sshKeepAliveError:
  1017. case serverRequest := <-tunnel.sshServerRequests:
  1018. if serverRequest != nil {
  1019. err := HandleServerRequest(tunnelOwner, tunnel, serverRequest.Type, serverRequest.Payload)
  1020. if err == nil {
  1021. serverRequest.Reply(true, nil)
  1022. } else {
  1023. NoticeAlert("HandleServerRequest for %s failed: %s", serverRequest.Type, err)
  1024. serverRequest.Reply(false, nil)
  1025. }
  1026. }
  1027. case <-tunnel.operateCtx.Done():
  1028. shutdown = true
  1029. }
  1030. }
  1031. close(signalSshKeepAlive)
  1032. close(signalStatusRequest)
  1033. requestsWaitGroup.Wait()
  1034. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1035. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1036. tunnel.dialParams.ServerEntry.IpAddress)
  1037. totalSent += sent
  1038. totalReceived += received
  1039. // Always emit a final NoticeTotalBytesTransferred
  1040. NoticeTotalBytesTransferred(
  1041. tunnel.dialParams.ServerEntry.GetDiagnosticID(), totalSent, totalReceived)
  1042. if err == nil {
  1043. NoticeInfo("shutdown operate tunnel")
  1044. // Send a final status request in order to report any outstanding
  1045. // domain bytes transferred stats as well as to report session stats
  1046. // as soon as possible.
  1047. // This request will be interrupted when the tunnel is closed after
  1048. // an operate shutdown timeout.
  1049. sendStats(tunnel)
  1050. } else {
  1051. NoticeAlert("operate tunnel error for %s: %s",
  1052. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1053. tunnelOwner.SignalTunnelFailure(tunnel)
  1054. }
  1055. }
  1056. // sendSshKeepAlive is a helper which sends a [email protected] request
  1057. // on the specified SSH connections and returns true of the request succeeds
  1058. // within a specified timeout. If the request fails, the associated conn is
  1059. // closed, which will terminate the associated tunnel.
  1060. func (tunnel *Tunnel) sendSshKeepAlive(isFirstKeepAlive bool, timeout time.Duration) error {
  1061. // Note: there is no request context since SSH requests cannot be
  1062. // interrupted directly. Closing the tunnel will interrupt the request.
  1063. // A timeout is set to unblock this function, but the goroutine may
  1064. // not exit until the tunnel is closed.
  1065. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  1066. errChannel := make(chan error, 1)
  1067. afterFunc := time.AfterFunc(timeout, func() {
  1068. errChannel <- errors.New("timed out")
  1069. })
  1070. defer afterFunc.Stop()
  1071. go func() {
  1072. // Random padding to frustrate fingerprinting.
  1073. p := tunnel.getCustomClientParameters()
  1074. request := prng.Padding(
  1075. p.Int(parameters.SSHKeepAlivePaddingMinBytes),
  1076. p.Int(parameters.SSHKeepAlivePaddingMaxBytes))
  1077. p.Close()
  1078. startTime := monotime.Now()
  1079. // Note: reading a reply is important for last-received-time tunnel
  1080. // duration calculation.
  1081. requestOk, response, err := tunnel.sshClient.SendRequest(
  1082. "[email protected]", true, request)
  1083. elapsedTime := monotime.Since(startTime)
  1084. errChannel <- err
  1085. // Record the keep alive round trip as a speed test sample. The first
  1086. // keep alive is always recorded, as many tunnels are short-lived and
  1087. // we want to ensure that some data is gathered. Subsequent keep
  1088. // alives are recorded with some configurable probability, which,
  1089. // considering that only the last SpeedTestMaxSampleCount samples are
  1090. // retained, enables tuning the sampling frequency.
  1091. if err == nil && requestOk &&
  1092. (isFirstKeepAlive ||
  1093. tunnel.getCustomClientParameters().WeightedCoinFlip(
  1094. parameters.SSHKeepAliveSpeedTestSampleProbability)) {
  1095. err = tactics.AddSpeedTestSample(
  1096. tunnel.config.GetClientParameters(),
  1097. GetTacticsStorer(),
  1098. tunnel.config.GetNetworkID(),
  1099. tunnel.dialParams.ServerEntry.Region,
  1100. tunnel.dialParams.TunnelProtocol,
  1101. elapsedTime,
  1102. request,
  1103. response)
  1104. if err != nil {
  1105. NoticeAlert("AddSpeedTestSample failed: %s", common.ContextError(err))
  1106. }
  1107. }
  1108. }()
  1109. err := <-errChannel
  1110. if err != nil {
  1111. tunnel.sshClient.Close()
  1112. tunnel.conn.Close()
  1113. }
  1114. return common.ContextError(err)
  1115. }
  1116. // sendStats is a helper for sending session stats to the server.
  1117. func sendStats(tunnel *Tunnel) bool {
  1118. // Tunnel does not have a serverContext when DisableApi is set
  1119. if tunnel.serverContext == nil {
  1120. return true
  1121. }
  1122. // Skip when tunnel is discarded
  1123. if tunnel.IsDiscarded() {
  1124. return true
  1125. }
  1126. err := tunnel.serverContext.DoStatusRequest(tunnel)
  1127. if err != nil {
  1128. NoticeAlert("DoStatusRequest failed for %s: %s",
  1129. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1130. }
  1131. return err == nil
  1132. }