TCPConn.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. /*
  2. * Copyright (c) 2015, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package psiphon
  20. import (
  21. "errors"
  22. "net"
  23. "sync"
  24. "time"
  25. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
  26. )
  27. // TCPConn is a customized TCP connection that:
  28. // - can be interrupted while connecting;
  29. // - implements a connect timeout;
  30. // - implements idle read/write timeouts;
  31. // - uses an upstream proxy when specified, and includes
  32. // upstream proxy dialing in the connect timeout;
  33. // - can be bound to a specific system device (for Android VpnService
  34. // routing compatibility, for example);
  35. // - implements the psiphon.Conn interface
  36. type TCPConn struct {
  37. net.Conn
  38. mutex sync.Mutex
  39. isClosed bool
  40. closedSignal chan struct{}
  41. interruptible interruptibleTCPSocket
  42. readTimeout time.Duration
  43. writeTimeout time.Duration
  44. }
  45. // NewTCPDialer creates a TCPDialer.
  46. func NewTCPDialer(config *DialConfig) Dialer {
  47. return makeTCPDialer(config)
  48. }
  49. // DialTCP creates a new, connected TCPConn.
  50. func DialTCP(addr string, config *DialConfig) (conn net.Conn, err error) {
  51. return makeTCPDialer(config)("tcp", addr)
  52. }
  53. // makeTCPDialer creates a custom dialer which creates TCPConn. An upstream
  54. // proxy is used when specified.
  55. func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, error) {
  56. dialer := func(network, addr string) (net.Conn, error) {
  57. if network != "tcp" {
  58. return nil, errors.New("unsupported network type in TCPConn dialer")
  59. }
  60. conn, err := interruptibleTCPDial(addr, config)
  61. if err != nil {
  62. return nil, ContextError(err)
  63. }
  64. if config.ClosedSignal != nil {
  65. if !conn.SetClosedSignal(config.ClosedSignal) {
  66. // Conn is already closed. This is not unexpected -- for example,
  67. // when establish is interrupted.
  68. // TODO: make this not log an error when called from establishTunnelWorker?
  69. return nil, ContextError(errors.New("conn already closed"))
  70. }
  71. }
  72. return conn, nil
  73. }
  74. if config.UpstreamProxyUrl != "" {
  75. upstreamDialer := upstreamproxy.NewProxyDialFunc(
  76. &upstreamproxy.UpstreamProxyConfig{
  77. ForwardDialFunc: dialer,
  78. ProxyURIString: config.UpstreamProxyUrl,
  79. })
  80. dialer = func(network, addr string) (net.Conn, error) {
  81. // The entire upstream dial is wrapped in an explicit timeout. This
  82. // may include network connection read and writes when proxy auth negotation
  83. // is performed.
  84. type upstreamDialResult struct {
  85. conn net.Conn
  86. err error
  87. }
  88. resultChannel := make(chan *upstreamDialResult, 2)
  89. time.AfterFunc(config.ConnectTimeout, func() {
  90. // TODO: we could "interrupt" the underlying TCPConn at this point, as
  91. // it's being abandoned. But we don't have a reference to it. It's left
  92. // to the outer DialConfig.PendingConns to track and clean up that TCPConn.
  93. resultChannel <- &upstreamDialResult{nil, errors.New("upstreamproxy dial timeout")}
  94. })
  95. go func() {
  96. conn, err := upstreamDialer(network, addr)
  97. resultChannel <- &upstreamDialResult{conn, err}
  98. }()
  99. result := <-resultChannel
  100. if _, ok := result.err.(*upstreamproxy.Error); ok {
  101. NoticeUpstreamProxyError(result.err)
  102. }
  103. return result.conn, result.err
  104. }
  105. }
  106. return dialer
  107. }
  108. // SetClosedSignal implements psiphon.Conn.SetClosedSignal.
  109. func (conn *TCPConn) SetClosedSignal(closedSignal chan struct{}) bool {
  110. conn.mutex.Lock()
  111. defer conn.mutex.Unlock()
  112. if conn.isClosed {
  113. return false
  114. }
  115. conn.closedSignal = closedSignal
  116. return true
  117. }
  118. // Close terminates a connected (net.Conn) or connecting (socketFd) TCPConn.
  119. // A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
  120. func (conn *TCPConn) Close() (err error) {
  121. conn.mutex.Lock()
  122. defer conn.mutex.Unlock()
  123. if !conn.isClosed {
  124. if conn.Conn == nil {
  125. err = interruptibleTCPClose(conn.interruptible)
  126. } else {
  127. err = conn.Conn.Close()
  128. }
  129. conn.isClosed = true
  130. select {
  131. case conn.closedSignal <- *new(struct{}):
  132. default:
  133. }
  134. }
  135. return err
  136. }
  137. // Read wraps standard Read to add an idle timeout. The connection
  138. // is explicitly closed on timeout.
  139. func (conn *TCPConn) Read(buffer []byte) (n int, err error) {
  140. // Note: no mutex on the conn.readTimeout access
  141. if conn.readTimeout != 0 {
  142. err = conn.Conn.SetReadDeadline(time.Now().Add(conn.readTimeout))
  143. if err != nil {
  144. return 0, ContextError(err)
  145. }
  146. }
  147. n, err = conn.Conn.Read(buffer)
  148. if err != nil {
  149. conn.Close()
  150. }
  151. return
  152. }
  153. // Write wraps standard Write to add an idle timeout The connection
  154. // is explicitly closed on timeout.
  155. func (conn *TCPConn) Write(buffer []byte) (n int, err error) {
  156. // Note: no mutex on the conn.writeTimeout access
  157. if conn.writeTimeout != 0 {
  158. err = conn.Conn.SetWriteDeadline(time.Now().Add(conn.writeTimeout))
  159. if err != nil {
  160. return 0, ContextError(err)
  161. }
  162. }
  163. n, err = conn.Conn.Write(buffer)
  164. if err != nil {
  165. conn.Close()
  166. }
  167. return
  168. }
  169. // Override implementation of net.Conn.SetDeadline
  170. func (conn *TCPConn) SetDeadline(t time.Time) error {
  171. return errors.New("net.Conn SetDeadline not supported")
  172. }
  173. // Override implementation of net.Conn.SetReadDeadline
  174. func (conn *TCPConn) SetReadDeadline(t time.Time) error {
  175. return errors.New("net.Conn SetReadDeadline not supported")
  176. }
  177. // Override implementation of net.Conn.SetWriteDeadline
  178. func (conn *TCPConn) SetWriteDeadline(t time.Time) error {
  179. return errors.New("net.Conn SetWriteDeadline not supported")
  180. }