Skip to content

Commit

Permalink
[OGUI-1554]DCS SOR panel (#2601)
Browse files Browse the repository at this point in the history
  • Loading branch information
graduta authored Oct 9, 2024
1 parent f174ea3 commit 7c50c94
Show file tree
Hide file tree
Showing 15 changed files with 481 additions and 7 deletions.
66 changes: 66 additions & 0 deletions Control/lib/adapters/DcsIntegratedEventAdapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

/**
* DcsIntegratedEventAdapter - Given an AliECS Integrated Service Event for DCS.SOR, build a DCS Integrated Event
*
* The DCS SOR event is a special event that comes from either:
* * the DCS service itself (when containing the payload "dcsEvent") and it is for one detector only
* * the ECS service which describes steps being executed for all detectors involved
*/
class DcsIntegratedEventAdapter {
/**
* DcsIntegratedEventAdapter
*/
constructor() {
}

/**
* Build a DCS Integrated Event from an AliECS Integrated Service Event. If it is a DCSevent, the detector will replace detectors array
* @param {object} event - AliECS Integrated Service Event
* @param {number} timestamp - timestamp of the event (int64 as per proto file definition)
* @return {object} DCS Integrated Event
*/
static buildDcsIntegratedEvent(event, timestamp) {
const { name, error, environmentId, payload } = event;
const { operationName, operationStatus, operationStep, operationStepStatus } = event;

const payloadJSON = JSON.parse(payload);
const { dcsEvent, runNumber, detector = null, state } = payloadJSON;
if (!dcsEvent) {
return null;
}
let { detectors } = payloadJSON;

if (detector) {
// event comes with information also from DCS and it comes per detector for SOR so we override detectors
detectors = [detector];
}

return {
name,
timestamp: Number(timestamp),
error,
environmentId,
runNumber,
state,
operationName,
operationStatus,
operationStep,
operationStepStatus,
detectors
};
}
}

exports.DcsIntegratedEventAdapter = DcsIntegratedEventAdapter;
19 changes: 17 additions & 2 deletions Control/lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* or submit itself to any jurisdiction.
*/

const { Kafka, logLevel } = require('kafkajs');
const logger = (require('@aliceo2/web-ui').LogManager)
.getLogger(`${process.env.npm_config_log_label ?? 'cog'}/api`);
const config = require('./config/configProvider.js');
Expand Down Expand Up @@ -45,6 +46,7 @@ const {WorkflowTemplateService} = require('./services/WorkflowTemplate.service.j
const {NotificationService, ConsulService} = require('@aliceo2/web-ui');

// AliECS Core
const { AliEcsSynchronizer } = require('./control-core/AliEcsSynchronizer.js');
const AliecsRequestHandler = require('./control-core/RequestHandler.js');
const ApricotService = require('./control-core/ApricotService.js');
const ControlService = require('./control-core/ControlService.js');
Expand Down Expand Up @@ -100,19 +102,32 @@ module.exports.setup = (http, ws) => {
aliecsReqHandler.setWs(ws);
aliecsReqHandler.workflowService = workflowService;

const envCache = new EnvCache(ctrlService, envService);
const envCache = new EnvCache(ctrlService, envService, cacheService);
envCache.setWs(ws);

const bkpService = new BookkeepingService(config.bookkeeping ?? {});
const runService = new RunService(bkpService, apricotService, cacheService);
runService.retrieveStaticConfigurations();
const runController = new RunController(runService, cacheService);

const notificationService = new NotificationService(config.kafka);
const notificationService = new NotificationService();
if (notificationService.isConfigured()) {
notificationService.proxyWebNotificationToWs(ws);
}

let aliEcsSynchronizer = undefined;
if (config.kafka && config.kafka?.enable) {
const kafkaClient = new Kafka({
clientId: 'control-gui',
brokers: config.kafka.brokers,
retry: { retries: 3 },
logLevel: logLevel.NOTHING,
});

aliEcsSynchronizer = new AliEcsSynchronizer(kafkaClient, cacheService);
aliEcsSynchronizer.start();
}

const statusService = new StatusService(
config, ctrlService, consulService, apricotService, notificationService, wsService,
);
Expand Down
3 changes: 3 additions & 0 deletions Control/lib/common/cacheKeys.enum.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
const CacheKeys = Object.freeze({
CALIBRATION_RUNS_BY_DETECTOR: 'CALIBRATION_RUNS_BY_DETECTOR',
CALIBRATION_RUNS_REQUESTS: 'CALIBRATION_RUNS_REQUESTS',
DCS: {
SOR: 'DCS.SOR',
}
});

exports.CacheKeys = CacheKeys;
109 changes: 109 additions & 0 deletions Control/lib/control-core/AliEcsEventMessagesConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
const protobuf = require('protobufjs');
const path = require('node:path');
const { LogManager } = require('@aliceo2/web-ui');

const protoDir = path.resolve(__dirname, './../../protobuf/protos');
const root = protobuf.loadSync(path.resolve(protoDir, 'events.proto'));
const EventMessage = root.lookupType('events.Event');

/**
* @callback MessageReceivedCallback
* @param {EventMessage} message received message
* @return {Promise<void>}
*/

/**
* Consumer that consume ECS event messages and pass them to previously-registered listeners
* @author Martin Boulais <mboulais@cern.ch>
* Until consumer is added in the common library, consumer was extracted from:
* - https://github.com/AliceO2Group/Bookkeeping/blob/main/lib/server/kafka/AliEcsEventMessagesConsumer.js
*/
class AliEcsEventMessagesConsumer {
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
* @param {string} groupId the group id to use for the kafka consumer
* @param {string[]} topics the list of topics to consume
*/
constructor(kafkaClient, groupId, topics) {
this.consumer = kafkaClient.consumer({ groupId });
this._topics = topics;

/**
* @type {MessageReceivedCallback[]}
* @private
*/
this._listeners = [];

this._logger = LogManager.getLogger('cog/ecs-event-consumer');
}

/**
* Register a listener to listen on event message being received
*
* Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged
*
* @param {MessageReceivedCallback} listener the listener to register
* @return {void}
*/
onMessageReceived(listener) {
this._listeners.push(listener);
}

/**
* Start the kafka consumer
*
* @return {Promise<void>} Resolves once the consumer started to consume messages
*/
async start() {
this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
await this.consumer.connect();
await this.consumer.subscribe({ topics: this._topics });
await this.consumer.run({
eachMessage: async ({ message, topic }) => {
const error = EventMessage.verify(message.value);
if (error) {
this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`);
return;
}
await this._handleEvent(
EventMessage.toObject(
EventMessage.decode(message.value),
{ enums: String },
)
);
},
});
}

/**
* Call every registered listeners by passing the given message to it
*
* @param {EventMessage} message the message to pass to listeners
* @return {void}
*/
async _handleEvent(message) {
for (const listener of this._listeners) {
try {
await listener(message);
} catch (error) {
this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`);
}
}
}
}

exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;
89 changes: 89 additions & 0 deletions Control/lib/control-core/AliEcsSynchronizer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

const { AliEcsEventMessagesConsumer } = require('./AliEcsEventMessagesConsumer.js');
const { DcsIntegratedEventAdapter } = require('../adapters/DcsIntegratedEventAdapter.js');
const { CacheKeys } = require('../common/cacheKeys.enum.js');
const { LogManager } = require('@aliceo2/web-ui');

const INTEGRATED_SERVICES_CONSUMER_GROUP = 'cog-integrated-services';
const INTEGRATED_SERVICES_TOPICS = ['aliecs.integrated_service.dcs'];
const SOR_EVENT_NAME = 'readout-dataflow.dcs.sor';

/**
* Utility synchronizing AliECS data into control-gui, listening to kafka
*/
class AliEcsSynchronizer {
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient - configured kafka client
* @param {CacheService} cacheService - instance of CacheService
*/
constructor(kafkaClient, cacheService) {
this._cacheService = cacheService;
this._logger = LogManager.getLogger('cog/ali-ecs-synchronizer');

this._ecsIntegratedServiceConsumer = new AliEcsEventMessagesConsumer(
kafkaClient,
INTEGRATED_SERVICES_CONSUMER_GROUP,
INTEGRATED_SERVICES_TOPICS
);
this._ecsIntegratedServiceConsumer.onMessageReceived(async (eventMessage) => {
const { timestamp, integratedServiceEvent } = eventMessage;
try {
if (integratedServiceEvent.name === SOR_EVENT_NAME) {
const dcsSorEvent = DcsIntegratedEventAdapter.buildDcsIntegratedEvent(integratedServiceEvent, timestamp);
if (!dcsSorEvent) {
return;
}
const { environmentId } = dcsSorEvent;
let cachedDcsSteps = this._cacheService.getByKey(CacheKeys.DCS.SOR);
if (!cachedDcsSteps) {
cachedDcsSteps = {};
}
if (!cachedDcsSteps?.[environmentId]) {
cachedDcsSteps[environmentId] = {
displayCache: true,
dcsOperations: [dcsSorEvent]
};
} else {
cachedDcsSteps[environmentId].dcsOperations.push(dcsSorEvent);
}
cachedDcsSteps[environmentId].dcsOperations.sort((a, b) => a.timestamp - b.timestamp);
this._cacheService.updateByKeyAndBroadcast(CacheKeys.DCS.SOR, cachedDcsSteps, {command: CacheKeys.DCS.SOR});
}
} catch (error) {
this._logger.errorMessage(`Error when parsing event message: ${error.message}\n${error.trace}`);
}
});
}

/**
* Start the synchronization process
*
* @return {void}
*/
start() {
this._logger.infoMessage('Starting to consume AliECS messages for integrated services');
this._ecsIntegratedServiceConsumer
.start()
.catch((error) =>
this._logger.errorMessage(
`Error when starting ECS integrated services consumer: ${error.message}\n${error.trace}`
)
);
}
}

exports.AliEcsSynchronizer = AliEcsSynchronizer;
13 changes: 12 additions & 1 deletion Control/lib/control-core/EnvCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
const {WebSocketMessage, LogManager} = require('@aliceo2/web-ui');
const logger = LogManager.getLogger(`${process.env.npm_config_log_label ?? 'cog'}/envcache`);
const assert = require('assert');
const { CacheKeys } = require('../common/cacheKeys.enum.js');

/**
* Caches AliECS core GetEnvironments response
Expand All @@ -24,9 +25,11 @@ class EnvCache {
/**
* @param {object} ctrlService - Handle to Control service
* @param {EnvironmentService} environmentService - service to be used to retrieve information on environments
* @param {CacheService} cacheService - service to be used to retrieve information from cache
*/
constructor(ctrlService, environmentService) {
constructor(ctrlService, environmentService, cacheService) {
this.ctrlService = ctrlService;
this._cacheService = cacheService;
this.cache = {};
this.timeout = 9000;
this.cacheEvictionTimeout = 5 * 60 * 1000;
Expand Down Expand Up @@ -88,6 +91,14 @@ class EnvCache {
for (let [index, currentEnv] of envs.environments.entries()) {
try {
const environment = await this._environmentService.getEnvironment(currentEnv.id);
if (environment.state === 'RUNNING' && !environment.currentTransition) {
// if environment reached a stable running state, hide the display cache SOR information from future ERROR states
const dcsCache = this._cacheService.getByKey(CacheKeys.DCS.SOR);
if (dcsCache?.[environment.id]) {
dcsCache[environment.id].displayCache = false;
}
this._cacheService.updateByKeyAndBroadcast(CacheKeys.DCS.SOR, dcsCache,{command: CacheKeys.DCS.SOR});
}
envs.environments[index] = environment;
this._updateCache(envs);
} catch (error) {
Expand Down
3 changes: 2 additions & 1 deletion Control/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Control/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
"@aliceo2/web-ui": "2.7.1",
"@grpc/grpc-js": "1.12.0",
"@grpc/proto-loader": "0.7.0",
"google-protobuf": "3.21.0"
"google-protobuf": "3.21.0",
"kafkajs": "2.2.4"
},
"bundledDependencies": [
"@aliceo2/web-ui",
Expand Down
Loading

0 comments on commit 7c50c94

Please sign in to comment.