meek_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. /*
  2. * Copyright (c) 2017, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "bytes"
  22. crypto_rand "crypto/rand"
  23. "encoding/base64"
  24. "fmt"
  25. "math/rand"
  26. "net"
  27. "sync"
  28. "syscall"
  29. "testing"
  30. "time"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  33. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/nacl/box"
  34. )
  35. var KB = 1024
  36. var MB = KB * KB
  37. func TestCachedResponse(t *testing.T) {
  38. rand.Seed(time.Now().Unix())
  39. testCases := []struct {
  40. concurrentResponses int
  41. responseSize int
  42. bufferSize int
  43. extendedBufferSize int
  44. extendedBufferCount int
  45. minBytesPerWrite int
  46. maxBytesPerWrite int
  47. copyPosition int
  48. expectedSuccess bool
  49. }{
  50. {1, 16, 16, 0, 0, 1, 1, 0, true},
  51. {1, 31, 16, 0, 0, 1, 1, 15, true},
  52. {1, 16, 2, 2, 7, 1, 1, 0, true},
  53. {1, 31, 15, 3, 5, 1, 1, 1, true},
  54. {1, 16, 16, 0, 0, 1, 1, 16, true},
  55. {1, 64*KB + 1, 64 * KB, 64 * KB, 1, 1, 1 * KB, 64 * KB, true},
  56. {1, 10 * MB, 64 * KB, 64 * KB, 158, 1, 32 * KB, 0, false},
  57. {1, 10 * MB, 64 * KB, 64 * KB, 159, 1, 32 * KB, 0, true},
  58. {1, 10 * MB, 64 * KB, 64 * KB, 160, 1, 32 * KB, 0, true},
  59. {1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 64 * KB, true},
  60. {1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 63 * KB, false},
  61. {1, 200 * KB, 64 * KB, 0, 0, 1, 1 * KB, 136 * KB, true},
  62. {10, 10 * MB, 64 * KB, 64 * KB, 1589, 1, 32 * KB, 0, false},
  63. {10, 10 * MB, 64 * KB, 64 * KB, 1590, 1, 32 * KB, 0, true},
  64. }
  65. for _, testCase := range testCases {
  66. description := fmt.Sprintf("test case: %+v", testCase)
  67. t.Run(description, func(t *testing.T) {
  68. pool := NewCachedResponseBufferPool(testCase.extendedBufferSize, testCase.extendedBufferCount)
  69. responses := make([]*CachedResponse, testCase.concurrentResponses)
  70. for i := 0; i < testCase.concurrentResponses; i++ {
  71. responses[i] = NewCachedResponse(testCase.bufferSize, pool)
  72. }
  73. // Repeats exercise CachedResponse.Reset() and CachedResponseBufferPool replacement
  74. for repeat := 0; repeat < 2; repeat++ {
  75. t.Logf("repeat %d", repeat)
  76. responseData := make([]byte, testCase.responseSize)
  77. _, _ = rand.Read(responseData)
  78. waitGroup := new(sync.WaitGroup)
  79. // Goroutines exercise concurrent access to CachedResponseBufferPool
  80. for _, response := range responses {
  81. waitGroup.Add(1)
  82. go func(response *CachedResponse) {
  83. defer waitGroup.Done()
  84. remainingSize := testCase.responseSize
  85. for remainingSize > 0 {
  86. writeSize := testCase.minBytesPerWrite
  87. writeSize += rand.Intn(testCase.maxBytesPerWrite - testCase.minBytesPerWrite + 1)
  88. if writeSize > remainingSize {
  89. writeSize = remainingSize
  90. }
  91. offset := len(responseData) - remainingSize
  92. response.Write(responseData[offset : offset+writeSize])
  93. remainingSize -= writeSize
  94. }
  95. }(response)
  96. }
  97. waitGroup.Wait()
  98. atLeastOneFailure := false
  99. for i, response := range responses {
  100. cachedResponseData := new(bytes.Buffer)
  101. n, err := response.CopyFromPosition(testCase.copyPosition, cachedResponseData)
  102. if testCase.expectedSuccess {
  103. if err != nil {
  104. t.Fatalf("CopyFromPosition unexpectedly failed for response %d: %s", i, err)
  105. }
  106. if n != cachedResponseData.Len() || n > response.Available() {
  107. t.Fatalf("cached response size mismatch for response %d", i)
  108. }
  109. if bytes.Compare(responseData[testCase.copyPosition:], cachedResponseData.Bytes()) != 0 {
  110. t.Fatalf("cached response data mismatch for response %d", i)
  111. }
  112. } else {
  113. atLeastOneFailure = true
  114. }
  115. }
  116. if !testCase.expectedSuccess && !atLeastOneFailure {
  117. t.Fatalf("CopyFromPosition unexpectedly succeeded for all responses")
  118. }
  119. for _, response := range responses {
  120. response.Reset()
  121. }
  122. }
  123. })
  124. }
  125. }
  126. func TestMeekResiliency(t *testing.T) {
  127. upstreamData := make([]byte, 5*MB)
  128. _, _ = rand.Read(upstreamData)
  129. downstreamData := make([]byte, 5*MB)
  130. _, _ = rand.Read(downstreamData)
  131. minWrite, maxWrite := 1, 128*KB
  132. minRead, maxRead := 1, 128*KB
  133. minWait, maxWait := 1*time.Millisecond, 500*time.Millisecond
  134. sendFunc := func(name string, conn net.Conn, data []byte) {
  135. for sent := 0; sent < len(data); {
  136. wait := minWait + time.Duration(rand.Int63n(int64(maxWait-minWait)+1))
  137. time.Sleep(wait)
  138. writeLen := minWrite + rand.Intn(maxWrite-minWrite+1)
  139. writeLen = min(writeLen, len(data)-sent)
  140. _, err := conn.Write(data[sent : sent+writeLen])
  141. if err != nil {
  142. t.Fatalf("conn.Write failed: %s", err)
  143. }
  144. sent += writeLen
  145. fmt.Printf("%s sent %d/%d...\n", name, sent, len(data))
  146. }
  147. fmt.Printf("%s send complete\n", name)
  148. }
  149. recvFunc := func(name string, conn net.Conn, expectedData []byte) {
  150. data := make([]byte, len(expectedData))
  151. for received := 0; received < len(data); {
  152. wait := minWait + time.Duration(rand.Int63n(int64(maxWait-minWait)+1))
  153. time.Sleep(wait)
  154. readLen := minRead + rand.Intn(maxRead-minRead+1)
  155. readLen = min(readLen, len(data)-received)
  156. n, err := conn.Read(data[received : received+readLen])
  157. if err != nil {
  158. t.Fatalf("conn.Read failed: %s", err)
  159. }
  160. received += n
  161. if bytes.Compare(data[0:received], expectedData[0:received]) != 0 {
  162. fmt.Printf("%s data check has failed...\n", name)
  163. additionalInfo := ""
  164. index := bytes.Index(expectedData, data[received-n:received])
  165. if index != -1 {
  166. // Helpful for debugging missing or repeated data...
  167. additionalInfo = fmt.Sprintf(
  168. " (last read of %d appears at %d)", n, index)
  169. }
  170. t.Fatalf("%s got unexpected data with %d/%d%s",
  171. name, received, len(expectedData), additionalInfo)
  172. }
  173. fmt.Printf("%s received %d/%d...\n", name, received, len(expectedData))
  174. }
  175. fmt.Printf("%s receive complete\n", name)
  176. }
  177. // Run meek server
  178. rawMeekCookieEncryptionPublicKey, rawMeekCookieEncryptionPrivateKey, err := box.GenerateKey(crypto_rand.Reader)
  179. if err != nil {
  180. t.Fatalf("box.GenerateKey failed: %s", err)
  181. }
  182. meekCookieEncryptionPublicKey := base64.StdEncoding.EncodeToString(rawMeekCookieEncryptionPublicKey[:])
  183. meekCookieEncryptionPrivateKey := base64.StdEncoding.EncodeToString(rawMeekCookieEncryptionPrivateKey[:])
  184. meekObfuscatedKey, err := common.MakeRandomStringHex(SSH_OBFUSCATED_KEY_BYTE_LENGTH)
  185. if err != nil {
  186. t.Fatalf("common.MakeRandomStringHex failed: %s", err)
  187. }
  188. mockSupport := &SupportServices{
  189. Config: &Config{
  190. MeekObfuscatedKey: meekObfuscatedKey,
  191. MeekCookieEncryptionPrivateKey: meekCookieEncryptionPrivateKey,
  192. },
  193. }
  194. listener, err := net.Listen("tcp", "127.0.0.1:0")
  195. if err != nil {
  196. t.Fatalf("net.Listen failed: %s", err)
  197. }
  198. defer listener.Close()
  199. serverAddress := listener.Addr().String()
  200. relayWaitGroup := new(sync.WaitGroup)
  201. clientHandler := func(_ string, conn net.Conn) {
  202. name := "server"
  203. relayWaitGroup.Add(1)
  204. go func() {
  205. defer relayWaitGroup.Done()
  206. sendFunc(name, conn, downstreamData)
  207. }()
  208. relayWaitGroup.Add(1)
  209. go func() {
  210. defer relayWaitGroup.Done()
  211. recvFunc(name, conn, upstreamData)
  212. }()
  213. }
  214. stopBroadcast := make(chan struct{})
  215. useTLS := false
  216. useObfuscatedSessionTickets := false
  217. server, err := NewMeekServer(
  218. mockSupport,
  219. listener,
  220. useTLS,
  221. useObfuscatedSessionTickets,
  222. clientHandler,
  223. stopBroadcast)
  224. if err != nil {
  225. t.Fatalf("NewMeekServer failed: %s", err)
  226. }
  227. serverWaitGroup := new(sync.WaitGroup)
  228. serverWaitGroup.Add(1)
  229. go func() {
  230. defer serverWaitGroup.Done()
  231. err := server.Run()
  232. if err != nil {
  233. t.Fatalf("MeekServer.Run failed: %s", err)
  234. }
  235. }()
  236. // Run meek client
  237. dialConfig := &psiphon.DialConfig{
  238. PendingConns: new(common.Conns),
  239. UseIndistinguishableTLS: true,
  240. DeviceBinder: new(fileDescriptorInterruptor),
  241. }
  242. meekConfig := &psiphon.MeekConfig{
  243. DialAddress: serverAddress,
  244. UseHTTPS: useTLS,
  245. UseObfuscatedSessionTickets: useObfuscatedSessionTickets,
  246. HostHeader: "example.com",
  247. MeekCookieEncryptionPublicKey: meekCookieEncryptionPublicKey,
  248. MeekObfuscatedKey: meekObfuscatedKey,
  249. }
  250. clientConn, err := psiphon.DialMeek(meekConfig, dialConfig)
  251. if err != nil {
  252. t.Fatalf("psiphon.DialMeek failed: %s", err)
  253. }
  254. // Relay data through meek while interrupting underlying TCP connections
  255. name := "client"
  256. relayWaitGroup.Add(1)
  257. go func() {
  258. defer relayWaitGroup.Done()
  259. sendFunc(name, clientConn, upstreamData)
  260. }()
  261. relayWaitGroup.Add(1)
  262. go func() {
  263. defer relayWaitGroup.Done()
  264. recvFunc(name, clientConn, downstreamData)
  265. }()
  266. relayWaitGroup.Wait()
  267. // Graceful shutdown
  268. clientConn.Close()
  269. listener.Close()
  270. close(stopBroadcast)
  271. // This wait will hang if shutdown is broken, and the test will ultimately panic
  272. serverWaitGroup.Wait()
  273. }
  274. type fileDescriptorInterruptor struct {
  275. }
  276. func (interruptor *fileDescriptorInterruptor) BindToDevice(fileDescriptor int) error {
  277. fdDup, err := syscall.Dup(fileDescriptor)
  278. if err != nil {
  279. return err
  280. }
  281. minAfter := 500 * time.Millisecond
  282. maxAfter := 1 * time.Second
  283. after := minAfter + time.Duration(rand.Int63n(int64(maxAfter-minAfter)+1))
  284. time.AfterFunc(after, func() {
  285. syscall.Shutdown(fdDup, syscall.SHUT_RDWR)
  286. syscall.Close(fdDup)
  287. fmt.Printf("interrupted TCP connection\n")
  288. })
  289. return nil
  290. }