diff --git a/OpenFlow/src/Config.ts b/OpenFlow/src/Config.ts index d3193378..f85203a8 100644 --- a/OpenFlow/src/Config.ts +++ b/OpenFlow/src/Config.ts @@ -59,6 +59,7 @@ export class Config { Config.auto_hourly_housekeeping = Config.parseBoolean(Config.getEnv("auto_hourly_housekeeping", "false")); Config.housekeeping_update_usage_hourly = Config.parseBoolean(Config.getEnv("housekeeping_update_usage_hourly", "false")); Config.housekeeping_update_usersize_hourly = Config.parseBoolean(Config.getEnv("housekeeping_update_usersize_hourly", "true")); + Config.housekeeping_skip_collections = Config.getEnv("housekeeping_skip_collections", ""); Config.workitem_queue_monitoring_enabled = Config.parseBoolean(Config.getEnv("workitem_queue_monitoring_enabled", "true")); Config.workitem_queue_monitoring_interval = parseInt(Config.getEnv("workitem_queue_monitoring_interval", (30 * 1000).toString())); // 30 sec @@ -237,6 +238,7 @@ export class Config { public static auto_hourly_housekeeping: boolean = Config.parseBoolean(Config.getEnv("auto_hourly_housekeeping", "true")); public static housekeeping_update_usage_hourly: boolean = Config.parseBoolean(Config.getEnv("housekeeping_update_usage_hourly", "false")); public static housekeeping_update_usersize_hourly: boolean = Config.parseBoolean(Config.getEnv("housekeeping_update_usersize_hourly", "true")); + public static housekeeping_skip_collections: string = Config.getEnv("housekeeping_skip_collections", ""); public static workitem_queue_monitoring_enabled: boolean = Config.parseBoolean(Config.getEnv("workitem_queue_monitoring_enabled", "true")); public static workitem_queue_monitoring_interval: number = parseInt(Config.getEnv("workitem_queue_monitoring_interval", (30 * 1000).toString())); // 30 sec diff --git a/OpenFlow/src/DatabaseConnection.ts b/OpenFlow/src/DatabaseConnection.ts index 7a1c7bc8..3346f978 100644 --- a/OpenFlow/src/DatabaseConnection.ts +++ b/OpenFlow/src/DatabaseConnection.ts @@ -260,11 +260,15 @@ export class DatabaseConnection extends events.EventEmitter { } if (!NoderedUtil.IsNullEmpty(wiq.robotqueue) && !NoderedUtil.IsNullEmpty(wiq.workflowid)) { Logger.instanse.verbose("[workitems] Send invoke message to robot queue " + wiq.workflowid); - await amqpwrapper.Instance().send(null, wiq.robotqueue, payload, 5000, null, null, 2); + let expiration = (Config.amqp_requeue_time / 2, 10) | 0; + if (expiration < 500) expiration = 500; + await amqpwrapper.Instance().send(null, wiq.robotqueue, payload, expiration, null, null, 2); } if (!NoderedUtil.IsNullEmpty(wiq.amqpqueue)) { Logger.instanse.verbose("[workitems] Send invoke message to amqp queue " + wiq.amqpqueue); - await amqpwrapper.Instance().send(null, wiq.amqpqueue, payload, 5000, null, null, 2); + let expiration = (Config.amqp_requeue_time / 2, 10) | 0; + if (expiration < 500) expiration = 500; + await amqpwrapper.Instance().send(null, wiq.amqpqueue, payload, expiration, null, null, 2); } } } diff --git a/OpenFlow/src/Messages/Message.ts b/OpenFlow/src/Messages/Message.ts index c19f8c78..8a29d54e 100644 --- a/OpenFlow/src/Messages/Message.ts +++ b/OpenFlow/src/Messages/Message.ts @@ -138,7 +138,7 @@ export class Message { await this.GetNoderedInstance(span); break; case "housekeeping": - await this.Housekeeping(false, false, false, span); + await this.Housekeeping(span); break; case "updateworkitemqueue": await this.UpdateWorkitemQueue(span); @@ -656,10 +656,7 @@ export class Message { if (Config.enable_openflow_amqp) { cli.Send(await QueueClient.SendForProcessing(this, this.priority)); } else { - // await this.Housekeeping(false, false, false, span); - Message.lastHouseKeeping = null; - var msg = JSON.parse(this.data); - await this.Housekeeping(msg.skipnodered, msg.skipcalculatesize, msg.skipupdateusersize, span); + await this.Housekeeping(span); cli.Send(this); } break; @@ -3559,7 +3556,33 @@ export class Message { if (diffminutes < 60) return false; return true; } - public async Housekeeping(skipNodered: boolean, skipCalculateSize: boolean, skipUpdateUserSize: boolean, parent: Span): Promise { + private async Housekeeping(parent: Span): Promise { + this.Reply(); + const span: Span = Logger.otel.startSubSpan("message.GetNoderedInstance", parent); + let msg: any; + try { + msg = JSON.parse(this.data); + Message.lastHouseKeeping = null; + if (NoderedUtil.IsNullEmpty(msg.skipnodered)) msg.skipnodered = false; + if (NoderedUtil.IsNullEmpty(msg.skipcalculatesize)) msg.skipcalculatesize = false; + if (NoderedUtil.IsNullEmpty(msg.skipupdateusersize)) msg.skipupdateusersize = false; + await this._Housekeeping(msg.skipnodered, msg.skipcalculatesize, msg.skipupdateusersize, span); + } catch (error) { + span?.recordException(error); + this.data = ""; + await handleError(null, error); + if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error; + } + try { + this.data = JSON.stringify(msg); + } catch (error) { + span?.recordException(error); + this.data = ""; + await handleError(null, error); + } + Logger.otel.endSpan(span); + } + public async _Housekeeping(skipNodered: boolean, skipCalculateSize: boolean, skipUpdateUserSize: boolean, parent: Span): Promise { if (Message.lastHouseKeeping == null) { Message.lastHouseKeeping = new Date(); Message.lastHouseKeeping.setDate(Message.lastHouseKeeping.getDate() - 1); @@ -3764,8 +3787,15 @@ export class Message { collections = collections.filter(x => x.name.indexOf("system.") === -1); let totalusage = 0; let index = 0; + let skip_collections = []; + if (!NoderedUtil.IsNullEmpty(Config.housekeeping_skip_collections)) skip_collections = Config.housekeeping_skip_collections.split(",") for (let col of collections) { if (col.name == "fs.chunks") continue; + if (skip_collections.indexOf(col.name) > -1) { + Logger.instanse.debug("[housekeeping][" + col.name + "] skipped due to housekeeping_skip_collections setting"); + continue; + } + index++; let aggregates: any = [ { diff --git a/OpenFlow/src/index.ts b/OpenFlow/src/index.ts index 94c2e794..c9a60ef9 100644 --- a/OpenFlow/src/index.ts +++ b/OpenFlow/src/index.ts @@ -59,7 +59,7 @@ function doHouseKeeping() { var msg2 = new Message(); msg2.jwt = Crypt.rootToken(); var h = dt.getHours(); var skipUpdateUsage: boolean = !(dt.getHours() == 1 || dt.getHours() == 13); - msg2.Housekeeping(false, skipUpdateUsage, skipUpdateUsage, null).catch((error) => Logger.instanse.error(error)); + msg2._Housekeeping(false, skipUpdateUsage, skipUpdateUsage, null).catch((error) => Logger.instanse.error(error)); // var dt = new Date(new Date().toISOString()); // var msg = new Message(); msg.jwt = Crypt.rootToken(); diff --git a/OpenFlowNodeRED/package.json b/OpenFlowNodeRED/package.json index 176b45e9..23d74065 100644 --- a/OpenFlowNodeRED/package.json +++ b/OpenFlowNodeRED/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/nodered", - "version": "1.4.3", + "version": "1.4.4", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { diff --git a/OpenFlowNodeRED/src/nodered/nodes/api.html b/OpenFlowNodeRED/src/nodered/nodes/api.html index 6f2f0b12..7ffe2b70 100644 --- a/OpenFlowNodeRED/src/nodered/nodes/api.html +++ b/OpenFlowNodeRED/src/nodered/nodes/api.html @@ -1796,8 +1796,67 @@ + + + +