Sfoglia il codice sorgente

Don't require an empty buffer to read a meek request

- This restores the property that some previous
  request payload can remain unconsumed without
  blocking a request.

- Read buffers may require additional memory.
Rod Hynes 9 anni fa
parent
commit
aa9c1194a6
2 ha cambiato i file con 22 aggiunte e 10 eliminazioni
  1. 1 1
      psiphon/server/config.go
  2. 21 9
      psiphon/server/meek.go

+ 1 - 1
psiphon/server/config.go

@@ -205,7 +205,7 @@ type Config struct {
 	// buffers. Shared buffers are allocated on first use and remain
 	// allocated, so shared buffer count * size is roughly the memory
 	// overhead of this facility.
-	// A default of 1024 is used when MeekCachedResponsePoolBufferCount
+	// A default of 2048 is used when MeekCachedResponsePoolBufferCount
 	// is 0.
 	MeekCachedResponsePoolBufferCount int
 

+ 21 - 9
psiphon/server/meek.go

@@ -73,7 +73,7 @@ const (
 	MEEK_MAX_SESSION_ID_LENGTH          = 20
 	MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536
 	MEEK_DEFAULT_POOL_BUFFER_LENGTH     = 65536
-	MEEK_DEFAULT_POOL_BUFFER_COUNT      = 1024
+	MEEK_DEFAULT_POOL_BUFFER_COUNT      = 2048
 )
 
 // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon,
@@ -860,13 +860,24 @@ func (conn *meekConn) pumpReads(reader io.Reader) error {
 	// read. Now, the consumer, the Read() caller, must fully drain the
 	// read buffer first.
 
+	// Use either an empty or partial buffer. By using a partial
+	// buffer, pumpReads will not block if the Read() caller has
+	// not fully drained the read buffer.
+
 	var readBuffer *bytes.Buffer
 	select {
 	case readBuffer = <-conn.emptyReadBuffer:
+	case readBuffer = <-conn.partialReadBuffer:
 	case <-conn.closeBroadcast:
 		return io.EOF
 	}
 
+	newDataOffset := readBuffer.Len()
+
+	// Since we need to read the full request payload in order to
+	// take its checksum before relaying it, the read buffer can
+	// grow to up to 2 x MEEK_MAX_REQUEST_PAYLOAD_LENGTH + 1.
+
 	// +1 allows for an explict check for request payloads that
 	// exceed the maximum permitted length.
 	limitReader := io.LimitReader(reader, MEEK_MAX_REQUEST_PAYLOAD_LENGTH+1)
@@ -876,11 +887,11 @@ func (conn *meekConn) pumpReads(reader io.Reader) error {
 		err = errors.New("invalid request payload length")
 	}
 
-	// If the request read fails, don't relay any data. This allows
+	// If the request read fails, don't relay the new data. This allows
 	// the client to retry and resend its request payload without
 	// interrupting/duplicating the payload flow.
 	if err != nil {
-		readBuffer.Reset()
+		readBuffer.Truncate(newDataOffset)
 		conn.replaceReadBuffer(readBuffer)
 		return common.ContextError(err)
 	}
@@ -893,12 +904,13 @@ func (conn *meekConn) pumpReads(reader io.Reader) error {
 	// the underlying SSH connection will fail and the client
 	// must reconnect.
 
-	checksum := crc64.Checksum(readBuffer.Bytes(), conn.meekServer.checksumTable)
+	checksum := crc64.Checksum(
+		readBuffer.Bytes()[newDataOffset:], conn.meekServer.checksumTable)
 
 	if conn.lastReadChecksum == nil {
 		conn.lastReadChecksum = new(uint64)
 	} else if *conn.lastReadChecksum == checksum {
-		readBuffer.Reset()
+		readBuffer.Truncate(newDataOffset)
 	}
 
 	*conn.lastReadChecksum = checksum
@@ -934,12 +946,12 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
 }
 
 func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
-	switch readBuffer.Len() {
-	case MEEK_MAX_REQUEST_PAYLOAD_LENGTH:
+	length := readBuffer.Len()
+	if length >= MEEK_MAX_REQUEST_PAYLOAD_LENGTH {
 		conn.fullReadBuffer <- readBuffer
-	case 0:
+	} else if length == 0 {
 		conn.emptyReadBuffer <- readBuffer
-	default:
+	} else {
 		conn.partialReadBuffer <- readBuffer
 	}
 }