You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

367 lines
11 KiB

  1. /* jshint esversion: 6 */
  2. /* global process */
  3. const Util = require("../common-util");
  4. const nThen = require('nthen');
  5. const OS = require("os");
  6. const { fork } = require('child_process');
  7. const Workers = module.exports;
  8. const PID = process.pid;
  9. const DB_PATH = 'lib/workers/db-worker';
  10. const MAX_JOBS = 16;
  11. Workers.initialize = function (Env, config, _cb) {
  // Wrap the caller's completion callback with `Util.mkAsync` and `Util.once`
  // (presumably: deferred, and callable at most once -- confirm in common-util).
  const cb = Util.once(Util.mkAsync(_cb));

  // Every initialized worker is tracked here as a `{worker, tasks}` record.
  const workers = [];

  // Tracker for outstanding transaction ids. The handler passed here logs
  // whatever the tracker reports, under a 'HK_DB_WORKER__'-prefixed label.
  const response = Util.response((errLabel, info) => {
    Env.Log.error('HK_DB_WORKER__' + errLabel, info);
  });

  // Local shorthand for the environment's logger.
  const Log = Env.Log;
  /*  Relay a log entry to the logger method named by `level`.
      Entries whose level does not match a function on `Log` are dropped.
  */
  const handleLog = function (level, label, info) {
    const logMethod = Log[level];
    if (typeof logMethod === 'function') {
      // keep `Log` as the receiver, exactly as `Log[level](...)` would
      logMethod.call(Log, label, info);
    }
  };
  /*  Check that `value` looks like a worker state record, i.e. that it has a
      `worker` member exposing a `send` function.
      Returns `true`/`false` when `value.worker` is truthy; otherwise the
      falsy operand itself (`value` or `value.worker`) is returned.
  */
  var isWorker = function (value) {
    if (!value) { return value; }
    var handle = value.worker;
    if (!handle) { return handle; }
    return typeof handle.send === 'function';
  };
  // Generate a transaction id, drawing again for as long as the response
  // tracker reports that the candidate is already expected.
  const guid = function () {
    let candidate = Util.uid();
    while (response.expected(candidate)) {
      candidate = Util.uid();
    }
    return candidate;
  };
  // Offset at which the previous search for a free worker started.
  // It advances by one on every search so consecutive jobs start at
  // different workers.
  var workerOffset = -1;

  // FIFO backlog of `{msg, cb}` jobs waiting for a worker with a free slot.
  var queue = [];

  /*  Find a worker that can accept another job.

      isQueue: truthy when the job being placed was just taken from the head
          of `queue`. Such a job must not be sent to "the end of the line"
          merely because other jobs are still waiting behind it.

      Returns the index of a worker in `workers`, or -1 if none can take a job.
  */
  var getAvailableWorkerIndex = function (isQueue) {
    // If there is already a backlog of tasks you can avoid some work
    // by going to the end of the line (unless you ARE the head of the line)
    if (!isQueue && queue.length) { return -1; }

    var L = workers.length;
    if (L === 0) {
      Log.error('NO_WORKERS_AVAILABLE', {
        queue: queue.length,
      });
      return -1;
    }

    // cycle through the workers once
    // start from a different offset each time
    // return -1 if none are available
    workerOffset = (workerOffset + 1) % L;

    var temp;
    var candidate;
    for (let i = 0; i < L; i++) {
      temp = (workerOffset + i) % L;
      candidate = workers[temp];
      /*  Count the jobs currently assigned to this worker. The count is taken
          from `tasks` itself (not from a separate counter) so it cannot drift
          out of sync when a worker crashes and its tasks are re-assigned.
          Note that it must be the keys of `candidate.tasks` which are counted:
          the state record itself always has exactly two keys, which would
          make every worker look idle.
      */
      if (candidate && candidate.tasks &&
          Object.keys(candidate.tasks).length < MAX_JOBS) {
        return temp;
      }
    }
    return -1;
  };

  /*  Dispatch `msg` to a worker, or queue it until a worker has a free slot.

      msg: command object; `txid` and `pid` are assigned to it on dispatch
      _cb: callback invoked with the worker's `(error, value)`
      opt: optional; `{isQueue: true}` marks a job taken from the head of
          `queue` (see `handleResponse`)
  */
  var sendCommand = function (msg, _cb, opt) {
    var fromQueue = Boolean(opt && opt.isQueue);
    var index = getAvailableWorkerIndex(fromQueue);
    var state = workers[index];

    // if there is no worker available:
    if (!isWorker(state)) {
      // queue the message for when one becomes available
      var job = {
        msg: msg,
        cb: _cb,
      };
      // a job which came from the head of the queue goes back to the head,
      // so that the backlog keeps its order
      if (fromQueue) {
        queue.unshift(job);
      } else {
        queue.push(job);
      }
      return;
    }

    const txid = guid();
    var done = Util.mkAsync(_cb);
    var cb = Util.once(function () {
      // Free the worker's slot however the expectation is settled.
      // `handleResponse` also does this for ordinary replies, but a slot
      // must not leak if the expectation is settled some other way
      // (presumably the 60s timeout given to `response.expect` below).
      delete state.tasks[txid];
      done.apply(null, arguments);
    });

    msg.txid = txid;
    msg.pid = PID;

    // track which worker is doing which jobs
    state.tasks[txid] = msg;
    response.expect(txid, cb, 60000);
    state.worker.send(msg);
  };

  /*  Handle one message received from the worker described by `state`.

      Log entries are relayed to the logger, messages addressed to another
      process id are reported and dropped, and replies are routed to the
      callback registered for their `txid`. Once a reply has been handled
      the oldest queued job (if any) is dispatched.
  */
  var handleResponse = function (state, res) {
    if (!res) { return; }
    // handle log messages before checking if it was addressed to your PID
    // it might still be useful to know what happened inside an orphaned worker
    if (res.log) {
      return void handleLog(res.log, res.label, res.info);
    }
    // but don't bother handling things addressed to other processes
    // since it's basically guaranteed not to work
    if (res.pid !== PID) {
      return void Log.error("WRONG_PID", res);
    }

    if (!res.txid) { return; }
    response.handle(res.txid, [res.error, res.value]);
    delete state.tasks[res.txid];
    if (!queue.length) { return; }

    var nextMsg = queue.shift();
    /*  `nextMsg` was at the top of the queue.
        A job just finished and all of this code is synchronous, so this
        call should take the slot which was just freed. `isQueue` tells
        `sendCommand` that this job already waited its turn: without it the
        job would be pushed to the back whenever other jobs are still
        queued, and the queue would cycle without anything ever running.
    */
    sendCommand(nextMsg.msg, nextMsg.cb, {
      isQueue: true,
    });
  };
  /*  Register a freshly forked worker process.

      The worker is sent an init message carrying this process's PID and
      the configuration. Once it answers, its state record is added to
      `workers` and `cb(undefined, state)` is called; if the handshake
      fails, `cb(err)` is called instead. Should the process exit, close or
      emit an error, its unfinished tasks are re-submitted and a replacement
      worker is forked.

      worker: child process handle (must provide `send` and `on`)
      cb: function (err, state)
  */
  const initWorker = function (worker, cb) {
    const txid = guid();

    const state = {
      worker: worker,
      tasks: {},
    };

    response.expect(txid, function (err) {
      if (err) { return void cb(err); }
      workers.push(state);
      cb(void 0, state);

      // Jobs may have been queued while no worker was registered (for
      // example while a crashed worker was being replaced). Nothing else
      // dispatches them until some reply arrives, so hand them out now.
      // The backlog is emptied first so `sendCommand` does not see it.
      queue.splice(0).forEach(function (job) {
        sendCommand(job.msg, job.cb);
      });
    }, 15000);

    worker.send({
      pid: PID,
      txid: txid,
      config: config,
    });

    worker.on('message', function (res) {
      handleResponse(state, res);
    });

    // 'exit', 'close' and 'error' can all fire for the same dead process,
    // so the replacement logic is guarded to run only once.
    var substituteWorker = Util.once(function () {
      Env.Log.info("SUBSTITUTE_DB_WORKER", '');
      var idx = workers.indexOf(state);
      if (idx !== -1) {
        workers.splice(idx, 1);
      }

      // re-submit every job the dead worker had not answered yet
      Object.keys(state.tasks).forEach(function (taskId) {
        const pendingCb = response.expectation(taskId);
        if (typeof(pendingCb) !== 'function') { return; }
        // `tasks[txid]` holds the command object which was sent to the worker
        const task = state.tasks[taskId];
        if (!task) { return; }
        response.clear(taskId);
        Log.info('DB_WORKER_RESEND', task);
        sendCommand(task, pendingCb);
      });

      var w = fork(DB_PATH);
      initWorker(w, function (err) {
        // `initWorker` has already added the replacement to `workers`;
        // adding it again here would register it twice.
        if (err) {
          throw new Error(err);
        }
      });
    });

    worker.on('exit', substituteWorker);
    worker.on('close', substituteWorker);
    worker.on('error', function (err) {
      substituteWorker();
      Env.Log.error("DB_WORKER_ERROR", {
        error: err,
      });
    });
  };
  nThen(function (w) {
    // Step 1: decide how many workers to run, then fork and initialize them.
    const max = config.maxWorkers;

    var limit;
    if (typeof(max) !== 'undefined') {
      // the admin provided a limit on the number of workers
      if (typeof(max) === 'number' && !isNaN(max)) {
        if (max < 1) {
          // at least one worker is required: without one every command
          // would be queued forever
          Log.info("INSUFFICIENT_MAX_WORKERS", max);
          limit = 1;
        } else {
          limit = max;
        }
      } else {
        // not a usable number: report it and fall back to "no limit"
        Log.error("INVALID_MAX_WORKERS", '[' + max + ']');
      }
    }

    // One worker per logical CPU, up to the limit. `os.cpus()` can return
    // an empty array when CPU information is unavailable, so always plan
    // for at least one worker.
    const cpuCount = Math.max(1, OS.cpus().length);

    for (let index = 0; index < cpuCount; index++) {
      if (limit && index >= limit) {
        Log.info('WORKER_LIMIT', "(Opting not to use available CPUs beyond " + index + ')');
        break;
      }

      initWorker(fork(DB_PATH), w(function (err) {
        if (!err) { return; }
        // a worker failed to initialize: skip the next step and report it
        w.abort();
        return void cb(err);
      }));
    }
  }).nThen(function () {
    // Step 2: expose the worker-backed operations on `Env`.
    // Each one builds a command object and passes it to `sendCommand`.

    // COMPUTE_INDEX for `channel`, under the channel's weak lock.
    // `next()` is called before `cb` (presumably releasing the lock).
    // NOTE: the first parameter shadows the outer `Env`.
    Env.computeIndex = function (Env, channel, cb) {
      Env.store.getWeakLock(channel, function (next) {
        sendCommand({
          channel: channel,
          command: 'COMPUTE_INDEX',
        }, function (err, index) {
          next();
          cb(err, index);
        });
      });
    };

    // COMPUTE_METADATA for `channel`, under the channel's weak lock.
    Env.computeMetadata = function (channel, cb) {
      Env.store.getWeakLock(channel, function (next) {
        sendCommand({
          channel: channel,
          command: 'COMPUTE_METADATA',
        }, function (err, metadata) {
          next();
          cb(err, metadata);
        });
      });
    };

    // GET_OLDER_HISTORY for `channel`, under the channel's weak lock.
    Env.getOlderHistory = function (channel, oldestKnownHash, desiredMessages, desiredCheckpoint, cb) {
      Env.store.getWeakLock(channel, function (next) {
        sendCommand({
          channel: channel,
          command: "GET_OLDER_HISTORY",
          hash: oldestKnownHash,
          desiredMessages: desiredMessages,
          desiredCheckpoint: desiredCheckpoint,
        }, Util.both(next, cb));
      });
    };

    // GET_PIN_STATE for `safeKey`, under the pin store's weak lock.
    Env.getPinState = function (safeKey, cb) {
      Env.pinStore.getWeakLock(safeKey, function (next) {
        sendCommand({
          key: safeKey,
          command: 'GET_PIN_STATE',
        }, Util.both(next, cb));
      });
    };

    // GET_FILE_SIZE for a single channel (no lock is taken).
    Env.getFileSize = function (channel, cb) {
      sendCommand({
        command: 'GET_FILE_SIZE',
        channel: channel,
      }, cb);
    };

    // GET_DELETED_PADS for a list of channels.
    Env.getDeletedPads = function (channels, cb) {
      sendCommand({
        command: "GET_DELETED_PADS",
        channels: channels,
      }, cb);
    };

    // GET_TOTAL_SIZE for a list of channels.
    Env.getTotalSize = function (channels, cb) {
      // we could take out locks for all of these channels,
      // but it's OK if the size is slightly off
      sendCommand({
        command: 'GET_TOTAL_SIZE',
        channels: channels,
      }, cb);
    };

    // GET_MULTIPLE_FILE_SIZE for a list of channels.
    Env.getMultipleFileSize = function (channels, cb) {
      sendCommand({
        command: "GET_MULTIPLE_FILE_SIZE",
        channels: channels,
      }, cb);
    };

    // GET_HASH_OFFSET for `hash` in `channel`, under the channel's weak lock.
    Env.getHashOffset = function (channel, hash, cb) {
      Env.store.getWeakLock(channel, function (next) {
        sendCommand({
          command: 'GET_HASH_OFFSET',
          channel: channel,
          hash: hash,
        }, Util.both(next, cb));
      });
    };

    // REMOVE_OWNED_BLOB for `blobId` on behalf of `safeKey`.
    Env.removeOwnedBlob = function (blobId, safeKey, cb) {
      sendCommand({
        command: 'REMOVE_OWNED_BLOB',
        blobId: blobId,
        safeKey: safeKey,
      }, cb);
    };

    // RUN_TASKS (takes no arguments).
    Env.runTasks = function (cb) {
      sendCommand({
        command: 'RUN_TASKS',
      }, cb);
    };

    // WRITE_TASK: `command` is sent as `task_command`, since the `command`
    // field already names the worker operation.
    Env.writeTask = function (time, command, args, cb) {
      sendCommand({
        command: 'WRITE_TASK',
        time: time,
        task_command: command,
        args: args,
      }, cb);
    };

    // Synchronous crypto functions
    Env.validateMessage = function (signedMsg, key, cb) {
      sendCommand({
        msg: signedMsg,
        key: key,
        command: 'INLINE',
      }, cb);
    };

    Env.checkSignature = function (signedMsg, signature, publicKey, cb) {
      sendCommand({
        command: 'DETACHED',
        sig: signature,
        msg: signedMsg,
        key: publicKey,
      }, cb);
    };

    // HASH_CHANNEL_LIST for a list of channels.
    Env.hashChannelList = function (channels, cb) {
      sendCommand({
        command: 'HASH_CHANNEL_LIST',
        channels: channels,
      }, cb);
    };

    // every worker is ready and the API is installed
    cb(void 0);
  });
  315. };