Skip to content

Commit

Permalink
Add cluster commit option to cluster memory.
Browse files Browse the repository at this point in the history
  • Loading branch information
TekMonksGitHub committed Apr 25, 2024
1 parent 02033af commit 3350b2c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
13 changes: 9 additions & 4 deletions backend/server/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ if (cluster.isMaster) {
else numWorkers = numCPUs;
}

const _forkWorker = _ =>
cluster.fork().on("message", msg => {for (const worker_id in cluster.workers) cluster.workers[worker_id].send(msg)});
const _forkWorker = _ => {
const worker = cluster.fork();
worker.on("message", msg => {
if (msg.type == "cluster.count")
worker.send({count: Object.keys(cluster.workers).length, id: msg.id});
else for (const worker_id in cluster.workers) cluster.workers[worker_id].send(msg)
});
};

// Fork workers.
console.log(`Starting ${numWorkers} workers.`);
Expand All @@ -29,6 +35,5 @@ if (cluster.isMaster) {
console.log("[TCP] Worker server with PID: " + worker.process.pid + " died.");
console.log("[TCP] Forking a new process to compensate.");
_forkWorker();
});

});
} else require(`${__dirname}/server.js`).bootstrap();
74 changes: 69 additions & 5 deletions backend/server/lib/clustermemory.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,86 @@
* License: See enclosed LICENSE file.
*/

const _clusterMemory = {}, _listeners = {}, SET_MSG = "__org_monkshu_cluster_memory_set";
const _clusterMemory = {}, _listeners = {}, SET_MSG = "__org_monkshu_cluster_memory_set",
POLL_VALUE_RESPONSE = "__org_monkshu_cluster_pollvalue_response", DEFAULT_POLL_TIMEOUT = 200,
POLL_VALUE_REQUEST = "__org_monkshu_cluster_pollvalue_request", CLUSTER_COUNT = "cluster.count";

/**
* Inits the cluster memory
*/
function init() {
process.on("message", msg => { if (msg.type == SET_MSG) _processSetMessage(msg.obj); });
process.on("message", msg => {
if (msg.type == SET_MSG) _processSetMessage(msg.obj);
if (msg.type == POLL_VALUE_REQUEST) {
process.send({type: POLL_VALUE_RESPONSE, key: msg.key, id: msg.id, value: _clusterMemory[msg.key]});
}
});
global.CLUSTER_MEMORY = this;
}

/**
* Sets the key and its value in the global memory.
* @param {string} key The key
* @param {object} value The value
* @param {boolean} ensureReplicated Optional: If true will return only once entire cluster is updated
* @param {number} replicationTimeout Optional: Timeout for cluster to sync to this value, default is 200ms
*/
function set(key, value) {
function set(key, value, ensureReplicated, replicationTimeout=DEFAULT_POLL_TIMEOUT) {
const obj = {}; obj[key] = value;
if (process.send) process.send({type: SET_MSG, obj})
else _processSetMessage(obj);

if (ensureReplicated && process.send) return new Promise(async resolve => {
let resolved = false, repliesReceived = 0;
const clusterCount = await _getClusterCount(replicationTimeout);
if (!clusterCount) {resolve("Replication timeout error"); return;} // error or timeout
const requestID = _createRequestID(), msgListener = function(msg) {
if ((msg.type == POLL_VALUE_RESPONSE) && (msg.key == key) && (msg.id == requestID) && (!resolved)) {
repliesReceived++; if (repliesReceived == clusterCount) {
resolved = true; process.removeListener("message", msgListener); resolve();
}
}
}
process.on("message", msgListener); process.send({type: POLL_VALUE_REQUEST, key, id: requestID}); // start the polling
setTimeout(_=>{ if (!resolved) {
resolved = true; process.removeListener("message", msgListener); resolve("Replication timeout error");
} }, replicationTimeout);
});
}

/**
* Given a key, returns its value from the global memory.
* @param {string} key The key.
* @param {boolean||object} initIfUndefined If an object is provided and the key is not defined, it will be set to
* @param {boolean||object} initIfUndefined Optional: If an object is provided and the key is not defined, it will be set to
* this or an empty object if the value of this param is the boolean true.
* @param {boolean} pollReplicas Optional: If true, replicas will be polled if the value is not local to check if it is set somewhere
* @param {number} polltimeout Optional: Timeout for polling values from other workers, default is 200 ms.
* @return The value if found, undefined if not.
*/
const get = (key, initIfUndefined) => {
const get = (key, initIfUndefined, pollReplicas, polltimeout=DEFAULT_POLL_TIMEOUT) => {
if ((!_clusterMemory[key]) && pollReplicas && process.send) return new Promise(async resolve => {
let repliesReceived = 0, resolved = false;
const clusterCount = await _getClusterCount(polltimeout);
if (!clusterCount) {if (initIfUndefined) set(key, initIfUndefined); resolve(null||initIfUndefined); return;} // error or timeout
const requestID = _createRequestID(), msgListener = function(msg) {
if ((msg.type == POLL_VALUE_RESPONSE) && (msg.key == key) && (msg.id == requestID) && (!resolved)) {
repliesReceived++;
if (msg.value) {
_clusterMemory[key] = msg.value; resolved = true;
process.removeListener("message", this); resolve(msg.value);
} else if (repliesReceived == clusterCount) {
resolved = true; process.removeListener("message", msgListener);
if (initIfUndefined) set(key, initIfUndefined); resolve(null||initIfUndefined);
}
}
}
process.on("message", msgListener); process.send({type: POLL_VALUE_REQUEST, key, id: requestID});
setTimeout(_=>{ if (!resolved) {
resolved = true; process.removeListener("message", msgListener);
if (initIfUndefined) set(key, initIfUndefined); resolve(null||initIfUndefined);
} }, polltimeout);
});

if ((!_clusterMemory[key]) && initIfUndefined) {set(key, initIfUndefined); return initIfUndefined;}
else return _clusterMemory[key];
}
Expand All @@ -54,4 +105,17 @@ function _processSetMessage(obj) {
}
}

function _getClusterCount(timeout) {
return new Promise(resolve => {
let resolved = false; const requestID = _createRequestID(), msgListener = function (msg) {
if (msg.id == requestID) {
resolved = true; resolve(msg.count); process.removeListener("message", msgListener); }
}
process.on("message", msgListener); process.send({type: CLUSTER_COUNT, id: requestID});
setTimeout(_=>{if (!resolved) {process.removeListener("message", msgListener); resolve(null)}}, timeout);
});
}

const _createRequestID = _ => `${Date.now()}-${Math.round(Math.random()*1000)}`;

module.exports = {init, set, get, listen};

0 comments on commit 3350b2c

Please sign in to comment.