Browse Source

Enable interrupting HTTP requests in-flight when a MeekConn is closed.

Rod Hynes 11 years ago
parent
commit
c8478b3a90
1 changed files with 40 additions and 15 deletions
  1. 40 15
      psiphon/meekConn.go

+ 40 - 15
psiphon/meekConn.go

@@ -193,7 +193,6 @@ func (meek *MeekConn) SetClosedSignal(closedSignal chan struct{}) (err error) {
 // Close terminates the meek connection. Close waits for the relay processing goroutine
 // to stop and releases HTTP transport resources.
 // A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
-// NOTE: currently doesn't interrupt any HTTP request in flight.
 func (meek *MeekConn) Close() (err error) {
 	meek.mutex.Lock()
 	defer meek.mutex.Unlock()
@@ -201,9 +200,6 @@ func (meek *MeekConn) Close() (err error) {
 		close(meek.broadcastClosed)
 		meek.pendingConns.CloseAll()
 		meek.relayWaitGroup.Wait()
-		// TODO: meek.transport.CancelRequest() for current in-flight request?
-		// (currently pendingConns will abort establishing connections, but not
-		// established persistent connections)
 		meek.transport.CloseIdleConnections()
 		meek.isClosed = true
 		select {
@@ -337,6 +333,7 @@ func (meek *MeekConn) relay() {
 		case <-timeout.C:
 			// In the polling case, send an empty payload
 		case <-meek.broadcastClosed:
+			// TODO: timeout case may be selected when broadcastClosed is set?
 			return
 		}
 		sendPayloadSize := 0
@@ -356,6 +353,10 @@ func (meek *MeekConn) relay() {
 			go meek.Close()
 			return
 		}
+		if receivedPayload == nil {
+			// In this case, meek.roundTrip encountered broadcastClosed. Exit without error.
+			return
+		}
 		receivedPayloadSize, err := meek.readPayload(receivedPayload)
 		if err != nil {
 			Notice(NOTICE_ALERT, "%s", ContextError(err))
@@ -417,13 +418,37 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 	request.Header.Set("User-Agent", "")
 	request.Header.Set("Content-Type", "application/octet-stream")
 	request.AddCookie(meek.cookie)
-	// This retry mitigates intermittent failures between the client and front/server.
+
+	// The retry mitigates intermittent failures between the client and front/server.
 	// Note: Retry will only be effective if entire request failed (underlying transport protocol
 	// such as SSH will fail if extra bytes are replayed in either direction due to partial relay
 	// success followed by retry).
 	var response *http.Response
-	for i := 0; i <= 1; i++ {
-		response, err = meek.transport.RoundTrip(request)
+	for retry := 0; retry <= 1; retry++ {
+
+		// The http.Transport.RoundTrip is run in a goroutine to enable cancelling a request in-flight.
+		type roundTripResponse struct {
+			response *http.Response
+			err      error
+		}
+		roundTripResponseChannel := make(chan *roundTripResponse, 1)
+		roundTripWaitGroup := new(sync.WaitGroup)
+		roundTripWaitGroup.Add(1)
+		go func() {
+			defer roundTripWaitGroup.Done()
+			r, err := meek.transport.RoundTrip(request)
+			roundTripResponseChannel <- &roundTripResponse{r, err}
+		}()
+		select {
+		case roundTripResponse := <-roundTripResponseChannel:
+			response = roundTripResponse.response
+			err = roundTripResponse.err
+		case <-meek.broadcastClosed:
+			meek.transport.CancelRequest(request)
+			return nil, nil
+		}
+		roundTripWaitGroup.Wait()
+
 		if err == nil {
 			break
 		}
@@ -434,14 +459,14 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 	if response.StatusCode != http.StatusOK {
 		return nil, ContextError(fmt.Errorf("http request failed %d", response.StatusCode))
 	}
-        // observe response cookies for meek session key token.
-        // Once found it must be used for all consecutive requests made to the server
-        for _, c := range response.Cookies() {
-            if meek.cookie.Name == c.Name {
-                meek.cookie.Value = c.Value
-                break
-            }
-        }
+	// observe response cookies for meek session key token.
+	// Once found it must be used for all consecutive requests made to the server
+	for _, c := range response.Cookies() {
+		if meek.cookie.Name == c.Name {
+			meek.cookie.Value = c.Value
+			break
+		}
+	}
 	return response.Body, nil
 }