/* jshint esversion: 6 */
/* global Buffer */
var HK = module.exports;

const nThen = require('nthen');
const Util = require("./common-util");
const MetaRPC = require("./commands/metadata");
const Nacl = require('tweetnacl/nacl-fast');

const now = function () { return (new Date()).getTime(); };
const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds
/*  getHash
    * this function slices off the leading portion of a message which is
      most likely unique
    * these "hashes" are used to identify particular messages in a channel's history
    * clients store "hashes" either in memory or in their drive to query for new messages:
      * when reconnecting to a pad
      * when connecting to chat or a mailbox
    * thus, we can't change this function without invalidating client data which:
      * is encrypted clientside
      * can't be easily migrated
    * don't break it!
*/
const getHash = HK.getHash = function (msg, Log) {
    if (typeof(msg) !== 'string') {
        if (Log) {
            Log.warn('HK_GET_HASH', 'getHash() called on ' + typeof(msg) + ': ' + msg);
        }
        return '';
    }
    return msg.slice(0,64);
};
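// Illustrative examples (added for clarity; not part of the upstream module):
//   getHash('cp|BASE64SIGNEDMESSAGE...')   // => the first 64 characters of that string
//   getHash(undefined, Log)                // => '' (and logs an HK_GET_HASH warning)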
// historyKeeper should explicitly store any channel
// with a 32 character id
const STANDARD_CHANNEL_LENGTH = HK.STANDARD_CHANNEL_LENGTH = 32;

// historyKeeper should not store messages sent to any channel
// with a 34 character id
const EPHEMERAL_CHANNEL_LENGTH = HK.EPHEMERAL_CHANNEL_LENGTH = 34;

const tryParse = HK.tryParse = function (Env, str) {
    try {
        return JSON.parse(str);
    } catch (err) {
        Env.Log.error('HK_PARSE_ERROR', {
            message: err && err.name,
            input: str,
        });
    }
};
/*  sliceCpIndex
    returns a list of all checkpoints which might be relevant for a client connecting to a session

    * if there are two or fewer checkpoints, return everything you have
    * if there are more than two
      * return at least two
      * plus any more which were received within the last 100 messages

    This is important because the additional history is what prevents
    clients from forking on checkpoints and dropping forked history.
*/
const sliceCpIndex = HK.sliceCpIndex = function (cpIndex, line) {
    // Remove "old" checkpoints (cp sent before 100 messages ago)
    const minLine = Math.max(0, (line - 100));
    let start = cpIndex.slice(0, -2);
    const end = cpIndex.slice(-2);
    start = start.filter(function (obj) {
        return obj.line > minLine;
    });
    return start.concat(end);
};
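// Illustrative example (added for clarity; not part of the upstream module):
// with checkpoints at lines 10, 150 and 220 and a current line count of 300,
// only the final two checkpoints survive, because line 10 is older than the
// last 100 messages and is not among the last two:
//   sliceCpIndex([{line: 10}, {line: 150}, {line: 220}], 300)
//   // => [{line: 150}, {line: 220}]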
const isMetadataMessage = HK.isMetadataMessage = function (parsed) {
    return Boolean(parsed && parsed.channel);
};

HK.listAllowedUsers = function (metadata) {
    return (metadata.owners || []).concat((metadata.allowed || []));
};

HK.getNetfluxSession = function (Env, netfluxId) {
    return Env.netfluxUsers[netfluxId];
};

HK.isUserSessionAllowed = function (allowed, session) {
    if (!session) { return false; }
    for (var unsafeKey in session) {
        if (allowed.indexOf(unsafeKey) !== -1) {
            return true;
        }
    }
    return false;
};

HK.authenticateNetfluxSession = function (Env, netfluxId, unsafeKey) {
    var user = Env.netfluxUsers[netfluxId] = Env.netfluxUsers[netfluxId] || {};
    user[unsafeKey] = +new Date();
};

HK.closeNetfluxSession = function (Env, netfluxId) {
    delete Env.netfluxUsers[netfluxId];
};
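// Illustrative sketch of the session flow above (added for clarity; ids and
// keys are made up). A netflux session maps each authenticated (unsafe) public
// key to the time it was authenticated, and a channel's allow list is simply a
// list of such keys:
//   HK.authenticateNetfluxSession(Env, 'netfluxId1', 'unsafeKeyA');
//   var session = HK.getNetfluxSession(Env, 'netfluxId1'); // { unsafeKeyA: <timestamp> }
//   HK.isUserSessionAllowed(['unsafeKeyA'], session);      // => true
//   HK.closeNetfluxSession(Env, 'netfluxId1');              // forgets the session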
// validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays
const isValidValidateKeyString = function (key) {
    try {
        return typeof(key) === 'string' &&
            Nacl.util.decodeBase64(key).length === Nacl.sign.publicKeyLength;
    } catch (e) {
        return false;
    }
};
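// Illustrative examples (added for clarity; not part of the upstream module):
// a validate key is accepted only if it is a base64 string which decodes to
// exactly 32 bytes (Nacl.sign.publicKeyLength), such as a signing public key
// generated clientside and encoded with Nacl.util.encodeBase64:
//   isValidValidateKeyString(Nacl.util.encodeBase64(Nacl.sign.keyPair().publicKey)); // => true
//   isValidValidateKeyString('not-base64');                                          // => false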
var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
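// Illustrative matches for the pattern above (added for clarity): checkpoint
// messages start with 'cp|', optionally followed by a base64 checkpoint id and
// a second '|':
//   CHECKPOINT_PATTERN.exec('cp|abc123|{...}') // => match, with the id 'abc123' at index 2
//   CHECKPOINT_PATTERN.exec('cp|{...}')        // => match, no id (index 2 is undefined)
//   CHECKPOINT_PATTERN.exec('{...}')           // => null (not a checkpoint)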
/*  expireChannel is here to clean up channels that should have been removed
    but for some reason are still present
*/
const expireChannel = function (Env, channel) {
    return void Env.store.archiveChannel(channel, function (err) {
        Env.Log.info("ARCHIVAL_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", {
            channelId: channel,
            status: err? String(err): "SUCCESS",
        });
    });
};

/*  dropChannel
    * cleans up memory structures which are managed entirely by the historyKeeper
*/
const dropChannel = HK.dropChannel = function (Env, chanName) {
    delete Env.metadata_cache[chanName];
    delete Env.channel_cache[chanName];
};
/*  checkExpired
    * synchronously returns true or false to indicate whether the channel is expired
      * according to its metadata
    * has some side effects:
      * closes the channel via the store.closeChannel API
        * and then broadcasts to all channel members that the channel has expired
      * removes the channel from the netflux-server's in-memory cache
      * removes the channel metadata from history keeper's in-memory cache

    FIXME the boolean nature of this API should be separated from its side effects
*/
const checkExpired = function (Env, Server, channel) {
    const store = Env.store;
    const metadata_cache = Env.metadata_cache;

    if (!(channel && channel.length === STANDARD_CHANNEL_LENGTH)) { return false; }
    let metadata = metadata_cache[channel];
    if (!(metadata && typeof(metadata.expire) === 'number')) { return false; }

    // the number of milliseconds ago the channel should have expired
    let pastDue = (+new Date()) - metadata.expire;

    // less than zero means that it hasn't expired yet
    if (pastDue < 0) { return false; }

    // if it should have expired more than a day ago...
    // there may have been a problem with scheduling tasks
    // or the scheduled tasks may not be running
    // so trigger a removal from here
    if (pastDue >= ONE_DAY) { expireChannel(Env, channel); }

    // close the channel
    store.closeChannel(channel, function () {
        Server.channelBroadcast(channel, {
            error: 'EEXPIRED',
            channel: channel
        }, Env.id);
        dropChannel(Env, channel);
    });

    // return true to indicate that it has expired
    return true;
};
const getMetadata = HK.getMetadata = function (Env, channelName, _cb) {
    var cb = Util.once(Util.mkAsync(_cb));

    var metadata = Env.metadata_cache[channelName];
    if (metadata && typeof(metadata) === 'object') {
        return void cb(undefined, metadata);
    }

    MetaRPC.getMetadataRaw(Env, channelName, function (err, metadata) {
        if (err) {
            console.error(err);
            return void cb(err);
        }
        if (!(metadata && typeof(metadata.channel) === 'string' && metadata.channel.length === STANDARD_CHANNEL_LENGTH)) {
            return cb();
        }

        // cache it
        Env.metadata_cache[channelName] = metadata;
        cb(undefined, metadata);
    });
};
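// Illustrative usage (added for clarity; not part of the upstream module):
// getMetadata answers from the in-memory cache when possible and otherwise
// reads the channel's metadata log via MetaRPC:
//   getMetadata(Env, channelName, function (err, metadata) {
//       if (err) { return; /* could not read or parse the metadata log */ }
//       if (!metadata) { return; /* the channel has no metadata */ }
//       // metadata.channel, metadata.owners, metadata.expire, ...
//   });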
/*  getIndex
    calls back with an error if anything goes wrong
    or with a cached index for a channel if it exists
        (along with metadata)
    otherwise it calls back with the index computed by 'computeIndex'

    as an added bonus:
    if the channel exists but its index does not then it caches the index
*/
const getIndex = (Env, channelName, cb) => {
    const channel_cache = Env.channel_cache;

    const chan = channel_cache[channelName];

    // if there is a channel in memory and it has an index cached, return it
    if (chan && chan.index) {
        // enforce async behaviour
        return void Util.mkAsync(cb)(undefined, chan.index);
    }

    Env.batchIndexReads(channelName, cb, function (done) {
        Env.computeIndex(Env, channelName, (err, ret) => {
            // this is most likely an unrecoverable filesystem error
            if (err) { return void done(err); }
            // cache the computed result if possible
            if (chan) { chan.index = ret; }
            // return
            done(void 0, ret);
        });
    });
};
/*  checkOffsetMap

    Sorry for the weird function --ansuz

    This should be almost equivalent to `Object.keys(map).length` except
    that it will use less memory by not allocating space for the temporary array.
    Beyond that, it returns length * -1 if any of the members of the map
    are not in ascending order. The function for removing older members of the map
    loops over elements in order and deletes them, so ordering is important!
*/
var checkOffsetMap = function (map) {
    var prev = 0;
    var cur;
    var ooo = 0; // out of order
    var count = 0;
    for (let k in map) {
        count++;
        cur = map[k];
        if (!ooo && prev > cur) { ooo = true; }
        prev = cur;
    }
    return ooo ? count * -1: count;
};
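// Illustrative examples (added for clarity; not part of the upstream module):
//   checkOffsetMap({ hashA: 10, hashB: 20, hashC: 30 }) // =>  3 (three entries, ascending)
//   checkOffsetMap({ hashA: 10, hashB: 5 })             // => -2 (two entries, out of order)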
/*  Pass the map and the number of elements it contains */
var trimOffsetByOrder = function (map, n) {
    var toRemove = Math.max(n - 50, 0);
    var i = 0;
    for (let k in map) {
        if (i >= toRemove) { return; }
        i++;
        delete map[k];
    }
};
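// Illustrative example (added for clarity): with 120 entries in the map,
// trimOffsetByOrder(map, 120) deletes the first 70 entries in iteration order
// and keeps the 50 most recently added hashes.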
/*  Remove from the map any byte offsets which are below
    the lowest offset you'd like to preserve
    (probably the oldest checkpoint) */
var trimMapByOffset = function (map, offset) {
    if (!offset) { return; }
    for (let k in map) {
        if (map[k] < offset) {
            delete map[k];
        }
    }
};
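// Illustrative example (added for clarity; not part of the upstream module):
//   var map = { hashA: 100, hashB: 250, hashC: 400 };
//   trimMapByOffset(map, 250); // map is now { hashB: 250, hashC: 400 }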
/*  storeMessage
    * channel id
    * the message to store
    * whether the message is a checkpoint
    * optionally the hash of the message
        * it's not always used, but we guard against it

    * async but doesn't have a callback
    * source of a race condition whereby:
      * two messages can be inserted
      * two offsets can be computed using the total size of all the messages
      * but the offsets don't correspond to the actual location of the newlines
        * because the two actions were performed like ABba...
    * the fix is to use callbacks and implement queueing for writes
      * to guarantee that offset computation is always atomic with writes
*/
const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
    const id = channel.id;
    const Log = Env.Log;

    Env.queueStorage(id, function (next) {
        const msgBin = Buffer.from(msg + '\n', 'utf8');

        // Store the message first, and update the index only once it's stored.
        // store.messageBin can be async so updating the index first may
        // result in a wrong cpIndex
        nThen((waitFor) => {
            Env.store.messageBin(id, msgBin, waitFor(function (err) {
                if (err) {
                    waitFor.abort();
                    Log.error("HK_STORE_MESSAGE_ERROR", err.message);

                    // this error is critical, but there's not much we can do at the moment
                    // proceed with more messages, but they'll probably fail too
                    // at least you won't have a memory leak

                    // TODO make it possible to respond to clients with errors so they know
                    // their message wasn't stored
                    return void next();
                }
            }));
        }).nThen((waitFor) => {
            getIndex(Env, id, waitFor((err, index) => {
                if (err) {
                    Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
                    // non-critical, we'll be able to get the channel index later
                    return void next();
                }

                if (typeof (index.line) === "number") { index.line++; }
                if (isCp) {
                    index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
                    trimMapByOffset(index.offsetByHash, index.cpIndex[0]);
                    index.cpIndex.push({
                        offset: index.size,
                        line: ((index.line || 0) + 1)
                    });
                }
                if (optionalMessageHash) {
                    index.offsetByHash[optionalMessageHash] = index.size;
                    index.offsets++;
                }

                if (index.offsets >= 100 && !index.cpIndex.length) {
                    let offsetCount = checkOffsetMap(index.offsetByHash);
                    if (offsetCount < 0) {
                        Log.warn('OFFSET_TRIM_OOO', {
                            channel: id,
                            map: index.offsetByHash
                        });
                    } else if (offsetCount > 0) {
                        trimOffsetByOrder(index.offsetByHash, index.offsets);
                        index.offsets = checkOffsetMap(index.offsetByHash);
                    }
                }

                index.size += msgBin.length;

                // handle the next element in the queue
                next();
            }));
        });
    });
};
/*  getHistoryOffset
    returns a number representing the byte offset from the start of the log
    for whatever history you're seeking.

    query by providing a 'lastKnownHash',
        which is really just a string of the first 64 characters of an encrypted message.
    OR by -1 which indicates that we want the full history (byte offset 0)
    OR nothing, which indicates that you want whatever messages the historyKeeper deems relevant
        (typically the last few checkpoints)

    this function embeds a lot of the history keeper's logic:

    0. if you passed -1 as the lastKnownHash it means you want the complete history
      * I'm not sure why you'd need to call this function if you know it will return 0 in this case...
      * it has a side-effect of filling the index cache if it's empty
    1. if you provided a lastKnownHash and that message does not exist in the history:
      * either the client has made a mistake or the history they knew about no longer exists
      * call back with EUNKNOWN
    2. if you did not provide a lastKnownHash
      * and there are fewer than two checkpoints:
        * return 0 (read from the start of the file)
      * and there are two or more checkpoints:
        * return the offset of the earliest checkpoint which 'sliceCpIndex' considers relevant
    3. if you did provide a lastKnownHash
      * read through the log until you find the hash that you're looking for
      * call back with either the byte offset of the message that you found OR
      * -1 if you didn't find it
*/
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
    const cb = Util.once(Util.mkAsync(_cb));

    // lastKnownHash === -1 means we want the complete history
    if (lastKnownHash === -1) { return void cb(null, 0); }

    let offset = -1;
    nThen((waitFor) => {
        getIndex(Env, channelName, waitFor((err, index) => {
            if (err) { waitFor.abort(); return void cb(err); }

            // check if the "hash" the client is requesting exists in the index
            const lkh = index.offsetByHash[lastKnownHash];
            // fall through to the next block if the offset of the hash in question is not in memory
            if (lastKnownHash && typeof(lkh) !== "number") { return; }

            // Since last 2 checkpoints
            if (!lastKnownHash) {
                waitFor.abort();
                // Less than 2 checkpoints in the history: return everything
                if (index.cpIndex.length < 2) { return void cb(null, 0); }
                // Otherwise return the second-to-last checkpoint's offset
                return void cb(null, index.cpIndex[0].offset);
                /* LATER...
                    in practice, two checkpoints can be very close together
                    we have measures to avoid duplicate checkpoints, but editors
                    can produce nearby checkpoints which are slightly different,
                    and slip past these protections. To be really careful, we can
                    seek past nearby checkpoints by some number of patches so as
                    to ensure that all editors have sufficient knowledge of history
                    to reconcile their differences. */
            }

            offset = lkh;
        }));
    }).nThen((w) => {
        // skip past this block if the offset is anything other than -1
        // this basically makes these first two nThen blocks behave like if-else
        if (offset !== -1) { return; }

        // either the message exists in history but is not in the cached index
        // or it does not exist at all. In either case 'getHashOffset' is expected
        // to return a number: -1 if not present, a positive integer otherwise
        Env.getHashOffset(channelName, lastKnownHash, w(function (err, _offset) {
            if (err) {
                w.abort();
                return void cb(err);
            }
            offset = _offset;
        }));
    }).nThen(() => {
        cb(null, offset);
    });
};
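// Illustrative usage of the three query modes described above (added for
// clarity; the channel id and hash are made up):
//   getHistoryOffset(Env, channelId, -1, cb);                // full history: cb(null, 0)
//   getHistoryOffset(Env, channelId, undefined, cb);         // offset of the last relevant checkpoint
//   getHistoryOffset(Env, channelId, 'first64chars...', cb); // offset of that message, or -1 if absent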
/*  getHistoryAsync
    * finds the appropriate byte offset from which to begin reading using 'getHistoryOffset'
    * streams through the rest of the messages, safely parsing them and returning the parsed content to the handler
    * calls back when it has reached the end of the log

    Used by:
    * GET_HISTORY
*/
const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, cb) => {
    const store = Env.store;

    let offset = -1;
    nThen((waitFor) => {
        getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => {
            if (err) {
                waitFor.abort();
                return void cb(err);
            }
            offset = os;
        }));
    }).nThen((waitFor) => {
        if (offset === -1) {
            return void cb(new Error('EUNKNOWN'));
        }
        const start = (beforeHash) ? 0 : offset;
        store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
            if (beforeHash && msgObj.offset >= offset) { return void abort(); }
            var parsed = tryParse(Env, msgObj.buff.toString('utf8'));
            if (!parsed) { return void readMore(); }
            handler(parsed, readMore);
        }, waitFor(function (err) {
            return void cb(err);
        }));
    });
};
const handleRPC = function (Env, Server, seq, userId, parsed) {
    const HISTORY_KEEPER_ID = Env.id;

    /* RPC Calls... */
    var rpc_call = parsed.slice(1);

    Server.send(userId, [seq, 'ACK']);
    try {
        // slice off the sequence number and pass in the rest of the message
        Env.rpc(Server, userId, rpc_call, function (err, output) {
            if (err) {
                Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]);
                return;
            }
            Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0]].concat(output))]);
        });
    } catch (e) {
        // if anything throws in the middle, send an error
        Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]);
    }
};
/*
    This is called when a user tries to connect to a channel that doesn't exist.
    we initialize that channel by writing the metadata supplied by the user to its log.
    if the provided metadata has an expire time then we also create a task to expire it.
*/
const handleFirstMessage = function (Env, channelName, metadata) {
    Env.store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
        if (err) {
            // FIXME tell the user that there was a channel error?
            return void Env.Log.error('HK_WRITE_METADATA', {
                channel: channelName,
                error: err,
            });
        }
    });

    // write tasks
    if(metadata.expire && typeof(metadata.expire) === 'number') {
        // the fun part...
        // the user has said they want this pad to expire at some point
        Env.writeTask(metadata.expire, "EXPIRE", [ channelName ], function (err) {
            if (err) {
                // if there is an error, we don't want to crash the whole server...
                // just log it, and if there's a problem you'll be able to fix it
                // at a later date with the provided information
                Env.Log.error('HK_CREATE_EXPIRE_TASK', err);
                Env.Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
            }
        });
    }
};
const handleGetHistory = function (Env, Server, seq, userId, parsed) {
    const metadata_cache = Env.metadata_cache;
    const HISTORY_KEEPER_ID = Env.id;
    const Log = Env.Log;

    // parsed[1] is the channel id
    // parsed[2] is a validation key or an object containing metadata (optional)
    // parsed[3] is the last known hash (optional)

    Server.send(userId, [seq, 'ACK']);
    var channelName = parsed[1];
    var config = parsed[2];
    var metadata = {};
    var lastKnownHash;
    var txid;

    // clients can optionally pass a map of attributes
    // if the channel already exists this map will be ignored
    // otherwise it will be stored as the initial metadata state for the channel
    if (config && typeof config === "object" && !Array.isArray(parsed[2])) {
        lastKnownHash = config.lastKnownHash;
        metadata = config.metadata || {};
        txid = config.txid;
        if (metadata.expire) {
            metadata.expire = +metadata.expire * 1000 + (+new Date());
        }
    }

    metadata.channel = channelName;
    metadata.created = +new Date();

    // if the user sends us an invalid key, we won't be able to validate their messages
    // so they'll never get written to the log anyway. Let's just drop their message
    // on the floor instead of doing a bunch of extra work
    // TODO send them an error message so they know something is wrong
    if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
        return void Log.error('HK_INVALID_KEY', metadata.validateKey);
    }

    nThen(function (waitFor) {
        var w = waitFor();

        /*  fetch the channel's metadata.
            use it to check if the channel has expired.
            send it to the client if it exists.
        */
        getMetadata(Env, channelName, waitFor(function (err, metadata) {
            if (err) {
                Env.Log.error('HK_GET_HISTORY_METADATA', {
                    channel: channelName,
                    error: err,
                });
                return void w();
            }
            if (!metadata || !metadata.channel) { return w(); }
            // if there is already a metadata log then use it instead
            // of whatever the user supplied

            // it's possible that the channel doesn't have metadata
            // but in that case there's no point in checking if the channel expired
            // or in trying to send metadata, so just skip this block
            if (!metadata) { return void w(); }

            // And then check if the channel is expired. If it is, send the error and abort
            // FIXME this is hard to read because 'checkExpired' has side effects
            if (checkExpired(Env, Server, channelName)) { return void waitFor.abort(); }

            // always send metadata with GET_HISTORY requests
            Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(metadata)], w);
        }));
    }).nThen(() => {
        let msgCount = 0;

        // TODO compute lastKnownHash in a manner such that it will always skip past the metadata line?
        getHistoryAsync(Env, channelName, lastKnownHash, false, (msg, readMore) => {
            msgCount++;
            // avoid sending the metadata message a second time
            if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); }
            if (txid) { msg[0] = txid; }
            Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(msg)], readMore);
        }, (err) => {
            if (err && err.code !== 'ENOENT') {
                if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", {
                    err: err && err.message,
                    stack: err && err.stack,
                }); }
                const parsedMsg = {error:err.message, channel: channelName, txid: txid};
                Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
                return;
            }

            if (msgCount === 0 && !metadata_cache[channelName] && Server.channelContainsUser(channelName, userId)) {
                handleFirstMessage(Env, channelName, metadata);
                Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(metadata)]);
            }

            // End of history message:
            let parsedMsg = {state: 1, channel: channelName, txid: txid};
            Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
        });
    });
};
const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) {
    var channelName = parsed[1];
    var map = parsed[2];
    const HISTORY_KEEPER_ID = Env.id;

    if (!(map && typeof(map) === 'object')) {
        return void Server.send(userId, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]);
    }

    var oldestKnownHash = map.from;
    var desiredMessages = map.count;
    var desiredCheckpoint = map.cpCount;
    var txid = map.txid;
    if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
        return void Server.send(userId, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]);
    }

    if (!txid) {
        return void Server.send(userId, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]);
    }

    Server.send(userId, [seq, 'ACK']);

    Env.getOlderHistory(channelName, oldestKnownHash, desiredMessages, desiredCheckpoint, function (err, toSend) {
        if (err && err.code !== 'ENOENT') {
            Env.Log.error("HK_GET_OLDER_HISTORY", err);
        }

        if (Array.isArray(toSend)) {
            toSend.forEach(function (msg) {
                Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
                    JSON.stringify(['HISTORY_RANGE', txid, msg])]);
            });
        }

        Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
            JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
        ]);
    });
};
const handleGetFullHistory = function (Env, Server, seq, userId, parsed) {
    const HISTORY_KEEPER_ID = Env.id;
    const Log = Env.Log;

    // parsed[1] is the channel id
    // parsed[2] is a validation key (optional)
    // parsed[3] is the last known hash (optional)

    Server.send(userId, [seq, 'ACK']);

    // FIXME should we send metadata here too?
    // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
    return void getHistoryAsync(Env, parsed[1], -1, false, (msg, readMore) => {
        Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(['FULL_HISTORY', msg])], readMore);
    }, (err) => {
        let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
        if (err) {
            Log.error('HK_GET_FULL_HISTORY', err.stack);
            parsedMsg = ['ERROR', parsed[1], err.message];
        }
        Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
    });
};

const directMessageCommands = {
    GET_HISTORY: handleGetHistory,
    GET_HISTORY_RANGE: handleGetHistoryRange,
    GET_FULL_HISTORY: handleGetFullHistory,
};
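// Illustrative shapes of the direct messages routed through the map above
// (added for clarity; field values are made up). Clients send a stringified
// array whose first element names the command:
//   ['GET_HISTORY', channelId, { txid: 'abc', lastKnownHash: hash, metadata: {...} }]
//   ['GET_HISTORY_RANGE', channelId, { from: hash, count: 10, txid: 'abc' }]
//   ['GET_FULL_HISTORY', channelId]
// Anything else falls through to handleRPC (see HK.onDirectMessage below).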
/*  onDirectMessage
    * exported for use by the netflux-server
    * parses and handles all direct messages directed to the history keeper
      * check if it's expired and execute all the associated side-effects
      * routes queries to the appropriate handlers
*/
HK.onDirectMessage = function (Env, Server, seq, userId, json) {
    const Log = Env.Log;
    const HISTORY_KEEPER_ID = Env.id;
    Log.silly('HK_MESSAGE', json);

    let parsed;
    try {
        parsed = JSON.parse(json[2]);
    } catch (err) {
        Log.error("HK_PARSE_CLIENT_MESSAGE", json);
        return;
    }

    var first = parsed[0];

    if (typeof(directMessageCommands[first]) !== 'function') {
        // it's either an unsupported command or an RPC call
        // either way, RPC has it covered
        return void handleRPC(Env, Server, seq, userId, parsed);
    }

    // otherwise it's some kind of history retrieval command...
    // go grab its metadata, because unfortunately people can ask for history
    // whether or not they have joined the channel, so we can't rely on JOIN restriction
    // to stop people from loading history they shouldn't see.
    var channelName = parsed[1];
    nThen(function (w) {
        getMetadata(Env, channelName, w(function (err, metadata) {
            if (err) {
                // stream errors?
                // we should log these, but if we can't load metadata
                // then it's probably not restricted or expired
                // it's not like anything else will recover from this anyway
                return;
            }

            // likewise, we can't do anything more here if there's no metadata
            // jump to the next block
            if (!metadata) { return; }

            // If the requested history is for an expired channel, abort
            // checkExpired has side effects and will disconnect users for you...
            if (checkExpired(Env, Server, parsed[1])) {
                // if the channel is expired just abort.
                w.abort();
                return;
            }

            // jump to handling the command if there's no restriction...
            if (!metadata.restricted) { return; }

            // check if the user is in the allow list...
            const allowed = HK.listAllowedUsers(metadata);
            const session = HK.getNetfluxSession(Env, userId);

            if (HK.isUserSessionAllowed(allowed, session)) {
                return;
            }

            /*  Anyone in the userlist that isn't in the allow list should have already
                been kicked out of the channel. Likewise, disallowed users should not
                be able to add themselves to the userlist because JOIN commands respect
                access control settings. The error that is sent below protects against
                the remaining case, in which users try to get history without having
                joined the channel. Normally we'd send the allow list to tell them the
                key with which they should authenticate, but since we don't use this
                behaviour, I'm doing the easy thing and just telling them to GO AWAY.

                We can implement the more advanced behaviour later if it turns out that
                we need it. This check guards against all kinds of history
                access: GET_HISTORY, GET_HISTORY_RANGE, GET_FULL_HISTORY.
            */
            w.abort();
            return void Server.send(userId, [
                seq,
                'ERROR',
                'ERESTRICTED',
                HISTORY_KEEPER_ID
            ]);
        }));
    }).nThen(function () {
        // run the appropriate command from the map
        directMessageCommands[first](Env, Server, seq, userId, parsed);
    });
};
/*  onChannelMessage
    Determines what we should store when a message is broadcast to a channel

    * ignores ephemeral channels
    * ignores messages sent to expired channels
    * rejects duplicated checkpoints
    * validates messages to channels that have validation keys
    * caches the id of the last saved checkpoint
    * adds timestamps to incoming messages
    * writes messages to the store
*/
HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
    //console.log(+new Date(), "onChannelMessage");
    const Log = Env.Log;

    // TODO our usage of 'channel' here looks prone to errors
    // we only use it for its 'id', but it can contain other stuff
    // also, we're using this RPC from both the RPC and Netflux-server
    // we should probably just change this to expect a channel id directly

    // don't store messages if the channel id indicates that it's an ephemeral message
    if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }

    const isCp = /^cp\|/.test(msgStruct[4]);
    let id;
    if (isCp) {
        // id becomes either null or an array of results...
        id = CHECKPOINT_PATTERN.exec(msgStruct[4]);
        if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
            // Reject duplicate checkpoints
            return;
        }
    }

    let metadata;
    nThen(function (w) {
        getMetadata(Env, channel.id, w(function (err, _metadata) {
            // if there's no channel metadata then it can't be an expiring channel
            // nor can we possibly validate it
            if (!_metadata) { return; }
            metadata = _metadata;

            // don't write messages to expired channels
            if (checkExpired(Env, Server, channel)) { return void w.abort(); }
        }));
    }).nThen(function (w) {
        // if there's no validateKey present skip to the next block
        if (!(metadata && metadata.validateKey)) { return; }

        // trim the checkpoint indicator off the message if it's present
        let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
        // convert the message from a base64 string into a Uint8Array

        //const txid = Util.uid();

        // Listen for messages
        //console.log(+new Date(), "Send verification request");
        Env.validateMessage(signedMsg, metadata.validateKey, w(function (err) {
            // no errors means success
            if (!err) { return; }
            // validation can fail in multiple ways
            if (err === 'FAILED') {
                // we log this case, but not others for some reason
                Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
            }
            // always abort if there was an error...
            return void w.abort();
        }));
    }).nThen(function () {
        // do checkpoint stuff...

        // 1. get the checkpoint id
        // 2. reject duplicate checkpoints

        if (isCp) {
            // if the message is a checkpoint we will have already validated
            // that it isn't a duplicate. remember its id so that we can
            // repeat this process for the next incoming checkpoint

            // WARNING: the fact that we only check the most recent checkpoints
            // is a potential source of bugs if one editor has high latency and
            // pushes a duplicate of an earlier checkpoint than the latest which
            // has been pushed by editors with low latency
            // FIXME
            if (Array.isArray(id) && id[2]) {
                // Store new checkpoint hash
                channel.lastSavedCp = id[2];
            }
        }

        // add the time to the message
        msgStruct.push(now());

        // storeMessage
        //console.log(+new Date(), "Storing message");
        storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log));
        //console.log(+new Date(), "Message stored");
    });
};