/*
* Copyright (c) 2014, Psiphon Inc.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*
*/
// Package psiphon implements the core tunnel functionality of a Psiphon client.
// The main function is RunForever, which runs a Controller that obtains lists of
// servers, establishes tunnel connections, and runs local proxies through which
// tunneled traffic may be sent.
package psiphon
import (
"errors"
"fmt"
"io"
"net"
"sync"
"time"
)
// Controller is a tunnel lifecycle coordinator. It manages lists of servers to
// connect to; establishes and monitors tunnels; and runs local proxies which
// route traffic through the tunnels.
type Controller struct {
config *Config
failureSignal chan struct{}
shutdownBroadcast chan struct{}
runWaitGroup *sync.WaitGroup
establishedTunnels chan *Tunnel
failedTunnels chan *Tunnel
tunnelMutex sync.Mutex
tunnels []*Tunnel
nextTunnel int
operateWaitGroup *sync.WaitGroup
isEstablishing bool
establishWaitGroup *sync.WaitGroup
stopEstablishingBroadcast chan struct{}
candidateServerEntries chan *ServerEntry
pendingConns *Conns
}
// NewController initializes a new controller.
func NewController(config *Config) (controller *Controller) {
return &Controller{
config: config,
// failureSignal receives a signal from a component (including socks and
// http local proxies) if they unexpectedly fail. Senders should not block.
// A buffer allows at least one stop signal to be sent before there is a receiver.
failureSignal: make(chan struct{}, 1),
shutdownBroadcast: make(chan struct{}),
runWaitGroup: new(sync.WaitGroup),
// establishedTunnels and failedTunnels buffer sizes are large enough to
// receive full pools of tunnels without blocking. Senders should not block.
establishedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
failedTunnels: make(chan *Tunnel, config.TunnelPoolSize),
tunnels: make([]*Tunnel, 0),
operateWaitGroup: new(sync.WaitGroup),
isEstablishing: false,
pendingConns: new(Conns),
}
}
// Run executes the controller. It launches components and then monitors
// for a shutdown signal; after receiving the signal it shuts down the
// controller.
// The components include:
// - the periodic remote server list fetcher
// - the tunnel manager
// - a local SOCKS proxy that port forwards through the pool of tunnels
// - a local HTTP proxy that port forwards through the pool of tunnels
func (controller *Controller) Run(shutdownBroadcast <-chan struct{}) {
Notice(NOTICE_VERSION, VERSION)
Stats_Start()
defer Stats_Stop()
socksProxy, err := NewSocksProxy(controller.config, controller)
if err != nil {
Notice(NOTICE_ALERT, "error initializing local SOCKS proxy: %s", err)
return
}
defer socksProxy.Close()
httpProxy, err := NewHttpProxy(controller.config, controller)
if err != nil {
Notice(NOTICE_ALERT, "error initializing local SOCKS proxy: %s", err)
return
}
defer httpProxy.Close()
controller.runWaitGroup.Add(2)
go controller.remoteServerListFetcher()
go controller.runTunnels()
select {
case <-shutdownBroadcast:
Notice(NOTICE_INFO, "controller shutdown by request")
case <-controller.failureSignal:
Notice(NOTICE_ALERT, "controller shutdown due to failure")
}
// Note: in addition to establish(), this pendingConns will interrupt
// FetchRemoteServerList
controller.pendingConns.CloseAll()
close(controller.shutdownBroadcast)
controller.runWaitGroup.Wait()
Notice(NOTICE_INFO, "exiting controller")
}
// SignalFailure notifies the controller that an associated component has failed.
// This will terminate the controller.
func (controller *Controller) SignalFailure() {
select {
case controller.failureSignal <- *new(struct{}):
default:
}
}
// remoteServerListFetcher fetches an out-of-band list of server entries
// for more tunnel candidates. It fetches immediately, retries after failure
// with a wait period, and refetches after success with a longer wait period.
func (controller *Controller) remoteServerListFetcher() {
defer controller.runWaitGroup.Done()
// Note: unlike existing Psiphon clients, this code
// always makes the fetch remote server list request
loop:
for {
// TODO: FetchRemoteServerList should have its own pendingConns,
// otherwise it may needlessly abort when establish is stopped.
err := FetchRemoteServerList(controller.config, controller.pendingConns)
var duration time.Duration
if err != nil {
Notice(NOTICE_ALERT, "failed to fetch remote server list: %s", err)
duration = FETCH_REMOTE_SERVER_LIST_RETRY_TIMEOUT
} else {
duration = FETCH_REMOTE_SERVER_LIST_STALE_TIMEOUT
}
timeout := time.After(duration)
select {
case <-timeout:
// Fetch again
case <-controller.shutdownBroadcast:
break loop
}
}
Notice(NOTICE_INFO, "exiting remote server list fetcher")
}
// runTunnels is the controller tunnel management main loop. It starts and stops
// establishing tunnels based on the target tunnel pool size and the current size
// of the pool. Tunnels are established asynchronously using worker goroutines.
// When a tunnel is established, it's added to the active pool and a corresponding
// operateTunnel goroutine is launched which starts a session in the tunnel and
// monitors the tunnel for failures.
// When a tunnel fails, it's removed from the pool and the establish process is
// restarted to fill the pool.
func (controller *Controller) runTunnels() {
defer controller.runWaitGroup.Done()
// Don't start establishing until there are some server candidates. The
// typical case is a client with no server entries which will wait for
// the first successful FetchRemoteServerList to populate the data store.
for {
if HasServerEntries(
controller.config.EgressRegion, controller.config.TunnelProtocol) {
break
}
// TODO: replace polling with signal
timeout := time.After(5 * time.Second)
select {
case <-timeout:
case <-controller.shutdownBroadcast:
return
}
}
controller.startEstablishing()
loop:
for {
select {
case failedTunnel := <-controller.failedTunnels:
Notice(NOTICE_ALERT, "tunnel failed: %s", failedTunnel.serverEntry.IpAddress)
controller.terminateTunnel(failedTunnel)
// Note: only this goroutine may call startEstablishing/stopEstablishing and access
// isEstablishing.
if !controller.isEstablishing {
controller.startEstablishing()
}
// !TODO! design issue: might not be enough server entries with region/caps to ever fill tunnel slots
// solution(?) target MIN(CountServerEntries(region, protocol), TunnelPoolSize)
case establishedTunnel := <-controller.establishedTunnels:
Notice(NOTICE_INFO, "established tunnel: %s", establishedTunnel.serverEntry.IpAddress)
if controller.registerTunnel(establishedTunnel) {
Notice(NOTICE_INFO, "active tunnel: %s", establishedTunnel.serverEntry.IpAddress)
controller.operateWaitGroup.Add(1)
go controller.operateTunnel(establishedTunnel)
} else {
controller.discardTunnel(establishedTunnel)
}
if controller.isFullyEstablished() {
controller.stopEstablishing()
}
case <-controller.shutdownBroadcast:
break loop
}
}
controller.stopEstablishing()
controller.terminateAllTunnels()
controller.operateWaitGroup.Wait()
// Drain tunnel channels
close(controller.establishedTunnels)
for tunnel := range controller.establishedTunnels {
controller.discardTunnel(tunnel)
}
close(controller.failedTunnels)
for tunnel := range controller.failedTunnels {
controller.discardTunnel(tunnel)
}
Notice(NOTICE_INFO, "exiting run tunnels")
}
// discardTunnel disposes of a successful connection that is no longer required.
func (controller *Controller) discardTunnel(tunnel *Tunnel) {
Notice(NOTICE_INFO, "discard tunnel: %s", tunnel.serverEntry.IpAddress)
// TODO: not calling PromoteServerEntry, since that would rank the
// discarded tunnel before fully active tunnels. Can a discarded tunnel
// be promoted (since it connects), but with lower rank than all active
// tunnels?
tunnel.Close()
}
// registerTunnel adds the connected tunnel to the pool of active tunnels
// which are candidates for port forwarding. Returns true if the pool has an
// empty slot and false if the pool is full (caller should discard the tunnel).
func (controller *Controller) registerTunnel(tunnel *Tunnel) bool {
controller.tunnelMutex.Lock()
defer controller.tunnelMutex.Unlock()
if len(controller.tunnels) >= controller.config.TunnelPoolSize {
return false
}
// Perform a final check just in case we've established
// a duplicate connection.
for _, activeTunnel := range controller.tunnels {
if activeTunnel.serverEntry.IpAddress == tunnel.serverEntry.IpAddress {
Notice(NOTICE_ALERT, "duplicate tunnel: %s", tunnel.serverEntry.IpAddress)
return false
}
}
controller.tunnels = append(controller.tunnels, tunnel)
Notice(NOTICE_TUNNELS, "%d", len(controller.tunnels))
return true
}
// isFullyEstablished indicates if the pool of active tunnels is full.
func (controller *Controller) isFullyEstablished() bool {
controller.tunnelMutex.Lock()
defer controller.tunnelMutex.Unlock()
return len(controller.tunnels) >= controller.config.TunnelPoolSize
}
// terminateTunnel removes a tunnel from the pool of active tunnels
// and closes the tunnel. The next-tunnel state used by getNextActiveTunnel
// is adjusted as required.
func (controller *Controller) terminateTunnel(tunnel *Tunnel) {
controller.tunnelMutex.Lock()
defer controller.tunnelMutex.Unlock()
for index, activeTunnel := range controller.tunnels {
if tunnel == activeTunnel {
controller.tunnels = append(
controller.tunnels[:index], controller.tunnels[index+1:]...)
if controller.nextTunnel > index {
controller.nextTunnel--
}
if controller.nextTunnel >= len(controller.tunnels) {
controller.nextTunnel = 0
}
activeTunnel.Close()
Notice(NOTICE_TUNNELS, "%d", len(controller.tunnels))
break
}
}
}
// terminateAllTunnels empties the tunnel pool, closing all active tunnels.
// This is used when shutting down the controller.
func (controller *Controller) terminateAllTunnels() {
controller.tunnelMutex.Lock()
defer controller.tunnelMutex.Unlock()
for _, activeTunnel := range controller.tunnels {
activeTunnel.Close()
}
controller.tunnels = make([]*Tunnel, 0)
controller.nextTunnel = 0
Notice(NOTICE_TUNNELS, "%d", len(controller.tunnels))
}
// getNextActiveTunnel returns the next tunnel from the pool of active
// tunnels. Currently, tunnel selection order is simple round-robin.
func (controller *Controller) getNextActiveTunnel() (tunnel *Tunnel) {
controller.tunnelMutex.Lock()
defer controller.tunnelMutex.Unlock()
for i := len(controller.tunnels); i > 0; i-- {
tunnel = controller.tunnels[controller.nextTunnel]
controller.nextTunnel =
(controller.nextTunnel + 1) % len(controller.tunnels)
// A tunnel must[*] have started its session (performed the server
// API handshake sequence) before it may be used for tunneling traffic
// [*]currently not enforced by the server, but may be in the future.
if tunnel.IsSessionStarted() {
return tunnel
}
}
return nil
}
// isActiveTunnelServerEntries is used to check if there's already
// an existing tunnel to a candidate server.
func (controller *Controller) isActiveTunnelServerEntry(serverEntry *ServerEntry) bool {
controller.tunnelMutex.Lock()
defer controller.tunnelMutex.Unlock()
for _, activeTunnel := range controller.tunnels {
if activeTunnel.serverEntry.IpAddress == serverEntry.IpAddress {
return true
}
}
return false
}
// operateTunnel starts a Psiphon session (handshake, etc.) on a newly
// connected tunnel, and then monitors the tunnel for failures:
//
// 1. Overall tunnel failure: the tunnel sends a signal to the ClosedSignal
// channel on keep-alive failure and other transport I/O errors. In case
// of such a failure, the tunnel is marked as failed.
//
// 2. Tunnel port forward failures: the tunnel connection may stay up but
// the client may still fail to establish port forwards due to server load
// and other conditions. After a threshold number of such failures, the
// overall tunnel is marked as failed.
//
// TODO: currently, any connect (dial), read, or write error associated with
// a port forward is counted as a failure. It may be important to differentiate
// between failures due to Psiphon server conditions and failures due to the
// origin/target server (in the latter case, the tunnel is healthy). Here are
// some typical error messages to consider matching against (or ignoring):
//
// - "ssh: rejected: administratively prohibited (open failed)"
// - "ssh: rejected: connect failed (Connection timed out)"
// - "write tcp ... broken pipe"
// - "read tcp ... connection reset by peer"
// - "ssh: unexpected packet in response to channel open: "
//
func (controller *Controller) operateTunnel(tunnel *Tunnel) {
defer controller.operateWaitGroup.Done()
tunnelClosedSignal := make(chan struct{}, 1)
err := tunnel.conn.SetClosedSignal(tunnelClosedSignal)
if err != nil {
err = fmt.Errorf("failed to set closed signal: %s", err)
}
Notice(NOTICE_INFO, "starting session for %s", tunnel.serverEntry.IpAddress)
// TODO: NewSession server API calls may block shutdown
session, err := NewSession(controller.config, tunnel)
if err != nil {
err = fmt.Errorf("error starting session for %s: %s", tunnel.serverEntry.IpAddress, err)
}
// Tunnel may now be used for port forwarding
tunnel.SetSessionStarted()
// Promote this successful tunnel to first rank so it's one
// of the first candidates next time establish runs.
PromoteServerEntry(tunnel.serverEntry.IpAddress)
statsTimer := time.NewTimer(NextSendPeriod())
for err == nil {
select {
case failures := <-tunnel.portForwardFailures:
tunnel.portForwardFailureTotal += failures
Notice(
NOTICE_INFO, "port forward failures for %s: %d",
tunnel.serverEntry.IpAddress, tunnel.portForwardFailureTotal)
if tunnel.portForwardFailureTotal > controller.config.PortForwardFailureThreshold {
err = errors.New("tunnel exceeded port forward failure threshold")
}
case <-tunnelClosedSignal:
// TODO: this signal can be received during a commanded shutdown due to
// how tunnels are closed; should rework this to avoid log noise.
err = errors.New("tunnel closed unexpectedly")
case <-controller.shutdownBroadcast:
// Send final stats
sendStats(tunnel, session, true)
Notice(NOTICE_INFO, "shutdown operate tunnel")
return
case <-statsTimer.C:
sendStats(tunnel, session, false)
statsTimer.Reset(NextSendPeriod())
}
}
if err != nil {
Notice(NOTICE_ALERT, "operate tunnel error for %s: %s", tunnel.serverEntry.IpAddress, err)
// Don't block. Assumes the receiver has a buffer large enough for
// the typical number of operated tunnels. In case there's no room,
// terminate the tunnel (runTunnels won't get a signal in this case).
select {
case controller.failedTunnels <- tunnel:
default:
controller.terminateTunnel(tunnel)
}
}
}
// sendStats is a helper for sending session stats to the server.
func sendStats(tunnel *Tunnel, session *Session, final bool) {
payload := GetForServer(tunnel.serverEntry.IpAddress)
if payload != nil {
err := session.DoStatusRequest(payload, final)
if err != nil {
Notice(NOTICE_ALERT, "DoStatusRequest failed for %s: %s", tunnel.serverEntry.IpAddress, err)
PutBack(tunnel.serverEntry.IpAddress, payload)
}
}
}
// TunneledConn implements net.Conn and wraps a port foward connection.
// It is used to hook into Read and Write to observe I/O errors and
// report these errors back to the tunnel monitor as port forward failures.
type TunneledConn struct {
net.Conn
tunnel *Tunnel
}
func (conn *TunneledConn) Read(buffer []byte) (n int, err error) {
n, err = conn.Conn.Read(buffer)
if err != nil && err != io.EOF {
// Report 1 new failure. Won't block; assumes the receiver
// has a sufficient buffer for the threshold number of reports.
// TODO: conditional on type of error or error message?
select {
case conn.tunnel.portForwardFailures <- 1:
default:
}
}
return
}
func (conn *TunneledConn) Write(buffer []byte) (n int, err error) {
n, err = conn.Conn.Write(buffer)
if err != nil && err != io.EOF {
// Same as TunneledConn.Read()
select {
case conn.tunnel.portForwardFailures <- 1:
default:
}
}
return
}
// Dial selects an active tunnel and establishes a port forward
// connection through the selected tunnel. Failure to connect is considered
// a port foward failure, for the purpose of monitoring tunnel health.
func (controller *Controller) Dial(remoteAddr string) (conn net.Conn, err error) {
tunnel := controller.getNextActiveTunnel()
if tunnel == nil {
return nil, ContextError(errors.New("no active tunnels"))
}
tunnelConn, err := tunnel.Dial(remoteAddr)
if err != nil {
// TODO: conditional on type of error or error message?
select {
case tunnel.portForwardFailures <- 1:
default:
}
return nil, ContextError(err)
}
statsConn := NewStatsConn(tunnelConn, tunnel.ServerID(), tunnel.StatsRegexps())
conn = &TunneledConn{
Conn: statsConn,
tunnel: tunnel}
return
}
// startEstablishing creates a pool of worker goroutines which will
// attempt to establish tunnels to candidate servers. The candidates
// are generated by another goroutine.
func (controller *Controller) startEstablishing() {
if controller.isEstablishing {
return
}
Notice(NOTICE_INFO, "start establishing")
controller.isEstablishing = true
controller.establishWaitGroup = new(sync.WaitGroup)
controller.stopEstablishingBroadcast = make(chan struct{})
controller.candidateServerEntries = make(chan *ServerEntry)
for i := 0; i < controller.config.ConnectionWorkerPoolSize; i++ {
controller.establishWaitGroup.Add(1)
go controller.establishTunnelWorker()
}
controller.establishWaitGroup.Add(1)
go controller.establishCandidateGenerator()
}
// stopEstablishing signals the establish goroutines to stop and waits
// for the group to halt. pendingConns is used to interrupt any worker
// blocked on a socket connect.
func (controller *Controller) stopEstablishing() {
if !controller.isEstablishing {
return
}
Notice(NOTICE_INFO, "stop establishing")
// Note: on Windows, interruptibleTCPClose doesn't really interrupt socket connects
// and may leave goroutines running for a time after the Wait call.
controller.pendingConns.CloseAll()
close(controller.stopEstablishingBroadcast)
// Note: establishCandidateGenerator closes controller.candidateServerEntries
// (as it may be sending to that channel).
controller.establishWaitGroup.Wait()
controller.isEstablishing = false
controller.establishWaitGroup = nil
controller.stopEstablishingBroadcast = nil
controller.candidateServerEntries = nil
}
// establishCandidateGenerator populates the candidate queue with server entries
// from the data store. Server entries are iterated in rank order, so that promoted
// servers with higher rank are priority candidates.
func (controller *Controller) establishCandidateGenerator() {
defer controller.establishWaitGroup.Done()
iterator, err := NewServerEntryIterator(
controller.config.EgressRegion, controller.config.TunnelProtocol)
if err != nil {
Notice(NOTICE_ALERT, "failed to iterate over candidates: %s", err)
controller.SignalFailure()
return
}
defer iterator.Close()
loop:
for {
for {
serverEntry, err := iterator.Next()
if err != nil {
Notice(NOTICE_ALERT, "failed to get next candidate: %s", err)
controller.SignalFailure()
break loop
}
if serverEntry == nil {
// Completed this iteration
break
}
select {
case controller.candidateServerEntries <- serverEntry:
case <-controller.stopEstablishingBroadcast:
break loop
case <-controller.shutdownBroadcast:
break loop
}
}
iterator.Reset()
// After a complete iteration of candidate servers, pause before iterating again.
// This helps avoid some busy wait loop conditions, and also allows some time for
// network conditions to change.
timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD)
select {
case <-timeout:
// Retry iterating
case <-controller.stopEstablishingBroadcast:
break loop
case <-controller.shutdownBroadcast:
break loop
}
}
close(controller.candidateServerEntries)
Notice(NOTICE_INFO, "stopped candidate generator")
}
// establishTunnelWorker pulls candidates from the candidate queue, establishes
// a connection to the tunnel server, and delivers the established tunnel to a channel.
func (controller *Controller) establishTunnelWorker() {
defer controller.establishWaitGroup.Done()
for serverEntry := range controller.candidateServerEntries {
// Note: don't receive from candidateQueue and broadcastStopWorkers in the same
// select, since we want to prioritize receiving the stop signal
select {
case <-controller.stopEstablishingBroadcast:
return
default:
}
// There may already be a tunnel to this candidate. If so, skip it.
if controller.isActiveTunnelServerEntry(serverEntry) {
continue
}
tunnel, err := EstablishTunnel(
controller.config, controller.pendingConns, serverEntry)
if err != nil {
// TODO: distingush case where conn is interrupted?
Notice(NOTICE_INFO, "failed to connect to %s: %s", serverEntry.IpAddress, err)
} else {
// Don't block. Assumes the receiver has a buffer large enough for
// the number of desired tunnels. If there's no room, the tunnel must
// not be required so it's discarded.
select {
case controller.establishedTunnels <- tunnel:
default:
controller.discardTunnel(tunnel)
}
}
}
Notice(NOTICE_INFO, "stopped establish worker")
}