burstobserver.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package burst
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/xtls/xray-core/app/observatory"
  6. "github.com/xtls/xray-core/common"
  7. "github.com/xtls/xray-core/common/errors"
  8. "github.com/xtls/xray-core/common/signal/done"
  9. "github.com/xtls/xray-core/core"
  10. "github.com/xtls/xray-core/features/extension"
  11. "github.com/xtls/xray-core/features/outbound"
  12. "github.com/xtls/xray-core/features/routing"
  13. "google.golang.org/protobuf/proto"
  14. )
  15. type Observer struct {
  16. config *Config
  17. ctx context.Context
  18. statusLock sync.Mutex
  19. hp *HealthPing
  20. finished *done.Instance
  21. ohm outbound.Manager
  22. }
  23. func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
  24. return &observatory.ObservationResult{Status: o.createResult()}, nil
  25. }
  26. func (o *Observer) createResult() []*observatory.OutboundStatus {
  27. var result []*observatory.OutboundStatus
  28. o.hp.access.Lock()
  29. defer o.hp.access.Unlock()
  30. for name, value := range o.hp.Results {
  31. status := observatory.OutboundStatus{
  32. Alive: value.getStatistics().All != value.getStatistics().Fail,
  33. Delay: value.getStatistics().Average.Milliseconds(),
  34. LastErrorReason: "",
  35. OutboundTag: name,
  36. LastSeenTime: 0,
  37. LastTryTime: 0,
  38. HealthPing: &observatory.HealthPingMeasurementResult{
  39. All: int64(value.getStatistics().All),
  40. Fail: int64(value.getStatistics().Fail),
  41. Deviation: int64(value.getStatistics().Deviation),
  42. Average: int64(value.getStatistics().Average),
  43. Max: int64(value.getStatistics().Max),
  44. Min: int64(value.getStatistics().Min),
  45. },
  46. }
  47. result = append(result, &status)
  48. }
  49. return result
  50. }
  51. func (o *Observer) Type() interface{} {
  52. return extension.ObservatoryType()
  53. }
  54. func (o *Observer) Start() error {
  55. if o.config != nil && len(o.config.SubjectSelector) != 0 {
  56. o.finished = done.New()
  57. o.hp.StartScheduler(func() ([]string, error) {
  58. hs, ok := o.ohm.(outbound.HandlerSelector)
  59. if !ok {
  60. return nil, errors.New("outbound.Manager is not a HandlerSelector")
  61. }
  62. outbounds := hs.Select(o.config.SubjectSelector)
  63. return outbounds, nil
  64. })
  65. }
  66. return nil
  67. }
  68. func (o *Observer) Close() error {
  69. if o.finished != nil {
  70. o.hp.StopScheduler()
  71. return o.finished.Close()
  72. }
  73. return nil
  74. }
  75. func New(ctx context.Context, config *Config) (*Observer, error) {
  76. var outboundManager outbound.Manager
  77. var dispatcher routing.Dispatcher
  78. err := core.RequireFeatures(ctx, func(om outbound.Manager, rd routing.Dispatcher) {
  79. outboundManager = om
  80. dispatcher = rd
  81. })
  82. if err != nil {
  83. return nil, errors.New("Cannot get depended features").Base(err)
  84. }
  85. hp := NewHealthPing(ctx, dispatcher, config.PingConfig)
  86. return &Observer{
  87. config: config,
  88. ctx: ctx,
  89. ohm: outboundManager,
  90. hp: hp,
  91. }, nil
  92. }
  93. func init() {
  94. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  95. return New(ctx, config.(*Config))
  96. }))
  97. }