datachannel.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "errors"
  8. "fmt"
  9. "io"
  10. "math"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/pion/datachannel"
  15. "github.com/pion/logging"
  16. "github.com/pion/webrtc/v3/pkg/rtcerr"
  17. )
  18. const dataChannelBufferSize = math.MaxUint16 // message size limit for Chromium
  19. var errSCTPNotEstablished = errors.New("SCTP not established")
  20. // DataChannel represents a WebRTC DataChannel
  21. // The DataChannel interface represents a network channel
  22. // which can be used for bidirectional peer-to-peer transfers of arbitrary data
  23. type DataChannel struct {
  24. mu sync.RWMutex
  25. statsID string
  26. label string
  27. ordered bool
  28. maxPacketLifeTime *uint16
  29. maxRetransmits *uint16
  30. protocol string
  31. negotiated bool
  32. id *uint16
  33. readyState atomic.Value // DataChannelState
  34. bufferedAmountLowThreshold uint64
  35. detachCalled bool
  36. // The binaryType represents attribute MUST, on getting, return the value to
  37. // which it was last set. On setting, if the new value is either the string
  38. // "blob" or the string "arraybuffer", then set the IDL attribute to this
  39. // new value. Otherwise, throw a SyntaxError. When an DataChannel object
  40. // is created, the binaryType attribute MUST be initialized to the string
  41. // "blob". This attribute controls how binary data is exposed to scripts.
  42. // binaryType string
  43. onMessageHandler func(DataChannelMessage)
  44. openHandlerOnce sync.Once
  45. onOpenHandler func()
  46. dialHandlerOnce sync.Once
  47. onDialHandler func()
  48. onCloseHandler func()
  49. onBufferedAmountLow func()
  50. onErrorHandler func(error)
  51. sctpTransport *SCTPTransport
  52. dataChannel *datachannel.DataChannel
  53. // A reference to the associated api object used by this datachannel
  54. api *API
  55. log logging.LeveledLogger
  56. }
  57. // NewDataChannel creates a new DataChannel.
  58. // This constructor is part of the ORTC API. It is not
  59. // meant to be used together with the basic WebRTC API.
  60. func (api *API) NewDataChannel(transport *SCTPTransport, params *DataChannelParameters) (*DataChannel, error) {
  61. d, err := api.newDataChannel(params, nil, api.settingEngine.LoggerFactory.NewLogger("ortc"))
  62. if err != nil {
  63. return nil, err
  64. }
  65. err = d.open(transport)
  66. if err != nil {
  67. return nil, err
  68. }
  69. return d, nil
  70. }
  71. // newDataChannel is an internal constructor for the data channel used to
  72. // create the DataChannel object before the networking is set up.
  73. func (api *API) newDataChannel(params *DataChannelParameters, sctpTransport *SCTPTransport, log logging.LeveledLogger) (*DataChannel, error) {
  74. // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #5)
  75. if len(params.Label) > 65535 {
  76. return nil, &rtcerr.TypeError{Err: ErrStringSizeLimit}
  77. }
  78. d := &DataChannel{
  79. sctpTransport: sctpTransport,
  80. statsID: fmt.Sprintf("DataChannel-%d", time.Now().UnixNano()),
  81. label: params.Label,
  82. protocol: params.Protocol,
  83. negotiated: params.Negotiated,
  84. id: params.ID,
  85. ordered: params.Ordered,
  86. maxPacketLifeTime: params.MaxPacketLifeTime,
  87. maxRetransmits: params.MaxRetransmits,
  88. api: api,
  89. log: log,
  90. }
  91. d.setReadyState(DataChannelStateConnecting)
  92. return d, nil
  93. }
  94. // open opens the datachannel over the sctp transport
  95. func (d *DataChannel) open(sctpTransport *SCTPTransport) error {
  96. association := sctpTransport.association()
  97. if association == nil {
  98. return errSCTPNotEstablished
  99. }
  100. d.mu.Lock()
  101. if d.sctpTransport != nil { // already open
  102. d.mu.Unlock()
  103. return nil
  104. }
  105. d.sctpTransport = sctpTransport
  106. var channelType datachannel.ChannelType
  107. var reliabilityParameter uint32
  108. switch {
  109. case d.maxPacketLifeTime == nil && d.maxRetransmits == nil:
  110. if d.ordered {
  111. channelType = datachannel.ChannelTypeReliable
  112. } else {
  113. channelType = datachannel.ChannelTypeReliableUnordered
  114. }
  115. case d.maxRetransmits != nil:
  116. reliabilityParameter = uint32(*d.maxRetransmits)
  117. if d.ordered {
  118. channelType = datachannel.ChannelTypePartialReliableRexmit
  119. } else {
  120. channelType = datachannel.ChannelTypePartialReliableRexmitUnordered
  121. }
  122. default:
  123. reliabilityParameter = uint32(*d.maxPacketLifeTime)
  124. if d.ordered {
  125. channelType = datachannel.ChannelTypePartialReliableTimed
  126. } else {
  127. channelType = datachannel.ChannelTypePartialReliableTimedUnordered
  128. }
  129. }
  130. cfg := &datachannel.Config{
  131. ChannelType: channelType,
  132. Priority: datachannel.ChannelPriorityNormal,
  133. ReliabilityParameter: reliabilityParameter,
  134. Label: d.label,
  135. Protocol: d.protocol,
  136. Negotiated: d.negotiated,
  137. LoggerFactory: d.api.settingEngine.LoggerFactory,
  138. }
  139. if d.id == nil {
  140. // avoid holding lock when generating ID, since id generation locks
  141. d.mu.Unlock()
  142. var dcID *uint16
  143. err := d.sctpTransport.generateAndSetDataChannelID(d.sctpTransport.dtlsTransport.role(), &dcID)
  144. if err != nil {
  145. return err
  146. }
  147. d.mu.Lock()
  148. d.id = dcID
  149. }
  150. dc, err := datachannel.Dial(association, *d.id, cfg)
  151. if err != nil {
  152. d.mu.Unlock()
  153. return err
  154. }
  155. // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
  156. dc.SetBufferedAmountLowThreshold(d.bufferedAmountLowThreshold)
  157. dc.OnBufferedAmountLow(d.onBufferedAmountLow)
  158. d.mu.Unlock()
  159. d.onDial()
  160. d.handleOpen(dc, false, d.negotiated)
  161. return nil
  162. }
  163. // Transport returns the SCTPTransport instance the DataChannel is sending over.
  164. func (d *DataChannel) Transport() *SCTPTransport {
  165. d.mu.RLock()
  166. defer d.mu.RUnlock()
  167. return d.sctpTransport
  168. }
  169. // After onOpen is complete check that the user called detach
  170. // and provide an error message if the call was missed
  171. func (d *DataChannel) checkDetachAfterOpen() {
  172. d.mu.RLock()
  173. defer d.mu.RUnlock()
  174. if d.api.settingEngine.detach.DataChannels && !d.detachCalled {
  175. d.log.Warn("webrtc.DetachDataChannels() enabled but didn't Detach, call Detach from OnOpen")
  176. }
  177. }
  178. // OnOpen sets an event handler which is invoked when
  179. // the underlying data transport has been established (or re-established).
  180. func (d *DataChannel) OnOpen(f func()) {
  181. d.mu.Lock()
  182. d.openHandlerOnce = sync.Once{}
  183. d.onOpenHandler = f
  184. d.mu.Unlock()
  185. if d.ReadyState() == DataChannelStateOpen {
  186. // If the data channel is already open, call the handler immediately.
  187. go d.openHandlerOnce.Do(func() {
  188. f()
  189. d.checkDetachAfterOpen()
  190. })
  191. }
  192. }
  193. func (d *DataChannel) onOpen() {
  194. d.mu.RLock()
  195. handler := d.onOpenHandler
  196. d.mu.RUnlock()
  197. if handler != nil {
  198. go d.openHandlerOnce.Do(func() {
  199. handler()
  200. d.checkDetachAfterOpen()
  201. })
  202. }
  203. }
  204. // OnDial sets an event handler which is invoked when the
  205. // peer has been dialed, but before said peer has responsed
  206. func (d *DataChannel) OnDial(f func()) {
  207. d.mu.Lock()
  208. d.dialHandlerOnce = sync.Once{}
  209. d.onDialHandler = f
  210. d.mu.Unlock()
  211. if d.ReadyState() == DataChannelStateOpen {
  212. // If the data channel is already open, call the handler immediately.
  213. go d.dialHandlerOnce.Do(f)
  214. }
  215. }
  216. func (d *DataChannel) onDial() {
  217. d.mu.RLock()
  218. handler := d.onDialHandler
  219. d.mu.RUnlock()
  220. if handler != nil {
  221. go d.dialHandlerOnce.Do(handler)
  222. }
  223. }
  224. // OnClose sets an event handler which is invoked when
  225. // the underlying data transport has been closed.
  226. func (d *DataChannel) OnClose(f func()) {
  227. d.mu.Lock()
  228. defer d.mu.Unlock()
  229. d.onCloseHandler = f
  230. }
  231. func (d *DataChannel) onClose() {
  232. d.mu.RLock()
  233. handler := d.onCloseHandler
  234. d.mu.RUnlock()
  235. if handler != nil {
  236. go handler()
  237. }
  238. }
  239. // OnMessage sets an event handler which is invoked on a binary
  240. // message arrival over the sctp transport from a remote peer.
  241. // OnMessage can currently receive messages up to 16384 bytes
  242. // in size. Check out the detach API if you want to use larger
  243. // message sizes. Note that browser support for larger messages
  244. // is also limited.
  245. func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
  246. d.mu.Lock()
  247. defer d.mu.Unlock()
  248. d.onMessageHandler = f
  249. }
  250. func (d *DataChannel) onMessage(msg DataChannelMessage) {
  251. d.mu.RLock()
  252. handler := d.onMessageHandler
  253. d.mu.RUnlock()
  254. if handler == nil {
  255. return
  256. }
  257. handler(msg)
  258. }
  259. func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
  260. d.mu.Lock()
  261. d.dataChannel = dc
  262. bufferedAmountLowThreshold := d.bufferedAmountLowThreshold
  263. onBufferedAmountLow := d.onBufferedAmountLow
  264. d.mu.Unlock()
  265. d.setReadyState(DataChannelStateOpen)
  266. // Fire the OnOpen handler immediately not using pion/datachannel
  267. // * detached datachannels have no read loop, the user needs to read and query themselves
  268. // * remote datachannels should fire OnOpened. This isn't spec compliant, but we can't break behavior yet
  269. // * already negotiated datachannels should fire OnOpened
  270. if d.api.settingEngine.detach.DataChannels || isRemote || isAlreadyNegotiated {
  271. // bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
  272. d.dataChannel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
  273. d.dataChannel.OnBufferedAmountLow(onBufferedAmountLow)
  274. d.onOpen()
  275. } else {
  276. dc.OnOpen(func() {
  277. d.onOpen()
  278. })
  279. }
  280. d.mu.Lock()
  281. defer d.mu.Unlock()
  282. if !d.api.settingEngine.detach.DataChannels {
  283. go d.readLoop()
  284. }
  285. }
  286. // OnError sets an event handler which is invoked when
  287. // the underlying data transport cannot be read.
  288. func (d *DataChannel) OnError(f func(err error)) {
  289. d.mu.Lock()
  290. defer d.mu.Unlock()
  291. d.onErrorHandler = f
  292. }
  293. func (d *DataChannel) onError(err error) {
  294. d.mu.RLock()
  295. handler := d.onErrorHandler
  296. d.mu.RUnlock()
  297. if handler != nil {
  298. go handler(err)
  299. }
  300. }
  301. // See https://github.com/pion/webrtc/issues/1516
  302. // nolint:gochecknoglobals
  303. var rlBufPool = sync.Pool{New: func() interface{} {
  304. return make([]byte, dataChannelBufferSize)
  305. }}
  306. func (d *DataChannel) readLoop() {
  307. for {
  308. buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert
  309. n, isString, err := d.dataChannel.ReadDataChannel(buffer)
  310. if err != nil {
  311. rlBufPool.Put(buffer) // nolint:staticcheck
  312. d.setReadyState(DataChannelStateClosed)
  313. if !errors.Is(err, io.EOF) {
  314. d.onError(err)
  315. }
  316. d.onClose()
  317. return
  318. }
  319. m := DataChannelMessage{Data: make([]byte, n), IsString: isString}
  320. copy(m.Data, buffer[:n])
  321. // The 'staticcheck' pragma is a false positive on the part of the CI linter.
  322. rlBufPool.Put(buffer) // nolint:staticcheck
  323. // NB: Why was DataChannelMessage not passed as a pointer value?
  324. d.onMessage(m) // nolint:staticcheck
  325. }
  326. }
  327. // Send sends the binary message to the DataChannel peer
  328. func (d *DataChannel) Send(data []byte) error {
  329. err := d.ensureOpen()
  330. if err != nil {
  331. return err
  332. }
  333. _, err = d.dataChannel.WriteDataChannel(data, false)
  334. return err
  335. }
  336. // SendText sends the text message to the DataChannel peer
  337. func (d *DataChannel) SendText(s string) error {
  338. err := d.ensureOpen()
  339. if err != nil {
  340. return err
  341. }
  342. _, err = d.dataChannel.WriteDataChannel([]byte(s), true)
  343. return err
  344. }
  345. func (d *DataChannel) ensureOpen() error {
  346. d.mu.RLock()
  347. defer d.mu.RUnlock()
  348. if d.ReadyState() != DataChannelStateOpen {
  349. return io.ErrClosedPipe
  350. }
  351. return nil
  352. }
  353. // Detach allows you to detach the underlying datachannel. This provides
  354. // an idiomatic API to work with, however it disables the OnMessage callback.
  355. // Before calling Detach you have to enable this behavior by calling
  356. // webrtc.DetachDataChannels(). Combining detached and normal data channels
  357. // is not supported.
  358. // Please refer to the data-channels-detach example and the
  359. // pion/datachannel documentation for the correct way to handle the
  360. // resulting DataChannel object.
  361. func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
  362. d.mu.Lock()
  363. defer d.mu.Unlock()
  364. if !d.api.settingEngine.detach.DataChannels {
  365. return nil, errDetachNotEnabled
  366. }
  367. if d.dataChannel == nil {
  368. return nil, errDetachBeforeOpened
  369. }
  370. d.detachCalled = true
  371. return d.dataChannel, nil
  372. }
  373. // Close Closes the DataChannel. It may be called regardless of whether
  374. // the DataChannel object was created by this peer or the remote peer.
  375. func (d *DataChannel) Close() error {
  376. d.mu.Lock()
  377. haveSctpTransport := d.dataChannel != nil
  378. d.mu.Unlock()
  379. if d.ReadyState() == DataChannelStateClosed {
  380. return nil
  381. }
  382. d.setReadyState(DataChannelStateClosing)
  383. if !haveSctpTransport {
  384. return nil
  385. }
  386. return d.dataChannel.Close()
  387. }
  388. // Label represents a label that can be used to distinguish this
  389. // DataChannel object from other DataChannel objects. Scripts are
  390. // allowed to create multiple DataChannel objects with the same label.
  391. func (d *DataChannel) Label() string {
  392. d.mu.RLock()
  393. defer d.mu.RUnlock()
  394. return d.label
  395. }
  396. // Ordered returns true if the DataChannel is ordered, and false if
  397. // out-of-order delivery is allowed.
  398. func (d *DataChannel) Ordered() bool {
  399. d.mu.RLock()
  400. defer d.mu.RUnlock()
  401. return d.ordered
  402. }
  403. // MaxPacketLifeTime represents the length of the time window (msec) during
  404. // which transmissions and retransmissions may occur in unreliable mode.
  405. func (d *DataChannel) MaxPacketLifeTime() *uint16 {
  406. d.mu.RLock()
  407. defer d.mu.RUnlock()
  408. return d.maxPacketLifeTime
  409. }
  410. // MaxRetransmits represents the maximum number of retransmissions that are
  411. // attempted in unreliable mode.
  412. func (d *DataChannel) MaxRetransmits() *uint16 {
  413. d.mu.RLock()
  414. defer d.mu.RUnlock()
  415. return d.maxRetransmits
  416. }
  417. // Protocol represents the name of the sub-protocol used with this
  418. // DataChannel.
  419. func (d *DataChannel) Protocol() string {
  420. d.mu.RLock()
  421. defer d.mu.RUnlock()
  422. return d.protocol
  423. }
  424. // Negotiated represents whether this DataChannel was negotiated by the
  425. // application (true), or not (false).
  426. func (d *DataChannel) Negotiated() bool {
  427. d.mu.RLock()
  428. defer d.mu.RUnlock()
  429. return d.negotiated
  430. }
  431. // ID represents the ID for this DataChannel. The value is initially
  432. // null, which is what will be returned if the ID was not provided at
  433. // channel creation time, and the DTLS role of the SCTP transport has not
  434. // yet been negotiated. Otherwise, it will return the ID that was either
  435. // selected by the script or generated. After the ID is set to a non-null
  436. // value, it will not change.
  437. func (d *DataChannel) ID() *uint16 {
  438. d.mu.RLock()
  439. defer d.mu.RUnlock()
  440. return d.id
  441. }
  442. // ReadyState represents the state of the DataChannel object.
  443. func (d *DataChannel) ReadyState() DataChannelState {
  444. if v, ok := d.readyState.Load().(DataChannelState); ok {
  445. return v
  446. }
  447. return DataChannelState(0)
  448. }
  449. // BufferedAmount represents the number of bytes of application data
  450. // (UTF-8 text and binary data) that have been queued using send(). Even
  451. // though the data transmission can occur in parallel, the returned value
  452. // MUST NOT be decreased before the current task yielded back to the event
  453. // loop to prevent race conditions. The value does not include framing
  454. // overhead incurred by the protocol, or buffering done by the operating
  455. // system or network hardware. The value of BufferedAmount slot will only
  456. // increase with each call to the send() method as long as the ReadyState is
  457. // open; however, BufferedAmount does not reset to zero once the channel
  458. // closes.
  459. func (d *DataChannel) BufferedAmount() uint64 {
  460. d.mu.RLock()
  461. defer d.mu.RUnlock()
  462. if d.dataChannel == nil {
  463. return 0
  464. }
  465. return d.dataChannel.BufferedAmount()
  466. }
  467. // BufferedAmountLowThreshold represents the threshold at which the
  468. // bufferedAmount is considered to be low. When the bufferedAmount decreases
  469. // from above this threshold to equal or below it, the bufferedamountlow
  470. // event fires. BufferedAmountLowThreshold is initially zero on each new
  471. // DataChannel, but the application may change its value at any time.
  472. // The threshold is set to 0 by default.
  473. func (d *DataChannel) BufferedAmountLowThreshold() uint64 {
  474. d.mu.RLock()
  475. defer d.mu.RUnlock()
  476. if d.dataChannel == nil {
  477. return d.bufferedAmountLowThreshold
  478. }
  479. return d.dataChannel.BufferedAmountLowThreshold()
  480. }
  481. // SetBufferedAmountLowThreshold is used to update the threshold.
  482. // See BufferedAmountLowThreshold().
  483. func (d *DataChannel) SetBufferedAmountLowThreshold(th uint64) {
  484. d.mu.Lock()
  485. defer d.mu.Unlock()
  486. d.bufferedAmountLowThreshold = th
  487. if d.dataChannel != nil {
  488. d.dataChannel.SetBufferedAmountLowThreshold(th)
  489. }
  490. }
  491. // OnBufferedAmountLow sets an event handler which is invoked when
  492. // the number of bytes of outgoing data becomes lower than the
  493. // BufferedAmountLowThreshold.
  494. func (d *DataChannel) OnBufferedAmountLow(f func()) {
  495. d.mu.Lock()
  496. defer d.mu.Unlock()
  497. d.onBufferedAmountLow = f
  498. if d.dataChannel != nil {
  499. d.dataChannel.OnBufferedAmountLow(f)
  500. }
  501. }
  502. func (d *DataChannel) getStatsID() string {
  503. d.mu.Lock()
  504. defer d.mu.Unlock()
  505. return d.statsID
  506. }
  507. func (d *DataChannel) collectStats(collector *statsReportCollector) {
  508. collector.Collecting()
  509. d.mu.Lock()
  510. defer d.mu.Unlock()
  511. stats := DataChannelStats{
  512. Timestamp: statsTimestampNow(),
  513. Type: StatsTypeDataChannel,
  514. ID: d.statsID,
  515. Label: d.label,
  516. Protocol: d.protocol,
  517. // TransportID string `json:"transportId"`
  518. State: d.ReadyState(),
  519. }
  520. if d.id != nil {
  521. stats.DataChannelIdentifier = int32(*d.id)
  522. }
  523. if d.dataChannel != nil {
  524. stats.MessagesSent = d.dataChannel.MessagesSent()
  525. stats.BytesSent = d.dataChannel.BytesSent()
  526. stats.MessagesReceived = d.dataChannel.MessagesReceived()
  527. stats.BytesReceived = d.dataChannel.BytesReceived()
  528. }
  529. collector.Collect(stats.ID, stats)
  530. }
  531. func (d *DataChannel) setReadyState(r DataChannelState) {
  532. d.readyState.Store(r)
  533. }