Skip to content

Commit

Permalink
Merge pull request #525 from LimeChain/#389-block-tree-warp-sync-hand…
Browse files Browse the repository at this point in the history
…ling

feat: handle block tree after warp sync
  • Loading branch information
Grigorov-Georgi authored Sep 9, 2024
2 parents f3703d9 + e55ff8e commit cb1065b
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/limechain/client/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void start() {
switch (args.syncMode()) {
case FULL -> fullSyncMachine.start();
case WARP -> {
warpSyncMachine.onFinish(() -> fullSyncMachine.start());
warpSyncMachine.onFinish(fullSyncMachine::start);
warpSyncMachine.start();
}
default -> throw new IllegalStateException("Unexpected value: " + args.syncMode());
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,16 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen

String pingProtocol = ProtocolUtils.PING_PROTOCOL;
String chainId = chainService.getChainSpec().getProtocolId();
String protocolId = cliArgs.noLegacyProtocols()
? StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString())
: chainId;
boolean legacyProtocol = !cliArgs.noLegacyProtocols();
String protocolId = legacyProtocol ? chainId :
StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString());
String kadProtocolId = ProtocolUtils.getKadProtocol(chainId);
String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(protocolId);
String lightProtocolId = ProtocolUtils.getLightMessageProtocol(protocolId);
String syncProtocolId = ProtocolUtils.getSyncProtocol(protocolId);
String stateProtocolId = ProtocolUtils.getStateProtocol(protocolId);
String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(protocolId);
String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId);
String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId, legacyProtocol);
String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId);

kademliaService = new KademliaService(kadProtocolId, hostId, isLocalEnabled, clientMode);
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/com/limechain/network/ProtocolUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,8 @@ public static String getKadProtocol(String chainId) {
return String.format("/%s/kad", chainId);
}

public static String getGrandpaProtocol(String chainId) {
return String.format("/%s/grandpa/1", grandpaProtocolChain(chainId));
}

//TODO: figure out a more elegant solution
private static String grandpaProtocolChain(String chainId) {
return chainId.equals("dot") ? "paritytech" : chainId;
public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) {
return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId);
}

public static String getTransactionsProtocol(String chainId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package com.limechain.network.protocol.blockannounce;

import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.exception.storage.BlockNodeNotFoundException;
import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleReader;
import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleWriter;
import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleReader;
import com.limechain.network.protocol.warp.dto.Block;
import com.limechain.network.protocol.warp.dto.BlockBody;
import com.limechain.rpc.server.AppBean;
import com.limechain.storage.block.BlockState;
import com.limechain.sync.warpsync.WarpSyncState;
Expand All @@ -24,13 +21,14 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.logging.Level;

@Log
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class BlockAnnounceEngine {

public static final int HANDSHAKE_LENGTH = 69;

protected ConnectionManager connectionManager;
protected WarpSyncState warpSyncState;
protected BlockAnnounceHandshakeBuilder handshakeBuilder;
Expand Down Expand Up @@ -74,7 +72,7 @@ private void handleHandshake(byte[] msg, PeerId peerId, Stream stream, boolean c
connectionManager.addBlockAnnounceStream(stream);
connectionManager.updatePeer(peerId, handshake);
log.log(Level.INFO, "Received handshake from " + peerId + "\n" +
handshake);
handshake);
writeHandshakeToStream(stream, peerId);
}
}
Expand All @@ -85,19 +83,13 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) {
connectionManager.updatePeer(peerId, announce);
warpSyncState.syncBlockAnnounce(announce);
log.log(Level.FINE, "Received block announce for block #" + announce.getHeader().getBlockNumber() +
" from " + peerId +
" with hash:0x" + announce.getHeader().getHash() +
" parentHash:" + announce.getHeader().getParentHash() +
" stateRoot:" + announce.getHeader().getStateRoot());
" from " + peerId +
" with hash:0x" + announce.getHeader().getHash() +
" parentHash:" + announce.getHeader().getParentHash() +
" stateRoot:" + announce.getHeader().getStateRoot());

if (BlockState.getInstance().isInitialized()) {
try {
BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>())));
} catch (BlockNodeNotFoundException ignored) {
// Currently we ignore this exception, because our syncing strategy as full node is not implemented yet.
// And thus when we receive a block announce and try to add it in the BlockState we will get this
// exception because the parent block of the received one is not found in the BlockState.
}
BlockState.getInstance().addBlockToBlockTree(announce.getHeader());
}
}

Expand Down
77 changes: 72 additions & 5 deletions src/main/java/com/limechain/storage/block/BlockState.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import com.limechain.runtime.Runtime;
import com.limechain.storage.DBConstants;
import com.limechain.storage.KVRepository;
import com.limechain.storage.block.tree.BlockNode;
import com.limechain.storage.block.tree.BlockTree;
import com.limechain.utils.scale.ScaleUtils;
import io.emeraldpay.polkaj.types.Hash256;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.java.Log;
import org.javatuples.Pair;
import org.springframework.util.SerializationUtils;

import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -53,6 +56,10 @@ public class BlockState {
private Hash256 lastFinalized;
@Getter
private boolean initialized;
@Setter
private boolean fullSyncFinished;
@Getter
private final ArrayDeque<Pair<Instant, Block>> pendingBlocksQueue = new ArrayDeque<>();

/**
* Initializes the BlockState instance from genesis
Expand Down Expand Up @@ -97,11 +104,21 @@ public void initialize(final KVRepository<String, Object> repository) {

this.genesisHash = getHashByNumberFromDb(BigInteger.ZERO);
final BlockHeader lastHeader = getHighestFinalizedHeader();
final Hash256 headerHash = lastHeader.getHash();
this.lastFinalized = headerHash;
this.lastFinalized = lastHeader.getHash();
this.blockTree = new BlockTree(lastHeader);
}

public void initializeAfterWarpSync(Hash256 lastFinalizedBlockHash, BigInteger lastFinalizedBlockNumber) {
BlockNode parentBlock = new BlockNode(
lastFinalizedBlockHash,
null,
lastFinalizedBlockNumber.longValue()
);

this.blockTree = new BlockTree(parentBlock);
this.lastFinalized = lastFinalizedBlockHash;
}

/**
* Check if the hash is part of the unfinalized blocks in-memory or persisted in the database.
*
Expand Down Expand Up @@ -589,7 +606,7 @@ public List<Hash256> retrieveRangeFromDatabase(final Hash256 startHash, final Bl
// Verify that we ended up with the start hash
if (!Objects.equals(inLoopHash, startHash)) {
throw new BlockStorageGenericException("Start hash mismatch: expected " + startHash +
", found: " + inLoopHash);
", found: " + inLoopHash);
}

return hashes;
Expand Down Expand Up @@ -749,6 +766,8 @@ public Block getUnfinalizedBlockFromHash(final Hash256 hash) {
* @throws BlockNodeNotFoundException if the block corresponding to the provided hash is not found.
*/
public void setFinalizedHash(final Hash256 hash, final BigInteger round, final BigInteger setId) {
if (!fullSyncFinished) return;

if (!hasHeader(hash)) {
throw new BlockNodeNotFoundException("Cannot finalise unknown block " + hash);
}
Expand Down Expand Up @@ -883,8 +902,8 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) {

Block block = unfinalizedBlocks.get(subchainHash);
if (block == null) {
throw new BlockNotFoundException("Failed to find block in unfinalized block map for hash" +
subchainHash);
throw new BlockNotFoundException("Failed to find block in unfinalized block map for hash " +
subchainHash);
}

setHeader(block.getHeader());
Expand All @@ -904,4 +923,52 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) {
//TODO: If currentFinalizedHash is not equal to subchain hash, delete subchain state trie
}
}

public synchronized void addBlockToBlockTree(BlockHeader blockHeader) {
if (!fullSyncFinished) {
addBlockToQueue(blockHeader);
return;
}

processPendingBlocksFromQueue();

if (getPendingBlocksQueue().isEmpty()) {
try {
addBlock(new Block(blockHeader, new BlockBody(new ArrayList<>())));
} catch (BlockStorageGenericException ex) {
log.fine(String.format("[%s] %s", blockHeader.getHash().toString(), ex.getMessage()));
}
}
}

private void addBlockToQueue(BlockHeader blockHeader) {
var currentBlock = new Block(
blockHeader,
new BlockBody(new ArrayList<>())
);

pendingBlocksQueue.add(
new Pair<>(Instant.now(), currentBlock)
);
}

private void processPendingBlocksFromQueue() {
var rootBlockNumber = BigInteger.valueOf(blockTree.getRoot().getNumber());

while (!pendingBlocksQueue.isEmpty()) {
var currentPair = pendingBlocksQueue.poll();
var block = currentPair.getValue1();
var arrivalTime = currentPair.getValue0();

if (block.getHeader().getBlockNumber().compareTo(rootBlockNumber) <= 0) {
continue;
}

try {
addBlockWithArrivalTime(block, arrivalTime);
} catch (BlockStorageGenericException ex) {
log.fine(String.format("[%s] %s", block.getHeader().getHash().toString(), ex.getMessage()));
}
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/limechain/storage/block/SyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ public void finalizedCommitMessage(CommitMessage commitMessage) {
this.stateRoot = blockByHash.getHeader().getStateRoot();
this.lastFinalizedBlockHash = commitMessage.getVote().getBlockHash();
this.lastFinalizedBlockNumber = commitMessage.getVote().getBlockNumber();

if (BlockState.getInstance().isInitialized()) {
BlockState.getInstance().setFinalizedHash(commitMessage.getVote().getBlockHash(), commitMessage.getRoundNumber(), commitMessage.getSetId());
}
}

} catch (HeaderNotFoundException ignored) {
log.fine("Received commit message for a block that is not in the block store");
}
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/com/limechain/storage/block/tree/BlockTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,6 @@ public List<Hash256> rangeInMemory(final Hash256 startHash, final Hash256 endHas
throw new BlockNodeNotFoundException("Start node not found");
}

if (startBlockNode.getNumber() > endBlockNode.getNumber()) {
throw new BlockStorageGenericException("Start is greater than end");
}

return accumulateHashesInDescendingOrder(endBlockNode, startBlockNode);
}

Expand All @@ -196,7 +192,7 @@ public List<Hash256> rangeInMemory(final Hash256 startHash, final Hash256 endHas
*/
public List<Hash256> accumulateHashesInDescendingOrder(final BlockNode endNode, final BlockNode startNode) {
if (startNode.getNumber() > endNode.getNumber()) {
throw new IllegalArgumentException("Start is greater than end");
throw new BlockStorageGenericException("Start is greater than end");
}

int blocksInRange = (int) (endNode.getNumber() - startNode.getNumber());
Expand All @@ -205,7 +201,7 @@ public List<Hash256> accumulateHashesInDescendingOrder(final BlockNode endNode,
BlockNode tempNode = endNode;
for (int position = blocksInRange - 1; position >= 0; position--) {
hashes.add(tempNode.getHash());
tempNode = endNode.getParent();
tempNode = tempNode.getParent();

if (tempNode == null) {
throw new BlockStorageGenericException("End node is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public void start() {
startNumber += blocksToFetch;
receivedBlocks = requestBlocks(startNumber, blocksToFetch);
}

blockState.setFullSyncFinished(true);
}

private TrieStructure<NodeData> loadStateAtBlockFromPeer(Hash256 lastFinalizedBlockHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.limechain.chain.lightsyncstate.LightSyncState;
import com.limechain.network.Network;
import com.limechain.network.protocol.warp.dto.WarpSyncFragment;
import com.limechain.storage.block.BlockState;
import com.limechain.storage.block.SyncState;
import com.limechain.sync.warpsync.action.FinishedAction;
import com.limechain.sync.warpsync.action.RequestFragmentsAction;
Expand Down Expand Up @@ -101,6 +102,12 @@ private void finishWarpSync() {
this.warpState.setWarpSyncFinished(true);
this.networkService.handshakeBootNodes();
this.syncState.persistState();

BlockState.getInstance().initializeAfterWarpSync(
syncState.getLastFinalizedBlockHash(),
syncState.getLastFinalizedBlockNumber()
);

log.info("Warp sync finished.");
this.onFinishCallbacks.forEach(executor::submit);
}
Expand Down

0 comments on commit cb1065b

Please sign in to comment.