// pullStreamTest.js (10 KB)
  1. 'use strict';
  2. var nodeunit = require('nodeunit');
  3. var fs = require("fs");
  4. var path = require("path");
  5. var streamBuffers = require("stream-buffers");
  6. var async = require('async')
  7. var PullStream = require('../');
  8. module.exports = {
  9. "source sending 1-byte at a time": function (t) {
  10. t.expect(3);
  11. var ps = new PullStream({ lowWaterMark : 0 });
  12. ps.on('finish', function () {
  13. sourceStream.destroy();
  14. });
  15. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  16. frequency: 0,
  17. chunkSize: 1
  18. });
  19. sourceStream.pipe(ps);
  20. sourceStream.put("Hello World!");
  21. ps.pull('Hello'.length, function (err, data) {
  22. if (err) {
  23. return t.done(err);
  24. }
  25. t.equal('Hello', data.toString());
  26. var writableStream = new streamBuffers.WritableStreamBuffer({
  27. initialSize: 100
  28. });
  29. writableStream.on('close', function () {
  30. var str = writableStream.getContentsAsString('utf8');
  31. t.equal(' World', str);
  32. ps.pull(function (err, data) {
  33. if (err) {
  34. return t.done(err);
  35. }
  36. t.equal('!', data.toString());
  37. return t.done();
  38. });
  39. });
  40. ps.pipe(' World'.length, writableStream);
  41. });
  42. },
  43. "source sending twelve bytes at once": function (t) {
  44. t.expect(3);
  45. var ps = new PullStream({ lowWaterMark : 0 });
  46. ps.on('finish', function () {
  47. sourceStream.destroy();
  48. });
  49. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  50. frequency: 0,
  51. chunkSize: 1000
  52. });
  53. sourceStream.pipe(ps);
  54. sourceStream.put("Hello World!");
  55. ps.pull('Hello'.length, function (err, data) {
  56. if (err) {
  57. return t.done(err);
  58. }
  59. t.equal('Hello', data.toString());
  60. var writableStream = new streamBuffers.WritableStreamBuffer({
  61. initialSize: 100
  62. });
  63. writableStream.on('close', function () {
  64. var str = writableStream.getContentsAsString('utf8');
  65. t.equal(' World', str);
  66. ps.pull(function (err, data) {
  67. if (err) {
  68. return t.done(err);
  69. }
  70. t.equal('!', data.toString());
  71. return t.done();
  72. });
  73. });
  74. ps.pipe(' World'.length, writableStream);
  75. });
  76. },
  77. "source sending 512 bytes at once": function (t) {
  78. t.expect(512 / 4);
  79. var ps = new PullStream({ lowWaterMark : 0 });
  80. ps.on('finish', function() {
  81. sourceStream.destroy();
  82. });
  83. var values = [];
  84. for (var i = 0; i < 512; i+=4) {
  85. values.push(i + 1000);
  86. }
  87. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  88. frequency: 0,
  89. chunkSize: 1000
  90. });
  91. sourceStream.pipe(ps);
  92. values.forEach(function(val) {
  93. sourceStream.put(val);
  94. });
  95. async.forEachSeries(values, function (val, callback) {
  96. ps.pull(4, function (err, data) {
  97. if (err) {
  98. return callback(err);
  99. }
  100. t.equal(val, data.toString());
  101. return callback(null);
  102. });
  103. }, function (err) {
  104. t.done(err);
  105. });
  106. },
  107. "two length pulls": function (t) {
  108. t.expect(2);
  109. var ps = new PullStream({ lowWaterMark : 0 });
  110. ps.on('finish', function () {
  111. sourceStream.destroy();
  112. });
  113. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  114. frequency: 0,
  115. chunkSize: 1000
  116. });
  117. sourceStream.pipe(ps);
  118. sourceStream.put("Hello World!");
  119. ps.pull('Hello'.length, function (err, data) {
  120. if (err) {
  121. return t.done(err);
  122. }
  123. t.equal('Hello', data.toString());
  124. ps.pull(' World!'.length, function (err, data) {
  125. if (err) {
  126. return t.done(err);
  127. }
  128. t.equal(' World!', data.toString());
  129. return t.done();
  130. });
  131. });
  132. },
  133. "pulling zero bytes returns empty data": function (t) {
  134. t.expect(1);
  135. var ps = new PullStream({ lowWaterMark : 0 });
  136. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  137. chunkSize: 1000
  138. });
  139. sourceStream.pipe(ps);
  140. sourceStream.put("Hello World!");
  141. ps.pull(0, function (err, data) {
  142. if (err) {
  143. return t.done(err);
  144. }
  145. t.equal(0, data.length, "data is empty");
  146. sourceStream.destroy();
  147. return t.done();
  148. });
  149. },
  150. "read from file": function (t) {
  151. t.expect(2);
  152. var ps = new PullStream({ lowWaterMark : 0 });
  153. var sourceStream = fs.createReadStream(path.join(__dirname, 'testFile.txt'));
  154. sourceStream.pipe(ps);
  155. ps.pull('Hello'.length, function (err, data) {
  156. if (err) {
  157. return t.done(err);
  158. }
  159. t.equal('Hello', data.toString());
  160. ps.pull(' World!'.length, function (err, data) {
  161. if (err) {
  162. return t.done(err);
  163. }
  164. t.equal(' World!', data.toString());
  165. return t.done();
  166. });
  167. });
  168. },
  169. "read past end of stream": function (t) {
  170. t.expect(2);
  171. var ps = new PullStream({ lowWaterMark : 0 });
  172. ps.on('finish', function () {
  173. sourceStream.destroy();
  174. });
  175. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  176. frequency: 1,
  177. chunkSize: 1000
  178. });
  179. sourceStream.pipe(ps);
  180. sourceStream.put("Hello World!");
  181. ps.pull('Hello World!'.length, function (err, data) {
  182. if (err) {
  183. return t.done(err);
  184. }
  185. t.equal('Hello World!', data.toString());
  186. ps.pull(1, function (err, data) {
  187. if (err) {
  188. t.ok(err, 'should get an error');
  189. }
  190. t.done();
  191. });
  192. });
  193. },
  194. "pipe with no length": function (t) {
  195. t.expect(2);
  196. var ps = new PullStream({ lowWaterMark : 0 });
  197. ps.on('end', function () {
  198. t.ok(true, "pullstream should end");
  199. });
  200. var writableStream = new streamBuffers.WritableStreamBuffer({
  201. initialSize: 100
  202. });
  203. writableStream.on('close', function () {
  204. var str = writableStream.getContentsAsString('utf8');
  205. t.equal('Hello World!', str);
  206. t.done();
  207. });
  208. ps.pipe(writableStream);
  209. process.nextTick(function () {
  210. ps.write(new Buffer('Hello', 'utf8'));
  211. ps.write(new Buffer(' World', 'utf8'));
  212. process.nextTick(function () {
  213. ps.write(new Buffer('!', 'utf8'));
  214. ps.end();
  215. });
  216. });
  217. },
  218. "throw on calling write() after end": function (t) {
  219. t.expect(1);
  220. var ps = new PullStream({ lowWaterMark : 0 });
  221. ps.end();
  222. try {
  223. ps.write(new Buffer('hello', 'utf8'));
  224. t.fail("should throw error");
  225. } catch (ex) {
  226. t.ok(ex);
  227. }
  228. t.done();
  229. },
  230. "pipe more bytes than the pullstream buffer size": function (t) {
  231. t.expect(1);
  232. var ps = new PullStream();
  233. ps.on('end', function() {
  234. sourceStream.destroy();
  235. });
  236. var aVals = "", bVals = "";
  237. for (var i = 0; i < 20 * 1000; i++) {
  238. aVals += 'a';
  239. }
  240. for (var i = 0; i < 180 * 1000; i++) {
  241. bVals += 'b';
  242. }
  243. var combined = aVals + bVals;
  244. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  245. frequency: 0,
  246. chunkSize: 40 * 1024
  247. });
  248. sourceStream.pipe(ps);
  249. sourceStream.put(aVals);
  250. var writableStream = new streamBuffers.WritableStreamBuffer({
  251. initialSize: 200 * 1000
  252. });
  253. writableStream.on('close', function () {
  254. var str = writableStream.getContentsAsString('utf8');
  255. t.equal(combined, str);
  256. t.done();
  257. });
  258. ps.once('drain', function () {
  259. ps.pipe(200 * 1000, writableStream);
  260. process.nextTick(sourceStream.put.bind(null, bVals));
  261. });
  262. },
  263. "mix asynchronous pull with synchronous pullUpTo - exact number of bytes returned": function (t) {
  264. t.expect(2);
  265. var ps = new PullStream();
  266. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  267. frequency: 0,
  268. chunkSize: 1000
  269. });
  270. sourceStream.pipe(ps);
  271. sourceStream.put("Hello World!");
  272. ps.pull('Hello'.length, function (err, data) {
  273. if (err) {
  274. return t.done(err);
  275. }
  276. t.equal('Hello', data.toString());
  277. var data = ps.pullUpTo(" World!".length);
  278. t.equal(" World!", data.toString());
  279. sourceStream.destroy();
  280. t.done();
  281. });
  282. },
  283. "mix asynchronous pull with pullUpTo - fewer bytes returned than requested": function (t) {
  284. t.expect(2);
  285. var ps = new PullStream();
  286. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  287. frequency: 0,
  288. chunkSize: 1000
  289. });
  290. sourceStream.pipe(ps);
  291. sourceStream.put("Hello World!");
  292. ps.pull('Hello'.length, function (err, data) {
  293. if (err) {
  294. return t.done(err);
  295. }
  296. t.equal('Hello', data.toString());
  297. var data = ps.pullUpTo(1000);
  298. t.equal(" World!", data.toString());
  299. sourceStream.destroy();
  300. t.done();
  301. });
  302. },
  303. "retrieve all currently remaining bytes": function (t) {
  304. t.expect(2);
  305. var ps = new PullStream();
  306. var sourceStream = new streamBuffers.ReadableStreamBuffer({
  307. frequency: 0,
  308. chunkSize: 1000
  309. });
  310. sourceStream.pipe(ps);
  311. sourceStream.put("Hello World!");
  312. ps.pull('Hello'.length, function (err, data) {
  313. if (err) {
  314. return t.done(err);
  315. }
  316. t.equal('Hello', data.toString());
  317. var data = ps.pullUpTo();
  318. t.equal(" World!", data.toString());
  319. sourceStream.destroy();
  320. t.done();
  321. });
  322. },
  323. // TODO: node PassThrough stream doesn't handle unshift the same way anymore.
  324. // "prepend": function (t) {
  325. // t.expect(1);
  326. // var ps = new PullStream();
  327. //
  328. // var sourceStream = new streamBuffers.ReadableStreamBuffer();
  329. //
  330. // sourceStream.pipe(ps);
  331. // sourceStream.put("World!");
  332. // ps.prepend("Hello ");
  333. //
  334. // ps.pull('Hello World!'.length, function (err, data) {
  335. // if (err) {
  336. // return t.done(err);
  337. // }
  338. // t.equal('Hello World!', data.toString());
  339. // sourceStream.destroy();
  340. // t.done();
  341. // });
  342. // },
  343. "drain": function (t) {
  344. t.expect(1);
  345. var ps = new PullStream();
  346. var sourceStream = new streamBuffers.ReadableStreamBuffer();
  347. sourceStream.pipe(ps);
  348. sourceStream.put("Hello World!");
  349. ps.drain('Hello '.length, function (err) {
  350. if (err) {
  351. return t.done(err);
  352. }
  353. ps.pull('World!'.length, function (err, data) {
  354. t.equal('World!', data.toString());
  355. sourceStream.destroy();
  356. t.done();
  357. });
  358. });
  359. }
  360. };