collect.js 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  // The module's sole export is the `collect` function defined below.
  1. module.exports = collect
  /**
   * Pause `stream` and hold everything it emits until somebody pipes it.
   *
   * While collected, "data" chunks, the "end" signal and "entry" events are
   * buffered in memory (every entry is itself collected), and anything
   * announced through a "proxy" event is paused. `stream.pipe` is replaced
   * with a wrapper: when called it releases the buffered entries one at a
   * time, then restores the original `pipe`, replays the buffered data/end
   * events and resumes the stream.
   *
   * Calling the wrapper with no destination re-emits the buffered events on
   * `stream` itself instead of handing them to a destination.
   *
   * @param {object} stream - Event emitter exposing `pause()`, `resume()` and
   *   `pipe()`. It is tagged with `_collected = true`; collecting an already
   *   collected stream is a no-op.
   * @returns {undefined}
   */
  function collect (stream) {
    if (stream._collected) return
    stream._collected = true
    stream.pause()

    // Buffered chunks in arrival order. An `undefined` slot marks where "end"
    // was seen (the "end" event calls save() with no argument).
    var buf = []
    function save (b) {
      // Buffer.from() replaces the deprecated `new Buffer(string)` constructor.
      if (typeof b === "string") b = Buffer.from(b)
      // Empty buffers carry nothing worth replaying.
      if (Buffer.isBuffer(b) && !b.length) return
      buf.push(b)
    }
    stream.on("data", save)
    stream.on("end", save)

    var entryBuffer = []
    // Entry currently being re-emitted by a destination-less pipe(), if any.
    var replayed = null
    function saveEntry (e) {
      // The replay below emits "entry" on this very stream; without this guard
      // the replayed entry would be buffered again and the replay would never
      // run out of entries.
      if (e === replayed) return
      collect(e)
      entryBuffer.push(e)
    }
    stream.on("entry", saveEntry)

    function proxyPause (p) {
      p.pause()
    }
    stream.on("proxy", proxyPause)

    // replace the pipe method with a new version that will
    // unlock the buffered stuff. if you just call .pipe()
    // without a destination, then it'll re-play the events.
    stream.pipe = (function (orig) {
      return function (dest) {
        // let the entries flow through one at a time.
        // Once they're all done, then we can resume completely.
        var e = 0

        function resume () {
          stream.removeListener("entry", saveEntry)
          stream.removeListener("data", save)
          stream.removeListener("end", save)

          // Put the real pipe back before using it on the destination.
          stream.pipe = orig
          if (dest) stream.pipe(dest)

          buf.forEach(function (b) {
            if (b) stream.emit("data", b)
            else stream.emit("end")
          })

          stream.resume()
        }

        ;(function unblockEntry () {
          var entry = entryBuffer[e++]
          if (!entry) return resume()
          // Move on to the next buffered entry once this one has ended.
          entry.on("end", unblockEntry)
          if (dest) {
            dest.add(entry)
            return
          }
          var previous = replayed
          replayed = entry
          try {
            stream.emit("entry", entry)
          } finally {
            replayed = previous
          }
        })()

        return dest
      }
    })(stream.pipe)
  }