Browse Source

Broker client memory leak fix and other changes

- When resetting the broker client, fully close the previous instance instead
  of leaving it dangling, where references could be retained due to
  underlying http2 goroutines.
- Track all MeekConn underlying conns and fully close all when the MeekConn is
  closed.
- Fix missing mutex lock in broker client TacticsApplied reset case.
- Don't reset the broker client in the case of temporary round trip errors,
  such as CDN request timeouts.
- Wire up WaitForNetworkConnectivity in inproxy proxy main loop.
- Log which of ICE or port mapping WebRTC candidates were gathered.
- Pass raw API parameters, not processed log fields into broker tactics
  handler.
- Fix speed_test_samples API parameter type incompatibility.
- Increase the matcher rate limit defaults.
- Remove psiphon/server/net.go; no longer used as of 7bf72494.
Rod Hynes 1 year ago
parent
commit
d1a3c2ac4c

+ 1 - 1
psiphon/TCPConn.go

@@ -106,7 +106,7 @@ func DialTCP(
 func proxiedTcpDial(
 	ctx context.Context, addr string, config *DialConfig) (net.Conn, error) {
 
-	interruptConns := common.NewConns()
+	interruptConns := common.NewConns[net.Conn]()
 
 	// Note: using interruptConns to interrupt a proxy dial assumes
 	// that the underlying proxy code will immediately exit with an

+ 22 - 21
psiphon/common/inproxy/api.go

@@ -415,41 +415,41 @@ const (
 	maxDecoySize      = 16384
 )
 
-// ValidateAndGetLogFields validates the ProxyMetrics and returns
-// common.LogFields for logging.
-func (metrics *ProxyMetrics) ValidateAndGetLogFields(
+// ValidateAndGetParametersAndLogFields validates the ProxyMetrics and returns
+// Psiphon API parameters for processing and common.LogFields for logging.
+func (metrics *ProxyMetrics) ValidateAndGetParametersAndLogFields(
 	baseAPIParameterValidator common.APIParameterValidator,
 	formatter common.APIParameterLogFieldFormatter,
-	geoIPData common.GeoIPData) (common.LogFields, error) {
+	geoIPData common.GeoIPData) (common.APIParameters, common.LogFields, error) {
 
 	if metrics.BaseAPIParameters == nil {
-		return nil, errors.TraceNew("missing base API parameters")
+		return nil, nil, errors.TraceNew("missing base API parameters")
 	}
 
 	baseParams, err := protocol.DecodePackedAPIParameters(metrics.BaseAPIParameters)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	err = baseAPIParameterValidator(baseParams)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	if metrics.ProxyProtocolVersion != ProxyProtocolVersion1 {
-		return nil, errors.Tracef("invalid proxy protocol version: %v", metrics.ProxyProtocolVersion)
+		return nil, nil, errors.Tracef("invalid proxy protocol version: %v", metrics.ProxyProtocolVersion)
 	}
 
 	if !metrics.NATType.IsValid() {
-		return nil, errors.Tracef("invalid NAT type: %v", metrics.NATType)
+		return nil, nil, errors.Tracef("invalid NAT type: %v", metrics.NATType)
 	}
 
 	if len(metrics.PortMappingTypes) > maxPortMappingTypes {
-		return nil, errors.Tracef("invalid portmapping types length: %d", len(metrics.PortMappingTypes))
+		return nil, nil, errors.Tracef("invalid portmapping types length: %d", len(metrics.PortMappingTypes))
 	}
 
 	if !metrics.PortMappingTypes.IsValid() {
-		return nil, errors.Tracef("invalid portmapping types: %v", metrics.PortMappingTypes)
+		return nil, nil, errors.Tracef("invalid portmapping types: %v", metrics.PortMappingTypes)
 	}
 
 	logFields := formatter(geoIPData, baseParams)
@@ -465,7 +465,7 @@ func (metrics *ProxyMetrics) ValidateAndGetLogFields(
 	logFields["peak_upstream_bytes_per_second"] = metrics.PeakUpstreamBytesPerSecond
 	logFields["peak_downstream_bytes_per_second"] = metrics.PeakDownstreamBytesPerSecond
 
-	return logFields, nil
+	return baseParams, logFields, nil
 }
 
 // ValidateAndGetLogFields validates the ClientMetrics and returns
@@ -514,26 +514,27 @@ func (metrics *ClientMetrics) ValidateAndGetLogFields(
 	return logFields, nil
 }
 
-// ValidateAndGetLogFields validates the ProxyAnnounceRequest and returns
-// common.LogFields for logging.
-func (request *ProxyAnnounceRequest) ValidateAndGetLogFields(
+// ValidateAndGetParametersAndLogFields validates the ProxyAnnounceRequest and
+// returns Psiphon API parameters for processing and common.LogFields for
+// logging.
+func (request *ProxyAnnounceRequest) ValidateAndGetParametersAndLogFields(
 	maxCompartmentIDs int,
 	baseAPIParameterValidator common.APIParameterValidator,
 	formatter common.APIParameterLogFieldFormatter,
-	geoIPData common.GeoIPData) (common.LogFields, error) {
+	geoIPData common.GeoIPData) (common.APIParameters, common.LogFields, error) {
 
 	if len(request.PersonalCompartmentIDs) > maxCompartmentIDs {
-		return nil, errors.Tracef("invalid compartment IDs length: %d", len(request.PersonalCompartmentIDs))
+		return nil, nil, errors.Tracef("invalid compartment IDs length: %d", len(request.PersonalCompartmentIDs))
 	}
 
 	if request.Metrics == nil {
-		return nil, errors.TraceNew("missing metrics")
+		return nil, nil, errors.TraceNew("missing metrics")
 	}
 
-	logFields, err := request.Metrics.ValidateAndGetLogFields(
+	apiParams, logFields, err := request.Metrics.ValidateAndGetParametersAndLogFields(
 		baseAPIParameterValidator, formatter, geoIPData)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	// PersonalCompartmentIDs are user-generated and shared out-of-band;
@@ -543,7 +544,7 @@ func (request *ProxyAnnounceRequest) ValidateAndGetLogFields(
 
 	logFields["has_personal_compartment_ids"] = hasPersonalCompartmentIDs
 
-	return logFields, nil
+	return apiParams, logFields, nil
 }
 
 // ValidateAndGetLogFields validates the ClientOfferRequest and returns

+ 3 - 3
psiphon/common/inproxy/broker.go

@@ -507,7 +507,8 @@ func (b *Broker) handleProxyAnnounce(
 		return nil, errors.Trace(err)
 	}
 
-	logFields, err = announceRequest.ValidateAndGetLogFields(
+	var apiParams common.APIParameters
+	apiParams, logFields, err = announceRequest.ValidateAndGetParametersAndLogFields(
 		int(atomic.LoadInt64(&b.maxCompartmentIDs)),
 		b.config.APIParameterValidator,
 		b.config.APIParameterLogFieldFormatter,
@@ -525,8 +526,7 @@ func (b *Broker) handleProxyAnnounce(
 	// proxy can store and apply the new tactics before announcing again.
 
 	var tacticsPayload []byte
-	tacticsPayload, newTacticsTag, err = b.config.GetTactics(
-		geoIPData, common.APIParameters(logFields))
+	tacticsPayload, newTacticsTag, err = b.config.GetTactics(geoIPData, apiParams)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}

+ 14 - 8
psiphon/common/inproxy/brokerClient.go

@@ -21,6 +21,7 @@ package inproxy
 
 import (
 	"context"
+	std_errors "errors"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -262,14 +263,19 @@ func (b *BrokerClient) roundTrip(
 		request)
 	if err != nil {
 
-		// The BrokerDialCoordinator provider should close the existing
-		// BrokerClientRoundTripper and create a new RoundTripper to return
-		// in the next BrokerClientRoundTripper call.
-		//
-		// The session will be closed, if necessary, by InitiatorSessions.
-		// It's possible that the session remains valid and only the
-		// RoundTripper transport layer needs to be reset.
-		b.coordinator.BrokerClientRoundTripperFailed(roundTripper)
+		var failedError *RoundTripperFailedError
+		failed := std_errors.As(err, &failedError)
+
+		if failed {
+			// The BrokerDialCoordinator provider should close the existing
+			// BrokerClientRoundTripper and create a new RoundTripper to return
+			// in the next BrokerClientRoundTripper call.
+			//
+			// The session will be closed, if necessary, by InitiatorSessions.
+			// It's possible that the session remains valid and only the
+			// RoundTripper transport layer needs to be reset.
+			b.coordinator.BrokerClientRoundTripperFailed(roundTripper)
+		}
 
 		return nil, errors.Trace(err)
 	}

+ 17 - 0
psiphon/common/inproxy/coordinator.go

@@ -33,6 +33,23 @@ type RoundTripper interface {
 	RoundTrip(ctx context.Context, requestPayload []byte) (responsePayload []byte, err error)
 }
 
+// RoundTripperFailedError is an error type that should be returned from
+// RoundTripper.RoundTrip when the round trip transport has permanently
+// failed. When RoundTrip returns an error of type RoundTripperFailedError to
+// a broker client, the broker client will invoke
+// BrokerClientRoundTripperFailed.
+type RoundTripperFailedError struct {
+	err error
+}
+
+func NewRoundTripperFailedError(err error) *RoundTripperFailedError {
+	return &RoundTripperFailedError{err: err}
+}
+
+func (e RoundTripperFailedError) Error() string {
+	return e.err.Error()
+}
+
 // BrokerDialCoordinator provides in-proxy dial parameters and configuration,
 // used by both clients and proxies, and an interface for signaling when
 // parameters are successful or not, to facilitate replay of successful

+ 4 - 0
psiphon/common/inproxy/inproxy_test.go

@@ -395,6 +395,10 @@ func runTestInproxy() error {
 
 			Logger: logger,
 
+			WaitForNetworkConnectivity: func() bool {
+				return true
+			},
+
 			GetBrokerClient: func() (*BrokerClient, error) {
 				return brokerClient, nil
 			},

+ 9 - 0
psiphon/common/inproxy/proxy.go

@@ -74,6 +74,11 @@ type ProxyConfig struct {
 	// EnableWebRTCDebugLogging indicates whether to emit WebRTC debug logs.
 	EnableWebRTCDebugLogging bool
 
+	// WaitForNetworkConnectivity is a callback that should block until there
+	// is network connectivity or shutdown. The return value is true when
+	// there is network connectivity, and false for shutdown.
+	WaitForNetworkConnectivity func() bool
+
 	// GetBrokerClient provides a BrokerClient which the proxy will use for
 	// making broker requests. If GetBrokerClient returns a shared
 	// BrokerClient instance, the BrokerClient must support multiple,
@@ -304,6 +309,10 @@ func (p *Proxy) proxyClients(ctx context.Context, delayFirstAnnounce bool) {
 
 	for i := 0; ctx.Err() == nil; i++ {
 
+		if !p.config.WaitForNetworkConnectivity() {
+			break
+		}
+
 		// When delayFirstAnnounce is true, the very first proxyOneClient
 		// proxy announcement is delayed to give another, concurrent
 		// proxyClients proxy announcment a head start in order to fetch and

+ 1 - 0
psiphon/common/inproxy/session.go

@@ -321,6 +321,7 @@ func (s *InitiatorSessions) RoundTrip(
 			// If this round trip owns its session and there are any
 			// waitToShareSession initiators awaiting the session, signal them
 			// that the session will not become ready.
+
 			rt.TransportFailed()
 
 			return nil, errors.Trace(err)

+ 6 - 1
psiphon/common/inproxy/webrtc.go

@@ -365,7 +365,7 @@ func newWebRTCConn(
 	config.Logger.WithTraceFields(common.LogFields{
 		"dtls_randomization":           config.DoDTLSRandomization,
 		"data_channel_traffic_shaping": config.TrafficShapingParameters != nil,
-	}).Info("data_channel_obfuscation")
+	}).Info("webrtc_data_channel_obfuscation")
 
 	// Facilitate DTLS Client/ServerHello randomization. The client decides
 	// whether to do DTLS randomization and generates and the proxy receives
@@ -663,6 +663,11 @@ func newWebRTCConn(
 		conn.portMapper = nil
 	}
 
+	config.Logger.WithTraceFields(common.LogFields{
+		"ice_completed": iceCompleted,
+		"port_mapping":  portMappingExternalAddr != "",
+	}).Info("webrtc_candidates_gathered")
+
 	// Get the offer or answer, now populated with any ICE candidates.
 
 	localDescription := conn.peerConnection.LocalDescription()

+ 32 - 14
psiphon/common/net.go

@@ -22,6 +22,7 @@ package common
 import (
 	"container/list"
 	"context"
+	"io"
 	"net"
 	"net/http"
 	"net/netip"
@@ -151,51 +152,68 @@ func PortFromAddr(addr net.Addr) int {
 // close a set of open connections, etc.
 // Once the list is closed, no more items may be added to the
 // list (unless it is reset).
-type Conns struct {
+type Conns[T interface {
+	comparable
+	io.Closer
+}] struct {
 	mutex    sync.Mutex
 	isClosed bool
-	conns    map[net.Conn]bool
+	conns    map[T]bool
 }
 
 // NewConns initializes a new Conns.
-func NewConns() *Conns {
-	return &Conns{}
+func NewConns[T interface {
+	comparable
+	io.Closer
+}]() *Conns[T] {
+	return &Conns[T]{}
 }
 
-func (conns *Conns) Reset() {
+func (conns *Conns[T]) Reset() {
 	conns.mutex.Lock()
 	defer conns.mutex.Unlock()
 	conns.isClosed = false
-	conns.conns = make(map[net.Conn]bool)
+	conns.conns = make(map[T]bool)
 }
 
-func (conns *Conns) Add(conn net.Conn) bool {
+func (conns *Conns[T]) Add(conn T) bool {
 	conns.mutex.Lock()
 	defer conns.mutex.Unlock()
 	if conns.isClosed {
 		return false
 	}
 	if conns.conns == nil {
-		conns.conns = make(map[net.Conn]bool)
+		conns.conns = make(map[T]bool)
 	}
 	conns.conns[conn] = true
 	return true
 }
 
-func (conns *Conns) Remove(conn net.Conn) {
+func (conns *Conns[T]) Remove(conn T) {
 	conns.mutex.Lock()
 	defer conns.mutex.Unlock()
 	delete(conns.conns, conn)
 }
 
-func (conns *Conns) CloseAll() {
+func (conns *Conns[T]) CloseAll() {
+
 	conns.mutex.Lock()
-	defer conns.mutex.Unlock()
 	conns.isClosed = true
-	for conn := range conns.conns {
-		conn.Close()
+	closeConns := conns.conns
+	conns.conns = make(map[T]bool)
+	conns.mutex.Unlock()
+
+	// Close is invoked outside of the mutex in case a member conn's Close
+	// invokes Remove.
+	for conn := range closeConns {
+		_ = conn.Close()
 	}
-	conns.conns = make(map[net.Conn]bool)
+}
+
+func (conns *Conns[T]) IsClosed() bool {
+	conns.mutex.Lock()
+	defer conns.mutex.Unlock()
+	return conns.isClosed
 }
 
 // LRUConns is a concurrency-safe list of net.Conns ordered

+ 2 - 2
psiphon/common/parameters/parameters.go

@@ -865,11 +865,11 @@ var defaultParameters = map[string]struct {
 	InproxyCommonCompartmentIDs:                        {value: InproxyCompartmentIDsValue{}},
 	InproxyMaxCompartmentIDListLength:                  {value: 50, minimum: 0},
 	InproxyBrokerMatcherAnnouncementLimitEntryCount:    {value: 50, minimum: 0, flags: serverSideOnly},
-	InproxyBrokerMatcherAnnouncementRateLimitQuantity:  {value: 10, minimum: 0, flags: serverSideOnly},
+	InproxyBrokerMatcherAnnouncementRateLimitQuantity:  {value: 50, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherAnnouncementRateLimitInterval:  {value: 1 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerMatcherAnnouncementNonlimitedProxyIDs: {value: []string{}, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferLimitEntryCount:           {value: 10, minimum: 0, flags: serverSideOnly},
-	InproxyBrokerMatcherOfferRateLimitQuantity:         {value: 10, minimum: 0, flags: serverSideOnly},
+	InproxyBrokerMatcherOfferRateLimitQuantity:         {value: 50, minimum: 0, flags: serverSideOnly},
 	InproxyBrokerMatcherOfferRateLimitInterval:         {value: 1 * time.Minute, minimum: time.Duration(0)},
 	InproxyBrokerProxyAnnounceTimeout:                  {value: 2 * time.Minute, minimum: time.Duration(0), flags: serverSideOnly},
 	InproxyBrokerClientOfferTimeout:                    {value: 10 * time.Second, minimum: time.Duration(0), flags: serverSideOnly},

+ 46 - 9
psiphon/common/protocol/packed.go

@@ -501,6 +501,42 @@ func unpackRawJSON(v interface{}) (interface{}, error) {
 	return unmarshaledJSON, nil
 }
 
+func unpackSliceOfJSONCompatibleMaps(v interface{}) (interface{}, error) {
+
+	// For compatibility with the legacy JSON encoding as used for tactics
+	// speed test sample parameters. This converts CBOR maps of type map
+	// [interface{}]interface{} to JSON-compatible maps of type map
+	// [string]interface{}.
+
+	if v == nil {
+		return nil, nil
+	}
+
+	packedEntries, ok := v.([]interface{})
+	if !ok {
+		return nil, errors.Tracef("expected []interface{} type")
+	}
+
+	entries := make([]map[string]interface{}, len(packedEntries))
+
+	for i, packedEntry := range packedEntries {
+		entry, ok := packedEntry.(map[interface{}]interface{})
+		if !ok {
+			return nil, errors.Tracef("expected map[interface{}]interface{} type")
+		}
+		entries[i] = make(map[string]interface{})
+		for key, value := range entry {
+			strKey, ok := key.(string)
+			if !ok {
+				return nil, errors.Tracef("expected string type")
+			}
+			entries[i][strKey] = value
+		}
+	}
+
+	return entries, nil
+}
+
 func stringOrTextMarshal(v interface{}) (string, error) {
 	switch value := v.(type) {
 	case string:
@@ -528,14 +564,15 @@ var (
 	packedServerEntryFieldsNameToSpec = make(map[string]packSpec)
 	packedServerEntryFieldsKeyToSpec  = make(map[int]packSpec)
 
-	intConverter            = &packConverter{packInt, unpackInt}
-	floatConverter          = &packConverter{packFloat, unpackFloat}
-	lowerHexConverter       = &packConverter{packHex, unpackHexLower}
-	upperHexConverter       = &packConverter{packHex, unpackHexUpper}
-	base64Converter         = &packConverter{packBase64, unpackBase64}
-	unpaddedBase64Converter = &packConverter{packUnpaddedBase64, unpackUnpaddedBase64}
-	authorizationsConverter = &packConverter{packAuthorizations, unpackAuthorizations}
-	rawJSONConverter        = &packConverter{packNoop, unpackRawJSON}
+	intConverter               = &packConverter{packInt, unpackInt}
+	floatConverter             = &packConverter{packFloat, unpackFloat}
+	lowerHexConverter          = &packConverter{packHex, unpackHexLower}
+	upperHexConverter          = &packConverter{packHex, unpackHexUpper}
+	base64Converter            = &packConverter{packBase64, unpackBase64}
+	unpaddedBase64Converter    = &packConverter{packUnpaddedBase64, unpackUnpaddedBase64}
+	authorizationsConverter    = &packConverter{packAuthorizations, unpackAuthorizations}
+	rawJSONConverter           = &packConverter{packNoop, unpackRawJSON}
+	compatibleJSONMapConverter = &packConverter{packNoop, unpackSliceOfJSONCompatibleMaps}
 )
 
 func init() {
@@ -563,7 +600,7 @@ func init() {
 		// tactics.STORED_TACTICS_TAG_PARAMETER_NAME
 
 		{2, "stored_tactics_tag", lowerHexConverter},
-		{3, "speed_test_samples", nil},
+		{3, "speed_test_samples", compatibleJSONMapConverter},
 		{4, "applied_tactics_tag", lowerHexConverter},
 
 		// Specs: server.baseParams

+ 1 - 1
psiphon/common/resolver/resolver.go

@@ -671,7 +671,7 @@ func (r *Resolver) ResolveIP(
 	resolveCtx, cancelFunc := context.WithCancel(ctx)
 	defer cancelFunc()
 	waitGroup := new(sync.WaitGroup)
-	conns := common.NewConns()
+	conns := common.NewConns[net.Conn]()
 	type answer struct {
 		attempt int
 		IPs     []net.IP

+ 2 - 1
psiphon/common/tactics/tactics.go

@@ -920,6 +920,7 @@ func (server *Server) GetTactics(
 
 			var speedTestSamples []SpeedTestSample
 			err := getJSONRequestParam(apiParams, SPEED_TEST_SAMPLES_PARAMETER_NAME, &speedTestSamples)
+
 			if err != nil {
 				// TODO: log speed test parameter errors?
 				// This API param is not explicitly validated elsewhere.
@@ -981,7 +982,7 @@ func getJSONRequestParam(apiParams common.APIParameters, name string, value inte
 
 	// Remarshal the parameter from common.APIParameters, as the initial API parameter
 	// unmarshal will not have known the correct target type. I.e., instead of doing
-	// unmarhsal-into-struct, common.APIParameters will have an unmarshal-into-interface
+	// unmarshal-into-struct, common.APIParameters will have an unmarshal-into-interface
 	// value as described here: https://golang.org/pkg/encoding/json/#Unmarshal.
 
 	jsonValue, err := json.Marshal(apiParams[name])

+ 7 - 0
psiphon/controller.go

@@ -2586,6 +2586,7 @@ func (controller *Controller) runInproxyProxy() {
 	config := &inproxy.ProxyConfig{
 		Logger:                        NoticeCommonLogger(debugLogging),
 		EnableWebRTCDebugLogging:      debugLogging,
+		WaitForNetworkConnectivity:    controller.inproxyWaitForNetworkConnectivity,
 		GetBrokerClient:               controller.inproxyGetProxyBrokerClient,
 		GetBaseAPIParameters:          controller.inproxyGetAPIParameters,
 		MakeWebRTCDialCoordinator:     controller.inproxyMakeWebRTCDialCoordinator,
@@ -2652,6 +2653,12 @@ func (controller *Controller) inproxyAwaitBrokerSpecs(isProxy bool) bool {
 	}
 }
 
+func (controller *Controller) inproxyWaitForNetworkConnectivity() bool {
+	return WaitForNetworkConnectivity(
+		controller.runCtx,
+		controller.config.NetworkConnectivityChecker)
+}
+
 // inproxyGetProxyBrokerClient returns the broker client shared by all proxy
 // operations.
 func (controller *Controller) inproxyGetProxyBrokerClient() (*inproxy.BrokerClient, error) {

+ 2 - 4
psiphon/httpProxy.go

@@ -74,7 +74,6 @@ import (
 //
 // Origin URLs must include the scheme prefix ("http://" or "https://") and must be
 // URL encoded.
-//
 type HttpProxy struct {
 	config                 *Config
 	tunneler               Tunneler
@@ -86,7 +85,7 @@ type HttpProxy struct {
 	urlProxyDirectRelay    *http.Transport
 	urlProxyDirectClient   *http.Client
 	responseHeaderTimeout  time.Duration
-	openConns              *common.Conns
+	openConns              *common.Conns[net.Conn]
 	stopListeningBroadcast chan struct{}
 	listenIP               string
 	listenPort             int
@@ -173,7 +172,7 @@ func NewHttpProxy(
 		urlProxyDirectRelay:    urlProxyDirectRelay,
 		urlProxyDirectClient:   urlProxyDirectClient,
 		responseHeaderTimeout:  responseHeaderTimeout,
-		openConns:              common.NewConns(),
+		openConns:              common.NewConns[net.Conn](),
 		stopListeningBroadcast: make(chan struct{}),
 		listenIP:               proxyIP,
 		listenPort:             proxyPort,
@@ -226,7 +225,6 @@ func (proxy *HttpProxy) Close() {
 // Copyright 2011 The Go Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
-//
 func (proxy *HttpProxy) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
 	if request.Method == "CONNECT" {
 		conn := hijack(responseWriter)

+ 47 - 18
psiphon/inproxy.go

@@ -28,7 +28,6 @@ import (
 	"net"
 	"net/http"
 	"net/netip"
-	"runtime"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -100,6 +99,8 @@ func NewInproxyBrokerClientManager(
 // called when tactics have changed, which triggers a broker client reset in
 // order to apply potentially changed parameters.
 func (b *InproxyBrokerClientManager) TacticsApplied() error {
+	b.mutex.Lock()
+	defer b.mutex.Unlock()
 
 	// TODO: as a future future enhancement, don't reset when the tactics
 	// brokerSpecs.Hash() is unchanged?
@@ -133,7 +134,7 @@ func (b *InproxyBrokerClientManager) GetBrokerClient(
 		nil
 }
 
-func (b *InproxyBrokerClientManager) resetBrokerClientOnRoundTripFailed() error {
+func (b *InproxyBrokerClientManager) resetBrokerClientOnRoundTripperFailed() error {
 	b.mutex.Lock()
 	defer b.mutex.Unlock()
 
@@ -144,6 +145,16 @@ func (b *InproxyBrokerClientManager) reset() error {
 
 	// Assumes b.mutex lock is held.
 
+	if b.brokerClientInstance != nil {
+
+		// Close the existing broker client. This will close all underlying
+		// network connections, interrupting any in-flight requests. This
+		// close is invoked in the resetBrokerClientOnRoundTripperFailed
+		// case, where it's expected that the round tripped has permanently
+		// failed.
+		b.brokerClientInstance.Close()
+	}
+
 	// Any existing broker client is removed, even if
 	// NewInproxyBrokerClientInstance fails. This ensures, for example, that
 	// an existing broker client is removed when its spec is no longer
@@ -167,7 +178,7 @@ func (b *InproxyBrokerClientManager) reset() error {
 
 // InproxyBrokerClientInstance pairs an inproxy.BrokerClient instance with an
 // implementation of the inproxy.BrokerDialCoordinator interface and the
-// associated, underlying broker dial paramaters. InproxyBrokerClientInstance
+// associated, underlying broker dial parameters. InproxyBrokerClientInstance
 // implements broker client dial replay.
 type InproxyBrokerClientInstance struct {
 	config                        *Config
@@ -358,8 +369,8 @@ func NewInproxyBrokerClientInstance(
 
 	// Initialize broker client. This will start with a fresh broker session.
 	//
-	// When resetBrokerClientOnRoundTripFailed is invoked due to a failure at
-	// the transport level -- TLS or domain fronting --
+	// When resetBrokerClientOnRoundTripperFailed is invoked due to a failure
+	// at the transport level -- TLS or domain fronting --
 	// NewInproxyBrokerClientInstance is invoked, resetting both the broker
 	// client round tripper and the broker session. As a future enhancement,
 	// consider distinguishing between transport and session errors and
@@ -371,16 +382,6 @@ func NewInproxyBrokerClientInstance(
 		return nil, errors.Trace(err)
 	}
 
-	// Set a finalizer to close any open network resources associated with the
-	// round tripper once the InproxyBrokerClientInstance is no longer
-	// referenced. Note that there's no explicit call to close in
-	// InproxyBrokerClientManager.reset when a new instance is created in
-	// case the old insstance is still in use.
-
-	runtime.SetFinalizer(b, func(b *InproxyBrokerClientInstance) {
-		_ = b.roundTripper.Close()
-	})
-
 	return b, nil
 }
 
@@ -494,6 +495,14 @@ func prepareCompartmentIDs(
 	return commonCompartmentIDs, personalCompartmentIDs, nil
 }
 
+// Close closes the broker client round tripped, including closing all
+// underlying network connections, which will interrupt any in-flight round
+// trips.
+func (b *InproxyBrokerClientInstance) Close() error {
+	err := b.roundTripper.Close()
+	return errors.Trace(err)
+}
+
 // Implements the inproxy.BrokerDialCoordinator interface.
 func (b *InproxyBrokerClientInstance) NetworkID() string {
 	return b.networkID
@@ -623,7 +632,7 @@ func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperFailed(roundTrippe
 		}
 	}
 
-	// Invoke resetBrokerClientOnRoundTripFailed to signal the
+	// Invoke resetBrokerClientOnRoundTripperFailed to signal the
 	// InproxyBrokerClientManager to create a new
 	// InproxyBrokerClientInstance, with new dial parameters and a new round
 	// tripper, after a failure.
@@ -635,7 +644,7 @@ func (b *InproxyBrokerClientInstance) BrokerClientRoundTripperFailed(roundTrippe
 	// Limitation: a transport-level failure may unnecessarily reset the
 	// broker session state; see comment in NewInproxyBrokerClientInstance.
 
-	err := b.brokerClientManager.resetBrokerClientOnRoundTripFailed()
+	err := b.brokerClientManager.resetBrokerClientOnRoundTripperFailed()
 	if err != nil {
 		NoticeWarning("reset broker client failed: %v", errors.Trace(err))
 		// Continue with old broker client instance.
@@ -1218,7 +1227,7 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 	// connection, while concurrent round trips over HTTP connections may
 	// spawn additional TLS persistent connections.
 	//
-	// There is no retry here is DialMeek fails, as higher levels will invoke
+	// There is no retry here if DialMeek fails, as higher levels will invoke
 	// BrokerClientRoundTripperFailed on failure, clear any replay, select
 	// new dial parameters, and retry.
 
@@ -1273,8 +1282,27 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 	if err == nil {
 		defer response.Body.Close()
 		if response.StatusCode != http.StatusOK {
+
+			// This case is treated as a temporary round tripper failure,
+			// since we received a response from the CDN, secured with TLS
+			// and VerifyPins, or from broker itself. One common scenario is
+			// the CDN returning a temporary gateway failed or timeout error.
+			// In this scenario, we can reuse the existing round tripper and
+			// it may be counterproductive to return a RoundTripperFailedError
+			// which will trigger a clearing of any broker dial replay
+			// parameters as well as reseting the round tripper.
+
 			err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
 		}
+	} else if ctx.Err() != context.Canceled {
+
+		// Other round trip errors, including TLS failures and client-side
+		// timeouts,  but excluding a cancelled context as happens on
+		// shutdown, are classified as RoundTripperFailedErrors, which will
+		// invoke BrokerClientRoundTripperFailed, resetting the round tripper
+		// and clearing replay parameters.
+
+		err = inproxy.NewRoundTripperFailedError(err)
 	}
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -1282,6 +1310,7 @@ func (rt *InproxyBrokerRoundTripper) RoundTrip(
 
 	responsePayload, err := io.ReadAll(response.Body)
 	if err != nil {
+		err = inproxy.NewRoundTripperFailedError(err)
 		return nil, errors.Trace(err)
 	}
 

+ 197 - 105
psiphon/meekConn.go

@@ -34,7 +34,6 @@ import (
 	"net/url"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
@@ -244,15 +243,14 @@ type MeekConn struct {
 	tlsPadding                int
 	limitRequestPayloadLength int
 	redialTLSProbability      float64
-	underlyingDialer          common.Dialer
-	cachedTLSDialer           *cachedTLSDialer
 	transport                 transporter
-	mutex                     sync.Mutex
-	isClosed                  bool
-	runCtx                    context.Context
-	stopRunning               context.CancelFunc
-	relayWaitGroup            *sync.WaitGroup
-	firstUnderlyingConn       net.Conn
+	connManager               *meekUnderlyingConnManager
+
+	mutex          sync.Mutex
+	isClosed       bool
+	runCtx         context.Context
+	stopRunning    context.CancelFunc
+	relayWaitGroup *sync.WaitGroup
 
 	// For MeekModeObfuscatedRoundTrip
 	meekCookieEncryptionPublicKey string
@@ -324,20 +322,6 @@ func DialMeek(
 
 	runCtx, stopRunning := context.WithCancel(context.Background())
 
-	cleanupStopRunning := true
-	cleanupCachedTLSDialer := true
-	var cachedTLSDialer *cachedTLSDialer
-
-	// Cleanup in error cases
-	defer func() {
-		if cleanupStopRunning {
-			stopRunning()
-		}
-		if cleanupCachedTLSDialer && cachedTLSDialer != nil {
-			cachedTLSDialer.close()
-		}
-	}()
-
 	meek := &MeekConn{
 		params:                   meekConfig.Parameters,
 		mode:                     meekConfig.Mode,
@@ -348,6 +332,19 @@ func DialMeek(
 		relayWaitGroup:           new(sync.WaitGroup),
 	}
 
+	cleanupStopRunning := true
+	cleanupConns := true
+
+	// Cleanup in error cases
+	defer func() {
+		if cleanupStopRunning {
+			meek.stopRunning()
+		}
+		if cleanupConns && meek.connManager != nil {
+			meek.connManager.closeAll()
+		}
+	}()
+
 	if meek.mode == MeekModeRelay {
 		var err error
 		meek.cookie,
@@ -396,13 +393,15 @@ func DialMeek(
 			return packetConn, remoteAddr, nil
 		}
 
+		meek.connManager = newMeekUnderlyingConnManager(nil, nil, udpDialer)
+
 		var err error
 		transport, err = quic.NewQUICTransporter(
 			ctx,
 			func(message string) {
 				NoticeInfo(message)
 			},
-			udpDialer,
+			meek.connManager.dialPacketConn,
 			meekConfig.SNIServerName,
 			meekConfig.QUICVersion,
 			meekConfig.QUICClientHelloSeed,
@@ -448,12 +447,10 @@ func DialMeek(
 
 		scheme = "https"
 
-		meek.initUnderlyingDialer(dialConfig)
-
 		tlsConfig := &CustomTLSConfig{
 			Parameters:                    meekConfig.Parameters,
 			DialAddr:                      meekConfig.DialAddress,
-			Dial:                          meek.underlyingDial,
+			Dial:                          NewTCPDialer(dialConfig),
 			SNIServerName:                 meekConfig.SNIServerName,
 			SkipVerify:                    skipVerify,
 			VerifyServerName:              meekConfig.VerifyServerName,
@@ -531,22 +528,19 @@ func DialMeek(
 			return nil, errors.Trace(err)
 		}
 
-		cachedTLSDialer = newCachedTLSDialer(preConn, tlsDialer)
+		meek.connManager = newMeekUnderlyingConnManager(preConn, tlsDialer, nil)
 
 		if IsTLSConnUsingHTTP2(preConn) {
 			NoticeInfo("negotiated HTTP/2 for %s", meekConfig.DiagnosticID)
 			transport = &http2.Transport{
 				DialTLSContext: func(
 					ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
-					return cachedTLSDialer.dial(ctx, network, addr)
+					return meek.connManager.dial(ctx, network, addr)
 				},
 			}
 		} else {
 			transport = &http.Transport{
-				DialTLSContext: func(
-					ctx context.Context, network, addr string) (net.Conn, error) {
-					return cachedTLSDialer.dial(ctx, network, addr)
-				},
+				DialTLSContext: meek.connManager.dial,
 			}
 		}
 
@@ -577,8 +571,7 @@ func DialMeek(
 			*copyDialConfig = *dialConfig
 			copyDialConfig.UpstreamProxyURL = ""
 
-			meek.initUnderlyingDialer(copyDialConfig)
-			dialer = meek.underlyingDial
+			dialer = NewTCPDialer(copyDialConfig)
 
 			// In this proxy case, the destination server address is in the
 			// request line URL. net/http will render the request line using
@@ -602,8 +595,7 @@ func DialMeek(
 			// If dialConfig.UpstreamProxyURL is set, HTTP proxying via
 			// CONNECT will be used by the dialer.
 
-			meek.initUnderlyingDialer(dialConfig)
-			baseDialer := meek.underlyingDial
+			baseDialer := NewTCPDialer(dialConfig)
 
 			// The dialer ignores any address that http.Transport will pass in
 			// (derived from the HTTP request URL) and always dials
@@ -617,14 +609,19 @@ func DialMeek(
 			// Only apply transformer if it will perform a transform; otherwise
 			// applying a no-op transform will incur an unnecessary performance
 			// cost.
-			if meekConfig.HTTPTransformerParameters != nil && meekConfig.HTTPTransformerParameters.ProtocolTransformSpec != nil {
-				dialer = transforms.WrapDialerWithHTTPTransformer(dialer, meekConfig.HTTPTransformerParameters)
+			if meekConfig.HTTPTransformerParameters != nil &&
+				meekConfig.HTTPTransformerParameters.ProtocolTransformSpec != nil {
+
+				dialer = transforms.WrapDialerWithHTTPTransformer(
+					dialer, meekConfig.HTTPTransformerParameters)
 			}
 		}
 
+		meek.connManager = newMeekUnderlyingConnManager(nil, dialer, nil)
+
 		httpTransport := &http.Transport{
 			Proxy:       proxyUrl,
-			DialContext: dialer,
+			DialContext: meek.connManager.dial,
 		}
 
 		if proxyUrl != nil {
@@ -694,12 +691,11 @@ func DialMeek(
 
 	meek.url = url
 	meek.additionalHeaders = additionalHeaders
-	meek.cachedTLSDialer = cachedTLSDialer
 	meek.transport = transport
 
 	// stopRunning and cachedTLSDialer will now be closed in meek.Close()
 	cleanupStopRunning = false
-	cleanupCachedTLSDialer = false
+	cleanupConns = false
 
 	// Allocate relay resources, including buffers and running the relay
 	// go routine, only when running in relay mode.
@@ -763,56 +759,175 @@ func DialMeek(
 	return meek, nil
 }
 
-func (meek *MeekConn) initUnderlyingDialer(dialConfig *DialConfig) {
+type meekPacketConnDialer func(ctx context.Context) (net.PacketConn, *net.UDPAddr, error)
 
-	// Not safe for concurrent calls; should be called only from DialMeek.
-	meek.underlyingDialer = NewTCPDialer(dialConfig)
+// meekUnderlyingConnManager tracks the TCP/TLS and UDP connections underlying
+// the meek HTTP/HTTPS/QUIC transports. This tracking is used to:
+//
+//   - Use the cached predial TLS conn created in DialMeek.
+//   - Gather metrics from mechanisms enabled in the underlying conns, such as
+//     the fragmentor, or inproxy.
+//   - Fully close all underlying connections with the MeekConn is closed.
+type meekUnderlyingConnManager struct {
+	mutex           sync.Mutex
+	cachedConn      net.Conn
+	firstConn       net.Conn
+	firstPacketConn net.PacketConn
+
+	dialer       common.Dialer
+	managedConns *common.Conns[net.Conn]
+
+	packetConnDialer   meekPacketConnDialer
+	managedPacketConns *common.Conns[net.PacketConn]
 }
 
-func (meek *MeekConn) underlyingDial(ctx context.Context, network, addr string) (net.Conn, error) {
-	conn, err := meek.underlyingDialer(ctx, network, addr)
-	if err == nil {
-		meek.mutex.Lock()
-		if meek.firstUnderlyingConn == nil {
-			// Keep a reference to the first underlying conn to be used as a
-			// common.MetricsSource in GetMetrics. This enables capturing
-			// metrics such as fragmentor configuration.
-			meek.firstUnderlyingConn = conn
-		}
-		meek.mutex.Unlock()
-	}
+type meekUnderlyingConn struct {
+	net.Conn
+	connManager *meekUnderlyingConnManager
+}
+
+func (conn *meekUnderlyingConn) Close() error {
+	conn.connManager.managedConns.Remove(conn)
+
 	// Note: no trace error to preserve error type
-	return conn, err
+	return conn.Conn.Close()
+}
+
+type meekUnderlyingPacketConn struct {
+	net.PacketConn
+	connManager *meekUnderlyingConnManager
 }
 
-type cachedTLSDialer struct {
-	usedCachedConn int32
-	cachedConn     net.Conn
-	dialer         common.Dialer
+func (packetConn *meekUnderlyingPacketConn) Close() error {
+	packetConn.connManager.managedPacketConns.Remove(packetConn)
+	return packetConn.PacketConn.Close()
 }
 
-func newCachedTLSDialer(cachedConn net.Conn, dialer common.Dialer) *cachedTLSDialer {
-	return &cachedTLSDialer{
-		cachedConn: cachedConn,
-		dialer:     dialer,
+func newMeekUnderlyingConnManager(
+	cachedConn net.Conn,
+	dialer common.Dialer,
+	packetConnDialer meekPacketConnDialer) *meekUnderlyingConnManager {
+
+	m := &meekUnderlyingConnManager{
+		dialer:       dialer,
+		managedConns: common.NewConns[net.Conn](),
+
+		packetConnDialer:   packetConnDialer,
+		managedPacketConns: common.NewConns[net.PacketConn](),
 	}
+
+	if cachedConn != nil {
+		m.cachedConn = &meekUnderlyingConn{Conn: cachedConn, connManager: m}
+		m.firstConn = cachedConn
+	}
+
+	return m
 }
 
-func (c *cachedTLSDialer) dial(ctx context.Context, network, addr string) (net.Conn, error) {
-	if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
-		conn := c.cachedConn
-		c.cachedConn = nil
+func (m *meekUnderlyingConnManager) GetMetrics() common.LogFields {
+
+	logFields := common.LogFields{}
+
+	m.mutex.Lock()
+	underlyingMetrics, ok := m.firstConn.(common.MetricsSource)
+	if ok {
+		logFields.Add(underlyingMetrics.GetMetrics())
+	}
+
+	underlyingMetrics, ok = m.firstPacketConn.(common.MetricsSource)
+	if ok {
+		logFields.Add(underlyingMetrics.GetMetrics())
+	}
+	m.mutex.Unlock()
+
+	return logFields
+}
+
+func (m *meekUnderlyingConnManager) dial(
+	ctx context.Context, network, addr string) (net.Conn, error) {
+
+	if m.managedConns.IsClosed() {
+		return nil, errors.TraceNew("closed")
+	}
+
+	// Consume the cached conn when present.
+
+	m.mutex.Lock()
+	var conn net.Conn
+	if m.cachedConn != nil {
+		conn = m.cachedConn
+		m.cachedConn = nil
+	}
+	m.mutex.Unlock()
+
+	if conn != nil {
 		return conn, nil
 	}
 
-	return c.dialer(ctx, network, addr)
+	// The mutex lock is not held for the duration of dial, allowing for
+	// concurrent dials.
+
+	conn, err := m.dialer(ctx, network, addr)
+	if err != nil {
+		// Note: no trace error to preserve error type
+		return nil, err
+	}
+
+	// Keep a reference to the first underlying conn to be used as a
+	// common.MetricsSource in GetMetrics. This enables capturing metrics
+	// such as fragmentor configuration.
+
+	m.mutex.Lock()
+	if m.firstConn == nil {
+		m.firstConn = conn
+	}
+	m.mutex.Unlock()
+
+	// Wrap the dialed conn with meekUnderlyingConn, which will remove the
+	// conn from the set of tracked conns when the conn is closed.
+
+	conn = &meekUnderlyingConn{Conn: conn, connManager: m}
+
+	if !m.managedConns.Add(conn) {
+		_ = conn.Close()
+		return nil, errors.TraceNew("closed")
+	}
+
+	return conn, nil
 }
 
-func (c *cachedTLSDialer) close() {
-	if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
-		c.cachedConn.Close()
-		c.cachedConn = nil
+func (m *meekUnderlyingConnManager) dialPacketConn(
+	ctx context.Context) (net.PacketConn, *net.UDPAddr, error) {
+
+	if m.managedPacketConns.IsClosed() {
+		return nil, nil, errors.TraceNew("closed")
 	}
+
+	packetConn, addr, err := m.packetConnDialer(ctx)
+	if err != nil {
+		// Note: no trace error to preserve error type
+		return nil, nil, err
+	}
+
+	m.mutex.Lock()
+	if m.firstPacketConn != nil {
+		m.firstPacketConn = packetConn
+	}
+	m.mutex.Unlock()
+
+	packetConn = &meekUnderlyingPacketConn{PacketConn: packetConn, connManager: m}
+
+	if !m.managedPacketConns.Add(packetConn) {
+		_ = packetConn.Close()
+		return nil, nil, errors.TraceNew("closed")
+	}
+
+	return packetConn, addr, nil
+}
+
+func (m *meekUnderlyingConnManager) closeAll() {
+	m.managedConns.CloseAll()
+	m.managedPacketConns.CloseAll()
 }
 
 // Close terminates the meek connection and releases its resources. In in
@@ -828,31 +943,12 @@ func (meek *MeekConn) Close() (err error) {
 
 	if !isClosed {
 		meek.stopRunning()
-		if meek.cachedTLSDialer != nil {
-			meek.cachedTLSDialer.close()
-		}
-
-		// stopRunning interrupts HTTP requests in progress by closing the context
-		// associated with the request. In the case of h2quic.RoundTripper, testing
-		// indicates that quic-go.receiveStream.readImpl is _not_ interrupted in
-		// this case, and so an in-flight FRONTED-MEEK-QUIC round trip may hang shutdown
-		// in relayRoundTrip->readPayload->...->quic-go.receiveStream.readImpl.
-		// TODO: check if this is still the case in newer quic-go versions.
-		//
-		// To workaround this, we call CloseIdleConnections _before_ Wait, as, in
-		// the case of QUICTransporter, this closes the underlying UDP sockets which
-		// interrupts any blocking I/O calls.
-		//
-		// The standard CloseIdleConnections call _after_ wait is for the net/http
-		// case: it only closes idle connections, so the call should be after wait.
-		// This call is intended to clean up all network resources deterministically
-		// before Close returns.
-		if meek.isQUIC {
-			meek.transport.CloseIdleConnections()
-		}
-
+		meek.connManager.closeAll()
 		meek.relayWaitGroup.Wait()
-		meek.transport.CloseIdleConnections()
+
+		// meek.transport.CloseIdleConnections is no longed called here since
+		// meekUnderlyingConnManager.closeAll will terminate all underlying
+		// connections and prevent opening any new connections.
 	}
 	return nil
 }
@@ -877,16 +973,12 @@ func (meek *MeekConn) GetMetrics() common.LogFields {
 		logFields["meek_limit_request"] = meek.limitRequestPayloadLength
 		logFields["meek_redial_probability"] = meek.redialTLSProbability
 	}
+
 	// Include metrics, such as fragmentor metrics, from the _first_ underlying
 	// dial conn. Properties of subsequent underlying dial conns are not reflected
 	// in these metrics; we assume that the first dial conn, which most likely
 	// transits the various protocol handshakes, is most significant.
-	meek.mutex.Lock()
-	underlyingMetrics, ok := meek.firstUnderlyingConn.(common.MetricsSource)
-	if ok {
-		logFields.Add(underlyingMetrics.GetMetrics())
-	}
-	meek.mutex.Unlock()
+	logFields.Add(meek.connManager.GetMetrics())
 	return logFields
 }
 

+ 2 - 2
psiphon/server/meek.go

@@ -124,7 +124,7 @@ type MeekServer struct {
 	psiphonTLSConfig                *psiphon_tls.Config
 	obfuscatorSeedHistory           *obfuscator.SeedHistory
 	clientHandler                   func(clientConn net.Conn, data *additionalTransportData)
-	openConns                       *common.Conns
+	openConns                       *common.Conns[net.Conn]
 	stopBroadcast                   <-chan struct{}
 	sessionsLock                    sync.RWMutex
 	sessions                        map[string]*meekSession
@@ -240,7 +240,7 @@ func NewMeekServer(
 		httpClientIOTimeout:             httpClientIOTimeout,
 		obfuscatorSeedHistory:           obfuscator.NewSeedHistory(nil),
 		clientHandler:                   clientHandler,
-		openConns:                       common.NewConns(),
+		openConns:                       common.NewConns[net.Conn](),
 		stopBroadcast:                   stopBroadcast,
 		sessions:                        make(map[string]*meekSession),
 		checksumTable:                   checksumTable,

+ 0 - 79
psiphon/server/net.go

@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2016, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-// for HTTPSServer.ServeTLS:
-/*
-Copyright (c) 2012 The Go Authors. All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
-   * Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
-   * Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
-   * Neither the name of Google Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-*/
-
-package server
-
-import (
-	"net"
-	"net/http"
-
-	tls "github.com/Psiphon-Labs/psiphon-tls"
-)
-
-// HTTPSServer is a wrapper around http.Server which adds the
-// ServeTLS function.
-type HTTPSServer struct {
-	*http.Server
-}
-
-// ServeTLS is similar to http.Serve, but uses TLS.
-//
-// The http package has both ListenAndServe and ListenAndServeTLS higher-
-// level interfaces, but only Serve (not TLS) offers a lower-level interface that
-// allows the caller to keep a refererence to the Listener, allowing for external
-// shutdown. ListenAndServeTLS also requires the TLS cert and key to be in files
-// and we avoid that here.
-//
-// Note that the http.Server.TLSConfig field is ignored and the tls.Config
-// parameter is used intead.
-func (server *HTTPSServer) ServeTLS(listener net.Listener, config *tls.Config) error {
-	tlsListener := tls.NewListener(listener, config)
-	return server.Serve(tlsListener)
-}

+ 2 - 2
psiphon/socksProxy.go

@@ -38,7 +38,7 @@ type SocksProxy struct {
 	tunneler               Tunneler
 	listener               *socks.SocksListener
 	serveWaitGroup         *sync.WaitGroup
-	openConns              *common.Conns
+	openConns              *common.Conns[net.Conn]
 	stopListeningBroadcast chan struct{}
 }
 
@@ -65,7 +65,7 @@ func NewSocksProxy(
 		tunneler:               tunneler,
 		listener:               socks.NewSocksListener(listener),
 		serveWaitGroup:         new(sync.WaitGroup),
-		openConns:              common.NewConns(),
+		openConns:              common.NewConns[net.Conn](),
 		stopListeningBroadcast: make(chan struct{}),
 	}
 	proxy.serveWaitGroup.Add(1)