Browse Source

Mitigate ResponderSessions lock contention and blocking

- Use a read lock to allow for concurrent checks for an existing session
- Don't retain a lock when executing newSession
- Use a semaphore to enforce a sanity check maximum number of concurrent
  newSession calls
Rod Hynes 1 year ago
parent
commit
9d95e3c33b
1 changed files with 43 additions and 4 deletions
  1. 43 4
      psiphon/common/inproxy/session.go

+ 43 - 4
psiphon/common/inproxy/session.go

@@ -37,6 +37,7 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	lrucache "github.com/cognusion/go-cache-lru"
 	"github.com/flynn/noise"
+	"github.com/marusama/semaphore"
 	"golang.org/x/crypto/curve25519"
 	"golang.zx2c4.com/wireguard/replay"
 )
@@ -50,6 +51,8 @@ const (
 
 	resetSessionTokenName      = "psiphon-inproxy-session-reset-session-token"
 	resetSessionTokenNonceSize = 32
+
+	maxResponderConcurrentNewSessions = 32768
 )
 
 const (
@@ -450,6 +453,12 @@ func (s *InitiatorSessions) getSession(
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
+	// Note: unlike in ResponderSessions.getSession, there is no indication,
+	// in profiling, of high lock contention and blocking here when holding
+	// the mutex lock while calling newSession. The lock is left in place to
+	// preserve the semantics of only one concurrent newSession call,
+	// particularly for brokers initiating new sessions with servers.
+
 	session, ok := s.sessions[publicKey]
 	if ok {
 		return session, false, session.isReadyToShare(nil), nil
@@ -860,8 +869,10 @@ type ResponderSessions struct {
 	obfuscationReplayHistory    *obfuscationReplayHistory
 	expectedInitiatorPublicKeys *sessionPublicKeyLookup
 
-	mutex    sync.Mutex
+	mutex    sync.RWMutex
 	sessions *lrucache.Cache
+
+	concurrentNewSessions semaphore.Semaphore
 }
 
 // NewResponderSessions creates a new ResponderSessions which allows any
@@ -883,6 +894,7 @@ func NewResponderSessions(
 		applyTTL:                 true,
 		obfuscationReplayHistory: newObfuscationReplayHistory(),
 		sessions:                 lrucache.NewWithLRU(sessionsTTL, 1*time.Minute, sessionsMaxSize),
+		concurrentNewSessions:    semaphore.New(maxResponderConcurrentNewSessions),
 	}, nil
 }
 
@@ -1210,16 +1222,35 @@ func (s *ResponderSessions) touchSession(sessionID ID, session *session) {
 // creates a new session, and places it in the cache, if not found.
 func (s *ResponderSessions) getSession(sessionID ID) (*session, error) {
 
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
+	// Concurrency: profiling indicates that holding the mutex lock here when
+	// calling newSession leads to high contention and blocking. Instead,
+	// release the lock after checking for an existing session, and then
+	// recheck -- using lrucache.Add, which fails if an entry exists -- when
+	// inserting.
+	//
+	// A read-only lock is obtained on the initial check, allowing for
+	// concurrent checks; however, note that lrucache has its own RWMutex and
+	// obtains a write lock in Get when LRU ejection may need to be performed.
+	//
+	// A semaphore is used to enforce a sanity check maximum number of
+	// concurrent newSession calls.
+	//
+	// TODO: add a timeout or stop signal to Acquire?
 
 	strSessionID := string(sessionID[:])
 
+	s.mutex.RLock()
 	entry, ok := s.sessions.Get(strSessionID)
+	s.mutex.RUnlock()
+
 	if ok {
 		return entry.(*session), nil
 	}
 
+	err := s.concurrentNewSessions.Acquire(context.Background(), 1)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
 	session, err := newSession(
 		false, // !isInitiator
 		s.privateKey,
@@ -1230,12 +1261,20 @@ func (s *ResponderSessions) getSession(sessionID ID) (*session, error) {
 		nil,
 		&sessionID,
 		s.expectedInitiatorPublicKeys)
+	s.concurrentNewSessions.Release(1)
+
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	s.sessions.Set(
+	s.mutex.Lock()
+	err = s.sessions.Add(
 		strSessionID, session, lrucache.DefaultExpiration)
+	s.mutex.Unlock()
+
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
 
 	return session, nil
 }