You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
5.5 KiB

  1. var WriteQueue = require("./write-queue");
  2. var Util = require("./common-util");
  3. /* This module provides implements a FIFO scheduler
  4. which assumes the existence of three types of async tasks:
  5. 1. ordered tasks which must be executed sequentially
  6. 2. unordered tasks which can be executed in parallel
  7. 3. blocking tasks which must block the execution of all other tasks
  8. The scheduler assumes there will be many resources identified by strings,
  9. and that the constraints described above will only apply in the context
  10. of identical string ids.
  11. Many blocking tasks may be executed in parallel so long as they
  12. concern resources identified by different ids.
  13. USAGE:
  14. const schedule = require("./schedule")();
  15. // schedule two sequential tasks using the resource 'pewpew'
  16. schedule.ordered('pewpew', function (next) {
  17. appendToFile('beep\n', next);
  18. });
  19. schedule.ordered('pewpew', function (next) {
  20. appendToFile('boop\n', next);
  21. });
  22. // schedule a task that can happen whenever
  23. schedule.unordered('pewpew', function (next) {
  24. displayFileSize(next);
  25. });
  26. // schedule a blocking task which will wait
  27. // until the all unordered tasks have completed before commencing
  28. schedule.blocking('pewpew', function (next) {
  29. deleteFile(next);
  30. });
  31. // this will be queued for after the blocking task
  32. schedule.ordered('pewpew', function (next) {
  33. appendFile('boom', next);
  34. });
  35. */
  36. // return a uid which is not already in a map
  37. var unusedUid = function (set) {
  38. var uid = Util.uid();
  39. if (set[uid]) { return unusedUid(); }
  40. return uid;
  41. };
  42. // return an existing session, creating one if it does not already exist
  43. var lookup = function (map, id) {
  44. return (map[id] = map[id] || {
  45. //blocking: [],
  46. active: {},
  47. blocked: {},
  48. });
  49. };
  50. var isEmpty = function (map) {
  51. for (var key in map) {
  52. if (map.hasOwnProperty(key)) { return false; }
  53. }
  54. return true;
  55. };
  56. module.exports = function () {
  57. // every scheduler instance has its own queue
  58. var queue = WriteQueue();
  59. // ordered tasks don't require any extra logic
  60. var Ordered = function (id, task) {
  61. queue(id, task);
  62. };
  63. // unordered and blocking tasks need a little extra state
  64. var map = {};
  65. // regular garbage collection keeps memory consumption low
  66. var collectGarbage = function (id) {
  67. // avoid using 'lookup' since it creates a session implicitly
  68. var local = map[id];
  69. // bail out if no session
  70. if (!local) { return; }
  71. // bail out if there are blocking or active tasks
  72. if (local.lock) { return; }
  73. if (!isEmpty(local.active)) { return; }
  74. // if there are no pending actions then delete the session
  75. delete map[id];
  76. };
  77. // unordered tasks run immediately if there are no blocking tasks scheduled
  78. // or immediately after blocking tasks finish
  79. var runImmediately = function (local, task) {
  80. // set a flag in the map of active unordered tasks
  81. // to prevent blocking tasks from running until you finish
  82. var uid = unusedUid(local.active);
  83. local.active[uid] = true;
  84. task(function () {
  85. // remove the flag you set to indicate that your task completed
  86. delete local.active[uid];
  87. // don't do anything if other unordered tasks are still running
  88. if (!isEmpty(local.active)) { return; }
  89. // bail out if there are no blocking tasks scheduled or ready
  90. if (typeof(local.waiting) !== 'function') {
  91. return void collectGarbage();
  92. }
  93. setTimeout(local.waiting);
  94. });
  95. };
  96. var runOnceUnblocked = function (local, task) {
  97. var uid = unusedUid(local.blocked);
  98. local.blocked[uid] = function () {
  99. runImmediately(local, task);
  100. };
  101. };
  102. // 'unordered' tasks are scheduled to run in after the most recently received blocking task
  103. // or immediately and in parallel if there are no blocking tasks scheduled.
  104. var Unordered = function (id, task) {
  105. var local = lookup(map, id);
  106. if (local.lock) { return runOnceUnblocked(local, task); }
  107. runImmediately(local, task);
  108. };
  109. var runBlocked = function (local) {
  110. for (var task in local.blocked) {
  111. runImmediately(local, local.blocked[task]);
  112. }
  113. };
  114. // 'blocking' tasks must be run alone.
  115. // They are queued alongside ordered tasks,
  116. // and wait until any running 'unordered' tasks complete before commencing.
  117. var Blocking = function (id, task) {
  118. var local = lookup(map, id);
  119. queue(id, function (next) {
  120. // start right away if there are no running unordered tasks
  121. if (isEmpty(local.active)) {
  122. local.lock = true;
  123. return void task(function () {
  124. delete local.lock;
  125. runBlocked(local);
  126. next();
  127. });
  128. }
  129. // otherwise wait until the running tasks have completed
  130. local.waiting = function () {
  131. local.lock = true;
  132. task(function () {
  133. delete local.lock;
  134. delete local.waiting;
  135. runBlocked(local);
  136. next();
  137. });
  138. };
  139. });
  140. };
  141. return {
  142. ordered: Ordered,
  143. unordered: Unordered,
  144. blocking: Blocking,
  145. };
  146. };