|
|
@@ -102,6 +102,7 @@ type Matcher struct {
|
|
|
offerLimitEntryCount int
|
|
|
offerRateLimitQuantity int
|
|
|
offerRateLimitInterval time.Duration
|
|
|
+ offerMinimumDeadline time.Duration
|
|
|
|
|
|
matchSignal chan struct{}
|
|
|
|
|
|
@@ -124,6 +125,7 @@ type MatcherConfig struct {
|
|
|
OfferLimitEntryCount int
|
|
|
OfferRateLimitQuantity int
|
|
|
OfferRateLimitInterval time.Duration
|
|
|
+ OfferMinimumDeadline time.Duration
|
|
|
|
|
|
// Proxy quality state.
|
|
|
ProxyQualityState *ProxyQualityState
|
|
|
@@ -220,6 +222,7 @@ type MatchAnswer struct {
|
|
|
// MatchMetrics records statistics about the match queue state at the time a
|
|
|
// match is made.
|
|
|
type MatchMetrics struct {
|
|
|
+ OfferDeadline time.Duration
|
|
|
OfferMatchIndex int
|
|
|
OfferQueueSize int
|
|
|
AnnouncementMatchIndex int
|
|
|
@@ -233,6 +236,7 @@ func (metrics *MatchMetrics) GetMetrics() common.LogFields {
|
|
|
return nil
|
|
|
}
|
|
|
return common.LogFields{
|
|
|
+ "offer_deadline": int64(metrics.OfferDeadline / time.Millisecond),
|
|
|
"offer_match_index": metrics.OfferMatchIndex,
|
|
|
"offer_queue_size": metrics.OfferQueueSize,
|
|
|
"announcement_match_index": metrics.AnnouncementMatchIndex,
|
|
|
@@ -283,6 +287,11 @@ func (offerEntry *offerEntry) getMatchMetrics() *MatchMetrics {
|
|
|
type answerInfo struct {
|
|
|
announcement *MatchAnnouncement
|
|
|
answer *MatchAnswer
|
|
|
+
|
|
|
+ // offerDropped is sent to Offer's answer channel when the offer has been
|
|
|
+ // dropped by the matcher due to age. This allows Offer to return
|
|
|
+ // immediately on drop and the request handler to log this outcome.
|
|
|
+ offerDropped bool
|
|
|
}
|
|
|
|
|
|
// pendingAnswer represents an answer that is expected to arrive from a
|
|
|
@@ -333,7 +342,8 @@ func NewMatcher(config *MatcherConfig) *Matcher {
|
|
|
config.AnnouncementNonlimitedProxyIDs,
|
|
|
config.OfferLimitEntryCount,
|
|
|
config.OfferRateLimitQuantity,
|
|
|
- config.OfferRateLimitInterval)
|
|
|
+ config.OfferRateLimitInterval,
|
|
|
+ config.OfferMinimumDeadline)
|
|
|
|
|
|
return m
|
|
|
}
|
|
|
@@ -350,7 +360,8 @@ func (m *Matcher) SetLimits(
|
|
|
announcementNonlimitedProxyIDs []ID,
|
|
|
offerLimitEntryCount int,
|
|
|
offerRateLimitQuantity int,
|
|
|
- offerRateLimitInterval time.Duration) {
|
|
|
+ offerRateLimitInterval time.Duration,
|
|
|
+ offerMinimumDeadline time.Duration) {
|
|
|
|
|
|
nonlimitedProxyIDs := make(map[ID]struct{})
|
|
|
for _, proxyID := range announcementNonlimitedProxyIDs {
|
|
|
@@ -368,6 +379,7 @@ func (m *Matcher) SetLimits(
|
|
|
m.offerLimitEntryCount = offerLimitEntryCount
|
|
|
m.offerRateLimitQuantity = offerRateLimitQuantity
|
|
|
m.offerRateLimitInterval = offerRateLimitInterval
|
|
|
+ m.offerMinimumDeadline = offerMinimumDeadline
|
|
|
m.offerQueueMutex.Unlock()
|
|
|
}
|
|
|
|
|
|
@@ -473,6 +485,8 @@ func (m *Matcher) Announce(
|
|
|
return clientOffer, announcementEntry.getMatchMetrics(), nil
|
|
|
}
|
|
|
|
|
|
+var errOfferDropped = std_errors.New("offer dropped")
|
|
|
+
|
|
|
// Offer enqueues the client offer and blocks until it is matched with a
|
|
|
// returned announcement or ctx is done. The caller must not mutate the offer
|
|
|
// or its properties after calling Announce.
|
|
|
@@ -546,6 +560,11 @@ func (m *Matcher) Offer(
|
|
|
offerEntry.getMatchMetrics(), errors.TraceNew("no answer")
|
|
|
}
|
|
|
|
|
|
+ if proxyAnswerInfo.offerDropped {
|
|
|
+ return nil, nil,
|
|
|
+ offerEntry.getMatchMetrics(), errOfferDropped
|
|
|
+ }
|
|
|
+
|
|
|
// This is a sanity check and not expected to fail.
|
|
|
if !proxyAnswerInfo.answer.ConnectionID.Equal(
|
|
|
proxyAnswerInfo.announcement.ConnectionID) {
|
|
|
@@ -559,6 +578,8 @@ func (m *Matcher) Offer(
|
|
|
nil
|
|
|
}
|
|
|
|
|
|
+var errNoPendingAnswer = std_errors.New("no pending answer")
|
|
|
+
|
|
|
// AnnouncementHasPersonalCompartmentIDs looks for a pending answer for an
|
|
|
// announcement identified by the specified proxy ID and connection ID and
|
|
|
// returns whether the announcement has personal compartment IDs, indicating
|
|
|
@@ -573,7 +594,7 @@ func (m *Matcher) AnnouncementHasPersonalCompartmentIDs(
|
|
|
if !ok {
|
|
|
// The input IDs don't correspond to a pending answer, or the client
|
|
|
// is no longer awaiting the response.
|
|
|
- return false, errors.TraceNew("no pending answer")
|
|
|
+ return false, errors.Trace(errNoPendingAnswer)
|
|
|
}
|
|
|
|
|
|
pendingAnswer := pendingAnswerValue.(*pendingAnswer)
|
|
|
@@ -599,7 +620,7 @@ func (m *Matcher) Answer(
|
|
|
if !ok {
|
|
|
// The input IDs don't correspond to a pending answer, or the client
|
|
|
// is no longer awaiting the response.
|
|
|
- return errors.TraceNew("no pending answer")
|
|
|
+ return errors.Trace(errNoPendingAnswer)
|
|
|
}
|
|
|
|
|
|
m.pendingAnswers.Delete(key)
|
|
|
@@ -677,15 +698,30 @@ func (m *Matcher) matchAllOffers() {
|
|
|
|
|
|
offerEntry := offer.Value.(*offerEntry)
|
|
|
|
|
|
- // Skip and remove this offer if its deadline has already passed.
|
|
|
- // There is no signal to the awaiting Offer function, as it will exit
|
|
|
- // based on the same ctx.
|
|
|
+ // Skip and remove this offer if its deadline has already passed or
|
|
|
+ // the context is canceled. There is no signal to the awaiting Offer
|
|
|
+ // function, as it will exit based on the same ctx.
|
|
|
|
|
|
if offerEntry.ctx.Err() != nil {
|
|
|
m.removeOfferEntry(false, offerEntry)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
+ offerDeadline, _ := offerEntry.ctx.Deadline()
|
|
|
+ untilOfferDeadline := time.Until(offerDeadline)
|
|
|
+
|
|
|
+ // Drop this offer if it no longer has a sufficient remaining deadline
|
|
|
+ // for the proxy answer phase. This case signals Offer's answerChan
|
|
|
+ // so it can return immediately.
|
|
|
+
|
|
|
+ if m.offerMinimumDeadline > 0 &&
|
|
|
+ untilOfferDeadline < m.offerMinimumDeadline {
|
|
|
+
|
|
|
+ m.removeOfferEntry(false, offerEntry)
|
|
|
+ offerEntry.answerChan <- &answerInfo{offerDropped: true}
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
announcementEntry, announcementMatchIndex := m.matchOffer(offerEntry)
|
|
|
if announcementEntry == nil {
|
|
|
continue
|
|
|
@@ -698,6 +734,7 @@ func (m *Matcher) matchAllOffers() {
|
|
|
// were inspected before matching.
|
|
|
|
|
|
matchMetrics := &MatchMetrics{
|
|
|
+ OfferDeadline: untilOfferDeadline,
|
|
|
OfferMatchIndex: offerIndex,
|
|
|
OfferQueueSize: m.offerQueue.Len(),
|
|
|
AnnouncementMatchIndex: announcementMatchIndex,
|