|
|
@@ -24,32 +24,15 @@ import (
|
|
|
"errors"
|
|
|
"net"
|
|
|
"sync"
|
|
|
- "sync/atomic"
|
|
|
- "time"
|
|
|
|
|
|
- "github.com/Psiphon-Inc/goarista/monotime"
|
|
|
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
|
|
|
)
|
|
|
|
|
|
-const (
|
|
|
- PACKET_TUNNEL_PROBE_SLOW_READ = 3 * time.Second
|
|
|
- PACKET_TUNNEL_PROBE_SLOW_WRITE = 3 * time.Second
|
|
|
-)
|
|
|
-
|
|
|
// PacketTunnelTransport is an integration layer that presents an io.ReadWriteCloser interface
|
|
|
// to a tun.Client as the transport for relaying packets. The Psiphon client may periodically
|
|
|
// disconnect from and reconnect to the same or different Psiphon servers. PacketTunnelTransport
|
|
|
// allows the Psiphon client to substitute new transport channels on-the-fly.
|
|
|
-// PacketTunnelTransport implements transport monitoring, using heuristics to determine when
|
|
|
-// the channel tunnel should be probed as a failure check.
|
|
|
type PacketTunnelTransport struct {
|
|
|
- // Note: 64-bit ints used with atomic operations are placed
|
|
|
- // at the start of struct to ensure 64-bit alignment.
|
|
|
- // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
|
|
|
- lastReadComplete int64
|
|
|
- lastWriteStart int64
|
|
|
- lastWriteComplete int64
|
|
|
-
|
|
|
runContext context.Context
|
|
|
stopRunning context.CancelFunc
|
|
|
workers *sync.WaitGroup
|
|
|
@@ -66,28 +49,12 @@ func NewPacketTunnelTransport() *PacketTunnelTransport {
|
|
|
|
|
|
runContext, stopRunning := context.WithCancel(context.Background())
|
|
|
|
|
|
- p := &PacketTunnelTransport{
|
|
|
+ return &PacketTunnelTransport{
|
|
|
runContext: runContext,
|
|
|
stopRunning: stopRunning,
|
|
|
workers: new(sync.WaitGroup),
|
|
|
channelReady: sync.NewCond(new(sync.Mutex)),
|
|
|
-
|
|
|
- // Initialize lastReadComplete to now to avoid a false positive
|
|
|
- // in monitoring: lastWriteComplete.Sub(lastReadComplete) will
|
|
|
- // easily exceed PACKET_TUNNEL_PROBE_SLOW_READ when a write
|
|
|
- // completes before any read. If lastReadComplete were to be
|
|
|
- // used by logic other than the monitoring heuristics, this
|
|
|
- // initial value may need to be revisited.
|
|
|
- lastReadComplete: int64(monotime.Now()),
|
|
|
}
|
|
|
-
|
|
|
- // The monitor worker will signal the tunnel channel when it
|
|
|
- // suspects that the packet tunnel channel has failed.
|
|
|
-
|
|
|
- p.workers.Add(1)
|
|
|
- go p.monitor()
|
|
|
-
|
|
|
- return p
|
|
|
}
|
|
|
|
|
|
// Read implements the io.Reader interface. It uses the current transport channel
|
|
|
@@ -107,8 +74,6 @@ func (p *PacketTunnelTransport) Read(data []byte) (int, error) {
|
|
|
|
|
|
n, err := channelConn.Read(data)
|
|
|
|
|
|
- atomic.StoreInt64(&p.lastReadComplete, int64(monotime.Now()))
|
|
|
-
|
|
|
if err != nil {
|
|
|
|
|
|
// This assumes that any error means the channel has failed, which
|
|
|
@@ -135,17 +100,8 @@ func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
|
|
|
return 0, common.ContextError(err)
|
|
|
}
|
|
|
|
|
|
- // ssh.Channels are pseudo net.Conns and don't support timeouts/deadlines.
|
|
|
- // Instead of spawning a goroutine per write, record time values that the
|
|
|
- // monitor worker will use to detect possible failures, such as writes taking
|
|
|
- // too long.
|
|
|
-
|
|
|
- atomic.StoreInt64(&p.lastWriteStart, int64(monotime.Now()))
|
|
|
-
|
|
|
n, err := channelConn.Write(data)
|
|
|
|
|
|
- atomic.StoreInt64(&p.lastWriteComplete, int64(monotime.Now()))
|
|
|
-
|
|
|
if err != nil {
|
|
|
|
|
|
// This assumes that any error means the channel has failed, which
|
|
|
@@ -158,8 +114,7 @@ func (p *PacketTunnelTransport) Write(data []byte) (int, error) {
|
|
|
}
|
|
|
|
|
|
// Close implements the io.Closer interface. Any underlying transport channel is
|
|
|
-// called, the monitor worker is stopped, and any blocking Read/Write calls will
|
|
|
-// be interrupted.
|
|
|
+// closed and any blocking Read/Write calls will be interrupted.
|
|
|
func (p *PacketTunnelTransport) Close() error {
|
|
|
|
|
|
p.stopRunning()
|
|
|
@@ -300,59 +255,3 @@ func (p *PacketTunnelTransport) failedChannel(
|
|
|
|
|
|
p.UseTunnel(channelTunnel)
|
|
|
}
|
|
|
-
|
|
|
-func (p *PacketTunnelTransport) monitor() {
|
|
|
-
|
|
|
- defer p.workers.Done()
|
|
|
-
|
|
|
- monitorTicker := time.NewTicker(1 * time.Second)
|
|
|
- defer monitorTicker.Stop()
|
|
|
-
|
|
|
- lastSignalTime := monotime.Time(0)
|
|
|
-
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-p.runContext.Done():
|
|
|
- return
|
|
|
- case <-monitorTicker.C:
|
|
|
- lastReadComplete := monotime.Time(atomic.LoadInt64(&p.lastReadComplete))
|
|
|
- lastWriteStart := monotime.Time(atomic.LoadInt64(&p.lastWriteStart))
|
|
|
- lastWriteComplete := monotime.Time(atomic.LoadInt64(&p.lastWriteComplete))
|
|
|
-
|
|
|
- // Heuristics to determine if the tunnel channel may have failed:
|
|
|
- // - a Write has blocked for too long
|
|
|
- // - no Reads after recent Writes
|
|
|
- //
|
|
|
- // When a heuristic is hit, a signal is sent to the channel tunnel
|
|
|
- // which will invoke and SSH keep alive probe of the tunnel. Nothing
|
|
|
- // is torn down here. If the tunnel determines it has failed, it will
|
|
|
- // close itself, which closes its channels, which will cause blocking
|
|
|
- // PacketTunnelTransport Reads/Writes to fail and call failedChannel.
|
|
|
-
|
|
|
- if (lastWriteStart != 0 &&
|
|
|
- lastWriteStart.Sub(lastWriteComplete) > PACKET_TUNNEL_PROBE_SLOW_WRITE) ||
|
|
|
- (lastWriteComplete.Sub(lastReadComplete) > PACKET_TUNNEL_PROBE_SLOW_READ) {
|
|
|
-
|
|
|
- // Don't keep signalling due to an old condition
|
|
|
- if lastWriteStart.Add(PACKET_TUNNEL_PROBE_SLOW_WRITE).Before(lastSignalTime) &&
|
|
|
- lastWriteComplete.Add(PACKET_TUNNEL_PROBE_SLOW_READ).Before(lastSignalTime) {
|
|
|
-
|
|
|
- break
|
|
|
- }
|
|
|
-
|
|
|
- p.channelMutex.Lock()
|
|
|
- channelTunnel := p.channelTunnel
|
|
|
- p.channelMutex.Unlock()
|
|
|
-
|
|
|
- if channelTunnel != nil {
|
|
|
- select {
|
|
|
- case channelTunnel.signalPortForwardFailure <- *new(struct{}):
|
|
|
- default:
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- lastSignalTime = monotime.Now()
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-}
|