| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
7772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201 |
- /*
- * Copyright (c) 2017, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- // Copyright 2009 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- /*
- Package tun is an IP packet tunnel server and client. It supports tunneling
- both IPv4 and IPv6.

    .........................................................       .-,(  ),-.
    . [server]                                     .-----.  .    .-(          )-.
    .                                              | NIC |<---->(    Internet    )
    . .......................................      '-----'  .    '-(          ).-'
    . . [packet tunnel daemon]              .         ^     .        '-.( ).-'
    . .                                     .         |     .
    . . ...........................         .         |     .
    . . . [session]               .         .        NAT    .
    . . .                         .         .         |     .
    . . .                         .         .         v     .
    . . .                         .         .       .---.   .
    . . .                         .         .       | t |   .
    . . .                         .         .       | u |   .
    . . .                 .---.   .  .---.  .       | n |   .
    . . .                 | q |   .  | d |  .       |   |   .
    . . .                 | u |   .  | e |  .       | d |   .
    . . .          .------| e |<-----| m |<---------| e |   .
    . . .          |      | u |   .  | u |  .       | v |   .
    . . .          |      | e |   .  | x |  .       | i |   .
    . . .       rewrite   '---'   .  '---'  .       | c |   .
    . . .          |              .         .       | e |   .
    . . .          v              .         .       '---'   .
    . . .     .---------.         .         .         ^     .
    . . .     | channel |--rewrite--------------------'     .
    . . .     '---------'         .         .               .
    . . ...........^...............         .               .
    . .............|.........................               .
    ...............|.........................................
                   |
                   | (typically via Internet)
                   |
    ...............|.................
    . [client]     |                .
    .              |                .
    . .............|............... .
    . .            v              . .
    . .       .---------.         . .
    . .       | channel |         . .
    . .       '---------'         . .
    . .            ^              . .
    . .............|............... .
    .              v                .
    .        .------------.         .
    .        | tun device |         .
    .        '------------'         .
    .................................

- The client relays IP packets between a local tun device and a channel, which
- is a transport to the server. In Psiphon, the channel will be an SSH channel
- within an SSH connection to a Psiphon server.
- The server relays packets between each client and its own tun device. The
- server tun device is NATed to the Internet via an external network interface.
- In this way, client traffic is tunneled and will egress from the server host.
- Similar to a typical VPN, IP addresses are assigned to each client. Unlike
- a typical VPN, the assignment is not transmitted to the client. Instead, the
- server transparently rewrites the source addresses of client packets to
- the assigned IP address. The server also rewrites the destination address of
- certain DNS packets. The purpose of this is to allow clients to reconnect
- to different servers without having to tear down or change their local
- network configuration. Clients may configure their local tun device with an
- arbitrary IP address and an arbitrary DNS resolver address.
- The server uses the 24-bit 10.0.0.0/8 IPv4 private address space to maximize
- the number of addresses available, due to Psiphon client churn and minimum
- address lease time constraints. For IPv6, a 24-bit unique local space is used.
- When a client is allocated addresses, a unique, unused 24-bit "index" is
- reserved/leased. This index maps to and from IPv4 and IPv6 private addresses.
- The server multiplexes all client packets into a single tun device. When a
- packet is read, the destination address is used to map the packet back to the
- correct index, which maps back to the client.
- The server maintains client "sessions". A session maintains client IP
- address state and effectively holds the lease on assigned addresses. If a
- client is disconnected and quickly reconnects, it will resume its previous
- session, retaining its IP address and network connection states. Idle
- sessions with no client connection will eventually expire.
- Packet count and bytes transferred metrics are logged for each client session.
- The server integrates with and enforces Psiphon traffic rules and logging
- facilities. The server parses and validates packets. Client-to-client packets
- are not permitted. Only global unicast packets are permitted. Only TCP and UDP
- packets are permitted. The client also filters out, before sending, packets
- that the server won't route.
Certain aspects of packet tunneling are outside the scope of this package;
e.g., the Psiphon client and server are responsible for establishing an SSH
channel and negotiating the correct MTU and DNS settings. The Psiphon
server will call Server.ClientConnected when a client connects and establishes
a packet tunnel channel; and Server.ClientDisconnected when the client closes
the channel and/or disconnects.
- */
- package tun
- import (
- "context"
- "encoding/binary"
- "fmt"
- "io"
- "math/rand"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "unsafe"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
- "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
- )
// Package-wide defaults and timing parameters for the packet tunnel.
const (
	// DEFAULT_MTU is the default tunnel MTU. ServerConfig.MTU documents
	// that a default is used when it is 0; presumably getMTU applies
	// this value -- confirm against getMTU.
	DEFAULT_MTU = 1500

	// DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE is the downstream packet
	// queue size used by resumeSession when
	// ServerConfig.DownstreamPacketQueueSize is not greater than 0.
	DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE = 32768 * 16

	// DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE is, by its name, the upstream
	// counterpart of the downstream queue size. NOTE(review): its use
	// is outside this part of the file.
	DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE = 32768

	// DEFAULT_IDLE_SESSION_EXPIRY_SECONDS is presumably the fallback
	// for ServerConfig.SessionIdleExpirySeconds -- confirm at the
	// point of use.
	DEFAULT_IDLE_SESSION_EXPIRY_SECONDS = 300

	// ORPHAN_METRICS_CHECKPOINTER_PERIOD is presumably the interval
	// used by runOrphanMetricsCheckpointer -- confirm at the point of
	// use.
	ORPHAN_METRICS_CHECKPOINTER_PERIOD = 30 * time.Minute

	// FLOW_IDLE_EXPIRY is, by its name, the idle period after which a
	// tracked flow expires. NOTE(review): its use is outside this part
	// of the file.
	FLOW_IDLE_EXPIRY = 60 * time.Second
)
- // ServerConfig specifies the configuration of a packet tunnel server.
- type ServerConfig struct {
- // Logger is used for logging events and metrics.
- Logger common.Logger
- // SudoNetworkConfigCommands specifies whether to use "sudo"
- // when executing network configuration commands. This is required
- // when the packet tunnel server is not run as root and when
- // process capabilities are not available (only Linux kernel 4.3+
- // has the required capabilities support). The host sudoers file
- // must be configured to allow the tunnel server process user to
- // execute the commands invoked in configureServerInterface; see
- // the implementation for the appropriate platform.
- SudoNetworkConfigCommands bool
- // AllowNoIPv6NetworkConfiguration indicates that failures while
- // configuring tun interfaces and routing for IPv6 are to be
- // logged as warnings only. This option is intended to support
- // test cases on hosts without IPv6 and is not for production use;
- // the packet tunnel server will still accept IPv6 packets and
- // relay them to the tun device.
- // AllowNoIPv6NetworkConfiguration may not be supported on all
- // platforms.
- AllowNoIPv6NetworkConfiguration bool
- // EgressInterface is the interface to which client traffic is
- // masqueraded/NATed. For example, "eth0". If blank, a platform-
- // appropriate default is used.
- EgressInterface string
- // GetDNSResolverIPv4Addresses is a function which returns the
- // DNS resolvers to use as transparent DNS rewrite targets for
- // IPv4 DNS traffic.
- //
- // GetDNSResolverIPv4Addresses is invoked for each new client
- // session and the list of resolvers is stored with the session.
- // This is a compromise between checking current resolvers for
- // each packet (too expensive) and simply passing in a static
- // list (won't pick up resolver changes). As implemented, only
- // new client sessions will pick up resolver changes.
- //
- // Transparent DNS rewriting occurs when the client uses the
- // specific, target transparent DNS addresses specified by
- // GetTransparentDNSResolverIPv4/6Address.
- //
- // For outbound DNS packets with a target resolver IP address,
- // a random resolver is selected and used for the rewrite.
- // For inbound packets, _any_ resolver in the list is rewritten
- // back to the target resolver IP address. As a side-effect,
- // responses to client DNS packets originally destined for a
- // resolver in GetDNSResolverIPv4Addresses will be lost.
- GetDNSResolverIPv4Addresses func() []net.IP
- // GetDNSResolverIPv6Addresses is a function which returns the
- // DNS resolvers to use as transparent DNS rewrite targets for
- // IPv6 DNS traffic. It functions like GetDNSResolverIPv4Addresses.
- GetDNSResolverIPv6Addresses func() []net.IP
- // EnableDNSFlowTracking specifies whether to apply flow tracking to DNS
- // flows, as required for DNS quality metrics. Typically there are many
- // short-lived DNS flows to track and each tracked flow adds some overhead,
- // so this defaults to off.
- EnableDNSFlowTracking bool
- // DownstreamPacketQueueSize specifies the size of the downstream
- // packet queue. The packet tunnel server multiplexes all client
- // packets through a single tun device, so when a packet is read,
- // it must be queued or dropped if it cannot be immediately routed
- // to the appropriate client. Note that the TCP and SSH windows
- // for the underlying channel transport will impact transfer rate
- // and queuing.
- // When DownstreamPacketQueueSize is 0, a default value tuned for
- // Psiphon is used.
- DownstreamPacketQueueSize int
- // MTU specifies the maximum transmission unit for the packet
- // tunnel. Clients must be configured with the same MTU. The
- // server's tun device will be set to this MTU value and is
- // assumed not to change for the duration of the server.
- // When MTU is 0, a default value is used.
- MTU int
- // SessionIdleExpirySeconds specifies how long to retain client
- // sessions which have no client attached. Sessions are retained
- // across client connections so reconnecting clients can resume
- // a previous session. Resuming avoids leasing new IP addresses
- // for reconnection, and also retains NAT state for active
- // tunneled connections.
- //
- // SessionIdleExpirySeconds is also, effectively, the lease
- // time for assigned IP addresses.
- SessionIdleExpirySeconds int
- // AllowBogons disables bogon checks. This should be used only
- // for testing.
- AllowBogons bool
- }
- // Server is a packet tunnel server. A packet tunnel server
- // maintains client sessions, relays packets through client
- // channels, and multiplexes packets through a single tun
- // device. The server assigns IP addresses to clients, performs
- // IP address and transparent DNS rewriting, and enforces
- // traffic rules.
- type Server struct {
- config *ServerConfig
- device *Device
- indexToSession sync.Map
- sessionIDToIndex sync.Map
- connectedInProgress *sync.WaitGroup
- workers *sync.WaitGroup
- runContext context.Context
- stopRunning context.CancelFunc
- orphanMetrics *packetMetrics
- }
- // NewServer initializes a server.
- func NewServer(config *ServerConfig) (*Server, error) {
- device, err := NewServerDevice(config)
- if err != nil {
- return nil, errors.Trace(err)
- }
- runContext, stopRunning := context.WithCancel(context.Background())
- return &Server{
- config: config,
- device: device,
- connectedInProgress: new(sync.WaitGroup),
- workers: new(sync.WaitGroup),
- runContext: runContext,
- stopRunning: stopRunning,
- orphanMetrics: new(packetMetrics),
- }, nil
- }
- // Start starts a server and returns with it running.
- func (server *Server) Start() {
- server.config.Logger.WithTrace().Info("starting")
- server.workers.Add(1)
- go server.runSessionReaper()
- server.workers.Add(1)
- go server.runOrphanMetricsCheckpointer()
- server.workers.Add(1)
- go server.runDeviceDownstream()
- }
- // Stop halts a running server.
- func (server *Server) Stop() {
- server.config.Logger.WithTrace().Info("stopping")
- server.stopRunning()
- // Interrupt blocked device read/writes.
- server.device.Close()
- // Wait for any in-progress ClientConnected calls to complete.
- server.connectedInProgress.Wait()
- // After this point, no further clients will be added: all
- // in-progress ClientConnected calls have finished; and any
- // later ClientConnected calls won't get past their
- // server.runContext.Done() checks.
- // Close all clients. Client workers will be joined
- // by the following server.workers.Wait().
- server.indexToSession.Range(func(_, value interface{}) bool {
- session := value.(*session)
- server.interruptSession(session)
- return true
- })
- server.workers.Wait()
- server.config.Logger.WithTrace().Info("stopped")
- }
// Callback and interface types supplied by the caller of ClientConnected.
type (
	// AllowedPortChecker reports whether packets may be relayed to the
	// given upstream IP address and/or port.
	AllowedPortChecker func(upstreamIPAddress net.IP, port int) bool

	// AllowedDomainChecker reports whether the given domain name may be
	// resolved.
	AllowedDomainChecker func(string) bool

	// FlowActivityUpdater receives flow activity updates. The arguments
	// to UpdateProgress are the bytes transferred and the flow duration
	// since the preceding UpdateProgress call.
	FlowActivityUpdater interface {
		UpdateProgress(downstreamBytes, upstreamBytes, durationNanoseconds int64)
	}

	// FlowActivityUpdaterMaker returns the updaters appropriate for a
	// new flow to the given upstream hostname (which may be "" when
	// unknown) and IP address. isTCP is true for a TCP flow and false
	// for a UDP flow.
	FlowActivityUpdaterMaker func(
		isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []FlowActivityUpdater

	// MetricsUpdater receives a checkpoint summary of the application
	// bytes transferred through a packet tunnel.
	MetricsUpdater func(
		TCPApplicationBytesDown, TCPApplicationBytesUp,
		UDPApplicationBytesDown, UDPApplicationBytesUp int64)

	// DNSQualityReporter receives a DNS quality report: whether a DNS
	// request received a response, the elapsed time, and the resolver
	// that was used.
	DNSQualityReporter func(
		receivedResponse bool, requestDuration time.Duration, resolverIP net.IP)
)
- // ClientConnected handles new client connections, creating or resuming
- // a session and returns with client packet handlers running.
- //
- // sessionID is used to identify sessions for resumption.
- //
- // transport provides the channel for relaying packets to and from
- // the client.
- //
- // checkAllowedTCPPortFunc/checkAllowedUDPPortFunc/checkAllowedDomainFunc
- // are callbacks used to enforce traffic rules. For each TCP/UDP flow, the
- // corresponding AllowedPort function is called to check if traffic to the
- // packet's port is permitted. For upstream DNS query packets,
- // checkAllowedDomainFunc is called to check if domain resolution is
- // permitted. These callbacks must be efficient and safe for concurrent
- // calls.
- //
- // flowActivityUpdaterMaker is a callback invoked for each new packet
- // flow; it may create updaters to track flow activity.
- //
- // metricsUpdater is a callback invoked at metrics checkpoints (usually
- // when the client disconnects) with a summary of application bytes
- // transferred.
- //
- // It is safe to make concurrent calls to ClientConnected for distinct
- // session IDs. The caller is responsible for serializing calls with the
- // same session ID. Further, the caller must ensure, in the case of a client
- // transport reconnect when an existing transport has not yet disconnected,
- // that ClientDisconnected is called first -- so it doesn't undo the new
- // ClientConnected. (psiphond meets these constraints by closing any
- // existing SSH client with duplicate session ID early in the lifecycle of
- // a new SSH client connection.)
- func (server *Server) ClientConnected(
- sessionID string,
- transport io.ReadWriteCloser,
- checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
- checkAllowedDomainFunc AllowedDomainChecker,
- flowActivityUpdaterMaker FlowActivityUpdaterMaker,
- metricsUpdater MetricsUpdater,
- dnsQualityReporter DNSQualityReporter) error {
- // It's unusual to call both sync.WaitGroup.Add() _and_ Done() in the same
- // goroutine. There's no other place to call Add() since ClientConnected is
- // an API entrypoint. And Done() works because the invariant enforced by
- // connectedInProgress.Wait() is not that no ClientConnected calls are in
- // progress, but that no such calls are in progress past the
- // server.runContext.Done() check.
- // TODO: will this violate https://golang.org/pkg/sync/#WaitGroup.Add:
- // "calls with a positive delta that occur when the counter is zero must happen before a Wait"?
- server.connectedInProgress.Add(1)
- defer server.connectedInProgress.Done()
- select {
- case <-server.runContext.Done():
- return errors.TraceNew("server stopping")
- default:
- }
- server.config.Logger.WithTraceFields(
- common.LogFields{"sessionID": sessionID}).Debug("client connected")
- MTU := getMTU(server.config.MTU)
- clientSession := server.getSession(sessionID)
- if clientSession != nil {
- // Call interruptSession to ensure session is in the
- // expected idle state.
- server.interruptSession(clientSession)
- // Note: we don't check the session expiry; whether it has
- // already expired and not yet been reaped; or is about
- // to expire very shortly. It could happen that the reaper
- // will kill this session between now and when the expiry
- // is reset in the following resumeSession call. In this
- // unlikely case, the packet tunnel client should reconnect.
- } else {
- // Store IPv4 resolver addresses in 4-byte representation
- // for use in rewritting.
- resolvers := server.config.GetDNSResolverIPv4Addresses()
- DNSResolverIPv4Addresses := make([]net.IP, len(resolvers))
- for i, resolver := range resolvers {
- // Assumes To4 is non-nil
- DNSResolverIPv4Addresses[i] = resolver.To4()
- }
- clientSession = &session{
- allowBogons: server.config.AllowBogons,
- sessionID: sessionID,
- metrics: new(packetMetrics),
- enableDNSFlowTracking: server.config.EnableDNSFlowTracking,
- DNSResolverIPv4Addresses: append([]net.IP(nil), DNSResolverIPv4Addresses...),
- DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
- workers: new(sync.WaitGroup),
- }
- clientSession.lastActivity.Store(int64(monotime.Now()))
- // One-time, for this session, random resolver selection for TCP transparent
- // DNS forwarding. See comment in processPacket.
- if len(clientSession.DNSResolverIPv4Addresses) > 0 {
- clientSession.TCPDNSResolverIPv4Index = prng.Intn(len(clientSession.DNSResolverIPv4Addresses))
- }
- if len(clientSession.DNSResolverIPv6Addresses) > 0 {
- clientSession.TCPDNSResolverIPv6Index = prng.Intn(len(clientSession.DNSResolverIPv6Addresses))
- }
- // allocateIndex initializes session.index, session.assignedIPv4Address,
- // and session.assignedIPv6Address; and updates server.indexToSession and
- // server.sessionIDToIndex.
- err := server.allocateIndex(clientSession)
- if err != nil {
- return errors.Trace(err)
- }
- }
- // Note: it's possible that a client disconnects (or reconnects before a
- // disconnect is detected) and interruptSession is called between
- // allocateIndex and resumeSession calls here, so interruptSession and
- // related code must not assume resumeSession has been called.
- server.resumeSession(
- clientSession,
- NewChannel(transport, MTU),
- checkAllowedTCPPortFunc,
- checkAllowedUDPPortFunc,
- checkAllowedDomainFunc,
- flowActivityUpdaterMaker,
- metricsUpdater,
- dnsQualityReporter)
- return nil
- }
- // ClientDisconnected handles clients disconnecting. Packet handlers
- // are halted, but the client session is left intact to reserve the
- // assigned IP addresses and retain network state in case the client
- // soon reconnects.
- func (server *Server) ClientDisconnected(sessionID string) {
- session := server.getSession(sessionID)
- if session != nil {
- server.config.Logger.WithTraceFields(
- common.LogFields{"sessionID": sessionID}).Debug("client disconnected")
- server.interruptSession(session)
- }
- }
- func (server *Server) getSession(sessionID string) *session {
- if index, ok := server.sessionIDToIndex.Load(sessionID); ok {
- s, ok := server.indexToSession.Load(index.(int32))
- if ok {
- return s.(*session)
- }
- server.config.Logger.WithTrace().Warning("unexpected missing session")
- }
- return nil
- }
- func (server *Server) resumeSession(
- session *session,
- channel *Channel,
- checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
- checkAllowedDomainFunc AllowedDomainChecker,
- flowActivityUpdaterMaker FlowActivityUpdaterMaker,
- metricsUpdater MetricsUpdater,
- dnsQualityReporter DNSQualityReporter) {
- session.mutex.Lock()
- defer session.mutex.Unlock()
- // Performance/concurrency note: the downstream packet queue
- // and various packet event callbacks may be accessed while
- // the session is idle, via the runDeviceDownstream goroutine,
- // which runs concurrent to resumeSession/interruptSession calls.
- // Consequently, all accesses to these fields must be
- // synchronized.
- //
- // Benchmarking indicates the atomic.LoadPointer mechanism
- // outperforms a mutex; approx. 2 ns/op vs. 20 ns/op in the case
- // of getCheckAllowedTCPPortFunc. Since these accesses occur
- // multiple times per packet, atomic.LoadPointer is used and so
- // each of these fields is an unsafe.Pointer in the session
- // struct.
- // Begin buffering downstream packets.
- downstreamPacketQueueSize := DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
- if server.config.DownstreamPacketQueueSize > 0 {
- downstreamPacketQueueSize = server.config.DownstreamPacketQueueSize
- }
- downstreamPackets := NewPacketQueue(downstreamPacketQueueSize)
- session.setDownstreamPackets(downstreamPackets)
- // Set new access control, flow monitoring, and metrics
- // callbacks; all associated with the new client connection.
- // IMPORTANT: any new callbacks or references to the outer client added
- // here must be cleared in interruptSession to ensure that a paused
- // session does not retain references to old client connection objects
- // after the client disconnects.
- session.setCheckAllowedTCPPortFunc(&checkAllowedTCPPortFunc)
- session.setCheckAllowedUDPPortFunc(&checkAllowedUDPPortFunc)
- session.setCheckAllowedDomainFunc(&checkAllowedDomainFunc)
- session.setFlowActivityUpdaterMaker(&flowActivityUpdaterMaker)
- session.setMetricsUpdater(&metricsUpdater)
- session.setDNSQualityReporter(&dnsQualityReporter)
- session.channel = channel
- // Parent context is not server.runContext so that session workers
- // need only check session.stopRunning to act on shutdown events.
- session.runContext, session.stopRunning = context.WithCancel(context.Background())
- // When a session is interrupted, all goroutines in session.workers
- // are joined. When the server is stopped, all goroutines in
- // server.workers are joined. So, in both cases we synchronously
- // stop all workers associated with this session.
- session.workers.Add(1)
- go server.runClientUpstream(session)
- session.workers.Add(1)
- go server.runClientDownstream(session)
- session.touch()
- }
- func (server *Server) interruptSession(session *session) {
- session.mutex.Lock()
- defer session.mutex.Unlock()
- wasRunning := (session.channel != nil)
- if session.stopRunning != nil {
- session.stopRunning()
- }
- if session.channel != nil {
- // Interrupt blocked channel read/writes.
- session.channel.Close()
- }
- session.workers.Wait()
- if session.channel != nil {
- // Don't hold a reference to channel, allowing both it and
- // its conn to be garbage collected.
- // Setting channel to nil must happen after workers.Wait()
- // to ensure no goroutine remains which may access
- // session.channel.
- session.channel = nil
- }
- metricsUpdater := session.getMetricsUpdater()
- // interruptSession may be called for idle sessions, to ensure
- // the session is in an expected state: in ClientConnected,
- // and in server.Stop(); don't log in those cases.
- if wasRunning {
- session.metrics.checkpoint(
- server.config.Logger,
- metricsUpdater,
- "server_packet_metrics",
- packetMetricsAll)
- }
- // Release the downstream packet buffer, so the associated
- // memory is not consumed while no client is connected.
- //
- // Since runDeviceDownstream continues to run and will access
- // session.downstreamPackets, an atomic pointer is used to
- // synchronize access.
- session.setDownstreamPackets(nil)
- session.setCheckAllowedTCPPortFunc(nil)
- session.setCheckAllowedUDPPortFunc(nil)
- session.setCheckAllowedDomainFunc(nil)
- session.setFlowActivityUpdaterMaker(nil)
- session.setMetricsUpdater(nil)
- session.setDNSQualityReporter(nil)
- }
- func (server *Server) runSessionReaper() {
- defer server.workers.Done()
- // Periodically iterate over all sessions and discard expired
- // sessions. This action, removing the index from server.indexToSession,
- // releases the IP addresses assigned to the session.
- // TODO: As-is, this will discard sessions for live SSH tunnels,
- // as long as the SSH channel for such a session has been idle for
- // a sufficient period. Should the session be retained as long as
- // the SSH tunnel is alive (e.g., expose and call session.touch()
- // on keepalive events)? Or is it better to free up resources held
- // by idle sessions?
- idleExpiry := server.sessionIdleExpiry()
- ticker := time.NewTicker(idleExpiry / 2)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- server.indexToSession.Range(func(_, value interface{}) bool {
- session := value.(*session)
- if session.expired(idleExpiry) {
- server.removeSession(session)
- }
- return true
- })
- case <-server.runContext.Done():
- return
- }
- }
- }
- func (server *Server) sessionIdleExpiry() time.Duration {
- sessionIdleExpirySeconds := DEFAULT_IDLE_SESSION_EXPIRY_SECONDS
- if server.config.SessionIdleExpirySeconds > 2 {
- sessionIdleExpirySeconds = server.config.SessionIdleExpirySeconds
- }
- return time.Duration(sessionIdleExpirySeconds) * time.Second
- }
- func (server *Server) removeSession(session *session) {
- server.sessionIDToIndex.Delete(session.sessionID)
- server.indexToSession.Delete(session.index)
- server.interruptSession(session)
- // Delete flows to ensure any pending flow metrics are reported.
- session.deleteFlows()
- }
- func (server *Server) runOrphanMetricsCheckpointer() {
- defer server.workers.Done()
- // Periodically log orphan packet metrics. Orphan metrics
- // are not associated with any session. This includes
- // packets that are rejected before they can be associated
- // with a session.
- ticker := time.NewTicker(ORPHAN_METRICS_CHECKPOINTER_PERIOD)
- defer ticker.Stop()
- for {
- done := false
- select {
- case <-ticker.C:
- case <-server.runContext.Done():
- done = true
- }
- // TODO: skip log if all zeros?
- server.orphanMetrics.checkpoint(
- server.config.Logger,
- nil,
- "server_orphan_packet_metrics",
- packetMetricsRejected)
- if done {
- return
- }
- }
- }
- func (server *Server) runDeviceDownstream() {
- defer server.workers.Done()
- // Read incoming packets from the tun device, parse and validate the
- // packets, map them to a session/client, perform rewriting, and relay
- // the packets to the client.
- for {
- readPacket, err := server.device.ReadPacket()
- select {
- case <-server.runContext.Done():
- // No error is logged as shutdown may have interrupted read.
- return
- default:
- }
- if err != nil {
- server.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Warning("read device packet failed")
- // May be temporary error condition, keep reading.
- continue
- }
- // destinationIPAddress determines which client receives this packet.
- // At this point, only enough of the packet is inspected to determine
- // this routing info; further validation happens in subsequent
- // processPacket in runClientDownstream.
- // Note that masquerading/NAT stands between the Internet and the tun
- // device, so arbitrary packets cannot be sent through to this point.
- // TODO: getPacketDestinationIPAddress and processPacket perform redundant
- // packet parsing; refactor to avoid extra work?
- destinationIPAddress, ok := getPacketDestinationIPAddress(
- server.orphanMetrics, packetDirectionServerDownstream, readPacket)
- if !ok {
- // Packet is dropped. Reason will be counted in orphan metrics.
- continue
- }
- // Map destination IP address to client session.
- index := server.convertIPAddressToIndex(destinationIPAddress)
- s, ok := server.indexToSession.Load(index)
- if !ok {
- server.orphanMetrics.rejectedPacket(
- packetDirectionServerDownstream, packetRejectNoSession)
- continue
- }
- session := s.(*session)
- downstreamPackets := session.getDownstreamPackets()
- // No downstreamPackets buffer is maintained when no client is
- // connected, so the packet is dropped.
- if downstreamPackets == nil {
- server.orphanMetrics.rejectedPacket(
- packetDirectionServerDownstream, packetRejectNoClient)
- continue
- }
- // Simply enqueue the packet for client handling, and move on to
- // read the next packet. The packet tunnel server multiplexes all
- // client packets through a single tun device, so we must not block
- // on client channel I/O here.
- //
- // When the queue is full, the packet is dropped. This is standard
- // behavior for routers, VPN servers, etc.
- //
- // TODO: processPacket is performed here, instead of runClientDownstream,
- // since packets are packed contiguously into the packet queue and if
- // the packet it to be omitted, that should be done before enqueuing.
- // The potential downside is that all packet processing is done in this
- // single thread of execution, blocking the next packet for the next
- // client. Try handing off the packet to another worker which will
- // call processPacket and Enqueue?
- // In downstream mode, processPacket rewrites the destination address
- // to the original client source IP address, and also rewrites DNS
- // packets. As documented in runClientUpstream, the original address
- // should already be populated via an upstream packet; if not, the
- // packet will be rejected.
- if !processPacket(
- session.metrics,
- session,
- nil,
- packetDirectionServerDownstream,
- readPacket) {
- // Packet is rejected and dropped. Reason will be counted in metrics.
- continue
- }
- downstreamPackets.Enqueue(readPacket)
- }
- }
- func (server *Server) runClientUpstream(session *session) {
- defer session.workers.Done()
- // Read incoming packets from the client channel, validate the packets,
- // perform rewriting, and send them through to the tun device.
- for {
- readPacket, err := session.channel.ReadPacket()
- select {
- case <-session.runContext.Done():
- // No error is logged as shutdown may have interrupted read.
- return
- default:
- }
- if err != nil {
- // Debug since channel I/O errors occur during normal operation.
- server.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Debug("read channel packet failed")
- // Tear down the session. Must be invoked asynchronously.
- go server.interruptSession(session)
- return
- }
- session.touch()
- // processPacket transparently rewrites the source address to the
- // session's assigned address and rewrites the destination of any
- // DNS packets destined to the transparent DNS resolver.
- //
- // The first time the source address is rewritten, the original
- // value is recorded so inbound packets can have the reverse
- // rewrite applied. This assumes that the client will send a
- // packet before receiving any packet, which is the case since
- // only clients can initiate TCP or UDP connections or flows.
- if !processPacket(
- session.metrics,
- session,
- nil,
- packetDirectionServerUpstream,
- readPacket) {
- // Packet is rejected and dropped. Reason will be counted in metrics.
- continue
- }
- err = server.device.WritePacket(readPacket)
- if err != nil {
- server.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Warning("write device packet failed")
- // May be temporary error condition, keep working. The packet is
- // most likely dropped.
- continue
- }
- }
- }
- func (server *Server) runClientDownstream(session *session) {
- defer session.workers.Done()
- // Dequeue, process, and relay packets to be sent to the client channel.
- for {
- downstreamPackets := session.getDownstreamPackets()
- // Note: downstreamPackets will not be nil, since this goroutine only
- // runs while the session has a connected client.
- packetBuffer, ok := downstreamPackets.DequeueFramedPackets(session.runContext)
- if !ok {
- // Dequeue aborted due to session.runContext.Done()
- return
- }
- err := session.channel.WriteFramedPackets(packetBuffer)
- if err != nil {
- // Debug since channel I/O errors occur during normal operation.
- server.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Debug("write channel packets failed")
- downstreamPackets.Replace(packetBuffer)
- // Tear down the session. Must be invoked asynchronously.
- go server.interruptSession(session)
- return
- }
- session.touch()
- downstreamPackets.Replace(packetBuffer)
- }
- }
// Addressing constants for the tunnel's private networks. Client
// addresses are produced by formatting the three low-order bytes of a
// session index into the assigned-address templates.
var (
	// IPv4: the 10.0.0.0/8 private network.

	// Address and prefix length of the server.
	serverIPv4AddressCIDR = "10.0.0.1/8"
	// Kept in 4-byte form, as required for rewriting.
	transparentDNSResolverIPv4Address = net.ParseIP("10.0.0.2").To4()
	_, privateSubnetIPv4, _           = net.ParseCIDR("10.0.0.0/8")
	assignedIPv4AddressTemplate       = "10.%d.%d.%d"

	// IPv6: the fd19:ca83:e6d5:1c44::/64 network.

	serverIPv6AddressCIDR             = "fd19:ca83:e6d5:1c44:0000:0000:0000:0001/64"
	transparentDNSResolverIPv6Address = net.ParseIP("fd19:ca83:e6d5:1c44:0000:0000:0000:0002")
	_, privateSubnetIPv6, _           = net.ParseCIDR("fd19:ca83:e6d5:1c44::/64")
	assignedIPv6AddressTemplate       = "fd19:ca83:e6d5:1c44:8c57:4434:ee%02x:%02x%02x"
)
- func (server *Server) allocateIndex(newSession *session) error {
- // Find and assign an available index in the 24-bit index space.
- // The index directly maps to and so determines the assigned
- // IPv4 and IPv6 addresses.
- // Search is a random index selection followed by a linear probe.
- // TODO: is this the most effective (fast on average, simple) algorithm?
- max := 0x00FFFFFF
- randomInt := prng.Intn(max + 1)
- index := int32(randomInt)
- index &= int32(max)
- idleExpiry := server.sessionIdleExpiry()
- for tries := 0; tries < 100000; index++ {
- tries++
- // The index/address space isn't exactly 24-bits:
- // - 0 and 0x00FFFFFF are reserved since they map to
- // the network identifier (10.0.0.0) and broadcast
- // address (10.255.255.255) respectively
- // - 1 is reserved as the server tun device address,
- // (10.0.0.1, and IPv6 equivalent)
- // - 2 is reserved as the transparent DNS target
- // address (10.0.0.2, and IPv6 equivalent)
- if index <= 2 {
- continue
- }
- if index == 0x00FFFFFF {
- index = 0
- continue
- }
- IPv4Address := server.convertIndexToIPv4Address(index).To4()
- IPv6Address := server.convertIndexToIPv6Address(index)
- // Ensure that the index converts to valid IPs. This is not expected
- // to fail, but continuing with nil IPs will silently misroute
- // packets with rewritten source IPs.
- if IPv4Address == nil || IPv6Address == nil {
- server.config.Logger.WithTraceFields(
- common.LogFields{"index": index}).Warning("convert index to IP address failed")
- continue
- }
- if s, ok := server.indexToSession.LoadOrStore(index, newSession); ok {
- // Index is already in use or acquired concurrently.
- // If the existing session is expired, reap it and try again
- // to acquire it.
- existingSession := s.(*session)
- if existingSession.expired(idleExpiry) {
- server.removeSession(existingSession)
- // Try to acquire this index again. We can't fall through and
- // use this index as removeSession has cleared indexToSession.
- index--
- }
- continue
- }
- // Note: the To4() for assignedIPv4Address is essential since
- // that address value is assumed to be 4 bytes when rewriting.
- newSession.index = index
- newSession.assignedIPv4Address = IPv4Address
- newSession.assignedIPv6Address = IPv6Address
- server.sessionIDToIndex.Store(newSession.sessionID, index)
- server.resetRouting(newSession.assignedIPv4Address, newSession.assignedIPv6Address)
- return nil
- }
- return errors.TraceNew("unallocated index not found")
- }
- func (server *Server) resetRouting(IPv4Address, IPv6Address net.IP) {
- // Attempt to clear the NAT table of any existing connection
- // states. This will prevent the (already unlikely) delivery
- // of packets to the wrong client when an assigned IP address is
- // recycled. Silently has no effect on some platforms, see
- // resetNATTables implementations.
- err := resetNATTables(server.config, IPv4Address)
- if err != nil {
- server.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Warning("reset IPv4 routing failed")
- }
- err = resetNATTables(server.config, IPv6Address)
- if err != nil {
- server.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Warning("reset IPv6 routing failed")
- }
- }
- func (server *Server) convertIPAddressToIndex(IP net.IP) int32 {
- // Assumes IP is at least 3 bytes.
- size := len(IP)
- return int32(IP[size-3])<<16 | int32(IP[size-2])<<8 | int32(IP[size-1])
- }
- func (server *Server) convertIndexToIPv4Address(index int32) net.IP {
- return net.ParseIP(
- fmt.Sprintf(
- assignedIPv4AddressTemplate,
- (index>>16)&0xFF,
- (index>>8)&0xFF,
- index&0xFF))
- }
- func (server *Server) convertIndexToIPv6Address(index int32) net.IP {
- return net.ParseIP(
- fmt.Sprintf(
- assignedIPv6AddressTemplate,
- (index>>16)&0xFF,
- (index>>8)&0xFF,
- index&0xFF))
- }
- type session struct {
- lastActivity atomic.Int64
- lastFlowReapIndex atomic.Int64
- downstreamPackets unsafe.Pointer
- checkAllowedTCPPortFunc unsafe.Pointer
- checkAllowedUDPPortFunc unsafe.Pointer
- checkAllowedDomainFunc unsafe.Pointer
- flowActivityUpdaterMaker unsafe.Pointer
- metricsUpdater unsafe.Pointer
- dnsQualityReporter unsafe.Pointer
- allowBogons bool
- metrics *packetMetrics
- sessionID string
- index int32
- enableDNSFlowTracking bool
- DNSResolverIPv4Addresses []net.IP
- TCPDNSResolverIPv4Index int
- assignedIPv4Address net.IP
- setOriginalIPv4Address int32
- originalIPv4Address net.IP
- DNSResolverIPv6Addresses []net.IP
- TCPDNSResolverIPv6Index int
- assignedIPv6Address net.IP
- setOriginalIPv6Address int32
- originalIPv6Address net.IP
- flows sync.Map
- workers *sync.WaitGroup
- mutex sync.Mutex
- channel *Channel
- runContext context.Context
- stopRunning context.CancelFunc
- }
- func (session *session) touch() {
- session.lastActivity.Store(int64(monotime.Now()))
- }
- func (session *session) expired(idleExpiry time.Duration) bool {
- lastActivity := monotime.Time(session.lastActivity.Load())
- return monotime.Since(lastActivity) > idleExpiry
- }
- func (session *session) setOriginalIPv4AddressIfNotSet(IPAddress net.IP) {
- if !atomic.CompareAndSwapInt32(&session.setOriginalIPv4Address, 0, 1) {
- return
- }
- // Make a copy of IPAddress; don't reference a slice of a reusable
- // packet buffer, which will be overwritten.
- session.originalIPv4Address = net.IP(append([]byte(nil), []byte(IPAddress)...))
- }
- func (session *session) getOriginalIPv4Address() net.IP {
- if atomic.LoadInt32(&session.setOriginalIPv4Address) == 0 {
- return nil
- }
- return session.originalIPv4Address
- }
- func (session *session) setOriginalIPv6AddressIfNotSet(IPAddress net.IP) {
- if !atomic.CompareAndSwapInt32(&session.setOriginalIPv6Address, 0, 1) {
- return
- }
- // Make a copy of IPAddress.
- session.originalIPv6Address = net.IP(append([]byte(nil), []byte(IPAddress)...))
- }
- func (session *session) getOriginalIPv6Address() net.IP {
- if atomic.LoadInt32(&session.setOriginalIPv6Address) == 0 {
- return nil
- }
- return session.originalIPv6Address
- }
- func (session *session) setDownstreamPackets(p *PacketQueue) {
- atomic.StorePointer(&session.downstreamPackets, unsafe.Pointer(p))
- }
- func (session *session) getDownstreamPackets() *PacketQueue {
- return (*PacketQueue)(atomic.LoadPointer(&session.downstreamPackets))
- }
- func (session *session) setCheckAllowedTCPPortFunc(p *AllowedPortChecker) {
- atomic.StorePointer(&session.checkAllowedTCPPortFunc, unsafe.Pointer(p))
- }
- func (session *session) getCheckAllowedTCPPortFunc() AllowedPortChecker {
- p := (*AllowedPortChecker)(atomic.LoadPointer(&session.checkAllowedTCPPortFunc))
- if p == nil {
- return nil
- }
- return *p
- }
- func (session *session) setCheckAllowedUDPPortFunc(p *AllowedPortChecker) {
- atomic.StorePointer(&session.checkAllowedUDPPortFunc, unsafe.Pointer(p))
- }
- func (session *session) getCheckAllowedUDPPortFunc() AllowedPortChecker {
- p := (*AllowedPortChecker)(atomic.LoadPointer(&session.checkAllowedUDPPortFunc))
- if p == nil {
- return nil
- }
- return *p
- }
- func (session *session) setCheckAllowedDomainFunc(p *AllowedDomainChecker) {
- atomic.StorePointer(&session.checkAllowedDomainFunc, unsafe.Pointer(p))
- }
- func (session *session) getCheckAllowedDomainFunc() AllowedDomainChecker {
- p := (*AllowedDomainChecker)(atomic.LoadPointer(&session.checkAllowedDomainFunc))
- if p == nil {
- return nil
- }
- return *p
- }
- func (session *session) setFlowActivityUpdaterMaker(p *FlowActivityUpdaterMaker) {
- atomic.StorePointer(&session.flowActivityUpdaterMaker, unsafe.Pointer(p))
- }
- func (session *session) getFlowActivityUpdaterMaker() FlowActivityUpdaterMaker {
- p := (*FlowActivityUpdaterMaker)(atomic.LoadPointer(&session.flowActivityUpdaterMaker))
- if p == nil {
- return nil
- }
- return *p
- }
- func (session *session) setMetricsUpdater(p *MetricsUpdater) {
- atomic.StorePointer(&session.metricsUpdater, unsafe.Pointer(p))
- }
- func (session *session) getMetricsUpdater() MetricsUpdater {
- p := (*MetricsUpdater)(atomic.LoadPointer(&session.metricsUpdater))
- if p == nil {
- return nil
- }
- return *p
- }
- func (session *session) setDNSQualityReporter(p *DNSQualityReporter) {
- atomic.StorePointer(&session.dnsQualityReporter, unsafe.Pointer(p))
- }
- func (session *session) getDNSQualityReporter() DNSQualityReporter {
- p := (*DNSQualityReporter)(atomic.LoadPointer(&session.dnsQualityReporter))
- if p == nil {
- return nil
- }
- return *p
- }
- // flowID identifies an IP traffic flow using the conventional
- // network 5-tuple. flowIDs track bidirectional flows.
- type flowID struct {
- downstreamIPAddress [net.IPv6len]byte
- downstreamPort uint16
- upstreamIPAddress [net.IPv6len]byte
- upstreamPort uint16
- protocol internetProtocol
- }
// v4InV6Prefix is the 12-byte prefix of an IPv4-mapped IPv6 address: ten
// zero bytes followed by 0xff, 0xff.
//
// From: https://github.com/golang/go/blob/b88efc7e7ac15f9e0b5d8d9c82f870294f6a3839/src/net/ip.go#L55
var v4InV6Prefix = []byte{10: 0xff, 11: 0xff}
- func (f *flowID) set(
- downstreamIPAddress net.IP,
- downstreamPort uint16,
- upstreamIPAddress net.IP,
- upstreamPort uint16,
- protocol internetProtocol) {
- if len(downstreamIPAddress) == net.IPv4len {
- copy(f.downstreamIPAddress[:], v4InV6Prefix)
- copy(f.downstreamIPAddress[len(v4InV6Prefix):], downstreamIPAddress)
- } else { // net.IPv6len
- copy(f.downstreamIPAddress[:], downstreamIPAddress)
- }
- f.downstreamPort = downstreamPort
- if len(upstreamIPAddress) == net.IPv4len {
- copy(f.upstreamIPAddress[:], v4InV6Prefix)
- copy(f.upstreamIPAddress[len(v4InV6Prefix):], upstreamIPAddress)
- } else { // net.IPv6len
- copy(f.upstreamIPAddress[:], upstreamIPAddress)
- }
- f.upstreamPort = upstreamPort
- f.protocol = protocol
- }
- type flowState struct {
- firstUpstreamPacketTime atomic.Int64
- lastUpstreamPacketTime atomic.Int64
- firstDownstreamPacketTime atomic.Int64
- lastDownstreamPacketTime atomic.Int64
- isDNS bool
- dnsQualityReporter DNSQualityReporter
- activityUpdaters []FlowActivityUpdater
- }
- func (flowState *flowState) expired(idleExpiry time.Duration) bool {
- now := monotime.Now()
- // Traffic in either direction keeps the flow alive. Initially, only one of
- // lastUpstreamPacketTime or lastDownstreamPacketTime will be set by
- // startTrackingFlow, and the other value will be 0 and evaluate as expired.
- return (now.Sub(monotime.Time(flowState.lastUpstreamPacketTime.Load())) > idleExpiry) &&
- (now.Sub(monotime.Time(flowState.lastDownstreamPacketTime.Load())) > idleExpiry)
- }
- // isTrackingFlow checks if a flow is being tracked.
- func (session *session) isTrackingFlow(ID flowID) bool {
- f, ok := session.flows.Load(ID)
- if !ok {
- return false
- }
- flowState := f.(*flowState)
- // Check if flow is expired but not yet reaped.
- if flowState.expired(FLOW_IDLE_EXPIRY) {
- session.deleteFlow(ID, flowState)
- return false
- }
- return true
- }
- // startTrackingFlow starts flow tracking for the flow identified
- // by ID.
- //
- // Flow tracking is used to implement:
- // - one-time permissions checks for a flow
- // - OSLs
- // - domain bytes transferred [TODO]
- // - DNS quality metrics
- //
- // The applicationData from the first packet in the flow is
- // inspected to determine any associated hostname, using HTTP or
- // TLS payload. The session's FlowActivityUpdaterMaker is invoked
- // to determine a list of updaters to track flow activity.
- //
- // Updaters receive reports with the number of application data
- // bytes in each flow packet. This number, totalled for all packets
- // in a flow, may exceed the total bytes transferred at the
- // application level due to TCP retransmission. Currently, the flow
- // tracking logic doesn't exclude retransmitted packets from update
- // reporting.
- //
- // Flows are untracked after an idle expiry period. Transport
- // protocol indicators of end of flow, such as FIN or RST for TCP,
- // which may or may not appear in a flow, are not currently used.
- //
- // startTrackingFlow may be called from concurrent goroutines; if
- // the flow is already tracked, it is simply updated.
- func (session *session) startTrackingFlow(
- ID flowID,
- direction packetDirection,
- applicationData []byte,
- isDNS bool) {
- now := int64(monotime.Now())
- // Once every period, iterate over flows and reap expired entries.
- reapIndex := now / int64(monotime.Time(FLOW_IDLE_EXPIRY/2))
- previousReapIndex := session.lastFlowReapIndex.Load()
- if reapIndex != previousReapIndex &&
- session.lastFlowReapIndex.CompareAndSwap(previousReapIndex, reapIndex) {
- session.reapFlows()
- }
- var isTCP bool
- var hostname string
- if ID.protocol == internetProtocolTCP {
- // TODO: implement
- // hostname = common.ExtractHostnameFromTCPFlow(applicationData)
- isTCP = true
- }
- var activityUpdaters []FlowActivityUpdater
- // Don't incur activity monitor overhead for DNS requests
- if !isDNS {
- flowActivityUpdaterMaker := session.getFlowActivityUpdaterMaker()
- if flowActivityUpdaterMaker != nil {
- activityUpdaters = flowActivityUpdaterMaker(
- isTCP,
- hostname,
- net.IP(ID.upstreamIPAddress[:]))
- }
- }
- flowState := &flowState{
- isDNS: isDNS,
- activityUpdaters: activityUpdaters,
- dnsQualityReporter: session.getDNSQualityReporter(),
- }
- if direction == packetDirectionServerUpstream {
- flowState.firstUpstreamPacketTime.Store(now)
- flowState.lastUpstreamPacketTime.Store(now)
- } else {
- flowState.firstDownstreamPacketTime.Store(now)
- flowState.lastDownstreamPacketTime.Store(now)
- }
- // LoadOrStore will retain any existing entry
- session.flows.LoadOrStore(ID, flowState)
- session.updateFlow(ID, direction, applicationData)
- }
- func (session *session) updateFlow(
- ID flowID,
- direction packetDirection,
- applicationData []byte) {
- f, ok := session.flows.Load(ID)
- if !ok {
- return
- }
- flowState := f.(*flowState)
- // Note: no expired check here, since caller is assumed to
- // have just called isTrackingFlow.
- now := int64(monotime.Now())
- var upstreamBytes, downstreamBytes, durationNanoseconds int64
- if direction == packetDirectionServerUpstream {
- upstreamBytes = int64(len(applicationData))
- flowState.firstUpstreamPacketTime.CompareAndSwap(0, now)
- flowState.lastUpstreamPacketTime.Store(now)
- } else {
- downstreamBytes = int64(len(applicationData))
- flowState.firstDownstreamPacketTime.CompareAndSwap(0, now)
- // Follows common.ActivityMonitoredConn semantics, where
- // duration is updated only for downstream activity. This
- // is intened to produce equivalent behaviour for port
- // forward clients (tracked with ActivityUpdaters) and
- // packet tunnel clients (tracked with FlowActivityUpdaters).
- durationNanoseconds = now - flowState.lastDownstreamPacketTime.Swap(now)
- }
- for _, updater := range flowState.activityUpdaters {
- updater.UpdateProgress(downstreamBytes, upstreamBytes, durationNanoseconds)
- }
- }
- // deleteFlow stops tracking a flow and logs any outstanding metrics.
- // flowState is passed in to avoid duplicating the lookup that all callers
- // have already performed.
- func (session *session) deleteFlow(ID flowID, flowState *flowState) {
- if flowState.isDNS {
- dnsStartTime := monotime.Time(
- flowState.firstUpstreamPacketTime.Load())
- if dnsStartTime > 0 {
- // Record DNS quality metrics using a heuristic: if a packet was sent and
- // then a packet was received, assume the DNS request successfully received
- // a valid response; failure occurs when the resolver fails to provide a
- // response; a "no such host" response is still a success. Limitations: we
- // assume a resolver will not respond when, e.g., rate limiting; we ignore
- // subsequent requests made via the same UDP/TCP flow; deleteFlow may be
- // called only after the flow has expired, which adds some delay to the
- // recording of the DNS metric.
- dnsEndTime := monotime.Time(
- flowState.firstDownstreamPacketTime.Load())
- dnsSuccess := true
- if dnsEndTime == 0 {
- dnsSuccess = false
- dnsEndTime = monotime.Now()
- }
- resolveElapsedTime := dnsEndTime.Sub(dnsStartTime)
- if flowState.dnsQualityReporter != nil {
- flowState.dnsQualityReporter(
- dnsSuccess,
- resolveElapsedTime,
- net.IP(ID.upstreamIPAddress[:]))
- }
- }
- }
- session.flows.Delete(ID)
- }
- // reapFlows removes expired idle flows.
- func (session *session) reapFlows() {
- session.flows.Range(func(key, value interface{}) bool {
- flowState := value.(*flowState)
- if flowState.expired(FLOW_IDLE_EXPIRY) {
- session.deleteFlow(key.(flowID), flowState)
- }
- return true
- })
- }
- // deleteFlows deletes all flows.
- func (session *session) deleteFlows() {
- session.flows.Range(func(key, value interface{}) bool {
- session.deleteFlow(key.(flowID), value.(*flowState))
- return true
- })
- }
- type packetMetrics struct {
- upstreamRejectReasons [packetRejectReasonCount]atomic.Int64
- downstreamRejectReasons [packetRejectReasonCount]atomic.Int64
- TCPIPv4 relayedPacketMetrics
- TCPIPv6 relayedPacketMetrics
- UDPIPv4 relayedPacketMetrics
- UDPIPv6 relayedPacketMetrics
- }
// relayedPacketMetrics holds the relayed packet counters for one
// protocol and IP version, split by direction: number of packets, total
// packet bytes, and application data bytes.
type relayedPacketMetrics struct {
	packetsUp            atomic.Int64
	packetsDown          atomic.Int64
	bytesUp              atomic.Int64
	bytesDown            atomic.Int64
	applicationBytesUp   atomic.Int64
	applicationBytesDown atomic.Int64
}
- func (metrics *packetMetrics) rejectedPacket(
- direction packetDirection,
- reason packetRejectReason) {
- if direction == packetDirectionServerUpstream ||
- direction == packetDirectionClientUpstream {
- metrics.upstreamRejectReasons[reason].Add(1)
- } else { // packetDirectionDownstream
- metrics.downstreamRejectReasons[reason].Add(1)
- }
- }
- func (metrics *packetMetrics) relayedPacket(
- direction packetDirection,
- version int,
- protocol internetProtocol,
- packetLength, applicationDataLength int) {
- var packetsMetric, bytesMetric, applicationBytesMetric *atomic.Int64
- if direction == packetDirectionServerUpstream ||
- direction == packetDirectionClientUpstream {
- if version == 4 {
- if protocol == internetProtocolTCP {
- packetsMetric = &metrics.TCPIPv4.packetsUp
- bytesMetric = &metrics.TCPIPv4.bytesUp
- applicationBytesMetric = &metrics.TCPIPv4.applicationBytesUp
- } else { // UDP
- packetsMetric = &metrics.UDPIPv4.packetsUp
- bytesMetric = &metrics.UDPIPv4.bytesUp
- applicationBytesMetric = &metrics.UDPIPv4.applicationBytesUp
- }
- } else { // IPv6
- if protocol == internetProtocolTCP {
- packetsMetric = &metrics.TCPIPv6.packetsUp
- bytesMetric = &metrics.TCPIPv6.bytesUp
- applicationBytesMetric = &metrics.TCPIPv6.applicationBytesUp
- } else { // UDP
- packetsMetric = &metrics.UDPIPv6.packetsUp
- bytesMetric = &metrics.UDPIPv6.bytesUp
- applicationBytesMetric = &metrics.UDPIPv6.applicationBytesUp
- }
- }
- } else { // packetDirectionDownstream
- if version == 4 {
- if protocol == internetProtocolTCP {
- packetsMetric = &metrics.TCPIPv4.packetsDown
- bytesMetric = &metrics.TCPIPv4.bytesDown
- applicationBytesMetric = &metrics.TCPIPv4.applicationBytesDown
- } else { // UDP
- packetsMetric = &metrics.UDPIPv4.packetsDown
- bytesMetric = &metrics.UDPIPv4.bytesDown
- applicationBytesMetric = &metrics.UDPIPv4.applicationBytesDown
- }
- } else { // IPv6
- if protocol == internetProtocolTCP {
- packetsMetric = &metrics.TCPIPv6.packetsDown
- bytesMetric = &metrics.TCPIPv6.bytesDown
- applicationBytesMetric = &metrics.TCPIPv6.applicationBytesDown
- } else { // UDP
- packetsMetric = &metrics.UDPIPv6.packetsDown
- bytesMetric = &metrics.UDPIPv6.bytesDown
- applicationBytesMetric = &metrics.UDPIPv6.applicationBytesDown
- }
- }
- }
- packetsMetric.Add(1)
- bytesMetric.Add(int64(packetLength))
- applicationBytesMetric.Add(int64(applicationDataLength))
- }
// Bit flags selecting which groups of counters packetMetrics.checkpoint
// reports and resets.
const (
	packetMetricsRejected = 1 << iota
	packetMetricsRelayed

	// packetMetricsAll selects both the rejected and the relayed counters.
	packetMetricsAll = packetMetricsRejected | packetMetricsRelayed
)
- func (metrics *packetMetrics) checkpoint(
- logger common.Logger, updater MetricsUpdater, logName string, whichMetrics int) {
- // Report all metric counters in a single log message. Each
- // counter is reset to 0 when added to the log.
- logFields := make(common.LogFields)
- if whichMetrics&packetMetricsRejected != 0 {
- for i := 0; i < packetRejectReasonCount; i++ {
- logFields["upstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
- metrics.upstreamRejectReasons[i].Swap(0)
- logFields["downstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
- metrics.downstreamRejectReasons[i].Swap(0)
- }
- }
- if whichMetrics&packetMetricsRelayed != 0 {
- var TCPApplicationBytesUp, TCPApplicationBytesDown,
- UDPApplicationBytesUp, UDPApplicationBytesDown int64
- relayedMetrics := []struct {
- prefix string
- metrics *relayedPacketMetrics
- updaterBytesUp *int64
- updaterBytesDown *int64
- }{
- {"tcp_ipv4_", &metrics.TCPIPv4, &TCPApplicationBytesUp, &TCPApplicationBytesDown},
- {"tcp_ipv6_", &metrics.TCPIPv6, &TCPApplicationBytesUp, &TCPApplicationBytesDown},
- {"udp_ipv4_", &metrics.UDPIPv4, &UDPApplicationBytesUp, &UDPApplicationBytesDown},
- {"udp_ipv6_", &metrics.UDPIPv6, &UDPApplicationBytesUp, &UDPApplicationBytesDown},
- }
- for _, r := range relayedMetrics {
- applicationBytesUp := r.metrics.applicationBytesUp.Swap(0)
- applicationBytesDown := r.metrics.applicationBytesDown.Swap(0)
- *r.updaterBytesUp += applicationBytesUp
- *r.updaterBytesDown += applicationBytesDown
- logFields[r.prefix+"packets_up"] = r.metrics.packetsUp.Swap(0)
- logFields[r.prefix+"packets_down"] = r.metrics.packetsDown.Swap(0)
- logFields[r.prefix+"bytes_up"] = r.metrics.bytesUp.Swap(0)
- logFields[r.prefix+"bytes_down"] = r.metrics.bytesDown.Swap(0)
- logFields[r.prefix+"application_bytes_up"] = applicationBytesUp
- logFields[r.prefix+"application_bytes_down"] = applicationBytesDown
- }
- if updater != nil {
- updater(
- TCPApplicationBytesDown, TCPApplicationBytesUp,
- UDPApplicationBytesDown, UDPApplicationBytesUp)
- }
- }
- // Not currently a shipped LogMetric.
- logger.WithTraceFields(logFields).Info(logName)
- }
- // PacketQueue is a fixed-size, preallocated queue of packets.
- // Enqueued packets are packed into a contiguous buffer with channel
- // framing, allowing the entire queue to be written to a channel
- // in a single call.
- // Reuse of the queue buffers avoids GC churn. To avoid memory use
- // spikes when many clients connect and may disconnect before relaying
- // packets, the packet queue buffers start small and grow when required,
- // up to the maximum size, and then remain static.
- type PacketQueue struct {
- maxSize int
- emptyBuffers chan []byte
- activeBuffer chan []byte
- }
- // NewPacketQueue creates a new PacketQueue.
- // The caller must ensure that maxSize exceeds the
- // packet MTU, or packets will will never enqueue.
- func NewPacketQueue(maxSize int) *PacketQueue {
- // Two buffers of size up to maxSize are allocated, to
- // allow packets to continue to enqueue while one buffer
- // is borrowed by the DequeueFramedPackets caller.
- //
- // TODO: is there a way to implement this without
- // allocating up to 2x maxSize bytes? A circular queue
- // won't work because we want DequeueFramedPackets
- // to return a contiguous buffer. Perhaps a Bip
- // Buffer would work here:
- // https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
- queue := &PacketQueue{
- maxSize: maxSize,
- emptyBuffers: make(chan []byte, 2),
- activeBuffer: make(chan []byte, 1),
- }
- queue.emptyBuffers <- make([]byte, 0)
- queue.emptyBuffers <- make([]byte, 0)
- return queue
- }
- // Enqueue adds a packet to the queue.
- // If the queue is full, the packet is dropped.
- // Enqueue is _not_ safe for concurrent calls.
- func (queue *PacketQueue) Enqueue(packet []byte) {
- var buffer []byte
- select {
- case buffer = <-queue.activeBuffer:
- default:
- buffer = <-queue.emptyBuffers
- }
- packetSize := len(packet)
- if queue.maxSize-len(buffer) >= channelHeaderSize+packetSize {
- // Assumes len(packet)/MTU <= 64K
- var channelHeader [channelHeaderSize]byte
- binary.BigEndian.PutUint16(channelHeader[:], uint16(packetSize))
- // Once the buffer has reached maxSize capacity
- // and been replaced (buffer = buffer[0:0]), these
- // appends should no longer allocate new memory and
- // should just copy to preallocated memory.
- buffer = append(buffer, channelHeader[:]...)
- buffer = append(buffer, packet...)
- }
- // Else, queue is full, so drop packet.
- queue.activeBuffer <- buffer
- }
- // DequeueFramedPackets waits until at least one packet is
- // enqueued, and then returns a packet buffer containing one
- // or more framed packets. The returned buffer remains part
- // of the PacketQueue structure and the caller _must_ replace
- // the buffer by calling Replace.
- // DequeueFramedPackets unblocks and returns false if it receives
- // runContext.Done().
- // DequeueFramedPackets is _not_ safe for concurrent calls.
- func (queue *PacketQueue) DequeueFramedPackets(
- runContext context.Context) ([]byte, bool) {
- var buffer []byte
- select {
- case buffer = <-queue.activeBuffer:
- case <-runContext.Done():
- return nil, false
- }
- return buffer, true
- }
- // Replace returns the buffer to the PacketQueue to be
- // reused.
- // The input must be a return value from DequeueFramedPackets.
- func (queue *PacketQueue) Replace(buffer []byte) {
- buffer = buffer[0:0]
- // This won't block (as long as it is a DequeueFramedPackets return value).
- queue.emptyBuffers <- buffer
- }
// ClientConfig specifies the configuration of a packet tunnel client.
type ClientConfig struct {

	// Logger is used for logging events and metrics.
	Logger common.Logger

	// SudoNetworkConfigCommands specifies whether to use "sudo"
	// when executing network configuration commands. See description
	// for ServerConfig.SudoNetworkConfigCommands.
	SudoNetworkConfigCommands bool

	// AllowNoIPv6NetworkConfiguration indicates that failures while
	// configuring tun interfaces and routing for IPv6 are to be
	// logged as warnings only. See description for
	// ServerConfig.AllowNoIPv6NetworkConfiguration.
	AllowNoIPv6NetworkConfiguration bool

	// MTU is the packet MTU value to use; this value
	// should be obtained from the packet tunnel server.
	// When MTU is 0, a default value is used.
	MTU int

	// UpstreamPacketQueueSize specifies the size of the upstream
	// packet queue.
	// When UpstreamPacketQueueSize is 0, a default value tuned for
	// Psiphon is used.
	UpstreamPacketQueueSize int

	// Transport is an established transport channel that
	// will be used to relay packets to and from a packet
	// tunnel server.
	Transport io.ReadWriteCloser

	// TunFileDescriptor specifies a file descriptor to use to
	// read and write packets to be relayed to the client. When
	// TunFileDescriptor is specified, the Client will use this
	// existing tun device and not create its own; in this case,
	// network address and routing configuration is not performed
	// by the Client. As the packet tunnel server performs
	// transparent source IP address and DNS rewriting, the tun
	// device may have any assigned IP address, but should be
	// configured with the given MTU; and DNS should be configured
	// to use the specified transparent DNS resolver addresses.
	// Set TunFileDescriptor to <= 0 to ignore this parameter
	// and create and configure a tun device.
	TunFileDescriptor int

	// IPv4AddressCIDR is the IPv4 address and netmask to
	// assign to a newly created tun device.
	IPv4AddressCIDR string

	// IPv6AddressCIDR is the IPv6 address and prefix to
	// assign to a newly created tun device.
	IPv6AddressCIDR string

	// TransparentDNSIPv4Address is the IPv4 address of the DNS server
	// configured by a VPN using a packet tunnel. All DNS packets
	// destined to this DNS server are transparently redirected to
	// the Psiphon server DNS. May be left empty.
	TransparentDNSIPv4Address string

	// TransparentDNSIPv6Address is the IPv6 address of the DNS server
	// configured by a VPN using a packet tunnel. All DNS packets
	// destined to this DNS server are transparently redirected to
	// the Psiphon server DNS. May be left empty.
	TransparentDNSIPv6Address string

	// RouteDestinations are hosts (IPs) or networks (CIDRs)
	// to be configured to be routed through a newly
	// created tun device.
	RouteDestinations []string
}
// Client is a packet tunnel client. A packet tunnel client
// relays packets between a local tun device and a packet
// tunnel server via a transport channel.
//
// The device is the local tun device and the channel wraps
// config.Transport. upstreamPackets queues packets read from the device
// until they are written to the channel. transparentDNS holds the parsed
// transparent DNS addresses from the config. runContext and stopRunning
// signal shutdown to the worker goroutines launched by Start, and workers
// is used by Stop to wait for those goroutines to exit.
type Client struct {
	config          *ClientConfig
	transparentDNS  *clientTransparentDNS
	device          *Device
	channel         *Channel
	upstreamPackets *PacketQueue
	metrics         *packetMetrics
	runContext      context.Context
	stopRunning     context.CancelFunc
	workers         *sync.WaitGroup
}
// clientTransparentDNS caches the parsed representations of
// TransparentDNSIPv4/6Address for fast packet processing and rewriting.
//
// As populated by newClientTransparentDNS, IPv4Address is in 4-byte form,
// and either field is nil when the corresponding address was not
// configured.
type clientTransparentDNS struct {
	IPv4Address net.IP
	IPv6Address net.IP
}
- func newClientTransparentDNS(
- IPv4Address, IPv6Address string) (*clientTransparentDNS, error) {
- var IPv4, IPv6 net.IP
- if IPv4Address != "" {
- IPv4 = net.ParseIP(IPv4Address)
- if IPv4 != nil {
- IPv4 = IPv4.To4()
- }
- if IPv4 == nil {
- return nil, errors.TraceNew("invalid IPv4 address")
- }
- }
- if IPv6Address != "" {
- IPv6 = net.ParseIP(IPv6Address)
- if IPv6 == nil || IPv6.To4() != nil {
- return nil, errors.TraceNew("invalid IPv6 address")
- }
- }
- return &clientTransparentDNS{
- IPv4Address: IPv4,
- IPv6Address: IPv6,
- }, nil
- }
- // NewClient initializes a new Client. Unless using the
- // TunFileDescriptor configuration parameter, a new tun
- // device is created for the client.
- func NewClient(config *ClientConfig) (*Client, error) {
- var device *Device
- var err error
- if config.TunFileDescriptor > 0 {
- device, err = NewClientDeviceFromFD(config)
- } else {
- device, err = NewClientDevice(config)
- }
- if err != nil {
- return nil, errors.Trace(err)
- }
- upstreamPacketQueueSize := DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE
- if config.UpstreamPacketQueueSize > 0 {
- upstreamPacketQueueSize = config.UpstreamPacketQueueSize
- }
- transparentDNS, err := newClientTransparentDNS(
- config.TransparentDNSIPv4Address,
- config.TransparentDNSIPv6Address)
- if err != nil {
- return nil, errors.Trace(err)
- }
- runContext, stopRunning := context.WithCancel(context.Background())
- return &Client{
- config: config,
- transparentDNS: transparentDNS,
- device: device,
- channel: NewChannel(config.Transport, getMTU(config.MTU)),
- upstreamPackets: NewPacketQueue(upstreamPacketQueueSize),
- metrics: new(packetMetrics),
- runContext: runContext,
- stopRunning: stopRunning,
- workers: new(sync.WaitGroup),
- }, nil
- }
- // Start starts a client and returns with it running.
- func (client *Client) Start() {
- client.config.Logger.WithTrace().Info("starting")
- client.workers.Add(1)
- go func() {
- defer client.workers.Done()
- for {
- readPacket, err := client.device.ReadPacket()
- select {
- case <-client.runContext.Done():
- // No error is logged as shutdown may have interrupted read.
- return
- default:
- }
- if err != nil {
- client.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Info("read device packet failed")
- // May be temporary error condition, keep working.
- continue
- }
- // processPacket will check for packets the server will reject
- // and drop those without sending.
- // Limitation: packet metrics, including successful relay count,
- // are incremented _before_ the packet is written to the channel.
- if !processPacket(
- client.metrics,
- nil,
- client.transparentDNS,
- packetDirectionClientUpstream,
- readPacket) {
- continue
- }
- // Instead of immediately writing to the channel, the
- // packet is enqueued, which has the effect of batching
- // up IP packets into a single channel packet (for Psiphon,
- // an SSH packet) to minimize overhead and, as benchmarked,
- // improve throughput.
- // Packet will be dropped if queue is full.
- client.upstreamPackets.Enqueue(readPacket)
- }
- }()
- client.workers.Add(1)
- go func() {
- defer client.workers.Done()
- for {
- packetBuffer, ok := client.upstreamPackets.DequeueFramedPackets(client.runContext)
- if !ok {
- // Dequeue aborted due to session.runContext.Done()
- return
- }
- err := client.channel.WriteFramedPackets(packetBuffer)
- client.upstreamPackets.Replace(packetBuffer)
- if err != nil {
- client.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Info("write channel packets failed")
- // May be temporary error condition, such as reconnecting the tunnel;
- // keep working. The packets are most likely dropped.
- continue
- }
- }
- }()
- client.workers.Add(1)
- go func() {
- defer client.workers.Done()
- for {
- readPacket, err := client.channel.ReadPacket()
- select {
- case <-client.runContext.Done():
- // No error is logged as shutdown may have interrupted read.
- return
- default:
- }
- if err != nil {
- client.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Info("read channel packet failed")
- // May be temporary error condition, such as reconnecting the tunnel;
- // keep working.
- continue
- }
- if !processPacket(
- client.metrics,
- nil,
- client.transparentDNS,
- packetDirectionClientDownstream,
- readPacket) {
- continue
- }
- err = client.device.WritePacket(readPacket)
- if err != nil {
- client.config.Logger.WithTraceFields(
- common.LogFields{"error": err}).Info("write device packet failed")
- // May be temporary error condition, keep working. The packet is
- // most likely dropped.
- continue
- }
- }
- }()
- }
// Stop halts a running client. It cancels the run context, closes the
// device and the channel, waits for all workers started by Start to
// exit, and then logs a final checkpoint of all packet metrics.
func (client *Client) Stop() {

	client.config.Logger.WithTrace().Info("stopping")

	// Signal the workers to exit first, then close the device and the
	// channel; the worker loops expect that shutdown may interrupt their
	// reads and check the run context after each read returns.
	client.stopRunning()
	client.device.Close()
	client.channel.Close()

	client.workers.Wait()

	// Report and reset every counter; no MetricsUpdater is supplied.
	client.metrics.checkpoint(
		client.config.Logger, nil, "packet_metrics", packetMetricsAll)

	client.config.Logger.WithTrace().Info("stopped")
}
/*

Packet offset constants in getPacketDestinationIPAddress and
processPacket are from the following RFC definitions.


IPv4 header: https://tools.ietf.org/html/rfc791

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |Version|  IHL  |Type of Service|          Total Length         |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |         Identification        |Flags|      Fragment Offset    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |  Time to Live |    Protocol   |         Header Checksum       |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                       Source Address                          |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Destination Address                        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Options                    |    Padding    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+


IPv6 header: https://tools.ietf.org/html/rfc2460

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |Version| Traffic Class |           Flow Label                  |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |         Payload Length        |  Next Header  |   Hop Limit   |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                                                               |
   +                                                               +
   |                                                               |
   +                         Source Address                        +
   |                                                               |
   +                                                               +
   |                                                               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                                                               |
   +                                                               +
   |                                                               |
   +                      Destination Address                      +
   |                                                               |
   +                                                               +
   |                                                               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+


TCP header: https://tools.ietf.org/html/rfc793

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |          Source Port          |       Destination Port        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                        Sequence Number                        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Acknowledgment Number                      |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |  Data |           |U|A|P|R|S|F|                               |
   | Offset| Reserved  |R|C|S|S|Y|I|            Window             |
   |       |           |G|K|H|T|N|N|                               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |           Checksum            |         Urgent Pointer        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Options                    |    Padding    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                             data                              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+


UDP header: https://tools.ietf.org/html/rfc768

    0      7 8     15 16    23 24    31
   +--------+--------+--------+--------+
   |     Source      |   Destination   |
   |      Port       |      Port       |
   +--------+--------+--------+--------+
   |                 |                 |
   |     Length      |    Checksum     |
   +--------+--------+--------+--------+
   |
   |          data octets ...
   +---------------- ...

*/
// Packet relay directions. The server and the client each process both
// upstream and downstream packets.
const (
	packetDirectionServerUpstream = iota
	packetDirectionServerDownstream
	packetDirectionClientUpstream
	packetDirectionClientDownstream
)

// IP protocol numbers of the relayed transport protocols, and the DNS
// port number.
const (
	internetProtocolTCP = 6
	internetProtocolUDP = 17

	portNumberDNS = 53
)

// Packet reject reasons. The reasons are consecutive, starting at 0, so
// that they can index arrays of packetRejectReasonCount counters.
const (
	packetRejectNoSession = iota
	packetRejectDestinationAddress
	packetRejectLength
	packetRejectVersion
	packetRejectOptions
	packetRejectProtocol
	packetRejectTCPProtocolLength
	packetRejectUDPProtocolLength
	packetRejectTCPPort
	packetRejectUDPPort
	packetRejectNoOriginalAddress
	packetRejectNoDNSResolvers
	packetRejectInvalidDNSMessage
	packetRejectDisallowedDomain
	packetRejectNoClient

	// packetRejectReasonCount is the number of reject reasons above.
	packetRejectReasonCount

	// packetOk shares its value with packetRejectReasonCount; it is not
	// one of the reject reasons.
	packetOk = packetRejectReasonCount
)

type (
	packetDirection    int
	internetProtocol   int
	packetRejectReason int
)
- func packetRejectReasonDescription(reason packetRejectReason) string {
- // Description strings follow the metrics naming
- // convention: all lowercase; underscore seperators.
- switch reason {
- case packetRejectNoSession:
- return "no_session"
- case packetRejectDestinationAddress:
- return "invalid_destination_address"
- case packetRejectLength:
- return "invalid_ip_packet_length"
- case packetRejectVersion:
- return "invalid_ip_header_version"
- case packetRejectOptions:
- return "invalid_ip_header_options"
- case packetRejectProtocol:
- return "invalid_ip_header_protocol"
- case packetRejectTCPProtocolLength:
- return "invalid_tcp_packet_length"
- case packetRejectUDPProtocolLength:
- return "invalid_tcp_packet_length"
- case packetRejectTCPPort:
- return "disallowed_tcp_destination_port"
- case packetRejectUDPPort:
- return "disallowed_udp_destination_port"
- case packetRejectNoOriginalAddress:
- return "no_original_address"
- case packetRejectNoDNSResolvers:
- return "no_dns_resolvers"
- case packetRejectInvalidDNSMessage:
- return "invalid_dns_message"
- case packetRejectDisallowedDomain:
- return "disallowed_domain"
- case packetRejectNoClient:
- return "no_client"
- }
- return "unknown_reason"
- }
- // Caller: the destination IP address return value is
- // a slice of the packet input value and only valid while
- // the packet buffer remains valid.
- func getPacketDestinationIPAddress(
- metrics *packetMetrics,
- direction packetDirection,
- packet []byte) (net.IP, bool) {
- // TODO: this function duplicates a subset of the packet
- // parsing code in processPacket. Refactor to reuse code;
- // also, both getPacketDestinationIPAddress and processPacket
- // are called for some packets; refactor to only parse once.
- if len(packet) < 1 {
- metrics.rejectedPacket(direction, packetRejectLength)
- return nil, false
- }
- version := packet[0] >> 4
- if version != 4 && version != 6 {
- metrics.rejectedPacket(direction, packetRejectVersion)
- return nil, false
- }
- if version == 4 {
- if len(packet) < 20 {
- metrics.rejectedPacket(direction, packetRejectLength)
- return nil, false
- }
- return packet[16:20], true
- } else { // IPv6
- if len(packet) < 40 {
- metrics.rejectedPacket(direction, packetRejectLength)
- return nil, false
- }
- return packet[24:40], true
- }
- }
- // processPacket parses IP packets, applies relaying rules,
- // and rewrites packet elements as required. processPacket
- // returns true if a packet parses correctly, is accepted
- // by the relay rules, and is successfully rewritten.
- //
- // When a packet is rejected, processPacket returns false
- // and updates a reason in the supplied metrics.
- //
- // Rejection may result in partially rewritten packets.
- func processPacket(
- metrics *packetMetrics,
- session *session,
- clientTransparentDNS *clientTransparentDNS,
- direction packetDirection,
- packet []byte) bool {
- // Parse and validate IP packet structure
- // Must have an IP version field.
- if len(packet) < 1 {
- metrics.rejectedPacket(direction, packetRejectLength)
- return false
- }
- version := packet[0] >> 4
- // Must be IPv4 or IPv6.
- if version != 4 && version != 6 {
- metrics.rejectedPacket(direction, packetRejectVersion)
- return false
- }
- var protocol internetProtocol
- var sourceIPAddress, destinationIPAddress net.IP
- var sourcePort, destinationPort uint16
- var IPChecksum, TCPChecksum, UDPChecksum []byte
- var applicationData []byte
- if version == 4 {
- // IHL must be 5: options are not supported; a fixed
- // 20 byte header is expected.
- headerLength := packet[0] & 0x0F
- if headerLength != 5 {
- metrics.rejectedPacket(direction, packetRejectOptions)
- return false
- }
- if len(packet) < 20 {
- metrics.rejectedPacket(direction, packetRejectLength)
- return false
- }
- // Protocol must be TCP or UDP.
- protocol = internetProtocol(packet[9])
- dataOffset := 0
- if protocol == internetProtocolTCP {
- if len(packet) < 38 {
- metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
- return false
- }
- dataOffset = 20 + 4*int(packet[32]>>4)
- if len(packet) < dataOffset {
- metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
- return false
- }
- } else if protocol == internetProtocolUDP {
- dataOffset = 28
- if len(packet) < dataOffset {
- metrics.rejectedPacket(direction, packetRejectUDPProtocolLength)
- return false
- }
- } else {
- metrics.rejectedPacket(direction, packetRejectProtocol)
- return false
- }
- applicationData = packet[dataOffset:]
- // Slices reference packet bytes to be rewritten.
- sourceIPAddress = packet[12:16]
- destinationIPAddress = packet[16:20]
- IPChecksum = packet[10:12]
- // Port numbers have the same offset in TCP and UDP.
- sourcePort = binary.BigEndian.Uint16(packet[20:22])
- destinationPort = binary.BigEndian.Uint16(packet[22:24])
- if protocol == internetProtocolTCP {
- TCPChecksum = packet[36:38]
- } else { // UDP
- UDPChecksum = packet[26:28]
- }
- } else { // IPv6
- if len(packet) < 40 {
- metrics.rejectedPacket(direction, packetRejectLength)
- return false
- }
- // Next Header must be TCP or UDP.
- nextHeader := packet[6]
- protocol = internetProtocol(nextHeader)
- dataOffset := 0
- if protocol == internetProtocolTCP {
- if len(packet) < 58 {
- metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
- return false
- }
- dataOffset = 40 + 4*int(packet[52]>>4)
- if len(packet) < dataOffset {
- metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
- return false
- }
- } else if protocol == internetProtocolUDP {
- dataOffset = 48
- if len(packet) < dataOffset {
- metrics.rejectedPacket(direction, packetRejectUDPProtocolLength)
- return false
- }
- } else {
- metrics.rejectedPacket(direction, packetRejectProtocol)
- return false
- }
- applicationData = packet[dataOffset:]
- // Slices reference packet bytes to be rewritten.
- sourceIPAddress = packet[8:24]
- destinationIPAddress = packet[24:40]
- // Port numbers have the same offset in TCP and UDP.
- sourcePort = binary.BigEndian.Uint16(packet[40:42])
- destinationPort = binary.BigEndian.Uint16(packet[42:44])
- if protocol == internetProtocolTCP {
- TCPChecksum = packet[56:58]
- } else { // UDP
- UDPChecksum = packet[46:48]
- }
- }
- // Apply rules
- //
- // Most of this logic is only applied on the server, as only
- // the server knows the traffic rules configuration, and is
- // tracking flows.
- isServer := (direction == packetDirectionServerUpstream ||
- direction == packetDirectionServerDownstream)
- // Check if the packet qualifies for transparent DNS rewriting
- //
- // - Both TCP and UDP DNS packets may qualify
- // - Unless configured, transparent DNS flows are not tracked,
- // as most DNS resolutions are very-short lived exchanges
- // - The traffic rules checks are bypassed, since transparent
- // DNS is essential
- // Transparent DNS is a two-step translation. On the client, the VPN
- // can be configured with any private address range, so as to not
- // conflict with other local networks, such as WiFi. For example, the
- // client may select from 192.168.0.0/16, when an existing interface
- // uses a subnet in 10.0.0.0/8, and specify the VPN DNS server as 192.168.0.1.
- //
- // The first translation, on the client side, rewrites packets
- // destined to 192.168.0.1:53, the DNS server, to the destination
- // transparentDNSResolverIPv4Address:53. This packet is sent to the
- // server.
- //
- // The second translation, on the server side, rewrites packets
- // destined to transparentDNSResolverIPv4Address:53 to an actual DNS
- // server destination.
- //
- // Then, reverse rewrites are applied to DNS response packets: the
- // server rewrites the source address actual-DNS-server:53 to
- // transparentDNSResolverIPv4Address:53, and then the client rewrites
- // the source address transparentDNSResolverIPv4Address:53 to
- // 192.168.0.1:53, and that packet is written to the tun device.
- doTransparentDNS := false
- if isServer {
- if direction == packetDirectionServerUpstream {
- // DNS packets destinated for the transparent DNS target addresses
- // will be rewritten to go to one of the server's resolvers.
- if destinationPort == portNumberDNS {
- if version == 4 &&
- destinationIPAddress.Equal(transparentDNSResolverIPv4Address) {
- numResolvers := len(session.DNSResolverIPv4Addresses)
- if numResolvers > 0 {
- doTransparentDNS = true
- } else {
- metrics.rejectedPacket(direction, packetRejectNoDNSResolvers)
- return false
- }
- } else if version == 6 &&
- destinationIPAddress.Equal(transparentDNSResolverIPv6Address) {
- numResolvers := len(session.DNSResolverIPv6Addresses)
- if numResolvers > 0 {
- doTransparentDNS = true
- } else {
- metrics.rejectedPacket(direction, packetRejectNoDNSResolvers)
- return false
- }
- }
- // Limitation: checkAllowedDomainFunc is applied only to DNS queries in
- // UDP; currently DNS-over-TCP will bypass the domain block list check.
- if doTransparentDNS && protocol == internetProtocolUDP {
- domain, err := common.ParseDNSQuestion(applicationData)
- if err != nil {
- metrics.rejectedPacket(direction, packetRejectInvalidDNSMessage)
- return false
- }
- if domain != "" {
- checkAllowedDomainFunc := session.getCheckAllowedDomainFunc()
- if !checkAllowedDomainFunc(domain) {
- metrics.rejectedPacket(direction, packetRejectDisallowedDomain)
- return false
- }
- }
- }
- }
- } else { // packetDirectionServerDownstream
- // DNS packets with a source address of any of the server's
- // resolvers will be rewritten back to the transparent DNS target
- // address.
- // Limitation: responses to client DNS packets _originally
- // destined_ for a resolver in GetDNSResolverIPv4Addresses will
- // be lost. This would happen if some process on the client
- // ignores the system set DNS values; and forces use of the same
- // resolvers as the server.
- if sourcePort == portNumberDNS {
- if version == 4 {
- for _, IPAddress := range session.DNSResolverIPv4Addresses {
- if sourceIPAddress.Equal(IPAddress) {
- doTransparentDNS = true
- break
- }
- }
- } else if version == 6 {
- for _, IPAddress := range session.DNSResolverIPv6Addresses {
- if sourceIPAddress.Equal(IPAddress) {
- doTransparentDNS = true
- break
- }
- }
- }
- }
- }
- } else { // isClient
- if direction == packetDirectionClientUpstream {
- // DNS packets destined to the configured VPN DNS servers,
- // specified in clientTransparentDNS, are rewritten to go to
- // transparentDNSResolverIPv4/6Address.
- if destinationPort == portNumberDNS {
- if (version == 4 && destinationIPAddress.Equal(clientTransparentDNS.IPv4Address)) ||
- (version == 6 && destinationIPAddress.Equal(clientTransparentDNS.IPv6Address)) {
- doTransparentDNS = true
- }
- }
- } else { // packetDirectionClientDownstream
- // DNS packets with a transparentDNSResolverIPv4/6Address source
- // address are rewritten to come from the configured VPN DNS servers.
- if sourcePort == portNumberDNS {
- if (version == 4 && sourceIPAddress.Equal(transparentDNSResolverIPv4Address)) ||
- (version == 6 && sourceIPAddress.Equal(transparentDNSResolverIPv6Address)) {
- doTransparentDNS = true
- }
- }
- }
- }
- // Apply rewrites before determining flow ID to ensure that corresponding up-
- // and downstream flows yield the same flow ID.
- var rewriteSourceIPAddress, rewriteDestinationIPAddress net.IP
- if direction == packetDirectionServerUpstream {
- // Store original source IP address to be replaced in
- // downstream rewriting.
- if version == 4 {
- session.setOriginalIPv4AddressIfNotSet(sourceIPAddress)
- rewriteSourceIPAddress = session.assignedIPv4Address
- } else { // version == 6
- session.setOriginalIPv6AddressIfNotSet(sourceIPAddress)
- rewriteSourceIPAddress = session.assignedIPv6Address
- }
- // Rewrite DNS packets destinated for the transparent DNS target addresses
- // to go to one of the server's resolvers. This random selection uses
- // math/rand to minimize overhead.
- //
- // Limitation: TCP packets are always assigned to the same resolver, as
- // currently there is no method for tracking the assigned resolver per TCP
- // flow.
- if doTransparentDNS {
- if version == 4 {
- index := session.TCPDNSResolverIPv4Index
- if protocol == internetProtocolUDP {
- index = rand.Intn(len(session.DNSResolverIPv4Addresses))
- }
- rewriteDestinationIPAddress = session.DNSResolverIPv4Addresses[index]
- } else { // version == 6
- index := session.TCPDNSResolverIPv6Index
- if protocol == internetProtocolUDP {
- index = rand.Intn(len(session.DNSResolverIPv6Addresses))
- }
- rewriteDestinationIPAddress = session.DNSResolverIPv6Addresses[index]
- }
- }
- } else if direction == packetDirectionServerDownstream {
- // Destination address will be original source address.
- if version == 4 {
- rewriteDestinationIPAddress = session.getOriginalIPv4Address()
- } else if version == 6 {
- rewriteDestinationIPAddress = session.getOriginalIPv6Address()
- }
- if rewriteDestinationIPAddress == nil {
- metrics.rejectedPacket(direction, packetRejectNoOriginalAddress)
- return false
- }
- // Rewrite source address of packets from servers' resolvers
- // to transparent DNS target address.
- if doTransparentDNS {
- if version == 4 {
- rewriteSourceIPAddress = transparentDNSResolverIPv4Address
- } else if version == 6 {
- rewriteSourceIPAddress = transparentDNSResolverIPv6Address
- }
- }
- } else if direction == packetDirectionClientUpstream {
- // Rewrite the destination address to be
- // transparentDNSResolverIPv4/6Address, which the server will
- // subsequently send on to actual DNS servers.
- if doTransparentDNS {
- if version == 4 {
- rewriteDestinationIPAddress = transparentDNSResolverIPv4Address
- } else if version == 6 {
- rewriteDestinationIPAddress = transparentDNSResolverIPv6Address
- }
- }
- } else if direction == packetDirectionClientDownstream {
- // Rewrite the source address so the DNS response appears to come from
- // the configured VPN DNS server.
- if doTransparentDNS {
- if version == 4 {
- rewriteSourceIPAddress = clientTransparentDNS.IPv4Address
- } else if version == 6 {
- rewriteSourceIPAddress = clientTransparentDNS.IPv6Address
- }
- }
- }
- // Check if flow is tracked before checking traffic permission
- doFlowTracking := isServer && (!doTransparentDNS || session.enableDNSFlowTracking)
- // TODO: verify this struct is stack allocated
- var ID flowID
- isTrackingFlow := false
- if doFlowTracking {
- if direction == packetDirectionServerUpstream {
- // Reflect rewrites in the upstream case and don't reflect rewrites in the
- // following downstream case: all flow IDs are in the upstream space, with
- // the assigned private IP for the client and, in the case of DNS, the
- // actual resolver IP.
- srcIP := sourceIPAddress
- if rewriteSourceIPAddress != nil {
- srcIP = rewriteSourceIPAddress
- }
- destIP := destinationIPAddress
- if rewriteDestinationIPAddress != nil {
- destIP = rewriteDestinationIPAddress
- }
- ID.set(srcIP, sourcePort, destIP, destinationPort, protocol)
- } else if direction == packetDirectionServerDownstream {
- ID.set(
- destinationIPAddress,
- destinationPort,
- sourceIPAddress,
- sourcePort,
- protocol)
- }
- isTrackingFlow = session.isTrackingFlow(ID)
- }
- // Check packet source/destination is permitted; except for:
- // - existing flows, which have already been checked
- // - transparent DNS, which is always allowed
- if !doTransparentDNS && !isTrackingFlow {
- // Enforce traffic rules (allowed TCP/UDP ports).
- checkPort := 0
- if direction == packetDirectionServerUpstream ||
- direction == packetDirectionClientUpstream {
- checkPort = int(destinationPort)
- } else if direction == packetDirectionServerDownstream ||
- direction == packetDirectionClientDownstream {
- checkPort = int(sourcePort)
- }
- if protocol == internetProtocolTCP {
- invalidPort := (checkPort == 0)
- if !invalidPort && isServer {
- checkAllowedTCPPortFunc := session.getCheckAllowedTCPPortFunc()
- if checkAllowedTCPPortFunc == nil ||
- !checkAllowedTCPPortFunc(net.IP(ID.upstreamIPAddress[:]), checkPort) {
- invalidPort = true
- }
- }
- if invalidPort {
- metrics.rejectedPacket(direction, packetRejectTCPPort)
- return false
- }
- } else if protocol == internetProtocolUDP {
- invalidPort := (checkPort == 0)
- if !invalidPort && isServer {
- checkAllowedUDPPortFunc := session.getCheckAllowedUDPPortFunc()
- if checkAllowedUDPPortFunc == nil ||
- !checkAllowedUDPPortFunc(net.IP(ID.upstreamIPAddress[:]), checkPort) {
- invalidPort = true
- }
- }
- if invalidPort {
- metrics.rejectedPacket(direction, packetRejectUDPPort)
- return false
- }
- }
- // Enforce no localhost, multicast or broadcast packets; and no
- // client-to-client packets.
- //
- // TODO: a client-side check could check that destination IP
- // is strictly a tun device IP address.
- if !destinationIPAddress.IsGlobalUnicast() ||
- (direction == packetDirectionServerUpstream &&
- !session.allowBogons &&
- common.IsBogon(destinationIPAddress)) ||
- // Client-to-client packets are disallowed even when other bogons are
- // allowed.
- (direction == packetDirectionServerUpstream &&
- ((version == 4 &&
- !destinationIPAddress.Equal(transparentDNSResolverIPv4Address) &&
- privateSubnetIPv4.Contains(destinationIPAddress)) ||
- (version == 6 &&
- !destinationIPAddress.Equal(transparentDNSResolverIPv6Address) &&
- privateSubnetIPv6.Contains(destinationIPAddress)))) {
- metrics.rejectedPacket(direction, packetRejectDestinationAddress)
- return false
- }
- }
- // Apply packet rewrites. IP (v4 only) and TCP/UDP all have packet
- // checksums which are updated to reflect the rewritten headers.
- var checksumAccumulator int32
- if rewriteSourceIPAddress != nil {
- checksumAccumulate(sourceIPAddress, false, &checksumAccumulator)
- copy(sourceIPAddress, rewriteSourceIPAddress)
- checksumAccumulate(sourceIPAddress, true, &checksumAccumulator)
- }
- if rewriteDestinationIPAddress != nil {
- checksumAccumulate(destinationIPAddress, false, &checksumAccumulator)
- copy(destinationIPAddress, rewriteDestinationIPAddress)
- checksumAccumulate(destinationIPAddress, true, &checksumAccumulator)
- }
- if rewriteSourceIPAddress != nil || rewriteDestinationIPAddress != nil {
- // IPv6 doesn't have an IP header checksum.
- if version == 4 {
- checksumAdjust(IPChecksum, checksumAccumulator)
- }
- if protocol == internetProtocolTCP {
- checksumAdjust(TCPChecksum, checksumAccumulator)
- } else { // UDP
- checksumAdjust(UDPChecksum, checksumAccumulator)
- }
- }
- // Start/update flow tracking, only once past all possible packet rejects
- if doFlowTracking {
- if !isTrackingFlow {
- session.startTrackingFlow(ID, direction, applicationData, doTransparentDNS)
- } else {
- session.updateFlow(ID, direction, applicationData)
- }
- }
- metrics.relayedPacket(direction, int(version), protocol, len(packet), len(applicationData))
- return true
- }
- // Checksum code based on https://github.com/OpenVPN/openvpn:
- /*
- OpenVPN (TM) -- An Open Source VPN daemon
- Copyright (C) 2002-2017 OpenVPN Technologies, Inc. <sales@openvpn.net>
- OpenVPN license:
- ----------------
- OpenVPN is distributed under the GPL license version 2 (see COPYRIGHT.GPL).
- */
// checksumAccumulate folds data into *accumulator as a running checksum
// delta. The data is consumed as 32-bit big-endian words, and both 16-bit
// halves of each word are added to the accumulator when newData is false
// or subtracted from it when newData is true.
//
// Based on the ADD_CHECKSUM_32 and SUB_CHECKSUM_32 macros from OpenVPN:
// https://github.com/OpenVPN/openvpn/blob/58716979640b5d8850b39820f91da616964398cc/src/openvpn/proto.h#L177
//
// The length of data must be a multiple of 4; any other length results in
// an out-of-range index panic.
func checksumAccumulate(data []byte, newData bool, accumulator *int32) {
	// Replaced (old) bytes count positively; replacement (new) bytes count
	// negatively.
	sign := int32(1)
	if newData {
		sign = -1
	}
	for offset := 0; offset < len(data); offset += 4 {
		high := int32(data[offset+0])<<8 | int32(data[offset+1])
		low := int32(data[offset+2])<<8 | int32(data[offset+3])
		*accumulator += sign * (low + high)
	}
}
// checksumAdjust applies an accumulated checksum delta, as produced by
// checksumAccumulate, to the 16-bit big-endian checksum stored in the
// first two bytes of checksumData. The checksum is updated in place.
//
// Based on the ADJUST_CHECKSUM macro from OpenVPN:
// https://github.com/OpenVPN/openvpn/blob/58716979640b5d8850b39820f91da616964398cc/src/openvpn/proto.h#L177
//
// checksumData is expected to be a 2 byte slice.
func checksumAdjust(checksumData []byte, accumulator int32) {
	sum := accumulator + (int32(checksumData[0])<<8 | int32(checksumData[1]))

	// A negative total is folded by magnitude and then complemented.
	negative := sum < 0
	if negative {
		sum = -sum
	}

	// Fold the carries above bit 15 back into the low 16 bits.
	sum = (sum >> 16) + (sum & 0xFFFF)
	sum += sum >> 16

	folded := uint16(sum)
	if negative {
		folded = ^folded
	}

	checksumData[0] = byte(folded >> 8)
	checksumData[1] = byte(folded)
}
- /*
- packet debugging snippet:
- import (
- "github.com/google/gopacket"
- "github.com/google/gopacket/layers"
- )
- func tracePacket(where string, packet []byte) {
- var p gopacket.Packet
- if len(packet) > 0 && packet[0]>>4 == 4 {
- p = gopacket.NewPacket(packet, layers.LayerTypeIPv4, gopacket.Default)
- } else {
- p = gopacket.NewPacket(packet, layers.LayerTypeIPv6, gopacket.Default)
- }
- fmt.Printf("[%s packet]:\n%s\n\n", where, p)
- }
- */
// Device manages a tun device. Packet I/O goes through static,
// preallocated buffers in order to avoid GC churn.
type Device struct {
	// name is the interface name; it is "" for devices created by
	// NewClientDeviceFromFD.
	name string

	// writeMutex serializes use of outboundBuffer in WritePacket.
	writeMutex sync.Mutex

	// deviceIO is the underlying tun device handle.
	deviceIO io.ReadWriteCloser

	// Reusable packet buffers for reads and writes, respectively.
	inboundBuffer, outboundBuffer []byte
}
- // NewServerDevice creates and configures a new server tun device.
- // Since the server uses fixed address spaces, only one server
- // device may exist per host.
- func NewServerDevice(config *ServerConfig) (*Device, error) {
- file, deviceName, err := OpenTunDevice("")
- if err != nil {
- return nil, errors.Trace(err)
- }
- err = configureServerInterface(config, deviceName)
- if err != nil {
- _ = file.Close()
- return nil, errors.Trace(err)
- }
- return newDevice(
- deviceName,
- file,
- getMTU(config.MTU)), nil
- }
- // NewClientDevice creates and configures a new client tun device.
- // Multiple client tun devices may exist per host.
- func NewClientDevice(config *ClientConfig) (*Device, error) {
- file, deviceName, err := OpenTunDevice("")
- if err != nil {
- return nil, errors.Trace(err)
- }
- err = configureClientInterface(
- config, deviceName)
- if err != nil {
- _ = file.Close()
- return nil, errors.Trace(err)
- }
- return newDevice(
- deviceName,
- file,
- getMTU(config.MTU)), nil
- }
- func newDevice(
- name string,
- deviceIO io.ReadWriteCloser,
- MTU int) *Device {
- return &Device{
- name: name,
- deviceIO: deviceIO,
- inboundBuffer: makeDeviceInboundBuffer(MTU),
- outboundBuffer: makeDeviceOutboundBuffer(MTU),
- }
- }
- // NewClientDeviceFromFD wraps an existing tun device.
- func NewClientDeviceFromFD(config *ClientConfig) (*Device, error) {
- file, err := fileFromFD(config.TunFileDescriptor, "")
- if err != nil {
- return nil, errors.Trace(err)
- }
- MTU := getMTU(config.MTU)
- return &Device{
- name: "",
- deviceIO: file,
- inboundBuffer: makeDeviceInboundBuffer(MTU),
- outboundBuffer: makeDeviceOutboundBuffer(MTU),
- }, nil
- }
- // Name returns the interface name for a created tun device,
- // or returns "" for a device created by NewClientDeviceFromFD.
- // The interface name may be used for additional network and
- // routing configuration.
- func (device *Device) Name() string {
- return device.name
- }
- // ReadPacket reads one full packet from the tun device. The
- // return value is a slice of a static, reused buffer, so the
- // value is only valid until the next ReadPacket call.
- // Concurrent calls to ReadPacket are _not_ supported.
- func (device *Device) ReadPacket() ([]byte, error) {
- // readTunPacket performs the platform dependent
- // packet read operation.
- offset, size, err := device.readTunPacket()
- if err != nil {
- return nil, errors.Trace(err)
- }
- return device.inboundBuffer[offset : offset+size], nil
- }
- // WritePacket writes one full packet to the tun device.
- // Concurrent calls to WritePacket are supported.
- func (device *Device) WritePacket(packet []byte) error {
- // This mutex ensures that only one concurrent goroutine
- // can use outboundBuffer when writing.
- device.writeMutex.Lock()
- defer device.writeMutex.Unlock()
- // writeTunPacket performs the platform dependent
- // packet write operation.
- err := device.writeTunPacket(packet)
- if err != nil {
- return errors.Trace(err)
- }
- return nil
- }
- // Close interrupts any blocking Read/Write calls and
- // tears down the tun device.
- func (device *Device) Close() error {
- return device.deviceIO.Close()
- }
// Channel manages packet transport over a communications channel. Any
// io.ReadWriteCloser can serve as the transport; in psiphond, it will be
// an SSH channel. Packets are framed with a length header, and channel
// I/O uses static, preallocated buffers to avoid GC churn.
type Channel struct {
	// transport carries the framed packets.
	transport io.ReadWriteCloser

	// Reusable buffers, each holding one length header plus one packet.
	inboundBuffer, outboundBuffer []byte
}
// channelHeaderSize is the size, in bytes, of the length header that
// frames each packet on a Channel. IP packets cannot be larger than 64K,
// so a 16-bit length header is sufficient.
const channelHeaderSize = 2
- // NewChannel initializes a new Channel.
- func NewChannel(transport io.ReadWriteCloser, MTU int) *Channel {
- return &Channel{
- transport: transport,
- inboundBuffer: make([]byte, channelHeaderSize+MTU),
- outboundBuffer: make([]byte, channelHeaderSize+MTU),
- }
- }
- // ReadPacket reads one full packet from the channel. The
- // return value is a slice of a static, reused buffer, so the
- // value is only valid until the next ReadPacket call.
- // Concurrent calls to ReadPacket are not supported.
- func (channel *Channel) ReadPacket() ([]byte, error) {
- header := channel.inboundBuffer[0:channelHeaderSize]
- _, err := io.ReadFull(channel.transport, header)
- if err != nil {
- return nil, errors.Trace(err)
- }
- size := int(binary.BigEndian.Uint16(header))
- if size > len(channel.inboundBuffer[channelHeaderSize:]) {
- return nil, errors.Tracef("packet size exceeds MTU: %d", size)
- }
- packet := channel.inboundBuffer[channelHeaderSize : channelHeaderSize+size]
- _, err = io.ReadFull(channel.transport, packet)
- if err != nil {
- return nil, errors.Trace(err)
- }
- return packet, nil
- }
- // WritePacket writes one full packet to the channel.
- // Concurrent calls to WritePacket are not supported.
- func (channel *Channel) WritePacket(packet []byte) error {
- // Flow control assumed to be provided by the transport. In the case
- // of SSH, the channel window size will determine whether the packet
- // data is transmitted immediately or whether the transport.Write will
- // block. When the channel window is full and transport.Write blocks,
- // the sender's tun device will not be read (client case) or the send
- // queue will fill (server case) and packets will be dropped. In this
- // way, the channel window size will influence the TCP window size for
- // tunneled traffic.
- // When the transport is an SSH channel, the overhead per packet message
- // includes:
- //
- // - SSH_MSG_CHANNEL_DATA: 5 bytes (https://tools.ietf.org/html/rfc4254#section-5.2)
- // - SSH packet: ~28 bytes (https://tools.ietf.org/html/rfc4253#section-5.3), with MAC
- // - TCP/IP transport for SSH: 40 bytes for IPv4
- // Assumes MTU <= 64K and len(packet) <= MTU
- size := len(packet)
- binary.BigEndian.PutUint16(channel.outboundBuffer, uint16(size))
- copy(channel.outboundBuffer[channelHeaderSize:], packet)
- _, err := channel.transport.Write(channel.outboundBuffer[0 : channelHeaderSize+size])
- if err != nil {
- return errors.Trace(err)
- }
- return nil
- }
- // WriteFramedPackets writes a buffer of pre-framed packets to
- // the channel.
- // Concurrent calls to WriteFramedPackets are not supported.
- func (channel *Channel) WriteFramedPackets(packetBuffer []byte) error {
- _, err := channel.transport.Write(packetBuffer)
- if err != nil {
- return errors.Trace(err)
- }
- return nil
- }
- // Close interrupts any blocking Read/Write calls and
- // closes the channel transport.
- func (channel *Channel) Close() error {
- return channel.transport.Close()
- }
|