|
|
@@ -572,26 +572,74 @@ func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
|
|
|
// and reestablished. In this case, when the same server is selected, this logic
|
|
|
// will be hit; closing the old, dangling client is desirable.
|
|
|
// - Multi-tunnel clients should not normally use one server for multiple tunnels.
|
|
|
- existingClient := sshServer.clients[client.sessionID]
|
|
|
|
|
|
- sshServer.clients[client.sessionID] = client
|
|
|
+ existingClient := sshServer.clients[client.sessionID]
|
|
|
|
|
|
sshServer.clientsMutex.Unlock()
|
|
|
|
|
|
- // Call stop() outside the mutex to avoid deadlock.
|
|
|
if existingClient != nil {
|
|
|
+
|
|
|
+ // This case is expected to be common, and so logged at the lowest severity
|
|
|
+ // level.
|
|
|
+ log.WithContext().Debug(
|
|
|
+ "stopping existing client with duplicate session ID")
|
|
|
+
|
|
|
existingClient.stop()
|
|
|
|
|
|
- // Since existingClient.run() isn't guaranteed to have terminated at
|
|
|
- // this point, synchronously release authorizations for the previous
|
|
|
- // client here. This ensures that the authorization IDs are not in
|
|
|
- // use when the reconnecting client submits its authorizations.
|
|
|
- existingClient.cleanupAuthorizations()
|
|
|
+ // Block until the existingClient is fully terminated. This is necessary to
|
|
|
+ // avoid this scenario:
|
|
|
+ // - existingClient is invoking handshakeAPIRequestHandler
|
|
|
+ // - sshServer.clients[client.sessionID] is updated to point to new client
|
|
|
+ // - existingClient's handshakeAPIRequestHandler invokes
|
|
|
+ // SetClientHandshakeState but sets the handshake parameters for new
|
|
|
+ // client
|
|
|
+ // - as a result, the new client handshake will fail (only a single handshake
|
|
|
+ // is permitted) and the new client server_tunnel log will contain an
|
|
|
+ // invalid mix of existing/new client fields
|
|
|
+ //
|
|
|
+ // Once existingClient.awaitStopped returns, all existingClient port
|
|
|
+ // forwards and request handlers have terminated, so no API handler, either
|
|
|
+ // tunneled web API or SSH API, will remain and it is safe to point
|
|
|
+ // sshServer.clients[client.sessionID] to the new client.
|
|
|
+ // Limitation: this scenario remains possible with _untunneled_ web API
|
|
|
+ // requests.
|
|
|
+ //
|
|
|
+ // Blocking also ensures existingClient.releaseAuthorizations is invoked before
|
|
|
+ // the new client attempts to submit the same authorizations.
|
|
|
+ //
|
|
|
+ // Perform blocking awaitStopped operation outside the
|
|
|
+ // sshServer.clientsMutex mutex to avoid blocking all other clients for the
|
|
|
+ // duration. We still expect and require that the stop process completes
|
|
|
+ // rapidly, e.g., does not block on network I/O, allowing the new client
|
|
|
+ // connection to proceed without delay.
|
|
|
+ //
|
|
|
+ // In addition, operations triggered by stop, and which must complete before
|
|
|
+ // awaitStopped returns, will attempt to lock sshServer.clientsMutex,
|
|
|
+ // including unregisterEstablishedClient.
|
|
|
|
|
|
- log.WithContext().Debug(
|
|
|
- "stopped existing client with duplicate session ID")
|
|
|
+ existingClient.awaitStopped()
|
|
|
}
|
|
|
|
|
|
+ sshServer.clientsMutex.Lock()
|
|
|
+ defer sshServer.clientsMutex.Unlock()
|
|
|
+
|
|
|
+ // existingClient's stop will have removed it from sshServer.clients via
|
|
|
+ // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should
|
|
|
+ // be nil -- unless yet another client instance using the same sessionID has
|
|
|
+ // connected in the meantime while awaiting existingClient stop. In this
|
|
|
+ // case, it's not clear which is the most recent connection from the client,
|
|
|
+ // so instead of this connection terminating more peers, it aborts.
|
|
|
+
|
|
|
+ if sshServer.clients[client.sessionID] != nil {
|
|
|
+ // As this is expected to be rare case, it's logged at a higher severity
|
|
|
+ // level.
|
|
|
+ log.WithContext().Warning(
|
|
|
+ "aborting new client with duplicate session ID")
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ sshServer.clients[client.sessionID] = client
|
|
|
+
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
@@ -601,17 +649,15 @@ func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {
|
|
|
|
|
|
registeredClient := sshServer.clients[client.sessionID]
|
|
|
|
|
|
- // registeredClient will differ from client when client
|
|
|
- // is the existingClient terminated in registerEstablishedClient.
|
|
|
- // In that case, registeredClient remains connected, and
|
|
|
- // the sshServer.clients entry should be retained.
|
|
|
+ // registeredClient will differ from client when client is the existingClient
|
|
|
+ // terminated in registerEstablishedClient. In that case, registeredClient
|
|
|
+ // remains connected, and the sshServer.clients entry should be retained.
|
|
|
if registeredClient == client {
|
|
|
delete(sshServer.clients, client.sessionID)
|
|
|
}
|
|
|
|
|
|
sshServer.clientsMutex.Unlock()
|
|
|
|
|
|
- // Call stop() outside the mutex to avoid deadlock.
|
|
|
client.stop()
|
|
|
}
|
|
|
|
|
|
@@ -984,6 +1030,7 @@ type sshClient struct {
|
|
|
signalIssueSLOKs chan struct{}
|
|
|
runCtx context.Context
|
|
|
stopRunning context.CancelFunc
|
|
|
+ stopped chan struct{}
|
|
|
tcpPortForwardDialingAvailableSignal context.CancelFunc
|
|
|
releaseAuthorizations func()
|
|
|
stopTimer *time.Timer
|
|
|
@@ -1050,6 +1097,7 @@ func newSshClient(
|
|
|
signalIssueSLOKs: make(chan struct{}, 1),
|
|
|
runCtx: runCtx,
|
|
|
stopRunning: stopRunning,
|
|
|
+ stopped: make(chan struct{}),
|
|
|
}
|
|
|
|
|
|
client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))
|
|
|
@@ -1061,6 +1109,10 @@ func newSshClient(
|
|
|
func (sshClient *sshClient) run(
|
|
|
baseConn net.Conn, onSSHHandshakeFinished func()) {
|
|
|
|
|
|
+ // When run returns, the client has fully stopped, with all SSH state torn
|
|
|
+ // down and no port forwards or API requests in progress.
|
|
|
+ defer close(sshClient.stopped)
|
|
|
+
|
|
|
// onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
|
|
|
defer func() {
|
|
|
if onSSHHandshakeFinished != nil {
|
|
|
@@ -1410,14 +1462,25 @@ func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// stop signals the ssh connection to shutdown. After sshConn() returns,
|
|
|
-// the connection has terminated but sshClient.run() may still be
|
|
|
-// running and in the process of exiting.
|
|
|
+// stop signals the ssh connection to shutdown. After sshConn.Wait returns,
|
|
|
+// the SSH connection has terminated but sshClient.run may still be running and
|
|
|
+// in the process of exiting.
|
|
|
+//
|
|
|
+// The shutdown process must complete rapidly and not, e.g., block on network
|
|
|
+// I/O, as newly connecting clients need to await stop completion of any
|
|
|
+// existing connection that shares the same session ID.
|
|
|
func (sshClient *sshClient) stop() {
|
|
|
sshClient.sshConn.Close()
|
|
|
sshClient.sshConn.Wait()
|
|
|
}
|
|
|
|
|
|
+// awaitStopped will block until sshClient.run has exited, at which point all
|
|
|
+// worker goroutines associated with the sshClient, including any in-flight
|
|
|
+// API handlers, will have exited.
|
|
|
+func (sshClient *sshClient) awaitStopped() {
|
|
|
+ <-sshClient.stopped
|
|
|
+}
|
|
|
+
|
|
|
// runTunnel handles/dispatches new channels and new requests from the client.
|
|
|
// When the SSH client connection closes, both the channels and requests channels
|
|
|
// will close and runTunnel will exit.
|