replay_test.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. /*
  2. * Copyright (c) 2020, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "context"
  22. "encoding/json"
  23. "fmt"
  24. "io/ioutil"
  25. "os"
  26. "path/filepath"
  27. "reflect"
  28. "sync"
  29. "testing"
  30. "time"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/tactics"
  34. )
  35. func TestServerFragmentorReplay(t *testing.T) {
  36. runServerReplayTests(t, false)
  37. }
  38. func runServerReplayTests(t *testing.T, runPacketManipulation bool) {
  39. // Do not use OSSH, which has a different fragmentor replay mechanism. Meek
  40. // has a unique code path for passing around replay parameters and metrics.
  41. testCases := protocol.TunnelProtocols{
  42. protocol.TUNNEL_PROTOCOL_SSH,
  43. protocol.TUNNEL_PROTOCOL_UNFRONTED_MEEK,
  44. }
  45. for _, tunnelProtocol := range testCases {
  46. t.Run(tunnelProtocol, func(t *testing.T) {
  47. runServerReplayTest(t, runPacketManipulation, tunnelProtocol)
  48. })
  49. }
  50. }
  51. func runServerReplayTest(
  52. t *testing.T,
  53. runPacketManipulation bool,
  54. tunnelProtocol string) {
  55. psiphon.SetEmitDiagnosticNotices(true, true)
  56. // Configure tactics
  57. tacticsConfigJSONFormat := `
  58. {
  59. "RequestPublicKey" : "%s",
  60. "RequestPrivateKey" : "%s",
  61. "RequestObfuscatedKey" : "%s",
  62. "DefaultTactics" : {
  63. "TTL" : "60s",
  64. "Probability" : 1.0,
  65. "Parameters" : {
  66. "LimitTunnelProtocols" : ["%s"],
  67. "FragmentorDownstreamLimitProtocols" : ["%s"],
  68. "FragmentorDownstreamProbability" : 1.0,
  69. "FragmentorDownstreamMinTotalBytes" : 10,
  70. "FragmentorDownstreamMaxTotalBytes" : 10,
  71. "FragmentorDownstreamMinWriteBytes" : 1,
  72. "FragmentorDownstreamMaxWriteBytes" : 1,
  73. "FragmentorDownstreamMinDelay" : "1ms",
  74. "FragmentorDownstreamMaxDelay" : "1ms",
  75. "ServerPacketManipulationSpecs" : [{"Name": "test-packetman-spec", "PacketSpecs": [[]]}],
  76. "ServerPacketManipulationProbability" : 1.0,
  77. "ServerProtocolPacketManipulations": {"%s" : ["test-packetman-spec"]},
  78. "ServerReplayPacketManipulation" : true,
  79. "ServerReplayFragmentor" : true,
  80. "ServerReplayUnknownGeoIP" : true,
  81. "ServerReplayTTL" : "5s",
  82. "ServerReplayTargetWaitDuration" : "200ms",
  83. "ServerReplayTargetTunnelDuration" : "50ms",
  84. "ServerReplayTargetUpstreamBytes" : 0,
  85. "ServerReplayTargetDownstreamBytes" : 0,
  86. "ServerReplayFailedCountThreshold" : 1,
  87. "ServerReplayFailedCountThreshold" : 1
  88. }
  89. }
  90. }
  91. `
  92. tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey, err :=
  93. tactics.GenerateKeys()
  94. if err != nil {
  95. t.Fatalf("error generating tactics keys: %s", err)
  96. }
  97. tacticsConfigJSON := fmt.Sprintf(
  98. tacticsConfigJSONFormat,
  99. tacticsRequestPublicKey, tacticsRequestPrivateKey, tacticsRequestObfuscatedKey,
  100. tunnelProtocol, tunnelProtocol, tunnelProtocol)
  101. tacticsConfigFilename := filepath.Join(testDataDirName, "tactics_config.json")
  102. err = ioutil.WriteFile(tacticsConfigFilename, []byte(tacticsConfigJSON), 0600)
  103. if err != nil {
  104. t.Fatalf("error paving tactics config file: %s", err)
  105. }
  106. // Run Psiphon server
  107. generateConfigParams := &GenerateConfigParams{
  108. ServerIPAddress: "127.0.0.1",
  109. TunnelProtocolPorts: map[string]int{tunnelProtocol: 4000},
  110. }
  111. serverConfigJSON, _, _, _, encodedServerEntry, err := GenerateConfig(generateConfigParams)
  112. if err != nil {
  113. t.Fatalf("error generating server config: %s", err)
  114. }
  115. var serverConfig map[string]interface{}
  116. json.Unmarshal(serverConfigJSON, &serverConfig)
  117. serverConfig["LogFilename"] = filepath.Join(testDataDirName, "psiphond.log")
  118. serverConfig["LogLevel"] = "debug"
  119. serverConfig["TacticsConfigFilename"] = tacticsConfigFilename
  120. // Ensure server_tunnels emit quickly.
  121. serverConfig["MeekMaxSessionStalenessMilliseconds"] = 500
  122. if runPacketManipulation {
  123. serverConfig["RunPacketManipulator"] = true
  124. }
  125. serverConfigJSON, _ = json.Marshal(serverConfig)
  126. serverTunnelLog := make(chan map[string]interface{}, 1)
  127. setLogCallback(func(log []byte) {
  128. logFields := make(map[string]interface{})
  129. err := json.Unmarshal(log, &logFields)
  130. if err != nil {
  131. return
  132. }
  133. if logFields["event_name"] == nil {
  134. return
  135. }
  136. if logFields["event_name"].(string) == "server_tunnel" {
  137. select {
  138. case serverTunnelLog <- logFields:
  139. default:
  140. }
  141. }
  142. })
  143. serverWaitGroup := new(sync.WaitGroup)
  144. serverWaitGroup.Add(1)
  145. go func() {
  146. defer serverWaitGroup.Done()
  147. err := RunServices(serverConfigJSON)
  148. if err != nil {
  149. t.Errorf("error running server: %s", err)
  150. }
  151. }()
  152. defer func() {
  153. p, _ := os.FindProcess(os.Getpid())
  154. p.Signal(os.Interrupt)
  155. serverWaitGroup.Wait()
  156. }()
  157. // TODO: monitor logs for more robust wait-until-loaded.
  158. time.Sleep(1 * time.Second)
  159. checkServerTunnelLog := func(expectReplay bool) {
  160. // Numbers are float64 due to JSON decoding.
  161. expectedServerTunnelFields := map[string]interface{}{
  162. "downstream_bytes_fragmented": float64(10),
  163. "downstream_min_bytes_written": float64(1),
  164. "downstream_max_bytes_written": float64(1),
  165. "downstream_min_delayed": float64(1000),
  166. "downstream_max_delayed": float64(1000),
  167. "server_replay_fragmentation": expectReplay,
  168. "server_replay_packet_manipulation": expectReplay && runPacketManipulation,
  169. }
  170. if runPacketManipulation {
  171. expectedServerTunnelFields["server_packet_manipulation"] = "test-packetman-spec"
  172. }
  173. logFields := <-serverTunnelLog
  174. for name, value := range expectedServerTunnelFields {
  175. logValue, ok := logFields[name]
  176. if !ok {
  177. t.Fatalf("Missing expected server_tunnel field: %s", name)
  178. }
  179. if !reflect.DeepEqual(logValue, value) {
  180. t.Fatalf(
  181. "Unexpected server_tunnel %s value: got %T(%v); expected %T(%v)",
  182. name, logValue, logValue, value, value)
  183. }
  184. }
  185. }
  186. t.Log("first client run; no replay")
  187. runServerReplayClient(t, encodedServerEntry, true)
  188. checkServerTunnelLog(false)
  189. t.Log("second client run; is replay")
  190. runServerReplayClient(t, encodedServerEntry, true)
  191. checkServerTunnelLog(true)
  192. t.Log("TTL expires; no replay")
  193. // Wait until TTL expires.
  194. time.Sleep(5100 * time.Millisecond)
  195. runServerReplayClient(t, encodedServerEntry, true)
  196. checkServerTunnelLog(false)
  197. t.Log("failure clears replay; no replay")
  198. runServerReplayClient(t, encodedServerEntry, true)
  199. checkServerTunnelLog(true)
  200. runServerReplayClient(t, encodedServerEntry, false)
  201. // No server_tunnel for SSH handshake failure.
  202. // Wait for session to be retired, which will trigger replay failure.
  203. if protocol.TunnelProtocolUsesMeek(tunnelProtocol) {
  204. time.Sleep(1000 * time.Millisecond)
  205. }
  206. runServerReplayClient(t, encodedServerEntry, true)
  207. checkServerTunnelLog(false)
  208. }
  209. func runServerReplayClient(
  210. t *testing.T,
  211. encodedServerEntry []byte,
  212. handshakeSuccess bool) {
  213. if !handshakeSuccess {
  214. serverEntry, err := protocol.DecodeServerEntry(string(encodedServerEntry), "", "")
  215. if err != nil {
  216. t.Fatalf("error decoding server entry: %s", err)
  217. }
  218. serverEntry.SshPassword = ""
  219. encodedServerEntryStr, err := protocol.EncodeServerEntry(serverEntry)
  220. if err != nil {
  221. t.Fatalf("error encoding server entry: %s", err)
  222. }
  223. encodedServerEntry = []byte(encodedServerEntryStr)
  224. }
  225. dataRootDir, err := ioutil.TempDir(testDataDirName, "serverReplayClient")
  226. if err != nil {
  227. t.Fatalf("error createing temp dir: %s", err)
  228. }
  229. defer os.RemoveAll(dataRootDir)
  230. clientConfigJSON := fmt.Sprintf(`
  231. {
  232. "DataRootDirectory" : "%s",
  233. "ClientPlatform" : "Windows",
  234. "ClientVersion" : "0",
  235. "SponsorId" : "0000000000000000",
  236. "PropagationChannelId" : "0000000000000000",
  237. "TargetServerEntry" : "%s"
  238. }`, dataRootDir, string(encodedServerEntry))
  239. clientConfig, err := psiphon.LoadConfig([]byte(clientConfigJSON))
  240. if err != nil {
  241. t.Fatalf("error processing configuration file: %s", err)
  242. }
  243. err = clientConfig.Commit(false)
  244. if err != nil {
  245. t.Fatalf("error committing configuration file: %s", err)
  246. }
  247. err = psiphon.OpenDataStore(clientConfig)
  248. if err != nil {
  249. t.Fatalf("error initializing client datastore: %s", err)
  250. }
  251. defer psiphon.CloseDataStore()
  252. controller, err := psiphon.NewController(clientConfig)
  253. if err != nil {
  254. t.Fatalf("error creating client controller: %s", err)
  255. }
  256. tunnelEstablished := make(chan struct{}, 1)
  257. psiphon.SetNoticeWriter(psiphon.NewNoticeReceiver(
  258. func(notice []byte) {
  259. noticeType, payload, err := psiphon.GetNotice(notice)
  260. if err != nil {
  261. return
  262. }
  263. if noticeType == "Tunnels" {
  264. count := int(payload["count"].(float64))
  265. if count >= 1 {
  266. tunnelEstablished <- struct{}{}
  267. }
  268. }
  269. }))
  270. ctx, cancelFunc := context.WithCancel(context.Background())
  271. controllerWaitGroup := new(sync.WaitGroup)
  272. controllerWaitGroup.Add(1)
  273. go func() {
  274. defer controllerWaitGroup.Done()
  275. controller.Run(ctx)
  276. }()
  277. if handshakeSuccess {
  278. <-tunnelEstablished
  279. }
  280. // Meet tunnel duration critera.
  281. for i := 0; i < 20; i++ {
  282. time.Sleep(10 * time.Millisecond)
  283. _, _ = controller.Dial("127.0.0.1:80", nil)
  284. }
  285. cancelFunc()
  286. controllerWaitGroup.Wait()
  287. }