|
|
@@ -71,8 +71,8 @@ type Tunnel struct {
|
|
|
sshClient *ssh.Client
|
|
|
operateWaitGroup *sync.WaitGroup
|
|
|
shutdownOperateBroadcast chan struct{}
|
|
|
- portForwardFailures chan int
|
|
|
- portForwardFailureTotal int
|
|
|
+ signalPortForwardFailure chan struct{}
|
|
|
+ totalPortForwardFailures int
|
|
|
sessionStartTime time.Time
|
|
|
}
|
|
|
|
|
|
@@ -122,9 +122,10 @@ func EstablishTunnel(
|
|
|
sshClient: sshClient,
|
|
|
operateWaitGroup: new(sync.WaitGroup),
|
|
|
shutdownOperateBroadcast: make(chan struct{}),
|
|
|
- // portForwardFailures buffer size is large enough to receive the thresold number
|
|
|
- // of failure reports without blocking. Senders can drop failures without blocking.
|
|
|
- portForwardFailures: make(chan int, config.PortForwardFailureThreshold)}
|
|
|
+ // A buffer allows at least one signal to be sent even when the receiver is
|
|
|
+ // not listening. Senders should not block.
|
|
|
+ signalPortForwardFailure: make(chan struct{}, 1),
|
|
|
+ }
|
|
|
|
|
|
// Create a new Psiphon API session for this tunnel. This includes performing
|
|
|
// a handshake request. If the handshake fails, this establishment fails.
|
|
|
@@ -214,7 +215,7 @@ func (tunnel *Tunnel) Dial(
|
|
|
if result.err != nil {
|
|
|
// TODO: conditional on type of error or error message?
|
|
|
select {
|
|
|
- case tunnel.portForwardFailures <- 1:
|
|
|
+ case tunnel.signalPortForwardFailure <- *new(struct{}):
|
|
|
default:
|
|
|
}
|
|
|
return nil, ContextError(result.err)
|
|
|
@@ -225,11 +226,14 @@ func (tunnel *Tunnel) Dial(
|
|
|
tunnel: tunnel,
|
|
|
downstreamConn: downstreamConn}
|
|
|
|
|
|
- // Tunnel does not have a session when DisableApi is set
|
|
|
+ // Tunnel does not have a session when DisableApi is set. We still use
|
|
|
+ // transferstats.Conn to count bytes transferred for monitoring tunnel
|
|
|
+ // quality.
|
|
|
+ var regexps *transferstats.Regexps
|
|
|
if tunnel.session != nil {
|
|
|
- conn = transferstats.NewConn(
|
|
|
- conn, tunnel.session.StatsServerID(), tunnel.session.StatsRegexps())
|
|
|
+ regexps = tunnel.session.StatsRegexps()
|
|
|
}
|
|
|
+ conn = transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
|
|
|
|
|
|
return conn, nil
|
|
|
}
|
|
|
@@ -255,11 +259,11 @@ type TunneledConn struct {
|
|
|
func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
|
|
|
n, err = conn.Conn.Read(buffer)
|
|
|
if err != nil && err != io.EOF {
|
|
|
- // Report 1 new failure. Won't block; assumes the receiver
|
|
|
+ // Report new failure. Won't block; assumes the receiver
|
|
|
// has a sufficient buffer for the threshold number of reports.
|
|
|
// TODO: conditional on type of error or error message?
|
|
|
select {
|
|
|
- case conn.tunnel.portForwardFailures <- 1:
|
|
|
+ case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
|
|
|
default:
|
|
|
}
|
|
|
}
|
|
|
@@ -271,7 +275,7 @@ func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
|
|
|
if err != nil && err != io.EOF {
|
|
|
// Same as TunneledConn.Read()
|
|
|
select {
|
|
|
- case conn.tunnel.portForwardFailures <- 1:
|
|
|
+ case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
|
|
|
default:
|
|
|
}
|
|
|
}
|
|
|
@@ -485,74 +489,82 @@ func dialSsh(
|
|
|
return conn, result.sshClient, nil
|
|
|
}
|
|
|
|
|
|
-// operateTunnel periodically sends status requests (traffic stats updates updates)
|
|
|
-// to the Psiphon API; and monitors the tunnel for failures:
|
|
|
+// operateTunnel monitors the health of the tunnel and performs
|
|
|
+// periodic work.
|
|
|
+//
|
|
|
+// BytesTransferred and TotalBytesTransferred notices are emitted
|
|
|
+// for live reporting and diagnostics reporting, respectively.
|
|
|
+//
|
|
|
+// Status requests are sent to the Psiphon API to report bytes
|
|
|
+// transferred.
|
|
|
//
|
|
|
-// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
|
|
|
-// channel on keep-alive failure and other transport I/O errors. In case
|
|
|
-// of such a failure, the tunnel is marked as failed.
|
|
|
+// Periodic SSH keep alive packets are sent to ensure the underlying
|
|
|
+// TCP connection isn't terminated by NAT, or other network
|
|
|
+// interference -- or test if it has been terminated while the device
|
|
|
+// has been asleep. When a keep alive times out, the tunnel is
|
|
|
+// considered failed.
|
|
|
//
|
|
|
-// 2. Tunnel port forward failures: the tunnel connection may stay up but
|
|
|
-// the client may still fail to establish port forwards due to server load
|
|
|
-// and other conditions. After a threshold number of such failures, the
|
|
|
-// overall tunnel is marked as failed.
|
|
|
+// An immediate SSH keep alive "probe" is sent to test the tunnel and
|
|
|
+// server responsiveness when a port forward failure is detected: a
|
|
|
+// failed dial or failed read/write. This keep alive has a shorter
|
|
|
+// timeout.
|
|
|
//
|
|
|
-// TODO: currently, any connect (dial), read, or write error associated with
|
|
|
-// a port forward is counted as a failure. It may be important to differentiate
|
|
|
-// between failures due to Psiphon server conditions and failures due to the
|
|
|
-// origin/target server (in the latter case, the tunnel is healthy). Here are
|
|
|
-// some typical error messages to consider matching against (or ignoring):
|
|
|
+// Note that port forward failures may be due to non-failure conditions.
|
|
|
+// For example, when the user inputs an invalid domain name and
|
|
|
+// resolution is done by the ssh server; or trying to connect to a
|
|
|
+// non-white-listed port; and the error message in these cases is not
|
|
|
+// distinguishable from a true server error (a common error message,
|
|
|
+// "ssh: rejected: administratively prohibited (open failed)", may be
|
|
|
+// returned for these cases but also if the server has run out of
|
|
|
+// ephemeral ports, for example).
|
|
|
//
|
|
|
-// - "ssh: rejected: administratively prohibited (open failed)"
|
|
|
-// (this error message is reported in both actual and false cases: when a server
|
|
|
-// is overloaded and has no free ephemeral ports; and when the user mistypes
|
|
|
-// a domain in a browser address bar and name resolution fails)
|
|
|
-// - "ssh: rejected: connect failed (Connection timed out)"
|
|
|
-// - "write tcp ... broken pipe"
|
|
|
-// - "read tcp ... connection reset by peer"
|
|
|
-// - "ssh: unexpected packet in response to channel open: <nil>"
|
|
|
+// SSH keep alives are not sent when the tunnel has been recently
|
|
|
+// active (not only does tunnel activity obviate the necessity of a keep
|
|
|
+// alive, testing has shown that keep alives may time out for "busy"
|
|
|
+// tunnels, especially over meek protocol and other high latency
|
|
|
+// conditions).
|
|
|
//
|
|
|
-// Update: the above is superceded by SSH keep alives with timeouts. When a keep
|
|
|
-// alive times out, the tunnel is marked as failed. Keep alives are triggered
|
|
|
-// periodically, and also immediately in the case of a port forward failure (so
|
|
|
-// as to immediately detect a situation such as a device waking up and trying
|
|
|
-// to use a dead tunnel). By default, port forward theshold counting does not
|
|
|
-// cause a tunnel to be marked as failed, with the conservative assumption that
|
|
|
-// a server which responds to an SSH keep alive is fully functional.
|
|
|
+// "Recently active" is defined as having received payload bytes. Sent
|
|
|
+// bytes are not considered as testing has shown bytes may appear to
|
|
|
+// send when certain NAT devices have interfered with the tunnel, while
|
|
|
+// no bytes are received. In a pathological case, with DNS implemented
|
|
|
+// as tunneled UDP, a browser may wait excessively for a domain name to
|
|
|
+// resolve, while no new port forward is attempted which would otherwise
|
|
|
+// result in a tunnel failure detection.
|
|
|
//
|
|
|
func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
|
|
|
defer tunnel.operateWaitGroup.Done()
|
|
|
|
|
|
+ lastBytesReceivedTime := time.Now()
|
|
|
+
|
|
|
+ lastTotalBytesTransferedTime := time.Now()
|
|
|
+ totalSent := int64(0)
|
|
|
+ totalReceived := int64(0)
|
|
|
+
|
|
|
+ noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
|
|
|
+ defer noticeBytesTransferredTicker.Stop()
|
|
|
+
|
|
|
// The next status request and ssh keep alive times are picked at random,
|
|
|
// from a range, to make the resulting traffic less fingerprintable,
|
|
|
- // especially when then tunnel is otherwise idle.
|
|
|
// Note: not using Tickers since these are not fixed time periods.
|
|
|
-
|
|
|
nextStatusRequestPeriod := func() time.Duration {
|
|
|
return MakeRandomPeriod(
|
|
|
PSIPHON_API_STATUS_REQUEST_PERIOD_MIN,
|
|
|
PSIPHON_API_STATUS_REQUEST_PERIOD_MAX)
|
|
|
}
|
|
|
+
|
|
|
+ statsTimer := time.NewTimer(nextStatusRequestPeriod())
|
|
|
+ defer statsTimer.Stop()
|
|
|
+
|
|
|
nextSshKeepAlivePeriod := func() time.Duration {
|
|
|
return MakeRandomPeriod(
|
|
|
TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
|
|
|
TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX)
|
|
|
}
|
|
|
|
|
|
- // TODO: don't initialize timer if !config.EmitBytesTransferred
|
|
|
- noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
|
|
|
- if !config.EmitBytesTransferred {
|
|
|
- noticeBytesTransferredTicker.Stop()
|
|
|
- } else {
|
|
|
- defer noticeBytesTransferredTicker.Stop()
|
|
|
- }
|
|
|
-
|
|
|
- statsTimer := time.NewTimer(nextStatusRequestPeriod())
|
|
|
- defer statsTimer.Stop()
|
|
|
-
|
|
|
- // TODO: don't initialize timer if !config.EnablePeriodicSshKeepAlive
|
|
|
+ // TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
|
|
|
sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
|
|
|
- if !config.EnablePeriodicSshKeepAlive {
|
|
|
+ if config.DisablePeriodicSshKeepAlive {
|
|
|
sshKeepAliveTimer.Stop()
|
|
|
} else {
|
|
|
defer sshKeepAliveTimer.Stop()
|
|
|
@@ -564,39 +576,48 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
|
|
|
case <-noticeBytesTransferredTicker.C:
|
|
|
sent, received := transferstats.GetBytesTransferredForServer(
|
|
|
tunnel.serverEntry.IpAddress)
|
|
|
- // Only emit notice when tunnel is not idle.
|
|
|
- if sent > 0 || received > 0 {
|
|
|
- NoticeBytesTransferred(sent, received)
|
|
|
+
|
|
|
+ if received > 0 {
|
|
|
+ lastBytesReceivedTime = time.Now()
|
|
|
+ }
|
|
|
+
|
|
|
+ totalSent += sent
|
|
|
+ totalReceived += received
|
|
|
+
|
|
|
+ if lastTotalBytesTransferedTime.Add(TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD).Before(time.Now()) {
|
|
|
+ NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
|
|
|
+ lastTotalBytesTransferedTime = time.Now()
|
|
|
+ }
|
|
|
+
|
|
|
+ // Only emit the frequent BytesTransferred notice when tunnel is not idle.
|
|
|
+ if config.EmitBytesTransferred && (sent > 0 || received > 0) {
|
|
|
+ NoticeBytesTransferred(tunnel.serverEntry.IpAddress, sent, received)
|
|
|
}
|
|
|
|
|
|
case <-statsTimer.C:
|
|
|
+ // TODO: perform this request asynchronously; don't block other operations
|
|
|
sendStats(tunnel)
|
|
|
statsTimer.Reset(nextStatusRequestPeriod())
|
|
|
|
|
|
case <-sshKeepAliveTimer.C:
|
|
|
- err = sendSshKeepAlive(
|
|
|
- tunnel.sshClient, tunnel.conn, TUNNEL_SSH_KEEP_ALIVE_PERIODIC_TIMEOUT)
|
|
|
+ if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(time.Now()) {
|
|
|
+ err = sendSshKeepAlive(
|
|
|
+ tunnel.sshClient, tunnel.conn, TUNNEL_SSH_KEEP_ALIVE_PERIODIC_TIMEOUT)
|
|
|
+ }
|
|
|
sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
|
|
|
|
|
|
- case failures := <-tunnel.portForwardFailures:
|
|
|
+ case <-tunnel.signalPortForwardFailure:
|
|
|
// Note: no mutex on portForwardFailureTotal; only referenced here
|
|
|
- tunnel.portForwardFailureTotal += failures
|
|
|
+ tunnel.totalPortForwardFailures++
|
|
|
NoticeInfo("port forward failures for %s: %d",
|
|
|
- tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
|
|
|
- if config.PortForwardFailureThreshold > 0 &&
|
|
|
- tunnel.portForwardFailureTotal > config.PortForwardFailureThreshold {
|
|
|
- err = errors.New("tunnel exceeded port forward failure threshold")
|
|
|
- } else {
|
|
|
- // Try an SSH keep alive to check the state of the SSH connection
|
|
|
- // Some port forward failures are due to intermittent conditions
|
|
|
- // on the server, so we don't abort the connection until the threshold
|
|
|
- // is hit. But if we can't make a simple round trip request to the
|
|
|
- // server, we'll immediately abort.
|
|
|
+ tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
|
|
|
+
|
|
|
+ if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(time.Now()) {
|
|
|
err = sendSshKeepAlive(
|
|
|
tunnel.sshClient, tunnel.conn, TUNNEL_SSH_KEEP_ALIVE_PROBE_TIMEOUT)
|
|
|
- if config.EnablePeriodicSshKeepAlive {
|
|
|
- sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
|
|
|
- }
|
|
|
+ }
|
|
|
+ if !config.DisablePeriodicSshKeepAlive {
|
|
|
+ sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
|
|
|
}
|
|
|
|
|
|
case <-tunnel.shutdownOperateBroadcast:
|
|
|
@@ -607,6 +628,8 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
|
|
|
+
|
|
|
if err != nil {
|
|
|
NoticeAlert("operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
|
|
|
tunnelOwner.SignalTunnelFailure(tunnel)
|