dir-reader.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. // A thing that emits "entry" events with Reader objects
  2. // Pausing it causes it to stop emitting entry events, and also
  3. // pauses the current entry if there is one.
  4. module.exports = DirReader
  5. var fs = require("graceful-fs")
  6. , fstream = require("../fstream.js")
  7. , Reader = fstream.Reader
  8. , inherits = require("inherits")
  9. , mkdir = require("mkdirp")
  10. , path = require("path")
  11. , Reader = require("./reader.js")
  12. , assert = require("assert").ok
  13. inherits(DirReader, Reader)
  // Constructor for a Reader that walks a single directory.
  //
  // props - options object. It must carry type === "Directory" and a truthy
  //         `Directory` flag, otherwise an Error is thrown. A truthy
  //         `props.sort` is copied onto the instance as `sort`.
  //
  // Throws when invoked without `new`. The walk state (entries, _index,
  // _paused, _length) is initialised before control is handed to the
  // Reader constructor.
  function DirReader (props) {
    var self = this;

    if (!(self instanceof DirReader)) {
      throw new Error("DirReader must be called as constructor.");
    }

    // the caller should already have established that this is a Directory
    var describesDirectory = props.type === "Directory" && props.Directory;
    if (!describesDirectory) {
      throw new Error("Non-directory type "+ props.type);
    }

    // nothing listed yet, nothing visited yet
    self.entries = null;
    self._index = -1;
    self._paused = false;
    self._length = -1;

    if (props.sort) {
      self.sort = props.sort;
    }

    Reader.call(self, props);
  }
  // Fetch the directory listing with fs.readdir, once per instance.
  //
  // On success the names are stored on `entries` and announced with an
  // "entries" event. Then, immediately or on the next "resume" if the reader
  // is paused at that moment, `_length` is recorded, the optional `sort`
  // comparator (bound to this reader) is applied, and _read() is called.
  // A readdir failure is passed to this.error().
  DirReader.prototype._getEntries = function () {
    var self = this;

    // race condition: pause() may land before the listing is requested and a
    // later resume() would then ask for it a second time.
    if (self._gotEntries) {
      return;
    }
    self._gotEntries = true;

    fs.readdir(self._path, function (err, names) {
      if (err) {
        return self.error(err);
      }

      self.entries = names;
      self.emit("entries", names);

      if (self._paused) {
        self.once("resume", beginWalk);
      } else {
        beginWalk();
      }

      function beginWalk () {
        self._length = self.entries.length;
        if (typeof self.sort === "function") {
          self.entries = self.entries.sort(self.sort.bind(self));
        }
        self._read();
      }
    });
  };
  // Walk the directory one entry at a time, emitting an "entry" event for
  // each one.
  //
  // Each call handles at most one entry: the next name is stat'ed (stat or
  // lstat, depending on props.follow), wrapped in a Reader and wired up so
  // that its pause/resume/stat/ready/close/disown/error events drive this
  // reader. When the entry closes or is disowned, _read() runs again unless
  // the reader is paused. After the last entry, "end" and "close" are emitted
  // exactly once.
  DirReader.prototype._read = function () {
    var self = this;

    // no listing yet: fetch it first (that path calls back into _read)
    if (!self.entries) {
      return self._getEntries();
    }

    var mustWait = self._paused || self._currentEntry || self._aborted;
    if (mustWait) {
      // console.error("DR paused=%j, current=%j, aborted=%j", self._paused, !!self._currentEntry, self._aborted)
      return;
    }

    self._index++;
    if (self._index >= self.entries.length) {
      if (!self._ended) {
        self._ended = true;
        self.emit("end");
        self.emit("close");
      }
      return;
    }

    // ok, handle this one, then.
    // stat'ing the thing now saves creating a proxy.
    var p = path.resolve(self._path, self.entries[self._index]);
    assert(p !== self._path);
    assert(self.entries[self._index]);

    // holding the path here keeps _read() from re-entering while the stat
    // call is still in flight.
    self._currentEntry = p;

    var statCall = self.props.follow ? "stat" : "lstat";
    fs[statCall](p, function (err, stat) {
      if (err) {
        return self.error(err);
      }

      var who = self._proxy || self;
      var base = path.basename(p);
      var dir = path.dirname(p);

      stat.path = p;
      stat.basename = base;
      stat.dirname = dir;

      var childProps = self.getChildProps.call(who, stat);
      childProps.path = p;
      childProps.basename = base;
      childProps.dirname = dir;

      var entry = Reader(childProps, stat);

      // console.error("DR Entry", p, stat.size)

      self._currentEntry = entry;

      // "entry" events are for direct entries in a specific dir.
      // "child" events are for any and all children at all levels.
      // This nomenclature is not completely final.

      // an owned child pausing/resuming pauses/resumes this reader as well
      entry.on("pause", function (source) {
        if (self._paused || entry._disowned) {
          return;
        }
        self.pause(source);
      });

      entry.on("resume", function (source) {
        if (!self._paused || entry._disowned) {
          return;
        }
        self.resume(source);
      });

      entry.on("stat", function (props) {
        self.emit("_entryStat", entry, props);
        if (entry._aborted) {
          return;
        }

        // hold back the public event until a paused entry resumes
        if (entry._paused) {
          entry.once("resume", function () {
            self.emit("entryStat", entry, props);
          });
        } else {
          self.emit("entryStat", entry, props);
        }
      });

      entry.on("ready", function announceChild () {
        // console.error("DR emit child", entry._path)
        if (self._paused) {
          // pause the child, and retry the announcement on our next resume.
          entry.pause(self);
          return self.once("resume", announceChild);
        }

        // skip over sockets. they can't be piped around properly,
        // so there's really no sense even acknowledging them.
        // if someone really wants to see them, they can listen to
        // the "socket" events.
        if (entry.type === "Socket") {
          self.emit("socket", entry);
        } else {
          self.emitEntry(entry);
        }
      });

      // "close" and "disown" both finish the entry; only the first one counts
      var finished = false;
      entry.on("close", finishEntry);
      entry.on("disown", finishEntry);

      function finishEntry () {
        if (finished) {
          return;
        }
        finished = true;
        self.emit("childEnd", entry);
        self.emit("entryEnd", entry);
        self._currentEntry = null;
        if (!self._paused) {
          self._read();
        }
      }

      // XXX Remove this. Works in node as of 0.6.2 or so.
      // Long filenames should not break stuff.
      entry.on("error", function (entryErr) {
        if (entry._swallowErrors) {
          self.warn(entryErr);
          entry.emit("end");
          entry.emit("close");
        } else {
          self.emit("error", entryErr);
        }
      });

      // proxy up some events.
      var proxied = ["child", "childEnd", "warn"];
      for (var i = 0; i < proxied.length; i++) {
        entry.on(proxied[i], self.emit.bind(self, proxied[i]));
      }
    });
  };
  // Detach `entry` from this reader.
  //
  // The entry emits "beforeDisown", is flagged `_disowned`, loses its `root`
  // and `parent` links, stops being the current entry if it was, and finally
  // emits "disown".
  DirReader.prototype.disown = function (entry) {
    var owner = this;

    entry.emit("beforeDisown");

    entry._disowned = true;
    entry.root = null;
    entry.parent = null;

    if (owner._currentEntry === entry) {
      owner._currentEntry = null;
    }

    entry.emit("disown");
  };
  // Build the base property bag for a child entry of this reader.
  //
  // stat - accepted for the caller's convenience; not consulted here.
  //
  // Returns a new object with depth (one deeper than this reader), root,
  // parent, follow, filter, sort and hardlinks.
  // NOTE(review): follow and filter are read from the instance, while sort
  // and hardlinks are read from this.props - confirm that split is intended.
  DirReader.prototype.getChildProps = function (stat) {
    var owner = this;
    var childProps = {};

    childProps.depth = owner.depth + 1;
    childProps.root = owner.root || owner;
    childProps.parent = owner;
    childProps.follow = owner.follow;
    childProps.filter = owner.filter;
    childProps.sort = owner.props.sort;
    childProps.hardlinks = owner.props.hardlinks;

    return childProps;
  };
  // Pause the walk. Does nothing when already paused.
  //
  // who - the party that asked for the pause; defaults to this reader. It is
  //       forwarded to the current entry's pause() (when the current entry
  //       has one) and to the "pause" event.
  DirReader.prototype.pause = function (who) {
    var self = this;

    if (self._paused) {
      return;
    }

    var initiator = who || self;
    self._paused = true;

    // _currentEntry may be a plain path string while a stat is in flight,
    // hence the check that it actually has a pause method.
    var current = self._currentEntry;
    if (current && current.pause) {
      current.pause(initiator);
    }

    self.emit("pause", initiator);
  };
  // Resume a paused walk. Does nothing when not paused.
  //
  // who - the party that asked for the resume; defaults to this reader. It is
  //       forwarded to the "resume" event and to the current entry's resume().
  //
  // After the event, the current entry is resumed if there is one (and it has
  // a resume method); otherwise the walk continues through _read().
  DirReader.prototype.resume = function (who) {
    var self = this;

    if (!self._paused) {
      return;
    }

    var initiator = who || self;
    self._paused = false;

    // console.error("DR Emit Resume", self._path)
    self.emit("resume", initiator);

    // a "resume" listener may have paused us again
    if (self._paused) {
      // console.error("DR Re-paused", self._path)
      return;
    }

    var current = self._currentEntry;
    if (!current) {
      self._read();
      return;
    }

    if (current.resume) {
      current.resume(initiator);
    }
  };
  // Announce `entry` on this reader: first as an "entry" event, then as a
  // "child" event.
  DirReader.prototype.emitEntry = function (entry) {
    var self = this;

    ["entry", "child"].forEach(function (eventName) {
      self.emit(eventName, entry);
    });
  };