Selaa lähdekoodia

Merge pull request #442 from rod-hynes/master

psiphond performance tweaks
Rod Hynes 8 vuotta sitten
vanhempi
sitoutus
f305ffd253

+ 8 - 3
psiphon/common/tun/tun.go

@@ -170,7 +170,7 @@ type ServerConfig struct {
 	// logged as warnings only. This option is intended to support
 	// logged as warnings only. This option is intended to support
 	// test cases on hosts without IPv6 and is not for production use;
 	// test cases on hosts without IPv6 and is not for production use;
 	// the packet tunnel server will still accept IPv6 packets and
 	// the packet tunnel server will still accept IPv6 packets and
-	// replay them to the tun device.
+	// relay them to the tun device.
 	// AllowNoIPv6NetworkConfiguration may not be supported on all
 	// AllowNoIPv6NetworkConfiguration may not be supported on all
 	// platforms.
 	// platforms.
 	AllowNoIPv6NetworkConfiguration bool
 	AllowNoIPv6NetworkConfiguration bool
@@ -740,10 +740,14 @@ func (server *Server) runClientUpstream(session *session) {
 		}
 		}
 
 
 		if err != nil {
 		if err != nil {
+
+			// Debug since channel I/O errors occur during normal operation.
 			server.config.Logger.WithContextFields(
 			server.config.Logger.WithContextFields(
-				common.LogFields{"error": err}).Warning("read channel packet failed")
+				common.LogFields{"error": err}).Debug("read channel packet failed")
+
 			// Tear down the session. Must be invoked asynchronously.
 			// Tear down the session. Must be invoked asynchronously.
 			go server.interruptSession(session)
 			go server.interruptSession(session)
+
 			return
 			return
 		}
 		}
 
 
@@ -798,8 +802,9 @@ func (server *Server) runClientDownstream(session *session) {
 		err := session.channel.WriteFramedPackets(packetBuffer)
 		err := session.channel.WriteFramedPackets(packetBuffer)
 		if err != nil {
 		if err != nil {
 
 
+			// Debug since channel I/O errors occur during normal operation.
 			server.config.Logger.WithContextFields(
 			server.config.Logger.WithContextFields(
-				common.LogFields{"error": err}).Warning("write channel packets failed")
+				common.LogFields{"error": err}).Debug("write channel packets failed")
 
 
 			session.downstreamPackets.Replace(packetBuffer)
 			session.downstreamPackets.Replace(packetBuffer)
 
 

+ 7 - 0
psiphon/common/tun/tun_linux.go

@@ -190,6 +190,13 @@ func resetNATTables(
 		"--orig-src",
 		"--orig-src",
 		IPAddress.String())
 		IPAddress.String())
 	if err != nil {
 	if err != nil {
+
+		// conntrack exits with this error message when there are no flows
+		// to delete, which is not a failure condition.
+		if strings.Contains(err.Error(), "0 flow entries have been deleted") {
+			return nil
+		}
+
 		return common.ContextError(err)
 		return common.ContextError(err)
 	}
 	}
 
 

+ 10 - 0
psiphon/server/config.go

@@ -284,6 +284,11 @@ type Config struct {
 	// to drop below the limit.
 	// to drop below the limit.
 	// The default, 0 is no limit.
 	// The default, 0 is no limit.
 	MaxConcurrentSSHHandshakes int
 	MaxConcurrentSSHHandshakes int
+
+	// PeriodicGarbageCollectionSeconds turns on periodic calls to runtime.GC,
+	// every specified number of seconds, to force garbage collection.
+	// The default, 0 is off.
+	PeriodicGarbageCollectionSeconds int
 }
 }
 
 
 // RunWebServer indicates whether to run a web server component.
 // RunWebServer indicates whether to run a web server component.
@@ -296,6 +301,11 @@ func (config *Config) RunLoadMonitor() bool {
 	return config.LoadMonitorPeriodSeconds > 0
 	return config.LoadMonitorPeriodSeconds > 0
 }
 }
 
 
+// RunPeriodicGarbageCollection indicates whether to run periodic garbage collection.
+func (config *Config) RunPeriodicGarbageCollection() bool {
+	return config.PeriodicGarbageCollectionSeconds > 0
+}
+
 // LoadConfig loads and validates a JSON encoded server config.
 // LoadConfig loads and validates a JSON encoded server config.
 func LoadConfig(configJSON []byte) (*Config, error) {
 func LoadConfig(configJSON []byte) (*Config, error) {
 
 

+ 11 - 1
psiphon/server/meek.go

@@ -265,7 +265,9 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 
 
 	sessionID, session, err := server.getSession(request, meekCookie)
 	sessionID, session, err := server.getSession(request, meekCookie)
 	if err != nil {
 	if err != nil {
-		log.WithContextFields(LogFields{"error": err}).Warning("session lookup failed")
+		// Debug since session cookie errors commonly occur during
+		// normal operation.
+		log.WithContextFields(LogFields{"error": err}).Debug("session lookup failed")
 		server.terminateConnection(responseWriter, request)
 		server.terminateConnection(responseWriter, request)
 		return
 		return
 	}
 	}
@@ -486,6 +488,14 @@ func (server *MeekServer) getSession(
 		return existingSessionID, session, nil
 		return existingSessionID, session, nil
 	}
 	}
 
 
+	// Don't create new sessions when not establishing. A subsequent SSH handshake
+	// will not succeed, so creating a meek session just wastes resources.
+
+	if server.support.TunnelServer != nil &&
+		!server.support.TunnelServer.GetEstablishTunnels() {
+		return "", nil, common.ContextError(errors.New("not establishing tunnels"))
+	}
+
 	// TODO: can multiple http client connections using same session cookie
 	// TODO: can multiple http client connections using same session cookie
 	// cause race conditions on session struct?
 	// cause race conditions on session struct?
 
 

+ 3 - 0
psiphon/server/server_test.go

@@ -347,6 +347,9 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	// TODO: test that the concurrency limit is correctly enforced.
 	// TODO: test that the concurrency limit is correctly enforced.
 	serverConfig["MaxConcurrentSSHHandshakes"] = 1
 	serverConfig["MaxConcurrentSSHHandshakes"] = 1
 
 
+	// Exercise this option.
+	serverConfig["PeriodicGarbageCollectionSeconds"] = 1
+
 	serverConfigJSON, _ = json.Marshal(serverConfig)
 	serverConfigJSON, _ = json.Marshal(serverConfig)
 
 
 	// run server
 	// run server

+ 49 - 16
psiphon/server/services.go

@@ -129,6 +129,23 @@ func RunServices(configJSON []byte) error {
 		}()
 		}()
 	}
 	}
 
 
+	if config.RunPeriodicGarbageCollection() {
+		waitGroup.Add(1)
+		go func() {
+			waitGroup.Done()
+			ticker := time.NewTicker(time.Duration(config.PeriodicGarbageCollectionSeconds) * time.Second)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-shutdownBroadcast:
+					return
+				case <-ticker.C:
+					runtime.GC()
+				}
+			}
+		}()
+	}
+
 	if config.RunWebServer() {
 	if config.RunWebServer() {
 		waitGroup.Add(1)
 		waitGroup.Add(1)
 		go func() {
 		go func() {
@@ -234,21 +251,35 @@ loop:
 	return err
 	return err
 }
 }
 
 
-func outputProcessProfiles(config *Config) {
+func getRuntimeMetrics() LogFields {
+
+	numGoroutine := runtime.NumGoroutine()
 
 
 	var memStats runtime.MemStats
 	var memStats runtime.MemStats
 	runtime.ReadMemStats(&memStats)
 	runtime.ReadMemStats(&memStats)
-	log.WithContextFields(
-		LogFields{
-			"num_goroutine":   runtime.NumGoroutine(),
-			"alloc":           memStats.Alloc,
-			"total_alloc":     memStats.TotalAlloc,
-			"sys":             memStats.Sys,
-			"pause_total_ns":  memStats.PauseTotalNs,
-			"pause_ns":        memStats.PauseNs,
-			"num_gc":          memStats.NumGC,
-			"gc_cpu_fraction": memStats.GCCPUFraction,
-		}).Info("runtime_stats")
+
+	lastGC := ""
+	if memStats.LastGC > 0 {
+		lastGC = time.Unix(0, int64(memStats.LastGC)).UTC().Format(time.RFC3339)
+	}
+
+	return LogFields{
+		"num_goroutine": numGoroutine,
+		"heap_alloc":    memStats.HeapAlloc,
+		"heap_sys":      memStats.HeapSys,
+		"heap_idle":     memStats.HeapIdle,
+		"heap_inuse":    memStats.HeapInuse,
+		"heap_released": memStats.HeapReleased,
+		"heap_objects":  memStats.HeapObjects,
+		"num_gc":        memStats.NumGC,
+		"num_forced_gc": memStats.NumForcedGC,
+		"last_gc":       lastGC,
+	}
+}
+
+func outputProcessProfiles(config *Config) {
+
+	log.WithContextFields(getRuntimeMetrics()).Info("runtime_metrics")
 
 
 	if config.ProcessProfileOutputDirectory != "" {
 	if config.ProcessProfileOutputDirectory != "" {
 
 
@@ -333,13 +364,15 @@ func logServerLoad(server *TunnelServer) {
 
 
 	protocolStats, regionStats := server.GetLoadStats()
 	protocolStats, regionStats := server.GetLoadStats()
 
 
-	serverLoad := LogFields{
-		"event_name": "server_load",
-	}
+	serverLoad := getRuntimeMetrics()
+
+	serverLoad["event_name"] = "server_load"
+
+	serverLoad["establish_tunnels"] = server.GetEstablishTunnels()
+
 	for protocol, stats := range protocolStats {
 	for protocol, stats := range protocolStats {
 		serverLoad[protocol] = stats
 		serverLoad[protocol] = stats
 	}
 	}
-	serverLoad["establish_tunnels"] = server.GetEstablishTunnels()
 
 
 	log.LogRawFieldsWithTimestamp(serverLoad)
 	log.LogRawFieldsWithTimestamp(serverLoad)
 
 

+ 1 - 1
psiphon/server/tunnelServer.go

@@ -488,7 +488,7 @@ func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {
 	// Call stop() outside the mutex to avoid deadlock.
 	// Call stop() outside the mutex to avoid deadlock.
 	if existingClient != nil {
 	if existingClient != nil {
 		existingClient.stop()
 		existingClient.stop()
-		log.WithContext().Info(
+		log.WithContext().Debug(
 			"stopped existing client with duplicate session ID")
 			"stopped existing client with duplicate session ID")
 	}
 	}
 
 

+ 10 - 3
psiphon/server/udp.go

@@ -107,7 +107,8 @@ func (mux *udpPortForwardMultiplexer) run() {
 		message, err := readUdpgwMessage(mux.sshChannel, buffer)
 		message, err := readUdpgwMessage(mux.sshChannel, buffer)
 		if err != nil {
 		if err != nil {
 			if err != io.EOF {
 			if err != io.EOF {
-				log.WithContextFields(LogFields{"error": err}).Warning("readUdpgwMessage failed")
+				// Debug since I/O errors occur during normal operation
+				log.WithContextFields(LogFields{"error": err}).Debug("readUdpgwMessage failed")
 			}
 			}
 			break
 			break
 		}
 		}
@@ -389,7 +390,10 @@ func readUdpgwMessage(
 
 
 		_, err := io.ReadFull(reader, buffer[0:2])
 		_, err := io.ReadFull(reader, buffer[0:2])
 		if err != nil {
 		if err != nil {
-			return nil, common.ContextError(err)
+			if err != io.EOF {
+				err = common.ContextError(err)
+			}
+			return nil, err
 		}
 		}
 
 
 		size := binary.LittleEndian.Uint16(buffer[0:2])
 		size := binary.LittleEndian.Uint16(buffer[0:2])
@@ -400,7 +404,10 @@ func readUdpgwMessage(
 
 
 		_, err = io.ReadFull(reader, buffer[2:2+size])
 		_, err = io.ReadFull(reader, buffer[2:2+size])
 		if err != nil {
 		if err != nil {
-			return nil, common.ContextError(err)
+			if err != io.EOF {
+				err = common.ContextError(err)
+			}
+			return nil, err
 		}
 		}
 
 
 		flags := buffer[2]
 		flags := buffer[2]