|
|
|
@ -1,6 +1,11 @@ |
|
|
|
/*@flow*/ |
|
|
|
/* jshint esversion: 6 */ |
|
|
|
/* global Buffer */ |
|
|
|
var Fs = require("fs"); |
|
|
|
var Path = require("path"); |
|
|
|
var nThen = require("nthen"); |
|
|
|
const ToPull = require('stream-to-pull-stream'); |
|
|
|
const Pull = require('pull-stream'); |
|
|
|
|
|
|
|
var mkPath = function (env, channelId) { |
|
|
|
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; |
|
|
|
@ -8,7 +13,7 @@ var mkPath = function (env, channelId) { |
|
|
|
|
|
|
|
var getMetadataAtPath = function (Env, path, cb) { |
|
|
|
var remainder = ''; |
|
|
|
var stream = Fs.createReadStream(path, 'utf8'); |
|
|
|
var stream = Fs.createReadStream(path, { encoding: 'utf8' }); |
|
|
|
var complete = function (err, data) { |
|
|
|
var _cb = cb; |
|
|
|
cb = undefined; |
|
|
|
@ -25,16 +30,16 @@ var getMetadataAtPath = function (Env, path, cb) { |
|
|
|
var parsed = null; |
|
|
|
try { |
|
|
|
parsed = JSON.parse(metadata); |
|
|
|
complete(void 0, parsed); |
|
|
|
complete(undefined, parsed); |
|
|
|
} |
|
|
|
catch (e) { |
|
|
|
console.log(); |
|
|
|
console.log("getMetadataAtPath"); |
|
|
|
console.error(e); |
|
|
|
complete('INVALID_METADATA'); |
|
|
|
} |
|
|
|
}); |
|
|
|
stream.on('end', function () { |
|
|
|
complete(null); |
|
|
|
complete(); |
|
|
|
}); |
|
|
|
stream.on('error', function (e) { complete(e); }); |
|
|
|
}; |
|
|
|
@ -59,7 +64,7 @@ var closeChannel = function (env, channelName, cb) { |
|
|
|
var clearChannel = function (env, channelId, cb) { |
|
|
|
var path = mkPath(env, channelId); |
|
|
|
getMetadataAtPath(env, path, function (e, metadata) { |
|
|
|
if (e) { return cb(e); } |
|
|
|
if (e) { return cb(new Error(e)); } |
|
|
|
if (!metadata) { |
|
|
|
return void Fs.truncate(path, 0, function (err) { |
|
|
|
if (err) { |
|
|
|
@ -87,7 +92,7 @@ var clearChannel = function (env, channelId, cb) { |
|
|
|
|
|
|
|
var readMessages = function (path, msgHandler, cb) { |
|
|
|
var remainder = ''; |
|
|
|
var stream = Fs.createReadStream(path, 'utf8'); |
|
|
|
var stream = Fs.createReadStream(path, { encoding: 'utf8' }); |
|
|
|
var complete = function (err) { |
|
|
|
var _cb = cb; |
|
|
|
cb = undefined; |
|
|
|
@ -106,6 +111,60 @@ var readMessages = function (path, msgHandler, cb) { |
|
|
|
stream.on('error', function (e) { complete(e); }); |
|
|
|
}; |
|
|
|
|
|
|
|
const NEWLINE_CHR = ('\n').charCodeAt(0); |
|
|
|
const mkBufferSplit = () => { |
|
|
|
let remainder = null; |
|
|
|
return Pull((read) => { |
|
|
|
return (abort, cb) => { |
|
|
|
read(abort, function (end, data) { |
|
|
|
if (end) { |
|
|
|
cb(end, remainder ? [remainder, data] : [data]); |
|
|
|
remainder = null; |
|
|
|
return; |
|
|
|
} |
|
|
|
const queue = []; |
|
|
|
for (;;) { |
|
|
|
const offset = data.indexOf(NEWLINE_CHR); |
|
|
|
if (offset < 0) { |
|
|
|
remainder = remainder ? Buffer.concat([remainder, data]) : data; |
|
|
|
break; |
|
|
|
} |
|
|
|
let subArray = data.slice(0, offset); |
|
|
|
if (remainder) { |
|
|
|
subArray = Buffer.concat([remainder, subArray]); |
|
|
|
remainder = null; |
|
|
|
} |
|
|
|
queue.push(subArray); |
|
|
|
data = data.slice(offset + 1); |
|
|
|
} |
|
|
|
cb(end, queue); |
|
|
|
}); |
|
|
|
}; |
|
|
|
}, Pull.flatten()); |
|
|
|
}; |
|
|
|
|
|
|
|
const mkOffsetCounter = () => { |
|
|
|
let offset = 0; |
|
|
|
return Pull.map((buff) => { |
|
|
|
const out = { offset: offset, buff: buff }; |
|
|
|
// +1 for the eaten newline
|
|
|
|
offset += buff.length + 1; |
|
|
|
return out; |
|
|
|
}); |
|
|
|
}; |
|
|
|
|
|
|
|
const readMessagesBin = (env, id, start, msgHandler, cb) => { |
|
|
|
const stream = Fs.createReadStream(mkPath(env, id), { start: start }); |
|
|
|
let keepReading = true; |
|
|
|
Pull( |
|
|
|
ToPull.read(stream), |
|
|
|
mkBufferSplit(), |
|
|
|
mkOffsetCounter(), |
|
|
|
Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, ()=>{ keepReading = false; moreCb(); }); }), |
|
|
|
Pull.drain(()=>(keepReading), cb) |
|
|
|
); |
|
|
|
}; |
|
|
|
|
|
|
|
var checkPath = function (path, callback) { |
|
|
|
// TODO check if we actually need to use stat at all
|
|
|
|
Fs.stat(path, function (err) { |
|
|
|
@ -117,7 +176,8 @@ var checkPath = function (path, callback) { |
|
|
|
callback(err); |
|
|
|
return; |
|
|
|
} |
|
|
|
Fs.mkdir(Path.dirname(path), function (err) { |
|
|
|
// 511 -> octal 777
|
|
|
|
Fs.mkdir(Path.dirname(path), 511, function (err) { |
|
|
|
if (err && err.code !== 'EEXIST') { |
|
|
|
callback(err); |
|
|
|
return; |
|
|
|
@ -154,7 +214,28 @@ var flushUnusedChannels = function (env, cb, frame) { |
|
|
|
cb(); |
|
|
|
}; |
|
|
|
|
|
|
|
var getChannel = function (env, id, callback) { |
|
|
|
var channelBytes = function (env, chanName, cb) { |
|
|
|
var path = mkPath(env, chanName); |
|
|
|
Fs.stat(path, function (err, stats) { |
|
|
|
if (err) { return void cb(err); } |
|
|
|
cb(undefined, stats.size); |
|
|
|
}); |
|
|
|
}; |
|
|
|
|
|
|
|
/*:: |
|
|
|
export type ChainPadServer_ChannelInternal_t = { |
|
|
|
atime: number, |
|
|
|
writeStream: typeof(process.stdout), |
|
|
|
whenLoaded: ?Array<(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void>, |
|
|
|
onError: Array<(?Error)=>void>, |
|
|
|
path: string |
|
|
|
}; |
|
|
|
*/ |
|
|
|
var getChannel = function ( |
|
|
|
env, |
|
|
|
id, |
|
|
|
callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/ |
|
|
|
) { |
|
|
|
if (env.channels[id]) { |
|
|
|
var chan = env.channels[id]; |
|
|
|
chan.atime = +new Date(); |
|
|
|
@ -178,9 +259,9 @@ var getChannel = function (env, id, callback) { |
|
|
|
}); |
|
|
|
} |
|
|
|
var path = mkPath(env, id); |
|
|
|
var channel = env.channels[id] = { |
|
|
|
var channel /*:ChainPadServer_ChannelInternal_t*/ = env.channels[id] = { |
|
|
|
atime: +new Date(), |
|
|
|
writeStream: undefined, |
|
|
|
writeStream: (undefined /*:any*/), |
|
|
|
whenLoaded: [ callback ], |
|
|
|
onError: [ ], |
|
|
|
path: path |
|
|
|
@ -193,6 +274,9 @@ var getChannel = function (env, id, callback) { |
|
|
|
if (err) { |
|
|
|
delete env.channels[id]; |
|
|
|
} |
|
|
|
if (!channel.writeStream) { |
|
|
|
throw new Error("getChannel() complete called without channel writeStream"); |
|
|
|
} |
|
|
|
whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); |
|
|
|
}; |
|
|
|
var fileExists; |
|
|
|
@ -211,7 +295,7 @@ var getChannel = function (env, id, callback) { |
|
|
|
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); |
|
|
|
env.openFiles++; |
|
|
|
stream.on('open', waitFor()); |
|
|
|
stream.on('error', function (err) { |
|
|
|
stream.on('error', function (err /*:?Error*/) { |
|
|
|
env.openFiles--; |
|
|
|
// this might be called after this nThen block closes.
|
|
|
|
if (channel.whenLoaded) { |
|
|
|
@ -228,20 +312,22 @@ var getChannel = function (env, id, callback) { |
|
|
|
}); |
|
|
|
}; |
|
|
|
|
|
|
|
var message = function (env, chanName, msg, cb) { |
|
|
|
const messageBin = (env, chanName, msgBin, cb) => { |
|
|
|
getChannel(env, chanName, function (err, chan) { |
|
|
|
if (err) { |
|
|
|
if (!chan) { |
|
|
|
cb(err); |
|
|
|
return; |
|
|
|
} |
|
|
|
let called = false; |
|
|
|
var complete = function (err) { |
|
|
|
var _cb = cb; |
|
|
|
cb = undefined; |
|
|
|
if (_cb) { _cb(err); } |
|
|
|
if (called) { return; } |
|
|
|
called = true; |
|
|
|
cb(err); |
|
|
|
}; |
|
|
|
chan.onError.push(complete); |
|
|
|
chan.writeStream.write(msg + '\n', function () { |
|
|
|
chan.onError.splice(chan.onError.indexOf(complete) - 1, 1); |
|
|
|
chan.writeStream.write(msgBin, function () { |
|
|
|
/*::if (!chan) { throw new Error("Flow unreachable"); }*/ |
|
|
|
chan.onError.splice(chan.onError.indexOf(complete), 1); |
|
|
|
if (!cb) { return; } |
|
|
|
//chan.messages.push(msg);
|
|
|
|
chan.atime = +new Date(); |
|
|
|
@ -250,9 +336,13 @@ var message = function (env, chanName, msg, cb) { |
|
|
|
}); |
|
|
|
}; |
|
|
|
|
|
|
|
var message = function (env, chanName, msg, cb) { |
|
|
|
messageBin(env, chanName, new Buffer(msg + '\n', 'utf8'), cb); |
|
|
|
}; |
|
|
|
|
|
|
|
var getMessages = function (env, chanName, handler, cb) { |
|
|
|
getChannel(env, chanName, function (err, chan) { |
|
|
|
if (err) { |
|
|
|
if (!chan) { |
|
|
|
cb(err); |
|
|
|
return; |
|
|
|
} |
|
|
|
@ -271,21 +361,39 @@ var getMessages = function (env, chanName, handler, cb) { |
|
|
|
errorState = true; |
|
|
|
return void cb(err); |
|
|
|
} |
|
|
|
if (!chan) { throw new Error("impossible, flow checking"); } |
|
|
|
chan.atime = +new Date(); |
|
|
|
cb(); |
|
|
|
}); |
|
|
|
}); |
|
|
|
}; |
|
|
|
|
|
|
|
var channelBytes = function (env, chanName, cb) { |
|
|
|
var path = mkPath(env, chanName); |
|
|
|
Fs.stat(path, function (err, stats) { |
|
|
|
if (err) { return void cb(err); } |
|
|
|
cb(void 0, stats.size); |
|
|
|
}); |
|
|
|
/*:: |
|
|
|
export type ChainPadServer_MessageObj_t = { buff: Buffer, offset: number }; |
|
|
|
export type ChainPadServer_Storage_t = { |
|
|
|
readMessagesBin: ( |
|
|
|
channelName:string, |
|
|
|
start:number, |
|
|
|
asyncMsgHandler:(msg:ChainPadServer_MessageObj_t, moreCb:()=>void, abortCb:()=>void)=>void, |
|
|
|
cb:(err:?Error)=>void |
|
|
|
)=>void, |
|
|
|
message: (channelName:string, content:string, cb:(err:?Error)=>void)=>void, |
|
|
|
messageBin: (channelName:string, content:Buffer, cb:(err:?Error)=>void)=>void, |
|
|
|
getMessages: (channelName:string, msgHandler:(msg:string)=>void, cb:(err:?Error)=>void)=>void, |
|
|
|
removeChannel: (channelName:string, cb:(err:?Error)=>void)=>void, |
|
|
|
closeChannel: (channelName:string, cb:(err:?Error)=>void)=>void, |
|
|
|
flushUnusedChannels: (cb:()=>void)=>void, |
|
|
|
getChannelSize: (channelName:string, cb:(err:?Error, size:?number)=>void)=>void, |
|
|
|
getChannelMetadata: (channelName:string, cb:(err:?Error|string, data:?any)=>void)=>void, |
|
|
|
clearChannel: (channelName:string, (err:?Error)=>void)=>void |
|
|
|
}; |
|
|
|
|
|
|
|
module.exports.create = function (conf, cb) { |
|
|
|
const flow_Config = require('../config.example.js'); |
|
|
|
type Config_t = typeof(flow_Config); |
|
|
|
*/ |
|
|
|
module.exports.create = function ( |
|
|
|
conf /*:Config_t*/, |
|
|
|
cb /*:(store:ChainPadServer_Storage_t)=>void*/ |
|
|
|
) { |
|
|
|
var env = { |
|
|
|
root: conf.filePath || './datastore', |
|
|
|
channels: { }, |
|
|
|
@ -294,15 +402,22 @@ module.exports.create = function (conf, cb) { |
|
|
|
openFiles: 0, |
|
|
|
openFileLimit: conf.openFileLimit || 2048, |
|
|
|
}; |
|
|
|
Fs.mkdir(env.root, function (err) { |
|
|
|
// 0x1ff -> 777
|
|
|
|
Fs.mkdir(env.root, 0x1ff, function (err) { |
|
|
|
if (err && err.code !== 'EEXIST') { |
|
|
|
// TODO: somehow return a nice error
|
|
|
|
throw err; |
|
|
|
} |
|
|
|
cb({ |
|
|
|
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { |
|
|
|
readMessagesBin(env, channelName, start, asyncMsgHandler, cb); |
|
|
|
}, |
|
|
|
message: function (channelName, content, cb) { |
|
|
|
message(env, channelName, content, cb); |
|
|
|
}, |
|
|
|
messageBin: (channelName, content, cb) => { |
|
|
|
messageBin(env, channelName, content, cb); |
|
|
|
}, |
|
|
|
getMessages: function (channelName, msgHandler, cb) { |
|
|
|
getMessages(env, channelName, msgHandler, cb); |
|
|
|
}, |
|
|
|
@ -331,4 +446,4 @@ module.exports.create = function (conf, cb) { |
|
|
|
setInterval(function () { |
|
|
|
flushUnusedChannels(env, function () { }); |
|
|
|
}, 5000); |
|
|
|
}; |
|
|
|
}; |