tunnelServer.go 103 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "context"
  22. "crypto/rand"
  23. "crypto/subtle"
  24. "encoding/base64"
  25. "encoding/json"
  26. "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "net"
  31. "strconv"
  32. "sync"
  33. "sync/atomic"
  34. "syscall"
  35. "time"
  36. "github.com/Psiphon-Labs/goarista/monotime"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/marionette"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tapdance"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  49. "github.com/marusama/semaphore"
  50. cache "github.com/patrickmn/go-cache"
  51. )
  // Constants used by the SSH tunnel server.
  const (
  	// SSH authentication logging and handshake/connection timing.
  	SSH_AUTH_LOG_PERIOD          = 30 * time.Minute
  	SSH_HANDSHAKE_TIMEOUT        = 30 * time.Second
  	SSH_BEGIN_HANDSHAKE_TIMEOUT  = time.Second
  	SSH_CONNECTION_READ_DEADLINE = 5 * time.Minute

  	// TCP port forward sizing.
  	SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE = 8 * 1024
  	SSH_TCP_PORT_FORWARD_QUEUE_SIZE       = 1024

  	// Keep-alive payload size range, in bytes.
  	SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES = 0
  	SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES = 256

  	// OSL timing. OSL_SESSION_CACHE_TTL is the default expiration of the
  	// OSL session cache created in newSSHServer.
  	SSH_SEND_OSL_INITIAL_RETRY_DELAY = 30 * time.Second
  	SSH_SEND_OSL_RETRY_FACTOR        = 2
  	OSL_SESSION_CACHE_TTL            = 5 * time.Minute

  	MAX_AUTHORIZATIONS = 16

  	PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
  	RANDOM_STREAM_MAX_BYTES               = 10 * 1024 * 1024 // 10 MiB
  )
  68. // TunnelServer is the main server that accepts Psiphon client
  69. // connections, via various obfuscation protocols, and provides
  70. // port forwarding (TCP and UDP) services to the Psiphon client.
  71. // At its core, TunnelServer is an SSH server. SSH is the base
  72. // protocol that provides port forward multiplexing, and transport
  73. // security. Layered on top of SSH, optionally, is Obfuscated SSH
  74. // and meek protocols, which provide further circumvention
  75. // capabilities.
  76. type TunnelServer struct {
  77. runWaitGroup *sync.WaitGroup
  78. listenerError chan error
  79. shutdownBroadcast <-chan struct{}
  80. sshServer *sshServer
  81. }
  82. // NewTunnelServer initializes a new tunnel server.
  83. func NewTunnelServer(
  84. support *SupportServices,
  85. shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {
  86. sshServer, err := newSSHServer(support, shutdownBroadcast)
  87. if err != nil {
  88. return nil, common.ContextError(err)
  89. }
  90. return &TunnelServer{
  91. runWaitGroup: new(sync.WaitGroup),
  92. listenerError: make(chan error),
  93. shutdownBroadcast: shutdownBroadcast,
  94. sshServer: sshServer,
  95. }, nil
  96. }
  97. // Run runs the tunnel server; this function blocks while running a selection of
  98. // listeners that handle connection using various obfuscation protocols.
  99. //
  100. // Run listens on each designated tunnel port and spawns new goroutines to handle
  101. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  102. // clients is maintained, and when halting all clients are cleanly shutdown.
  103. //
  104. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  105. // authentication, and then looping on client new channel requests. "direct-tcpip"
  106. // channels, dynamic port fowards, are supported. When the UDPInterceptUdpgwServerAddress
  107. // config parameter is configured, UDP port forwards over a TCP stream, following
  108. // the udpgw protocol, are handled.
  109. //
  110. // A new goroutine is spawned to handle each port forward for each client. Each port
  111. // forward tracks its bytes transferred. Overall per-client stats for connection duration,
  112. // GeoIP, number of port forwards, and bytes transferred are tracked and logged when the
  113. // client shuts down.
  114. //
  115. // Note: client handler goroutines may still be shutting down after Run() returns. See
  116. // comment in sshClient.stop(). TODO: fully synchronized shutdown.
  117. func (server *TunnelServer) Run() error {
  118. type sshListener struct {
  119. net.Listener
  120. localAddress string
  121. tunnelProtocol string
  122. }
  123. // TODO: should TunnelServer hold its own support pointer?
  124. support := server.sshServer.support
  125. // First bind all listeners; once all are successful,
  126. // start accepting connections on each.
  127. var listeners []*sshListener
  128. for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
  129. localAddress := fmt.Sprintf(
  130. "%s:%d", support.Config.ServerIPAddress, listenPort)
  131. var listener net.Listener
  132. var err error
  133. if protocol.TunnelProtocolUsesFrontedMeekQUIC(tunnelProtocol) {
  134. // For FRONTED-MEEK-QUIC-OSSH, no listener implemented. The edge-to-server
  135. // hop uses HTTPS and the client tunnel protocol is distinguished using
  136. // protocol.MeekCookieData.ClientTunnelProtocol.
  137. continue
  138. } else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {
  139. listener, err = quic.Listen(
  140. CommonLogger(log),
  141. localAddress,
  142. support.Config.ObfuscatedSSHKey)
  143. } else if protocol.TunnelProtocolUsesMarionette(tunnelProtocol) {
  144. listener, err = marionette.Listen(
  145. support.Config.ServerIPAddress,
  146. support.Config.MarionetteFormat)
  147. } else if protocol.TunnelProtocolUsesTapdance(tunnelProtocol) {
  148. listener, err = tapdance.Listen(localAddress)
  149. } else {
  150. listener, err = net.Listen("tcp", localAddress)
  151. }
  152. if err != nil {
  153. for _, existingListener := range listeners {
  154. existingListener.Listener.Close()
  155. }
  156. return common.ContextError(err)
  157. }
  158. tacticsListener := tactics.NewListener(
  159. listener,
  160. support.TacticsServer,
  161. tunnelProtocol,
  162. func(IPAddress string) common.GeoIPData {
  163. return common.GeoIPData(support.GeoIPService.Lookup(IPAddress))
  164. })
  165. log.WithContextFields(
  166. LogFields{
  167. "localAddress": localAddress,
  168. "tunnelProtocol": tunnelProtocol,
  169. }).Info("listening")
  170. listeners = append(
  171. listeners,
  172. &sshListener{
  173. Listener: tacticsListener,
  174. localAddress: localAddress,
  175. tunnelProtocol: tunnelProtocol,
  176. })
  177. }
  178. for _, listener := range listeners {
  179. server.runWaitGroup.Add(1)
  180. go func(listener *sshListener) {
  181. defer server.runWaitGroup.Done()
  182. log.WithContextFields(
  183. LogFields{
  184. "localAddress": listener.localAddress,
  185. "tunnelProtocol": listener.tunnelProtocol,
  186. }).Info("running")
  187. server.sshServer.runListener(
  188. listener.Listener,
  189. server.listenerError,
  190. listener.tunnelProtocol)
  191. log.WithContextFields(
  192. LogFields{
  193. "localAddress": listener.localAddress,
  194. "tunnelProtocol": listener.tunnelProtocol,
  195. }).Info("stopped")
  196. }(listener)
  197. }
  198. var err error
  199. select {
  200. case <-server.shutdownBroadcast:
  201. case err = <-server.listenerError:
  202. }
  203. for _, listener := range listeners {
  204. listener.Close()
  205. }
  206. server.sshServer.stopClients()
  207. server.runWaitGroup.Wait()
  208. log.WithContext().Info("stopped")
  209. return err
  210. }
  211. // GetLoadStats returns load stats for the tunnel server. The stats are
  212. // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
  213. // include current connected client count, total number of current port
  214. // forwards.
  215. func (server *TunnelServer) GetLoadStats() (ProtocolStats, RegionStats) {
  216. return server.sshServer.getLoadStats()
  217. }
  218. // ResetAllClientTrafficRules resets all established client traffic rules
  219. // to use the latest config and client properties. Any existing traffic
  220. // rule state is lost, including throttling state.
  221. func (server *TunnelServer) ResetAllClientTrafficRules() {
  222. server.sshServer.resetAllClientTrafficRules()
  223. }
  224. // ResetAllClientOSLConfigs resets all established client OSL state to use
  225. // the latest OSL config. Any existing OSL state is lost, including partial
  226. // progress towards SLOKs.
  227. func (server *TunnelServer) ResetAllClientOSLConfigs() {
  228. server.sshServer.resetAllClientOSLConfigs()
  229. }
  230. // SetClientHandshakeState sets the handshake state -- that it completed and
  231. // what parameters were passed -- in sshClient. This state is used for allowing
  232. // port forwards and for future traffic rule selection. SetClientHandshakeState
  233. // also triggers an immediate traffic rule re-selection, as the rules selected
  234. // upon tunnel establishment may no longer apply now that handshake values are
  235. // set.
  236. //
  237. // The authorizations received from the client handshake are verified and the
  238. // resulting list of authorized access types are applied to the client's tunnel
  239. // and traffic rules. A list of active authorization IDs and authorized access
  240. // types is returned for responding to the client and logging.
  241. func (server *TunnelServer) SetClientHandshakeState(
  242. sessionID string,
  243. state handshakeState,
  244. authorizations []string) ([]string, []string, error) {
  245. return server.sshServer.setClientHandshakeState(sessionID, state, authorizations)
  246. }
  247. // GetClientHandshaked indicates whether the client has completed a handshake
  248. // and whether its traffic rules are immediately exhausted.
  249. func (server *TunnelServer) GetClientHandshaked(
  250. sessionID string) (bool, bool, error) {
  251. return server.sshServer.getClientHandshaked(sessionID)
  252. }
  253. // UpdateClientAPIParameters updates the recorded handshake API parameters for
  254. // the client corresponding to sessionID.
  255. func (server *TunnelServer) UpdateClientAPIParameters(
  256. sessionID string,
  257. apiParams common.APIParameters) error {
  258. return server.sshServer.updateClientAPIParameters(sessionID, apiParams)
  259. }
  260. // ExpectClientDomainBytes indicates whether the client was configured to report
  261. // domain bytes in its handshake response.
  262. func (server *TunnelServer) ExpectClientDomainBytes(
  263. sessionID string) (bool, error) {
  264. return server.sshServer.expectClientDomainBytes(sessionID)
  265. }
  266. // SetEstablishTunnels sets whether new tunnels may be established or not.
  267. // When not establishing, incoming connections are immediately closed.
  268. func (server *TunnelServer) SetEstablishTunnels(establish bool) {
  269. server.sshServer.setEstablishTunnels(establish)
  270. }
  271. // GetEstablishTunnels returns whether new tunnels may be established or not.
  272. func (server *TunnelServer) GetEstablishTunnels() bool {
  273. return server.sshServer.getEstablishTunnels()
  274. }
  275. type sshServer struct {
  276. // Note: 64-bit ints used with atomic operations are placed
  277. // at the start of struct to ensure 64-bit alignment.
  278. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  279. lastAuthLog int64
  280. authFailedCount int64
  281. support *SupportServices
  282. establishTunnels int32
  283. concurrentSSHHandshakes semaphore.Semaphore
  284. shutdownBroadcast <-chan struct{}
  285. sshHostKey ssh.Signer
  286. clientsMutex sync.Mutex
  287. stoppingClients bool
  288. acceptedClientCounts map[string]map[string]int64
  289. clients map[string]*sshClient
  290. oslSessionCacheMutex sync.Mutex
  291. oslSessionCache *cache.Cache
  292. authorizationSessionIDsMutex sync.Mutex
  293. authorizationSessionIDs map[string]string
  294. }
  295. func newSSHServer(
  296. support *SupportServices,
  297. shutdownBroadcast <-chan struct{}) (*sshServer, error) {
  298. privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))
  299. if err != nil {
  300. return nil, common.ContextError(err)
  301. }
  302. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  303. signer, err := ssh.NewSignerFromKey(privateKey)
  304. if err != nil {
  305. return nil, common.ContextError(err)
  306. }
  307. var concurrentSSHHandshakes semaphore.Semaphore
  308. if support.Config.MaxConcurrentSSHHandshakes > 0 {
  309. concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)
  310. }
  311. // The OSL session cache temporarily retains OSL seed state
  312. // progress for disconnected clients. This enables clients
  313. // that disconnect and immediately reconnect to the same
  314. // server to resume their OSL progress. Cached progress
  315. // is referenced by session ID and is retained for
  316. // OSL_SESSION_CACHE_TTL after disconnect.
  317. //
  318. // Note: session IDs are assumed to be unpredictable. If a
  319. // rogue client could guess the session ID of another client,
  320. // it could resume its OSL progress and, if the OSL config
  321. // were known, infer some activity.
  322. oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)
  323. return &sshServer{
  324. support: support,
  325. establishTunnels: 1,
  326. concurrentSSHHandshakes: concurrentSSHHandshakes,
  327. shutdownBroadcast: shutdownBroadcast,
  328. sshHostKey: signer,
  329. acceptedClientCounts: make(map[string]map[string]int64),
  330. clients: make(map[string]*sshClient),
  331. oslSessionCache: oslSessionCache,
  332. authorizationSessionIDs: make(map[string]string),
  333. }, nil
  334. }
  335. func (sshServer *sshServer) setEstablishTunnels(establish bool) {
  336. // Do nothing when the setting is already correct. This avoids
  337. // spurious log messages when setEstablishTunnels is called
  338. // periodically with the same setting.
  339. if establish == sshServer.getEstablishTunnels() {
  340. return
  341. }
  342. establishFlag := int32(1)
  343. if !establish {
  344. establishFlag = 0
  345. }
  346. atomic.StoreInt32(&sshServer.establishTunnels, establishFlag)
  347. log.WithContextFields(
  348. LogFields{"establish": establish}).Info("establishing tunnels")
  349. }
  350. func (sshServer *sshServer) getEstablishTunnels() bool {
  351. return atomic.LoadInt32(&sshServer.establishTunnels) == 1
  352. }
  353. // runListener is intended to run an a goroutine; it blocks
  354. // running a particular listener. If an unrecoverable error
  355. // occurs, it will send the error to the listenerError channel.
  356. func (sshServer *sshServer) runListener(
  357. listener net.Listener,
  358. listenerError chan<- error,
  359. listenerTunnelProtocol string) {
  360. runningProtocols := make([]string, 0)
  361. for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
  362. runningProtocols = append(runningProtocols, tunnelProtocol)
  363. }
  364. handleClient := func(clientTunnelProtocol string, clientConn net.Conn) {
  365. // Note: establish tunnel limiter cannot simply stop TCP
  366. // listeners in all cases (e.g., meek) since SSH tunnels can
  367. // span multiple TCP connections.
  368. if !sshServer.getEstablishTunnels() {
  369. log.WithContext().Debug("not establishing tunnels")
  370. clientConn.Close()
  371. return
  372. }
  373. // The tunnelProtocol passed to handleClient is used for stats,
  374. // throttling, etc. When the tunnel protocol can be determined
  375. // unambiguously from the listening port, use that protocol and
  376. // don't use any client-declared value. Only use the client's
  377. // value, if present, in special cases where the listening port
  378. // cannot distinguish the protocol.
  379. tunnelProtocol := listenerTunnelProtocol
  380. if clientTunnelProtocol != "" {
  381. if !common.Contains(runningProtocols, clientTunnelProtocol) {
  382. log.WithContextFields(
  383. LogFields{
  384. "clientTunnelProtocol": clientTunnelProtocol}).
  385. Warning("invalid client tunnel protocol")
  386. clientConn.Close()
  387. return
  388. }
  389. if protocol.UseClientTunnelProtocol(
  390. clientTunnelProtocol, runningProtocols) {
  391. tunnelProtocol = clientTunnelProtocol
  392. }
  393. }
  394. // process each client connection concurrently
  395. go sshServer.handleClient(tunnelProtocol, clientConn)
  396. }
  397. // Note: when exiting due to a unrecoverable error, be sure
  398. // to try to send the error to listenerError so that the outer
  399. // TunnelServer.Run will properly shut down instead of remaining
  400. // running.
  401. if protocol.TunnelProtocolUsesMeekHTTP(listenerTunnelProtocol) ||
  402. protocol.TunnelProtocolUsesMeekHTTPS(listenerTunnelProtocol) {
  403. meekServer, err := NewMeekServer(
  404. sshServer.support,
  405. listener,
  406. protocol.TunnelProtocolUsesMeekHTTPS(listenerTunnelProtocol),
  407. protocol.TunnelProtocolUsesFrontedMeek(listenerTunnelProtocol),
  408. protocol.TunnelProtocolUsesObfuscatedSessionTickets(listenerTunnelProtocol),
  409. handleClient,
  410. sshServer.shutdownBroadcast)
  411. if err == nil {
  412. err = meekServer.Run()
  413. }
  414. if err != nil {
  415. select {
  416. case listenerError <- common.ContextError(err):
  417. default:
  418. }
  419. return
  420. }
  421. } else {
  422. for {
  423. conn, err := listener.Accept()
  424. select {
  425. case <-sshServer.shutdownBroadcast:
  426. if err == nil {
  427. conn.Close()
  428. }
  429. return
  430. default:
  431. }
  432. if err != nil {
  433. if e, ok := err.(net.Error); ok && e.Temporary() {
  434. log.WithContextFields(LogFields{"error": err}).Error("accept failed")
  435. // Temporary error, keep running
  436. continue
  437. }
  438. select {
  439. case listenerError <- common.ContextError(err):
  440. default:
  441. }
  442. return
  443. }
  444. handleClient("", conn)
  445. }
  446. }
  447. }
  448. // An accepted client has completed a direct TCP or meek connection and has a net.Conn. Registration
  449. // is for tracking the number of connections.
  450. func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {
  451. sshServer.clientsMutex.Lock()
  452. defer sshServer.clientsMutex.Unlock()
  453. if sshServer.acceptedClientCounts[tunnelProtocol] == nil {
  454. sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)
  455. }
  456. sshServer.acceptedClientCounts[tunnelProtocol][region] += 1
  457. }
  458. func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {
  459. sshServer.clientsMutex.Lock()
  460. defer sshServer.clientsMutex.Unlock()
  461. sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1
  462. }
  463. // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
  464. // for tracking the number of fully established clients and for maintaining a list of running
  465. // clients (for stopping at shutdown time).
  466. func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
  467. sshServer.clientsMutex.Lock()
  468. if sshServer.stoppingClients {
  469. sshServer.clientsMutex.Unlock()
  470. return false
  471. }
  472. // In the case of a duplicate client sessionID, the previous client is closed.
  473. // - Well-behaved clients generate a random sessionID that should be unique (won't
  474. // accidentally conflict) and hard to guess (can't be targeted by a malicious
  475. // client).
  476. // - Clients reuse the same sessionID when a tunnel is unexpectedly disconnected
  477. // and reestablished. In this case, when the same server is selected, this logic
  478. // will be hit; closing the old, dangling client is desirable.
  479. // - Multi-tunnel clients should not normally use one server for multiple tunnels.
  480. existingClient := sshServer.clients[client.sessionID]
  481. sshServer.clientsMutex.Unlock()
  482. if existingClient != nil {
  483. // This case is expected to be common, and so logged at the lowest severity
  484. // level.
  485. log.WithContext().Debug(
  486. "stopping existing client with duplicate session ID")
  487. existingClient.stop()
  488. // Block until the existingClient is fully terminated. This is necessary to
  489. // avoid this scenario:
  490. // - existingClient is invoking handshakeAPIRequestHandler
  491. // - sshServer.clients[client.sessionID] is updated to point to new client
  492. // - existingClient's handshakeAPIRequestHandler invokes
  493. // SetClientHandshakeState but sets the handshake parameters for new
  494. // client
  495. // - as a result, the new client handshake will fail (only a single handshake
  496. // is permitted) and the new client server_tunnel log will contain an
  497. // invalid mix of existing/new client fields
  498. //
  499. // Once existingClient.awaitStopped returns, all existingClient port
  500. // forwards and request handlers have terminated, so no API handler, either
  501. // tunneled web API or SSH API, will remain and it is safe to point
  502. // sshServer.clients[client.sessionID] to the new client.
  503. // Limitation: this scenario remains possible with _untunneled_ web API
  504. // requests.
  505. //
  506. // Blocking also ensures existingClient.releaseAuthorizations is invoked before
  507. // the new client attempts to submit the same authorizations.
  508. //
  509. // Perform blocking awaitStopped operation outside the
  510. // sshServer.clientsMutex mutex to avoid blocking all other clients for the
  511. // duration. We still expect and require that the stop process completes
  512. // rapidly, e.g., does not block on network I/O, allowing the new client
  513. // connection to proceed without delay.
  514. //
  515. // In addition, operations triggered by stop, and which must complete before
  516. // awaitStopped returns, will attempt to lock sshServer.clientsMutex,
  517. // including unregisterEstablishedClient.
  518. existingClient.awaitStopped()
  519. }
  520. sshServer.clientsMutex.Lock()
  521. defer sshServer.clientsMutex.Unlock()
  522. // existingClient's stop will have removed it from sshServer.clients via
  523. // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should
  524. // be nil -- unless yet another client instance using the same sessionID has
  525. // connected in the meantime while awaiting existingClient stop. In this
  526. // case, it's not clear which is the most recent connection from the client,
  527. // so instead of this connection terminating more peers, it aborts.
  528. if sshServer.clients[client.sessionID] != nil {
  529. // As this is expected to be rare case, it's logged at a higher severity
  530. // level.
  531. log.WithContext().Warning(
  532. "aborting new client with duplicate session ID")
  533. return false
  534. }
  535. sshServer.clients[client.sessionID] = client
  536. return true
  537. }
  538. func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
  539. sshServer.clientsMutex.Lock()
  540. registeredClient := sshServer.clients[client.sessionID]
  541. // registeredClient will differ from client when client is the existingClient
  542. // terminated in registerEstablishedClient. In that case, registeredClient
  543. // remains connected, and the sshServer.clients entry should be retained.
  544. if registeredClient == client {
  545. delete(sshServer.clients, client.sessionID)
  546. }
  547. sshServer.clientsMutex.Unlock()
  548. client.stop()
  549. }
  550. type ProtocolStats map[string]map[string]int64
  551. type RegionStats map[string]map[string]map[string]int64
  552. func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {
  553. sshServer.clientsMutex.Lock()
  554. defer sshServer.clientsMutex.Unlock()
  555. // Explicitly populate with zeros to ensure 0 counts in log messages
  556. zeroStats := func() map[string]int64 {
  557. stats := make(map[string]int64)
  558. stats["accepted_clients"] = 0
  559. stats["established_clients"] = 0
  560. stats["dialing_tcp_port_forwards"] = 0
  561. stats["tcp_port_forwards"] = 0
  562. stats["total_tcp_port_forwards"] = 0
  563. stats["udp_port_forwards"] = 0
  564. stats["total_udp_port_forwards"] = 0
  565. stats["tcp_port_forward_dialed_count"] = 0
  566. stats["tcp_port_forward_dialed_duration"] = 0
  567. stats["tcp_port_forward_failed_count"] = 0
  568. stats["tcp_port_forward_failed_duration"] = 0
  569. stats["tcp_port_forward_rejected_dialing_limit_count"] = 0
  570. return stats
  571. }
  572. zeroProtocolStats := func() map[string]map[string]int64 {
  573. stats := make(map[string]map[string]int64)
  574. stats["ALL"] = zeroStats()
  575. for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
  576. stats[tunnelProtocol] = zeroStats()
  577. }
  578. return stats
  579. }
  580. // [<protocol or ALL>][<stat name>] -> count
  581. protocolStats := zeroProtocolStats()
  582. // [<region][<protocol or ALL>][<stat name>] -> count
  583. regionStats := make(RegionStats)
  584. // Note: as currently tracked/counted, each established client is also an accepted client
  585. for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {
  586. for region, acceptedClientCount := range regionAcceptedClientCounts {
  587. if acceptedClientCount > 0 {
  588. if regionStats[region] == nil {
  589. regionStats[region] = zeroProtocolStats()
  590. }
  591. protocolStats["ALL"]["accepted_clients"] += acceptedClientCount
  592. protocolStats[tunnelProtocol]["accepted_clients"] += acceptedClientCount
  593. regionStats[region]["ALL"]["accepted_clients"] += acceptedClientCount
  594. regionStats[region][tunnelProtocol]["accepted_clients"] += acceptedClientCount
  595. }
  596. }
  597. }
  598. for _, client := range sshServer.clients {
  599. client.Lock()
  600. tunnelProtocol := client.tunnelProtocol
  601. region := client.geoIPData.Country
  602. if regionStats[region] == nil {
  603. regionStats[region] = zeroProtocolStats()
  604. }
  605. stats := []map[string]int64{
  606. protocolStats["ALL"],
  607. protocolStats[tunnelProtocol],
  608. regionStats[region]["ALL"],
  609. regionStats[region][tunnelProtocol]}
  610. for _, stat := range stats {
  611. stat["established_clients"] += 1
  612. // Note: can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
  613. stat["dialing_tcp_port_forwards"] += client.tcpTrafficState.concurrentDialingPortForwardCount
  614. stat["tcp_port_forwards"] += client.tcpTrafficState.concurrentPortForwardCount
  615. stat["total_tcp_port_forwards"] += client.tcpTrafficState.totalPortForwardCount
  616. // client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
  617. stat["udp_port_forwards"] += client.udpTrafficState.concurrentPortForwardCount
  618. stat["total_udp_port_forwards"] += client.udpTrafficState.totalPortForwardCount
  619. stat["tcp_port_forward_dialed_count"] += client.qualityMetrics.tcpPortForwardDialedCount
  620. stat["tcp_port_forward_dialed_duration"] +=
  621. int64(client.qualityMetrics.tcpPortForwardDialedDuration / time.Millisecond)
  622. stat["tcp_port_forward_failed_count"] += client.qualityMetrics.tcpPortForwardFailedCount
  623. stat["tcp_port_forward_failed_duration"] +=
  624. int64(client.qualityMetrics.tcpPortForwardFailedDuration / time.Millisecond)
  625. stat["tcp_port_forward_rejected_dialing_limit_count"] +=
  626. client.qualityMetrics.tcpPortForwardRejectedDialingLimitCount
  627. }
  628. client.qualityMetrics.tcpPortForwardDialedCount = 0
  629. client.qualityMetrics.tcpPortForwardDialedDuration = 0
  630. client.qualityMetrics.tcpPortForwardFailedCount = 0
  631. client.qualityMetrics.tcpPortForwardFailedDuration = 0
  632. client.qualityMetrics.tcpPortForwardRejectedDialingLimitCount = 0
  633. client.Unlock()
  634. }
  635. return protocolStats, regionStats
  636. }
  637. func (sshServer *sshServer) resetAllClientTrafficRules() {
  638. sshServer.clientsMutex.Lock()
  639. clients := make(map[string]*sshClient)
  640. for sessionID, client := range sshServer.clients {
  641. clients[sessionID] = client
  642. }
  643. sshServer.clientsMutex.Unlock()
  644. for _, client := range clients {
  645. client.setTrafficRules()
  646. }
  647. }
  648. func (sshServer *sshServer) resetAllClientOSLConfigs() {
  649. // Flush cached seed state. This has the same effect
  650. // and same limitations as calling setOSLConfig for
  651. // currently connected clients -- all progress is lost.
  652. sshServer.oslSessionCacheMutex.Lock()
  653. sshServer.oslSessionCache.Flush()
  654. sshServer.oslSessionCacheMutex.Unlock()
  655. sshServer.clientsMutex.Lock()
  656. clients := make(map[string]*sshClient)
  657. for sessionID, client := range sshServer.clients {
  658. clients[sessionID] = client
  659. }
  660. sshServer.clientsMutex.Unlock()
  661. for _, client := range clients {
  662. client.setOSLConfig()
  663. }
  664. }
  665. func (sshServer *sshServer) setClientHandshakeState(
  666. sessionID string,
  667. state handshakeState,
  668. authorizations []string) ([]string, []string, error) {
  669. sshServer.clientsMutex.Lock()
  670. client := sshServer.clients[sessionID]
  671. sshServer.clientsMutex.Unlock()
  672. if client == nil {
  673. return nil, nil, common.ContextError(errors.New("unknown session ID"))
  674. }
  675. activeAuthorizationIDs, authorizedAccessTypes, err := client.setHandshakeState(
  676. state, authorizations)
  677. if err != nil {
  678. return nil, nil, common.ContextError(err)
  679. }
  680. return activeAuthorizationIDs, authorizedAccessTypes, nil
  681. }
  682. func (sshServer *sshServer) getClientHandshaked(
  683. sessionID string) (bool, bool, error) {
  684. sshServer.clientsMutex.Lock()
  685. client := sshServer.clients[sessionID]
  686. sshServer.clientsMutex.Unlock()
  687. if client == nil {
  688. return false, false, common.ContextError(errors.New("unknown session ID"))
  689. }
  690. completed, exhausted := client.getHandshaked()
  691. return completed, exhausted, nil
  692. }
  693. func (sshServer *sshServer) updateClientAPIParameters(
  694. sessionID string,
  695. apiParams common.APIParameters) error {
  696. sshServer.clientsMutex.Lock()
  697. client := sshServer.clients[sessionID]
  698. sshServer.clientsMutex.Unlock()
  699. if client == nil {
  700. return common.ContextError(errors.New("unknown session ID"))
  701. }
  702. client.updateAPIParameters(apiParams)
  703. return nil
  704. }
  705. func (sshServer *sshServer) revokeClientAuthorizations(sessionID string) {
  706. sshServer.clientsMutex.Lock()
  707. client := sshServer.clients[sessionID]
  708. sshServer.clientsMutex.Unlock()
  709. if client == nil {
  710. return
  711. }
  712. // sshClient.handshakeState.authorizedAccessTypes is not cleared. Clearing
  713. // authorizedAccessTypes may cause sshClient.logTunnel to fail to log
  714. // access types. As the revocation may be due to legitimate use of an
  715. // authorization in multiple sessions by a single client, useful metrics
  716. // would be lost.
  717. client.Lock()
  718. client.handshakeState.authorizationsRevoked = true
  719. client.Unlock()
  720. // Select and apply new traffic rules, as filtered by the client's new
  721. // authorization state.
  722. client.setTrafficRules()
  723. }
  724. func (sshServer *sshServer) expectClientDomainBytes(
  725. sessionID string) (bool, error) {
  726. sshServer.clientsMutex.Lock()
  727. client := sshServer.clients[sessionID]
  728. sshServer.clientsMutex.Unlock()
  729. if client == nil {
  730. return false, common.ContextError(errors.New("unknown session ID"))
  731. }
  732. return client.expectDomainBytes(), nil
  733. }
  734. func (sshServer *sshServer) stopClients() {
  735. sshServer.clientsMutex.Lock()
  736. sshServer.stoppingClients = true
  737. clients := sshServer.clients
  738. sshServer.clients = make(map[string]*sshClient)
  739. sshServer.clientsMutex.Unlock()
  740. for _, client := range clients {
  741. client.stop()
  742. }
  743. }
  744. func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.Conn) {
  745. // Calling clientConn.RemoteAddr at this point, before any Read calls,
  746. // satisfies the constraint documented in tapdance.Listen.
  747. geoIPData := sshServer.support.GeoIPService.Lookup(
  748. common.IPAddressFromAddr(clientConn.RemoteAddr()))
  749. sshServer.registerAcceptedClient(tunnelProtocol, geoIPData.Country)
  750. defer sshServer.unregisterAcceptedClient(tunnelProtocol, geoIPData.Country)
  751. // When configured, enforce a cap on the number of concurrent SSH
  752. // handshakes. This limits load spikes on busy servers when many clients
  753. // attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,
  754. // to acquire; waiting will avoid immediately creating more load on another
  755. // server in the network when the client tries a new candidate. Disconnect the
  756. // client when that wait time is exceeded.
  757. //
  758. // This mechanism limits memory allocations and CPU usage associated with the
  759. // SSH handshake. At this point, new direct TCP connections or new meek
  760. // connections, with associated resource usage, are already established. Those
  761. // connections are expected to be rate or load limited using other mechanisms.
  762. //
  763. // TODO:
  764. //
  765. // - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in
  766. // sshClient.run, since the client is also applying an SSH handshake timeout
  767. // and won't exclude time spent waiting.
  768. // - each call to sshServer.handleClient (in sshServer.runListener) is invoked
  769. // in its own goroutine, but shutdown doesn't synchronously await these
  770. // goroutnes. Once this is synchronizes, the following context.WithTimeout
  771. // should use an sshServer parent context to ensure blocking acquires
  772. // interrupt immediately upon shutdown.
  773. var onSSHHandshakeFinished func()
  774. if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {
  775. ctx, cancelFunc := context.WithTimeout(
  776. context.Background(),
  777. sshServer.support.Config.sshBeginHandshakeTimeout)
  778. defer cancelFunc()
  779. err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)
  780. if err != nil {
  781. clientConn.Close()
  782. // This is a debug log as the only possible error is context timeout.
  783. log.WithContextFields(LogFields{"error": err}).Debug(
  784. "acquire SSH handshake semaphore failed")
  785. return
  786. }
  787. onSSHHandshakeFinished = func() {
  788. sshServer.concurrentSSHHandshakes.Release(1)
  789. }
  790. }
  791. sshClient := newSshClient(sshServer, tunnelProtocol, geoIPData)
  792. // sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
  793. // in any error case; or, as soon as the SSH handshake phase has successfully
  794. // completed.
  795. sshClient.run(clientConn, onSSHHandshakeFinished)
  796. }
  797. func (sshServer *sshServer) monitorPortForwardDialError(err error) {
  798. // "err" is the error returned from a failed TCP or UDP port
  799. // forward dial. Certain system error codes indicate low resource
  800. // conditions: insufficient file descriptors, ephemeral ports, or
  801. // memory. For these cases, log an alert.
  802. // TODO: also temporarily suspend new clients
  803. // Note: don't log net.OpError.Error() as the full error string
  804. // may contain client destination addresses.
  805. opErr, ok := err.(*net.OpError)
  806. if ok {
  807. if opErr.Err == syscall.EADDRNOTAVAIL ||
  808. opErr.Err == syscall.EAGAIN ||
  809. opErr.Err == syscall.ENOMEM ||
  810. opErr.Err == syscall.EMFILE ||
  811. opErr.Err == syscall.ENFILE {
  812. log.WithContextFields(
  813. LogFields{"error": opErr.Err}).Error(
  814. "port forward dial failed due to unavailable resource")
  815. }
  816. }
  817. }
// sshClient holds the state of a single client connection, from acceptance
// through the SSH handshake, tunnel operation, and teardown. Instances are
// created by newSshClient and driven by sshClient.run.
//
// The embedded Mutex is taken when accessing mutable fields such as sshConn,
// activityConn, throttledConn, handshakeState, qualityMetrics, and
// oslClientSeedState. sessionID, isFirstTunnelInSession, and
// supportsServerRequests are written once, under the lock, during SSH
// password authentication and are treated as read-only afterwards.
type sshClient struct {
	sync.Mutex
	sshServer      *sshServer
	tunnelProtocol string

	// sshConn, activityConn, and throttledConn are set once the SSH handshake
	// has succeeded; throttledConn wraps activityConn.
	sshConn       ssh.Conn
	activityConn  *common.ActivityMonitoredConn
	throttledConn *common.ThrottledConn

	geoIPData              GeoIPData
	sessionID              string
	isFirstTunnelInSession bool
	supportsServerRequests bool
	handshakeState         handshakeState
	udpChannel             ssh.Channel
	packetTunnelChannel    ssh.Channel
	trafficRules           TrafficRules
	tcpTrafficState        trafficState
	udpTrafficState        trafficState

	// qualityMetrics accumulates upstream dial metrics, which
	// sshServer.getLoadStats sums and then resets.
	qualityMetrics     qualityMetrics
	tcpPortForwardLRU  *common.LRUConns
	oslClientSeedState *osl.ClientSeedState

	// signalIssueSLOKs is created with a buffer of 1.
	signalIssueSLOKs chan struct{}

	// runCtx is canceled by calling stopRunning; both come from a single
	// context.WithCancel call in newSshClient.
	runCtx      context.Context
	stopRunning context.CancelFunc

	// stopped is closed when sshClient.run returns.
	stopped                              chan struct{}
	tcpPortForwardDialingAvailableSignal context.CancelFunc
	releaseAuthorizations                func()
	stopTimer                            *time.Timer
	preHandshakeRandomStreamMetrics      randomStreamMetrics
	postHandshakeRandomStreamMetrics     randomStreamMetrics
}
// trafficState holds port forward counters for one transport (sshClient keeps
// one instance for TCP and one for UDP). sshServer.getLoadStats sums the
// concurrent and total counts across clients. Peak counts are per-client only
// and cannot be summed into a global peak. For UDP,
// concurrentDialingPortForwardCount is not meaningful.
type trafficState struct {
	bytesUp                               int64
	bytesDown                             int64
	concurrentDialingPortForwardCount     int64
	peakConcurrentDialingPortForwardCount int64
	concurrentPortForwardCount            int64
	peakConcurrentPortForwardCount        int64
	totalPortForwardCount                 int64

	// availablePortForwardCond is created by newSshClient with its own
	// dedicated mutex.
	availablePortForwardCond *sync.Cond
}
// randomStreamMetrics holds count and byte counters. sshClient keeps separate
// instances for before and after the API handshake
// (preHandshakeRandomStreamMetrics / postHandshakeRandomStreamMetrics).
// NOTE(review): the code that populates these fields is outside this section;
// confirm the exact meaning of each counter (e.g. requested vs. actually
// transferred bytes) against its users.
type randomStreamMetrics struct {
	count                 int
	upstreamBytes         int
	receivedUpstreamBytes int
	downstreamBytes       int
	sentDownstreamBytes   int
}
// qualityMetrics records upstream TCP dial attempts and
// elapsed time. Elapsed time includes the full TCP handshake
// and, in aggregate, is a measure of the quality of the
// upstream link. These stats are recorded by each sshClient
// and then reported and reset in sshServer.getLoadStats(),
// which reports the durations in milliseconds.
type qualityMetrics struct {
	tcpPortForwardDialedCount               int64
	tcpPortForwardDialedDuration            time.Duration
	tcpPortForwardFailedCount               int64
	tcpPortForwardFailedDuration            time.Duration
	tcpPortForwardRejectedDialingLimitCount int64
}
// handshakeState holds the state established by the client's API handshake,
// as passed to sshClient.setHandshakeState.
type handshakeState struct {
	completed   bool
	apiProtocol string
	apiParams   common.APIParameters

	// authorizedAccessTypes is intentionally left intact when authorizations
	// are revoked, so that tunnel logging can still record access types.
	authorizedAccessTypes []string

	// authorizationsRevoked is set by sshServer.revokeClientAuthorizations.
	authorizationsRevoked bool
	expectDomainBytes     bool
}
  885. func newSshClient(
  886. sshServer *sshServer, tunnelProtocol string, geoIPData GeoIPData) *sshClient {
  887. runCtx, stopRunning := context.WithCancel(context.Background())
  888. // isFirstTunnelInSession is defaulted to true so that the pre-handshake
  889. // traffic rules won't apply UnthrottleFirstTunnelOnly and negate any
  890. // unthrottled bytes during the initial protocol negotiation.
  891. client := &sshClient{
  892. sshServer: sshServer,
  893. tunnelProtocol: tunnelProtocol,
  894. geoIPData: geoIPData,
  895. isFirstTunnelInSession: true,
  896. tcpPortForwardLRU: common.NewLRUConns(),
  897. signalIssueSLOKs: make(chan struct{}, 1),
  898. runCtx: runCtx,
  899. stopRunning: stopRunning,
  900. stopped: make(chan struct{}),
  901. }
  902. client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  903. client.udpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  904. return client
  905. }
  906. func (sshClient *sshClient) run(
  907. baseConn net.Conn, onSSHHandshakeFinished func()) {
  908. // When run returns, the client has fully stopped, with all SSH state torn
  909. // down and no port forwards or API requests in progress.
  910. defer close(sshClient.stopped)
  911. // onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
  912. defer func() {
  913. if onSSHHandshakeFinished != nil {
  914. onSSHHandshakeFinished()
  915. }
  916. }()
  917. // Set initial traffic rules, pre-handshake, based on currently known info.
  918. sshClient.setTrafficRules()
  919. conn := baseConn
  920. // Wrap the base client connection with an ActivityMonitoredConn which will
  921. // terminate the connection if no data is received before the deadline. This
  922. // timeout is in effect for the entire duration of the SSH connection. Clients
  923. // must actively use the connection or send SSH keep alive requests to keep
  924. // the connection active. Writes are not considered reliable activity indicators
  925. // due to buffering.
  926. activityConn, err := common.NewActivityMonitoredConn(
  927. conn,
  928. SSH_CONNECTION_READ_DEADLINE,
  929. false,
  930. nil,
  931. nil)
  932. if err != nil {
  933. conn.Close()
  934. if !isExpectedTunnelIOError(err) {
  935. log.WithContextFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  936. }
  937. return
  938. }
  939. conn = activityConn
  940. // Further wrap the connection in a rate limiting ThrottledConn.
  941. throttledConn := common.NewThrottledConn(conn, sshClient.rateLimits())
  942. conn = throttledConn
  943. // Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  944. // respect shutdownBroadcast and implement a specific handshake timeout.
  945. // The timeout is to reclaim network resources in case the handshake takes
  946. // too long.
  947. type sshNewServerConnResult struct {
  948. obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn
  949. sshConn *ssh.ServerConn
  950. channels <-chan ssh.NewChannel
  951. requests <-chan *ssh.Request
  952. err error
  953. }
  954. resultChannel := make(chan *sshNewServerConnResult, 2)
  955. var afterFunc *time.Timer
  956. if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {
  957. afterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
  958. resultChannel <- &sshNewServerConnResult{err: errors.New("ssh handshake timeout")}
  959. })
  960. }
  961. go func(baseConn, conn net.Conn) {
  962. sshServerConfig := &ssh.ServerConfig{
  963. PasswordCallback: sshClient.passwordCallback,
  964. AuthLogCallback: sshClient.authLogCallback,
  965. ServerVersion: sshClient.sshServer.support.Config.SSHServerVersion,
  966. }
  967. sshServerConfig.AddHostKey(sshClient.sshServer.sshHostKey)
  968. var err error
  969. if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  970. // With Encrypt-then-MAC hash algorithms, packet length is
  971. // transmitted in plaintext, which aids in traffic analysis;
  972. // clients may still send Encrypt-then-MAC algorithms in their
  973. // KEX_INIT message, but do not select these algorithms.
  974. //
  975. // The exception is TUNNEL_PROTOCOL_SSH, which is intended to appear
  976. // like SSH on the wire.
  977. sshServerConfig.NoEncryptThenMACHash = true
  978. } else {
  979. // For TUNNEL_PROTOCOL_SSH only, randomize KEX.
  980. if sshClient.sshServer.support.Config.ObfuscatedSSHKey != "" {
  981. sshServerConfig.KEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  982. sshClient.sshServer.support.Config.ObfuscatedSSHKey)
  983. if err != nil {
  984. err = common.ContextError(err)
  985. }
  986. }
  987. }
  988. result := &sshNewServerConnResult{}
  989. // Wrap the connection in an SSH deobfuscator when required.
  990. if err == nil && protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  991. // Note: NewObfuscatedSSHConn blocks on network I/O
  992. // TODO: ensure this won't block shutdown
  993. result.obfuscatedSSHConn, err = obfuscator.NewObfuscatedSSHConn(
  994. obfuscator.OBFUSCATION_CONN_MODE_SERVER,
  995. conn,
  996. sshClient.sshServer.support.Config.ObfuscatedSSHKey,
  997. nil, nil, nil)
  998. if err != nil {
  999. err = common.ContextError(err)
  1000. } else {
  1001. conn = result.obfuscatedSSHConn
  1002. }
  1003. // Now seed fragmentor, when present, with seed derived from
  1004. // initial obfuscator message. See tactics.Listener.Accept.
  1005. // This must preceed ssh.NewServerConn to ensure fragmentor
  1006. // is seeded before downstream bytes are written.
  1007. if err == nil && sshClient.tunnelProtocol == protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  1008. if fragmentorConn, ok := baseConn.(*fragmentor.Conn); ok {
  1009. fragmentorPRNG, err := result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")
  1010. if err != nil {
  1011. err = common.ContextError(err)
  1012. } else {
  1013. fragmentorConn.SetPRNG(fragmentorPRNG)
  1014. }
  1015. }
  1016. }
  1017. }
  1018. if err == nil {
  1019. result.sshConn, result.channels, result.requests, err =
  1020. ssh.NewServerConn(conn, sshServerConfig)
  1021. if err != nil {
  1022. err = common.ContextError(err)
  1023. }
  1024. }
  1025. result.err = err
  1026. resultChannel <- result
  1027. }(baseConn, conn)
  1028. var result *sshNewServerConnResult
  1029. select {
  1030. case result = <-resultChannel:
  1031. case <-sshClient.sshServer.shutdownBroadcast:
  1032. // Close() will interrupt an ongoing handshake
  1033. // TODO: wait for SSH handshake goroutines to exit before returning?
  1034. conn.Close()
  1035. return
  1036. }
  1037. if afterFunc != nil {
  1038. afterFunc.Stop()
  1039. }
  1040. if result.err != nil {
  1041. conn.Close()
  1042. // This is a Debug log due to noise. The handshake often fails due to I/O
  1043. // errors as clients frequently interrupt connections in progress when
  1044. // client-side load balancing completes a connection to a different server.
  1045. log.WithContextFields(LogFields{"error": result.err}).Debug("handshake failed")
  1046. return
  1047. }
  1048. // The SSH handshake has finished successfully; notify now to allow other
  1049. // blocked SSH handshakes to proceed.
  1050. if onSSHHandshakeFinished != nil {
  1051. onSSHHandshakeFinished()
  1052. }
  1053. onSSHHandshakeFinished = nil
  1054. sshClient.Lock()
  1055. sshClient.sshConn = result.sshConn
  1056. sshClient.activityConn = activityConn
  1057. sshClient.throttledConn = throttledConn
  1058. sshClient.Unlock()
  1059. if !sshClient.sshServer.registerEstablishedClient(sshClient) {
  1060. conn.Close()
  1061. log.WithContext().Warning("register failed")
  1062. return
  1063. }
  1064. sshClient.runTunnel(result.channels, result.requests)
  1065. // Note: sshServer.unregisterEstablishedClient calls sshClient.stop(),
  1066. // which also closes underlying transport Conn.
  1067. sshClient.sshServer.unregisterEstablishedClient(sshClient)
  1068. // Some conns report additional metrics. Meek conns report resiliency
  1069. // metrics and fragmentor.Conns report fragmentor configs.
  1070. //
  1071. // Limitation: for meek, GetMetrics from underlying fragmentor.Conn(s)
  1072. // should be called in order to log fragmentor metrics for meek sessions.
  1073. var additionalMetrics []LogFields
  1074. if metricsSource, ok := baseConn.(common.MetricsSource); ok {
  1075. additionalMetrics = append(
  1076. additionalMetrics, LogFields(metricsSource.GetMetrics()))
  1077. }
  1078. if result.obfuscatedSSHConn != nil {
  1079. additionalMetrics = append(
  1080. additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
  1081. }
  1082. sshClient.logTunnel(additionalMetrics)
  1083. // Transfer OSL seed state -- the OSL progress -- from the closing
  1084. // client to the session cache so the client can resume its progress
  1085. // if it reconnects to this same server.
  1086. // Note: following setOSLConfig order of locking.
  1087. sshClient.Lock()
  1088. if sshClient.oslClientSeedState != nil {
  1089. sshClient.sshServer.oslSessionCacheMutex.Lock()
  1090. sshClient.oslClientSeedState.Hibernate()
  1091. sshClient.sshServer.oslSessionCache.Set(
  1092. sshClient.sessionID, sshClient.oslClientSeedState, cache.DefaultExpiration)
  1093. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  1094. sshClient.oslClientSeedState = nil
  1095. }
  1096. sshClient.Unlock()
  1097. // Initiate cleanup of the GeoIP session cache. To allow for post-tunnel
  1098. // final status requests, the lifetime of cached GeoIP records exceeds the
  1099. // lifetime of the sshClient.
  1100. sshClient.sshServer.support.GeoIPService.MarkSessionCacheToExpire(sshClient.sessionID)
  1101. }
  1102. func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  1103. expectedSessionIDLength := 2 * protocol.PSIPHON_API_CLIENT_SESSION_ID_LENGTH
  1104. expectedSSHPasswordLength := 2 * SSH_PASSWORD_BYTE_LENGTH
  1105. var sshPasswordPayload protocol.SSHPasswordPayload
  1106. err := json.Unmarshal(password, &sshPasswordPayload)
  1107. if err != nil {
  1108. // Backwards compatibility case: instead of a JSON payload, older clients
  1109. // send the hex encoded session ID prepended to the SSH password.
  1110. // Note: there's an even older case where clients don't send any session ID,
  1111. // but that's no longer supported.
  1112. if len(password) == expectedSessionIDLength+expectedSSHPasswordLength {
  1113. sshPasswordPayload.SessionId = string(password[0:expectedSessionIDLength])
  1114. sshPasswordPayload.SshPassword = string(password[expectedSessionIDLength:])
  1115. } else {
  1116. return nil, common.ContextError(fmt.Errorf("invalid password payload for %q", conn.User()))
  1117. }
  1118. }
  1119. if !isHexDigits(sshClient.sshServer.support.Config, sshPasswordPayload.SessionId) ||
  1120. len(sshPasswordPayload.SessionId) != expectedSessionIDLength {
  1121. return nil, common.ContextError(fmt.Errorf("invalid session ID for %q", conn.User()))
  1122. }
  1123. userOk := (subtle.ConstantTimeCompare(
  1124. []byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1)
  1125. passwordOk := (subtle.ConstantTimeCompare(
  1126. []byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1)
  1127. if !userOk || !passwordOk {
  1128. return nil, common.ContextError(fmt.Errorf("invalid password for %q", conn.User()))
  1129. }
  1130. sessionID := sshPasswordPayload.SessionId
  1131. // The GeoIP session cache will be populated if there was a previous tunnel
  1132. // with this session ID. This will be true up to GEOIP_SESSION_CACHE_TTL, which
  1133. // is currently much longer than the OSL session cache, another option to use if
  1134. // the GeoIP session cache is retired (the GeoIP session cache currently only
  1135. // supports legacy use cases).
  1136. isFirstTunnelInSession := !sshClient.sshServer.support.GeoIPService.InSessionCache(sessionID)
  1137. supportsServerRequests := common.Contains(
  1138. sshPasswordPayload.ClientCapabilities, protocol.CLIENT_CAPABILITY_SERVER_REQUESTS)
  1139. sshClient.Lock()
  1140. // After this point, these values are read-only as they are read
  1141. // without obtaining sshClient.Lock.
  1142. sshClient.sessionID = sessionID
  1143. sshClient.isFirstTunnelInSession = isFirstTunnelInSession
  1144. sshClient.supportsServerRequests = supportsServerRequests
  1145. geoIPData := sshClient.geoIPData
  1146. sshClient.Unlock()
  1147. // Store the GeoIP data associated with the session ID. This makes
  1148. // the GeoIP data available to the web server for web API requests.
  1149. // A cache that's distinct from the sshClient record is used to allow
  1150. // for or post-tunnel final status requests.
  1151. // If the client is reconnecting with the same session ID, this call
  1152. // will undo the expiry set by MarkSessionCacheToExpire.
  1153. sshClient.sshServer.support.GeoIPService.SetSessionCache(sessionID, geoIPData)
  1154. return nil, nil
  1155. }
  1156. func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  1157. if err != nil {
  1158. if method == "none" && err.Error() == "no auth passed yet" {
  1159. // In this case, the callback invocation is noise from auth negotiation
  1160. return
  1161. }
  1162. // Note: here we previously logged messages for fail2ban to act on. This is no longer
  1163. // done as the complexity outweighs the benefits.
  1164. //
  1165. // - The SSH credential is not secret -- it's in the server entry. Attackers targeting
  1166. // the server likely already have the credential. On the other hand, random scanning and
  1167. // brute forcing is mitigated with high entropy random passwords, rate limiting
  1168. // (implemented on the host via iptables), and limited capabilities (the SSH session can
  1169. // only port forward).
  1170. //
  1171. // - fail2ban coverage was inconsistent; in the case of an unfronted meek protocol through
  1172. // an upstream proxy, the remote address is the upstream proxy, which should not be blocked.
  1173. // The X-Forwarded-For header cant be used instead as it may be forged and used to get IPs
  1174. // deliberately blocked; and in any case fail2ban adds iptables rules which can only block
  1175. // by direct remote IP, not by original client IP. Fronted meek has the same iptables issue.
  1176. //
  1177. // Random scanning and brute forcing of port 22 will result in log noise. To mitigate this,
  1178. // not every authentication failure is logged. A summary log is emitted periodically to
  1179. // retain some record of this activity in case this is relevant to, e.g., a performance
  1180. // investigation.
  1181. atomic.AddInt64(&sshClient.sshServer.authFailedCount, 1)
  1182. lastAuthLog := monotime.Time(atomic.LoadInt64(&sshClient.sshServer.lastAuthLog))
  1183. if monotime.Since(lastAuthLog) > SSH_AUTH_LOG_PERIOD {
  1184. now := int64(monotime.Now())
  1185. if atomic.CompareAndSwapInt64(&sshClient.sshServer.lastAuthLog, int64(lastAuthLog), now) {
  1186. count := atomic.SwapInt64(&sshClient.sshServer.authFailedCount, 0)
  1187. log.WithContextFields(
  1188. LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
  1189. }
  1190. }
  1191. log.WithContextFields(LogFields{"error": err, "method": method}).Debug("authentication failed")
  1192. } else {
  1193. log.WithContextFields(LogFields{"error": err, "method": method}).Debug("authentication success")
  1194. }
  1195. }
  1196. // stop signals the ssh connection to shutdown. After sshConn.Wait returns,
  1197. // the SSH connection has terminated but sshClient.run may still be running and
  1198. // in the process of exiting.
  1199. //
  1200. // The shutdown process must complete rapidly and not, e.g., block on network
  1201. // I/O, as newly connecting clients need to await stop completion of any
  1202. // existing connection that shares the same session ID.
  1203. func (sshClient *sshClient) stop() {
  1204. sshClient.sshConn.Close()
  1205. sshClient.sshConn.Wait()
  1206. }
// awaitStopped will block until sshClient.run has exited, at which point all
// worker goroutines associated with the sshClient, including any in-flight
// API handlers, will have exited.
//
// Implementation: a single receive on sshClient.stopped. NOTE(review): this
// relies on the run path closing (or sending on) that channel exactly when it
// exits; that code is not visible here, so confirm it in sshClient.run.
func (sshClient *sshClient) awaitStopped() {
	<-sshClient.stopped
}
  1213. // runTunnel handles/dispatches new channels and new requests from the client.
  1214. // When the SSH client connection closes, both the channels and requests channels
  1215. // will close and runTunnel will exit.
  1216. func (sshClient *sshClient) runTunnel(
  1217. channels <-chan ssh.NewChannel,
  1218. requests <-chan *ssh.Request) {
  1219. waitGroup := new(sync.WaitGroup)
  1220. // Start client SSH API request handler
  1221. waitGroup.Add(1)
  1222. go func() {
  1223. defer waitGroup.Done()
  1224. sshClient.handleSSHRequests(requests)
  1225. }()
  1226. // Start OSL sender
  1227. if sshClient.supportsServerRequests {
  1228. waitGroup.Add(1)
  1229. go func() {
  1230. defer waitGroup.Done()
  1231. sshClient.runOSLSender()
  1232. }()
  1233. }
  1234. // Start the TCP port forward manager
  1235. // The queue size is set to the traffic rules (MaxTCPPortForwardCount +
  1236. // MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
  1237. // limits per client; when that value is not set, a default is used.
  1238. // A limitation: this queue size is set once and doesn't change, for this client,
  1239. // when traffic rules are reloaded.
  1240. queueSize := sshClient.getTCPPortForwardQueueSize()
  1241. if queueSize == 0 {
  1242. queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
  1243. }
  1244. newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
  1245. waitGroup.Add(1)
  1246. go func() {
  1247. defer waitGroup.Done()
  1248. sshClient.handleTCPPortForwards(waitGroup, newTCPPortForwards)
  1249. }()
  1250. // Handle new channel (port forward) requests from the client.
  1251. for newChannel := range channels {
  1252. switch newChannel.ChannelType() {
  1253. case protocol.RANDOM_STREAM_CHANNEL_TYPE:
  1254. sshClient.handleNewRandomStreamChannel(waitGroup, newChannel)
  1255. case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
  1256. sshClient.handleNewPacketTunnelChannel(waitGroup, newChannel)
  1257. case "direct-tcpip":
  1258. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, newTCPPortForwards)
  1259. default:
  1260. sshClient.rejectNewChannel(newChannel,
  1261. fmt.Sprintf("unknown or unsupported channel type: %s", newChannel.ChannelType()))
  1262. }
  1263. }
  1264. // The channel loop is interrupted by a client
  1265. // disconnect or by calling sshClient.stop().
  1266. // Stop the TCP port forward manager
  1267. close(newTCPPortForwards)
  1268. // Stop all other worker goroutines
  1269. sshClient.stopRunning()
  1270. if sshClient.sshServer.support.Config.RunPacketTunnel {
  1271. // PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
  1272. sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
  1273. sshClient.sessionID)
  1274. }
  1275. waitGroup.Wait()
  1276. sshClient.cleanupAuthorizations()
  1277. }
  1278. func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {
  1279. for request := range requests {
  1280. // Requests are processed serially; API responses must be sent in request order.
  1281. var responsePayload []byte
  1282. var err error
  1283. if request.Type == "keepalive@openssh.com" {
  1284. // SSH keep alive round trips are used as speed test samples.
  1285. responsePayload, err = tactics.MakeSpeedTestResponse(
  1286. SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  1287. } else {
  1288. // All other requests are assumed to be API requests.
  1289. sshClient.Lock()
  1290. authorizedAccessTypes := sshClient.handshakeState.authorizedAccessTypes
  1291. sshClient.Unlock()
  1292. // Note: unlock before use is only safe as long as referenced sshClient data,
  1293. // such as slices in handshakeState, is read-only after initially set.
  1294. responsePayload, err = sshAPIRequestHandler(
  1295. sshClient.sshServer.support,
  1296. sshClient.geoIPData,
  1297. authorizedAccessTypes,
  1298. request.Type,
  1299. request.Payload)
  1300. }
  1301. if err == nil {
  1302. err = request.Reply(true, responsePayload)
  1303. } else {
  1304. log.WithContextFields(LogFields{"error": err}).Warning("request failed")
  1305. err = request.Reply(false, nil)
  1306. }
  1307. if err != nil {
  1308. if !isExpectedTunnelIOError(err) {
  1309. log.WithContextFields(LogFields{"error": err}).Warning("response failed")
  1310. }
  1311. }
  1312. }
  1313. }
// newTCPPortForward is a queued "direct-tcpip" channel request waiting to be
// dequeued by the TCP port forward manager (handleTCPPortForwards).
type newTCPPortForward struct {
	// enqueueTime records when the request was queued. handleTCPPortForwards
	// deducts the time since then from the port forward's dial timeout.
	enqueueTime monotime.Time
	// hostToConnect and portToConnect are the destination requested by the
	// client in the channel's direct-tcpip extra data.
	hostToConnect string
	portToConnect int
	// newChannel is the pending SSH channel. The manager either rejects it or
	// hands it to handleTCPChannel.
	newChannel ssh.NewChannel
}
// handleTCPPortForwards is the TCP port forward manager. It dequeues requests
// from newTCPPortForwards until that channel is closed, enforces the queue
// and concurrent-dial timeouts, and dispatches each surviving request to
// handleTCPChannel in a goroutine tracked by waitGroup.
func (sshClient *sshClient) handleTCPPortForwards(
	waitGroup *sync.WaitGroup,
	newTCPPortForwards chan *newTCPPortForward) {

	// Lifecycle of a TCP port forward:
	//
	// 1. A "direct-tcpip" SSH request is received from the client.
	//
	//    A new TCP port forward request is enqueued. The queue delivers TCP port
	//    forward requests to the TCP port forward manager, which enforces the TCP
	//    port forward dial limit.
	//
	//    Enqueuing new requests allows for reading further SSH requests from the
	//    client without blocking when the dial limit is hit; this is to permit new
	//    UDP/udpgw port forwards to be re-established without delay. The maximum
	//    size of the queue enforces a hard cap on resources consumed by a client
	//    in the pre-dial phase. When the queue is full, new TCP port forwards are
	//    immediately rejected.
	//
	// 2. The TCP port forward manager dequeues the request.
	//
	//    The manager calls dialingTCPPortForward(), which increments
	//    concurrentDialingPortForwardCount, and calls
	//    isTCPDialingPortForwardLimitExceeded() to check the concurrent dialing
	//    count.
	//
	//    The manager enforces the concurrent TCP dial limit: when at the limit, the
	//    manager blocks waiting for the number of dials to drop below the limit before
	//    dispatching the request to handleTCPPortForward(), which will run in its own
	//    goroutine and will dial and relay the port forward.
	//
	//    The block delays the current request and also halts dequeuing of subsequent
	//    requests and could ultimately cause requests to be immediately rejected if
	//    the queue fills. These actions are intended to apply back pressure when
	//    upstream network resources are impaired.
	//
	//    The time spent in the queue is deducted from the port forward's dial timeout.
	//    The time spent blocking while at the dial limit is similarly deducted from
	//    the dial timeout. If the dial timeout has expired before the dial begins, the
	//    port forward is rejected and a stat is recorded.
	//
	// 3. handleTCPPortForward() performs the port forward dial and relaying.
	//
	//     a. Dial the target, using the dial timeout remaining after queue and blocking
	//        time is deducted.
	//
	//     b. If the dial fails, call abortedTCPPortForward() to decrement
	//        concurrentDialingPortForwardCount, freeing up a dial slot.
	//
	//     c. If the dial succeeds, call establishedPortForward(), which decrements
	//        concurrentDialingPortForwardCount and increments concurrentPortForwardCount,
	//        the "established" port forward count.
	//
	//     d. Check isPortForwardLimitExceeded(), which enforces the configurable limit on
	//        concurrentPortForwardCount, the number of _established_ TCP port forwards.
	//        If the limit is exceeded, the LRU established TCP port forward is closed and
	//        the newly established TCP port forward proceeds. This LRU logic allows some
	//        dangling resource consumption (e.g., TIME_WAIT) while providing a better
	//        experience for clients.
	//
	//     e. Relay data.
	//
	//     f. Call closedPortForward() which decrements concurrentPortForwardCount and
	//        records bytes transferred.

	for newPortForward := range newTCPPortForwards {

		// The dial timeout budget is reduced by the time already spent queued.
		remainingDialTimeout :=
			time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
				monotime.Since(newPortForward.enqueueTime)

		if remainingDialTimeout <= 0 {
			// No dialing slot has been reserved yet, so there is nothing to release.
			sshClient.updateQualityMetricsWithRejectedDialingLimit()
			sshClient.rejectNewChannel(
				newPortForward.newChannel, "TCP port forward timed out in queue")
			continue
		}

		// Reserve a TCP dialing slot.
		//
		// TOCTOU note: important to increment counts _before_ checking limits; otherwise,
		// the client could potentially consume excess resources by initiating many port
		// forwards concurrently.
		sshClient.dialingTCPPortForward()

		// When max dials are in progress, wait up to remainingDialTimeout for dialing
		// to become available. This blocks all dequeuing.
		//
		// The wait ends when ctx is done: the timeout expires, runCtx is cancelled,
		// or the registered cancelCtx is invoked. NOTE(review): presumably whoever
		// frees a dialing slot invokes the signal registered by
		// setTCPPortForwardDialingAvailableSignal -- confirm in that helper.
		if sshClient.isTCPDialingPortForwardLimitExceeded() {
			blockStartTime := monotime.Now()
			ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
			sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
			<-ctx.Done()
			sshClient.setTCPPortForwardDialingAvailableSignal(nil)
			cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
			remainingDialTimeout -= monotime.Since(blockStartTime)
		}

		// NOTE(review): if the wait above ended because runCtx was cancelled while
		// time remained, the request is still dispatched below; handleTCPChannel is
		// then expected to fail fast on the cancelled context -- confirm.
		if remainingDialTimeout <= 0 {
			// Release the dialing slot here since handleTCPChannel() won't be called.
			sshClient.abortedTCPPortForward()
			sshClient.updateQualityMetricsWithRejectedDialingLimit()
			sshClient.rejectNewChannel(
				newPortForward.newChannel, "TCP port forward timed out before dialing")
			continue
		}

		// Dial and relay the TCP port forward. handleTCPChannel is run in its own worker
		// goroutine and takes over the dialing slot reserved by dialingTCPPortForward(),
		// releasing it as described in steps 3b/3c above. remainingDialTimeout is always
		// positive at this point.
		waitGroup.Add(1)
		go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
			defer waitGroup.Done()
			sshClient.handleTCPChannel(
				remainingDialTimeout,
				newPortForward.hostToConnect,
				newPortForward.portToConnect,
				newPortForward.newChannel)
		}(remainingDialTimeout, newPortForward)
	}
}
  1432. func (sshClient *sshClient) handleNewRandomStreamChannel(
  1433. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  1434. // A random stream channel returns the requested number of bytes -- random
  1435. // bytes -- to the client while also consuming and discarding bytes sent
  1436. // by the client.
  1437. //
  1438. // One use case for the random stream channel is a liveness test that the
  1439. // client performs to confirm that the tunnel is live. As the liveness
  1440. // test is performed in the concurrent establishment phase, before
  1441. // selecting a single candidate for handshake, the random stream channel
  1442. // is available pre-handshake, albeit with additional restrictions.
  1443. //
  1444. // The random stream is subject to throttling in traffic rules; for
  1445. // unthrottled liveness tests, set initial Read/WriteUnthrottledBytes as
  1446. // required. The random stream maximum count and response size cap
  1447. // mitigate clients abusing the facility to waste server resources.
  1448. //
  1449. // Like all other channels, this channel type is handled asynchronously,
  1450. // so it's possible to run at any point in the tunnel lifecycle.
  1451. //
  1452. // Up/downstream byte counts don't include SSH packet and request
  1453. // marshalling overhead.
  1454. var request protocol.RandomStreamRequest
  1455. err := json.Unmarshal(newChannel.ExtraData(), &request)
  1456. if err != nil {
  1457. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
  1458. return
  1459. }
  1460. if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
  1461. sshClient.rejectNewChannel(newChannel,
  1462. fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
  1463. return
  1464. }
  1465. if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
  1466. sshClient.rejectNewChannel(newChannel,
  1467. fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
  1468. return
  1469. }
  1470. var metrics *randomStreamMetrics
  1471. sshClient.Lock()
  1472. if !sshClient.handshakeState.completed {
  1473. metrics = &sshClient.preHandshakeRandomStreamMetrics
  1474. } else {
  1475. metrics = &sshClient.postHandshakeRandomStreamMetrics
  1476. }
  1477. countOk := true
  1478. if !sshClient.handshakeState.completed &&
  1479. metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
  1480. countOk = false
  1481. } else {
  1482. metrics.count++
  1483. }
  1484. sshClient.Unlock()
  1485. if !countOk {
  1486. sshClient.rejectNewChannel(newChannel, "max count exceeded")
  1487. return
  1488. }
  1489. channel, requests, err := newChannel.Accept()
  1490. if err != nil {
  1491. if !isExpectedTunnelIOError(err) {
  1492. log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
  1493. }
  1494. return
  1495. }
  1496. go ssh.DiscardRequests(requests)
  1497. waitGroup.Add(1)
  1498. go func() {
  1499. defer waitGroup.Done()
  1500. received := 0
  1501. sent := 0
  1502. if request.UpstreamBytes > 0 {
  1503. n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
  1504. received = int(n)
  1505. if err != nil {
  1506. if !isExpectedTunnelIOError(err) {
  1507. log.WithContextFields(LogFields{"error": err}).Warning("receive failed")
  1508. }
  1509. // Fall through and record any bytes received...
  1510. }
  1511. }
  1512. if request.DownstreamBytes > 0 {
  1513. n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
  1514. sent = int(n)
  1515. if err != nil {
  1516. if !isExpectedTunnelIOError(err) {
  1517. log.WithContextFields(LogFields{"error": err}).Warning("send failed")
  1518. }
  1519. }
  1520. }
  1521. sshClient.Lock()
  1522. metrics.upstreamBytes += request.UpstreamBytes
  1523. metrics.receivedUpstreamBytes += received
  1524. metrics.downstreamBytes += request.DownstreamBytes
  1525. metrics.sentDownstreamBytes += sent
  1526. sshClient.Unlock()
  1527. channel.Close()
  1528. }()
  1529. }
  1530. func (sshClient *sshClient) handleNewPacketTunnelChannel(
  1531. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  1532. // packet tunnel channels are handled by the packet tunnel server
  1533. // component. Each client may have at most one packet tunnel channel.
  1534. if !sshClient.sshServer.support.Config.RunPacketTunnel {
  1535. sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
  1536. return
  1537. }
  1538. // Accept this channel immediately. This channel will replace any
  1539. // previously existing packet tunnel channel for this client.
  1540. packetTunnelChannel, requests, err := newChannel.Accept()
  1541. if err != nil {
  1542. if !isExpectedTunnelIOError(err) {
  1543. log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
  1544. }
  1545. return
  1546. }
  1547. go ssh.DiscardRequests(requests)
  1548. sshClient.setPacketTunnelChannel(packetTunnelChannel)
  1549. // PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
  1550. // will stop packet tunnel workers for any previous packet tunnel channel.
  1551. checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  1552. return sshClient.isPortForwardPermitted(portForwardTypeTCP, upstreamIPAddress, port)
  1553. }
  1554. checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  1555. return sshClient.isPortForwardPermitted(portForwardTypeUDP, upstreamIPAddress, port)
  1556. }
  1557. flowActivityUpdaterMaker := func(
  1558. upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
  1559. var updaters []tun.FlowActivityUpdater
  1560. oslUpdater := sshClient.newClientSeedPortForward(upstreamIPAddress)
  1561. if oslUpdater != nil {
  1562. updaters = append(updaters, oslUpdater)
  1563. }
  1564. return updaters
  1565. }
  1566. metricUpdater := func(
  1567. TCPApplicationBytesDown, TCPApplicationBytesUp,
  1568. UDPApplicationBytesDown, UDPApplicationBytesUp int64) {
  1569. sshClient.Lock()
  1570. sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
  1571. sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
  1572. sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
  1573. sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
  1574. sshClient.Unlock()
  1575. }
  1576. err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
  1577. sshClient.sessionID,
  1578. packetTunnelChannel,
  1579. checkAllowedTCPPortFunc,
  1580. checkAllowedUDPPortFunc,
  1581. flowActivityUpdaterMaker,
  1582. metricUpdater)
  1583. if err != nil {
  1584. log.WithContextFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
  1585. sshClient.setPacketTunnelChannel(nil)
  1586. }
  1587. }
  1588. func (sshClient *sshClient) handleNewTCPPortForwardChannel(
  1589. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel,
  1590. newTCPPortForwards chan *newTCPPortForward) {
  1591. // udpgw client connections are dispatched immediately (clients use this for
  1592. // DNS, so it's essential to not block; and only one udpgw connection is
  1593. // retained at a time).
  1594. //
  1595. // All other TCP port forwards are dispatched via the TCP port forward
  1596. // manager queue.
  1597. // http://tools.ietf.org/html/rfc4254#section-7.2
  1598. var directTcpipExtraData struct {
  1599. HostToConnect string
  1600. PortToConnect uint32
  1601. OriginatorIPAddress string
  1602. OriginatorPort uint32
  1603. }
  1604. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  1605. if err != nil {
  1606. sshClient.rejectNewChannel(newChannel, "invalid extra data")
  1607. return
  1608. }
  1609. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  1610. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  1611. isUDPChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
  1612. sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
  1613. net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
  1614. if isUDPChannel {
  1615. // Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
  1616. // own worker goroutine.
  1617. waitGroup.Add(1)
  1618. go func(channel ssh.NewChannel) {
  1619. defer waitGroup.Done()
  1620. sshClient.handleUDPChannel(channel)
  1621. }(newChannel)
  1622. } else {
  1623. // Dispatch via TCP port forward manager. When the queue is full, the channel
  1624. // is immediately rejected.
  1625. tcpPortForward := &newTCPPortForward{
  1626. enqueueTime: monotime.Now(),
  1627. hostToConnect: directTcpipExtraData.HostToConnect,
  1628. portToConnect: int(directTcpipExtraData.PortToConnect),
  1629. newChannel: newChannel,
  1630. }
  1631. select {
  1632. case newTCPPortForwards <- tcpPortForward:
  1633. default:
  1634. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  1635. sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
  1636. }
  1637. }
  1638. }
  1639. func (sshClient *sshClient) cleanupAuthorizations() {
  1640. sshClient.Lock()
  1641. if sshClient.releaseAuthorizations != nil {
  1642. sshClient.releaseAuthorizations()
  1643. }
  1644. if sshClient.stopTimer != nil {
  1645. sshClient.stopTimer.Stop()
  1646. }
  1647. sshClient.Unlock()
  1648. }
  1649. // setPacketTunnelChannel sets the single packet tunnel channel
  1650. // for this sshClient. Any existing packet tunnel channel is
  1651. // closed.
  1652. func (sshClient *sshClient) setPacketTunnelChannel(channel ssh.Channel) {
  1653. sshClient.Lock()
  1654. if sshClient.packetTunnelChannel != nil {
  1655. sshClient.packetTunnelChannel.Close()
  1656. }
  1657. sshClient.packetTunnelChannel = channel
  1658. sshClient.Unlock()
  1659. }
  1660. // setUDPChannel sets the single UDP channel for this sshClient.
  1661. // Each sshClient may have only one concurrent UDP channel. Each
  1662. // UDP channel multiplexes many UDP port forwards via the udpgw
  1663. // protocol. Any existing UDP channel is closed.
  1664. func (sshClient *sshClient) setUDPChannel(channel ssh.Channel) {
  1665. sshClient.Lock()
  1666. if sshClient.udpChannel != nil {
  1667. sshClient.udpChannel.Close()
  1668. }
  1669. sshClient.udpChannel = channel
  1670. sshClient.Unlock()
  1671. }
  1672. var serverTunnelStatParams = append(
  1673. []requestParamSpec{
  1674. {"last_connected", isLastConnected, requestParamOptional},
  1675. {"establishment_duration", isIntString, requestParamOptional}},
  1676. baseRequestParams...)
  1677. func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
  1678. // Note: reporting duration based on last confirmed data transfer, which
  1679. // is reads for sshClient.activityConn.GetActiveDuration(), and not
  1680. // connection closing is important for protocols such as meek. For
  1681. // meek, the connection remains open until the HTTP session expires,
  1682. // which may be some time after the tunnel has closed. (The meek
  1683. // protocol has no allowance for signalling payload EOF, and even if
  1684. // it did the client may not have the opportunity to send a final
  1685. // request with an EOF flag set.)
  1686. sshClient.Lock()
  1687. logFields := getRequestLogFields(
  1688. "server_tunnel",
  1689. sshClient.geoIPData,
  1690. sshClient.handshakeState.authorizedAccessTypes,
  1691. sshClient.handshakeState.apiParams,
  1692. serverTunnelStatParams)
  1693. // "relay_protocol" is sent with handshake API parameters. In pre-
  1694. // handshake logTunnel cases, this value is not yet known. As
  1695. // sshClient.tunnelProtocol is authoritative, set this value
  1696. // unconditionally, overwriting any value from handshake.
  1697. logFields["relay_protocol"] = sshClient.tunnelProtocol
  1698. logFields["session_id"] = sshClient.sessionID
  1699. logFields["handshake_completed"] = sshClient.handshakeState.completed
  1700. logFields["start_time"] = sshClient.activityConn.GetStartTime()
  1701. logFields["duration"] = sshClient.activityConn.GetActiveDuration() / time.Millisecond
  1702. logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp
  1703. logFields["bytes_down_tcp"] = sshClient.tcpTrafficState.bytesDown
  1704. logFields["peak_concurrent_dialing_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentDialingPortForwardCount
  1705. logFields["peak_concurrent_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentPortForwardCount
  1706. logFields["total_port_forward_count_tcp"] = sshClient.tcpTrafficState.totalPortForwardCount
  1707. logFields["bytes_up_udp"] = sshClient.udpTrafficState.bytesUp
  1708. logFields["bytes_down_udp"] = sshClient.udpTrafficState.bytesDown
  1709. // sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful
  1710. logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
  1711. logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
  1712. logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count
  1713. logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes
  1714. logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes
  1715. logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes
  1716. logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes
  1717. logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count
  1718. logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes
  1719. logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes
  1720. logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
  1721. logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
  1722. // Pre-calculate a total-tunneled-bytes field. This total is used
  1723. // extensively in analytics and is more performant when pre-calculated.
  1724. logFields["bytes"] = sshClient.tcpTrafficState.bytesUp +
  1725. sshClient.tcpTrafficState.bytesDown +
  1726. sshClient.udpTrafficState.bytesUp +
  1727. sshClient.udpTrafficState.bytesDown
  1728. // Merge in additional metrics from the optional metrics source
  1729. for _, metrics := range additionalMetrics {
  1730. for name, value := range metrics {
  1731. // Don't overwrite any basic fields
  1732. if logFields[name] == nil {
  1733. logFields[name] = value
  1734. }
  1735. }
  1736. }
  1737. sshClient.Unlock()
  1738. // Note: unlock before use is only safe as long as referenced sshClient data,
  1739. // such as slices in handshakeState, is read-only after initially set.
  1740. log.LogRawFieldsWithTimestamp(logFields)
  1741. }
// blocklistHitsStatParams is the parameter spec used by logBlocklistHits for
// the "server_blocklist_hit" log. Entries with requestParamOptional may be
// absent. NOTE(review): entries with flags 0 are presumably required
// parameters -- confirm against getRequestLogFields.
var blocklistHitsStatParams = []requestParamSpec{
	{"propagation_channel_id", isHexDigits, 0},
	{"sponsor_id", isHexDigits, 0},
	{"client_version", isIntString, requestParamLogStringAsInt},
	{"client_platform", isClientPlatform, 0},
	{"client_build_rev", isHexDigits, requestParamOptional},
	{"tunnel_whole_device", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool},
	{"device_region", isAnyString, requestParamOptional},
	{"egress_region", isRegionCode, requestParamOptional},
	{"session_id", isHexDigits, 0},
	{"last_connected", isLastConnected, requestParamOptional},
}
  1754. func (sshClient *sshClient) logBlocklistHits(remoteIP net.IP, tags []BlocklistTag) {
  1755. sshClient.Lock()
  1756. logFields := getRequestLogFields(
  1757. "server_blocklist_hit",
  1758. sshClient.geoIPData,
  1759. sshClient.handshakeState.authorizedAccessTypes,
  1760. sshClient.handshakeState.apiParams,
  1761. blocklistHitsStatParams)
  1762. logFields["session_id"] = sshClient.sessionID
  1763. // Note: see comment in logTunnel regarding unlock and concurrent access.
  1764. sshClient.Unlock()
  1765. for _, tag := range tags {
  1766. logFields["blocklist_ip_address"] = remoteIP.String()
  1767. logFields["blocklist_source"] = tag.Source
  1768. logFields["blocklist_subject"] = tag.Subject
  1769. log.LogRawFieldsWithTimestamp(logFields)
  1770. }
  1771. }
  1772. func (sshClient *sshClient) runOSLSender() {
  1773. for {
  1774. // Await a signal that there are SLOKs to send
  1775. // TODO: use reflect.SelectCase, and optionally await timer here?
  1776. select {
  1777. case <-sshClient.signalIssueSLOKs:
  1778. case <-sshClient.runCtx.Done():
  1779. return
  1780. }
  1781. retryDelay := SSH_SEND_OSL_INITIAL_RETRY_DELAY
  1782. for {
  1783. err := sshClient.sendOSLRequest()
  1784. if err == nil {
  1785. break
  1786. }
  1787. if !isExpectedTunnelIOError(err) {
  1788. log.WithContextFields(LogFields{"error": err}).Warning("sendOSLRequest failed")
  1789. }
  1790. // If the request failed, retry after a delay (with exponential backoff)
  1791. // or when signaled that there are additional SLOKs to send
  1792. retryTimer := time.NewTimer(retryDelay)
  1793. select {
  1794. case <-retryTimer.C:
  1795. case <-sshClient.signalIssueSLOKs:
  1796. case <-sshClient.runCtx.Done():
  1797. retryTimer.Stop()
  1798. return
  1799. }
  1800. retryTimer.Stop()
  1801. retryDelay *= SSH_SEND_OSL_RETRY_FACTOR
  1802. }
  1803. }
  1804. }
  1805. // sendOSLRequest will invoke osl.GetSeedPayload to issue SLOKs and
  1806. // generate a payload, and send an OSL request to the client when
  1807. // there are new SLOKs in the payload.
  1808. func (sshClient *sshClient) sendOSLRequest() error {
  1809. seedPayload := sshClient.getOSLSeedPayload()
  1810. // Don't send when no SLOKs. This will happen when signalIssueSLOKs
  1811. // is received but no new SLOKs are issued.
  1812. if len(seedPayload.SLOKs) == 0 {
  1813. return nil
  1814. }
  1815. oslRequest := protocol.OSLRequest{
  1816. SeedPayload: seedPayload,
  1817. }
  1818. requestPayload, err := json.Marshal(oslRequest)
  1819. if err != nil {
  1820. return common.ContextError(err)
  1821. }
  1822. ok, _, err := sshClient.sshConn.SendRequest(
  1823. protocol.PSIPHON_API_OSL_REQUEST_NAME,
  1824. true,
  1825. requestPayload)
  1826. if err != nil {
  1827. return common.ContextError(err)
  1828. }
  1829. if !ok {
  1830. return common.ContextError(errors.New("client rejected request"))
  1831. }
  1832. sshClient.clearOSLSeedPayload()
  1833. return nil
  1834. }
  1835. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessage string) {
  1836. // We always return the reject reason "Prohibited":
  1837. // - Traffic rules and connection limits may prohibit the connection.
  1838. // - External firewall rules may prohibit the connection, and this is not currently
  1839. // distinguishable from other failure modes.
  1840. // - We limit the failure information revealed to the client.
  1841. reason := ssh.Prohibited
  1842. // Note: Debug level, as logMessage may contain user traffic destination address information
  1843. log.WithContextFields(
  1844. LogFields{
  1845. "channelType": newChannel.ChannelType(),
  1846. "logMessage": logMessage,
  1847. "rejectReason": reason.String(),
  1848. }).Debug("reject new channel")
  1849. // Note: logMessage is internal, for logging only; just the reject reason is sent to the client.
  1850. newChannel.Reject(reason, reason.String())
  1851. }
  1852. // setHandshakeState records that a client has completed a handshake API request.
  1853. // Some parameters from the handshake request may be used in future traffic rule
  1854. // selection. Port forwards are disallowed until a handshake is complete. The
  1855. // handshake parameters are included in the session summary log recorded in
  1856. // sshClient.stop().
  1857. func (sshClient *sshClient) setHandshakeState(
  1858. state handshakeState,
  1859. authorizations []string) ([]string, []string, error) {
  1860. sshClient.Lock()
  1861. completed := sshClient.handshakeState.completed
  1862. if !completed {
  1863. sshClient.handshakeState = state
  1864. }
  1865. sshClient.Unlock()
  1866. // Client must only perform one handshake
  1867. if completed {
  1868. return nil, nil, common.ContextError(errors.New("handshake already completed"))
  1869. }
  1870. // Verify the authorizations submitted by the client. Verified, active
  1871. // (non-expired) access types will be available for traffic rules
  1872. // filtering.
  1873. //
  1874. // When an authorization is active but expires while the client is
  1875. // connected, the client is disconnected to ensure the access is reset.
  1876. // This is implemented by setting a timer to perform the disconnect at the
  1877. // expiry time of the soonest expiring authorization.
  1878. //
  1879. // sshServer.authorizationSessionIDs tracks the unique mapping of active
  1880. // authorization IDs to client session IDs and is used to detect and
  1881. // prevent multiple malicious clients from reusing a single authorization
  1882. // (within the scope of this server).
  1883. // authorizationIDs and authorizedAccessTypes are returned to the client
  1884. // and logged, respectively; initialize to empty lists so the
  1885. // protocol/logs don't need to handle 'null' values.
  1886. authorizationIDs := make([]string, 0)
  1887. authorizedAccessTypes := make([]string, 0)
  1888. var stopTime time.Time
  1889. for i, authorization := range authorizations {
  1890. // This sanity check mitigates malicious clients causing excess CPU use.
  1891. if i >= MAX_AUTHORIZATIONS {
  1892. log.WithContext().Warning("too many authorizations")
  1893. break
  1894. }
  1895. verifiedAuthorization, err := accesscontrol.VerifyAuthorization(
  1896. &sshClient.sshServer.support.Config.AccessControlVerificationKeyRing,
  1897. authorization)
  1898. if err != nil {
  1899. log.WithContextFields(
  1900. LogFields{"error": err}).Warning("verify authorization failed")
  1901. continue
  1902. }
  1903. authorizationID := base64.StdEncoding.EncodeToString(verifiedAuthorization.ID)
  1904. if common.Contains(authorizedAccessTypes, verifiedAuthorization.AccessType) {
  1905. log.WithContextFields(
  1906. LogFields{"accessType": verifiedAuthorization.AccessType}).Warning("duplicate authorization access type")
  1907. continue
  1908. }
  1909. authorizationIDs = append(authorizationIDs, authorizationID)
  1910. authorizedAccessTypes = append(authorizedAccessTypes, verifiedAuthorization.AccessType)
  1911. if stopTime.IsZero() || stopTime.After(verifiedAuthorization.Expires) {
  1912. stopTime = verifiedAuthorization.Expires
  1913. }
  1914. }
  1915. // Associate all verified authorizationIDs with this client's session ID.
  1916. // Handle cases where previous associations exist:
  1917. //
  1918. // - Multiple malicious clients reusing a single authorization. In this
  1919. // case, authorizations are revoked from the previous client.
  1920. //
  1921. // - The client reconnected with a new session ID due to user toggling.
  1922. // This case is expected due to server affinity. This cannot be
  1923. // distinguished from the previous case and the same action is taken;
  1924. // this will have no impact on a legitimate client as the previous
  1925. // session is dangling.
  1926. //
  1927. // - The client automatically reconnected with the same session ID. This
  1928. // case is not expected as sshServer.registerEstablishedClient
  1929. // synchronously calls sshClient.releaseAuthorizations; as a safe guard,
  1930. // this case is distinguished and no revocation action is taken.
  1931. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  1932. for _, authorizationID := range authorizationIDs {
  1933. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  1934. if ok && sessionID != sshClient.sessionID {
  1935. log.WithContextFields(
  1936. LogFields{"authorizationID": authorizationID}).Warning("duplicate active authorization")
  1937. // Invoke asynchronously to avoid deadlocks.
  1938. // TODO: invoke only once for each distinct sessionID?
  1939. go sshClient.sshServer.revokeClientAuthorizations(sessionID)
  1940. }
  1941. sshClient.sshServer.authorizationSessionIDs[authorizationID] = sshClient.sessionID
  1942. }
  1943. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  1944. if len(authorizationIDs) > 0 {
  1945. sshClient.Lock()
  1946. // Make the authorizedAccessTypes available for traffic rules filtering.
  1947. sshClient.handshakeState.authorizedAccessTypes = authorizedAccessTypes
  1948. // On exit, sshClient.runTunnel will call releaseAuthorizations, which
  1949. // will release the authorization IDs so the client can reconnect and
  1950. // present the same authorizations again. sshClient.runTunnel will
  1951. // also cancel the stopTimer in case it has not yet fired.
  1952. // Note: termination of the stopTimer goroutine is not synchronized.
  1953. sshClient.releaseAuthorizations = func() {
  1954. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  1955. for _, authorizationID := range authorizationIDs {
  1956. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  1957. if ok && sessionID == sshClient.sessionID {
  1958. delete(sshClient.sshServer.authorizationSessionIDs, authorizationID)
  1959. }
  1960. }
  1961. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  1962. }
  1963. sshClient.stopTimer = time.AfterFunc(
  1964. stopTime.Sub(time.Now()),
  1965. func() {
  1966. sshClient.stop()
  1967. })
  1968. sshClient.Unlock()
  1969. }
  1970. sshClient.setTrafficRules()
  1971. sshClient.setOSLConfig()
  1972. return authorizationIDs, authorizedAccessTypes, nil
  1973. }
  1974. // getHandshaked returns whether the client has completed a handshake API
  1975. // request and whether the traffic rules that were selected after the
  1976. // handshake immediately exhaust the client.
  1977. //
  1978. // When the client is immediately exhausted it will be closed; but this
  1979. // takes effect asynchronously. The "exhausted" return value is used to
  1980. // prevent API requests by clients that will close.
  1981. func (sshClient *sshClient) getHandshaked() (bool, bool) {
  1982. sshClient.Lock()
  1983. defer sshClient.Unlock()
  1984. completed := sshClient.handshakeState.completed
  1985. exhausted := false
  1986. // Notes:
  1987. // - "Immediately exhausted" is when CloseAfterExhausted is set and
  1988. // either ReadUnthrottledBytes or WriteUnthrottledBytes starts from
  1989. // 0, so no bytes would be read or written. This check does not
  1990. // examine whether 0 bytes _remain_ in the ThrottledConn.
  1991. // - This check is made against the current traffic rules, which
  1992. // could have changed in a hot reload since the handshake.
  1993. if completed &&
  1994. *sshClient.trafficRules.RateLimits.CloseAfterExhausted == true &&
  1995. (*sshClient.trafficRules.RateLimits.ReadUnthrottledBytes == 0 ||
  1996. *sshClient.trafficRules.RateLimits.WriteUnthrottledBytes == 0) {
  1997. exhausted = true
  1998. }
  1999. return completed, exhausted
  2000. }
  2001. func (sshClient *sshClient) updateAPIParameters(
  2002. apiParams common.APIParameters) {
  2003. sshClient.Lock()
  2004. defer sshClient.Unlock()
  2005. // Only update after handshake has initialized API params.
  2006. if !sshClient.handshakeState.completed {
  2007. return
  2008. }
  2009. for name, value := range apiParams {
  2010. sshClient.handshakeState.apiParams[name] = value
  2011. }
  2012. }
  2013. func (sshClient *sshClient) expectDomainBytes() bool {
  2014. sshClient.Lock()
  2015. defer sshClient.Unlock()
  2016. return sshClient.handshakeState.expectDomainBytes
  2017. }
  2018. // setTrafficRules resets the client's traffic rules based on the latest server config
  2019. // and client properties. As sshClient.trafficRules may be reset by a concurrent
  2020. // goroutine, trafficRules must only be accessed within the sshClient mutex.
  2021. func (sshClient *sshClient) setTrafficRules() {
  2022. sshClient.Lock()
  2023. defer sshClient.Unlock()
  2024. sshClient.trafficRules = sshClient.sshServer.support.TrafficRulesSet.GetTrafficRules(
  2025. sshClient.isFirstTunnelInSession,
  2026. sshClient.tunnelProtocol,
  2027. sshClient.geoIPData,
  2028. sshClient.handshakeState)
  2029. if sshClient.throttledConn != nil {
  2030. // Any existing throttling state is reset.
  2031. sshClient.throttledConn.SetLimits(
  2032. sshClient.trafficRules.RateLimits.CommonRateLimits())
  2033. }
  2034. }
  2035. // setOSLConfig resets the client's OSL seed state based on the latest OSL config
  2036. // As sshClient.oslClientSeedState may be reset by a concurrent goroutine,
  2037. // oslClientSeedState must only be accessed within the sshClient mutex.
  2038. func (sshClient *sshClient) setOSLConfig() {
  2039. sshClient.Lock()
  2040. defer sshClient.Unlock()
  2041. propagationChannelID, err := getStringRequestParam(
  2042. sshClient.handshakeState.apiParams, "propagation_channel_id")
  2043. if err != nil {
  2044. // This should not fail as long as client has sent valid handshake
  2045. return
  2046. }
  2047. // Use a cached seed state if one is found for the client's
  2048. // session ID. This enables resuming progress made in a previous
  2049. // tunnel.
  2050. // Note: go-cache is already concurency safe; the additional mutex
  2051. // is necessary to guarantee that Get/Delete is atomic; although in
  2052. // practice no two concurrent clients should ever supply the same
  2053. // session ID.
  2054. sshClient.sshServer.oslSessionCacheMutex.Lock()
  2055. oslClientSeedState, found := sshClient.sshServer.oslSessionCache.Get(sshClient.sessionID)
  2056. if found {
  2057. sshClient.sshServer.oslSessionCache.Delete(sshClient.sessionID)
  2058. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  2059. sshClient.oslClientSeedState = oslClientSeedState.(*osl.ClientSeedState)
  2060. sshClient.oslClientSeedState.Resume(sshClient.signalIssueSLOKs)
  2061. return
  2062. }
  2063. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  2064. // Two limitations when setOSLConfig() is invoked due to an
  2065. // OSL config hot reload:
  2066. //
  2067. // 1. any partial progress towards SLOKs is lost.
  2068. //
  2069. // 2. all existing osl.ClientSeedPortForwards for existing
  2070. // port forwards will not send progress to the new client
  2071. // seed state.
  2072. sshClient.oslClientSeedState = sshClient.sshServer.support.OSLConfig.NewClientSeedState(
  2073. sshClient.geoIPData.Country,
  2074. propagationChannelID,
  2075. sshClient.signalIssueSLOKs)
  2076. }
  2077. // newClientSeedPortForward will return nil when no seeding is
  2078. // associated with the specified ipAddress.
  2079. func (sshClient *sshClient) newClientSeedPortForward(ipAddress net.IP) *osl.ClientSeedPortForward {
  2080. sshClient.Lock()
  2081. defer sshClient.Unlock()
  2082. // Will not be initialized before handshake.
  2083. if sshClient.oslClientSeedState == nil {
  2084. return nil
  2085. }
  2086. return sshClient.oslClientSeedState.NewClientSeedPortForward(ipAddress)
  2087. }
  2088. // getOSLSeedPayload returns a payload containing all seeded SLOKs for
  2089. // this client's session.
  2090. func (sshClient *sshClient) getOSLSeedPayload() *osl.SeedPayload {
  2091. sshClient.Lock()
  2092. defer sshClient.Unlock()
  2093. // Will not be initialized before handshake.
  2094. if sshClient.oslClientSeedState == nil {
  2095. return &osl.SeedPayload{SLOKs: make([]*osl.SLOK, 0)}
  2096. }
  2097. return sshClient.oslClientSeedState.GetSeedPayload()
  2098. }
  2099. func (sshClient *sshClient) clearOSLSeedPayload() {
  2100. sshClient.Lock()
  2101. defer sshClient.Unlock()
  2102. sshClient.oslClientSeedState.ClearSeedPayload()
  2103. }
  2104. func (sshClient *sshClient) rateLimits() common.RateLimits {
  2105. sshClient.Lock()
  2106. defer sshClient.Unlock()
  2107. return sshClient.trafficRules.RateLimits.CommonRateLimits()
  2108. }
  2109. func (sshClient *sshClient) idleTCPPortForwardTimeout() time.Duration {
  2110. sshClient.Lock()
  2111. defer sshClient.Unlock()
  2112. return time.Duration(*sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds) * time.Millisecond
  2113. }
  2114. func (sshClient *sshClient) idleUDPPortForwardTimeout() time.Duration {
  2115. sshClient.Lock()
  2116. defer sshClient.Unlock()
  2117. return time.Duration(*sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds) * time.Millisecond
  2118. }
  2119. func (sshClient *sshClient) setTCPPortForwardDialingAvailableSignal(signal context.CancelFunc) {
  2120. sshClient.Lock()
  2121. defer sshClient.Unlock()
  2122. sshClient.tcpPortForwardDialingAvailableSignal = signal
  2123. }
  2124. const (
  2125. portForwardTypeTCP = iota
  2126. portForwardTypeUDP
  2127. )
  2128. func (sshClient *sshClient) isPortForwardPermitted(
  2129. portForwardType int,
  2130. remoteIP net.IP,
  2131. port int) bool {
  2132. // Disallow connection to loopback. This is a failsafe. The server
  2133. // should be run on a host with correctly configured firewall rules.
  2134. if remoteIP.IsLoopback() {
  2135. return false
  2136. }
  2137. // Blocklist check.
  2138. //
  2139. // Limitation: isPortForwardPermitted is not called in transparent DNS
  2140. // forwarding cases. As the destination IP address is rewritten in these
  2141. // cases, a blocklist entry won't be dialed in any case. However, no logs
  2142. // will be recorded.
  2143. tags := sshClient.sshServer.support.Blocklist.Lookup(remoteIP)
  2144. if len(tags) > 0 {
  2145. sshClient.logBlocklistHits(remoteIP, tags)
  2146. if sshClient.sshServer.support.Config.BlocklistActive {
  2147. return false
  2148. }
  2149. }
  2150. // Don't lock before calling logBlocklistHits.
  2151. sshClient.Lock()
  2152. defer sshClient.Unlock()
  2153. // Client must complete handshake before port forwards are permitted.
  2154. if !sshClient.handshakeState.completed {
  2155. return false
  2156. }
  2157. // Traffic rules checks.
  2158. switch portForwardType {
  2159. case portForwardTypeTCP:
  2160. if sshClient.trafficRules.AllowTCPPort(remoteIP, port) {
  2161. return true
  2162. }
  2163. case portForwardTypeUDP:
  2164. if sshClient.trafficRules.AllowUDPPort(remoteIP, port) {
  2165. return true
  2166. }
  2167. }
  2168. log.WithContextFields(
  2169. LogFields{
  2170. "type": portForwardType,
  2171. "port": port,
  2172. }).Debug("port forward denied by traffic rules")
  2173. return false
  2174. }
  2175. func (sshClient *sshClient) isTCPDialingPortForwardLimitExceeded() bool {
  2176. sshClient.Lock()
  2177. defer sshClient.Unlock()
  2178. state := &sshClient.tcpTrafficState
  2179. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  2180. if max > 0 && state.concurrentDialingPortForwardCount >= int64(max) {
  2181. return true
  2182. }
  2183. return false
  2184. }
  2185. func (sshClient *sshClient) getTCPPortForwardQueueSize() int {
  2186. sshClient.Lock()
  2187. defer sshClient.Unlock()
  2188. return *sshClient.trafficRules.MaxTCPPortForwardCount +
  2189. *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  2190. }
  2191. func (sshClient *sshClient) getDialTCPPortForwardTimeoutMilliseconds() int {
  2192. sshClient.Lock()
  2193. defer sshClient.Unlock()
  2194. return *sshClient.trafficRules.DialTCPPortForwardTimeoutMilliseconds
  2195. }
  2196. func (sshClient *sshClient) dialingTCPPortForward() {
  2197. sshClient.Lock()
  2198. defer sshClient.Unlock()
  2199. state := &sshClient.tcpTrafficState
  2200. state.concurrentDialingPortForwardCount += 1
  2201. if state.concurrentDialingPortForwardCount > state.peakConcurrentDialingPortForwardCount {
  2202. state.peakConcurrentDialingPortForwardCount = state.concurrentDialingPortForwardCount
  2203. }
  2204. }
  2205. func (sshClient *sshClient) abortedTCPPortForward() {
  2206. sshClient.Lock()
  2207. defer sshClient.Unlock()
  2208. sshClient.tcpTrafficState.concurrentDialingPortForwardCount -= 1
  2209. }
  2210. func (sshClient *sshClient) allocatePortForward(portForwardType int) bool {
  2211. sshClient.Lock()
  2212. defer sshClient.Unlock()
  2213. // Check if at port forward limit. The subsequent counter
  2214. // changes must be atomic with the limit check to ensure
  2215. // the counter never exceeds the limit in the case of
  2216. // concurrent allocations.
  2217. var max int
  2218. var state *trafficState
  2219. if portForwardType == portForwardTypeTCP {
  2220. max = *sshClient.trafficRules.MaxTCPPortForwardCount
  2221. state = &sshClient.tcpTrafficState
  2222. } else {
  2223. max = *sshClient.trafficRules.MaxUDPPortForwardCount
  2224. state = &sshClient.udpTrafficState
  2225. }
  2226. if max > 0 && state.concurrentPortForwardCount >= int64(max) {
  2227. return false
  2228. }
  2229. // Update port forward counters.
  2230. if portForwardType == portForwardTypeTCP {
  2231. // Assumes TCP port forwards called dialingTCPPortForward
  2232. state.concurrentDialingPortForwardCount -= 1
  2233. if sshClient.tcpPortForwardDialingAvailableSignal != nil {
  2234. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  2235. if max <= 0 || state.concurrentDialingPortForwardCount < int64(max) {
  2236. sshClient.tcpPortForwardDialingAvailableSignal()
  2237. }
  2238. }
  2239. }
  2240. state.concurrentPortForwardCount += 1
  2241. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  2242. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  2243. }
  2244. state.totalPortForwardCount += 1
  2245. return true
  2246. }
  2247. // establishedPortForward increments the concurrent port
  2248. // forward counter. closedPortForward decrements it, so it
  2249. // must always be called for each establishedPortForward
  2250. // call.
  2251. //
  2252. // When at the limit of established port forwards, the LRU
  2253. // existing port forward is closed to make way for the newly
  2254. // established one. There can be a minor delay as, in addition
  2255. // to calling Close() on the port forward net.Conn,
  2256. // establishedPortForward waits for the LRU's closedPortForward()
  2257. // call which will decrement the concurrent counter. This
  2258. // ensures all resources associated with the LRU (socket,
  2259. // goroutine) are released or will very soon be released before
  2260. // proceeding.
  2261. func (sshClient *sshClient) establishedPortForward(
  2262. portForwardType int, portForwardLRU *common.LRUConns) {
  2263. // Do not lock sshClient here.
  2264. var state *trafficState
  2265. if portForwardType == portForwardTypeTCP {
  2266. state = &sshClient.tcpTrafficState
  2267. } else {
  2268. state = &sshClient.udpTrafficState
  2269. }
  2270. // When the maximum number of port forwards is already
  2271. // established, close the LRU. CloseOldest will call
  2272. // Close on the port forward net.Conn. Both TCP and
  2273. // UDP port forwards have handler goroutines that may
  2274. // be blocked calling Read on the net.Conn. Close will
  2275. // eventually interrupt the Read and cause the handlers
  2276. // to exit, but not immediately. So the following logic
  2277. // waits for a LRU handler to be interrupted and signal
  2278. // availability.
  2279. //
  2280. // Notes:
  2281. //
  2282. // - the port forward limit can change via a traffic
  2283. // rules hot reload; the condition variable handles
  2284. // this case whereas a channel-based semaphore would
  2285. // not.
  2286. //
  2287. // - if a number of goroutines exceeding the total limit
  2288. // arrive here all concurrently, some CloseOldest() calls
  2289. // will have no effect as there can be less existing port
  2290. // forwards than new ones. In this case, the new port
  2291. // forward will be delayed. This is highly unlikely in
  2292. // practise since UDP calls to establishedPortForward are
  2293. // serialized and TCP calls are limited by the dial
  2294. // queue/count.
  2295. if !sshClient.allocatePortForward(portForwardType) {
  2296. portForwardLRU.CloseOldest()
  2297. log.WithContext().Debug("closed LRU port forward")
  2298. state.availablePortForwardCond.L.Lock()
  2299. for !sshClient.allocatePortForward(portForwardType) {
  2300. state.availablePortForwardCond.Wait()
  2301. }
  2302. state.availablePortForwardCond.L.Unlock()
  2303. }
  2304. }
  2305. func (sshClient *sshClient) closedPortForward(
  2306. portForwardType int, bytesUp, bytesDown int64) {
  2307. sshClient.Lock()
  2308. var state *trafficState
  2309. if portForwardType == portForwardTypeTCP {
  2310. state = &sshClient.tcpTrafficState
  2311. } else {
  2312. state = &sshClient.udpTrafficState
  2313. }
  2314. state.concurrentPortForwardCount -= 1
  2315. state.bytesUp += bytesUp
  2316. state.bytesDown += bytesDown
  2317. sshClient.Unlock()
  2318. // Signal any goroutine waiting in establishedPortForward
  2319. // that an established port forward slot is available.
  2320. state.availablePortForwardCond.Signal()
  2321. }
  2322. func (sshClient *sshClient) updateQualityMetricsWithDialResult(
  2323. tcpPortForwardDialSuccess bool, dialDuration time.Duration) {
  2324. sshClient.Lock()
  2325. defer sshClient.Unlock()
  2326. if tcpPortForwardDialSuccess {
  2327. sshClient.qualityMetrics.tcpPortForwardDialedCount += 1
  2328. sshClient.qualityMetrics.tcpPortForwardDialedDuration += dialDuration
  2329. } else {
  2330. sshClient.qualityMetrics.tcpPortForwardFailedCount += 1
  2331. sshClient.qualityMetrics.tcpPortForwardFailedDuration += dialDuration
  2332. }
  2333. }
  2334. func (sshClient *sshClient) updateQualityMetricsWithRejectedDialingLimit() {
  2335. sshClient.Lock()
  2336. defer sshClient.Unlock()
  2337. sshClient.qualityMetrics.tcpPortForwardRejectedDialingLimitCount += 1
  2338. }
// handleTCPChannel services a single TCP port forward channel request. It:
//  1. resolves hostToConnect and picks the first IPv4 address;
//  2. applies isPortForwardPermitted to that address, except for web server
//     port forwards;
//  3. dials the address;
//  4. accepts the SSH channel;
//  5. relays data in both directions until either side finishes.
//
// Parameters:
//   - remainingDialTimeout: the total time budget shared by name resolution
//     and the TCP dial. Resolution time is subtracted before dialing.
//   - hostToConnect, portToConnect: the destination requested by the client.
//     When they equal config.WebServerPortForwardAddress, they are rewritten
//     to config.WebServerPortForwardRedirectAddress, if one is configured.
//   - newChannel: the pending SSH channel. It is rejected on any failure up
//     to and including the dial.
//
// Slot accounting:
//   - Until establishedPortForward returns, the deferred abortedTCPPortForward
//     releases the dialing slot taken by the caller.
//   - After that, the deferred closedPortForward releases the established
//     slot and records the relayed byte counts.
//
// Deferred calls run in reverse order on exit:
//  1. lruEntry.Remove
//  2. closedPortForward
//  3. fwdChannel.Close
//  4. fwdConn.Close, on the originally dialed conn
//  5. the abortedTCPPortForward check
func (sshClient *sshClient) handleTCPChannel(
	remainingDialTimeout time.Duration,
	hostToConnect string,
	portToConnect int,
	newChannel ssh.NewChannel) {

	// Assumptions:
	// - sshClient.dialingTCPPortForward() has been called
	// - remainingDialTimeout > 0

	established := false
	defer func() {
		if !established {
			sshClient.abortedTCPPortForward()
		}
	}()

	// Transparently redirect web API request connections.
	//
	// Web server port forwards also bypass the isPortForwardPermitted check
	// further below.

	isWebServerPortForward := false
	config := sshClient.sshServer.support.Config
	if config.WebServerPortForwardAddress != "" {
		destination := net.JoinHostPort(hostToConnect, strconv.Itoa(portToConnect))
		if destination == config.WebServerPortForwardAddress {
			isWebServerPortForward = true
			if config.WebServerPortForwardRedirectAddress != "" {
				// Note: redirect format is validated when config is loaded
				host, portStr, _ := net.SplitHostPort(config.WebServerPortForwardRedirectAddress)
				port, _ := strconv.Atoi(portStr)
				hostToConnect = host
				portToConnect = port
			}
		}
	}

	// Dial the remote address.
	//
	// Hostname resolution is performed explicitly, as a separate step, as the target IP
	// address is used for traffic rules (AllowSubnets) and OSL seed progress.
	//
	// Contexts are used for cancellation (via sshClient.runCtx, which is cancelled
	// when the client is stopping) and timeouts.

	dialStartTime := monotime.Now()

	log.WithContextFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")

	ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
	IPs, err := (&net.Resolver{}).LookupIPAddr(ctx, hostToConnect)
	cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"

	// TODO: shuffle list to try other IPs?
	// TODO: IPv6 support
	//
	// Only the first IPv4 address in the resolver result is used. A lookup
	// that yields no IPv4 address is treated as a resolution failure.
	var IP net.IP
	for _, ip := range IPs {
		if ip.IP.To4() != nil {
			IP = ip.IP
			break
		}
	}
	if err == nil && IP == nil {
		err = errors.New("no IP address")
	}

	resolveElapsedTime := monotime.Since(dialStartTime)

	if err != nil {

		// Record a port forward failure
		sshClient.updateQualityMetricsWithDialResult(false, resolveElapsedTime)

		sshClient.rejectNewChannel(newChannel, fmt.Sprintf("LookupIP failed: %s", err))
		return
	}

	// Deduct the resolution time from the shared dial budget.
	//
	// NOTE(review): unlike the resolution failure above, this timeout path
	// records no quality metric. Confirm this is intended.
	remainingDialTimeout -= resolveElapsedTime

	if remainingDialTimeout <= 0 {
		sshClient.rejectNewChannel(newChannel, "TCP port forward timed out resolving")
		return
	}

	// Enforce traffic rules, using the resolved IP address.

	if !isWebServerPortForward &&
		!sshClient.isPortForwardPermitted(
			portForwardTypeTCP,
			IP,
			portToConnect) {

		// Note: not recording a port forward failure in this case

		sshClient.rejectNewChannel(newChannel, "port forward not permitted")
		return
	}

	// TCP dial.

	remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))

	log.WithContextFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")

	ctx, cancelCtx = context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
	fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)
	cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"

	// Record port forward success or failure.
	// The recorded duration is measured from dialStartTime, so it includes
	// name resolution time.
	sshClient.updateQualityMetricsWithDialResult(err == nil, monotime.Since(dialStartTime))

	if err != nil {

		// Monitor for low resource error conditions
		sshClient.sshServer.monitorPortForwardDialError(err)

		sshClient.rejectNewChannel(newChannel, fmt.Sprintf("DialTimeout failed: %s", err))
		return
	}

	// The upstream TCP port forward connection has been established. Schedule
	// some cleanup and notify the SSH client that the channel is accepted.
	//
	// The deferred method value binds the current fwdConn (the raw dialed
	// conn) now. The later reassignment of fwdConn to the activity-monitored
	// wrapper does not change which conn this defer closes.
	defer fwdConn.Close()

	fwdChannel, requests, err := newChannel.Accept()
	if err != nil {
		if !isExpectedTunnelIOError(err) {
			log.WithContextFields(LogFields{"error": err}).Warning("accept new channel failed")
		}
		return
	}
	go ssh.DiscardRequests(requests)
	defer fwdChannel.Close()

	// Release the dialing slot and acquire an established slot.
	//
	// establishedPortForward increments the concurrent TCP port
	// forward counter and closes the LRU existing TCP port forward
	// when already at the limit.
	//
	// Known limitations:
	//
	// - Closed LRU TCP sockets will enter the TIME_WAIT state,
	//   continuing to consume some resources.

	sshClient.establishedPortForward(portForwardTypeTCP, sshClient.tcpPortForwardLRU)

	// "established = true" cancels the deferred abortedTCPPortForward()
	established = true

	// TODO: 64-bit alignment? https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	//
	// bytesDown is written by the relay goroutine below. Both counters are
	// accessed atomically.
	var bytesUp, bytesDown int64
	defer func() {
		sshClient.closedPortForward(
			portForwardTypeTCP, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))
	}()

	lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
	defer lruEntry.Remove()

	// ActivityMonitoredConn monitors the TCP port forward I/O and updates
	// its LRU status. ActivityMonitoredConn also times out I/O on the port
	// forward if both reads and writes have been idle for the specified
	// duration.

	// Ensure nil interface if newClientSeedPortForward returns nil
	var updater common.ActivityUpdater
	seedUpdater := sshClient.newClientSeedPortForward(IP)
	if seedUpdater != nil {
		updater = seedUpdater
	}

	fwdConn, err = common.NewActivityMonitoredConn(
		fwdConn,
		sshClient.idleTCPPortForwardTimeout(),
		true,
		updater,
		lruEntry)
	if err != nil {
		log.WithContextFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
		return
	}

	// Relay channel to forwarded connection.

	log.WithContextFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")

	// TODO: relay errors to fwdChannel.Stderr()?
	relayWaitGroup := new(sync.WaitGroup)
	relayWaitGroup.Add(1)
	go func() {
		defer relayWaitGroup.Done()

		// io.Copy allocates a 32K temporary buffer, and each port forward relay uses
		// two of these buffers; using io.CopyBuffer with a smaller buffer reduces the
		// overall memory footprint.
		//
		// NOTE(review): io.CopyBuffer returns nil, not io.EOF, on a clean
		// EOF, so the io.EOF comparison below is redundant but harmless.
		bytes, err := io.CopyBuffer(
			fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
		atomic.AddInt64(&bytesDown, bytes)
		if err != nil && err != io.EOF {
			// Debug since errors such as "connection reset by peer" occur during normal operation
			log.WithContextFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
		}
		// Interrupt upstream io.Copy when downstream is shutting down.
		// TODO: this is done to quickly cleanup the port forward when
		// fwdConn has a read timeout, but is it clean -- upstream may still
		// be flowing?
		fwdChannel.Close()
	}()
	bytes, err := io.CopyBuffer(
		fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
	atomic.AddInt64(&bytesUp, bytes)
	if err != nil && err != io.EOF {
		log.WithContextFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
	}
	// Shutdown special case: fwdChannel will be closed and return EOF when
	// the SSH connection is closed, but we need to explicitly close fwdConn
	// to interrupt the downstream io.Copy, which may be blocked on a
	// fwdConn.Read().
	//
	// Here fwdConn is the activity-monitored wrapper assigned above.
	fwdConn.Close()

	// Wait for the downstream relay so both byte counters are final before
	// they are logged and passed to closedPortForward.
	relayWaitGroup.Wait()

	log.WithContextFields(
		LogFields{
			"remoteAddr": remoteAddr,
			"bytesUp":    atomic.LoadInt64(&bytesUp),
			"bytesDown":  atomic.LoadInt64(&bytesDown)}).Debug("exiting")
}