Skip to content

Commit

Permalink
[OGUI-1412] Add new CacheService and Broadcast new calibration inform…
Browse files Browse the repository at this point in the history
…ation (#2185)

* adds a new cacheservice(in-memory) that is to be used by all the other components for updates
* broadcast calibration run to all clients if information is new
* udpates `RunController` to initially serve data from cache and if missing, to request from Bookkeeping
  • Loading branch information
graduta authored Oct 31, 2023
1 parent 48908ac commit 9a01f75
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 47 deletions.
27 changes: 17 additions & 10 deletions Control/lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ const {WorkflowTemplateController} = require('./controllers/WorkflowTemplate.con

// local services
const {BookkeepingService} = require('./services/Bookkeeping.service.js');
const {BroadcastService} = require('./services/Broadcast.service.js');
const {CacheService} = require('./services/Cache.service.js');
const {EnvironmentService} = require('./services/Environment.service.js');
const {Intervals} = require('./services/Intervals.service.js');
const Lock = require('./services/Lock.js');
const {RunService} = require('./services/Run.service.js');
const {StatusService} = require('./services/Status.service.js');
const {WorkflowTemplateService} = require('./services/WorkflowTemplate.service.js');
const Lock = require('./services/Lock.js');

// web-ui services
const {NotificationService, ConsulService} = require('@aliceo2/web-ui');

// AliECS Core
const GrpcProxy = require('./control-core/GrpcProxy.js');
const ControlService = require('./control-core/ControlService.js');
const ApricotService = require('./control-core/ApricotService.js');
const AliecsRequestHandler = require('./control-core/RequestHandler.js');
const ApricotService = require('./control-core/ApricotService.js');
const ControlService = require('./control-core/ControlService.js');
const EnvCache = require('./control-core/EnvCache.js');
const GrpcProxy = require('./control-core/GrpcProxy.js');

const path = require('path');
const O2_CONTROL_PROTO_PATH = path.join(__dirname, './../protobuf/o2control.proto');
Expand All @@ -65,6 +67,8 @@ module.exports.setup = (http, ws) => {
consulService = new ConsulService(config.consul);
}
const wsService = new WebSocketService(ws);
const broadcastService = new BroadcastService(ws);
const cacheService = new CacheService(broadcastService);

const consulController = new ConsulController(consulService, config.consul);
consulController.testConsulStatus();
Expand All @@ -89,9 +93,9 @@ module.exports.setup = (http, ws) => {
envCache.setWs(ws);

const bkpService = new BookkeepingService(config.bookkeeping ?? {});
const runService = new RunService(bkpService, apricotService);
const runService = new RunService(bkpService, apricotService, cacheService);
runService.init();
const runController = new RunController(runService);
const runController = new RunController(runService, cacheService);

const notificationService = new NotificationService(config.kafka);
if (notificationService.isConfigured()) {
Expand Down Expand Up @@ -185,8 +189,11 @@ function initializeIntervals(intervalsService, statusService, runService, bkpSer
intervalsService.register(statusService.retrieveNotificationSystemStatus.bind(statusService), SERVICES_REFRESH_RATE);
intervalsService.register(statusService.retrieveAliECSIntegratedInfo.bind(statusService), SERVICES_REFRESH_RATE);

intervalsService.register(
runService.retrieveCalibrationRunsGroupedByDetector.bind(runService),
CALIBRATION_RUNS_REFRESH_RATE
);

if (config.bookkeeping) {
intervalsService.register(
runService.retrieveCalibrationRunsGroupedByDetector.bind(runService),
CALIBRATION_RUNS_REFRESH_RATE
);
}
}
22 changes: 22 additions & 0 deletions Control/lib/common/cacheKeys.enum.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* @license
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
* All rights not expressly granted are reserved.
*
* This software is distributed under the terms of the GNU General Public
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

/**
* Keys that are used to set/get information to/from the CacheService
*/
const CacheKeys = Object.freeze({
CALIBRATION_RUNS_BY_DETECTOR: 'CALIBRATION_RUNS_BY_DETECTOR'
});

exports.CacheKeys = CacheKeys;
22 changes: 19 additions & 3 deletions Control/lib/controllers/Run.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
const {Log} = require('@aliceo2/web-ui');
const {updateExpressResponseFromNativeError} = require('./../errors/updateExpressResponseFromNativeError.js');
const {CacheKeys} = require('./../common/cacheKeys.enum.js');

/**
* Controller for dealing with all API requests on retrieving information on runs
Expand All @@ -21,14 +22,20 @@ class RunController {
/**
* Constructor for initializing controller of runs
* @param {RunService} runService - service to use to build information on runs
* @param {CacheService} cacheService - service to use for retrieving information stored in-memory
*/
constructor(runService) {
constructor(runService, cacheService) {
this._logger = new Log(`${process.env.npm_config_log_label ?? 'cog'}/run-ctrl`);

/**
* @type {RunService}
*/
this._runService = runService;

/**
* @type {CacheService}
*/
this._cacheService = cacheService;
}

/**
Expand All @@ -37,9 +44,18 @@ class RunController {
* @param {Response} res - HTTP Response object
* @returns {void}
*/
getCalibrationRunsHandler(_, res) {
async getCalibrationRunsHandler(_, res) {
let calibrationRuns;
try {
calibrationRuns = this._cacheService.getByKey(CacheKeys.CALIBRATION_RUNS_BY_DETECTOR);
} catch (error) {
this._logger.debug(`Unable to serve from cache due to: ${error}`);
}
try {
res.status(200).json(this._runService.calibrationRunsPerDetector);
if (!calibrationRuns) {
calibrationRuns = await this._runService.retrieveCalibrationRunsGroupedByDetector();
}
res.status(200).json(calibrationRuns);
} catch (error) {
this._logger.debug(error);
updateExpressResponseFromNativeError(res, error);
Expand Down
2 changes: 1 addition & 1 deletion Control/lib/services/Bookkeeping.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const {httpGetJson} = require('./../utils.js');
const RunSummaryAdapter = require('./../adapters/RunSummaryAdapter.js');
const {BookkeepingFilterAdapter} = require('./../adapters/external/BookkeepingFilterAdapter.js');

const DEFAULT_REFRESH_RATE = 10000;
const DEFAULT_REFRESH_RATE = 30000;

/**
* BookkeepingService class to be used to retrieve data from Bookkeeping
Expand Down
50 changes: 50 additions & 0 deletions Control/lib/services/Broadcast.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* @license
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
* All rights not expressly granted are reserved.
*
* This software is distributed under the terms of the GNU General Public
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

const {WebSocketMessage} = require('@aliceo2/web-ui');

/**
* @class
* BroadcastService class is to be used for building a websocket message and broadcasting it via web sockets
*/
class BroadcastService {
/**
* @constructor
* Constructor for initializing the service with AliceO2/websocket service instance to use
* @param {WebSocket} wsService - which is to be used for broadcasting
*/
constructor(wsService) {
/**
* @type {WebSocket}
*/
this._wsService = wsService;
}

/**
* Method to receive command and payload to build a WebSocket message and broadcast it to all listening clients
* @param {String} command - command to be added to websocket message
* @param {Object} payload - payload to be sent to the clients
* @return {void}
*/
broadcast(command, payload) {
if (payload) {
const message = new WebSocketMessage()
.setCommand(command)
.setPayload(payload);
this._wsService?.broadcast(message);
}
}
}

module.exports = {BroadcastService};
82 changes: 82 additions & 0 deletions Control/lib/services/Cache.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* @license
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
* All rights not expressly granted are reserved.
*
* This software is distributed under the terms of the GNU General Public
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

const {Log} = require('@aliceo2/web-ui');
const {deepStrictEqual, AssertionError} = require('assert');

/**
* @class
* CacheService class is designed to store in-memory information and allow users to also broadcast new information to the all or registered clients.
*/
class CacheService {
/**
* @constructor
* Constructor for initializing the service with:
* - empty maps for needed information
* - optional service for broadcasting information
* @param {BroadcastService} broadcastService - which is to be used for broadcasting
*/
constructor(broadcastService) {

/**
* @type {Object<String, Object>}
*/
this._memory = {};

/**
* @type {BroadcastService}
*/
this._broadcastService = broadcastService;

this._logger = new Log(`${process.env.npm_config_log_label ?? 'cog'}/cache-service`);
}

/**
* Method to receive a function for retrieval of information and a key under which the information should be updated
* @param {String} key - key under which the information should be stored
* @param {String} value - command to be used for broadcasting message
* @param {Object} broadcastConfig - object containing broadcast information; if present information will be broadcasted
* @return {void}
*/
async updateByKeyAndBroadcast(key, value, {command} = {}) {
if (value) {
try {
deepStrictEqual(value, this._memory[key]);
} catch (error) {
if (error instanceof AssertionError) {
this._memory[key] = value;
if (command) {
this._broadcastService.broadcast(command, value);
}
} else {
this._logger.debug(`Unable to update key ${key} due to ${error}`);
}
}
}
}

/**
* Getter for retrieving a copy of the information stored in-memory under a certain key
* @param {key} - key under which information is stored
* @return {Object}
*/
getByKey(key) {
if (this._memory[key]) {
return JSON.parse(JSON.stringify(this._memory[key]));
}
return null;
}
}

module.exports = {CacheService};
40 changes: 17 additions & 23 deletions Control/lib/services/Run.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
*/

const {Log} = require('@aliceo2/web-ui');
const {grpcErrorToNativeError} = require('./../errors/grpcErrorToNativeError.js');

const {RUNTIME_COMPONENT: {COG}, RUNTIME_KEY: {CALIBRATION_MAPPING}} = require('./../common/kvStore/runtime.enum.js');
const {RunDefinitions} = require('./../common/runDefinition.enum.js')
const {CacheKeys} = require('../common/cacheKeys.enum.js');
const {grpcErrorToNativeError} = require('./../errors/grpcErrorToNativeError.js');
const {LOG_LEVEL} = require('./../common/logLevel.enum.js');
const {RunCalibrationStatus} = require('./../common/runCalibrationStatus.enum.js');
const {RunDefinitions} = require('./../common/runDefinition.enum.js')
const {RUNTIME_COMPONENT: {COG}, RUNTIME_KEY: {CALIBRATION_MAPPING}} = require('./../common/kvStore/runtime.enum.js');

/**
* @class
Expand All @@ -33,8 +34,9 @@ class RunService {
* Constructor for configuring the service to retrieve data via passed services
* @param {BookkeepingService} bkpService - service for retrieving RUNs information
* @param {ApricotService} apricotService - service for retrieving information through AliECS Apricot gRPC connection, mainly KV Store data
* @param {CacheService} cacheService - service to store information in-memory
*/
constructor(bkpService, apricotService) {
constructor(bkpService, apricotService, cacheService) {
/**
* @type {BookkeepingService}
*/
Expand All @@ -45,6 +47,11 @@ class RunService {
*/
this._apricotService = apricotService;

/**
* @type {CacheService}
*/
this._cacheService = cacheService;

/**
* @type {Object<String, Number>}
*/
Expand All @@ -56,16 +63,6 @@ class RunService {
*/
this._calibrationConfigurationPerDetectorMap = {};

/**
* @type {Object<String, Array<RunSummary>>}
*/
this._calibrationRunsPerDetector = {};

/**
* @type {Object<String, Array<RunSummary>>}
*/
this._calibrationRunsPerDetector = {};

this._logger = new Log(`${process.env.npm_config_log_label ?? 'cog'}/run-service`);
}

Expand All @@ -76,7 +73,7 @@ class RunService {
async init() {
this._calibrationConfigurationPerDetectorMap = await this._retrieveCalibrationConfigurationsForDetectors();
this._runTypes = await this._bkpService.getRunTypes();
this._calibrationRunsPerDetector = await this.retrieveCalibrationRunsGroupedByDetector();
await this.retrieveCalibrationRunsGroupedByDetector();
}

/**
Expand Down Expand Up @@ -109,7 +106,11 @@ class RunService {
}
}
}
this._calibrationRunsPerDetector = calibrationRunsPerDetector;
this._cacheService?.updateByKeyAndBroadcast(
CacheKeys.CALIBRATION_RUNS_BY_DETECTOR,
calibrationRunsPerDetector,
{command: CacheKeys.CALIBRATION_RUNS_BY_DETECTOR}
);
return calibrationRunsPerDetector;
}

Expand Down Expand Up @@ -157,13 +158,6 @@ class RunService {
return this._calibrationConfigurationPerDetectorMap;
}

/**
* Return the object containing a KV object with detector and its corresponding last calibration runs
* @return {Object<String, Array<RunSummary>>}
*/
get calibrationRunsPerDetector() {
return this._calibrationRunsPerDetector;
}
}

module.exports = {RunService};
8 changes: 7 additions & 1 deletion Control/public/Model.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,13 @@ export default class Model extends Observable {
if (message?.payload[STATUS_COMPONENTS_KEYS.GENERAL_SYSTEM_KEY]) {
this.about.updateComponentStatus('system', message.payload[STATUS_COMPONENTS_KEYS.GENERAL_SYSTEM_KEY])
}

break
case 'CALIBRATION_RUNS_BY_DETECTOR':
if (message.payload) {
this.calibrationRunsModel.calibrationRuns = RemoteData.success(message?.payload)
this.notify();
}
break
}
}

Expand Down
Loading

0 comments on commit 9a01f75

Please sign in to comment.