peerconnection_media_test.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package webrtc
  6. import (
  7. "bufio"
  8. "bytes"
  9. "context"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "strings"
  14. "sync"
  15. "testing"
  16. "time"
  17. "github.com/pion/logging"
  18. "github.com/pion/randutil"
  19. "github.com/pion/rtcp"
  20. "github.com/pion/rtp"
  21. "github.com/pion/sdp/v3"
  22. "github.com/pion/transport/v2/test"
  23. "github.com/pion/webrtc/v3/pkg/media"
  24. "github.com/stretchr/testify/assert"
  25. "github.com/stretchr/testify/require"
  26. )
// Sentinel errors used by the tests in this file. They are wrapped with
// fmt.Errorf("%w: ...") at the call sites to attach the offending values.
var (
	// errIncomingTrackIDInvalid is reported when a remote track's ID differs from the expected one.
	errIncomingTrackIDInvalid = errors.New("incoming Track ID is invalid")
	// errIncomingTrackLabelInvalid is reported when a remote track's StreamID differs from the expected one.
	errIncomingTrackLabelInvalid = errors.New("incoming Track Label is invalid")
	// errNoTransceiverwithMid is returned when no transceiver of a PeerConnection has the requested mid.
	errNoTransceiverwithMid = errors.New("no transceiver with mid")
)
  32. func registerSimulcastHeaderExtensions(m *MediaEngine, codecType RTPCodecType) {
  33. for _, extension := range []string{
  34. sdp.SDESMidURI,
  35. sdp.SDESRTPStreamIDURI,
  36. sdesRepairRTPStreamIDURI,
  37. } {
  38. if err := m.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: extension}, codecType); err != nil {
  39. panic(err)
  40. }
  41. }
  42. }
  43. /*
  44. Integration test for bi-directional peers
  45. This asserts we can send RTP and RTCP both ways, and blocks until
  46. each side gets something (and asserts payload contents)
  47. */
  48. // nolint: gocyclo
  49. func TestPeerConnection_Media_Sample(t *testing.T) {
  50. const (
  51. expectedTrackID = "video"
  52. expectedStreamID = "pion"
  53. )
  54. lim := test.TimeOut(time.Second * 30)
  55. defer lim.Stop()
  56. report := test.CheckRoutines(t)
  57. defer report()
  58. pcOffer, pcAnswer, err := newPair()
  59. if err != nil {
  60. t.Fatal(err)
  61. }
  62. awaitRTPRecv := make(chan bool)
  63. awaitRTPRecvClosed := make(chan bool)
  64. awaitRTPSend := make(chan bool)
  65. awaitRTCPSenderRecv := make(chan bool)
  66. awaitRTCPSenderSend := make(chan error)
  67. awaitRTCPReceiverRecv := make(chan error)
  68. awaitRTCPReceiverSend := make(chan error)
  69. trackMetadataValid := make(chan error)
  70. pcAnswer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
  71. if track.ID() != expectedTrackID {
  72. trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackIDInvalid, expectedTrackID, track.ID())
  73. return
  74. }
  75. if track.StreamID() != expectedStreamID {
  76. trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackLabelInvalid, expectedStreamID, track.StreamID())
  77. return
  78. }
  79. close(trackMetadataValid)
  80. go func() {
  81. for {
  82. time.Sleep(time.Millisecond * 100)
  83. if routineErr := pcAnswer.WriteRTCP([]rtcp.Packet{&rtcp.RapidResynchronizationRequest{SenderSSRC: uint32(track.SSRC()), MediaSSRC: uint32(track.SSRC())}}); routineErr != nil {
  84. awaitRTCPReceiverSend <- routineErr
  85. return
  86. }
  87. select {
  88. case <-awaitRTCPSenderRecv:
  89. close(awaitRTCPReceiverSend)
  90. return
  91. default:
  92. }
  93. }
  94. }()
  95. go func() {
  96. _, _, routineErr := receiver.Read(make([]byte, 1400))
  97. if routineErr != nil {
  98. awaitRTCPReceiverRecv <- routineErr
  99. } else {
  100. close(awaitRTCPReceiverRecv)
  101. }
  102. }()
  103. haveClosedAwaitRTPRecv := false
  104. for {
  105. p, _, routineErr := track.ReadRTP()
  106. if routineErr != nil {
  107. close(awaitRTPRecvClosed)
  108. return
  109. } else if bytes.Equal(p.Payload, []byte{0x10, 0x00}) && !haveClosedAwaitRTPRecv {
  110. haveClosedAwaitRTPRecv = true
  111. close(awaitRTPRecv)
  112. }
  113. }
  114. })
  115. vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, expectedTrackID, expectedStreamID)
  116. if err != nil {
  117. t.Fatal(err)
  118. }
  119. sender, err := pcOffer.AddTrack(vp8Track)
  120. if err != nil {
  121. t.Fatal(err)
  122. }
  123. go func() {
  124. for {
  125. time.Sleep(time.Millisecond * 100)
  126. if pcOffer.ICEConnectionState() != ICEConnectionStateConnected {
  127. continue
  128. }
  129. if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
  130. fmt.Println(routineErr)
  131. }
  132. select {
  133. case <-awaitRTPRecv:
  134. close(awaitRTPSend)
  135. return
  136. default:
  137. }
  138. }
  139. }()
  140. go func() {
  141. for {
  142. time.Sleep(time.Millisecond * 100)
  143. if routineErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC: uint32(sender.trackEncodings[0].ssrc), MediaSSRC: uint32(sender.trackEncodings[0].ssrc)}}); routineErr != nil {
  144. awaitRTCPSenderSend <- routineErr
  145. }
  146. select {
  147. case <-awaitRTCPReceiverRecv:
  148. close(awaitRTCPSenderSend)
  149. return
  150. default:
  151. }
  152. }
  153. }()
  154. go func() {
  155. if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil {
  156. close(awaitRTCPSenderRecv)
  157. }
  158. }()
  159. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  160. err, ok := <-trackMetadataValid
  161. if ok {
  162. t.Fatal(err)
  163. }
  164. <-awaitRTPRecv
  165. <-awaitRTPSend
  166. <-awaitRTCPSenderRecv
  167. if err, ok = <-awaitRTCPSenderSend; ok {
  168. t.Fatal(err)
  169. }
  170. <-awaitRTCPReceiverRecv
  171. if err, ok = <-awaitRTCPReceiverSend; ok {
  172. t.Fatal(err)
  173. }
  174. closePairNow(t, pcOffer, pcAnswer)
  175. <-awaitRTPRecvClosed
  176. }
  177. /*
  178. PeerConnection should be able to be torn down at anytime
  179. This test adds an input track and asserts
  180. * OnTrack doesn't fire since no video packets will arrive
  181. * No goroutine leaks
  182. * No deadlocks on shutdown
  183. */
  184. func TestPeerConnection_Media_Shutdown(t *testing.T) {
  185. iceCompleteAnswer := make(chan struct{})
  186. iceCompleteOffer := make(chan struct{})
  187. lim := test.TimeOut(time.Second * 30)
  188. defer lim.Stop()
  189. report := test.CheckRoutines(t)
  190. defer report()
  191. pcOffer, pcAnswer, err := newPair()
  192. if err != nil {
  193. t.Fatal(err)
  194. }
  195. _, err = pcOffer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  196. if err != nil {
  197. t.Fatal(err)
  198. }
  199. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeAudio, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  200. if err != nil {
  201. t.Fatal(err)
  202. }
  203. opusTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeOpus}, "audio", "pion1")
  204. if err != nil {
  205. t.Fatal(err)
  206. }
  207. vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
  208. if err != nil {
  209. t.Fatal(err)
  210. }
  211. if _, err = pcOffer.AddTrack(opusTrack); err != nil {
  212. t.Fatal(err)
  213. } else if _, err = pcAnswer.AddTrack(vp8Track); err != nil {
  214. t.Fatal(err)
  215. }
  216. var onTrackFiredLock sync.Mutex
  217. onTrackFired := false
  218. pcAnswer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
  219. onTrackFiredLock.Lock()
  220. defer onTrackFiredLock.Unlock()
  221. onTrackFired = true
  222. })
  223. pcAnswer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
  224. if iceState == ICEConnectionStateConnected {
  225. close(iceCompleteAnswer)
  226. }
  227. })
  228. pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
  229. if iceState == ICEConnectionStateConnected {
  230. close(iceCompleteOffer)
  231. }
  232. })
  233. err = signalPair(pcOffer, pcAnswer)
  234. if err != nil {
  235. t.Fatal(err)
  236. }
  237. <-iceCompleteAnswer
  238. <-iceCompleteOffer
  239. // Each PeerConnection should have one sender, one receiver and one transceiver
  240. for _, pc := range []*PeerConnection{pcOffer, pcAnswer} {
  241. senders := pc.GetSenders()
  242. if len(senders) != 1 {
  243. t.Errorf("Each PeerConnection should have one RTPSender, we have %d", len(senders))
  244. }
  245. receivers := pc.GetReceivers()
  246. if len(receivers) != 2 {
  247. t.Errorf("Each PeerConnection should have two RTPReceivers, we have %d", len(receivers))
  248. }
  249. transceivers := pc.GetTransceivers()
  250. if len(transceivers) != 2 {
  251. t.Errorf("Each PeerConnection should have two RTPTransceivers, we have %d", len(transceivers))
  252. }
  253. }
  254. closePairNow(t, pcOffer, pcAnswer)
  255. onTrackFiredLock.Lock()
  256. if onTrackFired {
  257. t.Fatalf("PeerConnection OnTrack fired even though we got no packets")
  258. }
  259. onTrackFiredLock.Unlock()
  260. }
  261. /*
  262. Integration test for behavior around media and disconnected peers
  263. * Sending RTP and RTCP to a disconnected Peer shouldn't return an error
  264. */
  265. func TestPeerConnection_Media_Disconnected(t *testing.T) {
  266. lim := test.TimeOut(time.Second * 30)
  267. defer lim.Stop()
  268. report := test.CheckRoutines(t)
  269. defer report()
  270. s := SettingEngine{}
  271. s.SetICETimeouts(time.Second/2, time.Second/2, time.Second/8)
  272. m := &MediaEngine{}
  273. assert.NoError(t, m.RegisterDefaultCodecs())
  274. pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(s), WithMediaEngine(m)).newPair(Configuration{})
  275. if err != nil {
  276. t.Fatal(err)
  277. }
  278. vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
  279. if err != nil {
  280. t.Fatal(err)
  281. }
  282. vp8Sender, err := pcOffer.AddTrack(vp8Track)
  283. if err != nil {
  284. t.Fatal(err)
  285. }
  286. haveDisconnected := make(chan error)
  287. pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
  288. if iceState == ICEConnectionStateDisconnected {
  289. close(haveDisconnected)
  290. } else if iceState == ICEConnectionStateConnected {
  291. // Assert that DTLS is done by pull remote certificate, don't tear down the PC early
  292. for {
  293. if len(vp8Sender.Transport().GetRemoteCertificate()) != 0 {
  294. if pcAnswer.sctpTransport.association() != nil {
  295. break
  296. }
  297. }
  298. time.Sleep(time.Second)
  299. }
  300. if pcCloseErr := pcAnswer.Close(); pcCloseErr != nil {
  301. haveDisconnected <- pcCloseErr
  302. }
  303. }
  304. })
  305. err = signalPair(pcOffer, pcAnswer)
  306. if err != nil {
  307. t.Fatal(err)
  308. }
  309. err, ok := <-haveDisconnected
  310. if ok {
  311. t.Fatal(err)
  312. }
  313. for i := 0; i <= 5; i++ {
  314. if rtpErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); rtpErr != nil {
  315. t.Fatal(rtpErr)
  316. } else if rtcpErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: 0}}); rtcpErr != nil {
  317. t.Fatal(rtcpErr)
  318. }
  319. }
  320. assert.NoError(t, pcOffer.Close())
  321. }
  322. type undeclaredSsrcLogger struct{ unhandledSimulcastError chan struct{} }
  323. func (u *undeclaredSsrcLogger) Trace(string) {}
  324. func (u *undeclaredSsrcLogger) Tracef(string, ...interface{}) {}
  325. func (u *undeclaredSsrcLogger) Debug(string) {}
  326. func (u *undeclaredSsrcLogger) Debugf(string, ...interface{}) {}
  327. func (u *undeclaredSsrcLogger) Info(string) {}
  328. func (u *undeclaredSsrcLogger) Infof(string, ...interface{}) {}
  329. func (u *undeclaredSsrcLogger) Warn(string) {}
  330. func (u *undeclaredSsrcLogger) Warnf(string, ...interface{}) {}
  331. func (u *undeclaredSsrcLogger) Error(string) {}
  332. func (u *undeclaredSsrcLogger) Errorf(format string, _ ...interface{}) {
  333. if format == incomingUnhandledRTPSsrc {
  334. close(u.unhandledSimulcastError)
  335. }
  336. }
  337. type undeclaredSsrcLoggerFactory struct{ unhandledSimulcastError chan struct{} }
  338. func (u *undeclaredSsrcLoggerFactory) NewLogger(string) logging.LeveledLogger {
  339. return &undeclaredSsrcLogger{u.unhandledSimulcastError}
  340. }
  341. // Filter SSRC lines
  342. func filterSsrc(offer string) (filteredSDP string) {
  343. scanner := bufio.NewScanner(strings.NewReader(offer))
  344. for scanner.Scan() {
  345. l := scanner.Text()
  346. if strings.HasPrefix(l, "a=ssrc") {
  347. continue
  348. }
  349. filteredSDP += l + "\n"
  350. }
  351. return
  352. }
// If a SessionDescription has a single media section and no SSRC
// assume that it is meant to handle all RTP packets
//
// Both subtests strip the "a=ssrc" lines from the offer with filterSsrc
// before handing it to the answerer, then send video until a completion
// channel is closed.
func TestUndeclaredSSRC(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	// With no SSRC declared, OnTrack is still expected to fire and to carry
	// the sender's stream and track IDs.
	t.Run("No SSRC", func(t *testing.T) {
		pcOffer, pcAnswer, err := newPair()
		assert.NoError(t, err)

		vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
		assert.NoError(t, err)

		_, err = pcOffer.AddTrack(vp8Writer)
		assert.NoError(t, err)

		onTrackFired := make(chan struct{})
		pcAnswer.OnTrack(func(trackRemote *TrackRemote, r *RTPReceiver) {
			assert.Equal(t, trackRemote.StreamID(), vp8Writer.StreamID())
			assert.Equal(t, trackRemote.ID(), vp8Writer.ID())
			close(onTrackFired)
		})

		offer, err := pcOffer.CreateOffer(nil)
		assert.NoError(t, err)

		// Wait for ICE gathering so the local description is complete before
		// it is filtered and forwarded.
		offerGatheringComplete := GatheringCompletePromise(pcOffer)
		assert.NoError(t, pcOffer.SetLocalDescription(offer))
		<-offerGatheringComplete

		offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP)
		assert.NoError(t, pcAnswer.SetRemoteDescription(offer))

		answer, err := pcAnswer.CreateAnswer(nil)
		assert.NoError(t, err)

		answerGatheringComplete := GatheringCompletePromise(pcAnswer)
		assert.NoError(t, pcAnswer.SetLocalDescription(answer))
		<-answerGatheringComplete

		assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))

		// presumably keeps writing samples until onTrackFired is closed — see sendVideoUntilDone
		sendVideoUntilDone(onTrackFired, t, []*TrackLocalStaticSample{vp8Writer})
		closePairNow(t, pcOffer, pcAnswer)
	})

	// Here completion is signalled by the custom logger: the channel is
	// closed when Errorf is called with the incomingUnhandledRTPSsrc format.
	t.Run("Has RID", func(t *testing.T) {
		unhandledSimulcastError := make(chan struct{})

		m := &MediaEngine{}
		assert.NoError(t, m.RegisterDefaultCodecs())

		pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
			LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
		}), WithMediaEngine(m)).newPair(Configuration{})
		assert.NoError(t, err)

		vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
		assert.NoError(t, err)

		_, err = pcOffer.AddTrack(vp8Writer)
		assert.NoError(t, err)

		offer, err := pcOffer.CreateOffer(nil)
		assert.NoError(t, err)

		offerGatheringComplete := GatheringCompletePromise(pcOffer)
		assert.NoError(t, pcOffer.SetLocalDescription(offer))
		<-offerGatheringComplete

		// Append RID to end of SessionDescription. Will not be considered unhandled anymore
		offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP) + "a=" + sdpAttributeRid + "\r\n"
		assert.NoError(t, pcAnswer.SetRemoteDescription(offer))

		answer, err := pcAnswer.CreateAnswer(nil)
		assert.NoError(t, err)

		answerGatheringComplete := GatheringCompletePromise(pcAnswer)
		assert.NoError(t, pcAnswer.SetLocalDescription(answer))
		<-answerGatheringComplete

		assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))

		sendVideoUntilDone(unhandledSimulcastError, t, []*TrackLocalStaticSample{vp8Writer})
		closePairNow(t, pcOffer, pcAnswer)
	})
}
  419. func TestAddTransceiverFromTrackSendOnly(t *testing.T) {
  420. lim := test.TimeOut(time.Second * 30)
  421. defer lim.Stop()
  422. report := test.CheckRoutines(t)
  423. defer report()
  424. pc, err := NewPeerConnection(Configuration{})
  425. if err != nil {
  426. t.Error(err.Error())
  427. }
  428. track, err := NewTrackLocalStaticSample(
  429. RTPCodecCapability{MimeType: "audio/Opus"},
  430. "track-id",
  431. "stream-id",
  432. )
  433. if err != nil {
  434. t.Error(err.Error())
  435. }
  436. transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
  437. Direction: RTPTransceiverDirectionSendonly,
  438. })
  439. if err != nil {
  440. t.Error(err.Error())
  441. }
  442. if transceiver.Receiver() != nil {
  443. t.Errorf("Transceiver shouldn't have a receiver")
  444. }
  445. if transceiver.Sender() == nil {
  446. t.Errorf("Transceiver should have a sender")
  447. }
  448. if len(pc.GetTransceivers()) != 1 {
  449. t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
  450. }
  451. if len(pc.GetSenders()) != 1 {
  452. t.Errorf("PeerConnection should have one sender but has %d", len(pc.GetSenders()))
  453. }
  454. offer, err := pc.CreateOffer(nil)
  455. if err != nil {
  456. t.Error(err.Error())
  457. }
  458. if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendonly) {
  459. t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendonly)
  460. }
  461. assert.NoError(t, pc.Close())
  462. }
  463. func TestAddTransceiverFromTrackSendRecv(t *testing.T) {
  464. lim := test.TimeOut(time.Second * 30)
  465. defer lim.Stop()
  466. report := test.CheckRoutines(t)
  467. defer report()
  468. pc, err := NewPeerConnection(Configuration{})
  469. if err != nil {
  470. t.Error(err.Error())
  471. }
  472. track, err := NewTrackLocalStaticSample(
  473. RTPCodecCapability{MimeType: "audio/Opus"},
  474. "track-id",
  475. "stream-id",
  476. )
  477. if err != nil {
  478. t.Error(err.Error())
  479. }
  480. transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
  481. Direction: RTPTransceiverDirectionSendrecv,
  482. })
  483. if err != nil {
  484. t.Error(err.Error())
  485. }
  486. if transceiver.Receiver() == nil {
  487. t.Errorf("Transceiver should have a receiver")
  488. }
  489. if transceiver.Sender() == nil {
  490. t.Errorf("Transceiver should have a sender")
  491. }
  492. if len(pc.GetTransceivers()) != 1 {
  493. t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
  494. }
  495. offer, err := pc.CreateOffer(nil)
  496. if err != nil {
  497. t.Error(err.Error())
  498. }
  499. if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendrecv) {
  500. t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendrecv)
  501. }
  502. assert.NoError(t, pc.Close())
  503. }
  504. func TestAddTransceiverAddTrack_Reuse(t *testing.T) {
  505. pc, err := NewPeerConnection(Configuration{})
  506. assert.NoError(t, err)
  507. tr, err := pc.AddTransceiverFromKind(
  508. RTPCodecTypeVideo,
  509. RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly},
  510. )
  511. assert.NoError(t, err)
  512. assert.Equal(t, []*RTPTransceiver{tr}, pc.GetTransceivers())
  513. addTrack := func() (TrackLocal, *RTPSender) {
  514. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
  515. assert.NoError(t, err)
  516. sender, err := pc.AddTrack(track)
  517. assert.NoError(t, err)
  518. return track, sender
  519. }
  520. track1, sender1 := addTrack()
  521. assert.Equal(t, 1, len(pc.GetTransceivers()))
  522. assert.Equal(t, sender1, tr.Sender())
  523. assert.Equal(t, track1, tr.Sender().Track())
  524. require.NoError(t, pc.RemoveTrack(sender1))
  525. track2, _ := addTrack()
  526. assert.Equal(t, 1, len(pc.GetTransceivers()))
  527. assert.Equal(t, track2, tr.Sender().Track())
  528. addTrack()
  529. assert.Equal(t, 2, len(pc.GetTransceivers()))
  530. assert.NoError(t, pc.Close())
  531. }
  532. func TestAddTransceiverAddTrack_NewRTPSender_Error(t *testing.T) {
  533. pc, err := NewPeerConnection(Configuration{})
  534. assert.NoError(t, err)
  535. _, err = pc.AddTransceiverFromKind(
  536. RTPCodecTypeVideo,
  537. RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly},
  538. )
  539. assert.NoError(t, err)
  540. dtlsTransport := pc.dtlsTransport
  541. pc.dtlsTransport = nil
  542. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
  543. assert.NoError(t, err)
  544. _, err = pc.AddTrack(track)
  545. assert.Error(t, err, "DTLSTransport must not be nil")
  546. assert.Equal(t, 1, len(pc.GetTransceivers()))
  547. pc.dtlsTransport = dtlsTransport
  548. assert.NoError(t, pc.Close())
  549. }
  550. func TestRtpSenderReceiver_ReadClose_Error(t *testing.T) {
  551. pc, err := NewPeerConnection(Configuration{})
  552. assert.NoError(t, err)
  553. tr, err := pc.AddTransceiverFromKind(
  554. RTPCodecTypeVideo,
  555. RTPTransceiverInit{Direction: RTPTransceiverDirectionSendrecv},
  556. )
  557. assert.NoError(t, err)
  558. sender, receiver := tr.Sender(), tr.Receiver()
  559. assert.NoError(t, sender.Stop())
  560. _, _, err = sender.Read(make([]byte, 0, 1400))
  561. assert.ErrorIs(t, err, io.ErrClosedPipe)
  562. assert.NoError(t, receiver.Stop())
  563. _, _, err = receiver.Read(make([]byte, 0, 1400))
  564. assert.ErrorIs(t, err, io.ErrClosedPipe)
  565. assert.NoError(t, pc.Close())
  566. }
  567. // nolint: dupl
  568. func TestAddTransceiverFromKind(t *testing.T) {
  569. lim := test.TimeOut(time.Second * 30)
  570. defer lim.Stop()
  571. report := test.CheckRoutines(t)
  572. defer report()
  573. pc, err := NewPeerConnection(Configuration{})
  574. if err != nil {
  575. t.Error(err.Error())
  576. }
  577. transceiver, err := pc.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{
  578. Direction: RTPTransceiverDirectionRecvonly,
  579. })
  580. if err != nil {
  581. t.Error(err.Error())
  582. }
  583. if transceiver.Receiver() == nil {
  584. t.Errorf("Transceiver should have a receiver")
  585. }
  586. if transceiver.Sender() != nil {
  587. t.Errorf("Transceiver shouldn't have a sender")
  588. }
  589. offer, err := pc.CreateOffer(nil)
  590. if err != nil {
  591. t.Error(err.Error())
  592. }
  593. if !offerMediaHasDirection(offer, RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly) {
  594. t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionRecvonly)
  595. }
  596. assert.NoError(t, pc.Close())
  597. }
  598. func TestAddTransceiverFromTrackFailsRecvOnly(t *testing.T) {
  599. lim := test.TimeOut(time.Second * 30)
  600. defer lim.Stop()
  601. report := test.CheckRoutines(t)
  602. defer report()
  603. pc, err := NewPeerConnection(Configuration{})
  604. if err != nil {
  605. t.Error(err.Error())
  606. }
  607. track, err := NewTrackLocalStaticSample(
  608. RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f"},
  609. "track-id",
  610. "track-label",
  611. )
  612. if err != nil {
  613. t.Error(err.Error())
  614. }
  615. transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
  616. Direction: RTPTransceiverDirectionRecvonly,
  617. })
  618. if transceiver != nil {
  619. t.Error("AddTransceiverFromTrack shouldn't succeed with Direction RTPTransceiverDirectionRecvonly")
  620. }
  621. assert.NotNil(t, err)
  622. assert.NoError(t, pc.Close())
  623. }
  624. func TestPlanBMediaExchange(t *testing.T) {
  625. runTest := func(trackCount int, t *testing.T) {
  626. addSingleTrack := func(p *PeerConnection) *TrackLocalStaticSample {
  627. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32()), fmt.Sprintf("video-%d", randutil.NewMathRandomGenerator().Uint32()))
  628. assert.NoError(t, err)
  629. _, err = p.AddTrack(track)
  630. assert.NoError(t, err)
  631. return track
  632. }
  633. pcOffer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
  634. assert.NoError(t, err)
  635. pcAnswer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
  636. assert.NoError(t, err)
  637. var onTrackWaitGroup sync.WaitGroup
  638. onTrackWaitGroup.Add(trackCount)
  639. pcAnswer.OnTrack(func(track *TrackRemote, r *RTPReceiver) {
  640. onTrackWaitGroup.Done()
  641. })
  642. done := make(chan struct{})
  643. go func() {
  644. onTrackWaitGroup.Wait()
  645. close(done)
  646. }()
  647. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo)
  648. assert.NoError(t, err)
  649. outboundTracks := []*TrackLocalStaticSample{}
  650. for i := 0; i < trackCount; i++ {
  651. outboundTracks = append(outboundTracks, addSingleTrack(pcOffer))
  652. }
  653. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  654. func() {
  655. for {
  656. select {
  657. case <-time.After(20 * time.Millisecond):
  658. for _, track := range outboundTracks {
  659. assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}))
  660. }
  661. case <-done:
  662. return
  663. }
  664. }
  665. }()
  666. closePairNow(t, pcOffer, pcAnswer)
  667. }
  668. lim := test.TimeOut(time.Second * 30)
  669. defer lim.Stop()
  670. report := test.CheckRoutines(t)
  671. defer report()
  672. t.Run("Single Track", func(t *testing.T) {
  673. runTest(1, t)
  674. })
  675. t.Run("Multi Track", func(t *testing.T) {
  676. runTest(2, t)
  677. })
  678. }
// TestPeerConnection_Start_Only_Negotiated_Senders tests that only
// the current negotiated transceivers senders provided in an
// offer/answer are started
//
// A first track is added before the offer is created; a second one is added
// after the answer was generated but before it is applied to the offerer, so
// only the first sender took part in the negotiation.
func TestPeerConnection_Start_Only_Negotiated_Senders(t *testing.T) {
	lim := test.TimeOut(time.Second * 30)
	defer lim.Stop()

	report := test.CheckRoutines(t)
	defer report()

	pcOffer, err := NewPeerConnection(Configuration{})
	assert.NoError(t, err)
	defer func() { assert.NoError(t, pcOffer.Close()) }()

	pcAnswer, err := NewPeerConnection(Configuration{})
	assert.NoError(t, err)
	defer func() { assert.NoError(t, pcAnswer.Close()) }()

	// The negotiated track.
	track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
	require.NoError(t, err)

	sender1, err := pcOffer.AddTrack(track1)
	require.NoError(t, err)

	offer, err := pcOffer.CreateOffer(nil)
	assert.NoError(t, err)

	// Wait for ICE gathering to finish before forwarding the local description.
	offerGatheringComplete := GatheringCompletePromise(pcOffer)
	assert.NoError(t, pcOffer.SetLocalDescription(offer))
	<-offerGatheringComplete
	assert.NoError(t, pcAnswer.SetRemoteDescription(*pcOffer.LocalDescription()))
	answer, err := pcAnswer.CreateAnswer(nil)
	assert.NoError(t, err)
	answerGatheringComplete := GatheringCompletePromise(pcAnswer)
	assert.NoError(t, pcAnswer.SetLocalDescription(answer))
	<-answerGatheringComplete

	// Add a new track between providing the offer and applying the answer

	track2, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
	require.NoError(t, err)

	sender2, err := pcOffer.AddTrack(track2)
	require.NoError(t, err)

	// apply answer so we'll test generateMatchedSDP
	assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))

	// Wait for senders to be started by startTransports spawned goroutine
	pcOffer.ops.Done()

	// sender1 should be started but sender2 should not be started
	assert.True(t, sender1.hasSent(), "sender1 is not started but should be started")
	assert.False(t, sender2.hasSent(), "sender2 is started but should not be started")
}
  721. // TestPeerConnection_Start_Right_Receiver tests that the right
  722. // receiver (the receiver which transceiver has the same media section as the track)
  723. // is started for the specified track
  724. func TestPeerConnection_Start_Right_Receiver(t *testing.T) {
  725. isTransceiverReceiverStarted := func(pc *PeerConnection, mid string) (bool, error) {
  726. for _, transceiver := range pc.GetTransceivers() {
  727. if transceiver.Mid() != mid {
  728. continue
  729. }
  730. return transceiver.Receiver() != nil && transceiver.Receiver().haveReceived(), nil
  731. }
  732. return false, fmt.Errorf("%w: %q", errNoTransceiverwithMid, mid)
  733. }
  734. lim := test.TimeOut(time.Second * 30)
  735. defer lim.Stop()
  736. report := test.CheckRoutines(t)
  737. defer report()
  738. pcOffer, pcAnswer, err := newPair()
  739. require.NoError(t, err)
  740. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  741. assert.NoError(t, err)
  742. track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
  743. require.NoError(t, err)
  744. sender1, err := pcOffer.AddTrack(track1)
  745. require.NoError(t, err)
  746. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  747. pcOffer.ops.Done()
  748. pcAnswer.ops.Done()
  749. // transceiver with mid 0 should be started
  750. started, err := isTransceiverReceiverStarted(pcAnswer, "0")
  751. assert.NoError(t, err)
  752. assert.True(t, started, "transceiver with mid 0 should be started")
  753. // Remove track
  754. assert.NoError(t, pcOffer.RemoveTrack(sender1))
  755. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  756. pcOffer.ops.Done()
  757. pcAnswer.ops.Done()
  758. // transceiver with mid 0 should not be started
  759. started, err = isTransceiverReceiverStarted(pcAnswer, "0")
  760. assert.NoError(t, err)
  761. assert.False(t, started, "transceiver with mid 0 should not be started")
  762. // Add a new transceiver (we're not using AddTrack since it'll reuse the transceiver with mid 0)
  763. _, err = pcOffer.AddTransceiverFromTrack(track1)
  764. assert.NoError(t, err)
  765. _, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
  766. assert.NoError(t, err)
  767. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  768. pcOffer.ops.Done()
  769. pcAnswer.ops.Done()
  770. // transceiver with mid 0 should not be started
  771. started, err = isTransceiverReceiverStarted(pcAnswer, "0")
  772. assert.NoError(t, err)
  773. assert.False(t, started, "transceiver with mid 0 should not be started")
  774. // transceiver with mid 2 should be started
  775. started, err = isTransceiverReceiverStarted(pcAnswer, "2")
  776. assert.NoError(t, err)
  777. assert.True(t, started, "transceiver with mid 2 should be started")
  778. closePairNow(t, pcOffer, pcAnswer)
  779. }
  780. func TestPeerConnection_Simulcast_Probe(t *testing.T) {
  781. lim := test.TimeOut(time.Second * 30) //nolint
  782. defer lim.Stop()
  783. report := test.CheckRoutines(t)
  784. defer report()
  785. // Assert that failed Simulcast probing doesn't cause
  786. // the handleUndeclaredSSRC to be leaked
  787. t.Run("Leak", func(t *testing.T) {
  788. track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
  789. assert.NoError(t, err)
  790. offerer, answerer, err := newPair()
  791. assert.NoError(t, err)
  792. _, err = offerer.AddTrack(track)
  793. assert.NoError(t, err)
  794. ticker := time.NewTicker(time.Millisecond * 20)
  795. testFinished := make(chan struct{})
  796. seenFiveStreams, seenFiveStreamsCancel := context.WithCancel(context.Background())
  797. go func() {
  798. for {
  799. select {
  800. case <-testFinished:
  801. return
  802. case <-ticker.C:
  803. answerer.dtlsTransport.lock.Lock()
  804. if len(answerer.dtlsTransport.simulcastStreams) >= 5 {
  805. seenFiveStreamsCancel()
  806. }
  807. answerer.dtlsTransport.lock.Unlock()
  808. track.mu.Lock()
  809. if len(track.bindings) == 1 {
  810. _, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
  811. Version: 2,
  812. SSRC: randutil.NewMathRandomGenerator().Uint32(),
  813. }, []byte{0, 1, 2, 3, 4, 5})
  814. assert.NoError(t, err)
  815. }
  816. track.mu.Unlock()
  817. }
  818. }
  819. }()
  820. assert.NoError(t, signalPair(offerer, answerer))
  821. peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
  822. peerConnectionConnected.Wait()
  823. <-seenFiveStreams.Done()
  824. closePairNow(t, offerer, answerer)
  825. close(testFinished)
  826. })
  827. // Assert that NonSimulcast Traffic isn't incorrectly broken by the probe
  828. t.Run("Break NonSimulcast", func(t *testing.T) {
  829. unhandledSimulcastError := make(chan struct{})
  830. m := &MediaEngine{}
  831. if err := m.RegisterDefaultCodecs(); err != nil {
  832. panic(err)
  833. }
  834. registerSimulcastHeaderExtensions(m, RTPCodecTypeVideo)
  835. pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
  836. LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
  837. }), WithMediaEngine(m)).newPair(Configuration{})
  838. assert.NoError(t, err)
  839. firstTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "firstTrack", "firstTrack")
  840. assert.NoError(t, err)
  841. _, err = pcOffer.AddTrack(firstTrack)
  842. assert.NoError(t, err)
  843. secondTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "secondTrack", "secondTrack")
  844. assert.NoError(t, err)
  845. _, err = pcOffer.AddTrack(secondTrack)
  846. assert.NoError(t, err)
  847. assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) (filtered string) {
  848. shouldDiscard := false
  849. scanner := bufio.NewScanner(strings.NewReader(sessionDescription))
  850. for scanner.Scan() {
  851. if strings.HasPrefix(scanner.Text(), "m=video") {
  852. shouldDiscard = !shouldDiscard
  853. }
  854. if !shouldDiscard {
  855. filtered += scanner.Text() + "\r\n"
  856. }
  857. }
  858. return
  859. }))
  860. sequenceNumber := uint16(0)
  861. sendRTPPacket := func() {
  862. sequenceNumber++
  863. assert.NoError(t, firstTrack.WriteRTP(&rtp.Packet{
  864. Header: rtp.Header{
  865. Version: 2,
  866. SequenceNumber: sequenceNumber,
  867. },
  868. Payload: []byte{0x00},
  869. }))
  870. time.Sleep(20 * time.Millisecond)
  871. }
  872. for ; sequenceNumber <= 5; sequenceNumber++ {
  873. sendRTPPacket()
  874. }
  875. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  876. trackRemoteChan := make(chan *TrackRemote, 1)
  877. pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
  878. trackRemoteChan <- trackRemote
  879. })
  880. trackRemote := func() *TrackRemote {
  881. for {
  882. select {
  883. case t := <-trackRemoteChan:
  884. return t
  885. default:
  886. sendRTPPacket()
  887. }
  888. }
  889. }()
  890. func() {
  891. for {
  892. select {
  893. case <-unhandledSimulcastError:
  894. return
  895. default:
  896. sendRTPPacket()
  897. }
  898. }
  899. }()
  900. _, _, err = trackRemote.Read(make([]byte, 1500))
  901. assert.NoError(t, err)
  902. closePairNow(t, pcOffer, pcAnswer)
  903. })
  904. }
  905. // Assert that CreateOffer returns an error for a RTPSender with no codecs
  906. // pion/webrtc#1702
  907. func TestPeerConnection_CreateOffer_NoCodecs(t *testing.T) {
  908. lim := test.TimeOut(time.Second * 30)
  909. defer lim.Stop()
  910. report := test.CheckRoutines(t)
  911. defer report()
  912. m := &MediaEngine{}
  913. pc, err := NewAPI(WithMediaEngine(m)).NewPeerConnection(Configuration{})
  914. assert.NoError(t, err)
  915. track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
  916. assert.NoError(t, err)
  917. _, err = pc.AddTrack(track)
  918. assert.NoError(t, err)
  919. _, err = pc.CreateOffer(nil)
  920. assert.Equal(t, err, ErrSenderWithNoCodecs)
  921. assert.NoError(t, pc.Close())
  922. }
  923. // Assert that AddTrack is thread-safe
  924. func TestPeerConnection_RaceReplaceTrack(t *testing.T) {
  925. pc, err := NewPeerConnection(Configuration{})
  926. assert.NoError(t, err)
  927. addTrack := func() *TrackLocalStaticSample {
  928. track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
  929. assert.NoError(t, err)
  930. _, err = pc.AddTrack(track)
  931. assert.NoError(t, err)
  932. return track
  933. }
  934. for i := 0; i < 10; i++ {
  935. addTrack()
  936. }
  937. for _, tr := range pc.GetTransceivers() {
  938. assert.NoError(t, pc.RemoveTrack(tr.Sender()))
  939. }
  940. var wg sync.WaitGroup
  941. tracks := make([]*TrackLocalStaticSample, 10)
  942. wg.Add(10)
  943. for i := 0; i < 10; i++ {
  944. go func(j int) {
  945. tracks[j] = addTrack()
  946. wg.Done()
  947. }(i)
  948. }
  949. wg.Wait()
  950. for _, track := range tracks {
  951. have := false
  952. for _, t := range pc.GetTransceivers() {
  953. if t.Sender() != nil && t.Sender().Track() == track {
  954. have = true
  955. break
  956. }
  957. }
  958. if !have {
  959. t.Errorf("track was added but not found on senders")
  960. }
  961. }
  962. assert.NoError(t, pc.Close())
  963. }
  964. func TestPeerConnection_Simulcast(t *testing.T) {
  965. lim := test.TimeOut(time.Second * 30)
  966. defer lim.Stop()
  967. report := test.CheckRoutines(t)
  968. defer report()
  969. rids := []string{"a", "b", "c"}
  970. var ridMapLock sync.RWMutex
  971. ridMap := map[string]int{}
  972. // Enable Extension Headers needed for Simulcast
  973. m := &MediaEngine{}
  974. if err := m.RegisterDefaultCodecs(); err != nil {
  975. panic(err)
  976. }
  977. registerSimulcastHeaderExtensions(m, RTPCodecTypeVideo)
  978. assertRidCorrect := func(t *testing.T) {
  979. ridMapLock.Lock()
  980. defer ridMapLock.Unlock()
  981. for _, rid := range rids {
  982. assert.Equal(t, ridMap[rid], 1)
  983. }
  984. assert.Equal(t, len(ridMap), 3)
  985. }
  986. ridsFullfilled := func() bool {
  987. ridMapLock.Lock()
  988. defer ridMapLock.Unlock()
  989. ridCount := len(ridMap)
  990. return ridCount == 3
  991. }
  992. onTrackHandler := func(trackRemote *TrackRemote, _ *RTPReceiver) {
  993. ridMapLock.Lock()
  994. defer ridMapLock.Unlock()
  995. ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
  996. }
  997. t.Run("RTP Extension Based", func(t *testing.T) {
  998. pcOffer, pcAnswer, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{})
  999. assert.NoError(t, err)
  1000. vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("a"))
  1001. assert.NoError(t, err)
  1002. sender, err := pcOffer.AddTrack(vp8WriterA)
  1003. assert.NoError(t, err)
  1004. assert.NotNil(t, sender)
  1005. vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("b"))
  1006. assert.NoError(t, err)
  1007. err = sender.AddEncoding(vp8WriterB)
  1008. assert.NoError(t, err)
  1009. vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID("c"))
  1010. assert.NoError(t, err)
  1011. err = sender.AddEncoding(vp8WriterC)
  1012. assert.NoError(t, err)
  1013. ridMap = map[string]int{}
  1014. pcAnswer.OnTrack(onTrackHandler)
  1015. parameters := sender.GetParameters()
  1016. assert.Equal(t, "a", parameters.Encodings[0].RID)
  1017. assert.Equal(t, "b", parameters.Encodings[1].RID)
  1018. assert.Equal(t, "c", parameters.Encodings[2].RID)
  1019. var midID, ridID, rsidID uint8
  1020. for _, extension := range parameters.HeaderExtensions {
  1021. switch extension.URI {
  1022. case sdp.SDESMidURI:
  1023. midID = uint8(extension.ID)
  1024. case sdp.SDESRTPStreamIDURI:
  1025. ridID = uint8(extension.ID)
  1026. case sdesRepairRTPStreamIDURI:
  1027. rsidID = uint8(extension.ID)
  1028. }
  1029. }
  1030. assert.NotZero(t, midID)
  1031. assert.NotZero(t, ridID)
  1032. assert.NotZero(t, rsidID)
  1033. assert.NoError(t, signalPair(pcOffer, pcAnswer))
  1034. for sequenceNumber := uint16(0); !ridsFullfilled(); sequenceNumber++ {
  1035. time.Sleep(20 * time.Millisecond)
  1036. for ssrc, rid := range rids {
  1037. header := &rtp.Header{
  1038. Version: 2,
  1039. SSRC: uint32(ssrc),
  1040. SequenceNumber: sequenceNumber,
  1041. PayloadType: 96,
  1042. }
  1043. assert.NoError(t, header.SetExtension(midID, []byte("0")))
  1044. // Send RSID for first 10 packets
  1045. if sequenceNumber >= 10 {
  1046. assert.NoError(t, header.SetExtension(ridID, []byte(rid)))
  1047. } else {
  1048. assert.NoError(t, header.SetExtension(rsidID, []byte(rid)))
  1049. header.SSRC += 10
  1050. }
  1051. var writer *TrackLocalStaticRTP
  1052. switch rid {
  1053. case "a":
  1054. writer = vp8WriterA
  1055. case "b":
  1056. writer = vp8WriterB
  1057. case "c":
  1058. writer = vp8WriterC
  1059. }
  1060. _, err = writer.bindings[0].writeStream.WriteRTP(header, []byte{0x00})
  1061. assert.NoError(t, err)
  1062. }
  1063. }
  1064. assertRidCorrect(t)
  1065. closePairNow(t, pcOffer, pcAnswer)
  1066. })
  1067. }
  1068. // Everytime we receieve a new SSRC we probe it and try to determine the proper way to handle it.
  1069. // In most cases a Track explicitly declares a SSRC and a OnTrack is fired. In two cases we don't
  1070. // know the SSRC ahead of time
  1071. // * Undeclared SSRC in a single media section (https://github.com/pion/webrtc/issues/880)
  1072. // * Simulcast
  1073. //
  1074. // The Undeclared SSRC processing code would run before Simulcast. If a Simulcast Offer/Answer only
  1075. // contained one Media Section we would never fire the OnTrack. We would assume it was a failed
  1076. // Undeclared SSRC processing. This test asserts that we properly handled this.
  1077. func TestPeerConnection_Simulcast_NoDataChannel(t *testing.T) {
  1078. lim := test.TimeOut(time.Second * 30)
  1079. defer lim.Stop()
  1080. report := test.CheckRoutines(t)
  1081. defer report()
  1082. // Enable Extension Headers needed for Simulcast
  1083. m := &MediaEngine{}
  1084. if err := m.RegisterDefaultCodecs(); err != nil {
  1085. panic(err)
  1086. }
  1087. registerSimulcastHeaderExtensions(m, RTPCodecTypeVideo)
  1088. pcSender, pcReceiver, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{})
  1089. assert.NoError(t, err)
  1090. var wg sync.WaitGroup
  1091. wg.Add(4)
  1092. var connectionWg sync.WaitGroup
  1093. connectionWg.Add(2)
  1094. connectionStateChangeHandler := func(state PeerConnectionState) {
  1095. if state == PeerConnectionStateConnected {
  1096. connectionWg.Done()
  1097. }
  1098. }
  1099. pcSender.OnConnectionStateChange(connectionStateChangeHandler)
  1100. pcReceiver.OnConnectionStateChange(connectionStateChangeHandler)
  1101. pcReceiver.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
  1102. defer wg.Done()
  1103. })
  1104. go func() {
  1105. defer wg.Done()
  1106. vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("a"))
  1107. assert.NoError(t, err)
  1108. sender, err := pcSender.AddTrack(vp8WriterA)
  1109. assert.NoError(t, err)
  1110. assert.NotNil(t, sender)
  1111. vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("b"))
  1112. assert.NoError(t, err)
  1113. err = sender.AddEncoding(vp8WriterB)
  1114. assert.NoError(t, err)
  1115. vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("c"))
  1116. assert.NoError(t, err)
  1117. err = sender.AddEncoding(vp8WriterC)
  1118. assert.NoError(t, err)
  1119. parameters := sender.GetParameters()
  1120. var midID, ridID, rsidID uint8
  1121. for _, extension := range parameters.HeaderExtensions {
  1122. switch extension.URI {
  1123. case sdp.SDESMidURI:
  1124. midID = uint8(extension.ID)
  1125. case sdp.SDESRTPStreamIDURI:
  1126. ridID = uint8(extension.ID)
  1127. case sdesRepairRTPStreamIDURI:
  1128. rsidID = uint8(extension.ID)
  1129. }
  1130. }
  1131. assert.NotZero(t, midID)
  1132. assert.NotZero(t, ridID)
  1133. assert.NotZero(t, rsidID)
  1134. // signaling
  1135. offerSDP, err := pcSender.CreateOffer(nil)
  1136. assert.NoError(t, err)
  1137. err = pcSender.SetLocalDescription(offerSDP)
  1138. assert.NoError(t, err)
  1139. err = pcReceiver.SetRemoteDescription(offerSDP)
  1140. assert.NoError(t, err)
  1141. answerSDP, err := pcReceiver.CreateAnswer(nil)
  1142. assert.NoError(t, err)
  1143. answerGatheringComplete := GatheringCompletePromise(pcReceiver)
  1144. err = pcReceiver.SetLocalDescription(answerSDP)
  1145. assert.NoError(t, err)
  1146. <-answerGatheringComplete
  1147. assert.NoError(t, pcSender.SetRemoteDescription(*pcReceiver.LocalDescription()))
  1148. connectionWg.Wait()
  1149. var seqNo uint16
  1150. for i := 0; i < 100; i++ {
  1151. pkt := &rtp.Packet{
  1152. Header: rtp.Header{
  1153. Version: 2,
  1154. SequenceNumber: seqNo,
  1155. PayloadType: 96,
  1156. },
  1157. Payload: []byte{0x00, 0x00},
  1158. }
  1159. assert.NoError(t, pkt.SetExtension(ridID, []byte("a")))
  1160. assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
  1161. assert.NoError(t, vp8WriterA.WriteRTP(pkt))
  1162. assert.NoError(t, pkt.SetExtension(ridID, []byte("b")))
  1163. assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
  1164. assert.NoError(t, vp8WriterB.WriteRTP(pkt))
  1165. assert.NoError(t, pkt.SetExtension(ridID, []byte("c")))
  1166. assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
  1167. assert.NoError(t, vp8WriterC.WriteRTP(pkt))
  1168. seqNo++
  1169. }
  1170. }()
  1171. wg.Wait()
  1172. closePairNow(t, pcSender, pcReceiver)
  1173. }