index.js 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. //filter will reemit the data if cb(err,pass) pass is truthy
  2. // reduce is more tricky
  3. // maybe we want to group the reductions or emit progress updates occasionally
  4. // the most basic reduce just emits one 'data' event after it has recieved 'end'
  5. var through = require('through')
  6. var Decoder = require('string_decoder').StringDecoder
  7. module.exports = split
  8. //TODO pass in a function to map across the lines.
  9. function split (matcher, mapper) {
  10. var decoder = new Decoder()
  11. var soFar = ''
  12. if('function' === typeof matcher)
  13. mapper = matcher, matcher = null
  14. if (!matcher)
  15. matcher = /\r?\n/
  16. function emit(stream, piece) {
  17. if(mapper) {
  18. try {
  19. piece = mapper(piece)
  20. }
  21. catch (err) {
  22. return stream.emit('error', err)
  23. }
  24. if('undefined' !== typeof piece)
  25. stream.queue(piece)
  26. }
  27. else
  28. stream.queue(piece)
  29. }
  30. function next (stream, buffer) {
  31. var pieces = (soFar + buffer).split(matcher)
  32. soFar = pieces.pop()
  33. for (var i = 0; i < pieces.length; i++) {
  34. var piece = pieces[i]
  35. emit(stream, piece)
  36. }
  37. }
  38. return through(function (b) {
  39. next(this, decoder.write(b))
  40. },
  41. function () {
  42. if(decoder.end)
  43. next(this, decoder.end())
  44. if(soFar != null)
  45. emit(this, soFar)
  46. this.queue(null)
  47. })
  48. }