Просмотр исходного кода

Fix: meek server session race condition

- A session could expire and be closed while a
  request handler concurrently accessed the
  session. This would lead to unsafe access to
  the session.cachedResponse.Reset, along with
  possibly failing to refill the shared buffer
  pool.

- The expired session reaper now waits when an
  expired session has an active request handler.
  Accommodations are made so this waiting does
  not block new sessions and will be interrupted
  on shutdown.
Rod Hynes 8 лет назад
Родитель
Сommit
656e712d2c
1 измененных файлов с 84 добавлено и 25 удалено
  1. 84 25
      psiphon/server/meek.go

+ 84 - 25
psiphon/server/meek.go

@@ -156,8 +156,6 @@ func NewMeekServer(
 // To stop the meek server, both Close() the listener and set the stopBroadcast
 // signal specified in NewMeekServer.
 func (server *MeekServer) Run() error {
-	defer server.listener.Close()
-	defer server.openConns.CloseAll()
 
 	// Expire sessions
 
@@ -170,7 +168,7 @@ func (server *MeekServer) Run() error {
 		for {
 			select {
 			case <-ticker.C:
-				server.closeExpireSessions()
+				server.deleteExpiredSessions()
 			case <-server.stopBroadcast:
 				return
 			}
@@ -215,6 +213,14 @@ func (server *MeekServer) Run() error {
 	default:
 	}
 
+	// deleteExpiredSessions calls deleteSession which may block waiting
+	// for active request handlers to complete; timely shutdown requires
+	// stopping the listener and closing all existing connections before
+	// awaiting the reaperWaitGroup.
+
+	server.listener.Close()
+	server.openConns.CloseAll()
+
 	reaperWaitGroup.Wait()
 
 	return err
@@ -290,7 +296,12 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 	// If a newer request has arrived while waiting, discard this one.
 	// Do not delay processing the newest request.
-	if atomic.LoadInt64(&session.requestCount) > requestNumber {
+	//
+	// If the session expired and was deleted while this request was waiting,
+	// discard this request. The session is no longer valid, and the final call
+	// to session.cachedResponse.Reset may have already occured, so any further
+	// session.cachedResponse access may deplete resources (fail to refill the pool).
+	if atomic.LoadInt64(&session.requestCount) > requestNumber || session.deleted {
 		server.terminateConnection(responseWriter, request)
 		return
 	}
@@ -370,7 +381,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		if !session.cachedResponse.HasPosition(position) {
 			greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
 			server.terminateConnection(responseWriter, request)
-			server.closeSession(sessionID)
+			session.close()
 			return
 		}
 
@@ -575,42 +586,76 @@ func (server *MeekServer) getSession(
 	server.sessionsLock.Unlock()
 
 	// Note: from the tunnel server's perspective, this client connection
-	// will close when closeSessionHelper calls Close() on the meekConn.
+	// will close when session.close calls Close() on the meekConn.
 	server.clientHandler(clientSessionData.ClientTunnelProtocol, session.clientConn)
 
 	return sessionID, session, nil
 }
 
-func (server *MeekServer) closeSessionHelper(
-	sessionID string, session *meekSession) {
-
-	// TODO: close the persistent HTTP client connection, if one exists
-	session.clientConn.Close()
-
-	// Release all extended buffers back to the pool
-	session.cachedResponse.Reset()
+func (server *MeekServer) deleteSession(sessionID string) {
 
-	// Note: assumes caller holds lock on sessionsLock
-	delete(server.sessions, sessionID)
-}
-
-func (server *MeekServer) closeSession(sessionID string) {
-	server.sessionsLock.Lock()
+	// Don't obtain the server.sessionsLock write lock until modifying
+	// server.sessions, as the session.close can block for up to
+	// MEEK_HTTP_CLIENT_IO_TIMEOUT. Allow new sessions to be added
+	// concurrently.
+	//
+	// Since a lock isn't held for the duration, concurrent calls to
+	// deleteSession with the same sessionID could happen; this is
+	// not expected since only the reaper goroutine calls deleteExpiredSessions
+	// (and in any case concurrent execution of the ok block is not an issue).
+	server.sessionsLock.RLock()
 	session, ok := server.sessions[sessionID]
+	server.sessionsLock.RUnlock()
+
 	if ok {
-		server.closeSessionHelper(sessionID, session)
+		// session.lock must be obtained before calling session.close. Once
+		// the lock is obtained, no request for this session is being processed
+		// concurrently, and pending requests will block at session.lock.
+		//
+		// session.close makes a final call session.cachedResponse.Reset to
+		// release resources. This call requires exclusive access and the logic
+		// assumes that no further session.cachedResponse access occurs, or else
+		// resources may deplete (buffers won't be returned to the pool). These
+		// requirements are achieved by obtaining the lock, setting session.deleted,
+		// and the request handlers checking session.deleted immediately after
+		// obtaining the lock.
+		//
+		// session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
+		// the timeout for any active request handler processing a session
+		// request.
+		session.lock.Lock()
+		session.close()
+		session.deleted = true
+		session.lock.Unlock()
+
+		server.sessionsLock.Lock()
+		delete(server.sessions, sessionID)
+		server.sessionsLock.Unlock()
 	}
-	server.sessionsLock.Unlock()
 }
 
-func (server *MeekServer) closeExpireSessions() {
+func (server *MeekServer) deleteExpiredSessions() {
+
+	// A deleteSession call may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
+	// so grab a snapshot list of expired sessions and do not hold a lock for
+	// the duration of deleteExpiredSessions. Allow new sessions to be added
+	// concurrently.
+	//
+	// New sessions added after the snapshot is taken will be checked for
+	// expiry on subsequent periodic calls to deleteExpiredSessions.
+
 	server.sessionsLock.Lock()
+	expiredSessionIDs := make([]string, 0)
 	for sessionID, session := range server.sessions {
 		if session.expired() {
-			server.closeSessionHelper(sessionID, session)
+			expiredSessionIDs = append(expiredSessionIDs, sessionID)
 		}
 	}
 	server.sessionsLock.Unlock()
+
+	for _, sessionID := range expiredSessionIDs {
+		server.deleteSession(sessionID)
+	}
 }
 
 // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
@@ -655,6 +700,7 @@ type meekSession struct {
 	metricPeakCachedResponseHitSize  int64
 	metricCachedResponseMissPosition int64
 	lock                             sync.Mutex
+	deleted                          bool
 	clientConn                       *meekConn
 	meekProtocolVersion              int
 	sessionIDSent                    bool
@@ -670,7 +716,20 @@ func (session *meekSession) expired() bool {
 	return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
 }
 
-// GetMetrics implements the MetricsSource interface
+// closeSession releases all resources allocated by a session.
+// session.lock must be held by the caller.
+func (session *meekSession) close() {
+
+	// TODO: close the persistent HTTP client connection, if one exists
+
+	session.clientConn.Close()
+
+	// Release all extended buffers back to the pool.
+	// This is not safe for concurrent calls.
+	session.cachedResponse.Reset()
+}
+
+// GetMetrics implements the MetricsSource interface.
 func (session *meekSession) GetMetrics() LogFields {
 	logFields := make(LogFields)
 	logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)