Kaynağa Gözat

Don't block operateTunnel on network requests

* Run distinct goroutines for making status and ssh keep alive requests: don't block other operateTunnel logic while waiting for network requests
* Periodic requests are now on schedule and not offset by request time.
* Fix: NoticeTotalBytesTransferred always called on end of operateTunnel
Rod Hynes 10 yıl önce
ebeveyn
işleme
edfb823748
1 değiştirilmiş dosya ile 50 ekleme ve 8 silme
  1. 50 8
      psiphon/tunnel.go

+ 50 - 8
psiphon/tunnel.go

@@ -541,6 +541,9 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 	totalSent := int64(0)
 	totalReceived := int64(0)
 
+	// Always emit a final NoticeTotalBytesTransferred
+	defer NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
+
 	noticeBytesTransferredTicker := time.NewTicker(1 * time.Second)
 	defer noticeBytesTransferredTicker.Stop()
 
@@ -570,6 +573,39 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 		defer sshKeepAliveTimer.Stop()
 	}
 
+	// Perform network requests in separate goroutines so as not to block
+	// other operations.
+	// Note: defer LIFO dependency: channels to be closed before Wait()
+	requestsWaitGroup := new(sync.WaitGroup)
+	defer requestsWaitGroup.Wait()
+
+	requestsWaitGroup.Add(1)
+	signalStatusRequest := make(chan struct{})
+	defer close(signalStatusRequest)
+	go func() {
+		defer requestsWaitGroup.Done()
+		for _ = range signalStatusRequest {
+			sendStats(tunnel)
+		}
+	}()
+
+	requestsWaitGroup.Add(1)
+	signalSshKeepAlive := make(chan time.Duration)
+	sshKeepAliveError := make(chan error, 1)
+	defer close(signalSshKeepAlive)
+	go func() {
+		defer requestsWaitGroup.Done()
+		for timeout := range signalSshKeepAlive {
+			err := sendSshKeepAlive(tunnel.sshClient, tunnel.conn, timeout)
+			if err != nil {
+				select {
+				case sshKeepAliveError <- err:
+				default:
+				}
+			}
+		}
+	}()
+
 	var err error
 	for err == nil {
 		select {
@@ -595,14 +631,18 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 			}
 
 		case <-statsTimer.C:
-			// TODO: perform this request asynchronously; don't block other operations
-			sendStats(tunnel)
+			select {
+			case signalStatusRequest <- *new(struct{}):
+			default:
+			}
 			statsTimer.Reset(nextStatusRequestPeriod())
 
 		case <-sshKeepAliveTimer.C:
 			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PERIODIC_INACTIVE_PERIOD).Before(time.Now()) {
-				err = sendSshKeepAlive(
-					tunnel.sshClient, tunnel.conn, TUNNEL_SSH_KEEP_ALIVE_PERIODIC_TIMEOUT)
+				select {
+				case signalSshKeepAlive <- TUNNEL_SSH_KEEP_ALIVE_PERIODIC_TIMEOUT:
+				default:
+				}
 			}
 			sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 
@@ -613,13 +653,17 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 				tunnel.serverEntry.IpAddress, tunnel.totalPortForwardFailures)
 
 			if lastBytesReceivedTime.Add(TUNNEL_SSH_KEEP_ALIVE_PROBE_INACTIVE_PERIOD).Before(time.Now()) {
-				err = sendSshKeepAlive(
-					tunnel.sshClient, tunnel.conn, TUNNEL_SSH_KEEP_ALIVE_PROBE_TIMEOUT)
+				select {
+				case signalSshKeepAlive <- TUNNEL_SSH_KEEP_ALIVE_PROBE_TIMEOUT:
+				default:
+				}
 			}
 			if !config.DisablePeriodicSshKeepAlive {
 				sshKeepAliveTimer.Reset(nextSshKeepAlivePeriod())
 			}
 
+		case err = <-sshKeepAliveError:
+
 		case <-tunnel.shutdownOperateBroadcast:
 			// Attempt to send any remaining stats
 			sendStats(tunnel)
@@ -628,8 +672,6 @@ func (tunnel *Tunnel) operateTunnel(config *Config, tunnelOwner TunnelOwner) {
 		}
 	}
 
-	NoticeTotalBytesTransferred(tunnel.serverEntry.IpAddress, totalSent, totalReceived)
-
 	if err != nil {
 		NoticeAlert("operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
 		tunnelOwner.SignalTunnelFailure(tunnel)