Просмотр исходного кода

Fix: meek server deadlock

- When deleting expired sessions, it's necessary to call
  clientConn.Close before session.Lock in order to
  interrupt any concurrent request handler holding the
  lock and blocking on pumpReads or pumpWrites.

- Execute deletes concurrently so deleteExpiredSessions
  doesn't block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT for
  each expired session.
Rod Hynes 8 лет назад
Родитель
Сommit
8560436ad5
1 измененных файлов с 56 добавлено и 40 удалено
  1. 56 40
      psiphon/server/meek.go

+ 56 - 40
psiphon/server/meek.go

@@ -381,7 +381,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		if !session.cachedResponse.HasPosition(position) {
 			greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
 			server.terminateConnection(responseWriter, request)
-			session.close()
+			session.delete(true)
 			return
 		}
 
@@ -586,7 +586,7 @@ func (server *MeekServer) getSession(
 	server.sessionsLock.Unlock()
 
 	// Note: from the tunnel server's perspective, this client connection
-	// will close when session.close calls Close() on the meekConn.
+	// will close when session.delete calls Close() on the meekConn.
 	server.clientHandler(clientSessionData.ClientTunnelProtocol, session.clientConn)
 
 	return sessionID, session, nil
@@ -595,7 +595,7 @@ func (server *MeekServer) getSession(
 func (server *MeekServer) deleteSession(sessionID string) {
 
 	// Don't obtain the server.sessionsLock write lock until modifying
-	// server.sessions, as the session.close can block for up to
+	// server.sessions, as the session.delete can block for up to
 	// MEEK_HTTP_CLIENT_IO_TIMEOUT. Allow new sessions to be added
 	// concurrently.
 	//
@@ -608,25 +608,7 @@ func (server *MeekServer) deleteSession(sessionID string) {
 	server.sessionsLock.RUnlock()
 
 	if ok {
-		// session.lock must be obtained before calling session.close. Once
-		// the lock is obtained, no request for this session is being processed
-		// concurrently, and pending requests will block at session.lock.
-		//
-		// session.close makes a final call session.cachedResponse.Reset to
-		// release resources. This call requires exclusive access and the logic
-		// assumes that no further session.cachedResponse access occurs, or else
-		// resources may deplete (buffers won't be returned to the pool). These
-		// requirements are achieved by obtaining the lock, setting session.deleted,
-		// and the request handlers checking session.deleted immediately after
-		// obtaining the lock.
-		//
-		// session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
-		// the timeout for any active request handler processing a session
-		// request.
-		session.lock.Lock()
-		session.close()
-		session.deleted = true
-		session.lock.Unlock()
+		session.delete(false)
 
 		server.sessionsLock.Lock()
 		delete(server.sessions, sessionID)
@@ -638,11 +620,14 @@ func (server *MeekServer) deleteExpiredSessions() {
 
 	// A deleteSession call may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
 	// so grab a snapshot list of expired sessions and do not hold a lock for
-	// the duration of deleteExpiredSessions. Allow new sessions to be added
-	// concurrently.
+	// the duration of deleteExpiredSessions. This allows new sessions to be
+	// added concurrently.
 	//
 	// New sessions added after the snapshot is taken will be checked for
 	// expiry on subsequent periodic calls to deleteExpiredSessions.
+	//
+	// To avoid long delays in releasing resources, individual deletes are
+	// performed concurrently.
 
 	server.sessionsLock.Lock()
 	expiredSessionIDs := make([]string, 0)
@@ -653,9 +638,20 @@ func (server *MeekServer) deleteExpiredSessions() {
 	}
 	server.sessionsLock.Unlock()
 
+	start := monotime.Now()
+
+	deleteWaitGroup := new(sync.WaitGroup)
 	for _, sessionID := range expiredSessionIDs {
-		server.deleteSession(sessionID)
+		deleteWaitGroup.Add(1)
+		go func(sessionID string) {
+			defer deleteWaitGroup.Done()
+			server.deleteSession(sessionID)
+		}(sessionID)
 	}
+	deleteWaitGroup.Wait()
+
+	log.WithContextFields(
+		LogFields{"elapsed time": monotime.Since(start)}).Debug("deleted expired sessions")
 }
 
 // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
@@ -670,7 +666,7 @@ func (server *MeekServer) httpConnStateCallback(conn net.Conn, connState http.Co
 }
 
 // terminateConnection sends a 404 response to a client and also closes
-// a persisitent connection.
+// the persistent connection.
 func (server *MeekServer) terminateConnection(
 	responseWriter http.ResponseWriter, request *http.Request) {
 
@@ -716,17 +712,46 @@ func (session *meekSession) expired() bool {
 	return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
 }
 
-// closeSession releases all resources allocated by a session.
-// session.lock must be held by the caller.
-func (session *meekSession) close() {
+// delete releases all resources allocated by a session.
+func (session *meekSession) delete(haveLock bool) {
 
-	// TODO: close the persistent HTTP client connection, if one exists
+	// TODO: close the persistent HTTP client connection, if one exists?
+
+	// This final call session.cachedResponse.Reset releases shared resources.
+	//
+	// This call requires exclusive access. session.lock is be obtained before
+	// calling session.cachedResponse.Reset. Once the lock is obtained, no
+	// request for this session is being processed concurrently, and pending
+	// requests will block at session.lock.
+	//
+	// This logic assumes that no further session.cachedResponse access occurs,
+	// or else resources may deplete (buffers won't be returned to the pool).
+	// These requirements are achieved by obtaining the lock, setting
+	// session.deleted, and any subsequent request handlers checking
+	// session.deleted immediately after obtaining the lock.
+	//
+	// session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
+	// the timeout for any active request handler processing a session
+	// request.
+	//
+	// When the lock must be acquired, clientConn.Close is called first, to
+	// interrupt any existing request handler blocking on pumpReads or pumpWrites.
 
 	session.clientConn.Close()
 
+	if !haveLock {
+		session.lock.Lock()
+	}
+
 	// Release all extended buffers back to the pool.
-	// This is not safe for concurrent calls.
+	// session.cachedResponse.Reset is not safe for concurrent calls.
 	session.cachedResponse.Reset()
+
+	session.deleted = true
+
+	if !haveLock {
+		session.lock.Unlock()
+	}
 }
 
 // GetMetrics implements the MetricsSource interface.
@@ -951,15 +976,6 @@ func newMeekConn(
 // Note: assumes only one concurrent call to pumpReads
 func (conn *meekConn) pumpReads(reader io.Reader) error {
 
-	// Wait for a full capacity empty buffer. This ensures we can read
-	// the maximum MEEK_MAX_REQUEST_PAYLOAD_LENGTH request payload and
-	// checksum before relaying.
-	//
-	// Note: previously, this code would select conn.partialReadBuffer
-	// and write to that, looping until the entire request payload was
-	// read. Now, the consumer, the Read() caller, must fully drain the
-	// read buffer first.
-
 	// Use either an empty or partial buffer. By using a partial
 	// buffer, pumpReads will not block if the Read() caller has
 	// not fully drained the read buffer.