udp_mux_universal.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. package ice
  4. import (
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/pion/logging"
  9. "github.com/pion/stun"
  10. "github.com/pion/transport/v2"
  11. )
  12. // UniversalUDPMux allows multiple connections to go over a single UDP port for
  13. // host, server reflexive and relayed candidates.
  14. // Actual connection muxing is happening in the UDPMux.
  15. type UniversalUDPMux interface {
  16. UDPMux
  17. GetXORMappedAddr(stunAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error)
  18. GetRelayedAddr(turnAddr net.Addr, deadline time.Duration) (*net.Addr, error)
  19. GetConnForURL(ufrag string, url string, addr net.Addr) (net.PacketConn, error)
  20. }
  21. // UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn overriding ReadFrom.
  22. // It the passes packets to the UDPMux that does the actual connection muxing.
  23. type UniversalUDPMuxDefault struct {
  24. *UDPMuxDefault
  25. params UniversalUDPMuxParams
  26. // Since we have a shared socket, for srflx candidates it makes sense to have a shared mapped address across all the agents
  27. // stun.XORMappedAddress indexed by the STUN server addr
  28. xorMappedMap map[string]*xorMapped
  29. }
  30. // UniversalUDPMuxParams are parameters for UniversalUDPMux server reflexive.
  31. type UniversalUDPMuxParams struct {
  32. Logger logging.LeveledLogger
  33. UDPConn net.PacketConn
  34. XORMappedAddrCacheTTL time.Duration
  35. Net transport.Net
  36. }
  37. // NewUniversalUDPMuxDefault creates an implementation of UniversalUDPMux embedding UDPMux
  38. func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDefault {
  39. if params.Logger == nil {
  40. params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice")
  41. }
  42. if params.XORMappedAddrCacheTTL == 0 {
  43. params.XORMappedAddrCacheTTL = time.Second * 25
  44. }
  45. m := &UniversalUDPMuxDefault{
  46. params: params,
  47. xorMappedMap: make(map[string]*xorMapped),
  48. }
  49. // Wrap UDP connection, process server reflexive messages
  50. // before they are passed to the UDPMux connection handler (connWorker)
  51. m.params.UDPConn = &udpConn{
  52. PacketConn: params.UDPConn,
  53. mux: m,
  54. logger: params.Logger,
  55. }
  56. // Embed UDPMux
  57. udpMuxParams := UDPMuxParams{
  58. Logger: params.Logger,
  59. UDPConn: m.params.UDPConn,
  60. Net: m.params.Net,
  61. }
  62. m.UDPMuxDefault = NewUDPMuxDefault(udpMuxParams)
  63. // [Psiphon]
  64. // See race condition comment in NewUDPMuxDefault.
  65. go m.UDPMuxDefault.connWorker()
  66. return m
  67. }
  68. // udpConn is a wrapper around UDPMux conn that overrides ReadFrom and handles STUN/TURN packets
  69. type udpConn struct {
  70. net.PacketConn
  71. mux *UniversalUDPMuxDefault
  72. logger logging.LeveledLogger
  73. }
  74. // GetRelayedAddr creates relayed connection to the given TURN service and returns the relayed addr.
  75. // Not implemented yet.
  76. func (m *UniversalUDPMuxDefault) GetRelayedAddr(net.Addr, time.Duration) (*net.Addr, error) {
  77. return nil, errNotImplemented
  78. }
  79. // GetConnForURL add uniques to the muxed connection by concatenating ufrag and URL (e.g. STUN URL) to be able to support multiple STUN/TURN servers
  80. // and return a unique connection per server.
  81. func (m *UniversalUDPMuxDefault) GetConnForURL(ufrag string, url string, addr net.Addr) (net.PacketConn, error) {
  82. return m.UDPMuxDefault.GetConn(fmt.Sprintf("%s%s", ufrag, url), addr)
  83. }
  84. // ReadFrom is called by UDPMux connWorker and handles packets coming from the STUN server discovering a mapped address.
  85. // It passes processed packets further to the UDPMux (maybe this is not really necessary).
  86. func (c *udpConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  87. n, addr, err = c.PacketConn.ReadFrom(p)
  88. if err != nil {
  89. return
  90. }
  91. if stun.IsMessage(p[:n]) {
  92. msg := &stun.Message{
  93. Raw: append([]byte{}, p[:n]...),
  94. }
  95. if err = msg.Decode(); err != nil {
  96. c.logger.Warnf("Failed to handle decode ICE from %s: %v", addr.String(), err)
  97. err = nil
  98. return
  99. }
  100. udpAddr, ok := addr.(*net.UDPAddr)
  101. if !ok {
  102. // Message about this err will be logged in the UDPMux
  103. return
  104. }
  105. if c.mux.isXORMappedResponse(msg, udpAddr.String()) {
  106. err = c.mux.handleXORMappedResponse(udpAddr, msg)
  107. if err != nil {
  108. c.logger.Debugf("%w: %v", errGetXorMappedAddrResponse, err)
  109. err = nil
  110. }
  111. return
  112. }
  113. }
  114. return n, addr, err
  115. }
  116. // isXORMappedResponse indicates whether the message is a XORMappedAddress and is coming from the known STUN server.
  117. func (m *UniversalUDPMuxDefault) isXORMappedResponse(msg *stun.Message, stunAddr string) bool {
  118. m.mu.Lock()
  119. defer m.mu.Unlock()
  120. // Check first if it is a STUN server address because remote peer can also send similar messages but as a BindingSuccess
  121. _, ok := m.xorMappedMap[stunAddr]
  122. _, err := msg.Get(stun.AttrXORMappedAddress)
  123. return err == nil && ok
  124. }
  125. // handleXORMappedResponse parses response from the STUN server, extracts XORMappedAddress attribute
  126. // and set the mapped address for the server
  127. func (m *UniversalUDPMuxDefault) handleXORMappedResponse(stunAddr *net.UDPAddr, msg *stun.Message) error {
  128. m.mu.Lock()
  129. defer m.mu.Unlock()
  130. mappedAddr, ok := m.xorMappedMap[stunAddr.String()]
  131. if !ok {
  132. return errNoXorAddrMapping
  133. }
  134. var addr stun.XORMappedAddress
  135. if err := addr.GetFrom(msg); err != nil {
  136. return err
  137. }
  138. m.xorMappedMap[stunAddr.String()] = mappedAddr
  139. mappedAddr.SetAddr(&addr)
  140. return nil
  141. }
  142. // GetXORMappedAddr returns *stun.XORMappedAddress if already present for a given STUN server.
  143. // Makes a STUN binding request to discover mapped address otherwise.
  144. // Blocks until the stun.XORMappedAddress has been discovered or deadline.
  145. // Method is safe for concurrent use.
  146. func (m *UniversalUDPMuxDefault) GetXORMappedAddr(serverAddr net.Addr, deadline time.Duration) (*stun.XORMappedAddress, error) {
  147. m.mu.Lock()
  148. mappedAddr, ok := m.xorMappedMap[serverAddr.String()]
  149. // If we already have a mapping for this STUN server (address already received)
  150. // and if it is not too old we return it without making a new request to STUN server
  151. if ok {
  152. if mappedAddr.expired() {
  153. mappedAddr.closeWaiters()
  154. delete(m.xorMappedMap, serverAddr.String())
  155. ok = false
  156. } else if mappedAddr.pending() {
  157. ok = false
  158. }
  159. }
  160. m.mu.Unlock()
  161. if ok {
  162. return mappedAddr.addr, nil
  163. }
  164. // Otherwise, make a STUN request to discover the address
  165. // or wait for already sent request to complete
  166. waitAddrReceived, err := m.writeSTUN(serverAddr)
  167. if err != nil {
  168. return nil, fmt.Errorf("%w: %s", errWriteSTUNMessage, err) //nolint:errorlint
  169. }
  170. // Block until response was handled by the connWorker routine and XORMappedAddress was updated
  171. select {
  172. case <-waitAddrReceived:
  173. // When channel closed, addr was obtained
  174. m.mu.Lock()
  175. mappedAddr := *m.xorMappedMap[serverAddr.String()]
  176. m.mu.Unlock()
  177. if mappedAddr.addr == nil {
  178. return nil, errNoXorAddrMapping
  179. }
  180. return mappedAddr.addr, nil
  181. case <-time.After(deadline):
  182. return nil, errXORMappedAddrTimeout
  183. }
  184. }
  185. // writeSTUN sends a STUN request via UDP conn.
  186. //
  187. // The returned channel is closed when the STUN response has been received.
  188. // Method is safe for concurrent use.
  189. func (m *UniversalUDPMuxDefault) writeSTUN(serverAddr net.Addr) (chan struct{}, error) {
  190. m.mu.Lock()
  191. defer m.mu.Unlock()
  192. // If record present in the map, we already sent a STUN request,
  193. // just wait when waitAddrReceived will be closed
  194. addrMap, ok := m.xorMappedMap[serverAddr.String()]
  195. if !ok {
  196. addrMap = &xorMapped{
  197. expiresAt: time.Now().Add(m.params.XORMappedAddrCacheTTL),
  198. waitAddrReceived: make(chan struct{}),
  199. }
  200. m.xorMappedMap[serverAddr.String()] = addrMap
  201. }
  202. req, err := stun.Build(stun.BindingRequest, stun.TransactionID)
  203. if err != nil {
  204. return nil, err
  205. }
  206. if _, err = m.params.UDPConn.WriteTo(req.Raw, serverAddr); err != nil {
  207. return nil, err
  208. }
  209. return addrMap.waitAddrReceived, nil
  210. }
  211. type xorMapped struct {
  212. addr *stun.XORMappedAddress
  213. waitAddrReceived chan struct{}
  214. expiresAt time.Time
  215. }
  216. func (a *xorMapped) closeWaiters() {
  217. select {
  218. case <-a.waitAddrReceived:
  219. // Notify was close, ok, that means we received duplicate response just exit
  220. break
  221. default:
  222. // Notify tha twe have a new addr
  223. close(a.waitAddrReceived)
  224. }
  225. }
  226. func (a *xorMapped) pending() bool {
  227. return a.addr == nil
  228. }
  229. func (a *xorMapped) expired() bool {
  230. return a.expiresAt.Before(time.Now())
  231. }
  232. func (a *xorMapped) SetAddr(addr *stun.XORMappedAddress) {
  233. a.addr = addr
  234. a.closeWaiters()
  235. }