You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

84 lines
2.9 KiB

  1. /* jshint esversion: 6 */
  2. /* global Buffer */
  3. const ToPull = require('stream-to-pull-stream');
  4. const Pull = require('pull-stream');
  5. const Stream = module.exports;
  6. // transform a stream of arbitrarily divided data
  7. // into a stream of buffers divided by newlines in the source stream
  8. // TODO see if we could improve performance by using libnewline
  9. const NEWLINE_CHR = ('\n').charCodeAt(0);
  10. const mkBufferSplit = () => {
  11. let remainder = null;
  12. return Pull((read) => {
  13. return (abort, cb) => {
  14. read(abort, function (end, data) {
  15. if (end) {
  16. if (data) { console.log("mkBufferSplit() Data at the end"); }
  17. cb(end, remainder ? [remainder, data] : [data]);
  18. remainder = null;
  19. return;
  20. }
  21. const queue = [];
  22. for (;;) {
  23. const offset = data.indexOf(NEWLINE_CHR);
  24. if (offset < 0) {
  25. remainder = remainder ? Buffer.concat([remainder, data]) : data;
  26. break;
  27. }
  28. let subArray = data.slice(0, offset);
  29. if (remainder) {
  30. subArray = Buffer.concat([remainder, subArray]);
  31. remainder = null;
  32. }
  33. queue.push(subArray);
  34. data = data.slice(offset + 1);
  35. }
  36. cb(end, queue);
  37. });
  38. };
  39. }, Pull.flatten());
  40. };
  41. // return a streaming function which transforms buffers into objects
  42. // containing the buffer and the offset from the start of the stream
  43. const mkOffsetCounter = () => {
  44. let offset = 0;
  45. return Pull.map((buff) => {
  46. const out = { offset: offset, buff: buff };
  47. // +1 for the eaten newline
  48. offset += buff.length + 1;
  49. return out;
  50. });
  51. };
  52. // readMessagesBin asynchronously iterates over the messages in a channel log
  53. // the handler for each message must call back to read more, which should mean
  54. // that this function has a lower memory profile than our classic method
  55. // of reading logs line by line.
  56. // it also allows the handler to abort reading at any time
  57. Stream.readFileBin = (stream, msgHandler, cb) => {
  58. //const stream = Fs.createReadStream(path, { start: start });
  59. let keepReading = true;
  60. Pull(
  61. ToPull.read(stream),
  62. mkBufferSplit(),
  63. mkOffsetCounter(),
  64. Pull.asyncMap((data, moreCb) => {
  65. msgHandler(data, moreCb, () => {
  66. try {
  67. stream.close();
  68. } catch (err) {
  69. console.error("READ_FILE_BIN_ERR", err);
  70. }
  71. keepReading = false;
  72. moreCb();
  73. });
  74. }),
  75. Pull.drain(() => (keepReading), (err) => {
  76. cb((keepReading) ? err : undefined);
  77. })
  78. );
  79. };