main.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. // show-network-usage shows the amount of packets flowing through the vnet
  6. package main
  7. import (
  8. "fmt"
  9. "log"
  10. "net"
  11. "os"
  12. "sync/atomic"
  13. "time"
  14. "github.com/pion/logging"
  15. "github.com/pion/transport/v2/vnet"
  16. "github.com/pion/webrtc/v3"
  17. )
  18. /* VNet Configuration
  19. + - - - - - - - - - - - - - - - - - - - - - - - +
  20. VNet
  21. | +-------------------------------------------+ |
  22. | wan:vnet.Router |
  23. | +---------+----------------------+----------+ |
  24. | |
  25. | +---------+----------+ +---------+----------+ |
  26. | offerVNet:vnet.Net | |answerVNet:vnet.Net |
  27. | +---------+----------+ +---------+----------+ |
  28. | |
  29. + - - - - - + - - - - - - - - - - -+- - - - - - +
  30. | |
  31. +---------+----------+ +---------+----------+
  32. |offerPeerConnection | |answerPeerConnection|
  33. +--------------------+ +--------------------+
  34. */
  35. func main() {
  36. var inboundBytes int32 // for offerPeerConnection
  37. var outboundBytes int32 // for offerPeerConnection
  38. // Create a root router
  39. wan, err := vnet.NewRouter(&vnet.RouterConfig{
  40. CIDR: "1.2.3.0/24",
  41. LoggerFactory: logging.NewDefaultLoggerFactory(),
  42. })
  43. panicIfError(err)
  44. // Add a filter that monitors the traffic on the router
  45. wan.AddChunkFilter(func(c vnet.Chunk) bool {
  46. netType := c.SourceAddr().Network()
  47. if netType == "udp" {
  48. dstAddr := c.DestinationAddr().String()
  49. host, _, err2 := net.SplitHostPort(dstAddr)
  50. panicIfError(err2)
  51. if host == "1.2.3.4" {
  52. // c.UserData() returns a []byte of UDP payload
  53. atomic.AddInt32(&inboundBytes, int32(len(c.UserData())))
  54. }
  55. srcAddr := c.SourceAddr().String()
  56. host, _, err2 = net.SplitHostPort(srcAddr)
  57. panicIfError(err2)
  58. if host == "1.2.3.4" {
  59. // c.UserData() returns a []byte of UDP payload
  60. atomic.AddInt32(&outboundBytes, int32(len(c.UserData())))
  61. }
  62. }
  63. return true
  64. })
  65. // Log throughput every 3 seconds
  66. go func() {
  67. duration := 2 * time.Second
  68. for {
  69. time.Sleep(duration)
  70. inBytes := atomic.SwapInt32(&inboundBytes, 0) // read & reset
  71. outBytes := atomic.SwapInt32(&outboundBytes, 0) // read & reset
  72. inboundThroughput := float64(inBytes) / duration.Seconds()
  73. outboundThroughput := float64(outBytes) / duration.Seconds()
  74. log.Printf("inbound throughput : %.01f [Byte/s]\n", inboundThroughput)
  75. log.Printf("outbound throughput: %.01f [Byte/s]\n", outboundThroughput)
  76. }
  77. }()
  78. // Create a network interface for offerer
  79. offerVNet, err := vnet.NewNet(&vnet.NetConfig{
  80. StaticIPs: []string{"1.2.3.4"},
  81. })
  82. panicIfError(err)
  83. // Add the network interface to the router
  84. panicIfError(wan.AddNet(offerVNet))
  85. offerSettingEngine := webrtc.SettingEngine{}
  86. offerSettingEngine.SetVNet(offerVNet)
  87. offerAPI := webrtc.NewAPI(webrtc.WithSettingEngine(offerSettingEngine))
  88. // Create a network interface for answerer
  89. answerVNet, err := vnet.NewNet(&vnet.NetConfig{
  90. StaticIPs: []string{"1.2.3.5"},
  91. })
  92. panicIfError(err)
  93. // Add the network interface to the router
  94. panicIfError(wan.AddNet(answerVNet))
  95. answerSettingEngine := webrtc.SettingEngine{}
  96. answerSettingEngine.SetVNet(answerVNet)
  97. answerAPI := webrtc.NewAPI(webrtc.WithSettingEngine(answerSettingEngine))
  98. // Start the virtual network by calling Start() on the root router
  99. panicIfError(wan.Start())
  100. offerPeerConnection, err := offerAPI.NewPeerConnection(webrtc.Configuration{})
  101. panicIfError(err)
  102. defer func() {
  103. if cErr := offerPeerConnection.Close(); cErr != nil {
  104. fmt.Printf("cannot close offerPeerConnection: %v\n", cErr)
  105. }
  106. }()
  107. answerPeerConnection, err := answerAPI.NewPeerConnection(webrtc.Configuration{})
  108. panicIfError(err)
  109. defer func() {
  110. if cErr := answerPeerConnection.Close(); cErr != nil {
  111. fmt.Printf("cannot close answerPeerConnection: %v\n", cErr)
  112. }
  113. }()
  114. // Set the handler for Peer connection state
  115. // This will notify you when the peer has connected/disconnected
  116. offerPeerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
  117. fmt.Printf("Peer Connection State has changed: %s (offerer)\n", s.String())
  118. if s == webrtc.PeerConnectionStateFailed {
  119. // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
  120. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
  121. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
  122. fmt.Println("Peer Connection has gone to failed exiting")
  123. os.Exit(0)
  124. }
  125. })
  126. // Set the handler for Peer connection state
  127. // This will notify you when the peer has connected/disconnected
  128. answerPeerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
  129. fmt.Printf("Peer Connection State has changed: %s (answerer)\n", s.String())
  130. if s == webrtc.PeerConnectionStateFailed {
  131. // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
  132. // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
  133. // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
  134. fmt.Println("Peer Connection has gone to failed exiting")
  135. os.Exit(0)
  136. }
  137. })
  138. // Set ICE Candidate handler. As soon as a PeerConnection has gathered a candidate
  139. // send it to the other peer
  140. answerPeerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
  141. if i != nil {
  142. panicIfError(offerPeerConnection.AddICECandidate(i.ToJSON()))
  143. }
  144. })
  145. // Set ICE Candidate handler. As soon as a PeerConnection has gathered a candidate
  146. // send it to the other peer
  147. offerPeerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
  148. if i != nil {
  149. panicIfError(answerPeerConnection.AddICECandidate(i.ToJSON()))
  150. }
  151. })
  152. offerDataChannel, err := offerPeerConnection.CreateDataChannel("label", nil)
  153. panicIfError(err)
  154. msgSendLoop := func(dc *webrtc.DataChannel, interval time.Duration) {
  155. for {
  156. time.Sleep(interval)
  157. panicIfError(dc.SendText("My DataChannel Message"))
  158. }
  159. }
  160. offerDataChannel.OnOpen(func() {
  161. // Send test from offerer every 100 msec
  162. msgSendLoop(offerDataChannel, 100*time.Millisecond)
  163. })
  164. answerPeerConnection.OnDataChannel(func(answerDataChannel *webrtc.DataChannel) {
  165. answerDataChannel.OnOpen(func() {
  166. // Send test from answerer every 200 msec
  167. msgSendLoop(answerDataChannel, 200*time.Millisecond)
  168. })
  169. })
  170. offer, err := offerPeerConnection.CreateOffer(nil)
  171. panicIfError(err)
  172. panicIfError(offerPeerConnection.SetLocalDescription(offer))
  173. panicIfError(answerPeerConnection.SetRemoteDescription(offer))
  174. answer, err := answerPeerConnection.CreateAnswer(nil)
  175. panicIfError(err)
  176. panicIfError(answerPeerConnection.SetLocalDescription(answer))
  177. panicIfError(offerPeerConnection.SetRemoteDescription(answer))
  178. // Block forever
  179. select {}
  180. }
  181. func panicIfError(err error) {
  182. if err != nil {
  183. panic(err)
  184. }
  185. }