From e671e3ef0d7ba1cb8cbcd53d665a8dc6bbe701da Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Wed, 4 Dec 2024 11:17:08 +0800 Subject: [PATCH] optimize getLogs --- evmrpc/filter.go | 53 ++++++++++++++++++++++++++++---------------- evmrpc/setup_test.go | 2 +- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/evmrpc/filter.go b/evmrpc/filter.go index 9bda11057e..f302658f4e 100644 --- a/evmrpc/filter.go +++ b/evmrpc/filter.go @@ -283,7 +283,7 @@ type LogFetcher struct { includeSyntheticReceipts bool } -func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) ([]*ethtypes.Log, int64, error) { +func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) (res []*ethtypes.Log, end int64, err error) { bloomIndexes := EncodeFilters(crit.Addresses, crit.Topics) if crit.BlockHash != nil { block, err := blockByHashWithRetry(ctx, f.tmClient, crit.BlockHash[:], 1) @@ -316,25 +316,45 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end) } blockHeights := f.FindBlockesByBloom(begin, end, bloomIndexes) - res := []*ethtypes.Log{} - for _, height := range blockHeights { - h := height - block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1) - if err != nil { - return nil, 0, err - } - res = append(res, f.GetLogsForBlock(ctx, block, crit, bloomIndexes)...) - if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog { - res = res[:int(f.filterConfig.maxLog)] - break + mu := sync.Mutex{} + wg := sync.WaitGroup{} + total := 0 + res = []*ethtypes.Log{} + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("%s", e) } + }() + slots := make([][]*ethtypes.Log, len(blockHeights)) + for i, height := range blockHeights { + wg.Add(1) + h := height + i := i + go func() { + defer wg.Done() + block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1) + if err != nil { + panic(err) + } + slots[i] = f.GetLogsForBlock(ctx, block, crit, bloomIndexes) + mu.Lock() + defer mu.Unlock() + total += len(slots[i]) + if applyOpenEndedLogLimit && int64(total) > f.filterConfig.maxLog { + panic(fmt.Sprintf("number of logs %d exceeded max limit %d", total, f.filterConfig.maxLog)) + } + }() + } + wg.Wait() + for _, logs := range slots { + res = append(res, logs...) } return res, end, nil } func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log { - possibleLogs := f.FindLogsByBloom(block.Block.Height, filters) + possibleLogs := f.FindLogsByBloom(block, filters) matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) }) for _, l := range matchedLogs { l.BlockHash = common.Hash(block.BlockID.Hash) @@ -358,13 +378,8 @@ func (f *LogFetcher) FindBlockesByBloom(begin, end int64, filters [][]bloomIndex return } -func (f *LogFetcher) FindLogsByBloom(height int64, filters [][]bloomIndexes) (res []*ethtypes.Log) { +func (f *LogFetcher) FindLogsByBloom(block *coretypes.ResultBlock, filters [][]bloomIndexes) (res []*ethtypes.Log) { ctx := f.ctxProvider(LatestCtxHeight) - block, err := blockByNumberWithRetry(context.Background(), f.tmClient, &height, 1) - if err != nil { - fmt.Printf("error getting block when querying logs: %s\n", err) - return - } for _, hash := range getEvmTxHashesFromBlock(block, f.txConfig) { receipt, err := f.k.GetReceipt(ctx, hash) diff --git a/evmrpc/setup_test.go b/evmrpc/setup_test.go index 0c8d74e2af..7efd80c3bd 100644 --- a/evmrpc/setup_test.go +++ b/evmrpc/setup_test.go @@ -469,7 +469,7 @@ func init() { goodConfig.HTTPPort = TestPort goodConfig.WSPort = TestWSPort goodConfig.FilterTimeout = 500 * time.Millisecond - goodConfig.MaxLogNoBlock = 4 + goodConfig.MaxLogNoBlock = 100 infoLog, err := log.NewDefaultLogger("text", "info") if err != nil { panic(err)