tunnelServer.go 190 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
75278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641564256435644564556465647564856495650565156525653565456555656
  1. /*
  2. * Copyright (c) 2016, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. "context"
  23. "crypto/rand"
  24. "crypto/subtle"
  25. "encoding/base64"
  26. "encoding/json"
  27. std_errors "errors"
  28. "fmt"
  29. "io"
  30. "io/ioutil"
  31. "net"
  32. "strconv"
  33. "strings"
  34. "sync"
  35. "sync/atomic"
  36. "syscall"
  37. "time"
  38. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  39. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/accesscontrol"
  40. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
  41. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  42. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/inproxy"
  43. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  44. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
  45. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
  46. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  47. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  48. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  49. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/quic"
  50. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/refraction"
  51. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
  52. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  53. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms"
  54. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
  55. lrucache "github.com/cognusion/go-cache-lru"
  56. "github.com/marusama/semaphore"
  57. cache "github.com/patrickmn/go-cache"
  58. )
  // Tunnel server tuning constants. They are grouped below by the prefix of
  // their names; the code that consumes them lies outside this declaration.
  const (
  	// SSH logging, handshake, and connection read timing.
  	SSH_AUTH_LOG_PERIOD          = 30 * time.Minute
  	SSH_HANDSHAKE_TIMEOUT        = 30 * time.Second
  	SSH_BEGIN_HANDSHAKE_TIMEOUT  = time.Second
  	SSH_CONNECTION_READ_DEADLINE = 5 * time.Minute

  	// TCP port forward buffer and queue sizing.
  	SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE = 8192
  	SSH_TCP_PORT_FORWARD_QUEUE_SIZE       = 1024

  	// Bounds, in bytes, for SSH keep-alive payloads.
  	SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES = 0
  	SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES = 256

  	// OSL send retry schedule: initial delay and multiplicative factor.
  	SSH_SEND_OSL_INITIAL_RETRY_DELAY = 30 * time.Second
  	SSH_SEND_OSL_RETRY_FACTOR        = 2

  	// Session cache lifetimes.
  	GEOIP_SESSION_CACHE_TTL = 60 * time.Minute
  	OSL_SESSION_CACHE_TTL   = 5 * time.Minute

  	// Count and size limits.
  	MAX_AUTHORIZATIONS                    = 16
  	PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1
  	RANDOM_STREAM_MAX_BYTES               = 10 * 1024 * 1024 // 10485760 bytes (10 MiB)
  	ALERT_REQUEST_QUEUE_BUFFER_SIZE       = 16
  	SSH_MAX_CLIENT_COUNT                  = 500000
  	SSH_CLIENT_MAX_DSL_REQUEST_COUNT      = 128
  )
  79. // TunnelServer is the main server that accepts Psiphon client
  80. // connections, via various obfuscation protocols, and provides
  81. // port forwarding (TCP and UDP) services to the Psiphon client.
  82. // At its core, TunnelServer is an SSH server. SSH is the base
  83. // protocol that provides port forward multiplexing, and transport
  84. // security. Layered on top of SSH, optionally, is Obfuscated SSH
  85. // and meek protocols, which provide further circumvention
  86. // capabilities.
  87. type TunnelServer struct {
  88. runWaitGroup *sync.WaitGroup
  89. listenerError chan error
  90. shutdownBroadcast <-chan struct{}
  91. sshServer *sshServer
  92. }
  // sshListener couples a bound net.Listener with the metadata recorded for
  // it when Run creates the listener.
  type sshListener struct {
  	// Listener is the embedded listener that accepts client connections.
  	net.Listener

  	// localAddress is the host:port string the listener was bound to.
  	localAddress string

  	// tunnelProtocol names the tunnel protocol served by this listener.
  	tunnelProtocol string

  	// port is the numeric listen port.
  	port int

  	// BPFProgramName is the BPF program name reported when the listener is
  	// created via newTCPListenerWithBPF; Run leaves it empty for all other
  	// listener types.
  	BPFProgramName string
  }
  100. // NewTunnelServer initializes a new tunnel server.
  101. func NewTunnelServer(
  102. support *SupportServices,
  103. shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {
  104. sshServer, err := newSSHServer(support, shutdownBroadcast)
  105. if err != nil {
  106. return nil, errors.Trace(err)
  107. }
  108. return &TunnelServer{
  109. runWaitGroup: new(sync.WaitGroup),
  110. listenerError: make(chan error),
  111. shutdownBroadcast: shutdownBroadcast,
  112. sshServer: sshServer,
  113. }, nil
  114. }
  115. // Run runs the tunnel server; this function blocks while running a selection of
  116. // listeners that handle connections using various obfuscation protocols.
  117. //
  118. // Run listens on each designated tunnel port and spawns new goroutines to handle
  119. // each client connection. It halts when shutdownBroadcast is signaled. A list of active
  120. // clients is maintained, and when halting all clients are cleanly shut down.
  121. //
  122. // Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH
  123. // authentication, and then looping on client new channel requests. "direct-tcpip"
  124. // channels, dynamic port forwards, are supported. When the UDPInterceptUdpgwServerAddress
  125. // config parameter is configured, UDP port forwards over a TCP stream, following
  126. // the udpgw protocol, are handled.
  127. //
  128. // A new goroutine is spawned to handle each port forward for each client. Each port
  129. // forward tracks its bytes transferred. Overall per-client stats for connection duration,
  130. // GeoIP, number of port forwards, and bytes transferred are tracked and logged when the
  131. // client shuts down.
  132. //
  133. // Note: client handler goroutines may still be shutting down after Run() returns. See
  134. // comment in sshClient.stop(). TODO: fully synchronized shutdown.
  135. func (server *TunnelServer) Run() error {
  136. support := server.sshServer.support
  137. // First bind all listeners; once all are successful,
  138. // start accepting connections on each.
  139. var listeners []*sshListener
  140. defer func() {
  141. // Ensure listeners are closed on early error returns.
  142. for _, listener := range listeners {
  143. listener.Close()
  144. }
  145. }()
  146. for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {
  147. localAddress := net.JoinHostPort(
  148. support.Config.ServerIPAddress, strconv.Itoa(listenPort))
  149. var listener net.Listener
  150. var BPFProgramName string
  151. var err error
  152. if protocol.TunnelProtocolUsesFrontedMeekNonHTTPS(tunnelProtocol) {
  153. // For FRONTED-MEEK-QUIC, no listener implemented; for
  154. // FRONTED-MEEK-HTTP, no listener is run. The edge-to-server hop
  155. // uses HTTPS and the client tunnel protocol is distinguished
  156. // using protocol.MeekCookieData.ClientTunnelProtocol.
  157. continue
  158. } else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {
  159. usesInproxy := protocol.TunnelProtocolUsesInproxy(tunnelProtocol)
  160. // in-proxy QUIC tunnel protocols don't support gQUIC.
  161. enableGQUIC := support.Config.EnableGQUIC && !usesInproxy
  162. disablePathMTUDiscovery := false
  163. maxPacketSizeAdjustment := 0
  164. if usesInproxy {
  165. // In the in-proxy WebRTC media stream mode, QUIC packets sent
  166. // back to the client, via the proxy, are encapsulated in
  167. // SRTP packet payloads, and the maximum QUIC packet size
  168. // must be adjusted to fit. MTU discovery is disabled so the
  169. // maximum packet size will not grow.
  170. //
  171. // Limitation: the WebRTC data channel mode does not have the
  172. // same QUIC packet size constraint, since data channel
  173. // messages can be far larger (up to 65536 bytes). However,
  174. // the server, at this point, does not know whether
  175. // individual connections are using WebRTC media streams or
  176. // data channels on the first hop, and will not know until
  177. // API handshake information is delivered after the QUIC,
  178. // OSSH, and SSH handshakes are completed. Currently the max
  179. // packet size adjustment is set unconditionally. For data
  180. // channels, this will result in suboptimal packet sizes and
  181. // a corresponding different traffic shape on the 2nd hop.
  182. maxPacketSizeAdjustment = inproxy.GetQUICMaxPacketSizeAdjustment()
  183. disablePathMTUDiscovery = true
  184. }
  185. logTunnelProtocol := tunnelProtocol
  186. listener, err = quic.Listen(
  187. CommonLogger(log),
  188. func(peerAddress string, err error, logFields common.LogFields) {
  189. logIrregularTunnel(
  190. support, logTunnelProtocol, listenPort, peerAddress,
  191. errors.Trace(err), LogFields(logFields))
  192. },
  193. localAddress,
  194. disablePathMTUDiscovery,
  195. maxPacketSizeAdjustment,
  196. support.Config.ObfuscatedSSHKey,
  197. enableGQUIC)
  198. if err != nil {
  199. return errors.Trace(err)
  200. }
  201. } else if protocol.TunnelProtocolUsesRefractionNetworking(tunnelProtocol) {
  202. listener, err = refraction.Listen(localAddress)
  203. if err != nil {
  204. return errors.Trace(err)
  205. }
  206. } else if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {
  207. listener, err = net.Listen("tcp", localAddress)
  208. if err != nil {
  209. return errors.Trace(err)
  210. }
  211. } else {
  212. // Only direct, unfronted protocol listeners use TCP BPF circumvention
  213. // programs.
  214. listener, BPFProgramName, err = newTCPListenerWithBPF(support, localAddress)
  215. if err != nil {
  216. return errors.Trace(err)
  217. }
  218. if protocol.TunnelProtocolUsesTLSOSSH(tunnelProtocol) {
  219. listener, err = ListenTLSTunnel(support, listener, tunnelProtocol, listenPort)
  220. if err != nil {
  221. return errors.Trace(err)
  222. }
  223. } else if protocol.TunnelProtocolUsesShadowsocks(tunnelProtocol) {
  224. logTunnelProtocol := tunnelProtocol
  225. listener, err = ListenShadowsocks(
  226. support,
  227. listener,
  228. support.Config.ShadowsocksKey,
  229. func(peerAddress string, err error, logFields common.LogFields) {
  230. logIrregularTunnel(
  231. support, logTunnelProtocol, listenPort, peerAddress,
  232. errors.Trace(err), LogFields(logFields))
  233. },
  234. )
  235. if err != nil {
  236. return errors.Trace(err)
  237. }
  238. }
  239. }
  240. tacticsListener := NewTacticsListener(
  241. support,
  242. listener,
  243. tunnelProtocol,
  244. func(IP string) GeoIPData { return support.GeoIPService.Lookup(IP) })
  245. log.WithTraceFields(
  246. LogFields{
  247. "localAddress": localAddress,
  248. "tunnelProtocol": tunnelProtocol,
  249. "BPFProgramName": BPFProgramName,
  250. }).Info("listening")
  251. listeners = append(
  252. listeners,
  253. &sshListener{
  254. Listener: tacticsListener,
  255. localAddress: localAddress,
  256. port: listenPort,
  257. tunnelProtocol: tunnelProtocol,
  258. BPFProgramName: BPFProgramName,
  259. })
  260. }
  261. if server.sshServer.inproxyBrokerSessions != nil {
  262. // When running in-proxy tunnels, start the InproxyBrokerSession
  263. // background worker, which includes the proxy quality reporter.
  264. // Start this before any tunnels can be established.
  265. err := server.sshServer.inproxyBrokerSessions.Start()
  266. if err != nil {
  267. return errors.Trace(err)
  268. }
  269. }
  270. for _, listener := range listeners {
  271. server.runWaitGroup.Add(1)
  272. go func(listener *sshListener) {
  273. defer server.runWaitGroup.Done()
  274. log.WithTraceFields(
  275. LogFields{
  276. "localAddress": listener.localAddress,
  277. "tunnelProtocol": listener.tunnelProtocol,
  278. }).Info("running")
  279. server.sshServer.runListener(
  280. listener,
  281. server.listenerError)
  282. log.WithTraceFields(
  283. LogFields{
  284. "localAddress": listener.localAddress,
  285. "tunnelProtocol": listener.tunnelProtocol,
  286. }).Info("stopped")
  287. }(listener)
  288. }
  289. var err error
  290. select {
  291. case <-server.shutdownBroadcast:
  292. case err = <-server.listenerError:
  293. }
  294. for _, listener := range listeners {
  295. listener.Close()
  296. }
  297. server.sshServer.stopClients()
  298. server.runWaitGroup.Wait()
  299. if server.sshServer.inproxyBrokerSessions != nil {
  300. server.sshServer.inproxyBrokerSessions.Stop()
  301. }
  302. log.WithTrace().Info("stopped")
  303. return err
  304. }
  305. // GetLoadStats returns load stats for the tunnel server. The stats are
  306. // broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats
  307. // include current connected client count, total number of current port
  308. // forwards.
  309. func (server *TunnelServer) GetLoadStats() (
  310. UpstreamStats, ProtocolStats, RegionStats) {
  311. return server.sshServer.getLoadStats()
  312. }
  313. // GetEstablishedClientCount returns the number of currently established
  314. // clients.
  315. func (server *TunnelServer) GetEstablishedClientCount() int {
  316. return server.sshServer.getEstablishedClientCount()
  317. }
  318. // ResetAllClientTrafficRules resets all established client traffic rules
  319. // to use the latest config and client properties. Any existing traffic
  320. // rule state is lost, including throttling state.
  321. func (server *TunnelServer) ResetAllClientTrafficRules() {
  322. server.sshServer.resetAllClientTrafficRules()
  323. }
  324. // ResetAllClientOSLConfigs resets all established client OSL state to use
  325. // the latest OSL config. Any existing OSL state is lost, including partial
  326. // progress towards SLOKs.
  327. func (server *TunnelServer) ResetAllClientOSLConfigs() {
  328. server.sshServer.resetAllClientOSLConfigs()
  329. }
  330. // ReloadTactics signals components that use server-side tactics for one-time
  331. // initialization to reload and use potentially changed parameters.
  332. func (server *TunnelServer) ReloadTactics() error {
  333. return errors.Trace(server.sshServer.reloadTactics())
  334. }
  335. // SetEstablishTunnels sets whether new tunnels may be established or not.
  336. // When not establishing, incoming connections are immediately closed.
  337. func (server *TunnelServer) SetEstablishTunnels(establish bool) {
  338. server.sshServer.setEstablishTunnels(establish)
  339. }
  340. // CheckEstablishTunnels returns whether new tunnels may be established or
  341. // not, and increments a metrics counter when establishment is disallowed.
  342. func (server *TunnelServer) CheckEstablishTunnels() bool {
  343. return server.sshServer.checkEstablishTunnels()
  344. }
  345. // CheckLoadLimiting returns whether the server is in the load limiting state,
  346. // which is when EstablishTunnels is false. CheckLoadLimiting is intended to
  347. // be checked by non-tunnel components; no metrics are updated by this call.
  348. func (server *TunnelServer) CheckLoadLimiting() bool {
  349. return server.sshServer.checkLoadLimiting()
  350. }
  351. // GetEstablishTunnelsMetrics returns whether tunnel establishment is
  352. // currently allowed and the number of tunnels rejected since due to not
  353. // establishing since the last GetEstablishTunnelsMetrics call.
  354. func (server *TunnelServer) GetEstablishTunnelsMetrics() (bool, int64) {
  355. return server.sshServer.getEstablishTunnelsMetrics()
  356. }
  357. type sshServer struct {
  358. support *SupportServices
  359. establishTunnels int32
  360. lastAuthLog atomic.Int64
  361. authFailedCount atomic.Int64
  362. establishLimitedCount atomic.Int64
  363. concurrentSSHHandshakes semaphore.Semaphore
  364. shutdownBroadcast <-chan struct{}
  365. sshHostKey ssh.Signer
  366. obfuscatorSeedHistory *obfuscator.SeedHistory
  367. inproxyBrokerSessions *inproxy.ServerBrokerSessions
  368. clientsMutex sync.Mutex
  369. stoppingClients bool
  370. acceptedClientCounts map[string]map[string]int64
  371. clients map[string]*sshClient
  372. geoIPSessionCache *cache.Cache
  373. oslSessionCacheMutex sync.Mutex
  374. oslSessionCache *cache.Cache
  375. authorizationSessionIDsMutex sync.Mutex
  376. authorizationSessionIDs map[string]string
  377. meekServersMutex sync.Mutex
  378. meekServers []*MeekServer
  379. }
  380. func newSSHServer(
  381. support *SupportServices,
  382. shutdownBroadcast <-chan struct{}) (*sshServer, error) {
  383. privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))
  384. if err != nil {
  385. return nil, errors.Trace(err)
  386. }
  387. // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?
  388. signer, err := ssh.NewSignerFromKey(privateKey)
  389. if err != nil {
  390. return nil, errors.Trace(err)
  391. }
  392. var concurrentSSHHandshakes semaphore.Semaphore
  393. if support.Config.MaxConcurrentSSHHandshakes > 0 {
  394. concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)
  395. }
  396. // The geoIPSessionCache replaces the legacy cache that used to be in
  397. // GeoIPServices and was used for the now-retired web API. That cache was
  398. // also used for, and now geoIPSessionCache provides:
  399. // - Determining first-tunnel-in-session (from a single server's point of
  400. // view)
  401. // - GeoIP for duplicate authorizations logic.
  402. //
  403. // TODO: combine geoIPSessionCache with oslSessionCache; need to deal with
  404. // OSL flush on hot reload and reconcile differing TTLs.
  405. geoIPSessionCache := cache.New(GEOIP_SESSION_CACHE_TTL, 1*time.Minute)
  406. // The OSL session cache temporarily retains OSL seed state
  407. // progress for disconnected clients. This enables clients
  408. // that disconnect and immediately reconnect to the same
  409. // server to resume their OSL progress. Cached progress
  410. // is referenced by session ID and is retained for
  411. // OSL_SESSION_CACHE_TTL after disconnect.
  412. //
  413. // Note: session IDs are assumed to be unpredictable. If a
  414. // rogue client could guess the session ID of another client,
  415. // it could resume its OSL progress and, if the OSL config
  416. // were known, infer some activity.
  417. oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)
  418. // inproxyBrokerSessions are the secure in-proxy broker/server sessions
  419. // used to relay information from the broker to the server, including the
  420. // original in-proxy client IP and the in-proxy proxy ID.
  421. //
  422. // Only brokers with public keys configured in the
  423. // InproxyAllBrokerSpecs tactic parameter are allowed to connect to
  424. // the server, and brokers verify the server's public key via the
  425. // InproxySessionPublicKey server entry field.
  426. //
  427. // Sessions are initialized and run for all psiphond instances running any
  428. // in-proxy tunnel protocol.
  429. //
  430. // inproxyBrokerSessions also run the server proxy quality reporter, which
  431. // makes requests to brokers configured in InproxyAllBrokerSpecs.
  432. var inproxyBrokerSessions *inproxy.ServerBrokerSessions
  433. runningInproxy := false
  434. for tunnelProtocol := range support.Config.TunnelProtocolPorts {
  435. if protocol.TunnelProtocolUsesInproxy(tunnelProtocol) {
  436. runningInproxy = true
  437. break
  438. }
  439. }
  440. if runningInproxy {
  441. inproxyPrivateKey, err := inproxy.SessionPrivateKeyFromString(
  442. support.Config.InproxyServerSessionPrivateKey)
  443. if err != nil {
  444. return nil, errors.Trace(err)
  445. }
  446. inproxyObfuscationSecret, err := inproxy.ObfuscationSecretFromString(
  447. support.Config.InproxyServerObfuscationRootSecret)
  448. if err != nil {
  449. return nil, errors.Trace(err)
  450. }
  451. makeRoundTripper := func(
  452. brokerPublicKey inproxy.SessionPublicKey) (
  453. inproxy.RoundTripper, common.APIParameters, error) {
  454. roundTripper, additionalParams, err := MakeInproxyProxyQualityBrokerRoundTripper(
  455. support, brokerPublicKey)
  456. if err != nil {
  457. return nil, nil, errors.Trace(err)
  458. }
  459. return roundTripper, additionalParams, nil
  460. }
  461. // The expected broker specd and public keys are set in reloadTactics
  462. // directly below, so none are set here.
  463. config := &inproxy.ServerBrokerSessionsConfig{
  464. Logger: CommonLogger(log),
  465. ServerPrivateKey: inproxyPrivateKey,
  466. ServerRootObfuscationSecret: inproxyObfuscationSecret,
  467. BrokerRoundTripperMaker: makeRoundTripper,
  468. ProxyMetricsValidator: getInproxyBrokerAPIParameterValidator(),
  469. ProxyMetricsFormatter: getInproxyBrokerAPIParameterLogFieldFormatter(),
  470. // Prefix for proxy metrics log fields in server_tunnel
  471. ProxyMetricsPrefix: "inproxy_proxy_",
  472. }
  473. inproxyBrokerSessions, err = inproxy.NewServerBrokerSessions(config)
  474. if err != nil {
  475. return nil, errors.Trace(err)
  476. }
  477. }
  478. // Limitation: rate limiting and resource limiting are handled by external
  479. // components, and sshServer enforces only a sanity check limit on the
  480. // number of entries in sshServer.clients; and no limit on the number of
  481. // entries in sshServer.geoIPSessionCache or sshServer.oslSessionCache.
  482. //
  483. // To avoid resource exhaustion, this implementation relies on:
  484. //
  485. // - Per-peer IP address and/or overall network connection rate limiting,
  486. // provided by iptables as configured by Psiphon automation
  487. // (https://github.com/Psiphon-Inc/psiphon-automation/blob/
  488. // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L1451).
  489. //
  490. // - Host CPU/memory/network monitoring and signalling, installed Psiphon
  491. // automation
  492. // (https://github.com/Psiphon-Inc/psiphon-automation/blob/
  493. // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L935).
  494. // When resource usage meets certain thresholds, the monitoring signals
  495. // this process with SIGTSTP or SIGCONT, and handlers call
  496. // sshServer.setEstablishTunnels to stop or resume accepting new clients.
  497. sshServer := &sshServer{
  498. support: support,
  499. establishTunnels: 1,
  500. concurrentSSHHandshakes: concurrentSSHHandshakes,
  501. shutdownBroadcast: shutdownBroadcast,
  502. sshHostKey: signer,
  503. acceptedClientCounts: make(map[string]map[string]int64),
  504. clients: make(map[string]*sshClient),
  505. geoIPSessionCache: geoIPSessionCache,
  506. oslSessionCache: oslSessionCache,
  507. authorizationSessionIDs: make(map[string]string),
  508. obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),
  509. inproxyBrokerSessions: inproxyBrokerSessions,
  510. }
  511. // Initialize components that use server-side tactics and which reload on
  512. // tactics change events.
  513. err = sshServer.reloadTactics()
  514. if err != nil {
  515. return nil, errors.Trace(err)
  516. }
  517. return sshServer, nil
  518. }
  519. func (sshServer *sshServer) setEstablishTunnels(establish bool) {
  520. // Do nothing when the setting is already correct. This avoids
  521. // spurious log messages when setEstablishTunnels is called
  522. // periodically with the same setting.
  523. if establish == (atomic.LoadInt32(&sshServer.establishTunnels) == 1) {
  524. return
  525. }
  526. establishFlag := int32(1)
  527. if !establish {
  528. establishFlag = 0
  529. }
  530. atomic.StoreInt32(&sshServer.establishTunnels, establishFlag)
  531. log.WithTraceFields(
  532. LogFields{"establish": establish}).Info("establishing tunnels")
  533. }
  534. func (sshServer *sshServer) checkEstablishTunnels() bool {
  535. establishTunnels := atomic.LoadInt32(&sshServer.establishTunnels) == 1
  536. if !establishTunnels {
  537. sshServer.establishLimitedCount.Add(1)
  538. }
  539. return establishTunnels
  540. }
  541. func (sshServer *sshServer) checkLoadLimiting() bool {
  542. // The server is in a general load limiting state when
  543. // sshServer.establishTunnels is false (0). This check is intended to be
  544. // used by non-tunnel components and no metrics are updated by this call.
  545. return atomic.LoadInt32(&sshServer.establishTunnels) == 0
  546. }
  547. func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {
  548. return atomic.LoadInt32(&sshServer.establishTunnels) == 1,
  549. sshServer.establishLimitedCount.Swap(0)
  550. }
  551. // additionalTransportData is additional data gathered at transport level,
  552. // such as in MeekServer at the HTTP layer, and relayed to the
  553. // sshServer/sshClient.
  554. type additionalTransportData struct {
  555. overrideTunnelProtocol string
  556. steeringIP string
  557. }
  558. // reportListenerError logs a listener error and sends it the
  559. // TunnelServer.Run. Callers should wrap the input err in an immediate
  560. // errors.Trace.
  561. func reportListenerError(listenerError chan<- error, err error) {
  562. // Record "caller" just in case the caller fails to wrap err in an
  563. // errors.Trace.
  564. log.WithTraceFields(
  565. LogFields{
  566. "error": err,
  567. "caller": stacktrace.GetParentFunctionName()}).Error("listener error")
  568. select {
  569. case listenerError <- err:
  570. default:
  571. }
  572. }
  573. // runListener is intended to run an a goroutine; it blocks
  574. // running a particular listener. If an unrecoverable error
  575. // occurs, it will send the error to the listenerError channel.
  576. func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {
  577. handleClient := func(conn net.Conn, transportData *additionalTransportData) {
  578. // Note: establish tunnel limiter cannot simply stop TCP
  579. // listeners in all cases (e.g., meek) since SSH tunnels can
  580. // span multiple TCP connections.
  581. if !sshServer.checkEstablishTunnels() {
  582. if IsLogLevelDebug() {
  583. log.WithTrace().Debug("not establishing tunnels")
  584. }
  585. conn.Close()
  586. return
  587. }
  588. // sshListener.tunnelProtocol indictes the tunnel protocol run by the
  589. // listener. For direct protocols, this is also the client tunnel protocol.
  590. // For fronted protocols, the client may use a different protocol to connect
  591. // to the front and then only the front-to-Psiphon server will use the
  592. // listener protocol.
  593. //
  594. // A fronted meek client, for example, reports its first hop protocol in
  595. // protocol.MeekCookieData.ClientTunnelProtocol. Most metrics record this
  596. // value as relay_protocol, since the first hop is the one subject to
  597. // adversarial conditions. In some cases, such as irregular tunnels, there
  598. // is no ClientTunnelProtocol value available and the listener tunnel
  599. // protocol will be logged.
  600. //
  601. // Similarly, listenerPort indicates the listening port, which is the dialed
  602. // port number for direct protocols; while, for fronted protocols, the
  603. // client may dial a different port for its first hop.
  604. // Process each client connection concurrently.
  605. go sshServer.handleClient(sshListener, conn, transportData)
  606. }
  607. // Note: when exiting due to a unrecoverable error, be sure
  608. // to try to send the error to listenerError so that the outer
  609. // TunnelServer.Run will properly shut down instead of remaining
  610. // running.
  611. if protocol.TunnelProtocolUsesMeekHTTP(sshListener.tunnelProtocol) ||
  612. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol) {
  613. if sshServer.tunnelProtocolUsesTLSDemux(sshListener.tunnelProtocol) {
  614. sshServer.runMeekTLSOSSHDemuxListener(sshListener, listenerError, handleClient)
  615. } else {
  616. meekServer, err := NewMeekServer(
  617. sshServer.support,
  618. sshListener.Listener,
  619. sshListener.tunnelProtocol,
  620. sshListener.port,
  621. protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol),
  622. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  623. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  624. true,
  625. handleClient,
  626. sshServer.shutdownBroadcast)
  627. if err == nil {
  628. sshServer.registerMeekServer(meekServer)
  629. err = meekServer.Run()
  630. }
  631. if err != nil {
  632. reportListenerError(listenerError, errors.Trace(err))
  633. return
  634. }
  635. }
  636. } else {
  637. runListener(sshListener.Listener, sshServer.shutdownBroadcast, listenerError, "", handleClient)
  638. }
  639. }
  640. // runMeekTLSOSSHDemuxListener blocks running a listener which demuxes meek and
  641. // TLS-OSSH connections received on the same port.
  642. func (sshServer *sshServer) runMeekTLSOSSHDemuxListener(
  643. sshListener *sshListener,
  644. listenerError chan<- error,
  645. handleClient func(conn net.Conn, transportData *additionalTransportData)) {
  646. meekClassifier := protocolClassifier{
  647. minBytesToMatch: 4,
  648. maxBytesToMatch: 4,
  649. match: func(b []byte) bool {
  650. // NOTE: HTTP transforms are only applied to plain HTTP
  651. // meek so they are not a concern here.
  652. return bytes.Contains(b, []byte("POST"))
  653. },
  654. }
  655. tlsClassifier := protocolClassifier{
  656. // NOTE: technically +1 not needed if detectors are evaluated
  657. // in order by index in classifier array, which they are.
  658. minBytesToMatch: meekClassifier.maxBytesToMatch + 1,
  659. maxBytesToMatch: meekClassifier.maxBytesToMatch + 1,
  660. match: func(b []byte) bool {
  661. return len(b) > 4 // if not classified as meek, then tls
  662. },
  663. }
  664. listener, err := ListenTLSTunnel(
  665. sshServer.support,
  666. sshListener.Listener,
  667. sshListener.tunnelProtocol,
  668. sshListener.port)
  669. if err != nil {
  670. reportListenerError(listenerError, errors.Trace(err))
  671. return
  672. }
  673. mux, listeners := newProtocolDemux(
  674. context.Background(),
  675. listener,
  676. []protocolClassifier{meekClassifier, tlsClassifier},
  677. sshServer.support.Config.sshHandshakeTimeout)
  678. var wg sync.WaitGroup
  679. wg.Add(1)
  680. go func() {
  681. // handle shutdown gracefully
  682. defer wg.Done()
  683. <-sshServer.shutdownBroadcast
  684. err := mux.Close()
  685. if err != nil {
  686. log.WithTraceFields(LogFields{"error": err}).Error("close failed")
  687. }
  688. }()
  689. wg.Add(1)
  690. go func() {
  691. // start demultiplexing TLS-OSSH and meek HTTPS connections
  692. defer wg.Done()
  693. err := mux.run()
  694. if err != nil {
  695. reportListenerError(listenerError, errors.Trace(err))
  696. return
  697. }
  698. }()
  699. wg.Add(1)
  700. go func() {
  701. // start handling TLS-OSSH connections as they are demultiplexed
  702. defer wg.Done()
  703. // Override the listener tunnel protocol to report TLS-OSSH instead.
  704. runListener(
  705. listeners[1],
  706. sshServer.shutdownBroadcast,
  707. listenerError,
  708. protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH, handleClient)
  709. }()
  710. wg.Add(1)
  711. go func() {
  712. // start handling meek HTTPS connections as they are
  713. // demultiplexed
  714. defer wg.Done()
  715. meekServer, err := NewMeekServer(
  716. sshServer.support,
  717. listeners[0],
  718. sshListener.tunnelProtocol,
  719. sshListener.port,
  720. false,
  721. protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),
  722. protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),
  723. true,
  724. handleClient,
  725. sshServer.shutdownBroadcast)
  726. if err == nil {
  727. sshServer.registerMeekServer(meekServer)
  728. err = meekServer.Run()
  729. }
  730. if err != nil {
  731. reportListenerError(listenerError, errors.Trace(err))
  732. return
  733. }
  734. }()
  735. wg.Wait()
  736. }
  737. func runListener(
  738. listener net.Listener,
  739. shutdownBroadcast <-chan struct{},
  740. listenerError chan<- error,
  741. overrideTunnelProtocol string,
  742. handleClient func(conn net.Conn, transportData *additionalTransportData)) {
  743. for {
  744. conn, err := listener.Accept()
  745. select {
  746. case <-shutdownBroadcast:
  747. if err == nil {
  748. conn.Close()
  749. }
  750. return
  751. default:
  752. }
  753. if err != nil {
  754. if e, ok := err.(net.Error); ok && e.Temporary() {
  755. log.WithTraceFields(LogFields{"error": err}).Error("accept failed")
  756. // Temporary error, keep running
  757. continue
  758. } else if std_errors.Is(err, errRestrictedProvider) {
  759. log.WithTraceFields(LogFields{"error": err}).Error("accept rejected client")
  760. // Restricted provider, keep running
  761. continue
  762. }
  763. reportListenerError(listenerError, errors.Trace(err))
  764. return
  765. }
  766. var transportData *additionalTransportData
  767. if overrideTunnelProtocol != "" {
  768. transportData = &additionalTransportData{
  769. overrideTunnelProtocol: overrideTunnelProtocol,
  770. }
  771. }
  772. handleClient(conn, transportData)
  773. }
  774. }
  775. // registerMeekServer registers a MeekServer instance to receive tactics
  776. // reload signals.
  777. func (sshServer *sshServer) registerMeekServer(meekServer *MeekServer) {
  778. sshServer.meekServersMutex.Lock()
  779. defer sshServer.meekServersMutex.Unlock()
  780. sshServer.meekServers = append(sshServer.meekServers, meekServer)
  781. }
  782. // reloadMeekServerTactics signals each registered MeekServer instance that
  783. // tactics have reloaded and may have changed.
  784. func (sshServer *sshServer) reloadMeekServerTactics() error {
  785. sshServer.meekServersMutex.Lock()
  786. defer sshServer.meekServersMutex.Unlock()
  787. for _, meekServer := range sshServer.meekServers {
  788. err := meekServer.ReloadTactics()
  789. if err != nil {
  790. return errors.Trace(err)
  791. }
  792. }
  793. return nil
  794. }
  795. // An accepted client has completed a direct TCP or meek connection and has a
  796. // net.Conn. Registration is for tracking the number of connections.
  797. func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {
  798. sshServer.clientsMutex.Lock()
  799. defer sshServer.clientsMutex.Unlock()
  800. if sshServer.acceptedClientCounts[tunnelProtocol] == nil {
  801. sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)
  802. }
  803. sshServer.acceptedClientCounts[tunnelProtocol][region] += 1
  804. }
  805. func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {
  806. sshServer.clientsMutex.Lock()
  807. defer sshServer.clientsMutex.Unlock()
  808. sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1
  809. }
  810. // An established client has completed its SSH handshake and has a ssh.Conn. Registration is
  811. // for tracking the number of fully established clients and for maintaining a list of running
  812. // clients (for stopping at shutdown time).
  813. func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
  814. sshServer.clientsMutex.Lock()
  815. if sshServer.stoppingClients {
  816. sshServer.clientsMutex.Unlock()
  817. return false
  818. }
  819. // In the case of a duplicate client sessionID, the previous client is closed.
  820. // - Well-behaved clients generate a random sessionID that should be unique (won't
  821. // accidentally conflict) and hard to guess (can't be targeted by a malicious
  822. // client).
  823. // - Clients reuse the same sessionID when a tunnel is unexpectedly disconnected
  824. // and reestablished. In this case, when the same server is selected, this logic
  825. // will be hit; closing the old, dangling client is desirable.
  826. // - Multi-tunnel clients should not normally use one server for multiple tunnels.
  827. existingClient := sshServer.clients[client.sessionID]
  828. sshServer.clientsMutex.Unlock()
  829. if existingClient != nil {
  830. // This case is expected to be common, and so logged at the lowest severity
  831. // level.
  832. log.WithTrace().Debug(
  833. "stopping existing client with duplicate session ID")
  834. existingClient.stop()
  835. // Block until the existingClient is fully terminated. This is necessary to
  836. // avoid this scenario:
  837. // - existingClient is invoking handshakeAPIRequestHandler
  838. // - sshServer.clients[client.sessionID] is updated to point to new client
  839. // - existingClient's handshakeAPIRequestHandler invokes
  840. // setHandshakeState but sets the handshake parameters for new
  841. // client
  842. // - as a result, the new client handshake will fail (only a single handshake
  843. // is permitted) and the new client server_tunnel log will contain an
  844. // invalid mix of existing/new client fields
  845. //
  846. // Once existingClient.awaitStopped returns, all existingClient port
  847. // forwards and request handlers have terminated, so no API handler, either
  848. // tunneled web API or SSH API, will remain and it is safe to point
  849. // sshServer.clients[client.sessionID] to the new client.
  850. // Limitation: this scenario remains possible with _untunneled_ web API
  851. // requests.
  852. //
  853. // Blocking also ensures existingClient.releaseAuthorizations is invoked before
  854. // the new client attempts to submit the same authorizations.
  855. //
  856. // Perform blocking awaitStopped operation outside the
  857. // sshServer.clientsMutex mutex to avoid blocking all other clients for the
  858. // duration. We still expect and require that the stop process completes
  859. // rapidly, e.g., does not block on network I/O, allowing the new client
  860. // connection to proceed without delay.
  861. //
  862. // In addition, operations triggered by stop, and which must complete before
  863. // awaitStopped returns, will attempt to lock sshServer.clientsMutex,
  864. // including unregisterEstablishedClient.
  865. existingClient.awaitStopped()
  866. }
  867. sshServer.clientsMutex.Lock()
  868. defer sshServer.clientsMutex.Unlock()
  869. // existingClient's stop will have removed it from sshServer.clients via
  870. // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should
  871. // be nil -- unless yet another client instance using the same sessionID has
  872. // connected in the meantime while awaiting existingClient stop. In this
  873. // case, it's not clear which is the most recent connection from the client,
  874. // so instead of this connection terminating more peers, it aborts.
  875. if sshServer.clients[client.sessionID] != nil {
  876. // As this is expected to be rare case, it's logged at a higher severity
  877. // level.
  878. log.WithTrace().Warning(
  879. "aborting new client with duplicate session ID")
  880. return false
  881. }
  882. // SSH_MAX_CLIENT_COUNT is a simple sanity check and failsafe. Load
  883. // limiting tuned to each server's host resources is provided by external
  884. // components. See comment in newSSHServer for more details.
  885. if len(sshServer.clients) >= SSH_MAX_CLIENT_COUNT {
  886. log.WithTrace().Warning("SSH_MAX_CLIENT_COUNT exceeded")
  887. return false
  888. }
  889. sshServer.clients[client.sessionID] = client
  890. return true
  891. }
  892. func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
  893. sshServer.clientsMutex.Lock()
  894. registeredClient := sshServer.clients[client.sessionID]
  895. // registeredClient will differ from client when client is the existingClient
  896. // terminated in registerEstablishedClient. In that case, registeredClient
  897. // remains connected, and the sshServer.clients entry should be retained.
  898. if registeredClient == client {
  899. delete(sshServer.clients, client.sessionID)
  900. }
  901. sshServer.clientsMutex.Unlock()
  902. client.stop()
  903. }
  904. type UpstreamStats map[string]interface{}
  905. type ProtocolStats map[string]map[string]interface{}
  906. type RegionStats map[string]map[string]map[string]interface{}
  907. func (sshServer *sshServer) getLoadStats() (
  908. UpstreamStats, ProtocolStats, RegionStats) {
  909. sshServer.clientsMutex.Lock()
  910. defer sshServer.clientsMutex.Unlock()
  911. // Explicitly populate with zeros to ensure 0 counts in log messages.
  912. zeroClientStats := func() map[string]interface{} {
  913. stats := make(map[string]interface{})
  914. stats["accepted_clients"] = int64(0)
  915. stats["established_clients"] = int64(0)
  916. return stats
  917. }
  918. // Due to hot reload and changes to the underlying system configuration, the
  919. // set of resolver IPs may change between getLoadStats calls, so this
  920. // enumeration for zeroing is a best effort.
  921. resolverIPs := sshServer.support.DNSResolver.GetAll()
  922. logDNSServerMetrics := sshServer.support.Config.LogDNSServerLoadMetrics
  923. // Fields which are primarily concerned with upstream/egress performance.
  924. zeroUpstreamStats := func() map[string]interface{} {
  925. stats := make(map[string]interface{})
  926. stats["dialing_tcp_port_forwards"] = int64(0)
  927. stats["tcp_port_forwards"] = int64(0)
  928. stats["total_tcp_port_forwards"] = int64(0)
  929. stats["udp_port_forwards"] = int64(0)
  930. stats["total_udp_port_forwards"] = int64(0)
  931. stats["tcp_port_forward_dialed_count"] = int64(0)
  932. stats["tcp_port_forward_dialed_duration"] = int64(0)
  933. stats["tcp_port_forward_failed_count"] = int64(0)
  934. stats["tcp_port_forward_failed_duration"] = int64(0)
  935. stats["tcp_port_forward_rejected_dialing_limit_count"] = int64(0)
  936. stats["tcp_port_forward_rejected_disallowed_count"] = int64(0)
  937. stats["udp_port_forward_rejected_disallowed_count"] = int64(0)
  938. stats["tcp_ipv4_port_forward_dialed_count"] = int64(0)
  939. stats["tcp_ipv4_port_forward_dialed_duration"] = int64(0)
  940. stats["tcp_ipv4_port_forward_failed_count"] = int64(0)
  941. stats["tcp_ipv4_port_forward_failed_duration"] = int64(0)
  942. stats["tcp_ipv6_port_forward_dialed_count"] = int64(0)
  943. stats["tcp_ipv6_port_forward_dialed_duration"] = int64(0)
  944. stats["tcp_ipv6_port_forward_failed_count"] = int64(0)
  945. stats["tcp_ipv6_port_forward_failed_duration"] = int64(0)
  946. zeroDNSStats := func() map[string]int64 {
  947. m := map[string]int64{"ALL": 0}
  948. if logDNSServerMetrics {
  949. for _, resolverIP := range resolverIPs {
  950. m[resolverIP.String()] = 0
  951. }
  952. }
  953. return m
  954. }
  955. stats["dns_count"] = zeroDNSStats()
  956. stats["dns_duration"] = zeroDNSStats()
  957. stats["dns_failed_count"] = zeroDNSStats()
  958. stats["dns_failed_duration"] = zeroDNSStats()
  959. return stats
  960. }
  961. zeroProtocolStats := func() map[string]map[string]interface{} {
  962. stats := make(map[string]map[string]interface{})
  963. stats["ALL"] = zeroClientStats()
  964. for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
  965. stats[tunnelProtocol] = zeroClientStats()
  966. if sshServer.tunnelProtocolUsesTLSDemux(tunnelProtocol) {
  967. stats[protocol.TUNNEL_PROTOCOL_TLS_OBFUSCATED_SSH] = zeroClientStats()
  968. }
  969. }
  970. return stats
  971. }
  972. addInt64 := func(stats map[string]interface{}, name string, value int64) {
  973. stats[name] = stats[name].(int64) + value
  974. }
  975. upstreamStats := zeroUpstreamStats()
  976. // [<protocol or ALL>][<stat name>] -> count
  977. protocolStats := zeroProtocolStats()
  978. // [<region][<protocol or ALL>][<stat name>] -> count
  979. regionStats := make(RegionStats)
  980. // Note: as currently tracked/counted, each established client is also an accepted client
  981. // Accepted client counts use peer GeoIP data, which in the case of
  982. // in-proxy tunnel protocols is the proxy, not the client. The original
  983. // client IP is only obtained after the tunnel handshake has completed.
  984. for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {
  985. for region, acceptedClientCount := range regionAcceptedClientCounts {
  986. if acceptedClientCount > 0 {
  987. if regionStats[region] == nil {
  988. regionStats[region] = zeroProtocolStats()
  989. }
  990. addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)
  991. addInt64(protocolStats[tunnelProtocol], "accepted_clients", acceptedClientCount)
  992. addInt64(regionStats[region]["ALL"], "accepted_clients", acceptedClientCount)
  993. addInt64(regionStats[region][tunnelProtocol], "accepted_clients", acceptedClientCount)
  994. }
  995. }
  996. }
  997. for _, client := range sshServer.clients {
  998. client.Lock()
  999. // Limitation: registerEstablishedClient is called before the
  1000. // handshake API completes; as a result, in the case of in-proxy
  1001. // tunnel protocol, clientGeoIPData may not yet be initialized and
  1002. // will count as None.
  1003. tunnelProtocol := client.tunnelProtocol
  1004. region := client.clientGeoIPData.Country
  1005. if regionStats[region] == nil {
  1006. regionStats[region] = zeroProtocolStats()
  1007. }
  1008. for _, stats := range []map[string]interface{}{
  1009. protocolStats["ALL"],
  1010. protocolStats[tunnelProtocol],
  1011. regionStats[region]["ALL"],
  1012. regionStats[region][tunnelProtocol]} {
  1013. addInt64(stats, "established_clients", 1)
  1014. }
  1015. // Note:
  1016. // - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak
  1017. // - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful
  1018. addInt64(upstreamStats, "dialing_tcp_port_forwards",
  1019. client.tcpTrafficState.concurrentDialingPortForwardCount)
  1020. addInt64(upstreamStats, "tcp_port_forwards",
  1021. client.tcpTrafficState.concurrentPortForwardCount)
  1022. addInt64(upstreamStats, "total_tcp_port_forwards",
  1023. client.tcpTrafficState.totalPortForwardCount)
  1024. addInt64(upstreamStats, "udp_port_forwards",
  1025. client.udpTrafficState.concurrentPortForwardCount)
  1026. addInt64(upstreamStats, "total_udp_port_forwards",
  1027. client.udpTrafficState.totalPortForwardCount)
  1028. addInt64(upstreamStats, "tcp_port_forward_dialed_count",
  1029. client.qualityMetrics.TCPPortForwardDialedCount)
  1030. addInt64(upstreamStats, "tcp_port_forward_dialed_duration",
  1031. int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))
  1032. addInt64(upstreamStats, "tcp_port_forward_failed_count",
  1033. client.qualityMetrics.TCPPortForwardFailedCount)
  1034. addInt64(upstreamStats, "tcp_port_forward_failed_duration",
  1035. int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))
  1036. addInt64(upstreamStats, "tcp_port_forward_rejected_dialing_limit_count",
  1037. client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)
  1038. addInt64(upstreamStats, "tcp_port_forward_rejected_disallowed_count",
  1039. client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)
  1040. addInt64(upstreamStats, "udp_port_forward_rejected_disallowed_count",
  1041. client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)
  1042. addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_count",
  1043. client.qualityMetrics.TCPIPv4PortForwardDialedCount)
  1044. addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_duration",
  1045. int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))
  1046. addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_count",
  1047. client.qualityMetrics.TCPIPv4PortForwardFailedCount)
  1048. addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_duration",
  1049. int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))
  1050. addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_count",
  1051. client.qualityMetrics.TCPIPv6PortForwardDialedCount)
  1052. addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_duration",
  1053. int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))
  1054. addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_count",
  1055. client.qualityMetrics.TCPIPv6PortForwardFailedCount)
  1056. addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_duration",
  1057. int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))
  1058. // DNS metrics limitations:
  1059. // - port forwards (sshClient.handleTCPChannel) don't know or log the resolver IP.
  1060. // - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure,
  1061. // and there may be some delay before these code paths report DNS metrics.
  1062. // Every client.qualityMetrics DNS map has an "ALL" entry.
  1063. totalDNSCount := int64(0)
  1064. totalDNSFailedCount := int64(0)
  1065. for key, value := range client.qualityMetrics.DNSCount {
  1066. all := key == "ALL"
  1067. if all || logDNSServerMetrics {
  1068. upstreamStats["dns_count"].(map[string]int64)[key] += value
  1069. }
  1070. if all {
  1071. totalDNSCount += value
  1072. }
  1073. }
  1074. for key, value := range client.qualityMetrics.DNSDuration {
  1075. if key == "ALL" || logDNSServerMetrics {
  1076. upstreamStats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
  1077. }
  1078. }
  1079. for key, value := range client.qualityMetrics.DNSFailedCount {
  1080. all := key == "ALL"
  1081. if all || logDNSServerMetrics {
  1082. upstreamStats["dns_failed_count"].(map[string]int64)[key] += value
  1083. }
  1084. if all {
  1085. totalDNSFailedCount += value
  1086. }
  1087. }
  1088. for key, value := range client.qualityMetrics.DNSFailedDuration {
  1089. if key == "ALL" || logDNSServerMetrics {
  1090. upstreamStats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)
  1091. }
  1092. }
  1093. // Update client peak failure rate metrics, to be recorded in
  1094. // server_tunnel.
  1095. //
  1096. // Limitations:
  1097. //
  1098. // - This is a simple data sampling that doesn't require additional
  1099. // timers or tracking logic. Since the rates are calculated on
  1100. // getLoadStats events and using accumulated counts, these peaks
  1101. // only represent the highest failure rate within a
  1102. // Config.LoadMonitorPeriodSeconds non-sliding window. There is no
  1103. // sample recorded for short tunnels with no overlapping
  1104. // getLoadStats event.
  1105. //
  1106. // - There is no minimum sample window, as a getLoadStats event may
  1107. // occur immediately after a client first connects. This may be
  1108. // compensated for by adjusting
  1109. // Config.PeakUpstreamFailureRateMinimumSampleSize, so as to only
  1110. // consider failure rates with a larger number of samples.
  1111. //
  1112. // - Non-UDP "failures" are not currently tracked.
  1113. minimumSampleSize := int64(sshServer.support.Config.peakUpstreamFailureRateMinimumSampleSize)
  1114. sampleSize := client.qualityMetrics.TCPPortForwardDialedCount +
  1115. client.qualityMetrics.TCPPortForwardFailedCount
  1116. if sampleSize >= minimumSampleSize {
  1117. TCPPortForwardFailureRate := float64(client.qualityMetrics.TCPPortForwardFailedCount) /
  1118. float64(sampleSize)
  1119. if client.peakMetrics.TCPPortForwardFailureRate == nil {
  1120. client.peakMetrics.TCPPortForwardFailureRate = new(float64)
  1121. *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
  1122. client.peakMetrics.TCPPortForwardFailureRateSampleSize = new(int64)
  1123. *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize
  1124. } else if *client.peakMetrics.TCPPortForwardFailureRate < TCPPortForwardFailureRate {
  1125. *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate
  1126. *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize
  1127. }
  1128. }
  1129. sampleSize = totalDNSCount + totalDNSFailedCount
  1130. if sampleSize >= minimumSampleSize {
  1131. DNSFailureRate := float64(totalDNSFailedCount) / float64(sampleSize)
  1132. if client.peakMetrics.DNSFailureRate == nil {
  1133. client.peakMetrics.DNSFailureRate = new(float64)
  1134. *client.peakMetrics.DNSFailureRate = DNSFailureRate
  1135. client.peakMetrics.DNSFailureRateSampleSize = new(int64)
  1136. *client.peakMetrics.DNSFailureRateSampleSize = sampleSize
  1137. } else if *client.peakMetrics.DNSFailureRate < DNSFailureRate {
  1138. *client.peakMetrics.DNSFailureRate = DNSFailureRate
  1139. *client.peakMetrics.DNSFailureRateSampleSize = sampleSize
  1140. }
  1141. }
  1142. // Reset quality metrics counters
  1143. client.qualityMetrics.reset()
  1144. client.Unlock()
  1145. }
  1146. for _, client := range sshServer.clients {
  1147. client.Lock()
  1148. // Update client peak proximate (same region) concurrently connected
  1149. // (other clients) client metrics, to be recorded in server_tunnel.
  1150. // This operation requires a second loop over sshServer.clients since
  1151. // established_clients is calculated in the first loop.
  1152. //
  1153. // Limitations:
  1154. //
  1155. // - This is an approximation, not a true peak, as it only samples
  1156. // data every Config.LoadMonitorPeriodSeconds period. There is no
  1157. // sample recorded for short tunnels with no overlapping
  1158. // getLoadStats event.
  1159. //
  1160. // - The "-1" calculation counts all but the current client as other
  1161. // clients; it can be the case that the same client has a dangling
  1162. // accepted connection that has yet to time-out server side. Due to
  1163. // NAT, we can't determine if the client is the same based on
  1164. // network address. For established clients,
  1165. // registerEstablishedClient ensures that any previous connection
  1166. // is first terminated, although this is only for the same
  1167. // session_id. Concurrent proximate clients may be considered an
  1168. // exact number of other _network connections_, even from the same
  1169. // client.
  1170. //
  1171. // Futhermore, since client.Locks aren't held between the previous
  1172. // loop and this one, it's also possible that the client's
  1173. // clientGeoIPData was None in the previous loop and is now not
  1174. // None. In this case, the regionStats may not be populated at all
  1175. // for the client's current region; if so, the client is skipped.
  1176. // This scenario can also result in a proximate undercount by one,
  1177. // when the regionStats _is_ populated: this client was counted
  1178. // under None, not the current client.peerGeoIPData.Country, so
  1179. // the -1 subtracts some _other_ client from the populated regionStats.
  1180. //
  1181. // - For in-proxy protocols, the accepted proximate metric uses the
  1182. // peer GeoIP, which represents the proxy, not the client.
  1183. stats := regionStats[client.peerGeoIPData.Country]["ALL"]
  1184. n := stats["accepted_clients"].(int64) - 1
  1185. if n >= 0 {
  1186. if client.peakMetrics.concurrentProximateAcceptedClients == nil {
  1187. client.peakMetrics.concurrentProximateAcceptedClients = new(int64)
  1188. *client.peakMetrics.concurrentProximateAcceptedClients = n
  1189. } else if *client.peakMetrics.concurrentProximateAcceptedClients < n {
  1190. *client.peakMetrics.concurrentProximateAcceptedClients = n
  1191. }
  1192. }
  1193. // Handle the in-proxy None and None/not-None cases (and any other
  1194. // potential scenario where regionStats[client.clientGeoIPData.Country]
  1195. // may not be populated).
  1196. if client.clientGeoIPData.Country == GEOIP_UNKNOWN_VALUE ||
  1197. regionStats[client.clientGeoIPData.Country] == nil {
  1198. client.Unlock()
  1199. continue
  1200. }
  1201. stats = regionStats[client.clientGeoIPData.Country]["ALL"]
  1202. n = stats["established_clients"].(int64) - 1
  1203. if n >= 0 {
  1204. if client.peakMetrics.concurrentProximateEstablishedClients == nil {
  1205. client.peakMetrics.concurrentProximateEstablishedClients = new(int64)
  1206. *client.peakMetrics.concurrentProximateEstablishedClients = n
  1207. } else if *client.peakMetrics.concurrentProximateEstablishedClients < n {
  1208. *client.peakMetrics.concurrentProximateEstablishedClients = n
  1209. }
  1210. }
  1211. client.Unlock()
  1212. }
  1213. return upstreamStats, protocolStats, regionStats
  1214. }
  1215. func (sshServer *sshServer) getEstablishedClientCount() int {
  1216. sshServer.clientsMutex.Lock()
  1217. defer sshServer.clientsMutex.Unlock()
  1218. establishedClients := len(sshServer.clients)
  1219. return establishedClients
  1220. }
  1221. func (sshServer *sshServer) resetAllClientTrafficRules() {
  1222. sshServer.clientsMutex.Lock()
  1223. clients := make(map[string]*sshClient)
  1224. for sessionID, client := range sshServer.clients {
  1225. clients[sessionID] = client
  1226. }
  1227. sshServer.clientsMutex.Unlock()
  1228. for _, client := range clients {
  1229. client.setTrafficRules()
  1230. }
  1231. }
  1232. func (sshServer *sshServer) resetAllClientOSLConfigs() {
  1233. // Flush cached seed state. This has the same effect
  1234. // and same limitations as calling setOSLConfig for
  1235. // currently connected clients -- all progress is lost.
  1236. sshServer.oslSessionCacheMutex.Lock()
  1237. sshServer.oslSessionCache.Flush()
  1238. sshServer.oslSessionCacheMutex.Unlock()
  1239. sshServer.clientsMutex.Lock()
  1240. clients := make(map[string]*sshClient)
  1241. for sessionID, client := range sshServer.clients {
  1242. clients[sessionID] = client
  1243. }
  1244. sshServer.clientsMutex.Unlock()
  1245. for _, client := range clients {
  1246. client.setOSLConfig()
  1247. }
  1248. }
  1249. // reloadTactics signals/invokes components that use server-side tactics for
  1250. // one-time initialization to reload and use potentially changed parameters.
  1251. func (sshServer *sshServer) reloadTactics() error {
  1252. // The following in-proxy components use server-side tactics with a
  1253. // one-time initialization:
  1254. //
  1255. // - For servers running in-proxy tunnel protocols,
  1256. // sshServer.inproxyBrokerSessions are the broker/server sessions and
  1257. // the set of expected broker public keys is set from tactics.
  1258. // - For servers running a broker within MeekServer, broker operational
  1259. // configuration is set from tactics.
  1260. //
  1261. // For these components, one-time initialization is more efficient than
  1262. // constantly fetching tactics. Instead, these components reinitialize
  1263. // when tactics change.
  1264. // sshServer.inproxyBrokerSessions is not nil when the server is running
  1265. // in-proxy tunnel protocols.
  1266. if sshServer.inproxyBrokerSessions != nil {
  1267. // Get InproxyAllBrokerSpecs from tactics.
  1268. //
  1269. // Limitation: assumes no GeoIP targeting for InproxyAllBrokerSpecs.
  1270. p, err := sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
  1271. if err != nil {
  1272. return errors.Trace(err)
  1273. }
  1274. if !p.IsNil() {
  1275. // Fall back to InproxyBrokerSpecs if InproxyAllBrokerSpecs is not
  1276. // configured.
  1277. brokerSpecs := p.InproxyBrokerSpecs(
  1278. parameters.InproxyAllBrokerSpecs, parameters.InproxyBrokerSpecs)
  1279. var brokerPublicKeys []inproxy.SessionPublicKey
  1280. var brokerRootObfuscationSecrets []inproxy.ObfuscationSecret
  1281. for _, brokerSpec := range brokerSpecs {
  1282. brokerPublicKey, err := inproxy.SessionPublicKeyFromString(
  1283. brokerSpec.BrokerPublicKey)
  1284. if err != nil {
  1285. return errors.Trace(err)
  1286. }
  1287. brokerPublicKeys = append(
  1288. brokerPublicKeys, brokerPublicKey)
  1289. brokerRootObfuscationSecret, err := inproxy.ObfuscationSecretFromString(
  1290. brokerSpec.BrokerRootObfuscationSecret)
  1291. if err != nil {
  1292. return errors.Trace(err)
  1293. }
  1294. brokerRootObfuscationSecrets = append(
  1295. brokerRootObfuscationSecrets, brokerRootObfuscationSecret)
  1296. }
  1297. // SetKnownBrokerPublicKeys will terminate any existing sessions
  1298. // for broker public keys no longer in the known/expected list;
  1299. // but will retain any existing sessions for broker public keys
  1300. // that remain in the list.
  1301. err = sshServer.inproxyBrokerSessions.SetKnownBrokers(
  1302. brokerPublicKeys, brokerRootObfuscationSecrets)
  1303. if err != nil {
  1304. return errors.Trace(err)
  1305. }
  1306. sshServer.inproxyBrokerSessions.SetProxyQualityRequestParameters(
  1307. p.Int(parameters.InproxyProxyQualityReporterMaxRequestEntries),
  1308. p.Duration(parameters.InproxyProxyQualityReporterRequestDelay),
  1309. p.Duration(parameters.InproxyProxyQualityReporterRequestTimeout),
  1310. p.Int(parameters.InproxyProxyQualityReporterRequestRetries))
  1311. }
  1312. }
  1313. err := sshServer.reloadMeekServerTactics()
  1314. if err != nil {
  1315. return errors.Trace(err)
  1316. }
  1317. return nil
  1318. }
  1319. func (sshServer *sshServer) revokeClientAuthorizations(sessionID string) {
  1320. sshServer.clientsMutex.Lock()
  1321. client := sshServer.clients[sessionID]
  1322. sshServer.clientsMutex.Unlock()
  1323. if client == nil {
  1324. return
  1325. }
  1326. // sshClient.handshakeState.authorizedAccessTypes is not cleared. Clearing
  1327. // authorizedAccessTypes may cause sshClient.logTunnel to fail to log
  1328. // access types. As the revocation may be due to legitimate use of an
  1329. // authorization in multiple sessions by a single client, useful metrics
  1330. // would be lost.
  1331. client.Lock()
  1332. client.handshakeState.authorizationsRevoked = true
  1333. client.Unlock()
  1334. // Select and apply new traffic rules, as filtered by the client's new
  1335. // authorization state.
  1336. client.setTrafficRules()
  1337. }
  1338. func (sshServer *sshServer) stopClients() {
  1339. sshServer.clientsMutex.Lock()
  1340. sshServer.stoppingClients = true
  1341. clients := sshServer.clients
  1342. sshServer.clients = make(map[string]*sshClient)
  1343. sshServer.clientsMutex.Unlock()
  1344. // Stop clients concurrently; if any one client stop hangs, due to a bug,
  1345. // this ensures that we still stop and record a server_tunnel for all
  1346. // non-hanging clients.
  1347. waitGroup := new(sync.WaitGroup)
  1348. for _, client := range clients {
  1349. waitGroup.Add(1)
  1350. go func(c *sshClient) {
  1351. defer waitGroup.Done()
  1352. c.stop()
  1353. c.awaitStopped()
  1354. }(client)
  1355. }
  1356. waitGroup.Wait()
  1357. }
  1358. func (sshServer *sshServer) handleClient(
  1359. sshListener *sshListener,
  1360. conn net.Conn,
  1361. transportData *additionalTransportData) {
  1362. // overrideTunnelProtocol sets the tunnel protocol to a value other than
  1363. // the listener tunnel protocol. This is used in fronted meek
  1364. // configuration, where a single HTTPS listener also handles fronted HTTP
  1365. // and QUIC traffic; and in the protocol demux case.
  1366. tunnelProtocol := sshListener.tunnelProtocol
  1367. if transportData != nil && transportData.overrideTunnelProtocol != "" {
  1368. tunnelProtocol = transportData.overrideTunnelProtocol
  1369. }
  1370. // Calling conn.RemoteAddr at this point, before any Read calls,
  1371. // satisfies the constraint documented in tapdance.Listen.
  1372. peerAddr := conn.RemoteAddr()
  1373. // Check if there were irregularities during the network connection
  1374. // establishment. When present, log and then behave as Obfuscated SSH does
  1375. // when the client fails to provide a valid seed message.
  1376. //
  1377. // One concrete irregular case is failure to send a PROXY protocol header for
  1378. // TAPDANCE-OSSH.
  1379. if indicator, ok := conn.(common.IrregularIndicator); ok {
  1380. tunnelErr := indicator.IrregularTunnelError()
  1381. if tunnelErr != nil {
  1382. logIrregularTunnel(
  1383. sshServer.support,
  1384. sshListener.tunnelProtocol,
  1385. sshListener.port,
  1386. common.IPAddressFromAddr(peerAddr),
  1387. errors.Trace(tunnelErr),
  1388. nil)
  1389. var afterFunc *time.Timer
  1390. if sshServer.support.Config.sshHandshakeTimeout > 0 {
  1391. afterFunc = time.AfterFunc(sshServer.support.Config.sshHandshakeTimeout, func() {
  1392. conn.Close()
  1393. })
  1394. }
  1395. _, _ = io.Copy(ioutil.Discard, conn)
  1396. conn.Close()
  1397. afterFunc.Stop()
  1398. return
  1399. }
  1400. }
  1401. // Get any packet manipulation values from GetAppliedSpecName as soon as
  1402. // possible due to the expiring TTL.
  1403. //
  1404. // In the case of in-proxy tunnel protocols, the remote address will be
  1405. // the proxy, not the client, and GeoIP targeted packet manipulation will
  1406. // apply to the 2nd hop.
  1407. serverPacketManipulation := ""
  1408. replayedServerPacketManipulation := false
  1409. if sshServer.support.Config.RunPacketManipulator &&
  1410. protocol.TunnelProtocolMayUseServerPacketManipulation(tunnelProtocol) {
  1411. // A meekConn has synthetic address values, including the original client
  1412. // address in cases where the client uses an upstream proxy to connect to
  1413. // Psiphon. For meekConn, and any other conn implementing
  1414. // UnderlyingTCPAddrSource, get the underlying TCP connection addresses.
  1415. //
  1416. // Limitation: a meek tunnel may consist of several TCP connections. The
  1417. // server_packet_manipulation metric will reflect the packet manipulation
  1418. // applied to the _first_ TCP connection only.
  1419. var localAddr, remoteAddr *net.TCPAddr
  1420. var ok bool
  1421. underlying, ok := conn.(common.UnderlyingTCPAddrSource)
  1422. if ok {
  1423. localAddr, remoteAddr, ok = underlying.GetUnderlyingTCPAddrs()
  1424. } else {
  1425. localAddr, ok = conn.LocalAddr().(*net.TCPAddr)
  1426. if ok {
  1427. remoteAddr, ok = conn.RemoteAddr().(*net.TCPAddr)
  1428. }
  1429. }
  1430. if ok {
  1431. specName, extraData, err := sshServer.support.PacketManipulator.
  1432. GetAppliedSpecName(localAddr, remoteAddr)
  1433. if err == nil {
  1434. serverPacketManipulation = specName
  1435. replayedServerPacketManipulation, _ = extraData.(bool)
  1436. }
  1437. }
  1438. }
  1439. // For in-proxy tunnel protocols, accepted client GeoIP reflects the proxy
  1440. // address, not the client.
  1441. peerGeoIPData := sshServer.support.GeoIPService.Lookup(
  1442. common.IPAddressFromAddr(peerAddr))
  1443. sshServer.registerAcceptedClient(tunnelProtocol, peerGeoIPData.Country)
  1444. defer sshServer.unregisterAcceptedClient(tunnelProtocol, peerGeoIPData.Country)
  1445. // When configured, enforce a cap on the number of concurrent SSH
  1446. // handshakes. This limits load spikes on busy servers when many clients
  1447. // attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,
  1448. // to acquire; waiting will avoid immediately creating more load on another
  1449. // server in the network when the client tries a new candidate. Disconnect the
  1450. // client when that wait time is exceeded.
  1451. //
  1452. // This mechanism limits memory allocations and CPU usage associated with the
  1453. // SSH handshake. At this point, new direct TCP connections or new meek
  1454. // connections, with associated resource usage, are already established. Those
  1455. // connections are expected to be rate or load limited using other mechanisms.
  1456. //
  1457. // TODO:
  1458. //
  1459. // - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in
  1460. // sshClient.run, since the client is also applying an SSH handshake timeout
  1461. // and won't exclude time spent waiting.
  1462. // - each call to sshServer.handleClient (in sshServer.runListener) is invoked
  1463. // in its own goroutine, but shutdown doesn't synchronously await these
  1464. // goroutnes. Once this is synchronizes, the following context.WithTimeout
  1465. // should use an sshServer parent context to ensure blocking acquires
  1466. // interrupt immediately upon shutdown.
  1467. var onSSHHandshakeFinished func()
  1468. if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {
  1469. ctx, cancelFunc := context.WithTimeout(
  1470. context.Background(),
  1471. sshServer.support.Config.sshBeginHandshakeTimeout)
  1472. defer cancelFunc()
  1473. err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)
  1474. if err != nil {
  1475. conn.Close()
  1476. // This is a debug log as the only possible error is context timeout.
  1477. log.WithTraceFields(LogFields{"error": err}).Debug(
  1478. "acquire SSH handshake semaphore failed")
  1479. return
  1480. }
  1481. onSSHHandshakeFinished = func() {
  1482. sshServer.concurrentSSHHandshakes.Release(1)
  1483. }
  1484. }
  1485. sshClient, err := newSshClient(
  1486. sshServer,
  1487. sshListener,
  1488. tunnelProtocol,
  1489. transportData,
  1490. serverPacketManipulation,
  1491. replayedServerPacketManipulation,
  1492. peerAddr,
  1493. peerGeoIPData)
  1494. if err != nil {
  1495. conn.Close()
  1496. log.WithTraceFields(LogFields{"error": err}).Warning(
  1497. "newSshClient failed")
  1498. return
  1499. }
  1500. // sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
  1501. // in any error case; or, as soon as the SSH handshake phase has successfully
  1502. // completed.
  1503. sshClient.run(conn, onSSHHandshakeFinished)
  1504. }
  1505. func (sshServer *sshServer) monitorPortForwardDialError(err error) {
  1506. // "err" is the error returned from a failed TCP or UDP port
  1507. // forward dial. Certain system error codes indicate low resource
  1508. // conditions: insufficient file descriptors, ephemeral ports, or
  1509. // memory. For these cases, log an alert.
  1510. // TODO: also temporarily suspend new clients
  1511. // Note: don't log net.OpError.Error() as the full error string
  1512. // may contain client destination addresses.
  1513. opErr, ok := err.(*net.OpError)
  1514. if ok {
  1515. if opErr.Err == syscall.EADDRNOTAVAIL ||
  1516. opErr.Err == syscall.EAGAIN ||
  1517. opErr.Err == syscall.ENOMEM ||
  1518. opErr.Err == syscall.EMFILE ||
  1519. opErr.Err == syscall.ENFILE {
  1520. log.WithTraceFields(
  1521. LogFields{"error": opErr.Err}).Error(
  1522. "port forward dial failed due to unavailable resource")
  1523. }
  1524. }
  1525. }
  1526. // tunnelProtocolUsesTLSDemux returns true if the server demultiplexes the given
  1527. // protocol and TLS-OSSH over the same port.
  1528. func (sshServer *sshServer) tunnelProtocolUsesTLSDemux(tunnelProtocol string) bool {
  1529. // Only use meek/TLS-OSSH demux if unfronted meek HTTPS with non-legacy
  1530. // passthrough, and not in-proxy.
  1531. if protocol.TunnelProtocolUsesMeekHTTPS(tunnelProtocol) &&
  1532. !protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) &&
  1533. !protocol.TunnelProtocolUsesInproxy(tunnelProtocol) {
  1534. _, passthroughEnabled := sshServer.support.Config.TunnelProtocolPassthroughAddresses[tunnelProtocol]
  1535. return passthroughEnabled && !sshServer.support.Config.LegacyPassthrough
  1536. }
  1537. return false
  1538. }
  1539. // setGeoIPSessionCache adds the sessionID/geoIPData pair to the session
  1540. // cache. This value will not expire; the caller must call
  1541. // markGeoIPSessionCacheToExpire to initiate expiry. Calling
  1542. // setGeoIPSessionCache for an existing sessionID will replace the previous
  1543. // value and reset any expiry.
  1544. func (sshServer *sshServer) setGeoIPSessionCache(sessionID string, geoIPData GeoIPData) {
  1545. sshServer.geoIPSessionCache.Set(sessionID, geoIPData, cache.NoExpiration)
  1546. }
  1547. // markGeoIPSessionCacheToExpire initiates expiry for an existing session
  1548. // cache entry, if the session ID is found in the cache. Concurrency note:
  1549. // setGeoIPSessionCache and markGeoIPSessionCacheToExpire should not be
  1550. // called concurrently for a single session ID.
  1551. func (sshServer *sshServer) markGeoIPSessionCacheToExpire(sessionID string) {
  1552. geoIPData, found := sshServer.geoIPSessionCache.Get(sessionID)
  1553. // Note: potential race condition between Get and Set. In practice,
  1554. // the tunnel server won't clobber a SetSessionCache value by calling
  1555. // MarkSessionCacheToExpire concurrently.
  1556. if found {
  1557. sshServer.geoIPSessionCache.Set(sessionID, geoIPData, cache.DefaultExpiration)
  1558. }
  1559. }
  1560. // getGeoIPSessionCache returns the cached GeoIPData for the specified session
  1561. // ID; a blank GeoIPData is returned if the session ID is not found in the
  1562. // cache.
  1563. func (sshServer *sshServer) getGeoIPSessionCache(sessionID string) GeoIPData {
  1564. geoIPData, found := sshServer.geoIPSessionCache.Get(sessionID)
  1565. if !found {
  1566. return NewGeoIPData()
  1567. }
  1568. return geoIPData.(GeoIPData)
  1569. }
  1570. // inGeoIPSessionCache returns whether the session ID is present in the
  1571. // session cache.
  1572. func (sshServer *sshServer) inGeoIPSessionCache(sessionID string) bool {
  1573. _, found := sshServer.geoIPSessionCache.Get(sessionID)
  1574. return found
  1575. }
  1576. type sshClient struct {
  1577. sync.Mutex
  1578. sshServer *sshServer
  1579. sshListener *sshListener
  1580. tunnelProtocol string
  1581. isInproxyTunnelProtocol bool
  1582. additionalTransportData *additionalTransportData
  1583. sshConn ssh.Conn
  1584. throttledConn *common.ThrottledConn
  1585. serverPacketManipulation string
  1586. replayedServerPacketManipulation bool
  1587. peerAddr net.Addr
  1588. peerGeoIPData GeoIPData
  1589. clientIP string
  1590. clientGeoIPData GeoIPData
  1591. sessionID string
  1592. isFirstTunnelInSession bool
  1593. supportsServerRequests bool
  1594. sponsorID string
  1595. handshakeState handshakeState
  1596. udpgwChannelHandler *udpgwPortForwardMultiplexer
  1597. totalUdpgwChannelCount int
  1598. packetTunnelChannel ssh.Channel
  1599. totalPacketTunnelChannelCount int
  1600. trafficRules TrafficRules
  1601. tcpTrafficState trafficState
  1602. udpTrafficState trafficState
  1603. qualityMetrics *qualityMetrics
  1604. tcpPortForwardLRU *common.LRUConns
  1605. oslClientSeedState *osl.ClientSeedState
  1606. signalIssueSLOKs chan struct{}
  1607. runCtx context.Context
  1608. stopRunning context.CancelFunc
  1609. stopped chan struct{}
  1610. tcpPortForwardDialingAvailableSignal context.CancelFunc
  1611. releaseAuthorizations func()
  1612. stopTimer *time.Timer
  1613. preHandshakeRandomStreamMetrics randomStreamMetrics
  1614. postHandshakeRandomStreamMetrics randomStreamMetrics
  1615. sendAlertRequests chan protocol.AlertRequest
  1616. sentAlertRequests map[string]bool
  1617. peakMetrics peakMetrics
  1618. destinationBytesMetrics map[string]*protocolDestinationBytesMetrics
  1619. inproxyProxyQualityTracker *inproxyProxyQualityTracker
  1620. dnsResolver *net.Resolver
  1621. dnsCache *lrucache.Cache
  1622. requestCheckServerEntryTags int
  1623. checkedServerEntryTags int
  1624. invalidServerEntryTags int
  1625. sshProtocolBytesTracker *sshProtocolBytesTracker
  1626. dslRequestCount int
  1627. }
  // trafficState holds byte counters and port forward counters (current,
  // peak, and total) together with the condition variable associated with
  // port forward availability. sshClient keeps one instance for TCP and one
  // for UDP.
  type trafficState struct {
  	bytesUp, bytesDown int64

  	concurrentDialingPortForwardCount, peakConcurrentDialingPortForwardCount int64
  	concurrentPortForwardCount, peakConcurrentPortForwardCount               int64
  	totalPortForwardCount                                                    int64

  	availablePortForwardCond *sync.Cond
  }
  // randomStreamMetrics holds a count plus upstream and downstream byte
  // totals. sshClient keeps separate pre-handshake and post-handshake
  // instances.
  type randomStreamMetrics struct {
  	count int64

  	upstreamBytes, receivedUpstreamBytes int64
  	downstreamBytes, sentDownstreamBytes int64
  }
  // peakMetrics holds optional peak values. Every field is a pointer, so a
  // nil field is distinguishable from a recorded zero.
  type peakMetrics struct {
  	concurrentProximateAcceptedClients, concurrentProximateEstablishedClients *int64

  	TCPPortForwardFailureRate           *float64
  	TCPPortForwardFailureRateSampleSize *int64

  	DNSFailureRate           *float64
  	DNSFailureRateSampleSize *int64
  }
  // qualityMetrics tracks upstream TCP dial attempts and their elapsed
  // time. The elapsed time covers the full TCP handshake and, taken in
  // aggregate, measures the quality of the upstream link. Each sshClient
  // records these stats; sshServer.getLoadStats() reports and resets them.
  type qualityMetrics struct {
  	TCPPortForwardDialedCount    int64
  	TCPPortForwardDialedDuration time.Duration
  	TCPPortForwardFailedCount    int64
  	TCPPortForwardFailedDuration time.Duration

  	TCPPortForwardRejectedDialingLimitCount int64
  	TCPPortForwardRejectedDisallowedCount   int64
  	UDPPortForwardRejectedDisallowedCount   int64

  	TCPIPv4PortForwardDialedCount    int64
  	TCPIPv4PortForwardDialedDuration time.Duration
  	TCPIPv4PortForwardFailedCount    int64
  	TCPIPv4PortForwardFailedDuration time.Duration

  	TCPIPv6PortForwardDialedCount    int64
  	TCPIPv6PortForwardDialedDuration time.Duration
  	TCPIPv6PortForwardFailedCount    int64
  	TCPIPv6PortForwardFailedDuration time.Duration

  	DNSCount          map[string]int64
  	DNSDuration       map[string]time.Duration
  	DNSFailedCount    map[string]int64
  	DNSFailedDuration map[string]time.Duration
  }

  // newQualityMetrics returns a qualityMetrics with zeroed counters and all
  // four DNS maps allocated and empty.
  func newQualityMetrics() *qualityMetrics {
  	metrics := new(qualityMetrics)
  	metrics.DNSCount = make(map[string]int64)
  	metrics.DNSDuration = make(map[string]time.Duration)
  	metrics.DNSFailedCount = make(map[string]int64)
  	metrics.DNSFailedDuration = make(map[string]time.Duration)
  	return metrics
  }

  // reset zeroes every counter and duration and empties the DNS maps. The
  // existing map values are kept, not replaced.
  func (q *qualityMetrics) reset() {

  	// Retain existing maps to avoid memory churn. The Go compiler optimizes
  	// map clearing operations of the following form.
  	for key := range q.DNSCount {
  		delete(q.DNSCount, key)
  	}
  	for key := range q.DNSDuration {
  		delete(q.DNSDuration, key)
  	}
  	for key := range q.DNSFailedCount {
  		delete(q.DNSFailedCount, key)
  	}
  	for key := range q.DNSFailedDuration {
  		delete(q.DNSFailedDuration, key)
  	}

  	// Overwrite with a value that carries over only the (now empty) maps;
  	// every other field takes its zero value.
  	*q = qualityMetrics{
  		DNSCount:          q.DNSCount,
  		DNSDuration:       q.DNSDuration,
  		DNSFailedCount:    q.DNSFailedCount,
  		DNSFailedDuration: q.DNSFailedDuration,
  	}
  }
  // handshakeStateInfo groups authorization IDs and access types, upstream
  // and downstream bytes-per-second values, and a steering IP.
  type handshakeStateInfo struct {
  	activeAuthorizationIDs, authorizedAccessTypes []string

  	upstreamBytesPerSecond, downstreamBytesPerSecond int64

  	steeringIP string
  }
  1725. type handshakeState struct {
  1726. completed bool
  1727. apiProtocol string
  1728. apiParams common.APIParameters
  1729. activeAuthorizationIDs []string
  1730. authorizedAccessTypes []string
  1731. authorizationsRevoked bool
  1732. domainBytesChecksum []byte
  1733. establishedTunnelsCount int
  1734. splitTunnelLookup *splitTunnelLookup
  1735. deviceRegion string
  1736. newTacticsTag string
  1737. inproxyClientIP string
  1738. inproxyClientGeoIPData GeoIPData
  1739. inproxyProxyID inproxy.ID
  1740. inproxyMatchedPersonal bool
  1741. inproxyRelayLogFields common.LogFields
  1742. }
  1743. type protocolDestinationBytesMetrics struct {
  1744. tcpMetrics destinationBytesMetrics
  1745. udpMetrics destinationBytesMetrics
  1746. }
  // destinationBytesMetrics accumulates upstream and downstream byte counts.
  // The counters are only ever touched through sync/atomic operations.
  type destinationBytesMetrics struct {
  	bytesUp, bytesDown int64
  }

  // UpdateProgress adds upstreamBytes to bytesUp and downstreamBytes to
  // bytesDown. The third argument is ignored.
  func (d *destinationBytesMetrics) UpdateProgress(
  	downstreamBytes, upstreamBytes, _ int64) {

  	// Concurrency: UpdateProgress may be called without holding the
  	// sshClient lock; all accesses to bytesUp/bytesDown must use atomic
  	// operations.
  	atomic.AddInt64(&d.bytesUp, upstreamBytes)
  	atomic.AddInt64(&d.bytesDown, downstreamBytes)
  }

  // getBytesUp atomically loads the accumulated upstream byte count.
  func (d *destinationBytesMetrics) getBytesUp() int64 {
  	return atomic.LoadInt64(&d.bytesUp)
  }

  // getBytesDown atomically loads the accumulated downstream byte count.
  func (d *destinationBytesMetrics) getBytesDown() int64 {
  	return atomic.LoadInt64(&d.bytesDown)
  }
  // splitTunnelLookup answers region membership queries. newSplitTunnelLookup
  // populates exactly one of the two fields: regions (slice) or
  // regionsLookup (map).
  type splitTunnelLookup struct {
  	// regions is the slice form of the region list.
  	regions []string

  	// regionsLookup is the map form of the region list.
  	regionsLookup map[string]bool
  }
  1768. func newSplitTunnelLookup(
  1769. ownRegion string,
  1770. otherRegions []string) (*splitTunnelLookup, error) {
  1771. length := len(otherRegions)
  1772. if ownRegion != "" {
  1773. length += 1
  1774. }
  1775. // This length check is a sanity check and prevents clients shipping
  1776. // excessively long lists which could impact performance.
  1777. if length > 250 {
  1778. return nil, errors.Tracef("too many regions: %d", length)
  1779. }
  1780. // Create map lookups for lists where the number of values to compare
  1781. // against exceeds a threshold where benchmarks show maps are faster than
  1782. // looping through a slice. Otherwise use a slice for lookups. In both
  1783. // cases, the input slice is no longer referenced.
  1784. if length >= stringLookupThreshold {
  1785. regionsLookup := make(map[string]bool)
  1786. if ownRegion != "" {
  1787. regionsLookup[ownRegion] = true
  1788. }
  1789. for _, region := range otherRegions {
  1790. regionsLookup[region] = true
  1791. }
  1792. return &splitTunnelLookup{
  1793. regionsLookup: regionsLookup,
  1794. }, nil
  1795. } else {
  1796. regions := []string{}
  1797. if ownRegion != "" && !common.Contains(otherRegions, ownRegion) {
  1798. regions = append(regions, ownRegion)
  1799. }
  1800. // TODO: check for other duplicate regions?
  1801. regions = append(regions, otherRegions...)
  1802. return &splitTunnelLookup{
  1803. regions: regions,
  1804. }, nil
  1805. }
  1806. }
  1807. func (lookup *splitTunnelLookup) lookup(region string) bool {
  1808. if lookup.regionsLookup != nil {
  1809. return lookup.regionsLookup[region]
  1810. } else {
  1811. return common.Contains(lookup.regions, region)
  1812. }
  1813. }
  1814. type inproxyProxyQualityTracker struct {
  1815. bytesUp atomic.Int64
  1816. bytesDown atomic.Int64
  1817. reportTriggered int32
  1818. sshClient *sshClient
  1819. targetBytesUp int64
  1820. targetBytesDown int64
  1821. targetDuration time.Duration
  1822. startTime time.Time
  1823. }
  1824. func newInproxyProxyQualityTracker(
  1825. sshClient *sshClient,
  1826. targetBytesUp int64,
  1827. targetBytesDown int64,
  1828. targetDuration time.Duration) *inproxyProxyQualityTracker {
  1829. return &inproxyProxyQualityTracker{
  1830. sshClient: sshClient,
  1831. targetBytesUp: targetBytesUp,
  1832. targetBytesDown: targetBytesDown,
  1833. targetDuration: targetDuration,
  1834. startTime: time.Now(),
  1835. }
  1836. }
  1837. func (t *inproxyProxyQualityTracker) UpdateProgress(
  1838. downstreamBytes, upstreamBytes, _ int64) {
  1839. // Concurrency: UpdateProgress may be called concurrently; all accesses to
  1840. // mutated fields use atomic operations.
  1841. if atomic.LoadInt32(&t.reportTriggered) != 0 {
  1842. // TODO: performance -- remove the updater once the target met,
  1843. // instead of making this residual, no-op update call per tunnel I/O?
  1844. return
  1845. }
  1846. bytesUp := t.bytesUp.Add(upstreamBytes)
  1847. bytesDown := t.bytesDown.Add(downstreamBytes)
  1848. if (t.targetBytesUp == 0 || bytesUp >= t.targetBytesUp) &&
  1849. (t.targetBytesDown == 0 || bytesDown >= t.targetBytesDown) &&
  1850. (t.targetDuration == 0 || time.Since(t.startTime) >= t.targetDuration) {
  1851. // The tunnel connection is wrapped with the quality tracker just
  1852. // before the SSH handshake. It's possible that the quality targets
  1853. // are met before the Psiphon handshake completes, due to sufficient
  1854. // bytes/duration during the intermediate handshakes, or during the
  1855. // liveness test. Since the proxy ID isn't known until then Psiphon
  1856. // handshake completes, delay any report until at least after the
  1857. // Psiphon handshake is completed.
  1858. handshaked, _ := t.sshClient.getHandshaked()
  1859. if handshaked {
  1860. // Limitation: reporting proxy quality is currently a
  1861. // once-per-tunnel operation. Since in-proxy brokers apply a
  1862. // quality data TTL, InproxyProxyQualityTTL, it's possible that a
  1863. // proxy that continues to relay only one single tunnel for
  1864. // longer than that TTL will eventually lose its priority
  1865. // classification even as the tunnel remains connected and relaying
  1866. // data.
  1867. //
  1868. // As a future enhancement, consider reseting the tracker and
  1869. // triggering a new quality report after the
  1870. // InproxyProxyQualityTTL period.
  1871. if !atomic.CompareAndSwapInt32(&t.reportTriggered, 0, 1) {
  1872. return
  1873. }
  1874. t.sshClient.reportProxyQuality()
  1875. }
  1876. }
  1877. }
  // sshProtocolBytesTracker keeps running totals of bytes read and bytes
  // written, updated through UpdateProgress.
  type sshProtocolBytesTracker struct {
  	totalBytesRead, totalBytesWritten atomic.Int64
  }
  1882. func newSSHProtocolBytesTracker(sshClient *sshClient) *sshProtocolBytesTracker {
  1883. return &sshProtocolBytesTracker{}
  1884. }
  1885. func (t *sshProtocolBytesTracker) UpdateProgress(
  1886. bytesRead, bytesWritten, _ int64) {
  1887. // Concurrency: UpdateProgress may be called concurrently; all accesses to
  1888. // mutated fields use atomic operations.
  1889. t.totalBytesRead.Add(bytesRead)
  1890. t.totalBytesWritten.Add(bytesWritten)
  1891. }
  1892. func newSshClient(
  1893. sshServer *sshServer,
  1894. sshListener *sshListener,
  1895. tunnelProtocol string,
  1896. transportData *additionalTransportData,
  1897. serverPacketManipulation string,
  1898. replayedServerPacketManipulation bool,
  1899. peerAddr net.Addr,
  1900. peerGeoIPData GeoIPData) (*sshClient, error) {
  1901. runCtx, stopRunning := context.WithCancel(context.Background())
  1902. // isFirstTunnelInSession is defaulted to true so that the pre-handshake
  1903. // traffic rules won't apply UnthrottleFirstTunnelOnly and negate any
  1904. // unthrottled bytes during the initial protocol negotiation.
  1905. client := &sshClient{
  1906. sshServer: sshServer,
  1907. sshListener: sshListener,
  1908. tunnelProtocol: tunnelProtocol,
  1909. isInproxyTunnelProtocol: protocol.TunnelProtocolUsesInproxy(tunnelProtocol),
  1910. additionalTransportData: transportData,
  1911. serverPacketManipulation: serverPacketManipulation,
  1912. replayedServerPacketManipulation: replayedServerPacketManipulation,
  1913. peerAddr: peerAddr,
  1914. peerGeoIPData: peerGeoIPData,
  1915. isFirstTunnelInSession: true,
  1916. qualityMetrics: newQualityMetrics(),
  1917. tcpPortForwardLRU: common.NewLRUConns(),
  1918. signalIssueSLOKs: make(chan struct{}, 1),
  1919. runCtx: runCtx,
  1920. stopRunning: stopRunning,
  1921. stopped: make(chan struct{}),
  1922. sendAlertRequests: make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),
  1923. sentAlertRequests: make(map[string]bool),
  1924. }
  1925. client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1926. client.udpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
  1927. if peerAddr == nil {
  1928. return nil, errors.TraceNew("missing peerAddr")
  1929. }
  1930. peerIP, _, err := net.SplitHostPort(peerAddr.String())
  1931. if err != nil {
  1932. return nil, errors.Trace(err)
  1933. }
  1934. if net.ParseIP(peerIP) == nil {
  1935. return nil, errors.TraceNew("invalid peerIP")
  1936. }
  1937. // In the case of in-proxy tunnel protocols, clientIP and clientGeoIPData
  1938. // are not set until the original client IP is relayed from the broker
  1939. // during the handshake. In other cases, clientGeoIPData is the
  1940. // peerGeoIPData (this includes fronted meek).
  1941. if !client.isInproxyTunnelProtocol {
  1942. client.clientIP = peerIP
  1943. client.clientGeoIPData = peerGeoIPData
  1944. }
  1945. return client, nil
  1946. }
  1947. // getClientP gets sshClient.clientIP. See getClientGeoIPData comment.
  1948. func (sshClient *sshClient) getClientIP() string {
  1949. sshClient.Lock()
  1950. defer sshClient.Unlock()
  1951. return sshClient.clientIP
  1952. }
  1953. // getClientGeoIPData gets sshClient.clientGeoIPData. Use this helper when
  1954. // accessing this field without already holding a lock on the sshClient
  1955. // mutex. Unlike older code and unlike with client.peerGeoIPData,
  1956. // sshClient.clientGeoIPData is not static and may get set during the
  1957. // handshake, and it is not safe to access it without a lock.
  1958. func (sshClient *sshClient) getClientGeoIPData() GeoIPData {
  1959. sshClient.Lock()
  1960. defer sshClient.Unlock()
  1961. return sshClient.clientGeoIPData
  1962. }
  1963. func (sshClient *sshClient) run(
  1964. baseConn net.Conn, onSSHHandshakeFinished func()) {
  1965. // When run returns, the client has fully stopped, with all SSH state torn
  1966. // down and no port forwards or API requests in progress.
  1967. defer close(sshClient.stopped)
  1968. // onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
  1969. defer func() {
  1970. if onSSHHandshakeFinished != nil {
  1971. onSSHHandshakeFinished()
  1972. }
  1973. }()
  1974. // Set initial traffic rules, pre-handshake, based on currently known info.
  1975. sshClient.setTrafficRules()
  1976. conn := baseConn
  1977. // Wrap the base client connection with an ActivityMonitoredConn which will
  1978. // terminate the connection if no data is received before the deadline. This
  1979. // timeout is in effect for the entire duration of the SSH connection. Clients
  1980. // must actively use the connection or send SSH keep alive requests to keep
  1981. // the connection active. Writes are not considered reliable activity indicators
  1982. // due to buffering.
  1983. // getTunnelActivityUpdaters wires up updaters that act on tunnel duration
  1984. // and bytes transferred, including the in-proxy proxy quality tracker.
  1985. // The quality tracker will include non-user traffic bytes, so it's not
  1986. // equivalent to server_tunnel bytes.
  1987. //
  1988. // Limitation: wrapping at this point omits some obfuscation layer bytes,
  1989. // including MEEK and QUIC.
  1990. activityConn, err := common.NewActivityMonitoredConn(
  1991. conn,
  1992. SSH_CONNECTION_READ_DEADLINE,
  1993. false,
  1994. nil,
  1995. sshClient.getTunnelActivityUpdaters()...)
  1996. if err != nil {
  1997. conn.Close()
  1998. if !isExpectedTunnelIOError(err) {
  1999. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  2000. }
  2001. return
  2002. }
  2003. conn = activityConn
  2004. // Further wrap the connection with burst monitoring, when enabled.
  2005. //
  2006. // Limitations:
  2007. //
  2008. // - Burst parameters are fixed for the duration of the tunnel and do not
  2009. // change after a tactics hot reload.
  2010. //
  2011. // - In the case of in-proxy tunnel protocols, the original client IP is
  2012. // not yet known, and so burst monitoring GeoIP targeting uses the peer
  2013. // IP, which is the proxy, not the client.
  2014. var burstConn *common.BurstMonitoredConn
  2015. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.peerGeoIPData)
  2016. if err != nil {
  2017. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  2018. "ServerTacticsParametersCache.Get failed")
  2019. return
  2020. }
  2021. if !p.IsNil() {
  2022. upstreamTargetBytes := int64(p.Int(parameters.ServerBurstUpstreamTargetBytes))
  2023. upstreamDeadline := p.Duration(parameters.ServerBurstUpstreamDeadline)
  2024. downstreamTargetBytes := int64(p.Int(parameters.ServerBurstDownstreamTargetBytes))
  2025. downstreamDeadline := p.Duration(parameters.ServerBurstDownstreamDeadline)
  2026. if (upstreamDeadline != 0 && upstreamTargetBytes != 0) ||
  2027. (downstreamDeadline != 0 && downstreamTargetBytes != 0) {
  2028. burstConn = common.NewBurstMonitoredConn(
  2029. conn,
  2030. true,
  2031. upstreamTargetBytes, upstreamDeadline,
  2032. downstreamTargetBytes, downstreamDeadline)
  2033. conn = burstConn
  2034. }
  2035. }
  2036. // Allow garbage collection.
  2037. p.Close()
  2038. // Further wrap the connection in a rate limiting ThrottledConn. The
  2039. // underlying dialConn is always a stream, even when the network conn
  2040. // uses UDP.
  2041. throttledConn := common.NewThrottledConn(conn, true, sshClient.rateLimits())
  2042. conn = throttledConn
  2043. // Replay of server-side parameters is set or extended after a new tunnel
  2044. // meets duration and bytes transferred targets. Set a timer now that expires
  2045. // shortly after the target duration. When the timer fires, check the time of
  2046. // last byte read (a read indicating a live connection with the client),
  2047. // along with total bytes transferred and set or extend replay if the targets
  2048. // are met.
  2049. //
  2050. // Both target checks are conservative: the tunnel may be healthy, but a byte
  2051. // may not have been read in the last second when the timer fires. Or bytes
  2052. // may be transferring, but not at the target level. Only clients that meet
  2053. // the strict targets at the single check time will trigger replay; however,
  2054. // this replay will impact all clients with similar GeoIP data.
  2055. //
  2056. // A deferred function cancels the timer and also increments the replay
  2057. // failure counter, which will ultimately clear replay parameters, when the
  2058. // tunnel fails before the API handshake is completed (this includes any
  2059. // liveness test).
  2060. //
  2061. // A tunnel which fails to meet the targets but successfully completes any
  2062. // liveness test and the API handshake is ignored in terms of replay scoring.
  2063. //
  2064. // In the case of in-proxy tunnel protocols, the peer address will be the
  2065. // proxy, not the client, and GeoIP targeted replay will apply to the 2nd
  2066. // hop.
  2067. isReplayCandidate, replayWaitDuration, replayTargetDuration :=
  2068. sshClient.sshServer.support.ReplayCache.GetReplayTargetDuration(sshClient.peerGeoIPData)
  2069. if isReplayCandidate {
  2070. getFragmentorSeed := func() *prng.Seed {
  2071. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  2072. if ok {
  2073. fragmentorSeed, _ := fragmentor.GetReplay()
  2074. return fragmentorSeed
  2075. }
  2076. return nil
  2077. }
  2078. setReplayAfterFunc := time.AfterFunc(
  2079. replayWaitDuration,
  2080. func() {
  2081. if activityConn.GetActiveDuration() >= replayTargetDuration {
  2082. sshClient.Lock()
  2083. bytesUp := sshClient.tcpTrafficState.bytesUp + sshClient.udpTrafficState.bytesUp
  2084. bytesDown := sshClient.tcpTrafficState.bytesDown + sshClient.udpTrafficState.bytesDown
  2085. sshClient.Unlock()
  2086. sshClient.sshServer.support.ReplayCache.SetReplayParameters(
  2087. sshClient.tunnelProtocol,
  2088. sshClient.peerGeoIPData,
  2089. sshClient.serverPacketManipulation,
  2090. getFragmentorSeed(),
  2091. bytesUp,
  2092. bytesDown)
  2093. }
  2094. })
  2095. defer func() {
  2096. // When panicking, propagate the panic instead of trying to
  2097. // acquire the sshClient lock. Intentional panics may arise from
  2098. // the protobuf code path in logTunnel.
  2099. if r := recover(); r != nil {
  2100. panic(r)
  2101. }
  2102. setReplayAfterFunc.Stop()
  2103. completed, _ := sshClient.getHandshaked()
  2104. if !completed {
  2105. // Count a replay failure case when a tunnel used replay parameters
  2106. // (excluding OSSH fragmentation, which doesn't use the ReplayCache) and
  2107. // failed to complete the API handshake.
  2108. replayedFragmentation := false
  2109. if sshClient.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {
  2110. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  2111. if ok {
  2112. _, replayedFragmentation = fragmentor.GetReplay()
  2113. }
  2114. }
  2115. usedReplay := replayedFragmentation || sshClient.replayedServerPacketManipulation
  2116. if usedReplay {
  2117. sshClient.sshServer.support.ReplayCache.FailedReplayParameters(
  2118. sshClient.tunnelProtocol,
  2119. sshClient.peerGeoIPData,
  2120. sshClient.serverPacketManipulation,
  2121. getFragmentorSeed())
  2122. }
  2123. }
  2124. }()
  2125. }
  2126. // Run the initial [obfuscated] SSH handshake in a goroutine so we can both
  2127. // respect shutdownBroadcast and implement a specific handshake timeout.
  2128. // The timeout is to reclaim network resources in case the handshake takes
  2129. // too long.
  2130. type sshNewServerConnResult struct {
  2131. obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn
  2132. sshConn *ssh.ServerConn
  2133. channels <-chan ssh.NewChannel
  2134. requests <-chan *ssh.Request
  2135. err error
  2136. }
  2137. resultChannel := make(chan *sshNewServerConnResult, 2)
  2138. var sshHandshakeAfterFunc *time.Timer
  2139. if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {
  2140. sshHandshakeAfterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {
  2141. resultChannel <- &sshNewServerConnResult{err: std_errors.New("ssh handshake timeout")}
  2142. })
  2143. }
  2144. go func(baseConn, conn net.Conn) {
  2145. sshServerConfig := &ssh.ServerConfig{
  2146. PasswordCallback: sshClient.passwordCallback,
  2147. AuthLogCallback: sshClient.authLogCallback,
  2148. ServerVersion: sshClient.sshServer.support.Config.SSHServerVersion,
  2149. }
  2150. sshServerConfig.AddHostKey(sshClient.sshServer.sshHostKey)
  2151. var err error
  2152. if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  2153. // With Encrypt-then-MAC hash algorithms, packet length is
  2154. // transmitted in plaintext, which aids in traffic analysis;
  2155. // clients may still send Encrypt-then-MAC algorithms in their
  2156. // KEX_INIT message, but do not select these algorithms.
  2157. //
  2158. // The exception is TUNNEL_PROTOCOL_SSH, which is intended to appear
  2159. // like SSH on the wire.
  2160. sshServerConfig.NoEncryptThenMACHash = true
  2161. } else {
  2162. // For TUNNEL_PROTOCOL_SSH only, randomize KEX.
  2163. if sshClient.sshServer.support.Config.ObfuscatedSSHKey != "" {
  2164. sshServerConfig.KEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(
  2165. sshClient.sshServer.support.Config.ObfuscatedSSHKey)
  2166. if err != nil {
  2167. err = errors.Trace(err)
  2168. }
  2169. }
  2170. }
  2171. result := &sshNewServerConnResult{}
  2172. // Wrap the connection in an SSH deobfuscator when required.
  2173. if err == nil && protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {
  2174. // In the case of in-proxy tunnel protocols, the peer address will
  2175. // be the proxy, not the client, and GeoIP targeted server-side
  2176. // OSSH tactics, including prefixes, will apply to the 2nd hop.
  2177. //
  2178. // It is recommended to set ServerOSSHPrefixSpecs, etc., in default
  2179. // tactics.
  2180. var p parameters.ParametersAccessor
  2181. p, err = sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.peerGeoIPData)
  2182. // Log error, but continue. A default prefix spec will be used by the server.
  2183. if err != nil {
  2184. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  2185. "ServerTacticsParametersCache.Get failed")
  2186. }
  2187. var osshPrefixEnableFragmentor bool = false
  2188. var serverOsshPrefixSpecs transforms.Specs = nil
  2189. var minDelay, maxDelay time.Duration
  2190. if !p.IsNil() {
  2191. osshPrefixEnableFragmentor = p.Bool(parameters.OSSHPrefixEnableFragmentor)
  2192. serverOsshPrefixSpecs = p.ProtocolTransformSpecs(parameters.ServerOSSHPrefixSpecs)
  2193. minDelay = p.Duration(parameters.OSSHPrefixSplitMinDelay)
  2194. maxDelay = p.Duration(parameters.OSSHPrefixSplitMaxDelay)
  2195. // Allow garbage collection.
  2196. p.Close()
  2197. }
  2198. // Note: NewServerObfuscatedSSHConn blocks on network I/O
  2199. // TODO: ensure this won't block shutdown
  2200. result.obfuscatedSSHConn, err = obfuscator.NewServerObfuscatedSSHConn(
  2201. conn,
  2202. sshClient.sshServer.support.Config.ObfuscatedSSHKey,
  2203. sshClient.sshServer.obfuscatorSeedHistory,
  2204. serverOsshPrefixSpecs,
  2205. func(peerIP string, err error, logFields common.LogFields) {
  2206. logIrregularTunnel(
  2207. sshClient.sshServer.support,
  2208. sshClient.sshListener.tunnelProtocol,
  2209. sshClient.sshListener.port,
  2210. peerIP,
  2211. errors.Trace(err),
  2212. LogFields(logFields))
  2213. })
  2214. if err != nil {
  2215. err = errors.Trace(err)
  2216. } else {
  2217. conn = result.obfuscatedSSHConn
  2218. }
  2219. // Set the OSSH prefix split config.
  2220. if err == nil && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  2221. err = result.obfuscatedSSHConn.SetOSSHPrefixSplitConfig(minDelay, maxDelay)
  2222. // Log error, but continue.
  2223. if err != nil {
  2224. log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(
  2225. "SetOSSHPrefixSplitConfig failed")
  2226. }
  2227. }
  2228. // Seed the fragmentor, when present, with seed derived from initial
  2229. // obfuscator message. See tactics.Listener.Accept. This must preceed
  2230. // ssh.NewServerConn to ensure fragmentor is seeded before downstream bytes
  2231. // are written.
  2232. if err == nil && protocol.TunnelProtocolIsObfuscatedSSH(sshClient.tunnelProtocol) {
  2233. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  2234. if ok {
  2235. var fragmentorPRNG *prng.PRNG
  2236. fragmentorPRNG, err = result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")
  2237. if err != nil {
  2238. err = errors.Trace(err)
  2239. } else {
  2240. fragmentor.SetReplay(fragmentorPRNG)
  2241. }
  2242. // Stops the fragmentor if disabled for prefixed OSSH streams.
  2243. if !osshPrefixEnableFragmentor && result.obfuscatedSSHConn.IsOSSHPrefixStream() {
  2244. fragmentor.StopFragmenting()
  2245. }
  2246. }
  2247. }
  2248. }
  2249. if err == nil {
  2250. result.sshConn, result.channels, result.requests, err =
  2251. ssh.NewServerConn(conn, sshServerConfig)
  2252. if err != nil {
  2253. err = errors.Trace(err)
  2254. }
  2255. }
  2256. result.err = err
  2257. resultChannel <- result
  2258. }(baseConn, conn)
  2259. var result *sshNewServerConnResult
  2260. select {
  2261. case result = <-resultChannel:
  2262. case <-sshClient.sshServer.shutdownBroadcast:
  2263. // Close() will interrupt an ongoing handshake
  2264. // TODO: wait for SSH handshake goroutines to exit before returning?
  2265. conn.Close()
  2266. return
  2267. }
  2268. if sshHandshakeAfterFunc != nil {
  2269. sshHandshakeAfterFunc.Stop()
  2270. }
  2271. if result.err != nil {
  2272. conn.Close()
  2273. // This is a Debug log due to noise. The handshake often fails due to I/O
  2274. // errors as clients frequently interrupt connections in progress when
  2275. // client-side load balancing completes a connection to a different server.
  2276. log.WithTraceFields(LogFields{"error": result.err}).Debug("SSH handshake failed")
  2277. return
  2278. }
  2279. // The SSH handshake has finished successfully; notify now to allow other
  2280. // blocked SSH handshakes to proceed.
  2281. if onSSHHandshakeFinished != nil {
  2282. onSSHHandshakeFinished()
  2283. }
  2284. onSSHHandshakeFinished = nil
  2285. sshClient.Lock()
  2286. sshClient.sshConn = result.sshConn
  2287. sshClient.throttledConn = throttledConn
  2288. sshClient.Unlock()
  2289. if !sshClient.sshServer.registerEstablishedClient(sshClient) {
  2290. conn.Close()
  2291. log.WithTrace().Warning("register failed")
  2292. return
  2293. }
  2294. sshClient.runTunnel(result.channels, result.requests)
  2295. // sshClient.stop closes the underlying transport conn, ensuring all
  2296. // network trafic is complete before calling logTunnel.
  2297. sshClient.stop()
  2298. // Log tunnel metrics.
  2299. var additionalMetrics []LogFields
  2300. // Add activity and burst metrics.
  2301. //
  2302. // The reported duration is based on last confirmed data transfer, which for
  2303. // sshClient.activityConn.GetActiveDuration() is time of last read byte and
  2304. // not conn close time. This is important for protocols such as meek. For
  2305. // meek, the connection remains open until the HTTP session expires, which
  2306. // may be some time after the tunnel has closed. (The meek protocol has no
  2307. // allowance for signalling payload EOF, and even if it did the client may
  2308. // not have the opportunity to send a final request with an EOF flag set.)
  2309. activityMetrics := make(LogFields)
  2310. activityMetrics["start_time"] = activityConn.GetStartTime()
  2311. activityMetrics["duration"] = int64(activityConn.GetActiveDuration() / time.Millisecond)
  2312. additionalMetrics = append(additionalMetrics, activityMetrics)
  2313. if burstConn != nil {
  2314. // Any outstanding burst should be recorded by burstConn.Close which should
  2315. // be called via sshClient.stop.
  2316. additionalMetrics = append(
  2317. additionalMetrics, LogFields(burstConn.GetMetrics(activityConn.GetStartTime())))
  2318. }
  2319. // Some conns report additional metrics. Meek conns report resiliency
  2320. // metrics and fragmentor.Conns report fragmentor configs.
  2321. if metricsSource, ok := baseConn.(common.MetricsSource); ok {
  2322. additionalMetrics = append(
  2323. additionalMetrics, LogFields(metricsSource.GetMetrics()))
  2324. }
  2325. if result.obfuscatedSSHConn != nil {
  2326. additionalMetrics = append(
  2327. additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))
  2328. }
  2329. // Add server-replay metrics.
  2330. replayMetrics := make(LogFields)
  2331. replayedFragmentation := false
  2332. fragmentor, ok := baseConn.(common.FragmentorAccessor)
  2333. if ok {
  2334. _, replayedFragmentation = fragmentor.GetReplay()
  2335. }
  2336. replayMetrics["server_replay_fragmentation"] = replayedFragmentation
  2337. replayMetrics["server_replay_packet_manipulation"] = sshClient.replayedServerPacketManipulation
  2338. additionalMetrics = append(additionalMetrics, replayMetrics)
  2339. // Log the server_tunnel event. This log is only guaranteed to be recorded
  2340. // after the SSH handshake completes successfully. If the tunnel fails or
  2341. // is aborted by the client after that point, there will be a server_tunnel
  2342. // log -- with handshake_completed false, if the failure is during the
  2343. // liveness test or Psiphon API handshake, and handshake_completed true
  2344. // otherwise.
  2345. //
  2346. // Some scenarios where there is no server_tunnel log, despite a client
  2347. // initiating a dial, can include:
  2348. // - Failure during the TCP handshake.
  2349. // - Connecting to a fronting CDN, but not establishing a full meek session.
  2350. // - Failure during QUIC, TLS, or Obfuscated OSSH handshakes and all other
  2351. // obfuscation layers which come before the SSH handshake.
  2352. // - The server being in the load limiting state, SetEstablishTunnels(false)
  2353. //
  2354. // In the case of the outermost application-level network protocol,
  2355. // including SSH, we do not necessarily want to log any server_tunnel
  2356. // event until the client has passed anti-probing checks; otherwise, the
  2357. // peer may not be a legitimate client.
  2358. // Limitation: there's only one log per tunnel with bytes transferred
  2359. // metrics, so the byte count can't be attributed to a certain day for
  2360. // tunnels that remain connected for well over 24h. In practise, most
  2361. // tunnels are short-lived, especially on mobile devices.
  2362. sshClient.logTunnel(additionalMetrics)
  2363. // Transfer OSL seed state -- the OSL progress -- from the closing
  2364. // client to the session cache so the client can resume its progress
  2365. // if it reconnects to this same server.
  2366. // Note: following setOSLConfig order of locking.
  2367. sshClient.Lock()
  2368. if sshClient.oslClientSeedState != nil {
  2369. sshClient.sshServer.oslSessionCacheMutex.Lock()
  2370. sshClient.oslClientSeedState.Hibernate()
  2371. sshClient.sshServer.oslSessionCache.Set(
  2372. sshClient.sessionID, sshClient.oslClientSeedState, cache.DefaultExpiration)
  2373. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  2374. sshClient.oslClientSeedState = nil
  2375. }
  2376. sshClient.Unlock()
  2377. // Set the GeoIP session cache to expire; up to this point, the entry for
  2378. // this session ID has no expiry; retaining entries after the tunnel
  2379. // disconnects supports first-tunnel-in-session and duplicate
  2380. // authorization logic.
  2381. sshClient.sshServer.markGeoIPSessionCacheToExpire(sshClient.sessionID)
  2382. // unregisterEstablishedClient removes the client from sshServer.clients.
  2383. // This call must come after logTunnel to ensure all logTunnel calls
  2384. // complete before a sshServer.stopClients returns, in the case of a
  2385. // server shutdown.
  2386. sshClient.sshServer.unregisterEstablishedClient(sshClient)
  2387. }
  2388. func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {
  2389. expectedSessionIDLength := 2 * protocol.PSIPHON_API_CLIENT_SESSION_ID_LENGTH
  2390. expectedSSHPasswordLength := 2 * SSH_PASSWORD_BYTE_LENGTH
  2391. var sshPasswordPayload protocol.SSHPasswordPayload
  2392. err := json.Unmarshal(password, &sshPasswordPayload)
  2393. if err != nil {
  2394. // Backwards compatibility case: instead of a JSON payload, older clients
  2395. // send the hex encoded session ID prepended to the SSH password.
  2396. // Note: there's an even older case where clients don't send any session ID,
  2397. // but that's no longer supported.
  2398. if len(password) == expectedSessionIDLength+expectedSSHPasswordLength {
  2399. sshPasswordPayload.SessionId = string(password[0:expectedSessionIDLength])
  2400. sshPasswordPayload.SshPassword = string(password[expectedSessionIDLength:])
  2401. } else {
  2402. return nil, errors.Tracef("invalid password payload for %q", conn.User())
  2403. }
  2404. }
  2405. if !isHexDigits(sshPasswordPayload.SessionId) ||
  2406. len(sshPasswordPayload.SessionId) != expectedSessionIDLength {
  2407. return nil, errors.Tracef("invalid session ID for %q", conn.User())
  2408. }
  2409. userOk := (subtle.ConstantTimeCompare(
  2410. []byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1)
  2411. passwordOk := (subtle.ConstantTimeCompare(
  2412. []byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1)
  2413. if !userOk || !passwordOk {
  2414. return nil, errors.Tracef("invalid password for %q", conn.User())
  2415. }
  2416. sessionID := sshPasswordPayload.SessionId
  2417. // The GeoIP session cache will be populated if there was a previous tunnel
  2418. // with this session ID. This will be true up to GEOIP_SESSION_CACHE_TTL.
  2419. isFirstTunnelInSession := !sshClient.sshServer.inGeoIPSessionCache(sessionID)
  2420. supportsServerRequests := common.Contains(
  2421. sshPasswordPayload.ClientCapabilities, protocol.CLIENT_CAPABILITY_SERVER_REQUESTS)
  2422. // This optional, early sponsor ID will be logged with server_tunnel if
  2423. // the tunnel doesn't reach handshakeState.completed.
  2424. sponsorID := sshPasswordPayload.SponsorID
  2425. if sponsorID != "" && !isSponsorID(sponsorID) {
  2426. return nil, errors.Tracef("invalid sponsor ID")
  2427. }
  2428. sshClient.Lock()
  2429. // After this point, these values are read-only as they are read
  2430. // without obtaining sshClient.Lock.
  2431. sshClient.sessionID = sessionID
  2432. sshClient.isFirstTunnelInSession = isFirstTunnelInSession
  2433. sshClient.supportsServerRequests = supportsServerRequests
  2434. sshClient.sponsorID = sponsorID
  2435. sshClient.Unlock()
  2436. // Initially, in the case of in-proxy tunnel protocols, the GeoIP session
  2437. // cache entry will be the proxy's GeoIPData. This is updated to be the
  2438. // client's GeoIPData in setHandshakeState.
  2439. sshClient.sshServer.setGeoIPSessionCache(sessionID, sshClient.peerGeoIPData)
  2440. return nil, nil
  2441. }
  2442. func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {
  2443. if err != nil {
  2444. if method == "none" && err.Error() == "ssh: no auth passed yet" {
  2445. // In this case, the callback invocation is noise from auth negotiation
  2446. return
  2447. }
  2448. // Note: here we previously logged messages for fail2ban to act on. This is no longer
  2449. // done as the complexity outweighs the benefits.
  2450. //
  2451. // - The SSH credential is not secret -- it's in the server entry. Attackers targeting
  2452. // the server likely already have the credential. On the other hand, random scanning and
  2453. // brute forcing is mitigated with high entropy random passwords, rate limiting
  2454. // (implemented on the host via iptables), and limited capabilities (the SSH session can
  2455. // only port forward).
  2456. //
  2457. // - fail2ban coverage was inconsistent; in the case of an unfronted meek protocol through
  2458. // an upstream proxy, the remote address is the upstream proxy, which should not be blocked.
  2459. // The X-Forwarded-For header cant be used instead as it may be forged and used to get IPs
  2460. // deliberately blocked; and in any case fail2ban adds iptables rules which can only block
  2461. // by direct remote IP, not by original client IP. Fronted meek has the same iptables issue.
  2462. //
  2463. // Random scanning and brute forcing of port 22 will result in log noise. To mitigate this,
  2464. // not every authentication failure is logged. A summary log is emitted periodically to
  2465. // retain some record of this activity in case this is relevant to, e.g., a performance
  2466. // investigation.
  2467. sshClient.sshServer.authFailedCount.Add(1)
  2468. lastAuthLog := monotime.Time(sshClient.sshServer.lastAuthLog.Load())
  2469. if monotime.Since(lastAuthLog) > SSH_AUTH_LOG_PERIOD {
  2470. now := int64(monotime.Now())
  2471. if sshClient.sshServer.lastAuthLog.CompareAndSwap(int64(lastAuthLog), now) {
  2472. count := sshClient.sshServer.authFailedCount.Swap(0)
  2473. log.WithTraceFields(
  2474. LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
  2475. }
  2476. }
  2477. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication failed")
  2478. } else {
  2479. log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication success")
  2480. }
  2481. }
  2482. // stop signals the ssh connection to shutdown. After sshConn.Wait returns,
  2483. // the SSH connection has terminated but sshClient.run may still be running and
  2484. // in the process of exiting.
  2485. //
  2486. // The shutdown process must complete rapidly and not, e.g., block on network
  2487. // I/O, as newly connecting clients need to await stop completion of any
  2488. // existing connection that shares the same session ID.
  2489. func (sshClient *sshClient) stop() {
  2490. _ = sshClient.sshConn.Close()
  2491. _ = sshClient.sshConn.Wait()
  2492. }
  2493. // awaitStopped will block until sshClient.run has exited, at which point all
  2494. // worker goroutines associated with the sshClient, including any in-flight
  2495. // API handlers, will have exited.
  2496. func (sshClient *sshClient) awaitStopped() {
  2497. <-sshClient.stopped
  2498. }
  2499. // runTunnel handles/dispatches new channels and new requests from the client.
  2500. // When the SSH client connection closes, both the channels and requests channels
  2501. // will close and runTunnel will exit.
  2502. func (sshClient *sshClient) runTunnel(
  2503. channels <-chan ssh.NewChannel,
  2504. requests <-chan *ssh.Request) {
  2505. waitGroup := new(sync.WaitGroup)
  2506. // Start client SSH API request handler
  2507. waitGroup.Add(1)
  2508. go func() {
  2509. defer waitGroup.Done()
  2510. sshClient.handleSSHRequests(requests)
  2511. }()
  2512. // Start request senders
  2513. if sshClient.supportsServerRequests {
  2514. waitGroup.Add(1)
  2515. go func() {
  2516. defer waitGroup.Done()
  2517. sshClient.runOSLSender()
  2518. }()
  2519. waitGroup.Add(1)
  2520. go func() {
  2521. defer waitGroup.Done()
  2522. sshClient.runAlertSender()
  2523. }()
  2524. }
  2525. // Start the TCP port forward manager
  2526. // The queue size is set to the traffic rules (MaxTCPPortForwardCount +
  2527. // MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
  2528. // limits per client; when that value is not set, a default is used.
  2529. // A limitation: this queue size is set once and doesn't change, for this client,
  2530. // when traffic rules are reloaded.
  2531. queueSize := sshClient.getTCPPortForwardQueueSize()
  2532. if queueSize == 0 {
  2533. queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
  2534. }
  2535. newTCPPortForwards := make(chan *newTCPPortForward, queueSize)
  2536. waitGroup.Add(1)
  2537. go func() {
  2538. defer waitGroup.Done()
  2539. sshClient.handleTCPPortForwards(waitGroup, newTCPPortForwards)
  2540. }()
  2541. // Handle new channel (port forward) requests from the client.
  2542. for newChannel := range channels {
  2543. switch newChannel.ChannelType() {
  2544. case protocol.RANDOM_STREAM_CHANNEL_TYPE:
  2545. sshClient.handleNewRandomStreamChannel(waitGroup, newChannel)
  2546. case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
  2547. sshClient.handleNewPacketTunnelChannel(waitGroup, newChannel)
  2548. case protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
  2549. // The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
  2550. // "direct-tcpip", except split tunnel channel rejections are disallowed
  2551. // even if the client has enabled split tunnel. This channel type allows
  2552. // the client to ensure tunneling for certain cases while split tunnel is
  2553. // enabled.
  2554. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, false, newTCPPortForwards)
  2555. case "direct-tcpip":
  2556. sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, true, newTCPPortForwards)
  2557. default:
  2558. sshClient.rejectNewChannel(newChannel,
  2559. fmt.Sprintf("unknown or unsupported channel type: %s", newChannel.ChannelType()))
  2560. }
  2561. }
  2562. // The channel loop is interrupted by a client
  2563. // disconnect or by calling sshClient.stop().
  2564. // Stop the TCP port forward manager
  2565. close(newTCPPortForwards)
  2566. // Stop all other worker goroutines
  2567. sshClient.stopRunning()
  2568. if sshClient.sshServer.support.Config.RunPacketTunnel {
  2569. // PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
  2570. sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
  2571. sshClient.sessionID)
  2572. }
  2573. // Close any remaining port forward upstream connections in order to
  2574. // interrupt blocked reads.
  2575. sshClient.Lock()
  2576. udpgwChannelHandler := sshClient.udpgwChannelHandler
  2577. sshClient.Unlock()
  2578. if udpgwChannelHandler != nil {
  2579. udpgwChannelHandler.portForwardLRU.CloseAll()
  2580. }
  2581. sshClient.tcpPortForwardLRU.CloseAll()
  2582. waitGroup.Wait()
  2583. sshClient.cleanupAuthorizations()
  2584. }
  2585. func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {
  2586. for request := range requests {
  2587. // Requests are processed serially; API responses must be sent in request order.
  2588. var responsePayload []byte
  2589. var err error
  2590. if request.Type == "keepalive@openssh.com" {
  2591. // SSH keep alive round trips are used as speed test samples.
  2592. responsePayload, err = tactics.MakeSpeedTestResponse(
  2593. SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)
  2594. } else {
  2595. // All other requests are assumed to be API requests.
  2596. responsePayload, err = sshAPIRequestHandler(
  2597. sshClient.sshServer.support,
  2598. sshClient,
  2599. request.Type,
  2600. request.Payload)
  2601. }
  2602. if err == nil {
  2603. err = request.Reply(true, responsePayload)
  2604. } else {
  2605. log.WithTraceFields(LogFields{"error": err}).Warning("request failed")
  2606. err = request.Reply(false, nil)
  2607. }
  2608. if err != nil {
  2609. if !isExpectedTunnelIOError(err) {
  2610. log.WithTraceFields(LogFields{"error": err}).Warning("response failed")
  2611. }
  2612. }
  2613. }
  2614. }
  2615. type newTCPPortForward struct {
  2616. enqueueTime time.Time
  2617. hostToConnect string
  2618. portToConnect int
  2619. doSplitTunnel bool
  2620. newChannel ssh.NewChannel
  2621. }
  2622. func (sshClient *sshClient) handleTCPPortForwards(
  2623. waitGroup *sync.WaitGroup,
  2624. newTCPPortForwards chan *newTCPPortForward) {
  2625. // Lifecycle of a TCP port forward:
  2626. //
  2627. // 1. A "direct-tcpip" SSH request is received from the client.
  2628. //
  2629. // A new TCP port forward request is enqueued. The queue delivers TCP port
  2630. // forward requests to the TCP port forward manager, which enforces the TCP
  2631. // port forward dial limit.
  2632. //
  2633. // Enqueuing new requests allows for reading further SSH requests from the
  2634. // client without blocking when the dial limit is hit; this is to permit new
  2635. // UDP/udpgw port forwards to be restablished without delay. The maximum size
  2636. // of the queue enforces a hard cap on resources consumed by a client in the
  2637. // pre-dial phase. When the queue is full, new TCP port forwards are
  2638. // immediately rejected.
  2639. //
  2640. // 2. The TCP port forward manager dequeues the request.
  2641. //
  2642. // The manager calls dialingTCPPortForward(), which increments
  2643. // concurrentDialingPortForwardCount, and calls
  2644. // isTCPDialingPortForwardLimitExceeded() to check the concurrent dialing
  2645. // count.
  2646. //
  2647. // The manager enforces the concurrent TCP dial limit: when at the limit, the
  2648. // manager blocks waiting for the number of dials to drop below the limit before
  2649. // dispatching the request to handleTCPChannel(), which will run in its own
  2650. // goroutine and will dial and relay the port forward.
  2651. //
  2652. // The block delays the current request and also halts dequeuing of subsequent
  2653. // requests and could ultimately cause requests to be immediately rejected if
  2654. // the queue fills. These actions are intended to apply back pressure when
  2655. // upstream network resources are impaired.
  2656. //
  2657. // The time spent in the queue is deducted from the port forward's dial timeout.
  2658. // The time spent blocking while at the dial limit is similarly deducted from
  2659. // the dial timeout. If the dial timeout has expired before the dial begins, the
  2660. // port forward is rejected and a stat is recorded.
  2661. //
  2662. // 3. handleTCPChannel() performs the port forward dial and relaying.
  2663. //
  2664. // a. Dial the target, using the dial timeout remaining after queue and blocking
  2665. // time is deducted.
  2666. //
  2667. // b. If the dial fails, call abortedTCPPortForward() to decrement
  2668. // concurrentDialingPortForwardCount, freeing up a dial slot.
  2669. //
  2670. // c. If the dial succeeds, call establishedPortForward(), which decrements
  2671. // concurrentDialingPortForwardCount and increments concurrentPortForwardCount,
  2672. // the "established" port forward count.
  2673. //
  2674. // d. Check isPortForwardLimitExceeded(), which enforces the configurable limit on
  2675. // concurrentPortForwardCount, the number of _established_ TCP port forwards.
  2676. // If the limit is exceeded, the LRU established TCP port forward is closed and
  2677. // the newly established TCP port forward proceeds. This LRU logic allows some
  2678. // dangling resource consumption (e.g., TIME_WAIT) while providing a better
  2679. // experience for clients.
  2680. //
  2681. // e. Relay data.
  2682. //
  2683. // f. Call closedPortForward() which decrements concurrentPortForwardCount and
  2684. // records bytes transferred.
  2685. for newPortForward := range newTCPPortForwards {
  2686. remainingDialTimeout :=
  2687. time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
  2688. time.Since(newPortForward.enqueueTime)
  2689. if remainingDialTimeout <= 0 {
  2690. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2691. sshClient.rejectNewChannel(
  2692. newPortForward.newChannel, "TCP port forward timed out in queue")
  2693. continue
  2694. }
  2695. // Reserve a TCP dialing slot.
  2696. //
  2697. // TOCTOU note: important to increment counts _before_ checking limits; otherwise,
  2698. // the client could potentially consume excess resources by initiating many port
  2699. // forwards concurrently.
  2700. sshClient.dialingTCPPortForward()
  2701. // When max dials are in progress, wait up to remainingDialTimeout for dialing
  2702. // to become available. This blocks all dequeing.
  2703. if sshClient.isTCPDialingPortForwardLimitExceeded() {
  2704. blockStartTime := time.Now()
  2705. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  2706. sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
  2707. <-ctx.Done()
  2708. sshClient.setTCPPortForwardDialingAvailableSignal(nil)
  2709. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  2710. remainingDialTimeout -= time.Since(blockStartTime)
  2711. }
  2712. if remainingDialTimeout <= 0 {
  2713. // Release the dialing slot here since handleTCPChannel() won't be called.
  2714. sshClient.abortedTCPPortForward()
  2715. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2716. sshClient.rejectNewChannel(
  2717. newPortForward.newChannel, "TCP port forward timed out before dialing")
  2718. continue
  2719. }
  2720. // Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
  2721. // handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
  2722. // will deal with remainingDialTimeout <= 0.
  2723. waitGroup.Add(1)
  2724. go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
  2725. defer waitGroup.Done()
  2726. sshClient.handleTCPChannel(
  2727. remainingDialTimeout,
  2728. newPortForward.hostToConnect,
  2729. newPortForward.portToConnect,
  2730. newPortForward.doSplitTunnel,
  2731. newPortForward.newChannel)
  2732. }(remainingDialTimeout, newPortForward)
  2733. }
  2734. }
  2735. func (sshClient *sshClient) handleNewRandomStreamChannel(
  2736. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2737. // A random stream channel returns the requested number of bytes -- random
  2738. // bytes -- to the client while also consuming and discarding bytes sent
  2739. // by the client.
  2740. //
  2741. // One use case for the random stream channel is a liveness test that the
  2742. // client performs to confirm that the tunnel is live. As the liveness
  2743. // test is performed in the concurrent establishment phase, before
  2744. // selecting a single candidate for handshake, the random stream channel
  2745. // is available pre-handshake, albeit with additional restrictions.
  2746. //
  2747. // The random stream is subject to throttling in traffic rules; for
  2748. // unthrottled liveness tests, set EstablishmentRead/WriteBytesPerSecond as
  2749. // required. The random stream maximum count and response size cap mitigate
  2750. // clients abusing the facility to waste server resources.
  2751. //
  2752. // Like all other channels, this channel type is handled asynchronously,
  2753. // so it's possible to run at any point in the tunnel lifecycle.
  2754. //
  2755. // Up/downstream byte counts don't include SSH packet and request
  2756. // marshalling overhead.
  2757. var request protocol.RandomStreamRequest
  2758. err := json.Unmarshal(newChannel.ExtraData(), &request)
  2759. if err != nil {
  2760. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
  2761. return
  2762. }
  2763. if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2764. sshClient.rejectNewChannel(newChannel,
  2765. fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
  2766. return
  2767. }
  2768. if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
  2769. sshClient.rejectNewChannel(newChannel,
  2770. fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
  2771. return
  2772. }
  2773. var metrics *randomStreamMetrics
  2774. sshClient.Lock()
  2775. if !sshClient.handshakeState.completed {
  2776. metrics = &sshClient.preHandshakeRandomStreamMetrics
  2777. } else {
  2778. metrics = &sshClient.postHandshakeRandomStreamMetrics
  2779. }
  2780. countOk := true
  2781. if !sshClient.handshakeState.completed &&
  2782. metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
  2783. countOk = false
  2784. } else {
  2785. metrics.count++
  2786. }
  2787. sshClient.Unlock()
  2788. if !countOk {
  2789. sshClient.rejectNewChannel(newChannel, "max count exceeded")
  2790. return
  2791. }
  2792. channel, requests, err := newChannel.Accept()
  2793. if err != nil {
  2794. if !isExpectedTunnelIOError(err) {
  2795. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2796. }
  2797. return
  2798. }
  2799. go ssh.DiscardRequests(requests)
  2800. waitGroup.Add(1)
  2801. go func() {
  2802. defer waitGroup.Done()
  2803. upstream := new(sync.WaitGroup)
  2804. received := 0
  2805. sent := 0
  2806. if request.UpstreamBytes > 0 {
  2807. // Process streams concurrently to minimize elapsed time. This also
  2808. // avoids a unidirectional flow burst early in the tunnel lifecycle.
  2809. upstream.Add(1)
  2810. go func() {
  2811. defer upstream.Done()
  2812. n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
  2813. received = int(n)
  2814. if err != nil {
  2815. if !isExpectedTunnelIOError(err) {
  2816. log.WithTraceFields(LogFields{"error": err}).Warning("receive failed")
  2817. }
  2818. }
  2819. }()
  2820. }
  2821. if request.DownstreamBytes > 0 {
  2822. n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
  2823. sent = int(n)
  2824. if err != nil {
  2825. if !isExpectedTunnelIOError(err) {
  2826. log.WithTraceFields(LogFields{"error": err}).Warning("send failed")
  2827. }
  2828. }
  2829. }
  2830. upstream.Wait()
  2831. sshClient.Lock()
  2832. metrics.upstreamBytes += int64(request.UpstreamBytes)
  2833. metrics.receivedUpstreamBytes += int64(received)
  2834. metrics.downstreamBytes += int64(request.DownstreamBytes)
  2835. metrics.sentDownstreamBytes += int64(sent)
  2836. sshClient.Unlock()
  2837. channel.Close()
  2838. }()
  2839. }
  2840. func (sshClient *sshClient) handleNewPacketTunnelChannel(
  2841. waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {
  2842. // packet tunnel channels are handled by the packet tunnel server
  2843. // component. Each client may have at most one packet tunnel channel.
  2844. if !sshClient.sshServer.support.Config.RunPacketTunnel {
  2845. sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
  2846. return
  2847. }
  2848. // Accept this channel immediately. This channel will replace any
  2849. // previously existing packet tunnel channel for this client.
  2850. packetTunnelChannel, requests, err := newChannel.Accept()
  2851. if err != nil {
  2852. if !isExpectedTunnelIOError(err) {
  2853. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  2854. }
  2855. return
  2856. }
  2857. go ssh.DiscardRequests(requests)
  2858. sshClient.setPacketTunnelChannel(packetTunnelChannel)
  2859. // PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
  2860. // will stop packet tunnel workers for any previous packet tunnel channel.
  2861. checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2862. return sshClient.isPortForwardPermitted(portForwardTypeTCP, upstreamIPAddress, port)
  2863. }
  2864. checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
  2865. return sshClient.isPortForwardPermitted(portForwardTypeUDP, upstreamIPAddress, port)
  2866. }
  2867. checkAllowedDomainFunc := func(domain string) bool {
  2868. ok, _ := sshClient.isDomainPermitted(domain)
  2869. return ok
  2870. }
  2871. flowActivityUpdaterMaker := func(
  2872. isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {
  2873. trafficType := portForwardTypeTCP
  2874. if !isTCP {
  2875. trafficType = portForwardTypeUDP
  2876. }
  2877. activityUpdaters := sshClient.getPortForwardActivityUpdaters(
  2878. trafficType, upstreamIPAddress)
  2879. flowUpdaters := make([]tun.FlowActivityUpdater, len(activityUpdaters))
  2880. for i, activityUpdater := range activityUpdaters {
  2881. flowUpdaters[i] = activityUpdater
  2882. }
  2883. return flowUpdaters
  2884. }
  2885. metricUpdater := func(
  2886. TCPApplicationBytesDown, TCPApplicationBytesUp,
  2887. UDPApplicationBytesDown, UDPApplicationBytesUp int64) {
  2888. sshClient.Lock()
  2889. sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
  2890. sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
  2891. sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
  2892. sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
  2893. sshClient.Unlock()
  2894. }
  2895. dnsQualityReporter := sshClient.updateQualityMetricsWithDNSResult
  2896. err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
  2897. sshClient.sessionID,
  2898. packetTunnelChannel,
  2899. checkAllowedTCPPortFunc,
  2900. checkAllowedUDPPortFunc,
  2901. checkAllowedDomainFunc,
  2902. flowActivityUpdaterMaker,
  2903. metricUpdater,
  2904. dnsQualityReporter)
  2905. if err != nil {
  2906. log.WithTraceFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
  2907. sshClient.setPacketTunnelChannel(nil)
  2908. }
  2909. }
  2910. func (sshClient *sshClient) handleNewTCPPortForwardChannel(
  2911. waitGroup *sync.WaitGroup,
  2912. newChannel ssh.NewChannel,
  2913. allowSplitTunnel bool,
  2914. newTCPPortForwards chan *newTCPPortForward) {
  2915. // udpgw client connections are dispatched immediately (clients use this for
  2916. // DNS, so it's essential to not block; and only one udpgw connection is
  2917. // retained at a time).
  2918. //
  2919. // All other TCP port forwards are dispatched via the TCP port forward
  2920. // manager queue.
  2921. // http://tools.ietf.org/html/rfc4254#section-7.2
  2922. var directTcpipExtraData struct {
  2923. HostToConnect string
  2924. PortToConnect uint32
  2925. OriginatorIPAddress string
  2926. OriginatorPort uint32
  2927. }
  2928. err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
  2929. if err != nil {
  2930. sshClient.rejectNewChannel(newChannel, "invalid extra data")
  2931. return
  2932. }
  2933. // Intercept TCP port forwards to a specified udpgw server and handle directly.
  2934. // TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
  2935. isUdpgwChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
  2936. sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
  2937. net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))
  2938. if isUdpgwChannel {
  2939. // Dispatch immediately. handleUDPChannel runs the udpgw protocol in its
  2940. // own worker goroutine.
  2941. waitGroup.Add(1)
  2942. go func(channel ssh.NewChannel) {
  2943. defer waitGroup.Done()
  2944. sshClient.handleUdpgwChannel(channel)
  2945. }(newChannel)
  2946. } else {
  2947. // Dispatch via TCP port forward manager. When the queue is full, the channel
  2948. // is immediately rejected.
  2949. //
  2950. // Split tunnel logic is enabled for this TCP port forward when the client
  2951. // has enabled split tunnel mode and the channel type allows it.
  2952. doSplitTunnel := sshClient.handshakeState.splitTunnelLookup != nil && allowSplitTunnel
  2953. tcpPortForward := &newTCPPortForward{
  2954. enqueueTime: time.Now(),
  2955. hostToConnect: directTcpipExtraData.HostToConnect,
  2956. portToConnect: int(directTcpipExtraData.PortToConnect),
  2957. doSplitTunnel: doSplitTunnel,
  2958. newChannel: newChannel,
  2959. }
  2960. select {
  2961. case newTCPPortForwards <- tcpPortForward:
  2962. default:
  2963. sshClient.updateQualityMetricsWithRejectedDialingLimit()
  2964. sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
  2965. }
  2966. }
  2967. }
  2968. func (sshClient *sshClient) cleanupAuthorizations() {
  2969. sshClient.Lock()
  2970. if sshClient.releaseAuthorizations != nil {
  2971. sshClient.releaseAuthorizations()
  2972. }
  2973. if sshClient.stopTimer != nil {
  2974. sshClient.stopTimer.Stop()
  2975. }
  2976. sshClient.Unlock()
  2977. }
  2978. // setPacketTunnelChannel sets the single packet tunnel channel
  2979. // for this sshClient. Any existing packet tunnel channel is
  2980. // closed.
  2981. func (sshClient *sshClient) setPacketTunnelChannel(channel ssh.Channel) {
  2982. sshClient.Lock()
  2983. if sshClient.packetTunnelChannel != nil {
  2984. sshClient.packetTunnelChannel.Close()
  2985. }
  2986. sshClient.packetTunnelChannel = channel
  2987. sshClient.totalPacketTunnelChannelCount += 1
  2988. sshClient.Unlock()
  2989. }
  2990. // setUdpgwChannelHandler sets the single udpgw channel handler for this
  2991. // sshClient. Each sshClient may have only one concurrent udpgw
  2992. // channel/handler. Each udpgw channel multiplexes many UDP port forwards via
  2993. // the udpgw protocol. Any existing udpgw channel/handler is closed.
  2994. func (sshClient *sshClient) setUdpgwChannelHandler(udpgwChannelHandler *udpgwPortForwardMultiplexer) bool {
  2995. sshClient.Lock()
  2996. if sshClient.udpgwChannelHandler != nil {
  2997. previousHandler := sshClient.udpgwChannelHandler
  2998. sshClient.udpgwChannelHandler = nil
  2999. // stop must be run without holding the sshClient mutex lock, as the
  3000. // udpgw goroutines may attempt to lock the same mutex. For example,
  3001. // udpgwPortForwardMultiplexer.run calls sshClient.establishedPortForward
  3002. // which calls sshClient.allocatePortForward.
  3003. sshClient.Unlock()
  3004. previousHandler.stop()
  3005. sshClient.Lock()
  3006. // In case some other channel has set the sshClient.udpgwChannelHandler
  3007. // in the meantime, fail. The caller should discard this channel/handler.
  3008. if sshClient.udpgwChannelHandler != nil {
  3009. sshClient.Unlock()
  3010. return false
  3011. }
  3012. }
  3013. sshClient.udpgwChannelHandler = udpgwChannelHandler
  3014. sshClient.totalUdpgwChannelCount += 1
  3015. sshClient.Unlock()
  3016. return true
  3017. }
  3018. var serverTunnelStatParams = append(
  3019. []requestParamSpec{
  3020. {"last_connected", isLastConnected, requestParamOptional},
  3021. {"establishment_duration", isIntString, requestParamOptional}},
  3022. baseAndDialParams...)
  3023. func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {
  3024. sshClient.Lock()
  3025. // For in-proxy tunnel protocols, two sets of GeoIP fields are logged, one
  3026. // for the client and one for the proxy. The client GeoIP fields will
  3027. // be "None" if handshake did not complete.
  3028. logFields := getRequestLogFields(
  3029. "server_tunnel",
  3030. "",
  3031. sshClient.sessionID,
  3032. sshClient.clientGeoIPData,
  3033. sshClient.handshakeState.authorizedAccessTypes,
  3034. sshClient.handshakeState.apiParams,
  3035. serverTunnelStatParams)
  3036. sshClient.sshServer.support.Config.AddServerEntryTag(logFields)
  3037. if sshClient.isInproxyTunnelProtocol {
  3038. sshClient.peerGeoIPData.SetLogFieldsWithPrefix("", "inproxy_proxy", logFields)
  3039. logFields.Add(
  3040. LogFields(sshClient.handshakeState.inproxyRelayLogFields))
  3041. }
  3042. // new_tactics_tag indicates that the handshake returned new tactics.
  3043. if sshClient.handshakeState.newTacticsTag != "" {
  3044. logFields["new_tactics_tag"] = sshClient.handshakeState.newTacticsTag
  3045. }
  3046. // "relay_protocol" is sent with handshake API parameters. In pre-
  3047. // handshake logTunnel cases, this value is not yet known. As
  3048. // sshClient.tunnelProtocol is authoritative, set this value
  3049. // unconditionally, overwriting any value from handshake.
  3050. logFields["relay_protocol"] = sshClient.tunnelProtocol
  3051. if sshClient.serverPacketManipulation != "" {
  3052. logFields["server_packet_manipulation"] = sshClient.serverPacketManipulation
  3053. }
  3054. if sshClient.sshListener.BPFProgramName != "" {
  3055. logFields["server_bpf"] = sshClient.sshListener.BPFProgramName
  3056. }
  3057. logFields["handshake_completed"] = sshClient.handshakeState.completed
  3058. // Use the handshake sponsor ID unless the handshake did not complete; in
  3059. // that case, use the early sponsor ID, if one was provided.
  3060. //
  3061. // TODO: check that the handshake sponsor ID matches the early sponsor ID?
  3062. if !sshClient.handshakeState.completed && sshClient.sponsorID != "" {
  3063. logFields["sponsor_id"] = sshClient.sponsorID
  3064. }
  3065. logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession
  3066. if sshClient.preHandshakeRandomStreamMetrics.count > 0 {
  3067. logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count
  3068. logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes
  3069. logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes
  3070. logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes
  3071. logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes
  3072. }
  3073. if sshClient.handshakeState.completed {
  3074. // When !handshake_completed, all of these values can be assumed to be zero.
  3075. logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp
  3076. logFields["bytes_down_tcp"] = sshClient.tcpTrafficState.bytesDown
  3077. logFields["peak_concurrent_dialing_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentDialingPortForwardCount
  3078. logFields["peak_concurrent_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentPortForwardCount
  3079. logFields["total_port_forward_count_tcp"] = sshClient.tcpTrafficState.totalPortForwardCount
  3080. logFields["bytes_up_udp"] = sshClient.udpTrafficState.bytesUp
  3081. logFields["bytes_down_udp"] = sshClient.udpTrafficState.bytesDown
  3082. // sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful
  3083. logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount
  3084. logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount
  3085. logFields["total_udpgw_channel_count"] = sshClient.totalUdpgwChannelCount
  3086. logFields["total_packet_tunnel_channel_count"] = sshClient.totalPacketTunnelChannelCount
  3087. }
  3088. if sshClient.postHandshakeRandomStreamMetrics.count > 0 {
  3089. logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count
  3090. logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes
  3091. logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes
  3092. logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes
  3093. logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes
  3094. }
  3095. if sshClient.destinationBytesMetrics != nil {
  3096. // Only log destination bytes for ASNs that remain enabled in tactics.
  3097. //
  3098. // Any counts accumulated before DestinationBytesMetricsASNs changes
  3099. // are lost. At this time we can't change destination byte counting
  3100. // dynamically, after a tactics hot reload, as there may be
  3101. // destination bytes port forwards that were in place before the
  3102. // change, which will continue to count.
  3103. destinationBytesMetricsASNs := []string{}
  3104. // Target this using the client, not peer, GeoIP. In the case of
  3105. // in-proxy tunnel protocols, the client GeoIP fields will be None
  3106. // if the handshake does not complete. In that case, no bytes will
  3107. // have transferred.
  3108. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.clientGeoIPData)
  3109. if err == nil && !p.IsNil() {
  3110. destinationBytesMetricsASNs = p.Strings(parameters.DestinationBytesMetricsASNs)
  3111. }
  3112. p.Close()
  3113. if len(destinationBytesMetricsASNs) > 0 {
  3114. for _, ASN := range destinationBytesMetricsASNs {
  3115. destinationBytesMetrics, ok := sshClient.destinationBytesMetrics[ASN]
  3116. if !ok {
  3117. continue
  3118. }
  3119. sshClient.sshServer.support.destBytesLogger.AddASNBytes(
  3120. ASN,
  3121. sshClient.clientGeoIPData,
  3122. sshClient.handshakeState.apiParams,
  3123. destinationBytesMetrics.tcpMetrics.getBytesUp()+
  3124. destinationBytesMetrics.tcpMetrics.getBytesDown(),
  3125. destinationBytesMetrics.udpMetrics.getBytesUp()+
  3126. destinationBytesMetrics.udpMetrics.getBytesDown())
  3127. }
  3128. }
  3129. }
  3130. // Only log fields for peakMetrics when there is data recorded, otherwise
  3131. // omit the field.
  3132. if sshClient.peakMetrics.concurrentProximateAcceptedClients != nil {
  3133. logFields["peak_concurrent_proximate_accepted_clients"] = *sshClient.peakMetrics.concurrentProximateAcceptedClients
  3134. }
  3135. if sshClient.peakMetrics.concurrentProximateEstablishedClients != nil {
  3136. logFields["peak_concurrent_proximate_established_clients"] = *sshClient.peakMetrics.concurrentProximateEstablishedClients
  3137. }
  3138. if sshClient.peakMetrics.TCPPortForwardFailureRate != nil && sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize != nil {
  3139. logFields["peak_tcp_port_forward_failure_rate"] = *sshClient.peakMetrics.TCPPortForwardFailureRate
  3140. logFields["peak_tcp_port_forward_failure_rate_sample_size"] = *sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize
  3141. }
  3142. if sshClient.peakMetrics.DNSFailureRate != nil && sshClient.peakMetrics.DNSFailureRateSampleSize != nil {
  3143. logFields["peak_dns_failure_rate"] = *sshClient.peakMetrics.DNSFailureRate
  3144. logFields["peak_dns_failure_rate_sample_size"] = *sshClient.peakMetrics.DNSFailureRateSampleSize
  3145. }
  3146. // Pre-calculate a total-tunneled-bytes field. This total is used
  3147. // extensively in analytics and is more performant when pre-calculated.
  3148. bytes := sshClient.tcpTrafficState.bytesUp +
  3149. sshClient.tcpTrafficState.bytesDown +
  3150. sshClient.udpTrafficState.bytesUp +
  3151. sshClient.udpTrafficState.bytesDown
  3152. logFields["bytes"] = bytes
  3153. // Pre-calculate ssh protocol bytes and overhead.
  3154. sshProtocolBytes := sshClient.sshProtocolBytesTracker.totalBytesWritten.Load() +
  3155. sshClient.sshProtocolBytesTracker.totalBytesRead.Load()
  3156. logFields["ssh_protocol_bytes"] = sshProtocolBytes
  3157. logFields["ssh_protocol_bytes_overhead"] = sshProtocolBytes - bytes
  3158. if sshClient.additionalTransportData != nil &&
  3159. sshClient.additionalTransportData.steeringIP != "" {
  3160. logFields["relayed_steering_ip"] = sshClient.additionalTransportData.steeringIP
  3161. }
  3162. if sshClient.requestCheckServerEntryTags > 0 {
  3163. logFields["request_check_server_entry_tags"] = sshClient.requestCheckServerEntryTags
  3164. logFields["checked_server_entry_tags"] = sshClient.checkedServerEntryTags
  3165. logFields["invalid_server_entry_tags"] = sshClient.invalidServerEntryTags
  3166. }
  3167. // Merge in additional metrics from the optional metrics source
  3168. for _, metrics := range additionalMetrics {
  3169. for name, value := range metrics {
  3170. // Don't overwrite any basic fields
  3171. if logFields[name] == nil {
  3172. logFields[name] = value
  3173. }
  3174. }
  3175. }
  3176. // Retain lock when invoking LogRawFieldsWithTimestamp to block any
  3177. // concurrent writes to variables referenced by logFields.
  3178. log.LogRawFieldsWithTimestamp(logFields)
  3179. sshClient.Unlock()
  3180. }
  3181. var blocklistHitsStatParams = []requestParamSpec{
  3182. {"propagation_channel_id", isHexDigits, 0},
  3183. {"sponsor_id", isHexDigits, 0},
  3184. {"client_version", isIntString, requestParamLogStringAsInt},
  3185. {"client_platform", isClientPlatform, 0},
  3186. {"client_features", isAnyString, requestParamOptional | requestParamArray},
  3187. {"client_build_rev", isHexDigits, requestParamOptional},
  3188. {"device_region", isAnyString, requestParamOptional},
  3189. {"device_location", isGeoHashString, requestParamOptional},
  3190. {"egress_region", isRegionCode, requestParamOptional},
  3191. {"last_connected", isLastConnected, requestParamOptional},
  3192. }
  3193. func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []BlocklistTag) {
  3194. sshClient.Lock()
  3195. // Log this using the client, not peer, GeoIP. In the case of in-proxy
  3196. // tunnel protocols, the client GeoIP fields will be None if the
  3197. // handshake does not complete. In that case, no port forwarding will
  3198. // occur and there will not be any blocklist hits.
  3199. logFields := getRequestLogFields(
  3200. "server_blocklist_hit",
  3201. "",
  3202. sshClient.sessionID,
  3203. sshClient.clientGeoIPData,
  3204. sshClient.handshakeState.authorizedAccessTypes,
  3205. sshClient.handshakeState.apiParams,
  3206. blocklistHitsStatParams)
  3207. // Note: see comment in logTunnel regarding unlock and concurrent access.
  3208. sshClient.Unlock()
  3209. for _, tag := range tags {
  3210. if IP != nil {
  3211. logFields["blocklist_ip_address"] = IP.String()
  3212. }
  3213. if domain != "" {
  3214. logFields["blocklist_domain"] = domain
  3215. }
  3216. logFields["blocklist_source"] = tag.Source
  3217. logFields["blocklist_subject"] = tag.Subject
  3218. log.LogRawFieldsWithTimestamp(logFields)
  3219. }
  3220. }
  3221. func (sshClient *sshClient) runOSLSender() {
  3222. for {
  3223. // Await a signal that there are SLOKs to send
  3224. // TODO: use reflect.SelectCase, and optionally await timer here?
  3225. select {
  3226. case <-sshClient.signalIssueSLOKs:
  3227. case <-sshClient.runCtx.Done():
  3228. return
  3229. }
  3230. retryDelay := SSH_SEND_OSL_INITIAL_RETRY_DELAY
  3231. for {
  3232. err := sshClient.sendOSLRequest()
  3233. if err == nil {
  3234. break
  3235. }
  3236. if !isExpectedTunnelIOError(err) {
  3237. log.WithTraceFields(LogFields{"error": err}).Warning("sendOSLRequest failed")
  3238. }
  3239. // If the request failed, retry after a delay (with exponential backoff)
  3240. // or when signaled that there are additional SLOKs to send
  3241. retryTimer := time.NewTimer(retryDelay)
  3242. select {
  3243. case <-retryTimer.C:
  3244. case <-sshClient.signalIssueSLOKs:
  3245. case <-sshClient.runCtx.Done():
  3246. retryTimer.Stop()
  3247. return
  3248. }
  3249. retryTimer.Stop()
  3250. retryDelay *= SSH_SEND_OSL_RETRY_FACTOR
  3251. }
  3252. }
  3253. }
  3254. // sendOSLRequest will invoke osl.GetSeedPayload to issue SLOKs and
  3255. // generate a payload, and send an OSL request to the client when
  3256. // there are new SLOKs in the payload.
  3257. func (sshClient *sshClient) sendOSLRequest() error {
  3258. seedPayload := sshClient.getOSLSeedPayload()
  3259. // Don't send when no SLOKs. This will happen when signalIssueSLOKs
  3260. // is received but no new SLOKs are issued.
  3261. if len(seedPayload.SLOKs) == 0 {
  3262. return nil
  3263. }
  3264. oslRequest := protocol.OSLRequest{
  3265. SeedPayload: seedPayload,
  3266. }
  3267. requestPayload, err := json.Marshal(oslRequest)
  3268. if err != nil {
  3269. return errors.Trace(err)
  3270. }
  3271. ok, _, err := sshClient.sshConn.SendRequest(
  3272. protocol.PSIPHON_API_OSL_REQUEST_NAME,
  3273. true,
  3274. requestPayload)
  3275. if err != nil {
  3276. return errors.Trace(err)
  3277. }
  3278. if !ok {
  3279. return errors.TraceNew("client rejected request")
  3280. }
  3281. sshClient.clearOSLSeedPayload()
  3282. return nil
  3283. }
  3284. // runAlertSender dequeues and sends alert requests to the client. As these
  3285. // alerts are informational, there is no retry logic and no SSH client
  3286. // acknowledgement (wantReply) is requested. This worker scheme allows
  3287. // nonconcurrent components including udpgw and packet tunnel to enqueue
  3288. // alerts without blocking their traffic processing.
  3289. func (sshClient *sshClient) runAlertSender() {
  3290. for {
  3291. select {
  3292. case <-sshClient.runCtx.Done():
  3293. return
  3294. case request := <-sshClient.sendAlertRequests:
  3295. payload, err := json.Marshal(request)
  3296. if err != nil {
  3297. log.WithTraceFields(LogFields{"error": err}).Warning("Marshal failed")
  3298. break
  3299. }
  3300. _, _, err = sshClient.sshConn.SendRequest(
  3301. protocol.PSIPHON_API_ALERT_REQUEST_NAME,
  3302. false,
  3303. payload)
  3304. if err != nil && !isExpectedTunnelIOError(err) {
  3305. log.WithTraceFields(LogFields{"error": err}).Warning("SendRequest failed")
  3306. break
  3307. }
  3308. sshClient.Lock()
  3309. sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] = true
  3310. sshClient.Unlock()
  3311. }
  3312. }
  3313. }
  3314. // enqueueAlertRequest enqueues an alert request to be sent to the client.
  3315. // Only one request is sent per tunnel per protocol.AlertRequest value;
  3316. // subsequent alerts with the same value are dropped. enqueueAlertRequest will
  3317. // not block until the queue exceeds ALERT_REQUEST_QUEUE_BUFFER_SIZE.
  3318. func (sshClient *sshClient) enqueueAlertRequest(request protocol.AlertRequest) {
  3319. sshClient.Lock()
  3320. if sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] {
  3321. sshClient.Unlock()
  3322. return
  3323. }
  3324. sshClient.Unlock()
  3325. select {
  3326. case <-sshClient.runCtx.Done():
  3327. case sshClient.sendAlertRequests <- request:
  3328. }
  3329. }
  3330. func (sshClient *sshClient) enqueueDisallowedTrafficAlertRequest() {
  3331. reason := protocol.PSIPHON_API_ALERT_DISALLOWED_TRAFFIC
  3332. actionURLs := sshClient.getAlertActionURLs(reason)
  3333. sshClient.enqueueAlertRequest(
  3334. protocol.AlertRequest{
  3335. Reason: reason,
  3336. ActionURLs: actionURLs,
  3337. })
  3338. }
  3339. func (sshClient *sshClient) enqueueUnsafeTrafficAlertRequest(tags []BlocklistTag) {
  3340. reason := protocol.PSIPHON_API_ALERT_UNSAFE_TRAFFIC
  3341. actionURLs := sshClient.getAlertActionURLs(reason)
  3342. for _, tag := range tags {
  3343. sshClient.enqueueAlertRequest(
  3344. protocol.AlertRequest{
  3345. Reason: reason,
  3346. Subject: tag.Subject,
  3347. ActionURLs: actionURLs,
  3348. })
  3349. }
  3350. }
  3351. func (sshClient *sshClient) getAlertActionURLs(alertReason string) []string {
  3352. sshClient.Lock()
  3353. sponsorID, _ := getStringRequestParam(
  3354. sshClient.handshakeState.apiParams, "sponsor_id")
  3355. clientGeoIPData := sshClient.clientGeoIPData
  3356. deviceRegion := sshClient.handshakeState.deviceRegion
  3357. sshClient.Unlock()
  3358. return sshClient.sshServer.support.PsinetDatabase.GetAlertActionURLs(
  3359. alertReason,
  3360. sponsorID,
  3361. clientGeoIPData.Country,
  3362. clientGeoIPData.ASN,
  3363. deviceRegion)
  3364. }
  3365. func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessage string) {
  3366. // We always return the reject reason "Prohibited":
  3367. // - Traffic rules and connection limits may prohibit the connection.
  3368. // - External firewall rules may prohibit the connection, and this is not currently
  3369. // distinguishable from other failure modes.
  3370. // - We limit the failure information revealed to the client.
  3371. reason := ssh.Prohibited
  3372. // This log is Debug level, as logMessage can contain user traffic
  3373. // destination address information such as in the "LookupIP failed"
  3374. // and "DialTimeout failed" cases in handleTCPChannel.
  3375. if IsLogLevelDebug() {
  3376. log.WithTraceFields(
  3377. LogFields{
  3378. "sessionID": sshClient.sessionID,
  3379. "channelType": newChannel.ChannelType(),
  3380. "logMessage": logMessage,
  3381. "rejectReason": reason.String(),
  3382. }).Debug("reject new channel")
  3383. }
  3384. // Note: logMessage is internal, for logging only; just the reject reason is sent to the client.
  3385. _ = newChannel.Reject(reason, reason.String())
  3386. }
  3387. // setHandshakeState sets the handshake state -- that it completed and
  3388. // what parameters were passed -- in sshClient. This state is used for allowing
  3389. // port forwards and for future traffic rule selection. setHandshakeState
  3390. // also triggers an immediate traffic rule re-selection, as the rules selected
  3391. // upon tunnel establishment may no longer apply now that handshake values are
  3392. // set.
  3393. //
  3394. // The authorizations received from the client handshake are verified and the
  3395. // resulting list of authorized access types are applied to the client's tunnel
  3396. // and traffic rules.
  3397. //
  3398. // A list of active authorization IDs, authorized access types, and traffic
  3399. // rate limits are returned for responding to the client and logging.
  3400. //
  3401. // All slices in the returnd handshakeStateInfo are read-only, as readers may
  3402. // reference slice contents outside of locks.
  3403. func (sshClient *sshClient) setHandshakeState(
  3404. state handshakeState,
  3405. authorizations []string) (*handshakeStateInfo, error) {
  3406. sshClient.Lock()
  3407. completed := sshClient.handshakeState.completed
  3408. if !completed {
  3409. sshClient.handshakeState = state
  3410. if sshClient.isInproxyTunnelProtocol {
  3411. // Set the client IP and GeoIP data to the value obtained using
  3412. // the original client IP, from the broker, in the handshake.
  3413. // Also update the GeoIP session hash to use the client GeoIP data.
  3414. sshClient.clientIP = sshClient.handshakeState.inproxyClientIP
  3415. sshClient.clientGeoIPData =
  3416. sshClient.handshakeState.inproxyClientGeoIPData
  3417. sshClient.sshServer.setGeoIPSessionCache(
  3418. sshClient.sessionID, sshClient.clientGeoIPData)
  3419. }
  3420. }
  3421. sshClient.Unlock()
  3422. // Client must only perform one handshake
  3423. if completed {
  3424. return nil, errors.TraceNew("handshake already completed")
  3425. }
  3426. if sshClient.isInproxyTunnelProtocol {
  3427. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(
  3428. sshClient.clientGeoIPData)
  3429. if err != nil {
  3430. return nil, errors.Trace(err)
  3431. }
  3432. // Skip check if no tactics are configured.
  3433. //
  3434. // Disconnect immediately if the tactics for the client restricts usage
  3435. // of the provider ID with inproxy protocols. The probability may be
  3436. // used to influence usage of a given provider with inproxy protocols;
  3437. // but when only that provider works for a given client, and the
  3438. // probability is less than 1.0, the client can retry until it gets a
  3439. // successful coin flip.
  3440. //
  3441. // Clients will also skip inproxy protocol candidates with restricted
  3442. // provider IDs.
  3443. // The client-side probability,
  3444. // RestrictInproxyProviderIDsClientProbability, is applied
  3445. // independently of the server-side coin flip here.
  3446. //
  3447. // At this stage, GeoIP tactics filters are active, but handshake API
  3448. // parameters are not.
  3449. //
  3450. // See the comment in server.LoadConfig regarding provider ID
  3451. // limitations.
  3452. if !p.IsNil() &&
  3453. common.ContainsAny(
  3454. p.KeyStrings(parameters.RestrictInproxyProviderRegions,
  3455. sshClient.sshServer.support.Config.GetProviderID()),
  3456. []string{"", sshClient.sshServer.support.Config.GetRegion()}) {
  3457. if p.WeightedCoinFlip(
  3458. parameters.RestrictInproxyProviderIDsServerProbability) {
  3459. return nil, errRestrictedProvider
  3460. }
  3461. }
  3462. }
  3463. // Verify the authorizations submitted by the client. Verified, active
  3464. // (non-expired) access types will be available for traffic rules
  3465. // filtering.
  3466. //
  3467. // When an authorization is active but expires while the client is
  3468. // connected, the client is disconnected to ensure the access is reset.
  3469. // This is implemented by setting a timer to perform the disconnect at the
  3470. // expiry time of the soonest expiring authorization.
  3471. //
  3472. // sshServer.authorizationSessionIDs tracks the unique mapping of active
  3473. // authorization IDs to client session IDs and is used to detect and
  3474. // prevent multiple malicious clients from reusing a single authorization
  3475. // (within the scope of this server).
  3476. // authorizationIDs and authorizedAccessTypes are returned to the client
  3477. // and logged, respectively; initialize to empty lists so the
  3478. // protocol/logs don't need to handle 'null' values.
  3479. authorizationIDs := make([]string, 0)
  3480. authorizedAccessTypes := make([]string, 0)
  3481. var stopTime time.Time
  3482. for i, authorization := range authorizations {
  3483. // This sanity check mitigates malicious clients causing excess CPU use.
  3484. if i >= MAX_AUTHORIZATIONS {
  3485. log.WithTrace().Warning("too many authorizations")
  3486. break
  3487. }
  3488. if sshClient.sshServer.support.Config.AccessControlVerificationKeyRing == nil {
  3489. if i == 0 {
  3490. log.WithTrace().Warning("authorization not configured")
  3491. }
  3492. continue
  3493. }
  3494. verifiedAuthorization, err := accesscontrol.VerifyAuthorization(
  3495. sshClient.sshServer.support.Config.AccessControlVerificationKeyRing,
  3496. authorization)
  3497. if err != nil {
  3498. log.WithTraceFields(
  3499. LogFields{"error": err}).Warning("verify authorization failed")
  3500. continue
  3501. }
  3502. authorizationID := base64.StdEncoding.EncodeToString(verifiedAuthorization.ID)
  3503. if common.Contains(authorizedAccessTypes, verifiedAuthorization.AccessType) {
  3504. log.WithTraceFields(
  3505. LogFields{"accessType": verifiedAuthorization.AccessType}).Warning("duplicate authorization access type")
  3506. continue
  3507. }
  3508. authorizationIDs = append(authorizationIDs, authorizationID)
  3509. authorizedAccessTypes = append(authorizedAccessTypes, verifiedAuthorization.AccessType)
  3510. if stopTime.IsZero() || stopTime.After(verifiedAuthorization.Expires) {
  3511. stopTime = verifiedAuthorization.Expires
  3512. }
  3513. }
  3514. // Associate all verified authorizationIDs with this client's session ID.
  3515. // Handle cases where previous associations exist:
  3516. //
  3517. // - Multiple malicious clients reusing a single authorization. In this
  3518. // case, authorizations are revoked from the previous client.
  3519. //
  3520. // - The client reconnected with a new session ID due to user toggling.
  3521. // This case is expected due to server affinity. This cannot be
  3522. // distinguished from the previous case and the same action is taken;
  3523. // this will have no impact on a legitimate client as the previous
  3524. // session is dangling.
  3525. //
  3526. // - The client automatically reconnected with the same session ID. This
  3527. // case is not expected as sshServer.registerEstablishedClient
  3528. // synchronously calls sshClient.releaseAuthorizations; as a safe guard,
  3529. // this case is distinguished and no revocation action is taken.
  3530. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  3531. for _, authorizationID := range authorizationIDs {
  3532. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  3533. if ok && sessionID != sshClient.sessionID {
  3534. logFields := LogFields{
  3535. "duplicate_authorization_id": authorizationID,
  3536. }
  3537. // Log this using client, not peer, GeoIP data. In the case of
  3538. // in-proxy tunnel protocols, the client GeoIP fields will be None
  3539. // if a handshake does not complete. However, presense of a
  3540. // (duplicate) authorization implies that the handshake completed.
  3541. sshClient.getClientGeoIPData().SetClientLogFields(logFields)
  3542. duplicateClientGeoIPData := sshClient.sshServer.getGeoIPSessionCache(sessionID)
  3543. if duplicateClientGeoIPData != sshClient.getClientGeoIPData() {
  3544. duplicateClientGeoIPData.SetClientLogFieldsWithPrefix("duplicate_authorization_", logFields)
  3545. }
  3546. logIrregularTunnel(
  3547. sshClient.sshServer.support,
  3548. "", // tunnel protocol is not relevant to authorizations
  3549. 0,
  3550. "", // GeoIP data is added above
  3551. errors.TraceNew("duplicate active authorization"),
  3552. logFields)
  3553. // Invoke asynchronously to avoid deadlocks.
  3554. // TODO: invoke only once for each distinct sessionID?
  3555. go sshClient.sshServer.revokeClientAuthorizations(sessionID)
  3556. }
  3557. sshClient.sshServer.authorizationSessionIDs[authorizationID] = sshClient.sessionID
  3558. }
  3559. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  3560. if len(authorizationIDs) > 0 {
  3561. sshClient.Lock()
  3562. // Make the authorizedAccessTypes available for traffic rules filtering.
  3563. sshClient.handshakeState.activeAuthorizationIDs = authorizationIDs
  3564. sshClient.handshakeState.authorizedAccessTypes = authorizedAccessTypes
  3565. // On exit, sshClient.runTunnel will call releaseAuthorizations, which
  3566. // will release the authorization IDs so the client can reconnect and
  3567. // present the same authorizations again. sshClient.runTunnel will
  3568. // also cancel the stopTimer in case it has not yet fired.
  3569. // Note: termination of the stopTimer goroutine is not synchronized.
  3570. sshClient.releaseAuthorizations = func() {
  3571. sshClient.sshServer.authorizationSessionIDsMutex.Lock()
  3572. for _, authorizationID := range authorizationIDs {
  3573. sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]
  3574. if ok && sessionID == sshClient.sessionID {
  3575. delete(sshClient.sshServer.authorizationSessionIDs, authorizationID)
  3576. }
  3577. }
  3578. sshClient.sshServer.authorizationSessionIDsMutex.Unlock()
  3579. }
  3580. sshClient.stopTimer = time.AfterFunc(
  3581. time.Until(stopTime),
  3582. func() {
  3583. sshClient.stop()
  3584. })
  3585. sshClient.Unlock()
  3586. }
  3587. upstreamBytesPerSecond, downstreamBytesPerSecond := sshClient.setTrafficRules()
  3588. sshClient.setOSLConfig()
  3589. // Set destination bytes metrics.
  3590. //
  3591. // Limitation: this is a one-time operation and doesn't get reset when
  3592. // tactics are hot-reloaded. This allows us to simply retain any
  3593. // destination byte counts accumulated and eventually log in
  3594. // server_tunnel, without having to deal with a destination change
  3595. // mid-tunnel. As typical tunnels are short, and destination changes can
  3596. // be applied gradually, handling mid-tunnel changes is not a priority.
  3597. sshClient.setDestinationBytesMetrics()
  3598. info := &handshakeStateInfo{
  3599. activeAuthorizationIDs: authorizationIDs,
  3600. authorizedAccessTypes: authorizedAccessTypes,
  3601. upstreamBytesPerSecond: upstreamBytesPerSecond,
  3602. downstreamBytesPerSecond: downstreamBytesPerSecond,
  3603. }
  3604. // Relay the steering IP to the API handshake handler.
  3605. if sshClient.additionalTransportData != nil {
  3606. info.steeringIP = sshClient.additionalTransportData.steeringIP
  3607. }
  3608. return info, nil
  3609. }
  3610. // getHandshaked returns whether the client has completed a handshake API
  3611. // request and whether the traffic rules that were selected after the
  3612. // handshake immediately exhaust the client.
  3613. //
  3614. // When the client is immediately exhausted it will be closed; but this
  3615. // takes effect asynchronously. The "exhausted" return value is used to
  3616. // prevent API requests by clients that will close.
  3617. func (sshClient *sshClient) getHandshaked() (bool, bool) {
  3618. sshClient.Lock()
  3619. defer sshClient.Unlock()
  3620. completed := sshClient.handshakeState.completed
  3621. exhausted := false
  3622. // Notes:
  3623. // - "Immediately exhausted" is when CloseAfterExhausted is set and
  3624. // either ReadUnthrottledBytes or WriteUnthrottledBytes starts from
  3625. // 0, so no bytes would be read or written. This check does not
  3626. // examine whether 0 bytes _remain_ in the ThrottledConn.
  3627. // - This check is made against the current traffic rules, which
  3628. // could have changed in a hot reload since the handshake.
  3629. if completed &&
  3630. *sshClient.trafficRules.RateLimits.CloseAfterExhausted &&
  3631. (*sshClient.trafficRules.RateLimits.ReadUnthrottledBytes == 0 ||
  3632. *sshClient.trafficRules.RateLimits.WriteUnthrottledBytes == 0) {
  3633. exhausted = true
  3634. }
  3635. return completed, exhausted
  3636. }
  3637. func (sshClient *sshClient) getDisableDiscovery() bool {
  3638. sshClient.Lock()
  3639. defer sshClient.Unlock()
  3640. return *sshClient.trafficRules.DisableDiscovery
  3641. }
  3642. func (sshClient *sshClient) updateAPIParameters(
  3643. apiParams common.APIParameters) {
  3644. sshClient.Lock()
  3645. defer sshClient.Unlock()
  3646. // Only update after handshake has initialized API params.
  3647. if !sshClient.handshakeState.completed {
  3648. return
  3649. }
  3650. for name, value := range apiParams {
  3651. sshClient.handshakeState.apiParams[name] = value
  3652. }
  3653. }
  3654. func (sshClient *sshClient) acceptDomainBytes() bool {
  3655. sshClient.Lock()
  3656. defer sshClient.Unlock()
  3657. // When the domain bytes checksum differs from the checksum sent to the
  3658. // client in the handshake response, the psinet regex configuration has
  3659. // changed. In this case, drop the stats so we don't continue to record
  3660. // stats as previously configured.
  3661. //
  3662. // Limitations:
  3663. // - The checksum comparison may result in dropping some stats for a
  3664. // domain that remains in the new configuration.
  3665. // - We don't push new regexs to the clients, so clients that remain
  3666. // connected will continue to send stats that will be dropped; and
  3667. // those clients will not send stats as newly configured until after
  3668. // reconnecting.
  3669. // - Due to the design of
  3670. // transferstats.ReportRecentBytesTransferredForServer in the client,
  3671. // the client may accumulate stats, reconnect before its next status
  3672. // request, get a new regex configuration, and then send the previously
  3673. // accumulated stats in its next status request. The checksum scheme
  3674. // won't prevent the reporting of those stats.
  3675. sponsorID, _ := getStringRequestParam(sshClient.handshakeState.apiParams, "sponsor_id")
  3676. domainBytesChecksum := sshClient.sshServer.support.PsinetDatabase.GetDomainBytesChecksum(sponsorID)
  3677. return bytes.Equal(sshClient.handshakeState.domainBytesChecksum, domainBytesChecksum)
  3678. }
  3679. // setOSLConfig resets the client's OSL seed state based on the latest OSL config
  3680. // As sshClient.oslClientSeedState may be reset by a concurrent goroutine,
  3681. // oslClientSeedState must only be accessed within the sshClient mutex.
  3682. func (sshClient *sshClient) setOSLConfig() {
  3683. sshClient.Lock()
  3684. defer sshClient.Unlock()
  3685. propagationChannelID, err := getStringRequestParam(
  3686. sshClient.handshakeState.apiParams, "propagation_channel_id")
  3687. if err != nil {
  3688. // This should not fail as long as client has sent valid handshake
  3689. return
  3690. }
  3691. // Use a cached seed state if one is found for the client's
  3692. // session ID. This enables resuming progress made in a previous
  3693. // tunnel.
  3694. // Note: go-cache is already concurency safe; the additional mutex
  3695. // is necessary to guarantee that Get/Delete is atomic; although in
  3696. // practice no two concurrent clients should ever supply the same
  3697. // session ID.
  3698. sshClient.sshServer.oslSessionCacheMutex.Lock()
  3699. oslClientSeedState, found := sshClient.sshServer.oslSessionCache.Get(sshClient.sessionID)
  3700. if found {
  3701. sshClient.sshServer.oslSessionCache.Delete(sshClient.sessionID)
  3702. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3703. sshClient.oslClientSeedState = oslClientSeedState.(*osl.ClientSeedState)
  3704. sshClient.oslClientSeedState.Resume(sshClient.signalIssueSLOKs)
  3705. return
  3706. }
  3707. sshClient.sshServer.oslSessionCacheMutex.Unlock()
  3708. // Two limitations when setOSLConfig() is invoked due to an
  3709. // OSL config hot reload:
  3710. //
  3711. // 1. any partial progress towards SLOKs is lost.
  3712. //
  3713. // 2. all existing osl.ClientSeedPortForwards for existing
  3714. // port forwards will not send progress to the new client
  3715. // seed state.
  3716. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  3717. // protocols, the client GeoIP fields will be populated using the
  3718. // original client IP already received, from the broker, in the handshake.
  3719. sshClient.oslClientSeedState = sshClient.sshServer.support.OSLConfig.NewClientSeedState(
  3720. sshClient.clientGeoIPData.Country,
  3721. propagationChannelID,
  3722. sshClient.signalIssueSLOKs)
  3723. }
  3724. // newClientSeedPortForward will return nil when no seeding is
  3725. // associated with the specified ipAddress.
  3726. func (sshClient *sshClient) newClientSeedPortForward(IPAddress net.IP) *osl.ClientSeedPortForward {
  3727. sshClient.Lock()
  3728. defer sshClient.Unlock()
  3729. // Will not be initialized before handshake.
  3730. if sshClient.oslClientSeedState == nil {
  3731. return nil
  3732. }
  3733. lookupASN := func(IP net.IP) string {
  3734. // TODO: there are potentially multiple identical geo IP lookups per new
  3735. // port forward and flow, cache and use result of first lookup.
  3736. return sshClient.sshServer.support.GeoIPService.LookupISPForIP(IP).ASN
  3737. }
  3738. return sshClient.oslClientSeedState.NewClientSeedPortForward(IPAddress, lookupASN)
  3739. }
  3740. // getOSLSeedPayload returns a payload containing all seeded SLOKs for
  3741. // this client's session.
  3742. func (sshClient *sshClient) getOSLSeedPayload() *osl.SeedPayload {
  3743. sshClient.Lock()
  3744. defer sshClient.Unlock()
  3745. // Will not be initialized before handshake.
  3746. if sshClient.oslClientSeedState == nil {
  3747. return &osl.SeedPayload{SLOKs: make([]*osl.SLOK, 0)}
  3748. }
  3749. return sshClient.oslClientSeedState.GetSeedPayload()
  3750. }
  3751. func (sshClient *sshClient) clearOSLSeedPayload() {
  3752. sshClient.Lock()
  3753. defer sshClient.Unlock()
  3754. sshClient.oslClientSeedState.ClearSeedPayload()
  3755. }
  3756. func (sshClient *sshClient) setDestinationBytesMetrics() {
  3757. sshClient.Lock()
  3758. defer sshClient.Unlock()
  3759. // Limitation: the server-side tactics cache is used to avoid the overhead
  3760. // of an additional tactics filtering per tunnel. As this cache is
  3761. // designed for GeoIP filtering only, handshake API parameters are not
  3762. // applied to tactics filtering in this case.
  3763. //
  3764. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  3765. // protocols, the client GeoIP fields will be populated using the
  3766. // original client IP already received, from the broker, in the handshake.
  3767. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(
  3768. sshClient.clientGeoIPData)
  3769. if err != nil {
  3770. log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
  3771. return
  3772. }
  3773. if p.IsNil() {
  3774. return
  3775. }
  3776. ASNs := p.Strings(parameters.DestinationBytesMetricsASNs)
  3777. if len(ASNs) == 0 {
  3778. return
  3779. }
  3780. sshClient.destinationBytesMetrics = make(map[string]*protocolDestinationBytesMetrics)
  3781. for _, ASN := range ASNs {
  3782. if ASN != "" {
  3783. sshClient.destinationBytesMetrics[ASN] = &protocolDestinationBytesMetrics{}
  3784. }
  3785. }
  3786. }
  3787. func (sshClient *sshClient) newDestinationBytesMetricsUpdater(
  3788. portForwardType int, IPAddress net.IP) *destinationBytesMetrics {
  3789. sshClient.Lock()
  3790. defer sshClient.Unlock()
  3791. if sshClient.destinationBytesMetrics == nil {
  3792. return nil
  3793. }
  3794. destinationASN := sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN
  3795. // Future enhancement: for 5 or fewer ASNs, iterate over a slice instead
  3796. // of using a map? See, for example, stringLookupThreshold in
  3797. // common/tactics.
  3798. metrics, ok := sshClient.destinationBytesMetrics[destinationASN]
  3799. if !ok {
  3800. return nil
  3801. }
  3802. if portForwardType == portForwardTypeTCP {
  3803. return &metrics.tcpMetrics
  3804. }
  3805. return &metrics.udpMetrics
  3806. }
  3807. func (sshClient *sshClient) getPortForwardActivityUpdaters(
  3808. portForwardType int, IPAddress net.IP) []common.ActivityUpdater {
  3809. var updaters []common.ActivityUpdater
  3810. clientSeedPortForward := sshClient.newClientSeedPortForward(IPAddress)
  3811. if clientSeedPortForward != nil {
  3812. updaters = append(updaters, clientSeedPortForward)
  3813. }
  3814. destinationBytesMetrics := sshClient.newDestinationBytesMetricsUpdater(portForwardType, IPAddress)
  3815. if destinationBytesMetrics != nil {
  3816. updaters = append(updaters, destinationBytesMetrics)
  3817. }
  3818. return updaters
  3819. }
  3820. func (sshClient *sshClient) newInproxyProxyQualityTracker() *inproxyProxyQualityTracker {
  3821. sshClient.Lock()
  3822. defer sshClient.Unlock()
  3823. if !protocol.TunnelProtocolUsesInproxy(sshClient.tunnelProtocol) {
  3824. return nil
  3825. }
  3826. // Limitation: assumes no GeoIP targeting for in-proxy quality
  3827. // configuration. The original client GeoIP information is not available
  3828. // until after the Psiphon handshake completes, and we want to include
  3829. // earlier tunnel bytes, including any liveness test.
  3830. //
  3831. // As a future enhancement, quality tracker targets could be _extended_ by
  3832. // GeoIP in reportProxyQuality.
  3833. //
  3834. // Note that the in-proxy broker also enforces InproxyEnableProxyQuality,
  3835. // and also assumes no GeoIP targetting.
  3836. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
  3837. if err != nil {
  3838. log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
  3839. return nil
  3840. }
  3841. if p.IsNil() {
  3842. return nil
  3843. }
  3844. // InproxyEnableProxyQuality indicates if proxy quality reporting is
  3845. // enabled or not.
  3846. //
  3847. // Note that flipping InproxyEnableProxyQuality to false in tactics does
  3848. // not interrupt any tracker already in progress.
  3849. if !p.Bool(parameters.InproxyEnableProxyQuality) {
  3850. return nil
  3851. }
  3852. tracker := newInproxyProxyQualityTracker(
  3853. sshClient,
  3854. int64(p.Int(parameters.InproxyProxyQualityTargetUpstreamBytes)),
  3855. int64(p.Int(parameters.InproxyProxyQualityTargetDownstreamBytes)),
  3856. p.Duration(parameters.InproxyProxyQualityTargetDuration))
  3857. sshClient.inproxyProxyQualityTracker = tracker
  3858. return tracker
  3859. }
  3860. func (sshClient *sshClient) reportProxyQuality() {
  3861. sshClient.Lock()
  3862. defer sshClient.Unlock()
  3863. if !protocol.TunnelProtocolUsesInproxy(sshClient.tunnelProtocol) ||
  3864. !sshClient.handshakeState.completed {
  3865. log.Warning("unexpected reportProxyQuality call")
  3866. return
  3867. }
  3868. if sshClient.handshakeState.inproxyMatchedPersonal {
  3869. // Skip quality reporting for personal paired proxies. Brokers don't use
  3870. // quality data for personal matching, and no quality data from personal
  3871. // pairing should not influence common matching prioritization.
  3872. return
  3873. }
  3874. // Enforce InproxyEnableProxyQualityClientRegions. If set, this is a
  3875. // restricted list of client regions for which quality is reported.
  3876. //
  3877. // Note that it's possible to have an soft client GeoIP limit given that
  3878. // in-proxy protocols are default disabled and enabled via
  3879. // LimitTunnelProtocols. However, that parameter is enforced on the
  3880. // client side.
  3881. //
  3882. // Now that that the Psiphon handshake is complete, the original client IP
  3883. // is known. Here, as in newInproxyProxyQualityTracker, the tactics
  3884. // filter remains non-region specific, so
  3885. // InproxyEnableProxyQualityClientRegions should be a global list. This
  3886. // accommodates a simpler configuration vs., for example, using many
  3887. // region-specific filters to override InproxyEnableProxyQuality.
  3888. //
  3889. // Future enhancement: here, we could extend inproxyProxyQualityTracker
  3890. // targets with client GeoIP-specific values.
  3891. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
  3892. if err != nil || p.IsNil() {
  3893. log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
  3894. return
  3895. }
  3896. enabledRegions := p.Strings(parameters.InproxyEnableProxyQualityClientRegions)
  3897. if len(enabledRegions) > 0 &&
  3898. !common.Contains(enabledRegions, sshClient.clientGeoIPData.Country) {
  3899. // Quality reporting is restricted to specific regions, and this
  3900. // client's region is not included.
  3901. return
  3902. }
  3903. // ReportQuality will enqueue the quality data to be sent to brokers.
  3904. // There's a delay before making broker requests, in an effort to batch
  3905. // up data. Requests may be made to only a subset of brokers in
  3906. // InproxyAllBrokerSpecs, depending on whether the broker is expected to
  3907. // trust this server's session public key; see ReportQuality.
  3908. sshClient.sshServer.inproxyBrokerSessions.ReportQuality(
  3909. sshClient.handshakeState.inproxyProxyID,
  3910. sshClient.peerGeoIPData.ASN,
  3911. sshClient.clientGeoIPData.ASN)
  3912. }
  3913. func (sshClient *sshClient) newSSHProtocolBytesTracker() *sshProtocolBytesTracker {
  3914. sshClient.Lock()
  3915. defer sshClient.Unlock()
  3916. tracker := newSSHProtocolBytesTracker(sshClient)
  3917. sshClient.sshProtocolBytesTracker = tracker
  3918. return tracker
  3919. }
  3920. func (sshClient *sshClient) getTunnelActivityUpdaters() []common.ActivityUpdater {
  3921. var updaters []common.ActivityUpdater
  3922. inproxyProxyQualityTracker := sshClient.newInproxyProxyQualityTracker()
  3923. if inproxyProxyQualityTracker != nil {
  3924. updaters = append(updaters, inproxyProxyQualityTracker)
  3925. }
  3926. sshProtocolBytesTracker := sshClient.newSSHProtocolBytesTracker()
  3927. updaters = append(updaters, sshProtocolBytesTracker)
  3928. return updaters
  3929. }
  3930. // setTrafficRules resets the client's traffic rules based on the latest server config
  3931. // and client properties. As sshClient.trafficRules may be reset by a concurrent
  3932. // goroutine, trafficRules must only be accessed within the sshClient mutex.
  3933. func (sshClient *sshClient) setTrafficRules() (int64, int64) {
  3934. sshClient.Lock()
  3935. defer sshClient.Unlock()
  3936. isFirstTunnelInSession := sshClient.isFirstTunnelInSession &&
  3937. sshClient.handshakeState.establishedTunnelsCount == 0
  3938. // In the case of in-proxy tunnel protocols, the client GeoIP data is None
  3939. // until the handshake completes. Pre-handhake, the rate limit is
  3940. // determined by EstablishmentRead/WriteBytesPerSecond, which default to
  3941. // unthrottled, the recommended setting; in addition, no port forwards
  3942. // are permitted until after the handshake completes, at which time
  3943. // setTrafficRules will be called again with the client GeoIP data
  3944. // populated using the original client IP received from the in-proxy
  3945. // broker.
  3946. sshClient.trafficRules = sshClient.sshServer.support.TrafficRulesSet.GetTrafficRules(
  3947. sshClient.sshServer.support.Config.GetProviderID(),
  3948. isFirstTunnelInSession,
  3949. sshClient.tunnelProtocol,
  3950. sshClient.clientGeoIPData,
  3951. sshClient.handshakeState)
  3952. if sshClient.throttledConn != nil {
  3953. // Any existing throttling state is reset.
  3954. sshClient.throttledConn.SetLimits(
  3955. sshClient.trafficRules.RateLimits.CommonRateLimits(
  3956. sshClient.handshakeState.completed))
  3957. }
  3958. return *sshClient.trafficRules.RateLimits.ReadBytesPerSecond,
  3959. *sshClient.trafficRules.RateLimits.WriteBytesPerSecond
  3960. }
  3961. func (sshClient *sshClient) rateLimits() common.RateLimits {
  3962. sshClient.Lock()
  3963. defer sshClient.Unlock()
  3964. return sshClient.trafficRules.RateLimits.CommonRateLimits(
  3965. sshClient.handshakeState.completed)
  3966. }
  3967. func (sshClient *sshClient) idleTCPPortForwardTimeout() time.Duration {
  3968. sshClient.Lock()
  3969. defer sshClient.Unlock()
  3970. return time.Duration(*sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds) * time.Millisecond
  3971. }
  3972. func (sshClient *sshClient) idleUDPPortForwardTimeout() time.Duration {
  3973. sshClient.Lock()
  3974. defer sshClient.Unlock()
  3975. return time.Duration(*sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds) * time.Millisecond
  3976. }
  3977. func (sshClient *sshClient) setTCPPortForwardDialingAvailableSignal(signal context.CancelFunc) {
  3978. sshClient.Lock()
  3979. defer sshClient.Unlock()
  3980. sshClient.tcpPortForwardDialingAvailableSignal = signal
  3981. }
  3982. const (
  3983. portForwardTypeTCP = iota
  3984. portForwardTypeUDP
  3985. )
  3986. func (sshClient *sshClient) isPortForwardPermitted(
  3987. portForwardType int,
  3988. remoteIP net.IP,
  3989. port int) bool {
  3990. // Disallow connection to bogons.
  3991. //
  3992. // As a security measure, this is a failsafe. The server should be run on a
  3993. // host with correctly configured firewall rules.
  3994. //
  3995. // This check also avoids spurious disallowed traffic alerts for destinations
  3996. // that are impossible to reach.
  3997. if !sshClient.sshServer.support.Config.AllowBogons && common.IsBogon(remoteIP) {
  3998. return false
  3999. }
  4000. // Blocklist check.
  4001. //
  4002. // Limitation: isPortForwardPermitted is not called in transparent DNS
  4003. // forwarding cases. As the destination IP address is rewritten in these
  4004. // cases, a blocklist entry won't be dialed in any case. However, no logs
  4005. // will be recorded.
  4006. if !sshClient.isIPPermitted(remoteIP) {
  4007. return false
  4008. }
  4009. // Don't lock before calling logBlocklistHits.
  4010. // Unlock before calling enqueueDisallowedTrafficAlertRequest/log.
  4011. sshClient.Lock()
  4012. allowed := true
  4013. // Client must complete handshake before port forwards are permitted.
  4014. if !sshClient.handshakeState.completed {
  4015. allowed = false
  4016. }
  4017. if allowed {
  4018. // Traffic rules checks.
  4019. switch portForwardType {
  4020. case portForwardTypeTCP:
  4021. if !sshClient.trafficRules.AllowTCPPort(
  4022. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  4023. allowed = false
  4024. }
  4025. case portForwardTypeUDP:
  4026. if !sshClient.trafficRules.AllowUDPPort(
  4027. sshClient.sshServer.support.GeoIPService, remoteIP, port) {
  4028. allowed = false
  4029. }
  4030. }
  4031. }
  4032. sshClient.Unlock()
  4033. if allowed {
  4034. return true
  4035. }
  4036. switch portForwardType {
  4037. case portForwardTypeTCP:
  4038. sshClient.updateQualityMetricsWithTCPRejectedDisallowed()
  4039. case portForwardTypeUDP:
  4040. sshClient.updateQualityMetricsWithUDPRejectedDisallowed()
  4041. }
  4042. sshClient.enqueueDisallowedTrafficAlertRequest()
  4043. if IsLogLevelDebug() {
  4044. log.WithTraceFields(
  4045. LogFields{
  4046. "type": portForwardType,
  4047. "port": port,
  4048. }).Debug("port forward denied by traffic rules")
  4049. }
  4050. return false
  4051. }
  4052. // isDomainPermitted returns true when the specified domain may be resolved
  4053. // and returns false and a reject reason otherwise.
  4054. func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) {
  4055. // We're not doing comprehensive validation, to avoid overhead per port
  4056. // forward. This is a simple sanity check to ensure we don't process
  4057. // blantantly invalid input.
  4058. //
  4059. // TODO: validate with dns.IsDomainName?
  4060. if len(domain) > 255 {
  4061. return false, "invalid domain name"
  4062. }
  4063. // Don't even attempt to resolve the default mDNS top-level domain.
  4064. // Non-default cases won't be caught here but should fail to resolve due
  4065. // to the PreferGo setting in net.Resolver.
  4066. if strings.HasSuffix(domain, ".local") {
  4067. return false, "port forward not permitted"
  4068. }
  4069. tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain)
  4070. if len(tags) > 0 {
  4071. sshClient.logBlocklistHits(nil, domain, tags)
  4072. if sshClient.sshServer.support.Config.BlocklistActive {
  4073. // Actively alert and block
  4074. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  4075. return false, "port forward not permitted"
  4076. }
  4077. }
  4078. return true, ""
  4079. }
  4080. func (sshClient *sshClient) isIPPermitted(remoteIP net.IP) bool {
  4081. tags := sshClient.sshServer.support.Blocklist.LookupIP(remoteIP)
  4082. if len(tags) > 0 {
  4083. sshClient.logBlocklistHits(remoteIP, "", tags)
  4084. if sshClient.sshServer.support.Config.BlocklistActive {
  4085. // Actively alert and block
  4086. sshClient.enqueueUnsafeTrafficAlertRequest(tags)
  4087. return false
  4088. }
  4089. }
  4090. return true
  4091. }
  4092. func (sshClient *sshClient) isTCPDialingPortForwardLimitExceeded() bool {
  4093. sshClient.Lock()
  4094. defer sshClient.Unlock()
  4095. state := &sshClient.tcpTrafficState
  4096. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  4097. if max > 0 && state.concurrentDialingPortForwardCount >= int64(max) {
  4098. return true
  4099. }
  4100. return false
  4101. }
  4102. func (sshClient *sshClient) getTCPPortForwardQueueSize() int {
  4103. sshClient.Lock()
  4104. defer sshClient.Unlock()
  4105. return *sshClient.trafficRules.MaxTCPPortForwardCount +
  4106. *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  4107. }
  4108. func (sshClient *sshClient) getDialTCPPortForwardTimeoutMilliseconds() int {
  4109. sshClient.Lock()
  4110. defer sshClient.Unlock()
  4111. return *sshClient.trafficRules.DialTCPPortForwardTimeoutMilliseconds
  4112. }
  4113. func (sshClient *sshClient) dialingTCPPortForward() {
  4114. sshClient.Lock()
  4115. defer sshClient.Unlock()
  4116. state := &sshClient.tcpTrafficState
  4117. state.concurrentDialingPortForwardCount += 1
  4118. if state.concurrentDialingPortForwardCount > state.peakConcurrentDialingPortForwardCount {
  4119. state.peakConcurrentDialingPortForwardCount = state.concurrentDialingPortForwardCount
  4120. }
  4121. }
  4122. func (sshClient *sshClient) abortedTCPPortForward() {
  4123. sshClient.Lock()
  4124. defer sshClient.Unlock()
  4125. sshClient.tcpTrafficState.concurrentDialingPortForwardCount -= 1
  4126. }
  4127. func (sshClient *sshClient) allocatePortForward(portForwardType int) bool {
  4128. sshClient.Lock()
  4129. defer sshClient.Unlock()
  4130. // Check if at port forward limit. The subsequent counter
  4131. // changes must be atomic with the limit check to ensure
  4132. // the counter never exceeds the limit in the case of
  4133. // concurrent allocations.
  4134. var max int
  4135. var state *trafficState
  4136. if portForwardType == portForwardTypeTCP {
  4137. max = *sshClient.trafficRules.MaxTCPPortForwardCount
  4138. state = &sshClient.tcpTrafficState
  4139. } else {
  4140. max = *sshClient.trafficRules.MaxUDPPortForwardCount
  4141. state = &sshClient.udpTrafficState
  4142. }
  4143. if max > 0 && state.concurrentPortForwardCount >= int64(max) {
  4144. return false
  4145. }
  4146. // Update port forward counters.
  4147. if portForwardType == portForwardTypeTCP {
  4148. // Assumes TCP port forwards called dialingTCPPortForward
  4149. state.concurrentDialingPortForwardCount -= 1
  4150. if sshClient.tcpPortForwardDialingAvailableSignal != nil {
  4151. max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount
  4152. if max <= 0 || state.concurrentDialingPortForwardCount < int64(max) {
  4153. sshClient.tcpPortForwardDialingAvailableSignal()
  4154. }
  4155. }
  4156. }
  4157. state.concurrentPortForwardCount += 1
  4158. if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {
  4159. state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount
  4160. }
  4161. state.totalPortForwardCount += 1
  4162. return true
  4163. }
  4164. // establishedPortForward increments the concurrent port
  4165. // forward counter. closedPortForward decrements it, so it
  4166. // must always be called for each establishedPortForward
  4167. // call.
  4168. //
  4169. // When at the limit of established port forwards, the LRU
  4170. // existing port forward is closed to make way for the newly
  4171. // established one. There can be a minor delay as, in addition
  4172. // to calling Close() on the port forward net.Conn,
  4173. // establishedPortForward waits for the LRU's closedPortForward()
  4174. // call which will decrement the concurrent counter. This
  4175. // ensures all resources associated with the LRU (socket,
  4176. // goroutine) are released or will very soon be released before
  4177. // proceeding.
  4178. func (sshClient *sshClient) establishedPortForward(
  4179. portForwardType int, portForwardLRU *common.LRUConns) {
  4180. // Do not lock sshClient here.
  4181. var state *trafficState
  4182. if portForwardType == portForwardTypeTCP {
  4183. state = &sshClient.tcpTrafficState
  4184. } else {
  4185. state = &sshClient.udpTrafficState
  4186. }
  4187. // When the maximum number of port forwards is already
  4188. // established, close the LRU. CloseOldest will call
  4189. // Close on the port forward net.Conn. Both TCP and
  4190. // UDP port forwards have handler goroutines that may
  4191. // be blocked calling Read on the net.Conn. Close will
  4192. // eventually interrupt the Read and cause the handlers
  4193. // to exit, but not immediately. So the following logic
  4194. // waits for a LRU handler to be interrupted and signal
  4195. // availability.
  4196. //
  4197. // Notes:
  4198. //
  4199. // - the port forward limit can change via a traffic
  4200. // rules hot reload; the condition variable handles
  4201. // this case whereas a channel-based semaphore would
  4202. // not.
  4203. //
  4204. // - if a number of goroutines exceeding the total limit
  4205. // arrive here all concurrently, some CloseOldest() calls
  4206. // will have no effect as there can be less existing port
  4207. // forwards than new ones. In this case, the new port
  4208. // forward will be delayed. This is highly unlikely in
  4209. // practise since UDP calls to establishedPortForward are
  4210. // serialized and TCP calls are limited by the dial
  4211. // queue/count.
  4212. if !sshClient.allocatePortForward(portForwardType) {
  4213. portForwardLRU.CloseOldest()
  4214. if IsLogLevelDebug() {
  4215. log.WithTrace().Debug("closed LRU port forward")
  4216. }
  4217. state.availablePortForwardCond.L.Lock()
  4218. for !sshClient.allocatePortForward(portForwardType) {
  4219. state.availablePortForwardCond.Wait()
  4220. }
  4221. state.availablePortForwardCond.L.Unlock()
  4222. }
  4223. }
  4224. func (sshClient *sshClient) closedPortForward(
  4225. portForwardType int, bytesUp, bytesDown int64) {
  4226. sshClient.Lock()
  4227. var state *trafficState
  4228. if portForwardType == portForwardTypeTCP {
  4229. state = &sshClient.tcpTrafficState
  4230. } else {
  4231. state = &sshClient.udpTrafficState
  4232. }
  4233. state.concurrentPortForwardCount -= 1
  4234. state.bytesUp += bytesUp
  4235. state.bytesDown += bytesDown
  4236. sshClient.Unlock()
  4237. // Signal any goroutine waiting in establishedPortForward
  4238. // that an established port forward slot is available.
  4239. state.availablePortForwardCond.Signal()
  4240. }
  4241. func (sshClient *sshClient) updateQualityMetricsWithDialResult(
  4242. tcpPortForwardDialSuccess bool, dialDuration time.Duration, IP net.IP) {
  4243. sshClient.Lock()
  4244. defer sshClient.Unlock()
  4245. if tcpPortForwardDialSuccess {
  4246. sshClient.qualityMetrics.TCPPortForwardDialedCount += 1
  4247. sshClient.qualityMetrics.TCPPortForwardDialedDuration += dialDuration
  4248. if IP.To4() != nil {
  4249. sshClient.qualityMetrics.TCPIPv4PortForwardDialedCount += 1
  4250. sshClient.qualityMetrics.TCPIPv4PortForwardDialedDuration += dialDuration
  4251. } else if IP != nil {
  4252. sshClient.qualityMetrics.TCPIPv6PortForwardDialedCount += 1
  4253. sshClient.qualityMetrics.TCPIPv6PortForwardDialedDuration += dialDuration
  4254. }
  4255. } else {
  4256. sshClient.qualityMetrics.TCPPortForwardFailedCount += 1
  4257. sshClient.qualityMetrics.TCPPortForwardFailedDuration += dialDuration
  4258. if IP.To4() != nil {
  4259. sshClient.qualityMetrics.TCPIPv4PortForwardFailedCount += 1
  4260. sshClient.qualityMetrics.TCPIPv4PortForwardFailedDuration += dialDuration
  4261. } else if IP != nil {
  4262. sshClient.qualityMetrics.TCPIPv6PortForwardFailedCount += 1
  4263. sshClient.qualityMetrics.TCPIPv6PortForwardFailedDuration += dialDuration
  4264. }
  4265. }
  4266. }
  4267. func (sshClient *sshClient) updateQualityMetricsWithRejectedDialingLimit() {
  4268. sshClient.Lock()
  4269. defer sshClient.Unlock()
  4270. sshClient.qualityMetrics.TCPPortForwardRejectedDialingLimitCount += 1
  4271. }
  4272. func (sshClient *sshClient) updateQualityMetricsWithTCPRejectedDisallowed() {
  4273. sshClient.Lock()
  4274. defer sshClient.Unlock()
  4275. sshClient.qualityMetrics.TCPPortForwardRejectedDisallowedCount += 1
  4276. }
  4277. func (sshClient *sshClient) updateQualityMetricsWithUDPRejectedDisallowed() {
  4278. sshClient.Lock()
  4279. defer sshClient.Unlock()
  4280. sshClient.qualityMetrics.UDPPortForwardRejectedDisallowedCount += 1
  4281. }
  4282. func (sshClient *sshClient) updateQualityMetricsWithDNSResult(
  4283. success bool, duration time.Duration, resolverIP net.IP) {
  4284. sshClient.Lock()
  4285. defer sshClient.Unlock()
  4286. resolver := ""
  4287. if resolverIP != nil {
  4288. resolver = resolverIP.String()
  4289. }
  4290. if success {
  4291. sshClient.qualityMetrics.DNSCount["ALL"] += 1
  4292. sshClient.qualityMetrics.DNSDuration["ALL"] += duration
  4293. if resolver != "" {
  4294. sshClient.qualityMetrics.DNSCount[resolver] += 1
  4295. sshClient.qualityMetrics.DNSDuration[resolver] += duration
  4296. }
  4297. } else {
  4298. sshClient.qualityMetrics.DNSFailedCount["ALL"] += 1
  4299. sshClient.qualityMetrics.DNSFailedDuration["ALL"] += duration
  4300. if resolver != "" {
  4301. sshClient.qualityMetrics.DNSFailedCount[resolver] += 1
  4302. sshClient.qualityMetrics.DNSFailedDuration[resolver] += duration
  4303. }
  4304. }
  4305. }
  4306. func (sshClient *sshClient) handleTCPChannel(
  4307. remainingDialTimeout time.Duration,
  4308. hostToConnect string,
  4309. portToConnect int,
  4310. doSplitTunnel bool,
  4311. newChannel ssh.NewChannel) {
  4312. // Assumptions:
  4313. // - sshClient.dialingTCPPortForward() has been called
  4314. // - remainingDialTimeout > 0
  4315. established := false
  4316. defer func() {
  4317. if !established {
  4318. sshClient.abortedTCPPortForward()
  4319. }
  4320. }()
  4321. // Validate the domain name and check the domain blocklist before dialing.
  4322. //
  4323. // The IP blocklist is checked in isPortForwardPermitted, which also provides
  4324. // IP blocklist checking for the packet tunnel code path. When hostToConnect
  4325. // is an IP address, the following hostname resolution step effectively
  4326. // performs no actions and next immediate step is the isPortForwardPermitted
  4327. // check.
  4328. //
  4329. // Limitation: this case handles port forwards where the client sends the
  4330. // destination domain in the SSH port forward request but does not currently
  4331. // handle DNS-over-TCP; in the DNS-over-TCP case, a client may bypass the
  4332. // block list check.
  4333. if net.ParseIP(hostToConnect) == nil {
  4334. ok, rejectMessage := sshClient.isDomainPermitted(hostToConnect)
  4335. if !ok {
  4336. // Note: not recording a port forward failure in this case
  4337. sshClient.rejectNewChannel(newChannel, rejectMessage)
  4338. return
  4339. }
  4340. }
  4341. // Dial the remote address.
  4342. //
  4343. // Hostname resolution is performed explicitly, as a separate step, as the
  4344. // target IP address is used for traffic rules (AllowSubnets), OSL seed
  4345. // progress, and IP address blocklists.
  4346. //
  4347. // Contexts are used for cancellation (via sshClient.runCtx, which is
  4348. // cancelled when the client is stopping) and timeouts.
  4349. dialStartTime := time.Now()
  4350. IP := net.ParseIP(hostToConnect)
  4351. if IP == nil {
  4352. // Resolve the hostname
  4353. if IsLogLevelDebug() {
  4354. log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")
  4355. }
  4356. // See comments in getDNSResolver regarding DNS cache considerations.
  4357. // The cached values may be read by concurrent goroutines and must
  4358. // not be mutated.
  4359. dnsResolver, dnsCache := sshClient.getDNSResolver()
  4360. var IPs []net.IPAddr
  4361. if dnsCache != nil {
  4362. cachedIPs, ok := dnsCache.Get(hostToConnect)
  4363. if ok {
  4364. IPs = cachedIPs.([]net.IPAddr)
  4365. }
  4366. }
  4367. var err error
  4368. var resolveElapsedTime time.Duration
  4369. if len(IPs) == 0 {
  4370. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  4371. IPs, err = dnsResolver.LookupIPAddr(ctx, hostToConnect)
  4372. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  4373. resolveElapsedTime = time.Since(dialStartTime)
  4374. if err == nil && len(IPs) > 0 {
  4375. // Add the successful DNS response to the cache. The cache
  4376. // won't be updated in the "no such host"/IsNotFound case,
  4377. // and subsequent resolves will try new requests. The "no IP
  4378. // address" error case in the following IP selection logic
  4379. // should not be reached when len(IPs) > 0.
  4380. if dnsCache != nil {
  4381. dnsCache.Add(hostToConnect, IPs, lrucache.DefaultExpiration)
  4382. }
  4383. }
  4384. // Record DNS request metrics. If LookupIPAddr returns
  4385. // net.DNSError.IsNotFound, this is "no such host" and not a DNS
  4386. // request failure. Limitation: the DNS server IP is not known.
  4387. dnsErr, ok := err.(*net.DNSError)
  4388. dnsNotFound := ok && dnsErr.IsNotFound
  4389. dnsSuccess := err == nil || dnsNotFound
  4390. sshClient.updateQualityMetricsWithDNSResult(dnsSuccess, resolveElapsedTime, nil)
  4391. }
  4392. // IPv4 is preferred in case the host has limited IPv6 routing. IPv6 is
  4393. // selected and attempted only when there's no IPv4 option.
  4394. // TODO: shuffle list to try other IPs?
  4395. for _, ip := range IPs {
  4396. if ip.IP.To4() != nil {
  4397. IP = ip.IP
  4398. break
  4399. }
  4400. }
  4401. if IP == nil && len(IPs) > 0 {
  4402. // If there are no IPv4 IPs, the first IP is IPv6.
  4403. IP = IPs[0].IP
  4404. }
  4405. if err == nil && IP == nil {
  4406. err = std_errors.New("no IP address")
  4407. }
  4408. if err != nil {
  4409. // Record a port forward failure
  4410. sshClient.updateQualityMetricsWithDialResult(false, resolveElapsedTime, IP)
  4411. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("LookupIP failed: %s", err))
  4412. return
  4413. }
  4414. remainingDialTimeout -= resolveElapsedTime
  4415. }
  4416. if remainingDialTimeout <= 0 {
  4417. sshClient.rejectNewChannel(newChannel, "TCP port forward timed out resolving")
  4418. return
  4419. }
  4420. // When the client has indicated split tunnel mode and when the channel is
  4421. // not of type protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE, check if the
  4422. // client and the port forward destination are in the same GeoIP country. If
  4423. // so, reject the port forward with a distinct response code that indicates
  4424. // to the client that this port forward should be performed locally, direct
  4425. // and untunneled.
  4426. //
  4427. // Clients are expected to cache untunneled responses to avoid this round
  4428. // trip in the immediate future and reduce server load.
  4429. //
  4430. // When the countries differ, immediately proceed with the standard port
  4431. // forward. No additional round trip is required.
  4432. //
  4433. // If either GeoIP country is "None", one or both countries are unknown
  4434. // and there is no match.
  4435. //
  4436. // Traffic rules, such as allowed ports, are not enforced for port forward
  4437. // destinations classified as untunneled.
  4438. //
  4439. // Domain and IP blocklists still apply to port forward destinations
  4440. // classified as untunneled.
  4441. //
  4442. // The client's use of split tunnel mode is logged in server_tunnel metrics
  4443. // as the boolean value split_tunnel. As they may indicate some information
  4444. // about browsing activity, no other split tunnel metrics are logged.
  4445. if doSplitTunnel {
  4446. destinationGeoIPData := sshClient.sshServer.support.GeoIPService.LookupIP(IP)
  4447. // Use the client, not peer, GeoIP data. In the case of in-proxy tunnel
  4448. // protocols, the client GeoIP fields will be populated using the
  4449. // original client IP already received, from the broker, in the handshake.
  4450. clientGeoIPData := sshClient.getClientGeoIPData()
  4451. if clientGeoIPData.Country != GEOIP_UNKNOWN_VALUE &&
  4452. sshClient.handshakeState.splitTunnelLookup.lookup(
  4453. destinationGeoIPData.Country) {
  4454. // Since isPortForwardPermitted is not called in this case, explicitly call
  4455. // ipBlocklistCheck. The domain blocklist case is handled above.
  4456. if !sshClient.isIPPermitted(IP) {
  4457. // Note: not recording a port forward failure in this case
  4458. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  4459. return
  4460. }
  4461. _ = newChannel.Reject(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL, "")
  4462. return
  4463. }
  4464. }
  4465. // Enforce traffic rules, using the resolved IP address.
  4466. if !sshClient.isPortForwardPermitted(
  4467. portForwardTypeTCP, IP, portToConnect) {
  4468. // Note: not recording a port forward failure in this case
  4469. sshClient.rejectNewChannel(newChannel, "port forward not permitted")
  4470. return
  4471. }
  4472. // TCP dial.
  4473. remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))
  4474. if IsLogLevelDebug() {
  4475. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")
  4476. }
  4477. ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
  4478. fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)
  4479. cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
  4480. // Record port forward success or failure
  4481. sshClient.updateQualityMetricsWithDialResult(err == nil, time.Since(dialStartTime), IP)
  4482. if err != nil {
  4483. // Monitor for low resource error conditions
  4484. sshClient.sshServer.monitorPortForwardDialError(err)
  4485. sshClient.rejectNewChannel(newChannel, fmt.Sprintf("DialTimeout failed: %s", err))
  4486. return
  4487. }
  4488. // The upstream TCP port forward connection has been established. Schedule
  4489. // some cleanup and notify the SSH client that the channel is accepted.
  4490. defer fwdConn.Close()
  4491. fwdChannel, requests, err := newChannel.Accept()
  4492. if err != nil {
  4493. if !isExpectedTunnelIOError(err) {
  4494. log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
  4495. }
  4496. return
  4497. }
  4498. go ssh.DiscardRequests(requests)
  4499. defer fwdChannel.Close()
  4500. // Release the dialing slot and acquire an established slot.
  4501. //
  4502. // establishedPortForward increments the concurrent TCP port
  4503. // forward counter and closes the LRU existing TCP port forward
  4504. // when already at the limit.
  4505. //
  4506. // Known limitations:
  4507. //
  4508. // - Closed LRU TCP sockets will enter the TIME_WAIT state,
  4509. // continuing to consume some resources.
  4510. sshClient.establishedPortForward(portForwardTypeTCP, sshClient.tcpPortForwardLRU)
  4511. // "established = true" cancels the deferred abortedTCPPortForward()
  4512. established = true
  4513. // TODO: 64-bit alignment? https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  4514. var bytesUp, bytesDown int64
  4515. defer func() {
  4516. sshClient.closedPortForward(
  4517. portForwardTypeTCP, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))
  4518. }()
  4519. lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)
  4520. defer lruEntry.Remove()
  4521. // ActivityMonitoredConn monitors the TCP port forward I/O and updates
  4522. // its LRU status. ActivityMonitoredConn also times out I/O on the port
  4523. // forward if both reads and writes have been idle for the specified
  4524. // duration.
  4525. fwdConn, err = common.NewActivityMonitoredConn(
  4526. fwdConn,
  4527. sshClient.idleTCPPortForwardTimeout(),
  4528. true,
  4529. lruEntry,
  4530. sshClient.getPortForwardActivityUpdaters(portForwardTypeTCP, IP)...)
  4531. if err != nil {
  4532. log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")
  4533. return
  4534. }
  4535. // Relay channel to forwarded connection.
  4536. if IsLogLevelDebug() {
  4537. log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")
  4538. }
  4539. // TODO: relay errors to fwdChannel.Stderr()?
  4540. relayWaitGroup := new(sync.WaitGroup)
  4541. relayWaitGroup.Add(1)
  4542. go func() {
  4543. defer relayWaitGroup.Done()
  4544. // io.Copy allocates a 32K temporary buffer, and each port forward relay
  4545. // uses two of these buffers; using common.CopyBuffer with a smaller buffer
  4546. // reduces the overall memory footprint.
  4547. bytes, err := common.CopyBuffer(
  4548. fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  4549. atomic.AddInt64(&bytesDown, bytes)
  4550. if err != nil && err != io.EOF {
  4551. // Debug since errors such as "connection reset by peer" occur during normal operation
  4552. if IsLogLevelDebug() {
  4553. log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed")
  4554. }
  4555. }
  4556. // Interrupt upstream io.Copy when downstream is shutting down.
  4557. // TODO: this is done to quickly cleanup the port forward when
  4558. // fwdConn has a read timeout, but is it clean -- upstream may still
  4559. // be flowing?
  4560. fwdChannel.Close()
  4561. }()
  4562. bytes, err := common.CopyBuffer(
  4563. fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))
  4564. atomic.AddInt64(&bytesUp, bytes)
  4565. if err != nil && err != io.EOF {
  4566. if IsLogLevelDebug() {
  4567. log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed")
  4568. }
  4569. }
  4570. // Shutdown special case: fwdChannel will be closed and return EOF when
  4571. // the SSH connection is closed, but we need to explicitly close fwdConn
  4572. // to interrupt the downstream io.Copy, which may be blocked on a
  4573. // fwdConn.Read().
  4574. fwdConn.Close()
  4575. relayWaitGroup.Wait()
  4576. if IsLogLevelDebug() {
  4577. log.WithTraceFields(
  4578. LogFields{
  4579. "remoteAddr": remoteAddr,
  4580. "bytesUp": atomic.LoadInt64(&bytesUp),
  4581. "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting")
  4582. }
  4583. }
  4584. func (sshClient *sshClient) getDNSResolver() (*net.Resolver, *lrucache.Cache) {
  4585. // Initialize the DNS resolver and cache used by handleTCPChannel in cases
  4586. // where the client sends unresolved domains through to psiphond. The
  4587. // resolver and cache are allocated on demand, to avoid overhead for
  4588. // clients that don't require this functionality.
  4589. //
  4590. // The standard library net.Resolver is used, with one instance per client
  4591. // to get the advantage of the "singleflight" functionality, where
  4592. // concurrent DNS lookups for the same domain are coalesced into a single
  4593. // in-flight DNS request.
  4594. //
  4595. // net.Resolver reads its configuration from /etc/resolv.conf, including a
  4596. // list of DNS servers, the number or retries to attempt, and whether to
  4597. // rotate the initial DNS server selection.
  4598. //
  4599. // In addition, a cache of successful DNS lookups is maintained to avoid
  4600. // rapid repeats DNS requests for the same domain. Since actual DNS
  4601. // response TTLs are not exposed by net.Resolver, the cache should be
  4602. // configured with a conservative TTL -- 10s of seconds.
  4603. //
  4604. // Each client has its own singleflight resolver and cache, which avoids
  4605. // leaking domain access information between clients. The cache should be
  4606. // configured with a modest max size appropriate for allocating one cache
  4607. // per client.
  4608. //
  4609. // As a potential future enhancement, consider using the custom DNS
  4610. // resolver, psiphon/common/resolver.Resolver, combined with the existing
  4611. // DNS server fetcher, SupportServices.DNSResolver. This resolver
  4612. // includes a cache which will respect the true TTL values in DNS
  4613. // responses; and randomly distributes load over the available DNS
  4614. // servers. Note the current limitations documented in
  4615. // Resolver.ResolveIP, which must be addressed.
  4616. sshClient.Lock()
  4617. defer sshClient.Unlock()
  4618. if sshClient.dnsResolver != nil {
  4619. return sshClient.dnsResolver, sshClient.dnsCache
  4620. }
  4621. // PreferGo, equivalent to GODEBUG=netdns=go, is specified in order to
  4622. // avoid any cases where Go's resolver fails over to the cgo-based
  4623. // resolver (see https://pkg.go.dev/net#hdr-Name_Resolution). Such
  4624. // cases, if they resolve at all, may be expected to resolve to bogon
  4625. // IPs that won't be permitted; but the cgo invocation will consume
  4626. // an OS thread, which is a performance hit we can avoid.
  4627. sshClient.dnsResolver = &net.Resolver{PreferGo: true}
  4628. // Get the server DNS resolver cache parameters from tactics. In the case
  4629. // of an error, no tactics, or zero values no cache is initialized and
  4630. // getDNSResolver initializes only the resolver and returns a nil cache.
  4631. //
  4632. // Limitations:
  4633. // - assumes no GeoIP targeting for server DNS resolver cache parameters
  4634. // - an individual client's cache is not reconfigured on tactics reloads
  4635. p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(NewGeoIPData())
  4636. if err != nil {
  4637. log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")
  4638. return sshClient.dnsResolver, nil
  4639. }
  4640. if p.IsNil() {
  4641. return sshClient.dnsResolver, nil
  4642. }
  4643. maxSize := p.Int(parameters.ServerDNSResolverCacheMaxSize)
  4644. TTL := p.Duration(parameters.ServerDNSResolverCacheTTL)
  4645. if maxSize == 0 || TTL == 0 {
  4646. return sshClient.dnsResolver, nil
  4647. }
  4648. sshClient.dnsCache = lrucache.NewWithLRU(TTL, 1*time.Minute, maxSize)
  4649. return sshClient.dnsResolver, sshClient.dnsCache
  4650. }