datachannel_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. package webrtc
  4. import (
  5. "fmt"
  6. "io"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/pion/transport/v2/test"
  11. "github.com/stretchr/testify/assert"
  12. )
  13. // expectedLabel represents the label of the data channel we are trying to test.
  14. // Some other channels may have been created during initialization (in the Wasm
  15. // bindings this is a requirement).
  16. const expectedLabel = "data"
  17. func closePairNow(t testing.TB, pc1, pc2 io.Closer) {
  18. var fail bool
  19. if err := pc1.Close(); err != nil {
  20. t.Errorf("Failed to close PeerConnection: %v", err)
  21. fail = true
  22. }
  23. if err := pc2.Close(); err != nil {
  24. t.Errorf("Failed to close PeerConnection: %v", err)
  25. fail = true
  26. }
  27. if fail {
  28. t.FailNow()
  29. }
  30. }
  31. func closePair(t *testing.T, pc1, pc2 io.Closer, done <-chan bool) {
  32. select {
  33. case <-time.After(10 * time.Second):
  34. t.Fatalf("closePair timed out waiting for done signal")
  35. case <-done:
  36. closePairNow(t, pc1, pc2)
  37. }
  38. }
  39. func setUpDataChannelParametersTest(t *testing.T, options *DataChannelInit) (*PeerConnection, *PeerConnection, *DataChannel, chan bool) {
  40. offerPC, answerPC, err := newPair()
  41. if err != nil {
  42. t.Fatalf("Failed to create a PC pair for testing")
  43. }
  44. done := make(chan bool)
  45. dc, err := offerPC.CreateDataChannel(expectedLabel, options)
  46. if err != nil {
  47. t.Fatalf("Failed to create a PC pair for testing")
  48. }
  49. return offerPC, answerPC, dc, done
  50. }
  51. func closeReliabilityParamTest(t *testing.T, pc1, pc2 *PeerConnection, done chan bool) {
  52. err := signalPair(pc1, pc2)
  53. if err != nil {
  54. t.Fatalf("Failed to signal our PC pair for testing")
  55. }
  56. closePair(t, pc1, pc2, done)
  57. }
  58. func BenchmarkDataChannelSend2(b *testing.B) { benchmarkDataChannelSend(b, 2) }
  59. func BenchmarkDataChannelSend4(b *testing.B) { benchmarkDataChannelSend(b, 4) }
  60. func BenchmarkDataChannelSend8(b *testing.B) { benchmarkDataChannelSend(b, 8) }
  61. func BenchmarkDataChannelSend16(b *testing.B) { benchmarkDataChannelSend(b, 16) }
  62. func BenchmarkDataChannelSend32(b *testing.B) { benchmarkDataChannelSend(b, 32) }
  63. // See https://github.com/pion/webrtc/issues/1516
  64. func benchmarkDataChannelSend(b *testing.B, numChannels int) {
  65. offerPC, answerPC, err := newPair()
  66. if err != nil {
  67. b.Fatalf("Failed to create a PC pair for testing")
  68. }
  69. open := make(map[string]chan bool)
  70. answerPC.OnDataChannel(func(d *DataChannel) {
  71. if _, ok := open[d.Label()]; !ok {
  72. // Ignore anything unknown channel label.
  73. return
  74. }
  75. d.OnOpen(func() { open[d.Label()] <- true })
  76. })
  77. var wg sync.WaitGroup
  78. for i := 0; i < numChannels; i++ {
  79. label := fmt.Sprintf("dc-%d", i)
  80. open[label] = make(chan bool)
  81. wg.Add(1)
  82. dc, err := offerPC.CreateDataChannel(label, nil)
  83. assert.NoError(b, err)
  84. dc.OnOpen(func() {
  85. <-open[label]
  86. for n := 0; n < b.N/numChannels; n++ {
  87. if err := dc.SendText("Ping"); err != nil {
  88. b.Fatalf("Unexpected error sending data (label=%q): %v", label, err)
  89. }
  90. }
  91. wg.Done()
  92. })
  93. }
  94. assert.NoError(b, signalPair(offerPC, answerPC))
  95. wg.Wait()
  96. closePairNow(b, offerPC, answerPC)
  97. }
  98. func TestDataChannel_Open(t *testing.T) {
  99. const openOnceChannelCapacity = 2
  100. t.Run("handler should be called once", func(t *testing.T) {
  101. report := test.CheckRoutines(t)
  102. defer report()
  103. offerPC, answerPC, err := newPair()
  104. if err != nil {
  105. t.Fatalf("Failed to create a PC pair for testing")
  106. }
  107. done := make(chan bool)
  108. openCalls := make(chan bool, openOnceChannelCapacity)
  109. answerPC.OnDataChannel(func(d *DataChannel) {
  110. if d.Label() != expectedLabel {
  111. return
  112. }
  113. d.OnOpen(func() {
  114. openCalls <- true
  115. })
  116. d.OnMessage(func(msg DataChannelMessage) {
  117. go func() {
  118. // Wait a little bit to ensure all messages are processed.
  119. time.Sleep(100 * time.Millisecond)
  120. done <- true
  121. }()
  122. })
  123. })
  124. dc, err := offerPC.CreateDataChannel(expectedLabel, nil)
  125. assert.NoError(t, err)
  126. dc.OnOpen(func() {
  127. e := dc.SendText("Ping")
  128. if e != nil {
  129. t.Fatalf("Failed to send string on data channel")
  130. }
  131. })
  132. assert.NoError(t, signalPair(offerPC, answerPC))
  133. closePair(t, offerPC, answerPC, done)
  134. assert.Len(t, openCalls, 1)
  135. })
  136. t.Run("handler should be called once when already negotiated", func(t *testing.T) {
  137. report := test.CheckRoutines(t)
  138. defer report()
  139. offerPC, answerPC, err := newPair()
  140. if err != nil {
  141. t.Fatalf("Failed to create a PC pair for testing")
  142. }
  143. done := make(chan bool)
  144. answerOpenCalls := make(chan bool, openOnceChannelCapacity)
  145. offerOpenCalls := make(chan bool, openOnceChannelCapacity)
  146. negotiated := true
  147. ordered := true
  148. dataChannelID := uint16(0)
  149. answerDC, err := answerPC.CreateDataChannel(expectedLabel, &DataChannelInit{
  150. ID: &dataChannelID,
  151. Negotiated: &negotiated,
  152. Ordered: &ordered,
  153. })
  154. assert.NoError(t, err)
  155. offerDC, err := offerPC.CreateDataChannel(expectedLabel, &DataChannelInit{
  156. ID: &dataChannelID,
  157. Negotiated: &negotiated,
  158. Ordered: &ordered,
  159. })
  160. assert.NoError(t, err)
  161. answerDC.OnMessage(func(msg DataChannelMessage) {
  162. go func() {
  163. // Wait a little bit to ensure all messages are processed.
  164. time.Sleep(100 * time.Millisecond)
  165. done <- true
  166. }()
  167. })
  168. answerDC.OnOpen(func() {
  169. answerOpenCalls <- true
  170. })
  171. offerDC.OnOpen(func() {
  172. offerOpenCalls <- true
  173. e := offerDC.SendText("Ping")
  174. if e != nil {
  175. t.Fatalf("Failed to send string on data channel")
  176. }
  177. })
  178. assert.NoError(t, signalPair(offerPC, answerPC))
  179. closePair(t, offerPC, answerPC, done)
  180. assert.Len(t, answerOpenCalls, 1)
  181. assert.Len(t, offerOpenCalls, 1)
  182. })
  183. }
  184. func TestDataChannel_Send(t *testing.T) {
  185. t.Run("before signaling", func(t *testing.T) {
  186. report := test.CheckRoutines(t)
  187. defer report()
  188. offerPC, answerPC, err := newPair()
  189. if err != nil {
  190. t.Fatalf("Failed to create a PC pair for testing")
  191. }
  192. done := make(chan bool)
  193. answerPC.OnDataChannel(func(d *DataChannel) {
  194. // Make sure this is the data channel we were looking for. (Not the one
  195. // created in signalPair).
  196. if d.Label() != expectedLabel {
  197. return
  198. }
  199. d.OnMessage(func(msg DataChannelMessage) {
  200. e := d.Send([]byte("Pong"))
  201. if e != nil {
  202. t.Fatalf("Failed to send string on data channel")
  203. }
  204. })
  205. assert.True(t, d.Ordered(), "Ordered should be set to true")
  206. })
  207. dc, err := offerPC.CreateDataChannel(expectedLabel, nil)
  208. if err != nil {
  209. t.Fatalf("Failed to create a PC pair for testing")
  210. }
  211. assert.True(t, dc.Ordered(), "Ordered should be set to true")
  212. dc.OnOpen(func() {
  213. e := dc.SendText("Ping")
  214. if e != nil {
  215. t.Fatalf("Failed to send string on data channel")
  216. }
  217. })
  218. dc.OnMessage(func(msg DataChannelMessage) {
  219. done <- true
  220. })
  221. err = signalPair(offerPC, answerPC)
  222. if err != nil {
  223. t.Fatalf("Failed to signal our PC pair for testing: %+v", err)
  224. }
  225. closePair(t, offerPC, answerPC, done)
  226. })
  227. t.Run("after connected", func(t *testing.T) {
  228. report := test.CheckRoutines(t)
  229. defer report()
  230. offerPC, answerPC, err := newPair()
  231. if err != nil {
  232. t.Fatalf("Failed to create a PC pair for testing")
  233. }
  234. done := make(chan bool)
  235. answerPC.OnDataChannel(func(d *DataChannel) {
  236. // Make sure this is the data channel we were looking for. (Not the one
  237. // created in signalPair).
  238. if d.Label() != expectedLabel {
  239. return
  240. }
  241. d.OnMessage(func(msg DataChannelMessage) {
  242. e := d.Send([]byte("Pong"))
  243. if e != nil {
  244. t.Fatalf("Failed to send string on data channel")
  245. }
  246. })
  247. assert.True(t, d.Ordered(), "Ordered should be set to true")
  248. })
  249. once := &sync.Once{}
  250. offerPC.OnICEConnectionStateChange(func(state ICEConnectionState) {
  251. if state == ICEConnectionStateConnected || state == ICEConnectionStateCompleted {
  252. // wasm fires completed state multiple times
  253. once.Do(func() {
  254. dc, createErr := offerPC.CreateDataChannel(expectedLabel, nil)
  255. if createErr != nil {
  256. t.Fatalf("Failed to create a PC pair for testing")
  257. }
  258. assert.True(t, dc.Ordered(), "Ordered should be set to true")
  259. dc.OnMessage(func(msg DataChannelMessage) {
  260. done <- true
  261. })
  262. if e := dc.SendText("Ping"); e != nil {
  263. // wasm binding doesn't fire OnOpen (we probably already missed it)
  264. dc.OnOpen(func() {
  265. e = dc.SendText("Ping")
  266. if e != nil {
  267. t.Fatalf("Failed to send string on data channel")
  268. }
  269. })
  270. }
  271. })
  272. }
  273. })
  274. err = signalPair(offerPC, answerPC)
  275. if err != nil {
  276. t.Fatalf("Failed to signal our PC pair for testing")
  277. }
  278. closePair(t, offerPC, answerPC, done)
  279. })
  280. }
  281. func TestDataChannel_Close(t *testing.T) {
  282. report := test.CheckRoutines(t)
  283. defer report()
  284. t.Run("Close after PeerConnection Closed", func(t *testing.T) {
  285. offerPC, answerPC, err := newPair()
  286. assert.NoError(t, err)
  287. dc, err := offerPC.CreateDataChannel(expectedLabel, nil)
  288. assert.NoError(t, err)
  289. closePairNow(t, offerPC, answerPC)
  290. assert.NoError(t, dc.Close())
  291. })
  292. t.Run("Close before connected", func(t *testing.T) {
  293. offerPC, answerPC, err := newPair()
  294. assert.NoError(t, err)
  295. dc, err := offerPC.CreateDataChannel(expectedLabel, nil)
  296. assert.NoError(t, err)
  297. assert.NoError(t, dc.Close())
  298. closePairNow(t, offerPC, answerPC)
  299. })
  300. }
  301. func TestDataChannelParameters(t *testing.T) {
  302. report := test.CheckRoutines(t)
  303. defer report()
  304. t.Run("MaxPacketLifeTime exchange", func(t *testing.T) {
  305. ordered := true
  306. maxPacketLifeTime := uint16(3)
  307. options := &DataChannelInit{
  308. Ordered: &ordered,
  309. MaxPacketLifeTime: &maxPacketLifeTime,
  310. }
  311. offerPC, answerPC, dc, done := setUpDataChannelParametersTest(t, options)
  312. // Check if parameters are correctly set
  313. assert.Equal(t, dc.Ordered(), ordered, "Ordered should be same value as set in DataChannelInit")
  314. if assert.NotNil(t, dc.MaxPacketLifeTime(), "should not be nil") {
  315. assert.Equal(t, maxPacketLifeTime, *dc.MaxPacketLifeTime(), "should match")
  316. }
  317. answerPC.OnDataChannel(func(d *DataChannel) {
  318. if d.Label() != expectedLabel {
  319. return
  320. }
  321. // Check if parameters are correctly set
  322. assert.Equal(t, d.Ordered(), ordered, "Ordered should be same value as set in DataChannelInit")
  323. if assert.NotNil(t, d.MaxPacketLifeTime(), "should not be nil") {
  324. assert.Equal(t, maxPacketLifeTime, *d.MaxPacketLifeTime(), "should match")
  325. }
  326. done <- true
  327. })
  328. closeReliabilityParamTest(t, offerPC, answerPC, done)
  329. })
  330. t.Run("MaxRetransmits exchange", func(t *testing.T) {
  331. ordered := false
  332. maxRetransmits := uint16(3000)
  333. options := &DataChannelInit{
  334. Ordered: &ordered,
  335. MaxRetransmits: &maxRetransmits,
  336. }
  337. offerPC, answerPC, dc, done := setUpDataChannelParametersTest(t, options)
  338. // Check if parameters are correctly set
  339. assert.False(t, dc.Ordered(), "Ordered should be set to false")
  340. if assert.NotNil(t, dc.MaxRetransmits(), "should not be nil") {
  341. assert.Equal(t, maxRetransmits, *dc.MaxRetransmits(), "should match")
  342. }
  343. answerPC.OnDataChannel(func(d *DataChannel) {
  344. // Make sure this is the data channel we were looking for. (Not the one
  345. // created in signalPair).
  346. if d.Label() != expectedLabel {
  347. return
  348. }
  349. // Check if parameters are correctly set
  350. assert.False(t, d.Ordered(), "Ordered should be set to false")
  351. if assert.NotNil(t, d.MaxRetransmits(), "should not be nil") {
  352. assert.Equal(t, maxRetransmits, *d.MaxRetransmits(), "should match")
  353. }
  354. done <- true
  355. })
  356. closeReliabilityParamTest(t, offerPC, answerPC, done)
  357. })
  358. t.Run("Protocol exchange", func(t *testing.T) {
  359. protocol := "json"
  360. options := &DataChannelInit{
  361. Protocol: &protocol,
  362. }
  363. offerPC, answerPC, dc, done := setUpDataChannelParametersTest(t, options)
  364. // Check if parameters are correctly set
  365. assert.Equal(t, protocol, dc.Protocol(), "Protocol should match DataChannelInit")
  366. answerPC.OnDataChannel(func(d *DataChannel) {
  367. // Make sure this is the data channel we were looking for. (Not the one
  368. // created in signalPair).
  369. if d.Label() != expectedLabel {
  370. return
  371. }
  372. // Check if parameters are correctly set
  373. assert.Equal(t, protocol, d.Protocol(), "Protocol should match what channel creator declared")
  374. done <- true
  375. })
  376. closeReliabilityParamTest(t, offerPC, answerPC, done)
  377. })
  378. t.Run("Negotiated exchange", func(t *testing.T) {
  379. const expectedMessage = "Hello World"
  380. negotiated := true
  381. var id uint16 = 500
  382. options := &DataChannelInit{
  383. Negotiated: &negotiated,
  384. ID: &id,
  385. }
  386. offerPC, answerPC, offerDatachannel, done := setUpDataChannelParametersTest(t, options)
  387. answerDatachannel, err := answerPC.CreateDataChannel(expectedLabel, options)
  388. assert.NoError(t, err)
  389. answerPC.OnDataChannel(func(d *DataChannel) {
  390. // Ignore our default channel, exists to force ICE candidates. See signalPair for more info
  391. if d.Label() == "initial_data_channel" {
  392. return
  393. }
  394. t.Fatal("OnDataChannel must not be fired when negotiated == true")
  395. })
  396. offerPC.OnDataChannel(func(d *DataChannel) {
  397. t.Fatal("OnDataChannel must not be fired when negotiated == true")
  398. })
  399. seenAnswerMessage := &atomicBool{}
  400. seenOfferMessage := &atomicBool{}
  401. answerDatachannel.OnMessage(func(msg DataChannelMessage) {
  402. if msg.IsString && string(msg.Data) == expectedMessage {
  403. seenAnswerMessage.set(true)
  404. }
  405. })
  406. offerDatachannel.OnMessage(func(msg DataChannelMessage) {
  407. if msg.IsString && string(msg.Data) == expectedMessage {
  408. seenOfferMessage.set(true)
  409. }
  410. })
  411. go func() {
  412. for {
  413. if seenAnswerMessage.get() && seenOfferMessage.get() {
  414. break
  415. }
  416. if offerDatachannel.ReadyState() == DataChannelStateOpen {
  417. assert.NoError(t, offerDatachannel.SendText(expectedMessage))
  418. }
  419. if answerDatachannel.ReadyState() == DataChannelStateOpen {
  420. assert.NoError(t, answerDatachannel.SendText(expectedMessage))
  421. }
  422. time.Sleep(500 * time.Millisecond)
  423. }
  424. done <- true
  425. }()
  426. closeReliabilityParamTest(t, offerPC, answerPC, done)
  427. })
  428. }