|
|
@@ -24,6 +24,7 @@ import (
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "math/rand"
|
|
|
"path/filepath"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
@@ -221,10 +222,17 @@ func StoreServerEntry(serverEntry *ServerEntry, replaceIfExists bool) error {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-// StoreServerEntries stores a list of server entries. This is simply a
|
|
|
-// helper which calls StoreServerEntry on each entry in the list -- so there
|
|
|
-// is an independent transaction for each entry -- and stops on first error.
|
|
|
+// StoreServerEntries shuffles and stores a list of server entries.
|
|
|
+// Shuffling is performed on imported server entrues as part of client-side
|
|
|
+// load balancing.
|
|
|
+// There is an independent transaction for each entry insert/update.
|
|
|
func StoreServerEntries(serverEntries []*ServerEntry, replaceIfExists bool) error {
|
|
|
+
|
|
|
+ for index := len(serverEntries) - 1; index > 0; index-- {
|
|
|
+ swapIndex := rand.Intn(index + 1)
|
|
|
+ serverEntries[index], serverEntries[swapIndex] = serverEntries[swapIndex], serverEntries[index]
|
|
|
+ }
|
|
|
+
|
|
|
for _, serverEntry := range serverEntries {
|
|
|
err := StoreServerEntry(serverEntry, replaceIfExists)
|
|
|
if err != nil {
|
|
|
@@ -286,10 +294,30 @@ func (iterator *ServerEntryIterator) Reset() error {
|
|
|
return ContextError(err)
|
|
|
}
|
|
|
var cursor *sql.Rows
|
|
|
+
|
|
|
+ // This query implements the Psiphon server candidate selection
|
|
|
+ // algorithm: the first set of server candidates are in rank (priority)
|
|
|
+ // order, to favor previously successful servers; then the remaining
|
|
|
+ // long tail is shuffled to raise up less recent candidates.
|
|
|
+
|
|
|
whereClause, whereParams := makeServerEntryWhereClause(
|
|
|
iterator.region, iterator.protocol, nil)
|
|
|
- query := "select data from serverEntry" + whereClause + " order by rank desc;"
|
|
|
- cursor, err = transaction.Query(query, whereParams...)
|
|
|
+ headLength := CONNECTION_WORKER_POOL_SIZE
|
|
|
+ queryFormat := `
|
|
|
+ select data from serverEntry %s
|
|
|
+ order by case
|
|
|
+ when rank > coalesce((select rank from serverEntry %s order by rank desc limit ?, 1), -1) then rank
|
|
|
+ else abs(random())%%((select rank from serverEntry %s order by rank desc limit ?, 1))
|
|
|
+ end desc;`
|
|
|
+ query := fmt.Sprintf(queryFormat, whereClause, whereClause, whereClause)
|
|
|
+ params := make([]interface{}, 0)
|
|
|
+ params = append(params, whereParams...)
|
|
|
+ params = append(params, whereParams...)
|
|
|
+ params = append(params, headLength)
|
|
|
+ params = append(params, whereParams...)
|
|
|
+ params = append(params, headLength)
|
|
|
+
|
|
|
+ cursor, err = transaction.Query(query, params...)
|
|
|
if err != nil {
|
|
|
transaction.Rollback()
|
|
|
return ContextError(err)
|