tunnel.go 76 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "encoding/base64"
  25. "encoding/json"
  26. std_errors "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "math"
  31. "net"
  32. "net/http"
  33. "slices"
  34. "strconv"
  35. "sync"
  36. "sync/atomic"
  37. "time"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  43. inproxy_dtls "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy/dtls"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  51. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/wildcard"
  52. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  53. "github.com/fxamacker/cbor/v2"
  54. )
  55. // Tunneler specifies the interface required by components that use tunnels.
  56. type Tunneler interface {
  57. // Dial creates a tunneled connection.
  58. //
  59. // When split tunnel mode is enabled, the connection may be untunneled,
  60. // depending on GeoIP classification of the destination.
  61. //
  62. // downstreamConn is an optional parameter which specifies a connection to be
  63. // explicitly closed when the Dialed connection is closed. For instance, this
  64. // is used to close downstreamConn App<->LocalProxy connections when the related
  65. // LocalProxy<->SshPortForward connections close.
  66. Dial(remoteAddr string, downstreamConn net.Conn) (conn net.Conn, err error)
  67. DirectDial(remoteAddr string) (conn net.Conn, err error)
  68. SignalComponentFailure()
  69. }
  70. // TunnelOwner specifies the interface required by Tunnel to notify its
  71. // owner when it has failed. The owner may, as in the case of the Controller,
  72. // remove the tunnel from its list of active tunnels.
  73. type TunnelOwner interface {
  74. SignalSeededNewSLOK()
  75. SignalTunnelFailure(tunnel *Tunnel)
  76. }
// Tunnel is a connection to a Psiphon server. An established
// tunnel includes a network connection to the specified server
// and an SSH session built on top of that transport.
//
// A Tunnel is returned, connected but not yet activated, by ConnectTunnel;
// Activate completes its establishment.
type Tunnel struct {
	// mutex guards the lifecycle flags (isActivated, isStatusReporter,
	// isDiscarded, isClosed) and inFlightConnectedRequestSignal, and is held
	// while Activate records the rest of its activation state.
	mutex *sync.Mutex
	config *Config
	// isActivated is set by Activate once activation succeeds.
	isActivated bool
	// isStatusReporter records the value passed to Activate.
	isStatusReporter bool
	// isDiscarded records the value most recently passed to Close; when set,
	// operateTunnel does not attempt to send final status requests.
	isDiscarded bool
	// isClosed is set by the first call to Close.
	isClosed bool
	dialParams *DialParameters
	// livenessTestMetrics comes from the dial result and is included in the
	// failed tunnel stat recorded when activation fails.
	livenessTestMetrics *livenessTestMetrics
	// extraFailureAction, when non-nil, is run when activation fails.
	extraFailureAction func()
	// serverContext is the Psiphon API server context created by Activate;
	// it remains nil when config.DisableApi is set.
	serverContext *ServerContext
	// monitoringStartTime is passed to conn.GetMetrics when burst metrics
	// are logged by Close.
	monitoringStartTime time.Time
	// conn is the monitored network connection that carries the SSH session.
	conn *common.BurstMonitoredConn
	sshClient *ssh.Client
	// sshServerRequests delivers requests sent by the server over the SSH
	// connection.
	sshServerRequests <-chan *ssh.Request
	// operateWaitGroup, operateCtx and stopOperate are created by Activate
	// for the operateTunnel worker; Close calls stopOperate and then waits
	// on operateWaitGroup.
	operateWaitGroup *sync.WaitGroup
	operateCtx context.Context
	stopOperate context.CancelFunc
	// signalPortForwardFailure has a buffer of 1; dialChannel sends to it,
	// without blocking, when a port forward dial fails.
	signalPortForwardFailure chan struct{}
	// NOTE(review): not referenced in this view; presumably a running count
	// maintained by operateTunnel -- confirm.
	totalPortForwardFailures int
	// adjustedEstablishStartTime is the start time passed to ConnectTunnel;
	// Activate computes establishDuration from it.
	adjustedEstablishStartTime time.Time
	// establishDuration is the time from adjustedEstablishStartTime until
	// activation completed.
	establishDuration time.Duration
	// establishedTime is the time at which Activate completed.
	establishedTime time.Time
	// NOTE(review): not referenced in this view; an int32 flag that is
	// presumably accessed with sync/atomic -- confirm.
	handledSSHKeepAliveFailure int32
	// inFlightConnectedRequestSignal is the completion signal of the current
	// connected request, or nil; see SetInFlightConnectedRequest.
	inFlightConnectedRequestSignal chan struct{}
}
  106. // getCustomParameters helpers wrap the verbose function call chain required
  107. // to get a current snapshot of the parameters.Parameters customized with the
  108. // dial parameters associated with a tunnel.
  109. func (tunnel *Tunnel) getCustomParameters() parameters.ParametersAccessor {
  110. return getCustomParameters(tunnel.config, tunnel.dialParams)
  111. }
  112. func getCustomParameters(
  113. config *Config, dialParams *DialParameters) parameters.ParametersAccessor {
  114. return config.GetParameters().GetCustom(dialParams.NetworkLatencyMultiplier)
  115. }
  116. // ConnectTunnel first makes a network transport connection to the
  117. // Psiphon server and then establishes an SSH client session on top of
  118. // that transport. The SSH server is authenticated using the public
  119. // key in the server entry.
  120. // Depending on the server's capabilities, the connection may use
  121. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  122. // HTTP (meek protocol).
  123. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  124. // the a random supported protocol is used.
  125. //
  126. // Call Activate on a connected tunnel to complete its establishment
  127. // before using.
  128. //
  129. // Tunnel establishment is split into two phases: connection, and
  130. // activation. The Controller will run many ConnectTunnel calls
  131. // concurrently and then, to avoid unnecessary overhead from making
  132. // handshake requests and starting operateTunnel from tunnels which
  133. // may be discarded, call Activate on connected tunnels sequentially
  134. // as necessary.
  135. func ConnectTunnel(
  136. ctx context.Context,
  137. config *Config,
  138. adjustedEstablishStartTime time.Time,
  139. dialParams *DialParameters) (*Tunnel, error) {
  140. // Build transport layers and establish SSH connection. Note that
  141. // dialConn and monitoredConn are the same network connection.
  142. dialResult, err := dialTunnel(ctx, config, dialParams)
  143. if err != nil {
  144. return nil, errors.Trace(err)
  145. }
  146. // The tunnel is now connected
  147. return &Tunnel{
  148. mutex: new(sync.Mutex),
  149. config: config,
  150. dialParams: dialParams,
  151. livenessTestMetrics: dialResult.livenessTestMetrics,
  152. extraFailureAction: dialResult.extraFailureAction,
  153. monitoringStartTime: dialResult.monitoringStartTime,
  154. conn: dialResult.monitoredConn,
  155. sshClient: dialResult.sshClient,
  156. sshServerRequests: dialResult.sshRequests,
  157. // A buffer allows at least one signal to be sent even when the receiver is
  158. // not listening. Senders should not block.
  159. signalPortForwardFailure: make(chan struct{}, 1),
  160. adjustedEstablishStartTime: adjustedEstablishStartTime,
  161. }, nil
  162. }
  163. // Activate completes the tunnel establishment, performing the handshake
  164. // request and starting operateTunnel, the worker that monitors the tunnel
  165. // and handles periodic management.
  166. func (tunnel *Tunnel) Activate(
  167. ctx context.Context,
  168. tunnelOwner TunnelOwner,
  169. isStatusReporter bool) (retErr error) {
  170. // Ensure that, unless the base context is cancelled, any replayed dial
  171. // parameters are cleared, no longer to be retried, if the tunnel fails to
  172. // activate.
  173. activationSucceeded := false
  174. baseCtx := ctx
  175. defer func() {
  176. if !activationSucceeded && baseCtx.Err() != context.Canceled {
  177. tunnel.dialParams.Failed(tunnel.config, retErr)
  178. if tunnel.extraFailureAction != nil {
  179. tunnel.extraFailureAction()
  180. }
  181. if retErr != nil {
  182. _ = RecordFailedTunnelStat(
  183. tunnel.config,
  184. tunnel.dialParams,
  185. tunnel.livenessTestMetrics,
  186. -1,
  187. -1,
  188. retErr)
  189. }
  190. }
  191. }()
  192. // Create a new Psiphon API server context for this tunnel. This includes
  193. // performing a handshake request. If the handshake fails, this activation
  194. // fails.
  195. var serverContext *ServerContext
  196. if !tunnel.config.DisableApi {
  197. NoticeInfo(
  198. "starting server context for %s",
  199. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  200. // Call NewServerContext in a goroutine, as it blocks on a network operation,
  201. // the handshake request, and would block shutdown. If the shutdown signal is
  202. // received, close the tunnel, which will interrupt the handshake request
  203. // that may be blocking NewServerContext.
  204. //
  205. // Timeout after PsiphonApiServerTimeoutSeconds. NewServerContext may not
  206. // return if the tunnel network connection is unstable during the handshake
  207. // request. At this point, there is no operateTunnel monitor that will detect
  208. // this condition with SSH keep alives.
  209. doInproxy := protocol.TunnelProtocolUsesInproxy(tunnel.dialParams.TunnelProtocol)
  210. var timeoutParameter string
  211. if doInproxy {
  212. // Optionally allow more time in case the broker/server relay
  213. // requires additional round trips to establish a new session.
  214. timeoutParameter = parameters.InproxyPsiphonAPIRequestTimeout
  215. } else {
  216. timeoutParameter = parameters.PsiphonAPIRequestTimeout
  217. }
  218. timeout := tunnel.getCustomParameters().Duration(timeoutParameter)
  219. var handshakeCtx context.Context
  220. var cancelFunc context.CancelFunc
  221. if timeout > 0 {
  222. handshakeCtx, cancelFunc = context.WithTimeout(ctx, timeout)
  223. } else {
  224. handshakeCtx, cancelFunc = context.WithCancel(ctx)
  225. }
  226. type newServerContextResult struct {
  227. serverContext *ServerContext
  228. err error
  229. }
  230. resultChannel := make(chan newServerContextResult)
  231. wg := new(sync.WaitGroup)
  232. if doInproxy {
  233. // Launch a handler to handle broker/server relay SSH requests,
  234. // which will occur when the broker needs to establish a new
  235. // session with the server.
  236. wg.Add(1)
  237. go func() {
  238. defer wg.Done()
  239. notice := true
  240. for {
  241. select {
  242. case serverRequest := <-tunnel.sshServerRequests:
  243. if serverRequest != nil {
  244. if serverRequest.Type == protocol.PSIPHON_API_INPROXY_RELAY_REQUEST_NAME {
  245. if notice {
  246. NoticeInfo(
  247. "relaying inproxy broker packets for %s",
  248. tunnel.dialParams.ServerEntry.GetDiagnosticID())
  249. notice = false
  250. }
  251. err := tunnel.relayInproxyPacketRoundTrip(handshakeCtx, serverRequest)
  252. if err != nil {
  253. NoticeWarning(
  254. "relay inproxy broker packets failed: %v",
  255. errors.Trace(err))
  256. // Continue
  257. }
  258. } else {
  259. // There's a potential race condition in which
  260. // post-handshake SSH requests, such as OSL or
  261. // alert requests, arrive to this handler instead
  262. // of operateTunnel, so invoke HandleServerRequest here.
  263. HandleServerRequest(tunnelOwner, tunnel, serverRequest)
  264. }
  265. }
  266. case <-handshakeCtx.Done():
  267. return
  268. }
  269. }
  270. }()
  271. }
  272. wg.Add(1)
  273. go func() {
  274. defer wg.Done()
  275. serverContext, err := NewServerContext(tunnel)
  276. resultChannel <- newServerContextResult{
  277. serverContext: serverContext,
  278. err: err,
  279. }
  280. }()
  281. var result newServerContextResult
  282. select {
  283. case result = <-resultChannel:
  284. case <-handshakeCtx.Done():
  285. result.err = handshakeCtx.Err()
  286. // Interrupt the goroutine
  287. tunnel.Close(true)
  288. <-resultChannel
  289. }
  290. cancelFunc()
  291. wg.Wait()
  292. if result.err != nil {
  293. return errors.Trace(result.err)
  294. }
  295. serverContext = result.serverContext
  296. }
  297. // The activation succeeded.
  298. activationSucceeded = true
  299. tunnel.mutex.Lock()
  300. // It may happen that the tunnel gets closed while Activate is running.
  301. // In this case, abort here, to ensure that the operateTunnel goroutine
  302. // will not be launched after Close is called.
  303. if tunnel.isClosed {
  304. return errors.TraceNew("tunnel is closed")
  305. }
  306. tunnel.isActivated = true
  307. tunnel.isStatusReporter = isStatusReporter
  308. tunnel.serverContext = serverContext
  309. // establishDuration is the elapsed time between the controller starting tunnel
  310. // establishment and this tunnel being established. The reported value represents
  311. // how long the user waited between starting the client and having a usable tunnel;
  312. // or how long between the client detecting an unexpected tunnel disconnect and
  313. // completing automatic reestablishment.
  314. //
  315. // This time period may include time spent unsuccessfully connecting to other
  316. // servers. Time spent waiting for network connectivity is excluded.
  317. tunnel.establishDuration = time.Since(tunnel.adjustedEstablishStartTime)
  318. tunnel.establishedTime = time.Now()
  319. // Use the Background context instead of the controller run context, as tunnels
  320. // are terminated when the controller calls tunnel.Close.
  321. tunnel.operateCtx, tunnel.stopOperate = context.WithCancel(context.Background())
  322. tunnel.operateWaitGroup = new(sync.WaitGroup)
  323. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic
  324. // stats updates.
  325. tunnel.operateWaitGroup.Add(1)
  326. go tunnel.operateTunnel(tunnelOwner)
  327. tunnel.mutex.Unlock()
  328. return nil
  329. }
  330. func (tunnel *Tunnel) relayInproxyPacketRoundTrip(
  331. ctx context.Context, request *ssh.Request) (retErr error) {
  332. defer func() {
  333. if retErr != nil {
  334. _ = request.Reply(false, nil)
  335. }
  336. }()
  337. // Continue the broker/server relay started in handshake round trip.
  338. // server -> broker
  339. var relayRequest protocol.InproxyRelayRequest
  340. err := cbor.Unmarshal(request.Payload, &relayRequest)
  341. if err != nil {
  342. return errors.Trace(err)
  343. }
  344. inproxyConn := tunnel.dialParams.inproxyConn.Load().(*inproxy.ClientConn)
  345. if inproxyConn == nil {
  346. return errors.TraceNew("missing inproxyConn")
  347. }
  348. responsePacket, err := inproxyConn.RelayPacket(ctx, relayRequest.Packet)
  349. if err != nil {
  350. return errors.Trace(err)
  351. }
  352. // RelayPacket may return a nil packet when the relay is complete.
  353. if responsePacket == nil {
  354. return nil
  355. }
  356. // broker -> server
  357. relayResponse := &protocol.InproxyRelayResponse{
  358. Packet: responsePacket,
  359. }
  360. responsePayload, err := protocol.CBOREncoding.Marshal(relayResponse)
  361. if err != nil {
  362. return errors.Trace(err)
  363. }
  364. err = request.Reply(true, responsePayload)
  365. if err != nil {
  366. return errors.Trace(err)
  367. }
  368. return nil
  369. }
  370. // Close stops operating the tunnel and closes the underlying connection.
  371. // Supports multiple and/or concurrent calls to Close().
  372. // When isDiscarded is set, operateTunnel will not attempt to send final
  373. // status requests.
  374. func (tunnel *Tunnel) Close(isDiscarded bool) {
  375. tunnel.mutex.Lock()
  376. tunnel.isDiscarded = isDiscarded
  377. isActivated := tunnel.isActivated
  378. isClosed := tunnel.isClosed
  379. tunnel.isClosed = true
  380. tunnel.mutex.Unlock()
  381. if !isClosed {
  382. // Signal operateTunnel to stop before closing the tunnel -- this
  383. // allows a final status request to be made in the case of an orderly
  384. // shutdown.
  385. // A timer is set, so if operateTunnel takes too long to stop, the
  386. // tunnel is closed, which will interrupt any slow final status request.
  387. if isActivated {
  388. timeout := tunnel.getCustomParameters().Duration(
  389. parameters.TunnelOperateShutdownTimeout)
  390. afterFunc := time.AfterFunc(
  391. timeout,
  392. func() { tunnel.conn.Close() })
  393. tunnel.stopOperate()
  394. tunnel.operateWaitGroup.Wait()
  395. afterFunc.Stop()
  396. }
  397. tunnel.sshClient.Close()
  398. // tunnel.conn.Close() may get called multiple times, which is allowed.
  399. tunnel.conn.Close()
  400. err := tunnel.sshClient.Wait()
  401. if err != nil {
  402. NoticeWarning("close tunnel ssh error: %s", err)
  403. }
  404. }
  405. // Log burst metrics now that the BurstMonitoredConn is closed.
  406. // Metrics will be empty when burst monitoring is disabled.
  407. if !isDiscarded && isActivated {
  408. burstMetrics := tunnel.conn.GetMetrics(tunnel.monitoringStartTime)
  409. if len(burstMetrics) > 0 {
  410. NoticeBursts(
  411. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  412. burstMetrics)
  413. }
  414. }
  415. }
  416. // SetInFlightConnectedRequest checks if a connected request can begin and
  417. // sets the channel used to signal that the request is complete.
  418. //
  419. // The caller must not initiate a connected request when
  420. // SetInFlightConnectedRequest returns false. When SetInFlightConnectedRequest
  421. // returns true, the caller must call SetInFlightConnectedRequest(nil) when
  422. // the connected request completes.
  423. func (tunnel *Tunnel) SetInFlightConnectedRequest(requestSignal chan struct{}) bool {
  424. tunnel.mutex.Lock()
  425. defer tunnel.mutex.Unlock()
  426. // If already closing, don't start a connected request: the
  427. // TunnelOperateShutdownTimeout period may be nearly expired.
  428. if tunnel.isClosed {
  429. return false
  430. }
  431. if requestSignal == nil {
  432. // Not already in-flight (not expected)
  433. if tunnel.inFlightConnectedRequestSignal == nil {
  434. return false
  435. }
  436. } else {
  437. // Already in-flight (not expected)
  438. if tunnel.inFlightConnectedRequestSignal != nil {
  439. return false
  440. }
  441. }
  442. tunnel.inFlightConnectedRequestSignal = requestSignal
  443. return true
  444. }
  445. // AwaitInFlightConnectedRequest waits for the signal that any in-flight
  446. // connected request is complete.
  447. //
  448. // AwaitInFlightConnectedRequest may block until the connected request is
  449. // aborted by terminating the tunnel.
  450. func (tunnel *Tunnel) AwaitInFlightConnectedRequest() {
  451. tunnel.mutex.Lock()
  452. requestSignal := tunnel.inFlightConnectedRequestSignal
  453. tunnel.mutex.Unlock()
  454. if requestSignal != nil {
  455. <-requestSignal
  456. }
  457. }
  458. // IsActivated returns the tunnel's activated flag.
  459. func (tunnel *Tunnel) IsActivated() bool {
  460. tunnel.mutex.Lock()
  461. defer tunnel.mutex.Unlock()
  462. return tunnel.isActivated
  463. }
  464. // IsDiscarded returns the tunnel's discarded flag.
  465. func (tunnel *Tunnel) IsDiscarded() bool {
  466. tunnel.mutex.Lock()
  467. defer tunnel.mutex.Unlock()
  468. return tunnel.isDiscarded
  469. }
  470. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  471. // This function blocks awaiting a response. Only one request may be in-flight
  472. // at once; a concurrent SendAPIRequest will block until an active request
  473. // receives its response (or the SSH connection is terminated).
  474. func (tunnel *Tunnel) SendAPIRequest(
  475. name string, requestPayload []byte) ([]byte, error) {
  476. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  477. name, true, requestPayload)
  478. if err != nil {
  479. return nil, errors.Trace(err)
  480. }
  481. if !ok {
  482. return nil, errors.TraceNew("API request rejected")
  483. }
  484. return responsePayload, nil
  485. }
  486. // DialTCPChannel establishes a TCP port forward connection through the
  487. // tunnel.
  488. //
  489. // When split tunnel mode is enabled, and unless alwaysTunneled is set, the
  490. // server may reject the port forward and indicate that the client is to make
  491. // direct, untunneled connection. In this case, the bool return value is true
  492. // and net.Conn and error are nil.
  493. //
  494. // downstreamConn is an optional parameter which specifies a connection to be
  495. // explicitly closed when the dialed connection is closed.
  496. func (tunnel *Tunnel) DialTCPChannel(
  497. remoteAddr string,
  498. alwaysTunneled bool,
  499. downstreamConn net.Conn) (net.Conn, bool, error) {
  500. channelType := "direct-tcpip"
  501. if alwaysTunneled && tunnel.config.IsSplitTunnelEnabled() {
  502. // This channel type is only necessary in split tunnel mode.
  503. channelType = protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE
  504. }
  505. channel, err := tunnel.dialChannel(channelType, remoteAddr)
  506. if err != nil {
  507. if isSplitTunnelRejectReason(err) {
  508. return nil, true, nil
  509. }
  510. return nil, false, errors.Trace(err)
  511. }
  512. netConn, ok := channel.(net.Conn)
  513. if !ok {
  514. return nil, false, errors.Tracef("unexpected channel type: %T", channel)
  515. }
  516. conn := &TunneledConn{
  517. Conn: netConn,
  518. tunnel: tunnel,
  519. downstreamConn: downstreamConn}
  520. return tunnel.wrapWithTransferStats(conn), false, nil
  521. }
  522. func (tunnel *Tunnel) DialPacketTunnelChannel() (net.Conn, error) {
  523. channel, err := tunnel.dialChannel(protocol.PACKET_TUNNEL_CHANNEL_TYPE, "")
  524. if err != nil {
  525. return nil, errors.Trace(err)
  526. }
  527. sshChannel, ok := channel.(ssh.Channel)
  528. if !ok {
  529. return nil, errors.Tracef("unexpected channel type: %T", channel)
  530. }
  531. NoticeInfo("DialPacketTunnelChannel: established channel")
  532. conn := newChannelConn(sshChannel)
  533. // wrapWithTransferStats will track bytes transferred for the
  534. // packet tunnel. It will count packet overhead (TCP/UDP/IP headers).
  535. //
  536. // Since the data in the channel is not HTTP or TLS, no domain bytes
  537. // counting is expected.
  538. //
  539. // transferstats are also used to determine that there's been recent
  540. // activity and skip periodic SSH keep alives; see Tunnel.operateTunnel.
  541. return tunnel.wrapWithTransferStats(conn), nil
  542. }
  543. func (tunnel *Tunnel) dialChannel(channelType, remoteAddr string) (interface{}, error) {
  544. if !tunnel.IsActivated() {
  545. return nil, errors.TraceNew("tunnel is not activated")
  546. }
  547. // Note: there is no dial context since SSH port forward dials cannot
  548. // be interrupted directly. Closing the tunnel will interrupt the dials.
  549. // A timeout is set to unblock this function, but the goroutine may
  550. // not exit until the tunnel is closed.
  551. type channelDialResult struct {
  552. channel interface{}
  553. err error
  554. }
  555. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  556. results := make(chan *channelDialResult, 1)
  557. afterFunc := time.AfterFunc(
  558. tunnel.getCustomParameters().Duration(
  559. parameters.TunnelPortForwardDialTimeout),
  560. func() {
  561. results <- &channelDialResult{
  562. nil, errors.Tracef("channel dial timeout: %s", channelType)}
  563. })
  564. defer afterFunc.Stop()
  565. go func() {
  566. result := new(channelDialResult)
  567. switch channelType {
  568. case "direct-tcpip", protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  569. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  570. // "direct-tcpip", except split tunnel channel rejections are disallowed
  571. // even when split tunnel mode is enabled.
  572. result.channel, result.err =
  573. tunnel.sshClient.Dial(channelType, remoteAddr)
  574. default:
  575. var sshRequests <-chan *ssh.Request
  576. result.channel, sshRequests, result.err =
  577. tunnel.sshClient.OpenChannel(channelType, nil)
  578. if result.err == nil {
  579. go ssh.DiscardRequests(sshRequests)
  580. }
  581. }
  582. if result.err != nil {
  583. result.err = errors.Trace(result.err)
  584. }
  585. results <- result
  586. }()
  587. result := <-results
  588. if result.err != nil {
  589. if !isSplitTunnelRejectReason(result.err) {
  590. select {
  591. case tunnel.signalPortForwardFailure <- struct{}{}:
  592. default:
  593. }
  594. }
  595. return nil, errors.Trace(result.err)
  596. }
  597. return result.channel, nil
  598. }
  599. func isSplitTunnelRejectReason(err error) bool {
  600. var openChannelErr *ssh.OpenChannelError
  601. if std_errors.As(err, &openChannelErr) {
  602. return openChannelErr.Reason ==
  603. ssh.RejectionReason(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL)
  604. }
  605. return false
  606. }
  607. func (tunnel *Tunnel) wrapWithTransferStats(conn net.Conn) net.Conn {
  608. // Tunnel does not have a serverContext when DisableApi is set. We still use
  609. // transferstats.Conn to count bytes transferred for monitoring tunnel
  610. // quality.
  611. var regexps *transferstats.Regexps
  612. if tunnel.serverContext != nil {
  613. regexps = tunnel.serverContext.StatsRegexps()
  614. }
  615. return transferstats.NewConn(
  616. conn, tunnel.dialParams.ServerEntry.IpAddress, regexps)
  617. }
  618. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  619. // This will terminate the tunnel.
  620. func (tunnel *Tunnel) SignalComponentFailure() {
  621. NoticeWarning("tunnel received component failure signal")
  622. tunnel.Close(false)
  623. }
  624. // TunneledConn implements net.Conn and wraps a port forward connection.
  625. // It is used to hook into Read and Write to observe I/O errors and
  626. // report these errors back to the tunnel monitor as port forward failures.
  627. // TunneledConn optionally tracks a peer connection to be explicitly closed
  628. // when the TunneledConn is closed.
  629. type TunneledConn struct {
  630. net.Conn
  631. tunnel *Tunnel
  632. downstreamConn net.Conn
  633. }
  634. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  635. n, err = conn.Conn.Read(buffer)
  636. if err != nil && err != io.EOF {
  637. // Report new failure. Won't block; assumes the receiver
  638. // has a sufficient buffer for the threshold number of reports.
  639. // TODO: conditional on type of error or error message?
  640. select {
  641. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  642. default:
  643. }
  644. }
  645. return
  646. }
  647. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  648. n, err = conn.Conn.Write(buffer)
  649. if err != nil && err != io.EOF {
  650. // Same as TunneledConn.Read()
  651. select {
  652. case conn.tunnel.signalPortForwardFailure <- struct{}{}:
  653. default:
  654. }
  655. }
  656. return
  657. }
  658. func (conn *TunneledConn) Close() error {
  659. if conn.downstreamConn != nil {
  660. conn.downstreamConn.Close()
  661. }
  662. return conn.Conn.Close()
  663. }
  664. type dialResult struct {
  665. dialConn net.Conn
  666. monitoringStartTime time.Time
  667. monitoredConn *common.BurstMonitoredConn
  668. sshClient *ssh.Client
  669. sshRequests <-chan *ssh.Request
  670. livenessTestMetrics *livenessTestMetrics
  671. extraFailureAction func()
  672. }
// dialTunnel is a helper that builds the transport layers and establishes the
// SSH connection. When additional dial configuration is used, dial metrics
// are recorded and returned.
//
// The layers are built bottom-up:
//   - a base transport selected by dialParams.TunnelProtocol: meek, QUIC
//     (optionally over an in-proxy packet conn), TapDance, Conjure, TLS-OSSH,
//     Shadowsocks, or plain TCP;
//   - a common.BurstMonitoredConn;
//   - a common.ThrottledConn applying the TunnelRateLimits parameter;
//   - an obfuscated SSH layer, for protocols that use it;
//   - the SSH client handshake, optionally followed by a liveness test.
//
// After a successful handshake, an optional hold-off delay is applied before
// returning.
//
// On failure, unless the base ctx was canceled, dialParams.Failed is called,
// any protocol-specific extra failure action is run, and (when an error is
// being returned) a failed tunnel stat is recorded. On success, dialConn is
// left open and returned in the dialResult together with the SSH client.
func dialTunnel(
	ctx context.Context,
	config *Config,
	dialParams *DialParameters) (_ *dialResult, retErr error) {

	// Return immediately when overall context is canceled or timed-out. This
	// avoids notice noise.
	err := ctx.Err()
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Read every parameter value needed below up front; p is closed right
	// after and not used again.
	p := getCustomParameters(config, dialParams)
	timeout := p.Duration(parameters.TunnelConnectTimeout)
	rateLimits := p.RateLimits(parameters.TunnelRateLimits)
	obfuscatedSSHMinPadding := p.Int(parameters.ObfuscatedSSHMinPadding)
	obfuscatedSSHMaxPadding := p.Int(parameters.ObfuscatedSSHMaxPadding)
	livenessTestSpec := getLivenessTestSpec(p, dialParams.TunnelProtocol, dialParams.EstablishedTunnelsCount)
	burstUpstreamTargetBytes := int64(p.Int(parameters.ClientBurstUpstreamTargetBytes))
	burstUpstreamDeadline := p.Duration(parameters.ClientBurstUpstreamDeadline)
	burstDownstreamTargetBytes := int64(p.Int(parameters.ClientBurstDownstreamTargetBytes))
	burstDownstreamDeadline := p.Duration(parameters.ClientBurstDownstreamDeadline)
	tlsOSSHApplyTrafficShaping := p.WeightedCoinFlip(parameters.TLSTunnelTrafficShapingProbability)
	tlsOSSHMinTLSPadding := p.Int(parameters.TLSTunnelMinTLSPadding)
	tlsOSSHMaxTLSPadding := p.Int(parameters.TLSTunnelMaxTLSPadding)
	conjureEnableIPv6Dials := p.Bool(parameters.ConjureEnableIPv6Dials)
	conjureEnablePortRandomization := p.Bool(parameters.ConjureEnablePortRandomization)
	conjureEnableRegistrationOverrides := p.Bool(parameters.ConjureEnableRegistrationOverrides)
	p.Close()

	// Ensure that, unless the base context is cancelled, any replayed dial
	// parameters are cleared, no longer to be retried, if the tunnel fails to
	// connect.
	//
	// Limitation: dials that fail to connect due to the server being in a
	// load-limiting state are not distinguished and excepted from this
	// logic.
	//
	// Note: defers run in LIFO order. The DialDuration defer registered
	// below runs before this one, so dialParams.DialDuration is already set
	// when RecordFailedTunnelStat is called here.
	dialSucceeded := false
	baseCtx := ctx
	var failedTunnelLivenessTestMetrics *livenessTestMetrics
	var extraFailureAction func()
	defer func() {
		if !dialSucceeded && baseCtx.Err() != context.Canceled {
			dialParams.Failed(config, retErr)
			if extraFailureAction != nil {
				extraFailureAction()
			}
			if retErr != nil {
				_ = RecordFailedTunnelStat(
					config,
					dialParams,
					failedTunnelLivenessTestMetrics,
					-1,
					-1,
					retErr)
			}
		}
	}()

	// From here on, ctx is bounded by TunnelConnectTimeout. This bound also
	// covers the liveness test and the hold-off sleep below.
	var cancelFunc context.CancelFunc
	ctx, cancelFunc = context.WithTimeout(ctx, timeout)
	defer cancelFunc()

	// DialDuration is the elapsed time for both successful and failed tunnel
	// dials. For successful tunnels, it includes any network protocol
	// handshake(s), obfuscation protocol handshake(s), SSH handshake, and
	// liveness test, when performed.
	//
	// Note: ensure DialDuration is set before calling any function which logs
	// dial_duration.
	startDialTime := time.Now()
	defer func() {
		dialParams.DialDuration = time.Since(startDialTime)
	}()

	// Note: dialParams.MeekResolvedIPAddress isn't set until the dial begins,
	// so it will always be blank in NoticeConnectingServer.
	NoticeConnectingServer(dialParams)

	// Create the base transport: meek or direct connection
	var dialConn net.Conn

	if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {

		dialConn, err = DialMeek(
			ctx,
			dialParams.GetMeekConfig(),
			dialParams.GetDialConfig())
		if err != nil {
			return nil, errors.Trace(err)
		}

	} else if protocol.TunnelProtocolUsesQUIC(dialParams.TunnelProtocol) {

		var packetConn net.PacketConn
		var remoteAddr *net.UDPAddr

		// Special case: explicit in-proxy dial. TCP dials wire up in-proxy
		// dials via DialConfig and its CustomDialer using
		// makeInproxyTCPDialer. common/quic doesn't have an equivalent to
		// CustomDialer.

		if protocol.TunnelProtocolUsesInproxy(dialParams.TunnelProtocol) {

			packetConn, err = dialInproxy(ctx, config, dialParams)
			if err != nil {
				return nil, errors.Trace(err)
			}

			// Use the actual 2nd hop destination address as the remote
			// address for correct behavior in
			// common/quic.getMaxPreDiscoveryPacketSize, which differs for
			// IPv4 vs. IPv6 destination addresses; and
			// ObfuscatedPacketConn.RemoteAddr. The 2nd hop destination
			// address is not actually dialed.
			//
			// Limitation: for domain destinations, the in-proxy proxy
			// resolves the domain, so just assume IPv6, which has lower max
			// padding (see quic.getMaxPreDiscoveryPacketSize), and use a stub
			// address.
			//
			// Note: err is shadowed within this branch by the := below.

			host, portStr, err := net.SplitHostPort(dialParams.DirectDialAddress)
			if err != nil {
				return nil, errors.Trace(err)
			}
			port, err := strconv.Atoi(portStr)
			if err != nil {
				return nil, errors.Trace(err)
			}
			IP := net.ParseIP(host)
			if IP == nil {
				IP = net.ParseIP("fd00::")
			}
			remoteAddr = &net.UDPAddr{IP: IP, Port: port}

		} else {

			packetConn, remoteAddr, err = NewUDPConn(
				ctx, "udp", false, "", dialParams.DirectDialAddress, dialParams.GetDialConfig())
			if err != nil {
				return nil, errors.Trace(err)
			}
		}

		dialConn, err = quic.Dial(
			ctx,
			packetConn,
			remoteAddr,
			dialParams.QUICDialSNIAddress,
			dialParams.QUICVersion,
			dialParams.QUICClientHelloSeed,
			dialParams.ServerEntry.SshObfuscatedKey,
			dialParams.ObfuscatedQUICPaddingSeed,
			dialParams.ObfuscatedQUICNonceTransformerParameters,
			dialParams.QUICDisablePathMTUDiscovery,
			dialParams.QUICMaxPacketSizeAdjustment,
			dialParams.QUICDialEarly,
			dialParams.QUICUseObfuscatedPSK,
			dialParams.quicTLSClientSessionCache)
		if err != nil {
			return nil, errors.Trace(err)
		}

	} else if protocol.TunnelProtocolUsesTapDance(dialParams.TunnelProtocol) {

		dialConn, err = refraction.DialTapDance(
			ctx,
			config.EmitRefractionNetworkingLogs,
			config.GetPsiphonDataDirectory(),
			NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
			dialParams.DirectDialAddress)
		if err != nil {
			return nil, errors.Trace(err)
		}

	} else if protocol.TunnelProtocolUsesConjure(dialParams.TunnelProtocol) {

		// On success, dialConjure also returns an action that is stored in
		// extraFailureAction and run by the failure defer above.
		dialConn, extraFailureAction, err = dialConjure(
			ctx,
			config,
			dialParams,
			conjureEnableIPv6Dials,
			conjureEnablePortRandomization,
			conjureEnableRegistrationOverrides)
		if err != nil {
			return nil, errors.Trace(err)
		}

	} else if protocol.TunnelProtocolUsesTLSOSSH(dialParams.TunnelProtocol) {

		dialConn, err = DialTLSTunnel(
			ctx,
			dialParams.GetTLSOSSHConfig(config),
			dialParams.GetDialConfig(),
			tlsOSSHApplyTrafficShaping,
			tlsOSSHMinTLSPadding,
			tlsOSSHMaxTLSPadding)
		if err != nil {
			return nil, errors.Trace(err)
		}

	} else if protocol.TunnelProtocolUsesShadowsocks(dialParams.TunnelProtocol) {

		dialConn, err = DialShadowsocksTunnel(
			ctx,
			dialParams.GetShadowsocksConfig(),
			dialParams.GetDialConfig(),
		)
		if err != nil {
			return nil, errors.Trace(err)
		}

	} else {

		// Use NewTCPDialer and don't use DialTCP directly, to ensure that
		// dialParams.GetDialConfig().CustomDialer is applied.
		tcpDialer := NewTCPDialer(dialParams.GetDialConfig())
		dialConn, err = tcpDialer(ctx, "tcp", dialParams.DirectDialAddress)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	// Some conns report additional metrics. fragmentor.Conns report
	// fragmentor configs.
	if metricsSource, ok := dialConn.(common.MetricsSource); ok {
		dialParams.DialConnMetrics = metricsSource
	}

	if noticeMetricsSource, ok := dialConn.(common.NoticeMetricsSource); ok {
		dialParams.DialConnNoticeMetrics = noticeMetricsSource
	}

	// If dialConn is not a Closer, tunnel failure detection may be slower
	if _, ok := dialConn.(common.Closer); !ok {
		NoticeWarning("tunnel.dialTunnel: dialConn is not a Closer")
	}

	// Close dialConn on any error return. cleanupConn is set to nil once the
	// SSH handshake (and liveness test) succeed, transferring ownership of
	// dialConn to the returned dialResult.
	cleanupConn := dialConn
	defer func() {
		// Cleanup on error
		if cleanupConn != nil {
			cleanupConn.Close()
		}
	}()

	monitoringStartTime := time.Now()
	monitoredConn := common.NewBurstMonitoredConn(
		dialConn,
		false,
		burstUpstreamTargetBytes, burstUpstreamDeadline,
		burstDownstreamTargetBytes, burstDownstreamDeadline)

	// Apply throttling (if configured). The underlying dialConn is always a
	// stream, even when the network conn uses UDP.
	throttledConn := common.NewThrottledConn(
		monitoredConn,
		true,
		rateLimits)

	// Add obfuscated SSH layer
	var sshConn net.Conn = throttledConn
	if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
		obfuscatedSSHConn, err := obfuscator.NewClientObfuscatedSSHConn(
			throttledConn,
			dialParams.ServerEntry.SshObfuscatedKey,
			dialParams.ObfuscatorPaddingSeed,
			dialParams.OSSHObfuscatorSeedTransformerParameters,
			dialParams.OSSHPrefixSpec,
			dialParams.OSSHPrefixSplitConfig,
			&obfuscatedSSHMinPadding,
			&obfuscatedSSHMaxPadding)
		if err != nil {
			return nil, errors.Trace(err)
		}
		sshConn = obfuscatedSSHConn
		dialParams.ObfuscatedSSHConnMetrics = obfuscatedSSHConn
	}

	// Now establish the SSH session over the conn transport
	expectedPublicKey, err := base64.StdEncoding.DecodeString(
		dialParams.ServerEntry.SshHostKey)
	if err != nil {
		return nil, errors.Trace(err)
	}
	sshCertChecker := &ssh.CertChecker{
		IsHostAuthority: func(auth ssh.PublicKey, address string) bool {
			// Psiphon servers do not currently use SSH certificates. This CertChecker
			// code path may still be hit if a client attempts to connect using an
			// obsolete server entry.
			return false
		},
		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {

			// The remote address input isn't checked. In the case of fronted
			// protocols, the immediate remote peer won't be the Psiphon
			// server. In direct cases, the client has just dialed the IP
			// address and expected public key both taken from the same
			// trusted, signed server entry.

			if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
				return errors.TraceNew("unexpected host public key")
			}
			return nil
		},
	}

	// The SSH password field carries a JSON payload with the session ID,
	// server password, client capabilities, and sponsor ID.
	sshPasswordPayload := &protocol.SSHPasswordPayload{
		SessionId:          config.SessionID,
		SshPassword:        dialParams.ServerEntry.SshPassword,
		ClientCapabilities: []string{protocol.CLIENT_CAPABILITY_SERVER_REQUESTS},
		SponsorID:          config.GetSponsorID(),
	}

	payload, err := json.Marshal(sshPasswordPayload)
	if err != nil {
		return nil, errors.Trace(err)
	}

	sshClientConfig := &ssh.ClientConfig{
		User: dialParams.ServerEntry.SshUsername,
		Auth: []ssh.AuthMethod{
			ssh.Password(string(payload)),
		},
		HostKeyCallback: sshCertChecker.CheckHostKey,
		ClientVersion:   dialParams.SSHClientVersion,
	}

	sshClientConfig.KEXPRNGSeed = dialParams.SSHKEXSeed

	if protocol.TunnelProtocolUsesObfuscatedSSH(dialParams.TunnelProtocol) {
		if config.ObfuscatedSSHAlgorithms != nil {
			// NOTE(review): assumes ObfuscatedSSHAlgorithms has at least 4
			// entries, in the order KEX, cipher, MAC, host key algorithm; a
			// shorter slice would panic here. Presumably validated when the
			// config is loaded; confirm.
			sshClientConfig.KeyExchanges = []string{config.ObfuscatedSSHAlgorithms[0]}
			sshClientConfig.Ciphers = []string{config.ObfuscatedSSHAlgorithms[1]}
			sshClientConfig.MACs = []string{config.ObfuscatedSSHAlgorithms[2]}
			sshClientConfig.HostKeyAlgorithms = []string{config.ObfuscatedSSHAlgorithms[3]}

		} else {
			// With Encrypt-then-MAC hash algorithms, packet length is
			// transmitted in plaintext, which aids in traffic analysis.
			//
			// TUNNEL_PROTOCOL_SSH is excepted since its KEX appears in plaintext,
			// and the protocol is intended to look like SSH on the wire.
			sshClientConfig.NoEncryptThenMACHash = true
		}
	} else {
		// For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize
		// its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation
		// between two randomized KEXes.
		if dialParams.ServerEntry.SshObfuscatedKey != "" {
			sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
				dialParams.ServerEntry.SshObfuscatedKey)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
	}

	// The ssh session establishment (via ssh.NewClientConn) is wrapped
	// in a timeout to ensure it won't hang. We've encountered firewalls
	// that allow the TCP handshake to complete but then send a RST to the
	// server-side and nothing to the client-side, and if that happens
	// while ssh.NewClientConn is reading, it may wait forever. The timeout
	// closes the conn, which interrupts it.
	// Note: TCP handshake timeouts are provided by TCPConn, and session
	// timeouts *after* ssh establishment are provided by the ssh keep alive
	// in operate tunnel.

	type sshNewClientResult struct {
		sshClient           *ssh.Client
		sshRequests         <-chan *ssh.Request
		livenessTestMetrics *livenessTestMetrics
		err                 error
	}

	// Unbuffered: the goroutine's single send is always received, either in
	// the first select case or after sshConn.Close in the ctx.Done case.
	resultChannel := make(chan sshNewClientResult)

	// Call NewClientConn in a goroutine, as it blocks on SSH handshake network
	// operations, and would block canceling or shutdown. If the parent context
	// is canceled, close the net.Conn underlying SSH, which will interrupt the
	// SSH handshake that may be blocking NewClientConn.

	go func() {
		// The following is adapted from ssh.Dial(), here using a custom conn
		// The sshAddress is passed through to host key verification callbacks; we don't use it.
		sshAddress := ""
		sshClientConn, sshChannels, sshRequests, err := ssh.NewClientConn(
			sshConn, sshAddress, sshClientConfig)

		var sshClient *ssh.Client
		var metrics *livenessTestMetrics

		if err == nil {

			// sshRequests is handled by operateTunnel.
			// ssh.NewClient also expects to handle the sshRequests
			// value from ssh.NewClientConn and will spawn a goroutine
			// to handle the <-chan *ssh.Request, so we must provide
			// a closed channel to ensure that goroutine halts instead
			// of hanging on a nil channel.
			noRequests := make(chan *ssh.Request)
			close(noRequests)

			sshClient = ssh.NewClient(sshClientConn, sshChannels, noRequests)

			if livenessTestSpec.MaxUpstreamBytes > 0 || livenessTestSpec.MaxDownstreamBytes > 0 {

				// When configured, perform a liveness test which sends and
				// receives bytes through the tunnel to ensure the tunnel had
				// not been blocked upon or shortly after connecting. This
				// test is performed concurrently for each establishment
				// candidate before selecting a successful tunnel.
				//
				// Note that the liveness test is subject to the
				// TunnelConnectTimeout, which should be adjusted
				// accordingly.

				metrics, err = performLivenessTest(
					sshClient,
					livenessTestSpec,
					dialParams.LivenessTestSeed)

				// Skip notice when cancelling.
				if baseCtx.Err() == nil {
					NoticeLivenessTest(
						dialParams.ServerEntry.GetDiagnosticID(), metrics, err == nil)
				}
			}
		}

		resultChannel <- sshNewClientResult{sshClient, sshRequests, metrics, err}
	}()

	var result sshNewClientResult

	select {
	case result = <-resultChannel:
	case <-ctx.Done():

		// Interrupt the goroutine and capture its error context to
		// distinguish point of failure.
		//
		// NOTE(review): the %s verbs flatten both errors to text, so the
		// context error is not wrapped; errors.Is(retErr,
		// context.DeadlineExceeded) will not match. Consider %w if callers
		// need that.
		err := ctx.Err()
		sshConn.Close()
		result = <-resultChannel
		if result.err != nil {
			result.err = fmt.Errorf("%s: %s", err, result.err)
		} else {
			result.err = err
		}
	}

	if result.err != nil {
		failedTunnelLivenessTestMetrics = result.livenessTestMetrics
		return nil, errors.Trace(result.err)
	}

	dialSucceeded = true

	NoticeConnectedServer(dialParams)

	// Success: keep dialConn open; it is returned in dialResult.
	cleanupConn = nil

	// Invoke DNS cache extension (if enabled in the resolver) now that the
	// tunnel is connected and the Psiphon server is authenticated. This
	// demonstrates that any domain name resolved to an endpoint that is or
	// is forwarded to the expected Psiphon server.
	//
	// Limitation: DNS cache extension is not implemented for Refraction
	// Networking protocols. iOS VPN, the primary use case for DNS cache
	// extension, does not enable Refraction Networking.
	if protocol.TunnelProtocolUsesFrontedMeek(dialParams.TunnelProtocol) {
		resolver := config.GetResolver()
		if resolver != nil {
			resolver.VerifyCacheExtension(dialParams.MeekFrontingDialAddress)
		}
	}

	// When configured to do so, hold-off on activating this tunnel. This allows
	// some extra time for slower but less resource intensive protocols to
	// establish tunnels. By holding off post-connect, the client has this
	// established tunnel ready to activate in case other protocols fail to
	// establish. This hold-off phase continues to consume one connection worker.
	//
	// The network latency multiplier is not applied to HoldOffTunnelDuration,
	// as the goal is to apply a consistent hold-off range across all tunnel
	// candidates; and this avoids scaling up any delay users experience.
	//
	// The hold-off is applied regardless of whether this is the first tunnel
	// in a session or a reconnection, even to a server affinity candidate,
	// so that the advantage for other protocols persists.
	//
	// Note: ctx cancellation during the hold-off is not checked afterwards;
	// the established tunnel is still returned.
	if dialParams.HoldOffTunnelDuration > 0 {
		NoticeHoldOffTunnel(dialParams.ServerEntry.GetDiagnosticID(), dialParams.HoldOffTunnelDuration)
		common.SleepWithContext(ctx, dialParams.HoldOffTunnelDuration)
	}

	// Note: dialConn may be used to close the underlying network connection
	// but should not be used to perform I/O as that would interfere with SSH
	// (and also bypasses throttling).

	return &dialResult{
			dialConn:            dialConn,
			monitoringStartTime: monitoringStartTime,
			monitoredConn:       monitoredConn,
			sshClient:           result.sshClient,
			sshRequests:         result.sshRequests,
			livenessTestMetrics: result.livenessTestMetrics,
			extraFailureAction:  extraFailureAction,
		},
		nil
}
  1116. func dialConjure(
  1117. ctx context.Context,
  1118. config *Config,
  1119. dialParams *DialParameters,
  1120. enableIPv6Dials bool,
  1121. enablePortRandomization bool,
  1122. enableRegistrationOverrides bool) (net.Conn, func(), error) {
  1123. // Specify a cache key with a scope that ensures that:
  1124. //
  1125. // (a) cached registrations aren't used across different networks, as a
  1126. // registration requires the client's public IP to match the value at time
  1127. // of registration;
  1128. //
  1129. // (b) cached registrations are associated with specific Psiphon server
  1130. // candidates, to ensure that replay will use the same phantom IP(s).
  1131. //
  1132. // This scheme allows for reuse of cached registrations on network A when a
  1133. // client roams from network A to network B and back to network A.
  1134. //
  1135. // Using the network ID as a proxy for client public IP address is a
  1136. // heurisitic: it's possible that a clients public IP address changes
  1137. // without the network ID changing, and it's not guaranteed that the client
  1138. // will be assigned the original public IP on network A; so there's some
  1139. // chance the registration cannot be reused.
  1140. diagnosticID := dialParams.ServerEntry.GetDiagnosticID()
  1141. cacheKey := dialParams.NetworkID + "-" + diagnosticID
  1142. conjureConfig := &refraction.ConjureConfig{
  1143. RegistrationCacheTTL: dialParams.ConjureCachedRegistrationTTL,
  1144. RegistrationCacheKey: cacheKey,
  1145. EnableIPv6Dials: enableIPv6Dials,
  1146. EnablePortRandomization: enablePortRandomization,
  1147. EnableRegistrationOverrides: enableRegistrationOverrides,
  1148. Transport: dialParams.ConjureTransport,
  1149. STUNServerAddress: dialParams.ConjureSTUNServerAddress,
  1150. DTLSEmptyInitialPacket: dialParams.ConjureDTLSEmptyInitialPacket,
  1151. DiagnosticID: diagnosticID,
  1152. Logger: NoticeCommonLogger(false),
  1153. }
  1154. if dialParams.ConjureAPIRegistration {
  1155. // Use MeekConn to domain front Conjure API registration.
  1156. //
  1157. // ConjureAPIRegistrarFrontingSpecs are applied via
  1158. // dialParams.GetMeekConfig, and will be subject to replay.
  1159. //
  1160. // Since DialMeek will create a TLS connection immediately, and a cached
  1161. // registration may be used, we will delay initializing the MeekConn-based
  1162. // RoundTripper until we know it's needed. This is implemented by passing
  1163. // in a RoundTripper that establishes a MeekConn when RoundTrip is called.
  1164. //
  1165. // In refraction.dial we configure 0 retries for API registration requests,
  1166. // assuming it's better to let another Psiphon candidate retry, with new
  1167. // domaing fronting parameters. As such, we expect only one round trip call
  1168. // per NewHTTPRoundTripper, so, in practise, there's no performance penalty
  1169. // from establishing a new MeekConn per round trip.
  1170. //
  1171. // Performing the full DialMeek/RoundTrip operation here allows us to call
  1172. // MeekConn.Close and ensure all resources are immediately cleaned up.
  1173. roundTrip := func(request *http.Request) (*http.Response, error) {
  1174. conn, err := DialMeek(
  1175. ctx, dialParams.GetMeekConfig(), dialParams.GetDialConfig())
  1176. if err != nil {
  1177. return nil, errors.Trace(err)
  1178. }
  1179. defer conn.Close()
  1180. response, err := conn.RoundTrip(request)
  1181. if err != nil {
  1182. return nil, errors.Trace(err)
  1183. }
  1184. // Read the response into a buffer and close the response
  1185. // body, ensuring that MeekConn.Close closes all idle connections.
  1186. //
  1187. // Alternatively, we could Clone the request to set
  1188. // http.Request.Close and avoid keeping any idle connection
  1189. // open after the response body is read by gotapdance. Since
  1190. // the response body is small and since gotapdance does not
  1191. // stream the response body, we're taking this approach which
  1192. // ensures cleanup.
  1193. body, err := ioutil.ReadAll(response.Body)
  1194. _ = response.Body.Close()
  1195. if err != nil {
  1196. return nil, errors.Trace(err)
  1197. }
  1198. response.Body = io.NopCloser(bytes.NewReader(body))
  1199. return response, nil
  1200. }
  1201. conjureConfig.APIRegistrarHTTPClient = &http.Client{
  1202. Transport: common.NewHTTPRoundTripper(roundTrip),
  1203. }
  1204. conjureConfig.APIRegistrarBidirectionalURL =
  1205. dialParams.ConjureAPIRegistrarBidirectionalURL
  1206. conjureConfig.APIRegistrarDelay = dialParams.ConjureAPIRegistrarDelay
  1207. } else if dialParams.ConjureDecoyRegistration {
  1208. // The Conjure "phantom" connection is compatible with fragmentation, but
  1209. // the decoy registrar connection, like Tapdance, is not, so force it off.
  1210. // Any tunnel fragmentation metrics will refer to the "phantom" connection
  1211. // only.
  1212. conjureConfig.DoDecoyRegistration = true
  1213. conjureConfig.DecoyRegistrarWidth = dialParams.ConjureDecoyRegistrarWidth
  1214. conjureConfig.DecoyRegistrarDelay = dialParams.ConjureDecoyRegistrarDelay
  1215. }
  1216. // Set extraFailureAction, which is invoked whenever the tunnel fails (i.e.,
  1217. // where RecordFailedTunnelStat is invoked). The action will remove any
  1218. // cached registration. When refraction.DialConjure succeeds, the underlying
  1219. // registration is cached. After refraction.DialConjure returns, it no
  1220. // longer modifies the cached state of that registration, assuming that it
  1221. // remains valid and effective. However adversarial impact on a given
  1222. // phantom IP may not become evident until after the initial TCP connection
  1223. // establishment and handshake performed by refraction.DialConjure. For
  1224. // example, it may be that the phantom dial is targeted for severe
  1225. // throttling which begins or is only evident later in the flow. Scheduling
  1226. // a call to DeleteCachedConjureRegistration allows us to invalidate the
  1227. // cached registration for a tunnel that fails later in its lifecycle.
  1228. //
  1229. // Note that extraFailureAction will retain a reference to conjureConfig for
  1230. // the lifetime of the tunnel.
  1231. extraFailureAction := func() {
  1232. refraction.DeleteCachedConjureRegistration(conjureConfig)
  1233. }
  1234. dialCtx := ctx
  1235. if protocol.ConjureTransportUsesDTLS(dialParams.ConjureTransport) {
  1236. // Conjure doesn't use the DTLS seed scheme, which supports in-proxy
  1237. // DTLS randomization. But every DTLS dial expects to find a seed
  1238. // state, so set the no-seed state.
  1239. dialCtx = inproxy_dtls.SetNoDTLSSeed(ctx)
  1240. }
  1241. dialConn, err := refraction.DialConjure(
  1242. dialCtx,
  1243. config.EmitRefractionNetworkingLogs,
  1244. config.GetPsiphonDataDirectory(),
  1245. NewRefractionNetworkingDialer(dialParams.GetDialConfig()).DialContext,
  1246. dialParams.DirectDialAddress,
  1247. conjureConfig)
  1248. if err != nil {
  1249. // When this function fails, invoke extraFailureAction directly; when it
  1250. // succeeds, return extraFailureAction to be called later.
  1251. extraFailureAction()
  1252. return nil, nil, errors.Trace(err)
  1253. }
  1254. return dialConn, extraFailureAction, nil
  1255. }
  1256. // makeInproxyTCPDialer returns a dialer which proxies TCP dials via an
  1257. // in-proxy proxy, as configured in dialParams.
  1258. //
  1259. // Limitation: MeekConn may redial TCP for a single tunnel connection, but
  1260. // that case is not supported by the in-proxy protocol, as the in-proxy proxy
  1261. // closes both its WebRTC DataChannel and the overall client connection when
  1262. // the upstream TCP connection closes. Any new connection from the client to
  1263. // the proxy must be a new tunnel connection with and accompanying
  1264. // broker/server relay. As a future enhancement, consider extending the
  1265. // in-proxy protocol to enable the client and proxy to establish additional
  1266. // WebRTC DataChannels and new upstream TCP connections within the scope of a
  1267. // single proxy/client connection.
  1268. func makeInproxyTCPDialer(
  1269. config *Config, dialParams *DialParameters) common.Dialer {
  1270. return func(ctx context.Context, _, _ string) (net.Conn, error) {
  1271. if dialParams.inproxyConn.Load() != nil {
  1272. return nil, errors.TraceNew("redial not supported")
  1273. }
  1274. var conn net.Conn
  1275. var err error
  1276. conn, err = dialInproxy(ctx, config, dialParams)
  1277. if err != nil {
  1278. return nil, errors.Trace(err)
  1279. }
  1280. // When the TCP fragmentor is configured for the 2nd hop protocol,
  1281. // approximate the behavior by applying the fragmentor to the WebRTC
  1282. // DataChannel writes, which will result in delays and DataChannel
  1283. // message sizes which will be reflected in the proxy's relay to its
  1284. // upstream TCP connection.
  1285. //
  1286. // This code is copied from DialTCP.
  1287. //
  1288. // Limitation: TCP BPF settings are not supported and currently
  1289. // disabled for all 2nd hop cases in
  1290. // protocol.TunnelProtocolMayUseClientBPF.
  1291. if dialParams.dialConfig.FragmentorConfig.MayFragment() {
  1292. conn = fragmentor.NewConn(
  1293. dialParams.dialConfig.FragmentorConfig,
  1294. func(message string) {
  1295. NoticeFragmentor(dialParams.dialConfig.DiagnosticID, message)
  1296. },
  1297. conn)
  1298. }
  1299. return conn, nil
  1300. }
  1301. }
  // inproxyDialFailedError wraps every error returned by dialInproxy, so that
// callers can identify in-proxy dial failures within nested error chains.
type inproxyDialFailedError struct {
	err error
}

// Error returns the message of the wrapped error, unchanged.
func (e inproxyDialFailedError) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error. This lets the standard library's
// errors.Is and errors.As see through this wrapper to the underlying cause,
// for example a context cancellation, in addition to matching the wrapper
// type itself.
func (e inproxyDialFailedError) Unwrap() error {
	return e.err
}
  1308. // dialInproxy performs the in-proxy dial and returns the resulting conn for
  1309. // use as an underlying conn for the 2nd hop protocol. The in-proxy dial
  1310. // first connects to the broker (or reuses an existing connection) to match
  1311. // with a proxy; and then establishes connection to the proxy.
  1312. func dialInproxy(
  1313. ctx context.Context,
  1314. config *Config,
  1315. dialParams *DialParameters) (retConn *inproxy.ClientConn, retErr error) {
  1316. defer func() {
  1317. // Wrap all returned errors with inproxyDialFailedError so callers can
  1318. // check for dialInproxy failures within nested errors.
  1319. if retErr != nil {
  1320. retErr = &inproxyDialFailedError{err: retErr}
  1321. }
  1322. }()
  1323. isProxy := false
  1324. webRTCDialInstance, err := NewInproxyWebRTCDialInstance(
  1325. config,
  1326. dialParams.NetworkID,
  1327. isProxy,
  1328. dialParams.inproxyNATStateManager,
  1329. dialParams.InproxySTUNDialParameters,
  1330. dialParams.InproxyWebRTCDialParameters)
  1331. if err != nil {
  1332. return nil, errors.Trace(err)
  1333. }
  1334. // dialAddress indicates to the broker and proxy how to dial the upstream
  1335. // Psiphon server, based on the 2nd hop tunnel protocol.
  1336. networkProtocol := inproxy.NetworkProtocolUDP
  1337. reliableTransport := false
  1338. if protocol.TunnelProtocolUsesTCP(dialParams.TunnelProtocol) {
  1339. networkProtocol = inproxy.NetworkProtocolTCP
  1340. reliableTransport = true
  1341. }
  1342. dialAddress := dialParams.DirectDialAddress
  1343. if protocol.TunnelProtocolUsesMeek(dialParams.TunnelProtocol) {
  1344. dialAddress = dialParams.MeekDialAddress
  1345. }
  1346. // Specify the value to be returned by inproxy.ClientConn.RemoteAddr.
  1347. // Currently, the one caller of RemoteAddr is utls, which uses the
  1348. // RemoteAddr as a TLS session cache key when there is no SNI.
  1349. // GetTLSSessionCacheKeyAddress returns a cache key value that is a valid
  1350. // address and that is also a more appropriate TLS session cache key than
  1351. // the proxy address.
  1352. remoteAddrOverride, err := dialParams.ServerEntry.GetTLSSessionCacheKeyAddress(
  1353. dialParams.TunnelProtocol)
  1354. if err != nil {
  1355. return nil, errors.Trace(err)
  1356. }
  1357. // Unlike the proxy broker case, clients already actively fetch tactics
  1358. // during tunnel establishment, so tactics.SetTacticsAPIParameters are not
  1359. // sent to the broker and no tactics are returned by the broker.
  1360. //
  1361. // No padding is added via the params as this is provided by the broker
  1362. // request obfuscation layer.
  1363. includeSessionID := true
  1364. params := getBaseAPIParameters(
  1365. baseParametersNoDialParameters, nil, includeSessionID, config, nil)
  1366. // The debugLogging flag is passed to both NoticeCommonLogger and to the
  1367. // inproxy package as well; skipping debug logs in the inproxy package,
  1368. // before calling into the notice logger, avoids unnecessary allocations
  1369. // and formatting when debug logging is off.
  1370. debugLogging := config.InproxyEnableWebRTCDebugLogging
  1371. clientConfig := &inproxy.ClientConfig{
  1372. Logger: NoticeCommonLogger(debugLogging),
  1373. EnableWebRTCDebugLogging: debugLogging,
  1374. BaseAPIParameters: params,
  1375. BrokerClient: dialParams.inproxyBrokerClient,
  1376. WebRTCDialCoordinator: webRTCDialInstance,
  1377. ReliableTransport: reliableTransport,
  1378. DialNetworkProtocol: networkProtocol,
  1379. DialAddress: dialAddress,
  1380. RemoteAddrOverride: remoteAddrOverride,
  1381. PackedDestinationServerEntry: dialParams.inproxyPackedSignedServerEntry,
  1382. MustUpgrade: config.OnInproxyMustUpgrade,
  1383. }
  1384. conn, err := inproxy.DialClient(ctx, clientConfig)
  1385. if err != nil {
  1386. return nil, errors.Trace(err)
  1387. }
  1388. // The inproxy.ClientConn is stored in dialParams.inproxyConn in order to
  1389. // later fetch its connection ID and to facilitate broker/client replay.
  1390. dialParams.inproxyConn.Store(conn)
  1391. return conn, nil
  1392. }
  // livenessTestMetrics records the targets and outcome of a liveness test, as
// populated by performLivenessTest.
//
// Fields are exported for JSON encoding in NoticeLivenessTest. The field
// declaration order determines the key order in that encoding.
type livenessTestMetrics struct {
	// Duration is the elapsed test time, formatted by time.Duration.String.
	Duration string

	// UpstreamBytes is the number of bytes the test targets sending.
	UpstreamBytes int

	// SentUpstreamBytes is the number of bytes actually sent.
	SentUpstreamBytes int

	// DownstreamBytes is the number of bytes the test targets receiving.
	DownstreamBytes int

	// ReceivedDownstreamBytes is the number of bytes actually received.
	ReceivedDownstreamBytes int
}
  1401. func performLivenessTest(
  1402. sshClient *ssh.Client,
  1403. spec *parameters.LivenessTestSpec,
  1404. livenessTestPRNGSeed *prng.Seed) (*livenessTestMetrics, error) {
  1405. metrics := new(livenessTestMetrics)
  1406. defer func(startTime time.Time) {
  1407. metrics.Duration = time.Since(startTime).String()
  1408. }(time.Now())
  1409. PRNG := prng.NewPRNGWithSeed(livenessTestPRNGSeed)
  1410. metrics.UpstreamBytes = PRNG.Range(spec.MinUpstreamBytes, spec.MaxUpstreamBytes)
  1411. metrics.DownstreamBytes = PRNG.Range(spec.MinDownstreamBytes, spec.MaxDownstreamBytes)
  1412. request := &protocol.RandomStreamRequest{
  1413. UpstreamBytes: metrics.UpstreamBytes,
  1414. DownstreamBytes: metrics.DownstreamBytes,
  1415. }
  1416. extraData, err := json.Marshal(request)
  1417. if err != nil {
  1418. return metrics, errors.Trace(err)
  1419. }
  1420. channel, requests, err := sshClient.OpenChannel(
  1421. protocol.RANDOM_STREAM_CHANNEL_TYPE, extraData)
  1422. if err != nil {
  1423. return metrics, errors.Trace(err)
  1424. }
  1425. defer channel.Close()
  1426. go ssh.DiscardRequests(requests)
  1427. sent := 0
  1428. received := 0
  1429. upstream := new(sync.WaitGroup)
  1430. var errUpstream, errDownstream error
  1431. if metrics.UpstreamBytes > 0 {
  1432. // Process streams concurrently to minimize elapsed time. This also
  1433. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  1434. upstream.Add(1)
  1435. go func() {
  1436. defer upstream.Done()
  1437. // In consideration of memory-constrained environments, use modest-sized copy
  1438. // buffers since many tunnel establishment workers may run the liveness test
  1439. // concurrently.
  1440. var buffer [4096]byte
  1441. n, err := common.CopyNBuffer(channel, rand.Reader, int64(metrics.UpstreamBytes), buffer[:])
  1442. sent = int(n)
  1443. if err != nil {
  1444. errUpstream = errors.Trace(err)
  1445. }
  1446. }()
  1447. }
  1448. if metrics.DownstreamBytes > 0 {
  1449. var buffer [4096]byte
  1450. n, err := common.CopyNBuffer(ioutil.Discard, channel, int64(metrics.DownstreamBytes), buffer[:])
  1451. received = int(n)
  1452. if err != nil {
  1453. errDownstream = errors.Trace(err)
  1454. }
  1455. }
  1456. upstream.Wait()
  1457. metrics.SentUpstreamBytes = sent
  1458. metrics.ReceivedDownstreamBytes = received
  1459. if errUpstream != nil {
  1460. return metrics, errUpstream
  1461. } else if errDownstream != nil {
  1462. return metrics, errDownstream
  1463. }
  1464. return metrics, nil
  1465. }
  1466. // operateTunnel monitors the health of the tunnel and performs
  1467. // periodic work.
  1468. //
  1469. // BytesTransferred and TotalBytesTransferred notices are emitted
  1470. // for live reporting and diagnostics reporting, respectively.
  1471. //
  1472. // Status requests are sent to the Psiphon API to report bytes
  1473. // transferred.
  1474. //
  1475. // Periodic SSH keep alive packets are sent to ensure the underlying
  1476. // TCP connection isn't terminated by NAT, or other network
  1477. // interference -- or test if it has been terminated while the device
  1478. // has been asleep. When a keep alive times out, the tunnel is
  1479. // considered failed.
  1480. //
  1481. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  1482. // server responsiveness when a port forward failure is detected: a
  1483. // failed dial or failed read/write. This keep alive has a shorter
  1484. // timeout.
  1485. //
  1486. // Note that port forward failures may be due to non-failure conditions.
  1487. // For example, when the user inputs an invalid domain name and
  1488. // resolution is done by the ssh server; or trying to connect to a
  1489. // non-white-listed port; and the error message in these cases is not
  1490. // distinguishable from a a true server error (a common error message,
  1491. // "ssh: rejected: administratively prohibited (open failed)", may be
  1492. // returned for these cases but also if the server has run out of
  1493. // ephemeral ports, for example).
  1494. //
  1495. // SSH keep alives are not sent when the tunnel has been recently
  1496. // active (not only does tunnel activity obviate the necessity of a keep
  1497. // alive, testing has shown that keep alives may time out for "busy"
  1498. // tunnels, especially over meek protocol and other high latency
  1499. // conditions).
  1500. //
  1501. // "Recently active" is defined has having received payload bytes. Sent
  1502. // bytes are not considered as testing has shown bytes may appear to
  1503. // send when certain NAT devices have interfered with the tunnel, while
  1504. // no bytes are received. In a pathological case, with DNS implemented
  1505. // as tunneled UDP, a browser may wait excessively for a domain name to
  1506. // resolve, while no new port forward is attempted which would otherwise
  1507. // result in a tunnel failure detection.
  1508. //
  1509. // TODO: change "recently active" to include having received any
  1510. // SSH protocol messages from the server, not just user payload?
  1511. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  1512. defer tunnel.operateWaitGroup.Done()
  1513. now := time.Now()
  1514. lastBytesReceivedTime := now
  1515. lastTotalBytesTransferedTime := now
  1516. totalSent := int64(0)
  1517. totalReceived := int64(0)
  1518. setDialParamsSucceeded := false
  1519. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  1520. defer noticeBytesTransferredTicker.Stop()
  1521. // The next status request and ssh keep alive times are picked at random,
  1522. // from a range, to make the resulting traffic less fingerprintable,
  1523. // Note: not using Tickers since these are not fixed time periods.
  1524. nextStatusRequestPeriod := func() time.Duration {
  1525. p := tunnel.getCustomParameters()
  1526. return prng.Period(
  1527. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMin),
  1528. p.Duration(parameters.PsiphonAPIStatusRequestPeriodMax))
  1529. }
  1530. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  1531. defer statsTimer.Stop()
  1532. // Only one active tunnel should be designated as the status reporter for
  1533. // sending stats and prune checks. While the stats provide a "take out"
  1534. // scheme that would allow for multiple, concurrent requesters, the prune
  1535. // check does not.
  1536. //
  1537. // The statsTimer is retained, but set to practically never trigger, in
  1538. // the !isStatusReporter case to simplify following select statements.
  1539. if tunnel.isStatusReporter {
  1540. // Schedule an almost-immediate status request to deliver any unreported
  1541. // persistent stats or perform a server entry prune check.
  1542. unreported := CountUnreportedPersistentStats()
  1543. isCheckDue := IsCheckServerEntryTagsDue(tunnel.config)
  1544. if unreported > 0 || isCheckDue {
  1545. NoticeInfo(
  1546. "Unreported persistent stats: %d; server entry check due: %v",
  1547. unreported, isCheckDue)
  1548. p := tunnel.getCustomParameters()
  1549. statsTimer.Reset(
  1550. prng.Period(
  1551. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMin),
  1552. p.Duration(parameters.PsiphonAPIStatusRequestShortPeriodMax)))
  1553. }
  1554. } else {
  1555. statsTimer = time.NewTimer(time.Duration(math.MaxInt64))
  1556. }
  1557. nextSshKeepAlivePeriod := func() time.Duration {
  1558. p := tunnel.getCustomParameters()
  1559. return prng.Period(
  1560. p.Duration(parameters.SSHKeepAlivePeriodMin),
  1561. p.Duration(parameters.SSHKeepAlivePeriodMax))
  1562. }
  1563. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  1564. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  1565. if tunnel.config.DisablePeriodicSshKeepAlive {
  1566. sshKeepAliveTimer.Stop()
  1567. } else {
  1568. defer sshKeepAliveTimer.Stop()
  1569. }
  1570. // Perform network requests in separate goroutines so as not to block
  1571. // other operations.
  1572. requestsWaitGroup := new(sync.WaitGroup)
  1573. requestsWaitGroup.Add(1)
  1574. signalStatusRequest := make(chan struct{})
  1575. go func() {
  1576. defer requestsWaitGroup.Done()
  1577. for range signalStatusRequest {
  1578. sendStats(tunnel)
  1579. }
  1580. }()
  1581. requestsWaitGroup.Add(1)
  1582. signalPeriodicSshKeepAlive := make(chan time.Duration)
  1583. sshKeepAliveError := make(chan error, 1)
  1584. go func() {
  1585. defer requestsWaitGroup.Done()
  1586. isFirstPeriodicKeepAlive := true
  1587. for timeout := range signalPeriodicSshKeepAlive {
  1588. bytesUp := atomic.LoadInt64(&totalSent)
  1589. bytesDown := atomic.LoadInt64(&totalReceived)
  1590. err := tunnel.sendSshKeepAlive(
  1591. isFirstPeriodicKeepAlive, false, timeout, bytesUp, bytesDown)
  1592. if err != nil {
  1593. select {
  1594. case sshKeepAliveError <- err:
  1595. default:
  1596. }
  1597. }
  1598. isFirstPeriodicKeepAlive = false
  1599. }
  1600. }()
  1601. // Probe-type SSH keep alives have a distinct send worker and may be sent
  1602. // concurrently, to ensure a long period keep alive timeout doesn't delay
  1603. // failed tunnel detection.
  1604. requestsWaitGroup.Add(1)
  1605. signalProbeSshKeepAlive := make(chan time.Duration)
  1606. go func() {
  1607. defer requestsWaitGroup.Done()
  1608. for timeout := range signalProbeSshKeepAlive {
  1609. bytesUp := atomic.LoadInt64(&totalSent)
  1610. bytesDown := atomic.LoadInt64(&totalReceived)
  1611. err := tunnel.sendSshKeepAlive(
  1612. false, true, timeout, bytesUp, bytesDown)
  1613. if err != nil {
  1614. select {
  1615. case sshKeepAliveError <- err:
  1616. default:
  1617. }
  1618. }
  1619. }
  1620. }()
  1621. shutdown := false
  1622. var err error
  1623. for !shutdown && err == nil {
  1624. select {
  1625. case <-noticeBytesTransferredTicker.C:
  1626. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1627. tunnel.dialParams.ServerEntry.IpAddress)
  1628. if received > 0 {
  1629. lastBytesReceivedTime = time.Now()
  1630. }
  1631. bytesUp := atomic.AddInt64(&totalSent, sent)
  1632. bytesDown := atomic.AddInt64(&totalReceived, received)
  1633. p := tunnel.getCustomParameters()
  1634. noticePeriod := p.Duration(parameters.TotalBytesTransferredNoticePeriod)
  1635. doEmitMemoryMetrics := p.Bool(parameters.TotalBytesTransferredEmitMemoryMetrics)
  1636. replayTargetUpstreamBytes := p.Int(parameters.ReplayTargetUpstreamBytes)
  1637. replayTargetDownstreamBytes := p.Int(parameters.ReplayTargetDownstreamBytes)
  1638. replayTargetTunnelDuration := p.Duration(parameters.ReplayTargetTunnelDuration)
  1639. if lastTotalBytesTransferedTime.Add(noticePeriod).Before(time.Now()) {
  1640. NoticeTotalBytesTransferred(
  1641. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1642. if doEmitMemoryMetrics {
  1643. emitMemoryMetrics()
  1644. }
  1645. lastTotalBytesTransferedTime = time.Now()
  1646. }
  1647. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  1648. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  1649. NoticeBytesTransferred(
  1650. tunnel.dialParams.ServerEntry.GetDiagnosticID(), sent, received)
  1651. }
  1652. // Once the tunnel has connected, activated, successfully transmitted the
  1653. // targeted number of bytes, and been up for the targeted duration
  1654. // (measured from the end of establishment), store its dial parameters for
  1655. // subsequent replay.
  1656. //
  1657. // Even when target bytes and duration are all 0, the tunnel must remain up
  1658. // for at least 1 second due to use of noticeBytesTransferredTicker; for
  1659. // the same reason the granularity of ReplayTargetTunnelDuration is
  1660. // seconds.
  1661. if !setDialParamsSucceeded &&
  1662. bytesUp >= int64(replayTargetUpstreamBytes) &&
  1663. bytesDown >= int64(replayTargetDownstreamBytes) &&
  1664. time.Since(tunnel.establishedTime) >= replayTargetTunnelDuration {
  1665. tunnel.dialParams.Succeeded()
  1666. setDialParamsSucceeded = true
  1667. }
  1668. case <-statsTimer.C:
  1669. if tunnel.isStatusReporter {
  1670. select {
  1671. case signalStatusRequest <- struct{}{}:
  1672. default:
  1673. }
  1674. statsTimer.Reset(nextStatusRequestPeriod())
  1675. }
  1676. case <-sshKeepAliveTimer.C:
  1677. p := tunnel.getCustomParameters()
  1678. inactivePeriod := p.Duration(parameters.SSHKeepAlivePeriodicInactivePeriod)
  1679. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1680. timeout := p.Duration(parameters.SSHKeepAlivePeriodicTimeout)
  1681. select {
  1682. case signalPeriodicSshKeepAlive <- timeout:
  1683. default:
  1684. }
  1685. }
  1686. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1687. case <-tunnel.signalPortForwardFailure:
  1688. // Note: no mutex on portForwardFailureTotal; only referenced here
  1689. tunnel.totalPortForwardFailures++
  1690. NoticeInfo("port forward failures for %s: %d",
  1691. tunnel.dialParams.ServerEntry.GetDiagnosticID(),
  1692. tunnel.totalPortForwardFailures)
  1693. // If the underlying Conn has closed (meek and other plugin protocols may
  1694. // close themselves in certain error conditions), the tunnel has certainly
  1695. // failed. Otherwise, probe with an SSH keep alive.
  1696. //
  1697. // TODO: the IsClosed case omits the failed tunnel logging and reset
  1698. // actions performed by sendSshKeepAlive. Should self-closing protocols
  1699. // perform these actions themselves?
  1700. if tunnel.conn.IsClosed() {
  1701. err = errors.TraceNew("underlying conn is closed")
  1702. } else {
  1703. p := tunnel.getCustomParameters()
  1704. inactivePeriod := p.Duration(parameters.SSHKeepAliveProbeInactivePeriod)
  1705. if lastBytesReceivedTime.Add(inactivePeriod).Before(time.Now()) {
  1706. timeout := p.Duration(parameters.SSHKeepAliveProbeTimeout)
  1707. select {
  1708. case signalProbeSshKeepAlive <- timeout:
  1709. default:
  1710. }
  1711. }
  1712. if !tunnel.config.DisablePeriodicSshKeepAlive {
  1713. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  1714. }
  1715. }
  1716. case err = <-sshKeepAliveError:
  1717. case serverRequest := <-tunnel.sshServerRequests:
  1718. if serverRequest != nil {
  1719. HandleServerRequest(tunnelOwner, tunnel, serverRequest)
  1720. }
  1721. case <-tunnel.operateCtx.Done():
  1722. shutdown = true
  1723. }
  1724. }
  1725. close(signalPeriodicSshKeepAlive)
  1726. close(signalProbeSshKeepAlive)
  1727. close(signalStatusRequest)
  1728. requestsWaitGroup.Wait()
  1729. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  1730. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  1731. tunnel.dialParams.ServerEntry.IpAddress)
  1732. bytesUp := atomic.AddInt64(&totalSent, sent)
  1733. bytesDown := atomic.AddInt64(&totalReceived, received)
  1734. // Always emit a final NoticeTotalBytesTransferred
  1735. NoticeTotalBytesTransferred(
  1736. tunnel.dialParams.ServerEntry.GetDiagnosticID(), bytesUp, bytesDown)
  1737. if err == nil {
  1738. NoticeInfo("shutdown operate tunnel")
  1739. // This commanded shutdown case is initiated by Tunnel.Close, which will
  1740. // wait up to parameters.TunnelOperateShutdownTimeout to allow the following
  1741. // requests to complete.
  1742. // Send a final status request in order to report any outstanding persistent
  1743. // stats and domain bytes transferred as soon as possible.
  1744. sendStats(tunnel)
  1745. // The controller connectedReporter may have initiated a connected request
  1746. // concurrent to this commanded shutdown. SetInFlightConnectedRequest
  1747. // ensures that a connected request doesn't start after the commanded
  1748. // shutdown. AwaitInFlightConnectedRequest blocks until any in flight
  1749. // request completes or is aborted after TunnelOperateShutdownTimeout.
  1750. //
  1751. // As any connected request is performed by a concurrent goroutine,
  1752. // sendStats is called first and AwaitInFlightConnectedRequest second.
  1753. tunnel.AwaitInFlightConnectedRequest()
  1754. } else {
  1755. NoticeWarning("operate tunnel error for %s: %s",
  1756. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1757. tunnelOwner.SignalTunnelFailure(tunnel)
  1758. }
  1759. }
  1760. // sendSshKeepAlive is a helper which sends a [email protected] request
  1761. // on the specified SSH connections and returns true of the request succeeds
  1762. // within a specified timeout. If the request fails, the associated conn is
  1763. // closed, which will terminate the associated tunnel.
  1764. func (tunnel *Tunnel) sendSshKeepAlive(
  1765. isFirstPeriodicKeepAlive bool,
  1766. isProbeKeepAlive bool,
  1767. timeout time.Duration,
  1768. bytesUp int64,
  1769. bytesDown int64) error {
  1770. p := tunnel.getCustomParameters()
  1771. // Random padding to frustrate fingerprinting.
  1772. request := prng.Padding(
  1773. p.Int(parameters.SSHKeepAlivePaddingMinBytes),
  1774. p.Int(parameters.SSHKeepAlivePaddingMaxBytes))
  1775. speedTestSample := isFirstPeriodicKeepAlive
  1776. if !speedTestSample {
  1777. speedTestSample = p.WeightedCoinFlip(
  1778. parameters.SSHKeepAliveSpeedTestSampleProbability)
  1779. }
  1780. networkConnectivityPollPeriod := p.Duration(
  1781. parameters.SSHKeepAliveNetworkConnectivityPollingPeriod)
  1782. resetOnFailure := p.WeightedCoinFlip(
  1783. parameters.SSHKeepAliveResetOnFailureProbability)
  1784. p.Close()
  1785. // Note: there is no request context since SSH requests cannot be interrupted
  1786. // directly. Closing the tunnel will interrupt the request. A timeout is set
  1787. // to unblock this function, but the goroutine may not exit until the tunnel
  1788. // is closed.
  1789. // Use a buffer of 1 as there are two senders and only one guaranteed receive.
  1790. errChannel := make(chan error, 1)
  1791. afterFunc := time.AfterFunc(timeout, func() {
  1792. errChannel <- errors.TraceNew("timed out")
  1793. })
  1794. defer afterFunc.Stop()
  1795. go func() {
  1796. startTime := time.Now()
  1797. // Note: reading a reply is important for last-received-time tunnel
  1798. // duration calculation.
  1799. requestOk, response, err := tunnel.sshClient.SendRequest(
  1800. "[email protected]", true, request)
  1801. elapsedTime := time.Since(startTime)
  1802. errChannel <- err
  1803. success := (err == nil && requestOk)
  1804. if success && isProbeKeepAlive {
  1805. NoticeInfo("Probe SSH keep-alive RTT: %s", elapsedTime)
  1806. }
  1807. // Record the keep alive round trip as a speed test sample. The first
  1808. // periodic keep alive is always recorded, as many tunnels are short-lived
  1809. // and we want to ensure that some data is gathered. Subsequent keep alives
  1810. // are recorded with some configurable probability, which, considering that
  1811. // only the last SpeedTestMaxSampleCount samples are retained, enables
  1812. // tuning the sampling frequency.
  1813. if success && speedTestSample {
  1814. err = tactics.AddSpeedTestSample(
  1815. tunnel.config.GetParameters(),
  1816. GetTacticsStorer(tunnel.config),
  1817. tunnel.config.GetNetworkID(),
  1818. tunnel.dialParams.ServerEntry.Region,
  1819. tunnel.dialParams.TunnelProtocol,
  1820. elapsedTime,
  1821. request,
  1822. response)
  1823. if err != nil {
  1824. NoticeWarning("AddSpeedTestSample failed: %s", errors.Trace(err))
  1825. }
  1826. }
  1827. }()
  1828. // While awaiting the response, poll the network connectivity state. If there
  1829. // is network connectivity, on the same network, for the entire duration of
  1830. // the keep alive request and the request fails, record a failed tunnel
  1831. // event.
  1832. //
  1833. // The network connectivity heuristic is intended to reduce the number of
  1834. // failed tunnels reported due to routine situations such as varying mobile
  1835. // network conditions. The polling may produce false positives if the network
  1836. // goes down and up between polling periods, or changes to a new network and
  1837. // back to the previous network between polling periods.
  1838. //
  1839. // For platforms that don't provide a NetworkConnectivityChecker, it is
  1840. // assumed that there is network connectivity.
  1841. //
  1842. // The approximate number of tunneled bytes successfully sent and received is
  1843. // recorded in the failed tunnel event as a quality indicator.
  1844. ticker := time.NewTicker(networkConnectivityPollPeriod)
  1845. defer ticker.Stop()
  1846. continuousNetworkConnectivity := true
  1847. networkID := tunnel.config.GetNetworkID()
  1848. var err error
  1849. loop:
  1850. for {
  1851. select {
  1852. case err = <-errChannel:
  1853. break loop
  1854. case <-ticker.C:
  1855. connectivityChecker := tunnel.config.NetworkConnectivityChecker
  1856. if (connectivityChecker != nil &&
  1857. connectivityChecker.HasNetworkConnectivity() != 1) ||
  1858. (networkID != tunnel.config.GetNetworkID()) {
  1859. continuousNetworkConnectivity = false
  1860. }
  1861. }
  1862. }
  1863. err = errors.Trace(err)
  1864. if err != nil {
  1865. tunnel.sshClient.Close()
  1866. tunnel.conn.Close()
  1867. // Don't perform log or reset actions when the keep alive may have been
  1868. // interrupted due to shutdown.
  1869. isShutdown := false
  1870. select {
  1871. case <-tunnel.operateCtx.Done():
  1872. isShutdown = true
  1873. default:
  1874. }
  1875. // Ensure that at most one of the two SSH keep alive workers (periodic and
  1876. // probe) perform the log and reset actions.
  1877. wasHandled := atomic.CompareAndSwapInt32(&tunnel.handledSSHKeepAliveFailure, 0, 1)
  1878. if continuousNetworkConnectivity &&
  1879. !isShutdown &&
  1880. !wasHandled {
  1881. // Note that tunnel.dialParams.Failed is not called in this failed
  1882. // tunnel case, and any replay parameters are retained.
  1883. //
  1884. // The continuousNetworkConnectivity mechanism is an imperfect
  1885. // best-effort to filter out bad network conditions, and isn't
  1886. // enabled on platforms without NetworkConnectivityChecker. There
  1887. // remains a possibility of failure due to innocuous bad network
  1888. // conditions and perhaps device sleep cycles.
  1889. //
  1890. // Furthermore, at this point the tunnel has already passed any
  1891. // pre-handshake liveness test, which is intended to catch cases
  1892. // of late-life cycle blocking.
  1893. _ = RecordFailedTunnelStat(
  1894. tunnel.config,
  1895. tunnel.dialParams,
  1896. tunnel.livenessTestMetrics,
  1897. bytesUp,
  1898. bytesDown,
  1899. err)
  1900. if tunnel.extraFailureAction != nil {
  1901. tunnel.extraFailureAction()
  1902. }
  1903. // SSHKeepAliveResetOnFailureProbability is set when a late-lifecycle
  1904. // impaired protocol attack is suspected. With the given probability, reset
  1905. // server affinity and replay parameters for this server to avoid
  1906. // continuously reconnecting to the server and/or using the same protocol
  1907. // and dial parameters.
  1908. if resetOnFailure {
  1909. NoticeInfo("Delete dial parameters for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1910. err := DeleteDialParameters(tunnel.dialParams.ServerEntry.IpAddress, tunnel.dialParams.NetworkID)
  1911. if err != nil {
  1912. NoticeWarning("DeleteDialParameters failed: %s", err)
  1913. }
  1914. NoticeInfo("Delete server affinity for %s", tunnel.dialParams.ServerEntry.GetDiagnosticID())
  1915. err = DeleteServerEntryAffinity(tunnel.dialParams.ServerEntry.IpAddress)
  1916. if err != nil {
  1917. NoticeWarning("DeleteServerEntryAffinity failed: %s", err)
  1918. }
  1919. }
  1920. }
  1921. }
  1922. return err
  1923. }
  1924. // sendStats is a helper for sending session stats to the server.
  1925. func sendStats(tunnel *Tunnel) bool {
  1926. // Tunnel does not have a serverContext when DisableApi is set
  1927. if tunnel.serverContext == nil {
  1928. return true
  1929. }
  1930. // Skip when tunnel is discarded
  1931. if tunnel.IsDiscarded() {
  1932. return true
  1933. }
  1934. err := tunnel.serverContext.DoStatusRequest()
  1935. if err != nil {
  1936. NoticeWarning("DoStatusRequest failed for %s: %s",
  1937. tunnel.dialParams.ServerEntry.GetDiagnosticID(), err)
  1938. }
  1939. return err == nil
  1940. }
  1941. // getLivenessTestSpec returns the LivenessTestSpec for the given tunnel protocol.
  1942. func getLivenessTestSpec(
  1943. p parameters.ParametersAccessor,
  1944. tunnelProtocol string,
  1945. establishedTunnelsCount int) *parameters.LivenessTestSpec {
  1946. // matchingSpec returns the first matching LivenessTestSpec for the given
  1947. // tunnelProtocol.
  1948. matchingSpec := func(
  1949. spec parameters.LivenessTestSpecs,
  1950. tunnelProtocol string) *parameters.LivenessTestSpec {
  1951. if len(spec) != 0 {
  1952. // Sort the patterns by length, longest first, so that the most specific
  1953. // match is found first.
  1954. patterns := make([]string, 0, len(spec))
  1955. for p := range spec {
  1956. patterns = append(patterns, p)
  1957. }
  1958. slices.SortFunc(patterns, func(i, j string) int {
  1959. return len(j) - len(i)
  1960. })
  1961. // Find the first and longest pattern that matches the tunnel protocol.
  1962. for _, p := range patterns {
  1963. if wildcard.Match(p, tunnelProtocol) {
  1964. return spec[p]
  1965. }
  1966. }
  1967. // Default to LIVENESS_ANY if no pattern matches.
  1968. if v, ok := spec[parameters.LIVENESS_ANY]; ok {
  1969. return v
  1970. }
  1971. }
  1972. return nil
  1973. }
  1974. // If EstablishedTunnelsCount is 0, attempt the InitialLivenessTest specification.
  1975. // If no match is found, or if this is a subsequent connection, proceed to LivenessTest.
  1976. if establishedTunnelsCount == 0 {
  1977. spec := matchingSpec(p.LivenessTest(parameters.InitialLivenessTest), tunnelProtocol)
  1978. if spec != nil {
  1979. return spec
  1980. }
  1981. }
  1982. spec := matchingSpec(p.LivenessTest(parameters.LivenessTest), tunnelProtocol)
  1983. if spec != nil {
  1984. return spec
  1985. }
  1986. // Return legacy values as a last resort.
  1987. return &parameters.LivenessTestSpec{
  1988. MinUpstreamBytes: p.Int(parameters.LivenessTestMinUpstreamBytes),
  1989. MaxUpstreamBytes: p.Int(parameters.LivenessTestMaxUpstreamBytes),
  1990. MinDownstreamBytes: p.Int(parameters.LivenessTestMinDownstreamBytes),
  1991. MaxDownstreamBytes: p.Int(parameters.LivenessTestMaxDownstreamBytes),
  1992. }
  1993. }