main.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. // bandwidth-estimation-from-disk demonstrates how to use Pion's Bandwidth Estimation APIs.
  6. package main
  7. import (
  8. "errors"
  9. "fmt"
  10. "io"
  11. "os"
  12. "time"
  13. "github.com/pion/interceptor"
  14. "github.com/pion/interceptor/pkg/cc"
  15. "github.com/pion/interceptor/pkg/gcc"
  16. "github.com/pion/webrtc/v3"
  17. "github.com/pion/webrtc/v3/examples/internal/signal"
  18. "github.com/pion/webrtc/v3/pkg/media"
  19. "github.com/pion/webrtc/v3/pkg/media/ivfreader"
  20. )
  21. const (
  22. lowFile = "low.ivf"
  23. lowBitrate = 300_000
  24. medFile = "med.ivf"
  25. medBitrate = 1_000_000
  26. highFile = "high.ivf"
  27. highBitrate = 2_500_000
  28. ivfHeaderSize = 32
  29. )
  30. // nolint: gocognit
  31. func main() {
  32. qualityLevels := []struct {
  33. fileName string
  34. bitrate int
  35. }{
  36. {lowFile, lowBitrate},
  37. {medFile, medBitrate},
  38. {highFile, highBitrate},
  39. }
  40. currentQuality := 0
  41. for _, level := range qualityLevels {
  42. _, err := os.Stat(level.fileName)
  43. if os.IsNotExist(err) {
  44. panic(fmt.Sprintf("File %s was not found", level.fileName))
  45. }
  46. }
  47. i := &interceptor.Registry{}
  48. m := &webrtc.MediaEngine{}
  49. if err := m.RegisterDefaultCodecs(); err != nil {
  50. panic(err)
  51. }
  52. // Create a Congestion Controller. This analyzes inbound and outbound data and provides
  53. // suggestions on how much we should be sending.
  54. //
  55. // Passing `nil` means we use the default Estimation Algorithm which is Google Congestion Control.
  56. // You can use the other ones that Pion provides, or write your own!
  57. congestionController, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
  58. return gcc.NewSendSideBWE(gcc.SendSideBWEInitialBitrate(lowBitrate))
  59. })
  60. if err != nil {
  61. panic(err)
  62. }
  63. estimatorChan := make(chan cc.BandwidthEstimator, 1)
  64. congestionController.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
  65. estimatorChan <- estimator
  66. })
  67. i.Add(congestionController)
  68. if err = webrtc.ConfigureTWCCHeaderExtensionSender(m, i); err != nil {
  69. panic(err)
  70. }
  71. if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil {
  72. panic(err)
  73. }
  74. // Create a new RTCPeerConnection
  75. peerConnection, err := webrtc.NewAPI(webrtc.WithInterceptorRegistry(i), webrtc.WithMediaEngine(m)).NewPeerConnection(webrtc.Configuration{
  76. ICEServers: []webrtc.ICEServer{
  77. {
  78. URLs: []string{"stun:stun.l.google.com:19302"},
  79. },
  80. },
  81. })
  82. if err != nil {
  83. panic(err)
  84. }
  85. defer func() {
  86. if cErr := peerConnection.Close(); cErr != nil {
  87. fmt.Printf("cannot close peerConnection: %v\n", cErr)
  88. }
  89. }()
  90. // Wait until our Bandwidth Estimator has been created
  91. estimator := <-estimatorChan
  92. // Create a video track
  93. videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8}, "video", "pion")
  94. if err != nil {
  95. panic(err)
  96. }
  97. rtpSender, err := peerConnection.AddTrack(videoTrack)
  98. if err != nil {
  99. panic(err)
  100. }
  101. // Read incoming RTCP packets
  102. // Before these packets are returned they are processed by interceptors. For things
  103. // like NACK this needs to be called.
  104. go func() {
  105. rtcpBuf := make([]byte, 1500)
  106. for {
  107. if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
  108. return
  109. }
  110. }
  111. }()
  112. // Set the handler for ICE connection state
  113. // This will notify you when the peer has connected/disconnected
  114. peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
  115. fmt.Printf("Connection State has changed %s \n", connectionState.String())
  116. })
  117. // Set the handler for Peer connection state
  118. // This will notify you when the peer has connected/disconnected
  119. peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
  120. fmt.Printf("Peer Connection State has changed: %s\n", s.String())
  121. })
  122. // Wait for the offer to be pasted
  123. offer := webrtc.SessionDescription{}
  124. signal.Decode(signal.MustReadStdin(), &offer)
  125. // Set the remote SessionDescription
  126. if err = peerConnection.SetRemoteDescription(offer); err != nil {
  127. panic(err)
  128. }
  129. // Create answer
  130. answer, err := peerConnection.CreateAnswer(nil)
  131. if err != nil {
  132. panic(err)
  133. }
  134. // Create channel that is blocked until ICE Gathering is complete
  135. gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
  136. // Sets the LocalDescription, and starts our UDP listeners
  137. if err = peerConnection.SetLocalDescription(answer); err != nil {
  138. panic(err)
  139. }
  140. // Block until ICE Gathering is complete, disabling trickle ICE
  141. // we do this because we only can exchange one signaling message
  142. // in a production application you should exchange ICE Candidates via OnICECandidate
  143. <-gatherComplete
  144. // Output the answer in base64 so we can paste it in browser
  145. fmt.Println(signal.Encode(*peerConnection.LocalDescription()))
  146. // Open a IVF file and start reading using our IVFReader
  147. file, err := os.Open(qualityLevels[currentQuality].fileName)
  148. if err != nil {
  149. panic(err)
  150. }
  151. ivf, header, err := ivfreader.NewWith(file)
  152. if err != nil {
  153. panic(err)
  154. }
  155. // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
  156. // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
  157. //
  158. // It is important to use a time.Ticker instead of time.Sleep because
  159. // * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data
  160. // * works around latency issues with Sleep (see https://github.com/golang/go/issues/44343)
  161. ticker := time.NewTicker(time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000))
  162. frame := []byte{}
  163. frameHeader := &ivfreader.IVFFrameHeader{}
  164. currentTimestamp := uint64(0)
  165. switchQualityLevel := func(newQualityLevel int) {
  166. fmt.Printf("Switching from %s to %s \n", qualityLevels[currentQuality].fileName, qualityLevels[newQualityLevel].fileName)
  167. currentQuality = newQualityLevel
  168. ivf.ResetReader(setReaderFile(qualityLevels[currentQuality].fileName))
  169. for {
  170. if frame, frameHeader, err = ivf.ParseNextFrame(); err != nil {
  171. break
  172. } else if frameHeader.Timestamp >= currentTimestamp && frame[0]&0x1 == 0 {
  173. break
  174. }
  175. }
  176. }
  177. for ; true; <-ticker.C {
  178. targetBitrate := estimator.GetTargetBitrate()
  179. switch {
  180. // If current quality level is below target bitrate drop to level below
  181. case currentQuality != 0 && targetBitrate < qualityLevels[currentQuality].bitrate:
  182. switchQualityLevel(currentQuality - 1)
  183. // If next quality level is above target bitrate move to next level
  184. case len(qualityLevels) > (currentQuality+1) && targetBitrate > qualityLevels[currentQuality+1].bitrate:
  185. switchQualityLevel(currentQuality + 1)
  186. // Adjust outbound bandwidth for probing
  187. default:
  188. frame, _, err = ivf.ParseNextFrame()
  189. }
  190. switch {
  191. // If we have reached the end of the file start again
  192. case errors.Is(err, io.EOF):
  193. ivf.ResetReader(setReaderFile(qualityLevels[currentQuality].fileName))
  194. // No error write the video frame
  195. case err == nil:
  196. currentTimestamp = frameHeader.Timestamp
  197. if err = videoTrack.WriteSample(media.Sample{Data: frame, Duration: time.Second}); err != nil {
  198. panic(err)
  199. }
  200. // Error besides io.EOF that we dont know how to handle
  201. default:
  202. panic(err)
  203. }
  204. }
  205. }
  206. func setReaderFile(filename string) func(_ int64) io.Reader {
  207. return func(_ int64) io.Reader {
  208. file, err := os.Open(filename) // nolint
  209. if err != nil {
  210. panic(err)
  211. }
  212. if _, err = file.Seek(ivfHeaderSize, io.SeekStart); err != nil {
  213. panic(err)
  214. }
  215. return file
  216. }
  217. }