@@ -12,30 +12,33 @@ import { Database } from './Database';
12
12
import {
13
13
BigtableMessagesResultRow ,
14
14
BigtableMessagesRow ,
15
+ BigtableSignedVAAsResultRow ,
16
+ BigtableSignedVAAsRow ,
15
17
BigtableVAAsByTxHashRow ,
16
- BigtableVAAsResultRow ,
17
18
VaasByBlock ,
18
19
} from './types' ;
19
20
import {
20
21
makeMessageId ,
21
22
makeVAAsByTxHashRowKey ,
22
- makeVaaId ,
23
23
makeSignedVAAsRowKey ,
24
24
parseMessageId ,
25
25
} from './utils' ;
26
+ import { getSignedVAA } from '../utils/getSignedVAA' ;
26
27
27
28
const WATCH_MISSING_TIMEOUT = 5 * 60 * 1000 ;
28
29
29
30
export class BigtableDatabase extends Database {
30
- tableId : string ;
31
+ msgTableId : string ;
32
+ signedVAAsTableId : string ;
31
33
vaasByTxHashTableId : string ;
32
34
instanceId : string ;
33
35
bigtable : Bigtable ;
34
36
firestoreDb : FirebaseFirestore . Firestore ;
35
37
latestCollectionName : string ;
36
38
constructor ( ) {
37
39
super ( ) ;
38
- this . tableId = assertEnvironmentVariable ( 'BIGTABLE_TABLE_ID' ) ;
40
+ this . msgTableId = assertEnvironmentVariable ( 'BIGTABLE_TABLE_ID' ) ;
41
+ this . signedVAAsTableId = assertEnvironmentVariable ( 'BIGTABLE_SIGNED_VAAS_TABLE_ID' ) ;
39
42
this . vaasByTxHashTableId = assertEnvironmentVariable ( 'BIGTABLE_VAAS_BY_TX_HASH_TABLE_ID' ) ;
40
43
this . instanceId = assertEnvironmentVariable ( 'BIGTABLE_INSTANCE_ID' ) ;
41
44
this . latestCollectionName = assertEnvironmentVariable ( 'FIRESTORE_LATEST_COLLECTION' ) ;
@@ -92,7 +95,7 @@ export class BigtableDatabase extends Database {
92
95
const chainId = coalesceChainId ( chain ) ;
93
96
const filteredBlocks = BigtableDatabase . filterEmptyBlocks ( vaasByBlock ) ;
94
97
const instance = this . bigtable . instance ( this . instanceId ) ;
95
- const table = instance . table ( this . tableId ) ;
98
+ const table = instance . table ( this . msgTableId ) ;
96
99
const vaasByTxHashTable = instance . table ( this . vaasByTxHashTableId ) ;
97
100
const rowsToInsert : BigtableMessagesRow [ ] = [ ] ;
98
101
const vaasByTxHash : { [ key : string ] : string [ ] } = { } ;
@@ -122,7 +125,7 @@ export class BigtableDatabase extends Database {
122
125
} ,
123
126
} ,
124
127
} ) ;
125
- const txHashRowKey = makeVAAsByTxHashRowKey ( txHash , chain ) ;
128
+ const txHashRowKey = makeVAAsByTxHashRowKey ( txHash , chainId ) ;
126
129
const vaaRowKey = makeSignedVAAsRowKey ( chainId , emitter , seq ) ;
127
130
vaasByTxHash [ txHashRowKey ] = [ ...( vaasByTxHash [ txHashRowKey ] || [ ] ) , vaaRowKey ] ;
128
131
} ) ;
@@ -154,7 +157,7 @@ export class BigtableDatabase extends Database {
154
157
155
158
async updateMessageStatuses ( messageKeys : string [ ] , value : number = 1 ) : Promise < void > {
156
159
const instance = this . bigtable . instance ( this . instanceId ) ;
157
- const table = instance . table ( this . tableId ) ;
160
+ const table = instance . table ( this . msgTableId ) ;
158
161
const chunkedMessageKeys = chunkArray ( messageKeys , 1000 ) ;
159
162
for ( const chunk of chunkedMessageKeys ) {
160
163
const rowsToInsert : BigtableMessagesRow [ ] = chunk . map ( ( id ) => ( {
@@ -175,7 +178,7 @@ export class BigtableDatabase extends Database {
175
178
176
179
async fetchMissingVaaMessages ( ) : Promise < BigtableMessagesResultRow [ ] > {
177
180
const instance = this . bigtable . instance ( this . instanceId ) ;
178
- const messageTable = instance . table ( this . tableId ) ;
181
+ const messageTable = instance . table ( this . msgTableId ) ;
179
182
// TODO: how to filter to only messages with hasSignedVaa === 0
180
183
const observedMessages = ( await messageTable . getRows ( ) ) [ 0 ] as BigtableMessagesResultRow [ ] ;
181
184
const missingVaaMessages = observedMessages . filter (
@@ -185,70 +188,95 @@ export class BigtableDatabase extends Database {
185
188
}
186
189
187
190
async watchMissing ( ) : Promise < void > {
188
- const vaaTableId = assertEnvironmentVariable ( 'BIGTABLE_VAA_TABLE_ID' ) ;
189
191
const instance = this . bigtable . instance ( this . instanceId ) ;
190
- const vaaTable = instance . table ( vaaTableId ) ;
192
+ const signedVAAsTable = instance . table ( this . signedVAAsTableId ) ;
191
193
while ( true ) {
192
194
try {
195
+ // this array first stores all of the messages which are missing VAAs
196
+ // messages which we find VAAs for are then pruned from the array
197
+ // lastly we try to fetch VAAs for the messages in the pruned array from the guardians
193
198
const missingVaaMessages = await this . fetchMissingVaaMessages ( ) ;
194
199
const total = missingVaaMessages . length ;
195
200
this . logger . info ( `locating ${ total } messages with hasSignedVAA === 0` ) ;
196
201
let found = 0 ;
197
202
const chunkedVAAIds = chunkArray (
198
203
missingVaaMessages . map ( ( observedMessage ) => {
199
204
const { chain, emitter, sequence } = parseMessageId ( observedMessage . id ) ;
200
- return makeVaaId ( chain , emitter , sequence ) ;
205
+ return makeSignedVAAsRowKey ( chain , emitter , sequence . toString ( ) ) ;
201
206
} ) ,
202
207
1000
203
208
) ;
204
209
let chunkNum = 0 ;
205
- const foundRecords : string [ ] = [ ] ;
210
+ const foundKeys : string [ ] = [ ] ;
206
211
for ( const chunk of chunkedVAAIds ) {
207
212
this . logger . info ( `processing chunk ${ ++ chunkNum } of ${ chunkedVAAIds . length } ` ) ;
208
- const filter = [
209
- {
210
- family : 'QuorumState' ,
211
- column : 'SignedVaa' ,
212
- } ,
213
- ] ;
214
213
const vaaRows = (
215
- await vaaTable . getRows ( {
214
+ await signedVAAsTable . getRows ( {
216
215
keys : chunk ,
217
216
decode : false ,
218
- filter,
219
217
} )
220
- ) [ 0 ] as BigtableVAAsResultRow [ ] ;
218
+ ) [ 0 ] as BigtableSignedVAAsResultRow [ ] ;
221
219
for ( const row of vaaRows ) {
222
220
try {
223
- const vaaBytes = row . data . QuorumState . SignedVAA ?. [ 0 ] . value ;
224
- if ( vaaBytes ) {
225
- const parsed = parseVaa ( vaaBytes ) ;
226
- const matchingIndex = missingVaaMessages . findIndex ( ( observedMessage ) => {
227
- const { chain, emitter, sequence } = parseMessageId ( observedMessage . id ) ;
228
- if (
229
- parsed . emitterChain === chain &&
230
- parsed . emitterAddress . toString ( 'hex' ) === emitter &&
231
- parsed . sequence === sequence
232
- ) {
233
- return true ;
234
- }
235
- } ) ;
236
- if ( matchingIndex !== - 1 ) {
237
- found ++ ;
238
- // remove matches to keep array lean
239
- const [ matching ] = missingVaaMessages . splice ( matchingIndex , 1 ) ;
240
- foundRecords . push ( matching . id ) ;
221
+ const vaaBytes = row . data . info . bytes [ 0 ] . value ;
222
+ const parsed = parseVaa ( vaaBytes ) ;
223
+ const matchingIndex = missingVaaMessages . findIndex ( ( observedMessage ) => {
224
+ const { chain, emitter, sequence } = parseMessageId ( observedMessage . id ) ;
225
+ if (
226
+ parsed . emitterChain === chain &&
227
+ parsed . emitterAddress . toString ( 'hex' ) === emitter &&
228
+ parsed . sequence === sequence
229
+ ) {
230
+ return true ;
241
231
}
232
+ } ) ;
233
+ if ( matchingIndex !== - 1 ) {
234
+ found ++ ;
235
+ // remove matches to keep array lean
236
+ // messages with missing VAAs will be kept in the array
237
+ const [ matching ] = missingVaaMessages . splice ( matchingIndex , 1 ) ;
238
+ foundKeys . push ( matching . id ) ;
242
239
}
243
240
} catch ( e ) { }
244
241
}
245
242
}
246
243
this . logger . info ( `processed ${ total } messages, found ${ found } , missing ${ total - found } ` ) ;
247
- this . updateMessageStatuses ( foundRecords ) ;
244
+ this . updateMessageStatuses ( foundKeys ) ;
245
+ // attempt to fetch VAAs missing from messages from the guardians and store them
246
+ // this is useful for cases where the VAA doesn't exist in the `signedVAAsTable` (perhaps due to an outage) but is available
247
+ const missingSignedVAARows : BigtableSignedVAAsRow [ ] = [ ] ;
248
+ for ( const msg of missingVaaMessages ) {
249
+ const { chain, emitter, sequence } = parseMessageId ( msg . id ) ;
250
+ const seq = sequence . toString ( ) ;
251
+ const vaaBytes = await getSignedVAA ( chain , emitter , seq ) ;
252
+ if ( vaaBytes ) {
253
+ const key = makeSignedVAAsRowKey ( chain , emitter , seq ) ;
254
+ missingSignedVAARows . push ( {
255
+ key,
256
+ data : {
257
+ info : {
258
+ bytes : { value : vaaBytes , timestamp : '0' } ,
259
+ } ,
260
+ } ,
261
+ } ) ;
262
+ }
263
+ }
264
+ this . storeSignedVAAs ( missingSignedVAARows ) ;
265
+ // TODO: add slack message alerts
248
266
} catch ( e ) {
249
267
this . logger . error ( e ) ;
250
268
}
251
269
await sleep ( WATCH_MISSING_TIMEOUT ) ;
252
270
}
253
271
}
272
+
273
+ async storeSignedVAAs ( rows : BigtableSignedVAAsRow [ ] ) : Promise < void > {
274
+ const instance = this . bigtable . instance ( this . instanceId ) ;
275
+ const table = instance . table ( this . signedVAAsTableId ) ;
276
+ const chunks = chunkArray ( rows , 1000 ) ;
277
+ for ( const chunk of chunks ) {
278
+ await table . insert ( chunk ) ;
279
+ this . logger . info ( `wrote ${ chunk . length } signed VAAs to the ${ this . signedVAAsTableId } table` ) ;
280
+ }
281
+ }
254
282
}
0 commit comments