tunnelServer.go 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "crypto/subtle"
  22. "encoding/json"
  23. "errors"
  24. "fmt"
  25. "io"
  26. "net"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  31. "golang.org/x/crypto/ssh"
  32. )
  33. // TunnelServer is the main server that accepts Psiphon client
  34. // connections, via various obfuscation protocols, and provides
  35. // port forwarding (TCP and UDP) services to the Psiphon client.
  36. // At its core, TunnelServer is an SSH server. SSH is the base
  37. // protocol that provides port forward multiplexing, and transport
  38. // security. Layered on top of SSH, optionally, is Obfuscated SSH
  39. // and meek protocols, which provide further circumvention
  40. // capabilities.
  41. type TunnelServer struct {
  42. runWaitGroup *sync.WaitGroup
  43. listenerError chan error
  44. shutdownBroadcast <-chan struct{}
  45. sshServer *sshServer
  46. }
  47. // NewTunnelServer initializes a new tunnel server.
  48. func NewTunnelServer(
  49. support *SupportServices,
  50. shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {
  51. sshServer, err := newSSHServer(support, shutdownBroadcast)
  52. if err != nil {
  53. return nil, psiphon.ContextError(err)
  54. }
  55. return &TunnelServer{
  56. runWaitGroup: new(sync.WaitGroup),
  57. listenerError: make(chan error),
  58. shutdownBroadcast: shutdownBroadcast,
  59. sshServer: sshServer,
  60. }, nil
  61. }
  62. // GetLoadStats returns load stats for the tunnel server. The stats are
  63. // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
  64. // include current connected client count, total number of current port
  65. // forwards.
  66. func (server *TunnelServer) GetLoadStats() map[string]map[string]int64 {
  67. return server.sshServer.getLoadStats()
  68. }
  69. // Run runs the tunnel server; this function blocks while running a selection of
  70. // listeners that handle connection using various obfuscation protocols.
  71. //
  72. // Run listens on each designated tunnel port and spawns new goroutines to handle
  73. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  74. // clients is maintained, and when halting all clients are cleanly shutdown.
  75. //
  76. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  77. // authentication, and then looping on client new channel requests. "direct-tcpip"
  78. // channels, dynamic port fowards, are supported. When the UDPInterceptUdpgwServerAddress
  79. // config parameter is configured, UDP port forwards over a TCP stream, following
  80. // the udpgw protocol, are handled.
  81. //
  82. // A new goroutine is spawned to handle each port forward for each client. Each port
  83. // forward tracks its bytes transferred. Overall per-client stats for connection duration,
  84. // GeoIP, number of port forwards, and bytes transferred are tracked and logged when the
  85. // client shuts down.
  86. func (server *TunnelServer) Run() error {
  87. type sshListener struct {
  88. net.Listener
  89. localAddress string
  90. tunnelProtocol string
  91. }
  92. // TODO: should TunnelServer hold its own support pointer?
  93. support := server.sshServer.support
  94. // First bind all listeners; once all are successful,
  95. // start accepting connections on each.
  96. var listeners []*sshListener
  97. for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
  98. localAddress := fmt.Sprintf(
  99. "%s:%d", support.Config.ServerIPAddress, listenPort)
  100. listener, err := net.Listen("tcp", localAddress)
  101. if err != nil {
  102. for _, existingListener := range listeners {
  103. existingListener.Listener.Close()
  104. }
  105. return psiphon.ContextError(err)
  106. }
  107. log.WithContextFields(
  108. LogFields{
  109. "localAddress": localAddress,
  110. "tunnelProtocol": tunnelProtocol,
  111. }).Info("listening")
  112. listeners = append(
  113. listeners,
  114. &sshListener{
  115. Listener: listener,
  116. localAddress: localAddress,
  117. tunnelProtocol: tunnelProtocol,
  118. })
  119. }
  120. for _, listener := range listeners {
  121. server.runWaitGroup.Add(1)
  122. go func(listener *sshListener) {
  123. defer server.runWaitGroup.Done()
  124. log.WithContextFields(
  125. LogFields{
  126. "localAddress": listener.localAddress,
  127. "tunnelProtocol": listener.tunnelProtocol,
  128. }).Info("running")
  129. server.sshServer.runListener(
  130. listener.Listener,
  131. server.listenerError,
  132. listener.tunnelProtocol)
  133. log.WithContextFields(
  134. LogFields{
  135. "localAddress": listener.localAddress,
  136. "tunnelProtocol": listener.tunnelProtocol,
  137. }).Info("stopped")
  138. }(listener)
  139. }
  140. var err error
  141. select {
  142. case <-server.shutdownBroadcast:
  143. case err = <-server.listenerError:
  144. }
  145. for _, listener := range listeners {
  146. listener.Close()
  147. }
  148. server.sshServer.stopClients()
  149. server.runWaitGroup.Wait()
  150. log.WithContext().Info("stopped")
  151. return err
  152. }
  153. type sshClientID uint64
  154. type sshServer struct {
  155. support *SupportServices
  156. shutdownBroadcast <-chan struct{}
  157. sshHostKey ssh.Signer
  158. nextClientID sshClientID
  159. clientsMutex sync.Mutex
  160. stoppingClients bool
  161. acceptedClientCounts map[string]int64
  162. clients map[sshClientID]*sshClient
  163. }
  164. func newSSHServer(
  165. support *SupportServices,
  166. shutdownBroadcast <-chan struct{}) (*sshServer, error) {
  167. privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))
  168. if err != nil {
  169. return nil, psiphon.ContextError(err)
  170. }
  171. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  172. signer, err := ssh.NewSignerFromKey(privateKey)
  173. if err != nil {
  174. return nil, psiphon.ContextError(err)
  175. }
  176. return &sshServer{
  177. support: support,
  178. shutdownBroadcast: shutdownBroadcast,
  179. sshHostKey: signer,
  180. nextClientID: 1,
  181. acceptedClientCounts: make(map[string]int64),
  182. clients: make(map[sshClientID]*sshClient),
  183. }, nil
  184. }
  185. // runListener is intended to run an a goroutine; it blocks
  186. // running a particular listener. If an unrecoverable error
  187. // occurs, it will send the error to the listenerError channel.
  188. func (sshServer *sshServer) runListener(
  189. listener net.Listener,
  190. listenerError chan<- error,
  191. tunnelProtocol string) {
  192. handleClient := func(clientConn net.Conn) {
  193. // process each client connection concurrently
  194. go sshServer.handleClient(tunnelProtocol, clientConn)
  195. }
  196. // Note: when exiting due to a unrecoverable error, be sure
  197. // to try to send the error to listenerError so that the outer
  198. // TunnelServer.Run will properly shut down instead of remaining
  199. // running.
  200. if psiphon.TunnelProtocolUsesMeekHTTP(tunnelProtocol) ||
  201. psiphon.TunnelProtocolUsesMeekHTTPS(tunnelProtocol) {
  202. meekServer, err := NewMeekServer(
  203. sshServer.support,
  204. listener,
  205. psiphon.TunnelProtocolUsesMeekHTTPS(tunnelProtocol),
  206. handleClient,
  207. sshServer.shutdownBroadcast)
  208. if err != nil {
  209. select {
  210. case listenerError <- psiphon.ContextError(err):
  211. default:
  212. }
  213. return
  214. }
  215. meekServer.Run()
  216. } else {
  217. for {
  218. conn, err := listener.Accept()
  219. select {
  220. case <-sshServer.shutdownBroadcast:
  221. if err == nil {
  222. conn.Close()
  223. }
  224. return
  225. default:
  226. }
  227. if err != nil {
  228. if e, ok := err.(net.Error); ok && e.Temporary() {
  229. log.WithContextFields(LogFields{"error": err}).Error("accept failed")
  230. // Temporary error, keep running
  231. continue
  232. }
  233. select {
  234. case listenerError <- psiphon.ContextError(err):
  235. default:
  236. }
  237. return
  238. }
  239. handleClient(conn)
  240. }
  241. }
  242. }
  243. // An accepted client has completed a direct TCP or meek connection and has a net.Conn. Registration
  244. // is for tracking the number of connections.
  245. func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol string) {
  246. sshServer.clientsMutex.Lock()
  247. defer sshServer.clientsMutex.Unlock()
  248. sshServer.acceptedClientCounts[tunnelProtocol] += 1
  249. }
  250. func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol string) {
  251. sshServer.clientsMutex.Lock()
  252. defer sshServer.clientsMutex.Unlock()
  253. sshServer.acceptedClientCounts[tunnelProtocol] -= 1
  254. }
  255. // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
  256. // for tracking the number of fully established clients and for maintaining a list of running
  257. // clients (for stopping at shutdown time).
  258. func (sshServer *sshServer) registerEstablishedClient(client *sshClient) (sshClientID, bool) {
  259. sshServer.clientsMutex.Lock()
  260. defer sshServer.clientsMutex.Unlock()
  261. if sshServer.stoppingClients {
  262. return 0, false
  263. }
  264. clientID := sshServer.nextClientID
  265. sshServer.nextClientID += 1
  266. sshServer.clients[clientID] = client
  267. return clientID, true
  268. }
  269. func (sshServer *sshServer) unregisterEstablishedClient(clientID sshClientID) {
  270. sshServer.clientsMutex.Lock()
  271. client := sshServer.clients[clientID]
  272. delete(sshServer.clients, clientID)
  273. sshServer.clientsMutex.Unlock()
  274. if client != nil {
  275. client.stop()
  276. }
  277. }
  278. func (sshServer *sshServer) getLoadStats() map[string]map[string]int64 {
  279. sshServer.clientsMutex.Lock()
  280. defer sshServer.clientsMutex.Unlock()
  281. loadStats := make(map[string]map[string]int64)
  282. // Explicitly populate with zeros to get 0 counts in log messages derived from getLoadStats()
  283. for tunnelProtocol, _ := range sshServer.support.Config.TunnelProtocolPorts {
  284. loadStats[tunnelProtocol] = make(map[string]int64)
  285. loadStats[tunnelProtocol]["AcceptedClients"] = 0
  286. loadStats[tunnelProtocol]["EstablishedClients"] = 0
  287. loadStats[tunnelProtocol]["TCPPortForwards"] = 0
  288. loadStats[tunnelProtocol]["TotalTCPPortForwards"] = 0
  289. loadStats[tunnelProtocol]["UDPPortForwards"] = 0
  290. loadStats[tunnelProtocol]["TotalUDPPortForwards"] = 0
  291. }
  292. // Note: as currently tracked/counted, each established client is also an accepted client
  293. for tunnelProtocol, acceptedClientCount := range sshServer.acceptedClientCounts {
  294. loadStats[tunnelProtocol]["AcceptedClients"] = acceptedClientCount
  295. }
  296. for _, client := range sshServer.clients {
  297. // Note: can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
  298. loadStats[client.tunnelProtocol]["EstablishedClients"] += 1
  299. client.Lock()
  300. loadStats[client.tunnelProtocol]["TCPPortForwards"] += client.tcpTrafficState.concurrentPortForwardCount
  301. loadStats[client.tunnelProtocol]["TotalTCPPortForwards"] += client.tcpTrafficState.totalPortForwardCount
  302. loadStats[client.tunnelProtocol]["UDPPortForwards"] += client.udpTrafficState.concurrentPortForwardCount
  303. loadStats[client.tunnelProtocol]["TotalUDPPortForwards"] += client.udpTrafficState.totalPortForwardCount
  304. client.Unlock()
  305. }
  306. return loadStats
  307. }
  308. func (sshServer *sshServer) stopClients() {
  309. sshServer.clientsMutex.Lock()
  310. sshServer.stoppingClients = true
  311. clients := sshServer.clients
  312. sshServer.clients = make(map[sshClientID]*sshClient)
  313. sshServer.clientsMutex.Unlock()
  314. for _, client := range clients {
  315. client.stop()
  316. }
  317. }
  318. func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.Conn) {
  319. sshServer.registerAcceptedClient(tunnelProtocol)
  320. defer sshServer.unregisterAcceptedClient(tunnelProtocol)
  321. geoIPData := sshServer.support.GeoIPService.Lookup(
  322. psiphon.IPAddressFromAddr(clientConn.RemoteAddr()))
  323. // TODO: apply reload of TrafficRulesSet to existing clients
  324. sshClient := newSshClient(
  325. sshServer,
  326. tunnelProtocol,
  327. geoIPData,
  328. sshServer.support.TrafficRulesSet.GetTrafficRules(geoIPData.Country))
  329. // Wrap the base client connection with an ActivityMonitoredConn which will
  330. // terminate the connection if no data is received before the deadline. This
  331. // timeout is in effect for the entire duration of the SSH connection. Clients
  332. // must actively use the connection or send SSH keep alive requests to keep
  333. // the connection active.
  334. activityConn := psiphon.NewActivityMonitoredConn(
  335. clientConn,
  336. SSH_CONNECTION_READ_DEADLINE,
  337. false,
  338. nil)
  339. clientConn = activityConn
  340. // Further wrap the connection in a rate limiting ThrottledConn.
  341. rateLimits := sshClient.trafficRules.GetRateLimits(tunnelProtocol)
  342. clientConn = psiphon.NewThrottledConn(
  343. clientConn,
  344. rateLimits.DownstreamUnlimitedBytes,
  345. int64(rateLimits.DownstreamBytesPerSecond),
  346. rateLimits.UpstreamUnlimitedBytes,
  347. int64(rateLimits.UpstreamBytesPerSecond))
  348. // Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  349. // respect shutdownBroadcast and implement a specific handshake timeout.
  350. // The timeout is to reclaim network resources in case the handshake takes
  351. // too long.
  352. type sshNewServerConnResult struct {
  353. conn net.Conn
  354. sshConn *ssh.ServerConn
  355. channels <-chan ssh.NewChannel
  356. requests <-chan *ssh.Request
  357. err error
  358. }
  359. resultChannel := make(chan *sshNewServerConnResult, 2)
  360. if SSH_HANDSHAKE_TIMEOUT > 0 {
  361. time.AfterFunc(time.Duration(SSH_HANDSHAKE_TIMEOUT), func() {
  362. resultChannel <- &sshNewServerConnResult{err: errors.New("ssh handshake timeout")}
  363. })
  364. }
  365. go func(conn net.Conn) {
  366. sshServerConfig := &ssh.ServerConfig{
  367. PasswordCallback: sshClient.passwordCallback,
  368. AuthLogCallback: sshClient.authLogCallback,
  369. ServerVersion: sshServer.support.Config.SSHServerVersion,
  370. }
  371. sshServerConfig.AddHostKey(sshServer.sshHostKey)
  372. result := &sshNewServerConnResult{}
  373. // Wrap the connection in an SSH deobfuscator when required.
  374. if psiphon.TunnelProtocolUsesObfuscatedSSH(tunnelProtocol) {
  375. // Note: NewObfuscatedSshConn blocks on network I/O
  376. // TODO: ensure this won't block shutdown
  377. conn, result.err = psiphon.NewObfuscatedSshConn(
  378. psiphon.OBFUSCATION_CONN_MODE_SERVER,
  379. clientConn,
  380. sshServer.support.Config.ObfuscatedSSHKey)
  381. if result.err != nil {
  382. result.err = psiphon.ContextError(result.err)
  383. }
  384. }
  385. if result.err == nil {
  386. result.sshConn, result.channels, result.requests, result.err =
  387. ssh.NewServerConn(conn, sshServerConfig)
  388. }
  389. resultChannel <- result
  390. }(clientConn)
  391. var result *sshNewServerConnResult
  392. select {
  393. case result = <-resultChannel:
  394. case <-sshServer.shutdownBroadcast:
  395. // Close() will interrupt an ongoing handshake
  396. // TODO: wait for goroutine to exit before returning?
  397. clientConn.Close()
  398. return
  399. }
  400. if result.err != nil {
  401. clientConn.Close()
  402. // This is a Debug log due to noise. The handshake often fails due to I/O
  403. // errors as clients frequently interrupt connections in progress when
  404. // client-side load balancing completes a connection to a different server.
  405. log.WithContextFields(LogFields{"error": result.err}).Debug("handshake failed")
  406. return
  407. }
  408. sshClient.Lock()
  409. sshClient.sshConn = result.sshConn
  410. sshClient.activityConn = activityConn
  411. sshClient.Unlock()
  412. clientID, ok := sshServer.registerEstablishedClient(sshClient)
  413. if !ok {
  414. clientConn.Close()
  415. log.WithContext().Warning("register failed")
  416. return
  417. }
  418. defer sshServer.unregisterEstablishedClient(clientID)
  419. sshClient.runClient(result.channels, result.requests)
  420. // Note: sshServer.unregisterClient calls sshClient.Close(),
  421. // which also closes underlying transport Conn.
  422. }
  423. type sshClient struct {
  424. sync.Mutex
  425. sshServer *sshServer
  426. tunnelProtocol string
  427. sshConn ssh.Conn
  428. activityConn *psiphon.ActivityMonitoredConn
  429. geoIPData GeoIPData
  430. psiphonSessionID string
  431. udpChannel ssh.Channel
  432. trafficRules TrafficRules
  433. tcpTrafficState *trafficState
  434. udpTrafficState *trafficState
  435. channelHandlerWaitGroup *sync.WaitGroup
  436. tcpPortForwardLRU *psiphon.LRUConns
  437. stopBroadcast chan struct{}
  438. }
  439. type trafficState struct {
  440. bytesUp int64
  441. bytesDown int64
  442. concurrentPortForwardCount int64
  443. peakConcurrentPortForwardCount int64
  444. totalPortForwardCount int64
  445. }
  446. func newSshClient(
  447. sshServer *sshServer, tunnelProtocol string, geoIPData GeoIPData, trafficRules TrafficRules) *sshClient {
  448. return &sshClient{
  449. sshServer: sshServer,
  450. tunnelProtocol: tunnelProtocol,
  451. geoIPData: geoIPData,
  452. trafficRules: trafficRules,
  453. tcpTrafficState: &trafficState{},
  454. udpTrafficState: &trafficState{},
  455. channelHandlerWaitGroup: new(sync.WaitGroup),
  456. tcpPortForwardLRU: psiphon.NewLRUConns(),
  457. stopBroadcast: make(chan struct{}),
  458. }
  459. }
  460. func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  461. var sshPasswordPayload struct {
  462. SessionId string `json:"SessionId"`
  463. SshPassword string `json:"SshPassword"`
  464. }
  465. err := json.Unmarshal(password, &sshPasswordPayload)
  466. if err != nil {
  467. // Backwards compatibility case: instead of a JSON payload, older clients
  468. // send the hex encoded session ID prepended to the SSH password.
  469. // Note: there's an even older case where clients don't send any session ID,
  470. // but that's no longer supported.
  471. if len(password) == 2*psiphon.PSIPHON_API_CLIENT_SESSION_ID_LENGTH+2*SSH_PASSWORD_BYTE_LENGTH {
  472. sshPasswordPayload.SessionId = string(password[0 : 2*psiphon.PSIPHON_API_CLIENT_SESSION_ID_LENGTH])
  473. sshPasswordPayload.SshPassword = string(password[2*psiphon.PSIPHON_API_CLIENT_SESSION_ID_LENGTH : len(password)])
  474. } else {
  475. return nil, psiphon.ContextError(fmt.Errorf("invalid password payload for %q", conn.User()))
  476. }
  477. }
  478. if !isHexDigits(sshClient.sshServer.support, sshPasswordPayload.SessionId) {
  479. return nil, psiphon.ContextError(fmt.Errorf("invalid session ID for %q", conn.User()))
  480. }
  481. userOk := (subtle.ConstantTimeCompare(
  482. []byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1)
  483. passwordOk := (subtle.ConstantTimeCompare(
  484. []byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1)
  485. if !userOk || !passwordOk {
  486. return nil, psiphon.ContextError(fmt.Errorf("invalid password for %q", conn.User()))
  487. }
  488. psiphonSessionID := sshPasswordPayload.SessionId
  489. sshClient.Lock()
  490. sshClient.psiphonSessionID = psiphonSessionID
  491. geoIPData := sshClient.geoIPData
  492. sshClient.Unlock()
  493. // Store the GeoIP data associated with the session ID. This makes the GeoIP data
  494. // available to the web server for web transport Psiphon API requests.
  495. sshClient.sshServer.support.GeoIPService.SetSessionCache(
  496. psiphonSessionID, geoIPData)
  497. return nil, nil
  498. }
  499. func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  500. if err != nil {
  501. if method == "none" && err.Error() == "no auth passed yet" {
  502. // In this case, the callback invocation is noise from auth negotiation
  503. return
  504. }
  505. logFields := LogFields{"error": err, "method": method}
  506. if sshClient.sshServer.support.Config.UseFail2Ban() {
  507. clientIPAddress := psiphon.IPAddressFromAddr(conn.RemoteAddr())
  508. if clientIPAddress != "" {
  509. logFields["fail2ban"] = fmt.Sprintf(
  510. sshClient.sshServer.support.Config.Fail2BanFormat, clientIPAddress)
  511. }
  512. }
  513. log.WithContextFields(logFields).Error("authentication failed")
  514. } else {
  515. log.WithContextFields(LogFields{"error": err, "method": method}).Debug("authentication success")
  516. }
  517. }
  518. func (sshClient *sshClient) stop() {
  519. sshClient.sshConn.Close()
  520. sshClient.sshConn.Wait()
  521. close(sshClient.stopBroadcast)
  522. sshClient.channelHandlerWaitGroup.Wait()
  523. // Note: reporting duration based on last confirmed data transfer, which
  524. // is reads for sshClient.activityConn.GetActiveDuration(), and not
  525. // connection closing is important for protocols such as meek. For
  526. // meek, the connection remains open until the HTTP session expires,
  527. // which may be some time after the tunnel has closed. (The meek
  528. // protocol has no allowance for signalling payload EOF, and even if
  529. // it did the client may not have the opportunity to send a final
  530. // request with an EOF flag set.)
  531. sshClient.Lock()
  532. log.WithContextFields(
  533. LogFields{
  534. "startTime": sshClient.activityConn.GetStartTime(),
  535. "duration": sshClient.activityConn.GetActiveDuration(),
  536. "psiphonSessionID": sshClient.psiphonSessionID,
  537. "country": sshClient.geoIPData.Country,
  538. "city": sshClient.geoIPData.City,
  539. "ISP": sshClient.geoIPData.ISP,
  540. "bytesUpTCP": sshClient.tcpTrafficState.bytesUp,
  541. "bytesDownTCP": sshClient.tcpTrafficState.bytesDown,
  542. "peakConcurrentPortForwardCountTCP": sshClient.tcpTrafficState.peakConcurrentPortForwardCount,
  543. "totalPortForwardCountTCP": sshClient.tcpTrafficState.totalPortForwardCount,
  544. "bytesUpUDP": sshClient.udpTrafficState.bytesUp,
  545. "bytesDownUDP": sshClient.udpTrafficState.bytesDown,
  546. "peakConcurrentPortForwardCountUDP": sshClient.udpTrafficState.peakConcurrentPortForwardCount,
  547. "totalPortForwardCountUDP": sshClient.udpTrafficState.totalPortForwardCount,
  548. }).Info("tunnel closed")
  549. sshClient.Unlock()
  550. }
  551. // runClient handles/dispatches new channel and new requests from the client.
  552. // When the SSH client connection closes, both the channels and requests channels
  553. // will close and runClient will exit.
  554. func (sshClient *sshClient) runClient(
  555. channels <-chan ssh.NewChannel, requests <-chan *ssh.Request) {
  556. requestsWaitGroup := new(sync.WaitGroup)
  557. requestsWaitGroup.Add(1)
  558. go func() {
  559. defer requestsWaitGroup.Done()
  560. for request := range requests {
  561. // Discard keepalive requests.
  562. if request.Type == "[email protected]" && request.WantReply == false {
  563. continue
  564. }
  565. // Requests are processed serially; responses must be sent in request order.
  566. responsePayload, err := sshAPIRequestHandler(
  567. sshClient.sshServer.support,
  568. sshClient.geoIPData,
  569. request.Type,
  570. request.Payload)
  571. if err == nil {
  572. err = request.Reply(true, responsePayload)
  573. } else {
  574. log.WithContextFields(LogFields{"error": err}).Warning("request failed")
  575. err = request.Reply(false, nil)
  576. }
  577. if err != nil {
  578. log.WithContextFields(LogFields{"error": err}).Warning("response failed")
  579. }
  580. }
  581. }()
  582. for newChannel := range channels {
  583. if newChannel.ChannelType() != "direct-tcpip" {
  584. sshClient.rejectNewChannel(newChannel, ssh.Prohibited, "unknown or unsupported channel type")
  585. continue
  586. }
  587. // process each port forward concurrently
  588. sshClient.channelHandlerWaitGroup.Add(1)
  589. go sshClient.handleNewPortForwardChannel(newChannel)
  590. }
  591. requestsWaitGroup.Wait()
  592. }
  593. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, reason ssh.RejectionReason, message string) {
  594. // TODO: log more details?
  595. log.WithContextFields(
  596. LogFields{
  597. "channelType": newChannel.ChannelType(),
  598. "rejectMessage": message,
  599. "rejectReason": reason,
  600. }).Warning("reject new channel")
  601. newChannel.Reject(reason, message)
  602. }
  603. func (sshClient *sshClient) handleNewPortForwardChannel(newChannel ssh.NewChannel) {
  604. defer sshClient.channelHandlerWaitGroup.Done()
  605. // http://tools.ietf.org/html/rfc4254#section-7.2
  606. var directTcpipExtraData struct {
  607. HostToConnect string
  608. PortToConnect uint32
  609. OriginatorIPAddress string
  610. OriginatorPort uint32
  611. }
  612. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  613. if err != nil {
  614. sshClient.rejectNewChannel(newChannel, ssh.Prohibited, "invalid extra data")
  615. return
  616. }
  617. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  618. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  619. isUDPChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
  620. sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
  621. fmt.Sprintf("%s:%d",
  622. directTcpipExtraData.HostToConnect,
  623. directTcpipExtraData.PortToConnect)
  624. if isUDPChannel {
  625. sshClient.handleUDPChannel(newChannel)
  626. } else {
  627. sshClient.handleTCPChannel(
  628. directTcpipExtraData.HostToConnect, int(directTcpipExtraData.PortToConnect), newChannel)
  629. }
  630. }
  631. func (sshClient *sshClient) isPortForwardPermitted(
  632. port int, allowPorts []int, denyPorts []int) bool {
  633. // TODO: faster lookup?
  634. if len(allowPorts) > 0 {
  635. for _, allowPort := range allowPorts {
  636. if port == allowPort {
  637. return true
  638. }
  639. }
  640. return false
  641. }
  642. if len(denyPorts) > 0 {
  643. for _, denyPort := range denyPorts {
  644. if port == denyPort {
  645. return false
  646. }
  647. }
  648. }
  649. return true
  650. }
  651. func (sshClient *sshClient) isPortForwardLimitExceeded(
  652. state *trafficState, maxPortForwardCount int) bool {
  653. limitExceeded := false
  654. if maxPortForwardCount > 0 {
  655. sshClient.Lock()
  656. limitExceeded = state.concurrentPortForwardCount >= int64(maxPortForwardCount)
  657. sshClient.Unlock()
  658. }
  659. return limitExceeded
  660. }
  661. func (sshClient *sshClient) openedPortForward(
  662. state *trafficState) {
  663. sshClient.Lock()
  664. state.concurrentPortForwardCount += 1
  665. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  666. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  667. }
  668. state.totalPortForwardCount += 1
  669. sshClient.Unlock()
  670. }
  671. func (sshClient *sshClient) closedPortForward(
  672. state *trafficState, bytesUp, bytesDown int64) {
  673. sshClient.Lock()
  674. state.concurrentPortForwardCount -= 1
  675. state.bytesUp += bytesUp
  676. state.bytesDown += bytesDown
  677. sshClient.Unlock()
  678. }
  679. func (sshClient *sshClient) handleTCPChannel(
  680. hostToConnect string,
  681. portToConnect int,
  682. newChannel ssh.NewChannel) {
  683. if !sshClient.isPortForwardPermitted(
  684. portToConnect,
  685. sshClient.trafficRules.AllowTCPPorts,
  686. sshClient.trafficRules.DenyTCPPorts) {
  687. sshClient.rejectNewChannel(
  688. newChannel, ssh.Prohibited, "port forward not permitted")
  689. return
  690. }
  691. var bytesUp, bytesDown int64
  692. sshClient.openedPortForward(sshClient.tcpTrafficState)
  693. defer func() {
  694. sshClient.closedPortForward(
  695. sshClient.tcpTrafficState,
  696. atomic.LoadInt64(&bytesUp),
  697. atomic.LoadInt64(&bytesDown))
  698. }()
  699. // TOCTOU note: important to increment the port forward count (via
  700. // openPortForward) _before_ checking isPortForwardLimitExceeded
  701. // otherwise, the client could potentially consume excess resources
  702. // by initiating many port forwards concurrently.
  703. // TODO: close LRU connection (after successful Dial) instead of
  704. // rejecting new connection?
  705. if sshClient.isPortForwardLimitExceeded(
  706. sshClient.tcpTrafficState,
  707. sshClient.trafficRules.MaxTCPPortForwardCount) {
  708. // Close the oldest TCP port forward. CloseOldest() closes
  709. // the conn and the port forward's goroutine will complete
  710. // the cleanup asynchronously.
  711. //
  712. // Some known limitations:
  713. //
  714. // - Since CloseOldest() closes the upstream socket but does not
  715. // clean up all resources associated with the port forward. These
  716. // include the goroutine(s) relaying traffic as well as the SSH
  717. // channel. Closing the socket will interrupt the goroutines which
  718. // will then complete the cleanup. But, since the full cleanup is
  719. // asynchronous, there exists a possibility that a client can consume
  720. // more than max port forward resources -- just not upstream sockets.
  721. //
  722. // - An LRU list entry for this port forward is not added until
  723. // after the dial completes, but the port forward is counted
  724. // towards max limits. This means many dials in progress will
  725. // put established connections in jeopardy.
  726. //
  727. // - We're closing the oldest open connection _before_ successfully
  728. // dialing the new port forward. This means we are potentially
  729. // discarding a good connection to make way for a failed connection.
  730. // We cannot simply dial first and still maintain a limit on
  731. // resources used, so to address this we'd need to add some
  732. // accounting for connections still establishing.
  733. sshClient.tcpPortForwardLRU.CloseOldest()
  734. log.WithContextFields(
  735. LogFields{
  736. "maxCount": sshClient.trafficRules.MaxTCPPortForwardCount,
  737. }).Debug("closed LRU TCP port forward")
  738. }
  739. // Dial the target remote address. This is done in a goroutine to
  740. // ensure the shutdown signal is handled immediately.
  741. remoteAddr := fmt.Sprintf("%s:%d", hostToConnect, portToConnect)
  742. log.WithContextFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
  743. type dialTcpResult struct {
  744. conn net.Conn
  745. err error
  746. }
  747. resultChannel := make(chan *dialTcpResult, 1)
  748. go func() {
  749. // TODO: on EADDRNOTAVAIL, temporarily suspend new clients
  750. // TODO: IPv6 support
  751. conn, err := net.DialTimeout(
  752. "tcp4", remoteAddr, SSH_TCP_PORT_FORWARD_DIAL_TIMEOUT)
  753. resultChannel <- &dialTcpResult{conn, err}
  754. }()
  755. var result *dialTcpResult
  756. select {
  757. case result = <-resultChannel:
  758. case <-sshClient.stopBroadcast:
  759. // Note: may leave dial in progress
  760. return
  761. }
  762. if result.err != nil {
  763. sshClient.rejectNewChannel(newChannel, ssh.ConnectionFailed, result.err.Error())
  764. return
  765. }
  766. // The upstream TCP port forward connection has been established. Schedule
  767. // some cleanup and notify the SSH client that the channel is accepted.
  768. fwdConn := result.conn
  769. defer fwdConn.Close()
  770. lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
  771. defer lruEntry.Remove()
  772. // ActivityMonitoredConn monitors the TCP port forward I/O and updates
  773. // its LRU status. ActivityMonitoredConn also times out read on the port
  774. // forward if both reads and writes have been idle for the specified
  775. // duration.
  776. fwdConn = psiphon.NewActivityMonitoredConn(
  777. fwdConn,
  778. time.Duration(sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds)*time.Millisecond,
  779. true,
  780. lruEntry)
  781. fwdChannel, requests, err := newChannel.Accept()
  782. if err != nil {
  783. log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
  784. return
  785. }
  786. go ssh.DiscardRequests(requests)
  787. defer fwdChannel.Close()
  788. log.WithContextFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
  789. // Relay channel to forwarded connection.
  790. // TODO: relay errors to fwdChannel.Stderr()?
  791. relayWaitGroup := new(sync.WaitGroup)
  792. relayWaitGroup.Add(1)
  793. go func() {
  794. defer relayWaitGroup.Done()
  795. // io.Copy allocates a 32K temporary buffer, and each port forward relay uses
  796. // two of these buffers; using io.CopyBuffer with a smaller buffer reduces the
  797. // overall memory footprint.
  798. bytes, err := io.CopyBuffer(
  799. fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  800. atomic.AddInt64(&bytesDown, bytes)
  801. if err != nil && err != io.EOF {
  802. // Debug since errors such as "connection reset by peer" occur during normal operation
  803. log.WithContextFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
  804. }
  805. // Interrupt upstream io.Copy when downstream is shutting down.
  806. // TODO: this is done to quickly cleanup the port forward when
  807. // fwdConn has a read timeout, but is it clean -- upstream may still
  808. // be flowing?
  809. fwdChannel.Close()
  810. }()
  811. bytes, err := io.CopyBuffer(
  812. fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  813. atomic.AddInt64(&bytesUp, bytes)
  814. if err != nil && err != io.EOF {
  815. log.WithContextFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
  816. }
  817. // Shutdown special case: fwdChannel will be closed and return EOF when
  818. // the SSH connection is closed, but we need to explicitly close fwdConn
  819. // to interrupt the downstream io.Copy, which may be blocked on a
  820. // fwdConn.Read().
  821. fwdConn.Close()
  822. relayWaitGroup.Wait()
  823. log.WithContextFields(
  824. LogFields{
  825. "remoteAddr": remoteAddr,
  826. "bytesUp": atomic.LoadInt64(&bytesUp),
  827. "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting")
  828. }