/*
There are many situations where we want to do lots of little jobs
in parallel and with few constraints as to their ordering.
One example is recursing over a bunch of directories and reading files.

The naive way to do this is to recurse over all the subdirectories
relative to a root while adding files to a list, then to iterate over
the files in that list. Unfortunately, this means holding the complete
list of file paths in memory, which can't possibly scale as our database grows.

A better way to do this is to recurse into one directory and
iterate over its contents until there are no more, then to backtrack
to the next directory and repeat until no more directories exist.

This kind of thing is easy enough when you perform one task at a time
and use synchronous code, but with multiple asynchronous tasks it's
easy to introduce subtle bugs.

This module is designed for these situations. It allows you to easily
and efficiently schedule a large number of tasks with an associated
degree of priority from 0 (highest priority) to Number.MAX_SAFE_INTEGER.

Initialize your scheduler with a degree of parallelism, and start planning
some initial jobs. Set it to run and it will keep going until all jobs are
complete, at which point it will optionally execute a 'done' callback.

Getting back to the original example:

List the contents of the root directory, then plan subsequent jobs
with a priority of 1 to recurse into subdirectories. The callback
of each of these recursions can then plan higher priority tasks
to actually process the contained files with a priority of 0.

As long as there are more files scheduled it will continue to process
them first. When there are no more files the scheduler will read
the next directory and repopulate the list of files to process.
This will repeat until everything is done.

// load the module
const Plan = require("./plan");

// instantiate a scheduler with a parallelism of 5
var plan = Plan(5)

// plan the first job which schedules more jobs...
.job(1, function (next) {
    listRootDirectory(function (files) {
        files.forEach(function (file) {
            // highest priority, run as soon as there is a free worker
            plan.job(0, function (next) {
                processFile(file, function (result) {
                    console.log(result);
                    // don't forget to call next
                    next();
                });
            });
        });
        next(); // call 'next' to free up one worker
    });
})

// chain commands together if you want
.done(function () {
    console.log("DONE");
})

// it won't run unless you launch it
.start();
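
For a more concrete sketch of the directory-walking pattern described above,
here is roughly what it could look like with Node's 'fs' and 'path' modules
(illustrative only: the root path is made up and error handling is omitted):

var Fs = require("fs");
var Path = require("path");

var plan = Plan(5);
var walk = function (dir) {
    // lower priority: read a directory's listing
    plan.job(1, function (next) {
        Fs.readdir(dir, function (err, names) {
            (names || []).forEach(function (name) {
                var path = Path.join(dir, name);
                // highest priority: handle each entry as soon as a worker is free
                plan.job(0, function (next) {
                    Fs.stat(path, function (err, stats) {
                        if (stats && stats.isDirectory()) {
                            walk(path); // recurse with another priority-1 job
                        } else {
                            console.log(path); // "process" the file
                        }
                        next();
                    });
                });
            });
            next();
        });
    });
};
walk("/some/root/directory");
plan.done(function () { console.log("DONE"); }).start();
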
*/
module.exports = function (max) {
    var plan = {};
    max = max || 5;

    // finds an id that isn't in use in a particular map
    // accepts an id in case you have one already chosen
    // otherwise generates random new ids if one is not passed
    // or if there is a collision
    var uid = function (map, id) {
        if (typeof(id) === 'undefined') {
            id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
        }
        if (id && typeof(map[id]) === 'undefined') {
            return id;
        }
        return uid(map);
    };
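
    // For illustration (a sketch, not a behaviour guarantee; 'doSomething' is
    // just a placeholder function):
    //   var bag = { 3: doSomething };
    //   uid(bag, 7); // returns 7, since it isn't taken
    //   uid(bag, 3); // 3 collides, so a fresh random id is returned
    //   uid(bag);    // no id passed, so a fresh random id is returned
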
    // the queue of jobs is an array, which will be populated
    // with maps for each level of priority
    var jobs = [];

    // the count of currently running jobs
    var count = 0;

    // a list of callbacks to be executed once everything is done
    var completeHandlers = [];

    // the recommended usage is to create a new scheduler for every job,
    // use it for internals in a scope, and let the garbage collector
    // clean up when everything stops. This means you shouldn't
    // go passing 'plan' around in a long-lived process!
    var FINISHED = false;

    var done = function () {
        // 'done' gets called when there are no more jobs in the queue
        // but other jobs might still be running...

        // the count of running processes should never be less than zero
        // because we guard against multiple callbacks
        if (count < 0) { throw new Error("should never happen"); }

        // greater than zero is definitely possible, it just means you aren't done yet
        if (count !== 0) { return; }

        // you will finish twice if you call 'start' a second time.
        // this behaviour isn't supported yet.
        if (FINISHED) { throw new Error('finished twice'); }
        FINISHED = true;

        // execute all your 'done' callbacks
        completeHandlers.forEach(function (f) { f(); });
    };
    var run;

    // this 'next' is internal only.
    // it iterates over all known jobs, running them until
    // the scheduler achieves the desired degree of parallelism.
    // If there are no more jobs it will call 'done',
    // which will return early if tasks are still running.
    // Whenever any task finishes it will return its lock and
    // run as many new jobs as are allowed.
    var next = function () {
        // array.some skips over bare indexes in sparse arrays
        var pending = jobs.some(function (bag /*, priority*/) {
            if (!bag || typeof(bag) !== 'object') { return; }
            // a bag is a map of jobs for any particular degree of priority.
            // iterate over jobs in the bag until you're out of 'workers'
            for (var id in bag) {
                // bail out if you hit max parallelism
                if (count >= max) { return true; }
                run(bag, id);
            }
        });
        // check whether you're done if you hit the end of the array
        if (!pending) { done(); }
    };
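
    // To sketch the flow with hypothetical numbers: suppose max is 2 and the
    // queue holds priority-0 jobs {a, b, c} and a priority-1 job {d}.
    // next() starts a and b, hits the parallelism limit, and stops iterating.
    // When a's callback fires, count drops to 1 and next() starts c.
    // d only gets a worker once no priority-0 jobs remain, and done() only
    // fires once the queue is empty and count has returned to zero.
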
    // and here's the part that actually handles jobs...
    run = function (bag, id) {
        // this is just a sanity check.
        // a bag should only ever contain functions (jobs).
        if (typeof(bag[id]) !== 'function') {
            throw new Error("expected function");
        }

        // keep a local reference to the function
        var f = bag[id];
        // remove it from the bag.
        delete bag[id];

        // increment the count of running jobs
        count++;

        // guard against the callback being called twice.
        var called = false;
        f(function () {
            // watch out! it'll bite you.
            // maybe this should just return?
            // support that option for 'production' ?
            if (called) { throw new Error("called twice"); }

            // the code below is safe because we can't call back a second time
            called = true;

            // decrement the count of running jobs...
            count--;

            // and finally call next to replace this worker with more job(s)
            next();
        });
    };
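
    // For example, a careless job like this (illustrative only) would throw
    // "called twice" the second time it invokes its callback:
    //   plan.job(0, function (next) { next(); next(); });
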
    // this is exposed as API
    plan.job = function (priority, cb) {
        // you have to pass both the priority (a non-negative number) and an actual job
        if (typeof(priority) !== 'number' || priority < 0) {
            throw new Error('expected a non-negative number');
        }

        // a job is an asynchronous function that takes a single parameter:
        // a 'next' callback which will keep the whole thing going.
        // forgetting to call 'next' means you'll never complete.
        if (typeof(cb) !== 'function') { throw new Error('expected function'); }

        // initialize the specified priority level if it doesn't already exist
        var bag = jobs[priority] = jobs[priority] || {};

        // choose a random id that isn't already in use for this priority level
        var id = uid(bag);

        // add the job to this priority level's bag.
        // most javascript engines will iterate the bag roughly in insertion order,
        // which makes it behave like a FIFO queue, but integer-like keys (such as
        // these random ids) can be iterated in numeric order instead.
        // Either way, don't ever rely on the ordering of jobs within a bag.
        bag[id] = function (next) {
            cb(next);
        };

        // returning 'plan' lets us chain methods together.
        return plan;
    };
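
    // A minimal usage sketch (setTimeout stands in for any async work):
    //   plan.job(0, function (next) {
    //       setTimeout(function () {
    //           console.log("did some work");
    //           next(); // hand the worker back to the scheduler
    //       }, 50);
    //   });
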
    var started = false;
    plan.start = function () {
        // don't allow multiple starts.
        // even though it should work, it's simpler not to.
        if (started) { return plan; }

        // this seems to imply a 'stop' method
        // but I don't need it, so I'm not implementing it now --ansuz
        started = true;

        // start asynchronously, otherwise jobs will start running
        // before you've had a chance to return 'plan', and weird things
        // happen.
        setTimeout(function () {
            next();
        });
        return plan;
    };
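
    // Because start() defers the first call to 'next' with setTimeout, a chain
    // like the following (illustrative only; 'someJob' and 'someHandler' are
    // placeholders) is safe: no job begins running until after the whole chain
    // has been built.
    //   Plan(2).job(0, someJob).done(someHandler).start();
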
    // you can pass any number of functions to be executed
    // when all pending jobs are complete.
    // They are called with no arguments, so collect any results
    // yourself if you need them.
    plan.done = function (f) {
        if (typeof(f) !== 'function') { throw new Error('expected function'); }
        completeHandlers.push(f);
        return plan;
    };
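
    // For example (illustrative only), handlers run in the order they were added:
    //   plan.done(function () { console.log("first"); })
    //       .done(function () { console.log("second"); });
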
    // That's all! I hope you had fun reading this!
    return plan;
};