tunnel.go 72 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "encoding/base64"
  25. "encoding/json"
  26. std_errors "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "net"
  31. "net/http"
  32. "strconv"
  33. "sync"
  34. "sync/atomic"
  35. "time"
  36. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  41. inproxy_dtls "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy/dtls"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  50. "github.com/fxamacker/cbor/v2"
  51. )
  52. // Tunneler specifies the interface required by components that use tunnels.
  53. type Tunneler interface {
  54. // Dial creates a tunneled connection.
  55. //
  56. // When split tunnel mode is enabled, the connection may be untunneled,
  57. // depending on GeoIP classification of the destination.
  58. //
  59. // downstreamConn is an optional parameter which specifies a connection to be
  60. // explicitly closed when the Dialed connection is closed. For instance, this
  61. // is used to close downstreamConn App<->LocalProxy connections when the related
  62. // LocalProxy<->SshPortForward connections close.
  63. Dial(remoteAddr string, downstreamConn net.Conn) (conn net.Conn, err error)
  64. DirectDial(remoteAddr string) (conn net.Conn, err error)
  65. SignalComponentFailure()
  66. }
  67. // TunnelOwner specifies the interface required by Tunnel to notify its
  68. // owner when it has failed. The owner may, as in the case of the Controller,
  69. // remove the tunnel from its list of active tunnels.
  70. type TunnelOwner interface {
  71. SignalSeededNewSLOK()
  72. SignalTunnelFailure(tunnel *Tunnel)
  73. }
// Tunnel is a connection to a Psiphon server. An established
// tunnel includes a network connection to the specified server
// and an SSH session built on top of that transport.
//
// A Tunnel is created in a connected state by ConnectTunnel, must be
// activated with Activate before use, and is torn down with Close.
type Tunnel struct {
	// mutex guards the lifecycle flags below, the fields initialized by
	// Activate, and inFlightConnectedRequestSignal.
	mutex  *sync.Mutex
	config *Config
	// Lifecycle flags: isActivated is set by Activate; isDiscarded and
	// isClosed are set by Close.
	isActivated bool
	isDiscarded bool
	isClosed    bool
	// dialParams are the parameters the tunnel was dialed with; Activate
	// marks them as failed when activation fails.
	dialParams          *DialParameters
	livenessTestMetrics *livenessTestMetrics
	// extraFailureAction, when non-nil, is invoked if activation fails.
	extraFailureAction func()
	// serverContext is set by Activate; it stays nil when the config has
	// DisableApi set.
	serverContext *ServerContext
	// monitoringStartTime is passed to conn.GetMetrics when Close reports
	// burst metrics.
	monitoringStartTime time.Time
	// conn is the tunnel's network connection and sshClient is the SSH
	// session running over it.
	conn      *common.BurstMonitoredConn
	sshClient *ssh.Client
	// sshServerRequests delivers SSH requests sent by the server.
	sshServerRequests <-chan *ssh.Request
	// operateWaitGroup, operateCtx and stopOperate are created by Activate
	// and are used to run and stop the operateTunnel goroutine.
	operateWaitGroup *sync.WaitGroup
	operateCtx       context.Context
	stopOperate      context.CancelFunc
	// signalPortForwardFailure is created with a buffer of 1 and receives
	// non-blocking port forward failure signals.
	signalPortForwardFailure chan struct{}
	// NOTE(review): not referenced in this part of the file; presumably a
	// failure count maintained by operateTunnel — confirm.
	totalPortForwardFailures int
	// adjustedEstablishStartTime is the establishment start time passed to
	// ConnectTunnel; establishDuration is measured from it.
	adjustedEstablishStartTime time.Time
	// establishDuration and establishedTime are recorded by Activate.
	establishDuration time.Duration
	establishedTime   time.Time
	// NOTE(review): not referenced in this part of the file; the int32 type
	// suggests it is accessed via sync/atomic — confirm.
	handledSSHKeepAliveFailure int32
	// inFlightConnectedRequestSignal is non-nil while a connected request is
	// in flight (see SetInFlightConnectedRequest); AwaitInFlightConnectedRequest
	// receives from it.
	inFlightConnectedRequestSignal chan struct{}
}
  102. // getCustomParameters helpers wrap the verbose function call chain required
  103. // to get a current snapshot of the parameters.Parameters customized with the
  104. // dial parameters associated with a tunnel.
  105. func (tunnel *Tunnel) getCustomParameters() parameters.ParametersAccessor {
  106. return getCustomParameters(tunnel.config, tunnel.dialParams)
  107. }
  108. func getCustomParameters(
  109. config *Config, dialParams *DialParameters) parameters.ParametersAccessor {
  110. return config.GetParameters().GetCustom(dialParams.NetworkLatencyMultiplier)
  111. }
  112. // ConnectTunnel first makes a network transport connection to the
  113. // Psiphon server and then establishes an SSH client session on top of
  114. // that transport. The SSH server is authenticated using the public
  115. // key in the server entry.
  116. // Depending on the server's capabilities, the connection may use
  117. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  118. // HTTP (meek protocol).
  119. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  120. // the a random supported protocol is used.
  121. //
  122. // Call Activate on a connected tunnel to complete its establishment
  123. // before using.
  124. //
  125. // Tunnel establishment is split into two phases: connection, and
  126. // activation. The Controller will run many ConnectTunnel calls
  127. // concurrently and then, to avoid unnecessary overhead from making
  128. // handshake requests and starting operateTunnel from tunnels which
  129. // may be discarded, call Activate on connected tunnels sequentially
  130. // as necessary.
  131. func ConnectTunnel(
  132. ctx context.Context,
  133. config *Config,
  134. adjustedEstablishStartTime time.Time,
  135. dialParams *DialParameters) (*Tunnel, error) {
  136. // Build transport layers and establish SSH connection. Note that
  137. // dialConn and monitoredConn are the same network connection.
  138. dialResult, err := dialTunnel(ctx, config, dialParams)
  139. if err != nil {
  140. return nil, errors.Trace(err)
  141. }
  142. // The tunnel is now connected
  143. return &Tunnel{
  144. mutex: new(sync.Mutex),
  145. config: config,
  146. dialParams: dialParams,
  147. livenessTestMetrics: dialResult.livenessTestMetrics,
  148. extraFailureAction: dialResult.extraFailureAction,
  149. monitoringStartTime: dialResult.monitoringStartTime,
  150. conn: dialResult.monitoredConn,
  151. sshClient: dialResult.sshClient,
  152. sshServerRequests: dialResult.sshRequests,
  153. // A buffer allows at least one signal to be sent even when the receiver is
  154. // not listening. Senders should not block.
  155. signalPortForwardFailure: make(chan struct{}, 1),
  156. adjustedEstablishStartTime: adjustedEstablishStartTime,
  157. }, nil
  158. }
  159. // Activate completes the tunnel establishment, performing the handshake
  160. // request and starting operateTunnel, the worker that monitors the tunnel
  161. // and handles periodic management.
  162. func (tunnel *Tunnel) Activate(
  163. ctx context.Context, tunnelOwner TunnelOwner) (retErr error) {
  164. // Ensure that, unless the base context is cancelled, any replayed dial
  165. // parameters are cleared, no longer to be retried, if the tunnel fails to
  166. // activate.
  167. activationSucceeded := false
  168. baseCtx := ctx
  169. defer func() {
  170. if !activationSucceeded && baseCtx.Err() != context.Canceled {
  171. tunnel.dialParams.Failed(tunnel.config)
  172. if tunnel.extraFailureAction != nil {
  173. tunnel.extraFailureAction()
  174. }
  175. if retErr != nil {
  176. _ = RecordFailedTunnelStat(
  177. tunnel.config,
  178. tunnel.dialParams,
  179. tunnel.livenessTestMetrics,
  180. -1,
  181. -1,
  182. retErr)
  183. }
  184. }
  185. }()
  186. // Create a new Psiphon API server context for this tunnel. This includes
  187. // performing a handshake request. If the handshake fails, this activation
  188. // fails.
  189. var serverContext *ServerContext
  190. if !tunnel.config.DisableApi {
  191. NoticeInfo(
  192. "starting server context for %s",
  193. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  194. // Call NewServerContext in a goroutine, as it blocks on a network operation,
  195. // the handshake request, and would block shutdown. If the shutdown signal is
  196. // received, close the tunnel, which will interrupt the handshake request
  197. // that may be blocking NewServerContext.
  198. //
  199. // Timeout after PsiphonApiServerTimeoutSeconds. NewServerContext may not
  200. // return if the tunnel network connection is unstable during the handshake
  201. // request. At this point, there is no operateTunnel monitor that will detect
  202. // this condition with SSH keep alives.
  203. doInproxy := protocol.TunnelProtocolUsesInproxy(tunnel.dialParams.TunnelProtocol)
  204. var timeoutParameter string
  205. if doInproxy {
  206. // Optionally allow more time in case the broker/server relay
  207. // requires additional round trips to establish a new session.
  208. timeoutParameter = parameters.InproxyPsiphonAPIRequestTimeout
  209. } else {
  210. timeoutParameter = parameters.PsiphonAPIRequestTimeout
  211. }
  212. timeout := tunnel.getCustomParameters().Duration(timeoutParameter)
  213. var handshakeCtx context.Context
  214. var cancelFunc context.CancelFunc
  215. if timeout > 0 {
  216. handshakeCtx, cancelFunc = context.WithTimeout(ctx, timeout)
  217. } else {
  218. handshakeCtx, cancelFunc = context.WithCancel(ctx)
  219. }
  220. type newServerContextResult struct {
  221. serverContext *ServerContext
  222. err error
  223. }
  224. resultChannel := make(chan newServerContextResult)
  225. wg := new(sync.WaitGroup)
  226. if doInproxy {
  227. // Launch a handler to handle broker/server relay SSH requests,
  228. // which will occur when the broker needs to establish a new
  229. // session with the server.
  230. wg.Add(1)
  231. go func() {
  232. defer wg.Done()
  233. notice := true
  234. select {
  235. case serverRequest := <-tunnel.sshServerRequests:
  236. if serverRequest != nil {
  237. if serverRequest.Type == protocol.PSIPHON_API_INPROXY_RELAY_REQUEST_NAME {
  238. if notice {
  239. NoticeInfo(
  240. "relaying inproxy broker packets for %s",
  241. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  242. notice = false
  243. }
  244. tunnel.relayInproxyPacketRoundTrip(handshakeCtx, serverRequest)
  245. } else {
  246. // There's a potential race condition in which
  247. // post-handshake SSH requests, such as OSL or
  248. // alert requests, arrive to this handler instead
  249. // of operateTunnel, so invoke HandleServerRequest here.
  250. HandleServerRequest(tunnelOwner, tunnel, serverRequest)
  251. }
  252. }
  253. case <-handshakeCtx.Done():
  254. return
  255. }
  256. }()
  257. }
  258. wg.Add(1)
  259. go func() {
  260. defer wg.Done()
  261. serverContext, err := NewServerContext(tunnel)
  262. resultChannel <- newServerContextResult{
  263. serverContext: serverContext,
  264. err: err,
  265. }
  266. }()
  267. var result newServerContextResult
  268. select {
  269. case result = <-resultChannel:
  270. case <-handshakeCtx.Done():
  271. result.err = handshakeCtx.Err()
  272. // Interrupt the goroutine
  273. tunnel.Close(true)
  274. <-resultChannel
  275. }
  276. cancelFunc()
  277. wg.Wait()
  278. if result.err != nil {
  279. return errors.Trace(result.err)
  280. }
  281. serverContext = result.serverContext
  282. }
  283. // The activation succeeded.
  284. activationSucceeded = true
  285. tunnel.mutex.Lock()
  286. // It may happen that the tunnel gets closed while Activate is running.
  287. // In this case, abort here, to ensure that the operateTunnel goroutine
  288. // will not be launched after Close is called.
  289. if tunnel.isClosed {
  290. return errors.TraceNew("tunnel is closed")
  291. }
  292. tunnel.isActivated = true
  293. tunnel.serverContext = serverContext
  294. // establishDuration is the elapsed time between the controller starting tunnel
  295. // establishment and this tunnel being established. The reported value represents
  296. // how long the user waited between starting the client and having a usable tunnel;
  297. // or how long between the client detecting an unexpected tunnel disconnect and
  298. // completing automatic reestablishment.
  299. //
  300. // This time period may include time spent unsuccessfully connecting to other
  301. // servers. Time spent waiting for network connectivity is excluded.
  302. tunnel.establishDuration = time.Since(tunnel.adjustedEstablishStartTime)
  303. tunnel.establishedTime = time.Now()
  304. // Use the Background context instead of the controller run context, as tunnels
  305. // are terminated when the controller calls tunnel.Close.
  306. tunnel.operateCtx, tunnel.stopOperate = context.WithCancel(context.Background())
  307. tunnel.operateWaitGroup = new(sync.WaitGroup)
  308. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
  309. // stats updates.
  310. tunnel.operateWaitGroup.Add(1)
  311. go tunnel.operateTunnel(tunnelOwner)
  312. tunnel.mutex.Unlock()
  313. return nil
  314. }
  315. func (tunnel *Tunnel) relayInproxyPacketRoundTrip(
  316. ctx context.Context, request *ssh.Request) (retErr error) {
  317. defer func() {
  318. if retErr != nil {
  319. request.Reply(false, nil)
  320. }
  321. }()
  322. // Continue the broker/server relay started in handshake round trip.
  323. // server -> broker
  324. var relayRequest protocol.InproxyRelayRequest
  325. err := cbor.Unmarshal(request.Payload, &relayRequest)
  326. inproxyConn := tunnel.dialParams.inproxyConn.Load().(*inproxy.ClientConn)
  327. if inproxyConn == nil {
  328. return errors.TraceNew("missing inproxyConn")
  329. }
  330. responsePacket, err := inproxyConn.RelayPacket(ctx, relayRequest.Packet)
  331. if err != nil {
  332. return errors.Trace(err)
  333. }
  334. // RelayPacket may return a nil packet when the relay is complete.
  335. if responsePacket == nil {
  336. return nil
  337. }
  338. // broker -> server
  339. relayResponse := &protocol.InproxyRelayResponse{
  340. Packet: responsePacket,
  341. }
  342. responsePayload, err := protocol.CBOREncoding.Marshal(relayResponse)
  343. if err != nil {
  344. return errors.Trace(err)
  345. }
  346. err = request.Reply(true, responsePayload)
  347. if err != nil {
  348. return errors.Trace(err)
  349. }
  350. return nil
  351. }
  352. // Close stops operating the tunnel and closes the underlying connection.
  353. // Supports multiple and/or concurrent calls to Close().
  354. // When isDiscarded is set, operateTunnel will not attempt to send final
  355. // status requests.
  356. func (tunnel *Tunnel) Close(isDiscarded bool) {
  357. tunnel.mutex.Lock()
  358. tunnel.isDiscarded = isDiscarded
  359. isActivated := tunnel.isActivated
  360. isClosed := tunnel.isClosed
  361. tunnel.isClosed = true
  362. tunnel.mutex.Unlock()
  363. if !isClosed {
  364. // Signal operateTunnel to stop before closing the tunnel -- this
  365. // allows a final status request to be made in the case of an orderly
  366. // shutdown.
  367. // A timer is set, so if operateTunnel takes too long to stop, the
  368. // tunnel is closed, which will interrupt any slow final status request.
  369. if isActivated {
  370. timeout := tunnel.getCustomParameters().Duration(
  371. parameters.TunnelOperateShutdownTimeout)
  372. afterFunc := time.AfterFunc(
  373. timeout,
  374. func() { tunnel.conn.Close() })
  375. tunnel.stopOperate()
  376. tunnel.operateWaitGroup.Wait()
  377. afterFunc.Stop()
  378. }
  379. tunnel.sshClient.Close()
  380. // tunnel.conn.Close() may get called multiple times, which is allowed.
  381. tunnel.conn.Close()
  382. err := tunnel.sshClient.Wait()
  383. if err != nil {
  384. NoticeWarning("close tunnel ssh error: %s", err)
  385. }
  386. }
  387. // Log burst metrics now that the BurstMonitoredConn is closed.
  388. // Metrics will be empty when burst monitoring is disabled.
  389. if !isDiscarded && isActivated {
  390. burstMetrics := tunnel.conn.GetMetrics(tunnel.monitoringStartTime)
  391. if len(burstMetrics) > 0 {
  392. NoticeBursts(
  393. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  394. burstMetrics)
  395. }
  396. }
  397. }
  398. // SetInFlightConnectedRequest checks if a connected request can begin and
  399. // sets the channel used to signal that the request is complete.
  400. //
  401. // The caller must not initiate a connected request when
  402. // SetInFlightConnectedRequest returns false. When SetInFlightConnectedRequest
  403. // returns true, the caller must call SetInFlightConnectedRequest(nil) when
  404. // the connected request completes.
  405. func (tunnel *Tunnel) SetInFlightConnectedRequest(requestSignal chan struct{}) bool {
  406. tunnel.mutex.Lock()
  407. defer tunnel.mutex.Unlock()
  408. // If already closing, don't start a connected request: the
  409. // TunnelOperateShutdownTimeout period may be nearly expired.
  410. if tunnel.isClosed {
  411. return false
  412. }
  413. if requestSignal == nil {
  414. // Not already in-flight (not expected)
  415. if tunnel.inFlightConnectedRequestSignal == nil {
  416. return false
  417. }
  418. } else {
  419. // Already in-flight (not expected)
  420. if tunnel.inFlightConnectedRequestSignal != nil {
  421. return false
  422. }
  423. }
  424. tunnel.inFlightConnectedRequestSignal = requestSignal
  425. return true
  426. }
  427. // AwaitInFlightConnectedRequest waits for the signal that any in-flight
  428. // connected request is complete.
  429. //
  430. // AwaitInFlightConnectedRequest may block until the connected request is
  431. // aborted by terminating the tunnel.
  432. func (tunnel *Tunnel) AwaitInFlightConnectedRequest() {
  433. tunnel.mutex.Lock()
  434. requestSignal := tunnel.inFlightConnectedRequestSignal
  435. tunnel.mutex.Unlock()
  436. if requestSignal != nil {
  437. <-requestSignal
  438. }
  439. }
  440. // IsActivated returns the tunnel's activated flag.
  441. func (tunnel *Tunnel) IsActivated() bool {
  442. tunnel.mutex.Lock()
  443. defer tunnel.mutex.Unlock()
  444. return tunnel.isActivated
  445. }
  446. // IsDiscarded returns the tunnel's discarded flag.
  447. func (tunnel *Tunnel) IsDiscarded() bool {
  448. tunnel.mutex.Lock()
  449. defer tunnel.mutex.Unlock()
  450. return tunnel.isDiscarded
  451. }
  452. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  453. // This function blocks awaiting a response. Only one request may be in-flight
  454. // at once; a concurrent SendAPIRequest will block until an active request
  455. // receives its response (or the SSH connection is terminated).
  456. func (tunnel *Tunnel) SendAPIRequest(
  457. name string, requestPayload []byte) ([]byte, error) {
  458. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  459. name, true, requestPayload)
  460. if err != nil {
  461. return nil, errors.Trace(err)
  462. }
  463. if !ok {
  464. return nil, errors.TraceNew("API request rejected")
  465. }
  466. return responsePayload, nil
  467. }
  468. // DialTCPChannel establishes a TCP port forward connection through the
  469. // tunnel.
  470. //
  471. // When split tunnel mode is enabled, and unless alwaysTunneled is set, the
  472. // server may reject the port forward and indicate that the client is to make
  473. // direct, untunneled connection. In this case, the bool return value is true
  474. // and net.Conn and error are nil.
  475. //
  476. // downstreamConn is an optional parameter which specifies a connection to be
  477. // explicitly closed when the dialed connection is closed.
  478. func (tunnel *Tunnel) DialTCPChannel(
  479. remoteAddr string,
  480. alwaysTunneled bool,
  481. downstreamConn net.Conn) (net.Conn, bool, error) {
  482. channelType := "direct-tcpip"
  483. if alwaysTunneled && tunnel.config.IsSplitTunnelEnabled() {
  484. // This channel type is only necessary in split tunnel mode.
  485. channelType = protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE
  486. }
  487. channel, err := tunnel.dialChannel(channelType, remoteAddr)
  488. if err != nil {
  489. if isSplitTunnelRejectReason(err) {
  490. return nil, true, nil
  491. }
  492. return nil, false, errors.Trace(err)
  493. }
  494. netConn, ok := channel.(net.Conn)
  495. if !ok {
  496. return nil, false, errors.Tracef("unexpected channel type: %T", channel)
  497. }
  498. conn := &TunneledConn{
  499. Conn: netConn,
  500. tunnel: tunnel,
  501. downstreamConn: downstreamConn}
  502. return tunnel.wrapWithTransferStats(conn), false, nil
  503. }
  504. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  505. channel, err := tunnel.dialChannel(protocol.PACKET_TUNNEL_CHANNEL_TYPE, "")
  506. if err != nil {
  507. return nil, errors.Trace(err)
  508. }
  509. sshChannel, ok := channel.(ssh.Channel)
  510. if !ok {
  511. return nil, errors.Tracef("unexpected channel type: %T", channel)
  512. }
  513. NoticeInfo("DialPacketTunnelChannel: established channel")
  514. conn := newChannelConn(sshChannel)
  515. // wrapWithTransferStats will track bytes transferred for the
  516. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  517. //
  518. // Since the data in the channel is not HTTP or TLS, no domain bytes
  519. // counting is expected.
  520. //
  521. // transferstats are also used to determine that there's been recent
  522. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  523. return tunnel.wrapWithTransferStats(conn), nil
  524. }
  525. func (tunnel *Tunnel) dialChannel(channelType, remoteAddr string) (interface{}, error) {
  526. if !tunnel.IsActivated() {
  527. return nil, errors.TraceNew("tunnel is not activated")
  528. }
  529. // Note: there is no dial context since SSH port forward dials cannot
  530. // be interrupted directly. Closing the tunnel will interrupt the dials.
  531. // A timeout is set to unblock this function, but the goroutine may
  532. // not exit until the tunnel is closed.
  533. type channelDialResult struct {
  534. channel interface{}
  535. err error
  536. }
  537. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  538. results := make(chan *channelDialResult, 1)
  539. afterFunc := time.AfterFunc(
  540. tunnel.getCustomParameters().Duration(
  541. parameters.TunnelPortForwardDialTimeout),
  542. func() {
  543. results <- &channelDialResult{
  544. nil, errors.Tracef("channel dial timeout: %s", channelType)}
  545. })
  546. defer afterFunc.Stop()
  547. go func() {
  548. result := new(channelDialResult)
  549. switch channelType {
  550. case "direct-tcpip", protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  551. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  552. // "direct-tcpip", except split tunnel channel rejections are disallowed
  553. // even when split tunnel mode is enabled.
  554. result.channel, result.err =
  555. tunnel.sshClient.Dial(channelType, remoteAddr)
  556. default:
  557. var sshRequests <-chan *ssh.Request
  558. result.channel, sshRequests, result.err =
  559. tunnel.sshClient.OpenChannel(channelType, nil)
  560. if result.err == nil {
  561. go ssh.DiscardRequests(sshRequests)
  562. }
  563. }
  564. if result.err != nil {
  565. result.err = errors.Trace(result.err)
  566. }
  567. results <- result
  568. }()
  569. result := <-results
  570. if result.err != nil {
  571. if !isSplitTunnelRejectReason(result.err) {
  572. select {
  573. case tunnel.signalPortForwardFailure <- struct{}{}:
  574. default:
  575. }
  576. }
  577. return nil, errors.Trace(result.err)
  578. }
  579. return result.channel, nil
  580. }
  581. func isSplitTunnelRejectReason(err error) bool {
  582. var openChannelErr *ssh.OpenChannelError
  583. if std_errors.As(err, &openChannelErr) {
  584. return openChannelErr.Reason ==
  585. ssh.RejectionReason(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL)
  586. }
  587. return false
  588. }
  589. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  590. // Tunnel does not have a serverContext when DisableApi is set. We still use
  591. // transferstats.Conn to count bytes transferred for monitoring tunnel
  592. // quality.
  593. var regexps *transferstats.Regexps
  594. if tunnel.serverContext != nil {
  595. regexps = tunnel.serverContext.StatsRegexps()
  596. }
  597. return transferstats.NewConn(
  598. conn, tunnel.dialParams.ServerEntry.IpAddress, regexps)
  599. }
  600. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  601. // This will terminate the tunnel.
  602. func (tunnel *Tunnel) SignalComponentFailure() {
  603. NoticeWarning("tunnel received component failure signal")
  604. tunnel.Close(false)
  605. }
  // TunneledConn implements net.Conn and wraps a port forward connection.
// It is used to hook into Read and Write to observe I/O errors and
// report these errors back to the tunnel monitor as port forward failures.
// TunneledConn optionally tracks a peer connection to be explicitly closed
// when the TunneledConn is closed.
type TunneledConn struct {
	// Conn is the wrapped port forward connection; Read, Write, and Close
	// delegate to it.
	net.Conn
	// tunnel is signaled, via its signalPortForwardFailure channel, when
	// Read or Write fail.
	tunnel *Tunnel
	// downstreamConn, when non-nil, is the peer connection that Close also
	// closes.
	downstreamConn net.Conn
}
  616. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  617. n, err = conn.Conn.Read(buffer)
  618. if err != nil && err != io.EOF {
  619. // Report new failure. Won't block; assumes the receiver
  620. // has a sufficient buffer for the threshold number of reports.
  621. // TODO: conditional on type of error or error message?
  622. select {
  623. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  624. default:
  625. }
  626. }
  627. return
  628. }
  629. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  630. n, err = conn.Conn.Write(buffer)
  631. if err != nil && err != io.EOF {
  632. // Same as TunneledConn.Read()
  633. select {
  634. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  635. default:
  636. }
  637. }
  638. return
  639. }
  640. func (conn *TunneledConn) Close() error {
  641. if conn.downstreamConn != nil {
  642. conn.downstreamConn.Close()
  643. }
  644. return conn.Conn.Close()
  645. }
  // dialResult holds the outputs of a successful dialTunnel call.
type dialResult struct {
	// dialConn is the base transport conn. It may be used to close the
	// underlying network connection, but must not be used for I/O, which
	// would interfere with SSH and bypass throttling.
	dialConn net.Conn
	// monitoringStartTime is the time at which monitoredConn was created.
	monitoringStartTime time.Time
	// monitoredConn is the burst-monitoring wrapper around dialConn.
	monitoredConn *common.BurstMonitoredConn
	// sshClient is the established SSH client.
	sshClient *ssh.Client
	// sshRequests is the SSH global request channel, which is to be
	// handled by operateTunnel.
	sshRequests <-chan *ssh.Request
	// livenessTestMetrics holds liveness test metrics; nil when no
	// liveness test was performed.
	livenessTestMetrics *livenessTestMetrics
	// extraFailureAction, when non-nil, is a protocol-specific action to
	// invoke if the tunnel later fails (set by the Conjure dial path).
	extraFailureAction func()
}
  655. // dialTunnel is a helper that builds the transport layers and establishes the
  656. // SSH connection. When additional dial configuration is used, dial metrics
  657. // are recorded and returned.
  658. func dialTunnel(
  659. ctx context.Context,
  660. config *Config,
  661. dialParams *DialParameters) (_ *dialResult, retErr error) {
  662. // Return immediately when overall context is canceled or timed-out. This
  663. // avoids notice noise.
  664. err := ctx.Err()
  665. if err != nil {
  666. return nil, errors.Trace(err)
  667. }
  668. p := getCustomParameters(config, dialParams)
  669. timeout := p.Duration(parameters.TunnelConnectTimeout)
  670. rateLimits := p.RateLimits(parameters.TunnelRateLimits)
  671. obfuscatedSSHMinPadding := p.Int(parameters.ObfuscatedSSHMinPadding)
  672. obfuscatedSSHMaxPadding := p.Int(parameters.ObfuscatedSSHMaxPadding)
  673. livenessTestMinUpstreamBytes := p.Int(parameters.LivenessTestMinUpstreamBytes)
  674. livenessTestMaxUpstreamBytes := p.Int(parameters.LivenessTestMaxUpstreamBytes)
  675. livenessTestMinDownstreamBytes := p.Int(parameters.LivenessTestMinDownstreamBytes)
  676. livenessTestMaxDownstreamBytes := p.Int(parameters.LivenessTestMaxDownstreamBytes)
  677. burstUpstreamTargetBytes := int64(p.Int(parameters.ClientBurstUpstreamTargetBytes))
  678. burstUpstreamDeadline := p.Duration(parameters.ClientBurstUpstreamDeadline)
  679. burstDownstreamTargetBytes := int64(p.Int(parameters.ClientBurstDownstreamTargetBytes))
  680. burstDownstreamDeadline := p.Duration(parameters.ClientBurstDownstreamDeadline)
  681. tlsOSSHApplyTrafficShaping := p.WeightedCoinFlip(parameters.TLSTunnelTrafficShapingProbability)
  682. tlsOSSHMinTLSPadding := p.Int(parameters.TLSTunnelMinTLSPadding)
  683. tlsOSSHMaxTLSPadding := p.Int(parameters.TLSTunnelMaxTLSPadding)
  684. conjureEnableIPv6Dials := p.Bool(parameters.ConjureEnableIPv6Dials)
  685. conjureEnablePortRandomization := p.Bool(parameters.ConjureEnablePortRandomization)
  686. conjureEnableRegistrationOverrides := p.Bool(parameters.ConjureEnableRegistrationOverrides)
  687. p.Close()
  688. // Ensure that, unless the base context is cancelled, any replayed dial
  689. // parameters are cleared, no longer to be retried, if the tunnel fails to
  690. // connect.
  691. //
  692. // Limitation: dials that fail to connect due to the server being in a
  693. // load-limiting state are not distinguished and excepted from this
  694. // logic.
  695. dialSucceeded := false
  696. baseCtx := ctx
  697. var failedTunnelLivenessTestMetrics *livenessTestMetrics
  698. var extraFailureAction func()
  699. defer func() {
  700. if !dialSucceeded && baseCtx.Err() != context.Canceled {
  701. dialParams.Failed(config)
  702. if extraFailureAction != nil {
  703. extraFailureAction()
  704. }
  705. if retErr != nil {
  706. _ = RecordFailedTunnelStat(
  707. config,
  708. dialParams,
  709. failedTunnelLivenessTestMetrics,
  710. -1,
  711. -1,
  712. retErr)
  713. }
  714. }
  715. }()
  716. var cancelFunc context.CancelFunc
  717. ctx, cancelFunc = context.WithTimeout(ctx, timeout)
  718. defer cancelFunc()
  719. // DialDuration is the elapsed time for both successful and failed tunnel
  720. // dials. For successful tunnels, it includes any the network protocol
  721. // handshake(s), obfuscation protocol handshake(s), SSH handshake, and
  722. // liveness test, when performed.
  723. //
  724. // Note: ensure DialDuration is set before calling any function which logs
  725. // dial_duration.
  726. startDialTime := time.Now()
  727. defer func() {
  728. dialParams.DialDuration = time.Since(startDialTime)
  729. }()
  730. // Note: dialParams.MeekResolvedIPAddress isn't set until the dial begins,
  731. // so it will always be blank in NoticeConnectingServer.
  732. NoticeConnectingServer(dialParams)
  733. // Create the base transport: meek or direct connection
  734. var dialConn net.Conn
  735. if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  736. dialConn, err = DialMeek(
  737. ctx,
  738. dialParams.GetMeekConfig(),
  739. dialParams.GetDialConfig())
  740. if err != nil {
  741. return nil, errors.Trace(err)
  742. }
  743. } else if protocol.TunnelProtocolUsesQUIC(dialParams.TunnelProtocol) {
  744. var packetConn net.PacketConn
  745. var remoteAddr *net.UDPAddr
  746. // Special case: explict in-proxy dial. TCP dials wire up in-proxy
  747. // dials via DialConfig and its CustomDialer using
  748. // makeInproxyTCPDialer. common/quic doesn't have an equivilent to
  749. // CustomDialer.
  750. if protocol.TunnelProtocolUsesInproxy(dialParams.TunnelProtocol) {
  751. packetConn, err = dialInproxy(ctx, config, dialParams)
  752. if err != nil {
  753. return nil, errors.Trace(err)
  754. }
  755. // Use the actual 2nd hop destination address as the remote
  756. // address for correct behavior in
  757. // common/quic.getMaxPreDiscoveryPacketSize, which differs for
  758. // IPv4 vs. IPv6 destination addresses; and
  759. // ObfuscatedPacketConn.RemoteAddr. The 2nd hop destination
  760. // address is not actually dialed.
  761. //
  762. // Limitation: for domain destinations, the in-proxy proxy
  763. // resolves the domain, so just assume IPv6, which has lower max
  764. // padding(see quic.getMaxPreDiscoveryPacketSize), and use a stub
  765. // address.
  766. host, portStr, err := net.SplitHostPort(dialParams.DirectDialAddress)
  767. if err != nil {
  768. return nil, errors.Trace(err)
  769. }
  770. port, err := strconv.Atoi(portStr)
  771. if err != nil {
  772. return nil, errors.Trace(err)
  773. }
  774. IP := net.ParseIP(host)
  775. if IP == nil {
  776. IP = net.ParseIP("fd00::")
  777. }
  778. remoteAddr = &net.UDPAddr{IP: IP, Port: port}
  779. } else {
  780. packetConn, remoteAddr, err = NewUDPConn(
  781. ctx, "udp", false, "", dialParams.DirectDialAddress, dialParams.GetDialConfig())
  782. if err != nil {
  783. return nil, errors.Trace(err)
  784. }
  785. }
  786. dialConn, err = quic.Dial(
  787. ctx,
  788. packetConn,
  789. remoteAddr,
  790. dialParams.QUICDialSNIAddress,
  791. dialParams.QUICVersion,
  792. dialParams.QUICClientHelloSeed,
  793. dialParams.ServerEntry.SshObfuscatedKey,
  794. dialParams.ObfuscatedQUICPaddingSeed,
  795. dialParams.ObfuscatedQUICNonceTransformerParameters,
  796. dialParams.QUICDisablePathMTUDiscovery)
  797. if err != nil {
  798. return nil, errors.Trace(err)
  799. }
  800. } else if protocol.TunnelProtocolUsesTapDance(dialParams.TunnelProtocol) {
  801. dialConn, err = refraction.DialTapDance(
  802. ctx,
  803. config.EmitRefractionNetworkingLogs,
  804. config.GetPsiphonDataDirectory(),
  805. NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
  806. dialParams.DirectDialAddress)
  807. if err != nil {
  808. return nil, errors.Trace(err)
  809. }
  810. } else if protocol.TunnelProtocolUsesConjure(dialParams.TunnelProtocol) {
  811. dialConn, extraFailureAction, err = dialConjure(
  812. ctx,
  813. config,
  814. dialParams,
  815. conjureEnableIPv6Dials,
  816. conjureEnablePortRandomization,
  817. conjureEnableRegistrationOverrides)
  818. if err != nil {
  819. return nil, errors.Trace(err)
  820. }
  821. } else if protocol.TunnelProtocolUsesTLSOSSH(dialParams.TunnelProtocol) {
  822. dialConn, err = DialTLSTunnel(
  823. ctx,
  824. dialParams.GetTLSOSSHConfig(config),
  825. dialParams.GetDialConfig(),
  826. tlsOSSHApplyTrafficShaping,
  827. tlsOSSHMinTLSPadding,
  828. tlsOSSHMaxTLSPadding)
  829. if err != nil {
  830. return nil, errors.Trace(err)
  831. }
  832. } else {
  833. // Use NewTCPDialer and don't use DialTCP directly, to ensure that
  834. // dialParams.GetDialConfig()CustomDialer is applied.
  835. tcpDialer := NewTCPDialer(dialParams.GetDialConfig())
  836. dialConn, err = tcpDialer(ctx, "tcp", dialParams.DirectDialAddress)
  837. if err != nil {
  838. return nil, errors.Trace(err)
  839. }
  840. }
  841. // Some conns report additional metrics. fragmentor.Conns report
  842. // fragmentor configs.
  843. if metricsSource, ok := dialConn.(common.MetricsSource); ok {
  844. dialParams.DialConnMetrics = metricsSource
  845. }
  846. if noticeMetricsSource, ok := dialConn.(common.NoticeMetricsSource); ok {
  847. dialParams.DialConnNoticeMetrics = noticeMetricsSource
  848. }
  849. // If dialConn is not a Closer, tunnel failure detection may be slower
  850. if _, ok := dialConn.(common.Closer); !ok {
  851. NoticeWarning("tunnel.dialTunnel: dialConn is not a Closer")
  852. }
  853. cleanupConn := dialConn
  854. defer func() {
  855. // Cleanup on error
  856. if cleanupConn != nil {
  857. cleanupConn.Close()
  858. }
  859. }()
  860. monitoringStartTime := time.Now()
  861. monitoredConn := common.NewBurstMonitoredConn(
  862. dialConn,
  863. false,
  864. burstUpstreamTargetBytes, burstUpstreamDeadline,
  865. burstDownstreamTargetBytes, burstDownstreamDeadline)
  866. // Apply throttling (if configured). The underlying dialConn is always a
  867. // stream, even when the network conn uses UDP.
  868. throttledConn := common.NewThrottledConn(
  869. monitoredConn,
  870. true,
  871. rateLimits)
  872. // Add obfuscated SSH layer
  873. var sshConn net.Conn = throttledConn
  874. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  875. obfuscatedSSHConn, err := obfuscator.NewClientObfuscatedSSHConn(
  876. throttledConn,
  877. dialParams.ServerEntry.SshObfuscatedKey,
  878. dialParams.ObfuscatorPaddingSeed,
  879. dialParams.OSSHObfuscatorSeedTransformerParameters,
  880. dialParams.OSSHPrefixSpec,
  881. dialParams.OSSHPrefixSplitConfig,
  882. &obfuscatedSSHMinPadding,
  883. &obfuscatedSSHMaxPadding)
  884. if err != nil {
  885. return nil, errors.Trace(err)
  886. }
  887. sshConn = obfuscatedSSHConn
  888. dialParams.ObfuscatedSSHConnMetrics = obfuscatedSSHConn
  889. }
  890. // Now establish the SSH session over the conn transport
  891. expectedPublicKey, err := base64.StdEncoding.DecodeString(
  892. dialParams.ServerEntry.SshHostKey)
  893. if err != nil {
  894. return nil, errors.Trace(err)
  895. }
  896. sshCertChecker := &ssh.CertChecker{
  897. IsHostAuthority: func(auth ssh.PublicKey, address string) bool {
  898. // Psiphon servers do not currently use SSH certificates. This CertChecker
  899. // code path may still be hit if a client attempts to connect using an
  900. // obsolete server entry.
  901. return false
  902. },
  903. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  904. // The remote address input isn't checked. In the case of fronted
  905. // protocols, the immediate remote peer won't be the Psiphon
  906. // server. In direct cases, the client has just dialed the IP
  907. // address and expected public key both taken from the same
  908. // trusted, signed server entry.
  909. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  910. return errors.TraceNew("unexpected host public key")
  911. }
  912. return nil
  913. },
  914. }
  915. sshPasswordPayload := &protocol.SSHPasswordPayload{
  916. SessionId: config.SessionID,
  917. SshPassword: dialParams.ServerEntry.SshPassword,
  918. ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
  919. }
  920. payload, err := json.Marshal(sshPasswordPayload)
  921. if err != nil {
  922. return nil, errors.Trace(err)
  923. }
  924. sshClientConfig := &ssh.ClientConfig{
  925. User: dialParams.ServerEntry.SshUsername,
  926. Auth: []ssh.AuthMethod{
  927. ssh.Password(string(payload)),
  928. },
  929. HostKeyCallback: sshCertChecker.CheckHostKey,
  930. ClientVersion: dialParams.SSHClientVersion,
  931. }
  932. sshClientConfig.KEXPRNGSeed = dialParams.SSHKEXSeed
  933. if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
  934. if config.ObfuscatedSSHAlgorithms != nil {
  935. sshClientConfig.KeyExchanges = []string{config.ObfuscatedSSHAlgorithms[0]}
  936. sshClientConfig.Ciphers = []string{config.ObfuscatedSSHAlgorithms[1]}
  937. sshClientConfig.MACs = []string{config.ObfuscatedSSHAlgorithms[2]}
  938. sshClientConfig.HostKeyAlgorithms = []string{config.ObfuscatedSSHAlgorithms[3]}
  939. } else {
  940. // With Encrypt-then-MAC hash algorithms, packet length is
  941. // transmitted in plaintext, which aids in traffic analysis.
  942. //
  943. // TUNNEL_PROTOCOL_SSH is excepted since its KEX appears in plaintext,
  944. // and the protocol is intended to look like SSH on the wire.
  945. sshClientConfig.NoEncryptThenMACHash = true
  946. }
  947. } else {
  948. // For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize
  949. // its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation
  950. // between two randomized KEXes.
  951. if dialParams.ServerEntry.SshObfuscatedKey != "" {
  952. sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  953. dialParams.ServerEntry.SshObfuscatedKey)
  954. if err != nil {
  955. return nil, errors.Trace(err)
  956. }
  957. }
  958. }
  959. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  960. // in a timeout to ensure it won't hang. We've encountered firewalls
  961. // that allow the TCP handshake to complete but then send a RST to the
  962. // server-side and nothing to the client-side, and if that happens
  963. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  964. // closes the conn, which interrupts it.
  965. // Note: TCP handshake timeouts are provided by TCPConn, and session
  966. // timeouts *after* ssh establishment are provided by the ssh keep alive
  967. // in operate tunnel.
  968. type sshNewClientResult struct {
  969. sshClient *ssh.Client
  970. sshRequests <-chan *ssh.Request
  971. livenessTestMetrics *livenessTestMetrics
  972. err error
  973. }
  974. resultChannel := make(chan sshNewClientResult)
  975. // Call NewClientConn in a goroutine, as it blocks on SSH handshake network
  976. // operations, and would block canceling or shutdown. If the parent context
  977. // is canceled, close the net.Conn underlying SSH, which will interrupt the
  978. // SSH handshake that may be blocking NewClientConn.
  979. go func() {
  980. // The following is adapted from ssh.Dial(), here using a custom conn
  981. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  982. sshAddress := ""
  983. sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
  984. sshConn, sshAddress, sshClientConfig)
  985. var sshClient *ssh.Client
  986. var metrics *livenessTestMetrics
  987. if err == nil {
  988. // sshRequests is handled by operateTunnel.
  989. // ssh.NewClient also expects to handle the sshRequests
  990. // value from ssh.NewClientConn and will spawn a goroutine
  991. // to handle the <-chan *ssh.Request, so we must provide
  992. // a closed channel to ensure that goroutine halts instead
  993. // of hanging on a nil channel.
  994. noRequests := make(chan *ssh.Request)
  995. close(noRequests)
  996. sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)
  997. if livenessTestMaxUpstreamBytes > 0 || livenessTestMaxDownstreamBytes > 0 {
  998. // When configured, perform a liveness test which sends and
  999. // receives bytes through the tunnel to ensure the tunnel had
  1000. // not been blocked upon or shortly after connecting. This
  1001. // test is performed concurrently for each establishment
  1002. // candidate before selecting a successful tunnel.
  1003. //
  1004. // Note that the liveness test is subject to the
  1005. // TunnelConnectTimeout, which should be adjusted
  1006. // accordinging.
  1007. metrics, err = performLivenessTest(
  1008. sshClient,
  1009. livenessTestMinUpstreamBytes, livenessTestMaxUpstreamBytes,
  1010. livenessTestMinDownstreamBytes, livenessTestMaxDownstreamBytes,
  1011. dialParams.LivenessTestSeed)
  1012. // Skip notice when cancelling.
  1013. if baseCtx.Err() == nil {
  1014. NoticeLivenessTest(
  1015. dialParams.ServerEntry.GetDiagnosticID(), metrics, err == nil)
  1016. }
  1017. }
  1018. }
  1019. resultChannel <- sshNewClientResult{sshClient, sshRequests, metrics, err}
  1020. }()
  1021. var result sshNewClientResult
  1022. select {
  1023. case result = <-resultChannel:
  1024. case <-ctx.Done():
  1025. // Interrupt the goroutine and capture its error context to
  1026. // distinguish point of failure.
  1027. err := ctx.Err()
  1028. sshConn.Close()
  1029. result = <-resultChannel
  1030. if result.err != nil {
  1031. result.err = fmt.Errorf("%s: %s", err, result.err)
  1032. } else {
  1033. result.err = err
  1034. }
  1035. }
  1036. if result.err != nil {
  1037. failedTunnelLivenessTestMetrics = result.livenessTestMetrics
  1038. return nil, errors.Trace(result.err)
  1039. }
  1040. dialSucceeded = true
  1041. NoticeConnectedServer(dialParams)
  1042. cleanupConn = nil
  1043. // Invoke DNS cache extension (if enabled in the resolver) now that the
  1044. // tunnel is connected and the Psiphon server is authenticated. This
  1045. // demonstrates that any domain name resolved to an endpoint that is or
  1046. // is forwarded to the expected Psiphon server.
  1047. //
  1048. // Limitation: DNS cache extension is not implemented for Refraction
  1049. // Networking protocols. iOS VPN, the primary use case for DNS cache
  1050. // extension, does not enable Refraction Networking.
  1051. if protocol.TunnelProtocolUsesFrontedMeek(dialParams.TunnelProtocol) {
  1052. resolver := config.GetResolver()
  1053. if resolver != nil {
  1054. resolver.VerifyCacheExtension(dialParams.MeekFrontingDialAddress)
  1055. }
  1056. }
  1057. // When configured to do so, hold-off on activating this tunnel. This allows
  1058. // some extra time for slower but less resource intensive protocols to
  1059. // establish tunnels. By holding off post-connect, the client has this
  1060. // established tunnel ready to activate in case other protocols fail to
  1061. // establish. This hold-off phase continues to consume one connection worker.
  1062. //
  1063. // The network latency multiplier is not applied to HoldOffTunnelDuration,
  1064. // as the goal is to apply a consistent hold-off range across all tunnel
  1065. // candidates; and this avoids scaling up any delay users experience.
  1066. //
  1067. // The hold-off is applied regardless of whether this is the first tunnel
  1068. // in a session or a reconnection, even to a server affinity candidate,
  1069. // so that the advantage for other protocols persists.
  1070. if dialParams.HoldOffTunnelDuration > 0 {
  1071. NoticeHoldOffTunnel(dialParams.ServerEntry.GetDiagnosticID(), dialParams.HoldOffTunnelDuration)
  1072. common.SleepWithContext(ctx, dialParams.HoldOffTunnelDuration)
  1073. }
  1074. // Note: dialConn may be used to close the underlying network connection
  1075. // but should not be used to perform I/O as that would interfere with SSH
  1076. // (and also bypasses throttling).
  1077. return &dialResult{
  1078. dialConn: dialConn,
  1079. monitoringStartTime: monitoringStartTime,
  1080. monitoredConn: monitoredConn,
  1081. sshClient: result.sshClient,
  1082. sshRequests: result.sshRequests,
  1083. livenessTestMetrics: result.livenessTestMetrics,
  1084. extraFailureAction: extraFailureAction,
  1085. },
  1086. nil
  1087. }
  1088. func dialConjure(
  1089. ctx context.Context,
  1090. config *Config,
  1091. dialParams *DialParameters,
  1092. enableIPv6Dials bool,
  1093. enablePortRandomization bool,
  1094. enableRegistrationOverrides bool) (net.Conn, func(), error) {
  1095. // Specify a cache key with a scope that ensures that:
  1096. //
  1097. // (a) cached registrations aren't used across different networks, as a
  1098. // registration requires the client's public IP to match the value at time
  1099. // of registration;
  1100. //
  1101. // (b) cached registrations are associated with specific Psiphon server
  1102. // candidates, to ensure that replay will use the same phantom IP(s).
  1103. //
  1104. // This scheme allows for reuse of cached registrations on network A when a
  1105. // client roams from network A to network B and back to network A.
  1106. //
  1107. // Using the network ID as a proxy for client public IP address is a
  1108. // heurisitic: it's possible that a clients public IP address changes
  1109. // without the network ID changing, and it's not guaranteed that the client
  1110. // will be assigned the original public IP on network A; so there's some
  1111. // chance the registration cannot be reused.
  1112. diagnosticID := dialParams.ServerEntry.GetDiagnosticID()
  1113. cacheKey := dialParams.NetworkID + "-" + diagnosticID
  1114. conjureConfig := &refraction.ConjureConfig{
  1115. RegistrationCacheTTL: dialParams.ConjureCachedRegistrationTTL,
  1116. RegistrationCacheKey: cacheKey,
  1117. EnableIPv6Dials: enableIPv6Dials,
  1118. EnablePortRandomization: enablePortRandomization,
  1119. EnableRegistrationOverrides: enableRegistrationOverrides,
  1120. Transport: dialParams.ConjureTransport,
  1121. STUNServerAddress: dialParams.ConjureSTUNServerAddress,
  1122. DTLSEmptyInitialPacket: dialParams.ConjureDTLSEmptyInitialPacket,
  1123. DiagnosticID: diagnosticID,
  1124. Logger: NoticeCommonLogger(false),
  1125. }
  1126. if dialParams.ConjureAPIRegistration {
  1127. // Use MeekConn to domain front Conjure API registration.
  1128. //
  1129. // ConjureAPIRegistrarFrontingSpecs are applied via
  1130. // dialParams.GetMeekConfig, and will be subject to replay.
  1131. //
  1132. // Since DialMeek will create a TLS connection immediately, and a cached
  1133. // registration may be used, we will delay initializing the MeekConn-based
  1134. // RoundTripper until we know it's needed. This is implemented by passing
  1135. // in a RoundTripper that establishes a MeekConn when RoundTrip is called.
  1136. //
  1137. // In refraction.dial we configure 0 retries for API registration requests,
  1138. // assuming it's better to let another Psiphon candidate retry, with new
  1139. // domaing fronting parameters. As such, we expect only one round trip call
  1140. // per NewHTTPRoundTripper, so, in practise, there's no performance penalty
  1141. // from establishing a new MeekConn per round trip.
  1142. //
  1143. // Performing the full DialMeek/RoundTrip operation here allows us to call
  1144. // MeekConn.Close and ensure all resources are immediately cleaned up.
  1145. roundTrip := func(request *http.Request) (*http.Response, error) {
  1146. conn, err := DialMeek(
  1147. ctx, dialParams.GetMeekConfig(), dialParams.GetDialConfig())
  1148. if err != nil {
  1149. return nil, errors.Trace(err)
  1150. }
  1151. defer conn.Close()
  1152. response, err := conn.RoundTrip(request)
  1153. if err != nil {
  1154. return nil, errors.Trace(err)
  1155. }
  1156. // Read the response into a buffer and close the response
  1157. // body, ensuring that MeekConn.Close closes all idle connections.
  1158. //
  1159. // Alternatively, we could Clone the request to set
  1160. // http.Request.Close and avoid keeping any idle connection
  1161. // open after the response body is read by gotapdance. Since
  1162. // the response body is small and since gotapdance does not
  1163. // stream the response body, we're taking this approach which
  1164. // ensures cleanup.
  1165. body, err := ioutil.ReadAll(response.Body)
  1166. _ = response.Body.Close()
  1167. if err != nil {
  1168. return nil, errors.Trace(err)
  1169. }
  1170. response.Body = io.NopCloser(bytes.NewReader(body))
  1171. return response, nil
  1172. }
  1173. conjureConfig.APIRegistrarHTTPClient = &http.Client{
  1174. Transport: common.NewHTTPRoundTripper(roundTrip),
  1175. }
  1176. conjureConfig.APIRegistrarBidirectionalURL =
  1177. dialParams.ConjureAPIRegistrarBidirectionalURL
  1178. conjureConfig.APIRegistrarDelay = dialParams.ConjureAPIRegistrarDelay
  1179. } else if dialParams.ConjureDecoyRegistration {
  1180. // The Conjure "phantom" connection is compatible with fragmentation, but
  1181. // the decoy registrar connection, like Tapdance, is not, so force it off.
  1182. // Any tunnel fragmentation metrics will refer to the "phantom" connection
  1183. // only.
  1184. conjureConfig.DoDecoyRegistration = true
  1185. conjureConfig.DecoyRegistrarWidth = dialParams.ConjureDecoyRegistrarWidth
  1186. conjureConfig.DecoyRegistrarDelay = dialParams.ConjureDecoyRegistrarDelay
  1187. }
  1188. // Set extraFailureAction, which is invoked whenever the tunnel fails (i.e.,
  1189. // where RecordFailedTunnelStat is invoked). The action will remove any
  1190. // cached registration. When refraction.DialConjure succeeds, the underlying
  1191. // registration is cached. After refraction.DialConjure returns, it no
  1192. // longer modifies the cached state of that registration, assuming that it
  1193. // remains valid and effective. However adversarial impact on a given
  1194. // phantom IP may not become evident until after the initial TCP connection
  1195. // establishment and handshake performed by refraction.DialConjure. For
  1196. // example, it may be that the phantom dial is targeted for severe
  1197. // throttling which begins or is only evident later in the flow. Scheduling
  1198. // a call to DeleteCachedConjureRegistration allows us to invalidate the
  1199. // cached registration for a tunnel that fails later in its lifecycle.
  1200. //
  1201. // Note that extraFailureAction will retain a reference to conjureConfig for
  1202. // the lifetime of the tunnel.
  1203. extraFailureAction := func() {
  1204. refraction.DeleteCachedConjureRegistration(conjureConfig)
  1205. }
  1206. dialCtx := ctx
  1207. if protocol.ConjureTransportUsesDTLS(dialParams.ConjureTransport) {
  1208. // Conjure doesn't use the DTLS seed scheme, which supports in-proxy
  1209. // DTLS randomization. But every DTLS dial expects to find a seed
  1210. // state, so set the no-seed state.
  1211. dialCtx = inproxy_dtls.SetNoDTLSSeed(ctx)
  1212. }
  1213. dialConn, err := refraction.DialConjure(
  1214. dialCtx,
  1215. config.EmitRefractionNetworkingLogs,
  1216. config.GetPsiphonDataDirectory(),
  1217. NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
  1218. dialParams.DirectDialAddress,
  1219. conjureConfig)
  1220. if err != nil {
  1221. // When this function fails, invoke extraFailureAction directly; when it
  1222. // succeeds, return extraFailureAction to be called later.
  1223. extraFailureAction()
  1224. return nil, nil, errors.Trace(err)
  1225. }
  1226. return dialConn, extraFailureAction, nil
  1227. }
  1228. // makeInproxyTCPDialer returns a dialer which proxies TCP dials via an
  1229. // in-proxy proxy, as configured in dialParams.
  1230. //
  1231. // Limitation: MeekConn may redial TCP for a single tunnel connection, but
  1232. // that case is not supported by the in-proxy protocol, as the in-proxy proxy
  1233. // closes both its WebRTC DataChannel and the overall client connection when
  1234. // the upstream TCP connection closes. Any new connection from the client to
  1235. // the proxy must be a new tunnel connection with and accompanying
  1236. // broker/server relay. As a future enhancement, consider extending the
  1237. // in-proxy protocol to enable the client and proxy to establish additional
  1238. // WebRTC DataChannels and new upstream TCP connections within the scope of a
  1239. // single proxy/client connection.
  1240. func makeInproxyTCPDialer(
  1241. config *Config, dialParams *DialParameters) common.Dialer {
  1242. return func(ctx context.Context, _, _ string) (net.Conn, error) {
  1243. if dialParams.inproxyConn.Load() != nil {
  1244. return nil, errors.TraceNew("redial not supported")
  1245. }
  1246. var conn net.Conn
  1247. var err error
  1248. conn, err = dialInproxy(ctx, config, dialParams)
  1249. if err != nil {
  1250. return nil, errors.Trace(err)
  1251. }
  1252. // When the TCP fragmentor is configured for the 2nd hop protocol,
  1253. // approximate the behavior by applying the fragmentor to the WebRTC
  1254. // DataChannel writes, which will result in delays and DataChannel
  1255. // message sizes which will be reflected in the proxy's relay to its
  1256. // upstream TCP connection.
  1257. //
  1258. // This code is copied from DialTCP.
  1259. //
  1260. // Limitation: TCP BPF settings are not supported and currently
  1261. // disabled for all 2nd hop cases in
  1262. // protocol.TunnelProtocolMayUseClientBPF.
  1263. if dialParams.dialConfig.FragmentorConfig.MayFragment() {
  1264. conn = fragmentor.NewConn(
  1265. dialParams.dialConfig.FragmentorConfig,
  1266. func(message string) {
  1267. NoticeFragmentor(dialParams.dialConfig.DiagnosticID, message)
  1268. },
  1269. conn)
  1270. }
  1271. return conn, nil
  1272. }
  1273. }
  1274. // dialInproxy performs the in-proxy dial and returns the resulting conn for
  1275. // use as an underlying conn for the 2nd hop protocol. The in-proxy dial
  1276. // first connects to the broker (or reuses an existing connection) to match
  1277. // with a proxy; and then establishes connection to the proxy.
  1278. func dialInproxy(
  1279. ctx context.Context,
  1280. config *Config,
  1281. dialParams *DialParameters) (*inproxy.ClientConn, error) {
  1282. isProxy := false
  1283. webRTCDialInstance, err := NewInproxyWebRTCDialInstance(
  1284. config,
  1285. dialParams.NetworkID,
  1286. isProxy,
  1287. dialParams.inproxyNATStateManager,
  1288. dialParams.InproxySTUNDialParameters,
  1289. dialParams.InproxyWebRTCDialParameters)
  1290. if err != nil {
  1291. return nil, errors.Trace(err)
  1292. }
  1293. // dialAddress indicates to the broker and proxy how to dial the upstream
  1294. // Psiphon server, based on the 2nd hop tunnel protocol.
  1295. networkProtocol := inproxy.NetworkProtocolUDP
  1296. reliableTransport := false
  1297. if protocol.TunnelProtocolUsesTCP(dialParams.TunnelProtocol) {
  1298. networkProtocol = inproxy.NetworkProtocolTCP
  1299. reliableTransport = true
  1300. }
  1301. dialAddress := dialParams.DirectDialAddress
  1302. if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  1303. dialAddress = dialParams.MeekDialAddress
  1304. }
  1305. // Specify the value to be returned by inproxy.ClientConn.RemoteAddr.
  1306. // Currently, the one caller of RemoteAddr is utls, which uses the
  1307. // RemoteAddr as a TLS session cache key when there is no SNI.
  1308. // GetTLSSessionCacheKeyAddress returns a cache key value that is a valid
  1309. // address and that is also a more appropriate TLS session cache key than
  1310. // the proxy address.
  1311. remoteAddrOverride, err := dialParams.ServerEntry.GetTLSSessionCacheKeyAddress(
  1312. dialParams.TunnelProtocol)
  1313. if err != nil {
  1314. return nil, errors.Trace(err)
  1315. }
  1316. // Unlike the proxy broker case, clients already actively fetch tactics
  1317. // during tunnel estalishment, so tactics.SetTacticsAPIParameters are not
  1318. // sent to the broker and no tactics are returned by the broker.
  1319. //
  1320. // TODO: include broker fronting dial parameters to be logged by the
  1321. // broker -- as successful parameters might not otherwise by logged via
  1322. // server_tunnel if the subsequent WebRTC dials fail.
  1323. params := getBaseAPIParameters(
  1324. baseParametersNoDialParameters, true, config, nil)
  1325. // The debugLogging flag is passed to both NoticeCommonLogger and to the
  1326. // inproxy package as well; skipping debug logs in the inproxy package,
  1327. // before calling into the notice logger, avoids unnecessary allocations
  1328. // and formatting when debug logging is off.
  1329. debugLogging := config.InproxyEnableWebRTCDebugLogging
  1330. clientConfig := &inproxy.ClientConfig{
  1331. Logger: NoticeCommonLogger(debugLogging),
  1332. EnableWebRTCDebugLogging: debugLogging,
  1333. BaseAPIParameters: params,
  1334. BrokerClient: dialParams.inproxyBrokerClient,
  1335. WebRTCDialCoordinator: webRTCDialInstance,
  1336. ReliableTransport: reliableTransport,
  1337. DialNetworkProtocol: networkProtocol,
  1338. DialAddress: dialAddress,
  1339. RemoteAddrOverride: remoteAddrOverride,
  1340. PackedDestinationServerEntry: dialParams.inproxyPackedSignedServerEntry,
  1341. MustUpgrade: config.OnInproxyMustUpgrade,
  1342. }
  1343. conn, err := inproxy.DialClient(ctx, clientConfig)
  1344. if err != nil {
  1345. return nil, errors.Trace(err)
  1346. }
  1347. // The inproxy.ClientConn is stored in dialParams.inproxyConn in order to
  1348. // later fetch its connection ID and to facilitate broker/client replay.
  1349. dialParams.inproxyConn.Store(conn)
  1350. return conn, nil
  1351. }
  1352. // Fields are exported for JSON encoding in NoticeLivenessTest.
  1353. type livenessTestMetrics struct {
  1354. Duration string
  1355. UpstreamBytes int
  1356. SentUpstreamBytes int
  1357. DownstreamBytes int
  1358. ReceivedDownstreamBytes int
  1359. }
  1360. func performLivenessTest(
  1361. sshClient *ssh.Client,
  1362. minUpstreamBytes, maxUpstreamBytes int,
  1363. minDownstreamBytes, maxDownstreamBytes int,
  1364. livenessTestPRNGSeed *prng.Seed) (*livenessTestMetrics, error) {
  1365. metrics := new(livenessTestMetrics)
  1366. defer func(startTime time.Time) {
  1367. metrics.Duration = time.Since(startTime).String()
  1368. }(time.Now())
  1369. PRNG := prng.NewPRNGWithSeed(livenessTestPRNGSeed)
  1370. metrics.UpstreamBytes = PRNG.Range(minUpstreamBytes, maxUpstreamBytes)
  1371. metrics.DownstreamBytes = PRNG.Range(minDownstreamBytes, maxDownstreamBytes)
  1372. request := &protocol.RandomStreamRequest{
  1373. UpstreamBytes: metrics.UpstreamBytes,
  1374. DownstreamBytes: metrics.DownstreamBytes,
  1375. }
  1376. extraData, err := json.Marshal(request)
  1377. if err != nil {
  1378. return metrics, errors.Trace(err)
  1379. }
  1380. channel, requests, err := sshClient.OpenChannel(
  1381. protocol.RANDOM_STREAM_CHANNEL_TYPE, extraData)
  1382. if err != nil {
  1383. return metrics, errors.Trace(err)
  1384. }
  1385. defer channel.Close()
  1386. go ssh.DiscardRequests(requests)
  1387. sent := 0
  1388. received := 0
  1389. upstream := new(sync.WaitGroup)
  1390. var errUpstream, errDownstream error
  1391. if metrics.UpstreamBytes > 0 {
  1392. // Process streams concurrently to minimize elapsed time. This also
  1393. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  1394. upstream.Add(1)
  1395. go func() {
  1396. defer upstream.Done()
  1397. // In consideration of memory-constrained environments, use modest-sized copy
  1398. // buffers since many tunnel establishment workers may run the liveness test
  1399. // concurrently.
  1400. var buffer [4096]byte
  1401. n, err := common.CopyNBuffer(channel, rand.Reader, int64(metrics.UpstreamBytes), buffer[:])
  1402. sent = int(n)
  1403. if err != nil {
  1404. errUpstream = errors.Trace(err)
  1405. }
  1406. }()
  1407. }
  1408. if metrics.DownstreamBytes > 0 {
  1409. var buffer [4096]byte
  1410. n, err := common.CopyNBuffer(ioutil.Discard, channel, int64(metrics.DownstreamBytes), buffer[:])
  1411. received = int(n)
  1412. if err != nil {
  1413. errDownstream = errors.Trace(err)
  1414. }
  1415. }
  1416. upstream.Wait()
  1417. metrics.SentUpstreamBytes = sent
  1418. metrics.ReceivedDownstreamBytes = received
  1419. if errUpstream != nil {
  1420. return metrics, errUpstream
  1421. } else if errDownstream != nil {
  1422. return metrics, errDownstream
  1423. }
  1424. return metrics, nil
  1425. }
  1426. // operateTunnel monitors the health of the tunnel and performs
  1427. // periodic work.
  1428. //
  1429. // BytesTransferred and TotalBytesTransferred notices are emitted
  1430. // for live reporting and diagnostics reporting, respectively.
  1431. //
  1432. // Status requests are sent to the Psiphon API to report bytes
  1433. // transferred.
  1434. //
  1435. // Periodic SSH keep alive packets are sent to ensure the underlying
  1436. // TCP connection isn't terminated by NAT, or other network
  1437. // interference -- or test if it has been terminated while the device
  1438. // has been asleep. When a keep alive times out, the tunnel is
  1439. // considered failed.
  1440. //
  1441. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  1442. // server responsiveness when a port forward failure is detected: a
  1443. // failed dial or failed read/write. This keep alive has a shorter
  1444. // timeout.
  1445. //
  1446. // Note that port forward failures may be due to non-failure conditions.
  1447. // For example, when the user inputs an invalid domain name and
  1448. // resolution is done by the ssh server; or trying to connect to a
  1449. // non-white-listed port; and the error message in these cases is not
  1450. // distinguishable from a a true server error (a common error message,
  1451. // "ssh: rejected: administratively prohibited (open failed)", may be
  1452. // returned for these cases but also if the server has run out of
  1453. // ephemeral ports, for example).
  1454. //
  1455. // SSH keep alives are not sent when the tunnel has been recently
  1456. // active (not only does tunnel activity obviate the necessity of a keep
  1457. // alive, testing has shown that keep alives may time out for "busy"
  1458. // tunnels, especially over meek protocol and other high latency
  1459. // conditions).
  1460. //
  1461. // "Recently active" is defined has having received payload bytes. Sent
  1462. // bytes are not considered as testing has shown bytes may appear to
  1463. // send when certain NAT devices have interfered with the tunnel, while
  1464. // no bytes are received. In a pathological case, with DNS implemented
  1465. // as tunneled UDP, a browser may wait excessively for a domain name to
  1466. // resolve, while no new port forward is attempted which would otherwise
  1467. // result in a tunnel failure detection.
  1468. //
  1469. // TODO: change "recently active" to include having received any
  1470. // SSH protocol messages from the server, not just user payload?
  1471. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  1472. defer tunnel.operateWaitGroup.Done()
  1473. now := time.Now()
  1474. lastBytesReceivedTime := now
  1475. lastTotalBytesTransferedTime := now
  1476. totalSent := int64(0)
  1477. totalReceived := int64(0)
  1478. setDialParamsSucceeded := false
  1479. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  1480. defer noticeBytesTransferredTicker.Stop()
  1481. // The next status request and ssh keep alive times are picked at random,
  1482. // from a range, to make the resulting traffic less fingerprintable,
  1483. // Note: not using Tickers since these are not fixed time periods.
  1484. nextStatusRequestPeriod := func() time.Duration {
  1485. p := tunnel.getCustomParameters()
  1486. return prng.Period(
  1487. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMin),
  1488. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMax))
  1489. }
  1490. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  1491. defer statsTimer.Stop()
  1492. // Schedule an almost-immediate status request to deliver any unreported
  1493. // persistent stats.
  1494. unreported := CountUnreportedPersistentStats()
  1495. if unreported > 0 {
  1496. NoticeInfo("Unreported persistent stats: %d", unreported)
  1497. p := tunnel.getCustomParameters()
  1498. statsTimer.Reset(
  1499. prng.Period(
  1500. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMin),
  1501. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMax)))
  1502. }
  1503. nextSshKeepAlivePeriod := func() time.Duration {
  1504. p := tunnel.getCustomParameters()
  1505. return prng.Period(
  1506. p.Duration(parameters.SSHKeepAlivePeriodMin),
  1507. p.Duration(parameters.SSHKeepAlivePeriodMax))
  1508. }
  1509. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  1510. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  1511. if tunnel.config.DisablePeriodicSshKeepAlive {
  1512. sshKeepAliveTimer.Stop()
  1513. } else {
  1514. defer sshKeepAliveTimer.Stop()
  1515. }
  1516. // Perform network requests in separate goroutines so as not to block
  1517. // other operations.
  1518. requestsWaitGroup := new(sync.WaitGroup)
  1519. requestsWaitGroup.Add(1)
  1520. signalStatusRequest := make(chan struct{})
  1521. go func() {
  1522. defer requestsWaitGroup.Done()
  1523. for range signalStatusRequest {
  1524. sendStats(tunnel)
  1525. }
  1526. }()
  1527. requestsWaitGroup.Add(1)
  1528. signalPeriodicSshKeepAlive := make(chan time.Duration)
  1529. sshKeepAliveError := make(chan error, 1)
  1530. go func() {
  1531. defer requestsWaitGroup.Done()
  1532. isFirstPeriodicKeepAlive := true
  1533. for timeout := range signalPeriodicSshKeepAlive {
  1534. bytesUp := atomic.LoadInt64(&totalSent)
  1535. bytesDown := atomic.LoadInt64(&totalReceived)
  1536. err := tunnel.sendSshKeepAlive(
  1537. isFirstPeriodicKeepAlive, false, timeout, bytesUp, bytesDown)
  1538. if err != nil {
  1539. select {
  1540. case sshKeepAliveError <- err:
  1541. default:
  1542. }
  1543. }
  1544. isFirstPeriodicKeepAlive = false
  1545. }
  1546. }()
  1547. // Probe-type SSH keep alives have a distinct send worker and may be sent
  1548. // concurrently, to ensure a long period keep alive timeout doesn't delay
  1549. // failed tunnel detection.
  1550. requestsWaitGroup.Add(1)
  1551. signalProbeSshKeepAlive := make(chan time.Duration)
  1552. go func() {
  1553. defer requestsWaitGroup.Done()
  1554. for timeout := range signalProbeSshKeepAlive {
  1555. bytesUp := atomic.LoadInt64(&totalSent)
  1556. bytesDown := atomic.LoadInt64(&totalReceived)
  1557. err := tunnel.sendSshKeepAlive(
  1558. false, true, timeout, bytesUp, bytesDown)
  1559. if err != nil {
  1560. select {
  1561. case sshKeepAliveError <- err:
  1562. default:
  1563. }
  1564. }
  1565. }
  1566. }()
  1567. shutdown := false
  1568. var err error
  1569. for !shutdown && err == nil {
  1570. select {
  1571. case <-noticeBytesTransferredTicker.C:
  1572. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1573. tunnel.dialParams.ServerEntry.IpAddress)
  1574. if received > 0 {
  1575. lastBytesReceivedTime = time.Now()
  1576. }
  1577. bytesUp := atomic.AddInt64(&totalSent, sent)
  1578. bytesDown := atomic.AddInt64(&totalReceived, received)
  1579. p := tunnel.getCustomParameters()
  1580. noticePeriod := p.Duration(parameters.TotalBytesTransferredNoticePeriod)
  1581. doEmitMemoryMetrics := p.Bool(parameters.TotalBytesTransferredEmitMemoryMetrics)
  1582. replayTargetUpstreamBytes := p.Int(parameters.ReplayTargetUpstreamBytes)
  1583. replayTargetDownstreamBytes := p.Int(parameters.ReplayTargetDownstreamBytes)
  1584. replayTargetTunnelDuration := p.Duration(parameters.ReplayTargetTunnelDuration)
  1585. if lastTotalBytesTransferedTime.Add(noticePeriod).Before(time.Now()) {
  1586. NoticeTotalBytesTransferred(
  1587. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1588. if doEmitMemoryMetrics {
  1589. emitMemoryMetrics()
  1590. }
  1591. lastTotalBytesTransferedTime = time.Now()
  1592. }
  1593. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  1594. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  1595. NoticeBytesTransferred(
  1596. tunnel.dialParams.ServerEntry.GetDiagnosticID(), sent, received)
  1597. }
  1598. // Once the tunnel has connected, activated, successfully transmitted the
  1599. // targeted number of bytes, and been up for the targeted duration
  1600. // (measured from the end of establishment), store its dial parameters for
  1601. // subsequent replay.
  1602. //
  1603. // Even when target bytes and duration are all 0, the tunnel must remain up
  1604. // for at least 1 second due to use of noticeBytesTransferredTicker; for
  1605. // the same reason the granularity of ReplayTargetTunnelDuration is
  1606. // seconds.
  1607. if !setDialParamsSucceeded &&
  1608. bytesUp >= int64(replayTargetUpstreamBytes) &&
  1609. bytesDown >= int64(replayTargetDownstreamBytes) &&
  1610. time.Since(tunnel.establishedTime) >= replayTargetTunnelDuration {
  1611. tunnel.dialParams.Succeeded()
  1612. setDialParamsSucceeded = true
  1613. }
  1614. case <-statsTimer.C:
  1615. select {
  1616. case signalStatusRequest <- struct{}{}:
  1617. default:
  1618. }
  1619. statsTimer.Reset(nextStatusRequestPeriod())
  1620. case <-sshKeepAliveTimer.C:
  1621. p := tunnel.getCustomParameters()
  1622. inactivePeriod := p.Duration(parameters.SSHKeepAlivePeriodicInactivePeriod)
  1623. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1624. timeout := p.Duration(parameters.SSHKeepAlivePeriodicTimeout)
  1625. select {
  1626. case signalPeriodicSshKeepAlive <- timeout:
  1627. default:
  1628. }
  1629. }
  1630. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1631. case <-tunnel.signalPortForwardFailure:
  1632. // Note: no mutex on portForwardFailureTotal; only referenced here
  1633. tunnel.totalPortForwardFailures++
  1634. NoticeInfo("port forward failures for %s: %d",
  1635. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  1636. tunnel.totalPortForwardFailures)
  1637. // If the underlying Conn has closed (meek and other plugin protocols may
  1638. // close themselves in certain error conditions), the tunnel has certainly
  1639. // failed. Otherwise, probe with an SSH keep alive.
  1640. //
  1641. // TODO: the IsClosed case omits the failed tunnel logging and reset
  1642. // actions performed by sendSshKeepAlive. Should self-closing protocols
  1643. // perform these actions themselves?
  1644. if tunnel.conn.IsClosed() {
  1645. err = errors.TraceNew("underlying conn is closed")
  1646. } else {
  1647. p := tunnel.getCustomParameters()
  1648. inactivePeriod := p.Duration(parameters.SSHKeepAliveProbeInactivePeriod)
  1649. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1650. timeout := p.Duration(parameters.SSHKeepAliveProbeTimeout)
  1651. select {
  1652. case signalProbeSshKeepAlive <- timeout:
  1653. default:
  1654. }
  1655. }
  1656. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1657. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1658. }
  1659. }
  1660. case err = <-sshKeepAliveError:
  1661. case serverRequest := <-tunnel.sshServerRequests:
  1662. if serverRequest != nil {
  1663. HandleServerRequest(tunnelOwner, tunnel, serverRequest)
  1664. }
  1665. case <-tunnel.operateCtx.Done():
  1666. shutdown = true
  1667. }
  1668. }
  1669. close(signalPeriodicSshKeepAlive)
  1670. close(signalProbeSshKeepAlive)
  1671. close(signalStatusRequest)
  1672. requestsWaitGroup.Wait()
  1673. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1674. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1675. tunnel.dialParams.ServerEntry.IpAddress)
  1676. bytesUp := atomic.AddInt64(&totalSent, sent)
  1677. bytesDown := atomic.AddInt64(&totalReceived, received)
  1678. // Always emit a final NoticeTotalBytesTransferred
  1679. NoticeTotalBytesTransferred(
  1680. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1681. if err == nil {
  1682. NoticeInfo("shutdown operate tunnel")
  1683. // This commanded shutdown case is initiated by Tunnel.Close, which will
  1684. // wait up to parameters.TunnelOperateShutdownTimeout to allow the following
  1685. // requests to complete.
  1686. // Send a final status request in order to report any outstanding persistent
  1687. // stats and domain bytes transferred as soon as possible.
  1688. sendStats(tunnel)
  1689. // The controller connectedReporter may have initiated a connected request
  1690. // concurrent to this commanded shutdown. SetInFlightConnectedRequest
  1691. // ensures that a connected request doesn't start after the commanded
  1692. // shutdown. AwaitInFlightConnectedRequest blocks until any in flight
  1693. // request completes or is aborted after TunnelOperateShutdownTimeout.
  1694. //
  1695. // As any connected request is performed by a concurrent goroutine,
  1696. // sendStats is called first and AwaitInFlightConnectedRequest second.
  1697. tunnel.AwaitInFlightConnectedRequest()
  1698. } else {
  1699. NoticeWarning("operate tunnel error for %s: %s",
  1700. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1701. tunnelOwner.SignalTunnelFailure(tunnel)
  1702. }
  1703. }
  1704. // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
  1705. // on the specified SSH connections and returns true of the request succeeds
  1706. // within a specified timeout. If the request fails, the associated conn is
  1707. // closed, which will terminate the associated tunnel.
  1708. func (tunnel *Tunnel) sendSshKeepAlive(
  1709. isFirstPeriodicKeepAlive bool,
  1710. isProbeKeepAlive bool,
  1711. timeout time.Duration,
  1712. bytesUp int64,
  1713. bytesDown int64) error {
  1714. p := tunnel.getCustomParameters()
  1715. // Random padding to frustrate fingerprinting.
  1716. request := prng.Padding(
  1717. p.Int(parameters.SSHKeepAlivePaddingMinBytes),
  1718. p.Int(parameters.SSHKeepAlivePaddingMaxBytes))
  1719. speedTestSample := isFirstPeriodicKeepAlive
  1720. if !speedTestSample {
  1721. speedTestSample = p.WeightedCoinFlip(
  1722. parameters.SSHKeepAliveSpeedTestSampleProbability)
  1723. }
  1724. networkConnectivityPollPeriod := p.Duration(
  1725. parameters.SSHKeepAliveNetworkConnectivityPollingPeriod)
  1726. resetOnFailure := p.WeightedCoinFlip(
  1727. parameters.SSHKeepAliveResetOnFailureProbability)
  1728. p.Close()
  1729. // Note: there is no request context since SSH requests cannot be interrupted
  1730. // directly. Closing the tunnel will interrupt the request. A timeout is set
  1731. // to unblock this function, but the goroutine may not exit until the tunnel
  1732. // is closed.
  1733. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  1734. errChannel := make(chan error, 1)
  1735. afterFunc := time.AfterFunc(timeout, func() {
  1736. errChannel <- errors.TraceNew("timed out")
  1737. })
  1738. defer afterFunc.Stop()
  1739. go func() {
  1740. startTime := time.Now()
  1741. // Note: reading a reply is important for last-received-time tunnel
  1742. // duration calculation.
  1743. requestOk, response, err := tunnel.sshClient.SendRequest(
  1744. "keepalive@openssh.com", true, request)
  1745. elapsedTime := time.Since(startTime)
  1746. errChannel <- err
  1747. success := (err == nil && requestOk)
  1748. if success && isProbeKeepAlive {
  1749. NoticeInfo("Probe SSH keep-alive RTT: %s", elapsedTime)
  1750. }
  1751. // Record the keep alive round trip as a speed test sample. The first
  1752. // periodic keep alive is always recorded, as many tunnels are short-lived
  1753. // and we want to ensure that some data is gathered. Subsequent keep alives
  1754. // are recorded with some configurable probability, which, considering that
  1755. // only the last SpeedTestMaxSampleCount samples are retained, enables
  1756. // tuning the sampling frequency.
  1757. if success && speedTestSample {
  1758. err = tactics.AddSpeedTestSample(
  1759. tunnel.config.GetParameters(),
  1760. GetTacticsStorer(tunnel.config),
  1761. tunnel.config.GetNetworkID(),
  1762. tunnel.dialParams.ServerEntry.Region,
  1763. tunnel.dialParams.TunnelProtocol,
  1764. elapsedTime,
  1765. request,
  1766. response)
  1767. if err != nil {
  1768. NoticeWarning("AddSpeedTestSample failed: %s", errors.Trace(err))
  1769. }
  1770. }
  1771. }()
  1772. // While awaiting the response, poll the network connectivity state. If there
  1773. // is network connectivity, on the same network, for the entire duration of
  1774. // the keep alive request and the request fails, record a failed tunnel
  1775. // event.
  1776. //
  1777. // The network connectivity heuristic is intended to reduce the number of
  1778. // failed tunnels reported due to routine situations such as varying mobile
  1779. // network conditions. The polling may produce false positives if the network
  1780. // goes down and up between polling periods, or changes to a new network and
  1781. // back to the previous network between polling periods.
  1782. //
  1783. // For platforms that don't provide a NetworkConnectivityChecker, it is
  1784. // assumed that there is network connectivity.
  1785. //
  1786. // The approximate number of tunneled bytes successfully sent and received is
  1787. // recorded in the failed tunnel event as a quality indicator.
  1788. ticker := time.NewTicker(networkConnectivityPollPeriod)
  1789. defer ticker.Stop()
  1790. continuousNetworkConnectivity := true
  1791. networkID := tunnel.config.GetNetworkID()
  1792. var err error
  1793. loop:
  1794. for {
  1795. select {
  1796. case err = <-errChannel:
  1797. break loop
  1798. case <-ticker.C:
  1799. connectivityChecker := tunnel.config.NetworkConnectivityChecker
  1800. if (connectivityChecker != nil &&
  1801. connectivityChecker.HasNetworkConnectivity() != 1) ||
  1802. (networkID != tunnel.config.GetNetworkID()) {
  1803. continuousNetworkConnectivity = false
  1804. }
  1805. }
  1806. }
  1807. err = errors.Trace(err)
  1808. if err != nil {
  1809. tunnel.sshClient.Close()
  1810. tunnel.conn.Close()
  1811. // Don't perform log or reset actions when the keep alive may have been
  1812. // interrupted due to shutdown.
  1813. isShutdown := false
  1814. select {
  1815. case <-tunnel.operateCtx.Done():
  1816. isShutdown = true
  1817. default:
  1818. }
  1819. // Ensure that at most one of the two SSH keep alive workers (periodic and
  1820. // probe) perform the log and reset actions.
  1821. wasHandled := atomic.CompareAndSwapInt32(&tunnel.handledSSHKeepAliveFailure, 0, 1)
  1822. if continuousNetworkConnectivity &&
  1823. !isShutdown &&
  1824. !wasHandled {
  1825. _ = RecordFailedTunnelStat(
  1826. tunnel.config,
  1827. tunnel.dialParams,
  1828. tunnel.livenessTestMetrics,
  1829. bytesUp,
  1830. bytesDown,
  1831. err)
  1832. if tunnel.extraFailureAction != nil {
  1833. tunnel.extraFailureAction()
  1834. }
  1835. // SSHKeepAliveResetOnFailureProbability is set when a late-lifecycle
  1836. // impaired protocol attack is suspected. With the given probability, reset
  1837. // server affinity and replay parameters for this server to avoid
  1838. // continuously reconnecting to the server and/or using the same protocol
  1839. // and dial parameters.
  1840. if resetOnFailure {
  1841. NoticeInfo("Delete dial parameters for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1842. err := DeleteDialParameters(tunnel.dialParams.ServerEntry.IpAddress, tunnel.dialParams.NetworkID)
  1843. if err != nil {
  1844. NoticeWarning("DeleteDialParameters failed: %s", err)
  1845. }
  1846. NoticeInfo("Delete server affinity for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1847. err = DeleteServerEntryAffinity(tunnel.dialParams.ServerEntry.IpAddress)
  1848. if err != nil {
  1849. NoticeWarning("DeleteServerEntryAffinity failed: %s", err)
  1850. }
  1851. }
  1852. }
  1853. }
  1854. return err
  1855. }
  1856. // sendStats is a helper for sending session stats to the server.
  1857. func sendStats(tunnel *Tunnel) bool {
  1858. // Tunnel does not have a serverContext when DisableApi is set
  1859. if tunnel.serverContext == nil {
  1860. return true
  1861. }
  1862. // Skip when tunnel is discarded
  1863. if tunnel.IsDiscarded() {
  1864. return true
  1865. }
  1866. err := tunnel.serverContext.DoStatusRequest(tunnel)
  1867. if err != nil {
  1868. NoticeWarning("DoStatusRequest failed for %s: %s",
  1869. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1870. }
  1871. return err == nil
  1872. }