tun.go 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615
  1. /*
  2. * Copyright (c) 2017, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. // Copyright 2009 The Go Authors. All rights reserved.
  20. // Use of this source code is governed by a BSD-style
  21. // license that can be found in the LICENSE file.
  22. /*
  23. Package tun is an IP packet tunnel server and client. It supports tunneling
  24. both IPv4 and IPv6.
    .........................................................       .-,(  ),-.
    . [server]                                     .-----.  .    .-(          )-.
    .                                              | NIC |<---->(    Internet    )
    . .......................................      '-----'  .    '-(          ).-'
    . . [packet tunnel daemon]              .         ^     .        '-.( ).-'
    . .                                     .         |     .
    . . ...........................         .         |     .
    . . . [session]               .         .        NAT    .
    . . .                         .         .         |     .
    . . .                         .         .         v     .
    . . .                         .         .       .---.   .
    . . .                         .         .       | t |   .
    . . .                         .         .       | u |   .
    . . .                 .---.   .  .---.  .       | n |   .
    . . .                 | q |   .  | d |  .       |   |   .
    . . .                 | u |   .  | e |  .       | d |   .
    . . .          .------| e |<-----| m |<---------| e |   .
    . . .          |      | u |   .  | u |  .       | v |   .
    . . .          |      | e |   .  | x |  .       | i |   .
    . . .       rewrite   '---'   .  '---'  .       | c |   .
    . . .          |              .         .       | e |   .
    . . .          v              .         .       '---'   .
    . . .     .---------.         .         .         ^     .
    . . .     | channel |--rewrite--------------------'     .
    . . .     '---------'         .         .               .
    . . ...........^...............         .               .
    . .............|.........................               .
    ...............|.........................................
                   |
                   | (typically via Internet)
                   |
    ...............|.................
    . [client]     |                .
    .              |                .
    . .............|............... .
    . .            v              . .
    . .       .---------.         . .
    . .       | channel |         . .
    . .       '---------'         . .
    . .            ^              . .
    . .............|............... .
    .              v                .
    .        .------------.         .
    .        | tun device |         .
    .        '------------'         .
    .................................
  71. The client relays IP packets between a local tun device and a channel, which
  72. is a transport to the server. In Psiphon, the channel will be an SSH channel
  73. within an SSH connection to a Psiphon server.
  74. The server relays packets between each client and its own tun device. The
  75. server tun device is NATed to the Internet via an external network interface.
  76. In this way, client traffic is tunneled and will egress from the server host.
  77. Similar to a typical VPN, IP addresses are assigned to each client. Unlike
  78. a typical VPN, the assignment is not transmitted to the client. Instead, the
  79. server transparently rewrites the source addresses of client packets to
  80. the assigned IP address. The server also rewrites the destination address of
  81. certain DNS packets. The purpose of this is to allow clients to reconnect
  82. to different servers without having to tear down or change their local
  83. network configuration. Clients may configure their local tun device with an
  84. arbitrary IP address and a static DNS resolver address.
  85. The server uses the 24-bit 10.0.0.0/8 IPv4 private address space to maximize
  86. the number of addresses available, due to Psiphon client churn and minimum
  87. address lease time constraints. For IPv6, a 24-bit unique local space is used.
  88. When a client is allocated addresses, a unique, unused 24-bit "index" is
  89. reserved/leased. This index maps to and from IPv4 and IPv6 private addresses.
  90. The server multiplexes all client packets into a single tun device. When a
  91. packet is read, the destination address is used to map the packet back to the
  92. correct index, which maps back to the client.
  93. The server maintains client "sessions". A session maintains client IP
  94. address state and effectively holds the lease on assigned addresses. If a
  95. client is disconnected and quickly reconnects, it will resume its previous
  96. session, retaining its IP address and network connection states. Idle
  97. sessions with no client connection will eventually expire.
  98. Packet count and bytes transferred metrics are logged for each client session.
  99. The server integrates with and enforces Psiphon traffic rules and logging
  100. facilities. The server parses and validates packets. Client-to-client packets
  101. are not permitted. Only global unicast packets are permitted. Only TCP and UDP
  102. packets are permitted. The client also filters out, before sending, packets
  103. that the server won't route.
  104. Certain aspects of packet tunneling are outside the scope of this package;
e.g., the Psiphon client and server are responsible for establishing an SSH
  106. channel and negotiating the correct MTU and DNS settings. The Psiphon
  107. server will call Server.ClientConnected when a client connects and establishes
  108. a packet tunnel channel; and Server.ClientDisconnected when the client closes
  109. the channel and/or disconnects.
  110. */
  111. package tun
  112. import (
  113. "context"
  114. "encoding/binary"
  115. "errors"
  116. "fmt"
  117. "io"
  118. "math/rand"
  119. "net"
  120. "sync"
  121. "sync/atomic"
  122. "time"
  123. "github.com/Psiphon-Inc/goarista/monotime"
  124. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  125. )
  126. const (
  127. DEFAULT_MTU = 1500
  128. DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE = 32768 * 16
  129. DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE = 32768
  130. DEFAULT_IDLE_SESSION_EXPIRY_SECONDS = 300
  131. ORPHAN_METRICS_CHECKPOINTER_PERIOD = 30 * time.Minute
  132. FLOW_IDLE_EXPIRY = 60 * time.Second
  133. )
  134. // ServerConfig specifies the configuration of a packet tunnel server.
  135. type ServerConfig struct {
  136. // Logger is used for logging events and metrics.
  137. Logger common.Logger
  138. // SudoNetworkConfigCommands specifies whether to use "sudo"
  139. // when executing network configuration commands. This is required
  140. // when the packet tunnel server is not run as root and when
  141. // process capabilities are not available (only Linux kernel 4.3+
  142. // has the required capabilities support). The host sudoers file
  143. // must be configured to allow the tunnel server process user to
  144. // execute the commands invoked in configureServerInterface; see
  145. // the implementation for the appropriate platform.
  146. SudoNetworkConfigCommands bool
  147. // AllowNoIPv6NetworkConfiguration indicates that failures while
  148. // configuring tun interfaces and routing for IPv6 are to be
  149. // logged as warnings only. This option is intended to support
  150. // test cases on hosts without IPv6 and is not for production use;
  151. // the packet tunnel server will still accept IPv6 packets and
  152. // replay them to the tun device.
  153. // AllowNoIPv6NetworkConfiguration may not be supported on all
  154. // platforms.
  155. AllowNoIPv6NetworkConfiguration bool
  156. // EgressInterface is the interface to which client traffic is
  157. // masqueraded/NATed. For example, "eth0". If blank, a platform-
  158. // appropriate default is used.
  159. EgressInterface string
  160. // GetDNSResolverIPv4Addresses is a function which returns the
  161. // DNS resolvers to use as transparent DNS rewrite targets for
  162. // IPv4 DNS traffic.
  163. //
  164. // GetDNSResolverIPv4Addresses is invoked for each new client
  165. // session and the list of resolvers is stored with the session.
  166. // This is a compromise between checking current resolvers for
  167. // each packet (too expensive) and simply passing in a static
  168. // list (won't pick up resolver changes). As implemented, only
  169. // new client sessions will pick up resolver changes.
  170. //
  171. // Transparent DNS rewriting occurs when the client uses the
  172. // specific, target transparent DNS addresses specified by
  173. // GetTransparentDNSResolverIPv4/6Address.
  174. //
  175. // For outbound DNS packets with a target resolver IP address,
  176. // a random resolver is selected and used for the rewrite.
  177. // For inbound packets, _any_ resolver in the list is rewritten
  178. // back to the target resolver IP address. As a side-effect,
  179. // responses to client DNS packets originally destined for a
  180. // resolver in GetDNSResolverIPv4Addresses will be lost.
  181. GetDNSResolverIPv4Addresses func() []net.IP
  182. // GetDNSResolverIPv6Addresses is a function which returns the
  183. // DNS resolvers to use as transparent DNS rewrite targets for
  184. // IPv6 DNS traffic. It functions like GetDNSResolverIPv4Addresses.
  185. GetDNSResolverIPv6Addresses func() []net.IP
  186. // DownstreamPacketQueueSize specifies the size of the downstream
  187. // packet queue. The packet tunnel server multiplexes all client
  188. // packets through a single tun device, so when a packet is read,
  189. // it must be queued or dropped if it cannot be immediately routed
  190. // to the appropriate client. Note that the TCP and SSH windows
  191. // for the underlying channel transport will impact transfer rate
  192. // and queuing.
  193. // When DownstreamPacketQueueSize is 0, a default value tuned for
  194. // Psiphon is used.
  195. DownstreamPacketQueueSize int
  196. // MTU specifies the maximum transmission unit for the packet
  197. // tunnel. Clients must be configured with the same MTU. The
  198. // server's tun device will be set to this MTU value and is
  199. // assumed not to change for the duration of the server.
  200. // When MTU is 0, a default value is used.
  201. MTU int
  202. // SessionIdleExpirySeconds specifies how long to retain client
  203. // sessions which have no client attached. Sessions are retained
  204. // across client connections so reconnecting clients can resume
  205. // a previous session. Resuming avoids leasing new IP addresses
  206. // for reconnection, and also retains NAT state for active
  207. // tunneled connections.
  208. //
  209. // SessionIdleExpirySeconds is also, effectively, the lease
  210. // time for assigned IP addresses.
  211. SessionIdleExpirySeconds int
  212. }
  213. // Server is a packet tunnel server. A packet tunnel server
  214. // maintains client sessions, relays packets through client
  215. // channels, and multiplexes packets through a single tun
  216. // device. The server assigns IP addresses to clients, performs
  217. // IP address and transparent DNS rewriting, and enforces
  218. // traffic rules.
  219. type Server struct {
  220. config *ServerConfig
  221. device *Device
  222. indexToSession sync.Map
  223. sessionIDToIndex sync.Map
  224. connectedInProgress *sync.WaitGroup
  225. workers *sync.WaitGroup
  226. runContext context.Context
  227. stopRunning context.CancelFunc
  228. orphanMetrics *packetMetrics
  229. }
  230. // NewServer initializes a server.
  231. func NewServer(config *ServerConfig) (*Server, error) {
  232. device, err := NewServerDevice(config)
  233. if err != nil {
  234. return nil, common.ContextError(err)
  235. }
  236. runContext, stopRunning := context.WithCancel(context.Background())
  237. return &Server{
  238. config: config,
  239. device: device,
  240. connectedInProgress: new(sync.WaitGroup),
  241. workers: new(sync.WaitGroup),
  242. runContext: runContext,
  243. stopRunning: stopRunning,
  244. orphanMetrics: new(packetMetrics),
  245. }, nil
  246. }
  247. // Start starts a server and returns with it running.
  248. func (server *Server) Start() {
  249. server.config.Logger.WithContext().Info("starting")
  250. server.workers.Add(1)
  251. go server.runSessionReaper()
  252. server.workers.Add(1)
  253. go server.runOrphanMetricsCheckpointer()
  254. server.workers.Add(1)
  255. go server.runDeviceDownstream()
  256. }
  257. // Stop halts a running server.
  258. func (server *Server) Stop() {
  259. server.config.Logger.WithContext().Info("stopping")
  260. server.stopRunning()
  261. // Interrupt blocked device read/writes.
  262. server.device.Close()
  263. // Wait for any in-progress ClientConnected calls to complete.
  264. server.connectedInProgress.Wait()
  265. // After this point, no further clients will be added: all
  266. // in-progress ClientConnected calls have finished; and any
  267. // later ClientConnected calls won't get past their
  268. // server.runContext.Done() checks.
  269. // Close all clients. Client workers will be joined
  270. // by the following server.workers.Wait().
  271. server.indexToSession.Range(func(_, value interface{}) bool {
  272. session := value.(*session)
  273. server.interruptSession(session)
  274. return true
  275. })
  276. server.workers.Wait()
  277. server.config.Logger.WithContext().Info("stopped")
  278. }
  279. // AllowedPortChecker is a function which returns true when it is
  280. // permitted to relay packets to the specified upstream IP address
  281. // and/or port.
  282. type AllowedPortChecker func(upstreamIPAddress net.IP, port int) bool
  283. // FlowActivityUpdater defines an interface for receiving updates for
  284. // flow activity. Values passed to UpdateProgress are bytes transferred
  285. // and flow duration since the previous UpdateProgress.
  286. type FlowActivityUpdater interface {
  287. UpdateProgress(upstreamBytes, downstreamBytes int64, durationNanoseconds int64)
  288. }
  289. // FlowActivityUpdaterMaker is a function which returns a list of
  290. // appropriate updaters for a new flow to the specified upstream
  291. // hostname (if known -- may be ""), and IP address.
  292. type FlowActivityUpdaterMaker func(
  293. upstreamHostname string, upstreamIPAddress net.IP) []FlowActivityUpdater
  294. // ClientConnected handles new client connections, creating or resuming
  295. // a session and returns with client packet handlers running.
  296. //
  297. // sessionID is used to identify sessions for resumption.
  298. //
  299. // transport provides the channel for relaying packets to and from
  300. // the client.
  301. //
  302. // checkAllowedTCPPortFunc/checkAllowedUDPPortFunc are callbacks used
  303. // to enforce traffic rules. For each TCP/UDP packet, the corresponding
  304. // function is called to check if traffic to the packet's port is
  305. // permitted. These callbacks must be efficient and safe for concurrent
  306. // calls.
  307. //
  308. // It is safe to make concurrent calls to ClientConnected for distinct
  309. // session IDs. The caller is responsible for serializing calls with the
  310. // same session ID. Further, the caller must ensure, in the case of a client
  311. // transport reconnect when an existing transport has not yet disconnected,
  312. // that ClientDisconnected is called first -- so it doesn't undo the new
  313. // ClientConnected. (psiphond meets these constraints by closing any
  314. // existing SSH client with duplicate session ID early in the lifecycle of
  315. // a new SSH client connection.)
  316. func (server *Server) ClientConnected(
  317. sessionID string,
  318. transport io.ReadWriteCloser,
  319. checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
  320. flowActivityUpdaterMaker FlowActivityUpdaterMaker) error {
  321. // It's unusual to call both sync.WaitGroup.Add() _and_ Done() in the same
  322. // goroutine. There's no other place to call Add() since ClientConnected is
  323. // an API entrypoint. And Done() works because the invariant enforced by
  324. // connectedInProgress.Wait() is not that no ClientConnected calls are in
  325. // progress, but that no such calls are in progress past the
  326. // server.runContext.Done() check.
  327. server.connectedInProgress.Add(1)
  328. defer server.connectedInProgress.Done()
  329. select {
  330. case <-server.runContext.Done():
  331. return common.ContextError(errors.New("server stopping"))
  332. default:
  333. }
  334. server.config.Logger.WithContextFields(
  335. common.LogFields{"sessionID": sessionID}).Info("client connected")
  336. MTU := getMTU(server.config.MTU)
  337. clientSession := server.getSession(sessionID)
  338. if clientSession != nil {
  339. // Call interruptSession to ensure session is in the
  340. // expected idle state.
  341. server.interruptSession(clientSession)
  342. // Note: we don't check the session expiry; whether it has
  343. // already expired and not yet been reaped; or is about
  344. // to expire very shortly. It could happen that the reaper
  345. // will kill this session between now and when the expiry
  346. // is reset in the following resumeSession call. In this
  347. // unlikely case, the packet tunnel client should reconnect.
  348. } else {
  349. downstreamPacketQueueSize := DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
  350. if server.config.DownstreamPacketQueueSize > 0 {
  351. downstreamPacketQueueSize = server.config.DownstreamPacketQueueSize
  352. }
  353. // Store IPv4 resolver addresses in 4-byte representation
  354. // for use in rewritting.
  355. resolvers := server.config.GetDNSResolverIPv4Addresses()
  356. DNSResolverIPv4Addresses := make([]net.IP, len(resolvers))
  357. for i, resolver := range resolvers {
  358. // Assumes To4 is non-nil
  359. DNSResolverIPv4Addresses[i] = resolver.To4()
  360. }
  361. clientSession = &session{
  362. lastActivity: int64(monotime.Now()),
  363. sessionID: sessionID,
  364. metrics: new(packetMetrics),
  365. DNSResolverIPv4Addresses: append([]net.IP(nil), DNSResolverIPv4Addresses...),
  366. DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
  367. checkAllowedTCPPortFunc: checkAllowedTCPPortFunc,
  368. checkAllowedUDPPortFunc: checkAllowedUDPPortFunc,
  369. flowActivityUpdaterMaker: flowActivityUpdaterMaker,
  370. downstreamPackets: NewPacketQueue(downstreamPacketQueueSize),
  371. workers: new(sync.WaitGroup),
  372. }
  373. // allocateIndex initializes session.index, session.assignedIPv4Address,
  374. // and session.assignedIPv6Address; and updates server.indexToSession and
  375. // server.sessionIDToIndex.
  376. err := server.allocateIndex(clientSession)
  377. if err != nil {
  378. return common.ContextError(err)
  379. }
  380. }
  381. server.resumeSession(clientSession, NewChannel(transport, MTU))
  382. return nil
  383. }
  384. // ClientDisconnected handles clients disconnecting. Packet handlers
  385. // are halted, but the client session is left intact to reserve the
  386. // assigned IP addresses and retain network state in case the client
  387. // soon reconnects.
  388. func (server *Server) ClientDisconnected(sessionID string) {
  389. session := server.getSession(sessionID)
  390. if session != nil {
  391. server.config.Logger.WithContextFields(
  392. common.LogFields{"sessionID": sessionID}).Info("client disconnected")
  393. server.interruptSession(session)
  394. }
  395. }
  396. func (server *Server) getSession(sessionID string) *session {
  397. if index, ok := server.sessionIDToIndex.Load(sessionID); ok {
  398. s, ok := server.indexToSession.Load(index.(int32))
  399. if ok {
  400. return s.(*session)
  401. }
  402. server.config.Logger.WithContext().Warning("unexpected missing session")
  403. }
  404. return nil
  405. }
  406. func (server *Server) resumeSession(session *session, channel *Channel) {
  407. session.mutex.Lock()
  408. session.mutex.Unlock()
  409. session.channel = channel
  410. // Parent context is not server.runContext so that session workers
  411. // need only check session.stopRunning to act on shutdown events.
  412. session.runContext, session.stopRunning = context.WithCancel(context.Background())
  413. // When a session is interrupted, all goroutines in session.workers
  414. // are joined. When the server is stopped, all goroutines in
  415. // server.workers are joined. So, in both cases we synchronously
  416. // stop all workers associated with this session.
  417. session.workers.Add(1)
  418. go server.runClientUpstream(session)
  419. session.workers.Add(1)
  420. go server.runClientDownstream(session)
  421. session.touch()
  422. }
  423. func (server *Server) interruptSession(session *session) {
  424. session.mutex.Lock()
  425. defer session.mutex.Unlock()
  426. wasRunning := (session.channel != nil)
  427. session.stopRunning()
  428. if session.channel != nil {
  429. // Interrupt blocked channel read/writes.
  430. session.channel.Close()
  431. }
  432. session.workers.Wait()
  433. if session.channel != nil {
  434. // Don't hold a reference to channel, allowing both it and
  435. // its conn to be garbage collected.
  436. // Setting channel to nil must happen after workers.Wait()
  437. // to ensure no goroutines remains which may access
  438. // session.channel.
  439. session.channel = nil
  440. }
  441. // interruptSession may be called for idle sessions, to ensure
  442. // the session is in an expected state: in ClientConnected,
  443. // and in server.Stop(); don't log in those cases.
  444. if wasRunning {
  445. session.metrics.checkpoint(
  446. server.config.Logger, "packet_metrics", packetMetricsAll)
  447. }
  448. }
  449. func (server *Server) runSessionReaper() {
  450. defer server.workers.Done()
  451. // Periodically iterate over all sessions and discard expired
  452. // sessions. This action, removing the index from server.indexToSession,
  453. // releases the IP addresses assigned to the session.
  454. // TODO: As-is, this will discard sessions for live SSH tunnels,
  455. // as long as the SSH channel for such a session has been idle for
  456. // a sufficient period. Should the session be retained as long as
  457. // the SSH tunnel is alive (e.g., expose and call session.touch()
  458. // on keepalive events)? Or is it better to free up resources held
  459. // by idle sessions?
  460. idleExpiry := server.sessionIdleExpiry()
  461. ticker := time.NewTicker(idleExpiry / 2)
  462. defer ticker.Stop()
  463. for {
  464. select {
  465. case <-ticker.C:
  466. server.indexToSession.Range(func(_, value interface{}) bool {
  467. session := value.(*session)
  468. if session.expired(idleExpiry) {
  469. server.removeSession(session)
  470. }
  471. return true
  472. })
  473. case <-server.runContext.Done():
  474. return
  475. }
  476. }
  477. }
  478. func (server *Server) sessionIdleExpiry() time.Duration {
  479. sessionIdleExpirySeconds := DEFAULT_IDLE_SESSION_EXPIRY_SECONDS
  480. if server.config.SessionIdleExpirySeconds > 2 {
  481. sessionIdleExpirySeconds = server.config.SessionIdleExpirySeconds
  482. }
  483. return time.Duration(sessionIdleExpirySeconds) * time.Second
  484. }
// removeSession deletes the session from both server lookup maps
// (session ID to index, and index to session) and then interrupts it.
// Deleting the index entry is what makes the index, and so the assigned
// IP addresses derived from it, available to other sessions.
func (server *Server) removeSession(session *session) {
	server.sessionIDToIndex.Delete(session.sessionID)
	server.indexToSession.Delete(session.index)
	// Blocks until the session's workers have stopped.
	server.interruptSession(session)
}
  490. func (server *Server) runOrphanMetricsCheckpointer() {
  491. defer server.workers.Done()
  492. // Periodically log orphan packet metrics. Orphan metrics
  493. // are not associated with any session. This includes
  494. // packets that are rejected before they can be associated
  495. // with a session.
  496. ticker := time.NewTicker(ORPHAN_METRICS_CHECKPOINTER_PERIOD)
  497. defer ticker.Stop()
  498. for {
  499. done := false
  500. select {
  501. case <-ticker.C:
  502. case <-server.runContext.Done():
  503. done = true
  504. }
  505. // TODO: skip log if all zeros?
  506. server.orphanMetrics.checkpoint(
  507. server.config.Logger, "orphan_packet_metrics", packetMetricsRejected)
  508. if done {
  509. return
  510. }
  511. }
  512. }
  513. func (server *Server) runDeviceDownstream() {
  514. defer server.workers.Done()
  515. // Read incoming packets from the tun device, parse and validate the
  516. // packets, map them to a session/client, perform rewriting, and relay
  517. // the packets to the client.
  518. for {
  519. readPacket, err := server.device.ReadPacket()
  520. select {
  521. case <-server.runContext.Done():
  522. // No error is logged as shutdown may have interrupted read.
  523. return
  524. default:
  525. }
  526. if err != nil {
  527. server.config.Logger.WithContextFields(
  528. common.LogFields{"error": err}).Warning("read device packet failed")
  529. // May be temporary error condition, keep reading.
  530. continue
  531. }
  532. // destinationIPAddress determines which client receives this packet.
  533. // At this point, only enough of the packet is inspected to determine
  534. // this routing info; further validation happens in subsequent
  535. // processPacket in runClientDownstream.
  536. // Note that masquerading/NAT stands between the Internet and the tun
  537. // device, so arbitrary packets cannot be sent through to this point.
  538. // TODO: getPacketDestinationIPAddress and processPacket perform redundant
  539. // packet parsing; refactor to avoid extra work?
  540. destinationIPAddress, ok := getPacketDestinationIPAddress(
  541. server.orphanMetrics, packetDirectionServerDownstream, readPacket)
  542. if !ok {
  543. // Packet is dropped. Reason will be counted in orphan metrics.
  544. continue
  545. }
  546. // Map destination IP address to client session.
  547. index := server.convertIPAddressToIndex(destinationIPAddress)
  548. s, ok := server.indexToSession.Load(index)
  549. if !ok {
  550. server.orphanMetrics.rejectedPacket(
  551. packetDirectionServerDownstream, packetRejectNoSession)
  552. continue
  553. }
  554. session := s.(*session)
  555. // Simply enqueue the packet for client handling, and move on to
  556. // read the next packet. The packet tunnel server multiplexes all
  557. // client packets through a single tun device, so we must not block
  558. // on client channel I/O here.
  559. //
  560. // When the queue is full, the packet is dropped. This is standard
  561. // behavior for routers, VPN servers, etc.
  562. //
  563. // We allow packets to enqueue in an idle session in case a client
  564. // is in the process of reconnecting.
  565. //
  566. // TODO: processPacket is performed here, instead of runClientDownstream,
  567. // since packets are packed contiguously into the packet queue and if
  568. // the packet it to be omitted, that should be done before enqueuing.
  569. // The potential downside is that all packet processing is done in this
  570. // single thread of execution, blocking the next packet for the next
  571. // client. Try handing off the packet to another worker which will
  572. // call processPacket and Enqueue?
  573. // In downstream mode, processPacket rewrites the destination address
  574. // to the original client source IP address, and also rewrites DNS
  575. // packets. As documented in runClientUpstream, the original address
  576. // should already be populated via an upstream packet; if not, the
  577. // packet will be rejected.
  578. if !processPacket(
  579. session.metrics,
  580. session,
  581. packetDirectionServerDownstream,
  582. readPacket) {
  583. // Packet is rejected and dropped. Reason will be counted in metrics.
  584. continue
  585. }
  586. session.downstreamPackets.Enqueue(readPacket)
  587. }
  588. }
  589. func (server *Server) runClientUpstream(session *session) {
  590. defer session.workers.Done()
  591. // Read incoming packets from the client channel, validate the packets,
  592. // perform rewriting, and send them through to the tun device.
  593. for {
  594. readPacket, err := session.channel.ReadPacket()
  595. select {
  596. case <-session.runContext.Done():
  597. // No error is logged as shutdown may have interrupted read.
  598. return
  599. default:
  600. }
  601. if err != nil {
  602. server.config.Logger.WithContextFields(
  603. common.LogFields{"error": err}).Warning("read channel packet failed")
  604. // Tear down the session. Must be invoked asynchronously.
  605. go server.interruptSession(session)
  606. return
  607. }
  608. session.touch()
  609. // processPacket transparently rewrites the source address to the
  610. // session's assigned address and rewrites the destination of any
  611. // DNS packets destined to the target DNS resolver.
  612. //
  613. // The first time the source address is rewritten, the original
  614. // value is recorded so inbound packets can have the reverse
  615. // rewrite applied. This assumes that the client will send a
  616. // packet before receiving any packet, which is the case since
  617. // only clients can initiate TCP or UDP connections or flows.
  618. if !processPacket(
  619. session.metrics,
  620. session,
  621. packetDirectionServerUpstream,
  622. readPacket) {
  623. // Packet is rejected and dropped. Reason will be counted in metrics.
  624. continue
  625. }
  626. err = server.device.WritePacket(readPacket)
  627. if err != nil {
  628. server.config.Logger.WithContextFields(
  629. common.LogFields{"error": err}).Warning("write device packet failed")
  630. // May be temporary error condition, keep working. The packet is
  631. // most likely dropped.
  632. continue
  633. }
  634. }
  635. }
  636. func (server *Server) runClientDownstream(session *session) {
  637. defer session.workers.Done()
  638. // Dequeue, process, and relay packets to be sent to the client channel.
  639. for {
  640. packetBuffer, ok := session.downstreamPackets.DequeueFramedPackets(session.runContext)
  641. if !ok {
  642. // Dequeue aborted due to session.runContext.Done()
  643. return
  644. }
  645. err := session.channel.WriteFramedPackets(packetBuffer)
  646. if err != nil {
  647. server.config.Logger.WithContextFields(
  648. common.LogFields{"error": err}).Warning("write channel packets failed")
  649. session.downstreamPackets.Replace(packetBuffer)
  650. // Tear down the session. Must be invoked asynchronously.
  651. go server.interruptSession(session)
  652. return
  653. }
  654. session.touch()
  655. session.downstreamPackets.Replace(packetBuffer)
  656. }
  657. }
  658. var (
  659. serverIPv4AddressCIDR = "10.0.0.1/8"
  660. transparentDNSResolverIPv4Address = net.ParseIP("10.0.0.2").To4() // 4-byte for rewriting
  661. _, privateSubnetIPv4, _ = net.ParseCIDR("10.0.0.0/8")
  662. assignedIPv4AddressTemplate = "10.%02d.%02d.%02d"
  663. serverIPv6AddressCIDR = "fd19:ca83:e6d5:1c44:0000:0000:0000:0001/64"
  664. transparentDNSResolverIPv6Address = net.ParseIP("fd19:ca83:e6d5:1c44:0000:0000:0000:0002")
  665. _, privateSubnetIPv6, _ = net.ParseCIDR("fd19:ca83:e6d5:1c44::/64")
  666. assignedIPv6AddressTemplate = "fd19:ca83:e6d5:1c44:8c57:4434:ee%02x:%02x%02x"
  667. )
  668. func (server *Server) allocateIndex(newSession *session) error {
  669. // Find and assign an available index in the 24-bit index space.
  670. // The index directly maps to and so determines the assigned
  671. // IPv4 and IPv6 addresses.
  672. // Search is a random index selection followed by a linear probe.
  673. // TODO: is this the most effective (fast on average, simple) algorithm?
  674. max := 0x00FFFFFF
  675. randomInt, err := common.MakeSecureRandomInt(max + 1)
  676. if err != nil {
  677. return common.ContextError(err)
  678. }
  679. index := int32(randomInt)
  680. index &= int32(max)
  681. idleExpiry := server.sessionIdleExpiry()
  682. for tries := 0; tries < 100000; index++ {
  683. tries++
  684. // The index/address space isn't exactly 24-bits:
  685. // - 0 and 0x00FFFFFF are reserved since they map to
  686. // the network identifier (10.0.0.0) and broadcast
  687. // address (10.255.255.255) respectively
  688. // - 1 is reserved as the server tun device address,
  689. // (10.0.0.1, and IPv6 equivalent)
  690. // - 2 is reserver as the transparent DNS target
  691. // address (10.0.0.2, and IPv6 equivalent)
  692. if index <= 2 {
  693. continue
  694. }
  695. if index == 0x00FFFFFF {
  696. index = 0
  697. continue
  698. }
  699. if s, ok := server.indexToSession.LoadOrStore(index, newSession); ok {
  700. // Index is already in use or acquired concurrently.
  701. // If the existing session is expired, reap it and use index.
  702. existingSession := s.(*session)
  703. if existingSession.expired(idleExpiry) {
  704. server.removeSession(existingSession)
  705. } else {
  706. continue
  707. }
  708. }
  709. // Note: the To4() for assignedIPv4Address is essential since
  710. // that address value is assumed to be 4 bytes when rewriting.
  711. newSession.index = index
  712. newSession.assignedIPv4Address = server.convertIndexToIPv4Address(index).To4()
  713. newSession.assignedIPv6Address = server.convertIndexToIPv6Address(index)
  714. server.sessionIDToIndex.Store(newSession.sessionID, index)
  715. server.resetRouting(newSession.assignedIPv4Address, newSession.assignedIPv6Address)
  716. return nil
  717. }
  718. return common.ContextError(errors.New("unallocated index not found"))
  719. }
  720. func (server *Server) resetRouting(IPv4Address, IPv6Address net.IP) {
  721. // Attempt to clear the NAT table of any existing connection
  722. // states. This will prevent the (already unlikely) delivery
  723. // of packets to the wrong client when an assigned IP address is
  724. // recycled. Silently has no effect on some platforms, see
  725. // resetNATTables implementations.
  726. err := resetNATTables(server.config, IPv4Address)
  727. if err != nil {
  728. server.config.Logger.WithContextFields(
  729. common.LogFields{"error": err}).Warning("reset IPv4 routing failed")
  730. }
  731. err = resetNATTables(server.config, IPv6Address)
  732. if err != nil {
  733. server.config.Logger.WithContextFields(
  734. common.LogFields{"error": err}).Warning("reset IPv6 routing failed")
  735. }
  736. }
  737. func (server *Server) convertIPAddressToIndex(IP net.IP) int32 {
  738. // Assumes IP is at least 3 bytes.
  739. size := len(IP)
  740. return int32(IP[size-3])<<16 | int32(IP[size-2])<<8 | int32(IP[size-1])
  741. }
  742. func (server *Server) convertIndexToIPv4Address(index int32) net.IP {
  743. return net.ParseIP(
  744. fmt.Sprintf(
  745. assignedIPv4AddressTemplate,
  746. (index>>16)&0xFF,
  747. (index>>8)&0xFF,
  748. index&0xFF))
  749. }
  750. func (server *Server) convertIndexToIPv6Address(index int32) net.IP {
  751. return net.ParseIP(
  752. fmt.Sprintf(
  753. assignedIPv6AddressTemplate,
  754. (index>>16)&0xFF,
  755. (index>>8)&0xFF,
  756. index&0xFF))
  757. }
// GetTransparentDNSResolverIPv4Address returns the static IPv4 address
// to use as a DNS resolver when transparent DNS rewriting is desired.
//
// The returned net.IP is the package-level value itself, not a copy;
// callers must not modify its bytes.
func GetTransparentDNSResolverIPv4Address() net.IP {
	return transparentDNSResolverIPv4Address
}
// GetTransparentDNSResolverIPv6Address returns the static IPv6 address
// to use as a DNS resolver when transparent DNS rewriting is desired.
//
// The returned net.IP is the package-level value itself, not a copy;
// callers must not modify its bytes.
func GetTransparentDNSResolverIPv6Address() net.IP {
	return transparentDNSResolverIPv6Address
}
// session holds the server-side state for one packet tunnel client: its
// index and assigned addresses, the recorded original client addresses,
// flow tracking state, the downstream packet queue, and the channel and
// worker state used while a client is connected.
type session struct {
	// Note: 64-bit ints used with atomic operations are placed
	// at the start of struct to ensure 64-bit alignment.
	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)

	// lastActivity is the monotime timestamp stored by touch() and read
	// by expired(); accessed atomically.
	lastActivity int64

	// lastFlowReapIndex identifies the most recent period in which
	// startTrackingFlow ran reapFlows; accessed atomically.
	lastFlowReapIndex int64

	// metrics accumulates this session's packet counters.
	metrics *packetMetrics

	// sessionID is the key for this session in server.sessionIDToIndex.
	sessionID string

	// index is the key for this session in server.indexToSession. The
	// assigned IPv4 and IPv6 addresses are derived from it.
	index int32

	// NOTE(review): presumably the resolvers that transparent DNS packets
	// are redirected to; the use is not visible in this part of the file.
	DNSResolverIPv4Addresses []net.IP

	// assignedIPv4Address is stored in 4-byte form (see allocateIndex).
	assignedIPv4Address net.IP

	// setOriginalIPv4Address is a 0/1 flag, accessed atomically, set once
	// originalIPv4Address has been claimed by
	// setOriginalIPv4AddressIfNotSet.
	setOriginalIPv4Address int32

	// originalIPv4Address is a private copy of the first client source
	// address recorded for this session.
	originalIPv4Address net.IP

	// IPv6 equivalents of the IPv4 fields above.
	DNSResolverIPv6Addresses []net.IP
	assignedIPv6Address net.IP
	setOriginalIPv6Address int32
	originalIPv6Address net.IP

	// NOTE(review): presumably consulted when deciding whether a flow's
	// port is permitted; the use is not visible in this part of the file.
	checkAllowedTCPPortFunc AllowedPortChecker
	checkAllowedUDPPortFunc AllowedPortChecker

	// flowActivityUpdaterMaker is invoked by startTrackingFlow to obtain
	// the activity updaters for a new flow.
	flowActivityUpdaterMaker FlowActivityUpdaterMaker

	// downstreamPackets is filled by runDeviceDownstream and drained by
	// runClientDownstream.
	downstreamPackets *PacketQueue

	// flows maps flowID to *flowState for tracked flows.
	flows sync.Map

	// workers tracks the session's runClientUpstream and
	// runClientDownstream goroutines.
	workers *sync.WaitGroup

	// mutex is held by interruptSession for the duration of the call.
	mutex sync.Mutex

	// channel is the client channel; nil while the session is idle.
	channel *Channel

	// runContext is done once stopRunning has been called; session
	// workers exit when they observe this.
	runContext context.Context
	stopRunning context.CancelFunc
}
// flowID identifies an IP traffic flow using the conventional
// network 5-tuple. flowIDs track bidirectional flows.
//
// All fields are fixed-size values, which allows a flowID to be used
// directly as a map key (see session.flows). Addresses are always stored
// in 16-byte form; set() stores IPv4 addresses as IPv4-mapped IPv6.
type flowID struct {
	downstreamIPAddress [net.IPv6len]byte
	downstreamPort uint16
	upstreamIPAddress [net.IPv6len]byte
	upstreamPort uint16
	protocol internetProtocol
}
  805. // From: https://github.com/golang/go/blob/b88efc7e7ac15f9e0b5d8d9c82f870294f6a3839/src/net/ip.go#L55
  806. var v4InV6Prefix = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}
  807. func (f *flowID) set(
  808. downstreamIPAddress net.IP,
  809. downstreamPort uint16,
  810. upstreamIPAddress net.IP,
  811. upstreamPort uint16,
  812. protocol internetProtocol) {
  813. if len(downstreamIPAddress) == net.IPv4len {
  814. copy(f.downstreamIPAddress[:], v4InV6Prefix)
  815. copy(f.downstreamIPAddress[len(v4InV6Prefix):], downstreamIPAddress)
  816. } else { // net.IPv6len
  817. copy(f.downstreamIPAddress[:], downstreamIPAddress)
  818. }
  819. f.downstreamPort = downstreamPort
  820. if len(upstreamIPAddress) == net.IPv4len {
  821. copy(f.upstreamIPAddress[:], v4InV6Prefix)
  822. copy(f.upstreamIPAddress[len(v4InV6Prefix):], upstreamIPAddress)
  823. } else { // net.IPv6len
  824. copy(f.upstreamIPAddress[:], upstreamIPAddress)
  825. }
  826. f.upstreamPort = upstreamPort
  827. f.protocol = protocol
  828. }
// flowState is the tracking state stored in session.flows for one flow.
type flowState struct {
	// Note: 64-bit ints used with atomic operations are placed
	// at the start of struct to ensure 64-bit alignment.
	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)

	// Monotime timestamps of the most recent packet seen in each
	// direction; accessed atomically. startTrackingFlow sets only the
	// timestamp for the direction of the first packet, so the other
	// remains 0 until a packet is seen in that direction.
	lastUpstreamPacketTime int64
	lastDownstreamPacketTime int64

	// activityUpdaters receive an UpdateProgress report for every packet
	// in the flow (see updateFlow).
	activityUpdaters []FlowActivityUpdater
}
  837. func (flowState *flowState) expired(idleExpiry time.Duration) bool {
  838. now := monotime.Now()
  839. return (now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastUpstreamPacketTime))) > idleExpiry) ||
  840. (now.Sub(monotime.Time(atomic.LoadInt64(&flowState.lastDownstreamPacketTime))) > idleExpiry)
  841. }
  842. func (session *session) touch() {
  843. atomic.StoreInt64(&session.lastActivity, int64(monotime.Now()))
  844. }
  845. func (session *session) expired(idleExpiry time.Duration) bool {
  846. lastActivity := monotime.Time(atomic.LoadInt64(&session.lastActivity))
  847. return monotime.Since(lastActivity) > idleExpiry
  848. }
  849. func (session *session) setOriginalIPv4AddressIfNotSet(IPAddress net.IP) {
  850. if !atomic.CompareAndSwapInt32(&session.setOriginalIPv4Address, 0, 1) {
  851. return
  852. }
  853. // Make a copy of IPAddress; don't reference a slice of a reusable
  854. // packet buffer, which will be overwritten.
  855. session.originalIPv4Address = net.IP(append([]byte(nil), []byte(IPAddress)...))
  856. }
  857. func (session *session) getOriginalIPv4Address() net.IP {
  858. if atomic.LoadInt32(&session.setOriginalIPv4Address) == 0 {
  859. return nil
  860. }
  861. return session.originalIPv4Address
  862. }
  863. func (session *session) setOriginalIPv6AddressIfNotSet(IPAddress net.IP) {
  864. if !atomic.CompareAndSwapInt32(&session.setOriginalIPv6Address, 0, 1) {
  865. return
  866. }
  867. // Make a copy of IPAddress.
  868. session.originalIPv6Address = net.IP(append([]byte(nil), []byte(IPAddress)...))
  869. }
  870. func (session *session) getOriginalIPv6Address() net.IP {
  871. if atomic.LoadInt32(&session.setOriginalIPv6Address) == 0 {
  872. return nil
  873. }
  874. return session.originalIPv6Address
  875. }
  876. // isTrackingFlow checks if a flow is being tracked.
  877. func (session *session) isTrackingFlow(ID flowID) bool {
  878. f, ok := session.flows.Load(ID)
  879. if !ok {
  880. return false
  881. }
  882. flowState := f.(*flowState)
  883. // Check if flow is expired but not yet reaped.
  884. if flowState.expired(FLOW_IDLE_EXPIRY) {
  885. session.flows.Delete(ID)
  886. return false
  887. }
  888. return true
  889. }
  890. // startTrackingFlow starts flow tracking for the flow identified
  891. // by ID.
  892. //
  893. // Flow tracking is used to implement:
  894. // - one-time permissions checks for a flow
  895. // - OSLs
  896. // - domain bytes transferred [TODO]
  897. //
  898. // The applicationData from the first packet in the flow is
  899. // inspected to determine any associated hostname, using HTTP or
  900. // TLS payload. The session's FlowActivityUpdaterMaker is invoked
  901. // to determine a list of updaters to track flow activity.
  902. //
  903. // Updaters receive reports with the number of application data
  904. // bytes in each flow packet. This number, totalled for all packets
  905. // in a flow, may exceed the total bytes transferred at the
  906. // application level due to TCP retransmission. Currently, the flow
  907. // tracking logic doesn't exclude retransmitted packets from update
  908. // reporting.
  909. //
  910. // Flows are untracked after an idle expiry period. Transport
  911. // protocol indicators of end of flow, such as FIN or RST for TCP,
  912. // which may or may not appear in a flow, are not currently used.
  913. //
  914. // startTrackingFlow may be called from concurrent goroutines; if
  915. // the flow is already tracked, it is simply updated.
  916. func (session *session) startTrackingFlow(
  917. ID flowID, direction packetDirection, applicationData []byte) {
  918. now := int64(monotime.Now())
  919. // Once every period, iterate over flows and reap expired entries.
  920. reapIndex := now / int64(monotime.Time(FLOW_IDLE_EXPIRY/2))
  921. previousReapIndex := atomic.LoadInt64(&session.lastFlowReapIndex)
  922. if reapIndex != previousReapIndex &&
  923. atomic.CompareAndSwapInt64(&session.lastFlowReapIndex, previousReapIndex, reapIndex) {
  924. session.reapFlows()
  925. }
  926. var hostname string
  927. if ID.protocol == internetProtocolTCP {
  928. // TODO: implement
  929. // hostname = common.ExtractHostnameFromTCPFlow(applicationData)
  930. }
  931. flowState := &flowState{
  932. activityUpdaters: session.flowActivityUpdaterMaker(
  933. hostname,
  934. net.IP(ID.upstreamIPAddress[:])),
  935. }
  936. if direction == packetDirectionServerUpstream {
  937. flowState.lastUpstreamPacketTime = now
  938. } else {
  939. flowState.lastDownstreamPacketTime = now
  940. }
  941. // LoadOrStore will retain any existing entry
  942. session.flows.LoadOrStore(ID, flowState)
  943. session.updateFlow(ID, direction, applicationData)
  944. }
  945. func (session *session) updateFlow(
  946. ID flowID, direction packetDirection, applicationData []byte) {
  947. f, ok := session.flows.Load(ID)
  948. if !ok {
  949. return
  950. }
  951. flowState := f.(*flowState)
  952. // Note: no expired check here, since caller is assumed to
  953. // have just called isTrackingFlow.
  954. now := int64(monotime.Now())
  955. var upstreamBytes, downstreamBytes, durationNanoseconds int64
  956. if direction == packetDirectionServerUpstream {
  957. upstreamBytes = int64(len(applicationData))
  958. atomic.StoreInt64(&flowState.lastUpstreamPacketTime, now)
  959. } else {
  960. downstreamBytes = int64(len(applicationData))
  961. // Follows common.ActivityMonitoredConn semantics, where
  962. // duration is updated only for downstream activity. This
  963. // is intened to produce equivalent behaviour for port
  964. // forward clients (tracked with ActivityUpdaters) and
  965. // packet tunnel clients (tracked with FlowActivityUpdaters).
  966. durationNanoseconds = now - atomic.SwapInt64(&flowState.lastDownstreamPacketTime, now)
  967. }
  968. for _, updater := range flowState.activityUpdaters {
  969. updater.UpdateProgress(upstreamBytes, downstreamBytes, durationNanoseconds)
  970. }
  971. }
  972. // reapFlows removes expired idle flows.
  973. func (session *session) reapFlows() {
  974. session.flows.Range(func(key, value interface{}) bool {
  975. flowState := value.(*flowState)
  976. if flowState.expired(FLOW_IDLE_EXPIRY) {
  977. session.flows.Delete(key)
  978. }
  979. return true
  980. })
  981. }
  982. type packetMetrics struct {
  983. upstreamRejectReasons [packetRejectReasonCount]int64
  984. downstreamRejectReasons [packetRejectReasonCount]int64
  985. TCPIPv4 relayedPacketMetrics
  986. TCPIPv6 relayedPacketMetrics
  987. UDPIPv4 relayedPacketMetrics
  988. UDPIPv6 relayedPacketMetrics
  989. }
  990. type relayedPacketMetrics struct {
  991. packetsUp int64
  992. packetsDown int64
  993. bytesUp int64
  994. bytesDown int64
  995. }
  996. func (metrics *packetMetrics) rejectedPacket(
  997. direction packetDirection,
  998. reason packetRejectReason) {
  999. if direction == packetDirectionServerUpstream ||
  1000. direction == packetDirectionClientUpstream {
  1001. atomic.AddInt64(&metrics.upstreamRejectReasons[reason], 1)
  1002. } else { // packetDirectionDownstream
  1003. atomic.AddInt64(&metrics.downstreamRejectReasons[reason], 1)
  1004. }
  1005. }
  1006. func (metrics *packetMetrics) relayedPacket(
  1007. direction packetDirection,
  1008. version int,
  1009. protocol internetProtocol,
  1010. packetLength int) {
  1011. var packetsMetric, bytesMetric *int64
  1012. if direction == packetDirectionServerUpstream ||
  1013. direction == packetDirectionClientUpstream {
  1014. if version == 4 {
  1015. if protocol == internetProtocolTCP {
  1016. packetsMetric = &metrics.TCPIPv4.packetsUp
  1017. bytesMetric = &metrics.TCPIPv4.bytesUp
  1018. } else { // UDP
  1019. packetsMetric = &metrics.UDPIPv4.packetsUp
  1020. bytesMetric = &metrics.UDPIPv4.bytesUp
  1021. }
  1022. } else { // IPv6
  1023. if protocol == internetProtocolTCP {
  1024. packetsMetric = &metrics.TCPIPv6.packetsUp
  1025. bytesMetric = &metrics.TCPIPv6.bytesUp
  1026. } else { // UDP
  1027. packetsMetric = &metrics.UDPIPv6.packetsUp
  1028. bytesMetric = &metrics.UDPIPv6.bytesUp
  1029. }
  1030. }
  1031. } else { // packetDirectionDownstream
  1032. if version == 4 {
  1033. if protocol == internetProtocolTCP {
  1034. packetsMetric = &metrics.TCPIPv4.packetsDown
  1035. bytesMetric = &metrics.TCPIPv4.bytesDown
  1036. } else { // UDP
  1037. packetsMetric = &metrics.UDPIPv4.packetsDown
  1038. bytesMetric = &metrics.UDPIPv4.bytesDown
  1039. }
  1040. } else { // IPv6
  1041. if protocol == internetProtocolTCP {
  1042. packetsMetric = &metrics.TCPIPv6.packetsDown
  1043. bytesMetric = &metrics.TCPIPv6.bytesDown
  1044. } else { // UDP
  1045. packetsMetric = &metrics.UDPIPv6.packetsDown
  1046. bytesMetric = &metrics.UDPIPv6.bytesDown
  1047. }
  1048. }
  1049. }
  1050. // Note: packet length, and so bytes transferred, includes IP and TCP/UDP
  1051. // headers, not just payload data, as is counted in port forwarding. It
  1052. // makes sense to include this packet overhead, since we have to tunnel it.
  1053. atomic.AddInt64(packetsMetric, 1)
  1054. atomic.AddInt64(bytesMetric, int64(packetLength))
  1055. }
  1056. const (
  1057. packetMetricsRejected = 1
  1058. packetMetricsRelayed = 2
  1059. packetMetricsAll = packetMetricsRejected | packetMetricsRelayed
  1060. )
  1061. func (metrics *packetMetrics) checkpoint(
  1062. logger common.Logger, logName string, whichMetrics int) {
  1063. // Report all metric counters in a single log message. Each
  1064. // counter is reset to 0 when added to the log.
  1065. logFields := make(common.LogFields)
  1066. if whichMetrics&packetMetricsRejected != 0 {
  1067. for i := 0; i < packetRejectReasonCount; i++ {
  1068. logFields["upstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
  1069. atomic.SwapInt64(&metrics.upstreamRejectReasons[i], 0)
  1070. logFields["downstream_packet_rejected_"+packetRejectReasonDescription(packetRejectReason(i))] =
  1071. atomic.SwapInt64(&metrics.downstreamRejectReasons[i], 0)
  1072. }
  1073. }
  1074. if whichMetrics&packetMetricsRelayed != 0 {
  1075. relayedMetrics := []struct {
  1076. prefix string
  1077. metrics *relayedPacketMetrics
  1078. }{
  1079. {"tcp_ipv4_", &metrics.TCPIPv4},
  1080. {"tcp_ipv6_", &metrics.TCPIPv6},
  1081. {"udp_ipv4_", &metrics.UDPIPv4},
  1082. {"udp_ipv6_", &metrics.UDPIPv6},
  1083. }
  1084. for _, r := range relayedMetrics {
  1085. logFields[r.prefix+"packets_up"] = atomic.SwapInt64(&r.metrics.packetsUp, 0)
  1086. logFields[r.prefix+"packets_down"] = atomic.SwapInt64(&r.metrics.packetsDown, 0)
  1087. logFields[r.prefix+"bytes_up"] = atomic.SwapInt64(&r.metrics.bytesUp, 0)
  1088. logFields[r.prefix+"bytes_down"] = atomic.SwapInt64(&r.metrics.bytesDown, 0)
  1089. }
  1090. }
  1091. logger.LogMetric(logName, logFields)
  1092. }
  1093. // PacketQueue is a fixed-size, preallocated queue of packets.
  1094. // Enqueued packets are packed into a contiguous buffer with channel
  1095. // framing, allowing the entire queue to be written to a channel
  1096. // in a single call.
  1097. // Preallocating and reuse of the queue buffers avoids GC churn.
  1098. type PacketQueue struct {
  1099. emptyBuffers chan []byte
  1100. activeBuffer chan []byte
  1101. }
  1102. // NewPacketQueue creates a new PacketQueue.
  1103. // The caller must ensire that queueSize exceeds the
  1104. // packet MTU, or packets will will never enqueue.
  1105. func NewPacketQueue(queueSize int) *PacketQueue {
  1106. // Two buffers of size queueSize are allocated, to
  1107. // allow packets to continue to enqueue while one buffer
  1108. // is borrowed by the DequeueFramedPackets caller.
  1109. //
  1110. // TODO: is there a way to implement this without
  1111. // allocating 2x queueSize bytes? A circular queue
  1112. // won't work because we want DequeueFramedPackets
  1113. // to return a contiguous buffer. Perhaps a Bip
  1114. // Buffer would work here:
  1115. // https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
  1116. queue := &PacketQueue{
  1117. emptyBuffers: make(chan []byte, 2),
  1118. activeBuffer: make(chan []byte, 1),
  1119. }
  1120. queue.emptyBuffers <- make([]byte, 0, queueSize)
  1121. queue.emptyBuffers <- make([]byte, 0, queueSize)
  1122. return queue
  1123. }
  1124. // Enqueue adds a packet to the queue.
  1125. // If the queue is full, the packet is dropped.
  1126. // Enqueue is _not_ safe for concurrent calls.
  1127. func (queue *PacketQueue) Enqueue(packet []byte) {
  1128. var buffer []byte
  1129. select {
  1130. case buffer = <-queue.activeBuffer:
  1131. default:
  1132. buffer = <-queue.emptyBuffers
  1133. }
  1134. packetSize := len(packet)
  1135. if cap(buffer)-len(buffer) >= channelHeaderSize+packetSize {
  1136. // Assumes len(packet)/MTU <= 64K
  1137. offset := len(buffer)
  1138. buffer = buffer[0 : len(buffer)+channelHeaderSize+packetSize]
  1139. binary.BigEndian.PutUint16(buffer[offset:offset+channelHeaderSize], uint16(packetSize))
  1140. copy(buffer[offset+channelHeaderSize:], packet)
  1141. }
  1142. // Else, queue is full, so drop packet.
  1143. queue.activeBuffer <- buffer
  1144. }
  1145. // DequeueFramedPackets waits until at least one packet is
  1146. // enqueued, and then returns a packet buffer containing one
  1147. // or more framed packets. The returned buffer remains part
  1148. // of the PacketQueue structure and the caller _must_ replace
  1149. // the buffer by calling Replace.
  1150. // DequeueFramedPackets unblocks and returns false if it receives
  1151. // runContext.Done().
  1152. // DequeueFramedPackets is _not_ safe for concurrent calls.
  1153. func (queue *PacketQueue) DequeueFramedPackets(
  1154. runContext context.Context) ([]byte, bool) {
  1155. var buffer []byte
  1156. select {
  1157. case buffer = <-queue.activeBuffer:
  1158. case <-runContext.Done():
  1159. return nil, false
  1160. }
  1161. return buffer, true
  1162. }
  1163. // Replace returns the buffer to the PacketQueue to be
  1164. // reused.
  1165. // The input must be a return value from DequeueFramedPackets.
  1166. func (queue *PacketQueue) Replace(buffer []byte) {
  1167. buffer = buffer[0:0]
  1168. // This won't block (as long as it is a DequeueFramedPackets return value).
  1169. queue.emptyBuffers <- buffer
  1170. }
// ClientConfig specifies the configuration of a packet tunnel client.
type ClientConfig struct {

	// Logger is used for logging events and metrics.
	Logger common.Logger

	// SudoNetworkConfigCommands specifies whether to use "sudo"
	// when executing network configuration commands. See description
	// for ServerConfig.SudoNetworkConfigCommands.
	SudoNetworkConfigCommands bool

	// AllowNoIPv6NetworkConfiguration indicates that failures while
	// configuring tun interfaces and routing for IPv6 are to be
	// logged as warnings only. See description for
	// ServerConfig.AllowNoIPv6NetworkConfiguration.
	AllowNoIPv6NetworkConfiguration bool

	// MTU is the packet MTU value to use; this value
	// should be obtained from the packet tunnel server.
	// When MTU is 0, a default value is used.
	MTU int

	// UpstreamPacketQueueSize specifies the size of the upstream
	// packet queue.
	// When UpstreamPacketQueueSize is 0, a default value tuned for
	// Psiphon is used.
	UpstreamPacketQueueSize int

	// Transport is an established transport channel that
	// will be used to relay packets to and from a packet
	// tunnel server.
	Transport io.ReadWriteCloser

	// TunFileDescriptor specifies a file descriptor to use to
	// read and write packets to be relayed to the client. When
	// TunFileDescriptor is specified, the Client will use this
	// existing tun device and not create its own; in this case,
	// network address and routing configuration is not performed
	// by the Client. As the packet tunnel server performs
	// transparent source IP address and DNS rewriting, the tun
	// device may have any assigned IP address, but should be
	// configured with the given MTU; and DNS should be configured
	// to use the transparent DNS target resolver addresses.
	// Set TunFileDescriptor to <= 0 to ignore this parameter
	// and create and configure a tun device.
	TunFileDescriptor int

	// IPv4AddressCIDR is the IPv4 address and netmask to
	// assign to a newly created tun device.
	IPv4AddressCIDR string

	// IPv6AddressCIDR is the IPv6 address and prefix to
	// assign to a newly created tun device.
	IPv6AddressCIDR string

	// RouteDestinations are hosts (IPs) or networks (CIDRs)
	// to be configured to be routed through a newly
	// created tun device.
	RouteDestinations []string
}
  1221. // Client is a packet tunnel client. A packet tunnel client
  1222. // relays packets between a local tun device and a packet
  1223. // tunnel server via a transport channel.
  1224. type Client struct {
  1225. config *ClientConfig
  1226. device *Device
  1227. channel *Channel
  1228. upstreamPackets *PacketQueue
  1229. metrics *packetMetrics
  1230. runContext context.Context
  1231. stopRunning context.CancelFunc
  1232. workers *sync.WaitGroup
  1233. }
  1234. // NewClient initializes a new Client. Unless using the
  1235. // TunFileDescriptor configuration parameter, a new tun
  1236. // device is created for the client.
  1237. func NewClient(config *ClientConfig) (*Client, error) {
  1238. var device *Device
  1239. var err error
  1240. if config.TunFileDescriptor > 0 {
  1241. device, err = NewClientDeviceFromFD(config)
  1242. } else {
  1243. device, err = NewClientDevice(config)
  1244. }
  1245. if err != nil {
  1246. return nil, common.ContextError(err)
  1247. }
  1248. upstreamPacketQueueSize := DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE
  1249. if config.UpstreamPacketQueueSize > 0 {
  1250. upstreamPacketQueueSize = config.UpstreamPacketQueueSize
  1251. }
  1252. runContext, stopRunning := context.WithCancel(context.Background())
  1253. return &Client{
  1254. config: config,
  1255. device: device,
  1256. channel: NewChannel(config.Transport, getMTU(config.MTU)),
  1257. upstreamPackets: NewPacketQueue(upstreamPacketQueueSize),
  1258. metrics: new(packetMetrics),
  1259. runContext: runContext,
  1260. stopRunning: stopRunning,
  1261. workers: new(sync.WaitGroup),
  1262. }, nil
  1263. }
  1264. // Start starts a client and returns with it running.
  1265. func (client *Client) Start() {
  1266. client.config.Logger.WithContext().Info("starting")
  1267. client.workers.Add(1)
  1268. go func() {
  1269. defer client.workers.Done()
  1270. for {
  1271. readPacket, err := client.device.ReadPacket()
  1272. select {
  1273. case <-client.runContext.Done():
  1274. // No error is logged as shutdown may have interrupted read.
  1275. return
  1276. default:
  1277. }
  1278. if err != nil {
  1279. client.config.Logger.WithContextFields(
  1280. common.LogFields{"error": err}).Info("read device packet failed")
  1281. // May be temporary error condition, keep working.
  1282. continue
  1283. }
  1284. // processPacket will check for packets the server will reject
  1285. // and drop those without sending.
  1286. // Limitation: packet metrics, including successful relay count,
  1287. // are incremented _before_ the packet is written to the channel.
  1288. if !processPacket(
  1289. client.metrics,
  1290. nil,
  1291. packetDirectionClientUpstream,
  1292. readPacket) {
  1293. continue
  1294. }
  1295. // Instead of immediately writing to the channel, the
  1296. // packet is enqueued, which has the effect of batching
  1297. // up IP packets into a single channel packet (for Psiphon,
  1298. // and SSH packet) to minimize overhead and, as benchmarked,
  1299. // improve throughput.
  1300. // Packet will be dropped if queue is full.
  1301. client.upstreamPackets.Enqueue(readPacket)
  1302. }
  1303. }()
  1304. client.workers.Add(1)
  1305. go func() {
  1306. defer client.workers.Done()
  1307. for {
  1308. packetBuffer, ok := client.upstreamPackets.DequeueFramedPackets(client.runContext)
  1309. if !ok {
  1310. // Dequeue aborted due to session.runContext.Done()
  1311. return
  1312. }
  1313. err := client.channel.WriteFramedPackets(packetBuffer)
  1314. client.upstreamPackets.Replace(packetBuffer)
  1315. if err != nil {
  1316. client.config.Logger.WithContextFields(
  1317. common.LogFields{"error": err}).Info("write channel packets failed")
  1318. // May be temporary error condition, such as reconnecting the tunnel;
  1319. // keep working. The packets are most likely dropped.
  1320. continue
  1321. }
  1322. }
  1323. }()
  1324. client.workers.Add(1)
  1325. go func() {
  1326. defer client.workers.Done()
  1327. for {
  1328. readPacket, err := client.channel.ReadPacket()
  1329. select {
  1330. case <-client.runContext.Done():
  1331. // No error is logged as shutdown may have interrupted read.
  1332. return
  1333. default:
  1334. }
  1335. if err != nil {
  1336. client.config.Logger.WithContextFields(
  1337. common.LogFields{"error": err}).Info("read channel packet failed")
  1338. // May be temporary error condition, such as reconnecting the tunnel;
  1339. // keep working.
  1340. continue
  1341. }
  1342. if !processPacket(
  1343. client.metrics,
  1344. nil,
  1345. packetDirectionClientDownstream,
  1346. readPacket) {
  1347. continue
  1348. }
  1349. err = client.device.WritePacket(readPacket)
  1350. if err != nil {
  1351. client.config.Logger.WithContextFields(
  1352. common.LogFields{"error": err}).Info("write device packet failed")
  1353. // May be temporary error condition, keep working. The packet is
  1354. // most likely dropped.
  1355. continue
  1356. }
  1357. }
  1358. }()
  1359. }
  1360. // Stop halts a running client.
  1361. func (client *Client) Stop() {
  1362. client.config.Logger.WithContext().Info("stopping")
  1363. client.stopRunning()
  1364. client.device.Close()
  1365. client.channel.Close()
  1366. client.workers.Wait()
  1367. client.metrics.checkpoint(
  1368. client.config.Logger, "packet_metrics", packetMetricsAll)
  1369. client.config.Logger.WithContext().Info("stopped")
  1370. }
  1371. /*
  1372. Packet offset constants in getPacketDestinationIPAddress and
  1373. processPacket are from the following RFC definitions.
  1374. IPv4 header: https://tools.ietf.org/html/rfc791
  1375. 0 1 2 3
  1376. 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  1377. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1378. |Version| IHL |Type of Service| Total Length |
  1379. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1380. | Identification |Flags| Fragment Offset |
  1381. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1382. | Time to Live | Protocol | Header Checksum |
  1383. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1384. | Source Address |
  1385. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1386. | Destination Address |
  1387. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1388. | Options | Padding |
  1389. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1390. IPv6 header: https://tools.ietf.org/html/rfc2460
  1391. 0 1 2 3
  1392. 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  1393. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1394. |Version| Traffic Class | Flow Label |
  1395. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1396. | Payload Length | Next Header | Hop Limit |
  1397. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1398. | |
  1399. + +
  1400. | |
  1401. + Source Address +
  1402. | |
  1403. + +
  1404. | |
  1405. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1406. | |
  1407. + +
  1408. | |
  1409. + Destination Address +
  1410. | |
  1411. + +
  1412. | |
  1413. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1414. TCP header: https://tools.ietf.org/html/rfc793
  1415. 0 1 2 3
  1416. 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  1417. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1418. | Source Port | Destination Port |
  1419. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1420. | Sequence Number |
  1421. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1422. | Acknowledgment Number |
  1423. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1424. | Data | |U|A|P|R|S|F| |
  1425. | Offset| Reserved |R|C|S|S|Y|I| Window |
  1426. | | |G|K|H|T|N|N| |
  1427. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1428. | Checksum | Urgent Pointer |
  1429. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1430. | Options | Padding |
  1431. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1432. | data |
  1433. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  1434. UDP header: https://tools.ietf.org/html/rfc768
  1435. 0 7 8 15 16 23 24 31
  1436. +--------+--------+--------+--------+
  1437. | Source | Destination |
  1438. | Port | Port |
  1439. +--------+--------+--------+--------+
  1440. | | |
  1441. | Length | Checksum |
  1442. +--------+--------+--------+--------+
  1443. |
  1444. | data octets ...
  1445. +---------------- ...
  1446. */
  1447. const (
  1448. packetDirectionServerUpstream = 0
  1449. packetDirectionServerDownstream = 1
  1450. packetDirectionClientUpstream = 2
  1451. packetDirectionClientDownstream = 3
  1452. internetProtocolTCP = 6
  1453. internetProtocolUDP = 17
  1454. portNumberDNS = 53
  1455. packetRejectNoSession = 0
  1456. packetRejectDestinationAddress = 1
  1457. packetRejectLength = 2
  1458. packetRejectVersion = 3
  1459. packetRejectOptions = 4
  1460. packetRejectProtocol = 5
  1461. packetRejectTCPProtocolLength = 6
  1462. packetRejectUDPProtocolLength = 7
  1463. packetRejectTCPPort = 8
  1464. packetRejectUDPPort = 9
  1465. packetRejectNoOriginalAddress = 10
  1466. packetRejectNoDNSResolvers = 11
  1467. packetRejectReasonCount = 12
  1468. packetOk = 12
  1469. )
  1470. type packetDirection int
  1471. type internetProtocol int
  1472. type packetRejectReason int
  1473. func packetRejectReasonDescription(reason packetRejectReason) string {
  1474. // Description strings follow the metrics naming
  1475. // convention: all lowercase; underscore seperators.
  1476. switch reason {
  1477. case packetRejectNoSession:
  1478. return "no_session"
  1479. case packetRejectDestinationAddress:
  1480. return "invalid_destination_address"
  1481. case packetRejectLength:
  1482. return "invalid_ip_packet_length"
  1483. case packetRejectVersion:
  1484. return "invalid_ip_header_version"
  1485. case packetRejectOptions:
  1486. return "invalid_ip_header_options"
  1487. case packetRejectProtocol:
  1488. return "invalid_ip_header_protocol"
  1489. case packetRejectTCPProtocolLength:
  1490. return "invalid_tcp_packet_length"
  1491. case packetRejectUDPProtocolLength:
  1492. return "invalid_tcp_packet_length"
  1493. case packetRejectTCPPort:
  1494. return "disallowed_tcp_destination_port"
  1495. case packetRejectUDPPort:
  1496. return "disallowed_udp_destination_port"
  1497. case packetRejectNoOriginalAddress:
  1498. return "no_original_address"
  1499. case packetRejectNoDNSResolvers:
  1500. return "no_dns_resolvers"
  1501. }
  1502. return "unknown_reason"
  1503. }
  1504. // Caller: the destination IP address return value is
  1505. // a slice of the packet input value and only valid while
  1506. // the packet buffer remains valid.
  1507. func getPacketDestinationIPAddress(
  1508. metrics *packetMetrics,
  1509. direction packetDirection,
  1510. packet []byte) (net.IP, bool) {
  1511. // TODO: this function duplicates a subset of the packet
  1512. // parsing code in processPacket. Refactor to reuse code;
  1513. // also, both getPacketDestinationIPAddress and processPacket
  1514. // are called for some packets; refactor to only parse once.
  1515. if len(packet) < 1 {
  1516. metrics.rejectedPacket(direction, packetRejectLength)
  1517. return nil, false
  1518. }
  1519. version := packet[0] >> 4
  1520. if version != 4 && version != 6 {
  1521. metrics.rejectedPacket(direction, packetRejectVersion)
  1522. return nil, false
  1523. }
  1524. if version == 4 {
  1525. if len(packet) < 20 {
  1526. metrics.rejectedPacket(direction, packetRejectLength)
  1527. return nil, false
  1528. }
  1529. return packet[16:20], true
  1530. } else { // IPv6
  1531. if len(packet) < 40 {
  1532. metrics.rejectedPacket(direction, packetRejectLength)
  1533. return nil, false
  1534. }
  1535. return packet[24:40], true
  1536. }
  1537. }
  1538. // processPacket parses IP packets, applies relaying rules,
  1539. // and rewrites packet elements as required. processPacket
  1540. // returns true if a packet parses correctly, is accepted
  1541. // by the relay rules, and is successfully rewritten.
  1542. //
  1543. // When a packet is rejected, processPacket returns false
  1544. // and updates a reason in the supplied metrics.
  1545. //
  1546. // Rejection may result in partially rewritten packets.
  1547. func processPacket(
  1548. metrics *packetMetrics,
  1549. session *session,
  1550. direction packetDirection,
  1551. packet []byte) bool {
  1552. // Parse and validate IP packet structure
  1553. // Must have an IP version field.
  1554. if len(packet) < 1 {
  1555. metrics.rejectedPacket(direction, packetRejectLength)
  1556. return false
  1557. }
  1558. version := packet[0] >> 4
  1559. // Must be IPv4 or IPv6.
  1560. if version != 4 && version != 6 {
  1561. metrics.rejectedPacket(direction, packetRejectVersion)
  1562. return false
  1563. }
  1564. var protocol internetProtocol
  1565. var sourceIPAddress, destinationIPAddress net.IP
  1566. var sourcePort, destinationPort uint16
  1567. var IPChecksum, TCPChecksum, UDPChecksum []byte
  1568. var applicationData []byte
  1569. if version == 4 {
  1570. // IHL must be 5: options are not supported; a fixed
  1571. // 20 byte header is expected.
  1572. headerLength := packet[0] & 0x0F
  1573. if headerLength != 5 {
  1574. metrics.rejectedPacket(direction, packetRejectOptions)
  1575. return false
  1576. }
  1577. if len(packet) < 20 {
  1578. metrics.rejectedPacket(direction, packetRejectLength)
  1579. return false
  1580. }
  1581. // Protocol must be TCP or UDP.
  1582. protocol = internetProtocol(packet[9])
  1583. dataOffset := 0
  1584. if protocol == internetProtocolTCP {
  1585. if len(packet) < 32 {
  1586. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1587. return false
  1588. }
  1589. dataOffset = 20 + 4*int(packet[32]>>4)
  1590. if len(packet) < dataOffset {
  1591. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1592. return false
  1593. }
  1594. } else if protocol == internetProtocolUDP {
  1595. dataOffset := 28
  1596. if len(packet) < dataOffset {
  1597. metrics.rejectedPacket(direction, packetRejectUDPProtocolLength)
  1598. return false
  1599. }
  1600. } else {
  1601. metrics.rejectedPacket(direction, packetRejectProtocol)
  1602. return false
  1603. }
  1604. applicationData = packet[dataOffset:]
  1605. // Slices reference packet bytes to be rewritten.
  1606. sourceIPAddress = packet[12:16]
  1607. destinationIPAddress = packet[16:20]
  1608. IPChecksum = packet[10:12]
  1609. // Port numbers have the same offset in TCP and UDP.
  1610. sourcePort = binary.BigEndian.Uint16(packet[20:22])
  1611. destinationPort = binary.BigEndian.Uint16(packet[22:24])
  1612. if protocol == internetProtocolTCP {
  1613. TCPChecksum = packet[36:38]
  1614. } else { // UDP
  1615. UDPChecksum = packet[26:28]
  1616. }
  1617. } else { // IPv6
  1618. if len(packet) < 40 {
  1619. metrics.rejectedPacket(direction, packetRejectLength)
  1620. return false
  1621. }
  1622. // Next Header must be TCP or UDP.
  1623. nextHeader := packet[6]
  1624. protocol = internetProtocol(nextHeader)
  1625. dataOffset := 0
  1626. if protocol == internetProtocolTCP {
  1627. if len(packet) < 52 {
  1628. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1629. return false
  1630. }
  1631. dataOffset = 40 + 4*int(packet[52]>>4)
  1632. if len(packet) < dataOffset {
  1633. metrics.rejectedPacket(direction, packetRejectTCPProtocolLength)
  1634. return false
  1635. }
  1636. } else if protocol == internetProtocolUDP {
  1637. dataOffset := 48
  1638. if len(packet) < dataOffset {
  1639. metrics.rejectedPacket(direction, packetRejectUDPProtocolLength)
  1640. return false
  1641. }
  1642. } else {
  1643. metrics.rejectedPacket(direction, packetRejectProtocol)
  1644. return false
  1645. }
  1646. applicationData = packet[dataOffset:]
  1647. // Slices reference packet bytes to be rewritten.
  1648. sourceIPAddress = packet[8:24]
  1649. destinationIPAddress = packet[24:40]
  1650. // Port numbers have the same offset in TCP and UDP.
  1651. sourcePort = binary.BigEndian.Uint16(packet[40:42])
  1652. destinationPort = binary.BigEndian.Uint16(packet[42:44])
  1653. if protocol == internetProtocolTCP {
  1654. TCPChecksum = packet[56:58]
  1655. } else { // UDP
  1656. UDPChecksum = packet[46:48]
  1657. }
  1658. }
  1659. // Apply rules
  1660. //
  1661. // Most of this logic is only applied on the server, as only
  1662. // the server knows the traffic rules configuration, and is
  1663. // tracking flows.
  1664. isServer := (direction == packetDirectionServerUpstream ||
  1665. direction == packetDirectionServerDownstream)
  1666. // Check if the packet qualifies for transparent DNS rewriting
  1667. //
  1668. // - Both TCP and UDP DNS packets may qualify
  1669. // - Transparent DNS flows are not tracked, as most DNS
  1670. // resolutions are very-short lived exchanges
  1671. // - The traffic rules checks are bypassed, since transparent
  1672. // DNS is essential
  1673. doTransparentDNS := false
  1674. if isServer {
  1675. if direction == packetDirectionServerUpstream {
  1676. // DNS packets destinated for the transparent DNS target addresses
  1677. // will be rewritten to go to one of the server's resolvers.
  1678. if destinationPort == portNumberDNS {
  1679. if version == 4 && destinationIPAddress.Equal(transparentDNSResolverIPv4Address) {
  1680. numResolvers := len(session.DNSResolverIPv4Addresses)
  1681. if numResolvers > 0 {
  1682. doTransparentDNS = true
  1683. } else {
  1684. metrics.rejectedPacket(direction, packetRejectNoDNSResolvers)
  1685. return false
  1686. }
  1687. } else if version == 6 && destinationIPAddress.Equal(transparentDNSResolverIPv6Address) {
  1688. numResolvers := len(session.DNSResolverIPv6Addresses)
  1689. if numResolvers > 0 {
  1690. doTransparentDNS = true
  1691. } else {
  1692. metrics.rejectedPacket(direction, packetRejectNoDNSResolvers)
  1693. return false
  1694. }
  1695. }
  1696. }
  1697. } else { // packetDirectionServerDownstream
  1698. // DNS packets with a source address of any of the server's
  1699. // resolvers will be rewritten back to the transparent DNS target
  1700. // address.
  1701. // Limitation: responses to client DNS packets _originally
  1702. // destined_ for a resolver in GetDNSResolverIPv4Addresses will
  1703. // be lost. This would happen if some process on the client
  1704. // ignores the system set DNS values; and forces use of the same
  1705. // resolvers as the server.
  1706. if sourcePort == portNumberDNS {
  1707. if version == 4 {
  1708. for _, IPAddress := range session.DNSResolverIPv4Addresses {
  1709. if sourceIPAddress.Equal(IPAddress) {
  1710. doTransparentDNS = true
  1711. break
  1712. }
  1713. }
  1714. } else if version == 6 {
  1715. for _, IPAddress := range session.DNSResolverIPv6Addresses {
  1716. if sourceIPAddress.Equal(IPAddress) {
  1717. doTransparentDNS = true
  1718. break
  1719. }
  1720. }
  1721. }
  1722. }
  1723. }
  1724. }
  1725. // Check if flow is tracked before checking traffic permission
  1726. doFlowTracking := !doTransparentDNS && isServer
  1727. // TODO: verify this struct is stack allocated
  1728. var ID flowID
  1729. isTrackingFlow := false
  1730. if doFlowTracking {
  1731. if direction == packetDirectionServerUpstream {
  1732. ID.set(
  1733. destinationIPAddress, destinationPort, sourceIPAddress, sourcePort, protocol)
  1734. } else if direction == packetDirectionServerDownstream {
  1735. ID.set(
  1736. sourceIPAddress, sourcePort, destinationIPAddress, destinationPort, protocol)
  1737. }
  1738. isTrackingFlow = session.isTrackingFlow(ID)
  1739. }
  1740. // Check packet source/destination is permitted; except for:
  1741. // - existing flows, which have already been checked
  1742. // - transparent DNS, which is always allowed
  1743. if !doTransparentDNS && !isTrackingFlow {
  1744. // Enforce traffic rules (allowed TCP/UDP ports).
  1745. checkPort := 0
  1746. if direction == packetDirectionServerUpstream ||
  1747. direction == packetDirectionClientUpstream {
  1748. checkPort = int(destinationPort)
  1749. } else if direction == packetDirectionServerDownstream ||
  1750. direction == packetDirectionClientDownstream {
  1751. checkPort = int(sourcePort)
  1752. }
  1753. if protocol == internetProtocolTCP {
  1754. if checkPort == 0 ||
  1755. (isServer &&
  1756. !session.checkAllowedTCPPortFunc(net.IP(ID.upstreamIPAddress[:]), checkPort)) {
  1757. metrics.rejectedPacket(direction, packetRejectTCPPort)
  1758. return false
  1759. }
  1760. } else if protocol == internetProtocolUDP {
  1761. if checkPort == 0 ||
  1762. (isServer &&
  1763. !session.checkAllowedUDPPortFunc(net.IP(ID.upstreamIPAddress[:]), checkPort)) {
  1764. metrics.rejectedPacket(direction, packetRejectUDPPort)
  1765. return false
  1766. }
  1767. }
  1768. // Enforce no localhost, multicast or broadcast packets; and
  1769. // no client-to-client packets.
  1770. if !destinationIPAddress.IsGlobalUnicast() ||
  1771. (direction == packetDirectionServerUpstream &&
  1772. ((version == 4 &&
  1773. !destinationIPAddress.Equal(transparentDNSResolverIPv4Address) &&
  1774. privateSubnetIPv4.Contains(destinationIPAddress)) ||
  1775. (version == 6 &&
  1776. !destinationIPAddress.Equal(transparentDNSResolverIPv6Address) &&
  1777. privateSubnetIPv6.Contains(destinationIPAddress)))) {
  1778. metrics.rejectedPacket(direction, packetRejectDestinationAddress)
  1779. return false
  1780. }
  1781. }
  1782. // Configure rewriting.
  1783. var checksumAccumulator int32
  1784. var rewriteSourceIPAddress, rewriteDestinationIPAddress net.IP
  1785. if direction == packetDirectionServerUpstream {
  1786. // Store original source IP address to be replaced in
  1787. // downstream rewriting.
  1788. if version == 4 {
  1789. session.setOriginalIPv4AddressIfNotSet(sourceIPAddress)
  1790. rewriteSourceIPAddress = session.assignedIPv4Address
  1791. } else { // version == 6
  1792. session.setOriginalIPv6AddressIfNotSet(sourceIPAddress)
  1793. rewriteSourceIPAddress = session.assignedIPv6Address
  1794. }
  1795. // Rewrite DNS packets destinated for the transparent DNS target
  1796. // addresses to go to one of the server's resolvers.
  1797. if doTransparentDNS {
  1798. if version == 4 {
  1799. rewriteDestinationIPAddress = session.DNSResolverIPv4Addresses[rand.Intn(
  1800. len(session.DNSResolverIPv4Addresses))]
  1801. } else { // version == 6
  1802. rewriteDestinationIPAddress = session.DNSResolverIPv6Addresses[rand.Intn(
  1803. len(session.DNSResolverIPv6Addresses))]
  1804. }
  1805. }
  1806. } else if direction == packetDirectionServerDownstream {
  1807. // Destination address will be original source address.
  1808. if version == 4 {
  1809. rewriteDestinationIPAddress = session.getOriginalIPv4Address()
  1810. } else { // version == 6
  1811. rewriteDestinationIPAddress = session.getOriginalIPv6Address()
  1812. }
  1813. if rewriteDestinationIPAddress == nil {
  1814. metrics.rejectedPacket(direction, packetRejectNoOriginalAddress)
  1815. return false
  1816. }
  1817. // Rewrite source address of packets from servers' resolvers
  1818. // to transparent DNS target address.
  1819. if doTransparentDNS {
  1820. if version == 4 {
  1821. rewriteSourceIPAddress = transparentDNSResolverIPv4Address
  1822. } else { // version == 6
  1823. rewriteSourceIPAddress = transparentDNSResolverIPv6Address
  1824. }
  1825. }
  1826. }
  1827. // Apply rewrites. IP (v4 only) and TCP/UDP all have packet
  1828. // checksums which are updated to relect the rewritten headers.
  1829. if rewriteSourceIPAddress != nil {
  1830. checksumAccumulate(sourceIPAddress, false, &checksumAccumulator)
  1831. copy(sourceIPAddress, rewriteSourceIPAddress)
  1832. checksumAccumulate(sourceIPAddress, true, &checksumAccumulator)
  1833. }
  1834. if rewriteDestinationIPAddress != nil {
  1835. checksumAccumulate(destinationIPAddress, false, &checksumAccumulator)
  1836. copy(destinationIPAddress, rewriteDestinationIPAddress)
  1837. checksumAccumulate(destinationIPAddress, true, &checksumAccumulator)
  1838. }
  1839. if rewriteSourceIPAddress != nil || rewriteDestinationIPAddress != nil {
  1840. // IPv6 doesn't have an IP header checksum.
  1841. if version == 4 {
  1842. checksumAdjust(IPChecksum, checksumAccumulator)
  1843. }
  1844. if protocol == internetProtocolTCP {
  1845. checksumAdjust(TCPChecksum, checksumAccumulator)
  1846. } else { // UDP
  1847. checksumAdjust(UDPChecksum, checksumAccumulator)
  1848. }
  1849. }
  1850. // Start/update flow tracking, only once past all possible packet rejects
  1851. if doFlowTracking {
  1852. if !isTrackingFlow {
  1853. session.startTrackingFlow(ID, direction, applicationData)
  1854. } else {
  1855. session.updateFlow(ID, direction, applicationData)
  1856. }
  1857. }
  1858. metrics.relayedPacket(direction, int(version), protocol, len(packet))
  1859. return true
  1860. }
  1861. // Checksum code based on https://github.com/OpenVPN/openvpn:
  1862. /*
  1863. OpenVPN (TM) -- An Open Source VPN daemon
  1864. Copyright (C) 2002-2017 OpenVPN Technologies, Inc. <[email protected]>
  1865. OpenVPN license:
  1866. ----------------
  1867. OpenVPN is distributed under the GPL license version 2 (see COPYRIGHT.GPL).
  1868. */
  1869. func checksumAccumulate(data []byte, newData bool, accumulator *int32) {
  1870. // Based on ADD_CHECKSUM_32 and SUB_CHECKSUM_32 macros from OpenVPN:
  1871. // https://github.com/OpenVPN/openvpn/blob/58716979640b5d8850b39820f91da616964398cc/src/openvpn/proto.h#L177
  1872. // Assumes length of data is factor of 4.
  1873. for i := 0; i < len(data); i += 4 {
  1874. var word uint32
  1875. word = uint32(data[i+0])<<24 | uint32(data[i+1])<<16 | uint32(data[i+2])<<8 | uint32(data[i+3])
  1876. if newData {
  1877. *accumulator -= int32(word & 0xFFFF)
  1878. *accumulator -= int32(word >> 16)
  1879. } else {
  1880. *accumulator += int32(word & 0xFFFF)
  1881. *accumulator += int32(word >> 16)
  1882. }
  1883. }
  1884. }
  1885. func checksumAdjust(checksumData []byte, accumulator int32) {
  1886. // Based on ADJUST_CHECKSUM macro from OpenVPN:
  1887. // https://github.com/OpenVPN/openvpn/blob/58716979640b5d8850b39820f91da616964398cc/src/openvpn/proto.h#L177
  1888. // Assumes checksumData is 2 byte slice.
  1889. checksum := uint16(checksumData[0])<<8 | uint16(checksumData[1])
  1890. accumulator += int32(checksum)
  1891. if accumulator < 0 {
  1892. accumulator = -accumulator
  1893. accumulator = (accumulator >> 16) + (accumulator & 0xFFFF)
  1894. accumulator += accumulator >> 16
  1895. checksum = uint16(^accumulator)
  1896. } else {
  1897. accumulator = (accumulator >> 16) + (accumulator & 0xFFFF)
  1898. accumulator += accumulator >> 16
  1899. checksum = uint16(accumulator)
  1900. }
  1901. checksumData[0] = byte(checksum >> 8)
  1902. checksumData[1] = byte(checksum & 0xFF)
  1903. }
  1904. /*
  1905. packet debugging snippet:
  1906. import (
  1907. "github.com/google/gopacket"
  1908. "github.com/google/gopacket/layers"
  1909. )
  1910. func tracePacket(where string, packet []byte) {
  1911. var p gopacket.Packet
  1912. if len(packet) > 0 && packet[0]>>4 == 4 {
  1913. p = gopacket.NewPacket(packet, layers.LayerTypeIPv4, gopacket.Default)
  1914. } else {
  1915. p = gopacket.NewPacket(packet, layers.LayerTypeIPv6, gopacket.Default)
  1916. }
  1917. fmt.Printf("[%s packet]:\n%s\n\n", where, p)
  1918. }
  1919. */
  1920. // Device manages a tun device. It handles packet I/O using static,
  1921. // preallocated buffers to avoid GC churn.
  1922. type Device struct {
  1923. name string
  1924. writeMutex sync.Mutex
  1925. deviceIO io.ReadWriteCloser
  1926. inboundBuffer []byte
  1927. outboundBuffer []byte
  1928. }
  1929. // NewServerDevice creates and configures a new server tun device.
  1930. // Since the server uses fixed address spaces, only one server
  1931. // device may exist per host.
  1932. func NewServerDevice(config *ServerConfig) (*Device, error) {
  1933. file, deviceName, err := OpenTunDevice("")
  1934. if err != nil {
  1935. return nil, common.ContextError(err)
  1936. }
  1937. defer file.Close()
  1938. err = configureServerInterface(config, deviceName)
  1939. if err != nil {
  1940. return nil, common.ContextError(err)
  1941. }
  1942. nio, err := NewNonblockingIO(int(file.Fd()))
  1943. if err != nil {
  1944. return nil, common.ContextError(err)
  1945. }
  1946. return newDevice(
  1947. deviceName,
  1948. nio,
  1949. getMTU(config.MTU)), nil
  1950. }
  1951. // NewClientDevice creates and configures a new client tun device.
  1952. // Multiple client tun devices may exist per host.
  1953. func NewClientDevice(config *ClientConfig) (*Device, error) {
  1954. file, deviceName, err := OpenTunDevice("")
  1955. if err != nil {
  1956. return nil, common.ContextError(err)
  1957. }
  1958. defer file.Close()
  1959. err = configureClientInterface(
  1960. config, deviceName)
  1961. if err != nil {
  1962. return nil, common.ContextError(err)
  1963. }
  1964. nio, err := NewNonblockingIO(int(file.Fd()))
  1965. if err != nil {
  1966. return nil, common.ContextError(err)
  1967. }
  1968. return newDevice(
  1969. deviceName,
  1970. nio,
  1971. getMTU(config.MTU)), nil
  1972. }
  1973. func newDevice(
  1974. name string,
  1975. deviceIO io.ReadWriteCloser,
  1976. MTU int) *Device {
  1977. return &Device{
  1978. name: name,
  1979. deviceIO: deviceIO,
  1980. inboundBuffer: makeDeviceInboundBuffer(MTU),
  1981. outboundBuffer: makeDeviceOutboundBuffer(MTU),
  1982. }
  1983. }
  1984. // NewClientDeviceFromFD wraps an existing tun device.
  1985. func NewClientDeviceFromFD(config *ClientConfig) (*Device, error) {
  1986. nio, err := NewNonblockingIO(config.TunFileDescriptor)
  1987. if err != nil {
  1988. return nil, common.ContextError(err)
  1989. }
  1990. MTU := getMTU(config.MTU)
  1991. return &Device{
  1992. name: "",
  1993. deviceIO: nio,
  1994. inboundBuffer: makeDeviceInboundBuffer(MTU),
  1995. outboundBuffer: makeDeviceOutboundBuffer(MTU),
  1996. }, nil
  1997. }
  1998. // Name returns the interface name for a created tun device,
  1999. // or returns "" for a device created by NewClientDeviceFromFD.
  2000. // The interface name may be used for additional network and
  2001. // routing configuration.
  2002. func (device *Device) Name() string {
  2003. return device.name
  2004. }
  2005. // ReadPacket reads one full packet from the tun device. The
  2006. // return value is a slice of a static, reused buffer, so the
  2007. // value is only valid until the next ReadPacket call.
  2008. // Concurrent calls to ReadPacket are _not_ supported.
  2009. func (device *Device) ReadPacket() ([]byte, error) {
  2010. // readTunPacket performs the platform dependent
  2011. // packet read operation.
  2012. offset, size, err := device.readTunPacket()
  2013. if err != nil {
  2014. return nil, common.ContextError(err)
  2015. }
  2016. return device.inboundBuffer[offset : offset+size], nil
  2017. }
  2018. // WritePacket writes one full packet to the tun device.
  2019. // Concurrent calls to WritePacket are supported.
  2020. func (device *Device) WritePacket(packet []byte) error {
  2021. // This mutex ensures that only one concurrent goroutine
  2022. // can use outboundBuffer when writing.
  2023. device.writeMutex.Lock()
  2024. defer device.writeMutex.Unlock()
  2025. // writeTunPacket performs the platform dependent
  2026. // packet write operation.
  2027. err := device.writeTunPacket(packet)
  2028. if err != nil {
  2029. return common.ContextError(err)
  2030. }
  2031. return nil
  2032. }
  2033. // Close interrupts any blocking Read/Write calls and
  2034. // tears down the tun device.
  2035. func (device *Device) Close() error {
  2036. return device.deviceIO.Close()
  2037. }
  2038. // Channel manages packet transport over a communications channel.
  2039. // Any io.ReadWriteCloser can provide transport. In psiphond, the
  2040. // io.ReadWriteCloser will be an SSH channel. Channel I/O frames
  2041. // packets with a length header and uses static, preallocated
  2042. // buffers to avoid GC churn.
  2043. type Channel struct {
  2044. transport io.ReadWriteCloser
  2045. inboundBuffer []byte
  2046. outboundBuffer []byte
  2047. }
  2048. // IP packets cannot be larger that 64K, so a 16-bit length
  2049. // header is sufficient.
  2050. const (
  2051. channelHeaderSize = 2
  2052. )
  2053. // NewChannel initializes a new Channel.
  2054. func NewChannel(transport io.ReadWriteCloser, MTU int) *Channel {
  2055. return &Channel{
  2056. transport: transport,
  2057. inboundBuffer: make([]byte, channelHeaderSize+MTU),
  2058. outboundBuffer: make([]byte, channelHeaderSize+MTU),
  2059. }
  2060. }
  2061. // ReadPacket reads one full packet from the channel. The
  2062. // return value is a slice of a static, reused buffer, so the
  2063. // value is only valid until the next ReadPacket call.
  2064. // Concurrent calls to ReadPacket are not supported.
  2065. func (channel *Channel) ReadPacket() ([]byte, error) {
  2066. header := channel.inboundBuffer[0:channelHeaderSize]
  2067. _, err := io.ReadFull(channel.transport, header)
  2068. if err != nil {
  2069. return nil, common.ContextError(err)
  2070. }
  2071. size := int(binary.BigEndian.Uint16(header))
  2072. if size > len(channel.inboundBuffer[channelHeaderSize:]) {
  2073. return nil, common.ContextError(fmt.Errorf("packet size exceeds MTU: %d", size))
  2074. }
  2075. packet := channel.inboundBuffer[channelHeaderSize : channelHeaderSize+size]
  2076. _, err = io.ReadFull(channel.transport, packet)
  2077. if err != nil {
  2078. return nil, common.ContextError(err)
  2079. }
  2080. return packet, nil
  2081. }
  2082. // WritePacket writes one full packet to the channel.
  2083. // Concurrent calls to WritePacket are not supported.
  2084. func (channel *Channel) WritePacket(packet []byte) error {
  2085. // Flow control assumed to be provided by the transport. In the case
  2086. // of SSH, the channel window size will determine whether the packet
  2087. // data is transmitted immediately or whether the transport.Write will
  2088. // block. When the channel window is full and transport.Write blocks,
  2089. // the sender's tun device will not be read (client case) or the send
  2090. // queue will fill (server case) and packets will be dropped. In this
  2091. // way, the channel window size will influence the TCP window size for
  2092. // tunneled traffic.
  2093. // When the transport is an SSH channel, the overhead per packet message
  2094. // includes:
  2095. //
  2096. // - SSH_MSG_CHANNEL_DATA: 5 bytes (https://tools.ietf.org/html/rfc4254#section-5.2)
  2097. // - SSH packet: ~28 bytes (https://tools.ietf.org/html/rfc4253#section-5.3), with MAC
  2098. // - TCP/IP transport for SSH: 40 bytes for IPv4
  2099. // Assumes MTU <= 64K and len(packet) <= MTU
  2100. size := len(packet)
  2101. binary.BigEndian.PutUint16(channel.outboundBuffer, uint16(size))
  2102. copy(channel.outboundBuffer[channelHeaderSize:], packet)
  2103. _, err := channel.transport.Write(channel.outboundBuffer[0 : channelHeaderSize+size])
  2104. if err != nil {
  2105. return common.ContextError(err)
  2106. }
  2107. return nil
  2108. }
  2109. // WriteFramedPackets writes a buffer of pre-framed packets to
  2110. // the channel.
  2111. // Concurrent calls to WriteFramedPackets are not supported.
  2112. func (channel *Channel) WriteFramedPackets(packetBuffer []byte) error {
  2113. _, err := channel.transport.Write(packetBuffer)
  2114. if err != nil {
  2115. return common.ContextError(err)
  2116. }
  2117. return nil
  2118. }
  2119. // Close interrupts any blocking Read/Write calls and
  2120. // closes the channel transport.
  2121. func (channel *Channel) Close() error {
  2122. return channel.transport.Close()
  2123. }