Просмотр исходного кода

Add memory-monitoring stress test

Rod Hynes 8 лет назад
Родитель
Сommit
b5def06bd3
4 измененных файлов с 206 добавлено и 19 удалено
  1. 1 0
      .travis.yml
  2. 11 0
      psiphon/controller.go
  3. 173 0
      psiphon/memory_test/memory_test.go
  4. 21 19
      psiphon/utils.go

+ 1 - 0
.travis.yml

@@ -33,6 +33,7 @@ script:
 - go test -v -covermode=count -coverprofile=server.coverprofile ./server
 - go test -v -covermode=count -coverprofile=psinet.coverprofile ./server/psinet
 - go test -v -covermode=count -coverprofile=psiphon.coverprofile
+- go test -v ./memory_test
 - $HOME/gopath/bin/gover
 - $HOME/gopath/bin/goveralls -coverprofile=gover.coverprofile -service=travis-ci -repotoken $COVERALLS_TOKEN
 before_install:

+ 11 - 0
psiphon/controller.go

@@ -748,6 +748,17 @@ loop:
 	NoticeInfo("exiting run tunnels")
 }
 
+// TerminateNextActiveTunnel is a support routine for
+// test code that must terminate the active tunnel and
+// restart establishing. This function is not guaranteed
+// to be safe for use in other cases.
+func (controller *Controller) TerminateNextActiveTunnel() {
+	tunnel := controller.getNextActiveTunnel()
+	if tunnel != nil {
+		controller.SignalTunnelFailure(tunnel)
+	}
+}
+
 // classifyImpairedProtocol tracks "impaired" protocol classifications for failed
 // tunnels. A protocol is classified as impaired if a tunnel using that protocol
 // fails, repeatedly, shortly after the start of the connection. During tunnel

+ 173 - 0
psiphon/memory_test/memory_test.go

@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2017, Psiphon Inc.
+ * All rights reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package memory_test
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
+)
+
+// TestMemoryUsage is a memory stress test that repeatedly
+// establishes a tunnel, immediately terminates it, and
+// start reestablishing.
+//
+// runtime.MemStats is used to monitor system memory usage
+// during the test.
+//
+// This test is in its own package as its runtime.MemStats
+// checks must not be impacted by other test runs; this
+// test is also long-running.
+
+func TestMemoryUsage(t *testing.T) {
+
+	testDataDirName, err := ioutil.TempDir("", "psiphon-memory-test")
+	if err != nil {
+		fmt.Printf("TempDir failed: %s\n", err)
+		os.Exit(1)
+	}
+	defer os.RemoveAll(testDataDirName)
+	os.Remove(filepath.Join(testDataDirName, psiphon.DATA_STORE_FILENAME))
+
+	psiphon.SetEmitDiagnosticNotices(true)
+
+	configJSON, err := ioutil.ReadFile("../controller_test.config")
+	if err != nil {
+		// Skip, don't fail, if config file is not present
+		t.Skipf("error loading configuration file: %s", err)
+	}
+
+	// These fields must be filled in before calling LoadConfig
+	var modifyConfig map[string]interface{}
+	json.Unmarshal(configJSON, &modifyConfig)
+	modifyConfig["DataStoreDirectory"] = testDataDirName
+	modifyConfig["RemoteServerListDownloadFilename"] = filepath.Join(testDataDirName, "server_list_compressed")
+	modifyConfig["UpgradeDownloadFilename"] = filepath.Join(testDataDirName, "upgrade")
+	configJSON, _ = json.Marshal(modifyConfig)
+
+	config, err := psiphon.LoadConfig(configJSON)
+	if err != nil {
+		t.Fatalf("error processing configuration file: %s", err)
+	}
+
+	postActiveTunnelTerminateDelay := 250 * time.Millisecond
+	testDuration := 5 * time.Minute
+	memInspectionFrequency := 10 * time.Second
+	maxSysMemory := uint64(10 * 1024 * 1024)
+
+	config.ClientVersion = "999999999"
+	config.TunnelPoolSize = 1
+	fetchRemoteServerListRetryPeriodSeconds := 0
+	config.FetchRemoteServerListRetryPeriodSeconds = &fetchRemoteServerListRetryPeriodSeconds
+	establishTunnelPausePeriodSeconds := 1
+	config.EstablishTunnelPausePeriodSeconds = &establishTunnelPausePeriodSeconds
+	config.TunnelProtocol = ""
+	config.DisableLocalSocksProxy = true
+	config.DisableLocalHTTPProxy = true
+	config.ConnectionWorkerPoolSize = 10
+	config.LimitMeekConnectionWorkers = 5
+	config.LimitMeekBufferSizes = true
+	config.StaggerConnectionWorkersMilliseconds = 100
+	config.IgnoreHandshakeStatsRegexps = true
+
+	err = psiphon.InitDataStore(config)
+	if err != nil {
+		t.Fatalf("error initializing datastore: %s", err)
+	}
+
+	var controller *psiphon.Controller
+	tunnelsEstablished := int32(0)
+
+	psiphon.SetNoticeOutput(psiphon.NewNoticeReceiver(
+		func(notice []byte) {
+			noticeType, payload, err := psiphon.GetNotice(notice)
+			if err != nil {
+				return
+			}
+			switch noticeType {
+			case "Tunnels":
+				count := int(payload["count"].(float64))
+				if count > 0 {
+					atomic.AddInt32(&tunnelsEstablished, 1)
+					time.Sleep(postActiveTunnelTerminateDelay)
+					go controller.TerminateNextActiveTunnel()
+				}
+			case "Info":
+				message := payload["message"].(string)
+				if strings.Contains(message, "peak concurrent establish tunnels") {
+					fmt.Printf("%s, ", message)
+				} else if strings.Contains(message, "peak concurrent meek establish tunnels") {
+					fmt.Printf("%s\n", message)
+				}
+			}
+		}))
+
+	controller, err = psiphon.NewController(config)
+	if err != nil {
+		t.Fatalf("error creating controller: %s", err)
+	}
+
+	shutdownBroadcast := make(chan struct{})
+	controllerWaitGroup := new(sync.WaitGroup)
+	controllerWaitGroup.Add(1)
+	go func() {
+		defer controllerWaitGroup.Done()
+		controller.Run(shutdownBroadcast)
+	}()
+
+	testTimer := time.NewTimer(testDuration)
+	memInspectionTicker := time.NewTicker(memInspectionFrequency)
+
+	lastTunnelsEstablished := int32(0)
+test_loop:
+	for {
+		select {
+		case <-testTimer.C:
+			break test_loop
+		case <-memInspectionTicker.C:
+			var m runtime.MemStats
+			runtime.ReadMemStats(&m)
+			if m.Sys > maxSysMemory {
+				t.Fatalf("sys memory exceeds limit: %d", m.Sys)
+			} else {
+				n := atomic.LoadInt32(&tunnelsEstablished)
+				fmt.Printf("Tunnels established: %d, MemStats.Sys (peak system memory used): %s, MemStats.TotalAlloc (cumulative allocations): %s\n",
+					n, psiphon.FormatByteCount(m.Sys), psiphon.FormatByteCount(m.TotalAlloc))
+				if lastTunnelsEstablished-n >= 0 {
+					t.Fatalf("expected established tunnels")
+				}
+				lastTunnelsEstablished = n
+			}
+		}
+	}
+
+	close(shutdownBroadcast)
+	controllerWaitGroup.Wait()
+}

+ 21 - 19
psiphon/utils.go

@@ -189,8 +189,10 @@ func (conn *channelConn) SetWriteDeadline(_ time.Time) error {
 	return common.ContextError(errors.New("unsupported"))
 }
 
-// Based on: https://bitbucket.org/psiphon/psiphon-circumvention-system/src/b2884b0d0a491e55420ed1888aea20d00fefdb45/Android/app/src/main/java/com/psiphon3/psiphonlibrary/Utils.java?at=default#Utils.java-646
-func byteCountFormatter(bytes uint64) string {
+// FormatByteCount returns a string representation of the specified
+// byte count in conventional, human-readable format.
+func FormatByteCount(bytes uint64) string {
+	// Based on: https://bitbucket.org/psiphon/psiphon-circumvention-system/src/b2884b0d0a491e55420ed1888aea20d00fefdb45/Android/app/src/main/java/com/psiphon3/psiphonlibrary/Utils.java?at=default#Utils.java-646
 	base := uint64(1024)
 	if bytes < base {
 		return fmt.Sprintf("%dB", bytes)
@@ -206,24 +208,24 @@ func emitMemoryMetrics() {
 	NoticeInfo("Memory metrics at %s: goroutines %d | total alloc %s | sys %s | heap alloc/sys/idle/inuse/released/objects %s/%s/%s/%s/%s/%d | stack inuse/sys %s/%s | mspan inuse/sys %s/%s | mcached inuse/sys %s/%s | buckhash/gc/other sys %s/%s/%s | nextgc %s",
 		common.GetParentContext(),
 		runtime.NumGoroutine(),
-		byteCountFormatter(memStats.TotalAlloc),
-		byteCountFormatter(memStats.Sys),
-		byteCountFormatter(memStats.HeapAlloc),
-		byteCountFormatter(memStats.HeapSys),
-		byteCountFormatter(memStats.HeapIdle),
-		byteCountFormatter(memStats.HeapInuse),
-		byteCountFormatter(memStats.HeapReleased),
+		FormatByteCount(memStats.TotalAlloc),
+		FormatByteCount(memStats.Sys),
+		FormatByteCount(memStats.HeapAlloc),
+		FormatByteCount(memStats.HeapSys),
+		FormatByteCount(memStats.HeapIdle),
+		FormatByteCount(memStats.HeapInuse),
+		FormatByteCount(memStats.HeapReleased),
 		memStats.HeapObjects,
-		byteCountFormatter(memStats.StackInuse),
-		byteCountFormatter(memStats.StackSys),
-		byteCountFormatter(memStats.MSpanInuse),
-		byteCountFormatter(memStats.MSpanSys),
-		byteCountFormatter(memStats.MCacheInuse),
-		byteCountFormatter(memStats.MCacheSys),
-		byteCountFormatter(memStats.BuckHashSys),
-		byteCountFormatter(memStats.GCSys),
-		byteCountFormatter(memStats.OtherSys),
-		byteCountFormatter(memStats.NextGC))
+		FormatByteCount(memStats.StackInuse),
+		FormatByteCount(memStats.StackSys),
+		FormatByteCount(memStats.MSpanInuse),
+		FormatByteCount(memStats.MSpanSys),
+		FormatByteCount(memStats.MCacheInuse),
+		FormatByteCount(memStats.MCacheSys),
+		FormatByteCount(memStats.BuckHashSys),
+		FormatByteCount(memStats.GCSys),
+		FormatByteCount(memStats.OtherSys),
+		FormatByteCount(memStats.NextGC))
 }
 
 func aggressiveGarbageCollection() {