Skip to content

Commit d5c6c3d

Browse files
committed
watcher: add supervisor to monitor watchers
1 parent 90abed6 commit d5c6c3d

File tree

5 files changed

+85
-1
lines changed

5 files changed

+85
-1
lines changed

watcher/src/consts.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1-
import { ChainName, CONTRACTS, Network } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
1+
import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
22
import { Environment } from '@wormhole-foundation/wormhole-monitor-common';
33
import { AxiosRequestConfig } from 'axios';
44

55
export const TIMEOUT = 0.5 * 1000;
6+
export const HB_INTERVAL = 5 * 60 * 1000; // 5 Minutes
7+
export type WorkerData = {
8+
network: Environment;
9+
chain: ChainName;
10+
};
611

712
// Notes about RPCs
813
// Ethereum

watcher/src/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
getEnvironment,
1111
getMode,
1212
} from '@wormhole-foundation/wormhole-monitor-common';
13+
import { startSupervisor } from './workers/supervisor';
1314

1415
initDb();
1516

@@ -100,3 +101,4 @@ if (mode === 'vaa') {
100101
} else {
101102
throw new Error(`Unknown mode: ${mode}`);
102103
}
104+
startSupervisor(supportedChains);

watcher/src/watchers/Watcher.ts

+4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { TIMEOUT } from '../consts';
99
import { VaasByBlock } from '../databases/types';
1010
import { getResumeBlockByChain, storeLatestBlock, storeVaasByBlock } from '../databases/utils';
1111
import { getLogger, WormholeLogger } from '../utils/logger';
12+
import { parentPort } from 'worker_threads';
1213

1314
export class Watcher {
1415
chain: ChainName;
@@ -105,6 +106,9 @@ export class Watcher {
105106
this.logger.warn(`backing off for ${expoBacko}ms`);
106107
await sleep(expoBacko);
107108
}
109+
if (parentPort) {
110+
parentPort.postMessage('heartbeat');
111+
}
108112
}
109113
}
110114
}

watcher/src/workers/supervisor.ts

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { ChainName } from '@certusone/wormhole-sdk';
2+
import { Worker } from 'worker_threads';
3+
import { HB_INTERVAL, WorkerData } from '../consts';
4+
import { getLogger } from '../utils/logger';
5+
import { Environment, getEnvironment } from '@wormhole-foundation/wormhole-monitor-common';
6+
7+
interface WorkerInfo {
8+
worker: Worker;
9+
data: WorkerData;
10+
lastHB: number;
11+
}
12+
13+
const workers: { [key: string]: WorkerInfo } = {};
14+
const logger = getLogger('supervisor');
15+
const network: Environment = getEnvironment();
16+
17+
function spawnWorker(data: WorkerData) {
18+
const workerName = `${data.chain}Worker`;
19+
logger.info(`Spawning worker ${workerName} on network ${network}...`);
20+
const worker = new Worker('./dist/src/workers/worker.js', { workerData: data });
21+
22+
worker.on('message', (message) => {
23+
if (message === 'heartbeat') {
24+
logger.debug(`Worker ${workerName} sent HB`);
25+
workers[workerName].lastHB = Date.now();
26+
}
27+
});
28+
29+
worker.on('exit', (code) => {
30+
logger.warn(`Worker ${workerName} exited with code ${code}`);
31+
if (code !== 0) {
32+
logger.error(`Restarting worker ${workerName}...`);
33+
spawnWorker(data);
34+
}
35+
});
36+
37+
workers[workerName] = { worker, data, lastHB: Date.now() };
38+
logger.debug('Finished spawning worker:', workerName);
39+
}
40+
41+
function monitorWorkers() {
42+
setInterval(() => {
43+
for (const [workerName, workerInfo] of Object.entries(workers)) {
44+
logger.debug(
45+
`Checking worker ${workerName} with lastHB of ${new Date(workerInfo.lastHB)}...`
46+
);
47+
if (Date.now() - workerInfo.lastHB > HB_INTERVAL) {
48+
logger.error(`Worker ${workerName} missed HB, restarting...`);
49+
workerInfo.worker.terminate();
50+
spawnWorker(workerInfo.data);
51+
}
52+
}
53+
}, HB_INTERVAL);
54+
}
55+
56+
export function startSupervisor(supportedChains: ChainName[]) {
57+
supportedChains.forEach((chain) => {
58+
const workerData: WorkerData = { network, chain };
59+
spawnWorker(workerData);
60+
});
61+
62+
monitorWorkers();
63+
}

watcher/src/workers/worker.ts

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { initDb } from '../databases/utils';
2+
import { makeFinalizedWatcher } from '../watchers/utils';
3+
import { workerData } from 'worker_threads';
4+
5+
initDb(false);
6+
const network = workerData.network;
7+
const chain = workerData.chain;
8+
console.log(`Making watcher for ${network} ${chain}...`);
9+
makeFinalizedWatcher(network, chain).watch();
10+
console.log(`Watcher for ${network} ${chain} started.`);

0 commit comments

Comments
 (0)