Browse Source

Integrate redis-based legacy psi_web support

Rod Hynes 10 years ago
parent
commit
26cc5f42de
4 changed files with 81 additions and 20 deletions
  1. 21 0
      psiphon/server/config.go
  2. 7 3
      psiphon/server/geoip.go
  3. 11 3
      psiphon/server/services.go
  4. 42 14
      psiphon/server/sshService.go

+ 21 - 0
psiphon/server/config.go

@@ -55,6 +55,9 @@ const (
 	SSH_CONNECTION_READ_DEADLINE           = 5 * time.Minute
 	SSH_OBFUSCATED_KEY_BYTE_LENGTH         = 32
 	DEFAULT_OBFUSCATED_SSH_SERVER_PORT     = 3333
+	REDIS_POOL_MAX_IDLE                    = 5
+	REDIS_POOL_MAX_ACTIVE                  = 1000
+	REDIS_POOL_IDLE_TIMEOUT                = 5 * time.Minute
 )
 
 // TODO: break config into sections (sub-structs)
@@ -77,6 +80,23 @@ type Config struct {
 	SSHServerPort           int
 	ObfuscatedSSHKey        string
 	ObfuscatedSSHServerPort int
+	RedisServerAddress      string
+}
+
+func (config *Config) RunWebServer() bool {
+	return config.WebServerPort > 0
+}
+
+func (config *Config) RunSSHServer() bool {
+	return config.SSHServerPort > 0
+}
+
+func (config *Config) RunObfuscatedSSHServer() bool {
+	return config.ObfuscatedSSHServerPort > 0
+}
+
+func (config *Config) UseRedis() bool {
+	return config.RedisServerAddress != ""
 }
 
 func LoadConfig(configJson []byte) (*Config, error) {
@@ -201,6 +221,7 @@ func GenerateConfig(params *GenerateConfigParams) ([]byte, []byte, error) {
 		SSHServerPort:           sshServerPort,
 		ObfuscatedSSHKey:        obfuscatedSSHKey,
 		ObfuscatedSSHServerPort: obfuscatedSSHServerPort,
+		RedisServerAddress:      "",
 	}
 
 	encodedConfig, err := json.MarshalIndent(config, "\n", "    ")

+ 7 - 3
psiphon/server/geoip.go

@@ -34,13 +34,17 @@ type GeoIPData struct {
 	ISP     string
 }
 
-func GeoIPLookup(ipAddress string) GeoIPData {
-
-	result := GeoIPData{
+func NewGeoIPData() GeoIPData {
+	return GeoIPData{
 		Country: UNKNOWN_GEOIP_VALUE,
 		City:    UNKNOWN_GEOIP_VALUE,
 		ISP:     UNKNOWN_GEOIP_VALUE,
 	}
+}
+
+func GeoIPLookup(ipAddress string) GeoIPData {
+
+	result := NewGeoIPData()
 
 	ip := net.ParseIP(ipAddress)
 

+ 11 - 3
psiphon/server/services.go

@@ -47,11 +47,19 @@ func RunServices(encodedConfig []byte) error {
 		return psiphon.ContextError(err)
 	}
 
+	if config.UseRedis() {
+		err = InitRedis(config)
+		if err != nil {
+			log.WithContextFields(LogFields{"error": err}).Error("init redis failed")
+			return psiphon.ContextError(err)
+		}
+	}
+
 	waitGroup := new(sync.WaitGroup)
 	shutdownBroadcast := make(chan struct{})
 	errors := make(chan error)
 
-	if config.WebServerPort > 0 {
+	if config.RunWebServer() {
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()
@@ -63,7 +71,7 @@ func RunServices(encodedConfig []byte) error {
 		}()
 	}
 
-	if config.SSHServerPort > 0 {
+	if config.RunSSHServer() {
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()
@@ -75,7 +83,7 @@ func RunServices(encodedConfig []byte) error {
 		}()
 	}
 
-	if config.ObfuscatedSSHServerPort > 0 {
+	if config.RunObfuscatedSSHServer() {
 		waitGroup.Add(1)
 		go func() {
 			defer waitGroup.Done()

+ 42 - 14
psiphon/server/sshService.go

@@ -190,33 +190,48 @@ func (sshServer *sshServer) registerClient(client *sshClient) (string, bool) {
 	return key, true
 }
 
+func (sshServer *sshServer) getClientGeoIPData(clientKey string) GeoIPData {
+	sshServer.clientsMutex.Lock()
+	client, ok := sshServer.clients[clientKey]
+	sshServer.clientsMutex.Unlock()
+
+	geoIPData := NewGeoIPData()
+
+	if ok {
+		client.Lock()
+		geoIPData = client.geoIPData
+		client.Unlock()
+	}
+
+	return geoIPData
+}
+
 func (sshServer *sshServer) updateClient(
 	clientKey string, updater func(*sshClient)) {
 
 	sshServer.clientsMutex.Lock()
-	sshClient, ok := sshServer.clients[clientKey]
+	client, ok := sshServer.clients[clientKey]
 	sshServer.clientsMutex.Unlock()
 	if ok {
-		sshClient.Lock()
-		updater(sshClient)
-		sshClient.Unlock()
+		client.Lock()
+		updater(client)
+		client.Unlock()
 	}
 }
 
 func (sshServer *sshServer) unregisterClient(clientKey string) {
 	sshServer.clientsMutex.Lock()
-	if sshServer.stoppingClients {
-		return
-	}
 	client := sshServer.clients[clientKey]
 	delete(sshServer.clients, clientKey)
 	sshServer.clientsMutex.Unlock()
-
-	if client == nil {
-		return
+	if client != nil {
+		sshServer.stopClient(client)
 	}
+}
 
+func (sshServer *sshServer) stopClient(client *sshClient) {
 	client.sshConn.Close()
+	client.sshConn.Wait()
 	client.Lock()
 	log.WithContextFields(
 		LogFields{
@@ -237,12 +252,11 @@ func (sshServer *sshServer) unregisterClient(clientKey string) {
 func (sshServer *sshServer) stopClients() {
 	sshServer.clientsMutex.Lock()
 	sshServer.stoppingClients = true
+	sshServer.clients = make(map[string]*sshClient)
 	sshServer.clientsMutex.Unlock()
 	for _, client := range sshServer.clients {
-		client.sshConn.Close()
-		client.sshConn.Wait()
+		sshServer.stopClient(client)
 	}
-	sshServer.clients = make(map[string]*sshClient)
 }
 
 func (sshServer *sshServer) handleClient(tcpConn *net.TCPConn) {
@@ -359,9 +373,23 @@ func (sshServer *sshServer) passwordCallback(conn ssh.ConnMetadata, password []b
 	}
 
 	clientKey := string(conn.SessionID())
+	psiphonSessionID := sshPasswordPayload.SessionId
+
 	sshServer.updateClient(clientKey, func(client *sshClient) {
-		client.psiphonSessionID = sshPasswordPayload.SessionId
+		client.psiphonSessionID = psiphonSessionID
 	})
+
+	if sshServer.config.UseRedis() {
+		err = UpdateRedisForLegacyPsiWeb(
+			psiphonSessionID, sshServer.getClientGeoIPData(clientKey))
+		if err != nil {
+			log.WithContextFields(LogFields{
+				"psiphonSessionID": psiphonSessionID,
+				"error":            err}).Warning("UpdateRedisForLegacyPsiWeb failed")
+			// Allow the connection to proceed; legacy psi_web will not get accurate GeoIP values.
+		}
+	}
+
 	return nil, nil
 }