fragmentor.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. /*
  2. * Copyright (c) 2018, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package fragmentor
  20. import (
  21. "bytes"
  22. "context"
  23. "fmt"
  24. "net"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
  29. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
  30. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
  31. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
  32. "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
  33. )
  34. const (
  35. MAX_FRAGMENTOR_NOTICES = 3
  36. MAX_FRAGMENTOR_ITERATIONS_PER_NOTICE = 5
  37. )
  38. // Config specifies a fragmentor configuration. NewUpstreamConfig and
  39. // NewDownstreamConfig will generate configurations based on the given
  40. // client parameters.
  41. type Config struct {
  42. isUpstream bool
  43. probability float64
  44. minTotalBytes int
  45. maxTotalBytes int
  46. minWriteBytes int
  47. maxWriteBytes int
  48. minDelay time.Duration
  49. maxDelay time.Duration
  50. fragmentPRNG *prng.PRNG
  51. }
  52. // NewUpstreamConfig creates a new Config; may return nil. Specifying the PRNG
  53. // seed allows for optional replay of a fragmentor sequence.
  54. func NewUpstreamConfig(
  55. p parameters.ClientParametersAccessor, tunnelProtocol string, seed *prng.Seed) *Config {
  56. return newConfig(p, true, tunnelProtocol, seed)
  57. }
  58. // NewDownstreamConfig creates a new Config; may return nil. Specifying the
  59. // PRNG seed allows for optional replay of a fragmentor sequence.
  60. func NewDownstreamConfig(
  61. p parameters.ClientParametersAccessor, tunnelProtocol string, seed *prng.Seed) *Config {
  62. return newConfig(p, false, tunnelProtocol, seed)
  63. }
  64. func newConfig(
  65. p parameters.ClientParametersAccessor,
  66. isUpstream bool,
  67. tunnelProtocol string,
  68. seed *prng.Seed) *Config {
  69. if !protocol.TunnelProtocolIsCompatibleWithFragmentor(tunnelProtocol) {
  70. return nil
  71. }
  72. probability := parameters.FragmentorProbability
  73. limitProtocols := parameters.FragmentorLimitProtocols
  74. minTotalBytes := parameters.FragmentorMinTotalBytes
  75. maxTotalBytes := parameters.FragmentorMaxTotalBytes
  76. minWriteBytes := parameters.FragmentorMinWriteBytes
  77. maxWriteBytes := parameters.FragmentorMaxWriteBytes
  78. minDelay := parameters.FragmentorMinDelay
  79. maxDelay := parameters.FragmentorMaxDelay
  80. if !isUpstream {
  81. probability = parameters.FragmentorDownstreamProbability
  82. limitProtocols = parameters.FragmentorDownstreamLimitProtocols
  83. minTotalBytes = parameters.FragmentorDownstreamMinTotalBytes
  84. maxTotalBytes = parameters.FragmentorDownstreamMaxTotalBytes
  85. minWriteBytes = parameters.FragmentorDownstreamMinWriteBytes
  86. maxWriteBytes = parameters.FragmentorDownstreamMaxWriteBytes
  87. minDelay = parameters.FragmentorDownstreamMinDelay
  88. maxDelay = parameters.FragmentorDownstreamMaxDelay
  89. }
  90. tunnelProtocols := p.TunnelProtocols(limitProtocols)
  91. // When maxTotalBytes is 0 or the protocol is not a candidate for
  92. // fragmentation, it's a certainty that no fragmentation will be
  93. // performed.
  94. //
  95. // It's also possible that the weighted coin flip or random selection of
  96. // bytesToFragment will result in no fragmentation. However, as "seed" may
  97. // be nil, PRNG calls are deferred and these values are not yet known.
  98. //
  99. // TODO: when "seed" is not nil, the coin flip/range could be done here.
  100. if p.Int(maxTotalBytes) == 0 ||
  101. (len(tunnelProtocols) > 0 && !common.Contains(tunnelProtocols, tunnelProtocol)) {
  102. return nil
  103. }
  104. var fragmentPRNG *prng.PRNG
  105. if seed != nil {
  106. fragmentPRNG = prng.NewPRNGWithSeed(seed)
  107. }
  108. return &Config{
  109. isUpstream: isUpstream,
  110. probability: p.Float(probability),
  111. minTotalBytes: p.Int(minTotalBytes),
  112. maxTotalBytes: p.Int(maxTotalBytes),
  113. minWriteBytes: p.Int(minWriteBytes),
  114. maxWriteBytes: p.Int(maxWriteBytes),
  115. minDelay: p.Duration(minDelay),
  116. maxDelay: p.Duration(maxDelay),
  117. fragmentPRNG: fragmentPRNG,
  118. }
  119. }
  120. // MayFragment indicates whether the fragmentor configuration may result in
  121. // any fragmentation; config can be nil. When MayFragment is false, the caller
  122. // should skip wrapping the associated conn with a fragmentor.Conn.
  123. func (config *Config) MayFragment() bool {
  124. return config != nil
  125. }
  126. // Conn implements simple fragmentation of application-level messages/packets
  127. // into multiple TCP packets by splitting writes into smaller sizes and adding
  128. // delays between writes.
  129. //
  130. // The intent of Conn is both to frustrate firewalls that perform DPI on
  131. // application-level messages that cross TCP packets as well as to perform a
  132. // simple size and timing transformation to the traffic shape of the initial
  133. // portion of a TCP flow.
  134. type Conn struct {
  135. net.Conn
  136. config *Config
  137. noticeEmitter func(string)
  138. runCtx context.Context
  139. stopRunning context.CancelFunc
  140. isClosed int32
  141. writeMutex sync.Mutex
  142. numNotices int
  143. fragmentPRNG *prng.PRNG
  144. bytesToFragment int
  145. bytesFragmented int
  146. maxBytesWritten int
  147. minBytesWritten int
  148. minDelayed time.Duration
  149. maxDelayed time.Duration
  150. }
  151. // NewConn creates a new Conn. When no seed was provided in the Config,
  152. // SetPRNG must be called before the first Write.
  153. func NewConn(
  154. config *Config,
  155. noticeEmitter func(string),
  156. conn net.Conn) *Conn {
  157. runCtx, stopRunning := context.WithCancel(context.Background())
  158. return &Conn{
  159. Conn: conn,
  160. config: config,
  161. noticeEmitter: noticeEmitter,
  162. runCtx: runCtx,
  163. stopRunning: stopRunning,
  164. fragmentPRNG: config.fragmentPRNG,
  165. bytesToFragment: -1,
  166. }
  167. }
  168. // GetMetrics implements the common.MetricsSource interface.
  169. func (c *Conn) GetMetrics() common.LogFields {
  170. c.writeMutex.Lock()
  171. defer c.writeMutex.Unlock()
  172. logFields := make(common.LogFields)
  173. if c.bytesFragmented == 0 {
  174. return logFields
  175. }
  176. var prefix string
  177. if c.config.isUpstream {
  178. prefix = "upstream_"
  179. } else {
  180. prefix = "downstream_"
  181. }
  182. logFields[prefix+"bytes_fragmented"] = c.bytesFragmented
  183. logFields[prefix+"min_bytes_written"] = c.minBytesWritten
  184. logFields[prefix+"max_bytes_written"] = c.maxBytesWritten
  185. logFields[prefix+"min_delayed"] = int(c.minDelayed / time.Microsecond)
  186. logFields[prefix+"max_delayed"] = int(c.maxDelayed / time.Microsecond)
  187. return logFields
  188. }
  189. var upstreamMetricsNames = []string{
  190. "upstream_bytes_fragmented",
  191. "upstream_min_bytes_written",
  192. "upstream_max_bytes_written",
  193. "upstream_min_delayed",
  194. "upstream_max_delayed",
  195. }
  196. // GetUpstreamMetricsNames returns the upstream metrics parameter names.
  197. func GetUpstreamMetricsNames() []string {
  198. return upstreamMetricsNames
  199. }
  200. // SetPRNG sets the PRNG to be used by the fragmentor. Specifying a PRNG
  201. // allows for optional replay of a fragmentor sequence. SetPRNG is intended to
  202. // be used with obfuscator.GetDerivedPRNG and allows for setting the PRNG
  203. // after a conn has already been wrapped with a fragmentor.Conn (but before
  204. // the first Write).
  205. //
  206. // If no seed is specified in NewUp/DownstreamConfig and SetPRNG is not called
  207. // before the first Write, the Write will fail. If a seed was specified, or
  208. // SetPRNG was already called, SetPRNG has no effect.
  209. func (c *Conn) SetPRNG(PRNG *prng.PRNG) {
  210. c.writeMutex.Lock()
  211. defer c.writeMutex.Unlock()
  212. if c.fragmentPRNG == nil {
  213. c.fragmentPRNG = PRNG
  214. }
  215. }
  216. func (c *Conn) Write(buffer []byte) (int, error) {
  217. c.writeMutex.Lock()
  218. defer c.writeMutex.Unlock()
  219. if c.fragmentPRNG == nil {
  220. return 0, errors.TraceNew("missing fragmentPRNG")
  221. }
  222. if c.bytesToFragment == -1 {
  223. if !c.fragmentPRNG.FlipWeightedCoin(c.config.probability) {
  224. c.bytesToFragment = 0
  225. } else {
  226. c.bytesToFragment = c.fragmentPRNG.Range(
  227. c.config.minTotalBytes, c.config.maxTotalBytes)
  228. }
  229. }
  230. if c.bytesFragmented >= c.bytesToFragment {
  231. return c.Conn.Write(buffer)
  232. }
  233. totalBytesWritten := 0
  234. emitNotice := c.noticeEmitter != nil &&
  235. c.numNotices < MAX_FRAGMENTOR_NOTICES
  236. // TODO: use strings.Builder in Go 1.10
  237. var notice bytes.Buffer
  238. if emitNotice {
  239. fmt.Fprintf(&notice, "fragment %d bytes:", len(buffer))
  240. }
  241. for iterations := 0; len(buffer) > 0; iterations += 1 {
  242. delay := c.fragmentPRNG.Period(c.config.minDelay, c.config.maxDelay)
  243. timer := time.NewTimer(delay)
  244. var err error
  245. select {
  246. case <-c.runCtx.Done():
  247. err = c.runCtx.Err()
  248. case <-timer.C:
  249. }
  250. timer.Stop()
  251. if err != nil {
  252. return totalBytesWritten, err
  253. }
  254. minWriteBytes := c.config.minWriteBytes
  255. if minWriteBytes > len(buffer) {
  256. minWriteBytes = len(buffer)
  257. }
  258. maxWriteBytes := c.config.maxWriteBytes
  259. if maxWriteBytes > len(buffer) {
  260. maxWriteBytes = len(buffer)
  261. }
  262. writeBytes := c.fragmentPRNG.Range(minWriteBytes, maxWriteBytes)
  263. bytesWritten, err := c.Conn.Write(buffer[:writeBytes])
  264. totalBytesWritten += bytesWritten
  265. c.bytesFragmented += bytesWritten
  266. if err != nil {
  267. return totalBytesWritten, err
  268. }
  269. if c.minBytesWritten == 0 || c.minBytesWritten > bytesWritten {
  270. c.minBytesWritten = bytesWritten
  271. }
  272. if c.maxBytesWritten < bytesWritten {
  273. c.maxBytesWritten = bytesWritten
  274. }
  275. if c.minDelayed == 0 || c.minDelayed > delay {
  276. c.minDelayed = delay
  277. }
  278. if c.maxDelayed < delay {
  279. c.maxDelayed = delay
  280. }
  281. if emitNotice {
  282. if iterations < MAX_FRAGMENTOR_ITERATIONS_PER_NOTICE {
  283. fmt.Fprintf(&notice, " [%s] %d", delay, bytesWritten)
  284. } else if iterations == MAX_FRAGMENTOR_ITERATIONS_PER_NOTICE {
  285. fmt.Fprintf(&notice, "...")
  286. }
  287. }
  288. buffer = buffer[writeBytes:]
  289. // As soon as bytesToFragment has been satisfied, don't fragment the
  290. // remainder of this write buffer.
  291. if c.bytesFragmented >= c.bytesToFragment {
  292. bytesWritten, err := c.Conn.Write(buffer)
  293. totalBytesWritten += bytesWritten
  294. if err != nil {
  295. return totalBytesWritten, err
  296. } else {
  297. buffer = nil
  298. }
  299. }
  300. }
  301. if emitNotice {
  302. c.noticeEmitter(notice.String())
  303. c.numNotices += 1
  304. }
  305. return totalBytesWritten, nil
  306. }
  307. func (c *Conn) CloseWrite() error {
  308. if closeWriter, ok := c.Conn.(common.CloseWriter); ok {
  309. return closeWriter.CloseWrite()
  310. }
  311. return errors.TraceNew("underlying conn is not a CloseWriter")
  312. }
  313. func (c *Conn) Close() (err error) {
  314. if !atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) {
  315. return nil
  316. }
  317. c.stopRunning()
  318. return c.Conn.Close()
  319. }
  320. func (c *Conn) IsClosed() bool {
  321. return atomic.LoadInt32(&c.isClosed) == 1
  322. }