Просмотр исходного кода

Fixed: poor meek performance due sending only one Write() buffer per round trip instead of consolidating buffers up to max payload size. Now using a single, synchronized write buffer. Also implemented blocking on full receive buffer.

Rod Hynes 11 лет назад
Родитель
Сommit
fbd5a23b90
2 измененных файлов с 123 добавлено и 75 удалено
  1. 1 0
      README.md
  2. 122 75
      psiphon/meekConn.go

+ 1 - 0
README.md

@@ -32,6 +32,7 @@ This project is currently at the proof-of-concept stage. Current production Psip
   * unfronted meek almost makes this obsolete, since meek sessions survive underlying
      HTTP transport socket disconnects. The client could prefer unfronted meek protocol
      when handshake returns a preemptive_reconnect_lifetime_milliseconds.
+* split tunnel support
 * implement page view stats
 * implement local traffic stats (e.g., to display bytes sent/received)
 * control interface (w/ event messages)?

+ 122 - 75
psiphon/meekConn.go

@@ -43,13 +43,14 @@ import (
 // https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
 
 const (
-	MEEK_PROTOCOL_VERSION     = 1
-	MEEK_COOKIE_MAX_PADDING   = 32
-	MAX_SEND_PAYLOAD_LENGTH   = 65536
-	READ_PAYLOAD_CHUNK_LENGTH = 65536
-	MIN_POLL_INTERVAL         = 100 * time.Millisecond
-	MAX_POLL_INTERVAL         = 5 * time.Second
-	POLL_INTERNAL_MULTIPLIER  = 1.5
+	MEEK_PROTOCOL_VERSION      = 1
+	MEEK_COOKIE_MAX_PADDING    = 32
+	MAX_SEND_PAYLOAD_LENGTH    = 65536
+	FULL_RECEIVE_BUFFER_LENGTH = 2097152
+	READ_PAYLOAD_CHUNK_LENGTH  = 65536
+	MIN_POLL_INTERVAL          = 100 * time.Millisecond
+	MAX_POLL_INTERVAL          = 5 * time.Second
+	POLL_INTERNAL_MULTIPLIER   = 1.5
 )
 
 // MeekConn is a network connection that tunnels TCP over HTTP and supports "fronting". Meek sends
@@ -64,18 +65,21 @@ const (
 // MeekConn also operates in unfronted mode, in which plain HTTP connections are made without routing
 // through a CDN.
 type MeekConn struct {
-	url                 *url.URL
-	cookie              *http.Cookie
-	pendingConns        *PendingConns
-	transport           *http.Transport
-	mutex               sync.Mutex
-	isClosed            bool
-	closedSignal        chan struct{}
-	broadcastClosed     chan struct{}
-	relayWaitGroup      *sync.WaitGroup
-	availableReadBuffer chan *bytes.Buffer
-	emptyReadBuffer     chan *bytes.Buffer
-	writeQueue          chan []byte
+	url                  *url.URL
+	cookie               *http.Cookie
+	pendingConns         *PendingConns
+	transport            *http.Transport
+	mutex                sync.Mutex
+	isClosed             bool
+	closedSignal         chan struct{}
+	broadcastClosed      chan struct{}
+	relayWaitGroup       *sync.WaitGroup
+	emptyReceiveBuffer   chan *bytes.Buffer
+	partialReceiveBuffer chan *bytes.Buffer
+	fullReceiveBuffer    chan *bytes.Buffer
+	emptySendBuffer      chan *bytes.Buffer
+	partialSendBuffer    chan *bytes.Buffer
+	fullSendBuffer       chan *bytes.Buffer
 }
 
 // NewMeekConn returns an initialized meek connection. A meek connection is
@@ -133,30 +137,35 @@ func NewMeekConn(
 	// A MeekConn implements net.Conn concurrency semantics:
 	// "Multiple goroutines may invoke methods on a Conn simultaneously."
 	//
-	// Write() calls and relay() are synchronized with the writeQueue channel. Write sends
-	// payloads into the writeQueue, blocking when a payload is already in the queue as only
-	// one HTTP request is in flight at a time (the channel size is 1).
-	//
-	// Read() calls and relay() are synchronized by passing control of a single readBuffer
-	// (bytes.Buffer). This single buffer may be in the emptyReadBuffer channel (when it is
-	// available and empty), the availableReadBuffer channel (when it is available and contains
-	// data), or "checked out" by relay or Read when they are are writing to or reading from the
-	// buffer, respectively. relay will obtain the buffer from either channel, but Read will only
-	// obtain the buffer from availableReadBuffer, so it blocks when there is no data available
-	// to read.
+	// Read() calls and relay() are synchronized by exchanging control of a single
+	// receiveBuffer (bytes.Buffer). This single buffer may be:
+	// - in the emptyReceiveBuffer channel when it is available and empty;
+	// - in the partialReadBuffer channel when it is available and contains data;
+	// - in the fullReadBuffer channel when it is available and full of data;
+	// - "checked out" by relay or Read when they are are writing to or reading from the
+	//   buffer, respectively.
+	// relay() will obtain the buffer from either the empty or partial channel but block when
+	// the buffer is full. Read will obtain the buffer from the partial or full channel when
+	// there is data to read but block when the buffer is empty.
+	// Write() calls and relay() are synchronized in a similar way, using a single
+	// sendBuffer.
 	meek = &MeekConn{
-		url:                 url,
-		cookie:              cookie,
-		pendingConns:        pendingConns,
-		transport:           transport,
-		broadcastClosed:     make(chan struct{}),
-		relayWaitGroup:      new(sync.WaitGroup),
-		availableReadBuffer: make(chan *bytes.Buffer, 1),
-		emptyReadBuffer:     make(chan *bytes.Buffer, 1),
-		writeQueue:          make(chan []byte, 1),
+		url:                  url,
+		cookie:               cookie,
+		pendingConns:         pendingConns,
+		transport:            transport,
+		broadcastClosed:      make(chan struct{}),
+		relayWaitGroup:       new(sync.WaitGroup),
+		emptyReceiveBuffer:   make(chan *bytes.Buffer, 1),
+		partialReceiveBuffer: make(chan *bytes.Buffer, 1),
+		fullReceiveBuffer:    make(chan *bytes.Buffer, 1),
+		emptySendBuffer:      make(chan *bytes.Buffer, 1),
+		partialSendBuffer:    make(chan *bytes.Buffer, 1),
+		fullSendBuffer:       make(chan *bytes.Buffer, 1),
 	}
 	// TODO: benchmark bytes.Buffer vs. built-in append with slices?
-	meek.emptyReadBuffer <- new(bytes.Buffer)
+	meek.emptyReceiveBuffer <- new(bytes.Buffer)
+	meek.emptySendBuffer <- new(bytes.Buffer)
 	meek.relayWaitGroup.Add(1)
 	go meek.relay()
 	return meek, nil
@@ -209,18 +218,17 @@ func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
 	if meek.closed() {
 		return 0, ContextError(errors.New("meek connection is closed"))
 	}
+	// Block until there is received data to consume
+	var receiveBuffer *bytes.Buffer
 	select {
-	case readBuffer := <-meek.availableReadBuffer:
-		n, err = readBuffer.Read(buffer)
-		if readBuffer.Len() > 0 {
-			meek.availableReadBuffer <- readBuffer
-		} else {
-			meek.emptyReadBuffer <- readBuffer
-		}
-		return n, err
+	case receiveBuffer = <-meek.partialReceiveBuffer:
+	case receiveBuffer = <-meek.fullReceiveBuffer:
 	case <-meek.broadcastClosed:
 		return 0, ContextError(errors.New("meek connection has closed"))
 	}
+	n, err = receiveBuffer.Read(buffer)
+	meek.replaceReceiveBuffer(receiveBuffer)
+	return n, err
 }
 
 // Write writes data to the connection.
@@ -229,25 +237,28 @@ func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
 	if meek.closed() {
 		return 0, ContextError(errors.New("meek connection is closed"))
 	}
+	// Repeats until all n bytes are written
 	n = len(buffer)
-	// The data to send is split into MAX_SEND_PAYLOAD_LENGTH chunks as
-	// this is the most that will be sent per HTTP request.
 	for len(buffer) > 0 {
-		nextWrite := MAX_SEND_PAYLOAD_LENGTH
-		if len(buffer) < nextWrite {
-			nextWrite = len(buffer)
-		}
-		// TODO: pool of reusable buffers?
-		queuedWrite := make([]byte, nextWrite)
-		copy(queuedWrite, buffer)
-		buffer = buffer[nextWrite:]
+		// Block until there is capacity in the send buffer
+		var sendBuffer *bytes.Buffer
 		select {
-		case meek.writeQueue <- queuedWrite:
+		case sendBuffer = <-meek.emptySendBuffer:
+		case sendBuffer = <-meek.partialSendBuffer:
 		case <-meek.broadcastClosed:
 			return 0, ContextError(errors.New("meek connection has closed"))
 		}
+		writeLen := MAX_SEND_PAYLOAD_LENGTH - sendBuffer.Len()
+		if writeLen > 0 {
+			if writeLen > len(buffer) {
+				writeLen = len(buffer)
+			}
+			_, err = sendBuffer.Write(buffer[:writeLen])
+			buffer = buffer[writeLen:]
+		}
+		meek.replaceSendBuffer(sendBuffer)
 	}
-	return n, nil
+	return n, err
 }
 
 // Stub implementation of net.Conn.LocalAddr
@@ -275,6 +286,28 @@ func (meek *MeekConn) SetWriteDeadline(t time.Time) error {
 	return ContextError(errors.New("not supported"))
 }
 
+func (meek *MeekConn) replaceReceiveBuffer(receiveBuffer *bytes.Buffer) {
+	switch {
+	case receiveBuffer.Len() == 0:
+		meek.emptyReceiveBuffer <- receiveBuffer
+	case receiveBuffer.Len() >= FULL_RECEIVE_BUFFER_LENGTH:
+		meek.fullReceiveBuffer <- receiveBuffer
+	default:
+		meek.partialReceiveBuffer <- receiveBuffer
+	}
+}
+
+func (meek *MeekConn) replaceSendBuffer(sendBuffer *bytes.Buffer) {
+	switch {
+	case sendBuffer.Len() == 0:
+		meek.emptySendBuffer <- sendBuffer
+	case sendBuffer.Len() >= MAX_SEND_PAYLOAD_LENGTH:
+		meek.fullSendBuffer <- sendBuffer
+	default:
+		meek.partialSendBuffer <- sendBuffer
+	}
+}
+
 // relay sends and receives tunnelled traffic (payload). An HTTP request is
 // triggered when data is in the write queue or at a polling interval.
 // There's a geometric increase, up to a maximum, in the polling interval when
@@ -282,16 +315,30 @@ func (meek *MeekConn) SetWriteDeadline(t time.Time) error {
 func (meek *MeekConn) relay() {
 	defer meek.relayWaitGroup.Done()
 	interval := MIN_POLL_INTERVAL
-	var sendPayload []byte
+	var sendPayload = make([]byte, MAX_SEND_PAYLOAD_LENGTH)
 	for {
-		sendPayload = nil
+		// Block until there is payload to send or it is time to poll
+		var sendBuffer *bytes.Buffer
 		select {
-		case sendPayload = <-meek.writeQueue:
+		case sendBuffer = <-meek.partialSendBuffer:
+		case sendBuffer = <-meek.fullSendBuffer:
 		case <-time.After(interval):
+			// In the polling case, send an empty payload
 		case <-meek.broadcastClosed:
 			return
 		}
-		receivedPayload, err := meek.roundTrip(sendPayload)
+		sendPayloadSize := 0
+		if sendBuffer != nil {
+			var err error
+			sendPayloadSize, err = sendBuffer.Read(sendPayload)
+			meek.replaceSendBuffer(sendBuffer)
+			if err != nil {
+				Notice(NOTICE_ALERT, "%s", ContextError(err))
+				meek.Close()
+				return
+			}
+		}
+		receivedPayload, err := meek.roundTrip(sendPayload[:sendPayloadSize])
 		if err != nil {
 			Notice(NOTICE_ALERT, "%s", ContextError(err))
 			meek.Close()
@@ -303,7 +350,7 @@ func (meek *MeekConn) relay() {
 			meek.Close()
 			return
 		}
-		if receivedPayloadSize > 0 || sendPayload != nil {
+		if receivedPayloadSize > 0 || sendPayloadSize > 0 {
 			interval = 0
 		} else if interval == 0 {
 			interval = MIN_POLL_INTERVAL
@@ -324,22 +371,22 @@ func (meek *MeekConn) readPayload(receivedPayload io.ReadCloser) (totalSize int6
 	totalSize = 0
 	for {
 		reader := io.LimitReader(receivedPayload, READ_PAYLOAD_CHUNK_LENGTH)
-		var readBuffer *bytes.Buffer
+		// Block until there is capacity in the receive buffer
+		var receiveBuffer *bytes.Buffer
 		select {
-		case readBuffer = <-meek.availableReadBuffer:
-		case readBuffer = <-meek.emptyReadBuffer:
+		case receiveBuffer = <-meek.emptyReceiveBuffer:
+		case receiveBuffer = <-meek.partialReceiveBuffer:
+		case <-meek.broadcastClosed:
+			return 0, nil
 		}
-		// TODO: block when readBuffer is too large?
-		n, err := readBuffer.ReadFrom(reader)
+		// Note: receiveBuffer size may exceed FULL_RECEIVE_BUFFER_LENGTH by up to the size
+		// of one received payload. The FULL_RECEIVE_BUFFER_LENGTH value is just a threshold.
+		n, err := receiveBuffer.ReadFrom(reader)
+		meek.replaceReceiveBuffer(receiveBuffer)
 		if err != nil {
 			return 0, ContextError(err)
 		}
 		totalSize += n
-		if readBuffer.Len() > 0 {
-			meek.availableReadBuffer <- readBuffer
-		} else {
-			meek.emptyReadBuffer <- readBuffer
-		}
 		if n == 0 {
 			break
 		}