| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430 |
- 'use strict';
- var nodeunit = require('nodeunit');
- var fs = require("fs");
- var path = require("path");
- var streamBuffers = require("stream-buffers");
- var async = require('async')
- var PullStream = require('../');
- module.exports = {
- "source sending 1-byte at a time": function (t) {
- t.expect(3);
- var ps = new PullStream({ lowWaterMark : 0 });
- ps.on('finish', function () {
- sourceStream.destroy();
- });
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 1
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull('Hello'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello', data.toString());
- var writableStream = new streamBuffers.WritableStreamBuffer({
- initialSize: 100
- });
- writableStream.on('close', function () {
- var str = writableStream.getContentsAsString('utf8');
- t.equal(' World', str);
- ps.pull(function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('!', data.toString());
- return t.done();
- });
- });
- ps.pipe(' World'.length, writableStream);
- });
- },
- "source sending twelve bytes at once": function (t) {
- t.expect(3);
- var ps = new PullStream({ lowWaterMark : 0 });
- ps.on('finish', function () {
- sourceStream.destroy();
- });
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull('Hello'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello', data.toString());
- var writableStream = new streamBuffers.WritableStreamBuffer({
- initialSize: 100
- });
- writableStream.on('close', function () {
- var str = writableStream.getContentsAsString('utf8');
- t.equal(' World', str);
- ps.pull(function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('!', data.toString());
- return t.done();
- });
- });
- ps.pipe(' World'.length, writableStream);
- });
- },
- "source sending 512 bytes at once": function (t) {
- t.expect(512 / 4);
- var ps = new PullStream({ lowWaterMark : 0 });
- ps.on('finish', function() {
- sourceStream.destroy();
- });
- var values = [];
- for (var i = 0; i < 512; i+=4) {
- values.push(i + 1000);
- }
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- values.forEach(function(val) {
- sourceStream.put(val);
- });
- async.forEachSeries(values, function (val, callback) {
- ps.pull(4, function (err, data) {
- if (err) {
- return callback(err);
- }
- t.equal(val, data.toString());
- return callback(null);
- });
- }, function (err) {
- t.done(err);
- });
- },
- "two length pulls": function (t) {
- t.expect(2);
- var ps = new PullStream({ lowWaterMark : 0 });
- ps.on('finish', function () {
- sourceStream.destroy();
- });
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull('Hello'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello', data.toString());
- ps.pull(' World!'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal(' World!', data.toString());
- return t.done();
- });
- });
- },
- "pulling zero bytes returns empty data": function (t) {
- t.expect(1);
- var ps = new PullStream({ lowWaterMark : 0 });
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull(0, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal(0, data.length, "data is empty");
- sourceStream.destroy();
- return t.done();
- });
- },
- "read from file": function (t) {
- t.expect(2);
- var ps = new PullStream({ lowWaterMark : 0 });
- var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt'));
- sourceStream.pipe(ps);
- ps.pull('Hello'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello', data.toString());
- ps.pull(' World!'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal(' World!', data.toString());
- return t.done();
- });
- });
- },
- "read past end of stream": function (t) {
- t.expect(2);
- var ps = new PullStream({ lowWaterMark : 0 });
- ps.on('finish', function () {
- sourceStream.destroy();
- });
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 1,
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull('Hello World!'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello World!', data.toString());
- ps.pull(1, function (err, data) {
- if (err) {
- t.ok(err, 'should get an error');
- }
- t.done();
- });
- });
- },
- "pipe with no length": function (t) {
- t.expect(2);
- var ps = new PullStream({ lowWaterMark : 0 });
- ps.on('end', function () {
- t.ok(true, "pullstream should end");
- });
- var writableStream = new streamBuffers.WritableStreamBuffer({
- initialSize: 100
- });
- writableStream.on('close', function () {
- var str = writableStream.getContentsAsString('utf8');
- t.equal('Hello World!', str);
- t.done();
- });
- ps.pipe(writableStream);
- process.nextTick(function () {
- ps.write(new Buffer('Hello', 'utf8'));
- ps.write(new Buffer(' World', 'utf8'));
- process.nextTick(function () {
- ps.write(new Buffer('!', 'utf8'));
- ps.end();
- });
- });
- },
- "throw on calling write() after end": function (t) {
- t.expect(1);
- var ps = new PullStream({ lowWaterMark : 0 });
- ps.end();
- try {
- ps.write(new Buffer('hello', 'utf8'));
- t.fail("should throw error");
- } catch (ex) {
- t.ok(ex);
- }
- t.done();
- },
- "pipe more bytes than the pullstream buffer size": function (t) {
- t.expect(1);
- var ps = new PullStream();
- ps.on('end', function() {
- sourceStream.destroy();
- });
- var aVals = "", bVals = "";
- for (var i = 0; i < 20 * 1000; i++) {
- aVals += 'a';
- }
- for (var i = 0; i < 180 * 1000; i++) {
- bVals += 'b';
- }
- var combined = aVals + bVals;
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 40 * 1024
- });
- sourceStream.pipe(ps);
- sourceStream.put(aVals);
- var writableStream = new streamBuffers.WritableStreamBuffer({
- initialSize: 200 * 1000
- });
- writableStream.on('close', function () {
- var str = writableStream.getContentsAsString('utf8');
- t.equal(combined, str);
- t.done();
- });
- ps.once('drain', function () {
- ps.pipe(200 * 1000, writableStream);
- process.nextTick(sourceStream.put.bind(null, bVals));
- });
- },
- "mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned": function (t) {
- t.expect(2);
- var ps = new PullStream();
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull('Hello'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello', data.toString());
- var data = ps.pullUpTo(" World!".length);
- t.equal(" World!", data.toString());
- sourceStream.destroy();
- t.done();
- });
- },
- "mix asynchronous pull with pullUpTo - fewer bytes returned than requested": function (t) {
- t.expect(2);
- var ps = new PullStream();
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull('Hello'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello', data.toString());
- var data = ps.pullUpTo(1000);
- t.equal(" World!", data.toString());
- sourceStream.destroy();
- t.done();
- });
- },
- "retrieve all currently remaining bytes": function (t) {
- t.expect(2);
- var ps = new PullStream();
- var sourceStream = new streamBuffers.ReadableStreamBuffer({
- frequency: 0,
- chunkSize: 1000
- });
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.pull('Hello'.length, function (err, data) {
- if (err) {
- return t.done(err);
- }
- t.equal('Hello', data.toString());
- var data = ps.pullUpTo();
- t.equal(" World!", data.toString());
- sourceStream.destroy();
- t.done();
- });
- },
- // TODO: node PassThrough stream doesn't handle unshift the same way anymore.
- // "prepend": function (t) {
- // t.expect(1);
- // var ps = new PullStream();
- //
- // var sourceStream = new streamBuffers.ReadableStreamBuffer();
- //
- // sourceStream.pipe(ps);
- // sourceStream.put("World!");
- // ps.prepend("Hello ");
- //
- // ps.pull('Hello World!'.length, function (err, data) {
- // if (err) {
- // return t.done(err);
- // }
- // t.equal('Hello World!', data.toString());
- // sourceStream.destroy();
- // t.done();
- // });
- // },
- "drain": function (t) {
- t.expect(1);
- var ps = new PullStream();
- var sourceStream = new streamBuffers.ReadableStreamBuffer();
- sourceStream.pipe(ps);
- sourceStream.put("Hello World!");
- ps.drain('Hello '.length, function (err) {
- if (err) {
- return t.done(err);
- }
- ps.pull('World!'.length, function (err, data) {
- t.equal('World!', data.toString());
- sourceStream.destroy();
- t.done();
- });
- });
- }
- };
|