tunnel.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "bytes"
  22. "encoding/base64"
  23. "encoding/json"
  24. "errors"
  25. "fmt"
  26. "io"
  27. "net"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. regen "github.com/Psiphon-Inc/goregen"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
  33. "golang.org/x/crypto/ssh"
  34. )
  35. // Tunneler specifies the interface required by components that use a tunnel.
  36. // Components which use this interface may be serviced by a single Tunnel instance,
  37. // or a Controller which manages a pool of tunnels, or any other object which
  38. // implements Tunneler.
  39. // alwaysTunnel indicates that the connection should always be tunneled. If this
  40. // is not set, the connection may be made directly, depending on split tunnel
  41. // classification, when that feature is supported and active.
  42. // downstreamConn is an optional parameter which specifies a connection to be
  43. // explictly closed when the Dialed connection is closed. For instance, this
  44. // is used to close downstreamConn App<->LocalProxy connections when the related
  45. // LocalProxy<->SshPortForward connections close.
  46. type Tunneler interface {
  47. Dial(remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error)
  48. SignalComponentFailure()
  49. }
  50. // TunnerOwner specifies the interface required by Tunnel to notify its
  51. // owner when it has failed. The owner may, as in the case of the Controller,
  52. // remove the tunnel from its list of active tunnels.
  53. type TunnelOwner interface {
  54. SignalTunnelFailure(tunnel *Tunnel)
  55. }
  56. // Tunnel is a connection to a Psiphon server. An established
  57. // tunnel includes a network connection to the specified server
  58. // and an SSH session built on top of that transport.
  59. type Tunnel struct {
  60. mutex *sync.Mutex
  61. config *Config
  62. untunneledDialConfig *DialConfig
  63. isDiscarded bool
  64. isClosed bool
  65. serverEntry *ServerEntry
  66. serverContext *ServerContext
  67. protocol string
  68. conn net.Conn
  69. sshClient *ssh.Client
  70. operateWaitGroup *sync.WaitGroup
  71. shutdownOperateBroadcast chan struct{}
  72. signalPortForwardFailure chan struct{}
  73. totalPortForwardFailures int
  74. startTime time.Time
  75. meekStats *MeekStats
  76. newClientVerificationPayload chan string
  77. }
  78. // EstablishTunnel first makes a network transport connection to the
  79. // Psiphon server and then establishes an SSH client session on top of
  80. // that transport. The SSH server is authenticated using the public
  81. // key in the server entry.
  82. // Depending on the server's capabilities, the connection may use
  83. // plain SSH over TCP, obfuscated SSH over TCP, or obfuscated SSH over
  84. // HTTP (meek protocol).
  85. // When requiredProtocol is not blank, that protocol is used. Otherwise,
  86. // the a random supported protocol is used.
  87. // untunneledDialConfig is used for untunneled final status requests.
  88. func EstablishTunnel(
  89. config *Config,
  90. untunneledDialConfig *DialConfig,
  91. sessionId string,
  92. pendingConns *Conns,
  93. serverEntry *ServerEntry,
  94. tunnelOwner TunnelOwner) (tunnel *Tunnel, err error) {
  95. selectedProtocol, err := selectProtocol(config, serverEntry)
  96. if err != nil {
  97. return nil, ContextError(err)
  98. }
  99. // Build transport layers and establish SSH connection
  100. conn, sshClient, meekStats, err := dialSsh(
  101. config, pendingConns, serverEntry, selectedProtocol, sessionId)
  102. if err != nil {
  103. return nil, ContextError(err)
  104. }
  105. // Cleanup on error
  106. defer func() {
  107. if err != nil {
  108. sshClient.Close()
  109. conn.Close()
  110. }
  111. }()
  112. // The tunnel is now connected
  113. tunnel = &Tunnel{
  114. mutex: new(sync.Mutex),
  115. config: config,
  116. untunneledDialConfig: untunneledDialConfig,
  117. isClosed: false,
  118. serverEntry: serverEntry,
  119. protocol: selectedProtocol,
  120. conn: conn,
  121. sshClient: sshClient,
  122. operateWaitGroup: new(sync.WaitGroup),
  123. shutdownOperateBroadcast: make(chan struct{}),
  124. // A buffer allows at least one signal to be sent even when the receiver is
  125. // not listening. Senders should not block.
  126. signalPortForwardFailure: make(chan struct{}, 1),
  127. meekStats: meekStats,
  128. // Buffer allows SetClientVerificationPayload to submit one new payload
  129. // without blocking or dropping it.
  130. newClientVerificationPayload: make(chan string, 1),
  131. }
  132. // Create a new Psiphon API server context for this tunnel. This includes
  133. // performing a handshake request. If the handshake fails, this establishment
  134. // fails.
  135. if !config.DisableApi {
  136. NoticeInfo("starting server context for %s", tunnel.serverEntry.IpAddress)
  137. tunnel.serverContext, err = NewServerContext(tunnel, sessionId)
  138. if err != nil {
  139. return nil, ContextError(
  140. fmt.Errorf("error starting server context for %s: %s",
  141. tunnel.serverEntry.IpAddress, err))
  142. }
  143. }
  144. tunnel.startTime = time.Now()
  145. // Now that network operations are complete, cancel interruptibility
  146. pendingConns.Remove(conn)
  147. // Spawn the operateTunnel goroutine, which monitors the tunnel and handles periodic stats updates.
  148. tunnel.operateWaitGroup.Add(1)
  149. go tunnel.operateTunnel(tunnelOwner)
  150. return tunnel, nil
  151. }
  152. // Close stops operating the tunnel and closes the underlying connection.
  153. // Supports multiple and/or concurrent calls to Close().
  154. // When isDicarded is set, operateTunnel will not attempt to send final
  155. // status requests.
  156. func (tunnel *Tunnel) Close(isDiscarded bool) {
  157. tunnel.mutex.Lock()
  158. tunnel.isDiscarded = isDiscarded
  159. isClosed := tunnel.isClosed
  160. tunnel.isClosed = true
  161. tunnel.mutex.Unlock()
  162. if !isClosed {
  163. // Signal operateTunnel to stop before closing the tunnel -- this
  164. // allows a final status request to be made in the case of an orderly
  165. // shutdown.
  166. // A timer is set, so if operateTunnel takes too long to stop, the
  167. // tunnel is closed, which will interrupt any slow final status request.
  168. // In effect, the TUNNEL_OPERATE_SHUTDOWN_TIMEOUT value will take
  169. // precedence over the PSIPHON_API_SERVER_TIMEOUT http.Client.Timeout
  170. // value set in makePsiphonHttpsClient.
  171. timer := time.AfterFunc(TUNNEL_OPERATE_SHUTDOWN_TIMEOUT, func() { tunnel.conn.Close() })
  172. close(tunnel.shutdownOperateBroadcast)
  173. tunnel.operateWaitGroup.Wait()
  174. timer.Stop()
  175. tunnel.sshClient.Close()
  176. // tunnel.conn.Close() may get called twice, which is allowed.
  177. tunnel.conn.Close()
  178. }
  179. }
  180. // IsClosed returns the tunnel's closed status.
  181. func (tunnel *Tunnel) IsClosed() bool {
  182. tunnel.mutex.Lock()
  183. defer tunnel.mutex.Unlock()
  184. return tunnel.isClosed
  185. }
  186. // IsDiscarded returns the tunnel's discarded flag.
  187. func (tunnel *Tunnel) IsDiscarded() bool {
  188. tunnel.mutex.Lock()
  189. defer tunnel.mutex.Unlock()
  190. return tunnel.isDiscarded
  191. }
  192. // SendAPIRequest sends an API request as an SSH request through the tunnel.
  193. // This function blocks awaiting a response. Only one request may be in-flight
  194. // at once; a concurrent SendAPIRequest will block until an active request
  195. // receives its response (or the SSH connection is terminated).
  196. func (tunnel *Tunnel) SendAPIRequest(
  197. name string, requestPayload []byte) ([]byte, error) {
  198. if tunnel.IsClosed() {
  199. return nil, ContextError(errors.New("tunnel is closed"))
  200. }
  201. ok, responsePayload, err := tunnel.sshClient.Conn.SendRequest(
  202. name, true, requestPayload)
  203. if err != nil {
  204. return nil, ContextError(err)
  205. }
  206. if !ok {
  207. return nil, ContextError(errors.New("API request rejected"))
  208. }
  209. return responsePayload, nil
  210. }
  211. // Dial establishes a port forward connection through the tunnel
  212. // This Dial doesn't support split tunnel, so alwaysTunnel is not referenced
  213. func (tunnel *Tunnel) Dial(
  214. remoteAddr string, alwaysTunnel bool, downstreamConn net.Conn) (conn net.Conn, err error) {
  215. if tunnel.IsClosed() {
  216. return nil, ContextError(errors.New("tunnel is closed"))
  217. }
  218. type tunnelDialResult struct {
  219. sshPortForwardConn net.Conn
  220. err error
  221. }
  222. resultChannel := make(chan *tunnelDialResult, 2)
  223. if *tunnel.config.TunnelPortForwardTimeoutSeconds > 0 {
  224. time.AfterFunc(time.Duration(*tunnel.config.TunnelPortForwardTimeoutSeconds)*time.Second, func() {
  225. resultChannel <- &tunnelDialResult{nil, errors.New("tunnel dial timeout")}
  226. })
  227. }
  228. go func() {
  229. sshPortForwardConn, err := tunnel.sshClient.Dial("tcp", remoteAddr)
  230. resultChannel <- &tunnelDialResult{sshPortForwardConn, err}
  231. }()
  232. result := <-resultChannel
  233. if result.err != nil {
  234. // TODO: conditional on type of error or error message?
  235. select {
  236. case tunnel.signalPortForwardFailure <- *new(struct{}):
  237. default:
  238. }
  239. return nil, ContextError(result.err)
  240. }
  241. conn = &TunneledConn{
  242. Conn: result.sshPortForwardConn,
  243. tunnel: tunnel,
  244. downstreamConn: downstreamConn}
  245. // Tunnel does not have a serverContext when DisableApi is set. We still use
  246. // transferstats.Conn to count bytes transferred for monitoring tunnel
  247. // quality.
  248. var regexps *transferstats.Regexps
  249. if tunnel.serverContext != nil {
  250. regexps = tunnel.serverContext.StatsRegexps()
  251. }
  252. conn = transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
  253. return conn, nil
  254. }
  255. // SignalComponentFailure notifies the tunnel that an associated component has failed.
  256. // This will terminate the tunnel.
  257. func (tunnel *Tunnel) SignalComponentFailure() {
  258. NoticeAlert("tunnel received component failure signal")
  259. tunnel.Close(false)
  260. }
  261. // SetClientVerificationPayload triggers a client verification request, for this
  262. // tunnel, with the specified verifiction payload. If the tunnel is not yet established,
  263. // the payload/request is enqueued. If a payload/request is already eneueued, the
  264. // new payload is dropped.
  265. func (tunnel *Tunnel) SetClientVerificationPayload(clientVerificationPayload string) {
  266. select {
  267. case tunnel.newClientVerificationPayload <- clientVerificationPayload:
  268. default:
  269. }
  270. }
  271. // TunneledConn implements net.Conn and wraps a port foward connection.
  272. // It is used to hook into Read and Write to observe I/O errors and
  273. // report these errors back to the tunnel monitor as port forward failures.
  274. // TunneledConn optionally tracks a peer connection to be explictly closed
  275. // when the TunneledConn is closed.
  276. type TunneledConn struct {
  277. net.Conn
  278. tunnel *Tunnel
  279. downstreamConn net.Conn
  280. }
  281. func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
  282. n, err = conn.Conn.Read(buffer)
  283. if err != nil && err != io.EOF {
  284. // Report new failure. Won't block; assumes the receiver
  285. // has a sufficient buffer for the threshold number of reports.
  286. // TODO: conditional on type of error or error message?
  287. select {
  288. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  289. default:
  290. }
  291. }
  292. return
  293. }
  294. func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
  295. n, err = conn.Conn.Write(buffer)
  296. if err != nil && err != io.EOF {
  297. // Same as TunneledConn.Read()
  298. select {
  299. case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
  300. default:
  301. }
  302. }
  303. return
  304. }
  305. func (conn *TunneledConn) Close() error {
  306. if conn.downstreamConn != nil {
  307. conn.downstreamConn.Close()
  308. }
  309. return conn.Conn.Close()
  310. }
  311. // selectProtocol is a helper that picks the tunnel protocol
  312. func selectProtocol(config *Config, serverEntry *ServerEntry) (selectedProtocol string, err error) {
  313. // TODO: properly handle protocols (e.g. FRONTED-MEEK-OSSH) vs. capabilities (e.g., {FRONTED-MEEK, OSSH})
  314. // for now, the code is simply assuming that MEEK capabilities imply OSSH capability.
  315. if config.TunnelProtocol != "" {
  316. if !serverEntry.SupportsProtocol(config.TunnelProtocol) {
  317. return "", ContextError(fmt.Errorf("server does not have required capability"))
  318. }
  319. selectedProtocol = config.TunnelProtocol
  320. } else {
  321. // Pick at random from the supported protocols. This ensures that we'll eventually
  322. // try all possible protocols. Depending on network configuration, it may be the
  323. // case that some protocol is only available through multi-capability servers,
  324. // and a simpler ranked preference of protocols could lead to that protocol never
  325. // being selected.
  326. candidateProtocols := serverEntry.GetSupportedProtocols()
  327. if len(candidateProtocols) == 0 {
  328. return "", ContextError(fmt.Errorf("server does not have any supported capabilities"))
  329. }
  330. index, err := MakeSecureRandomInt(len(candidateProtocols))
  331. if err != nil {
  332. return "", ContextError(err)
  333. }
  334. selectedProtocol = candidateProtocols[index]
  335. }
  336. return selectedProtocol, nil
  337. }
  338. // selectFrontingParameters is a helper which selects/generates meek fronting
  339. // parameters where the server entry provides multiple options or patterns.
  340. func selectFrontingParameters(
  341. serverEntry *ServerEntry) (frontingAddress, frontingHost string, err error) {
  342. if len(serverEntry.MeekFrontingAddressesRegex) > 0 {
  343. // Generate a front address based on the regex.
  344. frontingAddress, err = regen.Generate(serverEntry.MeekFrontingAddressesRegex)
  345. if err != nil {
  346. return "", "", ContextError(err)
  347. }
  348. } else {
  349. // Randomly select, for this connection attempt, one front address for
  350. // fronting-capable servers.
  351. if len(serverEntry.MeekFrontingAddresses) == 0 {
  352. return "", "", ContextError(errors.New("MeekFrontingAddresses is empty"))
  353. }
  354. index, err := MakeSecureRandomInt(len(serverEntry.MeekFrontingAddresses))
  355. if err != nil {
  356. return "", "", ContextError(err)
  357. }
  358. frontingAddress = serverEntry.MeekFrontingAddresses[index]
  359. }
  360. if len(serverEntry.MeekFrontingHosts) > 0 {
  361. index, err := MakeSecureRandomInt(len(serverEntry.MeekFrontingHosts))
  362. if err != nil {
  363. return "", "", ContextError(err)
  364. }
  365. frontingHost = serverEntry.MeekFrontingHosts[index]
  366. } else {
  367. // Backwards compatibility case
  368. frontingHost = serverEntry.MeekFrontingHost
  369. }
  370. return
  371. }
  372. // initMeekConfig is a helper that creates a MeekConfig suitable for the
  373. // selected meek tunnel protocol.
  374. func initMeekConfig(
  375. config *Config,
  376. serverEntry *ServerEntry,
  377. selectedProtocol,
  378. sessionId string) (*MeekConfig, error) {
  379. // The meek protocol always uses OSSH
  380. psiphonServerAddress := fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshObfuscatedPort)
  381. var dialAddress string
  382. useHTTPS := false
  383. var SNIServerName, hostHeader string
  384. transformedHostName := false
  385. switch selectedProtocol {
  386. case TUNNEL_PROTOCOL_FRONTED_MEEK:
  387. frontingAddress, frontingHost, err := selectFrontingParameters(serverEntry)
  388. if err != nil {
  389. return nil, ContextError(err)
  390. }
  391. dialAddress = fmt.Sprintf("%s:443", frontingAddress)
  392. useHTTPS = true
  393. if !serverEntry.MeekFrontingDisableSNI {
  394. SNIServerName, transformedHostName =
  395. config.HostNameTransformer.TransformHostName(frontingAddress)
  396. }
  397. hostHeader = frontingHost
  398. case TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP:
  399. frontingAddress, frontingHost, err := selectFrontingParameters(serverEntry)
  400. if err != nil {
  401. return nil, ContextError(err)
  402. }
  403. dialAddress = fmt.Sprintf("%s:80", frontingAddress)
  404. hostHeader = frontingHost
  405. case TUNNEL_PROTOCOL_UNFRONTED_MEEK:
  406. dialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  407. hostname := serverEntry.IpAddress
  408. hostname, transformedHostName = config.HostNameTransformer.TransformHostName(hostname)
  409. if serverEntry.MeekServerPort == 80 {
  410. hostHeader = hostname
  411. } else {
  412. hostHeader = fmt.Sprintf("%s:%d", hostname, serverEntry.MeekServerPort)
  413. }
  414. case TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS:
  415. dialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  416. useHTTPS = true
  417. SNIServerName, transformedHostName =
  418. config.HostNameTransformer.TransformHostName(serverEntry.IpAddress)
  419. if serverEntry.MeekServerPort == 443 {
  420. hostHeader = serverEntry.IpAddress
  421. } else {
  422. hostHeader = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.MeekServerPort)
  423. }
  424. default:
  425. return nil, ContextError(errors.New("unexpected selectedProtocol"))
  426. }
  427. // The unnderlying TLS will automatically disable SNI for IP address server name
  428. // values; we have this explicit check here so we record the correct value for stats.
  429. if net.ParseIP(SNIServerName) != nil {
  430. SNIServerName = ""
  431. }
  432. return &MeekConfig{
  433. DialAddress: dialAddress,
  434. UseHTTPS: useHTTPS,
  435. SNIServerName: SNIServerName,
  436. HostHeader: hostHeader,
  437. TransformedHostName: transformedHostName,
  438. PsiphonServerAddress: psiphonServerAddress,
  439. SessionID: sessionId,
  440. MeekCookieEncryptionPublicKey: serverEntry.MeekCookieEncryptionPublicKey,
  441. MeekObfuscatedKey: serverEntry.MeekObfuscatedKey,
  442. }, nil
  443. }
  444. // dialSsh is a helper that builds the transport layers and establishes the SSH connection.
  445. // When a meek protocols is selected, additional MeekStats are recorded and returned.
  446. func dialSsh(
  447. config *Config,
  448. pendingConns *Conns,
  449. serverEntry *ServerEntry,
  450. selectedProtocol,
  451. sessionId string) (net.Conn, *ssh.Client, *MeekStats, error) {
  452. // The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
  453. // So depending on which protocol is used, multiple layers are initialized.
  454. useObfuscatedSsh := false
  455. var directTCPDialAddress string
  456. var meekConfig *MeekConfig
  457. var err error
  458. switch selectedProtocol {
  459. case TUNNEL_PROTOCOL_OBFUSCATED_SSH:
  460. useObfuscatedSsh = true
  461. directTCPDialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshObfuscatedPort)
  462. case TUNNEL_PROTOCOL_SSH:
  463. directTCPDialAddress = fmt.Sprintf("%s:%d", serverEntry.IpAddress, serverEntry.SshPort)
  464. default:
  465. useObfuscatedSsh = true
  466. meekConfig, err = initMeekConfig(config, serverEntry, selectedProtocol, sessionId)
  467. if err != nil {
  468. return nil, nil, nil, ContextError(err)
  469. }
  470. }
  471. NoticeConnectingServer(
  472. serverEntry.IpAddress,
  473. serverEntry.Region,
  474. selectedProtocol,
  475. directTCPDialAddress,
  476. meekConfig)
  477. // Use an asynchronous callback to record the resolved IP address when
  478. // dialing a domain name. Note that DialMeek doesn't immediately
  479. // establish any HTTPS connections, so the resolved IP address won't be
  480. // reported until during/after ssh session establishment (the ssh traffic
  481. // is meek payload). So don't Load() the IP address value until after that
  482. // has completed to ensure a result.
  483. var resolvedIPAddress atomic.Value
  484. resolvedIPAddress.Store("")
  485. setResolvedIPAddress := func(IPAddress string) {
  486. resolvedIPAddress.Store(IPAddress)
  487. }
  488. // Create the base transport: meek or direct connection
  489. dialConfig := &DialConfig{
  490. UpstreamProxyUrl: config.UpstreamProxyUrl,
  491. ConnectTimeout: time.Duration(*config.TunnelConnectTimeoutSeconds) * time.Second,
  492. PendingConns: pendingConns,
  493. DeviceBinder: config.DeviceBinder,
  494. DnsServerGetter: config.DnsServerGetter,
  495. UseIndistinguishableTLS: config.UseIndistinguishableTLS,
  496. TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
  497. DeviceRegion: config.DeviceRegion,
  498. ResolvedIPCallback: setResolvedIPAddress,
  499. }
  500. var conn net.Conn
  501. if meekConfig != nil {
  502. conn, err = DialMeek(meekConfig, dialConfig)
  503. if err != nil {
  504. return nil, nil, nil, ContextError(err)
  505. }
  506. } else {
  507. conn, err = DialTCP(directTCPDialAddress, dialConfig)
  508. if err != nil {
  509. return nil, nil, nil, ContextError(err)
  510. }
  511. }
  512. cleanupConn := conn
  513. defer func() {
  514. // Cleanup on error
  515. if cleanupConn != nil {
  516. cleanupConn.Close()
  517. }
  518. }()
  519. // Add obfuscated SSH layer
  520. sshConn := conn
  521. if useObfuscatedSsh {
  522. sshConn, err = NewObfuscatedSshConn(
  523. OBFUSCATION_CONN_MODE_CLIENT, conn, serverEntry.SshObfuscatedKey)
  524. if err != nil {
  525. return nil, nil, nil, ContextError(err)
  526. }
  527. }
  528. // Now establish the SSH session over the conn transport
  529. expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
  530. if err != nil {
  531. return nil, nil, nil, ContextError(err)
  532. }
  533. sshCertChecker := &ssh.CertChecker{
  534. HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
  535. if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) {
  536. return ContextError(errors.New("unexpected host public key"))
  537. }
  538. return nil
  539. },
  540. }
  541. sshPasswordPayload, err := json.Marshal(
  542. struct {
  543. SessionId string `json:"SessionId"`
  544. SshPassword string `json:"SshPassword"`
  545. }{sessionId, serverEntry.SshPassword})
  546. if err != nil {
  547. return nil, nil, nil, ContextError(err)
  548. }
  549. sshClientConfig := &ssh.ClientConfig{
  550. User: serverEntry.SshUsername,
  551. Auth: []ssh.AuthMethod{
  552. ssh.Password(string(sshPasswordPayload)),
  553. },
  554. HostKeyCallback: sshCertChecker.CheckHostKey,
  555. }
  556. // The ssh session establishment (via ssh.NewClientConn) is wrapped
  557. // in a timeout to ensure it won't hang. We've encountered firewalls
  558. // that allow the TCP handshake to complete but then send a RST to the
  559. // server-side and nothing to the client-side, and if that happens
  560. // while ssh.NewClientConn is reading, it may wait forever. The timeout
  561. // closes the conn, which interrupts it.
  562. // Note: TCP handshake timeouts are provided by TCPConn, and session
  563. // timeouts *after* ssh establishment are provided by the ssh keep alive
  564. // in operate tunnel.
  565. // TODO: adjust the timeout to account for time-elapsed-from-start
  566. type sshNewClientResult struct {
  567. sshClient *ssh.Client
  568. err error
  569. }
  570. resultChannel := make(chan *sshNewClientResult, 2)
  571. if *config.TunnelConnectTimeoutSeconds > 0 {
  572. time.AfterFunc(time.Duration(*config.TunnelConnectTimeoutSeconds)*time.Second, func() {
  573. resultChannel <- &sshNewClientResult{nil, errors.New("ssh dial timeout")}
  574. })
  575. }
  576. go func() {
  577. // The following is adapted from ssh.Dial(), here using a custom conn
  578. // The sshAddress is passed through to host key verification callbacks; we don't use it.
  579. sshAddress := ""
  580. sshClientConn, sshChans, sshReqs, err := ssh.NewClientConn(sshConn, sshAddress, sshClientConfig)
  581. var sshClient *ssh.Client
  582. if err == nil {
  583. sshClient = ssh.NewClient(sshClientConn, sshChans, sshReqs)
  584. }
  585. resultChannel <- &sshNewClientResult{sshClient, err}
  586. }()
  587. result := <-resultChannel
  588. if result.err != nil {
  589. return nil, nil, nil, ContextError(result.err)
  590. }
  591. var meekStats *MeekStats
  592. if meekConfig != nil {
  593. meekStats = &MeekStats{
  594. DialAddress: meekConfig.DialAddress,
  595. ResolvedIPAddress: resolvedIPAddress.Load().(string),
  596. SNIServerName: meekConfig.SNIServerName,
  597. HostHeader: meekConfig.HostHeader,
  598. TransformedHostName: meekConfig.TransformedHostName,
  599. }
  600. NoticeConnectedMeekStats(serverEntry.IpAddress, meekStats)
  601. }
  602. cleanupConn = nil
  603. return conn, result.sshClient, meekStats, nil
  604. }
  605. // operateTunnel monitors the health of the tunnel and performs
  606. // periodic work.
  607. //
  608. // BytesTransferred and TotalBytesTransferred notices are emitted
  609. // for live reporting and diagnostics reporting, respectively.
  610. //
  611. // Status requests are sent to the Psiphon API to report bytes
  612. // transferred.
  613. //
  614. // Periodic SSH keep alive packets are sent to ensure the underlying
  615. // TCP connection isn't terminated by NAT, or other network
  616. // interference -- or test if it has been terminated while the device
  617. // has been asleep. When a keep alive times out, the tunnel is
  618. // considered failed.
  619. //
  620. // An immediate SSH keep alive "probe" is sent to test the tunnel and
  621. // server responsiveness when a port forward failure is detected: a
  622. // failed dial or failed read/write. This keep alive has a shorter
  623. // timeout.
  624. //
  625. // Note that port forward failures may be due to non-failure conditions.
  626. // For example, when the user inputs an invalid domain name and
  627. // resolution is done by the ssh server; or trying to connect to a
  628. // non-white-listed port; and the error message in these cases is not
  629. // distinguishable from a true server error (a common error message,
  630. // "ssh: rejected: administratively prohibited (open failed)", may be
  631. // returned for these cases but also if the server has run out of
  632. // ephemeral ports, for example).
  633. //
  634. // SSH keep alives are not sent when the tunnel has been recently
  635. // active (not only does tunnel activity obviate the necessity of a keep
  636. // alive, testing has shown that keep alives may time out for "busy"
  637. // tunnels, especially over meek protocol and other high latency
  638. // conditions).
  639. //
  640. // "Recently active" is defined as having received payload bytes. Sent
  641. // bytes are not considered as testing has shown bytes may appear to
  642. // send when certain NAT devices have interfered with the tunnel, while
  643. // no bytes are received. In a pathological case, with DNS implemented
  644. // as tunneled UDP, a browser may wait excessively for a domain name to
  645. // resolve, while no new port forward is attempted which would otherwise
  646. // result in a tunnel failure detection.
  647. //
  648. // TODO: change "recently active" to include having received any
  649. // SSH protocol messages from the server, not just user payload?
  650. //
  651. func (tunnel *Tunnel) operateTunnel(tunnelOwner TunnelOwner) {
  652. defer tunnel.operateWaitGroup.Done()
  653. lastBytesReceivedTime := time.Now()
  654. lastTotalBytesTransferedTime := time.Now()
  655. totalSent := int64(0)
  656. totalReceived := int64(0)
  657. noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
  658. defer noticeBytesTransferredTicker.Stop()
  659. // The next status request and ssh keep alive times are picked at random,
  660. // from a range, to make the resulting traffic less fingerprintable,
  661. // Note: not using Tickers since these are not fixed time periods.
  662. nextStatusRequestPeriod := func() time.Duration {
  663. return MakeRandomPeriod(
  664. PSIPHON_API_STATUS_REQUEST_PERIOD_MIN,
  665. PSIPHON_API_STATUS_REQUEST_PERIOD_MAX)
  666. }
  667. statsTimer := time.NewTimer(nextStatusRequestPeriod())
  668. defer statsTimer.Stop()
  669. // Schedule an immediate status request to deliver any unreported
  670. // tunnel stats.
  671. // Note: this may not be effective when there's an outstanding
  672. // asynchronous untunneled final status request is holding the
  673. // tunnel stats records. It may also conflict with other
  674. // tunnel candidates which attempt to send an immediate request
  675. // before being discarded. For now, we mitigate this with a short,
  676. // random delay.
  677. unreported := CountUnreportedTunnelStats()
  678. if unreported > 0 {
  679. NoticeInfo("Unreported tunnel stats: %d", unreported)
  680. statsTimer.Reset(MakeRandomPeriod(
  681. PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MIN,
  682. PSIPHON_API_STATUS_REQUEST_SHORT_PERIOD_MAX))
  683. }
  684. nextSshKeepAlivePeriod := func() time.Duration {
  685. return MakeRandomPeriod(
  686. TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
  687. TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX)
  688. }
  689. // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
  690. sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
  691. if tunnel.config.DisablePeriodicSshKeepAlive {
  692. sshKeepAliveTimer.Stop()
  693. } else {
  694. defer sshKeepAliveTimer.Stop()
  695. }
  696. // Perform network requests in separate goroutines so as not to block
  697. // other operations.
  698. requestsWaitGroup := new(sync.WaitGroup)
  699. requestsWaitGroup.Add(1)
  700. signalStatusRequest := make(chan struct{})
  701. go func() {
  702. defer requestsWaitGroup.Done()
  703. for _ = range signalStatusRequest {
  704. sendStats(tunnel)
  705. }
  706. }()
  707. requestsWaitGroup.Add(1)
  708. signalSshKeepAlive := make(chan time.Duration)
  709. sshKeepAliveError := make(chan error, 1)
  710. go func() {
  711. defer requestsWaitGroup.Done()
  712. for timeout := range signalSshKeepAlive {
  713. err := sendSshKeepAlive(tunnel.sshClient, tunnel.conn, timeout)
  714. if err != nil {
  715. select {
  716. case sshKeepAliveError <- err:
  717. default:
  718. }
  719. }
  720. }
  721. }()
  722. requestsWaitGroup.Add(1)
  723. signalStopClientVerificationRequests := make(chan struct{})
  724. go func() {
  725. defer requestsWaitGroup.Done()
  726. clientVerificationPayload := ""
  727. for {
  728. // TODO: use reflect.SelectCase?
  729. if clientVerificationPayload == "" {
  730. select {
  731. case clientVerificationPayload = <-tunnel.newClientVerificationPayload:
  732. case <-signalStopClientVerificationRequests:
  733. return
  734. }
  735. } else {
  736. // When clientVerificationPayload is not "", the request for that
  737. // payload so retry after a delay. Will use a new payload instead
  738. // if that arrives in the meantime.
  739. timeout := time.After(PSIPHON_API_CLIENT_VERIFICATION_REQUEST_RETRY_PERIOD)
  740. select {
  741. case <-timeout:
  742. case clientVerificationPayload = <-tunnel.newClientVerificationPayload:
  743. case <-signalStopClientVerificationRequests:
  744. return
  745. }
  746. }
  747. if sendClientVerification(tunnel, clientVerificationPayload) {
  748. clientVerificationPayload = ""
  749. }
  750. }
  751. }()
  752. shutdown := false
  753. var err error
  754. for !shutdown && err == nil {
  755. select {
  756. case <-noticeBytesTransferredTicker.C:
  757. sent, received := transferstats.ReportRecentBytesTransferredForServer(
  758. tunnel.serverEntry.IpAddress)
  759. if received > 0 {
  760. lastBytesReceivedTime = time.Now()
  761. }
  762. totalSent += sent
  763. totalReceived += received
  764. if lastTotalBytesTransferedTime.Add(TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD).Before(time.Now()) {
  765. NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
  766. lastTotalBytesTransferedTime = time.Now()
  767. }
  768. // Only emit the frequent BytesTransferred notice when tunnel is not idle.
  769. if tunnel.config.EmitBytesTransferred && (sent > 0 || received > 0) {
  770. NoticeBytesTransferred(tunnel.serverEntry.IpAddress, sent, received)
  771. }
  772. case <-statsTimer.C:
  773. select {
  774. case signalStatusRequest <- *new(struct{}):
  775. default:
  776. }
  777. statsTimer.Reset(nextStatusRequestPeriod())
  778. case <-sshKeepAliveTimer.C:
  779. if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(time.Now()) {
  780. select {
  781. case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAlivePeriodicTimeoutSeconds) * time.Second:
  782. default:
  783. }
  784. }
  785. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  786. case <-tunnel.signalPortForwardFailure:
  787. // Note: no mutex on portForwardFailureTotal; only referenced here
  788. tunnel.totalPortForwardFailures++
  789. NoticeInfo("port forward failures for %s: %d",
  790. tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
  791. if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(time.Now()) {
  792. select {
  793. case signalSshKeepAlive <- time.Duration(*tunnel.config.TunnelSshKeepAliveProbeTimeoutSeconds) * time.Second:
  794. default:
  795. }
  796. }
  797. if !tunnel.config.DisablePeriodicSshKeepAlive {
  798. sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
  799. }
  800. case err = <-sshKeepAliveError:
  801. case <-tunnel.shutdownOperateBroadcast:
  802. shutdown = true
  803. }
  804. }
  805. close(signalSshKeepAlive)
  806. close(signalStatusRequest)
  807. close(signalStopClientVerificationRequests)
  808. requestsWaitGroup.Wait()
  809. // Capture bytes transferred since the last noticeBytesTransferredTicker tick
  810. sent, received := transferstats.ReportRecentBytesTransferredForServer(tunnel.serverEntry.IpAddress)
  811. totalSent += sent
  812. totalReceived += received
  813. // Always emit a final NoticeTotalBytesTransferred
  814. NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
  815. // The stats for this tunnel will be reported via the next successful
  816. // status request.
  817. // Note: Since client clocks are unreliable, we use the server's reported
  818. // timestamp in the handshake response as the tunnel start time. This time
  819. // will be slightly earlier than the actual tunnel activation time, as the
  820. // client has to receive and parse the response and activate the tunnel.
  821. // Tunnel does not have a serverContext when DisableApi is set.
  822. if tunnel.serverContext != nil && !tunnel.IsDiscarded() {
  823. err := RecordTunnelStats(
  824. tunnel.serverContext.sessionId,
  825. tunnel.serverContext.tunnelNumber,
  826. tunnel.serverEntry.IpAddress,
  827. tunnel.serverContext.serverHandshakeTimestamp,
  828. fmt.Sprintf("%d", time.Now().Sub(tunnel.startTime)),
  829. totalSent,
  830. totalReceived)
  831. if err != nil {
  832. NoticeAlert("RecordTunnelStats failed: %s", ContextError(err))
  833. }
  834. }
  835. // Final status request notes:
  836. //
  837. // It's highly desirable to send a final status request in order to report
  838. // domain bytes transferred stats as well as to report tunnel stats as
  839. // soon as possible. For this reason, we attempt untunneled requests when
  840. // the tunneled request isn't possible or has failed.
  841. //
  842. // In an orderly shutdown (err == nil), the Controller is stopping and
  843. // everything must be wrapped up quickly. Also, we still have a working
  844. // tunnel. So we first attempt a tunneled status request (with a short
  845. // timeout) and then attempt, synchronously -- otherwise the Contoller's
  846. // runWaitGroup.Wait() will return while a request is still in progress
  847. // -- untunneled requests (also with short timeouts). Note that in this
  848. // case the untunneled request will opt out of untunneledPendingConns so
  849. // that it's not inadvertently canceled by the Controller shutdown
  850. // sequence (see doUntunneledStatusRequest).
  851. //
  852. // If the tunnel has failed, the Controller may continue working. We want
  853. // to re-establish as soon as possible (so don't want to block on status
  854. // requests, even for a second). We may have a long time to attempt
  855. // untunneled requests in the background. And there is no tunnel through
  856. // which to attempt tunneled requests. So we spawn a goroutine to run the
  857. // untunneled requests, which are allowed a longer timeout. These requests
  858. // will be interrupted by the Controller's untunneledPendingConns.CloseAll()
  859. // in the case of a shutdown.
  860. if err == nil {
  861. NoticeInfo("shutdown operate tunnel")
  862. if !sendStats(tunnel) {
  863. sendUntunneledStats(tunnel, true)
  864. }
  865. } else {
  866. NoticeAlert("operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
  867. go sendUntunneledStats(tunnel, false)
  868. tunnelOwner.SignalTunnelFailure(tunnel)
  869. }
  870. }
  871. // sendSshKeepAlive is a helper which sends a [email protected] request
  872. // on the specified SSH connections and returns true of the request succeeds
  873. // within a specified timeout.
  874. func sendSshKeepAlive(
  875. sshClient *ssh.Client, conn net.Conn, timeout time.Duration) error {
  876. errChannel := make(chan error, 2)
  877. if timeout > 0 {
  878. time.AfterFunc(timeout, func() {
  879. errChannel <- TimeoutError{}
  880. })
  881. }
  882. go func() {
  883. // Random padding to frustrate fingerprinting
  884. _, _, err := sshClient.SendRequest(
  885. "[email protected]", true,
  886. MakeSecureRandomPadding(0, TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES))
  887. errChannel <- err
  888. }()
  889. err := <-errChannel
  890. if err != nil {
  891. sshClient.Close()
  892. conn.Close()
  893. }
  894. return ContextError(err)
  895. }
  896. // sendStats is a helper for sending session stats to the server.
  897. func sendStats(tunnel *Tunnel) bool {
  898. // Tunnel does not have a serverContext when DisableApi is set
  899. if tunnel.serverContext == nil {
  900. return true
  901. }
  902. // Skip when tunnel is discarded
  903. if tunnel.IsDiscarded() {
  904. return true
  905. }
  906. err := tunnel.serverContext.DoStatusRequest(tunnel)
  907. if err != nil {
  908. NoticeAlert("DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  909. }
  910. return err == nil
  911. }
  912. // sendUntunnelStats sends final status requests directly to Psiphon
  913. // servers after the tunnel has already failed. This is an attempt
  914. // to retain useful bytes transferred stats.
  915. func sendUntunneledStats(tunnel *Tunnel, isShutdown bool) {
  916. // Tunnel does not have a serverContext when DisableApi is set
  917. if tunnel.serverContext == nil {
  918. return
  919. }
  920. // Skip when tunnel is discarded
  921. if tunnel.IsDiscarded() {
  922. return
  923. }
  924. err := tunnel.serverContext.TryUntunneledStatusRequest(isShutdown)
  925. if err != nil {
  926. NoticeAlert("TryUntunneledStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  927. }
  928. }
  929. // sendClientVerification is a helper for sending a client verification request
  930. // to the server.
  931. func sendClientVerification(tunnel *Tunnel, clientVerificationPayload string) bool {
  932. // Tunnel does not have a serverContext when DisableApi is set
  933. if tunnel.serverContext == nil {
  934. return true
  935. }
  936. // Skip when tunnel is discarded
  937. if tunnel.IsDiscarded() {
  938. return true
  939. }
  940. err := tunnel.serverContext.DoClientVerificationRequest(clientVerificationPayload)
  941. if err != nil {
  942. NoticeAlert("DoClientVerificationRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
  943. } else {
  944. NoticeClientVerificationRequestCompleted(tunnel.serverEntry.IpAddress)
  945. }
  946. return err == nil
  947. }