rtpsender.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. "github.com/pion/interceptor"
  12. "github.com/pion/randutil"
  13. "github.com/pion/rtcp"
  14. "github.com/pion/rtp"
  15. "github.com/pion/webrtc/v3/internal/util"
  16. )
  17. type trackEncoding struct {
  18. track TrackLocal
  19. srtpStream *srtpWriterFuture
  20. rtcpInterceptor interceptor.RTCPReader
  21. streamInfo interceptor.StreamInfo
  22. context *baseTrackLocalContext
  23. ssrc SSRC
  24. }
  25. // RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
  26. type RTPSender struct {
  27. trackEncodings []*trackEncoding
  28. transport *DTLSTransport
  29. payloadType PayloadType
  30. kind RTPCodecType
  31. // nolint:godox
  32. // TODO(sgotti) remove this when in future we'll avoid replacing
  33. // a transceiver sender since we can just check the
  34. // transceiver negotiation status
  35. negotiated bool
  36. // A reference to the associated api object
  37. api *API
  38. id string
  39. rtpTransceiver *RTPTransceiver
  40. mu sync.RWMutex
  41. sendCalled, stopCalled chan struct{}
  42. }
  43. // NewRTPSender constructs a new RTPSender
  44. func (api *API) NewRTPSender(track TrackLocal, transport *DTLSTransport) (*RTPSender, error) {
  45. if track == nil {
  46. return nil, errRTPSenderTrackNil
  47. } else if transport == nil {
  48. return nil, errRTPSenderDTLSTransportNil
  49. }
  50. id, err := randutil.GenerateCryptoRandomString(32, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
  51. if err != nil {
  52. return nil, err
  53. }
  54. r := &RTPSender{
  55. transport: transport,
  56. api: api,
  57. sendCalled: make(chan struct{}),
  58. stopCalled: make(chan struct{}),
  59. id: id,
  60. kind: track.Kind(),
  61. }
  62. r.addEncoding(track)
  63. return r, nil
  64. }
  65. func (r *RTPSender) isNegotiated() bool {
  66. r.mu.RLock()
  67. defer r.mu.RUnlock()
  68. return r.negotiated
  69. }
  70. func (r *RTPSender) setNegotiated() {
  71. r.mu.Lock()
  72. defer r.mu.Unlock()
  73. r.negotiated = true
  74. }
  75. func (r *RTPSender) setRTPTransceiver(rtpTransceiver *RTPTransceiver) {
  76. r.mu.Lock()
  77. defer r.mu.Unlock()
  78. r.rtpTransceiver = rtpTransceiver
  79. }
  80. // Transport returns the currently-configured *DTLSTransport or nil
  81. // if one has not yet been configured
  82. func (r *RTPSender) Transport() *DTLSTransport {
  83. r.mu.RLock()
  84. defer r.mu.RUnlock()
  85. return r.transport
  86. }
  87. func (r *RTPSender) getParameters() RTPSendParameters {
  88. var encodings []RTPEncodingParameters
  89. for _, trackEncoding := range r.trackEncodings {
  90. var rid string
  91. if trackEncoding.track != nil {
  92. rid = trackEncoding.track.RID()
  93. }
  94. encodings = append(encodings, RTPEncodingParameters{
  95. RTPCodingParameters: RTPCodingParameters{
  96. RID: rid,
  97. SSRC: trackEncoding.ssrc,
  98. PayloadType: r.payloadType,
  99. },
  100. })
  101. }
  102. sendParameters := RTPSendParameters{
  103. RTPParameters: r.api.mediaEngine.getRTPParametersByKind(
  104. r.kind,
  105. []RTPTransceiverDirection{RTPTransceiverDirectionSendonly},
  106. ),
  107. Encodings: encodings,
  108. }
  109. if r.rtpTransceiver != nil {
  110. sendParameters.Codecs = r.rtpTransceiver.getCodecs()
  111. } else {
  112. sendParameters.Codecs = r.api.mediaEngine.getCodecsByKind(r.kind)
  113. }
  114. return sendParameters
  115. }
  116. // GetParameters describes the current configuration for the encoding and
  117. // transmission of media on the sender's track.
  118. func (r *RTPSender) GetParameters() RTPSendParameters {
  119. r.mu.RLock()
  120. defer r.mu.RUnlock()
  121. return r.getParameters()
  122. }
  123. // AddEncoding adds an encoding to RTPSender. Used by simulcast senders.
  124. func (r *RTPSender) AddEncoding(track TrackLocal) error {
  125. r.mu.Lock()
  126. defer r.mu.Unlock()
  127. if track == nil {
  128. return errRTPSenderTrackNil
  129. }
  130. if track.RID() == "" {
  131. return errRTPSenderRidNil
  132. }
  133. if r.hasStopped() {
  134. return errRTPSenderStopped
  135. }
  136. if r.hasSent() {
  137. return errRTPSenderSendAlreadyCalled
  138. }
  139. var refTrack TrackLocal
  140. if len(r.trackEncodings) != 0 {
  141. refTrack = r.trackEncodings[0].track
  142. }
  143. if refTrack == nil || refTrack.RID() == "" {
  144. return errRTPSenderNoBaseEncoding
  145. }
  146. if refTrack.ID() != track.ID() || refTrack.StreamID() != track.StreamID() || refTrack.Kind() != track.Kind() {
  147. return errRTPSenderBaseEncodingMismatch
  148. }
  149. for _, encoding := range r.trackEncodings {
  150. if encoding.track == nil {
  151. continue
  152. }
  153. if encoding.track.RID() == track.RID() {
  154. return errRTPSenderRIDCollision
  155. }
  156. }
  157. r.addEncoding(track)
  158. return nil
  159. }
  160. func (r *RTPSender) addEncoding(track TrackLocal) {
  161. ssrc := SSRC(randutil.NewMathRandomGenerator().Uint32())
  162. trackEncoding := &trackEncoding{
  163. track: track,
  164. srtpStream: &srtpWriterFuture{ssrc: ssrc},
  165. ssrc: ssrc,
  166. }
  167. trackEncoding.srtpStream.rtpSender = r
  168. trackEncoding.rtcpInterceptor = r.api.interceptor.BindRTCPReader(
  169. interceptor.RTPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
  170. n, err = trackEncoding.srtpStream.Read(in)
  171. return n, a, err
  172. }),
  173. )
  174. r.trackEncodings = append(r.trackEncodings, trackEncoding)
  175. }
  176. // Track returns the RTCRtpTransceiver track, or nil
  177. func (r *RTPSender) Track() TrackLocal {
  178. r.mu.RLock()
  179. defer r.mu.RUnlock()
  180. if len(r.trackEncodings) == 0 {
  181. return nil
  182. }
  183. return r.trackEncodings[0].track
  184. }
  185. // ReplaceTrack replaces the track currently being used as the sender's source with a new TrackLocal.
  186. // The new track must be of the same media kind (audio, video, etc) and switching the track should not
  187. // require negotiation.
  188. func (r *RTPSender) ReplaceTrack(track TrackLocal) error {
  189. r.mu.Lock()
  190. defer r.mu.Unlock()
  191. if track != nil && r.kind != track.Kind() {
  192. return ErrRTPSenderNewTrackHasIncorrectKind
  193. }
  194. // cannot replace simulcast envelope
  195. if track != nil && len(r.trackEncodings) > 1 {
  196. return ErrRTPSenderNewTrackHasIncorrectEnvelope
  197. }
  198. var replacedTrack TrackLocal
  199. var context *baseTrackLocalContext
  200. for _, e := range r.trackEncodings {
  201. replacedTrack = e.track
  202. context = e.context
  203. if r.hasSent() && replacedTrack != nil {
  204. if err := replacedTrack.Unbind(context); err != nil {
  205. return err
  206. }
  207. }
  208. if !r.hasSent() || track == nil {
  209. e.track = track
  210. }
  211. }
  212. if !r.hasSent() || track == nil {
  213. return nil
  214. }
  215. // If we reach this point in the routine, there is only 1 track encoding
  216. codec, err := track.Bind(&baseTrackLocalContext{
  217. id: context.ID(),
  218. params: r.api.mediaEngine.getRTPParametersByKind(track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
  219. ssrc: context.SSRC(),
  220. writeStream: context.WriteStream(),
  221. rtcpInterceptor: context.RTCPReader(),
  222. })
  223. if err != nil {
  224. // Re-bind the original track
  225. if _, reBindErr := replacedTrack.Bind(context); reBindErr != nil {
  226. return reBindErr
  227. }
  228. return err
  229. }
  230. // Codec has changed
  231. if r.payloadType != codec.PayloadType {
  232. context.params.Codecs = []RTPCodecParameters{codec}
  233. }
  234. r.trackEncodings[0].track = track
  235. return nil
  236. }
  237. // Send Attempts to set the parameters controlling the sending of media.
  238. func (r *RTPSender) Send(parameters RTPSendParameters) error {
  239. r.mu.Lock()
  240. defer r.mu.Unlock()
  241. switch {
  242. case r.hasSent():
  243. return errRTPSenderSendAlreadyCalled
  244. case r.trackEncodings[0].track == nil:
  245. return errRTPSenderTrackRemoved
  246. }
  247. for idx, trackEncoding := range r.trackEncodings {
  248. writeStream := &interceptorToTrackLocalWriter{}
  249. trackEncoding.context = &baseTrackLocalContext{
  250. id: r.id,
  251. params: r.api.mediaEngine.getRTPParametersByKind(trackEncoding.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}),
  252. ssrc: parameters.Encodings[idx].SSRC,
  253. writeStream: writeStream,
  254. rtcpInterceptor: trackEncoding.rtcpInterceptor,
  255. }
  256. codec, err := trackEncoding.track.Bind(trackEncoding.context)
  257. if err != nil {
  258. return err
  259. }
  260. trackEncoding.context.params.Codecs = []RTPCodecParameters{codec}
  261. trackEncoding.streamInfo = *createStreamInfo(
  262. r.id,
  263. parameters.Encodings[idx].SSRC,
  264. codec.PayloadType,
  265. codec.RTPCodecCapability,
  266. parameters.HeaderExtensions,
  267. )
  268. srtpStream := trackEncoding.srtpStream
  269. rtpInterceptor := r.api.interceptor.BindLocalStream(
  270. &trackEncoding.streamInfo,
  271. interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
  272. return srtpStream.WriteRTP(header, payload)
  273. }),
  274. )
  275. writeStream.interceptor.Store(rtpInterceptor)
  276. }
  277. close(r.sendCalled)
  278. return nil
  279. }
  280. // Stop irreversibly stops the RTPSender
  281. func (r *RTPSender) Stop() error {
  282. r.mu.Lock()
  283. if stopped := r.hasStopped(); stopped {
  284. r.mu.Unlock()
  285. return nil
  286. }
  287. close(r.stopCalled)
  288. r.mu.Unlock()
  289. if !r.hasSent() {
  290. return nil
  291. }
  292. if err := r.ReplaceTrack(nil); err != nil {
  293. return err
  294. }
  295. errs := []error{}
  296. for _, trackEncoding := range r.trackEncodings {
  297. r.api.interceptor.UnbindLocalStream(&trackEncoding.streamInfo)
  298. errs = append(errs, trackEncoding.srtpStream.Close())
  299. }
  300. return util.FlattenErrs(errs)
  301. }
  302. // Read reads incoming RTCP for this RTPSender
  303. func (r *RTPSender) Read(b []byte) (n int, a interceptor.Attributes, err error) {
  304. select {
  305. case <-r.sendCalled:
  306. return r.trackEncodings[0].rtcpInterceptor.Read(b, a)
  307. case <-r.stopCalled:
  308. return 0, nil, io.ErrClosedPipe
  309. }
  310. }
  311. // ReadRTCP is a convenience method that wraps Read and unmarshals for you.
  312. func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
  313. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  314. i, attributes, err := r.Read(b)
  315. if err != nil {
  316. return nil, nil, err
  317. }
  318. pkts, err := rtcp.Unmarshal(b[:i])
  319. if err != nil {
  320. return nil, nil, err
  321. }
  322. return pkts, attributes, nil
  323. }
  324. // ReadSimulcast reads incoming RTCP for this RTPSender for given rid
  325. func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
  326. select {
  327. case <-r.sendCalled:
  328. for _, t := range r.trackEncodings {
  329. if t.track != nil && t.track.RID() == rid {
  330. return t.rtcpInterceptor.Read(b, a)
  331. }
  332. }
  333. return 0, nil, fmt.Errorf("%w: %s", errRTPSenderNoTrackForRID, rid)
  334. case <-r.stopCalled:
  335. return 0, nil, io.ErrClosedPipe
  336. }
  337. }
  338. // ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
  339. func (r *RTPSender) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
  340. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  341. i, attributes, err := r.ReadSimulcast(b, rid)
  342. if err != nil {
  343. return nil, nil, err
  344. }
  345. pkts, err := rtcp.Unmarshal(b[:i])
  346. return pkts, attributes, err
  347. }
  348. // SetReadDeadline sets the deadline for the Read operation.
  349. // Setting to zero means no deadline.
  350. func (r *RTPSender) SetReadDeadline(t time.Time) error {
  351. return r.trackEncodings[0].srtpStream.SetReadDeadline(t)
  352. }
  353. // SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
  354. func (r *RTPSender) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
  355. r.mu.RLock()
  356. defer r.mu.RUnlock()
  357. for _, t := range r.trackEncodings {
  358. if t.track != nil && t.track.RID() == rid {
  359. return t.srtpStream.SetReadDeadline(deadline)
  360. }
  361. }
  362. return fmt.Errorf("%w: %s", errRTPSenderNoTrackForRID, rid)
  363. }
  364. // hasSent tells if data has been ever sent for this instance
  365. func (r *RTPSender) hasSent() bool {
  366. select {
  367. case <-r.sendCalled:
  368. return true
  369. default:
  370. return false
  371. }
  372. }
  373. // hasStopped tells if stop has been called
  374. func (r *RTPSender) hasStopped() bool {
  375. select {
  376. case <-r.stopCalled:
  377. return true
  378. default:
  379. return false
  380. }
  381. }