|
|
@@ -448,20 +448,18 @@ func (server *Server) resumeSession(session *session, channel *Channel) {
|
|
|
|
|
|
session.channel = channel
|
|
|
|
|
|
- // Both session.stopRunning() and server.stopRunning() will trigger
|
|
|
- // session.runContext.Done().
|
|
|
- session.runContext, session.stopRunning = context.WithCancel(server.runContext)
|
|
|
+ // Parent context is not server.runContext so that session workers
|
|
|
+ // need only check session.stopRunning to act on shutdown events.
|
|
|
+ session.runContext, session.stopRunning = context.WithCancel(context.Background())
|
|
|
|
|
|
// When a session is interrupted, all goroutines in session.workers
|
|
|
// are joined. When the server is stopped, all goroutines in
|
|
|
// server.workers are joined. So, in both cases we synchronously
|
|
|
// stop all workers associated with this session.
|
|
|
|
|
|
- server.workers.Add(1)
|
|
|
session.workers.Add(1)
|
|
|
go server.runClientUpstream(session)
|
|
|
|
|
|
- server.workers.Add(1)
|
|
|
session.workers.Add(1)
|
|
|
go server.runClientDownstream(session)
|
|
|
|
|
|
@@ -665,7 +663,6 @@ func (server *Server) runDeviceDownstream() {
|
|
|
|
|
|
func (server *Server) runClientUpstream(session *session) {
|
|
|
|
|
|
- defer server.workers.Done()
|
|
|
defer session.workers.Done()
|
|
|
|
|
|
// Read incoming packets from the client channel, validate the packets,
|
|
|
@@ -684,8 +681,8 @@ func (server *Server) runClientUpstream(session *session) {
|
|
|
if err != nil {
|
|
|
server.config.Logger.WithContextFields(
|
|
|
common.LogFields{"error": err}).Warning("read channel packet failed")
|
|
|
- // Tear down the session.
|
|
|
- server.interruptSession(session)
|
|
|
+ // Tear down the session. Must be invoked asynchronously.
|
|
|
+ go server.interruptSession(session)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -725,7 +722,6 @@ func (server *Server) runClientUpstream(session *session) {
|
|
|
|
|
|
func (server *Server) runClientDownstream(session *session) {
|
|
|
|
|
|
- defer server.workers.Done()
|
|
|
defer session.workers.Done()
|
|
|
|
|
|
// Dequeue, process, and relay packets to be sent to the client channel.
|
|
|
@@ -758,8 +754,8 @@ func (server *Server) runClientDownstream(session *session) {
|
|
|
if err != nil {
|
|
|
server.config.Logger.WithContextFields(
|
|
|
common.LogFields{"error": err}).Warning("write channel packet failed")
|
|
|
- // Tear down the session.
|
|
|
- server.interruptSession(session)
|
|
|
+ // Tear down the session. Must be invoked asynchronously.
|
|
|
+ go server.interruptSession(session)
|
|
|
return
|
|
|
}
|
|
|
|