packetTunnelTransport.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /*
  2. * Copyright (c) 2017, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "context"
  22. "errors"
  23. "net"
  24. "sync"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  26. )
  27. // PacketTunnelTransport is an integration layer that presents an io.ReadWriteCloser interface
  28. // to a tun.Client as the transport for relaying packets. The Psiphon client may periodically
  29. // disconnect from and reconnect to the same or different Psiphon servers. PacketTunnelTransport
  30. // allows the Psiphon client to substitute new transport channels on-the-fly.
  31. type PacketTunnelTransport struct {
  32. runCtx context.Context
  33. stopRunning context.CancelFunc
  34. workers *sync.WaitGroup
  35. readMutex sync.Mutex
  36. writeMutex sync.Mutex
  37. channelReady *sync.Cond
  38. channelMutex sync.Mutex
  39. channelConn net.Conn
  40. channelTunnel *Tunnel
  41. }
  42. // NewPacketTunnelTransport initializes a PacketTunnelTransport.
  43. func NewPacketTunnelTransport() *PacketTunnelTransport {
  44. runCtx, stopRunning := context.WithCancel(context.Background())
  45. return &PacketTunnelTransport{
  46. runCtx: runCtx,
  47. stopRunning: stopRunning,
  48. workers: new(sync.WaitGroup),
  49. channelReady: sync.NewCond(new(sync.Mutex)),
  50. }
  51. }
  52. // Read implements the io.Reader interface. It uses the current transport channel
  53. // to read packet data, or waits for a new transport channel to be established
  54. // after a failure.
  55. func (p *PacketTunnelTransport) Read(data []byte) (int, error) {
  56. p.readMutex.Lock()
  57. defer p.readMutex.Unlock()
  58. // getChannel will block if there's no channel.
  59. channelConn, channelTunnel, err := p.getChannel()
  60. if err != nil {
  61. return 0, common.ContextError(err)
  62. }
  63. n, err := channelConn.Read(data)
  64. if err != nil {
  65. // This assumes that any error means the channel has failed, which
  66. // is the case for ssh.Channel reads. io.EOF is not ignored, since
  67. // a single ssh.Channel may EOF and still get substituted with a new
  68. // channel.
  69. p.failedChannel(channelConn, channelTunnel)
  70. }
  71. return n, common.ContextError(err)
  72. }
  73. // Write implements the io.Writer interface. It uses the current transport channel
  74. // to write packet data, or waits for a new transport channel to be established
  75. // after a failure.
  76. func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
  77. p.writeMutex.Lock()
  78. defer p.writeMutex.Unlock()
  79. channelConn, channelTunnel, err := p.getChannel()
  80. if err != nil {
  81. return 0, common.ContextError(err)
  82. }
  83. n, err := channelConn.Write(data)
  84. if err != nil {
  85. // This assumes that any error means the channel has failed, which
  86. // is the case for ssh.Channel writes.
  87. p.failedChannel(channelConn, channelTunnel)
  88. }
  89. return n, common.ContextError(err)
  90. }
  91. // Close implements the io.Closer interface. Any underlying transport channel is
  92. // closed and any blocking Read/Write calls will be interrupted.
  93. func (p *PacketTunnelTransport) Close() error {
  94. p.stopRunning()
  95. p.workers.Wait()
  96. // This broadcast is to wake up reads or writes blocking in getChannel; those
  97. // getChannel calls should then abort on the p.runCtx.Done() check.
  98. p.channelReady.Broadcast()
  99. p.channelMutex.Lock()
  100. if p.channelConn != nil {
  101. p.channelConn.Close()
  102. p.channelConn = nil
  103. }
  104. p.channelMutex.Unlock()
  105. return nil
  106. }
  107. // UseTunnel sets the PacketTunnelTransport to use a new transport channel within
  108. // the specified tunnel. UseTunnel does not block on the open channel call; it spawns
  109. // a worker that calls tunnel.DialPacketTunnelChannel and uses the resulting channel.
  110. func (p *PacketTunnelTransport) UseTunnel(tunnel *Tunnel) {
  111. p.workers.Add(1)
  112. go func(tunnel *Tunnel) {
  113. defer p.workers.Done()
  114. // channelConn is a net.Conn, since some layering has been applied
  115. // (e.g., transferstats.Conn). PacketTunnelTransport assumes the
  116. // channelConn is ultimately an ssh.Channel, which is not a fully
  117. // functional net.Conn.
  118. channelConn, err := tunnel.DialPacketTunnelChannel()
  119. if err != nil {
  120. // Note: DialPacketTunnelChannel will signal a probe on failure,
  121. // so it's not necessary to do so here.
  122. NoticeAlert("dial packet tunnel channel failed : %s", err)
  123. // TODO: retry?
  124. return
  125. }
  126. p.setChannel(channelConn, tunnel)
  127. }(tunnel)
  128. }
  129. func (p *PacketTunnelTransport) setChannel(
  130. channelConn net.Conn, channelTunnel *Tunnel) {
  131. p.channelMutex.Lock()
  132. // Concurrency note: this check is within the mutex to ensure that a
  133. // UseTunnel call concurrent with a Close call doesn't leave a channel
  134. // set.
  135. select {
  136. case <-p.runCtx.Done():
  137. p.channelMutex.Unlock()
  138. return
  139. default:
  140. }
  141. // Interrupt Read/Write calls blocking on any previous channel.
  142. if p.channelConn != nil {
  143. p.channelConn.Close()
  144. }
  145. p.channelConn = channelConn
  146. p.channelTunnel = channelTunnel
  147. p.channelMutex.Unlock()
  148. p.channelReady.Broadcast()
  149. }
  150. func (p *PacketTunnelTransport) getChannel() (net.Conn, *Tunnel, error) {
  151. var channelConn net.Conn
  152. var channelTunnel *Tunnel
  153. p.channelReady.L.Lock()
  154. defer p.channelReady.L.Unlock()
  155. for {
  156. select {
  157. case <-p.runCtx.Done():
  158. return nil, nil, common.ContextError(errors.New("already closed"))
  159. default:
  160. }
  161. p.channelMutex.Lock()
  162. channelConn = p.channelConn
  163. channelTunnel = p.channelTunnel
  164. p.channelMutex.Unlock()
  165. if channelConn != nil {
  166. break
  167. }
  168. p.channelReady.Wait()
  169. }
  170. return channelConn, channelTunnel, nil
  171. }
  172. func (p *PacketTunnelTransport) failedChannel(
  173. channelConn net.Conn, channelTunnel *Tunnel) {
  174. // In case the channel read/write failed and the tunnel isn't
  175. // yet in the failed state, trigger a probe.
  176. select {
  177. case channelTunnel.signalPortForwardFailure <- *new(struct{}):
  178. default:
  179. }
  180. // Clear the current channel. This will cause subsequent Read/Write
  181. // calls to block in getChannel until a new channel is provided.
  182. // Concurrency note: must check, within the mutex, that the channelConn
  183. // is still the one that failed before clearing, since both Read and
  184. // Write could call failedChannel concurrently.
  185. p.channelMutex.Lock()
  186. if p.channelConn == channelConn {
  187. p.channelConn.Close()
  188. p.channelConn = nil
  189. p.channelTunnel = nil
  190. }
  191. p.channelMutex.Unlock()
  192. // Try to establish a new channel within the current tunnel. If this
  193. // fails, a port forward failure probe will be triggered which will
  194. // ultimately trigger a SSH keep alive probe.
  195. //
  196. // One case where this is necessary is when the server closes an idle
  197. // packet tunnel port forward for a live SSH tunnel.
  198. p.UseTunnel(channelTunnel)
  199. }