Просмотр исходного кода

Fix hang in InitiatorSessions.RoundTrip in waitToShareSession mode

- Signal initiators waiting for a session when the session fails and will
  never become ready to share, not just when the session becomes ready.

- Also increase inproxy_test round trip timeouts to ensure race detector test
  runs don't time out unexpectedly.
Rod Hynes 1 год назад
Родитель
Commit
380cc66c0a

+ 3 - 3
psiphon/common/inproxy/inproxy_test.go

@@ -807,8 +807,8 @@ func runHTTPServer(listener net.Listener, broker *Broker) error {
 
 	// WriteTimeout will be extended via extendTimeout.
 	httpServer := &http.Server{
-		ReadTimeout:  1 * time.Second,
-		WriteTimeout: 10 * time.Millisecond,
+		ReadTimeout:  10 * time.Second,
+		WriteTimeout: 10 * time.Second,
 		IdleTimeout:  1 * time.Minute,
 		Handler:      handler,
 	}
@@ -843,7 +843,7 @@ func newHTTPRoundTripper(endpointAddr string, path string) *httpRoundTripper {
 				ForceAttemptHTTP2:   true,
 				MaxIdleConns:        2,
 				IdleConnTimeout:     1 * time.Minute,
-				TLSHandshakeTimeout: 1 * time.Second,
+				TLSHandshakeTimeout: 10 * time.Second,
 				TLSClientConfig: &tls.Config{
 					InsecureSkipVerify: true,
 				},

+ 90 - 10
psiphon/common/inproxy/session.go

@@ -318,6 +318,11 @@ func (s *InitiatorSessions) RoundTrip(
 			// dials; the proxy will retry its announce request if it
 			// fails -- after an appropriate delay.
 
+			// If this round trip owns its session and there are any
+			// waitToShareSession initiators awaiting the session, signal them
+			// that the session will not become ready.
+			rt.TransportFailed()
+
 			return nil, errors.Trace(err)
 		}
 	}
@@ -389,7 +394,7 @@ func (s *InitiatorSessions) NewRoundTrip(
 func (s *InitiatorSessions) getSession(
 	publicKey SessionPublicKey,
 	newSession func() (*session, error)) (
-	retSession *session, retisNew bool, retIsReady bool, retErr error) {
+	retSession *session, retIsNew bool, retIsReady bool, retErr error) {
 
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
@@ -561,6 +566,17 @@ func (r *InitiatorRoundTrip) Next(
 					if !session.isReadyToShare(signal) {
 						select {
 						case <-signal:
+							if !session.isReadyToShare(nil) {
+
+								// The session failed to become ready to share due to a transport
+								// failure during the handshake. Fail this round trip. Don't
+								// create a new, unshared session since waitToShareSession was
+								// specified. It's expected that there will be retries by the
+								// RoundTrip caller.
+
+								return nil, errors.TraceNew("waitToShareSession failed")
+							}
+							// else, use the session
 						case <-ctx.Done():
 							return nil, errors.Trace(ctx.Err())
 						}
@@ -737,6 +753,22 @@ func (r *InitiatorRoundTrip) Next(
 	return sendPacket, nil
 }
 
+// TransportFailed marks any owned, not yet ready-to-share session as failed
+// and signals any other initiators waiting to share the session. It does
+// nothing when r.sharingSession is set or the session is ready to share.
+// TransportFailed should be called when using waitToShareSession and when
+// there is a transport level failure to relay a session packet.
+func (r *InitiatorRoundTrip) TransportFailed() {
+	// r.mutex is held for the remainder of the call.
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	// isReadyToShare(nil) only queries the session; no signal is registered.
+	if !r.sharingSession && !r.session.isReadyToShare(nil) {
+		r.session.transportFailed()
+		r.initiatorSessions.removeIfSession(r.responderPublicKey, r.session)
+	}
+}
+
 // Response returns the round trip response. Call Response after Next returns
 // nil for the next packet to send, indicating that the round trip is
 // complete.
@@ -1304,6 +1336,7 @@ const (
 	sessionStateInitiator_XK_send_e_es = iota
 	sessionStateInitiator_XK_recv_e_ee_send_s_se_payload
 	sessionStateInitiator_XK_established
+	sessionStateInitiator_failed
 
 	sessionStateResponder_XK_recv_e_es_send_e_ee
 	sessionStateResponder_XK_recv_s_se_payload
@@ -1365,7 +1398,7 @@ type session struct {
 
 	mutex               sync.Mutex
 	state               sessionState
-	signalOnEstablished []chan struct{}
+	signalAwaitingReady []chan struct{}
 	handshake           *noise.HandshakeState
 	firstPayload        []byte
 	peerPublicKey       []byte
@@ -1475,7 +1508,7 @@ func newSession(
 		replayHistory:               replayHistory,
 		expectedInitiatorPublicKeys: expectedInitiatorPublicKeys,
 		state:                       state,
-		signalOnEstablished:         make([]chan struct{}, 0), // must be non-nil
+		signalAwaitingReady:         make([]chan struct{}, 0), // must be non-nil
 		handshake:                   handshake,
 		firstPayload:                firstPayload,
 	}, nil
@@ -1510,27 +1543,70 @@ func (s *session) isEstablished() bool {
 // isReadyToShare becomes true once the round trip performing the handshake
 // receives its round trip response, which demonstrates that the responder
 // received the final message.
+//
+// A non-nil signal channel is registered and closed once the session becomes
+// ready to share _or_ fails to become ready due to a transport failure; it is
+// closed immediately if the session can never become ready. When signaled,
+// call isReadyToShare again to distinguish between these outcomes.
 func (s *session) isReadyToShare(signal chan struct{}) bool {
 
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
-	if !s.isInitiator {
+	if !s.isInitiator || s.state == sessionStateInitiator_failed {
+		// Signal immediately: a non-initiator or failed session never becomes ready.
+		if signal != nil {
+			close(signal)
+		}
 		return false
 	}
 
-	if s.handshake == nil && s.signalOnEstablished == nil {
+	if s.handshake == nil && s.signalAwaitingReady == nil {
 		return true
 	}
 
 	if signal != nil {
-		s.signalOnEstablished = append(
-			s.signalOnEstablished, signal)
+		s.signalAwaitingReady = append(
+			s.signalAwaitingReady, signal)
 	}
 
 	return false
 }
 
+// transportFailed marks the session as failed and signals any initiators
+// waiting to share the session.
+//
+// transportFailed is ignored if the session is already ready to share, as any
+// transport failures past that point affect only one application-level round
+// trip and not the session.
+func (s *session) transportFailed() {
+	// s.mutex is held for the remainder of the call.
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	// Nothing to do for a non-initiator session.
+	if !s.isInitiator {
+		return
+	}
+
+	// Already ready to share, so ignore the transport failure.
+	if s.handshake == nil && s.signalAwaitingReady == nil {
+		return
+	}
+	// Already marked as failed; do not signal again.
+	if s.state == sessionStateInitiator_failed {
+		return
+	}
+
+	// In the sessionStateInitiator_failed state, nextHandshakePacket will
+	// always fail.
+	s.state = sessionStateInitiator_failed
+	// Closing each registered channel wakes its waiter; then drop the list.
+	for _, signal := range s.signalAwaitingReady {
+		close(signal)
+	}
+	s.signalAwaitingReady = nil
+}
+
 // getPeerID returns the peer's public key, in the form of an ID. A given peer
 // identifier can only be provided by the peer with the corresponding private
 // key.
@@ -1859,14 +1935,18 @@ func (s *session) readyToShare() {
 
 	// Assumes s.mutex lock is held.
 
-	if s.signalOnEstablished == nil {
+	if !s.isInitiator {
+		return
+	}
+
+	if s.signalAwaitingReady == nil {
 		return
 	}
 
-	for _, signal := range s.signalOnEstablished {
+	for _, signal := range s.signalAwaitingReady {
 		close(signal)
 	}
-	s.signalOnEstablished = nil
+	s.signalAwaitingReady = nil
 }
 
 // Marshal and obfuscate a SessionPacket.

+ 46 - 0
psiphon/common/inproxy/session_test.go

@@ -120,6 +120,48 @@ func runTestSessions() error {
 		return errors.TraceNew("unexpected response")
 	}
 
+	// Test: RoundTrips with waitToShareSession are interrupted when session
+	// fails
+
+	responderSessions.sessions.Flush()
+
+	initiatorSessions = NewInitiatorSessions(initiatorPrivateKey)
+
+	failingRoundTripper := newTestSessionRoundTripper(nil, &initiatorPublicKey)
+
+	roundTripCount := 100
+
+	results := make(chan error, roundTripCount)
+
+	for i := 0; i < roundTripCount; i++ {
+		go func() {
+			time.Sleep(prng.DefaultPRNG().Period(0, 10*time.Millisecond))
+			waitToShareSession := true
+			_, err := initiatorSessions.RoundTrip(
+				context.Background(),
+				failingRoundTripper,
+				responderPublicKey,
+				responderRootObfuscationSecret,
+				waitToShareSession,
+				roundTripper.MakeRequest())
+			results <- err
+		}()
+	}
+
+	waitToShareSessionFailed := false
+	for i := 0; i < roundTripCount; i++ {
+		err := <-results
+		if err == nil {
+			return errors.TraceNew("unexpected success")
+		}
+		if strings.HasSuffix(err.Error(), "waitToShareSession failed") {
+			waitToShareSessionFailed = true
+		}
+	}
+	if !waitToShareSessionFailed {
+		return errors.TraceNew("missing waitToShareSession failed error")
+	}
+
 	// Test: expected known initiator public key
 
 	initiatorSessions = NewInitiatorSessions(initiatorPrivateKey)
@@ -354,6 +396,10 @@ func (t *testSessionRoundTripper) RoundTrip(ctx context.Context, requestPayload
 		return nil, errors.Trace(err)
 	}
 
+	if t.sessions == nil {
+		return nil, errors.TraceNew("closed")
+	}
+
 	unwrappedRequestHandler := func(initiatorID ID, unwrappedRequest []byte) ([]byte, error) {
 
 		if t.expectedPeerPublicKey != nil {