packetTunnelTransport.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. /*
  2. * Copyright (c) 2017, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "net"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  26. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  28. )
  29. // PacketTunnelTransport is an integration layer that presents an io.ReadWriteCloser interface
  30. // to a tun.Client as the transport for relaying packets. The Psiphon client may periodically
  31. // disconnect from and reconnect to the same or different Psiphon servers. PacketTunnelTransport
  32. // allows the Psiphon client to substitute new transport channels on-the-fly.
  33. type PacketTunnelTransport struct {
  34. // Note: 64-bit ints used with atomic operations are placed
  35. // at the start of struct to ensure 64-bit alignment.
  36. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  37. readTimeout int64
  38. readDeadline int64
  39. closed int32
  40. workers *sync.WaitGroup
  41. readMutex sync.Mutex
  42. writeMutex sync.Mutex
  43. channelReady *sync.Cond
  44. channelMutex sync.Mutex
  45. channelConn net.Conn
  46. channelTunnel *Tunnel
  47. }
  48. // NewPacketTunnelTransport initializes a PacketTunnelTransport.
  49. func NewPacketTunnelTransport() *PacketTunnelTransport {
  50. return &PacketTunnelTransport{
  51. workers: new(sync.WaitGroup),
  52. channelReady: sync.NewCond(new(sync.Mutex)),
  53. }
  54. }
  55. // Read implements the io.Reader interface. It uses the current transport channel
  56. // to read packet data, or waits for a new transport channel to be established
  57. // after a failure.
  58. func (p *PacketTunnelTransport) Read(data []byte) (int, error) {
  59. p.readMutex.Lock()
  60. defer p.readMutex.Unlock()
  61. // getChannel will block if there's no channel, or return an error when
  62. // closed.
  63. channelConn, channelTunnel, err := p.getChannel()
  64. if err != nil {
  65. return 0, errors.Trace(err)
  66. }
  67. n, err := channelConn.Read(data)
  68. if err != nil {
  69. // This assumes that any error means the channel has failed, which
  70. // is the case for ssh.Channel reads. io.EOF is not ignored, since
  71. // a single ssh.Channel may EOF and still get substituted with a new
  72. // channel.
  73. p.failedChannel(channelConn, channelTunnel)
  74. } else {
  75. // Clear the read deadline now that a read has succeeded.
  76. // See read deadline comment in Write.
  77. atomic.StoreInt64(&p.readDeadline, 0)
  78. }
  79. return n, errors.Trace(err)
  80. }
  81. // Write implements the io.Writer interface. It uses the current transport channel
  82. // to write packet data, or waits for a new transport channel to be established
  83. // after a failure.
  84. func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
  85. p.writeMutex.Lock()
  86. defer p.writeMutex.Unlock()
  87. // getChannel will block if there's no channel, or return an error when
  88. // closed.
  89. channelConn, channelTunnel, err := p.getChannel()
  90. if err != nil {
  91. return 0, errors.Trace(err)
  92. }
  93. n, err := channelConn.Write(data)
  94. if err != nil {
  95. // This assumes that any error means the channel has failed, which
  96. // is the case for ssh.Channel writes.
  97. p.failedChannel(channelConn, channelTunnel)
  98. } else {
  99. // Set a read deadline: a successful read should occur within the deadline;
  100. // otherwise an SSH keep alive probe is triggered to check the tunnel
  101. // status.
  102. //
  103. // This scheme mirrors the tunnel dial port forward timeout mechanism
  104. // present in port forward mode: for any port forwarded connection attempt,
  105. // if there's a timeout before receiving a response from the server, an SSH
  106. // keep alive probe is triggered to check the tunnel state. Unlike port
  107. // forward mode, packet tunnel doesn't track tunneled connections (flows).
  108. //
  109. // Here, we deploy a heuristic based on the observation that, for most
  110. // traffic, a packet sent from the client -- a PacketTunnelTransport.Write
  111. // -- is followed by a packet received from the server -- a
  112. // PacketTunnelTransport.Read. For example, a UDP DNS request followed by a
  113. // response; or a TCP handshake sequence. The heuristic is to trigger an SSH
  114. // keep alive probe when there is no Read within the timeout period after a
  115. // Write. Any Read is sufficient to satisfy the deadline.
  116. //
  117. // To limit performance impact, we do not use, and continuously reset, a
  118. // time.Timer; instead we record the deadline upon successful Write and
  119. // check any set deadline during subsequent Writes. For the same reason, we
  120. // do we use a time.Ticker to check the deadline. This means that this
  121. // scheme depends on the host continuing to attempt to send packets in order
  122. // to trigger the SSH keep alive.
  123. //
  124. // Access to readDeadline/readTimeout is not intended to be completely
  125. // atomic.
  126. readDeadline := monotime.Time(atomic.LoadInt64(&p.readDeadline))
  127. if readDeadline > 0 {
  128. if monotime.Now().After(readDeadline) {
  129. select {
  130. case channelTunnel.signalPortForwardFailure <- struct{}{}:
  131. default:
  132. }
  133. // Clear the deadline now that a probe is triggered.
  134. atomic.StoreInt64(&p.readDeadline, 0)
  135. }
  136. // Keep an existing deadline as set: subsequent writes attempts shouldn't
  137. // extend the deadline.
  138. } else {
  139. readTimeout := time.Duration(atomic.LoadInt64(&p.readTimeout))
  140. readDeadline := monotime.Now().Add(readTimeout)
  141. atomic.StoreInt64(&p.readDeadline, int64(readDeadline))
  142. }
  143. }
  144. return n, errors.Trace(err)
  145. }
  146. // Close implements the io.Closer interface. Any underlying transport channel is
  147. // closed and any blocking Read/Write calls will be interrupted.
  148. func (p *PacketTunnelTransport) Close() error {
  149. if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
  150. return nil
  151. }
  152. p.workers.Wait()
  153. // This broadcast is to wake up reads or writes blocking in getChannel; those
  154. // getChannel calls should then abort on the p.closed check.
  155. p.channelReady.Broadcast()
  156. p.channelMutex.Lock()
  157. if p.channelConn != nil {
  158. p.channelConn.Close()
  159. p.channelConn = nil
  160. }
  161. p.channelMutex.Unlock()
  162. return nil
  163. }
  164. // UseTunnel sets the PacketTunnelTransport to use a new transport channel within
  165. // the specified tunnel. UseTunnel does not block on the open channel call; it spawns
  166. // a worker that calls tunnel.DialPacketTunnelChannel and uses the resulting channel.
  167. // UseTunnel has no effect once Close is called.
  168. //
  169. // Note that a blocked tunnel.DialPacketTunnelChannel with block Close;
  170. // callers should arrange for DialPacketTunnelChannel to be interrupted when
  171. // calling Close.
  172. func (p *PacketTunnelTransport) UseTunnel(tunnel *Tunnel) {
  173. // Don't start a worker when closed, after which workers.Wait may be called.
  174. if atomic.LoadInt32(&p.closed) == 1 {
  175. return
  176. }
  177. // Spawning a new worker ensures that the latest tunnel is used to dial a
  178. // new channel without delaying, as might happen if using a single worker
  179. // that consumes a channel of tunnels.
  180. p.workers.Add(1)
  181. go func(tunnel *Tunnel) {
  182. defer p.workers.Done()
  183. // channelConn is a net.Conn, since some layering has been applied
  184. // (e.g., transferstats.Conn). PacketTunnelTransport assumes the
  185. // channelConn is ultimately an ssh.Channel, which is not a fully
  186. // functional net.Conn.
  187. channelConn, err := tunnel.DialPacketTunnelChannel()
  188. if err != nil {
  189. // Note: DialPacketTunnelChannel will signal a probe on failure,
  190. // so it's not necessary to do so here.
  191. NoticeWarning("dial packet tunnel channel failed: %s", err)
  192. // TODO: retry?
  193. return
  194. }
  195. p.setChannel(channelConn, tunnel)
  196. }(tunnel)
  197. }
  198. func (p *PacketTunnelTransport) setChannel(
  199. channelConn net.Conn, channelTunnel *Tunnel) {
  200. p.channelMutex.Lock()
  201. // Concurrency note: this check is within the mutex to ensure that a
  202. // UseTunnel call concurrent with a Close call doesn't leave a channel
  203. // set.
  204. if atomic.LoadInt32(&p.closed) == 1 {
  205. p.channelMutex.Unlock()
  206. return
  207. }
  208. // Interrupt Read/Write calls blocking on any previous channel.
  209. if p.channelConn != nil {
  210. p.channelConn.Close()
  211. }
  212. p.channelConn = channelConn
  213. p.channelTunnel = channelTunnel
  214. p.channelMutex.Unlock()
  215. // Initialize the read deadline mechanism using parameters associated with the
  216. // new tunnel.
  217. timeout := channelTunnel.config.
  218. GetParameters().
  219. GetCustom(channelTunnel.dialParams.NetworkLatencyMultiplier).
  220. Duration(parameters.PacketTunnelReadTimeout)
  221. atomic.StoreInt64(&p.readTimeout, int64(timeout))
  222. atomic.StoreInt64(&p.readDeadline, 0)
  223. p.channelReady.Broadcast()
  224. }
  225. func (p *PacketTunnelTransport) getChannel() (net.Conn, *Tunnel, error) {
  226. var channelConn net.Conn
  227. var channelTunnel *Tunnel
  228. p.channelReady.L.Lock()
  229. defer p.channelReady.L.Unlock()
  230. for {
  231. if atomic.LoadInt32(&p.closed) == 1 {
  232. return nil, nil, errors.TraceNew("already closed")
  233. }
  234. p.channelMutex.Lock()
  235. channelConn = p.channelConn
  236. channelTunnel = p.channelTunnel
  237. p.channelMutex.Unlock()
  238. if channelConn != nil {
  239. break
  240. }
  241. p.channelReady.Wait()
  242. }
  243. return channelConn, channelTunnel, nil
  244. }
  245. func (p *PacketTunnelTransport) failedChannel(
  246. channelConn net.Conn, channelTunnel *Tunnel) {
  247. // In case the channel read/write failed and the tunnel isn't
  248. // yet in the failed state, trigger a probe.
  249. select {
  250. case channelTunnel.signalPortForwardFailure <- struct{}{}:
  251. default:
  252. }
  253. // Clear the current channel. This will cause subsequent Read/Write
  254. // calls to block in getChannel until a new channel is provided.
  255. // Concurrency note: must check, within the mutex, that the channelConn
  256. // is still the one that failed before clearing, since both Read and
  257. // Write could call failedChannel concurrently.
  258. p.channelMutex.Lock()
  259. if p.channelConn == channelConn {
  260. p.channelConn.Close()
  261. p.channelConn = nil
  262. p.channelTunnel = nil
  263. }
  264. p.channelMutex.Unlock()
  265. // Try to establish a new channel within the current tunnel. If this
  266. // fails, a port forward failure probe will be triggered which will
  267. // ultimately trigger a SSH keep alive probe.
  268. //
  269. // One case where this is necessary is when the server closes an idle
  270. // packet tunnel port forward for a live SSH tunnel.
  271. p.UseTunnel(channelTunnel)
  272. }