gather_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. //go:build !js
  4. // +build !js
  5. package ice
  6. import (
  7. "context"
  8. "crypto/tls"
  9. "io"
  10. "net"
  11. "net/url"
  12. "reflect"
  13. "sort"
  14. "strconv"
  15. "sync"
  16. "sync/atomic"
  17. "testing"
  18. "time"
  19. "github.com/pion/dtls/v2"
  20. "github.com/pion/dtls/v2/pkg/crypto/selfsign"
  21. "github.com/pion/logging"
  22. "github.com/pion/stun"
  23. "github.com/pion/transport/v2/test"
  24. "github.com/pion/turn/v2"
  25. "github.com/stretchr/testify/assert"
  26. "github.com/stretchr/testify/require"
  27. "golang.org/x/net/proxy"
  28. )
  29. func TestListenUDP(t *testing.T) {
  30. a, err := NewAgent(&AgentConfig{})
  31. assert.NoError(t, err)
  32. localIPs, err := localInterfaces(a.net, a.interfaceFilter, a.ipFilter, []NetworkType{NetworkTypeUDP4}, false)
  33. assert.NotEqual(t, len(localIPs), 0, "localInterfaces found no interfaces, unable to test")
  34. assert.NoError(t, err)
  35. ip := localIPs[0]
  36. conn, err := listenUDPInPortRange(a.net, a.log, 0, 0, udp, &net.UDPAddr{IP: ip, Port: 0})
  37. assert.NoError(t, err, "listenUDP error with no port restriction")
  38. assert.NotNil(t, conn, "listenUDP error with no port restriction return a nil conn")
  39. _, err = listenUDPInPortRange(a.net, a.log, 4999, 5000, udp, &net.UDPAddr{IP: ip, Port: 0})
  40. assert.Equal(t, err, ErrPort, "listenUDP with invalid port range did not return ErrPort")
  41. conn, err = listenUDPInPortRange(a.net, a.log, 5000, 5000, udp, &net.UDPAddr{IP: ip, Port: 0})
  42. assert.NoError(t, err, "listenUDP error with no port restriction")
  43. assert.NotNil(t, conn, "listenUDP error with no port restriction return a nil conn")
  44. _, port, err := net.SplitHostPort(conn.LocalAddr().String())
  45. assert.NoError(t, err)
  46. assert.Equal(t, port, "5000", "listenUDP with port restriction of 5000 listened on incorrect port")
  47. portMin := 5100
  48. portMax := 5109
  49. total := portMax - portMin + 1
  50. result := make([]int, 0, total)
  51. portRange := make([]int, 0, total)
  52. for i := 0; i < total; i++ {
  53. conn, err = listenUDPInPortRange(a.net, a.log, portMax, portMin, udp, &net.UDPAddr{IP: ip, Port: 0})
  54. assert.NoError(t, err, "listenUDP error with no port restriction")
  55. assert.NotNil(t, conn, "listenUDP error with no port restriction return a nil conn")
  56. _, port, err = net.SplitHostPort(conn.LocalAddr().String())
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. p, _ := strconv.Atoi(port)
  61. if p < portMin || p > portMax {
  62. t.Fatalf("listenUDP with port restriction [%d, %d] listened on incorrect port (%s)", portMin, portMax, port)
  63. }
  64. result = append(result, p)
  65. portRange = append(portRange, portMin+i)
  66. }
  67. if sort.IntsAreSorted(result) {
  68. t.Fatalf("listenUDP with port restriction [%d, %d], ports result should be random", portMin, portMax)
  69. }
  70. sort.Ints(result)
  71. if !reflect.DeepEqual(result, portRange) {
  72. t.Fatalf("listenUDP with port restriction [%d, %d], got:%v, want:%v", portMin, portMax, result, portRange)
  73. }
  74. _, err = listenUDPInPortRange(a.net, a.log, portMax, portMin, udp, &net.UDPAddr{IP: ip, Port: 0})
  75. assert.Equal(t, err, ErrPort, "listenUDP with port restriction [%d, %d], did not return ErrPort", portMin, portMax)
  76. assert.NoError(t, a.Close())
  77. }
  78. func TestGatherConcurrency(t *testing.T) {
  79. report := test.CheckRoutines(t)
  80. defer report()
  81. lim := test.TimeOut(time.Second * 30)
  82. defer lim.Stop()
  83. a, err := NewAgent(&AgentConfig{
  84. NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
  85. IncludeLoopback: true,
  86. })
  87. assert.NoError(t, err)
  88. candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
  89. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  90. candidateGatheredFunc()
  91. }))
  92. // Testing for panic
  93. for i := 0; i < 10; i++ {
  94. _ = a.GatherCandidates()
  95. }
  96. <-candidateGathered.Done()
  97. assert.NoError(t, a.Close())
  98. }
  99. func TestLoopbackCandidate(t *testing.T) {
  100. report := test.CheckRoutines(t)
  101. defer report()
  102. lim := test.TimeOut(time.Second * 30)
  103. defer lim.Stop()
  104. type testCase struct {
  105. name string
  106. agentConfig *AgentConfig
  107. loExpected bool
  108. }
  109. mux, err := NewMultiUDPMuxFromPort(12500)
  110. assert.NoError(t, err)
  111. muxWithLo, errlo := NewMultiUDPMuxFromPort(12501, UDPMuxFromPortWithLoopback())
  112. assert.NoError(t, errlo)
  113. testCases := []testCase{
  114. {
  115. name: "mux should not have loopback candidate",
  116. agentConfig: &AgentConfig{
  117. NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
  118. UDPMux: mux,
  119. },
  120. loExpected: false,
  121. },
  122. {
  123. name: "mux with loopback should not have loopback candidate",
  124. agentConfig: &AgentConfig{
  125. NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
  126. UDPMux: muxWithLo,
  127. },
  128. loExpected: true,
  129. },
  130. {
  131. name: "includeloopback enabled",
  132. agentConfig: &AgentConfig{
  133. NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
  134. IncludeLoopback: true,
  135. },
  136. loExpected: true,
  137. },
  138. {
  139. name: "includeloopback disabled",
  140. agentConfig: &AgentConfig{
  141. NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
  142. IncludeLoopback: false,
  143. },
  144. loExpected: false,
  145. },
  146. }
  147. for _, tc := range testCases {
  148. tcase := tc
  149. t.Run(tcase.name, func(t *testing.T) {
  150. a, err := NewAgent(tc.agentConfig)
  151. assert.NoError(t, err)
  152. candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
  153. var loopback int32
  154. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  155. if c != nil {
  156. if net.ParseIP(c.Address()).IsLoopback() {
  157. atomic.StoreInt32(&loopback, 1)
  158. }
  159. } else {
  160. candidateGatheredFunc()
  161. return
  162. }
  163. t.Log(c.NetworkType(), c.Priority(), c)
  164. }))
  165. assert.NoError(t, a.GatherCandidates())
  166. <-candidateGathered.Done()
  167. assert.NoError(t, a.Close())
  168. assert.Equal(t, tcase.loExpected, atomic.LoadInt32(&loopback) == 1)
  169. })
  170. }
  171. assert.NoError(t, mux.Close())
  172. assert.NoError(t, muxWithLo.Close())
  173. }
  174. // Assert that STUN gathering is done concurrently
  175. func TestSTUNConcurrency(t *testing.T) {
  176. report := test.CheckRoutines(t)
  177. defer report()
  178. lim := test.TimeOut(time.Second * 30)
  179. defer lim.Stop()
  180. serverPort := randomPort(t)
  181. serverListener, err := net.ListenPacket("udp4", "127.0.0.1:"+strconv.Itoa(serverPort))
  182. assert.NoError(t, err)
  183. server, err := turn.NewServer(turn.ServerConfig{
  184. Realm: "pion.ly",
  185. AuthHandler: optimisticAuthHandler,
  186. PacketConnConfigs: []turn.PacketConnConfig{
  187. {
  188. PacketConn: serverListener,
  189. RelayAddressGenerator: &turn.RelayAddressGeneratorNone{Address: "127.0.0.1"},
  190. },
  191. },
  192. })
  193. assert.NoError(t, err)
  194. urls := []*stun.URI{}
  195. for i := 0; i <= 10; i++ {
  196. urls = append(urls, &stun.URI{
  197. Scheme: stun.SchemeTypeSTUN,
  198. Host: "127.0.0.1",
  199. Port: serverPort + 1,
  200. })
  201. }
  202. urls = append(urls, &stun.URI{
  203. Scheme: stun.SchemeTypeSTUN,
  204. Host: "127.0.0.1",
  205. Port: serverPort,
  206. })
  207. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  208. IP: net.IP{127, 0, 0, 1},
  209. })
  210. require.NoError(t, err)
  211. defer func() {
  212. _ = listener.Close()
  213. }()
  214. a, err := NewAgent(&AgentConfig{
  215. NetworkTypes: supportedNetworkTypes(),
  216. Urls: urls,
  217. CandidateTypes: []CandidateType{CandidateTypeHost, CandidateTypeServerReflexive},
  218. TCPMux: NewTCPMuxDefault(
  219. TCPMuxParams{
  220. Listener: listener,
  221. Logger: logging.NewDefaultLoggerFactory().NewLogger("ice"),
  222. ReadBufferSize: 8,
  223. },
  224. ),
  225. })
  226. assert.NoError(t, err)
  227. candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
  228. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  229. if c == nil {
  230. candidateGatheredFunc()
  231. return
  232. }
  233. t.Log(c.NetworkType(), c.Priority(), c)
  234. }))
  235. assert.NoError(t, a.GatherCandidates())
  236. <-candidateGathered.Done()
  237. assert.NoError(t, a.Close())
  238. assert.NoError(t, server.Close())
  239. }
  240. // Assert that TURN gathering is done concurrently
  241. func TestTURNConcurrency(t *testing.T) {
  242. report := test.CheckRoutines(t)
  243. defer report()
  244. lim := test.TimeOut(time.Second * 30)
  245. defer lim.Stop()
  246. runTest := func(protocol stun.ProtoType, scheme stun.SchemeType, packetConn net.PacketConn, listener net.Listener, serverPort int) {
  247. packetConnConfigs := []turn.PacketConnConfig{}
  248. if packetConn != nil {
  249. packetConnConfigs = append(packetConnConfigs, turn.PacketConnConfig{
  250. PacketConn: packetConn,
  251. RelayAddressGenerator: &turn.RelayAddressGeneratorNone{Address: "127.0.0.1"},
  252. })
  253. }
  254. listenerConfigs := []turn.ListenerConfig{}
  255. if listener != nil {
  256. listenerConfigs = append(listenerConfigs, turn.ListenerConfig{
  257. Listener: listener,
  258. RelayAddressGenerator: &turn.RelayAddressGeneratorNone{Address: "127.0.0.1"},
  259. })
  260. }
  261. server, err := turn.NewServer(turn.ServerConfig{
  262. Realm: "pion.ly",
  263. AuthHandler: optimisticAuthHandler,
  264. PacketConnConfigs: packetConnConfigs,
  265. ListenerConfigs: listenerConfigs,
  266. })
  267. assert.NoError(t, err)
  268. urls := []*stun.URI{}
  269. for i := 0; i <= 10; i++ {
  270. urls = append(urls, &stun.URI{
  271. Scheme: scheme,
  272. Host: "127.0.0.1",
  273. Username: "username",
  274. Password: "password",
  275. Proto: protocol,
  276. Port: serverPort + 1 + i,
  277. })
  278. }
  279. urls = append(urls, &stun.URI{
  280. Scheme: scheme,
  281. Host: "127.0.0.1",
  282. Username: "username",
  283. Password: "password",
  284. Proto: protocol,
  285. Port: serverPort,
  286. })
  287. a, err := NewAgent(&AgentConfig{
  288. CandidateTypes: []CandidateType{CandidateTypeRelay},
  289. InsecureSkipVerify: true,
  290. NetworkTypes: supportedNetworkTypes(),
  291. Urls: urls,
  292. })
  293. assert.NoError(t, err)
  294. candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
  295. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  296. if c != nil {
  297. candidateGatheredFunc()
  298. }
  299. }))
  300. assert.NoError(t, a.GatherCandidates())
  301. <-candidateGathered.Done()
  302. assert.NoError(t, a.Close())
  303. assert.NoError(t, server.Close())
  304. }
  305. t.Run("UDP Relay", func(t *testing.T) {
  306. serverPort := randomPort(t)
  307. serverListener, err := net.ListenPacket("udp", "127.0.0.1:"+strconv.Itoa(serverPort))
  308. assert.NoError(t, err)
  309. runTest(stun.ProtoTypeUDP, stun.SchemeTypeTURN, serverListener, nil, serverPort)
  310. })
  311. t.Run("TCP Relay", func(t *testing.T) {
  312. serverPort := randomPort(t)
  313. serverListener, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(serverPort))
  314. assert.NoError(t, err)
  315. runTest(stun.ProtoTypeTCP, stun.SchemeTypeTURN, nil, serverListener, serverPort)
  316. })
  317. t.Run("TLS Relay", func(t *testing.T) {
  318. certificate, genErr := selfsign.GenerateSelfSigned()
  319. assert.NoError(t, genErr)
  320. serverPort := randomPort(t)
  321. serverListener, err := tls.Listen("tcp", "127.0.0.1:"+strconv.Itoa(serverPort), &tls.Config{ //nolint:gosec
  322. Certificates: []tls.Certificate{certificate},
  323. })
  324. assert.NoError(t, err)
  325. runTest(stun.ProtoTypeTCP, stun.SchemeTypeTURNS, nil, serverListener, serverPort)
  326. })
  327. t.Run("DTLS Relay", func(t *testing.T) {
  328. certificate, genErr := selfsign.GenerateSelfSigned()
  329. assert.NoError(t, genErr)
  330. serverPort := randomPort(t)
  331. serverListener, err := dtls.Listen("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: serverPort}, &dtls.Config{
  332. Certificates: []tls.Certificate{certificate},
  333. })
  334. assert.NoError(t, err)
  335. runTest(stun.ProtoTypeUDP, stun.SchemeTypeTURNS, nil, serverListener, serverPort)
  336. })
  337. }
  338. // Assert that STUN and TURN gathering are done concurrently
  339. func TestSTUNTURNConcurrency(t *testing.T) {
  340. report := test.CheckRoutines(t)
  341. defer report()
  342. lim := test.TimeOut(time.Second * 8)
  343. defer lim.Stop()
  344. serverPort := randomPort(t)
  345. serverListener, err := net.ListenPacket("udp4", "127.0.0.1:"+strconv.Itoa(serverPort))
  346. assert.NoError(t, err)
  347. server, err := turn.NewServer(turn.ServerConfig{
  348. Realm: "pion.ly",
  349. AuthHandler: optimisticAuthHandler,
  350. PacketConnConfigs: []turn.PacketConnConfig{
  351. {
  352. PacketConn: serverListener,
  353. RelayAddressGenerator: &turn.RelayAddressGeneratorNone{Address: "127.0.0.1"},
  354. },
  355. },
  356. })
  357. assert.NoError(t, err)
  358. urls := []*stun.URI{}
  359. for i := 0; i <= 10; i++ {
  360. urls = append(urls, &stun.URI{
  361. Scheme: stun.SchemeTypeSTUN,
  362. Host: "127.0.0.1",
  363. Port: serverPort + 1,
  364. })
  365. }
  366. urls = append(urls, &stun.URI{
  367. Scheme: stun.SchemeTypeTURN,
  368. Proto: stun.ProtoTypeUDP,
  369. Host: "127.0.0.1",
  370. Port: serverPort,
  371. Username: "username",
  372. Password: "password",
  373. })
  374. a, err := NewAgent(&AgentConfig{
  375. NetworkTypes: supportedNetworkTypes(),
  376. Urls: urls,
  377. CandidateTypes: []CandidateType{CandidateTypeServerReflexive, CandidateTypeRelay},
  378. })
  379. assert.NoError(t, err)
  380. {
  381. gatherLim := test.TimeOut(time.Second * 3) // As TURN and STUN should be checked in parallel, this should complete before the default STUN timeout (5s)
  382. candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
  383. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  384. if c != nil {
  385. candidateGatheredFunc()
  386. }
  387. }))
  388. assert.NoError(t, a.GatherCandidates())
  389. <-candidateGathered.Done()
  390. gatherLim.Stop()
  391. }
  392. assert.NoError(t, a.Close())
  393. assert.NoError(t, server.Close())
  394. }
  395. // Assert that srflx candidates can be gathered from TURN servers
  396. //
  397. // When TURN servers are utilized, both types of candidates
  398. // (i.e. srflx and relay) are obtained from the TURN server.
  399. //
  400. // https://tools.ietf.org/html/rfc5245#section-2.1
  401. func TestTURNSrflx(t *testing.T) {
  402. report := test.CheckRoutines(t)
  403. defer report()
  404. lim := test.TimeOut(time.Second * 30)
  405. defer lim.Stop()
  406. serverPort := randomPort(t)
  407. serverListener, err := net.ListenPacket("udp4", "127.0.0.1:"+strconv.Itoa(serverPort))
  408. assert.NoError(t, err)
  409. server, err := turn.NewServer(turn.ServerConfig{
  410. Realm: "pion.ly",
  411. AuthHandler: optimisticAuthHandler,
  412. PacketConnConfigs: []turn.PacketConnConfig{
  413. {
  414. PacketConn: serverListener,
  415. RelayAddressGenerator: &turn.RelayAddressGeneratorNone{Address: "127.0.0.1"},
  416. },
  417. },
  418. })
  419. assert.NoError(t, err)
  420. urls := []*stun.URI{{
  421. Scheme: stun.SchemeTypeTURN,
  422. Proto: stun.ProtoTypeUDP,
  423. Host: "127.0.0.1",
  424. Port: serverPort,
  425. Username: "username",
  426. Password: "password",
  427. }}
  428. a, err := NewAgent(&AgentConfig{
  429. NetworkTypes: supportedNetworkTypes(),
  430. Urls: urls,
  431. CandidateTypes: []CandidateType{CandidateTypeServerReflexive, CandidateTypeRelay},
  432. })
  433. assert.NoError(t, err)
  434. candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
  435. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  436. if c != nil && c.Type() == CandidateTypeServerReflexive {
  437. candidateGatheredFunc()
  438. }
  439. }))
  440. assert.NoError(t, a.GatherCandidates())
  441. <-candidateGathered.Done()
  442. assert.NoError(t, a.Close())
  443. assert.NoError(t, server.Close())
  444. }
  445. func TestCloseConnLog(t *testing.T) {
  446. a, err := NewAgent(&AgentConfig{})
  447. assert.NoError(t, err)
  448. closeConnAndLog(nil, a.log, "normal nil")
  449. var nc *net.UDPConn
  450. closeConnAndLog(nc, a.log, "nil ptr")
  451. assert.NoError(t, a.Close())
  452. }
  453. type mockProxy struct {
  454. proxyWasDialed func()
  455. }
  456. type mockConn struct{}
  457. func (m *mockConn) Read([]byte) (n int, err error) { return 0, io.EOF }
  458. func (m *mockConn) Write([]byte) (int, error) { return 0, io.EOF }
  459. func (m *mockConn) Close() error { return io.EOF }
  460. func (m *mockConn) LocalAddr() net.Addr { return &net.TCPAddr{} }
  461. func (m *mockConn) RemoteAddr() net.Addr { return &net.TCPAddr{} }
  462. func (m *mockConn) SetDeadline(time.Time) error { return io.EOF }
  463. func (m *mockConn) SetReadDeadline(time.Time) error { return io.EOF }
  464. func (m *mockConn) SetWriteDeadline(time.Time) error { return io.EOF }
  465. func (m *mockProxy) Dial(string, string) (net.Conn, error) {
  466. m.proxyWasDialed()
  467. return &mockConn{}, nil
  468. }
  469. func TestTURNProxyDialer(t *testing.T) {
  470. report := test.CheckRoutines(t)
  471. defer report()
  472. lim := test.TimeOut(time.Second * 30)
  473. defer lim.Stop()
  474. proxyWasDialed, proxyWasDialedFunc := context.WithCancel(context.Background())
  475. proxy.RegisterDialerType("tcp", func(*url.URL, proxy.Dialer) (proxy.Dialer, error) {
  476. return &mockProxy{proxyWasDialedFunc}, nil
  477. })
  478. tcpProxyURI, err := url.Parse("tcp://fakeproxy:3128")
  479. assert.NoError(t, err)
  480. proxyDialer, err := proxy.FromURL(tcpProxyURI, proxy.Direct)
  481. assert.NoError(t, err)
  482. a, err := NewAgent(&AgentConfig{
  483. CandidateTypes: []CandidateType{CandidateTypeRelay},
  484. NetworkTypes: supportedNetworkTypes(),
  485. Urls: []*stun.URI{
  486. {
  487. Scheme: stun.SchemeTypeTURN,
  488. Host: "127.0.0.1",
  489. Username: "username",
  490. Password: "password",
  491. Proto: stun.ProtoTypeTCP,
  492. Port: 5000,
  493. },
  494. },
  495. ProxyDialer: proxyDialer,
  496. })
  497. assert.NoError(t, err)
  498. candidateGatherFinish, candidateGatherFinishFunc := context.WithCancel(context.Background())
  499. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  500. if c == nil {
  501. candidateGatherFinishFunc()
  502. }
  503. }))
  504. assert.NoError(t, a.GatherCandidates())
  505. <-candidateGatherFinish.Done()
  506. <-proxyWasDialed.Done()
  507. assert.NoError(t, a.Close())
  508. }
  509. // TestUDPMuxDefaultWithNAT1To1IPsUsage asserts that candidates
  510. // are given and connections are valid when using UDPMuxDefault and NAT1To1IPs.
  511. func TestUDPMuxDefaultWithNAT1To1IPsUsage(t *testing.T) {
  512. report := test.CheckRoutines(t)
  513. defer report()
  514. lim := test.TimeOut(time.Second * 30)
  515. defer lim.Stop()
  516. conn, err := net.ListenPacket("udp4", ":0")
  517. assert.NoError(t, err)
  518. defer func() {
  519. _ = conn.Close()
  520. }()
  521. mux := NewUDPMuxDefault(UDPMuxParams{
  522. UDPConn: conn,
  523. })
  524. defer func() {
  525. _ = mux.Close()
  526. }()
  527. a, err := NewAgent(&AgentConfig{
  528. NAT1To1IPs: []string{"1.2.3.4"},
  529. NAT1To1IPCandidateType: CandidateTypeHost,
  530. UDPMux: mux,
  531. })
  532. assert.NoError(t, err)
  533. gatherCandidateDone := make(chan struct{})
  534. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  535. if c == nil {
  536. close(gatherCandidateDone)
  537. } else {
  538. assert.Equal(t, "1.2.3.4", c.Address())
  539. }
  540. }))
  541. assert.NoError(t, a.GatherCandidates())
  542. <-gatherCandidateDone
  543. assert.NotEqual(t, 0, len(mux.connsIPv4))
  544. assert.NoError(t, a.Close())
  545. }
  546. // Assert that candidates are given for each mux in a MultiUDPMux
  547. func TestMultiUDPMuxUsage(t *testing.T) {
  548. report := test.CheckRoutines(t)
  549. defer report()
  550. lim := test.TimeOut(time.Second * 30)
  551. defer lim.Stop()
  552. var expectedPorts []int
  553. var udpMuxInstances []UDPMux
  554. for i := 0; i < 3; i++ {
  555. port := randomPort(t)
  556. conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}, Port: port})
  557. assert.NoError(t, err)
  558. defer func() {
  559. _ = conn.Close()
  560. }()
  561. expectedPorts = append(expectedPorts, port)
  562. muxDefault := NewUDPMuxDefault(UDPMuxParams{UDPConn: conn})
  563. udpMuxInstances = append(udpMuxInstances, muxDefault)
  564. idx := i
  565. defer func() {
  566. _ = udpMuxInstances[idx].Close()
  567. }()
  568. }
  569. a, err := NewAgent(&AgentConfig{
  570. NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
  571. CandidateTypes: []CandidateType{CandidateTypeHost},
  572. UDPMux: NewMultiUDPMuxDefault(udpMuxInstances...),
  573. })
  574. assert.NoError(t, err)
  575. candidateCh := make(chan Candidate)
  576. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  577. if c == nil {
  578. close(candidateCh)
  579. return
  580. }
  581. candidateCh <- c
  582. }))
  583. assert.NoError(t, a.GatherCandidates())
  584. portFound := make(map[int]bool)
  585. for c := range candidateCh {
  586. portFound[c.Port()] = true
  587. assert.True(t, c.NetworkType().IsUDP(), "All candidates should be UDP")
  588. }
  589. assert.Len(t, portFound, len(expectedPorts))
  590. for _, port := range expectedPorts {
  591. assert.True(t, portFound[port], "There should be a candidate for each UDP mux port")
  592. }
  593. assert.NoError(t, a.Close())
  594. }
  595. // Assert that candidates are given for each mux in a MultiTCPMux
  596. func TestMultiTCPMuxUsage(t *testing.T) {
  597. report := test.CheckRoutines(t)
  598. defer report()
  599. lim := test.TimeOut(time.Second * 30)
  600. defer lim.Stop()
  601. var expectedPorts []int
  602. var tcpMuxInstances []TCPMux
  603. for i := 0; i < 3; i++ {
  604. port := randomPort(t)
  605. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  606. IP: net.IP{127, 0, 0, 1},
  607. Port: port,
  608. })
  609. assert.NoError(t, err)
  610. defer func() {
  611. _ = listener.Close()
  612. }()
  613. expectedPorts = append(expectedPorts, port)
  614. tcpMuxInstances = append(tcpMuxInstances, NewTCPMuxDefault(TCPMuxParams{
  615. Listener: listener,
  616. ReadBufferSize: 8,
  617. }))
  618. }
  619. a, err := NewAgent(&AgentConfig{
  620. NetworkTypes: supportedNetworkTypes(),
  621. CandidateTypes: []CandidateType{CandidateTypeHost},
  622. TCPMux: NewMultiTCPMuxDefault(tcpMuxInstances...),
  623. })
  624. assert.NoError(t, err)
  625. candidateCh := make(chan Candidate)
  626. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  627. if c == nil {
  628. close(candidateCh)
  629. return
  630. }
  631. candidateCh <- c
  632. }))
  633. assert.NoError(t, a.GatherCandidates())
  634. portFound := make(map[int]bool)
  635. for c := range candidateCh {
  636. activeCandidate := c.Port() == 0
  637. if c.NetworkType().IsTCP() && !activeCandidate {
  638. portFound[c.Port()] = true
  639. }
  640. }
  641. assert.Len(t, portFound, len(expectedPorts))
  642. for _, port := range expectedPorts {
  643. assert.True(t, portFound[port], "There should be a candidate for each TCP mux port")
  644. }
  645. assert.NoError(t, a.Close())
  646. }
  647. // Assert that UniversalUDPMux is used while gathering when configured in the Agent
  648. func TestUniversalUDPMuxUsage(t *testing.T) {
  649. report := test.CheckRoutines(t)
  650. defer report()
  651. lim := test.TimeOut(time.Second * 30)
  652. defer lim.Stop()
  653. conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}, Port: randomPort(t)})
  654. assert.NoError(t, err)
  655. defer func() {
  656. _ = conn.Close()
  657. }()
  658. udpMuxSrflx := &universalUDPMuxMock{
  659. conn: conn,
  660. }
  661. numSTUNS := 3
  662. urls := []*stun.URI{}
  663. for i := 0; i < numSTUNS; i++ {
  664. urls = append(urls, &stun.URI{
  665. Scheme: SchemeTypeSTUN,
  666. Host: "127.0.0.1",
  667. Port: 3478 + i,
  668. })
  669. }
  670. a, err := NewAgent(&AgentConfig{
  671. NetworkTypes: supportedNetworkTypes(),
  672. Urls: urls,
  673. CandidateTypes: []CandidateType{CandidateTypeServerReflexive},
  674. UDPMuxSrflx: udpMuxSrflx,
  675. })
  676. assert.NoError(t, err)
  677. candidateGathered, candidateGatheredFunc := context.WithCancel(context.Background())
  678. assert.NoError(t, a.OnCandidate(func(c Candidate) {
  679. if c == nil {
  680. candidateGatheredFunc()
  681. return
  682. }
  683. t.Log(c.NetworkType(), c.Priority(), c)
  684. }))
  685. assert.NoError(t, a.GatherCandidates())
  686. <-candidateGathered.Done()
  687. assert.NoError(t, a.Close())
  688. // Twice because of 2 STUN servers configured
  689. assert.Equal(t, numSTUNS, udpMuxSrflx.getXORMappedAddrUsedTimes, "expected times that GetXORMappedAddr should be called")
  690. // One for Restart() when agent has been initialized and one time when Close() the agent
  691. assert.Equal(t, 2, udpMuxSrflx.removeConnByUfragTimes, "expected times that RemoveConnByUfrag should be called")
  692. // Twice because of 2 STUN servers configured
  693. assert.Equal(t, numSTUNS, udpMuxSrflx.getConnForURLTimes, "expected times that GetConnForURL should be called")
  694. }
  695. type universalUDPMuxMock struct {
  696. UDPMux
  697. getXORMappedAddrUsedTimes int
  698. removeConnByUfragTimes int
  699. getConnForURLTimes int
  700. mu sync.Mutex
  701. conn *net.UDPConn
  702. }
  703. func (m *universalUDPMuxMock) GetRelayedAddr(net.Addr, time.Duration) (*net.Addr, error) {
  704. return nil, errNotImplemented
  705. }
  706. func (m *universalUDPMuxMock) GetConnForURL(string, string, net.Addr) (net.PacketConn, error) {
  707. m.mu.Lock()
  708. defer m.mu.Unlock()
  709. m.getConnForURLTimes++
  710. return m.conn, nil
  711. }
  712. func (m *universalUDPMuxMock) GetXORMappedAddr(net.Addr, time.Duration) (*stun.XORMappedAddress, error) {
  713. m.mu.Lock()
  714. defer m.mu.Unlock()
  715. m.getXORMappedAddrUsedTimes++
  716. return &stun.XORMappedAddress{IP: net.IP{100, 64, 0, 1}, Port: 77878}, nil
  717. }
  718. func (m *universalUDPMuxMock) RemoveConnByUfrag(string) {
  719. m.mu.Lock()
  720. defer m.mu.Unlock()
  721. m.removeConnByUfragTimes++
  722. }
  723. func (m *universalUDPMuxMock) GetListenAddresses() []net.Addr {
  724. return []net.Addr{m.conn.LocalAddr()}
  725. }