agent.go 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281
  1. // Package ice implements the Interactive Connectivity Establishment (ICE)
  2. // protocol defined in rfc5245.
  3. package ice
  4. import (
  5. "context"
  6. "fmt"
  7. "net"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. atomicx "github.com/pion/ice/v2/internal/atomic"
  13. stunx "github.com/pion/ice/v2/internal/stun"
  14. "github.com/pion/logging"
  15. "github.com/pion/mdns"
  16. "github.com/pion/stun"
  17. "github.com/pion/transport/v2"
  18. "github.com/pion/transport/v2/packetio"
  19. "github.com/pion/transport/v2/stdnet"
  20. "github.com/pion/transport/v2/vnet"
  21. "golang.org/x/net/proxy"
  22. )
  23. type bindingRequest struct {
  24. timestamp time.Time
  25. transactionID [stun.TransactionIDSize]byte
  26. destination net.Addr
  27. isUseCandidate bool
  28. }
  29. // Agent represents the ICE agent
  30. type Agent struct {
  31. chanTask chan task
  32. afterRunFn []func(ctx context.Context)
  33. muAfterRun sync.Mutex
  34. onConnectionStateChangeHdlr atomic.Value // func(ConnectionState)
  35. onSelectedCandidatePairChangeHdlr atomic.Value // func(Candidate, Candidate)
  36. onCandidateHdlr atomic.Value // func(Candidate)
  37. // State owned by the taskLoop
  38. onConnected chan struct{}
  39. onConnectedOnce sync.Once
  40. // force candidate to be contacted immediately (instead of waiting for task ticker)
  41. forceCandidateContact chan bool
  42. tieBreaker uint64
  43. lite bool
  44. connectionState ConnectionState
  45. gatheringState GatheringState
  46. mDNSMode MulticastDNSMode
  47. mDNSName string
  48. mDNSConn *mdns.Conn
  49. muHaveStarted sync.Mutex
  50. startedCh <-chan struct{}
  51. startedFn func()
  52. isControlling bool
  53. maxBindingRequests uint16
  54. hostAcceptanceMinWait time.Duration
  55. srflxAcceptanceMinWait time.Duration
  56. prflxAcceptanceMinWait time.Duration
  57. relayAcceptanceMinWait time.Duration
  58. portMin uint16
  59. portMax uint16
  60. candidateTypes []CandidateType
  61. // How long connectivity checks can fail before the ICE Agent
  62. // goes to disconnected
  63. disconnectedTimeout time.Duration
  64. // How long connectivity checks can fail before the ICE Agent
  65. // goes to failed
  66. failedTimeout time.Duration
  67. // How often should we send keepalive packets?
  68. // 0 means never
  69. keepaliveInterval time.Duration
  70. // How often should we run our internal taskLoop to check for state changes when connecting
  71. checkInterval time.Duration
  72. localUfrag string
  73. localPwd string
  74. localCandidates map[NetworkType][]Candidate
  75. remoteUfrag string
  76. remotePwd string
  77. remoteCandidates map[NetworkType][]Candidate
  78. checklist []*CandidatePair
  79. selector pairCandidateSelector
  80. selectedPair atomic.Value // *CandidatePair
  81. urls []*URL
  82. networkTypes []NetworkType
  83. buf *packetio.Buffer
  84. // LRU of outbound Binding request Transaction IDs
  85. pendingBindingRequests []bindingRequest
  86. // 1:1 D-NAT IP address mapping
  87. extIPMapper *externalIPMapper
  88. // State for closing
  89. done chan struct{}
  90. taskLoopDone chan struct{}
  91. err atomicx.Error
  92. gatherCandidateCancel func()
  93. gatherCandidateDone chan struct{}
  94. chanCandidate chan Candidate
  95. chanCandidatePair chan *CandidatePair
  96. chanState chan ConnectionState
  97. loggerFactory logging.LoggerFactory
  98. log logging.LeveledLogger
  99. net transport.Net
  100. tcpMux TCPMux
  101. udpMux UDPMux
  102. udpMuxSrflx UniversalUDPMux
  103. interfaceFilter func(string) bool
  104. ipFilter func(net.IP) bool
  105. includeLoopback bool
  106. insecureSkipVerify bool
  107. proxyDialer proxy.Dialer
  108. }
  109. type task struct {
  110. fn func(context.Context, *Agent)
  111. done chan struct{}
  112. }
  113. // afterRun registers function to be run after the task.
  114. func (a *Agent) afterRun(f func(context.Context)) {
  115. a.muAfterRun.Lock()
  116. a.afterRunFn = append(a.afterRunFn, f)
  117. a.muAfterRun.Unlock()
  118. }
  119. func (a *Agent) getAfterRunFn() []func(context.Context) {
  120. a.muAfterRun.Lock()
  121. defer a.muAfterRun.Unlock()
  122. fns := a.afterRunFn
  123. a.afterRunFn = nil
  124. return fns
  125. }
  126. func (a *Agent) ok() error {
  127. select {
  128. case <-a.done:
  129. return a.getErr()
  130. default:
  131. }
  132. return nil
  133. }
  134. func (a *Agent) getErr() error {
  135. if err := a.err.Load(); err != nil {
  136. return err
  137. }
  138. return ErrClosed
  139. }
  140. // Run task in serial. Blocking tasks must be cancelable by context.
  141. func (a *Agent) run(ctx context.Context, t func(context.Context, *Agent)) error {
  142. if err := a.ok(); err != nil {
  143. return err
  144. }
  145. done := make(chan struct{})
  146. select {
  147. case <-ctx.Done():
  148. return ctx.Err()
  149. case a.chanTask <- task{t, done}:
  150. <-done
  151. return nil
  152. }
  153. }
  154. // taskLoop handles registered tasks and agent close.
  155. func (a *Agent) taskLoop() {
  156. after := func() {
  157. for {
  158. // Get and run func registered by afterRun().
  159. fns := a.getAfterRunFn()
  160. if len(fns) == 0 {
  161. break
  162. }
  163. for _, fn := range fns {
  164. fn(a.context())
  165. }
  166. }
  167. }
  168. defer func() {
  169. a.deleteAllCandidates()
  170. a.startedFn()
  171. if err := a.buf.Close(); err != nil {
  172. a.log.Warnf("failed to close buffer: %v", err)
  173. }
  174. a.closeMulticastConn()
  175. a.updateConnectionState(ConnectionStateClosed)
  176. after()
  177. close(a.chanState)
  178. close(a.chanCandidate)
  179. close(a.chanCandidatePair)
  180. close(a.taskLoopDone)
  181. }()
  182. for {
  183. select {
  184. case <-a.done:
  185. return
  186. case t := <-a.chanTask:
  187. t.fn(a.context(), a)
  188. close(t.done)
  189. after()
  190. }
  191. }
  192. }
  193. // NewAgent creates a new Agent
  194. func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
  195. var err error
  196. if config.PortMax < config.PortMin {
  197. return nil, ErrPort
  198. }
  199. mDNSName := config.MulticastDNSHostName
  200. if mDNSName == "" {
  201. if mDNSName, err = generateMulticastDNSName(); err != nil {
  202. return nil, err
  203. }
  204. }
  205. if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {
  206. return nil, ErrInvalidMulticastDNSHostName
  207. }
  208. mDNSMode := config.MulticastDNSMode
  209. if mDNSMode == 0 {
  210. mDNSMode = MulticastDNSModeQueryOnly
  211. }
  212. loggerFactory := config.LoggerFactory
  213. if loggerFactory == nil {
  214. loggerFactory = logging.NewDefaultLoggerFactory()
  215. }
  216. log := loggerFactory.NewLogger("ice")
  217. startedCtx, startedFn := context.WithCancel(context.Background())
  218. a := &Agent{
  219. chanTask: make(chan task),
  220. chanState: make(chan ConnectionState),
  221. chanCandidate: make(chan Candidate),
  222. chanCandidatePair: make(chan *CandidatePair),
  223. tieBreaker: globalMathRandomGenerator.Uint64(),
  224. lite: config.Lite,
  225. gatheringState: GatheringStateNew,
  226. connectionState: ConnectionStateNew,
  227. localCandidates: make(map[NetworkType][]Candidate),
  228. remoteCandidates: make(map[NetworkType][]Candidate),
  229. urls: config.Urls,
  230. networkTypes: config.NetworkTypes,
  231. onConnected: make(chan struct{}),
  232. buf: packetio.NewBuffer(),
  233. done: make(chan struct{}),
  234. taskLoopDone: make(chan struct{}),
  235. startedCh: startedCtx.Done(),
  236. startedFn: startedFn,
  237. portMin: config.PortMin,
  238. portMax: config.PortMax,
  239. loggerFactory: loggerFactory,
  240. log: log,
  241. net: config.Net,
  242. proxyDialer: config.ProxyDialer,
  243. mDNSMode: mDNSMode,
  244. mDNSName: mDNSName,
  245. gatherCandidateCancel: func() {},
  246. forceCandidateContact: make(chan bool, 1),
  247. interfaceFilter: config.InterfaceFilter,
  248. ipFilter: config.IPFilter,
  249. insecureSkipVerify: config.InsecureSkipVerify,
  250. includeLoopback: config.IncludeLoopback,
  251. }
  252. a.tcpMux = config.TCPMux
  253. if a.tcpMux == nil {
  254. a.tcpMux = newInvalidTCPMux()
  255. }
  256. a.udpMux = config.UDPMux
  257. a.udpMuxSrflx = config.UDPMuxSrflx
  258. if a.net == nil {
  259. a.net, err = stdnet.NewNet()
  260. if err != nil {
  261. return nil, fmt.Errorf("failed to create network: %w", err)
  262. }
  263. } else if _, isVirtual := a.net.(*vnet.Net); isVirtual {
  264. a.log.Warn("virtual network is enabled")
  265. if a.mDNSMode != MulticastDNSModeDisabled {
  266. a.log.Warn("virtual network does not support mDNS yet")
  267. }
  268. }
  269. // Opportunistic mDNS: If we can't open the connection, that's ok: we
  270. // can continue without it.
  271. if a.mDNSConn, a.mDNSMode, err = createMulticastDNS(a.net, mDNSMode, mDNSName, log); err != nil {
  272. log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)
  273. }
  274. closeMDNSConn := func() {
  275. if a.mDNSConn != nil {
  276. if mdnsCloseErr := a.mDNSConn.Close(); mdnsCloseErr != nil {
  277. log.Warnf("Failed to close mDNS: %v", mdnsCloseErr)
  278. }
  279. }
  280. }
  281. config.initWithDefaults(a)
  282. // Make sure the buffer doesn't grow indefinitely.
  283. // NOTE: We actually won't get anywhere close to this limit.
  284. // SRTP will constantly read from the endpoint and drop packets if it's full.
  285. a.buf.SetLimitSize(maxBufferSize)
  286. if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {
  287. closeMDNSConn()
  288. return nil, ErrLiteUsingNonHostCandidates
  289. }
  290. if config.Urls != nil && len(config.Urls) > 0 && !containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) && !containsCandidateType(CandidateTypeRelay, a.candidateTypes) {
  291. closeMDNSConn()
  292. return nil, ErrUselessUrlsProvided
  293. }
  294. if err = config.initExtIPMapping(a); err != nil {
  295. closeMDNSConn()
  296. return nil, err
  297. }
  298. go a.taskLoop()
  299. a.startOnConnectionStateChangeRoutine()
  300. // Restart is also used to initialize the agent for the first time
  301. if err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
  302. closeMDNSConn()
  303. _ = a.Close()
  304. return nil, err
  305. }
  306. return a, nil
  307. }
  308. // OnConnectionStateChange sets a handler that is fired when the connection state changes
  309. func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
  310. a.onConnectionStateChangeHdlr.Store(f)
  311. return nil
  312. }
  313. // OnSelectedCandidatePairChange sets a handler that is fired when the final candidate
  314. // pair is selected
  315. func (a *Agent) OnSelectedCandidatePairChange(f func(Candidate, Candidate)) error {
  316. a.onSelectedCandidatePairChangeHdlr.Store(f)
  317. return nil
  318. }
  319. // OnCandidate sets a handler that is fired when new candidates gathered. When
  320. // the gathering process complete the last candidate is nil.
  321. func (a *Agent) OnCandidate(f func(Candidate)) error {
  322. a.onCandidateHdlr.Store(f)
  323. return nil
  324. }
  325. func (a *Agent) onSelectedCandidatePairChange(p *CandidatePair) {
  326. if h, ok := a.onSelectedCandidatePairChangeHdlr.Load().(func(Candidate, Candidate)); ok {
  327. h(p.Local, p.Remote)
  328. }
  329. }
  330. func (a *Agent) onCandidate(c Candidate) {
  331. if onCandidateHdlr, ok := a.onCandidateHdlr.Load().(func(Candidate)); ok {
  332. onCandidateHdlr(c)
  333. }
  334. }
  335. func (a *Agent) onConnectionStateChange(s ConnectionState) {
  336. if hdlr, ok := a.onConnectionStateChangeHdlr.Load().(func(ConnectionState)); ok {
  337. hdlr(s)
  338. }
  339. }
  340. func (a *Agent) startOnConnectionStateChangeRoutine() {
  341. go func() {
  342. for {
  343. // CandidatePair and ConnectionState are usually changed at once.
  344. // Blocking one by the other one causes deadlock.
  345. p, isOpen := <-a.chanCandidatePair
  346. if !isOpen {
  347. return
  348. }
  349. a.onSelectedCandidatePairChange(p)
  350. }
  351. }()
  352. go func() {
  353. for {
  354. select {
  355. case s, isOpen := <-a.chanState:
  356. if !isOpen {
  357. for c := range a.chanCandidate {
  358. a.onCandidate(c)
  359. }
  360. return
  361. }
  362. go a.onConnectionStateChange(s)
  363. case c, isOpen := <-a.chanCandidate:
  364. if !isOpen {
  365. for s := range a.chanState {
  366. go a.onConnectionStateChange(s)
  367. }
  368. return
  369. }
  370. a.onCandidate(c)
  371. }
  372. }
  373. }()
  374. }
  375. func (a *Agent) startConnectivityChecks(isControlling bool, remoteUfrag, remotePwd string) error {
  376. a.muHaveStarted.Lock()
  377. defer a.muHaveStarted.Unlock()
  378. select {
  379. case <-a.startedCh:
  380. return ErrMultipleStart
  381. default:
  382. }
  383. if err := a.SetRemoteCredentials(remoteUfrag, remotePwd); err != nil { //nolint:contextcheck
  384. return err
  385. }
  386. a.log.Debugf("Started agent: isControlling? %t, remoteUfrag: %q, remotePwd: %q", isControlling, remoteUfrag, remotePwd)
  387. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  388. agent.isControlling = isControlling
  389. agent.remoteUfrag = remoteUfrag
  390. agent.remotePwd = remotePwd
  391. if isControlling {
  392. a.selector = &controllingSelector{agent: a, log: a.log}
  393. } else {
  394. a.selector = &controlledSelector{agent: a, log: a.log}
  395. }
  396. if a.lite {
  397. a.selector = &liteSelector{pairCandidateSelector: a.selector}
  398. }
  399. a.selector.Start()
  400. a.startedFn()
  401. agent.updateConnectionState(ConnectionStateChecking)
  402. a.requestConnectivityCheck()
  403. go a.connectivityChecks() //nolint:contextcheck
  404. })
  405. }
  406. func (a *Agent) connectivityChecks() {
  407. lastConnectionState := ConnectionState(0)
  408. checkingDuration := time.Time{}
  409. contact := func() {
  410. if err := a.run(a.context(), func(ctx context.Context, a *Agent) {
  411. defer func() {
  412. lastConnectionState = a.connectionState
  413. }()
  414. switch a.connectionState {
  415. case ConnectionStateFailed:
  416. // The connection is currently failed so don't send any checks
  417. // In the future it may be restarted though
  418. return
  419. case ConnectionStateChecking:
  420. // We have just entered checking for the first time so update our checking timer
  421. if lastConnectionState != a.connectionState {
  422. checkingDuration = time.Now()
  423. }
  424. // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
  425. if time.Since(checkingDuration) > a.disconnectedTimeout+a.failedTimeout {
  426. a.updateConnectionState(ConnectionStateFailed)
  427. return
  428. }
  429. }
  430. a.selector.ContactCandidates()
  431. }); err != nil {
  432. a.log.Warnf("taskLoop failed: %v", err)
  433. }
  434. }
  435. for {
  436. interval := defaultKeepaliveInterval
  437. updateInterval := func(x time.Duration) {
  438. if x != 0 && (interval == 0 || interval > x) {
  439. interval = x
  440. }
  441. }
  442. switch lastConnectionState {
  443. case ConnectionStateNew, ConnectionStateChecking: // While connecting, check candidates more frequently
  444. updateInterval(a.checkInterval)
  445. case ConnectionStateConnected, ConnectionStateDisconnected:
  446. updateInterval(a.keepaliveInterval)
  447. default:
  448. }
  449. // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
  450. updateInterval(a.disconnectedTimeout)
  451. updateInterval(a.failedTimeout)
  452. t := time.NewTimer(interval)
  453. select {
  454. case <-a.forceCandidateContact:
  455. t.Stop()
  456. contact()
  457. case <-t.C:
  458. contact()
  459. case <-a.done:
  460. t.Stop()
  461. return
  462. }
  463. }
  464. }
  465. func (a *Agent) updateConnectionState(newState ConnectionState) {
  466. if a.connectionState != newState {
  467. // Connection has gone to failed, release all gathered candidates
  468. if newState == ConnectionStateFailed {
  469. a.deleteAllCandidates()
  470. }
  471. a.log.Infof("Setting new connection state: %s", newState)
  472. a.connectionState = newState
  473. // Call handler after finishing current task since we may be holding the agent lock
  474. // and the handler may also require it
  475. a.afterRun(func(ctx context.Context) {
  476. a.chanState <- newState
  477. })
  478. }
  479. }
  480. func (a *Agent) setSelectedPair(p *CandidatePair) {
  481. if p == nil {
  482. var nilPair *CandidatePair
  483. a.selectedPair.Store(nilPair)
  484. a.log.Tracef("Unset selected candidate pair")
  485. return
  486. }
  487. p.nominated = true
  488. a.selectedPair.Store(p)
  489. a.log.Tracef("Set selected candidate pair: %s", p)
  490. a.updateConnectionState(ConnectionStateConnected)
  491. // Notify when the selected pair changes
  492. a.afterRun(func(ctx context.Context) {
  493. select {
  494. case a.chanCandidatePair <- p:
  495. case <-ctx.Done():
  496. }
  497. })
  498. // Signal connected
  499. a.onConnectedOnce.Do(func() { close(a.onConnected) })
  500. }
  501. func (a *Agent) pingAllCandidates() {
  502. a.log.Trace("pinging all candidates")
  503. if len(a.checklist) == 0 {
  504. a.log.Warn("pingAllCandidates called with no candidate pairs. Connection is not possible yet.")
  505. }
  506. for _, p := range a.checklist {
  507. if p.state == CandidatePairStateWaiting {
  508. p.state = CandidatePairStateInProgress
  509. } else if p.state != CandidatePairStateInProgress {
  510. continue
  511. }
  512. if p.bindingRequestCount > a.maxBindingRequests {
  513. a.log.Tracef("max requests reached for pair %s, marking it as failed", p)
  514. p.state = CandidatePairStateFailed
  515. } else {
  516. a.selector.PingCandidate(p.Local, p.Remote)
  517. p.bindingRequestCount++
  518. }
  519. }
  520. }
  521. func (a *Agent) getBestAvailableCandidatePair() *CandidatePair {
  522. var best *CandidatePair
  523. for _, p := range a.checklist {
  524. if p.state == CandidatePairStateFailed {
  525. continue
  526. }
  527. if best == nil {
  528. best = p
  529. } else if best.priority() < p.priority() {
  530. best = p
  531. }
  532. }
  533. return best
  534. }
  535. func (a *Agent) getBestValidCandidatePair() *CandidatePair {
  536. var best *CandidatePair
  537. for _, p := range a.checklist {
  538. if p.state != CandidatePairStateSucceeded {
  539. continue
  540. }
  541. if best == nil {
  542. best = p
  543. } else if best.priority() < p.priority() {
  544. best = p
  545. }
  546. }
  547. return best
  548. }
  549. func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
  550. p := newCandidatePair(local, remote, a.isControlling)
  551. a.checklist = append(a.checklist, p)
  552. return p
  553. }
  554. func (a *Agent) findPair(local, remote Candidate) *CandidatePair {
  555. for _, p := range a.checklist {
  556. if p.Local.Equal(local) && p.Remote.Equal(remote) {
  557. return p
  558. }
  559. }
  560. return nil
  561. }
  562. // validateSelectedPair checks if the selected pair is (still) valid
  563. // Note: the caller should hold the agent lock.
  564. func (a *Agent) validateSelectedPair() bool {
  565. selectedPair := a.getSelectedPair()
  566. if selectedPair == nil {
  567. return false
  568. }
  569. disconnectedTime := time.Since(selectedPair.Remote.LastReceived())
  570. // Only allow transitions to failed if a.failedTimeout is non-zero
  571. totalTimeToFailure := a.failedTimeout
  572. if totalTimeToFailure != 0 {
  573. totalTimeToFailure += a.disconnectedTimeout
  574. }
  575. switch {
  576. case totalTimeToFailure != 0 && disconnectedTime > totalTimeToFailure:
  577. a.updateConnectionState(ConnectionStateFailed)
  578. case a.disconnectedTimeout != 0 && disconnectedTime > a.disconnectedTimeout:
  579. a.updateConnectionState(ConnectionStateDisconnected)
  580. default:
  581. a.updateConnectionState(ConnectionStateConnected)
  582. }
  583. return true
  584. }
  585. // checkKeepalive sends STUN Binding Indications to the selected pair
  586. // if no packet has been sent on that pair in the last keepaliveInterval
  587. // Note: the caller should hold the agent lock.
  588. func (a *Agent) checkKeepalive() {
  589. selectedPair := a.getSelectedPair()
  590. if selectedPair == nil {
  591. return
  592. }
  593. if (a.keepaliveInterval != 0) &&
  594. ((time.Since(selectedPair.Local.LastSent()) > a.keepaliveInterval) ||
  595. (time.Since(selectedPair.Remote.LastReceived()) > a.keepaliveInterval)) {
  596. // we use binding request instead of indication to support refresh consent schemas
  597. // see https://tools.ietf.org/html/rfc7675
  598. a.selector.PingCandidate(selectedPair.Local, selectedPair.Remote)
  599. }
  600. }
  601. // AddRemoteCandidate adds a new remote candidate
  602. func (a *Agent) AddRemoteCandidate(c Candidate) error {
  603. if c == nil {
  604. return nil
  605. }
  606. // cannot check for network yet because it might not be applied
  607. // when mDNS hostname is used.
  608. if c.TCPType() == TCPTypeActive {
  609. // TCP Candidates with TCP type active will probe server passive ones, so
  610. // no need to do anything with them.
  611. a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
  612. return nil
  613. }
  614. // If we have a mDNS Candidate lets fully resolve it before adding it locally
  615. if c.Type() == CandidateTypeHost && strings.HasSuffix(c.Address(), ".local") {
  616. if a.mDNSMode == MulticastDNSModeDisabled {
  617. a.log.Warnf("remote mDNS candidate added, but mDNS is disabled: (%s)", c.Address())
  618. return nil
  619. }
  620. hostCandidate, ok := c.(*CandidateHost)
  621. if !ok {
  622. return ErrAddressParseFailed
  623. }
  624. go a.resolveAndAddMulticastCandidate(hostCandidate)
  625. return nil
  626. }
  627. go func() {
  628. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  629. agent.addRemoteCandidate(c)
  630. }); err != nil {
  631. a.log.Warnf("Failed to add remote candidate %s: %v", c.Address(), err)
  632. return
  633. }
  634. }()
  635. return nil
  636. }
  637. func (a *Agent) resolveAndAddMulticastCandidate(c *CandidateHost) {
  638. if a.mDNSConn == nil {
  639. return
  640. }
  641. _, src, err := a.mDNSConn.Query(c.context(), c.Address())
  642. if err != nil {
  643. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  644. return
  645. }
  646. ip, ipOk := parseMulticastAnswerAddr(src)
  647. if !ipOk {
  648. a.log.Warnf("Failed to discover mDNS candidate %s: failed to parse IP", c.Address())
  649. return
  650. }
  651. if err = c.setIP(ip); err != nil {
  652. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  653. return
  654. }
  655. if err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  656. agent.addRemoteCandidate(c)
  657. }); err != nil {
  658. a.log.Warnf("Failed to add mDNS candidate %s: %v", c.Address(), err)
  659. return
  660. }
  661. }
  662. func (a *Agent) requestConnectivityCheck() {
  663. select {
  664. case a.forceCandidateContact <- true:
  665. default:
  666. }
  667. }
  668. // addRemoteCandidate assumes you are holding the lock (must be execute using a.run)
  669. func (a *Agent) addRemoteCandidate(c Candidate) {
  670. set := a.remoteCandidates[c.NetworkType()]
  671. for _, candidate := range set {
  672. if candidate.Equal(c) {
  673. return
  674. }
  675. }
  676. set = append(set, c)
  677. a.remoteCandidates[c.NetworkType()] = set
  678. if localCandidates, ok := a.localCandidates[c.NetworkType()]; ok {
  679. for _, localCandidate := range localCandidates {
  680. a.addPair(localCandidate, c)
  681. }
  682. }
  683. a.requestConnectivityCheck()
  684. }
  685. func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net.PacketConn) error {
  686. return a.run(ctx, func(ctx context.Context, agent *Agent) {
  687. set := a.localCandidates[c.NetworkType()]
  688. for _, candidate := range set {
  689. if candidate.Equal(c) {
  690. a.log.Debugf("Ignore duplicate candidate: %s", c.String())
  691. if err := c.close(); err != nil {
  692. a.log.Warnf("Failed to close duplicate candidate: %v", err)
  693. }
  694. if err := candidateConn.Close(); err != nil {
  695. a.log.Warnf("Failed to close duplicate candidate connection: %v", err)
  696. }
  697. return
  698. }
  699. }
  700. c.start(a, candidateConn, a.startedCh)
  701. set = append(set, c)
  702. a.localCandidates[c.NetworkType()] = set
  703. if remoteCandidates, ok := a.remoteCandidates[c.NetworkType()]; ok {
  704. for _, remoteCandidate := range remoteCandidates {
  705. a.addPair(c, remoteCandidate)
  706. }
  707. }
  708. a.requestConnectivityCheck()
  709. a.chanCandidate <- c
  710. })
  711. }
  712. // GetLocalCandidates returns the local candidates
  713. func (a *Agent) GetLocalCandidates() ([]Candidate, error) {
  714. var res []Candidate
  715. err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  716. var candidates []Candidate
  717. for _, set := range agent.localCandidates {
  718. candidates = append(candidates, set...)
  719. }
  720. res = candidates
  721. })
  722. if err != nil {
  723. return nil, err
  724. }
  725. return res, nil
  726. }
  727. // GetLocalUserCredentials returns the local user credentials
  728. func (a *Agent) GetLocalUserCredentials() (frag string, pwd string, err error) {
  729. valSet := make(chan struct{})
  730. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  731. frag = agent.localUfrag
  732. pwd = agent.localPwd
  733. close(valSet)
  734. })
  735. if err == nil {
  736. <-valSet
  737. }
  738. return
  739. }
  740. // GetRemoteUserCredentials returns the remote user credentials
  741. func (a *Agent) GetRemoteUserCredentials() (frag string, pwd string, err error) {
  742. valSet := make(chan struct{})
  743. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  744. frag = agent.remoteUfrag
  745. pwd = agent.remotePwd
  746. close(valSet)
  747. })
  748. if err == nil {
  749. <-valSet
  750. }
  751. return
  752. }
  753. func (a *Agent) removeUfragFromMux() {
  754. a.tcpMux.RemoveConnByUfrag(a.localUfrag)
  755. if a.udpMux != nil {
  756. a.udpMux.RemoveConnByUfrag(a.localUfrag)
  757. }
  758. if a.udpMuxSrflx != nil {
  759. a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag)
  760. }
  761. }
  762. // Close cleans up the Agent
  763. func (a *Agent) Close() error {
  764. if err := a.ok(); err != nil {
  765. return err
  766. }
  767. a.afterRun(func(context.Context) {
  768. a.gatherCandidateCancel()
  769. if a.gatherCandidateDone != nil {
  770. <-a.gatherCandidateDone
  771. }
  772. })
  773. a.err.Store(ErrClosed)
  774. a.removeUfragFromMux()
  775. close(a.done)
  776. <-a.taskLoopDone
  777. return nil
  778. }
  779. // Remove all candidates. This closes any listening sockets
  780. // and removes both the local and remote candidate lists.
  781. //
  782. // This is used for restarts, failures and on close
  783. func (a *Agent) deleteAllCandidates() {
  784. for net, cs := range a.localCandidates {
  785. for _, c := range cs {
  786. if err := c.close(); err != nil {
  787. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  788. }
  789. }
  790. delete(a.localCandidates, net)
  791. }
  792. for net, cs := range a.remoteCandidates {
  793. for _, c := range cs {
  794. if err := c.close(); err != nil {
  795. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  796. }
  797. }
  798. delete(a.remoteCandidates, net)
  799. }
  800. }
  801. func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Candidate {
  802. ip, port, _, ok := parseAddr(addr)
  803. if !ok {
  804. a.log.Warnf("Error parsing addr: %s", addr)
  805. return nil
  806. }
  807. set := a.remoteCandidates[networkType]
  808. for _, c := range set {
  809. if c.Address() == ip.String() && c.Port() == port {
  810. return c
  811. }
  812. }
  813. return nil
  814. }
  815. func (a *Agent) sendBindingRequest(m *stun.Message, local, remote Candidate) {
  816. a.log.Tracef("ping STUN from %s to %s", local.String(), remote.String())
  817. a.invalidatePendingBindingRequests(time.Now())
  818. a.pendingBindingRequests = append(a.pendingBindingRequests, bindingRequest{
  819. timestamp: time.Now(),
  820. transactionID: m.TransactionID,
  821. destination: remote.addr(),
  822. isUseCandidate: m.Contains(stun.AttrUseCandidate),
  823. })
  824. a.sendSTUN(m, local, remote)
  825. }
  826. func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote Candidate) {
  827. base := remote
  828. ip, port, _, ok := parseAddr(base.addr())
  829. if !ok {
  830. a.log.Warnf("Error parsing addr: %s", base.addr())
  831. return
  832. }
  833. if out, err := stun.Build(m, stun.BindingSuccess,
  834. &stun.XORMappedAddress{
  835. IP: ip,
  836. Port: port,
  837. },
  838. stun.NewShortTermIntegrity(a.localPwd),
  839. stun.Fingerprint,
  840. ); err != nil {
  841. a.log.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err)
  842. } else {
  843. a.sendSTUN(out, local, remote)
  844. }
  845. }
  846. // Removes pending binding requests that are over maxBindingRequestTimeout old
  847. //
  848. // Let HTO be the transaction timeout, which SHOULD be 2*RTT if
  849. // RTT is known or 500 ms otherwise.
  850. // https://tools.ietf.org/html/rfc8445#appendix-B.1
  851. func (a *Agent) invalidatePendingBindingRequests(filterTime time.Time) {
  852. initialSize := len(a.pendingBindingRequests)
  853. temp := a.pendingBindingRequests[:0]
  854. for _, bindingRequest := range a.pendingBindingRequests {
  855. if filterTime.Sub(bindingRequest.timestamp) < maxBindingRequestTimeout {
  856. temp = append(temp, bindingRequest)
  857. }
  858. }
  859. a.pendingBindingRequests = temp
  860. if bindRequestsRemoved := initialSize - len(a.pendingBindingRequests); bindRequestsRemoved > 0 {
  861. a.log.Tracef("Discarded %d binding requests because they expired", bindRequestsRemoved)
  862. }
  863. }
  864. // Assert that the passed TransactionID is in our pendingBindingRequests and returns the destination
  865. // If the bindingRequest was valid remove it from our pending cache
  866. func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest) {
  867. a.invalidatePendingBindingRequests(time.Now())
  868. for i := range a.pendingBindingRequests {
  869. if a.pendingBindingRequests[i].transactionID == id {
  870. validBindingRequest := a.pendingBindingRequests[i]
  871. a.pendingBindingRequests = append(a.pendingBindingRequests[:i], a.pendingBindingRequests[i+1:]...)
  872. return true, &validBindingRequest
  873. }
  874. }
  875. return false, nil
  876. }
  877. // handleInbound processes STUN traffic from a remote candidate
  878. func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr) { //nolint:gocognit
  879. var err error
  880. if m == nil || local == nil {
  881. return
  882. }
  883. if m.Type.Method != stun.MethodBinding ||
  884. !(m.Type.Class == stun.ClassSuccessResponse ||
  885. m.Type.Class == stun.ClassRequest ||
  886. m.Type.Class == stun.ClassIndication) {
  887. a.log.Tracef("unhandled STUN from %s to %s class(%s) method(%s)", remote, local, m.Type.Class, m.Type.Method)
  888. return
  889. }
  890. if a.isControlling {
  891. if m.Contains(stun.AttrICEControlling) {
  892. a.log.Debug("inbound isControlling && a.isControlling == true")
  893. return
  894. } else if m.Contains(stun.AttrUseCandidate) {
  895. a.log.Debug("useCandidate && a.isControlling == true")
  896. return
  897. }
  898. } else {
  899. if m.Contains(stun.AttrICEControlled) {
  900. a.log.Debug("inbound isControlled && a.isControlling == false")
  901. return
  902. }
  903. }
  904. remoteCandidate := a.findRemoteCandidate(local.NetworkType(), remote)
  905. if m.Type.Class == stun.ClassSuccessResponse {
  906. if err = stun.MessageIntegrity([]byte(a.remotePwd)).Check(m); err != nil {
  907. a.log.Warnf("discard message from (%s), %v", remote, err)
  908. return
  909. }
  910. if remoteCandidate == nil {
  911. a.log.Warnf("discard success message from (%s), no such remote", remote)
  912. return
  913. }
  914. a.selector.HandleSuccessResponse(m, local, remoteCandidate, remote)
  915. } else if m.Type.Class == stun.ClassRequest {
  916. if err = stunx.AssertUsername(m, a.localUfrag+":"+a.remoteUfrag); err != nil {
  917. a.log.Warnf("discard message from (%s), %v", remote, err)
  918. return
  919. } else if err = stun.MessageIntegrity([]byte(a.localPwd)).Check(m); err != nil {
  920. a.log.Warnf("discard message from (%s), %v", remote, err)
  921. return
  922. }
  923. if remoteCandidate == nil {
  924. ip, port, networkType, ok := parseAddr(remote)
  925. if !ok {
  926. a.log.Errorf("Failed to create parse remote net.Addr when creating remote prflx candidate")
  927. return
  928. }
  929. prflxCandidateConfig := CandidatePeerReflexiveConfig{
  930. Network: networkType.String(),
  931. Address: ip.String(),
  932. Port: port,
  933. Component: local.Component(),
  934. RelAddr: "",
  935. RelPort: 0,
  936. }
  937. prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
  938. if err != nil {
  939. a.log.Errorf("Failed to create new remote prflx candidate (%s)", err)
  940. return
  941. }
  942. remoteCandidate = prflxCandidate
  943. a.log.Debugf("adding a new peer-reflexive candidate: %s ", remote)
  944. a.addRemoteCandidate(remoteCandidate)
  945. }
  946. a.log.Tracef("inbound STUN (Request) from %s to %s", remote.String(), local.String())
  947. a.selector.HandleBindingRequest(m, local, remoteCandidate)
  948. }
  949. if remoteCandidate != nil {
  950. remoteCandidate.seen(false)
  951. }
  952. }
  953. // validateNonSTUNTraffic processes non STUN traffic from a remote candidate,
  954. // and returns true if it is an actual remote candidate
  955. func (a *Agent) validateNonSTUNTraffic(local Candidate, remote net.Addr) bool {
  956. var isValidCandidate uint64
  957. if err := a.run(local.context(), func(ctx context.Context, agent *Agent) {
  958. remoteCandidate := a.findRemoteCandidate(local.NetworkType(), remote)
  959. if remoteCandidate != nil {
  960. remoteCandidate.seen(false)
  961. atomic.AddUint64(&isValidCandidate, 1)
  962. }
  963. }); err != nil {
  964. a.log.Warnf("failed to validate remote candidate: %v", err)
  965. }
  966. return atomic.LoadUint64(&isValidCandidate) == 1
  967. }
  968. // GetSelectedCandidatePair returns the selected pair or nil if there is none
  969. func (a *Agent) GetSelectedCandidatePair() (*CandidatePair, error) {
  970. selectedPair := a.getSelectedPair()
  971. if selectedPair == nil {
  972. return nil, nil //nolint:nilnil
  973. }
  974. local, err := selectedPair.Local.copy()
  975. if err != nil {
  976. return nil, err
  977. }
  978. remote, err := selectedPair.Remote.copy()
  979. if err != nil {
  980. return nil, err
  981. }
  982. return &CandidatePair{Local: local, Remote: remote}, nil
  983. }
  984. func (a *Agent) getSelectedPair() *CandidatePair {
  985. if selectedPair, ok := a.selectedPair.Load().(*CandidatePair); ok {
  986. return selectedPair
  987. }
  988. return nil
  989. }
  990. func (a *Agent) closeMulticastConn() {
  991. if a.mDNSConn != nil {
  992. if err := a.mDNSConn.Close(); err != nil {
  993. a.log.Warnf("failed to close mDNS Conn: %v", err)
  994. }
  995. }
  996. }
  997. // SetRemoteCredentials sets the credentials of the remote agent
  998. func (a *Agent) SetRemoteCredentials(remoteUfrag, remotePwd string) error {
  999. switch {
  1000. case remoteUfrag == "":
  1001. return ErrRemoteUfragEmpty
  1002. case remotePwd == "":
  1003. return ErrRemotePwdEmpty
  1004. }
  1005. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1006. agent.remoteUfrag = remoteUfrag
  1007. agent.remotePwd = remotePwd
  1008. })
  1009. }
  1010. // Restart restarts the ICE Agent with the provided ufrag/pwd
  1011. // If no ufrag/pwd is provided the Agent will generate one itself
  1012. //
  1013. // If there is a gatherer routine currently running, Restart will
  1014. // cancel it.
  1015. // After a Restart, the user must then call GatherCandidates explicitly
  1016. // to start generating new ones.
  1017. func (a *Agent) Restart(ufrag, pwd string) error {
  1018. if ufrag == "" {
  1019. var err error
  1020. ufrag, err = generateUFrag()
  1021. if err != nil {
  1022. return err
  1023. }
  1024. }
  1025. if pwd == "" {
  1026. var err error
  1027. pwd, err = generatePwd()
  1028. if err != nil {
  1029. return err
  1030. }
  1031. }
  1032. if len([]rune(ufrag))*8 < 24 {
  1033. return ErrLocalUfragInsufficientBits
  1034. }
  1035. if len([]rune(pwd))*8 < 128 {
  1036. return ErrLocalPwdInsufficientBits
  1037. }
  1038. var err error
  1039. if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1040. if agent.gatheringState == GatheringStateGathering {
  1041. agent.gatherCandidateCancel()
  1042. }
  1043. // Clear all agent needed to take back to fresh state
  1044. a.removeUfragFromMux()
  1045. agent.localUfrag = ufrag
  1046. agent.localPwd = pwd
  1047. agent.remoteUfrag = ""
  1048. agent.remotePwd = ""
  1049. a.gatheringState = GatheringStateNew
  1050. a.checklist = make([]*CandidatePair, 0)
  1051. a.pendingBindingRequests = make([]bindingRequest, 0)
  1052. a.setSelectedPair(nil)
  1053. a.deleteAllCandidates()
  1054. if a.selector != nil {
  1055. a.selector.Start()
  1056. }
  1057. // Restart is used by NewAgent. Accept/Connect should be used to move to checking
  1058. // for new Agents
  1059. if a.connectionState != ConnectionStateNew {
  1060. a.updateConnectionState(ConnectionStateChecking)
  1061. }
  1062. }); runErr != nil {
  1063. return runErr
  1064. }
  1065. return err
  1066. }
  1067. func (a *Agent) setGatheringState(newState GatheringState) error {
  1068. done := make(chan struct{})
  1069. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1070. if a.gatheringState != newState && newState == GatheringStateComplete {
  1071. a.chanCandidate <- nil
  1072. }
  1073. a.gatheringState = newState
  1074. close(done)
  1075. }); err != nil {
  1076. return err
  1077. }
  1078. <-done
  1079. return nil
  1080. }