You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

291 lines
10 KiB

11 months ago
11 months ago
  1. /* jshint esversion: 6 */
  2. const nThen = require('nthen');
  3. const Crypto = require('crypto');
  4. const WriteQueue = require("./write-queue");
  5. const BatchRead = require("./batch-read");
  6. const RPC = require("./rpc");
  7. const HK = require("./hk-util.js");
  8. const Core = require("./commands/core");
  9. const Store = require("./storage/file");
  10. const BlobStore = require("./storage/blob");
  11. const Workers = require("./workers/index");
  12. module.exports.create = function (config, cb) {
  13. const Log = config.log;
  14. var WARN = function (e, output) {
  15. if (e && output) {
  16. Log.warn(e, {
  17. output: output,
  18. message: String(e),
  19. stack: new Error(e).stack,
  20. });
  21. }
  22. };
  23. Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
  24. // TODO populate Env with everything that you use from config
  25. // so that you can stop passing around your raw config
  26. // and more easily share state between historyKeeper and rpc
  27. const Env = {
  28. Log: Log,
  29. // tasks
  30. // store
  31. id: Crypto.randomBytes(8).toString('hex'),
  32. metadata_cache: {},
  33. channel_cache: {},
  34. queueStorage: WriteQueue(),
  35. queueDeletes: WriteQueue(),
  36. batchIndexReads: BatchRead("HK_GET_INDEX"),
  37. batchMetadata: BatchRead('GET_METADATA'),
  38. batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"),
  39. batchDiskUsage: BatchRead('GET_DISK_USAGE'),
  40. batchUserPins: BatchRead('LOAD_USER_PINS'),
  41. batchTotalSize: BatchRead('GET_TOTAL_SIZE'),
  42. //historyKeeper: config.historyKeeper,
  43. intervals: config.intervals || {},
  44. maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
  45. premiumUploadSize: false, // overridden below...
  46. Sessions: {},
  47. paths: {},
  48. //msgStore: config.store,
  49. netfluxUsers: {},
  50. pinStore: undefined,
  51. pinnedPads: {},
  52. pinsLoaded: false,
  53. pendingPinInquiries: {},
  54. pendingUnpins: {},
  55. pinWorkers: 5,
  56. limits: {},
  57. admins: [],
  58. WARN: WARN,
  59. flushCache: config.flushCache,
  60. adminEmail: config.adminEmail,
  61. allowSubscriptions: config.allowSubscriptions === true,
  62. blockDailyCheck: config.blockDailyCheck === true,
  63. myDomain: config.myDomain,
  64. mySubdomain: config.mySubdomain, // only exists for the accounts integration
  65. customLimits: config.customLimits || {},
  66. // FIXME this attribute isn't in the default conf
  67. // but it is referenced in Quota
  68. domain: config.domain
  69. };
  70. (function () {
  71. var pes = config.premiumUploadSize;
  72. if (!isNaN(pes) && pes >= Env.maxUploadSize) {
  73. Env.premiumUploadSize = pes;
  74. }
  75. }());
  76. var paths = Env.paths;
  77. var keyOrDefaultString = function (key, def) {
  78. return typeof(config[key]) === 'string'? config[key]: def;
  79. };
  80. var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins');
  81. paths.block = keyOrDefaultString('blockPath', './block');
  82. paths.data = keyOrDefaultString('filePath', './datastore');
  83. paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
  84. paths.blob = keyOrDefaultString('blobPath', './blob');
  85. Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit >= 0?
  86. config.defaultStorageLimit:
  87. Core.DEFAULT_LIMIT;
  88. try {
  89. Env.admins = (config.adminKeys || []).map(function (k) {
  90. k = k.replace(/\/+$/, '');
  91. var s = k.split('/');
  92. return s[s.length-1];
  93. });
  94. } catch (e) {
  95. console.error("Can't parse admin keys. Please update or fix your config.js file!");
  96. }
  97. config.historyKeeper = Env.historyKeeper = {
  98. metadata_cache: Env.metadata_cache,
  99. channel_cache: Env.channel_cache,
  100. id: Env.id,
  101. channelMessage: function (Server, channel, msgStruct) {
  102. // netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel
  103. // historyKeeper stores these messages if the channel id indicates that they are
  104. // a channel type with permanent history
  105. HK.onChannelMessage(Env, Server, channel, msgStruct);
  106. },
  107. channelClose: function (channelName) {
  108. // netflux-server emits 'channelClose' events whenever everyone leaves a channel
  109. // we drop cached metadata and indexes at the same time
  110. HK.dropChannel(Env, channelName);
  111. },
  112. channelOpen: function (Server, channelName, userId, wait) {
  113. Env.channel_cache[channelName] = Env.channel_cache[channelName] || {};
  114. var sendHKJoinMessage = function () {
  115. Server.send(userId, [
  116. 0,
  117. Env.id,
  118. 'JOIN',
  119. channelName
  120. ]);
  121. };
  122. // a little backwards compatibility in case you don't have the latest server
  123. // allow lists won't work unless you update, though
  124. if (typeof(wait) !== 'function') { return void sendHKJoinMessage(); }
  125. var next = wait();
  126. var cb = function (err, info) {
  127. next(err, info, sendHKJoinMessage);
  128. };
  129. // only conventional channels can be restricted
  130. if ((channelName || "").length !== HK.STANDARD_CHANNEL_LENGTH) {
  131. return void cb();
  132. }
  133. // gets and caches the metadata...
  134. HK.getMetadata(Env, channelName, function (err, metadata) {
  135. if (err) {
  136. Log.error('HK_METADATA_ERR', {
  137. channel: channelName,
  138. error: err,
  139. });
  140. }
  141. if (!metadata || (metadata && !metadata.restricted)) {
  142. // the channel doesn't have metadata, or it does and it's not restricted
  143. // either way, let them join.
  144. return void cb();
  145. }
  146. // this channel is restricted. verify that the user in question is in the allow list
  147. // construct a definitive list (owners + allowed)
  148. var allowed = HK.listAllowedUsers(metadata);
  149. // and get the list of keys for which this user has already authenticated
  150. var session = HK.getNetfluxSession(Env, userId);
  151. if (HK.isUserSessionAllowed(allowed, session)) {
  152. return void cb();
  153. }
  154. // otherwise they're not allowed.
  155. // respond with a special error that includes the list of keys
  156. // which would be allowed...
  157. // FIXME RESTRICT bonus points if you hash the keys to limit data exposure
  158. cb("ERESTRICTED", allowed);
  159. });
  160. },
  161. sessionClose: function (userId, reason) {
  162. HK.closeNetfluxSession(Env, userId);
  163. if (['BAD_MESSAGE', 'SEND_MESSAGE_FAIL_2'].indexOf(reason) !== -1) {
  164. if (reason && reason.code === 'ECONNRESET') { return; }
  165. return void Log.error('SESSION_CLOSE_WITH_ERROR', {
  166. userId: userId,
  167. reason: reason,
  168. });
  169. }
  170. if (['SOCKET_CLOSED', 'SOCKET_ERROR'].indexOf(reason)) { return; }
  171. Log.verbose('SESSION_CLOSE_ROUTINE', {
  172. userId: userId,
  173. reason: reason,
  174. });
  175. },
  176. directMessage: function (Server, seq, userId, json) {
  177. // netflux-server allows you to register an id with a handler
  178. // this handler is invoked every time someone sends a message to that id
  179. HK.onDirectMessage(Env, Server, seq, userId, json);
  180. },
  181. };
  182. Log.verbose('HK_ID', 'History keeper ID: ' + Env.id);
  183. nThen(function (w) {
  184. // create a pin store
  185. Store.create({
  186. filePath: pinPath,
  187. }, w(function (err, s) {
  188. if (err) { throw err; }
  189. Env.pinStore = s;
  190. }));
  191. // create a channel store
  192. Store.create(config, w(function (err, _store) {
  193. if (err) { throw err; }
  194. config.store = _store;
  195. Env.msgStore = _store; // API used by rpc
  196. Env.store = _store; // API used by historyKeeper
  197. }));
  198. // create a blob store
  199. BlobStore.create({
  200. blobPath: config.blobPath,
  201. blobStagingPath: config.blobStagingPath,
  202. archivePath: config.archivePath,
  203. getSession: function (safeKey) {
  204. return Core.getSession(Env.Sessions, safeKey);
  205. },
  206. }, w(function (err, blob) {
  207. if (err) { throw new Error(err); }
  208. Env.blobStore = blob;
  209. }));
  210. }).nThen(function (w) {
  211. Workers.initialize(Env, {
  212. blobPath: config.blobPath,
  213. blobStagingPath: config.blobStagingPath,
  214. taskPath: config.taskPath,
  215. pinPath: pinPath,
  216. filePath: config.filePath,
  217. archivePath: config.archivePath,
  218. channelExpirationMs: config.channelExpirationMs,
  219. verbose: config.verbose,
  220. openFileLimit: config.openFileLimit,
  221. maxWorkers: config.maxWorkers,
  222. }, w(function (err) {
  223. if (err) {
  224. throw new Error(err);
  225. }
  226. }));
  227. }).nThen(function (w) {
  228. // create a task store (for scheduling tasks)
  229. require("./storage/tasks").create(config, w(function (e, tasks) {
  230. if (e) { throw e; }
  231. Env.tasks = tasks;
  232. }));
  233. if (config.disableIntegratedTasks) { return; }
  234. config.intervals = config.intervals || {};
  235. var tasks_running;
  236. config.intervals.taskExpiration = setInterval(function () {
  237. if (tasks_running) { return; }
  238. tasks_running = true;
  239. Env.runTasks(function (err) {
  240. if (err) {
  241. Log.error('TASK_RUNNER_ERR', err);
  242. }
  243. tasks_running = false;
  244. });
  245. }, 1000 * 60 * 5); // run every five minutes
  246. }).nThen(function () {
  247. RPC.create(Env, function (err, _rpc) {
  248. if (err) { throw err; }
  249. Env.rpc = _rpc;
  250. cb(void 0, config.historyKeeper);
  251. });
  252. });
  253. };