tunnel.go 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "encoding/base64"
  25. "encoding/json"
  26. std_errors "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "net"
  31. "net/http"
  32. "sync"
  33. "sync/atomic"
  34. "time"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  46. )
  47. // Tunneler specifies the interface required by components that use tunnels.
  48. type Tunneler interface {
  49. // Dial creates a tunneled connection.
  50. //
  51. // When split tunnel mode is enabled, the connection may be untunneled,
  52. // depending on GeoIP classification of the destination.
  53. //
  54. // downstreamConn is an optional parameter which specifies a connection to be
  55. // explicitly closed when the Dialed connection is closed. For instance, this
  56. // is used to close downstreamConn App<->LocalProxy connections when the related
  57. // LocalProxy<->SshPortForward connections close.
  58. Dial(remoteAddr string, downstreamConn net.Conn) (conn net.Conn, err error)
  59. DirectDial(remoteAddr string) (conn net.Conn, err error)
  60. SignalComponentFailure()
  61. }
  62. // TunnelOwner specifies the interface required by Tunnel to notify its
  63. // owner when it has failed. The owner may, as in the case of the Controller,
  64. // remove the tunnel from its list of active tunnels.
  65. type TunnelOwner interface {
  66. SignalSeededNewSLOK()
  67. SignalTunnelFailure(tunnel *Tunnel)
  68. }
  69. // Tunnel is a connection to a Psiphon server. An established
  70. // tunnel includes a network connection to the specified server
  71. // and an SSH session built on top of that transport.
  72. type Tunnel struct {
  73. mutex *sync.Mutex
  74. config *Config
  75. isActivated bool
  76. isDiscarded bool
  77. isClosed bool
  78. dialParams *DialParameters
  79. livenessTestMetrics *livenessTestMetrics
  80. extraFailureAction func()
  81. serverContext *ServerContext
  82. monitoringStartTime time.Time
  83. conn *common.BurstMonitoredConn
  84. sshClient *ssh.Client
  85. sshServerRequests <-chan *ssh.Request
  86. operateWaitGroup *sync.WaitGroup
  87. operateCtx context.Context
  88. stopOperate context.CancelFunc
  89. signalPortForwardFailure chan struct{}
  90. totalPortForwardFailures int
  91. adjustedEstablishStartTime time.Time
  92. establishDuration time.Duration
  93. establishedTime time.Time
  94. handledSSHKeepAliveFailure int32
  95. inFlightConnectedRequestSignal chan struct{}
  96. }
  97. // getCustomParameters helpers wrap the verbose function call chain required
  98. // to get a current snapshot of the parameters.Parameters customized with the
  99. // dial parameters associated with a tunnel.
  100. func (tunnel *Tunnel) getCustomParameters() parameters.ParametersAccessor {
  101. return getCustomParameters(tunnel.config, tunnel.dialParams)
  102. }
  103. func getCustomParameters(
  104. config *Config, dialParams *DialParameters) parameters.ParametersAccessor {
  105. return config.GetParameters().GetCustom(dialParams.NetworkLatencyMultiplier)
  106. }
  107. // ConnectTunnel first makes a network transport connection to the
  108. // Psiphon server and then establishes an SSH client session on top of
  109. // that transport. The SSH server is authenticated using the public
  110. // key in the server entry.
  111. // Depending on the server's capabilities, the connection may use
  112. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  113. // HTTP (meek protocol).
  114. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  115. // the a random supported protocol is used.
  116. //
  117. // Call Activate on a connected tunnel to complete its establishment
  118. // before using.
  119. //
  120. // Tunnel establishment is split into two phases: connection, and
  121. // activation. The Controller will run many ConnectTunnel calls
  122. // concurrently and then, to avoid unnecessary overhead from making
  123. // handshake requests and starting operateTunnel from tunnels which
  124. // may be discarded, call Activate on connected tunnels sequentially
  125. // as necessary.
  126. func ConnectTunnel(
  127. ctx context.Context,
  128. config *Config,
  129. adjustedEstablishStartTime time.Time,
  130. dialParams *DialParameters) (*Tunnel, error) {
  131. // Build transport layers and establish SSH connection. Note that
  132. // dialConn and monitoredConn are the same network connection.
  133. dialResult, err := dialTunnel(ctx, config, dialParams)
  134. if err != nil {
  135. return nil, errors.Trace(err)
  136. }
  137. // The tunnel is now connected
  138. return &Tunnel{
  139. mutex: new(sync.Mutex),
  140. config: config,
  141. dialParams: dialParams,
  142. livenessTestMetrics: dialResult.livenessTestMetrics,
  143. extraFailureAction: dialResult.extraFailureAction,
  144. monitoringStartTime: dialResult.monitoringStartTime,
  145. conn: dialResult.monitoredConn,
  146. sshClient: dialResult.sshClient,
  147. sshServerRequests: dialResult.sshRequests,
  148. // A buffer allows at least one signal to be sent even when the receiver is
  149. // not listening. Senders should not block.
  150. signalPortForwardFailure: make(chan struct{}, 1),
  151. adjustedEstablishStartTime: adjustedEstablishStartTime,
  152. }, nil
  153. }
  154. // Activate completes the tunnel establishment, performing the handshake
  155. // request and starting operateTunnel, the worker that monitors the tunnel
  156. // and handles periodic management.
  157. func (tunnel *Tunnel) Activate(
  158. ctx context.Context, tunnelOwner TunnelOwner) (retErr error) {
  159. // Ensure that, unless the base context is cancelled, any replayed dial
  160. // parameters are cleared, no longer to be retried, if the tunnel fails to
  161. // activate.
  162. activationSucceeded := false
  163. baseCtx := ctx
  164. defer func() {
  165. if !activationSucceeded && baseCtx.Err() != context.Canceled {
  166. tunnel.dialParams.Failed(tunnel.config)
  167. if tunnel.extraFailureAction != nil {
  168. tunnel.extraFailureAction()
  169. }
  170. if retErr != nil {
  171. _ = RecordFailedTunnelStat(
  172. tunnel.config,
  173. tunnel.dialParams,
  174. tunnel.livenessTestMetrics,
  175. -1,
  176. -1,
  177. retErr)
  178. }
  179. }
  180. }()
  181. // Create a new Psiphon API server context for this tunnel. This includes
  182. // performing a handshake request. If the handshake fails, this activation
  183. // fails.
  184. var serverContext *ServerContext
  185. if !tunnel.config.DisableApi {
  186. NoticeInfo(
  187. "starting server context for %s",
  188. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  189. // Call NewServerContext in a goroutine, as it blocks on a network operation,
  190. // the handshake request, and would block shutdown. If the shutdown signal is
  191. // received, close the tunnel, which will interrupt the handshake request
  192. // that may be blocking NewServerContext.
  193. //
  194. // Timeout after PsiphonApiServerTimeoutSeconds. NewServerContext may not
  195. // return if the tunnel network connection is unstable during the handshake
  196. // request. At this point, there is no operateTunnel monitor that will detect
  197. // this condition with SSH keep alives.
  198. timeout := tunnel.getCustomParameters().Duration(
  199. parameters.PsiphonAPIRequestTimeout)
  200. if timeout > 0 {
  201. var cancelFunc context.CancelFunc
  202. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  203. defer cancelFunc()
  204. }
  205. type newServerContextResult struct {
  206. serverContext *ServerContext
  207. err error
  208. }
  209. resultChannel := make(chan newServerContextResult)
  210. go func() {
  211. serverContext, err := NewServerContext(tunnel)
  212. resultChannel <- newServerContextResult{
  213. serverContext: serverContext,
  214. err: err,
  215. }
  216. }()
  217. var result newServerContextResult
  218. select {
  219. case result = <-resultChannel:
  220. case <-ctx.Done():
  221. result.err = ctx.Err()
  222. // Interrupt the goroutine
  223. tunnel.Close(true)
  224. <-resultChannel
  225. }
  226. if result.err != nil {
  227. return errors.Trace(result.err)
  228. }
  229. serverContext = result.serverContext
  230. }
  231. // The activation succeeded.
  232. activationSucceeded = true
  233. tunnel.mutex.Lock()
  234. // It may happen that the tunnel gets closed while Activate is running.
  235. // In this case, abort here, to ensure that the operateTunnel goroutine
  236. // will not be launched after Close is called.
  237. if tunnel.isClosed {
  238. return errors.TraceNew("tunnel is closed")
  239. }
  240. tunnel.isActivated = true
  241. tunnel.serverContext = serverContext
  242. // establishDuration is the elapsed time between the controller starting tunnel
  243. // establishment and this tunnel being established. The reported value represents
  244. // how long the user waited between starting the client and having a usable tunnel;
  245. // or how long between the client detecting an unexpected tunnel disconnect and
  246. // completing automatic reestablishment.
  247. //
  248. // This time period may include time spent unsuccessfully connecting to other
  249. // servers. Time spent waiting for network connectivity is excluded.
  250. tunnel.establishDuration = time.Since(tunnel.adjustedEstablishStartTime)
  251. tunnel.establishedTime = time.Now()
  252. // Use the Background context instead of the controller run context, as tunnels
  253. // are terminated when the controller calls tunnel.Close.
  254. tunnel.operateCtx, tunnel.stopOperate = context.WithCancel(context.Background())
  255. tunnel.operateWaitGroup = new(sync.WaitGroup)
  256. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
  257. // stats updates.
  258. tunnel.operateWaitGroup.Add(1)
  259. go tunnel.operateTunnel(tunnelOwner)
  260. tunnel.mutex.Unlock()
  261. return nil
  262. }
  263. // Close stops operating the tunnel and closes the underlying connection.
  264. // Supports multiple and/or concurrent calls to Close().
  265. // When isDiscarded is set, operateTunnel will not attempt to send final
  266. // status requests.
  267. func (tunnel *Tunnel) Close(isDiscarded bool) {
  268. tunnel.mutex.Lock()
  269. tunnel.isDiscarded = isDiscarded
  270. isActivated := tunnel.isActivated
  271. isClosed := tunnel.isClosed
  272. tunnel.isClosed = true
  273. tunnel.mutex.Unlock()
  274. if !isClosed {
  275. // Signal operateTunnel to stop before closing the tunnel -- this
  276. // allows a final status request to be made in the case of an orderly
  277. // shutdown.
  278. // A timer is set, so if operateTunnel takes too long to stop, the
  279. // tunnel is closed, which will interrupt any slow final status request.
  280. if isActivated {
  281. timeout := tunnel.getCustomParameters().Duration(
  282. parameters.TunnelOperateShutdownTimeout)
  283. afterFunc := time.AfterFunc(
  284. timeout,
  285. func() { tunnel.conn.Close() })
  286. tunnel.stopOperate()
  287. tunnel.operateWaitGroup.Wait()
  288. afterFunc.Stop()
  289. }
  290. tunnel.sshClient.Close()
  291. // tunnel.conn.Close() may get called multiple times, which is allowed.
  292. tunnel.conn.Close()
  293. err := tunnel.sshClient.Wait()
  294. if err != nil {
  295. NoticeWarning("close tunnel ssh error: %s", err)
  296. }
  297. }
  298. // Log burst metrics now that the BurstMonitoredConn is closed.
  299. // Metrics will be empty when burst monitoring is disabled.
  300. if !isDiscarded && isActivated {
  301. burstMetrics := tunnel.conn.GetMetrics(tunnel.monitoringStartTime)
  302. if len(burstMetrics) > 0 {
  303. NoticeBursts(
  304. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  305. burstMetrics)
  306. }
  307. }
  308. }
  309. // SetInFlightConnectedRequest checks if a connected request can begin and
  310. // sets the channel used to signal that the request is complete.
  311. //
  312. // The caller must not initiate a connected request when
  313. // SetInFlightConnectedRequest returns false. When SetInFlightConnectedRequest
  314. // returns true, the caller must call SetInFlightConnectedRequest(nil) when
  315. // the connected request completes.
  316. func (tunnel *Tunnel) SetInFlightConnectedRequest(requestSignal chan struct{}) bool {
  317. tunnel.mutex.Lock()
  318. defer tunnel.mutex.Unlock()
  319. // If already closing, don't start a connected request: the
  320. // TunnelOperateShutdownTimeout period may be nearly expired.
  321. if tunnel.isClosed {
  322. return false
  323. }
  324. if requestSignal == nil {
  325. // Not already in-flight (not expected)
  326. if tunnel.inFlightConnectedRequestSignal == nil {
  327. return false
  328. }
  329. } else {
  330. // Already in-flight (not expected)
  331. if tunnel.inFlightConnectedRequestSignal != nil {
  332. return false
  333. }
  334. }
  335. tunnel.inFlightConnectedRequestSignal = requestSignal
  336. return true
  337. }
  338. // AwaitInFlightConnectedRequest waits for the signal that any in-flight
  339. // connected request is complete.
  340. //
  341. // AwaitInFlightConnectedRequest may block until the connected request is
  342. // aborted by terminating the tunnel.
  343. func (tunnel *Tunnel) AwaitInFlightConnectedRequest() {
  344. tunnel.mutex.Lock()
  345. requestSignal := tunnel.inFlightConnectedRequestSignal
  346. tunnel.mutex.Unlock()
  347. if requestSignal != nil {
  348. <-requestSignal
  349. }
  350. }
  351. // IsActivated returns the tunnel's activated flag.
  352. func (tunnel *Tunnel) IsActivated() bool {
  353. tunnel.mutex.Lock()
  354. defer tunnel.mutex.Unlock()
  355. return tunnel.isActivated
  356. }
  357. // IsDiscarded returns the tunnel's discarded flag.
  358. func (tunnel *Tunnel) IsDiscarded() bool {
  359. tunnel.mutex.Lock()
  360. defer tunnel.mutex.Unlock()
  361. return tunnel.isDiscarded
  362. }
  363. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  364. // This function blocks awaiting a response. Only one request may be in-flight
  365. // at once; a concurrent SendAPIRequest will block until an active request
  366. // receives its response (or the SSH connection is terminated).
  367. func (tunnel *Tunnel) SendAPIRequest(
  368. name string, requestPayload []byte) ([]byte, error) {
  369. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  370. name, true, requestPayload)
  371. if err != nil {
  372. return nil, errors.Trace(err)
  373. }
  374. if !ok {
  375. return nil, errors.TraceNew("API request rejected")
  376. }
  377. return responsePayload, nil
  378. }
  379. // DialTCPChannel establishes a TCP port forward connection through the
  380. // tunnel.
  381. //
  382. // When split tunnel mode is enabled, and unless alwaysTunneled is set, the
  383. // server may reject the port forward and indicate that the client is to make
  384. // direct, untunneled connection. In this case, the bool return value is true
  385. // and net.Conn and error are nil.
  386. //
  387. // downstreamConn is an optional parameter which specifies a connection to be
  388. // explicitly closed when the dialed connection is closed.
  389. func (tunnel *Tunnel) DialTCPChannel(
  390. remoteAddr string,
  391. alwaysTunneled bool,
  392. downstreamConn net.Conn) (net.Conn, bool, error) {
  393. channelType := "direct-tcpip"
  394. if alwaysTunneled && tunnel.config.IsSplitTunnelEnabled() {
  395. // This channel type is only necessary in split tunnel mode.
  396. channelType = protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE
  397. }
  398. channel, err := tunnel.dialChannel(channelType, remoteAddr)
  399. if err != nil {
  400. if isSplitTunnelRejectReason(err) {
  401. return nil, true, nil
  402. }
  403. return nil, false, errors.Trace(err)
  404. }
  405. netConn, ok := channel.(net.Conn)
  406. if !ok {
  407. return nil, false, errors.Tracef("unexpected channel type: %T", channel)
  408. }
  409. conn := &TunneledConn{
  410. Conn: netConn,
  411. tunnel: tunnel,
  412. downstreamConn: downstreamConn}
  413. return tunnel.wrapWithTransferStats(conn), false, nil
  414. }
  415. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  416. channel, err := tunnel.dialChannel(protocol.PACKET_TUNNEL_CHANNEL_TYPE, "")
  417. if err != nil {
  418. return nil, errors.Trace(err)
  419. }
  420. sshChannel, ok := channel.(ssh.Channel)
  421. if !ok {
  422. return nil, errors.Tracef("unexpected channel type: %T", channel)
  423. }
  424. NoticeInfo("DialPacketTunnelChannel: established channel")
  425. conn := newChannelConn(sshChannel)
  426. // wrapWithTransferStats will track bytes transferred for the
  427. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  428. //
  429. // Since the data in the channel is not HTTP or TLS, no domain bytes
  430. // counting is expected.
  431. //
  432. // transferstats are also used to determine that there's been recent
  433. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  434. return tunnel.wrapWithTransferStats(conn), nil
  435. }
  436. func (tunnel *Tunnel) dialChannel(channelType, remoteAddr string) (interface{}, error) {
  437. if !tunnel.IsActivated() {
  438. return nil, errors.TraceNew("tunnel is not activated")
  439. }
  440. // Note: there is no dial context since SSH port forward dials cannot
  441. // be interrupted directly. Closing the tunnel will interrupt the dials.
  442. // A timeout is set to unblock this function, but the goroutine may
  443. // not exit until the tunnel is closed.
  444. type channelDialResult struct {
  445. channel interface{}
  446. err error
  447. }
  448. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  449. results := make(chan *channelDialResult, 1)
  450. afterFunc := time.AfterFunc(
  451. tunnel.getCustomParameters().Duration(
  452. parameters.TunnelPortForwardDialTimeout),
  453. func() {
  454. results <- &channelDialResult{
  455. nil, errors.Tracef("channel dial timeout: %s", channelType)}
  456. })
  457. defer afterFunc.Stop()
  458. go func() {
  459. result := new(channelDialResult)
  460. switch channelType {
  461. case "direct-tcpip", protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  462. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  463. // "direct-tcpip", except split tunnel channel rejections are disallowed
  464. // even when split tunnel mode is enabled.
  465. result.channel, result.err =
  466. tunnel.sshClient.Dial(channelType, remoteAddr)
  467. default:
  468. var sshRequests <-chan *ssh.Request
  469. result.channel, sshRequests, result.err =
  470. tunnel.sshClient.OpenChannel(channelType, nil)
  471. if result.err == nil {
  472. go ssh.DiscardRequests(sshRequests)
  473. }
  474. }
  475. if result.err != nil {
  476. result.err = errors.Trace(result.err)
  477. }
  478. results <- result
  479. }()
  480. result := <-results
  481. if result.err != nil {
  482. if !isSplitTunnelRejectReason(result.err) {
  483. select {
  484. case tunnel.signalPortForwardFailure <- struct{}{}:
  485. default:
  486. }
  487. }
  488. return nil, errors.Trace(result.err)
  489. }
  490. return result.channel, nil
  491. }
  492. func isSplitTunnelRejectReason(err error) bool {
  493. var openChannelErr *ssh.OpenChannelError
  494. if std_errors.As(err, &openChannelErr) {
  495. return openChannelErr.Reason ==
  496. ssh.RejectionReason(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL)
  497. }
  498. return false
  499. }
  500. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  501. // Tunnel does not have a serverContext when DisableApi is set. We still use
  502. // transferstats.Conn to count bytes transferred for monitoring tunnel
  503. // quality.
  504. var regexps *transferstats.Regexps
  505. if tunnel.serverContext != nil {
  506. regexps = tunnel.serverContext.StatsRegexps()
  507. }
  508. return transferstats.NewConn(
  509. conn, tunnel.dialParams.ServerEntry.IpAddress, regexps)
  510. }
  511. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  512. // This will terminate the tunnel.
  513. func (tunnel *Tunnel) SignalComponentFailure() {
  514. NoticeWarning("tunnel received component failure signal")
  515. tunnel.Close(false)
  516. }
// TunneledConn implements net.Conn and wraps a port forward connection.
// It is used to hook into Read and Write to observe I/O errors and
// report these errors back to the tunnel monitor as port forward failures.
// TunneledConn optionally tracks a peer connection to be explicitly closed
// when the TunneledConn is closed.
type TunneledConn struct {
	// Conn is the underlying port forward connection. Its methods are
	// promoted; Read, Write and Close are overridden below.
	net.Conn
	// tunnel is the owning tunnel. I/O failures are reported on its
	// signalPortForwardFailure channel.
	tunnel *Tunnel
	// downstreamConn, when non-nil, is closed along with this connection.
	downstreamConn net.Conn
}
  527. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  528. n, err = conn.Conn.Read(buffer)
  529. if err != nil && err != io.EOF {
  530. // Report new failure. Won't block; assumes the receiver
  531. // has a sufficient buffer for the threshold number of reports.
  532. // TODO: conditional on type of error or error message?
  533. select {
  534. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  535. default:
  536. }
  537. }
  538. return
  539. }
  540. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  541. n, err = conn.Conn.Write(buffer)
  542. if err != nil && err != io.EOF {
  543. // Same as TunneledConn.Read()
  544. select {
  545. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  546. default:
  547. }
  548. }
  549. return
  550. }
  551. func (conn *TunneledConn) Close() error {
  552. if conn.downstreamConn != nil {
  553. conn.downstreamConn.Close()
  554. }
  555. return conn.Conn.Close()
  556. }
// dialResult carries the outputs of dialTunnel that ConnectTunnel uses to
// build a Tunnel.
type dialResult struct {
	// dialConn is the dialed network connection. Per ConnectTunnel, it is the
	// same connection as monitoredConn.
	// NOTE(review): not read by any code in this view; confirm its consumers.
	dialConn net.Conn
	// monitoringStartTime becomes Tunnel.monitoringStartTime, which is used
	// when reporting burst metrics.
	monitoringStartTime time.Time
	// monitoredConn becomes Tunnel.conn.
	monitoredConn *common.BurstMonitoredConn
	sshClient     *ssh.Client
	// sshRequests becomes Tunnel.sshServerRequests.
	sshRequests         <-chan *ssh.Request
	livenessTestMetrics *livenessTestMetrics
	// extraFailureAction, when non-nil, is run if the tunnel later fails to
	// activate.
	extraFailureAction func()
}
  566. // dialTunnel is a helper that builds the transport layers and establishes the
  567. // SSH connection. When additional dial configuration is used, dial metrics
  568. // are recorded and returned.
  569. func dialTunnel(
  570. ctx context.Context,
  571. config *Config,
  572. dialParams *DialParameters) (_ *dialResult, retErr error) {
  573. // Return immediately when overall context is canceled or timed-out. This
  574. // avoids notice noise.
  575. err := ctx.Err()
  576. if err != nil {
  577. return nil, errors.Trace(err)
  578. }
  579. p := getCustomParameters(config, dialParams)
  580. timeout := p.Duration(parameters.TunnelConnectTimeout)
  581. rateLimits := p.RateLimits(parameters.TunnelRateLimits)
  582. obfuscatedSSHMinPadding := p.Int(parameters.ObfuscatedSSHMinPadding)
  583. obfuscatedSSHMaxPadding := p.Int(parameters.ObfuscatedSSHMaxPadding)
  584. livenessTestMinUpstreamBytes := p.Int(parameters.LivenessTestMinUpstreamBytes)
  585. livenessTestMaxUpstreamBytes := p.Int(parameters.LivenessTestMaxUpstreamBytes)
  586. livenessTestMinDownstreamBytes := p.Int(parameters.LivenessTestMinDownstreamBytes)
  587. livenessTestMaxDownstreamBytes := p.Int(parameters.LivenessTestMaxDownstreamBytes)
  588. burstUpstreamTargetBytes := int64(p.Int(parameters.ClientBurstUpstreamTargetBytes))
  589. burstUpstreamDeadline := p.Duration(parameters.ClientBurstUpstreamDeadline)
  590. burstDownstreamTargetBytes := int64(p.Int(parameters.ClientBurstDownstreamTargetBytes))
  591. burstDownstreamDeadline := p.Duration(parameters.ClientBurstDownstreamDeadline)
  592. tlsOSSHApplyTrafficShaping := p.WeightedCoinFlip(parameters.TLSTunnelTrafficShapingProbability)
  593. tlsOSSHMinTLSPadding := p.Int(parameters.TLSTunnelMinTLSPadding)
  594. tlsOSSHMaxTLSPadding := p.Int(parameters.TLSTunnelMaxTLSPadding)
  595. conjureEnableIPv6Dials := p.Bool(parameters.ConjureEnableIPv6Dials)
  596. conjureEnablePortRandomization := p.Bool(parameters.ConjureEnablePortRandomization)
  597. conjureEnableRegistrationOverrides := p.Bool(parameters.ConjureEnableRegistrationOverrides)
  598. p.Close()
  599. // Ensure that, unless the base context is cancelled, any replayed dial
  600. // parameters are cleared, no longer to be retried, if the tunnel fails to
  601. // connect.
  602. //
  603. // Limitation: dials that fail to connect due to the server being in a
  604. // load-limiting state are not distinguished and excepted from this
  605. // logic.
  606. dialSucceeded := false
  607. baseCtx := ctx
  608. var failedTunnelLivenessTestMetrics *livenessTestMetrics
  609. var extraFailureAction func()
  610. defer func() {
  611. if !dialSucceeded && baseCtx.Err() != context.Canceled {
  612. dialParams.Failed(config)
  613. if extraFailureAction != nil {
  614. extraFailureAction()
  615. }
  616. if retErr != nil {
  617. _ = RecordFailedTunnelStat(
  618. config,
  619. dialParams,
  620. failedTunnelLivenessTestMetrics,
  621. -1,
  622. -1,
  623. retErr)
  624. }
  625. }
  626. }()
  627. var cancelFunc context.CancelFunc
  628. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  629. defer cancelFunc()
  630. // DialDuration is the elapsed time for both successful and failed tunnel
  631. // dials. For successful tunnels, it includes any the network protocol
  632. // handshake(s), obfuscation protocol handshake(s), SSH handshake, and
  633. // liveness test, when performed.
  634. //
  635. // Note: ensure DialDuration is set before calling any function which logs
  636. // dial_duration.
  637. startDialTime := time.Now()
  638. defer func() {
  639. dialParams.DialDuration = time.Since(startDialTime)
  640. }()
  641. // Note: dialParams.MeekResolvedIPAddress isn't set until the dial begins,
  642. // so it will always be blank in NoticeConnectingServer.
  643. NoticeConnectingServer(dialParams)
  644. // Create the base transport: meek or direct connection
  645. var dialConn net.Conn
  646. if protocol.TunnelProtocolUsesTLSOSSH(dialParams.TunnelProtocol) {
  647. dialConn, err = DialTLSTunnel(
  648. ctx,
  649. dialParams.GetTLSOSSHConfig(config),
  650. dialParams.GetDialConfig(),
  651. tlsOSSHApplyTrafficShaping,
  652. tlsOSSHMinTLSPadding,
  653. tlsOSSHMaxTLSPadding)
  654. if err != nil {
  655. return nil, errors.Trace(err)
  656. }
  657. } else if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  658. dialConn, err = DialMeek(
  659. ctx,
  660. dialParams.GetMeekConfig(),
  661. dialParams.GetDialConfig())
  662. if err != nil {
  663. return nil, errors.Trace(err)
  664. }
  665. } else if protocol.TunnelProtocolUsesQUIC(dialParams.TunnelProtocol) {
  666. packetConn, remoteAddr, err := NewUDPConn(
  667. ctx, "udp", false, "", dialParams.DirectDialAddress, dialParams.GetDialConfig())
  668. if err != nil {
  669. return nil, errors.Trace(err)
  670. }
  671. dialConn, err = quic.Dial(
  672. ctx,
  673. packetConn,
  674. remoteAddr,
  675. dialParams.QUICDialSNIAddress,
  676. dialParams.QUICVersion,
  677. dialParams.QUICClientHelloSeed,
  678. dialParams.ServerEntry.SshObfuscatedKey,
  679. dialParams.ObfuscatedQUICPaddingSeed,
  680. dialParams.ObfuscatedQUICNonceTransformerParameters,
  681. dialParams.QUICDisablePathMTUDiscovery)
  682. if err != nil {
  683. return nil, errors.Trace(err)
  684. }
  685. } else if protocol.TunnelProtocolUsesTapDance(dialParams.TunnelProtocol) {
  686. dialConn, err = refraction.DialTapDance(
  687. ctx,
  688. config.EmitRefractionNetworkingLogs,
  689. config.GetPsiphonDataDirectory(),
  690. NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
  691. dialParams.DirectDialAddress)
  692. if err != nil {
  693. return nil, errors.Trace(err)
  694. }
  695. } else if protocol.TunnelProtocolUsesConjure(dialParams.TunnelProtocol) {
  696. // Specify a cache key with a scope that ensures that:
  697. //
  698. // (a) cached registrations aren't used across different networks, as a
  699. // registration requires the client's public IP to match the value at time
  700. // of registration;
  701. //
  702. // (b) cached registrations are associated with specific Psiphon server
  703. // candidates, to ensure that replay will use the same phantom IP(s).
  704. //
  705. // This scheme allows for reuse of cached registrations on network A when a
  706. // client roams from network A to network B and back to network A.
  707. //
  708. // Using the network ID as a proxy for client public IP address is a
  709. // heurisitic: it's possible that a clients public IP address changes
  710. // without the network ID changing, and it's not guaranteed that the client
  711. // will be assigned the original public IP on network A; so there's some
  712. // chance the registration cannot be reused.
  713. diagnosticID := dialParams.ServerEntry.GetDiagnosticID()
  714. cacheKey := dialParams.NetworkID + "-" + diagnosticID
  715. conjureConfig := &refraction.ConjureConfig{
  716. RegistrationCacheTTL: dialParams.ConjureCachedRegistrationTTL,
  717. RegistrationCacheKey: cacheKey,
  718. EnableIPv6Dials: conjureEnableIPv6Dials,
  719. EnablePortRandomization: conjureEnablePortRandomization,
  720. EnableRegistrationOverrides: conjureEnableRegistrationOverrides,
  721. Transport: dialParams.ConjureTransport,
  722. STUNServerAddress: dialParams.ConjureSTUNServerAddress,
  723. DTLSEmptyInitialPacket: dialParams.ConjureDTLSEmptyInitialPacket,
  724. DiagnosticID: diagnosticID,
  725. Logger: NoticeCommonLogger(),
  726. }
  727. // Set extraFailureAction, which is invoked whenever the tunnel fails (i.e.,
  728. // where RecordFailedTunnelStat is invoked). The action will remove any
  729. // cached registration. When refraction.DialConjure succeeds, the underlying
  730. // registration is cached. After refraction.DialConjure returns, it no
  731. // longer modifies the cached state of that registration, assuming that it
  732. // remains valid and effective. However adversarial impact on a given
  733. // phantom IP may not become evident until after the initial TCP connection
  734. // establishment and handshake performed by refraction.DialConjure. For
  735. // example, it may be that the phantom dial is targeted for severe
  736. // throttling which begins or is only evident later in the flow. Scheduling
  737. // a call to DeleteCachedConjureRegistration allows us to invalidate the
  738. // cached registration for a tunnel that fails later in its lifecycle.
  739. //
  740. // Note that extraFailureAction will retain a reference to conjureConfig for
  741. // the lifetime of the tunnel.
  742. extraFailureAction = func() {
  743. refraction.DeleteCachedConjureRegistration(conjureConfig)
  744. }
  745. if dialParams.ConjureAPIRegistration {
  746. // Use MeekConn to domain front Conjure API registration.
  747. //
  748. // ConjureAPIRegistrarFrontingSpecs are applied via
  749. // dialParams.GetMeekConfig, and will be subject to replay.
  750. //
  751. // Since DialMeek will create a TLS connection immediately, and a cached
  752. // registration may be used, we will delay initializing the MeekConn-based
  753. // RoundTripper until we know it's needed. This is implemented by passing
  754. // in a RoundTripper that establishes a MeekConn when RoundTrip is called.
  755. //
  756. // In refraction.dial we configure 0 retries for API registration requests,
  757. // assuming it's better to let another Psiphon candidate retry, with new
  758. // domaing fronting parameters. As such, we expect only one round trip call
  759. // per NewHTTPRoundTripper, so, in practise, there's no performance penalty
  760. // from establishing a new MeekConn per round trip.
  761. //
  762. // Performing the full DialMeek/RoundTrip operation here allows us to call
  763. // MeekConn.Close and ensure all resources are immediately cleaned up.
  764. roundTrip := func(request *http.Request) (*http.Response, error) {
  765. conn, err := DialMeek(
  766. ctx, dialParams.GetMeekConfig(), dialParams.GetDialConfig())
  767. if err != nil {
  768. return nil, errors.Trace(err)
  769. }
  770. defer conn.Close()
  771. response, err := conn.RoundTrip(request)
  772. if err != nil {
  773. return nil, errors.Trace(err)
  774. }
  775. // Read the response into a buffer and close the response
  776. // body, ensuring that MeekConn.Close closes all idle connections.
  777. //
  778. // Alternatively, we could Clone the request to set
  779. // http.Request.Close and avoid keeping any idle connection
  780. // open after the response body is read by gotapdance. Since
  781. // the response body is small and since gotapdance does not
  782. // stream the response body, we're taking this approach which
  783. // ensures cleanup.
  784. body, err := ioutil.ReadAll(response.Body)
  785. _ = response.Body.Close()
  786. if err != nil {
  787. return nil, errors.Trace(err)
  788. }
  789. response.Body = io.NopCloser(bytes.NewReader(body))
  790. return response, nil
  791. }
  792. conjureConfig.APIRegistrarHTTPClient = &http.Client{
  793. Transport: common.NewHTTPRoundTripper(roundTrip),
  794. }
  795. conjureConfig.APIRegistrarBidirectionalURL =
  796. dialParams.ConjureAPIRegistrarBidirectionalURL
  797. conjureConfig.APIRegistrarDelay = dialParams.ConjureAPIRegistrarDelay
  798. } else if dialParams.ConjureDecoyRegistration {
  799. // The Conjure "phantom" connection is compatible with fragmentation, but
  800. // the decoy registrar connection, like Tapdance, is not, so force it off.
  801. // Any tunnel fragmentation metrics will refer to the "phantom" connection
  802. // only.
  803. conjureConfig.DoDecoyRegistration = true
  804. conjureConfig.DecoyRegistrarWidth = dialParams.ConjureDecoyRegistrarWidth
  805. conjureConfig.DecoyRegistrarDelay = dialParams.ConjureDecoyRegistrarDelay
  806. }
  807. dialConn, err = refraction.DialConjure(
  808. ctx,
  809. config.EmitRefractionNetworkingLogs,
  810. config.GetPsiphonDataDirectory(),
  811. NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
  812. dialParams.DirectDialAddress,
  813. conjureConfig)
  814. if err != nil {
  815. return nil, errors.Trace(err)
  816. }
  817. } else {
  818. dialConn, err = DialTCP(
  819. ctx,
  820. dialParams.DirectDialAddress,
  821. dialParams.GetDialConfig())
  822. if err != nil {
  823. return nil, errors.Trace(err)
  824. }
  825. }
  826. // Some conns report additional metrics. fragmentor.Conns report
  827. // fragmentor configs.
  828. if metricsSource, ok := dialConn.(common.MetricsSource); ok {
  829. dialParams.DialConnMetrics = metricsSource
  830. }
  831. if noticeMetricsSource, ok := dialConn.(common.NoticeMetricsSource); ok {
  832. dialParams.DialConnNoticeMetrics = noticeMetricsSource
  833. }
  834. // If dialConn is not a Closer, tunnel failure detection may be slower
  835. if _, ok := dialConn.(common.Closer); !ok {
  836. NoticeWarning("tunnel.dialTunnel: dialConn is not a Closer")
  837. }
  838. cleanupConn := dialConn
  839. defer func() {
  840. // Cleanup on error
  841. if cleanupConn != nil {
  842. cleanupConn.Close()
  843. }
  844. }()
  845. monitoringStartTime := time.Now()
  846. monitoredConn := common.NewBurstMonitoredConn(
  847. dialConn,
  848. false,
  849. burstUpstreamTargetBytes, burstUpstreamDeadline,
  850. burstDownstreamTargetBytes, burstDownstreamDeadline)
  851. // Apply throttling (if configured)
  852. throttledConn := common.NewThrottledConn(
  853. monitoredConn,
  854. rateLimits)
  855. // Add obfuscated SSH layer
  856. var sshConn net.Conn = throttledConn
  857. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  858. obfuscatedSSHConn, err := obfuscator.NewClientObfuscatedSSHConn(
  859. throttledConn,
  860. dialParams.ServerEntry.SshObfuscatedKey,
  861. dialParams.ObfuscatorPaddingSeed,
  862. dialParams.OSSHObfuscatorSeedTransformerParameters,
  863. dialParams.OSSHPrefixSpec,
  864. dialParams.OSSHPrefixSplitConfig,
  865. &obfuscatedSSHMinPadding,
  866. &obfuscatedSSHMaxPadding)
  867. if err != nil {
  868. return nil, errors.Trace(err)
  869. }
  870. sshConn = obfuscatedSSHConn
  871. dialParams.ObfuscatedSSHConnMetrics = obfuscatedSSHConn
  872. }
  873. // Now establish the SSH session over the conn transport
  874. expectedPublicKey, err := base64.StdEncoding.DecodeString(
  875. dialParams.ServerEntry.SshHostKey)
  876. if err != nil {
  877. return nil, errors.Trace(err)
  878. }
  879. sshCertChecker := &ssh.CertChecker{
  880. IsHostAuthority: func(auth ssh.PublicKey, address string) bool {
  881. // Psiphon servers do not currently use SSH certificates. This CertChecker
  882. // code path may still be hit if a client attempts to connect using an
  883. // obsolete server entry.
  884. return false
  885. },
  886. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  887. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  888. return errors.TraceNew("unexpected host public key")
  889. }
  890. return nil
  891. },
  892. }
  893. sshPasswordPayload := &protocol.SSHPasswordPayload{
  894. SessionId: config.SessionID,
  895. SshPassword: dialParams.ServerEntry.SshPassword,
  896. ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
  897. }
  898. payload, err := json.Marshal(sshPasswordPayload)
  899. if err != nil {
  900. return nil, errors.Trace(err)
  901. }
  902. sshClientConfig := &ssh.ClientConfig{
  903. User: dialParams.ServerEntry.SshUsername,
  904. Auth: []ssh.AuthMethod{
  905. ssh.Password(string(payload)),
  906. },
  907. HostKeyCallback: sshCertChecker.CheckHostKey,
  908. ClientVersion: dialParams.SSHClientVersion,
  909. }
  910. sshClientConfig.KEXPRNGSeed = dialParams.SSHKEXSeed
  911. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  912. if config.ObfuscatedSSHAlgorithms != nil {
  913. sshClientConfig.KeyExchanges = []string{config.ObfuscatedSSHAlgorithms[0]}
  914. sshClientConfig.Ciphers = []string{config.ObfuscatedSSHAlgorithms[1]}
  915. sshClientConfig.MACs = []string{config.ObfuscatedSSHAlgorithms[2]}
  916. sshClientConfig.HostKeyAlgorithms = []string{config.ObfuscatedSSHAlgorithms[3]}
  917. } else {
  918. // With Encrypt-then-MAC hash algorithms, packet length is
  919. // transmitted in plaintext, which aids in traffic analysis.
  920. //
  921. // TUNNEL_PROTOCOL_SSH is excepted since its KEX appears in plaintext,
  922. // and the protocol is intended to look like SSH on the wire.
  923. sshClientConfig.NoEncryptThenMACHash = true
  924. }
  925. } else {
  926. // For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize
  927. // its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation
  928. // betweem two randomized KEXes.
  929. if dialParams.ServerEntry.SshObfuscatedKey != "" {
  930. sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  931. dialParams.ServerEntry.SshObfuscatedKey)
  932. if err != nil {
  933. return nil, errors.Trace(err)
  934. }
  935. }
  936. }
  937. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  938. // in a timeout to ensure it won't hang. We've encountered firewalls
  939. // that allow the TCP handshake to complete but then send a RST to the
  940. // server-side and nothing to the client-side, and if that happens
  941. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  942. // closes the conn, which interrupts it.
  943. // Note: TCP handshake timeouts are provided by TCPConn, and session
  944. // timeouts *after* ssh establishment are provided by the ssh keep alive
  945. // in operate tunnel.
  946. type sshNewClientResult struct {
  947. sshClient *ssh.Client
  948. sshRequests <-chan *ssh.Request
  949. livenessTestMetrics *livenessTestMetrics
  950. err error
  951. }
  952. resultChannel := make(chan sshNewClientResult)
  953. // Call NewClientConn in a goroutine, as it blocks on SSH handshake network
  954. // operations, and would block canceling or shutdown. If the parent context
  955. // is canceled, close the net.Conn underlying SSH, which will interrupt the
  956. // SSH handshake that may be blocking NewClientConn.
  957. go func() {
  958. // The following is adapted from ssh.Dial(), here using a custom conn
  959. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  960. sshAddress := ""
  961. sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
  962. sshConn, sshAddress, sshClientConfig)
  963. var sshClient *ssh.Client
  964. var metrics *livenessTestMetrics
  965. if err == nil {
  966. // sshRequests is handled by operateTunnel.
  967. // ssh.NewClient also expects to handle the sshRequests
  968. // value from ssh.NewClientConn and will spawn a goroutine
  969. // to handle the <-chan *ssh.Request, so we must provide
  970. // a closed channel to ensure that goroutine halts instead
  971. // of hanging on a nil channel.
  972. noRequests := make(chan *ssh.Request)
  973. close(noRequests)
  974. sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)
  975. if livenessTestMaxUpstreamBytes > 0 || livenessTestMaxDownstreamBytes > 0 {
  976. // When configured, perform a liveness test which sends and
  977. // receives bytes through the tunnel to ensure the tunnel had
  978. // not been blocked upon or shortly after connecting. This
  979. // test is performed concurrently for each establishment
  980. // candidate before selecting a successful tunnel.
  981. //
  982. // Note that the liveness test is subject to the
  983. // TunnelConnectTimeout, which should be adjusted
  984. // accordinging.
  985. metrics, err = performLivenessTest(
  986. sshClient,
  987. livenessTestMinUpstreamBytes, livenessTestMaxUpstreamBytes,
  988. livenessTestMinDownstreamBytes, livenessTestMaxDownstreamBytes,
  989. dialParams.LivenessTestSeed)
  990. // Skip notice when cancelling.
  991. if baseCtx.Err() == nil {
  992. NoticeLivenessTest(
  993. dialParams.ServerEntry.GetDiagnosticID(), metrics, err == nil)
  994. }
  995. }
  996. }
  997. resultChannel <- sshNewClientResult{sshClient, sshRequests, metrics, err}
  998. }()
  999. var result sshNewClientResult
  1000. select {
  1001. case result = <-resultChannel:
  1002. case <-ctx.Done():
  1003. // Interrupt the goroutine and capture its error context to
  1004. // distinguish point of failure.
  1005. err := ctx.Err()
  1006. sshConn.Close()
  1007. result = <-resultChannel
  1008. if result.err != nil {
  1009. result.err = fmt.Errorf("%s: %s", err, result.err)
  1010. } else {
  1011. result.err = err
  1012. }
  1013. }
  1014. if result.err != nil {
  1015. failedTunnelLivenessTestMetrics = result.livenessTestMetrics
  1016. return nil, errors.Trace(result.err)
  1017. }
  1018. dialSucceeded = true
  1019. NoticeConnectedServer(dialParams)
  1020. cleanupConn = nil
  1021. // Invoke DNS cache extension (if enabled in the resolver) now that the
  1022. // tunnel is connected and the Psiphon server is authenticated. This
  1023. // demonstrates that any domain name resolved to an endpoint that is or
  1024. // is forwarded to the expected Psiphon server.
  1025. //
  1026. // Limitation: DNS cache extension is not implemented for Refraction
  1027. // Networking protocols. iOS VPN, the primary use case for DNS cache
  1028. // extension, does not enable Refraction Networking.
  1029. if protocol.TunnelProtocolUsesFrontedMeek(dialParams.TunnelProtocol) {
  1030. resolver := config.GetResolver()
  1031. if resolver != nil {
  1032. resolver.VerifyCacheExtension(dialParams.MeekFrontingDialAddress)
  1033. }
  1034. }
  1035. // When configured to do so, hold-off on activating this tunnel. This allows
  1036. // some extra time for slower but less resource intensive protocols to
  1037. // establish tunnels. By holding off post-connect, the client has this
  1038. // established tunnel ready to activate in case other protocols fail to
  1039. // establish. This hold-off phase continues to consume one connection worker.
  1040. //
  1041. // The network latency multiplier is not applied to HoldOffTunnelDuration,
  1042. // as the goal is to apply a consistent hold-off range across all tunnel
  1043. // candidates; and this avoids scaling up any delay users experience.
  1044. //
  1045. // The hold-off is applied regardless of whether this is the first tunnel
  1046. // in a session or a reconnection, even to a server affinity candidate,
  1047. // so that the advantage for other protocols persists.
  1048. if dialParams.HoldOffTunnelDuration > 0 {
  1049. NoticeHoldOffTunnel(dialParams.ServerEntry.GetDiagnosticID(), dialParams.HoldOffTunnelDuration)
  1050. common.SleepWithContext(ctx, dialParams.HoldOffTunnelDuration)
  1051. }
  1052. // Note: dialConn may be used to close the underlying network connection
  1053. // but should not be used to perform I/O as that would interfere with SSH
  1054. // (and also bypasses throttling).
  1055. return &dialResult{
  1056. dialConn: dialConn,
  1057. monitoringStartTime: monitoringStartTime,
  1058. monitoredConn: monitoredConn,
  1059. sshClient: result.sshClient,
  1060. sshRequests: result.sshRequests,
  1061. livenessTestMetrics: result.livenessTestMetrics,
  1062. extraFailureAction: extraFailureAction,
  1063. },
  1064. nil
  1065. }
  1066. // Fields are exported for JSON encoding in NoticeLivenessTest.
  1067. type livenessTestMetrics struct {
  1068. Duration string
  1069. UpstreamBytes int
  1070. SentUpstreamBytes int
  1071. DownstreamBytes int
  1072. ReceivedDownstreamBytes int
  1073. }
  1074. func performLivenessTest(
  1075. sshClient *ssh.Client,
  1076. minUpstreamBytes, maxUpstreamBytes int,
  1077. minDownstreamBytes, maxDownstreamBytes int,
  1078. livenessTestPRNGSeed *prng.Seed) (*livenessTestMetrics, error) {
  1079. metrics := new(livenessTestMetrics)
  1080. defer func(startTime time.Time) {
  1081. metrics.Duration = time.Since(startTime).String()
  1082. }(time.Now())
  1083. PRNG := prng.NewPRNGWithSeed(livenessTestPRNGSeed)
  1084. metrics.UpstreamBytes = PRNG.Range(minUpstreamBytes, maxUpstreamBytes)
  1085. metrics.DownstreamBytes = PRNG.Range(minDownstreamBytes, maxDownstreamBytes)
  1086. request := &protocol.RandomStreamRequest{
  1087. UpstreamBytes: metrics.UpstreamBytes,
  1088. DownstreamBytes: metrics.DownstreamBytes,
  1089. }
  1090. extraData, err := json.Marshal(request)
  1091. if err != nil {
  1092. return metrics, errors.Trace(err)
  1093. }
  1094. channel, requests, err := sshClient.OpenChannel(
  1095. protocol.RANDOM_STREAM_CHANNEL_TYPE, extraData)
  1096. if err != nil {
  1097. return metrics, errors.Trace(err)
  1098. }
  1099. defer channel.Close()
  1100. go ssh.DiscardRequests(requests)
  1101. sent := 0
  1102. received := 0
  1103. upstream := new(sync.WaitGroup)
  1104. var errUpstream, errDownstream error
  1105. if metrics.UpstreamBytes > 0 {
  1106. // Process streams concurrently to minimize elapsed time. This also
  1107. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  1108. upstream.Add(1)
  1109. go func() {
  1110. defer upstream.Done()
  1111. // In consideration of memory-constrained environments, use modest-sized copy
  1112. // buffers since many tunnel establishment workers may run the liveness test
  1113. // concurrently.
  1114. var buffer [4096]byte
  1115. n, err := common.CopyNBuffer(channel, rand.Reader, int64(metrics.UpstreamBytes), buffer[:])
  1116. sent = int(n)
  1117. if err != nil {
  1118. errUpstream = errors.Trace(err)
  1119. }
  1120. }()
  1121. }
  1122. if metrics.DownstreamBytes > 0 {
  1123. var buffer [4096]byte
  1124. n, err := common.CopyNBuffer(ioutil.Discard, channel, int64(metrics.DownstreamBytes), buffer[:])
  1125. received = int(n)
  1126. if err != nil {
  1127. errDownstream = errors.Trace(err)
  1128. }
  1129. }
  1130. upstream.Wait()
  1131. metrics.SentUpstreamBytes = sent
  1132. metrics.ReceivedDownstreamBytes = received
  1133. if errUpstream != nil {
  1134. return metrics, errUpstream
  1135. } else if errDownstream != nil {
  1136. return metrics, errDownstream
  1137. }
  1138. return metrics, nil
  1139. }
  1140. // operateTunnel monitors the health of the tunnel and performs
  1141. // periodic work.
  1142. //
  1143. // BytesTransferred and TotalBytesTransferred notices are emitted
  1144. // for live reporting and diagnostics reporting, respectively.
  1145. //
  1146. // Status requests are sent to the Psiphon API to report bytes
  1147. // transferred.
  1148. //
  1149. // Periodic SSH keep alive packets are sent to ensure the underlying
  1150. // TCP connection isn't terminated by NAT, or other network
  1151. // interference -- or test if it has been terminated while the device
  1152. // has been asleep. When a keep alive times out, the tunnel is
  1153. // considered failed.
  1154. //
  1155. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  1156. // server responsiveness when a port forward failure is detected: a
  1157. // failed dial or failed read/write. This keep alive has a shorter
  1158. // timeout.
  1159. //
  1160. // Note that port forward failures may be due to non-failure conditions.
  1161. // For example, when the user inputs an invalid domain name and
  1162. // resolution is done by the ssh server; or trying to connect to a
  1163. // non-white-listed port; and the error message in these cases is not
  1164. // distinguishable from a a true server error (a common error message,
  1165. // "ssh: rejected: administratively prohibited (open failed)", may be
  1166. // returned for these cases but also if the server has run out of
  1167. // ephemeral ports, for example).
  1168. //
  1169. // SSH keep alives are not sent when the tunnel has been recently
  1170. // active (not only does tunnel activity obviate the necessity of a keep
  1171. // alive, testing has shown that keep alives may time out for "busy"
  1172. // tunnels, especially over meek protocol and other high latency
  1173. // conditions).
  1174. //
  1175. // "Recently active" is defined has having received payload bytes. Sent
  1176. // bytes are not considered as testing has shown bytes may appear to
  1177. // send when certain NAT devices have interfered with the tunnel, while
  1178. // no bytes are received. In a pathological case, with DNS implemented
  1179. // as tunneled UDP, a browser may wait excessively for a domain name to
  1180. // resolve, while no new port forward is attempted which would otherwise
  1181. // result in a tunnel failure detection.
  1182. //
  1183. // TODO: change "recently active" to include having received any
  1184. // SSH protocol messages from the server, not just user payload?
  1185. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  1186. defer tunnel.operateWaitGroup.Done()
  1187. now := time.Now()
  1188. lastBytesReceivedTime := now
  1189. lastTotalBytesTransferedTime := now
  1190. totalSent := int64(0)
  1191. totalReceived := int64(0)
  1192. setDialParamsSucceeded := false
  1193. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  1194. defer noticeBytesTransferredTicker.Stop()
  1195. // The next status request and ssh keep alive times are picked at random,
  1196. // from a range, to make the resulting traffic less fingerprintable,
  1197. // Note: not using Tickers since these are not fixed time periods.
  1198. nextStatusRequestPeriod := func() time.Duration {
  1199. p := tunnel.getCustomParameters()
  1200. return prng.Period(
  1201. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMin),
  1202. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMax))
  1203. }
  1204. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  1205. defer statsTimer.Stop()
  1206. // Schedule an almost-immediate status request to deliver any unreported
  1207. // persistent stats.
  1208. unreported := CountUnreportedPersistentStats()
  1209. if unreported > 0 {
  1210. NoticeInfo("Unreported persistent stats: %d", unreported)
  1211. p := tunnel.getCustomParameters()
  1212. statsTimer.Reset(
  1213. prng.Period(
  1214. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMin),
  1215. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMax)))
  1216. }
  1217. nextSshKeepAlivePeriod := func() time.Duration {
  1218. p := tunnel.getCustomParameters()
  1219. return prng.Period(
  1220. p.Duration(parameters.SSHKeepAlivePeriodMin),
  1221. p.Duration(parameters.SSHKeepAlivePeriodMax))
  1222. }
  1223. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  1224. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  1225. if tunnel.config.DisablePeriodicSshKeepAlive {
  1226. sshKeepAliveTimer.Stop()
  1227. } else {
  1228. defer sshKeepAliveTimer.Stop()
  1229. }
  1230. // Perform network requests in separate goroutines so as not to block
  1231. // other operations.
  1232. requestsWaitGroup := new(sync.WaitGroup)
  1233. requestsWaitGroup.Add(1)
  1234. signalStatusRequest := make(chan struct{})
  1235. go func() {
  1236. defer requestsWaitGroup.Done()
  1237. for range signalStatusRequest {
  1238. sendStats(tunnel)
  1239. }
  1240. }()
  1241. requestsWaitGroup.Add(1)
  1242. signalPeriodicSshKeepAlive := make(chan time.Duration)
  1243. sshKeepAliveError := make(chan error, 1)
  1244. go func() {
  1245. defer requestsWaitGroup.Done()
  1246. isFirstPeriodicKeepAlive := true
  1247. for timeout := range signalPeriodicSshKeepAlive {
  1248. bytesUp := atomic.LoadInt64(&totalSent)
  1249. bytesDown := atomic.LoadInt64(&totalReceived)
  1250. err := tunnel.sendSshKeepAlive(
  1251. isFirstPeriodicKeepAlive, false, timeout, bytesUp, bytesDown)
  1252. if err != nil {
  1253. select {
  1254. case sshKeepAliveError <- err:
  1255. default:
  1256. }
  1257. }
  1258. isFirstPeriodicKeepAlive = false
  1259. }
  1260. }()
  1261. // Probe-type SSH keep alives have a distinct send worker and may be sent
  1262. // concurrently, to ensure a long period keep alive timeout doesn't delay
  1263. // failed tunnel detection.
  1264. requestsWaitGroup.Add(1)
  1265. signalProbeSshKeepAlive := make(chan time.Duration)
  1266. go func() {
  1267. defer requestsWaitGroup.Done()
  1268. for timeout := range signalProbeSshKeepAlive {
  1269. bytesUp := atomic.LoadInt64(&totalSent)
  1270. bytesDown := atomic.LoadInt64(&totalReceived)
  1271. err := tunnel.sendSshKeepAlive(
  1272. false, true, timeout, bytesUp, bytesDown)
  1273. if err != nil {
  1274. select {
  1275. case sshKeepAliveError <- err:
  1276. default:
  1277. }
  1278. }
  1279. }
  1280. }()
  1281. shutdown := false
  1282. var err error
  1283. for !shutdown && err == nil {
  1284. select {
  1285. case <-noticeBytesTransferredTicker.C:
  1286. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1287. tunnel.dialParams.ServerEntry.IpAddress)
  1288. if received > 0 {
  1289. lastBytesReceivedTime = time.Now()
  1290. }
  1291. bytesUp := atomic.AddInt64(&totalSent, sent)
  1292. bytesDown := atomic.AddInt64(&totalReceived, received)
  1293. p := tunnel.getCustomParameters()
  1294. noticePeriod := p.Duration(parameters.TotalBytesTransferredNoticePeriod)
  1295. doEmitMemoryMetrics := p.Bool(parameters.TotalBytesTransferredEmitMemoryMetrics)
  1296. replayTargetUpstreamBytes := p.Int(parameters.ReplayTargetUpstreamBytes)
  1297. replayTargetDownstreamBytes := p.Int(parameters.ReplayTargetDownstreamBytes)
  1298. replayTargetTunnelDuration := p.Duration(parameters.ReplayTargetTunnelDuration)
  1299. if lastTotalBytesTransferedTime.Add(noticePeriod).Before(time.Now()) {
  1300. NoticeTotalBytesTransferred(
  1301. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1302. if doEmitMemoryMetrics {
  1303. emitMemoryMetrics()
  1304. }
  1305. lastTotalBytesTransferedTime = time.Now()
  1306. }
  1307. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  1308. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  1309. NoticeBytesTransferred(
  1310. tunnel.dialParams.ServerEntry.GetDiagnosticID(), sent, received)
  1311. }
  1312. // Once the tunnel has connected, activated, successfully transmitted the
  1313. // targeted number of bytes, and been up for the targeted duration
  1314. // (measured from the end of establishment), store its dial parameters for
  1315. // subsequent replay.
  1316. //
  1317. // Even when target bytes and duration are all 0, the tunnel must remain up
  1318. // for at least 1 second due to use of noticeBytesTransferredTicker; for
  1319. // the same reason the granularity of ReplayTargetTunnelDuration is
  1320. // seconds.
  1321. if !setDialParamsSucceeded &&
  1322. bytesUp >= int64(replayTargetUpstreamBytes) &&
  1323. bytesDown >= int64(replayTargetDownstreamBytes) &&
  1324. time.Since(tunnel.establishedTime) >= replayTargetTunnelDuration {
  1325. tunnel.dialParams.Succeeded()
  1326. setDialParamsSucceeded = true
  1327. }
  1328. case <-statsTimer.C:
  1329. select {
  1330. case signalStatusRequest <- struct{}{}:
  1331. default:
  1332. }
  1333. statsTimer.Reset(nextStatusRequestPeriod())
  1334. case <-sshKeepAliveTimer.C:
  1335. p := tunnel.getCustomParameters()
  1336. inactivePeriod := p.Duration(parameters.SSHKeepAlivePeriodicInactivePeriod)
  1337. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1338. timeout := p.Duration(parameters.SSHKeepAlivePeriodicTimeout)
  1339. select {
  1340. case signalPeriodicSshKeepAlive <- timeout:
  1341. default:
  1342. }
  1343. }
  1344. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1345. case <-tunnel.signalPortForwardFailure:
  1346. // Note: no mutex on portForwardFailureTotal; only referenced here
  1347. tunnel.totalPortForwardFailures++
  1348. NoticeInfo("port forward failures for %s: %d",
  1349. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  1350. tunnel.totalPortForwardFailures)
  1351. // If the underlying Conn has closed (meek and other plugin protocols may
  1352. // close themselves in certain error conditions), the tunnel has certainly
  1353. // failed. Otherwise, probe with an SSH keep alive.
  1354. //
  1355. // TODO: the IsClosed case omits the failed tunnel logging and reset
  1356. // actions performed by sendSshKeepAlive. Should self-closing protocols
  1357. // perform these actions themselves?
  1358. if tunnel.conn.IsClosed() {
  1359. err = errors.TraceNew("underlying conn is closed")
  1360. } else {
  1361. p := tunnel.getCustomParameters()
  1362. inactivePeriod := p.Duration(parameters.SSHKeepAliveProbeInactivePeriod)
  1363. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1364. timeout := p.Duration(parameters.SSHKeepAliveProbeTimeout)
  1365. select {
  1366. case signalProbeSshKeepAlive <- timeout:
  1367. default:
  1368. }
  1369. }
  1370. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1371. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1372. }
  1373. }
  1374. case err = <-sshKeepAliveError:
  1375. case serverRequest := <-tunnel.sshServerRequests:
  1376. if serverRequest != nil {
  1377. err := HandleServerRequest(tunnelOwner, tunnel, serverRequest.Type, serverRequest.Payload)
  1378. if err == nil {
  1379. serverRequest.Reply(true, nil)
  1380. } else {
  1381. NoticeWarning("HandleServerRequest for %s failed: %s", serverRequest.Type, err)
  1382. serverRequest.Reply(false, nil)
  1383. }
  1384. }
  1385. case <-tunnel.operateCtx.Done():
  1386. shutdown = true
  1387. }
  1388. }
  1389. close(signalPeriodicSshKeepAlive)
  1390. close(signalProbeSshKeepAlive)
  1391. close(signalStatusRequest)
  1392. requestsWaitGroup.Wait()
  1393. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1394. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1395. tunnel.dialParams.ServerEntry.IpAddress)
  1396. bytesUp := atomic.AddInt64(&totalSent, sent)
  1397. bytesDown := atomic.AddInt64(&totalReceived, received)
  1398. // Always emit a final NoticeTotalBytesTransferred
  1399. NoticeTotalBytesTransferred(
  1400. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1401. if err == nil {
  1402. NoticeInfo("shutdown operate tunnel")
  1403. // This commanded shutdown case is initiated by Tunnel.Close, which will
  1404. // wait up to parameters.TunnelOperateShutdownTimeout to allow the following
  1405. // requests to complete.
  1406. // Send a final status request in order to report any outstanding persistent
  1407. // stats and domain bytes transferred as soon as possible.
  1408. sendStats(tunnel)
  1409. // The controller connectedReporter may have initiated a connected request
  1410. // concurrent to this commanded shutdown. SetInFlightConnectedRequest
  1411. // ensures that a connected request doesn't start after the commanded
  1412. // shutdown. AwaitInFlightConnectedRequest blocks until any in flight
  1413. // request completes or is aborted after TunnelOperateShutdownTimeout.
  1414. //
  1415. // As any connected request is performed by a concurrent goroutine,
  1416. // sendStats is called first and AwaitInFlightConnectedRequest second.
  1417. tunnel.AwaitInFlightConnectedRequest()
  1418. } else {
  1419. NoticeWarning("operate tunnel error for %s: %s",
  1420. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1421. tunnelOwner.SignalTunnelFailure(tunnel)
  1422. }
  1423. }
  1424. // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
  1425. // on the specified SSH connections and returns true of the request succeeds
  1426. // within a specified timeout. If the request fails, the associated conn is
  1427. // closed, which will terminate the associated tunnel.
  1428. func (tunnel *Tunnel) sendSshKeepAlive(
  1429. isFirstPeriodicKeepAlive bool,
  1430. isProbeKeepAlive bool,
  1431. timeout time.Duration,
  1432. bytesUp int64,
  1433. bytesDown int64) error {
  1434. p := tunnel.getCustomParameters()
  1435. // Random padding to frustrate fingerprinting.
  1436. request := prng.Padding(
  1437. p.Int(parameters.SSHKeepAlivePaddingMinBytes),
  1438. p.Int(parameters.SSHKeepAlivePaddingMaxBytes))
  1439. speedTestSample := isFirstPeriodicKeepAlive
  1440. if !speedTestSample {
  1441. speedTestSample = p.WeightedCoinFlip(
  1442. parameters.SSHKeepAliveSpeedTestSampleProbability)
  1443. }
  1444. networkConnectivityPollPeriod := p.Duration(
  1445. parameters.SSHKeepAliveNetworkConnectivityPollingPeriod)
  1446. resetOnFailure := p.WeightedCoinFlip(
  1447. parameters.SSHKeepAliveResetOnFailureProbability)
  1448. p.Close()
  1449. // Note: there is no request context since SSH requests cannot be interrupted
  1450. // directly. Closing the tunnel will interrupt the request. A timeout is set
  1451. // to unblock this function, but the goroutine may not exit until the tunnel
  1452. // is closed.
  1453. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  1454. errChannel := make(chan error, 1)
  1455. afterFunc := time.AfterFunc(timeout, func() {
  1456. errChannel <- errors.TraceNew("timed out")
  1457. })
  1458. defer afterFunc.Stop()
  1459. go func() {
  1460. startTime := time.Now()
  1461. // Note: reading a reply is important for last-received-time tunnel
  1462. // duration calculation.
  1463. requestOk, response, err := tunnel.sshClient.SendRequest(
  1464. "keepalive@openssh.com", true, request)
  1465. elapsedTime := time.Since(startTime)
  1466. errChannel <- err
  1467. success := (err == nil && requestOk)
  1468. if success && isProbeKeepAlive {
  1469. NoticeInfo("Probe SSH keep-alive RTT: %s", elapsedTime)
  1470. }
  1471. // Record the keep alive round trip as a speed test sample. The first
  1472. // periodic keep alive is always recorded, as many tunnels are short-lived
  1473. // and we want to ensure that some data is gathered. Subsequent keep alives
  1474. // are recorded with some configurable probability, which, considering that
  1475. // only the last SpeedTestMaxSampleCount samples are retained, enables
  1476. // tuning the sampling frequency.
  1477. if success && speedTestSample {
  1478. err = tactics.AddSpeedTestSample(
  1479. tunnel.config.GetParameters(),
  1480. GetTacticsStorer(tunnel.config),
  1481. tunnel.config.GetNetworkID(),
  1482. tunnel.dialParams.ServerEntry.Region,
  1483. tunnel.dialParams.TunnelProtocol,
  1484. elapsedTime,
  1485. request,
  1486. response)
  1487. if err != nil {
  1488. NoticeWarning("AddSpeedTestSample failed: %s", errors.Trace(err))
  1489. }
  1490. }
  1491. }()
  1492. // While awaiting the response, poll the network connectivity state. If there
  1493. // is network connectivity, on the same network, for the entire duration of
  1494. // the keep alive request and the request fails, record a failed tunnel
  1495. // event.
  1496. //
  1497. // The network connectivity heuristic is intended to reduce the number of
  1498. // failed tunnels reported due to routine situations such as varying mobile
  1499. // network conditions. The polling may produce false positives if the network
  1500. // goes down and up between polling periods, or changes to a new network and
  1501. // back to the previous network between polling periods.
  1502. //
  1503. // For platforms that don't provide a NetworkConnectivityChecker, it is
  1504. // assumed that there is network connectivity.
  1505. //
  1506. // The approximate number of tunneled bytes successfully sent and received is
  1507. // recorded in the failed tunnel event as a quality indicator.
  1508. ticker := time.NewTicker(networkConnectivityPollPeriod)
  1509. defer ticker.Stop()
  1510. continuousNetworkConnectivity := true
  1511. networkID := tunnel.config.GetNetworkID()
  1512. var err error
  1513. loop:
  1514. for {
  1515. select {
  1516. case err = <-errChannel:
  1517. break loop
  1518. case <-ticker.C:
  1519. connectivityChecker := tunnel.config.NetworkConnectivityChecker
  1520. if (connectivityChecker != nil &&
  1521. connectivityChecker.HasNetworkConnectivity() != 1) ||
  1522. (networkID != tunnel.config.GetNetworkID()) {
  1523. continuousNetworkConnectivity = false
  1524. }
  1525. }
  1526. }
  1527. err = errors.Trace(err)
  1528. if err != nil {
  1529. tunnel.sshClient.Close()
  1530. tunnel.conn.Close()
  1531. // Don't perform log or reset actions when the keep alive may have been
  1532. // interrupted due to shutdown.
  1533. isShutdown := false
  1534. select {
  1535. case <-tunnel.operateCtx.Done():
  1536. isShutdown = true
  1537. default:
  1538. }
  1539. // Ensure that at most one of the two SSH keep alive workers (periodic and
  1540. // probe) perform the log and reset actions.
  1541. wasHandled := atomic.CompareAndSwapInt32(&tunnel.handledSSHKeepAliveFailure, 0, 1)
  1542. if continuousNetworkConnectivity &&
  1543. !isShutdown &&
  1544. !wasHandled {
  1545. _ = RecordFailedTunnelStat(
  1546. tunnel.config,
  1547. tunnel.dialParams,
  1548. tunnel.livenessTestMetrics,
  1549. bytesUp,
  1550. bytesDown,
  1551. err)
  1552. if tunnel.extraFailureAction != nil {
  1553. tunnel.extraFailureAction()
  1554. }
  1555. // SSHKeepAliveResetOnFailureProbability is set when a late-lifecycle
  1556. // impaired protocol attack is suspected. With the given probability, reset
  1557. // server affinity and replay parameters for this server to avoid
  1558. // continuously reconnecting to the server and/or using the same protocol
  1559. // and dial parameters.
  1560. if resetOnFailure {
  1561. NoticeInfo("Delete dial parameters for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1562. err := DeleteDialParameters(tunnel.dialParams.ServerEntry.IpAddress, tunnel.dialParams.NetworkID)
  1563. if err != nil {
  1564. NoticeWarning("DeleteDialParameters failed: %s", err)
  1565. }
  1566. NoticeInfo("Delete server affinity for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1567. err = DeleteServerEntryAffinity(tunnel.dialParams.ServerEntry.IpAddress)
  1568. if err != nil {
  1569. NoticeWarning("DeleteServerEntryAffinity failed: %s", err)
  1570. }
  1571. }
  1572. }
  1573. }
  1574. return err
  1575. }
  1576. // sendStats is a helper for sending session stats to the server.
  1577. func sendStats(tunnel *Tunnel) bool {
  1578. // Tunnel does not have a serverContext when DisableApi is set
  1579. if tunnel.serverContext == nil {
  1580. return true
  1581. }
  1582. // Skip when tunnel is discarded
  1583. if tunnel.IsDiscarded() {
  1584. return true
  1585. }
  1586. err := tunnel.serverContext.DoStatusRequest(tunnel)
  1587. if err != nil {
  1588. NoticeWarning("DoStatusRequest failed for %s: %s",
  1589. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1590. }
  1591. return err == nil
  1592. }