Просмотр исходного кода

Add MaxConcurrentSSHHandshakes

Rod Hynes 8 лет назад
Родитель
Сommit
324fef9af9

+ 1 - 0
README.md

@@ -123,6 +123,7 @@ Psiphon Tunnel Core uses:
 * [mitchellh/panicwrap](https://github.com/mitchellh/panicwrap)
 * [juju/ratelimit](https://github.com/juju/ratelimit)
 * [codahale/sss](https://github.com/codahale/sss)
+* [marusama/semaphore](https://github.com/marusama/semaphore)
 
 Licensing
 --------------------------------------------------------------------------------

+ 10 - 0
psiphon/server/config.go

@@ -274,6 +274,16 @@ type Config struct {
 	// PacketTunnelSudoNetworkConfigCommands sets
 	// tun.ServerConfig.SudoNetworkConfigCommands.
 	PacketTunnelSudoNetworkConfigCommands bool
+
+	// MaxConcurrentSSHHandshakes specifies a limit on the number of concurrent
+	// SSH handshake negotiations. This is set to mitigate spikes in memory
+	// allocations and CPU usage associated with SSH handshakes when many clients
+	// attempt to connect concurrently. When a maximum limit is specified and
+	// reached, additional clients that establish TCP or meek connections will
+	// be disconnected after a short wait for the number of concurrent handshakes
+	// to drop below the limit.
+	// The default, 0, is no limit.
+	MaxConcurrentSSHHandshakes int
 }
 
 // RunWebServer indicates whether to run a web server component.

+ 4 - 0
psiphon/server/server_test.go

@@ -343,6 +343,10 @@ func runServer(t *testing.T, runConfig *runServerConfig) {
 	serverConfig["LogFilename"] = filepath.Join(testDataDirName, "psiphond.log")
 	serverConfig["LogLevel"] = "debug"
 
+	// Set this parameter so at least the semaphore functions are called.
+	// TODO: test that the concurrency limit is correctly enforced.
+	serverConfig["MaxConcurrentSSHHandshakes"] = 1
+
 	serverConfigJSON, _ = json.Marshal(serverConfig)
 
 	// run server

+ 93 - 21
psiphon/server/tunnelServer.go

@@ -39,12 +39,14 @@ import (
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/osl"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tun"
+	"github.com/marusama/semaphore"
 	cache "github.com/patrickmn/go-cache"
 )
 
 const (
 	SSH_AUTH_LOG_PERIOD                   = 30 * time.Minute
 	SSH_HANDSHAKE_TIMEOUT                 = 30 * time.Second
+	SSH_BEGIN_HANDSHAKE_TIMEOUT           = 1 * time.Second
 	SSH_CONNECTION_READ_DEADLINE          = 5 * time.Minute
 	SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE = 8192
 	SSH_TCP_PORT_FORWARD_QUEUE_SIZE       = 1024
@@ -249,18 +251,19 @@ type sshServer struct {
 	// Note: 64-bit ints used with atomic operations are placed
 	// at the start of struct to ensure 64-bit alignment.
 	// (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
-	lastAuthLog          int64
-	authFailedCount      int64
-	support              *SupportServices
-	establishTunnels     int32
-	shutdownBroadcast    <-chan struct{}
-	sshHostKey           ssh.Signer
-	clientsMutex         sync.Mutex
-	stoppingClients      bool
-	acceptedClientCounts map[string]map[string]int64
-	clients              map[string]*sshClient
-	oslSessionCacheMutex sync.Mutex
-	oslSessionCache      *cache.Cache
+	lastAuthLog             int64
+	authFailedCount         int64
+	support                 *SupportServices
+	establishTunnels        int32
+	concurrentSSHHandshakes semaphore.Semaphore
+	shutdownBroadcast       <-chan struct{}
+	sshHostKey              ssh.Signer
+	clientsMutex            sync.Mutex
+	stoppingClients         bool
+	acceptedClientCounts    map[string]map[string]int64
+	clients                 map[string]*sshClient
+	oslSessionCacheMutex    sync.Mutex
+	oslSessionCache         *cache.Cache
 }
 
 func newSSHServer(
@@ -278,6 +281,11 @@ func newSSHServer(
 		return nil, common.ContextError(err)
 	}
 
+	var concurrentSSHHandshakes semaphore.Semaphore
+	if support.Config.MaxConcurrentSSHHandshakes > 0 {
+		concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)
+	}
+
 	// The OSL session cache temporarily retains OSL seed state
 	// progress for disconnected clients. This enables clients
 	// that disconnect and immediately reconnect to the same
@@ -292,13 +300,14 @@ func newSSHServer(
 	oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)
 
 	return &sshServer{
-		support:              support,
-		establishTunnels:     1,
-		shutdownBroadcast:    shutdownBroadcast,
-		sshHostKey:           signer,
-		acceptedClientCounts: make(map[string]map[string]int64),
-		clients:              make(map[string]*sshClient),
-		oslSessionCache:      oslSessionCache,
+		support:                 support,
+		establishTunnels:        1,
+		concurrentSSHHandshakes: concurrentSSHHandshakes,
+		shutdownBroadcast:       shutdownBroadcast,
+		sshHostKey:              signer,
+		acceptedClientCounts:    make(map[string]map[string]int64),
+		clients:                 make(map[string]*sshClient),
+		oslSessionCache:         oslSessionCache,
 	}, nil
 }
 
@@ -709,9 +718,57 @@ func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.C
 	sshServer.registerAcceptedClient(tunnelProtocol, geoIPData.Country)
 	defer sshServer.unregisterAcceptedClient(tunnelProtocol, geoIPData.Country)
 
+	// When configured, enforce a cap on the number of concurrent SSH
+	// handshakes. This limits load spikes on busy servers when many clients
+	// attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,
+	// to acquire; waiting will avoid immediately creating more load on another
+	// server in the network when the client tries a new candidate. Disconnect the
+	// client when that wait time is exceeded.
+	//
+	// This mechanism limits memory allocations and CPU usage associated with the
+	// SSH handshake. At this point, new direct TCP connections or new meek
+	// connections, with associated resource usage, are already established. Those
+	// connections are expected to be rate or load limited using other mechanisms.
+	//
+	// TODO:
+	//
+	// - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in
+	//   sshClient.run, since the client is also applying an SSH handshake timeout
+	//   and won't exclude time spent waiting.
+	// - each call to sshServer.handleClient (in sshServer.runListener) is invoked
+	//   in its own goroutine, but shutdown doesn't synchronously await these
+	//   goroutines. Once this is synchronized, the following context.WithTimeout
+	//   should use an sshServer parent context to ensure blocking acquires
+	//   interrupt immediately upon shutdown.
+
+	var onSSHHandshakeFinished func()
+	if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {
+
+		ctx, cancelFunc := context.WithTimeout(
+			context.Background(), SSH_BEGIN_HANDSHAKE_TIMEOUT)
+		defer cancelFunc()
+
+		err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)
+		if err != nil {
+			clientConn.Close()
+			// This is a debug log as the only possible error is context timeout.
+			log.WithContextFields(LogFields{"error": err}).Debug(
+				"acquire SSH handshake semaphore failed")
+			return
+		}
+
+		onSSHHandshakeFinished = func() {
+			sshServer.concurrentSSHHandshakes.Release(1)
+		}
+	}
+
 	sshClient := newSshClient(sshServer, tunnelProtocol, geoIPData)
 
-	sshClient.run(clientConn)
+	// sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
+	// in any error case; or, as soon as the SSH handshake phase has successfully
+	// completed.
+
+	sshClient.run(clientConn, onSSHHandshakeFinished)
 }
 
 func (sshServer *sshServer) monitorPortForwardDialError(err error) {
@@ -817,7 +874,15 @@ func newSshClient(
 	return client
 }
 
-func (sshClient *sshClient) run(clientConn net.Conn) {
+func (sshClient *sshClient) run(
+	clientConn net.Conn, onSSHHandshakeFinished func()) {
+
+	// onSSHHandshakeFinished must be called even if the SSH handshake is aborted.
+	defer func() {
+		if onSSHHandshakeFinished != nil {
+			onSSHHandshakeFinished()
+		}
+	}()
 
 	// Some conns report additional metrics
 	metricsSource, isMetricsSource := clientConn.(MetricsSource)
@@ -928,6 +993,13 @@ func (sshClient *sshClient) run(clientConn net.Conn) {
 		return
 	}
 
+	// The SSH handshake has finished successfully; notify now to allow other
+	// blocked SSH handshakes to proceed.
+	if onSSHHandshakeFinished != nil {
+		onSSHHandshakeFinished()
+	}
+	onSSHHandshakeFinished = nil
+
 	sshClient.Lock()
 	sshClient.sshConn = result.sshConn
 	sshClient.activityConn = activityConn

+ 21 - 0
vendor/github.com/marusama/semaphore/LICENSE

@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 marusama
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.

+ 81 - 0
vendor/github.com/marusama/semaphore/README.md

@@ -0,0 +1,81 @@
+semaphore
+=========
+[![Build Status](https://travis-ci.org/marusama/semaphore.svg?branch=master)](https://travis-ci.org/marusama/semaphore)
+[![Go Report Card](https://goreportcard.com/badge/github.com/marusama/semaphore)](https://goreportcard.com/report/github.com/marusama/semaphore)
+[![Coverage Status](https://coveralls.io/repos/github/marusama/semaphore/badge.svg?branch=master)](https://coveralls.io/github/marusama/semaphore?branch=master)
+[![GoDoc](https://godoc.org/github.com/marusama/semaphore?status.svg)](https://godoc.org/github.com/marusama/semaphore)
+[![License](https://img.shields.io/github/license/mashape/apistatus.svg?maxAge=2592000)](LICENSE)
+
+Fast resizable golang semaphore based on CAS
+
+* allows weighted acquire/release;
+* supports cancellation via context;
+* allows change semaphore limit after creation;
+* faster than channels based semaphores.
+
+### Usage
+Initiate
+```go
+import "github.com/marusama/semaphore"
+...
+sem := semaphore.New(5) // new semaphore with limit=5
+```
+Acquire
+```go
+sem.Acquire(ctx, n)     // acquire n with context
+sem.TryAcquire(n)       // try acquire n without blocking 
+...
+ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+defer cancel()
+sem.Acquire(ctx, n)     // acquire n with timeout
+``` 
+Release
+```go
+sem.Release(n)          // release n
+```
+Change semaphore limit
+```go
+sem.SetLimit(new_limit) // set new semaphore limit
+```
+
+
+### Some benchmarks
+Run on MacBook Pro (early 2015) with 2,7GHz Core i5 cpu and 16GB DDR3 ram:
+```text
+// this semaphore:
+BenchmarkSemaphore_Acquire_Release_under_limit_simple-4                   	50000000	        31.1 ns/op	       0 B/op	       0 allocs/op
+BenchmarkSemaphore_Acquire_Release_under_limit-4                          	 1000000	      1383 ns/op	       0 B/op	       0 allocs/op
+BenchmarkSemaphore_Acquire_Release_over_limit-4                           	  100000	     13468 ns/op	      24 B/op	       0 allocs/op
+
+
+// some other implementations:
+
+// golang.org/x/sync/semaphore:
+BenchmarkXSyncSemaphore_Acquire_Release_under_limit_simple-4              	30000000	        51.8 ns/op	       0 B/op	       0 allocs/op
+BenchmarkXSyncSemaphore_Acquire_Release_under_limit-4                     	  500000	      2655 ns/op	       0 B/op	       0 allocs/op
+BenchmarkXSyncSemaphore_Acquire_Release_over_limit-4                      	   20000	    100004 ns/op	   15991 B/op	     299 allocs/op
+
+// github.com/abiosoft/semaphore:
+BenchmarkAbiosoftSemaphore_Acquire_Release_under_limit_simple-4           	 5000000	       269 ns/op	       0 B/op	       0 allocs/op
+BenchmarkAbiosoftSemaphore_Acquire_Release_under_limit-4                  	  300000	      5602 ns/op	       0 B/op	       0 allocs/op
+BenchmarkAbiosoftSemaphore_Acquire_Release_over_limit-4                   	   30000	     54090 ns/op	       0 B/op	       0 allocs/op
+
+// github.com/dropbox/godropbox
+BenchmarkDropboxBoundedSemaphore_Acquire_Release_under_limit_simple-4     	20000000	        99.6 ns/op	       0 B/op	       0 allocs/op
+BenchmarkDropboxBoundedSemaphore_Acquire_Release_under_limit-4            	 1000000	      1343 ns/op	       0 B/op	       0 allocs/op
+BenchmarkDropboxBoundedSemaphore_Acquire_Release_over_limit-4             	  100000	     35735 ns/op	       0 B/op	       0 allocs/op
+BenchmarkDropboxUnboundedSemaphore_Acquire_Release_under_limit_simple-4   	30000000	        56.0 ns/op	       0 B/op	       0 allocs/op
+BenchmarkDropboxUnboundedSemaphore_Acquire_Release_under_limit-4          	  500000	      2871 ns/op	       0 B/op	       0 allocs/op
+BenchmarkDropboxUnboundedSemaphore_Acquire_Release_over_limit-4           	   30000	     41089 ns/op	       0 B/op	       0 allocs/op
+
+// github.com/kamilsk/semaphore
+BenchmarkKamilskSemaphore_Acquire_Release_under_limit_simple-4            	10000000	       170 ns/op	      16 B/op	       1 allocs/op
+BenchmarkKamilskSemaphore_Acquire_Release_under_limit-4                   	  500000	      3023 ns/op	     160 B/op	      10 allocs/op
+BenchmarkKamilskSemaphore_Acquire_Release_over_limit-4                    	   20000	     67687 ns/op	    1600 B/op	     100 allocs/op
+
+// github.com/pivotal-golang/semaphore
+BenchmarkPivotalGolangSemaphore_Acquire_Release_under_limit_simple-4      	 1000000	      1006 ns/op	     136 B/op	       2 allocs/op
+BenchmarkPivotalGolangSemaphore_Acquire_Release_under_limit-4             	  100000	     11837 ns/op	    1280 B/op	      20 allocs/op
+BenchmarkPivotalGolangSemaphore_Acquire_Release_over_limit-4              	   10000	    128890 ns/op	   12800 B/op	     200 allocs/op
+
+```
+You can rerun these benchmarks, just checkout `benchmarks` branch and run `go test -bench=. -benchmem ./bench/...`

+ 227 - 0
vendor/github.com/marusama/semaphore/semaphore.go

@@ -0,0 +1,227 @@
+// Copyright 2017 Maru Sama. All rights reserved.
+// Use of this source code is governed by the MIT license
+// that can be found in the LICENSE file.
+
+// Package semaphore provides an implementation of counting semaphore primitive with possibility to change limit
+// after creation. This implementation is based on Compare-and-Swap primitive that in general case works faster
+// than other golang channel-based semaphore implementations.
+package semaphore // import "github.com/marusama/semaphore"
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+)
+
+// Semaphore is a counting semaphore whose limit can be changed after creation.
+// Use a Semaphore to control access to a pool of resources.
+// There is no guaranteed order, such as FIFO or LIFO, in which blocked goroutines enter the semaphore.
+// A goroutine can enter the semaphore multiple times by calling the Acquire or TryAcquire methods repeatedly.
+// To release some or all of these entries, the goroutine calls the Release method,
+// specifying the number of entries to be released.
+// Use SetLimit to lower or raise the semaphore capacity.
+type Semaphore interface {
+	// Acquire enters the semaphore n times, blocking until the entries are available or ctx is done.
+	// A nil ctx is allowed and disables cancellation.
+	// Acquire returns ErrCtxDone when it observes that ctx is done; this is not guaranteed, and if ctx
+	// is cancelled at about the moment entries become free the semaphore may still be acquired.
+	// The implementation returned by New panics if n is not positive.
+	Acquire(ctx context.Context, n int) error
+
+	// TryAcquire acquires the semaphore without blocking.
+	// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
+	// The implementation returned by New panics if n is not positive.
+	TryAcquire(n int) bool
+
+	// Release exits the semaphore n times and returns the count held before the release.
+	// The implementation returned by New panics if n is not positive or exceeds the current count.
+	Release(n int) int
+
+	// SetLimit changes the current semaphore limit and is intended to be called concurrently
+	// with the other methods. The limit may be changed many times, to a higher or a lower value;
+	// the current count is left unchanged.
+	// The implementation returned by New panics if limit is not positive.
+	SetLimit(limit int)
+
+	// GetLimit returns the current semaphore limit.
+	GetLimit() int
+
+	// GetCount returns the current number of occupied entries in the semaphore.
+	GetCount() int
+}
+
+var (
+	// ErrCtxDone is the error Acquire returns when the supplied context is done.
+	// Acquire returns this sentinel rather than ctx.Err(), so the returned error
+	// does not distinguish cancellation from deadline expiry.
+	ErrCtxDone = errors.New("ctx.Done()")
+)
+
+// semaphore is the Semaphore implementation returned by New.
+type semaphore struct {
+	//  state holds limit and count in one 64 bits unsigned integer
+	//
+	//                            state (64 bits)
+	// +-----------------------------------------------------------------+
+	//      limit (high 32 bits)                 count (low 32 bits)
+	// +--------------------------------|--------------------------------+
+	//
+	// After New, state is read and written only through sync/atomic loads
+	// and compare-and-swap. It is the first field of the struct, which
+	// sync/atomic relies on for 64-bit alignment on 32-bit platforms.
+	state uint64
+
+	// lock guards broadcastCh. Release and SetLimit wake goroutines blocked
+	// in Acquire by installing a fresh channel under lock and then closing
+	// the previous one.
+	lock        sync.RWMutex
+	broadcastCh chan struct{}
+}
+
+// New returns a Semaphore that admits at most limit concurrent entries and
+// starts with a count of zero. It panics if limit is not positive.
+func New(limit int) Semaphore {
+	if limit < 1 {
+		panic("semaphore limit must be greater than 0")
+	}
+
+	// The limit lives in the high 32 bits of the packed state word; the low
+	// 32 bits (the count) start at zero.
+	sem := new(semaphore)
+	sem.state = uint64(limit) << 32
+	sem.broadcastCh = make(chan struct{})
+	return sem
+}
+
+// Acquire enters the semaphore n times, blocking until enough entries are
+// free or ctx is done. A nil ctx disables cancellation. It returns nil once
+// the entries have been acquired and ErrCtxDone if ctx is observed to be done
+// first. Acquire panics if n is not positive.
+func (s *semaphore) Acquire(ctx context.Context, n int) error {
+	if n <= 0 {
+		panic("n must be positive number")
+	}
+	for {
+		if ctx != nil {
+			select {
+			case <-ctx.Done():
+				return ErrCtxDone
+			default:
+			}
+		}
+
+		// get current semaphore count and limit
+		state := atomic.LoadUint64(&s.state)
+		count := state & 0xFFFFFFFF
+		limit := state >> 32
+
+		// new count
+		newCount := count + uint64(n)
+
+		if newCount <= limit {
+			if atomic.CompareAndSwapUint64(&s.state, state, limit<<32+newCount) {
+				// acquired
+				return nil
+			}
+
+			// CAS failed, try again
+			continue
+		}
+
+		// semaphore is full, let's wait
+		s.lock.RLock()
+		broadcastCh := s.broadcastCh
+		s.lock.RUnlock()
+
+		// Guard against a lost wake-up: a Release or SetLimit that ran after
+		// the state load above but before the channel snapshot has already
+		// closed the previous channel, so its signal would never reach the
+		// channel held here. If state has changed since it was loaded,
+		// re-evaluate instead of sleeping. If it has not, any later Release
+		// or SetLimit swaps the channel after this snapshot and therefore
+		// closes this channel (or a predecessor closes it first).
+		if atomic.LoadUint64(&s.state) != state {
+			continue
+		}
+
+		if ctx == nil {
+			// waiting for broadcast signal
+			<-broadcastCh
+			continue
+		}
+		select {
+		case <-ctx.Done():
+			return ErrCtxDone
+		// waiting for broadcast signal
+		case <-broadcastCh:
+		}
+	}
+}
+
+// TryAcquire attempts to enter the semaphore n times without blocking. It
+// returns true if the entries were acquired and false, leaving the state
+// untouched, if count+n would exceed the limit. It panics if n is not
+// positive.
+func (s *semaphore) TryAcquire(n int) bool {
+	if n <= 0 {
+		panic("n must be positive number")
+	}
+
+	for {
+		// get current semaphore count and limit
+		state := atomic.LoadUint64(&s.state)
+		count := state & 0xFFFFFFFF
+		limit := state >> 32
+
+		// new count
+		newCount := count + uint64(n)
+
+		if newCount <= limit {
+			if atomic.CompareAndSwapUint64(&s.state, state, limit<<32+newCount) {
+				// acquired
+				return true
+			}
+
+			// CAS failed because state changed concurrently; reload and try again
+			continue
+		}
+
+		// semaphore is full
+		return false
+	}
+}
+
+// Release exits the semaphore n times and returns the count that was held
+// before the release. It panics if n is not positive or if n exceeds the
+// current count. When the semaphore was at or above its limit before the
+// release, goroutines blocked in Acquire are woken.
+func (s *semaphore) Release(n int) int {
+	if n <= 0 {
+		panic("n must be positive number")
+	}
+	for {
+		// get current semaphore count and limit
+		state := atomic.LoadUint64(&s.state)
+		count := state & 0xFFFFFFFF
+		limit := state >> 32
+
+		if count < uint64(n) {
+			panic("semaphore release without acquire")
+		}
+
+		// new count
+		newCount := count - uint64(n)
+
+		// keep the limit (high 32 bits) and store the decremented count; retry the loop if the CAS loses a race
+		if atomic.CompareAndSwapUint64(&s.state, state, state&0xFFFFFFFF00000000+newCount) {
+
+			// notifying possible waiters only if there weren't free slots before
+			// NOTE(review): an Acquire with n > 1 can be waiting while count < limit; this gate would not wake it - confirm.
+			if count >= limit {
+				newBroadcastCh := make(chan struct{})
+				s.lock.Lock()
+				oldBroadcastCh := s.broadcastCh
+				s.broadcastCh = newBroadcastCh
+				s.lock.Unlock()
+
+				// send broadcast signal: closing the old channel wakes every goroutine waiting on it
+				close(oldBroadcastCh)
+			}
+
+			return int(count)
+		}
+	}
+}
+
+// SetLimit replaces the semaphore limit, leaving the current count unchanged,
+// and then wakes every goroutine blocked in Acquire so it can re-evaluate
+// against the new limit. It panics if limit is not positive.
+func (s *semaphore) SetLimit(limit int) {
+	if limit <= 0 {
+		panic("semaphore limit must be greater than 0")
+	}
+	for {
+		state := atomic.LoadUint64(&s.state)
+		// In Go, << and & bind tighter than +, so the new value below is
+		// (limit << 32) + (state & 0xFFFFFFFF): new limit, existing count.
+		if atomic.CompareAndSwapUint64(&s.state, state, uint64(limit)<<32+state&0xFFFFFFFF) {
+			newBroadcastCh := make(chan struct{})
+			s.lock.Lock()
+			oldBroadcastCh := s.broadcastCh
+			s.broadcastCh = newBroadcastCh
+			s.lock.Unlock()
+
+			// send broadcast signal: closing the old channel wakes every goroutine waiting on it
+			close(oldBroadcastCh)
+			return
+		}
+	}
+}
+
+// GetCount reports how many entries are currently held, i.e. the low 32 bits
+// of the packed state word.
+func (s *semaphore) GetCount() int {
+	const countMask = 0xFFFFFFFF
+	return int(atomic.LoadUint64(&s.state) & countMask)
+}
+
+// GetLimit reports the current limit, i.e. the high 32 bits of the packed
+// state word.
+func (s *semaphore) GetLimit() int {
+	packed := atomic.LoadUint64(&s.state)
+	limit := packed >> 32
+	return int(limit)
+}

+ 6 - 0
vendor/vendor.json

@@ -74,6 +74,12 @@
 			"revision": "ae77be60afb1dcacde03767a8c37337fad28ac14",
 			"revisionTime": "2017-05-10T13:15:34Z"
 		},
+		{
+			"checksumSHA1": "sY8sshVIEXnJgg3S6C5FcN33Vq4=",
+			"path": "github.com/marusama/semaphore",
+			"revision": "565ffd8e868a4e9a2b4e218e940cf4cb40ebd105",
+			"revisionTime": "2017-12-14T15:47:24Z"
+		},
 		{
 			"checksumSHA1": "WBdr9cf6JJbbCjHsYZnGVK5i4fY=",
 			"path": "github.com/mattn/go-sqlite3",