Commit 178fd19

Genesis sync from existing snapshots (#3087)
1 parent bcc2538 commit 178fd19

36 files changed: +929 −329 lines

cmd/hack/hack.go

+65 −19

```diff
@@ -2589,58 +2589,102 @@ func recsplitWholeChain(chaindata string) error {
 	log.Info("Last body number", "last", last)
 	for i := uint64(*block); i < last; i += blocksPerFile {
-		fileName := snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Transactions)
-		segmentFile := path.Join(snapshotDir, fileName) + ".seg"
+		fileName := snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Bodies)
+
 		log.Info("Creating", "file", fileName+".seg")
 		db := mdbx.MustOpen(chaindata)
-		firstTxID, err := snapshotsync.DumpTxs(db, "", i, int(blocksPerFile))
-		if err != nil {
+		if err := snapshotsync.DumpBodies(db, "", i, int(blocksPerFile)); err != nil {
 			panic(err)
 		}
 		db.Close()
+		segmentFile := path.Join(snapshotDir, fileName) + ".seg"
 		if err := compress1(chaindata, fileName, segmentFile); err != nil {
 			panic(err)
 		}
-		if err := snapshotsync.TransactionsHashIdx(*chainID, firstTxID, segmentFile); err != nil {
-			panic(err)
-		}
+		//if err := snapshotsync.BodiesIdx(segmentFile, i); err != nil {
+		//	panic(err)
+		//}
 		_ = os.Remove(fileName + ".dat")

 		fileName = snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Headers)
-		segmentFile = path.Join(snapshotDir, fileName) + ".seg"
 		log.Info("Creating", "file", fileName+".seg")
 		db = mdbx.MustOpen(chaindata)
 		if err := snapshotsync.DumpHeaders(db, "", i, int(blocksPerFile)); err != nil {
 			panic(err)
 		}
 		db.Close()
+		segmentFile = path.Join(snapshotDir, fileName) + ".seg"
 		if err := compress1(chaindata, fileName, segmentFile); err != nil {
 			panic(err)
 		}
-
-		if err := snapshotsync.HeadersHashIdx(segmentFile, i); err != nil {
-			panic(err)
-		}
+		//if err := snapshotsync.HeadersHashIdx(segmentFile, i); err != nil {
+		//	panic(err)
+		//}
 		_ = os.Remove(fileName + ".dat")

-		fileName = snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Bodies)
-		segmentFile = path.Join(snapshotDir, fileName) + ".seg"
+		fileName = snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Transactions)
 		log.Info("Creating", "file", fileName+".seg")
 		db = mdbx.MustOpen(chaindata)
-		if err := snapshotsync.DumpBodies(db, "", i, int(blocksPerFile)); err != nil {
+		firstTxID, err := snapshotsync.DumpTxs(db, "", i, int(blocksPerFile))
+		if err != nil {
 			panic(err)
 		}
 		db.Close()
+		segmentFile = path.Join(snapshotDir, fileName) + ".seg"
 		if err := compress1(chaindata, fileName, segmentFile); err != nil {
 			panic(err)
 		}
-		if err := snapshotsync.BodiesIdx(segmentFile, i); err != nil {
-			panic(err)
-		}
+		_ = firstTxID
+		//if err := snapshotsync.TransactionsHashIdx(*chainID, firstTxID, segmentFile); err != nil {
+		//	panic(err)
+		//}
 		_ = os.Remove(fileName + ".dat")

 		//nolint
-		break // TODO: remove me - useful for tests
+		//break // TODO: remove me - useful for tests
+	}
+	return nil
+}
+
+func checkBlockSnapshot(chaindata string) error {
+	database := mdbx.MustOpen(chaindata)
+	defer database.Close()
+	dataDir := path.Dir(chaindata)
+	chainConfig := tool.ChainConfigFromDB(database)
+	chainID, _ := uint256.FromBig(chainConfig.ChainID)
+	_ = chainID
+
+	snapshots := snapshotsync.NewAllSnapshots(path.Join(dataDir, "snapshots"), params.KnownSnapshots(chainConfig.ChainName))
+	snapshots.ReopenSegments()
+	snapshots.ReopenIndices()
+	//if err := snapshots.BuildIndices(context.Background(), *chainID); err != nil {
+	//	panic(err)
+	//}
+
+	snBlockReader := snapshotsync.NewBlockReaderWithSnapshots(snapshots)
+	tx, err := database.BeginRo(context.Background())
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	for i := uint64(0); i < snapshots.BlocksAvailable(); i++ {
+		hash, err := rawdb.ReadCanonicalHash(tx, i)
+		if err != nil {
+			return err
+		}
+		blockFromDB := rawdb.ReadBlock(tx, hash, i)
+		blockFromSnapshot, _, err := snBlockReader.BlockWithSenders(context.Background(), tx, hash, i)
+		if err != nil {
+			return err
+		}
+
+		if blockFromSnapshot.Hash() != blockFromDB.Hash() {
+			panic(i)
+		}
+		if i%1_000 == 0 {
+			log.Info(fmt.Sprintf("Block Num: %dK", i/1_000))
+		}
 	}
 	return nil
 }
@@ -3928,6 +3972,8 @@ func main() {
 		err = compress1(*chaindata, *name, *name)
 	case "recsplitWholeChain":
 		err = recsplitWholeChain(*chaindata)
+	case "checkBlockSnapshot":
+		err = checkBlockSnapshot(*chaindata)
 	case "decompress":
 		err = decompress(*name)
 	case "genstate":
```

cmd/integration/commands/snapshot_check.go

-3
This file was deleted.

cmd/integration/commands/stages.go

+23 −15

```diff
@@ -459,7 +459,7 @@ func stageBodies(db kv.RwDB, ctx context.Context) error {
 	}

 	u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
-	if err := stagedsync.UnwindBodiesStage(u, tx, stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, 0), ctx); err != nil {
+	if err := stagedsync.UnwindBodiesStage(u, tx, stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, 0, allSnapshots(chainConfig), getBlockReader(chainConfig)), ctx); err != nil {
 		return err
 	}

@@ -538,7 +538,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm)
+	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, allSnapshots(chainConfig))
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
 		err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx)
@@ -558,6 +558,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
 func stageExec(db kv.RwDB, ctx context.Context) error {
 	pm, engine, chainConfig, vmConfig, sync, _, _ := newSync(ctx, db, nil)
 	must(sync.SetCurrentStage(stages.Execution))
+	tmpdir := path.Join(datadir, etl.TmpDirName)

 	if reset {
 		genesis, _ := byChain()
@@ -585,7 +586,7 @@
 		pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
 	}

-	cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpDBPath, getBlockReader())
+	cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpdir, getBlockReader(chainConfig))
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.Execution, s.BlockNumber-unwind, s.BlockNumber)
 		err := stagedsync.UnwindExecutionStage(u, s, nil, ctx, cfg, false)
@@ -615,7 +616,7 @@
 }

 func stageTrie(db kv.RwDB, ctx context.Context) error {
-	pm, _, _, _, sync, _, _ := newSync(ctx, db, nil)
+	pm, _, chainConfig, _, sync, _, _ := newSync(ctx, db, nil)
 	must(sync.SetCurrentStage(stages.IntermediateHashes))
 	tmpdir := path.Join(datadir, etl.TmpDirName)

@@ -644,7 +645,7 @@

 	log.Info("StageExec", "progress", execStage.BlockNumber)
 	log.Info("StageTrie", "progress", s.BlockNumber)
-	cfg := stagedsync.StageTrieCfg(db, true, true, tmpdir, getBlockReader())
+	cfg := stagedsync.StageTrieCfg(db, true, true, tmpdir, getBlockReader(chainConfig))
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.IntermediateHashes, s.BlockNumber-unwind, s.BlockNumber)
 		if err := stagedsync.UnwindIntermediateHashesStage(u, s, tx, cfg, ctx); err != nil {
@@ -911,7 +912,7 @@ func stageHistory(db kv.RwDB, ctx context.Context) error {
 func stageTxLookup(db kv.RwDB, ctx context.Context) error {
 	tmpdir := path.Join(datadir, etl.TmpDirName)

-	pm, _, _, _, sync, _, _ := newSync(ctx, db, nil)
+	pm, _, chainConfig, _, sync, _, _ := newSync(ctx, db, nil)
 	must(sync.SetCurrentStage(stages.TxLookup))

 	tx, err := db.BeginRw(ctx)
@@ -936,7 +937,7 @@
 	}
 	log.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

-	cfg := stagedsync.StageTxLookupCfg(db, pm, tmpdir)
+	cfg := stagedsync.StageTxLookupCfg(db, pm, tmpdir, allSnapshots(chainConfig))
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.TxLookup, s.BlockNumber-unwind, s.BlockNumber)
 		err = stagedsync.UnwindTxLookup(u, s, tx, cfg, ctx)
@@ -1022,21 +1023,28 @@ func byChain() (*core.Genesis, *params.ChainConfig) {
 var openSnapshotOnce sync.Once
 var _allSnapshotsSingleton *snapshotsync.AllSnapshots

-func allSnapshots() *snapshotsync.AllSnapshots {
+func allSnapshots(cc *params.ChainConfig) *snapshotsync.AllSnapshots {
 	openSnapshotOnce.Do(func() {
-		snapshotCfg := ethconfig.Snapshot{}
 		if enableSnapshot {
-			snapshotCfg.Enabled = true
-			snapshotCfg.Dir = path.Join(datadir, "snapshots")
-			_allSnapshotsSingleton = snapshotsync.MustOpenAll(snapshotCfg.Dir)
+			snapshotCfg := ethconfig.Snapshot{
+				Enabled: true,
+				Dir:     path.Join(datadir, "snapshots"),
+			}
+			_allSnapshotsSingleton = snapshotsync.NewAllSnapshots(snapshotCfg.Dir, params.KnownSnapshots(cc.ChainName))
+			if err := _allSnapshotsSingleton.ReopenSegments(); err != nil {
+				panic(err)
+			}
+			if err := _allSnapshotsSingleton.ReopenIndices(); err != nil {
+				panic(err)
+			}
 		}
 	})
 	return _allSnapshotsSingleton
 }

-func getBlockReader() (blockReader interfaces.FullBlockReader) {
+func getBlockReader(cc *params.ChainConfig) (blockReader interfaces.FullBlockReader) {
 	blockReader = snapshotsync.NewBlockReader()
-	if sn := allSnapshots(); sn != nil {
+	if sn := allSnapshots(cc); sn != nil {
 		blockReader = snapshotsync.NewBlockReaderWithSnapshots(sn)
 	}
 	return blockReader
@@ -1110,7 +1118,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
 			stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, tmpdir),
 			stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir),
 			stagedsync.StageHashStateCfg(db, tmpdir),
-			stagedsync.StageTrieCfg(db, false, true, tmpdir, getBlockReader()),
+			stagedsync.StageTrieCfg(db, false, true, tmpdir, getBlockReader(chainConfig)),
 			stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, miner, ctx.Done()),
 		),
 		stagedsync.MiningUnwindOrder,
```
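`allSnapshots` now takes the chain config so it can resolve `params.KnownSnapshots(cc.ChainName)`, but it still initializes through `sync.Once`: whichever config reaches the first call decides what gets built, and arguments passed by later callers are silently ignored. A self-contained sketch of that behavior (names hypothetical, standard library only):

```go
package main

import (
	"fmt"
	"sync"
)

var (
	openOnce  sync.Once
	singleton string
)

// getSingleton mirrors allSnapshots(cc): only the first caller's argument
// is used; sync.Once guarantees the build runs exactly once.
func getSingleton(chainName string) string {
	openOnce.Do(func() {
		singleton = "snapshots-for-" + chainName
	})
	return singleton
}

func main() {
	fmt.Println(getSingleton("mainnet")) // snapshots-for-mainnet
	fmt.Println(getSingleton("goerli"))  // still snapshots-for-mainnet
}
```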

cmd/integration/commands/state_stages.go

+5 −4

```diff
@@ -180,7 +180,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
 	stateStages.DisableStages(stages.Headers, stages.BlockHashes, stages.Bodies, stages.Senders,
 		stages.Finish)

-	execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, tmpDir, getBlockReader())
+	execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, tmpDir, getBlockReader(chainConfig))

 	execUntilFunc := func(execToBlock uint64) func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
 		return func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
@@ -404,7 +404,7 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *params.ChainConfig) {
 }

 func loopIh(db kv.RwDB, ctx context.Context, unwind uint64) error {
-	_, _, _, _, sync, _, _ := newSync(ctx, db, nil)
+	_, _, chainConfig, _, sync, _, _ := newSync(ctx, db, nil)
 	tmpdir := path.Join(datadir, etl.TmpDirName)
 	tx, err := db.BeginRw(ctx)
 	if err != nil {
@@ -425,7 +425,7 @@
 	}
 	_ = sync.SetCurrentStage(stages.IntermediateHashes)
 	u = &stagedsync.UnwindState{ID: stages.IntermediateHashes, UnwindPoint: to}
-	if err = stagedsync.UnwindIntermediateHashesStage(u, stage(sync, tx, nil, stages.IntermediateHashes), tx, stagedsync.StageTrieCfg(db, true, true, tmpdir, getBlockReader()), ctx); err != nil {
+	if err = stagedsync.UnwindIntermediateHashesStage(u, stage(sync, tx, nil, stages.IntermediateHashes), tx, stagedsync.StageTrieCfg(db, true, true, tmpdir, getBlockReader(chainConfig)), ctx); err != nil {
 		return err
 	}
 	must(tx.Commit())
@@ -470,6 +470,7 @@

 func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error {
 	pm, engine, chainConfig, vmConfig, sync, _, _ := newSync(ctx, db, nil)
+	tmpdir := path.Join(datadir, etl.TmpDirName)

 	tx, err := db.BeginRw(ctx)
 	if err != nil {
@@ -489,7 +490,7 @@
 	from := progress(tx, stages.Execution)
 	to := from + unwind

-	cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpDBPath, getBlockReader())
+	cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpdir, getBlockReader(chainConfig))

 	// set block limit of execute stage
 	sync.MockExecFunc(stages.Execution, func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
```

cmd/rpcdaemon/README.md

+5 −5

````diff
@@ -64,11 +64,11 @@ The daemon should respond with something like:
 INFO [date-time] HTTP endpoint opened url=localhost:8545...
 ```

-When RPC daemon runs remotely, by default it maintains a state cache, which is updated every time when Erigon
-imports a new block. When state cache is reasonably warm, it allows such remote RPC daemon to execute queries
-related to `latest` block (i.e. to current state) with comparable performance to a local RPC daemon
-(around 2x slower vs 10x slower without state cache). Since there can be multiple such RPC daemons per one
-Erigon node, it may scale well for some workloads that are heavy on the current state queries.
+When RPC daemon runs remotely, by default it maintains a state cache, which is updated every time when Erigon imports a
+new block. When state cache is reasonably warm, it allows such remote RPC daemon to execute queries related to `latest`
+block (i.e. to current state) with comparable performance to a local RPC daemon
+(around 2x slower vs 10x slower without state cache). Since there can be multiple such RPC daemons per one Erigon node,
+it may scale well for some workloads that are heavy on the current state queries.

 ### Healthcheck
````
cmd/rpcdaemon/cli/config.go

+21 −1

```diff
@@ -23,9 +23,11 @@ import (
 	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/services"
 	"github.com/ledgerwatch/erigon/cmd/utils"
 	"github.com/ledgerwatch/erigon/common/paths"
+	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/eth/ethconfig"
 	"github.com/ledgerwatch/erigon/internal/debug"
 	"github.com/ledgerwatch/erigon/node"
+	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rpc"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
 	"github.com/ledgerwatch/log/v3"
@@ -246,7 +248,25 @@ func RemoteServices(ctx context.Context, cfg Flags, logger log.Logger, rootCance

 	if cfg.SingleNodeMode {
 		if cfg.Snapshot.Enabled {
-			allSnapshots, err := snapshotsync.OpenAll(cfg.Snapshot.Dir)
+			var cc *params.ChainConfig
+			if err := db.View(context.Background(), func(tx kv.Tx) error {
+				genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0)
+				if err != nil {
+					return err
+				}
+				cc, err = rawdb.ReadChainConfig(tx, genesisBlock.Hash())
+				if err != nil {
+					return err
+				}
+				return nil
+			}); err != nil {
+				return nil, nil, nil, nil, nil, nil, err
+			}
+			if cc == nil {
+				return nil, nil, nil, nil, nil, nil, fmt.Errorf("chain config not found in db. Need start erigon at least once on this db")
+			}
+
+			allSnapshots := snapshotsync.NewAllSnapshots(cfg.Snapshot.Dir, params.KnownSnapshots(cc.ChainName))
 			if err != nil {
 				return nil, nil, nil, nil, nil, nil, err
 			}
```
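The core of this hunk is how the RPC daemon, pointed at an existing datadir, discovers which chain it is serving: read block 0, then key the stored chain config off the genesis hash. Extracted into a standalone helper, the pattern looks roughly like this (a sketch: the helper name and package are mine, the `rawdb` calls are the ones the diff uses):

```go
package example

import (
	"context"
	"fmt"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/core/rawdb"
	"github.com/ledgerwatch/erigon/params"
)

// chainConfigFromDB resolves the chain config a database was initialized with
// by reading the genesis block (number 0) and looking the config up by hash.
func chainConfigFromDB(ctx context.Context, db kv.RoDB) (*params.ChainConfig, error) {
	var cc *params.ChainConfig
	if err := db.View(ctx, func(tx kv.Tx) error {
		genesisBlock, err := rawdb.ReadBlockByNumber(tx, 0)
		if err != nil {
			return err
		}
		cc, err = rawdb.ReadChainConfig(tx, genesisBlock.Hash())
		return err
	}); err != nil {
		return nil, err
	}
	if cc == nil {
		// Fresh DB without a genesis: Erigon must run on it at least once.
		return nil, fmt.Errorf("chain config not found in db")
	}
	return cc, nil
}
```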

cmd/rpcdaemon/interfaces/interfaces.go

+6 −2

```diff
@@ -12,8 +12,12 @@ type BlockReader interface {
 	BlockWithSenders(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error)
 }

+type HeaderReader interface {
+	Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error)
+	HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error)
+}
+
 type FullBlockReader interface {
 	BlockReader
-	Header(ctx context.Context, tx kv.Tx, hash common.Hash, blockHeight uint64) (*types.Header, error)
-	HeaderByNumber(ctx context.Context, tx kv.Tx, blockHeight uint64) (*types.Header, error)
+	HeaderReader
 }
```
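Two changes here: the header methods move into their own `HeaderReader` interface, which `FullBlockReader` now embeds, and the transaction parameter widens from `kv.Tx` to the smaller `kv.Getter`, so header lookups accept anything that can read keys. A hypothetical consumer (not part of the commit) showing what the narrower dependency buys:

```go
package example

import (
	"context"
	"fmt"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
)

// headerTime only needs headers, so it asks for HeaderReader; any
// FullBlockReader still satisfies it through the embedded interface.
func headerTime(ctx context.Context, tx kv.Getter, hr interfaces.HeaderReader, num uint64) (uint64, error) {
	h, err := hr.HeaderByNumber(ctx, tx, num)
	if err != nil {
		return 0, err
	}
	if h == nil {
		return 0, fmt.Errorf("header %d not found", num)
	}
	return h.Time, nil
}
```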

cmd/state/commands/regenerate_txlookup.go

-19
This file was deleted.
