replay_test.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. /*
  2. * Copyright (c) 2020, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "context"
  22. "encoding/json"
  23. "fmt"
  24. "io/ioutil"
  25. "os"
  26. "path/filepath"
  27. "reflect"
  28. "sync"
  29. "testing"
  30. "time"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  34. )
  35. func TestServerFragmentorReplay(t *testing.T) {
  36. runServerReplayTests(t, false)
  37. }
  38. func runServerReplayTests(t *testing.T, runPacketManipulation bool) {
  39. // Do not use OSSH, which has a different fragmentor replay mechanism. Meek
  40. // has a unique code path for passing around replay parameters and metrics.
  41. testCases := protocol.TunnelProtocols{
  42. protocol.TUNNEL_PROTOCOL_SSH,
  43. protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  44. }
  45. for _, tunnelProtocol := range testCases {
  46. t.Run(tunnelProtocol, func(t *testing.T) {
  47. runServerReplayTest(t, runPacketManipulation, tunnelProtocol)
  48. })
  49. }
  50. }
  51. func runServerReplayTest(
  52. t *testing.T,
  53. runPacketManipulation bool,
  54. tunnelProtocol string) {
  55. psiphon.SetEmitDiagnosticNotices(true, true)
  56. // Configure tactics
  57. tacticsConfigJSONFormat := `
  58. {
  59. "RequestPublicKey" : "%s",
  60. "RequestPrivateKey" : "%s",
  61. "RequestObfuscatedKey" : "%s",
  62. "DefaultTactics" : {
  63. "TTL" : "60s",
  64. "Probability" : 1.0,
  65. "Parameters" : {
  66. "LimitTunnelProtocols" : ["%s"],
  67. "FragmentorDownstreamLimitProtocols" : ["%s"],
  68. "FragmentorDownstreamProbability" : 1.0,
  69. "FragmentorDownstreamMinTotalBytes" : 10,
  70. "FragmentorDownstreamMaxTotalBytes" : 10,
  71. "FragmentorDownstreamMinWriteBytes" : 1,
  72. "FragmentorDownstreamMaxWriteBytes" : 1,
  73. "FragmentorDownstreamMinDelay" : "1ms",
  74. "FragmentorDownstreamMaxDelay" : "1ms",
  75. "ServerPacketManipulationSpecs" : [{"Name": "test-packetman-spec", "PacketSpecs": [[]]}],
  76. "ServerPacketManipulationProbability" : 1.0,
  77. "ServerProtocolPacketManipulations": {"%s" : ["test-packetman-spec"]},
  78. "ServerReplayPacketManipulation" : true,
  79. "ServerReplayFragmentor" : true,
  80. "ServerReplayUnknownGeoIP" : true,
  81. "ServerReplayTTL" : "5s",
  82. "ServerReplayTargetWaitDuration" : "200ms",
  83. "ServerReplayTargetTunnelDuration" : "50ms",
  84. "ServerReplayTargetUpstreamBytes" : 0,
  85. "ServerReplayTargetDownstreamBytes" : 0,
  86. "ServerReplayFailedCountThreshold" : 1,
  87. "ServerReplayFailedCountThreshold" : 1
  88. }
  89. }
  90. }
  91. `
  92. tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey, err :=
  93. tactics.GenerateKeys()
  94. if err != nil {
  95. t.Fatalf("error generating tactics keys: %s", err)
  96. }
  97. tacticsConfigJSON := fmt.Sprintf(
  98. tacticsConfigJSONFormat,
  99. tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey,
  100. tunnelProtocol, tunnelProtocol, tunnelProtocol)
  101. tacticsConfigFilename := filepath.Join(testDataDirName, "tactics_config.json")
  102. err = ioutil.WriteFile(tacticsConfigFilename, []byte(tacticsConfigJSON), 0600)
  103. if err != nil {
  104. t.Fatalf("error paving tactics config file: %s", err)
  105. }
  106. // Run Psiphon server
  107. generateConfigParams := &GenerateConfigParams{
  108. ServerIPAddress: "127.0.0.1",
  109. EnableSSHAPIRequests: true,
  110. WebServerPort: 8000,
  111. TunnelProtocolPorts: map[string]int{tunnelProtocol: 4000},
  112. }
  113. serverConfigJSON, _, _, _, encodedServerEntry, err := GenerateConfig(generateConfigParams)
  114. if err != nil {
  115. t.Fatalf("error generating server config: %s", err)
  116. }
  117. var serverConfig map[string]interface{}
  118. json.Unmarshal(serverConfigJSON, &serverConfig)
  119. serverConfig["LogFilename"] = filepath.Join(testDataDirName, "psiphond.log")
  120. serverConfig["LogLevel"] = "debug"
  121. serverConfig["TacticsConfigFilename"] = tacticsConfigFilename
  122. // Ensure server_tunnels emit quickly.
  123. serverConfig["MeekMaxSessionStalenessMilliseconds"] = 500
  124. if runPacketManipulation {
  125. serverConfig["RunPacketManipulator"] = true
  126. }
  127. serverConfigJSON, _ = json.Marshal(serverConfig)
  128. serverTunnelLog := make(chan map[string]interface{}, 1)
  129. setLogCallback(func(log []byte) {
  130. logFields := make(map[string]interface{})
  131. err := json.Unmarshal(log, &logFields)
  132. if err != nil {
  133. return
  134. }
  135. if logFields["event_name"] == nil {
  136. return
  137. }
  138. if logFields["event_name"].(string) == "server_tunnel" {
  139. select {
  140. case serverTunnelLog <- logFields:
  141. default:
  142. }
  143. }
  144. })
  145. serverWaitGroup := new(sync.WaitGroup)
  146. serverWaitGroup.Add(1)
  147. go func() {
  148. defer serverWaitGroup.Done()
  149. err := RunServices(serverConfigJSON)
  150. if err != nil {
  151. t.Errorf("error running server: %s", err)
  152. }
  153. }()
  154. defer func() {
  155. p, _ := os.FindProcess(os.Getpid())
  156. p.Signal(os.Interrupt)
  157. serverWaitGroup.Wait()
  158. }()
  159. // TODO: monitor logs for more robust wait-until-loaded.
  160. time.Sleep(1 * time.Second)
  161. checkServerTunnelLog := func(expectReplay bool) {
  162. // Numbers are float64 due to JSON decoding.
  163. expectedServerTunnelFields := map[string]interface{}{
  164. "downstream_bytes_fragmented": float64(10),
  165. "downstream_min_bytes_written": float64(1),
  166. "downstream_max_bytes_written": float64(1),
  167. "downstream_min_delayed": float64(1000),
  168. "downstream_max_delayed": float64(1000),
  169. "server_replay_fragmentation": expectReplay,
  170. "server_replay_packet_manipulation": expectReplay && runPacketManipulation,
  171. }
  172. if runPacketManipulation {
  173. expectedServerTunnelFields["server_packet_manipulation"] = "test-packetman-spec"
  174. }
  175. logFields := <-serverTunnelLog
  176. for name, value := range expectedServerTunnelFields {
  177. logValue, ok := logFields[name]
  178. if !ok {
  179. t.Fatalf("Missing expected server_tunnel field: %s", name)
  180. }
  181. if !reflect.DeepEqual(logValue, value) {
  182. t.Fatalf(
  183. "Unexpected server_tunnel %s value: got %T(%v); expected %T(%v)",
  184. name, logValue, logValue, value, value)
  185. }
  186. }
  187. }
  188. t.Log("first client run; no replay")
  189. runServerReplayClient(t, encodedServerEntry, true)
  190. checkServerTunnelLog(false)
  191. t.Log("second client run; is replay")
  192. runServerReplayClient(t, encodedServerEntry, true)
  193. checkServerTunnelLog(true)
  194. t.Log("TTL expires; no replay")
  195. // Wait until TTL expires.
  196. time.Sleep(5100 * time.Millisecond)
  197. runServerReplayClient(t, encodedServerEntry, true)
  198. checkServerTunnelLog(false)
  199. t.Log("failure clears replay; no replay")
  200. runServerReplayClient(t, encodedServerEntry, true)
  201. checkServerTunnelLog(true)
  202. runServerReplayClient(t, encodedServerEntry, false)
  203. // No server_tunnel for SSH handshake failure.
  204. // Wait for session to be retired, which will trigger replay failure.
  205. if protocol.TunnelProtocolUsesMeek(tunnelProtocol) {
  206. time.Sleep(1000 * time.Millisecond)
  207. }
  208. runServerReplayClient(t, encodedServerEntry, true)
  209. checkServerTunnelLog(false)
  210. }
  211. func runServerReplayClient(
  212. t *testing.T,
  213. encodedServerEntry []byte,
  214. handshakeSuccess bool) {
  215. if !handshakeSuccess {
  216. serverEntry, err := protocol.DecodeServerEntry(string(encodedServerEntry), "", "")
  217. if err != nil {
  218. t.Fatalf("error decoding server entry: %s", err)
  219. }
  220. serverEntry.SshPassword = ""
  221. encodedServerEntryStr, err := protocol.EncodeServerEntry(serverEntry)
  222. if err != nil {
  223. t.Fatalf("error encoding server entry: %s", err)
  224. }
  225. encodedServerEntry = []byte(encodedServerEntryStr)
  226. }
  227. dataRootDir, err := ioutil.TempDir(testDataDirName, "serverReplayClient")
  228. if err != nil {
  229. t.Fatalf("error createing temp dir: %s", err)
  230. }
  231. defer os.RemoveAll(dataRootDir)
  232. clientConfigJSON := fmt.Sprintf(`
  233. {
  234. "DataRootDirectory" : "%s",
  235. "ClientPlatform" : "Windows",
  236. "ClientVersion" : "0",
  237. "SponsorId" : "0",
  238. "PropagationChannelId" : "0",
  239. "TargetServerEntry" : "%s"
  240. }`, dataRootDir, string(encodedServerEntry))
  241. clientConfig, err := psiphon.LoadConfig([]byte(clientConfigJSON))
  242. if err != nil {
  243. t.Fatalf("error processing configuration file: %s", err)
  244. }
  245. err = clientConfig.Commit(false)
  246. if err != nil {
  247. t.Fatalf("error committing configuration file: %s", err)
  248. }
  249. err = psiphon.OpenDataStore(clientConfig)
  250. if err != nil {
  251. t.Fatalf("error initializing client datastore: %s", err)
  252. }
  253. defer psiphon.CloseDataStore()
  254. controller, err := psiphon.NewController(clientConfig)
  255. if err != nil {
  256. t.Fatalf("error creating client controller: %s", err)
  257. }
  258. tunnelEstablished := make(chan struct{}, 1)
  259. psiphon.SetNoticeWriter(psiphon.NewNoticeReceiver(
  260. func(notice []byte) {
  261. noticeType, payload, err := psiphon.GetNotice(notice)
  262. if err != nil {
  263. return
  264. }
  265. if noticeType == "Tunnels" {
  266. count := int(payload["count"].(float64))
  267. if count >= 1 {
  268. tunnelEstablished <- struct{}{}
  269. }
  270. }
  271. }))
  272. ctx, cancelFunc := context.WithCancel(context.Background())
  273. controllerWaitGroup := new(sync.WaitGroup)
  274. controllerWaitGroup.Add(1)
  275. go func() {
  276. defer controllerWaitGroup.Done()
  277. controller.Run(ctx)
  278. }()
  279. if handshakeSuccess {
  280. <-tunnelEstablished
  281. }
  282. // Meet tunnel duration critera.
  283. for i := 0; i < 20; i++ {
  284. time.Sleep(10 * time.Millisecond)
  285. _, _ = controller.Dial("127.0.0.1:80", nil)
  286. }
  287. cancelFunc()
  288. controllerWaitGroup.Wait()
  289. }