Browse Source

Merge remote-tracking branch 'upstream/master'

Adam Pritchard 11 years ago
parent
commit
97ec7a2e03
3 changed files with 85 additions and 45 deletions
  1. 12 11
      psiphon/controller.go
  2. 33 19
      psiphon/dataStore.go
  3. 40 15
      psiphon/meekConn.go

+ 12 - 11
psiphon/controller.go

@@ -557,18 +557,18 @@ func (controller *Controller) stopEstablishing() {
 // servers with higher rank are priority candidates.
 func (controller *Controller) establishCandidateGenerator() {
 	defer controller.establishWaitGroup.Done()
+
+	iterator, err := NewServerEntryIterator(
+		controller.config.EgressRegion, controller.config.TunnelProtocol)
+	if err != nil {
+		Notice(NOTICE_ALERT, "failed to iterate over candidates: %s", err)
+		controller.SignalFailure()
+		return
+	}
+	defer iterator.Close()
+
 loop:
 	for {
-		// Note: it's possible that an active tunnel in excludeServerEntries will
-		// fail during this iteration of server entries and in that case the
-		// cooresponding server will not be retried (within the same iteration).
-		iterator, err := NewServerEntryIterator(
-			controller.config.EgressRegion, controller.config.TunnelProtocol)
-		if err != nil {
-			Notice(NOTICE_ALERT, "failed to iterate over candidates: %s", err)
-			controller.SignalFailure()
-			break loop
-		}
 		for {
 			serverEntry, err := iterator.Next()
 			if err != nil {
@@ -588,7 +588,8 @@ loop:
 				break loop
 			}
 		}
-		iterator.Close()
+		iterator.Reset()
+
 		// After a complete iteration of candidate servers, pause before iterating again.
 		// This helps avoid some busy wait loop conditions, and also allows some time for
 		// network conditions to change.

+ 33 - 19
psiphon/dataStore.go

@@ -48,14 +48,14 @@ func initDataStore() {
              rank integer not null unique,
              region text not null,
              data blob not null);
-	    create table if not exists serverEntryProtocol
-	        (serverEntryId text not null,
-	         protocol text not null,
-	         primary key (serverEntryId, protocol));
+        create table if not exists serverEntryProtocol
+            (serverEntryId text not null,
+             protocol text not null,
+             primary key (serverEntryId, protocol));
         create table if not exists keyValue
             (key text not null primary key,
              value text not null);
-		pragma journal_mode=WAL;
+        pragma journal_mode=WAL;
         `
 		db, err := sql.Open(
 			"sqlite3",
@@ -115,27 +115,35 @@ func transactionWithRetry(updater func(*sql.Tx) error) error {
 
 // serverEntryExists returns true if a serverEntry with the
 // given ipAddress id already exists.
-func serverEntryExists(transaction *sql.Tx, ipAddress string) bool {
+func serverEntryExists(transaction *sql.Tx, ipAddress string) (bool, error) {
 	query := "select count(*) from serverEntry where id  = ?;"
 	var count int
 	err := singleton.db.QueryRow(query, ipAddress).Scan(&count)
-	return err == nil && count > 0
+	if err != nil {
+		return false, ContextError(err)
+	}
+	return count > 0, nil
 }
 
-// StoreServerEntry adds the server entry to the data store. A newly
-// stored (or re-stored) server entry is assigned the next-to-top rank
-// for cycle order (the previous top ranked entry is promoted). The
-// purpose of this is to keep the last selected server as the top
-// ranked server.
+// StoreServerEntry adds the server entry to the data store.
+// A newly stored (or re-stored) server entry is assigned the next-to-top
+// rank for iteration order (the previous top ranked entry is promoted). The
+// purpose of inserting at next-to-top is to keep the last selected server
+// as the top ranked server. Note, server candidates are iterated in decending
+// rank order, so the largest rank is top rank.
 // When replaceIfExists is true, an existing server entry record is
 // overwritten; otherwise, the existing record is unchanged.
 func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
 	return transactionWithRetry(func(transaction *sql.Tx) error {
-		serverEntryExists := serverEntryExists(transaction, serverEntry.IpAddress)
+		serverEntryExists, err := serverEntryExists(transaction, serverEntry.IpAddress)
+		if err != nil {
+			return ContextError(err)
+		}
 		if serverEntryExists && !replaceIfExists {
+			// Nothing more to do
 			return nil
 		}
-		_, err := transaction.Exec(`
+		_, err = transaction.Exec(`
             update serverEntry set rank = rank + 1
                 where id = (select id from serverEntry order by rank desc limit 1);
             `)
@@ -182,9 +190,10 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
 	})
 }
 
-// PromoteServerEntry assigns the top cycle rank to the specified
-// server entry. This server entry will be the first candidate in
-// a subsequent tunnel establishment.
+// PromoteServerEntry assigns the top rank (one more than current
+// max rank) to the specified server entry. Server candidates are
+// iterated in decending rank order, so this server entry will be
+// the first candidate in a subsequent tunnel establishment.
 func PromoteServerEntry(ipAddress string) error {
 	return transactionWithRetry(func(transaction *sql.Tx) error {
 		_, err := transaction.Exec(`
@@ -335,6 +344,11 @@ func HasServerEntries(region, protocol string) bool {
 	query := "select count(*) from serverEntry" + whereClause
 	err := singleton.db.QueryRow(query, whereParams...).Scan(&count)
 
+	if err != nil {
+		Notice(NOTICE_ALERT, "HasServerEntries failed: %s", err)
+		return false
+	}
+
 	if region == "" {
 		region = "(any)"
 	}
@@ -344,7 +358,7 @@ func HasServerEntries(region, protocol string) bool {
 	Notice(NOTICE_INFO, "servers for region %s and protocol %s: %d",
 		region, protocol, count)
 
-	return err == nil && count > 0
+	return count > 0
 }
 
 // GetServerEntryIpAddresses returns an array containing
@@ -386,7 +400,7 @@ func SetKeyValue(key, value string) error {
 	})
 }
 
-// GetLastConnected retrieves a key/value pair. If not found,
+// GetKeyValue retrieves the value for a given key. If not found,
 // it returns an empty string value.
 func GetKeyValue(key string) (value string, err error) {
 	initDataStore()

+ 40 - 15
psiphon/meekConn.go

@@ -194,7 +194,6 @@ func (meek *MeekConn) SetClosedSignal(closedSignal chan struct{}) (err error) {
 // Close terminates the meek connection. Close waits for the relay processing goroutine
 // to stop and releases HTTP transport resources.
 // A mutex is required to support psiphon.Conn.SetClosedSignal concurrency semantics.
-// NOTE: currently doesn't interrupt any HTTP request in flight.
 func (meek *MeekConn) Close() (err error) {
 	meek.mutex.Lock()
 	defer meek.mutex.Unlock()
@@ -202,9 +201,6 @@ func (meek *MeekConn) Close() (err error) {
 		close(meek.broadcastClosed)
 		meek.pendingConns.CloseAll()
 		meek.relayWaitGroup.Wait()
-		// TODO: meek.transport.CancelRequest() for current in-flight request?
-		// (currently pendingConns will abort establishing connections, but not
-		// established persistent connections)
 		meek.transport.CloseIdleConnections()
 		meek.isClosed = true
 		select {
@@ -338,6 +334,7 @@ func (meek *MeekConn) relay() {
 		case <-timeout.C:
 			// In the polling case, send an empty payload
 		case <-meek.broadcastClosed:
+			// TODO: timeout case may be selected when broadcastClosed is set?
 			return
 		}
 		sendPayloadSize := 0
@@ -357,6 +354,10 @@ func (meek *MeekConn) relay() {
 			go meek.Close()
 			return
 		}
+		if receivedPayload == nil {
+			// In this case, meek.roundTrip encountered broadcastClosed. Exit without error.
+			return
+		}
 		receivedPayloadSize, err := meek.readPayload(receivedPayload)
 		if err != nil {
 			Notice(NOTICE_ALERT, "%s", ContextError(err))
@@ -418,13 +419,37 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 	request.Header.Set("User-Agent", "")
 	request.Header.Set("Content-Type", "application/octet-stream")
 	request.AddCookie(meek.cookie)
-	// This retry mitigates intermittent failures between the client and front/server.
+
+	// The retry mitigates intermittent failures between the client and front/server.
 	// Note: Retry will only be effective if entire request failed (underlying transport protocol
 	// such as SSH will fail if extra bytes are replayed in either direction due to partial relay
 	// success followed by retry).
 	var response *http.Response
-	for i := 0; i <= 1; i++ {
-		response, err = meek.transport.RoundTrip(request)
+	for retry := 0; retry <= 1; retry++ {
+
+		// The http.Transport.RoundTrip is run in a goroutine to enable cancelling a request in-flight.
+		type roundTripResponse struct {
+			response *http.Response
+			err      error
+		}
+		roundTripResponseChannel := make(chan *roundTripResponse, 1)
+		roundTripWaitGroup := new(sync.WaitGroup)
+		roundTripWaitGroup.Add(1)
+		go func() {
+			defer roundTripWaitGroup.Done()
+			r, err := meek.transport.RoundTrip(request)
+			roundTripResponseChannel <- &roundTripResponse{r, err}
+		}()
+		select {
+		case roundTripResponse := <-roundTripResponseChannel:
+			response = roundTripResponse.response
+			err = roundTripResponse.err
+		case <-meek.broadcastClosed:
+			meek.transport.CancelRequest(request)
+			return nil, nil
+		}
+		roundTripWaitGroup.Wait()
+
 		if err == nil {
 			break
 		}
@@ -435,14 +460,14 @@ func (meek *MeekConn) roundTrip(sendPayload []byte) (receivedPayload io.ReadClos
 	if response.StatusCode != http.StatusOK {
 		return nil, ContextError(fmt.Errorf("http request failed %d", response.StatusCode))
 	}
-        // observe response cookies for meek session key token.
-        // Once found it must be used for all consecutive requests made to the server
-        for _, c := range response.Cookies() {
-            if meek.cookie.Name == c.Name {
-                meek.cookie.Value = c.Value
-                break
-            }
-        }
+	// observe response cookies for meek session key token.
+	// Once found it must be used for all consecutive requests made to the server
+	for _, c := range response.Cookies() {
+		if meek.cookie.Name == c.Name {
+			meek.cookie.Value = c.Value
+			break
+		}
+	}
 	return response.Body, nil
 }