Просмотр исходного кода

Further adjustments to RoundTripperFailedError logic and other fixes

- Add missing DialMeek failure RoundTripperFailedError case
- Classify fast failures as RoundTripperFailedErrors
- Return 200 OK, no-match in matcher rate limit cases
- Fix STUN tactics overwrite and race condition
- Make resolver "context canceled" errors less ambiguous
Rod Hynes 1 год назад
Родитель
Сommit
f448b18af6

+ 56 - 23
psiphon/common/inproxy/broker.go

@@ -21,6 +21,7 @@ package inproxy
 
 import (
 	"context"
+	std_errors "errors"
 	"net"
 	"strconv"
 	"sync"
@@ -596,21 +597,37 @@ func (b *Broker) handleProxyAnnounce(
 		})
 	if err != nil {
 
-		if announceCtx.Err() == nil {
+		var limitError *MatcherLimitError
+		limited := std_errors.As(err, &limitError)
+
+		timeout := announceCtx.Err() == context.DeadlineExceeded
+
+		if !limited && !timeout {
 			return nil, errors.Trace(err)
 		}
 
-		timedOut = true
+		if timeout {
+
+			// Time out awaiting match. Still send a no-match response, as this is
+			// not an unexpected outcome and the proxy should not incorrectly
+			// flag its BrokerClient as having failed.
+			//
+			// Note: the respective proxy and broker timeouts,
+			// InproxyBrokerProxyAnnounceTimeout and
+			// InproxyProxyAnnounceRequestTimeout in tactics, should be
+			// configured so that the broker will timeout first and have an
+			// opportunity to send this response before the proxy times out.
+
+			timedOut = true
 
-		// Time out awaiting match. Still send a no-match response, as this is
-		// not an unexpected outcome and the proxy should not incorrectly
-		// flag its BrokerClient as having failed.
-		//
-		// Note: the respective proxy and broker timeouts,
-		// InproxyBrokerProxyAnnounceTimeout and
-		// InproxyProxyAnnounceRequestTimeout in tactics, should be
-		// configured so that the broker will timeout first and have an
-		// opportunity to send this response before the proxy times out.
+		} else if limited {
+
+			// The limit error case also returns a no-match response, so the
+			// proxy doesn't receive a 404 flag its BrokerClient as having failed.
+
+			b.config.Logger.WithTraceFields(
+				common.LogFields{"err": err.Error()}).Info("announcement limited")
+		}
 
 		responsePayload, err := MarshalProxyAnnounceResponse(
 			&ProxyAnnounceResponse{
@@ -802,21 +819,37 @@ func (b *Broker) handleClientOffer(
 		clientMatchOffer)
 	if err != nil {
 
-		if offerCtx.Err() == nil {
+		var limitError *MatcherLimitError
+		limited := std_errors.As(err, &limitError)
+
+		timeout := offerCtx.Err() == context.DeadlineExceeded
+
+		if !limited && !timeout {
 			return nil, errors.Trace(err)
 		}
 
-		timedOut = true
-
-		// Time out awaiting match. Still send a no-match response, as this is
-		// not an unexpected outcome and the client should not incorrectly
-		// flag its BrokerClient as having failed.
-		//
-		// Note: the respective client and broker timeouts,
-		// InproxyBrokerClientOfferTimeout and
-		// InproxyClientOfferRequestTimeout in tactics, should be configured
-		// so that the broker will timeout first and have an opportunity to
-		// send this response before the client times out.
+		if timeout {
+
+			// Time out awaiting match. Still send a no-match response, as this is
+			// not an unexpected outcome and the client should not incorrectly
+			// flag its BrokerClient as having failed.
+			//
+			// Note: the respective client and broker timeouts,
+			// InproxyBrokerClientOfferTimeout and
+			// InproxyClientOfferRequestTimeout in tactics, should be configured
+			// so that the broker will timeout first and have an opportunity to
+			// send this response before the client times out.
+
+			timedOut = true
+
+		} else if limited {
+
+			// The limit error case also returns a no-match response, so the
+			// client doesn't receive a 404 flag its BrokerClient as having failed.
+
+			b.config.Logger.WithTraceFields(
+				common.LogFields{"err": err.Error()}).Info("offer limited")
+		}
 
 		responsePayload, err := MarshalClientOfferResponse(
 			&ClientOfferResponse{NoMatch: true})

+ 19 - 2
psiphon/common/inproxy/matcher.go

@@ -20,6 +20,7 @@ package inproxy
 
 import (
 	"context"
+	std_errors "errors"
 	"net"
 	"sync"
 	"time"
@@ -744,6 +745,20 @@ func (m *Matcher) matchOffer(offerEntry *offerEntry) (int, bool) {
 	return bestMatch, bestMatch != -1
 }
 
+// MatcherLimitError is the error type returned by Announce or Offer when the
+// caller has exceeded configured queue entry or rate limits.
+type MatcherLimitError struct {
+	err error
+}
+
+func NewMatcherLimitError(err error) *MatcherLimitError {
+	return &MatcherLimitError{err: err}
+}
+
+func (e MatcherLimitError) Error() string {
+	return e.err.Error()
+}
+
 func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) error {
 
 	// Assumes the m.announcementQueueMutex or m.offerQueue mutex is locked.
@@ -794,14 +809,16 @@ func (m *Matcher) applyLimits(isAnnouncement bool, limitIP string, proxyID ID) e
 		}
 
 		if rateLimiter.TakeAvailable(1) < 1 {
-			return errors.TraceNew("rate exceeded for IP")
+			return errors.Trace(
+				NewMatcherLimitError(std_errors.New("rate exceeded for IP")))
 		}
 	}
 
 	if limitEntryCount > 0 {
 		entryCount, ok := entryCountByIP[limitIP]
 		if ok && entryCount >= limitEntryCount {
-			return errors.TraceNew("max entries for IP")
+			return errors.Trace(
+				NewMatcherLimitError(std_errors.New("max entries for IP")))
 		}
 	}
 

+ 2 - 0
psiphon/common/parameters/parameters.go

@@ -402,6 +402,7 @@ const (
 	InproxyClientOfferRetryDelay                       = "InproxyClientOfferRetryDelay"
 	InproxyClientOfferRetryJitter                      = "InproxyClientOfferRetryJitter"
 	InproxyClientRelayedPacketRequestTimeout           = "InproxyCloientRelayedPacketRequestTimeout"
+	InproxyBrokerRoundTripStatusCodeFailureThreshold   = "InproxyBrokerRoundTripStatusCodeFailureThreshold"
 	InproxyDTLSRandomizationProbability                = "InproxyDTLSRandomizationProbability"
 	InproxyDataChannelTrafficShapingProbability        = "InproxyDataChannelTrafficShapingProbability"
 	InproxyDataChannelTrafficShapingParameters         = "InproxyDataChannelTrafficShapingParameters"
@@ -882,6 +883,7 @@ var defaultParameters = map[string]struct {
 	InproxyClientOfferRetryDelay:                       {value: 1 * time.Second, minimum: time.Duration(0)},
 	InproxyClientOfferRetryJitter:                      {value: 0.3, minimum: 0.0},
 	InproxyClientRelayedPacketRequestTimeout:           {value: 10 * time.Second, minimum: time.Duration(0)},
+	InproxyBrokerRoundTripStatusCodeFailureThreshold:   {value: 2 * time.Second, minimum: time.Duration(0), flags: useNetworkLatencyMultiplier},
 	InproxyDTLSRandomizationProbability:                {value: 0.5, minimum: 0.0},
 	InproxyDataChannelTrafficShapingProbability:        {value: 0.5, minimum: 0.0},
 	InproxyDataChannelTrafficShapingParameters:         {value: InproxyDataChannelTrafficShapingParametersValue{0, 10, 0, 1500, 0, 10, 1, 1500, 0.5}},

+ 9 - 5
psiphon/common/resolver/resolver.go

@@ -668,8 +668,8 @@ func (r *Resolver) ResolveIP(
 
 	// Orchestrate the DNS requests
 
-	resolveCtx, cancelFunc := context.WithCancel(ctx)
-	defer cancelFunc()
+	resolveCtx, cancelFunc := context.WithCancelCause(ctx)
+	defer cancelFunc(nil)
 	waitGroup := new(sync.WaitGroup)
 	conns := common.NewConns[net.Conn]()
 	type answer struct {
@@ -883,7 +883,8 @@ func (r *Resolver) ResolveIP(
 			//
 			// Append the existing lastErr, which may convey useful
 			// information to be reported in a failed_tunnel error message.
-			lastErr.Store(errors.Tracef("%v (lastErr: %v)", ctx.Err(), lastErr.Load()))
+			lastErr.Store(errors.Tracef(
+				"%v (lastErr: %v)", context.Cause(resolveCtx), lastErr.Load()))
 			stop = true
 		}
 	}
@@ -955,13 +956,16 @@ func (r *Resolver) ResolveIP(
 	}
 
 	// Interrupt all workers.
-	cancelFunc()
+	cancelFunc(errors.TraceNew("resolve canceled"))
 	conns.CloseAll()
 	waitGroup.Wait()
 
 	// When there's no answer, return the last error.
 	if result == nil {
 		err := lastErr.Load()
+		if err == nil {
+			err = context.Cause(resolveCtx)
+		}
 		if err == nil {
 			err = errors.TraceNew("unexpected missing error")
 		}
@@ -1515,7 +1519,7 @@ func performDNSQuery(
 			// information about why a response was rejected.
 			err := lastErr
 			if err == nil {
-				err = errors.Trace(resolveCtx.Err())
+				err = errors.Trace(context.Cause(resolveCtx))
 			}
 
 			return nil, nil, RTT, err

+ 49 - 24
psiphon/inproxy.go

@@ -152,6 +152,7 @@ func (b *InproxyBrokerClientManager) reset() error {
 		// close is invoked in the resetBrokerClientOnRoundTripperFailed
 		// case, where it's expected that the round tripped has permanently
 		// failed.
+
 		b.brokerClientInstance.Close()
 	}
 
@@ -314,7 +315,7 @@ func NewInproxyBrokerClientInstance(
 		return nil, errors.Trace(err)
 	}
 
-	roundTripper := NewInproxyBrokerRoundTripper(brokerDialParams)
+	roundTripper := NewInproxyBrokerRoundTripper(p, brokerDialParams)
 
 	// Clients always generate an ephemeral session key pair. Proxies may opt
 	// to use a long-lived key pair for proxied traffic attribution.
@@ -1147,6 +1148,7 @@ type InproxyBrokerRoundTripper struct {
 	dialCompleted    chan struct{}
 	dialErr          error
 	conn             *MeekConn
+	failureThreshold time.Duration
 }
 
 // NewInproxyBrokerRoundTripper creates a new InproxyBrokerRoundTripper. The
@@ -1156,6 +1158,7 @@ type InproxyBrokerRoundTripper struct {
 // The input brokerDialParams dial parameter and config fields must not
 // modifed after NewInproxyBrokerRoundTripper is called.
 func NewInproxyBrokerRoundTripper(
+	p parameters.ParametersAccessor,
 	brokerDialParams *InproxyBrokerDialParameters) *InproxyBrokerRoundTripper {
 
 	runCtx, stopRunning := context.WithCancel(context.Background())
@@ -1165,6 +1168,8 @@ func NewInproxyBrokerRoundTripper(
 		runCtx:           runCtx,
 		stopRunning:      stopRunning,
 		dialCompleted:    make(chan struct{}),
+		failureThreshold: p.Duration(
+			parameters.InproxyBrokerRoundTripStatusCodeFailureThreshold),
 	}
 }
 
@@ -1240,6 +1245,17 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 			rt.brokerDialParams.meekConfig,
 			rt.brokerDialParams.dialConfig)
 
+		if err != nil && ctx.Err() != context.Canceled {
+
+			// DialMeek performs an initial TLS handshake. DialMeek errors,
+			// excluding a cancelled context as happens on shutdown, are
+			// classified as as RoundTripperFailedErrors, which will invoke
+			// BrokerClientRoundTripperFailed, resetting the round tripper
+			// and clearing replay parameters.
+
+			err = inproxy.NewRoundTripperFailedError(err)
+		}
+
 		rt.conn = conn
 		rt.dialErr = err
 		close(rt.dialCompleted)
@@ -1259,6 +1275,11 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 		}
 
 		if rt.dialErr != nil {
+
+			// There is no NewRoundTripperFailedError wrapping here, as the
+			// DialMeek caller will wrap its error and
+			// BrokerClientRoundTripperFailed will be invoked already.
+
 			return nil, errors.Trace(rt.dialErr)
 		}
 	}
@@ -1278,29 +1299,44 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 		return nil, errors.Trace(err)
 	}
 
+	startTime := time.Now()
 	response, err := rt.conn.RoundTrip(request)
+	roundTripDuration := time.Since(startTime)
+
 	if err == nil {
 		defer response.Body.Close()
 		if response.StatusCode != http.StatusOK {
 
-			// This case is treated as a temporary round tripper failure,
-			// since we received a response from the CDN, secured with TLS
-			// and VerifyPins, or from broker itself. One common scenario is
-			// the CDN returning a temporary gateway failed or timeout error.
+			err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
+
+			// Depending on the round trip duration, this case is treated as a
+			// temporary round tripper failure, since we received a response
+			// from the CDN, secured with TLS and VerifyPins, or from broker
+			// itself. One common scenario is the CDN returning a temporary
+			// timeout error, as can happen when CDN timeouts and broker
+			// timeouts are misaligned, especially for long-polling requests.
+			//
 			// In this scenario, we can reuse the existing round tripper and
 			// it may be counterproductive to return a RoundTripperFailedError
 			// which will trigger a clearing of any broker dial replay
 			// parameters as well as reseting the round tripper.
+			//
+			// When the round trip duration is sufficiently short, much
+			// shorter than expected round trip timeouts, this is still
+			// classified as a RoundTripperFailedError error, as it is more
+			// likely due to a more serious issue between the CDN and broker.
 
-			err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
+			if rt.failureThreshold > 0 &&
+				roundTripDuration <= rt.failureThreshold {
+
+				err = inproxy.NewRoundTripperFailedError(err)
+			}
 		}
 	} else if ctx.Err() != context.Canceled {
 
 		// Other round trip errors, including TLS failures and client-side
-		// timeouts,  but excluding a cancelled context as happens on
-		// shutdown, are classified as RoundTripperFailedErrors, which will
-		// invoke BrokerClientRoundTripperFailed, resetting the round tripper
-		// and clearing replay parameters.
+		// timeouts, but excluding a cancelled context as happens on
+		// shutdown, are classified as RoundTripperFailedErrors.
 
 		err = inproxy.NewRoundTripperFailedError(err)
 	}
@@ -1697,23 +1733,12 @@ func MakeInproxySTUNDialParameters(
 	var stunServerAddress, stunServerAddressRFC5780 string
 
 	if len(stunServerAddresses) > 0 {
-		prng.Shuffle(
-			len(stunServerAddresses),
-			func(i, j int) {
-				stunServerAddresses[i], stunServerAddresses[j] =
-					stunServerAddresses[j], stunServerAddresses[i]
-			})
-		stunServerAddress = stunServerAddresses[0]
+		stunServerAddress = stunServerAddresses[prng.Range(0, len(stunServerAddresses)-1)]
 	}
 
 	if len(stunServerAddressesRFC5780) > 0 {
-		prng.Shuffle(
-			len(stunServerAddressesRFC5780),
-			func(i, j int) {
-				stunServerAddressesRFC5780[i], stunServerAddressesRFC5780[j] =
-					stunServerAddressesRFC5780[j], stunServerAddressesRFC5780[i]
-			})
-		stunServerAddressRFC5780 = stunServerAddressesRFC5780[0]
+		stunServerAddressRFC5780 =
+			stunServerAddressesRFC5780[prng.Range(0, len(stunServerAddressesRFC5780)-1)]
 	}
 
 	// Create DNS resolver dial parameters to use when resolving STUN server