Browse Source

use an explicit channel to signal workers to stop as soon as a connection is established; distinguish explicitly closed (cancelled) tunnels for logging

Rod Hynes 11 years ago
parent
commit
cc408cc25d
2 changed files with 27 additions and 10 deletions
  1. 25 8
      psiphon/runTunnel.go
  2. 2 2
      psiphon/tunnel.go

+ 25 - 8
psiphon/runTunnel.go

@@ -37,13 +37,24 @@ import (
 // if there's not already an established tunnel. This function is to be used in a pool
 // of goroutines.
 func establishTunnelWorker(
-	waitGroup *sync.WaitGroup, candidateQueue chan *Tunnel, firstEstablishedTunnel chan *Tunnel) {
+	waitGroup *sync.WaitGroup, candidateQueue chan *Tunnel,
+	broadcastStopWorkers chan bool, firstEstablishedTunnel chan *Tunnel) {
 	defer waitGroup.Done()
 	for tunnel := range candidateQueue {
+		select {
+		case <-broadcastStopWorkers:
+			return
+		default:
+		}
 		log.Printf("Connecting to %s...", tunnel.serverEntry.IpAddress)
 		err := EstablishTunnel(tunnel)
 		if err != nil {
-			log.Printf("failed to connect to %s: %s", tunnel.serverEntry.IpAddress, err)
+			if tunnel.isClosed {
+				log.Printf("cancelled connect to %s", tunnel.serverEntry.IpAddress)
+			} else {
+				log.Printf("failed to connect to %s: %s", tunnel.serverEntry.IpAddress, err)
+			}
+			tunnel.Close()
 		} else {
 			log.Printf("success connecting to %s", tunnel.serverEntry.IpAddress)
 			select {
@@ -78,9 +89,10 @@ func runTunnel(config *Config) error {
 	candidateQueue := make(chan *Tunnel, len(candidateList))
 	firstEstablishedTunnel := make(chan *Tunnel, 1)
 	timeout := time.After(ESTABLISH_TUNNEL_TIMEOUT)
+	broadcastStopWorkers := make(chan bool)
 	for i := 0; i < CONNECTION_WORKER_POOL_SIZE; i++ {
 		waitGroup.Add(1)
-		go establishTunnelWorker(waitGroup, candidateQueue, firstEstablishedTunnel)
+		go establishTunnelWorker(waitGroup, candidateQueue, broadcastStopWorkers, firstEstablishedTunnel)
 	}
 	for _, tunnel := range candidateList {
 		candidateQueue <- tunnel
@@ -96,20 +108,25 @@ func runTunnel(config *Config) error {
 	log.Printf("stopping workers")
 	for _, candidate := range candidateList {
 		if candidate != establishedTunnel {
-			// Immediately cancel any partial connections in progress
+			// Interrupt any partial connections in progress, so that
+			// the worker will terminate immediately
 			candidate.Close()
 		}
 	}
+	close(broadcastStopWorkers)
 	// TODO: can start SOCKS before synchronizing work group
 	waitGroup.Wait()
 	if establishedTunnel != nil {
-		stopSignal := make(chan bool)
-		err = establishedTunnel.conn.SetDisconnectionSignal(stopSignal)
+		// Don't hold references to candidates while running tunnel
+		candidateList = nil
+		candidateQueue = nil
+		stopTunnelSignal := make(chan bool)
+		err = establishedTunnel.conn.SetDisconnectionSignal(stopTunnelSignal)
 		if err != nil {
 			return fmt.Errorf("failed to set disconnection signal: %s", err)
 		}
 		log.Printf("starting local SOCKS proxy")
-		socksServer := NewSocksServer(establishedTunnel, stopSignal)
+		socksServer := NewSocksServer(establishedTunnel, stopTunnelSignal)
 		if err != nil {
 			return fmt.Errorf("error initializing local SOCKS proxy: %s", err)
 		}
@@ -119,7 +136,7 @@ func runTunnel(config *Config) error {
 		}
 		defer socksServer.Close()
 		log.Printf("monitoring for failure")
-		<-stopSignal
+		<-stopTunnelSignal
 	}
 	return nil
 }

+ 2 - 2
psiphon/tunnel.go

@@ -37,6 +37,7 @@ type Tunnel struct {
 	serverEntry *ServerEntry
 	conn        *Conn
 	sshClient   *ssh.Client
+	isClosed    bool
 }
 
 // Close terminates the tunnel SSH client session and the
@@ -44,12 +45,11 @@ type Tunnel struct {
 func (tunnel *Tunnel) Close() {
 	if tunnel.sshClient != nil {
 		tunnel.sshClient.Close()
-		tunnel.sshClient = nil
 	}
 	if tunnel.conn != nil {
 		tunnel.conn.Close()
-		tunnel.conn = nil
 	}
+	tunnel.isClosed = true
 }
 
 // EstablishTunnel first makes a network transport connection to the