Browse Source

Merge pull request #229 from rod-hynes/master

Roll-up recent changes
Rod Hynes 9 years ago
parent
commit
15917c7ed2

+ 6 - 6
psiphon/config.go

@@ -328,10 +328,10 @@ type Config struct {
 	// If omitted, the default value is TUNNEL_CONNECT_TIMEOUT_SECONDS.
 	TunnelConnectTimeoutSeconds *int
 
-	// TunnelPortForwardTimeoutSeconds specifies a timeout per SSH port forward.
-	// Zero value means a port forward will not time out.
+	// TunnelPortForwardDialTimeoutSeconds specifies a dial timeout per SSH port forward.
+	// Zero value means a port forward dial will not time out.
 	// If omitted, the default value is TUNNEL_PORT_FORWARD_DIAL_TIMEOUT_SECONDS.
-	TunnelPortForwardTimeoutSeconds *int
+	TunnelPortForwardDialTimeoutSeconds *int
 
 	// TunnelSshKeepAliveProbeTimeoutSeconds specifies a timeout value for "probe"
 	// SSH keep-alive that is sent upon port forward failure.
@@ -481,9 +481,9 @@ func LoadConfig(configJson []byte) (*Config, error) {
 		config.TunnelConnectTimeoutSeconds = &defaultTunnelConnectTimeoutSeconds
 	}
 
-	if config.TunnelPortForwardTimeoutSeconds == nil {
-		defaultTunnelPortForwardTimeoutSeconds := TUNNEL_PORT_FORWARD_DIAL_TIMEOUT_SECONDS
-		config.TunnelPortForwardTimeoutSeconds = &defaultTunnelPortForwardTimeoutSeconds
+	if config.TunnelPortForwardDialTimeoutSeconds == nil {
+		TunnelPortForwardDialTimeoutSeconds := TUNNEL_PORT_FORWARD_DIAL_TIMEOUT_SECONDS
+		config.TunnelPortForwardDialTimeoutSeconds = &TunnelPortForwardDialTimeoutSeconds
 	}
 
 	if config.TunnelSshKeepAliveProbeTimeoutSeconds == nil {

+ 8 - 8
psiphon/server/api.go

@@ -125,13 +125,13 @@ func handshakeAPIRequestHandler(
 		return nil, common.ContextError(err)
 	}
 
-	log.WithContextFields(
+	log.LogRawFieldsWithTimestamp(
 		getRequestLogFields(
 			support,
 			"handshake",
 			geoIPData,
 			params,
-			baseRequestParams)).Info("API event")
+			baseRequestParams))
 
 	// TODO: share struct definition with psiphon/serverApi.go?
 	var handshakeResponse struct {
@@ -197,13 +197,13 @@ func connectedAPIRequestHandler(
 		return nil, common.ContextError(err)
 	}
 
-	log.WithContextFields(
+	log.LogRawFieldsWithTimestamp(
 		getRequestLogFields(
 			support,
 			"connected",
 			geoIPData,
 			params,
-			connectedRequestParams)).Info("API event")
+			connectedRequestParams))
 
 	var connectedResponse struct {
 		ConnectedTimestamp string `json:"connected_timestamp"`
@@ -256,7 +256,7 @@ func statusAPIRequestHandler(
 	bytesTransferredFields := getRequestLogFields(
 		support, "bytes_transferred", geoIPData, params, statusRequestParams)
 	bytesTransferredFields["bytes"] = bytesTransferred
-	log.WithContextFields(bytesTransferredFields).Info("API event")
+	log.LogRawFieldsWithTimestamp(bytesTransferredFields)
 
 	// Domain bytes transferred stats
 	// Older clients may not submit this data
@@ -272,7 +272,7 @@ func statusAPIRequestHandler(
 		for domain, bytes := range hostBytes {
 			domainBytesFields["domain"] = domain
 			domainBytesFields["bytes"] = bytes
-			log.WithContextFields(domainBytesFields).Info("API event")
+			log.LogRawFieldsWithTimestamp(domainBytesFields)
 		}
 	}
 
@@ -336,7 +336,7 @@ func statusAPIRequestHandler(
 			}
 			sessionFields["total_bytes_received"] = totalBytesReceived
 
-			log.WithContextFields(sessionFields).Info("API event")
+			log.LogRawFieldsWithTimestamp(sessionFields)
 		}
 	}
 
@@ -400,7 +400,7 @@ func clientVerificationAPIRequestHandler(
 			logFields["safetynet_check"] = safetyNetCheckLogs
 		}
 
-		log.WithContextFields(logFields).Info("API event")
+		log.LogRawFieldsWithTimestamp(logFields)
 
 		if verified {
 			// TODO: change throttling treatment

+ 1 - 1
psiphon/server/config.go

@@ -32,10 +32,10 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/Psiphon-Inc/crypto/ssh"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"golang.org/x/crypto/nacl/box"
-	"golang.org/x/crypto/ssh"
 )
 
 const (

+ 70 - 2
psiphon/server/log.go

@@ -20,6 +20,8 @@
 package server
 
 import (
+	"encoding/json"
+	"fmt"
 	"io"
 	"os"
 
@@ -60,12 +62,78 @@ func (logger *ContextLogger) WithContextFields(fields LogFields) *logrus.Entry {
 	return log.WithFields(logrus.Fields(fields))
 }
 
+// LogRawFieldsWithTimestamp directly logs the supplied fields adding only
+// an additional "timestamp" field. The stock "msg" and "level" fields are
+// omitted. This log is emitted at the Error level. This function exists to
+// support API logs which have neither a natural message nor severity; and
+// omitting these values here makes it easier to ship these logs to existing
+// API log consumers.
+func (logger *ContextLogger) LogRawFieldsWithTimestamp(fields LogFields) {
+	logger.WithFields(logrus.Fields(fields)).Error(
+		customJSONFormatterLogRawFieldsWithTimestamp)
+}
+
 // NewLogWriter returns an io.PipeWriter that can be used to write
 // to the global logger. Caller must Close() the writer.
 func NewLogWriter() *io.PipeWriter {
 	return log.Writer()
 }
 
+// CustomJSONFormatter is a customized version of logrus.JSONFormatter
+type CustomJSONFormatter struct {
+}
+
+const customJSONFormatterLogRawFieldsWithTimestamp = "CustomJSONFormatter.LogRawFieldsWithTimestamp"
+
+// Format implements logrus.Formatter. This is a customized version
+// of the standard logrus.JSONFormatter adapted from:
+// https://github.com/Sirupsen/logrus/blob/f1addc29722ba9f7651bc42b4198d0944b66e7c4/json_formatter.go
+//
+// The changes are:
+// - "time" is renamed to "timestamp"
+// - there's an option to omit the standard "msg" and "level" fields
+//
+func (f *CustomJSONFormatter) Format(entry *logrus.Entry) ([]byte, error) {
+	data := make(logrus.Fields, len(entry.Data)+3)
+	for k, v := range entry.Data {
+		switch v := v.(type) {
+		case error:
+			// Otherwise errors are ignored by `encoding/json`
+			// https://github.com/Sirupsen/logrus/issues/137
+			data[k] = v.Error()
+		default:
+			data[k] = v
+		}
+	}
+
+	if t, ok := data["timestamp"]; ok {
+		data["fields.timestamp"] = t
+	}
+
+	data["timestamp"] = entry.Time.Format(logrus.DefaultTimestampFormat)
+
+	if entry.Message != customJSONFormatterLogRawFieldsWithTimestamp {
+
+		if m, ok := data["msg"]; ok {
+			data["fields.msg"] = m
+		}
+
+		if l, ok := data["level"]; ok {
+			data["fields.level"] = l
+		}
+
+		data["msg"] = entry.Message
+		data["level"] = entry.Level.String()
+	}
+
+	serialized, err := json.Marshal(data)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to marshal fields to JSON, %v", err)
+	}
+
+	return append(serialized, '\n'), nil
+}
+
 var log *ContextLogger
 
 // InitLogging configures a logger according to the specified
@@ -93,7 +161,7 @@ func InitLogging(config *Config) error {
 	log = &ContextLogger{
 		&logrus.Logger{
 			Out:       logWriter,
-			Formatter: &logrus.JSONFormatter{},
+			Formatter: &CustomJSONFormatter{},
 			Level:     level,
 		},
 	}
@@ -105,7 +173,7 @@ func init() {
 	log = &ContextLogger{
 		&logrus.Logger{
 			Out:       os.Stderr,
-			Formatter: &logrus.JSONFormatter{},
+			Formatter: &CustomJSONFormatter{},
 			Hooks:     make(logrus.LevelHooks),
 			Level:     logrus.DebugLevel,
 		},

+ 16 - 16
psiphon/server/meek.go

@@ -233,11 +233,11 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		return
 	}
 
-	// PumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
+	// pumpReads causes a TunnelServer/SSH goroutine blocking on a Read to
 	// read the request body as upstream traffic.
-	// TODO: run PumpReads and PumpWrites concurrently?
+	// TODO: run pumpReads and pumpWrites concurrently?
 
-	err = session.clientConn.PumpReads(request.Body)
+	err = session.clientConn.pumpReads(request.Body)
 	if err != nil {
 		if err != io.EOF {
 			log.WithContextFields(LogFields{"error": err}).Warning("pump reads failed")
@@ -257,10 +257,10 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request
 		session.sessionIDSent = true
 	}
 
-	// PumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
+	// pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to
 	// write its downstream traffic through to the response body.
 
-	err = session.clientConn.PumpWrites(responseWriter)
+	err = session.clientConn.pumpWrites(responseWriter)
 	if err != nil {
 		if err != io.EOF {
 			log.WithContextFields(LogFields{"error": err}).Warning("pump writes failed")
@@ -612,11 +612,11 @@ func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
 	}
 }
 
-// PumpReads causes goroutines blocking on meekConn.Read() to read
+// pumpReads causes goroutines blocking on meekConn.Read() to read
 // from the specified reader. This function blocks until the reader
 // is fully consumed or the meekConn is closed.
-// Note: channel scheme assumes only one concurrent call to PumpReads
-func (conn *meekConn) PumpReads(reader io.Reader) error {
+// Note: channel scheme assumes only one concurrent call to pumpReads
+func (conn *meekConn) pumpReads(reader io.Reader) error {
 
 	// Assumes that readyReader won't block.
 	conn.readyReader <- reader
@@ -634,7 +634,7 @@ func (conn *meekConn) PumpReads(reader io.Reader) error {
 
 // Read reads from the meekConn into buffer. Read blocks until
 // some data is read or the meekConn closes. Under the hood, it
-// waits for PumpReads to submit a reader to read from.
+// waits for pumpReads to submit a reader to read from.
 // Note: lock is to conform with net.Conn concurrency semantics
 func (conn *meekConn) Read(buffer []byte) (int, error) {
 	conn.readLock.Lock()
@@ -658,7 +658,7 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
 	} else {
 		// There may be more data in the reader, but the caller's
 		// buffer is full, so put the reader back into the ready
-		// channel. PumpReads remains blocked waiting for another
+		// channel. pumpReads remains blocked waiting for another
 		// Read call.
 		// Note that the reader could be at EOF, while another call is
 		// required to get that result (https://golang.org/pkg/io/#Reader).
@@ -668,12 +668,12 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
 	return n, err
 }
 
-// PumpReads causes goroutines blocking on meekConn.Write() to write
+// pumpWrites causes goroutines blocking on meekConn.Write() to write
 // to the specified writer. This function blocks until the meek response
 // body limits (size for protocol v1, turn around time for protocol v2+)
 // are met, or the meekConn is closed.
-// Note: channel scheme assumes only one concurrent call to PumpWrites
-func (conn *meekConn) PumpWrites(writer io.Writer) error {
+// Note: channel scheme assumes only one concurrent call to pumpWrites
+func (conn *meekConn) pumpWrites(writer io.Writer) error {
 
 	startTime := time.Now()
 	timeout := time.NewTimer(MEEK_TURN_AROUND_TIMEOUT)
@@ -713,7 +713,7 @@ func (conn *meekConn) PumpWrites(writer io.Writer) error {
 
 // Write writes the buffer to the meekConn. It blocks until the
 // entire buffer is written to or the meekConn closes. Under the
-// hood, it waits for sufficient PumpWrites calls to consume the
+// hood, it waits for sufficient pumpWrites calls to consume the
 // write buffer.
 // Note: lock is to conform with net.Conn concurrency semantics
 func (conn *meekConn) Write(buffer []byte) (int, error) {
@@ -721,7 +721,7 @@ func (conn *meekConn) Write(buffer []byte) (int, error) {
 	defer conn.writeLock.Unlock()
 
 	// TODO: may be more efficient to send whole buffer
-	// and have PumpWrites stash partial buffer when can't
+	// and have pumpWrites stash partial buffer when can't
 	// send it all.
 
 	n := 0
@@ -756,7 +756,7 @@ func (conn *meekConn) Write(buffer []byte) (int, error) {
 }
 
 // Close closes the meekConn. This will interrupt any blocked
-// Read, Write, PumpReads, and PumpWrites.
+// Read, Write, pumpReads, and pumpWrites.
 func (conn *meekConn) Close() error {
 	if atomic.CompareAndSwapInt32(&conn.closed, 0, 1) {
 		close(conn.closeBroadcast)

+ 2 - 2
psiphon/server/tunnelServer.go

@@ -30,9 +30,9 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/Psiphon-Inc/crypto/ssh"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
-	"golang.org/x/crypto/ssh"
 )
 
 const (
@@ -479,7 +479,7 @@ func (sshServer *sshServer) handleClient(tunnelProtocol string, clientConn net.C
 			// TODO: ensure this won't block shutdown
 			conn, result.err = psiphon.NewObfuscatedSshConn(
 				psiphon.OBFUSCATION_CONN_MODE_SERVER,
-				clientConn,
+				conn,
 				sshServer.support.Config.ObfuscatedSSHKey)
 			if result.err != nil {
 				result.err = common.ContextError(result.err)

+ 1 - 1
psiphon/server/udp.go

@@ -30,8 +30,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/Psiphon-Inc/crypto/ssh"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
-	"golang.org/x/crypto/ssh"
 )
 
 // setUDPChannel sets the single UDP channel for this sshClient.

+ 3 - 3
psiphon/tunnel.go

@@ -32,10 +32,10 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/Psiphon-Inc/crypto/ssh"
 	regen "github.com/Psiphon-Inc/goregen"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
 	"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/transferstats"
-	"golang.org/x/crypto/ssh"
 )
 
 // Tunneler specifies the interface required by components that use a tunnel.
@@ -268,8 +268,8 @@ func (tunnel *Tunnel) Dial(
 		err                error
 	}
 	resultChannel := make(chan *tunnelDialResult, 2)
-	if *tunnel.config.TunnelPortForwardTimeoutSeconds > 0 {
-		time.AfterFunc(time.Duration(*tunnel.config.TunnelPortForwardTimeoutSeconds)*time.Second, func() {
+	if *tunnel.config.TunnelPortForwardDialTimeoutSeconds > 0 {
+		time.AfterFunc(time.Duration(*tunnel.config.TunnelPortForwardDialTimeoutSeconds)*time.Second, func() {
 			resultChannel <- &tunnelDialResult{nil, errors.New("tunnel dial timeout")}
 		})
 	}