tunnel.go 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "encoding/base64"
  25. "encoding/json"
  26. "fmt"
  27. "io"
  28. "io/ioutil"
  29. "net"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/marionette"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tapdance"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  45. )
  46. // Tunneler specifies the interface required by components that use a tunnel.
  47. // Components which use this interface may be serviced by a single Tunnel instance,
  48. // or a Controller which manages a pool of tunnels, or any other object which
  49. // implements Tunneler.
  50. type Tunneler interface {
  51. // Dial creates a tunneled connection.
  52. //
  53. // alwaysTunnel indicates that the connection should always be tunneled. If this
  54. // is not set, the connection may be made directly, depending on split tunnel
  55. // classification, when that feature is supported and active.
  56. //
  57. // downstreamConn is an optional parameter which specifies a connection to be
  58. // explicitly closed when the Dialed connection is closed. For instance, this
  59. // is used to close downstreamConn App<->LocalProxy connections when the related
  60. // LocalProxy<->SshPortForward connections close.
  61. Dial(remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error)
  62. DirectDial(remoteAddr string) (conn net.Conn, err error)
  63. SignalComponentFailure()
  64. }
  65. // TunnelOwner specifies the interface required by Tunnel to notify its
  66. // owner when it has failed. The owner may, as in the case of the Controller,
  67. // remove the tunnel from its list of active tunnels.
  68. type TunnelOwner interface {
  69. SignalSeededNewSLOK()
  70. SignalTunnelFailure(tunnel *Tunnel)
  71. }
  72. // Tunnel is a connection to a Psiphon server. An established
  73. // tunnel includes a network connection to the specified server
  74. // and an SSH session built on top of that transport.
  75. type Tunnel struct {
  76. mutex *sync.Mutex
  77. config *Config
  78. isActivated bool
  79. isDiscarded bool
  80. isClosed bool
  81. dialParams *DialParameters
  82. livenessTestMetrics *livenessTestMetrics
  83. serverContext *ServerContext
  84. conn *common.ActivityMonitoredConn
  85. sshClient *ssh.Client
  86. sshServerRequests <-chan *ssh.Request
  87. operateWaitGroup *sync.WaitGroup
  88. operateCtx context.Context
  89. stopOperate context.CancelFunc
  90. signalPortForwardFailure chan struct{}
  91. totalPortForwardFailures int
  92. adjustedEstablishStartTime time.Time
  93. establishDuration time.Duration
  94. establishedTime time.Time
  95. handledSSHKeepAliveFailure int32
  96. inFlightConnectedRequestSignal chan struct{}
  97. }
  98. // getCustomClientParameters helpers wrap the verbose function call chain
  99. // required to get a current snapshot of the ClientParameters customized with
  100. // the dial parameters associated with a tunnel.
  101. func (tunnel *Tunnel) getCustomClientParameters() parameters.ClientParametersAccessor {
  102. return getCustomClientParameters(tunnel.config, tunnel.dialParams)
  103. }
  104. func getCustomClientParameters(
  105. config *Config, dialParams *DialParameters) parameters.ClientParametersAccessor {
  106. return config.GetClientParameters().GetCustom(dialParams.NetworkLatencyMultiplier)
  107. }
  108. // ConnectTunnel first makes a network transport connection to the
  109. // Psiphon server and then establishes an SSH client session on top of
  110. // that transport. The SSH server is authenticated using the public
  111. // key in the server entry.
  112. // Depending on the server's capabilities, the connection may use
  113. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  114. // HTTP (meek protocol).
  115. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  116. // the a random supported protocol is used.
  117. //
  118. // Call Activate on a connected tunnel to complete its establishment
  119. // before using.
  120. //
  121. // Tunnel establishment is split into two phases: connection, and
  122. // activation. The Controller will run many ConnectTunnel calls
  123. // concurrently and then, to avoid unnecessary overhead from making
  124. // handshake requests and starting operateTunnel from tunnels which
  125. // may be discarded, call Activate on connected tunnels sequentially
  126. // as necessary.
  127. //
  128. func ConnectTunnel(
  129. ctx context.Context,
  130. config *Config,
  131. adjustedEstablishStartTime time.Time,
  132. dialParams *DialParameters) (*Tunnel, error) {
  133. // Build transport layers and establish SSH connection. Note that
  134. // dialConn and monitoredConn are the same network connection.
  135. dialResult, err := dialTunnel(ctx, config, dialParams)
  136. if err != nil {
  137. return nil, errors.Trace(err)
  138. }
  139. // The tunnel is now connected
  140. return &Tunnel{
  141. mutex: new(sync.Mutex),
  142. config: config,
  143. dialParams: dialParams,
  144. livenessTestMetrics: dialResult.livenessTestMetrics,
  145. conn: dialResult.monitoredConn,
  146. sshClient: dialResult.sshClient,
  147. sshServerRequests: dialResult.sshRequests,
  148. // A buffer allows at least one signal to be sent even when the receiver is
  149. // not listening. Senders should not block.
  150. signalPortForwardFailure: make(chan struct{}, 1),
  151. adjustedEstablishStartTime: adjustedEstablishStartTime,
  152. }, nil
  153. }
  154. // Activate completes the tunnel establishment, performing the handshake
  155. // request and starting operateTunnel, the worker that monitors the tunnel
  156. // and handles periodic management.
  157. func (tunnel *Tunnel) Activate(
  158. ctx context.Context, tunnelOwner TunnelOwner) (retErr error) {
  159. // Ensure that, unless the base context is cancelled, any replayed dial
  160. // parameters are cleared, no longer to be retried, if the tunnel fails to
  161. // activate.
  162. activationSucceeded := false
  163. baseCtx := ctx
  164. defer func() {
  165. if !activationSucceeded && baseCtx.Err() == nil {
  166. tunnel.dialParams.Failed(tunnel.config)
  167. _ = RecordFailedTunnelStat(
  168. tunnel.config,
  169. tunnel.dialParams,
  170. tunnel.livenessTestMetrics,
  171. -1,
  172. -1,
  173. retErr)
  174. }
  175. }()
  176. // Create a new Psiphon API server context for this tunnel. This includes
  177. // performing a handshake request. If the handshake fails, this activation
  178. // fails.
  179. var serverContext *ServerContext
  180. if !tunnel.config.DisableApi {
  181. NoticeInfo(
  182. "starting server context for %s",
  183. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  184. // Call NewServerContext in a goroutine, as it blocks on a network operation,
  185. // the handshake request, and would block shutdown. If the shutdown signal is
  186. // received, close the tunnel, which will interrupt the handshake request
  187. // that may be blocking NewServerContext.
  188. //
  189. // Timeout after PsiphonApiServerTimeoutSeconds. NewServerContext may not
  190. // return if the tunnel network connection is unstable during the handshake
  191. // request. At this point, there is no operateTunnel monitor that will detect
  192. // this condition with SSH keep alives.
  193. timeout := tunnel.getCustomClientParameters().Duration(
  194. parameters.PsiphonAPIRequestTimeout)
  195. if timeout > 0 {
  196. var cancelFunc context.CancelFunc
  197. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  198. defer cancelFunc()
  199. }
  200. type newServerContextResult struct {
  201. serverContext *ServerContext
  202. err error
  203. }
  204. resultChannel := make(chan newServerContextResult)
  205. go func() {
  206. serverContext, err := NewServerContext(tunnel)
  207. resultChannel <- newServerContextResult{
  208. serverContext: serverContext,
  209. err: err,
  210. }
  211. }()
  212. var result newServerContextResult
  213. select {
  214. case result = <-resultChannel:
  215. case <-ctx.Done():
  216. result.err = ctx.Err()
  217. // Interrupt the goroutine
  218. tunnel.Close(true)
  219. <-resultChannel
  220. }
  221. if result.err != nil {
  222. return errors.Trace(result.err)
  223. }
  224. serverContext = result.serverContext
  225. }
  226. // The activation succeeded.
  227. activationSucceeded = true
  228. tunnel.mutex.Lock()
  229. // It may happen that the tunnel gets closed while Activate is running.
  230. // In this case, abort here, to ensure that the operateTunnel goroutine
  231. // will not be launched after Close is called.
  232. if tunnel.isClosed {
  233. return errors.TraceNew("tunnel is closed")
  234. }
  235. tunnel.isActivated = true
  236. tunnel.serverContext = serverContext
  237. // establishDuration is the elapsed time between the controller starting tunnel
  238. // establishment and this tunnel being established. The reported value represents
  239. // how long the user waited between starting the client and having a usable tunnel;
  240. // or how long between the client detecting an unexpected tunnel disconnect and
  241. // completing automatic reestablishment.
  242. //
  243. // This time period may include time spent unsuccessfully connecting to other
  244. // servers. Time spent waiting for network connectivity is excluded.
  245. tunnel.establishDuration = time.Since(tunnel.adjustedEstablishStartTime)
  246. tunnel.establishedTime = time.Now()
  247. // Use the Background context instead of the controller run context, as tunnels
  248. // are terminated when the controller calls tunnel.Close.
  249. tunnel.operateCtx, tunnel.stopOperate = context.WithCancel(context.Background())
  250. tunnel.operateWaitGroup = new(sync.WaitGroup)
  251. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
  252. // stats updates.
  253. tunnel.operateWaitGroup.Add(1)
  254. go tunnel.operateTunnel(tunnelOwner)
  255. tunnel.mutex.Unlock()
  256. return nil
  257. }
  258. // Close stops operating the tunnel and closes the underlying connection.
  259. // Supports multiple and/or concurrent calls to Close().
  260. // When isDiscarded is set, operateTunnel will not attempt to send final
  261. // status requests.
  262. func (tunnel *Tunnel) Close(isDiscarded bool) {
  263. tunnel.mutex.Lock()
  264. tunnel.isDiscarded = isDiscarded
  265. isActivated := tunnel.isActivated
  266. isClosed := tunnel.isClosed
  267. tunnel.isClosed = true
  268. tunnel.mutex.Unlock()
  269. if !isClosed {
  270. // Signal operateTunnel to stop before closing the tunnel -- this
  271. // allows a final status request to be made in the case of an orderly
  272. // shutdown.
  273. // A timer is set, so if operateTunnel takes too long to stop, the
  274. // tunnel is closed, which will interrupt any slow final status request.
  275. if isActivated {
  276. timeout := tunnel.getCustomClientParameters().Duration(
  277. parameters.TunnelOperateShutdownTimeout)
  278. afterFunc := time.AfterFunc(
  279. timeout,
  280. func() { tunnel.conn.Close() })
  281. tunnel.stopOperate()
  282. tunnel.operateWaitGroup.Wait()
  283. afterFunc.Stop()
  284. }
  285. tunnel.sshClient.Close()
  286. // tunnel.conn.Close() may get called multiple times, which is allowed.
  287. tunnel.conn.Close()
  288. err := tunnel.sshClient.Wait()
  289. if err != nil {
  290. NoticeWarning("close tunnel ssh error: %s", err)
  291. }
  292. }
  293. }
  294. // SetInFlightConnectedRequest checks if a connected request can begin and
  295. // sets the channel used to signal that the request is complete.
  296. //
  297. // The caller must not initiate a connected request when
  298. // SetInFlightConnectedRequest returns false. When SetInFlightConnectedRequest
  299. // returns true, the caller must call SetInFlightConnectedRequest(nil) when
  300. // the connected request completes.
  301. func (tunnel *Tunnel) SetInFlightConnectedRequest(requestSignal chan struct{}) bool {
  302. tunnel.mutex.Lock()
  303. defer tunnel.mutex.Unlock()
  304. // If already closing, don't start a connected request: the
  305. // TunnelOperateShutdownTimeout period may be nearly expired.
  306. if tunnel.isClosed {
  307. return false
  308. }
  309. if requestSignal == nil {
  310. // Not already in-flight (not expected)
  311. if tunnel.inFlightConnectedRequestSignal == nil {
  312. return false
  313. }
  314. } else {
  315. // Already in-flight (not expected)
  316. if tunnel.inFlightConnectedRequestSignal != nil {
  317. return false
  318. }
  319. }
  320. tunnel.inFlightConnectedRequestSignal = requestSignal
  321. return true
  322. }
  323. // AwaitInFlightConnectedRequest waits for the signal that any in-flight
  324. // connected request is complete.
  325. //
  326. // AwaitInFlightConnectedRequest may block until the connected request is
  327. // aborted by terminating the tunnel.
  328. func (tunnel *Tunnel) AwaitInFlightConnectedRequest() {
  329. tunnel.mutex.Lock()
  330. requestSignal := tunnel.inFlightConnectedRequestSignal
  331. tunnel.mutex.Unlock()
  332. if requestSignal != nil {
  333. <-requestSignal
  334. }
  335. }
  336. // IsActivated returns the tunnel's activated flag.
  337. func (tunnel *Tunnel) IsActivated() bool {
  338. tunnel.mutex.Lock()
  339. defer tunnel.mutex.Unlock()
  340. return tunnel.isActivated
  341. }
  342. // IsDiscarded returns the tunnel's discarded flag.
  343. func (tunnel *Tunnel) IsDiscarded() bool {
  344. tunnel.mutex.Lock()
  345. defer tunnel.mutex.Unlock()
  346. return tunnel.isDiscarded
  347. }
  348. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  349. // This function blocks awaiting a response. Only one request may be in-flight
  350. // at once; a concurrent SendAPIRequest will block until an active request
  351. // receives its response (or the SSH connection is terminated).
  352. func (tunnel *Tunnel) SendAPIRequest(
  353. name string, requestPayload []byte) ([]byte, error) {
  354. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  355. name, true, requestPayload)
  356. if err != nil {
  357. return nil, errors.Trace(err)
  358. }
  359. if !ok {
  360. return nil, errors.TraceNew("API request rejected")
  361. }
  362. return responsePayload, nil
  363. }
  364. // Dial establishes a port forward connection through the tunnel
  365. // This Dial doesn't support split tunnel, so alwaysTunnel is not referenced
  366. func (tunnel *Tunnel) Dial(
  367. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (net.Conn, error) {
  368. channel, err := tunnel.dialChannel("tcp", remoteAddr)
  369. if err != nil {
  370. return nil, errors.Trace(err)
  371. }
  372. netConn, ok := channel.(net.Conn)
  373. if !ok {
  374. return nil, errors.Tracef("unexpected channel type: %T", channel)
  375. }
  376. conn := &TunneledConn{
  377. Conn: netConn,
  378. tunnel: tunnel,
  379. downstreamConn: downstreamConn}
  380. return tunnel.wrapWithTransferStats(conn), nil
  381. }
  382. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  383. channel, err := tunnel.dialChannel(protocol.PACKET_TUNNEL_CHANNEL_TYPE, "")
  384. if err != nil {
  385. return nil, errors.Trace(err)
  386. }
  387. sshChannel, ok := channel.(ssh.Channel)
  388. if !ok {
  389. return nil, errors.Tracef("unexpected channel type: %T", channel)
  390. }
  391. NoticeInfo("DialPacketTunnelChannel: established channel")
  392. conn := newChannelConn(sshChannel)
  393. // wrapWithTransferStats will track bytes transferred for the
  394. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  395. //
  396. // Since the data in the channel is not HTTP or TLS, no domain bytes
  397. // counting is expected.
  398. //
  399. // transferstats are also used to determine that there's been recent
  400. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  401. return tunnel.wrapWithTransferStats(conn), nil
  402. }
  403. func (tunnel *Tunnel) dialChannel(channelType, remoteAddr string) (interface{}, error) {
  404. if !tunnel.IsActivated() {
  405. return nil, errors.TraceNew("tunnel is not activated")
  406. }
  407. // Note: there is no dial context since SSH port forward dials cannot
  408. // be interrupted directly. Closing the tunnel will interrupt the dials.
  409. // A timeout is set to unblock this function, but the goroutine may
  410. // not exit until the tunnel is closed.
  411. type channelDialResult struct {
  412. channel interface{}
  413. err error
  414. }
  415. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  416. results := make(chan *channelDialResult, 1)
  417. afterFunc := time.AfterFunc(
  418. tunnel.getCustomClientParameters().Duration(
  419. parameters.TunnelPortForwardDialTimeout),
  420. func() {
  421. results <- &channelDialResult{
  422. nil, errors.Tracef("channel dial timeout: %s", channelType)}
  423. })
  424. defer afterFunc.Stop()
  425. go func() {
  426. result := new(channelDialResult)
  427. if channelType == "tcp" {
  428. result.channel, result.err =
  429. tunnel.sshClient.Dial("tcp", remoteAddr)
  430. } else {
  431. var sshRequests <-chan *ssh.Request
  432. result.channel, sshRequests, result.err =
  433. tunnel.sshClient.OpenChannel(channelType, nil)
  434. if result.err == nil {
  435. go ssh.DiscardRequests(sshRequests)
  436. }
  437. }
  438. if result.err != nil {
  439. result.err = errors.Trace(result.err)
  440. }
  441. results <- result
  442. }()
  443. result := <-results
  444. if result.err != nil {
  445. // TODO: conditional on type of error or error message?
  446. select {
  447. case tunnel.signalPortForwardFailure <- struct{}{}:
  448. default:
  449. }
  450. return nil, errors.Trace(result.err)
  451. }
  452. return result.channel, nil
  453. }
  454. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  455. // Tunnel does not have a serverContext when DisableApi is set. We still use
  456. // transferstats.Conn to count bytes transferred for monitoring tunnel
  457. // quality.
  458. var regexps *transferstats.Regexps
  459. if tunnel.serverContext != nil {
  460. regexps = tunnel.serverContext.StatsRegexps()
  461. }
  462. return transferstats.NewConn(
  463. conn, tunnel.dialParams.ServerEntry.IpAddress, regexps)
  464. }
  465. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  466. // This will terminate the tunnel.
  467. func (tunnel *Tunnel) SignalComponentFailure() {
  468. NoticeWarning("tunnel received component failure signal")
  469. tunnel.Close(false)
  470. }
  471. // TunneledConn implements net.Conn and wraps a port forward connection.
  472. // It is used to hook into Read and Write to observe I/O errors and
  473. // report these errors back to the tunnel monitor as port forward failures.
  474. // TunneledConn optionally tracks a peer connection to be explicitly closed
  475. // when the TunneledConn is closed.
  476. type TunneledConn struct {
  477. net.Conn
  478. tunnel *Tunnel
  479. downstreamConn net.Conn
  480. }
  481. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  482. n, err = conn.Conn.Read(buffer)
  483. if err != nil && err != io.EOF {
  484. // Report new failure. Won't block; assumes the receiver
  485. // has a sufficient buffer for the threshold number of reports.
  486. // TODO: conditional on type of error or error message?
  487. select {
  488. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  489. default:
  490. }
  491. }
  492. return
  493. }
  494. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  495. n, err = conn.Conn.Write(buffer)
  496. if err != nil && err != io.EOF {
  497. // Same as TunneledConn.Read()
  498. select {
  499. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  500. default:
  501. }
  502. }
  503. return
  504. }
  505. func (conn *TunneledConn) Close() error {
  506. if conn.downstreamConn != nil {
  507. conn.downstreamConn.Close()
  508. }
  509. return conn.Conn.Close()
  510. }
  511. type dialResult struct {
  512. dialConn net.Conn
  513. monitoredConn *common.ActivityMonitoredConn
  514. sshClient *ssh.Client
  515. sshRequests <-chan *ssh.Request
  516. livenessTestMetrics *livenessTestMetrics
  517. }
  518. // dialTunnel is a helper that builds the transport layers and establishes the
  519. // SSH connection. When additional dial configuration is used, dial metrics
  520. // are recorded and returned.
  521. //
// The returned dialResult.dialConn is the base dial conn (the value to be
// removed from pendingConns). Additional layering (ActivityMonitoredConn,
// ThrottledConn) is applied on top of it, and dialResult.monitoredConn is the
// ActivityMonitoredConn layer over that same network connection.
  526. func dialTunnel(
  527. ctx context.Context,
  528. config *Config,
  529. dialParams *DialParameters) (_ *dialResult, retErr error) {
  530. // Return immediately when overall context is canceled or timed-out. This
  531. // avoids notice noise.
  532. err := ctx.Err()
  533. if err != nil {
  534. return nil, errors.Trace(err)
  535. }
  536. p := getCustomClientParameters(config, dialParams)
  537. timeout := p.Duration(parameters.TunnelConnectTimeout)
  538. rateLimits := p.RateLimits(parameters.TunnelRateLimits)
  539. obfuscatedSSHMinPadding := p.Int(parameters.ObfuscatedSSHMinPadding)
  540. obfuscatedSSHMaxPadding := p.Int(parameters.ObfuscatedSSHMaxPadding)
  541. livenessTestMinUpstreamBytes := p.Int(parameters.LivenessTestMinUpstreamBytes)
  542. livenessTestMaxUpstreamBytes := p.Int(parameters.LivenessTestMaxUpstreamBytes)
  543. livenessTestMinDownstreamBytes := p.Int(parameters.LivenessTestMinDownstreamBytes)
  544. livenessTestMaxDownstreamBytes := p.Int(parameters.LivenessTestMaxDownstreamBytes)
  545. p.Close()
  546. // Ensure that, unless the base context is cancelled, any replayed dial
  547. // parameters are cleared, no longer to be retried, if the tunnel fails to
  548. // connect.
  549. //
  550. // Limitation: dials that fail to connect due to the server being in a
  551. // load-limiting state are not distinguished and excepted from this
  552. // logic.
  553. dialSucceeded := false
  554. baseCtx := ctx
  555. var failedTunnelLivenessTestMetrics *livenessTestMetrics
  556. defer func() {
  557. if !dialSucceeded && baseCtx.Err() == nil {
  558. dialParams.Failed(config)
  559. _ = RecordFailedTunnelStat(
  560. config,
  561. dialParams,
  562. failedTunnelLivenessTestMetrics,
  563. -1,
  564. -1,
  565. retErr)
  566. }
  567. }()
  568. var cancelFunc context.CancelFunc
  569. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  570. defer cancelFunc()
  571. // DialDuration is the elapsed time for both successful and failed tunnel
  572. // dials. For successful tunnels, it includes any the network protocol
  573. // handshake(s), obfuscation protocol handshake(s), SSH handshake, and
  574. // liveness test, when performed.
  575. //
  576. // Note: ensure DialDuration is set before calling any function which logs
  577. // dial_duration.
  578. startDialTime := time.Now()
  579. defer func() {
  580. dialParams.DialDuration = time.Since(startDialTime)
  581. }()
  582. // Note: dialParams.MeekResolvedIPAddress isn't set until the dial begins,
  583. // so it will always be blank in NoticeConnectingServer.
  584. NoticeConnectingServer(dialParams)
  585. // Create the base transport: meek or direct connection
  586. var dialConn net.Conn
  587. if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  588. dialConn, err = DialMeek(
  589. ctx,
  590. dialParams.GetMeekConfig(),
  591. dialParams.GetDialConfig())
  592. if err != nil {
  593. return nil, errors.Trace(err)
  594. }
  595. } else if protocol.TunnelProtocolUsesQUIC(dialParams.TunnelProtocol) {
  596. packetConn, remoteAddr, err := NewUDPConn(
  597. ctx,
  598. dialParams.DirectDialAddress,
  599. dialParams.GetDialConfig())
  600. if err != nil {
  601. return nil, errors.Trace(err)
  602. }
  603. dialConn, err = quic.Dial(
  604. ctx,
  605. packetConn,
  606. remoteAddr,
  607. dialParams.QUICDialSNIAddress,
  608. dialParams.QUICVersion,
  609. dialParams.ServerEntry.SshObfuscatedKey,
  610. dialParams.ObfuscatedQUICPaddingSeed)
  611. if err != nil {
  612. return nil, errors.Trace(err)
  613. }
  614. } else if protocol.TunnelProtocolUsesMarionette(dialParams.TunnelProtocol) {
  615. dialConn, err = marionette.Dial(
  616. ctx,
  617. NewNetDialer(dialParams.GetDialConfig()),
  618. dialParams.ServerEntry.MarionetteFormat,
  619. dialParams.DirectDialAddress)
  620. if err != nil {
  621. return nil, errors.Trace(err)
  622. }
  623. } else if protocol.TunnelProtocolUsesTapdance(dialParams.TunnelProtocol) {
  624. dialConn, err = tapdance.Dial(
  625. ctx,
  626. config.EmitTapdanceLogs,
  627. config.GetTapdanceDirectory(),
  628. NewNetDialer(dialParams.GetDialConfig()),
  629. dialParams.DirectDialAddress)
  630. if err != nil {
  631. return nil, errors.Trace(err)
  632. }
  633. } else {
  634. dialConn, err = DialTCP(
  635. ctx,
  636. dialParams.DirectDialAddress,
  637. dialParams.GetDialConfig())
  638. if err != nil {
  639. return nil, errors.Trace(err)
  640. }
  641. }
  642. // Some conns report additional metrics. fragmentor.Conns report
  643. // fragmentor configs.
  644. //
  645. // Limitation: for meek, GetMetrics from underlying fragmentor.Conn(s)
  646. // should be called in order to log fragmentor metrics for meek sessions.
  647. if metricsSource, ok := dialConn.(common.MetricsSource); ok {
  648. dialParams.DialConnMetrics = metricsSource
  649. }
  650. // If dialConn is not a Closer, tunnel failure detection may be slower
  651. if _, ok := dialConn.(common.Closer); !ok {
  652. NoticeWarning("tunnel.dialTunnel: dialConn is not a Closer")
  653. }
  654. cleanupConn := dialConn
  655. defer func() {
  656. // Cleanup on error
  657. if cleanupConn != nil {
  658. cleanupConn.Close()
  659. }
  660. }()
  661. // Activity monitoring is used to measure tunnel duration
  662. monitoredConn, err := common.NewActivityMonitoredConn(dialConn, 0, false, nil, nil)
  663. if err != nil {
  664. return nil, errors.Trace(err)
  665. }
  666. // Apply throttling (if configured)
  667. throttledConn := common.NewThrottledConn(
  668. monitoredConn,
  669. rateLimits)
  670. // Add obfuscated SSH layer
  671. var sshConn net.Conn = throttledConn
  672. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  673. obfuscatedSSHConn, err := obfuscator.NewClientObfuscatedSSHConn(
  674. throttledConn,
  675. dialParams.ServerEntry.SshObfuscatedKey,
  676. dialParams.ObfuscatorPaddingSeed,
  677. &obfuscatedSSHMinPadding,
  678. &obfuscatedSSHMaxPadding)
  679. if err != nil {
  680. return nil, errors.Trace(err)
  681. }
  682. sshConn = obfuscatedSSHConn
  683. dialParams.ObfuscatedSSHConnMetrics = obfuscatedSSHConn
  684. }
  685. // Now establish the SSH session over the conn transport
  686. expectedPublicKey, err := base64.StdEncoding.DecodeString(
  687. dialParams.ServerEntry.SshHostKey)
  688. if err != nil {
  689. return nil, errors.Trace(err)
  690. }
  691. sshCertChecker := &ssh.CertChecker{
  692. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  693. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  694. return errors.TraceNew("unexpected host public key")
  695. }
  696. return nil
  697. },
  698. }
  699. sshPasswordPayload := &protocol.SSHPasswordPayload{
  700. SessionId: config.SessionID,
  701. SshPassword: dialParams.ServerEntry.SshPassword,
  702. ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
  703. }
  704. payload, err := json.Marshal(sshPasswordPayload)
  705. if err != nil {
  706. return nil, errors.Trace(err)
  707. }
  708. sshClientConfig := &ssh.ClientConfig{
  709. User: dialParams.ServerEntry.SshUsername,
  710. Auth: []ssh.AuthMethod{
  711. ssh.Password(string(payload)),
  712. },
  713. HostKeyCallback: sshCertChecker.CheckHostKey,
  714. ClientVersion: dialParams.SSHClientVersion,
  715. }
  716. sshClientConfig.KEXPRNGSeed = dialParams.SSHKEXSeed
  717. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  718. if config.ObfuscatedSSHAlgorithms != nil {
  719. sshClientConfig.KeyExchanges = []string{config.ObfuscatedSSHAlgorithms[0]}
  720. sshClientConfig.Ciphers = []string{config.ObfuscatedSSHAlgorithms[1]}
  721. sshClientConfig.MACs = []string{config.ObfuscatedSSHAlgorithms[2]}
  722. sshClientConfig.HostKeyAlgorithms = []string{config.ObfuscatedSSHAlgorithms[3]}
  723. } else {
  724. // With Encrypt-then-MAC hash algorithms, packet length is
  725. // transmitted in plaintext, which aids in traffic analysis.
  726. //
  727. // TUNNEL_PROTOCOL_SSH is excepted since its KEX appears in plaintext,
  728. // and the protocol is intended to look like SSH on the wire.
  729. sshClientConfig.NoEncryptThenMACHash = true
  730. }
  731. } else {
  732. // For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize
  733. // its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation
  734. // betweem two randomized KEXes.
  735. if dialParams.ServerEntry.SshObfuscatedKey != "" {
  736. sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  737. dialParams.ServerEntry.SshObfuscatedKey)
  738. if err != nil {
  739. return nil, errors.Trace(err)
  740. }
  741. }
  742. }
  743. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  744. // in a timeout to ensure it won't hang. We've encountered firewalls
  745. // that allow the TCP handshake to complete but then send a RST to the
  746. // server-side and nothing to the client-side, and if that happens
  747. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  748. // closes the conn, which interrupts it.
  749. // Note: TCP handshake timeouts are provided by TCPConn, and session
  750. // timeouts *after* ssh establishment are provided by the ssh keep alive
  751. // in operate tunnel.
  752. type sshNewClientResult struct {
  753. sshClient *ssh.Client
  754. sshRequests <-chan *ssh.Request
  755. livenessTestMetrics *livenessTestMetrics
  756. err error
  757. }
  758. resultChannel := make(chan sshNewClientResult)
  759. // Call NewClientConn in a goroutine, as it blocks on SSH handshake network
  760. // operations, and would block canceling or shutdown. If the parent context
  761. // is canceled, close the net.Conn underlying SSH, which will interrupt the
  762. // SSH handshake that may be blocking NewClientConn.
  763. go func() {
  764. // The following is adapted from ssh.Dial(), here using a custom conn
  765. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  766. sshAddress := ""
  767. sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
  768. sshConn, sshAddress, sshClientConfig)
  769. var sshClient *ssh.Client
  770. var metrics *livenessTestMetrics
  771. if err == nil {
  772. // sshRequests is handled by operateTunnel.
  773. // ssh.NewClient also expects to handle the sshRequests
  774. // value from ssh.NewClientConn and will spawn a goroutine
  775. // to handle the <-chan *ssh.Request, so we must provide
  776. // a closed channel to ensure that goroutine halts instead
  777. // of hanging on a nil channel.
  778. noRequests := make(chan *ssh.Request)
  779. close(noRequests)
  780. sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)
  781. if livenessTestMaxUpstreamBytes > 0 || livenessTestMaxDownstreamBytes > 0 {
  782. // When configured, perform a liveness test which sends and
  783. // receives bytes through the tunnel to ensure the tunnel had
  784. // not been blocked upon or shortly after connecting. This
  785. // test is performed concurrently for each establishment
  786. // candidate before selecting a successful tunnel.
  787. //
  788. // Note that the liveness test is subject to the
  789. // TunnelConnectTimeout, which should be adjusted
  790. // accordinging.
  791. metrics, err = performLivenessTest(
  792. sshClient,
  793. livenessTestMinUpstreamBytes, livenessTestMaxUpstreamBytes,
  794. livenessTestMinDownstreamBytes, livenessTestMaxDownstreamBytes,
  795. dialParams.LivenessTestSeed)
  796. // Skip notice when cancelling.
  797. if baseCtx.Err() == nil {
  798. NoticeLivenessTest(
  799. dialParams.ServerEntry.GetDiagnosticID(), metrics, err == nil)
  800. }
  801. }
  802. }
  803. resultChannel <- sshNewClientResult{sshClient, sshRequests, metrics, err}
  804. }()
  805. var result sshNewClientResult
  806. select {
  807. case result = <-resultChannel:
  808. case <-ctx.Done():
  809. // Interrupt the goroutine and capture its error context to
  810. // distinguish point of failure.
  811. err := ctx.Err()
  812. sshConn.Close()
  813. result = <-resultChannel
  814. if result.err != nil {
  815. result.err = fmt.Errorf("%s: %s", err, result.err)
  816. } else {
  817. result.err = err
  818. }
  819. }
  820. if result.err != nil {
  821. failedTunnelLivenessTestMetrics = result.livenessTestMetrics
  822. return nil, errors.Trace(result.err)
  823. }
  824. dialSucceeded = true
  825. NoticeConnectedServer(dialParams)
  826. cleanupConn = nil
  827. // Note: dialConn may be used to close the underlying network connection
  828. // but should not be used to perform I/O as that would interfere with SSH
  829. // (and also bypasses throttling).
  830. return &dialResult{
  831. dialConn: dialConn,
  832. monitoredConn: monitoredConn,
  833. sshClient: result.sshClient,
  834. sshRequests: result.sshRequests,
  835. livenessTestMetrics: result.livenessTestMetrics},
  836. nil
  837. }
  838. // Fields are exported for JSON encoding in NoticeLivenessTest.
  839. type livenessTestMetrics struct {
  840. Duration string
  841. UpstreamBytes int
  842. SentUpstreamBytes int
  843. DownstreamBytes int
  844. ReceivedDownstreamBytes int
  845. }
  846. func performLivenessTest(
  847. sshClient *ssh.Client,
  848. minUpstreamBytes, maxUpstreamBytes int,
  849. minDownstreamBytes, maxDownstreamBytes int,
  850. livenessTestPRNGSeed *prng.Seed) (*livenessTestMetrics, error) {
  851. metrics := new(livenessTestMetrics)
  852. defer func(startTime time.Time) {
  853. metrics.Duration = time.Since(startTime).String()
  854. }(time.Now())
  855. PRNG := prng.NewPRNGWithSeed(livenessTestPRNGSeed)
  856. metrics.UpstreamBytes = PRNG.Range(minUpstreamBytes, maxUpstreamBytes)
  857. metrics.DownstreamBytes = PRNG.Range(minDownstreamBytes, maxDownstreamBytes)
  858. request := &protocol.RandomStreamRequest{
  859. UpstreamBytes: metrics.UpstreamBytes,
  860. DownstreamBytes: metrics.DownstreamBytes,
  861. }
  862. extraData, err := json.Marshal(request)
  863. if err != nil {
  864. return metrics, errors.Trace(err)
  865. }
  866. channel, requests, err := sshClient.OpenChannel(
  867. protocol.RANDOM_STREAM_CHANNEL_TYPE, extraData)
  868. if err != nil {
  869. return metrics, errors.Trace(err)
  870. }
  871. defer channel.Close()
  872. go ssh.DiscardRequests(requests)
  873. // In consideration of memory-constrained environments, use a modest-sized
  874. // copy buffer since many tunnel establishment workers may run the
  875. // liveness test concurrently.
  876. var buffer [8192]byte
  877. if metrics.UpstreamBytes > 0 {
  878. n, err := common.CopyNBuffer(channel, rand.Reader, int64(metrics.UpstreamBytes), buffer[:])
  879. metrics.SentUpstreamBytes = int(n)
  880. if err != nil {
  881. return metrics, errors.Trace(err)
  882. }
  883. }
  884. if metrics.DownstreamBytes > 0 {
  885. n, err := common.CopyNBuffer(ioutil.Discard, channel, int64(metrics.DownstreamBytes), buffer[:])
  886. metrics.ReceivedDownstreamBytes = int(n)
  887. if err != nil {
  888. return metrics, errors.Trace(err)
  889. }
  890. }
  891. return metrics, nil
  892. }
  893. // operateTunnel monitors the health of the tunnel and performs
  894. // periodic work.
  895. //
  896. // BytesTransferred and TotalBytesTransferred notices are emitted
  897. // for live reporting and diagnostics reporting, respectively.
  898. //
  899. // Status requests are sent to the Psiphon API to report bytes
  900. // transferred.
  901. //
  902. // Periodic SSH keep alive packets are sent to ensure the underlying
  903. // TCP connection isn't terminated by NAT, or other network
  904. // interference -- or test if it has been terminated while the device
  905. // has been asleep. When a keep alive times out, the tunnel is
  906. // considered failed.
  907. //
  908. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  909. // server responsiveness when a port forward failure is detected: a
  910. // failed dial or failed read/write. This keep alive has a shorter
  911. // timeout.
  912. //
  913. // Note that port forward failures may be due to non-failure conditions.
  914. // For example, when the user inputs an invalid domain name and
  915. // resolution is done by the ssh server; or trying to connect to a
  916. // non-white-listed port; and the error message in these cases is not
  917. // distinguishable from a a true server error (a common error message,
  918. // "ssh: rejected: administratively prohibited (open failed)", may be
  919. // returned for these cases but also if the server has run out of
  920. // ephemeral ports, for example).
  921. //
  922. // SSH keep alives are not sent when the tunnel has been recently
  923. // active (not only does tunnel activity obviate the necessity of a keep
  924. // alive, testing has shown that keep alives may time out for "busy"
  925. // tunnels, especially over meek protocol and other high latency
  926. // conditions).
  927. //
  928. // "Recently active" is defined has having received payload bytes. Sent
  929. // bytes are not considered as testing has shown bytes may appear to
  930. // send when certain NAT devices have interfered with the tunnel, while
  931. // no bytes are received. In a pathological case, with DNS implemented
  932. // as tunneled UDP, a browser may wait excessively for a domain name to
  933. // resolve, while no new port forward is attempted which would otherwise
  934. // result in a tunnel failure detection.
  935. //
  936. // TODO: change "recently active" to include having received any
  937. // SSH protocol messages from the server, not just user payload?
  938. //
  939. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  940. defer tunnel.operateWaitGroup.Done()
  941. now := time.Now()
  942. lastBytesReceivedTime := now
  943. lastTotalBytesTransferedTime := now
  944. totalSent := int64(0)
  945. totalReceived := int64(0)
  946. setDialParamsSucceeded := false
  947. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  948. defer noticeBytesTransferredTicker.Stop()
  949. // The next status request and ssh keep alive times are picked at random,
  950. // from a range, to make the resulting traffic less fingerprintable,
  951. // Note: not using Tickers since these are not fixed time periods.
  952. nextStatusRequestPeriod := func() time.Duration {
  953. p := tunnel.getCustomClientParameters()
  954. return prng.Period(
  955. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMin),
  956. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMax))
  957. }
  958. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  959. defer statsTimer.Stop()
  960. // Schedule an almost-immediate status request to deliver any unreported
  961. // persistent stats.
  962. unreported := CountUnreportedPersistentStats()
  963. if unreported > 0 {
  964. NoticeInfo("Unreported persistent stats: %d", unreported)
  965. p := tunnel.getCustomClientParameters()
  966. statsTimer.Reset(
  967. prng.Period(
  968. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMin),
  969. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMax)))
  970. }
  971. nextSshKeepAlivePeriod := func() time.Duration {
  972. p := tunnel.getCustomClientParameters()
  973. return prng.Period(
  974. p.Duration(parameters.SSHKeepAlivePeriodMin),
  975. p.Duration(parameters.SSHKeepAlivePeriodMax))
  976. }
  977. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  978. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  979. if tunnel.config.DisablePeriodicSshKeepAlive {
  980. sshKeepAliveTimer.Stop()
  981. } else {
  982. defer sshKeepAliveTimer.Stop()
  983. }
  984. // Perform network requests in separate goroutines so as not to block
  985. // other operations.
  986. requestsWaitGroup := new(sync.WaitGroup)
  987. requestsWaitGroup.Add(1)
  988. signalStatusRequest := make(chan struct{})
  989. go func() {
  990. defer requestsWaitGroup.Done()
  991. for range signalStatusRequest {
  992. sendStats(tunnel)
  993. }
  994. }()
  995. requestsWaitGroup.Add(1)
  996. signalPeriodicSshKeepAlive := make(chan time.Duration)
  997. sshKeepAliveError := make(chan error, 1)
  998. go func() {
  999. defer requestsWaitGroup.Done()
  1000. isFirstPeriodicKeepAlive := true
  1001. for timeout := range signalPeriodicSshKeepAlive {
  1002. bytesUp := atomic.LoadInt64(&totalSent)
  1003. bytesDown := atomic.LoadInt64(&totalReceived)
  1004. err := tunnel.sendSshKeepAlive(
  1005. isFirstPeriodicKeepAlive, false, timeout, bytesUp, bytesDown)
  1006. if err != nil {
  1007. select {
  1008. case sshKeepAliveError <- err:
  1009. default:
  1010. }
  1011. }
  1012. isFirstPeriodicKeepAlive = false
  1013. }
  1014. }()
  1015. // Probe-type SSH keep alives have a distinct send worker and may be sent
  1016. // concurrently, to ensure a long period keep alive timeout doesn't delay
  1017. // failed tunnel detection.
  1018. requestsWaitGroup.Add(1)
  1019. signalProbeSshKeepAlive := make(chan time.Duration)
  1020. go func() {
  1021. defer requestsWaitGroup.Done()
  1022. for timeout := range signalProbeSshKeepAlive {
  1023. bytesUp := atomic.LoadInt64(&totalSent)
  1024. bytesDown := atomic.LoadInt64(&totalReceived)
  1025. err := tunnel.sendSshKeepAlive(
  1026. false, true, timeout, bytesUp, bytesDown)
  1027. if err != nil {
  1028. select {
  1029. case sshKeepAliveError <- err:
  1030. default:
  1031. }
  1032. }
  1033. }
  1034. }()
  1035. shutdown := false
  1036. var err error
  1037. for !shutdown && err == nil {
  1038. select {
  1039. case <-noticeBytesTransferredTicker.C:
  1040. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1041. tunnel.dialParams.ServerEntry.IpAddress)
  1042. if received > 0 {
  1043. lastBytesReceivedTime = time.Now()
  1044. }
  1045. bytesUp := atomic.AddInt64(&totalSent, sent)
  1046. bytesDown := atomic.AddInt64(&totalReceived, received)
  1047. p := tunnel.getCustomClientParameters()
  1048. noticePeriod := p.Duration(parameters.TotalBytesTransferredNoticePeriod)
  1049. replayTargetUpstreamBytes := p.Int(parameters.ReplayTargetUpstreamBytes)
  1050. replayTargetDownstreamBytes := p.Int(parameters.ReplayTargetDownstreamBytes)
  1051. replayTargetTunnelDuration := p.Duration(parameters.ReplayTargetTunnelDuration)
  1052. if lastTotalBytesTransferedTime.Add(noticePeriod).Before(time.Now()) {
  1053. NoticeTotalBytesTransferred(
  1054. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1055. lastTotalBytesTransferedTime = time.Now()
  1056. }
  1057. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  1058. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  1059. NoticeBytesTransferred(
  1060. tunnel.dialParams.ServerEntry.GetDiagnosticID(), sent, received)
  1061. }
  1062. // Once the tunnel has connected, activated, successfully transmitted the
  1063. // targeted number of bytes, and been up for the targeted duration
  1064. // (measured from the end of establishment), store its dial parameters for
  1065. // subsequent replay.
  1066. //
  1067. // Even when target bytes and duration are all 0, the tunnel must remain up
  1068. // for at least 1 second due to use of noticeBytesTransferredTicker; for
  1069. // the same reason the granularity of ReplayTargetTunnelDuration is
  1070. // seconds.
  1071. if !setDialParamsSucceeded &&
  1072. bytesUp >= int64(replayTargetUpstreamBytes) &&
  1073. bytesDown >= int64(replayTargetDownstreamBytes) &&
  1074. time.Since(tunnel.establishedTime) >= replayTargetTunnelDuration {
  1075. tunnel.dialParams.Succeeded()
  1076. setDialParamsSucceeded = true
  1077. }
  1078. case <-statsTimer.C:
  1079. select {
  1080. case signalStatusRequest <- struct{}{}:
  1081. default:
  1082. }
  1083. statsTimer.Reset(nextStatusRequestPeriod())
  1084. case <-sshKeepAliveTimer.C:
  1085. p := tunnel.getCustomClientParameters()
  1086. inactivePeriod := p.Duration(parameters.SSHKeepAlivePeriodicInactivePeriod)
  1087. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1088. timeout := p.Duration(parameters.SSHKeepAlivePeriodicTimeout)
  1089. select {
  1090. case signalPeriodicSshKeepAlive <- timeout:
  1091. default:
  1092. }
  1093. }
  1094. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1095. case <-tunnel.signalPortForwardFailure:
  1096. // Note: no mutex on portForwardFailureTotal; only referenced here
  1097. tunnel.totalPortForwardFailures++
  1098. NoticeInfo("port forward failures for %s: %d",
  1099. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  1100. tunnel.totalPortForwardFailures)
  1101. // If the underlying Conn has closed (meek and other plugin protocols may
  1102. // close themselves in certain error conditions), the tunnel has certainly
  1103. // failed. Otherwise, probe with an SSH keep alive.
  1104. //
  1105. // TODO: the IsClosed case omits the failed tunnel logging and reset
  1106. // actions performed by sendSshKeepAlive. Should self-closing protocols
  1107. // perform these actions themselves?
  1108. if tunnel.conn.IsClosed() {
  1109. err = errors.TraceNew("underlying conn is closed")
  1110. } else {
  1111. p := tunnel.getCustomClientParameters()
  1112. inactivePeriod := p.Duration(parameters.SSHKeepAliveProbeInactivePeriod)
  1113. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1114. timeout := p.Duration(parameters.SSHKeepAliveProbeTimeout)
  1115. select {
  1116. case signalProbeSshKeepAlive <- timeout:
  1117. default:
  1118. }
  1119. }
  1120. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1121. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1122. }
  1123. }
  1124. case err = <-sshKeepAliveError:
  1125. case serverRequest := <-tunnel.sshServerRequests:
  1126. if serverRequest != nil {
  1127. err := HandleServerRequest(tunnelOwner, tunnel, serverRequest.Type, serverRequest.Payload)
  1128. if err == nil {
  1129. serverRequest.Reply(true, nil)
  1130. } else {
  1131. NoticeWarning("HandleServerRequest for %s failed: %s", serverRequest.Type, err)
  1132. serverRequest.Reply(false, nil)
  1133. }
  1134. }
  1135. case <-tunnel.operateCtx.Done():
  1136. shutdown = true
  1137. }
  1138. }
  1139. close(signalPeriodicSshKeepAlive)
  1140. close(signalProbeSshKeepAlive)
  1141. close(signalStatusRequest)
  1142. requestsWaitGroup.Wait()
  1143. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1144. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1145. tunnel.dialParams.ServerEntry.IpAddress)
  1146. bytesUp := atomic.AddInt64(&totalSent, sent)
  1147. bytesDown := atomic.AddInt64(&totalReceived, received)
  1148. // Always emit a final NoticeTotalBytesTransferred
  1149. NoticeTotalBytesTransferred(
  1150. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1151. if err == nil {
  1152. NoticeInfo("shutdown operate tunnel")
  1153. // This commanded shutdown case is initiated by Tunnel.Close, which will
  1154. // wait up to parameters.TunnelOperateShutdownTimeout to allow the following
  1155. // requests to complete.
  1156. // Send a final status request in order to report any outstanding persistent
  1157. // stats and domain bytes transferred as soon as possible.
  1158. sendStats(tunnel)
  1159. // The controller connectedReporter may have initiated a connected request
  1160. // concurrent to this commanded shutdown. SetInFlightConnectedRequest
  1161. // ensures that a connected request doesn't start after the commanded
  1162. // shutdown. AwaitInFlightConnectedRequest blocks until any in flight
  1163. // request completes or is aborted after TunnelOperateShutdownTimeout.
  1164. //
  1165. // As any connected request is performed by a concurrent goroutine,
  1166. // sendStats is called first and AwaitInFlightConnectedRequest second.
  1167. tunnel.AwaitInFlightConnectedRequest()
  1168. } else {
  1169. NoticeWarning("operate tunnel error for %s: %s",
  1170. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1171. tunnelOwner.SignalTunnelFailure(tunnel)
  1172. }
  1173. }
  1174. // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
  1175. // on the specified SSH connections and returns true of the request succeeds
  1176. // within a specified timeout. If the request fails, the associated conn is
  1177. // closed, which will terminate the associated tunnel.
  1178. func (tunnel *Tunnel) sendSshKeepAlive(
  1179. isFirstPeriodicKeepAlive bool,
  1180. isProbeKeepAlive bool,
  1181. timeout time.Duration,
  1182. bytesUp int64,
  1183. bytesDown int64) error {
  1184. p := tunnel.getCustomClientParameters()
  1185. // Random padding to frustrate fingerprinting.
  1186. request := prng.Padding(
  1187. p.Int(parameters.SSHKeepAlivePaddingMinBytes),
  1188. p.Int(parameters.SSHKeepAlivePaddingMaxBytes))
  1189. speedTestSample := isFirstPeriodicKeepAlive
  1190. if !speedTestSample {
  1191. speedTestSample = p.WeightedCoinFlip(
  1192. parameters.SSHKeepAliveSpeedTestSampleProbability)
  1193. }
  1194. networkConnectivityPollPeriod := p.Duration(
  1195. parameters.SSHKeepAliveNetworkConnectivityPollingPeriod)
  1196. resetOnFailure := p.WeightedCoinFlip(
  1197. parameters.SSHKeepAliveResetOnFailureProbability)
  1198. p.Close()
  1199. // Note: there is no request context since SSH requests cannot be interrupted
  1200. // directly. Closing the tunnel will interrupt the request. A timeout is set
  1201. // to unblock this function, but the goroutine may not exit until the tunnel
  1202. // is closed.
  1203. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  1204. errChannel := make(chan error, 1)
  1205. afterFunc := time.AfterFunc(timeout, func() {
  1206. errChannel <- errors.TraceNew("timed out")
  1207. })
  1208. defer afterFunc.Stop()
  1209. go func() {
  1210. startTime := time.Now()
  1211. // Note: reading a reply is important for last-received-time tunnel
  1212. // duration calculation.
  1213. requestOk, response, err := tunnel.sshClient.SendRequest(
  1214. "keepalive@openssh.com", true, request)
  1215. elapsedTime := time.Since(startTime)
  1216. errChannel <- err
  1217. success := (err == nil && requestOk)
  1218. if success && isProbeKeepAlive {
  1219. NoticeInfo("Probe SSH keep-alive RTT: %s", elapsedTime)
  1220. }
  1221. // Record the keep alive round trip as a speed test sample. The first
  1222. // periodic keep alive is always recorded, as many tunnels are short-lived
  1223. // and we want to ensure that some data is gathered. Subsequent keep alives
  1224. // are recorded with some configurable probability, which, considering that
  1225. // only the last SpeedTestMaxSampleCount samples are retained, enables
  1226. // tuning the sampling frequency.
  1227. if success && speedTestSample {
  1228. err = tactics.AddSpeedTestSample(
  1229. tunnel.config.GetClientParameters(),
  1230. GetTacticsStorer(tunnel.config),
  1231. tunnel.config.GetNetworkID(),
  1232. tunnel.dialParams.ServerEntry.Region,
  1233. tunnel.dialParams.TunnelProtocol,
  1234. elapsedTime,
  1235. request,
  1236. response)
  1237. if err != nil {
  1238. NoticeWarning("AddSpeedTestSample failed: %s", errors.Trace(err))
  1239. }
  1240. }
  1241. }()
  1242. // While awaiting the response, poll the network connectivity state. If there
  1243. // is network connectivity, on the same network, for the entire duration of
  1244. // the keep alive request and the request fails, record a failed tunnel
  1245. // event.
  1246. //
  1247. // The network connectivity heuristic is intended to reduce the number of
  1248. // failed tunnels reported due to routine situations such as varying mobile
  1249. // network conditions. The polling may produce false positives if the network
  1250. // goes down and up between polling periods, or changes to a new network and
  1251. // back to the previous network between polling periods.
  1252. //
  1253. // For platforms that don't provide a NetworkConnectivityChecker, it is
  1254. // assumed that there is network connectivity.
  1255. //
  1256. // The approximate number of tunneled bytes successfully sent and received is
  1257. // recorded in the failed tunnel event as a quality indicator.
  1258. ticker := time.NewTicker(networkConnectivityPollPeriod)
  1259. defer ticker.Stop()
  1260. continuousNetworkConnectivity := true
  1261. networkID := tunnel.config.GetNetworkID()
  1262. var err error
  1263. loop:
  1264. for {
  1265. select {
  1266. case err = <-errChannel:
  1267. break loop
  1268. case <-ticker.C:
  1269. connectivityChecker := tunnel.config.NetworkConnectivityChecker
  1270. if (connectivityChecker != nil &&
  1271. connectivityChecker.HasNetworkConnectivity() != 1) ||
  1272. (networkID != tunnel.config.GetNetworkID()) {
  1273. continuousNetworkConnectivity = false
  1274. }
  1275. }
  1276. }
  1277. err = errors.Trace(err)
  1278. if err != nil {
  1279. tunnel.sshClient.Close()
  1280. tunnel.conn.Close()
  1281. // Don't perform log or reset actions when the keep alive may have been
  1282. // interrupted due to shutdown.
  1283. isShutdown := false
  1284. select {
  1285. case <-tunnel.operateCtx.Done():
  1286. isShutdown = true
  1287. default:
  1288. }
  1289. // Ensure that at most one of the two SSH keep alive workers (periodic and
  1290. // probe) perform the log and reset actions.
  1291. wasHandled := atomic.CompareAndSwapInt32(&tunnel.handledSSHKeepAliveFailure, 0, 1)
  1292. if continuousNetworkConnectivity &&
  1293. !isShutdown &&
  1294. !wasHandled {
  1295. _ = RecordFailedTunnelStat(
  1296. tunnel.config,
  1297. tunnel.dialParams,
  1298. tunnel.livenessTestMetrics,
  1299. bytesUp,
  1300. bytesDown,
  1301. err)
  1302. // SSHKeepAliveResetOnFailureProbability is set when a late-lifecycle
  1303. // impaired protocol attack is suspected. With the given probability, reset
  1304. // server affinity and replay parameters for this server to avoid
  1305. // continuously reconnecting to the server and/or using the same protocol
  1306. // and dial parameters.
  1307. if resetOnFailure {
  1308. NoticeInfo("Delete dial parameters for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1309. err := DeleteDialParameters(tunnel.dialParams.ServerEntry.IpAddress, tunnel.dialParams.NetworkID)
  1310. if err != nil {
  1311. NoticeWarning("DeleteDialParameters failed: %s", err)
  1312. }
  1313. NoticeInfo("Delete server affinity for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1314. err = DeleteServerEntryAffinity(tunnel.dialParams.ServerEntry.IpAddress)
  1315. if err != nil {
  1316. NoticeWarning("DeleteServerEntryAffinity failed: %s", err)
  1317. }
  1318. }
  1319. }
  1320. }
  1321. return err
  1322. }
  1323. // sendStats is a helper for sending session stats to the server.
  1324. func sendStats(tunnel *Tunnel) bool {
  1325. // Tunnel does not have a serverContext when DisableApi is set
  1326. if tunnel.serverContext == nil {
  1327. return true
  1328. }
  1329. // Skip when tunnel is discarded
  1330. if tunnel.IsDiscarded() {
  1331. return true
  1332. }
  1333. err := tunnel.serverContext.DoStatusRequest(tunnel)
  1334. if err != nil {
  1335. NoticeWarning("DoStatusRequest failed for %s: %s",
  1336. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1337. }
  1338. return err == nil
  1339. }