Skip to content

Commit 12cbf4d

Browse files
AlexeyAkhunovAlexey Sharp
and
Alexey Sharp
authored
Ropsten to find correct chain (#2614)
* Ropsten to find correct chain * Cleanup and compile fix * Compile fix * Print duration of the unwind, not the timestamp Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
1 parent fd357f0 commit 12cbf4d

9 files changed

+123
-93
lines changed

cmd/integration/commands/state_stages.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
186186

187187
execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, tmpDir)
188188

189-
execUntilFunc := func(execToBlock uint64) func(firstCycle bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
190-
return func(firstCycle bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
189+
execUntilFunc := func(execToBlock uint64) func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
190+
return func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
191191
if err := stagedsync.SpawnExecuteBlocksStage(s, unwinder, tx, execToBlock, ctx, execCfg, firstCycle); err != nil {
192192
return fmt.Errorf("spawnExecuteBlocksStage: %w", err)
193193
}
@@ -310,7 +310,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
310310
if miner.MiningConfig.Enabled && nextBlock != nil && nextBlock.Header().Coinbase != (common.Address{}) {
311311
miner.MiningConfig.Etherbase = nextBlock.Header().Coinbase
312312
miner.MiningConfig.ExtraData = nextBlock.Header().Extra
313-
miningStages.MockExecFunc(stages.MiningCreateBlock, func(firstCycle bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx) error {
313+
miningStages.MockExecFunc(stages.MiningCreateBlock, func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx) error {
314314
err = stagedsync.SpawnMiningCreateBlockStage(s, tx,
315315
stagedsync.StageMiningCreateBlockCfg(db,
316316
miner,
@@ -501,7 +501,7 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error {
501501
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpDBPath)
502502

503503
// set block limit of execute stage
504-
sync.MockExecFunc(stages.Execution, func(firstCycle bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
504+
sync.MockExecFunc(stages.Execution, func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error {
505505
if err = stagedsync.SpawnExecuteBlocksStage(stageState, sync, tx, to, ctx, cfg, false); err != nil {
506506
return fmt.Errorf("spawnExecuteBlocksStage: %w", err)
507507
}

eth/stagedsync/default_stages.go

+22-19
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,14 @@ func DefaultStages(ctx context.Context,
3333
{
3434
ID: stages.Headers,
3535
Description: "Download headers",
36-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
36+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
37+
if badBlockUnwind {
38+
return nil
39+
}
3740
return HeadersForward(s, u, ctx, tx, headers, firstCycle, test)
3841
},
3942
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
40-
return HeadersUnwind(u, s, tx, headers)
43+
return HeadersUnwind(u, s, tx, headers, test)
4144
},
4245
Prune: func(firstCycle bool, p *PruneState, tx kv.RwTx) error {
4346
return HeadersPrune(p, tx, headers, ctx)
@@ -46,7 +49,7 @@ func DefaultStages(ctx context.Context,
4649
{
4750
ID: stages.BlockHashes,
4851
Description: "Write block hashes",
49-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
52+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
5053
return SpawnBlockHashStage(s, tx, blockHashCfg, ctx)
5154
},
5255
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -61,7 +64,7 @@ func DefaultStages(ctx context.Context,
6164
Description: "Create headers snapshot",
6265
Disabled: true,
6366
DisabledDescription: "Enable by --snapshot.layout",
64-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
67+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
6568
return SpawnHeadersSnapshotGenerationStage(s, tx, snapshotHeaders, firstCycle, ctx)
6669
},
6770
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -74,7 +77,7 @@ func DefaultStages(ctx context.Context,
7477
{
7578
ID: stages.Bodies,
7679
Description: "Download block bodies",
77-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
80+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
7881
return BodiesForward(s, u, ctx, tx, bodies, test)
7982
},
8083
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -89,7 +92,7 @@ func DefaultStages(ctx context.Context,
8992
Description: "Create bodies snapshot",
9093
Disabled: true,
9194
DisabledDescription: "Enable by --snapshot.layout",
92-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
95+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
9396
return SpawnBodiesSnapshotGenerationStage(s, tx, snapshotBodies, ctx)
9497
},
9598
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -102,7 +105,7 @@ func DefaultStages(ctx context.Context,
102105
{
103106
ID: stages.Senders,
104107
Description: "Recover senders from tx signatures",
105-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
108+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
106109
return SpawnRecoverSendersStage(senders, s, u, tx, 0, ctx)
107110
},
108111
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -115,7 +118,7 @@ func DefaultStages(ctx context.Context,
115118
{
116119
ID: stages.Execution,
117120
Description: "Execute blocks w/o hash checks",
118-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
121+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
119122
return SpawnExecuteBlocksStage(s, u, tx, 0, ctx, exec, firstCycle)
120123
},
121124
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -130,7 +133,7 @@ func DefaultStages(ctx context.Context,
130133
Description: "Transpile marked EVM contracts to TEVM",
131134
Disabled: !sm.Experiments.TEVM,
132135
DisabledDescription: "Enable by adding `tevm` to --experiments",
133-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
136+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
134137
return SpawnTranspileStage(s, tx, 0, trans, ctx)
135138
},
136139
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -145,7 +148,7 @@ func DefaultStages(ctx context.Context,
145148
Description: "Create state snapshot",
146149
Disabled: true,
147150
DisabledDescription: "Enable by --snapshot.layout",
148-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
151+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
149152
return SpawnStateSnapshotGenerationStage(s, tx, snapshotState, ctx)
150153
},
151154
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -158,7 +161,7 @@ func DefaultStages(ctx context.Context,
158161
{
159162
ID: stages.HashState,
160163
Description: "Hash the key in the state",
161-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
164+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
162165
return SpawnHashStateStage(s, tx, hashState, ctx)
163166
},
164167
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -171,7 +174,7 @@ func DefaultStages(ctx context.Context,
171174
{
172175
ID: stages.IntermediateHashes,
173176
Description: "Generate intermediate hashes and computing state root",
174-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
177+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
175178
_, err := SpawnIntermediateHashesStage(s, u, tx, trieCfg, ctx)
176179
return err
177180
},
@@ -186,7 +189,7 @@ func DefaultStages(ctx context.Context,
186189
ID: stages.CallTraces,
187190
Description: "Generate call traces index",
188191
DisabledDescription: "Work In Progress",
189-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
192+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
190193
return SpawnCallTraces(s, tx, callTraces, ctx)
191194
},
192195
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -199,7 +202,7 @@ func DefaultStages(ctx context.Context,
199202
{
200203
ID: stages.AccountHistoryIndex,
201204
Description: "Generate account history index",
202-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
205+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
203206
return SpawnAccountHistoryIndex(s, tx, history, ctx)
204207
},
205208
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -212,7 +215,7 @@ func DefaultStages(ctx context.Context,
212215
{
213216
ID: stages.StorageHistoryIndex,
214217
Description: "Generate storage history index",
215-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
218+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
216219
return SpawnStorageHistoryIndex(s, tx, history, ctx)
217220
},
218221
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -225,7 +228,7 @@ func DefaultStages(ctx context.Context,
225228
{
226229
ID: stages.LogIndex,
227230
Description: "Generate receipt logs index",
228-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
231+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
229232
return SpawnLogIndex(s, tx, logIndex, ctx)
230233
},
231234
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -238,7 +241,7 @@ func DefaultStages(ctx context.Context,
238241
{
239242
ID: stages.TxLookup,
240243
Description: "Generate tx lookup index",
241-
Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error {
244+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
242245
return SpawnTxLookup(s, tx, txLookup, ctx)
243246
},
244247
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -251,7 +254,7 @@ func DefaultStages(ctx context.Context,
251254
{
252255
ID: stages.TxPool,
253256
Description: "Update transaction pool",
254-
Forward: func(firstCycle bool, s *StageState, _ Unwinder, tx kv.RwTx) error {
257+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, _ Unwinder, tx kv.RwTx) error {
255258
return SpawnTxPool(s, tx, txPool, ctx)
256259
},
257260
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
@@ -264,7 +267,7 @@ func DefaultStages(ctx context.Context,
264267
{
265268
ID: stages.Finish,
266269
Description: "Final: update current block for the RPC API",
267-
Forward: func(firstCycle bool, s *StageState, _ Unwinder, tx kv.RwTx) error {
270+
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, _ Unwinder, tx kv.RwTx) error {
268271
return FinishForward(s, tx, finish)
269272
},
270273
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {

eth/stagedsync/stage.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
// ExecFunc is the execution function for the stage to move forward.
1010
// * state - is the current state of the stage and contains stage data.
1111
// * unwinder - if the stage needs to cause unwinding, `unwinder` methods can be used.
12-
type ExecFunc func(firstCycle bool, s *StageState, unwinder Unwinder, tx kv.RwTx) error
12+
type ExecFunc func(firstCycle bool, badBlockUnwind bool, s *StageState, unwinder Unwinder, tx kv.RwTx) error
1313

1414
// UnwindFunc is the unwinding logic of the stage.
1515
// * unwindState - contains information about the unwind itself.

eth/stagedsync/stage_bodies.go

+11
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ func BodiesForward(
9494
var req *bodydownload.BodyRequest
9595
var peer []byte
9696
stopped := false
97+
prevProgress := bodyProgress
98+
noProgressCount := 0 // How many time the progress was printed without actual progress
9799
Loop:
98100
for !stopped {
99101
// TODO: this is incorrect use
@@ -174,14 +176,23 @@ Loop:
174176
stopped = true
175177
break
176178
}
179+
if s.BlockNumber > 0 && noProgressCount >= 5 {
180+
break
181+
}
177182
timer.Stop()
178183
timer = time.NewTimer(1 * time.Second)
179184
select {
180185
case <-ctx.Done():
181186
stopped = true
182187
case <-logEvery.C:
183188
deliveredCount, wastedCount := cfg.bd.DeliveryCounts()
189+
if prevProgress == bodyProgress {
190+
noProgressCount++
191+
} else {
192+
noProgressCount = 0 // Reset, there was progress
193+
}
184194
logProgressBodies(logPrefix, bodyProgress, prevDeliveredCount, deliveredCount, prevWastedCount, wastedCount)
195+
prevProgress = bodyProgress
185196
prevDeliveredCount = deliveredCount
186197
prevWastedCount = wastedCount
187198
//log.Info("Timings", "d1", d1, "d2", d2, "d3", d3, "d4", d4, "d5", d5, "d6", d6)

eth/stagedsync/stage_headers.go

+40-29
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func HeadersForward(
118118
var peer []byte
119119
stopped := false
120120
prevProgress := headerProgress
121+
noProgressCount := 0 // How many time the progress was printed without actual progress
121122
for !stopped {
122123
currentTime := uint64(time.Now().Unix())
123124
req, penalties := cfg.hd.RequestMoreHeaders(currentTime)
@@ -160,6 +161,9 @@ func HeadersForward(
160161
if len(announces) > 0 {
161162
cfg.announceNewHashes(ctx, announces)
162163
}
164+
if s.BlockNumber > 0 && noProgressCount >= 5 {
165+
break
166+
}
163167
if headerInserter.BestHeaderChanged() { // We do not break unless there best header changed
164168
if !initialCycle {
165169
// if this is not an initial cycle, we need to react quickly when new headers are coming in
@@ -179,6 +183,11 @@ func HeadersForward(
179183
stopped = true
180184
case <-logEvery.C:
181185
progress := cfg.hd.Progress()
186+
if prevProgress == progress {
187+
noProgressCount++
188+
} else {
189+
noProgressCount = 0 // Reset, there was progress
190+
}
182191
logProgressHeaders(logPrefix, prevProgress, progress)
183192
prevProgress = progress
184193
case <-timer.C:
@@ -240,7 +249,7 @@ func fixCanonicalChain(logPrefix string, logEvery *time.Ticker, height uint64, h
240249
return nil
241250
}
242251

243-
func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg) (err error) {
252+
func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg, test bool) (err error) {
244253
useExternalTx := tx != nil
245254
if !useExternalTx {
246255
tx, err = cfg.db.BeginRw(context.Background())
@@ -284,42 +293,44 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg) (e
284293
}
285294
}
286295
if badBlock {
287-
// Find header with biggest TD
288-
tdCursor, cErr := tx.Cursor(kv.HeaderTD)
289-
if cErr != nil {
290-
return cErr
291-
}
292-
defer tdCursor.Close()
293-
var k, v []byte
294-
k, v, err = tdCursor.Last()
295-
if err != nil {
296-
return err
297-
}
298296
var maxTd big.Int
299297
var maxHash common.Hash
300298
var maxNum uint64 = 0
301-
for ; err == nil && k != nil; k, v, err = tdCursor.Prev() {
302-
if len(k) != 40 {
303-
return fmt.Errorf("key in TD table has to be 40 bytes long: %x", k)
299+
if test { // If we are not in the test, we can do searching for the heaviest chain in the next cycle
300+
// Find header with biggest TD
301+
tdCursor, cErr := tx.Cursor(kv.HeaderTD)
302+
if cErr != nil {
303+
return cErr
304304
}
305-
var hash common.Hash
306-
copy(hash[:], k[8:])
307-
if cfg.hd.IsBadHeader(hash) {
308-
continue
309-
}
310-
var td big.Int
311-
if err = rlp.DecodeBytes(v, &td); err != nil {
305+
defer tdCursor.Close()
306+
var k, v []byte
307+
k, v, err = tdCursor.Last()
308+
if err != nil {
312309
return err
313310
}
314-
if td.Cmp(&maxTd) > 0 {
315-
maxTd.Set(&td)
316-
copy(maxHash[:], k[8:])
317-
maxNum = binary.BigEndian.Uint64(k[:8])
311+
for ; err == nil && k != nil; k, v, err = tdCursor.Prev() {
312+
if len(k) != 40 {
313+
return fmt.Errorf("key in TD table has to be 40 bytes long: %x", k)
314+
}
315+
var hash common.Hash
316+
copy(hash[:], k[8:])
317+
if cfg.hd.IsBadHeader(hash) {
318+
continue
319+
}
320+
var td big.Int
321+
if err = rlp.DecodeBytes(v, &td); err != nil {
322+
return err
323+
}
324+
if td.Cmp(&maxTd) > 0 {
325+
maxTd.Set(&td)
326+
copy(maxHash[:], k[8:])
327+
maxNum = binary.BigEndian.Uint64(k[:8])
328+
}
329+
}
330+
if err != nil {
331+
return err
318332
}
319333
}
320-
if err != nil {
321-
return err
322-
}
323334
if maxNum == 0 {
324335
maxNum = u.UnwindPoint
325336
if maxHash, err = rawdb.ReadCanonicalHash(tx, maxNum); err != nil {

eth/stagedsync/stage_interhashes.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx kv.RwTx, cfg Tri
9191
if cfg.checkRoot && root != expectedRootHash {
9292
log.Error(fmt.Sprintf("[%s] Wrong trie root of block %d: %x, expected (from header): %x. Block hash: %x", logPrefix, to, root, expectedRootHash, headerHash))
9393
if to > s.BlockNumber {
94-
log.Warn("Unwinding due to incorrect root hash", "to", to-1)
95-
u.UnwindTo(to-1, headerHash)
94+
unwindTo := (to + s.BlockNumber) / 2 // Binary search for the correct block, biased to the lower numbers
95+
log.Warn("Unwinding due to incorrect root hash", "to", unwindTo)
96+
u.UnwindTo(unwindTo, headerHash)
9697
}
9798
} else if err = s.Update(tx, to); err != nil {
9899
return trie.EmptyRoot, err

0 commit comments

Comments
 (0)