// index.js
  1. //filter will reemit the data if cb(err,pass) pass is truthy
  2. // reduce is more tricky
  3. // maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'
  5. var Stream = require('stream').Stream
  6. , es = exports
  7. , through = require('through')
  8. , from = require('from')
  9. , duplex = require('duplexer')
  10. , map = require('map-stream')
  11. , pause = require('pause-stream')
  12. , split = require('split')
  13. , pipeline = require('stream-combiner')
  14. es.Stream = Stream //re-export Stream from core
  15. es.through = through
  16. es.from = from
  17. es.duplex = duplex
  18. es.map = map
  19. es.pause = pause
  20. es.split = split
  21. es.pipeline = es.connect = es.pipe = pipeline
  22. // merge / concat
  23. //
  24. // combine multiple streams into a single stream.
  25. // will emit end only once
  26. es.concat = //actually this should be called concat
  27. es.merge = function (/*streams...*/) {
  28. var toMerge = [].slice.call(arguments)
  29. var stream = new Stream()
  30. var endCount = 0
  31. stream.writable = stream.readable = true
  32. toMerge.forEach(function (e) {
  33. e.pipe(stream, {end: false})
  34. var ended = false
  35. e.on('end', function () {
  36. if(ended) return
  37. ended = true
  38. endCount ++
  39. if(endCount == toMerge.length)
  40. stream.emit('end')
  41. })
  42. })
  43. stream.write = function (data) {
  44. this.emit('data', data)
  45. }
  46. stream.destroy = function () {
  47. merge.forEach(function (e) {
  48. if(e.destroy) e.destroy()
  49. })
  50. }
  51. return stream
  52. }
  53. // writable stream, collects all events into an array
  54. // and calls back when 'end' occurs
  55. // mainly I'm using this to test the other functions
  56. es.writeArray = function (done) {
  57. if ('function' !== typeof done)
  58. throw new Error('function writeArray (done): done must be function')
  59. var a = new Stream ()
  60. , array = [], isDone = false
  61. a.write = function (l) {
  62. array.push(l)
  63. }
  64. a.end = function () {
  65. isDone = true
  66. done(null, array)
  67. }
  68. a.writable = true
  69. a.readable = false
  70. a.destroy = function () {
  71. a.writable = a.readable = false
  72. if(isDone) return
  73. done(new Error('destroyed before end'), array)
  74. }
  75. return a
  76. }
  77. //return a Stream that reads the properties of an object
  78. //respecting pause() and resume()
  79. es.readArray = function (array) {
  80. var stream = new Stream()
  81. , i = 0
  82. , paused = false
  83. , ended = false
  84. stream.readable = true
  85. stream.writable = false
  86. if(!Array.isArray(array))
  87. throw new Error('event-stream.read expects an array')
  88. stream.resume = function () {
  89. if(ended) return
  90. paused = false
  91. var l = array.length
  92. while(i < l && !paused && !ended) {
  93. stream.emit('data', array[i++])
  94. }
  95. if(i == l && !ended)
  96. ended = true, stream.readable = false, stream.emit('end')
  97. }
  98. process.nextTick(stream.resume)
  99. stream.pause = function () {
  100. paused = true
  101. }
  102. stream.destroy = function () {
  103. ended = true
  104. stream.emit('close')
  105. }
  106. return stream
  107. }
  108. //
  109. // readable (asyncFunction)
  110. // return a stream that calls an async function while the stream is not paused.
  111. //
  112. // the function must take: (count, callback) {...
  113. //
es.readable =
function (func, continueOnError) {
  var stream = new Stream()
    , i = 0          // call counter, passed to func as its `count` argument
    , paused = false
    , ended = false
    , reading = false // true while a call to func is in flight (re-entrancy guard)
  stream.readable = true
  stream.writable = false
  if('function' !== typeof func)
    throw new Error('event-stream.readable expects async function')
  stream.on('end', function () { ended = true })
  // get doubles as func's completion callback and as the pump that
  // schedules the next call to func.
  function get (err, data) {
    if(err) {
      stream.emit('error', err)
      // on error the stream stops unless the caller opted to continue
      if(!continueOnError) stream.emit('end')
    } else if (arguments.length > 1)
      // arguments.length distinguishes "no data this round" (0/1-arg call,
      // e.g. from resume()) from an explicit data value — even undefined
      stream.emit('data', data)
    process.nextTick(function () {
      // don't start another read while ended, paused, or mid-read
      if(ended || paused || reading) return
      try {
        reading = true
        func.call(stream, i++, function () {
          reading = false
          get.apply(null, arguments)
        })
      } catch (err) {
        // synchronous throw from func is surfaced as a stream error
        stream.emit('error', err)
      }
    })
  }
  stream.resume = function () {
    paused = false
    get() // restart the pump; zero-arg call emits nothing, just reschedules
  }
  process.nextTick(get) // kick off the first read after listeners attach
  stream.pause = function () {
    paused = true
  }
  stream.destroy = function () {
    stream.emit('end')
    stream.emit('close')
    ended = true
  }
  return stream
}
  160. //
  161. // map sync
  162. //
  163. es.mapSync = function (sync) {
  164. return es.through(function write(data) {
  165. var mappedData = sync(data)
  166. if (typeof mappedData !== 'undefined')
  167. this.emit('data', mappedData)
  168. })
  169. }
  170. //
  171. // log just print out what is coming through the stream, for debugging
  172. //
  173. es.log = function (name) {
  174. return es.through(function (data) {
  175. var args = [].slice.call(arguments)
  176. if(name) console.error(name, data)
  177. else console.error(data)
  178. this.emit('data', data)
  179. })
  180. }
  181. //
  182. // child -- pipe through a child process
  183. //
  184. es.child = function (child) {
  185. return es.duplex(child.stdin, child.stdout)
  186. }
  187. //
  188. // parse
  189. //
  190. // must be used after es.split() to ensure that each chunk represents a line
  191. // source.pipe(es.split()).pipe(es.parse())
  192. es.parse = function () {
  193. return es.through(function (data) {
  194. var obj
  195. try {
  196. if(data) //ignore empty lines
  197. obj = JSON.parse(data.toString())
  198. } catch (err) {
  199. return console.error(err, 'attemping to parse:', data)
  200. }
  201. //ignore lines that where only whitespace.
  202. if(obj !== undefined)
  203. this.emit('data', obj)
  204. })
  205. }
  206. //
  207. // stringify
  208. //
  209. es.stringify = function () {
  210. var Buffer = require('buffer').Buffer
  211. return es.mapSync(function (e){
  212. return JSON.stringify(Buffer.isBuffer(e) ? e.toString() : e) + '\n'
  213. })
  214. }
  215. //
  216. // replace a string within a stream.
  217. //
  218. // warn: just concatenates the string and then does str.split().join().
  219. // probably not optimal.
  220. // for smallish responses, who cares?
  221. // I need this for shadow-npm so it's only relatively small json files.
  222. es.replace = function (from, to) {
  223. return es.pipeline(es.split(from), es.join(to))
  224. }
  225. //
  226. // join chunks with a joiner. just like Array#join
  227. // also accepts a callback that is passed the chunks appended together
  228. // this is still supported for legacy reasons.
  229. //
  230. es.join = function (str) {
  231. //legacy api
  232. if('function' === typeof str)
  233. return es.wait(str)
  234. var first = true
  235. return es.through(function (data) {
  236. if(!first)
  237. this.emit('data', str)
  238. first = false
  239. this.emit('data', data)
  240. return true
  241. })
  242. }
  243. //
  244. // wait. callback when 'end' is emitted, with all chunks appended as string.
  245. //
  246. es.wait = function (callback) {
  247. var body = ''
  248. return es.through(function (data) { body += data },
  249. function () {
  250. this.emit('data', body)
  251. this.emit('end')
  252. if(callback) callback(null, body)
  253. })
  254. }
  255. es.pipeable = function () {
  256. throw new Error('[EVENT-STREAM] es.pipeable is deprecated')
  257. }