icegatherer.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "fmt"
  8. "sync"
  9. "sync/atomic"
  10. "github.com/pion/ice/v2"
  11. "github.com/pion/logging"
  12. "github.com/pion/stun"
  13. )
  14. // ICEGatherer gathers local host, server reflexive and relay
  15. // candidates, as well as enabling the retrieval of local Interactive
  16. // Connectivity Establishment (ICE) parameters which can be
  17. // exchanged in signaling.
  18. type ICEGatherer struct {
  19. lock sync.RWMutex
  20. log logging.LeveledLogger
  21. state ICEGathererState
  22. validatedServers []*stun.URI
  23. gatherPolicy ICETransportPolicy
  24. agent *ice.Agent
  25. onLocalCandidateHandler atomic.Value // func(candidate *ICECandidate)
  26. onStateChangeHandler atomic.Value // func(state ICEGathererState)
  27. // Used for GatheringCompletePromise
  28. onGatheringCompleteHandler atomic.Value // func()
  29. api *API
  30. }
  31. // NewICEGatherer creates a new NewICEGatherer.
  32. // This constructor is part of the ORTC API. It is not
  33. // meant to be used together with the basic WebRTC API.
  34. func (api *API) NewICEGatherer(opts ICEGatherOptions) (*ICEGatherer, error) {
  35. var validatedServers []*stun.URI
  36. if len(opts.ICEServers) > 0 {
  37. for _, server := range opts.ICEServers {
  38. url, err := server.urls()
  39. if err != nil {
  40. return nil, err
  41. }
  42. validatedServers = append(validatedServers, url...)
  43. }
  44. }
  45. return &ICEGatherer{
  46. state: ICEGathererStateNew,
  47. gatherPolicy: opts.ICEGatherPolicy,
  48. validatedServers: validatedServers,
  49. api: api,
  50. log: api.settingEngine.LoggerFactory.NewLogger("ice"),
  51. }, nil
  52. }
  53. func (g *ICEGatherer) createAgent() error {
  54. g.lock.Lock()
  55. defer g.lock.Unlock()
  56. if g.agent != nil || g.State() != ICEGathererStateNew {
  57. return nil
  58. }
  59. candidateTypes := []ice.CandidateType{}
  60. if g.api.settingEngine.candidates.ICELite {
  61. candidateTypes = append(candidateTypes, ice.CandidateTypeHost)
  62. } else if g.gatherPolicy == ICETransportPolicyRelay {
  63. candidateTypes = append(candidateTypes, ice.CandidateTypeRelay)
  64. }
  65. var nat1To1CandiTyp ice.CandidateType
  66. switch g.api.settingEngine.candidates.NAT1To1IPCandidateType {
  67. case ICECandidateTypeHost:
  68. nat1To1CandiTyp = ice.CandidateTypeHost
  69. case ICECandidateTypeSrflx:
  70. nat1To1CandiTyp = ice.CandidateTypeServerReflexive
  71. default:
  72. nat1To1CandiTyp = ice.CandidateTypeUnspecified
  73. }
  74. mDNSMode := g.api.settingEngine.candidates.MulticastDNSMode
  75. if mDNSMode != ice.MulticastDNSModeDisabled && mDNSMode != ice.MulticastDNSModeQueryAndGather {
  76. // If enum is in state we don't recognized default to MulticastDNSModeQueryOnly
  77. mDNSMode = ice.MulticastDNSModeQueryOnly
  78. }
  79. config := &ice.AgentConfig{
  80. Lite: g.api.settingEngine.candidates.ICELite,
  81. Urls: g.validatedServers,
  82. PortMin: g.api.settingEngine.ephemeralUDP.PortMin,
  83. PortMax: g.api.settingEngine.ephemeralUDP.PortMax,
  84. DisconnectedTimeout: g.api.settingEngine.timeout.ICEDisconnectedTimeout,
  85. FailedTimeout: g.api.settingEngine.timeout.ICEFailedTimeout,
  86. KeepaliveInterval: g.api.settingEngine.timeout.ICEKeepaliveInterval,
  87. LoggerFactory: g.api.settingEngine.LoggerFactory,
  88. CandidateTypes: candidateTypes,
  89. HostAcceptanceMinWait: g.api.settingEngine.timeout.ICEHostAcceptanceMinWait,
  90. SrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICESrflxAcceptanceMinWait,
  91. PrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICEPrflxAcceptanceMinWait,
  92. RelayAcceptanceMinWait: g.api.settingEngine.timeout.ICERelayAcceptanceMinWait,
  93. InterfaceFilter: g.api.settingEngine.candidates.InterfaceFilter,
  94. IPFilter: g.api.settingEngine.candidates.IPFilter,
  95. NAT1To1IPs: g.api.settingEngine.candidates.NAT1To1IPs,
  96. NAT1To1IPCandidateType: nat1To1CandiTyp,
  97. IncludeLoopback: g.api.settingEngine.candidates.IncludeLoopbackCandidate,
  98. Net: g.api.settingEngine.net,
  99. MulticastDNSMode: mDNSMode,
  100. MulticastDNSHostName: g.api.settingEngine.candidates.MulticastDNSHostName,
  101. LocalUfrag: g.api.settingEngine.candidates.UsernameFragment,
  102. LocalPwd: g.api.settingEngine.candidates.Password,
  103. TCPMux: g.api.settingEngine.iceTCPMux,
  104. UDPMux: g.api.settingEngine.iceUDPMux,
  105. ProxyDialer: g.api.settingEngine.iceProxyDialer,
  106. DisableActiveTCP: g.api.settingEngine.iceDisableActiveTCP,
  107. BindingRequestHandler: g.api.settingEngine.iceBindingRequestHandler,
  108. // [Psiphon] from https://github.com/pion/webrtc/pull/2298, https://github.com/pion/webrtc/commit/906f20c4
  109. UDPMuxSrflx: g.api.settingEngine.iceUDPMuxSrflx,
  110. MaxBindingRequests: g.api.settingEngine.iceMaxBindingRequests,
  111. }
  112. requestedNetworkTypes := g.api.settingEngine.candidates.ICENetworkTypes
  113. if len(requestedNetworkTypes) == 0 {
  114. requestedNetworkTypes = supportedNetworkTypes()
  115. }
  116. for _, typ := range requestedNetworkTypes {
  117. config.NetworkTypes = append(config.NetworkTypes, ice.NetworkType(typ))
  118. }
  119. agent, err := ice.NewAgent(config)
  120. if err != nil {
  121. return err
  122. }
  123. g.agent = agent
  124. return nil
  125. }
  126. // Gather ICE candidates.
  127. func (g *ICEGatherer) Gather() error {
  128. if err := g.createAgent(); err != nil {
  129. return err
  130. }
  131. agent := g.getAgent()
  132. // it is possible agent had just been closed
  133. if agent == nil {
  134. return fmt.Errorf("%w: unable to gather", errICEAgentNotExist)
  135. }
  136. g.setState(ICEGathererStateGathering)
  137. if err := agent.OnCandidate(func(candidate ice.Candidate) {
  138. onLocalCandidateHandler := func(*ICECandidate) {}
  139. if handler, ok := g.onLocalCandidateHandler.Load().(func(candidate *ICECandidate)); ok && handler != nil {
  140. onLocalCandidateHandler = handler
  141. }
  142. onGatheringCompleteHandler := func() {}
  143. if handler, ok := g.onGatheringCompleteHandler.Load().(func()); ok && handler != nil {
  144. onGatheringCompleteHandler = handler
  145. }
  146. if candidate != nil {
  147. c, err := newICECandidateFromICE(candidate)
  148. if err != nil {
  149. g.log.Warnf("Failed to convert ice.Candidate: %s", err)
  150. return
  151. }
  152. onLocalCandidateHandler(&c)
  153. } else {
  154. g.setState(ICEGathererStateComplete)
  155. onGatheringCompleteHandler()
  156. onLocalCandidateHandler(nil)
  157. }
  158. }); err != nil {
  159. return err
  160. }
  161. return agent.GatherCandidates()
  162. }
  163. // Close prunes all local candidates, and closes the ports.
  164. func (g *ICEGatherer) Close() error {
  165. g.lock.Lock()
  166. defer g.lock.Unlock()
  167. if g.agent == nil {
  168. return nil
  169. } else if err := g.agent.Close(); err != nil {
  170. return err
  171. }
  172. g.agent = nil
  173. g.setState(ICEGathererStateClosed)
  174. return nil
  175. }
  176. // GetLocalParameters returns the ICE parameters of the ICEGatherer.
  177. func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) {
  178. if err := g.createAgent(); err != nil {
  179. return ICEParameters{}, err
  180. }
  181. agent := g.getAgent()
  182. // it is possible agent had just been closed
  183. if agent == nil {
  184. return ICEParameters{}, fmt.Errorf("%w: unable to get local parameters", errICEAgentNotExist)
  185. }
  186. frag, pwd, err := agent.GetLocalUserCredentials()
  187. if err != nil {
  188. return ICEParameters{}, err
  189. }
  190. return ICEParameters{
  191. UsernameFragment: frag,
  192. Password: pwd,
  193. ICELite: false,
  194. }, nil
  195. }
  196. // GetLocalCandidates returns the sequence of valid local candidates associated with the ICEGatherer.
  197. func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) {
  198. if err := g.createAgent(); err != nil {
  199. return nil, err
  200. }
  201. agent := g.getAgent()
  202. // it is possible agent had just been closed
  203. if agent == nil {
  204. return nil, fmt.Errorf("%w: unable to get local candidates", errICEAgentNotExist)
  205. }
  206. iceCandidates, err := agent.GetLocalCandidates()
  207. if err != nil {
  208. return nil, err
  209. }
  210. return newICECandidatesFromICE(iceCandidates)
  211. }
  212. // OnLocalCandidate sets an event handler which fires when a new local ICE candidate is available
  213. // Take note that the handler will be called with a nil pointer when gathering is finished.
  214. func (g *ICEGatherer) OnLocalCandidate(f func(*ICECandidate)) {
  215. g.onLocalCandidateHandler.Store(f)
  216. }
  217. // OnStateChange fires any time the ICEGatherer changes
  218. func (g *ICEGatherer) OnStateChange(f func(ICEGathererState)) {
  219. g.onStateChangeHandler.Store(f)
  220. }
  221. // State indicates the current state of the ICE gatherer.
  222. func (g *ICEGatherer) State() ICEGathererState {
  223. return atomicLoadICEGathererState(&g.state)
  224. }
  225. func (g *ICEGatherer) setState(s ICEGathererState) {
  226. atomicStoreICEGathererState(&g.state, s)
  227. if handler, ok := g.onStateChangeHandler.Load().(func(state ICEGathererState)); ok && handler != nil {
  228. handler(s)
  229. }
  230. }
  231. func (g *ICEGatherer) getAgent() *ice.Agent {
  232. g.lock.RLock()
  233. defer g.lock.RUnlock()
  234. return g.agent
  235. }
  236. func (g *ICEGatherer) collectStats(collector *statsReportCollector) {
  237. agent := g.getAgent()
  238. if agent == nil {
  239. return
  240. }
  241. collector.Collecting()
  242. go func(collector *statsReportCollector, agent *ice.Agent) {
  243. for _, candidatePairStats := range agent.GetCandidatePairsStats() {
  244. collector.Collecting()
  245. state, err := toStatsICECandidatePairState(candidatePairStats.State)
  246. if err != nil {
  247. g.log.Error(err.Error())
  248. }
  249. pairID := newICECandidatePairStatsID(candidatePairStats.LocalCandidateID,
  250. candidatePairStats.RemoteCandidateID)
  251. stats := ICECandidatePairStats{
  252. Timestamp: statsTimestampFrom(candidatePairStats.Timestamp),
  253. Type: StatsTypeCandidatePair,
  254. ID: pairID,
  255. // TransportID:
  256. LocalCandidateID: candidatePairStats.LocalCandidateID,
  257. RemoteCandidateID: candidatePairStats.RemoteCandidateID,
  258. State: state,
  259. Nominated: candidatePairStats.Nominated,
  260. PacketsSent: candidatePairStats.PacketsSent,
  261. PacketsReceived: candidatePairStats.PacketsReceived,
  262. BytesSent: candidatePairStats.BytesSent,
  263. BytesReceived: candidatePairStats.BytesReceived,
  264. LastPacketSentTimestamp: statsTimestampFrom(candidatePairStats.LastPacketSentTimestamp),
  265. LastPacketReceivedTimestamp: statsTimestampFrom(candidatePairStats.LastPacketReceivedTimestamp),
  266. FirstRequestTimestamp: statsTimestampFrom(candidatePairStats.FirstRequestTimestamp),
  267. LastRequestTimestamp: statsTimestampFrom(candidatePairStats.LastRequestTimestamp),
  268. LastResponseTimestamp: statsTimestampFrom(candidatePairStats.LastResponseTimestamp),
  269. TotalRoundTripTime: candidatePairStats.TotalRoundTripTime,
  270. CurrentRoundTripTime: candidatePairStats.CurrentRoundTripTime,
  271. AvailableOutgoingBitrate: candidatePairStats.AvailableOutgoingBitrate,
  272. AvailableIncomingBitrate: candidatePairStats.AvailableIncomingBitrate,
  273. CircuitBreakerTriggerCount: candidatePairStats.CircuitBreakerTriggerCount,
  274. RequestsReceived: candidatePairStats.RequestsReceived,
  275. RequestsSent: candidatePairStats.RequestsSent,
  276. ResponsesReceived: candidatePairStats.ResponsesReceived,
  277. ResponsesSent: candidatePairStats.ResponsesSent,
  278. RetransmissionsReceived: candidatePairStats.RetransmissionsReceived,
  279. RetransmissionsSent: candidatePairStats.RetransmissionsSent,
  280. ConsentRequestsSent: candidatePairStats.ConsentRequestsSent,
  281. ConsentExpiredTimestamp: statsTimestampFrom(candidatePairStats.ConsentExpiredTimestamp),
  282. }
  283. collector.Collect(stats.ID, stats)
  284. }
  285. for _, candidateStats := range agent.GetLocalCandidatesStats() {
  286. collector.Collecting()
  287. networkType, err := getNetworkType(candidateStats.NetworkType)
  288. if err != nil {
  289. g.log.Error(err.Error())
  290. }
  291. candidateType, err := getCandidateType(candidateStats.CandidateType)
  292. if err != nil {
  293. g.log.Error(err.Error())
  294. }
  295. stats := ICECandidateStats{
  296. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  297. ID: candidateStats.ID,
  298. Type: StatsTypeLocalCandidate,
  299. IP: candidateStats.IP,
  300. Port: int32(candidateStats.Port),
  301. Protocol: networkType.Protocol(),
  302. CandidateType: candidateType,
  303. Priority: int32(candidateStats.Priority),
  304. URL: candidateStats.URL,
  305. RelayProtocol: candidateStats.RelayProtocol,
  306. Deleted: candidateStats.Deleted,
  307. }
  308. collector.Collect(stats.ID, stats)
  309. }
  310. for _, candidateStats := range agent.GetRemoteCandidatesStats() {
  311. collector.Collecting()
  312. networkType, err := getNetworkType(candidateStats.NetworkType)
  313. if err != nil {
  314. g.log.Error(err.Error())
  315. }
  316. candidateType, err := getCandidateType(candidateStats.CandidateType)
  317. if err != nil {
  318. g.log.Error(err.Error())
  319. }
  320. stats := ICECandidateStats{
  321. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  322. ID: candidateStats.ID,
  323. Type: StatsTypeRemoteCandidate,
  324. IP: candidateStats.IP,
  325. Port: int32(candidateStats.Port),
  326. Protocol: networkType.Protocol(),
  327. CandidateType: candidateType,
  328. Priority: int32(candidateStats.Priority),
  329. URL: candidateStats.URL,
  330. RelayProtocol: candidateStats.RelayProtocol,
  331. }
  332. collector.Collect(stats.ID, stats)
  333. }
  334. collector.Done()
  335. }(collector, agent)
  336. }