Просмотр исходного кода

Fix: race condition accessing meek sendBuffer
- must await Close of Body by RoundTrip before
buffer can be safely reused

Rod Hynes 8 лет назад
Родитель
Сommit
045d881a4a
1 измененных файлов с 50 добавлено и 8 удалено
  1. 50 8
      psiphon/meekConn.go

+ 50 - 8
psiphon/meekConn.go

@@ -600,6 +600,40 @@ func (meek *MeekConn) relay() {
 	}
 }
 
+// readCloseSignaller is an io.ReadCloser wrapper for an io.Reader
+// that is passed, as the request body, to http.Transport.RoundTrip.
+// readCloseSignaller adds the AwaitClosed call, which is used
+// to schedule recycling the buffer underlying the reader only after
+// RoundTrip has called Close and will no longer use the buffer.
+// See: https://golang.org/pkg/net/http/#RoundTripper
+type readCloseSignaller struct {
+	reader io.Reader
+	closed chan struct{}
+}
+
+func NewReadCloseSignaller(reader io.Reader) *readCloseSignaller {
+	return &readCloseSignaller{
+		reader: reader,
+		closed: make(chan struct{}, 1),
+	}
+}
+
+func (r *readCloseSignaller) Read(p []byte) (int, error) {
+	return r.reader.Read(p)
+}
+
+func (r *readCloseSignaller) Close() error {
+	select {
+	case r.closed <- *new(struct{}):
+	default:
+	}
+	return nil
+}
+
+func (r *readCloseSignaller) AwaitClosed() {
+	<-r.closed
+}
+
 // roundTrip configures and makes the actual HTTP POST request
 func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
 
@@ -654,13 +688,15 @@ func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
 		// Omit the request payload when retrying after receiving a
 		// partial server response.
 
-		var payloadReader io.Reader
+		var signaller *readCloseSignaller
+		var requestBody io.ReadCloser
 		if !serverAcknowledgedRequestPayload && sendBuffer != nil {
-			payloadReader = bytes.NewReader(sendBuffer.Bytes())
+			signaller = NewReadCloseSignaller(bytes.NewReader(sendBuffer.Bytes()))
+			requestBody = signaller
 		}
 
 		var request *http.Request
-		request, err := http.NewRequest("POST", meek.url.String(), payloadReader)
+		request, err := http.NewRequest("POST", meek.url.String(), requestBody)
 		if err != nil {
 			// Don't retry when can't initialize a Request
 			return 0, common.ContextError(err)
@@ -684,6 +720,11 @@ func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
 			request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize))
 		}
 
+		// sendBuffer will be replaced once the data is no longer needed,
+		// when RoundTrip calls Close on the Body; this allows meekConn.Write()
+		// to unblock and start buffering data for the next roung trip while
+		// still reading the current round trip response.
+
 		response, err := meek.transport.RoundTrip(request)
 		if err != nil {
 			select {
@@ -719,11 +760,12 @@ func (meek *MeekConn) roundTrip(sendBuffer *bytes.Buffer) (int64, error) {
 			// must have received the request payload.
 			serverAcknowledgedRequestPayload = true
 
-			// sendBuffer can now be replaced, as the data is no longer
-			// needed; this allows meekConn.Write() to unblock and start
-			// buffering data for the next roung trip while still reading
-			// the current round trip response.
-			if sendBuffer != nil {
+			// sendBuffer is now no longer required for retries, ane the
+			// buffer may be replaced; this allows meekConn.Write() to unblock
+			// and start buffering data for the next roung trip while still
+			// reading the current round trip response.
+			if signaller != nil {
+				signaller.AwaitClosed()
 				sendBuffer.Truncate(0)
 				meek.replaceSendBuffer(sendBuffer)
 				sendBuffer = nil