icegatherer.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "fmt"
  8. "sync"
  9. "sync/atomic"
  10. "github.com/pion/ice/v2"
  11. "github.com/pion/logging"
  12. "github.com/pion/stun"
  13. )
  14. // ICEGatherer gathers local host, server reflexive and relay
  15. // candidates, as well as enabling the retrieval of local Interactive
  16. // Connectivity Establishment (ICE) parameters which can be
  17. // exchanged in signaling.
  18. type ICEGatherer struct {
  19. lock sync.RWMutex
  20. log logging.LeveledLogger
  21. state ICEGathererState
  22. validatedServers []*stun.URI
  23. gatherPolicy ICETransportPolicy
  24. agent *ice.Agent
  25. onLocalCandidateHandler atomic.Value // func(candidate *ICECandidate)
  26. onStateChangeHandler atomic.Value // func(state ICEGathererState)
  27. // Used for GatheringCompletePromise
  28. onGatheringCompleteHandler atomic.Value // func()
  29. api *API
  30. }
  31. // NewICEGatherer creates a new NewICEGatherer.
  32. // This constructor is part of the ORTC API. It is not
  33. // meant to be used together with the basic WebRTC API.
  34. func (api *API) NewICEGatherer(opts ICEGatherOptions) (*ICEGatherer, error) {
  35. var validatedServers []*stun.URI
  36. if len(opts.ICEServers) > 0 {
  37. for _, server := range opts.ICEServers {
  38. url, err := server.urls()
  39. if err != nil {
  40. return nil, err
  41. }
  42. validatedServers = append(validatedServers, url...)
  43. }
  44. }
  45. return &ICEGatherer{
  46. state: ICEGathererStateNew,
  47. gatherPolicy: opts.ICEGatherPolicy,
  48. validatedServers: validatedServers,
  49. api: api,
  50. log: api.settingEngine.LoggerFactory.NewLogger("ice"),
  51. }, nil
  52. }
  53. func (g *ICEGatherer) createAgent() error {
  54. g.lock.Lock()
  55. defer g.lock.Unlock()
  56. if g.agent != nil || g.State() != ICEGathererStateNew {
  57. return nil
  58. }
  59. candidateTypes := []ice.CandidateType{}
  60. if g.api.settingEngine.candidates.ICELite {
  61. candidateTypes = append(candidateTypes, ice.CandidateTypeHost)
  62. } else if g.gatherPolicy == ICETransportPolicyRelay {
  63. candidateTypes = append(candidateTypes, ice.CandidateTypeRelay)
  64. }
  65. var nat1To1CandiTyp ice.CandidateType
  66. switch g.api.settingEngine.candidates.NAT1To1IPCandidateType {
  67. case ICECandidateTypeHost:
  68. nat1To1CandiTyp = ice.CandidateTypeHost
  69. case ICECandidateTypeSrflx:
  70. nat1To1CandiTyp = ice.CandidateTypeServerReflexive
  71. default:
  72. nat1To1CandiTyp = ice.CandidateTypeUnspecified
  73. }
  74. mDNSMode := g.api.settingEngine.candidates.MulticastDNSMode
  75. if mDNSMode != ice.MulticastDNSModeDisabled && mDNSMode != ice.MulticastDNSModeQueryAndGather {
  76. // If enum is in state we don't recognized default to MulticastDNSModeQueryOnly
  77. mDNSMode = ice.MulticastDNSModeQueryOnly
  78. }
  79. config := &ice.AgentConfig{
  80. Lite: g.api.settingEngine.candidates.ICELite,
  81. Urls: g.validatedServers,
  82. PortMin: g.api.settingEngine.ephemeralUDP.PortMin,
  83. PortMax: g.api.settingEngine.ephemeralUDP.PortMax,
  84. DisconnectedTimeout: g.api.settingEngine.timeout.ICEDisconnectedTimeout,
  85. FailedTimeout: g.api.settingEngine.timeout.ICEFailedTimeout,
  86. KeepaliveInterval: g.api.settingEngine.timeout.ICEKeepaliveInterval,
  87. LoggerFactory: g.api.settingEngine.LoggerFactory,
  88. CandidateTypes: candidateTypes,
  89. HostAcceptanceMinWait: g.api.settingEngine.timeout.ICEHostAcceptanceMinWait,
  90. SrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICESrflxAcceptanceMinWait,
  91. PrflxAcceptanceMinWait: g.api.settingEngine.timeout.ICEPrflxAcceptanceMinWait,
  92. RelayAcceptanceMinWait: g.api.settingEngine.timeout.ICERelayAcceptanceMinWait,
  93. InterfaceFilter: g.api.settingEngine.candidates.InterfaceFilter,
  94. IPFilter: g.api.settingEngine.candidates.IPFilter,
  95. NAT1To1IPs: g.api.settingEngine.candidates.NAT1To1IPs,
  96. NAT1To1IPCandidateType: nat1To1CandiTyp,
  97. IncludeLoopback: g.api.settingEngine.candidates.IncludeLoopbackCandidate,
  98. Net: g.api.settingEngine.net,
  99. MulticastDNSMode: mDNSMode,
  100. MulticastDNSHostName: g.api.settingEngine.candidates.MulticastDNSHostName,
  101. LocalUfrag: g.api.settingEngine.candidates.UsernameFragment,
  102. LocalPwd: g.api.settingEngine.candidates.Password,
  103. TCPMux: g.api.settingEngine.iceTCPMux,
  104. UDPMux: g.api.settingEngine.iceUDPMux,
  105. ProxyDialer: g.api.settingEngine.iceProxyDialer,
  106. DisableActiveTCP: g.api.settingEngine.iceDisableActiveTCP,
  107. // [Psiphon] from https://github.com/pion/webrtc/pull/2298
  108. UDPMuxSrflx: g.api.settingEngine.iceUDPMuxSrflx,
  109. }
  110. requestedNetworkTypes := g.api.settingEngine.candidates.ICENetworkTypes
  111. if len(requestedNetworkTypes) == 0 {
  112. requestedNetworkTypes = supportedNetworkTypes()
  113. }
  114. for _, typ := range requestedNetworkTypes {
  115. config.NetworkTypes = append(config.NetworkTypes, ice.NetworkType(typ))
  116. }
  117. agent, err := ice.NewAgent(config)
  118. if err != nil {
  119. return err
  120. }
  121. g.agent = agent
  122. return nil
  123. }
  124. // Gather ICE candidates.
  125. func (g *ICEGatherer) Gather() error {
  126. if err := g.createAgent(); err != nil {
  127. return err
  128. }
  129. agent := g.getAgent()
  130. // it is possible agent had just been closed
  131. if agent == nil {
  132. return fmt.Errorf("%w: unable to gather", errICEAgentNotExist)
  133. }
  134. g.setState(ICEGathererStateGathering)
  135. if err := agent.OnCandidate(func(candidate ice.Candidate) {
  136. onLocalCandidateHandler := func(*ICECandidate) {}
  137. if handler, ok := g.onLocalCandidateHandler.Load().(func(candidate *ICECandidate)); ok && handler != nil {
  138. onLocalCandidateHandler = handler
  139. }
  140. onGatheringCompleteHandler := func() {}
  141. if handler, ok := g.onGatheringCompleteHandler.Load().(func()); ok && handler != nil {
  142. onGatheringCompleteHandler = handler
  143. }
  144. if candidate != nil {
  145. c, err := newICECandidateFromICE(candidate)
  146. if err != nil {
  147. g.log.Warnf("Failed to convert ice.Candidate: %s", err)
  148. return
  149. }
  150. onLocalCandidateHandler(&c)
  151. } else {
  152. g.setState(ICEGathererStateComplete)
  153. onGatheringCompleteHandler()
  154. onLocalCandidateHandler(nil)
  155. }
  156. }); err != nil {
  157. return err
  158. }
  159. return agent.GatherCandidates()
  160. }
  161. // Close prunes all local candidates, and closes the ports.
  162. func (g *ICEGatherer) Close() error {
  163. g.lock.Lock()
  164. defer g.lock.Unlock()
  165. if g.agent == nil {
  166. return nil
  167. } else if err := g.agent.Close(); err != nil {
  168. return err
  169. }
  170. g.agent = nil
  171. g.setState(ICEGathererStateClosed)
  172. return nil
  173. }
  174. // GetLocalParameters returns the ICE parameters of the ICEGatherer.
  175. func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) {
  176. if err := g.createAgent(); err != nil {
  177. return ICEParameters{}, err
  178. }
  179. agent := g.getAgent()
  180. // it is possible agent had just been closed
  181. if agent == nil {
  182. return ICEParameters{}, fmt.Errorf("%w: unable to get local parameters", errICEAgentNotExist)
  183. }
  184. frag, pwd, err := agent.GetLocalUserCredentials()
  185. if err != nil {
  186. return ICEParameters{}, err
  187. }
  188. return ICEParameters{
  189. UsernameFragment: frag,
  190. Password: pwd,
  191. ICELite: false,
  192. }, nil
  193. }
  194. // GetLocalCandidates returns the sequence of valid local candidates associated with the ICEGatherer.
  195. func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) {
  196. if err := g.createAgent(); err != nil {
  197. return nil, err
  198. }
  199. agent := g.getAgent()
  200. // it is possible agent had just been closed
  201. if agent == nil {
  202. return nil, fmt.Errorf("%w: unable to get local candidates", errICEAgentNotExist)
  203. }
  204. iceCandidates, err := agent.GetLocalCandidates()
  205. if err != nil {
  206. return nil, err
  207. }
  208. return newICECandidatesFromICE(iceCandidates)
  209. }
  210. // OnLocalCandidate sets an event handler which fires when a new local ICE candidate is available
  211. // Take note that the handler will be called with a nil pointer when gathering is finished.
  212. func (g *ICEGatherer) OnLocalCandidate(f func(*ICECandidate)) {
  213. g.onLocalCandidateHandler.Store(f)
  214. }
  215. // OnStateChange fires any time the ICEGatherer changes
  216. func (g *ICEGatherer) OnStateChange(f func(ICEGathererState)) {
  217. g.onStateChangeHandler.Store(f)
  218. }
  219. // State indicates the current state of the ICE gatherer.
  220. func (g *ICEGatherer) State() ICEGathererState {
  221. return atomicLoadICEGathererState(&g.state)
  222. }
  223. func (g *ICEGatherer) setState(s ICEGathererState) {
  224. atomicStoreICEGathererState(&g.state, s)
  225. if handler, ok := g.onStateChangeHandler.Load().(func(state ICEGathererState)); ok && handler != nil {
  226. handler(s)
  227. }
  228. }
  229. func (g *ICEGatherer) getAgent() *ice.Agent {
  230. g.lock.RLock()
  231. defer g.lock.RUnlock()
  232. return g.agent
  233. }
  234. func (g *ICEGatherer) collectStats(collector *statsReportCollector) {
  235. agent := g.getAgent()
  236. if agent == nil {
  237. return
  238. }
  239. collector.Collecting()
  240. go func(collector *statsReportCollector, agent *ice.Agent) {
  241. for _, candidatePairStats := range agent.GetCandidatePairsStats() {
  242. collector.Collecting()
  243. state, err := toStatsICECandidatePairState(candidatePairStats.State)
  244. if err != nil {
  245. g.log.Error(err.Error())
  246. }
  247. pairID := newICECandidatePairStatsID(candidatePairStats.LocalCandidateID,
  248. candidatePairStats.RemoteCandidateID)
  249. stats := ICECandidatePairStats{
  250. Timestamp: statsTimestampFrom(candidatePairStats.Timestamp),
  251. Type: StatsTypeCandidatePair,
  252. ID: pairID,
  253. // TransportID:
  254. LocalCandidateID: candidatePairStats.LocalCandidateID,
  255. RemoteCandidateID: candidatePairStats.RemoteCandidateID,
  256. State: state,
  257. Nominated: candidatePairStats.Nominated,
  258. PacketsSent: candidatePairStats.PacketsSent,
  259. PacketsReceived: candidatePairStats.PacketsReceived,
  260. BytesSent: candidatePairStats.BytesSent,
  261. BytesReceived: candidatePairStats.BytesReceived,
  262. LastPacketSentTimestamp: statsTimestampFrom(candidatePairStats.LastPacketSentTimestamp),
  263. LastPacketReceivedTimestamp: statsTimestampFrom(candidatePairStats.LastPacketReceivedTimestamp),
  264. FirstRequestTimestamp: statsTimestampFrom(candidatePairStats.FirstRequestTimestamp),
  265. LastRequestTimestamp: statsTimestampFrom(candidatePairStats.LastRequestTimestamp),
  266. LastResponseTimestamp: statsTimestampFrom(candidatePairStats.LastResponseTimestamp),
  267. TotalRoundTripTime: candidatePairStats.TotalRoundTripTime,
  268. CurrentRoundTripTime: candidatePairStats.CurrentRoundTripTime,
  269. AvailableOutgoingBitrate: candidatePairStats.AvailableOutgoingBitrate,
  270. AvailableIncomingBitrate: candidatePairStats.AvailableIncomingBitrate,
  271. CircuitBreakerTriggerCount: candidatePairStats.CircuitBreakerTriggerCount,
  272. RequestsReceived: candidatePairStats.RequestsReceived,
  273. RequestsSent: candidatePairStats.RequestsSent,
  274. ResponsesReceived: candidatePairStats.ResponsesReceived,
  275. ResponsesSent: candidatePairStats.ResponsesSent,
  276. RetransmissionsReceived: candidatePairStats.RetransmissionsReceived,
  277. RetransmissionsSent: candidatePairStats.RetransmissionsSent,
  278. ConsentRequestsSent: candidatePairStats.ConsentRequestsSent,
  279. ConsentExpiredTimestamp: statsTimestampFrom(candidatePairStats.ConsentExpiredTimestamp),
  280. }
  281. collector.Collect(stats.ID, stats)
  282. }
  283. for _, candidateStats := range agent.GetLocalCandidatesStats() {
  284. collector.Collecting()
  285. networkType, err := getNetworkType(candidateStats.NetworkType)
  286. if err != nil {
  287. g.log.Error(err.Error())
  288. }
  289. candidateType, err := getCandidateType(candidateStats.CandidateType)
  290. if err != nil {
  291. g.log.Error(err.Error())
  292. }
  293. stats := ICECandidateStats{
  294. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  295. ID: candidateStats.ID,
  296. Type: StatsTypeLocalCandidate,
  297. IP: candidateStats.IP,
  298. Port: int32(candidateStats.Port),
  299. Protocol: networkType.Protocol(),
  300. CandidateType: candidateType,
  301. Priority: int32(candidateStats.Priority),
  302. URL: candidateStats.URL,
  303. RelayProtocol: candidateStats.RelayProtocol,
  304. Deleted: candidateStats.Deleted,
  305. }
  306. collector.Collect(stats.ID, stats)
  307. }
  308. for _, candidateStats := range agent.GetRemoteCandidatesStats() {
  309. collector.Collecting()
  310. networkType, err := getNetworkType(candidateStats.NetworkType)
  311. if err != nil {
  312. g.log.Error(err.Error())
  313. }
  314. candidateType, err := getCandidateType(candidateStats.CandidateType)
  315. if err != nil {
  316. g.log.Error(err.Error())
  317. }
  318. stats := ICECandidateStats{
  319. Timestamp: statsTimestampFrom(candidateStats.Timestamp),
  320. ID: candidateStats.ID,
  321. Type: StatsTypeRemoteCandidate,
  322. IP: candidateStats.IP,
  323. Port: int32(candidateStats.Port),
  324. Protocol: networkType.Protocol(),
  325. CandidateType: candidateType,
  326. Priority: int32(candidateStats.Priority),
  327. URL: candidateStats.URL,
  328. RelayProtocol: candidateStats.RelayProtocol,
  329. }
  330. collector.Collect(stats.ID, stats)
  331. }
  332. collector.Done()
  333. }(collector, agent)
  334. }