tapdance.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. // +build TAPDANCE
  2. /*
  3. * Copyright (c) 2018, Psiphon Inc.
  4. * All rights reserved.
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. *
  19. */
  20. /*
  21. Package tapdance wraps github.com/refraction-networking/gotapdance with net.Listener
  22. and net.Conn types that provide drop-in integration with Psiphon.
  23. */
  24. package tapdance
  25. import (
  26. "context"
  27. "io/ioutil"
  28. "net"
  29. "os"
  30. "path/filepath"
  31. "sync"
  32. "sync/atomic"
  33. "time"
  34. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  35. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  36. "github.com/armon/go-proxyproto"
  37. refraction_networking_tapdance "github.com/refraction-networking/gotapdance/tapdance"
  38. )
  39. const (
  40. READ_PROXY_PROTOCOL_HEADER_TIMEOUT = 5 * time.Second
  41. )
  42. // Enabled indicates if Tapdance functionality is enabled.
  43. func Enabled() bool {
  44. return true
  45. }
  46. // Listener is a net.Listener.
  47. type Listener struct {
  48. net.Listener
  49. }
  50. // Listen creates a new Tapdance listener on top of an existing TCP listener.
  51. //
  52. // The Tapdance station will send the original client address via the HAProxy
  53. // proxy protocol v1, https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
  54. // The original client address is read and returned by accepted conns'
  55. // RemoteAddr. RemoteAddr _must_ be called non-concurrently before calling Read
  56. // on accepted conns as the HAProxy proxy protocol header reading logic sets
  57. // SetReadDeadline and performs a Read.
  58. func Listen(tcpListener net.Listener) (net.Listener, error) {
  59. // Setting a timeout ensures that reading the proxy protocol
  60. // header completes or times out and RemoteAddr will not block. See:
  61. // https://godoc.org/github.com/armon/go-proxyproto#Conn.RemoteAddr
  62. proxyListener := &proxyproto.Listener{
  63. Listener: tcpListener,
  64. ProxyHeaderTimeout: READ_PROXY_PROTOCOL_HEADER_TIMEOUT}
  65. stationListener := &stationListener{
  66. proxyListener: proxyListener,
  67. }
  68. return &Listener{Listener: stationListener}, nil
  69. }
  70. // stationListener uses the proxyproto.Listener SourceCheck callback to
  71. // capture and record the direct remote address, the Tapdance station address,
  72. // and wraps accepted conns to provide station address metrics via GetMetrics.
  73. // These metrics enable identifying which station fronted a connection, which
  74. // is useful for network operations and troubleshooting.
  75. //
  76. // go-proxyproto.Conn.RemoteAddr reports the originating client IP address,
  77. // which is geolocated and recorded for metrics. The underlying conn's remote
  78. // address, the Tapdance station address, is not accessible via the
  79. // go-proxyproto API.
  80. //
  81. // stationListener is not safe for concurrent access.
  82. type stationListener struct {
  83. proxyListener *proxyproto.Listener
  84. }
  85. func (l *stationListener) Accept() (net.Conn, error) {
  86. var stationRemoteAddr net.Addr
  87. l.proxyListener.SourceCheck = func(addr net.Addr) (bool, error) {
  88. stationRemoteAddr = addr
  89. return true, nil
  90. }
  91. conn, err := l.proxyListener.Accept()
  92. if err != nil {
  93. return nil, err
  94. }
  95. if stationRemoteAddr == nil {
  96. return nil, errors.TraceNew("missing station address")
  97. }
  98. return &stationConn{
  99. Conn: conn,
  100. stationIPAddress: common.IPAddressFromAddr(stationRemoteAddr),
  101. }, nil
  102. }
  103. func (l *stationListener) Close() error {
  104. return l.proxyListener.Close()
  105. }
  106. func (l *stationListener) Addr() net.Addr {
  107. return l.proxyListener.Addr()
  108. }
  109. type stationConn struct {
  110. net.Conn
  111. stationIPAddress string
  112. }
  113. // IrregularTunnelError implements the common.IrregularIndicator interface.
  114. func (c *stationConn) IrregularTunnelError() error {
  115. // We expect a PROXY protocol header, but go-proxyproto does not produce an
  116. // error if the "PROXY " prefix is absent; instead the connection will
  117. // proceed. To detect this case, check if the go-proxyproto RemoteAddr IP
  118. // address matches the underlying connection IP address. When these values
  119. // match, there was no PROXY protocol header.
  120. //
  121. // Limitation: the values will match if there is a PROXY protocol header
  122. // containing the same IP address as the underlying connection. This is not
  123. // an expected case.
  124. if common.IPAddressFromAddr(c.RemoteAddr()) == c.stationIPAddress {
  125. return errors.TraceNew("unexpected station IP address")
  126. }
  127. return nil
  128. }
  129. // GetMetrics implements the common.MetricsSource interface.
  130. func (c *stationConn) GetMetrics() common.LogFields {
  131. logFields := make(common.LogFields)
  132. // Ensure we don't log a potential non-station IP address.
  133. if c.IrregularTunnelError() == nil {
  134. logFields["station_ip_address"] = c.stationIPAddress
  135. }
  136. return logFields
  137. }
  138. // dialManager tracks all dials performed by and dialed conns used by a
  139. // refraction_networking_tapdance client. dialManager.close interrupts/closes
  140. // all pending dials and established conns immediately. This ensures that
  141. // blocking calls within refraction_networking_tapdance, such as tls.Handhake,
  142. // are interrupted:
  143. // E.g., https://github.com/refraction-networking/gotapdance/blob/4d84655dad2e242b0af0459c31f687b12085dcca/tapdance/conn_raw.go#L307
  144. // (...preceeding SetDeadline is insufficient for immediate cancellation.)
  145. type dialManager struct {
  146. tcpDialer func(ctx context.Context, network, address string) (net.Conn, error)
  147. ctxMutex sync.Mutex
  148. useRunCtx bool
  149. initialDialCtx context.Context
  150. runCtx context.Context
  151. stopRunning context.CancelFunc
  152. conns *common.Conns
  153. }
  154. func newDialManager(
  155. tcpDialer func(ctx context.Context, network, address string) (net.Conn, error)) *dialManager {
  156. runCtx, stopRunning := context.WithCancel(context.Background())
  157. return &dialManager{
  158. tcpDialer: tcpDialer,
  159. runCtx: runCtx,
  160. stopRunning: stopRunning,
  161. conns: common.NewConns(),
  162. }
  163. }
  164. func (manager *dialManager) dial(ctx context.Context, network, address string) (net.Conn, error) {
  165. if network != "tcp" {
  166. return nil, errors.Tracef("unsupported network: %s", network)
  167. }
  168. // The context for this dial is either:
  169. // - ctx, during the initial tapdance.DialContext, when this is Psiphon tunnel
  170. // establishment.
  171. // - manager.runCtx after the initial tapdance.Dial completes, in which case
  172. // this is a Tapdance protocol reconnection that occurs periodically for
  173. // already established tunnels.
  174. manager.ctxMutex.Lock()
  175. if manager.useRunCtx {
  176. // Preserve the random timeout configured by the tapdance client:
  177. // https://github.com/refraction-networking/gotapdance/blob/4d84655dad2e242b0af0459c31f687b12085dcca/tapdance/conn_raw.go#L263
  178. deadline, ok := ctx.Deadline()
  179. if !ok {
  180. return nil, errors.Tracef("unexpected nil deadline")
  181. }
  182. var cancelFunc context.CancelFunc
  183. ctx, cancelFunc = context.WithDeadline(manager.runCtx, deadline)
  184. defer cancelFunc()
  185. }
  186. manager.ctxMutex.Unlock()
  187. conn, err := manager.tcpDialer(ctx, network, address)
  188. if err != nil {
  189. return nil, errors.Trace(err)
  190. }
  191. // Fail immediately if CloseWrite isn't available in the underlying dialed
  192. // conn. The equivalent check in managedConn.CloseWrite isn't fatal and
  193. // tapdance will run in a degraded state.
  194. // Limitation: if the underlying conn _also_ passes through CloseWrite, this
  195. // check may be insufficient.
  196. if _, ok := conn.(common.CloseWriter); !ok {
  197. return nil, errors.TraceNew("underlying conn is not a CloseWriter")
  198. }
  199. conn = &managedConn{
  200. Conn: conn,
  201. manager: manager,
  202. }
  203. if !manager.conns.Add(conn) {
  204. conn.Close()
  205. return nil, errors.TraceNew("already closed")
  206. }
  207. return conn, nil
  208. }
  209. func (manager *dialManager) startUsingRunCtx() {
  210. manager.ctxMutex.Lock()
  211. manager.initialDialCtx = nil
  212. manager.useRunCtx = true
  213. manager.ctxMutex.Unlock()
  214. }
  215. func (manager *dialManager) close() {
  216. manager.conns.CloseAll()
  217. manager.stopRunning()
  218. }
  219. type managedConn struct {
  220. net.Conn
  221. manager *dialManager
  222. }
  223. // CloseWrite exposes the net.TCPConn.CloseWrite() functionality
  224. // required by tapdance.
  225. func (conn *managedConn) CloseWrite() error {
  226. if closeWriter, ok := conn.Conn.(common.CloseWriter); ok {
  227. return closeWriter.CloseWrite()
  228. }
  229. return errors.TraceNew("underlying conn is not a CloseWriter")
  230. }
  231. func (conn *managedConn) Close() error {
  232. // Remove must be invoked asynchronously, as this Close may be called by
  233. // conns.CloseAll, leading to a reentrant lock situation.
  234. go conn.manager.conns.Remove(conn)
  235. return conn.Conn.Close()
  236. }
  237. type tapdanceConn struct {
  238. net.Conn
  239. manager *dialManager
  240. isClosed int32
  241. }
  242. func (conn *tapdanceConn) Close() error {
  243. conn.manager.close()
  244. err := conn.Conn.Close()
  245. atomic.StoreInt32(&conn.isClosed, 1)
  246. return err
  247. }
  248. func (conn *tapdanceConn) IsClosed() bool {
  249. return atomic.LoadInt32(&conn.isClosed) == 1
  250. }
  251. // Dial establishes a new Tapdance session to a Tapdance station specified in
  252. // the config assets and forwarding through to the Psiphon server specified by
  253. // address.
  254. //
  255. // The Tapdance station config assets are read from dataDirectory/"tapdance".
  256. // When no config is found, default assets are paved. ctx is expected to have
  257. // a timeout for the dial.
  258. //
  259. // Limitation: the parameters emitLogs and dataDirectory are used for one-time
  260. // initialization and are ignored after the first Dial call.
  261. func Dial(
  262. ctx context.Context,
  263. emitLogs bool,
  264. dataDirectory string,
  265. netDialer common.NetDialer,
  266. address string) (net.Conn, error) {
  267. err := initTapdance(emitLogs, dataDirectory)
  268. if err != nil {
  269. return nil, errors.Trace(err)
  270. }
  271. if _, ok := ctx.Deadline(); !ok {
  272. return nil, errors.TraceNew("dial context has no timeout")
  273. }
  274. manager := newDialManager(netDialer.DialContext)
  275. tapdanceDialer := &refraction_networking_tapdance.Dialer{
  276. TcpDialer: manager.dial,
  277. }
  278. // If the dial context is cancelled, use dialManager to interrupt
  279. // tapdanceDialer.DialContext. See dialManager comment explaining why
  280. // tapdanceDialer.DialContext may block even when the input context is
  281. // cancelled.
  282. dialComplete := make(chan struct{})
  283. go func() {
  284. select {
  285. case <-ctx.Done():
  286. case <-dialComplete:
  287. }
  288. select {
  289. // Prioritize the dialComplete case.
  290. case <-dialComplete:
  291. return
  292. default:
  293. }
  294. manager.close()
  295. }()
  296. conn, err := tapdanceDialer.DialContext(ctx, "tcp", address)
  297. close(dialComplete)
  298. if err != nil {
  299. manager.close()
  300. return nil, errors.Trace(err)
  301. }
  302. manager.startUsingRunCtx()
  303. return &tapdanceConn{
  304. Conn: conn,
  305. manager: manager,
  306. }, nil
  307. }
  308. var initTapdanceOnce sync.Once
  309. func initTapdance(emitLogs bool, dataDirectory string) error {
  310. var initErr error
  311. initTapdanceOnce.Do(func() {
  312. if !emitLogs {
  313. refraction_networking_tapdance.Logger().Out = ioutil.Discard
  314. }
  315. refraction_networking_tapdance.EnableProxyProtocol()
  316. assetsDir := filepath.Join(dataDirectory, "tapdance")
  317. err := os.MkdirAll(assetsDir, 0700)
  318. if err != nil {
  319. initErr = errors.Trace(err)
  320. return
  321. }
  322. clientConfFileName := filepath.Join(assetsDir, "ClientConf")
  323. _, err = os.Stat(clientConfFileName)
  324. if err != nil && os.IsNotExist(err) {
  325. err = ioutil.WriteFile(clientConfFileName, getEmbeddedClientConf(), 0644)
  326. }
  327. if err != nil {
  328. initErr = errors.Trace(err)
  329. return
  330. }
  331. refraction_networking_tapdance.AssetsSetDir(assetsDir)
  332. })
  333. return initErr
  334. }