Skip to content

Commit 2fe3486

Browse files
committed
hum: address pr
Signed-off-by: bingyuyap <bingyu.yap.21@gmail.com>
1 parent 4c7985b commit 2fe3486

File tree

3 files changed

+55
-30
lines changed

3 files changed

+55
-30
lines changed

fly/cmd/historical_uptime/main.go

+39-8
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"fmt"
66
"log"
77
"os"
8+
"os/signal"
89
"strconv"
910
"strings"
11+
"sync"
12+
"syscall"
1013
"time"
1114

1215
node_common "github.com/certusone/wormhole/node/pkg/common"
@@ -288,6 +291,7 @@ func main() {
288291
if err != nil {
289292
logger.Fatal("Failed to create bigtable db", zap.Error(err))
290293
}
294+
291295
promErrC := make(chan error)
292296
// Start Prometheus scraper
293297
initPromScraper(promRemoteURL, logger, promErrC)
@@ -307,15 +311,35 @@ func main() {
307311
batchSize := 100
308312
observationBatch := make([]*types.Observation, 0, batchSize)
309313

314+
sigChan := make(chan os.Signal, 1)
315+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
316+
317+
// to make sure that we wait til observation cleanup is done
318+
var wg sync.WaitGroup
319+
320+
// rootCtx might not cancel if shutdown abruptly
321+
go func() {
322+
<-sigChan
323+
logger.Info("Received signal, initiating shutdown")
324+
rootCtxCancel()
325+
}()
326+
327+
wg.Add(1)
310328
go func() {
329+
defer wg.Done()
311330
ticker := time.NewTicker(5 * time.Second)
331+
defer ticker.Stop()
332+
312333
for {
313334
select {
314335
case <-rootCtx.Done():
336+
if len(observationBatch) > 0 {
337+
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
338+
}
339+
logger.Info("Observation cleanup completed.")
315340
return
316-
case o := <-obsvC:
341+
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
317342
obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Addr, o.Timestamp, o.Msg.Addr)
318-
319343
observationBatch = append(observationBatch, obs)
320344

321345
// if it reaches batchSize then process this batch
@@ -324,13 +348,16 @@ func main() {
324348
observationBatch = observationBatch[:0] // Clear the batch
325349
}
326350
case batch := <-batchObsvC:
327-
// process immediately since batches are in group
328-
batchObservations := make([]*types.Observation, 0, len(batch.Msg.Observations))
329351
for _, signedObs := range batch.Msg.Observations {
330352
obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, signedObs.TxHash)
331-
batchObservations = append(batchObservations, obs)
353+
observationBatch = append(observationBatch, obs)
354+
355+
// if it reaches batchSize then process this batch
356+
if len(observationBatch) >= batchSize {
357+
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
358+
observationBatch = observationBatch[:0] // Clear the batch
359+
}
332360
}
333-
historical_uptime.ProcessObservationBatch(*db, logger, batchObservations)
334361

335362
case <-ticker.C:
336363
// for every interval, process the batch
@@ -421,8 +448,12 @@ func main() {
421448
supervisor.WithPropagatePanic)
422449

423450
<-rootCtx.Done()
424-
logger.Info("root context cancelled, exiting...")
425-
// TODO: wait for things to shut down gracefully
451+
logger.Info("Root context cancelled, starting cleanup...")
452+
453+
// Wait for all goroutines to complete their cleanup
454+
wg.Wait()
455+
456+
logger.Info("All cleanup completed. Exiting...")
426457
}
427458

428459
func monitorChannelCapacity[T any](ctx context.Context, logger *zap.Logger, channelName string, ch <-chan T) {

fly/pkg/bigtable/cache.go

+8-12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ func (c *ObservationCache) RUnlock() {
3434
c.mu.RUnlock()
3535
}
3636

37+
// NewObservationCache creates and returns a new ObservationCache instance.
38+
func NewObservationCache() *ObservationCache {
39+
return &ObservationCache{
40+
Messages: make(map[types.MessageID]*types.Message),
41+
Observations: make(map[types.MessageID]map[string]*types.Observation),
42+
}
43+
}
44+
3745
// GetMessage retrieves a message from the cache by its ID.
3846
// It returns the message and a boolean indicating whether the message was found.
3947
func (c *ObservationCache) GetMessage(messageID types.MessageID) (*types.Message, bool) {
@@ -50,18 +58,6 @@ func (c *ObservationCache) SetMessage(messageID types.MessageID, message *types.
5058
c.Messages[messageID] = message
5159
}
5260

53-
// GetObservation retrieves an observation from the cache by message ID and guardian address.
54-
// It returns the observation and a boolean indicating whether the observation was found.
55-
func (c *ObservationCache) GetObservation(messageID types.MessageID, guardianAddr string) (*types.Observation, bool) {
56-
c.mu.RLock()
57-
defer c.mu.RUnlock()
58-
if observations, exists := c.Observations[messageID]; exists {
59-
observation, exists := observations[guardianAddr]
60-
return observation, exists
61-
}
62-
return nil, false
63-
}
64-
6561
// SetObservation adds or updates an observation in the cache.
6662
func (c *ObservationCache) SetObservation(messageID types.MessageID, guardianAddr string, observation *types.Observation) {
6763
c.mu.Lock()

fly/pkg/historical_uptime/process_observation.go

+8-10
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313
"go.uber.org/zap"
1414
)
1515

16-
var cache = &bigtable.ObservationCache{
17-
Messages: make(map[types.MessageID]*types.Message),
18-
Observations: make(map[types.MessageID]map[string]*types.Observation),
19-
}
16+
var cache = bigtable.NewObservationCache()
2017

2118
// ProcessObservationBatch processes a batch of observations and flushes the cache to the database.
2219
func ProcessObservationBatch(db bigtable.BigtableDB, logger *zap.Logger, batch []*types.Observation) error {
20+
cache.Lock()
21+
defer cache.Unlock()
22+
2323
for _, o := range batch {
24-
ProcessObservation(db, logger, o)
24+
ProcessObservationAlreadyLocked(db, logger, o)
2525
}
2626

2727
return FlushCache(db, logger)
@@ -43,11 +43,9 @@ func FlushCache(db bigtable.BigtableDB, logger *zap.Logger) error {
4343
return nil
4444
}
4545

46-
// ProcessObservation processes a single observation, updating the cache and checking observation times.
47-
func ProcessObservation(db bigtable.BigtableDB, logger *zap.Logger, o *types.Observation) {
48-
cache.Lock()
49-
defer cache.Unlock()
50-
46+
// ProcessObservationAlreadyLocked processes a single observation, updating the cache and checking observation times.
47+
// This function assumes that the cache lock has already been acquired.
48+
func ProcessObservationAlreadyLocked(db bigtable.BigtableDB, logger *zap.Logger, o *types.Observation) {
5149
// Check if the message exists in the cache, if not, try to fetch from the database
5250
message, exists := cache.Messages[o.MessageID]
5351
if !exists {

0 commit comments

Comments
 (0)