You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

259 lines
8.1 KiB

  1. /*jshint esversion: 6 */
  2. var Pins = module.exports;
  3. const Fs = require("fs");
  4. const Path = require("path");
  5. const Util = require("./common-util");
  6. const Plan = require("./plan");
  7. const Semaphore = require('saferphore');
  8. const nThen = require('nthen');
  /*  Build a function which folds the lines of a pin log into `ref`.

      Accepts a reference to an object, and...
        either a string describing which log is being processed (backwards compatibility),
        or a function(label, data) which will log the error with all relevant data.
        If neither is supplied, errors are reported with console.error.

      The returned function takes one line (string) of the log. It maintains:
        ref.pins    - object whose keys are the currently pinned ids
        ref.index   - number of lines seen so far (blank lines included)
        ref.latest  - the last numeric timestamp found at position 2 of a line
        ref.surplus - value of ref.index at the most recent RESET

      Each non-blank line is expected to be a JSON array: [command, list, time].
      Unparseable lines, non-array lines and unknown commands are reported
      through the error handler and otherwise ignored.
      PIN and UNPIN lines whose list is not an array still throw a TypeError.
  */
  var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) {
      if (typeof(errorHandler) !== 'function') {
          // a string names the log being processed (backwards compatibility);
          // with anything else the log name is simply reported as undefined
          // instead of crashing on the first malformed line
          var fileName = typeof(errorHandler) === 'string' ? errorHandler : undefined;
          errorHandler = function (label, data) {
              console.error(label, {
                  log: fileName,
                  data: data,
              });
          };
      }

      // passing the reference to an object allows us to overwrite accumulated pins
      // make sure to get ref.pins as the result
      // it's a weird API but it's faster than unpinning manually
      var pins = ref.pins = {};
      ref.index = 0;
      ref.latest = 0; // the latest message (timestamp in ms)
      ref.surplus = 0; // how many lines exist behind a reset

      return function (line) {
          ref.index++;
          if (!line) { return; }

          var l;
          try {
              l = JSON.parse(line);
          } catch (e) {
              return void errorHandler('PIN_LINE_PARSE_ERROR', line);
          }

          if (!Array.isArray(l)) {
              return void errorHandler('PIN_LINE_NOT_FORMAT_ERROR', l);
          }

          if (typeof(l[2]) === 'number') {
              ref.latest = l[2]; // date
          }

          switch (l[0]) {
              case 'RESET': {
                  // discard everything accumulated so far and start from the
                  // (optional) list carried by the RESET line itself
                  pins = ref.pins = {};
                  ref.surplus = ref.index;
                  // a RESET without a list is valid and just empties the set,
                  // so it must not fall through into the PIN branch
                  if (Array.isArray(l[1])) {
                      l[1].forEach((x) => { pins[x] = 1; });
                  }
                  break;
              }
              case 'PIN': {
                  l[1].forEach((x) => { pins[x] = 1; });
                  break;
              }
              case 'UNPIN': {
                  l[1].forEach((x) => { delete pins[x]; });
                  break;
              }
              default:
                  errorHandler("PIN_LINE_UNSUPPORTED_COMMAND", l);
          }
      };
  };
  /*
      Replay a whole pin log and report what is still pinned at the end.

      pinFile:  the contents of the pin file (UTF8 string)
      fileName: the pin file's name, handed to createLineHandler as the
                label used when reporting bad lines

      returns an array of the channel ids which are pinned
  */
  Pins.calculateFromLog = function (pinFile, fileName) {
      var state = {};
      var processLine = createLineHandler(state, fileName);
      var lines = pinFile.split('\n');
      for (var i = 0; i < lines.length; i++) {
          processLine(lines[i]);
      }
      return Object.keys(state.pins);
  };
  79. /*
  80. pins/
  81. pins/A+/
  82. pins/A+/A+hyhrQLrgYixOomZYxpuEhwfiVzKk1bBp+arH-zbgo=.ndjson
  83. */
  /*  Derive the safeKey (the log's file name without its extension)
      from the path of a pin log, eg.

          pins/A+/A+hyhr...zbgo=.ndjson  =>  A+hyhr...zbgo=

      Everything up to and including the last '/' is dropped, then a
      trailing '.ndjson' is removed. The extension pattern is anchored to
      the end of the name so that a '.ndjson' occurring earlier in the name
      is left alone.
  */
  const getSafeKeyFromPath = function (path) {
      return path.replace(/^.*\//, '').replace(/\.ndjson$/, '');
  };
  /*  Record in `state` that the user identified by `safeKey` pins `itemId`.

      `state` maps each item id to an object whose keys are the safeKeys
      of the users pinning it; the entry for an item is created on demand.
  */
  const addUserPinToState = Pins.addUserPinToState = function (state, safeKey, itemId) {
      var owners = state[itemId];
      if (!owners) {
          owners = state[itemId] = {};
      }
      owners[safeKey] = 1;
  };
  /*  Walk the pin store (pinPath/<prefix>/<safeKey>.ndjson), replay every
      pin log, and build a map of  itemId => { safeKey: 1, ... }.

      _done(err, pinned): called once, with an error or with the map.
          A missing pin store (ENOENT) yields an empty object.
      config (optional):
          pinPath: location of the pin store (default './data/pins')
          workers: amount of parallelism handed to Plan (default 5)
          handler: function (ref, safeKey, pinned) run after each log is read
          pinned:  an object to use and mutate as the resulting map
  */
  Pins.list = function (_done, config) {
      // tolerate a missing config so that the defaults below apply
      config = config || {};

      // allow for a configurable pin store location
      const pinPath = config.pinPath || './data/pins';

      // allow for a configurable amount of parallelism
      const plan = Plan(config.workers || 5);

      // run a supplied handler whenever you finish reading a log
      // or noop if not supplied.
      const handler = config.handler || function () {};

      // use and mutate a supplied object for state if it's passed
      const pinned = config.pinned || {};

      var isDone = false;
      // ensure that 'done' is only called once
      // that it calls back asynchronously
      // and that it sets 'isDone' to true, so that pending processes
      // know to abort
      const done = Util.once(Util.both(Util.mkAsync(_done), function () {
          isDone = true;
      }));

      // malformed lines are logged and skipped, they don't abort the listing
      const errorHandler = function (label, info) {
          console.log(label, info);
      };

      // read one pin log, replay it, and call back with the resulting ref
      // TODO replace this with lib-readline?
      const streamFile = function (path, cb) {
          const id = getSafeKeyFromPath(path);

          return void Fs.readFile(path, 'utf8', function (err, body) {
              if (err) { return void cb(err); }
              const ref = {};
              const pinHandler = createLineHandler(ref, errorHandler);
              var lines = body.split('\n');
              lines.forEach(pinHandler);
              handler(ref, id, pinned);
              cb(void 0, ref);
          });
      };

      // list a directory as [{ path, id }] where id is the entry's name
      // with a trailing '.ndjson' removed
      const scanDirectory = function (path, cb) {
          Fs.readdir(path, function (err, list) {
              if (err) {
                  return void cb(err);
              }
              cb(void 0, list.map(function (item) {
                  return {
                      path: Path.join(path, item),
                      id: item.replace(/\.ndjson$/, ''),
                  };
              }));
          });
      };

      scanDirectory(pinPath, function (err, dirs) {
          if (err) {
              if (err.code === 'ENOENT') { return void done(void 0, {}); }
              return void done(err);
          }
          dirs.forEach(function (dir) {
              plan.job(1, function (next) {
                  if (isDone) { return void next(); }
                  scanDirectory(dir.path, function (nested_err, logs) {
                      if (nested_err) {
                          // report the error from THIS scan; the outer 'err'
                          // is always falsy by the time we get here
                          return void done(nested_err);
                      }
                      logs.forEach(function (log) {
                          // anything which is not a pin log is ignored
                          if (!/\.ndjson$/.test(log.path)) { return; }
                          plan.job(0, function (next) {
                              if (isDone) { return void next(); }
                              streamFile(log.path, function (err, ref) {
                                  if (err) { return void done(err); }

                                  var set = ref.pins;
                                  for (var item in set) {
                                      addUserPinToState(pinned, log.id, item);
                                  }
                                  next();
                              });
                          });
                      });
                      next();
                  });
              });
          });

          plan.done(function () {
              // err ?
              done(void 0, pinned);
          }).start();
      });
  };
  /*  Read every pin log under pinPath/<subdirectory>/ and build a map of
      itemId => { safeKey: 1, ... }, where safeKey is the log's file name
      without its '.ndjson' extension.

      cb(err, pinned): called once, with an error or with the map.
          A missing pin store (ENOENT) yields an empty object.
      config (optional):
          pinPath: location of the pin store (default './pins')
          workers: size of the semaphore limiting concurrent reads (default 5)
          exclude: array of file names to skip
  */
  Pins.load = function (cb, config) {
      // tolerate a missing config so that the defaults below apply
      config = config || {};

      const sema = Semaphore.create(config.workers || 5);

      let dirList;
      const fileList = [];
      const pinned = {};

      var pinPath = config.pinPath || './pins';
      var done = Util.once(cb);

      nThen((waitFor) => {
          // recurse over the configured pinPath, or the default
          Fs.readdir(pinPath, waitFor((err, list) => {
              if (err) {
                  if (err.code === 'ENOENT') {
                      dirList = [];
                      return; // this ends up calling back with an empty object
                  }
                  waitFor.abort();
                  return void done(err);
              }
              dirList = list;
          }));
      }).nThen((waitFor) => {
          dirList.forEach((f) => {
              sema.take((returnAfter) => {
                  // iterate over all the subdirectories in the pin store
                  Fs.readdir(Path.join(pinPath, f), waitFor(returnAfter((err, list2) => {
                      if (err) {
                          waitFor.abort();
                          return void done(err);
                      }
                      list2.forEach((ff) => {
                          if (config.exclude && config.exclude.indexOf(ff) > -1) { return; }
                          fileList.push(Path.join(pinPath, f, ff));
                      });
                  })));
              });
          });
      }).nThen((waitFor) => {
          fileList.forEach((f) => {
              sema.take((returnAfter) => {
                  Fs.readFile(f, waitFor(returnAfter((err, content) => {
                      if (err) {
                          waitFor.abort();
                          return void done(err);
                      }
                      const hashes = Pins.calculateFromLog(content.toString('utf8'), f);
                      // the key is the same for every id in this log, so work
                      // it out once; a path which does not end in '.ndjson'
                      // is used unchanged
                      const safeKey = f.replace(/.*\/([^/]*)\.ndjson$/, '$1');
                      hashes.forEach((x) => {
                          (pinned[x] = pinned[x] || {})[safeKey] = 1;
                      });
                  })));
              });
          });
      }).nThen(() => {
          done(void 0, pinned);
      });
  };