Ver código fonte

Merge pull request #104 from rod-hynes/master

Fixes and enhancements related to Psiphon Android conversion to tunnel-core
Rod Hynes 10 anos atrás
pai
commit
1ebf50b92f

+ 7 - 34
AndroidLibrary/psi/psi.go

@@ -74,40 +74,13 @@ func Start(
 		return fmt.Errorf("error initializing datastore: %s", err)
 	}
 
-	// If specified, the embedded server list is loaded and stored. When there
-	// are no server candidates at all, we wait for this import to complete
-	// before starting the Psiphon controller. Otherwise, we import while
-	// concurrently starting the controller to minimize delay before attempting
-	// to connect to existing candidate servers.
-	// If the import fails, an error notice is emitted, but the controller is
-	// still started: either existing candidate servers may suffice, or the
-	// remote server list fetch may obtain candidate servers.
-	// TODO: duplicates logic in psiphonClient.go -- refactor?
-	if embeddedServerEntryList != "" {
-		embeddedServerListWaitGroup := new(sync.WaitGroup)
-		embeddedServerListWaitGroup.Add(1)
-		go func() {
-			defer embeddedServerListWaitGroup.Done()
-			// TODO: stream embedded server list data?
-			serverEntries, err := psiphon.DecodeAndValidateServerEntryList(embeddedServerEntryList)
-			if err != nil {
-				psiphon.NoticeError("error decoding embedded server entry list file: %s", err)
-				return
-			}
-			// Since embedded server list entries may become stale, they will not
-			// overwrite existing stored entries for the same server.
-			err = psiphon.StoreServerEntries(serverEntries, false)
-			if err != nil {
-				psiphon.NoticeError("error storing embedded server entry list data: %s", err)
-				return
-			}
-		}()
-
-		if psiphon.CountServerEntries(config.EgressRegion, config.TunnelProtocol) == 0 {
-			embeddedServerListWaitGroup.Wait()
-		} else {
-			defer embeddedServerListWaitGroup.Wait()
-		}
+	serverEntries, err := psiphon.DecodeAndValidateServerEntryList(embeddedServerEntryList)
+	if err != nil {
+		return fmt.Errorf("error decoding embedded server entry list: %s", err)
+	}
+	err = psiphon.StoreServerEntries(serverEntries, false)
+	if err != nil {
+		return fmt.Errorf("error storing embedded server entry list: %s", err)
 	}
 
 	controller, err = psiphon.NewController(config)

+ 113 - 73
SampleApps/Psibot/app/src/main/java/ca/psiphon/PsiphonVpn.java → SampleApps/Psibot/app/src/main/java/ca/psiphon/PsiphonTunnel.java

@@ -29,12 +29,11 @@ import android.os.Build;
 import android.os.ParcelFileDescriptor;
 
 import org.apache.http.conn.util.InetAddressUtils;
+import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
@@ -50,16 +49,27 @@ import java.util.Map;
 
 import go.psi.Psi;
 
-public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
+public class PsiphonTunnel extends Psi.PsiphonProvider.Stub {
 
     public interface HostService {
         public String getAppName();
+        public Context getContext();
         public VpnService getVpnService();
         public VpnService.Builder newVpnServiceBuilder();
-        public InputStream getPsiphonConfigResource();
-        public void customizeConfigParameters(JSONObject config);
-        public void logWarning(String message);
-        public void logInfo(String message);
+        public String getPsiphonConfig();
+        public void onDiagnosticMessage(String message);
+        public void onAvailableEgressRegions(List<String> regions);
+        public void onSocksProxyPortInUse(int port);
+        public void onHttpProxyPortInUse(int port);
+        public void onListeningSocksProxyPort(int port);
+        public void onListeningHttpProxyPort(int port);
+        public void onUpstreamProxyError(String message);
+        public void onConnecting();
+        public void onConnected();
+        public void onHomepage(String url);
+        public void onClientUpgradeDownloaded(String filename);
+        public void onSplitTunnelRegion(String region);
+        public void onUntunneledAddress(String address);
     }
 
     private final HostService mHostService;
@@ -71,19 +81,19 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
 
     // Only one PsiphonVpn instance may exist at a time, as the underlying
     // go.psi.Psi and tun2socks implementations each contain global state.
-    private static PsiphonVpn mPsiphonVpn;
+    private static PsiphonTunnel mPsiphonTunnel;
 
-    public static synchronized PsiphonVpn newPsiphonVpn(HostService hostService) {
-        if (mPsiphonVpn != null) {
-            mPsiphonVpn.stop();
+    public static synchronized PsiphonTunnel newPsiphonVpn(HostService hostService) {
+        if (mPsiphonTunnel != null) {
+            mPsiphonTunnel.stop();
         }
         // Load the native go code embedded in psi.aar
         System.loadLibrary("gojni");
-        mPsiphonVpn = new PsiphonVpn(hostService);
-        return mPsiphonVpn;
+        mPsiphonTunnel = new PsiphonTunnel(hostService);
+        return mPsiphonTunnel;
     }
 
-    private PsiphonVpn(HostService hostService) {
+    private PsiphonTunnel(HostService hostService) {
         mHostService = hostService;
         mLocalSocksProxyPort = 0;
         mRoutingThroughTunnel = false;
@@ -110,22 +120,18 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
     // Throws an exception in error conditions. In the case of an exception, the routing
     // started by startRouting() is not immediately torn down (this allows the caller to control
     // exactly when VPN routing is stopped); caller should call stop() to clean up.
-    public synchronized void startTunneling() throws Exception {
-        if (mTunFd == null) {
-            // Most likely, startRouting() was not called before this function.
-            throw new Exception("startTunneling: missing tun fd");
-        }
-        startPsiphon();
+    public synchronized void startTunneling(String embeddedServerEntries) throws Exception {
+        startPsiphon(embeddedServerEntries);
     }
 
     public synchronized void restartPsiphon() throws Exception {
         stopPsiphon();
-        startPsiphon();
+        startPsiphon("");
     }
 
     public synchronized void stop() {
-        stopPsiphon();
         stopVpn();
+        stopPsiphon();
         mLocalSocksProxyPort = 0;
     }
 
@@ -138,6 +144,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
     private final static int UDPGW_SERVER_PORT = 7300;
     private final static String DEFAULT_DNS_SERVER = "8.8.4.4";
 
+    @TargetApi(Build.VERSION_CODES.ICE_CREAM_SANDWICH)
     private boolean startVpn() throws Exception {
 
         mPrivateAddress = selectPrivateAddress();
@@ -162,7 +169,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
                 // this application is no longer prepared or was revoked.
                 return false;
             }
-            mHostService.logInfo("VPN established");
+            mHostService.onDiagnosticMessage("VPN established");
 
         } catch(IllegalArgumentException e) {
             throw new Exception(errorMessage, e);
@@ -198,7 +205,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
                 udpgwServerAddress,
                 true);
         mTunFd = null;
-        mHostService.logInfo("routing through tunnel");
+        mHostService.onDiagnosticMessage("routing through tunnel");
 
         // TODO: should double-check tunnel routing; see:
         // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/1dc5e4257dca99790109f3bf374e8ab3a0ead4d7/Android/PsiphonAndroidLibrary/src/com/psiphon3/psiphonlibrary/TunnelCore.java?at=default#cl-779
@@ -212,10 +219,12 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
             }
             mTunFd = null;
         }
-        stopTun2Socks();
-        mRoutingThroughTunnel = false;
+        if (mRoutingThroughTunnel) {
+            stopTun2Socks();
+            mRoutingThroughTunnel = false;
+        }
     }
-
+    
     //----------------------------------------------------------------------------------------------
     // PsiphonProvider (Core support) interface implementation
     //----------------------------------------------------------------------------------------------
@@ -225,6 +234,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
         handlePsiphonNotice(noticeJSON);
     }
 
+    @TargetApi(Build.VERSION_CODES.ICE_CREAM_SANDWICH)
     @Override
     public void BindToDevice(long fileDescriptor) throws Exception {
         if (!mHostService.getVpnService().protect((int)fileDescriptor)) {
@@ -235,7 +245,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
     @Override
     public long HasNetworkConnectivity() {
         // TODO: change to bool return value once gobind supports that type
-        return hasNetworkConnectivity(mHostService.getVpnService()) ? 1 : 0;
+        return hasNetworkConnectivity(mHostService.getContext()) ? 1 : 0;
     }
 
     @Override
@@ -244,7 +254,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
         try {
             dnsResolver = getFirstActiveNetworkDnsResolver(mHostService.getVpnService());
         } catch (Exception e) {
-            mHostService.logWarning("failed to get active network DNS resolver: " + e.getMessage());
+            mHostService.onDiagnosticMessage("failed to get active network DNS resolver: " + e.getMessage());
             dnsResolver = DEFAULT_DNS_SERVER;
         }
         return dnsResolver;
@@ -254,24 +264,26 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
     // Psiphon Tunnel Core
     //----------------------------------------------------------------------------------------------
 
-    private void startPsiphon() throws Exception {
+    private void startPsiphon(String embeddedServerEntries) throws Exception {
         stopPsiphon();
-        mHostService.logInfo("starting Psiphon");
+        mHostService.onDiagnosticMessage("starting Psiphon library");
         try {
+            boolean useDeviceBinder = (mTunFd != null);
             Psi.Start(
-                loadPsiphonConfig(mHostService.getVpnService()),
-                "", // TODO: supply embedded server list
-                this);
+                loadPsiphonConfig(mHostService.getContext()),
+                embeddedServerEntries,
+                this,
+                useDeviceBinder);
         } catch (java.lang.Exception e) {
-            throw new Exception("failed to start Psiphon", e);
+            throw new Exception("failed to start Psiphon library", e);
         }
-        mHostService.logInfo("Psiphon started");
+        mHostService.onDiagnosticMessage("Psiphon library started");
     }
 
     private void stopPsiphon() {
-        mHostService.logInfo("stopping Psiphon");
+        mHostService.onDiagnosticMessage("stopping Psiphon library");
         Psi.Stop();
-        mHostService.logInfo("Psiphon stopped");
+        mHostService.onDiagnosticMessage("Psiphon library stopped");
     }
 
     private String loadPsiphonConfig(Context context)
@@ -279,9 +291,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
 
         // Load settings from the raw resource JSON config file and
         // update as necessary. Then write JSON to disk for the Go client.
-        JSONObject json = new JSONObject(
-                readInputStreamToString(
-                    mHostService.getPsiphonConfigResource()));
+        JSONObject json = new JSONObject(mHostService.getPsiphonConfig());
 
         // On Android, these directories must be set to the app private storage area.
         // The Psiphon library won't be able to use its current working directory
@@ -289,7 +299,11 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
         json.put("DataStoreDirectory", context.getFilesDir());
         json.put("DataStoreTempDirectory", context.getCacheDir());
 
-        mPsiphonVpn.mHostService.customizeConfigParameters(json);
+        // Note: onConnecting/onConnected logic assumes 1 tunnel connection
+        json.put("TunnelPoolSize", 1);
+
+        // Continue to run indefinitely until connected
+        json.put("EstablishTunnelTimeoutSeconds", 0);
 
         if (mLocalSocksProxyPort != 0) {
             // When mLocalSocksProxyPort is set, tun2socks is already configured
@@ -304,22 +318,68 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
 
     private void handlePsiphonNotice(String noticeJSON) {
         try {
+            // All notices are sent on as diagnostic messages
+            // except those that may contain private user data.
+            boolean diagnostic = true;
+            
             JSONObject notice = new JSONObject(noticeJSON);
             String noticeType = notice.getString("noticeType");
+            
             if (noticeType.equals("Tunnels")) {
                 int count = notice.getJSONObject("data").getInt("count");
                 if (count > 0) {
-                    routeThroughTunnel();
+                    if (mTunFd != null) {
+                        routeThroughTunnel();
+                    }
+                    mHostService.onConnected();
+                } else {
+                    mHostService.onConnecting();
+                }
+
+            } else if (noticeType.equals("AvailableEgressRegions")) {
+                JSONArray egressRegions = notice.getJSONObject("data").getJSONArray("regions");
+                ArrayList<String> regions = new ArrayList<String>();
+                for (int i=0; i<egressRegions.length(); i++) {
+                    regions.add(egressRegions.getString(i));
                 }
+                mHostService.onAvailableEgressRegions(regions);
+                
+            } else if (noticeType.equals("SocksProxyPortInUse")) {
+                mHostService.onSocksProxyPortInUse(notice.getJSONObject("data").getInt("port"));
+
+            } else if (noticeType.equals("HttpProxyPortInUse")) {
+                mHostService.onHttpProxyPortInUse(notice.getJSONObject("data").getInt("port"));
+
             } else if (noticeType.equals("ListeningSocksProxyPort")) {
-                setLocalSocksProxyPort(notice.getJSONObject("data").getInt("port"));
-            /*
+                int port = notice.getJSONObject("data").getInt("port");
+                setLocalSocksProxyPort(port);
+                mHostService.onListeningSocksProxyPort(port);
+
+            } else if (noticeType.equals("ListeningHttpProxyPort")) {
+                int port = notice.getJSONObject("data").getInt("port");
+                mHostService.onListeningHttpProxyPort(port);
+
+            } else if (noticeType.equals("UpstreamProxyError")) {
+                mHostService.onUpstreamProxyError(notice.getJSONObject("data").getString("message"));
+
+            } else if (noticeType.equals("ClientUpgradeDownloaded")) {
+                mHostService.onHomepage(notice.getJSONObject("data").getString("filename"));
+
             } else if (noticeType.equals("Homepage")) {
-                String homePage = notice.getJSONObject("data").getString("url");
-            */
+                mHostService.onHomepage(notice.getJSONObject("data").getString("url"));
+
+            } else if (noticeType.equals("SplitTunnelRegion")) {
+                mHostService.onHomepage(notice.getJSONObject("data").getString("region"));
+
+            } else if (noticeType.equals("UntunneledAddress")) {
+                mHostService.onHomepage(notice.getJSONObject("data").getString("address"));
+            }
+
+            if (diagnostic) {
+                String diagnosticMessage = noticeType + ": " + notice.getJSONObject("data").toString();
+                mHostService.onDiagnosticMessage(diagnosticMessage);
             }
-            String displayNotice = noticeType + " " + notice.getJSONObject("data").toString();
-            mHostService.logInfo(displayNotice);
+
         } catch (JSONException e) {
             // Ignore notice
         }
@@ -329,6 +389,7 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
     // Tun2Socks
     //----------------------------------------------------------------------------------------------
 
+    @TargetApi(Build.VERSION_CODES.HONEYCOMB_MR1)
     private void startTun2Socks(
             final ParcelFileDescriptor vpnInterfaceFileDescriptor,
             final int vpnInterfaceMTU,
@@ -351,9 +412,8 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
                         udpgwTransparentDNS ? 1 : 0);
             }
         });
-        mPsiphonVpn = this;
         mTun2SocksThread.start();
-        mHostService.logInfo("tun2socks started");
+        mHostService.onDiagnosticMessage("tun2socks started");
     }
 
     private void stopTun2Socks() {
@@ -365,13 +425,13 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
                 Thread.currentThread().interrupt();
             }
             mTun2SocksThread = null;
-            mHostService.logInfo("tun2socks stopped");
+            mHostService.onDiagnosticMessage("tun2socks stopped");
         }
     }
 
     public static void logTun2Socks(String level, String channel, String msg) {
         String logMsg = "tun2socks: " + level + "(" + channel + "): " + msg;
-        mPsiphonVpn.mHostService.logWarning(logMsg);
+        mPsiphonTunnel.mHostService.onDiagnosticMessage(logMsg);
     }
 
     private native static int runTun2Socks(
@@ -516,26 +576,6 @@ public class PsiphonVpn extends Psi.PsiphonProvider.Stub {
         return dnsAddresses;
     }
 
-    //----------------------------------------------------------------------------------------------
-    // Implementation: Resource Utils
-    //----------------------------------------------------------------------------------------------
-
-    private static String readInputStreamToString(InputStream inputStream) throws IOException {
-        return new String(readInputStreamToBytes(inputStream), "UTF-8");
-    }
-
-    private static byte[] readInputStreamToBytes(InputStream inputStream) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        int readCount;
-        byte[] buffer = new byte[16384];
-        while ((readCount = inputStream.read(buffer, 0, buffer.length)) != -1) {
-            outputStream.write(buffer, 0, readCount);
-        }
-        outputStream.flush();
-        inputStream.close();
-        return outputStream.toByteArray();
-    }
-
     //----------------------------------------------------------------------------------------------
     // Exception
     //----------------------------------------------------------------------------------------------

+ 112 - 28
SampleApps/Psibot/app/src/main/java/ca/psiphon/psibot/Service.java

@@ -21,6 +21,7 @@ package ca.psiphon.psibot;
 
 import android.app.Notification;
 import android.app.PendingIntent;
+import android.content.Context;
 import android.content.Intent;
 import android.content.SharedPreferences;
 import android.net.VpnService;
@@ -29,27 +30,30 @@ import android.preference.PreferenceManager;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.util.List;
 
-import ca.psiphon.PsiphonVpn;
+import ca.psiphon.PsiphonTunnel;
 
 public class Service extends VpnService
-        implements PsiphonVpn.HostService, SharedPreferences.OnSharedPreferenceChangeListener {
+        implements PsiphonTunnel.HostService, SharedPreferences.OnSharedPreferenceChangeListener {
 
-    private PsiphonVpn mPsiphonVpn;
+    private PsiphonTunnel mPsiphonTunnel;
 
     @Override
     public void onCreate() {
-        mPsiphonVpn = PsiphonVpn.newPsiphonVpn(this);
+        mPsiphonTunnel = PsiphonTunnel.newPsiphonVpn(this);
         startForeground(R.string.foregroundServiceNotificationId, makeForegroundNotification());
         try {
-            if (!mPsiphonVpn.startRouting()) {
-                throw new PsiphonVpn.Exception("VPN not prepared");
+            if (!mPsiphonTunnel.startRouting()) {
+                throw new PsiphonTunnel.Exception("VPN not prepared");
             }
-            mPsiphonVpn.startTunneling();
-        } catch (PsiphonVpn.Exception e) {
+            mPsiphonTunnel.startTunneling("");
+        } catch (PsiphonTunnel.Exception e) {
             Log.addEntry("failed to start Psiphon VPN: " + e.getMessage());
-            mPsiphonVpn.stop();
+            mPsiphonTunnel.stop();
             stopSelf();
         }
         PreferenceManager.getDefaultSharedPreferences(this).
@@ -60,17 +64,17 @@ public class Service extends VpnService
     public void onDestroy() {
         PreferenceManager.getDefaultSharedPreferences(this).
                 unregisterOnSharedPreferenceChangeListener(this);
-        mPsiphonVpn.stop();
+        mPsiphonTunnel.stop();
         stopForeground(true);
     }
 
     @Override
     public synchronized void onSharedPreferenceChanged(SharedPreferences sharedPreferences, String key) {
         try {
-            mPsiphonVpn.restartPsiphon();
-        } catch (PsiphonVpn.Exception e) {
+            mPsiphonTunnel.restartPsiphon();
+        } catch (PsiphonTunnel.Exception e) {
             Log.addEntry("failed to restart Psiphon: " + e.getMessage());
-            mPsiphonVpn.stop();
+            mPsiphonTunnel.stop();
             stopSelf();
         }
     }
@@ -81,27 +85,31 @@ public class Service extends VpnService
     }
 
     @Override
-    public VpnService getVpnService() {
+    public Context getContext() {
         return this;
     }
 
     @Override
-    public VpnService.Builder newVpnServiceBuilder() {
-        return new VpnService.Builder();
+    public VpnService getVpnService() {
+        return this;
     }
 
     @Override
-    public InputStream getPsiphonConfigResource() {
-        return getResources().openRawResource(R.raw.psiphon_config);
+    public VpnService.Builder newVpnServiceBuilder() {
+        return new VpnService.Builder();
     }
 
     @Override
-    public void customizeConfigParameters(JSONObject config) {
-        // User-specified settings.
-        // Note: currently, validation is not comprehensive, and related errors are
-        // not directly parsed.
-        SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
+    public String getPsiphonConfig() {
         try {
+            JSONObject config = new JSONObject(
+                    readInputStreamToString(
+                        getResources().openRawResource(R.raw.psiphon_config)));
+
+            // Insert user-specified settings.
+            // Note: currently, validation is not comprehensive, and related errors are
+            // not directly parsed.
+            SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
             config.put("EgressRegion",
                     preferences.getString(
                             getString(R.string.preferenceEgressRegion),
@@ -139,21 +147,97 @@ public class Service extends VpnService
                             preferences.getString(
                                     getString(R.string.preferencePortForwardFailureThreshold),
                                     getString(R.string.preferencePortForwardFailureThresholdDefaultValue))));
+
+            return config.toString();
+
+        } catch (IOException e) {
+            Log.addEntry("error setting config parameters: " + e.getMessage());
         } catch (JSONException e) {
             Log.addEntry("error setting config parameters: " + e.getMessage());
         }
+        return "";
     }
 
     @Override
-    public void logWarning(String message) {
-        android.util.Log.w(getString(R.string.app_name), message);
+    public void onDiagnosticMessage(String message) {
+        android.util.Log.i(getString(R.string.app_name), message);
         Log.addEntry(message);
     }
 
     @Override
-    public void logInfo(String message) {
-        android.util.Log.i(getString(R.string.app_name), message);
-        Log.addEntry(message);
+    public void onAvailableEgressRegions(List<String> regions) {
+        // TODO: show only available regions in SettingActivity
+    }
+
+    @Override
+    public void onSocksProxyPortInUse(int port) {
+        Log.addEntry("local SOCKS proxy port in use: " + Integer.toString(port));
+    }
+
+    @Override
+    public void onHttpProxyPortInUse(int port) {
+        Log.addEntry("local HTTP proxy port in use: " + Integer.toString(port));
+    }
+
+    @Override
+    public void onListeningSocksProxyPort(int port) {
+        Log.addEntry("local SOCKS proxy listening on port: " + Integer.toString(port));
+    }
+
+    @Override
+    public void onListeningHttpProxyPort(int port) {
+        Log.addEntry("local HTTP proxy listening on port: " + Integer.toString(port));
+    }
+
+    @Override
+    public void onUpstreamProxyError(String message) {
+        Log.addEntry("upstream proxy error: " + message);
+    }
+
+    @Override
+    public void onConnecting() {
+        Log.addEntry("connecting...");
+    }
+
+    @Override
+    public void onConnected() {
+        Log.addEntry("connected");
+    }
+
+    @Override
+    public void onHomepage(String url) {
+        Log.addEntry("home page: " + url);
+    }
+
+    @Override
+    public void onClientUpgradeDownloaded(String filename) {
+        Log.addEntry("client upgrade downloaded");
+    }
+
+    @Override
+    public void onSplitTunnelRegion(String region) {
+        Log.addEntry("split tunnel region: " + region);
+    }
+
+    @Override
+    public void onUntunneledAddress(String address) {
+        Log.addEntry("untunneled address: " + address);
+    }
+
+    private static String readInputStreamToString(InputStream inputStream) throws IOException {
+        return new String(readInputStreamToBytes(inputStream), "UTF-8");
+    }
+
+    private static byte[] readInputStreamToBytes(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        int readCount;
+        byte[] buffer = new byte[16384];
+        while ((readCount = inputStream.read(buffer, 0, buffer.length)) != -1) {
+            outputStream.write(buffer, 0, readCount);
+        }
+        outputStream.flush();
+        inputStream.close();
+        return outputStream.toByteArray();
     }
 
     private Notification makeForegroundNotification() {

BIN
SampleApps/Psibot/app/src/main/jniLibs/armeabi-v7a/libtun2socks.so


+ 5 - 79
psiphon/TCPConn.go

@@ -31,20 +31,15 @@ import (
 // TCPConn is a customized TCP connection that:
 // - can be interrupted while connecting;
 // - implements a connect timeout;
-// - implements idle read/write timeouts;
 // - uses an upstream proxy when specified, and includes
 //   upstream proxy dialing in the connect timeout;
 // - can be bound to a specific system device (for Android VpnService
 //   routing compatibility, for example);
-// - implements the psiphon.Conn interface
 type TCPConn struct {
 	net.Conn
 	mutex         sync.Mutex
 	isClosed      bool
-	closedSignal  chan struct{}
 	interruptible interruptibleTCPSocket
-	readTimeout   time.Duration
-	writeTimeout  time.Duration
 }
 
 // NewTCPDialer creates a TCPDialer.
@@ -69,14 +64,6 @@ func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, err
 		if err != nil {
 			return nil, ContextError(err)
 		}
-		if config.ClosedSignal != nil {
-			if !conn.SetClosedSignal(config.ClosedSignal) {
-				// Conn is already closed. This is not unexpected -- for example,
-				// when establish is interrupted.
-				// TODO: make this not log an error when called from establishTunnelWorker?
-				return nil, ContextError(errors.New("conn already closed"))
-			}
-		}
 		return conn, nil
 	}
 
@@ -122,82 +109,21 @@ func makeTCPDialer(config *DialConfig) func(network, addr string) (net.Conn, err
 	return dialer
 }
 
-// SetClosedSignal implements psiphon.Conn.SetClosedSignal.
-func (conn *TCPConn) SetClosedSignal(closedSignal chan struct{}) bool {
-	conn.mutex.Lock()
-	defer conn.mutex.Unlock()
-	if conn.isClosed {
-		return false
-	}
-	conn.closedSignal = closedSignal
-	return true
-}
-
 // Close terminates a connected (net.Conn) or connecting (socketFd) TCPConn.
-// A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
+// A mutex is required to support net.Conn concurrency semantics.
+// Note also use of mutex around conn.interruptible and conn.Conn in
+// TCPConn_unix.go.
 func (conn *TCPConn) Close() (err error) {
 	conn.mutex.Lock()
 	defer conn.mutex.Unlock()
+
 	if !conn.isClosed {
+		conn.isClosed = true
 		if conn.Conn == nil {
 			err = interruptibleTCPClose(conn.interruptible)
 		} else {
 			err = conn.Conn.Close()
 		}
-		conn.isClosed = true
-		select {
-		case conn.closedSignal <- *new(struct{}):
-		default:
-		}
 	}
 	return err
 }
-
-// Read wraps standard Read to add an idle timeout. The connection
-// is explicitly closed on timeout.
-func (conn *TCPConn) Read(buffer []byte) (n int, err error) {
-	// Note: no mutex on the conn.readTimeout access
-	if conn.readTimeout != 0 {
-		err = conn.Conn.SetReadDeadline(time.Now().Add(conn.readTimeout))
-		if err != nil {
-			return 0, ContextError(err)
-		}
-	}
-	n, err = conn.Conn.Read(buffer)
-	if err != nil {
-		conn.Close()
-	}
-	return
-}
-
-// Write wraps standard Write to add an idle timeout The connection
-// is explicitly closed on timeout.
-func (conn *TCPConn) Write(buffer []byte) (n int, err error) {
-	// Note: no mutex on the conn.writeTimeout access
-	if conn.writeTimeout != 0 {
-		err = conn.Conn.SetWriteDeadline(time.Now().Add(conn.writeTimeout))
-		if err != nil {
-			return 0, ContextError(err)
-		}
-	}
-	n, err = conn.Conn.Write(buffer)
-	if err != nil {
-		conn.Close()
-	}
-	return
-}
-
-// Override implementation of net.Conn.SetDeadline
-func (conn *TCPConn) SetDeadline(t time.Time) error {
-	return errors.New("net.Conn SetDeadline not supported")
-}
-
-// Override implementation of net.Conn.SetReadDeadline
-func (conn *TCPConn) SetReadDeadline(t time.Time) error {
-	return errors.New("net.Conn SetReadDeadline not supported")
-}
-
-// Override implementation of net.Conn.SetWriteDeadline
-func (conn *TCPConn) SetWriteDeadline(t time.Time) error {
-	return errors.New("net.Conn SetWriteDeadline not supported")
-}

+ 49 - 53
psiphon/TCPConn_unix.go

@@ -44,29 +44,7 @@ const _INVALID_FD = -1
 // To implement socket device binding and interruptible connecting, the lower-level
 // syscall APIs are used. The sequence of syscalls in this implementation are
 // taken from: https://code.google.com/p/go/issues/detail?id=6966
-func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err error) {
-
-	// Create a socket and then, before connecting, add a TCPConn with
-	// the unconnected socket to pendingConns. This allows pendingConns to
-	// abort connections in progress.
-	socketFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
-	if err != nil {
-		return nil, ContextError(err)
-	}
-	defer func() {
-		// Cleanup on error
-		// (socketFd is reset to _INVALID_FD once it should no longer be closed)
-		if err != nil && socketFd != _INVALID_FD {
-			syscall.Close(socketFd)
-		}
-	}()
-
-	if config.DeviceBinder != nil {
-		err = config.DeviceBinder.BindToDevice(socketFd)
-		if err != nil {
-			return nil, ContextError(fmt.Errorf("BindToDevice failed: %s", err))
-		}
-	}
+func interruptibleTCPDial(addr string, config *DialConfig) (*TCPConn, error) {
 
 	// Get the remote IP and port, resolving a domain name if necessary
 	// TODO: domain name resolution isn't interruptible
@@ -100,16 +78,44 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 	var ip [4]byte
 	copy(ip[:], ipAddrs[index].To4())
 
-	// Enable interruption
-	conn = &TCPConn{
-		interruptible: interruptibleTCPSocket{socketFd: socketFd},
-		readTimeout:   config.ReadTimeout,
-		writeTimeout:  config.WriteTimeout}
+	// Create a socket and then, before connecting, add a TCPConn with
+	// the unconnected socket to pendingConns. This allows pendingConns to
+	// interrupt/abort connections in progress.
+	socketFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
+	if err != nil {
+		return nil, ContextError(err)
+	}
 
+	conn := &TCPConn{interruptible: interruptibleTCPSocket{socketFd: socketFd}}
+
+	// Cleanup on error
+	defer func() {
+		// Mutex required since conn may be in pendingConns, through which
+		// conn.Close() may be called from another goroutine. There are two
+		// risks:
+		// 1. standard race conditions reading/writing conn members.
+		// 2. closing the fd more than once, with the chance that other
+		//    concurrent goroutines or threads may have already reused the fd.
+		conn.mutex.Lock()
+		if err != nil && conn.interruptible.socketFd != _INVALID_FD {
+			syscall.Close(conn.interruptible.socketFd)
+			conn.interruptible.socketFd = _INVALID_FD
+		}
+		conn.mutex.Unlock()
+	}()
+
+	// Enable interruption
 	if !config.PendingConns.Add(conn) {
 		return nil, ContextError(errors.New("pending connections already closed"))
 	}
 
+	if config.DeviceBinder != nil {
+		err = config.DeviceBinder.BindToDevice(conn.interruptible.socketFd)
+		if err != nil {
+			return nil, ContextError(fmt.Errorf("BindToDevice failed: %s", err))
+		}
+	}
+
 	// Connect the socket
 	// TODO: adjust the timeout to account for time spent resolving hostname
 	sockAddr := syscall.SockaddrInet4{Addr: ip, Port: port}
@@ -119,52 +125,42 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 			errChannel <- errors.New("connect timeout")
 		})
 		go func() {
-			errChannel <- syscall.Connect(socketFd, &sockAddr)
+			errChannel <- syscall.Connect(conn.interruptible.socketFd, &sockAddr)
 		}()
 		err = <-errChannel
 	} else {
-		err = syscall.Connect(socketFd, &sockAddr)
+		err = syscall.Connect(conn.interruptible.socketFd, &sockAddr)
 	}
-
-	// Mutex required for writing to conn, since conn remains in
-	// pendingConns, through which conn.Close() may be called from
-	// another goroutine.
-
-	conn.mutex.Lock()
-
-	// From this point, ensure conn.interruptible.socketFd is reset
-	// since the fd value may be reused for a different file or socket
-	// before Close() -- and interruptibleTCPClose() -- is called for
-	// this conn.
-	conn.interruptible.socketFd = _INVALID_FD // (requires mutex)
-
-	// This is the syscall.Connect result
 	if err != nil {
-		conn.mutex.Unlock()
 		return nil, ContextError(err)
 	}
 
 	// Convert the socket fd to a net.Conn
+	// See mutex note above.
+	conn.mutex.Lock()
 
-	file := os.NewFile(uintptr(socketFd), "")
-	fileConn, err := net.FileConn(file)
-	file.Close()
-	// No more deferred fd clean up on err
-	socketFd = _INVALID_FD
+	file := os.NewFile(uintptr(conn.interruptible.socketFd), "")
+	fileConn, err := net.FileConn(file) // net.FileConn() dups the fd
+	file.Close()                        // file.Close() closes the fd
+	conn.interruptible.socketFd = _INVALID_FD
 	if err != nil {
 		conn.mutex.Unlock()
 		return nil, ContextError(err)
 	}
-	conn.Conn = fileConn // (requires mutex)
-
+	conn.Conn = fileConn
 	conn.mutex.Unlock()
 
 	return conn, nil
 }
 
 func interruptibleTCPClose(interruptible interruptibleTCPSocket) error {
+
+	// Assumes conn.mutex is held
+
 	if interruptible.socketFd == _INVALID_FD {
 		return nil
 	}
-	return syscall.Close(interruptible.socketFd)
+	err := syscall.Close(interruptible.socketFd)
+	interruptible.socketFd = _INVALID_FD
+	return err
 }

+ 1 - 3
psiphon/TCPConn_windows.go

@@ -52,9 +52,7 @@ func interruptibleTCPDial(addr string, config *DialConfig) (conn *TCPConn, err e
 
 	// Enable interruption
 	conn = &TCPConn{
-		interruptible: interruptibleTCPSocket{results: make(chan *interruptibleDialResult, 2)},
-		readTimeout:   config.ReadTimeout,
-		writeTimeout:  config.WriteTimeout}
+		interruptible: interruptibleTCPSocket{results: make(chan *interruptibleDialResult, 2)}}
 
 	if !config.PendingConns.Add(conn) {
 		return nil, ContextError(errors.New("pending connections already closed"))

+ 2 - 3
psiphon/config.go

@@ -34,9 +34,8 @@ const (
 	CONNECTION_WORKER_POOL_SIZE                  = 10
 	TUNNEL_POOL_SIZE                             = 1
 	TUNNEL_CONNECT_TIMEOUT                       = 15 * time.Second
-	TUNNEL_READ_TIMEOUT                          = 0 * time.Second
-	TUNNEL_WRITE_TIMEOUT                         = 5 * time.Second
 	TUNNEL_OPERATE_SHUTDOWN_TIMEOUT              = 2 * time.Second
+	TUNNEL_PORT_FORWARD_DIAL_TIMEOUT             = 10 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES      = 256
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN             = 60 * time.Second
 	TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX             = 120 * time.Second
@@ -44,7 +43,7 @@ const (
 	ESTABLISH_TUNNEL_TIMEOUT_SECONDS             = 300
 	ESTABLISH_TUNNEL_WORK_TIME_SECONDS           = 60 * time.Second
 	ESTABLISH_TUNNEL_PAUSE_PERIOD                = 5 * time.Second
-	PORT_FORWARD_FAILURE_THRESHOLD               = 10
+	PORT_FORWARD_FAILURE_THRESHOLD               = 0
 	HTTP_PROXY_ORIGIN_SERVER_TIMEOUT             = 15 * time.Second
 	HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST     = 50
 	FETCH_REMOTE_SERVER_LIST_TIMEOUT             = 10 * time.Second

+ 14 - 0
psiphon/controller.go

@@ -405,6 +405,20 @@ loop:
 		case failedTunnel := <-controller.failedTunnels:
 			NoticeAlert("tunnel failed: %s", failedTunnel.serverEntry.IpAddress)
 			controller.terminateTunnel(failedTunnel)
+
+			// Note: we make this extra check to ensure the shutdown signal takes priority
+			// and that we do not start establishing. Critically, startEstablishing() calls
+			// establishPendingConns.Reset() which clears the closed flag in
+			// establishPendingConns; this causes the pendingConns.Add() within
+			// interruptibleTCPDial to succeed instead of aborting, and the result
+			// is that it's possible for extablish goroutines to run all the way through
+			// NewSession before being discarded... delaying shutdown.
+			select {
+			case <-controller.shutdownBroadcast:
+				break loop
+			default:
+			}
+
 			// Concurrency note: only this goroutine may call startEstablishing/stopEstablishing
 			// and access isEstablishing.
 			if !controller.isEstablishing {

+ 16 - 23
psiphon/meekConn.go

@@ -77,7 +77,6 @@ type MeekConn struct {
 	transport            transporter
 	mutex                sync.Mutex
 	isClosed             bool
-	closedSignal         chan struct{}
 	broadcastClosed      chan struct{}
 	relayWaitGroup       *sync.WaitGroup
 	emptyReceiveBuffer   chan *bytes.Buffer
@@ -255,46 +254,40 @@ func DialMeek(
 	go meek.relay()
 
 	// Enable interruption
-	config.PendingConns.Add(meek)
+	if !config.PendingConns.Add(meek) {
+		meek.Close()
+		return nil, ContextError(errors.New("pending connections already closed"))
+	}
 
 	return meek, nil
 }
 
-// SetClosedSignal implements psiphon.Conn.SetClosedSignal
-func (meek *MeekConn) SetClosedSignal(closedSignal chan struct{}) bool {
-	meek.mutex.Lock()
-	defer meek.mutex.Unlock()
-	if meek.isClosed {
-		return false
-	}
-	meek.closedSignal = closedSignal
-	return true
-}
-
 // Close terminates the meek connection. Close waits for the relay processing goroutine
 // to stop and releases HTTP transport resources.
-// A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
+// A mutex is required to support net.Conn concurrency semantics.
 func (meek *MeekConn) Close() (err error) {
+
 	meek.mutex.Lock()
-	defer meek.mutex.Unlock()
-	if !meek.isClosed {
+	isClosed := meek.isClosed
+	meek.isClosed = true
+	meek.mutex.Unlock()
+
+	if !isClosed {
 		close(meek.broadcastClosed)
 		meek.pendingConns.CloseAll()
 		meek.relayWaitGroup.Wait()
 		meek.transport.CloseIdleConnections()
-		meek.isClosed = true
-		select {
-		case meek.closedSignal <- *new(struct{}):
-		default:
-		}
 	}
 	return nil
 }
 
 func (meek *MeekConn) closed() bool {
+
 	meek.mutex.Lock()
-	defer meek.mutex.Unlock()
-	return meek.isClosed
+	isClosed := meek.isClosed
+	meek.mutex.Unlock()
+
+	return isClosed
 }
 
 // Read reads data from the connection.

+ 0 - 24
psiphon/net.go

@@ -34,13 +34,6 @@ const DNS_PORT = 53
 // of a Psiphon dialer (TCPDial, MeekDial, etc.)
 type DialConfig struct {
 
-	// ClosedSignal is triggered when an underlying TCPConn network
-	// connection is closed. This is used in operateTunnel to detect
-	// an unexpected disconnect. Channel should be have buffer to
-	// receive at least on signal. Sender in TCPConn.Close() does not
-	// block.
-	ClosedSignal chan struct{}
-
 	// UpstreamProxyUrl specifies a proxy to connect through.
 	// E.g., "http://proxyhost:8080"
 	//       "socks5://user:password@proxyhost:1080"
@@ -53,8 +46,6 @@ type DialConfig struct {
 	UpstreamProxyUrl string
 
 	ConnectTimeout time.Duration
-	ReadTimeout    time.Duration
-	WriteTimeout   time.Duration
 
 	// PendingConns is used to interrupt dials in progress.
 	// The dial may be interrupted using PendingConns.CloseAll(): on platforms
@@ -101,21 +92,6 @@ func (TimeoutError) Temporary() bool { return true }
 // Dialer is a custom dialer compatible with http.Transport.Dial.
 type Dialer func(string, string) (net.Conn, error)
 
-// Conn is a net.Conn which supports sending a signal to a channel when
-// it is closed. In Psiphon, this interface is implemented by tunnel
-// connection types (DirectConn and MeekConn) and the close signal is
-// used as one trigger for tearing down the tunnel.
-type Conn interface {
-	net.Conn
-
-	// SetClosedSignal sets the channel which will be signaled
-	// when the connection is closed. This function returns false
-	// if the connection is already closed (and would never send
-	// the signal). SetClosedSignal and Close may be called by
-	// concurrent goroutines.
-	SetClosedSignal(closedSignal chan struct{}) bool
-}
-
 // Conns is a synchronized list of Conns that is used to coordinate
 // interrupting a set of goroutines establishing connections, or
 // close a set of open connections, etc.

+ 57 - 33
psiphon/tunnel.go

@@ -83,7 +83,6 @@ type Tunnel struct {
 	session                  *Session
 	protocol                 string
 	conn                     net.Conn
-	closedSignal             chan struct{}
 	sshClient                *ssh.Client
 	operateWaitGroup         *sync.WaitGroup
 	shutdownOperateBroadcast chan struct{}
@@ -114,7 +113,7 @@ func EstablishTunnel(
 	}
 
 	// Build transport layers and establish SSH connection
-	conn, closedSignal, sshClient, err := dialSsh(
+	conn, sshClient, err := dialSsh(
 		config, pendingConns, serverEntry, selectedProtocol, sessionId)
 	if err != nil {
 		return nil, ContextError(err)
@@ -123,6 +122,7 @@ func EstablishTunnel(
 	// Cleanup on error
 	defer func() {
 		if err != nil {
+			sshClient.Close()
 			conn.Close()
 		}
 	}()
@@ -134,7 +134,6 @@ func EstablishTunnel(
 		serverEntry:              serverEntry,
 		protocol:                 selectedProtocol,
 		conn:                     conn,
-		closedSignal:             closedSignal,
 		sshClient:                sshClient,
 		operateWaitGroup:         new(sync.WaitGroup),
 		shutdownOperateBroadcast: make(chan struct{}),
@@ -173,8 +172,13 @@ func EstablishTunnel(
 // Close stops operating the tunnel and closes the underlying connection.
 // Supports multiple and/or concurrent calls to Close().
 func (tunnel *Tunnel) Close() {
+
 	tunnel.mutex.Lock()
-	if !tunnel.isClosed {
+	isClosed := tunnel.isClosed
+	tunnel.isClosed = true
+	tunnel.mutex.Unlock()
+
+	if !isClosed {
 		// Signal operateTunnel to stop before closing the tunnel -- this
 		// allows a final status request to be made in the case of an orderly
 		// shutdown.
@@ -187,11 +191,10 @@ func (tunnel *Tunnel) Close() {
 		close(tunnel.shutdownOperateBroadcast)
 		tunnel.operateWaitGroup.Wait()
 		timer.Stop()
+		tunnel.sshClient.Close()
 		// tunnel.conn.Close() may get called twice, which is allowed.
 		tunnel.conn.Close()
 	}
-	tunnel.isClosed = true
-	tunnel.mutex.Unlock()
 }
 
 // Dial establishes a port forward connection through the tunnel
@@ -202,22 +205,36 @@ func (tunnel *Tunnel) Dial(
 	tunnel.mutex.Lock()
 	isClosed := tunnel.isClosed
 	tunnel.mutex.Unlock()
+
 	if isClosed {
 		return nil, errors.New("tunnel is closed")
 	}
 
-	sshPortForwardConn, err := tunnel.sshClient.Dial("tcp", remoteAddr)
-	if err != nil {
+	type tunnelDialResult struct {
+		sshPortForwardConn net.Conn
+		err                error
+	}
+	resultChannel := make(chan *tunnelDialResult, 2)
+	time.AfterFunc(TUNNEL_PORT_FORWARD_DIAL_TIMEOUT, func() {
+		resultChannel <- &tunnelDialResult{nil, errors.New("tunnel dial timeout")}
+	})
+	go func() {
+		sshPortForwardConn, err := tunnel.sshClient.Dial("tcp", remoteAddr)
+		resultChannel <- &tunnelDialResult{sshPortForwardConn, err}
+	}()
+	result := <-resultChannel
+
+	if result.err != nil {
 		// TODO: conditional on type of error or error message?
 		select {
 		case tunnel.portForwardFailures <- 1:
 		default:
 		}
-		return nil, ContextError(err)
+		return nil, ContextError(result.err)
 	}
 
 	conn = &TunneledConn{
-		Conn:           sshPortForwardConn,
+		Conn:           result.sshPortForwardConn,
 		tunnel:         tunnel,
 		downstreamConn: downstreamConn}
 
@@ -337,7 +354,7 @@ func dialSsh(
 	pendingConns *Conns,
 	serverEntry *ServerEntry,
 	selectedProtocol,
-	sessionId string) (conn net.Conn, closedSignal chan struct{}, sshClient *ssh.Client, err error) {
+	sessionId string) (conn net.Conn, sshClient *ssh.Client, err error) {
 
 	// The meek protocols tunnel obfuscated SSH. Obfuscated SSH is layered on top of SSH.
 	// So depending on which protocol is used, multiple layers are initialized.
@@ -368,11 +385,11 @@ func dialSsh(
 		// fronting-capable servers.
 
 		if len(serverEntry.MeekFrontingAddresses) == 0 {
-			return nil, nil, nil, ContextError(errors.New("MeekFrontingAddresses is empty"))
+			return nil, nil, ContextError(errors.New("MeekFrontingAddresses is empty"))
 		}
 		index, err := MakeSecureRandomInt(len(serverEntry.MeekFrontingAddresses))
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 		frontingAddress = serverEntry.MeekFrontingAddresses[index]
 	}
@@ -382,15 +399,10 @@ func dialSsh(
 		selectedProtocol,
 		frontingAddress)
 
-	closedSignal = make(chan struct{}, 1)
-
 	// Create the base transport: meek or direct connection
 	dialConfig := &DialConfig{
-		ClosedSignal:     closedSignal,
 		UpstreamProxyUrl: config.UpstreamProxyUrl,
 		ConnectTimeout:   TUNNEL_CONNECT_TIMEOUT,
-		ReadTimeout:      TUNNEL_READ_TIMEOUT,
-		WriteTimeout:     TUNNEL_WRITE_TIMEOUT,
 		PendingConns:     pendingConns,
 		DeviceBinder:     config.DeviceBinder,
 		DnsServerGetter:  config.DnsServerGetter,
@@ -398,12 +410,12 @@ func dialSsh(
 	if useMeek {
 		conn, err = DialMeek(serverEntry, sessionId, frontingAddress, dialConfig)
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	} else {
 		conn, err = DialTCP(fmt.Sprintf("%s:%d", serverEntry.IpAddress, port), dialConfig)
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	}
 
@@ -421,14 +433,14 @@ func dialSsh(
 	if useObfuscatedSsh {
 		sshConn, err = NewObfuscatedSshConn(conn, serverEntry.SshObfuscatedKey)
 		if err != nil {
-			return nil, nil, nil, ContextError(err)
+			return nil, nil, ContextError(err)
 		}
 	}
 
 	// Now establish the SSH session over the sshConn transport
 	expectedPublicKey, err := base64.StdEncoding.DecodeString(serverEntry.SshHostKey)
 	if err != nil {
-		return nil, nil, nil, ContextError(err)
+		return nil, nil, ContextError(err)
 	}
 	sshCertChecker := &ssh.CertChecker{
 		HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error {
@@ -444,7 +456,7 @@ func dialSsh(
 			SshPassword string `json:"SshPassword"`
 		}{sessionId, serverEntry.SshPassword})
 	if err != nil {
-		return nil, nil, nil, ContextError(err)
+		return nil, nil, ContextError(err)
 	}
 	sshClientConfig := &ssh.ClientConfig{
 		User: serverEntry.SshUsername,
@@ -488,10 +500,10 @@ func dialSsh(
 
 	result := <-resultChannel
 	if result.err != nil {
-		return nil, nil, nil, ContextError(result.err)
+		return nil, nil, ContextError(result.err)
 	}
 
-	return conn, closedSignal, result.sshClient, nil
+	return conn, result.sshClient, nil
 }
 
 // operateTunnel periodically sends status requests (traffic stats updates updates)
@@ -521,6 +533,14 @@ func dialSsh(
 // - "read tcp ... connection reset by peer"
 // - "ssh: unexpected packet in response to channel open: <nil>"
 //
+// Update: the above is superceded by SSH keep alives with timeouts. When a keep
+// alive times out, the tunnel is marked as failed. Keep alives are triggered
+// periodically, and also immediately in the case of a port forward failure (so
+// as to immediately detect a situation such as a device waking up and trying
+// to use a dead tunnel). By default, port forward theshold counting does not
+// cause a tunnel to be marked as failed, with the conservative assumption that
+// a server which responds to an SSH keep alive is fully functional.
+//
 func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	defer tunnel.operateWaitGroup.Done()
 
@@ -554,7 +574,7 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 			statsTimer.Reset(nextStatusRequestPeriod())
 
 		case <-sshKeepAliveTimer.C:
-			err = sendSshKeepAlive(tunnel.sshClient)
+			err = sendSshKeepAlive(tunnel.sshClient, tunnel.conn)
 			sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 
 		case failures := <-tunnel.portForwardFailures:
@@ -562,7 +582,8 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 			tunnel.portForwardFailureTotal += failures
 			NoticeInfo("port forward failures for %s: %d",
 				tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
-			if tunnel.portForwardFailureTotal > config.PortForwardFailureThreshold {
+			if config.PortForwardFailureThreshold > 0 &&
+				tunnel.portForwardFailureTotal > config.PortForwardFailureThreshold {
 				err = errors.New("tunnel exceeded port forward failure threshold")
 			} else {
 				// Try an SSH keep alive to check the state of the SSH connection
@@ -570,13 +591,10 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 				// on the server, so we don't abort the connection until the threshold
 				// is hit. But if we can't make a simple round trip request to the
 				// server, we'll immediately abort.
-				err = sendSshKeepAlive(tunnel.sshClient)
+				err = sendSshKeepAlive(tunnel.sshClient, tunnel.conn)
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			}
 
-		case <-tunnel.closedSignal:
-			err = errors.New("tunnel closed unexpectedly")
-
 		case <-tunnel.shutdownOperateBroadcast:
 			// Attempt to send any remaining stats
 			sendStats(tunnel)
@@ -594,7 +612,7 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 // sendSshKeepAlive is a helper which sends a keepalive@openssh.com request
 // on the specified SSH connections and returns true of the request succeeds
 // within a specified timeout.
-func sendSshKeepAlive(sshClient *ssh.Client) error {
+func sendSshKeepAlive(sshClient *ssh.Client, conn net.Conn) error {
 
 	errChannel := make(chan error, 2)
 	time.AfterFunc(TUNNEL_SSH_KEEP_ALIVE_TIMEOUT, func() {
@@ -609,7 +627,13 @@ func sendSshKeepAlive(sshClient *ssh.Client) error {
 		errChannel <- err
 	}()
 
-	return ContextError(<-errChannel)
+	err := <-errChannel
+	if err != nil {
+		sshClient.Close()
+		conn.Close()
+	}
+
+	return ContextError(err)
 }
 
 // sendStats is a helper for sending session stats to the server.