samplebuilder.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. // Package samplebuilder provides functionality to reconstruct media frames from RTP packets.
  4. package samplebuilder
  5. import (
  6. "math"
  7. "time"
  8. "github.com/pion/rtp"
  9. "github.com/pion/webrtc/v3/pkg/media"
  10. )
  11. // SampleBuilder buffers packets until media frames are complete.
  12. type SampleBuilder struct {
  13. maxLate uint16 // how many packets to wait until we get a valid Sample
  14. maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
  15. buffer [math.MaxUint16 + 1]*rtp.Packet
  16. preparedSamples [math.MaxUint16 + 1]*media.Sample
  17. // Interface that allows us to take RTP packets to samples
  18. depacketizer rtp.Depacketizer
  19. // sampleRate allows us to compute duration of media.SamplecA
  20. sampleRate uint32
  21. // the handler to be called when the builder is about to remove the
  22. // reference to some packet.
  23. packetReleaseHandler func(*rtp.Packet)
  24. // filled contains the head/tail of the packets inserted into the buffer
  25. filled sampleSequenceLocation
  26. // active contains the active head/tail of the timestamp being actively processed
  27. active sampleSequenceLocation
  28. // prepared contains the samples that have been processed to date
  29. prepared sampleSequenceLocation
  30. // number of packets forced to be dropped
  31. droppedPackets uint16
  32. // allows inspecting head packets of each sample and then returns a custom metadata
  33. packetHeadHandler func(headPacket interface{}) interface{}
  34. }
  35. // New constructs a new SampleBuilder.
  36. // maxLate is how long to wait until we can construct a completed media.Sample.
  37. // maxLate is measured in RTP packet sequence numbers.
  38. // A large maxLate will result in less packet loss but higher latency.
  39. // The depacketizer extracts media samples from RTP packets.
  40. // Several depacketizers are available in package github.com/pion/rtp/codecs.
  41. func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
  42. s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
  43. for _, o := range opts {
  44. o(s)
  45. }
  46. return s
  47. }
  48. func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
  49. if s.maxLateTimestamp == 0 {
  50. return false
  51. }
  52. var foundHead *rtp.Packet
  53. var foundTail *rtp.Packet
  54. for i := location.head; i != location.tail; i++ {
  55. if packet := s.buffer[i]; packet != nil {
  56. foundHead = packet
  57. break
  58. }
  59. }
  60. if foundHead == nil {
  61. return false
  62. }
  63. for i := location.tail - 1; i != location.head; i-- {
  64. if packet := s.buffer[i]; packet != nil {
  65. foundTail = packet
  66. break
  67. }
  68. }
  69. if foundTail == nil {
  70. return false
  71. }
  72. return timestampDistance(foundHead.Timestamp, foundTail.Timestamp) > s.maxLateTimestamp
  73. }
  74. // fetchTimestamp returns the timestamp associated with a given sample location
  75. func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timestamp uint32, hasData bool) {
  76. if location.empty() {
  77. return 0, false
  78. }
  79. packet := s.buffer[location.head]
  80. if packet == nil {
  81. return 0, false
  82. }
  83. return packet.Timestamp, true
  84. }
  85. func (s *SampleBuilder) releasePacket(i uint16) {
  86. var p *rtp.Packet
  87. p, s.buffer[i] = s.buffer[i], nil
  88. if p != nil && s.packetReleaseHandler != nil {
  89. s.packetReleaseHandler(p)
  90. }
  91. }
  92. // purgeConsumedBuffers clears all buffers that have already been consumed by
  93. // popping.
  94. func (s *SampleBuilder) purgeConsumedBuffers() {
  95. s.purgeConsumedLocation(s.active, false)
  96. }
  97. // purgeConsumedLocation clears all buffers that have already been consumed
  98. // during a sample building method.
  99. func (s *SampleBuilder) purgeConsumedLocation(consume sampleSequenceLocation, forceConsume bool) {
  100. if !s.filled.hasData() {
  101. return
  102. }
  103. switch consume.compare(s.filled.head) {
  104. case slCompareInside:
  105. if !forceConsume {
  106. break
  107. }
  108. fallthrough
  109. case slCompareBefore:
  110. s.releasePacket(s.filled.head)
  111. s.filled.head++
  112. }
  113. }
  114. // purgeBuffers flushes all buffers that are already consumed or those buffers
  115. // that are too late to consume.
  116. func (s *SampleBuilder) purgeBuffers() {
  117. s.purgeConsumedBuffers()
  118. for (s.tooOld(s.filled) || (s.filled.count() > s.maxLate)) && s.filled.hasData() {
  119. if s.active.empty() {
  120. // refill the active based on the filled packets
  121. s.active = s.filled
  122. }
  123. if s.active.hasData() && (s.active.head == s.filled.head) {
  124. // attempt to force the active packet to be consumed even though
  125. // outstanding data may be pending arrival
  126. if s.buildSample(true) != nil {
  127. continue
  128. }
  129. // could not build the sample so drop it
  130. s.active.head++
  131. s.droppedPackets++
  132. }
  133. s.releasePacket(s.filled.head)
  134. s.filled.head++
  135. }
  136. }
  137. // Push adds an RTP Packet to s's buffer.
  138. //
  139. // Push does not copy the input. If you wish to reuse
  140. // this memory make sure to copy before calling Push
  141. func (s *SampleBuilder) Push(p *rtp.Packet) {
  142. s.buffer[p.SequenceNumber] = p
  143. switch s.filled.compare(p.SequenceNumber) {
  144. case slCompareVoid:
  145. s.filled.head = p.SequenceNumber
  146. s.filled.tail = p.SequenceNumber + 1
  147. case slCompareBefore:
  148. s.filled.head = p.SequenceNumber
  149. case slCompareAfter:
  150. s.filled.tail = p.SequenceNumber + 1
  151. case slCompareInside:
  152. break
  153. }
  154. s.purgeBuffers()
  155. }
  156. const secondToNanoseconds = 1000000000
  157. // buildSample creates a sample from a valid collection of RTP Packets by
  158. // walking forwards building a sample if everything looks good clear and
  159. // update buffer+values
  160. func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
  161. if s.active.empty() {
  162. s.active = s.filled
  163. }
  164. if s.active.empty() {
  165. return nil
  166. }
  167. if s.filled.compare(s.active.tail) == slCompareInside {
  168. s.active.tail = s.filled.tail
  169. }
  170. var consume sampleSequenceLocation
  171. for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
  172. if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
  173. consume.head = s.active.head
  174. consume.tail = i + 1
  175. break
  176. }
  177. headTimestamp, hasData := s.fetchTimestamp(s.active)
  178. if hasData && s.buffer[i].Timestamp != headTimestamp {
  179. consume.head = s.active.head
  180. consume.tail = i
  181. break
  182. }
  183. }
  184. if consume.empty() {
  185. return nil
  186. }
  187. if !purgingBuffers && s.buffer[consume.tail] == nil {
  188. // wait for the next packet after this set of packets to arrive
  189. // to ensure at least one post sample timestamp is known
  190. // (unless we have to release right now)
  191. return nil
  192. }
  193. sampleTimestamp, _ := s.fetchTimestamp(s.active)
  194. afterTimestamp := sampleTimestamp
  195. // scan for any packet after the current and use that time stamp as the diff point
  196. for i := consume.tail; i < s.active.tail; i++ {
  197. if s.buffer[i] != nil {
  198. afterTimestamp = s.buffer[i].Timestamp
  199. break
  200. }
  201. }
  202. // the head set of packets is now fully consumed
  203. s.active.head = consume.tail
  204. // prior to decoding all the packets, check if this packet
  205. // would end being disposed anyway
  206. if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
  207. s.droppedPackets += consume.count()
  208. s.purgeConsumedLocation(consume, true)
  209. s.purgeConsumedBuffers()
  210. return nil
  211. }
  212. // merge all the buffers into a sample
  213. data := []byte{}
  214. var metadata interface{}
  215. for i := consume.head; i != consume.tail; i++ {
  216. p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
  217. if err != nil {
  218. return nil
  219. }
  220. if i == consume.head && s.packetHeadHandler != nil {
  221. metadata = s.packetHeadHandler(s.depacketizer)
  222. }
  223. data = append(data, p...)
  224. }
  225. samples := afterTimestamp - sampleTimestamp
  226. sample := &media.Sample{
  227. Data: data,
  228. Duration: time.Duration((float64(samples)/float64(s.sampleRate))*secondToNanoseconds) * time.Nanosecond,
  229. PacketTimestamp: sampleTimestamp,
  230. PrevDroppedPackets: s.droppedPackets,
  231. Metadata: metadata,
  232. }
  233. s.droppedPackets = 0
  234. s.preparedSamples[s.prepared.tail] = sample
  235. s.prepared.tail++
  236. s.purgeConsumedLocation(consume, true)
  237. s.purgeConsumedBuffers()
  238. return sample
  239. }
  240. // Pop compiles pushed RTP packets into media samples and then
  241. // returns the next valid sample (or nil if no sample is compiled).
  242. func (s *SampleBuilder) Pop() *media.Sample {
  243. _ = s.buildSample(false)
  244. if s.prepared.empty() {
  245. return nil
  246. }
  247. var result *media.Sample
  248. result, s.preparedSamples[s.prepared.head] = s.preparedSamples[s.prepared.head], nil
  249. s.prepared.head++
  250. return result
  251. }
  252. // PopWithTimestamp compiles pushed RTP packets into media samples and then
  253. // returns the next valid sample with its associated RTP timestamp (or nil, 0 if
  254. // no sample is compiled).
  255. //
  256. // Deprecated: PopWithTimestamp will be removed in v4. Use Sample.PacketTimestamp field instead.
  257. func (s *SampleBuilder) PopWithTimestamp() (*media.Sample, uint32) {
  258. sample := s.Pop()
  259. if sample == nil {
  260. return nil, 0
  261. }
  262. return sample, sample.PacketTimestamp
  263. }
  // seqnumDistance returns the absolute distance between two 16-bit sequence
  // numbers, taking wraparound into account.
  func seqnumDistance(x, y uint16) uint16 {
  	// Reinterpreting the wrapped difference as signed gives the shortest
  	// signed path from y to x.
  	delta := int16(x - y)
  	if delta >= 0 {
  		return uint16(delta)
  	}

  	return uint16(-delta)
  }
  // timestampDistance returns the absolute distance between two 32-bit
  // timestamps, taking wraparound into account.
  func timestampDistance(x, y uint32) uint32 {
  	// Reinterpreting the wrapped difference as signed gives the shortest
  	// signed path from y to x.
  	delta := int32(x - y)
  	if delta >= 0 {
  		return uint32(delta)
  	}

  	return uint32(-delta)
  }
  280. // An Option configures a SampleBuilder.
  281. type Option func(o *SampleBuilder)
  282. // WithPartitionHeadChecker is obsolete, it does nothing.
  283. func WithPartitionHeadChecker(interface{}) Option {
  284. return func(o *SampleBuilder) {
  285. }
  286. }
  287. // WithPacketReleaseHandler set a callback when the builder is about to release
  288. // some packet.
  289. func WithPacketReleaseHandler(h func(*rtp.Packet)) Option {
  290. return func(o *SampleBuilder) {
  291. o.packetReleaseHandler = h
  292. }
  293. }
  294. // WithPacketHeadHandler set a head packet handler to allow inspecting
  295. // the packet to extract certain information and return as custom metadata
  296. func WithPacketHeadHandler(h func(headPacket interface{}) interface{}) Option {
  297. return func(o *SampleBuilder) {
  298. o.packetHeadHandler = h
  299. }
  300. }
  301. // WithMaxTimeDelay ensures that packets that are too old in the buffer get
  302. // purged based on time rather than building up an extraordinarily long delay.
  303. func WithMaxTimeDelay(maxLateDuration time.Duration) Option {
  304. return func(o *SampleBuilder) {
  305. totalMillis := maxLateDuration.Milliseconds()
  306. o.maxLateTimestamp = uint32(int64(o.sampleRate) * totalMillis / 1000)
  307. }
  308. }