agent.go 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. // Package ice implements the Interactive Connectivity Establishment (ICE)
  4. // protocol defined in rfc5245.
  5. package ice
  6. import (
  7. "context"
  8. "fmt"
  9. "net"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. atomicx "github.com/pion/ice/v2/internal/atomic"
  16. stunx "github.com/pion/ice/v2/internal/stun"
  17. "github.com/pion/logging"
  18. "github.com/pion/mdns"
  19. "github.com/pion/stun"
  20. "github.com/pion/transport/v2"
  21. "github.com/pion/transport/v2/packetio"
  22. "github.com/pion/transport/v2/stdnet"
  23. "github.com/pion/transport/v2/vnet"
  24. "golang.org/x/net/proxy"
  25. )
  26. type bindingRequest struct {
  27. timestamp time.Time
  28. transactionID [stun.TransactionIDSize]byte
  29. destination net.Addr
  30. isUseCandidate bool
  31. }
  32. // Agent represents the ICE agent
  33. type Agent struct {
  34. chanTask chan task
  35. afterRunFn []func(ctx context.Context)
  36. muAfterRun sync.Mutex
  37. onConnectionStateChangeHdlr atomic.Value // func(ConnectionState)
  38. onSelectedCandidatePairChangeHdlr atomic.Value // func(Candidate, Candidate)
  39. onCandidateHdlr atomic.Value // func(Candidate)
  40. // State owned by the taskLoop
  41. onConnected chan struct{}
  42. onConnectedOnce sync.Once
  43. // Force candidate to be contacted immediately (instead of waiting for task ticker)
  44. forceCandidateContact chan bool
  45. tieBreaker uint64
  46. lite bool
  47. connectionState ConnectionState
  48. gatheringState GatheringState
  49. mDNSMode MulticastDNSMode
  50. mDNSName string
  51. mDNSConn *mdns.Conn
  52. muHaveStarted sync.Mutex
  53. startedCh <-chan struct{}
  54. startedFn func()
  55. isControlling bool
  56. maxBindingRequests uint16
  57. hostAcceptanceMinWait time.Duration
  58. srflxAcceptanceMinWait time.Duration
  59. prflxAcceptanceMinWait time.Duration
  60. relayAcceptanceMinWait time.Duration
  61. tcpPriorityOffset uint16
  62. disableActiveTCP bool
  63. portMin uint16
  64. portMax uint16
  65. candidateTypes []CandidateType
  66. // How long connectivity checks can fail before the ICE Agent
  67. // goes to disconnected
  68. disconnectedTimeout time.Duration
  69. // How long connectivity checks can fail before the ICE Agent
  70. // goes to failed
  71. failedTimeout time.Duration
  72. // How often should we send keepalive packets?
  73. // 0 means never
  74. keepaliveInterval time.Duration
  75. // How often should we run our internal taskLoop to check for state changes when connecting
  76. checkInterval time.Duration
  77. localUfrag string
  78. localPwd string
  79. localCandidates map[NetworkType][]Candidate
  80. remoteUfrag string
  81. remotePwd string
  82. remoteCandidates map[NetworkType][]Candidate
  83. checklist []*CandidatePair
  84. selector pairCandidateSelector
  85. selectedPair atomic.Value // *CandidatePair
  86. urls []*stun.URI
  87. networkTypes []NetworkType
  88. buf *packetio.Buffer
  89. // LRU of outbound Binding request Transaction IDs
  90. pendingBindingRequests []bindingRequest
  91. // 1:1 D-NAT IP address mapping
  92. extIPMapper *externalIPMapper
  93. // State for closing
  94. done chan struct{}
  95. taskLoopDone chan struct{}
  96. err atomicx.Error
  97. gatherCandidateCancel func()
  98. gatherCandidateDone chan struct{}
  99. chanCandidate chan Candidate
  100. chanCandidatePair chan *CandidatePair
  101. chanState chan ConnectionState
  102. loggerFactory logging.LoggerFactory
  103. log logging.LeveledLogger
  104. net transport.Net
  105. tcpMux TCPMux
  106. udpMux UDPMux
  107. udpMuxSrflx UniversalUDPMux
  108. interfaceFilter func(string) bool
  109. ipFilter func(net.IP) bool
  110. includeLoopback bool
  111. insecureSkipVerify bool
  112. proxyDialer proxy.Dialer
  113. }
  114. type task struct {
  115. fn func(context.Context, *Agent)
  116. done chan struct{}
  117. }
  118. // afterRun registers function to be run after the task.
  119. func (a *Agent) afterRun(f func(context.Context)) {
  120. a.muAfterRun.Lock()
  121. a.afterRunFn = append(a.afterRunFn, f)
  122. a.muAfterRun.Unlock()
  123. }
  124. func (a *Agent) getAfterRunFn() []func(context.Context) {
  125. a.muAfterRun.Lock()
  126. defer a.muAfterRun.Unlock()
  127. fns := a.afterRunFn
  128. a.afterRunFn = nil
  129. return fns
  130. }
  131. func (a *Agent) ok() error {
  132. select {
  133. case <-a.done:
  134. return a.getErr()
  135. default:
  136. }
  137. return nil
  138. }
  139. func (a *Agent) getErr() error {
  140. if err := a.err.Load(); err != nil {
  141. return err
  142. }
  143. return ErrClosed
  144. }
  145. // Run task in serial. Blocking tasks must be cancelable by context.
  146. func (a *Agent) run(ctx context.Context, t func(context.Context, *Agent)) error {
  147. if err := a.ok(); err != nil {
  148. return err
  149. }
  150. done := make(chan struct{})
  151. select {
  152. case <-ctx.Done():
  153. return ctx.Err()
  154. case a.chanTask <- task{t, done}:
  155. <-done
  156. return nil
  157. }
  158. }
  159. // taskLoop handles registered tasks and agent close.
  160. func (a *Agent) taskLoop() {
  161. after := func() {
  162. for {
  163. // Get and run func registered by afterRun().
  164. fns := a.getAfterRunFn()
  165. if len(fns) == 0 {
  166. break
  167. }
  168. for _, fn := range fns {
  169. fn(a.context())
  170. }
  171. }
  172. }
  173. defer func() {
  174. a.deleteAllCandidates()
  175. a.startedFn()
  176. if err := a.buf.Close(); err != nil {
  177. a.log.Warnf("Failed to close buffer: %v", err)
  178. }
  179. a.closeMulticastConn()
  180. a.updateConnectionState(ConnectionStateClosed)
  181. after()
  182. close(a.chanState)
  183. close(a.chanCandidate)
  184. close(a.chanCandidatePair)
  185. close(a.taskLoopDone)
  186. }()
  187. for {
  188. select {
  189. case <-a.done:
  190. return
  191. case t := <-a.chanTask:
  192. t.fn(a.context(), a)
  193. close(t.done)
  194. after()
  195. }
  196. }
  197. }
  198. // NewAgent creates a new Agent
  199. func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
  200. var err error
  201. if config.PortMax < config.PortMin {
  202. return nil, ErrPort
  203. }
  204. mDNSName := config.MulticastDNSHostName
  205. if mDNSName == "" {
  206. if mDNSName, err = generateMulticastDNSName(); err != nil {
  207. return nil, err
  208. }
  209. }
  210. if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {
  211. return nil, ErrInvalidMulticastDNSHostName
  212. }
  213. mDNSMode := config.MulticastDNSMode
  214. if mDNSMode == 0 {
  215. mDNSMode = MulticastDNSModeQueryOnly
  216. }
  217. loggerFactory := config.LoggerFactory
  218. if loggerFactory == nil {
  219. loggerFactory = logging.NewDefaultLoggerFactory()
  220. }
  221. log := loggerFactory.NewLogger("ice")
  222. startedCtx, startedFn := context.WithCancel(context.Background())
  223. a := &Agent{
  224. chanTask: make(chan task),
  225. chanState: make(chan ConnectionState),
  226. chanCandidate: make(chan Candidate),
  227. chanCandidatePair: make(chan *CandidatePair),
  228. tieBreaker: globalMathRandomGenerator.Uint64(),
  229. lite: config.Lite,
  230. gatheringState: GatheringStateNew,
  231. connectionState: ConnectionStateNew,
  232. localCandidates: make(map[NetworkType][]Candidate),
  233. remoteCandidates: make(map[NetworkType][]Candidate),
  234. urls: config.Urls,
  235. networkTypes: config.NetworkTypes,
  236. onConnected: make(chan struct{}),
  237. buf: packetio.NewBuffer(),
  238. done: make(chan struct{}),
  239. taskLoopDone: make(chan struct{}),
  240. startedCh: startedCtx.Done(),
  241. startedFn: startedFn,
  242. portMin: config.PortMin,
  243. portMax: config.PortMax,
  244. loggerFactory: loggerFactory,
  245. log: log,
  246. net: config.Net,
  247. proxyDialer: config.ProxyDialer,
  248. tcpMux: config.TCPMux,
  249. udpMux: config.UDPMux,
  250. udpMuxSrflx: config.UDPMuxSrflx,
  251. mDNSMode: mDNSMode,
  252. mDNSName: mDNSName,
  253. gatherCandidateCancel: func() {},
  254. forceCandidateContact: make(chan bool, 1),
  255. interfaceFilter: config.InterfaceFilter,
  256. ipFilter: config.IPFilter,
  257. insecureSkipVerify: config.InsecureSkipVerify,
  258. includeLoopback: config.IncludeLoopback,
  259. disableActiveTCP: config.DisableActiveTCP,
  260. }
  261. if a.net == nil {
  262. a.net, err = stdnet.NewNet()
  263. if err != nil {
  264. return nil, fmt.Errorf("failed to create network: %w", err)
  265. }
  266. } else if _, isVirtual := a.net.(*vnet.Net); isVirtual {
  267. a.log.Warn("Virtual network is enabled")
  268. if a.mDNSMode != MulticastDNSModeDisabled {
  269. a.log.Warn("Virtual network does not support mDNS yet")
  270. }
  271. }
  272. // Opportunistic mDNS: If we can't open the connection, that's ok: we
  273. // can continue without it.
  274. if a.mDNSConn, a.mDNSMode, err = createMulticastDNS(a.net, mDNSMode, mDNSName, log); err != nil {
  275. log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)
  276. }
  277. config.initWithDefaults(a)
  278. // Make sure the buffer doesn't grow indefinitely.
  279. // NOTE: We actually won't get anywhere close to this limit.
  280. // SRTP will constantly read from the endpoint and drop packets if it's full.
  281. a.buf.SetLimitSize(maxBufferSize)
  282. if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {
  283. a.closeMulticastConn()
  284. return nil, ErrLiteUsingNonHostCandidates
  285. }
  286. if config.Urls != nil && len(config.Urls) > 0 && !containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) && !containsCandidateType(CandidateTypeRelay, a.candidateTypes) {
  287. a.closeMulticastConn()
  288. return nil, ErrUselessUrlsProvided
  289. }
  290. if err = config.initExtIPMapping(a); err != nil {
  291. a.closeMulticastConn()
  292. return nil, err
  293. }
  294. go a.taskLoop()
  295. // CandidatePair and ConnectionState are usually changed at once.
  296. // Blocking one by the other one causes deadlock.
  297. // Hence, we call handlers from independent Goroutines.
  298. go a.candidatePairRoutine()
  299. go a.connectionStateRoutine()
  300. go a.candidateRoutine()
  301. // Restart is also used to initialize the agent for the first time
  302. if err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
  303. a.closeMulticastConn()
  304. _ = a.Close()
  305. return nil, err
  306. }
  307. return a, nil
  308. }
  309. func (a *Agent) startConnectivityChecks(isControlling bool, remoteUfrag, remotePwd string) error {
  310. a.muHaveStarted.Lock()
  311. defer a.muHaveStarted.Unlock()
  312. select {
  313. case <-a.startedCh:
  314. return ErrMultipleStart
  315. default:
  316. }
  317. if err := a.SetRemoteCredentials(remoteUfrag, remotePwd); err != nil { //nolint:contextcheck
  318. return err
  319. }
  320. a.log.Debugf("Started agent: isControlling? %t, remoteUfrag: %q, remotePwd: %q", isControlling, remoteUfrag, remotePwd)
  321. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  322. agent.isControlling = isControlling
  323. agent.remoteUfrag = remoteUfrag
  324. agent.remotePwd = remotePwd
  325. if isControlling {
  326. a.selector = &controllingSelector{agent: a, log: a.log}
  327. } else {
  328. a.selector = &controlledSelector{agent: a, log: a.log}
  329. }
  330. if a.lite {
  331. a.selector = &liteSelector{pairCandidateSelector: a.selector}
  332. }
  333. a.selector.Start()
  334. a.startedFn()
  335. agent.updateConnectionState(ConnectionStateChecking)
  336. a.requestConnectivityCheck()
  337. go a.connectivityChecks() //nolint:contextcheck
  338. })
  339. }
  340. func (a *Agent) connectivityChecks() {
  341. lastConnectionState := ConnectionState(0)
  342. checkingDuration := time.Time{}
  343. contact := func() {
  344. if err := a.run(a.context(), func(ctx context.Context, a *Agent) {
  345. defer func() {
  346. lastConnectionState = a.connectionState
  347. }()
  348. switch a.connectionState {
  349. case ConnectionStateFailed:
  350. // The connection is currently failed so don't send any checks
  351. // In the future it may be restarted though
  352. return
  353. case ConnectionStateChecking:
  354. // We have just entered checking for the first time so update our checking timer
  355. if lastConnectionState != a.connectionState {
  356. checkingDuration = time.Now()
  357. }
  358. // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
  359. if time.Since(checkingDuration) > a.disconnectedTimeout+a.failedTimeout {
  360. a.updateConnectionState(ConnectionStateFailed)
  361. return
  362. }
  363. default:
  364. }
  365. a.selector.ContactCandidates()
  366. }); err != nil {
  367. a.log.Warnf("Failed to start connectivity checks: %v", err)
  368. }
  369. }
  370. for {
  371. interval := defaultKeepaliveInterval
  372. updateInterval := func(x time.Duration) {
  373. if x != 0 && (interval == 0 || interval > x) {
  374. interval = x
  375. }
  376. }
  377. switch lastConnectionState {
  378. case ConnectionStateNew, ConnectionStateChecking: // While connecting, check candidates more frequently
  379. updateInterval(a.checkInterval)
  380. case ConnectionStateConnected, ConnectionStateDisconnected:
  381. updateInterval(a.keepaliveInterval)
  382. default:
  383. }
  384. // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
  385. updateInterval(a.disconnectedTimeout)
  386. updateInterval(a.failedTimeout)
  387. t := time.NewTimer(interval)
  388. select {
  389. case <-a.forceCandidateContact:
  390. t.Stop()
  391. contact()
  392. case <-t.C:
  393. contact()
  394. case <-a.done:
  395. t.Stop()
  396. return
  397. }
  398. }
  399. }
  400. func (a *Agent) updateConnectionState(newState ConnectionState) {
  401. if a.connectionState != newState {
  402. // Connection has gone to failed, release all gathered candidates
  403. if newState == ConnectionStateFailed {
  404. a.removeUfragFromMux()
  405. a.checklist = make([]*CandidatePair, 0)
  406. a.pendingBindingRequests = make([]bindingRequest, 0)
  407. a.setSelectedPair(nil)
  408. a.deleteAllCandidates()
  409. }
  410. a.log.Infof("Setting new connection state: %s", newState)
  411. a.connectionState = newState
  412. // Call handler after finishing current task since we may be holding the agent lock
  413. // and the handler may also require it
  414. a.afterRun(func(ctx context.Context) {
  415. a.chanState <- newState
  416. })
  417. }
  418. }
  419. func (a *Agent) setSelectedPair(p *CandidatePair) {
  420. if p == nil {
  421. var nilPair *CandidatePair
  422. a.selectedPair.Store(nilPair)
  423. a.log.Tracef("Unset selected candidate pair")
  424. return
  425. }
  426. p.nominated = true
  427. a.selectedPair.Store(p)
  428. a.log.Tracef("Set selected candidate pair: %s", p)
  429. a.updateConnectionState(ConnectionStateConnected)
  430. // Notify when the selected pair changes
  431. a.afterRun(func(ctx context.Context) {
  432. select {
  433. case a.chanCandidatePair <- p:
  434. case <-ctx.Done():
  435. }
  436. })
  437. // Signal connected
  438. a.onConnectedOnce.Do(func() { close(a.onConnected) })
  439. }
  440. func (a *Agent) pingAllCandidates() {
  441. a.log.Trace("Pinging all candidates")
  442. if len(a.checklist) == 0 {
  443. a.log.Warn("Failed to ping without candidate pairs. Connection is not possible yet.")
  444. }
  445. for _, p := range a.checklist {
  446. if p.state == CandidatePairStateWaiting {
  447. p.state = CandidatePairStateInProgress
  448. } else if p.state != CandidatePairStateInProgress {
  449. continue
  450. }
  451. if p.bindingRequestCount > a.maxBindingRequests {
  452. a.log.Tracef("Maximum requests reached for pair %s, marking it as failed", p)
  453. p.state = CandidatePairStateFailed
  454. } else {
  455. a.selector.PingCandidate(p.Local, p.Remote)
  456. p.bindingRequestCount++
  457. }
  458. }
  459. }
  460. func (a *Agent) getBestAvailableCandidatePair() *CandidatePair {
  461. var best *CandidatePair
  462. for _, p := range a.checklist {
  463. if p.state == CandidatePairStateFailed {
  464. continue
  465. }
  466. if best == nil {
  467. best = p
  468. } else if best.priority() < p.priority() {
  469. best = p
  470. }
  471. }
  472. return best
  473. }
  474. func (a *Agent) getBestValidCandidatePair() *CandidatePair {
  475. var best *CandidatePair
  476. for _, p := range a.checklist {
  477. if p.state != CandidatePairStateSucceeded {
  478. continue
  479. }
  480. if best == nil {
  481. best = p
  482. } else if best.priority() < p.priority() {
  483. best = p
  484. }
  485. }
  486. return best
  487. }
  488. func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
  489. p := newCandidatePair(local, remote, a.isControlling)
  490. a.checklist = append(a.checklist, p)
  491. return p
  492. }
  493. func (a *Agent) findPair(local, remote Candidate) *CandidatePair {
  494. for _, p := range a.checklist {
  495. if p.Local.Equal(local) && p.Remote.Equal(remote) {
  496. return p
  497. }
  498. }
  499. return nil
  500. }
  501. // validateSelectedPair checks if the selected pair is (still) valid
  502. // Note: the caller should hold the agent lock.
  503. func (a *Agent) validateSelectedPair() bool {
  504. selectedPair := a.getSelectedPair()
  505. if selectedPair == nil {
  506. return false
  507. }
  508. disconnectedTime := time.Since(selectedPair.Remote.LastReceived())
  509. // Only allow transitions to failed if a.failedTimeout is non-zero
  510. totalTimeToFailure := a.failedTimeout
  511. if totalTimeToFailure != 0 {
  512. totalTimeToFailure += a.disconnectedTimeout
  513. }
  514. switch {
  515. case totalTimeToFailure != 0 && disconnectedTime > totalTimeToFailure:
  516. a.updateConnectionState(ConnectionStateFailed)
  517. case a.disconnectedTimeout != 0 && disconnectedTime > a.disconnectedTimeout:
  518. a.updateConnectionState(ConnectionStateDisconnected)
  519. default:
  520. a.updateConnectionState(ConnectionStateConnected)
  521. }
  522. return true
  523. }
  524. // checkKeepalive sends STUN Binding Indications to the selected pair
  525. // if no packet has been sent on that pair in the last keepaliveInterval
  526. // Note: the caller should hold the agent lock.
  527. func (a *Agent) checkKeepalive() {
  528. selectedPair := a.getSelectedPair()
  529. if selectedPair == nil {
  530. return
  531. }
  532. if (a.keepaliveInterval != 0) &&
  533. ((time.Since(selectedPair.Local.LastSent()) > a.keepaliveInterval) ||
  534. (time.Since(selectedPair.Remote.LastReceived()) > a.keepaliveInterval)) {
  535. // We use binding request instead of indication to support refresh consent schemas
  536. // see https://tools.ietf.org/html/rfc7675
  537. a.selector.PingCandidate(selectedPair.Local, selectedPair.Remote)
  538. }
  539. }
  540. // AddRemoteCandidate adds a new remote candidate
  541. func (a *Agent) AddRemoteCandidate(c Candidate) error {
  542. if c == nil {
  543. return nil
  544. }
  545. // TCP Candidates with TCP type active will probe server passive ones, so
  546. // no need to do anything with them.
  547. if c.TCPType() == TCPTypeActive {
  548. a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
  549. return nil
  550. }
  551. // If we have a mDNS Candidate lets fully resolve it before adding it locally
  552. if c.Type() == CandidateTypeHost && strings.HasSuffix(c.Address(), ".local") {
  553. if a.mDNSMode == MulticastDNSModeDisabled {
  554. a.log.Warnf("Remote mDNS candidate added, but mDNS is disabled: (%s)", c.Address())
  555. return nil
  556. }
  557. hostCandidate, ok := c.(*CandidateHost)
  558. if !ok {
  559. return ErrAddressParseFailed
  560. }
  561. go a.resolveAndAddMulticastCandidate(hostCandidate)
  562. return nil
  563. }
  564. go func() {
  565. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  566. // nolint: contextcheck
  567. agent.addRemoteCandidate(c)
  568. }); err != nil {
  569. a.log.Warnf("Failed to add remote candidate %s: %v", c.Address(), err)
  570. return
  571. }
  572. }()
  573. return nil
  574. }
  575. func (a *Agent) resolveAndAddMulticastCandidate(c *CandidateHost) {
  576. if a.mDNSConn == nil {
  577. return
  578. }
  579. _, src, err := a.mDNSConn.Query(c.context(), c.Address())
  580. if err != nil {
  581. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  582. return
  583. }
  584. ip, ipOk := parseMulticastAnswerAddr(src)
  585. if !ipOk {
  586. a.log.Warnf("Failed to discover mDNS candidate %s: failed to parse IP", c.Address())
  587. return
  588. }
  589. if err = c.setIP(ip); err != nil {
  590. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  591. return
  592. }
  593. if err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  594. // nolint: contextcheck
  595. agent.addRemoteCandidate(c)
  596. }); err != nil {
  597. a.log.Warnf("Failed to add mDNS candidate %s: %v", c.Address(), err)
  598. return
  599. }
  600. }
  601. func (a *Agent) requestConnectivityCheck() {
  602. select {
  603. case a.forceCandidateContact <- true:
  604. default:
  605. }
  606. }
  607. func (a *Agent) addRemotePassiveTCPCandidate(remoteCandidate Candidate) {
  608. localIPs, err := localInterfaces(a.net, a.interfaceFilter, a.ipFilter, []NetworkType{remoteCandidate.NetworkType()}, a.includeLoopback)
  609. if err != nil {
  610. a.log.Warnf("Failed to iterate local interfaces, host candidates will not be gathered %s", err)
  611. return
  612. }
  613. for i := range localIPs {
  614. conn := newActiveTCPConn(
  615. a.context(),
  616. net.JoinHostPort(localIPs[i].String(), "0"),
  617. net.JoinHostPort(remoteCandidate.Address(), strconv.Itoa(remoteCandidate.Port())),
  618. a.log,
  619. )
  620. tcpAddr, ok := conn.LocalAddr().(*net.TCPAddr)
  621. if !ok {
  622. closeConnAndLog(conn, a.log, "Failed to create Active ICE-TCP Candidate: %v", errInvalidAddress)
  623. continue
  624. }
  625. localCandidate, err := NewCandidateHost(&CandidateHostConfig{
  626. Network: remoteCandidate.NetworkType().String(),
  627. Address: localIPs[i].String(),
  628. Port: tcpAddr.Port,
  629. Component: ComponentRTP,
  630. TCPType: TCPTypeActive,
  631. })
  632. if err != nil {
  633. closeConnAndLog(conn, a.log, "Failed to create Active ICE-TCP Candidate: %v", err)
  634. continue
  635. }
  636. localCandidate.start(a, conn, a.startedCh)
  637. a.localCandidates[localCandidate.NetworkType()] = append(a.localCandidates[localCandidate.NetworkType()], localCandidate)
  638. a.chanCandidate <- localCandidate
  639. a.addPair(localCandidate, remoteCandidate)
  640. }
  641. }
  642. // addRemoteCandidate assumes you are holding the lock (must be execute using a.run)
  643. func (a *Agent) addRemoteCandidate(c Candidate) {
  644. set := a.remoteCandidates[c.NetworkType()]
  645. for _, candidate := range set {
  646. if candidate.Equal(c) {
  647. return
  648. }
  649. }
  650. tcpNetworkTypeFound := false
  651. for _, networkType := range a.networkTypes {
  652. if networkType.IsTCP() {
  653. tcpNetworkTypeFound = true
  654. }
  655. }
  656. if !a.disableActiveTCP && tcpNetworkTypeFound && c.TCPType() == TCPTypePassive {
  657. a.addRemotePassiveTCPCandidate(c)
  658. }
  659. set = append(set, c)
  660. a.remoteCandidates[c.NetworkType()] = set
  661. if c.TCPType() != TCPTypePassive {
  662. if localCandidates, ok := a.localCandidates[c.NetworkType()]; ok {
  663. for _, localCandidate := range localCandidates {
  664. a.addPair(localCandidate, c)
  665. }
  666. }
  667. }
  668. a.requestConnectivityCheck()
  669. }
  670. func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net.PacketConn) error {
  671. return a.run(ctx, func(ctx context.Context, agent *Agent) {
  672. set := a.localCandidates[c.NetworkType()]
  673. for _, candidate := range set {
  674. if candidate.Equal(c) {
  675. a.log.Debugf("Ignore duplicate candidate: %s", c.String())
  676. if err := c.close(); err != nil {
  677. a.log.Warnf("Failed to close duplicate candidate: %v", err)
  678. }
  679. if err := candidateConn.Close(); err != nil {
  680. a.log.Warnf("Failed to close duplicate candidate connection: %v", err)
  681. }
  682. return
  683. }
  684. }
  685. c.start(a, candidateConn, a.startedCh)
  686. set = append(set, c)
  687. a.localCandidates[c.NetworkType()] = set
  688. if remoteCandidates, ok := a.remoteCandidates[c.NetworkType()]; ok {
  689. for _, remoteCandidate := range remoteCandidates {
  690. a.addPair(c, remoteCandidate)
  691. }
  692. }
  693. a.requestConnectivityCheck()
  694. a.chanCandidate <- c
  695. })
  696. }
  697. // GetRemoteCandidates returns the remote candidates
  698. func (a *Agent) GetRemoteCandidates() ([]Candidate, error) {
  699. var res []Candidate
  700. err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  701. var candidates []Candidate
  702. for _, set := range agent.remoteCandidates {
  703. candidates = append(candidates, set...)
  704. }
  705. res = candidates
  706. })
  707. if err != nil {
  708. return nil, err
  709. }
  710. return res, nil
  711. }
  712. // GetLocalCandidates returns the local candidates
  713. func (a *Agent) GetLocalCandidates() ([]Candidate, error) {
  714. var res []Candidate
  715. err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  716. var candidates []Candidate
  717. for _, set := range agent.localCandidates {
  718. candidates = append(candidates, set...)
  719. }
  720. res = candidates
  721. })
  722. if err != nil {
  723. return nil, err
  724. }
  725. return res, nil
  726. }
  727. // GetLocalUserCredentials returns the local user credentials
  728. func (a *Agent) GetLocalUserCredentials() (frag string, pwd string, err error) {
  729. valSet := make(chan struct{})
  730. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  731. frag = agent.localUfrag
  732. pwd = agent.localPwd
  733. close(valSet)
  734. })
  735. if err == nil {
  736. <-valSet
  737. }
  738. return
  739. }
  740. // GetRemoteUserCredentials returns the remote user credentials
  741. func (a *Agent) GetRemoteUserCredentials() (frag string, pwd string, err error) {
  742. valSet := make(chan struct{})
  743. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  744. frag = agent.remoteUfrag
  745. pwd = agent.remotePwd
  746. close(valSet)
  747. })
  748. if err == nil {
  749. <-valSet
  750. }
  751. return
  752. }
  753. func (a *Agent) removeUfragFromMux() {
  754. if a.tcpMux != nil {
  755. a.tcpMux.RemoveConnByUfrag(a.localUfrag)
  756. }
  757. if a.udpMux != nil {
  758. a.udpMux.RemoveConnByUfrag(a.localUfrag)
  759. }
  760. if a.udpMuxSrflx != nil {
  761. a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag)
  762. }
  763. }
  764. // Close cleans up the Agent
  765. func (a *Agent) Close() error {
  766. if err := a.ok(); err != nil {
  767. return err
  768. }
  769. a.afterRun(func(context.Context) {
  770. a.gatherCandidateCancel()
  771. if a.gatherCandidateDone != nil {
  772. <-a.gatherCandidateDone
  773. }
  774. })
  775. a.err.Store(ErrClosed)
  776. a.removeUfragFromMux()
  777. close(a.done)
  778. <-a.taskLoopDone
  779. return nil
  780. }
  781. // Remove all candidates. This closes any listening sockets
  782. // and removes both the local and remote candidate lists.
  783. //
  784. // This is used for restarts, failures and on close
  785. func (a *Agent) deleteAllCandidates() {
  786. for net, cs := range a.localCandidates {
  787. for _, c := range cs {
  788. if err := c.close(); err != nil {
  789. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  790. }
  791. }
  792. delete(a.localCandidates, net)
  793. }
  794. for net, cs := range a.remoteCandidates {
  795. for _, c := range cs {
  796. if err := c.close(); err != nil {
  797. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  798. }
  799. }
  800. delete(a.remoteCandidates, net)
  801. }
  802. }
  803. func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Candidate {
  804. ip, port, _, ok := parseAddr(addr)
  805. if !ok {
  806. a.log.Warnf("Failed to parse address: %s", addr)
  807. return nil
  808. }
  809. set := a.remoteCandidates[networkType]
  810. for _, c := range set {
  811. if c.Address() == ip.String() && c.Port() == port {
  812. return c
  813. }
  814. }
  815. return nil
  816. }
  817. func (a *Agent) sendBindingRequest(m *stun.Message, local, remote Candidate) {
  818. a.log.Tracef("Ping STUN from %s to %s", local.String(), remote.String())
  819. a.invalidatePendingBindingRequests(time.Now())
  820. a.pendingBindingRequests = append(a.pendingBindingRequests, bindingRequest{
  821. timestamp: time.Now(),
  822. transactionID: m.TransactionID,
  823. destination: remote.addr(),
  824. isUseCandidate: m.Contains(stun.AttrUseCandidate),
  825. })
  826. a.sendSTUN(m, local, remote)
  827. }
  828. func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote Candidate) {
  829. base := remote
  830. ip, port, _, ok := parseAddr(base.addr())
  831. if !ok {
  832. a.log.Warnf("Failed to parse address: %s", base.addr())
  833. return
  834. }
  835. if out, err := stun.Build(m, stun.BindingSuccess,
  836. &stun.XORMappedAddress{
  837. IP: ip,
  838. Port: port,
  839. },
  840. stun.NewShortTermIntegrity(a.localPwd),
  841. stun.Fingerprint,
  842. ); err != nil {
  843. a.log.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err)
  844. } else {
  845. a.sendSTUN(out, local, remote)
  846. }
  847. }
  848. // Removes pending binding requests that are over maxBindingRequestTimeout old
  849. //
  850. // Let HTO be the transaction timeout, which SHOULD be 2*RTT if
  851. // RTT is known or 500 ms otherwise.
  852. // https://tools.ietf.org/html/rfc8445#appendix-B.1
  853. func (a *Agent) invalidatePendingBindingRequests(filterTime time.Time) {
  854. initialSize := len(a.pendingBindingRequests)
  855. temp := a.pendingBindingRequests[:0]
  856. for _, bindingRequest := range a.pendingBindingRequests {
  857. if filterTime.Sub(bindingRequest.timestamp) < maxBindingRequestTimeout {
  858. temp = append(temp, bindingRequest)
  859. }
  860. }
  861. a.pendingBindingRequests = temp
  862. if bindRequestsRemoved := initialSize - len(a.pendingBindingRequests); bindRequestsRemoved > 0 {
  863. a.log.Tracef("Discarded %d binding requests because they expired", bindRequestsRemoved)
  864. }
  865. }
  866. // Assert that the passed TransactionID is in our pendingBindingRequests and returns the destination
  867. // If the bindingRequest was valid remove it from our pending cache
  868. func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest) {
  869. a.invalidatePendingBindingRequests(time.Now())
  870. for i := range a.pendingBindingRequests {
  871. if a.pendingBindingRequests[i].transactionID == id {
  872. validBindingRequest := a.pendingBindingRequests[i]
  873. a.pendingBindingRequests = append(a.pendingBindingRequests[:i], a.pendingBindingRequests[i+1:]...)
  874. return true, &validBindingRequest
  875. }
  876. }
  877. return false, nil
  878. }
  879. // handleInbound processes STUN traffic from a remote candidate
  880. func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr) { //nolint:gocognit
  881. var err error
  882. if m == nil || local == nil {
  883. return
  884. }
  885. if m.Type.Method != stun.MethodBinding ||
  886. !(m.Type.Class == stun.ClassSuccessResponse ||
  887. m.Type.Class == stun.ClassRequest ||
  888. m.Type.Class == stun.ClassIndication) {
  889. a.log.Tracef("Unhandled STUN from %s to %s class(%s) method(%s)", remote, local, m.Type.Class, m.Type.Method)
  890. return
  891. }
  892. if a.isControlling {
  893. if m.Contains(stun.AttrICEControlling) {
  894. a.log.Debug("Inbound STUN message: isControlling && a.isControlling == true")
  895. return
  896. } else if m.Contains(stun.AttrUseCandidate) {
  897. a.log.Debug("Inbound STUN message: useCandidate && a.isControlling == true")
  898. return
  899. }
  900. } else {
  901. if m.Contains(stun.AttrICEControlled) {
  902. a.log.Debug("Inbound STUN message: isControlled && a.isControlling == false")
  903. return
  904. }
  905. }
  906. remoteCandidate := a.findRemoteCandidate(local.NetworkType(), remote)
  907. if m.Type.Class == stun.ClassSuccessResponse {
  908. if err = stun.MessageIntegrity([]byte(a.remotePwd)).Check(m); err != nil {
  909. a.log.Warnf("Discard message from (%s), %v", remote, err)
  910. return
  911. }
  912. if remoteCandidate == nil {
  913. a.log.Warnf("Discard success message from (%s), no such remote", remote)
  914. return
  915. }
  916. a.selector.HandleSuccessResponse(m, local, remoteCandidate, remote)
  917. } else if m.Type.Class == stun.ClassRequest {
  918. a.log.Tracef("Inbound STUN (Request) from %s to %s, useCandidate: %v", remote.String(), local.String(), m.Contains(stun.AttrUseCandidate))
  919. if err = stunx.AssertUsername(m, a.localUfrag+":"+a.remoteUfrag); err != nil {
  920. a.log.Warnf("Discard message from (%s), %v", remote, err)
  921. return
  922. } else if err = stun.MessageIntegrity([]byte(a.localPwd)).Check(m); err != nil {
  923. a.log.Warnf("Discard message from (%s), %v", remote, err)
  924. return
  925. }
  926. if remoteCandidate == nil {
  927. ip, port, networkType, ok := parseAddr(remote)
  928. if !ok {
  929. a.log.Errorf("Failed to create parse remote net.Addr when creating remote prflx candidate")
  930. return
  931. }
  932. prflxCandidateConfig := CandidatePeerReflexiveConfig{
  933. Network: networkType.String(),
  934. Address: ip.String(),
  935. Port: port,
  936. Component: local.Component(),
  937. RelAddr: "",
  938. RelPort: 0,
  939. }
  940. prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
  941. if err != nil {
  942. a.log.Errorf("Failed to create new remote prflx candidate (%s)", err)
  943. return
  944. }
  945. remoteCandidate = prflxCandidate
  946. a.log.Debugf("Adding a new peer-reflexive candidate: %s ", remote)
  947. a.addRemoteCandidate(remoteCandidate)
  948. }
  949. a.selector.HandleBindingRequest(m, local, remoteCandidate)
  950. }
  951. if remoteCandidate != nil {
  952. remoteCandidate.seen(false)
  953. }
  954. }
  955. // validateNonSTUNTraffic processes non STUN traffic from a remote candidate,
  956. // and returns true if it is an actual remote candidate
  957. func (a *Agent) validateNonSTUNTraffic(local Candidate, remote net.Addr) (Candidate, bool) {
  958. var remoteCandidate Candidate
  959. if err := a.run(local.context(), func(ctx context.Context, agent *Agent) {
  960. remoteCandidate = a.findRemoteCandidate(local.NetworkType(), remote)
  961. if remoteCandidate != nil {
  962. remoteCandidate.seen(false)
  963. }
  964. }); err != nil {
  965. a.log.Warnf("Failed to validate remote candidate: %v", err)
  966. }
  967. return remoteCandidate, remoteCandidate != nil
  968. }
  969. // GetSelectedCandidatePair returns the selected pair or nil if there is none
  970. func (a *Agent) GetSelectedCandidatePair() (*CandidatePair, error) {
  971. selectedPair := a.getSelectedPair()
  972. if selectedPair == nil {
  973. return nil, nil //nolint:nilnil
  974. }
  975. local, err := selectedPair.Local.copy()
  976. if err != nil {
  977. return nil, err
  978. }
  979. remote, err := selectedPair.Remote.copy()
  980. if err != nil {
  981. return nil, err
  982. }
  983. return &CandidatePair{Local: local, Remote: remote}, nil
  984. }
  985. func (a *Agent) getSelectedPair() *CandidatePair {
  986. if selectedPair, ok := a.selectedPair.Load().(*CandidatePair); ok {
  987. return selectedPair
  988. }
  989. return nil
  990. }
  991. func (a *Agent) closeMulticastConn() {
  992. if a.mDNSConn != nil {
  993. if err := a.mDNSConn.Close(); err != nil {
  994. a.log.Warnf("Failed to close mDNS Conn: %v", err)
  995. }
  996. }
  997. }
  998. // SetRemoteCredentials sets the credentials of the remote agent
  999. func (a *Agent) SetRemoteCredentials(remoteUfrag, remotePwd string) error {
  1000. switch {
  1001. case remoteUfrag == "":
  1002. return ErrRemoteUfragEmpty
  1003. case remotePwd == "":
  1004. return ErrRemotePwdEmpty
  1005. }
  1006. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1007. agent.remoteUfrag = remoteUfrag
  1008. agent.remotePwd = remotePwd
  1009. })
  1010. }
  1011. // Restart restarts the ICE Agent with the provided ufrag/pwd
  1012. // If no ufrag/pwd is provided the Agent will generate one itself
  1013. //
  1014. // If there is a gatherer routine currently running, Restart will
  1015. // cancel it.
  1016. // After a Restart, the user must then call GatherCandidates explicitly
  1017. // to start generating new ones.
  1018. func (a *Agent) Restart(ufrag, pwd string) error {
  1019. if ufrag == "" {
  1020. var err error
  1021. ufrag, err = generateUFrag()
  1022. if err != nil {
  1023. return err
  1024. }
  1025. }
  1026. if pwd == "" {
  1027. var err error
  1028. pwd, err = generatePwd()
  1029. if err != nil {
  1030. return err
  1031. }
  1032. }
  1033. if len([]rune(ufrag))*8 < 24 {
  1034. return ErrLocalUfragInsufficientBits
  1035. }
  1036. if len([]rune(pwd))*8 < 128 {
  1037. return ErrLocalPwdInsufficientBits
  1038. }
  1039. var err error
  1040. if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1041. if agent.gatheringState == GatheringStateGathering {
  1042. agent.gatherCandidateCancel()
  1043. }
  1044. // Clear all agent needed to take back to fresh state
  1045. a.removeUfragFromMux()
  1046. agent.localUfrag = ufrag
  1047. agent.localPwd = pwd
  1048. agent.remoteUfrag = ""
  1049. agent.remotePwd = ""
  1050. a.gatheringState = GatheringStateNew
  1051. a.checklist = make([]*CandidatePair, 0)
  1052. a.pendingBindingRequests = make([]bindingRequest, 0)
  1053. a.setSelectedPair(nil)
  1054. a.deleteAllCandidates()
  1055. if a.selector != nil {
  1056. a.selector.Start()
  1057. }
  1058. // Restart is used by NewAgent. Accept/Connect should be used to move to checking
  1059. // for new Agents
  1060. if a.connectionState != ConnectionStateNew {
  1061. a.updateConnectionState(ConnectionStateChecking)
  1062. }
  1063. }); runErr != nil {
  1064. return runErr
  1065. }
  1066. return err
  1067. }
  1068. func (a *Agent) setGatheringState(newState GatheringState) error {
  1069. done := make(chan struct{})
  1070. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1071. if a.gatheringState != newState && newState == GatheringStateComplete {
  1072. a.chanCandidate <- nil
  1073. }
  1074. a.gatheringState = newState
  1075. close(done)
  1076. }); err != nil {
  1077. return err
  1078. }
  1079. <-done
  1080. return nil
  1081. }