tunnelServer.go 171 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
747784779478047814782478347844785478647874788478947904791479247934794479547964797479847994800480148024803480448054806480748084809481048114812481348144815481648174818481948204821482248234824482548264827482848294830483148324833483448354836483748384839484048414842484348444845484648474848484948504851485248534854485548564857485848594860486148624863486448654866486748684869487048714872487348744875487648774878487948804881488248834884488548864887488848894890489148924893489448954896489748984899490049014902490349044905490649074908490949104911491249134914491549164917491849194920492149224923492449254926492749284929493049314932493349344935493649374938493949404941494249434944494549464947494849494950495149524953495449554956495749584959496049614962496349644965496649674968496949704971497249734974497549764977497849794980498149824983498449854986498749884989499049914992499349944995499649974998499950005001500250035004500550065007500850095010501150125013501450155016501750185019502050215022502350245025502650275028502950305031503250335034503550365037503850395040504150425043504450455046504750485049505050515052505350545055505650575058505950605061
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "crypto/subtle"
  25. "encoding/base64"
  26. "encoding/json"
  27. std_errors "errors"
  28. "fmt"
  29. "io"
  30. "io/ioutil"
  31. "net"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "syscall"
  37. "time"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  51. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
  52. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  53. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
  54. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  55. "github.com/marusama/semaphore"
  56. cache "github.com/patrickmn/go-cache"
  57. )
  // Timing parameters used by the tunnel server. Each of these is a typed
  // time.Duration constant.
  const (
  	SSH_AUTH_LOG_PERIOD              = 30 * time.Minute
  	SSH_HANDSHAKE_TIMEOUT            = 30 * time.Second
  	SSH_BEGIN_HANDSHAKE_TIMEOUT      = time.Second
  	SSH_CONNECTION_READ_DEADLINE     = 5 * time.Minute
  	SSH_SEND_OSL_INITIAL_RETRY_DELAY = 30 * time.Second
  	GEOIP_SESSION_CACHE_TTL          = time.Hour
  	OSL_SESSION_CACHE_TTL            = 5 * time.Minute
  )

  // Sizes, counts, and multipliers used by the tunnel server. These are
  // untyped integer constants; byte sizes are written as powers of two for
  // readability (8 * 1024 == 8192, 10 * 1024 * 1024 == 10485760).
  const (
  	SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE = 8 * 1024
  	SSH_TCP_PORT_FORWARD_QUEUE_SIZE       = 1024
  	SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES      = 0
  	SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES      = 256
  	SSH_SEND_OSL_RETRY_FACTOR             = 2
  	MAX_AUTHORIZATIONS                    = 16
  	PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
  	RANDOM_STREAM_MAX_BYTES               = 10 * 1024 * 1024
  	ALERT_REQUEST_QUEUE_BUFFER_SIZE       = 16
  	SSH_MAX_CLIENT_COUNT                  = 100000
  )
  77. // TunnelServer is the main server that accepts Psiphon client
  78. // connections, via various obfuscation protocols, and provides
  79. // port forwarding (TCP and UDP) services to the Psiphon client.
  80. // At its core, TunnelServer is an SSH server. SSH is the base
  81. // protocol that provides port forward multiplexing, and transport
  82. // security. Layered on top of SSH, optionally, is Obfuscated SSH
  83. // and meek protocols, which provide further circumvention
  84. // capabilities.
  85. type TunnelServer struct {
  86. runWaitGroup *sync.WaitGroup
  87. listenerError chan error
  88. shutdownBroadcast <-chan struct{}
  89. sshServer *sshServer
  90. }
  // sshListener is a bound net.Listener together with the metadata that Run
  // records when binding it: the local address and port, the tunnel
  // protocol served, and the name of the TCP BPF program attached to the
  // listener (the empty string when none is attached).
  type sshListener struct {
  	net.Listener

  	localAddress   string
  	tunnelProtocol string
  	port           int
  	BPFProgramName string
  }
  98. // NewTunnelServer initializes a new tunnel server.
  99. func NewTunnelServer(
  100. support *SupportServices,
  101. shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {
  102. sshServer, err := newSSHServer(support, shutdownBroadcast)
  103. if err != nil {
  104. return nil, errors.Trace(err)
  105. }
  106. return &TunnelServer{
  107. runWaitGroup: new(sync.WaitGroup),
  108. listenerError: make(chan error),
  109. shutdownBroadcast: shutdownBroadcast,
  110. sshServer: sshServer,
  111. }, nil
  112. }
  113. // Run runs the tunnel server; this function blocks while running a selection of
  114. // listeners that handle connections using various obfuscation protocols.
  115. //
  116. // Run listens on each designated tunnel port and spawns new goroutines to handle
  117. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  118. // clients is maintained, and when halting all clients are cleanly shutdown.
  119. //
  120. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  121. // authentication, and then looping on client new channel requests. "direct-tcpip"
  122. // channels, dynamic port fowards, are supported. When the UDPInterceptUdpgwServerAddress
  123. // config parameter is configured, UDP port forwards over a TCP stream, following
  124. // the udpgw protocol, are handled.
  125. //
  126. // A new goroutine is spawned to handle each port forward for each client. Each port
  127. // forward tracks its bytes transferred. Overall per-client stats for connection duration,
  128. // GeoIP, number of port forwards, and bytes transferred are tracked and logged when the
  129. // client shuts down.
  130. //
  131. // Note: client handler goroutines may still be shutting down after Run() returns. See
  132. // comment in sshClient.stop(). TODO: fully synchronized shutdown.
  133. func (server *TunnelServer) Run() error {
  134. // TODO: should TunnelServer hold its own support pointer?
  135. support := server.sshServer.support
  136. // First bind all listeners; once all are successful,
  137. // start accepting connections on each.
  138. var listeners []*sshListener
  139. for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
  140. localAddress := net.JoinHostPort(
  141. support.Config.ServerIPAddress, strconv.Itoa(listenPort))
  142. var listener net.Listener
  143. var BPFProgramName string
  144. var err error
  145. if protocol.TunnelProtocolUsesFrontedMeekQUIC(tunnelProtocol) {
  146. // For FRONTED-MEEK-QUIC-OSSH, no listener implemented. The edge-to-server
  147. // hop uses HTTPS and the client tunnel protocol is distinguished using
  148. // protocol.MeekCookieData.ClientTunnelProtocol.
  149. continue
  150. } else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {
  151. // in-proxy QUIC tunnel protocols don't support gQUIC.
  152. enableGQUIC := support.Config.EnableGQUIC && !protocol.TunnelProtocolUsesInproxy(tunnelProtocol)
  153. logTunnelProtocol := tunnelProtocol
  154. listener, err = quic.Listen(
  155. CommonLogger(log),
  156. func(peerAddress string, err error, logFields common.LogFields) {
  157. logIrregularTunnel(
  158. support, logTunnelProtocol, listenPort, peerAddress,
  159. errors.Trace(err), LogFields(logFields))
  160. },
  161. localAddress,
  162. support.Config.ObfuscatedSSHKey,
  163. enableGQUIC)
  164. } else if protocol.TunnelProtocolUsesRefractionNetworking(tunnelProtocol) {
  165. listener, err = refraction.Listen(localAddress)
  166. } else if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
  167. listener, err = net.Listen("tcp", localAddress)
  168. } else {
  169. // Only direct, unfronted protocol listeners use TCP BPF circumvention
  170. // programs.
  171. listener, BPFProgramName, err = newTCPListenerWithBPF(support, localAddress)
  172. if protocol.TunnelProtocolUsesTLSOSSH(tunnelProtocol) {
  173. listener, err = ListenTLSTunnel(support, listener, tunnelProtocol, listenPort)
  174. if err != nil {
  175. return errors.Trace(err)
  176. }
  177. }
  178. }
  179. if err != nil {
  180. for _, existingListener := range listeners {
  181. existingListener.Listener.Close()
  182. }
  183. return errors.Trace(err)
  184. }
  185. tacticsListener := NewTacticsListener(
  186. support,
  187. listener,
  188. tunnelProtocol,
  189. func(IP string) GeoIPData { return support.GeoIPService.Lookup(IP) })
  190. log.WithTraceFields(
  191. LogFields{
  192. "localAddress": localAddress,
  193. "tunnelProtocol": tunnelProtocol,
  194. "BPFProgramName": BPFProgramName,
  195. }).Info("listening")
  196. listeners = append(
  197. listeners,
  198. &sshListener{
  199. Listener: tacticsListener,
  200. localAddress: localAddress,
  201. port: listenPort,
  202. tunnelProtocol: tunnelProtocol,
  203. BPFProgramName: BPFProgramName,
  204. })
  205. }
  206. for _, listener := range listeners {
  207. server.runWaitGroup.Add(1)
  208. go func(listener *sshListener) {
  209. defer server.runWaitGroup.Done()
  210. log.WithTraceFields(
  211. LogFields{
  212. "localAddress": listener.localAddress,
  213. "tunnelProtocol": listener.tunnelProtocol,
  214. }).Info("running")
  215. server.sshServer.runListener(
  216. listener,
  217. server.listenerError)
  218. log.WithTraceFields(
  219. LogFields{
  220. "localAddress": listener.localAddress,
  221. "tunnelProtocol": listener.tunnelProtocol,
  222. }).Info("stopped")
  223. }(listener)
  224. }
  225. var err error
  226. select {
  227. case <-server.shutdownBroadcast:
  228. case err = <-server.listenerError:
  229. }
  230. for _, listener := range listeners {
  231. listener.Close()
  232. }
  233. server.sshServer.stopClients()
  234. server.runWaitGroup.Wait()
  235. log.WithTrace().Info("stopped")
  236. return err
  237. }
  238. // GetLoadStats returns load stats for the tunnel server. The stats are
  239. // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
  240. // include current connected client count, total number of current port
  241. // forwards.
  242. func (server *TunnelServer) GetLoadStats() (
  243. UpstreamStats, ProtocolStats, RegionStats) {
  244. return server.sshServer.getLoadStats()
  245. }
  246. // GetEstablishedClientCount returns the number of currently established
  247. // clients.
  248. func (server *TunnelServer) GetEstablishedClientCount() int {
  249. return server.sshServer.getEstablishedClientCount()
  250. }
  251. // ResetAllClientTrafficRules resets all established client traffic rules
  252. // to use the latest config and client properties. Any existing traffic
  253. // rule state is lost, including throttling state.
  254. func (server *TunnelServer) ResetAllClientTrafficRules() {
  255. server.sshServer.resetAllClientTrafficRules()
  256. }
  257. // ResetAllClientOSLConfigs resets all established client OSL state to use
  258. // the latest OSL config. Any existing OSL state is lost, including partial
  259. // progress towards SLOKs.
  260. func (server *TunnelServer) ResetAllClientOSLConfigs() {
  261. server.sshServer.resetAllClientOSLConfigs()
  262. }
  263. // ReloadTactics signals components that use server-side tactics for one-time
  264. // initialization to reload and use potentially changed parameters.
  265. func (server *TunnelServer) ReloadTactics() error {
  266. return errors.Trace(server.sshServer.reloadTactics())
  267. }
  268. // SetEstablishTunnels sets whether new tunnels may be established or not.
  269. // When not establishing, incoming connections are immediately closed.
  270. func (server *TunnelServer) SetEstablishTunnels(establish bool) {
  271. server.sshServer.setEstablishTunnels(establish)
  272. }
  273. // CheckEstablishTunnels returns whether new tunnels may be established or
  274. // not, and increments a metrics counter when establishment is disallowed.
  275. func (server *TunnelServer) CheckEstablishTunnels() bool {
  276. return server.sshServer.checkEstablishTunnels()
  277. }
  278. // CheckLoadLimiting returns whether the server is in the load limiting state,
  279. // which is when EstablishTunnels is false. CheckLoadLimiting is intended to
  280. // be checked by non-tunnel components; no metrics are updated by this call.
  281. func (server *TunnelServer) CheckLoadLimiting() bool {
  282. return server.sshServer.checkLoadLimiting()
  283. }
  284. // GetEstablishTunnelsMetrics returns whether tunnel establishment is
  285. // currently allowed and the number of tunnels rejected since due to not
  286. // establishing since the last GetEstablishTunnelsMetrics call.
  287. func (server *TunnelServer) GetEstablishTunnelsMetrics() (bool, int64) {
  288. return server.sshServer.getEstablishTunnelsMetrics()
  289. }
  290. type sshServer struct {
  291. // Note: 64-bit ints used with atomic operations are placed
  292. // at the start of struct to ensure 64-bit alignment.
  293. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  294. lastAuthLog int64
  295. authFailedCount int64
  296. establishLimitedCount int64
  297. support *SupportServices
  298. establishTunnels int32
  299. concurrentSSHHandshakes semaphore.Semaphore
  300. shutdownBroadcast <-chan struct{}
  301. sshHostKey ssh.Signer
  302. obfuscatorSeedHistory *obfuscator.SeedHistory
  303. inproxyBrokerSessions *inproxy.ServerBrokerSessions
  304. clientsMutex sync.Mutex
  305. stoppingClients bool
  306. acceptedClientCounts map[string]map[string]int64
  307. clients map[string]*sshClient
  308. geoIPSessionCache *cache.Cache
  309. oslSessionCacheMutex sync.Mutex
  310. oslSessionCache *cache.Cache
  311. authorizationSessionIDsMutex sync.Mutex
  312. authorizationSessionIDs map[string]string
  313. meekServersMutex sync.Mutex
  314. meekServers []*MeekServer
  315. }
  316. func newSSHServer(
  317. support *SupportServices,
  318. shutdownBroadcast <-chan struct{}) (*sshServer, error) {
  319. privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))
  320. if err != nil {
  321. return nil, errors.Trace(err)
  322. }
  323. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  324. signer, err := ssh.NewSignerFromKey(privateKey)
  325. if err != nil {
  326. return nil, errors.Trace(err)
  327. }
  328. var concurrentSSHHandshakes semaphore.Semaphore
  329. if support.Config.MaxConcurrentSSHHandshakes > 0 {
  330. concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)
  331. }
  332. // The geoIPSessionCache replaces the legacy cache that used to be in
  333. // GeoIPServices and was used for the now-retired web API. That cache was
  334. // also used for, and now geoIPSessionCache provides:
  335. // - Determining first-tunnel-in-session (from a single server's point of
  336. // view)
  337. // - GeoIP for duplicate authorizations logic.
  338. //
  339. // TODO: combine geoIPSessionCache with oslSessionCache; need to deal with
  340. // OSL flush on hot reload and reconcile differing TTLs.
  341. geoIPSessionCache := cache.New(GEOIP_SESSION_CACHE_TTL, 1*time.Minute)
  342. // The OSL session cache temporarily retains OSL seed state
  343. // progress for disconnected clients. This enables clients
  344. // that disconnect and immediately reconnect to the same
  345. // server to resume their OSL progress. Cached progress
  346. // is referenced by session ID and is retained for
  347. // OSL_SESSION_CACHE_TTL after disconnect.
  348. //
  349. // Note: session IDs are assumed to be unpredictable. If a
  350. // rogue client could guess the session ID of another client,
  351. // it could resume its OSL progress and, if the OSL config
  352. // were known, infer some activity.
  353. oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)
  354. // inproxyBrokerSessions are the secure in-proxy broker/server sessions
  355. // used to relay information from the broker to the server, including the
  356. // original in-proxy client IP and the in-proxy proxy ID.
  357. //
  358. // Only brokers with public keys configured in the
  359. // InproxyAllBrokerPublicKeys tactic parameter are allowed to connect to
  360. // the server, and brokers verify the server's public key via the
  361. // InproxySessionPublicKey server entry field.
  362. //
  363. // Sessions are initialized and run for all psiphond instances running any
  364. // in-proxy tunnel protocol.
  365. var inproxyBrokerSessions *inproxy.ServerBrokerSessions
  366. runningInproxy := false
  367. for tunnelProtocol := range support.Config.TunnelProtocolPorts {
  368. if protocol.TunnelProtocolUsesInproxy(tunnelProtocol) {
  369. runningInproxy = true
  370. break
  371. }
  372. }
  373. if runningInproxy {
  374. inproxyPrivateKey, err := inproxy.SessionPrivateKeyFromString(
  375. support.Config.InproxyServerSessionPrivateKey)
  376. if err != nil {
  377. return nil, errors.Trace(err)
  378. }
  379. inproxyObfuscationSecret, err := inproxy.ObfuscationSecretFromString(
  380. support.Config.InproxyServerObfuscationRootSecret)
  381. if err != nil {
  382. return nil, errors.Trace(err)
  383. }
  384. // The expected broker public keys are set in reloadTactics directly
  385. // below, so none are set here.
  386. inproxyBrokerSessions, err = inproxy.NewServerBrokerSessions(
  387. inproxyPrivateKey,
  388. inproxyObfuscationSecret,
  389. nil,
  390. getInproxyBrokerAPIParameterValidator(support.Config),
  391. getInproxyBrokerAPIParameterLogFieldFormatter(),
  392. "inproxy_proxy_") // Prefix for proxy metrics log fields in server_tunnel
  393. if err != nil {
  394. return nil, errors.Trace(err)
  395. }
  396. }
  397. // Limitation: rate limiting and resource limiting are handled by external
  398. // components, and sshServer enforces only a sanity check limit on the
  399. // number of entries in sshServer.clients; and no limit on the number of
  400. // entries in sshServer.geoIPSessionCache or sshServer.oslSessionCache.
  401. //
  402. // To avoid resource exhaustion, this implementation relies on:
  403. //
  404. // - Per-peer IP address and/or overall network connection rate limiting,
  405. // provided by iptables as configured by Psiphon automation
  406. // (https://github.com/Psiphon-Inc/psiphon-automation/blob/
  407. // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L1451).
  408. //
  409. // - Host CPU/memory/network monitoring and signalling, installed Psiphon
  410. // automation
  411. // (https://github.com/Psiphon-Inc/psiphon-automation/blob/
  412. // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L935).
  413. // When resource usage meets certain thresholds, the monitoring signals
  414. // this process with SIGTSTP or SIGCONT, and handlers call
  415. // sshServer.setEstablishTunnels to stop or resume accepting new clients.
  416. sshServer := &sshServer{
  417. support: support,
  418. establishTunnels: 1,
  419. concurrentSSHHandshakes: concurrentSSHHandshakes,
  420. shutdownBroadcast: shutdownBroadcast,
  421. sshHostKey: signer,
  422. acceptedClientCounts: make(map[string]map[string]int64),
  423. clients: make(map[string]*sshClient),
  424. geoIPSessionCache: geoIPSessionCache,
  425. oslSessionCache: oslSessionCache,
  426. authorizationSessionIDs: make(map[string]string),
  427. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  428. inproxyBrokerSessions: inproxyBrokerSessions,
  429. }
  430. // Initialize components that use server-side tactics and which reload on
  431. // tactics change events.
  432. err = sshServer.reloadTactics()
  433. if err != nil {
  434. return nil, errors.Trace(err)
  435. }
  436. return sshServer, nil
  437. }
  438. func (sshServer *sshServer) setEstablishTunnels(establish bool) {
  439. // Do nothing when the setting is already correct. This avoids
  440. // spurious log messages when setEstablishTunnels is called
  441. // periodically with the same setting.
  442. if establish == (atomic.LoadInt32(&sshServer.establishTunnels) == 1) {
  443. return
  444. }
  445. establishFlag := int32(1)
  446. if !establish {
  447. establishFlag = 0
  448. }
  449. atomic.StoreInt32(&sshServer.establishTunnels, establishFlag)
  450. log.WithTraceFields(
  451. LogFields{"establish": establish}).Info("establishing tunnels")
  452. }
  453. func (sshServer *sshServer) checkEstablishTunnels() bool {
  454. establishTunnels := atomic.LoadInt32(&sshServer.establishTunnels) == 1
  455. if !establishTunnels {
  456. atomic.AddInt64(&sshServer.establishLimitedCount, 1)
  457. }
  458. return establishTunnels
  459. }
  460. func (sshServer *sshServer) checkLoadLimiting() bool {
  461. // The server is in a general load limiting state when
  462. // sshServer.establishTunnels is false (0). This check is intended to be
  463. // used by non-tunnel components and no metrics are updated by this call.
  464. return atomic.LoadInt32(&sshServer.establishTunnels) == 0
  465. }
  466. func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
  467. return atomic.LoadInt32(&sshServer.establishTunnels) == 1,
  468. atomic.SwapInt64(&sshServer.establishLimitedCount, 0)
  469. }
  470. // additionalTransportData is additional data gathered at transport level,
  471. // such as in MeekServer at the HTTP layer, and relayed to the
  472. // sshServer/sshClient.
  473. type additionalTransportData struct {
  474. overrideTunnelProtocol string
  475. steeringIP string
  476. }
  477. // reportListenerError logs a listener error and sends it the
  478. // TunnelServer.Run. Callers should wrap the input err in an immediate
  479. // errors.Trace.
  480. func reportListenerError(listenerError chan<- error, err error) {
  481. // Record "caller" just in case the caller fails to wrap err in an
  482. // errors.Trace.
  483. log.WithTraceFields(
  484. LogFields{
  485. "error": err,
  486. "caller": stacktrace.GetParentFunctionName()}).Error("listener error")
  487. select {
  488. case listenerError <- err:
  489. default:
  490. }
  491. }
  492. // runListener is intended to run an a goroutine; it blocks
  493. // running a particular listener. If an unrecoverable error
  494. // occurs, it will send the error to the listenerError channel.
  495. func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {
  496. handleClient := func(conn net.Conn, transportData *additionalTransportData) {
  497. // Note: establish tunnel limiter cannot simply stop TCP
  498. // listeners in all cases (e.g., meek) since SSH tunnels can
  499. // span multiple TCP connections.
  500. if !sshServer.checkEstablishTunnels() {
  501. if IsLogLevelDebug() {
  502. log.WithTrace().Debug("not establishing tunnels")
  503. }
  504. conn.Close()
  505. return
  506. }
  507. // sshListener.tunnelProtocol indictes the tunnel protocol run by the
  508. // listener. For direct protocols, this is also the client tunnel protocol.
  509. // For fronted protocols, the client may use a different protocol to connect
  510. // to the front and then only the front-to-Psiphon server will use the
  511. // listener protocol.
  512. //
  513. // A fronted meek client, for example, reports its first hop protocol in
  514. // protocol.MeekCookieData.ClientTunnelProtocol. Most metrics record this
  515. // value as relay_protocol, since the first hop is the one subject to
  516. // adversarial conditions. In some cases, such as irregular tunnels, there
  517. // is no ClientTunnelProtocol value available and the listener tunnel
  518. // protocol will be logged.
  519. //
  520. // Similarly, listenerPort indicates the listening port, which is the dialed
  521. // port number for direct protocols; while, for fronted protocols, the
  522. // client may dial a different port for its first hop.
  523. // Process each client connection concurrently.
  524. go sshServer.handleClient(sshListener, conn, transportData)
  525. }
  526. // Note: when exiting due to a unrecoverable error, be sure
  527. // to try to send the error to listenerError so that the outer
  528. // TunnelServer.Run will properly shut down instead of remaining
  529. // running.
  530. if protocol.TunnelProtocolUsesMeekHTTP(sshListener.tunnelProtocol) ||
  531. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol) {
  532. if sshServer.tunnelProtocolUsesTLSDemux(sshListener.tunnelProtocol) {
  533. sshServer.runMeekTLSOSSHDemuxListener(sshListener, listenerError, handleClient)
  534. } else {
  535. meekServer, err := NewMeekServer(
  536. sshServer.support,
  537. sshListener.Listener,
  538. sshListener.tunnelProtocol,
  539. sshListener.port,
  540. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol),
  541. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  542. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  543. true,
  544. handleClient,
  545. sshServer.shutdownBroadcast)
  546. if err == nil {
  547. sshServer.registerMeekServer(meekServer)
  548. err = meekServer.Run()
  549. }
  550. if err != nil {
  551. reportListenerError(listenerError, errors.Trace(err))
  552. return
  553. }
  554. }
  555. } else {
  556. runListener(sshListener.Listener, sshServer.shutdownBroadcast, listenerError, "", handleClient)
  557. }
  558. }
  559. // runMeekTLSOSSHDemuxListener blocks running a listener which demuxes meek and
  560. // TLS-OSSH connections received on the same port.
  561. func (sshServer *sshServer) runMeekTLSOSSHDemuxListener(
  562. sshListener *sshListener,
  563. listenerError chan<- error,
  564. handleClient func(conn net.Conn, transportData *additionalTransportData)) {
  565. meekClassifier := protocolClassifier{
  566. minBytesToMatch: 4,
  567. maxBytesToMatch: 4,
  568. match: func(b []byte) bool {
  569. // NOTE: HTTP transforms are only applied to plain HTTP
  570. // meek so they are not a concern here.
  571. return bytes.Contains(b, []byte("POST"))
  572. },
  573. }
  574. tlsClassifier := protocolClassifier{
  575. // NOTE: technically +1 not needed if detectors are evaluated
  576. // in order by index in classifier array, which they are.
  577. minBytesToMatch: meekClassifier.maxBytesToMatch + 1,
  578. maxBytesToMatch: meekClassifier.maxBytesToMatch + 1,
  579. match: func(b []byte) bool {
  580. return len(b) > 4 // if not classified as meek, then tls
  581. },
  582. }
  583. listener, err := ListenTLSTunnel(
  584. sshServer.support,
  585. sshListener.Listener,
  586. sshListener.tunnelProtocol,
  587. sshListener.port)
  588. if err != nil {
  589. reportListenerError(listenerError, errors.Trace(err))
  590. return
  591. }
  592. mux, listeners := newProtocolDemux(
  593. context.Background(),
  594. listener,
  595. []protocolClassifier{meekClassifier, tlsClassifier},
  596. sshServer.support.Config.sshHandshakeTimeout)
  597. var wg sync.WaitGroup
  598. wg.Add(1)
  599. go func() {
  600. // handle shutdown gracefully
  601. defer wg.Done()
  602. <-sshServer.shutdownBroadcast
  603. err := mux.Close()
  604. if err != nil {
  605. log.WithTraceFields(LogFields{"error": err}).Error("close failed")
  606. }
  607. }()
  608. wg.Add(1)
  609. go func() {
  610. // start demultiplexing TLS-OSSH and meek HTTPS connections
  611. defer wg.Done()
  612. err := mux.run()
  613. if err != nil {
  614. reportListenerError(listenerError, errors.Trace(err))
  615. return
  616. }
  617. }()
  618. wg.Add(1)
  619. go func() {
  620. // start handling TLS-OSSH connections as they are demultiplexed
  621. defer wg.Done()
  622. // Override the listener tunnel protocol to report TLS-OSSH instead.
  623. runListener(
  624. listeners[1],
  625. sshServer.shutdownBroadcast,
  626. listenerError,
  627. protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH, handleClient)
  628. }()
  629. wg.Add(1)
  630. go func() {
  631. // start handling meek HTTPS connections as they are
  632. // demultiplexed
  633. defer wg.Done()
  634. meekServer, err := NewMeekServer(
  635. sshServer.support,
  636. listeners[0],
  637. sshListener.tunnelProtocol,
  638. sshListener.port,
  639. false,
  640. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  641. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  642. true,
  643. handleClient,
  644. sshServer.shutdownBroadcast)
  645. if err == nil {
  646. sshServer.registerMeekServer(meekServer)
  647. err = meekServer.Run()
  648. }
  649. if err != nil {
  650. reportListenerError(listenerError, errors.Trace(err))
  651. return
  652. }
  653. }()
  654. wg.Wait()
  655. }
  656. func runListener(
  657. listener net.Listener,
  658. shutdownBroadcast <-chan struct{},
  659. listenerError chan<- error,
  660. overrideTunnelProtocol string,
  661. handleClient func(conn net.Conn, transportData *additionalTransportData)) {
  662. for {
  663. conn, err := listener.Accept()
  664. select {
  665. case <-shutdownBroadcast:
  666. if err == nil {
  667. conn.Close()
  668. }
  669. return
  670. default:
  671. }
  672. if err != nil {
  673. if e, ok := err.(net.Error); ok && e.Temporary() {
  674. log.WithTraceFields(LogFields{"error": err}).Error("accept failed")
  675. // Temporary error, keep running
  676. continue
  677. } else if std_errors.Is(err, errRestrictedProvider) {
  678. log.WithTraceFields(LogFields{"error": err}).Error("accept rejected client")
  679. // Restricted provider, keep running
  680. continue
  681. }
  682. reportListenerError(listenerError, errors.Trace(err))
  683. return
  684. }
  685. var transportData *additionalTransportData
  686. if overrideTunnelProtocol != "" {
  687. transportData = &additionalTransportData{
  688. overrideTunnelProtocol: overrideTunnelProtocol,
  689. }
  690. }
  691. handleClient(conn, transportData)
  692. }
  693. }
  694. // registerMeekServer registers a MeekServer instance to receive tactics
  695. // reload signals.
  696. func (sshServer *sshServer) registerMeekServer(meekServer *MeekServer) {
  697. sshServer.meekServersMutex.Lock()
  698. defer sshServer.meekServersMutex.Unlock()
  699. sshServer.meekServers = append(sshServer.meekServers, meekServer)
  700. }
  701. // reloadMeekServerTactics signals each registered MeekServer instance that
  702. // tactics have reloaded and may have changed.
  703. func (sshServer *sshServer) reloadMeekServerTactics() error {
  704. sshServer.meekServersMutex.Lock()
  705. defer sshServer.meekServersMutex.Unlock()
  706. for _, meekServer := range sshServer.meekServers {
  707. err := meekServer.ReloadTactics()
  708. if err != nil {
  709. return errors.Trace(err)
  710. }
  711. }
  712. return nil
  713. }
  714. // An accepted client has completed a direct TCP or meek connection and has a
  715. // net.Conn. Registration is for tracking the number of connections.
  716. func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {
  717. sshServer.clientsMutex.Lock()
  718. defer sshServer.clientsMutex.Unlock()
  719. if sshServer.acceptedClientCounts[tunnelProtocol] == nil {
  720. sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)
  721. }
  722. sshServer.acceptedClientCounts[tunnelProtocol][region] += 1
  723. }
  724. func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {
  725. sshServer.clientsMutex.Lock()
  726. defer sshServer.clientsMutex.Unlock()
  727. sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1
  728. }
  729. // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
  730. // for tracking the number of fully established clients and for maintaining a list of running
  731. // clients (for stopping at shutdown time).
  732. func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
  733. sshServer.clientsMutex.Lock()
  734. if sshServer.stoppingClients {
  735. sshServer.clientsMutex.Unlock()
  736. return false
  737. }
  738. // In the case of a duplicate client sessionID, the previous client is closed.
  739. // - Well-behaved clients generate a random sessionID that should be unique (won't
  740. // accidentally conflict) and hard to guess (can't be targeted by a malicious
  741. // client).
  742. // - Clients reuse the same sessionID when a tunnel is unexpectedly disconnected
  743. // and reestablished. In this case, when the same server is selected, this logic
  744. // will be hit; closing the old, dangling client is desirable.
  745. // - Multi-tunnel clients should not normally use one server for multiple tunnels.
  746. existingClient := sshServer.clients[client.sessionID]
  747. sshServer.clientsMutex.Unlock()
  748. if existingClient != nil {
  749. // This case is expected to be common, and so logged at the lowest severity
  750. // level.
  751. log.WithTrace().Debug(
  752. "stopping existing client with duplicate session ID")
  753. existingClient.stop()
  754. // Block until the existingClient is fully terminated. This is necessary to
  755. // avoid this scenario:
  756. // - existingClient is invoking handshakeAPIRequestHandler
  757. // - sshServer.clients[client.sessionID] is updated to point to new client
  758. // - existingClient's handshakeAPIRequestHandler invokes
  759. // setHandshakeState but sets the handshake parameters for new
  760. // client
  761. // - as a result, the new client handshake will fail (only a single handshake
  762. // is permitted) and the new client server_tunnel log will contain an
  763. // invalid mix of existing/new client fields
  764. //
  765. // Once existingClient.awaitStopped returns, all existingClient port
  766. // forwards and request handlers have terminated, so no API handler, either
  767. // tunneled web API or SSH API, will remain and it is safe to point
  768. // sshServer.clients[client.sessionID] to the new client.
  769. // Limitation: this scenario remains possible with _untunneled_ web API
  770. // requests.
  771. //
  772. // Blocking also ensures existingClient.releaseAuthorizations is invoked before
  773. // the new client attempts to submit the same authorizations.
  774. //
  775. // Perform blocking awaitStopped operation outside the
  776. // sshServer.clientsMutex mutex to avoid blocking all other clients for the
  777. // duration. We still expect and require that the stop process completes
  778. // rapidly, e.g., does not block on network I/O, allowing the new client
  779. // connection to proceed without delay.
  780. //
  781. // In addition, operations triggered by stop, and which must complete before
  782. // awaitStopped returns, will attempt to lock sshServer.clientsMutex,
  783. // including unregisterEstablishedClient.
  784. existingClient.awaitStopped()
  785. }
  786. sshServer.clientsMutex.Lock()
  787. defer sshServer.clientsMutex.Unlock()
  788. // existingClient's stop will have removed it from sshServer.clients via
  789. // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should
  790. // be nil -- unless yet another client instance using the same sessionID has
  791. // connected in the meantime while awaiting existingClient stop. In this
  792. // case, it's not clear which is the most recent connection from the client,
  793. // so instead of this connection terminating more peers, it aborts.
  794. if sshServer.clients[client.sessionID] != nil {
  795. // As this is expected to be rare case, it's logged at a higher severity
  796. // level.
  797. log.WithTrace().Warning(
  798. "aborting new client with duplicate session ID")
  799. return false
  800. }
  801. // SSH_MAX_CLIENT_COUNT is a simple sanity check and failsafe. Load
  802. // limiting tuned to each server's host resources is provided by external
  803. // components. See comment in newSSHServer for more details.
  804. if len(sshServer.clients) >= SSH_MAX_CLIENT_COUNT {
  805. log.WithTrace().Warning("SSH_MAX_CLIENT_COUNT exceeded")
  806. return false
  807. }
  808. sshServer.clients[client.sessionID] = client
  809. return true
  810. }
  811. func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
  812. sshServer.clientsMutex.Lock()
  813. registeredClient := sshServer.clients[client.sessionID]
  814. // registeredClient will differ from client when client is the existingClient
  815. // terminated in registerEstablishedClient. In that case, registeredClient
  816. // remains connected, and the sshServer.clients entry should be retained.
  817. if registeredClient == client {
  818. delete(sshServer.clients, client.sessionID)
  819. }
  820. sshServer.clientsMutex.Unlock()
  821. client.stop()
  822. }
  823. type UpstreamStats map[string]interface{}
  824. type ProtocolStats map[string]map[string]interface{}
  825. type RegionStats map[string]map[string]map[string]interface{}
  826. func (sshServer *sshServer) getLoadStats() (
  827. UpstreamStats, ProtocolStats, RegionStats) {
  828. sshServer.clientsMutex.Lock()
  829. defer sshServer.clientsMutex.Unlock()
  830. // Explicitly populate with zeros to ensure 0 counts in log messages.
  831. zeroClientStats := func() map[string]interface{} {
  832. stats := make(map[string]interface{})
  833. stats["accepted_clients"] = int64(0)
  834. stats["established_clients"] = int64(0)
  835. return stats
  836. }
  837. // Due to hot reload and changes to the underlying system configuration, the
  838. // set of resolver IPs may change between getLoadStats calls, so this
  839. // enumeration for zeroing is a best effort.
  840. resolverIPs := sshServer.support.DNSResolver.GetAll()
  841. // Fields which are primarily concerned with upstream/egress performance.
  842. zeroUpstreamStats := func() map[string]interface{} {
  843. stats := make(map[string]interface{})
  844. stats["dialing_tcp_port_forwards"] = int64(0)
  845. stats["tcp_port_forwards"] = int64(0)
  846. stats["total_tcp_port_forwards"] = int64(0)
  847. stats["udp_port_forwards"] = int64(0)
  848. stats["total_udp_port_forwards"] = int64(0)
  849. stats["tcp_port_forward_dialed_count"] = int64(0)
  850. stats["tcp_port_forward_dialed_duration"] = int64(0)
  851. stats["tcp_port_forward_failed_count"] = int64(0)
  852. stats["tcp_port_forward_failed_duration"] = int64(0)
  853. stats["tcp_port_forward_rejected_dialing_limit_count"] = int64(0)
  854. stats["tcp_port_forward_rejected_disallowed_count"] = int64(0)
  855. stats["udp_port_forward_rejected_disallowed_count"] = int64(0)
  856. stats["tcp_ipv4_port_forward_dialed_count"] = int64(0)
  857. stats["tcp_ipv4_port_forward_dialed_duration"] = int64(0)
  858. stats["tcp_ipv4_port_forward_failed_count"] = int64(0)
  859. stats["tcp_ipv4_port_forward_failed_duration"] = int64(0)
  860. stats["tcp_ipv6_port_forward_dialed_count"] = int64(0)
  861. stats["tcp_ipv6_port_forward_dialed_duration"] = int64(0)
  862. stats["tcp_ipv6_port_forward_failed_count"] = int64(0)
  863. stats["tcp_ipv6_port_forward_failed_duration"] = int64(0)
  864. zeroDNSStats := func() map[string]int64 {
  865. m := map[string]int64{"ALL": 0}
  866. for _, resolverIP := range resolverIPs {
  867. m[resolverIP.String()] = 0
  868. }
  869. return m
  870. }
  871. stats["dns_count"] = zeroDNSStats()
  872. stats["dns_duration"] = zeroDNSStats()
  873. stats["dns_failed_count"] = zeroDNSStats()
  874. stats["dns_failed_duration"] = zeroDNSStats()
  875. return stats
  876. }
  877. zeroProtocolStats := func() map[string]map[string]interface{} {
  878. stats := make(map[string]map[string]interface{})
  879. stats["ALL"] = zeroClientStats()
  880. for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
  881. stats[tunnelProtocol] = zeroClientStats()
  882. if sshServer.tunnelProtocolUsesTLSDemux(tunnelProtocol) {
  883. stats[protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH] = zeroClientStats()
  884. }
  885. }
  886. return stats
  887. }
  888. addInt64 := func(stats map[string]interface{}, name string, value int64) {
  889. stats[name] = stats[name].(int64) + value
  890. }
  891. upstreamStats := zeroUpstreamStats()
  892. // [<protocol or ALL>][<stat name>] -> count
  893. protocolStats := zeroProtocolStats()
  894. // [<region][<protocol or ALL>][<stat name>] -> count
  895. regionStats := make(RegionStats)
  896. // Note: as currently tracked/counted, each established client is also an accepted client
  897. // Accepted client counts use peer GeoIP data, which in the case of
  898. // in-proxy tunnel protocols is the proxy, not the client. The original
  899. // client IP is only obtained after the tunnel handshake has completed.
  900. for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {
  901. for region, acceptedClientCount := range regionAcceptedClientCounts {
  902. if acceptedClientCount > 0 {
  903. if regionStats[region] == nil {
  904. regionStats[region] = zeroProtocolStats()
  905. }
  906. addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)
  907. addInt64(protocolStats[tunnelProtocol], "accepted_clients", acceptedClientCount)
  908. addInt64(regionStats[region]["ALL"], "accepted_clients", acceptedClientCount)
  909. addInt64(regionStats[region][tunnelProtocol], "accepted_clients", acceptedClientCount)
  910. }
  911. }
  912. }
  913. for _, client := range sshServer.clients {
  914. client.Lock()
  915. // Limitation: registerEstablishedClient is called before the
  916. // handshake API completes; as a result, in the case of in-proxy
  917. // tunnel protocol, clientGeoIPData may not yet be initialized and
  918. // will count as None.
  919. tunnelProtocol := client.tunnelProtocol
  920. region := client.clientGeoIPData.Country
  921. if regionStats[region] == nil {
  922. regionStats[region] = zeroProtocolStats()
  923. }
  924. for _, stats := range []map[string]interface{}{
  925. protocolStats["ALL"],
  926. protocolStats[tunnelProtocol],
  927. regionStats[region]["ALL"],
  928. regionStats[region][tunnelProtocol]} {
  929. addInt64(stats, "established_clients", 1)
  930. }
  931. // Note:
  932. // - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
  933. // - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
  934. addInt64(upstreamStats, "dialing_tcp_port_forwards",
  935. client.tcpTrafficState.concurrentDialingPortForwardCount)
  936. addInt64(upstreamStats, "tcp_port_forwards",
  937. client.tcpTrafficState.concurrentPortForwardCount)
  938. addInt64(upstreamStats, "total_tcp_port_forwards",
  939. client.tcpTrafficState.totalPortForwardCount)
  940. addInt64(upstreamStats, "udp_port_forwards",
  941. client.udpTrafficState.concurrentPortForwardCount)
  942. addInt64(upstreamStats, "total_udp_port_forwards",
  943. client.udpTrafficState.totalPortForwardCount)
  944. addInt64(upstreamStats, "tcp_port_forward_dialed_count",
  945. client.qualityMetrics.TCPPortForwardDialedCount)
  946. addInt64(upstreamStats, "tcp_port_forward_dialed_duration",
  947. int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))
  948. addInt64(upstreamStats, "tcp_port_forward_failed_count",
  949. client.qualityMetrics.TCPPortForwardFailedCount)
  950. addInt64(upstreamStats, "tcp_port_forward_failed_duration",
  951. int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))
  952. addInt64(upstreamStats, "tcp_port_forward_rejected_dialing_limit_count",
  953. client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)
  954. addInt64(upstreamStats, "tcp_port_forward_rejected_disallowed_count",
  955. client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)
  956. addInt64(upstreamStats, "udp_port_forward_rejected_disallowed_count",
  957. client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)
  958. addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_count",
  959. client.qualityMetrics.TCPIPv4PortForwardDialedCount)
  960. addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_duration",
  961. int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))
  962. addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_count",
  963. client.qualityMetrics.TCPIPv4PortForwardFailedCount)
  964. addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_duration",
  965. int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))
  966. addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_count",
  967. client.qualityMetrics.TCPIPv6PortForwardDialedCount)
  968. addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_duration",
  969. int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))
  970. addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_count",
  971. client.qualityMetrics.TCPIPv6PortForwardFailedCount)
  972. addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_duration",
  973. int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))
  974. // DNS metrics limitations:
  975. // - port forwards (sshClient.handleTCPChannel) don't know or log the resolver IP.
  976. // - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure,
  977. // and there may be some delay before these code paths report DNS metrics.
  978. // Every client.qualityMetrics DNS map has an "ALL" entry.
  979. totalDNSCount := int64(0)
  980. totalDNSFailedCount := int64(0)
  981. for key, value := range client.qualityMetrics.DNSCount {
  982. upstreamStats["dns_count"].(map[string]int64)[key] += value
  983. totalDNSCount += value
  984. }
  985. for key, value := range client.qualityMetrics.DNSDuration {
  986. upstreamStats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
  987. }
  988. for key, value := range client.qualityMetrics.DNSFailedCount {
  989. upstreamStats["dns_failed_count"].(map[string]int64)[key] += value
  990. totalDNSFailedCount += value
  991. }
  992. for key, value := range client.qualityMetrics.DNSFailedDuration {
  993. upstreamStats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
  994. }
  995. // Update client peak failure rate metrics, to be recorded in
  996. // server_tunnel.
  997. //
  998. // Limitations:
  999. //
  1000. // - This is a simple data sampling that doesn't require additional
  1001. // timers or tracking logic. Since the rates are calculated on
  1002. // getLoadStats events and using accumulated counts, these peaks
  1003. // only represent the highest failure rate within a
  1004. // Config.LoadMonitorPeriodSeconds non-sliding window. There is no
  1005. // sample recorded for short tunnels with no overlapping
  1006. // getLoadStats event.
  1007. //
  1008. // - There is no minimum sample window, as a getLoadStats event may
  1009. // occur immediately after a client first connects. This may be
  1010. // compensated for by adjusting
  1011. // Config.PeakUpstreamFailureRateMinimumSampleSize, so as to only
  1012. // consider failure rates with a larger number of samples.
  1013. //
  1014. // - Non-UDP "failures" are not currently tracked.
  1015. minimumSampleSize := int64(sshServer.support.Config.peakUpstreamFailureRateMinimumSampleSize)
  1016. sampleSize := client.qualityMetrics.TCPPortForwardDialedCount +
  1017. client.qualityMetrics.TCPPortForwardFailedCount
  1018. if sampleSize >= minimumSampleSize {
  1019. TCPPortForwardFailureRate := float64(client.qualityMetrics.TCPPortForwardFailedCount) /
  1020. float64(sampleSize)
  1021. if client.peakMetrics.TCPPortForwardFailureRate == nil {
  1022. client.peakMetrics.TCPPortForwardFailureRate = new(float64)
  1023. *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
  1024. client.peakMetrics.TCPPortForwardFailureRateSampleSize = new(int64)
  1025. *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize
  1026. } else if *client.peakMetrics.TCPPortForwardFailureRate < TCPPortForwardFailureRate {
  1027. *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
  1028. *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize
  1029. }
  1030. }
  1031. sampleSize = totalDNSCount + totalDNSFailedCount
  1032. if sampleSize >= minimumSampleSize {
  1033. DNSFailureRate := float64(totalDNSFailedCount) / float64(sampleSize)
  1034. if client.peakMetrics.DNSFailureRate == nil {
  1035. client.peakMetrics.DNSFailureRate = new(float64)
  1036. *client.peakMetrics.DNSFailureRate = DNSFailureRate
  1037. client.peakMetrics.DNSFailureRateSampleSize = new(int64)
  1038. *client.peakMetrics.DNSFailureRateSampleSize = sampleSize
  1039. } else if *client.peakMetrics.DNSFailureRate < DNSFailureRate {
  1040. *client.peakMetrics.DNSFailureRate = DNSFailureRate
  1041. *client.peakMetrics.DNSFailureRateSampleSize = sampleSize
  1042. }
  1043. }
  1044. // Reset quality metrics counters
  1045. client.qualityMetrics.reset()
  1046. client.Unlock()
  1047. }
  1048. for _, client := range sshServer.clients {
  1049. client.Lock()
  1050. // Update client peak proximate (same region) concurrently connected
  1051. // (other clients) client metrics, to be recorded in server_tunnel.
  1052. // This operation requires a second loop over sshServer.clients since
  1053. // established_clients is calculated in the first loop.
  1054. //
  1055. // Limitations:
  1056. //
  1057. // - This is an approximation, not a true peak, as it only samples
  1058. // data every Config.LoadMonitorPeriodSeconds period. There is no
  1059. // sample recorded for short tunnels with no overlapping
  1060. // getLoadStats event.
  1061. //
  1062. // - The "-1" calculation counts all but the current client as other
  1063. // clients; it can be the case that the same client has a dangling
  1064. // accepted connection that has yet to time-out server side. Due to
  1065. // NAT, we can't determine if the client is the same based on
  1066. // network address. For established clients,
  1067. // registerEstablishedClient ensures that any previous connection
  1068. // is first terminated, although this is only for the same
  1069. // session_id. Concurrent proximate clients may be considered an
  1070. // exact number of other _network connections_, even from the same
  1071. // client.
  1072. //
  1073. // Futhermore, since client.Locks aren't held between the previous
  1074. // loop and this one, it's also possible that the client's
  1075. // clientGeoIPData was None in the previous loop and is now not
  1076. // None. In this case, the regionStats may not be populated at all
  1077. // for the client's current region; if so, the client is skipped.
  1078. // This scenario can also result in a proximate undercount by one,
  1079. // when the regionStats _is_ populated: this client was counted
  1080. // under None, not the current client.peerGeoIPData.Country, so
  1081. // the -1 subtracts some _other_ client from the populated regionStats.
  1082. //
  1083. // - For in-proxy protocols, the accepted proximate metric uses the
  1084. // peer GeoIP, which represents the proxy, not the client.
  1085. stats := regionStats[client.peerGeoIPData.Country]["ALL"]
  1086. n := stats["accepted_clients"].(int64) - 1
  1087. if n >= 0 {
  1088. if client.peakMetrics.concurrentProximateAcceptedClients == nil {
  1089. client.peakMetrics.concurrentProximateAcceptedClients = new(int64)
  1090. *client.peakMetrics.concurrentProximateAcceptedClients = n
  1091. } else if *client.peakMetrics.concurrentProximateAcceptedClients < n {
  1092. *client.peakMetrics.concurrentProximateAcceptedClients = n
  1093. }
  1094. }
  1095. // Handle the in-proxy None and None/not-None cases (and any other
  1096. // potential scenario where regionStats[client.clientGeoIPData.Country]
  1097. // may not be populated).
  1098. if client.clientGeoIPData.Country == GEOIP_UNKNOWN_VALUE ||
  1099. regionStats[client.clientGeoIPData.Country] == nil {
  1100. client.Unlock()
  1101. continue
  1102. }
  1103. stats = regionStats[client.clientGeoIPData.Country]["ALL"]
  1104. n = stats["established_clients"].(int64) - 1
  1105. if n >= 0 {
  1106. if client.peakMetrics.concurrentProximateEstablishedClients == nil {
  1107. client.peakMetrics.concurrentProximateEstablishedClients = new(int64)
  1108. *client.peakMetrics.concurrentProximateEstablishedClients = n
  1109. } else if *client.peakMetrics.concurrentProximateEstablishedClients < n {
  1110. *client.peakMetrics.concurrentProximateEstablishedClients = n
  1111. }
  1112. }
  1113. client.Unlock()
  1114. }
  1115. return upstreamStats, protocolStats, regionStats
  1116. }
  1117. func (sshServer *sshServer) getEstablishedClientCount() int {
  1118. sshServer.clientsMutex.Lock()
  1119. defer sshServer.clientsMutex.Unlock()
  1120. establishedClients := len(sshServer.clients)
  1121. return establishedClients
  1122. }
  1123. func (sshServer *sshServer) resetAllClientTrafficRules() {
  1124. sshServer.clientsMutex.Lock()
  1125. clients := make(map[string]*sshClient)
  1126. for sessionID, client := range sshServer.clients {
  1127. clients[sessionID] = client
  1128. }
  1129. sshServer.clientsMutex.Unlock()
  1130. for _, client := range clients {
  1131. client.setTrafficRules()
  1132. }
  1133. }
  1134. func (sshServer *sshServer) resetAllClientOSLConfigs() {
  1135. // Flush cached seed state. This has the same effect
  1136. // and same limitations as calling setOSLConfig for
  1137. // currently connected clients -- all progress is lost.
  1138. sshServer.oslSessionCacheMutex.Lock()
  1139. sshServer.oslSessionCache.Flush()
  1140. sshServer.oslSessionCacheMutex.Unlock()
  1141. sshServer.clientsMutex.Lock()
  1142. clients := make(map[string]*sshClient)
  1143. for sessionID, client := range sshServer.clients {
  1144. clients[sessionID] = client
  1145. }
  1146. sshServer.clientsMutex.Unlock()
  1147. for _, client := range clients {
  1148. client.setOSLConfig()
  1149. }
  1150. }
  1151. // reloadTactics signals/invokes components that use server-side tactics for
  1152. // one-time initialization to reload and use potentially changed parameters.
  1153. func (sshServer *sshServer) reloadTactics() error {
  1154. // The following in-proxy components use server-side tactics with a
  1155. // one-time initialization:
  1156. //
  1157. // - For servers running in-proxy tunnel protocols,
  1158. // sshServer.inproxyBrokerSessions are the broker/server sessions and
  1159. // the set of expected broker public keys is set from tactics.
  1160. // - For servers running a broker within MeekServer, broker operational
  1161. // configuration is set from tactics.
  1162. //
  1163. // For these components, one-time initialization is more efficient than
  1164. // constantly fetching tactics. Instead, these components reinitialize
  1165. // when tactics change.
  1166. // sshServer.inproxyBrokerSessions is not nil when the server is running
  1167. // in-proxy tunnel protocols.
  1168. if sshServer.inproxyBrokerSessions != nil {
  1169. // Get InproxyAllBrokerPublicKeys from tactics.
  1170. //
  1171. // Limitation: assumes no GeoIP targeting for InproxyAllBrokerPublicKeys.
  1172. p, err := sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
  1173. if err != nil {
  1174. return errors.Trace(err)
  1175. }
  1176. if !p.IsNil() {
  1177. brokerPublicKeys, err := inproxy.SessionPublicKeysFromStrings(
  1178. p.Strings(parameters.InproxyAllBrokerPublicKeys))
  1179. if err != nil {
  1180. return errors.Trace(err)
  1181. }
  1182. // SetKnownBrokerPublicKeys will terminate any existing sessions
  1183. // for broker public keys no longer in the known/expected list;
  1184. // but will retain any existing sessions for broker public keys
  1185. // that remain in the list.
  1186. err = sshServer.inproxyBrokerSessions.SetKnownBrokerPublicKeys(brokerPublicKeys)
  1187. if err != nil {
  1188. return errors.Trace(err)
  1189. }
  1190. }
  1191. }
  1192. err := sshServer.reloadMeekServerTactics()
  1193. if err != nil {
  1194. return errors.Trace(err)
  1195. }
  1196. return nil
  1197. }
  1198. func (sshServer *sshServer) revokeClientAuthorizations(sessionID string) {
  1199. sshServer.clientsMutex.Lock()
  1200. client := sshServer.clients[sessionID]
  1201. sshServer.clientsMutex.Unlock()
  1202. if client == nil {
  1203. return
  1204. }
  1205. // sshClient.handshakeState.authorizedAccessTypes is not cleared. Clearing
  1206. // authorizedAccessTypes may cause sshClient.logTunnel to fail to log
  1207. // access types. As the revocation may be due to legitimate use of an
  1208. // authorization in multiple sessions by a single client, useful metrics
  1209. // would be lost.
  1210. client.Lock()
  1211. client.handshakeState.authorizationsRevoked = true
  1212. client.Unlock()
  1213. // Select and apply new traffic rules, as filtered by the client's new
  1214. // authorization state.
  1215. client.setTrafficRules()
  1216. }
  1217. func (sshServer *sshServer) stopClients() {
  1218. sshServer.clientsMutex.Lock()
  1219. sshServer.stoppingClients = true
  1220. clients := sshServer.clients
  1221. sshServer.clients = make(map[string]*sshClient)
  1222. sshServer.clientsMutex.Unlock()
  1223. for _, client := range clients {
  1224. client.stop()
  1225. }
  1226. }
  1227. func (sshServer *sshServer) handleClient(
  1228. sshListener *sshListener,
  1229. conn net.Conn,
  1230. transportData *additionalTransportData) {
  1231. // overrideTunnelProtocol sets the tunnel protocol to a value other than
  1232. // the listener tunnel protocol. This is used in fronted meek
  1233. // configuration, where a single HTTPS listener also handles fronted HTTP
  1234. // and QUIC traffic; and in the protocol demux case.
  1235. tunnelProtocol := sshListener.tunnelProtocol
  1236. if transportData != nil && transportData.overrideTunnelProtocol != "" {
  1237. tunnelProtocol = transportData.overrideTunnelProtocol
  1238. }
  1239. // Calling conn.RemoteAddr at this point, before any Read calls,
  1240. // satisfies the constraint documented in tapdance.Listen.
  1241. peerAddr := conn.RemoteAddr()
  1242. // Check if there were irregularities during the network connection
  1243. // establishment. When present, log and then behave as Obfuscated SSH does
  1244. // when the client fails to provide a valid seed message.
  1245. //
  1246. // One concrete irregular case is failure to send a PROXY protocol header for
  1247. // TAPDANCE-OSSH.
  1248. if indicator, ok := conn.(common.IrregularIndicator); ok {
  1249. tunnelErr := indicator.IrregularTunnelError()
  1250. if tunnelErr != nil {
  1251. logIrregularTunnel(
  1252. sshServer.support,
  1253. sshListener.tunnelProtocol,
  1254. sshListener.port,
  1255. common.IPAddressFromAddr(peerAddr),
  1256. errors.Trace(tunnelErr),
  1257. nil)
  1258. var afterFunc *time.Timer
  1259. if sshServer.support.Config.sshHandshakeTimeout > 0 {
  1260. afterFunc = time.AfterFunc(sshServer.support.Config.sshHandshakeTimeout, func() {
  1261. conn.Close()
  1262. })
  1263. }
  1264. _, _ = io.Copy(ioutil.Discard, conn)
  1265. conn.Close()
  1266. afterFunc.Stop()
  1267. return
  1268. }
  1269. }
  1270. // Get any packet manipulation values from GetAppliedSpecName as soon as
  1271. // possible due to the expiring TTL.
  1272. //
  1273. // In the case of in-proxy tunnel protocols, the remote address will be
  1274. // the proxy, not the client, and GeoIP targeted packet manipulation will
  1275. // apply to the 2nd hop.
  1276. serverPacketManipulation := ""
  1277. replayedServerPacketManipulation := false
  1278. if sshServer.support.Config.RunPacketManipulator &&
  1279. protocol.TunnelProtocolMayUseServerPacketManipulation(tunnelProtocol) {
  1280. // A meekConn has synthetic address values, including the original client
  1281. // address in cases where the client uses an upstream proxy to connect to
  1282. // Psiphon. For meekConn, and any other conn implementing
  1283. // UnderlyingTCPAddrSource, get the underlying TCP connection addresses.
  1284. //
  1285. // Limitation: a meek tunnel may consist of several TCP connections. The
  1286. // server_packet_manipulation metric will reflect the packet manipulation
  1287. // applied to the _first_ TCP connection only.
  1288. var localAddr, remoteAddr *net.TCPAddr
  1289. var ok bool
  1290. underlying, ok := conn.(common.UnderlyingTCPAddrSource)
  1291. if ok {
  1292. localAddr, remoteAddr, ok = underlying.GetUnderlyingTCPAddrs()
  1293. } else {
  1294. localAddr, ok = conn.LocalAddr().(*net.TCPAddr)
  1295. if ok {
  1296. remoteAddr, ok = conn.RemoteAddr().(*net.TCPAddr)
  1297. }
  1298. }
  1299. if ok {
  1300. specName, extraData, err := sshServer.support.PacketManipulator.
  1301. GetAppliedSpecName(localAddr, remoteAddr)
  1302. if err == nil {
  1303. serverPacketManipulation = specName
  1304. replayedServerPacketManipulation, _ = extraData.(bool)
  1305. }
  1306. }
  1307. }
  1308. // For in-proxy tunnel protocols, accepted client GeoIP reflects the proxy
  1309. // address, not the client.
  1310. peerGeoIPData := sshServer.support.GeoIPService.Lookup(
  1311. common.IPAddressFromAddr(peerAddr))
  1312. sshServer.registerAcceptedClient(tunnelProtocol, peerGeoIPData.Country)
  1313. defer sshServer.unregisterAcceptedClient(tunnelProtocol, peerGeoIPData.Country)
  1314. // When configured, enforce a cap on the number of concurrent SSH
  1315. // handshakes. This limits load spikes on busy servers when many clients
  1316. // attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,
  1317. // to acquire; waiting will avoid immediately creating more load on another
  1318. // server in the network when the client tries a new candidate. Disconnect the
  1319. // client when that wait time is exceeded.
  1320. //
  1321. // This mechanism limits memory allocations and CPU usage associated with the
  1322. // SSH handshake. At this point, new direct TCP connections or new meek
  1323. // connections, with associated resource usage, are already established. Those
  1324. // connections are expected to be rate or load limited using other mechanisms.
  1325. //
  1326. // TODO:
  1327. //
  1328. // - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in
  1329. // sshClient.run, since the client is also applying an SSH handshake timeout
  1330. // and won't exclude time spent waiting.
  1331. // - each call to sshServer.handleClient (in sshServer.runListener) is invoked
  1332. // in its own goroutine, but shutdown doesn't synchronously await these
  1333. // goroutnes. Once this is synchronizes, the following context.WithTimeout
  1334. // should use an sshServer parent context to ensure blocking acquires
  1335. // interrupt immediately upon shutdown.
  1336. var onSSHHandshakeFinished func()
  1337. if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {
  1338. ctx, cancelFunc := context.WithTimeout(
  1339. context.Background(),
  1340. sshServer.support.Config.sshBeginHandshakeTimeout)
  1341. defer cancelFunc()
  1342. err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)
  1343. if err != nil {
  1344. conn.Close()
  1345. // This is a debug log as the only possible error is context timeout.
  1346. log.WithTraceFields(LogFields{"error": err}).Debug(
  1347. "acquire SSH handshake semaphore failed")
  1348. return
  1349. }
  1350. onSSHHandshakeFinished = func() {
  1351. sshServer.concurrentSSHHandshakes.Release(1)
  1352. }
  1353. }
  1354. sshClient := newSshClient(
  1355. sshServer,
  1356. sshListener,
  1357. tunnelProtocol,
  1358. transportData,
  1359. serverPacketManipulation,
  1360. replayedServerPacketManipulation,
  1361. peerAddr,
  1362. peerGeoIPData)
  1363. // sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
  1364. // in any error case; or, as soon as the SSH handshake phase has successfully
  1365. // completed.
  1366. sshClient.run(conn, onSSHHandshakeFinished)
  1367. }
  1368. func (sshServer *sshServer) monitorPortForwardDialError(err error) {
  1369. // "err" is the error returned from a failed TCP or UDP port
  1370. // forward dial. Certain system error codes indicate low resource
  1371. // conditions: insufficient file descriptors, ephemeral ports, or
  1372. // memory. For these cases, log an alert.
  1373. // TODO: also temporarily suspend new clients
  1374. // Note: don't log net.OpError.Error() as the full error string
  1375. // may contain client destination addresses.
  1376. opErr, ok := err.(*net.OpError)
  1377. if ok {
  1378. if opErr.Err == syscall.EADDRNOTAVAIL ||
  1379. opErr.Err == syscall.EAGAIN ||
  1380. opErr.Err == syscall.ENOMEM ||
  1381. opErr.Err == syscall.EMFILE ||
  1382. opErr.Err == syscall.ENFILE {
  1383. log.WithTraceFields(
  1384. LogFields{"error": opErr.Err}).Error(
  1385. "port forward dial failed due to unavailable resource")
  1386. }
  1387. }
  1388. }
  1389. // tunnelProtocolUsesTLSDemux returns true if the server demultiplexes the given
  1390. // protocol and TLS-OSSH over the same port.
  1391. func (sshServer *sshServer) tunnelProtocolUsesTLSDemux(tunnelProtocol string) bool {
  1392. // Only use meek/TLS-OSSH demux if unfronted meek HTTPS with non-legacy
  1393. // passthrough, and not in-proxy.
  1394. if protocol.TunnelProtocolUsesMeekHTTPS(tunnelProtocol) &&
  1395. !protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) &&
  1396. !protocol.TunnelProtocolUsesInproxy(tunnelProtocol) {
  1397. _, passthroughEnabled := sshServer.support.Config.TunnelProtocolPassthroughAddresses[tunnelProtocol]
  1398. return passthroughEnabled && !sshServer.support.Config.LegacyPassthrough
  1399. }
  1400. return false
  1401. }
  1402. // setGeoIPSessionCache adds the sessionID/geoIPData pair to the session
  1403. // cache. This value will not expire; the caller must call
  1404. // markGeoIPSessionCacheToExpire to initiate expiry. Calling
  1405. // setGeoIPSessionCache for an existing sessionID will replace the previous
  1406. // value and reset any expiry.
  1407. func (sshServer *sshServer) setGeoIPSessionCache(sessionID string, geoIPData GeoIPData) {
  1408. sshServer.geoIPSessionCache.Set(sessionID, geoIPData, cache.NoExpiration)
  1409. }
  1410. // markGeoIPSessionCacheToExpire initiates expiry for an existing session
  1411. // cache entry, if the session ID is found in the cache. Concurrency note:
  1412. // setGeoIPSessionCache and markGeoIPSessionCacheToExpire should not be
  1413. // called concurrently for a single session ID.
  1414. func (sshServer *sshServer) markGeoIPSessionCacheToExpire(sessionID string) {
  1415. geoIPData, found := sshServer.geoIPSessionCache.Get(sessionID)
  1416. // Note: potential race condition between Get and Set. In practice,
  1417. // the tunnel server won't clobber a SetSessionCache value by calling
  1418. // MarkSessionCacheToExpire concurrently.
  1419. if found {
  1420. sshServer.geoIPSessionCache.Set(sessionID, geoIPData, cache.DefaultExpiration)
  1421. }
  1422. }
  1423. // getGeoIPSessionCache returns the cached GeoIPData for the specified session
  1424. // ID; a blank GeoIPData is returned if the session ID is not found in the
  1425. // cache.
  1426. func (sshServer *sshServer) getGeoIPSessionCache(sessionID string) GeoIPData {
  1427. geoIPData, found := sshServer.geoIPSessionCache.Get(sessionID)
  1428. if !found {
  1429. return NewGeoIPData()
  1430. }
  1431. return geoIPData.(GeoIPData)
  1432. }
  1433. // inGeoIPSessionCache returns whether the session ID is present in the
  1434. // session cache.
  1435. func (sshServer *sshServer) inGeoIPSessionCache(sessionID string) bool {
  1436. _, found := sshServer.geoIPSessionCache.Get(sessionID)
  1437. return found
  1438. }
// sshClient is the server-side state for one accepted client connection. It
// is created by newSshClient from sshServer.handleClient and then driven by
// sshClient.run.
//
// The embedded sync.Mutex is taken in several places. For example,
// sshServer.getLoadStats holds it while reading and resetting qualityMetrics
// and updating peakMetrics. sshServer.revokeClientAuthorizations holds it
// while setting handshakeState.authorizationsRevoked.
type sshClient struct {
	sync.Mutex
	sshServer *sshServer
	sshListener *sshListener
	// tunnelProtocol is the listener's tunnel protocol, unless overridden
	// by additionalTransportData.overrideTunnelProtocol in handleClient.
	tunnelProtocol string
	// isInproxyTunnelProtocol is set from
	// protocol.TunnelProtocolUsesInproxy(tunnelProtocol) in newSshClient.
	isInproxyTunnelProtocol bool
	// additionalTransportData may be nil.
	additionalTransportData *additionalTransportData
	sshConn ssh.Conn
	throttledConn *common.ThrottledConn
	// serverPacketManipulation is the applied packet manipulation spec name
	// reported by PacketManipulator.GetAppliedSpecName, or "" when none was
	// obtained. replayedServerPacketManipulation is that call's extra data
	// interpreted as a bool.
	serverPacketManipulation string
	replayedServerPacketManipulation bool
	// peerAddr is the accepted connection's RemoteAddr, and peerGeoIPData is
	// its GeoIP lookup. For in-proxy tunnel protocols both describe the
	// proxy, not the client.
	peerAddr net.Addr
	peerGeoIPData GeoIPData
	// clientGeoIPData is the client's GeoIP data. For in-proxy tunnel
	// protocols it is not set until the original client IP is relayed from
	// the broker during the handshake. In other cases it is peerGeoIPData
	// (this includes fronted meek). getLoadStats treats a
	// GEOIP_UNKNOWN_VALUE country here as not yet known.
	clientGeoIPData GeoIPData
	sessionID string
	// isFirstTunnelInSession defaults to true in newSshClient. This ensures
	// the pre-handshake traffic rules won't apply UnthrottleFirstTunnelOnly
	// and negate any unthrottled bytes during the initial protocol
	// negotiation.
	isFirstTunnelInSession bool
	supportsServerRequests bool
	handshakeState handshakeState
	udpgwChannelHandler *udpgwPortForwardMultiplexer
	totalUdpgwChannelCount int
	packetTunnelChannel ssh.Channel
	totalPacketTunnelChannelCount int
	trafficRules TrafficRules
	// tcpTrafficState and udpTrafficState each get their own
	// availablePortForwardCond in newSshClient.
	tcpTrafficState trafficState
	udpTrafficState trafficState
	// qualityMetrics accumulates upstream dial and DNS metrics. They are
	// reported and then reset by sshServer.getLoadStats.
	qualityMetrics *qualityMetrics
	tcpPortForwardLRU *common.LRUConns
	oslClientSeedState *osl.ClientSeedState
	// signalIssueSLOKs is created with a buffer of 1.
	signalIssueSLOKs chan struct{}
	// runCtx is the client's run context; stopRunning is its cancel func.
	// Both are created by context.WithCancel in newSshClient.
	runCtx context.Context
	stopRunning context.CancelFunc
	// NOTE(review): stopped is an unbuffered signalling channel created in
	// newSshClient; presumably closed when the client finishes stopping —
	// confirm against sshClient.stop/run.
	stopped chan struct{}
	tcpPortForwardDialingAvailableSignal context.CancelFunc
	releaseAuthorizations func()
	stopTimer *time.Timer
	preHandshakeRandomStreamMetrics randomStreamMetrics
	postHandshakeRandomStreamMetrics randomStreamMetrics
	// sendAlertRequests is buffered to ALERT_REQUEST_QUEUE_BUFFER_SIZE.
	// NOTE(review): sentAlertRequests presumably records alert requests
	// already sent, to avoid duplicates — confirm at its use sites.
	sendAlertRequests chan protocol.AlertRequest
	sentAlertRequests map[string]bool
	// peakMetrics are sampled in sshServer.getLoadStats and recorded in
	// server_tunnel.
	peakMetrics peakMetrics
	destinationBytesMetrics map[string]*protocolDestinationBytesMetrics
}
// trafficState holds byte and port forward counters for one transport of a
// client. sshClient keeps one instance for TCP (tcpTrafficState) and one for
// UDP (udpTrafficState). sshServer.getLoadStats reports
// udpTrafficState.concurrentPortForwardCount and totalPortForwardCount.
type trafficState struct {
	bytesUp int64
	bytesDown int64
	// NOTE(review): the peak* fields presumably record high-water marks of
	// the corresponding concurrent* counts — confirm at the update sites.
	concurrentDialingPortForwardCount int64
	peakConcurrentDialingPortForwardCount int64
	concurrentPortForwardCount int64
	peakConcurrentPortForwardCount int64
	totalPortForwardCount int64
	// availablePortForwardCond is created in newSshClient as
	// sync.NewCond(new(sync.Mutex)). NOTE(review): presumably signalled when
	// port forward capacity becomes available — confirm against its
	// Wait/Signal call sites.
	availablePortForwardCond *sync.Cond
}
// randomStreamMetrics holds counters for random streams. sshClient keeps
// separate instances for the pre-handshake and post-handshake phases.
//
// NOTE(review): field semantics are inferred from names only. Presumably
// count is the number of streams. Presumably upstreamBytes/downstreamBytes
// are requested sizes, while receivedUpstreamBytes/sentDownstreamBytes are
// the amounts actually transferred. Confirm at the update sites.
type randomStreamMetrics struct {
	count int64
	upstreamBytes int64
	receivedUpstreamBytes int64
	downstreamBytes int64
	sentDownstreamBytes int64
}
// peakMetrics holds per-client peak values sampled by sshServer.getLoadStats
// and recorded in server_tunnel. Each pointer is nil until the first sample
// is taken. After that, a field is only raised, never lowered.
type peakMetrics struct {
	// Peak count of other clients concurrently accepted (keyed by peer
	// GeoIP) or established (keyed by client GeoIP) in the same region as
	// this client.
	concurrentProximateAcceptedClients *int64
	concurrentProximateEstablishedClients *int64
	// Peak TCP port forward failure rate, failed / (dialed + failed). It is
	// sampled only when that sample size reaches
	// Config.peakUpstreamFailureRateMinimumSampleSize. The sample size of
	// the recorded peak is kept alongside it.
	TCPPortForwardFailureRate *float64
	TCPPortForwardFailureRateSampleSize *int64
	// Peak DNS failure rate, computed from the "ALL"-inclusive DNS counts.
	// It uses the same sampling rule as the TCP rate.
	DNSFailureRate *float64
	DNSFailureRateSampleSize *int64
}
  1506. // qualityMetrics records upstream TCP dial attempts and
  1507. // elapsed time. Elapsed time includes the full TCP handshake
  1508. // and, in aggregate, is a measure of the quality of the
  1509. // upstream link. These stats are recorded by each sshClient
  1510. // and then reported and reset in sshServer.getLoadStats().
  1511. type qualityMetrics struct {
  1512. TCPPortForwardDialedCount int64
  1513. TCPPortForwardDialedDuration time.Duration
  1514. TCPPortForwardFailedCount int64
  1515. TCPPortForwardFailedDuration time.Duration
  1516. TCPPortForwardRejectedDialingLimitCount int64
  1517. TCPPortForwardRejectedDisallowedCount int64
  1518. UDPPortForwardRejectedDisallowedCount int64
  1519. TCPIPv4PortForwardDialedCount int64
  1520. TCPIPv4PortForwardDialedDuration time.Duration
  1521. TCPIPv4PortForwardFailedCount int64
  1522. TCPIPv4PortForwardFailedDuration time.Duration
  1523. TCPIPv6PortForwardDialedCount int64
  1524. TCPIPv6PortForwardDialedDuration time.Duration
  1525. TCPIPv6PortForwardFailedCount int64
  1526. TCPIPv6PortForwardFailedDuration time.Duration
  1527. DNSCount map[string]int64
  1528. DNSDuration map[string]time.Duration
  1529. DNSFailedCount map[string]int64
  1530. DNSFailedDuration map[string]time.Duration
  1531. }
  1532. func newQualityMetrics() *qualityMetrics {
  1533. return &qualityMetrics{
  1534. DNSCount: make(map[string]int64),
  1535. DNSDuration: make(map[string]time.Duration),
  1536. DNSFailedCount: make(map[string]int64),
  1537. DNSFailedDuration: make(map[string]time.Duration),
  1538. }
  1539. }
  1540. func (q *qualityMetrics) reset() {
  1541. q.TCPPortForwardDialedCount = 0
  1542. q.TCPPortForwardDialedDuration = 0
  1543. q.TCPPortForwardFailedCount = 0
  1544. q.TCPPortForwardFailedDuration = 0
  1545. q.TCPPortForwardRejectedDialingLimitCount = 0
  1546. q.TCPPortForwardRejectedDisallowedCount = 0
  1547. q.UDPPortForwardRejectedDisallowedCount = 0
  1548. q.TCPIPv4PortForwardDialedCount = 0
  1549. q.TCPIPv4PortForwardDialedDuration = 0
  1550. q.TCPIPv4PortForwardFailedCount = 0
  1551. q.TCPIPv4PortForwardFailedDuration = 0
  1552. q.TCPIPv6PortForwardDialedCount = 0
  1553. q.TCPIPv6PortForwardDialedDuration = 0
  1554. q.TCPIPv6PortForwardFailedCount = 0
  1555. q.TCPIPv6PortForwardFailedDuration = 0
  1556. // Retain existing maps to avoid memory churn. The Go compiler optimizes map
  1557. // clearing operations of the following form.
  1558. for k := range q.DNSCount {
  1559. delete(q.DNSCount, k)
  1560. }
  1561. for k := range q.DNSDuration {
  1562. delete(q.DNSDuration, k)
  1563. }
  1564. for k := range q.DNSFailedCount {
  1565. delete(q.DNSFailedCount, k)
  1566. }
  1567. for k := range q.DNSFailedDuration {
  1568. delete(q.DNSFailedDuration, k)
  1569. }
  1570. }
// handshakeStateInfo carries a subset of handshake-derived values.
//
// NOTE(review): presumably this is produced when a client handshake is
// processed and consumed by the caller that applies it. The field meanings
// are inferred from their names (authorization IDs and access types,
// throttling rates, steering IP). Confirm against the handshake code.
type handshakeStateInfo struct {
	activeAuthorizationIDs []string
	authorizedAccessTypes []string
	upstreamBytesPerSecond int64
	downstreamBytesPerSecond int64
	steeringIP string
}
// handshakeState is the per-client state established by the Psiphon API
// handshake. It is stored in sshClient.handshakeState and accessed while
// holding the sshClient lock; for example, revokeClientAuthorizations does
// this.
type handshakeState struct {
	completed bool
	apiProtocol string
	apiParams common.APIParameters
	activeAuthorizationIDs []string
	// authorizedAccessTypes is deliberately not cleared on revocation, so
	// that sshClient.logTunnel can still log access types.
	authorizedAccessTypes []string
	// authorizationsRevoked is set by sshServer.revokeClientAuthorizations;
	// traffic rules are then re-selected with this state.
	authorizationsRevoked bool
	domainBytesChecksum []byte
	establishedTunnelsCount int
	splitTunnelLookup *splitTunnelLookup
	deviceRegion string
	newTacticsTag string
	// NOTE(review): the inproxy* fields presumably hold the original client
	// IP, its GeoIP data, and relay log fields for in-proxy tunnel protocols.
	// Per newSshClient's comment, the client IP is relayed from the broker
	// during the handshake. Confirm at the assignment sites.
	inproxyClientIP string
	inproxyClientGeoIPData GeoIPData
	inproxyRelayLogFields common.LogFields
}
  1594. type protocolDestinationBytesMetrics struct {
  1595. tcpMetrics destinationBytesMetrics
  1596. udpMetrics destinationBytesMetrics
  1597. }
  1598. type destinationBytesMetrics struct {
  1599. bytesUp int64
  1600. bytesDown int64
  1601. }
  1602. func (d *destinationBytesMetrics) UpdateProgress(
  1603. downstreamBytes, upstreamBytes, _ int64) {
  1604. // Concurrency: UpdateProgress may be called without holding the sshClient
  1605. // lock; all accesses to bytesUp/bytesDown must use atomic operations.
  1606. atomic.AddInt64(&d.bytesUp, upstreamBytes)
  1607. atomic.AddInt64(&d.bytesDown, downstreamBytes)
  1608. }
  1609. func (d *destinationBytesMetrics) getBytesUp() int64 {
  1610. return atomic.LoadInt64(&d.bytesUp)
  1611. }
  1612. func (d *destinationBytesMetrics) getBytesDown() int64 {
  1613. return atomic.LoadInt64(&d.bytesDown)
  1614. }
  1615. type splitTunnelLookup struct {
  1616. regions []string
  1617. regionsLookup map[string]bool
  1618. }
  1619. func newSplitTunnelLookup(
  1620. ownRegion string,
  1621. otherRegions []string) (*splitTunnelLookup, error) {
  1622. length := len(otherRegions)
  1623. if ownRegion != "" {
  1624. length += 1
  1625. }
  1626. // This length check is a sanity check and prevents clients shipping
  1627. // excessively long lists which could impact performance.
  1628. if length > 250 {
  1629. return nil, errors.Tracef("too many regions: %d", length)
  1630. }
  1631. // Create map lookups for lists where the number of values to compare
  1632. // against exceeds a threshold where benchmarks show maps are faster than
  1633. // looping through a slice. Otherwise use a slice for lookups. In both
  1634. // cases, the input slice is no longer referenced.
  1635. if length >= stringLookupThreshold {
  1636. regionsLookup := make(map[string]bool)
  1637. if ownRegion != "" {
  1638. regionsLookup[ownRegion] = true
  1639. }
  1640. for _, region := range otherRegions {
  1641. regionsLookup[region] = true
  1642. }
  1643. return &splitTunnelLookup{
  1644. regionsLookup: regionsLookup,
  1645. }, nil
  1646. } else {
  1647. regions := []string{}
  1648. if ownRegion != "" && !common.Contains(otherRegions, ownRegion) {
  1649. regions = append(regions, ownRegion)
  1650. }
  1651. // TODO: check for other duplicate regions?
  1652. regions = append(regions, otherRegions...)
  1653. return &splitTunnelLookup{
  1654. regions: regions,
  1655. }, nil
  1656. }
  1657. }
  1658. func (lookup *splitTunnelLookup) lookup(region string) bool {
  1659. if lookup.regionsLookup != nil {
  1660. return lookup.regionsLookup[region]
  1661. } else {
  1662. return common.Contains(lookup.regions, region)
  1663. }
  1664. }
  1665. func newSshClient(
  1666. sshServer *sshServer,
  1667. sshListener *sshListener,
  1668. tunnelProtocol string,
  1669. transportData *additionalTransportData,
  1670. serverPacketManipulation string,
  1671. replayedServerPacketManipulation bool,
  1672. peerAddr net.Addr,
  1673. peerGeoIPData GeoIPData) *sshClient {
  1674. runCtx, stopRunning := context.WithCancel(context.Background())
  1675. // isFirstTunnelInSession is defaulted to true so that the pre-handshake
  1676. // traffic rules won't apply UnthrottleFirstTunnelOnly and negate any
  1677. // unthrottled bytes during the initial protocol negotiation.
  1678. client := &sshClient{
  1679. sshServer: sshServer,
  1680. sshListener: sshListener,
  1681. tunnelProtocol: tunnelProtocol,
  1682. isInproxyTunnelProtocol: protocol.TunnelProtocolUsesInproxy(tunnelProtocol),
  1683. additionalTransportData: transportData,
  1684. serverPacketManipulation: serverPacketManipulation,
  1685. replayedServerPacketManipulation: replayedServerPacketManipulation,
  1686. peerAddr: peerAddr,
  1687. isFirstTunnelInSession: true,
  1688. qualityMetrics: newQualityMetrics(),
  1689. tcpPortForwardLRU: common.NewLRUConns(),
  1690. signalIssueSLOKs: make(chan struct{}, 1),
  1691. runCtx: runCtx,
  1692. stopRunning: stopRunning,
  1693. stopped: make(chan struct{}),
  1694. sendAlertRequests: make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),
  1695. sentAlertRequests: make(map[string]bool),
  1696. }
  1697. client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1698. client.udpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1699. // In the case of in-proxy tunnel protocols, clientGeoIPData is not set
  1700. // until the original client IP is relayed from the broker during the
  1701. // handshake. In other cases, clientGeoIPData is the peerGeoIPData
  1702. // (this includes fronted meek).
  1703. client.peerGeoIPData = peerGeoIPData
  1704. if !client.isInproxyTunnelProtocol {
  1705. client.clientGeoIPData = peerGeoIPData
  1706. }
  1707. return client
  1708. }
  1709. // getClientGeoIPData gets sshClient.clientGeoIPData. Use this helper when
  1710. // accessing this field without already holding a lock on the sshClient
  1711. // mutex. Unlike older code and unlike with client.peerGeoIPData,
  1712. // sshClient.clientGeoIPData is not static and may get set during the
  1713. // handshake, and it is not safe to access it without a lock.
  1714. func (sshClient *sshClient) getClientGeoIPData() GeoIPData {
  1715. sshClient.Lock()
  1716. defer sshClient.Unlock()
  1717. return sshClient.clientGeoIPData
  1718. }
  1719. func (sshClient *sshClient) run(
  1720. baseConn net.Conn, onSSHHandshakeFinished func()) {
  1721. // When run returns, the client has fully stopped, with all SSH state torn
  1722. // down and no port forwards or API requests in progress.
  1723. defer close(sshClient.stopped)
  1724. // onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
  1725. defer func() {
  1726. if onSSHHandshakeFinished != nil {
  1727. onSSHHandshakeFinished()
  1728. }
  1729. }()
  1730. // Set initial traffic rules, pre-handshake, based on currently known info.
  1731. sshClient.setTrafficRules()
  1732. conn := baseConn
  1733. // Wrap the base client connection with an ActivityMonitoredConn which will
  1734. // terminate the connection if no data is received before the deadline. This
  1735. // timeout is in effect for the entire duration of the SSH connection. Clients
  1736. // must actively use the connection or send SSH keep alive requests to keep
  1737. // the connection active. Writes are not considered reliable activity indicators
  1738. // due to buffering.
  1739. activityConn, err := common.NewActivityMonitoredConn(
  1740. conn,
  1741. SSH_CONNECTION_READ_DEADLINE,
  1742. false,
  1743. nil)
  1744. if err != nil {
  1745. conn.Close()
  1746. if !isExpectedTunnelIOError(err) {
  1747. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  1748. }
  1749. return
  1750. }
  1751. conn = activityConn
  1752. // Further wrap the connection with burst monitoring, when enabled.
  1753. //
  1754. // Limitations:
  1755. //
  1756. // - Burst parameters are fixed for the duration of the tunnel and do not
  1757. // change after a tactics hot reload.
  1758. //
  1759. // - In the case of in-proxy tunnel protocols, the original client IP is
  1760. // not yet known, and so burst monitoring GeoIP targeting uses the peer
  1761. // IP, which is the proxy, not the client.
  1762. var burstConn *common.BurstMonitoredConn
  1763. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.peerGeoIPData)
  1764. if err != nil {
  1765. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  1766. "ServerTacticsParametersCache.Get failed")
  1767. return
  1768. }
  1769. if !p.IsNil() {
  1770. upstreamTargetBytes := int64(p.Int(parameters.ServerBurstUpstreamTargetBytes))
  1771. upstreamDeadline := p.Duration(parameters.ServerBurstUpstreamDeadline)
  1772. downstreamTargetBytes := int64(p.Int(parameters.ServerBurstDownstreamTargetBytes))
  1773. downstreamDeadline := p.Duration(parameters.ServerBurstDownstreamDeadline)
  1774. if (upstreamDeadline != 0 && upstreamTargetBytes != 0) ||
  1775. (downstreamDeadline != 0 && downstreamTargetBytes != 0) {
  1776. burstConn = common.NewBurstMonitoredConn(
  1777. conn,
  1778. true,
  1779. upstreamTargetBytes, upstreamDeadline,
  1780. downstreamTargetBytes, downstreamDeadline)
  1781. conn = burstConn
  1782. }
  1783. }
  1784. // Allow garbage collection.
  1785. p.Close()
  1786. // Further wrap the connection in a rate limiting ThrottledConn. The
  1787. // underlying dialConn is always a stream, even when the network conn
  1788. // uses UDP.
  1789. throttledConn := common.NewThrottledConn(conn, true, sshClient.rateLimits())
  1790. conn = throttledConn
  1791. // Replay of server-side parameters is set or extended after a new tunnel
  1792. // meets duration and bytes transferred targets. Set a timer now that expires
  1793. // shortly after the target duration. When the timer fires, check the time of
  1794. // last byte read (a read indicating a live connection with the client),
  1795. // along with total bytes transferred and set or extend replay if the targets
  1796. // are met.
  1797. //
  1798. // Both target checks are conservative: the tunnel may be healthy, but a byte
  1799. // may not have been read in the last second when the timer fires. Or bytes
  1800. // may be transferring, but not at the target level. Only clients that meet
  1801. // the strict targets at the single check time will trigger replay; however,
  1802. // this replay will impact all clients with similar GeoIP data.
  1803. //
  1804. // A deferred function cancels the timer and also increments the replay
  1805. // failure counter, which will ultimately clear replay parameters, when the
  1806. // tunnel fails before the API handshake is completed (this includes any
  1807. // liveness test).
  1808. //
  1809. // A tunnel which fails to meet the targets but successfully completes any
  1810. // liveness test and the API handshake is ignored in terms of replay scoring.
  1811. //
  1812. // In the case of in-proxy tunnel protocols, the peer address will be the
  1813. // proxy, not the client, and GeoIP targeted replay will apply to the 2nd
  1814. // hop.
  1815. isReplayCandidate, replayWaitDuration, replayTargetDuration :=
  1816. sshClient.sshServer.support.ReplayCache.GetReplayTargetDuration(sshClient.peerGeoIPData)
  1817. if isReplayCandidate {
  1818. getFragmentorSeed := func() *prng.Seed {
  1819. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  1820. if ok {
  1821. fragmentorSeed, _ := fragmentor.GetReplay()
  1822. return fragmentorSeed
  1823. }
  1824. return nil
  1825. }
  1826. setReplayAfterFunc := time.AfterFunc(
  1827. replayWaitDuration,
  1828. func() {
  1829. if activityConn.GetActiveDuration() >= replayTargetDuration {
  1830. sshClient.Lock()
  1831. bytesUp := sshClient.tcpTrafficState.bytesUp + sshClient.udpTrafficState.bytesUp
  1832. bytesDown := sshClient.tcpTrafficState.bytesDown + sshClient.udpTrafficState.bytesDown
  1833. sshClient.Unlock()
  1834. sshClient.sshServer.support.ReplayCache.SetReplayParameters(
  1835. sshClient.tunnelProtocol,
  1836. sshClient.peerGeoIPData,
  1837. sshClient.serverPacketManipulation,
  1838. getFragmentorSeed(),
  1839. bytesUp,
  1840. bytesDown)
  1841. }
  1842. })
  1843. defer func() {
  1844. setReplayAfterFunc.Stop()
  1845. completed, _ := sshClient.getHandshaked()
  1846. if !completed {
  1847. // Count a replay failure case when a tunnel used replay parameters
  1848. // (excluding OSSH fragmentation, which doesn't use the ReplayCache) and
  1849. // failed to complete the API handshake.
  1850. replayedFragmentation := false
  1851. if sshClient.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  1852. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  1853. if ok {
  1854. _, replayedFragmentation = fragmentor.GetReplay()
  1855. }
  1856. }
  1857. usedReplay := replayedFragmentation || sshClient.replayedServerPacketManipulation
  1858. if usedReplay {
  1859. sshClient.sshServer.support.ReplayCache.FailedReplayParameters(
  1860. sshClient.tunnelProtocol,
  1861. sshClient.peerGeoIPData,
  1862. sshClient.serverPacketManipulation,
  1863. getFragmentorSeed())
  1864. }
  1865. }
  1866. }()
  1867. }
  1868. // Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  1869. // respect shutdownBroadcast and implement a specific handshake timeout.
  1870. // The timeout is to reclaim network resources in case the handshake takes
  1871. // too long.
  1872. type sshNewServerConnResult struct {
  1873. obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn
  1874. sshConn *ssh.ServerConn
  1875. channels <-chan ssh.NewChannel
  1876. requests <-chan *ssh.Request
  1877. err error
  1878. }
  1879. resultChannel := make(chan *sshNewServerConnResult, 2)
  1880. var sshHandshakeAfterFunc *time.Timer
  1881. if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {
  1882. sshHandshakeAfterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
  1883. resultChannel <- &sshNewServerConnResult{err: std_errors.New("ssh handshake timeout")}
  1884. })
  1885. }
  1886. go func(baseConn, conn net.Conn) {
  1887. sshServerConfig := &ssh.ServerConfig{
  1888. PasswordCallback: sshClient.passwordCallback,
  1889. AuthLogCallback: sshClient.authLogCallback,
  1890. ServerVersion: sshClient.sshServer.support.Config.SSHServerVersion,
  1891. }
  1892. sshServerConfig.AddHostKey(sshClient.sshServer.sshHostKey)
  1893. var err error
  1894. if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  1895. // With Encrypt-then-MAC hash algorithms, packet length is
  1896. // transmitted in plaintext, which aids in traffic analysis;
  1897. // clients may still send Encrypt-then-MAC algorithms in their
  1898. // KEX_INIT message, but do not select these algorithms.
  1899. //
  1900. // The exception is TUNNEL_PROTOCOL_SSH, which is intended to appear
  1901. // like SSH on the wire.
  1902. sshServerConfig.NoEncryptThenMACHash = true
  1903. } else {
  1904. // For TUNNEL_PROTOCOL_SSH only, randomize KEX.
  1905. if sshClient.sshServer.support.Config.ObfuscatedSSHKey != "" {
  1906. sshServerConfig.KEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  1907. sshClient.sshServer.support.Config.ObfuscatedSSHKey)
  1908. if err != nil {
  1909. err = errors.Trace(err)
  1910. }
  1911. }
  1912. }
  1913. result := &sshNewServerConnResult{}
  1914. // Wrap the connection in an SSH deobfuscator when required.
  1915. if err == nil && protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  1916. // In the case of in-proxy tunnel protocols, the peer address will
  1917. // be the proxy, not the client, and GeoIP targeted server-side
  1918. // OSSH tactics, including prefixes, will apply to the 2nd hop.
  1919. //
  1920. // It is recommended to set ServerOSSHPrefixSpecs, etc., in default
  1921. // tactics.
  1922. var p parameters.ParametersAccessor
  1923. p, err = sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.peerGeoIPData)
  1924. // Log error, but continue. A default prefix spec will be used by the server.
  1925. if err != nil {
  1926. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  1927. "ServerTacticsParametersCache.Get failed")
  1928. }
  1929. var osshPrefixEnableFragmentor bool = false
  1930. var serverOsshPrefixSpecs transforms.Specs = nil
  1931. var minDelay, maxDelay time.Duration
  1932. if !p.IsNil() {
  1933. osshPrefixEnableFragmentor = p.Bool(parameters.OSSHPrefixEnableFragmentor)
  1934. serverOsshPrefixSpecs = p.ProtocolTransformSpecs(parameters.ServerOSSHPrefixSpecs)
  1935. minDelay = p.Duration(parameters.OSSHPrefixSplitMinDelay)
  1936. maxDelay = p.Duration(parameters.OSSHPrefixSplitMaxDelay)
  1937. // Allow garbage collection.
  1938. p.Close()
  1939. }
  1940. // Note: NewServerObfuscatedSSHConn blocks on network I/O
  1941. // TODO: ensure this won't block shutdown
  1942. result.obfuscatedSSHConn, err = obfuscator.NewServerObfuscatedSSHConn(
  1943. conn,
  1944. sshClient.sshServer.support.Config.ObfuscatedSSHKey,
  1945. sshClient.sshServer.obfuscatorSeedHistory,
  1946. serverOsshPrefixSpecs,
  1947. func(peerIP string, err error, logFields common.LogFields) {
  1948. logIrregularTunnel(
  1949. sshClient.sshServer.support,
  1950. sshClient.sshListener.tunnelProtocol,
  1951. sshClient.sshListener.port,
  1952. peerIP,
  1953. errors.Trace(err),
  1954. LogFields(logFields))
  1955. })
  1956. if err != nil {
  1957. err = errors.Trace(err)
  1958. } else {
  1959. conn = result.obfuscatedSSHConn
  1960. }
  1961. // Set the OSSH prefix split config.
  1962. if err == nil && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  1963. err = result.obfuscatedSSHConn.SetOSSHPrefixSplitConfig(minDelay, maxDelay)
  1964. // Log error, but continue.
  1965. if err != nil {
  1966. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  1967. "SetOSSHPrefixSplitConfig failed")
  1968. }
  1969. }
  1970. // Seed the fragmentor, when present, with seed derived from initial
  1971. // obfuscator message. See tactics.Listener.Accept. This must preceed
  1972. // ssh.NewServerConn to ensure fragmentor is seeded before downstream bytes
  1973. // are written.
  1974. if err == nil && protocol.TunnelProtocolIsObfuscatedSSH(sshClient.tunnelProtocol) {
  1975. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  1976. if ok {
  1977. var fragmentorPRNG *prng.PRNG
  1978. fragmentorPRNG, err = result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")
  1979. if err != nil {
  1980. err = errors.Trace(err)
  1981. } else {
  1982. fragmentor.SetReplay(fragmentorPRNG)
  1983. }
  1984. // Stops the fragmentor if disabled for prefixed OSSH streams.
  1985. if !osshPrefixEnableFragmentor && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  1986. fragmentor.StopFragmenting()
  1987. }
  1988. }
  1989. }
  1990. }
  1991. if err == nil {
  1992. result.sshConn, result.channels, result.requests, err =
  1993. ssh.NewServerConn(conn, sshServerConfig)
  1994. if err != nil {
  1995. err = errors.Trace(err)
  1996. }
  1997. }
  1998. result.err = err
  1999. resultChannel <- result
  2000. }(baseConn, conn)
  2001. var result *sshNewServerConnResult
  2002. select {
  2003. case result = <-resultChannel:
  2004. case <-sshClient.sshServer.shutdownBroadcast:
  2005. // Close() will interrupt an ongoing handshake
  2006. // TODO: wait for SSH handshake goroutines to exit before returning?
  2007. conn.Close()
  2008. return
  2009. }
  2010. if sshHandshakeAfterFunc != nil {
  2011. sshHandshakeAfterFunc.Stop()
  2012. }
  2013. if result.err != nil {
  2014. conn.Close()
  2015. // This is a Debug log due to noise. The handshake often fails due to I/O
  2016. // errors as clients frequently interrupt connections in progress when
  2017. // client-side load balancing completes a connection to a different server.
  2018. log.WithTraceFields(LogFields{"error": result.err}).Debug("SSH handshake failed")
  2019. return
  2020. }
  2021. // The SSH handshake has finished successfully; notify now to allow other
  2022. // blocked SSH handshakes to proceed.
  2023. if onSSHHandshakeFinished != nil {
  2024. onSSHHandshakeFinished()
  2025. }
  2026. onSSHHandshakeFinished = nil
  2027. sshClient.Lock()
  2028. sshClient.sshConn = result.sshConn
  2029. sshClient.throttledConn = throttledConn
  2030. sshClient.Unlock()
  2031. if !sshClient.sshServer.registerEstablishedClient(sshClient) {
  2032. conn.Close()
  2033. log.WithTrace().Warning("register failed")
  2034. return
  2035. }
  2036. sshClient.runTunnel(result.channels, result.requests)
  2037. // Note: sshServer.unregisterEstablishedClient calls sshClient.stop(),
  2038. // which also closes underlying transport Conn.
  2039. sshClient.sshServer.unregisterEstablishedClient(sshClient)
  2040. // Log tunnel metrics.
  2041. var additionalMetrics []LogFields
  2042. // Add activity and burst metrics.
  2043. //
  2044. // The reported duration is based on last confirmed data transfer, which for
  2045. // sshClient.activityConn.GetActiveDuration() is time of last read byte and
  2046. // not conn close time. This is important for protocols such as meek. For
  2047. // meek, the connection remains open until the HTTP session expires, which
  2048. // may be some time after the tunnel has closed. (The meek protocol has no
  2049. // allowance for signalling payload EOF, and even if it did the client may
  2050. // not have the opportunity to send a final request with an EOF flag set.)
  2051. activityMetrics := make(LogFields)
  2052. activityMetrics["start_time"] = activityConn.GetStartTime()
  2053. activityMetrics["duration"] = int64(activityConn.GetActiveDuration() / time.Millisecond)
  2054. additionalMetrics = append(additionalMetrics, activityMetrics)
  2055. if burstConn != nil {
  2056. // Any outstanding burst should be recorded by burstConn.Close which should
  2057. // be called by unregisterEstablishedClient.
  2058. additionalMetrics = append(
  2059. additionalMetrics, LogFields(burstConn.GetMetrics(activityConn.GetStartTime())))
  2060. }
  2061. // Some conns report additional metrics. Meek conns report resiliency
  2062. // metrics and fragmentor.Conns report fragmentor configs.
  2063. if metricsSource, ok := baseConn.(common.MetricsSource); ok {
  2064. additionalMetrics = append(
  2065. additionalMetrics, LogFields(metricsSource.GetMetrics()))
  2066. }
  2067. if result.obfuscatedSSHConn != nil {
  2068. additionalMetrics = append(
  2069. additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
  2070. }
  2071. // Add server-replay metrics.
  2072. replayMetrics := make(LogFields)
  2073. replayedFragmentation := false
  2074. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  2075. if ok {
  2076. _, replayedFragmentation = fragmentor.GetReplay()
  2077. }
  2078. replayMetrics["server_replay_fragmentation"] = replayedFragmentation
  2079. replayMetrics["server_replay_packet_manipulation"] = sshClient.replayedServerPacketManipulation
  2080. additionalMetrics = append(additionalMetrics, replayMetrics)
  2081. // Limitation: there's only one log per tunnel with bytes transferred
  2082. // metrics, so the byte count can't be attributed to a certain day for
  2083. // tunnels that remain connected for well over 24h. In practise, most
  2084. // tunnels are short-lived, especially on mobile devices.
  2085. sshClient.logTunnel(additionalMetrics)
  2086. // Transfer OSL seed state -- the OSL progress -- from the closing
  2087. // client to the session cache so the client can resume its progress
  2088. // if it reconnects to this same server.
  2089. // Note: following setOSLConfig order of locking.
  2090. sshClient.Lock()
  2091. if sshClient.oslClientSeedState != nil {
  2092. sshClient.sshServer.oslSessionCacheMutex.Lock()
  2093. sshClient.oslClientSeedState.Hibernate()
  2094. sshClient.sshServer.oslSessionCache.Set(
  2095. sshClient.sessionID, sshClient.oslClientSeedState, cache.DefaultExpiration)
  2096. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  2097. sshClient.oslClientSeedState = nil
  2098. }
  2099. sshClient.Unlock()
  2100. // Set the GeoIP session cache to expire; up to this point, the entry for
  2101. // this session ID has no expiry; retaining entries after the tunnel
  2102. // disconnects supports first-tunnel-in-session and duplicate
  2103. // authorization logic.
  2104. sshClient.sshServer.markGeoIPSessionCacheToExpire(sshClient.sessionID)
  2105. }
  2106. func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  2107. expectedSessionIDLength := 2 * protocol.PSIPHON_API_CLIENT_SESSION_ID_LENGTH
  2108. expectedSSHPasswordLength := 2 * SSH_PASSWORD_BYTE_LENGTH
  2109. var sshPasswordPayload protocol.SSHPasswordPayload
  2110. err := json.Unmarshal(password, &sshPasswordPayload)
  2111. if err != nil {
  2112. // Backwards compatibility case: instead of a JSON payload, older clients
  2113. // send the hex encoded session ID prepended to the SSH password.
  2114. // Note: there's an even older case where clients don't send any session ID,
  2115. // but that's no longer supported.
  2116. if len(password) == expectedSessionIDLength+expectedSSHPasswordLength {
  2117. sshPasswordPayload.SessionId = string(password[0:expectedSessionIDLength])
  2118. sshPasswordPayload.SshPassword = string(password[expectedSessionIDLength:])
  2119. } else {
  2120. return nil, errors.Tracef("invalid password payload for %q", conn.User())
  2121. }
  2122. }
  2123. if !isHexDigits(sshClient.sshServer.support.Config, sshPasswordPayload.SessionId) ||
  2124. len(sshPasswordPayload.SessionId) != expectedSessionIDLength {
  2125. return nil, errors.Tracef("invalid session ID for %q", conn.User())
  2126. }
  2127. userOk := (subtle.ConstantTimeCompare(
  2128. []byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1)
  2129. passwordOk := (subtle.ConstantTimeCompare(
  2130. []byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1)
  2131. if !userOk || !passwordOk {
  2132. return nil, errors.Tracef("invalid password for %q", conn.User())
  2133. }
  2134. sessionID := sshPasswordPayload.SessionId
  2135. // The GeoIP session cache will be populated if there was a previous tunnel
  2136. // with this session ID. This will be true up to GEOIP_SESSION_CACHE_TTL.
  2137. isFirstTunnelInSession := !sshClient.sshServer.inGeoIPSessionCache(sessionID)
  2138. supportsServerRequests := common.Contains(
  2139. sshPasswordPayload.ClientCapabilities, protocol.CLIENT_CAPABILITY_SERVER_REQUESTS)
  2140. sshClient.Lock()
  2141. // After this point, these values are read-only as they are read
  2142. // without obtaining sshClient.Lock.
  2143. sshClient.sessionID = sessionID
  2144. sshClient.isFirstTunnelInSession = isFirstTunnelInSession
  2145. sshClient.supportsServerRequests = supportsServerRequests
  2146. sshClient.Unlock()
  2147. // Initially, in the case of in-proxy tunnel protocols, the GeoIP session
  2148. // cache entry will be the proxy's GeoIPData. This is updated to be the
  2149. // client's GeoIPData in setHandshakeState.
  2150. sshClient.sshServer.setGeoIPSessionCache(sessionID, sshClient.peerGeoIPData)
  2151. return nil, nil
  2152. }
  2153. func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  2154. if err != nil {
  2155. if method == "none" && err.Error() == "ssh: no auth passed yet" {
  2156. // In this case, the callback invocation is noise from auth negotiation
  2157. return
  2158. }
  2159. // Note: here we previously logged messages for fail2ban to act on. This is no longer
  2160. // done as the complexity outweighs the benefits.
  2161. //
  2162. // - The SSH credential is not secret -- it's in the server entry. Attackers targeting
  2163. // the server likely already have the credential. On the other hand, random scanning and
  2164. // brute forcing is mitigated with high entropy random passwords, rate limiting
  2165. // (implemented on the host via iptables), and limited capabilities (the SSH session can
  2166. // only port forward).
  2167. //
  2168. // - fail2ban coverage was inconsistent; in the case of an unfronted meek protocol through
  2169. // an upstream proxy, the remote address is the upstream proxy, which should not be blocked.
  2170. // The X-Forwarded-For header cant be used instead as it may be forged and used to get IPs
  2171. // deliberately blocked; and in any case fail2ban adds iptables rules which can only block
  2172. // by direct remote IP, not by original client IP. Fronted meek has the same iptables issue.
  2173. //
  2174. // Random scanning and brute forcing of port 22 will result in log noise. To mitigate this,
  2175. // not every authentication failure is logged. A summary log is emitted periodically to
  2176. // retain some record of this activity in case this is relevant to, e.g., a performance
  2177. // investigation.
  2178. atomic.AddInt64(&sshClient.sshServer.authFailedCount, 1)
  2179. lastAuthLog := monotime.Time(atomic.LoadInt64(&sshClient.sshServer.lastAuthLog))
  2180. if monotime.Since(lastAuthLog) > SSH_AUTH_LOG_PERIOD {
  2181. now := int64(monotime.Now())
  2182. if atomic.CompareAndSwapInt64(&sshClient.sshServer.lastAuthLog, int64(lastAuthLog), now) {
  2183. count := atomic.SwapInt64(&sshClient.sshServer.authFailedCount, 0)
  2184. log.WithTraceFields(
  2185. LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
  2186. }
  2187. }
  2188. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication failed")
  2189. } else {
  2190. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication success")
  2191. }
  2192. }
  2193. // stop signals the ssh connection to shutdown. After sshConn.Wait returns,
  2194. // the SSH connection has terminated but sshClient.run may still be running and
  2195. // in the process of exiting.
  2196. //
  2197. // The shutdown process must complete rapidly and not, e.g., block on network
  2198. // I/O, as newly connecting clients need to await stop completion of any
  2199. // existing connection that shares the same session ID.
  2200. func (sshClient *sshClient) stop() {
  2201. _ = sshClient.sshConn.Close()
  2202. _ = sshClient.sshConn.Wait()
  2203. }
  2204. // awaitStopped will block until sshClient.run has exited, at which point all
  2205. // worker goroutines associated with the sshClient, including any in-flight
  2206. // API handlers, will have exited.
  2207. func (sshClient *sshClient) awaitStopped() {
  2208. <-sshClient.stopped
  2209. }
  2210. // runTunnel handles/dispatches new channels and new requests from the client.
  2211. // When the SSH client connection closes, both the channels and requests channels
  2212. // will close and runTunnel will exit.
  2213. func (sshClient *sshClient) runTunnel(
  2214. channels <-chan ssh.NewChannel,
  2215. requests <-chan *ssh.Request) {
  2216. waitGroup := new(sync.WaitGroup)
  2217. // Start client SSH API request handler
  2218. waitGroup.Add(1)
  2219. go func() {
  2220. defer waitGroup.Done()
  2221. sshClient.handleSSHRequests(requests)
  2222. }()
  2223. // Start request senders
  2224. if sshClient.supportsServerRequests {
  2225. waitGroup.Add(1)
  2226. go func() {
  2227. defer waitGroup.Done()
  2228. sshClient.runOSLSender()
  2229. }()
  2230. waitGroup.Add(1)
  2231. go func() {
  2232. defer waitGroup.Done()
  2233. sshClient.runAlertSender()
  2234. }()
  2235. }
  2236. // Start the TCP port forward manager
  2237. // The queue size is set to the traffic rules (MaxTCPPortForwardCount +
  2238. // MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
  2239. // limits per client; when that value is not set, a default is used.
  2240. // A limitation: this queue size is set once and doesn't change, for this client,
  2241. // when traffic rules are reloaded.
  2242. queueSize := sshClient.getTCPPortForwardQueueSize()
  2243. if queueSize == 0 {
  2244. queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
  2245. }
  2246. newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
  2247. waitGroup.Add(1)
  2248. go func() {
  2249. defer waitGroup.Done()
  2250. sshClient.handleTCPPortForwards(waitGroup, newTCPPortForwards)
  2251. }()
  2252. // Handle new channel (port forward) requests from the client.
  2253. for newChannel := range channels {
  2254. switch newChannel.ChannelType() {
  2255. case protocol.RANDOM_STREAM_CHANNEL_TYPE:
  2256. sshClient.handleNewRandomStreamChannel(waitGroup, newChannel)
  2257. case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
  2258. sshClient.handleNewPacketTunnelChannel(waitGroup, newChannel)
  2259. case protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  2260. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  2261. // "direct-tcpip", except split tunnel channel rejections are disallowed
  2262. // even if the client has enabled split tunnel. This channel type allows
  2263. // the client to ensure tunneling for certain cases while split tunnel is
  2264. // enabled.
  2265. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, false, newTCPPortForwards)
  2266. case "direct-tcpip":
  2267. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, true, newTCPPortForwards)
  2268. default:
  2269. sshClient.rejectNewChannel(newChannel,
  2270. fmt.Sprintf("unknown or unsupported channel type: %s", newChannel.ChannelType()))
  2271. }
  2272. }
  2273. // The channel loop is interrupted by a client
  2274. // disconnect or by calling sshClient.stop().
  2275. // Stop the TCP port forward manager
  2276. close(newTCPPortForwards)
  2277. // Stop all other worker goroutines
  2278. sshClient.stopRunning()
  2279. if sshClient.sshServer.support.Config.RunPacketTunnel {
  2280. // PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
  2281. sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
  2282. sshClient.sessionID)
  2283. }
  2284. waitGroup.Wait()
  2285. sshClient.cleanupAuthorizations()
  2286. }
  2287. func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {
  2288. for request := range requests {
  2289. // Requests are processed serially; API responses must be sent in request order.
  2290. var responsePayload []byte
  2291. var err error
  2292. if request.Type == "keepalive@openssh.com" {
  2293. // SSH keep alive round trips are used as speed test samples.
  2294. responsePayload, err = tactics.MakeSpeedTestResponse(
  2295. SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  2296. } else {
  2297. // All other requests are assumed to be API requests.
  2298. responsePayload, err = sshAPIRequestHandler(
  2299. sshClient.sshServer.support,
  2300. sshClient,
  2301. request.Type,
  2302. request.Payload)
  2303. }
  2304. if err == nil {
  2305. err = request.Reply(true, responsePayload)
  2306. } else {
  2307. log.WithTraceFields(LogFields{"error": err}).Warning("request failed")
  2308. err = request.Reply(false, nil)
  2309. }
  2310. if err != nil {
  2311. if !isExpectedTunnelIOError(err) {
  2312. log.WithTraceFields(LogFields{"error": err}).Warning("response failed")
  2313. }
  2314. }
  2315. }
  2316. }
  2317. type newTCPPortForward struct {
  2318. enqueueTime time.Time
  2319. hostToConnect string
  2320. portToConnect int
  2321. doSplitTunnel bool
  2322. newChannel ssh.NewChannel
  2323. }
  2324. func (sshClient *sshClient) handleTCPPortForwards(
  2325. waitGroup *sync.WaitGroup,
  2326. newTCPPortForwards chan *newTCPPortForward) {
  2327. // Lifecycle of a TCP port forward:
  2328. //
  2329. // 1. A "direct-tcpip" SSH request is received from the client.
  2330. //
  2331. // A new TCP port forward request is enqueued. The queue delivers TCP port
  2332. // forward requests to the TCP port forward manager, which enforces the TCP
  2333. // port forward dial limit.
  2334. //
  2335. // Enqueuing new requests allows for reading further SSH requests from the
  2336. // client without blocking when the dial limit is hit; this is to permit new
  2337. // UDP/udpgw port forwards to be restablished without delay. The maximum size
  2338. // of the queue enforces a hard cap on resources consumed by a client in the
  2339. // pre-dial phase. When the queue is full, new TCP port forwards are
  2340. // immediately rejected.
  2341. //
  2342. // 2. The TCP port forward manager dequeues the request.
  2343. //
  2344. // The manager calls dialingTCPPortForward(), which increments
  2345. // concurrentDialingPortForwardCount, and calls
  2346. // isTCPDialingPortForwardLimitExceeded() to check the concurrent dialing
  2347. // count.
  2348. //
  2349. // The manager enforces the concurrent TCP dial limit: when at the limit, the
  2350. // manager blocks waiting for the number of dials to drop below the limit before
  2351. // dispatching the request to handleTCPChannel(), which will run in its own
  2352. // goroutine and will dial and relay the port forward.
  2353. //
  2354. // The block delays the current request and also halts dequeuing of subsequent
  2355. // requests and could ultimately cause requests to be immediately rejected if
  2356. // the queue fills. These actions are intended to apply back pressure when
  2357. // upstream network resources are impaired.
  2358. //
  2359. // The time spent in the queue is deducted from the port forward's dial timeout.
  2360. // The time spent blocking while at the dial limit is similarly deducted from
  2361. // the dial timeout. If the dial timeout has expired before the dial begins, the
  2362. // port forward is rejected and a stat is recorded.
  2363. //
  2364. // 3. handleTCPChannel() performs the port forward dial and relaying.
  2365. //
  2366. // a. Dial the target, using the dial timeout remaining after queue and blocking
  2367. // time is deducted.
  2368. //
  2369. // b. If the dial fails, call abortedTCPPortForward() to decrement
  2370. // concurrentDialingPortForwardCount, freeing up a dial slot.
  2371. //
  2372. // c. If the dial succeeds, call establishedPortForward(), which decrements
  2373. // concurrentDialingPortForwardCount and increments concurrentPortForwardCount,
  2374. // the "established" port forward count.
  2375. //
  2376. // d. Check isPortForwardLimitExceeded(), which enforces the configurable limit on
  2377. // concurrentPortForwardCount, the number of _established_ TCP port forwards.
  2378. // If the limit is exceeded, the LRU established TCP port forward is closed and
  2379. // the newly established TCP port forward proceeds. This LRU logic allows some
  2380. // dangling resource consumption (e.g., TIME_WAIT) while providing a better
  2381. // experience for clients.
  2382. //
  2383. // e. Relay data.
  2384. //
  2385. // f. Call closedPortForward() which decrements concurrentPortForwardCount and
  2386. // records bytes transferred.
  2387. for newPortForward := range newTCPPortForwards {
  2388. remainingDialTimeout :=
  2389. time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
  2390. time.Since(newPortForward.enqueueTime)
  2391. if remainingDialTimeout <= 0 {
  2392. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2393. sshClient.rejectNewChannel(
  2394. newPortForward.newChannel, "TCP port forward timed out in queue")
  2395. continue
  2396. }
  2397. // Reserve a TCP dialing slot.
  2398. //
  2399. // TOCTOU note: important to increment counts _before_ checking limits; otherwise,
  2400. // the client could potentially consume excess resources by initiating many port
  2401. // forwards concurrently.
  2402. sshClient.dialingTCPPortForward()
  2403. // When max dials are in progress, wait up to remainingDialTimeout for dialing
  2404. // to become available. This blocks all dequeing.
  2405. if sshClient.isTCPDialingPortForwardLimitExceeded() {
  2406. blockStartTime := time.Now()
  2407. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  2408. sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
  2409. <-ctx.Done()
  2410. sshClient.setTCPPortForwardDialingAvailableSignal(nil)
  2411. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  2412. remainingDialTimeout -= time.Since(blockStartTime)
  2413. }
  2414. if remainingDialTimeout <= 0 {
  2415. // Release the dialing slot here since handleTCPChannel() won't be called.
  2416. sshClient.abortedTCPPortForward()
  2417. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2418. sshClient.rejectNewChannel(
  2419. newPortForward.newChannel, "TCP port forward timed out before dialing")
  2420. continue
  2421. }
  2422. // Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
  2423. // handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
  2424. // will deal with remainingDialTimeout <= 0.
  2425. waitGroup.Add(1)
  2426. go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
  2427. defer waitGroup.Done()
  2428. sshClient.handleTCPChannel(
  2429. remainingDialTimeout,
  2430. newPortForward.hostToConnect,
  2431. newPortForward.portToConnect,
  2432. newPortForward.doSplitTunnel,
  2433. newPortForward.newChannel)
  2434. }(remainingDialTimeout, newPortForward)
  2435. }
  2436. }
  2437. func (sshClient *sshClient) handleNewRandomStreamChannel(
  2438. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2439. // A random stream channel returns the requested number of bytes -- random
  2440. // bytes -- to the client while also consuming and discarding bytes sent
  2441. // by the client.
  2442. //
  2443. // One use case for the random stream channel is a liveness test that the
  2444. // client performs to confirm that the tunnel is live. As the liveness
  2445. // test is performed in the concurrent establishment phase, before
  2446. // selecting a single candidate for handshake, the random stream channel
  2447. // is available pre-handshake, albeit with additional restrictions.
  2448. //
  2449. // The random stream is subject to throttling in traffic rules; for
  2450. // unthrottled liveness tests, set EstablishmentRead/WriteBytesPerSecond as
  2451. // required. The random stream maximum count and response size cap mitigate
  2452. // clients abusing the facility to waste server resources.
  2453. //
  2454. // Like all other channels, this channel type is handled asynchronously,
  2455. // so it's possible to run at any point in the tunnel lifecycle.
  2456. //
  2457. // Up/downstream byte counts don't include SSH packet and request
  2458. // marshalling overhead.
  2459. var request protocol.RandomStreamRequest
  2460. err := json.Unmarshal(newChannel.ExtraData(), &request)
  2461. if err != nil {
  2462. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
  2463. return
  2464. }
  2465. if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2466. sshClient.rejectNewChannel(newChannel,
  2467. fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
  2468. return
  2469. }
  2470. if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2471. sshClient.rejectNewChannel(newChannel,
  2472. fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
  2473. return
  2474. }
  2475. var metrics *randomStreamMetrics
  2476. sshClient.Lock()
  2477. if !sshClient.handshakeState.completed {
  2478. metrics = &sshClient.preHandshakeRandomStreamMetrics
  2479. } else {
  2480. metrics = &sshClient.postHandshakeRandomStreamMetrics
  2481. }
  2482. countOk := true
  2483. if !sshClient.handshakeState.completed &&
  2484. metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
  2485. countOk = false
  2486. } else {
  2487. metrics.count++
  2488. }
  2489. sshClient.Unlock()
  2490. if !countOk {
  2491. sshClient.rejectNewChannel(newChannel, "max count exceeded")
  2492. return
  2493. }
  2494. channel, requests, err := newChannel.Accept()
  2495. if err != nil {
  2496. if !isExpectedTunnelIOError(err) {
  2497. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2498. }
  2499. return
  2500. }
  2501. go ssh.DiscardRequests(requests)
  2502. waitGroup.Add(1)
  2503. go func() {
  2504. defer waitGroup.Done()
  2505. upstream := new(sync.WaitGroup)
  2506. received := 0
  2507. sent := 0
  2508. if request.UpstreamBytes > 0 {
  2509. // Process streams concurrently to minimize elapsed time. This also
  2510. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  2511. upstream.Add(1)
  2512. go func() {
  2513. defer upstream.Done()
  2514. n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
  2515. received = int(n)
  2516. if err != nil {
  2517. if !isExpectedTunnelIOError(err) {
  2518. log.WithTraceFields(LogFields{"error": err}).Warning("receive failed")
  2519. }
  2520. }
  2521. }()
  2522. }
  2523. if request.DownstreamBytes > 0 {
  2524. n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
  2525. sent = int(n)
  2526. if err != nil {
  2527. if !isExpectedTunnelIOError(err) {
  2528. log.WithTraceFields(LogFields{"error": err}).Warning("send failed")
  2529. }
  2530. }
  2531. }
  2532. upstream.Wait()
  2533. sshClient.Lock()
  2534. metrics.upstreamBytes += int64(request.UpstreamBytes)
  2535. metrics.receivedUpstreamBytes += int64(received)
  2536. metrics.downstreamBytes += int64(request.DownstreamBytes)
  2537. metrics.sentDownstreamBytes += int64(sent)
  2538. sshClient.Unlock()
  2539. channel.Close()
  2540. }()
  2541. }
  2542. func (sshClient *sshClient) handleNewPacketTunnelChannel(
  2543. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2544. // packet tunnel channels are handled by the packet tunnel server
  2545. // component. Each client may have at most one packet tunnel channel.
  2546. if !sshClient.sshServer.support.Config.RunPacketTunnel {
  2547. sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
  2548. return
  2549. }
  2550. // Accept this channel immediately. This channel will replace any
  2551. // previously existing packet tunnel channel for this client.
  2552. packetTunnelChannel, requests, err := newChannel.Accept()
  2553. if err != nil {
  2554. if !isExpectedTunnelIOError(err) {
  2555. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2556. }
  2557. return
  2558. }
  2559. go ssh.DiscardRequests(requests)
  2560. sshClient.setPacketTunnelChannel(packetTunnelChannel)
  2561. // PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
  2562. // will stop packet tunnel workers for any previous packet tunnel channel.
  2563. checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2564. return sshClient.isPortForwardPermitted(portForwardTypeTCP, upstreamIPAddress, port)
  2565. }
  2566. checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2567. return sshClient.isPortForwardPermitted(portForwardTypeUDP, upstreamIPAddress, port)
  2568. }
  2569. checkAllowedDomainFunc := func(domain string) bool {
  2570. ok, _ := sshClient.isDomainPermitted(domain)
  2571. return ok
  2572. }
  2573. flowActivityUpdaterMaker := func(
  2574. isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
  2575. trafficType := portForwardTypeTCP
  2576. if !isTCP {
  2577. trafficType = portForwardTypeUDP
  2578. }
  2579. activityUpdaters := sshClient.getActivityUpdaters(trafficType, upstreamIPAddress)
  2580. flowUpdaters := make([]tun.FlowActivityUpdater, len(activityUpdaters))
  2581. for i, activityUpdater := range activityUpdaters {
  2582. flowUpdaters[i] = activityUpdater
  2583. }
  2584. return flowUpdaters
  2585. }
  2586. metricUpdater := func(
  2587. TCPApplicationBytesDown, TCPApplicationBytesUp,
  2588. UDPApplicationBytesDown, UDPApplicationBytesUp int64) {
  2589. sshClient.Lock()
  2590. sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
  2591. sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
  2592. sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
  2593. sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
  2594. sshClient.Unlock()
  2595. }
  2596. dnsQualityReporter := sshClient.updateQualityMetricsWithDNSResult
  2597. err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
  2598. sshClient.sessionID,
  2599. packetTunnelChannel,
  2600. checkAllowedTCPPortFunc,
  2601. checkAllowedUDPPortFunc,
  2602. checkAllowedDomainFunc,
  2603. flowActivityUpdaterMaker,
  2604. metricUpdater,
  2605. dnsQualityReporter)
  2606. if err != nil {
  2607. log.WithTraceFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
  2608. sshClient.setPacketTunnelChannel(nil)
  2609. }
  2610. }
  2611. func (sshClient *sshClient) handleNewTCPPortForwardChannel(
  2612. waitGroup *sync.WaitGroup,
  2613. newChannel ssh.NewChannel,
  2614. allowSplitTunnel bool,
  2615. newTCPPortForwards chan *newTCPPortForward) {
  2616. // udpgw client connections are dispatched immediately (clients use this for
  2617. // DNS, so it's essential to not block; and only one udpgw connection is
  2618. // retained at a time).
  2619. //
  2620. // All other TCP port forwards are dispatched via the TCP port forward
  2621. // manager queue.
  2622. // http://tools.ietf.org/html/rfc4254#section-7.2
  2623. var directTcpipExtraData struct {
  2624. HostToConnect string
  2625. PortToConnect uint32
  2626. OriginatorIPAddress string
  2627. OriginatorPort uint32
  2628. }
  2629. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  2630. if err != nil {
  2631. sshClient.rejectNewChannel(newChannel, "invalid extra data")
  2632. return
  2633. }
  2634. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  2635. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  2636. isUdpgwChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
  2637. sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
  2638. net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
  2639. if isUdpgwChannel {
  2640. // Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
  2641. // own worker goroutine.
  2642. waitGroup.Add(1)
  2643. go func(channel ssh.NewChannel) {
  2644. defer waitGroup.Done()
  2645. sshClient.handleUdpgwChannel(channel)
  2646. }(newChannel)
  2647. } else {
  2648. // Dispatch via TCP port forward manager. When the queue is full, the channel
  2649. // is immediately rejected.
  2650. //
  2651. // Split tunnel logic is enabled for this TCP port forward when the client
  2652. // has enabled split tunnel mode and the channel type allows it.
  2653. doSplitTunnel := sshClient.handshakeState.splitTunnelLookup != nil && allowSplitTunnel
  2654. tcpPortForward := &newTCPPortForward{
  2655. enqueueTime: time.Now(),
  2656. hostToConnect: directTcpipExtraData.HostToConnect,
  2657. portToConnect: int(directTcpipExtraData.PortToConnect),
  2658. doSplitTunnel: doSplitTunnel,
  2659. newChannel: newChannel,
  2660. }
  2661. select {
  2662. case newTCPPortForwards <- tcpPortForward:
  2663. default:
  2664. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2665. sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
  2666. }
  2667. }
  2668. }
  2669. func (sshClient *sshClient) cleanupAuthorizations() {
  2670. sshClient.Lock()
  2671. if sshClient.releaseAuthorizations != nil {
  2672. sshClient.releaseAuthorizations()
  2673. }
  2674. if sshClient.stopTimer != nil {
  2675. sshClient.stopTimer.Stop()
  2676. }
  2677. sshClient.Unlock()
  2678. }
  2679. // setPacketTunnelChannel sets the single packet tunnel channel
  2680. // for this sshClient. Any existing packet tunnel channel is
  2681. // closed.
  2682. func (sshClient *sshClient) setPacketTunnelChannel(channel ssh.Channel) {
  2683. sshClient.Lock()
  2684. if sshClient.packetTunnelChannel != nil {
  2685. sshClient.packetTunnelChannel.Close()
  2686. }
  2687. sshClient.packetTunnelChannel = channel
  2688. sshClient.totalPacketTunnelChannelCount += 1
  2689. sshClient.Unlock()
  2690. }
  2691. // setUdpgwChannelHandler sets the single udpgw channel handler for this
  2692. // sshClient. Each sshClient may have only one concurrent udpgw
  2693. // channel/handler. Each udpgw channel multiplexes many UDP port forwards via
  2694. // the udpgw protocol. Any existing udpgw channel/handler is closed.
  2695. func (sshClient *sshClient) setUdpgwChannelHandler(udpgwChannelHandler *udpgwPortForwardMultiplexer) bool {
  2696. sshClient.Lock()
  2697. if sshClient.udpgwChannelHandler != nil {
  2698. previousHandler := sshClient.udpgwChannelHandler
  2699. sshClient.udpgwChannelHandler = nil
  2700. // stop must be run without holding the sshClient mutex lock, as the
  2701. // udpgw goroutines may attempt to lock the same mutex. For example,
  2702. // udpgwPortForwardMultiplexer.run calls sshClient.establishedPortForward
  2703. // which calls sshClient.allocatePortForward.
  2704. sshClient.Unlock()
  2705. previousHandler.stop()
  2706. sshClient.Lock()
  2707. // In case some other channel has set the sshClient.udpgwChannelHandler
  2708. // in the meantime, fail. The caller should discard this channel/handler.
  2709. if sshClient.udpgwChannelHandler != nil {
  2710. sshClient.Unlock()
  2711. return false
  2712. }
  2713. }
  2714. sshClient.udpgwChannelHandler = udpgwChannelHandler
  2715. sshClient.totalUdpgwChannelCount += 1
  2716. sshClient.Unlock()
  2717. return true
  2718. }
  2719. var serverTunnelStatParams = append(
  2720. []requestParamSpec{
  2721. {"last_connected", isLastConnected, requestParamOptional},
  2722. {"establishment_duration", isIntString, requestParamOptional}},
  2723. baseAndDialParams...)
  2724. func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
  2725. sshClient.Lock()
  2726. // For in-proxy tunnel protocols, two sets of GeoIP fields are logged, one
  2727. // for the client and one for the proxy. The client GeoIP fields will
  2728. // be "None" if handshake did not complete.
  2729. logFields := getRequestLogFields(
  2730. "server_tunnel",
  2731. "",
  2732. sshClient.sessionID,
  2733. sshClient.clientGeoIPData,
  2734. sshClient.handshakeState.authorizedAccessTypes,
  2735. sshClient.handshakeState.apiParams,
  2736. serverTunnelStatParams)
  2737. if sshClient.isInproxyTunnelProtocol {
  2738. sshClient.peerGeoIPData.SetLogFieldsWithPrefix("", "inproxy_proxy", logFields)
  2739. logFields.Add(
  2740. LogFields(sshClient.handshakeState.inproxyRelayLogFields))
  2741. }
  2742. // new_tactics_tag indicates that the handshake returned new tactics.
  2743. if sshClient.handshakeState.newTacticsTag != "" {
  2744. logFields["new_tactics_tag"] = sshClient.handshakeState.newTacticsTag
  2745. }
  2746. // "relay_protocol" is sent with handshake API parameters. In pre-
  2747. // handshake logTunnel cases, this value is not yet known. As
  2748. // sshClient.tunnelProtocol is authoritative, set this value
  2749. // unconditionally, overwriting any value from handshake.
  2750. logFields["relay_protocol"] = sshClient.tunnelProtocol
  2751. if sshClient.serverPacketManipulation != "" {
  2752. logFields["server_packet_manipulation"] = sshClient.serverPacketManipulation
  2753. }
  2754. if sshClient.sshListener.BPFProgramName != "" {
  2755. logFields["server_bpf"] = sshClient.sshListener.BPFProgramName
  2756. }
  2757. logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession
  2758. logFields["handshake_completed"] = sshClient.handshakeState.completed
  2759. logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp
  2760. logFields["bytes_down_tcp"] = sshClient.tcpTrafficState.bytesDown
  2761. logFields["peak_concurrent_dialing_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentDialingPortForwardCount
  2762. logFields["peak_concurrent_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentPortForwardCount
  2763. logFields["total_port_forward_count_tcp"] = sshClient.tcpTrafficState.totalPortForwardCount
  2764. logFields["bytes_up_udp"] = sshClient.udpTrafficState.bytesUp
  2765. logFields["bytes_down_udp"] = sshClient.udpTrafficState.bytesDown
  2766. // sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful
  2767. logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
  2768. logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
  2769. logFields["total_udpgw_channel_count"] = sshClient.totalUdpgwChannelCount
  2770. logFields["total_packet_tunnel_channel_count"] = sshClient.totalPacketTunnelChannelCount
  2771. logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count
  2772. logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes
  2773. logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2774. logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes
  2775. logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes
  2776. logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count
  2777. logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes
  2778. logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes
  2779. logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
  2780. logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
  2781. if sshClient.destinationBytesMetrics != nil {
  2782. // Only log destination bytes for ASNs that remain enabled in tactics.
  2783. //
  2784. // Any counts accumulated before DestinationBytesMetricsASN[s] changes
  2785. // are lost. At this time we can't change destination byte counting
  2786. // dynamically, after a tactics hot reload, as there may be
  2787. // destination bytes port forwards that were in place before the
  2788. // change, which will continue to count.
  2789. destinationBytesMetricsASNs := []string{}
  2790. destinationBytesMetricsASN := ""
  2791. if sshClient.sshServer.support.ServerTacticsParametersCache != nil {
  2792. // Target this using the client, not peer, GeoIP. In the case of
  2793. // in-proxy tunnel protocols, the client GeoIP fields will be None
  2794. // if the handshake does not complete. In that case, no bytes will
  2795. // have transferred.
  2796. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
  2797. if err == nil && !p.IsNil() {
  2798. destinationBytesMetricsASNs = p.Strings(parameters.DestinationBytesMetricsASNs)
  2799. destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)
  2800. }
  2801. p.Close()
  2802. }
  2803. if destinationBytesMetricsASN != "" {
  2804. // Log any parameters.DestinationBytesMetricsASN data in the
  2805. // legacy log field format.
  2806. destinationBytesMetrics, ok :=
  2807. sshClient.destinationBytesMetrics[destinationBytesMetricsASN]
  2808. if ok {
  2809. bytesUpTCP := destinationBytesMetrics.tcpMetrics.getBytesUp()
  2810. bytesDownTCP := destinationBytesMetrics.tcpMetrics.getBytesDown()
  2811. bytesUpUDP := destinationBytesMetrics.udpMetrics.getBytesUp()
  2812. bytesDownUDP := destinationBytesMetrics.udpMetrics.getBytesDown()
  2813. logFields["dest_bytes_asn"] = destinationBytesMetricsASN
  2814. logFields["dest_bytes"] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
  2815. logFields["dest_bytes_up_tcp"] = bytesUpTCP
  2816. logFields["dest_bytes_down_tcp"] = bytesDownTCP
  2817. logFields["dest_bytes_up_udp"] = bytesUpUDP
  2818. logFields["dest_bytes_down_udp"] = bytesDownUDP
  2819. }
  2820. }
  2821. if len(destinationBytesMetricsASNs) > 0 {
  2822. destBytes := make(map[string]int64)
  2823. destBytesUpTCP := make(map[string]int64)
  2824. destBytesDownTCP := make(map[string]int64)
  2825. destBytesUpUDP := make(map[string]int64)
  2826. destBytesDownUDP := make(map[string]int64)
  2827. for _, ASN := range destinationBytesMetricsASNs {
  2828. destinationBytesMetrics, ok :=
  2829. sshClient.destinationBytesMetrics[ASN]
  2830. if !ok {
  2831. continue
  2832. }
  2833. bytesUpTCP := destinationBytesMetrics.tcpMetrics.getBytesUp()
  2834. bytesDownTCP := destinationBytesMetrics.tcpMetrics.getBytesDown()
  2835. bytesUpUDP := destinationBytesMetrics.udpMetrics.getBytesUp()
  2836. bytesDownUDP := destinationBytesMetrics.udpMetrics.getBytesDown()
  2837. destBytes[ASN] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP
  2838. destBytesUpTCP[ASN] = bytesUpTCP
  2839. destBytesDownTCP[ASN] = bytesDownTCP
  2840. destBytesUpUDP[ASN] = bytesUpUDP
  2841. destBytesDownUDP[ASN] = bytesDownUDP
  2842. }
  2843. logFields["asn_dest_bytes"] = destBytes
  2844. logFields["asn_dest_bytes_up_tcp"] = destBytesUpTCP
  2845. logFields["asn_dest_bytes_down_tcp"] = destBytesDownTCP
  2846. logFields["asn_dest_bytes_up_udp"] = destBytesUpUDP
  2847. logFields["asn_dest_bytes_down_udp"] = destBytesDownUDP
  2848. }
  2849. }
  2850. // Only log fields for peakMetrics when there is data recorded, otherwise
  2851. // omit the field.
  2852. if sshClient.peakMetrics.concurrentProximateAcceptedClients != nil {
  2853. logFields["peak_concurrent_proximate_accepted_clients"] = *sshClient.peakMetrics.concurrentProximateAcceptedClients
  2854. }
  2855. if sshClient.peakMetrics.concurrentProximateEstablishedClients != nil {
  2856. logFields["peak_concurrent_proximate_established_clients"] = *sshClient.peakMetrics.concurrentProximateEstablishedClients
  2857. }
  2858. if sshClient.peakMetrics.TCPPortForwardFailureRate != nil && sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize != nil {
  2859. logFields["peak_tcp_port_forward_failure_rate"] = *sshClient.peakMetrics.TCPPortForwardFailureRate
  2860. logFields["peak_tcp_port_forward_failure_rate_sample_size"] = *sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize
  2861. }
  2862. if sshClient.peakMetrics.DNSFailureRate != nil && sshClient.peakMetrics.DNSFailureRateSampleSize != nil {
  2863. logFields["peak_dns_failure_rate"] = *sshClient.peakMetrics.DNSFailureRate
  2864. logFields["peak_dns_failure_rate_sample_size"] = *sshClient.peakMetrics.DNSFailureRateSampleSize
  2865. }
  2866. // Pre-calculate a total-tunneled-bytes field. This total is used
  2867. // extensively in analytics and is more performant when pre-calculated.
  2868. logFields["bytes"] = sshClient.tcpTrafficState.bytesUp +
  2869. sshClient.tcpTrafficState.bytesDown +
  2870. sshClient.udpTrafficState.bytesUp +
  2871. sshClient.udpTrafficState.bytesDown
  2872. // Merge in additional metrics from the optional metrics source
  2873. for _, metrics := range additionalMetrics {
  2874. for name, value := range metrics {
  2875. // Don't overwrite any basic fields
  2876. if logFields[name] == nil {
  2877. logFields[name] = value
  2878. }
  2879. }
  2880. }
  2881. if sshClient.additionalTransportData != nil &&
  2882. sshClient.additionalTransportData.steeringIP != "" {
  2883. logFields["relayed_steering_ip"] = sshClient.additionalTransportData.steeringIP
  2884. }
  2885. // Retain lock when invoking LogRawFieldsWithTimestamp to block any
  2886. // concurrent writes to variables referenced by logFields.
  2887. log.LogRawFieldsWithTimestamp(logFields)
  2888. sshClient.Unlock()
  2889. }
  2890. var blocklistHitsStatParams = []requestParamSpec{
  2891. {"propagation_channel_id", isHexDigits, 0},
  2892. {"sponsor_id", isHexDigits, 0},
  2893. {"client_version", isIntString, requestParamLogStringAsInt},
  2894. {"client_platform", isClientPlatform, 0},
  2895. {"client_features", isAnyString, requestParamOptional | requestParamArray},
  2896. {"client_build_rev", isHexDigits, requestParamOptional},
  2897. {"device_region", isAnyString, requestParamOptional},
  2898. {"device_location", isGeoHashString, requestParamOptional},
  2899. {"egress_region", isRegionCode, requestParamOptional},
  2900. {"last_connected", isLastConnected, requestParamOptional},
  2901. }
  2902. func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []BlocklistTag) {
  2903. sshClient.Lock()
  2904. // Log this using the client, not peer, GeoIP. In the case of in-proxy
  2905. // tunnel protocols, the client GeoIP fields will be None if the
  2906. // handshake does not complete. In that case, no port forwarding will
  2907. // occur and there will not be any blocklist hits.
  2908. logFields := getRequestLogFields(
  2909. "server_blocklist_hit",
  2910. "",
  2911. sshClient.sessionID,
  2912. sshClient.clientGeoIPData,
  2913. sshClient.handshakeState.authorizedAccessTypes,
  2914. sshClient.handshakeState.apiParams,
  2915. blocklistHitsStatParams)
  2916. // Note: see comment in logTunnel regarding unlock and concurrent access.
  2917. sshClient.Unlock()
  2918. for _, tag := range tags {
  2919. if IP != nil {
  2920. logFields["blocklist_ip_address"] = IP.String()
  2921. }
  2922. if domain != "" {
  2923. logFields["blocklist_domain"] = domain
  2924. }
  2925. logFields["blocklist_source"] = tag.Source
  2926. logFields["blocklist_subject"] = tag.Subject
  2927. log.LogRawFieldsWithTimestamp(logFields)
  2928. }
  2929. }
  2930. func (sshClient *sshClient) runOSLSender() {
  2931. for {
  2932. // Await a signal that there are SLOKs to send
  2933. // TODO: use reflect.SelectCase, and optionally await timer here?
  2934. select {
  2935. case <-sshClient.signalIssueSLOKs:
  2936. case <-sshClient.runCtx.Done():
  2937. return
  2938. }
  2939. retryDelay := SSH_SEND_OSL_INITIAL_RETRY_DELAY
  2940. for {
  2941. err := sshClient.sendOSLRequest()
  2942. if err == nil {
  2943. break
  2944. }
  2945. if !isExpectedTunnelIOError(err) {
  2946. log.WithTraceFields(LogFields{"error": err}).Warning("sendOSLRequest failed")
  2947. }
  2948. // If the request failed, retry after a delay (with exponential backoff)
  2949. // or when signaled that there are additional SLOKs to send
  2950. retryTimer := time.NewTimer(retryDelay)
  2951. select {
  2952. case <-retryTimer.C:
  2953. case <-sshClient.signalIssueSLOKs:
  2954. case <-sshClient.runCtx.Done():
  2955. retryTimer.Stop()
  2956. return
  2957. }
  2958. retryTimer.Stop()
  2959. retryDelay *= SSH_SEND_OSL_RETRY_FACTOR
  2960. }
  2961. }
  2962. }
  2963. // sendOSLRequest will invoke osl.GetSeedPayload to issue SLOKs and
  2964. // generate a payload, and send an OSL request to the client when
  2965. // there are new SLOKs in the payload.
  2966. func (sshClient *sshClient) sendOSLRequest() error {
  2967. seedPayload := sshClient.getOSLSeedPayload()
  2968. // Don't send when no SLOKs. This will happen when signalIssueSLOKs
  2969. // is received but no new SLOKs are issued.
  2970. if len(seedPayload.SLOKs) == 0 {
  2971. return nil
  2972. }
  2973. oslRequest := protocol.OSLRequest{
  2974. SeedPayload: seedPayload,
  2975. }
  2976. requestPayload, err := json.Marshal(oslRequest)
  2977. if err != nil {
  2978. return errors.Trace(err)
  2979. }
  2980. ok, _, err := sshClient.sshConn.SendRequest(
  2981. protocol.PSIPHON_API_OSL_REQUEST_NAME,
  2982. true,
  2983. requestPayload)
  2984. if err != nil {
  2985. return errors.Trace(err)
  2986. }
  2987. if !ok {
  2988. return errors.TraceNew("client rejected request")
  2989. }
  2990. sshClient.clearOSLSeedPayload()
  2991. return nil
  2992. }
  2993. // runAlertSender dequeues and sends alert requests to the client. As these
  2994. // alerts are informational, there is no retry logic and no SSH client
  2995. // acknowledgement (wantReply) is requested. This worker scheme allows
  2996. // nonconcurrent components including udpgw and packet tunnel to enqueue
  2997. // alerts without blocking their traffic processing.
  2998. func (sshClient *sshClient) runAlertSender() {
  2999. for {
  3000. select {
  3001. case <-sshClient.runCtx.Done():
  3002. return
  3003. case request := <-sshClient.sendAlertRequests:
  3004. payload, err := json.Marshal(request)
  3005. if err != nil {
  3006. log.WithTraceFields(LogFields{"error": err}).Warning("Marshal failed")
  3007. break
  3008. }
  3009. _, _, err = sshClient.sshConn.SendRequest(
  3010. protocol.PSIPHON_API_ALERT_REQUEST_NAME,
  3011. false,
  3012. payload)
  3013. if err != nil && !isExpectedTunnelIOError(err) {
  3014. log.WithTraceFields(LogFields{"error": err}).Warning("SendRequest failed")
  3015. break
  3016. }
  3017. sshClient.Lock()
  3018. sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] = true
  3019. sshClient.Unlock()
  3020. }
  3021. }
  3022. }
  3023. // enqueueAlertRequest enqueues an alert request to be sent to the client.
  3024. // Only one request is sent per tunnel per protocol.AlertRequest value;
  3025. // subsequent alerts with the same value are dropped. enqueueAlertRequest will
  3026. // not block until the queue exceeds ALERT_REQUEST_QUEUE_BUFFER_SIZE.
  3027. func (sshClient *sshClient) enqueueAlertRequest(request protocol.AlertRequest) {
  3028. sshClient.Lock()
  3029. if sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] {
  3030. sshClient.Unlock()
  3031. return
  3032. }
  3033. sshClient.Unlock()
  3034. select {
  3035. case <-sshClient.runCtx.Done():
  3036. case sshClient.sendAlertRequests <- request:
  3037. }
  3038. }
  3039. func (sshClient *sshClient) enqueueDisallowedTrafficAlertRequest() {
  3040. reason := protocol.PSIPHON_API_ALERT_DISALLOWED_TRAFFIC
  3041. actionURLs := sshClient.getAlertActionURLs(reason)
  3042. sshClient.enqueueAlertRequest(
  3043. protocol.AlertRequest{
  3044. Reason: reason,
  3045. ActionURLs: actionURLs,
  3046. })
  3047. }
  3048. func (sshClient *sshClient) enqueueUnsafeTrafficAlertRequest(tags []BlocklistTag) {
  3049. reason := protocol.PSIPHON_API_ALERT_UNSAFE_TRAFFIC
  3050. actionURLs := sshClient.getAlertActionURLs(reason)
  3051. for _, tag := range tags {
  3052. sshClient.enqueueAlertRequest(
  3053. protocol.AlertRequest{
  3054. Reason: reason,
  3055. Subject: tag.Subject,
  3056. ActionURLs: actionURLs,
  3057. })
  3058. }
  3059. }
  3060. func (sshClient *sshClient) getAlertActionURLs(alertReason string) []string {
  3061. sshClient.Lock()
  3062. sponsorID, _ := getStringRequestParam(
  3063. sshClient.handshakeState.apiParams, "sponsor_id")
  3064. clientGeoIPData := sshClient.clientGeoIPData
  3065. deviceRegion := sshClient.handshakeState.deviceRegion
  3066. sshClient.Unlock()
  3067. return sshClient.sshServer.support.PsinetDatabase.GetAlertActionURLs(
  3068. alertReason,
  3069. sponsorID,
  3070. clientGeoIPData.Country,
  3071. clientGeoIPData.ASN,
  3072. deviceRegion)
  3073. }
  3074. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessage string) {
  3075. // We always return the reject reason "Prohibited":
  3076. // - Traffic rules and connection limits may prohibit the connection.
  3077. // - External firewall rules may prohibit the connection, and this is not currently
  3078. // distinguishable from other failure modes.
  3079. // - We limit the failure information revealed to the client.
  3080. reason := ssh.Prohibited
  3081. // Note: Debug level, as logMessage may contain user traffic destination address information
  3082. if IsLogLevelDebug() {
  3083. log.WithTraceFields(
  3084. LogFields{
  3085. "channelType": newChannel.ChannelType(),
  3086. "logMessage": logMessage,
  3087. "rejectReason": reason.String(),
  3088. }).Debug("reject new channel")
  3089. }
  3090. // Note: logMessage is internal, for logging only; just the reject reason is sent to the client.
  3091. _ = newChannel.Reject(reason, reason.String())
  3092. }
  3093. // setHandshakeState sets the handshake state -- that it completed and
  3094. // what parameters were passed -- in sshClient. This state is used for allowing
  3095. // port forwards and for future traffic rule selection. setHandshakeState
  3096. // also triggers an immediate traffic rule re-selection, as the rules selected
  3097. // upon tunnel establishment may no longer apply now that handshake values are
  3098. // set.
  3099. //
  3100. // The authorizations received from the client handshake are verified and the
  3101. // resulting list of authorized access types are applied to the client's tunnel
  3102. // and traffic rules.
  3103. //
  3104. // A list of active authorization IDs, authorized access types, and traffic
  3105. // rate limits are returned for responding to the client and logging.
  3106. //
  3107. // All slices in the returnd handshakeStateInfo are read-only, as readers may
  3108. // reference slice contents outside of locks.
  3109. func (sshClient *sshClient) setHandshakeState(
  3110. state handshakeState,
  3111. authorizations []string) (*handshakeStateInfo, error) {
  3112. sshClient.Lock()
  3113. completed := sshClient.handshakeState.completed
  3114. if !completed {
  3115. sshClient.handshakeState = state
  3116. if sshClient.isInproxyTunnelProtocol {
  3117. // Set the client GeoIP data to the value obtained using the
  3118. // original client IP, from the broker, in the handshake. Also
  3119. // update the GeoIP session hash to use the client GeoIP data.
  3120. sshClient.clientGeoIPData =
  3121. sshClient.handshakeState.inproxyClientGeoIPData
  3122. sshClient.sshServer.setGeoIPSessionCache(
  3123. sshClient.sessionID, sshClient.clientGeoIPData)
  3124. }
  3125. }
  3126. sshClient.Unlock()
  3127. // Client must only perform one handshake
  3128. if completed {
  3129. return nil, errors.TraceNew("handshake already completed")
  3130. }
  3131. if sshClient.isInproxyTunnelProtocol {
  3132. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
  3133. if err != nil {
  3134. return nil, errors.Trace(err)
  3135. }
  3136. // Skip check if no tactics are configured.
  3137. //
  3138. // Disconnect immediately if the tactics for the client restricts usage
  3139. // of the provider ID with inproxy protocols. The probability may be
  3140. // used to influence usage of a given provider with inproxy protocols;
  3141. // but when only that provider works for a given client, and the
  3142. // probability is less than 1.0, the client can retry until it gets a
  3143. // successful coin flip.
  3144. //
  3145. // Clients will also skip inproxy protocol candidates with restricted
  3146. // provider IDs.
  3147. // The client-side probability,
  3148. // RestrictInproxyProviderIDsClientProbability, is applied
  3149. // independently of the server-side coin flip here.
  3150. //
  3151. // At this stage, GeoIP tactics filters are active, but handshake API
  3152. // parameters are not.
  3153. //
  3154. // See the comment in server.LoadConfig regarding provider ID
  3155. // limitations.
  3156. if !p.IsNil() &&
  3157. common.ContainsAny(
  3158. p.KeyStrings(parameters.RestrictInproxyProviderRegions,
  3159. sshClient.sshServer.support.Config.GetProviderID()),
  3160. []string{"", sshClient.sshServer.support.Config.GetRegion()}) {
  3161. if p.WeightedCoinFlip(
  3162. parameters.RestrictInproxyProviderIDsServerProbability) {
  3163. return nil, errRestrictedProvider
  3164. }
  3165. }
  3166. }
  3167. // Verify the authorizations submitted by the client. Verified, active
  3168. // (non-expired) access types will be available for traffic rules
  3169. // filtering.
  3170. //
  3171. // When an authorization is active but expires while the client is
  3172. // connected, the client is disconnected to ensure the access is reset.
  3173. // This is implemented by setting a timer to perform the disconnect at the
  3174. // expiry time of the soonest expiring authorization.
  3175. //
  3176. // sshServer.authorizationSessionIDs tracks the unique mapping of active
  3177. // authorization IDs to client session IDs and is used to detect and
  3178. // prevent multiple malicious clients from reusing a single authorization
  3179. // (within the scope of this server).
  3180. // authorizationIDs and authorizedAccessTypes are returned to the client
  3181. // and logged, respectively; initialize to empty lists so the
  3182. // protocol/logs don't need to handle 'null' values.
  3183. authorizationIDs := make([]string, 0)
  3184. authorizedAccessTypes := make([]string, 0)
  3185. var stopTime time.Time
  3186. for i, authorization := range authorizations {
  3187. // This sanity check mitigates malicious clients causing excess CPU use.
  3188. if i >= MAX_AUTHORIZATIONS {
  3189. log.WithTrace().Warning("too many authorizations")
  3190. break
  3191. }
  3192. verifiedAuthorization, err := accesscontrol.VerifyAuthorization(
  3193. &sshClient.sshServer.support.Config.AccessControlVerificationKeyRing,
  3194. authorization)
  3195. if err != nil {
  3196. log.WithTraceFields(
  3197. LogFields{"error": err}).Warning("verify authorization failed")
  3198. continue
  3199. }
  3200. authorizationID := base64.StdEncoding.EncodeToString(verifiedAuthorization.ID)
  3201. if common.Contains(authorizedAccessTypes, verifiedAuthorization.AccessType) {
  3202. log.WithTraceFields(
  3203. LogFields{"accessType": verifiedAuthorization.AccessType}).Warning("duplicate authorization access type")
  3204. continue
  3205. }
  3206. authorizationIDs = append(authorizationIDs, authorizationID)
  3207. authorizedAccessTypes = append(authorizedAccessTypes, verifiedAuthorization.AccessType)
  3208. if stopTime.IsZero() || stopTime.After(verifiedAuthorization.Expires) {
  3209. stopTime = verifiedAuthorization.Expires
  3210. }
  3211. }
  3212. // Associate all verified authorizationIDs with this client's session ID.
  3213. // Handle cases where previous associations exist:
  3214. //
  3215. // - Multiple malicious clients reusing a single authorization. In this
  3216. // case, authorizations are revoked from the previous client.
  3217. //
  3218. // - The client reconnected with a new session ID due to user toggling.
  3219. // This case is expected due to server affinity. This cannot be
  3220. // distinguished from the previous case and the same action is taken;
  3221. // this will have no impact on a legitimate client as the previous
  3222. // session is dangling.
  3223. //
  3224. // - The client automatically reconnected with the same session ID. This
  3225. // case is not expected as sshServer.registerEstablishedClient
  3226. // synchronously calls sshClient.releaseAuthorizations; as a safe guard,
  3227. // this case is distinguished and no revocation action is taken.
  3228. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  3229. for _, authorizationID := range authorizationIDs {
  3230. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  3231. if ok && sessionID != sshClient.sessionID {
  3232. logFields := LogFields{
  3233. "event_name": "irregular_tunnel",
  3234. "tunnel_error": "duplicate active authorization",
  3235. "duplicate_authorization_id": authorizationID,
  3236. }
  3237. // Log this using client, not peer, GeoIP data. In the case of
  3238. // in-proxy tunnel protocols, the client GeoIP fields will be None
  3239. // if a handshake does not complete. However, presense of a
  3240. // (duplicate) authorization implies that the handshake completed.
  3241. sshClient.getClientGeoIPData().SetClientLogFields(logFields)
  3242. duplicateClientGeoIPData := sshClient.sshServer.getGeoIPSessionCache(sessionID)
  3243. if duplicateClientGeoIPData != sshClient.getClientGeoIPData() {
  3244. duplicateClientGeoIPData.SetClientLogFieldsWithPrefix("duplicate_authorization_", logFields)
  3245. }
  3246. log.LogRawFieldsWithTimestamp(logFields)
  3247. // Invoke asynchronously to avoid deadlocks.
  3248. // TODO: invoke only once for each distinct sessionID?
  3249. go sshClient.sshServer.revokeClientAuthorizations(sessionID)
  3250. }
  3251. sshClient.sshServer.authorizationSessionIDs[authorizationID] = sshClient.sessionID
  3252. }
  3253. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  3254. if len(authorizationIDs) > 0 {
  3255. sshClient.Lock()
  3256. // Make the authorizedAccessTypes available for traffic rules filtering.
  3257. sshClient.handshakeState.activeAuthorizationIDs = authorizationIDs
  3258. sshClient.handshakeState.authorizedAccessTypes = authorizedAccessTypes
  3259. // On exit, sshClient.runTunnel will call releaseAuthorizations, which
  3260. // will release the authorization IDs so the client can reconnect and
  3261. // present the same authorizations again. sshClient.runTunnel will
  3262. // also cancel the stopTimer in case it has not yet fired.
  3263. // Note: termination of the stopTimer goroutine is not synchronized.
  3264. sshClient.releaseAuthorizations = func() {
  3265. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  3266. for _, authorizationID := range authorizationIDs {
  3267. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  3268. if ok && sessionID == sshClient.sessionID {
  3269. delete(sshClient.sshServer.authorizationSessionIDs, authorizationID)
  3270. }
  3271. }
  3272. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  3273. }
  3274. sshClient.stopTimer = time.AfterFunc(
  3275. time.Until(stopTime),
  3276. func() {
  3277. sshClient.stop()
  3278. })
  3279. sshClient.Unlock()
  3280. }
  3281. upstreamBytesPerSecond, downstreamBytesPerSecond := sshClient.setTrafficRules()
  3282. sshClient.setOSLConfig()
  3283. // Set destination bytes metrics.
  3284. //
  3285. // Limitation: this is a one-time operation and doesn't get reset when
  3286. // tactics are hot-reloaded. This allows us to simply retain any
  3287. // destination byte counts accumulated and eventually log in
  3288. // server_tunnel, without having to deal with a destination change
  3289. // mid-tunnel. As typical tunnels are short, and destination changes can
  3290. // be applied gradually, handling mid-tunnel changes is not a priority.
  3291. sshClient.setDestinationBytesMetrics()
  3292. info := &handshakeStateInfo{
  3293. activeAuthorizationIDs: authorizationIDs,
  3294. authorizedAccessTypes: authorizedAccessTypes,
  3295. upstreamBytesPerSecond: upstreamBytesPerSecond,
  3296. downstreamBytesPerSecond: downstreamBytesPerSecond,
  3297. }
  3298. // Relay the steering IP to the API handshake handler.
  3299. if sshClient.additionalTransportData != nil {
  3300. info.steeringIP = sshClient.additionalTransportData.steeringIP
  3301. }
  3302. return info, nil
  3303. }
  3304. // getHandshaked returns whether the client has completed a handshake API
  3305. // request and whether the traffic rules that were selected after the
  3306. // handshake immediately exhaust the client.
  3307. //
  3308. // When the client is immediately exhausted it will be closed; but this
  3309. // takes effect asynchronously. The "exhausted" return value is used to
  3310. // prevent API requests by clients that will close.
  3311. func (sshClient *sshClient) getHandshaked() (bool, bool) {
  3312. sshClient.Lock()
  3313. defer sshClient.Unlock()
  3314. completed := sshClient.handshakeState.completed
  3315. exhausted := false
  3316. // Notes:
  3317. // - "Immediately exhausted" is when CloseAfterExhausted is set and
  3318. // either ReadUnthrottledBytes or WriteUnthrottledBytes starts from
  3319. // 0, so no bytes would be read or written. This check does not
  3320. // examine whether 0 bytes _remain_ in the ThrottledConn.
  3321. // - This check is made against the current traffic rules, which
  3322. // could have changed in a hot reload since the handshake.
  3323. if completed &&
  3324. *sshClient.trafficRules.RateLimits.CloseAfterExhausted &&
  3325. (*sshClient.trafficRules.RateLimits.ReadUnthrottledBytes == 0 ||
  3326. *sshClient.trafficRules.RateLimits.WriteUnthrottledBytes == 0) {
  3327. exhausted = true
  3328. }
  3329. return completed, exhausted
  3330. }
  3331. func (sshClient *sshClient) getDisableDiscovery() bool {
  3332. sshClient.Lock()
  3333. defer sshClient.Unlock()
  3334. return *sshClient.trafficRules.DisableDiscovery
  3335. }
  3336. func (sshClient *sshClient) updateAPIParameters(
  3337. apiParams common.APIParameters) {
  3338. sshClient.Lock()
  3339. defer sshClient.Unlock()
  3340. // Only update after handshake has initialized API params.
  3341. if !sshClient.handshakeState.completed {
  3342. return
  3343. }
  3344. for name, value := range apiParams {
  3345. sshClient.handshakeState.apiParams[name] = value
  3346. }
  3347. }
  3348. func (sshClient *sshClient) acceptDomainBytes() bool {
  3349. sshClient.Lock()
  3350. defer sshClient.Unlock()
  3351. // When the domain bytes checksum differs from the checksum sent to the
  3352. // client in the handshake response, the psinet regex configuration has
  3353. // changed. In this case, drop the stats so we don't continue to record
  3354. // stats as previously configured.
  3355. //
  3356. // Limitations:
  3357. // - The checksum comparison may result in dropping some stats for a
  3358. // domain that remains in the new configuration.
  3359. // - We don't push new regexs to the clients, so clients that remain
  3360. // connected will continue to send stats that will be dropped; and
  3361. // those clients will not send stats as newly configured until after
  3362. // reconnecting.
  3363. // - Due to the design of
  3364. // transferstats.ReportRecentBytesTransferredForServer in the client,
  3365. // the client may accumulate stats, reconnect before its next status
  3366. // request, get a new regex configuration, and then send the previously
  3367. // accumulated stats in its next status request. The checksum scheme
  3368. // won't prevent the reporting of those stats.
  3369. sponsorID, _ := getStringRequestParam(sshClient.handshakeState.apiParams, "sponsor_id")
  3370. domainBytesChecksum := sshClient.sshServer.support.PsinetDatabase.GetDomainBytesChecksum(sponsorID)
  3371. return bytes.Equal(sshClient.handshakeState.domainBytesChecksum, domainBytesChecksum)
  3372. }
  3373. // setOSLConfig resets the client's OSL seed state based on the latest OSL config
  3374. // As sshClient.oslClientSeedState may be reset by a concurrent goroutine,
  3375. // oslClientSeedState must only be accessed within the sshClient mutex.
  3376. func (sshClient *sshClient) setOSLConfig() {
  3377. sshClient.Lock()
  3378. defer sshClient.Unlock()
  3379. propagationChannelID, err := getStringRequestParam(
  3380. sshClient.handshakeState.apiParams, "propagation_channel_id")
  3381. if err != nil {
  3382. // This should not fail as long as client has sent valid handshake
  3383. return
  3384. }
  3385. // Use a cached seed state if one is found for the client's
  3386. // session ID. This enables resuming progress made in a previous
  3387. // tunnel.
  3388. // Note: go-cache is already concurency safe; the additional mutex
  3389. // is necessary to guarantee that Get/Delete is atomic; although in
  3390. // practice no two concurrent clients should ever supply the same
  3391. // session ID.
  3392. sshClient.sshServer.oslSessionCacheMutex.Lock()
  3393. oslClientSeedState, found := sshClient.sshServer.oslSessionCache.Get(sshClient.sessionID)
  3394. if found {
  3395. sshClient.sshServer.oslSessionCache.Delete(sshClient.sessionID)
  3396. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3397. sshClient.oslClientSeedState = oslClientSeedState.(*osl.ClientSeedState)
  3398. sshClient.oslClientSeedState.Resume(sshClient.signalIssueSLOKs)
  3399. return
  3400. }
  3401. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3402. // Two limitations when setOSLConfig() is invoked due to an
  3403. // OSL config hot reload:
  3404. //
  3405. // 1. any partial progress towards SLOKs is lost.
  3406. //
  3407. // 2. all existing osl.ClientSeedPortForwards for existing
  3408. // port forwards will not send progress to the new client
  3409. // seed state.
  3410. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  3411. // protocols, the client GeoIP fields will be populated using the
  3412. // original client IP already received, from the broker, in the handshake.
  3413. sshClient.oslClientSeedState = sshClient.sshServer.support.OSLConfig.NewClientSeedState(
  3414. sshClient.clientGeoIPData.Country,
  3415. propagationChannelID,
  3416. sshClient.signalIssueSLOKs)
  3417. }
  3418. // newClientSeedPortForward will return nil when no seeding is
  3419. // associated with the specified ipAddress.
  3420. func (sshClient *sshClient) newClientSeedPortForward(IPAddress net.IP) *osl.ClientSeedPortForward {
  3421. sshClient.Lock()
  3422. defer sshClient.Unlock()
  3423. // Will not be initialized before handshake.
  3424. if sshClient.oslClientSeedState == nil {
  3425. return nil
  3426. }
  3427. lookupASN := func(IP net.IP) string {
  3428. // TODO: there are potentially multiple identical geo IP lookups per new
  3429. // port forward and flow, cache and use result of first lookup.
  3430. return sshClient.sshServer.support.GeoIPService.LookupISPForIP(IP).ASN
  3431. }
  3432. return sshClient.oslClientSeedState.NewClientSeedPortForward(IPAddress, lookupASN)
  3433. }
  3434. // getOSLSeedPayload returns a payload containing all seeded SLOKs for
  3435. // this client's session.
  3436. func (sshClient *sshClient) getOSLSeedPayload() *osl.SeedPayload {
  3437. sshClient.Lock()
  3438. defer sshClient.Unlock()
  3439. // Will not be initialized before handshake.
  3440. if sshClient.oslClientSeedState == nil {
  3441. return &osl.SeedPayload{SLOKs: make([]*osl.SLOK, 0)}
  3442. }
  3443. return sshClient.oslClientSeedState.GetSeedPayload()
  3444. }
  3445. func (sshClient *sshClient) clearOSLSeedPayload() {
  3446. sshClient.Lock()
  3447. defer sshClient.Unlock()
  3448. sshClient.oslClientSeedState.ClearSeedPayload()
  3449. }
  3450. func (sshClient *sshClient) setDestinationBytesMetrics() {
  3451. sshClient.Lock()
  3452. defer sshClient.Unlock()
  3453. // Limitation: the server-side tactics cache is used to avoid the overhead
  3454. // of an additional tactics filtering per tunnel. As this cache is
  3455. // designed for GeoIP filtering only, handshake API parameters are not
  3456. // applied to tactics filtering in this case.
  3457. tacticsCache := sshClient.sshServer.support.ServerTacticsParametersCache
  3458. if tacticsCache == nil {
  3459. return
  3460. }
  3461. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  3462. // protocols, the client GeoIP fields will be populated using the
  3463. // original client IP already received, from the broker, in the handshake.
  3464. p, err := tacticsCache.Get(sshClient.clientGeoIPData)
  3465. if err != nil {
  3466. log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
  3467. return
  3468. }
  3469. if p.IsNil() {
  3470. return
  3471. }
  3472. ASNs := p.Strings(parameters.DestinationBytesMetricsASNs)
  3473. // Merge in any legacy parameters.DestinationBytesMetricsASN
  3474. // configuration. Data for this target will be logged using the legacy
  3475. // log field format; see logTunnel. If an ASN is in _both_ configuration
  3476. // parameters, its data will be logged in both log field formats.
  3477. ASN := p.String(parameters.DestinationBytesMetricsASN)
  3478. if len(ASNs) == 0 && ASN == "" {
  3479. return
  3480. }
  3481. sshClient.destinationBytesMetrics = make(map[string]*protocolDestinationBytesMetrics)
  3482. for _, ASN := range ASNs {
  3483. if ASN != "" {
  3484. sshClient.destinationBytesMetrics[ASN] = &protocolDestinationBytesMetrics{}
  3485. }
  3486. }
  3487. if ASN != "" {
  3488. sshClient.destinationBytesMetrics[ASN] = &protocolDestinationBytesMetrics{}
  3489. }
  3490. }
  3491. func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType int, IPAddress net.IP) *destinationBytesMetrics {
  3492. sshClient.Lock()
  3493. defer sshClient.Unlock()
  3494. if sshClient.destinationBytesMetrics == nil {
  3495. return nil
  3496. }
  3497. destinationASN := sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN
  3498. // Future enhancement: for 5 or fewer ASNs, iterate over a slice instead
  3499. // of using a map? See, for example, stringLookupThreshold in
  3500. // common/tactics.
  3501. metrics, ok := sshClient.destinationBytesMetrics[destinationASN]
  3502. if !ok {
  3503. return nil
  3504. }
  3505. if portForwardType == portForwardTypeTCP {
  3506. return &metrics.tcpMetrics
  3507. }
  3508. return &metrics.udpMetrics
  3509. }
  3510. func (sshClient *sshClient) getActivityUpdaters(portForwardType int, IPAddress net.IP) []common.ActivityUpdater {
  3511. var updaters []common.ActivityUpdater
  3512. clientSeedPortForward := sshClient.newClientSeedPortForward(IPAddress)
  3513. if clientSeedPortForward != nil {
  3514. updaters = append(updaters, clientSeedPortForward)
  3515. }
  3516. destinationBytesMetrics := sshClient.newDestinationBytesMetricsUpdater(portForwardType, IPAddress)
  3517. if destinationBytesMetrics != nil {
  3518. updaters = append(updaters, destinationBytesMetrics)
  3519. }
  3520. return updaters
  3521. }
  3522. // setTrafficRules resets the client's traffic rules based on the latest server config
  3523. // and client properties. As sshClient.trafficRules may be reset by a concurrent
  3524. // goroutine, trafficRules must only be accessed within the sshClient mutex.
  3525. func (sshClient *sshClient) setTrafficRules() (int64, int64) {
  3526. sshClient.Lock()
  3527. defer sshClient.Unlock()
  3528. isFirstTunnelInSession := sshClient.isFirstTunnelInSession &&
  3529. sshClient.handshakeState.establishedTunnelsCount == 0
  3530. // In the case of in-proxy tunnel protocols, the client GeoIP data is None
  3531. // until the handshake completes. Pre-handhake, the rate limit is
  3532. // determined by EstablishmentRead/WriteBytesPerSecond, which default to
  3533. // unthrottled, the recommended setting; in addition, no port forwards
  3534. // are permitted until after the handshake completes, at which time
  3535. // setTrafficRules will be called again with the client GeoIP data
  3536. // populated using the original client IP received from the in-proxy
  3537. // broker.
  3538. sshClient.trafficRules = sshClient.sshServer.support.TrafficRulesSet.GetTrafficRules(
  3539. isFirstTunnelInSession,
  3540. sshClient.tunnelProtocol,
  3541. sshClient.clientGeoIPData,
  3542. sshClient.handshakeState)
  3543. if sshClient.throttledConn != nil {
  3544. // Any existing throttling state is reset.
  3545. sshClient.throttledConn.SetLimits(
  3546. sshClient.trafficRules.RateLimits.CommonRateLimits(
  3547. sshClient.handshakeState.completed))
  3548. }
  3549. return *sshClient.trafficRules.RateLimits.ReadBytesPerSecond,
  3550. *sshClient.trafficRules.RateLimits.WriteBytesPerSecond
  3551. }
  3552. func (sshClient *sshClient) rateLimits() common.RateLimits {
  3553. sshClient.Lock()
  3554. defer sshClient.Unlock()
  3555. return sshClient.trafficRules.RateLimits.CommonRateLimits(
  3556. sshClient.handshakeState.completed)
  3557. }
  3558. func (sshClient *sshClient) idleTCPPortForwardTimeout() time.Duration {
  3559. sshClient.Lock()
  3560. defer sshClient.Unlock()
  3561. return time.Duration(*sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds) * time.Millisecond
  3562. }
  3563. func (sshClient *sshClient) idleUDPPortForwardTimeout() time.Duration {
  3564. sshClient.Lock()
  3565. defer sshClient.Unlock()
  3566. return time.Duration(*sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds) * time.Millisecond
  3567. }
  3568. func (sshClient *sshClient) setTCPPortForwardDialingAvailableSignal(signal context.CancelFunc) {
  3569. sshClient.Lock()
  3570. defer sshClient.Unlock()
  3571. sshClient.tcpPortForwardDialingAvailableSignal = signal
  3572. }
  // Port forward types, passed as the portForwardType argument to
// isPortForwardPermitted, getActivityUpdaters and
// newDestinationBytesMetricsUpdater.
const (
	portForwardTypeTCP = 0
	portForwardTypeUDP = 1
)
  3577. func (sshClient *sshClient) isPortForwardPermitted(
  3578. portForwardType int,
  3579. remoteIP net.IP,
  3580. port int) bool {
  3581. // Disallow connection to bogons.
  3582. //
  3583. // As a security measure, this is a failsafe. The server should be run on a
  3584. // host with correctly configured firewall rules.
  3585. //
  3586. // This check also avoids spurious disallowed traffic alerts for destinations
  3587. // that are impossible to reach.
  3588. if !sshClient.sshServer.support.Config.AllowBogons && common.IsBogon(remoteIP) {
  3589. return false
  3590. }
  3591. // Blocklist check.
  3592. //
  3593. // Limitation: isPortForwardPermitted is not called in transparent DNS
  3594. // forwarding cases. As the destination IP address is rewritten in these
  3595. // cases, a blocklist entry won't be dialed in any case. However, no logs
  3596. // will be recorded.
  3597. if !sshClient.isIPPermitted(remoteIP) {
  3598. return false
  3599. }
  3600. // Don't lock before calling logBlocklistHits.
  3601. // Unlock before calling enqueueDisallowedTrafficAlertRequest/log.
  3602. sshClient.Lock()
  3603. allowed := true
  3604. // Client must complete handshake before port forwards are permitted.
  3605. if !sshClient.handshakeState.completed {
  3606. allowed = false
  3607. }
  3608. if allowed {
  3609. // Traffic rules checks.
  3610. switch portForwardType {
  3611. case portForwardTypeTCP:
  3612. if !sshClient.trafficRules.AllowTCPPort(
  3613. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  3614. allowed = false
  3615. }
  3616. case portForwardTypeUDP:
  3617. if !sshClient.trafficRules.AllowUDPPort(
  3618. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  3619. allowed = false
  3620. }
  3621. }
  3622. }
  3623. sshClient.Unlock()
  3624. if allowed {
  3625. return true
  3626. }
  3627. switch portForwardType {
  3628. case portForwardTypeTCP:
  3629. sshClient.updateQualityMetricsWithTCPRejectedDisallowed()
  3630. case portForwardTypeUDP:
  3631. sshClient.updateQualityMetricsWithUDPRejectedDisallowed()
  3632. }
  3633. sshClient.enqueueDisallowedTrafficAlertRequest()
  3634. if IsLogLevelDebug() {
  3635. log.WithTraceFields(
  3636. LogFields{
  3637. "type": portForwardType,
  3638. "port": port,
  3639. }).Debug("port forward denied by traffic rules")
  3640. }
  3641. return false
  3642. }
  3643. // isDomainPermitted returns true when the specified domain may be resolved
  3644. // and returns false and a reject reason otherwise.
  3645. func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) {
  3646. // We're not doing comprehensive validation, to avoid overhead per port
  3647. // forward. This is a simple sanity check to ensure we don't process
  3648. // blantantly invalid input.
  3649. //
  3650. // TODO: validate with dns.IsDomainName?
  3651. if len(domain) > 255 {
  3652. return false, "invalid domain name"
  3653. }
  3654. // Don't even attempt to resolve the default mDNS top-level domain.
  3655. // Non-default cases won't be caught here but should fail to resolve due
  3656. // to the PreferGo setting in net.Resolver.
  3657. if strings.HasSuffix(domain, ".local") {
  3658. return false, "port forward not permitted"
  3659. }
  3660. tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain)
  3661. if len(tags) > 0 {
  3662. sshClient.logBlocklistHits(nil, domain, tags)
  3663. if sshClient.sshServer.support.Config.BlocklistActive {
  3664. // Actively alert and block
  3665. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  3666. return false, "port forward not permitted"
  3667. }
  3668. }
  3669. return true, ""
  3670. }
  3671. func (sshClient *sshClient) isIPPermitted(remoteIP net.IP) bool {
  3672. tags := sshClient.sshServer.support.Blocklist.LookupIP(remoteIP)
  3673. if len(tags) > 0 {
  3674. sshClient.logBlocklistHits(remoteIP, "", tags)
  3675. if sshClient.sshServer.support.Config.BlocklistActive {
  3676. // Actively alert and block
  3677. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  3678. return false
  3679. }
  3680. }
  3681. return true
  3682. }
  3683. func (sshClient *sshClient) isTCPDialingPortForwardLimitExceeded() bool {
  3684. sshClient.Lock()
  3685. defer sshClient.Unlock()
  3686. state := &sshClient.tcpTrafficState
  3687. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3688. if max > 0 && state.concurrentDialingPortForwardCount >= int64(max) {
  3689. return true
  3690. }
  3691. return false
  3692. }
  3693. func (sshClient *sshClient) getTCPPortForwardQueueSize() int {
  3694. sshClient.Lock()
  3695. defer sshClient.Unlock()
  3696. return *sshClient.trafficRules.MaxTCPPortForwardCount +
  3697. *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3698. }
  3699. func (sshClient *sshClient) getDialTCPPortForwardTimeoutMilliseconds() int {
  3700. sshClient.Lock()
  3701. defer sshClient.Unlock()
  3702. return *sshClient.trafficRules.DialTCPPortForwardTimeoutMilliseconds
  3703. }
  3704. func (sshClient *sshClient) dialingTCPPortForward() {
  3705. sshClient.Lock()
  3706. defer sshClient.Unlock()
  3707. state := &sshClient.tcpTrafficState
  3708. state.concurrentDialingPortForwardCount += 1
  3709. if state.concurrentDialingPortForwardCount > state.peakConcurrentDialingPortForwardCount {
  3710. state.peakConcurrentDialingPortForwardCount = state.concurrentDialingPortForwardCount
  3711. }
  3712. }
  3713. func (sshClient *sshClient) abortedTCPPortForward() {
  3714. sshClient.Lock()
  3715. defer sshClient.Unlock()
  3716. sshClient.tcpTrafficState.concurrentDialingPortForwardCount -= 1
  3717. }
  3718. func (sshClient *sshClient) allocatePortForward(portForwardType int) bool {
  3719. sshClient.Lock()
  3720. defer sshClient.Unlock()
  3721. // Check if at port forward limit. The subsequent counter
  3722. // changes must be atomic with the limit check to ensure
  3723. // the counter never exceeds the limit in the case of
  3724. // concurrent allocations.
  3725. var max int
  3726. var state *trafficState
  3727. if portForwardType == portForwardTypeTCP {
  3728. max = *sshClient.trafficRules.MaxTCPPortForwardCount
  3729. state = &sshClient.tcpTrafficState
  3730. } else {
  3731. max = *sshClient.trafficRules.MaxUDPPortForwardCount
  3732. state = &sshClient.udpTrafficState
  3733. }
  3734. if max > 0 && state.concurrentPortForwardCount >= int64(max) {
  3735. return false
  3736. }
  3737. // Update port forward counters.
  3738. if portForwardType == portForwardTypeTCP {
  3739. // Assumes TCP port forwards called dialingTCPPortForward
  3740. state.concurrentDialingPortForwardCount -= 1
  3741. if sshClient.tcpPortForwardDialingAvailableSignal != nil {
  3742. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  3743. if max <= 0 || state.concurrentDialingPortForwardCount < int64(max) {
  3744. sshClient.tcpPortForwardDialingAvailableSignal()
  3745. }
  3746. }
  3747. }
  3748. state.concurrentPortForwardCount += 1
  3749. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  3750. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  3751. }
  3752. state.totalPortForwardCount += 1
  3753. return true
  3754. }
  3755. // establishedPortForward increments the concurrent port
  3756. // forward counter. closedPortForward decrements it, so it
  3757. // must always be called for each establishedPortForward
  3758. // call.
  3759. //
  3760. // When at the limit of established port forwards, the LRU
  3761. // existing port forward is closed to make way for the newly
  3762. // established one. There can be a minor delay as, in addition
  3763. // to calling Close() on the port forward net.Conn,
  3764. // establishedPortForward waits for the LRU's closedPortForward()
  3765. // call which will decrement the concurrent counter. This
  3766. // ensures all resources associated with the LRU (socket,
  3767. // goroutine) are released or will very soon be released before
  3768. // proceeding.
  3769. func (sshClient *sshClient) establishedPortForward(
  3770. portForwardType int, portForwardLRU *common.LRUConns) {
  3771. // Do not lock sshClient here.
  3772. var state *trafficState
  3773. if portForwardType == portForwardTypeTCP {
  3774. state = &sshClient.tcpTrafficState
  3775. } else {
  3776. state = &sshClient.udpTrafficState
  3777. }
  3778. // When the maximum number of port forwards is already
  3779. // established, close the LRU. CloseOldest will call
  3780. // Close on the port forward net.Conn. Both TCP and
  3781. // UDP port forwards have handler goroutines that may
  3782. // be blocked calling Read on the net.Conn. Close will
  3783. // eventually interrupt the Read and cause the handlers
  3784. // to exit, but not immediately. So the following logic
  3785. // waits for a LRU handler to be interrupted and signal
  3786. // availability.
  3787. //
  3788. // Notes:
  3789. //
  3790. // - the port forward limit can change via a traffic
  3791. // rules hot reload; the condition variable handles
  3792. // this case whereas a channel-based semaphore would
  3793. // not.
  3794. //
  3795. // - if a number of goroutines exceeding the total limit
  3796. // arrive here all concurrently, some CloseOldest() calls
  3797. // will have no effect as there can be less existing port
  3798. // forwards than new ones. In this case, the new port
  3799. // forward will be delayed. This is highly unlikely in
  3800. // practise since UDP calls to establishedPortForward are
  3801. // serialized and TCP calls are limited by the dial
  3802. // queue/count.
  3803. if !sshClient.allocatePortForward(portForwardType) {
  3804. portForwardLRU.CloseOldest()
  3805. if IsLogLevelDebug() {
  3806. log.WithTrace().Debug("closed LRU port forward")
  3807. }
  3808. state.availablePortForwardCond.L.Lock()
  3809. for !sshClient.allocatePortForward(portForwardType) {
  3810. state.availablePortForwardCond.Wait()
  3811. }
  3812. state.availablePortForwardCond.L.Unlock()
  3813. }
  3814. }
  3815. func (sshClient *sshClient) closedPortForward(
  3816. portForwardType int, bytesUp, bytesDown int64) {
  3817. sshClient.Lock()
  3818. var state *trafficState
  3819. if portForwardType == portForwardTypeTCP {
  3820. state = &sshClient.tcpTrafficState
  3821. } else {
  3822. state = &sshClient.udpTrafficState
  3823. }
  3824. state.concurrentPortForwardCount -= 1
  3825. state.bytesUp += bytesUp
  3826. state.bytesDown += bytesDown
  3827. sshClient.Unlock()
  3828. // Signal any goroutine waiting in establishedPortForward
  3829. // that an established port forward slot is available.
  3830. state.availablePortForwardCond.Signal()
  3831. }
  3832. func (sshClient *sshClient) updateQualityMetricsWithDialResult(
  3833. tcpPortForwardDialSuccess bool, dialDuration time.Duration, IP net.IP) {
  3834. sshClient.Lock()
  3835. defer sshClient.Unlock()
  3836. if tcpPortForwardDialSuccess {
  3837. sshClient.qualityMetrics.TCPPortForwardDialedCount += 1
  3838. sshClient.qualityMetrics.TCPPortForwardDialedDuration += dialDuration
  3839. if IP.To4() != nil {
  3840. sshClient.qualityMetrics.TCPIPv4PortForwardDialedCount += 1
  3841. sshClient.qualityMetrics.TCPIPv4PortForwardDialedDuration += dialDuration
  3842. } else if IP != nil {
  3843. sshClient.qualityMetrics.TCPIPv6PortForwardDialedCount += 1
  3844. sshClient.qualityMetrics.TCPIPv6PortForwardDialedDuration += dialDuration
  3845. }
  3846. } else {
  3847. sshClient.qualityMetrics.TCPPortForwardFailedCount += 1
  3848. sshClient.qualityMetrics.TCPPortForwardFailedDuration += dialDuration
  3849. if IP.To4() != nil {
  3850. sshClient.qualityMetrics.TCPIPv4PortForwardFailedCount += 1
  3851. sshClient.qualityMetrics.TCPIPv4PortForwardFailedDuration += dialDuration
  3852. } else if IP != nil {
  3853. sshClient.qualityMetrics.TCPIPv6PortForwardFailedCount += 1
  3854. sshClient.qualityMetrics.TCPIPv6PortForwardFailedDuration += dialDuration
  3855. }
  3856. }
  3857. }
  3858. func (sshClient *sshClient) updateQualityMetricsWithRejectedDialingLimit() {
  3859. sshClient.Lock()
  3860. defer sshClient.Unlock()
  3861. sshClient.qualityMetrics.TCPPortForwardRejectedDialingLimitCount += 1
  3862. }
  3863. func (sshClient *sshClient) updateQualityMetricsWithTCPRejectedDisallowed() {
  3864. sshClient.Lock()
  3865. defer sshClient.Unlock()
  3866. sshClient.qualityMetrics.TCPPortForwardRejectedDisallowedCount += 1
  3867. }
  3868. func (sshClient *sshClient) updateQualityMetricsWithUDPRejectedDisallowed() {
  3869. sshClient.Lock()
  3870. defer sshClient.Unlock()
  3871. sshClient.qualityMetrics.UDPPortForwardRejectedDisallowedCount += 1
  3872. }
  3873. func (sshClient *sshClient) updateQualityMetricsWithDNSResult(
  3874. success bool, duration time.Duration, resolverIP net.IP) {
  3875. sshClient.Lock()
  3876. defer sshClient.Unlock()
  3877. resolver := ""
  3878. if resolverIP != nil {
  3879. resolver = resolverIP.String()
  3880. }
  3881. if success {
  3882. sshClient.qualityMetrics.DNSCount["ALL"] += 1
  3883. sshClient.qualityMetrics.DNSDuration["ALL"] += duration
  3884. if resolver != "" {
  3885. sshClient.qualityMetrics.DNSCount[resolver] += 1
  3886. sshClient.qualityMetrics.DNSDuration[resolver] += duration
  3887. }
  3888. } else {
  3889. sshClient.qualityMetrics.DNSFailedCount["ALL"] += 1
  3890. sshClient.qualityMetrics.DNSFailedDuration["ALL"] += duration
  3891. if resolver != "" {
  3892. sshClient.qualityMetrics.DNSFailedCount[resolver] += 1
  3893. sshClient.qualityMetrics.DNSFailedDuration[resolver] += duration
  3894. }
  3895. }
  3896. }
  3897. func (sshClient *sshClient) handleTCPChannel(
  3898. remainingDialTimeout time.Duration,
  3899. hostToConnect string,
  3900. portToConnect int,
  3901. doSplitTunnel bool,
  3902. newChannel ssh.NewChannel) {
  3903. // Assumptions:
  3904. // - sshClient.dialingTCPPortForward() has been called
  3905. // - remainingDialTimeout > 0
  3906. established := false
  3907. defer func() {
  3908. if !established {
  3909. sshClient.abortedTCPPortForward()
  3910. }
  3911. }()
  3912. // Validate the domain name and check the domain blocklist before dialing.
  3913. //
  3914. // The IP blocklist is checked in isPortForwardPermitted, which also provides
  3915. // IP blocklist checking for the packet tunnel code path. When hostToConnect
  3916. // is an IP address, the following hostname resolution step effectively
  3917. // performs no actions and next immediate step is the isPortForwardPermitted
  3918. // check.
  3919. //
  3920. // Limitation: this case handles port forwards where the client sends the
  3921. // destination domain in the SSH port forward request but does not currently
  3922. // handle DNS-over-TCP; in the DNS-over-TCP case, a client may bypass the
  3923. // block list check.
  3924. if net.ParseIP(hostToConnect) == nil {
  3925. ok, rejectMessage := sshClient.isDomainPermitted(hostToConnect)
  3926. if !ok {
  3927. // Note: not recording a port forward failure in this case
  3928. sshClient.rejectNewChannel(newChannel, rejectMessage)
  3929. return
  3930. }
  3931. }
  3932. // Dial the remote address.
  3933. //
  3934. // Hostname resolution is performed explicitly, as a separate step, as the
  3935. // target IP address is used for traffic rules (AllowSubnets), OSL seed
  3936. // progress, and IP address blocklists.
  3937. //
  3938. // Contexts are used for cancellation (via sshClient.runCtx, which is
  3939. // cancelled when the client is stopping) and timeouts.
  3940. dialStartTime := time.Now()
  3941. IP := net.ParseIP(hostToConnect)
  3942. if IP == nil {
  3943. // Resolve the hostname
  3944. // PreferGo, equivalent to GODEBUG=netdns=go, is specified in order to
  3945. // avoid any cases where Go's resolver fails over to the cgo-based
  3946. // resolver (see https://pkg.go.dev/net#hdr-Name_Resolution). Such
  3947. // cases, if they resolve at all, may be expected to resolve to bogon
  3948. // IPs that won't be permitted; but the cgo invocation will consume
  3949. // an OS thread, which is a performance hit we can avoid.
  3950. if IsLogLevelDebug() {
  3951. log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")
  3952. }
  3953. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  3954. IPs, err := (&net.Resolver{PreferGo: true}).LookupIPAddr(ctx, hostToConnect)
  3955. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  3956. resolveElapsedTime := time.Since(dialStartTime)
  3957. // Record DNS metrics. If LookupIPAddr returns net.DNSError.IsNotFound, this
  3958. // is "no such host" and not a DNS failure. Limitation: the resolver IP is
  3959. // not known.
  3960. dnsErr, ok := err.(*net.DNSError)
  3961. dnsNotFound := ok && dnsErr.IsNotFound
  3962. dnsSuccess := err == nil || dnsNotFound
  3963. sshClient.updateQualityMetricsWithDNSResult(dnsSuccess, resolveElapsedTime, nil)
  3964. // IPv4 is preferred in case the host has limited IPv6 routing. IPv6 is
  3965. // selected and attempted only when there's no IPv4 option.
  3966. // TODO: shuffle list to try other IPs?
  3967. for _, ip := range IPs {
  3968. if ip.IP.To4() != nil {
  3969. IP = ip.IP
  3970. break
  3971. }
  3972. }
  3973. if IP == nil && len(IPs) > 0 {
  3974. // If there are no IPv4 IPs, the first IP is IPv6.
  3975. IP = IPs[0].IP
  3976. }
  3977. if err == nil && IP == nil {
  3978. err = std_errors.New("no IP address")
  3979. }
  3980. if err != nil {
  3981. // Record a port forward failure
  3982. sshClient.updateQualityMetricsWithDialResult(false, resolveElapsedTime, IP)
  3983. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("LookupIP failed: %s", err))
  3984. return
  3985. }
  3986. remainingDialTimeout -= resolveElapsedTime
  3987. }
  3988. if remainingDialTimeout <= 0 {
  3989. sshClient.rejectNewChannel(newChannel, "TCP port forward timed out resolving")
  3990. return
  3991. }
  3992. // When the client has indicated split tunnel mode and when the channel is
  3993. // not of type protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE, check if the
  3994. // client and the port forward destination are in the same GeoIP country. If
  3995. // so, reject the port forward with a distinct response code that indicates
  3996. // to the client that this port forward should be performed locally, direct
  3997. // and untunneled.
  3998. //
  3999. // Clients are expected to cache untunneled responses to avoid this round
  4000. // trip in the immediate future and reduce server load.
  4001. //
  4002. // When the countries differ, immediately proceed with the standard port
  4003. // forward. No additional round trip is required.
  4004. //
  4005. // If either GeoIP country is "None", one or both countries are unknown
  4006. // and there is no match.
  4007. //
  4008. // Traffic rules, such as allowed ports, are not enforced for port forward
  4009. // destinations classified as untunneled.
  4010. //
  4011. // Domain and IP blocklists still apply to port forward destinations
  4012. // classified as untunneled.
  4013. //
  4014. // The client's use of split tunnel mode is logged in server_tunnel metrics
  4015. // as the boolean value split_tunnel. As they may indicate some information
  4016. // about browsing activity, no other split tunnel metrics are logged.
  4017. if doSplitTunnel {
  4018. destinationGeoIPData := sshClient.sshServer.support.GeoIPService.LookupIP(IP)
  4019. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  4020. // protocols, the client GeoIP fields will be populated using the
  4021. // original client IP already received, from the broker, in the handshake.
  4022. clientGeoIPData := sshClient.getClientGeoIPData()
  4023. if clientGeoIPData.Country != GEOIP_UNKNOWN_VALUE &&
  4024. sshClient.handshakeState.splitTunnelLookup.lookup(
  4025. destinationGeoIPData.Country) {
  4026. // Since isPortForwardPermitted is not called in this case, explicitly call
  4027. // ipBlocklistCheck. The domain blocklist case is handled above.
  4028. if !sshClient.isIPPermitted(IP) {
  4029. // Note: not recording a port forward failure in this case
  4030. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  4031. return
  4032. }
  4033. _ = newChannel.Reject(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL, "")
  4034. return
  4035. }
  4036. }
  4037. // Enforce traffic rules, using the resolved IP address.
  4038. if !sshClient.isPortForwardPermitted(
  4039. portForwardTypeTCP, IP, portToConnect) {
  4040. // Note: not recording a port forward failure in this case
  4041. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  4042. return
  4043. }
  4044. // TCP dial.
  4045. remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))
  4046. if IsLogLevelDebug() {
  4047. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
  4048. }
  4049. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  4050. fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)
  4051. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  4052. // Record port forward success or failure
  4053. sshClient.updateQualityMetricsWithDialResult(err == nil, time.Since(dialStartTime), IP)
  4054. if err != nil {
  4055. // Monitor for low resource error conditions
  4056. sshClient.sshServer.monitorPortForwardDialError(err)
  4057. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("DialTimeout failed: %s", err))
  4058. return
  4059. }
  4060. // The upstream TCP port forward connection has been established. Schedule
  4061. // some cleanup and notify the SSH client that the channel is accepted.
  4062. defer fwdConn.Close()
  4063. fwdChannel, requests, err := newChannel.Accept()
  4064. if err != nil {
  4065. if !isExpectedTunnelIOError(err) {
  4066. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  4067. }
  4068. return
  4069. }
  4070. go ssh.DiscardRequests(requests)
  4071. defer fwdChannel.Close()
  4072. // Release the dialing slot and acquire an established slot.
  4073. //
  4074. // establishedPortForward increments the concurrent TCP port
  4075. // forward counter and closes the LRU existing TCP port forward
  4076. // when already at the limit.
  4077. //
  4078. // Known limitations:
  4079. //
  4080. // - Closed LRU TCP sockets will enter the TIME_WAIT state,
  4081. // continuing to consume some resources.
  4082. sshClient.establishedPortForward(portForwardTypeTCP, sshClient.tcpPortForwardLRU)
  4083. // "established = true" cancels the deferred abortedTCPPortForward()
  4084. established = true
  4085. // TODO: 64-bit alignment? https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  4086. var bytesUp, bytesDown int64
  4087. defer func() {
  4088. sshClient.closedPortForward(
  4089. portForwardTypeTCP, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))
  4090. }()
  4091. lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
  4092. defer lruEntry.Remove()
  4093. // ActivityMonitoredConn monitors the TCP port forward I/O and updates
  4094. // its LRU status. ActivityMonitoredConn also times out I/O on the port
  4095. // forward if both reads and writes have been idle for the specified
  4096. // duration.
  4097. fwdConn, err = common.NewActivityMonitoredConn(
  4098. fwdConn,
  4099. sshClient.idleTCPPortForwardTimeout(),
  4100. true,
  4101. lruEntry,
  4102. sshClient.getActivityUpdaters(portForwardTypeTCP, IP)...)
  4103. if err != nil {
  4104. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  4105. return
  4106. }
  4107. // Relay channel to forwarded connection.
  4108. if IsLogLevelDebug() {
  4109. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
  4110. }
  4111. // TODO: relay errors to fwdChannel.Stderr()?
  4112. relayWaitGroup := new(sync.WaitGroup)
  4113. relayWaitGroup.Add(1)
  4114. go func() {
  4115. defer relayWaitGroup.Done()
  4116. // io.Copy allocates a 32K temporary buffer, and each port forward relay
  4117. // uses two of these buffers; using common.CopyBuffer with a smaller buffer
  4118. // reduces the overall memory footprint.
  4119. bytes, err := common.CopyBuffer(
  4120. fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  4121. atomic.AddInt64(&bytesDown, bytes)
  4122. if err != nil && err != io.EOF {
  4123. // Debug since errors such as "connection reset by peer" occur during normal operation
  4124. if IsLogLevelDebug() {
  4125. log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
  4126. }
  4127. }
  4128. // Interrupt upstream io.Copy when downstream is shutting down.
  4129. // TODO: this is done to quickly cleanup the port forward when
  4130. // fwdConn has a read timeout, but is it clean -- upstream may still
  4131. // be flowing?
  4132. fwdChannel.Close()
  4133. }()
  4134. bytes, err := common.CopyBuffer(
  4135. fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  4136. atomic.AddInt64(&bytesUp, bytes)
  4137. if err != nil && err != io.EOF {
  4138. if IsLogLevelDebug() {
  4139. log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
  4140. }
  4141. }
  4142. // Shutdown special case: fwdChannel will be closed and return EOF when
  4143. // the SSH connection is closed, but we need to explicitly close fwdConn
  4144. // to interrupt the downstream io.Copy, which may be blocked on a
  4145. // fwdConn.Read().
  4146. fwdConn.Close()
  4147. relayWaitGroup.Wait()
  4148. if IsLogLevelDebug() {
  4149. log.WithTraceFields(
  4150. LogFields{
  4151. "remoteAddr": remoteAddr,
  4152. "bytesUp": atomic.LoadInt64(&bytesUp),
  4153. "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting")
  4154. }
  4155. }