active_tcp.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. // SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
  2. // SPDX-License-Identifier: MIT
  3. package ice
  4. import (
  5. "context"
  6. "io"
  7. "net"
  8. "sync/atomic"
  9. "time"
  10. "github.com/pion/logging"
  11. "github.com/pion/transport/v2/packetio"
  12. )
  13. type activeTCPConn struct {
  14. readBuffer, writeBuffer *packetio.Buffer
  15. localAddr, remoteAddr atomic.Value
  16. closed int32
  17. }
  18. func newActiveTCPConn(ctx context.Context, localAddress, remoteAddress string, log logging.LeveledLogger) (a *activeTCPConn) {
  19. a = &activeTCPConn{
  20. readBuffer: packetio.NewBuffer(),
  21. writeBuffer: packetio.NewBuffer(),
  22. }
  23. laddr, err := getTCPAddrOnInterface(localAddress)
  24. if err != nil {
  25. atomic.StoreInt32(&a.closed, 1)
  26. log.Infof("Failed to dial TCP address %s: %v", remoteAddress, err)
  27. return
  28. }
  29. a.localAddr.Store(laddr)
  30. go func() {
  31. defer func() {
  32. atomic.StoreInt32(&a.closed, 1)
  33. }()
  34. dialer := &net.Dialer{
  35. LocalAddr: laddr,
  36. }
  37. conn, err := dialer.DialContext(ctx, "tcp", remoteAddress)
  38. if err != nil {
  39. log.Infof("Failed to dial TCP address %s: %v", remoteAddress, err)
  40. return
  41. }
  42. a.remoteAddr.Store(conn.RemoteAddr())
  43. go func() {
  44. buff := make([]byte, receiveMTU)
  45. for atomic.LoadInt32(&a.closed) == 0 {
  46. n, err := readStreamingPacket(conn, buff)
  47. if err != nil {
  48. log.Infof("Failed to read streaming packet: %s", err)
  49. break
  50. }
  51. if _, err := a.readBuffer.Write(buff[:n]); err != nil {
  52. log.Infof("Failed to write to buffer: %s", err)
  53. break
  54. }
  55. }
  56. }()
  57. buff := make([]byte, receiveMTU)
  58. for atomic.LoadInt32(&a.closed) == 0 {
  59. n, err := a.writeBuffer.Read(buff)
  60. if err != nil {
  61. log.Infof("Failed to read from buffer: %s", err)
  62. break
  63. }
  64. if _, err = writeStreamingPacket(conn, buff[:n]); err != nil {
  65. log.Infof("Failed to write streaming packet: %s", err)
  66. break
  67. }
  68. }
  69. if err := conn.Close(); err != nil {
  70. log.Infof("Failed to close connection: %s", err)
  71. }
  72. }()
  73. return a
  74. }
  75. func (a *activeTCPConn) ReadFrom(buff []byte) (n int, srcAddr net.Addr, err error) {
  76. if atomic.LoadInt32(&a.closed) == 1 {
  77. return 0, nil, io.ErrClosedPipe
  78. }
  79. srcAddr = a.RemoteAddr()
  80. n, err = a.readBuffer.Read(buff)
  81. return
  82. }
  83. func (a *activeTCPConn) WriteTo(buff []byte, _ net.Addr) (n int, err error) {
  84. if atomic.LoadInt32(&a.closed) == 1 {
  85. return 0, io.ErrClosedPipe
  86. }
  87. return a.writeBuffer.Write(buff)
  88. }
  89. func (a *activeTCPConn) Close() error {
  90. atomic.StoreInt32(&a.closed, 1)
  91. _ = a.readBuffer.Close()
  92. _ = a.writeBuffer.Close()
  93. return nil
  94. }
  95. func (a *activeTCPConn) LocalAddr() net.Addr {
  96. if v, ok := a.localAddr.Load().(*net.TCPAddr); ok {
  97. return v
  98. }
  99. return &net.TCPAddr{}
  100. }
  101. func (a *activeTCPConn) RemoteAddr() net.Addr {
  102. if v, ok := a.remoteAddr.Load().(*net.TCPAddr); ok {
  103. return v
  104. }
  105. return &net.TCPAddr{}
  106. }
  107. func (a *activeTCPConn) SetDeadline(time.Time) error { return io.EOF }
  108. func (a *activeTCPConn) SetReadDeadline(time.Time) error { return io.EOF }
  109. func (a *activeTCPConn) SetWriteDeadline(time.Time) error { return io.EOF }
  110. func getTCPAddrOnInterface(address string) (*net.TCPAddr, error) {
  111. addr, err := net.ResolveTCPAddr("tcp", address)
  112. if err != nil {
  113. return nil, err
  114. }
  115. l, err := net.ListenTCP("tcp", addr)
  116. if err != nil {
  117. return nil, err
  118. }
  119. defer func() {
  120. _ = l.Close()
  121. }()
  122. tcpAddr, ok := l.Addr().(*net.TCPAddr)
  123. if !ok {
  124. return nil, errInvalidAddress
  125. }
  126. return tcpAddr, nil
  127. }