tunnelServer.go 166 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "crypto/subtle"
  25. "encoding/base64"
  26. "encoding/json"
  27. std_errors "errors"
  28. "fmt"
  29. "io"
  30. "io/ioutil"
  31. "net"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "syscall"
  37. "time"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  51. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  52. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
  53. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  54. "github.com/marusama/semaphore"
  55. cache "github.com/patrickmn/go-cache"
  56. )
  // SSH authentication logging and connection timeouts.
const (
	SSH_AUTH_LOG_PERIOD          = 30 * time.Minute
	SSH_HANDSHAKE_TIMEOUT        = 30 * time.Second
	SSH_BEGIN_HANDSHAKE_TIMEOUT  = 1 * time.Second
	SSH_CONNECTION_READ_DEADLINE = 5 * time.Minute
)

// TCP port forward buffering.
const (
	SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE = 8192
	SSH_TCP_PORT_FORWARD_QUEUE_SIZE       = 1024
)

// Keep-alive payload size bounds, in bytes.
const (
	SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES = 0
	SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES = 256
)

// OSL send retry schedule: an initial delay and a multiplicative factor.
const (
	SSH_SEND_OSL_INITIAL_RETRY_DELAY = 30 * time.Second
	SSH_SEND_OSL_RETRY_FACTOR        = 2
)

// Per-session cache lifetimes.
const (
	GEOIP_SESSION_CACHE_TTL = 60 * time.Minute
	OSL_SESSION_CACHE_TTL   = 5 * time.Minute
)

// Miscellaneous limits and queue sizes.
const (
	MAX_AUTHORIZATIONS                    = 16
	PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
	RANDOM_STREAM_MAX_BYTES               = 10 * 1024 * 1024 // 10 MiB
	ALERT_REQUEST_QUEUE_BUFFER_SIZE       = 16
	SSH_MAX_CLIENT_COUNT                  = 100000
)
  76. // TunnelServer is the main server that accepts Psiphon client
  77. // connections, via various obfuscation protocols, and provides
  78. // port forwarding (TCP and UDP) services to the Psiphon client.
  79. // At its core, TunnelServer is an SSH server. SSH is the base
  80. // protocol that provides port forward multiplexing, and transport
  81. // security. Layered on top of SSH, optionally, is Obfuscated SSH
  82. // and meek protocols, which provide further circumvention
  83. // capabilities.
  84. type TunnelServer struct {
  85. runWaitGroup *sync.WaitGroup
  86. listenerError chan error
  87. shutdownBroadcast <-chan struct{}
  88. sshServer *sshServer
  89. }
  // sshListener pairs a bound net.Listener with the metadata that Run logs
// and that is handed to sshServer.runListener.
type sshListener struct {
	// The embedded listener; Run stores the tactics-wrapped listener here.
	net.Listener

	// localAddress is the host:port the listener is bound to.
	localAddress string

	// tunnelProtocol is the tunnel protocol served on this listener.
	tunnelProtocol string

	// port is the configured listen port for tunnelProtocol.
	port int

	// BPFProgramName names the TCP BPF program attached to the listener, if
	// any; it is empty for listener types that don't use BPF.
	BPFProgramName string
}
  97. // NewTunnelServer initializes a new tunnel server.
  98. func NewTunnelServer(
  99. support *SupportServices,
  100. shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {
  101. sshServer, err := newSSHServer(support, shutdownBroadcast)
  102. if err != nil {
  103. return nil, errors.Trace(err)
  104. }
  105. return &TunnelServer{
  106. runWaitGroup: new(sync.WaitGroup),
  107. listenerError: make(chan error),
  108. shutdownBroadcast: shutdownBroadcast,
  109. sshServer: sshServer,
  110. }, nil
  111. }
  112. // Run runs the tunnel server; this function blocks while running a selection of
  113. // listeners that handle connections using various obfuscation protocols.
  114. //
  115. // Run listens on each designated tunnel port and spawns new goroutines to handle
  116. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  117. // clients is maintained, and when halting all clients are cleanly shutdown.
  118. //
  119. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  120. // authentication, and then looping on client new channel requests. "direct-tcpip"
  121. // channels, dynamic port fowards, are supported. When the UDPInterceptUdpgwServerAddress
  122. // config parameter is configured, UDP port forwards over a TCP stream, following
  123. // the udpgw protocol, are handled.
  124. //
  125. // A new goroutine is spawned to handle each port forward for each client. Each port
  126. // forward tracks its bytes transferred. Overall per-client stats for connection duration,
  127. // GeoIP, number of port forwards, and bytes transferred are tracked and logged when the
  128. // client shuts down.
  129. //
  130. // Note: client handler goroutines may still be shutting down after Run() returns. See
  131. // comment in sshClient.stop(). TODO: fully synchronized shutdown.
  132. func (server *TunnelServer) Run() error {
  133. // TODO: should TunnelServer hold its own support pointer?
  134. support := server.sshServer.support
  135. // First bind all listeners; once all are successful,
  136. // start accepting connections on each.
  137. var listeners []*sshListener
  138. for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
  139. localAddress := net.JoinHostPort(
  140. support.Config.ServerIPAddress, strconv.Itoa(listenPort))
  141. var listener net.Listener
  142. var BPFProgramName string
  143. var err error
  144. if protocol.TunnelProtocolUsesFrontedMeekQUIC(tunnelProtocol) {
  145. // For FRONTED-MEEK-QUIC-OSSH, no listener implemented. The edge-to-server
  146. // hop uses HTTPS and the client tunnel protocol is distinguished using
  147. // protocol.MeekCookieData.ClientTunnelProtocol.
  148. continue
  149. } else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {
  150. // in-proxy QUIC tunnel protocols don't support gQUIC.
  151. enableGQUIC := support.Config.EnableGQUIC && !protocol.TunnelProtocolUsesInproxy(tunnelProtocol)
  152. logTunnelProtocol := tunnelProtocol
  153. listener, err = quic.Listen(
  154. CommonLogger(log),
  155. func(peerAddress string, err error, logFields common.LogFields) {
  156. logIrregularTunnel(
  157. support, logTunnelProtocol, listenPort, peerAddress,
  158. errors.Trace(err), LogFields(logFields))
  159. },
  160. localAddress,
  161. support.Config.ObfuscatedSSHKey,
  162. enableGQUIC)
  163. } else if protocol.TunnelProtocolUsesRefractionNetworking(tunnelProtocol) {
  164. listener, err = refraction.Listen(localAddress)
  165. } else if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
  166. listener, err = net.Listen("tcp", localAddress)
  167. } else {
  168. // Only direct, unfronted protocol listeners use TCP BPF circumvention
  169. // programs.
  170. listener, BPFProgramName, err = newTCPListenerWithBPF(support, localAddress)
  171. if protocol.TunnelProtocolUsesTLSOSSH(tunnelProtocol) {
  172. listener, err = ListenTLSTunnel(support, listener, tunnelProtocol, listenPort)
  173. if err != nil {
  174. return errors.Trace(err)
  175. }
  176. }
  177. }
  178. if err != nil {
  179. for _, existingListener := range listeners {
  180. existingListener.Listener.Close()
  181. }
  182. return errors.Trace(err)
  183. }
  184. tacticsListener := NewTacticsListener(
  185. support,
  186. listener,
  187. tunnelProtocol,
  188. func(IP string) GeoIPData { return support.GeoIPService.Lookup(IP) })
  189. log.WithTraceFields(
  190. LogFields{
  191. "localAddress": localAddress,
  192. "tunnelProtocol": tunnelProtocol,
  193. "BPFProgramName": BPFProgramName,
  194. }).Info("listening")
  195. listeners = append(
  196. listeners,
  197. &sshListener{
  198. Listener: tacticsListener,
  199. localAddress: localAddress,
  200. port: listenPort,
  201. tunnelProtocol: tunnelProtocol,
  202. BPFProgramName: BPFProgramName,
  203. })
  204. }
  205. for _, listener := range listeners {
  206. server.runWaitGroup.Add(1)
  207. go func(listener *sshListener) {
  208. defer server.runWaitGroup.Done()
  209. log.WithTraceFields(
  210. LogFields{
  211. "localAddress": listener.localAddress,
  212. "tunnelProtocol": listener.tunnelProtocol,
  213. }).Info("running")
  214. server.sshServer.runListener(
  215. listener,
  216. server.listenerError)
  217. log.WithTraceFields(
  218. LogFields{
  219. "localAddress": listener.localAddress,
  220. "tunnelProtocol": listener.tunnelProtocol,
  221. }).Info("stopped")
  222. }(listener)
  223. }
  224. var err error
  225. select {
  226. case <-server.shutdownBroadcast:
  227. case err = <-server.listenerError:
  228. }
  229. for _, listener := range listeners {
  230. listener.Close()
  231. }
  232. server.sshServer.stopClients()
  233. server.runWaitGroup.Wait()
  234. log.WithTrace().Info("stopped")
  235. return err
  236. }
  237. // GetLoadStats returns load stats for the tunnel server. The stats are
  238. // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
  239. // include current connected client count, total number of current port
  240. // forwards.
  241. func (server *TunnelServer) GetLoadStats() (
  242. UpstreamStats, ProtocolStats, RegionStats) {
  243. return server.sshServer.getLoadStats()
  244. }
  245. // GetEstablishedClientCount returns the number of currently established
  246. // clients.
  247. func (server *TunnelServer) GetEstablishedClientCount() int {
  248. return server.sshServer.getEstablishedClientCount()
  249. }
  250. // ResetAllClientTrafficRules resets all established client traffic rules
  251. // to use the latest config and client properties. Any existing traffic
  252. // rule state is lost, including throttling state.
  253. func (server *TunnelServer) ResetAllClientTrafficRules() {
  254. server.sshServer.resetAllClientTrafficRules()
  255. }
  256. // ResetAllClientOSLConfigs resets all established client OSL state to use
  257. // the latest OSL config. Any existing OSL state is lost, including partial
  258. // progress towards SLOKs.
  259. func (server *TunnelServer) ResetAllClientOSLConfigs() {
  260. server.sshServer.resetAllClientOSLConfigs()
  261. }
  262. // ReloadTactics signals components that use server-side tactics for one-time
  263. // initialization to reload and use potentially changed parameters.
  264. func (server *TunnelServer) ReloadTactics() error {
  265. return errors.Trace(server.sshServer.reloadTactics())
  266. }
  267. // SetEstablishTunnels sets whether new tunnels may be established or not.
  268. // When not establishing, incoming connections are immediately closed.
  269. func (server *TunnelServer) SetEstablishTunnels(establish bool) {
  270. server.sshServer.setEstablishTunnels(establish)
  271. }
  272. // CheckEstablishTunnels returns whether new tunnels may be established or
  273. // not, and increments a metrics counter when establishment is disallowed.
  274. func (server *TunnelServer) CheckEstablishTunnels() bool {
  275. return server.sshServer.checkEstablishTunnels()
  276. }
  277. // GetEstablishTunnelsMetrics returns whether tunnel establishment is
  278. // currently allowed and the number of tunnels rejected since due to not
  279. // establishing since the last GetEstablishTunnelsMetrics call.
  280. func (server *TunnelServer) GetEstablishTunnelsMetrics() (bool, int64) {
  281. return server.sshServer.getEstablishTunnelsMetrics()
  282. }
// sshServer holds the state shared by all tunnel protocol listeners. This
// includes accepted and established client tracking, the SSH host key,
// session caches, in-proxy broker sessions, and the registered meek servers.
type sshServer struct {
	// Note: 64-bit ints used with atomic operations are placed
	// at the start of struct to ensure 64-bit alignment.
	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
	// Do not move these int64 fields away from the start of the struct.

	// NOTE(review): lastAuthLog and authFailedCount are used outside this
	// chunk; presumably auth-failure log throttling state — confirm.
	lastAuthLog     int64
	authFailedCount int64

	// establishLimitedCount counts connections rejected by
	// checkEstablishTunnels; getEstablishTunnelsMetrics swaps it to zero.
	establishLimitedCount int64

	support *SupportServices

	// establishTunnels is 1 when new tunnels may be established and 0
	// otherwise. It is read and written with sync/atomic.
	establishTunnels int32

	// concurrentSSHHandshakes is created only when
	// Config.MaxConcurrentSSHHandshakes > 0; otherwise it is the zero value.
	concurrentSSHHandshakes semaphore.Semaphore

	shutdownBroadcast <-chan struct{}
	sshHostKey        ssh.Signer

	obfuscatorSeedHistory *obfuscator.SeedHistory

	// inproxyBrokerSessions is initialized only when at least one configured
	// tunnel protocol uses in-proxy.
	inproxyBrokerSessions *inproxy.ServerBrokerSessions

	// clientsMutex guards stoppingClients, acceptedClientCounts and clients.
	clientsMutex    sync.Mutex
	stoppingClients bool
	// acceptedClientCounts is indexed [tunnelProtocol][region] -> count.
	acceptedClientCounts map[string]map[string]int64
	// clients maps sessionID to the registered established client.
	clients map[string]*sshClient

	geoIPSessionCache *cache.Cache

	// NOTE(review): oslSessionCacheMutex is presumably used to serialize
	// oslSessionCache access/replacement outside this chunk — confirm.
	oslSessionCacheMutex sync.Mutex
	oslSessionCache      *cache.Cache

	// NOTE(review): authorizationSessionIDsMutex presumably guards
	// authorizationSessionIDs; usage is outside this chunk — confirm.
	authorizationSessionIDsMutex sync.Mutex
	authorizationSessionIDs      map[string]string

	// meekServersMutex guards meekServers (see registerMeekServer and
	// reloadMeekServerTactics).
	meekServersMutex sync.Mutex
	meekServers      []*MeekServer
}
  309. func newSSHServer(
  310. support *SupportServices,
  311. shutdownBroadcast <-chan struct{}) (*sshServer, error) {
  312. privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))
  313. if err != nil {
  314. return nil, errors.Trace(err)
  315. }
  316. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  317. signer, err := ssh.NewSignerFromKey(privateKey)
  318. if err != nil {
  319. return nil, errors.Trace(err)
  320. }
  321. var concurrentSSHHandshakes semaphore.Semaphore
  322. if support.Config.MaxConcurrentSSHHandshakes > 0 {
  323. concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)
  324. }
  325. // The geoIPSessionCache replaces the legacy cache that used to be in
  326. // GeoIPServices and was used for the now-retired web API. That cache was
  327. // also used for, and now geoIPSessionCache provides:
  328. // - Determining first-tunnel-in-session (from a single server's point of
  329. // view)
  330. // - GeoIP for duplicate authorizations logic.
  331. //
  332. // TODO: combine geoIPSessionCache with oslSessionCache; need to deal with
  333. // OSL flush on hot reload and reconcile differing TTLs.
  334. geoIPSessionCache := cache.New(GEOIP_SESSION_CACHE_TTL, 1*time.Minute)
  335. // The OSL session cache temporarily retains OSL seed state
  336. // progress for disconnected clients. This enables clients
  337. // that disconnect and immediately reconnect to the same
  338. // server to resume their OSL progress. Cached progress
  339. // is referenced by session ID and is retained for
  340. // OSL_SESSION_CACHE_TTL after disconnect.
  341. //
  342. // Note: session IDs are assumed to be unpredictable. If a
  343. // rogue client could guess the session ID of another client,
  344. // it could resume its OSL progress and, if the OSL config
  345. // were known, infer some activity.
  346. oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)
  347. // inproxyBrokerSessions are the secure in-proxy broker/server sessions
  348. // used to relay information from the broker to the server, including the
  349. // original in-proxy client IP and the in-proxy proxy ID.
  350. //
  351. // Only brokers with public keys configured in the
  352. // InproxyAllBrokerPublicKeys tactic parameter are allowed to connect to
  353. // the server, and brokers verify the server's public key via the
  354. // InproxySessionPublicKey server entry field.
  355. //
  356. // Sessions are initialized and run for all psiphond instances running any
  357. // in-proxy tunnel protocol.
  358. var inproxyBrokerSessions *inproxy.ServerBrokerSessions
  359. runningInproxy := false
  360. for tunnelProtocol := range support.Config.TunnelProtocolPorts {
  361. if protocol.TunnelProtocolUsesInproxy(tunnelProtocol) {
  362. runningInproxy = true
  363. break
  364. }
  365. }
  366. if runningInproxy {
  367. inproxyPrivateKey, err := inproxy.SessionPrivateKeyFromString(
  368. support.Config.InproxyServerSessionPrivateKey)
  369. if err != nil {
  370. return nil, errors.Trace(err)
  371. }
  372. inproxyObfuscationSecret, err := inproxy.ObfuscationSecretFromString(
  373. support.Config.InproxyServerObfuscationRootSecret)
  374. if err != nil {
  375. return nil, errors.Trace(err)
  376. }
  377. // The expected broker public keys are set in reloadTactics directly
  378. // below, so none are set here.
  379. inproxyBrokerSessions, err = inproxy.NewServerBrokerSessions(
  380. inproxyPrivateKey, inproxyObfuscationSecret, nil)
  381. if err != nil {
  382. return nil, errors.Trace(err)
  383. }
  384. }
  385. // Limitation: rate limiting and resource limiting are handled by external
  386. // components, and sshServer enforces only a sanity check limit on the
  387. // number of entries in sshServer.clients; and no limit on the number of
  388. // entries in sshServer.geoIPSessionCache or sshServer.oslSessionCache.
  389. //
  390. // To avoid resource exhaustion, this implementation relies on:
  391. //
  392. // - Per-peer IP address and/or overall network connection rate limiting,
  393. // provided by iptables as configured by Psiphon automation
  394. // (https://github.com/Psiphon-Inc/psiphon-automation/blob/
  395. // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L1451).
  396. //
  397. // - Host CPU/memory/network monitoring and signalling, installed Psiphon
  398. // automation
  399. // (https://github.com/Psiphon-Inc/psiphon-automation/blob/
  400. // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L935).
  401. // When resource usage meets certain thresholds, the monitoring signals
  402. // this process with SIGTSTP or SIGCONT, and handlers call
  403. // sshServer.setEstablishTunnels to stop or resume accepting new clients.
  404. sshServer := &sshServer{
  405. support: support,
  406. establishTunnels: 1,
  407. concurrentSSHHandshakes: concurrentSSHHandshakes,
  408. shutdownBroadcast: shutdownBroadcast,
  409. sshHostKey: signer,
  410. acceptedClientCounts: make(map[string]map[string]int64),
  411. clients: make(map[string]*sshClient),
  412. geoIPSessionCache: geoIPSessionCache,
  413. oslSessionCache: oslSessionCache,
  414. authorizationSessionIDs: make(map[string]string),
  415. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  416. inproxyBrokerSessions: inproxyBrokerSessions,
  417. }
  418. // Initialize components that use server-side tactics and which reload on
  419. // tactics change events.
  420. err = sshServer.reloadTactics()
  421. if err != nil {
  422. return nil, errors.Trace(err)
  423. }
  424. return sshServer, nil
  425. }
  426. func (sshServer *sshServer) setEstablishTunnels(establish bool) {
  427. // Do nothing when the setting is already correct. This avoids
  428. // spurious log messages when setEstablishTunnels is called
  429. // periodically with the same setting.
  430. if establish == (atomic.LoadInt32(&sshServer.establishTunnels) == 1) {
  431. return
  432. }
  433. establishFlag := int32(1)
  434. if !establish {
  435. establishFlag = 0
  436. }
  437. atomic.StoreInt32(&sshServer.establishTunnels, establishFlag)
  438. log.WithTraceFields(
  439. LogFields{"establish": establish}).Info("establishing tunnels")
  440. }
  441. func (sshServer *sshServer) checkEstablishTunnels() bool {
  442. establishTunnels := atomic.LoadInt32(&sshServer.establishTunnels) == 1
  443. if !establishTunnels {
  444. atomic.AddInt64(&sshServer.establishLimitedCount, 1)
  445. }
  446. return establishTunnels
  447. }
  448. func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
  449. return atomic.LoadInt32(&sshServer.establishTunnels) == 1,
  450. atomic.SwapInt64(&sshServer.establishLimitedCount, 0)
  451. }
// additionalTransportData is additional data gathered at the transport level,
// such as in MeekServer at the HTTP layer, and relayed to the
// sshServer/sshClient. A nil *additionalTransportData means no such data.
type additionalTransportData struct {
	// overrideTunnelProtocol, when non-empty, replaces the listener's tunnel
	// protocol for the connection; runListener sets it, e.g., to report
	// TLS-OSSH for demuxed connections.
	overrideTunnelProtocol string

	// NOTE(review): steeringIP is populated outside this chunk, presumably by
	// MeekServer — confirm its source and format.
	steeringIP string
}
  459. // runListener is intended to run an a goroutine; it blocks
  460. // running a particular listener. If an unrecoverable error
  461. // occurs, it will send the error to the listenerError channel.
  462. func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {
  463. handleClient := func(conn net.Conn, transportData *additionalTransportData) {
  464. // Note: establish tunnel limiter cannot simply stop TCP
  465. // listeners in all cases (e.g., meek) since SSH tunnels can
  466. // span multiple TCP connections.
  467. if !sshServer.checkEstablishTunnels() {
  468. if IsLogLevelDebug() {
  469. log.WithTrace().Debug("not establishing tunnels")
  470. }
  471. conn.Close()
  472. return
  473. }
  474. // sshListener.tunnelProtocol indictes the tunnel protocol run by the
  475. // listener. For direct protocols, this is also the client tunnel protocol.
  476. // For fronted protocols, the client may use a different protocol to connect
  477. // to the front and then only the front-to-Psiphon server will use the
  478. // listener protocol.
  479. //
  480. // A fronted meek client, for example, reports its first hop protocol in
  481. // protocol.MeekCookieData.ClientTunnelProtocol. Most metrics record this
  482. // value as relay_protocol, since the first hop is the one subject to
  483. // adversarial conditions. In some cases, such as irregular tunnels, there
  484. // is no ClientTunnelProtocol value available and the listener tunnel
  485. // protocol will be logged.
  486. //
  487. // Similarly, listenerPort indicates the listening port, which is the dialed
  488. // port number for direct protocols; while, for fronted protocols, the
  489. // client may dial a different port for its first hop.
  490. // Process each client connection concurrently.
  491. go sshServer.handleClient(sshListener, conn, transportData)
  492. }
  493. // Note: when exiting due to a unrecoverable error, be sure
  494. // to try to send the error to listenerError so that the outer
  495. // TunnelServer.Run will properly shut down instead of remaining
  496. // running.
  497. if protocol.TunnelProtocolUsesMeekHTTP(sshListener.tunnelProtocol) ||
  498. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol) {
  499. if sshServer.tunnelProtocolUsesTLSDemux(sshListener.tunnelProtocol) {
  500. sshServer.runMeekTLSOSSHDemuxListener(sshListener, listenerError, handleClient)
  501. } else {
  502. meekServer, err := NewMeekServer(
  503. sshServer.support,
  504. sshListener.Listener,
  505. sshListener.tunnelProtocol,
  506. sshListener.port,
  507. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol),
  508. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  509. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  510. true,
  511. handleClient,
  512. sshServer.shutdownBroadcast)
  513. if err == nil {
  514. sshServer.registerMeekServer(meekServer)
  515. err = meekServer.Run()
  516. }
  517. if err != nil {
  518. select {
  519. case listenerError <- errors.Trace(err):
  520. default:
  521. }
  522. return
  523. }
  524. }
  525. } else {
  526. runListener(sshListener.Listener, sshServer.shutdownBroadcast, listenerError, "", handleClient)
  527. }
  528. }
// runMeekTLSOSSHDemuxListener blocks running a listener which demuxes meek and
// TLS-OSSH connections received on the same port.
//
// The underlying listener is wrapped with ListenTLSTunnel. Connections are
// then classified by their initial bytes: meek if the first 4 bytes contain
// "POST", otherwise TLS-OSSH. Each class is handed to its own consumer: a
// MeekServer for meek, and runListener (reporting TLS-OSSH as the tunnel
// protocol) for TLS-OSSH.
//
// Unrecoverable errors are offered to listenerError with a non-blocking send.
// After setup succeeds, this function returns only once all four worker
// goroutines have exited. One of them exits only after shutdownBroadcast
// fires, so this call blocks until shutdown even if another worker failed.
func (sshServer *sshServer) runMeekTLSOSSHDemuxListener(
	sshListener *sshListener,
	listenerError chan<- error,
	handleClient func(conn net.Conn, transportData *additionalTransportData)) {

	// Matches when the first 4 bytes contain "POST", i.e., a meek HTTP
	// request inside the TLS layer.
	meekClassifier := protocolClassifier{
		minBytesToMatch: 4,
		maxBytesToMatch: 4,
		match: func(b []byte) bool {

			// NOTE: HTTP transforms are only applied to plain HTTP
			// meek so they are not a concern here.

			return bytes.Contains(b, []byte("POST"))
		},
	}

	// Fallback classifier: any connection with at least 5 bytes that was not
	// classified as meek is treated as TLS-OSSH.
	tlsClassifier := protocolClassifier{
		// NOTE: technically +1 not needed if detectors are evaluated
		// in order by index in classifier array, which they are.
		minBytesToMatch: meekClassifier.maxBytesToMatch + 1,
		maxBytesToMatch: meekClassifier.maxBytesToMatch + 1,
		match: func(b []byte) bool {
			return len(b) > 4 // if not classified as meek, then tls
		},
	}

	listener, err := ListenTLSTunnel(
		sshServer.support,
		sshListener.Listener,
		sshListener.tunnelProtocol,
		sshListener.port)
	if err != nil {
		select {
		case listenerError <- errors.Trace(err):
		default:
		}
		return
	}

	// NOTE(review): listeners is assumed to be indexed in classifier order,
	// i.e. listeners[0] is meek and listeners[1] is TLS-OSSH, as used below —
	// confirm against newProtocolDemux.
	mux, listeners := newProtocolDemux(
		context.Background(),
		listener,
		[]protocolClassifier{meekClassifier, tlsClassifier},
		sshServer.support.Config.sshHandshakeTimeout)

	var wg sync.WaitGroup

	wg.Add(1)

	go func() {
		// handle shutdown gracefully: close the demux once shutdown is
		// broadcast.

		defer wg.Done()

		<-sshServer.shutdownBroadcast
		err := mux.Close()
		if err != nil {
			log.WithTraceFields(LogFields{"error": err}).Error("close failed")
		}
	}()

	wg.Add(1)

	go func() {
		// start demultiplexing TLS-OSSH and meek HTTPS connections

		defer wg.Done()

		err := mux.run()
		if err != nil {
			select {
			case listenerError <- errors.Trace(err):
			default:
			}
			return
		}
	}()

	wg.Add(1)

	go func() {
		// start handling TLS-OSSH connections as they are demultiplexed

		defer wg.Done()

		// Override the listener tunnel protocol to report TLS-OSSH instead.
		runListener(
			listeners[1],
			sshServer.shutdownBroadcast,
			listenerError,
			protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH, handleClient)
	}()

	wg.Add(1)

	go func() {
		// start handling meek HTTPS connections as they are
		// demultiplexed

		defer wg.Done()

		// The HTTPS argument is false here; presumably because TLS is
		// already handled by the ListenTLSTunnel wrapper above — confirm.
		meekServer, err := NewMeekServer(
			sshServer.support,
			listeners[0],
			sshListener.tunnelProtocol,
			sshListener.port,
			false,
			protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
			protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
			true,
			handleClient,
			sshServer.shutdownBroadcast)
		if err == nil {
			sshServer.registerMeekServer(meekServer)
			err = meekServer.Run()
		}

		if err != nil {
			select {
			case listenerError <- errors.Trace(err):
			default:
			}
			return
		}
	}()

	wg.Wait()
}
  635. func runListener(
  636. listener net.Listener,
  637. shutdownBroadcast <-chan struct{},
  638. listenerError chan<- error,
  639. overrideTunnelProtocol string,
  640. handleClient func(conn net.Conn, transportData *additionalTransportData)) {
  641. for {
  642. conn, err := listener.Accept()
  643. select {
  644. case <-shutdownBroadcast:
  645. if err == nil {
  646. conn.Close()
  647. }
  648. return
  649. default:
  650. }
  651. if err != nil {
  652. if e, ok := err.(net.Error); ok && e.Temporary() {
  653. log.WithTraceFields(LogFields{"error": err}).Error("accept failed")
  654. // Temporary error, keep running
  655. continue
  656. } else if std_errors.Is(err, errRestrictedProvider) {
  657. log.WithTraceFields(LogFields{"error": err}).Error("accept rejected client")
  658. // Restricted provider, keep running
  659. continue
  660. }
  661. select {
  662. case listenerError <- errors.Trace(err):
  663. default:
  664. }
  665. return
  666. }
  667. var transportData *additionalTransportData
  668. if overrideTunnelProtocol != "" {
  669. transportData = &additionalTransportData{
  670. overrideTunnelProtocol: overrideTunnelProtocol,
  671. }
  672. }
  673. handleClient(conn, transportData)
  674. }
  675. }
  676. // registerMeekServer registers a MeekServer instance to receive tactics
  677. // reload signals.
  678. func (sshServer *sshServer) registerMeekServer(meekServer *MeekServer) {
  679. sshServer.meekServersMutex.Lock()
  680. defer sshServer.meekServersMutex.Unlock()
  681. sshServer.meekServers = append(sshServer.meekServers, meekServer)
  682. }
  683. // reloadMeekServerTactics signals each registered MeekServer instance that
  684. // tactics have reloaded and may have changed.
  685. func (sshServer *sshServer) reloadMeekServerTactics() error {
  686. sshServer.meekServersMutex.Lock()
  687. defer sshServer.meekServersMutex.Unlock()
  688. for _, meekServer := range sshServer.meekServers {
  689. err := meekServer.ReloadTactics()
  690. if err != nil {
  691. return errors.Trace(err)
  692. }
  693. }
  694. return nil
  695. }
  696. // An accepted client has completed a direct TCP or meek connection and has a
  697. // net.Conn. Registration is for tracking the number of connections.
  698. func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {
  699. sshServer.clientsMutex.Lock()
  700. defer sshServer.clientsMutex.Unlock()
  701. if sshServer.acceptedClientCounts[tunnelProtocol] == nil {
  702. sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)
  703. }
  704. sshServer.acceptedClientCounts[tunnelProtocol][region] += 1
  705. }
  706. func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {
  707. sshServer.clientsMutex.Lock()
  708. defer sshServer.clientsMutex.Unlock()
  709. sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1
  710. }
  711. // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
  712. // for tracking the number of fully established clients and for maintaining a list of running
  713. // clients (for stopping at shutdown time).
  714. func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
  715. sshServer.clientsMutex.Lock()
  716. if sshServer.stoppingClients {
  717. sshServer.clientsMutex.Unlock()
  718. return false
  719. }
  720. // In the case of a duplicate client sessionID, the previous client is closed.
  721. // - Well-behaved clients generate a random sessionID that should be unique (won't
  722. // accidentally conflict) and hard to guess (can't be targeted by a malicious
  723. // client).
  724. // - Clients reuse the same sessionID when a tunnel is unexpectedly disconnected
  725. // and reestablished. In this case, when the same server is selected, this logic
  726. // will be hit; closing the old, dangling client is desirable.
  727. // - Multi-tunnel clients should not normally use one server for multiple tunnels.
  728. existingClient := sshServer.clients[client.sessionID]
  729. sshServer.clientsMutex.Unlock()
  730. if existingClient != nil {
  731. // This case is expected to be common, and so logged at the lowest severity
  732. // level.
  733. log.WithTrace().Debug(
  734. "stopping existing client with duplicate session ID")
  735. existingClient.stop()
  736. // Block until the existingClient is fully terminated. This is necessary to
  737. // avoid this scenario:
  738. // - existingClient is invoking handshakeAPIRequestHandler
  739. // - sshServer.clients[client.sessionID] is updated to point to new client
  740. // - existingClient's handshakeAPIRequestHandler invokes
  741. // setHandshakeState but sets the handshake parameters for new
  742. // client
  743. // - as a result, the new client handshake will fail (only a single handshake
  744. // is permitted) and the new client server_tunnel log will contain an
  745. // invalid mix of existing/new client fields
  746. //
  747. // Once existingClient.awaitStopped returns, all existingClient port
  748. // forwards and request handlers have terminated, so no API handler, either
  749. // tunneled web API or SSH API, will remain and it is safe to point
  750. // sshServer.clients[client.sessionID] to the new client.
  751. // Limitation: this scenario remains possible with _untunneled_ web API
  752. // requests.
  753. //
  754. // Blocking also ensures existingClient.releaseAuthorizations is invoked before
  755. // the new client attempts to submit the same authorizations.
  756. //
  757. // Perform blocking awaitStopped operation outside the
  758. // sshServer.clientsMutex mutex to avoid blocking all other clients for the
  759. // duration. We still expect and require that the stop process completes
  760. // rapidly, e.g., does not block on network I/O, allowing the new client
  761. // connection to proceed without delay.
  762. //
  763. // In addition, operations triggered by stop, and which must complete before
  764. // awaitStopped returns, will attempt to lock sshServer.clientsMutex,
  765. // including unregisterEstablishedClient.
  766. existingClient.awaitStopped()
  767. }
  768. sshServer.clientsMutex.Lock()
  769. defer sshServer.clientsMutex.Unlock()
  770. // existingClient's stop will have removed it from sshServer.clients via
  771. // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should
  772. // be nil -- unless yet another client instance using the same sessionID has
  773. // connected in the meantime while awaiting existingClient stop. In this
  774. // case, it's not clear which is the most recent connection from the client,
  775. // so instead of this connection terminating more peers, it aborts.
  776. if sshServer.clients[client.sessionID] != nil {
  777. // As this is expected to be rare case, it's logged at a higher severity
  778. // level.
  779. log.WithTrace().Warning(
  780. "aborting new client with duplicate session ID")
  781. return false
  782. }
  783. // SSH_MAX_CLIENT_COUNT is a simple sanity check and failsafe. Load
  784. // limiting tuned to each server's host resources is provided by external
  785. // components. See comment in newSSHServer for more details.
  786. if len(sshServer.clients) >= SSH_MAX_CLIENT_COUNT {
  787. log.WithTrace().Warning("SSH_MAX_CLIENT_COUNT exceeded")
  788. return false
  789. }
  790. sshServer.clients[client.sessionID] = client
  791. return true
  792. }
  793. func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
  794. sshServer.clientsMutex.Lock()
  795. registeredClient := sshServer.clients[client.sessionID]
  796. // registeredClient will differ from client when client is the existingClient
  797. // terminated in registerEstablishedClient. In that case, registeredClient
  798. // remains connected, and the sshServer.clients entry should be retained.
  799. if registeredClient == client {
  800. delete(sshServer.clients, client.sessionID)
  801. }
  802. sshServer.clientsMutex.Unlock()
  803. client.stop()
  804. }
  805. type UpstreamStats map[string]interface{}
  806. type ProtocolStats map[string]map[string]interface{}
  807. type RegionStats map[string]map[string]map[string]interface{}
  808. func (sshServer *sshServer) getLoadStats() (
  809. UpstreamStats, ProtocolStats, RegionStats) {
  810. sshServer.clientsMutex.Lock()
  811. defer sshServer.clientsMutex.Unlock()
  812. // Explicitly populate with zeros to ensure 0 counts in log messages.
  813. zeroClientStats := func() map[string]interface{} {
  814. stats := make(map[string]interface{})
  815. stats["accepted_clients"] = int64(0)
  816. stats["established_clients"] = int64(0)
  817. return stats
  818. }
  819. // Due to hot reload and changes to the underlying system configuration, the
  820. // set of resolver IPs may change between getLoadStats calls, so this
  821. // enumeration for zeroing is a best effort.
  822. resolverIPs := sshServer.support.DNSResolver.GetAll()
  823. // Fields which are primarily concerned with upstream/egress performance.
  824. zeroUpstreamStats := func() map[string]interface{} {
  825. stats := make(map[string]interface{})
  826. stats["dialing_tcp_port_forwards"] = int64(0)
  827. stats["tcp_port_forwards"] = int64(0)
  828. stats["total_tcp_port_forwards"] = int64(0)
  829. stats["udp_port_forwards"] = int64(0)
  830. stats["total_udp_port_forwards"] = int64(0)
  831. stats["tcp_port_forward_dialed_count"] = int64(0)
  832. stats["tcp_port_forward_dialed_duration"] = int64(0)
  833. stats["tcp_port_forward_failed_count"] = int64(0)
  834. stats["tcp_port_forward_failed_duration"] = int64(0)
  835. stats["tcp_port_forward_rejected_dialing_limit_count"] = int64(0)
  836. stats["tcp_port_forward_rejected_disallowed_count"] = int64(0)
  837. stats["udp_port_forward_rejected_disallowed_count"] = int64(0)
  838. stats["tcp_ipv4_port_forward_dialed_count"] = int64(0)
  839. stats["tcp_ipv4_port_forward_dialed_duration"] = int64(0)
  840. stats["tcp_ipv4_port_forward_failed_count"] = int64(0)
  841. stats["tcp_ipv4_port_forward_failed_duration"] = int64(0)
  842. stats["tcp_ipv6_port_forward_dialed_count"] = int64(0)
  843. stats["tcp_ipv6_port_forward_dialed_duration"] = int64(0)
  844. stats["tcp_ipv6_port_forward_failed_count"] = int64(0)
  845. stats["tcp_ipv6_port_forward_failed_duration"] = int64(0)
  846. zeroDNSStats := func() map[string]int64 {
  847. m := map[string]int64{"ALL": 0}
  848. for _, resolverIP := range resolverIPs {
  849. m[resolverIP.String()] = 0
  850. }
  851. return m
  852. }
  853. stats["dns_count"] = zeroDNSStats()
  854. stats["dns_duration"] = zeroDNSStats()
  855. stats["dns_failed_count"] = zeroDNSStats()
  856. stats["dns_failed_duration"] = zeroDNSStats()
  857. return stats
  858. }
  859. zeroProtocolStats := func() map[string]map[string]interface{} {
  860. stats := make(map[string]map[string]interface{})
  861. stats["ALL"] = zeroClientStats()
  862. for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
  863. stats[tunnelProtocol] = zeroClientStats()
  864. if sshServer.tunnelProtocolUsesTLSDemux(tunnelProtocol) {
  865. stats[protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH] = zeroClientStats()
  866. }
  867. }
  868. return stats
  869. }
  870. addInt64 := func(stats map[string]interface{}, name string, value int64) {
  871. stats[name] = stats[name].(int64) + value
  872. }
  873. upstreamStats := zeroUpstreamStats()
  874. // [<protocol or ALL>][<stat name>] -> count
  875. protocolStats := zeroProtocolStats()
  876. // [<region][<protocol or ALL>][<stat name>] -> count
  877. regionStats := make(RegionStats)
  878. // Note: as currently tracked/counted, each established client is also an accepted client
  879. // Accepted client counts use peer GeoIP data, which in the case of
  880. // in-proxy tunnel protocols is the proxy, not the client. The original
  881. // client IP is only obtained after the tunnel handshake has completed.
  882. for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {
  883. for region, acceptedClientCount := range regionAcceptedClientCounts {
  884. if acceptedClientCount > 0 {
  885. if regionStats[region] == nil {
  886. regionStats[region] = zeroProtocolStats()
  887. }
  888. addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)
  889. addInt64(protocolStats[tunnelProtocol], "accepted_clients", acceptedClientCount)
  890. addInt64(regionStats[region]["ALL"], "accepted_clients", acceptedClientCount)
  891. addInt64(regionStats[region][tunnelProtocol], "accepted_clients", acceptedClientCount)
  892. }
  893. }
  894. }
  895. for _, client := range sshServer.clients {
  896. client.Lock()
  897. // Limitation: registerEstablishedClient is called before the
  898. // handshake API completes; as a result, in the case of in-proxy
  899. // tunnel protocol, clientGeoIPData may not yet be initialized and
  900. // will count as None.
  901. tunnelProtocol := client.tunnelProtocol
  902. region := client.clientGeoIPData.Country
  903. if regionStats[region] == nil {
  904. regionStats[region] = zeroProtocolStats()
  905. }
  906. for _, stats := range []map[string]interface{}{
  907. protocolStats["ALL"],
  908. protocolStats[tunnelProtocol],
  909. regionStats[region]["ALL"],
  910. regionStats[region][tunnelProtocol]} {
  911. addInt64(stats, "established_clients", 1)
  912. }
  913. // Note:
  914. // - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
  915. // - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
  916. addInt64(upstreamStats, "dialing_tcp_port_forwards",
  917. client.tcpTrafficState.concurrentDialingPortForwardCount)
  918. addInt64(upstreamStats, "tcp_port_forwards",
  919. client.tcpTrafficState.concurrentPortForwardCount)
  920. addInt64(upstreamStats, "total_tcp_port_forwards",
  921. client.tcpTrafficState.totalPortForwardCount)
  922. addInt64(upstreamStats, "udp_port_forwards",
  923. client.udpTrafficState.concurrentPortForwardCount)
  924. addInt64(upstreamStats, "total_udp_port_forwards",
  925. client.udpTrafficState.totalPortForwardCount)
  926. addInt64(upstreamStats, "tcp_port_forward_dialed_count",
  927. client.qualityMetrics.TCPPortForwardDialedCount)
  928. addInt64(upstreamStats, "tcp_port_forward_dialed_duration",
  929. int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))
  930. addInt64(upstreamStats, "tcp_port_forward_failed_count",
  931. client.qualityMetrics.TCPPortForwardFailedCount)
  932. addInt64(upstreamStats, "tcp_port_forward_failed_duration",
  933. int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))
  934. addInt64(upstreamStats, "tcp_port_forward_rejected_dialing_limit_count",
  935. client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)
  936. addInt64(upstreamStats, "tcp_port_forward_rejected_disallowed_count",
  937. client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)
  938. addInt64(upstreamStats, "udp_port_forward_rejected_disallowed_count",
  939. client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)
  940. addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_count",
  941. client.qualityMetrics.TCPIPv4PortForwardDialedCount)
  942. addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_duration",
  943. int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))
  944. addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_count",
  945. client.qualityMetrics.TCPIPv4PortForwardFailedCount)
  946. addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_duration",
  947. int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))
  948. addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_count",
  949. client.qualityMetrics.TCPIPv6PortForwardDialedCount)
  950. addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_duration",
  951. int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))
  952. addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_count",
  953. client.qualityMetrics.TCPIPv6PortForwardFailedCount)
  954. addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_duration",
  955. int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))
  956. // DNS metrics limitations:
  957. // - port forwards (sshClient.handleTCPChannel) don't know or log the resolver IP.
  958. // - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure,
  959. // and there may be some delay before these code paths report DNS metrics.
  960. // Every client.qualityMetrics DNS map has an "ALL" entry.
  961. totalDNSCount := int64(0)
  962. totalDNSFailedCount := int64(0)
  963. for key, value := range client.qualityMetrics.DNSCount {
  964. upstreamStats["dns_count"].(map[string]int64)[key] += value
  965. totalDNSCount += value
  966. }
  967. for key, value := range client.qualityMetrics.DNSDuration {
  968. upstreamStats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
  969. }
  970. for key, value := range client.qualityMetrics.DNSFailedCount {
  971. upstreamStats["dns_failed_count"].(map[string]int64)[key] += value
  972. totalDNSFailedCount += value
  973. }
  974. for key, value := range client.qualityMetrics.DNSFailedDuration {
  975. upstreamStats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
  976. }
  977. // Update client peak failure rate metrics, to be recorded in
  978. // server_tunnel.
  979. //
  980. // Limitations:
  981. //
  982. // - This is a simple data sampling that doesn't require additional
  983. // timers or tracking logic. Since the rates are calculated on
  984. // getLoadStats events and using accumulated counts, these peaks
  985. // only represent the highest failure rate within a
  986. // Config.LoadMonitorPeriodSeconds non-sliding window. There is no
  987. // sample recorded for short tunnels with no overlapping
  988. // getLoadStats event.
  989. //
  990. // - There is no minimum sample window, as a getLoadStats event may
  991. // occur immediately after a client first connects. This may be
  992. // compensated for by adjusting
  993. // Config.PeakUpstreamFailureRateMinimumSampleSize, so as to only
  994. // consider failure rates with a larger number of samples.
  995. //
  996. // - Non-UDP "failures" are not currently tracked.
  997. minimumSampleSize := int64(sshServer.support.Config.peakUpstreamFailureRateMinimumSampleSize)
  998. sampleSize := client.qualityMetrics.TCPPortForwardDialedCount +
  999. client.qualityMetrics.TCPPortForwardFailedCount
  1000. if sampleSize >= minimumSampleSize {
  1001. TCPPortForwardFailureRate := float64(client.qualityMetrics.TCPPortForwardFailedCount) /
  1002. float64(sampleSize)
  1003. if client.peakMetrics.TCPPortForwardFailureRate == nil {
  1004. client.peakMetrics.TCPPortForwardFailureRate = new(float64)
  1005. *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
  1006. client.peakMetrics.TCPPortForwardFailureRateSampleSize = new(int64)
  1007. *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize
  1008. } else if *client.peakMetrics.TCPPortForwardFailureRate < TCPPortForwardFailureRate {
  1009. *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
  1010. *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize
  1011. }
  1012. }
  1013. sampleSize = totalDNSCount + totalDNSFailedCount
  1014. if sampleSize >= minimumSampleSize {
  1015. DNSFailureRate := float64(totalDNSFailedCount) / float64(sampleSize)
  1016. if client.peakMetrics.DNSFailureRate == nil {
  1017. client.peakMetrics.DNSFailureRate = new(float64)
  1018. *client.peakMetrics.DNSFailureRate = DNSFailureRate
  1019. client.peakMetrics.DNSFailureRateSampleSize = new(int64)
  1020. *client.peakMetrics.DNSFailureRateSampleSize = sampleSize
  1021. } else if *client.peakMetrics.DNSFailureRate < DNSFailureRate {
  1022. *client.peakMetrics.DNSFailureRate = DNSFailureRate
  1023. *client.peakMetrics.DNSFailureRateSampleSize = sampleSize
  1024. }
  1025. }
  1026. // Reset quality metrics counters
  1027. client.qualityMetrics.reset()
  1028. client.Unlock()
  1029. }
  1030. for _, client := range sshServer.clients {
  1031. client.Lock()
  1032. // Update client peak proximate (same region) concurrently connected
  1033. // (other clients) client metrics, to be recorded in server_tunnel.
  1034. // This operation requires a second loop over sshServer.clients since
  1035. // established_clients is calculated in the first loop.
  1036. //
  1037. // Limitations:
  1038. //
  1039. // - This is an approximation, not a true peak, as it only samples
  1040. // data every Config.LoadMonitorPeriodSeconds period. There is no
  1041. // sample recorded for short tunnels with no overlapping
  1042. // getLoadStats event.
  1043. //
  1044. // - The "-1" calculation counts all but the current client as other
  1045. // clients; it can be the case that the same client has a dangling
  1046. // accepted connection that has yet to time-out server side. Due to
  1047. // NAT, we can't determine if the client is the same based on
  1048. // network address. For established clients,
  1049. // registerEstablishedClient ensures that any previous connection
  1050. // is first terminated, although this is only for the same
  1051. // session_id. Concurrent proximate clients may be considered an
  1052. // exact number of other _network connections_, even from the same
  1053. // client.
  1054. //
  1055. // Futhermore, since client.Locks aren't held between the previous
  1056. // loop and this one, it's also possible that the client's
  1057. // clientGeoIPData was None in the previous loop and is now not
  1058. // None. In this case, the regionStats may not be populated at all
  1059. // for the client's current region; if so, the client is skipped.
  1060. // This scenario can also result in a proximate undercount by one,
  1061. // when the regionStats _is_ populated: this client was counted
  1062. // under None, not the current client.peerGeoIPData.Country, so
  1063. // the -1 subtracts some _other_ client from the populated regionStats.
  1064. //
  1065. // - For in-proxy protocols, the accepted proximate metric uses the
  1066. // peer GeoIP, which represents the proxy, not the client.
  1067. stats := regionStats[client.peerGeoIPData.Country]["ALL"]
  1068. n := stats["accepted_clients"].(int64) - 1
  1069. if n >= 0 {
  1070. if client.peakMetrics.concurrentProximateAcceptedClients == nil {
  1071. client.peakMetrics.concurrentProximateAcceptedClients = new(int64)
  1072. *client.peakMetrics.concurrentProximateAcceptedClients = n
  1073. } else if *client.peakMetrics.concurrentProximateAcceptedClients < n {
  1074. *client.peakMetrics.concurrentProximateAcceptedClients = n
  1075. }
  1076. }
  1077. // Handle the in-proxy None and None/not-None cases (and any other
  1078. // potential scenario where regionStats[client.clientGeoIPData.Country]
  1079. // may not be populated).
  1080. if client.clientGeoIPData.Country == GEOIP_UNKNOWN_VALUE ||
  1081. regionStats[client.clientGeoIPData.Country] == nil {
  1082. client.Unlock()
  1083. continue
  1084. }
  1085. stats = regionStats[client.clientGeoIPData.Country]["ALL"]
  1086. n = stats["established_clients"].(int64) - 1
  1087. if n >= 0 {
  1088. if client.peakMetrics.concurrentProximateEstablishedClients == nil {
  1089. client.peakMetrics.concurrentProximateEstablishedClients = new(int64)
  1090. *client.peakMetrics.concurrentProximateEstablishedClients = n
  1091. } else if *client.peakMetrics.concurrentProximateEstablishedClients < n {
  1092. *client.peakMetrics.concurrentProximateEstablishedClients = n
  1093. }
  1094. }
  1095. client.Unlock()
  1096. }
  1097. return upstreamStats, protocolStats, regionStats
  1098. }
  1099. func (sshServer *sshServer) getEstablishedClientCount() int {
  1100. sshServer.clientsMutex.Lock()
  1101. defer sshServer.clientsMutex.Unlock()
  1102. establishedClients := len(sshServer.clients)
  1103. return establishedClients
  1104. }
  1105. func (sshServer *sshServer) resetAllClientTrafficRules() {
  1106. sshServer.clientsMutex.Lock()
  1107. clients := make(map[string]*sshClient)
  1108. for sessionID, client := range sshServer.clients {
  1109. clients[sessionID] = client
  1110. }
  1111. sshServer.clientsMutex.Unlock()
  1112. for _, client := range clients {
  1113. client.setTrafficRules()
  1114. }
  1115. }
  1116. func (sshServer *sshServer) resetAllClientOSLConfigs() {
  1117. // Flush cached seed state. This has the same effect
  1118. // and same limitations as calling setOSLConfig for
  1119. // currently connected clients -- all progress is lost.
  1120. sshServer.oslSessionCacheMutex.Lock()
  1121. sshServer.oslSessionCache.Flush()
  1122. sshServer.oslSessionCacheMutex.Unlock()
  1123. sshServer.clientsMutex.Lock()
  1124. clients := make(map[string]*sshClient)
  1125. for sessionID, client := range sshServer.clients {
  1126. clients[sessionID] = client
  1127. }
  1128. sshServer.clientsMutex.Unlock()
  1129. for _, client := range clients {
  1130. client.setOSLConfig()
  1131. }
  1132. }
  1133. // reloadTactics signals/invokes components that use server-side tactics for
  1134. // one-time initialization to reload and use potentially changed parameters.
  1135. func (sshServer *sshServer) reloadTactics() error {
  1136. // The following in-proxy components use server-side tactics with a
  1137. // one-time initialization:
  1138. //
  1139. // - For servers running in-proxy tunnel protocols,
  1140. // sshServer.inproxyBrokerSessions are the broker/server sessions and
  1141. // the set of expected broker public keys is set from tactics.
  1142. // - For servers running a broker within MeekServer, broker operational
  1143. // configuration is set from tactics.
  1144. //
  1145. // For these components, one-time initialization is more efficient than
  1146. // constantly fetching tactics. Instead, these components reinitialize
  1147. // when tactics change.
  1148. // sshServer.inproxyBrokerSessions is not nil when the server is running
  1149. // in-proxy tunnel protocols.
  1150. if sshServer.inproxyBrokerSessions != nil {
  1151. // Get InproxyAllBrokerPublicKeys from tactics.
  1152. //
  1153. // Limitation: assumes no GeoIP targeting for InproxyAllBrokerPublicKeys.
  1154. p, err := sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
  1155. if err != nil {
  1156. return errors.Trace(err)
  1157. }
  1158. if !p.IsNil() {
  1159. brokerPublicKeys, err := inproxy.SessionPublicKeysFromStrings(
  1160. p.Strings(parameters.InproxyAllBrokerPublicKeys))
  1161. if err != nil {
  1162. return errors.Trace(err)
  1163. }
  1164. // SetKnownBrokerPublicKeys will terminate any existing sessions
  1165. // for broker public keys no longer in the known/expected list;
  1166. // but will retain any existing sessions for broker public keys
  1167. // that remain in the list.
  1168. sshServer.inproxyBrokerSessions.SetKnownBrokerPublicKeys(brokerPublicKeys)
  1169. }
  1170. }
  1171. err := sshServer.reloadMeekServerTactics()
  1172. if err != nil {
  1173. return errors.Trace(err)
  1174. }
  1175. return nil
  1176. }
  1177. func (sshServer *sshServer) revokeClientAuthorizations(sessionID string) {
  1178. sshServer.clientsMutex.Lock()
  1179. client := sshServer.clients[sessionID]
  1180. sshServer.clientsMutex.Unlock()
  1181. if client == nil {
  1182. return
  1183. }
  1184. // sshClient.handshakeState.authorizedAccessTypes is not cleared. Clearing
  1185. // authorizedAccessTypes may cause sshClient.logTunnel to fail to log
  1186. // access types. As the revocation may be due to legitimate use of an
  1187. // authorization in multiple sessions by a single client, useful metrics
  1188. // would be lost.
  1189. client.Lock()
  1190. client.handshakeState.authorizationsRevoked = true
  1191. client.Unlock()
  1192. // Select and apply new traffic rules, as filtered by the client's new
  1193. // authorization state.
  1194. client.setTrafficRules()
  1195. }
  1196. func (sshServer *sshServer) stopClients() {
  1197. sshServer.clientsMutex.Lock()
  1198. sshServer.stoppingClients = true
  1199. clients := sshServer.clients
  1200. sshServer.clients = make(map[string]*sshClient)
  1201. sshServer.clientsMutex.Unlock()
  1202. for _, client := range clients {
  1203. client.stop()
  1204. }
  1205. }
  1206. func (sshServer *sshServer) handleClient(
  1207. sshListener *sshListener,
  1208. conn net.Conn,
  1209. transportData *additionalTransportData) {
  1210. // overrideTunnelProtocol sets the tunnel protocol to a value other than
  1211. // the listener tunnel protocol. This is used in fronted meek
  1212. // configuration, where a single HTTPS listener also handles fronted HTTP
  1213. // and QUIC traffic; and in the protocol demux case.
  1214. tunnelProtocol := sshListener.tunnelProtocol
  1215. if transportData != nil && transportData.overrideTunnelProtocol != "" {
  1216. tunnelProtocol = transportData.overrideTunnelProtocol
  1217. }
  1218. // Calling conn.RemoteAddr at this point, before any Read calls,
  1219. // satisfies the constraint documented in tapdance.Listen.
  1220. peerAddr := conn.RemoteAddr()
  1221. // Check if there were irregularities during the network connection
  1222. // establishment. When present, log and then behave as Obfuscated SSH does
  1223. // when the client fails to provide a valid seed message.
  1224. //
  1225. // One concrete irregular case is failure to send a PROXY protocol header for
  1226. // TAPDANCE-OSSH.
  1227. if indicator, ok := conn.(common.IrregularIndicator); ok {
  1228. tunnelErr := indicator.IrregularTunnelError()
  1229. if tunnelErr != nil {
  1230. logIrregularTunnel(
  1231. sshServer.support,
  1232. sshListener.tunnelProtocol,
  1233. sshListener.port,
  1234. common.IPAddressFromAddr(peerAddr),
  1235. errors.Trace(tunnelErr),
  1236. nil)
  1237. var afterFunc *time.Timer
  1238. if sshServer.support.Config.sshHandshakeTimeout > 0 {
  1239. afterFunc = time.AfterFunc(sshServer.support.Config.sshHandshakeTimeout, func() {
  1240. conn.Close()
  1241. })
  1242. }
  1243. io.Copy(ioutil.Discard, conn)
  1244. conn.Close()
  1245. afterFunc.Stop()
  1246. return
  1247. }
  1248. }
  1249. // Get any packet manipulation values from GetAppliedSpecName as soon as
  1250. // possible due to the expiring TTL.
  1251. //
  1252. // In the case of in-proxy tunnel protocols, the remote address will be
  1253. // the proxy, not the client, and GeoIP targeted packet manipulation will
  1254. // apply to the 2nd hop.
  1255. serverPacketManipulation := ""
  1256. replayedServerPacketManipulation := false
  1257. if sshServer.support.Config.RunPacketManipulator &&
  1258. protocol.TunnelProtocolMayUseServerPacketManipulation(tunnelProtocol) {
  1259. // A meekConn has synthetic address values, including the original client
  1260. // address in cases where the client uses an upstream proxy to connect to
  1261. // Psiphon. For meekConn, and any other conn implementing
  1262. // UnderlyingTCPAddrSource, get the underlying TCP connection addresses.
  1263. //
  1264. // Limitation: a meek tunnel may consist of several TCP connections. The
  1265. // server_packet_manipulation metric will reflect the packet manipulation
  1266. // applied to the _first_ TCP connection only.
  1267. var localAddr, remoteAddr *net.TCPAddr
  1268. var ok bool
  1269. underlying, ok := conn.(common.UnderlyingTCPAddrSource)
  1270. if ok {
  1271. localAddr, remoteAddr, ok = underlying.GetUnderlyingTCPAddrs()
  1272. } else {
  1273. localAddr, ok = conn.LocalAddr().(*net.TCPAddr)
  1274. if ok {
  1275. remoteAddr, ok = conn.RemoteAddr().(*net.TCPAddr)
  1276. }
  1277. }
  1278. if ok {
  1279. specName, extraData, err := sshServer.support.PacketManipulator.
  1280. GetAppliedSpecName(localAddr, remoteAddr)
  1281. if err == nil {
  1282. serverPacketManipulation = specName
  1283. replayedServerPacketManipulation, _ = extraData.(bool)
  1284. }
  1285. }
  1286. }
  1287. // For in-proxy tunnel protocols, accepted client GeoIP reflects the proxy
  1288. // address, not the client.
  1289. peerGeoIPData := sshServer.support.GeoIPService.Lookup(
  1290. common.IPAddressFromAddr(peerAddr))
  1291. sshServer.registerAcceptedClient(tunnelProtocol, peerGeoIPData.Country)
  1292. defer sshServer.unregisterAcceptedClient(tunnelProtocol, peerGeoIPData.Country)
  1293. // When configured, enforce a cap on the number of concurrent SSH
  1294. // handshakes. This limits load spikes on busy servers when many clients
  1295. // attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,
  1296. // to acquire; waiting will avoid immediately creating more load on another
  1297. // server in the network when the client tries a new candidate. Disconnect the
  1298. // client when that wait time is exceeded.
  1299. //
  1300. // This mechanism limits memory allocations and CPU usage associated with the
  1301. // SSH handshake. At this point, new direct TCP connections or new meek
  1302. // connections, with associated resource usage, are already established. Those
  1303. // connections are expected to be rate or load limited using other mechanisms.
  1304. //
  1305. // TODO:
  1306. //
  1307. // - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in
  1308. // sshClient.run, since the client is also applying an SSH handshake timeout
  1309. // and won't exclude time spent waiting.
  1310. // - each call to sshServer.handleClient (in sshServer.runListener) is invoked
  1311. // in its own goroutine, but shutdown doesn't synchronously await these
  1312. // goroutnes. Once this is synchronizes, the following context.WithTimeout
  1313. // should use an sshServer parent context to ensure blocking acquires
  1314. // interrupt immediately upon shutdown.
  1315. var onSSHHandshakeFinished func()
  1316. if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {
  1317. ctx, cancelFunc := context.WithTimeout(
  1318. context.Background(),
  1319. sshServer.support.Config.sshBeginHandshakeTimeout)
  1320. defer cancelFunc()
  1321. err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)
  1322. if err != nil {
  1323. conn.Close()
  1324. // This is a debug log as the only possible error is context timeout.
  1325. log.WithTraceFields(LogFields{"error": err}).Debug(
  1326. "acquire SSH handshake semaphore failed")
  1327. return
  1328. }
  1329. onSSHHandshakeFinished = func() {
  1330. sshServer.concurrentSSHHandshakes.Release(1)
  1331. }
  1332. }
  1333. sshClient := newSshClient(
  1334. sshServer,
  1335. sshListener,
  1336. tunnelProtocol,
  1337. transportData,
  1338. serverPacketManipulation,
  1339. replayedServerPacketManipulation,
  1340. peerAddr,
  1341. peerGeoIPData)
  1342. // sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
  1343. // in any error case; or, as soon as the SSH handshake phase has successfully
  1344. // completed.
  1345. sshClient.run(conn, onSSHHandshakeFinished)
  1346. }
  1347. func (sshServer *sshServer) monitorPortForwardDialError(err error) {
  1348. // "err" is the error returned from a failed TCP or UDP port
  1349. // forward dial. Certain system error codes indicate low resource
  1350. // conditions: insufficient file descriptors, ephemeral ports, or
  1351. // memory. For these cases, log an alert.
  1352. // TODO: also temporarily suspend new clients
  1353. // Note: don't log net.OpError.Error() as the full error string
  1354. // may contain client destination addresses.
  1355. opErr, ok := err.(*net.OpError)
  1356. if ok {
  1357. if opErr.Err == syscall.EADDRNOTAVAIL ||
  1358. opErr.Err == syscall.EAGAIN ||
  1359. opErr.Err == syscall.ENOMEM ||
  1360. opErr.Err == syscall.EMFILE ||
  1361. opErr.Err == syscall.ENFILE {
  1362. log.WithTraceFields(
  1363. LogFields{"error": opErr.Err}).Error(
  1364. "port forward dial failed due to unavailable resource")
  1365. }
  1366. }
  1367. }
  1368. // tunnelProtocolUsesTLSDemux returns true if the server demultiplexes the given
  1369. // protocol and TLS-OSSH over the same port.
  1370. func (sshServer *sshServer) tunnelProtocolUsesTLSDemux(tunnelProtocol string) bool {
  1371. // Only use meek/TLS-OSSH demux if unfronted meek HTTPS with non-legacy
  1372. // passthrough, and not in-proxy.
  1373. if protocol.TunnelProtocolUsesMeekHTTPS(tunnelProtocol) &&
  1374. !protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) &&
  1375. !protocol.TunnelProtocolUsesInproxy(tunnelProtocol) {
  1376. _, passthroughEnabled := sshServer.support.Config.TunnelProtocolPassthroughAddresses[tunnelProtocol]
  1377. return passthroughEnabled && !sshServer.support.Config.LegacyPassthrough
  1378. }
  1379. return false
  1380. }
  1381. // setGeoIPSessionCache adds the sessionID/geoIPData pair to the session
  1382. // cache. This value will not expire; the caller must call
  1383. // markGeoIPSessionCacheToExpire to initiate expiry. Calling
  1384. // setGeoIPSessionCache for an existing sessionID will replace the previous
  1385. // value and reset any expiry.
  1386. func (sshServer *sshServer) setGeoIPSessionCache(sessionID string, geoIPData GeoIPData) {
  1387. sshServer.geoIPSessionCache.Set(sessionID, geoIPData, cache.NoExpiration)
  1388. }
  1389. // markGeoIPSessionCacheToExpire initiates expiry for an existing session
  1390. // cache entry, if the session ID is found in the cache. Concurrency note:
  1391. // setGeoIPSessionCache and markGeoIPSessionCacheToExpire should not be
  1392. // called concurrently for a single session ID.
  1393. func (sshServer *sshServer) markGeoIPSessionCacheToExpire(sessionID string) {
  1394. geoIPData, found := sshServer.geoIPSessionCache.Get(sessionID)
  1395. // Note: potential race condition between Get and Set. In practice,
  1396. // the tunnel server won't clobber a SetSessionCache value by calling
  1397. // MarkSessionCacheToExpire concurrently.
  1398. if found {
  1399. sshServer.geoIPSessionCache.Set(sessionID, geoIPData, cache.DefaultExpiration)
  1400. }
  1401. }
  1402. // getGeoIPSessionCache returns the cached GeoIPData for the specified session
  1403. // ID; a blank GeoIPData is returned if the session ID is not found in the
  1404. // cache.
  1405. func (sshServer *sshServer) getGeoIPSessionCache(sessionID string) GeoIPData {
  1406. geoIPData, found := sshServer.geoIPSessionCache.Get(sessionID)
  1407. if !found {
  1408. return NewGeoIPData()
  1409. }
  1410. return geoIPData.(GeoIPData)
  1411. }
  1412. // inGeoIPSessionCache returns whether the session ID is present in the
  1413. // session cache.
  1414. func (sshServer *sshServer) inGeoIPSessionCache(sessionID string) bool {
  1415. _, found := sshServer.geoIPSessionCache.Get(sessionID)
  1416. return found
  1417. }
// sshClient is the server-side state for a single client connection. It is
// constructed by newSshClient, called from sshServer.handleClient, and run
// via sshClient.run.
//
// The embedded mutex guards mutable fields that multiple code paths access.
// For example:
//   - getClientGeoIPData locks to read clientGeoIPData;
//   - sshServer.revokeClientAuthorizations locks to update handshakeState;
//   - sshServer.getLoadStats locks to read and reset qualityMetrics and to
//     update peakMetrics.
type sshClient struct {
	sync.Mutex
	sshServer *sshServer
	sshListener *sshListener
	// tunnelProtocol is the effective protocol. It may differ from
	// sshListener's protocol when transport data overrides it.
	tunnelProtocol string
	isInproxyTunnelProtocol bool
	additionalTransportData *additionalTransportData
	sshConn ssh.Conn
	throttledConn *common.ThrottledConn
	serverPacketManipulation string
	replayedServerPacketManipulation bool
	peerAddr net.Addr
	// peerGeoIPData is the GeoIP data for peerAddr, set once in
	// newSshClient. For in-proxy tunnel protocols the peer is the proxy,
	// not the client.
	peerGeoIPData GeoIPData
	// clientGeoIPData is the GeoIP data for the original client.
	//   - For in-proxy tunnel protocols, it is unset until the original
	//     client IP is relayed from the broker during the handshake.
	//   - Otherwise, newSshClient sets it to peerGeoIPData.
	// It is not static: use getClientGeoIPData when not already holding
	// the lock.
	clientGeoIPData GeoIPData
	sessionID string
	// isFirstTunnelInSession defaults to true in newSshClient, so that the
	// pre-handshake traffic rules won't apply UnthrottleFirstTunnelOnly.
	isFirstTunnelInSession bool
	supportsServerRequests bool
	handshakeState handshakeState
	udpgwChannelHandler *udpgwPortForwardMultiplexer
	totalUdpgwChannelCount int
	packetTunnelChannel ssh.Channel
	totalPacketTunnelChannelCount int
	trafficRules TrafficRules
	tcpTrafficState trafficState
	udpTrafficState trafficState
	// qualityMetrics accumulates upstream dial and DNS metrics for the
	// current period. It is reported and reset by sshServer.getLoadStats.
	qualityMetrics *qualityMetrics
	tcpPortForwardLRU *common.LRUConns
	oslClientSeedState *osl.ClientSeedState
	// signalIssueSLOKs is created with a buffer of 1 in newSshClient.
	signalIssueSLOKs chan struct{}
	// runCtx and stopRunning are a context.WithCancel pair created in
	// newSshClient.
	runCtx context.Context
	stopRunning context.CancelFunc
	// stopped is closed when sshClient.run returns, at which point the
	// client has fully stopped.
	stopped chan struct{}
	tcpPortForwardDialingAvailableSignal context.CancelFunc
	releaseAuthorizations func()
	stopTimer *time.Timer
	preHandshakeRandomStreamMetrics randomStreamMetrics
	postHandshakeRandomStreamMetrics randomStreamMetrics
	// sendAlertRequests is buffered with ALERT_REQUEST_QUEUE_BUFFER_SIZE.
	sendAlertRequests chan protocol.AlertRequest
	sentAlertRequests map[string]bool
	// peakMetrics holds peak values sampled by sshServer.getLoadStats, to
	// be recorded in server_tunnel.
	peakMetrics peakMetrics
	destinationBytesMetricsASN string
	// Destination byte counters. They are updated with atomic operations
	// and may be updated without holding the sshClient lock.
	tcpDestinationBytesMetrics destinationBytesMetrics
	udpDestinationBytesMetrics destinationBytesMetrics
}
// trafficState holds byte and port forward counters for one traffic type.
// Each sshClient has one for TCP (tcpTrafficState) and one for UDP
// (udpTrafficState).
type trafficState struct {
	bytesUp int64
	bytesDown int64
	// Current and peak number of port forwards that are dialing.
	concurrentDialingPortForwardCount int64
	peakConcurrentDialingPortForwardCount int64
	// Current and peak number of port forwards; the total is
	// presumably cumulative over the client's lifetime (TODO confirm).
	concurrentPortForwardCount int64
	peakConcurrentPortForwardCount int64
	totalPortForwardCount int64
	// availablePortForwardCond is created by newSshClient with its own
	// mutex, separate from the sshClient mutex.
	// NOTE(review): presumably used to wait for port forward capacity;
	// confirm against callers.
	availablePortForwardCond *sync.Cond
}
// randomStreamMetrics holds counters for random streams. Each sshClient keeps
// two instances: preHandshakeRandomStreamMetrics and
// postHandshakeRandomStreamMetrics.
// NOTE(review): field meanings below are inferred from names (requested vs.
// actually transferred bytes); confirm against the recording code.
type randomStreamMetrics struct {
	count int64
	upstreamBytes int64
	receivedUpstreamBytes int64
	downstreamBytes int64
	sentDownstreamBytes int64
}
// peakMetrics records the highest values sampled by sshServer.getLoadStats
// over a client's lifetime, to be recorded in server_tunnel.
//
// Each pointer field is nil until its first sample is taken. Failure rates
// are only sampled once a period's sample size reaches
// Config.peakUpstreamFailureRateMinimumSampleSize.
type peakMetrics struct {
	// Peak count of other concurrently accepted (peer region, which is the
	// proxy for in-proxy protocols) and established (client region) clients
	// in the same region, computed as the region count minus one.
	concurrentProximateAcceptedClients *int64
	concurrentProximateEstablishedClients *int64
	// Peak TCP port forward failure rate, failed / (dialed + failed), and
	// the sample size at which that peak was observed.
	TCPPortForwardFailureRate *float64
	TCPPortForwardFailureRateSampleSize *int64
	// Peak DNS failure rate, failed / (successful + failed), and the sample
	// size at which that peak was observed.
	DNSFailureRate *float64
	DNSFailureRateSampleSize *int64
}
// qualityMetrics records upstream port forward outcomes for the current
// reporting period:
//   - TCP dial attempts and elapsed time, overall and split by IPv4/IPv6;
//   - port forward rejections;
//   - DNS request outcomes.
//
// Elapsed dial time includes the full TCP handshake and, in aggregate, is a
// measure of the quality of the upstream link. These stats are recorded by
// each sshClient, and then reported and reset in sshServer.getLoadStats().
// Durations are reported there in milliseconds.
type qualityMetrics struct {
	// TCP port forward dials, all address families.
	TCPPortForwardDialedCount int64
	TCPPortForwardDialedDuration time.Duration
	TCPPortForwardFailedCount int64
	TCPPortForwardFailedDuration time.Duration
	// Port forwards rejected before dialing.
	TCPPortForwardRejectedDialingLimitCount int64
	TCPPortForwardRejectedDisallowedCount int64
	UDPPortForwardRejectedDisallowedCount int64
	// TCP port forward dials, split by destination address family.
	TCPIPv4PortForwardDialedCount int64
	TCPIPv4PortForwardDialedDuration time.Duration
	TCPIPv4PortForwardFailedCount int64
	TCPIPv4PortForwardFailedDuration time.Duration
	TCPIPv6PortForwardDialedCount int64
	TCPIPv6PortForwardDialedDuration time.Duration
	TCPIPv6PortForwardFailedCount int64
	TCPIPv6PortForwardFailedDuration time.Duration
	// DNS metrics. The maps are allocated by newQualityMetrics and cleared
	// in place by reset. Per getLoadStats, each map has an "ALL" entry.
	// NOTE(review): other keys are presumably resolver IPs; confirm against
	// the recording call sites.
	DNSCount map[string]int64
	DNSDuration map[string]time.Duration
	DNSFailedCount map[string]int64
	DNSFailedDuration map[string]time.Duration
}
  1513. func newQualityMetrics() *qualityMetrics {
  1514. return &qualityMetrics{
  1515. DNSCount: make(map[string]int64),
  1516. DNSDuration: make(map[string]time.Duration),
  1517. DNSFailedCount: make(map[string]int64),
  1518. DNSFailedDuration: make(map[string]time.Duration),
  1519. }
  1520. }
  1521. func (q *qualityMetrics) reset() {
  1522. q.TCPPortForwardDialedCount = 0
  1523. q.TCPPortForwardDialedDuration = 0
  1524. q.TCPPortForwardFailedCount = 0
  1525. q.TCPPortForwardFailedDuration = 0
  1526. q.TCPPortForwardRejectedDialingLimitCount = 0
  1527. q.TCPPortForwardRejectedDisallowedCount = 0
  1528. q.UDPPortForwardRejectedDisallowedCount = 0
  1529. q.TCPIPv4PortForwardDialedCount = 0
  1530. q.TCPIPv4PortForwardDialedDuration = 0
  1531. q.TCPIPv4PortForwardFailedCount = 0
  1532. q.TCPIPv4PortForwardFailedDuration = 0
  1533. q.TCPIPv6PortForwardDialedCount = 0
  1534. q.TCPIPv6PortForwardDialedDuration = 0
  1535. q.TCPIPv6PortForwardFailedCount = 0
  1536. q.TCPIPv6PortForwardFailedDuration = 0
  1537. // Retain existing maps to avoid memory churn. The Go compiler optimizes map
  1538. // clearing operations of the following form.
  1539. for k := range q.DNSCount {
  1540. delete(q.DNSCount, k)
  1541. }
  1542. for k := range q.DNSDuration {
  1543. delete(q.DNSDuration, k)
  1544. }
  1545. for k := range q.DNSFailedCount {
  1546. delete(q.DNSFailedCount, k)
  1547. }
  1548. for k := range q.DNSFailedDuration {
  1549. delete(q.DNSFailedDuration, k)
  1550. }
  1551. }
// handshakeStateInfo carries a subset of values derived from the client's
// API handshake.
// NOTE(review): presumably returned to the handshake API handler after the
// handshake state is set; confirm against callers.
type handshakeStateInfo struct {
	activeAuthorizationIDs []string
	authorizedAccessTypes []string
	// Traffic rate limits, presumably from the selected traffic rules
	// (TODO confirm).
	upstreamBytesPerSecond int64
	downstreamBytesPerSecond int64
	steeringIP string
}
// handshakeState holds values established by the client's API handshake.
// It is stored in sshClient.handshakeState and modified under the sshClient
// lock; see sshServer.revokeClientAuthorizations.
type handshakeState struct {
	completed bool
	apiProtocol string
	apiParams common.APIParameters
	activeAuthorizationIDs []string
	// authorizedAccessTypes is deliberately not cleared when authorizations
	// are revoked, so that sshClient.logTunnel can still log access types.
	authorizedAccessTypes []string
	// authorizationsRevoked is set by sshServer.revokeClientAuthorizations.
	authorizationsRevoked bool
	domainBytesChecksum []byte
	establishedTunnelsCount int
	splitTunnelLookup *splitTunnelLookup
	deviceRegion string
	newTacticsTag string
	// In-proxy fields. NOTE(review): presumably the original client IP,
	// its GeoIP data, and log fields relayed from the broker for in-proxy
	// tunnel protocols; confirm against the handshake handler.
	inproxyClientIP string
	inproxyClientGeoIPData GeoIPData
	inproxyRelayLogFields common.LogFields
}
  1575. type destinationBytesMetrics struct {
  1576. bytesUp int64
  1577. bytesDown int64
  1578. }
  1579. func (d *destinationBytesMetrics) UpdateProgress(
  1580. downstreamBytes, upstreamBytes, _ int64) {
  1581. // Concurrency: UpdateProgress may be called without holding the sshClient
  1582. // lock; all accesses to bytesUp/bytesDown must use atomic operations.
  1583. atomic.AddInt64(&d.bytesUp, upstreamBytes)
  1584. atomic.AddInt64(&d.bytesDown, downstreamBytes)
  1585. }
  1586. func (d *destinationBytesMetrics) getBytesUp() int64 {
  1587. return atomic.LoadInt64(&d.bytesUp)
  1588. }
  1589. func (d *destinationBytesMetrics) getBytesDown() int64 {
  1590. return atomic.LoadInt64(&d.bytesDown)
  1591. }
  1592. type splitTunnelLookup struct {
  1593. regions []string
  1594. regionsLookup map[string]bool
  1595. }
  1596. func newSplitTunnelLookup(
  1597. ownRegion string,
  1598. otherRegions []string) (*splitTunnelLookup, error) {
  1599. length := len(otherRegions)
  1600. if ownRegion != "" {
  1601. length += 1
  1602. }
  1603. // This length check is a sanity check and prevents clients shipping
  1604. // excessively long lists which could impact performance.
  1605. if length > 250 {
  1606. return nil, errors.Tracef("too many regions: %d", length)
  1607. }
  1608. // Create map lookups for lists where the number of values to compare
  1609. // against exceeds a threshold where benchmarks show maps are faster than
  1610. // looping through a slice. Otherwise use a slice for lookups. In both
  1611. // cases, the input slice is no longer referenced.
  1612. if length >= stringLookupThreshold {
  1613. regionsLookup := make(map[string]bool)
  1614. if ownRegion != "" {
  1615. regionsLookup[ownRegion] = true
  1616. }
  1617. for _, region := range otherRegions {
  1618. regionsLookup[region] = true
  1619. }
  1620. return &splitTunnelLookup{
  1621. regionsLookup: regionsLookup,
  1622. }, nil
  1623. } else {
  1624. regions := []string{}
  1625. if ownRegion != "" && !common.Contains(otherRegions, ownRegion) {
  1626. regions = append(regions, ownRegion)
  1627. }
  1628. // TODO: check for other duplicate regions?
  1629. regions = append(regions, otherRegions...)
  1630. return &splitTunnelLookup{
  1631. regions: regions,
  1632. }, nil
  1633. }
  1634. }
  1635. func (lookup *splitTunnelLookup) lookup(region string) bool {
  1636. if lookup.regionsLookup != nil {
  1637. return lookup.regionsLookup[region]
  1638. } else {
  1639. return common.Contains(lookup.regions, region)
  1640. }
  1641. }
  1642. func newSshClient(
  1643. sshServer *sshServer,
  1644. sshListener *sshListener,
  1645. tunnelProtocol string,
  1646. transportData *additionalTransportData,
  1647. serverPacketManipulation string,
  1648. replayedServerPacketManipulation bool,
  1649. peerAddr net.Addr,
  1650. peerGeoIPData GeoIPData) *sshClient {
  1651. runCtx, stopRunning := context.WithCancel(context.Background())
  1652. // isFirstTunnelInSession is defaulted to true so that the pre-handshake
  1653. // traffic rules won't apply UnthrottleFirstTunnelOnly and negate any
  1654. // unthrottled bytes during the initial protocol negotiation.
  1655. client := &sshClient{
  1656. sshServer: sshServer,
  1657. sshListener: sshListener,
  1658. tunnelProtocol: tunnelProtocol,
  1659. isInproxyTunnelProtocol: protocol.TunnelProtocolUsesInproxy(tunnelProtocol),
  1660. additionalTransportData: transportData,
  1661. serverPacketManipulation: serverPacketManipulation,
  1662. replayedServerPacketManipulation: replayedServerPacketManipulation,
  1663. peerAddr: peerAddr,
  1664. isFirstTunnelInSession: true,
  1665. qualityMetrics: newQualityMetrics(),
  1666. tcpPortForwardLRU: common.NewLRUConns(),
  1667. signalIssueSLOKs: make(chan struct{}, 1),
  1668. runCtx: runCtx,
  1669. stopRunning: stopRunning,
  1670. stopped: make(chan struct{}),
  1671. sendAlertRequests: make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),
  1672. sentAlertRequests: make(map[string]bool),
  1673. }
  1674. client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1675. client.udpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1676. // In the case of in-proxy tunnel protocols, clientGeoIPData is not set
  1677. // until the original client IP is relayed from the broker during the
  1678. // handshake. In other cases, clientGeoIPData is the peerGeoIPData
  1679. // (this includes fronted meek).
  1680. client.peerGeoIPData = peerGeoIPData
  1681. if !client.isInproxyTunnelProtocol {
  1682. client.clientGeoIPData = peerGeoIPData
  1683. }
  1684. return client
  1685. }
  1686. // getClientGeoIPData gets sshClient.clientGeoIPData. Use this helper when
  1687. // accessing this field without already holding a lock on the sshClient
  1688. // mutex. Unlike older code and unlike with client.peerGeoIPData,
  1689. // sshClient.clientGeoIPData is not static and may get set during the
  1690. // handshake, and it is not safe to access it without a lock.
  1691. func (sshClient *sshClient) getClientGeoIPData() GeoIPData {
  1692. sshClient.Lock()
  1693. defer sshClient.Unlock()
  1694. return sshClient.clientGeoIPData
  1695. }
  // run drives a client connection from accept to teardown. It does the
  // following, in order:
  //   - wraps baseConn with activity monitoring, optional burst monitoring,
  //     and rate limiting;
  //   - runs the [obfuscated] SSH handshake, bounded by the configured
  //     handshake timeout and by server shutdown;
  //   - registers the client and runs the tunnel;
  //   - once the tunnel ends, logs tunnel metrics and hands OSL and GeoIP
  //     session state back to the server's session caches.
  //
  // onSSHHandshakeFinished, when non-nil, is invoked exactly once. It runs
  // immediately after a successful SSH handshake, or via defer when run
  // returns without completing the handshake.
  //
  // Every early-return path closes conn before returning. sshClient.stopped
  // is closed when run returns.
  func (sshClient *sshClient) run(
  	baseConn net.Conn, onSSHHandshakeFinished func()) {

  	// When run returns, the client has fully stopped, with all SSH state torn
  	// down and no port forwards or API requests in progress.
  	defer close(sshClient.stopped)

  	// onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
  	defer func() {
  		if onSSHHandshakeFinished != nil {
  			onSSHHandshakeFinished()
  		}
  	}()

  	// Set initial traffic rules, pre-handshake, based on currently known info.
  	sshClient.setTrafficRules()

  	conn := baseConn

  	// Wrap the base client connection with an ActivityMonitoredConn, which
  	// terminates the connection if no data is received before the deadline.
  	// This timeout applies for the entire duration of the SSH connection.
  	// Clients must actively use the connection, or send SSH keep alive
  	// requests, to keep it active. Writes are not considered reliable
  	// activity indicators due to buffering.
  	activityConn, err := common.NewActivityMonitoredConn(
  		conn,
  		SSH_CONNECTION_READ_DEADLINE,
  		false,
  		nil)
  	if err != nil {
  		conn.Close()
  		if !isExpectedTunnelIOError(err) {
  			log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  		}
  		return
  	}
  	conn = activityConn

  	// Further wrap the connection with burst monitoring, when enabled.
  	//
  	// Limitations:
  	//
  	// - Burst parameters are fixed for the duration of the tunnel and do not
  	// change after a tactics hot reload.
  	//
  	// - In the case of in-proxy tunnel protocols, the original client IP is
  	// not yet known, and so burst monitoring GeoIP targeting uses the peer
  	// IP, which is the proxy, not the client.
  	var burstConn *common.BurstMonitoredConn

  	p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.peerGeoIPData)
  	if err != nil {
  		// Close the connection, as on every other early-return path;
  		// otherwise nothing in run closes it.
  		conn.Close()
  		log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  			"ServerTacticsParametersCache.Get failed")
  		return
  	}

  	if !p.IsNil() {
  		upstreamTargetBytes := int64(p.Int(parameters.ServerBurstUpstreamTargetBytes))
  		upstreamDeadline := p.Duration(parameters.ServerBurstUpstreamDeadline)
  		downstreamTargetBytes := int64(p.Int(parameters.ServerBurstDownstreamTargetBytes))
  		downstreamDeadline := p.Duration(parameters.ServerBurstDownstreamDeadline)

  		if (upstreamDeadline != 0 && upstreamTargetBytes != 0) ||
  			(downstreamDeadline != 0 && downstreamTargetBytes != 0) {

  			burstConn = common.NewBurstMonitoredConn(
  				conn,
  				true,
  				upstreamTargetBytes, upstreamDeadline,
  				downstreamTargetBytes, downstreamDeadline)
  			conn = burstConn
  		}
  	}

  	// Allow garbage collection.
  	p.Close()

  	// Further wrap the connection in a rate limiting ThrottledConn. The
  	// underlying dialConn is always a stream, even when the network conn
  	// uses UDP.
  	throttledConn := common.NewThrottledConn(conn, true, sshClient.rateLimits())
  	conn = throttledConn

  	// Replay of server-side parameters is set or extended after a new tunnel
  	// meets duration and bytes transferred targets. Set a timer now that expires
  	// shortly after the target duration. When the timer fires, check the time of
  	// last byte read (a read indicating a live connection with the client),
  	// along with total bytes transferred and set or extend replay if the targets
  	// are met.
  	//
  	// Both target checks are conservative: the tunnel may be healthy, but a byte
  	// may not have been read in the last second when the timer fires. Or bytes
  	// may be transferring, but not at the target level. Only clients that meet
  	// the strict targets at the single check time will trigger replay; however,
  	// this replay will impact all clients with similar GeoIP data.
  	//
  	// A deferred function cancels the timer and also increments the replay
  	// failure counter, which will ultimately clear replay parameters, when the
  	// tunnel fails before the API handshake is completed (this includes any
  	// liveness test).
  	//
  	// A tunnel which fails to meet the targets but successfully completes any
  	// liveness test and the API handshake is ignored in terms of replay scoring.
  	//
  	// In the case of in-proxy tunnel protocols, the peer address will be the
  	// proxy, not the client, and GeoIP targeted replay will apply to the 2nd
  	// hop.
  	isReplayCandidate, replayWaitDuration, replayTargetDuration :=
  		sshClient.sshServer.support.ReplayCache.GetReplayTargetDuration(sshClient.peerGeoIPData)

  	if isReplayCandidate {

  		getFragmentorSeed := func() *prng.Seed {
  			fragmentor, ok := baseConn.(common.FragmentorAccessor)
  			if ok {
  				fragmentorSeed, _ := fragmentor.GetReplay()
  				return fragmentorSeed
  			}
  			return nil
  		}

  		setReplayAfterFunc := time.AfterFunc(
  			replayWaitDuration,
  			func() {
  				if activityConn.GetActiveDuration() >= replayTargetDuration {

  					sshClient.Lock()
  					bytesUp := sshClient.tcpTrafficState.bytesUp + sshClient.udpTrafficState.bytesUp
  					bytesDown := sshClient.tcpTrafficState.bytesDown + sshClient.udpTrafficState.bytesDown
  					sshClient.Unlock()

  					sshClient.sshServer.support.ReplayCache.SetReplayParameters(
  						sshClient.tunnelProtocol,
  						sshClient.peerGeoIPData,
  						sshClient.serverPacketManipulation,
  						getFragmentorSeed(),
  						bytesUp,
  						bytesDown)
  				}
  			})

  		defer func() {
  			setReplayAfterFunc.Stop()
  			completed, _ := sshClient.getHandshaked()
  			if !completed {

  				// Count a replay failure case when a tunnel used replay parameters
  				// (excluding OSSH fragmentation, which doesn't use the ReplayCache) and
  				// failed to complete the API handshake.

  				replayedFragmentation := false
  				if sshClient.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  					fragmentor, ok := baseConn.(common.FragmentorAccessor)
  					if ok {
  						_, replayedFragmentation = fragmentor.GetReplay()
  					}
  				}
  				usedReplay := replayedFragmentation || sshClient.replayedServerPacketManipulation

  				if usedReplay {
  					sshClient.sshServer.support.ReplayCache.FailedReplayParameters(
  						sshClient.tunnelProtocol,
  						sshClient.peerGeoIPData,
  						sshClient.serverPacketManipulation,
  						getFragmentorSeed())
  				}
  			}
  		}()
  	}

  	// Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  	// respect shutdownBroadcast and implement a specific handshake timeout.
  	// The timeout is to reclaim network resources in case the handshake takes
  	// too long.

  	type sshNewServerConnResult struct {
  		obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn
  		sshConn           *ssh.ServerConn
  		channels          <-chan ssh.NewChannel
  		requests          <-chan *ssh.Request
  		err               error
  	}

  	// Capacity 2 so that both the timeout timer and the handshake goroutine
  	// can always deliver without blocking, even after run stops receiving.
  	resultChannel := make(chan *sshNewServerConnResult, 2)

  	var sshHandshakeAfterFunc *time.Timer
  	if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {
  		sshHandshakeAfterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
  			resultChannel <- &sshNewServerConnResult{err: std_errors.New("ssh handshake timeout")}
  		})
  	}

  	go func(baseConn, conn net.Conn) {
  		sshServerConfig := &ssh.ServerConfig{
  			PasswordCallback: sshClient.passwordCallback,
  			AuthLogCallback:  sshClient.authLogCallback,
  			ServerVersion:    sshClient.sshServer.support.Config.SSHServerVersion,
  		}
  		sshServerConfig.AddHostKey(sshClient.sshServer.sshHostKey)

  		// err is this goroutine's single handshake error and is reported via
  		// result.err. Inner blocks must assign to it with "=" and must not
  		// redeclare it with ":="; a shadowed copy would silently drop the
  		// failure and let ssh.NewServerConn run on an unusable conn.
  		var err error

  		if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  			// With Encrypt-then-MAC hash algorithms, packet length is
  			// transmitted in plaintext, which aids in traffic analysis;
  			// clients may still send Encrypt-then-MAC algorithms in their
  			// KEX_INIT message, but do not select these algorithms.
  			//
  			// The exception is TUNNEL_PROTOCOL_SSH, which is intended to appear
  			// like SSH on the wire.
  			sshServerConfig.NoEncryptThenMACHash = true

  		} else {
  			// For TUNNEL_PROTOCOL_SSH only, randomize KEX.
  			if sshClient.sshServer.support.Config.ObfuscatedSSHKey != "" {
  				sshServerConfig.KEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  					sshClient.sshServer.support.Config.ObfuscatedSSHKey)
  				if err != nil {
  					err = errors.Trace(err)
  				}
  			}
  		}

  		result := &sshNewServerConnResult{}

  		// Wrap the connection in an SSH deobfuscator when required.

  		if err == nil && protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {

  			// In the case of in-proxy tunnel protocols, the peer address will
  			// be the proxy, not the client, and GeoIP targeted server-side
  			// OSSH tactics, including prefixes, will apply to the 2nd hop.
  			//
  			// It is recommended to set ServerOSSHPrefixSpecs, etc., in default
  			// tactics.

  			p, paramsErr := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.peerGeoIPData)

  			// Log error, but continue. A default prefix spec will be used by the server.
  			if paramsErr != nil {
  				log.WithTraceFields(LogFields{"error": errors.Trace(paramsErr)}).Warning(
  					"ServerTacticsParametersCache.Get failed")
  			}

  			var osshPrefixEnableFragmentor bool
  			var serverOsshPrefixSpecs transforms.Specs
  			var minDelay, maxDelay time.Duration
  			if !p.IsNil() {
  				osshPrefixEnableFragmentor = p.Bool(parameters.OSSHPrefixEnableFragmentor)
  				serverOsshPrefixSpecs = p.ProtocolTransformSpecs(parameters.ServerOSSHPrefixSpecs)
  				minDelay = p.Duration(parameters.OSSHPrefixSplitMinDelay)
  				maxDelay = p.Duration(parameters.OSSHPrefixSplitMaxDelay)
  				// Allow garbage collection.
  				p.Close()
  			}

  			// Note: NewServerObfuscatedSSHConn blocks on network I/O
  			// TODO: ensure this won't block shutdown
  			result.obfuscatedSSHConn, err = obfuscator.NewServerObfuscatedSSHConn(
  				conn,
  				sshClient.sshServer.support.Config.ObfuscatedSSHKey,
  				sshClient.sshServer.obfuscatorSeedHistory,
  				serverOsshPrefixSpecs,
  				func(peerIP string, err error, logFields common.LogFields) {
  					logIrregularTunnel(
  						sshClient.sshServer.support,
  						sshClient.sshListener.tunnelProtocol,
  						sshClient.sshListener.port,
  						peerIP,
  						errors.Trace(err),
  						LogFields(logFields))
  				})

  			if err != nil {
  				err = errors.Trace(err)
  			} else {
  				conn = result.obfuscatedSSHConn
  			}

  			// Set the OSSH prefix split config. Log error, but continue: a
  			// separate local keeps this non-fatal failure out of err.
  			if err == nil && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  				splitErr := result.obfuscatedSSHConn.SetOSSHPrefixSplitConfig(minDelay, maxDelay)
  				if splitErr != nil {
  					log.WithTraceFields(LogFields{"error": errors.Trace(splitErr)}).Warning(
  						"SetOSSHPrefixSplitConfig failed")
  				}
  			}

  			// Seed the fragmentor, when present, with seed derived from initial
  			// obfuscator message. See tactics.Listener.Accept. This must precede
  			// ssh.NewServerConn to ensure fragmentor is seeded before downstream bytes
  			// are written.
  			if err == nil && protocol.TunnelProtocolIsObfuscatedSSH(sshClient.tunnelProtocol) {
  				fragmentor, ok := baseConn.(common.FragmentorAccessor)
  				if ok {
  					var fragmentorPRNG *prng.PRNG
  					fragmentorPRNG, err = result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")
  					if err != nil {
  						err = errors.Trace(err)
  					} else {
  						fragmentor.SetReplay(fragmentorPRNG)
  					}

  					// Stops the fragmentor if disabled for prefixed OSSH streams.
  					if !osshPrefixEnableFragmentor && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  						fragmentor.StopFragmenting()
  					}
  				}
  			}
  		}

  		if err == nil {
  			result.sshConn, result.channels, result.requests, err =
  				ssh.NewServerConn(conn, sshServerConfig)
  			if err != nil {
  				err = errors.Trace(err)
  			}
  		}

  		result.err = err

  		resultChannel <- result

  	}(baseConn, conn)

  	var result *sshNewServerConnResult
  	select {
  	case result = <-resultChannel:
  	case <-sshClient.sshServer.shutdownBroadcast:
  		// The handshake timer is no longer needed.
  		if sshHandshakeAfterFunc != nil {
  			sshHandshakeAfterFunc.Stop()
  		}
  		// Close() will interrupt an ongoing handshake
  		// TODO: wait for SSH handshake goroutines to exit before returning?
  		conn.Close()
  		return
  	}

  	if sshHandshakeAfterFunc != nil {
  		sshHandshakeAfterFunc.Stop()
  	}

  	if result.err != nil {
  		conn.Close()
  		// This is a Debug log due to noise. The handshake often fails due to I/O
  		// errors as clients frequently interrupt connections in progress when
  		// client-side load balancing completes a connection to a different server.
  		log.WithTraceFields(LogFields{"error": result.err}).Debug("SSH handshake failed")
  		return
  	}

  	// The SSH handshake has finished successfully; notify now to allow other
  	// blocked SSH handshakes to proceed.
  	if onSSHHandshakeFinished != nil {
  		onSSHHandshakeFinished()
  	}
  	onSSHHandshakeFinished = nil

  	sshClient.Lock()
  	sshClient.sshConn = result.sshConn
  	sshClient.throttledConn = throttledConn
  	sshClient.Unlock()

  	if !sshClient.sshServer.registerEstablishedClient(sshClient) {
  		conn.Close()
  		log.WithTrace().Warning("register failed")
  		return
  	}

  	sshClient.runTunnel(result.channels, result.requests)

  	// Note: sshServer.unregisterEstablishedClient calls sshClient.stop(),
  	// which also closes underlying transport Conn.

  	sshClient.sshServer.unregisterEstablishedClient(sshClient)

  	// Log tunnel metrics.

  	var additionalMetrics []LogFields

  	// Add activity and burst metrics.
  	//
  	// The reported duration is based on last confirmed data transfer, which for
  	// sshClient.activityConn.GetActiveDuration() is time of last read byte and
  	// not conn close time. This is important for protocols such as meek. For
  	// meek, the connection remains open until the HTTP session expires, which
  	// may be some time after the tunnel has closed. (The meek protocol has no
  	// allowance for signalling payload EOF, and even if it did the client may
  	// not have the opportunity to send a final request with an EOF flag set.)

  	activityMetrics := make(LogFields)
  	activityMetrics["start_time"] = activityConn.GetStartTime()
  	activityMetrics["duration"] = int64(activityConn.GetActiveDuration() / time.Millisecond)
  	additionalMetrics = append(additionalMetrics, activityMetrics)

  	if burstConn != nil {
  		// Any outstanding burst should be recorded by burstConn.Close which should
  		// be called by unregisterEstablishedClient.
  		additionalMetrics = append(
  			additionalMetrics, LogFields(burstConn.GetMetrics(activityConn.GetStartTime())))
  	}

  	// Some conns report additional metrics. Meek conns report resiliency
  	// metrics and fragmentor.Conns report fragmentor configs.

  	if metricsSource, ok := baseConn.(common.MetricsSource); ok {
  		additionalMetrics = append(
  			additionalMetrics, LogFields(metricsSource.GetMetrics()))
  	}
  	if result.obfuscatedSSHConn != nil {
  		additionalMetrics = append(
  			additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
  	}

  	// Add server-replay metrics.

  	replayMetrics := make(LogFields)
  	replayedFragmentation := false
  	fragmentor, ok := baseConn.(common.FragmentorAccessor)
  	if ok {
  		_, replayedFragmentation = fragmentor.GetReplay()
  	}
  	replayMetrics["server_replay_fragmentation"] = replayedFragmentation
  	replayMetrics["server_replay_packet_manipulation"] = sshClient.replayedServerPacketManipulation
  	additionalMetrics = append(additionalMetrics, replayMetrics)

  	// Limitation: there's only one log per tunnel with bytes transferred
  	// metrics, so the byte count can't be attributed to a certain day for
  	// tunnels that remain connected for well over 24h. In practise, most
  	// tunnels are short-lived, especially on mobile devices.

  	sshClient.logTunnel(additionalMetrics)

  	// Transfer OSL seed state -- the OSL progress -- from the closing
  	// client to the session cache so the client can resume its progress
  	// if it reconnects to this same server.
  	// Note: following setOSLConfig order of locking.

  	sshClient.Lock()
  	if sshClient.oslClientSeedState != nil {
  		sshClient.sshServer.oslSessionCacheMutex.Lock()
  		sshClient.oslClientSeedState.Hibernate()
  		sshClient.sshServer.oslSessionCache.Set(
  			sshClient.sessionID, sshClient.oslClientSeedState, cache.DefaultExpiration)
  		sshClient.sshServer.oslSessionCacheMutex.Unlock()
  		sshClient.oslClientSeedState = nil
  	}
  	sshClient.Unlock()

  	// Set the GeoIP session cache to expire; up to this point, the entry for
  	// this session ID has no expiry; retaining entries after the tunnel
  	// disconnects supports first-tunnel-in-session and duplicate
  	// authorization logic.
  	sshClient.sshServer.markGeoIPSessionCacheToExpire(sshClient.sessionID)
  }
  // passwordCallback is the ssh.ServerConfig password handler.
  //
  // It accepts either of two payload formats:
  //   - a JSON-encoded protocol.SSHPasswordPayload;
  //   - for legacy clients, the hex-encoded session ID immediately followed
  //     by the SSH password.
  //
  // It rejects the payload when the session ID is not hex of the expected
  // length, or when the SSH user name or password does not match the server
  // config. On success it records the session ID, the first-tunnel-in-session
  // flag and the server-request capability on sshClient. It then seeds the
  // GeoIP session cache for the session ID with peerGeoIPData.
  func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  	sessionIDHexLength := 2 * protocol.PSIPHON_API_CLIENT_SESSION_ID_LENGTH
  	passwordHexLength := 2 * SSH_PASSWORD_BYTE_LENGTH

  	var payload protocol.SSHPasswordPayload
  	if json.Unmarshal(password, &payload) != nil {
  		// Not JSON: fall back to the legacy "session ID || password" layout.
  		// Clients that send no session ID at all are not supported.
  		if len(password) != sessionIDHexLength+passwordHexLength {
  			return nil, errors.Tracef("invalid password payload for %q", conn.User())
  		}
  		payload.SessionId = string(password[0:sessionIDHexLength])
  		payload.SshPassword = string(password[sessionIDHexLength:])
  	}

  	sessionID := payload.SessionId
  	if !isHexDigits(sshClient.sshServer.support.Config, sessionID) ||
  		len(sessionID) != sessionIDHexLength {
  		return nil, errors.Tracef("invalid session ID for %q", conn.User())
  	}

  	// Both credential comparisons are constant-time and always evaluated.
  	userMatches := subtle.ConstantTimeCompare(
  		[]byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1
  	passwordMatches := subtle.ConstantTimeCompare(
  		[]byte(payload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1
  	if !(userMatches && passwordMatches) {
  		return nil, errors.Tracef("invalid password for %q", conn.User())
  	}

  	// A GeoIP session cache entry exists only when an earlier tunnel used
  	// this session ID (entries are retained up to GEOIP_SESSION_CACHE_TTL).
  	isFirstTunnelInSession := !sshClient.sshServer.inGeoIPSessionCache(sessionID)

  	supportsServerRequests := common.Contains(
  		payload.ClientCapabilities, protocol.CLIENT_CAPABILITY_SERVER_REQUESTS)

  	// From here on these fields are treated as read-only and are read
  	// elsewhere without taking sshClient.Lock.
  	sshClient.Lock()
  	sshClient.sessionID = sessionID
  	sshClient.isFirstTunnelInSession = isFirstTunnelInSession
  	sshClient.supportsServerRequests = supportsServerRequests
  	sshClient.Unlock()

  	// For in-proxy tunnel protocols this first caches the proxy's GeoIPData;
  	// setHandshakeState later replaces it with the client's GeoIPData.
  	sshClient.sshServer.setGeoIPSessionCache(sessionID, sshClient.peerGeoIPData)

  	return nil, nil
  }
  // authLogCallback is the ssh.ServerConfig authentication log hook.
  //
  // Successes are logged at Debug level. The "none" method probe that
  // precedes real authentication is ignored. Other failures are counted in
  // sshServer.authFailedCount and logged at Debug level. At most once per
  // SSH_AUTH_LOG_PERIOD, a Warning summary reports the latest error and the
  // number of failures since the previous summary.
  func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  	if err == nil {
  		log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication success")
  		return
  	}

  	// Auth negotiation noise: the initial "none" attempt always fails this way.
  	if method == "none" && err.Error() == "ssh: no auth passed yet" {
  		return
  	}

  	// Failures are no longer logged for fail2ban; the complexity outweighs
  	// the benefit:
  	//
  	// - The SSH credential is not secret; it is in the server entry, so
  	// targeted attackers likely already have it. Random scanning and brute
  	// forcing are mitigated by high-entropy passwords, host rate limiting
  	// (iptables), and the limited capabilities of a session (port forwarding
  	// only).
  	//
  	// - fail2ban coverage was inconsistent. For unfronted meek through an
  	// upstream proxy, the remote address is that proxy, which must not be
  	// blocked. X-Forwarded-For can be forged to get arbitrary IPs blocked,
  	// and iptables rules can only match the direct remote IP, not the
  	// original client IP. Fronted meek has the same iptables issue.
  	//
  	// Scanning of port 22 still produces log noise, so individual failures are
  	// logged only at Debug level. A periodic summary keeps a record in case
  	// the activity matters, e.g., for a performance investigation.
  	server := sshClient.sshServer
  	atomic.AddInt64(&server.authFailedCount, 1)

  	previousLog := atomic.LoadInt64(&server.lastAuthLog)
  	if monotime.Since(monotime.Time(previousLog)) > SSH_AUTH_LOG_PERIOD {
  		now := int64(monotime.Now())
  		// Only the caller that wins the CAS emits the summary.
  		if atomic.CompareAndSwapInt64(&server.lastAuthLog, previousLog, now) {
  			count := atomic.SwapInt64(&server.authFailedCount, 0)
  			log.WithTraceFields(
  				LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
  		}
  	}

  	log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication failed")
  }
  2169. // stop signals the ssh connection to shutdown. After sshConn.Wait returns,
  2170. // the SSH connection has terminated but sshClient.run may still be running and
  2171. // in the process of exiting.
  2172. //
  2173. // The shutdown process must complete rapidly and not, e.g., block on network
  2174. // I/O, as newly connecting clients need to await stop completion of any
  2175. // existing connection that shares the same session ID.
  2176. func (sshClient *sshClient) stop() {
  2177. sshClient.sshConn.Close()
  2178. sshClient.sshConn.Wait()
  2179. }
  2180. // awaitStopped will block until sshClient.run has exited, at which point all
  2181. // worker goroutines associated with the sshClient, including any in-flight
  2182. // API handlers, will have exited.
  2183. func (sshClient *sshClient) awaitStopped() {
  2184. <-sshClient.stopped
  2185. }
  // runTunnel starts the client's worker goroutines and dispatches each new
  // SSH channel to the matching handler. The workers are:
  //   - the SSH API request handler;
  //   - the OSL and alert senders, when the client supports server requests;
  //   - the TCP port forward manager.
  //
  // It returns after the client's channels stream closes (on client
  // disconnect or sshClient.stop()). Before returning, it stops all workers,
  // waits for them to exit and cleans up authorizations.
  func (sshClient *sshClient) runTunnel(
  	channels <-chan ssh.NewChannel,
  	requests <-chan *ssh.Request) {

  	workers := new(sync.WaitGroup)

  	// spawn runs task on a new goroutine tracked by workers.
  	spawn := func(task func()) {
  		workers.Add(1)
  		go func() {
  			defer workers.Done()
  			task()
  		}()
  	}

  	// Client SSH API request handler.
  	spawn(func() { sshClient.handleSSHRequests(requests) })

  	// Server-initiated request senders.
  	if sshClient.supportsServerRequests {
  		spawn(func() { sshClient.runOSLSender() })
  		spawn(func() { sshClient.runAlertSender() })
  	}

  	// TCP port forward manager. The queue size comes from the traffic rules
  	// (MaxTCPPortForwardCount + MaxTCPDialingPortForwardCount), a reasonable
  	// indication of per-client resource limits; a default is used when unset.
  	// Limitation: the size is fixed for this client and is not updated when
  	// traffic rules are reloaded.
  	queueSize := sshClient.getTCPPortForwardQueueSize()
  	if queueSize == 0 {
  		queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
  	}
  	newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
  	spawn(func() { sshClient.handleTCPPortForwards(workers, newTCPPortForwards) })

  	// Dispatch new channel (port forward) requests from the client.
  	for newChannel := range channels {
  		channelType := newChannel.ChannelType()
  		switch channelType {
  		case protocol.RANDOM_STREAM_CHANNEL_TYPE:
  			sshClient.handleNewRandomStreamChannel(workers, newChannel)

  		case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
  			sshClient.handleNewPacketTunnelChannel(workers, newChannel)

  		case protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  			// Like "direct-tcpip", but split tunnel rejections are never
  			// applied, even when the client has enabled split tunnel. This lets
  			// the client force tunneling for specific cases.
  			sshClient.handleNewTCPPortForwardChannel(workers, newChannel, false, newTCPPortForwards)

  		case "direct-tcpip":
  			sshClient.handleNewTCPPortForwardChannel(workers, newChannel, true, newTCPPortForwards)

  		default:
  			sshClient.rejectNewChannel(newChannel,
  				fmt.Sprintf("unknown or unsupported channel type: %s", channelType))
  		}
  	}

  	// The channels stream ended: the client disconnected or sshClient.stop()
  	// was called. Shut down the port forward manager, then all other workers.
  	close(newTCPPortForwards)

  	sshClient.stopRunning()

  	if sshClient.sshServer.support.Config.RunPacketTunnel {
  		// Stops packet tunnel workers for this session.
  		sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
  			sshClient.sessionID)
  	}

  	workers.Wait()

  	sshClient.cleanupAuthorizations()
  }
  2263. func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {
  2264. for request := range requests {
  2265. // Requests are processed serially; API responses must be sent in request order.
  2266. var responsePayload []byte
  2267. var err error
  2268. if request.Type == "keepalive@openssh.com" {
  2269. // SSH keep alive round trips are used as speed test samples.
  2270. responsePayload, err = tactics.MakeSpeedTestResponse(
  2271. SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  2272. } else {
  2273. // All other requests are assumed to be API requests.
  2274. responsePayload, err = sshAPIRequestHandler(
  2275. sshClient.sshServer.support,
  2276. sshClient,
  2277. request.Type,
  2278. request.Payload)
  2279. }
  2280. if err == nil {
  2281. err = request.Reply(true, responsePayload)
  2282. } else {
  2283. log.WithTraceFields(LogFields{"error": err}).Warning("request failed")
  2284. err = request.Reply(false, nil)
  2285. }
  2286. if err != nil {
  2287. if !isExpectedTunnelIOError(err) {
  2288. log.WithTraceFields(LogFields{"error": err}).Warning("response failed")
  2289. }
  2290. }
  2291. }
  2292. }
  // newTCPPortForward is a queued TCP port forward request. Items are sent on
  // the newTCPPortForwards channel created in runTunnel and consumed by
  // handleTCPPortForwards.
  type newTCPPortForward struct {
  	// enqueueTime is when the request was queued. handleTCPPortForwards
  	// deducts time.Since(enqueueTime) from the dial timeout.
  	enqueueTime time.Time

  	// hostToConnect and portToConnect are presumably the requested forward
  	// target (NOTE(review): their use is not visible here; confirm in
  	// handleTCPChannel).
  	hostToConnect string
  	portToConnect int

  	// doSplitTunnel presumably records whether split tunnel rules may apply.
  	// NOTE(review): runTunnel passes true for "direct-tcpip" and false for
  	// TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE to handleNewTCPPortForwardChannel;
  	// confirm that this is the value stored here.
  	doSplitTunnel bool

  	// newChannel is the client's pending SSH channel. It is rejected via
  	// rejectNewChannel if the request times out in the queue.
  	newChannel ssh.NewChannel
  }
  2300. func (sshClient *sshClient) handleTCPPortForwards(
  2301. waitGroup *sync.WaitGroup,
  2302. newTCPPortForwards chan *newTCPPortForward) {
  2303. // Lifecycle of a TCP port forward:
  2304. //
  2305. // 1. A "direct-tcpip" SSH request is received from the client.
  2306. //
  2307. // A new TCP port forward request is enqueued. The queue delivers TCP port
  2308. // forward requests to the TCP port forward manager, which enforces the TCP
  2309. // port forward dial limit.
  2310. //
  2311. // Enqueuing new requests allows for reading further SSH requests from the
  2312. // client without blocking when the dial limit is hit; this is to permit new
  2313. // UDP/udpgw port forwards to be restablished without delay. The maximum size
  2314. // of the queue enforces a hard cap on resources consumed by a client in the
  2315. // pre-dial phase. When the queue is full, new TCP port forwards are
  2316. // immediately rejected.
  2317. //
  2318. // 2. The TCP port forward manager dequeues the request.
  2319. //
  2320. // The manager calls dialingTCPPortForward(), which increments
  2321. // concurrentDialingPortForwardCount, and calls
  2322. // isTCPDialingPortForwardLimitExceeded() to check the concurrent dialing
  2323. // count.
  2324. //
  2325. // The manager enforces the concurrent TCP dial limit: when at the limit, the
  2326. // manager blocks waiting for the number of dials to drop below the limit before
  2327. // dispatching the request to handleTCPChannel(), which will run in its own
  2328. // goroutine and will dial and relay the port forward.
  2329. //
  2330. // The block delays the current request and also halts dequeuing of subsequent
  2331. // requests and could ultimately cause requests to be immediately rejected if
  2332. // the queue fills. These actions are intended to apply back pressure when
  2333. // upstream network resources are impaired.
  2334. //
  2335. // The time spent in the queue is deducted from the port forward's dial timeout.
  2336. // The time spent blocking while at the dial limit is similarly deducted from
  2337. // the dial timeout. If the dial timeout has expired before the dial begins, the
  2338. // port forward is rejected and a stat is recorded.
  2339. //
  2340. // 3. handleTCPChannel() performs the port forward dial and relaying.
  2341. //
  2342. // a. Dial the target, using the dial timeout remaining after queue and blocking
  2343. // time is deducted.
  2344. //
  2345. // b. If the dial fails, call abortedTCPPortForward() to decrement
  2346. // concurrentDialingPortForwardCount, freeing up a dial slot.
  2347. //
  2348. // c. If the dial succeeds, call establishedPortForward(), which decrements
  2349. // concurrentDialingPortForwardCount and increments concurrentPortForwardCount,
  2350. // the "established" port forward count.
  2351. //
  2352. // d. Check isPortForwardLimitExceeded(), which enforces the configurable limit on
  2353. // concurrentPortForwardCount, the number of _established_ TCP port forwards.
  2354. // If the limit is exceeded, the LRU established TCP port forward is closed and
  2355. // the newly established TCP port forward proceeds. This LRU logic allows some
  2356. // dangling resource consumption (e.g., TIME_WAIT) while providing a better
  2357. // experience for clients.
  2358. //
  2359. // e. Relay data.
  2360. //
  2361. // f. Call closedPortForward() which decrements concurrentPortForwardCount and
  2362. // records bytes transferred.
  2363. for newPortForward := range newTCPPortForwards {
  2364. remainingDialTimeout :=
  2365. time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
  2366. time.Since(newPortForward.enqueueTime)
  2367. if remainingDialTimeout <= 0 {
  2368. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2369. sshClient.rejectNewChannel(
  2370. newPortForward.newChannel, "TCP port forward timed out in queue")
  2371. continue
  2372. }
  2373. // Reserve a TCP dialing slot.
  2374. //
  2375. // TOCTOU note: important to increment counts _before_ checking limits; otherwise,
  2376. // the client could potentially consume excess resources by initiating many port
  2377. // forwards concurrently.
  2378. sshClient.dialingTCPPortForward()
  2379. // When max dials are in progress, wait up to remainingDialTimeout for dialing
  2380. // to become available. This blocks all dequeing.
  2381. if sshClient.isTCPDialingPortForwardLimitExceeded() {
  2382. blockStartTime := time.Now()
  2383. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  2384. sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
  2385. <-ctx.Done()
  2386. sshClient.setTCPPortForwardDialingAvailableSignal(nil)
  2387. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  2388. remainingDialTimeout -= time.Since(blockStartTime)
  2389. }
  2390. if remainingDialTimeout <= 0 {
  2391. // Release the dialing slot here since handleTCPChannel() won't be called.
  2392. sshClient.abortedTCPPortForward()
  2393. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2394. sshClient.rejectNewChannel(
  2395. newPortForward.newChannel, "TCP port forward timed out before dialing")
  2396. continue
  2397. }
  2398. // Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
  2399. // handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
  2400. // will deal with remainingDialTimeout <= 0.
  2401. waitGroup.Add(1)
  2402. go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
  2403. defer waitGroup.Done()
  2404. sshClient.handleTCPChannel(
  2405. remainingDialTimeout,
  2406. newPortForward.hostToConnect,
  2407. newPortForward.portToConnect,
  2408. newPortForward.doSplitTunnel,
  2409. newPortForward.newChannel)
  2410. }(remainingDialTimeout, newPortForward)
  2411. }
  2412. }
  2413. func (sshClient *sshClient) handleNewRandomStreamChannel(
  2414. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2415. // A random stream channel returns the requested number of bytes -- random
  2416. // bytes -- to the client while also consuming and discarding bytes sent
  2417. // by the client.
  2418. //
  2419. // One use case for the random stream channel is a liveness test that the
  2420. // client performs to confirm that the tunnel is live. As the liveness
  2421. // test is performed in the concurrent establishment phase, before
  2422. // selecting a single candidate for handshake, the random stream channel
  2423. // is available pre-handshake, albeit with additional restrictions.
  2424. //
  2425. // The random stream is subject to throttling in traffic rules; for
  2426. // unthrottled liveness tests, set EstablishmentRead/WriteBytesPerSecond as
  2427. // required. The random stream maximum count and response size cap mitigate
  2428. // clients abusing the facility to waste server resources.
  2429. //
  2430. // Like all other channels, this channel type is handled asynchronously,
  2431. // so it's possible to run at any point in the tunnel lifecycle.
  2432. //
  2433. // Up/downstream byte counts don't include SSH packet and request
  2434. // marshalling overhead.
  2435. var request protocol.RandomStreamRequest
  2436. err := json.Unmarshal(newChannel.ExtraData(), &request)
  2437. if err != nil {
  2438. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
  2439. return
  2440. }
  2441. if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2442. sshClient.rejectNewChannel(newChannel,
  2443. fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
  2444. return
  2445. }
  2446. if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2447. sshClient.rejectNewChannel(newChannel,
  2448. fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
  2449. return
  2450. }
  2451. var metrics *randomStreamMetrics
  2452. sshClient.Lock()
  2453. if !sshClient.handshakeState.completed {
  2454. metrics = &sshClient.preHandshakeRandomStreamMetrics
  2455. } else {
  2456. metrics = &sshClient.postHandshakeRandomStreamMetrics
  2457. }
  2458. countOk := true
  2459. if !sshClient.handshakeState.completed &&
  2460. metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
  2461. countOk = false
  2462. } else {
  2463. metrics.count++
  2464. }
  2465. sshClient.Unlock()
  2466. if !countOk {
  2467. sshClient.rejectNewChannel(newChannel, "max count exceeded")
  2468. return
  2469. }
  2470. channel, requests, err := newChannel.Accept()
  2471. if err != nil {
  2472. if !isExpectedTunnelIOError(err) {
  2473. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2474. }
  2475. return
  2476. }
  2477. go ssh.DiscardRequests(requests)
  2478. waitGroup.Add(1)
  2479. go func() {
  2480. defer waitGroup.Done()
  2481. upstream := new(sync.WaitGroup)
  2482. received := 0
  2483. sent := 0
  2484. if request.UpstreamBytes > 0 {
  2485. // Process streams concurrently to minimize elapsed time. This also
  2486. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  2487. upstream.Add(1)
  2488. go func() {
  2489. defer upstream.Done()
  2490. n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
  2491. received = int(n)
  2492. if err != nil {
  2493. if !isExpectedTunnelIOError(err) {
  2494. log.WithTraceFields(LogFields{"error": err}).Warning("receive failed")
  2495. }
  2496. }
  2497. }()
  2498. }
  2499. if request.DownstreamBytes > 0 {
  2500. n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
  2501. sent = int(n)
  2502. if err != nil {
  2503. if !isExpectedTunnelIOError(err) {
  2504. log.WithTraceFields(LogFields{"error": err}).Warning("send failed")
  2505. }
  2506. }
  2507. }
  2508. upstream.Wait()
  2509. sshClient.Lock()
  2510. metrics.upstreamBytes += int64(request.UpstreamBytes)
  2511. metrics.receivedUpstreamBytes += int64(received)
  2512. metrics.downstreamBytes += int64(request.DownstreamBytes)
  2513. metrics.sentDownstreamBytes += int64(sent)
  2514. sshClient.Unlock()
  2515. channel.Close()
  2516. }()
  2517. }
  2518. func (sshClient *sshClient) handleNewPacketTunnelChannel(
  2519. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2520. // packet tunnel channels are handled by the packet tunnel server
  2521. // component. Each client may have at most one packet tunnel channel.
  2522. if !sshClient.sshServer.support.Config.RunPacketTunnel {
  2523. sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
  2524. return
  2525. }
  2526. // Accept this channel immediately. This channel will replace any
  2527. // previously existing packet tunnel channel for this client.
  2528. packetTunnelChannel, requests, err := newChannel.Accept()
  2529. if err != nil {
  2530. if !isExpectedTunnelIOError(err) {
  2531. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2532. }
  2533. return
  2534. }
  2535. go ssh.DiscardRequests(requests)
  2536. sshClient.setPacketTunnelChannel(packetTunnelChannel)
  2537. // PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
  2538. // will stop packet tunnel workers for any previous packet tunnel channel.
  2539. checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2540. return sshClient.isPortForwardPermitted(portForwardTypeTCP, upstreamIPAddress, port)
  2541. }
  2542. checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2543. return sshClient.isPortForwardPermitted(portForwardTypeUDP, upstreamIPAddress, port)
  2544. }
  2545. checkAllowedDomainFunc := func(domain string) bool {
  2546. ok, _ := sshClient.isDomainPermitted(domain)
  2547. return ok
  2548. }
  2549. flowActivityUpdaterMaker := func(
  2550. isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
  2551. trafficType := portForwardTypeTCP
  2552. if !isTCP {
  2553. trafficType = portForwardTypeUDP
  2554. }
  2555. activityUpdaters := sshClient.getActivityUpdaters(trafficType, upstreamIPAddress)
  2556. flowUpdaters := make([]tun.FlowActivityUpdater, len(activityUpdaters))
  2557. for i, activityUpdater := range activityUpdaters {
  2558. flowUpdaters[i] = activityUpdater
  2559. }
  2560. return flowUpdaters
  2561. }
  2562. metricUpdater := func(
  2563. TCPApplicationBytesDown, TCPApplicationBytesUp,
  2564. UDPApplicationBytesDown, UDPApplicationBytesUp int64) {
  2565. sshClient.Lock()
  2566. sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
  2567. sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
  2568. sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
  2569. sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
  2570. sshClient.Unlock()
  2571. }
  2572. dnsQualityReporter := sshClient.updateQualityMetricsWithDNSResult
  2573. err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
  2574. sshClient.sessionID,
  2575. packetTunnelChannel,
  2576. checkAllowedTCPPortFunc,
  2577. checkAllowedUDPPortFunc,
  2578. checkAllowedDomainFunc,
  2579. flowActivityUpdaterMaker,
  2580. metricUpdater,
  2581. dnsQualityReporter)
  2582. if err != nil {
  2583. log.WithTraceFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
  2584. sshClient.setPacketTunnelChannel(nil)
  2585. }
  2586. }
  2587. func (sshClient *sshClient) handleNewTCPPortForwardChannel(
  2588. waitGroup *sync.WaitGroup,
  2589. newChannel ssh.NewChannel,
  2590. allowSplitTunnel bool,
  2591. newTCPPortForwards chan *newTCPPortForward) {
  2592. // udpgw client connections are dispatched immediately (clients use this for
  2593. // DNS, so it's essential to not block; and only one udpgw connection is
  2594. // retained at a time).
  2595. //
  2596. // All other TCP port forwards are dispatched via the TCP port forward
  2597. // manager queue.
  2598. // http://tools.ietf.org/html/rfc4254#section-7.2
  2599. var directTcpipExtraData struct {
  2600. HostToConnect string
  2601. PortToConnect uint32
  2602. OriginatorIPAddress string
  2603. OriginatorPort uint32
  2604. }
  2605. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  2606. if err != nil {
  2607. sshClient.rejectNewChannel(newChannel, "invalid extra data")
  2608. return
  2609. }
  2610. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  2611. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  2612. isUdpgwChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
  2613. sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
  2614. net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
  2615. if isUdpgwChannel {
  2616. // Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
  2617. // own worker goroutine.
  2618. waitGroup.Add(1)
  2619. go func(channel ssh.NewChannel) {
  2620. defer waitGroup.Done()
  2621. sshClient.handleUdpgwChannel(channel)
  2622. }(newChannel)
  2623. } else {
  2624. // Dispatch via TCP port forward manager. When the queue is full, the channel
  2625. // is immediately rejected.
  2626. //
  2627. // Split tunnel logic is enabled for this TCP port forward when the client
  2628. // has enabled split tunnel mode and the channel type allows it.
  2629. doSplitTunnel := sshClient.handshakeState.splitTunnelLookup != nil && allowSplitTunnel
  2630. tcpPortForward := &newTCPPortForward{
  2631. enqueueTime: time.Now(),
  2632. hostToConnect: directTcpipExtraData.HostToConnect,
  2633. portToConnect: int(directTcpipExtraData.PortToConnect),
  2634. doSplitTunnel: doSplitTunnel,
  2635. newChannel: newChannel,
  2636. }
  2637. select {
  2638. case newTCPPortForwards <- tcpPortForward:
  2639. default:
  2640. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2641. sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
  2642. }
  2643. }
  2644. }
  2645. func (sshClient *sshClient) cleanupAuthorizations() {
  2646. sshClient.Lock()
  2647. if sshClient.releaseAuthorizations != nil {
  2648. sshClient.releaseAuthorizations()
  2649. }
  2650. if sshClient.stopTimer != nil {
  2651. sshClient.stopTimer.Stop()
  2652. }
  2653. sshClient.Unlock()
  2654. }
  2655. // setPacketTunnelChannel sets the single packet tunnel channel
  2656. // for this sshClient. Any existing packet tunnel channel is
  2657. // closed.
  2658. func (sshClient *sshClient) setPacketTunnelChannel(channel ssh.Channel) {
  2659. sshClient.Lock()
  2660. if sshClient.packetTunnelChannel != nil {
  2661. sshClient.packetTunnelChannel.Close()
  2662. }
  2663. sshClient.packetTunnelChannel = channel
  2664. sshClient.totalPacketTunnelChannelCount += 1
  2665. sshClient.Unlock()
  2666. }
  2667. // setUdpgwChannelHandler sets the single udpgw channel handler for this
  2668. // sshClient. Each sshClient may have only one concurrent udpgw
  2669. // channel/handler. Each udpgw channel multiplexes many UDP port forwards via
  2670. // the udpgw protocol. Any existing udpgw channel/handler is closed.
  2671. func (sshClient *sshClient) setUdpgwChannelHandler(udpgwChannelHandler *udpgwPortForwardMultiplexer) bool {
  2672. sshClient.Lock()
  2673. if sshClient.udpgwChannelHandler != nil {
  2674. previousHandler := sshClient.udpgwChannelHandler
  2675. sshClient.udpgwChannelHandler = nil
  2676. // stop must be run without holding the sshClient mutex lock, as the
  2677. // udpgw goroutines may attempt to lock the same mutex. For example,
  2678. // udpgwPortForwardMultiplexer.run calls sshClient.establishedPortForward
  2679. // which calls sshClient.allocatePortForward.
  2680. sshClient.Unlock()
  2681. previousHandler.stop()
  2682. sshClient.Lock()
  2683. // In case some other channel has set the sshClient.udpgwChannelHandler
  2684. // in the meantime, fail. The caller should discard this channel/handler.
  2685. if sshClient.udpgwChannelHandler != nil {
  2686. sshClient.Unlock()
  2687. return false
  2688. }
  2689. }
  2690. sshClient.udpgwChannelHandler = udpgwChannelHandler
  2691. sshClient.totalUdpgwChannelCount += 1
  2692. sshClient.Unlock()
  2693. return true
  2694. }
// serverTunnelStatParams is the request parameter spec that logTunnel passes
// to getRequestLogFields for the "server_tunnel" log. It consists of the
// optional "last_connected" and "establishment_duration" parameters,
// followed by baseAndDialParams.
var serverTunnelStatParams = append(
	[]requestParamSpec{
		{"last_connected", isLastConnected, requestParamOptional},
		{"establishment_duration", isIntString, requestParamOptional}},
	baseAndDialParams...)
  2700. func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
  2701. sshClient.Lock()
  2702. // For in-proxy tunnel protocols, two sets of GeoIP fields are logged, one
  2703. // for the client and one for the proxy. The client GeoIP fields will
  2704. // be "None" if handshake did not complete.
  2705. logFields := getRequestLogFields(
  2706. "server_tunnel",
  2707. sshClient.sessionID,
  2708. sshClient.clientGeoIPData,
  2709. sshClient.handshakeState.authorizedAccessTypes,
  2710. sshClient.handshakeState.apiParams,
  2711. serverTunnelStatParams)
  2712. if sshClient.isInproxyTunnelProtocol {
  2713. sshClient.peerGeoIPData.SetLogFieldsWithPrefix("", "proxy", logFields)
  2714. logFields.Add(
  2715. LogFields(sshClient.handshakeState.inproxyRelayLogFields))
  2716. }
  2717. // new_tactics_tag indicates that the handshake returned new tactics.
  2718. if sshClient.handshakeState.newTacticsTag != "" {
  2719. logFields["new_tactics_tag"] = sshClient.handshakeState.newTacticsTag
  2720. }
  2721. // "relay_protocol" is sent with handshake API parameters. In pre-
  2722. // handshake logTunnel cases, this value is not yet known. As
  2723. // sshClient.tunnelProtocol is authoritative, set this value
  2724. // unconditionally, overwriting any value from handshake.
  2725. logFields["relay_protocol"] = sshClient.tunnelProtocol
  2726. if sshClient.serverPacketManipulation != "" {
  2727. logFields["server_packet_manipulation"] = sshClient.serverPacketManipulation
  2728. }
  2729. if sshClient.sshListener.BPFProgramName != "" {
  2730. logFields["server_bpf"] = sshClient.sshListener.BPFProgramName
  2731. }
  2732. logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession
  2733. logFields["handshake_completed"] = sshClient.handshakeState.completed
  2734. logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp
  2735. logFields["bytes_down_tcp"] = sshClient.tcpTrafficState.bytesDown
  2736. logFields["peak_concurrent_dialing_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentDialingPortForwardCount
  2737. logFields["peak_concurrent_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentPortForwardCount
  2738. logFields["total_port_forward_count_tcp"] = sshClient.tcpTrafficState.totalPortForwardCount
  2739. logFields["bytes_up_udp"] = sshClient.udpTrafficState.bytesUp
  2740. logFields["bytes_down_udp"] = sshClient.udpTrafficState.bytesDown
  2741. // sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful
  2742. logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
  2743. logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
  2744. logFields["total_udpgw_channel_count"] = sshClient.totalUdpgwChannelCount
  2745. logFields["total_packet_tunnel_channel_count"] = sshClient.totalPacketTunnelChannelCount
  2746. logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count
  2747. logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes
  2748. logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2749. logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes
  2750. logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes
  2751. logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count
  2752. logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes
  2753. logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2754. logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
  2755. logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
  2756. if sshClient.destinationBytesMetricsASN != "" {
  2757. // Check if the configured DestinationBytesMetricsASN has changed
  2758. // (or been cleared). If so, don't log and discard the accumulated
  2759. // bytes to ensure we don't continue to record stats as previously
  2760. // configured.
  2761. //
  2762. // Any counts accumulated before the DestinationBytesMetricsASN change
  2763. // are lost. At this time we can't change
  2764. // sshClient.destinationBytesMetricsASN dynamically, after a tactics
  2765. // hot reload, as there may be destination bytes port forwards that
  2766. // were in place before the change, which will continue to count.
  2767. logDestBytes := true
  2768. if sshClient.sshServer.support.ServerTacticsParametersCache != nil {
  2769. // Target this using the client, not peer, GeoIP. In the case of
  2770. // in-proxy tunnel protocols, the client GeoIP fields will be None
  2771. // if the handshake does not complete. In that case, no bytes will
  2772. // have transferred.
  2773. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
  2774. if err != nil || p.IsNil() ||
  2775. sshClient.destinationBytesMetricsASN != p.String(parameters.DestinationBytesMetricsASN) {
  2776. logDestBytes = false
  2777. }
  2778. }
  2779. if logDestBytes {
  2780. bytesUpTCP := sshClient.tcpDestinationBytesMetrics.getBytesUp()
  2781. bytesDownTCP := sshClient.tcpDestinationBytesMetrics.getBytesDown()
  2782. bytesUpUDP := sshClient.udpDestinationBytesMetrics.getBytesUp()
  2783. bytesDownUDP := sshClient.udpDestinationBytesMetrics.getBytesDown()
  2784. logFields["dest_bytes_asn"] = sshClient.destinationBytesMetricsASN
  2785. logFields["dest_bytes_up_tcp"] = bytesUpTCP
  2786. logFields["dest_bytes_down_tcp"] = bytesDownTCP
  2787. logFields["dest_bytes_up_udp"] = bytesUpUDP
  2788. logFields["dest_bytes_down_udp"] = bytesDownUDP
  2789. logFields["dest_bytes"] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
  2790. }
  2791. }
  2792. // Only log fields for peakMetrics when there is data recorded, otherwise
  2793. // omit the field.
  2794. if sshClient.peakMetrics.concurrentProximateAcceptedClients != nil {
  2795. logFields["peak_concurrent_proximate_accepted_clients"] = *sshClient.peakMetrics.concurrentProximateAcceptedClients
  2796. }
  2797. if sshClient.peakMetrics.concurrentProximateEstablishedClients != nil {
  2798. logFields["peak_concurrent_proximate_established_clients"] = *sshClient.peakMetrics.concurrentProximateEstablishedClients
  2799. }
  2800. if sshClient.peakMetrics.TCPPortForwardFailureRate != nil && sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize != nil {
  2801. logFields["peak_tcp_port_forward_failure_rate"] = *sshClient.peakMetrics.TCPPortForwardFailureRate
  2802. logFields["peak_tcp_port_forward_failure_rate_sample_size"] = *sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize
  2803. }
  2804. if sshClient.peakMetrics.DNSFailureRate != nil && sshClient.peakMetrics.DNSFailureRateSampleSize != nil {
  2805. logFields["peak_dns_failure_rate"] = *sshClient.peakMetrics.DNSFailureRate
  2806. logFields["peak_dns_failure_rate_sample_size"] = *sshClient.peakMetrics.DNSFailureRateSampleSize
  2807. }
  2808. // Pre-calculate a total-tunneled-bytes field. This total is used
  2809. // extensively in analytics and is more performant when pre-calculated.
  2810. logFields["bytes"] = sshClient.tcpTrafficState.bytesUp +
  2811. sshClient.tcpTrafficState.bytesDown +
  2812. sshClient.udpTrafficState.bytesUp +
  2813. sshClient.udpTrafficState.bytesDown
  2814. // Merge in additional metrics from the optional metrics source
  2815. for _, metrics := range additionalMetrics {
  2816. for name, value := range metrics {
  2817. // Don't overwrite any basic fields
  2818. if logFields[name] == nil {
  2819. logFields[name] = value
  2820. }
  2821. }
  2822. }
  2823. if sshClient.additionalTransportData != nil &&
  2824. sshClient.additionalTransportData.steeringIP != "" {
  2825. logFields["relayed_steering_ip"] = sshClient.additionalTransportData.steeringIP
  2826. }
  2827. // Retain lock when invoking LogRawFieldsWithTimestamp to block any
  2828. // concurrent writes to variables referenced by logFields.
  2829. log.LogRawFieldsWithTimestamp(logFields)
  2830. sshClient.Unlock()
  2831. }
// blocklistHitsStatParams is the request parameter spec that logBlocklistHits
// passes to getRequestLogFields for the "server_blocklist_hit" log.
// NOTE(review): the third field appears to be a flags bitmask (e.g.
// requestParamOptional | requestParamArray); 0 presumably marks a required
// parameter. Confirm against requestParamSpec.
var blocklistHitsStatParams = []requestParamSpec{
	{"propagation_channel_id", isHexDigits, 0},
	{"sponsor_id", isHexDigits, 0},
	{"client_version", isIntString, requestParamLogStringAsInt},
	{"client_platform", isClientPlatform, 0},
	{"client_features", isAnyString, requestParamOptional | requestParamArray},
	{"client_build_rev", isHexDigits, requestParamOptional},
	{"device_region", isAnyString, requestParamOptional},
	{"device_location", isGeoHashString, requestParamOptional},
	{"egress_region", isRegionCode, requestParamOptional},
	{"last_connected", isLastConnected, requestParamOptional},
}
  2844. func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []BlocklistTag) {
  2845. sshClient.Lock()
  2846. // Log this using the client, not peer, GeoIP. In the case of in-proxy
  2847. // tunnel protocols, the client GeoIP fields will be None if the
  2848. // handshake does not complete. In that case, no port forwarding will
  2849. // occur and there will not be any blocklist hits.
  2850. logFields := getRequestLogFields(
  2851. "server_blocklist_hit",
  2852. sshClient.sessionID,
  2853. sshClient.clientGeoIPData,
  2854. sshClient.handshakeState.authorizedAccessTypes,
  2855. sshClient.handshakeState.apiParams,
  2856. blocklistHitsStatParams)
  2857. // Note: see comment in logTunnel regarding unlock and concurrent access.
  2858. sshClient.Unlock()
  2859. for _, tag := range tags {
  2860. if IP != nil {
  2861. logFields["blocklist_ip_address"] = IP.String()
  2862. }
  2863. if domain != "" {
  2864. logFields["blocklist_domain"] = domain
  2865. }
  2866. logFields["blocklist_source"] = tag.Source
  2867. logFields["blocklist_subject"] = tag.Subject
  2868. log.LogRawFieldsWithTimestamp(logFields)
  2869. }
  2870. }
  2871. func (sshClient *sshClient) runOSLSender() {
  2872. for {
  2873. // Await a signal that there are SLOKs to send
  2874. // TODO: use reflect.SelectCase, and optionally await timer here?
  2875. select {
  2876. case <-sshClient.signalIssueSLOKs:
  2877. case <-sshClient.runCtx.Done():
  2878. return
  2879. }
  2880. retryDelay := SSH_SEND_OSL_INITIAL_RETRY_DELAY
  2881. for {
  2882. err := sshClient.sendOSLRequest()
  2883. if err == nil {
  2884. break
  2885. }
  2886. if !isExpectedTunnelIOError(err) {
  2887. log.WithTraceFields(LogFields{"error": err}).Warning("sendOSLRequest failed")
  2888. }
  2889. // If the request failed, retry after a delay (with exponential backoff)
  2890. // or when signaled that there are additional SLOKs to send
  2891. retryTimer := time.NewTimer(retryDelay)
  2892. select {
  2893. case <-retryTimer.C:
  2894. case <-sshClient.signalIssueSLOKs:
  2895. case <-sshClient.runCtx.Done():
  2896. retryTimer.Stop()
  2897. return
  2898. }
  2899. retryTimer.Stop()
  2900. retryDelay *= SSH_SEND_OSL_RETRY_FACTOR
  2901. }
  2902. }
  2903. }
  2904. // sendOSLRequest will invoke osl.GetSeedPayload to issue SLOKs and
  2905. // generate a payload, and send an OSL request to the client when
  2906. // there are new SLOKs in the payload.
  2907. func (sshClient *sshClient) sendOSLRequest() error {
  2908. seedPayload := sshClient.getOSLSeedPayload()
  2909. // Don't send when no SLOKs. This will happen when signalIssueSLOKs
  2910. // is received but no new SLOKs are issued.
  2911. if len(seedPayload.SLOKs) == 0 {
  2912. return nil
  2913. }
  2914. oslRequest := protocol.OSLRequest{
  2915. SeedPayload: seedPayload,
  2916. }
  2917. requestPayload, err := json.Marshal(oslRequest)
  2918. if err != nil {
  2919. return errors.Trace(err)
  2920. }
  2921. ok, _, err := sshClient.sshConn.SendRequest(
  2922. protocol.PSIPHON_API_OSL_REQUEST_NAME,
  2923. true,
  2924. requestPayload)
  2925. if err != nil {
  2926. return errors.Trace(err)
  2927. }
  2928. if !ok {
  2929. return errors.TraceNew("client rejected request")
  2930. }
  2931. sshClient.clearOSLSeedPayload()
  2932. return nil
  2933. }
  2934. // runAlertSender dequeues and sends alert requests to the client. As these
  2935. // alerts are informational, there is no retry logic and no SSH client
  2936. // acknowledgement (wantReply) is requested. This worker scheme allows
  2937. // nonconcurrent components including udpgw and packet tunnel to enqueue
  2938. // alerts without blocking their traffic processing.
  2939. func (sshClient *sshClient) runAlertSender() {
  2940. for {
  2941. select {
  2942. case <-sshClient.runCtx.Done():
  2943. return
  2944. case request := <-sshClient.sendAlertRequests:
  2945. payload, err := json.Marshal(request)
  2946. if err != nil {
  2947. log.WithTraceFields(LogFields{"error": err}).Warning("Marshal failed")
  2948. break
  2949. }
  2950. _, _, err = sshClient.sshConn.SendRequest(
  2951. protocol.PSIPHON_API_ALERT_REQUEST_NAME,
  2952. false,
  2953. payload)
  2954. if err != nil && !isExpectedTunnelIOError(err) {
  2955. log.WithTraceFields(LogFields{"error": err}).Warning("SendRequest failed")
  2956. break
  2957. }
  2958. sshClient.Lock()
  2959. sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] = true
  2960. sshClient.Unlock()
  2961. }
  2962. }
  2963. }
  2964. // enqueueAlertRequest enqueues an alert request to be sent to the client.
  2965. // Only one request is sent per tunnel per protocol.AlertRequest value;
  2966. // subsequent alerts with the same value are dropped. enqueueAlertRequest will
  2967. // not block until the queue exceeds ALERT_REQUEST_QUEUE_BUFFER_SIZE.
  2968. func (sshClient *sshClient) enqueueAlertRequest(request protocol.AlertRequest) {
  2969. sshClient.Lock()
  2970. if sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] {
  2971. sshClient.Unlock()
  2972. return
  2973. }
  2974. sshClient.Unlock()
  2975. select {
  2976. case <-sshClient.runCtx.Done():
  2977. case sshClient.sendAlertRequests <- request:
  2978. }
  2979. }
  2980. func (sshClient *sshClient) enqueueDisallowedTrafficAlertRequest() {
  2981. reason := protocol.PSIPHON_API_ALERT_DISALLOWED_TRAFFIC
  2982. actionURLs := sshClient.getAlertActionURLs(reason)
  2983. sshClient.enqueueAlertRequest(
  2984. protocol.AlertRequest{
  2985. Reason: reason,
  2986. ActionURLs: actionURLs,
  2987. })
  2988. }
  2989. func (sshClient *sshClient) enqueueUnsafeTrafficAlertRequest(tags []BlocklistTag) {
  2990. reason := protocol.PSIPHON_API_ALERT_UNSAFE_TRAFFIC
  2991. actionURLs := sshClient.getAlertActionURLs(reason)
  2992. for _, tag := range tags {
  2993. sshClient.enqueueAlertRequest(
  2994. protocol.AlertRequest{
  2995. Reason: reason,
  2996. Subject: tag.Subject,
  2997. ActionURLs: actionURLs,
  2998. })
  2999. }
  3000. }
  3001. func (sshClient *sshClient) getAlertActionURLs(alertReason string) []string {
  3002. sshClient.Lock()
  3003. sponsorID, _ := getStringRequestParam(
  3004. sshClient.handshakeState.apiParams, "sponsor_id")
  3005. clientGeoIPData := sshClient.clientGeoIPData
  3006. deviceRegion := sshClient.handshakeState.deviceRegion
  3007. sshClient.Unlock()
  3008. return sshClient.sshServer.support.PsinetDatabase.GetAlertActionURLs(
  3009. alertReason,
  3010. sponsorID,
  3011. clientGeoIPData.Country,
  3012. clientGeoIPData.ASN,
  3013. deviceRegion)
  3014. }
  3015. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessage string) {
  3016. // We always return the reject reason "Prohibited":
  3017. // - Traffic rules and connection limits may prohibit the connection.
  3018. // - External firewall rules may prohibit the connection, and this is not currently
  3019. // distinguishable from other failure modes.
  3020. // - We limit the failure information revealed to the client.
  3021. reason := ssh.Prohibited
  3022. // Note: Debug level, as logMessage may contain user traffic destination address information
  3023. if IsLogLevelDebug() {
  3024. log.WithTraceFields(
  3025. LogFields{
  3026. "channelType": newChannel.ChannelType(),
  3027. "logMessage": logMessage,
  3028. "rejectReason": reason.String(),
  3029. }).Debug("reject new channel")
  3030. }
  3031. // Note: logMessage is internal, for logging only; just the reject reason is sent to the client.
  3032. newChannel.Reject(reason, reason.String())
  3033. }
  3034. // setHandshakeState sets the handshake state -- that it completed and
  3035. // what parameters were passed -- in sshClient. This state is used for allowing
  3036. // port forwards and for future traffic rule selection. setHandshakeState
  3037. // also triggers an immediate traffic rule re-selection, as the rules selected
  3038. // upon tunnel establishment may no longer apply now that handshake values are
  3039. // set.
  3040. //
  3041. // The authorizations received from the client handshake are verified and the
  3042. // resulting list of authorized access types are applied to the client's tunnel
  3043. // and traffic rules.
  3044. //
  3045. // A list of active authorization IDs, authorized access types, and traffic
  3046. // rate limits are returned for responding to the client and logging.
  3047. //
  3048. // All slices in the returnd handshakeStateInfo are read-only, as readers may
  3049. // reference slice contents outside of locks.
  3050. func (sshClient *sshClient) setHandshakeState(
  3051. state handshakeState,
  3052. authorizations []string) (*handshakeStateInfo, error) {
  3053. sshClient.Lock()
  3054. completed := sshClient.handshakeState.completed
  3055. if !completed {
  3056. sshClient.handshakeState = state
  3057. if sshClient.isInproxyTunnelProtocol {
  3058. // Set the client GeoIP data to the value obtained using the
  3059. // original client IP, from the broker, in the handshake. Also
  3060. // update the GeoIP session hash to use the client GeoIP data.
  3061. sshClient.clientGeoIPData =
  3062. sshClient.handshakeState.inproxyClientGeoIPData
  3063. sshClient.sshServer.setGeoIPSessionCache(
  3064. sshClient.sessionID, sshClient.clientGeoIPData)
  3065. }
  3066. }
  3067. sshClient.Unlock()
  3068. // Client must only perform one handshake
  3069. if completed {
  3070. return nil, errors.TraceNew("handshake already completed")
  3071. }
  3072. // Verify the authorizations submitted by the client. Verified, active
  3073. // (non-expired) access types will be available for traffic rules
  3074. // filtering.
  3075. //
  3076. // When an authorization is active but expires while the client is
  3077. // connected, the client is disconnected to ensure the access is reset.
  3078. // This is implemented by setting a timer to perform the disconnect at the
  3079. // expiry time of the soonest expiring authorization.
  3080. //
  3081. // sshServer.authorizationSessionIDs tracks the unique mapping of active
  3082. // authorization IDs to client session IDs and is used to detect and
  3083. // prevent multiple malicious clients from reusing a single authorization
  3084. // (within the scope of this server).
  3085. // authorizationIDs and authorizedAccessTypes are returned to the client
  3086. // and logged, respectively; initialize to empty lists so the
  3087. // protocol/logs don't need to handle 'null' values.
  3088. authorizationIDs := make([]string, 0)
  3089. authorizedAccessTypes := make([]string, 0)
  3090. var stopTime time.Time
  3091. for i, authorization := range authorizations {
  3092. // This sanity check mitigates malicious clients causing excess CPU use.
  3093. if i >= MAX_AUTHORIZATIONS {
  3094. log.WithTrace().Warning("too many authorizations")
  3095. break
  3096. }
  3097. verifiedAuthorization, err := accesscontrol.VerifyAuthorization(
  3098. &sshClient.sshServer.support.Config.AccessControlVerificationKeyRing,
  3099. authorization)
  3100. if err != nil {
  3101. log.WithTraceFields(
  3102. LogFields{"error": err}).Warning("verify authorization failed")
  3103. continue
  3104. }
  3105. authorizationID := base64.StdEncoding.EncodeToString(verifiedAuthorization.ID)
  3106. if common.Contains(authorizedAccessTypes, verifiedAuthorization.AccessType) {
  3107. log.WithTraceFields(
  3108. LogFields{"accessType": verifiedAuthorization.AccessType}).Warning("duplicate authorization access type")
  3109. continue
  3110. }
  3111. authorizationIDs = append(authorizationIDs, authorizationID)
  3112. authorizedAccessTypes = append(authorizedAccessTypes, verifiedAuthorization.AccessType)
  3113. if stopTime.IsZero() || stopTime.After(verifiedAuthorization.Expires) {
  3114. stopTime = verifiedAuthorization.Expires
  3115. }
  3116. }
  3117. // Associate all verified authorizationIDs with this client's session ID.
  3118. // Handle cases where previous associations exist:
  3119. //
  3120. // - Multiple malicious clients reusing a single authorization. In this
  3121. // case, authorizations are revoked from the previous client.
  3122. //
  3123. // - The client reconnected with a new session ID due to user toggling.
  3124. // This case is expected due to server affinity. This cannot be
  3125. // distinguished from the previous case and the same action is taken;
  3126. // this will have no impact on a legitimate client as the previous
  3127. // session is dangling.
  3128. //
  3129. // - The client automatically reconnected with the same session ID. This
  3130. // case is not expected as sshServer.registerEstablishedClient
  3131. // synchronously calls sshClient.releaseAuthorizations; as a safe guard,
  3132. // this case is distinguished and no revocation action is taken.
  3133. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  3134. for _, authorizationID := range authorizationIDs {
  3135. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  3136. if ok && sessionID != sshClient.sessionID {
  3137. logFields := LogFields{
  3138. "event_name": "irregular_tunnel",
  3139. "tunnel_error": "duplicate active authorization",
  3140. "duplicate_authorization_id": authorizationID,
  3141. }
  3142. // Log this using client, not peer, GeoIP data. In the case of
  3143. // in-proxy tunnel protocols, the client GeoIP fields will be None
  3144. // if a handshake does not complete. However, presense of a
  3145. // (duplicate) authorization implies that the handshake completed.
  3146. sshClient.getClientGeoIPData().SetClientLogFields(logFields)
  3147. duplicateClientGeoIPData := sshClient.sshServer.getGeoIPSessionCache(sessionID)
  3148. if duplicateClientGeoIPData != sshClient.getClientGeoIPData() {
  3149. duplicateClientGeoIPData.SetClientLogFieldsWithPrefix("duplicate_authorization_", logFields)
  3150. }
  3151. log.LogRawFieldsWithTimestamp(logFields)
  3152. // Invoke asynchronously to avoid deadlocks.
  3153. // TODO: invoke only once for each distinct sessionID?
  3154. go sshClient.sshServer.revokeClientAuthorizations(sessionID)
  3155. }
  3156. sshClient.sshServer.authorizationSessionIDs[authorizationID] = sshClient.sessionID
  3157. }
  3158. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  3159. if len(authorizationIDs) > 0 {
  3160. sshClient.Lock()
  3161. // Make the authorizedAccessTypes available for traffic rules filtering.
  3162. sshClient.handshakeState.activeAuthorizationIDs = authorizationIDs
  3163. sshClient.handshakeState.authorizedAccessTypes = authorizedAccessTypes
  3164. // On exit, sshClient.runTunnel will call releaseAuthorizations, which
  3165. // will release the authorization IDs so the client can reconnect and
  3166. // present the same authorizations again. sshClient.runTunnel will
  3167. // also cancel the stopTimer in case it has not yet fired.
  3168. // Note: termination of the stopTimer goroutine is not synchronized.
  3169. sshClient.releaseAuthorizations = func() {
  3170. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  3171. for _, authorizationID := range authorizationIDs {
  3172. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  3173. if ok && sessionID == sshClient.sessionID {
  3174. delete(sshClient.sshServer.authorizationSessionIDs, authorizationID)
  3175. }
  3176. }
  3177. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  3178. }
  3179. sshClient.stopTimer = time.AfterFunc(
  3180. time.Until(stopTime),
  3181. func() {
  3182. sshClient.stop()
  3183. })
  3184. sshClient.Unlock()
  3185. }
  3186. upstreamBytesPerSecond, downstreamBytesPerSecond := sshClient.setTrafficRules()
  3187. sshClient.setOSLConfig()
  3188. // Set destination bytes metrics.
  3189. //
  3190. // Limitation: this is a one-time operation and doesn't get reset when
  3191. // tactics are hot-reloaded. This allows us to simply retain any
  3192. // destination byte counts accumulated and eventually log in
  3193. // server_tunnel, without having to deal with a destination change
  3194. // mid-tunnel. As typical tunnels are short, and destination changes can
  3195. // be applied gradually, handling mid-tunnel changes is not a priority.
  3196. sshClient.setDestinationBytesMetrics()
  3197. info := &handshakeStateInfo{
  3198. activeAuthorizationIDs: authorizationIDs,
  3199. authorizedAccessTypes: authorizedAccessTypes,
  3200. upstreamBytesPerSecond: upstreamBytesPerSecond,
  3201. downstreamBytesPerSecond: downstreamBytesPerSecond,
  3202. }
  3203. // Relay the steering IP to the API handshake handler.
  3204. if sshClient.additionalTransportData != nil {
  3205. info.steeringIP = sshClient.additionalTransportData.steeringIP
  3206. }
  3207. return info, nil
  3208. }
  3209. // getHandshaked returns whether the client has completed a handshake API
  3210. // request and whether the traffic rules that were selected after the
  3211. // handshake immediately exhaust the client.
  3212. //
  3213. // When the client is immediately exhausted it will be closed; but this
  3214. // takes effect asynchronously. The "exhausted" return value is used to
  3215. // prevent API requests by clients that will close.
  3216. func (sshClient *sshClient) getHandshaked() (bool, bool) {
  3217. sshClient.Lock()
  3218. defer sshClient.Unlock()
  3219. completed := sshClient.handshakeState.completed
  3220. exhausted := false
  3221. // Notes:
  3222. // - "Immediately exhausted" is when CloseAfterExhausted is set and
  3223. // either ReadUnthrottledBytes or WriteUnthrottledBytes starts from
  3224. // 0, so no bytes would be read or written. This check does not
  3225. // examine whether 0 bytes _remain_ in the ThrottledConn.
  3226. // - This check is made against the current traffic rules, which
  3227. // could have changed in a hot reload since the handshake.
  3228. if completed &&
  3229. *sshClient.trafficRules.RateLimits.CloseAfterExhausted &&
  3230. (*sshClient.trafficRules.RateLimits.ReadUnthrottledBytes == 0 ||
  3231. *sshClient.trafficRules.RateLimits.WriteUnthrottledBytes == 0) {
  3232. exhausted = true
  3233. }
  3234. return completed, exhausted
  3235. }
  3236. func (sshClient *sshClient) getDisableDiscovery() bool {
  3237. sshClient.Lock()
  3238. defer sshClient.Unlock()
  3239. return *sshClient.trafficRules.DisableDiscovery
  3240. }
  3241. func (sshClient *sshClient) updateAPIParameters(
  3242. apiParams common.APIParameters) {
  3243. sshClient.Lock()
  3244. defer sshClient.Unlock()
  3245. // Only update after handshake has initialized API params.
  3246. if !sshClient.handshakeState.completed {
  3247. return
  3248. }
  3249. for name, value := range apiParams {
  3250. sshClient.handshakeState.apiParams[name] = value
  3251. }
  3252. }
  3253. func (sshClient *sshClient) acceptDomainBytes() bool {
  3254. sshClient.Lock()
  3255. defer sshClient.Unlock()
  3256. // When the domain bytes checksum differs from the checksum sent to the
  3257. // client in the handshake response, the psinet regex configuration has
  3258. // changed. In this case, drop the stats so we don't continue to record
  3259. // stats as previously configured.
  3260. //
  3261. // Limitations:
  3262. // - The checksum comparison may result in dropping some stats for a
  3263. // domain that remains in the new configuration.
  3264. // - We don't push new regexs to the clients, so clients that remain
  3265. // connected will continue to send stats that will be dropped; and
  3266. // those clients will not send stats as newly configured until after
  3267. // reconnecting.
  3268. // - Due to the design of
  3269. // transferstats.ReportRecentBytesTransferredForServer in the client,
  3270. // the client may accumulate stats, reconnect before its next status
  3271. // request, get a new regex configuration, and then send the previously
  3272. // accumulated stats in its next status request. The checksum scheme
  3273. // won't prevent the reporting of those stats.
  3274. sponsorID, _ := getStringRequestParam(sshClient.handshakeState.apiParams, "sponsor_id")
  3275. domainBytesChecksum := sshClient.sshServer.support.PsinetDatabase.GetDomainBytesChecksum(sponsorID)
  3276. return bytes.Equal(sshClient.handshakeState.domainBytesChecksum, domainBytesChecksum)
  3277. }
  3278. // setOSLConfig resets the client's OSL seed state based on the latest OSL config
  3279. // As sshClient.oslClientSeedState may be reset by a concurrent goroutine,
  3280. // oslClientSeedState must only be accessed within the sshClient mutex.
  3281. func (sshClient *sshClient) setOSLConfig() {
  3282. sshClient.Lock()
  3283. defer sshClient.Unlock()
  3284. propagationChannelID, err := getStringRequestParam(
  3285. sshClient.handshakeState.apiParams, "propagation_channel_id")
  3286. if err != nil {
  3287. // This should not fail as long as client has sent valid handshake
  3288. return
  3289. }
  3290. // Use a cached seed state if one is found for the client's
  3291. // session ID. This enables resuming progress made in a previous
  3292. // tunnel.
  3293. // Note: go-cache is already concurency safe; the additional mutex
  3294. // is necessary to guarantee that Get/Delete is atomic; although in
  3295. // practice no two concurrent clients should ever supply the same
  3296. // session ID.
  3297. sshClient.sshServer.oslSessionCacheMutex.Lock()
  3298. oslClientSeedState, found := sshClient.sshServer.oslSessionCache.Get(sshClient.sessionID)
  3299. if found {
  3300. sshClient.sshServer.oslSessionCache.Delete(sshClient.sessionID)
  3301. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3302. sshClient.oslClientSeedState = oslClientSeedState.(*osl.ClientSeedState)
  3303. sshClient.oslClientSeedState.Resume(sshClient.signalIssueSLOKs)
  3304. return
  3305. }
  3306. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3307. // Two limitations when setOSLConfig() is invoked due to an
  3308. // OSL config hot reload:
  3309. //
  3310. // 1. any partial progress towards SLOKs is lost.
  3311. //
  3312. // 2. all existing osl.ClientSeedPortForwards for existing
  3313. // port forwards will not send progress to the new client
  3314. // seed state.
  3315. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  3316. // protocols, the client GeoIP fields will be populated using the
  3317. // original client IP already received, from the broker, in the handshake.
  3318. sshClient.oslClientSeedState = sshClient.sshServer.support.OSLConfig.NewClientSeedState(
  3319. sshClient.clientGeoIPData.Country,
  3320. propagationChannelID,
  3321. sshClient.signalIssueSLOKs)
  3322. }
  3323. // newClientSeedPortForward will return nil when no seeding is
  3324. // associated with the specified ipAddress.
  3325. func (sshClient *sshClient) newClientSeedPortForward(IPAddress net.IP) *osl.ClientSeedPortForward {
  3326. sshClient.Lock()
  3327. defer sshClient.Unlock()
  3328. // Will not be initialized before handshake.
  3329. if sshClient.oslClientSeedState == nil {
  3330. return nil
  3331. }
  3332. lookupASN := func(IP net.IP) string {
  3333. // TODO: there are potentially multiple identical geo IP lookups per new
  3334. // port forward and flow, cache and use result of first lookup.
  3335. return sshClient.sshServer.support.GeoIPService.LookupISPForIP(IP).ASN
  3336. }
  3337. return sshClient.oslClientSeedState.NewClientSeedPortForward(IPAddress, lookupASN)
  3338. }
  3339. // getOSLSeedPayload returns a payload containing all seeded SLOKs for
  3340. // this client's session.
  3341. func (sshClient *sshClient) getOSLSeedPayload() *osl.SeedPayload {
  3342. sshClient.Lock()
  3343. defer sshClient.Unlock()
  3344. // Will not be initialized before handshake.
  3345. if sshClient.oslClientSeedState == nil {
  3346. return &osl.SeedPayload{SLOKs: make([]*osl.SLOK, 0)}
  3347. }
  3348. return sshClient.oslClientSeedState.GetSeedPayload()
  3349. }
  3350. func (sshClient *sshClient) clearOSLSeedPayload() {
  3351. sshClient.Lock()
  3352. defer sshClient.Unlock()
  3353. sshClient.oslClientSeedState.ClearSeedPayload()
  3354. }
  3355. func (sshClient *sshClient) setDestinationBytesMetrics() {
  3356. sshClient.Lock()
  3357. defer sshClient.Unlock()
  3358. // Limitation: the server-side tactics cache is used to avoid the overhead
  3359. // of an additional tactics filtering per tunnel. As this cache is
  3360. // designed for GeoIP filtering only, handshake API parameters are not
  3361. // applied to tactics filtering in this case.
  3362. tacticsCache := sshClient.sshServer.support.ServerTacticsParametersCache
  3363. if tacticsCache == nil {
  3364. return
  3365. }
  3366. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  3367. // protocols, the client GeoIP fields will be populated using the
  3368. // original client IP already received, from the broker, in the handshake.
  3369. p, err := tacticsCache.Get(sshClient.clientGeoIPData)
  3370. if err != nil {
  3371. log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
  3372. return
  3373. }
  3374. if p.IsNil() {
  3375. return
  3376. }
  3377. sshClient.destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)
  3378. }
  3379. func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType int, IPAddress net.IP) *destinationBytesMetrics {
  3380. sshClient.Lock()
  3381. defer sshClient.Unlock()
  3382. if sshClient.destinationBytesMetricsASN == "" {
  3383. return nil
  3384. }
  3385. if sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN != sshClient.destinationBytesMetricsASN {
  3386. return nil
  3387. }
  3388. if portForwardType == portForwardTypeTCP {
  3389. return &sshClient.tcpDestinationBytesMetrics
  3390. }
  3391. return &sshClient.udpDestinationBytesMetrics
  3392. }
  3393. func (sshClient *sshClient) getActivityUpdaters(portForwardType int, IPAddress net.IP) []common.ActivityUpdater {
  3394. var updaters []common.ActivityUpdater
  3395. clientSeedPortForward := sshClient.newClientSeedPortForward(IPAddress)
  3396. if clientSeedPortForward != nil {
  3397. updaters = append(updaters, clientSeedPortForward)
  3398. }
  3399. destinationBytesMetrics := sshClient.newDestinationBytesMetricsUpdater(portForwardType, IPAddress)
  3400. if destinationBytesMetrics != nil {
  3401. updaters = append(updaters, destinationBytesMetrics)
  3402. }
  3403. return updaters
  3404. }
  3405. // setTrafficRules resets the client's traffic rules based on the latest server config
  3406. // and client properties. As sshClient.trafficRules may be reset by a concurrent
  3407. // goroutine, trafficRules must only be accessed within the sshClient mutex.
  3408. func (sshClient *sshClient) setTrafficRules() (int64, int64) {
  3409. sshClient.Lock()
  3410. defer sshClient.Unlock()
  3411. isFirstTunnelInSession := sshClient.isFirstTunnelInSession &&
  3412. sshClient.handshakeState.establishedTunnelsCount == 0
  3413. // In the case of in-proxy tunnel protocols, the client GeoIP data is None
  3414. // until the handshake completes. Pre-handhake, the rate limit is
  3415. // determined by EstablishmentRead/WriteBytesPerSecond, which default to
  3416. // unthrottled, the recommended setting; in addition, no port forwards
  3417. // are permitted until after the handshake completes, at which time
  3418. // setTrafficRules will be called again with the client GeoIP data
  3419. // populated using the original client IP received from the in-proxy
  3420. // broker.
  3421. sshClient.trafficRules = sshClient.sshServer.support.TrafficRulesSet.GetTrafficRules(
  3422. isFirstTunnelInSession,
  3423. sshClient.tunnelProtocol,
  3424. sshClient.clientGeoIPData,
  3425. sshClient.handshakeState)
  3426. if sshClient.throttledConn != nil {
  3427. // Any existing throttling state is reset.
  3428. sshClient.throttledConn.SetLimits(
  3429. sshClient.trafficRules.RateLimits.CommonRateLimits(
  3430. sshClient.handshakeState.completed))
  3431. }
  3432. return *sshClient.trafficRules.RateLimits.ReadBytesPerSecond,
  3433. *sshClient.trafficRules.RateLimits.WriteBytesPerSecond
  3434. }
  3435. func (sshClient *sshClient) rateLimits() common.RateLimits {
  3436. sshClient.Lock()
  3437. defer sshClient.Unlock()
  3438. return sshClient.trafficRules.RateLimits.CommonRateLimits(
  3439. sshClient.handshakeState.completed)
  3440. }
  3441. func (sshClient *sshClient) idleTCPPortForwardTimeout() time.Duration {
  3442. sshClient.Lock()
  3443. defer sshClient.Unlock()
  3444. return time.Duration(*sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds) * time.Millisecond
  3445. }
  3446. func (sshClient *sshClient) idleUDPPortForwardTimeout() time.Duration {
  3447. sshClient.Lock()
  3448. defer sshClient.Unlock()
  3449. return time.Duration(*sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds) * time.Millisecond
  3450. }
  3451. func (sshClient *sshClient) setTCPPortForwardDialingAvailableSignal(signal context.CancelFunc) {
  3452. sshClient.Lock()
  3453. defer sshClient.Unlock()
  3454. sshClient.tcpPortForwardDialingAvailableSignal = signal
  3455. }
  3456. const (
  3457. portForwardTypeTCP = iota
  3458. portForwardTypeUDP
  3459. )
  3460. func (sshClient *sshClient) isPortForwardPermitted(
  3461. portForwardType int,
  3462. remoteIP net.IP,
  3463. port int) bool {
  3464. // Disallow connection to bogons.
  3465. //
  3466. // As a security measure, this is a failsafe. The server should be run on a
  3467. // host with correctly configured firewall rules.
  3468. //
  3469. // This check also avoids spurious disallowed traffic alerts for destinations
  3470. // that are impossible to reach.
  3471. if !sshClient.sshServer.support.Config.AllowBogons && common.IsBogon(remoteIP) {
  3472. return false
  3473. }
  3474. // Blocklist check.
  3475. //
  3476. // Limitation: isPortForwardPermitted is not called in transparent DNS
  3477. // forwarding cases. As the destination IP address is rewritten in these
  3478. // cases, a blocklist entry won't be dialed in any case. However, no logs
  3479. // will be recorded.
  3480. if !sshClient.isIPPermitted(remoteIP) {
  3481. return false
  3482. }
  3483. // Don't lock before calling logBlocklistHits.
  3484. // Unlock before calling enqueueDisallowedTrafficAlertRequest/log.
  3485. sshClient.Lock()
  3486. allowed := true
  3487. // Client must complete handshake before port forwards are permitted.
  3488. if !sshClient.handshakeState.completed {
  3489. allowed = false
  3490. }
  3491. if allowed {
  3492. // Traffic rules checks.
  3493. switch portForwardType {
  3494. case portForwardTypeTCP:
  3495. if !sshClient.trafficRules.AllowTCPPort(
  3496. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  3497. allowed = false
  3498. }
  3499. case portForwardTypeUDP:
  3500. if !sshClient.trafficRules.AllowUDPPort(
  3501. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  3502. allowed = false
  3503. }
  3504. }
  3505. }
  3506. sshClient.Unlock()
  3507. if allowed {
  3508. return true
  3509. }
  3510. switch portForwardType {
  3511. case portForwardTypeTCP:
  3512. sshClient.updateQualityMetricsWithTCPRejectedDisallowed()
  3513. case portForwardTypeUDP:
  3514. sshClient.updateQualityMetricsWithUDPRejectedDisallowed()
  3515. }
  3516. sshClient.enqueueDisallowedTrafficAlertRequest()
  3517. if IsLogLevelDebug() {
  3518. log.WithTraceFields(
  3519. LogFields{
  3520. "type": portForwardType,
  3521. "port": port,
  3522. }).Debug("port forward denied by traffic rules")
  3523. }
  3524. return false
  3525. }
  3526. // isDomainPermitted returns true when the specified domain may be resolved
  3527. // and returns false and a reject reason otherwise.
  3528. func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) {
  3529. // We're not doing comprehensive validation, to avoid overhead per port
  3530. // forward. This is a simple sanity check to ensure we don't process
  3531. // blantantly invalid input.
  3532. //
  3533. // TODO: validate with dns.IsDomainName?
  3534. if len(domain) > 255 {
  3535. return false, "invalid domain name"
  3536. }
  3537. // Don't even attempt to resolve the default mDNS top-level domain.
  3538. // Non-default cases won't be caught here but should fail to resolve due
  3539. // to the PreferGo setting in net.Resolver.
  3540. if strings.HasSuffix(domain, ".local") {
  3541. return false, "port forward not permitted"
  3542. }
  3543. tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain)
  3544. if len(tags) > 0 {
  3545. sshClient.logBlocklistHits(nil, domain, tags)
  3546. if sshClient.sshServer.support.Config.BlocklistActive {
  3547. // Actively alert and block
  3548. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  3549. return false, "port forward not permitted"
  3550. }
  3551. }
  3552. return true, ""
  3553. }
  3554. func (sshClient *sshClient) isIPPermitted(remoteIP net.IP) bool {
  3555. tags := sshClient.sshServer.support.Blocklist.LookupIP(remoteIP)
  3556. if len(tags) > 0 {
  3557. sshClient.logBlocklistHits(remoteIP, "", tags)
  3558. if sshClient.sshServer.support.Config.BlocklistActive {
  3559. // Actively alert and block
  3560. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  3561. return false
  3562. }
  3563. }
  3564. return true
  3565. }
  3566. func (sshClient *sshClient) isTCPDialingPortForwardLimitExceeded() bool {
  3567. sshClient.Lock()
  3568. defer sshClient.Unlock()
  3569. state := &sshClient.tcpTrafficState
  3570. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3571. if max > 0 && state.concurrentDialingPortForwardCount >= int64(max) {
  3572. return true
  3573. }
  3574. return false
  3575. }
  3576. func (sshClient *sshClient) getTCPPortForwardQueueSize() int {
  3577. sshClient.Lock()
  3578. defer sshClient.Unlock()
  3579. return *sshClient.trafficRules.MaxTCPPortForwardCount +
  3580. *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3581. }
  3582. func (sshClient *sshClient) getDialTCPPortForwardTimeoutMilliseconds() int {
  3583. sshClient.Lock()
  3584. defer sshClient.Unlock()
  3585. return *sshClient.trafficRules.DialTCPPortForwardTimeoutMilliseconds
  3586. }
  3587. func (sshClient *sshClient) dialingTCPPortForward() {
  3588. sshClient.Lock()
  3589. defer sshClient.Unlock()
  3590. state := &sshClient.tcpTrafficState
  3591. state.concurrentDialingPortForwardCount += 1
  3592. if state.concurrentDialingPortForwardCount > state.peakConcurrentDialingPortForwardCount {
  3593. state.peakConcurrentDialingPortForwardCount = state.concurrentDialingPortForwardCount
  3594. }
  3595. }
  3596. func (sshClient *sshClient) abortedTCPPortForward() {
  3597. sshClient.Lock()
  3598. defer sshClient.Unlock()
  3599. sshClient.tcpTrafficState.concurrentDialingPortForwardCount -= 1
  3600. }
  3601. func (sshClient *sshClient) allocatePortForward(portForwardType int) bool {
  3602. sshClient.Lock()
  3603. defer sshClient.Unlock()
  3604. // Check if at port forward limit. The subsequent counter
  3605. // changes must be atomic with the limit check to ensure
  3606. // the counter never exceeds the limit in the case of
  3607. // concurrent allocations.
  3608. var max int
  3609. var state *trafficState
  3610. if portForwardType == portForwardTypeTCP {
  3611. max = *sshClient.trafficRules.MaxTCPPortForwardCount
  3612. state = &sshClient.tcpTrafficState
  3613. } else {
  3614. max = *sshClient.trafficRules.MaxUDPPortForwardCount
  3615. state = &sshClient.udpTrafficState
  3616. }
  3617. if max > 0 && state.concurrentPortForwardCount >= int64(max) {
  3618. return false
  3619. }
  3620. // Update port forward counters.
  3621. if portForwardType == portForwardTypeTCP {
  3622. // Assumes TCP port forwards called dialingTCPPortForward
  3623. state.concurrentDialingPortForwardCount -= 1
  3624. if sshClient.tcpPortForwardDialingAvailableSignal != nil {
  3625. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3626. if max <= 0 || state.concurrentDialingPortForwardCount < int64(max) {
  3627. sshClient.tcpPortForwardDialingAvailableSignal()
  3628. }
  3629. }
  3630. }
  3631. state.concurrentPortForwardCount += 1
  3632. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  3633. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  3634. }
  3635. state.totalPortForwardCount += 1
  3636. return true
  3637. }
  3638. // establishedPortForward increments the concurrent port
  3639. // forward counter. closedPortForward decrements it, so it
  3640. // must always be called for each establishedPortForward
  3641. // call.
  3642. //
  3643. // When at the limit of established port forwards, the LRU
  3644. // existing port forward is closed to make way for the newly
  3645. // established one. There can be a minor delay as, in addition
  3646. // to calling Close() on the port forward net.Conn,
  3647. // establishedPortForward waits for the LRU's closedPortForward()
  3648. // call which will decrement the concurrent counter. This
  3649. // ensures all resources associated with the LRU (socket,
  3650. // goroutine) are released or will very soon be released before
  3651. // proceeding.
  3652. func (sshClient *sshClient) establishedPortForward(
  3653. portForwardType int, portForwardLRU *common.LRUConns) {
  3654. // Do not lock sshClient here.
  3655. var state *trafficState
  3656. if portForwardType == portForwardTypeTCP {
  3657. state = &sshClient.tcpTrafficState
  3658. } else {
  3659. state = &sshClient.udpTrafficState
  3660. }
  3661. // When the maximum number of port forwards is already
  3662. // established, close the LRU. CloseOldest will call
  3663. // Close on the port forward net.Conn. Both TCP and
  3664. // UDP port forwards have handler goroutines that may
  3665. // be blocked calling Read on the net.Conn. Close will
  3666. // eventually interrupt the Read and cause the handlers
  3667. // to exit, but not immediately. So the following logic
  3668. // waits for a LRU handler to be interrupted and signal
  3669. // availability.
  3670. //
  3671. // Notes:
  3672. //
  3673. // - the port forward limit can change via a traffic
  3674. // rules hot reload; the condition variable handles
  3675. // this case whereas a channel-based semaphore would
  3676. // not.
  3677. //
  3678. // - if a number of goroutines exceeding the total limit
  3679. // arrive here all concurrently, some CloseOldest() calls
  3680. // will have no effect as there can be less existing port
  3681. // forwards than new ones. In this case, the new port
  3682. // forward will be delayed. This is highly unlikely in
  3683. // practise since UDP calls to establishedPortForward are
  3684. // serialized and TCP calls are limited by the dial
  3685. // queue/count.
  3686. if !sshClient.allocatePortForward(portForwardType) {
  3687. portForwardLRU.CloseOldest()
  3688. if IsLogLevelDebug() {
  3689. log.WithTrace().Debug("closed LRU port forward")
  3690. }
  3691. state.availablePortForwardCond.L.Lock()
  3692. for !sshClient.allocatePortForward(portForwardType) {
  3693. state.availablePortForwardCond.Wait()
  3694. }
  3695. state.availablePortForwardCond.L.Unlock()
  3696. }
  3697. }
  3698. func (sshClient *sshClient) closedPortForward(
  3699. portForwardType int, bytesUp, bytesDown int64) {
  3700. sshClient.Lock()
  3701. var state *trafficState
  3702. if portForwardType == portForwardTypeTCP {
  3703. state = &sshClient.tcpTrafficState
  3704. } else {
  3705. state = &sshClient.udpTrafficState
  3706. }
  3707. state.concurrentPortForwardCount -= 1
  3708. state.bytesUp += bytesUp
  3709. state.bytesDown += bytesDown
  3710. sshClient.Unlock()
  3711. // Signal any goroutine waiting in establishedPortForward
  3712. // that an established port forward slot is available.
  3713. state.availablePortForwardCond.Signal()
  3714. }
  3715. func (sshClient *sshClient) updateQualityMetricsWithDialResult(
  3716. tcpPortForwardDialSuccess bool, dialDuration time.Duration, IP net.IP) {
  3717. sshClient.Lock()
  3718. defer sshClient.Unlock()
  3719. if tcpPortForwardDialSuccess {
  3720. sshClient.qualityMetrics.TCPPortForwardDialedCount += 1
  3721. sshClient.qualityMetrics.TCPPortForwardDialedDuration += dialDuration
  3722. if IP.To4() != nil {
  3723. sshClient.qualityMetrics.TCPIPv4PortForwardDialedCount += 1
  3724. sshClient.qualityMetrics.TCPIPv4PortForwardDialedDuration += dialDuration
  3725. } else if IP != nil {
  3726. sshClient.qualityMetrics.TCPIPv6PortForwardDialedCount += 1
  3727. sshClient.qualityMetrics.TCPIPv6PortForwardDialedDuration += dialDuration
  3728. }
  3729. } else {
  3730. sshClient.qualityMetrics.TCPPortForwardFailedCount += 1
  3731. sshClient.qualityMetrics.TCPPortForwardFailedDuration += dialDuration
  3732. if IP.To4() != nil {
  3733. sshClient.qualityMetrics.TCPIPv4PortForwardFailedCount += 1
  3734. sshClient.qualityMetrics.TCPIPv4PortForwardFailedDuration += dialDuration
  3735. } else if IP != nil {
  3736. sshClient.qualityMetrics.TCPIPv6PortForwardFailedCount += 1
  3737. sshClient.qualityMetrics.TCPIPv6PortForwardFailedDuration += dialDuration
  3738. }
  3739. }
  3740. }
  3741. func (sshClient *sshClient) updateQualityMetricsWithRejectedDialingLimit() {
  3742. sshClient.Lock()
  3743. defer sshClient.Unlock()
  3744. sshClient.qualityMetrics.TCPPortForwardRejectedDialingLimitCount += 1
  3745. }
  3746. func (sshClient *sshClient) updateQualityMetricsWithTCPRejectedDisallowed() {
  3747. sshClient.Lock()
  3748. defer sshClient.Unlock()
  3749. sshClient.qualityMetrics.TCPPortForwardRejectedDisallowedCount += 1
  3750. }
  3751. func (sshClient *sshClient) updateQualityMetricsWithUDPRejectedDisallowed() {
  3752. sshClient.Lock()
  3753. defer sshClient.Unlock()
  3754. sshClient.qualityMetrics.UDPPortForwardRejectedDisallowedCount += 1
  3755. }
  3756. func (sshClient *sshClient) updateQualityMetricsWithDNSResult(
  3757. success bool, duration time.Duration, resolverIP net.IP) {
  3758. sshClient.Lock()
  3759. defer sshClient.Unlock()
  3760. resolver := ""
  3761. if resolverIP != nil {
  3762. resolver = resolverIP.String()
  3763. }
  3764. if success {
  3765. sshClient.qualityMetrics.DNSCount["ALL"] += 1
  3766. sshClient.qualityMetrics.DNSDuration["ALL"] += duration
  3767. if resolver != "" {
  3768. sshClient.qualityMetrics.DNSCount[resolver] += 1
  3769. sshClient.qualityMetrics.DNSDuration[resolver] += duration
  3770. }
  3771. } else {
  3772. sshClient.qualityMetrics.DNSFailedCount["ALL"] += 1
  3773. sshClient.qualityMetrics.DNSFailedDuration["ALL"] += duration
  3774. if resolver != "" {
  3775. sshClient.qualityMetrics.DNSFailedCount[resolver] += 1
  3776. sshClient.qualityMetrics.DNSFailedDuration[resolver] += duration
  3777. }
  3778. }
  3779. }
  3780. func (sshClient *sshClient) handleTCPChannel(
  3781. remainingDialTimeout time.Duration,
  3782. hostToConnect string,
  3783. portToConnect int,
  3784. doSplitTunnel bool,
  3785. newChannel ssh.NewChannel) {
  3786. // Assumptions:
  3787. // - sshClient.dialingTCPPortForward() has been called
  3788. // - remainingDialTimeout > 0
  3789. established := false
  3790. defer func() {
  3791. if !established {
  3792. sshClient.abortedTCPPortForward()
  3793. }
  3794. }()
  3795. // Validate the domain name and check the domain blocklist before dialing.
  3796. //
  3797. // The IP blocklist is checked in isPortForwardPermitted, which also provides
  3798. // IP blocklist checking for the packet tunnel code path. When hostToConnect
  3799. // is an IP address, the following hostname resolution step effectively
  3800. // performs no actions and next immediate step is the isPortForwardPermitted
  3801. // check.
  3802. //
  3803. // Limitation: this case handles port forwards where the client sends the
  3804. // destination domain in the SSH port forward request but does not currently
  3805. // handle DNS-over-TCP; in the DNS-over-TCP case, a client may bypass the
  3806. // block list check.
  3807. if net.ParseIP(hostToConnect) == nil {
  3808. ok, rejectMessage := sshClient.isDomainPermitted(hostToConnect)
  3809. if !ok {
  3810. // Note: not recording a port forward failure in this case
  3811. sshClient.rejectNewChannel(newChannel, rejectMessage)
  3812. return
  3813. }
  3814. }
  3815. // Dial the remote address.
  3816. //
  3817. // Hostname resolution is performed explicitly, as a separate step, as the
  3818. // target IP address is used for traffic rules (AllowSubnets), OSL seed
  3819. // progress, and IP address blocklists.
  3820. //
  3821. // Contexts are used for cancellation (via sshClient.runCtx, which is
  3822. // cancelled when the client is stopping) and timeouts.
  3823. dialStartTime := time.Now()
  3824. IP := net.ParseIP(hostToConnect)
  3825. if IP == nil {
  3826. // Resolve the hostname
  3827. // PreferGo, equivalent to GODEBUG=netdns=go, is specified in order to
  3828. // avoid any cases where Go's resolver fails over to the cgo-based
  3829. // resolver (see https://pkg.go.dev/net#hdr-Name_Resolution). Such
  3830. // cases, if they resolve at all, may be expected to resolve to bogon
  3831. // IPs that won't be permitted; but the cgo invocation will consume
  3832. // an OS thread, which is a performance hit we can avoid.
  3833. if IsLogLevelDebug() {
  3834. log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")
  3835. }
  3836. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  3837. IPs, err := (&net.Resolver{PreferGo: true}).LookupIPAddr(ctx, hostToConnect)
  3838. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  3839. resolveElapsedTime := time.Since(dialStartTime)
  3840. // Record DNS metrics. If LookupIPAddr returns net.DNSError.IsNotFound, this
  3841. // is "no such host" and not a DNS failure. Limitation: the resolver IP is
  3842. // not known.
  3843. dnsErr, ok := err.(*net.DNSError)
  3844. dnsNotFound := ok && dnsErr.IsNotFound
  3845. dnsSuccess := err == nil || dnsNotFound
  3846. sshClient.updateQualityMetricsWithDNSResult(dnsSuccess, resolveElapsedTime, nil)
  3847. // IPv4 is preferred in case the host has limited IPv6 routing. IPv6 is
  3848. // selected and attempted only when there's no IPv4 option.
  3849. // TODO: shuffle list to try other IPs?
  3850. for _, ip := range IPs {
  3851. if ip.IP.To4() != nil {
  3852. IP = ip.IP
  3853. break
  3854. }
  3855. }
  3856. if IP == nil && len(IPs) > 0 {
  3857. // If there are no IPv4 IPs, the first IP is IPv6.
  3858. IP = IPs[0].IP
  3859. }
  3860. if err == nil && IP == nil {
  3861. err = std_errors.New("no IP address")
  3862. }
  3863. if err != nil {
  3864. // Record a port forward failure
  3865. sshClient.updateQualityMetricsWithDialResult(false, resolveElapsedTime, IP)
  3866. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("LookupIP failed: %s", err))
  3867. return
  3868. }
  3869. remainingDialTimeout -= resolveElapsedTime
  3870. }
  3871. if remainingDialTimeout <= 0 {
  3872. sshClient.rejectNewChannel(newChannel, "TCP port forward timed out resolving")
  3873. return
  3874. }
  3875. // When the client has indicated split tunnel mode and when the channel is
  3876. // not of type protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE, check if the
  3877. // client and the port forward destination are in the same GeoIP country. If
  3878. // so, reject the port forward with a distinct response code that indicates
  3879. // to the client that this port forward should be performed locally, direct
  3880. // and untunneled.
  3881. //
  3882. // Clients are expected to cache untunneled responses to avoid this round
  3883. // trip in the immediate future and reduce server load.
  3884. //
  3885. // When the countries differ, immediately proceed with the standard port
  3886. // forward. No additional round trip is required.
  3887. //
  3888. // If either GeoIP country is "None", one or both countries are unknown
  3889. // and there is no match.
  3890. //
  3891. // Traffic rules, such as allowed ports, are not enforced for port forward
  3892. // destinations classified as untunneled.
  3893. //
  3894. // Domain and IP blocklists still apply to port forward destinations
  3895. // classified as untunneled.
  3896. //
  3897. // The client's use of split tunnel mode is logged in server_tunnel metrics
  3898. // as the boolean value split_tunnel. As they may indicate some information
  3899. // about browsing activity, no other split tunnel metrics are logged.
  3900. if doSplitTunnel {
  3901. destinationGeoIPData := sshClient.sshServer.support.GeoIPService.LookupIP(IP)
  3902. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  3903. // protocols, the client GeoIP fields will be populated using the
  3904. // original client IP already received, from the broker, in the handshake.
  3905. clientGeoIPData := sshClient.getClientGeoIPData()
  3906. if clientGeoIPData.Country != GEOIP_UNKNOWN_VALUE &&
  3907. sshClient.handshakeState.splitTunnelLookup.lookup(
  3908. destinationGeoIPData.Country) {
  3909. // Since isPortForwardPermitted is not called in this case, explicitly call
  3910. // ipBlocklistCheck. The domain blocklist case is handled above.
  3911. if !sshClient.isIPPermitted(IP) {
  3912. // Note: not recording a port forward failure in this case
  3913. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  3914. return
  3915. }
  3916. newChannel.Reject(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL, "")
  3917. return
  3918. }
  3919. }
  3920. // Enforce traffic rules, using the resolved IP address.
  3921. if !sshClient.isPortForwardPermitted(
  3922. portForwardTypeTCP, IP, portToConnect) {
  3923. // Note: not recording a port forward failure in this case
  3924. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  3925. return
  3926. }
  3927. // TCP dial.
  3928. remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))
  3929. if IsLogLevelDebug() {
  3930. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
  3931. }
  3932. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  3933. fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)
  3934. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  3935. // Record port forward success or failure
  3936. sshClient.updateQualityMetricsWithDialResult(err == nil, time.Since(dialStartTime), IP)
  3937. if err != nil {
  3938. // Monitor for low resource error conditions
  3939. sshClient.sshServer.monitorPortForwardDialError(err)
  3940. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("DialTimeout failed: %s", err))
  3941. return
  3942. }
  3943. // The upstream TCP port forward connection has been established. Schedule
  3944. // some cleanup and notify the SSH client that the channel is accepted.
  3945. defer fwdConn.Close()
  3946. fwdChannel, requests, err := newChannel.Accept()
  3947. if err != nil {
  3948. if !isExpectedTunnelIOError(err) {
  3949. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  3950. }
  3951. return
  3952. }
  3953. go ssh.DiscardRequests(requests)
  3954. defer fwdChannel.Close()
  3955. // Release the dialing slot and acquire an established slot.
  3956. //
  3957. // establishedPortForward increments the concurrent TCP port
  3958. // forward counter and closes the LRU existing TCP port forward
  3959. // when already at the limit.
  3960. //
  3961. // Known limitations:
  3962. //
  3963. // - Closed LRU TCP sockets will enter the TIME_WAIT state,
  3964. // continuing to consume some resources.
  3965. sshClient.establishedPortForward(portForwardTypeTCP, sshClient.tcpPortForwardLRU)
  3966. // "established = true" cancels the deferred abortedTCPPortForward()
  3967. established = true
  3968. // TODO: 64-bit alignment? https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  3969. var bytesUp, bytesDown int64
  3970. defer func() {
  3971. sshClient.closedPortForward(
  3972. portForwardTypeTCP, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))
  3973. }()
  3974. lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
  3975. defer lruEntry.Remove()
  3976. // ActivityMonitoredConn monitors the TCP port forward I/O and updates
  3977. // its LRU status. ActivityMonitoredConn also times out I/O on the port
  3978. // forward if both reads and writes have been idle for the specified
  3979. // duration.
  3980. fwdConn, err = common.NewActivityMonitoredConn(
  3981. fwdConn,
  3982. sshClient.idleTCPPortForwardTimeout(),
  3983. true,
  3984. lruEntry,
  3985. sshClient.getActivityUpdaters(portForwardTypeTCP, IP)...)
  3986. if err != nil {
  3987. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  3988. return
  3989. }
  3990. // Relay channel to forwarded connection.
  3991. if IsLogLevelDebug() {
  3992. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
  3993. }
  3994. // TODO: relay errors to fwdChannel.Stderr()?
  3995. relayWaitGroup := new(sync.WaitGroup)
  3996. relayWaitGroup.Add(1)
  3997. go func() {
  3998. defer relayWaitGroup.Done()
  3999. // io.Copy allocates a 32K temporary buffer, and each port forward relay
  4000. // uses two of these buffers; using common.CopyBuffer with a smaller buffer
  4001. // reduces the overall memory footprint.
  4002. bytes, err := common.CopyBuffer(
  4003. fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  4004. atomic.AddInt64(&bytesDown, bytes)
  4005. if err != nil && err != io.EOF {
  4006. // Debug since errors such as "connection reset by peer" occur during normal operation
  4007. if IsLogLevelDebug() {
  4008. log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
  4009. }
  4010. }
  4011. // Interrupt upstream io.Copy when downstream is shutting down.
  4012. // TODO: this is done to quickly cleanup the port forward when
  4013. // fwdConn has a read timeout, but is it clean -- upstream may still
  4014. // be flowing?
  4015. fwdChannel.Close()
  4016. }()
  4017. bytes, err := common.CopyBuffer(
  4018. fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  4019. atomic.AddInt64(&bytesUp, bytes)
  4020. if err != nil && err != io.EOF {
  4021. if IsLogLevelDebug() {
  4022. log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
  4023. }
  4024. }
  4025. // Shutdown special case: fwdChannel will be closed and return EOF when
  4026. // the SSH connection is closed, but we need to explicitly close fwdConn
  4027. // to interrupt the downstream io.Copy, which may be blocked on a
  4028. // fwdConn.Read().
  4029. fwdConn.Close()
  4030. relayWaitGroup.Wait()
  4031. if IsLogLevelDebug() {
  4032. log.WithTraceFields(
  4033. LogFields{
  4034. "remoteAddr": remoteAddr,
  4035. "bytesUp": atomic.LoadInt64(&bytesUp),
  4036. "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting")
  4037. }
  4038. }