test.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. 'use strict'
  2. var test = require('tape').test
  3. var from = require('from2')
  4. var sink = require('flush-write-stream')
  5. var cloneable = require('./')
  6. test('basic passthrough', function (t) {
  7. t.plan(2)
  8. var read = false
  9. var source = from(function (size, next) {
  10. if (read) {
  11. this.push(null)
  12. } else {
  13. read = true
  14. this.push('hello world')
  15. }
  16. next()
  17. })
  18. var instance = cloneable(source)
  19. t.notOk(read, 'stream not started')
  20. instance.pipe(sink(function (chunk, enc, cb) {
  21. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  22. cb()
  23. }))
  24. })
  25. test('clone sync', function (t) {
  26. t.plan(4)
  27. var read = false
  28. var source = from(function (size, next) {
  29. if (read) {
  30. this.push(null)
  31. } else {
  32. read = true
  33. this.push('hello world')
  34. }
  35. next()
  36. })
  37. var instance = cloneable(source)
  38. t.notOk(read, 'stream not started')
  39. var cloned = instance.clone()
  40. t.notOk(read, 'stream not started')
  41. instance.pipe(sink(function (chunk, enc, cb) {
  42. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  43. cb()
  44. }))
  45. cloned.pipe(sink(function (chunk, enc, cb) {
  46. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  47. cb()
  48. }))
  49. })
  50. test('clone async', function (t) {
  51. t.plan(4)
  52. var read = false
  53. var source = from(function (size, next) {
  54. if (read) {
  55. this.push(null)
  56. } else {
  57. read = true
  58. this.push('hello world')
  59. }
  60. next()
  61. })
  62. var instance = cloneable(source)
  63. t.notOk(read, 'stream not started')
  64. var cloned = instance.clone()
  65. t.notOk(read, 'stream not started')
  66. instance.pipe(sink(function (chunk, enc, cb) {
  67. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  68. cb()
  69. }))
  70. setImmediate(function () {
  71. cloned.pipe(sink(function (chunk, enc, cb) {
  72. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  73. cb()
  74. }))
  75. })
  76. })
  77. test('basic passthrough in obj mode', function (t) {
  78. t.plan(2)
  79. var read = false
  80. var source = from.obj(function (size, next) {
  81. if (read) {
  82. return this.push(null)
  83. } else {
  84. read = true
  85. this.push({ hello: 'world' })
  86. }
  87. next()
  88. })
  89. var instance = cloneable(source)
  90. t.notOk(read, 'stream not started')
  91. instance.pipe(sink.obj(function (chunk, enc, cb) {
  92. t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
  93. cb()
  94. }))
  95. })
  96. test('multiple clone in object mode', function (t) {
  97. t.plan(4)
  98. var read = false
  99. var source = from.obj(function (size, next) {
  100. if (read) {
  101. return this.push(null)
  102. } else {
  103. read = true
  104. this.push({ hello: 'world' })
  105. }
  106. next()
  107. })
  108. var instance = cloneable(source)
  109. t.notOk(read, 'stream not started')
  110. var cloned = instance.clone()
  111. t.notOk(read, 'stream not started')
  112. instance.pipe(sink.obj(function (chunk, enc, cb) {
  113. t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
  114. cb()
  115. }))
  116. setImmediate(function () {
  117. cloned.pipe(sink.obj(function (chunk, enc, cb) {
  118. t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
  119. cb()
  120. }))
  121. })
  122. })
  123. test('basic passthrough with data event', function (t) {
  124. t.plan(2)
  125. var read = false
  126. var source = from(function (size, next) {
  127. if (read) {
  128. this.push(null)
  129. } else {
  130. read = true
  131. this.push('hello world')
  132. }
  133. next()
  134. })
  135. var instance = cloneable(source)
  136. t.notOk(read, 'stream not started')
  137. var data = ''
  138. instance.on('data', function (chunk) {
  139. data += chunk.toString()
  140. })
  141. instance.on('end', function () {
  142. t.equal(data, 'hello world', 'chunk matches')
  143. })
  144. })
  145. test('basic passthrough with data event on clone', function (t) {
  146. t.plan(3)
  147. var read = false
  148. var source = from(function (size, next) {
  149. if (read) {
  150. this.push(null)
  151. } else {
  152. read = true
  153. this.push('hello world')
  154. }
  155. next()
  156. })
  157. var instance = cloneable(source)
  158. var cloned = instance.clone()
  159. t.notOk(read, 'stream not started')
  160. var data = ''
  161. cloned.on('data', function (chunk) {
  162. data += chunk.toString()
  163. })
  164. cloned.on('end', function () {
  165. t.equal(data, 'hello world', 'chunk matches in clone')
  166. })
  167. instance.pipe(sink(function (chunk, enc, cb) {
  168. t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
  169. cb()
  170. }))
  171. })
  172. test('errors if cloned after start', function (t) {
  173. t.plan(2)
  174. var source = from(function (size, next) {
  175. this.push('hello world')
  176. this.push(null)
  177. next()
  178. })
  179. var instance = cloneable(source)
  180. instance.pipe(sink(function (chunk, enc, cb) {
  181. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  182. t.throws(function () {
  183. instance.clone()
  184. }, 'throws if cloned after start')
  185. cb()
  186. }))
  187. })
  188. test('basic passthrough with readable event', function (t) {
  189. t.plan(2)
  190. var read = false
  191. var source = from(function (size, next) {
  192. if (read) {
  193. this.push(null)
  194. } else {
  195. read = true
  196. this.push('hello world')
  197. }
  198. next()
  199. })
  200. var instance = cloneable(source)
  201. t.notOk(read, 'stream not started')
  202. var data = ''
  203. instance.on('readable', function () {
  204. var chunk
  205. while ((chunk = this.read()) !== null) {
  206. data += chunk.toString()
  207. }
  208. })
  209. instance.on('end', function () {
  210. t.equal(data, 'hello world', 'chunk matches')
  211. })
  212. })
  213. test('basic passthrough with readable event on clone', function (t) {
  214. t.plan(3)
  215. var read = false
  216. var source = from(function (size, next) {
  217. if (read) {
  218. this.push(null)
  219. } else {
  220. read = true
  221. this.push('hello world')
  222. }
  223. next()
  224. })
  225. var instance = cloneable(source)
  226. var cloned = instance.clone()
  227. t.notOk(read, 'stream not started')
  228. var data = ''
  229. cloned.on('readable', function () {
  230. var chunk
  231. while ((chunk = this.read()) !== null) {
  232. data += chunk.toString()
  233. }
  234. })
  235. cloned.on('end', function () {
  236. t.equal(data, 'hello world', 'chunk matches in clone')
  237. })
  238. instance.pipe(sink(function (chunk, enc, cb) {
  239. t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
  240. cb()
  241. }))
  242. })
  243. test('source error destroys all', function (t) {
  244. t.plan(5)
  245. var source = from()
  246. var instance = cloneable(source)
  247. var clone = instance.clone()
  248. source.on('error', function (err) {
  249. t.ok(err, 'source errors')
  250. instance.on('error', function (err2) {
  251. t.ok(err === err2, 'instance receives same error')
  252. })
  253. instance.on('close', function () {
  254. t.pass('instance is closed')
  255. })
  256. clone.on('error', function (err3) {
  257. t.ok(err === err3, 'clone receives same error')
  258. })
  259. clone.on('close', function () {
  260. t.pass('clone is closed')
  261. })
  262. })
  263. source.emit('error', new Error())
  264. })
  265. test('source destroy destroys all', function (t) {
  266. t.plan(2)
  267. var source = from()
  268. var instance = cloneable(source)
  269. var clone = instance.clone()
  270. instance.on('close', function () {
  271. t.pass('instance is closed')
  272. })
  273. clone.on('close', function () {
  274. t.pass('clone is closed')
  275. })
  276. source.destroy()
  277. })
  278. test('instance error destroys all but the source', function (t) {
  279. t.plan(4)
  280. var source = from()
  281. var instance = cloneable(source)
  282. var clone = instance.clone()
  283. source.on('close', function () {
  284. t.fail('source should not be closed')
  285. })
  286. instance.on('error', function (err) {
  287. t.is(err.message, 'beep', 'instance errors')
  288. instance.on('close', function () {
  289. t.pass('instance is closed')
  290. })
  291. clone.on('error', function (err3) {
  292. t.ok(err === err3, 'clone receives same error')
  293. })
  294. clone.on('close', function () {
  295. t.pass('clone is closed')
  296. })
  297. })
  298. instance.destroy(new Error('beep'))
  299. })
  300. test('instance destroy destroys all but the source', function (t) {
  301. t.plan(2)
  302. var source = from()
  303. var instance = cloneable(source)
  304. var clone = instance.clone()
  305. source.on('close', function () {
  306. t.fail('source should not be closed')
  307. })
  308. instance.on('close', function () {
  309. t.pass('instance is closed')
  310. })
  311. clone.on('close', function () {
  312. t.pass('clone is closed')
  313. })
  314. instance.destroy()
  315. })
  316. test('clone destroy does not affect other clones, cloneable or source', function (t) {
  317. t.plan(1)
  318. var source = from()
  319. var instance = cloneable(source)
  320. var clone = instance.clone()
  321. var other = instance.clone()
  322. source.on('close', function () {
  323. t.fail('source should not be closed')
  324. })
  325. instance.on('close', function () {
  326. t.fail('instance should not be closed')
  327. })
  328. other.on('close', function () {
  329. t.fail('other clone should not be closed')
  330. })
  331. clone.on('close', function () {
  332. t.pass('clone is closed')
  333. })
  334. clone.destroy()
  335. })
  336. test('clone remains readable if other is destroyed', function (t) {
  337. t.plan(3)
  338. var read = false
  339. var source = from(function (size, next) {
  340. if (read) {
  341. this.push(null)
  342. } else {
  343. read = true
  344. this.push('hello')
  345. }
  346. next()
  347. })
  348. var instance = cloneable(source)
  349. var clone = instance.clone()
  350. var other = instance.clone()
  351. instance.pipe(sink.obj(function (chunk, enc, cb) {
  352. t.deepEqual(chunk.toString(), 'hello', 'instance chunk matches')
  353. cb()
  354. }))
  355. clone.pipe(sink.obj(function (chunk, enc, cb) {
  356. t.deepEqual(chunk.toString(), 'hello', 'clone chunk matches')
  357. cb()
  358. }))
  359. clone.on('close', function () {
  360. t.fail('clone should not be closed')
  361. })
  362. instance.on('close', function () {
  363. t.fail('instance should not be closed')
  364. })
  365. other.on('close', function () {
  366. t.pass('other is closed')
  367. })
  368. other.destroy()
  369. })
  370. test('clone of clone', function (t) {
  371. t.plan(6)
  372. var read = false
  373. var source = from(function (size, next) {
  374. if (read) {
  375. this.push(null)
  376. } else {
  377. read = true
  378. this.push('hello world')
  379. }
  380. next()
  381. })
  382. var instance = cloneable(source)
  383. t.notOk(read, 'stream not started')
  384. var cloned = instance.clone()
  385. t.notOk(read, 'stream not started')
  386. var replica = cloned.clone()
  387. t.notOk(read, 'stream not started')
  388. instance.pipe(sink(function (chunk, enc, cb) {
  389. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  390. cb()
  391. }))
  392. cloned.pipe(sink(function (chunk, enc, cb) {
  393. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  394. cb()
  395. }))
  396. replica.pipe(sink(function (chunk, enc, cb) {
  397. t.equal(chunk.toString(), 'hello world', 'chunk matches')
  398. cb()
  399. }))
  400. })
  401. test('from vinyl', function (t) {
  402. t.plan(3)
  403. var source = from(['wa', 'dup'])
  404. var instance = cloneable(source)
  405. var clone = instance.clone()
  406. var data = ''
  407. var data2 = ''
  408. var ends = 2
  409. function latch () {
  410. if (--ends === 0) {
  411. t.equal(data, data2)
  412. }
  413. }
  414. instance.on('data', function (chunk) {
  415. data += chunk.toString()
  416. })
  417. process.nextTick(function () {
  418. t.equal('', data, 'nothing was written yet')
  419. t.equal('', data2, 'nothing was written yet')
  420. clone.on('data', function (chunk) {
  421. data2 += chunk.toString()
  422. })
  423. })
  424. instance.on('end', latch)
  425. clone.on('end', latch)
  426. })
  427. test('waits till all are flowing', function (t) {
  428. t.plan(1)
  429. var source = from(['wa', 'dup'])
  430. var instance = cloneable(source)
  431. // we create a clone
  432. instance.clone()
  433. instance.on('data', function (chunk) {
  434. t.fail('this should never happen')
  435. })
  436. process.nextTick(function () {
  437. t.pass('wait till nextTick')
  438. })
  439. })
  440. test('isCloneable', function (t) {
  441. t.plan(4)
  442. var source = from(['hello', ' ', 'world'])
  443. t.notOk(cloneable.isCloneable(source), 'a generic readable is not cloneable')
  444. var instance = cloneable(source)
  445. t.ok(cloneable.isCloneable(instance), 'a cloneable is cloneable')
  446. var clone = instance.clone()
  447. t.ok(cloneable.isCloneable(clone), 'a clone is cloneable')
  448. var cloneClone = clone.clone()
  449. t.ok(cloneable.isCloneable(cloneClone), 'a clone of a clone is cloneable')
  450. })