Skip to content

Commit

Permalink
optimize getLogs
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed Dec 4, 2024
1 parent d125382 commit e671e3e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 20 deletions.
53 changes: 34 additions & 19 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ type LogFetcher struct {
includeSyntheticReceipts bool
}

func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) ([]*ethtypes.Log, int64, error) {
func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCriteria, lastToHeight int64) (res []*ethtypes.Log, end int64, err error) {
bloomIndexes := EncodeFilters(crit.Addresses, crit.Topics)
if crit.BlockHash != nil {
block, err := blockByHashWithRetry(ctx, f.tmClient, crit.BlockHash[:], 1)
Expand Down Expand Up @@ -316,25 +316,45 @@ func (f *LogFetcher) GetLogsByFilters(ctx context.Context, crit filters.FilterCr
return nil, 0, fmt.Errorf("fromBlock %d is after toBlock %d", begin, end)
}
blockHeights := f.FindBlockesByBloom(begin, end, bloomIndexes)
res := []*ethtypes.Log{}
for _, height := range blockHeights {
h := height
block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
if err != nil {
return nil, 0, err
}
res = append(res, f.GetLogsForBlock(ctx, block, crit, bloomIndexes)...)
if applyOpenEndedLogLimit && int64(len(res)) >= f.filterConfig.maxLog {
res = res[:int(f.filterConfig.maxLog)]
break
mu := sync.Mutex{}
wg := sync.WaitGroup{}
total := 0
res = []*ethtypes.Log{}
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("%s", e)
}
}()
slots := make([][]*ethtypes.Log, len(blockHeights))
for i, height := range blockHeights {
wg.Add(1)
h := height
i := i
go func() {
defer wg.Done()
block, err := blockByNumberWithRetry(ctx, f.tmClient, &h, 1)
if err != nil {
panic(err)
}
slots[i] = f.GetLogsForBlock(ctx, block, crit, bloomIndexes)
mu.Lock()
defer mu.Unlock()
total += len(slots[i])
if applyOpenEndedLogLimit && int64(total) > f.filterConfig.maxLog {
panic(fmt.Sprintf("number of logs %d exceeded max limit %d", total, f.filterConfig.maxLog))
}
}()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}
wg.Wait()
for _, logs := range slots {
res = append(res, logs...)
}

return res, end, nil
}

func (f *LogFetcher) GetLogsForBlock(ctx context.Context, block *coretypes.ResultBlock, crit filters.FilterCriteria, filters [][]bloomIndexes) []*ethtypes.Log {
possibleLogs := f.FindLogsByBloom(block.Block.Height, filters)
possibleLogs := f.FindLogsByBloom(block, filters)
matchedLogs := utils.Filter(possibleLogs, func(l *ethtypes.Log) bool { return f.IsLogExactMatch(l, crit) })
for _, l := range matchedLogs {
l.BlockHash = common.Hash(block.BlockID.Hash)
Expand All @@ -358,13 +378,8 @@ func (f *LogFetcher) FindBlockesByBloom(begin, end int64, filters [][]bloomIndex
return
}

func (f *LogFetcher) FindLogsByBloom(height int64, filters [][]bloomIndexes) (res []*ethtypes.Log) {
func (f *LogFetcher) FindLogsByBloom(block *coretypes.ResultBlock, filters [][]bloomIndexes) (res []*ethtypes.Log) {
ctx := f.ctxProvider(LatestCtxHeight)
block, err := blockByNumberWithRetry(context.Background(), f.tmClient, &height, 1)
if err != nil {
fmt.Printf("error getting block when querying logs: %s\n", err)
return
}

for _, hash := range getEvmTxHashesFromBlock(block, f.txConfig) {
receipt, err := f.k.GetReceipt(ctx, hash)
Expand Down
2 changes: 1 addition & 1 deletion evmrpc/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func init() {
goodConfig.HTTPPort = TestPort
goodConfig.WSPort = TestWSPort
goodConfig.FilterTimeout = 500 * time.Millisecond
goodConfig.MaxLogNoBlock = 4
goodConfig.MaxLogNoBlock = 100
infoLog, err := log.NewDefaultLogger("text", "info")
if err != nil {
panic(err)
Expand Down

0 comments on commit e671e3e

Please sign in to comment.