| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843 |
- /*
- * Copyright (c) 2015, Psiphon Inc.
- * All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- */
- package psiphon
- import (
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "os"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- socks "github.com/Psiphon-Inc/goptlib"
- )
- func TestMain(m *testing.M) {
- flag.Parse()
- os.Remove(DATA_STORE_FILENAME)
- initDisruptor()
- SetEmitDiagnosticNotices(true)
- os.Exit(m.Run())
- }
- // Test case notes/limitations/dependencies:
- //
- // * Untunneled upgrade tests must execute before
- // the other tests to ensure no tunnel is established.
- // We need a way to reset the datastore after it's been
- // initialized in order to clear out its data entries
- // and be able to arbitrarily order the tests.
- //
- // * The resumable download tests using disruptNetwork
- // depend on the download object being larger than the
- // disruptorMax limits so that the disruptor will actually
- // interrupt the first download attempt. Specifically, the
- // upgrade and remote server list at the URLs specified in
- // controller_test.config.enc.
- //
- // * The protocol tests assume there is at least one server
- // supporting each protocol in the server list at the URL
- // specified in controller_test.config.enc, and that these
- // servers are not overloaded.
- //
- // * fetchAndVerifyWebsite depends on the target URL being
- // available and responding.
- //
- func TestUntunneledUpgradeDownload(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: true,
- protocol: "",
- clientIsLatestVersion: false,
- disableUntunneledUpgrade: false,
- disableEstablishing: true,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestUntunneledResumableUpgradeDownload(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: true,
- protocol: "",
- clientIsLatestVersion: false,
- disableUntunneledUpgrade: false,
- disableEstablishing: true,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: true,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestUntunneledUpgradeClientIsLatestVersion(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: true,
- protocol: "",
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: false,
- disableEstablishing: true,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestUntunneledResumableFetchRemoveServerList(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: true,
- protocol: "",
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: false,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: true,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestTunneledUpgradeClientIsLatestVersion(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: "",
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestImpairedProtocols(t *testing.T) {
- // This test sets a tunnelPoolSize of 40 and runs
- // the session for 1 minute with network disruption
- // on. All 40 tunnels being disrupted every 10
- // seconds (followed by ssh keep alive probe timeout)
- // should be sufficient to trigger at least one
- // impaired protocol classification.
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: "",
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 40,
- disruptNetwork: true,
- useHostNameTransformer: false,
- runDuration: 1 * time.Minute,
- })
- }
- func TestSSH(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_SSH,
- clientIsLatestVersion: false,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestObfuscatedSSH(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_OBFUSCATED_SSH,
- clientIsLatestVersion: false,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestUnfrontedMeek(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
- clientIsLatestVersion: false,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestUnfrontedMeekWithTransformer(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK,
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: true,
- runDuration: 0,
- })
- }
- func TestFrontedMeek(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
- clientIsLatestVersion: false,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestFrontedMeekWithTransformer(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_FRONTED_MEEK,
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: true,
- runDuration: 0,
- })
- }
- func TestFrontedMeekHTTP(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_FRONTED_MEEK_HTTP,
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestUnfrontedMeekHTTPS(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
- clientIsLatestVersion: false,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
- func TestUnfrontedMeekHTTPSWithTransformer(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: TUNNEL_PROTOCOL_UNFRONTED_MEEK_HTTPS,
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: false,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: true,
- runDuration: 0,
- })
- }
- func TestDisabledApi(t *testing.T) {
- controllerRun(t,
- &controllerRunConfig{
- expectNoServerEntries: false,
- protocol: "",
- clientIsLatestVersion: true,
- disableUntunneledUpgrade: true,
- disableEstablishing: false,
- disableApi: true,
- tunnelPoolSize: 1,
- disruptNetwork: false,
- useHostNameTransformer: false,
- runDuration: 0,
- })
- }
// controllerRunConfig selects the scenario that controllerRun sets up and the
// expectations it checks.
type controllerRunConfig struct {
	// expectNoServerEntries fails the run when the datastore already
	// contains server entries at start.
	expectNoServerEntries bool

	// protocol is copied to the config's TunnelProtocol. When non-empty,
	// every "ConnectingServer" notice must report this protocol.
	protocol string

	// clientIsLatestVersion overrides the client version with a very large
	// value; an upgrade download is then treated as a failure.
	clientIsLatestVersion bool

	// disableUntunneledUpgrade clears UpgradeDownloadClientVersionHeader and
	// skips the upgrade check at the end of the run.
	disableUntunneledUpgrade bool

	// disableEstablishing clears the remote server list URL; an established
	// tunnel is then treated as a failure.
	disableEstablishing bool

	// tunnelPoolSize is copied to the config's TunnelPoolSize.
	tunnelPoolSize int

	// disruptNetwork routes traffic through the disruptor proxy by setting
	// it as the upstream proxy.
	disruptNetwork bool

	// useHostNameTransformer installs a TestHostNameTransformer.
	useHostNameTransformer bool

	// runDuration is how long the established tunnel is exercised.
	runDuration time.Duration

	// disableApi sets DisableApi in the config and skips the upgrade check
	// at the end of the run.
	disableApi bool
}
- func controllerRun(t *testing.T, runConfig *controllerRunConfig) {
- configFileContents, err := ioutil.ReadFile("controller_test.config")
- if err != nil {
- // Skip, don't fail, if config file is not present
- t.Skipf("error loading configuration file: %s", err)
- }
- config, err := LoadConfig(configFileContents)
- if err != nil {
- t.Fatalf("error processing configuration file: %s", err)
- }
- if runConfig.clientIsLatestVersion {
- config.ClientVersion = "999999999"
- }
- if runConfig.disableEstablishing {
- // Clear remote server list so tunnel cannot be established.
- // TODO: also delete all server entries in the datastore.
- config.RemoteServerListUrl = ""
- }
- if runConfig.disableApi {
- config.DisableApi = true
- }
- config.TunnelPoolSize = runConfig.tunnelPoolSize
- if runConfig.disableUntunneledUpgrade {
- // Disable untunneled upgrade downloader to ensure tunneled case is tested
- config.UpgradeDownloadClientVersionHeader = ""
- }
- if runConfig.disruptNetwork {
- config.UpstreamProxyUrl = disruptorProxyURL
- }
- if runConfig.useHostNameTransformer {
- config.HostNameTransformer = &TestHostNameTransformer{}
- }
- // Override client retry throttle values to speed up automated
- // tests and ensure tests complete within fixed deadlines.
- fetchRemoteServerListRetryPeriodSeconds := 0
- config.FetchRemoteServerListRetryPeriodSeconds = &fetchRemoteServerListRetryPeriodSeconds
- downloadUpgradeRetryPeriodSeconds := 0
- config.DownloadUpgradeRetryPeriodSeconds = &downloadUpgradeRetryPeriodSeconds
- establishTunnelPausePeriodSeconds := 1
- config.EstablishTunnelPausePeriodSeconds = &establishTunnelPausePeriodSeconds
- os.Remove(config.UpgradeDownloadFilename)
- config.TunnelProtocol = runConfig.protocol
- err = InitDataStore(config)
- if err != nil {
- t.Fatalf("error initializing datastore: %s", err)
- }
- serverEntryCount := CountServerEntries("", "")
- if runConfig.expectNoServerEntries && serverEntryCount > 0 {
- // TODO: replace expectNoServerEntries with resetServerEntries
- // so tests can run in arbitrary order
- t.Fatalf("unexpected server entries")
- }
- controller, err := NewController(config)
- if err != nil {
- t.Fatalf("error creating controller: %s", err)
- }
- // Monitor notices for "Tunnels" with count > 1, the
- // indication of tunnel establishment success.
- // Also record the selected HTTP proxy port to use
- // when fetching websites through the tunnel.
- httpProxyPort := 0
- tunnelEstablished := make(chan struct{}, 1)
- upgradeDownloaded := make(chan struct{}, 1)
- remoteServerListDownloaded := make(chan struct{}, 1)
- confirmedLatestVersion := make(chan struct{}, 1)
- var clientUpgradeDownloadedBytesCount int32
- var remoteServerListDownloadedBytesCount int32
- var impairedProtocolCount int32
- var impairedProtocolClassification = struct {
- sync.RWMutex
- classification map[string]int
- }{classification: make(map[string]int)}
- SetNoticeOutput(NewNoticeReceiver(
- func(notice []byte) {
- // TODO: log notices without logging server IPs:
- // fmt.Fprintf(os.Stderr, "%s\n", string(notice))
- noticeType, payload, err := GetNotice(notice)
- if err != nil {
- return
- }
- switch noticeType {
- case "ListeningHttpProxyPort":
- httpProxyPort = int(payload["port"].(float64))
- case "ConnectingServer":
- serverProtocol := payload["protocol"].(string)
- if runConfig.protocol != "" && serverProtocol != runConfig.protocol {
- // TODO: wrong goroutine for t.FatalNow()
- t.Fatalf("wrong protocol selected: %s", serverProtocol)
- }
- case "Tunnels":
- count := int(payload["count"].(float64))
- if count > 0 {
- if runConfig.disableEstablishing {
- // TODO: wrong goroutine for t.FatalNow()
- t.Fatalf("tunnel established unexpectedly")
- } else {
- select {
- case tunnelEstablished <- *new(struct{}):
- default:
- }
- }
- }
- case "ClientUpgradeDownloadedBytes":
- atomic.AddInt32(&clientUpgradeDownloadedBytesCount, 1)
- t.Logf("ClientUpgradeDownloadedBytes: %d", int(payload["bytes"].(float64)))
- case "ClientUpgradeDownloaded":
- select {
- case upgradeDownloaded <- *new(struct{}):
- default:
- }
- case "ClientIsLatestVersion":
- select {
- case confirmedLatestVersion <- *new(struct{}):
- default:
- }
- case "RemoteServerListDownloadedBytes":
- atomic.AddInt32(&remoteServerListDownloadedBytesCount, 1)
- t.Logf("RemoteServerListDownloadedBytes: %d", int(payload["bytes"].(float64)))
- case "RemoteServerListDownloaded":
- select {
- case remoteServerListDownloaded <- *new(struct{}):
- default:
- }
- case "ImpairedProtocolClassification":
- classification := payload["classification"].(map[string]interface{})
- impairedProtocolClassification.Lock()
- impairedProtocolClassification.classification = make(map[string]int)
- for k, v := range classification {
- count := int(v.(float64))
- if count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
- atomic.AddInt32(&impairedProtocolCount, 1)
- }
- impairedProtocolClassification.classification[k] = count
- }
- impairedProtocolClassification.Unlock()
- case "ActiveTunnel":
- serverProtocol := payload["protocol"].(string)
- classification := make(map[string]int)
- impairedProtocolClassification.RLock()
- for k, v := range impairedProtocolClassification.classification {
- classification[k] = v
- }
- impairedProtocolClassification.RUnlock()
- count, ok := classification[serverProtocol]
- if ok && count >= IMPAIRED_PROTOCOL_CLASSIFICATION_THRESHOLD {
- // TODO: wrong goroutine for t.FatalNow()
- t.Fatalf("unexpected tunnel using impaired protocol: %s, %+v",
- serverProtocol, classification)
- }
- }
- }))
- // Run controller, which establishes tunnels
- shutdownBroadcast := make(chan struct{})
- controllerWaitGroup := new(sync.WaitGroup)
- controllerWaitGroup.Add(1)
- go func() {
- defer controllerWaitGroup.Done()
- controller.Run(shutdownBroadcast)
- }()
- defer func() {
- // Test: shutdown must complete within 20 seconds
- close(shutdownBroadcast)
- shutdownTimeout := time.NewTimer(20 * time.Second)
- shutdownOk := make(chan struct{}, 1)
- go func() {
- controllerWaitGroup.Wait()
- shutdownOk <- *new(struct{})
- }()
- select {
- case <-shutdownOk:
- case <-shutdownTimeout.C:
- t.Fatalf("controller shutdown timeout exceeded")
- }
- }()
- if !runConfig.disableEstablishing {
- // Test: tunnel must be established within 120 seconds
- establishTimeout := time.NewTimer(120 * time.Second)
- select {
- case <-tunnelEstablished:
- case <-establishTimeout.C:
- t.Fatalf("tunnel establish timeout exceeded")
- }
- // Test: if starting with no server entries, a fetch remote
- // server list must have succeeded. With disruptNetwork, the
- // fetch must have been resumed at least once.
- if serverEntryCount == 0 {
- select {
- case <-remoteServerListDownloaded:
- default:
- t.Fatalf("expected remote server list downloaded")
- }
- if runConfig.disruptNetwork {
- count := atomic.LoadInt32(&remoteServerListDownloadedBytesCount)
- if count <= 1 {
- t.Fatalf("unexpected remote server list download progress: %d", count)
- }
- }
- }
- // Test: fetch website through tunnel
- // Allow for known race condition described in NewHttpProxy():
- time.Sleep(1 * time.Second)
- fetchAndVerifyWebsite(t, httpProxyPort)
- // Test: run for duration, periodically using the tunnel to
- // ensure failed tunnel detection, and ultimately hitting
- // impaired protocol checks.
- startTime := time.Now()
- for {
- time.Sleep(1 * time.Second)
- useTunnel(t, httpProxyPort)
- if startTime.Add(runConfig.runDuration).Before(time.Now()) {
- break
- }
- }
- // Test: with disruptNetwork, impaired protocols should be exercised
- if runConfig.runDuration > 0 && runConfig.disruptNetwork {
- count := atomic.LoadInt32(&impairedProtocolCount)
- if count <= 0 {
- t.Fatalf("unexpected impaired protocol count: %d", count)
- } else {
- impairedProtocolClassification.RLock()
- t.Logf("impaired protocol classification: %+v",
- impairedProtocolClassification.classification)
- impairedProtocolClassification.RUnlock()
- }
- }
- }
- // Test: upgrade check/download must be downloaded within 180 seconds
- expectUpgrade := !runConfig.disableApi && !runConfig.disableUntunneledUpgrade
- if expectUpgrade {
- upgradeTimeout := time.NewTimer(180 * time.Second)
- select {
- case <-upgradeDownloaded:
- // TODO: verify downloaded file
- if runConfig.clientIsLatestVersion {
- t.Fatalf("upgrade downloaded unexpectedly")
- }
- // Test: with disruptNetwork, must be multiple download progress notices
- if runConfig.disruptNetwork {
- count := atomic.LoadInt32(&clientUpgradeDownloadedBytesCount)
- if count <= 1 {
- t.Fatalf("unexpected upgrade download progress: %d", count)
- }
- }
- case <-confirmedLatestVersion:
- if !runConfig.clientIsLatestVersion {
- t.Fatalf("confirmed latest version unexpectedly")
- }
- case <-upgradeTimeout.C:
- t.Fatalf("upgrade download timeout exceeded")
- }
- }
- }
// TestHostNameTransformer is a stateless host name transformer used by the
// "WithTransformer" tests.
type TestHostNameTransformer struct{}

// TransformHostName ignores its argument and always returns "example.com"
// together with true.
func (TestHostNameTransformer) TransformHostName(string) (string, bool) {
	const replacement = "example.com"
	return replacement, true
}
// fetchAndVerifyWebsite downloads a fixed test document three ways through
// the local proxy listening on httpProxyPort and fails the test when any
// request errors or returns unexpected content:
//
//  1. using the listener as a regular HTTP proxy,
//  2. via the listener's /direct/<escaped URL> endpoint,
//  3. via the listener's /tunneled/<escaped URL> endpoint.
//
// A response is accepted when it starts with the expected prefix and has
// exactly the expected length. Every response body is closed, including on
// the failure paths.
func fetchAndVerifyWebsite(t *testing.T, httpProxyPort int) {

	testUrl := "https://raw.githubusercontent.com/Psiphon-Labs/psiphon-tunnel-core/master/LICENSE"
	roundTripTimeout := 10 * time.Second
	// NOTE(review): confirm the leading whitespace of this prefix against
	// the actual document served at testUrl.
	expectedResponsePrefix := " GNU GENERAL PUBLIC LICENSE"
	expectedResponseSize := 35148

	checkResponse := func(responseBody string) bool {
		return strings.HasPrefix(responseBody, expectedResponsePrefix) && len(responseBody) == expectedResponseSize
	}

	// fetch performs one GET and validates the body. description names the
	// access method in failure messages. The deferred Close also runs when
	// t.Fatalf aborts the test.
	fetch := func(httpClient *http.Client, requestUrl, description string) {
		response, err := httpClient.Get(requestUrl)
		if err != nil {
			t.Fatalf("error sending %s request: %s", description, err)
		}
		defer response.Body.Close()

		body, err := ioutil.ReadAll(response.Body)
		if err != nil {
			t.Fatalf("error reading %s response: %s", description, err)
		}
		if !checkResponse(string(body)) {
			t.Fatalf("unexpected %s response", description)
		}
	}

	// Test: use HTTP proxy

	proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
	if err != nil {
		t.Fatalf("error initializing proxied HTTP request: %s", err)
	}

	// This transport is private to this call; release its idle connections
	// on return so they do not outlive the test.
	proxiedTransport := &http.Transport{
		Proxy: http.ProxyURL(proxyUrl),
	}
	defer proxiedTransport.CloseIdleConnections()

	proxiedClient := &http.Client{
		Transport: proxiedTransport,
		Timeout:   roundTripTimeout,
	}

	fetch(proxiedClient, testUrl, "proxied HTTP")

	// The remaining requests address the local listener itself, so they use
	// the default transport with no proxy configured here.
	localClient := &http.Client{
		Transport: http.DefaultTransport,
		Timeout:   roundTripTimeout,
	}

	// Test: use direct URL proxy

	fetch(localClient,
		fmt.Sprintf("http://127.0.0.1:%d/direct/%s",
			httpProxyPort, url.QueryEscape(testUrl)),
		"direct URL")

	// Test: use tunneled URL proxy

	fetch(localClient,
		fmt.Sprintf("http://127.0.0.1:%d/tunneled/%s",
			httpProxyPort, url.QueryEscape(testUrl)),
		"tunneled URL")
}
// useTunnel issues one short-lived request through the local HTTP proxy on
// httpProxyPort to generate tunnel traffic. The outcome is not checked and
// the test is never failed from here.
func useTunnel(t *testing.T, httpProxyPort int) {

	// No action on errors as the tunnel is expected to fail sometimes

	testUrl := "https://psiphon3.com"
	roundTripTimeout := 1 * time.Second

	proxyUrl, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", httpProxyPort))
	if err != nil {
		return
	}

	// A fresh transport is built on every call. Release its idle connections
	// on return so repeated calls do not accumulate them.
	transport := &http.Transport{
		Proxy: http.ProxyURL(proxyUrl),
	}
	defer transport.CloseIdleConnections()

	httpClient := &http.Client{
		Transport: transport,
		Timeout:   roundTripTimeout,
	}

	response, err := httpClient.Get(testUrl)
	if err != nil {
		return
	}
	response.Body.Close()
}
// Settings for the network disruptor proxy started by initDisruptor.
const (
	// disruptorProxyAddress is the local address the disruptor listens on.
	disruptorProxyAddress = "127.0.0.1:2160"

	// disruptorProxyURL is the upstream proxy URL pointing at the disruptor.
	disruptorProxyURL = "socks4a://" + disruptorProxyAddress

	// disruptorMaxConnectionBytes caps the bytes relayed in each direction
	// of a connection.
	disruptorMaxConnectionBytes = 625000

	// disruptorMaxConnectionTime is how long a connection may live before
	// the disruptor closes it.
	disruptorMaxConnectionTime = 10 * time.Second
)
- func initDisruptor() {
- go func() {
- listener, err := socks.ListenSocks("tcp", disruptorProxyAddress)
- if err != nil {
- fmt.Errorf("disruptor proxy listen error: %s", err)
- return
- }
- for {
- localConn, err := listener.AcceptSocks()
- if err != nil {
- fmt.Errorf("disruptor proxy accept error: %s", err)
- return
- }
- go func() {
- defer localConn.Close()
- remoteConn, err := net.Dial("tcp", localConn.Req.Target)
- if err != nil {
- fmt.Errorf("disruptor proxy dial error: %s", err)
- return
- }
- defer remoteConn.Close()
- err = localConn.Grant(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})
- if err != nil {
- fmt.Errorf("disruptor proxy grant error: %s", err)
- return
- }
- // Cut connection after disruptorMaxConnectionTime
- time.AfterFunc(disruptorMaxConnectionTime, func() {
- localConn.Close()
- remoteConn.Close()
- })
- // Relay connection, but only up to disruptorMaxConnectionBytes
- waitGroup := new(sync.WaitGroup)
- waitGroup.Add(1)
- go func() {
- defer waitGroup.Done()
- io.CopyN(localConn, remoteConn, disruptorMaxConnectionBytes)
- }()
- io.CopyN(remoteConn, localConn, disruptorMaxConnectionBytes)
- waitGroup.Wait()
- }()
- }
- }()
- }
|