packetTunnelTransport.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. /*
  2. * Copyright (c) 2017, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "context"
  22. "net"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/Psiphon-Labs/goarista/monotime"
  27. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  29. )
  30. // PacketTunnelTransport is an integration layer that presents an io.ReadWriteCloser interface
  31. // to a tun.Client as the transport for relaying packets. The Psiphon client may periodically
  32. // disconnect from and reconnect to the same or different Psiphon servers. PacketTunnelTransport
  33. // allows the Psiphon client to substitute new transport channels on-the-fly.
  34. type PacketTunnelTransport struct {
  35. // Note: 64-bit ints used with atomic operations are placed
  36. // at the start of struct to ensure 64-bit alignment.
  37. // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
  38. readTimeout int64
  39. readDeadline int64
  40. runCtx context.Context
  41. stopRunning context.CancelFunc
  42. workers *sync.WaitGroup
  43. readMutex sync.Mutex
  44. writeMutex sync.Mutex
  45. channelReady *sync.Cond
  46. channelMutex sync.Mutex
  47. channelConn net.Conn
  48. channelTunnel *Tunnel
  49. }
  50. // NewPacketTunnelTransport initializes a PacketTunnelTransport.
  51. func NewPacketTunnelTransport() *PacketTunnelTransport {
  52. runCtx, stopRunning := context.WithCancel(context.Background())
  53. return &PacketTunnelTransport{
  54. runCtx: runCtx,
  55. stopRunning: stopRunning,
  56. workers: new(sync.WaitGroup),
  57. channelReady: sync.NewCond(new(sync.Mutex)),
  58. }
  59. }
  60. // Read implements the io.Reader interface. It uses the current transport channel
  61. // to read packet data, or waits for a new transport channel to be established
  62. // after a failure.
  63. func (p *PacketTunnelTransport) Read(data []byte) (int, error) {
  64. p.readMutex.Lock()
  65. defer p.readMutex.Unlock()
  66. // getChannel will block if there's no channel.
  67. channelConn, channelTunnel, err := p.getChannel()
  68. if err != nil {
  69. return 0, errors.Trace(err)
  70. }
  71. n, err := channelConn.Read(data)
  72. if err != nil {
  73. // This assumes that any error means the channel has failed, which
  74. // is the case for ssh.Channel reads. io.EOF is not ignored, since
  75. // a single ssh.Channel may EOF and still get substituted with a new
  76. // channel.
  77. p.failedChannel(channelConn, channelTunnel)
  78. } else {
  79. // Clear the read deadline now that a read has succeeded.
  80. // See read deadline comment in Write.
  81. atomic.StoreInt64(&p.readDeadline, 0)
  82. }
  83. return n, errors.Trace(err)
  84. }
  85. // Write implements the io.Writer interface. It uses the current transport channel
  86. // to write packet data, or waits for a new transport channel to be established
  87. // after a failure.
  88. func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
  89. p.writeMutex.Lock()
  90. defer p.writeMutex.Unlock()
  91. channelConn, channelTunnel, err := p.getChannel()
  92. if err != nil {
  93. return 0, errors.Trace(err)
  94. }
  95. n, err := channelConn.Write(data)
  96. if err != nil {
  97. // This assumes that any error means the channel has failed, which
  98. // is the case for ssh.Channel writes.
  99. p.failedChannel(channelConn, channelTunnel)
  100. } else {
  101. // Set a read deadline: a successful read should occur within the deadline;
  102. // otherwise an SSH keep alive probe is triggered to check the tunnel
  103. // status.
  104. //
  105. // This scheme mirrors the tunnel dial port forward timeout mechanism
  106. // present in port forward mode: for any port forwarded connection attempt,
  107. // if there's a timeout before receiving a response from the server, an SSH
  108. // keep alive probe is triggered to check the tunnel state. Unlike port
  109. // forward mode, packet tunnel doesn't track tunneled connections (flows).
  110. //
  111. // Here, we deploy a heuristic based on the observation that, for most
  112. // traffic, a packet sent from the client -- a PacketTunnelTransport.Write
  113. // -- is followed by a packet received from the server -- a
  114. // PacketTunnelTransport.Read. For example, a UDP DNS request followed by a
  115. // response; or a TCP handshake sequence. The heuristic is to trigger an SSH
  116. // keep alive probe when there is no Read within the timeout period after a
  117. // Write. Any Read is sufficient to satisfy the deadline.
  118. //
  119. // To limit performance impact, we do not use, and continuously reset, a
  120. // time.Timer; instead we record the deadline upon successful Write and
  121. // check any set deadline during subsequent Writes. For the same reason, we
  122. // do we use a time.Ticker to check the deadline. This means that this
  123. // scheme depends on the host continuing to attempt to send packets in order
  124. // to trigger the SSH keep alive.
  125. //
  126. // Access to readDeadline/readTimeout is not intended to be completely
  127. // atomic.
  128. readDeadline := monotime.Time(atomic.LoadInt64(&p.readDeadline))
  129. if readDeadline > 0 {
  130. if monotime.Now().After(readDeadline) {
  131. select {
  132. case channelTunnel.signalPortForwardFailure <- struct{}{}:
  133. default:
  134. }
  135. // Clear the deadline now that a probe is triggered.
  136. atomic.StoreInt64(&p.readDeadline, 0)
  137. }
  138. // Keep an existing deadline as set: subsequent writes attempts shouldn't
  139. // extend the deadline.
  140. } else {
  141. readTimeout := time.Duration(atomic.LoadInt64(&p.readTimeout))
  142. readDeadline := monotime.Now().Add(readTimeout)
  143. atomic.StoreInt64(&p.readDeadline, int64(readDeadline))
  144. }
  145. }
  146. return n, errors.Trace(err)
  147. }
  148. // Close implements the io.Closer interface. Any underlying transport channel is
  149. // closed and any blocking Read/Write calls will be interrupted.
  150. func (p *PacketTunnelTransport) Close() error {
  151. p.stopRunning()
  152. p.workers.Wait()
  153. // This broadcast is to wake up reads or writes blocking in getChannel; those
  154. // getChannel calls should then abort on the p.runCtx.Done() check.
  155. p.channelReady.Broadcast()
  156. p.channelMutex.Lock()
  157. if p.channelConn != nil {
  158. p.channelConn.Close()
  159. p.channelConn = nil
  160. }
  161. p.channelMutex.Unlock()
  162. return nil
  163. }
  164. // UseTunnel sets the PacketTunnelTransport to use a new transport channel within
  165. // the specified tunnel. UseTunnel does not block on the open channel call; it spawns
  166. // a worker that calls tunnel.DialPacketTunnelChannel and uses the resulting channel.
  167. func (p *PacketTunnelTransport) UseTunnel(tunnel *Tunnel) {
  168. p.workers.Add(1)
  169. go func(tunnel *Tunnel) {
  170. defer p.workers.Done()
  171. // channelConn is a net.Conn, since some layering has been applied
  172. // (e.g., transferstats.Conn). PacketTunnelTransport assumes the
  173. // channelConn is ultimately an ssh.Channel, which is not a fully
  174. // functional net.Conn.
  175. channelConn, err := tunnel.DialPacketTunnelChannel()
  176. if err != nil {
  177. // Note: DialPacketTunnelChannel will signal a probe on failure,
  178. // so it's not necessary to do so here.
  179. NoticeWarning("dial packet tunnel channel failed: %s", err)
  180. // TODO: retry?
  181. return
  182. }
  183. p.setChannel(channelConn, tunnel)
  184. }(tunnel)
  185. }
  186. func (p *PacketTunnelTransport) setChannel(
  187. channelConn net.Conn, channelTunnel *Tunnel) {
  188. p.channelMutex.Lock()
  189. // Concurrency note: this check is within the mutex to ensure that a
  190. // UseTunnel call concurrent with a Close call doesn't leave a channel
  191. // set.
  192. select {
  193. case <-p.runCtx.Done():
  194. p.channelMutex.Unlock()
  195. return
  196. default:
  197. }
  198. // Interrupt Read/Write calls blocking on any previous channel.
  199. if p.channelConn != nil {
  200. p.channelConn.Close()
  201. }
  202. p.channelConn = channelConn
  203. p.channelTunnel = channelTunnel
  204. p.channelMutex.Unlock()
  205. // Initialize the read deadline mechanism using parameters associated with the
  206. // new tunnel.
  207. timeout := channelTunnel.config.
  208. GetClientParameters().
  209. GetCustom(channelTunnel.dialParams.NetworkLatencyMultiplier).
  210. Duration(parameters.PacketTunnelReadTimeout)
  211. atomic.StoreInt64(&p.readTimeout, int64(timeout))
  212. atomic.StoreInt64(&p.readDeadline, 0)
  213. p.channelReady.Broadcast()
  214. }
  215. func (p *PacketTunnelTransport) getChannel() (net.Conn, *Tunnel, error) {
  216. var channelConn net.Conn
  217. var channelTunnel *Tunnel
  218. p.channelReady.L.Lock()
  219. defer p.channelReady.L.Unlock()
  220. for {
  221. select {
  222. case <-p.runCtx.Done():
  223. return nil, nil, errors.TraceNew("already closed")
  224. default:
  225. }
  226. p.channelMutex.Lock()
  227. channelConn = p.channelConn
  228. channelTunnel = p.channelTunnel
  229. p.channelMutex.Unlock()
  230. if channelConn != nil {
  231. break
  232. }
  233. p.channelReady.Wait()
  234. }
  235. return channelConn, channelTunnel, nil
  236. }
  237. func (p *PacketTunnelTransport) failedChannel(
  238. channelConn net.Conn, channelTunnel *Tunnel) {
  239. // In case the channel read/write failed and the tunnel isn't
  240. // yet in the failed state, trigger a probe.
  241. select {
  242. case channelTunnel.signalPortForwardFailure <- struct{}{}:
  243. default:
  244. }
  245. // Clear the current channel. This will cause subsequent Read/Write
  246. // calls to block in getChannel until a new channel is provided.
  247. // Concurrency note: must check, within the mutex, that the channelConn
  248. // is still the one that failed before clearing, since both Read and
  249. // Write could call failedChannel concurrently.
  250. p.channelMutex.Lock()
  251. if p.channelConn == channelConn {
  252. p.channelConn.Close()
  253. p.channelConn = nil
  254. p.channelTunnel = nil
  255. }
  256. p.channelMutex.Unlock()
  257. // Try to establish a new channel within the current tunnel. If this
  258. // fails, a port forward failure probe will be triggered which will
  259. // ultimately trigger a SSH keep alive probe.
  260. //
  261. // One case where this is necessary is when the server closes an idle
  262. // packet tunnel port forward for a live SSH tunnel.
  263. p.UseTunnel(channelTunnel)
  264. }