Skip to content

Commit 1f4fabc

Browse files
committedAug 5, 2024·
ft_watcher: PR comments
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
1 parent 231ff99 commit 1f4fabc

File tree

4 files changed

+46
-19
lines changed

4 files changed

+46
-19
lines changed
 

‎watcher/src/databases/BigtableDatabase.ts

-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ export class BigtableDatabase extends Database {
4949
this.signedVAAsTableId = assertEnvironmentVariable('BIGTABLE_SIGNED_VAAS_TABLE_ID');
5050
this.vaasByTxHashTableId = assertEnvironmentVariable('BIGTABLE_VAAS_BY_TX_HASH_TABLE_ID');
5151
this.instanceId = assertEnvironmentVariable('BIGTABLE_INSTANCE_ID');
52-
// TODO: make these const?
5352
this.latestCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_COLLECTION');
5453
this.latestNTTCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_NTT_COLLECTION');
5554
this.latestFTCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_FT_COLLECTION');

‎watcher/src/watchers/FTEVMWatcher.ts

+44-15
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { MarketOrder } from '../fastTransfer/types';
1111
import { Block } from './EVMWatcher';
1212
import { BigNumber } from 'ethers';
1313
import axios from 'axios';
14-
14+
import { sleep } from '@wormhole-foundation/wormhole-monitor-common';
1515
export type BlockTag = 'finalized' | 'safe' | 'latest';
1616

1717
export class FTEVMWatcher extends Watcher {
@@ -39,7 +39,6 @@ export class FTEVMWatcher extends Watcher {
3939
this.rpc = RPCS_BY_CHAIN[this.network][this.chain]!;
4040
this.parser = new TokenRouterParser(this.network, chain, this.provider);
4141
this.logger.debug('FTWatcher', network, chain, finalizedBlockTag);
42-
4342
// hacky way to not connect to the db in tests
4443
// this is to allow ci to run without a db
4544
if (isTest) {
@@ -128,28 +127,58 @@ export class FTEVMWatcher extends Watcher {
128127
const { results, lastBlockTime } = await this.parser.getFTResultsInRange(fromBlock, toBlock);
129128

130129
if (results.length) {
131-
try {
132-
await this.saveFastTransfers(results);
133-
} catch (e) {
134-
this.logger.error(e);
135-
}
130+
await this.saveFastTransfers(results, fromBlock, toBlock);
136131
}
137132
return makeBlockKey(toBlock.toString(), lastBlockTime.toString());
138133
}
139134

140-
async saveFastTransfers(fastTransfers: MarketOrder[]): Promise<void> {
141-
// this is to allow ci to run without a db
135+
// saves fast transfers in smaller batches to reduce the impact in any case anything fails
136+
// retry with exponential backoff is used here
137+
async saveFastTransfers(
138+
fastTransfers: MarketOrder[],
139+
fromBlock: number,
140+
toBlock: number
141+
): Promise<void> {
142142
if (!this.pg) {
143143
return;
144144
}
145-
this.logger.debug(`saving ${fastTransfers.length} fast transfers`);
146145

147-
// Batch insert the fast transfers
148-
try {
149-
await this.pg('market_orders').insert(fastTransfers).onConflict('fast_vaa_id').merge();
150-
} catch (e) {
151-
this.logger.error(`Error saving fast transfers ${e}`);
146+
const batchSize = 50;
147+
const maxRetries = 3;
148+
const totalBatches = Math.ceil(fastTransfers.length / batchSize);
149+
150+
this.logger.debug(
151+
`Attempting to save ${fastTransfers.length} fast transfers in batches of ${batchSize}`
152+
);
153+
154+
for (let batchIndex = 0; batchIndex < fastTransfers.length; batchIndex += batchSize) {
155+
const batch = fastTransfers.slice(batchIndex, batchIndex + batchSize);
156+
const batchNumber = Math.floor(batchIndex / batchSize) + 1;
157+
158+
for (let attempt = 1; attempt <= maxRetries; attempt++) {
159+
try {
160+
await this.pg('market_orders').insert(batch).onConflict('fast_vaa_id').merge();
161+
this.logger.info(
162+
`Successfully saved batch ${batchNumber}/${totalBatches} (${batch.length} transfers)`
163+
);
164+
break;
165+
} catch (e) {
166+
if (attempt === maxRetries) {
167+
this.logger.error(
168+
`Failed to save batch ${batchNumber}/${totalBatches} from block ${fromBlock} - ${toBlock} after ${maxRetries} attempts`,
169+
e
170+
);
171+
} else {
172+
// Wait before retrying (exponential backoff)
173+
this.logger.warn(
174+
`Attempt ${attempt} failed for batch ${batchNumber}/${totalBatches}. Retrying...`
175+
);
176+
await sleep(1000 * Math.pow(2, attempt - 1));
177+
}
178+
}
179+
}
152180
}
181+
this.logger.info(`Completed saving fast transfers from block ${fromBlock} - ${toBlock}`);
153182
}
154183
}
155184

‎watcher/src/watchers/FTSolanaWatcher.ts

-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ export class FTSolanaWatcher extends SolanaWatcher {
107107
});
108108
}
109109

110-
// TODO: Modify this so watcher can actually call this function (Add enum for mode)
111110
async getFtMessagesForBlocks(fromSlot: number, toSlot: number): Promise<string> {
112111
if (fromSlot > toSlot) throw new Error('solana: invalid block range');
113112

‎watcher/src/watchers/Watcher.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ export class Watcher {
2323
this.chain = chain;
2424
this.mode = mode;
2525

26-
// `vaa` -> ''
26+
// `vaa` -> 'VAA_'
2727
// `ntt` -> 'NTT_'
2828
// `ft` -> 'FT_'
29-
const loggerPrefix = mode === 'vaa' ? '' : mode.toUpperCase() + '_';
29+
const loggerPrefix = mode.toUpperCase() + '_';
3030
this.logger = getLogger(loggerPrefix + chain);
3131
}
3232

0 commit comments

Comments
 (0)
Please sign in to comment.