Просмотр исходного кода

Merge pull request #424 from rod-hynes/master

Meek race condition fixes
Rod Hynes 8 лет назад
Родитель
Коммит
9b78444ae4
2 изменённых файла: 103 добавления и 32 удаления
  1. 19 7
      psiphon/meekConn.go
  2. 84 25
      psiphon/server/meek.go

+ 19 - 7
psiphon/meekConn.go

@@ -809,6 +809,13 @@ func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
 		var requestBody io.ReadCloser
 		contentLength := 0
 		if !serverAcknowledgedRequestPayload && sendBuffer != nil {
+
+			// sendBuffer will be replaced once the data is no longer needed,
+			// when RoundTrip calls Close on the Body; this allows meekConn.Write()
+			// to unblock and start buffering data for the next round trip while
+			// still reading the current round trip response. signaller provides
+			// the hook for awaiting RoundTrip's call to Close.
+
 			signaller = NewReadCloseSignaller(bytes.NewReader(sendBuffer.Bytes()))
 			requestBody = signaller
 			contentLength = sendBuffer.Len()
@@ -850,12 +857,16 @@ func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
 			request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize))
 		}
 
-		// sendBuffer will be replaced once the data is no longer needed,
-		// when RoundTrip calls Close on the Body; this allows meekConn.Write()
-		// to unblock and start buffering data for the next roung trip while
-		// still reading the current round trip response.
-
 		response, err := meek.transport.RoundTrip(request)
+
+		// Wait for RoundTrip to call Close on the request body, when
+		// there is one. This is necessary to ensure it's safe to
+		// subsequently replace sendBuffer in both the success and
+		// error cases.
+		if signaller != nil {
+			signaller.AwaitClosed()
+		}
+
 		if err != nil {
 			select {
 			case <-meek.runContext.Done():
@@ -897,8 +908,9 @@ func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
 			// buffer may be replaced; this allows meekConn.Write() to unblock
 			// and start buffering data for the next round trip while still
 			// reading the current round trip response.
-			if signaller != nil {
-				signaller.AwaitClosed()
+			if sendBuffer != nil {
+				// Assumes signaller.AwaitClosed is called above, so
+				// sendBuffer will no longer be accessed by RoundTrip.
 				sendBuffer.Truncate(0)
 				meek.replaceSendBuffer(sendBuffer)
 				sendBuffer = nil

+ 84 - 25
psiphon/server/meek.go

@@ -156,8 +156,6 @@ func NewMeekServer(
 // To stop the meek server, both Close() the listener and set the stopBroadcast
 // signal specified in NewMeekServer.
 func (server *MeekServer) Run() error {
-	defer server.listener.Close()
-	defer server.openConns.CloseAll()
 
 	// Expire sessions
 
@@ -170,7 +168,7 @@ func (server *MeekServer) Run() error {
 		for {
 			select {
 			case <-ticker.C:
-				server.closeExpireSessions()
+				server.deleteExpiredSessions()
 			case <-server.stopBroadcast:
 				return
 			}
@@ -215,6 +213,14 @@ func (server *MeekServer) Run() error {
 	default:
 	}
 
+	// deleteExpiredSessions calls deleteSession which may block waiting
+	// for active request handlers to complete; timely shutdown requires
+	// stopping the listener and closing all existing connections before
+	// awaiting the reaperWaitGroup.
+
+	server.listener.Close()
+	server.openConns.CloseAll()
+
 	reaperWaitGroup.Wait()
 
 	return err
@@ -290,7 +296,12 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 	// If a newer request has arrived while waiting, discard this one.
 	// Do not delay processing the newest request.
-	if atomic.LoadInt64(&session.requestCount) > requestNumber {
+	//
+	// If the session expired and was deleted while this request was waiting,
+	// discard this request. The session is no longer valid, and the final call
+	// to session.cachedResponse.Reset may have already occurred, so any further
+	// session.cachedResponse access may deplete resources (fail to refill the pool).
+	if atomic.LoadInt64(&session.requestCount) > requestNumber || session.deleted {
 		server.terminateConnection(responseWriter, request)
 		return
 	}
@@ -370,7 +381,7 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		if !session.cachedResponse.HasPosition(position) {
 			greaterThanSwapInt64(&session.metricCachedResponseMissPosition, int64(position))
 			server.terminateConnection(responseWriter, request)
-			server.closeSession(sessionID)
+			session.close()
 			return
 		}
 
@@ -575,42 +586,76 @@ func (server *MeekServer) getSession(
 	server.sessionsLock.Unlock()
 
 	// Note: from the tunnel server's perspective, this client connection
-	// will close when closeSessionHelper calls Close() on the meekConn.
+	// will close when session.close calls Close() on the meekConn.
 	server.clientHandler(clientSessionData.ClientTunnelProtocol, session.clientConn)
 
 	return sessionID, session, nil
 }
 
-func (server *MeekServer) closeSessionHelper(
-	sessionID string, session *meekSession) {
-
-	// TODO: close the persistent HTTP client connection, if one exists
-	session.clientConn.Close()
-
-	// Release all extended buffers back to the pool
-	session.cachedResponse.Reset()
+func (server *MeekServer) deleteSession(sessionID string) {
 
-	// Note: assumes caller holds lock on sessionsLock
-	delete(server.sessions, sessionID)
-}
-
-func (server *MeekServer) closeSession(sessionID string) {
-	server.sessionsLock.Lock()
+	// Don't obtain the server.sessionsLock write lock until modifying
+	// server.sessions, as session.close can block for up to
+	// MEEK_HTTP_CLIENT_IO_TIMEOUT. Allow new sessions to be added
+	// concurrently.
+	//
+	// Since a lock isn't held for the duration, concurrent calls to
+	// deleteSession with the same sessionID could happen; this is
+	// not expected since only the reaper goroutine calls deleteExpiredSessions
+	// (and in any case concurrent execution of the ok block is not an issue).
+	server.sessionsLock.RLock()
 	session, ok := server.sessions[sessionID]
+	server.sessionsLock.RUnlock()
+
 	if ok {
-		server.closeSessionHelper(sessionID, session)
+		// session.lock must be obtained before calling session.close. Once
+		// the lock is obtained, no request for this session is being processed
+		// concurrently, and pending requests will block at session.lock.
+		//
+		// session.close makes a final call to session.cachedResponse.Reset to
+		// release resources. This call requires exclusive access and the logic
+		// assumes that no further session.cachedResponse access occurs, or else
+		// resources may deplete (buffers won't be returned to the pool). These
+		// requirements are achieved by obtaining the lock, setting session.deleted,
+		// and the request handlers checking session.deleted immediately after
+		// obtaining the lock.
+		//
+		// session.lock.Lock may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
+		// the timeout for any active request handler processing a session
+		// request.
+		session.lock.Lock()
+		session.close()
+		session.deleted = true
+		session.lock.Unlock()
+
+		server.sessionsLock.Lock()
+		delete(server.sessions, sessionID)
+		server.sessionsLock.Unlock()
 	}
-	server.sessionsLock.Unlock()
 }
 
-func (server *MeekServer) closeExpireSessions() {
+func (server *MeekServer) deleteExpiredSessions() {
+
+	// A deleteSession call may block for up to MEEK_HTTP_CLIENT_IO_TIMEOUT,
+	// so grab a snapshot list of expired sessions and do not hold a lock for
+	// the duration of deleteExpiredSessions. Allow new sessions to be added
+	// concurrently.
+	//
+	// New sessions added after the snapshot is taken will be checked for
+	// expiry on subsequent periodic calls to deleteExpiredSessions.
+
 	server.sessionsLock.Lock()
+	expiredSessionIDs := make([]string, 0)
 	for sessionID, session := range server.sessions {
 		if session.expired() {
-			server.closeSessionHelper(sessionID, session)
+			expiredSessionIDs = append(expiredSessionIDs, sessionID)
 		}
 	}
 	server.sessionsLock.Unlock()
+
+	for _, sessionID := range expiredSessionIDs {
+		server.deleteSession(sessionID)
+	}
 }
 
 // httpConnStateCallback tracks open persistent HTTP/HTTPS connections to the
@@ -655,6 +700,7 @@ type meekSession struct {
 	metricPeakCachedResponseHitSize  int64
 	metricCachedResponseMissPosition int64
 	lock                             sync.Mutex
+	deleted                          bool
 	clientConn                       *meekConn
 	meekProtocolVersion              int
 	sessionIDSent                    bool
@@ -670,7 +716,20 @@ func (session *meekSession) expired() bool {
 	return monotime.Since(lastActivity) > MEEK_MAX_SESSION_STALENESS
 }
 
-// GetMetrics implements the MetricsSource interface
+// close releases all resources allocated by a session.
+// session.lock must be held by the caller.
+func (session *meekSession) close() {
+
+	// TODO: close the persistent HTTP client connection, if one exists
+
+	session.clientConn.Close()
+
+	// Release all extended buffers back to the pool.
+	// This is not safe for concurrent calls.
+	session.cachedResponse.Reset()
+}
+
+// GetMetrics implements the MetricsSource interface.
 func (session *meekSession) GetMetrics() LogFields {
 	logFields := make(LogFields)
 	logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)