tun.go 101 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206
  1. /*
  2. * Copyright (c) 2017, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // Copyright 2009 The Go Authors. All rights reserved.
  20. // Use of this source code is governed by a BSD-style
  21. // license that can be found in the LICENSE file.
  22. /*
  23. Package tun is an IP packet tunnel server and client. It supports tunneling
  24. both IPv4 and IPv6.
  25. ......................................................... .-,( ),-.
  26. . [server] .-----. . .-( )-.
  27. . | NIC |<---->( Internet )
  28. . ....................................... '-----' . '-( ).-'
  29. . . [packet tunnel daemon] . ^ . '-.( ).-'
  30. . . . | .
  31. . . ........................... . | .
  32. . . . [session] . . NAT .
  33. . . . . . | .
  34. . . . . . v .
  35. . . . . . .---. .
  36. . . . . . | t | .
  37. . . . . . | u | .
  38. . . . .---. . .---. . | n | .
  39. . . . | q | . | d | . | | .
  40. . . . | u | . | e | . | d | .
  41. . . . .------| e |<-----| m |<---------| e | .
  42. . . . | | u | . | u | . | v | .
  43. . . . | | e | . | x | . | i | .
  44. . . . rewrite '---' . '---' . | c | .
  45. . . . | . . | e | .
  46. . . . v . . '---' .
  47. . . . .---------. . . ^ .
  48. . . . | channel |--rewrite--------------------' .
  49. . . . '---------' . . .
  50. . . ...........^............... . .
  51. . .............|......................... .
  52. ...............|.........................................
  53. |
  54. | (typically via Internet)
  55. |
  56. ...............|.................
  57. . [client] | .
  58. . | .
  59. . .............|............... .
  60. . . v . .
  61. . . .---------. . .
  62. . . | channel | . .
  63. . . '---------' . .
  64. . . ^ . .
  65. . .............|............... .
  66. . v .
  67. . .------------. .
  68. . | tun device | .
  69. . '------------' .
  70. .................................
  71. The client relays IP packets between a local tun device and a channel, which
  72. is a transport to the server. In Psiphon, the channel will be an SSH channel
  73. within an SSH connection to a Psiphon server.
  74. The server relays packets between each client and its own tun device. The
  75. server tun device is NATed to the Internet via an external network interface.
  76. In this way, client traffic is tunneled and will egress from the server host.
  77. Similar to a typical VPN, IP addresses are assigned to each client. Unlike
  78. a typical VPN, the assignment is not transmitted to the client. Instead, the
  79. server transparently rewrites the source addresses of client packets to
  80. the assigned IP address. The server also rewrites the destination address of
  81. certain DNS packets. The purpose of this is to allow clients to reconnect
  82. to different servers without having to tear down or change their local
  83. network configuration. Clients may configure their local tun device with an
  84. arbitrary IP address and an arbitrary DNS resolver address.
  85. The server uses the 24-bit 10.0.0.0/8 IPv4 private address space to maximize
  86. the number of addresses available, due to Psiphon client churn and minimum
  87. address lease time constraints. For IPv6, a 24-bit unique local space is used.
  88. When a client is allocated addresses, a unique, unused 24-bit "index" is
  89. reserved/leased. This index maps to and from IPv4 and IPv6 private addresses.
  90. The server multiplexes all client packets into a single tun device. When a
  91. packet is read, the destination address is used to map the packet back to the
  92. correct index, which maps back to the client.
  93. The server maintains client "sessions". A session maintains client IP
  94. address state and effectively holds the lease on assigned addresses. If a
  95. client is disconnected and quickly reconnects, it will resume its previous
  96. session, retaining its IP address and network connection states. Idle
  97. sessions with no client connection will eventually expire.
  98. Packet count and bytes transferred metrics are logged for each client session.
  99. The server integrates with and enforces Psiphon traffic rules and logging
  100. facilities. The server parses and validates packets. Client-to-client packets
  101. are not permitted. Only global unicast packets are permitted. Only TCP and UDP
  102. packets are permitted. The client also filters out, before sending, packets
  103. that the server won't route.
  104. Certain aspects of packet tunneling are outside the scope of this package;
  105. e.g., the Psiphon client and server are responsible for establishing an SSH
  106. channel and negotiating the correct MTU and DNS settings. The Psiphon
  107. server will call Server.ClientConnected when a client connects and establishes
  108. a packet tunnel channel; and Server.ClientDisconnected when the client closes
  109. the channel and/or disconnects.
  110. */
  111. package tun
  112. import (
  113. "context"
  114. "encoding/binary"
  115. "fmt"
  116. "io"
  117. "math/rand"
  118. "net"
  119. "sync"
  120. "sync/atomic"
  121. "time"
  122. "unsafe"
  123. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  124. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  125. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  126. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  127. )
  128. const (
  129. DEFAULT_MTU = 1500
  130. DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE = 32768 * 16
  131. DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE = 32768
  132. DEFAULT_IDLE_SESSION_EXPIRY_SECONDS = 300
  133. ORPHAN_METRICS_CHECKPOINTER_PERIOD = 30 * time.Minute
  134. FLOW_IDLE_EXPIRY = 60 * time.Second
  135. )
// ServerConfig specifies the configuration of a packet tunnel server.
type ServerConfig struct {

	// Logger is used for logging events and metrics.
	//
	// Logger must be non-nil: Start, Stop, and ClientConnected log through
	// it without a nil check.
	Logger common.Logger

	// SudoNetworkConfigCommands specifies whether to use "sudo"
	// when executing network configuration commands. This is required
	// when the packet tunnel server is not run as root and when
	// process capabilities are not available (only Linux kernel 4.3+
	// has the required capabilities support). The host sudoers file
	// must be configured to allow the tunnel server process user to
	// execute the commands invoked in configureServerInterface; see
	// the implementation for the appropriate platform.
	SudoNetworkConfigCommands bool

	// AllowNoIPv6NetworkConfiguration indicates that failures while
	// configuring tun interfaces and routing for IPv6 are to be
	// logged as warnings only. This option is intended to support
	// test cases on hosts without IPv6 and is not for production use;
	// the packet tunnel server will still accept IPv6 packets and
	// relay them to the tun device.
	// AllowNoIPv6NetworkConfiguration may not be supported on all
	// platforms.
	AllowNoIPv6NetworkConfiguration bool

	// EgressInterface is the interface to which client traffic is
	// masqueraded/NATed. For example, "eth0". If blank, a platform-
	// appropriate default is used.
	EgressInterface string

	// GetDNSResolverIPv4Addresses is a function which returns the
	// DNS resolvers to use as transparent DNS rewrite targets for
	// IPv4 DNS traffic.
	//
	// GetDNSResolverIPv4Addresses is invoked for each new client
	// session and the list of resolvers is stored with the session.
	// This is a compromise between checking current resolvers for
	// each packet (too expensive) and simply passing in a static
	// list (won't pick up resolver changes). As implemented, only
	// new client sessions will pick up resolver changes.
	//
	// Transparent DNS rewriting occurs when the client uses the
	// specific, target transparent DNS addresses specified by
	// GetTransparentDNSResolverIPv4/6Address.
	//
	// For outbound DNS packets with a target resolver IP address,
	// a random resolver is selected and used for the rewrite.
	// For inbound packets, _any_ resolver in the list is rewritten
	// back to the target resolver IP address. As a side-effect,
	// responses to client DNS packets originally destined for a
	// resolver in GetDNSResolverIPv4Addresses will be lost.
	//
	// GetDNSResolverIPv4Addresses must be non-nil: ClientConnected calls
	// it unconditionally when creating a new session. Each returned
	// address is converted with net.IP.To4 and the result is stored
	// without a nil check, so every entry is assumed to be an IPv4
	// address.
	GetDNSResolverIPv4Addresses func() []net.IP

	// GetDNSResolverIPv6Addresses is a function which returns the
	// DNS resolvers to use as transparent DNS rewrite targets for
	// IPv6 DNS traffic. It functions like GetDNSResolverIPv4Addresses.
	//
	// GetDNSResolverIPv6Addresses must be non-nil: ClientConnected calls
	// it unconditionally when creating a new session.
	GetDNSResolverIPv6Addresses func() []net.IP

	// EnableDNSFlowTracking specifies whether to apply flow tracking to DNS
	// flows, as required for DNS quality metrics. Typically there are many
	// short-lived DNS flows to track and each tracked flow adds some overhead,
	// so this defaults to off.
	EnableDNSFlowTracking bool

	// DownstreamPacketQueueSize specifies the size of the downstream
	// packet queue. The packet tunnel server multiplexes all client
	// packets through a single tun device, so when a packet is read,
	// it must be queued or dropped if it cannot be immediately routed
	// to the appropriate client. Note that the TCP and SSH windows
	// for the underlying channel transport will impact transfer rate
	// and queuing.
	// When DownstreamPacketQueueSize is 0, a default value tuned for
	// Psiphon is used.
	DownstreamPacketQueueSize int

	// MTU specifies the maximum transmission unit for the packet
	// tunnel. Clients must be configured with the same MTU. The
	// server's tun device will be set to this MTU value and is
	// assumed not to change for the duration of the server.
	// When MTU is 0, a default value is used.
	MTU int

	// SessionIdleExpirySeconds specifies how long to retain client
	// sessions which have no client attached. Sessions are retained
	// across client connections so reconnecting clients can resume
	// a previous session. Resuming avoids leasing new IP addresses
	// for reconnection, and also retains NAT state for active
	// tunneled connections.
	//
	// SessionIdleExpirySeconds is also, effectively, the lease
	// time for assigned IP addresses.
	SessionIdleExpirySeconds int

	// AllowBogons disables bogon checks. This should be used only
	// for testing.
	AllowBogons bool
}
// Server is a packet tunnel server. A packet tunnel server
// maintains client sessions, relays packets through client
// channels, and multiplexes packets through a single tun
// device. The server assigns IP addresses to clients, performs
// IP address and transparent DNS rewriting, and enforces
// traffic rules.
type Server struct {
	// config is the configuration passed to NewServer.
	config *ServerConfig

	// device is the tun device created by NewServerDevice; Stop closes it
	// to interrupt blocked reads/writes.
	device *Device

	// indexToSession holds *session values (Stop type-asserts them) and is
	// updated by allocateIndex; presumably keyed by the session index.
	indexToSession sync.Map

	// sessionIDToIndex is updated by allocateIndex; presumably maps a
	// session ID to its allocated index.
	sessionIDToIndex sync.Map

	// connectedInProgress counts ClientConnected calls that are currently
	// executing; Stop waits on it before interrupting sessions.
	connectedInProgress *sync.WaitGroup

	// workers counts the goroutines launched by Start; Stop waits on it.
	workers *sync.WaitGroup

	// runContext is cancelled by stopRunning when Stop is called;
	// ClientConnected refuses new clients once it is done.
	runContext  context.Context
	stopRunning context.CancelFunc

	// orphanMetrics presumably accumulates packet metrics that are not
	// attached to a live session, for runOrphanMetricsCheckpointer --
	// confirm against the checkpointer implementation.
	orphanMetrics *packetMetrics
}
  240. // NewServer initializes a server.
  241. func NewServer(config *ServerConfig) (*Server, error) {
  242. device, err := NewServerDevice(config)
  243. if err != nil {
  244. return nil, errors.Trace(err)
  245. }
  246. runContext, stopRunning := context.WithCancel(context.Background())
  247. return &Server{
  248. config: config,
  249. device: device,
  250. connectedInProgress: new(sync.WaitGroup),
  251. workers: new(sync.WaitGroup),
  252. runContext: runContext,
  253. stopRunning: stopRunning,
  254. orphanMetrics: new(packetMetrics),
  255. }, nil
  256. }
  257. // Start starts a server and returns with it running.
  258. func (server *Server) Start() {
  259. server.config.Logger.WithTrace().Info("starting")
  260. server.workers.Add(1)
  261. go server.runSessionReaper()
  262. server.workers.Add(1)
  263. go server.runOrphanMetricsCheckpointer()
  264. server.workers.Add(1)
  265. go server.runDeviceDownstream()
  266. }
  267. // Stop halts a running server.
  268. func (server *Server) Stop() {
  269. server.config.Logger.WithTrace().Info("stopping")
  270. server.stopRunning()
  271. // Interrupt blocked device read/writes.
  272. server.device.Close()
  273. // Wait for any in-progress ClientConnected calls to complete.
  274. server.connectedInProgress.Wait()
  275. // After this point, no further clients will be added: all
  276. // in-progress ClientConnected calls have finished; and any
  277. // later ClientConnected calls won't get past their
  278. // server.runContext.Done() checks.
  279. // Close all clients. Client workers will be joined
  280. // by the following server.workers.Wait().
  281. server.indexToSession.Range(func(_, value interface{}) bool {
  282. session := value.(*session)
  283. server.interruptSession(session)
  284. return true
  285. })
  286. server.workers.Wait()
  287. server.config.Logger.WithTrace().Info("stopped")
  288. }
// AllowedPortChecker is a function which returns true when it is
// permitted to relay packets to the specified upstream IP address
// and/or port.
//
// ClientConnected takes one checker for TCP and one for UDP. Per the
// ClientConnected documentation, the checker is called for each flow and
// must be efficient and safe for concurrent calls.
type AllowedPortChecker func(upstreamIPAddress net.IP, port int) bool
// AllowedDomainChecker is a function which returns true when it is
// permitted to resolve the specified domain name.
//
// Per the ClientConnected documentation, it is called for upstream DNS
// query packets and must be efficient and safe for concurrent calls.
type AllowedDomainChecker func(string) bool
// FlowActivityUpdater defines an interface for receiving updates for
// flow activity. Values passed to UpdateProgress are bytes transferred
// and flow duration since the previous UpdateProgress.
type FlowActivityUpdater interface {
	// UpdateProgress receives the downstream and upstream byte counts and
	// the elapsed duration, in nanoseconds, since the previous call.
	UpdateProgress(downstreamBytes, upstreamBytes, durationNanoseconds int64)
}
// FlowActivityUpdaterMaker is a function which returns a list of
// appropriate updaters for a new flow to the specified upstream
// hostname (if known -- may be ""), and IP address.
// The flow is TCP when isTCP is true, and UDP otherwise.
//
// Per the ClientConnected documentation, it is invoked for each new
// packet flow.
type FlowActivityUpdaterMaker func(
	isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []FlowActivityUpdater
// MetricsUpdater is a function which receives a checkpoint summary
// of application bytes transferred through a packet tunnel.
//
// The four values are the TCP downstream, TCP upstream, UDP downstream,
// and UDP upstream application byte counts, in that order. Per the
// ClientConnected documentation, it is invoked at metrics checkpoints,
// usually when the client disconnects.
type MetricsUpdater func(
	TCPApplicationBytesDown, TCPApplicationBytesUp,
	UDPApplicationBytesDown, UDPApplicationBytesUp int64)
// DNSQualityReporter is a function which receives a DNS quality report:
// whether a DNS request received a response, the elapsed time, and the
// resolver used.
type DNSQualityReporter func(
	receivedResponse bool, requestDuration time.Duration, resolverIP net.IP)
  318. // ClientConnected handles new client connections, creating or resuming
  319. // a session and returns with client packet handlers running.
  320. //
  321. // sessionID is used to identify sessions for resumption.
  322. //
  323. // transport provides the channel for relaying packets to and from
  324. // the client.
  325. //
  326. // checkAllowedTCPPortFunc/checkAllowedUDPPortFunc/checkAllowedDomainFunc
  327. // are callbacks used to enforce traffic rules. For each TCP/UDP flow, the
  328. // corresponding AllowedPort function is called to check if traffic to the
  329. // packet's port is permitted. For upstream DNS query packets,
  330. // checkAllowedDomainFunc is called to check if domain resolution is
  331. // permitted. These callbacks must be efficient and safe for concurrent
  332. // calls.
  333. //
  334. // flowActivityUpdaterMaker is a callback invoked for each new packet
  335. // flow; it may create updaters to track flow activity.
  336. //
  337. // metricsUpdater is a callback invoked at metrics checkpoints (usually
  338. // when the client disconnects) with a summary of application bytes
  339. // transferred.
  340. //
  341. // It is safe to make concurrent calls to ClientConnected for distinct
  342. // session IDs. The caller is responsible for serializing calls with the
  343. // same session ID. Further, the caller must ensure, in the case of a client
  344. // transport reconnect when an existing transport has not yet disconnected,
  345. // that ClientDisconnected is called first -- so it doesn't undo the new
  346. // ClientConnected. (psiphond meets these constraints by closing any
  347. // existing SSH client with duplicate session ID early in the lifecycle of
  348. // a new SSH client connection.)
  349. func (server *Server) ClientConnected(
  350. sessionID string,
  351. transport io.ReadWriteCloser,
  352. checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
  353. checkAllowedDomainFunc AllowedDomainChecker,
  354. flowActivityUpdaterMaker FlowActivityUpdaterMaker,
  355. metricsUpdater MetricsUpdater,
  356. dnsQualityReporter DNSQualityReporter) error {
  357. // It's unusual to call both sync.WaitGroup.Add() _and_ Done() in the same
  358. // goroutine. There's no other place to call Add() since ClientConnected is
  359. // an API entrypoint. And Done() works because the invariant enforced by
  360. // connectedInProgress.Wait() is not that no ClientConnected calls are in
  361. // progress, but that no such calls are in progress past the
  362. // server.runContext.Done() check.
  363. // TODO: will this violate https://golang.org/pkg/sync/#WaitGroup.Add:
  364. // "calls with a positive delta that occur when the counter is zero must happen before a Wait"?
  365. server.connectedInProgress.Add(1)
  366. defer server.connectedInProgress.Done()
  367. select {
  368. case <-server.runContext.Done():
  369. return errors.TraceNew("server stopping")
  370. default:
  371. }
  372. server.config.Logger.WithTraceFields(
  373. common.LogFields{"sessionID": sessionID}).Debug("client connected")
  374. MTU := getMTU(server.config.MTU)
  375. clientSession := server.getSession(sessionID)
  376. if clientSession != nil {
  377. // Call interruptSession to ensure session is in the
  378. // expected idle state.
  379. server.interruptSession(clientSession)
  380. // Note: we don't check the session expiry; whether it has
  381. // already expired and not yet been reaped; or is about
  382. // to expire very shortly. It could happen that the reaper
  383. // will kill this session between now and when the expiry
  384. // is reset in the following resumeSession call. In this
  385. // unlikely case, the packet tunnel client should reconnect.
  386. } else {
  387. // Store IPv4 resolver addresses in 4-byte representation
  388. // for use in rewritting.
  389. resolvers := server.config.GetDNSResolverIPv4Addresses()
  390. DNSResolverIPv4Addresses := make([]net.IP, len(resolvers))
  391. for i, resolver := range resolvers {
  392. // Assumes To4 is non-nil
  393. DNSResolverIPv4Addresses[i] = resolver.To4()
  394. }
  395. clientSession = &session{
  396. allowBogons: server.config.AllowBogons,
  397. lastActivity: int64(monotime.Now()),
  398. sessionID: sessionID,
  399. metrics: new(packetMetrics),
  400. enableDNSFlowTracking: server.config.EnableDNSFlowTracking,
  401. DNSResolverIPv4Addresses: append([]net.IP(nil), DNSResolverIPv4Addresses...),
  402. DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
  403. workers: new(sync.WaitGroup),
  404. }
  405. // One-time, for this session, random resolver selection for TCP transparent
  406. // DNS forwarding. See comment in processPacket.
  407. if len(clientSession.DNSResolverIPv4Addresses) > 0 {
  408. clientSession.TCPDNSResolverIPv4Index = prng.Intn(len(clientSession.DNSResolverIPv4Addresses))
  409. }
  410. if len(clientSession.DNSResolverIPv6Addresses) > 0 {
  411. clientSession.TCPDNSResolverIPv6Index = prng.Intn(len(clientSession.DNSResolverIPv6Addresses))
  412. }
  413. // allocateIndex initializes session.index, session.assignedIPv4Address,
  414. // and session.assignedIPv6Address; and updates server.indexToSession and
  415. // server.sessionIDToIndex.
  416. err := server.allocateIndex(clientSession)
  417. if err != nil {
  418. return errors.Trace(err)
  419. }
  420. }
  421. // Note: it's possible that a client disconnects (or reconnects before a
  422. // disconnect is detected) and interruptSession is called between
  423. // allocateIndex and resumeSession calls here, so interruptSession and
  424. // related code must not assume resumeSession has been called.
  425. server.resumeSession(
  426. clientSession,
  427. NewChannel(transport, MTU),
  428. checkAllowedTCPPortFunc,
  429. checkAllowedUDPPortFunc,
  430. checkAllowedDomainFunc,
  431. flowActivityUpdaterMaker,
  432. metricsUpdater,
  433. dnsQualityReporter)
  434. return nil
  435. }
  436. // ClientDisconnected handles clients disconnecting. Packet handlers
  437. // are halted, but the client session is left intact to reserve the
  438. // assigned IP addresses and retain network state in case the client
  439. // soon reconnects.
  440. func (server *Server) ClientDisconnected(sessionID string) {
  441. session := server.getSession(sessionID)
  442. if session != nil {
  443. server.config.Logger.WithTraceFields(
  444. common.LogFields{"sessionID": sessionID}).Debug("client disconnected")
  445. server.interruptSession(session)
  446. }
  447. }
  448. func (server *Server) getSession(sessionID string) *session {
  449. if index, ok := server.sessionIDToIndex.Load(sessionID); ok {
  450. s, ok := server.indexToSession.Load(index.(int32))
  451. if ok {
  452. return s.(*session)
  453. }
  454. server.config.Logger.WithTrace().Warning("unexpected missing session")
  455. }
  456. return nil
  457. }
  458. func (server *Server) resumeSession(
  459. session *session,
  460. channel *Channel,
  461. checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
  462. checkAllowedDomainFunc AllowedDomainChecker,
  463. flowActivityUpdaterMaker FlowActivityUpdaterMaker,
  464. metricsUpdater MetricsUpdater,
  465. dnsQualityReporter DNSQualityReporter) {
  466. session.mutex.Lock()
  467. defer session.mutex.Unlock()
  468. // Performance/concurrency note: the downstream packet queue
  469. // and various packet event callbacks may be accessed while
  470. // the session is idle, via the runDeviceDownstream goroutine,
  471. // which runs concurrent to resumeSession/interruptSession calls.
  472. // Consequently, all accesses to these fields must be
  473. // synchronized.
  474. //
  475. // Benchmarking indicates the atomic.LoadPointer mechanism
  476. // outperforms a mutex; approx. 2 ns/op vs. 20 ns/op in the case
  477. // of getCheckAllowedTCPPortFunc. Since these accesses occur
  478. // multiple times per packet, atomic.LoadPointer is used and so
  479. // each of these fields is an unsafe.Pointer in the session
  480. // struct.
  481. // Begin buffering downstream packets.
  482. downstreamPacketQueueSize := DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
  483. if server.config.DownstreamPacketQueueSize > 0 {
  484. downstreamPacketQueueSize = server.config.DownstreamPacketQueueSize
  485. }
  486. downstreamPackets := NewPacketQueue(downstreamPacketQueueSize)
  487. session.setDownstreamPackets(downstreamPackets)
  488. // Set new access control, flow monitoring, and metrics
  489. // callbacks; all associated with the new client connection.
  490. // IMPORTANT: any new callbacks or references to the outer client added
  491. // here must be cleared in interruptSession to ensure that a paused
  492. // session does not retain references to old client connection objects
  493. // after the client disconnects.
  494. session.setCheckAllowedTCPPortFunc(&checkAllowedTCPPortFunc)
  495. session.setCheckAllowedUDPPortFunc(&checkAllowedUDPPortFunc)
  496. session.setCheckAllowedDomainFunc(&checkAllowedDomainFunc)
  497. session.setFlowActivityUpdaterMaker(&flowActivityUpdaterMaker)
  498. session.setMetricsUpdater(&metricsUpdater)
  499. session.setDNSQualityReporter(&dnsQualityReporter)
  500. session.channel = channel
  501. // Parent context is not server.runContext so that session workers
  502. // need only check session.stopRunning to act on shutdown events.
  503. session.runContext, session.stopRunning = context.WithCancel(context.Background())
  504. // When a session is interrupted, all goroutines in session.workers
  505. // are joined. When the server is stopped, all goroutines in
  506. // server.workers are joined. So, in both cases we synchronously
  507. // stop all workers associated with this session.
  508. session.workers.Add(1)
  509. go server.runClientUpstream(session)
  510. session.workers.Add(1)
  511. go server.runClientDownstream(session)
  512. session.touch()
  513. }
  514. func (server *Server) interruptSession(session *session) {
  515. session.mutex.Lock()
  516. defer session.mutex.Unlock()
  517. wasRunning := (session.channel != nil)
  518. if session.stopRunning != nil {
  519. session.stopRunning()
  520. }
  521. if session.channel != nil {
  522. // Interrupt blocked channel read/writes.
  523. session.channel.Close()
  524. }
  525. session.workers.Wait()
  526. if session.channel != nil {
  527. // Don't hold a reference to channel, allowing both it and
  528. // its conn to be garbage collected.
  529. // Setting channel to nil must happen after workers.Wait()
  530. // to ensure no goroutine remains which may access
  531. // session.channel.
  532. session.channel = nil
  533. }
  534. metricsUpdater := session.getMetricsUpdater()
  535. // interruptSession may be called for idle sessions, to ensure
  536. // the session is in an expected state: in ClientConnected,
  537. // and in server.Stop(); don't log in those cases.
  538. if wasRunning {
  539. session.metrics.checkpoint(
  540. server.config.Logger,
  541. metricsUpdater,
  542. "server_packet_metrics",
  543. packetMetricsAll)
  544. }
  545. // Release the downstream packet buffer, so the associated
  546. // memory is not consumed while no client is connected.
  547. //
  548. // Since runDeviceDownstream continues to run and will access
  549. // session.downstreamPackets, an atomic pointer is used to
  550. // synchronize access.
  551. session.setDownstreamPackets(nil)
  552. session.setCheckAllowedTCPPortFunc(nil)
  553. session.setCheckAllowedUDPPortFunc(nil)
  554. session.setCheckAllowedDomainFunc(nil)
  555. session.setFlowActivityUpdaterMaker(nil)
  556. session.setMetricsUpdater(nil)
  557. session.setDNSQualityReporter(nil)
  558. }
  559. func (server *Server) runSessionReaper() {
  560. defer server.workers.Done()
  561. // Periodically iterate over all sessions and discard expired
  562. // sessions. This action, removing the index from server.indexToSession,
  563. // releases the IP addresses assigned to the session.
  564. // TODO: As-is, this will discard sessions for live SSH tunnels,
  565. // as long as the SSH channel for such a session has been idle for
  566. // a sufficient period. Should the session be retained as long as
  567. // the SSH tunnel is alive (e.g., expose and call session.touch()
  568. // on keepalive events)? Or is it better to free up resources held
  569. // by idle sessions?
  570. idleExpiry := server.sessionIdleExpiry()
  571. ticker := time.NewTicker(idleExpiry / 2)
  572. defer ticker.Stop()
  573. for {
  574. select {
  575. case <-ticker.C:
  576. server.indexToSession.Range(func(_, value interface{}) bool {
  577. session := value.(*session)
  578. if session.expired(idleExpiry) {
  579. server.removeSession(session)
  580. }
  581. return true
  582. })
  583. case <-server.runContext.Done():
  584. return
  585. }
  586. }
  587. }
  588. func (server *Server) sessionIdleExpiry() time.Duration {
  589. sessionIdleExpirySeconds := DEFAULT_IDLE_SESSION_EXPIRY_SECONDS
  590. if server.config.SessionIdleExpirySeconds > 2 {
  591. sessionIdleExpirySeconds = server.config.SessionIdleExpirySeconds
  592. }
  593. return time.Duration(sessionIdleExpirySeconds) * time.Second
  594. }
  595. func (server *Server) removeSession(session *session) {
  596. server.sessionIDToIndex.Delete(session.sessionID)
  597. server.indexToSession.Delete(session.index)
  598. server.interruptSession(session)
  599. // Delete flows to ensure any pending flow metrics are reported.
  600. session.deleteFlows()
  601. }
  602. func (server *Server) runOrphanMetricsCheckpointer() {
  603. defer server.workers.Done()
  604. // Periodically log orphan packet metrics. Orphan metrics
  605. // are not associated with any session. This includes
  606. // packets that are rejected before they can be associated
  607. // with a session.
  608. ticker := time.NewTicker(ORPHAN_METRICS_CHECKPOINTER_PERIOD)
  609. defer ticker.Stop()
  610. for {
  611. done := false
  612. select {
  613. case <-ticker.C:
  614. case <-server.runContext.Done():
  615. done = true
  616. }
  617. // TODO: skip log if all zeros?
  618. server.orphanMetrics.checkpoint(
  619. server.config.Logger,
  620. nil,
  621. "server_orphan_packet_metrics",
  622. packetMetricsRejected)
  623. if done {
  624. return
  625. }
  626. }
  627. }
  628. func (server *Server) runDeviceDownstream() {
  629. defer server.workers.Done()
  630. // Read incoming packets from the tun device, parse and validate the
  631. // packets, map them to a session/client, perform rewriting, and relay
  632. // the packets to the client.
  633. for {
  634. readPacket, err := server.device.ReadPacket()
  635. select {
  636. case <-server.runContext.Done():
  637. // No error is logged as shutdown may have interrupted read.
  638. return
  639. default:
  640. }
  641. if err != nil {
  642. server.config.Logger.WithTraceFields(
  643. common.LogFields{"error": err}).Warning("read device packet failed")
  644. // May be temporary error condition, keep reading.
  645. continue
  646. }
  647. // destinationIPAddress determines which client receives this packet.
  648. // At this point, only enough of the packet is inspected to determine
  649. // this routing info; further validation happens in subsequent
  650. // processPacket in runClientDownstream.
  651. // Note that masquerading/NAT stands between the Internet and the tun
  652. // device, so arbitrary packets cannot be sent through to this point.
  653. // TODO: getPacketDestinationIPAddress and processPacket perform redundant
  654. // packet parsing; refactor to avoid extra work?
  655. destinationIPAddress, ok := getPacketDestinationIPAddress(
  656. server.orphanMetrics, packetDirectionServerDownstream, readPacket)
  657. if !ok {
  658. // Packet is dropped. Reason will be counted in orphan metrics.
  659. continue
  660. }
  661. // Map destination IP address to client session.
  662. index := server.convertIPAddressToIndex(destinationIPAddress)
  663. s, ok := server.indexToSession.Load(index)
  664. if !ok {
  665. server.orphanMetrics.rejectedPacket(
  666. packetDirectionServerDownstream, packetRejectNoSession)
  667. continue
  668. }
  669. session := s.(*session)
  670. downstreamPackets := session.getDownstreamPackets()
  671. // No downstreamPackets buffer is maintained when no client is
  672. // connected, so the packet is dropped.
  673. if downstreamPackets == nil {
  674. server.orphanMetrics.rejectedPacket(
  675. packetDirectionServerDownstream, packetRejectNoClient)
  676. continue
  677. }
  678. // Simply enqueue the packet for client handling, and move on to
  679. // read the next packet. The packet tunnel server multiplexes all
  680. // client packets through a single tun device, so we must not block
  681. // on client channel I/O here.
  682. //
  683. // When the queue is full, the packet is dropped. This is standard
  684. // behavior for routers, VPN servers, etc.
  685. //
  686. // TODO: processPacket is performed here, instead of runClientDownstream,
  687. // since packets are packed contiguously into the packet queue and if
  688. // the packet it to be omitted, that should be done before enqueuing.
  689. // The potential downside is that all packet processing is done in this
  690. // single thread of execution, blocking the next packet for the next
  691. // client. Try handing off the packet to another worker which will
  692. // call processPacket and Enqueue?
  693. // In downstream mode, processPacket rewrites the destination address
  694. // to the original client source IP address, and also rewrites DNS
  695. // packets. As documented in runClientUpstream, the original address
  696. // should already be populated via an upstream packet; if not, the
  697. // packet will be rejected.
  698. if !processPacket(
  699. session.metrics,
  700. session,
  701. nil,
  702. packetDirectionServerDownstream,
  703. readPacket) {
  704. // Packet is rejected and dropped. Reason will be counted in metrics.
  705. continue
  706. }
  707. downstreamPackets.Enqueue(readPacket)
  708. }
  709. }
  710. func (server *Server) runClientUpstream(session *session) {
  711. defer session.workers.Done()
  712. // Read incoming packets from the client channel, validate the packets,
  713. // perform rewriting, and send them through to the tun device.
  714. for {
  715. readPacket, err := session.channel.ReadPacket()
  716. select {
  717. case <-session.runContext.Done():
  718. // No error is logged as shutdown may have interrupted read.
  719. return
  720. default:
  721. }
  722. if err != nil {
  723. // Debug since channel I/O errors occur during normal operation.
  724. server.config.Logger.WithTraceFields(
  725. common.LogFields{"error": err}).Debug("read channel packet failed")
  726. // Tear down the session. Must be invoked asynchronously.
  727. go server.interruptSession(session)
  728. return
  729. }
  730. session.touch()
  731. // processPacket transparently rewrites the source address to the
  732. // session's assigned address and rewrites the destination of any
  733. // DNS packets destined to the transparent DNS resolver.
  734. //
  735. // The first time the source address is rewritten, the original
  736. // value is recorded so inbound packets can have the reverse
  737. // rewrite applied. This assumes that the client will send a
  738. // packet before receiving any packet, which is the case since
  739. // only clients can initiate TCP or UDP connections or flows.
  740. if !processPacket(
  741. session.metrics,
  742. session,
  743. nil,
  744. packetDirectionServerUpstream,
  745. readPacket) {
  746. // Packet is rejected and dropped. Reason will be counted in metrics.
  747. continue
  748. }
  749. err = server.device.WritePacket(readPacket)
  750. if err != nil {
  751. server.config.Logger.WithTraceFields(
  752. common.LogFields{"error": err}).Warning("write device packet failed")
  753. // May be temporary error condition, keep working. The packet is
  754. // most likely dropped.
  755. continue
  756. }
  757. }
  758. }
  759. func (server *Server) runClientDownstream(session *session) {
  760. defer session.workers.Done()
  761. // Dequeue, process, and relay packets to be sent to the client channel.
  762. for {
  763. downstreamPackets := session.getDownstreamPackets()
  764. // Note: downstreamPackets will not be nil, since this goroutine only
  765. // runs while the session has a connected client.
  766. packetBuffer, ok := downstreamPackets.DequeueFramedPackets(session.runContext)
  767. if !ok {
  768. // Dequeue aborted due to session.runContext.Done()
  769. return
  770. }
  771. err := session.channel.WriteFramedPackets(packetBuffer)
  772. if err != nil {
  773. // Debug since channel I/O errors occur during normal operation.
  774. server.config.Logger.WithTraceFields(
  775. common.LogFields{"error": err}).Debug("write channel packets failed")
  776. downstreamPackets.Replace(packetBuffer)
  777. // Tear down the session. Must be invoked asynchronously.
  778. go server.interruptSession(session)
  779. return
  780. }
  781. session.touch()
  782. downstreamPackets.Replace(packetBuffer)
  783. }
  784. }
  785. var (
  786. serverIPv4AddressCIDR = "10.0.0.1/8"
  787. transparentDNSResolverIPv4Address = net.ParseIP("10.0.0.2").To4() // 4-byte for rewriting
  788. _, privateSubnetIPv4, _ = net.ParseCIDR("10.0.0.0/8")
  789. assignedIPv4AddressTemplate = "10.%d.%d.%d"
  790. serverIPv6AddressCIDR = "fd19:ca83:e6d5:1c44:0000:0000:0000:0001/64"
  791. transparentDNSResolverIPv6Address = net.ParseIP("fd19:ca83:e6d5:1c44:0000:0000:0000:0002")
  792. _, privateSubnetIPv6, _ = net.ParseCIDR("fd19:ca83:e6d5:1c44::/64")
  793. assignedIPv6AddressTemplate = "fd19:ca83:e6d5:1c44:8c57:4434:ee%02x:%02x%02x"
  794. )
  795. func (server *Server) allocateIndex(newSession *session) error {
  796. // Find and assign an available index in the 24-bit index space.
  797. // The index directly maps to and so determines the assigned
  798. // IPv4 and IPv6 addresses.
  799. // Search is a random index selection followed by a linear probe.
  800. // TODO: is this the most effective (fast on average, simple) algorithm?
  801. max := 0x00FFFFFF
  802. randomInt := prng.Intn(max + 1)
  803. index := int32(randomInt)
  804. index &= int32(max)
  805. idleExpiry := server.sessionIdleExpiry()
  806. for tries := 0; tries < 100000; index++ {
  807. tries++
  808. // The index/address space isn't exactly 24-bits:
  809. // - 0 and 0x00FFFFFF are reserved since they map to
  810. // the network identifier (10.0.0.0) and broadcast
  811. // address (10.255.255.255) respectively
  812. // - 1 is reserved as the server tun device address,
  813. // (10.0.0.1, and IPv6 equivalent)
  814. // - 2 is reserved as the transparent DNS target
  815. // address (10.0.0.2, and IPv6 equivalent)
  816. if index <= 2 {
  817. continue
  818. }
  819. if index == 0x00FFFFFF {
  820. index = 0
  821. continue
  822. }
  823. IPv4Address := server.convertIndexToIPv4Address(index).To4()
  824. IPv6Address := server.convertIndexToIPv6Address(index)
  825. // Ensure that the index converts to valid IPs. This is not expected
  826. // to fail, but continuing with nil IPs will silently misroute
  827. // packets with rewritten source IPs.
  828. if IPv4Address == nil || IPv6Address == nil {
  829. server.config.Logger.WithTraceFields(
  830. common.LogFields{"index": index}).Warning("convert index to IP address failed")
  831. continue
  832. }
  833. if s, ok := server.indexToSession.LoadOrStore(index, newSession); ok {
  834. // Index is already in use or acquired concurrently.
  835. // If the existing session is expired, reap it and try again
  836. // to acquire it.
  837. existingSession := s.(*session)
  838. if existingSession.expired(idleExpiry) {
  839. server.removeSession(existingSession)
  840. // Try to acquire this index again. We can't fall through and
  841. // use this index as removeSession has cleared indexToSession.
  842. index--
  843. }
  844. continue
  845. }
  846. // Note: the To4() for assignedIPv4Address is essential since
  847. // that address value is assumed to be 4 bytes when rewriting.
  848. newSession.index = index
  849. newSession.assignedIPv4Address = IPv4Address
  850. newSession.assignedIPv6Address = IPv6Address
  851. server.sessionIDToIndex.Store(newSession.sessionID, index)
  852. server.resetRouting(newSession.assignedIPv4Address, newSession.assignedIPv6Address)
  853. return nil
  854. }
  855. return errors.TraceNew("unallocated index not found")
  856. }
  857. func (server *Server) resetRouting(IPv4Address, IPv6Address net.IP) {
  858. // Attempt to clear the NAT table of any existing connection
  859. // states. This will prevent the (already unlikely) delivery
  860. // of packets to the wrong client when an assigned IP address is
  861. // recycled. Silently has no effect on some platforms, see
  862. // resetNATTables implementations.
  863. err := resetNATTables(server.config, IPv4Address)
  864. if err != nil {
  865. server.config.Logger.WithTraceFields(
  866. common.LogFields{"error": err}).Warning("reset IPv4 routing failed")
  867. }
  868. err = resetNATTables(server.config, IPv6Address)
  869. if err != nil {
  870. server.config.Logger.WithTraceFields(
  871. common.LogFields{"error": err}).Warning("reset IPv6 routing failed")
  872. }
  873. }
  874. func (server *Server) convertIPAddressToIndex(IP net.IP) int32 {
  875. // Assumes IP is at least 3 bytes.
  876. size := len(IP)
  877. return int32(IP[size-3])<<16 | int32(IP[size-2])<<8 | int32(IP[size-1])
  878. }
  879. func (server *Server) convertIndexToIPv4Address(index int32) net.IP {
  880. return net.ParseIP(
  881. fmt.Sprintf(
  882. assignedIPv4AddressTemplate,
  883. (index>>16)&0xFF,
  884. (index>>8)&0xFF,
  885. index&0xFF))
  886. }
  887. func (server *Server) convertIndexToIPv6Address(index int32) net.IP {
  888. return net.ParseIP(
  889. fmt.Sprintf(
  890. assignedIPv6AddressTemplate,
  891. (index>>16)&0xFF,
  892. (index>>8)&0xFF,
  893. index&0xFF))
  894. }
  895. type session struct {
  896. // Note: 64-bit ints used with atomic operations are placed
  897. // at the start of struct to ensure 64-bit alignment.
  898. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  899. lastActivity int64
  900. lastFlowReapIndex int64
  901. downstreamPackets unsafe.Pointer
  902. checkAllowedTCPPortFunc unsafe.Pointer
  903. checkAllowedUDPPortFunc unsafe.Pointer
  904. checkAllowedDomainFunc unsafe.Pointer
  905. flowActivityUpdaterMaker unsafe.Pointer
  906. metricsUpdater unsafe.Pointer
  907. dnsQualityReporter unsafe.Pointer
  908. allowBogons bool
  909. metrics *packetMetrics
  910. sessionID string
  911. index int32
  912. enableDNSFlowTracking bool
  913. DNSResolverIPv4Addresses []net.IP
  914. TCPDNSResolverIPv4Index int
  915. assignedIPv4Address net.IP
  916. setOriginalIPv4Address int32
  917. originalIPv4Address net.IP
  918. DNSResolverIPv6Addresses []net.IP
  919. TCPDNSResolverIPv6Index int
  920. assignedIPv6Address net.IP
  921. setOriginalIPv6Address int32
  922. originalIPv6Address net.IP
  923. flows sync.Map
  924. workers *sync.WaitGroup
  925. mutex sync.Mutex
  926. channel *Channel
  927. runContext context.Context
  928. stopRunning context.CancelFunc
  929. }
  930. func (session *session) touch() {
  931. atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
  932. }
  933. func (session *session) expired(idleExpiry time.Duration) bool {
  934. lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
  935. return monotime.Since(lastActivity) > idleExpiry
  936. }
  937. func (session *session) setOriginalIPv4AddressIfNotSet(IPAddress net.IP) {
  938. if !atomic.CompareAndSwapInt32(&session.setOriginalIPv4Address, 0, 1) {
  939. return
  940. }
  941. // Make a copy of IPAddress; don't reference a slice of a reusable
  942. // packet buffer, which will be overwritten.
  943. session.originalIPv4Address = net.IP(append([]byte(nil), []byte(IPAddress)...))
  944. }
  945. func (session *session) getOriginalIPv4Address() net.IP {
  946. if atomic.LoadInt32(&session.setOriginalIPv4Address) == 0 {
  947. return nil
  948. }
  949. return session.originalIPv4Address
  950. }
  951. func (session *session) setOriginalIPv6AddressIfNotSet(IPAddress net.IP) {
  952. if !atomic.CompareAndSwapInt32(&session.setOriginalIPv6Address, 0, 1) {
  953. return
  954. }
  955. // Make a copy of IPAddress.
  956. session.originalIPv6Address = net.IP(append([]byte(nil), []byte(IPAddress)...))
  957. }
  958. func (session *session) getOriginalIPv6Address() net.IP {
  959. if atomic.LoadInt32(&session.setOriginalIPv6Address) == 0 {
  960. return nil
  961. }
  962. return session.originalIPv6Address
  963. }
  964. func (session *session) setDownstreamPackets(p *PacketQueue) {
  965. atomic.StorePointer(&session.downstreamPackets, unsafe.Pointer(p))
  966. }
  967. func (session *session) getDownstreamPackets() *PacketQueue {
  968. return (*PacketQueue)(atomic.LoadPointer(&session.downstreamPackets))
  969. }
  970. func (session *session) setCheckAllowedTCPPortFunc(p *AllowedPortChecker) {
  971. atomic.StorePointer(&session.checkAllowedTCPPortFunc, unsafe.Pointer(p))
  972. }
  973. func (session *session) getCheckAllowedTCPPortFunc() AllowedPortChecker {
  974. p := (*AllowedPortChecker)(atomic.LoadPointer(&session.checkAllowedTCPPortFunc))
  975. if p == nil {
  976. return nil
  977. }
  978. return *p
  979. }
  980. func (session *session) setCheckAllowedUDPPortFunc(p *AllowedPortChecker) {
  981. atomic.StorePointer(&session.checkAllowedUDPPortFunc, unsafe.Pointer(p))
  982. }
  983. func (session *session) getCheckAllowedUDPPortFunc() AllowedPortChecker {
  984. p := (*AllowedPortChecker)(atomic.LoadPointer(&session.checkAllowedUDPPortFunc))
  985. if p == nil {
  986. return nil
  987. }
  988. return *p
  989. }
  990. func (session *session) setCheckAllowedDomainFunc(p *AllowedDomainChecker) {
  991. atomic.StorePointer(&session.checkAllowedDomainFunc, unsafe.Pointer(p))
  992. }
  993. func (session *session) getCheckAllowedDomainFunc() AllowedDomainChecker {
  994. p := (*AllowedDomainChecker)(atomic.LoadPointer(&session.checkAllowedDomainFunc))
  995. if p == nil {
  996. return nil
  997. }
  998. return *p
  999. }
  1000. func (session *session) setFlowActivityUpdaterMaker(p *FlowActivityUpdaterMaker) {
  1001. atomic.StorePointer(&session.flowActivityUpdaterMaker, unsafe.Pointer(p))
  1002. }
  1003. func (session *session) getFlowActivityUpdaterMaker() FlowActivityUpdaterMaker {
  1004. p := (*FlowActivityUpdaterMaker)(atomic.LoadPointer(&session.flowActivityUpdaterMaker))
  1005. if p == nil {
  1006. return nil
  1007. }
  1008. return *p
  1009. }
  1010. func (session *session) setMetricsUpdater(p *MetricsUpdater) {
  1011. atomic.StorePointer(&session.metricsUpdater, unsafe.Pointer(p))
  1012. }
  1013. func (session *session) getMetricsUpdater() MetricsUpdater {
  1014. p := (*MetricsUpdater)(atomic.LoadPointer(&session.metricsUpdater))
  1015. if p == nil {
  1016. return nil
  1017. }
  1018. return *p
  1019. }
  1020. func (session *session) setDNSQualityReporter(p *DNSQualityReporter) {
  1021. atomic.StorePointer(&session.dnsQualityReporter, unsafe.Pointer(p))
  1022. }
  1023. func (session *session) getDNSQualityReporter() DNSQualityReporter {
  1024. p := (*DNSQualityReporter)(atomic.LoadPointer(&session.dnsQualityReporter))
  1025. if p == nil {
  1026. return nil
  1027. }
  1028. return *p
  1029. }
  1030. // flowID identifies an IP traffic flow using the conventional
  1031. // network 5-tuple. flowIDs track bidirectional flows.
  1032. type flowID struct {
  1033. downstreamIPAddress [net.IPv6len]byte
  1034. downstreamPort uint16
  1035. upstreamIPAddress [net.IPv6len]byte
  1036. upstreamPort uint16
  1037. protocol internetProtocol
  1038. }
  1039. // From: https://github.com/golang/go/blob/b88efc7e7ac15f9e0b5d8d9c82f870294f6a3839/src/net/ip.go#L55
  1040. var v4InV6Prefix = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}
  1041. func (f *flowID) set(
  1042. downstreamIPAddress net.IP,
  1043. downstreamPort uint16,
  1044. upstreamIPAddress net.IP,
  1045. upstreamPort uint16,
  1046. protocol internetProtocol) {
  1047. if len(downstreamIPAddress) == net.IPv4len {
  1048. copy(f.downstreamIPAddress[:], v4InV6Prefix)
  1049. copy(f.downstreamIPAddress[len(v4InV6Prefix):], downstreamIPAddress)
  1050. } else { // net.IPv6len
  1051. copy(f.downstreamIPAddress[:], downstreamIPAddress)
  1052. }
  1053. f.downstreamPort = downstreamPort
  1054. if len(upstreamIPAddress) == net.IPv4len {
  1055. copy(f.upstreamIPAddress[:], v4InV6Prefix)
  1056. copy(f.upstreamIPAddress[len(v4InV6Prefix):], upstreamIPAddress)
  1057. } else { // net.IPv6len
  1058. copy(f.upstreamIPAddress[:], upstreamIPAddress)
  1059. }
  1060. f.upstreamPort = upstreamPort
  1061. f.protocol = protocol
  1062. }
  1063. type flowState struct {
  1064. // Note: 64-bit ints used with atomic operations are placed
  1065. // at the start of struct to ensure 64-bit alignment.
  1066. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  1067. firstUpstreamPacketTime int64
  1068. lastUpstreamPacketTime int64
  1069. firstDownstreamPacketTime int64
  1070. lastDownstreamPacketTime int64
  1071. isDNS bool
  1072. dnsQualityReporter DNSQualityReporter
  1073. activityUpdaters []FlowActivityUpdater
  1074. }
  1075. func (flowState *flowState) expired(idleExpiry time.Duration) bool {
  1076. now := monotime.Now()
  1077. // Traffic in either direction keeps the flow alive. Initially, only one of
  1078. // lastUpstreamPacketTime or lastDownstreamPacketTime will be set by
  1079. // startTrackingFlow, and the other value will be 0 and evaluate as expired.
  1080. return (now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastUpstreamPacketTime))) > idleExpiry) &&
  1081. (now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastDownstreamPacketTime))) > idleExpiry)
  1082. }
  1083. // isTrackingFlow checks if a flow is being tracked.
  1084. func (session *session) isTrackingFlow(ID flowID) bool {
  1085. f, ok := session.flows.Load(ID)
  1086. if !ok {
  1087. return false
  1088. }
  1089. flowState := f.(*flowState)
  1090. // Check if flow is expired but not yet reaped.
  1091. if flowState.expired(FLOW_IDLE_EXPIRY) {
  1092. session.deleteFlow(ID, flowState)
  1093. return false
  1094. }
  1095. return true
  1096. }
  1097. // startTrackingFlow starts flow tracking for the flow identified
  1098. // by ID.
  1099. //
  1100. // Flow tracking is used to implement:
  1101. // - one-time permissions checks for a flow
  1102. // - OSLs
  1103. // - domain bytes transferred [TODO]
  1104. // - DNS quality metrics
  1105. //
  1106. // The applicationData from the first packet in the flow is
  1107. // inspected to determine any associated hostname, using HTTP or
  1108. // TLS payload. The session's FlowActivityUpdaterMaker is invoked
  1109. // to determine a list of updaters to track flow activity.
  1110. //
  1111. // Updaters receive reports with the number of application data
  1112. // bytes in each flow packet. This number, totalled for all packets
  1113. // in a flow, may exceed the total bytes transferred at the
  1114. // application level due to TCP retransmission. Currently, the flow
  1115. // tracking logic doesn't exclude retransmitted packets from update
  1116. // reporting.
  1117. //
  1118. // Flows are untracked after an idle expiry period. Transport
  1119. // protocol indicators of end of flow, such as FIN or RST for TCP,
  1120. // which may or may not appear in a flow, are not currently used.
  1121. //
  1122. // startTrackingFlow may be called from concurrent goroutines; if
  1123. // the flow is already tracked, it is simply updated.
  1124. func (session *session) startTrackingFlow(
  1125. ID flowID,
  1126. direction packetDirection,
  1127. applicationData []byte,
  1128. isDNS bool) {
  1129. now := int64(monotime.Now())
  1130. // Once every period, iterate over flows and reap expired entries.
  1131. reapIndex := now / int64(monotime.Time(FLOW_IDLE_EXPIRY/2))
  1132. previousReapIndex := atomic.LoadInt64(&session.lastFlowReapIndex)
  1133. if reapIndex != previousReapIndex &&
  1134. atomic.CompareAndSwapInt64(&session.lastFlowReapIndex, previousReapIndex, reapIndex) {
  1135. session.reapFlows()
  1136. }
  1137. var isTCP bool
  1138. var hostname string
  1139. if ID.protocol == internetProtocolTCP {
  1140. // TODO: implement
  1141. // hostname = common.ExtractHostnameFromTCPFlow(applicationData)
  1142. isTCP = true
  1143. }
  1144. var activityUpdaters []FlowActivityUpdater
  1145. // Don't incur activity monitor overhead for DNS requests
  1146. if !isDNS {
  1147. flowActivityUpdaterMaker := session.getFlowActivityUpdaterMaker()
  1148. if flowActivityUpdaterMaker != nil {
  1149. activityUpdaters = flowActivityUpdaterMaker(
  1150. isTCP,
  1151. hostname,
  1152. net.IP(ID.upstreamIPAddress[:]))
  1153. }
  1154. }
  1155. flowState := &flowState{
  1156. isDNS: isDNS,
  1157. activityUpdaters: activityUpdaters,
  1158. dnsQualityReporter: session.getDNSQualityReporter(),
  1159. }
  1160. if direction == packetDirectionServerUpstream {
  1161. flowState.firstUpstreamPacketTime = now
  1162. flowState.lastUpstreamPacketTime = now
  1163. } else {
  1164. flowState.firstDownstreamPacketTime = now
  1165. flowState.lastDownstreamPacketTime = now
  1166. }
  1167. // LoadOrStore will retain any existing entry
  1168. session.flows.LoadOrStore(ID, flowState)
  1169. session.updateFlow(ID, direction, applicationData)
  1170. }
  1171. func (session *session) updateFlow(
  1172. ID flowID,
  1173. direction packetDirection,
  1174. applicationData []byte) {
  1175. f, ok := session.flows.Load(ID)
  1176. if !ok {
  1177. return
  1178. }
  1179. flowState := f.(*flowState)
  1180. // Note: no expired check here, since caller is assumed to
  1181. // have just called isTrackingFlow.
  1182. now := int64(monotime.Now())
  1183. var upstreamBytes, downstreamBytes, durationNanoseconds int64
  1184. if direction == packetDirectionServerUpstream {
  1185. upstreamBytes = int64(len(applicationData))
  1186. atomic.CompareAndSwapInt64(&flowState.firstUpstreamPacketTime, 0, now)
  1187. atomic.StoreInt64(&flowState.lastUpstreamPacketTime, now)
  1188. } else {
  1189. downstreamBytes = int64(len(applicationData))
  1190. atomic.CompareAndSwapInt64(&flowState.firstDownstreamPacketTime, 0, now)
  1191. // Follows common.ActivityMonitoredConn semantics, where
  1192. // duration is updated only for downstream activity. This
  1193. // is intened to produce equivalent behaviour for port
  1194. // forward clients (tracked with ActivityUpdaters) and
  1195. // packet tunnel clients (tracked with FlowActivityUpdaters).
  1196. durationNanoseconds = now - atomic.SwapInt64(&flowState.lastDownstreamPacketTime, now)
  1197. }
  1198. for _, updater := range flowState.activityUpdaters {
  1199. updater.UpdateProgress(downstreamBytes, upstreamBytes, durationNanoseconds)
  1200. }
  1201. }
  1202. // deleteFlow stops tracking a flow and logs any outstanding metrics.
  1203. // flowState is passed in to avoid duplicating the lookup that all callers
  1204. // have already performed.
  1205. func (session *session) deleteFlow(ID flowID, flowState *flowState) {
  1206. if flowState.isDNS {
  1207. dnsStartTime := monotime.Time(
  1208. atomic.LoadInt64(&flowState.firstUpstreamPacketTime))
  1209. if dnsStartTime > 0 {
  1210. // Record DNS quality metrics using a heuristic: if a packet was sent and
  1211. // then a packet was received, assume the DNS request successfully received
  1212. // a valid response; failure occurs when the resolver fails to provide a
  1213. // response; a "no such host" response is still a success. Limitations: we
  1214. // assume a resolver will not respond when, e.g., rate limiting; we ignore
  1215. // subsequent requests made via the same UDP/TCP flow; deleteFlow may be
  1216. // called only after the flow has expired, which adds some delay to the
  1217. // recording of the DNS metric.
  1218. dnsEndTime := monotime.Time(
  1219. atomic.LoadInt64(&flowState.firstDownstreamPacketTime))
  1220. dnsSuccess := true
  1221. if dnsEndTime == 0 {
  1222. dnsSuccess = false
  1223. dnsEndTime = monotime.Now()
  1224. }
  1225. resolveElapsedTime := dnsEndTime.Sub(dnsStartTime)
  1226. if flowState.dnsQualityReporter != nil {
  1227. flowState.dnsQualityReporter(
  1228. dnsSuccess,
  1229. resolveElapsedTime,
  1230. net.IP(ID.upstreamIPAddress[:]))
  1231. }
  1232. }
  1233. }
  1234. session.flows.Delete(ID)
  1235. }
  1236. // reapFlows removes expired idle flows.
  1237. func (session *session) reapFlows() {
  1238. session.flows.Range(func(key, value interface{}) bool {
  1239. flowState := value.(*flowState)
  1240. if flowState.expired(FLOW_IDLE_EXPIRY) {
  1241. session.deleteFlow(key.(flowID), flowState)
  1242. }
  1243. return true
  1244. })
  1245. }
  1246. // deleteFlows deletes all flows.
  1247. func (session *session) deleteFlows() {
  1248. session.flows.Range(func(key, value interface{}) bool {
  1249. session.deleteFlow(key.(flowID), value.(*flowState))
  1250. return true
  1251. })
  1252. }
// packetMetrics accumulates packet counters: rejected packet counts per
// reject reason for each direction, and relayed packet counters for each
// combination of transport protocol and IP version. The counters are
// updated and reset using atomic operations.
type packetMetrics struct {
	// upstreamRejectReasons and downstreamRejectReasons are indexed by
	// packetRejectReason value.
	upstreamRejectReasons [packetRejectReasonCount]int64
	downstreamRejectReasons [packetRejectReasonCount]int64
	// Relayed packet counters, one group per protocol/IP version.
	TCPIPv4 relayedPacketMetrics
	TCPIPv6 relayedPacketMetrics
	UDPIPv4 relayedPacketMetrics
	UDPIPv6 relayedPacketMetrics
}
// relayedPacketMetrics holds the relayed packet counters for a single
// protocol/IP version combination, split by direction.
type relayedPacketMetrics struct {
	// packetsUp and packetsDown count relayed packets.
	packetsUp int64
	packetsDown int64
	// bytesUp and bytesDown total the lengths of relayed packets.
	bytesUp int64
	bytesDown int64
	// applicationBytesUp and applicationBytesDown total the lengths of
	// the application data carried in relayed packets.
	applicationBytesUp int64
	applicationBytesDown int64
}
  1269. func (metrics *packetMetrics) rejectedPacket(
  1270. direction packetDirection,
  1271. reason packetRejectReason) {
  1272. if direction == packetDirectionServerUpstream ||
  1273. direction == packetDirectionClientUpstream {
  1274. atomic.AddInt64(&metrics.upstreamRejectReasons[reason], 1)
  1275. } else { // packetDirectionDownstream
  1276. atomic.AddInt64(&metrics.downstreamRejectReasons[reason], 1)
  1277. }
  1278. }
  1279. func (metrics *packetMetrics) relayedPacket(
  1280. direction packetDirection,
  1281. version int,
  1282. protocol internetProtocol,
  1283. packetLength, applicationDataLength int) {
  1284. var packetsMetric, bytesMetric, applicationBytesMetric *int64
  1285. if direction == packetDirectionServerUpstream ||
  1286. direction == packetDirectionClientUpstream {
  1287. if version == 4 {
  1288. if protocol == internetProtocolTCP {
  1289. packetsMetric = &metrics.TCPIPv4.packetsUp
  1290. bytesMetric = &metrics.TCPIPv4.bytesUp
  1291. applicationBytesMetric = &metrics.TCPIPv4.applicationBytesUp
  1292. } else { // UDP
  1293. packetsMetric = &metrics.UDPIPv4.packetsUp
  1294. bytesMetric = &metrics.UDPIPv4.bytesUp
  1295. applicationBytesMetric = &metrics.UDPIPv4.applicationBytesUp
  1296. }
  1297. } else { // IPv6
  1298. if protocol == internetProtocolTCP {
  1299. packetsMetric = &metrics.TCPIPv6.packetsUp
  1300. bytesMetric = &metrics.TCPIPv6.bytesUp
  1301. applicationBytesMetric = &metrics.TCPIPv6.applicationBytesUp
  1302. } else { // UDP
  1303. packetsMetric = &metrics.UDPIPv6.packetsUp
  1304. bytesMetric = &metrics.UDPIPv6.bytesUp
  1305. applicationBytesMetric = &metrics.UDPIPv6.applicationBytesUp
  1306. }
  1307. }
  1308. } else { // packetDirectionDownstream
  1309. if version == 4 {
  1310. if protocol == internetProtocolTCP {
  1311. packetsMetric = &metrics.TCPIPv4.packetsDown
  1312. bytesMetric = &metrics.TCPIPv4.bytesDown
  1313. applicationBytesMetric = &metrics.TCPIPv4.applicationBytesDown
  1314. } else { // UDP
  1315. packetsMetric = &metrics.UDPIPv4.packetsDown
  1316. bytesMetric = &metrics.UDPIPv4.bytesDown
  1317. applicationBytesMetric = &metrics.UDPIPv4.applicationBytesDown
  1318. }
  1319. } else { // IPv6
  1320. if protocol == internetProtocolTCP {
  1321. packetsMetric = &metrics.TCPIPv6.packetsDown
  1322. bytesMetric = &metrics.TCPIPv6.bytesDown
  1323. applicationBytesMetric = &metrics.TCPIPv6.applicationBytesDown
  1324. } else { // UDP
  1325. packetsMetric = &metrics.UDPIPv6.packetsDown
  1326. bytesMetric = &metrics.UDPIPv6.bytesDown
  1327. applicationBytesMetric = &metrics.UDPIPv6.applicationBytesDown
  1328. }
  1329. }
  1330. }
  1331. atomic.AddInt64(packetsMetric, 1)
  1332. atomic.AddInt64(bytesMetric, int64(packetLength))
  1333. atomic.AddInt64(applicationBytesMetric, int64(applicationDataLength))
  1334. }
  1335. const (
  1336. packetMetricsRejected = 1
  1337. packetMetricsRelayed = 2
  1338. packetMetricsAll = packetMetricsRejected | packetMetricsRelayed
  1339. )
  1340. func (metrics *packetMetrics) checkpoint(
  1341. logger common.Logger, updater MetricsUpdater, logName string, whichMetrics int) {
  1342. // Report all metric counters in a single log message. Each
  1343. // counter is reset to 0 when added to the log.
  1344. logFields := make(common.LogFields)
  1345. if whichMetrics&packetMetricsRejected != 0 {
  1346. for i := 0; i < packetRejectReasonCount; i++ {
  1347. logFields["upstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
  1348. atomic.SwapInt64(&metrics.upstreamRejectReasons[i], 0)
  1349. logFields["downstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
  1350. atomic.SwapInt64(&metrics.downstreamRejectReasons[i], 0)
  1351. }
  1352. }
  1353. if whichMetrics&packetMetricsRelayed != 0 {
  1354. var TCPApplicationBytesUp, TCPApplicationBytesDown,
  1355. UDPApplicationBytesUp, UDPApplicationBytesDown int64
  1356. relayedMetrics := []struct {
  1357. prefix string
  1358. metrics *relayedPacketMetrics
  1359. updaterBytesUp *int64
  1360. updaterBytesDown *int64
  1361. }{
  1362. {"tcp_ipv4_", &metrics.TCPIPv4, &TCPApplicationBytesUp, &TCPApplicationBytesDown},
  1363. {"tcp_ipv6_", &metrics.TCPIPv6, &TCPApplicationBytesUp, &TCPApplicationBytesDown},
  1364. {"udp_ipv4_", &metrics.UDPIPv4, &UDPApplicationBytesUp, &UDPApplicationBytesDown},
  1365. {"udp_ipv6_", &metrics.UDPIPv6, &UDPApplicationBytesUp, &UDPApplicationBytesDown},
  1366. }
  1367. for _, r := range relayedMetrics {
  1368. applicationBytesUp := atomic.SwapInt64(&r.metrics.applicationBytesUp, 0)
  1369. applicationBytesDown := atomic.SwapInt64(&r.metrics.applicationBytesDown, 0)
  1370. *r.updaterBytesUp += applicationBytesUp
  1371. *r.updaterBytesDown += applicationBytesDown
  1372. logFields[r.prefix+"packets_up"] = atomic.SwapInt64(&r.metrics.packetsUp, 0)
  1373. logFields[r.prefix+"packets_down"] = atomic.SwapInt64(&r.metrics.packetsDown, 0)
  1374. logFields[r.prefix+"bytes_up"] = atomic.SwapInt64(&r.metrics.bytesUp, 0)
  1375. logFields[r.prefix+"bytes_down"] = atomic.SwapInt64(&r.metrics.bytesDown, 0)
  1376. logFields[r.prefix+"application_bytes_up"] = applicationBytesUp
  1377. logFields[r.prefix+"application_bytes_down"] = applicationBytesDown
  1378. }
  1379. if updater != nil {
  1380. updater(
  1381. TCPApplicationBytesDown, TCPApplicationBytesUp,
  1382. UDPApplicationBytesDown, UDPApplicationBytesUp)
  1383. }
  1384. }
  1385. logger.LogMetric(logName, logFields)
  1386. }
  1387. // PacketQueue is a fixed-size, preallocated queue of packets.
  1388. // Enqueued packets are packed into a contiguous buffer with channel
  1389. // framing, allowing the entire queue to be written to a channel
  1390. // in a single call.
  1391. // Reuse of the queue buffers avoids GC churn. To avoid memory use
  1392. // spikes when many clients connect and may disconnect before relaying
  1393. // packets, the packet queue buffers start small and grow when required,
  1394. // up to the maximum size, and then remain static.
  1395. type PacketQueue struct {
  1396. maxSize int
  1397. emptyBuffers chan []byte
  1398. activeBuffer chan []byte
  1399. }
  1400. // NewPacketQueue creates a new PacketQueue.
  1401. // The caller must ensure that maxSize exceeds the
  1402. // packet MTU, or packets will will never enqueue.
  1403. func NewPacketQueue(maxSize int) *PacketQueue {
  1404. // Two buffers of size up to maxSize are allocated, to
  1405. // allow packets to continue to enqueue while one buffer
  1406. // is borrowed by the DequeueFramedPackets caller.
  1407. //
  1408. // TODO: is there a way to implement this without
  1409. // allocating up to 2x maxSize bytes? A circular queue
  1410. // won't work because we want DequeueFramedPackets
  1411. // to return a contiguous buffer. Perhaps a Bip
  1412. // Buffer would work here:
  1413. // https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
  1414. queue := &PacketQueue{
  1415. maxSize: maxSize,
  1416. emptyBuffers: make(chan []byte, 2),
  1417. activeBuffer: make(chan []byte, 1),
  1418. }
  1419. queue.emptyBuffers <- make([]byte, 0)
  1420. queue.emptyBuffers <- make([]byte, 0)
  1421. return queue
  1422. }
  1423. // Enqueue adds a packet to the queue.
  1424. // If the queue is full, the packet is dropped.
  1425. // Enqueue is _not_ safe for concurrent calls.
  1426. func (queue *PacketQueue) Enqueue(packet []byte) {
  1427. var buffer []byte
  1428. select {
  1429. case buffer = <-queue.activeBuffer:
  1430. default:
  1431. buffer = <-queue.emptyBuffers
  1432. }
  1433. packetSize := len(packet)
  1434. if queue.maxSize-len(buffer) >= channelHeaderSize+packetSize {
  1435. // Assumes len(packet)/MTU <= 64K
  1436. var channelHeader [channelHeaderSize]byte
  1437. binary.BigEndian.PutUint16(channelHeader[:], uint16(packetSize))
  1438. // Once the buffer has reached maxSize capacity
  1439. // and been replaced (buffer = buffer[0:0]), these
  1440. // appends should no longer allocate new memory and
  1441. // should just copy to preallocated memory.
  1442. buffer = append(buffer, channelHeader[:]...)
  1443. buffer = append(buffer, packet...)
  1444. }
  1445. // Else, queue is full, so drop packet.
  1446. queue.activeBuffer <- buffer
  1447. }
  1448. // DequeueFramedPackets waits until at least one packet is
  1449. // enqueued, and then returns a packet buffer containing one
  1450. // or more framed packets. The returned buffer remains part
  1451. // of the PacketQueue structure and the caller _must_ replace
  1452. // the buffer by calling Replace.
  1453. // DequeueFramedPackets unblocks and returns false if it receives
  1454. // runContext.Done().
  1455. // DequeueFramedPackets is _not_ safe for concurrent calls.
  1456. func (queue *PacketQueue) DequeueFramedPackets(
  1457. runContext context.Context) ([]byte, bool) {
  1458. var buffer []byte
  1459. select {
  1460. case buffer = <-queue.activeBuffer:
  1461. case <-runContext.Done():
  1462. return nil, false
  1463. }
  1464. return buffer, true
  1465. }
  1466. // Replace returns the buffer to the PacketQueue to be
  1467. // reused.
  1468. // The input must be a return value from DequeueFramedPackets.
  1469. func (queue *PacketQueue) Replace(buffer []byte) {
  1470. buffer = buffer[0:0]
  1471. // This won't block (as long as it is a DequeueFramedPackets return value).
  1472. queue.emptyBuffers <- buffer
  1473. }
// ClientConfig specifies the configuration of a packet tunnel client.
type ClientConfig struct {
	// Logger is used for logging events and metrics.
	Logger common.Logger
	// SudoNetworkConfigCommands specifies whether to use "sudo"
	// when executing network configuration commands. See description
	// for ServerConfig.SudoNetworkConfigCommands.
	SudoNetworkConfigCommands bool
	// AllowNoIPv6NetworkConfiguration indicates that failures while
	// configuring tun interfaces and routing for IPv6 are to be
	// logged as warnings only. See description for
	// ServerConfig.AllowNoIPv6NetworkConfiguration.
	AllowNoIPv6NetworkConfiguration bool
	// MTU is the packet MTU value to use; this value
	// should be obtained from the packet tunnel server.
	// When MTU is 0, a default value is used.
	MTU int
	// UpstreamPacketQueueSize specifies the size of the upstream
	// packet queue.
	// When UpstreamPacketQueueSize is 0, a default value tuned for
	// Psiphon is used.
	UpstreamPacketQueueSize int
	// Transport is an established transport channel that
	// will be used to relay packets to and from a packet
	// tunnel server.
	Transport io.ReadWriteCloser
	// TunFileDescriptor specifies a file descriptor to use to
	// read and write packets to be relayed to the client. When
	// TunFileDescriptor is specified, the Client will use this
	// existing tun device and not create its own; in this case,
	// network address and routing configuration is not performed
	// by the Client. As the packet tunnel server performs
	// transparent source IP address and DNS rewriting, the tun
	// device may have any assigned IP address, but should be
	// configured with the given MTU; and DNS should be configured
	// to use the specified transparent DNS resolver addresses.
	// Set TunFileDescriptor to <= 0 to ignore this parameter
	// and create and configure a tun device.
	TunFileDescriptor int
	// IPv4AddressCIDR is the IPv4 address and netmask to
	// assign to a newly created tun device.
	IPv4AddressCIDR string
	// IPv6AddressCIDR is the IPv6 address and prefix to
	// assign to a newly created tun device.
	IPv6AddressCIDR string
	// TransparentDNSIPv4Address is the IPv4 address of the DNS server
	// configured by a VPN using a packet tunnel. All DNS packets
	// destined to this DNS server are transparently redirected to
	// the Psiphon server DNS. When set, it must parse as an IPv4
	// address or NewClient fails.
	TransparentDNSIPv4Address string
	// TransparentDNSIPv6Address is the IPv6 address of the DNS server
	// configured by a VPN using a packet tunnel. All DNS packets
	// destined to this DNS server are transparently redirected to
	// the Psiphon server DNS. When set, it must parse as an IPv6
	// address or NewClient fails.
	TransparentDNSIPv6Address string
	// RouteDestinations are hosts (IPs) or networks (CIDRs)
	// to be configured to be routed through a newly
	// created tun device.
	RouteDestinations []string
}
// Client is a packet tunnel client. A packet tunnel client
// relays packets between a local tun device and a packet
// tunnel server via a transport channel.
type Client struct {
	// config is the configuration supplied to NewClient.
	config *ClientConfig
	// transparentDNS holds the parsed transparent DNS addresses from
	// config.
	transparentDNS *clientTransparentDNS
	// device is the tun device that packets are read from and written to.
	device *Device
	// channel relays packets over config.Transport.
	channel *Channel
	// upstreamPackets queues packets read from the device until they are
	// written to the channel.
	upstreamPackets *PacketQueue
	// metrics accumulates packet counters, which are logged by Stop.
	metrics *packetMetrics
	// runContext is cancelled, via stopRunning, to signal the worker
	// goroutines to stop.
	runContext context.Context
	stopRunning context.CancelFunc
	// workers tracks the goroutines launched by Start.
	workers *sync.WaitGroup
}
// clientTransparentDNS caches the parsed representations of
// TransparentDNSIPv4/6Address for fast packet processing and rewriting.
type clientTransparentDNS struct {
	// IPv4Address is the 4-byte form of the IPv4 address, or nil when
	// no IPv4 address is configured.
	IPv4Address net.IP
	// IPv6Address is the parsed IPv6 address, or nil when no IPv6
	// address is configured.
	IPv6Address net.IP
}
  1554. func newClientTransparentDNS(
  1555. IPv4Address, IPv6Address string) (*clientTransparentDNS, error) {
  1556. var IPv4, IPv6 net.IP
  1557. if IPv4Address != "" {
  1558. IPv4 = net.ParseIP(IPv4Address)
  1559. if IPv4 != nil {
  1560. IPv4 = IPv4.To4()
  1561. }
  1562. if IPv4 == nil {
  1563. return nil, errors.TraceNew("invalid IPv4 address")
  1564. }
  1565. }
  1566. if IPv6Address != "" {
  1567. IPv6 = net.ParseIP(IPv6Address)
  1568. if IPv6 == nil || IPv6.To4() != nil {
  1569. return nil, errors.TraceNew("invalid IPv6 address")
  1570. }
  1571. }
  1572. return &clientTransparentDNS{
  1573. IPv4Address: IPv4,
  1574. IPv6Address: IPv6,
  1575. }, nil
  1576. }
  1577. // NewClient initializes a new Client. Unless using the
  1578. // TunFileDescriptor configuration parameter, a new tun
  1579. // device is created for the client.
  1580. func NewClient(config *ClientConfig) (*Client, error) {
  1581. var device *Device
  1582. var err error
  1583. if config.TunFileDescriptor > 0 {
  1584. device, err = NewClientDeviceFromFD(config)
  1585. } else {
  1586. device, err = NewClientDevice(config)
  1587. }
  1588. if err != nil {
  1589. return nil, errors.Trace(err)
  1590. }
  1591. upstreamPacketQueueSize := DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE
  1592. if config.UpstreamPacketQueueSize > 0 {
  1593. upstreamPacketQueueSize = config.UpstreamPacketQueueSize
  1594. }
  1595. transparentDNS, err := newClientTransparentDNS(
  1596. config.TransparentDNSIPv4Address,
  1597. config.TransparentDNSIPv6Address)
  1598. if err != nil {
  1599. return nil, errors.Trace(err)
  1600. }
  1601. runContext, stopRunning := context.WithCancel(context.Background())
  1602. return &Client{
  1603. config: config,
  1604. transparentDNS: transparentDNS,
  1605. device: device,
  1606. channel: NewChannel(config.Transport, getMTU(config.MTU)),
  1607. upstreamPackets: NewPacketQueue(upstreamPacketQueueSize),
  1608. metrics: new(packetMetrics),
  1609. runContext: runContext,
  1610. stopRunning: stopRunning,
  1611. workers: new(sync.WaitGroup),
  1612. }, nil
  1613. }
  1614. // Start starts a client and returns with it running.
  1615. func (client *Client) Start() {
  1616. client.config.Logger.WithTrace().Info("starting")
  1617. client.workers.Add(1)
  1618. go func() {
  1619. defer client.workers.Done()
  1620. for {
  1621. readPacket, err := client.device.ReadPacket()
  1622. select {
  1623. case <-client.runContext.Done():
  1624. // No error is logged as shutdown may have interrupted read.
  1625. return
  1626. default:
  1627. }
  1628. if err != nil {
  1629. client.config.Logger.WithTraceFields(
  1630. common.LogFields{"error": err}).Info("read device packet failed")
  1631. // May be temporary error condition, keep working.
  1632. continue
  1633. }
  1634. // processPacket will check for packets the server will reject
  1635. // and drop those without sending.
  1636. // Limitation: packet metrics, including successful relay count,
  1637. // are incremented _before_ the packet is written to the channel.
  1638. if !processPacket(
  1639. client.metrics,
  1640. nil,
  1641. client.transparentDNS,
  1642. packetDirectionClientUpstream,
  1643. readPacket) {
  1644. continue
  1645. }
  1646. // Instead of immediately writing to the channel, the
  1647. // packet is enqueued, which has the effect of batching
  1648. // up IP packets into a single channel packet (for Psiphon,
  1649. // an SSH packet) to minimize overhead and, as benchmarked,
  1650. // improve throughput.
  1651. // Packet will be dropped if queue is full.
  1652. client.upstreamPackets.Enqueue(readPacket)
  1653. }
  1654. }()
  1655. client.workers.Add(1)
  1656. go func() {
  1657. defer client.workers.Done()
  1658. for {
  1659. packetBuffer, ok := client.upstreamPackets.DequeueFramedPackets(client.runContext)
  1660. if !ok {
  1661. // Dequeue aborted due to session.runContext.Done()
  1662. return
  1663. }
  1664. err := client.channel.WriteFramedPackets(packetBuffer)
  1665. client.upstreamPackets.Replace(packetBuffer)
  1666. if err != nil {
  1667. client.config.Logger.WithTraceFields(
  1668. common.LogFields{"error": err}).Info("write channel packets failed")
  1669. // May be temporary error condition, such as reconnecting the tunnel;
  1670. // keep working. The packets are most likely dropped.
  1671. continue
  1672. }
  1673. }
  1674. }()
  1675. client.workers.Add(1)
  1676. go func() {
  1677. defer client.workers.Done()
  1678. for {
  1679. readPacket, err := client.channel.ReadPacket()
  1680. select {
  1681. case <-client.runContext.Done():
  1682. // No error is logged as shutdown may have interrupted read.
  1683. return
  1684. default:
  1685. }
  1686. if err != nil {
  1687. client.config.Logger.WithTraceFields(
  1688. common.LogFields{"error": err}).Info("read channel packet failed")
  1689. // May be temporary error condition, such as reconnecting the tunnel;
  1690. // keep working.
  1691. continue
  1692. }
  1693. if !processPacket(
  1694. client.metrics,
  1695. nil,
  1696. client.transparentDNS,
  1697. packetDirectionClientDownstream,
  1698. readPacket) {
  1699. continue
  1700. }
  1701. err = client.device.WritePacket(readPacket)
  1702. if err != nil {
  1703. client.config.Logger.WithTraceFields(
  1704. common.LogFields{"error": err}).Info("write device packet failed")
  1705. // May be temporary error condition, keep working. The packet is
  1706. // most likely dropped.
  1707. continue
  1708. }
  1709. }
  1710. }()
  1711. }
  1712. // Stop halts a running client.
  1713. func (client *Client) Stop() {
  1714. client.config.Logger.WithTrace().Info("stopping")
  1715. client.stopRunning()
  1716. client.device.Close()
  1717. client.channel.Close()
  1718. client.workers.Wait()
  1719. client.metrics.checkpoint(
  1720. client.config.Logger, nil, "packet_metrics", packetMetricsAll)
  1721. client.config.Logger.WithTrace().Info("stopped")
  1722. }
/*

Packet offset constants in getPacketDestinationIPAddress and
processPacket are from the following RFC definitions.

IPv4 header: https://tools.ietf.org/html/rfc791

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |Version|  IHL  |Type of Service|          Total Length         |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |         Identification        |Flags|      Fragment Offset    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |  Time to Live |    Protocol   |         Header Checksum       |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                       Source Address                          |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Destination Address                        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Options                    |    Padding    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

IPv6 header: https://tools.ietf.org/html/rfc2460

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |Version| Traffic Class |           Flow Label                  |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |         Payload Length        |  Next Header  |   Hop Limit   |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                                                               |
   +                                                               +
   |                                                               |
   +                         Source Address                        +
   |                                                               |
   +                                                               +
   |                                                               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                                                               |
   +                                                               +
   |                                                               |
   +                      Destination Address                      +
   |                                                               |
   +                                                               +
   |                                                               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

TCP header: https://tools.ietf.org/html/rfc793

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |          Source Port          |       Destination Port        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                        Sequence Number                        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Acknowledgment Number                      |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |  Data |           |U|A|P|R|S|F|                               |
   | Offset| Reserved  |R|C|S|S|Y|I|            Window             |
   |       |           |G|K|H|T|N|N|                               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |           Checksum            |         Urgent Pointer        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                    Options                    |    Padding    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                             data                              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

UDP header: https://tools.ietf.org/html/rfc768

    0      7 8     15 16    23 24    31
   +--------+--------+--------+--------+
   |     Source      |   Destination   |
   |      Port       |      Port       |
   +--------+--------+--------+--------+
   |                 |                 |
   |     Length      |    Checksum     |
   +--------+--------+--------+--------+
   |
   |          data octets ...
   +---------------- ...

*/
  1799. const (
  1800. packetDirectionServerUpstream = 0
  1801. packetDirectionServerDownstream = 1
  1802. packetDirectionClientUpstream = 2
  1803. packetDirectionClientDownstream = 3
  1804. internetProtocolTCP = 6
  1805. internetProtocolUDP = 17
  1806. portNumberDNS = 53
  1807. packetRejectNoSession = 0
  1808. packetRejectDestinationAddress = 1
  1809. packetRejectLength = 2
  1810. packetRejectVersion = 3
  1811. packetRejectOptions = 4
  1812. packetRejectProtocol = 5
  1813. packetRejectTCPProtocolLength = 6
  1814. packetRejectUDPProtocolLength = 7
  1815. packetRejectTCPPort = 8
  1816. packetRejectUDPPort = 9
  1817. packetRejectNoOriginalAddress = 10
  1818. packetRejectNoDNSResolvers = 11
  1819. packetRejectInvalidDNSMessage = 12
  1820. packetRejectDisallowedDomain = 13
  1821. packetRejectNoClient = 14
  1822. packetRejectReasonCount = 15
  1823. packetOk = 15
  1824. )
  1825. type packetDirection int
  1826. type internetProtocol int
  1827. type packetRejectReason int
  1828. func packetRejectReasonDescription(reason packetRejectReason) string {
  1829. // Description strings follow the metrics naming
  1830. // convention: all lowercase; underscore seperators.
  1831. switch reason {
  1832. case packetRejectNoSession:
  1833. return "no_session"
  1834. case packetRejectDestinationAddress:
  1835. return "invalid_destination_address"
  1836. case packetRejectLength:
  1837. return "invalid_ip_packet_length"
  1838. case packetRejectVersion:
  1839. return "invalid_ip_header_version"
  1840. case packetRejectOptions:
  1841. return "invalid_ip_header_options"
  1842. case packetRejectProtocol:
  1843. return "invalid_ip_header_protocol"
  1844. case packetRejectTCPProtocolLength:
  1845. return "invalid_tcp_packet_length"
  1846. case packetRejectUDPProtocolLength:
  1847. return "invalid_tcp_packet_length"
  1848. case packetRejectTCPPort:
  1849. return "disallowed_tcp_destination_port"
  1850. case packetRejectUDPPort:
  1851. return "disallowed_udp_destination_port"
  1852. case packetRejectNoOriginalAddress:
  1853. return "no_original_address"
  1854. case packetRejectNoDNSResolvers:
  1855. return "no_dns_resolvers"
  1856. case packetRejectInvalidDNSMessage:
  1857. return "invalid_dns_message"
  1858. case packetRejectDisallowedDomain:
  1859. return "disallowed_domain"
  1860. case packetRejectNoClient:
  1861. return "no_client"
  1862. }
  1863. return "unknown_reason"
  1864. }
  1865. // Caller: the destination IP address return value is
  1866. // a slice of the packet input value and only valid while
  1867. // the packet buffer remains valid.
  1868. func getPacketDestinationIPAddress(
  1869. metrics *packetMetrics,
  1870. direction packetDirection,
  1871. packet []byte) (net.IP, bool) {
  1872. // TODO: this function duplicates a subset of the packet
  1873. // parsing code in processPacket. Refactor to reuse code;
  1874. // also, both getPacketDestinationIPAddress and processPacket
  1875. // are called for some packets; refactor to only parse once.
  1876. if len(packet) < 1 {
  1877. metrics.rejectedPacket(direction, packetRejectLength)
  1878. return nil, false
  1879. }
  1880. version := packet[0] >> 4
  1881. if version != 4 && version != 6 {
  1882. metrics.rejectedPacket(direction, packetRejectVersion)
  1883. return nil, false
  1884. }
  1885. if version == 4 {
  1886. if len(packet) < 20 {
  1887. metrics.rejectedPacket(direction, packetRejectLength)
  1888. return nil, false
  1889. }
  1890. return packet[16:20], true
  1891. } else { // IPv6
  1892. if len(packet) < 40 {
  1893. metrics.rejectedPacket(direction, packetRejectLength)
  1894. return nil, false
  1895. }
  1896. return packet[24:40], true
  1897. }
  1898. }
  1899. // processPacket parses IP packets, applies relaying rules,
  1900. // and rewrites packet elements as required. processPacket
  1901. // returns true if a packet parses correctly, is accepted
  1902. // by the relay rules, and is successfully rewritten.
  1903. //
  1904. // When a packet is rejected, processPacket returns false
  1905. // and updates a reason in the supplied metrics.
  1906. //
  1907. // Rejection may result in partially rewritten packets.
  1908. func processPacket(
  1909. metrics *packetMetrics,
  1910. session *session,
  1911. clientTransparentDNS *clientTransparentDNS,
  1912. direction packetDirection,
  1913. packet []byte) bool {
  1914. // Parse and validate IP packet structure
  1915. // Must have an IP version field.
  1916. if len(packet) < 1 {
  1917. metrics.rejectedPacket(direction, packetRejectLength)
  1918. return false
  1919. }
  1920. version := packet[0] >> 4
  1921. // Must be IPv4 or IPv6.
  1922. if version != 4 && version != 6 {
  1923. metrics.rejectedPacket(direction, packetRejectVersion)
  1924. return false
  1925. }
  1926. var protocol internetProtocol
  1927. var sourceIPAddress, destinationIPAddress net.IP
  1928. var sourcePort, destinationPort uint16
  1929. var IPChecksum, TCPChecksum, UDPChecksum []byte
  1930. var applicationData []byte
  1931. if version == 4 {
  1932. // IHL must be 5: options are not supported; a fixed
  1933. // 20 byte header is expected.
  1934. headerLength := packet[0] & 0x0F
  1935. if headerLength != 5 {
  1936. metrics.rejectedPacket(direction, packetRejectOptions)
  1937. return false
  1938. }
  1939. if len(packet) < 20 {
  1940. metrics.rejectedPacket(direction, packetRejectLength)
  1941. return false
  1942. }
  1943. // Protocol must be TCP or UDP.
  1944. protocol = internetProtocol(packet[9])
  1945. dataOffset := 0
  1946. if protocol == internetProtocolTCP {
  1947. if len(packet) < 33 {
  1948. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1949. return false
  1950. }
  1951. dataOffset = 20 + 4*int(packet[32]>>4)
  1952. if len(packet) < dataOffset {
  1953. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1954. return false
  1955. }
  1956. } else if protocol == internetProtocolUDP {
  1957. dataOffset = 28
  1958. if len(packet) < dataOffset {
  1959. metrics.rejectedPacket(direction, packetRejectUDPProtocolLength)
  1960. return false
  1961. }
  1962. } else {
  1963. metrics.rejectedPacket(direction, packetRejectProtocol)
  1964. return false
  1965. }
  1966. applicationData = packet[dataOffset:]
  1967. // Slices reference packet bytes to be rewritten.
  1968. sourceIPAddress = packet[12:16]
  1969. destinationIPAddress = packet[16:20]
  1970. IPChecksum = packet[10:12]
  1971. // Port numbers have the same offset in TCP and UDP.
  1972. sourcePort = binary.BigEndian.Uint16(packet[20:22])
  1973. destinationPort = binary.BigEndian.Uint16(packet[22:24])
  1974. if protocol == internetProtocolTCP {
  1975. TCPChecksum = packet[36:38]
  1976. } else { // UDP
  1977. UDPChecksum = packet[26:28]
  1978. }
  1979. } else { // IPv6
  1980. if len(packet) < 40 {
  1981. metrics.rejectedPacket(direction, packetRejectLength)
  1982. return false
  1983. }
  1984. // Next Header must be TCP or UDP.
  1985. nextHeader := packet[6]
  1986. protocol = internetProtocol(nextHeader)
  1987. dataOffset := 0
  1988. if protocol == internetProtocolTCP {
  1989. if len(packet) < 53 {
  1990. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1991. return false
  1992. }
  1993. dataOffset = 40 + 4*int(packet[52]>>4)
  1994. if len(packet) < dataOffset {
  1995. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1996. return false
  1997. }
  1998. } else if protocol == internetProtocolUDP {
  1999. dataOffset = 48
  2000. if len(packet) < dataOffset {
  2001. metrics.rejectedPacket(direction, packetRejectUDPProtocolLength)
  2002. return false
  2003. }
  2004. } else {
  2005. metrics.rejectedPacket(direction, packetRejectProtocol)
  2006. return false
  2007. }
  2008. applicationData = packet[dataOffset:]
  2009. // Slices reference packet bytes to be rewritten.
  2010. sourceIPAddress = packet[8:24]
  2011. destinationIPAddress = packet[24:40]
  2012. // Port numbers have the same offset in TCP and UDP.
  2013. sourcePort = binary.BigEndian.Uint16(packet[40:42])
  2014. destinationPort = binary.BigEndian.Uint16(packet[42:44])
  2015. if protocol == internetProtocolTCP {
  2016. TCPChecksum = packet[56:58]
  2017. } else { // UDP
  2018. UDPChecksum = packet[46:48]
  2019. }
  2020. }
  2021. // Apply rules
  2022. //
  2023. // Most of this logic is only applied on the server, as only
  2024. // the server knows the traffic rules configuration, and is
  2025. // tracking flows.
  2026. isServer := (direction == packetDirectionServerUpstream ||
  2027. direction == packetDirectionServerDownstream)
  2028. // Check if the packet qualifies for transparent DNS rewriting
  2029. //
  2030. // - Both TCP and UDP DNS packets may qualify
  2031. // - Unless configured, transparent DNS flows are not tracked,
  2032. // as most DNS resolutions are very-short lived exchanges
  2033. // - The traffic rules checks are bypassed, since transparent
  2034. // DNS is essential
  2035. // Transparent DNS is a two-step translation. On the client, the VPN
  2036. // can be configured with any private address range, so as to not
  2037. // conflict with other local networks, such as WiFi. For example, the
  2038. // client may select from 192.168.0.0/16, when an existing interface
  2039. // uses a subnet in 10.0.0.0/8, and specify the VPN DNS server as 192.168.0.1.
  2040. //
  2041. // The first translation, on the client side, rewrites packets
  2042. // destined to 192.168.0.1:53, the DNS server, to the destination
  2043. // transparentDNSResolverIPv4Address:53. This packet is sent to the
  2044. // server.
  2045. //
  2046. // The second translation, on the server side, rewrites packets
  2047. // destined to transparentDNSResolverIPv4Address:53 to an actual DNS
  2048. // server destination.
  2049. //
  2050. // Then, reverse rewrites are applied to DNS response packets: the
  2051. // server rewrites the source address actual-DNS-server:53 to
  2052. // transparentDNSResolverIPv4Address:53, and then the client rewrites
  2053. // the source address transparentDNSResolverIPv4Address:53 to
  2054. // 192.168.0.1:53, and that packet is written to the tun device.
  2055. doTransparentDNS := false
  2056. if isServer {
  2057. if direction == packetDirectionServerUpstream {
  2058. // DNS packets destinated for the transparent DNS target addresses
  2059. // will be rewritten to go to one of the server's resolvers.
  2060. if destinationPort == portNumberDNS {
  2061. if version == 4 &&
  2062. destinationIPAddress.Equal(transparentDNSResolverIPv4Address) {
  2063. numResolvers := len(session.DNSResolverIPv4Addresses)
  2064. if numResolvers > 0 {
  2065. doTransparentDNS = true
  2066. } else {
  2067. metrics.rejectedPacket(direction, packetRejectNoDNSResolvers)
  2068. return false
  2069. }
  2070. } else if version == 6 &&
  2071. destinationIPAddress.Equal(transparentDNSResolverIPv6Address) {
  2072. numResolvers := len(session.DNSResolverIPv6Addresses)
  2073. if numResolvers > 0 {
  2074. doTransparentDNS = true
  2075. } else {
  2076. metrics.rejectedPacket(direction, packetRejectNoDNSResolvers)
  2077. return false
  2078. }
  2079. }
  2080. // Limitation: checkAllowedDomainFunc is applied only to DNS queries in
  2081. // UDP; currently DNS-over-TCP will bypass the domain block list check.
  2082. if doTransparentDNS && protocol == internetProtocolUDP {
  2083. domain, err := common.ParseDNSQuestion(applicationData)
  2084. if err != nil {
  2085. metrics.rejectedPacket(direction, packetRejectInvalidDNSMessage)
  2086. return false
  2087. }
  2088. if domain != "" {
  2089. checkAllowedDomainFunc := session.getCheckAllowedDomainFunc()
  2090. if !checkAllowedDomainFunc(domain) {
  2091. metrics.rejectedPacket(direction, packetRejectDisallowedDomain)
  2092. return false
  2093. }
  2094. }
  2095. }
  2096. }
  2097. } else { // packetDirectionServerDownstream
  2098. // DNS packets with a source address of any of the server's
  2099. // resolvers will be rewritten back to the transparent DNS target
  2100. // address.
  2101. // Limitation: responses to client DNS packets _originally
  2102. // destined_ for a resolver in GetDNSResolverIPv4Addresses will
  2103. // be lost. This would happen if some process on the client
  2104. // ignores the system set DNS values; and forces use of the same
  2105. // resolvers as the server.
  2106. if sourcePort == portNumberDNS {
  2107. if version == 4 {
  2108. for _, IPAddress := range session.DNSResolverIPv4Addresses {
  2109. if sourceIPAddress.Equal(IPAddress) {
  2110. doTransparentDNS = true
  2111. break
  2112. }
  2113. }
  2114. } else if version == 6 {
  2115. for _, IPAddress := range session.DNSResolverIPv6Addresses {
  2116. if sourceIPAddress.Equal(IPAddress) {
  2117. doTransparentDNS = true
  2118. break
  2119. }
  2120. }
  2121. }
  2122. }
  2123. }
  2124. } else { // isClient
  2125. if direction == packetDirectionClientUpstream {
  2126. // DNS packets destined to the configured VPN DNS servers,
  2127. // specified in clientTransparentDNS, are rewritten to go to
  2128. // transparentDNSResolverIPv4/6Address.
  2129. if destinationPort == portNumberDNS {
  2130. if (version == 4 && destinationIPAddress.Equal(clientTransparentDNS.IPv4Address)) ||
  2131. (version == 6 && destinationIPAddress.Equal(clientTransparentDNS.IPv6Address)) {
  2132. doTransparentDNS = true
  2133. }
  2134. }
  2135. } else { // packetDirectionClientDownstream
  2136. // DNS packets with a transparentDNSResolverIPv4/6Address source
  2137. // address are rewritten to come from the configured VPN DNS servers.
  2138. if sourcePort == portNumberDNS {
  2139. if (version == 4 && sourceIPAddress.Equal(transparentDNSResolverIPv4Address)) ||
  2140. (version == 6 && sourceIPAddress.Equal(transparentDNSResolverIPv6Address)) {
  2141. doTransparentDNS = true
  2142. }
  2143. }
  2144. }
  2145. }
  2146. // Apply rewrites before determining flow ID to ensure that corresponding up-
  2147. // and downstream flows yield the same flow ID.
  2148. var rewriteSourceIPAddress, rewriteDestinationIPAddress net.IP
  2149. if direction == packetDirectionServerUpstream {
  2150. // Store original source IP address to be replaced in
  2151. // downstream rewriting.
  2152. if version == 4 {
  2153. session.setOriginalIPv4AddressIfNotSet(sourceIPAddress)
  2154. rewriteSourceIPAddress = session.assignedIPv4Address
  2155. } else { // version == 6
  2156. session.setOriginalIPv6AddressIfNotSet(sourceIPAddress)
  2157. rewriteSourceIPAddress = session.assignedIPv6Address
  2158. }
  2159. // Rewrite DNS packets destinated for the transparent DNS target addresses
  2160. // to go to one of the server's resolvers. This random selection uses
  2161. // math/rand to minimize overhead.
  2162. //
  2163. // Limitation: TCP packets are always assigned to the same resolver, as
  2164. // currently there is no method for tracking the assigned resolver per TCP
  2165. // flow.
  2166. if doTransparentDNS {
  2167. if version == 4 {
  2168. index := session.TCPDNSResolverIPv4Index
  2169. if protocol == internetProtocolUDP {
  2170. index = rand.Intn(len(session.DNSResolverIPv4Addresses))
  2171. }
  2172. rewriteDestinationIPAddress = session.DNSResolverIPv4Addresses[index]
  2173. } else { // version == 6
  2174. index := session.TCPDNSResolverIPv6Index
  2175. if protocol == internetProtocolUDP {
  2176. index = rand.Intn(len(session.DNSResolverIPv6Addresses))
  2177. }
  2178. rewriteDestinationIPAddress = session.DNSResolverIPv6Addresses[index]
  2179. }
  2180. }
  2181. } else if direction == packetDirectionServerDownstream {
  2182. // Destination address will be original source address.
  2183. if version == 4 {
  2184. rewriteDestinationIPAddress = session.getOriginalIPv4Address()
  2185. } else if version == 6 {
  2186. rewriteDestinationIPAddress = session.getOriginalIPv6Address()
  2187. }
  2188. if rewriteDestinationIPAddress == nil {
  2189. metrics.rejectedPacket(direction, packetRejectNoOriginalAddress)
  2190. return false
  2191. }
  2192. // Rewrite source address of packets from servers' resolvers
  2193. // to transparent DNS target address.
  2194. if doTransparentDNS {
  2195. if version == 4 {
  2196. rewriteSourceIPAddress = transparentDNSResolverIPv4Address
  2197. } else if version == 6 {
  2198. rewriteSourceIPAddress = transparentDNSResolverIPv6Address
  2199. }
  2200. }
  2201. } else if direction == packetDirectionClientUpstream {
  2202. // Rewrite the destination address to be
  2203. // transparentDNSResolverIPv4/6Address, which the server will
  2204. // subsequently send on to actual DNS servers.
  2205. if doTransparentDNS {
  2206. if version == 4 {
  2207. rewriteDestinationIPAddress = transparentDNSResolverIPv4Address
  2208. } else if version == 6 {
  2209. rewriteDestinationIPAddress = transparentDNSResolverIPv6Address
  2210. }
  2211. }
  2212. } else if direction == packetDirectionClientDownstream {
  2213. // Rewrite the source address so the DNS response appears to come from
  2214. // the configured VPN DNS server.
  2215. if doTransparentDNS {
  2216. if version == 4 {
  2217. rewriteSourceIPAddress = clientTransparentDNS.IPv4Address
  2218. } else if version == 6 {
  2219. rewriteSourceIPAddress = clientTransparentDNS.IPv6Address
  2220. }
  2221. }
  2222. }
  2223. // Check if flow is tracked before checking traffic permission
  2224. doFlowTracking := isServer && (!doTransparentDNS || session.enableDNSFlowTracking)
  2225. // TODO: verify this struct is stack allocated
  2226. var ID flowID
  2227. isTrackingFlow := false
  2228. if doFlowTracking {
  2229. if direction == packetDirectionServerUpstream {
  2230. // Reflect rewrites in the upstream case and don't reflect rewrites in the
  2231. // following downstream case: all flow IDs are in the upstream space, with
  2232. // the assigned private IP for the client and, in the case of DNS, the
  2233. // actual resolver IP.
  2234. srcIP := sourceIPAddress
  2235. if rewriteSourceIPAddress != nil {
  2236. srcIP = rewriteSourceIPAddress
  2237. }
  2238. destIP := destinationIPAddress
  2239. if rewriteDestinationIPAddress != nil {
  2240. destIP = rewriteDestinationIPAddress
  2241. }
  2242. ID.set(srcIP, sourcePort, destIP, destinationPort, protocol)
  2243. } else if direction == packetDirectionServerDownstream {
  2244. ID.set(
  2245. destinationIPAddress,
  2246. destinationPort,
  2247. sourceIPAddress,
  2248. sourcePort,
  2249. protocol)
  2250. }
  2251. isTrackingFlow = session.isTrackingFlow(ID)
  2252. }
  2253. // Check packet source/destination is permitted; except for:
  2254. // - existing flows, which have already been checked
  2255. // - transparent DNS, which is always allowed
  2256. if !doTransparentDNS && !isTrackingFlow {
  2257. // Enforce traffic rules (allowed TCP/UDP ports).
  2258. checkPort := 0
  2259. if direction == packetDirectionServerUpstream ||
  2260. direction == packetDirectionClientUpstream {
  2261. checkPort = int(destinationPort)
  2262. } else if direction == packetDirectionServerDownstream ||
  2263. direction == packetDirectionClientDownstream {
  2264. checkPort = int(sourcePort)
  2265. }
  2266. if protocol == internetProtocolTCP {
  2267. invalidPort := (checkPort == 0)
  2268. if !invalidPort && isServer {
  2269. checkAllowedTCPPortFunc := session.getCheckAllowedTCPPortFunc()
  2270. if checkAllowedTCPPortFunc == nil ||
  2271. !checkAllowedTCPPortFunc(net.IP(ID.upstreamIPAddress[:]), checkPort) {
  2272. invalidPort = true
  2273. }
  2274. }
  2275. if invalidPort {
  2276. metrics.rejectedPacket(direction, packetRejectTCPPort)
  2277. return false
  2278. }
  2279. } else if protocol == internetProtocolUDP {
  2280. invalidPort := (checkPort == 0)
  2281. if !invalidPort && isServer {
  2282. checkAllowedUDPPortFunc := session.getCheckAllowedUDPPortFunc()
  2283. if checkAllowedUDPPortFunc == nil ||
  2284. !checkAllowedUDPPortFunc(net.IP(ID.upstreamIPAddress[:]), checkPort) {
  2285. invalidPort = true
  2286. }
  2287. }
  2288. if invalidPort {
  2289. metrics.rejectedPacket(direction, packetRejectUDPPort)
  2290. return false
  2291. }
  2292. }
  2293. // Enforce no localhost, multicast or broadcast packets; and no
  2294. // client-to-client packets.
  2295. //
  2296. // TODO: a client-side check could check that destination IP
  2297. // is strictly a tun device IP address.
  2298. if !destinationIPAddress.IsGlobalUnicast() ||
  2299. (direction == packetDirectionServerUpstream &&
  2300. !session.allowBogons &&
  2301. common.IsBogon(destinationIPAddress)) ||
  2302. // Client-to-client packets are disallowed even when other bogons are
  2303. // allowed.
  2304. (direction == packetDirectionServerUpstream &&
  2305. ((version == 4 &&
  2306. !destinationIPAddress.Equal(transparentDNSResolverIPv4Address) &&
  2307. privateSubnetIPv4.Contains(destinationIPAddress)) ||
  2308. (version == 6 &&
  2309. !destinationIPAddress.Equal(transparentDNSResolverIPv6Address) &&
  2310. privateSubnetIPv6.Contains(destinationIPAddress)))) {
  2311. metrics.rejectedPacket(direction, packetRejectDestinationAddress)
  2312. return false
  2313. }
  2314. }
  2315. // Apply packet rewrites. IP (v4 only) and TCP/UDP all have packet
  2316. // checksums which are updated to relect the rewritten headers.
  2317. var checksumAccumulator int32
  2318. if rewriteSourceIPAddress != nil {
  2319. checksumAccumulate(sourceIPAddress, false, &checksumAccumulator)
  2320. copy(sourceIPAddress, rewriteSourceIPAddress)
  2321. checksumAccumulate(sourceIPAddress, true, &checksumAccumulator)
  2322. }
  2323. if rewriteDestinationIPAddress != nil {
  2324. checksumAccumulate(destinationIPAddress, false, &checksumAccumulator)
  2325. copy(destinationIPAddress, rewriteDestinationIPAddress)
  2326. checksumAccumulate(destinationIPAddress, true, &checksumAccumulator)
  2327. }
  2328. if rewriteSourceIPAddress != nil || rewriteDestinationIPAddress != nil {
  2329. // IPv6 doesn't have an IP header checksum.
  2330. if version == 4 {
  2331. checksumAdjust(IPChecksum, checksumAccumulator)
  2332. }
  2333. if protocol == internetProtocolTCP {
  2334. checksumAdjust(TCPChecksum, checksumAccumulator)
  2335. } else { // UDP
  2336. checksumAdjust(UDPChecksum, checksumAccumulator)
  2337. }
  2338. }
  2339. // Start/update flow tracking, only once past all possible packet rejects
  2340. if doFlowTracking {
  2341. if !isTrackingFlow {
  2342. session.startTrackingFlow(ID, direction, applicationData, doTransparentDNS)
  2343. } else {
  2344. session.updateFlow(ID, direction, applicationData)
  2345. }
  2346. }
  2347. metrics.relayedPacket(direction, int(version), protocol, len(packet), len(applicationData))
  2348. return true
  2349. }
  2350. // Checksum code based on https://github.com/OpenVPN/openvpn:
  2351. /*
  2352. OpenVPN (TM) -- An Open Source VPN daemon
  2353. Copyright (C) 2002-2017 OpenVPN Technologies, Inc. <sales@openvpn.net>
  2354. OpenVPN license:
  2355. ----------------
  2356. OpenVPN is distributed under the GPL license version 2 (see COPYRIGHT.GPL).
  2357. */
  2358. func checksumAccumulate(data []byte, newData bool, accumulator *int32) {
  2359. // Based on ADD_CHECKSUM_32 and SUB_CHECKSUM_32 macros from OpenVPN:
  2360. // https://github.com/OpenVPN/openvpn/blob/58716979640b5d8850b39820f91da616964398cc/src/openvpn/proto.h#L177
  2361. // Assumes length of data is factor of 4.
  2362. for i := 0; i < len(data); i += 4 {
  2363. word := uint32(data[i+0])<<24 | uint32(data[i+1])<<16 | uint32(data[i+2])<<8 | uint32(data[i+3])
  2364. if newData {
  2365. *accumulator -= int32(word & 0xFFFF)
  2366. *accumulator -= int32(word >> 16)
  2367. } else {
  2368. *accumulator += int32(word & 0xFFFF)
  2369. *accumulator += int32(word >> 16)
  2370. }
  2371. }
  2372. }
  2373. func checksumAdjust(checksumData []byte, accumulator int32) {
  2374. // Based on ADJUST_CHECKSUM macro from OpenVPN:
  2375. // https://github.com/OpenVPN/openvpn/blob/58716979640b5d8850b39820f91da616964398cc/src/openvpn/proto.h#L177
  2376. // Assumes checksumData is 2 byte slice.
  2377. checksum := uint16(checksumData[0])<<8 | uint16(checksumData[1])
  2378. accumulator += int32(checksum)
  2379. if accumulator < 0 {
  2380. accumulator = -accumulator
  2381. accumulator = (accumulator >> 16) + (accumulator & 0xFFFF)
  2382. accumulator += accumulator >> 16
  2383. checksum = uint16(^accumulator)
  2384. } else {
  2385. accumulator = (accumulator >> 16) + (accumulator & 0xFFFF)
  2386. accumulator += accumulator >> 16
  2387. checksum = uint16(accumulator)
  2388. }
  2389. checksumData[0] = byte(checksum >> 8)
  2390. checksumData[1] = byte(checksum & 0xFF)
  2391. }
  2392. /*
  2393. packet debugging snippet:
  2394. import (
  2395. "github.com/google/gopacket"
  2396. "github.com/google/gopacket/layers"
  2397. )
  2398. func tracePacket(where string, packet []byte) {
  2399. var p gopacket.Packet
  2400. if len(packet) > 0 && packet[0]>>4 == 4 {
  2401. p = gopacket.NewPacket(packet, layers.LayerTypeIPv4, gopacket.Default)
  2402. } else {
  2403. p = gopacket.NewPacket(packet, layers.LayerTypeIPv6, gopacket.Default)
  2404. }
  2405. fmt.Printf("[%s packet]:\n%s\n\n", where, p)
  2406. }
  2407. */
  2408. // Device manages a tun device. It handles packet I/O using static,
  2409. // preallocated buffers to avoid GC churn.
  2410. type Device struct {
  2411. name string
  2412. writeMutex sync.Mutex
  2413. deviceIO io.ReadWriteCloser
  2414. inboundBuffer []byte
  2415. outboundBuffer []byte
  2416. }
  2417. // NewServerDevice creates and configures a new server tun device.
  2418. // Since the server uses fixed address spaces, only one server
  2419. // device may exist per host.
  2420. func NewServerDevice(config *ServerConfig) (*Device, error) {
  2421. file, deviceName, err := OpenTunDevice("")
  2422. if err != nil {
  2423. return nil, errors.Trace(err)
  2424. }
  2425. err = configureServerInterface(config, deviceName)
  2426. if err != nil {
  2427. _ = file.Close()
  2428. return nil, errors.Trace(err)
  2429. }
  2430. return newDevice(
  2431. deviceName,
  2432. file,
  2433. getMTU(config.MTU)), nil
  2434. }
  2435. // NewClientDevice creates and configures a new client tun device.
  2436. // Multiple client tun devices may exist per host.
  2437. func NewClientDevice(config *ClientConfig) (*Device, error) {
  2438. file, deviceName, err := OpenTunDevice("")
  2439. if err != nil {
  2440. return nil, errors.Trace(err)
  2441. }
  2442. err = configureClientInterface(
  2443. config, deviceName)
  2444. if err != nil {
  2445. _ = file.Close()
  2446. return nil, errors.Trace(err)
  2447. }
  2448. return newDevice(
  2449. deviceName,
  2450. file,
  2451. getMTU(config.MTU)), nil
  2452. }
  2453. func newDevice(
  2454. name string,
  2455. deviceIO io.ReadWriteCloser,
  2456. MTU int) *Device {
  2457. return &Device{
  2458. name: name,
  2459. deviceIO: deviceIO,
  2460. inboundBuffer: makeDeviceInboundBuffer(MTU),
  2461. outboundBuffer: makeDeviceOutboundBuffer(MTU),
  2462. }
  2463. }
  2464. // NewClientDeviceFromFD wraps an existing tun device.
  2465. func NewClientDeviceFromFD(config *ClientConfig) (*Device, error) {
  2466. file, err := fileFromFD(config.TunFileDescriptor, "")
  2467. if err != nil {
  2468. return nil, errors.Trace(err)
  2469. }
  2470. MTU := getMTU(config.MTU)
  2471. return &Device{
  2472. name: "",
  2473. deviceIO: file,
  2474. inboundBuffer: makeDeviceInboundBuffer(MTU),
  2475. outboundBuffer: makeDeviceOutboundBuffer(MTU),
  2476. }, nil
  2477. }
  2478. // Name returns the interface name for a created tun device,
  2479. // or returns "" for a device created by NewClientDeviceFromFD.
  2480. // The interface name may be used for additional network and
  2481. // routing configuration.
  2482. func (device *Device) Name() string {
  2483. return device.name
  2484. }
  2485. // ReadPacket reads one full packet from the tun device. The
  2486. // return value is a slice of a static, reused buffer, so the
  2487. // value is only valid until the next ReadPacket call.
  2488. // Concurrent calls to ReadPacket are _not_ supported.
  2489. func (device *Device) ReadPacket() ([]byte, error) {
  2490. // readTunPacket performs the platform dependent
  2491. // packet read operation.
  2492. offset, size, err := device.readTunPacket()
  2493. if err != nil {
  2494. return nil, errors.Trace(err)
  2495. }
  2496. return device.inboundBuffer[offset : offset+size], nil
  2497. }
  2498. // WritePacket writes one full packet to the tun device.
  2499. // Concurrent calls to WritePacket are supported.
  2500. func (device *Device) WritePacket(packet []byte) error {
  2501. // This mutex ensures that only one concurrent goroutine
  2502. // can use outboundBuffer when writing.
  2503. device.writeMutex.Lock()
  2504. defer device.writeMutex.Unlock()
  2505. // writeTunPacket performs the platform dependent
  2506. // packet write operation.
  2507. err := device.writeTunPacket(packet)
  2508. if err != nil {
  2509. return errors.Trace(err)
  2510. }
  2511. return nil
  2512. }
  2513. // Close interrupts any blocking Read/Write calls and
  2514. // tears down the tun device.
  2515. func (device *Device) Close() error {
  2516. return device.deviceIO.Close()
  2517. }
  2518. // Channel manages packet transport over a communications channel.
  2519. // Any io.ReadWriteCloser can provide transport. In psiphond, the
  2520. // io.ReadWriteCloser will be an SSH channel. Channel I/O frames
  2521. // packets with a length header and uses static, preallocated
  2522. // buffers to avoid GC churn.
  2523. type Channel struct {
  2524. transport io.ReadWriteCloser
  2525. inboundBuffer []byte
  2526. outboundBuffer []byte
  2527. }
  2528. // IP packets cannot be larger that 64K, so a 16-bit length
  2529. // header is sufficient.
  2530. const (
  2531. channelHeaderSize = 2
  2532. )
  2533. // NewChannel initializes a new Channel.
  2534. func NewChannel(transport io.ReadWriteCloser, MTU int) *Channel {
  2535. return &Channel{
  2536. transport: transport,
  2537. inboundBuffer: make([]byte, channelHeaderSize+MTU),
  2538. outboundBuffer: make([]byte, channelHeaderSize+MTU),
  2539. }
  2540. }
  2541. // ReadPacket reads one full packet from the channel. The
  2542. // return value is a slice of a static, reused buffer, so the
  2543. // value is only valid until the next ReadPacket call.
  2544. // Concurrent calls to ReadPacket are not supported.
  2545. func (channel *Channel) ReadPacket() ([]byte, error) {
  2546. header := channel.inboundBuffer[0:channelHeaderSize]
  2547. _, err := io.ReadFull(channel.transport, header)
  2548. if err != nil {
  2549. return nil, errors.Trace(err)
  2550. }
  2551. size := int(binary.BigEndian.Uint16(header))
  2552. if size > len(channel.inboundBuffer[channelHeaderSize:]) {
  2553. return nil, errors.Tracef("packet size exceeds MTU: %d", size)
  2554. }
  2555. packet := channel.inboundBuffer[channelHeaderSize : channelHeaderSize+size]
  2556. _, err = io.ReadFull(channel.transport, packet)
  2557. if err != nil {
  2558. return nil, errors.Trace(err)
  2559. }
  2560. return packet, nil
  2561. }
  2562. // WritePacket writes one full packet to the channel.
  2563. // Concurrent calls to WritePacket are not supported.
  2564. func (channel *Channel) WritePacket(packet []byte) error {
  2565. // Flow control assumed to be provided by the transport. In the case
  2566. // of SSH, the channel window size will determine whether the packet
  2567. // data is transmitted immediately or whether the transport.Write will
  2568. // block. When the channel window is full and transport.Write blocks,
  2569. // the sender's tun device will not be read (client case) or the send
  2570. // queue will fill (server case) and packets will be dropped. In this
  2571. // way, the channel window size will influence the TCP window size for
  2572. // tunneled traffic.
  2573. // When the transport is an SSH channel, the overhead per packet message
  2574. // includes:
  2575. //
  2576. // - SSH_MSG_CHANNEL_DATA: 5 bytes (https://tools.ietf.org/html/rfc4254#section-5.2)
  2577. // - SSH packet: ~28 bytes (https://tools.ietf.org/html/rfc4253#section-5.3), with MAC
  2578. // - TCP/IP transport for SSH: 40 bytes for IPv4
  2579. // Assumes MTU <= 64K and len(packet) <= MTU
  2580. size := len(packet)
  2581. binary.BigEndian.PutUint16(channel.outboundBuffer, uint16(size))
  2582. copy(channel.outboundBuffer[channelHeaderSize:], packet)
  2583. _, err := channel.transport.Write(channel.outboundBuffer[0 : channelHeaderSize+size])
  2584. if err != nil {
  2585. return errors.Trace(err)
  2586. }
  2587. return nil
  2588. }
  2589. // WriteFramedPackets writes a buffer of pre-framed packets to
  2590. // the channel.
  2591. // Concurrent calls to WriteFramedPackets are not supported.
  2592. func (channel *Channel) WriteFramedPackets(packetBuffer []byte) error {
  2593. _, err := channel.transport.Write(packetBuffer)
  2594. if err != nil {
  2595. return errors.Trace(err)
  2596. }
  2597. return nil
  2598. }
  2599. // Close interrupts any blocking Read/Write calls and
  2600. // closes the channel transport.
  2601. func (channel *Channel) Close() error {
  2602. return channel.transport.Close()
  2603. }