-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathlocateMessageGaps.ts
140 lines (138 loc) · 5.26 KB
/
locateMessageGaps.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import * as dotenv from 'dotenv';
dotenv.config();
import { ChainId, CHAIN_ID_SOLANA, coalesceChainName } from '@certusone/wormhole-sdk';
import {
Environment,
INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN,
getEnvironment,
sleep,
} from '@wormhole-foundation/wormhole-monitor-common';
import { TIMEOUT } from '../src/consts';
import { BigtableDatabase } from '../src/databases/BigtableDatabase';
import { parseMessageId } from '../src/databases/utils';
import { makeFinalizedWatcher } from '../src/watchers/utils';
import { Watcher } from '../src/watchers/Watcher';
// This script checks for gaps in the message sequences for an emitter.
// Ideally this shouldn't happen, but there seems to be an issue with Oasis, Karura, and Celo
(async () => {
const network: Environment = getEnvironment();
const bt = new BigtableDatabase();
if (!bt.bigtable) {
throw new Error('bigtable is undefined');
}
const instance = bt.bigtable.instance(bt.instanceId);
const messageTable = instance.table(bt.msgTableId);
try {
// Find gaps in sequence numbers with the same chain and emitter
// Filter known bad emitters
// Sort by ascending sequence number
const observedMessages = (await messageTable.getRows())[0]
.filter((m) => {
const { chain, emitter } = parseMessageId(m.id);
if (
chain === CHAIN_ID_SOLANA &&
emitter === '6bb14509a612f01fbbc4cffeebd4bbfb492a86df717ebe92eb6df432a3f00a25'
) {
return false;
}
return true;
})
.sort((a, b) => Number(parseMessageId(a.id).sequence - parseMessageId(b.id).sequence));
const total = observedMessages.length;
console.log(`processing ${total} messages`);
const gaps = [];
const latestEmission: { [emitter: string]: { sequence: bigint; block: number } } = {};
for (const observedMessage of observedMessages) {
const {
chain: emitterChain,
block,
emitter: emitterAddress,
sequence,
} = parseMessageId(observedMessage.id);
const chainName = coalesceChainName(emitterChain as ChainId);
const emitter = `${emitterChain}/${emitterAddress}`;
if (!latestEmission[emitter]) {
latestEmission[emitter] = {
sequence: chainName === 'algorand' || chainName === 'near' ? 0n : -1n,
block: Number(INITIAL_DEPLOYMENT_BLOCK_BY_NETWORK_AND_CHAIN[network][chainName] || 0),
};
}
while (sequence > latestEmission[emitter].sequence + 1n) {
latestEmission[emitter].sequence += 1n;
gaps.push(
[
emitterChain,
`${latestEmission[emitter].block}-${block}`,
emitterAddress,
latestEmission[emitter].sequence.toString(),
].join('/')
);
}
latestEmission[emitter] = { sequence, block };
}
// console.log(latestEmission);
// Sort by chain, emitter, sequence
gaps.sort((a, b) => {
const [aChain, _aBlocks, aEmitter, aSequence] = a.split('/');
const [bChain, _bBlocks, bEmitter, bSequence] = b.split('/');
return (
aChain.localeCompare(bChain) ||
aEmitter.localeCompare(bEmitter) ||
Number(BigInt(aSequence) - BigInt(bSequence))
);
});
console.log(gaps);
// Search misses and submit them to the db
let prevChain = '0';
let fromBlock = -1;
for (const gap of gaps) {
const [chain, blockRange, emitter, sequence] = gap.split('/');
const chainName = coalesceChainName(Number(chain) as ChainId);
if (chainName === 'algorand') {
console.warn('skipping algorand gaps until backfilled');
continue;
}
let watcher: Watcher;
try {
watcher = makeFinalizedWatcher(network, chainName);
} catch (e) {
console.error('skipping gap for unsupported chain', chainName);
continue;
}
const range = blockRange.split('-');
const rangeStart = parseInt(range[0]);
const rangeEnd = parseInt(range[1]);
if (prevChain === chain && rangeStart < fromBlock) {
// don't reset on consecutive ranges of missing sequence numbers
console.log('resuming at', fromBlock, 'on', chain);
} else {
fromBlock = rangeStart;
prevChain = chain;
console.log('starting at', fromBlock, 'on', chain);
}
let found = false;
while (fromBlock <= rangeEnd && !found) {
const toBlock = Math.min(fromBlock + watcher.maximumBatchSize - 1, rangeEnd);
const messages = await watcher.getMessagesForBlocks(fromBlock, toBlock);
for (const message of Object.entries(messages).filter(([key, value]) => value.length > 0)) {
const locatedMessages = message[1].filter((msgKey) => {
const [_transaction, vaaKey] = msgKey.split(':');
const [_chain, msgEmitter, msgSeq] = vaaKey.split('/');
return emitter === msgEmitter && sequence === msgSeq;
});
if (locatedMessages.length > 0) {
await bt.storeVaasByBlock(chainName, { [message[0]]: locatedMessages }, false);
console.log('located', message[0], locatedMessages);
found = true;
}
}
if (!found) {
fromBlock = toBlock + 1;
await sleep(TIMEOUT);
}
}
}
} catch (e) {
console.error(e);
}
})();