| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694 |
- // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
- // SPDX-License-Identifier: MIT
- //go:build !js
- // +build !js
- package webrtc
- import (
- "bytes"
- "crypto/rand"
- "encoding/binary"
- "io"
- "io/ioutil"
- "math/big"
- "reflect"
- "regexp"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/pion/datachannel"
- "github.com/pion/logging"
- "github.com/pion/transport/v2/test"
- "github.com/stretchr/testify/assert"
- )
- func TestDataChannel_EventHandlers(t *testing.T) {
- to := test.TimeOut(time.Second * 20)
- defer to.Stop()
- report := test.CheckRoutines(t)
- defer report()
- api := NewAPI()
- dc := &DataChannel{api: api}
- onDialCalled := make(chan struct{})
- onOpenCalled := make(chan struct{})
- onMessageCalled := make(chan struct{})
- // Verify that the noop case works
- assert.NotPanics(t, func() { dc.onOpen() })
- dc.OnDial(func() {
- close(onDialCalled)
- })
- dc.OnOpen(func() {
- close(onOpenCalled)
- })
- dc.OnMessage(func(p DataChannelMessage) {
- close(onMessageCalled)
- })
- // Verify that the set handlers are called
- assert.NotPanics(t, func() { dc.onDial() })
- assert.NotPanics(t, func() { dc.onOpen() })
- assert.NotPanics(t, func() { dc.onMessage(DataChannelMessage{Data: []byte("o hai")}) })
- // Wait for all handlers to be called
- <-onDialCalled
- <-onOpenCalled
- <-onMessageCalled
- }
- func TestDataChannel_MessagesAreOrdered(t *testing.T) {
- report := test.CheckRoutines(t)
- defer report()
- api := NewAPI()
- dc := &DataChannel{api: api}
- max := 512
- out := make(chan int)
- inner := func(msg DataChannelMessage) {
- // randomly sleep
- // math/rand a weak RNG, but this does not need to be secure. Ignore with #nosec
- /* #nosec */
- randInt, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
- /* #nosec */ if err != nil {
- t.Fatalf("Failed to get random sleep duration: %s", err)
- }
- time.Sleep(time.Duration(randInt.Int64()) * time.Microsecond)
- s, _ := binary.Varint(msg.Data)
- out <- int(s)
- }
- dc.OnMessage(func(p DataChannelMessage) {
- inner(p)
- })
- go func() {
- for i := 1; i <= max; i++ {
- buf := make([]byte, 8)
- binary.PutVarint(buf, int64(i))
- dc.onMessage(DataChannelMessage{Data: buf})
- // Change the registered handler a couple of times to make sure
- // that everything continues to work, we don't lose messages, etc.
- if i%2 == 0 {
- handler := func(msg DataChannelMessage) {
- inner(msg)
- }
- dc.OnMessage(handler)
- }
- }
- }()
- values := make([]int, 0, max)
- for v := range out {
- values = append(values, v)
- if len(values) == max {
- close(out)
- }
- }
- expected := make([]int, max)
- for i := 1; i <= max; i++ {
- expected[i-1] = i
- }
- assert.EqualValues(t, expected, values)
- }
- // Note(albrow): This test includes some features that aren't supported by the
- // Wasm bindings (at least for now).
- func TestDataChannelParamters_Go(t *testing.T) {
- report := test.CheckRoutines(t)
- defer report()
- t.Run("MaxPacketLifeTime exchange", func(t *testing.T) {
- ordered := true
- var maxPacketLifeTime uint16 = 3
- options := &DataChannelInit{
- Ordered: &ordered,
- MaxPacketLifeTime: &maxPacketLifeTime,
- }
- offerPC, answerPC, dc, done := setUpDataChannelParametersTest(t, options)
- // Check if parameters are correctly set
- assert.True(t, dc.Ordered(), "Ordered should be set to true")
- if assert.NotNil(t, dc.MaxPacketLifeTime(), "should not be nil") {
- assert.Equal(t, maxPacketLifeTime, *dc.MaxPacketLifeTime(), "should match")
- }
- answerPC.OnDataChannel(func(d *DataChannel) {
- // Make sure this is the data channel we were looking for. (Not the one
- // created in signalPair).
- if d.Label() != expectedLabel {
- return
- }
- // Check if parameters are correctly set
- assert.True(t, d.ordered, "Ordered should be set to true")
- if assert.NotNil(t, d.maxPacketLifeTime, "should not be nil") {
- assert.Equal(t, maxPacketLifeTime, *d.maxPacketLifeTime, "should match")
- }
- done <- true
- })
- closeReliabilityParamTest(t, offerPC, answerPC, done)
- })
- t.Run("All other property methods", func(t *testing.T) {
- id := uint16(123)
- dc := &DataChannel{}
- dc.id = &id
- dc.label = "mylabel"
- dc.protocol = "myprotocol"
- dc.negotiated = true
- assert.Equal(t, dc.id, dc.ID(), "should match")
- assert.Equal(t, dc.label, dc.Label(), "should match")
- assert.Equal(t, dc.protocol, dc.Protocol(), "should match")
- assert.Equal(t, dc.negotiated, dc.Negotiated(), "should match")
- assert.Equal(t, uint64(0), dc.BufferedAmount(), "should match")
- dc.SetBufferedAmountLowThreshold(1500)
- assert.Equal(t, uint64(1500), dc.BufferedAmountLowThreshold(), "should match")
- })
- }
- func TestDataChannelBufferedAmount(t *testing.T) {
- t.Run("set before datachannel becomes open", func(t *testing.T) {
- report := test.CheckRoutines(t)
- defer report()
- var nOfferBufferedAmountLowCbs uint32
- var offerBufferedAmountLowThreshold uint64 = 1500
- var nAnswerBufferedAmountLowCbs uint32
- var answerBufferedAmountLowThreshold uint64 = 1400
- buf := make([]byte, 1000)
- offerPC, answerPC, err := newPair()
- if err != nil {
- t.Fatalf("Failed to create a PC pair for testing")
- }
- nPacketsToSend := int(10)
- var nOfferReceived uint32
- var nAnswerReceived uint32
- done := make(chan bool)
- answerPC.OnDataChannel(func(answerDC *DataChannel) {
- // Make sure this is the data channel we were looking for. (Not the one
- // created in signalPair).
- if answerDC.Label() != expectedLabel {
- return
- }
- answerDC.OnOpen(func() {
- assert.Equal(t, answerBufferedAmountLowThreshold, answerDC.BufferedAmountLowThreshold(), "value mismatch")
- for i := 0; i < nPacketsToSend; i++ {
- e := answerDC.Send(buf)
- if e != nil {
- t.Fatalf("Failed to send string on data channel")
- }
- }
- })
- answerDC.OnMessage(func(msg DataChannelMessage) {
- atomic.AddUint32(&nAnswerReceived, 1)
- })
- assert.True(t, answerDC.Ordered(), "Ordered should be set to true")
- // The value is temporarily stored in the answerDC object
- // until the answerDC gets opened
- answerDC.SetBufferedAmountLowThreshold(answerBufferedAmountLowThreshold)
- // The callback function is temporarily stored in the answerDC object
- // until the answerDC gets opened
- answerDC.OnBufferedAmountLow(func() {
- atomic.AddUint32(&nAnswerBufferedAmountLowCbs, 1)
- if atomic.LoadUint32(&nOfferBufferedAmountLowCbs) > 0 {
- done <- true
- }
- })
- })
- offerDC, err := offerPC.CreateDataChannel(expectedLabel, nil)
- if err != nil {
- t.Fatalf("Failed to create a PC pair for testing")
- }
- assert.True(t, offerDC.Ordered(), "Ordered should be set to true")
- offerDC.OnOpen(func() {
- assert.Equal(t, offerBufferedAmountLowThreshold, offerDC.BufferedAmountLowThreshold(), "value mismatch")
- for i := 0; i < nPacketsToSend; i++ {
- e := offerDC.Send(buf)
- if e != nil {
- t.Fatalf("Failed to send string on data channel")
- }
- // assert.Equal(t, (i+1)*len(buf), int(offerDC.BufferedAmount()), "unexpected bufferedAmount")
- }
- })
- offerDC.OnMessage(func(msg DataChannelMessage) {
- atomic.AddUint32(&nOfferReceived, 1)
- })
- // The value is temporarily stored in the offerDC object
- // until the offerDC gets opened
- offerDC.SetBufferedAmountLowThreshold(offerBufferedAmountLowThreshold)
- // The callback function is temporarily stored in the offerDC object
- // until the offerDC gets opened
- offerDC.OnBufferedAmountLow(func() {
- atomic.AddUint32(&nOfferBufferedAmountLowCbs, 1)
- if atomic.LoadUint32(&nAnswerBufferedAmountLowCbs) > 0 {
- done <- true
- }
- })
- err = signalPair(offerPC, answerPC)
- if err != nil {
- t.Fatalf("Failed to signal our PC pair for testing")
- }
- closePair(t, offerPC, answerPC, done)
- t.Logf("nOfferBufferedAmountLowCbs : %d", nOfferBufferedAmountLowCbs)
- t.Logf("nAnswerBufferedAmountLowCbs: %d", nAnswerBufferedAmountLowCbs)
- assert.True(t, nOfferBufferedAmountLowCbs > uint32(0), "callback should be made at least once")
- assert.True(t, nAnswerBufferedAmountLowCbs > uint32(0), "callback should be made at least once")
- })
- t.Run("set after datachannel becomes open", func(t *testing.T) {
- report := test.CheckRoutines(t)
- defer report()
- var nCbs int
- buf := make([]byte, 1000)
- offerPC, answerPC, err := newPair()
- if err != nil {
- t.Fatalf("Failed to create a PC pair for testing")
- }
- done := make(chan bool)
- answerPC.OnDataChannel(func(d *DataChannel) {
- // Make sure this is the data channel we were looking for. (Not the one
- // created in signalPair).
- if d.Label() != expectedLabel {
- return
- }
- var nPacketsReceived int
- d.OnMessage(func(msg DataChannelMessage) {
- nPacketsReceived++
- if nPacketsReceived == 10 {
- go func() {
- time.Sleep(time.Second)
- done <- true
- }()
- }
- })
- assert.True(t, d.Ordered(), "Ordered should be set to true")
- })
- dc, err := offerPC.CreateDataChannel(expectedLabel, nil)
- if err != nil {
- t.Fatalf("Failed to create a PC pair for testing")
- }
- assert.True(t, dc.Ordered(), "Ordered should be set to true")
- dc.OnOpen(func() {
- // The value should directly be passed to sctp
- dc.SetBufferedAmountLowThreshold(1500)
- // The callback function should directly be passed to sctp
- dc.OnBufferedAmountLow(func() {
- nCbs++
- })
- for i := 0; i < 10; i++ {
- e := dc.Send(buf)
- if e != nil {
- t.Fatalf("Failed to send string on data channel")
- }
- assert.Equal(t, uint64(1500), dc.BufferedAmountLowThreshold(), "value mismatch")
- // assert.Equal(t, (i+1)*len(buf), int(dc.BufferedAmount()), "unexpected bufferedAmount")
- }
- })
- dc.OnMessage(func(msg DataChannelMessage) {
- })
- err = signalPair(offerPC, answerPC)
- if err != nil {
- t.Fatalf("Failed to signal our PC pair for testing")
- }
- closePair(t, offerPC, answerPC, done)
- assert.True(t, nCbs > 0, "callback should be made at least once")
- })
- }
- func TestEOF(t *testing.T) {
- report := test.CheckRoutines(t)
- defer report()
- log := logging.NewDefaultLoggerFactory().NewLogger("test")
- label := "test-channel"
- testData := []byte("this is some test data")
- t.Run("Detach", func(t *testing.T) {
- // Use Detach data channels mode
- s := SettingEngine{}
- s.DetachDataChannels()
- api := NewAPI(WithSettingEngine(s))
- // Set up two peer connections.
- config := Configuration{}
- pca, err := api.NewPeerConnection(config)
- if err != nil {
- t.Fatal(err)
- }
- pcb, err := api.NewPeerConnection(config)
- if err != nil {
- t.Fatal(err)
- }
- defer closePairNow(t, pca, pcb)
- var wg sync.WaitGroup
- dcChan := make(chan datachannel.ReadWriteCloser)
- pcb.OnDataChannel(func(dc *DataChannel) {
- if dc.Label() != label {
- return
- }
- log.Debug("OnDataChannel was called")
- dc.OnOpen(func() {
- detached, err2 := dc.Detach()
- if err2 != nil {
- log.Debugf("Detach failed: %s", err2.Error())
- t.Error(err2)
- }
- dcChan <- detached
- })
- })
- wg.Add(1)
- go func() {
- defer wg.Done()
- var msg []byte
- log.Debug("Waiting for OnDataChannel")
- dc := <-dcChan
- log.Debug("data channel opened")
- defer func() { assert.NoError(t, dc.Close(), "should succeed") }()
- log.Debug("Waiting for ping...")
- msg, err2 := ioutil.ReadAll(dc)
- log.Debugf("Received ping! \"%s\"", string(msg))
- if err2 != nil {
- t.Error(err2)
- }
- if !bytes.Equal(msg, testData) {
- t.Errorf("expected %q, got %q", string(msg), string(testData))
- } else {
- log.Debug("Received ping successfully!")
- }
- }()
- if err = signalPair(pca, pcb); err != nil {
- t.Fatal(err)
- }
- attached, err := pca.CreateDataChannel(label, nil)
- if err != nil {
- t.Fatal(err)
- }
- log.Debug("Waiting for data channel to open")
- open := make(chan struct{})
- attached.OnOpen(func() {
- open <- struct{}{}
- })
- <-open
- log.Debug("data channel opened")
- var dc io.ReadWriteCloser
- dc, err = attached.Detach()
- if err != nil {
- t.Fatal(err)
- }
- wg.Add(1)
- go func() {
- defer wg.Done()
- log.Debug("Sending ping...")
- if _, err2 := dc.Write(testData); err2 != nil {
- t.Error(err2)
- }
- log.Debug("Sent ping")
- assert.NoError(t, dc.Close(), "should succeed")
- log.Debug("Wating for EOF")
- ret, err2 := ioutil.ReadAll(dc)
- assert.Nil(t, err2, "should succeed")
- assert.Equal(t, 0, len(ret), "should be empty")
- }()
- wg.Wait()
- })
- t.Run("No detach", func(t *testing.T) {
- lim := test.TimeOut(time.Second * 5)
- defer lim.Stop()
- // Set up two peer connections.
- config := Configuration{}
- pca, err := NewPeerConnection(config)
- if err != nil {
- t.Fatal(err)
- }
- pcb, err := NewPeerConnection(config)
- if err != nil {
- t.Fatal(err)
- }
- defer closePairNow(t, pca, pcb)
- var dca, dcb *DataChannel
- dcaClosedCh := make(chan struct{})
- dcbClosedCh := make(chan struct{})
- pcb.OnDataChannel(func(dc *DataChannel) {
- if dc.Label() != label {
- return
- }
- log.Debugf("pcb: new datachannel: %s", dc.Label())
- dcb = dc
- // Register channel opening handling
- dcb.OnOpen(func() {
- log.Debug("pcb: datachannel opened")
- })
- dcb.OnClose(func() {
- // (2)
- log.Debug("pcb: data channel closed")
- close(dcbClosedCh)
- })
- // Register the OnMessage to handle incoming messages
- log.Debug("pcb: registering onMessage callback")
- dcb.OnMessage(func(dcMsg DataChannelMessage) {
- log.Debugf("pcb: received ping: %s", string(dcMsg.Data))
- if !reflect.DeepEqual(dcMsg.Data, testData) {
- t.Error("data mismatch")
- }
- })
- })
- dca, err = pca.CreateDataChannel(label, nil)
- if err != nil {
- t.Fatal(err)
- }
- dca.OnOpen(func() {
- log.Debug("pca: data channel opened")
- log.Debugf("pca: sending \"%s\"", string(testData))
- if err := dca.Send(testData); err != nil {
- t.Fatal(err)
- }
- log.Debug("pca: sent ping")
- assert.NoError(t, dca.Close(), "should succeed") // <-- dca closes
- })
- dca.OnClose(func() {
- // (1)
- log.Debug("pca: data channel closed")
- close(dcaClosedCh)
- })
- // Register the OnMessage to handle incoming messages
- log.Debug("pca: registering onMessage callback")
- dca.OnMessage(func(dcMsg DataChannelMessage) {
- log.Debugf("pca: received pong: %s", string(dcMsg.Data))
- if !reflect.DeepEqual(dcMsg.Data, testData) {
- t.Error("data mismatch")
- }
- })
- if err := signalPair(pca, pcb); err != nil {
- t.Fatal(err)
- }
- // When dca closes the channel,
- // (1) dca.Onclose() will fire immediately, then
- // (2) dcb.OnClose will also fire
- <-dcaClosedCh // (1)
- <-dcbClosedCh // (2)
- })
- }
- // Assert that a Session Description that doesn't follow
- // draft-ietf-mmusic-sctp-sdp is still accepted
- func TestDataChannel_NonStandardSessionDescription(t *testing.T) {
- to := test.TimeOut(time.Second * 20)
- defer to.Stop()
- report := test.CheckRoutines(t)
- defer report()
- offerPC, answerPC, err := newPair()
- assert.NoError(t, err)
- _, err = offerPC.CreateDataChannel("foo", nil)
- assert.NoError(t, err)
- onDataChannelCalled := make(chan struct{})
- answerPC.OnDataChannel(func(_ *DataChannel) {
- close(onDataChannelCalled)
- })
- offer, err := offerPC.CreateOffer(nil)
- assert.NoError(t, err)
- offerGatheringComplete := GatheringCompletePromise(offerPC)
- assert.NoError(t, offerPC.SetLocalDescription(offer))
- <-offerGatheringComplete
- offer = *offerPC.LocalDescription()
- // Replace with old values
- const (
- oldApplication = "m=application 63743 DTLS/SCTP 5000\r"
- oldAttribute = "a=sctpmap:5000 webrtc-datachannel 256\r"
- )
- offer.SDP = regexp.MustCompile(`m=application (.*?)\r`).ReplaceAllString(offer.SDP, oldApplication)
- offer.SDP = regexp.MustCompile(`a=sctp-port(.*?)\r`).ReplaceAllString(offer.SDP, oldAttribute)
- // Assert that replace worked
- assert.True(t, strings.Contains(offer.SDP, oldApplication))
- assert.True(t, strings.Contains(offer.SDP, oldAttribute))
- assert.NoError(t, answerPC.SetRemoteDescription(offer))
- answer, err := answerPC.CreateAnswer(nil)
- assert.NoError(t, err)
- answerGatheringComplete := GatheringCompletePromise(answerPC)
- assert.NoError(t, answerPC.SetLocalDescription(answer))
- <-answerGatheringComplete
- assert.NoError(t, offerPC.SetRemoteDescription(*answerPC.LocalDescription()))
- <-onDataChannelCalled
- closePairNow(t, offerPC, answerPC)
- }
- func TestDataChannel_Dial(t *testing.T) {
- t.Run("handler should be called once, by dialing peer only", func(t *testing.T) {
- report := test.CheckRoutines(t)
- defer report()
- dialCalls := make(chan bool, 2)
- wg := new(sync.WaitGroup)
- wg.Add(2)
- offerPC, answerPC, err := newPair()
- if err != nil {
- t.Fatalf("Failed to create a PC pair for testing")
- }
- answerPC.OnDataChannel(func(d *DataChannel) {
- if d.Label() != expectedLabel {
- return
- }
- d.OnDial(func() {
- // only dialing side should fire OnDial
- t.Fatalf("answering side should not call on dial")
- })
- d.OnOpen(wg.Done)
- })
- d, err := offerPC.CreateDataChannel(expectedLabel, nil)
- assert.NoError(t, err)
- d.OnDial(func() {
- dialCalls <- true
- wg.Done()
- })
- assert.NoError(t, signalPair(offerPC, answerPC))
- wg.Wait()
- closePairNow(t, offerPC, answerPC)
- assert.Len(t, dialCalls, 1)
- })
- t.Run("handler should be called immediately if already dialed", func(t *testing.T) {
- report := test.CheckRoutines(t)
- defer report()
- done := make(chan bool)
- offerPC, answerPC, err := newPair()
- if err != nil {
- t.Fatalf("Failed to create a PC pair for testing")
- }
- d, err := offerPC.CreateDataChannel(expectedLabel, nil)
- assert.NoError(t, err)
- d.OnOpen(func() {
- // when the offer DC has been opened, its guaranteed to have dialed since it has
- // received a response to said dial. this test represents an unrealistic usage,
- // but its the best way to guarantee we "missed" the dial event and still invoke
- // the handler.
- d.OnDial(func() {
- done <- true
- })
- })
- assert.NoError(t, signalPair(offerPC, answerPC))
- closePair(t, offerPC, answerPC, done)
- })
- }
|