sshService.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "crypto/subtle"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "io"
  26. "net"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  31. "golang.org/x/crypto/ssh"
  32. )
  33. // RunSSHServer runs an ssh server with plain SSH protocol.
  34. func RunSSHServer(config *Config, shutdownBroadcast <-chan struct{}) error {
  35. return runSSHServer(config, false, shutdownBroadcast)
  36. }
  37. // RunSSHServer runs an ssh server with Obfuscated SSH protocol.
  38. func RunObfuscatedSSHServer(config *Config, shutdownBroadcast <-chan struct{}) error {
  39. return runSSHServer(config, true, shutdownBroadcast)
  40. }
  41. // runSSHServer runs an SSH or Obfuscated SSH server. In the Obfuscated SSH case, an
  42. // ObfuscatedSSHConn is layered in front of the client TCP connection; otherwise, both
  43. // modes are identical.
  44. //
  45. // runSSHServer listens on the designated port and spawns new goroutines to handle
  46. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  47. // clients is maintained, and when halting all clients are first shutdown.
  48. //
  49. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  50. // authentication, and then looping on client new channel requests. At this time, only
  51. // "direct-tcpip" channels, dynamic port fowards, are expected and supported.
  52. //
  53. // A new goroutine is spawned to handle each port forward. Each port forward tracks its
  54. // bytes transferred. Overall per-client stats for connection duration, GeoIP, number of
  55. // port forwards, and bytes transferred are tracked and logged when the client shuts down.
  56. func runSSHServer(
  57. config *Config, useObfuscation bool, shutdownBroadcast <-chan struct{}) error {
  58. privateKey, err := ssh.ParseRawPrivateKey([]byte(config.SSHPrivateKey))
  59. if err != nil {
  60. return psiphon.ContextError(err)
  61. }
  62. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  63. signer, err := ssh.NewSignerFromKey(privateKey)
  64. if err != nil {
  65. return psiphon.ContextError(err)
  66. }
  67. sshServer := &sshServer{
  68. config: config,
  69. useObfuscation: useObfuscation,
  70. shutdownBroadcast: shutdownBroadcast,
  71. sshHostKey: signer,
  72. nextClientID: 1,
  73. clients: make(map[sshClientID]*sshClient),
  74. }
  75. var serverPort int
  76. if useObfuscation {
  77. serverPort = config.ObfuscatedSSHServerPort
  78. } else {
  79. serverPort = config.SSHServerPort
  80. }
  81. listener, err := net.Listen(
  82. "tcp", fmt.Sprintf("%s:%d", config.ServerIPAddress, serverPort))
  83. if err != nil {
  84. return psiphon.ContextError(err)
  85. }
  86. log.WithContextFields(
  87. LogFields{
  88. "useObfuscation": useObfuscation,
  89. "port": serverPort,
  90. }).Info("starting")
  91. err = nil
  92. errors := make(chan error)
  93. waitGroup := new(sync.WaitGroup)
  94. waitGroup.Add(1)
  95. go func() {
  96. defer waitGroup.Done()
  97. loop:
  98. for {
  99. conn, err := listener.Accept()
  100. select {
  101. case <-shutdownBroadcast:
  102. if err == nil {
  103. conn.Close()
  104. }
  105. break loop
  106. default:
  107. }
  108. if err != nil {
  109. if e, ok := err.(net.Error); ok && e.Temporary() {
  110. log.WithContextFields(LogFields{"error": err}).Error("accept failed")
  111. // Temporary error, keep running
  112. continue
  113. }
  114. select {
  115. case errors <- psiphon.ContextError(err):
  116. default:
  117. }
  118. break loop
  119. }
  120. // process each client connection concurrently
  121. go sshServer.handleClient(conn.(*net.TCPConn))
  122. }
  123. sshServer.stopClients()
  124. log.WithContextFields(
  125. LogFields{"useObfuscation": useObfuscation}).Info("stopped")
  126. }()
  127. select {
  128. case <-shutdownBroadcast:
  129. case err = <-errors:
  130. }
  131. listener.Close()
  132. waitGroup.Wait()
  133. log.WithContextFields(
  134. LogFields{"useObfuscation": useObfuscation}).Info("exiting")
  135. return err
  136. }
  137. type sshClientID uint64
  138. type sshServer struct {
  139. config *Config
  140. useObfuscation bool
  141. shutdownBroadcast <-chan struct{}
  142. sshHostKey ssh.Signer
  143. nextClientID sshClientID
  144. clientsMutex sync.Mutex
  145. stoppingClients bool
  146. clients map[sshClientID]*sshClient
  147. }
  148. func (sshServer *sshServer) registerClient(client *sshClient) (sshClientID, bool) {
  149. sshServer.clientsMutex.Lock()
  150. defer sshServer.clientsMutex.Unlock()
  151. if sshServer.stoppingClients {
  152. return 0, false
  153. }
  154. clientID := sshServer.nextClientID
  155. sshServer.nextClientID += 1
  156. sshServer.clients[clientID] = client
  157. return clientID, true
  158. }
  159. func (sshServer *sshServer) unregisterClient(clientID sshClientID) {
  160. sshServer.clientsMutex.Lock()
  161. client := sshServer.clients[clientID]
  162. delete(sshServer.clients, clientID)
  163. sshServer.clientsMutex.Unlock()
  164. if client != nil {
  165. client.stop()
  166. }
  167. }
  168. func (sshServer *sshServer) stopClients() {
  169. sshServer.clientsMutex.Lock()
  170. sshServer.stoppingClients = true
  171. sshServer.clients = make(map[sshClientID]*sshClient)
  172. sshServer.clientsMutex.Unlock()
  173. for _, client := range sshServer.clients {
  174. client.stop()
  175. }
  176. }
  177. func (sshServer *sshServer) handleClient(tcpConn *net.TCPConn) {
  178. geoIPData := GeoIPLookup(psiphon.IPAddressFromAddr(tcpConn.RemoteAddr()))
  179. sshClient := newSshClient(
  180. sshServer, geoIPData, sshServer.config.GetTrafficRules(geoIPData.Country))
  181. // Wrap the base TCP connection with an IdleTimeoutConn which will terminate
  182. // the connection if no data is received before the deadline. This timeout is
  183. // in effect for the entire duration of the SSH connection. Clients must actively
  184. // use the connection or send SSH keep alive requests to keep the connection
  185. // active.
  186. var conn net.Conn
  187. conn = psiphon.NewIdleTimeoutConn(tcpConn, SSH_CONNECTION_READ_DEADLINE, false)
  188. // Further wrap the connection in a rate limiting ThrottledConn.
  189. conn = psiphon.NewThrottledConn(
  190. conn,
  191. int64(sshClient.trafficRules.LimitDownstreamBytesPerSecond),
  192. int64(sshClient.trafficRules.LimitUpstreamBytesPerSecond))
  193. // Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  194. // respect shutdownBroadcast and implement a specific handshake timeout.
  195. // The timeout is to reclaim network resources in case the handshake takes
  196. // too long.
  197. type sshNewServerConnResult struct {
  198. conn net.Conn
  199. sshConn *ssh.ServerConn
  200. channels <-chan ssh.NewChannel
  201. requests <-chan *ssh.Request
  202. err error
  203. }
  204. resultChannel := make(chan *sshNewServerConnResult, 2)
  205. if SSH_HANDSHAKE_TIMEOUT > 0 {
  206. time.AfterFunc(time.Duration(SSH_HANDSHAKE_TIMEOUT), func() {
  207. resultChannel <- &sshNewServerConnResult{err: errors.New("ssh handshake timeout")}
  208. })
  209. }
  210. go func() {
  211. result := &sshNewServerConnResult{}
  212. if sshServer.useObfuscation {
  213. result.conn, result.err = psiphon.NewObfuscatedSshConn(
  214. psiphon.OBFUSCATION_CONN_MODE_SERVER, conn, sshServer.config.ObfuscatedSSHKey)
  215. } else {
  216. result.conn = conn
  217. }
  218. if result.err == nil {
  219. sshServerConfig := &ssh.ServerConfig{
  220. PasswordCallback: sshClient.passwordCallback,
  221. AuthLogCallback: sshClient.authLogCallback,
  222. ServerVersion: sshServer.config.SSHServerVersion,
  223. }
  224. sshServerConfig.AddHostKey(sshServer.sshHostKey)
  225. result.sshConn, result.channels, result.requests, result.err =
  226. ssh.NewServerConn(result.conn, sshServerConfig)
  227. }
  228. resultChannel <- result
  229. }()
  230. var result *sshNewServerConnResult
  231. select {
  232. case result = <-resultChannel:
  233. case <-sshServer.shutdownBroadcast:
  234. // Close() will interrupt an ongoing handshake
  235. // TODO: wait for goroutine to exit before returning?
  236. conn.Close()
  237. return
  238. }
  239. if result.err != nil {
  240. conn.Close()
  241. log.WithContextFields(LogFields{"error": result.err}).Warning("handshake failed")
  242. return
  243. }
  244. sshClient.Lock()
  245. sshClient.sshConn = result.sshConn
  246. sshClient.Unlock()
  247. clientID, ok := sshServer.registerClient(sshClient)
  248. if !ok {
  249. conn.Close()
  250. log.WithContext().Warning("register failed")
  251. return
  252. }
  253. defer sshServer.unregisterClient(clientID)
  254. go ssh.DiscardRequests(result.requests)
  255. sshClient.handleChannels(result.channels)
  256. }
  257. type sshClient struct {
  258. sync.Mutex
  259. sshServer *sshServer
  260. sshConn ssh.Conn
  261. startTime time.Time
  262. geoIPData GeoIPData
  263. psiphonSessionID string
  264. udpChannel ssh.Channel
  265. trafficRules TrafficRules
  266. tcpTrafficState *trafficState
  267. udpTrafficState *trafficState
  268. channelHandlerWaitGroup *sync.WaitGroup
  269. stopBroadcast chan struct{}
  270. }
  271. type trafficState struct {
  272. bytesUp int64
  273. bytesDown int64
  274. portForwardCount int64
  275. concurrentPortForwardCount int64
  276. peakConcurrentPortForwardCount int64
  277. }
  278. func newSshClient(sshServer *sshServer, geoIPData GeoIPData, trafficRules TrafficRules) *sshClient {
  279. return &sshClient{
  280. sshServer: sshServer,
  281. startTime: time.Now(),
  282. geoIPData: geoIPData,
  283. trafficRules: trafficRules,
  284. tcpTrafficState: &trafficState{},
  285. udpTrafficState: &trafficState{},
  286. channelHandlerWaitGroup: new(sync.WaitGroup),
  287. stopBroadcast: make(chan struct{}),
  288. }
  289. }
  290. func (sshClient *sshClient) handleChannels(channels <-chan ssh.NewChannel) {
  291. for newChannel := range channels {
  292. if newChannel.ChannelType() != "direct-tcpip" {
  293. sshClient.rejectNewChannel(newChannel, ssh.Prohibited, "unknown or unsupported channel type")
  294. continue
  295. }
  296. // process each port forward concurrently
  297. sshClient.channelHandlerWaitGroup.Add(1)
  298. go sshClient.handleNewPortForwardChannel(newChannel)
  299. }
  300. }
  301. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, reason ssh.RejectionReason, message string) {
  302. // TODO: log more details?
  303. log.WithContextFields(
  304. LogFields{
  305. "channelType": newChannel.ChannelType(),
  306. "rejectMessage": message,
  307. "rejectReason": reason,
  308. }).Warning("reject new channel")
  309. newChannel.Reject(reason, message)
  310. }
  311. func (sshClient *sshClient) handleNewPortForwardChannel(newChannel ssh.NewChannel) {
  312. defer sshClient.channelHandlerWaitGroup.Done()
  313. // http://tools.ietf.org/html/rfc4254#section-7.2
  314. var directTcpipExtraData struct {
  315. HostToConnect string
  316. PortToConnect uint32
  317. OriginatorIPAddress string
  318. OriginatorPort uint32
  319. }
  320. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  321. if err != nil {
  322. sshClient.rejectNewChannel(newChannel, ssh.Prohibited, "invalid extra data")
  323. return
  324. }
  325. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  326. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  327. isUDPChannel := sshClient.sshServer.config.UdpgwServerAddress != "" &&
  328. sshClient.sshServer.config.UdpgwServerAddress ==
  329. fmt.Sprintf("%s:%d",
  330. directTcpipExtraData.HostToConnect,
  331. directTcpipExtraData.PortToConnect)
  332. if isUDPChannel {
  333. sshClient.handleUDPChannel(newChannel)
  334. } else {
  335. sshClient.handleTCPChannel(
  336. directTcpipExtraData.HostToConnect, int(directTcpipExtraData.PortToConnect), newChannel)
  337. }
  338. }
  339. func (sshClient *sshClient) isPortForwardPermitted(
  340. port int, allowPorts []int, denyPorts []int) bool {
  341. // TODO: faster lookup?
  342. if allowPorts != nil {
  343. for _, allowPort := range allowPorts {
  344. if port == allowPort {
  345. return true
  346. }
  347. }
  348. return false
  349. }
  350. if denyPorts != nil {
  351. for _, denyPort := range denyPorts {
  352. if port == denyPort {
  353. return false
  354. }
  355. }
  356. }
  357. return true
  358. }
  359. func (sshClient *sshClient) isPortForwardLimitExceeded(
  360. state *trafficState, maxPortForwardCount int) bool {
  361. limitExceeded := false
  362. if maxPortForwardCount > 0 {
  363. sshClient.Lock()
  364. limitExceeded = state.portForwardCount >= int64(maxPortForwardCount)
  365. sshClient.Unlock()
  366. }
  367. return limitExceeded
  368. }
  369. func (sshClient *sshClient) openedPortForward(
  370. state *trafficState) {
  371. sshClient.Lock()
  372. state.portForwardCount += 1
  373. state.concurrentPortForwardCount += 1
  374. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  375. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  376. }
  377. sshClient.Unlock()
  378. }
  379. func (sshClient *sshClient) closedPortForward(
  380. state *trafficState, bytesUp, bytesDown int64) {
  381. sshClient.Lock()
  382. state.concurrentPortForwardCount -= 1
  383. state.bytesUp += bytesUp
  384. state.bytesDown += bytesDown
  385. sshClient.Unlock()
  386. }
  387. func (sshClient *sshClient) handleTCPChannel(
  388. hostToConnect string,
  389. portToConnect int,
  390. newChannel ssh.NewChannel) {
  391. if !sshClient.isPortForwardPermitted(
  392. portToConnect,
  393. sshClient.trafficRules.AllowTCPPorts,
  394. sshClient.trafficRules.DenyTCPPorts) {
  395. sshClient.rejectNewChannel(
  396. newChannel, ssh.Prohibited, "port forward not permitted")
  397. return
  398. }
  399. var bytesUp, bytesDown int64
  400. sshClient.openedPortForward(sshClient.tcpTrafficState)
  401. defer sshClient.closedPortForward(
  402. sshClient.tcpTrafficState, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))
  403. // TOCTOU note: important to increment the port forward count (via
  404. // openPortForward) _before_ checking isPortForwardLimitExceeded
  405. // otherwise, the client could potentially consume excess resources
  406. // by initiating many port forwards concurrently.
  407. // TODO: close LRU connection (after successful Dial) instead of
  408. // rejecting new connection?
  409. if sshClient.isPortForwardLimitExceeded(
  410. sshClient.tcpTrafficState,
  411. sshClient.trafficRules.MaxTCPPortForwardCount) {
  412. sshClient.rejectNewChannel(
  413. newChannel, ssh.Prohibited, "maximum port forward limit exceeded")
  414. return
  415. }
  416. remoteAddr := fmt.Sprintf("%s:%d", hostToConnect, portToConnect)
  417. log.WithContextFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
  418. type dialTcpResult struct {
  419. conn net.Conn
  420. err error
  421. }
  422. resultChannel := make(chan *dialTcpResult, 1)
  423. go func() {
  424. // TODO: on EADDRNOTAVAIL, temporarily suspend new clients
  425. // TODO: IPv6 support
  426. conn, err := net.DialTimeout(
  427. "tcp4", remoteAddr, SSH_TCP_PORT_FORWARD_DIAL_TIMEOUT)
  428. resultChannel <- &dialTcpResult{conn, err}
  429. }()
  430. var result *dialTcpResult
  431. select {
  432. case result = <-resultChannel:
  433. case <-sshClient.stopBroadcast:
  434. // Note: may leave dial in progress
  435. return
  436. }
  437. if result.err != nil {
  438. sshClient.rejectNewChannel(newChannel, ssh.ConnectionFailed, result.err.Error())
  439. return
  440. }
  441. fwdConn := result.conn
  442. defer fwdConn.Close()
  443. fwdChannel, requests, err := newChannel.Accept()
  444. if err != nil {
  445. log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
  446. return
  447. }
  448. go ssh.DiscardRequests(requests)
  449. defer fwdChannel.Close()
  450. log.WithContextFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
  451. // When idle port forward traffic rules are in place, wrap fwdConn
  452. // in an IdleTimeoutConn configured to reset idle on writes as well
  453. // as read. This ensures the port forward idle timeout only happens
  454. // when both upstream and downstream directions are are idle.
  455. if sshClient.trafficRules.IdlePortForwardTimeoutMilliseconds > 0 {
  456. fwdConn = psiphon.NewIdleTimeoutConn(
  457. fwdConn,
  458. time.Duration(sshClient.trafficRules.IdlePortForwardTimeoutMilliseconds)*time.Millisecond,
  459. true)
  460. }
  461. // relay channel to forwarded connection
  462. // TODO: relay errors to fwdChannel.Stderr()?
  463. // TODO: use a low-memory io.Copy?
  464. relayWaitGroup := new(sync.WaitGroup)
  465. relayWaitGroup.Add(1)
  466. go func() {
  467. defer relayWaitGroup.Done()
  468. bytes, err := io.Copy(fwdChannel, fwdConn)
  469. atomic.AddInt64(&bytesDown, bytes)
  470. if err != nil && err != io.EOF {
  471. log.WithContextFields(LogFields{"error": err}).Warning("downstream TCP relay failed")
  472. }
  473. }()
  474. bytes, err := io.Copy(fwdConn, fwdChannel)
  475. atomic.AddInt64(&bytesUp, bytes)
  476. if err != nil && err != io.EOF {
  477. log.WithContextFields(LogFields{"error": err}).Warning("upstream TCP relay failed")
  478. }
  479. // Shutdown special case: fwdChannel will be closed and return EOF when
  480. // the SSH connection is closed, but we need to explicitly close fwdConn
  481. // to interrupt the downstream io.Copy, which may be blocked on a
  482. // fwdConn.Read().
  483. fwdConn.Close()
  484. relayWaitGroup.Wait()
  485. log.WithContextFields(
  486. LogFields{
  487. "remoteAddr": remoteAddr,
  488. "bytesUp": atomic.LoadInt64(&bytesUp),
  489. "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting")
  490. }
  491. func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  492. var sshPasswordPayload struct {
  493. SessionId string `json:"SessionId"`
  494. SshPassword string `json:"SshPassword"`
  495. }
  496. err := json.Unmarshal(password, &sshPasswordPayload)
  497. if err != nil {
  498. return nil, psiphon.ContextError(fmt.Errorf("invalid password payload for %q", conn.User()))
  499. }
  500. userOk := (subtle.ConstantTimeCompare(
  501. []byte(conn.User()), []byte(sshClient.sshServer.config.SSHUserName)) == 1)
  502. passwordOk := (subtle.ConstantTimeCompare(
  503. []byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.config.SSHPassword)) == 1)
  504. if !userOk || !passwordOk {
  505. return nil, psiphon.ContextError(fmt.Errorf("invalid password for %q", conn.User()))
  506. }
  507. psiphonSessionID := sshPasswordPayload.SessionId
  508. sshClient.Lock()
  509. sshClient.psiphonSessionID = psiphonSessionID
  510. geoIPData := sshClient.geoIPData
  511. sshClient.Unlock()
  512. if sshClient.sshServer.config.UseRedis() {
  513. err = UpdateRedisForLegacyPsiWeb(psiphonSessionID, geoIPData)
  514. if err != nil {
  515. log.WithContextFields(LogFields{
  516. "psiphonSessionID": psiphonSessionID,
  517. "error": err}).Warning("UpdateRedisForLegacyPsiWeb failed")
  518. // Allow the connection to proceed; legacy psi_web will not get accurate GeoIP values.
  519. }
  520. }
  521. return nil, nil
  522. }
  523. func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  524. if err != nil {
  525. if sshClient.sshServer.config.UseFail2Ban() {
  526. clientIPAddress := psiphon.IPAddressFromAddr(conn.RemoteAddr())
  527. if clientIPAddress != "" {
  528. LogFail2Ban(clientIPAddress)
  529. }
  530. }
  531. log.WithContextFields(LogFields{"error": err, "method": method}).Warning("authentication failed")
  532. } else {
  533. log.WithContextFields(LogFields{"error": err, "method": method}).Info("authentication success")
  534. }
  535. }
  536. func (sshClient *sshClient) stop() {
  537. sshClient.sshConn.Close()
  538. sshClient.sshConn.Wait()
  539. close(sshClient.stopBroadcast)
  540. sshClient.channelHandlerWaitGroup.Wait()
  541. sshClient.Lock()
  542. log.WithContextFields(
  543. LogFields{
  544. "startTime": sshClient.startTime,
  545. "duration": time.Now().Sub(sshClient.startTime),
  546. "psiphonSessionID": sshClient.psiphonSessionID,
  547. "country": sshClient.geoIPData.Country,
  548. "city": sshClient.geoIPData.City,
  549. "ISP": sshClient.geoIPData.ISP,
  550. "bytesUpTCP": sshClient.tcpTrafficState.bytesUp,
  551. "bytesDownTCP": sshClient.tcpTrafficState.bytesDown,
  552. "portForwardCountTCP": sshClient.tcpTrafficState.portForwardCount,
  553. "peakConcurrentPortForwardCountTCP": sshClient.tcpTrafficState.peakConcurrentPortForwardCount,
  554. "bytesUpUDP": sshClient.udpTrafficState.bytesUp,
  555. "bytesDownUDP": sshClient.udpTrafficState.bytesDown,
  556. "portForwardCountUDP": sshClient.udpTrafficState.portForwardCount,
  557. "peakConcurrentPortForwardCountUDP": sshClient.udpTrafficState.peakConcurrentPortForwardCount,
  558. }).Info("tunnel closed")
  559. sshClient.Unlock()
  560. }