rtpreceiver.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. //go:build !js
  2. // +build !js
  3. package webrtc
  4. import (
  5. "fmt"
  6. "io"
  7. "sync"
  8. "time"
  9. "github.com/pion/interceptor"
  10. "github.com/pion/rtcp"
  11. "github.com/pion/srtp/v2"
  12. "github.com/pion/webrtc/v3/internal/util"
  13. )
  14. // trackStreams maintains a mapping of RTP/RTCP streams to a specific track
  15. // a RTPReceiver may contain multiple streams if we are dealing with Simulcast
  16. type trackStreams struct {
  17. track *TrackRemote
  18. streamInfo, repairStreamInfo *interceptor.StreamInfo
  19. rtpReadStream *srtp.ReadStreamSRTP
  20. rtpInterceptor interceptor.RTPReader
  21. rtcpReadStream *srtp.ReadStreamSRTCP
  22. rtcpInterceptor interceptor.RTCPReader
  23. repairReadStream *srtp.ReadStreamSRTP
  24. repairInterceptor interceptor.RTPReader
  25. repairRtcpReadStream *srtp.ReadStreamSRTCP
  26. repairRtcpInterceptor interceptor.RTCPReader
  27. }
  28. // RTPReceiver allows an application to inspect the receipt of a TrackRemote
  29. type RTPReceiver struct {
  30. kind RTPCodecType
  31. transport *DTLSTransport
  32. tracks []trackStreams
  33. closed, received chan interface{}
  34. mu sync.RWMutex
  35. tr *RTPTransceiver
  36. // A reference to the associated api object
  37. api *API
  38. }
  39. // NewRTPReceiver constructs a new RTPReceiver
  40. func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RTPReceiver, error) {
  41. if transport == nil {
  42. return nil, errRTPReceiverDTLSTransportNil
  43. }
  44. r := &RTPReceiver{
  45. kind: kind,
  46. transport: transport,
  47. api: api,
  48. closed: make(chan interface{}),
  49. received: make(chan interface{}),
  50. tracks: []trackStreams{},
  51. }
  52. return r, nil
  53. }
  54. func (r *RTPReceiver) setRTPTransceiver(tr *RTPTransceiver) {
  55. r.mu.Lock()
  56. defer r.mu.Unlock()
  57. r.tr = tr
  58. }
  59. // Transport returns the currently-configured *DTLSTransport or nil
  60. // if one has not yet been configured
  61. func (r *RTPReceiver) Transport() *DTLSTransport {
  62. r.mu.RLock()
  63. defer r.mu.RUnlock()
  64. return r.transport
  65. }
  66. func (r *RTPReceiver) getParameters() RTPParameters {
  67. parameters := r.api.mediaEngine.getRTPParametersByKind(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionRecvonly})
  68. if r.tr != nil {
  69. parameters.Codecs = r.tr.getCodecs()
  70. }
  71. return parameters
  72. }
  73. // GetParameters describes the current configuration for the encoding and
  74. // transmission of media on the receiver's track.
  75. func (r *RTPReceiver) GetParameters() RTPParameters {
  76. r.mu.RLock()
  77. defer r.mu.RUnlock()
  78. return r.getParameters()
  79. }
  80. // Track returns the RtpTransceiver TrackRemote
  81. func (r *RTPReceiver) Track() *TrackRemote {
  82. r.mu.RLock()
  83. defer r.mu.RUnlock()
  84. if len(r.tracks) != 1 {
  85. return nil
  86. }
  87. return r.tracks[0].track
  88. }
  89. // Tracks returns the RtpTransceiver tracks
  90. // A RTPReceiver to support Simulcast may now have multiple tracks
  91. func (r *RTPReceiver) Tracks() []*TrackRemote {
  92. r.mu.RLock()
  93. defer r.mu.RUnlock()
  94. var tracks []*TrackRemote
  95. for i := range r.tracks {
  96. tracks = append(tracks, r.tracks[i].track)
  97. }
  98. return tracks
  99. }
  100. // configureReceive initialize the track
  101. func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
  102. r.mu.Lock()
  103. defer r.mu.Unlock()
  104. for i := range parameters.Encodings {
  105. t := trackStreams{
  106. track: newTrackRemote(
  107. r.kind,
  108. parameters.Encodings[i].SSRC,
  109. parameters.Encodings[i].RID,
  110. r,
  111. ),
  112. }
  113. r.tracks = append(r.tracks, t)
  114. }
  115. }
  116. // startReceive starts all the transports
  117. func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
  118. r.mu.Lock()
  119. defer r.mu.Unlock()
  120. select {
  121. case <-r.received:
  122. return errRTPReceiverReceiveAlreadyCalled
  123. default:
  124. }
  125. defer close(r.received)
  126. globalParams := r.getParameters()
  127. codec := RTPCodecCapability{}
  128. if len(globalParams.Codecs) != 0 {
  129. codec = globalParams.Codecs[0].RTPCodecCapability
  130. }
  131. for i := range parameters.Encodings {
  132. if parameters.Encodings[i].RID != "" {
  133. // RID based tracks will be set up in receiveForRid
  134. continue
  135. }
  136. var t *trackStreams
  137. for idx, ts := range r.tracks {
  138. if ts.track != nil && parameters.Encodings[i].SSRC != 0 && ts.track.SSRC() == parameters.Encodings[i].SSRC {
  139. t = &r.tracks[idx]
  140. break
  141. }
  142. }
  143. if t == nil {
  144. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC)
  145. }
  146. if parameters.Encodings[i].SSRC != 0 {
  147. t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
  148. var err error
  149. if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
  150. return err
  151. }
  152. }
  153. if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
  154. streamInfo := createStreamInfo("", rtxSsrc, 0, codec, globalParams.HeaderExtensions)
  155. rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(rtxSsrc, *streamInfo)
  156. if err != nil {
  157. return err
  158. }
  159. if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {
  160. return err
  161. }
  162. }
  163. }
  164. return nil
  165. }
  166. // Receive initialize the track and starts all the transports
  167. func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
  168. r.configureReceive(parameters)
  169. return r.startReceive(parameters)
  170. }
  171. // Read reads incoming RTCP for this RTPReceiver
  172. func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) {
  173. select {
  174. case <-r.received:
  175. return r.tracks[0].rtcpInterceptor.Read(b, a)
  176. case <-r.closed:
  177. return 0, nil, io.ErrClosedPipe
  178. }
  179. }
  180. // ReadSimulcast reads incoming RTCP for this RTPReceiver for given rid
  181. func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
  182. select {
  183. case <-r.received:
  184. for _, t := range r.tracks {
  185. if t.track != nil && t.track.rid == rid {
  186. return t.rtcpInterceptor.Read(b, a)
  187. }
  188. }
  189. return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  190. case <-r.closed:
  191. return 0, nil, io.ErrClosedPipe
  192. }
  193. }
  194. // ReadRTCP is a convenience method that wraps Read and unmarshal for you.
  195. // It also runs any configured interceptors.
  196. func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
  197. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  198. i, attributes, err := r.Read(b)
  199. if err != nil {
  200. return nil, nil, err
  201. }
  202. pkts, err := rtcp.Unmarshal(b[:i])
  203. if err != nil {
  204. return nil, nil, err
  205. }
  206. return pkts, attributes, nil
  207. }
  208. // ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
  209. func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
  210. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  211. i, attributes, err := r.ReadSimulcast(b, rid)
  212. if err != nil {
  213. return nil, nil, err
  214. }
  215. pkts, err := rtcp.Unmarshal(b[:i])
  216. return pkts, attributes, err
  217. }
  218. func (r *RTPReceiver) haveReceived() bool {
  219. select {
  220. case <-r.received:
  221. return true
  222. default:
  223. return false
  224. }
  225. }
  226. // Stop irreversibly stops the RTPReceiver
  227. func (r *RTPReceiver) Stop() error {
  228. r.mu.Lock()
  229. defer r.mu.Unlock()
  230. var err error
  231. select {
  232. case <-r.closed:
  233. return err
  234. default:
  235. }
  236. select {
  237. case <-r.received:
  238. for i := range r.tracks {
  239. errs := []error{}
  240. if r.tracks[i].rtcpReadStream != nil {
  241. errs = append(errs, r.tracks[i].rtcpReadStream.Close())
  242. }
  243. if r.tracks[i].rtpReadStream != nil {
  244. errs = append(errs, r.tracks[i].rtpReadStream.Close())
  245. }
  246. if r.tracks[i].repairReadStream != nil {
  247. errs = append(errs, r.tracks[i].repairReadStream.Close())
  248. }
  249. if r.tracks[i].repairRtcpReadStream != nil {
  250. errs = append(errs, r.tracks[i].repairRtcpReadStream.Close())
  251. }
  252. if r.tracks[i].streamInfo != nil {
  253. r.api.interceptor.UnbindRemoteStream(r.tracks[i].streamInfo)
  254. }
  255. if r.tracks[i].repairStreamInfo != nil {
  256. r.api.interceptor.UnbindRemoteStream(r.tracks[i].repairStreamInfo)
  257. }
  258. err = util.FlattenErrs(errs)
  259. }
  260. default:
  261. }
  262. close(r.closed)
  263. return err
  264. }
  265. func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams {
  266. for i := range r.tracks {
  267. if r.tracks[i].track == t {
  268. return &r.tracks[i]
  269. }
  270. }
  271. return nil
  272. }
  273. // readRTP should only be called by a track, this only exists so we can keep state in one place
  274. func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) {
  275. <-r.received
  276. if t := r.streamsForTrack(reader); t != nil {
  277. return t.rtpInterceptor.Read(b, a)
  278. }
  279. return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  280. }
  281. // receiveForRid is the sibling of Receive expect for RIDs instead of SSRCs
  282. // It populates all the internal state for the given RID
  283. func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) (*TrackRemote, error) {
  284. r.mu.Lock()
  285. defer r.mu.Unlock()
  286. for i := range r.tracks {
  287. if r.tracks[i].track.RID() == rid {
  288. r.tracks[i].track.mu.Lock()
  289. r.tracks[i].track.kind = r.kind
  290. r.tracks[i].track.codec = params.Codecs[0]
  291. r.tracks[i].track.params = params
  292. r.tracks[i].track.ssrc = SSRC(streamInfo.SSRC)
  293. r.tracks[i].track.mu.Unlock()
  294. r.tracks[i].streamInfo = streamInfo
  295. r.tracks[i].rtpReadStream = rtpReadStream
  296. r.tracks[i].rtpInterceptor = rtpInterceptor
  297. r.tracks[i].rtcpReadStream = rtcpReadStream
  298. r.tracks[i].rtcpInterceptor = rtcpInterceptor
  299. return r.tracks[i].track, nil
  300. }
  301. }
  302. return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  303. }
  304. // receiveForRtx starts a routine that processes the repair stream
  305. // These packets aren't exposed to the user yet, but we need to process them for
  306. // TWCC
  307. func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
  308. var track *trackStreams
  309. if ssrc != 0 && len(r.tracks) == 1 {
  310. track = &r.tracks[0]
  311. } else {
  312. for i := range r.tracks {
  313. if r.tracks[i].track.RID() == rsid {
  314. track = &r.tracks[i]
  315. }
  316. }
  317. }
  318. if track == nil {
  319. return fmt.Errorf("%w: ssrc(%d) rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, ssrc, rsid)
  320. }
  321. track.repairStreamInfo = streamInfo
  322. track.repairReadStream = rtpReadStream
  323. track.repairInterceptor = rtpInterceptor
  324. track.repairRtcpReadStream = rtcpReadStream
  325. track.repairRtcpInterceptor = rtcpInterceptor
  326. go func() {
  327. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  328. for {
  329. if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
  330. return
  331. }
  332. }
  333. }()
  334. return nil
  335. }
  336. // SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever.
  337. func (r *RTPReceiver) SetReadDeadline(t time.Time) error {
  338. r.mu.RLock()
  339. defer r.mu.RUnlock()
  340. return r.tracks[0].rtcpReadStream.SetReadDeadline(t)
  341. }
  342. // SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
  343. func (r *RTPReceiver) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
  344. r.mu.RLock()
  345. defer r.mu.RUnlock()
  346. for _, t := range r.tracks {
  347. if t.track != nil && t.track.rid == rid {
  348. return t.rtcpReadStream.SetReadDeadline(deadline)
  349. }
  350. }
  351. return fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  352. }
  353. // setRTPReadDeadline sets the max amount of time the RTP stream will block before returning. 0 is forever.
  354. // This should be fired by calling SetReadDeadline on the TrackRemote
  355. func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote) error {
  356. r.mu.RLock()
  357. defer r.mu.RUnlock()
  358. if t := r.streamsForTrack(reader); t != nil {
  359. return t.rtpReadStream.SetReadDeadline(deadline)
  360. }
  361. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  362. }