Browse Source

Log listener tunnel protocol and port with irregular_tunnel

Rod Hynes 6 years ago
parent
commit
cc763c3d5e
4 changed files with 94 additions and 34 deletions
  1. 36 27
      psiphon/server/meek.go
  2. 4 0
      psiphon/server/meek_test.go
  3. 8 1
      psiphon/server/services.go
  4. 46 6
      psiphon/server/tunnelServer.go

+ 36 - 27
psiphon/server/meek.go

@@ -97,27 +97,31 @@ const (
 // HTTP payload traffic for a given session into net.Conn conforming Read()s and Write()s via
 // the meekConn struct.
 type MeekServer struct {
-	support               *SupportServices
-	listener              net.Listener
-	tlsConfig             *tris.Config
-	obfuscatorSeedHistory *obfuscator.SeedHistory
-	clientHandler         func(clientTunnelProtocol string, clientConn net.Conn)
-	openConns             *common.Conns
-	stopBroadcast         <-chan struct{}
-	sessionsLock          sync.RWMutex
-	sessions              map[string]*meekSession
-	checksumTable         *crc64.Table
-	bufferPool            *CachedResponseBufferPool
-	rateLimitLock         sync.Mutex
-	rateLimitHistory      map[string][]time.Time
-	rateLimitCount        int
-	rateLimitSignalGC     chan struct{}
+	support                *SupportServices
+	listener               net.Listener
+	listenerTunnelProtocol string
+	listenerPort           int
+	tlsConfig              *tris.Config
+	obfuscatorSeedHistory  *obfuscator.SeedHistory
+	clientHandler          func(clientTunnelProtocol string, clientConn net.Conn)
+	openConns              *common.Conns
+	stopBroadcast          <-chan struct{}
+	sessionsLock           sync.RWMutex
+	sessions               map[string]*meekSession
+	checksumTable          *crc64.Table
+	bufferPool             *CachedResponseBufferPool
+	rateLimitLock          sync.Mutex
+	rateLimitHistory       map[string][]time.Time
+	rateLimitCount         int
+	rateLimitSignalGC      chan struct{}
 }
 
 // NewMeekServer initializes a new meek server.
 func NewMeekServer(
 	support *SupportServices,
 	listener net.Listener,
+	listenerTunnelProtocol string,
+	listenerPort int,
 	useTLS, isFronted, useObfuscatedSessionTickets bool,
 	clientHandler func(clientTunnelProtocol string, clientConn net.Conn),
 	stopBroadcast <-chan struct{}) (*MeekServer, error) {
@@ -137,17 +141,19 @@ func NewMeekServer(
 	bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount)
 
 	meekServer := &MeekServer{
-		support:               support,
-		listener:              listener,
-		obfuscatorSeedHistory: obfuscator.NewSeedHistory(),
-		clientHandler:         clientHandler,
-		openConns:             common.NewConns(),
-		stopBroadcast:         stopBroadcast,
-		sessions:              make(map[string]*meekSession),
-		checksumTable:         checksumTable,
-		bufferPool:            bufferPool,
-		rateLimitHistory:      make(map[string][]time.Time),
-		rateLimitSignalGC:     make(chan struct{}, 1),
+		support:                support,
+		listener:               listener,
+		listenerTunnelProtocol: listenerTunnelProtocol,
+		listenerPort:           listenerPort,
+		obfuscatorSeedHistory:  obfuscator.NewSeedHistory(),
+		clientHandler:          clientHandler,
+		openConns:              common.NewConns(),
+		stopBroadcast:          stopBroadcast,
+		sessions:               make(map[string]*meekSession),
+		checksumTable:          checksumTable,
+		bufferPool:             bufferPool,
+		rateLimitHistory:       make(map[string][]time.Time),
+		rateLimitSignalGC:      make(chan struct{}, 1),
 	}
 
 	if useTLS {
@@ -876,7 +882,10 @@ func (server *MeekServer) getMeekCookiePayload(
 			SeedHistory: server.obfuscatorSeedHistory,
 			IrregularLogger: func(err error) {
 				logIrregularTunnel(
-					server.support.GeoIPService.Lookup(clientIP), err)
+					server.listenerTunnelProtocol,
+					server.listenerPort,
+					server.support.GeoIPService.Lookup(clientIP),
+					err)
 			},
 		})
 	if err != nil {

+ 4 - 0
psiphon/server/meek_test.go

@@ -276,6 +276,8 @@ func TestMeekResiliency(t *testing.T) {
 	server, err := NewMeekServer(
 		mockSupport,
 		listener,
+		"",
+		0,
 		useTLS,
 		isFronted,
 		useObfuscatedSessionTickets,
@@ -430,6 +432,8 @@ func TestMeekRateLimiter(t *testing.T) {
 	server, err := NewMeekServer(
 		mockSupport,
 		listener,
+		"",
+		0,
 		useTLS,
 		isFronted,
 		useObfuscatedSessionTickets,

+ 8 - 1
psiphon/server/services.go

@@ -358,8 +358,15 @@ func logServerLoad(server *TunnelServer) {
 	}
 }
 
-func logIrregularTunnel(geoIPData GeoIPData, tunnelError error) {
+func logIrregularTunnel(
+	listenerTunnelProtocol string,
+	listenerPort int,
+	geoIPData GeoIPData,
+	tunnelError error) {
+
 	irregularTunnel := getRequestLogFields("irregular_tunnel", geoIPData, nil, nil, nil)
+	irregularTunnel["listener_protocol"] = listenerTunnelProtocol
+	irregularTunnel["listener_port_number"] = listenerPort
 	irregularTunnel["tunnel_error"] = tunnelError.Error()
 	log.LogRawFieldsWithTimestamp(irregularTunnel)
 }

+ 46 - 6
psiphon/server/tunnelServer.go

@@ -433,6 +433,9 @@ func (sshServer *sshServer) runListener(
 	listenerError chan<- error,
 	listenerTunnelProtocol string) {
 
+	_, listenerPortStr, _ := net.SplitHostPort(listener.Addr().String())
+	listenerPort, _ := strconv.Atoi(listenerPortStr)
+
 	runningProtocols := make([]string, 0)
 	for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {
 		runningProtocols = append(runningProtocols, tunnelProtocol)
@@ -474,8 +477,26 @@ func (sshServer *sshServer) runListener(
 			}
 		}
 
-		// process each client connection concurrently
-		go sshServer.handleClient(tunnelProtocol, clientConn)
+		// listenerTunnelProtocol indictes the tunnel protocol run by the listener.
+		// For direct protocols, this is also the client tunnel protocol. For
+		// fronted protocols, the client may use a different protocol to connect to
+		// the front and then only the front-to-Psiphon server will use the listener
+		// protocol.
+		//
+		// A fronted meek client, for example, reports its first hop protocol in
+		// protocol.MeekCookieData.ClientTunnelProtocol. Most metrics record this
+		// value as relay_protocol, since the first hop is the one subject to
+		// adversarial conditions. In some cases, such as irregular tunnels, there
+		// is no ClientTunnelProtocol value available and the listener tunnel
+		// protocol will be logged.
+		//
+		// Similarly, listenerPort indicates the listening port, which is the dialed
+		// port number for direct protocols; while, for fronted protocols, the
+		// client may dial a different port for its first hop.
+
+		// Process each client connection concurrently.
+		go sshServer.handleClient(
+			listenerTunnelProtocol, listenerPort, tunnelProtocol, clientConn)
 	}
 
 	// Note: when exiting due to a unrecoverable error, be sure
@@ -489,6 +510,8 @@ func (sshServer *sshServer) runListener(
 		meekServer, err := NewMeekServer(
 			sshServer.support,
 			listener,
+			listenerTunnelProtocol,
+			listenerPort,
 			protocol.TunnelProtocolUsesMeekHTTPS(listenerTunnelProtocol),
 			protocol.TunnelProtocolUsesFrontedMeek(listenerTunnelProtocol),
 			protocol.TunnelProtocolUsesObfuscatedSessionTickets(listenerTunnelProtocol),
@@ -932,7 +955,9 @@ func (sshServer *sshServer) stopClients() {
 	}
 }
 
-func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.Conn) {
+func (sshServer *sshServer) handleClient(
+	listenerTunnelProtocol string, listenerPort int,
+	tunnelProtocol string, clientConn net.Conn) {
 
 	// Calling clientConn.RemoteAddr at this point, before any Read calls,
 	// satisfies the constraint documented in tapdance.Listen.
@@ -988,7 +1013,8 @@ func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.C
 		}
 	}
 
-	sshClient := newSshClient(sshServer, tunnelProtocol, geoIPData)
+	sshClient := newSshClient(
+		sshServer, listenerTunnelProtocol, listenerPort, tunnelProtocol, geoIPData)
 
 	// sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:
 	// in any error case; or, as soon as the SSH handshake phase has successfully
@@ -1027,6 +1053,8 @@ func (sshServer *sshServer) monitorPortForwardDialError(err error) {
 type sshClient struct {
 	sync.Mutex
 	sshServer                            *sshServer
+	listenerTunnelProtocol               string
+	listenerPort                         int
 	tunnelProtocol                       string
 	sshConn                              ssh.Conn
 	activityConn                         *common.ActivityMonitoredConn
@@ -1097,7 +1125,11 @@ type handshakeState struct {
 }
 
 func newSshClient(
-	sshServer *sshServer, tunnelProtocol string, geoIPData GeoIPData) *sshClient {
+	sshServer *sshServer,
+	listenerTunnelProtocol string,
+	listenerPort int,
+	tunnelProtocol string,
+	geoIPData GeoIPData) *sshClient {
 
 	runCtx, stopRunning := context.WithCancel(context.Background())
 
@@ -1107,6 +1139,8 @@ func newSshClient(
 
 	client := &sshClient{
 		sshServer:              sshServer,
+		listenerTunnelProtocol: listenerTunnelProtocol,
+		listenerPort:           listenerPort,
 		tunnelProtocol:         tunnelProtocol,
 		geoIPData:              geoIPData,
 		isFirstTunnelInSession: true,
@@ -1234,7 +1268,13 @@ func (sshClient *sshClient) run(
 				conn,
 				sshClient.sshServer.support.Config.ObfuscatedSSHKey,
 				sshClient.sshServer.obfuscatorSeedHistory,
-				func(err error) { logIrregularTunnel(sshClient.geoIPData, err) })
+				func(err error) {
+					logIrregularTunnel(
+						sshClient.listenerTunnelProtocol,
+						sshClient.listenerPort,
+						sshClient.geoIPData,
+						err)
+				})
 
 			if err != nil {
 				err = errors.Trace(err)