meekBuffer.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. /*
  2. * Copyright (c) 2017, Psiphon Inc.
  3. * All rights reserved.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. */
  19. package server
  20. import (
  21. "errors"
  22. "io"
  23. )
  24. // CachedResponse is a data structure that supports meek
  25. // protocol connection interruption resiliency: it stores
  26. // payload data from the most recent response so that it
  27. // may be resent if the client fails to receive it.
  28. //
  29. // The meek server maintains one CachedResponse for each
  30. // meek client. Psiphon's variant of meek streams response
  31. // data, so responses are not fixed size. To limit the memory
  32. // overhead of response caching, each CachedResponse has a
  33. // fixed-size buffer that operates as a ring buffer,
  34. // discarding older response bytes when the buffer fills.
  35. // A CachedResponse that has discarded data may still satisfy
  36. // a client retry where the client has already received part
  37. // of the response payload.
  38. //
  39. // A CachedResponse will also extend its capacity by
  40. // borrowing buffers from a CachedResponseBufferPool, if
  41. // available. When Reset is called, borrowed buffers are
  42. // released back to the pool.
  43. type CachedResponse struct {
  44. buffers [][]byte
  45. readPosition int
  46. readAvailable int
  47. writeIndex int
  48. writeBufferIndex int
  49. overwriting bool
  50. extendedBufferPool *CachedResponseBufferPool
  51. }
  52. // NewCachedResponse creates a CachedResponse with a fixed buffer
  53. // of size bufferSize and borrowing buffers from extendedBufferPool.
  54. func NewCachedResponse(
  55. bufferSize int,
  56. extendedBufferPool *CachedResponseBufferPool) *CachedResponse {
  57. return &CachedResponse{
  58. buffers: [][]byte{make([]byte, bufferSize)},
  59. extendedBufferPool: extendedBufferPool,
  60. }
  61. }
  62. // Reset reinitializes the CachedResponse state to have
  63. // no buffered response and releases all extended buffers
  64. // back to the pool.
  65. // Reset _must_ be called before discarding a CachedResponse
  66. // or extended buffers will not be released.
  67. func (response *CachedResponse) Reset() {
  68. for i, buffer := range response.buffers {
  69. if i > 0 {
  70. response.extendedBufferPool.Put(buffer)
  71. }
  72. }
  73. response.buffers = response.buffers[0:1]
  74. response.readPosition = 0
  75. response.readAvailable = 0
  76. response.writeIndex = 0
  77. response.writeBufferIndex = 0
  78. response.overwriting = false
  79. }
  80. // Available returns the size of the buffered response data.
  81. func (response *CachedResponse) Available() int {
  82. return response.readAvailable
  83. }
  84. // HasPosition checks if the CachedResponse has buffered
  85. // response data starting at or before the specified
  86. // position.
  87. func (response *CachedResponse) HasPosition(position int) bool {
  88. return response.readAvailable > 0 && response.readPosition <= position
  89. }
  90. // CopyFromPosition writes the response data, starting at
  91. // the specified position, to writer. Any data before the
  92. // position is skipped. CopyFromPosition will return an error
  93. // if the specified position is not available.
  94. // CopyFromPosition will copy no data and return no error if
  95. // the position is at the end of its available data.
  96. // CopyFromPosition can be called repeatedly to read the
  97. // same data -- it does not advance or modify the CachedResponse.
  98. func (response *CachedResponse) CopyFromPosition(
  99. position int, writer io.Writer) (int, error) {
  100. if response.readAvailable > 0 && response.readPosition > position {
  101. return 0, errors.New("position unavailable")
  102. }
  103. // Special case: position is end of available data
  104. if position == response.readPosition+response.readAvailable {
  105. return 0, nil
  106. }
  107. // Begin at the start of the response data, which may
  108. // be midway through the buffer(s).
  109. index := 0
  110. bufferIndex := 0
  111. if response.overwriting {
  112. index = response.writeIndex
  113. bufferIndex = response.writeBufferIndex
  114. if index >= len(response.buffers[bufferIndex]) {
  115. index = 0
  116. bufferIndex = (bufferIndex + 1) % len(response.buffers)
  117. }
  118. }
  119. // Iterate over all available data, skipping until at the
  120. // requested position.
  121. n := 0
  122. skip := position - response.readPosition
  123. available := response.readAvailable
  124. for available > 0 {
  125. buffer := response.buffers[bufferIndex]
  126. toCopy := min(len(buffer)-index, available)
  127. available -= toCopy
  128. if skip > 0 {
  129. if toCopy >= skip {
  130. index += skip
  131. toCopy -= skip
  132. skip = 0
  133. } else {
  134. skip -= toCopy
  135. }
  136. }
  137. if skip == 0 {
  138. written, err := writer.Write(buffer[index : index+toCopy])
  139. n += written
  140. if err != nil {
  141. return n, err
  142. }
  143. }
  144. index = 0
  145. bufferIndex = (bufferIndex + 1) % len(response.buffers)
  146. }
  147. return n, nil
  148. }
  149. // Write appends data to the CachedResponse. All writes will
  150. // succeed, but only the most recent bytes will be retained
  151. // once the fixed buffer is full and no extended buffers are
  152. // available.
  153. //
  154. // Write may be called multiple times to record a single
  155. // response; Reset should be called between responses.
  156. //
  157. // Write conforms to the io.Writer interface.
  158. func (response *CachedResponse) Write(data []byte) (int, error) {
  159. dataIndex := 0
  160. for dataIndex < len(data) {
  161. // Write into available space in the current buffer
  162. buffer := response.buffers[response.writeBufferIndex]
  163. canWriteLen := len(buffer) - response.writeIndex
  164. needWriteLen := len(data) - dataIndex
  165. writeLen := min(canWriteLen, needWriteLen)
  166. if writeLen > 0 {
  167. copy(
  168. buffer[response.writeIndex:response.writeIndex+writeLen],
  169. data[dataIndex:dataIndex+writeLen])
  170. response.writeIndex += writeLen
  171. // readPosition tracks the earliest position in
  172. // the response that remains in the cached response.
  173. // Once the buffer is full (and cannot be extended),
  174. // older data is overwritten and readPosition advances.
  175. //
  176. // readAvailable is the amount of data in the cached
  177. // response, which may be less than the buffer capacity.
  178. if response.overwriting {
  179. response.readPosition += writeLen
  180. } else {
  181. response.readAvailable += writeLen
  182. }
  183. dataIndex += writeLen
  184. }
  185. if needWriteLen > canWriteLen {
  186. // Add an extended buffer to increase capacity
  187. // TODO: can extend whenever response.readIndex and response.readBufferIndex are 0?
  188. if response.writeBufferIndex == len(response.buffers)-1 &&
  189. !response.overwriting {
  190. extendedBuffer := response.extendedBufferPool.Get()
  191. if extendedBuffer != nil {
  192. response.buffers = append(response.buffers, extendedBuffer)
  193. }
  194. }
  195. // Move to the next buffer, which may wrap around
  196. // This isn't a general ring buffer: Reset is called at
  197. // start of each response, so the initial data is always
  198. // at the beginning of the first buffer. It follows that
  199. // data is overwritten once the buffer wraps around back
  200. // to the beginning.
  201. response.writeBufferIndex++
  202. if response.writeBufferIndex >= len(response.buffers) {
  203. response.writeBufferIndex = 0
  204. response.overwriting = true
  205. }
  206. response.writeIndex = 0
  207. }
  208. }
  209. return len(data), nil
  210. }
  211. // CachedResponseBufferPool is a fixed-size pool of
  212. // fixed-size buffers that are used to temporarily extend
  213. // the capacity of CachedResponses.
  214. type CachedResponseBufferPool struct {
  215. bufferSize int
  216. buffers chan []byte
  217. }
  218. // NewCachedResponseBufferPool creates a new CachedResponseBufferPool
  219. // with the specified number of buffers. Buffers are allocated on
  220. // demand and once allocated remain allocated.
  221. func NewCachedResponseBufferPool(
  222. bufferSize, bufferCount int) *CachedResponseBufferPool {
  223. buffers := make(chan []byte, bufferCount)
  224. for i := 0; i < bufferCount; i++ {
  225. buffers <- make([]byte, 0)
  226. }
  227. return &CachedResponseBufferPool{
  228. bufferSize: bufferSize,
  229. buffers: buffers,
  230. }
  231. }
  232. // Get returns a buffer, if one is available, or returns nil
  233. // when no buffer is available. Get does not block. Call Put
  234. // to release the buffer back to the pool.
  235. //
  236. // Note: currently, Buffers are not zeroed between use by
  237. // different CachedResponses owned by different clients.
  238. // A bug resulting in cross-client data transfer exposes
  239. // only OSSH ciphertext in the case of meek's use of
  240. // CachedResponses.
  241. func (pool *CachedResponseBufferPool) Get() []byte {
  242. select {
  243. case buffer := <-pool.buffers:
  244. if len(buffer) == 0 {
  245. buffer = make([]byte, pool.bufferSize)
  246. }
  247. return buffer
  248. default:
  249. return nil
  250. }
  251. }
  252. // Put releases a buffer back to the pool. The buffer must
  253. // have been obtained from Get.
  254. func (pool *CachedResponseBufferPool) Put(buffer []byte) {
  255. pool.buffers <- buffer
  256. }