@@ -7,21 +7,25 @@ import { ethers } from 'ethers';
7
7
import { AXIOS_CONFIG_JSON , RPCS_BY_CHAIN } from '../consts' ;
8
8
import { makeBlockKey } from '../databases/utils' ;
9
9
import TokenRouterParser from '../fastTransfer/tokenRouter/parser' ;
10
- import { MarketOrder } from '../fastTransfer/types' ;
10
+ import SwapLayerParser from '../fastTransfer/swapLayer/parser' ;
11
+ import { MarketOrder , RedeemSwap } from '../fastTransfer/types' ;
11
12
import { Block } from './EVMWatcher' ;
12
13
import { BigNumber } from 'ethers' ;
13
14
import axios from 'axios' ;
14
15
import { sleep } from '@wormhole-foundation/wormhole-monitor-common' ;
16
+
15
17
export type BlockTag = 'finalized' | 'safe' | 'latest' ;
16
18
17
19
export class FTEVMWatcher extends Watcher {
18
20
finalizedBlockTag : BlockTag ;
19
21
lastTimestamp : number ;
20
22
latestFinalizedBlockNumber : number ;
21
23
tokenRouterAddress : string ;
24
+ swapLayerAddress : string | undefined ;
22
25
rpc : string ;
23
26
provider : ethers . providers . JsonRpcProvider ;
24
- parser : TokenRouterParser ;
27
+ tokenRouterParser : TokenRouterParser ;
28
+ swapLayerParser : SwapLayerParser | null ;
25
29
pg : Knex | null = null ;
26
30
27
31
constructor (
@@ -35,9 +39,13 @@ export class FTEVMWatcher extends Watcher {
35
39
this . latestFinalizedBlockNumber = 0 ;
36
40
this . finalizedBlockTag = finalizedBlockTag ;
37
41
this . tokenRouterAddress = FAST_TRANSFER_CONTRACTS [ network ] ?. [ chain ] ?. TokenRouter ! ;
42
+ this . swapLayerAddress = FAST_TRANSFER_CONTRACTS [ network ] ?. [ chain ] ?. SwapLayer ;
38
43
this . provider = new ethers . providers . JsonRpcProvider ( RPCS_BY_CHAIN [ network ] [ chain ] ) ;
39
44
this . rpc = RPCS_BY_CHAIN [ this . network ] [ this . chain ] ! ;
40
- this . parser = new TokenRouterParser ( this . network , chain , this . provider ) ;
45
+ this . tokenRouterParser = new TokenRouterParser ( this . network , chain , this . provider ) ;
46
+ this . swapLayerParser = this . swapLayerAddress
47
+ ? new SwapLayerParser ( this . provider , this . swapLayerAddress )
48
+ : null ;
41
49
this . logger . debug ( 'FTWatcher' , network , chain , finalizedBlockTag ) ;
42
50
// hacky way to not connect to the db in tests
43
51
// this is to allow ci to run without a db
@@ -124,61 +132,84 @@ export class FTEVMWatcher extends Watcher {
124
132
}
125
133
126
134
async getFtMessagesForBlocks ( fromBlock : number , toBlock : number ) : Promise < string > {
127
- const { results, lastBlockTime } = await this . parser . getFTResultsInRange ( fromBlock , toBlock ) ;
135
+ const tokenRouterPromise = this . tokenRouterParser . getFTResultsInRange ( fromBlock , toBlock ) ;
136
+ const swapLayerPromise = this . swapLayerParser ?. getFTSwapInRange ( fromBlock , toBlock ) || [ ] ;
137
+
138
+ const [ tokenRouterResults , swapLayerResults ] = await Promise . all ( [
139
+ tokenRouterPromise ,
140
+ swapLayerPromise ,
141
+ ] ) ;
142
+
143
+ if ( tokenRouterResults . results . length ) {
144
+ await this . saveBatch (
145
+ tokenRouterResults . results ,
146
+ 'market_orders' ,
147
+ 'fast_vaa_id' ,
148
+ fromBlock ,
149
+ toBlock
150
+ ) ;
151
+ }
128
152
129
- if ( results . length ) {
130
- await this . saveFastTransfers ( results , fromBlock , toBlock ) ;
153
+ if ( swapLayerResults . length ) {
154
+ await this . saveBatch ( swapLayerResults , 'redeem_swaps' , 'fill_vaa_id' , fromBlock , toBlock ) ;
131
155
}
156
+
157
+ // we do not need to compare the lastBlockTime from tokenRouter and swapLayer as they both use toBlock
158
+ const lastBlockTime = tokenRouterResults . lastBlockTime ;
132
159
return makeBlockKey ( toBlock . toString ( ) , lastBlockTime . toString ( ) ) ;
133
160
}
134
161
135
- // saves fast transfers in smaller batches to reduce the impact in any case anything fails
162
+ // saves items in smaller batches to reduce the impact in any case anything fails
136
163
// retry with exponential backoff is used here
137
- async saveFastTransfers (
138
- fastTransfers : MarketOrder [ ] ,
139
- fromBlock : number ,
140
- toBlock : number
164
+ private async saveBatch < T > (
165
+ items : T [ ] ,
166
+ tableName : string ,
167
+ conflictColumn : string ,
168
+ fromBlock ?: number ,
169
+ toBlock ?: number
141
170
) : Promise < void > {
142
171
if ( ! this . pg ) {
143
172
return ;
144
173
}
145
174
146
175
const batchSize = 50 ;
147
176
const maxRetries = 3 ;
148
- const totalBatches = Math . ceil ( fastTransfers . length / batchSize ) ;
177
+ const totalBatches = Math . ceil ( items . length / batchSize ) ;
149
178
150
- this . logger . debug (
151
- `Attempting to save ${ fastTransfers . length } fast transfers in batches of ${ batchSize } `
152
- ) ;
179
+ this . logger . debug ( `Attempting to save ${ items . length } ${ tableName } in batches of ${ batchSize } ` ) ;
153
180
154
- for ( let batchIndex = 0 ; batchIndex < fastTransfers . length ; batchIndex += batchSize ) {
155
- const batch = fastTransfers . slice ( batchIndex , batchIndex + batchSize ) ;
181
+ for ( let batchIndex = 0 ; batchIndex < items . length ; batchIndex += batchSize ) {
182
+ const batch = items . slice ( batchIndex , batchIndex + batchSize ) ;
156
183
const batchNumber = Math . floor ( batchIndex / batchSize ) + 1 ;
157
184
158
185
for ( let attempt = 1 ; attempt <= maxRetries ; attempt ++ ) {
159
186
try {
160
- await this . pg ( 'market_orders' ) . insert ( batch ) . onConflict ( 'fast_vaa_id' ) . merge ( ) ;
187
+ await this . pg ( tableName ) . insert ( batch ) . onConflict ( conflictColumn ) . merge ( ) ;
161
188
this . logger . info (
162
- `Successfully saved batch ${ batchNumber } /${ totalBatches } (${ batch . length } transfers )`
189
+ `Successfully saved batch ${ batchNumber } /${ totalBatches } (${ batch . length } ${ tableName } )`
163
190
) ;
164
191
break ;
165
192
} catch ( e ) {
166
193
if ( attempt === maxRetries ) {
194
+ const errorMessage = `Failed to save batch ${ batchNumber } /${ totalBatches } of ${ tableName } after ${ maxRetries } attempts` ;
167
195
this . logger . error (
168
- `Failed to save batch ${ batchNumber } /${ totalBatches } from block ${ fromBlock } - ${ toBlock } after ${ maxRetries } attempts` ,
196
+ fromBlock && toBlock
197
+ ? `${ errorMessage } from block ${ fromBlock } - ${ toBlock } `
198
+ : errorMessage ,
169
199
e
170
200
) ;
171
201
} else {
172
- // Wait before retrying (exponential backoff)
173
202
this . logger . warn (
174
- `Attempt ${ attempt } failed for batch ${ batchNumber } /${ totalBatches } . Retrying...`
203
+ `Attempt ${ attempt } failed for batch ${ batchNumber } /${ totalBatches } of ${ tableName } . Retrying...`
175
204
) ;
176
205
await sleep ( 1000 * Math . pow ( 2 , attempt - 1 ) ) ;
177
206
}
178
207
}
179
208
}
180
209
}
181
- this . logger . info ( `Completed saving fast transfers from block ${ fromBlock } - ${ toBlock } ` ) ;
210
+ this . logger . info (
211
+ `Completed saving ${ items . length } ${ tableName } from ${ fromBlock } to ${ toBlock } `
212
+ ) ;
182
213
}
183
214
}
184
215
0 commit comments