Browse Source

VLESS Reverse Proxy: Check burstObservatory immediately after inbound adds new reverse-mux to reverse-outbound (#5752)

Fixes https://github.com/XTLS/Xray-core/issues/5750

---------

Co-authored-by: RPRX <[email protected]>
风扇滑翔翼 2 months ago
parent
commit
bb05684407

+ 4 - 0
app/observatory/burst/burstobserver.go

@@ -32,6 +32,10 @@ func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
 	return &observatory.ObservationResult{Status: o.createResult()}, nil
 	return &observatory.ObservationResult{Status: o.createResult()}, nil
 }
 }
 
 
+func (o *Observer) Check(tag []string) {
+	o.hp.Check(tag)
+}
+
 func (o *Observer) createResult() []*observatory.OutboundStatus {
 func (o *Observer) createResult() []*observatory.OutboundStatus {
 	var result []*observatory.OutboundStatus
 	var result []*observatory.OutboundStatus
 	o.hp.access.Lock()
 	o.hp.access.Lock()

+ 5 - 0
features/extension/observatory.go

@@ -13,6 +13,11 @@ type Observatory interface {
 	GetObservation(ctx context.Context) (proto.Message, error)
 	GetObservation(ctx context.Context) (proto.Message, error)
 }
 }
 
 
+type BurstObservatory interface {
+	Observatory
+	Check(tag []string)
+}
+
 func ObservatoryType() interface{} {
 func ObservatoryType() interface{} {
 	return (*Observatory)(nil)
 	return (*Observatory)(nil)
 }
 }

+ 9 - 2
proxy/vless/inbound/inbound.go

@@ -27,7 +27,9 @@ import (
 	"github.com/xtls/xray-core/common/signal"
 	"github.com/xtls/xray-core/common/signal"
 	"github.com/xtls/xray-core/common/task"
 	"github.com/xtls/xray-core/common/task"
 	"github.com/xtls/xray-core/core"
 	"github.com/xtls/xray-core/core"
+	"github.com/xtls/xray-core/features"
 	"github.com/xtls/xray-core/features/dns"
 	"github.com/xtls/xray-core/features/dns"
+	"github.com/xtls/xray-core/features/extension"
 	feature_inbound "github.com/xtls/xray-core/features/inbound"
 	feature_inbound "github.com/xtls/xray-core/features/inbound"
 	"github.com/xtls/xray-core/features/outbound"
 	"github.com/xtls/xray-core/features/outbound"
 	"github.com/xtls/xray-core/features/policy"
 	"github.com/xtls/xray-core/features/policy"
@@ -78,6 +80,7 @@ type Handler struct {
 	validator              vless.Validator
 	validator              vless.Validator
 	decryption             *encryption.ServerInstance
 	decryption             *encryption.ServerInstance
 	outboundHandlerManager outbound.Manager
 	outboundHandlerManager outbound.Manager
+	observer               features.Feature
 	defaultDispatcher      routing.Dispatcher
 	defaultDispatcher      routing.Dispatcher
 	ctx                    context.Context
 	ctx                    context.Context
 	fallbacks              map[string]map[string]map[string]*Fallback // or nil
 	fallbacks              map[string]map[string]map[string]*Fallback // or nil
@@ -93,6 +96,7 @@ func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Val
 		stats:                  v.GetFeature(stats.ManagerType()).(stats.Manager),
 		stats:                  v.GetFeature(stats.ManagerType()).(stats.Manager),
 		validator:              validator,
 		validator:              validator,
 		outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
 		outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
+		observer:               v.GetFeature(extension.ObservatoryType()),
 		defaultDispatcher:      v.GetFeature(routing.DispatcherType()).(routing.Dispatcher),
 		defaultDispatcher:      v.GetFeature(routing.DispatcherType()).(routing.Dispatcher),
 		ctx:                    ctx,
 		ctx:                    ctx,
 	}
 	}
@@ -623,7 +627,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
-		return r.NewMux(ctx, dispatcher.WrapLink(ctx, h.policyManager, h.stats, &transport.Link{Reader: clientReader, Writer: clientWriter}))
+		return r.NewMux(ctx, dispatcher.WrapLink(ctx, h.policyManager, h.stats, &transport.Link{Reader: clientReader, Writer: clientWriter}), h.observer)
 	}
 	}
 
 
 	if err := dispatch.DispatchLink(ctx, request.Destination(), &transport.Link{
 	if err := dispatch.DispatchLink(ctx, request.Destination(), &transport.Link{
@@ -645,7 +649,7 @@ func (r *Reverse) Tag() string {
 	return r.tag
 	return r.tag
 }
 }
 
 
-func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error {
+func (r *Reverse) NewMux(ctx context.Context, link *transport.Link, observer features.Feature) error {
 	muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
 	muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
 	if err != nil {
 	if err != nil {
 		return errors.New("failed to create mux client worker").Base(err).AtWarning()
 		return errors.New("failed to create mux client worker").Base(err).AtWarning()
@@ -655,6 +659,9 @@ func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error {
 		return errors.New("failed to create portal worker").Base(err).AtWarning()
 		return errors.New("failed to create portal worker").Base(err).AtWarning()
 	}
 	}
 	r.picker.AddWorker(worker)
 	r.picker.AddWorker(worker)
+	if burstObs, ok := observer.(extension.BurstObservatory); ok {
+		go burstObs.Check([]string{r.Tag()})
+	}
 	select {
 	select {
 	case <-ctx.Done():
 	case <-ctx.Done():
 	case <-muxClient.WaitClosed():
 	case <-muxClient.WaitClosed():