rtpreceiver.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "encoding/binary"
  8. "fmt"
  9. "io"
  10. "sync"
  11. "time"
  12. "github.com/pion/interceptor"
  13. "github.com/pion/rtcp"
  14. "github.com/pion/srtp/v2"
  15. "github.com/pion/webrtc/v3/internal/util"
  16. )
  17. // trackStreams maintains a mapping of RTP/RTCP streams to a specific track
  18. // a RTPReceiver may contain multiple streams if we are dealing with Simulcast
  19. type trackStreams struct {
  20. track *TrackRemote
  21. streamInfo, repairStreamInfo *interceptor.StreamInfo
  22. rtpReadStream *srtp.ReadStreamSRTP
  23. rtpInterceptor interceptor.RTPReader
  24. rtcpReadStream *srtp.ReadStreamSRTCP
  25. rtcpInterceptor interceptor.RTCPReader
  26. repairReadStream *srtp.ReadStreamSRTP
  27. repairInterceptor interceptor.RTPReader
  28. repairStreamChannel chan rtxPacketWithAttributes
  29. repairRtcpReadStream *srtp.ReadStreamSRTCP
  30. repairRtcpInterceptor interceptor.RTCPReader
  31. }
  32. type rtxPacketWithAttributes struct {
  33. pkt []byte
  34. attributes interceptor.Attributes
  35. pool *sync.Pool
  36. }
  37. func (p *rtxPacketWithAttributes) release() {
  38. if p.pkt != nil {
  39. b := p.pkt[:cap(p.pkt)]
  40. p.pool.Put(b) // nolint:staticcheck
  41. p.pkt = nil
  42. }
  43. }
  44. // RTPReceiver allows an application to inspect the receipt of a TrackRemote
  45. type RTPReceiver struct {
  46. kind RTPCodecType
  47. transport *DTLSTransport
  48. tracks []trackStreams
  49. closed, received chan interface{}
  50. mu sync.RWMutex
  51. tr *RTPTransceiver
  52. // A reference to the associated api object
  53. api *API
  54. rtxPool sync.Pool
  55. }
  56. // NewRTPReceiver constructs a new RTPReceiver
  57. func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RTPReceiver, error) {
  58. if transport == nil {
  59. return nil, errRTPReceiverDTLSTransportNil
  60. }
  61. r := &RTPReceiver{
  62. kind: kind,
  63. transport: transport,
  64. api: api,
  65. closed: make(chan interface{}),
  66. received: make(chan interface{}),
  67. tracks: []trackStreams{},
  68. rtxPool: sync.Pool{New: func() interface{} {
  69. return make([]byte, api.settingEngine.getReceiveMTU())
  70. }},
  71. }
  72. return r, nil
  73. }
  74. func (r *RTPReceiver) setRTPTransceiver(tr *RTPTransceiver) {
  75. r.mu.Lock()
  76. defer r.mu.Unlock()
  77. r.tr = tr
  78. }
  79. // Transport returns the currently-configured *DTLSTransport or nil
  80. // if one has not yet been configured
  81. func (r *RTPReceiver) Transport() *DTLSTransport {
  82. r.mu.RLock()
  83. defer r.mu.RUnlock()
  84. return r.transport
  85. }
  86. func (r *RTPReceiver) getParameters() RTPParameters {
  87. parameters := r.api.mediaEngine.getRTPParametersByKind(r.kind, []RTPTransceiverDirection{RTPTransceiverDirectionRecvonly})
  88. if r.tr != nil {
  89. parameters.Codecs = r.tr.getCodecs()
  90. }
  91. return parameters
  92. }
  93. // GetParameters describes the current configuration for the encoding and
  94. // transmission of media on the receiver's track.
  95. func (r *RTPReceiver) GetParameters() RTPParameters {
  96. r.mu.RLock()
  97. defer r.mu.RUnlock()
  98. return r.getParameters()
  99. }
  100. // Track returns the RtpTransceiver TrackRemote
  101. func (r *RTPReceiver) Track() *TrackRemote {
  102. r.mu.RLock()
  103. defer r.mu.RUnlock()
  104. if len(r.tracks) != 1 {
  105. return nil
  106. }
  107. return r.tracks[0].track
  108. }
  109. // Tracks returns the RtpTransceiver tracks
  110. // A RTPReceiver to support Simulcast may now have multiple tracks
  111. func (r *RTPReceiver) Tracks() []*TrackRemote {
  112. r.mu.RLock()
  113. defer r.mu.RUnlock()
  114. var tracks []*TrackRemote
  115. for i := range r.tracks {
  116. tracks = append(tracks, r.tracks[i].track)
  117. }
  118. return tracks
  119. }
  120. // RTPTransceiver returns the RTPTransceiver this
  121. // RTPReceiver belongs too, or nil if none
  122. func (r *RTPReceiver) RTPTransceiver() *RTPTransceiver {
  123. r.mu.Lock()
  124. defer r.mu.Unlock()
  125. return r.tr
  126. }
  127. // configureReceive initialize the track
  128. func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
  129. r.mu.Lock()
  130. defer r.mu.Unlock()
  131. for i := range parameters.Encodings {
  132. t := trackStreams{
  133. track: newTrackRemote(
  134. r.kind,
  135. parameters.Encodings[i].SSRC,
  136. parameters.Encodings[i].RTX.SSRC,
  137. parameters.Encodings[i].RID,
  138. r,
  139. ),
  140. }
  141. r.tracks = append(r.tracks, t)
  142. }
  143. }
  144. // startReceive starts all the transports
  145. func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
  146. r.mu.Lock()
  147. defer r.mu.Unlock()
  148. select {
  149. case <-r.received:
  150. return errRTPReceiverReceiveAlreadyCalled
  151. default:
  152. }
  153. defer close(r.received)
  154. globalParams := r.getParameters()
  155. codec := RTPCodecCapability{}
  156. if len(globalParams.Codecs) != 0 {
  157. codec = globalParams.Codecs[0].RTPCodecCapability
  158. }
  159. for i := range parameters.Encodings {
  160. if parameters.Encodings[i].RID != "" {
  161. // RID based tracks will be set up in receiveForRid
  162. continue
  163. }
  164. var t *trackStreams
  165. for idx, ts := range r.tracks {
  166. if ts.track != nil && parameters.Encodings[i].SSRC != 0 && ts.track.SSRC() == parameters.Encodings[i].SSRC {
  167. t = &r.tracks[idx]
  168. break
  169. }
  170. }
  171. if t == nil {
  172. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC)
  173. }
  174. if parameters.Encodings[i].SSRC != 0 {
  175. t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
  176. var err error
  177. if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
  178. return err
  179. }
  180. }
  181. if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
  182. streamInfo := createStreamInfo("", rtxSsrc, 0, codec, globalParams.HeaderExtensions)
  183. rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(rtxSsrc, *streamInfo)
  184. if err != nil {
  185. return err
  186. }
  187. if err = r.receiveForRtx(rtxSsrc, "", streamInfo, rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor); err != nil {
  188. return err
  189. }
  190. }
  191. }
  192. return nil
  193. }
  194. // Receive initialize the track and starts all the transports
  195. func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
  196. r.configureReceive(parameters)
  197. return r.startReceive(parameters)
  198. }
  199. // Read reads incoming RTCP for this RTPReceiver
  200. func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) {
  201. select {
  202. case <-r.received:
  203. return r.tracks[0].rtcpInterceptor.Read(b, a)
  204. case <-r.closed:
  205. return 0, nil, io.ErrClosedPipe
  206. }
  207. }
  208. // ReadSimulcast reads incoming RTCP for this RTPReceiver for given rid
  209. func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
  210. select {
  211. case <-r.received:
  212. for _, t := range r.tracks {
  213. if t.track != nil && t.track.rid == rid {
  214. return t.rtcpInterceptor.Read(b, a)
  215. }
  216. }
  217. return 0, nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  218. case <-r.closed:
  219. return 0, nil, io.ErrClosedPipe
  220. }
  221. }
  222. // ReadRTCP is a convenience method that wraps Read and unmarshal for you.
  223. // It also runs any configured interceptors.
  224. func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
  225. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  226. i, attributes, err := r.Read(b)
  227. if err != nil {
  228. return nil, nil, err
  229. }
  230. pkts, err := rtcp.Unmarshal(b[:i])
  231. if err != nil {
  232. return nil, nil, err
  233. }
  234. return pkts, attributes, nil
  235. }
  236. // ReadSimulcastRTCP is a convenience method that wraps ReadSimulcast and unmarshal for you
  237. func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.Attributes, error) {
  238. b := make([]byte, r.api.settingEngine.getReceiveMTU())
  239. i, attributes, err := r.ReadSimulcast(b, rid)
  240. if err != nil {
  241. return nil, nil, err
  242. }
  243. pkts, err := rtcp.Unmarshal(b[:i])
  244. return pkts, attributes, err
  245. }
  246. func (r *RTPReceiver) haveReceived() bool {
  247. select {
  248. case <-r.received:
  249. return true
  250. default:
  251. return false
  252. }
  253. }
  254. // Stop irreversibly stops the RTPReceiver
  255. func (r *RTPReceiver) Stop() error {
  256. r.mu.Lock()
  257. defer r.mu.Unlock()
  258. var err error
  259. select {
  260. case <-r.closed:
  261. return err
  262. default:
  263. }
  264. select {
  265. case <-r.received:
  266. for i := range r.tracks {
  267. errs := []error{}
  268. if r.tracks[i].rtcpReadStream != nil {
  269. errs = append(errs, r.tracks[i].rtcpReadStream.Close())
  270. }
  271. if r.tracks[i].rtpReadStream != nil {
  272. errs = append(errs, r.tracks[i].rtpReadStream.Close())
  273. }
  274. if r.tracks[i].repairReadStream != nil {
  275. errs = append(errs, r.tracks[i].repairReadStream.Close())
  276. }
  277. if r.tracks[i].repairRtcpReadStream != nil {
  278. errs = append(errs, r.tracks[i].repairRtcpReadStream.Close())
  279. }
  280. if r.tracks[i].streamInfo != nil {
  281. r.api.interceptor.UnbindRemoteStream(r.tracks[i].streamInfo)
  282. }
  283. if r.tracks[i].repairStreamInfo != nil {
  284. r.api.interceptor.UnbindRemoteStream(r.tracks[i].repairStreamInfo)
  285. }
  286. err = util.FlattenErrs(errs)
  287. }
  288. default:
  289. }
  290. close(r.closed)
  291. return err
  292. }
  293. func (r *RTPReceiver) streamsForTrack(t *TrackRemote) *trackStreams {
  294. for i := range r.tracks {
  295. if r.tracks[i].track == t {
  296. return &r.tracks[i]
  297. }
  298. }
  299. return nil
  300. }
  301. // readRTP should only be called by a track, this only exists so we can keep state in one place
  302. func (r *RTPReceiver) readRTP(b []byte, reader *TrackRemote) (n int, a interceptor.Attributes, err error) {
  303. <-r.received
  304. if t := r.streamsForTrack(reader); t != nil {
  305. return t.rtpInterceptor.Read(b, a)
  306. }
  307. return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  308. }
  309. // receiveForRid is the sibling of Receive expect for RIDs instead of SSRCs
  310. // It populates all the internal state for the given RID
  311. func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) (*TrackRemote, error) {
  312. r.mu.Lock()
  313. defer r.mu.Unlock()
  314. for i := range r.tracks {
  315. if r.tracks[i].track.RID() == rid {
  316. r.tracks[i].track.mu.Lock()
  317. r.tracks[i].track.kind = r.kind
  318. r.tracks[i].track.codec = params.Codecs[0]
  319. r.tracks[i].track.params = params
  320. r.tracks[i].track.ssrc = SSRC(streamInfo.SSRC)
  321. r.tracks[i].track.mu.Unlock()
  322. r.tracks[i].streamInfo = streamInfo
  323. r.tracks[i].rtpReadStream = rtpReadStream
  324. r.tracks[i].rtpInterceptor = rtpInterceptor
  325. r.tracks[i].rtcpReadStream = rtcpReadStream
  326. r.tracks[i].rtcpInterceptor = rtcpInterceptor
  327. return r.tracks[i].track, nil
  328. }
  329. }
  330. return nil, fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  331. }
  332. // receiveForRtx starts a routine that processes the repair stream
  333. func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
  334. var track *trackStreams
  335. if ssrc != 0 && len(r.tracks) == 1 {
  336. track = &r.tracks[0]
  337. } else {
  338. for i := range r.tracks {
  339. if r.tracks[i].track.RID() == rsid {
  340. track = &r.tracks[i]
  341. if track.track.RtxSSRC() == 0 {
  342. track.track.setRtxSSRC(SSRC(streamInfo.SSRC))
  343. }
  344. break
  345. }
  346. }
  347. }
  348. if track == nil {
  349. return fmt.Errorf("%w: ssrc(%d) rsid(%s)", errRTPReceiverForRIDTrackStreamNotFound, ssrc, rsid)
  350. }
  351. track.repairStreamInfo = streamInfo
  352. track.repairReadStream = rtpReadStream
  353. track.repairInterceptor = rtpInterceptor
  354. track.repairRtcpReadStream = rtcpReadStream
  355. track.repairRtcpInterceptor = rtcpInterceptor
  356. track.repairStreamChannel = make(chan rtxPacketWithAttributes)
  357. go func() {
  358. for {
  359. b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
  360. i, attributes, err := track.repairInterceptor.Read(b, nil)
  361. if err != nil {
  362. r.rtxPool.Put(b) // nolint:staticcheck
  363. return
  364. }
  365. // RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
  366. // payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
  367. // as non-RTX RTP packets
  368. hasExtension := b[0]&0b10000 > 0
  369. hasPadding := b[0]&0b100000 > 0
  370. csrcCount := b[0] & 0b1111
  371. headerLength := uint16(12 + (4 * csrcCount))
  372. paddingLength := 0
  373. if hasExtension {
  374. headerLength += 4 * (1 + binary.BigEndian.Uint16(b[headerLength+2:headerLength+4]))
  375. }
  376. if hasPadding {
  377. paddingLength = int(b[i-1])
  378. }
  379. if i-int(headerLength)-paddingLength < 2 {
  380. // BWE probe packet, ignore
  381. r.rtxPool.Put(b) // nolint:staticcheck
  382. continue
  383. }
  384. if attributes == nil {
  385. attributes = make(interceptor.Attributes)
  386. }
  387. attributes.Set(AttributeRtxPayloadType, b[1]&0x7F)
  388. attributes.Set(AttributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4]))
  389. attributes.Set(AttributeRtxSsrc, binary.BigEndian.Uint32(b[8:12]))
  390. b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType())
  391. b[2] = b[headerLength]
  392. b[3] = b[headerLength+1]
  393. binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC()))
  394. copy(b[headerLength:i-2], b[headerLength+2:i])
  395. select {
  396. case <-r.closed:
  397. r.rtxPool.Put(b) // nolint:staticcheck
  398. return
  399. case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}:
  400. }
  401. }
  402. }()
  403. return nil
  404. }
  405. // SetReadDeadline sets the max amount of time the RTCP stream will block before returning. 0 is forever.
  406. func (r *RTPReceiver) SetReadDeadline(t time.Time) error {
  407. r.mu.RLock()
  408. defer r.mu.RUnlock()
  409. return r.tracks[0].rtcpReadStream.SetReadDeadline(t)
  410. }
  411. // SetReadDeadlineSimulcast sets the max amount of time the RTCP stream for a given rid will block before returning. 0 is forever.
  412. func (r *RTPReceiver) SetReadDeadlineSimulcast(deadline time.Time, rid string) error {
  413. r.mu.RLock()
  414. defer r.mu.RUnlock()
  415. for _, t := range r.tracks {
  416. if t.track != nil && t.track.rid == rid {
  417. return t.rtcpReadStream.SetReadDeadline(deadline)
  418. }
  419. }
  420. return fmt.Errorf("%w: %s", errRTPReceiverForRIDTrackStreamNotFound, rid)
  421. }
  422. // setRTPReadDeadline sets the max amount of time the RTP stream will block before returning. 0 is forever.
  423. // This should be fired by calling SetReadDeadline on the TrackRemote
  424. func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote) error {
  425. r.mu.RLock()
  426. defer r.mu.RUnlock()
  427. if t := r.streamsForTrack(reader); t != nil {
  428. return t.rtpReadStream.SetReadDeadline(deadline)
  429. }
  430. return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
  431. }
  432. // readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
  433. func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
  434. if !reader.HasRTX() {
  435. return nil
  436. }
  437. select {
  438. case <-r.received:
  439. default:
  440. return nil
  441. }
  442. if t := r.streamsForTrack(reader); t != nil {
  443. select {
  444. case rtxPacketReceived := <-t.repairStreamChannel:
  445. return &rtxPacketReceived
  446. default:
  447. }
  448. }
  449. return nil
  450. }