Просмотр исходного кода

Merge branch 'master' of github.com:Psiphon-Labs/psiphon-tunnel-core into console-client-docker

Michael Goldberger 10 лет назад
Родитель
Сommit
8d5bc53306

+ 1 - 0
AndroidLibrary/README.md

@@ -20,6 +20,7 @@ Follow Go Android documentation:
 * [gomobile documentation](https://godoc.org/golang.org/x/mobile/cmd/gomobile)
 * Requires Go 1.5 or later.
 * Build command: `gomobile bind -target=android github.com/Psiphon-Labs/psiphon-tunnel-core/AndroidLibrary/psi`
+  * Record build version info, as described [here](https://github.com/Psiphon-Labs/psiphon-tunnel-core/blob/master/README.md#setup), by passing a `-ldflags` argument to `gomobile bind`.
 * Output: `psi.aar`
 
 Using

+ 3 - 3
README.md

@@ -31,9 +31,9 @@ Setup
     BUILDREPO=$(git config --get remote.origin.url)
     BUILDREV=$(git rev-parse HEAD)
     LDFLAGS="\
-    -X github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon.buildDate $BUILDDATE \
-    -X github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon.buildRepo $BUILDREPO \
-    -X github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon.buildRev $BUILDREV \
+    -X github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon.buildDate=$BUILDDATE \
+    -X github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon.buildRepo=$BUILDREPO \
+    -X github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon.buildRev=$BUILDREV \
     "
     ```
 

+ 103 - 18
SampleApps/Psibot/app/src/main/java/ca/psiphon/PsiphonTunnel.java

@@ -27,25 +27,36 @@ import android.net.NetworkInfo;
 import android.net.VpnService;
 import android.os.Build;
 import android.os.ParcelFileDescriptor;
+import android.util.Base64;
 
 import org.apache.http.conn.util.InetAddressUtils;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import go.psi.Psi;
 
@@ -72,6 +83,7 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
         public void onSplitTunnelRegion(String region);
         public void onUntunneledAddress(String address);
         public void onBytesTransferred(long sent, long received);
+        public void onStartedWaitingForNetworkConnectivity();
     }
 
     private final HostService mHostService;
@@ -80,6 +92,7 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
     private int mLocalSocksProxyPort;
     private boolean mRoutingThroughTunnel;
     private Thread mTun2SocksThread;
+    private AtomicBoolean mIsWaitingForNetworkConnectivity;
 
     // Only one PsiphonVpn instance may exist at a time, as the underlying
     // go.psi.Psi and tun2socks implementations each contain global state.
@@ -99,6 +112,7 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
         mHostService = hostService;
         mLocalSocksProxyPort = 0;
         mRoutingThroughTunnel = false;
+        mIsWaitingForNetworkConnectivity = new AtomicBoolean(false);
     }
 
     public Object clone() throws CloneNotSupportedException {
@@ -243,8 +257,16 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
 
     @Override
     public long HasNetworkConnectivity() {
+        boolean hasConnectivity = hasNetworkConnectivity(mHostService.getContext());
+        boolean wasWaitingForNetworkConnectivity = mIsWaitingForNetworkConnectivity.getAndSet(!hasConnectivity);
+        if (!hasConnectivity && !wasWaitingForNetworkConnectivity) {
+            // HasNetworkConnectivity may be called many times, but only call
+            // onStartedWaitingForNetworkConnectivity once per loss of connectivity,
+            // so the HostService may log a single message.
+            mHostService.onStartedWaitingForNetworkConnectivity();
+        }
         // TODO: change to bool return value once gobind supports that type
-        return hasNetworkConnectivity(mHostService.getContext()) ? 1 : 0;
+        return hasConnectivity ? 1 : 0;
     }
 
     @Override
@@ -307,16 +329,6 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
         // This parameter is for stats reporting
         json.put("TunnelWholeDevice", isVpnMode ? 1 : 0);
 
-        // Enable tunnel auto-reconnect after a threshold number of port
-        // forward failures. By default, this mechanism is disabled in
-        // tunnel-core due to the chance of false positives due to
-        // bad user input. Since VpnService mode resolves domain names
-        // differently (udpgw), invalid domain name user input won't result
-        // in SSH port forward failures.
-        if (isVpnMode) {
-            json.put("PortForwardFailureThreshold", 10);
-        }
-
         json.put("EmitBytesTransferred", true);
 
         if (mLocalSocksProxyPort != 0) {
@@ -326,15 +338,18 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
             // has no effect with restartPsiphon(), a full stop() is necessary.
             json.put("LocalSocksProxyPort", mLocalSocksProxyPort);
         }
-        
+
         json.put("UseIndistinguishableTLS", true);
 
-        // TODO: doesn't work due to OpenSSL version incompatibility; try using
-        // the KeyStore API to build a local copy of trusted CAs cert files.
-        //
-        //if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.ICE_CREAM_SANDWICH) {
-        //    json.put("SystemCACertificateDirectory", "/system/etc/security/cacerts");
-        //}
+        try {
+            // Also enable indistinguishable TLS for HTTPS requests that
+            // require system CAs.
+            json.put(
+                "TrustedCACertificatesFilename",
+                setupTrustedCertificates(mHostService.getContext()));
+        } catch (Exception e) {
+            mHostService.onDiagnosticMessage(e.getMessage());
+        }
 
         return json.toString();
     }
@@ -399,6 +414,7 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
                 mHostService.onUntunneledAddress(notice.getJSONObject("data").getString("address"));
 
             } else if (noticeType.equals("BytesTransferred")) {
+                diagnostic = false;
                 JSONObject data = notice.getJSONObject("data");
                 mHostService.onBytesTransferred(data.getLong("sent"), data.getLong("received"));
             }
@@ -413,6 +429,75 @@ public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
         }
     }
 
+    private String setupTrustedCertificates(Context context) throws Exception {
+
+        // Copy the Android system CA store to a local, private cert bundle file.
+        //
+        // This results in a file that can be passed to SSL_CTX_load_verify_locations
+        // for use with OpenSSL modes in tunnel-core.
+        // https://www.openssl.org/docs/manmaster/ssl/SSL_CTX_load_verify_locations.html
+        //
+        // TODO: to use the path mode of load_verify_locations would require emulating
+        // the filename scheme used by c_rehash:
+        // https://www.openssl.org/docs/manmaster/apps/c_rehash.html
+        // http://stackoverflow.com/questions/19237167/the-new-subject-hash-openssl-algorithm-differs
+
+        File directory = context.getDir("PsiphonCAStore", Context.MODE_PRIVATE);
+
+        final String errorMessage = "copy AndroidCAStore failed";
+        try {
+
+            File file = new File(directory, "certs.dat");
+
+            // Pave a fresh copy on every run, which ensures we're not using old certs.
+            // Note: assumes KeyStore doesn't return revoked certs.
+            //
+            // TODO: this takes under 1 second, but should we avoid repaving every time?
+            file.delete();
+
+            PrintStream output = null;
+            try {
+                output = new PrintStream(new FileOutputStream(file));
+
+                KeyStore keyStore = KeyStore.getInstance("AndroidCAStore");
+                keyStore.load(null, null);
+
+                Enumeration<String> aliases = keyStore.aliases();
+                while (aliases.hasMoreElements()) {
+                    String alias = aliases.nextElement();
+                    X509Certificate cert = (X509Certificate) keyStore.getCertificate(alias);
+
+                    output.println("-----BEGIN CERTIFICATE-----");
+                    String pemCert = new String(Base64.encode(cert.getEncoded(), Base64.NO_WRAP), "UTF-8");
+                    // OpenSSL appears to reject the default linebreaking done by Base64.encode,
+                    // so we manually linebreak every 64 characters
+                    for (int i = 0; i < pemCert.length() ; i+= 64) {
+                        output.println(pemCert.substring(i, Math.min(i + 64, pemCert.length())));
+                    }
+                    output.println("-----END CERTIFICATE-----");
+                }
+
+                mHostService.onDiagnosticMessage("prepared PsiphonCAStore");
+
+                return file.getAbsolutePath();
+
+            } finally {
+                if (output != null) {
+                    output.close();
+                }
+            }
+
+        } catch (KeyStoreException e) {
+            throw new Exception(errorMessage, e);
+        } catch (NoSuchAlgorithmException e) {
+            throw new Exception(errorMessage, e);
+        } catch (CertificateException e) {
+            throw new Exception(errorMessage, e);
+        } catch (IOException e) {
+            throw new Exception(errorMessage, e);
+        }
+    }
+
     //----------------------------------------------------------------------------------------------
     // Tun2Socks
     //----------------------------------------------------------------------------------------------

+ 5 - 0
SampleApps/Psibot/app/src/main/java/ca/psiphon/psibot/Service.java

@@ -228,6 +228,11 @@ public class Service extends VpnService
     public void onBytesTransferred(long sent, long received) {
     }
 
+    @Override
+    public void onStartedWaitingForNetworkConnectivity() {
+        Log.addEntry("waiting for network connectivity...");
+    }
+
     @Override
     public void onClientRegion(String region) {
         Log.addEntry("client region: " + region);

+ 39 - 43
psiphon/config.go

@@ -29,37 +29,40 @@ import (
 // TODO: allow all params to be configured
 
 const (
-	DATA_STORE_FILENAME                          = "psiphon.db"
-	CONNECTION_WORKER_POOL_SIZE                  = 10
-	TUNNEL_POOL_SIZE                             = 1
-	TUNNEL_CONNECT_TIMEOUT                       = 15 * time.Second
-	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT              = 500 * time.Millisecond
-	TUNNEL_PORT_FORWARD_DIAL_TIMEOUT             = 10 * time.Second
-	TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES      = 256
-	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN             = 60 * time.Second
-	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX             = 120 * time.Second
-	TUNNEL_SSH_KEEP_ALIVE_TIMEOUT                = 10 * time.Second
-	ESTABLISH_TUNNEL_TIMEOUT_SECONDS             = 300
-	ESTABLISH_TUNNEL_WORK_TIME_SECONDS           = 60 * time.Second
-	ESTABLISH_TUNNEL_PAUSE_PERIOD                = 5 * time.Second
-	PORT_FORWARD_FAILURE_THRESHOLD               = 0
-	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT             = 15 * time.Second
-	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST     = 50
-	FETCH_REMOTE_SERVER_LIST_TIMEOUT             = 10 * time.Second
-	FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD        = 5 * time.Second
-	FETCH_REMOTE_SERVER_LIST_STALE_PERIOD        = 6 * time.Hour
-	PSIPHON_API_CLIENT_SESSION_ID_LENGTH         = 16
-	PSIPHON_API_SERVER_TIMEOUT                   = 20 * time.Second
-	PSIPHON_API_STATUS_REQUEST_PERIOD_MIN        = 5 * time.Minute
-	PSIPHON_API_STATUS_REQUEST_PERIOD_MAX        = 10 * time.Minute
-	PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES = 256
-	PSIPHON_API_CONNECTED_REQUEST_PERIOD         = 24 * time.Hour
-	PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD   = 5 * time.Second
-	FETCH_ROUTES_TIMEOUT                         = 1 * time.Minute
-	DOWNLOAD_UPGRADE_TIMEOUT                     = 15 * time.Minute
-	DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD          = 5 * time.Second
-	IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION    = 2 * time.Minute
-	IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD   = 3
+	DATA_STORE_FILENAME                            = "psiphon.db"
+	CONNECTION_WORKER_POOL_SIZE                    = 10
+	TUNNEL_POOL_SIZE                               = 1
+	TUNNEL_CONNECT_TIMEOUT                         = 20 * time.Second
+	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT                = 500 * time.Millisecond
+	TUNNEL_PORT_FORWARD_DIAL_TIMEOUT               = 10 * time.Second
+	TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES        = 256
+	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN               = 60 * time.Second
+	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX               = 120 * time.Second
+	TUNNEL_SSH_KEEP_ALIVE_PERIODIC_TIMEOUT         = 30 * time.Second
+	TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD = 10 * time.Second
+	TUNNEL_SSH_KEEP_ALIVE_PROBE_TIMEOUT            = 5 * time.Second
+	TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD    = 5 * time.Second
+	ESTABLISH_TUNNEL_TIMEOUT_SECONDS               = 300
+	ESTABLISH_TUNNEL_WORK_TIME                     = 60 * time.Second
+	ESTABLISH_TUNNEL_PAUSE_PERIOD                  = 5 * time.Second
+	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT               = 15 * time.Second
+	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST       = 50
+	FETCH_REMOTE_SERVER_LIST_TIMEOUT               = 30 * time.Second
+	FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD          = 5 * time.Second
+	FETCH_REMOTE_SERVER_LIST_STALE_PERIOD          = 6 * time.Hour
+	PSIPHON_API_CLIENT_SESSION_ID_LENGTH           = 16
+	PSIPHON_API_SERVER_TIMEOUT                     = 20 * time.Second
+	PSIPHON_API_STATUS_REQUEST_PERIOD_MIN          = 5 * time.Minute
+	PSIPHON_API_STATUS_REQUEST_PERIOD_MAX          = 10 * time.Minute
+	PSIPHON_API_STATUS_REQUEST_PADDING_MAX_BYTES   = 256
+	PSIPHON_API_CONNECTED_REQUEST_PERIOD           = 24 * time.Hour
+	PSIPHON_API_CONNECTED_REQUEST_RETRY_PERIOD     = 5 * time.Second
+	FETCH_ROUTES_TIMEOUT                           = 1 * time.Minute
+	DOWNLOAD_UPGRADE_TIMEOUT                       = 15 * time.Minute
+	DOWNLOAD_UPGRADE_RETRY_PAUSE_PERIOD            = 5 * time.Second
+	IMPAIRED_PROTOCOL_CLASSIFICATION_DURATION      = 2 * time.Minute
+	IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD     = 3
+	TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD          = 5 * time.Minute
 )
 
 // To distinguish omitted timeout params from explicit 0 value timeout
@@ -161,14 +164,6 @@ type Config struct {
 	// which is recommended.
 	TunnelPoolSize int
 
-	// PortForwardFailureThreshold specifies a threshold number of port forward
-	// failures (failure to connect, or I/O failure) after which the tunnel is
-	// considered to be degraded and a re-establish is launched. This facility
-	// can suffer from false positives, especially when the host client is running
-	// in configuration where domain name resolution is done as part of the port
-	// forward (as opposed to tunneling UDP, for example). The default is 0, off.
-	PortForwardFailureThreshold int
-
 	// UpstreamProxyUrl is a URL specifying an upstream proxy to use for all
 	// outbound connections. The URL should include proxy type and authentication
 	// information, as required. See example URLs here:
@@ -249,6 +244,11 @@ type Config struct {
 	// When specified, this enables use of indistinguishable TLS for HTTPS requests
 	// that require typical (system CA) server authentication.
 	TrustedCACertificatesFilename string
+
+	// DisablePeriodicSshKeepAlive indicates whether to send an SSH keepalive every
+	// 1-2 minutes, when the tunnel is idle. If the SSH keepalive times out, the tunnel
+	// is considered to have failed.
+	DisablePeriodicSshKeepAlive bool
 }
 
 // LoadConfig parses and validates a JSON format Psiphon config JSON
@@ -301,10 +301,6 @@ func LoadConfig(configJson []byte) (*Config, error) {
 		config.TunnelPoolSize = TUNNEL_POOL_SIZE
 	}
 
-	if config.PortForwardFailureThreshold == 0 {
-		config.PortForwardFailureThreshold = PORT_FORWARD_FAILURE_THRESHOLD
-	}
-
 	if config.NetworkConnectivityChecker != nil {
 		return nil, ContextError(errors.New("NetworkConnectivityChecker interface must be set at runtime"))
 	}

+ 60 - 40
psiphon/controller.go

@@ -58,6 +58,7 @@ type Controller struct {
 	splitTunnelClassifier          *SplitTunnelClassifier
 	signalFetchRemoteServerList    chan struct{}
 	impairedProtocolClassification map[string]int
+	signalReportConnected          chan struct{}
 }
 
 // NewController initializes a new controller.
@@ -97,20 +98,22 @@ func NewController(config *Config) (controller *Controller, err error) {
 		runWaitGroup:           new(sync.WaitGroup),
 		// establishedTunnels and failedTunnels buffer sizes are large enough to
 		// receive full pools of tunnels without blocking. Senders should not block.
-		establishedTunnels:       make(chan *Tunnel, config.TunnelPoolSize),
-		failedTunnels:            make(chan *Tunnel, config.TunnelPoolSize),
-		tunnels:                  make([]*Tunnel, 0),
-		establishedOnce:          false,
-		startedConnectedReporter: false,
-		startedUpgradeDownloader: false,
-		isEstablishing:           false,
-		establishPendingConns:    new(Conns),
-		untunneledPendingConns:   untunneledPendingConns,
-		untunneledDialConfig:     untunneledDialConfig,
-		// A buffer allows at least one signal to be sent even when the receiver is
-		// not listening. Senders should not block.
-		signalFetchRemoteServerList:    make(chan struct{}, 1),
+		establishedTunnels:             make(chan *Tunnel, config.TunnelPoolSize),
+		failedTunnels:                  make(chan *Tunnel, config.TunnelPoolSize),
+		tunnels:                        make([]*Tunnel, 0),
+		establishedOnce:                false,
+		startedConnectedReporter:       false,
+		startedUpgradeDownloader:       false,
+		isEstablishing:                 false,
+		establishPendingConns:          new(Conns),
+		untunneledPendingConns:         untunneledPendingConns,
+		untunneledDialConfig:           untunneledDialConfig,
 		impairedProtocolClassification: make(map[string]int),
+		// TODO: Add a buffer of 1 so we don't miss a signal while receiver is
+		// starting? Trade-off is potential back-to-back fetch remotes. As-is,
+		// establish will eventually signal another fetch remote.
+		signalFetchRemoteServerList: make(chan struct{}),
+		signalReportConnected:       make(chan struct{}),
 	}
 
 	controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
@@ -274,7 +277,9 @@ func (controller *Controller) establishTunnelWatcher() {
 // comment in DoConnectedRequest for a description of the request mechanism.
 // To ensure we don't over- or under-count unique users, only one connected
 // request is made across all simultaneous multi-tunnels; and the connected
-// request is repeated periodically.
+// request is repeated periodically for very long-lived tunnels.
+// The signalReportConnected mechanism is used to trigger another connected
+// request immediately after a reconnect.
 func (controller *Controller) connectedReporter() {
 	defer controller.runWaitGroup.Done()
 loop:
@@ -302,8 +307,10 @@ loop:
 		}
 		timeout := time.After(duration)
 		select {
+		case <-controller.signalReportConnected:
 		case <-timeout:
 			// Make another connected request
+
 		case <-controller.shutdownBroadcast:
 			break loop
 		}
@@ -312,7 +319,7 @@ loop:
 	NoticeInfo("exiting connected reporter")
 }
 
-func (controller *Controller) startConnectedReporter() {
+func (controller *Controller) startOrSignalConnectedReporter() {
 	// session is nil when DisableApi is set
 	if controller.config.DisableApi {
 		return
@@ -324,6 +331,11 @@ func (controller *Controller) startConnectedReporter() {
 		controller.startedConnectedReporter = true
 		controller.runWaitGroup.Add(1)
 		go controller.connectedReporter()
+	} else {
+		select {
+		case controller.signalReportConnected <- *new(struct{}):
+		default:
+		}
 	}
 }
 
@@ -439,16 +451,39 @@ loop:
 		// !TODO! design issue: might not be enough server entries with region/caps to ever fill tunnel slots
 		// solution(?) target MIN(CountServerEntries(region, protocol), TunnelPoolSize)
 		case establishedTunnel := <-controller.establishedTunnels:
-			if controller.registerTunnel(establishedTunnel) {
-				NoticeActiveTunnel(establishedTunnel.serverEntry.IpAddress)
+			tunnelCount, registered := controller.registerTunnel(establishedTunnel)
+			if registered {
+				NoticeActiveTunnel(establishedTunnel.serverEntry.IpAddress, establishedTunnel.protocol)
+
+				if tunnelCount == 1 {
+
+					// The split tunnel classifier is started once the first tunnel is
+					// established. This first tunnel is passed in to be used to make
+					// the routes data request.
+					// A long-running controller may run while the host device is present
+					// in different regions. In this case, we want the split tunnel logic
+					// to switch to routes for new regions and not classify traffic based
+					// on routes installed for older regions.
+					// We assume that when regions change, the host network will also
+					// change, and so all tunnels will fail and be re-established. Under
+					// that assumption, the classifier will be re-Start()-ed here when
+					// the region has changed.
+					controller.splitTunnelClassifier.Start(establishedTunnel)
+
+					// Signal a connected request on each 1st tunnel establishment. For
+					// multi-tunnels, the session is connected as long as at least one
+					// tunnel is established.
+					controller.startOrSignalConnectedReporter()
+
+					controller.startClientUpgradeDownloader(establishedTunnel.session)
+				}
+
 			} else {
 				controller.discardTunnel(establishedTunnel)
 			}
 			if controller.isFullyEstablished() {
 				controller.stopEstablishing()
 			}
-			controller.startConnectedReporter()
-			controller.startClientUpgradeDownloader(establishedTunnel.session)
 
 		case <-controller.shutdownBroadcast:
 			break loop
@@ -546,40 +581,25 @@ func (controller *Controller) discardTunnel(tunnel *Tunnel) {
 // registerTunnel adds the connected tunnel to the pool of active tunnels
 // which are candidates for port forwarding. Returns true if the pool has an
 // empty slot and false if the pool is full (caller should discard the tunnel).
-func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
+func (controller *Controller) registerTunnel(tunnel *Tunnel) (int, bool) {
 	controller.tunnelMutex.Lock()
 	defer controller.tunnelMutex.Unlock()
 	if len(controller.tunnels) >= controller.config.TunnelPoolSize {
-		return false
+		return len(controller.tunnels), false
 	}
 	// Perform a final check just in case we've established
 	// a duplicate connection.
 	for _, activeTunnel := range controller.tunnels {
 		if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress {
 			NoticeAlert("duplicate tunnel: %s", tunnel.serverEntry.IpAddress)
-			return false
+			return len(controller.tunnels), false
 		}
 	}
 	controller.establishedOnce = true
 	controller.tunnels = append(controller.tunnels, tunnel)
 	NoticeTunnels(len(controller.tunnels))
 
-	// The split tunnel classifier is started once the first tunnel is
-	// established. This first tunnel is passed in to be used to make
-	// the routes data request.
-	// A long-running controller may run while the host device is present
-	// in different regions. In this case, we want the split tunnel logic
-	// to switch to routes for new regions and not classify traffic based
-	// on routes installed for older regions.
-	// We assume that when regions change, the host network will also
-	// change, and so all tunnels will fail and be re-established. Under
-	// that assumption, the classifier will be re-Start()-ed here when
-	// the region has changed.
-	if len(controller.tunnels) == 1 {
-		controller.splitTunnelClassifier.Start(tunnel)
-	}
-
-	return true
+	return len(controller.tunnels), true
 }
 
 // hasEstablishedOnce indicates if at least one active tunnel has
@@ -798,7 +818,7 @@ loop:
 			}
 
 			// Disable impaired protocols. This is only done for the
-			// first iteration of the ESTABLISH_TUNNEL_WORK_TIME_SECONDS
+			// first iteration of the ESTABLISH_TUNNEL_WORK_TIME
 			// loop since (a) one iteration should be sufficient to
 			// evade the attack; (b) there's a good chance of false
 			// positives (such as short session durations due to network
@@ -828,7 +848,7 @@ loop:
 				break loop
 			}
 
-			if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME_SECONDS)) {
+			if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME)) {
 				// Start over, after a brief pause, with a new shuffle of the server
 				// entries, and potentially some newly fetched server entries.
 				break

+ 13 - 5
psiphon/notice.go

@@ -124,8 +124,8 @@ func NoticeConnectingServer(ipAddress, region, protocol, frontingAddress string)
 }
 
 // NoticeActiveTunnel is a successful connection that is used as an active tunnel for port forwarding
-func NoticeActiveTunnel(ipAddress string) {
-	outputNotice("ActiveTunnel", false, "ipAddress", ipAddress)
+func NoticeActiveTunnel(ipAddress, protocol string) {
+	outputNotice("ActiveTunnel", false, "ipAddress", ipAddress, "protocol", protocol)
 }
 
 // NoticeSocksProxyPortInUse is a failure to use the configured LocalSocksProxyPort
@@ -201,9 +201,17 @@ func NoticeClientUpgradeDownloaded(filename string) {
 }
 
 // NoticeBytesTransferred reports how many tunneled bytes have been
-// transferred since the last NoticeBytesTransferred.
-func NoticeBytesTransferred(sent, received int64) {
-	outputNotice("BytesTransferred", false, "sent", sent, "received", received)
+// transferred since the last NoticeBytesTransferred, for the tunnel
+// to the server at ipAddress.
+func NoticeBytesTransferred(ipAddress string, sent, received int64) {
+	outputNotice("BytesTransferred", false, "ipAddress", ipAddress, "sent", sent, "received", received)
+}
+
+// NoticeTotalBytesTransferred reports how many tunneled bytes have been
+// transferred in total up to this point, for the tunnel to the server
+// at ipAddress.
+func NoticeTotalBytesTransferred(ipAddress string, sent, received int64) {
+	outputNotice("TotalBytesTransferred", false, "ipAddress", ipAddress, "sent", sent, "received", received)
 }
 
 // NoticeLocalProxyError reports a local proxy error message. Repetitive

+ 0 - 8
psiphon/serverApi.go

@@ -44,7 +44,6 @@ type Session struct {
 	baseRequestUrl       string
 	psiphonHttpsClient   *http.Client
 	statsRegexps         *transferstats.Regexps
-	statsServerId        string
 	clientRegion         string
 	clientUpgradeVersion string
 }
@@ -77,7 +76,6 @@ func NewSession(config *Config, tunnel *Tunnel, sessionId string) (session *Sess
 		sessionId:          sessionId,
 		baseRequestUrl:     makeBaseRequestUrl(config, tunnel, sessionId),
 		psiphonHttpsClient: psiphonHttpsClient,
-		statsServerId:      tunnel.serverEntry.IpAddress,
 	}
 
 	err = session.doHandshakeRequest()
@@ -127,12 +125,6 @@ func (session *Session) DoConnectedRequest() error {
 	return nil
 }
 
-// ServerID provides a unique identifier for the server the session connects to.
-// This ID is consistent between multiple sessions/tunnels connected to that server.
-func (session *Session) StatsServerID() string {
-	return session.statsServerId
-}
-
 // StatsRegexps gets the Regexps used for the statistics for this tunnel.
 func (session *Session) StatsRegexps() *transferstats.Regexps {
 	return session.statsRegexps

+ 6 - 4
psiphon/transferstats/regexp.go

@@ -71,10 +71,12 @@ func MakeRegexps(pageViewRegexes, httpsRequestRegexes []map[string]string) (rege
 // string that should be used for stats.
 func regexHostname(hostname string, regexps *Regexps) (statsHostname string) {
 	statsHostname = "(OTHER)"
-	for _, rr := range *regexps {
-		if rr.regexp.MatchString(hostname) {
-			statsHostname = rr.regexp.ReplaceAllString(hostname, rr.replace)
-			break
+	if regexps != nil {
+		for _, rr := range *regexps {
+			if rr.regexp.MatchString(hostname) {
+				statsHostname = rr.regexp.ReplaceAllString(hostname, rr.replace)
+				break
+			}
 		}
 	}
 	return

+ 149 - 72
psiphon/tunnel.go

@@ -71,8 +71,8 @@ type Tunnel struct {
 	sshClient                *ssh.Client
 	operateWaitGroup         *sync.WaitGroup
 	shutdownOperateBroadcast chan struct{}
-	portForwardFailures      chan int
-	portForwardFailureTotal  int
+	signalPortForwardFailure chan struct{}
+	totalPortForwardFailures int
 	sessionStartTime         time.Time
 }
 
@@ -122,9 +122,10 @@ func EstablishTunnel(
 		sshClient:                sshClient,
 		operateWaitGroup:         new(sync.WaitGroup),
 		shutdownOperateBroadcast: make(chan struct{}),
-		// portForwardFailures buffer size is large enough to receive the thresold number
-		// of failure reports without blocking. Senders can drop failures without blocking.
-		portForwardFailures: make(chan int, config.PortForwardFailureThreshold)}
+		// A buffer allows at least one signal to be sent even when the receiver is
+		// not listening. Senders should not block.
+		signalPortForwardFailure: make(chan struct{}, 1),
+	}
 
 	// Create a new Psiphon API session for this tunnel. This includes performing
 	// a handshake request. If the handshake fails, this establishment fails.
@@ -214,7 +215,7 @@ func (tunnel *Tunnel) Dial(
 	if result.err != nil {
 		// TODO: conditional on type of error or error message?
 		select {
-		case tunnel.portForwardFailures <- 1:
+		case tunnel.signalPortForwardFailure <- *new(struct{}):
 		default:
 		}
 		return nil, ContextError(result.err)
@@ -225,11 +226,14 @@ func (tunnel *Tunnel) Dial(
 		tunnel:         tunnel,
 		downstreamConn: downstreamConn}
 
-	// Tunnel does not have a session when DisableApi is set
+	// Tunnel does not have a session when DisableApi is set. We still use
+	// transferstats.Conn to count bytes transferred for monitoring tunnel
+	// quality.
+	var regexps *transferstats.Regexps
 	if tunnel.session != nil {
-		conn = transferstats.NewConn(
-			conn, tunnel.session.StatsServerID(), tunnel.session.StatsRegexps())
+		regexps = tunnel.session.StatsRegexps()
 	}
+	conn = transferstats.NewConn(conn, tunnel.serverEntry.IpAddress, regexps)
 
 	return conn, nil
 }
@@ -255,11 +259,11 @@ type TunneledConn struct {
 func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
 	n, err = conn.Conn.Read(buffer)
 	if err != nil && err != io.EOF {
-		// Report 1 new failure. Won't block; assumes the receiver
+		// Report new failure. Won't block; assumes the receiver
 		// has a sufficient buffer for the threshold number of reports.
 		// TODO: conditional on type of error or error message?
 		select {
-		case conn.tunnel.portForwardFailures <- 1:
+		case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
 		default:
 		}
 	}
@@ -271,7 +275,7 @@ func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
 	if err != nil && err != io.EOF {
 		// Same as TunneledConn.Read()
 		select {
-		case conn.tunnel.portForwardFailures <- 1:
+		case conn.tunnel.signalPortForwardFailure <- *new(struct{}):
 		default:
 		}
 	}
@@ -485,73 +489,124 @@ func dialSsh(
 	return conn, result.sshClient, nil
 }
 
-// operateTunnel periodically sends status requests (traffic stats updates updates)
-// to the Psiphon API; and monitors the tunnel for failures:
+// operateTunnel monitors the health of the tunnel and performs
+// periodic work.
+//
+// BytesTransferred and TotalBytesTransferred notices are emitted
+// for live reporting and diagnostics reporting, respectively.
 //
-// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
-// channel on keep-alive failure and other transport I/O errors. In case
-// of such a failure, the tunnel is marked as failed.
+// Status requests are sent to the Psiphon API to report bytes
+// transferred.
 //
-// 2. Tunnel port forward failures: the tunnel connection may stay up but
-// the client may still fail to establish port forwards due to server load
-// and other conditions. After a threshold number of such failures, the
-// overall tunnel is marked as failed.
+// Periodic SSH keep alive packets are sent to ensure the underlying
+// TCP connection isn't terminated by NAT, or other network
+// interference -- or test if it has been terminated while the device
+// has been asleep. When a keep alive times out, the tunnel is
+// considered failed.
 //
-// TODO: currently, any connect (dial), read, or write error associated with
-// a port forward is counted as a failure. It may be important to differentiate
-// between failures due to Psiphon server conditions and failures due to the
-// origin/target server (in the latter case, the tunnel is healthy). Here are
-// some typical error messages to consider matching against (or ignoring):
+// An immediate SSH keep alive "probe" is sent to test the tunnel and
+// server responsiveness when a port forward failure is detected: a
+// failed dial or failed read/write. This keep alive has a shorter
+// timeout.
 //
-// - "ssh: rejected: administratively prohibited (open failed)"
-//   (this error message is reported in both actual and false cases: when a server
-//    is overloaded and has no free ephemeral ports; and when the user mistypes
-//    a domain in a browser address bar and name resolution fails)
-// - "ssh: rejected: connect failed (Connection timed out)"
-// - "write tcp ... broken pipe"
-// - "read tcp ... connection reset by peer"
-// - "ssh: unexpected packet in response to channel open: <nil>"
+// Note that port foward failures may be due to non-failure conditions.
+// For example, when the user inputs an invalid domain name and
+// resolution is done by the ssh server; or trying to connect to a
+// non-white-listed port; and the error message in these cases is not
+// distinguishable from a a true server error (a common error message,
+// "ssh: rejected: administratively prohibited (open failed)", may be
+// returned for these cases but also if the server has run out of
+// ephemeral ports, for example).
 //
-// Update: the above is superceded by SSH keep alives with timeouts. When a keep
-// alive times out, the tunnel is marked as failed. Keep alives are triggered
-// periodically, and also immediately in the case of a port forward failure (so
-// as to immediately detect a situation such as a device waking up and trying
-// to use a dead tunnel). By default, port forward theshold counting does not
-// cause a tunnel to be marked as failed, with the conservative assumption that
-// a server which responds to an SSH keep alive is fully functional.
+// SSH keep alives are not sent when the tunnel has been recently
+// active (not only does tunnel activity obviate the necessity of a keep
+// alive, testing has shown that keep alives may time out for "busy"
+// tunnels, especially over meek protocol and other high latency
+// conditions).
+//
+// "Recently active" is defined has having received payload bytes. Sent
+// bytes are not considered as testing has shown bytes may appear to
+// send when certain NAT devices have interfered with the tunnel, while
+// no bytes are received. In a pathological case, with DNS implemented
+// as tunneled UDP, a browser may wait excessively for a domain name to
+// resolve, while no new port forward is attempted which would otherwise
+// result in a tunnel failure detection.
 //
 func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	defer tunnel.operateWaitGroup.Done()
 
+	lastBytesReceivedTime := time.Now()
+
+	lastTotalBytesTransferedTime := time.Now()
+	totalSent := int64(0)
+	totalReceived := int64(0)
+
+	// Always emit a final NoticeTotalBytesTransferred
+	defer func() {
+		NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
+	}()
+
+	noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
+	defer noticeBytesTransferredTicker.Stop()
+
 	// The next status request and ssh keep alive times are picked at random,
 	// from a range, to make the resulting traffic less fingerprintable,
-	// especially when then tunnel is otherwise idle.
 	// Note: not using Tickers since these are not fixed time periods.
-
 	nextStatusRequestPeriod := func() time.Duration {
 		return MakeRandomPeriod(
 			PSIPHON_API_STATUS_REQUEST_PERIOD_MIN,
 			PSIPHON_API_STATUS_REQUEST_PERIOD_MAX)
 	}
+
+	statsTimer := time.NewTimer(nextStatusRequestPeriod())
+	defer statsTimer.Stop()
+
 	nextSshKeepAlivePeriod := func() time.Duration {
 		return MakeRandomPeriod(
 			TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN,
 			TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX)
 	}
 
-	// TODO: don't initialize if !config.EmitBytesTransferred
-	noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
-	if !config.EmitBytesTransferred {
-		noticeBytesTransferredTicker.Stop()
+	// TODO: don't initialize timer when config.DisablePeriodicSshKeepAlive is set
+	sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
+	if config.DisablePeriodicSshKeepAlive {
+		sshKeepAliveTimer.Stop()
 	} else {
-		defer noticeBytesTransferredTicker.Stop()
+		defer sshKeepAliveTimer.Stop()
 	}
 
-	statsTimer := time.NewTimer(nextStatusRequestPeriod())
-	defer statsTimer.Stop()
+	// Perform network requests in separate goroutines so as not to block
+	// other operations.
+	// Note: defer LIFO dependency: channels to be closed before Wait()
+	requestsWaitGroup := new(sync.WaitGroup)
+	defer requestsWaitGroup.Wait()
 
-	sshKeepAliveTimer := time.NewTimer(nextSshKeepAlivePeriod())
-	defer sshKeepAliveTimer.Stop()
+	requestsWaitGroup.Add(1)
+	signalStatusRequest := make(chan struct{})
+	defer close(signalStatusRequest)
+	go func() {
+		defer requestsWaitGroup.Done()
+		for _ = range signalStatusRequest {
+			sendStats(tunnel)
+		}
+	}()
+
+	requestsWaitGroup.Add(1)
+	signalSshKeepAlive := make(chan time.Duration)
+	sshKeepAliveError := make(chan error, 1)
+	defer close(signalSshKeepAlive)
+	go func() {
+		defer requestsWaitGroup.Done()
+		for timeout := range signalSshKeepAlive {
+			err := sendSshKeepAlive(tunnel.sshClient, tunnel.conn, timeout)
+			if err != nil {
+				select {
+				case sshKeepAliveError <- err:
+				default:
+				}
+			}
+		}
+	}()
 
 	var err error
 	for err == nil {
@@ -559,37 +614,58 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 		case <-noticeBytesTransferredTicker.C:
 			sent, received := transferstats.GetBytesTransferredForServer(
 				tunnel.serverEntry.IpAddress)
-			// Only emit notice when tunnel is not idle.
-			if sent > 0 || received > 0 {
-				NoticeBytesTransferred(sent, received)
+
+			if received > 0 {
+				lastBytesReceivedTime = time.Now()
+			}
+
+			totalSent += sent
+			totalReceived += received
+
+			if lastTotalBytesTransferedTime.Add(TOTAL_BYTES_TRANSFERRED_NOTICE_PERIOD).Before(time.Now()) {
+				NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
+				lastTotalBytesTransferedTime = time.Now()
+			}
+
+			// Only emit the frequent BytesTransferred notice when tunnel is not idle.
+			if config.EmitBytesTransferred && (sent > 0 || received > 0) {
+				NoticeBytesTransferred(tunnel.serverEntry.IpAddress, sent, received)
 			}
 
 		case <-statsTimer.C:
-			sendStats(tunnel)
+			select {
+			case signalStatusRequest <- *new(struct{}):
+			default:
+			}
 			statsTimer.Reset(nextStatusRequestPeriod())
 
 		case <-sshKeepAliveTimer.C:
-			err = sendSshKeepAlive(tunnel.sshClient, tunnel.conn)
+			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(time.Now()) {
+				select {
+				case signalSshKeepAlive <- TUNNEL_SSH_KEEP_ALIVE_PERIODIC_TIMEOUT:
+				default:
+				}
+			}
 			sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 
-		case failures := <-tunnel.portForwardFailures:
+		case <-tunnel.signalPortForwardFailure:
 			// Note: no mutex on portForwardFailureTotal; only referenced here
-			tunnel.portForwardFailureTotal += failures
+			tunnel.totalPortForwardFailures++
 			NoticeInfo("port forward failures for %s: %d",
-				tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
-			if config.PortForwardFailureThreshold > 0 &&
-				tunnel.portForwardFailureTotal > config.PortForwardFailureThreshold {
-				err = errors.New("tunnel exceeded port forward failure threshold")
-			} else {
-				// Try an SSH keep alive to check the state of the SSH connection
-				// Some port forward failures are due to intermittent conditions
-				// on the server, so we don't abort the connection until the threshold
-				// is hit. But if we can't make a simple round trip request to the
-				// server, we'll immediately abort.
-				err = sendSshKeepAlive(tunnel.sshClient, tunnel.conn)
+				tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
+
+			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(time.Now()) {
+				select {
+				case signalSshKeepAlive <- TUNNEL_SSH_KEEP_ALIVE_PROBE_TIMEOUT:
+				default:
+				}
+			}
+			if !config.DisablePeriodicSshKeepAlive {
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			}
 
+		case err = <-sshKeepAliveError:
+
 		case <-tunnel.shutdownOperateBroadcast:
 			// Attempt to send any remaining stats
 			sendStats(tunnel)
@@ -607,10 +683,11 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
 // on the specified SSH connections and returns true of the request succeeds
 // within a specified timeout.
-func sendSshKeepAlive(sshClient *ssh.Client, conn net.Conn) error {
+func sendSshKeepAlive(
+	sshClient *ssh.Client, conn net.Conn, timeout time.Duration) error {
 
 	errChannel := make(chan error, 2)
-	time.AfterFunc(TUNNEL_SSH_KEEP_ALIVE_TIMEOUT, func() {
+	time.AfterFunc(timeout, func() {
 		errChannel <- TimeoutError{}
 	})