agent.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. // Package ice implements the Interactive Connectivity Establishment (ICE)
  4. // protocol defined in rfc5245.
  5. package ice
  6. import (
  7. "context"
  8. "fmt"
  9. "math"
  10. "net"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. atomicx "github.com/pion/ice/v2/internal/atomic"
  17. stunx "github.com/pion/ice/v2/internal/stun"
  18. "github.com/pion/logging"
  19. "github.com/pion/mdns"
  20. "github.com/pion/stun"
  21. "github.com/pion/transport/v2"
  22. "github.com/pion/transport/v2/packetio"
  23. "github.com/pion/transport/v2/stdnet"
  24. "github.com/pion/transport/v2/vnet"
  25. "golang.org/x/net/proxy"
  26. )
  27. type bindingRequest struct {
  28. timestamp time.Time
  29. transactionID [stun.TransactionIDSize]byte
  30. destination net.Addr
  31. isUseCandidate bool
  32. }
  33. // Agent represents the ICE agent
  34. type Agent struct {
  35. chanTask chan task
  36. afterRunFn []func(ctx context.Context)
  37. muAfterRun sync.Mutex
  38. onConnectionStateChangeHdlr atomic.Value // func(ConnectionState)
  39. onSelectedCandidatePairChangeHdlr atomic.Value // func(Candidate, Candidate)
  40. onCandidateHdlr atomic.Value // func(Candidate)
  41. // State owned by the taskLoop
  42. onConnected chan struct{}
  43. onConnectedOnce sync.Once
  44. // Force candidate to be contacted immediately (instead of waiting for task ticker)
  45. forceCandidateContact chan bool
  46. tieBreaker uint64
  47. lite bool
  48. connectionState ConnectionState
  49. gatheringState GatheringState
  50. mDNSMode MulticastDNSMode
  51. mDNSName string
  52. mDNSConn *mdns.Conn
  53. muHaveStarted sync.Mutex
  54. startedCh <-chan struct{}
  55. startedFn func()
  56. isControlling bool
  57. maxBindingRequests uint16
  58. hostAcceptanceMinWait time.Duration
  59. srflxAcceptanceMinWait time.Duration
  60. prflxAcceptanceMinWait time.Duration
  61. relayAcceptanceMinWait time.Duration
  62. tcpPriorityOffset uint16
  63. disableActiveTCP bool
  64. portMin uint16
  65. portMax uint16
  66. candidateTypes []CandidateType
  67. // How long connectivity checks can fail before the ICE Agent
  68. // goes to disconnected
  69. disconnectedTimeout time.Duration
  70. // How long connectivity checks can fail before the ICE Agent
  71. // goes to failed
  72. failedTimeout time.Duration
  73. // How often should we send keepalive packets?
  74. // 0 means never
  75. keepaliveInterval time.Duration
  76. // How often should we run our internal taskLoop to check for state changes when connecting
  77. checkInterval time.Duration
  78. localUfrag string
  79. localPwd string
  80. localCandidates map[NetworkType][]Candidate
  81. remoteUfrag string
  82. remotePwd string
  83. remoteCandidates map[NetworkType][]Candidate
  84. checklist []*CandidatePair
  85. selector pairCandidateSelector
  86. selectedPair atomic.Value // *CandidatePair
  87. urls []*stun.URI
  88. networkTypes []NetworkType
  89. buf *packetio.Buffer
  90. // LRU of outbound Binding request Transaction IDs
  91. pendingBindingRequests []bindingRequest
  92. // 1:1 D-NAT IP address mapping
  93. extIPMapper *externalIPMapper
  94. // State for closing
  95. done chan struct{}
  96. taskLoopDone chan struct{}
  97. err atomicx.Error
  98. // Callback that allows user to implement custom behavior
  99. // for STUN Binding Requests
  100. userBindingRequestHandler func(m *stun.Message, local, remote Candidate, pair *CandidatePair) bool
  101. gatherCandidateCancel func()
  102. gatherCandidateDone chan struct{}
  103. chanCandidate chan Candidate
  104. chanCandidatePair chan *CandidatePair
  105. chanState chan ConnectionState
  106. loggerFactory logging.LoggerFactory
  107. log logging.LeveledLogger
  108. net transport.Net
  109. tcpMux TCPMux
  110. udpMux UDPMux
  111. udpMuxSrflx UniversalUDPMux
  112. interfaceFilter func(string) bool
  113. ipFilter func(net.IP) bool
  114. includeLoopback bool
  115. insecureSkipVerify bool
  116. proxyDialer proxy.Dialer
  117. }
  118. type task struct {
  119. fn func(context.Context, *Agent)
  120. done chan struct{}
  121. }
  122. // afterRun registers function to be run after the task.
  123. func (a *Agent) afterRun(f func(context.Context)) {
  124. a.muAfterRun.Lock()
  125. a.afterRunFn = append(a.afterRunFn, f)
  126. a.muAfterRun.Unlock()
  127. }
  128. func (a *Agent) getAfterRunFn() []func(context.Context) {
  129. a.muAfterRun.Lock()
  130. defer a.muAfterRun.Unlock()
  131. fns := a.afterRunFn
  132. a.afterRunFn = nil
  133. return fns
  134. }
  135. func (a *Agent) ok() error {
  136. select {
  137. case <-a.done:
  138. return a.getErr()
  139. default:
  140. }
  141. return nil
  142. }
  143. func (a *Agent) getErr() error {
  144. if err := a.err.Load(); err != nil {
  145. return err
  146. }
  147. return ErrClosed
  148. }
  149. // Run task in serial. Blocking tasks must be cancelable by context.
  150. func (a *Agent) run(ctx context.Context, t func(context.Context, *Agent)) error {
  151. if err := a.ok(); err != nil {
  152. return err
  153. }
  154. done := make(chan struct{})
  155. select {
  156. case <-ctx.Done():
  157. return ctx.Err()
  158. case a.chanTask <- task{t, done}:
  159. <-done
  160. return nil
  161. }
  162. }
  163. // taskLoop handles registered tasks and agent close.
  164. func (a *Agent) taskLoop() {
  165. after := func() {
  166. for {
  167. // Get and run func registered by afterRun().
  168. fns := a.getAfterRunFn()
  169. if len(fns) == 0 {
  170. break
  171. }
  172. for _, fn := range fns {
  173. fn(a.context())
  174. }
  175. }
  176. }
  177. defer func() {
  178. a.deleteAllCandidates()
  179. a.startedFn()
  180. if err := a.buf.Close(); err != nil {
  181. a.log.Warnf("Failed to close buffer: %v", err)
  182. }
  183. a.closeMulticastConn()
  184. a.updateConnectionState(ConnectionStateClosed)
  185. after()
  186. close(a.chanState)
  187. close(a.chanCandidate)
  188. close(a.chanCandidatePair)
  189. close(a.taskLoopDone)
  190. }()
  191. for {
  192. select {
  193. case <-a.done:
  194. return
  195. case t := <-a.chanTask:
  196. t.fn(a.context(), a)
  197. close(t.done)
  198. after()
  199. }
  200. }
  201. }
  202. // NewAgent creates a new Agent
  203. func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
  204. var err error
  205. if config.PortMax < config.PortMin {
  206. return nil, ErrPort
  207. }
  208. mDNSName := config.MulticastDNSHostName
  209. if mDNSName == "" {
  210. if mDNSName, err = generateMulticastDNSName(); err != nil {
  211. return nil, err
  212. }
  213. }
  214. if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {
  215. return nil, ErrInvalidMulticastDNSHostName
  216. }
  217. mDNSMode := config.MulticastDNSMode
  218. if mDNSMode == 0 {
  219. mDNSMode = MulticastDNSModeQueryOnly
  220. }
  221. loggerFactory := config.LoggerFactory
  222. if loggerFactory == nil {
  223. loggerFactory = logging.NewDefaultLoggerFactory()
  224. }
  225. log := loggerFactory.NewLogger("ice")
  226. startedCtx, startedFn := context.WithCancel(context.Background())
  227. a := &Agent{
  228. chanTask: make(chan task),
  229. chanState: make(chan ConnectionState),
  230. chanCandidate: make(chan Candidate),
  231. chanCandidatePair: make(chan *CandidatePair),
  232. tieBreaker: globalMathRandomGenerator.Uint64(),
  233. lite: config.Lite,
  234. gatheringState: GatheringStateNew,
  235. connectionState: ConnectionStateNew,
  236. localCandidates: make(map[NetworkType][]Candidate),
  237. remoteCandidates: make(map[NetworkType][]Candidate),
  238. urls: config.Urls,
  239. networkTypes: config.NetworkTypes,
  240. onConnected: make(chan struct{}),
  241. buf: packetio.NewBuffer(),
  242. done: make(chan struct{}),
  243. taskLoopDone: make(chan struct{}),
  244. startedCh: startedCtx.Done(),
  245. startedFn: startedFn,
  246. portMin: config.PortMin,
  247. portMax: config.PortMax,
  248. loggerFactory: loggerFactory,
  249. log: log,
  250. net: config.Net,
  251. proxyDialer: config.ProxyDialer,
  252. tcpMux: config.TCPMux,
  253. udpMux: config.UDPMux,
  254. udpMuxSrflx: config.UDPMuxSrflx,
  255. mDNSMode: mDNSMode,
  256. mDNSName: mDNSName,
  257. gatherCandidateCancel: func() {},
  258. forceCandidateContact: make(chan bool, 1),
  259. interfaceFilter: config.InterfaceFilter,
  260. ipFilter: config.IPFilter,
  261. insecureSkipVerify: config.InsecureSkipVerify,
  262. includeLoopback: config.IncludeLoopback,
  263. disableActiveTCP: config.DisableActiveTCP,
  264. userBindingRequestHandler: config.BindingRequestHandler,
  265. }
  266. if a.net == nil {
  267. a.net, err = stdnet.NewNet()
  268. if err != nil {
  269. return nil, fmt.Errorf("failed to create network: %w", err)
  270. }
  271. } else if _, isVirtual := a.net.(*vnet.Net); isVirtual {
  272. a.log.Warn("Virtual network is enabled")
  273. if a.mDNSMode != MulticastDNSModeDisabled {
  274. a.log.Warn("Virtual network does not support mDNS yet")
  275. }
  276. }
  277. // Opportunistic mDNS: If we can't open the connection, that's ok: we
  278. // can continue without it.
  279. if a.mDNSConn, a.mDNSMode, err = createMulticastDNS(a.net, mDNSMode, mDNSName, log); err != nil {
  280. log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)
  281. }
  282. config.initWithDefaults(a)
  283. // Make sure the buffer doesn't grow indefinitely.
  284. // NOTE: We actually won't get anywhere close to this limit.
  285. // SRTP will constantly read from the endpoint and drop packets if it's full.
  286. a.buf.SetLimitSize(maxBufferSize)
  287. if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {
  288. a.closeMulticastConn()
  289. return nil, ErrLiteUsingNonHostCandidates
  290. }
  291. if config.Urls != nil && len(config.Urls) > 0 && !containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) && !containsCandidateType(CandidateTypeRelay, a.candidateTypes) {
  292. a.closeMulticastConn()
  293. return nil, ErrUselessUrlsProvided
  294. }
  295. if err = config.initExtIPMapping(a); err != nil {
  296. a.closeMulticastConn()
  297. return nil, err
  298. }
  299. go a.taskLoop()
  300. // CandidatePair and ConnectionState are usually changed at once.
  301. // Blocking one by the other one causes deadlock.
  302. // Hence, we call handlers from independent Goroutines.
  303. go a.candidatePairRoutine()
  304. go a.connectionStateRoutine()
  305. go a.candidateRoutine()
  306. // Restart is also used to initialize the agent for the first time
  307. if err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
  308. a.closeMulticastConn()
  309. _ = a.Close()
  310. return nil, err
  311. }
  312. return a, nil
  313. }
  314. func (a *Agent) startConnectivityChecks(isControlling bool, remoteUfrag, remotePwd string) error {
  315. a.muHaveStarted.Lock()
  316. defer a.muHaveStarted.Unlock()
  317. select {
  318. case <-a.startedCh:
  319. return ErrMultipleStart
  320. default:
  321. }
  322. if err := a.SetRemoteCredentials(remoteUfrag, remotePwd); err != nil { //nolint:contextcheck
  323. return err
  324. }
  325. a.log.Debugf("Started agent: isControlling? %t, remoteUfrag: %q, remotePwd: %q", isControlling, remoteUfrag, remotePwd)
  326. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  327. agent.isControlling = isControlling
  328. agent.remoteUfrag = remoteUfrag
  329. agent.remotePwd = remotePwd
  330. if isControlling {
  331. a.selector = &controllingSelector{agent: a, log: a.log}
  332. } else {
  333. a.selector = &controlledSelector{agent: a, log: a.log}
  334. }
  335. if a.lite {
  336. a.selector = &liteSelector{pairCandidateSelector: a.selector}
  337. }
  338. a.selector.Start()
  339. a.startedFn()
  340. agent.updateConnectionState(ConnectionStateChecking)
  341. a.requestConnectivityCheck()
  342. go a.connectivityChecks() //nolint:contextcheck
  343. })
  344. }
  345. func (a *Agent) connectivityChecks() {
  346. lastConnectionState := ConnectionState(0)
  347. checkingDuration := time.Time{}
  348. contact := func() {
  349. if err := a.run(a.context(), func(ctx context.Context, a *Agent) {
  350. defer func() {
  351. lastConnectionState = a.connectionState
  352. }()
  353. switch a.connectionState {
  354. case ConnectionStateFailed:
  355. // The connection is currently failed so don't send any checks
  356. // In the future it may be restarted though
  357. return
  358. case ConnectionStateChecking:
  359. // We have just entered checking for the first time so update our checking timer
  360. if lastConnectionState != a.connectionState {
  361. checkingDuration = time.Now()
  362. }
  363. // We have been in checking longer then Disconnect+Failed timeout, set the connection to Failed
  364. if time.Since(checkingDuration) > a.disconnectedTimeout+a.failedTimeout {
  365. a.updateConnectionState(ConnectionStateFailed)
  366. return
  367. }
  368. default:
  369. }
  370. a.selector.ContactCandidates()
  371. }); err != nil {
  372. a.log.Warnf("Failed to start connectivity checks: %v", err)
  373. }
  374. }
  375. t := time.NewTimer(math.MaxInt64)
  376. t.Stop()
  377. for {
  378. interval := defaultKeepaliveInterval
  379. updateInterval := func(x time.Duration) {
  380. if x != 0 && (interval == 0 || interval > x) {
  381. interval = x
  382. }
  383. }
  384. switch lastConnectionState {
  385. case ConnectionStateNew, ConnectionStateChecking: // While connecting, check candidates more frequently
  386. updateInterval(a.checkInterval)
  387. case ConnectionStateConnected, ConnectionStateDisconnected:
  388. updateInterval(a.keepaliveInterval)
  389. default:
  390. }
  391. // Ensure we run our task loop as quickly as the minimum of our various configured timeouts
  392. updateInterval(a.disconnectedTimeout)
  393. updateInterval(a.failedTimeout)
  394. t.Reset(interval)
  395. select {
  396. case <-a.forceCandidateContact:
  397. if !t.Stop() {
  398. <-t.C
  399. }
  400. contact()
  401. case <-t.C:
  402. contact()
  403. case <-a.done:
  404. t.Stop()
  405. return
  406. }
  407. }
  408. }
  409. func (a *Agent) updateConnectionState(newState ConnectionState) {
  410. if a.connectionState != newState {
  411. // Connection has gone to failed, release all gathered candidates
  412. if newState == ConnectionStateFailed {
  413. a.removeUfragFromMux()
  414. a.checklist = make([]*CandidatePair, 0)
  415. a.pendingBindingRequests = make([]bindingRequest, 0)
  416. a.setSelectedPair(nil)
  417. a.deleteAllCandidates()
  418. }
  419. a.log.Infof("Setting new connection state: %s", newState)
  420. a.connectionState = newState
  421. // Call handler after finishing current task since we may be holding the agent lock
  422. // and the handler may also require it
  423. a.afterRun(func(ctx context.Context) {
  424. a.chanState <- newState
  425. })
  426. }
  427. }
  428. func (a *Agent) setSelectedPair(p *CandidatePair) {
  429. if p == nil {
  430. var nilPair *CandidatePair
  431. a.selectedPair.Store(nilPair)
  432. a.log.Tracef("Unset selected candidate pair")
  433. return
  434. }
  435. p.nominated = true
  436. a.selectedPair.Store(p)
  437. a.log.Tracef("Set selected candidate pair: %s", p)
  438. a.updateConnectionState(ConnectionStateConnected)
  439. // Notify when the selected pair changes
  440. a.afterRun(func(ctx context.Context) {
  441. select {
  442. case a.chanCandidatePair <- p:
  443. case <-ctx.Done():
  444. }
  445. })
  446. // Signal connected
  447. a.onConnectedOnce.Do(func() { close(a.onConnected) })
  448. }
  449. func (a *Agent) pingAllCandidates() {
  450. a.log.Trace("Pinging all candidates")
  451. if len(a.checklist) == 0 {
  452. a.log.Warn("Failed to ping without candidate pairs. Connection is not possible yet.")
  453. }
  454. for _, p := range a.checklist {
  455. if p.state == CandidatePairStateWaiting {
  456. p.state = CandidatePairStateInProgress
  457. } else if p.state != CandidatePairStateInProgress {
  458. continue
  459. }
  460. if p.bindingRequestCount > a.maxBindingRequests {
  461. a.log.Tracef("Maximum requests reached for pair %s, marking it as failed", p)
  462. p.state = CandidatePairStateFailed
  463. } else {
  464. a.selector.PingCandidate(p.Local, p.Remote)
  465. p.bindingRequestCount++
  466. }
  467. }
  468. }
  469. func (a *Agent) getBestAvailableCandidatePair() *CandidatePair {
  470. var best *CandidatePair
  471. for _, p := range a.checklist {
  472. if p.state == CandidatePairStateFailed {
  473. continue
  474. }
  475. if best == nil {
  476. best = p
  477. } else if best.priority() < p.priority() {
  478. best = p
  479. }
  480. }
  481. return best
  482. }
  483. func (a *Agent) getBestValidCandidatePair() *CandidatePair {
  484. var best *CandidatePair
  485. for _, p := range a.checklist {
  486. if p.state != CandidatePairStateSucceeded {
  487. continue
  488. }
  489. if best == nil {
  490. best = p
  491. } else if best.priority() < p.priority() {
  492. best = p
  493. }
  494. }
  495. return best
  496. }
  497. func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
  498. p := newCandidatePair(local, remote, a.isControlling)
  499. a.checklist = append(a.checklist, p)
  500. return p
  501. }
  502. func (a *Agent) findPair(local, remote Candidate) *CandidatePair {
  503. for _, p := range a.checklist {
  504. if p.Local.Equal(local) && p.Remote.Equal(remote) {
  505. return p
  506. }
  507. }
  508. return nil
  509. }
  510. // validateSelectedPair checks if the selected pair is (still) valid
  511. // Note: the caller should hold the agent lock.
  512. func (a *Agent) validateSelectedPair() bool {
  513. selectedPair := a.getSelectedPair()
  514. if selectedPair == nil {
  515. return false
  516. }
  517. disconnectedTime := time.Since(selectedPair.Remote.LastReceived())
  518. // Only allow transitions to failed if a.failedTimeout is non-zero
  519. totalTimeToFailure := a.failedTimeout
  520. if totalTimeToFailure != 0 {
  521. totalTimeToFailure += a.disconnectedTimeout
  522. }
  523. switch {
  524. case totalTimeToFailure != 0 && disconnectedTime > totalTimeToFailure:
  525. a.updateConnectionState(ConnectionStateFailed)
  526. case a.disconnectedTimeout != 0 && disconnectedTime > a.disconnectedTimeout:
  527. a.updateConnectionState(ConnectionStateDisconnected)
  528. default:
  529. a.updateConnectionState(ConnectionStateConnected)
  530. }
  531. return true
  532. }
  533. // checkKeepalive sends STUN Binding Indications to the selected pair
  534. // if no packet has been sent on that pair in the last keepaliveInterval
  535. // Note: the caller should hold the agent lock.
  536. func (a *Agent) checkKeepalive() {
  537. selectedPair := a.getSelectedPair()
  538. if selectedPair == nil {
  539. return
  540. }
  541. if (a.keepaliveInterval != 0) &&
  542. ((time.Since(selectedPair.Local.LastSent()) > a.keepaliveInterval) ||
  543. (time.Since(selectedPair.Remote.LastReceived()) > a.keepaliveInterval)) {
  544. // We use binding request instead of indication to support refresh consent schemas
  545. // see https://tools.ietf.org/html/rfc7675
  546. a.selector.PingCandidate(selectedPair.Local, selectedPair.Remote)
  547. }
  548. }
  549. // AddRemoteCandidate adds a new remote candidate
  550. func (a *Agent) AddRemoteCandidate(c Candidate) error {
  551. if c == nil {
  552. return nil
  553. }
  554. // TCP Candidates with TCP type active will probe server passive ones, so
  555. // no need to do anything with them.
  556. if c.TCPType() == TCPTypeActive {
  557. a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
  558. return nil
  559. }
  560. // If we have a mDNS Candidate lets fully resolve it before adding it locally
  561. if c.Type() == CandidateTypeHost && strings.HasSuffix(c.Address(), ".local") {
  562. if a.mDNSMode == MulticastDNSModeDisabled {
  563. a.log.Warnf("Remote mDNS candidate added, but mDNS is disabled: (%s)", c.Address())
  564. return nil
  565. }
  566. hostCandidate, ok := c.(*CandidateHost)
  567. if !ok {
  568. return ErrAddressParseFailed
  569. }
  570. go a.resolveAndAddMulticastCandidate(hostCandidate)
  571. return nil
  572. }
  573. go func() {
  574. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  575. // nolint: contextcheck
  576. agent.addRemoteCandidate(c)
  577. }); err != nil {
  578. a.log.Warnf("Failed to add remote candidate %s: %v", c.Address(), err)
  579. return
  580. }
  581. }()
  582. return nil
  583. }
  584. func (a *Agent) resolveAndAddMulticastCandidate(c *CandidateHost) {
  585. if a.mDNSConn == nil {
  586. return
  587. }
  588. _, src, err := a.mDNSConn.Query(c.context(), c.Address())
  589. if err != nil {
  590. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  591. return
  592. }
  593. ip, ipOk := parseMulticastAnswerAddr(src)
  594. if !ipOk {
  595. a.log.Warnf("Failed to discover mDNS candidate %s: failed to parse IP", c.Address())
  596. return
  597. }
  598. if err = c.setIP(ip); err != nil {
  599. a.log.Warnf("Failed to discover mDNS candidate %s: %v", c.Address(), err)
  600. return
  601. }
  602. if err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  603. // nolint: contextcheck
  604. agent.addRemoteCandidate(c)
  605. }); err != nil {
  606. a.log.Warnf("Failed to add mDNS candidate %s: %v", c.Address(), err)
  607. return
  608. }
  609. }
  610. func (a *Agent) requestConnectivityCheck() {
  611. select {
  612. case a.forceCandidateContact <- true:
  613. default:
  614. }
  615. }
  616. func (a *Agent) addRemotePassiveTCPCandidate(remoteCandidate Candidate) {
  617. localIPs, err := localInterfaces(a.net, a.interfaceFilter, a.ipFilter, []NetworkType{remoteCandidate.NetworkType()}, a.includeLoopback)
  618. if err != nil {
  619. a.log.Warnf("Failed to iterate local interfaces, host candidates will not be gathered %s", err)
  620. return
  621. }
  622. for i := range localIPs {
  623. conn := newActiveTCPConn(
  624. a.context(),
  625. net.JoinHostPort(localIPs[i].String(), "0"),
  626. net.JoinHostPort(remoteCandidate.Address(), strconv.Itoa(remoteCandidate.Port())),
  627. a.log,
  628. )
  629. tcpAddr, ok := conn.LocalAddr().(*net.TCPAddr)
  630. if !ok {
  631. closeConnAndLog(conn, a.log, "Failed to create Active ICE-TCP Candidate: %v", errInvalidAddress)
  632. continue
  633. }
  634. localCandidate, err := NewCandidateHost(&CandidateHostConfig{
  635. Network: remoteCandidate.NetworkType().String(),
  636. Address: localIPs[i].String(),
  637. Port: tcpAddr.Port,
  638. Component: ComponentRTP,
  639. TCPType: TCPTypeActive,
  640. })
  641. if err != nil {
  642. closeConnAndLog(conn, a.log, "Failed to create Active ICE-TCP Candidate: %v", err)
  643. continue
  644. }
  645. localCandidate.start(a, conn, a.startedCh)
  646. a.localCandidates[localCandidate.NetworkType()] = append(a.localCandidates[localCandidate.NetworkType()], localCandidate)
  647. a.chanCandidate <- localCandidate
  648. a.addPair(localCandidate, remoteCandidate)
  649. }
  650. }
  651. // addRemoteCandidate assumes you are holding the lock (must be execute using a.run)
  652. func (a *Agent) addRemoteCandidate(c Candidate) {
  653. set := a.remoteCandidates[c.NetworkType()]
  654. for _, candidate := range set {
  655. if candidate.Equal(c) {
  656. return
  657. }
  658. }
  659. tcpNetworkTypeFound := false
  660. for _, networkType := range a.networkTypes {
  661. if networkType.IsTCP() {
  662. tcpNetworkTypeFound = true
  663. }
  664. }
  665. if !a.disableActiveTCP && tcpNetworkTypeFound && c.TCPType() == TCPTypePassive {
  666. a.addRemotePassiveTCPCandidate(c)
  667. }
  668. set = append(set, c)
  669. a.remoteCandidates[c.NetworkType()] = set
  670. if c.TCPType() != TCPTypePassive {
  671. if localCandidates, ok := a.localCandidates[c.NetworkType()]; ok {
  672. for _, localCandidate := range localCandidates {
  673. a.addPair(localCandidate, c)
  674. }
  675. }
  676. }
  677. a.requestConnectivityCheck()
  678. }
  679. func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net.PacketConn) error {
  680. return a.run(ctx, func(ctx context.Context, agent *Agent) {
  681. set := a.localCandidates[c.NetworkType()]
  682. for _, candidate := range set {
  683. if candidate.Equal(c) {
  684. a.log.Debugf("Ignore duplicate candidate: %s", c)
  685. if err := c.close(); err != nil {
  686. a.log.Warnf("Failed to close duplicate candidate: %v", err)
  687. }
  688. if err := candidateConn.Close(); err != nil {
  689. a.log.Warnf("Failed to close duplicate candidate connection: %v", err)
  690. }
  691. return
  692. }
  693. }
  694. c.start(a, candidateConn, a.startedCh)
  695. set = append(set, c)
  696. a.localCandidates[c.NetworkType()] = set
  697. if remoteCandidates, ok := a.remoteCandidates[c.NetworkType()]; ok {
  698. for _, remoteCandidate := range remoteCandidates {
  699. a.addPair(c, remoteCandidate)
  700. }
  701. }
  702. a.requestConnectivityCheck()
  703. a.chanCandidate <- c
  704. })
  705. }
  706. // GetRemoteCandidates returns the remote candidates
  707. func (a *Agent) GetRemoteCandidates() ([]Candidate, error) {
  708. var res []Candidate
  709. err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  710. var candidates []Candidate
  711. for _, set := range agent.remoteCandidates {
  712. candidates = append(candidates, set...)
  713. }
  714. res = candidates
  715. })
  716. if err != nil {
  717. return nil, err
  718. }
  719. return res, nil
  720. }
  721. // GetLocalCandidates returns the local candidates
  722. func (a *Agent) GetLocalCandidates() ([]Candidate, error) {
  723. var res []Candidate
  724. err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  725. var candidates []Candidate
  726. for _, set := range agent.localCandidates {
  727. candidates = append(candidates, set...)
  728. }
  729. res = candidates
  730. })
  731. if err != nil {
  732. return nil, err
  733. }
  734. return res, nil
  735. }
  736. // GetLocalUserCredentials returns the local user credentials
  737. func (a *Agent) GetLocalUserCredentials() (frag string, pwd string, err error) {
  738. valSet := make(chan struct{})
  739. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  740. frag = agent.localUfrag
  741. pwd = agent.localPwd
  742. close(valSet)
  743. })
  744. if err == nil {
  745. <-valSet
  746. }
  747. return
  748. }
  749. // GetRemoteUserCredentials returns the remote user credentials
  750. func (a *Agent) GetRemoteUserCredentials() (frag string, pwd string, err error) {
  751. valSet := make(chan struct{})
  752. err = a.run(a.context(), func(ctx context.Context, agent *Agent) {
  753. frag = agent.remoteUfrag
  754. pwd = agent.remotePwd
  755. close(valSet)
  756. })
  757. if err == nil {
  758. <-valSet
  759. }
  760. return
  761. }
  762. func (a *Agent) removeUfragFromMux() {
  763. if a.tcpMux != nil {
  764. a.tcpMux.RemoveConnByUfrag(a.localUfrag)
  765. }
  766. if a.udpMux != nil {
  767. a.udpMux.RemoveConnByUfrag(a.localUfrag)
  768. }
  769. if a.udpMuxSrflx != nil {
  770. a.udpMuxSrflx.RemoveConnByUfrag(a.localUfrag)
  771. }
  772. }
  773. // Close cleans up the Agent
  774. func (a *Agent) Close() error {
  775. if err := a.ok(); err != nil {
  776. return err
  777. }
  778. a.afterRun(func(context.Context) {
  779. a.gatherCandidateCancel()
  780. if a.gatherCandidateDone != nil {
  781. <-a.gatherCandidateDone
  782. }
  783. })
  784. a.err.Store(ErrClosed)
  785. a.removeUfragFromMux()
  786. close(a.done)
  787. <-a.taskLoopDone
  788. return nil
  789. }
  790. // Remove all candidates. This closes any listening sockets
  791. // and removes both the local and remote candidate lists.
  792. //
  793. // This is used for restarts, failures and on close
  794. func (a *Agent) deleteAllCandidates() {
  795. for net, cs := range a.localCandidates {
  796. for _, c := range cs {
  797. if err := c.close(); err != nil {
  798. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  799. }
  800. }
  801. delete(a.localCandidates, net)
  802. }
  803. for net, cs := range a.remoteCandidates {
  804. for _, c := range cs {
  805. if err := c.close(); err != nil {
  806. a.log.Warnf("Failed to close candidate %s: %v", c, err)
  807. }
  808. }
  809. delete(a.remoteCandidates, net)
  810. }
  811. }
  812. func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Candidate {
  813. ip, port, _, ok := parseAddr(addr)
  814. if !ok {
  815. a.log.Warnf("Failed to parse address: %s", addr)
  816. return nil
  817. }
  818. set := a.remoteCandidates[networkType]
  819. for _, c := range set {
  820. if c.Address() == ip.String() && c.Port() == port {
  821. return c
  822. }
  823. }
  824. return nil
  825. }
  826. func (a *Agent) sendBindingRequest(m *stun.Message, local, remote Candidate) {
  827. a.log.Tracef("Ping STUN from %s to %s", local, remote)
  828. a.invalidatePendingBindingRequests(time.Now())
  829. a.pendingBindingRequests = append(a.pendingBindingRequests, bindingRequest{
  830. timestamp: time.Now(),
  831. transactionID: m.TransactionID,
  832. destination: remote.addr(),
  833. isUseCandidate: m.Contains(stun.AttrUseCandidate),
  834. })
  835. a.sendSTUN(m, local, remote)
  836. }
  837. func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote Candidate) {
  838. base := remote
  839. ip, port, _, ok := parseAddr(base.addr())
  840. if !ok {
  841. a.log.Warnf("Failed to parse address: %s", base.addr())
  842. return
  843. }
  844. if out, err := stun.Build(m, stun.BindingSuccess,
  845. &stun.XORMappedAddress{
  846. IP: ip,
  847. Port: port,
  848. },
  849. stun.NewShortTermIntegrity(a.localPwd),
  850. stun.Fingerprint,
  851. ); err != nil {
  852. a.log.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err)
  853. } else {
  854. a.sendSTUN(out, local, remote)
  855. }
  856. }
  857. // Removes pending binding requests that are over maxBindingRequestTimeout old
  858. //
  859. // Let HTO be the transaction timeout, which SHOULD be 2*RTT if
  860. // RTT is known or 500 ms otherwise.
  861. // https://tools.ietf.org/html/rfc8445#appendix-B.1
  862. func (a *Agent) invalidatePendingBindingRequests(filterTime time.Time) {
  863. initialSize := len(a.pendingBindingRequests)
  864. temp := a.pendingBindingRequests[:0]
  865. for _, bindingRequest := range a.pendingBindingRequests {
  866. if filterTime.Sub(bindingRequest.timestamp) < maxBindingRequestTimeout {
  867. temp = append(temp, bindingRequest)
  868. }
  869. }
  870. a.pendingBindingRequests = temp
  871. if bindRequestsRemoved := initialSize - len(a.pendingBindingRequests); bindRequestsRemoved > 0 {
  872. a.log.Tracef("Discarded %d binding requests because they expired", bindRequestsRemoved)
  873. }
  874. }
  875. // Assert that the passed TransactionID is in our pendingBindingRequests and returns the destination
  876. // If the bindingRequest was valid remove it from our pending cache
  877. func (a *Agent) handleInboundBindingSuccess(id [stun.TransactionIDSize]byte) (bool, *bindingRequest) {
  878. a.invalidatePendingBindingRequests(time.Now())
  879. for i := range a.pendingBindingRequests {
  880. if a.pendingBindingRequests[i].transactionID == id {
  881. validBindingRequest := a.pendingBindingRequests[i]
  882. a.pendingBindingRequests = append(a.pendingBindingRequests[:i], a.pendingBindingRequests[i+1:]...)
  883. return true, &validBindingRequest
  884. }
  885. }
  886. return false, nil
  887. }
  888. // handleInbound processes STUN traffic from a remote candidate
  889. func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr) { //nolint:gocognit
  890. var err error
  891. if m == nil || local == nil {
  892. return
  893. }
  894. if m.Type.Method != stun.MethodBinding ||
  895. !(m.Type.Class == stun.ClassSuccessResponse ||
  896. m.Type.Class == stun.ClassRequest ||
  897. m.Type.Class == stun.ClassIndication) {
  898. a.log.Tracef("Unhandled STUN from %s to %s class(%s) method(%s)", remote, local, m.Type.Class, m.Type.Method)
  899. return
  900. }
  901. if a.isControlling {
  902. if m.Contains(stun.AttrICEControlling) {
  903. a.log.Debug("Inbound STUN message: isControlling && a.isControlling == true")
  904. return
  905. } else if m.Contains(stun.AttrUseCandidate) {
  906. a.log.Debug("Inbound STUN message: useCandidate && a.isControlling == true")
  907. return
  908. }
  909. } else {
  910. if m.Contains(stun.AttrICEControlled) {
  911. a.log.Debug("Inbound STUN message: isControlled && a.isControlling == false")
  912. return
  913. }
  914. }
  915. remoteCandidate := a.findRemoteCandidate(local.NetworkType(), remote)
  916. if m.Type.Class == stun.ClassSuccessResponse {
  917. if err = stun.MessageIntegrity([]byte(a.remotePwd)).Check(m); err != nil {
  918. a.log.Warnf("Discard message from (%s), %v", remote, err)
  919. return
  920. }
  921. if remoteCandidate == nil {
  922. a.log.Warnf("Discard success message from (%s), no such remote", remote)
  923. return
  924. }
  925. a.selector.HandleSuccessResponse(m, local, remoteCandidate, remote)
  926. } else if m.Type.Class == stun.ClassRequest {
  927. a.log.Tracef("Inbound STUN (Request) from %s to %s, useCandidate: %v", remote, local, m.Contains(stun.AttrUseCandidate))
  928. if err = stunx.AssertUsername(m, a.localUfrag+":"+a.remoteUfrag); err != nil {
  929. a.log.Warnf("Discard message from (%s), %v", remote, err)
  930. return
  931. } else if err = stun.MessageIntegrity([]byte(a.localPwd)).Check(m); err != nil {
  932. a.log.Warnf("Discard message from (%s), %v", remote, err)
  933. return
  934. }
  935. if remoteCandidate == nil {
  936. ip, port, networkType, ok := parseAddr(remote)
  937. if !ok {
  938. a.log.Errorf("Failed to create parse remote net.Addr when creating remote prflx candidate")
  939. return
  940. }
  941. prflxCandidateConfig := CandidatePeerReflexiveConfig{
  942. Network: networkType.String(),
  943. Address: ip.String(),
  944. Port: port,
  945. Component: local.Component(),
  946. RelAddr: "",
  947. RelPort: 0,
  948. }
  949. prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
  950. if err != nil {
  951. a.log.Errorf("Failed to create new remote prflx candidate (%s)", err)
  952. return
  953. }
  954. remoteCandidate = prflxCandidate
  955. a.log.Debugf("Adding a new peer-reflexive candidate: %s ", remote)
  956. a.addRemoteCandidate(remoteCandidate)
  957. }
  958. a.selector.HandleBindingRequest(m, local, remoteCandidate)
  959. }
  960. if remoteCandidate != nil {
  961. remoteCandidate.seen(false)
  962. }
  963. }
  964. // validateNonSTUNTraffic processes non STUN traffic from a remote candidate,
  965. // and returns true if it is an actual remote candidate
  966. func (a *Agent) validateNonSTUNTraffic(local Candidate, remote net.Addr) (Candidate, bool) {
  967. var remoteCandidate Candidate
  968. if err := a.run(local.context(), func(ctx context.Context, agent *Agent) {
  969. remoteCandidate = a.findRemoteCandidate(local.NetworkType(), remote)
  970. if remoteCandidate != nil {
  971. remoteCandidate.seen(false)
  972. }
  973. }); err != nil {
  974. a.log.Warnf("Failed to validate remote candidate: %v", err)
  975. }
  976. return remoteCandidate, remoteCandidate != nil
  977. }
  978. // GetSelectedCandidatePair returns the selected pair or nil if there is none
  979. func (a *Agent) GetSelectedCandidatePair() (*CandidatePair, error) {
  980. selectedPair := a.getSelectedPair()
  981. if selectedPair == nil {
  982. return nil, nil //nolint:nilnil
  983. }
  984. local, err := selectedPair.Local.copy()
  985. if err != nil {
  986. return nil, err
  987. }
  988. remote, err := selectedPair.Remote.copy()
  989. if err != nil {
  990. return nil, err
  991. }
  992. return &CandidatePair{Local: local, Remote: remote}, nil
  993. }
  994. func (a *Agent) getSelectedPair() *CandidatePair {
  995. if selectedPair, ok := a.selectedPair.Load().(*CandidatePair); ok {
  996. return selectedPair
  997. }
  998. return nil
  999. }
  1000. func (a *Agent) closeMulticastConn() {
  1001. if a.mDNSConn != nil {
  1002. if err := a.mDNSConn.Close(); err != nil {
  1003. a.log.Warnf("Failed to close mDNS Conn: %v", err)
  1004. }
  1005. }
  1006. }
  1007. // SetRemoteCredentials sets the credentials of the remote agent
  1008. func (a *Agent) SetRemoteCredentials(remoteUfrag, remotePwd string) error {
  1009. switch {
  1010. case remoteUfrag == "":
  1011. return ErrRemoteUfragEmpty
  1012. case remotePwd == "":
  1013. return ErrRemotePwdEmpty
  1014. }
  1015. return a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1016. agent.remoteUfrag = remoteUfrag
  1017. agent.remotePwd = remotePwd
  1018. })
  1019. }
  1020. // Restart restarts the ICE Agent with the provided ufrag/pwd
  1021. // If no ufrag/pwd is provided the Agent will generate one itself
  1022. //
  1023. // If there is a gatherer routine currently running, Restart will
  1024. // cancel it.
  1025. // After a Restart, the user must then call GatherCandidates explicitly
  1026. // to start generating new ones.
  1027. func (a *Agent) Restart(ufrag, pwd string) error {
  1028. if ufrag == "" {
  1029. var err error
  1030. ufrag, err = generateUFrag()
  1031. if err != nil {
  1032. return err
  1033. }
  1034. }
  1035. if pwd == "" {
  1036. var err error
  1037. pwd, err = generatePwd()
  1038. if err != nil {
  1039. return err
  1040. }
  1041. }
  1042. if len([]rune(ufrag))*8 < 24 {
  1043. return ErrLocalUfragInsufficientBits
  1044. }
  1045. if len([]rune(pwd))*8 < 128 {
  1046. return ErrLocalPwdInsufficientBits
  1047. }
  1048. var err error
  1049. if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1050. if agent.gatheringState == GatheringStateGathering {
  1051. agent.gatherCandidateCancel()
  1052. }
  1053. // Clear all agent needed to take back to fresh state
  1054. a.removeUfragFromMux()
  1055. agent.localUfrag = ufrag
  1056. agent.localPwd = pwd
  1057. agent.remoteUfrag = ""
  1058. agent.remotePwd = ""
  1059. a.gatheringState = GatheringStateNew
  1060. a.checklist = make([]*CandidatePair, 0)
  1061. a.pendingBindingRequests = make([]bindingRequest, 0)
  1062. a.setSelectedPair(nil)
  1063. a.deleteAllCandidates()
  1064. if a.selector != nil {
  1065. a.selector.Start()
  1066. }
  1067. // Restart is used by NewAgent. Accept/Connect should be used to move to checking
  1068. // for new Agents
  1069. if a.connectionState != ConnectionStateNew {
  1070. a.updateConnectionState(ConnectionStateChecking)
  1071. }
  1072. }); runErr != nil {
  1073. return runErr
  1074. }
  1075. return err
  1076. }
  1077. func (a *Agent) setGatheringState(newState GatheringState) error {
  1078. done := make(chan struct{})
  1079. if err := a.run(a.context(), func(ctx context.Context, agent *Agent) {
  1080. if a.gatheringState != newState && newState == GatheringStateComplete {
  1081. a.chanCandidate <- nil
  1082. }
  1083. a.gatheringState = newState
  1084. close(done)
  1085. }); err != nil {
  1086. return err
  1087. }
  1088. <-done
  1089. return nil
  1090. }