Просмотр исходного кода

Merge pull request #245 from rod-hynes/master

Bug fixes
Rod Hynes 9 лет назад
Родитель
Сommit
75a873cff7
3 измененных файлов с 104 добавлено и 71 удалено
  1. 21 18
      psiphon/meekConn.go
  2. 16 1
      psiphon/server/api.go
  3. 67 52
      psiphon/server/meek.go

+ 21 - 18
psiphon/meekConn.go

@@ -540,23 +540,7 @@ func (meek *MeekConn) readPayload(receivedPayload io.ReadCloser) (totalSize int6
 }
 
 // roundTrip configures and makes the actual HTTP POST request
-func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadCloser, err error) {
-	request, err := http.NewRequest("POST", meek.url.String(), bytes.NewReader(sendPayload))
-	if err != nil {
-		return nil, common.ContextError(err)
-	}
-
-	// Don't use the default user agent ("Go 1.1 package http").
-	// For now, just omit the header (net/http/request.go: "may be blank to not send the header").
-	request.Header.Set("User-Agent", "")
-
-	request.Header.Set("Content-Type", "application/octet-stream")
-
-	for name, value := range meek.additionalHeaders {
-		request.Header.Set(name, value)
-	}
-
-	request.AddCookie(meek.cookie)
+func (meek *MeekConn) roundTrip(sendPayload []byte) (io.ReadCloser, error) {
 
 	// The retry mitigates intermittent failures between the client and front/server.
 	//
@@ -570,13 +554,32 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 	// i.e., as long as the underlying tunnel has not timed out and as long as the server has not
 	// expired the current meek session. Presently not doing this to avoid excessive connection attempts
 	// through the first hop. In addition, this will require additional support for timely shutdown.
-
 	retries := uint(0)
 	retryDeadline := monotime.Now().Add(MEEK_ROUND_TRIP_RETRY_DEADLINE)
 
+	var err error
 	var response *http.Response
 	for {
 
+		var request *http.Request
+		request, err = http.NewRequest("POST", meek.url.String(), bytes.NewReader(sendPayload))
+		if err != nil {
+			// Don't retry when can't initialize a Request
+			break
+		}
+
+		// Don't use the default user agent ("Go 1.1 package http").
+		// For now, just omit the header (net/http/request.go: "may be blank to not send the header").
+		request.Header.Set("User-Agent", "")
+
+		request.Header.Set("Content-Type", "application/octet-stream")
+
+		for name, value := range meek.additionalHeaders {
+			request.Header.Set(name, value)
+		}
+
+		request.AddCookie(meek.cookie)
+
 		// The http.Transport.RoundTrip is run in a goroutine to enable cancelling a request in-flight.
 		type roundTripResponse struct {
 			response *http.Response

+ 16 - 1
psiphon/server/api.go

@@ -307,6 +307,21 @@ func statusAPIRequestHandler(
 			}
 			sessionFields["tunnel_server_ip_address"] = tunnelServerIPAddress
 
+			// Note: older clients won't send establishment_duration
+			if _, ok := tunnelStat["establishment_duration"]; ok {
+
+				strEstablishmentDuration, err := getStringRequestParam(tunnelStat, "establishment_duration")
+				if err != nil {
+					return nil, common.ContextError(err)
+				}
+				establishmentDuration, err := strconv.ParseInt(strEstablishmentDuration, 10, 64)
+				if err != nil {
+					return nil, common.ContextError(err)
+				}
+				// Client reports establishment_duration in nanoseconds; divide to get to milliseconds
+				sessionFields["establishment_duration"] = establishmentDuration / 1000000
+			}
+
 			serverHandshakeTimestamp, err := getStringRequestParam(tunnelStat, "server_handshake_timestamp")
 			if err != nil {
 				return nil, common.ContextError(err)
@@ -321,7 +336,7 @@ func statusAPIRequestHandler(
 			if err != nil {
 				return nil, common.ContextError(err)
 			}
-			// Client reports durations in nanoseconds; divide to get to milliseconds
+			// Client reports duration in nanoseconds; divide to get to milliseconds
 			sessionFields["duration"] = duration / 1000000
 
 			totalBytesSent, err := getInt64RequestParam(tunnelStat, "total_bytes_sent")

+ 67 - 52
psiphon/server/meek.go

@@ -585,51 +585,68 @@ func makeMeekSessionID() (string, error) {
 
 // meekConn implements the net.Conn interface and is to be used as a client
 // connection by the tunnel server (being passed to sshServer.handleClient).
-// meekConn doesn't perform any real I/O, but instead shuttles io.Readers and
-// io.Writers between goroutines blocking on Read()s and Write()s.
+// meekConn bridges net/http request/response payload readers and writers
+// and goroutines calling Read()s and Write()s.
 type meekConn struct {
-	remoteAddr      net.Addr
-	protocolVersion int
-	closeBroadcast  chan struct{}
-	closed          int32
-	readLock        sync.Mutex
-	readyReader     chan io.Reader
-	readResult      chan error
-	writeLock       sync.Mutex
-	nextWriteBuffer chan []byte
-	writeResult     chan error
+	remoteAddr        net.Addr
+	protocolVersion   int
+	closeBroadcast    chan struct{}
+	closed            int32
+	readLock          sync.Mutex
+	emptyReadBuffer   chan *bytes.Buffer
+	partialReadBuffer chan *bytes.Buffer
+	fullReadBuffer    chan *bytes.Buffer
+	writeLock         sync.Mutex
+	nextWriteBuffer   chan []byte
+	writeResult       chan error
 }
 
 func newMeekConn(remoteAddr net.Addr, protocolVersion int) *meekConn {
-	return &meekConn{
-		remoteAddr:      remoteAddr,
-		protocolVersion: protocolVersion,
-		closeBroadcast:  make(chan struct{}),
-		closed:          0,
-		readyReader:     make(chan io.Reader, 1),
-		readResult:      make(chan error, 1),
-		nextWriteBuffer: make(chan []byte, 1),
-		writeResult:     make(chan error, 1),
-	}
+	conn := &meekConn{
+		remoteAddr:        remoteAddr,
+		protocolVersion:   protocolVersion,
+		closeBroadcast:    make(chan struct{}),
+		closed:            0,
+		emptyReadBuffer:   make(chan *bytes.Buffer, 1),
+		partialReadBuffer: make(chan *bytes.Buffer, 1),
+		fullReadBuffer:    make(chan *bytes.Buffer, 1),
+		nextWriteBuffer:   make(chan []byte, 1),
+		writeResult:       make(chan error, 1),
+	}
+	// Read() calls and pumpReads() are synchronized by exchanging control
+	// of a single readBuffer. This is the same scheme used in and described
+	// in psiphon.MeekConn.
+	conn.emptyReadBuffer <- new(bytes.Buffer)
+	return conn
 }
 
 // pumpReads causes goroutines blocking on meekConn.Read() to read
 // from the specified reader. This function blocks until the reader
-// is fully consumed or the meekConn is closed.
-// Note: channel scheme assumes only one concurrent call to pumpReads
+// is fully consumed or the meekConn is closed. A read buffer allows
+// up to MEEK_MAX_PAYLOAD_LENGTH bytes to be read and buffered without
+// a Read() immediately consuming the bytes, but there's still a
+// possibility of a stall if no Read() calls are made after this
+// read buffer is full.
+// Note: assumes only one concurrent call to pumpReads
 func (conn *meekConn) pumpReads(reader io.Reader) error {
+	for {
+
+		var readBuffer *bytes.Buffer
+		select {
+		case readBuffer = <-conn.emptyReadBuffer:
+		case readBuffer = <-conn.partialReadBuffer:
+		case <-conn.closeBroadcast:
+			return io.EOF
+		}
 
-	// Assumes that readyReader won't block.
-	conn.readyReader <- reader
+		limitReader := io.LimitReader(reader, int64(MEEK_MAX_PAYLOAD_LENGTH-readBuffer.Len()))
+		n, err := readBuffer.ReadFrom(limitReader)
 
-	// Receiving readResult means Read(s) have consumed the
-	// reader sent to readyReader. readyReader is now empty and
-	// no reference is kept to the reader.
-	select {
-	case err := <-conn.readResult:
-		return err
-	case <-conn.closeBroadcast:
-		return io.EOF
+		conn.replaceReadBuffer(readBuffer)
+
+		if n == 0 || err != nil {
+			return err
+		}
 	}
 }
 
@@ -641,34 +658,32 @@ func (conn *meekConn) Read(buffer []byte) (int, error) {
 	conn.readLock.Lock()
 	defer conn.readLock.Unlock()
 
-	var reader io.Reader
+	var readBuffer *bytes.Buffer
 	select {
-	case reader = <-conn.readyReader:
+	case readBuffer = <-conn.partialReadBuffer:
+	case readBuffer = <-conn.fullReadBuffer:
 	case <-conn.closeBroadcast:
 		return 0, io.EOF
 	}
 
-	n, err := reader.Read(buffer)
+	n, err := readBuffer.Read(buffer)
 
-	if err != nil {
-		if err == io.EOF {
-			err = nil
-		}
-		// Assumes readerResult won't block.
-		conn.readResult <- err
-	} else {
-		// There may be more data in the reader, but the caller's
-		// buffer is full, so put the reader back into the ready
-		// channel. pumpReads remains blocked waiting for another
-		// Read call.
-		// Note that the reader could be at EOF, while another call is
-		// required to get that result (https://golang.org/pkg/io/#Reader).
-		conn.readyReader <- reader
-	}
+	conn.replaceReadBuffer(readBuffer)
 
 	return n, err
 }
 
+func (conn *meekConn) replaceReadBuffer(readBuffer *bytes.Buffer) {
+	switch readBuffer.Len() {
+	case MEEK_MAX_PAYLOAD_LENGTH:
+		conn.fullReadBuffer <- readBuffer
+	case 0:
+		conn.emptyReadBuffer <- readBuffer
+	default:
+		conn.partialReadBuffer <- readBuffer
+	}
+}
+
 // pumpWrites causes goroutines blocking on meekConn.Write() to write
 // to the specified writer. This function blocks until the meek response
 // body limits (size for protocol v1, turn around time for protocol v2+)