| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
- // SPDX-License-Identifier: MIT
- package ice
- import (
- "errors"
- "io"
- "net"
- "os"
- "strings"
- "sync"
- "github.com/pion/logging"
- "github.com/pion/stun"
- "github.com/pion/transport/v2"
- "github.com/pion/transport/v2/stdnet"
- )
- // UDPMux allows multiple connections to go over a single UDP port
- type UDPMux interface {
- io.Closer
- GetConn(ufrag string, addr net.Addr) (net.PacketConn, error)
- RemoveConnByUfrag(ufrag string)
- GetListenAddresses() []net.Addr
- }
- // UDPMuxDefault is an implementation of the interface
- type UDPMuxDefault struct {
- params UDPMuxParams
- closedChan chan struct{}
- closeOnce sync.Once
- // connsIPv4 and connsIPv6 are maps of all udpMuxedConn indexed by ufrag|network|candidateType
- connsIPv4, connsIPv6 map[string]*udpMuxedConn
- addressMapMu sync.RWMutex
- addressMap map[udpMuxedConnAddr]*udpMuxedConn
- // Buffer pool to recycle buffers for net.UDPAddr encodes/decodes
- pool *sync.Pool
- mu sync.Mutex
- // For UDP connection listen at unspecified address
- localAddrsForUnspecified []net.Addr
- }
- // UDPMuxParams are parameters for UDPMux.
- type UDPMuxParams struct {
- Logger logging.LeveledLogger
- UDPConn net.PacketConn
- // Required for gathering local addresses
- // in case a un UDPConn is passed which does not
- // bind to a specific local address.
- Net transport.Net
- }
- // NewUDPMuxDefault creates an implementation of UDPMux
- func NewUDPMuxDefault(params UDPMuxParams) *UDPMuxDefault {
- if params.Logger == nil {
- params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice")
- }
- var localAddrsForUnspecified []net.Addr
- if addr, ok := params.UDPConn.LocalAddr().(*net.UDPAddr); !ok {
- params.Logger.Errorf("LocalAddr is not a net.UDPAddr, got %T", params.UDPConn.LocalAddr())
- } else if ok && addr.IP.IsUnspecified() {
- // For unspecified addresses, the correct behavior is to return errListenUnspecified, but
- // it will break the applications that are already using unspecified UDP connection
- // with UDPMuxDefault, so print a warn log and create a local address list for mux.
- params.Logger.Warn("UDPMuxDefault should not listening on unspecified address, use NewMultiUDPMuxFromPort instead")
- var networks []NetworkType
- switch {
- case addr.IP.To4() != nil:
- networks = []NetworkType{NetworkTypeUDP4}
- case addr.IP.To16() != nil:
- networks = []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}
- default:
- params.Logger.Errorf("LocalAddr expected IPV4 or IPV6, got %T", params.UDPConn.LocalAddr())
- }
- if len(networks) > 0 {
- if params.Net == nil {
- var err error
- if params.Net, err = stdnet.NewNet(); err != nil {
- params.Logger.Errorf("Failed to get create network: %v", err)
- }
- }
- ips, err := localInterfaces(params.Net, nil, nil, networks, true)
- if err == nil {
- for _, ip := range ips {
- localAddrsForUnspecified = append(localAddrsForUnspecified, &net.UDPAddr{IP: ip, Port: addr.Port})
- }
- } else {
- params.Logger.Errorf("Failed to get local interfaces for unspecified addr: %v", err)
- }
- }
- }
- m := &UDPMuxDefault{
- addressMap: map[udpMuxedConnAddr]*udpMuxedConn{},
- params: params,
- connsIPv4: make(map[string]*udpMuxedConn),
- connsIPv6: make(map[string]*udpMuxedConn),
- closedChan: make(chan struct{}, 1),
- pool: &sync.Pool{
- New: func() interface{} {
- // Big enough buffer to fit both packet and address
- return newBufferHolder(receiveMTU)
- },
- },
- localAddrsForUnspecified: localAddrsForUnspecified,
- }
- // [Psiphon]
- //
- // - Currently, pion/ice code produces the following race condition due to
- // NewUDPMuxDefault launching go m.connWorker() before the called
- // assigns m.UDPMuxDefault = NewUDPMuxDefault(udpMuxParams), while
- // connWorker may access fields in the
- // UniversalUDPMuxDefault.UDPMuxDefault embedded struct.
- //
- // - For Psiphon's use case, the simple workaround is to delay launching
- // go m.connWorker() until after the assignment in
- // NewUniversalUDPMuxDefault. This isn't a general purpose fix since it
- // means NewUDPMuxDefault by itself won't work.
- //
- // - Note that the IsPsiphon flag/check added for
- // gatherCandidatesSrflxUDPMux also checks that this fix is in place.
- //
- //
- // ==================
- // WARNING: DATA RACE
- // Read at 0x00c000ee28c0 by goroutine 22319:
- // github.com/pion/ice/v2.(*UniversalUDPMuxDefault).isXORMappedResponse()
- // /pion/ice/v2/udp_mux_universal.go:136 +0x40
- // github.com/pion/ice/v2.(*udpConn).ReadFrom()
- // /pion/ice/v2/udp_mux_universal.go:122 +0x234
- // github.com/pion/ice/v2.(*UDPMuxDefault).connWorker()
- // /pion/ice/v2/udp_mux.go:286 +0xd4
- // github.com/pion/ice/v2.NewUDPMuxDefault.func2()
- // /pion/ice/v2/udp_mux.go:122 +0x34
- //
- // Previous write at 0x00c000ee28c0 by goroutine 22315:
- // github.com/pion/ice/v2.NewUniversalUDPMuxDefault()
- // /pion/ice/v2/udp_mux_universal.go:73 +0x354
- // github.com/pion/webrtc/v3.NewICEUniversalUDPMux()
- // /pion/webrtc/v3/icemux.go:39 +0x2b4
- // ==================
- //go m.connWorker()
- return m
- }
- // LocalAddr returns the listening address of this UDPMuxDefault
- func (m *UDPMuxDefault) LocalAddr() net.Addr {
- return m.params.UDPConn.LocalAddr()
- }
- // GetListenAddresses returns the list of addresses that this mux is listening on
- func (m *UDPMuxDefault) GetListenAddresses() []net.Addr {
- if len(m.localAddrsForUnspecified) > 0 {
- return m.localAddrsForUnspecified
- }
- return []net.Addr{m.LocalAddr()}
- }
- // GetConn returns a PacketConn given the connection's ufrag and network address
- // creates the connection if an existing one can't be found
- func (m *UDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketConn, error) {
- // don't check addr for mux using unspecified address
- if len(m.localAddrsForUnspecified) == 0 && m.params.UDPConn.LocalAddr().String() != addr.String() {
- return nil, errInvalidAddress
- }
- var isIPv6 bool
- if udpAddr, _ := addr.(*net.UDPAddr); udpAddr != nil && udpAddr.IP.To4() == nil {
- isIPv6 = true
- }
- m.mu.Lock()
- defer m.mu.Unlock()
- if m.IsClosed() {
- return nil, io.ErrClosedPipe
- }
- if conn, ok := m.getConn(ufrag, isIPv6); ok {
- return conn, nil
- }
- c := m.createMuxedConn(ufrag)
- go func() {
- <-c.CloseChannel()
- m.RemoveConnByUfrag(ufrag)
- }()
- if isIPv6 {
- m.connsIPv6[ufrag] = c
- } else {
- m.connsIPv4[ufrag] = c
- }
- return c, nil
- }
- // RemoveConnByUfrag stops and removes the muxed packet connection
- func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
- removedConns := make([]*udpMuxedConn, 0, 2)
- // Keep lock section small to avoid deadlock with conn lock
- m.mu.Lock()
- if c, ok := m.connsIPv4[ufrag]; ok {
- delete(m.connsIPv4, ufrag)
- removedConns = append(removedConns, c)
- }
- if c, ok := m.connsIPv6[ufrag]; ok {
- delete(m.connsIPv6, ufrag)
- removedConns = append(removedConns, c)
- }
- m.mu.Unlock()
- if len(removedConns) == 0 {
- // No need to lock if no connection was found
- return
- }
- m.addressMapMu.Lock()
- defer m.addressMapMu.Unlock()
- for _, c := range removedConns {
- addresses := c.getAddresses()
- for _, addr := range addresses {
- delete(m.addressMap, addr)
- }
- }
- }
- // IsClosed returns true if the mux had been closed
- func (m *UDPMuxDefault) IsClosed() bool {
- select {
- case <-m.closedChan:
- return true
- default:
- return false
- }
- }
- // Close the mux, no further connections could be created
- func (m *UDPMuxDefault) Close() error {
- var err error
- m.closeOnce.Do(func() {
- m.mu.Lock()
- defer m.mu.Unlock()
- for _, c := range m.connsIPv4 {
- _ = c.Close()
- }
- for _, c := range m.connsIPv6 {
- _ = c.Close()
- }
- m.connsIPv4 = make(map[string]*udpMuxedConn)
- m.connsIPv6 = make(map[string]*udpMuxedConn)
- close(m.closedChan)
- _ = m.params.UDPConn.Close()
- })
- return err
- }
- func (m *UDPMuxDefault) writeTo(buf []byte, rAddr net.Addr) (n int, err error) {
- return m.params.UDPConn.WriteTo(buf, rAddr)
- }
- func (m *UDPMuxDefault) registerConnForAddress(conn *udpMuxedConn, addr udpMuxedConnAddr) {
- if m.IsClosed() {
- return
- }
- m.addressMapMu.Lock()
- defer m.addressMapMu.Unlock()
- existing, ok := m.addressMap[addr]
- if ok {
- existing.removeAddress(addr)
- }
- m.addressMap[addr] = conn
- m.params.Logger.Debugf("Registered %s for %s", addr, conn.params.Key)
- }
- func (m *UDPMuxDefault) createMuxedConn(key string) *udpMuxedConn {
- c := newUDPMuxedConn(&udpMuxedConnParams{
- Mux: m,
- Key: key,
- AddrPool: m.pool,
- LocalAddr: m.LocalAddr(),
- Logger: m.params.Logger,
- })
- return c
- }
- func (m *UDPMuxDefault) connWorker() {
- logger := m.params.Logger
- defer func() {
- _ = m.Close()
- }()
- buf := make([]byte, receiveMTU)
- for {
- n, addr, err := m.params.UDPConn.ReadFrom(buf)
- if m.IsClosed() {
- return
- } else if err != nil {
- if os.IsTimeout(err) {
- continue
- } else if !errors.Is(err, io.EOF) {
- logger.Errorf("Failed to read UDP packet: %v", err)
- }
- return
- }
- udpAddr, ok := addr.(*net.UDPAddr)
- if !ok {
- logger.Errorf("Underlying PacketConn did not return a UDPAddr")
- return
- }
- // If we have already seen this address dispatch to the appropriate destination
- m.addressMapMu.Lock()
- destinationConn := m.addressMap[newUDPMuxedConnAddr(udpAddr)]
- m.addressMapMu.Unlock()
- // If we haven't seen this address before but is a STUN packet lookup by ufrag
- if destinationConn == nil && stun.IsMessage(buf[:n]) {
- msg := &stun.Message{
- Raw: append([]byte{}, buf[:n]...),
- }
- if err = msg.Decode(); err != nil {
- m.params.Logger.Warnf("Failed to handle decode ICE from %s: %v", addr.String(), err)
- continue
- }
- attr, stunAttrErr := msg.Get(stun.AttrUsername)
- if stunAttrErr != nil {
- m.params.Logger.Warnf("No Username attribute in STUN message from %s", addr.String())
- continue
- }
- ufrag := strings.Split(string(attr), ":")[0]
- isIPv6 := udpAddr.IP.To4() == nil
- m.mu.Lock()
- destinationConn, _ = m.getConn(ufrag, isIPv6)
- m.mu.Unlock()
- }
- if destinationConn == nil {
- m.params.Logger.Tracef("Dropping packet from %s, addr: %s", udpAddr, addr)
- continue
- }
- if err = destinationConn.writePacket(buf[:n], udpAddr); err != nil {
- m.params.Logger.Errorf("Failed to write packet: %v", err)
- }
- }
- }
- func (m *UDPMuxDefault) getConn(ufrag string, isIPv6 bool) (val *udpMuxedConn, ok bool) {
- if isIPv6 {
- val, ok = m.connsIPv6[ufrag]
- } else {
- val, ok = m.connsIPv4[ufrag]
- }
- return
- }
- type bufferHolder struct {
- next *bufferHolder
- buf []byte
- addr *net.UDPAddr
- }
- func newBufferHolder(size int) *bufferHolder {
- return &bufferHolder{
- buf: make([]byte, size),
- }
- }
- func (b *bufferHolder) reset() {
- b.next = nil
- b.addr = nil
- }
|