main.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. // data-channels-flow-control demonstrates how to use the DataChannel congestion control APIs
  4. package main
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. "os"
  10. "sync/atomic"
  11. "time"
  12. "github.com/pion/webrtc/v3"
  13. )
  // Byte limits that drive the offerer's send-side flow control.
  const (
  	// bufferedAmountLowThreshold is the value passed to
  	// SetBufferedAmountLowThreshold on the offerer's data channel; the
  	// OnBufferedAmountLow callback is what lets the sender resume.
  	bufferedAmountLowThreshold uint64 = 512 << 10 // 512 KiB

  	// maxBufferedAmount is the ceiling the send loop compares against: once
  	// the channel's buffered amount plus one more packet would exceed it,
  	// the loop pauses until it is signalled to continue.
  	maxBufferedAmount uint64 = 1 << 20 // 1 MiB
  )
  // check panics with err when err is non-nil and does nothing otherwise.
  // It is the single error-handling path used throughout this example.
  func check(err error) {
  	if err == nil {
  		return
  	}
  	panic(err)
  }
  23. func setRemoteDescription(pc *webrtc.PeerConnection, sdp []byte) {
  24. var desc webrtc.SessionDescription
  25. err := json.Unmarshal(sdp, &desc)
  26. check(err)
  27. // Apply the desc as the remote description
  28. err = pc.SetRemoteDescription(desc)
  29. check(err)
  30. }
  31. func createOfferer() *webrtc.PeerConnection {
  32. // Prepare the configuration
  33. config := webrtc.Configuration{
  34. ICEServers: []webrtc.ICEServer{},
  35. }
  36. // Create a new PeerConnection
  37. pc, err := webrtc.NewPeerConnection(config)
  38. check(err)
  39. buf := make([]byte, 1024)
  40. ordered := false
  41. maxRetransmits := uint16(0)
  42. options := &webrtc.DataChannelInit{
  43. Ordered: &ordered,
  44. MaxRetransmits: &maxRetransmits,
  45. }
  46. sendMoreCh := make(chan struct{}, 1)
  47. // Create a datachannel with label 'data'
  48. dc, err := pc.CreateDataChannel("data", options)
  49. check(err)
  50. // Register channel opening handling
  51. dc.OnOpen(func() {
  52. log.Printf("OnOpen: %s-%d. Start sending a series of 1024-byte packets as fast as it can\n", dc.Label(), dc.ID())
  53. for {
  54. err2 := dc.Send(buf)
  55. check(err2)
  56. if dc.BufferedAmount()+uint64(len(buf)) > maxBufferedAmount {
  57. // Wait until the bufferedAmount becomes lower than the threshold
  58. <-sendMoreCh
  59. }
  60. }
  61. })
  62. // Set bufferedAmountLowThreshold so that we can get notified when
  63. // we can send more
  64. dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
  65. // This callback is made when the current bufferedAmount becomes lower than the threshold
  66. dc.OnBufferedAmountLow(func() {
  67. // Make sure to not block this channel or perform long running operations in this callback
  68. // This callback is executed by pion/sctp. If this callback is blocking it will stop operations
  69. select {
  70. case sendMoreCh <- struct{}{}:
  71. default:
  72. }
  73. })
  74. return pc
  75. }
  76. func createAnswerer() *webrtc.PeerConnection {
  77. // Prepare the configuration
  78. config := webrtc.Configuration{
  79. ICEServers: []webrtc.ICEServer{},
  80. }
  81. // Create a new PeerConnection
  82. pc, err := webrtc.NewPeerConnection(config)
  83. check(err)
  84. pc.OnDataChannel(func(dc *webrtc.DataChannel) {
  85. var totalBytesReceived uint64
  86. // Register channel opening handling
  87. dc.OnOpen(func() {
  88. log.Printf("OnOpen: %s-%d. Start receiving data", dc.Label(), dc.ID())
  89. since := time.Now()
  90. // Start printing out the observed throughput
  91. for range time.NewTicker(1000 * time.Millisecond).C {
  92. bps := float64(atomic.LoadUint64(&totalBytesReceived)*8) / time.Since(since).Seconds()
  93. log.Printf("Throughput: %.03f Mbps", bps/1024/1024)
  94. }
  95. })
  96. // Register the OnMessage to handle incoming messages
  97. dc.OnMessage(func(dcMsg webrtc.DataChannelMessage) {
  98. n := len(dcMsg.Data)
  99. atomic.AddUint64(&totalBytesReceived, uint64(n))
  100. })
  101. })
  102. return pc
  103. }
  104. func main() {
  105. offerPC := createOfferer()
  106. defer func() {
  107. if err := offerPC.Close(); err != nil {
  108. fmt.Printf("cannot close offerPC: %v\n", err)
  109. }
  110. }()
  111. answerPC := createAnswerer()
  112. defer func() {
  113. if err := answerPC.Close(); err != nil {
  114. fmt.Printf("cannot close answerPC: %v\n", err)
  115. }
  116. }()
  117. // Set ICE Candidate handler. As soon as a PeerConnection has gathered a candidate
  118. // send it to the other peer
  119. answerPC.OnICECandidate(func(i *webrtc.ICECandidate) {
  120. if i != nil {
  121. check(offerPC.AddICECandidate(i.ToJSON()))
  122. }
  123. })
  124. // Set ICE Candidate handler. As soon as a PeerConnection has gathered a candidate
  125. // send it to the other peer
  126. offerPC.OnICECandidate(func(i *webrtc.ICECandidate) {
  127. if i != nil {
  128. check(answerPC.AddICECandidate(i.ToJSON()))
  129. }
  130. })
  131. // Set the handler for Peer connection state
  132. // This will notify you when the peer has connected/disconnected
  133. offerPC.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
  134. fmt.Printf("Peer Connection State has changed: %s (offerer)\n", s.String())
  135. if s == webrtc.PeerConnectionStateFailed {
  136. // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
  137. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
  138. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
  139. fmt.Println("Peer Connection has gone to failed exiting")
  140. os.Exit(0)
  141. }
  142. })
  143. // Set the handler for Peer connection state
  144. // This will notify you when the peer has connected/disconnected
  145. answerPC.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
  146. fmt.Printf("Peer Connection State has changed: %s (answerer)\n", s.String())
  147. if s == webrtc.PeerConnectionStateFailed {
  148. // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
  149. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
  150. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
  151. fmt.Println("Peer Connection has gone to failed exiting")
  152. os.Exit(0)
  153. }
  154. })
  155. // Now, create an offer
  156. offer, err := offerPC.CreateOffer(nil)
  157. check(err)
  158. check(offerPC.SetLocalDescription(offer))
  159. desc, err := json.Marshal(offer)
  160. check(err)
  161. setRemoteDescription(answerPC, desc)
  162. answer, err := answerPC.CreateAnswer(nil)
  163. check(err)
  164. check(answerPC.SetLocalDescription(answer))
  165. desc2, err := json.Marshal(answer)
  166. check(err)
  167. setRemoteDescription(offerPC, desc2)
  168. // Block forever
  169. select {}
  170. }