Skip to content

Commit

Permalink
Optimize getLogs with parallelization using fixed amount workers (#1974)
Browse files Browse the repository at this point in the history
* optimize getLogs

* optimize getLogs

* Parallelize execution for bloom as well

* Fix build

* Fix unit test

---------

Co-authored-by: Tony Chen <codchen03@gmail.com>
  • Loading branch information
yzang2019 and codchen authored Dec 5, 2024
1 parent d125382 commit 7c8e452
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 39 deletions.
115 changes: 78 additions & 37 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"

Expand All @@ -23,6 +25,8 @@ import (

const TxSearchPerPage = 10

const MaxNumOfWorkers = 500

type FilterType byte

const (
Expand Down Expand Up @@ -283,14 +287,14 @@ type LogFetcher struct {
includeSyntheticReceipts bool
}

func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) ([]*ethtypes.Log, int64, error) {
func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) (res []*ethtypes.Log, end int64, err error) {
bloomIndexes := EncodeFilters(crit.Addresses, crit.Topics)
if crit.BlockHash != nil {
block, err := blockByHashWithRetry(ctx, f.tmClient, crit.BlockHash[:], 1)
if err != nil {
return nil, 0, err
}
return f.GetLogsForBlock(ctx, block, crit, bloomIndexes), block.Block.Height, nil
return f.GetLogsForBlock(block, crit, bloomIndexes), block.Block.Height, nil
}
applyOpenEndedLogLimit := f.filterConfig.maxLog > 0 && (crit.FromBlock == nil || crit.ToBlock == nil)
latest := f.ctxProvider(LatestCtxHeight).BlockHeight()
Expand All @@ -315,56 +319,93 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr
if begin > end {
return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end)
}
blockHeights := f.FindBlockesByBloom(begin, end, bloomIndexes)
res := []*ethtypes.Log{}
for _, height := range blockHeights {
h := height
block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
if err != nil {
return nil, 0, err

// Parallelize execution
numWorkers := int(math.Min(MaxNumOfWorkers, float64(end-begin+1)))
var wg sync.WaitGroup
tasksChan := make(chan int64, end-begin+1)
resultsChan := make(chan *ethtypes.Log, end-begin+1)
res = []*ethtypes.Log{}
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("%s", e)
}
res = append(res, f.GetLogsForBlock(ctx, block, crit, bloomIndexes)...)
if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog {
res = res[:int(f.filterConfig.maxLog)]
break
}()
// Send tasks
go func() {
for height := begin; height <= end; height++ {
if height == 0 {
continue // Skip genesis height
}
tasksChan <- height
}
close(tasksChan) // Close the tasks channel to signal workers
}()

// Worker function
worker := func() {
defer wg.Done()
for height := range tasksChan {
if len(crit.Addresses) != 0 || len(crit.Topics) != 0 {
providerCtx := f.ctxProvider(height)
blockBloom := f.k.GetBlockBloom(providerCtx)
if !MatchFilters(blockBloom, bloomIndexes) {
continue
}
}
h := height
block, berr := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
if berr != nil {
panic(berr)
}
matchedLogs := f.GetLogsForBlock(block, crit, bloomIndexes)
for _, log := range matchedLogs {
resultsChan <- log
}
}
}

// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker()
}

// Collect results
go func() {
wg.Wait()
close(resultsChan) // Close the results channel after workers finish
}()

// Aggregate results into the final slice
for result := range resultsChan {
res = append(res, result)
}

// Sorting res in ascending order
sort.Slice(res, func(i, j int) bool {
return res[i].BlockNumber < res[j].BlockNumber
})

// Apply rate limit
if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog {
res = res[:int(f.filterConfig.maxLog)]
}

return res, end, nil
}

func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log {
possibleLogs := f.FindLogsByBloom(block.Block.Height, filters)
func (f *LogFetcher) GetLogsForBlock(block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log {
possibleLogs := f.FindLogsByBloom(block, filters)
matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) })
for _, l := range matchedLogs {
l.BlockHash = common.Hash(block.BlockID.Hash)
}
return matchedLogs
}

func (f *LogFetcher) FindBlockesByBloom(begin, end int64, filters [][]bloomIndexes) (res []int64) {
//TODO: parallelize
for height := begin; height <= end; height++ {
if height == 0 {
// no block bloom on genesis height
continue
}
ctx := f.ctxProvider(height)
blockBloom := f.k.GetBlockBloom(ctx)
if MatchFilters(blockBloom, filters) {
res = append(res, height)
}
}
return
}

func (f *LogFetcher) FindLogsByBloom(height int64, filters [][]bloomIndexes) (res []*ethtypes.Log) {
func (f *LogFetcher) FindLogsByBloom(block *coretypes.ResultBlock, filters [][]bloomIndexes) (res []*ethtypes.Log) {
ctx := f.ctxProvider(LatestCtxHeight)
block, err := blockByNumberWithRetry(context.Background(), f.tmClient, &height, 1)
if err != nil {
fmt.Printf("error getting block when querying logs: %s\n", err)
return
}

for _, hash := range getEvmTxHashesFromBlock(block, f.txConfig) {
receipt, err := f.k.GetReceipt(ctx, hash)
Expand Down
2 changes: 0 additions & 2 deletions evmrpc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
// ethtypes "github.com/ethereum/go-ethereum/core/types"
// "github.com/sei-protocol/sei-chain/x/evm/types"
"github.com/stretchr/testify/require"
)

Expand Down

0 comments on commit 7c8e452

Please sign in to comment.