Просмотр исходного кода

Add fragmentor dial stats and metrics

- Refactor fragmentor config to make it
  available for stats and logging.

- In the client, the fragmentor is made
  available to any DialTCP, and is now
  potentially applied to tactics dials.

- Limitation: downstream fragmentor may
  be used for TCP conns underlying meek,
  but stats are not yet captured in this
  case.
Rod Hynes 7 лет назад
Родитель
Сommit
4ee9d25885

+ 9 - 0
psiphon/TCPConn.go

@@ -27,6 +27,7 @@ import (
 	"sync/atomic"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
 )
 
@@ -79,6 +80,14 @@ func DialTCP(
 			config.ResolvedIPCallback(ipAddress)
 		}
 	}
+
+	if config.FragmentorConfig.IsFragmenting() {
+		fragmentor.NewConn(
+			config.FragmentorConfig,
+			func(message string) { NoticeInfo(message) },
+			conn)
+	}
+
 	return conn, nil
 }
 

+ 102 - 22
psiphon/common/fragmentor/fragmentor.go

@@ -29,6 +29,7 @@ import (
 	"time"
 
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
 )
 
 const (
@@ -36,6 +37,89 @@ const (
 	MAX_FRAGMENTOR_ITERATIONS_PER_NOTICE = 5
 )
 
+// Config specifies a fragmentor configuration. NewUpstreamConfig and
+// NewDownstreamConfig will generate configurations based on the given
+// client parameters.
+type Config struct {
+	isUpstream      bool
+	bytesToFragment int
+	minWriteBytes   int
+	maxWriteBytes   int
+	minDelay        time.Duration
+	maxDelay        time.Duration
+}
+
+// NewUpstreamConfig creates a new Config; may return nil.
+func NewUpstreamConfig(
+	p *parameters.ClientParametersSnapshot, tunnelProtocol string) *Config {
+	return newConfig(p, true, tunnelProtocol)
+}
+
+// NewDownstreamConfig creates a new Config; may return nil.
+func NewDownstreamConfig(
+	p *parameters.ClientParametersSnapshot, tunnelProtocol string) *Config {
+	return newConfig(p, false, tunnelProtocol)
+}
+
+func newConfig(
+	p *parameters.ClientParametersSnapshot,
+	isUpstream bool,
+	tunnelProtocol string) *Config {
+
+	coinFlip := p.WeightedCoinFlip(parameters.FragmentorDownstreamProbability)
+	tunnelProtocols := p.TunnelProtocols(parameters.FragmentorDownstreamLimitProtocols)
+
+	if !coinFlip || (len(tunnelProtocols) > 0 && common.Contains(tunnelProtocols, tunnelProtocol)) {
+		return nil
+	}
+
+	bytesToFragment, err := common.MakeSecureRandomRange(
+		p.Int(parameters.FragmentorDownstreamMinTotalBytes),
+		p.Int(parameters.FragmentorDownstreamMaxTotalBytes))
+	if err != nil {
+		bytesToFragment = 0
+	}
+
+	if bytesToFragment == 0 {
+		return nil
+	}
+
+	return &Config{
+		isUpstream:      isUpstream,
+		bytesToFragment: bytesToFragment,
+		minWriteBytes:   p.Int(parameters.FragmentorDownstreamMinWriteBytes),
+		maxWriteBytes:   p.Int(parameters.FragmentorDownstreamMaxWriteBytes),
+		minDelay:        p.Duration(parameters.FragmentorDownstreamMinDelay),
+		maxDelay:        p.Duration(parameters.FragmentorDownstreamMaxDelay),
+	}
+}
+
+// IsFragmenting indicates whether the fragmentor configuration results in any
+// fragmentation; config may be nil.
+func (config *Config) IsFragmenting() bool {
+	return config != nil && config.bytesToFragment > 0
+}
+
+// GetMetrics returns the fragmentor configuration as log fields; config may
+// be nil.
+func (config *Config) GetMetrics() common.LogFields {
+	logFields := make(common.LogFields)
+	if config != nil {
+		var prefix string
+		if config.isUpstream {
+			prefix = "upstream_"
+		} else {
+			prefix = "downstream_"
+		}
+		logFields[prefix+"bytes_to_fragment"] = config.bytesToFragment
+		logFields[prefix+"min_write_bytes"] = config.minWriteBytes
+		logFields[prefix+"max_write_bytes"] = config.maxWriteBytes
+		logFields[prefix+"min_delay"] = int(config.minDelay / time.Microsecond)
+		logFields[prefix+"max_delay"] = int(config.maxDelay / time.Microsecond)
+	}
+	return logFields
+}
+
 // Conn implements simple fragmentation of application-level messages/packets
 // into multiple TCP packets by splitting writes into smaller sizes and adding
 // delays between writes.
@@ -46,47 +130,43 @@ const (
 // portion of a TCP flow.
 type Conn struct {
 	net.Conn
+	config          *Config
 	noticeEmitter   func(string)
 	runCtx          context.Context
 	stopRunning     context.CancelFunc
 	isClosed        int32
 	writeMutex      sync.Mutex
 	numNotices      int
-	bytesToFragment int
 	bytesFragmented int
-	minWriteBytes   int
-	maxWriteBytes   int
-	minDelay        time.Duration
-	maxDelay        time.Duration
 }
 
 // NewConn creates a new Conn.
 func NewConn(
-	conn net.Conn,
+	config *Config,
 	noticeEmitter func(string),
-	bytesToFragment, minWriteBytes, maxWriteBytes int,
-	minDelay, maxDelay time.Duration) *Conn {
+	conn net.Conn) *Conn {
 
 	runCtx, stopRunning := context.WithCancel(context.Background())
 	return &Conn{
-		Conn:            conn,
-		noticeEmitter:   noticeEmitter,
-		runCtx:          runCtx,
-		stopRunning:     stopRunning,
-		bytesToFragment: bytesToFragment,
-		minWriteBytes:   minWriteBytes,
-		maxWriteBytes:   maxWriteBytes,
-		minDelay:        minDelay,
-		maxDelay:        maxDelay,
+		Conn:          conn,
+		config:        config,
+		noticeEmitter: noticeEmitter,
+		runCtx:        runCtx,
+		stopRunning:   stopRunning,
 	}
 }
 
+// GetMetrics implements the common.MetricsSource interface.
+func (c *Conn) GetMetrics() common.LogFields {
+	return c.config.GetMetrics()
+}
+
 func (c *Conn) Write(buffer []byte) (int, error) {
 
 	c.writeMutex.Lock()
 	defer c.writeMutex.Unlock()
 
-	if c.bytesFragmented >= c.bytesToFragment {
+	if c.bytesFragmented >= c.config.bytesToFragment {
 		return c.Conn.Write(buffer)
 	}
 
@@ -112,9 +192,9 @@ func (c *Conn) Write(buffer []byte) (int, error) {
 	for iterations := 0; len(buffer) > 0; iterations += 1 {
 
 		delay, err := common.MakeSecureRandomPeriod(
-			c.minDelay, c.maxDelay)
+			c.config.minDelay, c.config.maxDelay)
 		if err != nil {
-			delay = c.minDelay
+			delay = c.config.minDelay
 		}
 
 		timer := time.NewTimer(delay)
@@ -130,12 +210,12 @@ func (c *Conn) Write(buffer []byte) (int, error) {
 			return totalBytesWritten, err
 		}
 
-		minWriteBytes := c.minWriteBytes
+		minWriteBytes := c.config.minWriteBytes
 		if minWriteBytes > len(buffer) {
 			minWriteBytes = len(buffer)
 		}
 
-		maxWriteBytes := c.maxWriteBytes
+		maxWriteBytes := c.config.maxWriteBytes
 		if maxWriteBytes > len(buffer) {
 			maxWriteBytes = len(buffer)
 		}

+ 8 - 0
psiphon/common/logger.go

@@ -42,3 +42,11 @@ type LogContext interface {
 // LogFields is type-compatible with psiphon/server.LogFields
 // and logrus.LogFields.
 type LogFields map[string]interface{}
+
+// MetricsSource is an object that provides metrics to be logged
+type MetricsSource interface {
+
+	// GetMetrics returns a LogFields populated with
+	// metrics from the MetricsSource
+	GetMetrics() LogFields
+}

+ 11 - 27
psiphon/common/tactics/tactics.go

@@ -1155,33 +1155,17 @@ func (listener *Listener) Accept() (net.Conn, error) {
 		// or not fragment all TCP connections for a one meek session, the server
 		// will make a coin flip per connection.
 
-		tunnelProtocols := p.TunnelProtocols(parameters.FragmentorDownstreamLimitProtocols)
-		if (len(tunnelProtocols) == 0 ||
-			common.Contains(tunnelProtocols, listener.tunnelProtocol)) &&
-			p.WeightedCoinFlip(parameters.FragmentorDownstreamProbability) {
-
-			totalBytes, err := common.MakeSecureRandomRange(
-				p.Int(parameters.FragmentorDownstreamMinTotalBytes),
-				p.Int(parameters.FragmentorDownstreamMaxTotalBytes))
-			if err != nil {
-				listener.server.logger.WithContextFields(
-					common.LogFields{"error": err}).Warning("MakeSecureRandomRange failed")
-				totalBytes = 0
-			}
-
-			if totalBytes > 0 {
-				conn = fragmentor.NewConn(
-					conn,
-					func(message string) {
-						listener.server.logger.WithContextFields(
-							common.LogFields{"message": message}).Debug("Fragmentor")
-					},
-					totalBytes,
-					p.Int(parameters.FragmentorDownstreamMinWriteBytes),
-					p.Int(parameters.FragmentorDownstreamMaxWriteBytes),
-					p.Duration(parameters.FragmentorDownstreamMinDelay),
-					p.Duration(parameters.FragmentorDownstreamMaxDelay))
-			}
+		fragmentorConfig := fragmentor.NewDownstreamConfig(
+			p, listener.tunnelProtocol)
+
+		if fragmentorConfig.IsFragmenting() {
+			conn = fragmentor.NewConn(
+				fragmentorConfig,
+				func(message string) {
+					listener.server.logger.WithContextFields(
+						common.LogFields{"message": message}).Debug("Fragmentor")
+				},
+				conn)
 		}
 
 		return conn, nil

+ 2 - 1
psiphon/controller.go

@@ -1343,7 +1343,8 @@ func (controller *Controller) doFetchTactics(
 
 	meekConfig.RoundTripperOnly = true
 
-	dialConfig, dialStats := initDialConfig(controller.config, meekConfig)
+	dialConfig, dialStats := initDialConfig(
+		controller.config, meekConfig, tacticsProtocol)
 
 	NoticeRequestingTactics(
 		serverEntry.IpAddress,

+ 0 - 107
psiphon/fragmentor.go

@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2018, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-package psiphon
-
-import (
-	"context"
-	"fmt"
-	"net"
-
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
-	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
-)
-
-// NewTCPFragmentorDialer creates a TCP dialer that wraps dialed conns in
-// fragmentor.Conn. A single FragmentorProbability coin flip is made and all
-// conns get the same treatment.
-func NewTCPFragmentorDialer(
-	config *DialConfig,
-	tunnelProtocol string,
-	clientParameters *parameters.ClientParameters) Dialer {
-
-	p := clientParameters.Get()
-	coinFlip := p.WeightedCoinFlip(parameters.FragmentorProbability)
-	p = nil
-
-	return func(ctx context.Context, network, addr string) (net.Conn, error) {
-		if network != "tcp" {
-			return nil, common.ContextError(fmt.Errorf("%s unsupported", network))
-		}
-		return DialTCPFragmentor(ctx, addr, config, tunnelProtocol, clientParameters, &coinFlip)
-	}
-}
-
-// DialTCPFragmentor performs a DialTCP and wraps the dialed conn in a
-// fragmentor.Conn, subject to FragmentorProbability and
-// FragmentorLimitProtocols.
-func DialTCPFragmentor(
-	ctx context.Context,
-	addr string,
-	config *DialConfig,
-	tunnelProtocol string,
-	clientParameters *parameters.ClientParameters,
-	oneTimeCoinFlip *bool) (net.Conn, error) {
-
-	conn, err := DialTCP(ctx, addr, config)
-	if err != nil {
-		return nil, common.ContextError(err)
-	}
-
-	p := clientParameters.Get()
-
-	protocols := p.TunnelProtocols(parameters.FragmentorLimitProtocols)
-	if len(protocols) > 0 && !common.Contains(protocols, tunnelProtocol) {
-		return conn, nil
-	}
-
-	var coinFlip bool
-	if oneTimeCoinFlip != nil {
-		coinFlip = *oneTimeCoinFlip
-	} else {
-		coinFlip = p.WeightedCoinFlip(parameters.FragmentorProbability)
-	}
-
-	if coinFlip {
-		return conn, nil
-	}
-
-	totalBytes, err := common.MakeSecureRandomRange(
-		p.Int(parameters.FragmentorMinTotalBytes),
-		p.Int(parameters.FragmentorMaxTotalBytes))
-	if err != nil {
-		totalBytes = 0
-		NoticeAlert("MakeSecureRandomRange failed: %s", common.ContextError(err))
-	}
-
-	if totalBytes == 0 {
-		return conn, nil
-	}
-
-	return fragmentor.NewConn(
-			conn,
-			func(message string) { NoticeInfo(message) },
-			totalBytes,
-			p.Int(parameters.FragmentorMinWriteBytes),
-			p.Int(parameters.FragmentorMaxWriteBytes),
-			p.Duration(parameters.FragmentorMinDelay),
-			p.Duration(parameters.FragmentorMaxDelay)),
-		nil
-}

+ 6 - 16
psiphon/meekConn.go

@@ -235,15 +235,10 @@ func DialMeek(
 
 		scheme = "https"
 
-		tcpDialer := NewTCPFragmentorDialer(
-			dialConfig,
-			meekConfig.ClientTunnelProtocol,
-			meekConfig.ClientParameters)
-
 		tlsConfig := &CustomTLSConfig{
 			ClientParameters:              meekConfig.ClientParameters,
 			DialAddr:                      meekConfig.DialAddress,
-			Dial:                          tcpDialer,
+			Dial:                          NewTCPDialer(dialConfig),
 			SNIServerName:                 meekConfig.SNIServerName,
 			SkipVerify:                    true,
 			TLSProfile:                    meekConfig.TLSProfile,
@@ -345,20 +340,15 @@ func DialMeek(
 			*copyDialConfig = *dialConfig
 			copyDialConfig.UpstreamProxyURL = ""
 
-			dialer = NewTCPFragmentorDialer(
-				copyDialConfig,
-				meekConfig.ClientTunnelProtocol,
-				meekConfig.ClientParameters)
+			dialer = NewTCPDialer(copyDialConfig)
 
 		} else {
 
-			baseDialer := NewTCPFragmentorDialer(
-				dialConfig,
-				meekConfig.ClientTunnelProtocol,
-				meekConfig.ClientParameters)
+			baseDialer := NewTCPDialer(dialConfig)
 
-			// The dialer ignores address that http.Transport will pass in (derived
-			// from the HTTP request URL) and always dials meekConfig.DialAddress.
+			// The dialer ignores any address that http.Transport will pass in
+			// (derived from the HTTP request URL) and always dials
+			// meekConfig.DialAddress.
 			dialer = func(ctx context.Context, network, _ string) (net.Conn, error) {
 				return baseDialer(ctx, network, meekConfig.DialAddress)
 			}

+ 5 - 0
psiphon/net.go

@@ -35,6 +35,7 @@ import (
 
 	"github.com/Psiphon-Labs/dns"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 )
 
 const DNS_PORT = 53
@@ -83,6 +84,10 @@ type DialConfig struct {
 	// domain name.
 	// The callback may be invoked by a concurrent goroutine.
 	ResolvedIPCallback func(string)
+
+	// FragmentorConfig specifies whether to layer a fragmentor.Conn on top
+	// of dialed TCP conns, and the fragmentation configuration to use.
+	FragmentorConfig *fragmentor.Config
 }
 
 // NetworkConnectivityChecker defines the interface to the external

+ 7 - 0
psiphon/notice.go

@@ -462,6 +462,13 @@ func noticeWithDialStats(noticeType, ipAddress, region, protocol string, dialSta
 		args = append(args, "QUICDialSNIAddress", dialStats.QUICDialSNIAddress)
 	}
 
+	if dialStats.FragmentorConfig.IsFragmenting() {
+		metrics := dialStats.FragmentorConfig.GetMetrics()
+		for name, value := range metrics {
+			args = append(args, name, value)
+		}
+	}
+
 	singletonNoticeLogger.outputNotice(
 		noticeType, noticeIsDiagnostic,
 		args...)

+ 5 - 0
psiphon/server/api.go

@@ -554,6 +554,11 @@ var baseRequestParams = []requestParamSpec{
 	{"dial_port_number", isIntString, requestParamOptional},
 	{"quic_version", isAnyString, requestParamOptional},
 	{"quic_dial_sni_address", isAnyString, requestParamOptional},
+	{"upstream_bytes_to_fragment", isIntString, requestParamOptional},
+	{"upstream_min_write_bytes", isIntString, requestParamOptional},
+	{"upstream_max_write_bytes", isIntString, requestParamOptional},
+	{"upstream_min_delay", isIntString, requestParamOptional},
+	{"upstream_max_delay", isIntString, requestParamOptional},
 }
 
 func validateRequestParams(

+ 0 - 8
psiphon/server/log.go

@@ -34,14 +34,6 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-// MetricsSource is an object that provides metrics to be logged
-type MetricsSource interface {
-
-	// GetMetrics returns a LogFields populated with
-	// metrics from the MetricsSource
-	GetMetrics() LogFields
-}
-
 // ContextLogger adds context logging functionality to the
 // underlying logging packages.
 type ContextLogger struct {

+ 8 - 7
psiphon/server/meek.go

@@ -906,9 +906,9 @@ func (session *meekSession) delete(haveLock bool) {
 	}
 }
 
-// GetMetrics implements the MetricsSource interface.
-func (session *meekSession) GetMetrics() LogFields {
-	logFields := make(LogFields)
+// GetMetrics implements the common.MetricsSource interface.
+func (session *meekSession) GetMetrics() common.LogFields {
+	logFields := make(common.LogFields)
 	logFields["meek_client_retries"] = atomic.LoadInt64(&session.metricClientRetries)
 	logFields["meek_peak_response_size"] = atomic.LoadInt64(&session.metricPeakResponseSize)
 	logFields["meek_peak_cached_response_size"] = atomic.LoadInt64(&session.metricPeakCachedResponseSize)
@@ -1375,9 +1375,10 @@ func (conn *meekConn) SetWriteDeadline(t time.Time) error {
 	return common.ContextError(errors.New("not supported"))
 }
 
-// GetMetrics implements the MetricsSource interface. The metrics are maintained
-// in the meek session type; but logTunnel, which calls MetricsSource.GetMetrics,
-// has a pointer only to this conn, so it calls through to the session.
-func (conn *meekConn) GetMetrics() LogFields {
+// GetMetrics implements the common.MetricsSource interface. The metrics are
+// maintained in the meek session type; but logTunnel, which calls
+// MetricsSource.GetMetrics, has a pointer only to this conn, so it calls
+// through to the session.
+func (conn *meekConn) GetMetrics() common.LogFields {
 	return conn.meekSession.GetMetrics()
 }

+ 7 - 3
psiphon/server/tunnelServer.go

@@ -1007,8 +1007,12 @@ func (sshClient *sshClient) run(
 		}
 	}()
 
-	// Some conns report additional metrics
-	metricsSource, isMetricsSource := clientConn.(MetricsSource)
+	// Some conns report additional metrics. Meek conns report resiliency
+	// metrics and fragmentor.Conns report fragmentor configs.
+	//
+	// Limitation: for meek, GetMetrics from underlying fragmentor.Conns
+	// should be called in order to log fragmentor metrics for meek sessions.
+	metricsSource, isMetricsSource := clientConn.(common.MetricsSource)
 
 	// Set initial traffic rules, pre-handshake, based on currently known info.
 	sshClient.setTrafficRules()
@@ -1160,7 +1164,7 @@ func (sshClient *sshClient) run(
 
 	var additionalMetrics LogFields
 	if isMetricsSource {
-		additionalMetrics = metricsSource.GetMetrics()
+		additionalMetrics = LogFields(metricsSource.GetMetrics())
 	}
 	sshClient.logTunnel(additionalMetrics)
 

+ 7 - 0
psiphon/serverApi.go

@@ -783,6 +783,13 @@ func getBaseAPIParameters(
 		params["quic_dial_sni_address"] = dialStats.QUICDialSNIAddress
 	}
 
+	if dialStats.FragmentorConfig.IsFragmenting() {
+		metrics := dialStats.FragmentorConfig.GetMetrics()
+		for name, value := range metrics {
+			params[name] = fmt.Sprintf("%s", value)
+		}
+	}
+
 	return params
 }
 

+ 17 - 13
psiphon/tunnel.go

@@ -36,6 +36,7 @@ import (
 	"github.com/Psiphon-Labs/goarista/monotime"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/ssh"
+	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/fragmentor"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/marionette"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
@@ -134,6 +135,7 @@ type DialStats struct {
 	DialPortNumber                 string
 	QUICVersion                    string
 	QUICDialSNIAddress             string
+	FragmentorConfig               *fragmentor.Config
 }
 
 // ConnectTunnel first makes a network transport connection to the
@@ -694,10 +696,13 @@ func initMeekConfig(
 
 // initDialConfig is a helper that creates a DialConfig for the tunnel.
 func initDialConfig(
-	config *Config, meekConfig *MeekConfig) (*DialConfig, *DialStats) {
+	config *Config,
+	meekConfig *MeekConfig,
+	tunnelProtocol string) (*DialConfig, *DialStats) {
 
-	var upstreamProxyType string
+	p := config.clientParameters.Get()
 
+	var upstreamProxyType string
 	if config.UseUpstreamProxy() {
 		// Note: UpstreamProxyURL will be validated in the dial
 		proxyURL, err := url.Parse(config.UpstreamProxyURL)
@@ -714,9 +719,7 @@ func initDialConfig(
 		}
 	}
 
-	additionalCustomHeaders :=
-		config.clientParameters.Get().HTTPHeaders(parameters.AdditionalCustomHeaders)
-
+	additionalCustomHeaders := p.HTTPHeaders(parameters.AdditionalCustomHeaders)
 	if additionalCustomHeaders != nil {
 		for k, v := range additionalCustomHeaders {
 			dialCustomHeaders[k] = make([]string, len(v))
@@ -725,12 +728,13 @@ func initDialConfig(
 	}
 
 	// Set User-Agent when using meek or an upstream HTTP proxy
-
 	var selectedUserAgent bool
 	if meekConfig != nil || upstreamProxyType == "http" {
 		selectedUserAgent = UserAgentIfUnset(config.clientParameters, dialCustomHeaders)
 	}
 
+	fragmentorConfig := fragmentor.NewUpstreamConfig(p, tunnelProtocol)
+
 	dialConfig := &DialConfig{
 		UpstreamProxyURL:              config.UpstreamProxyURL,
 		CustomHeaders:                 dialCustomHeaders,
@@ -738,9 +742,12 @@ func initDialConfig(
 		DnsServerGetter:               config.DnsServerGetter,
 		IPv6Synthesizer:               config.IPv6Synthesizer,
 		TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
+		FragmentorConfig:              fragmentorConfig,
 	}
 
-	dialStats := &DialStats{}
+	dialStats := &DialStats{
+		FragmentorConfig: fragmentorConfig,
+	}
 
 	if selectedUserAgent {
 		dialStats.SelectedUserAgent = true
@@ -860,7 +867,7 @@ func dialSsh(
 		}
 	}
 
-	dialConfig, dialStats := initDialConfig(config, meekConfig)
+	dialConfig, dialStats := initDialConfig(config, meekConfig, selectedProtocol)
 
 	if meekConfig != nil {
 		_, dialStats.DialPortNumber, _ = net.SplitHostPort(meekConfig.DialAddress)
@@ -945,13 +952,10 @@ func dialSsh(
 
 	} else {
 
-		dialConn, err = DialTCPFragmentor(
+		dialConn, err = DialTCP(
 			ctx,
 			directDialAddress,
-			dialConfig,
-			selectedProtocol,
-			config.clientParameters,
-			nil)
+			dialConfig)
 		if err != nil {
 			return nil, common.ContextError(err)
 		}