icetransport.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "context"
  8. "fmt"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/pion/ice/v2"
  13. "github.com/pion/logging"
  14. "github.com/pion/webrtc/v3/internal/mux"
  15. )
  16. // ICETransport allows an application access to information about the ICE
  17. // transport over which packets are sent and received.
  18. type ICETransport struct {
  19. lock sync.RWMutex
  20. role ICERole
  21. onConnectionStateChangeHandler atomic.Value // func(ICETransportState)
  22. internalOnConnectionStateChangeHandler atomic.Value // func(ICETransportState)
  23. onSelectedCandidatePairChangeHandler atomic.Value // func(*ICECandidatePair)
  24. state atomic.Value // ICETransportState
  25. gatherer *ICEGatherer
  26. conn *ice.Conn
  27. mux *mux.Mux
  28. ctx context.Context
  29. ctxCancel func()
  30. loggerFactory logging.LoggerFactory
  31. log logging.LeveledLogger
  32. }
  33. // GetSelectedCandidatePair returns the selected candidate pair on which packets are sent
  34. // if there is no selected pair nil is returned
  35. func (t *ICETransport) GetSelectedCandidatePair() (*ICECandidatePair, error) {
  36. agent := t.gatherer.getAgent()
  37. if agent == nil {
  38. return nil, nil //nolint:nilnil
  39. }
  40. icePair, err := agent.GetSelectedCandidatePair()
  41. if icePair == nil || err != nil {
  42. return nil, err
  43. }
  44. local, err := newICECandidateFromICE(icePair.Local)
  45. if err != nil {
  46. return nil, err
  47. }
  48. remote, err := newICECandidateFromICE(icePair.Remote)
  49. if err != nil {
  50. return nil, err
  51. }
  52. return &ICECandidatePair{Local: &local, Remote: &remote}, nil
  53. }
  54. // NewICETransport creates a new NewICETransport.
  55. func NewICETransport(gatherer *ICEGatherer, loggerFactory logging.LoggerFactory) *ICETransport {
  56. iceTransport := &ICETransport{
  57. gatherer: gatherer,
  58. loggerFactory: loggerFactory,
  59. log: loggerFactory.NewLogger("ortc"),
  60. }
  61. iceTransport.setState(ICETransportStateNew)
  62. return iceTransport
  63. }
  64. // Start incoming connectivity checks based on its configured role.
  65. func (t *ICETransport) Start(gatherer *ICEGatherer, params ICEParameters, role *ICERole) error {
  66. t.lock.Lock()
  67. defer t.lock.Unlock()
  68. if t.State() != ICETransportStateNew {
  69. return errICETransportNotInNew
  70. }
  71. if gatherer != nil {
  72. t.gatherer = gatherer
  73. }
  74. if err := t.ensureGatherer(); err != nil {
  75. return err
  76. }
  77. agent := t.gatherer.getAgent()
  78. if agent == nil {
  79. return fmt.Errorf("%w: unable to start ICETransport", errICEAgentNotExist)
  80. }
  81. if err := agent.OnConnectionStateChange(func(iceState ice.ConnectionState) {
  82. state := newICETransportStateFromICE(iceState)
  83. t.setState(state)
  84. t.onConnectionStateChange(state)
  85. }); err != nil {
  86. return err
  87. }
  88. if err := agent.OnSelectedCandidatePairChange(func(local, remote ice.Candidate) {
  89. candidates, err := newICECandidatesFromICE([]ice.Candidate{local, remote})
  90. if err != nil {
  91. t.log.Warnf("%w: %s", errICECandiatesCoversionFailed, err)
  92. return
  93. }
  94. t.onSelectedCandidatePairChange(NewICECandidatePair(&candidates[0], &candidates[1]))
  95. }); err != nil {
  96. return err
  97. }
  98. if role == nil {
  99. controlled := ICERoleControlled
  100. role = &controlled
  101. }
  102. t.role = *role
  103. t.ctx, t.ctxCancel = context.WithCancel(context.Background())
  104. // Drop the lock here to allow ICE candidates to be
  105. // added so that the agent can complete a connection
  106. t.lock.Unlock()
  107. var iceConn *ice.Conn
  108. var err error
  109. switch *role {
  110. case ICERoleControlling:
  111. iceConn, err = agent.Dial(t.ctx,
  112. params.UsernameFragment,
  113. params.Password)
  114. case ICERoleControlled:
  115. iceConn, err = agent.Accept(t.ctx,
  116. params.UsernameFragment,
  117. params.Password)
  118. default:
  119. err = errICERoleUnknown
  120. }
  121. // Reacquire the lock to set the connection/mux
  122. t.lock.Lock()
  123. if err != nil {
  124. return err
  125. }
  126. t.conn = iceConn
  127. config := mux.Config{
  128. Conn: t.conn,
  129. BufferSize: int(t.gatherer.api.settingEngine.getReceiveMTU()),
  130. LoggerFactory: t.loggerFactory,
  131. }
  132. t.mux = mux.NewMux(config)
  133. return nil
  134. }
  135. // restart is not exposed currently because ORTC has users create a whole new ICETransport
  136. // so for now lets keep it private so we don't cause ORTC users to depend on non-standard APIs
  137. func (t *ICETransport) restart() error {
  138. t.lock.Lock()
  139. defer t.lock.Unlock()
  140. agent := t.gatherer.getAgent()
  141. if agent == nil {
  142. return fmt.Errorf("%w: unable to restart ICETransport", errICEAgentNotExist)
  143. }
  144. if err := agent.Restart(t.gatherer.api.settingEngine.candidates.UsernameFragment, t.gatherer.api.settingEngine.candidates.Password); err != nil {
  145. return err
  146. }
  147. return t.gatherer.Gather()
  148. }
  149. // Stop irreversibly stops the ICETransport.
  150. func (t *ICETransport) Stop() error {
  151. t.lock.Lock()
  152. defer t.lock.Unlock()
  153. t.setState(ICETransportStateClosed)
  154. if t.ctxCancel != nil {
  155. t.ctxCancel()
  156. }
  157. if t.mux != nil {
  158. return t.mux.Close()
  159. } else if t.gatherer != nil {
  160. return t.gatherer.Close()
  161. }
  162. return nil
  163. }
  164. // OnSelectedCandidatePairChange sets a handler that is invoked when a new
  165. // ICE candidate pair is selected
  166. func (t *ICETransport) OnSelectedCandidatePairChange(f func(*ICECandidatePair)) {
  167. t.onSelectedCandidatePairChangeHandler.Store(f)
  168. }
  169. func (t *ICETransport) onSelectedCandidatePairChange(pair *ICECandidatePair) {
  170. if handler, ok := t.onSelectedCandidatePairChangeHandler.Load().(func(*ICECandidatePair)); ok {
  171. handler(pair)
  172. }
  173. }
  174. // OnConnectionStateChange sets a handler that is fired when the ICE
  175. // connection state changes.
  176. func (t *ICETransport) OnConnectionStateChange(f func(ICETransportState)) {
  177. t.onConnectionStateChangeHandler.Store(f)
  178. }
  179. func (t *ICETransport) onConnectionStateChange(state ICETransportState) {
  180. if handler, ok := t.onConnectionStateChangeHandler.Load().(func(ICETransportState)); ok {
  181. handler(state)
  182. }
  183. if handler, ok := t.internalOnConnectionStateChangeHandler.Load().(func(ICETransportState)); ok {
  184. handler(state)
  185. }
  186. }
  187. // Role indicates the current role of the ICE transport.
  188. func (t *ICETransport) Role() ICERole {
  189. t.lock.RLock()
  190. defer t.lock.RUnlock()
  191. return t.role
  192. }
  193. // SetRemoteCandidates sets the sequence of candidates associated with the remote ICETransport.
  194. func (t *ICETransport) SetRemoteCandidates(remoteCandidates []ICECandidate) error {
  195. t.lock.RLock()
  196. defer t.lock.RUnlock()
  197. if err := t.ensureGatherer(); err != nil {
  198. return err
  199. }
  200. agent := t.gatherer.getAgent()
  201. if agent == nil {
  202. return fmt.Errorf("%w: unable to set remote candidates", errICEAgentNotExist)
  203. }
  204. for _, c := range remoteCandidates {
  205. i, err := c.toICE()
  206. if err != nil {
  207. return err
  208. }
  209. if err = agent.AddRemoteCandidate(i); err != nil {
  210. return err
  211. }
  212. }
  213. return nil
  214. }
  215. // AddRemoteCandidate adds a candidate associated with the remote ICETransport.
  216. func (t *ICETransport) AddRemoteCandidate(remoteCandidate *ICECandidate) error {
  217. t.lock.RLock()
  218. defer t.lock.RUnlock()
  219. var (
  220. c ice.Candidate
  221. err error
  222. )
  223. if err = t.ensureGatherer(); err != nil {
  224. return err
  225. }
  226. if remoteCandidate != nil {
  227. if c, err = remoteCandidate.toICE(); err != nil {
  228. return err
  229. }
  230. }
  231. agent := t.gatherer.getAgent()
  232. if agent == nil {
  233. return fmt.Errorf("%w: unable to add remote candidates", errICEAgentNotExist)
  234. }
  235. return agent.AddRemoteCandidate(c)
  236. }
  237. // State returns the current ice transport state.
  238. func (t *ICETransport) State() ICETransportState {
  239. if v, ok := t.state.Load().(ICETransportState); ok {
  240. return v
  241. }
  242. return ICETransportState(0)
  243. }
  244. // GetLocalParameters returns an IceParameters object which provides information
  245. // uniquely identifying the local peer for the duration of the ICE session.
  246. func (t *ICETransport) GetLocalParameters() (ICEParameters, error) {
  247. if err := t.ensureGatherer(); err != nil {
  248. return ICEParameters{}, err
  249. }
  250. return t.gatherer.GetLocalParameters()
  251. }
  252. func (t *ICETransport) setState(i ICETransportState) {
  253. t.state.Store(i)
  254. }
  255. func (t *ICETransport) newEndpoint(f mux.MatchFunc) *mux.Endpoint {
  256. t.lock.Lock()
  257. defer t.lock.Unlock()
  258. return t.mux.NewEndpoint(f)
  259. }
  260. func (t *ICETransport) ensureGatherer() error {
  261. if t.gatherer == nil {
  262. return errICEGathererNotStarted
  263. } else if t.gatherer.getAgent() == nil {
  264. if err := t.gatherer.createAgent(); err != nil {
  265. return err
  266. }
  267. }
  268. return nil
  269. }
  270. func (t *ICETransport) collectStats(collector *statsReportCollector) {
  271. t.lock.Lock()
  272. conn := t.conn
  273. t.lock.Unlock()
  274. collector.Collecting()
  275. stats := TransportStats{
  276. Timestamp: statsTimestampFrom(time.Now()),
  277. Type: StatsTypeTransport,
  278. ID: "iceTransport",
  279. }
  280. if conn != nil {
  281. stats.BytesSent = conn.BytesSent()
  282. stats.BytesReceived = conn.BytesReceived()
  283. }
  284. collector.Collect(stats.ID, stats)
  285. }
  286. func (t *ICETransport) haveRemoteCredentialsChange(newUfrag, newPwd string) bool {
  287. t.lock.Lock()
  288. defer t.lock.Unlock()
  289. agent := t.gatherer.getAgent()
  290. if agent == nil {
  291. return false
  292. }
  293. uFrag, uPwd, err := agent.GetRemoteUserCredentials()
  294. if err != nil {
  295. return false
  296. }
  297. return uFrag != newUfrag || uPwd != newPwd
  298. }
  299. func (t *ICETransport) setRemoteCredentials(newUfrag, newPwd string) error {
  300. t.lock.Lock()
  301. defer t.lock.Unlock()
  302. agent := t.gatherer.getAgent()
  303. if agent == nil {
  304. return fmt.Errorf("%w: unable to SetRemoteCredentials", errICEAgentNotExist)
  305. }
  306. return agent.SetRemoteCredentials(newUfrag, newPwd)
  307. }