You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

285 lines
10 KiB

11 months ago
11 months ago
9 months ago
  1. /* jshint esversion: 6 */
  2. const nThen = require('nthen');
  3. const Crypto = require('crypto');
  4. const WriteQueue = require("./write-queue");
  5. const BatchRead = require("./batch-read");
  6. const RPC = require("./rpc");
  7. const HK = require("./hk-util.js");
  8. const Core = require("./commands/core");
  9. const Store = require("./storage/file");
  10. const BlobStore = require("./storage/blob");
  11. const Workers = require("./workers/index");
  12. module.exports.create = function (config, cb) {
  13. const Log = config.log;
  14. var WARN = function (e, output) {
  15. if (e && output) {
  16. Log.warn(e, {
  17. output: output,
  18. message: String(e),
  19. stack: new Error(e).stack,
  20. });
  21. }
  22. };
  23. Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
  24. // TODO populate Env with everything that you use from config
  25. // so that you can stop passing around your raw config
  26. // and more easily share state between historyKeeper and rpc
  27. const Env = {
  28. Log: Log,
  29. // store
  30. id: Crypto.randomBytes(8).toString('hex'),
  31. metadata_cache: {},
  32. channel_cache: {},
  33. queueStorage: WriteQueue(),
  34. queueDeletes: WriteQueue(),
  35. batchIndexReads: BatchRead("HK_GET_INDEX"),
  36. batchMetadata: BatchRead('GET_METADATA'),
  37. batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"),
  38. batchDiskUsage: BatchRead('GET_DISK_USAGE'),
  39. batchUserPins: BatchRead('LOAD_USER_PINS'),
  40. batchTotalSize: BatchRead('GET_TOTAL_SIZE'),
  41. //historyKeeper: config.historyKeeper,
  42. intervals: config.intervals || {},
  43. maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
  44. premiumUploadSize: false, // overridden below...
  45. Sessions: {},
  46. paths: {},
  47. //msgStore: config.store,
  48. netfluxUsers: {},
  49. pinStore: undefined,
  50. pinnedPads: {},
  51. pinsLoaded: false,
  52. pendingPinInquiries: {},
  53. pendingUnpins: {},
  54. pinWorkers: 5,
  55. limits: {},
  56. admins: [],
  57. WARN: WARN,
  58. flushCache: config.flushCache,
  59. adminEmail: config.adminEmail,
  60. allowSubscriptions: config.allowSubscriptions === true,
  61. blockDailyCheck: config.blockDailyCheck === true,
  62. myDomain: config.myDomain,
  63. mySubdomain: config.mySubdomain, // only exists for the accounts integration
  64. customLimits: config.customLimits || {},
  65. // FIXME this attribute isn't in the default conf
  66. // but it is referenced in Quota
  67. domain: config.domain
  68. };
  69. (function () {
  70. var pes = config.premiumUploadSize;
  71. if (!isNaN(pes) && pes >= Env.maxUploadSize) {
  72. Env.premiumUploadSize = pes;
  73. }
  74. }());
  75. var paths = Env.paths;
  76. var keyOrDefaultString = function (key, def) {
  77. return typeof(config[key]) === 'string'? config[key]: def;
  78. };
  79. var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins');
  80. paths.block = keyOrDefaultString('blockPath', './block');
  81. paths.data = keyOrDefaultString('filePath', './datastore');
  82. paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
  83. paths.blob = keyOrDefaultString('blobPath', './blob');
  84. Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit >= 0?
  85. config.defaultStorageLimit:
  86. Core.DEFAULT_LIMIT;
  87. try {
  88. Env.admins = (config.adminKeys || []).map(function (k) {
  89. k = k.replace(/\/+$/, '');
  90. var s = k.split('/');
  91. return s[s.length-1];
  92. });
  93. } catch (e) {
  94. console.error("Can't parse admin keys. Please update or fix your config.js file!");
  95. }
  96. config.historyKeeper = Env.historyKeeper = {
  97. metadata_cache: Env.metadata_cache,
  98. channel_cache: Env.channel_cache,
  99. id: Env.id,
  100. channelMessage: function (Server, channel, msgStruct) {
  101. // netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel
  102. // historyKeeper stores these messages if the channel id indicates that they are
  103. // a channel type with permanent history
  104. HK.onChannelMessage(Env, Server, channel, msgStruct);
  105. },
  106. channelClose: function (channelName) {
  107. // netflux-server emits 'channelClose' events whenever everyone leaves a channel
  108. // we drop cached metadata and indexes at the same time
  109. HK.dropChannel(Env, channelName);
  110. },
  111. channelOpen: function (Server, channelName, userId, wait) {
  112. Env.channel_cache[channelName] = Env.channel_cache[channelName] || {};
  113. var sendHKJoinMessage = function () {
  114. Server.send(userId, [
  115. 0,
  116. Env.id,
  117. 'JOIN',
  118. channelName
  119. ]);
  120. };
  121. // a little backwards compatibility in case you don't have the latest server
  122. // allow lists won't work unless you update, though
  123. if (typeof(wait) !== 'function') { return void sendHKJoinMessage(); }
  124. var next = wait();
  125. var cb = function (err, info) {
  126. next(err, info, sendHKJoinMessage);
  127. };
  128. // only conventional channels can be restricted
  129. if ((channelName || "").length !== HK.STANDARD_CHANNEL_LENGTH) {
  130. return void cb();
  131. }
  132. // gets and caches the metadata...
  133. HK.getMetadata(Env, channelName, function (err, metadata) {
  134. if (err) {
  135. Log.error('HK_METADATA_ERR', {
  136. channel: channelName,
  137. error: err,
  138. });
  139. }
  140. if (!metadata || (metadata && !metadata.restricted)) {
  141. // the channel doesn't have metadata, or it does and it's not restricted
  142. // either way, let them join.
  143. return void cb();
  144. }
  145. // this channel is restricted. verify that the user in question is in the allow list
  146. // construct a definitive list (owners + allowed)
  147. var allowed = HK.listAllowedUsers(metadata);
  148. // and get the list of keys for which this user has already authenticated
  149. var session = HK.getNetfluxSession(Env, userId);
  150. if (HK.isUserSessionAllowed(allowed, session)) {
  151. return void cb();
  152. }
  153. // otherwise they're not allowed.
  154. // respond with a special error that includes the list of keys
  155. // which would be allowed...
  156. // FIXME RESTRICT bonus points if you hash the keys to limit data exposure
  157. cb("ERESTRICTED", allowed);
  158. });
  159. },
  160. sessionClose: function (userId, reason) {
  161. HK.closeNetfluxSession(Env, userId);
  162. if (['BAD_MESSAGE', 'SEND_MESSAGE_FAIL_2'].indexOf(reason) !== -1) {
  163. if (reason && reason.code === 'ECONNRESET') { return; }
  164. return void Log.error('SESSION_CLOSE_WITH_ERROR', {
  165. userId: userId,
  166. reason: reason,
  167. });
  168. }
  169. if (['SOCKET_CLOSED', 'SOCKET_ERROR'].indexOf(reason)) { return; }
  170. Log.verbose('SESSION_CLOSE_ROUTINE', {
  171. userId: userId,
  172. reason: reason,
  173. });
  174. },
  175. directMessage: function (Server, seq, userId, json) {
  176. // netflux-server allows you to register an id with a handler
  177. // this handler is invoked every time someone sends a message to that id
  178. HK.onDirectMessage(Env, Server, seq, userId, json);
  179. },
  180. };
  181. Log.verbose('HK_ID', 'History keeper ID: ' + Env.id);
  182. nThen(function (w) {
  183. // create a pin store
  184. Store.create({
  185. filePath: pinPath,
  186. }, w(function (err, s) {
  187. if (err) { throw err; }
  188. Env.pinStore = s;
  189. }));
  190. // create a channel store
  191. Store.create(config, w(function (err, _store) {
  192. if (err) { throw err; }
  193. config.store = _store;
  194. Env.msgStore = _store; // API used by rpc
  195. Env.store = _store; // API used by historyKeeper
  196. }));
  197. // create a blob store
  198. BlobStore.create({
  199. blobPath: config.blobPath,
  200. blobStagingPath: config.blobStagingPath,
  201. archivePath: config.archivePath,
  202. getSession: function (safeKey) {
  203. return Core.getSession(Env.Sessions, safeKey);
  204. },
  205. }, w(function (err, blob) {
  206. if (err) { throw new Error(err); }
  207. Env.blobStore = blob;
  208. }));
  209. }).nThen(function (w) {
  210. Workers.initialize(Env, {
  211. blobPath: config.blobPath,
  212. blobStagingPath: config.blobStagingPath,
  213. taskPath: config.taskPath,
  214. pinPath: pinPath,
  215. filePath: config.filePath,
  216. archivePath: config.archivePath,
  217. channelExpirationMs: config.channelExpirationMs,
  218. verbose: config.verbose,
  219. openFileLimit: config.openFileLimit,
  220. maxWorkers: config.maxWorkers,
  221. }, w(function (err) {
  222. if (err) {
  223. throw new Error(err);
  224. }
  225. }));
  226. }).nThen(function () {
  227. if (config.disableIntegratedTasks) { return; }
  228. config.intervals = config.intervals || {};
  229. var tasks_running;
  230. config.intervals.taskExpiration = setInterval(function () {
  231. if (tasks_running) { return; }
  232. tasks_running = true;
  233. Env.runTasks(function (err) {
  234. if (err) {
  235. Log.error('TASK_RUNNER_ERR', err);
  236. }
  237. tasks_running = false;
  238. });
  239. }, 1000 * 60 * 5); // run every five minutes
  240. }).nThen(function () {
  241. RPC.create(Env, function (err, _rpc) {
  242. if (err) { throw err; }
  243. Env.rpc = _rpc;
  244. cb(void 0, config.historyKeeper);
  245. });
  246. });
  247. };