peerconnection_media_test.go 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "bufio"
  8. "bytes"
  9. "context"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "regexp"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "testing"
  18. "time"
  19. "github.com/pion/logging"
  20. "github.com/pion/randutil"
  21. "github.com/pion/rtcp"
  22. "github.com/pion/rtp"
  23. "github.com/pion/sdp/v3"
  24. "github.com/pion/transport/v2/test"
  25. "github.com/pion/webrtc/v3/internal/util"
  26. "github.com/pion/webrtc/v3/pkg/media"
  27. "github.com/stretchr/testify/assert"
  28. "github.com/stretchr/testify/require"
  29. )
  30. var (
  31. errIncomingTrackIDInvalid = errors.New("incoming Track ID is invalid")
  32. errIncomingTrackLabelInvalid = errors.New("incoming Track Label is invalid")
  33. errNoTransceiverwithMid = errors.New("no transceiver with mid")
  34. )
  35. func registerSimulcastHeaderExtensions(m *MediaEngine, codecType RTPCodecType) {
  36. for _, extension := range []string{
  37. sdp.SDESMidURI,
  38. sdp.SDESRTPStreamIDURI,
  39. sdesRepairRTPStreamIDURI,
  40. } {
  41. if err := m.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: extension}, codecType); err != nil {
  42. panic(err)
  43. }
  44. }
  45. }
  46. /*
  47. Integration test for bi-directional peers
  48. This asserts we can send RTP and RTCP both ways, and blocks until
  49. each side gets something (and asserts payload contents)
  50. */
  51. // nolint: gocyclo
  52. func TestPeerConnection_Media_Sample(t *testing.T) {
  53. const (
  54. expectedTrackID = "video"
  55. expectedStreamID = "pion"
  56. )
  57. lim := test.TimeOut(time.Second * 30)
  58. defer lim.Stop()
  59. report := test.CheckRoutines(t)
  60. defer report()
  61. pcOffer, pcAnswer, err := newPair()
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. awaitRTPRecv := make(chan bool)
  66. awaitRTPRecvClosed := make(chan bool)
  67. awaitRTPSend := make(chan bool)
  68. awaitRTCPSenderRecv := make(chan bool)
  69. awaitRTCPSenderSend := make(chan error)
  70. awaitRTCPReceiverRecv := make(chan error)
  71. awaitRTCPReceiverSend := make(chan error)
  72. trackMetadataValid := make(chan error)
  73. pcAnswer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
  74. if track.ID() != expectedTrackID {
  75. trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackIDInvalid, expectedTrackID, track.ID())
  76. return
  77. }
  78. if track.StreamID() != expectedStreamID {
  79. trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackLabelInvalid, expectedStreamID, track.StreamID())
  80. return
  81. }
  82. close(trackMetadataValid)
  83. go func() {
  84. for {
  85. time.Sleep(time.Millisecond * 100)
  86. if routineErr := pcAnswer.WriteRTCP([]rtcp.Packet{&rtcp.RapidResynchronizationRequest{SenderSSRC: uint32(track.SSRC()), MediaSSRC: uint32(track.SSRC())}}); routineErr != nil {
  87. awaitRTCPReceiverSend <- routineErr
  88. return
  89. }
  90. select {
  91. case <-awaitRTCPSenderRecv:
  92. close(awaitRTCPReceiverSend)
  93. return
  94. default:
  95. }
  96. }
  97. }()
  98. go func() {
  99. _, _, routineErr := receiver.Read(make([]byte, 1400))
  100. if routineErr != nil {
  101. awaitRTCPReceiverRecv <- routineErr
  102. } else {
  103. close(awaitRTCPReceiverRecv)
  104. }
  105. }()
  106. haveClosedAwaitRTPRecv := false
  107. for {
  108. p, _, routineErr := track.ReadRTP()
  109. if routineErr != nil {
  110. close(awaitRTPRecvClosed)
  111. return
  112. } else if bytes.Equal(p.Payload, []byte{0x10, 0x00}) && !haveClosedAwaitRTPRecv {
  113. haveClosedAwaitRTPRecv = true
  114. close(awaitRTPRecv)
  115. }
  116. }
  117. })
  118. vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, expectedTrackID, expectedStreamID)
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. sender, err := pcOffer.AddTrack(vp8Track)
  123. if err != nil {
  124. t.Fatal(err)
  125. }
  126. go func() {
  127. for {
  128. time.Sleep(time.Millisecond * 100)
  129. if pcOffer.ICEConnectionState() != ICEConnectionStateConnected {
  130. continue
  131. }
  132. if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
  133. fmt.Println(routineErr)
  134. }
  135. select {
  136. case <-awaitRTPRecv:
  137. close(awaitRTPSend)
  138. return
  139. default:
  140. }
  141. }
  142. }()
  143. go func() {
  144. for {
  145. time.Sleep(time.Millisecond * 100)
  146. if routineErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC: uint32(sender.trackEncodings[0].ssrc), MediaSSRC: uint32(sender.trackEncodings[0].ssrc)}}); routineErr != nil {
  147. awaitRTCPSenderSend <- routineErr
  148. }
  149. select {
  150. case <-awaitRTCPReceiverRecv:
  151. close(awaitRTCPSenderSend)
  152. return
  153. default:
  154. }
  155. }
  156. }()
  157. go func() {
  158. if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil {
  159. close(awaitRTCPSenderRecv)
  160. }
  161. }()
  162. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  163. err, ok := <-trackMetadataValid
  164. if ok {
  165. t.Fatal(err)
  166. }
  167. <-awaitRTPRecv
  168. <-awaitRTPSend
  169. <-awaitRTCPSenderRecv
  170. if err, ok = <-awaitRTCPSenderSend; ok {
  171. t.Fatal(err)
  172. }
  173. <-awaitRTCPReceiverRecv
  174. if err, ok = <-awaitRTCPReceiverSend; ok {
  175. t.Fatal(err)
  176. }
  177. closePairNow(t, pcOffer, pcAnswer)
  178. <-awaitRTPRecvClosed
  179. }
  180. /*
  181. PeerConnection should be able to be torn down at anytime
  182. This test adds an input track and asserts
  183. * OnTrack doesn't fire since no video packets will arrive
  184. * No goroutine leaks
  185. * No deadlocks on shutdown
  186. */
  187. func TestPeerConnection_Media_Shutdown(t *testing.T) {
  188. iceCompleteAnswer := make(chan struct{})
  189. iceCompleteOffer := make(chan struct{})
  190. lim := test.TimeOut(time.Second * 30)
  191. defer lim.Stop()
  192. report := test.CheckRoutines(t)
  193. defer report()
  194. pcOffer, pcAnswer, err := newPair()
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. _, err = pcOffer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  199. if err != nil {
  200. t.Fatal(err)
  201. }
  202. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeAudio, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. opusTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeOpus}, "audio", "pion1")
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
  211. if err != nil {
  212. t.Fatal(err)
  213. }
  214. if _, err = pcOffer.AddTrack(opusTrack); err != nil {
  215. t.Fatal(err)
  216. } else if _, err = pcAnswer.AddTrack(vp8Track); err != nil {
  217. t.Fatal(err)
  218. }
  219. var onTrackFiredLock sync.Mutex
  220. onTrackFired := false
  221. pcAnswer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
  222. onTrackFiredLock.Lock()
  223. defer onTrackFiredLock.Unlock()
  224. onTrackFired = true
  225. })
  226. pcAnswer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
  227. if iceState == ICEConnectionStateConnected {
  228. close(iceCompleteAnswer)
  229. }
  230. })
  231. pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
  232. if iceState == ICEConnectionStateConnected {
  233. close(iceCompleteOffer)
  234. }
  235. })
  236. err = signalPair(pcOffer, pcAnswer)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. <-iceCompleteAnswer
  241. <-iceCompleteOffer
  242. // Each PeerConnection should have one sender, one receiver and one transceiver
  243. for _, pc := range []*PeerConnection{pcOffer, pcAnswer} {
  244. senders := pc.GetSenders()
  245. if len(senders) != 1 {
  246. t.Errorf("Each PeerConnection should have one RTPSender, we have %d", len(senders))
  247. }
  248. receivers := pc.GetReceivers()
  249. if len(receivers) != 2 {
  250. t.Errorf("Each PeerConnection should have two RTPReceivers, we have %d", len(receivers))
  251. }
  252. transceivers := pc.GetTransceivers()
  253. if len(transceivers) != 2 {
  254. t.Errorf("Each PeerConnection should have two RTPTransceivers, we have %d", len(transceivers))
  255. }
  256. }
  257. closePairNow(t, pcOffer, pcAnswer)
  258. onTrackFiredLock.Lock()
  259. if onTrackFired {
  260. t.Fatalf("PeerConnection OnTrack fired even though we got no packets")
  261. }
  262. onTrackFiredLock.Unlock()
  263. }
  264. /*
  265. Integration test for behavior around media and disconnected peers
  266. * Sending RTP and RTCP to a disconnected Peer shouldn't return an error
  267. */
  268. func TestPeerConnection_Media_Disconnected(t *testing.T) {
  269. lim := test.TimeOut(time.Second * 30)
  270. defer lim.Stop()
  271. report := test.CheckRoutines(t)
  272. defer report()
  273. s := SettingEngine{}
  274. s.SetICETimeouts(time.Second/2, time.Second/2, time.Second/8)
  275. m := &MediaEngine{}
  276. assert.NoError(t, m.RegisterDefaultCodecs())
  277. pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(s), WithMediaEngine(m)).newPair(Configuration{})
  278. if err != nil {
  279. t.Fatal(err)
  280. }
  281. vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
  282. if err != nil {
  283. t.Fatal(err)
  284. }
  285. vp8Sender, err := pcOffer.AddTrack(vp8Track)
  286. if err != nil {
  287. t.Fatal(err)
  288. }
  289. haveDisconnected := make(chan error)
  290. pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
  291. if iceState == ICEConnectionStateDisconnected {
  292. close(haveDisconnected)
  293. } else if iceState == ICEConnectionStateConnected {
  294. // Assert that DTLS is done by pull remote certificate, don't tear down the PC early
  295. for {
  296. if len(vp8Sender.Transport().GetRemoteCertificate()) != 0 {
  297. if pcAnswer.sctpTransport.association() != nil {
  298. break
  299. }
  300. }
  301. time.Sleep(time.Second)
  302. }
  303. if pcCloseErr := pcAnswer.Close(); pcCloseErr != nil {
  304. haveDisconnected <- pcCloseErr
  305. }
  306. }
  307. })
  308. err = signalPair(pcOffer, pcAnswer)
  309. if err != nil {
  310. t.Fatal(err)
  311. }
  312. err, ok := <-haveDisconnected
  313. if ok {
  314. t.Fatal(err)
  315. }
  316. for i := 0; i <= 5; i++ {
  317. if rtpErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); rtpErr != nil {
  318. t.Fatal(rtpErr)
  319. } else if rtcpErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: 0}}); rtcpErr != nil {
  320. t.Fatal(rtcpErr)
  321. }
  322. }
  323. assert.NoError(t, pcOffer.Close())
  324. }
  325. type undeclaredSsrcLogger struct{ unhandledSimulcastError chan struct{} }
  326. func (u *undeclaredSsrcLogger) Trace(string) {}
  327. func (u *undeclaredSsrcLogger) Tracef(string, ...interface{}) {}
  328. func (u *undeclaredSsrcLogger) Debug(string) {}
  329. func (u *undeclaredSsrcLogger) Debugf(string, ...interface{}) {}
  330. func (u *undeclaredSsrcLogger) Info(string) {}
  331. func (u *undeclaredSsrcLogger) Infof(string, ...interface{}) {}
  332. func (u *undeclaredSsrcLogger) Warn(string) {}
  333. func (u *undeclaredSsrcLogger) Warnf(string, ...interface{}) {}
  334. func (u *undeclaredSsrcLogger) Error(string) {}
  335. func (u *undeclaredSsrcLogger) Errorf(format string, _ ...interface{}) {
  336. if format == incomingUnhandledRTPSsrc {
  337. close(u.unhandledSimulcastError)
  338. }
  339. }
  340. type undeclaredSsrcLoggerFactory struct{ unhandledSimulcastError chan struct{} }
  341. func (u *undeclaredSsrcLoggerFactory) NewLogger(string) logging.LeveledLogger {
  342. return &undeclaredSsrcLogger{u.unhandledSimulcastError}
  343. }
  344. // Filter SSRC lines
  345. func filterSsrc(offer string) (filteredSDP string) {
  346. scanner := bufio.NewScanner(strings.NewReader(offer))
  347. for scanner.Scan() {
  348. l := scanner.Text()
  349. if strings.HasPrefix(l, "a=ssrc") {
  350. continue
  351. }
  352. filteredSDP += l + "\n"
  353. }
  354. return
  355. }
  356. // If a SessionDescription has a single media section and no SSRC
  357. // assume that it is meant to handle all RTP packets
  358. func TestUndeclaredSSRC(t *testing.T) {
  359. lim := test.TimeOut(time.Second * 30)
  360. defer lim.Stop()
  361. report := test.CheckRoutines(t)
  362. defer report()
  363. t.Run("No SSRC", func(t *testing.T) {
  364. pcOffer, pcAnswer, err := newPair()
  365. assert.NoError(t, err)
  366. vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
  367. assert.NoError(t, err)
  368. _, err = pcOffer.AddTrack(vp8Writer)
  369. assert.NoError(t, err)
  370. onTrackFired := make(chan struct{})
  371. pcAnswer.OnTrack(func(trackRemote *TrackRemote, r *RTPReceiver) {
  372. assert.Equal(t, trackRemote.StreamID(), vp8Writer.StreamID())
  373. assert.Equal(t, trackRemote.ID(), vp8Writer.ID())
  374. close(onTrackFired)
  375. })
  376. offer, err := pcOffer.CreateOffer(nil)
  377. assert.NoError(t, err)
  378. offerGatheringComplete := GatheringCompletePromise(pcOffer)
  379. assert.NoError(t, pcOffer.SetLocalDescription(offer))
  380. <-offerGatheringComplete
  381. offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP)
  382. assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
  383. answer, err := pcAnswer.CreateAnswer(nil)
  384. assert.NoError(t, err)
  385. answerGatheringComplete := GatheringCompletePromise(pcAnswer)
  386. assert.NoError(t, pcAnswer.SetLocalDescription(answer))
  387. <-answerGatheringComplete
  388. assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
  389. sendVideoUntilDone(onTrackFired, t, []*TrackLocalStaticSample{vp8Writer})
  390. closePairNow(t, pcOffer, pcAnswer)
  391. })
  392. t.Run("Has RID", func(t *testing.T) {
  393. unhandledSimulcastError := make(chan struct{})
  394. m := &MediaEngine{}
  395. assert.NoError(t, m.RegisterDefaultCodecs())
  396. pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
  397. LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
  398. }), WithMediaEngine(m)).newPair(Configuration{})
  399. assert.NoError(t, err)
  400. vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
  401. assert.NoError(t, err)
  402. _, err = pcOffer.AddTrack(vp8Writer)
  403. assert.NoError(t, err)
  404. offer, err := pcOffer.CreateOffer(nil)
  405. assert.NoError(t, err)
  406. offerGatheringComplete := GatheringCompletePromise(pcOffer)
  407. assert.NoError(t, pcOffer.SetLocalDescription(offer))
  408. <-offerGatheringComplete
  409. // Append RID to end of SessionDescription. Will not be considered unhandled anymore
  410. offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP) + "a=" + sdpAttributeRid + "\r\n"
  411. assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
  412. answer, err := pcAnswer.CreateAnswer(nil)
  413. assert.NoError(t, err)
  414. answerGatheringComplete := GatheringCompletePromise(pcAnswer)
  415. assert.NoError(t, pcAnswer.SetLocalDescription(answer))
  416. <-answerGatheringComplete
  417. assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
  418. sendVideoUntilDone(unhandledSimulcastError, t, []*TrackLocalStaticSample{vp8Writer})
  419. closePairNow(t, pcOffer, pcAnswer)
  420. })
  421. }
  422. func TestAddTransceiverFromTrackSendOnly(t *testing.T) {
  423. lim := test.TimeOut(time.Second * 30)
  424. defer lim.Stop()
  425. report := test.CheckRoutines(t)
  426. defer report()
  427. pc, err := NewPeerConnection(Configuration{})
  428. if err != nil {
  429. t.Error(err.Error())
  430. }
  431. track, err := NewTrackLocalStaticSample(
  432. RTPCodecCapability{MimeType: "audio/Opus"},
  433. "track-id",
  434. "stream-id",
  435. )
  436. if err != nil {
  437. t.Error(err.Error())
  438. }
  439. transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
  440. Direction: RTPTransceiverDirectionSendonly,
  441. })
  442. if err != nil {
  443. t.Error(err.Error())
  444. }
  445. if transceiver.Receiver() != nil {
  446. t.Errorf("Transceiver shouldn't have a receiver")
  447. }
  448. if transceiver.Sender() == nil {
  449. t.Errorf("Transceiver should have a sender")
  450. }
  451. if len(pc.GetTransceivers()) != 1 {
  452. t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
  453. }
  454. if len(pc.GetSenders()) != 1 {
  455. t.Errorf("PeerConnection should have one sender but has %d", len(pc.GetSenders()))
  456. }
  457. offer, err := pc.CreateOffer(nil)
  458. if err != nil {
  459. t.Error(err.Error())
  460. }
  461. if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendonly) {
  462. t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendonly)
  463. }
  464. assert.NoError(t, pc.Close())
  465. }
  466. func TestAddTransceiverFromTrackSendRecv(t *testing.T) {
  467. lim := test.TimeOut(time.Second * 30)
  468. defer lim.Stop()
  469. report := test.CheckRoutines(t)
  470. defer report()
  471. pc, err := NewPeerConnection(Configuration{})
  472. if err != nil {
  473. t.Error(err.Error())
  474. }
  475. track, err := NewTrackLocalStaticSample(
  476. RTPCodecCapability{MimeType: "audio/Opus"},
  477. "track-id",
  478. "stream-id",
  479. )
  480. if err != nil {
  481. t.Error(err.Error())
  482. }
  483. transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
  484. Direction: RTPTransceiverDirectionSendrecv,
  485. })
  486. if err != nil {
  487. t.Error(err.Error())
  488. }
  489. if transceiver.Receiver() == nil {
  490. t.Errorf("Transceiver should have a receiver")
  491. }
  492. if transceiver.Sender() == nil {
  493. t.Errorf("Transceiver should have a sender")
  494. }
  495. if len(pc.GetTransceivers()) != 1 {
  496. t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
  497. }
  498. offer, err := pc.CreateOffer(nil)
  499. if err != nil {
  500. t.Error(err.Error())
  501. }
  502. if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendrecv) {
  503. t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendrecv)
  504. }
  505. assert.NoError(t, pc.Close())
  506. }
  507. func TestAddTransceiverAddTrack_Reuse(t *testing.T) {
  508. pc, err := NewPeerConnection(Configuration{})
  509. assert.NoError(t, err)
  510. tr, err := pc.AddTransceiverFromKind(
  511. RTPCodecTypeVideo,
  512. RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly},
  513. )
  514. assert.NoError(t, err)
  515. assert.Equal(t, []*RTPTransceiver{tr}, pc.GetTransceivers())
  516. addTrack := func() (TrackLocal, *RTPSender) {
  517. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
  518. assert.NoError(t, err)
  519. sender, err := pc.AddTrack(track)
  520. assert.NoError(t, err)
  521. return track, sender
  522. }
  523. track1, sender1 := addTrack()
  524. assert.Equal(t, 1, len(pc.GetTransceivers()))
  525. assert.Equal(t, sender1, tr.Sender())
  526. assert.Equal(t, track1, tr.Sender().Track())
  527. require.NoError(t, pc.RemoveTrack(sender1))
  528. track2, _ := addTrack()
  529. assert.Equal(t, 1, len(pc.GetTransceivers()))
  530. assert.Equal(t, track2, tr.Sender().Track())
  531. addTrack()
  532. assert.Equal(t, 2, len(pc.GetTransceivers()))
  533. assert.NoError(t, pc.Close())
  534. }
  535. func TestAddTransceiverAddTrack_NewRTPSender_Error(t *testing.T) {
  536. pc, err := NewPeerConnection(Configuration{})
  537. assert.NoError(t, err)
  538. _, err = pc.AddTransceiverFromKind(
  539. RTPCodecTypeVideo,
  540. RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly},
  541. )
  542. assert.NoError(t, err)
  543. dtlsTransport := pc.dtlsTransport
  544. pc.dtlsTransport = nil
  545. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
  546. assert.NoError(t, err)
  547. _, err = pc.AddTrack(track)
  548. assert.Error(t, err, "DTLSTransport must not be nil")
  549. assert.Equal(t, 1, len(pc.GetTransceivers()))
  550. pc.dtlsTransport = dtlsTransport
  551. assert.NoError(t, pc.Close())
  552. }
  553. func TestRtpSenderReceiver_ReadClose_Error(t *testing.T) {
  554. pc, err := NewPeerConnection(Configuration{})
  555. assert.NoError(t, err)
  556. tr, err := pc.AddTransceiverFromKind(
  557. RTPCodecTypeVideo,
  558. RTPTransceiverInit{Direction: RTPTransceiverDirectionSendrecv},
  559. )
  560. assert.NoError(t, err)
  561. sender, receiver := tr.Sender(), tr.Receiver()
  562. assert.NoError(t, sender.Stop())
  563. _, _, err = sender.Read(make([]byte, 0, 1400))
  564. assert.ErrorIs(t, err, io.ErrClosedPipe)
  565. assert.NoError(t, receiver.Stop())
  566. _, _, err = receiver.Read(make([]byte, 0, 1400))
  567. assert.ErrorIs(t, err, io.ErrClosedPipe)
  568. assert.NoError(t, pc.Close())
  569. }
  570. // nolint: dupl
  571. func TestAddTransceiverFromKind(t *testing.T) {
  572. lim := test.TimeOut(time.Second * 30)
  573. defer lim.Stop()
  574. report := test.CheckRoutines(t)
  575. defer report()
  576. pc, err := NewPeerConnection(Configuration{})
  577. if err != nil {
  578. t.Error(err.Error())
  579. }
  580. transceiver, err := pc.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{
  581. Direction: RTPTransceiverDirectionRecvonly,
  582. })
  583. if err != nil {
  584. t.Error(err.Error())
  585. }
  586. if transceiver.Receiver() == nil {
  587. t.Errorf("Transceiver should have a receiver")
  588. }
  589. if transceiver.Sender() != nil {
  590. t.Errorf("Transceiver shouldn't have a sender")
  591. }
  592. offer, err := pc.CreateOffer(nil)
  593. if err != nil {
  594. t.Error(err.Error())
  595. }
  596. if !offerMediaHasDirection(offer, RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly) {
  597. t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionRecvonly)
  598. }
  599. assert.NoError(t, pc.Close())
  600. }
  601. func TestAddTransceiverFromTrackFailsRecvOnly(t *testing.T) {
  602. lim := test.TimeOut(time.Second * 30)
  603. defer lim.Stop()
  604. report := test.CheckRoutines(t)
  605. defer report()
  606. pc, err := NewPeerConnection(Configuration{})
  607. if err != nil {
  608. t.Error(err.Error())
  609. }
  610. track, err := NewTrackLocalStaticSample(
  611. RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f"},
  612. "track-id",
  613. "track-label",
  614. )
  615. if err != nil {
  616. t.Error(err.Error())
  617. }
  618. transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
  619. Direction: RTPTransceiverDirectionRecvonly,
  620. })
  621. if transceiver != nil {
  622. t.Error("AddTransceiverFromTrack shouldn't succeed with Direction RTPTransceiverDirectionRecvonly")
  623. }
  624. assert.NotNil(t, err)
  625. assert.NoError(t, pc.Close())
  626. }
  627. func TestPlanBMediaExchange(t *testing.T) {
  628. runTest := func(trackCount int, t *testing.T) {
  629. addSingleTrack := func(p *PeerConnection) *TrackLocalStaticSample {
  630. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32()), fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32()))
  631. assert.NoError(t, err)
  632. _, err = p.AddTrack(track)
  633. assert.NoError(t, err)
  634. return track
  635. }
  636. pcOffer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
  637. assert.NoError(t, err)
  638. pcAnswer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
  639. assert.NoError(t, err)
  640. var onTrackWaitGroup sync.WaitGroup
  641. onTrackWaitGroup.Add(trackCount)
  642. pcAnswer.OnTrack(func(track *TrackRemote, r *RTPReceiver) {
  643. onTrackWaitGroup.Done()
  644. })
  645. done := make(chan struct{})
  646. go func() {
  647. onTrackWaitGroup.Wait()
  648. close(done)
  649. }()
  650. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo)
  651. assert.NoError(t, err)
  652. outboundTracks := []*TrackLocalStaticSample{}
  653. for i := 0; i < trackCount; i++ {
  654. outboundTracks = append(outboundTracks, addSingleTrack(pcOffer))
  655. }
  656. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  657. func() {
  658. for {
  659. select {
  660. case <-time.After(20 * time.Millisecond):
  661. for _, track := range outboundTracks {
  662. assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}))
  663. }
  664. case <-done:
  665. return
  666. }
  667. }
  668. }()
  669. closePairNow(t, pcOffer, pcAnswer)
  670. }
  671. lim := test.TimeOut(time.Second * 30)
  672. defer lim.Stop()
  673. report := test.CheckRoutines(t)
  674. defer report()
  675. t.Run("Single Track", func(t *testing.T) {
  676. runTest(1, t)
  677. })
  678. t.Run("Multi Track", func(t *testing.T) {
  679. runTest(2, t)
  680. })
  681. }
  682. // TestPeerConnection_Start_Only_Negotiated_Senders tests that only
  683. // the current negotiated transceivers senders provided in an
  684. // offer/answer are started
  685. func TestPeerConnection_Start_Only_Negotiated_Senders(t *testing.T) {
  686. lim := test.TimeOut(time.Second * 30)
  687. defer lim.Stop()
  688. report := test.CheckRoutines(t)
  689. defer report()
  690. pcOffer, err := NewPeerConnection(Configuration{})
  691. assert.NoError(t, err)
  692. defer func() { assert.NoError(t, pcOffer.Close()) }()
  693. pcAnswer, err := NewPeerConnection(Configuration{})
  694. assert.NoError(t, err)
  695. defer func() { assert.NoError(t, pcAnswer.Close()) }()
  696. track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
  697. require.NoError(t, err)
  698. sender1, err := pcOffer.AddTrack(track1)
  699. require.NoError(t, err)
  700. offer, err := pcOffer.CreateOffer(nil)
  701. assert.NoError(t, err)
  702. offerGatheringComplete := GatheringCompletePromise(pcOffer)
  703. assert.NoError(t, pcOffer.SetLocalDescription(offer))
  704. <-offerGatheringComplete
  705. assert.NoError(t, pcAnswer.SetRemoteDescription(*pcOffer.LocalDescription()))
  706. answer, err := pcAnswer.CreateAnswer(nil)
  707. assert.NoError(t, err)
  708. answerGatheringComplete := GatheringCompletePromise(pcAnswer)
  709. assert.NoError(t, pcAnswer.SetLocalDescription(answer))
  710. <-answerGatheringComplete
  711. // Add a new track between providing the offer and applying the answer
  712. track2, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
  713. require.NoError(t, err)
  714. sender2, err := pcOffer.AddTrack(track2)
  715. require.NoError(t, err)
  716. // apply answer so we'll test generateMatchedSDP
  717. assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
  718. // Wait for senders to be started by startTransports spawned goroutine
  719. pcOffer.ops.Done()
  720. // sender1 should be started but sender2 should not be started
  721. assert.True(t, sender1.hasSent(), "sender1 is not started but should be started")
  722. assert.False(t, sender2.hasSent(), "sender2 is started but should not be started")
  723. }
  724. // TestPeerConnection_Start_Right_Receiver tests that the right
  725. // receiver (the receiver which transceiver has the same media section as the track)
  726. // is started for the specified track
  727. func TestPeerConnection_Start_Right_Receiver(t *testing.T) {
  728. isTransceiverReceiverStarted := func(pc *PeerConnection, mid string) (bool, error) {
  729. for _, transceiver := range pc.GetTransceivers() {
  730. if transceiver.Mid() != mid {
  731. continue
  732. }
  733. return transceiver.Receiver() != nil && transceiver.Receiver().haveReceived(), nil
  734. }
  735. return false, fmt.Errorf("%w: %q", errNoTransceiverwithMid, mid)
  736. }
  737. lim := test.TimeOut(time.Second * 30)
  738. defer lim.Stop()
  739. report := test.CheckRoutines(t)
  740. defer report()
  741. pcOffer, pcAnswer, err := newPair()
  742. require.NoError(t, err)
  743. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  744. assert.NoError(t, err)
  745. track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
  746. require.NoError(t, err)
  747. sender1, err := pcOffer.AddTrack(track1)
  748. require.NoError(t, err)
  749. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  750. pcOffer.ops.Done()
  751. pcAnswer.ops.Done()
  752. // transceiver with mid 0 should be started
  753. started, err := isTransceiverReceiverStarted(pcAnswer, "0")
  754. assert.NoError(t, err)
  755. assert.True(t, started, "transceiver with mid 0 should be started")
  756. // Remove track
  757. assert.NoError(t, pcOffer.RemoveTrack(sender1))
  758. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  759. pcOffer.ops.Done()
  760. pcAnswer.ops.Done()
  761. // transceiver with mid 0 should not be started
  762. started, err = isTransceiverReceiverStarted(pcAnswer, "0")
  763. assert.NoError(t, err)
  764. assert.False(t, started, "transceiver with mid 0 should not be started")
  765. // Add a new transceiver (we're not using AddTrack since it'll reuse the transceiver with mid 0)
  766. _, err = pcOffer.AddTransceiverFromTrack(track1)
  767. assert.NoError(t, err)
  768. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  769. assert.NoError(t, err)
  770. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  771. pcOffer.ops.Done()
  772. pcAnswer.ops.Done()
  773. // transceiver with mid 0 should not be started
  774. started, err = isTransceiverReceiverStarted(pcAnswer, "0")
  775. assert.NoError(t, err)
  776. assert.False(t, started, "transceiver with mid 0 should not be started")
  777. // transceiver with mid 2 should be started
  778. started, err = isTransceiverReceiverStarted(pcAnswer, "2")
  779. assert.NoError(t, err)
  780. assert.True(t, started, "transceiver with mid 2 should be started")
  781. closePairNow(t, pcOffer, pcAnswer)
  782. }
  783. func TestPeerConnection_Simulcast_Probe(t *testing.T) {
  784. lim := test.TimeOut(time.Second * 30) //nolint
  785. defer lim.Stop()
  786. report := test.CheckRoutines(t)
  787. defer report()
  788. // Assert that failed Simulcast probing doesn't cause
  789. // the handleUndeclaredSSRC to be leaked
  790. t.Run("Leak", func(t *testing.T) {
  791. track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
  792. assert.NoError(t, err)
  793. offerer, answerer, err := newPair()
  794. assert.NoError(t, err)
  795. _, err = offerer.AddTrack(track)
  796. assert.NoError(t, err)
  797. ticker := time.NewTicker(time.Millisecond * 20)
  798. testFinished := make(chan struct{})
  799. seenFiveStreams, seenFiveStreamsCancel := context.WithCancel(context.Background())
  800. go func() {
  801. for {
  802. select {
  803. case <-testFinished:
  804. return
  805. case <-ticker.C:
  806. answerer.dtlsTransport.lock.Lock()
  807. if len(answerer.dtlsTransport.simulcastStreams) >= 5 {
  808. seenFiveStreamsCancel()
  809. }
  810. answerer.dtlsTransport.lock.Unlock()
  811. track.mu.Lock()
  812. if len(track.bindings) == 1 {
  813. _, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
  814. Version: 2,
  815. SSRC: randutil.NewMathRandomGenerator().Uint32(),
  816. }, []byte{0, 1, 2, 3, 4, 5})
  817. assert.NoError(t, err)
  818. }
  819. track.mu.Unlock()
  820. }
  821. }
  822. }()
  823. assert.NoError(t, signalPair(offerer, answerer))
  824. peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
  825. peerConnectionConnected.Wait()
  826. <-seenFiveStreams.Done()
  827. closePairNow(t, offerer, answerer)
  828. close(testFinished)
  829. })
  830. // Assert that NonSimulcast Traffic isn't incorrectly broken by the probe
  831. t.Run("Break NonSimulcast", func(t *testing.T) {
  832. unhandledSimulcastError := make(chan struct{})
  833. m := &MediaEngine{}
  834. if err := m.RegisterDefaultCodecs(); err != nil {
  835. panic(err)
  836. }
  837. registerSimulcastHeaderExtensions(m, RTPCodecTypeVideo)
  838. pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
  839. LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
  840. }), WithMediaEngine(m)).newPair(Configuration{})
  841. assert.NoError(t, err)
  842. firstTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "firstTrack", "firstTrack")
  843. assert.NoError(t, err)
  844. _, err = pcOffer.AddTrack(firstTrack)
  845. assert.NoError(t, err)
  846. secondTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "secondTrack", "secondTrack")
  847. assert.NoError(t, err)
  848. _, err = pcOffer.AddTrack(secondTrack)
  849. assert.NoError(t, err)
  850. assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) (filtered string) {
  851. shouldDiscard := false
  852. scanner := bufio.NewScanner(strings.NewReader(sessionDescription))
  853. for scanner.Scan() {
  854. if strings.HasPrefix(scanner.Text(), "m=video") {
  855. shouldDiscard = !shouldDiscard
  856. }
  857. if !shouldDiscard {
  858. filtered += scanner.Text() + "\r\n"
  859. }
  860. }
  861. return
  862. }))
  863. sequenceNumber := uint16(0)
  864. sendRTPPacket := func() {
  865. sequenceNumber++
  866. assert.NoError(t, firstTrack.WriteRTP(&rtp.Packet{
  867. Header: rtp.Header{
  868. Version: 2,
  869. SequenceNumber: sequenceNumber,
  870. },
  871. Payload: []byte{0x00},
  872. }))
  873. time.Sleep(20 * time.Millisecond)
  874. }
  875. for ; sequenceNumber <= 5; sequenceNumber++ {
  876. sendRTPPacket()
  877. }
  878. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  879. trackRemoteChan := make(chan *TrackRemote, 1)
  880. pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
  881. trackRemoteChan <- trackRemote
  882. })
  883. trackRemote := func() *TrackRemote {
  884. for {
  885. select {
  886. case t := <-trackRemoteChan:
  887. return t
  888. default:
  889. sendRTPPacket()
  890. }
  891. }
  892. }()
  893. func() {
  894. for {
  895. select {
  896. case <-unhandledSimulcastError:
  897. return
  898. default:
  899. sendRTPPacket()
  900. }
  901. }
  902. }()
  903. _, _, err = trackRemote.Read(make([]byte, 1500))
  904. assert.NoError(t, err)
  905. closePairNow(t, pcOffer, pcAnswer)
  906. })
  907. }
  908. // Assert that CreateOffer returns an error for a RTPSender with no codecs
  909. // pion/webrtc#1702
  910. func TestPeerConnection_CreateOffer_NoCodecs(t *testing.T) {
  911. lim := test.TimeOut(time.Second * 30)
  912. defer lim.Stop()
  913. report := test.CheckRoutines(t)
  914. defer report()
  915. m := &MediaEngine{}
  916. pc, err := NewAPI(WithMediaEngine(m)).NewPeerConnection(Configuration{})
  917. assert.NoError(t, err)
  918. track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
  919. assert.NoError(t, err)
  920. _, err = pc.AddTrack(track)
  921. assert.NoError(t, err)
  922. _, err = pc.CreateOffer(nil)
  923. assert.Equal(t, err, ErrSenderWithNoCodecs)
  924. assert.NoError(t, pc.Close())
  925. }
  926. // Assert that AddTrack is thread-safe
  927. func TestPeerConnection_RaceReplaceTrack(t *testing.T) {
  928. pc, err := NewPeerConnection(Configuration{})
  929. assert.NoError(t, err)
  930. addTrack := func() *TrackLocalStaticSample {
  931. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
  932. assert.NoError(t, err)
  933. _, err = pc.AddTrack(track)
  934. assert.NoError(t, err)
  935. return track
  936. }
  937. for i := 0; i < 10; i++ {
  938. addTrack()
  939. }
  940. for _, tr := range pc.GetTransceivers() {
  941. assert.NoError(t, pc.RemoveTrack(tr.Sender()))
  942. }
  943. var wg sync.WaitGroup
  944. tracks := make([]*TrackLocalStaticSample, 10)
  945. wg.Add(10)
  946. for i := 0; i < 10; i++ {
  947. go func(j int) {
  948. tracks[j] = addTrack()
  949. wg.Done()
  950. }(i)
  951. }
  952. wg.Wait()
  953. for _, track := range tracks {
  954. have := false
  955. for _, t := range pc.GetTransceivers() {
  956. if t.Sender() != nil && t.Sender().Track() == track {
  957. have = true
  958. break
  959. }
  960. }
  961. if !have {
  962. t.Errorf("track was added but not found on senders")
  963. }
  964. }
  965. assert.NoError(t, pc.Close())
  966. }
  967. func TestPeerConnection_Simulcast(t *testing.T) {
  968. lim := test.TimeOut(time.Second * 30)
  969. defer lim.Stop()
  970. report := test.CheckRoutines(t)
  971. defer report()
  972. rids := []string{"a", "b", "c"}
  973. var ridMapLock sync.RWMutex
  974. ridMap := map[string]int{}
  975. // Enable Extension Headers needed for Simulcast
  976. m := &MediaEngine{}
  977. if err := m.RegisterDefaultCodecs(); err != nil {
  978. panic(err)
  979. }
  980. registerSimulcastHeaderExtensions(m, RTPCodecTypeVideo)
  981. assertRidCorrect := func(t *testing.T) {
  982. ridMapLock.Lock()
  983. defer ridMapLock.Unlock()
  984. for _, rid := range rids {
  985. assert.Equal(t, ridMap[rid], 1)
  986. }
  987. assert.Equal(t, len(ridMap), 3)
  988. }
  989. ridsFullfilled := func() bool {
  990. ridMapLock.Lock()
  991. defer ridMapLock.Unlock()
  992. ridCount := len(ridMap)
  993. return ridCount == 3
  994. }
  995. onTrackHandler := func(trackRemote *TrackRemote, _ *RTPReceiver) {
  996. ridMapLock.Lock()
  997. defer ridMapLock.Unlock()
  998. ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
  999. }
  1000. t.Run("RTP Extension Based", func(t *testing.T) {
  1001. pcOffer, pcAnswer, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{})
  1002. assert.NoError(t, err)
  1003. vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("a"))
  1004. assert.NoError(t, err)
  1005. sender, err := pcOffer.AddTrack(vp8WriterA)
  1006. assert.NoError(t, err)
  1007. assert.NotNil(t, sender)
  1008. vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("b"))
  1009. assert.NoError(t, err)
  1010. err = sender.AddEncoding(vp8WriterB)
  1011. assert.NoError(t, err)
  1012. vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("c"))
  1013. assert.NoError(t, err)
  1014. err = sender.AddEncoding(vp8WriterC)
  1015. assert.NoError(t, err)
  1016. ridMap = map[string]int{}
  1017. pcAnswer.OnTrack(onTrackHandler)
  1018. parameters := sender.GetParameters()
  1019. assert.Equal(t, "a", parameters.Encodings[0].RID)
  1020. assert.Equal(t, "b", parameters.Encodings[1].RID)
  1021. assert.Equal(t, "c", parameters.Encodings[2].RID)
  1022. var midID, ridID, rsidID uint8
  1023. for _, extension := range parameters.HeaderExtensions {
  1024. switch extension.URI {
  1025. case sdp.SDESMidURI:
  1026. midID = uint8(extension.ID)
  1027. case sdp.SDESRTPStreamIDURI:
  1028. ridID = uint8(extension.ID)
  1029. case sdesRepairRTPStreamIDURI:
  1030. rsidID = uint8(extension.ID)
  1031. }
  1032. }
  1033. assert.NotZero(t, midID)
  1034. assert.NotZero(t, ridID)
  1035. assert.NotZero(t, rsidID)
  1036. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  1037. // padding only packets should not affect simulcast probe
  1038. var sequenceNumber uint16
  1039. for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
  1040. time.Sleep(20 * time.Millisecond)
  1041. for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
  1042. pkt := &rtp.Packet{
  1043. Header: rtp.Header{
  1044. Version: 2,
  1045. SequenceNumber: sequenceNumber,
  1046. PayloadType: 96,
  1047. Padding: true,
  1048. },
  1049. Payload: []byte{0x00, 0x02},
  1050. }
  1051. assert.NoError(t, track.WriteRTP(pkt))
  1052. }
  1053. }
  1054. assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets")
  1055. for ; !ridsFullfilled(); sequenceNumber++ {
  1056. time.Sleep(20 * time.Millisecond)
  1057. for ssrc, rid := range rids {
  1058. header := &rtp.Header{
  1059. Version: 2,
  1060. SSRC: uint32(ssrc),
  1061. SequenceNumber: sequenceNumber,
  1062. PayloadType: 96,
  1063. }
  1064. assert.NoError(t, header.SetExtension(midID, []byte("0")))
  1065. // Send RSID for first 10 packets
  1066. if sequenceNumber >= 10 {
  1067. assert.NoError(t, header.SetExtension(ridID, []byte(rid)))
  1068. } else {
  1069. assert.NoError(t, header.SetExtension(rsidID, []byte(rid)))
  1070. header.SSRC += 10
  1071. }
  1072. var writer *TrackLocalStaticRTP
  1073. switch rid {
  1074. case "a":
  1075. writer = vp8WriterA
  1076. case "b":
  1077. writer = vp8WriterB
  1078. case "c":
  1079. writer = vp8WriterC
  1080. }
  1081. _, err = writer.bindings[0].writeStream.WriteRTP(header, []byte{0x00})
  1082. assert.NoError(t, err)
  1083. }
  1084. }
  1085. assertRidCorrect(t)
  1086. closePairNow(t, pcOffer, pcAnswer)
  1087. })
  1088. }
  1089. type simulcastTestTrackLocal struct {
  1090. *TrackLocalStaticRTP
  1091. }
  1092. // don't use ssrc&payload in bindings to let the test write different stream packets.
  1093. func (s *simulcastTestTrackLocal) WriteRTP(pkt *rtp.Packet) error {
  1094. packet := getPacketAllocationFromPool()
  1095. defer resetPacketPoolAllocation(packet)
  1096. *packet = *pkt
  1097. s.mu.RLock()
  1098. defer s.mu.RUnlock()
  1099. writeErrs := []error{}
  1100. for _, b := range s.bindings {
  1101. if _, err := b.writeStream.WriteRTP(&packet.Header, packet.Payload); err != nil {
  1102. writeErrs = append(writeErrs, err)
  1103. }
  1104. }
  1105. return util.FlattenErrs(writeErrs)
  1106. }
  1107. func TestPeerConnection_Simulcast_RTX(t *testing.T) {
  1108. lim := test.TimeOut(time.Second * 30)
  1109. defer lim.Stop()
  1110. report := test.CheckRoutines(t)
  1111. defer report()
  1112. rids := []string{"a", "b"}
  1113. pcOffer, pcAnswer, err := newPair()
  1114. assert.NoError(t, err)
  1115. vp8WriterAStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
  1116. assert.NoError(t, err)
  1117. vp8WriterBStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
  1118. assert.NoError(t, err)
  1119. vp8WriterA, vp8WriterB := &simulcastTestTrackLocal{vp8WriterAStatic}, &simulcastTestTrackLocal{vp8WriterBStatic}
  1120. sender, err := pcOffer.AddTrack(vp8WriterA)
  1121. assert.NoError(t, err)
  1122. assert.NotNil(t, sender)
  1123. assert.NoError(t, sender.AddEncoding(vp8WriterB))
  1124. var ridMapLock sync.RWMutex
  1125. ridMap := map[string]int{}
  1126. assertRidCorrect := func(t *testing.T) {
  1127. ridMapLock.Lock()
  1128. defer ridMapLock.Unlock()
  1129. for _, rid := range rids {
  1130. assert.Equal(t, ridMap[rid], 1)
  1131. }
  1132. assert.Equal(t, len(ridMap), 2)
  1133. }
  1134. ridsFullfilled := func() bool {
  1135. ridMapLock.Lock()
  1136. defer ridMapLock.Unlock()
  1137. ridCount := len(ridMap)
  1138. return ridCount == 2
  1139. }
  1140. var rtxPacketRead atomic.Int32
  1141. var wg sync.WaitGroup
  1142. wg.Add(2)
  1143. pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
  1144. ridMapLock.Lock()
  1145. ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
  1146. ridMapLock.Unlock()
  1147. defer wg.Done()
  1148. for {
  1149. _, attr, rerr := trackRemote.ReadRTP()
  1150. if rerr != nil {
  1151. break
  1152. }
  1153. if pt, ok := attr.Get(AttributeRtxPayloadType).(byte); ok {
  1154. if pt == 97 {
  1155. rtxPacketRead.Add(1)
  1156. }
  1157. }
  1158. }
  1159. })
  1160. parameters := sender.GetParameters()
  1161. assert.Equal(t, "a", parameters.Encodings[0].RID)
  1162. assert.Equal(t, "b", parameters.Encodings[1].RID)
  1163. var midID, ridID, rsid uint8
  1164. for _, extension := range parameters.HeaderExtensions {
  1165. switch extension.URI {
  1166. case sdp.SDESMidURI:
  1167. midID = uint8(extension.ID)
  1168. case sdp.SDESRTPStreamIDURI:
  1169. ridID = uint8(extension.ID)
  1170. case sdesRepairRTPStreamIDURI:
  1171. rsid = uint8(extension.ID)
  1172. }
  1173. }
  1174. assert.NotZero(t, midID)
  1175. assert.NotZero(t, ridID)
  1176. assert.NotZero(t, rsid)
  1177. err = signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
  1178. // Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
  1179. re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
  1180. res := re.ReplaceAllString(sdp, "")
  1181. return res
  1182. })
  1183. assert.NoError(t, err)
  1184. // padding only packets should not affect simulcast probe
  1185. var sequenceNumber uint16
  1186. for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
  1187. time.Sleep(20 * time.Millisecond)
  1188. for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
  1189. pkt := &rtp.Packet{
  1190. Header: rtp.Header{
  1191. Version: 2,
  1192. SequenceNumber: sequenceNumber,
  1193. PayloadType: 96,
  1194. Padding: true,
  1195. SSRC: uint32(i),
  1196. },
  1197. Payload: []byte{0x00, 0x02},
  1198. }
  1199. assert.NoError(t, track.WriteRTP(pkt))
  1200. }
  1201. }
  1202. assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets")
  1203. for ; !ridsFullfilled(); sequenceNumber++ {
  1204. time.Sleep(20 * time.Millisecond)
  1205. for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
  1206. pkt := &rtp.Packet{
  1207. Header: rtp.Header{
  1208. Version: 2,
  1209. SequenceNumber: sequenceNumber,
  1210. PayloadType: 96,
  1211. SSRC: uint32(i),
  1212. },
  1213. Payload: []byte{0x00},
  1214. }
  1215. assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
  1216. assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
  1217. assert.NoError(t, track.WriteRTP(pkt))
  1218. }
  1219. }
  1220. assertRidCorrect(t)
  1221. for i := 0; i < simulcastProbeCount+10; i++ {
  1222. sequenceNumber++
  1223. time.Sleep(10 * time.Millisecond)
  1224. for j, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
  1225. pkt := &rtp.Packet{
  1226. Header: rtp.Header{
  1227. Version: 2,
  1228. SequenceNumber: sequenceNumber,
  1229. PayloadType: 97,
  1230. SSRC: uint32(100 + j),
  1231. },
  1232. Payload: []byte{0x00, 0x00, 0x00, 0x00, 0x00},
  1233. }
  1234. assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
  1235. assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
  1236. assert.NoError(t, pkt.Header.SetExtension(rsid, []byte(track.RID())))
  1237. assert.NoError(t, track.WriteRTP(pkt))
  1238. }
  1239. }
  1240. for ; rtxPacketRead.Load() == 0; sequenceNumber++ {
  1241. time.Sleep(20 * time.Millisecond)
  1242. for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
  1243. pkt := &rtp.Packet{
  1244. Header: rtp.Header{
  1245. Version: 2,
  1246. SequenceNumber: sequenceNumber,
  1247. PayloadType: 96,
  1248. SSRC: uint32(i),
  1249. },
  1250. Payload: []byte{0x00},
  1251. }
  1252. assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
  1253. assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
  1254. assert.NoError(t, track.WriteRTP(pkt))
  1255. }
  1256. }
  1257. closePairNow(t, pcOffer, pcAnswer)
  1258. wg.Wait()
  1259. assert.Greater(t, rtxPacketRead.Load(), int32(0), "no rtx packet read")
  1260. }
  1261. // Everytime we receieve a new SSRC we probe it and try to determine the proper way to handle it.
  1262. // In most cases a Track explicitly declares a SSRC and a OnTrack is fired. In two cases we don't
  1263. // know the SSRC ahead of time
  1264. // * Undeclared SSRC in a single media section (https://github.com/pion/webrtc/issues/880)
  1265. // * Simulcast
  1266. //
  1267. // The Undeclared SSRC processing code would run before Simulcast. If a Simulcast Offer/Answer only
  1268. // contained one Media Section we would never fire the OnTrack. We would assume it was a failed
  1269. // Undeclared SSRC processing. This test asserts that we properly handled this.
  1270. func TestPeerConnection_Simulcast_NoDataChannel(t *testing.T) {
  1271. lim := test.TimeOut(time.Second * 30)
  1272. defer lim.Stop()
  1273. report := test.CheckRoutines(t)
  1274. defer report()
  1275. // Enable Extension Headers needed for Simulcast
  1276. m := &MediaEngine{}
  1277. if err := m.RegisterDefaultCodecs(); err != nil {
  1278. panic(err)
  1279. }
  1280. registerSimulcastHeaderExtensions(m, RTPCodecTypeVideo)
  1281. pcSender, pcReceiver, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{})
  1282. assert.NoError(t, err)
  1283. var wg sync.WaitGroup
  1284. wg.Add(4)
  1285. var connectionWg sync.WaitGroup
  1286. connectionWg.Add(2)
  1287. connectionStateChangeHandler := func(state PeerConnectionState) {
  1288. if state == PeerConnectionStateConnected {
  1289. connectionWg.Done()
  1290. }
  1291. }
  1292. pcSender.OnConnectionStateChange(connectionStateChangeHandler)
  1293. pcReceiver.OnConnectionStateChange(connectionStateChangeHandler)
  1294. pcReceiver.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
  1295. defer wg.Done()
  1296. })
  1297. go func() {
  1298. defer wg.Done()
  1299. vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("a"))
  1300. assert.NoError(t, err)
  1301. sender, err := pcSender.AddTrack(vp8WriterA)
  1302. assert.NoError(t, err)
  1303. assert.NotNil(t, sender)
  1304. vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("b"))
  1305. assert.NoError(t, err)
  1306. err = sender.AddEncoding(vp8WriterB)
  1307. assert.NoError(t, err)
  1308. vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("c"))
  1309. assert.NoError(t, err)
  1310. err = sender.AddEncoding(vp8WriterC)
  1311. assert.NoError(t, err)
  1312. parameters := sender.GetParameters()
  1313. var midID, ridID, rsidID uint8
  1314. for _, extension := range parameters.HeaderExtensions {
  1315. switch extension.URI {
  1316. case sdp.SDESMidURI:
  1317. midID = uint8(extension.ID)
  1318. case sdp.SDESRTPStreamIDURI:
  1319. ridID = uint8(extension.ID)
  1320. case sdesRepairRTPStreamIDURI:
  1321. rsidID = uint8(extension.ID)
  1322. }
  1323. }
  1324. assert.NotZero(t, midID)
  1325. assert.NotZero(t, ridID)
  1326. assert.NotZero(t, rsidID)
  1327. // signaling
  1328. offerSDP, err := pcSender.CreateOffer(nil)
  1329. assert.NoError(t, err)
  1330. err = pcSender.SetLocalDescription(offerSDP)
  1331. assert.NoError(t, err)
  1332. err = pcReceiver.SetRemoteDescription(offerSDP)
  1333. assert.NoError(t, err)
  1334. answerSDP, err := pcReceiver.CreateAnswer(nil)
  1335. assert.NoError(t, err)
  1336. answerGatheringComplete := GatheringCompletePromise(pcReceiver)
  1337. err = pcReceiver.SetLocalDescription(answerSDP)
  1338. assert.NoError(t, err)
  1339. <-answerGatheringComplete
  1340. assert.NoError(t, pcSender.SetRemoteDescription(*pcReceiver.LocalDescription()))
  1341. connectionWg.Wait()
  1342. var seqNo uint16
  1343. for i := 0; i < 100; i++ {
  1344. pkt := &rtp.Packet{
  1345. Header: rtp.Header{
  1346. Version: 2,
  1347. SequenceNumber: seqNo,
  1348. PayloadType: 96,
  1349. },
  1350. Payload: []byte{0x00, 0x00},
  1351. }
  1352. assert.NoError(t, pkt.SetExtension(ridID, []byte("a")))
  1353. assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
  1354. assert.NoError(t, vp8WriterA.WriteRTP(pkt))
  1355. assert.NoError(t, pkt.SetExtension(ridID, []byte("b")))
  1356. assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
  1357. assert.NoError(t, vp8WriterB.WriteRTP(pkt))
  1358. assert.NoError(t, pkt.SetExtension(ridID, []byte("c")))
  1359. assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
  1360. assert.NoError(t, vp8WriterC.WriteRTP(pkt))
  1361. seqNo++
  1362. }
  1363. }()
  1364. wg.Wait()
  1365. closePairNow(t, pcSender, pcReceiver)
  1366. }