tunnel.go 73 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "encoding/base64"
  25. "encoding/json"
  26. std_errors "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "net"
  31. "net/http"
  32. "strconv"
  33. "sync"
  34. "sync/atomic"
  35. "time"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  41. inproxy_dtls "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy/dtls"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  50. "github.com/fxamacker/cbor/v2"
  51. )
  52. // Tunneler specifies the interface required by components that use tunnels.
  53. type Tunneler interface {
  54. // Dial creates a tunneled connection.
  55. //
  56. // When split tunnel mode is enabled, the connection may be untunneled,
  57. // depending on GeoIP classification of the destination.
  58. //
  59. // downstreamConn is an optional parameter which specifies a connection to be
  60. // explicitly closed when the Dialed connection is closed. For instance, this
  61. // is used to close downstreamConn App<->LocalProxy connections when the related
  62. // LocalProxy<->SshPortForward connections close.
  63. Dial(remoteAddr string, downstreamConn net.Conn) (conn net.Conn, err error)
  64. DirectDial(remoteAddr string) (conn net.Conn, err error)
  65. SignalComponentFailure()
  66. }
  67. // TunnelOwner specifies the interface required by Tunnel to notify its
  68. // owner when it has failed. The owner may, as in the case of the Controller,
  69. // remove the tunnel from its list of active tunnels.
  70. type TunnelOwner interface {
  71. SignalSeededNewSLOK()
  72. SignalTunnelFailure(tunnel *Tunnel)
  73. }
  74. // Tunnel is a connection to a Psiphon server. An established
  75. // tunnel includes a network connection to the specified server
  76. // and an SSH session built on top of that transport.
  77. type Tunnel struct {
  78. mutex *sync.Mutex
  79. config *Config
  80. isActivated bool
  81. isDiscarded bool
  82. isClosed bool
  83. dialParams *DialParameters
  84. livenessTestMetrics *livenessTestMetrics
  85. extraFailureAction func()
  86. serverContext *ServerContext
  87. monitoringStartTime time.Time
  88. conn *common.BurstMonitoredConn
  89. sshClient *ssh.Client
  90. sshServerRequests <-chan *ssh.Request
  91. operateWaitGroup *sync.WaitGroup
  92. operateCtx context.Context
  93. stopOperate context.CancelFunc
  94. signalPortForwardFailure chan struct{}
  95. totalPortForwardFailures int
  96. adjustedEstablishStartTime time.Time
  97. establishDuration time.Duration
  98. establishedTime time.Time
  99. handledSSHKeepAliveFailure int32
  100. inFlightConnectedRequestSignal chan struct{}
  101. }
  102. // getCustomParameters helpers wrap the verbose function call chain required
  103. // to get a current snapshot of the parameters.Parameters customized with the
  104. // dial parameters associated with a tunnel.
  105. func (tunnel *Tunnel) getCustomParameters() parameters.ParametersAccessor {
  106. return getCustomParameters(tunnel.config, tunnel.dialParams)
  107. }
  108. func getCustomParameters(
  109. config *Config, dialParams *DialParameters) parameters.ParametersAccessor {
  110. return config.GetParameters().GetCustom(dialParams.NetworkLatencyMultiplier)
  111. }
  112. // ConnectTunnel first makes a network transport connection to the
  113. // Psiphon server and then establishes an SSH client session on top of
  114. // that transport. The SSH server is authenticated using the public
  115. // key in the server entry.
  116. // Depending on the server's capabilities, the connection may use
  117. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  118. // HTTP (meek protocol).
  119. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  120. // the a random supported protocol is used.
  121. //
  122. // Call Activate on a connected tunnel to complete its establishment
  123. // before using.
  124. //
  125. // Tunnel establishment is split into two phases: connection, and
  126. // activation. The Controller will run many ConnectTunnel calls
  127. // concurrently and then, to avoid unnecessary overhead from making
  128. // handshake requests and starting operateTunnel from tunnels which
  129. // may be discarded, call Activate on connected tunnels sequentially
  130. // as necessary.
  131. func ConnectTunnel(
  132. ctx context.Context,
  133. config *Config,
  134. adjustedEstablishStartTime time.Time,
  135. dialParams *DialParameters) (*Tunnel, error) {
  136. // Build transport layers and establish SSH connection. Note that
  137. // dialConn and monitoredConn are the same network connection.
  138. dialResult, err := dialTunnel(ctx, config, dialParams)
  139. if err != nil {
  140. return nil, errors.Trace(err)
  141. }
  142. // The tunnel is now connected
  143. return &Tunnel{
  144. mutex: new(sync.Mutex),
  145. config: config,
  146. dialParams: dialParams,
  147. livenessTestMetrics: dialResult.livenessTestMetrics,
  148. extraFailureAction: dialResult.extraFailureAction,
  149. monitoringStartTime: dialResult.monitoringStartTime,
  150. conn: dialResult.monitoredConn,
  151. sshClient: dialResult.sshClient,
  152. sshServerRequests: dialResult.sshRequests,
  153. // A buffer allows at least one signal to be sent even when the receiver is
  154. // not listening. Senders should not block.
  155. signalPortForwardFailure: make(chan struct{}, 1),
  156. adjustedEstablishStartTime: adjustedEstablishStartTime,
  157. }, nil
  158. }
  159. // Activate completes the tunnel establishment, performing the handshake
  160. // request and starting operateTunnel, the worker that monitors the tunnel
  161. // and handles periodic management.
  162. func (tunnel *Tunnel) Activate(
  163. ctx context.Context, tunnelOwner TunnelOwner) (retErr error) {
  164. // Ensure that, unless the base context is cancelled, any replayed dial
  165. // parameters are cleared, no longer to be retried, if the tunnel fails to
  166. // activate.
  167. activationSucceeded := false
  168. baseCtx := ctx
  169. defer func() {
  170. if !activationSucceeded && baseCtx.Err() != context.Canceled {
  171. tunnel.dialParams.Failed(tunnel.config)
  172. if tunnel.extraFailureAction != nil {
  173. tunnel.extraFailureAction()
  174. }
  175. if retErr != nil {
  176. _ = RecordFailedTunnelStat(
  177. tunnel.config,
  178. tunnel.dialParams,
  179. tunnel.livenessTestMetrics,
  180. -1,
  181. -1,
  182. retErr)
  183. }
  184. }
  185. }()
  186. // Create a new Psiphon API server context for this tunnel. This includes
  187. // performing a handshake request. If the handshake fails, this activation
  188. // fails.
  189. var serverContext *ServerContext
  190. if !tunnel.config.DisableApi {
  191. NoticeInfo(
  192. "starting server context for %s",
  193. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  194. // Call NewServerContext in a goroutine, as it blocks on a network operation,
  195. // the handshake request, and would block shutdown. If the shutdown signal is
  196. // received, close the tunnel, which will interrupt the handshake request
  197. // that may be blocking NewServerContext.
  198. //
  199. // Timeout after PsiphonApiServerTimeoutSeconds. NewServerContext may not
  200. // return if the tunnel network connection is unstable during the handshake
  201. // request. At this point, there is no operateTunnel monitor that will detect
  202. // this condition with SSH keep alives.
  203. doInproxy := protocol.TunnelProtocolUsesInproxy(tunnel.dialParams.TunnelProtocol)
  204. var timeoutParameter string
  205. if doInproxy {
  206. // Optionally allow more time in case the broker/server relay
  207. // requires additional round trips to establish a new session.
  208. timeoutParameter = parameters.InproxyPsiphonAPIRequestTimeout
  209. } else {
  210. timeoutParameter = parameters.PsiphonAPIRequestTimeout
  211. }
  212. timeout := tunnel.getCustomParameters().Duration(timeoutParameter)
  213. var handshakeCtx context.Context
  214. var cancelFunc context.CancelFunc
  215. if timeout > 0 {
  216. handshakeCtx, cancelFunc = context.WithTimeout(ctx, timeout)
  217. } else {
  218. handshakeCtx, cancelFunc = context.WithCancel(ctx)
  219. }
  220. type newServerContextResult struct {
  221. serverContext *ServerContext
  222. err error
  223. }
  224. resultChannel := make(chan newServerContextResult)
  225. wg := new(sync.WaitGroup)
  226. if doInproxy {
  227. // Launch a handler to handle broker/server relay SSH requests,
  228. // which will occur when the broker needs to establish a new
  229. // session with the server.
  230. wg.Add(1)
  231. go func() {
  232. defer wg.Done()
  233. notice := true
  234. for {
  235. select {
  236. case serverRequest := <-tunnel.sshServerRequests:
  237. if serverRequest != nil {
  238. if serverRequest.Type == protocol.PSIPHON_API_INPROXY_RELAY_REQUEST_NAME {
  239. if notice {
  240. NoticeInfo(
  241. "relaying inproxy broker packets for %s",
  242. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  243. notice = false
  244. }
  245. err := tunnel.relayInproxyPacketRoundTrip(handshakeCtx, serverRequest)
  246. if err != nil {
  247. NoticeWarning(
  248. "relay inproxy broker packets failed: %v",
  249. errors.Trace(err))
  250. // Continue
  251. }
  252. } else {
  253. // There's a potential race condition in which
  254. // post-handshake SSH requests, such as OSL or
  255. // alert requests, arrive to this handler instead
  256. // of operateTunnel, so invoke HandleServerRequest here.
  257. HandleServerRequest(tunnelOwner, tunnel, serverRequest)
  258. }
  259. }
  260. case <-handshakeCtx.Done():
  261. return
  262. }
  263. }
  264. }()
  265. }
  266. wg.Add(1)
  267. go func() {
  268. defer wg.Done()
  269. serverContext, err := NewServerContext(tunnel)
  270. resultChannel <- newServerContextResult{
  271. serverContext: serverContext,
  272. err: err,
  273. }
  274. }()
  275. var result newServerContextResult
  276. select {
  277. case result = <-resultChannel:
  278. case <-handshakeCtx.Done():
  279. result.err = handshakeCtx.Err()
  280. // Interrupt the goroutine
  281. tunnel.Close(true)
  282. <-resultChannel
  283. }
  284. cancelFunc()
  285. wg.Wait()
  286. if result.err != nil {
  287. return errors.Trace(result.err)
  288. }
  289. serverContext = result.serverContext
  290. }
  291. // The activation succeeded.
  292. activationSucceeded = true
  293. tunnel.mutex.Lock()
  294. // It may happen that the tunnel gets closed while Activate is running.
  295. // In this case, abort here, to ensure that the operateTunnel goroutine
  296. // will not be launched after Close is called.
  297. if tunnel.isClosed {
  298. return errors.TraceNew("tunnel is closed")
  299. }
  300. tunnel.isActivated = true
  301. tunnel.serverContext = serverContext
  302. // establishDuration is the elapsed time between the controller starting tunnel
  303. // establishment and this tunnel being established. The reported value represents
  304. // how long the user waited between starting the client and having a usable tunnel;
  305. // or how long between the client detecting an unexpected tunnel disconnect and
  306. // completing automatic reestablishment.
  307. //
  308. // This time period may include time spent unsuccessfully connecting to other
  309. // servers. Time spent waiting for network connectivity is excluded.
  310. tunnel.establishDuration = time.Since(tunnel.adjustedEstablishStartTime)
  311. tunnel.establishedTime = time.Now()
  312. // Use the Background context instead of the controller run context, as tunnels
  313. // are terminated when the controller calls tunnel.Close.
  314. tunnel.operateCtx, tunnel.stopOperate = context.WithCancel(context.Background())
  315. tunnel.operateWaitGroup = new(sync.WaitGroup)
  316. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
  317. // stats updates.
  318. tunnel.operateWaitGroup.Add(1)
  319. go tunnel.operateTunnel(tunnelOwner)
  320. tunnel.mutex.Unlock()
  321. return nil
  322. }
  323. func (tunnel *Tunnel) relayInproxyPacketRoundTrip(
  324. ctx context.Context, request *ssh.Request) (retErr error) {
  325. defer func() {
  326. if retErr != nil {
  327. _ = request.Reply(false, nil)
  328. }
  329. }()
  330. // Continue the broker/server relay started in handshake round trip.
  331. // server -> broker
  332. var relayRequest protocol.InproxyRelayRequest
  333. err := cbor.Unmarshal(request.Payload, &relayRequest)
  334. if err != nil {
  335. return errors.Trace(err)
  336. }
  337. inproxyConn := tunnel.dialParams.inproxyConn.Load().(*inproxy.ClientConn)
  338. if inproxyConn == nil {
  339. return errors.TraceNew("missing inproxyConn")
  340. }
  341. responsePacket, err := inproxyConn.RelayPacket(ctx, relayRequest.Packet)
  342. if err != nil {
  343. return errors.Trace(err)
  344. }
  345. // RelayPacket may return a nil packet when the relay is complete.
  346. if responsePacket == nil {
  347. return nil
  348. }
  349. // broker -> server
  350. relayResponse := &protocol.InproxyRelayResponse{
  351. Packet: responsePacket,
  352. }
  353. responsePayload, err := protocol.CBOREncoding.Marshal(relayResponse)
  354. if err != nil {
  355. return errors.Trace(err)
  356. }
  357. err = request.Reply(true, responsePayload)
  358. if err != nil {
  359. return errors.Trace(err)
  360. }
  361. return nil
  362. }
  363. // Close stops operating the tunnel and closes the underlying connection.
  364. // Supports multiple and/or concurrent calls to Close().
  365. // When isDiscarded is set, operateTunnel will not attempt to send final
  366. // status requests.
  367. func (tunnel *Tunnel) Close(isDiscarded bool) {
  368. tunnel.mutex.Lock()
  369. tunnel.isDiscarded = isDiscarded
  370. isActivated := tunnel.isActivated
  371. isClosed := tunnel.isClosed
  372. tunnel.isClosed = true
  373. tunnel.mutex.Unlock()
  374. if !isClosed {
  375. // Signal operateTunnel to stop before closing the tunnel -- this
  376. // allows a final status request to be made in the case of an orderly
  377. // shutdown.
  378. // A timer is set, so if operateTunnel takes too long to stop, the
  379. // tunnel is closed, which will interrupt any slow final status request.
  380. if isActivated {
  381. timeout := tunnel.getCustomParameters().Duration(
  382. parameters.TunnelOperateShutdownTimeout)
  383. afterFunc := time.AfterFunc(
  384. timeout,
  385. func() { tunnel.conn.Close() })
  386. tunnel.stopOperate()
  387. tunnel.operateWaitGroup.Wait()
  388. afterFunc.Stop()
  389. }
  390. tunnel.sshClient.Close()
  391. // tunnel.conn.Close() may get called multiple times, which is allowed.
  392. tunnel.conn.Close()
  393. err := tunnel.sshClient.Wait()
  394. if err != nil {
  395. NoticeWarning("close tunnel ssh error: %s", err)
  396. }
  397. }
  398. // Log burst metrics now that the BurstMonitoredConn is closed.
  399. // Metrics will be empty when burst monitoring is disabled.
  400. if !isDiscarded && isActivated {
  401. burstMetrics := tunnel.conn.GetMetrics(tunnel.monitoringStartTime)
  402. if len(burstMetrics) > 0 {
  403. NoticeBursts(
  404. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  405. burstMetrics)
  406. }
  407. }
  408. }
  409. // SetInFlightConnectedRequest checks if a connected request can begin and
  410. // sets the channel used to signal that the request is complete.
  411. //
  412. // The caller must not initiate a connected request when
  413. // SetInFlightConnectedRequest returns false. When SetInFlightConnectedRequest
  414. // returns true, the caller must call SetInFlightConnectedRequest(nil) when
  415. // the connected request completes.
  416. func (tunnel *Tunnel) SetInFlightConnectedRequest(requestSignal chan struct{}) bool {
  417. tunnel.mutex.Lock()
  418. defer tunnel.mutex.Unlock()
  419. // If already closing, don't start a connected request: the
  420. // TunnelOperateShutdownTimeout period may be nearly expired.
  421. if tunnel.isClosed {
  422. return false
  423. }
  424. if requestSignal == nil {
  425. // Not already in-flight (not expected)
  426. if tunnel.inFlightConnectedRequestSignal == nil {
  427. return false
  428. }
  429. } else {
  430. // Already in-flight (not expected)
  431. if tunnel.inFlightConnectedRequestSignal != nil {
  432. return false
  433. }
  434. }
  435. tunnel.inFlightConnectedRequestSignal = requestSignal
  436. return true
  437. }
  438. // AwaitInFlightConnectedRequest waits for the signal that any in-flight
  439. // connected request is complete.
  440. //
  441. // AwaitInFlightConnectedRequest may block until the connected request is
  442. // aborted by terminating the tunnel.
  443. func (tunnel *Tunnel) AwaitInFlightConnectedRequest() {
  444. tunnel.mutex.Lock()
  445. requestSignal := tunnel.inFlightConnectedRequestSignal
  446. tunnel.mutex.Unlock()
  447. if requestSignal != nil {
  448. <-requestSignal
  449. }
  450. }
  451. // IsActivated returns the tunnel's activated flag.
  452. func (tunnel *Tunnel) IsActivated() bool {
  453. tunnel.mutex.Lock()
  454. defer tunnel.mutex.Unlock()
  455. return tunnel.isActivated
  456. }
  457. // IsDiscarded returns the tunnel's discarded flag.
  458. func (tunnel *Tunnel) IsDiscarded() bool {
  459. tunnel.mutex.Lock()
  460. defer tunnel.mutex.Unlock()
  461. return tunnel.isDiscarded
  462. }
  463. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  464. // This function blocks awaiting a response. Only one request may be in-flight
  465. // at once; a concurrent SendAPIRequest will block until an active request
  466. // receives its response (or the SSH connection is terminated).
  467. func (tunnel *Tunnel) SendAPIRequest(
  468. name string, requestPayload []byte) ([]byte, error) {
  469. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  470. name, true, requestPayload)
  471. if err != nil {
  472. return nil, errors.Trace(err)
  473. }
  474. if !ok {
  475. return nil, errors.TraceNew("API request rejected")
  476. }
  477. return responsePayload, nil
  478. }
  479. // DialTCPChannel establishes a TCP port forward connection through the
  480. // tunnel.
  481. //
  482. // When split tunnel mode is enabled, and unless alwaysTunneled is set, the
  483. // server may reject the port forward and indicate that the client is to make
  484. // direct, untunneled connection. In this case, the bool return value is true
  485. // and net.Conn and error are nil.
  486. //
  487. // downstreamConn is an optional parameter which specifies a connection to be
  488. // explicitly closed when the dialed connection is closed.
  489. func (tunnel *Tunnel) DialTCPChannel(
  490. remoteAddr string,
  491. alwaysTunneled bool,
  492. downstreamConn net.Conn) (net.Conn, bool, error) {
  493. channelType := "direct-tcpip"
  494. if alwaysTunneled && tunnel.config.IsSplitTunnelEnabled() {
  495. // This channel type is only necessary in split tunnel mode.
  496. channelType = protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE
  497. }
  498. channel, err := tunnel.dialChannel(channelType, remoteAddr)
  499. if err != nil {
  500. if isSplitTunnelRejectReason(err) {
  501. return nil, true, nil
  502. }
  503. return nil, false, errors.Trace(err)
  504. }
  505. netConn, ok := channel.(net.Conn)
  506. if !ok {
  507. return nil, false, errors.Tracef("unexpected channel type: %T", channel)
  508. }
  509. conn := &TunneledConn{
  510. Conn: netConn,
  511. tunnel: tunnel,
  512. downstreamConn: downstreamConn}
  513. return tunnel.wrapWithTransferStats(conn), false, nil
  514. }
  515. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  516. channel, err := tunnel.dialChannel(protocol.PACKET_TUNNEL_CHANNEL_TYPE, "")
  517. if err != nil {
  518. return nil, errors.Trace(err)
  519. }
  520. sshChannel, ok := channel.(ssh.Channel)
  521. if !ok {
  522. return nil, errors.Tracef("unexpected channel type: %T", channel)
  523. }
  524. NoticeInfo("DialPacketTunnelChannel: established channel")
  525. conn := newChannelConn(sshChannel)
  526. // wrapWithTransferStats will track bytes transferred for the
  527. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  528. //
  529. // Since the data in the channel is not HTTP or TLS, no domain bytes
  530. // counting is expected.
  531. //
  532. // transferstats are also used to determine that there's been recent
  533. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  534. return tunnel.wrapWithTransferStats(conn), nil
  535. }
  536. func (tunnel *Tunnel) dialChannel(channelType, remoteAddr string) (interface{}, error) {
  537. if !tunnel.IsActivated() {
  538. return nil, errors.TraceNew("tunnel is not activated")
  539. }
  540. // Note: there is no dial context since SSH port forward dials cannot
  541. // be interrupted directly. Closing the tunnel will interrupt the dials.
  542. // A timeout is set to unblock this function, but the goroutine may
  543. // not exit until the tunnel is closed.
  544. type channelDialResult struct {
  545. channel interface{}
  546. err error
  547. }
  548. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  549. results := make(chan *channelDialResult, 1)
  550. afterFunc := time.AfterFunc(
  551. tunnel.getCustomParameters().Duration(
  552. parameters.TunnelPortForwardDialTimeout),
  553. func() {
  554. results <- &channelDialResult{
  555. nil, errors.Tracef("channel dial timeout: %s", channelType)}
  556. })
  557. defer afterFunc.Stop()
  558. go func() {
  559. result := new(channelDialResult)
  560. switch channelType {
  561. case "direct-tcpip", protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  562. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  563. // "direct-tcpip", except split tunnel channel rejections are disallowed
  564. // even when split tunnel mode is enabled.
  565. result.channel, result.err =
  566. tunnel.sshClient.Dial(channelType, remoteAddr)
  567. default:
  568. var sshRequests <-chan *ssh.Request
  569. result.channel, sshRequests, result.err =
  570. tunnel.sshClient.OpenChannel(channelType, nil)
  571. if result.err == nil {
  572. go ssh.DiscardRequests(sshRequests)
  573. }
  574. }
  575. if result.err != nil {
  576. result.err = errors.Trace(result.err)
  577. }
  578. results <- result
  579. }()
  580. result := <-results
  581. if result.err != nil {
  582. if !isSplitTunnelRejectReason(result.err) {
  583. select {
  584. case tunnel.signalPortForwardFailure <- struct{}{}:
  585. default:
  586. }
  587. }
  588. return nil, errors.Trace(result.err)
  589. }
  590. return result.channel, nil
  591. }
  592. func isSplitTunnelRejectReason(err error) bool {
  593. var openChannelErr *ssh.OpenChannelError
  594. if std_errors.As(err, &openChannelErr) {
  595. return openChannelErr.Reason ==
  596. ssh.RejectionReason(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL)
  597. }
  598. return false
  599. }
  600. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  601. // Tunnel does not have a serverContext when DisableApi is set. We still use
  602. // transferstats.Conn to count bytes transferred for monitoring tunnel
  603. // quality.
  604. var regexps *transferstats.Regexps
  605. if tunnel.serverContext != nil {
  606. regexps = tunnel.serverContext.StatsRegexps()
  607. }
  608. return transferstats.NewConn(
  609. conn, tunnel.dialParams.ServerEntry.IpAddress, regexps)
  610. }
  611. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  612. // This will terminate the tunnel.
  613. func (tunnel *Tunnel) SignalComponentFailure() {
  614. NoticeWarning("tunnel received component failure signal")
  615. tunnel.Close(false)
  616. }
  617. // TunneledConn implements net.Conn and wraps a port forward connection.
  618. // It is used to hook into Read and Write to observe I/O errors and
  619. // report these errors back to the tunnel monitor as port forward failures.
  620. // TunneledConn optionally tracks a peer connection to be explicitly closed
  621. // when the TunneledConn is closed.
  622. type TunneledConn struct {
  623. net.Conn
  624. tunnel *Tunnel
  625. downstreamConn net.Conn
  626. }
  627. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  628. n, err = conn.Conn.Read(buffer)
  629. if err != nil && err != io.EOF {
  630. // Report new failure. Won't block; assumes the receiver
  631. // has a sufficient buffer for the threshold number of reports.
  632. // TODO: conditional on type of error or error message?
  633. select {
  634. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  635. default:
  636. }
  637. }
  638. return
  639. }
  640. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  641. n, err = conn.Conn.Write(buffer)
  642. if err != nil && err != io.EOF {
  643. // Same as TunneledConn.Read()
  644. select {
  645. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  646. default:
  647. }
  648. }
  649. return
  650. }
  651. func (conn *TunneledConn) Close() error {
  652. if conn.downstreamConn != nil {
  653. conn.downstreamConn.Close()
  654. }
  655. return conn.Conn.Close()
  656. }
// dialResult holds the outputs of a successful dialTunnel call.
type dialResult struct {
	// dialConn is the base transport conn (meek, QUIC, TCP, etc.). It may be
	// used to close the underlying network connection, but should not be
	// used for I/O, as that would interfere with SSH and bypass throttling.
	dialConn net.Conn

	// monitoringStartTime is the time at which monitoredConn began wrapping
	// dialConn.
	monitoringStartTime time.Time

	// monitoredConn is the common.BurstMonitoredConn wrapping dialConn.
	monitoredConn *common.BurstMonitoredConn

	// sshClient is the established SSH client.
	sshClient *ssh.Client

	// sshRequests is the SSH connection-level request channel returned by
	// ssh.NewClientConn; dialTunnel leaves it to be handled by operateTunnel.
	sshRequests <-chan *ssh.Request

	// livenessTestMetrics holds liveness test results; nil when no liveness
	// test was configured.
	livenessTestMetrics *livenessTestMetrics

	// extraFailureAction is an optional protocol-specific action to invoke if
	// the tunnel later fails; in dialTunnel it is set only for Conjure
	// dials, where it deletes the cached Conjure registration.
	extraFailureAction func()
}
  666. // dialTunnel is a helper that builds the transport layers and establishes the
  667. // SSH connection. When additional dial configuration is used, dial metrics
  668. // are recorded and returned.
  669. func dialTunnel(
  670. ctx context.Context,
  671. config *Config,
  672. dialParams *DialParameters) (_ *dialResult, retErr error) {
  673. // Return immediately when overall context is canceled or timed-out. This
  674. // avoids notice noise.
  675. err := ctx.Err()
  676. if err != nil {
  677. return nil, errors.Trace(err)
  678. }
  679. p := getCustomParameters(config, dialParams)
  680. timeout := p.Duration(parameters.TunnelConnectTimeout)
  681. rateLimits := p.RateLimits(parameters.TunnelRateLimits)
  682. obfuscatedSSHMinPadding := p.Int(parameters.ObfuscatedSSHMinPadding)
  683. obfuscatedSSHMaxPadding := p.Int(parameters.ObfuscatedSSHMaxPadding)
  684. livenessTestMinUpstreamBytes := p.Int(parameters.LivenessTestMinUpstreamBytes)
  685. livenessTestMaxUpstreamBytes := p.Int(parameters.LivenessTestMaxUpstreamBytes)
  686. livenessTestMinDownstreamBytes := p.Int(parameters.LivenessTestMinDownstreamBytes)
  687. livenessTestMaxDownstreamBytes := p.Int(parameters.LivenessTestMaxDownstreamBytes)
  688. burstUpstreamTargetBytes := int64(p.Int(parameters.ClientBurstUpstreamTargetBytes))
  689. burstUpstreamDeadline := p.Duration(parameters.ClientBurstUpstreamDeadline)
  690. burstDownstreamTargetBytes := int64(p.Int(parameters.ClientBurstDownstreamTargetBytes))
  691. burstDownstreamDeadline := p.Duration(parameters.ClientBurstDownstreamDeadline)
  692. tlsOSSHApplyTrafficShaping := p.WeightedCoinFlip(parameters.TLSTunnelTrafficShapingProbability)
  693. tlsOSSHMinTLSPadding := p.Int(parameters.TLSTunnelMinTLSPadding)
  694. tlsOSSHMaxTLSPadding := p.Int(parameters.TLSTunnelMaxTLSPadding)
  695. conjureEnableIPv6Dials := p.Bool(parameters.ConjureEnableIPv6Dials)
  696. conjureEnablePortRandomization := p.Bool(parameters.ConjureEnablePortRandomization)
  697. conjureEnableRegistrationOverrides := p.Bool(parameters.ConjureEnableRegistrationOverrides)
  698. p.Close()
  699. // Ensure that, unless the base context is cancelled, any replayed dial
  700. // parameters are cleared, no longer to be retried, if the tunnel fails to
  701. // connect.
  702. //
  703. // Limitation: dials that fail to connect due to the server being in a
  704. // load-limiting state are not distinguished and excepted from this
  705. // logic.
  706. dialSucceeded := false
  707. baseCtx := ctx
  708. var failedTunnelLivenessTestMetrics *livenessTestMetrics
  709. var extraFailureAction func()
  710. defer func() {
  711. if !dialSucceeded && baseCtx.Err() != context.Canceled {
  712. dialParams.Failed(config)
  713. if extraFailureAction != nil {
  714. extraFailureAction()
  715. }
  716. if retErr != nil {
  717. _ = RecordFailedTunnelStat(
  718. config,
  719. dialParams,
  720. failedTunnelLivenessTestMetrics,
  721. -1,
  722. -1,
  723. retErr)
  724. }
  725. }
  726. }()
  727. var cancelFunc context.CancelFunc
  728. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  729. defer cancelFunc()
  730. // DialDuration is the elapsed time for both successful and failed tunnel
  731. // dials. For successful tunnels, it includes any the network protocol
  732. // handshake(s), obfuscation protocol handshake(s), SSH handshake, and
  733. // liveness test, when performed.
  734. //
  735. // Note: ensure DialDuration is set before calling any function which logs
  736. // dial_duration.
  737. startDialTime := time.Now()
  738. defer func() {
  739. dialParams.DialDuration = time.Since(startDialTime)
  740. }()
  741. // Note: dialParams.MeekResolvedIPAddress isn't set until the dial begins,
  742. // so it will always be blank in NoticeConnectingServer.
  743. NoticeConnectingServer(dialParams)
  744. // Create the base transport: meek or direct connection
  745. var dialConn net.Conn
  746. if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  747. dialConn, err = DialMeek(
  748. ctx,
  749. dialParams.GetMeekConfig(),
  750. dialParams.GetDialConfig())
  751. if err != nil {
  752. return nil, errors.Trace(err)
  753. }
  754. } else if protocol.TunnelProtocolUsesQUIC(dialParams.TunnelProtocol) {
  755. var packetConn net.PacketConn
  756. var remoteAddr *net.UDPAddr
  757. // Special case: explict in-proxy dial. TCP dials wire up in-proxy
  758. // dials via DialConfig and its CustomDialer using
  759. // makeInproxyTCPDialer. common/quic doesn't have an equivilent to
  760. // CustomDialer.
  761. if protocol.TunnelProtocolUsesInproxy(dialParams.TunnelProtocol) {
  762. packetConn, err = dialInproxy(ctx, config, dialParams)
  763. if err != nil {
  764. return nil, errors.Trace(err)
  765. }
  766. // Use the actual 2nd hop destination address as the remote
  767. // address for correct behavior in
  768. // common/quic.getMaxPreDiscoveryPacketSize, which differs for
  769. // IPv4 vs. IPv6 destination addresses; and
  770. // ObfuscatedPacketConn.RemoteAddr. The 2nd hop destination
  771. // address is not actually dialed.
  772. //
  773. // Limitation: for domain destinations, the in-proxy proxy
  774. // resolves the domain, so just assume IPv6, which has lower max
  775. // padding(see quic.getMaxPreDiscoveryPacketSize), and use a stub
  776. // address.
  777. host, portStr, err := net.SplitHostPort(dialParams.DirectDialAddress)
  778. if err != nil {
  779. return nil, errors.Trace(err)
  780. }
  781. port, err := strconv.Atoi(portStr)
  782. if err != nil {
  783. return nil, errors.Trace(err)
  784. }
  785. IP := net.ParseIP(host)
  786. if IP == nil {
  787. IP = net.ParseIP("fd00::")
  788. }
  789. remoteAddr = &net.UDPAddr{IP: IP, Port: port}
  790. } else {
  791. packetConn, remoteAddr, err = NewUDPConn(
  792. ctx, "udp", false, "", dialParams.DirectDialAddress, dialParams.GetDialConfig())
  793. if err != nil {
  794. return nil, errors.Trace(err)
  795. }
  796. }
  797. dialConn, err = quic.Dial(
  798. ctx,
  799. packetConn,
  800. remoteAddr,
  801. dialParams.QUICDialSNIAddress,
  802. dialParams.QUICVersion,
  803. dialParams.QUICClientHelloSeed,
  804. dialParams.ServerEntry.SshObfuscatedKey,
  805. dialParams.ObfuscatedQUICPaddingSeed,
  806. dialParams.ObfuscatedQUICNonceTransformerParameters,
  807. dialParams.QUICDisablePathMTUDiscovery,
  808. dialParams.QUICMaxPacketSizeAdjustment,
  809. dialParams.QUICDialEarly,
  810. dialParams.QUICUseObfuscatedPSK,
  811. dialParams.quicTLSClientSessionCache)
  812. if err != nil {
  813. return nil, errors.Trace(err)
  814. }
  815. } else if protocol.TunnelProtocolUsesTapDance(dialParams.TunnelProtocol) {
  816. dialConn, err = refraction.DialTapDance(
  817. ctx,
  818. config.EmitRefractionNetworkingLogs,
  819. config.GetPsiphonDataDirectory(),
  820. NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
  821. dialParams.DirectDialAddress)
  822. if err != nil {
  823. return nil, errors.Trace(err)
  824. }
  825. } else if protocol.TunnelProtocolUsesConjure(dialParams.TunnelProtocol) {
  826. dialConn, extraFailureAction, err = dialConjure(
  827. ctx,
  828. config,
  829. dialParams,
  830. conjureEnableIPv6Dials,
  831. conjureEnablePortRandomization,
  832. conjureEnableRegistrationOverrides)
  833. if err != nil {
  834. return nil, errors.Trace(err)
  835. }
  836. } else if protocol.TunnelProtocolUsesTLSOSSH(dialParams.TunnelProtocol) {
  837. dialConn, err = DialTLSTunnel(
  838. ctx,
  839. dialParams.GetTLSOSSHConfig(config),
  840. dialParams.GetDialConfig(),
  841. tlsOSSHApplyTrafficShaping,
  842. tlsOSSHMinTLSPadding,
  843. tlsOSSHMaxTLSPadding)
  844. if err != nil {
  845. return nil, errors.Trace(err)
  846. }
  847. } else if protocol.TunnelProtocolUsesShadowsocks(dialParams.TunnelProtocol) {
  848. dialConn, err = DialShadowsocksTunnel(
  849. ctx,
  850. dialParams.GetShadowsocksConfig(),
  851. dialParams.GetDialConfig(),
  852. )
  853. if err != nil {
  854. return nil, errors.Trace(err)
  855. }
  856. } else {
  857. // Use NewTCPDialer and don't use DialTCP directly, to ensure that
  858. // dialParams.GetDialConfig()CustomDialer is applied.
  859. tcpDialer := NewTCPDialer(dialParams.GetDialConfig())
  860. dialConn, err = tcpDialer(ctx, "tcp", dialParams.DirectDialAddress)
  861. if err != nil {
  862. return nil, errors.Trace(err)
  863. }
  864. }
  865. // Some conns report additional metrics. fragmentor.Conns report
  866. // fragmentor configs.
  867. if metricsSource, ok := dialConn.(common.MetricsSource); ok {
  868. dialParams.DialConnMetrics = metricsSource
  869. }
  870. if noticeMetricsSource, ok := dialConn.(common.NoticeMetricsSource); ok {
  871. dialParams.DialConnNoticeMetrics = noticeMetricsSource
  872. }
  873. // If dialConn is not a Closer, tunnel failure detection may be slower
  874. if _, ok := dialConn.(common.Closer); !ok {
  875. NoticeWarning("tunnel.dialTunnel: dialConn is not a Closer")
  876. }
  877. cleanupConn := dialConn
  878. defer func() {
  879. // Cleanup on error
  880. if cleanupConn != nil {
  881. cleanupConn.Close()
  882. }
  883. }()
  884. monitoringStartTime := time.Now()
  885. monitoredConn := common.NewBurstMonitoredConn(
  886. dialConn,
  887. false,
  888. burstUpstreamTargetBytes, burstUpstreamDeadline,
  889. burstDownstreamTargetBytes, burstDownstreamDeadline)
  890. // Apply throttling (if configured). The underlying dialConn is always a
  891. // stream, even when the network conn uses UDP.
  892. throttledConn := common.NewThrottledConn(
  893. monitoredConn,
  894. true,
  895. rateLimits)
  896. // Add obfuscated SSH layer
  897. var sshConn net.Conn = throttledConn
  898. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  899. obfuscatedSSHConn, err := obfuscator.NewClientObfuscatedSSHConn(
  900. throttledConn,
  901. dialParams.ServerEntry.SshObfuscatedKey,
  902. dialParams.ObfuscatorPaddingSeed,
  903. dialParams.OSSHObfuscatorSeedTransformerParameters,
  904. dialParams.OSSHPrefixSpec,
  905. dialParams.OSSHPrefixSplitConfig,
  906. &obfuscatedSSHMinPadding,
  907. &obfuscatedSSHMaxPadding)
  908. if err != nil {
  909. return nil, errors.Trace(err)
  910. }
  911. sshConn = obfuscatedSSHConn
  912. dialParams.ObfuscatedSSHConnMetrics = obfuscatedSSHConn
  913. }
  914. // Now establish the SSH session over the conn transport
  915. expectedPublicKey, err := base64.StdEncoding.DecodeString(
  916. dialParams.ServerEntry.SshHostKey)
  917. if err != nil {
  918. return nil, errors.Trace(err)
  919. }
  920. sshCertChecker := &ssh.CertChecker{
  921. IsHostAuthority: func(auth ssh.PublicKey, address string) bool {
  922. // Psiphon servers do not currently use SSH certificates. This CertChecker
  923. // code path may still be hit if a client attempts to connect using an
  924. // obsolete server entry.
  925. return false
  926. },
  927. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  928. // The remote address input isn't checked. In the case of fronted
  929. // protocols, the immediate remote peer won't be the Psiphon
  930. // server. In direct cases, the client has just dialed the IP
  931. // address and expected public key both taken from the same
  932. // trusted, signed server entry.
  933. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  934. return errors.TraceNew("unexpected host public key")
  935. }
  936. return nil
  937. },
  938. }
  939. sshPasswordPayload := &protocol.SSHPasswordPayload{
  940. SessionId: config.SessionID,
  941. SshPassword: dialParams.ServerEntry.SshPassword,
  942. ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
  943. }
  944. payload, err := json.Marshal(sshPasswordPayload)
  945. if err != nil {
  946. return nil, errors.Trace(err)
  947. }
  948. sshClientConfig := &ssh.ClientConfig{
  949. User: dialParams.ServerEntry.SshUsername,
  950. Auth: []ssh.AuthMethod{
  951. ssh.Password(string(payload)),
  952. },
  953. HostKeyCallback: sshCertChecker.CheckHostKey,
  954. ClientVersion: dialParams.SSHClientVersion,
  955. }
  956. sshClientConfig.KEXPRNGSeed = dialParams.SSHKEXSeed
  957. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  958. if config.ObfuscatedSSHAlgorithms != nil {
  959. sshClientConfig.KeyExchanges = []string{config.ObfuscatedSSHAlgorithms[0]}
  960. sshClientConfig.Ciphers = []string{config.ObfuscatedSSHAlgorithms[1]}
  961. sshClientConfig.MACs = []string{config.ObfuscatedSSHAlgorithms[2]}
  962. sshClientConfig.HostKeyAlgorithms = []string{config.ObfuscatedSSHAlgorithms[3]}
  963. } else {
  964. // With Encrypt-then-MAC hash algorithms, packet length is
  965. // transmitted in plaintext, which aids in traffic analysis.
  966. //
  967. // TUNNEL_PROTOCOL_SSH is excepted since its KEX appears in plaintext,
  968. // and the protocol is intended to look like SSH on the wire.
  969. sshClientConfig.NoEncryptThenMACHash = true
  970. }
  971. } else {
  972. // For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize
  973. // its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation
  974. // between two randomized KEXes.
  975. if dialParams.ServerEntry.SshObfuscatedKey != "" {
  976. sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  977. dialParams.ServerEntry.SshObfuscatedKey)
  978. if err != nil {
  979. return nil, errors.Trace(err)
  980. }
  981. }
  982. }
  983. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  984. // in a timeout to ensure it won't hang. We've encountered firewalls
  985. // that allow the TCP handshake to complete but then send a RST to the
  986. // server-side and nothing to the client-side, and if that happens
  987. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  988. // closes the conn, which interrupts it.
  989. // Note: TCP handshake timeouts are provided by TCPConn, and session
  990. // timeouts *after* ssh establishment are provided by the ssh keep alive
  991. // in operate tunnel.
  992. type sshNewClientResult struct {
  993. sshClient *ssh.Client
  994. sshRequests <-chan *ssh.Request
  995. livenessTestMetrics *livenessTestMetrics
  996. err error
  997. }
  998. resultChannel := make(chan sshNewClientResult)
  999. // Call NewClientConn in a goroutine, as it blocks on SSH handshake network
  1000. // operations, and would block canceling or shutdown. If the parent context
  1001. // is canceled, close the net.Conn underlying SSH, which will interrupt the
  1002. // SSH handshake that may be blocking NewClientConn.
  1003. go func() {
  1004. // The following is adapted from ssh.Dial(), here using a custom conn
  1005. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  1006. sshAddress := ""
  1007. sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
  1008. sshConn, sshAddress, sshClientConfig)
  1009. var sshClient *ssh.Client
  1010. var metrics *livenessTestMetrics
  1011. if err == nil {
  1012. // sshRequests is handled by operateTunnel.
  1013. // ssh.NewClient also expects to handle the sshRequests
  1014. // value from ssh.NewClientConn and will spawn a goroutine
  1015. // to handle the <-chan *ssh.Request, so we must provide
  1016. // a closed channel to ensure that goroutine halts instead
  1017. // of hanging on a nil channel.
  1018. noRequests := make(chan *ssh.Request)
  1019. close(noRequests)
  1020. sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)
  1021. if livenessTestMaxUpstreamBytes > 0 || livenessTestMaxDownstreamBytes > 0 {
  1022. // When configured, perform a liveness test which sends and
  1023. // receives bytes through the tunnel to ensure the tunnel had
  1024. // not been blocked upon or shortly after connecting. This
  1025. // test is performed concurrently for each establishment
  1026. // candidate before selecting a successful tunnel.
  1027. //
  1028. // Note that the liveness test is subject to the
  1029. // TunnelConnectTimeout, which should be adjusted
  1030. // accordinging.
  1031. metrics, err = performLivenessTest(
  1032. sshClient,
  1033. livenessTestMinUpstreamBytes, livenessTestMaxUpstreamBytes,
  1034. livenessTestMinDownstreamBytes, livenessTestMaxDownstreamBytes,
  1035. dialParams.LivenessTestSeed)
  1036. // Skip notice when cancelling.
  1037. if baseCtx.Err() == nil {
  1038. NoticeLivenessTest(
  1039. dialParams.ServerEntry.GetDiagnosticID(), metrics, err == nil)
  1040. }
  1041. }
  1042. }
  1043. resultChannel <- sshNewClientResult{sshClient, sshRequests, metrics, err}
  1044. }()
  1045. var result sshNewClientResult
  1046. select {
  1047. case result = <-resultChannel:
  1048. case <-ctx.Done():
  1049. // Interrupt the goroutine and capture its error context to
  1050. // distinguish point of failure.
  1051. err := ctx.Err()
  1052. sshConn.Close()
  1053. result = <-resultChannel
  1054. if result.err != nil {
  1055. result.err = fmt.Errorf("%s: %s", err, result.err)
  1056. } else {
  1057. result.err = err
  1058. }
  1059. }
  1060. if result.err != nil {
  1061. failedTunnelLivenessTestMetrics = result.livenessTestMetrics
  1062. return nil, errors.Trace(result.err)
  1063. }
  1064. dialSucceeded = true
  1065. NoticeConnectedServer(dialParams)
  1066. cleanupConn = nil
  1067. // Invoke DNS cache extension (if enabled in the resolver) now that the
  1068. // tunnel is connected and the Psiphon server is authenticated. This
  1069. // demonstrates that any domain name resolved to an endpoint that is or
  1070. // is forwarded to the expected Psiphon server.
  1071. //
  1072. // Limitation: DNS cache extension is not implemented for Refraction
  1073. // Networking protocols. iOS VPN, the primary use case for DNS cache
  1074. // extension, does not enable Refraction Networking.
  1075. if protocol.TunnelProtocolUsesFrontedMeek(dialParams.TunnelProtocol) {
  1076. resolver := config.GetResolver()
  1077. if resolver != nil {
  1078. resolver.VerifyCacheExtension(dialParams.MeekFrontingDialAddress)
  1079. }
  1080. }
  1081. // When configured to do so, hold-off on activating this tunnel. This allows
  1082. // some extra time for slower but less resource intensive protocols to
  1083. // establish tunnels. By holding off post-connect, the client has this
  1084. // established tunnel ready to activate in case other protocols fail to
  1085. // establish. This hold-off phase continues to consume one connection worker.
  1086. //
  1087. // The network latency multiplier is not applied to HoldOffTunnelDuration,
  1088. // as the goal is to apply a consistent hold-off range across all tunnel
  1089. // candidates; and this avoids scaling up any delay users experience.
  1090. //
  1091. // The hold-off is applied regardless of whether this is the first tunnel
  1092. // in a session or a reconnection, even to a server affinity candidate,
  1093. // so that the advantage for other protocols persists.
  1094. if dialParams.HoldOffTunnelDuration > 0 {
  1095. NoticeHoldOffTunnel(dialParams.ServerEntry.GetDiagnosticID(), dialParams.HoldOffTunnelDuration)
  1096. common.SleepWithContext(ctx, dialParams.HoldOffTunnelDuration)
  1097. }
  1098. // Note: dialConn may be used to close the underlying network connection
  1099. // but should not be used to perform I/O as that would interfere with SSH
  1100. // (and also bypasses throttling).
  1101. return &dialResult{
  1102. dialConn: dialConn,
  1103. monitoringStartTime: monitoringStartTime,
  1104. monitoredConn: monitoredConn,
  1105. sshClient: result.sshClient,
  1106. sshRequests: result.sshRequests,
  1107. livenessTestMetrics: result.livenessTestMetrics,
  1108. extraFailureAction: extraFailureAction,
  1109. },
  1110. nil
  1111. }
  1112. func dialConjure(
  1113. ctx context.Context,
  1114. config *Config,
  1115. dialParams *DialParameters,
  1116. enableIPv6Dials bool,
  1117. enablePortRandomization bool,
  1118. enableRegistrationOverrides bool) (net.Conn, func(), error) {
  1119. // Specify a cache key with a scope that ensures that:
  1120. //
  1121. // (a) cached registrations aren't used across different networks, as a
  1122. // registration requires the client's public IP to match the value at time
  1123. // of registration;
  1124. //
  1125. // (b) cached registrations are associated with specific Psiphon server
  1126. // candidates, to ensure that replay will use the same phantom IP(s).
  1127. //
  1128. // This scheme allows for reuse of cached registrations on network A when a
  1129. // client roams from network A to network B and back to network A.
  1130. //
  1131. // Using the network ID as a proxy for client public IP address is a
  1132. // heurisitic: it's possible that a clients public IP address changes
  1133. // without the network ID changing, and it's not guaranteed that the client
  1134. // will be assigned the original public IP on network A; so there's some
  1135. // chance the registration cannot be reused.
  1136. diagnosticID := dialParams.ServerEntry.GetDiagnosticID()
  1137. cacheKey := dialParams.NetworkID + "-" + diagnosticID
  1138. conjureConfig := &refraction.ConjureConfig{
  1139. RegistrationCacheTTL: dialParams.ConjureCachedRegistrationTTL,
  1140. RegistrationCacheKey: cacheKey,
  1141. EnableIPv6Dials: enableIPv6Dials,
  1142. EnablePortRandomization: enablePortRandomization,
  1143. EnableRegistrationOverrides: enableRegistrationOverrides,
  1144. Transport: dialParams.ConjureTransport,
  1145. STUNServerAddress: dialParams.ConjureSTUNServerAddress,
  1146. DTLSEmptyInitialPacket: dialParams.ConjureDTLSEmptyInitialPacket,
  1147. DiagnosticID: diagnosticID,
  1148. Logger: NoticeCommonLogger(false),
  1149. }
  1150. if dialParams.ConjureAPIRegistration {
  1151. // Use MeekConn to domain front Conjure API registration.
  1152. //
  1153. // ConjureAPIRegistrarFrontingSpecs are applied via
  1154. // dialParams.GetMeekConfig, and will be subject to replay.
  1155. //
  1156. // Since DialMeek will create a TLS connection immediately, and a cached
  1157. // registration may be used, we will delay initializing the MeekConn-based
  1158. // RoundTripper until we know it's needed. This is implemented by passing
  1159. // in a RoundTripper that establishes a MeekConn when RoundTrip is called.
  1160. //
  1161. // In refraction.dial we configure 0 retries for API registration requests,
  1162. // assuming it's better to let another Psiphon candidate retry, with new
  1163. // domaing fronting parameters. As such, we expect only one round trip call
  1164. // per NewHTTPRoundTripper, so, in practise, there's no performance penalty
  1165. // from establishing a new MeekConn per round trip.
  1166. //
  1167. // Performing the full DialMeek/RoundTrip operation here allows us to call
  1168. // MeekConn.Close and ensure all resources are immediately cleaned up.
  1169. roundTrip := func(request *http.Request) (*http.Response, error) {
  1170. conn, err := DialMeek(
  1171. ctx, dialParams.GetMeekConfig(), dialParams.GetDialConfig())
  1172. if err != nil {
  1173. return nil, errors.Trace(err)
  1174. }
  1175. defer conn.Close()
  1176. response, err := conn.RoundTrip(request)
  1177. if err != nil {
  1178. return nil, errors.Trace(err)
  1179. }
  1180. // Read the response into a buffer and close the response
  1181. // body, ensuring that MeekConn.Close closes all idle connections.
  1182. //
  1183. // Alternatively, we could Clone the request to set
  1184. // http.Request.Close and avoid keeping any idle connection
  1185. // open after the response body is read by gotapdance. Since
  1186. // the response body is small and since gotapdance does not
  1187. // stream the response body, we're taking this approach which
  1188. // ensures cleanup.
  1189. body, err := ioutil.ReadAll(response.Body)
  1190. _ = response.Body.Close()
  1191. if err != nil {
  1192. return nil, errors.Trace(err)
  1193. }
  1194. response.Body = io.NopCloser(bytes.NewReader(body))
  1195. return response, nil
  1196. }
  1197. conjureConfig.APIRegistrarHTTPClient = &http.Client{
  1198. Transport: common.NewHTTPRoundTripper(roundTrip),
  1199. }
  1200. conjureConfig.APIRegistrarBidirectionalURL =
  1201. dialParams.ConjureAPIRegistrarBidirectionalURL
  1202. conjureConfig.APIRegistrarDelay = dialParams.ConjureAPIRegistrarDelay
  1203. } else if dialParams.ConjureDecoyRegistration {
  1204. // The Conjure "phantom" connection is compatible with fragmentation, but
  1205. // the decoy registrar connection, like Tapdance, is not, so force it off.
  1206. // Any tunnel fragmentation metrics will refer to the "phantom" connection
  1207. // only.
  1208. conjureConfig.DoDecoyRegistration = true
  1209. conjureConfig.DecoyRegistrarWidth = dialParams.ConjureDecoyRegistrarWidth
  1210. conjureConfig.DecoyRegistrarDelay = dialParams.ConjureDecoyRegistrarDelay
  1211. }
  1212. // Set extraFailureAction, which is invoked whenever the tunnel fails (i.e.,
  1213. // where RecordFailedTunnelStat is invoked). The action will remove any
  1214. // cached registration. When refraction.DialConjure succeeds, the underlying
  1215. // registration is cached. After refraction.DialConjure returns, it no
  1216. // longer modifies the cached state of that registration, assuming that it
  1217. // remains valid and effective. However adversarial impact on a given
  1218. // phantom IP may not become evident until after the initial TCP connection
  1219. // establishment and handshake performed by refraction.DialConjure. For
  1220. // example, it may be that the phantom dial is targeted for severe
  1221. // throttling which begins or is only evident later in the flow. Scheduling
  1222. // a call to DeleteCachedConjureRegistration allows us to invalidate the
  1223. // cached registration for a tunnel that fails later in its lifecycle.
  1224. //
  1225. // Note that extraFailureAction will retain a reference to conjureConfig for
  1226. // the lifetime of the tunnel.
  1227. extraFailureAction := func() {
  1228. refraction.DeleteCachedConjureRegistration(conjureConfig)
  1229. }
  1230. dialCtx := ctx
  1231. if protocol.ConjureTransportUsesDTLS(dialParams.ConjureTransport) {
  1232. // Conjure doesn't use the DTLS seed scheme, which supports in-proxy
  1233. // DTLS randomization. But every DTLS dial expects to find a seed
  1234. // state, so set the no-seed state.
  1235. dialCtx = inproxy_dtls.SetNoDTLSSeed(ctx)
  1236. }
  1237. dialConn, err := refraction.DialConjure(
  1238. dialCtx,
  1239. config.EmitRefractionNetworkingLogs,
  1240. config.GetPsiphonDataDirectory(),
  1241. NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
  1242. dialParams.DirectDialAddress,
  1243. conjureConfig)
  1244. if err != nil {
  1245. // When this function fails, invoke extraFailureAction directly; when it
  1246. // succeeds, return extraFailureAction to be called later.
  1247. extraFailureAction()
  1248. return nil, nil, errors.Trace(err)
  1249. }
  1250. return dialConn, extraFailureAction, nil
  1251. }
  1252. // makeInproxyTCPDialer returns a dialer which proxies TCP dials via an
  1253. // in-proxy proxy, as configured in dialParams.
  1254. //
  1255. // Limitation: MeekConn may redial TCP for a single tunnel connection, but
  1256. // that case is not supported by the in-proxy protocol, as the in-proxy proxy
  1257. // closes both its WebRTC DataChannel and the overall client connection when
  1258. // the upstream TCP connection closes. Any new connection from the client to
  1259. // the proxy must be a new tunnel connection with and accompanying
  1260. // broker/server relay. As a future enhancement, consider extending the
  1261. // in-proxy protocol to enable the client and proxy to establish additional
  1262. // WebRTC DataChannels and new upstream TCP connections within the scope of a
  1263. // single proxy/client connection.
  1264. func makeInproxyTCPDialer(
  1265. config *Config, dialParams *DialParameters) common.Dialer {
  1266. return func(ctx context.Context, _, _ string) (net.Conn, error) {
  1267. if dialParams.inproxyConn.Load() != nil {
  1268. return nil, errors.TraceNew("redial not supported")
  1269. }
  1270. var conn net.Conn
  1271. var err error
  1272. conn, err = dialInproxy(ctx, config, dialParams)
  1273. if err != nil {
  1274. return nil, errors.Trace(err)
  1275. }
  1276. // When the TCP fragmentor is configured for the 2nd hop protocol,
  1277. // approximate the behavior by applying the fragmentor to the WebRTC
  1278. // DataChannel writes, which will result in delays and DataChannel
  1279. // message sizes which will be reflected in the proxy's relay to its
  1280. // upstream TCP connection.
  1281. //
  1282. // This code is copied from DialTCP.
  1283. //
  1284. // Limitation: TCP BPF settings are not supported and currently
  1285. // disabled for all 2nd hop cases in
  1286. // protocol.TunnelProtocolMayUseClientBPF.
  1287. if dialParams.dialConfig.FragmentorConfig.MayFragment() {
  1288. conn = fragmentor.NewConn(
  1289. dialParams.dialConfig.FragmentorConfig,
  1290. func(message string) {
  1291. NoticeFragmentor(dialParams.dialConfig.DiagnosticID, message)
  1292. },
  1293. conn)
  1294. }
  1295. return conn, nil
  1296. }
  1297. }
  1298. // dialInproxy performs the in-proxy dial and returns the resulting conn for
  1299. // use as an underlying conn for the 2nd hop protocol. The in-proxy dial
  1300. // first connects to the broker (or reuses an existing connection) to match
  1301. // with a proxy; and then establishes connection to the proxy.
  1302. func dialInproxy(
  1303. ctx context.Context,
  1304. config *Config,
  1305. dialParams *DialParameters) (*inproxy.ClientConn, error) {
  1306. isProxy := false
  1307. webRTCDialInstance, err := NewInproxyWebRTCDialInstance(
  1308. config,
  1309. dialParams.NetworkID,
  1310. isProxy,
  1311. dialParams.inproxyNATStateManager,
  1312. dialParams.InproxySTUNDialParameters,
  1313. dialParams.InproxyWebRTCDialParameters)
  1314. if err != nil {
  1315. return nil, errors.Trace(err)
  1316. }
  1317. // dialAddress indicates to the broker and proxy how to dial the upstream
  1318. // Psiphon server, based on the 2nd hop tunnel protocol.
  1319. networkProtocol := inproxy.NetworkProtocolUDP
  1320. reliableTransport := false
  1321. if protocol.TunnelProtocolUsesTCP(dialParams.TunnelProtocol) {
  1322. networkProtocol = inproxy.NetworkProtocolTCP
  1323. reliableTransport = true
  1324. }
  1325. dialAddress := dialParams.DirectDialAddress
  1326. if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  1327. dialAddress = dialParams.MeekDialAddress
  1328. }
  1329. // Specify the value to be returned by inproxy.ClientConn.RemoteAddr.
  1330. // Currently, the one caller of RemoteAddr is utls, which uses the
  1331. // RemoteAddr as a TLS session cache key when there is no SNI.
  1332. // GetTLSSessionCacheKeyAddress returns a cache key value that is a valid
  1333. // address and that is also a more appropriate TLS session cache key than
  1334. // the proxy address.
  1335. remoteAddrOverride, err := dialParams.ServerEntry.GetTLSSessionCacheKeyAddress(
  1336. dialParams.TunnelProtocol)
  1337. if err != nil {
  1338. return nil, errors.Trace(err)
  1339. }
  1340. // Unlike the proxy broker case, clients already actively fetch tactics
  1341. // during tunnel estalishment, so tactics.SetTacticsAPIParameters are not
  1342. // sent to the broker and no tactics are returned by the broker.
  1343. params := getBaseAPIParameters(
  1344. baseParametersNoDialParameters, true, config, nil)
  1345. // The debugLogging flag is passed to both NoticeCommonLogger and to the
  1346. // inproxy package as well; skipping debug logs in the inproxy package,
  1347. // before calling into the notice logger, avoids unnecessary allocations
  1348. // and formatting when debug logging is off.
  1349. debugLogging := config.InproxyEnableWebRTCDebugLogging
  1350. clientConfig := &inproxy.ClientConfig{
  1351. Logger: NoticeCommonLogger(debugLogging),
  1352. EnableWebRTCDebugLogging: debugLogging,
  1353. BaseAPIParameters: params,
  1354. BrokerClient: dialParams.inproxyBrokerClient,
  1355. WebRTCDialCoordinator: webRTCDialInstance,
  1356. ReliableTransport: reliableTransport,
  1357. DialNetworkProtocol: networkProtocol,
  1358. DialAddress: dialAddress,
  1359. RemoteAddrOverride: remoteAddrOverride,
  1360. PackedDestinationServerEntry: dialParams.inproxyPackedSignedServerEntry,
  1361. MustUpgrade: config.OnInproxyMustUpgrade,
  1362. }
  1363. conn, err := inproxy.DialClient(ctx, clientConfig)
  1364. if err != nil {
  1365. return nil, errors.Trace(err)
  1366. }
  1367. // The inproxy.ClientConn is stored in dialParams.inproxyConn in order to
  1368. // later fetch its connection ID and to facilitate broker/client replay.
  1369. dialParams.inproxyConn.Store(conn)
  1370. return conn, nil
  1371. }
  // livenessTestMetrics records the targets and outcome of a liveness test.
// Fields are exported for JSON encoding in NoticeLivenessTest; field order
// determines the encoded key order.
type livenessTestMetrics struct {
	// Duration is the elapsed test time, as formatted by time.Duration.String.
	Duration string

	// UpstreamBytes is the targeted upstream byte count; SentUpstreamBytes
	// is the count actually sent.
	UpstreamBytes, SentUpstreamBytes int

	// DownstreamBytes is the targeted downstream byte count;
	// ReceivedDownstreamBytes is the count actually received.
	DownstreamBytes, ReceivedDownstreamBytes int
}
  1380. func performLivenessTest(
  1381. sshClient *ssh.Client,
  1382. minUpstreamBytes, maxUpstreamBytes int,
  1383. minDownstreamBytes, maxDownstreamBytes int,
  1384. livenessTestPRNGSeed *prng.Seed) (*livenessTestMetrics, error) {
  1385. metrics := new(livenessTestMetrics)
  1386. defer func(startTime time.Time) {
  1387. metrics.Duration = time.Since(startTime).String()
  1388. }(time.Now())
  1389. PRNG := prng.NewPRNGWithSeed(livenessTestPRNGSeed)
  1390. metrics.UpstreamBytes = PRNG.Range(minUpstreamBytes, maxUpstreamBytes)
  1391. metrics.DownstreamBytes = PRNG.Range(minDownstreamBytes, maxDownstreamBytes)
  1392. request := &protocol.RandomStreamRequest{
  1393. UpstreamBytes: metrics.UpstreamBytes,
  1394. DownstreamBytes: metrics.DownstreamBytes,
  1395. }
  1396. extraData, err := json.Marshal(request)
  1397. if err != nil {
  1398. return metrics, errors.Trace(err)
  1399. }
  1400. channel, requests, err := sshClient.OpenChannel(
  1401. protocol.RANDOM_STREAM_CHANNEL_TYPE, extraData)
  1402. if err != nil {
  1403. return metrics, errors.Trace(err)
  1404. }
  1405. defer channel.Close()
  1406. go ssh.DiscardRequests(requests)
  1407. sent := 0
  1408. received := 0
  1409. upstream := new(sync.WaitGroup)
  1410. var errUpstream, errDownstream error
  1411. if metrics.UpstreamBytes > 0 {
  1412. // Process streams concurrently to minimize elapsed time. This also
  1413. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  1414. upstream.Add(1)
  1415. go func() {
  1416. defer upstream.Done()
  1417. // In consideration of memory-constrained environments, use modest-sized copy
  1418. // buffers since many tunnel establishment workers may run the liveness test
  1419. // concurrently.
  1420. var buffer [4096]byte
  1421. n, err := common.CopyNBuffer(channel, rand.Reader, int64(metrics.UpstreamBytes), buffer[:])
  1422. sent = int(n)
  1423. if err != nil {
  1424. errUpstream = errors.Trace(err)
  1425. }
  1426. }()
  1427. }
  1428. if metrics.DownstreamBytes > 0 {
  1429. var buffer [4096]byte
  1430. n, err := common.CopyNBuffer(ioutil.Discard, channel, int64(metrics.DownstreamBytes), buffer[:])
  1431. received = int(n)
  1432. if err != nil {
  1433. errDownstream = errors.Trace(err)
  1434. }
  1435. }
  1436. upstream.Wait()
  1437. metrics.SentUpstreamBytes = sent
  1438. metrics.ReceivedDownstreamBytes = received
  1439. if errUpstream != nil {
  1440. return metrics, errUpstream
  1441. } else if errDownstream != nil {
  1442. return metrics, errDownstream
  1443. }
  1444. return metrics, nil
  1445. }
  1446. // operateTunnel monitors the health of the tunnel and performs
  1447. // periodic work.
  1448. //
  1449. // BytesTransferred and TotalBytesTransferred notices are emitted
  1450. // for live reporting and diagnostics reporting, respectively.
  1451. //
  1452. // Status requests are sent to the Psiphon API to report bytes
  1453. // transferred.
  1454. //
  1455. // Periodic SSH keep alive packets are sent to ensure the underlying
  1456. // TCP connection isn't terminated by NAT, or other network
  1457. // interference -- or test if it has been terminated while the device
  1458. // has been asleep. When a keep alive times out, the tunnel is
  1459. // considered failed.
  1460. //
  1461. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  1462. // server responsiveness when a port forward failure is detected: a
  1463. // failed dial or failed read/write. This keep alive has a shorter
  1464. // timeout.
  1465. //
  1466. // Note that port forward failures may be due to non-failure conditions.
  1467. // For example, when the user inputs an invalid domain name and
  1468. // resolution is done by the ssh server; or trying to connect to a
  1469. // non-white-listed port; and the error message in these cases is not
  1470. // distinguishable from a a true server error (a common error message,
  1471. // "ssh: rejected: administratively prohibited (open failed)", may be
  1472. // returned for these cases but also if the server has run out of
  1473. // ephemeral ports, for example).
  1474. //
  1475. // SSH keep alives are not sent when the tunnel has been recently
  1476. // active (not only does tunnel activity obviate the necessity of a keep
  1477. // alive, testing has shown that keep alives may time out for "busy"
  1478. // tunnels, especially over meek protocol and other high latency
  1479. // conditions).
  1480. //
  1481. // "Recently active" is defined has having received payload bytes. Sent
  1482. // bytes are not considered as testing has shown bytes may appear to
  1483. // send when certain NAT devices have interfered with the tunnel, while
  1484. // no bytes are received. In a pathological case, with DNS implemented
  1485. // as tunneled UDP, a browser may wait excessively for a domain name to
  1486. // resolve, while no new port forward is attempted which would otherwise
  1487. // result in a tunnel failure detection.
  1488. //
  1489. // TODO: change "recently active" to include having received any
  1490. // SSH protocol messages from the server, not just user payload?
  1491. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  1492. defer tunnel.operateWaitGroup.Done()
  1493. now := time.Now()
  1494. lastBytesReceivedTime := now
  1495. lastTotalBytesTransferedTime := now
  1496. totalSent := int64(0)
  1497. totalReceived := int64(0)
  1498. setDialParamsSucceeded := false
  1499. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  1500. defer noticeBytesTransferredTicker.Stop()
  1501. // The next status request and ssh keep alive times are picked at random,
  1502. // from a range, to make the resulting traffic less fingerprintable,
  1503. // Note: not using Tickers since these are not fixed time periods.
  1504. nextStatusRequestPeriod := func() time.Duration {
  1505. p := tunnel.getCustomParameters()
  1506. return prng.Period(
  1507. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMin),
  1508. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMax))
  1509. }
  1510. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  1511. defer statsTimer.Stop()
  1512. // Schedule an almost-immediate status request to deliver any unreported
  1513. // persistent stats.
  1514. unreported := CountUnreportedPersistentStats()
  1515. if unreported > 0 {
  1516. NoticeInfo("Unreported persistent stats: %d", unreported)
  1517. p := tunnel.getCustomParameters()
  1518. statsTimer.Reset(
  1519. prng.Period(
  1520. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMin),
  1521. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMax)))
  1522. }
  1523. nextSshKeepAlivePeriod := func() time.Duration {
  1524. p := tunnel.getCustomParameters()
  1525. return prng.Period(
  1526. p.Duration(parameters.SSHKeepAlivePeriodMin),
  1527. p.Duration(parameters.SSHKeepAlivePeriodMax))
  1528. }
  1529. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  1530. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  1531. if tunnel.config.DisablePeriodicSshKeepAlive {
  1532. sshKeepAliveTimer.Stop()
  1533. } else {
  1534. defer sshKeepAliveTimer.Stop()
  1535. }
  1536. // Perform network requests in separate goroutines so as not to block
  1537. // other operations.
  1538. requestsWaitGroup := new(sync.WaitGroup)
  1539. requestsWaitGroup.Add(1)
  1540. signalStatusRequest := make(chan struct{})
  1541. go func() {
  1542. defer requestsWaitGroup.Done()
  1543. for range signalStatusRequest {
  1544. sendStats(tunnel)
  1545. }
  1546. }()
  1547. requestsWaitGroup.Add(1)
  1548. signalPeriodicSshKeepAlive := make(chan time.Duration)
  1549. sshKeepAliveError := make(chan error, 1)
  1550. go func() {
  1551. defer requestsWaitGroup.Done()
  1552. isFirstPeriodicKeepAlive := true
  1553. for timeout := range signalPeriodicSshKeepAlive {
  1554. bytesUp := atomic.LoadInt64(&totalSent)
  1555. bytesDown := atomic.LoadInt64(&totalReceived)
  1556. err := tunnel.sendSshKeepAlive(
  1557. isFirstPeriodicKeepAlive, false, timeout, bytesUp, bytesDown)
  1558. if err != nil {
  1559. select {
  1560. case sshKeepAliveError <- err:
  1561. default:
  1562. }
  1563. }
  1564. isFirstPeriodicKeepAlive = false
  1565. }
  1566. }()
  1567. // Probe-type SSH keep alives have a distinct send worker and may be sent
  1568. // concurrently, to ensure a long period keep alive timeout doesn't delay
  1569. // failed tunnel detection.
  1570. requestsWaitGroup.Add(1)
  1571. signalProbeSshKeepAlive := make(chan time.Duration)
  1572. go func() {
  1573. defer requestsWaitGroup.Done()
  1574. for timeout := range signalProbeSshKeepAlive {
  1575. bytesUp := atomic.LoadInt64(&totalSent)
  1576. bytesDown := atomic.LoadInt64(&totalReceived)
  1577. err := tunnel.sendSshKeepAlive(
  1578. false, true, timeout, bytesUp, bytesDown)
  1579. if err != nil {
  1580. select {
  1581. case sshKeepAliveError <- err:
  1582. default:
  1583. }
  1584. }
  1585. }
  1586. }()
  1587. shutdown := false
  1588. var err error
  1589. for !shutdown && err == nil {
  1590. select {
  1591. case <-noticeBytesTransferredTicker.C:
  1592. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1593. tunnel.dialParams.ServerEntry.IpAddress)
  1594. if received > 0 {
  1595. lastBytesReceivedTime = time.Now()
  1596. }
  1597. bytesUp := atomic.AddInt64(&totalSent, sent)
  1598. bytesDown := atomic.AddInt64(&totalReceived, received)
  1599. p := tunnel.getCustomParameters()
  1600. noticePeriod := p.Duration(parameters.TotalBytesTransferredNoticePeriod)
  1601. doEmitMemoryMetrics := p.Bool(parameters.TotalBytesTransferredEmitMemoryMetrics)
  1602. replayTargetUpstreamBytes := p.Int(parameters.ReplayTargetUpstreamBytes)
  1603. replayTargetDownstreamBytes := p.Int(parameters.ReplayTargetDownstreamBytes)
  1604. replayTargetTunnelDuration := p.Duration(parameters.ReplayTargetTunnelDuration)
  1605. if lastTotalBytesTransferedTime.Add(noticePeriod).Before(time.Now()) {
  1606. NoticeTotalBytesTransferred(
  1607. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1608. if doEmitMemoryMetrics {
  1609. emitMemoryMetrics()
  1610. }
  1611. lastTotalBytesTransferedTime = time.Now()
  1612. }
  1613. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  1614. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  1615. NoticeBytesTransferred(
  1616. tunnel.dialParams.ServerEntry.GetDiagnosticID(), sent, received)
  1617. }
  1618. // Once the tunnel has connected, activated, successfully transmitted the
  1619. // targeted number of bytes, and been up for the targeted duration
  1620. // (measured from the end of establishment), store its dial parameters for
  1621. // subsequent replay.
  1622. //
  1623. // Even when target bytes and duration are all 0, the tunnel must remain up
  1624. // for at least 1 second due to use of noticeBytesTransferredTicker; for
  1625. // the same reason the granularity of ReplayTargetTunnelDuration is
  1626. // seconds.
  1627. if !setDialParamsSucceeded &&
  1628. bytesUp >= int64(replayTargetUpstreamBytes) &&
  1629. bytesDown >= int64(replayTargetDownstreamBytes) &&
  1630. time.Since(tunnel.establishedTime) >= replayTargetTunnelDuration {
  1631. tunnel.dialParams.Succeeded()
  1632. setDialParamsSucceeded = true
  1633. }
  1634. case <-statsTimer.C:
  1635. select {
  1636. case signalStatusRequest <- struct{}{}:
  1637. default:
  1638. }
  1639. statsTimer.Reset(nextStatusRequestPeriod())
  1640. case <-sshKeepAliveTimer.C:
  1641. p := tunnel.getCustomParameters()
  1642. inactivePeriod := p.Duration(parameters.SSHKeepAlivePeriodicInactivePeriod)
  1643. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1644. timeout := p.Duration(parameters.SSHKeepAlivePeriodicTimeout)
  1645. select {
  1646. case signalPeriodicSshKeepAlive <- timeout:
  1647. default:
  1648. }
  1649. }
  1650. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1651. case <-tunnel.signalPortForwardFailure:
  1652. // Note: no mutex on portForwardFailureTotal; only referenced here
  1653. tunnel.totalPortForwardFailures++
  1654. NoticeInfo("port forward failures for %s: %d",
  1655. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  1656. tunnel.totalPortForwardFailures)
  1657. // If the underlying Conn has closed (meek and other plugin protocols may
  1658. // close themselves in certain error conditions), the tunnel has certainly
  1659. // failed. Otherwise, probe with an SSH keep alive.
  1660. //
  1661. // TODO: the IsClosed case omits the failed tunnel logging and reset
  1662. // actions performed by sendSshKeepAlive. Should self-closing protocols
  1663. // perform these actions themselves?
  1664. if tunnel.conn.IsClosed() {
  1665. err = errors.TraceNew("underlying conn is closed")
  1666. } else {
  1667. p := tunnel.getCustomParameters()
  1668. inactivePeriod := p.Duration(parameters.SSHKeepAliveProbeInactivePeriod)
  1669. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1670. timeout := p.Duration(parameters.SSHKeepAliveProbeTimeout)
  1671. select {
  1672. case signalProbeSshKeepAlive <- timeout:
  1673. default:
  1674. }
  1675. }
  1676. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1677. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1678. }
  1679. }
  1680. case err = <-sshKeepAliveError:
  1681. case serverRequest := <-tunnel.sshServerRequests:
  1682. if serverRequest != nil {
  1683. HandleServerRequest(tunnelOwner, tunnel, serverRequest)
  1684. }
  1685. case <-tunnel.operateCtx.Done():
  1686. shutdown = true
  1687. }
  1688. }
  1689. close(signalPeriodicSshKeepAlive)
  1690. close(signalProbeSshKeepAlive)
  1691. close(signalStatusRequest)
  1692. requestsWaitGroup.Wait()
  1693. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1694. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1695. tunnel.dialParams.ServerEntry.IpAddress)
  1696. bytesUp := atomic.AddInt64(&totalSent, sent)
  1697. bytesDown := atomic.AddInt64(&totalReceived, received)
  1698. // Always emit a final NoticeTotalBytesTransferred
  1699. NoticeTotalBytesTransferred(
  1700. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1701. if err == nil {
  1702. NoticeInfo("shutdown operate tunnel")
  1703. // This commanded shutdown case is initiated by Tunnel.Close, which will
  1704. // wait up to parameters.TunnelOperateShutdownTimeout to allow the following
  1705. // requests to complete.
  1706. // Send a final status request in order to report any outstanding persistent
  1707. // stats and domain bytes transferred as soon as possible.
  1708. sendStats(tunnel)
  1709. // The controller connectedReporter may have initiated a connected request
  1710. // concurrent to this commanded shutdown. SetInFlightConnectedRequest
  1711. // ensures that a connected request doesn't start after the commanded
  1712. // shutdown. AwaitInFlightConnectedRequest blocks until any in flight
  1713. // request completes or is aborted after TunnelOperateShutdownTimeout.
  1714. //
  1715. // As any connected request is performed by a concurrent goroutine,
  1716. // sendStats is called first and AwaitInFlightConnectedRequest second.
  1717. tunnel.AwaitInFlightConnectedRequest()
  1718. } else {
  1719. NoticeWarning("operate tunnel error for %s: %s",
  1720. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1721. tunnelOwner.SignalTunnelFailure(tunnel)
  1722. }
  1723. }
  1724. // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
  1725. // on the specified SSH connections and returns true of the request succeeds
  1726. // within a specified timeout. If the request fails, the associated conn is
  1727. // closed, which will terminate the associated tunnel.
  1728. func (tunnel *Tunnel) sendSshKeepAlive(
  1729. isFirstPeriodicKeepAlive bool,
  1730. isProbeKeepAlive bool,
  1731. timeout time.Duration,
  1732. bytesUp int64,
  1733. bytesDown int64) error {
  1734. p := tunnel.getCustomParameters()
  1735. // Random padding to frustrate fingerprinting.
  1736. request := prng.Padding(
  1737. p.Int(parameters.SSHKeepAlivePaddingMinBytes),
  1738. p.Int(parameters.SSHKeepAlivePaddingMaxBytes))
  1739. speedTestSample := isFirstPeriodicKeepAlive
  1740. if !speedTestSample {
  1741. speedTestSample = p.WeightedCoinFlip(
  1742. parameters.SSHKeepAliveSpeedTestSampleProbability)
  1743. }
  1744. networkConnectivityPollPeriod := p.Duration(
  1745. parameters.SSHKeepAliveNetworkConnectivityPollingPeriod)
  1746. resetOnFailure := p.WeightedCoinFlip(
  1747. parameters.SSHKeepAliveResetOnFailureProbability)
  1748. p.Close()
  1749. // Note: there is no request context since SSH requests cannot be interrupted
  1750. // directly. Closing the tunnel will interrupt the request. A timeout is set
  1751. // to unblock this function, but the goroutine may not exit until the tunnel
  1752. // is closed.
  1753. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  1754. errChannel := make(chan error, 1)
  1755. afterFunc := time.AfterFunc(timeout, func() {
  1756. errChannel <- errors.TraceNew("timed out")
  1757. })
  1758. defer afterFunc.Stop()
  1759. go func() {
  1760. startTime := time.Now()
  1761. // Note: reading a reply is important for last-received-time tunnel
  1762. // duration calculation.
  1763. requestOk, response, err := tunnel.sshClient.SendRequest(
  1764. "keepalive@openssh.com", true, request)
  1765. elapsedTime := time.Since(startTime)
  1766. errChannel <- err
  1767. success := (err == nil && requestOk)
  1768. if success && isProbeKeepAlive {
  1769. NoticeInfo("Probe SSH keep-alive RTT: %s", elapsedTime)
  1770. }
  1771. // Record the keep alive round trip as a speed test sample. The first
  1772. // periodic keep alive is always recorded, as many tunnels are short-lived
  1773. // and we want to ensure that some data is gathered. Subsequent keep alives
  1774. // are recorded with some configurable probability, which, considering that
  1775. // only the last SpeedTestMaxSampleCount samples are retained, enables
  1776. // tuning the sampling frequency.
  1777. if success && speedTestSample {
  1778. err = tactics.AddSpeedTestSample(
  1779. tunnel.config.GetParameters(),
  1780. GetTacticsStorer(tunnel.config),
  1781. tunnel.config.GetNetworkID(),
  1782. tunnel.dialParams.ServerEntry.Region,
  1783. tunnel.dialParams.TunnelProtocol,
  1784. elapsedTime,
  1785. request,
  1786. response)
  1787. if err != nil {
  1788. NoticeWarning("AddSpeedTestSample failed: %s", errors.Trace(err))
  1789. }
  1790. }
  1791. }()
  1792. // While awaiting the response, poll the network connectivity state. If there
  1793. // is network connectivity, on the same network, for the entire duration of
  1794. // the keep alive request and the request fails, record a failed tunnel
  1795. // event.
  1796. //
  1797. // The network connectivity heuristic is intended to reduce the number of
  1798. // failed tunnels reported due to routine situations such as varying mobile
  1799. // network conditions. The polling may produce false positives if the network
  1800. // goes down and up between polling periods, or changes to a new network and
  1801. // back to the previous network between polling periods.
  1802. //
  1803. // For platforms that don't provide a NetworkConnectivityChecker, it is
  1804. // assumed that there is network connectivity.
  1805. //
  1806. // The approximate number of tunneled bytes successfully sent and received is
  1807. // recorded in the failed tunnel event as a quality indicator.
  1808. ticker := time.NewTicker(networkConnectivityPollPeriod)
  1809. defer ticker.Stop()
  1810. continuousNetworkConnectivity := true
  1811. networkID := tunnel.config.GetNetworkID()
  1812. var err error
  1813. loop:
  1814. for {
  1815. select {
  1816. case err = <-errChannel:
  1817. break loop
  1818. case <-ticker.C:
  1819. connectivityChecker := tunnel.config.NetworkConnectivityChecker
  1820. if (connectivityChecker != nil &&
  1821. connectivityChecker.HasNetworkConnectivity() != 1) ||
  1822. (networkID != tunnel.config.GetNetworkID()) {
  1823. continuousNetworkConnectivity = false
  1824. }
  1825. }
  1826. }
  1827. err = errors.Trace(err)
  1828. if err != nil {
  1829. tunnel.sshClient.Close()
  1830. tunnel.conn.Close()
  1831. // Don't perform log or reset actions when the keep alive may have been
  1832. // interrupted due to shutdown.
  1833. isShutdown := false
  1834. select {
  1835. case <-tunnel.operateCtx.Done():
  1836. isShutdown = true
  1837. default:
  1838. }
  1839. // Ensure that at most one of the two SSH keep alive workers (periodic and
  1840. // probe) perform the log and reset actions.
  1841. wasHandled := atomic.CompareAndSwapInt32(&tunnel.handledSSHKeepAliveFailure, 0, 1)
  1842. if continuousNetworkConnectivity &&
  1843. !isShutdown &&
  1844. !wasHandled {
  1845. _ = RecordFailedTunnelStat(
  1846. tunnel.config,
  1847. tunnel.dialParams,
  1848. tunnel.livenessTestMetrics,
  1849. bytesUp,
  1850. bytesDown,
  1851. err)
  1852. if tunnel.extraFailureAction != nil {
  1853. tunnel.extraFailureAction()
  1854. }
  1855. // SSHKeepAliveResetOnFailureProbability is set when a late-lifecycle
  1856. // impaired protocol attack is suspected. With the given probability, reset
  1857. // server affinity and replay parameters for this server to avoid
  1858. // continuously reconnecting to the server and/or using the same protocol
  1859. // and dial parameters.
  1860. if resetOnFailure {
  1861. NoticeInfo("Delete dial parameters for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1862. err := DeleteDialParameters(tunnel.dialParams.ServerEntry.IpAddress, tunnel.dialParams.NetworkID)
  1863. if err != nil {
  1864. NoticeWarning("DeleteDialParameters failed: %s", err)
  1865. }
  1866. NoticeInfo("Delete server affinity for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1867. err = DeleteServerEntryAffinity(tunnel.dialParams.ServerEntry.IpAddress)
  1868. if err != nil {
  1869. NoticeWarning("DeleteServerEntryAffinity failed: %s", err)
  1870. }
  1871. }
  1872. }
  1873. }
  1874. return err
  1875. }
  1876. // sendStats is a helper for sending session stats to the server.
  1877. func sendStats(tunnel *Tunnel) bool {
  1878. // Tunnel does not have a serverContext when DisableApi is set
  1879. if tunnel.serverContext == nil {
  1880. return true
  1881. }
  1882. // Skip when tunnel is discarded
  1883. if tunnel.IsDiscarded() {
  1884. return true
  1885. }
  1886. err := tunnel.serverContext.DoStatusRequest(tunnel)
  1887. if err != nil {
  1888. NoticeWarning("DoStatusRequest failed for %s: %s",
  1889. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1890. }
  1891. return err == nil
  1892. }