tunnelServer.go 153 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "crypto/subtle"
  25. "encoding/base64"
  26. "encoding/json"
  27. std_errors "errors"
  28. "fmt"
  29. "io"
  30. "io/ioutil"
  31. "net"
  32. "strconv"
  33. "sync"
  34. "sync/atomic"
  35. "syscall"
  36. "time"
  37. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
  51. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  52. "github.com/marusama/semaphore"
  53. cache "github.com/patrickmn/go-cache"
  54. )
// Limits, timeouts and buffer sizes used by the SSH tunnel server. Most of
// these are consumed outside this declaration; the constant names indicate
// their intended use and the call sites define the exact semantics.
const (
	// Durations.
	SSH_AUTH_LOG_PERIOD          = 30 * time.Minute
	SSH_HANDSHAKE_TIMEOUT        = 30 * time.Second
	SSH_BEGIN_HANDSHAKE_TIMEOUT  = 1 * time.Second
	SSH_CONNECTION_READ_DEADLINE = 5 * time.Minute

	// Port forward buffer/queue sizing.
	SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE = 8192
	SSH_TCP_PORT_FORWARD_QUEUE_SIZE       = 1024

	// Keep-alive payload size bounds, in bytes.
	SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES = 0
	SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES = 256

	// OSL send retry schedule: an initial delay and a multiplicative factor.
	// NOTE(review): presumably the delay is multiplied by the factor on each
	// retry -- confirm at the call site.
	SSH_SEND_OSL_INITIAL_RETRY_DELAY = 30 * time.Second
	SSH_SEND_OSL_RETRY_FACTOR        = 2

	// OSL_SESSION_CACHE_TTL is the default expiration passed to the OSL
	// session cache created in newSSHServer.
	OSL_SESSION_CACHE_TTL = 5 * time.Minute

	MAX_AUTHORIZATIONS                    = 16
	PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
	RANDOM_STREAM_MAX_BYTES               = 10485760 // 10 * 1024 * 1024
	ALERT_REQUEST_QUEUE_BUFFER_SIZE       = 16
)
// TunnelServer is the main server that accepts Psiphon client
// connections, via various obfuscation protocols, and provides
// port forwarding (TCP and UDP) services to the Psiphon client.
// At its core, TunnelServer is an SSH server. SSH is the base
// protocol that provides port forward multiplexing, and transport
// security. Layered on top of SSH, optionally, is Obfuscated SSH
// and meek protocols, which provide further circumvention
// capabilities.
type TunnelServer struct {
	// runWaitGroup tracks the per-listener goroutines started by Run; Run
	// waits on it before returning.
	runWaitGroup *sync.WaitGroup

	// listenerError is handed to each running listener; Run stops when a
	// value is received on it. It is created unbuffered in NewTunnelServer.
	listenerError chan error

	// shutdownBroadcast, when signaled, causes Run to stop.
	shutdownBroadcast <-chan struct{}

	// sshServer is the underlying SSH server to which the exported
	// TunnelServer methods delegate.
	sshServer *sshServer
}
// sshListener pairs a bound net.Listener with the metadata that Run records
// when binding it.
type sshListener struct {
	// Listener is the embedded, bound listener; Run stores the tactics
	// listener wrapper here.
	net.Listener

	// localAddress is the host:port string the listener was bound to.
	localAddress string

	// tunnelProtocol is the tunnel protocol name served on this listener.
	tunnelProtocol string

	// port is the listen port configured for tunnelProtocol.
	port int

	// BPFProgramName is the name returned by newTCPListenerWithBPF; it is
	// left empty for listeners created through other paths.
	BPFProgramName string
}
  93. // NewTunnelServer initializes a new tunnel server.
  94. func NewTunnelServer(
  95. support *SupportServices,
  96. shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {
  97. sshServer, err := newSSHServer(support, shutdownBroadcast)
  98. if err != nil {
  99. return nil, errors.Trace(err)
  100. }
  101. return &TunnelServer{
  102. runWaitGroup: new(sync.WaitGroup),
  103. listenerError: make(chan error),
  104. shutdownBroadcast: shutdownBroadcast,
  105. sshServer: sshServer,
  106. }, nil
  107. }
  108. // Run runs the tunnel server; this function blocks while running a selection of
  109. // listeners that handle connections using various obfuscation protocols.
  110. //
  111. // Run listens on each designated tunnel port and spawns new goroutines to handle
  112. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  113. // clients is maintained, and when halting all clients are cleanly shutdown.
  114. //
  115. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  116. // authentication, and then looping on client new channel requests. "direct-tcpip"
  117. // channels, dynamic port fowards, are supported. When the UDPInterceptUdpgwServerAddress
  118. // config parameter is configured, UDP port forwards over a TCP stream, following
  119. // the udpgw protocol, are handled.
  120. //
  121. // A new goroutine is spawned to handle each port forward for each client. Each port
  122. // forward tracks its bytes transferred. Overall per-client stats for connection duration,
  123. // GeoIP, number of port forwards, and bytes transferred are tracked and logged when the
  124. // client shuts down.
  125. //
  126. // Note: client handler goroutines may still be shutting down after Run() returns. See
  127. // comment in sshClient.stop(). TODO: fully synchronized shutdown.
  128. func (server *TunnelServer) Run() error {
  129. // TODO: should TunnelServer hold its own support pointer?
  130. support := server.sshServer.support
  131. // First bind all listeners; once all are successful,
  132. // start accepting connections on each.
  133. var listeners []*sshListener
  134. for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
  135. localAddress := net.JoinHostPort(
  136. support.Config.ServerIPAddress, strconv.Itoa(listenPort))
  137. var listener net.Listener
  138. var BPFProgramName string
  139. var err error
  140. if protocol.TunnelProtocolUsesFrontedMeekQUIC(tunnelProtocol) {
  141. // For FRONTED-MEEK-QUIC-OSSH, no listener implemented. The edge-to-server
  142. // hop uses HTTPS and the client tunnel protocol is distinguished using
  143. // protocol.MeekCookieData.ClientTunnelProtocol.
  144. continue
  145. } else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {
  146. logTunnelProtocol := tunnelProtocol
  147. listener, err = quic.Listen(
  148. CommonLogger(log),
  149. func(clientAddress string, err error, logFields common.LogFields) {
  150. logIrregularTunnel(
  151. support, logTunnelProtocol, listenPort, clientAddress,
  152. errors.Trace(err), LogFields(logFields))
  153. },
  154. localAddress,
  155. support.Config.ObfuscatedSSHKey,
  156. support.Config.EnableGQUIC)
  157. } else if protocol.TunnelProtocolUsesRefractionNetworking(tunnelProtocol) {
  158. listener, err = refraction.Listen(localAddress)
  159. } else if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
  160. listener, err = net.Listen("tcp", localAddress)
  161. } else {
  162. // Only direct, unfronted protocol listeners use TCP BPF circumvention
  163. // programs.
  164. listener, BPFProgramName, err = newTCPListenerWithBPF(support, localAddress)
  165. if protocol.TunnelProtocolUsesTLSOSSH(tunnelProtocol) {
  166. listener, err = ListenTLSTunnel(support, listener, tunnelProtocol, listenPort)
  167. if err != nil {
  168. return errors.Trace(err)
  169. }
  170. }
  171. }
  172. if err != nil {
  173. for _, existingListener := range listeners {
  174. existingListener.Listener.Close()
  175. }
  176. return errors.Trace(err)
  177. }
  178. tacticsListener := NewTacticsListener(
  179. support,
  180. listener,
  181. tunnelProtocol,
  182. func(IP string) GeoIPData { return support.GeoIPService.Lookup(IP) })
  183. log.WithTraceFields(
  184. LogFields{
  185. "localAddress": localAddress,
  186. "tunnelProtocol": tunnelProtocol,
  187. "BPFProgramName": BPFProgramName,
  188. }).Info("listening")
  189. listeners = append(
  190. listeners,
  191. &sshListener{
  192. Listener: tacticsListener,
  193. localAddress: localAddress,
  194. port: listenPort,
  195. tunnelProtocol: tunnelProtocol,
  196. BPFProgramName: BPFProgramName,
  197. })
  198. }
  199. for _, listener := range listeners {
  200. server.runWaitGroup.Add(1)
  201. go func(listener *sshListener) {
  202. defer server.runWaitGroup.Done()
  203. log.WithTraceFields(
  204. LogFields{
  205. "localAddress": listener.localAddress,
  206. "tunnelProtocol": listener.tunnelProtocol,
  207. }).Info("running")
  208. server.sshServer.runListener(
  209. listener,
  210. server.listenerError)
  211. log.WithTraceFields(
  212. LogFields{
  213. "localAddress": listener.localAddress,
  214. "tunnelProtocol": listener.tunnelProtocol,
  215. }).Info("stopped")
  216. }(listener)
  217. }
  218. var err error
  219. select {
  220. case <-server.shutdownBroadcast:
  221. case err = <-server.listenerError:
  222. }
  223. for _, listener := range listeners {
  224. listener.Close()
  225. }
  226. server.sshServer.stopClients()
  227. server.runWaitGroup.Wait()
  228. log.WithTrace().Info("stopped")
  229. return err
  230. }
  231. // GetLoadStats returns load stats for the tunnel server. The stats are
  232. // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
  233. // include current connected client count, total number of current port
  234. // forwards.
  235. func (server *TunnelServer) GetLoadStats() (
  236. UpstreamStats, ProtocolStats, RegionStats) {
  237. return server.sshServer.getLoadStats()
  238. }
  239. // GetEstablishedClientCount returns the number of currently established
  240. // clients.
  241. func (server *TunnelServer) GetEstablishedClientCount() int {
  242. return server.sshServer.getEstablishedClientCount()
  243. }
  244. // ResetAllClientTrafficRules resets all established client traffic rules
  245. // to use the latest config and client properties. Any existing traffic
  246. // rule state is lost, including throttling state.
  247. func (server *TunnelServer) ResetAllClientTrafficRules() {
  248. server.sshServer.resetAllClientTrafficRules()
  249. }
  250. // ResetAllClientOSLConfigs resets all established client OSL state to use
  251. // the latest OSL config. Any existing OSL state is lost, including partial
  252. // progress towards SLOKs.
  253. func (server *TunnelServer) ResetAllClientOSLConfigs() {
  254. server.sshServer.resetAllClientOSLConfigs()
  255. }
  256. // SetClientHandshakeState sets the handshake state -- that it completed and
  257. // what parameters were passed -- in sshClient. This state is used for allowing
  258. // port forwards and for future traffic rule selection. SetClientHandshakeState
  259. // also triggers an immediate traffic rule re-selection, as the rules selected
  260. // upon tunnel establishment may no longer apply now that handshake values are
  261. // set.
  262. //
  263. // The authorizations received from the client handshake are verified and the
  264. // resulting list of authorized access types are applied to the client's tunnel
  265. // and traffic rules.
  266. //
  267. // A list of active authorization IDs, authorized access types, and traffic
  268. // rate limits are returned for responding to the client and logging.
  269. func (server *TunnelServer) SetClientHandshakeState(
  270. sessionID string,
  271. state handshakeState,
  272. authorizations []string) (*handshakeStateInfo, error) {
  273. return server.sshServer.setClientHandshakeState(sessionID, state, authorizations)
  274. }
  275. // GetClientHandshaked indicates whether the client has completed a handshake
  276. // and whether its traffic rules are immediately exhausted.
  277. func (server *TunnelServer) GetClientHandshaked(
  278. sessionID string) (bool, bool, error) {
  279. return server.sshServer.getClientHandshaked(sessionID)
  280. }
  281. // GetClientDisableDiscovery indicates whether discovery is disabled for the
  282. // client corresponding to sessionID.
  283. func (server *TunnelServer) GetClientDisableDiscovery(
  284. sessionID string) (bool, error) {
  285. return server.sshServer.getClientDisableDiscovery(sessionID)
  286. }
  287. // UpdateClientAPIParameters updates the recorded handshake API parameters for
  288. // the client corresponding to sessionID.
  289. func (server *TunnelServer) UpdateClientAPIParameters(
  290. sessionID string,
  291. apiParams common.APIParameters) error {
  292. return server.sshServer.updateClientAPIParameters(sessionID, apiParams)
  293. }
  294. // AcceptClientDomainBytes indicates whether to accept domain bytes reported
  295. // by the client.
  296. func (server *TunnelServer) AcceptClientDomainBytes(
  297. sessionID string) (bool, error) {
  298. return server.sshServer.acceptClientDomainBytes(sessionID)
  299. }
  300. // SetEstablishTunnels sets whether new tunnels may be established or not.
  301. // When not establishing, incoming connections are immediately closed.
  302. func (server *TunnelServer) SetEstablishTunnels(establish bool) {
  303. server.sshServer.setEstablishTunnels(establish)
  304. }
  305. // CheckEstablishTunnels returns whether new tunnels may be established or
  306. // not, and increments a metrics counter when establishment is disallowed.
  307. func (server *TunnelServer) CheckEstablishTunnels() bool {
  308. return server.sshServer.checkEstablishTunnels()
  309. }
  310. // GetEstablishTunnelsMetrics returns whether tunnel establishment is
  311. // currently allowed and the number of tunnels rejected since due to not
  312. // establishing since the last GetEstablishTunnelsMetrics call.
  313. func (server *TunnelServer) GetEstablishTunnelsMetrics() (bool, int64) {
  314. return server.sshServer.getEstablishTunnelsMetrics()
  315. }
// sshServer holds the state of the SSH server that backs TunnelServer. It is
// constructed by newSSHServer.
type sshServer struct {
	// Note: 64-bit ints used with atomic operations are placed
	// at the start of struct to ensure 64-bit alignment.
	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
	lastAuthLog           int64
	authFailedCount       int64
	establishLimitedCount int64

	// support provides the shared services and config; Run reads it to
	// obtain the listener configuration.
	support *SupportServices

	// establishTunnels is initialized to 1 by newSSHServer.
	// NOTE(review): presumably a boolean flag accessed atomically -- confirm
	// against setEstablishTunnels/checkEstablishTunnels.
	establishTunnels int32

	// concurrentSSHHandshakes is only created when
	// Config.MaxConcurrentSSHHandshakes > 0; otherwise it is left nil.
	concurrentSSHHandshakes semaphore.Semaphore

	shutdownBroadcast <-chan struct{}

	// sshHostKey is the signer derived from Config.SSHPrivateKey.
	sshHostKey ssh.Signer

	// NOTE(review): clientsMutex presumably guards the client-tracking
	// fields that follow it -- confirm against the methods that use them.
	clientsMutex         sync.Mutex
	stoppingClients      bool
	acceptedClientCounts map[string]map[string]int64
	clients              map[string]*sshClient

	// oslSessionCache temporarily retains OSL seed state progress for
	// disconnected clients, referenced by session ID; see newSSHServer.
	oslSessionCacheMutex sync.Mutex
	oslSessionCache      *cache.Cache

	authorizationSessionIDsMutex sync.Mutex
	authorizationSessionIDs      map[string]string

	obfuscatorSeedHistory *obfuscator.SeedHistory
}
  338. func newSSHServer(
  339. support *SupportServices,
  340. shutdownBroadcast <-chan struct{}) (*sshServer, error) {
  341. privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))
  342. if err != nil {
  343. return nil, errors.Trace(err)
  344. }
  345. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  346. signer, err := ssh.NewSignerFromKey(privateKey)
  347. if err != nil {
  348. return nil, errors.Trace(err)
  349. }
  350. var concurrentSSHHandshakes semaphore.Semaphore
  351. if support.Config.MaxConcurrentSSHHandshakes > 0 {
  352. concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)
  353. }
  354. // The OSL session cache temporarily retains OSL seed state
  355. // progress for disconnected clients. This enables clients
  356. // that disconnect and immediately reconnect to the same
  357. // server to resume their OSL progress. Cached progress
  358. // is referenced by session ID and is retained for
  359. // OSL_SESSION_CACHE_TTL after disconnect.
  360. //
  361. // Note: session IDs are assumed to be unpredictable. If a
  362. // rogue client could guess the session ID of another client,
  363. // it could resume its OSL progress and, if the OSL config
  364. // were known, infer some activity.
  365. oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)
  366. return &sshServer{
  367. support: support,
  368. establishTunnels: 1,
  369. concurrentSSHHandshakes: concurrentSSHHandshakes,
  370. shutdownBroadcast: shutdownBroadcast,
  371. sshHostKey: signer,
  372. acceptedClientCounts: make(map[string]map[string]int64),
  373. clients: make(map[string]*sshClient),
  374. oslSessionCache: oslSessionCache,
  375. authorizationSessionIDs: make(map[string]string),
  376. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  377. }, nil
  378. }
  379. func (sshServer *sshServer) setEstablishTunnels(establish bool) {
  380. // Do nothing when the setting is already correct. This avoids
  381. // spurious log messages when setEstablishTunnels is called
  382. // periodically with the same setting.
  383. if establish == (atomic.LoadInt32(&sshServer.establishTunnels) == 1) {
  384. return
  385. }
  386. establishFlag := int32(1)
  387. if !establish {
  388. establishFlag = 0
  389. }
  390. atomic.StoreInt32(&sshServer.establishTunnels, establishFlag)
  391. log.WithTraceFields(
  392. LogFields{"establish": establish}).Info("establishing tunnels")
  393. }
  394. func (sshServer *sshServer) checkEstablishTunnels() bool {
  395. establishTunnels := atomic.LoadInt32(&sshServer.establishTunnels) == 1
  396. if !establishTunnels {
  397. atomic.AddInt64(&sshServer.establishLimitedCount, 1)
  398. }
  399. return establishTunnels
  400. }
  401. func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
  402. return atomic.LoadInt32(&sshServer.establishTunnels) == 1,
  403. atomic.SwapInt64(&sshServer.establishLimitedCount, 0)
  404. }
  405. // runListener is intended to run an a goroutine; it blocks
  406. // running a particular listener. If an unrecoverable error
  407. // occurs, it will send the error to the listenerError channel.
  408. func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {
  409. handleClient := func(clientTunnelProtocol string, clientConn net.Conn) {
  410. // Note: establish tunnel limiter cannot simply stop TCP
  411. // listeners in all cases (e.g., meek) since SSH tunnels can
  412. // span multiple TCP connections.
  413. if !sshServer.checkEstablishTunnels() {
  414. log.WithTrace().Debug("not establishing tunnels")
  415. clientConn.Close()
  416. return
  417. }
  418. // tunnelProtocol is used for stats and traffic rules. In many cases, its
  419. // value is unambiguously determined by the listener port. In certain cases,
  420. // such as multiple fronted protocols with a single backend listener, the
  421. // client's reported tunnel protocol value is used. The caller must validate
  422. // clientTunnelProtocol with protocol.IsValidClientTunnelProtocol.
  423. tunnelProtocol := sshListener.tunnelProtocol
  424. if clientTunnelProtocol != "" {
  425. tunnelProtocol = clientTunnelProtocol
  426. }
  427. // sshListener.tunnelProtocol indictes the tunnel protocol run by the
  428. // listener. For direct protocols, this is also the client tunnel protocol.
  429. // For fronted protocols, the client may use a different protocol to connect
  430. // to the front and then only the front-to-Psiphon server will use the
  431. // listener protocol.
  432. //
  433. // A fronted meek client, for example, reports its first hop protocol in
  434. // protocol.MeekCookieData.ClientTunnelProtocol. Most metrics record this
  435. // value as relay_protocol, since the first hop is the one subject to
  436. // adversarial conditions. In some cases, such as irregular tunnels, there
  437. // is no ClientTunnelProtocol value available and the listener tunnel
  438. // protocol will be logged.
  439. //
  440. // Similarly, listenerPort indicates the listening port, which is the dialed
  441. // port number for direct protocols; while, for fronted protocols, the
  442. // client may dial a different port for its first hop.
  443. // Process each client connection concurrently.
  444. go sshServer.handleClient(sshListener, tunnelProtocol, clientConn)
  445. }
  446. // Note: when exiting due to a unrecoverable error, be sure
  447. // to try to send the error to listenerError so that the outer
  448. // TunnelServer.Run will properly shut down instead of remaining
  449. // running.
  450. if protocol.TunnelProtocolUsesMeekHTTP(sshListener.tunnelProtocol) || protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol) {
  451. if sshServer.tunnelProtocolUsesTLSDemux(sshListener.tunnelProtocol) {
  452. sshServer.runMeekTLSOSSHDemuxListener(sshListener, listenerError, handleClient)
  453. } else {
  454. meekServer, err := NewMeekServer(
  455. sshServer.support,
  456. sshListener.Listener,
  457. sshListener.tunnelProtocol,
  458. sshListener.port,
  459. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol),
  460. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  461. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  462. true,
  463. handleClient,
  464. sshServer.shutdownBroadcast)
  465. if err == nil {
  466. err = meekServer.Run()
  467. }
  468. if err != nil {
  469. select {
  470. case listenerError <- errors.Trace(err):
  471. default:
  472. }
  473. return
  474. }
  475. }
  476. } else {
  477. runListener(sshListener.Listener, sshServer.shutdownBroadcast, listenerError, "", handleClient)
  478. }
  479. }
  480. // runMeekTLSOSSHDemuxListener blocks running a listener which demuxes meek and
  481. // TLS-OSSH connections received on the same port.
  482. func (sshServer *sshServer) runMeekTLSOSSHDemuxListener(sshListener *sshListener, listenerError chan<- error, handleClient func(clientTunnelProtocol string, clientConn net.Conn)) {
  483. meekClassifier := protocolClassifier{
  484. minBytesToMatch: 4,
  485. maxBytesToMatch: 4,
  486. match: func(b []byte) bool {
  487. // NOTE: HTTP transforms are only applied to plain HTTP
  488. // meek so they are not a concern here.
  489. return bytes.Contains(b, []byte("POST"))
  490. },
  491. }
  492. tlsClassifier := protocolClassifier{
  493. // NOTE: technically +1 not needed if detectors are evaluated
  494. // in order by index in classifier array, which they are.
  495. minBytesToMatch: meekClassifier.maxBytesToMatch + 1,
  496. maxBytesToMatch: meekClassifier.maxBytesToMatch + 1,
  497. match: func(b []byte) bool {
  498. return len(b) > 4 // if not classified as meek, then tls
  499. },
  500. }
  501. listener, err := ListenTLSTunnel(sshServer.support, sshListener.Listener, sshListener.tunnelProtocol, sshListener.port)
  502. if err != nil {
  503. select {
  504. case listenerError <- errors.Trace(err):
  505. default:
  506. }
  507. return
  508. }
  509. mux, listeners := newProtocolDemux(context.Background(), listener, []protocolClassifier{meekClassifier, tlsClassifier}, sshServer.support.Config.sshHandshakeTimeout)
  510. var wg sync.WaitGroup
  511. wg.Add(1)
  512. go func() {
  513. // handle shutdown gracefully
  514. defer wg.Done()
  515. <-sshServer.shutdownBroadcast
  516. err := mux.Close()
  517. if err != nil {
  518. log.WithTraceFields(LogFields{"error": err}).Error("close failed")
  519. }
  520. }()
  521. wg.Add(1)
  522. go func() {
  523. // start demultiplexing TLS-OSSH and meek HTTPS connections
  524. defer wg.Done()
  525. err := mux.run()
  526. if err != nil {
  527. select {
  528. case listenerError <- errors.Trace(err):
  529. default:
  530. }
  531. return
  532. }
  533. }()
  534. wg.Add(1)
  535. go func() {
  536. // start handling TLS-OSSH connections as they are demultiplexed
  537. defer wg.Done()
  538. // Override the listener tunnel protocol to report TLS-OSSH instead.
  539. runListener(listeners[1], sshServer.shutdownBroadcast, listenerError, protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH, handleClient)
  540. }()
  541. wg.Add(1)
  542. go func() {
  543. // start handling meek HTTPS connections as they are
  544. // demultiplexed
  545. defer wg.Done()
  546. meekServer, err := NewMeekServer(
  547. sshServer.support,
  548. listeners[0],
  549. sshListener.tunnelProtocol,
  550. sshListener.port,
  551. false,
  552. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  553. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  554. true,
  555. handleClient,
  556. sshServer.shutdownBroadcast)
  557. if err == nil {
  558. err = meekServer.Run()
  559. }
  560. if err != nil {
  561. select {
  562. case listenerError <- errors.Trace(err):
  563. default:
  564. }
  565. return
  566. }
  567. }()
  568. wg.Wait()
  569. }
  570. func runListener(listener net.Listener, shutdownBroadcast <-chan struct{}, listenerError chan<- error, overrideTunnelProtocol string, handleClient func(clientTunnelProtocol string, clientConn net.Conn)) {
  571. for {
  572. conn, err := listener.Accept()
  573. select {
  574. case <-shutdownBroadcast:
  575. if err == nil {
  576. conn.Close()
  577. }
  578. return
  579. default:
  580. }
  581. if err != nil {
  582. if e, ok := err.(net.Error); ok && e.Temporary() {
  583. log.WithTraceFields(LogFields{"error": err}).Error("accept failed")
  584. // Temporary error, keep running
  585. continue
  586. } else if std_errors.Is(err, errRestrictedProvider) {
  587. log.WithTraceFields(LogFields{"error": err}).Error("accept rejected client")
  588. // Restricted provider, keep running
  589. continue
  590. }
  591. select {
  592. case listenerError <- errors.Trace(err):
  593. default:
  594. }
  595. return
  596. }
  597. handleClient(overrideTunnelProtocol, conn)
  598. }
  599. }
  600. // An accepted client has completed a direct TCP or meek connection and has a net.Conn. Registration
  601. // is for tracking the number of connections.
  602. func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {
  603. sshServer.clientsMutex.Lock()
  604. defer sshServer.clientsMutex.Unlock()
  605. if sshServer.acceptedClientCounts[tunnelProtocol] == nil {
  606. sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)
  607. }
  608. sshServer.acceptedClientCounts[tunnelProtocol][region] += 1
  609. }
  610. func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {
  611. sshServer.clientsMutex.Lock()
  612. defer sshServer.clientsMutex.Unlock()
  613. sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1
  614. }
  615. // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
  616. // for tracking the number of fully established clients and for maintaining a list of running
  617. // clients (for stopping at shutdown time).
  618. func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
  619. sshServer.clientsMutex.Lock()
  620. if sshServer.stoppingClients {
  621. sshServer.clientsMutex.Unlock()
  622. return false
  623. }
  624. // In the case of a duplicate client sessionID, the previous client is closed.
  625. // - Well-behaved clients generate a random sessionID that should be unique (won't
  626. // accidentally conflict) and hard to guess (can't be targeted by a malicious
  627. // client).
  628. // - Clients reuse the same sessionID when a tunnel is unexpectedly disconnected
  629. // and reestablished. In this case, when the same server is selected, this logic
  630. // will be hit; closing the old, dangling client is desirable.
  631. // - Multi-tunnel clients should not normally use one server for multiple tunnels.
  632. existingClient := sshServer.clients[client.sessionID]
  633. sshServer.clientsMutex.Unlock()
  634. if existingClient != nil {
  635. // This case is expected to be common, and so logged at the lowest severity
  636. // level.
  637. log.WithTrace().Debug(
  638. "stopping existing client with duplicate session ID")
  639. existingClient.stop()
  640. // Block until the existingClient is fully terminated. This is necessary to
  641. // avoid this scenario:
  642. // - existingClient is invoking handshakeAPIRequestHandler
  643. // - sshServer.clients[client.sessionID] is updated to point to new client
  644. // - existingClient's handshakeAPIRequestHandler invokes
  645. // SetClientHandshakeState but sets the handshake parameters for new
  646. // client
  647. // - as a result, the new client handshake will fail (only a single handshake
  648. // is permitted) and the new client server_tunnel log will contain an
  649. // invalid mix of existing/new client fields
  650. //
  651. // Once existingClient.awaitStopped returns, all existingClient port
  652. // forwards and request handlers have terminated, so no API handler, either
  653. // tunneled web API or SSH API, will remain and it is safe to point
  654. // sshServer.clients[client.sessionID] to the new client.
  655. // Limitation: this scenario remains possible with _untunneled_ web API
  656. // requests.
  657. //
  658. // Blocking also ensures existingClient.releaseAuthorizations is invoked before
  659. // the new client attempts to submit the same authorizations.
  660. //
  661. // Perform blocking awaitStopped operation outside the
  662. // sshServer.clientsMutex mutex to avoid blocking all other clients for the
  663. // duration. We still expect and require that the stop process completes
  664. // rapidly, e.g., does not block on network I/O, allowing the new client
  665. // connection to proceed without delay.
  666. //
  667. // In addition, operations triggered by stop, and which must complete before
  668. // awaitStopped returns, will attempt to lock sshServer.clientsMutex,
  669. // including unregisterEstablishedClient.
  670. existingClient.awaitStopped()
  671. }
  672. sshServer.clientsMutex.Lock()
  673. defer sshServer.clientsMutex.Unlock()
  674. // existingClient's stop will have removed it from sshServer.clients via
  675. // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should
  676. // be nil -- unless yet another client instance using the same sessionID has
  677. // connected in the meantime while awaiting existingClient stop. In this
  678. // case, it's not clear which is the most recent connection from the client,
  679. // so instead of this connection terminating more peers, it aborts.
  680. if sshServer.clients[client.sessionID] != nil {
  681. // As this is expected to be rare case, it's logged at a higher severity
  682. // level.
  683. log.WithTrace().Warning(
  684. "aborting new client with duplicate session ID")
  685. return false
  686. }
  687. sshServer.clients[client.sessionID] = client
  688. return true
  689. }
  690. func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
  691. sshServer.clientsMutex.Lock()
  692. registeredClient := sshServer.clients[client.sessionID]
  693. // registeredClient will differ from client when client is the existingClient
  694. // terminated in registerEstablishedClient. In that case, registeredClient
  695. // remains connected, and the sshServer.clients entry should be retained.
  696. if registeredClient == client {
  697. delete(sshServer.clients, client.sessionID)
  698. }
  699. sshServer.clientsMutex.Unlock()
  700. client.stop()
  701. }
// UpstreamStats maps a stat name to its value. getLoadStats populates it
// with int64 counters and, for the "dns_*" names, map[string]int64 values.
type UpstreamStats map[string]interface{}

// ProtocolStats is indexed as [<protocol or ALL>][<stat name>] -> count.
type ProtocolStats map[string]map[string]interface{}

// RegionStats is indexed as [<region>][<protocol or ALL>][<stat name>] -> count.
type RegionStats map[string]map[string]map[string]interface{}
// getLoadStats builds a snapshot of upstream, per-protocol, and per-region
// load statistics from the accepted client counts and the currently
// registered clients.
//
// In addition to returning the snapshot, each call mutates every registered
// client: it updates the client's peak metrics and resets the client's
// quality metrics counters. sshServer.clientsMutex is held for the whole
// call, and each client is locked while it is being read and updated.
func (sshServer *sshServer) getLoadStats() (
	UpstreamStats, ProtocolStats, RegionStats) {

	sshServer.clientsMutex.Lock()
	defer sshServer.clientsMutex.Unlock()

	// Explicitly populate with zeros to ensure 0 counts in log messages.

	zeroClientStats := func() map[string]interface{} {
		stats := make(map[string]interface{})
		stats["accepted_clients"] = int64(0)
		stats["established_clients"] = int64(0)
		return stats
	}

	// Due to hot reload and changes to the underlying system configuration, the
	// set of resolver IPs may change between getLoadStats calls, so this
	// enumeration for zeroing is a best effort.
	resolverIPs := sshServer.support.DNSResolver.GetAll()

	// Fields which are primarily concerned with upstream/egress performance.
	zeroUpstreamStats := func() map[string]interface{} {
		stats := make(map[string]interface{})
		stats["dialing_tcp_port_forwards"] = int64(0)
		stats["tcp_port_forwards"] = int64(0)
		stats["total_tcp_port_forwards"] = int64(0)
		stats["udp_port_forwards"] = int64(0)
		stats["total_udp_port_forwards"] = int64(0)
		stats["tcp_port_forward_dialed_count"] = int64(0)
		stats["tcp_port_forward_dialed_duration"] = int64(0)
		stats["tcp_port_forward_failed_count"] = int64(0)
		stats["tcp_port_forward_failed_duration"] = int64(0)
		stats["tcp_port_forward_rejected_dialing_limit_count"] = int64(0)
		stats["tcp_port_forward_rejected_disallowed_count"] = int64(0)
		stats["udp_port_forward_rejected_disallowed_count"] = int64(0)
		stats["tcp_ipv4_port_forward_dialed_count"] = int64(0)
		stats["tcp_ipv4_port_forward_dialed_duration"] = int64(0)
		stats["tcp_ipv4_port_forward_failed_count"] = int64(0)
		stats["tcp_ipv4_port_forward_failed_duration"] = int64(0)
		stats["tcp_ipv6_port_forward_dialed_count"] = int64(0)
		stats["tcp_ipv6_port_forward_dialed_duration"] = int64(0)
		stats["tcp_ipv6_port_forward_failed_count"] = int64(0)
		stats["tcp_ipv6_port_forward_failed_duration"] = int64(0)

		// Each DNS stat is a map keyed by "ALL" plus the string form of each
		// currently known resolver IP.
		zeroDNSStats := func() map[string]int64 {
			m := map[string]int64{"ALL": 0}
			for _, resolverIP := range resolverIPs {
				m[resolverIP.String()] = 0
			}
			return m
		}

		stats["dns_count"] = zeroDNSStats()
		stats["dns_duration"] = zeroDNSStats()
		stats["dns_failed_count"] = zeroDNSStats()
		stats["dns_failed_duration"] = zeroDNSStats()
		return stats
	}

	// Zeroed client stats for "ALL" and for every protocol configured in
	// Config.TunnelProtocolPorts, plus TLS-OSSH when a configured protocol
	// uses the TLS demux.
	zeroProtocolStats := func() map[string]map[string]interface{} {
		stats := make(map[string]map[string]interface{})
		stats["ALL"] = zeroClientStats()
		for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
			stats[tunnelProtocol] = zeroClientStats()

			if sshServer.tunnelProtocolUsesTLSDemux(tunnelProtocol) {
				stats[protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH] = zeroClientStats()
			}
		}
		return stats
	}

	// addInt64 adds value to the int64 stored under name. The unchecked type
	// assertion panics if name is absent (including when stats is a nil map)
	// or holds a non-int64 value.
	// NOTE(review): callers below index protocolStats and regionStats by a
	// client-derived tunnelProtocol; this relies on that protocol always
	// being a key created by zeroProtocolStats -- confirm.
	addInt64 := func(stats map[string]interface{}, name string, value int64) {
		stats[name] = stats[name].(int64) + value
	}

	upstreamStats := zeroUpstreamStats()

	// [<protocol or ALL>][<stat name>] -> count
	protocolStats := zeroProtocolStats()

	// [<region>][<protocol or ALL>][<stat name>] -> count
	regionStats := make(RegionStats)

	// Note: as currently tracked/counted, each established client is also an accepted client

	for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {
		for region, acceptedClientCount := range regionAcceptedClientCounts {

			// Only positive counts contribute; region entries are created
			// on demand.
			if acceptedClientCount > 0 {
				if regionStats[region] == nil {
					regionStats[region] = zeroProtocolStats()
				}

				addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)
				addInt64(protocolStats[tunnelProtocol], "accepted_clients", acceptedClientCount)
				addInt64(regionStats[region]["ALL"], "accepted_clients", acceptedClientCount)
				addInt64(regionStats[region][tunnelProtocol], "accepted_clients", acceptedClientCount)
			}
		}
	}

	for _, client := range sshServer.clients {

		client.Lock()

		tunnelProtocol := client.tunnelProtocol
		region := client.geoIPData.Country

		if regionStats[region] == nil {
			regionStats[region] = zeroProtocolStats()
		}

		for _, stats := range []map[string]interface{}{
			protocolStats["ALL"],
			protocolStats[tunnelProtocol],
			regionStats[region]["ALL"],
			regionStats[region][tunnelProtocol]} {

			addInt64(stats, "established_clients", 1)
		}

		// Note:
		// - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
		// - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful

		addInt64(upstreamStats, "dialing_tcp_port_forwards",
			client.tcpTrafficState.concurrentDialingPortForwardCount)

		addInt64(upstreamStats, "tcp_port_forwards",
			client.tcpTrafficState.concurrentPortForwardCount)

		addInt64(upstreamStats, "total_tcp_port_forwards",
			client.tcpTrafficState.totalPortForwardCount)

		addInt64(upstreamStats, "udp_port_forwards",
			client.udpTrafficState.concurrentPortForwardCount)

		addInt64(upstreamStats, "total_udp_port_forwards",
			client.udpTrafficState.totalPortForwardCount)

		// Durations are reported in whole milliseconds.

		addInt64(upstreamStats, "tcp_port_forward_dialed_count",
			client.qualityMetrics.TCPPortForwardDialedCount)

		addInt64(upstreamStats, "tcp_port_forward_dialed_duration",
			int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))

		addInt64(upstreamStats, "tcp_port_forward_failed_count",
			client.qualityMetrics.TCPPortForwardFailedCount)

		addInt64(upstreamStats, "tcp_port_forward_failed_duration",
			int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))

		addInt64(upstreamStats, "tcp_port_forward_rejected_dialing_limit_count",
			client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)

		addInt64(upstreamStats, "tcp_port_forward_rejected_disallowed_count",
			client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)

		addInt64(upstreamStats, "udp_port_forward_rejected_disallowed_count",
			client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)

		addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_count",
			client.qualityMetrics.TCPIPv4PortForwardDialedCount)

		addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_duration",
			int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))

		addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_count",
			client.qualityMetrics.TCPIPv4PortForwardFailedCount)

		addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_duration",
			int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))

		addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_count",
			client.qualityMetrics.TCPIPv6PortForwardDialedCount)

		addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_duration",
			int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))

		addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_count",
			client.qualityMetrics.TCPIPv6PortForwardFailedCount)

		addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_duration",
			int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))

		// DNS metrics limitations:
		// - port forwards (sshClient.handleTCPChannel) don't know or log the resolver IP.
		// - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure,
		//   and there may be some delay before these code paths report DNS metrics.

		// Every client.qualityMetrics DNS map has an "ALL" entry.
		// NOTE(review): the totals below sum every key, "ALL" included. If
		// "ALL" is already an aggregate of the per-resolver entries, the
		// totals (and so the DNS sample size) count those lookups twice --
		// confirm against where qualityMetrics is populated.

		totalDNSCount := int64(0)
		totalDNSFailedCount := int64(0)

		for key, value := range client.qualityMetrics.DNSCount {
			upstreamStats["dns_count"].(map[string]int64)[key] += value
			totalDNSCount += value
		}

		for key, value := range client.qualityMetrics.DNSDuration {
			upstreamStats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
		}

		for key, value := range client.qualityMetrics.DNSFailedCount {
			upstreamStats["dns_failed_count"].(map[string]int64)[key] += value
			totalDNSFailedCount += value
		}

		for key, value := range client.qualityMetrics.DNSFailedDuration {
			upstreamStats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
		}

		// Update client peak failure rate metrics, to be recorded in
		// server_tunnel.
		//
		// Limitations:
		//
		// - This is a simple data sampling that doesn't require additional
		//   timers or tracking logic. Since the rates are calculated on
		//   getLoadStats events and using accumulated counts, these peaks
		//   only represent the highest failure rate within a
		//   Config.LoadMonitorPeriodSeconds non-sliding window. There is no
		//   sample recorded for short tunnels with no overlapping
		//   getLoadStats event.
		//
		// - There is no minimum sample window, as a getLoadStats event may
		//   occur immediately after a client first connects. This may be
		//   compensated for by adjusting
		//   Config.PeakUpstreamFailureRateMinimumSampleSize, so as to only
		//   consider failure rates with a larger number of samples.
		//
		// - Non-UDP "failures" are not currently tracked.
		//
		// NOTE(review): if the configured minimum sample size is 0 and a
		// client has no samples, the rate computed below is 0/0 (NaN) --
		// confirm the minimum is always positive.

		minimumSampleSize := int64(sshServer.support.Config.peakUpstreamFailureRateMinimumSampleSize)

		sampleSize := client.qualityMetrics.TCPPortForwardDialedCount +
			client.qualityMetrics.TCPPortForwardFailedCount

		if sampleSize >= minimumSampleSize {

			TCPPortForwardFailureRate := float64(client.qualityMetrics.TCPPortForwardFailedCount) /
				float64(sampleSize)

			// Record the first sample, or replace the stored peak (and its
			// sample size) when this rate is strictly higher.
			if client.peakMetrics.TCPPortForwardFailureRate == nil {

				client.peakMetrics.TCPPortForwardFailureRate = new(float64)
				*client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
				client.peakMetrics.TCPPortForwardFailureRateSampleSize = new(int64)
				*client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize

			} else if *client.peakMetrics.TCPPortForwardFailureRate < TCPPortForwardFailureRate {

				*client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
				*client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize
			}
		}

		sampleSize = totalDNSCount + totalDNSFailedCount

		if sampleSize >= minimumSampleSize {

			DNSFailureRate := float64(totalDNSFailedCount) / float64(sampleSize)

			if client.peakMetrics.DNSFailureRate == nil {

				client.peakMetrics.DNSFailureRate = new(float64)
				*client.peakMetrics.DNSFailureRate = DNSFailureRate
				client.peakMetrics.DNSFailureRateSampleSize = new(int64)
				*client.peakMetrics.DNSFailureRateSampleSize = sampleSize

			} else if *client.peakMetrics.DNSFailureRate < DNSFailureRate {

				*client.peakMetrics.DNSFailureRate = DNSFailureRate
				*client.peakMetrics.DNSFailureRateSampleSize = sampleSize
			}
		}

		// Reset quality metrics counters

		client.qualityMetrics.reset()

		client.Unlock()
	}

	for _, client := range sshServer.clients {

		client.Lock()

		// Update client peak proximate (same region) concurrently connected
		// (other clients) client metrics, to be recorded in server_tunnel.
		// This operation requires a second loop over sshServer.clients since
		// established_clients is calculated in the first loop.
		//
		// Limitations:
		//
		// - This is an approximation, not a true peak, as it only samples
		//   data every Config.LoadMonitorPeriodSeconds period. There is no
		//   sample recorded for short tunnels with no overlapping
		//   getLoadStats event.
		//
		// - The "-1" calculation counts all but the current client as other
		//   clients; it can be the case that the same client has a dangling
		//   accepted connection that has yet to time-out server side. Due to
		//   NAT, we can't determine if the client is the same based on
		//   network address. For established clients,
		//   registerEstablishedClient ensures that any previous connection
		//   is first terminated, although this is only for the same
		//   session_id. Concurrent proximate clients may be considered an
		//   exact number of other _network connections_, even from the same
		//   client.

		// regionStats[region] exists for every registered client: the first
		// loop created it while clientsMutex was held.
		region := client.geoIPData.Country
		stats := regionStats[region]["ALL"]

		n := stats["accepted_clients"].(int64) - 1
		if n >= 0 {
			if client.peakMetrics.concurrentProximateAcceptedClients == nil {

				client.peakMetrics.concurrentProximateAcceptedClients = new(int64)
				*client.peakMetrics.concurrentProximateAcceptedClients = n

			} else if *client.peakMetrics.concurrentProximateAcceptedClients < n {

				*client.peakMetrics.concurrentProximateAcceptedClients = n
			}
		}

		n = stats["established_clients"].(int64) - 1
		if n >= 0 {
			if client.peakMetrics.concurrentProximateEstablishedClients == nil {

				client.peakMetrics.concurrentProximateEstablishedClients = new(int64)
				*client.peakMetrics.concurrentProximateEstablishedClients = n

			} else if *client.peakMetrics.concurrentProximateEstablishedClients < n {

				*client.peakMetrics.concurrentProximateEstablishedClients = n
			}
		}

		client.Unlock()
	}

	return upstreamStats, protocolStats, regionStats
}
  968. func (sshServer *sshServer) getEstablishedClientCount() int {
  969. sshServer.clientsMutex.Lock()
  970. defer sshServer.clientsMutex.Unlock()
  971. establishedClients := len(sshServer.clients)
  972. return establishedClients
  973. }
  974. func (sshServer *sshServer) resetAllClientTrafficRules() {
  975. sshServer.clientsMutex.Lock()
  976. clients := make(map[string]*sshClient)
  977. for sessionID, client := range sshServer.clients {
  978. clients[sessionID] = client
  979. }
  980. sshServer.clientsMutex.Unlock()
  981. for _, client := range clients {
  982. client.setTrafficRules()
  983. }
  984. }
  985. func (sshServer *sshServer) resetAllClientOSLConfigs() {
  986. // Flush cached seed state. This has the same effect
  987. // and same limitations as calling setOSLConfig for
  988. // currently connected clients -- all progress is lost.
  989. sshServer.oslSessionCacheMutex.Lock()
  990. sshServer.oslSessionCache.Flush()
  991. sshServer.oslSessionCacheMutex.Unlock()
  992. sshServer.clientsMutex.Lock()
  993. clients := make(map[string]*sshClient)
  994. for sessionID, client := range sshServer.clients {
  995. clients[sessionID] = client
  996. }
  997. sshServer.clientsMutex.Unlock()
  998. for _, client := range clients {
  999. client.setOSLConfig()
  1000. }
  1001. }
  1002. func (sshServer *sshServer) setClientHandshakeState(
  1003. sessionID string,
  1004. state handshakeState,
  1005. authorizations []string) (*handshakeStateInfo, error) {
  1006. sshServer.clientsMutex.Lock()
  1007. client := sshServer.clients[sessionID]
  1008. sshServer.clientsMutex.Unlock()
  1009. if client == nil {
  1010. return nil, errors.TraceNew("unknown session ID")
  1011. }
  1012. handshakeStateInfo, err := client.setHandshakeState(
  1013. state, authorizations)
  1014. if err != nil {
  1015. return nil, errors.Trace(err)
  1016. }
  1017. return handshakeStateInfo, nil
  1018. }
  1019. func (sshServer *sshServer) getClientHandshaked(
  1020. sessionID string) (bool, bool, error) {
  1021. sshServer.clientsMutex.Lock()
  1022. client := sshServer.clients[sessionID]
  1023. sshServer.clientsMutex.Unlock()
  1024. if client == nil {
  1025. return false, false, errors.TraceNew("unknown session ID")
  1026. }
  1027. completed, exhausted := client.getHandshaked()
  1028. return completed, exhausted, nil
  1029. }
  1030. func (sshServer *sshServer) getClientDisableDiscovery(
  1031. sessionID string) (bool, error) {
  1032. sshServer.clientsMutex.Lock()
  1033. client := sshServer.clients[sessionID]
  1034. sshServer.clientsMutex.Unlock()
  1035. if client == nil {
  1036. return false, errors.TraceNew("unknown session ID")
  1037. }
  1038. return client.getDisableDiscovery(), nil
  1039. }
  1040. func (sshServer *sshServer) updateClientAPIParameters(
  1041. sessionID string,
  1042. apiParams common.APIParameters) error {
  1043. sshServer.clientsMutex.Lock()
  1044. client := sshServer.clients[sessionID]
  1045. sshServer.clientsMutex.Unlock()
  1046. if client == nil {
  1047. return errors.TraceNew("unknown session ID")
  1048. }
  1049. client.updateAPIParameters(apiParams)
  1050. return nil
  1051. }
  1052. func (sshServer *sshServer) revokeClientAuthorizations(sessionID string) {
  1053. sshServer.clientsMutex.Lock()
  1054. client := sshServer.clients[sessionID]
  1055. sshServer.clientsMutex.Unlock()
  1056. if client == nil {
  1057. return
  1058. }
  1059. // sshClient.handshakeState.authorizedAccessTypes is not cleared. Clearing
  1060. // authorizedAccessTypes may cause sshClient.logTunnel to fail to log
  1061. // access types. As the revocation may be due to legitimate use of an
  1062. // authorization in multiple sessions by a single client, useful metrics
  1063. // would be lost.
  1064. client.Lock()
  1065. client.handshakeState.authorizationsRevoked = true
  1066. client.Unlock()
  1067. // Select and apply new traffic rules, as filtered by the client's new
  1068. // authorization state.
  1069. client.setTrafficRules()
  1070. }
  1071. func (sshServer *sshServer) acceptClientDomainBytes(
  1072. sessionID string) (bool, error) {
  1073. sshServer.clientsMutex.Lock()
  1074. client := sshServer.clients[sessionID]
  1075. sshServer.clientsMutex.Unlock()
  1076. if client == nil {
  1077. return false, errors.TraceNew("unknown session ID")
  1078. }
  1079. return client.acceptDomainBytes(), nil
  1080. }
  1081. func (sshServer *sshServer) stopClients() {
  1082. sshServer.clientsMutex.Lock()
  1083. sshServer.stoppingClients = true
  1084. clients := sshServer.clients
  1085. sshServer.clients = make(map[string]*sshClient)
  1086. sshServer.clientsMutex.Unlock()
  1087. for _, client := range clients {
  1088. client.stop()
  1089. }
  1090. }
  1091. func (sshServer *sshServer) handleClient(
  1092. sshListener *sshListener, tunnelProtocol string, clientConn net.Conn) {
  1093. // Calling clientConn.RemoteAddr at this point, before any Read calls,
  1094. // satisfies the constraint documented in tapdance.Listen.
  1095. clientAddr := clientConn.RemoteAddr()
  1096. // Check if there were irregularities during the network connection
  1097. // establishment. When present, log and then behave as Obfuscated SSH does
  1098. // when the client fails to provide a valid seed message.
  1099. //
  1100. // One concrete irregular case is failure to send a PROXY protocol header for
  1101. // TAPDANCE-OSSH.
  1102. if indicator, ok := clientConn.(common.IrregularIndicator); ok {
  1103. tunnelErr := indicator.IrregularTunnelError()
  1104. if tunnelErr != nil {
  1105. logIrregularTunnel(
  1106. sshServer.support,
  1107. sshListener.tunnelProtocol,
  1108. sshListener.port,
  1109. common.IPAddressFromAddr(clientAddr),
  1110. errors.Trace(tunnelErr),
  1111. nil)
  1112. var afterFunc *time.Timer
  1113. if sshServer.support.Config.sshHandshakeTimeout > 0 {
  1114. afterFunc = time.AfterFunc(sshServer.support.Config.sshHandshakeTimeout, func() {
  1115. clientConn.Close()
  1116. })
  1117. }
  1118. io.Copy(ioutil.Discard, clientConn)
  1119. clientConn.Close()
  1120. afterFunc.Stop()
  1121. return
  1122. }
  1123. }
  1124. // Get any packet manipulation values from GetAppliedSpecName as soon as
  1125. // possible due to the expiring TTL.
  1126. serverPacketManipulation := ""
  1127. replayedServerPacketManipulation := false
  1128. if sshServer.support.Config.RunPacketManipulator &&
  1129. protocol.TunnelProtocolMayUseServerPacketManipulation(tunnelProtocol) {
  1130. // A meekConn has synthetic address values, including the original client
  1131. // address in cases where the client uses an upstream proxy to connect to
  1132. // Psiphon. For meekConn, and any other conn implementing
  1133. // UnderlyingTCPAddrSource, get the underlying TCP connection addresses.
  1134. //
  1135. // Limitation: a meek tunnel may consist of several TCP connections. The
  1136. // server_packet_manipulation metric will reflect the packet manipulation
  1137. // applied to the _first_ TCP connection only.
  1138. var localAddr, remoteAddr *net.TCPAddr
  1139. var ok bool
  1140. underlying, ok := clientConn.(common.UnderlyingTCPAddrSource)
  1141. if ok {
  1142. localAddr, remoteAddr, ok = underlying.GetUnderlyingTCPAddrs()
  1143. } else {
  1144. localAddr, ok = clientConn.LocalAddr().(*net.TCPAddr)
  1145. if ok {
  1146. remoteAddr, ok = clientConn.RemoteAddr().(*net.TCPAddr)
  1147. }
  1148. }
  1149. if ok {
  1150. specName, extraData, err := sshServer.support.PacketManipulator.
  1151. GetAppliedSpecName(localAddr, remoteAddr)
  1152. if err == nil {
  1153. serverPacketManipulation = specName
  1154. replayedServerPacketManipulation, _ = extraData.(bool)
  1155. }
  1156. }
  1157. }
  1158. geoIPData := sshServer.support.GeoIPService.Lookup(
  1159. common.IPAddressFromAddr(clientAddr))
  1160. sshServer.registerAcceptedClient(tunnelProtocol, geoIPData.Country)
  1161. defer sshServer.unregisterAcceptedClient(tunnelProtocol, geoIPData.Country)
  1162. // When configured, enforce a cap on the number of concurrent SSH
  1163. // handshakes. This limits load spikes on busy servers when many clients
  1164. // attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,
  1165. // to acquire; waiting will avoid immediately creating more load on another
  1166. // server in the network when the client tries a new candidate. Disconnect the
  1167. // client when that wait time is exceeded.
  1168. //
  1169. // This mechanism limits memory allocations and CPU usage associated with the
  1170. // SSH handshake. At this point, new direct TCP connections or new meek
  1171. // connections, with associated resource usage, are already established. Those
  1172. // connections are expected to be rate or load limited using other mechanisms.
  1173. //
  1174. // TODO:
  1175. //
  1176. // - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in
  1177. // sshClient.run, since the client is also applying an SSH handshake timeout
  1178. // and won't exclude time spent waiting.
  1179. // - each call to sshServer.handleClient (in sshServer.runListener) is invoked
  1180. // in its own goroutine, but shutdown doesn't synchronously await these
  1181. // goroutnes. Once this is synchronizes, the following context.WithTimeout
  1182. // should use an sshServer parent context to ensure blocking acquires
  1183. // interrupt immediately upon shutdown.
  1184. var onSSHHandshakeFinished func()
  1185. if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {
  1186. ctx, cancelFunc := context.WithTimeout(
  1187. context.Background(),
  1188. sshServer.support.Config.sshBeginHandshakeTimeout)
  1189. defer cancelFunc()
  1190. err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)
  1191. if err != nil {
  1192. clientConn.Close()
  1193. // This is a debug log as the only possible error is context timeout.
  1194. log.WithTraceFields(LogFields{"error": err}).Debug(
  1195. "acquire SSH handshake semaphore failed")
  1196. return
  1197. }
  1198. onSSHHandshakeFinished = func() {
  1199. sshServer.concurrentSSHHandshakes.Release(1)
  1200. }
  1201. }
  1202. sshClient := newSshClient(
  1203. sshServer,
  1204. sshListener,
  1205. tunnelProtocol,
  1206. serverPacketManipulation,
  1207. replayedServerPacketManipulation,
  1208. clientAddr,
  1209. geoIPData)
  1210. // sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
  1211. // in any error case; or, as soon as the SSH handshake phase has successfully
  1212. // completed.
  1213. sshClient.run(clientConn, onSSHHandshakeFinished)
  1214. }
  1215. func (sshServer *sshServer) monitorPortForwardDialError(err error) {
  1216. // "err" is the error returned from a failed TCP or UDP port
  1217. // forward dial. Certain system error codes indicate low resource
  1218. // conditions: insufficient file descriptors, ephemeral ports, or
  1219. // memory. For these cases, log an alert.
  1220. // TODO: also temporarily suspend new clients
  1221. // Note: don't log net.OpError.Error() as the full error string
  1222. // may contain client destination addresses.
  1223. opErr, ok := err.(*net.OpError)
  1224. if ok {
  1225. if opErr.Err == syscall.EADDRNOTAVAIL ||
  1226. opErr.Err == syscall.EAGAIN ||
  1227. opErr.Err == syscall.ENOMEM ||
  1228. opErr.Err == syscall.EMFILE ||
  1229. opErr.Err == syscall.ENFILE {
  1230. log.WithTraceFields(
  1231. LogFields{"error": opErr.Err}).Error(
  1232. "port forward dial failed due to unavailable resource")
  1233. }
  1234. }
  1235. }
  1236. // tunnelProtocolUsesTLSDemux returns true if the server demultiplexes the given
  1237. // protocol and TLS-OSSH over the same port.
  1238. func (sshServer *sshServer) tunnelProtocolUsesTLSDemux(tunnelProtocol string) bool {
  1239. // Only use meek/TLS-OSSH demux if unfronted meek HTTPS with non-legacy passthrough.
  1240. if protocol.TunnelProtocolUsesMeekHTTPS(tunnelProtocol) && !protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
  1241. _, passthroughEnabled := sshServer.support.Config.TunnelProtocolPassthroughAddresses[tunnelProtocol]
  1242. return passthroughEnabled && !sshServer.support.Config.LegacyPassthrough
  1243. }
  1244. return false
  1245. }
// sshClient holds the state for a single client connection. An sshClient is
// constructed by newSshClient, which populates the identifying fields
// (sshServer, sshListener, tunnelProtocol, packet manipulation values,
// clientAddr, geoIPData), the run context and its cancel function, the
// stopped channel, the alert request queue/map, and the per-protocol
// availablePortForwardCond values.
//
// The embedded sync.Mutex is taken by code in this file around accesses to
// fields including handshakeState, peakMetrics, and the tcpTrafficState and
// udpTrafficState byte counters.
// NOTE(review): the complete set of fields guarded by the mutex is not
// established by the code visible here -- confirm before relying on it.
//
// The stopped channel is closed when run returns.
type sshClient struct {
	sync.Mutex
	sshServer                            *sshServer
	sshListener                          *sshListener
	tunnelProtocol                       string
	sshConn                              ssh.Conn
	throttledConn                        *common.ThrottledConn
	serverPacketManipulation             string
	replayedServerPacketManipulation     bool
	clientAddr                           net.Addr
	geoIPData                            GeoIPData
	sessionID                            string
	isFirstTunnelInSession               bool
	supportsServerRequests               bool
	handshakeState                       handshakeState
	udpgwChannelHandler                  *udpgwPortForwardMultiplexer
	totalUdpgwChannelCount               int
	packetTunnelChannel                  ssh.Channel
	totalPacketTunnelChannelCount        int
	trafficRules                         TrafficRules
	tcpTrafficState                      trafficState
	udpTrafficState                      trafficState
	qualityMetrics                       *qualityMetrics
	tcpPortForwardLRU                    *common.LRUConns
	oslClientSeedState                   *osl.ClientSeedState
	signalIssueSLOKs                     chan struct{}
	runCtx                               context.Context
	stopRunning                          context.CancelFunc
	stopped                              chan struct{}
	tcpPortForwardDialingAvailableSignal context.CancelFunc
	releaseAuthorizations                func()
	stopTimer                            *time.Timer
	preHandshakeRandomStreamMetrics      randomStreamMetrics
	postHandshakeRandomStreamMetrics     randomStreamMetrics
	sendAlertRequests                    chan protocol.AlertRequest
	sentAlertRequests                    map[string]bool
	peakMetrics                          peakMetrics
	destinationBytesMetricsASN           string
	tcpDestinationBytesMetrics           destinationBytesMetrics
	udpDestinationBytesMetrics           destinationBytesMetrics
}
// trafficState holds byte counters and port forward counters for one class of
// traffic. sshClient keeps two instances: tcpTrafficState and
// udpTrafficState.
//
// availablePortForwardCond is initialized in newSshClient with its own
// dedicated sync.Mutex.
// NOTE(review): the exact update rules for the concurrent/peak/total counters
// are defined elsewhere in the file -- confirm there.
type trafficState struct {
	bytesUp                               int64
	bytesDown                             int64
	concurrentDialingPortForwardCount     int64
	peakConcurrentDialingPortForwardCount int64
	concurrentPortForwardCount            int64
	peakConcurrentPortForwardCount        int64
	totalPortForwardCount                 int64
	availablePortForwardCond              *sync.Cond
}
// randomStreamMetrics holds counters for random stream activity. sshClient
// keeps two instances: preHandshakeRandomStreamMetrics and
// postHandshakeRandomStreamMetrics.
// NOTE(review): presumably upstreamBytes/downstreamBytes are requested amounts
// and receivedUpstreamBytes/sentDownstreamBytes are the amounts actually
// transferred -- confirm against the code that updates these fields.
type randomStreamMetrics struct {
	count                 int64
	upstreamBytes         int64
	receivedUpstreamBytes int64
	downstreamBytes       int64
	sentDownstreamBytes   int64
}
// peakMetrics holds per-client peak values. All fields are pointers.
//
// For concurrentProximateAcceptedClients and
// concurrentProximateEstablishedClients, nil means no value has been recorded
// yet: the code that updates them allocates on the first sample and
// thereafter only replaces the stored value with a larger one.
// NOTE(review): the remaining fields presumably follow the same nil-until-set
// convention -- confirm against the code that updates them.
type peakMetrics struct {
	concurrentProximateAcceptedClients    *int64
	concurrentProximateEstablishedClients *int64
	TCPPortForwardFailureRate             *float64
	TCPPortForwardFailureRateSampleSize   *int64
	DNSFailureRate                        *float64
	DNSFailureRateSampleSize              *int64
}
// qualityMetrics records upstream TCP dial attempts and
// elapsed time. Elapsed time includes the full TCP handshake
// and, in aggregate, is a measure of the quality of the
// upstream link. These stats are recorded by each sshClient
// and then reported and reset in sshServer.getLoadStats().
//
// In addition to the TCP dial counters and durations (overall, and broken out
// by IPv4 and IPv6), the struct holds counts of rejected port forwards and
// four DNS maps. The DNS maps are allocated by newQualityMetrics and are
// emptied, but retained, by reset.
// NOTE(review): the meaning of the string key of the DNS maps is not
// established by the code visible here -- confirm against the code that
// records DNS metrics.
type qualityMetrics struct {
	TCPPortForwardDialedCount               int64
	TCPPortForwardDialedDuration            time.Duration
	TCPPortForwardFailedCount               int64
	TCPPortForwardFailedDuration            time.Duration
	TCPPortForwardRejectedDialingLimitCount int64
	TCPPortForwardRejectedDisallowedCount   int64
	UDPPortForwardRejectedDisallowedCount   int64
	TCPIPv4PortForwardDialedCount           int64
	TCPIPv4PortForwardDialedDuration        time.Duration
	TCPIPv4PortForwardFailedCount           int64
	TCPIPv4PortForwardFailedDuration        time.Duration
	TCPIPv6PortForwardDialedCount           int64
	TCPIPv6PortForwardDialedDuration        time.Duration
	TCPIPv6PortForwardFailedCount           int64
	TCPIPv6PortForwardFailedDuration        time.Duration
	DNSCount                                map[string]int64
	DNSDuration                             map[string]time.Duration
	DNSFailedCount                          map[string]int64
	DNSFailedDuration                       map[string]time.Duration
}
  1338. func newQualityMetrics() *qualityMetrics {
  1339. return &qualityMetrics{
  1340. DNSCount: make(map[string]int64),
  1341. DNSDuration: make(map[string]time.Duration),
  1342. DNSFailedCount: make(map[string]int64),
  1343. DNSFailedDuration: make(map[string]time.Duration),
  1344. }
  1345. }
  1346. func (q *qualityMetrics) reset() {
  1347. q.TCPPortForwardDialedCount = 0
  1348. q.TCPPortForwardDialedDuration = 0
  1349. q.TCPPortForwardFailedCount = 0
  1350. q.TCPPortForwardFailedDuration = 0
  1351. q.TCPPortForwardRejectedDialingLimitCount = 0
  1352. q.TCPPortForwardRejectedDisallowedCount = 0
  1353. q.UDPPortForwardRejectedDisallowedCount = 0
  1354. q.TCPIPv4PortForwardDialedCount = 0
  1355. q.TCPIPv4PortForwardDialedDuration = 0
  1356. q.TCPIPv4PortForwardFailedCount = 0
  1357. q.TCPIPv4PortForwardFailedDuration = 0
  1358. q.TCPIPv6PortForwardDialedCount = 0
  1359. q.TCPIPv6PortForwardDialedDuration = 0
  1360. q.TCPIPv6PortForwardFailedCount = 0
  1361. q.TCPIPv6PortForwardFailedDuration = 0
  1362. // Retain existing maps to avoid memory churn. The Go compiler optimizes map
  1363. // clearing operations of the following form.
  1364. for k := range q.DNSCount {
  1365. delete(q.DNSCount, k)
  1366. }
  1367. for k := range q.DNSDuration {
  1368. delete(q.DNSDuration, k)
  1369. }
  1370. for k := range q.DNSFailedCount {
  1371. delete(q.DNSFailedCount, k)
  1372. }
  1373. for k := range q.DNSFailedDuration {
  1374. delete(q.DNSFailedDuration, k)
  1375. }
  1376. }
// handshakeStateInfo is the result type of sshClient.setHandshakeState, as
// returned to callers via sshServer.setClientHandshakeState.
// NOTE(review): presumably the fields report the authorizations accepted for
// the client and the rate limits in effect after the handshake state is set --
// confirm against setHandshakeState.
type handshakeStateInfo struct {
	activeAuthorizationIDs   []string
	authorizedAccessTypes    []string
	upstreamBytesPerSecond   int64
	downstreamBytesPerSecond int64
}
// handshakeState holds per-client state associated with the API handshake.
// It is stored in sshClient.handshakeState and is also the type of the state
// argument passed to sshServer.setClientHandshakeState.
//
// authorizationsRevoked is set to true by
// sshServer.revokeClientAuthorizations, while holding the client lock;
// authorizedAccessTypes is deliberately not cleared on revocation.
// NOTE(review): the semantics of the remaining fields are established by
// setHandshakeState and related code outside this view -- confirm there.
type handshakeState struct {
	completed               bool
	apiProtocol             string
	apiParams               common.APIParameters
	activeAuthorizationIDs  []string
	authorizedAccessTypes   []string
	authorizationsRevoked   bool
	domainBytesChecksum     []byte
	establishedTunnelsCount int
	splitTunnelLookup       *splitTunnelLookup
	deviceRegion            string
}
// destinationBytesMetrics accumulates upstream and downstream byte counts.
// sshClient keeps two instances: tcpDestinationBytesMetrics and
// udpDestinationBytesMetrics.
//
// Both fields are accessed with sync/atomic operations: UpdateProgress adds
// to them with atomic.AddInt64, and getBytesUp/getBytesDown read them with
// atomic.LoadInt64.
type destinationBytesMetrics struct {
	bytesUp   int64
	bytesDown int64
}
  1399. func (d *destinationBytesMetrics) UpdateProgress(
  1400. downstreamBytes, upstreamBytes, _ int64) {
  1401. // Concurrency: UpdateProgress may be called without holding the sshClient
  1402. // lock; all accesses to bytesUp/bytesDown must use atomic operations.
  1403. atomic.AddInt64(&d.bytesUp, upstreamBytes)
  1404. atomic.AddInt64(&d.bytesDown, downstreamBytes)
  1405. }
  1406. func (d *destinationBytesMetrics) getBytesUp() int64 {
  1407. return atomic.LoadInt64(&d.bytesUp)
  1408. }
  1409. func (d *destinationBytesMetrics) getBytesDown() int64 {
  1410. return atomic.LoadInt64(&d.bytesDown)
  1411. }
// splitTunnelLookup is a set of regions supporting membership tests via its
// lookup method.
//
// newSplitTunnelLookup populates exactly one of the two fields: regionsLookup
// (a map) when the number of regions reaches stringLookupThreshold, otherwise
// regions (a slice). lookup consults regionsLookup when it is non-nil and
// falls back to scanning regions.
type splitTunnelLookup struct {
	regions       []string
	regionsLookup map[string]bool
}
  1416. func newSplitTunnelLookup(
  1417. ownRegion string,
  1418. otherRegions []string) (*splitTunnelLookup, error) {
  1419. length := len(otherRegions)
  1420. if ownRegion != "" {
  1421. length += 1
  1422. }
  1423. // This length check is a sanity check and prevents clients shipping
  1424. // excessively long lists which could impact performance.
  1425. if length > 250 {
  1426. return nil, errors.Tracef("too many regions: %d", length)
  1427. }
  1428. // Create map lookups for lists where the number of values to compare
  1429. // against exceeds a threshold where benchmarks show maps are faster than
  1430. // looping through a slice. Otherwise use a slice for lookups. In both
  1431. // cases, the input slice is no longer referenced.
  1432. if length >= stringLookupThreshold {
  1433. regionsLookup := make(map[string]bool)
  1434. if ownRegion != "" {
  1435. regionsLookup[ownRegion] = true
  1436. }
  1437. for _, region := range otherRegions {
  1438. regionsLookup[region] = true
  1439. }
  1440. return &splitTunnelLookup{
  1441. regionsLookup: regionsLookup,
  1442. }, nil
  1443. } else {
  1444. regions := []string{}
  1445. if ownRegion != "" && !common.Contains(otherRegions, ownRegion) {
  1446. regions = append(regions, ownRegion)
  1447. }
  1448. // TODO: check for other duplicate regions?
  1449. regions = append(regions, otherRegions...)
  1450. return &splitTunnelLookup{
  1451. regions: regions,
  1452. }, nil
  1453. }
  1454. }
  1455. func (lookup *splitTunnelLookup) lookup(region string) bool {
  1456. if lookup.regionsLookup != nil {
  1457. return lookup.regionsLookup[region]
  1458. } else {
  1459. return common.Contains(lookup.regions, region)
  1460. }
  1461. }
  1462. func newSshClient(
  1463. sshServer *sshServer,
  1464. sshListener *sshListener,
  1465. tunnelProtocol string,
  1466. serverPacketManipulation string,
  1467. replayedServerPacketManipulation bool,
  1468. clientAddr net.Addr,
  1469. geoIPData GeoIPData) *sshClient {
  1470. runCtx, stopRunning := context.WithCancel(context.Background())
  1471. // isFirstTunnelInSession is defaulted to true so that the pre-handshake
  1472. // traffic rules won't apply UnthrottleFirstTunnelOnly and negate any
  1473. // unthrottled bytes during the initial protocol negotiation.
  1474. client := &sshClient{
  1475. sshServer: sshServer,
  1476. sshListener: sshListener,
  1477. tunnelProtocol: tunnelProtocol,
  1478. serverPacketManipulation: serverPacketManipulation,
  1479. replayedServerPacketManipulation: replayedServerPacketManipulation,
  1480. clientAddr: clientAddr,
  1481. geoIPData: geoIPData,
  1482. isFirstTunnelInSession: true,
  1483. qualityMetrics: newQualityMetrics(),
  1484. tcpPortForwardLRU: common.NewLRUConns(),
  1485. signalIssueSLOKs: make(chan struct{}, 1),
  1486. runCtx: runCtx,
  1487. stopRunning: stopRunning,
  1488. stopped: make(chan struct{}),
  1489. sendAlertRequests: make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),
  1490. sentAlertRequests: make(map[string]bool),
  1491. }
  1492. client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1493. client.udpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1494. return client
  1495. }
  1496. func (sshClient *sshClient) run(
  1497. baseConn net.Conn, onSSHHandshakeFinished func()) {
  1498. // When run returns, the client has fully stopped, with all SSH state torn
  1499. // down and no port forwards or API requests in progress.
  1500. defer close(sshClient.stopped)
  1501. // onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
  1502. defer func() {
  1503. if onSSHHandshakeFinished != nil {
  1504. onSSHHandshakeFinished()
  1505. }
  1506. }()
  1507. // Set initial traffic rules, pre-handshake, based on currently known info.
  1508. sshClient.setTrafficRules()
  1509. conn := baseConn
  1510. // Wrap the base client connection with an ActivityMonitoredConn which will
  1511. // terminate the connection if no data is received before the deadline. This
  1512. // timeout is in effect for the entire duration of the SSH connection. Clients
  1513. // must actively use the connection or send SSH keep alive requests to keep
  1514. // the connection active. Writes are not considered reliable activity indicators
  1515. // due to buffering.
  1516. activityConn, err := common.NewActivityMonitoredConn(
  1517. conn,
  1518. SSH_CONNECTION_READ_DEADLINE,
  1519. false,
  1520. nil)
  1521. if err != nil {
  1522. conn.Close()
  1523. if !isExpectedTunnelIOError(err) {
  1524. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  1525. }
  1526. return
  1527. }
  1528. conn = activityConn
  1529. // Further wrap the connection with burst monitoring, when enabled.
  1530. //
  1531. // Limitation: burst parameters are fixed for the duration of the tunnel
  1532. // and do not change after a tactics hot reload.
  1533. var burstConn *common.BurstMonitoredConn
  1534. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.geoIPData)
  1535. if err != nil {
  1536. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  1537. "ServerTacticsParametersCache.Get failed")
  1538. return
  1539. }
  1540. if !p.IsNil() {
  1541. upstreamTargetBytes := int64(p.Int(parameters.ServerBurstUpstreamTargetBytes))
  1542. upstreamDeadline := p.Duration(parameters.ServerBurstUpstreamDeadline)
  1543. downstreamTargetBytes := int64(p.Int(parameters.ServerBurstDownstreamTargetBytes))
  1544. downstreamDeadline := p.Duration(parameters.ServerBurstDownstreamDeadline)
  1545. if (upstreamDeadline != 0 && upstreamTargetBytes != 0) ||
  1546. (downstreamDeadline != 0 && downstreamTargetBytes != 0) {
  1547. burstConn = common.NewBurstMonitoredConn(
  1548. conn,
  1549. true,
  1550. upstreamTargetBytes, upstreamDeadline,
  1551. downstreamTargetBytes, downstreamDeadline)
  1552. conn = burstConn
  1553. }
  1554. }
  1555. // Allow garbage collection.
  1556. p.Close()
  1557. // Further wrap the connection in a rate limiting ThrottledConn.
  1558. throttledConn := common.NewThrottledConn(conn, sshClient.rateLimits())
  1559. conn = throttledConn
  1560. // Replay of server-side parameters is set or extended after a new tunnel
  1561. // meets duration and bytes transferred targets. Set a timer now that expires
  1562. // shortly after the target duration. When the timer fires, check the time of
  1563. // last byte read (a read indicating a live connection with the client),
  1564. // along with total bytes transferred and set or extend replay if the targets
  1565. // are met.
  1566. //
  1567. // Both target checks are conservative: the tunnel may be healthy, but a byte
  1568. // may not have been read in the last second when the timer fires. Or bytes
  1569. // may be transferring, but not at the target level. Only clients that meet
  1570. // the strict targets at the single check time will trigger replay; however,
  1571. // this replay will impact all clients with similar GeoIP data.
  1572. //
  1573. // A deferred function cancels the timer and also increments the replay
  1574. // failure counter, which will ultimately clear replay parameters, when the
  1575. // tunnel fails before the API handshake is completed (this includes any
  1576. // liveness test).
  1577. //
  1578. // A tunnel which fails to meet the targets but successfully completes any
  1579. // liveness test and the API handshake is ignored in terms of replay scoring.
  1580. isReplayCandidate, replayWaitDuration, replayTargetDuration :=
  1581. sshClient.sshServer.support.ReplayCache.GetReplayTargetDuration(sshClient.geoIPData)
  1582. if isReplayCandidate {
  1583. getFragmentorSeed := func() *prng.Seed {
  1584. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  1585. if ok {
  1586. fragmentorSeed, _ := fragmentor.GetReplay()
  1587. return fragmentorSeed
  1588. }
  1589. return nil
  1590. }
  1591. setReplayAfterFunc := time.AfterFunc(
  1592. replayWaitDuration,
  1593. func() {
  1594. if activityConn.GetActiveDuration() >= replayTargetDuration {
  1595. sshClient.Lock()
  1596. bytesUp := sshClient.tcpTrafficState.bytesUp + sshClient.udpTrafficState.bytesUp
  1597. bytesDown := sshClient.tcpTrafficState.bytesDown + sshClient.udpTrafficState.bytesDown
  1598. sshClient.Unlock()
  1599. sshClient.sshServer.support.ReplayCache.SetReplayParameters(
  1600. sshClient.tunnelProtocol,
  1601. sshClient.geoIPData,
  1602. sshClient.serverPacketManipulation,
  1603. getFragmentorSeed(),
  1604. bytesUp,
  1605. bytesDown)
  1606. }
  1607. })
  1608. defer func() {
  1609. setReplayAfterFunc.Stop()
  1610. completed, _ := sshClient.getHandshaked()
  1611. if !completed {
  1612. // Count a replay failure case when a tunnel used replay parameters
  1613. // (excluding OSSH fragmentation, which doesn't use the ReplayCache) and
  1614. // failed to complete the API handshake.
  1615. replayedFragmentation := false
  1616. if sshClient.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  1617. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  1618. if ok {
  1619. _, replayedFragmentation = fragmentor.GetReplay()
  1620. }
  1621. }
  1622. usedReplay := replayedFragmentation || sshClient.replayedServerPacketManipulation
  1623. if usedReplay {
  1624. sshClient.sshServer.support.ReplayCache.FailedReplayParameters(
  1625. sshClient.tunnelProtocol,
  1626. sshClient.geoIPData,
  1627. sshClient.serverPacketManipulation,
  1628. getFragmentorSeed())
  1629. }
  1630. }
  1631. }()
  1632. }
  1633. // Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  1634. // respect shutdownBroadcast and implement a specific handshake timeout.
  1635. // The timeout is to reclaim network resources in case the handshake takes
  1636. // too long.
  1637. type sshNewServerConnResult struct {
  1638. obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn
  1639. sshConn *ssh.ServerConn
  1640. channels <-chan ssh.NewChannel
  1641. requests <-chan *ssh.Request
  1642. err error
  1643. }
  1644. resultChannel := make(chan *sshNewServerConnResult, 2)
  1645. var sshHandshakeAfterFunc *time.Timer
  1646. if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {
  1647. sshHandshakeAfterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
  1648. resultChannel <- &sshNewServerConnResult{err: std_errors.New("ssh handshake timeout")}
  1649. })
  1650. }
  1651. go func(baseConn, conn net.Conn) {
  1652. sshServerConfig := &ssh.ServerConfig{
  1653. PasswordCallback: sshClient.passwordCallback,
  1654. AuthLogCallback: sshClient.authLogCallback,
  1655. ServerVersion: sshClient.sshServer.support.Config.SSHServerVersion,
  1656. }
  1657. sshServerConfig.AddHostKey(sshClient.sshServer.sshHostKey)
  1658. var err error
  1659. if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  1660. // With Encrypt-then-MAC hash algorithms, packet length is
  1661. // transmitted in plaintext, which aids in traffic analysis;
  1662. // clients may still send Encrypt-then-MAC algorithms in their
  1663. // KEX_INIT message, but do not select these algorithms.
  1664. //
  1665. // The exception is TUNNEL_PROTOCOL_SSH, which is intended to appear
  1666. // like SSH on the wire.
  1667. sshServerConfig.NoEncryptThenMACHash = true
  1668. } else {
  1669. // For TUNNEL_PROTOCOL_SSH only, randomize KEX.
  1670. if sshClient.sshServer.support.Config.ObfuscatedSSHKey != "" {
  1671. sshServerConfig.KEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  1672. sshClient.sshServer.support.Config.ObfuscatedSSHKey)
  1673. if err != nil {
  1674. err = errors.Trace(err)
  1675. }
  1676. }
  1677. }
  1678. result := &sshNewServerConnResult{}
  1679. // Wrap the connection in an SSH deobfuscator when required.
  1680. if err == nil && protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  1681. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.geoIPData)
  1682. // Log error, but continue. A default prefix spec will be used by the server.
  1683. if err != nil {
  1684. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  1685. "ServerTacticsParametersCache.Get failed")
  1686. }
  1687. var osshPrefixEnableFragmentor bool = false
  1688. var serverOsshPrefixSpecs transforms.Specs = nil
  1689. var minDelay, maxDelay time.Duration
  1690. if !p.IsNil() {
  1691. osshPrefixEnableFragmentor = p.Bool(parameters.OSSHPrefixEnableFragmentor)
  1692. serverOsshPrefixSpecs = p.ProtocolTransformSpecs(parameters.ServerOSSHPrefixSpecs)
  1693. minDelay = p.Duration(parameters.OSSHPrefixSplitMinDelay)
  1694. maxDelay = p.Duration(parameters.OSSHPrefixSplitMaxDelay)
  1695. // Allow garbage collection.
  1696. p.Close()
  1697. }
  1698. // Note: NewServerObfuscatedSSHConn blocks on network I/O
  1699. // TODO: ensure this won't block shutdown
  1700. result.obfuscatedSSHConn, err = obfuscator.NewServerObfuscatedSSHConn(
  1701. conn,
  1702. sshClient.sshServer.support.Config.ObfuscatedSSHKey,
  1703. sshClient.sshServer.obfuscatorSeedHistory,
  1704. serverOsshPrefixSpecs,
  1705. func(clientIP string, err error, logFields common.LogFields) {
  1706. logIrregularTunnel(
  1707. sshClient.sshServer.support,
  1708. sshClient.sshListener.tunnelProtocol,
  1709. sshClient.sshListener.port,
  1710. clientIP,
  1711. errors.Trace(err),
  1712. LogFields(logFields))
  1713. })
  1714. if err != nil {
  1715. err = errors.Trace(err)
  1716. } else {
  1717. conn = result.obfuscatedSSHConn
  1718. }
  1719. // Set the OSSH prefix split config.
  1720. if err == nil && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  1721. err = result.obfuscatedSSHConn.SetOSSHPrefixSplitConfig(minDelay, maxDelay)
  1722. // Log error, but continue.
  1723. if err != nil {
  1724. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  1725. "SetOSSHPrefixSplitConfig failed")
  1726. }
  1727. }
  1728. // Seed the fragmentor, when present, with seed derived from initial
  1729. // obfuscator message. See tactics.Listener.Accept. This must precede
  1730. // ssh.NewServerConn to ensure fragmentor is seeded before downstream bytes
  1731. // are written.
  1732. if err == nil && sshClient.tunnelProtocol == protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  1733. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  1734. if ok {
  1735. var fragmentorPRNG *prng.PRNG
  1736. fragmentorPRNG, err = result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")
  1737. if err != nil {
  1738. err = errors.Trace(err)
  1739. } else {
  1740. fragmentor.SetReplay(fragmentorPRNG)
  1741. }
  1742. // Stops the fragmentor if disabled for prefixed OSSH streams.
  1743. if !osshPrefixEnableFragmentor && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  1744. fragmentor.StopFragmenting()
  1745. }
  1746. }
  1747. }
  1748. }
  1749. if err == nil {
  1750. result.sshConn, result.channels, result.requests, err =
  1751. ssh.NewServerConn(conn, sshServerConfig)
  1752. if err != nil {
  1753. err = errors.Trace(err)
  1754. }
  1755. }
  1756. result.err = err
  1757. resultChannel <- result
  1758. }(baseConn, conn)
  1759. var result *sshNewServerConnResult
  1760. select {
  1761. case result = <-resultChannel:
  1762. case <-sshClient.sshServer.shutdownBroadcast:
  1763. // Close() will interrupt an ongoing handshake
  1764. // TODO: wait for SSH handshake goroutines to exit before returning?
  1765. conn.Close()
  1766. return
  1767. }
  1768. if sshHandshakeAfterFunc != nil {
  1769. sshHandshakeAfterFunc.Stop()
  1770. }
  1771. if result.err != nil {
  1772. conn.Close()
  1773. // This is a Debug log due to noise. The handshake often fails due to I/O
  1774. // errors as clients frequently interrupt connections in progress when
  1775. // client-side load balancing completes a connection to a different server.
  1776. log.WithTraceFields(LogFields{"error": result.err}).Debug("SSH handshake failed")
  1777. return
  1778. }
  1779. // The SSH handshake has finished successfully; notify now to allow other
  1780. // blocked SSH handshakes to proceed.
  1781. if onSSHHandshakeFinished != nil {
  1782. onSSHHandshakeFinished()
  1783. }
  1784. onSSHHandshakeFinished = nil
  1785. sshClient.Lock()
  1786. sshClient.sshConn = result.sshConn
  1787. sshClient.throttledConn = throttledConn
  1788. sshClient.Unlock()
  1789. if !sshClient.sshServer.registerEstablishedClient(sshClient) {
  1790. conn.Close()
  1791. log.WithTrace().Warning("register failed")
  1792. return
  1793. }
  1794. sshClient.runTunnel(result.channels, result.requests)
  1795. // Note: sshServer.unregisterEstablishedClient calls sshClient.stop(),
  1796. // which also closes underlying transport Conn.
  1797. sshClient.sshServer.unregisterEstablishedClient(sshClient)
  1798. // Log tunnel metrics.
  1799. var additionalMetrics []LogFields
  1800. // Add activity and burst metrics.
  1801. //
  1802. // The reported duration is based on last confirmed data transfer, which for
  1803. // sshClient.activityConn.GetActiveDuration() is time of last read byte and
  1804. // not conn close time. This is important for protocols such as meek. For
  1805. // meek, the connection remains open until the HTTP session expires, which
  1806. // may be some time after the tunnel has closed. (The meek protocol has no
  1807. // allowance for signalling payload EOF, and even if it did the client may
  1808. // not have the opportunity to send a final request with an EOF flag set.)
  1809. activityMetrics := make(LogFields)
  1810. activityMetrics["start_time"] = activityConn.GetStartTime()
  1811. activityMetrics["duration"] = int64(activityConn.GetActiveDuration() / time.Millisecond)
  1812. additionalMetrics = append(additionalMetrics, activityMetrics)
  1813. if burstConn != nil {
  1814. // Any outstanding burst should be recorded by burstConn.Close which should
  1815. // be called by unregisterEstablishedClient.
  1816. additionalMetrics = append(
  1817. additionalMetrics, LogFields(burstConn.GetMetrics(activityConn.GetStartTime())))
  1818. }
  1819. // Some conns report additional metrics. Meek conns report resiliency
  1820. // metrics and fragmentor.Conns report fragmentor configs.
  1821. if metricsSource, ok := baseConn.(common.MetricsSource); ok {
  1822. additionalMetrics = append(
  1823. additionalMetrics, LogFields(metricsSource.GetMetrics()))
  1824. }
  1825. if result.obfuscatedSSHConn != nil {
  1826. additionalMetrics = append(
  1827. additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
  1828. }
  1829. // Add server-replay metrics.
  1830. replayMetrics := make(LogFields)
  1831. replayedFragmentation := false
  1832. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  1833. if ok {
  1834. _, replayedFragmentation = fragmentor.GetReplay()
  1835. }
  1836. replayMetrics["server_replay_fragmentation"] = replayedFragmentation
  1837. replayMetrics["server_replay_packet_manipulation"] = sshClient.replayedServerPacketManipulation
  1838. additionalMetrics = append(additionalMetrics, replayMetrics)
  1839. // Limitation: there's only one log per tunnel with bytes transferred
  1840. // metrics, so the byte count can't be attributed to a certain day for
  1841. // tunnels that remain connected for well over 24h. In practise, most
  1842. // tunnels are short-lived, especially on mobile devices.
  1843. sshClient.logTunnel(additionalMetrics)
  1844. // Transfer OSL seed state -- the OSL progress -- from the closing
  1845. // client to the session cache so the client can resume its progress
  1846. // if it reconnects to this same server.
  1847. // Note: following setOSLConfig order of locking.
  1848. sshClient.Lock()
  1849. if sshClient.oslClientSeedState != nil {
  1850. sshClient.sshServer.oslSessionCacheMutex.Lock()
  1851. sshClient.oslClientSeedState.Hibernate()
  1852. sshClient.sshServer.oslSessionCache.Set(
  1853. sshClient.sessionID, sshClient.oslClientSeedState, cache.DefaultExpiration)
  1854. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  1855. sshClient.oslClientSeedState = nil
  1856. }
  1857. sshClient.Unlock()
  1858. // Initiate cleanup of the GeoIP session cache. To allow for post-tunnel
  1859. // final status requests, the lifetime of cached GeoIP records exceeds the
  1860. // lifetime of the sshClient.
  1861. sshClient.sshServer.support.GeoIPService.MarkSessionCacheToExpire(sshClient.sessionID)
  1862. }
  1863. func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  1864. expectedSessionIDLength := 2 * protocol.PSIPHON_API_CLIENT_SESSION_ID_LENGTH
  1865. expectedSSHPasswordLength := 2 * SSH_PASSWORD_BYTE_LENGTH
  1866. var sshPasswordPayload protocol.SSHPasswordPayload
  1867. err := json.Unmarshal(password, &sshPasswordPayload)
  1868. if err != nil {
  1869. // Backwards compatibility case: instead of a JSON payload, older clients
  1870. // send the hex encoded session ID prepended to the SSH password.
  1871. // Note: there's an even older case where clients don't send any session ID,
  1872. // but that's no longer supported.
  1873. if len(password) == expectedSessionIDLength+expectedSSHPasswordLength {
  1874. sshPasswordPayload.SessionId = string(password[0:expectedSessionIDLength])
  1875. sshPasswordPayload.SshPassword = string(password[expectedSessionIDLength:])
  1876. } else {
  1877. return nil, errors.Tracef("invalid password payload for %q", conn.User())
  1878. }
  1879. }
  1880. if !isHexDigits(sshClient.sshServer.support.Config, sshPasswordPayload.SessionId) ||
  1881. len(sshPasswordPayload.SessionId) != expectedSessionIDLength {
  1882. return nil, errors.Tracef("invalid session ID for %q", conn.User())
  1883. }
  1884. userOk := (subtle.ConstantTimeCompare(
  1885. []byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1)
  1886. passwordOk := (subtle.ConstantTimeCompare(
  1887. []byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1)
  1888. if !userOk || !passwordOk {
  1889. return nil, errors.Tracef("invalid password for %q", conn.User())
  1890. }
  1891. sessionID := sshPasswordPayload.SessionId
  1892. // The GeoIP session cache will be populated if there was a previous tunnel
  1893. // with this session ID. This will be true up to GEOIP_SESSION_CACHE_TTL, which
  1894. // is currently much longer than the OSL session cache, another option to use if
  1895. // the GeoIP session cache is retired (the GeoIP session cache currently only
  1896. // supports legacy use cases).
  1897. isFirstTunnelInSession := !sshClient.sshServer.support.GeoIPService.InSessionCache(sessionID)
  1898. supportsServerRequests := common.Contains(
  1899. sshPasswordPayload.ClientCapabilities, protocol.CLIENT_CAPABILITY_SERVER_REQUESTS)
  1900. sshClient.Lock()
  1901. // After this point, these values are read-only as they are read
  1902. // without obtaining sshClient.Lock.
  1903. sshClient.sessionID = sessionID
  1904. sshClient.isFirstTunnelInSession = isFirstTunnelInSession
  1905. sshClient.supportsServerRequests = supportsServerRequests
  1906. geoIPData := sshClient.geoIPData
  1907. sshClient.Unlock()
  1908. // Store the GeoIP data associated with the session ID. This makes
  1909. // the GeoIP data available to the web server for web API requests.
  1910. // A cache that's distinct from the sshClient record is used to allow
  1911. // for or post-tunnel final status requests.
  1912. // If the client is reconnecting with the same session ID, this call
  1913. // will undo the expiry set by MarkSessionCacheToExpire.
  1914. sshClient.sshServer.support.GeoIPService.SetSessionCache(sessionID, geoIPData)
  1915. return nil, nil
  1916. }
  1917. func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  1918. if err != nil {
  1919. if method == "none" && err.Error() == "ssh: no auth passed yet" {
  1920. // In this case, the callback invocation is noise from auth negotiation
  1921. return
  1922. }
  1923. // Note: here we previously logged messages for fail2ban to act on. This is no longer
  1924. // done as the complexity outweighs the benefits.
  1925. //
  1926. // - The SSH credential is not secret -- it's in the server entry. Attackers targeting
  1927. // the server likely already have the credential. On the other hand, random scanning and
  1928. // brute forcing is mitigated with high entropy random passwords, rate limiting
  1929. // (implemented on the host via iptables), and limited capabilities (the SSH session can
  1930. // only port forward).
  1931. //
  1932. // - fail2ban coverage was inconsistent; in the case of an unfronted meek protocol through
  1933. // an upstream proxy, the remote address is the upstream proxy, which should not be blocked.
  1934. // The X-Forwarded-For header cant be used instead as it may be forged and used to get IPs
  1935. // deliberately blocked; and in any case fail2ban adds iptables rules which can only block
  1936. // by direct remote IP, not by original client IP. Fronted meek has the same iptables issue.
  1937. //
  1938. // Random scanning and brute forcing of port 22 will result in log noise. To mitigate this,
  1939. // not every authentication failure is logged. A summary log is emitted periodically to
  1940. // retain some record of this activity in case this is relevant to, e.g., a performance
  1941. // investigation.
  1942. atomic.AddInt64(&sshClient.sshServer.authFailedCount, 1)
  1943. lastAuthLog := monotime.Time(atomic.LoadInt64(&sshClient.sshServer.lastAuthLog))
  1944. if monotime.Since(lastAuthLog) > SSH_AUTH_LOG_PERIOD {
  1945. now := int64(monotime.Now())
  1946. if atomic.CompareAndSwapInt64(&sshClient.sshServer.lastAuthLog, int64(lastAuthLog), now) {
  1947. count := atomic.SwapInt64(&sshClient.sshServer.authFailedCount, 0)
  1948. log.WithTraceFields(
  1949. LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
  1950. }
  1951. }
  1952. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication failed")
  1953. } else {
  1954. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication success")
  1955. }
  1956. }
  1957. // stop signals the ssh connection to shutdown. After sshConn.Wait returns,
  1958. // the SSH connection has terminated but sshClient.run may still be running and
  1959. // in the process of exiting.
  1960. //
  1961. // The shutdown process must complete rapidly and not, e.g., block on network
  1962. // I/O, as newly connecting clients need to await stop completion of any
  1963. // existing connection that shares the same session ID.
  1964. func (sshClient *sshClient) stop() {
  1965. sshClient.sshConn.Close()
  1966. sshClient.sshConn.Wait()
  1967. }
  1968. // awaitStopped will block until sshClient.run has exited, at which point all
  1969. // worker goroutines associated with the sshClient, including any in-flight
  1970. // API handlers, will have exited.
  1971. func (sshClient *sshClient) awaitStopped() {
  1972. <-sshClient.stopped
  1973. }
  1974. // runTunnel handles/dispatches new channels and new requests from the client.
  1975. // When the SSH client connection closes, both the channels and requests channels
  1976. // will close and runTunnel will exit.
  1977. func (sshClient *sshClient) runTunnel(
  1978. channels <-chan ssh.NewChannel,
  1979. requests <-chan *ssh.Request) {
  1980. waitGroup := new(sync.WaitGroup)
  1981. // Start client SSH API request handler
  1982. waitGroup.Add(1)
  1983. go func() {
  1984. defer waitGroup.Done()
  1985. sshClient.handleSSHRequests(requests)
  1986. }()
  1987. // Start request senders
  1988. if sshClient.supportsServerRequests {
  1989. waitGroup.Add(1)
  1990. go func() {
  1991. defer waitGroup.Done()
  1992. sshClient.runOSLSender()
  1993. }()
  1994. waitGroup.Add(1)
  1995. go func() {
  1996. defer waitGroup.Done()
  1997. sshClient.runAlertSender()
  1998. }()
  1999. }
  2000. // Start the TCP port forward manager
  2001. // The queue size is set to the traffic rules (MaxTCPPortForwardCount +
  2002. // MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
  2003. // limits per client; when that value is not set, a default is used.
  2004. // A limitation: this queue size is set once and doesn't change, for this client,
  2005. // when traffic rules are reloaded.
  2006. queueSize := sshClient.getTCPPortForwardQueueSize()
  2007. if queueSize == 0 {
  2008. queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
  2009. }
  2010. newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
  2011. waitGroup.Add(1)
  2012. go func() {
  2013. defer waitGroup.Done()
  2014. sshClient.handleTCPPortForwards(waitGroup, newTCPPortForwards)
  2015. }()
  2016. // Handle new channel (port forward) requests from the client.
  2017. for newChannel := range channels {
  2018. switch newChannel.ChannelType() {
  2019. case protocol.RANDOM_STREAM_CHANNEL_TYPE:
  2020. sshClient.handleNewRandomStreamChannel(waitGroup, newChannel)
  2021. case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
  2022. sshClient.handleNewPacketTunnelChannel(waitGroup, newChannel)
  2023. case protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  2024. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  2025. // "direct-tcpip", except split tunnel channel rejections are disallowed
  2026. // even if the client has enabled split tunnel. This channel type allows
  2027. // the client to ensure tunneling for certain cases while split tunnel is
  2028. // enabled.
  2029. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, false, newTCPPortForwards)
  2030. case "direct-tcpip":
  2031. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, true, newTCPPortForwards)
  2032. default:
  2033. sshClient.rejectNewChannel(newChannel,
  2034. fmt.Sprintf("unknown or unsupported channel type: %s", newChannel.ChannelType()))
  2035. }
  2036. }
  2037. // The channel loop is interrupted by a client
  2038. // disconnect or by calling sshClient.stop().
  2039. // Stop the TCP port forward manager
  2040. close(newTCPPortForwards)
  2041. // Stop all other worker goroutines
  2042. sshClient.stopRunning()
  2043. if sshClient.sshServer.support.Config.RunPacketTunnel {
  2044. // PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
  2045. sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
  2046. sshClient.sessionID)
  2047. }
  2048. waitGroup.Wait()
  2049. sshClient.cleanupAuthorizations()
  2050. }
  2051. func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {
  2052. for request := range requests {
  2053. // Requests are processed serially; API responses must be sent in request order.
  2054. var responsePayload []byte
  2055. var err error
  2056. if request.Type == "[email protected]" {
  2057. // SSH keep alive round trips are used as speed test samples.
  2058. responsePayload, err = tactics.MakeSpeedTestResponse(
  2059. SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  2060. } else {
  2061. // All other requests are assumed to be API requests.
  2062. sshClient.Lock()
  2063. authorizedAccessTypes := sshClient.handshakeState.authorizedAccessTypes
  2064. sshClient.Unlock()
  2065. // Note: unlock before use is only safe as long as referenced sshClient data,
  2066. // such as slices in handshakeState, is read-only after initially set.
  2067. clientAddr := ""
  2068. if sshClient.clientAddr != nil {
  2069. clientAddr = sshClient.clientAddr.String()
  2070. }
  2071. responsePayload, err = sshAPIRequestHandler(
  2072. sshClient.sshServer.support,
  2073. clientAddr,
  2074. sshClient.geoIPData,
  2075. authorizedAccessTypes,
  2076. request.Type,
  2077. request.Payload)
  2078. }
  2079. if err == nil {
  2080. err = request.Reply(true, responsePayload)
  2081. } else {
  2082. log.WithTraceFields(LogFields{"error": err}).Warning("request failed")
  2083. err = request.Reply(false, nil)
  2084. }
  2085. if err != nil {
  2086. if !isExpectedTunnelIOError(err) {
  2087. log.WithTraceFields(LogFields{"error": err}).Warning("response failed")
  2088. }
  2089. }
  2090. }
  2091. }
  2092. type newTCPPortForward struct {
  2093. enqueueTime time.Time
  2094. hostToConnect string
  2095. portToConnect int
  2096. doSplitTunnel bool
  2097. newChannel ssh.NewChannel
  2098. }
  2099. func (sshClient *sshClient) handleTCPPortForwards(
  2100. waitGroup *sync.WaitGroup,
  2101. newTCPPortForwards chan *newTCPPortForward) {
  2102. // Lifecycle of a TCP port forward:
  2103. //
  2104. // 1. A "direct-tcpip" SSH request is received from the client.
  2105. //
  2106. // A new TCP port forward request is enqueued. The queue delivers TCP port
  2107. // forward requests to the TCP port forward manager, which enforces the TCP
  2108. // port forward dial limit.
  2109. //
  2110. // Enqueuing new requests allows for reading further SSH requests from the
  2111. // client without blocking when the dial limit is hit; this is to permit new
  2112. // UDP/udpgw port forwards to be restablished without delay. The maximum size
  2113. // of the queue enforces a hard cap on resources consumed by a client in the
  2114. // pre-dial phase. When the queue is full, new TCP port forwards are
  2115. // immediately rejected.
  2116. //
  2117. // 2. The TCP port forward manager dequeues the request.
  2118. //
  2119. // The manager calls dialingTCPPortForward(), which increments
  2120. // concurrentDialingPortForwardCount, and calls
  2121. // isTCPDialingPortForwardLimitExceeded() to check the concurrent dialing
  2122. // count.
  2123. //
  2124. // The manager enforces the concurrent TCP dial limit: when at the limit, the
  2125. // manager blocks waiting for the number of dials to drop below the limit before
  2126. // dispatching the request to handleTCPPortForward(), which will run in its own
  2127. // goroutine and will dial and relay the port forward.
  2128. //
  2129. // The block delays the current request and also halts dequeuing of subsequent
  2130. // requests and could ultimately cause requests to be immediately rejected if
  2131. // the queue fills. These actions are intended to apply back pressure when
  2132. // upstream network resources are impaired.
  2133. //
  2134. // The time spent in the queue is deducted from the port forward's dial timeout.
  2135. // The time spent blocking while at the dial limit is similarly deducted from
  2136. // the dial timeout. If the dial timeout has expired before the dial begins, the
  2137. // port forward is rejected and a stat is recorded.
  2138. //
  2139. // 3. handleTCPPortForward() performs the port forward dial and relaying.
  2140. //
  2141. // a. Dial the target, using the dial timeout remaining after queue and blocking
  2142. // time is deducted.
  2143. //
  2144. // b. If the dial fails, call abortedTCPPortForward() to decrement
  2145. // concurrentDialingPortForwardCount, freeing up a dial slot.
  2146. //
  2147. // c. If the dial succeeds, call establishedPortForward(), which decrements
  2148. // concurrentDialingPortForwardCount and increments concurrentPortForwardCount,
  2149. // the "established" port forward count.
  2150. //
  2151. // d. Check isPortForwardLimitExceeded(), which enforces the configurable limit on
  2152. // concurrentPortForwardCount, the number of _established_ TCP port forwards.
  2153. // If the limit is exceeded, the LRU established TCP port forward is closed and
  2154. // the newly established TCP port forward proceeds. This LRU logic allows some
  2155. // dangling resource consumption (e.g., TIME_WAIT) while providing a better
  2156. // experience for clients.
  2157. //
  2158. // e. Relay data.
  2159. //
  2160. // f. Call closedPortForward() which decrements concurrentPortForwardCount and
  2161. // records bytes transferred.
  2162. for newPortForward := range newTCPPortForwards {
  2163. remainingDialTimeout :=
  2164. time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
  2165. time.Since(newPortForward.enqueueTime)
  2166. if remainingDialTimeout <= 0 {
  2167. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2168. sshClient.rejectNewChannel(
  2169. newPortForward.newChannel, "TCP port forward timed out in queue")
  2170. continue
  2171. }
  2172. // Reserve a TCP dialing slot.
  2173. //
  2174. // TOCTOU note: important to increment counts _before_ checking limits; otherwise,
  2175. // the client could potentially consume excess resources by initiating many port
  2176. // forwards concurrently.
  2177. sshClient.dialingTCPPortForward()
  2178. // When max dials are in progress, wait up to remainingDialTimeout for dialing
  2179. // to become available. This blocks all dequeing.
  2180. if sshClient.isTCPDialingPortForwardLimitExceeded() {
  2181. blockStartTime := time.Now()
  2182. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  2183. sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
  2184. <-ctx.Done()
  2185. sshClient.setTCPPortForwardDialingAvailableSignal(nil)
  2186. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  2187. remainingDialTimeout -= time.Since(blockStartTime)
  2188. }
  2189. if remainingDialTimeout <= 0 {
  2190. // Release the dialing slot here since handleTCPChannel() won't be called.
  2191. sshClient.abortedTCPPortForward()
  2192. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2193. sshClient.rejectNewChannel(
  2194. newPortForward.newChannel, "TCP port forward timed out before dialing")
  2195. continue
  2196. }
  2197. // Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
  2198. // handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
  2199. // will deal with remainingDialTimeout <= 0.
  2200. waitGroup.Add(1)
  2201. go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
  2202. defer waitGroup.Done()
  2203. sshClient.handleTCPChannel(
  2204. remainingDialTimeout,
  2205. newPortForward.hostToConnect,
  2206. newPortForward.portToConnect,
  2207. newPortForward.doSplitTunnel,
  2208. newPortForward.newChannel)
  2209. }(remainingDialTimeout, newPortForward)
  2210. }
  2211. }
  2212. func (sshClient *sshClient) handleNewRandomStreamChannel(
  2213. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2214. // A random stream channel returns the requested number of bytes -- random
  2215. // bytes -- to the client while also consuming and discarding bytes sent
  2216. // by the client.
  2217. //
  2218. // One use case for the random stream channel is a liveness test that the
  2219. // client performs to confirm that the tunnel is live. As the liveness
  2220. // test is performed in the concurrent establishment phase, before
  2221. // selecting a single candidate for handshake, the random stream channel
  2222. // is available pre-handshake, albeit with additional restrictions.
  2223. //
  2224. // The random stream is subject to throttling in traffic rules; for
  2225. // unthrottled liveness tests, set EstablishmentRead/WriteBytesPerSecond as
  2226. // required. The random stream maximum count and response size cap mitigate
  2227. // clients abusing the facility to waste server resources.
  2228. //
  2229. // Like all other channels, this channel type is handled asynchronously,
  2230. // so it's possible to run at any point in the tunnel lifecycle.
  2231. //
  2232. // Up/downstream byte counts don't include SSH packet and request
  2233. // marshalling overhead.
  2234. var request protocol.RandomStreamRequest
  2235. err := json.Unmarshal(newChannel.ExtraData(), &request)
  2236. if err != nil {
  2237. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
  2238. return
  2239. }
  2240. if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2241. sshClient.rejectNewChannel(newChannel,
  2242. fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
  2243. return
  2244. }
  2245. if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2246. sshClient.rejectNewChannel(newChannel,
  2247. fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
  2248. return
  2249. }
  2250. var metrics *randomStreamMetrics
  2251. sshClient.Lock()
  2252. if !sshClient.handshakeState.completed {
  2253. metrics = &sshClient.preHandshakeRandomStreamMetrics
  2254. } else {
  2255. metrics = &sshClient.postHandshakeRandomStreamMetrics
  2256. }
  2257. countOk := true
  2258. if !sshClient.handshakeState.completed &&
  2259. metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
  2260. countOk = false
  2261. } else {
  2262. metrics.count++
  2263. }
  2264. sshClient.Unlock()
  2265. if !countOk {
  2266. sshClient.rejectNewChannel(newChannel, "max count exceeded")
  2267. return
  2268. }
  2269. channel, requests, err := newChannel.Accept()
  2270. if err != nil {
  2271. if !isExpectedTunnelIOError(err) {
  2272. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2273. }
  2274. return
  2275. }
  2276. go ssh.DiscardRequests(requests)
  2277. waitGroup.Add(1)
  2278. go func() {
  2279. defer waitGroup.Done()
  2280. upstream := new(sync.WaitGroup)
  2281. received := 0
  2282. sent := 0
  2283. if request.UpstreamBytes > 0 {
  2284. // Process streams concurrently to minimize elapsed time. This also
  2285. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  2286. upstream.Add(1)
  2287. go func() {
  2288. defer upstream.Done()
  2289. n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
  2290. received = int(n)
  2291. if err != nil {
  2292. if !isExpectedTunnelIOError(err) {
  2293. log.WithTraceFields(LogFields{"error": err}).Warning("receive failed")
  2294. }
  2295. }
  2296. }()
  2297. }
  2298. if request.DownstreamBytes > 0 {
  2299. n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
  2300. sent = int(n)
  2301. if err != nil {
  2302. if !isExpectedTunnelIOError(err) {
  2303. log.WithTraceFields(LogFields{"error": err}).Warning("send failed")
  2304. }
  2305. }
  2306. }
  2307. upstream.Wait()
  2308. sshClient.Lock()
  2309. metrics.upstreamBytes += int64(request.UpstreamBytes)
  2310. metrics.receivedUpstreamBytes += int64(received)
  2311. metrics.downstreamBytes += int64(request.DownstreamBytes)
  2312. metrics.sentDownstreamBytes += int64(sent)
  2313. sshClient.Unlock()
  2314. channel.Close()
  2315. }()
  2316. }
  2317. func (sshClient *sshClient) handleNewPacketTunnelChannel(
  2318. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2319. // packet tunnel channels are handled by the packet tunnel server
  2320. // component. Each client may have at most one packet tunnel channel.
  2321. if !sshClient.sshServer.support.Config.RunPacketTunnel {
  2322. sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
  2323. return
  2324. }
  2325. // Accept this channel immediately. This channel will replace any
  2326. // previously existing packet tunnel channel for this client.
  2327. packetTunnelChannel, requests, err := newChannel.Accept()
  2328. if err != nil {
  2329. if !isExpectedTunnelIOError(err) {
  2330. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2331. }
  2332. return
  2333. }
  2334. go ssh.DiscardRequests(requests)
  2335. sshClient.setPacketTunnelChannel(packetTunnelChannel)
  2336. // PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
  2337. // will stop packet tunnel workers for any previous packet tunnel channel.
  2338. checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2339. return sshClient.isPortForwardPermitted(portForwardTypeTCP, upstreamIPAddress, port)
  2340. }
  2341. checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2342. return sshClient.isPortForwardPermitted(portForwardTypeUDP, upstreamIPAddress, port)
  2343. }
  2344. checkAllowedDomainFunc := func(domain string) bool {
  2345. ok, _ := sshClient.isDomainPermitted(domain)
  2346. return ok
  2347. }
  2348. flowActivityUpdaterMaker := func(
  2349. isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
  2350. trafficType := portForwardTypeTCP
  2351. if !isTCP {
  2352. trafficType = portForwardTypeUDP
  2353. }
  2354. activityUpdaters := sshClient.getActivityUpdaters(trafficType, upstreamIPAddress)
  2355. flowUpdaters := make([]tun.FlowActivityUpdater, len(activityUpdaters))
  2356. for i, activityUpdater := range activityUpdaters {
  2357. flowUpdaters[i] = activityUpdater
  2358. }
  2359. return flowUpdaters
  2360. }
  2361. metricUpdater := func(
  2362. TCPApplicationBytesDown, TCPApplicationBytesUp,
  2363. UDPApplicationBytesDown, UDPApplicationBytesUp int64) {
  2364. sshClient.Lock()
  2365. sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
  2366. sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
  2367. sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
  2368. sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
  2369. sshClient.Unlock()
  2370. }
  2371. dnsQualityReporter := sshClient.updateQualityMetricsWithDNSResult
  2372. err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
  2373. sshClient.sessionID,
  2374. packetTunnelChannel,
  2375. checkAllowedTCPPortFunc,
  2376. checkAllowedUDPPortFunc,
  2377. checkAllowedDomainFunc,
  2378. flowActivityUpdaterMaker,
  2379. metricUpdater,
  2380. dnsQualityReporter)
  2381. if err != nil {
  2382. log.WithTraceFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
  2383. sshClient.setPacketTunnelChannel(nil)
  2384. }
  2385. }
  2386. func (sshClient *sshClient) handleNewTCPPortForwardChannel(
  2387. waitGroup *sync.WaitGroup,
  2388. newChannel ssh.NewChannel,
  2389. allowSplitTunnel bool,
  2390. newTCPPortForwards chan *newTCPPortForward) {
  2391. // udpgw client connections are dispatched immediately (clients use this for
  2392. // DNS, so it's essential to not block; and only one udpgw connection is
  2393. // retained at a time).
  2394. //
  2395. // All other TCP port forwards are dispatched via the TCP port forward
  2396. // manager queue.
  2397. // http://tools.ietf.org/html/rfc4254#section-7.2
  2398. var directTcpipExtraData struct {
  2399. HostToConnect string
  2400. PortToConnect uint32
  2401. OriginatorIPAddress string
  2402. OriginatorPort uint32
  2403. }
  2404. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  2405. if err != nil {
  2406. sshClient.rejectNewChannel(newChannel, "invalid extra data")
  2407. return
  2408. }
  2409. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  2410. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  2411. isUdpgwChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
  2412. sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
  2413. net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
  2414. if isUdpgwChannel {
  2415. // Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
  2416. // own worker goroutine.
  2417. waitGroup.Add(1)
  2418. go func(channel ssh.NewChannel) {
  2419. defer waitGroup.Done()
  2420. sshClient.handleUdpgwChannel(channel)
  2421. }(newChannel)
  2422. } else {
  2423. // Dispatch via TCP port forward manager. When the queue is full, the channel
  2424. // is immediately rejected.
  2425. //
  2426. // Split tunnel logic is enabled for this TCP port forward when the client
  2427. // has enabled split tunnel mode and the channel type allows it.
  2428. doSplitTunnel := sshClient.handshakeState.splitTunnelLookup != nil && allowSplitTunnel
  2429. tcpPortForward := &newTCPPortForward{
  2430. enqueueTime: time.Now(),
  2431. hostToConnect: directTcpipExtraData.HostToConnect,
  2432. portToConnect: int(directTcpipExtraData.PortToConnect),
  2433. doSplitTunnel: doSplitTunnel,
  2434. newChannel: newChannel,
  2435. }
  2436. select {
  2437. case newTCPPortForwards <- tcpPortForward:
  2438. default:
  2439. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2440. sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
  2441. }
  2442. }
  2443. }
  2444. func (sshClient *sshClient) cleanupAuthorizations() {
  2445. sshClient.Lock()
  2446. if sshClient.releaseAuthorizations != nil {
  2447. sshClient.releaseAuthorizations()
  2448. }
  2449. if sshClient.stopTimer != nil {
  2450. sshClient.stopTimer.Stop()
  2451. }
  2452. sshClient.Unlock()
  2453. }
  2454. // setPacketTunnelChannel sets the single packet tunnel channel
  2455. // for this sshClient. Any existing packet tunnel channel is
  2456. // closed.
  2457. func (sshClient *sshClient) setPacketTunnelChannel(channel ssh.Channel) {
  2458. sshClient.Lock()
  2459. if sshClient.packetTunnelChannel != nil {
  2460. sshClient.packetTunnelChannel.Close()
  2461. }
  2462. sshClient.packetTunnelChannel = channel
  2463. sshClient.totalPacketTunnelChannelCount += 1
  2464. sshClient.Unlock()
  2465. }
  2466. // setUdpgwChannelHandler sets the single udpgw channel handler for this
  2467. // sshClient. Each sshClient may have only one concurrent udpgw
  2468. // channel/handler. Each udpgw channel multiplexes many UDP port forwards via
  2469. // the udpgw protocol. Any existing udpgw channel/handler is closed.
  2470. func (sshClient *sshClient) setUdpgwChannelHandler(udpgwChannelHandler *udpgwPortForwardMultiplexer) bool {
  2471. sshClient.Lock()
  2472. if sshClient.udpgwChannelHandler != nil {
  2473. previousHandler := sshClient.udpgwChannelHandler
  2474. sshClient.udpgwChannelHandler = nil
  2475. // stop must be run without holding the sshClient mutex lock, as the
  2476. // udpgw goroutines may attempt to lock the same mutex. For example,
  2477. // udpgwPortForwardMultiplexer.run calls sshClient.establishedPortForward
  2478. // which calls sshClient.allocatePortForward.
  2479. sshClient.Unlock()
  2480. previousHandler.stop()
  2481. sshClient.Lock()
  2482. // In case some other channel has set the sshClient.udpgwChannelHandler
  2483. // in the meantime, fail. The caller should discard this channel/handler.
  2484. if sshClient.udpgwChannelHandler != nil {
  2485. sshClient.Unlock()
  2486. return false
  2487. }
  2488. }
  2489. sshClient.udpgwChannelHandler = udpgwChannelHandler
  2490. sshClient.totalUdpgwChannelCount += 1
  2491. sshClient.Unlock()
  2492. return true
  2493. }
// serverTunnelStatParams is the request parameter spec list that logTunnel
// passes to getRequestLogFields for the "server_tunnel" log. It consists of
// two optional tunnel-specific parameters followed by every entry in
// baseSessionAndDialParams.
var serverTunnelStatParams = append(
	[]requestParamSpec{
		{"last_connected", isLastConnected, requestParamOptional},
		{"establishment_duration", isIntString, requestParamOptional}},
	baseSessionAndDialParams...)
  2499. func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
  2500. sshClient.Lock()
  2501. logFields := getRequestLogFields(
  2502. "server_tunnel",
  2503. sshClient.geoIPData,
  2504. sshClient.handshakeState.authorizedAccessTypes,
  2505. sshClient.handshakeState.apiParams,
  2506. serverTunnelStatParams)
  2507. // "relay_protocol" is sent with handshake API parameters. In pre-
  2508. // handshake logTunnel cases, this value is not yet known. As
  2509. // sshClient.tunnelProtocol is authoritative, set this value
  2510. // unconditionally, overwriting any value from handshake.
  2511. logFields["relay_protocol"] = sshClient.tunnelProtocol
  2512. if sshClient.serverPacketManipulation != "" {
  2513. logFields["server_packet_manipulation"] = sshClient.serverPacketManipulation
  2514. }
  2515. if sshClient.sshListener.BPFProgramName != "" {
  2516. logFields["server_bpf"] = sshClient.sshListener.BPFProgramName
  2517. }
  2518. logFields["session_id"] = sshClient.sessionID
  2519. logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession
  2520. logFields["handshake_completed"] = sshClient.handshakeState.completed
  2521. logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp
  2522. logFields["bytes_down_tcp"] = sshClient.tcpTrafficState.bytesDown
  2523. logFields["peak_concurrent_dialing_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentDialingPortForwardCount
  2524. logFields["peak_concurrent_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentPortForwardCount
  2525. logFields["total_port_forward_count_tcp"] = sshClient.tcpTrafficState.totalPortForwardCount
  2526. logFields["bytes_up_udp"] = sshClient.udpTrafficState.bytesUp
  2527. logFields["bytes_down_udp"] = sshClient.udpTrafficState.bytesDown
  2528. // sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful
  2529. logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
  2530. logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
  2531. logFields["total_udpgw_channel_count"] = sshClient.totalUdpgwChannelCount
  2532. logFields["total_packet_tunnel_channel_count"] = sshClient.totalPacketTunnelChannelCount
  2533. logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count
  2534. logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes
  2535. logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2536. logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes
  2537. logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes
  2538. logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count
  2539. logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes
  2540. logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2541. logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
  2542. logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
  2543. if sshClient.destinationBytesMetricsASN != "" {
  2544. // Check if the configured DestinationBytesMetricsASN has changed
  2545. // (or been cleared). If so, don't log and discard the accumulated
  2546. // bytes to ensure we don't continue to record stats as previously
  2547. // configured.
  2548. //
  2549. // Any counts accumulated before the DestinationBytesMetricsASN change
  2550. // are lost. At this time we can't change
  2551. // sshClient.destinationBytesMetricsASN dynamically, after a tactics
  2552. // hot reload, as there may be destination bytes port forwards that
  2553. // were in place before the change, which will continue to count.
  2554. logDestBytes := true
  2555. if sshClient.sshServer.support.ServerTacticsParametersCache != nil {
  2556. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.geoIPData)
  2557. if err != nil || p.IsNil() ||
  2558. sshClient.destinationBytesMetricsASN != p.String(parameters.DestinationBytesMetricsASN) {
  2559. logDestBytes = false
  2560. }
  2561. }
  2562. if logDestBytes {
  2563. bytesUpTCP := sshClient.tcpDestinationBytesMetrics.getBytesUp()
  2564. bytesDownTCP := sshClient.tcpDestinationBytesMetrics.getBytesDown()
  2565. bytesUpUDP := sshClient.udpDestinationBytesMetrics.getBytesUp()
  2566. bytesDownUDP := sshClient.udpDestinationBytesMetrics.getBytesDown()
  2567. logFields["dest_bytes_asn"] = sshClient.destinationBytesMetricsASN
  2568. logFields["dest_bytes_up_tcp"] = bytesUpTCP
  2569. logFields["dest_bytes_down_tcp"] = bytesDownTCP
  2570. logFields["dest_bytes_up_udp"] = bytesUpUDP
  2571. logFields["dest_bytes_down_udp"] = bytesDownUDP
  2572. logFields["dest_bytes"] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
  2573. }
  2574. }
  2575. // Only log fields for peakMetrics when there is data recorded, otherwise
  2576. // omit the field.
  2577. if sshClient.peakMetrics.concurrentProximateAcceptedClients != nil {
  2578. logFields["peak_concurrent_proximate_accepted_clients"] = *sshClient.peakMetrics.concurrentProximateAcceptedClients
  2579. }
  2580. if sshClient.peakMetrics.concurrentProximateEstablishedClients != nil {
  2581. logFields["peak_concurrent_proximate_established_clients"] = *sshClient.peakMetrics.concurrentProximateEstablishedClients
  2582. }
  2583. if sshClient.peakMetrics.TCPPortForwardFailureRate != nil && sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize != nil {
  2584. logFields["peak_tcp_port_forward_failure_rate"] = *sshClient.peakMetrics.TCPPortForwardFailureRate
  2585. logFields["peak_tcp_port_forward_failure_rate_sample_size"] = *sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize
  2586. }
  2587. if sshClient.peakMetrics.DNSFailureRate != nil && sshClient.peakMetrics.DNSFailureRateSampleSize != nil {
  2588. logFields["peak_dns_failure_rate"] = *sshClient.peakMetrics.DNSFailureRate
  2589. logFields["peak_dns_failure_rate_sample_size"] = *sshClient.peakMetrics.DNSFailureRateSampleSize
  2590. }
  2591. // Pre-calculate a total-tunneled-bytes field. This total is used
  2592. // extensively in analytics and is more performant when pre-calculated.
  2593. logFields["bytes"] = sshClient.tcpTrafficState.bytesUp +
  2594. sshClient.tcpTrafficState.bytesDown +
  2595. sshClient.udpTrafficState.bytesUp +
  2596. sshClient.udpTrafficState.bytesDown
  2597. // Merge in additional metrics from the optional metrics source
  2598. for _, metrics := range additionalMetrics {
  2599. for name, value := range metrics {
  2600. // Don't overwrite any basic fields
  2601. if logFields[name] == nil {
  2602. logFields[name] = value
  2603. }
  2604. }
  2605. }
  2606. // Retain lock when invoking LogRawFieldsWithTimestamp to block any
  2607. // concurrent writes to variables referenced by logFields.
  2608. log.LogRawFieldsWithTimestamp(logFields)
  2609. sshClient.Unlock()
  2610. }
// blocklistHitsStatParams is the request parameter spec list that
// logBlocklistHits passes to getRequestLogFields for the
// "server_blocklist_hit" log. Each entry gives a parameter name, its
// validator, and its flags; a flags value of 0 sets no flags.
var blocklistHitsStatParams = []requestParamSpec{
	{"propagation_channel_id", isHexDigits, 0},
	{"sponsor_id", isHexDigits, 0},
	{"client_version", isIntString, requestParamLogStringAsInt},
	{"client_platform", isClientPlatform, 0},
	{"client_features", isAnyString, requestParamOptional | requestParamArray},
	{"client_build_rev", isHexDigits, requestParamOptional},
	{"device_region", isAnyString, requestParamOptional},
	{"device_location", isGeoHashString, requestParamOptional},
	{"egress_region", isRegionCode, requestParamOptional},
	{"session_id", isHexDigits, 0},
	{"last_connected", isLastConnected, requestParamOptional},
}
  2624. func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []BlocklistTag) {
  2625. sshClient.Lock()
  2626. logFields := getRequestLogFields(
  2627. "server_blocklist_hit",
  2628. sshClient.geoIPData,
  2629. sshClient.handshakeState.authorizedAccessTypes,
  2630. sshClient.handshakeState.apiParams,
  2631. blocklistHitsStatParams)
  2632. logFields["session_id"] = sshClient.sessionID
  2633. // Note: see comment in logTunnel regarding unlock and concurrent access.
  2634. sshClient.Unlock()
  2635. for _, tag := range tags {
  2636. if IP != nil {
  2637. logFields["blocklist_ip_address"] = IP.String()
  2638. }
  2639. if domain != "" {
  2640. logFields["blocklist_domain"] = domain
  2641. }
  2642. logFields["blocklist_source"] = tag.Source
  2643. logFields["blocklist_subject"] = tag.Subject
  2644. log.LogRawFieldsWithTimestamp(logFields)
  2645. }
  2646. }
  2647. func (sshClient *sshClient) runOSLSender() {
  2648. for {
  2649. // Await a signal that there are SLOKs to send
  2650. // TODO: use reflect.SelectCase, and optionally await timer here?
  2651. select {
  2652. case <-sshClient.signalIssueSLOKs:
  2653. case <-sshClient.runCtx.Done():
  2654. return
  2655. }
  2656. retryDelay := SSH_SEND_OSL_INITIAL_RETRY_DELAY
  2657. for {
  2658. err := sshClient.sendOSLRequest()
  2659. if err == nil {
  2660. break
  2661. }
  2662. if !isExpectedTunnelIOError(err) {
  2663. log.WithTraceFields(LogFields{"error": err}).Warning("sendOSLRequest failed")
  2664. }
  2665. // If the request failed, retry after a delay (with exponential backoff)
  2666. // or when signaled that there are additional SLOKs to send
  2667. retryTimer := time.NewTimer(retryDelay)
  2668. select {
  2669. case <-retryTimer.C:
  2670. case <-sshClient.signalIssueSLOKs:
  2671. case <-sshClient.runCtx.Done():
  2672. retryTimer.Stop()
  2673. return
  2674. }
  2675. retryTimer.Stop()
  2676. retryDelay *= SSH_SEND_OSL_RETRY_FACTOR
  2677. }
  2678. }
  2679. }
  2680. // sendOSLRequest will invoke osl.GetSeedPayload to issue SLOKs and
  2681. // generate a payload, and send an OSL request to the client when
  2682. // there are new SLOKs in the payload.
  2683. func (sshClient *sshClient) sendOSLRequest() error {
  2684. seedPayload := sshClient.getOSLSeedPayload()
  2685. // Don't send when no SLOKs. This will happen when signalIssueSLOKs
  2686. // is received but no new SLOKs are issued.
  2687. if len(seedPayload.SLOKs) == 0 {
  2688. return nil
  2689. }
  2690. oslRequest := protocol.OSLRequest{
  2691. SeedPayload: seedPayload,
  2692. }
  2693. requestPayload, err := json.Marshal(oslRequest)
  2694. if err != nil {
  2695. return errors.Trace(err)
  2696. }
  2697. ok, _, err := sshClient.sshConn.SendRequest(
  2698. protocol.PSIPHON_API_OSL_REQUEST_NAME,
  2699. true,
  2700. requestPayload)
  2701. if err != nil {
  2702. return errors.Trace(err)
  2703. }
  2704. if !ok {
  2705. return errors.TraceNew("client rejected request")
  2706. }
  2707. sshClient.clearOSLSeedPayload()
  2708. return nil
  2709. }
  2710. // runAlertSender dequeues and sends alert requests to the client. As these
  2711. // alerts are informational, there is no retry logic and no SSH client
  2712. // acknowledgement (wantReply) is requested. This worker scheme allows
  2713. // nonconcurrent components including udpgw and packet tunnel to enqueue
  2714. // alerts without blocking their traffic processing.
  2715. func (sshClient *sshClient) runAlertSender() {
  2716. for {
  2717. select {
  2718. case <-sshClient.runCtx.Done():
  2719. return
  2720. case request := <-sshClient.sendAlertRequests:
  2721. payload, err := json.Marshal(request)
  2722. if err != nil {
  2723. log.WithTraceFields(LogFields{"error": err}).Warning("Marshal failed")
  2724. break
  2725. }
  2726. _, _, err = sshClient.sshConn.SendRequest(
  2727. protocol.PSIPHON_API_ALERT_REQUEST_NAME,
  2728. false,
  2729. payload)
  2730. if err != nil && !isExpectedTunnelIOError(err) {
  2731. log.WithTraceFields(LogFields{"error": err}).Warning("SendRequest failed")
  2732. break
  2733. }
  2734. sshClient.Lock()
  2735. sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] = true
  2736. sshClient.Unlock()
  2737. }
  2738. }
  2739. }
  2740. // enqueueAlertRequest enqueues an alert request to be sent to the client.
  2741. // Only one request is sent per tunnel per protocol.AlertRequest value;
  2742. // subsequent alerts with the same value are dropped. enqueueAlertRequest will
  2743. // not block until the queue exceeds ALERT_REQUEST_QUEUE_BUFFER_SIZE.
  2744. func (sshClient *sshClient) enqueueAlertRequest(request protocol.AlertRequest) {
  2745. sshClient.Lock()
  2746. if sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] {
  2747. sshClient.Unlock()
  2748. return
  2749. }
  2750. sshClient.Unlock()
  2751. select {
  2752. case <-sshClient.runCtx.Done():
  2753. case sshClient.sendAlertRequests <- request:
  2754. }
  2755. }
  2756. func (sshClient *sshClient) enqueueDisallowedTrafficAlertRequest() {
  2757. reason := protocol.PSIPHON_API_ALERT_DISALLOWED_TRAFFIC
  2758. actionURLs := sshClient.getAlertActionURLs(reason)
  2759. sshClient.enqueueAlertRequest(
  2760. protocol.AlertRequest{
  2761. Reason: reason,
  2762. ActionURLs: actionURLs,
  2763. })
  2764. }
  2765. func (sshClient *sshClient) enqueueUnsafeTrafficAlertRequest(tags []BlocklistTag) {
  2766. reason := protocol.PSIPHON_API_ALERT_UNSAFE_TRAFFIC
  2767. actionURLs := sshClient.getAlertActionURLs(reason)
  2768. for _, tag := range tags {
  2769. sshClient.enqueueAlertRequest(
  2770. protocol.AlertRequest{
  2771. Reason: reason,
  2772. Subject: tag.Subject,
  2773. ActionURLs: actionURLs,
  2774. })
  2775. }
  2776. }
  2777. func (sshClient *sshClient) getAlertActionURLs(alertReason string) []string {
  2778. sshClient.Lock()
  2779. sponsorID, _ := getStringRequestParam(
  2780. sshClient.handshakeState.apiParams, "sponsor_id")
  2781. sshClient.Unlock()
  2782. return sshClient.sshServer.support.PsinetDatabase.GetAlertActionURLs(
  2783. alertReason,
  2784. sponsorID,
  2785. sshClient.geoIPData.Country,
  2786. sshClient.geoIPData.ASN,
  2787. sshClient.handshakeState.deviceRegion)
  2788. }
  2789. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessage string) {
  2790. // We always return the reject reason "Prohibited":
  2791. // - Traffic rules and connection limits may prohibit the connection.
  2792. // - External firewall rules may prohibit the connection, and this is not currently
  2793. // distinguishable from other failure modes.
  2794. // - We limit the failure information revealed to the client.
  2795. reason := ssh.Prohibited
  2796. // Note: Debug level, as logMessage may contain user traffic destination address information
  2797. log.WithTraceFields(
  2798. LogFields{
  2799. "channelType": newChannel.ChannelType(),
  2800. "logMessage": logMessage,
  2801. "rejectReason": reason.String(),
  2802. }).Debug("reject new channel")
  2803. // Note: logMessage is internal, for logging only; just the reject reason is sent to the client.
  2804. newChannel.Reject(reason, reason.String())
  2805. }
// setHandshakeState records that a client has completed a handshake API request.
// Some parameters from the handshake request may be used in future traffic rule
// selection. Port forwards are disallowed until a handshake is complete. The
// handshake parameters are included in the session summary log recorded in
// sshClient.stop().
//
// state replaces sshClient.handshakeState when no handshake has completed
// yet; otherwise an error is returned and nothing is changed. authorizations
// are the client-submitted authorizations to verify. The returned
// handshakeStateInfo holds the verified authorization IDs and access types
// along with the rate limits returned by setTrafficRules.
func (sshClient *sshClient) setHandshakeState(
	state handshakeState,
	authorizations []string) (*handshakeStateInfo, error) {

	// Check and set under a single lock acquisition so that only one
	// caller can install the handshake state.
	sshClient.Lock()
	completed := sshClient.handshakeState.completed
	if !completed {
		sshClient.handshakeState = state
	}
	sshClient.Unlock()

	// Client must only perform one handshake
	if completed {
		return nil, errors.TraceNew("handshake already completed")
	}

	// Verify the authorizations submitted by the client. Verified, active
	// (non-expired) access types will be available for traffic rules
	// filtering.
	//
	// When an authorization is active but expires while the client is
	// connected, the client is disconnected to ensure the access is reset.
	// This is implemented by setting a timer to perform the disconnect at the
	// expiry time of the soonest expiring authorization.
	//
	// sshServer.authorizationSessionIDs tracks the unique mapping of active
	// authorization IDs to client session IDs and is used to detect and
	// prevent multiple malicious clients from reusing a single authorization
	// (within the scope of this server).

	// authorizationIDs and authorizedAccessTypes are returned to the client
	// and logged, respectively; initialize to empty lists so the
	// protocol/logs don't need to handle 'null' values.
	authorizationIDs := make([]string, 0)
	authorizedAccessTypes := make([]string, 0)
	var stopTime time.Time

	for i, authorization := range authorizations {

		// This sanity check mitigates malicious clients causing excess CPU use.
		if i >= MAX_AUTHORIZATIONS {
			log.WithTrace().Warning("too many authorizations")
			break
		}

		// An authorization that fails verification is logged and skipped;
		// it does not fail the handshake.
		verifiedAuthorization, err := accesscontrol.VerifyAuthorization(
			&sshClient.sshServer.support.Config.AccessControlVerificationKeyRing,
			authorization)

		if err != nil {
			log.WithTraceFields(
				LogFields{"error": err}).Warning("verify authorization failed")
			continue
		}

		authorizationID := base64.StdEncoding.EncodeToString(verifiedAuthorization.ID)

		// Only the first authorization for each access type is accepted.
		if common.Contains(authorizedAccessTypes, verifiedAuthorization.AccessType) {
			log.WithTraceFields(
				LogFields{"accessType": verifiedAuthorization.AccessType}).Warning("duplicate authorization access type")
			continue
		}

		authorizationIDs = append(authorizationIDs, authorizationID)
		authorizedAccessTypes = append(authorizedAccessTypes, verifiedAuthorization.AccessType)

		// Track the earliest expiry among the accepted authorizations.
		if stopTime.IsZero() || stopTime.After(verifiedAuthorization.Expires) {
			stopTime = verifiedAuthorization.Expires
		}
	}

	// Associate all verified authorizationIDs with this client's session ID.
	// Handle cases where previous associations exist:
	//
	// - Multiple malicious clients reusing a single authorization. In this
	//   case, authorizations are revoked from the previous client.
	//
	// - The client reconnected with a new session ID due to user toggling.
	//   This case is expected due to server affinity. This cannot be
	//   distinguished from the previous case and the same action is taken;
	//   this will have no impact on a legitimate client as the previous
	//   session is dangling.
	//
	// - The client automatically reconnected with the same session ID. This
	//   case is not expected as sshServer.registerEstablishedClient
	//   synchronously calls sshClient.releaseAuthorizations; as a safe guard,
	//   this case is distinguished and no revocation action is taken.

	sshClient.sshServer.authorizationSessionIDsMutex.Lock()
	for _, authorizationID := range authorizationIDs {
		sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
		if ok && sessionID != sshClient.sessionID {

			// Log an "irregular_tunnel" event with this client's GeoIP
			// fields and, when they differ, the previous session's GeoIP
			// fields under the "duplicate_authorization_" prefix.
			logFields := LogFields{
				"event_name":                 "irregular_tunnel",
				"tunnel_error":               "duplicate active authorization",
				"duplicate_authorization_id": authorizationID,
			}
			sshClient.geoIPData.SetLogFields(logFields)
			duplicateGeoIPData := sshClient.sshServer.support.GeoIPService.GetSessionCache(sessionID)
			if duplicateGeoIPData != sshClient.geoIPData {
				duplicateGeoIPData.SetLogFieldsWithPrefix("duplicate_authorization_", logFields)
			}
			log.LogRawFieldsWithTimestamp(logFields)

			// Invoke asynchronously to avoid deadlocks.
			// TODO: invoke only once for each distinct sessionID?
			go sshClient.sshServer.revokeClientAuthorizations(sessionID)
		}
		sshClient.sshServer.authorizationSessionIDs[authorizationID] = sshClient.sessionID
	}
	sshClient.sshServer.authorizationSessionIDsMutex.Unlock()

	if len(authorizationIDs) > 0 {

		sshClient.Lock()

		// Make the authorizedAccessTypes available for traffic rules filtering.

		sshClient.handshakeState.activeAuthorizationIDs = authorizationIDs
		sshClient.handshakeState.authorizedAccessTypes = authorizedAccessTypes

		// On exit, sshClient.runTunnel will call releaseAuthorizations, which
		// will release the authorization IDs so the client can reconnect and
		// present the same authorizations again. sshClient.runTunnel will
		// also cancel the stopTimer in case it has not yet fired.
		// Note: termination of the stopTimer goroutine is not synchronized.

		sshClient.releaseAuthorizations = func() {
			sshClient.sshServer.authorizationSessionIDsMutex.Lock()
			for _, authorizationID := range authorizationIDs {
				// Only delete entries still owned by this session; an entry
				// may have been reassigned to another session ID.
				sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
				if ok && sessionID == sshClient.sessionID {
					delete(sshClient.sshServer.authorizationSessionIDs, authorizationID)
				}
			}
			sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
		}

		// Stop the client at the expiry time of the soonest expiring
		// authorization.
		sshClient.stopTimer = time.AfterFunc(
			time.Until(stopTime),
			func() {
				sshClient.stop()
			})

		sshClient.Unlock()
	}

	// These calls are made after the sshClient mutex is released above;
	// setOSLConfig, for one, acquires that mutex itself.
	upstreamBytesPerSecond, downstreamBytesPerSecond := sshClient.setTrafficRules()

	sshClient.setOSLConfig()

	// Set destination bytes metrics.
	//
	// Limitation: this is a one-time operation and doesn't get reset when
	// tactics are hot-reloaded. This allows us to simply retain any
	// destination byte counts accumulated and eventually log in
	// server_tunnel, without having to deal with a destination change
	// mid-tunnel. As typical tunnels are short, and destination changes can
	// be applied gradually, handling mid-tunnel changes is not a priority.
	sshClient.setDestinationBytesMetrics()

	return &handshakeStateInfo{
		activeAuthorizationIDs:   authorizationIDs,
		authorizedAccessTypes:    authorizedAccessTypes,
		upstreamBytesPerSecond:   upstreamBytesPerSecond,
		downstreamBytesPerSecond: downstreamBytesPerSecond,
	}, nil
}
  2952. // getHandshaked returns whether the client has completed a handshake API
  2953. // request and whether the traffic rules that were selected after the
  2954. // handshake immediately exhaust the client.
  2955. //
  2956. // When the client is immediately exhausted it will be closed; but this
  2957. // takes effect asynchronously. The "exhausted" return value is used to
  2958. // prevent API requests by clients that will close.
  2959. func (sshClient *sshClient) getHandshaked() (bool, bool) {
  2960. sshClient.Lock()
  2961. defer sshClient.Unlock()
  2962. completed := sshClient.handshakeState.completed
  2963. exhausted := false
  2964. // Notes:
  2965. // - "Immediately exhausted" is when CloseAfterExhausted is set and
  2966. // either ReadUnthrottledBytes or WriteUnthrottledBytes starts from
  2967. // 0, so no bytes would be read or written. This check does not
  2968. // examine whether 0 bytes _remain_ in the ThrottledConn.
  2969. // - This check is made against the current traffic rules, which
  2970. // could have changed in a hot reload since the handshake.
  2971. if completed &&
  2972. *sshClient.trafficRules.RateLimits.CloseAfterExhausted &&
  2973. (*sshClient.trafficRules.RateLimits.ReadUnthrottledBytes == 0 ||
  2974. *sshClient.trafficRules.RateLimits.WriteUnthrottledBytes == 0) {
  2975. exhausted = true
  2976. }
  2977. return completed, exhausted
  2978. }
  2979. func (sshClient *sshClient) getDisableDiscovery() bool {
  2980. sshClient.Lock()
  2981. defer sshClient.Unlock()
  2982. return *sshClient.trafficRules.DisableDiscovery
  2983. }
  2984. func (sshClient *sshClient) updateAPIParameters(
  2985. apiParams common.APIParameters) {
  2986. sshClient.Lock()
  2987. defer sshClient.Unlock()
  2988. // Only update after handshake has initialized API params.
  2989. if !sshClient.handshakeState.completed {
  2990. return
  2991. }
  2992. for name, value := range apiParams {
  2993. sshClient.handshakeState.apiParams[name] = value
  2994. }
  2995. }
  2996. func (sshClient *sshClient) acceptDomainBytes() bool {
  2997. sshClient.Lock()
  2998. defer sshClient.Unlock()
  2999. // When the domain bytes checksum differs from the checksum sent to the
  3000. // client in the handshake response, the psinet regex configuration has
  3001. // changed. In this case, drop the stats so we don't continue to record
  3002. // stats as previously configured.
  3003. //
  3004. // Limitations:
  3005. // - The checksum comparison may result in dropping some stats for a
  3006. // domain that remains in the new configuration.
  3007. // - We don't push new regexs to the clients, so clients that remain
  3008. // connected will continue to send stats that will be dropped; and
  3009. // those clients will not send stats as newly configured until after
  3010. // reconnecting.
  3011. // - Due to the design of
  3012. // transferstats.ReportRecentBytesTransferredForServer in the client,
  3013. // the client may accumulate stats, reconnect before its next status
  3014. // request, get a new regex configuration, and then send the previously
  3015. // accumulated stats in its next status request. The checksum scheme
  3016. // won't prevent the reporting of those stats.
  3017. sponsorID, _ := getStringRequestParam(sshClient.handshakeState.apiParams, "sponsor_id")
  3018. domainBytesChecksum := sshClient.sshServer.support.PsinetDatabase.GetDomainBytesChecksum(sponsorID)
  3019. return bytes.Equal(sshClient.handshakeState.domainBytesChecksum, domainBytesChecksum)
  3020. }
  3021. // setOSLConfig resets the client's OSL seed state based on the latest OSL config
  3022. // As sshClient.oslClientSeedState may be reset by a concurrent goroutine,
  3023. // oslClientSeedState must only be accessed within the sshClient mutex.
  3024. func (sshClient *sshClient) setOSLConfig() {
  3025. sshClient.Lock()
  3026. defer sshClient.Unlock()
  3027. propagationChannelID, err := getStringRequestParam(
  3028. sshClient.handshakeState.apiParams, "propagation_channel_id")
  3029. if err != nil {
  3030. // This should not fail as long as client has sent valid handshake
  3031. return
  3032. }
  3033. // Use a cached seed state if one is found for the client's
  3034. // session ID. This enables resuming progress made in a previous
  3035. // tunnel.
  3036. // Note: go-cache is already concurency safe; the additional mutex
  3037. // is necessary to guarantee that Get/Delete is atomic; although in
  3038. // practice no two concurrent clients should ever supply the same
  3039. // session ID.
  3040. sshClient.sshServer.oslSessionCacheMutex.Lock()
  3041. oslClientSeedState, found := sshClient.sshServer.oslSessionCache.Get(sshClient.sessionID)
  3042. if found {
  3043. sshClient.sshServer.oslSessionCache.Delete(sshClient.sessionID)
  3044. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3045. sshClient.oslClientSeedState = oslClientSeedState.(*osl.ClientSeedState)
  3046. sshClient.oslClientSeedState.Resume(sshClient.signalIssueSLOKs)
  3047. return
  3048. }
  3049. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3050. // Two limitations when setOSLConfig() is invoked due to an
  3051. // OSL config hot reload:
  3052. //
  3053. // 1. any partial progress towards SLOKs is lost.
  3054. //
  3055. // 2. all existing osl.ClientSeedPortForwards for existing
  3056. // port forwards will not send progress to the new client
  3057. // seed state.
  3058. sshClient.oslClientSeedState = sshClient.sshServer.support.OSLConfig.NewClientSeedState(
  3059. sshClient.geoIPData.Country,
  3060. propagationChannelID,
  3061. sshClient.signalIssueSLOKs)
  3062. }
  3063. // newClientSeedPortForward will return nil when no seeding is
  3064. // associated with the specified ipAddress.
  3065. func (sshClient *sshClient) newClientSeedPortForward(IPAddress net.IP) *osl.ClientSeedPortForward {
  3066. sshClient.Lock()
  3067. defer sshClient.Unlock()
  3068. // Will not be initialized before handshake.
  3069. if sshClient.oslClientSeedState == nil {
  3070. return nil
  3071. }
  3072. return sshClient.oslClientSeedState.NewClientSeedPortForward(IPAddress)
  3073. }
  3074. // getOSLSeedPayload returns a payload containing all seeded SLOKs for
  3075. // this client's session.
  3076. func (sshClient *sshClient) getOSLSeedPayload() *osl.SeedPayload {
  3077. sshClient.Lock()
  3078. defer sshClient.Unlock()
  3079. // Will not be initialized before handshake.
  3080. if sshClient.oslClientSeedState == nil {
  3081. return &osl.SeedPayload{SLOKs: make([]*osl.SLOK, 0)}
  3082. }
  3083. return sshClient.oslClientSeedState.GetSeedPayload()
  3084. }
  3085. func (sshClient *sshClient) clearOSLSeedPayload() {
  3086. sshClient.Lock()
  3087. defer sshClient.Unlock()
  3088. sshClient.oslClientSeedState.ClearSeedPayload()
  3089. }
  3090. func (sshClient *sshClient) setDestinationBytesMetrics() {
  3091. sshClient.Lock()
  3092. defer sshClient.Unlock()
  3093. // Limitation: the server-side tactics cache is used to avoid the overhead
  3094. // of an additional tactics filtering per tunnel. As this cache is
  3095. // designed for GeoIP filtering only, handshake API parameters are not
  3096. // applied to tactics filtering in this case.
  3097. tacticsCache := sshClient.sshServer.support.ServerTacticsParametersCache
  3098. if tacticsCache == nil {
  3099. return
  3100. }
  3101. p, err := tacticsCache.Get(sshClient.geoIPData)
  3102. if err != nil {
  3103. log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
  3104. return
  3105. }
  3106. if p.IsNil() {
  3107. return
  3108. }
  3109. sshClient.destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)
  3110. }
  3111. func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType int, IPAddress net.IP) *destinationBytesMetrics {
  3112. sshClient.Lock()
  3113. defer sshClient.Unlock()
  3114. if sshClient.destinationBytesMetricsASN == "" {
  3115. return nil
  3116. }
  3117. if sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN != sshClient.destinationBytesMetricsASN {
  3118. return nil
  3119. }
  3120. if portForwardType == portForwardTypeTCP {
  3121. return &sshClient.tcpDestinationBytesMetrics
  3122. }
  3123. return &sshClient.udpDestinationBytesMetrics
  3124. }
  3125. func (sshClient *sshClient) getActivityUpdaters(portForwardType int, IPAddress net.IP) []common.ActivityUpdater {
  3126. var updaters []common.ActivityUpdater
  3127. clientSeedPortForward := sshClient.newClientSeedPortForward(IPAddress)
  3128. if clientSeedPortForward != nil {
  3129. updaters = append(updaters, clientSeedPortForward)
  3130. }
  3131. destinationBytesMetrics := sshClient.newDestinationBytesMetricsUpdater(portForwardType, IPAddress)
  3132. if destinationBytesMetrics != nil {
  3133. updaters = append(updaters, destinationBytesMetrics)
  3134. }
  3135. return updaters
  3136. }
  3137. // setTrafficRules resets the client's traffic rules based on the latest server config
  3138. // and client properties. As sshClient.trafficRules may be reset by a concurrent
  3139. // goroutine, trafficRules must only be accessed within the sshClient mutex.
  3140. func (sshClient *sshClient) setTrafficRules() (int64, int64) {
  3141. sshClient.Lock()
  3142. defer sshClient.Unlock()
  3143. isFirstTunnelInSession := sshClient.isFirstTunnelInSession &&
  3144. sshClient.handshakeState.establishedTunnelsCount == 0
  3145. sshClient.trafficRules = sshClient.sshServer.support.TrafficRulesSet.GetTrafficRules(
  3146. isFirstTunnelInSession,
  3147. sshClient.tunnelProtocol,
  3148. sshClient.geoIPData,
  3149. sshClient.handshakeState)
  3150. if sshClient.throttledConn != nil {
  3151. // Any existing throttling state is reset.
  3152. sshClient.throttledConn.SetLimits(
  3153. sshClient.trafficRules.RateLimits.CommonRateLimits(
  3154. sshClient.handshakeState.completed))
  3155. }
  3156. return *sshClient.trafficRules.RateLimits.ReadBytesPerSecond,
  3157. *sshClient.trafficRules.RateLimits.WriteBytesPerSecond
  3158. }
  3159. func (sshClient *sshClient) rateLimits() common.RateLimits {
  3160. sshClient.Lock()
  3161. defer sshClient.Unlock()
  3162. return sshClient.trafficRules.RateLimits.CommonRateLimits(
  3163. sshClient.handshakeState.completed)
  3164. }
  3165. func (sshClient *sshClient) idleTCPPortForwardTimeout() time.Duration {
  3166. sshClient.Lock()
  3167. defer sshClient.Unlock()
  3168. return time.Duration(*sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds) * time.Millisecond
  3169. }
  3170. func (sshClient *sshClient) idleUDPPortForwardTimeout() time.Duration {
  3171. sshClient.Lock()
  3172. defer sshClient.Unlock()
  3173. return time.Duration(*sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds) * time.Millisecond
  3174. }
  3175. func (sshClient *sshClient) setTCPPortForwardDialingAvailableSignal(signal context.CancelFunc) {
  3176. sshClient.Lock()
  3177. defer sshClient.Unlock()
  3178. sshClient.tcpPortForwardDialingAvailableSignal = signal
  3179. }
  // Port forward type identifiers. The sshClient port forward methods take
  // one of these values to select between the TCP and UDP traffic rules,
  // traffic state, and metrics. The values are assigned by iota, so
  // portForwardTypeTCP is 0 and portForwardTypeUDP is 1.
  3180. const (
  3181. portForwardTypeTCP = iota
  3182. portForwardTypeUDP
  3183. )
  3184. func (sshClient *sshClient) isPortForwardPermitted(
  3185. portForwardType int,
  3186. remoteIP net.IP,
  3187. port int) bool {
  3188. // Disallow connection to bogons.
  3189. //
  3190. // As a security measure, this is a failsafe. The server should be run on a
  3191. // host with correctly configured firewall rules.
  3192. //
  3193. // This check also avoids spurious disallowed traffic alerts for destinations
  3194. // that are impossible to reach.
  3195. if !sshClient.sshServer.support.Config.AllowBogons && common.IsBogon(remoteIP) {
  3196. return false
  3197. }
  3198. // Blocklist check.
  3199. //
  3200. // Limitation: isPortForwardPermitted is not called in transparent DNS
  3201. // forwarding cases. As the destination IP address is rewritten in these
  3202. // cases, a blocklist entry won't be dialed in any case. However, no logs
  3203. // will be recorded.
  3204. if !sshClient.isIPPermitted(remoteIP) {
  3205. return false
  3206. }
  3207. // Don't lock before calling logBlocklistHits.
  3208. // Unlock before calling enqueueDisallowedTrafficAlertRequest/log.
  3209. sshClient.Lock()
  3210. allowed := true
  3211. // Client must complete handshake before port forwards are permitted.
  3212. if !sshClient.handshakeState.completed {
  3213. allowed = false
  3214. }
  3215. if allowed {
  3216. // Traffic rules checks.
  3217. switch portForwardType {
  3218. case portForwardTypeTCP:
  3219. if !sshClient.trafficRules.AllowTCPPort(
  3220. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  3221. allowed = false
  3222. }
  3223. case portForwardTypeUDP:
  3224. if !sshClient.trafficRules.AllowUDPPort(
  3225. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  3226. allowed = false
  3227. }
  3228. }
  3229. }
  3230. sshClient.Unlock()
  3231. if allowed {
  3232. return true
  3233. }
  3234. switch portForwardType {
  3235. case portForwardTypeTCP:
  3236. sshClient.updateQualityMetricsWithTCPRejectedDisallowed()
  3237. case portForwardTypeUDP:
  3238. sshClient.updateQualityMetricsWithUDPRejectedDisallowed()
  3239. }
  3240. sshClient.enqueueDisallowedTrafficAlertRequest()
  3241. log.WithTraceFields(
  3242. LogFields{
  3243. "type": portForwardType,
  3244. "port": port,
  3245. }).Debug("port forward denied by traffic rules")
  3246. return false
  3247. }
  3248. // isDomainPermitted returns true when the specified domain may be resolved
  3249. // and returns false and a reject reason otherwise.
  3250. func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) {
  3251. // We're not doing comprehensive validation, to avoid overhead per port
  3252. // forward. This is a simple sanity check to ensure we don't process
  3253. // blantantly invalid input.
  3254. //
  3255. // TODO: validate with dns.IsDomainName?
  3256. if len(domain) > 255 {
  3257. return false, "invalid domain name"
  3258. }
  3259. tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain)
  3260. if len(tags) > 0 {
  3261. sshClient.logBlocklistHits(nil, domain, tags)
  3262. if sshClient.sshServer.support.Config.BlocklistActive {
  3263. // Actively alert and block
  3264. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  3265. return false, "port forward not permitted"
  3266. }
  3267. }
  3268. return true, ""
  3269. }
  3270. func (sshClient *sshClient) isIPPermitted(remoteIP net.IP) bool {
  3271. tags := sshClient.sshServer.support.Blocklist.LookupIP(remoteIP)
  3272. if len(tags) > 0 {
  3273. sshClient.logBlocklistHits(remoteIP, "", tags)
  3274. if sshClient.sshServer.support.Config.BlocklistActive {
  3275. // Actively alert and block
  3276. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  3277. return false
  3278. }
  3279. }
  3280. return true
  3281. }
  3282. func (sshClient *sshClient) isTCPDialingPortForwardLimitExceeded() bool {
  3283. sshClient.Lock()
  3284. defer sshClient.Unlock()
  3285. state := &sshClient.tcpTrafficState
  3286. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3287. if max > 0 && state.concurrentDialingPortForwardCount >= int64(max) {
  3288. return true
  3289. }
  3290. return false
  3291. }
  3292. func (sshClient *sshClient) getTCPPortForwardQueueSize() int {
  3293. sshClient.Lock()
  3294. defer sshClient.Unlock()
  3295. return *sshClient.trafficRules.MaxTCPPortForwardCount +
  3296. *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3297. }
  3298. func (sshClient *sshClient) getDialTCPPortForwardTimeoutMilliseconds() int {
  3299. sshClient.Lock()
  3300. defer sshClient.Unlock()
  3301. return *sshClient.trafficRules.DialTCPPortForwardTimeoutMilliseconds
  3302. }
  3303. func (sshClient *sshClient) dialingTCPPortForward() {
  3304. sshClient.Lock()
  3305. defer sshClient.Unlock()
  3306. state := &sshClient.tcpTrafficState
  3307. state.concurrentDialingPortForwardCount += 1
  3308. if state.concurrentDialingPortForwardCount > state.peakConcurrentDialingPortForwardCount {
  3309. state.peakConcurrentDialingPortForwardCount = state.concurrentDialingPortForwardCount
  3310. }
  3311. }
  3312. func (sshClient *sshClient) abortedTCPPortForward() {
  3313. sshClient.Lock()
  3314. defer sshClient.Unlock()
  3315. sshClient.tcpTrafficState.concurrentDialingPortForwardCount -= 1
  3316. }
  3317. func (sshClient *sshClient) allocatePortForward(portForwardType int) bool {
  3318. sshClient.Lock()
  3319. defer sshClient.Unlock()
  3320. // Check if at port forward limit. The subsequent counter
  3321. // changes must be atomic with the limit check to ensure
  3322. // the counter never exceeds the limit in the case of
  3323. // concurrent allocations.
  3324. var max int
  3325. var state *trafficState
  3326. if portForwardType == portForwardTypeTCP {
  3327. max = *sshClient.trafficRules.MaxTCPPortForwardCount
  3328. state = &sshClient.tcpTrafficState
  3329. } else {
  3330. max = *sshClient.trafficRules.MaxUDPPortForwardCount
  3331. state = &sshClient.udpTrafficState
  3332. }
  3333. if max > 0 && state.concurrentPortForwardCount >= int64(max) {
  3334. return false
  3335. }
  3336. // Update port forward counters.
  3337. if portForwardType == portForwardTypeTCP {
  3338. // Assumes TCP port forwards called dialingTCPPortForward
  3339. state.concurrentDialingPortForwardCount -= 1
  3340. if sshClient.tcpPortForwardDialingAvailableSignal != nil {
  3341. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3342. if max <= 0 || state.concurrentDialingPortForwardCount < int64(max) {
  3343. sshClient.tcpPortForwardDialingAvailableSignal()
  3344. }
  3345. }
  3346. }
  3347. state.concurrentPortForwardCount += 1
  3348. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  3349. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  3350. }
  3351. state.totalPortForwardCount += 1
  3352. return true
  3353. }
  3354. // establishedPortForward increments the concurrent port
  3355. // forward counter. closedPortForward decrements it, so it
  3356. // must always be called for each establishedPortForward
  3357. // call.
  3358. //
  3359. // When at the limit of established port forwards, the LRU
  3360. // existing port forward is closed to make way for the newly
  3361. // established one. There can be a minor delay as, in addition
  3362. // to calling Close() on the port forward net.Conn,
  3363. // establishedPortForward waits for the LRU's closedPortForward()
  3364. // call which will decrement the concurrent counter. This
  3365. // ensures all resources associated with the LRU (socket,
  3366. // goroutine) are released or will very soon be released before
  3367. // proceeding.
  3368. func (sshClient *sshClient) establishedPortForward(
  3369. portForwardType int, portForwardLRU *common.LRUConns) {
  3370. // Do not lock sshClient here.
  3371. var state *trafficState
  3372. if portForwardType == portForwardTypeTCP {
  3373. state = &sshClient.tcpTrafficState
  3374. } else {
  3375. state = &sshClient.udpTrafficState
  3376. }
  3377. // When the maximum number of port forwards is already
  3378. // established, close the LRU. CloseOldest will call
  3379. // Close on the port forward net.Conn. Both TCP and
  3380. // UDP port forwards have handler goroutines that may
  3381. // be blocked calling Read on the net.Conn. Close will
  3382. // eventually interrupt the Read and cause the handlers
  3383. // to exit, but not immediately. So the following logic
  3384. // waits for a LRU handler to be interrupted and signal
  3385. // availability.
  3386. //
  3387. // Notes:
  3388. //
  3389. // - the port forward limit can change via a traffic
  3390. // rules hot reload; the condition variable handles
  3391. // this case whereas a channel-based semaphore would
  3392. // not.
  3393. //
  3394. // - if a number of goroutines exceeding the total limit
  3395. // arrive here all concurrently, some CloseOldest() calls
  3396. // will have no effect as there can be less existing port
  3397. // forwards than new ones. In this case, the new port
  3398. // forward will be delayed. This is highly unlikely in
  3399. // practise since UDP calls to establishedPortForward are
  3400. // serialized and TCP calls are limited by the dial
  3401. // queue/count.
  3402. if !sshClient.allocatePortForward(portForwardType) {
  3403. portForwardLRU.CloseOldest()
  3404. log.WithTrace().Debug("closed LRU port forward")
  3405. state.availablePortForwardCond.L.Lock()
  3406. for !sshClient.allocatePortForward(portForwardType) {
  3407. state.availablePortForwardCond.Wait()
  3408. }
  3409. state.availablePortForwardCond.L.Unlock()
  3410. }
  3411. }
  3412. func (sshClient *sshClient) closedPortForward(
  3413. portForwardType int, bytesUp, bytesDown int64) {
  3414. sshClient.Lock()
  3415. var state *trafficState
  3416. if portForwardType == portForwardTypeTCP {
  3417. state = &sshClient.tcpTrafficState
  3418. } else {
  3419. state = &sshClient.udpTrafficState
  3420. }
  3421. state.concurrentPortForwardCount -= 1
  3422. state.bytesUp += bytesUp
  3423. state.bytesDown += bytesDown
  3424. sshClient.Unlock()
  3425. // Signal any goroutine waiting in establishedPortForward
  3426. // that an established port forward slot is available.
  3427. state.availablePortForwardCond.Signal()
  3428. }
  3429. func (sshClient *sshClient) updateQualityMetricsWithDialResult(
  3430. tcpPortForwardDialSuccess bool, dialDuration time.Duration, IP net.IP) {
  3431. sshClient.Lock()
  3432. defer sshClient.Unlock()
  3433. if tcpPortForwardDialSuccess {
  3434. sshClient.qualityMetrics.TCPPortForwardDialedCount += 1
  3435. sshClient.qualityMetrics.TCPPortForwardDialedDuration += dialDuration
  3436. if IP.To4() != nil {
  3437. sshClient.qualityMetrics.TCPIPv4PortForwardDialedCount += 1
  3438. sshClient.qualityMetrics.TCPIPv4PortForwardDialedDuration += dialDuration
  3439. } else if IP != nil {
  3440. sshClient.qualityMetrics.TCPIPv6PortForwardDialedCount += 1
  3441. sshClient.qualityMetrics.TCPIPv6PortForwardDialedDuration += dialDuration
  3442. }
  3443. } else {
  3444. sshClient.qualityMetrics.TCPPortForwardFailedCount += 1
  3445. sshClient.qualityMetrics.TCPPortForwardFailedDuration += dialDuration
  3446. if IP.To4() != nil {
  3447. sshClient.qualityMetrics.TCPIPv4PortForwardFailedCount += 1
  3448. sshClient.qualityMetrics.TCPIPv4PortForwardFailedDuration += dialDuration
  3449. } else if IP != nil {
  3450. sshClient.qualityMetrics.TCPIPv6PortForwardFailedCount += 1
  3451. sshClient.qualityMetrics.TCPIPv6PortForwardFailedDuration += dialDuration
  3452. }
  3453. }
  3454. }
  3455. func (sshClient *sshClient) updateQualityMetricsWithRejectedDialingLimit() {
  3456. sshClient.Lock()
  3457. defer sshClient.Unlock()
  3458. sshClient.qualityMetrics.TCPPortForwardRejectedDialingLimitCount += 1
  3459. }
  3460. func (sshClient *sshClient) updateQualityMetricsWithTCPRejectedDisallowed() {
  3461. sshClient.Lock()
  3462. defer sshClient.Unlock()
  3463. sshClient.qualityMetrics.TCPPortForwardRejectedDisallowedCount += 1
  3464. }
  3465. func (sshClient *sshClient) updateQualityMetricsWithUDPRejectedDisallowed() {
  3466. sshClient.Lock()
  3467. defer sshClient.Unlock()
  3468. sshClient.qualityMetrics.UDPPortForwardRejectedDisallowedCount += 1
  3469. }
  3470. func (sshClient *sshClient) updateQualityMetricsWithDNSResult(
  3471. success bool, duration time.Duration, resolverIP net.IP) {
  3472. sshClient.Lock()
  3473. defer sshClient.Unlock()
  3474. resolver := ""
  3475. if resolverIP != nil {
  3476. resolver = resolverIP.String()
  3477. }
  3478. if success {
  3479. sshClient.qualityMetrics.DNSCount["ALL"] += 1
  3480. sshClient.qualityMetrics.DNSDuration["ALL"] += duration
  3481. if resolver != "" {
  3482. sshClient.qualityMetrics.DNSCount[resolver] += 1
  3483. sshClient.qualityMetrics.DNSDuration[resolver] += duration
  3484. }
  3485. } else {
  3486. sshClient.qualityMetrics.DNSFailedCount["ALL"] += 1
  3487. sshClient.qualityMetrics.DNSFailedDuration["ALL"] += duration
  3488. if resolver != "" {
  3489. sshClient.qualityMetrics.DNSFailedCount[resolver] += 1
  3490. sshClient.qualityMetrics.DNSFailedDuration[resolver] += duration
  3491. }
  3492. }
  3493. }
  // handleTCPChannel services one TCP port forward requested through
  // newChannel. In order, it:
  //
  //   - redirects the destination when it matches the configured web server
  //     port forward address;
  //   - checks the domain blocklist when hostToConnect is not an IP address;
  //   - resolves hostToConnect, preferring an IPv4 result;
  //   - when doSplitTunnel is set, rejects the channel with the split tunnel
  //     reject reason if the split tunnel lookup matches the destination
  //     country;
  //   - enforces isPortForwardPermitted (skipped for the web server port
  //     forward);
  //   - dials the destination, accepts the channel, claims an established
  //     port forward slot, and relays data in both directions until either
  //     side finishes.
  //
  // remainingDialTimeout bounds the hostname resolution and the TCP dial; the
  // time spent resolving is subtracted before dialing. Both operations are
  // also cancelled through sshClient.runCtx.
  //
  // The caller must already have called dialingTCPPortForward. If the port
  // forward never reaches the established state, the deferred
  // abortedTCPPortForward call undoes that dialing count increment. Once
  // established, the deferred closedPortForward call reports the relayed byte
  // counts.
  3494. func (sshClient *sshClient) handleTCPChannel(
  3495. remainingDialTimeout time.Duration,
  3496. hostToConnect string,
  3497. portToConnect int,
  3498. doSplitTunnel bool,
  3499. newChannel ssh.NewChannel) {
  3500. // Assumptions:
  3501. // - sshClient.dialingTCPPortForward() has been called
  3502. // - remainingDialTimeout > 0
  3503. established := false
  3504. defer func() {
  3505. if !established {
  3506. sshClient.abortedTCPPortForward()
  3507. }
  3508. }()
  3509. // Transparently redirect web API request connections.
  3510. isWebServerPortForward := false
  3511. config := sshClient.sshServer.support.Config
  3512. if config.WebServerPortForwardAddress != "" {
  3513. destination := net.JoinHostPort(hostToConnect, strconv.Itoa(portToConnect))
  3514. if destination == config.WebServerPortForwardAddress {
  3515. isWebServerPortForward = true
  3516. if config.WebServerPortForwardRedirectAddress != "" {
  3517. // Note: redirect format is validated when config is loaded
  3518. host, portStr, _ := net.SplitHostPort(config.WebServerPortForwardRedirectAddress)
  3519. port, _ := strconv.Atoi(portStr)
  3520. hostToConnect = host
  3521. portToConnect = port
  3522. }
  3523. }
  3524. }
  3525. // Validate the domain name and check the domain blocklist before dialing.
  3526. //
  3527. // The IP blocklist is checked in isPortForwardPermitted, which also provides
  3528. // IP blocklist checking for the packet tunnel code path. When hostToConnect
  3529. // is an IP address, the following hostname resolution step effectively
  3530. // performs no actions and next immediate step is the isPortForwardPermitted
  3531. // check.
  3532. //
  3533. // Limitation: this case handles port forwards where the client sends the
  3534. // destination domain in the SSH port forward request but does not currently
  3535. // handle DNS-over-TCP; in the DNS-over-TCP case, a client may bypass the
  3536. // block list check.
  3537. if !isWebServerPortForward &&
  3538. net.ParseIP(hostToConnect) == nil {
  3539. ok, rejectMessage := sshClient.isDomainPermitted(hostToConnect)
  3540. if !ok {
  3541. // Note: not recording a port forward failure in this case
  3542. sshClient.rejectNewChannel(newChannel, rejectMessage)
  3543. return
  3544. }
  3545. }
  3546. // Dial the remote address.
  3547. //
  3548. // Hostname resolution is performed explicitly, as a separate step, as the
  3549. // target IP address is used for traffic rules (AllowSubnets), OSL seed
  3550. // progress, and IP address blocklists.
  3551. //
  3552. // Contexts are used for cancellation (via sshClient.runCtx, which is
  3553. // cancelled when the client is stopping) and timeouts.
  3554. dialStartTime := time.Now()
  3555. IP := net.ParseIP(hostToConnect)
  3556. if IP == nil {
  3557. // Resolve the hostname
  3558. log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")
  3559. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  3560. IPs, err := (&net.Resolver{}).LookupIPAddr(ctx, hostToConnect)
  3561. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  3562. resolveElapsedTime := time.Since(dialStartTime)
  3563. // Record DNS metrics. If LookupIPAddr returns net.DNSError.IsNotFound, this
  3564. // is "no such host" and not a DNS failure. Limitation: the resolver IP is
  3565. // not known.
  3566. dnsErr, ok := err.(*net.DNSError)
  3567. dnsNotFound := ok && dnsErr.IsNotFound
  3568. dnsSuccess := err == nil || dnsNotFound
  3569. sshClient.updateQualityMetricsWithDNSResult(dnsSuccess, resolveElapsedTime, nil)
  3570. // IPv4 is preferred in case the host has limited IPv6 routing. IPv6 is
  3571. // selected and attempted only when there's no IPv4 option.
  3572. // TODO: shuffle list to try other IPs?
  3573. for _, ip := range IPs {
  3574. if ip.IP.To4() != nil {
  3575. IP = ip.IP
  3576. break
  3577. }
  3578. }
  3579. if IP == nil && len(IPs) > 0 {
  3580. // If there are no IPv4 IPs, the first IP is IPv6.
  3581. IP = IPs[0].IP
  3582. }
  3583. if err == nil && IP == nil {
  3584. err = std_errors.New("no IP address")
  3585. }
  3586. if err != nil {
  3587. // Record a port forward failure
  3588. sshClient.updateQualityMetricsWithDialResult(false, resolveElapsedTime, IP)
  3589. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("LookupIP failed: %s", err))
  3590. return
  3591. }
  3592. remainingDialTimeout -= resolveElapsedTime
  3593. }
  3594. if remainingDialTimeout <= 0 {
  3595. sshClient.rejectNewChannel(newChannel, "TCP port forward timed out resolving")
  3596. return
  3597. }
  3598. // When the client has indicated split tunnel mode and when the channel is
  3599. // not of type protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE, check if the
  3600. // client and the port forward destination are in the same GeoIP country. If
  3601. // so, reject the port forward with a distinct response code that indicates
  3602. // to the client that this port forward should be performed locally, direct
  3603. // and untunneled.
  3604. //
  3605. // Clients are expected to cache untunneled responses to avoid this round
  3606. // trip in the immediate future and reduce server load.
  3607. //
  3608. // When the countries differ, immediately proceed with the standard port
  3609. // forward. No additional round trip is required.
  3610. //
  3611. // If either GeoIP country is "None", one or both countries are unknown
  3612. // and there is no match.
  3613. //
  3614. // Traffic rules, such as allowed ports, are not enforced for port forward
  3615. // destinations classified as untunneled.
  3616. //
  3617. // Domain and IP blocklists still apply to port forward destinations
  3618. // classified as untunneled.
  3619. //
  3620. // The client's use of split tunnel mode is logged in server_tunnel metrics
  3621. // as the boolean value split_tunnel. As they may indicate some information
  3622. // about browsing activity, no other split tunnel metrics are logged.
  3623. if doSplitTunnel {
  3624. destinationGeoIPData := sshClient.sshServer.support.GeoIPService.LookupIP(IP)
  3625. if sshClient.geoIPData.Country != GEOIP_UNKNOWN_VALUE &&
  3626. sshClient.handshakeState.splitTunnelLookup.lookup(
  3627. destinationGeoIPData.Country) {
  3628. // Since isPortForwardPermitted is not called in this case, explicitly call
  3629. // isIPPermitted. The domain blocklist case is handled above.
  3630. if !sshClient.isIPPermitted(IP) {
  3631. // Note: not recording a port forward failure in this case
  3632. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  3633. return
  3634. }
  3635. newChannel.Reject(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL, "")
  3636. return
  3637. }
  3638. }
  3639. // Enforce traffic rules, using the resolved IP address.
  3640. if !isWebServerPortForward &&
  3641. !sshClient.isPortForwardPermitted(
  3642. portForwardTypeTCP,
  3643. IP,
  3644. portToConnect) {
  3645. // Note: not recording a port forward failure in this case
  3646. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  3647. return
  3648. }
  3649. // TCP dial.
  3650. remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))
  3651. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
  3652. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  3653. fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)
  3654. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  3655. // Record port forward success or failure
  3656. sshClient.updateQualityMetricsWithDialResult(err == nil, time.Since(dialStartTime), IP)
  3657. if err != nil {
  3658. // Monitor for low resource error conditions
  3659. sshClient.sshServer.monitorPortForwardDialError(err)
  3660. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("DialTimeout failed: %s", err))
  3661. return
  3662. }
  3663. // The upstream TCP port forward connection has been established. Schedule
  3664. // some cleanup and notify the SSH client that the channel is accepted.
  3665. defer fwdConn.Close()
  3666. fwdChannel, requests, err := newChannel.Accept()
  3667. if err != nil {
  3668. if !isExpectedTunnelIOError(err) {
  3669. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  3670. }
  3671. return
  3672. }
  3673. go ssh.DiscardRequests(requests)
  3674. defer fwdChannel.Close()
  3675. // Release the dialing slot and acquire an established slot.
  3676. //
  3677. // establishedPortForward increments the concurrent TCP port
  3678. // forward counter and closes the LRU existing TCP port forward
  3679. // when already at the limit.
  3680. //
  3681. // Known limitations:
  3682. //
  3683. // - Closed LRU TCP sockets will enter the TIME_WAIT state,
  3684. // continuing to consume some resources.
  3685. sshClient.establishedPortForward(portForwardTypeTCP, sshClient.tcpPortForwardLRU)
  3686. // "established = true" cancels the deferred abortedTCPPortForward()
  3687. established = true
  3688. // TODO: 64-bit alignment? https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  3689. var bytesUp, bytesDown int64
  3690. defer func() {
  3691. sshClient.closedPortForward(
  3692. portForwardTypeTCP, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))
  3693. }()
  3694. lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
  3695. defer lruEntry.Remove()
  3696. // ActivityMonitoredConn monitors the TCP port forward I/O and updates
  3697. // its LRU status. ActivityMonitoredConn also times out I/O on the port
  3698. // forward if both reads and writes have been idle for the specified
  3699. // duration.
  3700. fwdConn, err = common.NewActivityMonitoredConn(
  3701. fwdConn,
  3702. sshClient.idleTCPPortForwardTimeout(),
  3703. true,
  3704. lruEntry,
  3705. sshClient.getActivityUpdaters(portForwardTypeTCP, IP)...)
  3706. if err != nil {
  3707. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  3708. return
  3709. }
  3710. // Relay channel to forwarded connection.
  3711. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
  3712. // TODO: relay errors to fwdChannel.Stderr()?
  3713. relayWaitGroup := new(sync.WaitGroup)
  3714. relayWaitGroup.Add(1)
  3715. go func() {
  3716. defer relayWaitGroup.Done()
  3717. // io.Copy allocates a 32K temporary buffer, and each port forward relay
  3718. // uses two of these buffers; using common.CopyBuffer with a smaller buffer
  3719. // reduces the overall memory footprint.
  3720. bytes, err := common.CopyBuffer(
  3721. fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  3722. atomic.AddInt64(&bytesDown, bytes)
  3723. if err != nil && err != io.EOF {
  3724. // Debug since errors such as "connection reset by peer" occur during normal operation
  3725. log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
  3726. }
  3727. // Interrupt upstream io.Copy when downstream is shutting down.
  3728. // TODO: this is done to quickly cleanup the port forward when
  3729. // fwdConn has a read timeout, but is it clean -- upstream may still
  3730. // be flowing?
  3731. fwdChannel.Close()
  3732. }()
  3733. bytes, err := common.CopyBuffer(
  3734. fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  3735. atomic.AddInt64(&bytesUp, bytes)
  3736. if err != nil && err != io.EOF {
  3737. log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
  3738. }
  3739. // Shutdown special case: fwdChannel will be closed and return EOF when
  3740. // the SSH connection is closed, but we need to explicitly close fwdConn
  3741. // to interrupt the downstream io.Copy, which may be blocked on a
  3742. // fwdConn.Read().
  3743. fwdConn.Close()
  3744. relayWaitGroup.Wait()
  3745. log.WithTraceFields(
  3746. LogFields{
  3747. "remoteAddr": remoteAddr,
  3748. "bytesUp": atomic.LoadInt64(&bytesUp),
  3749. "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting")
  3750. }