瀏覽代碼

Add LimitCPUThreads and LimitRelayBufferSizes

Rod Hynes 4 年之前
父節點
當前提交
442a45108b
共有 5 個文件被更改,包括 46 次插入7 次删除
  1. 7 0
      psiphon/config.go
  2. 5 0
      psiphon/controller.go
  3. 5 3
      psiphon/httpProxy.go
  4. 26 3
      psiphon/net.go
  5. 3 1
      psiphon/socksProxy.go

+ 7 - 0
psiphon/config.go

@@ -247,6 +247,13 @@ type Config struct {
 	// LimitMeekBufferSizes selects smaller buffers for meek protocols.
 	LimitMeekBufferSizes bool
 
+	// LimitCPUThreads minimizes the number of CPU threads -- and associated
+	// overhead -- the are used.
+	LimitCPUThreads bool
+
+	// LimitRelayBufferSizes selects smaller buffers for port forward relaying.
+	LimitRelayBufferSizes bool
+
 	// IgnoreHandshakeStatsRegexps skips compiling and using stats regexes.
 	IgnoreHandshakeStatsRegexps bool
 

+ 5 - 0
psiphon/controller.go

@@ -28,6 +28,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -188,6 +189,10 @@ func NewController(config *Config) (controller *Controller, err error) {
 // component fails or the parent context is canceled.
 func (controller *Controller) Run(ctx context.Context) {
 
+	if controller.config.LimitCPUThreads {
+		runtime.GOMAXPROCS(1)
+	}
+
 	pprofRun()
 
 	// Ensure fresh repetitive notice state for each run, so the

+ 5 - 3
psiphon/httpProxy.go

@@ -76,6 +76,7 @@ import (
 // URL encoded.
 //
 type HttpProxy struct {
+	config                 *Config
 	tunneler               Tunneler
 	listener               net.Listener
 	serveWaitGroup         *sync.WaitGroup
@@ -162,6 +163,7 @@ func NewHttpProxy(
 	proxyPort, _ := strconv.Atoi(proxyPortString)
 
 	proxy = &HttpProxy{
+		config:                 config,
 		tunneler:               tunneler,
 		listener:               listener,
 		serveWaitGroup:         new(sync.WaitGroup),
@@ -262,7 +264,7 @@ func (proxy *HttpProxy) httpConnectHandler(localConn net.Conn, target string) (e
 	if err != nil {
 		return errors.Trace(err)
 	}
-	LocalProxyRelay(_HTTP_PROXY_TYPE, localConn, remoteConn)
+	LocalProxyRelay(proxy.config, _HTTP_PROXY_TYPE, localConn, remoteConn)
 	return nil
 }
 
@@ -572,7 +574,7 @@ func (proxy *HttpProxy) relayHTTPRequest(
 			return
 		}
 
-		_, err = io.Copy(conn, response.Body)
+		_, err = RelayCopyBuffer(proxy.config, conn, response.Body)
 		if err != nil {
 			NoticeWarning("write body failed: %s", errors.Trace(err))
 			conn.Close()
@@ -584,7 +586,7 @@ func (proxy *HttpProxy) relayHTTPRequest(
 		// Standard HTTP response.
 
 		responseWriter.WriteHeader(response.StatusCode)
-		_, err = io.Copy(responseWriter, response.Body)
+		_, err = RelayCopyBuffer(proxy.config, responseWriter, response.Body)
 		if err != nil {
 			NoticeWarning("%s", errors.Trace(err))
 			forceClose(responseWriter)

+ 26 - 3
psiphon/net.go

@@ -220,7 +220,7 @@ func (d *NetDialer) DialContext(ctx context.Context, network, address string) (n
 // LocalProxyRelay must close localConn in order to interrupt blocking
 // I/O calls when the upstream port forward is closed. remoteConn is
 // also closed before returning.
-func LocalProxyRelay(proxyType string, localConn, remoteConn net.Conn) {
+func LocalProxyRelay(config *Config, proxyType string, localConn, remoteConn net.Conn) {
 
 	closing := int32(0)
 
@@ -230,7 +230,7 @@ func LocalProxyRelay(proxyType string, localConn, remoteConn net.Conn) {
 	go func() {
 		defer copyWaitGroup.Done()
 
-		_, err := io.Copy(localConn, remoteConn)
+		_, err := RelayCopyBuffer(config, localConn, remoteConn)
 		if err != nil && atomic.LoadInt32(&closing) != 1 {
 			NoticeLocalProxyError(proxyType, errors.TraceMsg(err, "Relay failed"))
 		}
@@ -245,7 +245,7 @@ func LocalProxyRelay(proxyType string, localConn, remoteConn net.Conn) {
 		localConn.Close()
 	}()
 
-	_, err := io.Copy(remoteConn, localConn)
+	_, err := RelayCopyBuffer(config, remoteConn, localConn)
 	if err != nil && atomic.LoadInt32(&closing) != 1 {
 		NoticeLocalProxyError(proxyType, errors.TraceMsg(err, "Relay failed"))
 	}
@@ -260,6 +260,29 @@ func LocalProxyRelay(proxyType string, localConn, remoteConn net.Conn) {
 	copyWaitGroup.Wait()
 }
 
+// RelayCopyBuffer performs an io.Copy, optionally using a smaller buffer when
+// config.LimitRelayBufferSizes is set.
+func RelayCopyBuffer(config *Config, dst io.Writer, src io.Reader) (int64, error) {
+
+	// By default, io.CopyBuffer will allocate a 32K buffer when a nil buffer
+	// is passed in. When configured, make and specify a smaller buffer. But
+	// only if src doesn't implement WriterTo and dst doesn't implement
+	// ReaderFrom, as in those cases io.CopyBuffer entirely avoids a buffer
+	// allocation.
+
+	var buffer []byte
+	if config.LimitRelayBufferSizes {
+		_, isWT := src.(io.WriterTo)
+		_, isRF := dst.(io.ReaderFrom)
+		if !isWT && !isRF {
+			buffer = make([]byte, 4096)
+		}
+	}
+
+	// Do not wrap any I/O errors
+	return io.CopyBuffer(dst, src, buffer)
+}
+
 // WaitForNetworkConnectivity uses a NetworkConnectivityChecker to
 // periodically check for network connectivity. It returns true if
 // no NetworkConnectivityChecker is provided (waiting is disabled)

+ 3 - 1
psiphon/socksProxy.go

@@ -35,6 +35,7 @@ import (
 // the tunnel SSH client and relays traffic through the port
 // forward.
 type SocksProxy struct {
+	config                 *Config
 	tunneler               Tunneler
 	listener               *socks.SocksListener
 	serveWaitGroup         *sync.WaitGroup
@@ -61,6 +62,7 @@ func NewSocksProxy(
 		return nil, errors.Trace(err)
 	}
 	proxy = &SocksProxy{
+		config:                 config,
 		tunneler:               tunneler,
 		listener:               listener,
 		serveWaitGroup:         new(sync.WaitGroup),
@@ -113,7 +115,7 @@ func (proxy *SocksProxy) socksConnectionHandler(localConn *socks.SocksConn) (err
 		return errors.Trace(err)
 	}
 
-	LocalProxyRelay(_SOCKS_PROXY_TYPE, localConn, remoteConn)
+	LocalProxyRelay(proxy.config, _SOCKS_PROXY_TYPE, localConn, remoteConn)
 
 	return nil
 }