| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
// FileReader is basically just a wrapper around an fs.ReadStream.
// It is wired up as a subclass of Reader via inherits() below.
module.exports = FileReader

var fs = require("graceful-fs")
var fstream = require("../fstream.js")
// NOTE: Reader is assigned twice. The require("./reader.js") further down
// overwrites this value and is the one in effect for the rest of the file.
var Reader = fstream.Reader
var inherits = require("inherits")
// NOTE(review): mkdir is not referenced in the code visible in this file.
var mkdir = require("mkdirp")
Reader = require("./reader.js")

// Sentinel objects pushed onto a reader's _buffer in place of a chunk to
// stand for an "end" / "close" event that has to wait behind buffered data.
var EOF = {EOF: true}
var CLOSE = {CLOSE: true}

inherits(FileReader, Reader)
// Constructor for a reader over a single file-like entry.
//
// props - entry description handed on to Reader. props.type must be "Link"
//         or "File", and the matching flag (props.Link / props.File) must
//         be truthy.
//
// Throws an Error when invoked without `new`, or when props does not
// describe a File or Link entry.
//
// XXX Todo: preserve hardlinks by tracking dev+inode+nlink,
// with a HardLinkReader class.
function FileReader (props) {
  var self = this

  if (!(self instanceof FileReader)) {
    throw new Error("FileReader must be called as constructor.")
  }

  // The entry should already be established as a File type by now.
  var kind = props.type
  var isFileLike = (kind === "Link" && props.Link) ||
                   (kind === "File" && props.File)
  if (!isFileLike) {
    throw new Error("Non-file type "+ kind)
  }

  // Queue of chunks (and end/close sentinels) held back while paused, plus
  // a running count of the bytes received from the underlying stream.
  self._buffer = []
  self._bytesEmitted = 0

  Reader.call(self, props)
}
// Open the underlying read stream for me._path (with me.props passed as the
// options object), store it on me._stream, and relay its events through
// this reader.
//
// - "open" is re-emitted as-is.
// - "data" chunks are counted in me._bytesEmitted. Empty chunks are dropped.
//   A chunk is emitted right away unless the reader is paused or already has
//   queued entries, in which case it is queued to preserve ordering.
// - "end" / "close" are emitted right away, or queued as the EOF / CLOSE
//   sentinels behind any pending data.
// - "error" is forwarded to this reader's own "error" listeners. Without
//   this, a failure of the underlying stream has no listener at all and is
//   thrown as an unhandled 'error' event.
//
// When the stream ends, the received byte count is compared with
// me.props.size and a mismatch is reported through me.error().
FileReader.prototype._getStream = function () {
  var me = this
    , stream = me._stream = fs.createReadStream(me._path, me.props)

  if (me.props.blksize) {
    stream.bufferSize = me.props.blksize
  }

  stream.on("open", me.emit.bind(me, "open"))

  stream.on("data", function (c) {
    me._bytesEmitted += c.length
    // no point saving empty chunks
    if (!c.length) return
    if (me._paused || me._buffer.length) {
      me._buffer.push(c)
      me._read()
    } else {
      me.emit("data", c)
    }
  })

  stream.on("end", function () {
    if (me._paused || me._buffer.length) {
      me._buffer.push(EOF)
      me._read()
    } else {
      me.emit("end")
    }

    if (me._bytesEmitted !== me.props.size) {
      me.error("Didn't get expected byte count\n"+
               "expect: "+me.props.size + "\n" +
               "actual: "+me._bytesEmitted)
    }
  })

  stream.on("close", function () {
    if (me._paused || me._buffer.length) {
      me._buffer.push(CLOSE)
      me._read()
    } else {
      me.emit("close")
    }
  })

  // Surface stream failures on the reader instead of leaving them unhandled.
  stream.on("error", function (er) {
    me.emit("error", er)
  })

  me._read()
}
// Deliver queued entries (data chunks and the EOF / CLOSE sentinels) to
// listeners, in the order they were queued.
//
// Does nothing while the reader is paused. When no stream has been created
// yet, it hands off to _getStream() and returns whatever that returns.
//
// Each entry is taken off the queue *before* it is emitted. That way an
// entry is delivered exactly once even when a listener pauses the reader
// from inside its handler (the remaining entries simply stay queued), or
// pauses and resumes it synchronously (the nested _read call drains the
// rest and this loop then finds the queue empty).
FileReader.prototype._read = function () {
  var me = this

  if (me._paused) return

  if (!me._stream) return me._getStream()

  // clear out the buffer, if there is one.
  while (!me._paused && me._buffer.length) {
    var entry = me._buffer.shift()
    if (entry === EOF) {
      me.emit("end")
    } else if (entry === CLOSE) {
      me.emit("close")
    } else {
      me.emit("data", entry)
    }
  }
}
// Pause this reader.
//
// who - passed along as the argument of the "pause" event; defaults to
//       this reader when falsy.
//
// A no-op when already paused. Otherwise the paused flag is set, the
// underlying stream (if one has been created) is paused, and "pause" is
// emitted.
FileReader.prototype.pause = function (who) {
  var self = this

  if (!self._paused) {
    var source = who || self
    self._paused = true

    var stream = self._stream
    if (stream) stream.pause()

    self.emit("pause", source)
  }
}
// Resume this reader after a pause.
//
// who - passed along as the argument of the "resume" event; defaults to
//       this reader when falsy.
//
// A no-op unless currently paused. Otherwise "resume" is emitted (while the
// paused flag is still set), the flag is cleared, the underlying stream (if
// one has been created) is resumed, and _read() is called.
FileReader.prototype.resume = function (who) {
  var self = this

  if (self._paused) {
    self.emit("resume", who || self)
    self._paused = false

    var stream = self._stream
    if (stream) stream.resume()

    self._read()
  }
}
|