client_proxy.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package marionette
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "go.uber.org/zap"
  7. )
  8. // ClientProxy represents a proxy between incoming connections and a marionette dialer.
  9. type ClientProxy struct {
  10. ln net.Listener
  11. dialer *Dialer
  12. wg sync.WaitGroup
  13. }
  14. // NewClientProxy returns a new instance of ClientProxy.
  15. func NewClientProxy(ln net.Listener, dialer *Dialer) *ClientProxy {
  16. return &ClientProxy{
  17. ln: ln,
  18. dialer: dialer,
  19. }
  20. }
  21. // Open starts the proxy listeners and waits for connections.
  22. func (p *ClientProxy) Open() error {
  23. p.wg.Add(1)
  24. go func() { defer p.wg.Done(); p.run() }()
  25. return nil
  26. }
  27. // Close stops the listener.
  28. func (p *ClientProxy) Close() error {
  29. if p.ln != nil {
  30. return p.ln.Close()
  31. }
  32. return nil
  33. }
  34. // run executes in a separate goroutine and continually processes incoming connections.
  35. func (p *ClientProxy) run() {
  36. Logger.Debug("client proxy: listening")
  37. defer Logger.Debug("client proxy: closed")
  38. for {
  39. conn, err := p.ln.Accept()
  40. if err != nil {
  41. Logger.Debug("client proxy: listener error", zap.Error(err))
  42. return
  43. }
  44. p.wg.Add(1)
  45. go func() { defer p.wg.Done(); p.handleConn(conn) }()
  46. }
  47. }
  48. // handleConn continually copies between the incoming connection and stream.
  49. func (p *ClientProxy) handleConn(incomingConn net.Conn) {
  50. defer incomingConn.Close()
  51. Logger.Debug("client proxy: connection open")
  52. defer Logger.Debug("client proxy: connection closed")
  53. // Create a new stream.
  54. stream, err := p.dialer.Dial()
  55. if err != nil {
  56. Logger.Debug("client proxy: cannot connect create new stream", zap.Error(err))
  57. return
  58. }
  59. defer stream.Close()
  60. // Copy between incoming connection and stream until an error occurs.
  61. var wg sync.WaitGroup
  62. wg.Add(2)
  63. go func() {
  64. defer wg.Done()
  65. io.Copy(incomingConn, stream)
  66. incomingConn.Close()
  67. }()
  68. go func() {
  69. defer wg.Done()
  70. io.Copy(stream, incomingConn)
  71. stream.Close()
  72. }()
  73. wg.Wait()
  74. }