tunnelServer.go 127 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "context"
  22. "crypto/rand"
  23. "crypto/subtle"
  24. "encoding/base64"
  25. "encoding/json"
  26. std_errors "errors"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "net"
  31. "strconv"
  32. "sync"
  33. "sync/atomic"
  34. "syscall"
  35. "time"
  36. "github.com/Psiphon-Labs/goarista/monotime"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/marionette"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  51. "github.com/marusama/semaphore"
  52. cache "github.com/patrickmn/go-cache"
  53. )
  54. const (
  55. SSH_AUTH_LOG_PERIOD = 30 * time.Minute
  56. SSH_HANDSHAKE_TIMEOUT = 30 * time.Second
  57. SSH_BEGIN_HANDSHAKE_TIMEOUT = 1 * time.Second
  58. SSH_CONNECTION_READ_DEADLINE = 5 * time.Minute
  59. SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE = 8192
  60. SSH_TCP_PORT_FORWARD_QUEUE_SIZE = 1024
  61. SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES = 0
  62. SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES = 256
  63. SSH_SEND_OSL_INITIAL_RETRY_DELAY = 30 * time.Second
  64. SSH_SEND_OSL_RETRY_FACTOR = 2
  65. OSL_SESSION_CACHE_TTL = 5 * time.Minute
  66. MAX_AUTHORIZATIONS = 16
  67. PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
  68. RANDOM_STREAM_MAX_BYTES = 10485760
  69. ALERT_REQUEST_QUEUE_BUFFER_SIZE = 16
  70. )
  71. // TunnelServer is the main server that accepts Psiphon client
  72. // connections, via various obfuscation protocols, and provides
  73. // port forwarding (TCP and UDP) services to the Psiphon client.
  74. // At its core, TunnelServer is an SSH server. SSH is the base
  75. // protocol that provides port forward multiplexing, and transport
  76. // security. Layered on top of SSH, optionally, is Obfuscated SSH
  77. // and meek protocols, which provide further circumvention
  78. // capabilities.
  79. type TunnelServer struct {
  80. runWaitGroup *sync.WaitGroup
  81. listenerError chan error
  82. shutdownBroadcast <-chan struct{}
  83. sshServer *sshServer
  84. }
  85. type sshListener struct {
  86. net.Listener
  87. localAddress string
  88. tunnelProtocol string
  89. port int
  90. BPFProgramName string
  91. }
  92. // NewTunnelServer initializes a new tunnel server.
  93. func NewTunnelServer(
  94. support *SupportServices,
  95. shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {
  96. sshServer, err := newSSHServer(support, shutdownBroadcast)
  97. if err != nil {
  98. return nil, errors.Trace(err)
  99. }
  100. return &TunnelServer{
  101. runWaitGroup: new(sync.WaitGroup),
  102. listenerError: make(chan error),
  103. shutdownBroadcast: shutdownBroadcast,
  104. sshServer: sshServer,
  105. }, nil
  106. }
  107. // Run runs the tunnel server; this function blocks while running a selection of
  108. // listeners that handle connection using various obfuscation protocols.
  109. //
  110. // Run listens on each designated tunnel port and spawns new goroutines to handle
  111. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  112. // clients is maintained, and when halting all clients are cleanly shutdown.
  113. //
  114. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  115. // authentication, and then looping on client new channel requests. "direct-tcpip"
  116. // channels, dynamic port fowards, are supported. When the UDPInterceptUdpgwServerAddress
  117. // config parameter is configured, UDP port forwards over a TCP stream, following
  118. // the udpgw protocol, are handled.
  119. //
  120. // A new goroutine is spawned to handle each port forward for each client. Each port
  121. // forward tracks its bytes transferred. Overall per-client stats for connection duration,
  122. // GeoIP, number of port forwards, and bytes transferred are tracked and logged when the
  123. // client shuts down.
  124. //
  125. // Note: client handler goroutines may still be shutting down after Run() returns. See
  126. // comment in sshClient.stop(). TODO: fully synchronized shutdown.
  127. func (server *TunnelServer) Run() error {
  128. // TODO: should TunnelServer hold its own support pointer?
  129. support := server.sshServer.support
  130. // First bind all listeners; once all are successful,
  131. // start accepting connections on each.
  132. var listeners []*sshListener
  133. for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
  134. localAddress := fmt.Sprintf(
  135. "%s:%d", support.Config.ServerIPAddress, listenPort)
  136. var listener net.Listener
  137. var BPFProgramName string
  138. var err error
  139. if protocol.TunnelProtocolUsesFrontedMeekQUIC(tunnelProtocol) {
  140. // For FRONTED-MEEK-QUIC-OSSH, no listener implemented. The edge-to-server
  141. // hop uses HTTPS and the client tunnel protocol is distinguished using
  142. // protocol.MeekCookieData.ClientTunnelProtocol.
  143. continue
  144. } else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {
  145. listener, err = quic.Listen(
  146. CommonLogger(log),
  147. localAddress,
  148. support.Config.ObfuscatedSSHKey)
  149. } else if protocol.TunnelProtocolUsesMarionette(tunnelProtocol) {
  150. listener, err = marionette.Listen(
  151. support.Config.ServerIPAddress,
  152. support.Config.MarionetteFormat)
  153. } else if protocol.TunnelProtocolUsesRefractionNetworking(tunnelProtocol) {
  154. listener, err = refraction.Listen(localAddress)
  155. } else if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
  156. listener, err = net.Listen("tcp", localAddress)
  157. } else {
  158. // Only direct, unfronted protocol listeners use TCP BPF circumvention
  159. // programs.
  160. listener, BPFProgramName, err = newTCPListenerWithBPF(support, localAddress)
  161. }
  162. if err != nil {
  163. for _, existingListener := range listeners {
  164. existingListener.Listener.Close()
  165. }
  166. return errors.Trace(err)
  167. }
  168. tacticsListener := NewTacticsListener(
  169. support,
  170. listener,
  171. tunnelProtocol,
  172. func(IP string) GeoIPData { return support.GeoIPService.Lookup(IP, false) })
  173. log.WithTraceFields(
  174. LogFields{
  175. "localAddress": localAddress,
  176. "tunnelProtocol": tunnelProtocol,
  177. "BPFProgramName": BPFProgramName,
  178. }).Info("listening")
  179. listeners = append(
  180. listeners,
  181. &sshListener{
  182. Listener: tacticsListener,
  183. localAddress: localAddress,
  184. port: listenPort,
  185. tunnelProtocol: tunnelProtocol,
  186. BPFProgramName: BPFProgramName,
  187. })
  188. }
  189. for _, listener := range listeners {
  190. server.runWaitGroup.Add(1)
  191. go func(listener *sshListener) {
  192. defer server.runWaitGroup.Done()
  193. log.WithTraceFields(
  194. LogFields{
  195. "localAddress": listener.localAddress,
  196. "tunnelProtocol": listener.tunnelProtocol,
  197. }).Info("running")
  198. server.sshServer.runListener(
  199. listener,
  200. server.listenerError)
  201. log.WithTraceFields(
  202. LogFields{
  203. "localAddress": listener.localAddress,
  204. "tunnelProtocol": listener.tunnelProtocol,
  205. }).Info("stopped")
  206. }(listener)
  207. }
  208. var err error
  209. select {
  210. case <-server.shutdownBroadcast:
  211. case err = <-server.listenerError:
  212. }
  213. for _, listener := range listeners {
  214. listener.Close()
  215. }
  216. server.sshServer.stopClients()
  217. server.runWaitGroup.Wait()
  218. log.WithTrace().Info("stopped")
  219. return err
  220. }
  221. // GetLoadStats returns load stats for the tunnel server. The stats are
  222. // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
  223. // include current connected client count, total number of current port
  224. // forwards.
  225. func (server *TunnelServer) GetLoadStats() (ProtocolStats, RegionStats) {
  226. return server.sshServer.getLoadStats()
  227. }
  228. // GetEstablishedClientCount returns the number of currently established
  229. // clients.
  230. func (server *TunnelServer) GetEstablishedClientCount() int {
  231. return server.sshServer.getEstablishedClientCount()
  232. }
  233. // ResetAllClientTrafficRules resets all established client traffic rules
  234. // to use the latest config and client properties. Any existing traffic
  235. // rule state is lost, including throttling state.
  236. func (server *TunnelServer) ResetAllClientTrafficRules() {
  237. server.sshServer.resetAllClientTrafficRules()
  238. }
  239. // ResetAllClientOSLConfigs resets all established client OSL state to use
  240. // the latest OSL config. Any existing OSL state is lost, including partial
  241. // progress towards SLOKs.
  242. func (server *TunnelServer) ResetAllClientOSLConfigs() {
  243. server.sshServer.resetAllClientOSLConfigs()
  244. }
  245. // SetClientHandshakeState sets the handshake state -- that it completed and
  246. // what parameters were passed -- in sshClient. This state is used for allowing
  247. // port forwards and for future traffic rule selection. SetClientHandshakeState
  248. // also triggers an immediate traffic rule re-selection, as the rules selected
  249. // upon tunnel establishment may no longer apply now that handshake values are
  250. // set.
  251. //
  252. // The authorizations received from the client handshake are verified and the
  253. // resulting list of authorized access types are applied to the client's tunnel
  254. // and traffic rules.
  255. //
  256. // A list of active authorization IDs, authorized access types, and traffic
  257. // rate limits are returned for responding to the client and logging.
  258. func (server *TunnelServer) SetClientHandshakeState(
  259. sessionID string,
  260. state handshakeState,
  261. authorizations []string) (*handshakeStateInfo, error) {
  262. return server.sshServer.setClientHandshakeState(sessionID, state, authorizations)
  263. }
  264. // GetClientHandshaked indicates whether the client has completed a handshake
  265. // and whether its traffic rules are immediately exhausted.
  266. func (server *TunnelServer) GetClientHandshaked(
  267. sessionID string) (bool, bool, error) {
  268. return server.sshServer.getClientHandshaked(sessionID)
  269. }
  270. // UpdateClientAPIParameters updates the recorded handshake API parameters for
  271. // the client corresponding to sessionID.
  272. func (server *TunnelServer) UpdateClientAPIParameters(
  273. sessionID string,
  274. apiParams common.APIParameters) error {
  275. return server.sshServer.updateClientAPIParameters(sessionID, apiParams)
  276. }
  277. // ExpectClientDomainBytes indicates whether the client was configured to report
  278. // domain bytes in its handshake response.
  279. func (server *TunnelServer) ExpectClientDomainBytes(
  280. sessionID string) (bool, error) {
  281. return server.sshServer.expectClientDomainBytes(sessionID)
  282. }
  283. // SetEstablishTunnels sets whether new tunnels may be established or not.
  284. // When not establishing, incoming connections are immediately closed.
  285. func (server *TunnelServer) SetEstablishTunnels(establish bool) {
  286. server.sshServer.setEstablishTunnels(establish)
  287. }
  288. // CheckEstablishTunnels returns whether new tunnels may be established or
  289. // not, and increments a metrics counter when establishment is disallowed.
  290. func (server *TunnelServer) CheckEstablishTunnels() bool {
  291. return server.sshServer.checkEstablishTunnels()
  292. }
  293. // GetEstablishTunnelsMetrics returns whether tunnel establishment is
  294. // currently allowed and the number of tunnels rejected since due to not
  295. // establishing since the last GetEstablishTunnelsMetrics call.
  296. func (server *TunnelServer) GetEstablishTunnelsMetrics() (bool, int64) {
  297. return server.sshServer.getEstablishTunnelsMetrics()
  298. }
  299. type sshServer struct {
  300. // Note: 64-bit ints used with atomic operations are placed
  301. // at the start of struct to ensure 64-bit alignment.
  302. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  303. lastAuthLog int64
  304. authFailedCount int64
  305. establishLimitedCount int64
  306. support *SupportServices
  307. establishTunnels int32
  308. concurrentSSHHandshakes semaphore.Semaphore
  309. shutdownBroadcast <-chan struct{}
  310. sshHostKey ssh.Signer
  311. clientsMutex sync.Mutex
  312. stoppingClients bool
  313. acceptedClientCounts map[string]map[string]int64
  314. clients map[string]*sshClient
  315. oslSessionCacheMutex sync.Mutex
  316. oslSessionCache *cache.Cache
  317. authorizationSessionIDsMutex sync.Mutex
  318. authorizationSessionIDs map[string]string
  319. obfuscatorSeedHistory *obfuscator.SeedHistory
  320. }
  321. func newSSHServer(
  322. support *SupportServices,
  323. shutdownBroadcast <-chan struct{}) (*sshServer, error) {
  324. privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))
  325. if err != nil {
  326. return nil, errors.Trace(err)
  327. }
  328. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  329. signer, err := ssh.NewSignerFromKey(privateKey)
  330. if err != nil {
  331. return nil, errors.Trace(err)
  332. }
  333. var concurrentSSHHandshakes semaphore.Semaphore
  334. if support.Config.MaxConcurrentSSHHandshakes > 0 {
  335. concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)
  336. }
  337. // The OSL session cache temporarily retains OSL seed state
  338. // progress for disconnected clients. This enables clients
  339. // that disconnect and immediately reconnect to the same
  340. // server to resume their OSL progress. Cached progress
  341. // is referenced by session ID and is retained for
  342. // OSL_SESSION_CACHE_TTL after disconnect.
  343. //
  344. // Note: session IDs are assumed to be unpredictable. If a
  345. // rogue client could guess the session ID of another client,
  346. // it could resume its OSL progress and, if the OSL config
  347. // were known, infer some activity.
  348. oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)
  349. return &sshServer{
  350. support: support,
  351. establishTunnels: 1,
  352. concurrentSSHHandshakes: concurrentSSHHandshakes,
  353. shutdownBroadcast: shutdownBroadcast,
  354. sshHostKey: signer,
  355. acceptedClientCounts: make(map[string]map[string]int64),
  356. clients: make(map[string]*sshClient),
  357. oslSessionCache: oslSessionCache,
  358. authorizationSessionIDs: make(map[string]string),
  359. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  360. }, nil
  361. }
  362. func (sshServer *sshServer) setEstablishTunnels(establish bool) {
  363. // Do nothing when the setting is already correct. This avoids
  364. // spurious log messages when setEstablishTunnels is called
  365. // periodically with the same setting.
  366. if establish == (atomic.LoadInt32(&sshServer.establishTunnels) == 1) {
  367. return
  368. }
  369. establishFlag := int32(1)
  370. if !establish {
  371. establishFlag = 0
  372. }
  373. atomic.StoreInt32(&sshServer.establishTunnels, establishFlag)
  374. log.WithTraceFields(
  375. LogFields{"establish": establish}).Info("establishing tunnels")
  376. }
  377. func (sshServer *sshServer) checkEstablishTunnels() bool {
  378. establishTunnels := atomic.LoadInt32(&sshServer.establishTunnels) == 1
  379. if !establishTunnels {
  380. atomic.AddInt64(&sshServer.establishLimitedCount, 1)
  381. }
  382. return establishTunnels
  383. }
  384. func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
  385. return atomic.LoadInt32(&sshServer.establishTunnels) == 1,
  386. atomic.SwapInt64(&sshServer.establishLimitedCount, 0)
  387. }
  388. // runListener is intended to run an a goroutine; it blocks
  389. // running a particular listener. If an unrecoverable error
  390. // occurs, it will send the error to the listenerError channel.
  391. func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {
  392. runningProtocols := make([]string, 0)
  393. for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
  394. runningProtocols = append(runningProtocols, tunnelProtocol)
  395. }
  396. handleClient := func(clientTunnelProtocol string, clientConn net.Conn) {
  397. // Note: establish tunnel limiter cannot simply stop TCP
  398. // listeners in all cases (e.g., meek) since SSH tunnels can
  399. // span multiple TCP connections.
  400. if !sshServer.checkEstablishTunnels() {
  401. log.WithTrace().Debug("not establishing tunnels")
  402. clientConn.Close()
  403. return
  404. }
  405. // The tunnelProtocol passed to handleClient is used for stats,
  406. // throttling, etc. When the tunnel protocol can be determined
  407. // unambiguously from the listening port, use that protocol and
  408. // don't use any client-declared value. Only use the client's
  409. // value, if present, in special cases where the listening port
  410. // cannot distinguish the protocol.
  411. tunnelProtocol := sshListener.tunnelProtocol
  412. if clientTunnelProtocol != "" {
  413. if !common.Contains(runningProtocols, clientTunnelProtocol) {
  414. log.WithTraceFields(
  415. LogFields{
  416. "clientTunnelProtocol": clientTunnelProtocol}).
  417. Warning("invalid client tunnel protocol")
  418. clientConn.Close()
  419. return
  420. }
  421. if protocol.UseClientTunnelProtocol(
  422. clientTunnelProtocol, runningProtocols) {
  423. tunnelProtocol = clientTunnelProtocol
  424. }
  425. }
  426. // sshListener.tunnelProtocol indictes the tunnel protocol run by the
  427. // listener. For direct protocols, this is also the client tunnel protocol.
  428. // For fronted protocols, the client may use a different protocol to connect
  429. // to the front and then only the front-to-Psiphon server will use the
  430. // listener protocol.
  431. //
  432. // A fronted meek client, for example, reports its first hop protocol in
  433. // protocol.MeekCookieData.ClientTunnelProtocol. Most metrics record this
  434. // value as relay_protocol, since the first hop is the one subject to
  435. // adversarial conditions. In some cases, such as irregular tunnels, there
  436. // is no ClientTunnelProtocol value available and the listener tunnel
  437. // protocol will be logged.
  438. //
  439. // Similarly, listenerPort indicates the listening port, which is the dialed
  440. // port number for direct protocols; while, for fronted protocols, the
  441. // client may dial a different port for its first hop.
  442. // Process each client connection concurrently.
  443. go sshServer.handleClient(sshListener, tunnelProtocol, clientConn)
  444. }
  445. // Note: when exiting due to a unrecoverable error, be sure
  446. // to try to send the error to listenerError so that the outer
  447. // TunnelServer.Run will properly shut down instead of remaining
  448. // running.
  449. if protocol.TunnelProtocolUsesMeekHTTP(sshListener.tunnelProtocol) ||
  450. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol) {
  451. meekServer, err := NewMeekServer(
  452. sshServer.support,
  453. sshListener.Listener,
  454. sshListener.tunnelProtocol,
  455. sshListener.port,
  456. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol),
  457. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  458. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  459. handleClient,
  460. sshServer.shutdownBroadcast)
  461. if err == nil {
  462. err = meekServer.Run()
  463. }
  464. if err != nil {
  465. select {
  466. case listenerError <- errors.Trace(err):
  467. default:
  468. }
  469. return
  470. }
  471. } else {
  472. for {
  473. conn, err := sshListener.Listener.Accept()
  474. select {
  475. case <-sshServer.shutdownBroadcast:
  476. if err == nil {
  477. conn.Close()
  478. }
  479. return
  480. default:
  481. }
  482. if err != nil {
  483. if e, ok := err.(net.Error); ok && e.Temporary() {
  484. log.WithTraceFields(LogFields{"error": err}).Error("accept failed")
  485. // Temporary error, keep running
  486. continue
  487. }
  488. select {
  489. case listenerError <- errors.Trace(err):
  490. default:
  491. }
  492. return
  493. }
  494. handleClient("", conn)
  495. }
  496. }
  497. }
  498. // An accepted client has completed a direct TCP or meek connection and has a net.Conn. Registration
  499. // is for tracking the number of connections.
  500. func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {
  501. sshServer.clientsMutex.Lock()
  502. defer sshServer.clientsMutex.Unlock()
  503. if sshServer.acceptedClientCounts[tunnelProtocol] == nil {
  504. sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)
  505. }
  506. sshServer.acceptedClientCounts[tunnelProtocol][region] += 1
  507. }
  508. func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {
  509. sshServer.clientsMutex.Lock()
  510. defer sshServer.clientsMutex.Unlock()
  511. sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1
  512. }
  513. // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
  514. // for tracking the number of fully established clients and for maintaining a list of running
  515. // clients (for stopping at shutdown time).
  516. func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
  517. sshServer.clientsMutex.Lock()
  518. if sshServer.stoppingClients {
  519. sshServer.clientsMutex.Unlock()
  520. return false
  521. }
  522. // In the case of a duplicate client sessionID, the previous client is closed.
  523. // - Well-behaved clients generate a random sessionID that should be unique (won't
  524. // accidentally conflict) and hard to guess (can't be targeted by a malicious
  525. // client).
  526. // - Clients reuse the same sessionID when a tunnel is unexpectedly disconnected
  527. // and reestablished. In this case, when the same server is selected, this logic
  528. // will be hit; closing the old, dangling client is desirable.
  529. // - Multi-tunnel clients should not normally use one server for multiple tunnels.
  530. existingClient := sshServer.clients[client.sessionID]
  531. sshServer.clientsMutex.Unlock()
  532. if existingClient != nil {
  533. // This case is expected to be common, and so logged at the lowest severity
  534. // level.
  535. log.WithTrace().Debug(
  536. "stopping existing client with duplicate session ID")
  537. existingClient.stop()
  538. // Block until the existingClient is fully terminated. This is necessary to
  539. // avoid this scenario:
  540. // - existingClient is invoking handshakeAPIRequestHandler
  541. // - sshServer.clients[client.sessionID] is updated to point to new client
  542. // - existingClient's handshakeAPIRequestHandler invokes
  543. // SetClientHandshakeState but sets the handshake parameters for new
  544. // client
  545. // - as a result, the new client handshake will fail (only a single handshake
  546. // is permitted) and the new client server_tunnel log will contain an
  547. // invalid mix of existing/new client fields
  548. //
  549. // Once existingClient.awaitStopped returns, all existingClient port
  550. // forwards and request handlers have terminated, so no API handler, either
  551. // tunneled web API or SSH API, will remain and it is safe to point
  552. // sshServer.clients[client.sessionID] to the new client.
  553. // Limitation: this scenario remains possible with _untunneled_ web API
  554. // requests.
  555. //
  556. // Blocking also ensures existingClient.releaseAuthorizations is invoked before
  557. // the new client attempts to submit the same authorizations.
  558. //
  559. // Perform blocking awaitStopped operation outside the
  560. // sshServer.clientsMutex mutex to avoid blocking all other clients for the
  561. // duration. We still expect and require that the stop process completes
  562. // rapidly, e.g., does not block on network I/O, allowing the new client
  563. // connection to proceed without delay.
  564. //
  565. // In addition, operations triggered by stop, and which must complete before
  566. // awaitStopped returns, will attempt to lock sshServer.clientsMutex,
  567. // including unregisterEstablishedClient.
  568. existingClient.awaitStopped()
  569. }
  570. sshServer.clientsMutex.Lock()
  571. defer sshServer.clientsMutex.Unlock()
  572. // existingClient's stop will have removed it from sshServer.clients via
  573. // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should
  574. // be nil -- unless yet another client instance using the same sessionID has
  575. // connected in the meantime while awaiting existingClient stop. In this
  576. // case, it's not clear which is the most recent connection from the client,
  577. // so instead of this connection terminating more peers, it aborts.
  578. if sshServer.clients[client.sessionID] != nil {
  579. // As this is expected to be rare case, it's logged at a higher severity
  580. // level.
  581. log.WithTrace().Warning(
  582. "aborting new client with duplicate session ID")
  583. return false
  584. }
  585. sshServer.clients[client.sessionID] = client
  586. return true
  587. }
  588. func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
  589. sshServer.clientsMutex.Lock()
  590. registeredClient := sshServer.clients[client.sessionID]
  591. // registeredClient will differ from client when client is the existingClient
  592. // terminated in registerEstablishedClient. In that case, registeredClient
  593. // remains connected, and the sshServer.clients entry should be retained.
  594. if registeredClient == client {
  595. delete(sshServer.clients, client.sessionID)
  596. }
  597. sshServer.clientsMutex.Unlock()
  598. client.stop()
  599. }
  600. type ProtocolStats map[string]map[string]int64
  601. type RegionStats map[string]map[string]map[string]int64
// getLoadStats returns a snapshot of server load counters. ProtocolStats is
// aggregated per tunnel protocol plus an "ALL" total. RegionStats is keyed by
// each client's GeoIP country, and each region holds per-protocol and "ALL"
// totals.
//
// Counter groups:
//   - accepted_clients: summed from acceptedClientCounts. Only positive
//     counts are added, so a region gets an entry only when it has a
//     positive accepted count or an established client.
//   - established_clients and the *_port_forwards counters: read from each
//     registered client's traffic state; not reset here.
//   - tcp_*/udp_* quality metrics: read from each client's qualityMetrics
//     (durations converted to milliseconds), then reset to zero. Each call
//     therefore reports activity since the previous call.
//
// clientsMutex is held for the whole call, and each client's own lock is held
// while its fields are read and reset.
func (sshServer *sshServer) getLoadStats() (ProtocolStats, RegionStats) {

	sshServer.clientsMutex.Lock()
	defer sshServer.clientsMutex.Unlock()

	// Explicitly populate with zeros to ensure 0 counts in log messages
	zeroStats := func() map[string]int64 {
		stats := make(map[string]int64)
		stats["accepted_clients"] = 0
		stats["established_clients"] = 0
		stats["dialing_tcp_port_forwards"] = 0
		stats["tcp_port_forwards"] = 0
		stats["total_tcp_port_forwards"] = 0
		stats["udp_port_forwards"] = 0
		stats["total_udp_port_forwards"] = 0
		stats["tcp_port_forward_dialed_count"] = 0
		stats["tcp_port_forward_dialed_duration"] = 0
		stats["tcp_port_forward_failed_count"] = 0
		stats["tcp_port_forward_failed_duration"] = 0
		stats["tcp_port_forward_rejected_dialing_limit_count"] = 0
		stats["tcp_port_forward_rejected_disallowed_count"] = 0
		stats["udp_port_forward_rejected_disallowed_count"] = 0
		stats["tcp_ipv4_port_forward_dialed_count"] = 0
		stats["tcp_ipv4_port_forward_dialed_duration"] = 0
		stats["tcp_ipv4_port_forward_failed_count"] = 0
		stats["tcp_ipv4_port_forward_failed_duration"] = 0
		stats["tcp_ipv6_port_forward_dialed_count"] = 0
		stats["tcp_ipv6_port_forward_dialed_duration"] = 0
		stats["tcp_ipv6_port_forward_failed_count"] = 0
		stats["tcp_ipv6_port_forward_failed_duration"] = 0
		return stats
	}

	// zeroProtocolStats returns zeroed counters for "ALL" and for every
	// protocol configured in TunnelProtocolPorts.
	zeroProtocolStats := func() map[string]map[string]int64 {
		stats := make(map[string]map[string]int64)
		stats["ALL"] = zeroStats()
		for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
			stats[tunnelProtocol] = zeroStats()
		}
		return stats
	}

	// [<protocol or ALL>][<stat name>] -> count
	protocolStats := zeroProtocolStats()

	// [<region][<protocol or ALL>][<stat name>] -> count
	regionStats := make(RegionStats)

	// Note: as currently tracked/counted, each established client is also an accepted client

	// NOTE(review): the per-protocol maps below exist only for protocols in
	// TunnelProtocolPorts. The += on protocolStats[tunnelProtocol] relies on
	// every recorded tunnelProtocol being such a key; a nil inner map would
	// panic on write. runListener only adopts client-declared protocols found
	// in that set.
	for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {
		for region, acceptedClientCount := range regionAcceptedClientCounts {

			if acceptedClientCount > 0 {
				if regionStats[region] == nil {
					regionStats[region] = zeroProtocolStats()
				}

				protocolStats["ALL"]["accepted_clients"] += acceptedClientCount
				protocolStats[tunnelProtocol]["accepted_clients"] += acceptedClientCount

				regionStats[region]["ALL"]["accepted_clients"] += acceptedClientCount
				regionStats[region][tunnelProtocol]["accepted_clients"] += acceptedClientCount
			}
		}
	}

	for _, client := range sshServer.clients {

		// The client lock guards the traffic state and quality metrics read
		// and reset below.
		client.Lock()

		tunnelProtocol := client.tunnelProtocol
		region := client.geoIPData.Country

		if regionStats[region] == nil {
			regionStats[region] = zeroProtocolStats()
		}

		// Each client contributes to four aggregates: overall, its protocol,
		// its region, and its region+protocol.
		stats := []map[string]int64{
			protocolStats["ALL"],
			protocolStats[tunnelProtocol],
			regionStats[region]["ALL"],
			regionStats[region][tunnelProtocol]}

		for _, stat := range stats {

			stat["established_clients"] += 1

			// Note: can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
			stat["dialing_tcp_port_forwards"] += client.tcpTrafficState.concurrentDialingPortForwardCount
			stat["tcp_port_forwards"] += client.tcpTrafficState.concurrentPortForwardCount
			stat["total_tcp_port_forwards"] += client.tcpTrafficState.totalPortForwardCount
			// client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
			stat["udp_port_forwards"] += client.udpTrafficState.concurrentPortForwardCount
			stat["total_udp_port_forwards"] += client.udpTrafficState.totalPortForwardCount

			// Durations are reported in milliseconds.
			stat["tcp_port_forward_dialed_count"] += client.qualityMetrics.TCPPortForwardDialedCount
			stat["tcp_port_forward_dialed_duration"] +=
				int64(client.qualityMetrics.TCPPortForwardDialedDuration / time.Millisecond)
			stat["tcp_port_forward_failed_count"] += client.qualityMetrics.TCPPortForwardFailedCount
			stat["tcp_port_forward_failed_duration"] +=
				int64(client.qualityMetrics.TCPPortForwardFailedDuration / time.Millisecond)
			stat["tcp_port_forward_rejected_dialing_limit_count"] +=
				client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount
			stat["tcp_port_forward_rejected_disallowed_count"] +=
				client.qualityMetrics.TCPPortForwardRejectedDisallowedCount
			stat["udp_port_forward_rejected_disallowed_count"] +=
				client.qualityMetrics.UDPPortForwardRejectedDisallowedCount

			stat["tcp_ipv4_port_forward_dialed_count"] += client.qualityMetrics.TCPIPv4PortForwardDialedCount
			stat["tcp_ipv4_port_forward_dialed_duration"] +=
				int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration / time.Millisecond)
			stat["tcp_ipv4_port_forward_failed_count"] += client.qualityMetrics.TCPIPv4PortForwardFailedCount
			stat["tcp_ipv4_port_forward_failed_duration"] +=
				int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration / time.Millisecond)

			stat["tcp_ipv6_port_forward_dialed_count"] += client.qualityMetrics.TCPIPv6PortForwardDialedCount
			stat["tcp_ipv6_port_forward_dialed_duration"] +=
				int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration / time.Millisecond)
			stat["tcp_ipv6_port_forward_failed_count"] += client.qualityMetrics.TCPIPv6PortForwardFailedCount
			stat["tcp_ipv6_port_forward_failed_duration"] +=
				int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration / time.Millisecond)
		}

		// Reset the quality metrics so that the next call reports only what
		// accumulates after this point.
		client.qualityMetrics.TCPPortForwardDialedCount = 0
		client.qualityMetrics.TCPPortForwardDialedDuration = 0
		client.qualityMetrics.TCPPortForwardFailedCount = 0
		client.qualityMetrics.TCPPortForwardFailedDuration = 0
		client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount = 0
		client.qualityMetrics.TCPPortForwardRejectedDisallowedCount = 0
		client.qualityMetrics.UDPPortForwardRejectedDisallowedCount = 0

		client.qualityMetrics.TCPIPv4PortForwardDialedCount = 0
		client.qualityMetrics.TCPIPv4PortForwardDialedDuration = 0
		client.qualityMetrics.TCPIPv4PortForwardFailedCount = 0
		client.qualityMetrics.TCPIPv4PortForwardFailedDuration = 0

		client.qualityMetrics.TCPIPv6PortForwardDialedCount = 0
		client.qualityMetrics.TCPIPv6PortForwardDialedDuration = 0
		client.qualityMetrics.TCPIPv6PortForwardFailedCount = 0
		client.qualityMetrics.TCPIPv6PortForwardFailedDuration = 0

		client.Unlock()
	}

	return protocolStats, regionStats
}
  723. func (sshServer *sshServer) getEstablishedClientCount() int {
  724. sshServer.clientsMutex.Lock()
  725. defer sshServer.clientsMutex.Unlock()
  726. establishedClients := len(sshServer.clients)
  727. return establishedClients
  728. }
  729. func (sshServer *sshServer) resetAllClientTrafficRules() {
  730. sshServer.clientsMutex.Lock()
  731. clients := make(map[string]*sshClient)
  732. for sessionID, client := range sshServer.clients {
  733. clients[sessionID] = client
  734. }
  735. sshServer.clientsMutex.Unlock()
  736. for _, client := range clients {
  737. client.setTrafficRules()
  738. }
  739. }
  740. func (sshServer *sshServer) resetAllClientOSLConfigs() {
  741. // Flush cached seed state. This has the same effect
  742. // and same limitations as calling setOSLConfig for
  743. // currently connected clients -- all progress is lost.
  744. sshServer.oslSessionCacheMutex.Lock()
  745. sshServer.oslSessionCache.Flush()
  746. sshServer.oslSessionCacheMutex.Unlock()
  747. sshServer.clientsMutex.Lock()
  748. clients := make(map[string]*sshClient)
  749. for sessionID, client := range sshServer.clients {
  750. clients[sessionID] = client
  751. }
  752. sshServer.clientsMutex.Unlock()
  753. for _, client := range clients {
  754. client.setOSLConfig()
  755. }
  756. }
  757. func (sshServer *sshServer) setClientHandshakeState(
  758. sessionID string,
  759. state handshakeState,
  760. authorizations []string) (*handshakeStateInfo, error) {
  761. sshServer.clientsMutex.Lock()
  762. client := sshServer.clients[sessionID]
  763. sshServer.clientsMutex.Unlock()
  764. if client == nil {
  765. return nil, errors.TraceNew("unknown session ID")
  766. }
  767. handshakeStateInfo, err := client.setHandshakeState(
  768. state, authorizations)
  769. if err != nil {
  770. return nil, errors.Trace(err)
  771. }
  772. return handshakeStateInfo, nil
  773. }
  774. func (sshServer *sshServer) getClientHandshaked(
  775. sessionID string) (bool, bool, error) {
  776. sshServer.clientsMutex.Lock()
  777. client := sshServer.clients[sessionID]
  778. sshServer.clientsMutex.Unlock()
  779. if client == nil {
  780. return false, false, errors.TraceNew("unknown session ID")
  781. }
  782. completed, exhausted := client.getHandshaked()
  783. return completed, exhausted, nil
  784. }
  785. func (sshServer *sshServer) updateClientAPIParameters(
  786. sessionID string,
  787. apiParams common.APIParameters) error {
  788. sshServer.clientsMutex.Lock()
  789. client := sshServer.clients[sessionID]
  790. sshServer.clientsMutex.Unlock()
  791. if client == nil {
  792. return errors.TraceNew("unknown session ID")
  793. }
  794. client.updateAPIParameters(apiParams)
  795. return nil
  796. }
  797. func (sshServer *sshServer) revokeClientAuthorizations(sessionID string) {
  798. sshServer.clientsMutex.Lock()
  799. client := sshServer.clients[sessionID]
  800. sshServer.clientsMutex.Unlock()
  801. if client == nil {
  802. return
  803. }
  804. // sshClient.handshakeState.authorizedAccessTypes is not cleared. Clearing
  805. // authorizedAccessTypes may cause sshClient.logTunnel to fail to log
  806. // access types. As the revocation may be due to legitimate use of an
  807. // authorization in multiple sessions by a single client, useful metrics
  808. // would be lost.
  809. client.Lock()
  810. client.handshakeState.authorizationsRevoked = true
  811. client.Unlock()
  812. // Select and apply new traffic rules, as filtered by the client's new
  813. // authorization state.
  814. client.setTrafficRules()
  815. }
  816. func (sshServer *sshServer) expectClientDomainBytes(
  817. sessionID string) (bool, error) {
  818. sshServer.clientsMutex.Lock()
  819. client := sshServer.clients[sessionID]
  820. sshServer.clientsMutex.Unlock()
  821. if client == nil {
  822. return false, errors.TraceNew("unknown session ID")
  823. }
  824. return client.expectDomainBytes(), nil
  825. }
  826. func (sshServer *sshServer) stopClients() {
  827. sshServer.clientsMutex.Lock()
  828. sshServer.stoppingClients = true
  829. clients := sshServer.clients
  830. sshServer.clients = make(map[string]*sshClient)
  831. sshServer.clientsMutex.Unlock()
  832. for _, client := range clients {
  833. client.stop()
  834. }
  835. }
  836. func (sshServer *sshServer) handleClient(
  837. sshListener *sshListener, tunnelProtocol string, clientConn net.Conn) {
  838. // Calling clientConn.RemoteAddr at this point, before any Read calls,
  839. // satisfies the constraint documented in tapdance.Listen.
  840. clientAddr := clientConn.RemoteAddr()
  841. // Check if there were irregularities during the network connection
  842. // establishment. When present, log and then behave as Obfuscated SSH does
  843. // when the client fails to provide a valid seed message.
  844. //
  845. // One concrete irregular case is failure to send a PROXY protocol header for
  846. // TAPDANCE-OSSH.
  847. if indicator, ok := clientConn.(common.IrregularIndicator); ok {
  848. tunnelErr := indicator.IrregularTunnelError()
  849. if tunnelErr != nil {
  850. logIrregularTunnel(
  851. sshServer.support,
  852. sshListener.tunnelProtocol,
  853. sshListener.port,
  854. common.IPAddressFromAddr(clientAddr),
  855. errors.Trace(tunnelErr),
  856. nil)
  857. var afterFunc *time.Timer
  858. if sshServer.support.Config.sshHandshakeTimeout > 0 {
  859. afterFunc = time.AfterFunc(sshServer.support.Config.sshHandshakeTimeout, func() {
  860. clientConn.Close()
  861. })
  862. }
  863. io.Copy(ioutil.Discard, clientConn)
  864. clientConn.Close()
  865. afterFunc.Stop()
  866. return
  867. }
  868. }
  869. // Get any packet manipulation values from GetAppliedSpecName as soon as
  870. // possible due to the expiring TTL.
  871. serverPacketManipulation := ""
  872. replayedServerPacketManipulation := false
  873. if sshServer.support.Config.RunPacketManipulator &&
  874. protocol.TunnelProtocolMayUseServerPacketManipulation(tunnelProtocol) {
  875. // A meekConn has synthetic address values, including the original client
  876. // address in cases where the client uses an upstream proxy to connect to
  877. // Psiphon. For meekConn, and any other conn implementing
  878. // UnderlyingTCPAddrSource, get the underlying TCP connection addresses.
  879. //
  880. // Limitation: a meek tunnel may consist of several TCP connections. The
  881. // server_packet_manipulation metric will reflect the packet manipulation
  882. // applied to the _first_ TCP connection only.
  883. var localAddr, remoteAddr *net.TCPAddr
  884. var ok bool
  885. underlying, ok := clientConn.(common.UnderlyingTCPAddrSource)
  886. if ok {
  887. localAddr, remoteAddr, ok = underlying.GetUnderlyingTCPAddrs()
  888. } else {
  889. localAddr, ok = clientConn.LocalAddr().(*net.TCPAddr)
  890. if ok {
  891. remoteAddr, ok = clientConn.RemoteAddr().(*net.TCPAddr)
  892. }
  893. }
  894. if ok {
  895. specName, extraData, err := sshServer.support.PacketManipulator.
  896. GetAppliedSpecName(localAddr, remoteAddr)
  897. if err == nil {
  898. serverPacketManipulation = specName
  899. replayedServerPacketManipulation, _ = extraData.(bool)
  900. }
  901. }
  902. }
  903. geoIPData := sshServer.support.GeoIPService.Lookup(
  904. common.IPAddressFromAddr(clientAddr), true)
  905. sshServer.registerAcceptedClient(tunnelProtocol, geoIPData.Country)
  906. defer sshServer.unregisterAcceptedClient(tunnelProtocol, geoIPData.Country)
  907. // When configured, enforce a cap on the number of concurrent SSH
  908. // handshakes. This limits load spikes on busy servers when many clients
  909. // attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,
  910. // to acquire; waiting will avoid immediately creating more load on another
  911. // server in the network when the client tries a new candidate. Disconnect the
  912. // client when that wait time is exceeded.
  913. //
  914. // This mechanism limits memory allocations and CPU usage associated with the
  915. // SSH handshake. At this point, new direct TCP connections or new meek
  916. // connections, with associated resource usage, are already established. Those
  917. // connections are expected to be rate or load limited using other mechanisms.
  918. //
  919. // TODO:
  920. //
  921. // - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in
  922. // sshClient.run, since the client is also applying an SSH handshake timeout
  923. // and won't exclude time spent waiting.
  924. // - each call to sshServer.handleClient (in sshServer.runListener) is invoked
  925. // in its own goroutine, but shutdown doesn't synchronously await these
  926. // goroutnes. Once this is synchronizes, the following context.WithTimeout
  927. // should use an sshServer parent context to ensure blocking acquires
  928. // interrupt immediately upon shutdown.
  929. var onSSHHandshakeFinished func()
  930. if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {
  931. ctx, cancelFunc := context.WithTimeout(
  932. context.Background(),
  933. sshServer.support.Config.sshBeginHandshakeTimeout)
  934. defer cancelFunc()
  935. err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)
  936. if err != nil {
  937. clientConn.Close()
  938. // This is a debug log as the only possible error is context timeout.
  939. log.WithTraceFields(LogFields{"error": err}).Debug(
  940. "acquire SSH handshake semaphore failed")
  941. return
  942. }
  943. onSSHHandshakeFinished = func() {
  944. sshServer.concurrentSSHHandshakes.Release(1)
  945. }
  946. }
  947. sshClient := newSshClient(
  948. sshServer,
  949. sshListener,
  950. tunnelProtocol,
  951. serverPacketManipulation,
  952. replayedServerPacketManipulation,
  953. geoIPData)
  954. // sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
  955. // in any error case; or, as soon as the SSH handshake phase has successfully
  956. // completed.
  957. sshClient.run(clientConn, onSSHHandshakeFinished)
  958. }
  959. func (sshServer *sshServer) monitorPortForwardDialError(err error) {
  960. // "err" is the error returned from a failed TCP or UDP port
  961. // forward dial. Certain system error codes indicate low resource
  962. // conditions: insufficient file descriptors, ephemeral ports, or
  963. // memory. For these cases, log an alert.
  964. // TODO: also temporarily suspend new clients
  965. // Note: don't log net.OpError.Error() as the full error string
  966. // may contain client destination addresses.
  967. opErr, ok := err.(*net.OpError)
  968. if ok {
  969. if opErr.Err == syscall.EADDRNOTAVAIL ||
  970. opErr.Err == syscall.EAGAIN ||
  971. opErr.Err == syscall.ENOMEM ||
  972. opErr.Err == syscall.EMFILE ||
  973. opErr.Err == syscall.ENFILE {
  974. log.WithTraceFields(
  975. LogFields{"error": opErr.Err}).Error(
  976. "port forward dial failed due to unavailable resource")
  977. }
  978. }
  979. }
// sshClient is the server-side state for a single client connection/SSH
// session. Instances are created by newSshClient and driven by sshClient.run.
type sshClient struct {
	// Embedded lock. In this file, getLoadStats holds it while reading and
	// resetting traffic state and quality metrics, and revokeClientAuthorizations
	// holds it while setting handshakeState.authorizationsRevoked.
	sync.Mutex
	sshServer   *sshServer
	sshListener *sshListener
	// Tunnel protocol used for stats, throttling, etc., as chosen in
	// runListener.
	tunnelProtocol string
	sshConn        ssh.Conn
	throttledConn  *common.ThrottledConn
	// Spec name returned by PacketManipulator.GetAppliedSpecName in
	// handleClient, or "" when none was obtained.
	serverPacketManipulation string
	// The bool extra data returned alongside serverPacketManipulation.
	replayedServerPacketManipulation bool
	// GeoIP lookup result for the client address; Country is used as the
	// region key in load stats.
	geoIPData GeoIPData
	// Key of this client in sshServer.clients.
	sessionID string
	// Initialized to true by newSshClient (see the comment there).
	isFirstTunnelInSession bool
	supportsServerRequests bool
	handshakeState         handshakeState
	udpChannel             ssh.Channel
	packetTunnelChannel    ssh.Channel
	trafficRules           TrafficRules
	tcpTrafficState        trafficState
	udpTrafficState        trafficState
	// Reported and reset by sshServer.getLoadStats.
	qualityMetrics     qualityMetrics
	tcpPortForwardLRU  *common.LRUConns
	oslClientSeedState *osl.ClientSeedState
	// Buffered with capacity 1 (see newSshClient).
	signalIssueSLOKs chan struct{}
	// Context created by newSshClient with context.WithCancel; stopRunning is
	// its cancel function.
	runCtx      context.Context
	stopRunning context.CancelFunc
	// Closed when sshClient.run returns.
	stopped                              chan struct{}
	tcpPortForwardDialingAvailableSignal context.CancelFunc
	releaseAuthorizations                func()
	stopTimer                            *time.Timer
	preHandshakeRandomStreamMetrics      randomStreamMetrics
	postHandshakeRandomStreamMetrics     randomStreamMetrics
	// Buffered with ALERT_REQUEST_QUEUE_BUFFER_SIZE (see newSshClient).
	sendAlertRequests chan protocol.AlertRequest
	// NOTE(review): presumably tracks which alert requests were already sent,
	// to avoid duplicates -- confirm against its users.
	sentAlertRequests map[string]bool
}
// trafficState holds per-client port forward counters for one transport;
// sshClient has one for TCP (tcpTrafficState) and one for UDP
// (udpTrafficState). getLoadStats reads the concurrent and total counts
// without resetting them.
type trafficState struct {
	bytesUp                               int64
	bytesDown                             int64
	concurrentDialingPortForwardCount     int64
	peakConcurrentDialingPortForwardCount int64
	concurrentPortForwardCount            int64
	peakConcurrentPortForwardCount        int64
	totalPortForwardCount                 int64
	// Created by newSshClient with its own, dedicated mutex.
	availablePortForwardCond *sync.Cond
}
// randomStreamMetrics holds counters for random stream activity. sshClient
// keeps separate instances for before and after the handshake
// (preHandshakeRandomStreamMetrics / postHandshakeRandomStreamMetrics).
// NOTE(review): field semantics are defined by code outside this view.
type randomStreamMetrics struct {
	count                 int
	upstreamBytes         int
	receivedUpstreamBytes int
	downstreamBytes       int
	sentDownstreamBytes   int
}
// qualityMetrics records upstream TCP dial attempts and
// elapsed time. Elapsed time includes the full TCP handshake
// and, in aggregate, is a measure of the quality of the
// upstream link. These stats are recorded by each sshClient
// and then reported and reset in sshServer.getLoadStats().
// getLoadStats reports the Duration fields in milliseconds.
type qualityMetrics struct {
	TCPPortForwardDialedCount               int64
	TCPPortForwardDialedDuration            time.Duration
	TCPPortForwardFailedCount               int64
	TCPPortForwardFailedDuration            time.Duration
	TCPPortForwardRejectedDialingLimitCount int64
	TCPPortForwardRejectedDisallowedCount   int64
	UDPPortForwardRejectedDisallowedCount   int64
	// Per address family breakdowns of the dialed/failed counters above.
	TCPIPv4PortForwardDialedCount    int64
	TCPIPv4PortForwardDialedDuration time.Duration
	TCPIPv4PortForwardFailedCount    int64
	TCPIPv4PortForwardFailedDuration time.Duration
	TCPIPv6PortForwardDialedCount    int64
	TCPIPv6PortForwardDialedDuration time.Duration
	TCPIPv6PortForwardFailedCount    int64
	TCPIPv6PortForwardFailedDuration time.Duration
}
// handshakeState is the per-client API handshake state, passed to
// sshServer.setClientHandshakeState and stored in sshClient.handshakeState.
type handshakeState struct {
	completed              bool
	apiProtocol            string
	apiParams              common.APIParameters
	activeAuthorizationIDs []string
	// Not cleared on revocation, so that tunnel logs retain access types.
	authorizedAccessTypes []string
	// Set by sshServer.revokeClientAuthorizations.
	authorizationsRevoked   bool
	expectDomainBytes       bool
	establishedTunnelsCount int
	splitTunnel             bool
}
// handshakeStateInfo is the result returned by
// sshServer.setClientHandshakeState (via sshClient.setHandshakeState).
type handshakeStateInfo struct {
	activeAuthorizationIDs   []string
	authorizedAccessTypes    []string
	upstreamBytesPerSecond   int64
	downstreamBytesPerSecond int64
}
  1070. func newSshClient(
  1071. sshServer *sshServer,
  1072. sshListener *sshListener,
  1073. tunnelProtocol string,
  1074. serverPacketManipulation string,
  1075. replayedServerPacketManipulation bool,
  1076. geoIPData GeoIPData) *sshClient {
  1077. runCtx, stopRunning := context.WithCancel(context.Background())
  1078. // isFirstTunnelInSession is defaulted to true so that the pre-handshake
  1079. // traffic rules won't apply UnthrottleFirstTunnelOnly and negate any
  1080. // unthrottled bytes during the initial protocol negotiation.
  1081. client := &sshClient{
  1082. sshServer: sshServer,
  1083. sshListener: sshListener,
  1084. tunnelProtocol: tunnelProtocol,
  1085. serverPacketManipulation: serverPacketManipulation,
  1086. replayedServerPacketManipulation: replayedServerPacketManipulation,
  1087. geoIPData: geoIPData,
  1088. isFirstTunnelInSession: true,
  1089. tcpPortForwardLRU: common.NewLRUConns(),
  1090. signalIssueSLOKs: make(chan struct{}, 1),
  1091. runCtx: runCtx,
  1092. stopRunning: stopRunning,
  1093. stopped: make(chan struct{}),
  1094. sendAlertRequests: make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),
  1095. sentAlertRequests: make(map[string]bool),
  1096. }
  1097. client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1098. client.udpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1099. return client
  1100. }
  1101. func (sshClient *sshClient) run(
  1102. baseConn net.Conn, onSSHHandshakeFinished func()) {
  1103. // When run returns, the client has fully stopped, with all SSH state torn
  1104. // down and no port forwards or API requests in progress.
  1105. defer close(sshClient.stopped)
  1106. // onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
  1107. defer func() {
  1108. if onSSHHandshakeFinished != nil {
  1109. onSSHHandshakeFinished()
  1110. }
  1111. }()
  1112. // Set initial traffic rules, pre-handshake, based on currently known info.
  1113. sshClient.setTrafficRules()
  1114. conn := baseConn
  1115. // Wrap the base client connection with an ActivityMonitoredConn which will
  1116. // terminate the connection if no data is received before the deadline. This
  1117. // timeout is in effect for the entire duration of the SSH connection. Clients
  1118. // must actively use the connection or send SSH keep alive requests to keep
  1119. // the connection active. Writes are not considered reliable activity indicators
  1120. // due to buffering.
  1121. activityConn, err := common.NewActivityMonitoredConn(
  1122. conn,
  1123. SSH_CONNECTION_READ_DEADLINE,
  1124. false,
  1125. nil,
  1126. nil)
  1127. if err != nil {
  1128. conn.Close()
  1129. if !isExpectedTunnelIOError(err) {
  1130. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  1131. }
  1132. return
  1133. }
  1134. conn = activityConn
  1135. // Further wrap the connection with burst monitoring, when enabled.
  1136. //
  1137. // Limitation: burst parameters are fixed for the duration of the tunnel
  1138. // and do not change after a tactics hot reload.
  1139. var burstConn *common.BurstMonitoredConn
  1140. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.geoIPData)
  1141. if err != nil {
  1142. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  1143. "ServerTacticsParametersCache.Get failed")
  1144. return
  1145. }
  1146. if !p.IsNil() {
  1147. upstreamTargetBytes := int64(p.Int(parameters.ServerBurstUpstreamTargetBytes))
  1148. upstreamDeadline := p.Duration(parameters.ServerBurstUpstreamDeadline)
  1149. downstreamTargetBytes := int64(p.Int(parameters.ServerBurstDownstreamTargetBytes))
  1150. downstreamDeadline := p.Duration(parameters.ServerBurstDownstreamDeadline)
  1151. if (upstreamDeadline != 0 && upstreamTargetBytes != 0) ||
  1152. (downstreamDeadline != 0 && downstreamTargetBytes != 0) {
  1153. burstConn = common.NewBurstMonitoredConn(
  1154. conn,
  1155. true,
  1156. upstreamTargetBytes, upstreamDeadline,
  1157. downstreamTargetBytes, downstreamDeadline)
  1158. conn = burstConn
  1159. }
  1160. }
  1161. // Allow garbage collection.
  1162. p.Close()
  1163. // Further wrap the connection in a rate limiting ThrottledConn.
  1164. throttledConn := common.NewThrottledConn(conn, sshClient.rateLimits())
  1165. conn = throttledConn
  1166. // Replay of server-side parameters is set or extended after a new tunnel
  1167. // meets duration and bytes transferred targets. Set a timer now that expires
  1168. // shortly after the target duration. When the timer fires, check the time of
  1169. // last byte read (a read indicating a live connection with the client),
  1170. // along with total bytes transferred and set or extend replay if the targets
  1171. // are met.
  1172. //
  1173. // Both target checks are conservative: the tunnel may be healthy, but a byte
  1174. // may not have been read in the last second when the timer fires. Or bytes
  1175. // may be transferring, but not at the target level. Only clients that meet
  1176. // the strict targets at the single check time will trigger replay; however,
  1177. // this replay will impact all clients with similar GeoIP data.
  1178. //
  1179. // A deferred function cancels the timer and also increments the replay
  1180. // failure counter, which will ultimately clear replay parameters, when the
  1181. // tunnel fails before the API handshake is completed (this includes any
  1182. // liveness test).
  1183. //
  1184. // A tunnel which fails to meet the targets but successfully completes any
  1185. // liveness test and the API handshake is ignored in terms of replay scoring.
  1186. isReplayCandidate, replayWaitDuration, replayTargetDuration :=
  1187. sshClient.sshServer.support.ReplayCache.GetReplayTargetDuration(sshClient.geoIPData)
  1188. if isReplayCandidate {
  1189. getFragmentorSeed := func() *prng.Seed {
  1190. fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
  1191. if ok {
  1192. fragmentorSeed, _ := fragmentor.GetReplay()
  1193. return fragmentorSeed
  1194. }
  1195. return nil
  1196. }
  1197. setReplayAfterFunc := time.AfterFunc(
  1198. replayWaitDuration,
  1199. func() {
  1200. if activityConn.GetActiveDuration() >= replayTargetDuration {
  1201. sshClient.Lock()
  1202. bytesUp := sshClient.tcpTrafficState.bytesUp + sshClient.udpTrafficState.bytesUp
  1203. bytesDown := sshClient.tcpTrafficState.bytesDown + sshClient.udpTrafficState.bytesDown
  1204. sshClient.Unlock()
  1205. sshClient.sshServer.support.ReplayCache.SetReplayParameters(
  1206. sshClient.tunnelProtocol,
  1207. sshClient.geoIPData,
  1208. sshClient.serverPacketManipulation,
  1209. getFragmentorSeed(),
  1210. bytesUp,
  1211. bytesDown)
  1212. }
  1213. })
  1214. defer func() {
  1215. setReplayAfterFunc.Stop()
  1216. completed, _ := sshClient.getHandshaked()
  1217. if !completed {
  1218. // Count a replay failure case when a tunnel used replay parameters
  1219. // (excluding OSSH fragmentation, which doesn't use the ReplayCache) and
  1220. // failed to complete the API handshake.
  1221. replayedFragmentation := false
  1222. if sshClient.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  1223. fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
  1224. if ok {
  1225. _, replayedFragmentation = fragmentor.GetReplay()
  1226. }
  1227. }
  1228. usedReplay := replayedFragmentation || sshClient.replayedServerPacketManipulation
  1229. if usedReplay {
  1230. sshClient.sshServer.support.ReplayCache.FailedReplayParameters(
  1231. sshClient.tunnelProtocol,
  1232. sshClient.geoIPData,
  1233. sshClient.serverPacketManipulation,
  1234. getFragmentorSeed())
  1235. }
  1236. }
  1237. }()
  1238. }
  1239. // Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  1240. // respect shutdownBroadcast and implement a specific handshake timeout.
  1241. // The timeout is to reclaim network resources in case the handshake takes
  1242. // too long.
  1243. type sshNewServerConnResult struct {
  1244. obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn
  1245. sshConn *ssh.ServerConn
  1246. channels <-chan ssh.NewChannel
  1247. requests <-chan *ssh.Request
  1248. err error
  1249. }
  1250. resultChannel := make(chan *sshNewServerConnResult, 2)
  1251. var sshHandshakeAfterFunc *time.Timer
  1252. if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {
  1253. sshHandshakeAfterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
  1254. resultChannel <- &sshNewServerConnResult{err: std_errors.New("ssh handshake timeout")}
  1255. })
  1256. }
  1257. go func(baseConn, conn net.Conn) {
  1258. sshServerConfig := &ssh.ServerConfig{
  1259. PasswordCallback: sshClient.passwordCallback,
  1260. AuthLogCallback: sshClient.authLogCallback,
  1261. ServerVersion: sshClient.sshServer.support.Config.SSHServerVersion,
  1262. }
  1263. sshServerConfig.AddHostKey(sshClient.sshServer.sshHostKey)
  1264. var err error
  1265. if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  1266. // With Encrypt-then-MAC hash algorithms, packet length is
  1267. // transmitted in plaintext, which aids in traffic analysis;
  1268. // clients may still send Encrypt-then-MAC algorithms in their
  1269. // KEX_INIT message, but do not select these algorithms.
  1270. //
  1271. // The exception is TUNNEL_PROTOCOL_SSH, which is intended to appear
  1272. // like SSH on the wire.
  1273. sshServerConfig.NoEncryptThenMACHash = true
  1274. } else {
  1275. // For TUNNEL_PROTOCOL_SSH only, randomize KEX.
  1276. if sshClient.sshServer.support.Config.ObfuscatedSSHKey != "" {
  1277. sshServerConfig.KEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  1278. sshClient.sshServer.support.Config.ObfuscatedSSHKey)
  1279. if err != nil {
  1280. err = errors.Trace(err)
  1281. }
  1282. }
  1283. }
  1284. result := &sshNewServerConnResult{}
  1285. // Wrap the connection in an SSH deobfuscator when required.
  1286. if err == nil && protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  1287. // Note: NewServerObfuscatedSSHConn blocks on network I/O
  1288. // TODO: ensure this won't block shutdown
  1289. result.obfuscatedSSHConn, err = obfuscator.NewServerObfuscatedSSHConn(
  1290. conn,
  1291. sshClient.sshServer.support.Config.ObfuscatedSSHKey,
  1292. sshClient.sshServer.obfuscatorSeedHistory,
  1293. func(clientIP string, err error, logFields common.LogFields) {
  1294. logIrregularTunnel(
  1295. sshClient.sshServer.support,
  1296. sshClient.sshListener.tunnelProtocol,
  1297. sshClient.sshListener.port,
  1298. clientIP,
  1299. errors.Trace(err),
  1300. LogFields(logFields))
  1301. })
  1302. if err != nil {
  1303. err = errors.Trace(err)
  1304. } else {
  1305. conn = result.obfuscatedSSHConn
  1306. }
  1307. // Seed the fragmentor, when present, with seed derived from initial
  1308. // obfuscator message. See tactics.Listener.Accept. This must preceed
  1309. // ssh.NewServerConn to ensure fragmentor is seeded before downstream bytes
  1310. // are written.
  1311. if err == nil && sshClient.tunnelProtocol == protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  1312. fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
  1313. if ok {
  1314. var fragmentorPRNG *prng.PRNG
  1315. fragmentorPRNG, err = result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")
  1316. if err != nil {
  1317. err = errors.Trace(err)
  1318. } else {
  1319. fragmentor.SetReplay(fragmentorPRNG)
  1320. }
  1321. }
  1322. }
  1323. }
  1324. if err == nil {
  1325. result.sshConn, result.channels, result.requests, err =
  1326. ssh.NewServerConn(conn, sshServerConfig)
  1327. if err != nil {
  1328. err = errors.Trace(err)
  1329. }
  1330. }
  1331. result.err = err
  1332. resultChannel <- result
  1333. }(baseConn, conn)
  1334. var result *sshNewServerConnResult
  1335. select {
  1336. case result = <-resultChannel:
  1337. case <-sshClient.sshServer.shutdownBroadcast:
  1338. // Close() will interrupt an ongoing handshake
  1339. // TODO: wait for SSH handshake goroutines to exit before returning?
  1340. conn.Close()
  1341. return
  1342. }
  1343. if sshHandshakeAfterFunc != nil {
  1344. sshHandshakeAfterFunc.Stop()
  1345. }
  1346. if result.err != nil {
  1347. conn.Close()
  1348. // This is a Debug log due to noise. The handshake often fails due to I/O
  1349. // errors as clients frequently interrupt connections in progress when
  1350. // client-side load balancing completes a connection to a different server.
  1351. log.WithTraceFields(LogFields{"error": result.err}).Debug("SSH handshake failed")
  1352. return
  1353. }
  1354. // The SSH handshake has finished successfully; notify now to allow other
  1355. // blocked SSH handshakes to proceed.
  1356. if onSSHHandshakeFinished != nil {
  1357. onSSHHandshakeFinished()
  1358. }
  1359. onSSHHandshakeFinished = nil
  1360. sshClient.Lock()
  1361. sshClient.sshConn = result.sshConn
  1362. sshClient.throttledConn = throttledConn
  1363. sshClient.Unlock()
  1364. if !sshClient.sshServer.registerEstablishedClient(sshClient) {
  1365. conn.Close()
  1366. log.WithTrace().Warning("register failed")
  1367. return
  1368. }
  1369. sshClient.runTunnel(result.channels, result.requests)
  1370. // Note: sshServer.unregisterEstablishedClient calls sshClient.stop(),
  1371. // which also closes underlying transport Conn.
  1372. sshClient.sshServer.unregisterEstablishedClient(sshClient)
  1373. // Log tunnel metrics.
  1374. var additionalMetrics []LogFields
  1375. // Add activity and burst metrics.
  1376. //
  1377. // The reported duration is based on last confirmed data transfer, which for
  1378. // sshClient.activityConn.GetActiveDuration() is time of last read byte and
  1379. // not conn close time. This is important for protocols such as meek. For
  1380. // meek, the connection remains open until the HTTP session expires, which
  1381. // may be some time after the tunnel has closed. (The meek protocol has no
  1382. // allowance for signalling payload EOF, and even if it did the client may
  1383. // not have the opportunity to send a final request with an EOF flag set.)
  1384. activityMetrics := make(LogFields)
  1385. activityMetrics["start_time"] = activityConn.GetStartTime()
  1386. activityMetrics["duration"] = int64(activityConn.GetActiveDuration() / time.Millisecond)
  1387. additionalMetrics = append(additionalMetrics, activityMetrics)
  1388. if burstConn != nil {
  1389. // Any outstanding burst should be recorded by burstConn.Close which should
  1390. // be called by unregisterEstablishedClient.
  1391. additionalMetrics = append(
  1392. additionalMetrics, LogFields(burstConn.GetMetrics(activityConn.GetStartTime())))
  1393. }
  1394. // Some conns report additional metrics. Meek conns report resiliency
  1395. // metrics and fragmentor.Conns report fragmentor configs.
  1396. if metricsSource, ok := baseConn.(common.MetricsSource); ok {
  1397. additionalMetrics = append(
  1398. additionalMetrics, LogFields(metricsSource.GetMetrics()))
  1399. }
  1400. if result.obfuscatedSSHConn != nil {
  1401. additionalMetrics = append(
  1402. additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
  1403. }
  1404. // Add server-replay metrics.
  1405. replayMetrics := make(LogFields)
  1406. replayedFragmentation := false
  1407. fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)
  1408. if ok {
  1409. _, replayedFragmentation = fragmentor.GetReplay()
  1410. }
  1411. replayMetrics["server_replay_fragmentation"] = replayedFragmentation
  1412. replayMetrics["server_replay_packet_manipulation"] = sshClient.replayedServerPacketManipulation
  1413. additionalMetrics = append(additionalMetrics, replayMetrics)
  1414. sshClient.logTunnel(additionalMetrics)
  1415. // Transfer OSL seed state -- the OSL progress -- from the closing
  1416. // client to the session cache so the client can resume its progress
  1417. // if it reconnects to this same server.
  1418. // Note: following setOSLConfig order of locking.
  1419. sshClient.Lock()
  1420. if sshClient.oslClientSeedState != nil {
  1421. sshClient.sshServer.oslSessionCacheMutex.Lock()
  1422. sshClient.oslClientSeedState.Hibernate()
  1423. sshClient.sshServer.oslSessionCache.Set(
  1424. sshClient.sessionID, sshClient.oslClientSeedState, cache.DefaultExpiration)
  1425. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  1426. sshClient.oslClientSeedState = nil
  1427. }
  1428. sshClient.Unlock()
  1429. // Initiate cleanup of the GeoIP session cache. To allow for post-tunnel
  1430. // final status requests, the lifetime of cached GeoIP records exceeds the
  1431. // lifetime of the sshClient.
  1432. sshClient.sshServer.support.GeoIPService.MarkSessionCacheToExpire(sshClient.sessionID)
  1433. }
  1434. func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  1435. expectedSessionIDLength := 2 * protocol.PSIPHON_API_CLIENT_SESSION_ID_LENGTH
  1436. expectedSSHPasswordLength := 2 * SSH_PASSWORD_BYTE_LENGTH
  1437. var sshPasswordPayload protocol.SSHPasswordPayload
  1438. err := json.Unmarshal(password, &sshPasswordPayload)
  1439. if err != nil {
  1440. // Backwards compatibility case: instead of a JSON payload, older clients
  1441. // send the hex encoded session ID prepended to the SSH password.
  1442. // Note: there's an even older case where clients don't send any session ID,
  1443. // but that's no longer supported.
  1444. if len(password) == expectedSessionIDLength+expectedSSHPasswordLength {
  1445. sshPasswordPayload.SessionId = string(password[0:expectedSessionIDLength])
  1446. sshPasswordPayload.SshPassword = string(password[expectedSessionIDLength:])
  1447. } else {
  1448. return nil, errors.Tracef("invalid password payload for %q", conn.User())
  1449. }
  1450. }
  1451. if !isHexDigits(sshClient.sshServer.support.Config, sshPasswordPayload.SessionId) ||
  1452. len(sshPasswordPayload.SessionId) != expectedSessionIDLength {
  1453. return nil, errors.Tracef("invalid session ID for %q", conn.User())
  1454. }
  1455. userOk := (subtle.ConstantTimeCompare(
  1456. []byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1)
  1457. passwordOk := (subtle.ConstantTimeCompare(
  1458. []byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1)
  1459. if !userOk || !passwordOk {
  1460. return nil, errors.Tracef("invalid password for %q", conn.User())
  1461. }
  1462. sessionID := sshPasswordPayload.SessionId
  1463. // The GeoIP session cache will be populated if there was a previous tunnel
  1464. // with this session ID. This will be true up to GEOIP_SESSION_CACHE_TTL, which
  1465. // is currently much longer than the OSL session cache, another option to use if
  1466. // the GeoIP session cache is retired (the GeoIP session cache currently only
  1467. // supports legacy use cases).
  1468. isFirstTunnelInSession := !sshClient.sshServer.support.GeoIPService.InSessionCache(sessionID)
  1469. supportsServerRequests := common.Contains(
  1470. sshPasswordPayload.ClientCapabilities, protocol.CLIENT_CAPABILITY_SERVER_REQUESTS)
  1471. sshClient.Lock()
  1472. // After this point, these values are read-only as they are read
  1473. // without obtaining sshClient.Lock.
  1474. sshClient.sessionID = sessionID
  1475. sshClient.isFirstTunnelInSession = isFirstTunnelInSession
  1476. sshClient.supportsServerRequests = supportsServerRequests
  1477. geoIPData := sshClient.geoIPData
  1478. sshClient.Unlock()
  1479. // Store the GeoIP data associated with the session ID. This makes
  1480. // the GeoIP data available to the web server for web API requests.
  1481. // A cache that's distinct from the sshClient record is used to allow
  1482. // for or post-tunnel final status requests.
  1483. // If the client is reconnecting with the same session ID, this call
  1484. // will undo the expiry set by MarkSessionCacheToExpire.
  1485. sshClient.sshServer.support.GeoIPService.SetSessionCache(sessionID, geoIPData)
  1486. return nil, nil
  1487. }
  1488. func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  1489. if err != nil {
  1490. if method == "none" && err.Error() == "ssh: no auth passed yet" {
  1491. // In this case, the callback invocation is noise from auth negotiation
  1492. return
  1493. }
  1494. // Note: here we previously logged messages for fail2ban to act on. This is no longer
  1495. // done as the complexity outweighs the benefits.
  1496. //
  1497. // - The SSH credential is not secret -- it's in the server entry. Attackers targeting
  1498. // the server likely already have the credential. On the other hand, random scanning and
  1499. // brute forcing is mitigated with high entropy random passwords, rate limiting
  1500. // (implemented on the host via iptables), and limited capabilities (the SSH session can
  1501. // only port forward).
  1502. //
  1503. // - fail2ban coverage was inconsistent; in the case of an unfronted meek protocol through
  1504. // an upstream proxy, the remote address is the upstream proxy, which should not be blocked.
  1505. // The X-Forwarded-For header cant be used instead as it may be forged and used to get IPs
  1506. // deliberately blocked; and in any case fail2ban adds iptables rules which can only block
  1507. // by direct remote IP, not by original client IP. Fronted meek has the same iptables issue.
  1508. //
  1509. // Random scanning and brute forcing of port 22 will result in log noise. To mitigate this,
  1510. // not every authentication failure is logged. A summary log is emitted periodically to
  1511. // retain some record of this activity in case this is relevant to, e.g., a performance
  1512. // investigation.
  1513. atomic.AddInt64(&sshClient.sshServer.authFailedCount, 1)
  1514. lastAuthLog := monotime.Time(atomic.LoadInt64(&sshClient.sshServer.lastAuthLog))
  1515. if monotime.Since(lastAuthLog) > SSH_AUTH_LOG_PERIOD {
  1516. now := int64(monotime.Now())
  1517. if atomic.CompareAndSwapInt64(&sshClient.sshServer.lastAuthLog, int64(lastAuthLog), now) {
  1518. count := atomic.SwapInt64(&sshClient.sshServer.authFailedCount, 0)
  1519. log.WithTraceFields(
  1520. LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
  1521. }
  1522. }
  1523. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication failed")
  1524. } else {
  1525. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication success")
  1526. }
  1527. }
  1528. // stop signals the ssh connection to shutdown. After sshConn.Wait returns,
  1529. // the SSH connection has terminated but sshClient.run may still be running and
  1530. // in the process of exiting.
  1531. //
  1532. // The shutdown process must complete rapidly and not, e.g., block on network
  1533. // I/O, as newly connecting clients need to await stop completion of any
  1534. // existing connection that shares the same session ID.
  1535. func (sshClient *sshClient) stop() {
  1536. sshClient.sshConn.Close()
  1537. sshClient.sshConn.Wait()
  1538. }
  1539. // awaitStopped will block until sshClient.run has exited, at which point all
  1540. // worker goroutines associated with the sshClient, including any in-flight
  1541. // API handlers, will have exited.
  1542. func (sshClient *sshClient) awaitStopped() {
  1543. <-sshClient.stopped
  1544. }
  1545. // runTunnel handles/dispatches new channels and new requests from the client.
  1546. // When the SSH client connection closes, both the channels and requests channels
  1547. // will close and runTunnel will exit.
  1548. func (sshClient *sshClient) runTunnel(
  1549. channels <-chan ssh.NewChannel,
  1550. requests <-chan *ssh.Request) {
  1551. waitGroup := new(sync.WaitGroup)
  1552. // Start client SSH API request handler
  1553. waitGroup.Add(1)
  1554. go func() {
  1555. defer waitGroup.Done()
  1556. sshClient.handleSSHRequests(requests)
  1557. }()
  1558. // Start request senders
  1559. if sshClient.supportsServerRequests {
  1560. waitGroup.Add(1)
  1561. go func() {
  1562. defer waitGroup.Done()
  1563. sshClient.runOSLSender()
  1564. }()
  1565. waitGroup.Add(1)
  1566. go func() {
  1567. defer waitGroup.Done()
  1568. sshClient.runAlertSender()
  1569. }()
  1570. }
  1571. // Start the TCP port forward manager
  1572. // The queue size is set to the traffic rules (MaxTCPPortForwardCount +
  1573. // MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
  1574. // limits per client; when that value is not set, a default is used.
  1575. // A limitation: this queue size is set once and doesn't change, for this client,
  1576. // when traffic rules are reloaded.
  1577. queueSize := sshClient.getTCPPortForwardQueueSize()
  1578. if queueSize == 0 {
  1579. queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
  1580. }
  1581. newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
  1582. waitGroup.Add(1)
  1583. go func() {
  1584. defer waitGroup.Done()
  1585. sshClient.handleTCPPortForwards(waitGroup, newTCPPortForwards)
  1586. }()
  1587. // Handle new channel (port forward) requests from the client.
  1588. for newChannel := range channels {
  1589. switch newChannel.ChannelType() {
  1590. case protocol.RANDOM_STREAM_CHANNEL_TYPE:
  1591. sshClient.handleNewRandomStreamChannel(waitGroup, newChannel)
  1592. case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
  1593. sshClient.handleNewPacketTunnelChannel(waitGroup, newChannel)
  1594. case protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  1595. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  1596. // "direct-tcpip", except split tunnel channel rejections are disallowed
  1597. // even if the client has enabled split tunnel. This channel type allows
  1598. // the client to ensure tunneling for certain cases while split tunnel is
  1599. // enabled.
  1600. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, false, newTCPPortForwards)
  1601. case "direct-tcpip":
  1602. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, true, newTCPPortForwards)
  1603. default:
  1604. sshClient.rejectNewChannel(newChannel,
  1605. fmt.Sprintf("unknown or unsupported channel type: %s", newChannel.ChannelType()))
  1606. }
  1607. }
  1608. // The channel loop is interrupted by a client
  1609. // disconnect or by calling sshClient.stop().
  1610. // Stop the TCP port forward manager
  1611. close(newTCPPortForwards)
  1612. // Stop all other worker goroutines
  1613. sshClient.stopRunning()
  1614. if sshClient.sshServer.support.Config.RunPacketTunnel {
  1615. // PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
  1616. sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
  1617. sshClient.sessionID)
  1618. }
  1619. waitGroup.Wait()
  1620. sshClient.cleanupAuthorizations()
  1621. }
  1622. func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {
  1623. for request := range requests {
  1624. // Requests are processed serially; API responses must be sent in request order.
  1625. var responsePayload []byte
  1626. var err error
  1627. if request.Type == "keepalive@openssh.com" {
  1628. // SSH keep alive round trips are used as speed test samples.
  1629. responsePayload, err = tactics.MakeSpeedTestResponse(
  1630. SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  1631. } else {
  1632. // All other requests are assumed to be API requests.
  1633. sshClient.Lock()
  1634. authorizedAccessTypes := sshClient.handshakeState.authorizedAccessTypes
  1635. sshClient.Unlock()
  1636. // Note: unlock before use is only safe as long as referenced sshClient data,
  1637. // such as slices in handshakeState, is read-only after initially set.
  1638. responsePayload, err = sshAPIRequestHandler(
  1639. sshClient.sshServer.support,
  1640. sshClient.geoIPData,
  1641. authorizedAccessTypes,
  1642. request.Type,
  1643. request.Payload)
  1644. }
  1645. if err == nil {
  1646. err = request.Reply(true, responsePayload)
  1647. } else {
  1648. log.WithTraceFields(LogFields{"error": err}).Warning("request failed")
  1649. err = request.Reply(false, nil)
  1650. }
  1651. if err != nil {
  1652. if !isExpectedTunnelIOError(err) {
  1653. log.WithTraceFields(LogFields{"error": err}).Warning("response failed")
  1654. }
  1655. }
  1656. }
  1657. }
  1658. type newTCPPortForward struct {
  1659. enqueueTime time.Time
  1660. hostToConnect string
  1661. portToConnect int
  1662. doSplitTunnel bool
  1663. newChannel ssh.NewChannel
  1664. }
  1665. func (sshClient *sshClient) handleTCPPortForwards(
  1666. waitGroup *sync.WaitGroup,
  1667. newTCPPortForwards chan *newTCPPortForward) {
  1668. // Lifecycle of a TCP port forward:
  1669. //
  1670. // 1. A "direct-tcpip" SSH request is received from the client.
  1671. //
  1672. // A new TCP port forward request is enqueued. The queue delivers TCP port
  1673. // forward requests to the TCP port forward manager, which enforces the TCP
  1674. // port forward dial limit.
  1675. //
  1676. // Enqueuing new requests allows for reading further SSH requests from the
  1677. // client without blocking when the dial limit is hit; this is to permit new
  1678. // UDP/udpgw port forwards to be restablished without delay. The maximum size
  1679. // of the queue enforces a hard cap on resources consumed by a client in the
  1680. // pre-dial phase. When the queue is full, new TCP port forwards are
  1681. // immediately rejected.
  1682. //
  1683. // 2. The TCP port forward manager dequeues the request.
  1684. //
  1685. // The manager calls dialingTCPPortForward(), which increments
  1686. // concurrentDialingPortForwardCount, and calls
  1687. // isTCPDialingPortForwardLimitExceeded() to check the concurrent dialing
  1688. // count.
  1689. //
  1690. // The manager enforces the concurrent TCP dial limit: when at the limit, the
  1691. // manager blocks waiting for the number of dials to drop below the limit before
  1692. // dispatching the request to handleTCPPortForward(), which will run in its own
  1693. // goroutine and will dial and relay the port forward.
  1694. //
  1695. // The block delays the current request and also halts dequeuing of subsequent
  1696. // requests and could ultimately cause requests to be immediately rejected if
  1697. // the queue fills. These actions are intended to apply back pressure when
  1698. // upstream network resources are impaired.
  1699. //
  1700. // The time spent in the queue is deducted from the port forward's dial timeout.
  1701. // The time spent blocking while at the dial limit is similarly deducted from
  1702. // the dial timeout. If the dial timeout has expired before the dial begins, the
  1703. // port forward is rejected and a stat is recorded.
  1704. //
  1705. // 3. handleTCPPortForward() performs the port forward dial and relaying.
  1706. //
  1707. // a. Dial the target, using the dial timeout remaining after queue and blocking
  1708. // time is deducted.
  1709. //
  1710. // b. If the dial fails, call abortedTCPPortForward() to decrement
  1711. // concurrentDialingPortForwardCount, freeing up a dial slot.
  1712. //
  1713. // c. If the dial succeeds, call establishedPortForward(), which decrements
  1714. // concurrentDialingPortForwardCount and increments concurrentPortForwardCount,
  1715. // the "established" port forward count.
  1716. //
  1717. // d. Check isPortForwardLimitExceeded(), which enforces the configurable limit on
  1718. // concurrentPortForwardCount, the number of _established_ TCP port forwards.
  1719. // If the limit is exceeded, the LRU established TCP port forward is closed and
  1720. // the newly established TCP port forward proceeds. This LRU logic allows some
  1721. // dangling resource consumption (e.g., TIME_WAIT) while providing a better
  1722. // experience for clients.
  1723. //
  1724. // e. Relay data.
  1725. //
  1726. // f. Call closedPortForward() which decrements concurrentPortForwardCount and
  1727. // records bytes transferred.
  1728. for newPortForward := range newTCPPortForwards {
  1729. remainingDialTimeout :=
  1730. time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
  1731. time.Since(newPortForward.enqueueTime)
  1732. if remainingDialTimeout <= 0 {
  1733. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  1734. sshClient.rejectNewChannel(
  1735. newPortForward.newChannel, "TCP port forward timed out in queue")
  1736. continue
  1737. }
  1738. // Reserve a TCP dialing slot.
  1739. //
  1740. // TOCTOU note: important to increment counts _before_ checking limits; otherwise,
  1741. // the client could potentially consume excess resources by initiating many port
  1742. // forwards concurrently.
  1743. sshClient.dialingTCPPortForward()
  1744. // When max dials are in progress, wait up to remainingDialTimeout for dialing
  1745. // to become available. This blocks all dequeing.
  1746. if sshClient.isTCPDialingPortForwardLimitExceeded() {
  1747. blockStartTime := time.Now()
  1748. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  1749. sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
  1750. <-ctx.Done()
  1751. sshClient.setTCPPortForwardDialingAvailableSignal(nil)
  1752. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  1753. remainingDialTimeout -= time.Since(blockStartTime)
  1754. }
  1755. if remainingDialTimeout <= 0 {
  1756. // Release the dialing slot here since handleTCPChannel() won't be called.
  1757. sshClient.abortedTCPPortForward()
  1758. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  1759. sshClient.rejectNewChannel(
  1760. newPortForward.newChannel, "TCP port forward timed out before dialing")
  1761. continue
  1762. }
  1763. // Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
  1764. // handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
  1765. // will deal with remainingDialTimeout <= 0.
  1766. waitGroup.Add(1)
  1767. go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
  1768. defer waitGroup.Done()
  1769. sshClient.handleTCPChannel(
  1770. remainingDialTimeout,
  1771. newPortForward.hostToConnect,
  1772. newPortForward.portToConnect,
  1773. newPortForward.doSplitTunnel,
  1774. newPortForward.newChannel)
  1775. }(remainingDialTimeout, newPortForward)
  1776. }
  1777. }
  1778. func (sshClient *sshClient) handleNewRandomStreamChannel(
  1779. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  1780. // A random stream channel returns the requested number of bytes -- random
  1781. // bytes -- to the client while also consuming and discarding bytes sent
  1782. // by the client.
  1783. //
  1784. // One use case for the random stream channel is a liveness test that the
  1785. // client performs to confirm that the tunnel is live. As the liveness
  1786. // test is performed in the concurrent establishment phase, before
  1787. // selecting a single candidate for handshake, the random stream channel
  1788. // is available pre-handshake, albeit with additional restrictions.
  1789. //
  1790. // The random stream is subject to throttling in traffic rules; for
  1791. // unthrottled liveness tests, set initial Read/WriteUnthrottledBytes as
  1792. // required. The random stream maximum count and response size cap
  1793. // mitigate clients abusing the facility to waste server resources.
  1794. //
  1795. // Like all other channels, this channel type is handled asynchronously,
  1796. // so it's possible to run at any point in the tunnel lifecycle.
  1797. //
  1798. // Up/downstream byte counts don't include SSH packet and request
  1799. // marshalling overhead.
  1800. var request protocol.RandomStreamRequest
  1801. err := json.Unmarshal(newChannel.ExtraData(), &request)
  1802. if err != nil {
  1803. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
  1804. return
  1805. }
  1806. if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
  1807. sshClient.rejectNewChannel(newChannel,
  1808. fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
  1809. return
  1810. }
  1811. if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
  1812. sshClient.rejectNewChannel(newChannel,
  1813. fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
  1814. return
  1815. }
  1816. var metrics *randomStreamMetrics
  1817. sshClient.Lock()
  1818. if !sshClient.handshakeState.completed {
  1819. metrics = &sshClient.preHandshakeRandomStreamMetrics
  1820. } else {
  1821. metrics = &sshClient.postHandshakeRandomStreamMetrics
  1822. }
  1823. countOk := true
  1824. if !sshClient.handshakeState.completed &&
  1825. metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
  1826. countOk = false
  1827. } else {
  1828. metrics.count++
  1829. }
  1830. sshClient.Unlock()
  1831. if !countOk {
  1832. sshClient.rejectNewChannel(newChannel, "max count exceeded")
  1833. return
  1834. }
  1835. channel, requests, err := newChannel.Accept()
  1836. if err != nil {
  1837. if !isExpectedTunnelIOError(err) {
  1838. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  1839. }
  1840. return
  1841. }
  1842. go ssh.DiscardRequests(requests)
  1843. waitGroup.Add(1)
  1844. go func() {
  1845. defer waitGroup.Done()
  1846. upstream := new(sync.WaitGroup)
  1847. received := 0
  1848. sent := 0
  1849. if request.UpstreamBytes > 0 {
  1850. // Process streams concurrently to minimize elapsed time. This also
  1851. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  1852. upstream.Add(1)
  1853. go func() {
  1854. defer upstream.Done()
  1855. n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
  1856. received = int(n)
  1857. if err != nil {
  1858. if !isExpectedTunnelIOError(err) {
  1859. log.WithTraceFields(LogFields{"error": err}).Warning("receive failed")
  1860. }
  1861. }
  1862. }()
  1863. }
  1864. if request.DownstreamBytes > 0 {
  1865. n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
  1866. sent = int(n)
  1867. if err != nil {
  1868. if !isExpectedTunnelIOError(err) {
  1869. log.WithTraceFields(LogFields{"error": err}).Warning("send failed")
  1870. }
  1871. }
  1872. }
  1873. upstream.Wait()
  1874. sshClient.Lock()
  1875. metrics.upstreamBytes += request.UpstreamBytes
  1876. metrics.receivedUpstreamBytes += received
  1877. metrics.downstreamBytes += request.DownstreamBytes
  1878. metrics.sentDownstreamBytes += sent
  1879. sshClient.Unlock()
  1880. channel.Close()
  1881. }()
  1882. }
  1883. func (sshClient *sshClient) handleNewPacketTunnelChannel(
  1884. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  1885. // packet tunnel channels are handled by the packet tunnel server
  1886. // component. Each client may have at most one packet tunnel channel.
  1887. if !sshClient.sshServer.support.Config.RunPacketTunnel {
  1888. sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
  1889. return
  1890. }
  1891. // Accept this channel immediately. This channel will replace any
  1892. // previously existing packet tunnel channel for this client.
  1893. packetTunnelChannel, requests, err := newChannel.Accept()
  1894. if err != nil {
  1895. if !isExpectedTunnelIOError(err) {
  1896. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  1897. }
  1898. return
  1899. }
  1900. go ssh.DiscardRequests(requests)
  1901. sshClient.setPacketTunnelChannel(packetTunnelChannel)
  1902. // PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
  1903. // will stop packet tunnel workers for any previous packet tunnel channel.
  1904. checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  1905. return sshClient.isPortForwardPermitted(portForwardTypeTCP, upstreamIPAddress, port)
  1906. }
  1907. checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  1908. return sshClient.isPortForwardPermitted(portForwardTypeUDP, upstreamIPAddress, port)
  1909. }
  1910. checkAllowedDomainFunc := func(domain string) bool {
  1911. ok, _ := sshClient.isDomainPermitted(domain)
  1912. return ok
  1913. }
  1914. flowActivityUpdaterMaker := func(
  1915. upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
  1916. var updaters []tun.FlowActivityUpdater
  1917. oslUpdater := sshClient.newClientSeedPortForward(upstreamIPAddress)
  1918. if oslUpdater != nil {
  1919. updaters = append(updaters, oslUpdater)
  1920. }
  1921. return updaters
  1922. }
  1923. metricUpdater := func(
  1924. TCPApplicationBytesDown, TCPApplicationBytesUp,
  1925. UDPApplicationBytesDown, UDPApplicationBytesUp int64) {
  1926. sshClient.Lock()
  1927. sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
  1928. sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
  1929. sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
  1930. sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
  1931. sshClient.Unlock()
  1932. }
  1933. err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
  1934. sshClient.sessionID,
  1935. packetTunnelChannel,
  1936. checkAllowedTCPPortFunc,
  1937. checkAllowedUDPPortFunc,
  1938. checkAllowedDomainFunc,
  1939. flowActivityUpdaterMaker,
  1940. metricUpdater)
  1941. if err != nil {
  1942. log.WithTraceFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
  1943. sshClient.setPacketTunnelChannel(nil)
  1944. }
  1945. }
  1946. func (sshClient *sshClient) handleNewTCPPortForwardChannel(
  1947. waitGroup *sync.WaitGroup,
  1948. newChannel ssh.NewChannel,
  1949. allowSplitTunnel bool,
  1950. newTCPPortForwards chan *newTCPPortForward) {
  1951. // udpgw client connections are dispatched immediately (clients use this for
  1952. // DNS, so it's essential to not block; and only one udpgw connection is
  1953. // retained at a time).
  1954. //
  1955. // All other TCP port forwards are dispatched via the TCP port forward
  1956. // manager queue.
  1957. // http://tools.ietf.org/html/rfc4254#section-7.2
  1958. var directTcpipExtraData struct {
  1959. HostToConnect string
  1960. PortToConnect uint32
  1961. OriginatorIPAddress string
  1962. OriginatorPort uint32
  1963. }
  1964. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  1965. if err != nil {
  1966. sshClient.rejectNewChannel(newChannel, "invalid extra data")
  1967. return
  1968. }
  1969. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  1970. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  1971. isUDPChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
  1972. sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
  1973. net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
  1974. if isUDPChannel {
  1975. // Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
  1976. // own worker goroutine.
  1977. waitGroup.Add(1)
  1978. go func(channel ssh.NewChannel) {
  1979. defer waitGroup.Done()
  1980. sshClient.handleUDPChannel(channel)
  1981. }(newChannel)
  1982. } else {
  1983. // Dispatch via TCP port forward manager. When the queue is full, the channel
  1984. // is immediately rejected.
  1985. //
  1986. // Split tunnel logic is enabled for this TCP port forward when the client
  1987. // has enabled split tunnel mode and the channel type allows it.
  1988. tcpPortForward := &newTCPPortForward{
  1989. enqueueTime: time.Now(),
  1990. hostToConnect: directTcpipExtraData.HostToConnect,
  1991. portToConnect: int(directTcpipExtraData.PortToConnect),
  1992. doSplitTunnel: sshClient.handshakeState.splitTunnel && allowSplitTunnel,
  1993. newChannel: newChannel,
  1994. }
  1995. select {
  1996. case newTCPPortForwards <- tcpPortForward:
  1997. default:
  1998. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  1999. sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
  2000. }
  2001. }
  2002. }
  2003. func (sshClient *sshClient) cleanupAuthorizations() {
  2004. sshClient.Lock()
  2005. if sshClient.releaseAuthorizations != nil {
  2006. sshClient.releaseAuthorizations()
  2007. }
  2008. if sshClient.stopTimer != nil {
  2009. sshClient.stopTimer.Stop()
  2010. }
  2011. sshClient.Unlock()
  2012. }
  2013. // setPacketTunnelChannel sets the single packet tunnel channel
  2014. // for this sshClient. Any existing packet tunnel channel is
  2015. // closed.
  2016. func (sshClient *sshClient) setPacketTunnelChannel(channel ssh.Channel) {
  2017. sshClient.Lock()
  2018. if sshClient.packetTunnelChannel != nil {
  2019. sshClient.packetTunnelChannel.Close()
  2020. }
  2021. sshClient.packetTunnelChannel = channel
  2022. sshClient.Unlock()
  2023. }
  2024. // setUDPChannel sets the single UDP channel for this sshClient.
  2025. // Each sshClient may have only one concurrent UDP channel. Each
  2026. // UDP channel multiplexes many UDP port forwards via the udpgw
  2027. // protocol. Any existing UDP channel is closed.
  2028. func (sshClient *sshClient) setUDPChannel(channel ssh.Channel) {
  2029. sshClient.Lock()
  2030. if sshClient.udpChannel != nil {
  2031. sshClient.udpChannel.Close()
  2032. }
  2033. sshClient.udpChannel = channel
  2034. sshClient.Unlock()
  2035. }
// serverTunnelStatParams is the parameter spec list that logTunnel passes to
// getRequestLogFields when building the "server_tunnel" log event. It
// consists of the optional "last_connected" and "establishment_duration"
// parameters followed by every entry of baseSessionAndDialParams.
var serverTunnelStatParams = append(
	[]requestParamSpec{
		{"last_connected", isLastConnected, requestParamOptional},
		{"establishment_duration", isIntString, requestParamOptional}},
	baseSessionAndDialParams...)
  2041. func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
  2042. sshClient.Lock()
  2043. logFields := getRequestLogFields(
  2044. "server_tunnel",
  2045. sshClient.geoIPData,
  2046. sshClient.handshakeState.authorizedAccessTypes,
  2047. sshClient.handshakeState.apiParams,
  2048. serverTunnelStatParams)
  2049. // "relay_protocol" is sent with handshake API parameters. In pre-
  2050. // handshake logTunnel cases, this value is not yet known. As
  2051. // sshClient.tunnelProtocol is authoritative, set this value
  2052. // unconditionally, overwriting any value from handshake.
  2053. logFields["relay_protocol"] = sshClient.tunnelProtocol
  2054. if sshClient.serverPacketManipulation != "" {
  2055. logFields["server_packet_manipulation"] = sshClient.serverPacketManipulation
  2056. }
  2057. if sshClient.sshListener.BPFProgramName != "" {
  2058. logFields["server_bpf"] = sshClient.sshListener.BPFProgramName
  2059. }
  2060. logFields["session_id"] = sshClient.sessionID
  2061. logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession
  2062. logFields["handshake_completed"] = sshClient.handshakeState.completed
  2063. logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp
  2064. logFields["bytes_down_tcp"] = sshClient.tcpTrafficState.bytesDown
  2065. logFields["peak_concurrent_dialing_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentDialingPortForwardCount
  2066. logFields["peak_concurrent_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentPortForwardCount
  2067. logFields["total_port_forward_count_tcp"] = sshClient.tcpTrafficState.totalPortForwardCount
  2068. logFields["bytes_up_udp"] = sshClient.udpTrafficState.bytesUp
  2069. logFields["bytes_down_udp"] = sshClient.udpTrafficState.bytesDown
  2070. // sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful
  2071. logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
  2072. logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
  2073. logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count
  2074. logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes
  2075. logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2076. logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes
  2077. logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes
  2078. logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count
  2079. logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes
  2080. logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2081. logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
  2082. logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
  2083. // Pre-calculate a total-tunneled-bytes field. This total is used
  2084. // extensively in analytics and is more performant when pre-calculated.
  2085. logFields["bytes"] = sshClient.tcpTrafficState.bytesUp +
  2086. sshClient.tcpTrafficState.bytesDown +
  2087. sshClient.udpTrafficState.bytesUp +
  2088. sshClient.udpTrafficState.bytesDown
  2089. // Merge in additional metrics from the optional metrics source
  2090. for _, metrics := range additionalMetrics {
  2091. for name, value := range metrics {
  2092. // Don't overwrite any basic fields
  2093. if logFields[name] == nil {
  2094. logFields[name] = value
  2095. }
  2096. }
  2097. }
  2098. sshClient.Unlock()
  2099. // Note: unlock before use is only safe as long as referenced sshClient data,
  2100. // such as slices in handshakeState, is read-only after initially set.
  2101. log.LogRawFieldsWithTimestamp(logFields)
  2102. }
// blocklistHitsStatParams is the parameter spec list that logBlocklistHits
// passes to getRequestLogFields when building "server_blocklist_hit" log
// events. Entries appear to be (parameter name, validator, flags); flags of 0
// presumably mark required parameters, while requestParamOptional and
// requestParamArray mark optional and array-valued ones. TODO: confirm
// against the requestParamSpec definition.
var blocklistHitsStatParams = []requestParamSpec{
	{"propagation_channel_id", isHexDigits, 0},
	{"sponsor_id", isHexDigits, 0},
	{"client_version", isIntString, requestParamLogStringAsInt},
	{"client_platform", isClientPlatform, 0},
	{"client_features", isAnyString, requestParamOptional | requestParamArray},
	{"client_build_rev", isHexDigits, requestParamOptional},
	{"device_region", isAnyString, requestParamOptional},
	{"egress_region", isRegionCode, requestParamOptional},
	{"session_id", isHexDigits, 0},
	{"last_connected", isLastConnected, requestParamOptional},
}
  2115. func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []BlocklistTag) {
  2116. sshClient.Lock()
  2117. logFields := getRequestLogFields(
  2118. "server_blocklist_hit",
  2119. sshClient.geoIPData,
  2120. sshClient.handshakeState.authorizedAccessTypes,
  2121. sshClient.handshakeState.apiParams,
  2122. blocklistHitsStatParams)
  2123. logFields["session_id"] = sshClient.sessionID
  2124. // Note: see comment in logTunnel regarding unlock and concurrent access.
  2125. sshClient.Unlock()
  2126. for _, tag := range tags {
  2127. if IP != nil {
  2128. logFields["blocklist_ip_address"] = IP.String()
  2129. }
  2130. if domain != "" {
  2131. logFields["blocklist_domain"] = domain
  2132. }
  2133. logFields["blocklist_source"] = tag.Source
  2134. logFields["blocklist_subject"] = tag.Subject
  2135. log.LogRawFieldsWithTimestamp(logFields)
  2136. }
  2137. }
  2138. func (sshClient *sshClient) runOSLSender() {
  2139. for {
  2140. // Await a signal that there are SLOKs to send
  2141. // TODO: use reflect.SelectCase, and optionally await timer here?
  2142. select {
  2143. case <-sshClient.signalIssueSLOKs:
  2144. case <-sshClient.runCtx.Done():
  2145. return
  2146. }
  2147. retryDelay := SSH_SEND_OSL_INITIAL_RETRY_DELAY
  2148. for {
  2149. err := sshClient.sendOSLRequest()
  2150. if err == nil {
  2151. break
  2152. }
  2153. if !isExpectedTunnelIOError(err) {
  2154. log.WithTraceFields(LogFields{"error": err}).Warning("sendOSLRequest failed")
  2155. }
  2156. // If the request failed, retry after a delay (with exponential backoff)
  2157. // or when signaled that there are additional SLOKs to send
  2158. retryTimer := time.NewTimer(retryDelay)
  2159. select {
  2160. case <-retryTimer.C:
  2161. case <-sshClient.signalIssueSLOKs:
  2162. case <-sshClient.runCtx.Done():
  2163. retryTimer.Stop()
  2164. return
  2165. }
  2166. retryTimer.Stop()
  2167. retryDelay *= SSH_SEND_OSL_RETRY_FACTOR
  2168. }
  2169. }
  2170. }
  2171. // sendOSLRequest will invoke osl.GetSeedPayload to issue SLOKs and
  2172. // generate a payload, and send an OSL request to the client when
  2173. // there are new SLOKs in the payload.
  2174. func (sshClient *sshClient) sendOSLRequest() error {
  2175. seedPayload := sshClient.getOSLSeedPayload()
  2176. // Don't send when no SLOKs. This will happen when signalIssueSLOKs
  2177. // is received but no new SLOKs are issued.
  2178. if len(seedPayload.SLOKs) == 0 {
  2179. return nil
  2180. }
  2181. oslRequest := protocol.OSLRequest{
  2182. SeedPayload: seedPayload,
  2183. }
  2184. requestPayload, err := json.Marshal(oslRequest)
  2185. if err != nil {
  2186. return errors.Trace(err)
  2187. }
  2188. ok, _, err := sshClient.sshConn.SendRequest(
  2189. protocol.PSIPHON_API_OSL_REQUEST_NAME,
  2190. true,
  2191. requestPayload)
  2192. if err != nil {
  2193. return errors.Trace(err)
  2194. }
  2195. if !ok {
  2196. return errors.TraceNew("client rejected request")
  2197. }
  2198. sshClient.clearOSLSeedPayload()
  2199. return nil
  2200. }
  2201. // runAlertSender dequeues and sends alert requests to the client. As these
  2202. // alerts are informational, there is no retry logic and no SSH client
  2203. // acknowledgement (wantReply) is requested. This worker scheme allows
  2204. // nonconcurrent components including udpgw and packet tunnel to enqueue
  2205. // alerts without blocking their traffic processing.
  2206. func (sshClient *sshClient) runAlertSender() {
  2207. for {
  2208. select {
  2209. case <-sshClient.runCtx.Done():
  2210. return
  2211. case request := <-sshClient.sendAlertRequests:
  2212. payload, err := json.Marshal(request)
  2213. if err != nil {
  2214. log.WithTraceFields(LogFields{"error": err}).Warning("Marshal failed")
  2215. break
  2216. }
  2217. _, _, err = sshClient.sshConn.SendRequest(
  2218. protocol.PSIPHON_API_ALERT_REQUEST_NAME,
  2219. false,
  2220. payload)
  2221. if err != nil && !isExpectedTunnelIOError(err) {
  2222. log.WithTraceFields(LogFields{"error": err}).Warning("SendRequest failed")
  2223. break
  2224. }
  2225. sshClient.Lock()
  2226. sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] = true
  2227. sshClient.Unlock()
  2228. }
  2229. }
  2230. }
  2231. // enqueueAlertRequest enqueues an alert request to be sent to the client.
  2232. // Only one request is sent per tunnel per protocol.AlertRequest value;
  2233. // subsequent alerts with the same value are dropped. enqueueAlertRequest will
  2234. // not block until the queue exceeds ALERT_REQUEST_QUEUE_BUFFER_SIZE.
  2235. func (sshClient *sshClient) enqueueAlertRequest(request protocol.AlertRequest) {
  2236. sshClient.Lock()
  2237. if sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] {
  2238. sshClient.Unlock()
  2239. return
  2240. }
  2241. sshClient.Unlock()
  2242. select {
  2243. case <-sshClient.runCtx.Done():
  2244. case sshClient.sendAlertRequests <- request:
  2245. }
  2246. }
  2247. func (sshClient *sshClient) enqueueDisallowedTrafficAlertRequest() {
  2248. reason := protocol.PSIPHON_API_ALERT_DISALLOWED_TRAFFIC
  2249. actionURLs := sshClient.getAlertActionURLs(reason)
  2250. sshClient.enqueueAlertRequest(
  2251. protocol.AlertRequest{
  2252. Reason: protocol.PSIPHON_API_ALERT_DISALLOWED_TRAFFIC,
  2253. ActionURLs: actionURLs,
  2254. })
  2255. }
  2256. func (sshClient *sshClient) enqueueUnsafeTrafficAlertRequest(tags []BlocklistTag) {
  2257. reason := protocol.PSIPHON_API_ALERT_UNSAFE_TRAFFIC
  2258. actionURLs := sshClient.getAlertActionURLs(reason)
  2259. for _, tag := range tags {
  2260. sshClient.enqueueAlertRequest(
  2261. protocol.AlertRequest{
  2262. Reason: reason,
  2263. Subject: tag.Subject,
  2264. ActionURLs: actionURLs,
  2265. })
  2266. }
  2267. }
  2268. func (sshClient *sshClient) getAlertActionURLs(alertReason string) []string {
  2269. sshClient.Lock()
  2270. sponsorID, _ := getStringRequestParam(
  2271. sshClient.handshakeState.apiParams, "sponsor_id")
  2272. sshClient.Unlock()
  2273. return sshClient.sshServer.support.PsinetDatabase.GetAlertActionURLs(
  2274. alertReason,
  2275. sponsorID,
  2276. sshClient.geoIPData.Country,
  2277. sshClient.geoIPData.ASN)
  2278. }
  2279. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessage string) {
  2280. // We always return the reject reason "Prohibited":
  2281. // - Traffic rules and connection limits may prohibit the connection.
  2282. // - External firewall rules may prohibit the connection, and this is not currently
  2283. // distinguishable from other failure modes.
  2284. // - We limit the failure information revealed to the client.
  2285. reason := ssh.Prohibited
  2286. // Note: Debug level, as logMessage may contain user traffic destination address information
  2287. log.WithTraceFields(
  2288. LogFields{
  2289. "channelType": newChannel.ChannelType(),
  2290. "logMessage": logMessage,
  2291. "rejectReason": reason.String(),
  2292. }).Debug("reject new channel")
  2293. // Note: logMessage is internal, for logging only; just the reject reason is sent to the client.
  2294. newChannel.Reject(reason, reason.String())
  2295. }
// setHandshakeState records that a client has completed a handshake API request.
// Some parameters from the handshake request may be used in future traffic rule
// selection. Port forwards are disallowed until a handshake is complete. The
// handshake parameters are included in the session summary log recorded in
// sshClient.stop().
//
// state is stored only on the first call. A later call returns an error and
// leaves the existing handshake state unchanged. After storing state, the
// function:
//   - verifies the submitted authorizations, examining at most
//     MAX_AUTHORIZATIONS of them;
//   - associates the verified authorization IDs with this session;
//   - applies traffic rules and OSL config via setTrafficRules and
//     setOSLConfig.
//
// It returns the verified authorization IDs, the authorized access types,
// and the upstream/downstream bytes-per-second values returned by
// setTrafficRules.
//
// NOTE(review): state is stored before authorizations are verified, so
// handshakeState.activeAuthorizationIDs/authorizedAccessTypes are populated
// slightly later than the rest of the handshake state.
func (sshClient *sshClient) setHandshakeState(
	state handshakeState,
	authorizations []string) (*handshakeStateInfo, error) {

	// Check and set under one lock acquisition so concurrent callers cannot
	// both store state.
	sshClient.Lock()
	completed := sshClient.handshakeState.completed
	if !completed {
		sshClient.handshakeState = state
	}
	sshClient.Unlock()

	// Client must only perform one handshake
	if completed {
		return nil, errors.TraceNew("handshake already completed")
	}

	// Verify the authorizations submitted by the client. Verified, active
	// (non-expired) access types will be available for traffic rules
	// filtering.
	//
	// When an authorization is active but expires while the client is
	// connected, the client is disconnected to ensure the access is reset.
	// This is implemented by setting a timer to perform the disconnect at the
	// expiry time of the soonest expiring authorization.
	//
	// sshServer.authorizationSessionIDs tracks the unique mapping of active
	// authorization IDs to client session IDs and is used to detect and
	// prevent multiple malicious clients from reusing a single authorization
	// (within the scope of this server).

	// authorizationIDs and authorizedAccessTypes are returned to the client
	// and logged, respectively; initialize to empty lists so the
	// protocol/logs don't need to handle 'null' values.
	authorizationIDs := make([]string, 0)
	authorizedAccessTypes := make([]string, 0)
	var stopTime time.Time

	for i, authorization := range authorizations {

		// This sanity check mitigates malicious clients causing excess CPU use.
		if i >= MAX_AUTHORIZATIONS {
			log.WithTrace().Warning("too many authorizations")
			break
		}

		verifiedAuthorization, err := accesscontrol.VerifyAuthorization(
			&sshClient.sshServer.support.Config.AccessControlVerificationKeyRing,
			authorization)

		if err != nil {
			log.WithTraceFields(
				LogFields{"error": err}).Warning("verify authorization failed")
			continue
		}

		authorizationID := base64.StdEncoding.EncodeToString(verifiedAuthorization.ID)

		// Only the first verified authorization for each access type is
		// kept; later ones with the same access type are skipped.
		if common.Contains(authorizedAccessTypes, verifiedAuthorization.AccessType) {
			log.WithTraceFields(
				LogFields{"accessType": verifiedAuthorization.AccessType}).Warning("duplicate authorization access type")
			continue
		}

		authorizationIDs = append(authorizationIDs, authorizationID)
		authorizedAccessTypes = append(authorizedAccessTypes, verifiedAuthorization.AccessType)

		// Track the soonest expiry among the accepted authorizations; the
		// stopTimer below disconnects the client at that time.
		if stopTime.IsZero() || stopTime.After(verifiedAuthorization.Expires) {
			stopTime = verifiedAuthorization.Expires
		}
	}

	// Associate all verified authorizationIDs with this client's session ID.
	// Handle cases where previous associations exist:
	//
	// - Multiple malicious clients reusing a single authorization. In this
	//   case, authorizations are revoked from the previous client.
	//
	// - The client reconnected with a new session ID due to user toggling.
	//   This case is expected due to server affinity. This cannot be
	//   distinguished from the previous case and the same action is taken;
	//   this will have no impact on a legitimate client as the previous
	//   session is dangling.
	//
	// - The client automatically reconnected with the same session ID. This
	//   case is not expected as sshServer.registerEstablishedClient
	//   synchronously calls sshClient.releaseAuthorizations; as a safe guard,
	//   this case is distinguished and no revocation action is taken.

	sshClient.sshServer.authorizationSessionIDsMutex.Lock()
	for _, authorizationID := range authorizationIDs {
		sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
		if ok && sessionID != sshClient.sessionID {

			// Log an "irregular_tunnel" event describing both this client and,
			// when its cached GeoIP data differs, the other session holding the
			// same authorization.
			logFields := LogFields{
				"event_name":                 "irregular_tunnel",
				"tunnel_error":               "duplicate active authorization",
				"duplicate_authorization_id": authorizationID,
			}
			sshClient.geoIPData.SetLogFields(logFields)
			duplicateGeoIPData := sshClient.sshServer.support.GeoIPService.GetSessionCache(sessionID)
			if duplicateGeoIPData != sshClient.geoIPData {
				duplicateGeoIPData.SetLogFieldsWithPrefix("duplicate_authorization_", logFields)
			}
			log.LogRawFieldsWithTimestamp(logFields)

			// Invoke asynchronously to avoid deadlocks.
			// (Presumably because revocation needs authorizationSessionIDsMutex,
			// which is held here — confirm against revokeClientAuthorizations.)
			// TODO: invoke only once for each distinct sessionID?
			go sshClient.sshServer.revokeClientAuthorizations(sessionID)
		}

		sshClient.sshServer.authorizationSessionIDs[authorizationID] = sshClient.sessionID
	}
	sshClient.sshServer.authorizationSessionIDsMutex.Unlock()

	if len(authorizationIDs) > 0 {

		sshClient.Lock()

		// Make the authorizedAccessTypes available for traffic rules filtering.

		sshClient.handshakeState.activeAuthorizationIDs = authorizationIDs
		sshClient.handshakeState.authorizedAccessTypes = authorizedAccessTypes

		// On exit, sshClient.runTunnel will call releaseAuthorizations, which
		// will release the authorization IDs so the client can reconnect and
		// present the same authorizations again. sshClient.runTunnel will
		// also cancel the stopTimer in case it has not yet fired.
		// Note: termination of the stopTimer goroutine is not synchronized.

		// Only mappings still pointing at this session are removed, so a newer
		// session that took over an authorization ID keeps its association.
		sshClient.releaseAuthorizations = func() {
			sshClient.sshServer.authorizationSessionIDsMutex.Lock()
			for _, authorizationID := range authorizationIDs {
				sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
				if ok && sessionID == sshClient.sessionID {
					delete(sshClient.sshServer.authorizationSessionIDs, authorizationID)
				}
			}
			sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
		}

		// Disconnect the client when the soonest-expiring authorization expires.
		sshClient.stopTimer = time.AfterFunc(
			time.Until(stopTime),
			func() {
				sshClient.stop()
			})

		sshClient.Unlock()
	}

	upstreamBytesPerSecond, downstreamBytesPerSecond := sshClient.setTrafficRules()

	sshClient.setOSLConfig()

	return &handshakeStateInfo{
		activeAuthorizationIDs:   authorizationIDs,
		authorizedAccessTypes:    authorizedAccessTypes,
		upstreamBytesPerSecond:   upstreamBytesPerSecond,
		downstreamBytesPerSecond: downstreamBytesPerSecond,
	}, nil
}
  2433. // getHandshaked returns whether the client has completed a handshake API
  2434. // request and whether the traffic rules that were selected after the
  2435. // handshake immediately exhaust the client.
  2436. //
  2437. // When the client is immediately exhausted it will be closed; but this
  2438. // takes effect asynchronously. The "exhausted" return value is used to
  2439. // prevent API requests by clients that will close.
  2440. func (sshClient *sshClient) getHandshaked() (bool, bool) {
  2441. sshClient.Lock()
  2442. defer sshClient.Unlock()
  2443. completed := sshClient.handshakeState.completed
  2444. exhausted := false
  2445. // Notes:
  2446. // - "Immediately exhausted" is when CloseAfterExhausted is set and
  2447. // either ReadUnthrottledBytes or WriteUnthrottledBytes starts from
  2448. // 0, so no bytes would be read or written. This check does not
  2449. // examine whether 0 bytes _remain_ in the ThrottledConn.
  2450. // - This check is made against the current traffic rules, which
  2451. // could have changed in a hot reload since the handshake.
  2452. if completed &&
  2453. *sshClient.trafficRules.RateLimits.CloseAfterExhausted &&
  2454. (*sshClient.trafficRules.RateLimits.ReadUnthrottledBytes == 0 ||
  2455. *sshClient.trafficRules.RateLimits.WriteUnthrottledBytes == 0) {
  2456. exhausted = true
  2457. }
  2458. return completed, exhausted
  2459. }
  2460. func (sshClient *sshClient) updateAPIParameters(
  2461. apiParams common.APIParameters) {
  2462. sshClient.Lock()
  2463. defer sshClient.Unlock()
  2464. // Only update after handshake has initialized API params.
  2465. if !sshClient.handshakeState.completed {
  2466. return
  2467. }
  2468. for name, value := range apiParams {
  2469. sshClient.handshakeState.apiParams[name] = value
  2470. }
  2471. }
  2472. func (sshClient *sshClient) expectDomainBytes() bool {
  2473. sshClient.Lock()
  2474. defer sshClient.Unlock()
  2475. return sshClient.handshakeState.expectDomainBytes
  2476. }
  2477. // setTrafficRules resets the client's traffic rules based on the latest server config
  2478. // and client properties. As sshClient.trafficRules may be reset by a concurrent
  2479. // goroutine, trafficRules must only be accessed within the sshClient mutex.
  2480. func (sshClient *sshClient) setTrafficRules() (int64, int64) {
  2481. sshClient.Lock()
  2482. defer sshClient.Unlock()
  2483. isFirstTunnelInSession := sshClient.isFirstTunnelInSession &&
  2484. sshClient.handshakeState.establishedTunnelsCount == 0
  2485. sshClient.trafficRules = sshClient.sshServer.support.TrafficRulesSet.GetTrafficRules(
  2486. isFirstTunnelInSession,
  2487. sshClient.tunnelProtocol,
  2488. sshClient.geoIPData,
  2489. sshClient.handshakeState)
  2490. if sshClient.throttledConn != nil {
  2491. // Any existing throttling state is reset.
  2492. sshClient.throttledConn.SetLimits(
  2493. sshClient.trafficRules.RateLimits.CommonRateLimits())
  2494. }
  2495. return *sshClient.trafficRules.RateLimits.ReadBytesPerSecond,
  2496. *sshClient.trafficRules.RateLimits.WriteBytesPerSecond
  2497. }
  2498. // setOSLConfig resets the client's OSL seed state based on the latest OSL config
  2499. // As sshClient.oslClientSeedState may be reset by a concurrent goroutine,
  2500. // oslClientSeedState must only be accessed within the sshClient mutex.
  2501. func (sshClient *sshClient) setOSLConfig() {
  2502. sshClient.Lock()
  2503. defer sshClient.Unlock()
  2504. propagationChannelID, err := getStringRequestParam(
  2505. sshClient.handshakeState.apiParams, "propagation_channel_id")
  2506. if err != nil {
  2507. // This should not fail as long as client has sent valid handshake
  2508. return
  2509. }
  2510. // Use a cached seed state if one is found for the client's
  2511. // session ID. This enables resuming progress made in a previous
  2512. // tunnel.
  2513. // Note: go-cache is already concurency safe; the additional mutex
  2514. // is necessary to guarantee that Get/Delete is atomic; although in
  2515. // practice no two concurrent clients should ever supply the same
  2516. // session ID.
  2517. sshClient.sshServer.oslSessionCacheMutex.Lock()
  2518. oslClientSeedState, found := sshClient.sshServer.oslSessionCache.Get(sshClient.sessionID)
  2519. if found {
  2520. sshClient.sshServer.oslSessionCache.Delete(sshClient.sessionID)
  2521. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  2522. sshClient.oslClientSeedState = oslClientSeedState.(*osl.ClientSeedState)
  2523. sshClient.oslClientSeedState.Resume(sshClient.signalIssueSLOKs)
  2524. return
  2525. }
  2526. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  2527. // Two limitations when setOSLConfig() is invoked due to an
  2528. // OSL config hot reload:
  2529. //
  2530. // 1. any partial progress towards SLOKs is lost.
  2531. //
  2532. // 2. all existing osl.ClientSeedPortForwards for existing
  2533. // port forwards will not send progress to the new client
  2534. // seed state.
  2535. sshClient.oslClientSeedState = sshClient.sshServer.support.OSLConfig.NewClientSeedState(
  2536. sshClient.geoIPData.Country,
  2537. propagationChannelID,
  2538. sshClient.signalIssueSLOKs)
  2539. }
  2540. // newClientSeedPortForward will return nil when no seeding is
  2541. // associated with the specified ipAddress.
  2542. func (sshClient *sshClient) newClientSeedPortForward(ipAddress net.IP) *osl.ClientSeedPortForward {
  2543. sshClient.Lock()
  2544. defer sshClient.Unlock()
  2545. // Will not be initialized before handshake.
  2546. if sshClient.oslClientSeedState == nil {
  2547. return nil
  2548. }
  2549. return sshClient.oslClientSeedState.NewClientSeedPortForward(ipAddress)
  2550. }
  2551. // getOSLSeedPayload returns a payload containing all seeded SLOKs for
  2552. // this client's session.
  2553. func (sshClient *sshClient) getOSLSeedPayload() *osl.SeedPayload {
  2554. sshClient.Lock()
  2555. defer sshClient.Unlock()
  2556. // Will not be initialized before handshake.
  2557. if sshClient.oslClientSeedState == nil {
  2558. return &osl.SeedPayload{SLOKs: make([]*osl.SLOK, 0)}
  2559. }
  2560. return sshClient.oslClientSeedState.GetSeedPayload()
  2561. }
  2562. func (sshClient *sshClient) clearOSLSeedPayload() {
  2563. sshClient.Lock()
  2564. defer sshClient.Unlock()
  2565. sshClient.oslClientSeedState.ClearSeedPayload()
  2566. }
  2567. func (sshClient *sshClient) rateLimits() common.RateLimits {
  2568. sshClient.Lock()
  2569. defer sshClient.Unlock()
  2570. return sshClient.trafficRules.RateLimits.CommonRateLimits()
  2571. }
  2572. func (sshClient *sshClient) idleTCPPortForwardTimeout() time.Duration {
  2573. sshClient.Lock()
  2574. defer sshClient.Unlock()
  2575. return time.Duration(*sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds) * time.Millisecond
  2576. }
  2577. func (sshClient *sshClient) idleUDPPortForwardTimeout() time.Duration {
  2578. sshClient.Lock()
  2579. defer sshClient.Unlock()
  2580. return time.Duration(*sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds) * time.Millisecond
  2581. }
  2582. func (sshClient *sshClient) setTCPPortForwardDialingAvailableSignal(signal context.CancelFunc) {
  2583. sshClient.Lock()
  2584. defer sshClient.Unlock()
  2585. sshClient.tcpPortForwardDialingAvailableSignal = signal
  2586. }
  // Port forward type identifiers. They are passed as the portForwardType
  // argument of the port forward permission and accounting helpers.
  const (
  	portForwardTypeTCP = 0
  	portForwardTypeUDP = 1
  )
  2591. func (sshClient *sshClient) isPortForwardPermitted(
  2592. portForwardType int,
  2593. remoteIP net.IP,
  2594. port int) bool {
  2595. // Disallow connection to bogons.
  2596. //
  2597. // As a security measure, this is a failsafe. The server should be run on a
  2598. // host with correctly configured firewall rules.
  2599. //
  2600. // This check also avoids spurious disallowed traffic alerts for destinations
  2601. // that are impossible to reach.
  2602. if !sshClient.sshServer.support.Config.AllowBogons && common.IsBogon(remoteIP) {
  2603. return false
  2604. }
  2605. // Blocklist check.
  2606. //
  2607. // Limitation: isPortForwardPermitted is not called in transparent DNS
  2608. // forwarding cases. As the destination IP address is rewritten in these
  2609. // cases, a blocklist entry won't be dialed in any case. However, no logs
  2610. // will be recorded.
  2611. if !sshClient.isIPPermitted(remoteIP) {
  2612. return false
  2613. }
  2614. // Don't lock before calling logBlocklistHits.
  2615. // Unlock before calling enqueueDisallowedTrafficAlertRequest/log.
  2616. sshClient.Lock()
  2617. allowed := true
  2618. // Client must complete handshake before port forwards are permitted.
  2619. if !sshClient.handshakeState.completed {
  2620. allowed = false
  2621. }
  2622. if allowed {
  2623. // Traffic rules checks.
  2624. switch portForwardType {
  2625. case portForwardTypeTCP:
  2626. if !sshClient.trafficRules.AllowTCPPort(remoteIP, port) {
  2627. allowed = false
  2628. }
  2629. case portForwardTypeUDP:
  2630. if !sshClient.trafficRules.AllowUDPPort(remoteIP, port) {
  2631. allowed = false
  2632. }
  2633. }
  2634. }
  2635. sshClient.Unlock()
  2636. if allowed {
  2637. return true
  2638. }
  2639. switch portForwardType {
  2640. case portForwardTypeTCP:
  2641. sshClient.updateQualityMetricsWithTCPRejectedDisallowed()
  2642. case portForwardTypeUDP:
  2643. sshClient.updateQualityMetricsWithUDPRejectedDisallowed()
  2644. }
  2645. sshClient.enqueueDisallowedTrafficAlertRequest()
  2646. log.WithTraceFields(
  2647. LogFields{
  2648. "type": portForwardType,
  2649. "port": port,
  2650. }).Debug("port forward denied by traffic rules")
  2651. return false
  2652. }
  2653. // isDomainPermitted returns true when the specified domain may be resolved
  2654. // and returns false and a reject reason otherwise.
  2655. func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) {
  2656. // We're not doing comprehensive validation, to avoid overhead per port
  2657. // forward. This is a simple sanity check to ensure we don't process
  2658. // blantantly invalid input.
  2659. //
  2660. // TODO: validate with dns.IsDomainName?
  2661. if len(domain) > 255 {
  2662. return false, "invalid domain name"
  2663. }
  2664. tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain)
  2665. if len(tags) > 0 {
  2666. sshClient.logBlocklistHits(nil, domain, tags)
  2667. if sshClient.sshServer.support.Config.BlocklistActive {
  2668. // Actively alert and block
  2669. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  2670. return false, "port forward not permitted"
  2671. }
  2672. }
  2673. return true, ""
  2674. }
  2675. func (sshClient *sshClient) isIPPermitted(remoteIP net.IP) bool {
  2676. tags := sshClient.sshServer.support.Blocklist.LookupIP(remoteIP)
  2677. if len(tags) > 0 {
  2678. sshClient.logBlocklistHits(remoteIP, "", tags)
  2679. if sshClient.sshServer.support.Config.BlocklistActive {
  2680. // Actively alert and block
  2681. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  2682. return false
  2683. }
  2684. }
  2685. return true
  2686. }
  2687. func (sshClient *sshClient) isTCPDialingPortForwardLimitExceeded() bool {
  2688. sshClient.Lock()
  2689. defer sshClient.Unlock()
  2690. state := &sshClient.tcpTrafficState
  2691. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  2692. if max > 0 && state.concurrentDialingPortForwardCount >= int64(max) {
  2693. return true
  2694. }
  2695. return false
  2696. }
  2697. func (sshClient *sshClient) getTCPPortForwardQueueSize() int {
  2698. sshClient.Lock()
  2699. defer sshClient.Unlock()
  2700. return *sshClient.trafficRules.MaxTCPPortForwardCount +
  2701. *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  2702. }
  2703. func (sshClient *sshClient) getDialTCPPortForwardTimeoutMilliseconds() int {
  2704. sshClient.Lock()
  2705. defer sshClient.Unlock()
  2706. return *sshClient.trafficRules.DialTCPPortForwardTimeoutMilliseconds
  2707. }
  2708. func (sshClient *sshClient) dialingTCPPortForward() {
  2709. sshClient.Lock()
  2710. defer sshClient.Unlock()
  2711. state := &sshClient.tcpTrafficState
  2712. state.concurrentDialingPortForwardCount += 1
  2713. if state.concurrentDialingPortForwardCount > state.peakConcurrentDialingPortForwardCount {
  2714. state.peakConcurrentDialingPortForwardCount = state.concurrentDialingPortForwardCount
  2715. }
  2716. }
  2717. func (sshClient *sshClient) abortedTCPPortForward() {
  2718. sshClient.Lock()
  2719. defer sshClient.Unlock()
  2720. sshClient.tcpTrafficState.concurrentDialingPortForwardCount -= 1
  2721. }
  2722. func (sshClient *sshClient) allocatePortForward(portForwardType int) bool {
  2723. sshClient.Lock()
  2724. defer sshClient.Unlock()
  2725. // Check if at port forward limit. The subsequent counter
  2726. // changes must be atomic with the limit check to ensure
  2727. // the counter never exceeds the limit in the case of
  2728. // concurrent allocations.
  2729. var max int
  2730. var state *trafficState
  2731. if portForwardType == portForwardTypeTCP {
  2732. max = *sshClient.trafficRules.MaxTCPPortForwardCount
  2733. state = &sshClient.tcpTrafficState
  2734. } else {
  2735. max = *sshClient.trafficRules.MaxUDPPortForwardCount
  2736. state = &sshClient.udpTrafficState
  2737. }
  2738. if max > 0 && state.concurrentPortForwardCount >= int64(max) {
  2739. return false
  2740. }
  2741. // Update port forward counters.
  2742. if portForwardType == portForwardTypeTCP {
  2743. // Assumes TCP port forwards called dialingTCPPortForward
  2744. state.concurrentDialingPortForwardCount -= 1
  2745. if sshClient.tcpPortForwardDialingAvailableSignal != nil {
  2746. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  2747. if max <= 0 || state.concurrentDialingPortForwardCount < int64(max) {
  2748. sshClient.tcpPortForwardDialingAvailableSignal()
  2749. }
  2750. }
  2751. }
  2752. state.concurrentPortForwardCount += 1
  2753. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  2754. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  2755. }
  2756. state.totalPortForwardCount += 1
  2757. return true
  2758. }
  2759. // establishedPortForward increments the concurrent port
  2760. // forward counter. closedPortForward decrements it, so it
  2761. // must always be called for each establishedPortForward
  2762. // call.
  2763. //
  2764. // When at the limit of established port forwards, the LRU
  2765. // existing port forward is closed to make way for the newly
  2766. // established one. There can be a minor delay as, in addition
  2767. // to calling Close() on the port forward net.Conn,
  2768. // establishedPortForward waits for the LRU's closedPortForward()
  2769. // call which will decrement the concurrent counter. This
  2770. // ensures all resources associated with the LRU (socket,
  2771. // goroutine) are released or will very soon be released before
  2772. // proceeding.
  2773. func (sshClient *sshClient) establishedPortForward(
  2774. portForwardType int, portForwardLRU *common.LRUConns) {
  2775. // Do not lock sshClient here.
  2776. var state *trafficState
  2777. if portForwardType == portForwardTypeTCP {
  2778. state = &sshClient.tcpTrafficState
  2779. } else {
  2780. state = &sshClient.udpTrafficState
  2781. }
  2782. // When the maximum number of port forwards is already
  2783. // established, close the LRU. CloseOldest will call
  2784. // Close on the port forward net.Conn. Both TCP and
  2785. // UDP port forwards have handler goroutines that may
  2786. // be blocked calling Read on the net.Conn. Close will
  2787. // eventually interrupt the Read and cause the handlers
  2788. // to exit, but not immediately. So the following logic
  2789. // waits for a LRU handler to be interrupted and signal
  2790. // availability.
  2791. //
  2792. // Notes:
  2793. //
  2794. // - the port forward limit can change via a traffic
  2795. // rules hot reload; the condition variable handles
  2796. // this case whereas a channel-based semaphore would
  2797. // not.
  2798. //
  2799. // - if a number of goroutines exceeding the total limit
  2800. // arrive here all concurrently, some CloseOldest() calls
  2801. // will have no effect as there can be less existing port
  2802. // forwards than new ones. In this case, the new port
  2803. // forward will be delayed. This is highly unlikely in
  2804. // practise since UDP calls to establishedPortForward are
  2805. // serialized and TCP calls are limited by the dial
  2806. // queue/count.
  2807. if !sshClient.allocatePortForward(portForwardType) {
  2808. portForwardLRU.CloseOldest()
  2809. log.WithTrace().Debug("closed LRU port forward")
  2810. state.availablePortForwardCond.L.Lock()
  2811. for !sshClient.allocatePortForward(portForwardType) {
  2812. state.availablePortForwardCond.Wait()
  2813. }
  2814. state.availablePortForwardCond.L.Unlock()
  2815. }
  2816. }
  2817. func (sshClient *sshClient) closedPortForward(
  2818. portForwardType int, bytesUp, bytesDown int64) {
  2819. sshClient.Lock()
  2820. var state *trafficState
  2821. if portForwardType == portForwardTypeTCP {
  2822. state = &sshClient.tcpTrafficState
  2823. } else {
  2824. state = &sshClient.udpTrafficState
  2825. }
  2826. state.concurrentPortForwardCount -= 1
  2827. state.bytesUp += bytesUp
  2828. state.bytesDown += bytesDown
  2829. sshClient.Unlock()
  2830. // Signal any goroutine waiting in establishedPortForward
  2831. // that an established port forward slot is available.
  2832. state.availablePortForwardCond.Signal()
  2833. }
  2834. func (sshClient *sshClient) updateQualityMetricsWithDialResult(
  2835. tcpPortForwardDialSuccess bool, dialDuration time.Duration, IP net.IP) {
  2836. sshClient.Lock()
  2837. defer sshClient.Unlock()
  2838. if tcpPortForwardDialSuccess {
  2839. sshClient.qualityMetrics.TCPPortForwardDialedCount += 1
  2840. sshClient.qualityMetrics.TCPPortForwardDialedDuration += dialDuration
  2841. if IP.To4() != nil {
  2842. sshClient.qualityMetrics.TCPIPv4PortForwardDialedCount += 1
  2843. sshClient.qualityMetrics.TCPIPv4PortForwardDialedDuration += dialDuration
  2844. } else if IP != nil {
  2845. sshClient.qualityMetrics.TCPIPv6PortForwardDialedCount += 1
  2846. sshClient.qualityMetrics.TCPIPv6PortForwardDialedDuration += dialDuration
  2847. }
  2848. } else {
  2849. sshClient.qualityMetrics.TCPPortForwardFailedCount += 1
  2850. sshClient.qualityMetrics.TCPPortForwardFailedDuration += dialDuration
  2851. if IP.To4() != nil {
  2852. sshClient.qualityMetrics.TCPIPv4PortForwardFailedCount += 1
  2853. sshClient.qualityMetrics.TCPIPv4PortForwardFailedDuration += dialDuration
  2854. } else if IP != nil {
  2855. sshClient.qualityMetrics.TCPIPv6PortForwardFailedCount += 1
  2856. sshClient.qualityMetrics.TCPIPv6PortForwardFailedDuration += dialDuration
  2857. }
  2858. }
  2859. }
  2860. func (sshClient *sshClient) updateQualityMetricsWithRejectedDialingLimit() {
  2861. sshClient.Lock()
  2862. defer sshClient.Unlock()
  2863. sshClient.qualityMetrics.TCPPortForwardRejectedDialingLimitCount += 1
  2864. }
  2865. func (sshClient *sshClient) updateQualityMetricsWithTCPRejectedDisallowed() {
  2866. sshClient.Lock()
  2867. defer sshClient.Unlock()
  2868. sshClient.qualityMetrics.TCPPortForwardRejectedDisallowedCount += 1
  2869. }
  2870. func (sshClient *sshClient) updateQualityMetricsWithUDPRejectedDisallowed() {
  2871. sshClient.Lock()
  2872. defer sshClient.Unlock()
  2873. sshClient.qualityMetrics.UDPPortForwardRejectedDisallowedCount += 1
  2874. }
  2875. func (sshClient *sshClient) handleTCPChannel(
  2876. remainingDialTimeout time.Duration,
  2877. hostToConnect string,
  2878. portToConnect int,
  2879. doSplitTunnel bool,
  2880. newChannel ssh.NewChannel) {
  2881. // Assumptions:
  2882. // - sshClient.dialingTCPPortForward() has been called
  2883. // - remainingDialTimeout > 0
  2884. established := false
  2885. defer func() {
  2886. if !established {
  2887. sshClient.abortedTCPPortForward()
  2888. }
  2889. }()
  2890. // Transparently redirect web API request connections.
  2891. isWebServerPortForward := false
  2892. config := sshClient.sshServer.support.Config
  2893. if config.WebServerPortForwardAddress != "" {
  2894. destination := net.JoinHostPort(hostToConnect, strconv.Itoa(portToConnect))
  2895. if destination == config.WebServerPortForwardAddress {
  2896. isWebServerPortForward = true
  2897. if config.WebServerPortForwardRedirectAddress != "" {
  2898. // Note: redirect format is validated when config is loaded
  2899. host, portStr, _ := net.SplitHostPort(config.WebServerPortForwardRedirectAddress)
  2900. port, _ := strconv.Atoi(portStr)
  2901. hostToConnect = host
  2902. portToConnect = port
  2903. }
  2904. }
  2905. }
  2906. // Validate the domain name and check the domain blocklist before dialing.
  2907. //
  2908. // The IP blocklist is checked in isPortForwardPermitted, which also provides
  2909. // IP blocklist checking for the packet tunnel code path. When hostToConnect
  2910. // is an IP address, the following hostname resolution step effectively
  2911. // performs no actions and next immediate step is the isPortForwardPermitted
  2912. // check.
  2913. //
  2914. // Limitation: this case handles port forwards where the client sends the
  2915. // destination domain in the SSH port forward request but does not currently
  2916. // handle DNS-over-TCP; in the DNS-over-TCP case, a client may bypass the
  2917. // block list check.
  2918. if !isWebServerPortForward &&
  2919. net.ParseIP(hostToConnect) == nil {
  2920. ok, rejectMessage := sshClient.isDomainPermitted(hostToConnect)
  2921. if !ok {
  2922. // Note: not recording a port forward failure in this case
  2923. sshClient.rejectNewChannel(newChannel, rejectMessage)
  2924. return
  2925. }
  2926. }
  2927. // Dial the remote address.
  2928. //
  2929. // Hostname resolution is performed explicitly, as a separate step, as the
  2930. // target IP address is used for traffic rules (AllowSubnets), OSL seed
  2931. // progress, and IP address blocklists.
  2932. //
  2933. // Contexts are used for cancellation (via sshClient.runCtx, which is
  2934. // cancelled when the client is stopping) and timeouts.
  2935. dialStartTime := time.Now()
  2936. log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")
  2937. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  2938. IPs, err := (&net.Resolver{}).LookupIPAddr(ctx, hostToConnect)
  2939. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  2940. // IPv4 is preferred in case the host has limited IPv6 routing. IPv6 is
  2941. // selected and attempted only when there's no IPv4 option.
  2942. // TODO: shuffle list to try other IPs?
  2943. var IP net.IP
  2944. for _, ip := range IPs {
  2945. if ip.IP.To4() != nil {
  2946. IP = ip.IP
  2947. break
  2948. }
  2949. }
  2950. if IP == nil && len(IPs) > 0 {
  2951. // If there are no IPv4 IPs, the first IP is IPv6.
  2952. IP = IPs[0].IP
  2953. }
  2954. if err == nil && IP == nil {
  2955. err = std_errors.New("no IP address")
  2956. }
  2957. resolveElapsedTime := time.Since(dialStartTime)
  2958. if err != nil {
  2959. // Record a port forward failure
  2960. sshClient.updateQualityMetricsWithDialResult(false, resolveElapsedTime, IP)
  2961. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("LookupIP failed: %s", err))
  2962. return
  2963. }
  2964. remainingDialTimeout -= resolveElapsedTime
  2965. if remainingDialTimeout <= 0 {
  2966. sshClient.rejectNewChannel(newChannel, "TCP port forward timed out resolving")
  2967. return
  2968. }
  2969. // When the client has indicated split tunnel mode and when the channel is
  2970. // not of type protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE, check if the
  2971. // client and the port forward destination are in the same GeoIP country. If
  2972. // so, reject the port forward with a distinct response code that indicates
  2973. // to the client that this port forward should be performed locally, direct
  2974. // and untunneled.
  2975. //
  2976. // Clients are expected to cache untunneled responses to avoid this round
  2977. // trip in the immediate future and reduce server load.
  2978. //
  2979. // When the countries differ, immediately proceed with the standard port
  2980. // forward. No additional round trip is required.
  2981. //
  2982. // If either GeoIP country is "None", one or both countries are unknown
  2983. // and there is no match.
  2984. //
  2985. // Traffic rules, such as allowed ports, are not enforced for port forward
  2986. // destinations classified as untunneled.
  2987. //
  2988. // Domain and IP blocklists still apply to port forward destinations
  2989. // classified as untunneled.
  2990. //
  2991. // The client's use of split tunnel mode is logged in server_tunnel metrics
  2992. // as the boolean value split_tunnel. As they may indicate some information
  2993. // about browsing activity, no other split tunnel metrics are logged.
  2994. if doSplitTunnel {
  2995. destinationGeoIPData := sshClient.sshServer.support.GeoIPService.LookupIP(IP, false)
  2996. if destinationGeoIPData.Country == sshClient.geoIPData.Country &&
  2997. sshClient.geoIPData.Country != GEOIP_UNKNOWN_VALUE {
  2998. // Since isPortForwardPermitted is not called in this case, explicitly call
  2999. // ipBlocklistCheck. The domain blocklist case is handled above.
  3000. if !sshClient.isIPPermitted(IP) {
  3001. // Note: not recording a port forward failure in this case
  3002. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  3003. }
  3004. newChannel.Reject(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL, "")
  3005. }
  3006. }
  3007. // Enforce traffic rules, using the resolved IP address.
  3008. if !isWebServerPortForward &&
  3009. !sshClient.isPortForwardPermitted(
  3010. portForwardTypeTCP,
  3011. IP,
  3012. portToConnect) {
  3013. // Note: not recording a port forward failure in this case
  3014. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  3015. return
  3016. }
  3017. // TCP dial.
  3018. remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))
  3019. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
  3020. ctx, cancelCtx = context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  3021. fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)
  3022. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  3023. // Record port forward success or failure
  3024. sshClient.updateQualityMetricsWithDialResult(err == nil, time.Since(dialStartTime), IP)
  3025. if err != nil {
  3026. // Monitor for low resource error conditions
  3027. sshClient.sshServer.monitorPortForwardDialError(err)
  3028. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("DialTimeout failed: %s", err))
  3029. return
  3030. }
  3031. // The upstream TCP port forward connection has been established. Schedule
  3032. // some cleanup and notify the SSH client that the channel is accepted.
  3033. defer fwdConn.Close()
  3034. fwdChannel, requests, err := newChannel.Accept()
  3035. if err != nil {
  3036. if !isExpectedTunnelIOError(err) {
  3037. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  3038. }
  3039. return
  3040. }
  3041. go ssh.DiscardRequests(requests)
  3042. defer fwdChannel.Close()
  3043. // Release the dialing slot and acquire an established slot.
  3044. //
  3045. // establishedPortForward increments the concurrent TCP port
  3046. // forward counter and closes the LRU existing TCP port forward
  3047. // when already at the limit.
  3048. //
  3049. // Known limitations:
  3050. //
  3051. // - Closed LRU TCP sockets will enter the TIME_WAIT state,
  3052. // continuing to consume some resources.
  3053. sshClient.establishedPortForward(portForwardTypeTCP, sshClient.tcpPortForwardLRU)
  3054. // "established = true" cancels the deferred abortedTCPPortForward()
  3055. established = true
  3056. // TODO: 64-bit alignment? https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  3057. var bytesUp, bytesDown int64
  3058. defer func() {
  3059. sshClient.closedPortForward(
  3060. portForwardTypeTCP, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))
  3061. }()
  3062. lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
  3063. defer lruEntry.Remove()
  3064. // ActivityMonitoredConn monitors the TCP port forward I/O and updates
  3065. // its LRU status. ActivityMonitoredConn also times out I/O on the port
  3066. // forward if both reads and writes have been idle for the specified
  3067. // duration.
  3068. // Ensure nil interface if newClientSeedPortForward returns nil
  3069. var updater common.ActivityUpdater
  3070. seedUpdater := sshClient.newClientSeedPortForward(IP)
  3071. if seedUpdater != nil {
  3072. updater = seedUpdater
  3073. }
  3074. fwdConn, err = common.NewActivityMonitoredConn(
  3075. fwdConn,
  3076. sshClient.idleTCPPortForwardTimeout(),
  3077. true,
  3078. updater,
  3079. lruEntry)
  3080. if err != nil {
  3081. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  3082. return
  3083. }
  3084. // Relay channel to forwarded connection.
  3085. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
  3086. // TODO: relay errors to fwdChannel.Stderr()?
  3087. relayWaitGroup := new(sync.WaitGroup)
  3088. relayWaitGroup.Add(1)
  3089. go func() {
  3090. defer relayWaitGroup.Done()
  3091. // io.Copy allocates a 32K temporary buffer, and each port forward relay
  3092. // uses two of these buffers; using common.CopyBuffer with a smaller buffer
  3093. // reduces the overall memory footprint.
  3094. bytes, err := common.CopyBuffer(
  3095. fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  3096. atomic.AddInt64(&bytesDown, bytes)
  3097. if err != nil && err != io.EOF {
  3098. // Debug since errors such as "connection reset by peer" occur during normal operation
  3099. log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
  3100. }
  3101. // Interrupt upstream io.Copy when downstream is shutting down.
  3102. // TODO: this is done to quickly cleanup the port forward when
  3103. // fwdConn has a read timeout, but is it clean -- upstream may still
  3104. // be flowing?
  3105. fwdChannel.Close()
  3106. }()
  3107. bytes, err := common.CopyBuffer(
  3108. fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  3109. atomic.AddInt64(&bytesUp, bytes)
  3110. if err != nil && err != io.EOF {
  3111. log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
  3112. }
  3113. // Shutdown special case: fwdChannel will be closed and return EOF when
  3114. // the SSH connection is closed, but we need to explicitly close fwdConn
  3115. // to interrupt the downstream io.Copy, which may be blocked on a
  3116. // fwdConn.Read().
  3117. fwdConn.Close()
  3118. relayWaitGroup.Wait()
  3119. log.WithTraceFields(
  3120. LogFields{
  3121. "remoteAddr": remoteAddr,
  3122. "bytesUp": atomic.LoadInt64(&bytesUp),
  3123. "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting")
  3124. }