|
@@ -1,5 +1,5 @@
|
|
|
/*
|
|
/*
|
|
|
- * Copyright (c) 2014, Psiphon Inc.
|
|
|
|
|
|
|
+ * Copyright (c) 2015, Psiphon Inc.
|
|
|
* All rights reserved.
|
|
* All rights reserved.
|
|
|
*
|
|
*
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* This program is free software: you can redistribute it and/or modify
|
|
@@ -25,8 +25,6 @@ package psiphon
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"errors"
|
|
"errors"
|
|
|
- "fmt"
|
|
|
|
|
- "io"
|
|
|
|
|
"net"
|
|
"net"
|
|
|
"sync"
|
|
"sync"
|
|
|
"time"
|
|
"time"
|
|
@@ -37,7 +35,7 @@ import (
|
|
|
// route traffic through the tunnels.
|
|
// route traffic through the tunnels.
|
|
|
type Controller struct {
|
|
type Controller struct {
|
|
|
config *Config
|
|
config *Config
|
|
|
- failureSignal chan struct{}
|
|
|
|
|
|
|
+ componentFailureSignal chan struct{}
|
|
|
shutdownBroadcast chan struct{}
|
|
shutdownBroadcast chan struct{}
|
|
|
runWaitGroup *sync.WaitGroup
|
|
runWaitGroup *sync.WaitGroup
|
|
|
establishedTunnels chan *Tunnel
|
|
establishedTunnels chan *Tunnel
|
|
@@ -45,32 +43,32 @@ type Controller struct {
|
|
|
tunnelMutex sync.Mutex
|
|
tunnelMutex sync.Mutex
|
|
|
tunnels []*Tunnel
|
|
tunnels []*Tunnel
|
|
|
nextTunnel int
|
|
nextTunnel int
|
|
|
- operateWaitGroup *sync.WaitGroup
|
|
|
|
|
isEstablishing bool
|
|
isEstablishing bool
|
|
|
establishWaitGroup *sync.WaitGroup
|
|
establishWaitGroup *sync.WaitGroup
|
|
|
stopEstablishingBroadcast chan struct{}
|
|
stopEstablishingBroadcast chan struct{}
|
|
|
candidateServerEntries chan *ServerEntry
|
|
candidateServerEntries chan *ServerEntry
|
|
|
- pendingConns *Conns
|
|
|
|
|
|
|
+ establishPendingConns *Conns
|
|
|
|
|
+ fetchRemotePendingConns *Conns
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// NewController initializes a new controller.
|
|
// NewController initializes a new controller.
|
|
|
func NewController(config *Config) (controller *Controller) {
|
|
func NewController(config *Config) (controller *Controller) {
|
|
|
return &Controller{
|
|
return &Controller{
|
|
|
config: config,
|
|
config: config,
|
|
|
- // failureSignal receives a signal from a component (including socks and
|
|
|
|
|
|
|
+ // componentFailureSignal receives a signal from a component (including socks and
|
|
|
// http local proxies) if they unexpectedly fail. Senders should not block.
|
|
// http local proxies) if they unexpectedly fail. Senders should not block.
|
|
|
// A buffer allows at least one stop signal to be sent before there is a receiver.
|
|
// A buffer allows at least one stop signal to be sent before there is a receiver.
|
|
|
- failureSignal: make(chan struct{}, 1),
|
|
|
|
|
- shutdownBroadcast: make(chan struct{}),
|
|
|
|
|
- runWaitGroup: new(sync.WaitGroup),
|
|
|
|
|
|
|
+ componentFailureSignal: make(chan struct{}, 1),
|
|
|
|
|
+ shutdownBroadcast: make(chan struct{}),
|
|
|
|
|
+ runWaitGroup: new(sync.WaitGroup),
|
|
|
// establishedTunnels and failedTunnels buffer sizes are large enough to
|
|
// establishedTunnels and failedTunnels buffer sizes are large enough to
|
|
|
// receive full pools of tunnels without blocking. Senders should not block.
|
|
// receive full pools of tunnels without blocking. Senders should not block.
|
|
|
- establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
|
|
|
|
|
- failedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
|
|
|
|
|
- tunnels: make([]*Tunnel, 0),
|
|
|
|
|
- operateWaitGroup: new(sync.WaitGroup),
|
|
|
|
|
- isEstablishing: false,
|
|
|
|
|
- pendingConns: new(Conns),
|
|
|
|
|
|
|
+ establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
|
|
|
|
|
+ failedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
|
|
|
|
|
+ tunnels: make([]*Tunnel, 0),
|
|
|
|
|
+ isEstablishing: false,
|
|
|
|
|
+ establishPendingConns: new(Conns),
|
|
|
|
|
+ fetchRemotePendingConns: new(Conns),
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -105,24 +103,23 @@ func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
|
|
|
select {
|
|
select {
|
|
|
case <-shutdownBroadcast:
|
|
case <-shutdownBroadcast:
|
|
|
Notice(NOTICE_INFO, "controller shutdown by request")
|
|
Notice(NOTICE_INFO, "controller shutdown by request")
|
|
|
- case <-controller.failureSignal:
|
|
|
|
|
- Notice(NOTICE_ALERT, "controller shutdown due to failure")
|
|
|
|
|
|
|
+ case <-controller.componentFailureSignal:
|
|
|
|
|
+ Notice(NOTICE_ALERT, "controller shutdown due to component failure")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Note: in addition to establish(), this pendingConns will interrupt
|
|
|
|
|
- // FetchRemoteServerList
|
|
|
|
|
- controller.pendingConns.CloseAll()
|
|
|
|
|
close(controller.shutdownBroadcast)
|
|
close(controller.shutdownBroadcast)
|
|
|
|
|
+ controller.establishPendingConns.CloseAll()
|
|
|
|
|
+ controller.fetchRemotePendingConns.CloseAll()
|
|
|
controller.runWaitGroup.Wait()
|
|
controller.runWaitGroup.Wait()
|
|
|
|
|
|
|
|
Notice(NOTICE_INFO, "exiting controller")
|
|
Notice(NOTICE_INFO, "exiting controller")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// SignalFailure notifies the controller that an associated component has failed.
|
|
|
|
|
|
|
+// SignalComponentFailure notifies the controller that an associated component has failed.
|
|
|
// This will terminate the controller.
|
|
// This will terminate the controller.
|
|
|
-func (controller *Controller) SignalFailure() {
|
|
|
|
|
|
|
+func (controller *Controller) SignalComponentFailure() {
|
|
|
select {
|
|
select {
|
|
|
- case controller.failureSignal <- *new(struct{}):
|
|
|
|
|
|
|
+ case controller.componentFailureSignal <- *new(struct{}):
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -133,13 +130,13 @@ func (controller *Controller) SignalFailure() {
|
|
|
func (controller *Controller) remoteServerListFetcher() {
|
|
func (controller *Controller) remoteServerListFetcher() {
|
|
|
defer controller.runWaitGroup.Done()
|
|
defer controller.runWaitGroup.Done()
|
|
|
|
|
|
|
|
- // Note: unlike existing Psiphon clients, this code
|
|
|
|
|
|
|
+ // Note: unlike legacy Psiphon clients, this code
|
|
|
// always makes the fetch remote server list request
|
|
// always makes the fetch remote server list request
|
|
|
loop:
|
|
loop:
|
|
|
for {
|
|
for {
|
|
|
- // TODO: FetchRemoteServerList should have its own pendingConns,
|
|
|
|
|
- // otherwise it may needlessly abort when establish is stopped.
|
|
|
|
|
- err := FetchRemoteServerList(controller.config, controller.pendingConns)
|
|
|
|
|
|
|
+ err := FetchRemoteServerList(
|
|
|
|
|
+ controller.config, controller.fetchRemotePendingConns)
|
|
|
|
|
+
|
|
|
var duration time.Duration
|
|
var duration time.Duration
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
Notice(NOTICE_ALERT, "failed to fetch remote server list: %s", err)
|
|
Notice(NOTICE_ALERT, "failed to fetch remote server list: %s", err)
|
|
@@ -162,9 +159,8 @@ loop:
|
|
|
// runTunnels is the controller tunnel management main loop. It starts and stops
|
|
// runTunnels is the controller tunnel management main loop. It starts and stops
|
|
|
// establishing tunnels based on the target tunnel pool size and the current size
|
|
// establishing tunnels based on the target tunnel pool size and the current size
|
|
|
// of the pool. Tunnels are established asynchronously using worker goroutines.
|
|
// of the pool. Tunnels are established asynchronously using worker goroutines.
|
|
|
-// When a tunnel is established, it's added to the active pool and a corresponding
|
|
|
|
|
-// operateTunnel goroutine is launched which starts a session in the tunnel and
|
|
|
|
|
-// monitors the tunnel for failures.
|
|
|
|
|
|
|
+// When a tunnel is established, it's added to the active pool. The tunnel's
|
|
|
|
|
+// operateTunnel goroutine monitors the tunnel.
|
|
|
// When a tunnel fails, it's removed from the pool and the establish process is
|
|
// When a tunnel fails, it's removed from the pool and the establish process is
|
|
|
// restarted to fill the pool.
|
|
// restarted to fill the pool.
|
|
|
func (controller *Controller) runTunnels() {
|
|
func (controller *Controller) runTunnels() {
|
|
@@ -205,8 +201,6 @@ loop:
|
|
|
Notice(NOTICE_INFO, "established tunnel: %s", establishedTunnel.serverEntry.IpAddress)
|
|
Notice(NOTICE_INFO, "established tunnel: %s", establishedTunnel.serverEntry.IpAddress)
|
|
|
if controller.registerTunnel(establishedTunnel) {
|
|
if controller.registerTunnel(establishedTunnel) {
|
|
|
Notice(NOTICE_INFO, "active tunnel: %s", establishedTunnel.serverEntry.IpAddress)
|
|
Notice(NOTICE_INFO, "active tunnel: %s", establishedTunnel.serverEntry.IpAddress)
|
|
|
- controller.operateWaitGroup.Add(1)
|
|
|
|
|
- go controller.operateTunnel(establishedTunnel)
|
|
|
|
|
} else {
|
|
} else {
|
|
|
controller.discardTunnel(establishedTunnel)
|
|
controller.discardTunnel(establishedTunnel)
|
|
|
}
|
|
}
|
|
@@ -220,7 +214,6 @@ loop:
|
|
|
}
|
|
}
|
|
|
controller.stopEstablishing()
|
|
controller.stopEstablishing()
|
|
|
controller.terminateAllTunnels()
|
|
controller.terminateAllTunnels()
|
|
|
- controller.operateWaitGroup.Wait()
|
|
|
|
|
|
|
|
|
|
// Drain tunnel channels
|
|
// Drain tunnel channels
|
|
|
close(controller.establishedTunnels)
|
|
close(controller.establishedTunnels)
|
|
@@ -235,6 +228,22 @@ loop:
|
|
|
Notice(NOTICE_INFO, "exiting run tunnels")
|
|
Notice(NOTICE_INFO, "exiting run tunnels")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// HandleFailedTunnel implements the TunnelOwner interface. This function
|
|
|
|
|
+// is called by Tunnel.operateTunnel when the tunnel has detected that it
|
|
|
|
|
+// has failed. The Controller will signal runTunnels to create a new
|
|
|
|
|
+// tunnel and/or remove the tunnel from the list of active tunnels.
|
|
|
|
|
+func (controller *Controller) SignalTunnelFailure(tunnel *Tunnel) {
|
|
|
|
|
+ // Don't block. Assumes the receiver has a buffer large enough for
|
|
|
|
|
+ // the typical number of operated tunnels. In case there's no room,
|
|
|
|
|
+ // terminate the tunnel (runTunnels won't get a signal in this case,
|
|
|
|
|
+ // but the tunnel will be removed from the list of active tunnels).
|
|
|
|
|
+ select {
|
|
|
|
|
+ case controller.failedTunnels <- tunnel:
|
|
|
|
|
+ default:
|
|
|
|
|
+ controller.terminateTunnel(tunnel)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// discardTunnel disposes of a successful connection that is no longer required.
|
|
// discardTunnel disposes of a successful connection that is no longer required.
|
|
|
func (controller *Controller) discardTunnel(tunnel *Tunnel) {
|
|
func (controller *Controller) discardTunnel(tunnel *Tunnel) {
|
|
|
Notice(NOTICE_INFO, "discard tunnel: %s", tunnel.serverEntry.IpAddress)
|
|
Notice(NOTICE_INFO, "discard tunnel: %s", tunnel.serverEntry.IpAddress)
|
|
@@ -319,12 +328,7 @@ func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
|
|
|
tunnel = controller.tunnels[controller.nextTunnel]
|
|
tunnel = controller.tunnels[controller.nextTunnel]
|
|
|
controller.nextTunnel =
|
|
controller.nextTunnel =
|
|
|
(controller.nextTunnel + 1) % len(controller.tunnels)
|
|
(controller.nextTunnel + 1) % len(controller.tunnels)
|
|
|
- // A tunnel must[*] have started its session (performed the server
|
|
|
|
|
- // API handshake sequence) before it may be used for tunneling traffic
|
|
|
|
|
- // [*]currently not enforced by the server, but may be in the future.
|
|
|
|
|
- if tunnel.IsSessionStarted() {
|
|
|
|
|
- return tunnel
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ return tunnel
|
|
|
}
|
|
}
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
@@ -342,142 +346,6 @@ func (controller *Controller) isActiveTunnelServerEntry(serverEntry *ServerEntry
|
|
|
return false
|
|
return false
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// operateTunnel starts a Psiphon session (handshake, etc.) on a newly
|
|
|
|
|
-// connected tunnel, and then monitors the tunnel for failures:
|
|
|
|
|
-//
|
|
|
|
|
-// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
|
|
|
|
|
-// channel on keep-alive failure and other transport I/O errors. In case
|
|
|
|
|
-// of such a failure, the tunnel is marked as failed.
|
|
|
|
|
-//
|
|
|
|
|
-// 2. Tunnel port forward failures: the tunnel connection may stay up but
|
|
|
|
|
-// the client may still fail to establish port forwards due to server load
|
|
|
|
|
-// and other conditions. After a threshold number of such failures, the
|
|
|
|
|
-// overall tunnel is marked as failed.
|
|
|
|
|
-//
|
|
|
|
|
-// TODO: currently, any connect (dial), read, or write error associated with
|
|
|
|
|
-// a port forward is counted as a failure. It may be important to differentiate
|
|
|
|
|
-// between failures due to Psiphon server conditions and failures due to the
|
|
|
|
|
-// origin/target server (in the latter case, the tunnel is healthy). Here are
|
|
|
|
|
-// some typical error messages to consider matching against (or ignoring):
|
|
|
|
|
-//
|
|
|
|
|
-// - "ssh: rejected: administratively prohibited (open failed)"
|
|
|
|
|
-// - "ssh: rejected: connect failed (Connection timed out)"
|
|
|
|
|
-// - "write tcp ... broken pipe"
|
|
|
|
|
-// - "read tcp ... connection reset by peer"
|
|
|
|
|
-// - "ssh: unexpected packet in response to channel open: <nil>"
|
|
|
|
|
-//
|
|
|
|
|
-func (controller *Controller) operateTunnel(tunnel *Tunnel) {
|
|
|
|
|
- defer controller.operateWaitGroup.Done()
|
|
|
|
|
-
|
|
|
|
|
- tunnelClosedSignal := make(chan struct{}, 1)
|
|
|
|
|
- err := tunnel.conn.SetClosedSignal(tunnelClosedSignal)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- err = fmt.Errorf("failed to set closed signal: %s", err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- Notice(NOTICE_INFO, "starting session for %s", tunnel.serverEntry.IpAddress)
|
|
|
|
|
- // TODO: NewSession server API calls may block shutdown
|
|
|
|
|
- session, err := NewSession(controller.config, tunnel)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- err = fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Tunnel may now be used for port forwarding
|
|
|
|
|
- tunnel.SetSessionStarted()
|
|
|
|
|
-
|
|
|
|
|
- // Promote this successful tunnel to first rank so it's one
|
|
|
|
|
- // of the first candidates next time establish runs.
|
|
|
|
|
- PromoteServerEntry(tunnel.serverEntry.IpAddress)
|
|
|
|
|
-
|
|
|
|
|
- statsTimer := time.NewTimer(NextSendPeriod())
|
|
|
|
|
-
|
|
|
|
|
- for err == nil {
|
|
|
|
|
- select {
|
|
|
|
|
- case failures := <-tunnel.portForwardFailures:
|
|
|
|
|
- tunnel.portForwardFailureTotal += failures
|
|
|
|
|
- Notice(
|
|
|
|
|
- NOTICE_INFO, "port forward failures for %s: %d",
|
|
|
|
|
- tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
|
|
|
|
|
- if tunnel.portForwardFailureTotal > controller.config.PortForwardFailureThreshold {
|
|
|
|
|
- err = errors.New("tunnel exceeded port forward failure threshold")
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- case <-tunnelClosedSignal:
|
|
|
|
|
- // TODO: this signal can be received during a commanded shutdown due to
|
|
|
|
|
- // how tunnels are closed; should rework this to avoid log noise.
|
|
|
|
|
- err = errors.New("tunnel closed unexpectedly")
|
|
|
|
|
-
|
|
|
|
|
- case <-controller.shutdownBroadcast:
|
|
|
|
|
- // Send final stats
|
|
|
|
|
- sendStats(tunnel, session, true)
|
|
|
|
|
- Notice(NOTICE_INFO, "shutdown operate tunnel")
|
|
|
|
|
- return
|
|
|
|
|
-
|
|
|
|
|
- case <-statsTimer.C:
|
|
|
|
|
- sendStats(tunnel, session, false)
|
|
|
|
|
- statsTimer.Reset(NextSendPeriod())
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- Notice(NOTICE_ALERT, "operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
|
|
|
|
|
- // Don't block. Assumes the receiver has a buffer large enough for
|
|
|
|
|
- // the typical number of operated tunnels. In case there's no room,
|
|
|
|
|
- // terminate the tunnel (runTunnels won't get a signal in this case).
|
|
|
|
|
- select {
|
|
|
|
|
- case controller.failedTunnels <- tunnel:
|
|
|
|
|
- default:
|
|
|
|
|
- controller.terminateTunnel(tunnel)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// sendStats is a helper for sending session stats to the server.
|
|
|
|
|
-func sendStats(tunnel *Tunnel, session *Session, final bool) {
|
|
|
|
|
- payload := GetForServer(tunnel.serverEntry.IpAddress)
|
|
|
|
|
- if payload != nil {
|
|
|
|
|
- err := session.DoStatusRequest(payload, final)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
|
|
|
|
|
- PutBack(tunnel.serverEntry.IpAddress, payload)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// TunneledConn implements net.Conn and wraps a port foward connection.
|
|
|
|
|
-// It is used to hook into Read and Write to observe I/O errors and
|
|
|
|
|
-// report these errors back to the tunnel monitor as port forward failures.
|
|
|
|
|
-type TunneledConn struct {
|
|
|
|
|
- net.Conn
|
|
|
|
|
- tunnel *Tunnel
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
|
|
|
|
|
- n, err = conn.Conn.Read(buffer)
|
|
|
|
|
- if err != nil && err != io.EOF {
|
|
|
|
|
- // Report 1 new failure. Won't block; assumes the receiver
|
|
|
|
|
- // has a sufficient buffer for the threshold number of reports.
|
|
|
|
|
- // TODO: conditional on type of error or error message?
|
|
|
|
|
- select {
|
|
|
|
|
- case conn.tunnel.portForwardFailures <- 1:
|
|
|
|
|
- default:
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
|
|
|
|
|
- n, err = conn.Conn.Write(buffer)
|
|
|
|
|
- if err != nil && err != io.EOF {
|
|
|
|
|
- // Same as TunneledConn.Read()
|
|
|
|
|
- select {
|
|
|
|
|
- case conn.tunnel.portForwardFailures <- 1:
|
|
|
|
|
- default:
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
// Dial selects an active tunnel and establishes a port forward
|
|
// Dial selects an active tunnel and establishes a port forward
|
|
|
// connection through the selected tunnel. Failure to connect is considered
|
|
// connection through the selected tunnel. Failure to connect is considered
|
|
|
// a port foward failure, for the purpose of monitoring tunnel health.
|
|
// a port foward failure, for the purpose of monitoring tunnel health.
|
|
@@ -486,23 +354,16 @@ func (controller *Controller) Dial(remoteAddr string) (conn net.Conn, err error)
|
|
|
if tunnel == nil {
|
|
if tunnel == nil {
|
|
|
return nil, ContextError(errors.New("no active tunnels"))
|
|
return nil, ContextError(errors.New("no active tunnels"))
|
|
|
}
|
|
}
|
|
|
- tunnelConn, err := tunnel.Dial(remoteAddr)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ tunneledConn, err := tunnel.Dial(remoteAddr)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- // TODO: conditional on type of error or error message?
|
|
|
|
|
- select {
|
|
|
|
|
- case tunnel.portForwardFailures <- 1:
|
|
|
|
|
- default:
|
|
|
|
|
- }
|
|
|
|
|
return nil, ContextError(err)
|
|
return nil, ContextError(err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- statsConn := NewStatsConn(tunnelConn, tunnel.ServerID(), tunnel.StatsRegexps())
|
|
|
|
|
|
|
+ statsConn := NewStatsConn(
|
|
|
|
|
+ tunneledConn, tunnel.session.StatsServerID(), tunnel.session.StatsRegexps())
|
|
|
|
|
|
|
|
- conn = &TunneledConn{
|
|
|
|
|
- Conn: statsConn,
|
|
|
|
|
- tunnel: tunnel}
|
|
|
|
|
-
|
|
|
|
|
- return
|
|
|
|
|
|
|
+ return statsConn, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// startEstablishing creates a pool of worker goroutines which will
|
|
// startEstablishing creates a pool of worker goroutines which will
|
|
@@ -517,6 +378,7 @@ func (controller *Controller) startEstablishing() {
|
|
|
controller.establishWaitGroup = new(sync.WaitGroup)
|
|
controller.establishWaitGroup = new(sync.WaitGroup)
|
|
|
controller.stopEstablishingBroadcast = make(chan struct{})
|
|
controller.stopEstablishingBroadcast = make(chan struct{})
|
|
|
controller.candidateServerEntries = make(chan *ServerEntry)
|
|
controller.candidateServerEntries = make(chan *ServerEntry)
|
|
|
|
|
+ controller.establishPendingConns.Reset()
|
|
|
|
|
|
|
|
for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
|
|
for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
|
|
|
controller.establishWaitGroup.Add(1)
|
|
controller.establishWaitGroup.Add(1)
|
|
@@ -535,10 +397,10 @@ func (controller *Controller) stopEstablishing() {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
Notice(NOTICE_INFO, "stop establishing")
|
|
Notice(NOTICE_INFO, "stop establishing")
|
|
|
|
|
+ close(controller.stopEstablishingBroadcast)
|
|
|
// Note: on Windows, interruptibleTCPClose doesn't really interrupt socket connects
|
|
// Note: on Windows, interruptibleTCPClose doesn't really interrupt socket connects
|
|
|
// and may leave goroutines running for a time after the Wait call.
|
|
// and may leave goroutines running for a time after the Wait call.
|
|
|
- controller.pendingConns.CloseAll()
|
|
|
|
|
- close(controller.stopEstablishingBroadcast)
|
|
|
|
|
|
|
+ controller.establishPendingConns.CloseAll()
|
|
|
// Note: establishCandidateGenerator closes controller.candidateServerEntries
|
|
// Note: establishCandidateGenerator closes controller.candidateServerEntries
|
|
|
// (as it may be sending to that channel).
|
|
// (as it may be sending to that channel).
|
|
|
controller.establishWaitGroup.Wait()
|
|
controller.establishWaitGroup.Wait()
|
|
@@ -559,7 +421,7 @@ func (controller *Controller) establishCandidateGenerator() {
|
|
|
controller.config.EgressRegion, controller.config.TunnelProtocol)
|
|
controller.config.EgressRegion, controller.config.TunnelProtocol)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
Notice(NOTICE_ALERT, "failed to iterate over candidates: %s", err)
|
|
Notice(NOTICE_ALERT, "failed to iterate over candidates: %s", err)
|
|
|
- controller.SignalFailure()
|
|
|
|
|
|
|
+ controller.SignalComponentFailure()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
defer iterator.Close()
|
|
defer iterator.Close()
|
|
@@ -570,7 +432,7 @@ loop:
|
|
|
serverEntry, err := iterator.Next()
|
|
serverEntry, err := iterator.Next()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
Notice(NOTICE_ALERT, "failed to get next candidate: %s", err)
|
|
Notice(NOTICE_ALERT, "failed to get next candidate: %s", err)
|
|
|
- controller.SignalFailure()
|
|
|
|
|
|
|
+ controller.SignalComponentFailure()
|
|
|
break loop
|
|
break loop
|
|
|
}
|
|
}
|
|
|
if serverEntry == nil {
|
|
if serverEntry == nil {
|
|
@@ -608,33 +470,52 @@ loop:
|
|
|
// a connection to the tunnel server, and delivers the established tunnel to a channel.
|
|
// a connection to the tunnel server, and delivers the established tunnel to a channel.
|
|
|
func (controller *Controller) establishTunnelWorker() {
|
|
func (controller *Controller) establishTunnelWorker() {
|
|
|
defer controller.establishWaitGroup.Done()
|
|
defer controller.establishWaitGroup.Done()
|
|
|
|
|
+loop:
|
|
|
for serverEntry := range controller.candidateServerEntries {
|
|
for serverEntry := range controller.candidateServerEntries {
|
|
|
- // Note: don't receive from candidateQueue and broadcastStopWorkers in the same
|
|
|
|
|
- // select, since we want to prioritize receiving the stop signal
|
|
|
|
|
- select {
|
|
|
|
|
- case <-controller.stopEstablishingBroadcast:
|
|
|
|
|
- return
|
|
|
|
|
- default:
|
|
|
|
|
|
|
+ // Note: don't receive from candidateServerEntries and stopEstablishingBroadcast
|
|
|
|
|
+ // in the same select, since we want to prioritize receiving the stop signal
|
|
|
|
|
+ if controller.isStopEstablishingBroadcast() {
|
|
|
|
|
+ break loop
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
// There may already be a tunnel to this candidate. If so, skip it.
|
|
// There may already be a tunnel to this candidate. If so, skip it.
|
|
|
if controller.isActiveTunnelServerEntry(serverEntry) {
|
|
if controller.isActiveTunnelServerEntry(serverEntry) {
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
tunnel, err := EstablishTunnel(
|
|
tunnel, err := EstablishTunnel(
|
|
|
- controller.config, controller.pendingConns, serverEntry)
|
|
|
|
|
|
|
+ controller.config,
|
|
|
|
|
+ controller.establishPendingConns,
|
|
|
|
|
+ serverEntry,
|
|
|
|
|
+ controller) // TunnelOwner
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- // TODO: distingush case where conn is interrupted?
|
|
|
|
|
- Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
|
|
|
|
|
- } else {
|
|
|
|
|
- // Don't block. Assumes the receiver has a buffer large enough for
|
|
|
|
|
- // the number of desired tunnels. If there's no room, the tunnel must
|
|
|
|
|
- // not be required so it's discarded.
|
|
|
|
|
- select {
|
|
|
|
|
- case controller.establishedTunnels <- tunnel:
|
|
|
|
|
- default:
|
|
|
|
|
- controller.discardTunnel(tunnel)
|
|
|
|
|
|
|
+ // Before emitting error, check if establish interrupted, in which
|
|
|
|
|
+ // case the error is noise.
|
|
|
|
|
+ if controller.isStopEstablishingBroadcast() {
|
|
|
|
|
+ break loop
|
|
|
}
|
|
}
|
|
|
|
|
+ Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Deliver established tunnel.
|
|
|
|
|
+ // Don't block. Assumes the receiver has a buffer large enough for
|
|
|
|
|
+ // the number of desired tunnels. If there's no room, the tunnel must
|
|
|
|
|
+ // not be required so it's discarded.
|
|
|
|
|
+ select {
|
|
|
|
|
+ case controller.establishedTunnels <- tunnel:
|
|
|
|
|
+ default:
|
|
|
|
|
+ controller.discardTunnel(tunnel)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
Notice(NOTICE_INFO, "stopped establish worker")
|
|
Notice(NOTICE_INFO, "stopped establish worker")
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+func (controller *Controller) isStopEstablishingBroadcast() bool {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-controller.stopEstablishingBroadcast:
|
|
|
|
|
+ return true
|
|
|
|
|
+ default:
|
|
|
|
|
+ }
|
|
|
|
|
+ return false
|
|
|
|
|
+}
|