rtpreceiver.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. "github.com/pion/interceptor"
  12. "github.com/pion/rtcp"
  13. "github.com/pion/srtp/v2"
  14. "github.com/pion/webrtc/v3/internal/util"
  15. )
  16. // trackStreams maintains a mapping of RTP/RTCP streams to a specific track
  17. // a RTPReceiver may contain multiple streams if we are dealing with Simulcast
  18. type trackStreams struct {
  19. track *TrackRemote
  20. streamInfo, repairStreamInfo *interceptor.StreamInfo
  21. rtpReadStream *srtp.ReadStreamSRTP
  22. rtpInterceptor interceptor.RTPReader
  23. rtcpReadStream *srtp.ReadStreamSRTCP
  24. rtcpInterceptor interceptor.RTCPReader
  25. repairReadStream *srtp.ReadStreamSRTP
  26. repairInterceptor interceptor.RTPReader
  27. repairRtcpReadStream *srtp.ReadStreamSRTCP
  28. repairRtcpInterceptor interceptor.RTCPReader
  29. }
  30. // RTPReceiver allows an application to inspect the receipt of a TrackRemote
  31. type RTPReceiver struct {
  32. kind RTPCodecType
  33. transport *DTLSTransport
  34. tracks []trackStreams
  35. closed, received chan interface{}
  36. mu sync.RWMutex
  37. tr *RTPTransceiver
  38. // A reference to the associated api object
  39. api *API
  40. }
  41. // NewRTPReceiver constructs a new RTPReceiver
  42. func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RTPReceiver, error) {
  43. if transport == nil {
  44. return nil, errRTPReceiverDTLSTransportNil
  45. }
  46. r := &RTPReceiver{
  47. kind: kind,
  48. transport: transport,
  49. api: api,
  50. closed: make(chan interface{}),
  51. received: make(chan interface{}),
  52. tracks: []trackStreams{},
  53. }
  54. return r, nil
  55. }
  56. func (r *RTPReceiver) setRTPTransceiver(tr *RTPTransceiver) {
  57. r.mu.Lock()
  58. defer r.mu.Unlock()
  59. r.tr = tr
  60. }
  61. // Transport returns the currently-configured *DTLSTransport or nil
  62. // if one has not yet been configured
  63. func (r *RTPReceiver) Transport() *DTLSTransport {
  64. r.mu.RLock()
  65. defer r.mu.RUnlock()
  66. return r.transport
  67. }
  68. func (r *RTPReceiver) getParameters() RTPParameters {
  69. parameters := r.api.mediaEngine.getRTPParametersByKind(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionRecvonly})
  70. if r.tr != nil {
  71. parameters.Codecs = r.tr.getCodecs()
  72. }
  73. return parameters
  74. }
  75. // GetParameters describes the current configuration for the encoding and
  76. // transmission of media on the receiver's track.
  77. func (r *RTPReceiver) GetParameters() RTPParameters {
  78. r.mu.RLock()
  79. defer r.mu.RUnlock()
  80. return r.getParameters()
  81. }
  82. // Track returns the RtpTransceiver TrackRemote
  83. func (r *RTPReceiver) Track() *TrackRemote {
  84. r.mu.RLock()
  85. defer r.mu.RUnlock()
  86. if len(r.tracks) != 1 {
  87. return nil
  88. }
  89. return r.tracks[0].track
  90. }
  91. // Tracks returns the RtpTransceiver tracks
  92. // A RTPReceiver to support Simulcast may now have multiple tracks
  93. func (r *RTPReceiver) Tracks() []*TrackRemote {
  94. r.mu.RLock()
  95. defer r.mu.RUnlock()
  96. var tracks []*TrackRemote
  97. for i := range r.tracks {
  98. tracks = append(tracks, r.tracks[i].track)
  99. }
  100. return tracks
  101. }
  102. // RTPTransceiver returns the RTPTransceiver this
  103. // RTPReceiver belongs too, or nil if none
  104. func (r *RTPReceiver) RTPTransceiver() *RTPTransceiver {
  105. r.mu.Lock()
  106. defer r.mu.Unlock()
  107. return r.tr
  108. }
  109. // configureReceive initialize the track
  110. func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
  111. r.mu.Lock()
  112. defer r.mu.Unlock()
  113. for i := range parameters.Encodings {
  114. t := trackStreams{
  115. track: newTrackRemote(
  116. r.kind,
  117. parameters.Encodings[i].SSRC,
  118. parameters.Encodings[i].RID,
  119. r,
  120. ),
  121. }
  122. r.tracks = append(r.tracks, t)
  123. }
  124. }
  125. // startReceive starts all the transports
  126. func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
  127. r.mu.Lock()
  128. defer r.mu.Unlock()
  129. select {
  130. case <-r.received:
  131. return errRTPReceiverReceiveAlreadyCalled
  132. default:
  133. }
  134. defer close(r.received)
  135. globalParams := r.getParameters()
  136. codec := RTPCodecCapability{}
  137. if len(globalParams.Codecs) != 0 {
  138. codec = globalParams.Codecs[0].RTPCodecCapability
  139. }
  140. for i := range parameters.Encodings {
  141. if parameters.Encodings[i].RID != "" {
  142. // RID based tracks will be set up in receiveForRid
  143. continue
  144. }
  145. var t *trackStreams
  146. for idx, ts := range r.tracks {
  147. if ts.track != nil && parameters.Encodings[i].SSRC != 0 && ts.track.SSRC() == parameters.Encodings[i].SSRC {
  148. t = &r.tracks[idx]
  149. break
  150. }
  151. }
  152. if t == nil {
  153. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC)
  154. }
  155. if parameters.Encodings[i].SSRC != 0 {
  156. t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
  157. var err error
  158. if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
  159. return err
  160. }
  161. }
  162. if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
  163. streamInfo := createStreamInfo("", rtxSsrc, 0, codec, globalParams.HeaderExtensions)
  164. rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(rtxSsrc, *streamInfo)
  165. if err != nil {
  166. return err
  167. }
  168. if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {
  169. return err
  170. }
  171. }
  172. }
  173. return nil
  174. }
  175. // Receive initialize the track and starts all the transports
  176. func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
  177. r.configureReceive(parameters)
  178. return r.startReceive(parameters)
  179. }
  180. // Read reads incoming RTCP for this RTPReceiver
  181. func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) {
  182. select {
  183. case <-r.received:
  184. return r.tracks[0].rtcpInterceptor.Read(b, a)
  185. case <-r.closed:
  186. return 0, nil, io.ErrClosedPipe
  187. }
  188. }
  189. // ReadSimulcast reads incoming RTCP for this RTPReceiver for given rid
  190. func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
  191. select {
  192. case <-r.received:
  193. for _, t := range r.tracks {
  194. if t.track != nil && t.track.rid == rid {
  195. return t.rtcpInterceptor.Read(b, a)
  196. }
  197. }
  198. return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  199. case <-r.closed:
  200. return 0, nil, io.ErrClosedPipe
  201. }
  202. }
  203. // ReadRTCP is a convenience method that wraps Read and unmarshal for you.
  204. // It also runs any configured interceptors.
  205. func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
  206. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  207. i, attributes, err := r.Read(b)
  208. if err != nil {
  209. return nil, nil, err
  210. }
  211. pkts, err := rtcp.Unmarshal(b[:i])
  212. if err != nil {
  213. return nil, nil, err
  214. }
  215. return pkts, attributes, nil
  216. }
  217. // ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
  218. func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
  219. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  220. i, attributes, err := r.ReadSimulcast(b, rid)
  221. if err != nil {
  222. return nil, nil, err
  223. }
  224. pkts, err := rtcp.Unmarshal(b[:i])
  225. return pkts, attributes, err
  226. }
  227. func (r *RTPReceiver) haveReceived() bool {
  228. select {
  229. case <-r.received:
  230. return true
  231. default:
  232. return false
  233. }
  234. }
  235. // Stop irreversibly stops the RTPReceiver
  236. func (r *RTPReceiver) Stop() error {
  237. r.mu.Lock()
  238. defer r.mu.Unlock()
  239. var err error
  240. select {
  241. case <-r.closed:
  242. return err
  243. default:
  244. }
  245. select {
  246. case <-r.received:
  247. for i := range r.tracks {
  248. errs := []error{}
  249. if r.tracks[i].rtcpReadStream != nil {
  250. errs = append(errs, r.tracks[i].rtcpReadStream.Close())
  251. }
  252. if r.tracks[i].rtpReadStream != nil {
  253. errs = append(errs, r.tracks[i].rtpReadStream.Close())
  254. }
  255. if r.tracks[i].repairReadStream != nil {
  256. errs = append(errs, r.tracks[i].repairReadStream.Close())
  257. }
  258. if r.tracks[i].repairRtcpReadStream != nil {
  259. errs = append(errs, r.tracks[i].repairRtcpReadStream.Close())
  260. }
  261. if r.tracks[i].streamInfo != nil {
  262. r.api.interceptor.UnbindRemoteStream(r.tracks[i].streamInfo)
  263. }
  264. if r.tracks[i].repairStreamInfo != nil {
  265. r.api.interceptor.UnbindRemoteStream(r.tracks[i].repairStreamInfo)
  266. }
  267. err = util.FlattenErrs(errs)
  268. }
  269. default:
  270. }
  271. close(r.closed)
  272. return err
  273. }
  274. func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams {
  275. for i := range r.tracks {
  276. if r.tracks[i].track == t {
  277. return &r.tracks[i]
  278. }
  279. }
  280. return nil
  281. }
  282. // readRTP should only be called by a track, this only exists so we can keep state in one place
  283. func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) {
  284. <-r.received
  285. if t := r.streamsForTrack(reader); t != nil {
  286. return t.rtpInterceptor.Read(b, a)
  287. }
  288. return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  289. }
  290. // receiveForRid is the sibling of Receive expect for RIDs instead of SSRCs
  291. // It populates all the internal state for the given RID
  292. func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) (*TrackRemote, error) {
  293. r.mu.Lock()
  294. defer r.mu.Unlock()
  295. for i := range r.tracks {
  296. if r.tracks[i].track.RID() == rid {
  297. r.tracks[i].track.mu.Lock()
  298. r.tracks[i].track.kind = r.kind
  299. r.tracks[i].track.codec = params.Codecs[0]
  300. r.tracks[i].track.params = params
  301. r.tracks[i].track.ssrc = SSRC(streamInfo.SSRC)
  302. r.tracks[i].track.mu.Unlock()
  303. r.tracks[i].streamInfo = streamInfo
  304. r.tracks[i].rtpReadStream = rtpReadStream
  305. r.tracks[i].rtpInterceptor = rtpInterceptor
  306. r.tracks[i].rtcpReadStream = rtcpReadStream
  307. r.tracks[i].rtcpInterceptor = rtcpInterceptor
  308. return r.tracks[i].track, nil
  309. }
  310. }
  311. return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  312. }
  313. // receiveForRtx starts a routine that processes the repair stream
  314. // These packets aren't exposed to the user yet, but we need to process them for
  315. // TWCC
  316. func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
  317. var track *trackStreams
  318. if ssrc != 0 && len(r.tracks) == 1 {
  319. track = &r.tracks[0]
  320. } else {
  321. for i := range r.tracks {
  322. if r.tracks[i].track.RID() == rsid {
  323. track = &r.tracks[i]
  324. }
  325. }
  326. }
  327. if track == nil {
  328. return fmt.Errorf("%w: ssrc(%d) rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, ssrc, rsid)
  329. }
  330. track.repairStreamInfo = streamInfo
  331. track.repairReadStream = rtpReadStream
  332. track.repairInterceptor = rtpInterceptor
  333. track.repairRtcpReadStream = rtcpReadStream
  334. track.repairRtcpInterceptor = rtcpInterceptor
  335. go func() {
  336. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  337. for {
  338. if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
  339. return
  340. }
  341. }
  342. }()
  343. return nil
  344. }
  345. // SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever.
  346. func (r *RTPReceiver) SetReadDeadline(t time.Time) error {
  347. r.mu.RLock()
  348. defer r.mu.RUnlock()
  349. return r.tracks[0].rtcpReadStream.SetReadDeadline(t)
  350. }
  351. // SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
  352. func (r *RTPReceiver) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
  353. r.mu.RLock()
  354. defer r.mu.RUnlock()
  355. for _, t := range r.tracks {
  356. if t.track != nil && t.track.rid == rid {
  357. return t.rtcpReadStream.SetReadDeadline(deadline)
  358. }
  359. }
  360. return fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  361. }
  362. // setRTPReadDeadline sets the max amount of time the RTP stream will block before returning. 0 is forever.
  363. // This should be fired by calling SetReadDeadline on the TrackRemote
  364. func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote) error {
  365. r.mu.RLock()
  366. defer r.mu.RUnlock()
  367. if t := r.streamsForTrack(reader); t != nil {
  368. return t.rtpReadStream.SetReadDeadline(deadline)
  369. }
  370. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  371. }