
Commit

get rid of low-level Future implementations
itegulov committed Jan 13, 2025
1 parent af94d19 commit cd0326f
Showing 8 changed files with 85 additions and 180 deletions.
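Taken together, the change replaces a hand-rolled `Future` implementation (pin-project, a pinned `BoxFuture` slot, and a manual `poll`) with a plain `async fn run()` that loops over the command channel until every sender is dropped. A minimal sketch of the new shape, with a stand-in `Command` enum (the real variants appear in the `node_executor.rs` diff below):

use tokio::sync::mpsc;

enum Command {
    DoWork, // stand-in for the real command variants
}

struct Executor {
    command_receiver: mpsc::Receiver<Command>,
}

impl Executor {
    // Instead of implementing `Future` by hand, the component exposes an
    // async method that owns the receive loop; callers drive it with
    // `tokio::spawn(executor.run())` or from a `tokio::select!` arm.
    async fn run(mut self) -> anyhow::Result<()> {
        while let Some(command) = self.command_receiver.recv().await {
            match command {
                Command::DoWork => { /* dispatch to a handler */ }
            }
        }
        // All senders dropped: shut down cleanly.
        Ok(())
    }
}

One consequence of this shape: commands are still processed strictly one at a time, which the old implementation also guaranteed by storing at most one in-flight `BoxFuture`.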
1 change: 0 additions & 1 deletion Cargo.lock

(Diff not rendered: Cargo.lock is a generated file.)

1 change: 0 additions & 1 deletion Cargo.toml
@@ -54,7 +54,6 @@ itertools = "0.13"
jsonrpsee = "0.23"
lazy_static = "1.4"
once_cell = "1.7"
pin-project = "1.1.7"
rand = "0.8"
reqwest = { version = "0.11", features = ["blocking"] }
rustc-hash = "1.1.0"
4 changes: 2 additions & 2 deletions crates/cli/src/main.rs
@@ -344,10 +344,10 @@ async fn main() -> anyhow::Result<()> {
_ = any_server_stopped => {
tracing::trace!("node server was stopped")
},
_ = node_executor => {
_ = node_executor.run() => {
tracing::trace!("node executor was stopped")
},
_ = block_sealer => {
_ = block_sealer.run() => {
tracing::trace!("block sealer was stopped")
},
_ = state_dumper => {
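For context, the arms above live inside a `tokio::select!` in `main()`. Assembled from the lines visible in this hunk (the `state_dumper` body and any further arms are elided in the diff):

tokio::select! {
    _ = any_server_stopped => {
        tracing::trace!("node server was stopped")
    },
    _ = node_executor.run() => {
        tracing::trace!("node executor was stopped")
    },
    _ = block_sealer.run() => {
        tracing::trace!("block sealer was stopped")
    },
    _ = state_dumper => {
        // body elided in the diff
    },
}

Note that `state_dumper` is still awaited as a value rather than via `.run()`, so it presumably remains a future itself after this commit.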
1 change: 0 additions & 1 deletion crates/core/Cargo.toml
@@ -40,7 +40,6 @@ chrono.workspace = true
time.workspace = true
flate2.workspace = true
thiserror.workspace = true
pin-project.workspace = true

[dev-dependencies]
maplit.workspace = true
4 changes: 2 additions & 2 deletions crates/core/src/node/in_memory.rs
@@ -656,8 +656,8 @@ impl InMemoryNode {
pool.clone(),
node_handle.clone(),
);
tokio::spawn(node_executor);
tokio::spawn(block_sealer);
tokio::spawn(node_executor.run());
tokio::spawn(block_sealer.run());
Self::new(
inner,
blockchain,
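This call-site change works because `tokio::spawn` accepts any `Future + Send + 'static`: the anonymous future returned by `async fn run()` qualifies, so the structs themselves no longer need to implement `Future`. A sketch of the new spawn site (the `JoinHandle` binding is illustrative; the code above discards it):

// `run()` consumes the executor and returns an anonymous future; for
// `NodeExecutor` the spawned task resolves to `anyhow::Result<()>`.
let executor_task: tokio::task::JoinHandle<anyhow::Result<()>> =
    tokio::spawn(node_executor.run());
tokio::spawn(block_sealer.run());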
182 changes: 60 additions & 122 deletions crates/core/src/node/inner/node_executor.rs
@@ -1,25 +1,15 @@
use super::InMemoryNodeInner;
use crate::node::pool::TxBatch;
use crate::system_contracts::SystemContracts;
use futures::future::BoxFuture;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot, RwLock};
use zksync_contracts::BaseSystemContracts;
use zksync_multivm::interface::TxExecutionMode;
use zksync_types::l2::L2Tx;
use zksync_types::L2BlockNumber;

#[pin_project::pin_project]
pub struct NodeExecutor {
node_inner: Arc<RwLock<InMemoryNodeInner>>,
system_contracts: SystemContracts,
command_receiver: mpsc::Receiver<Command>,
/// Future that is processing the next command
#[pin]
future: Option<BoxFuture<'static, ()>>,
}

impl NodeExecutor {
@@ -32,21 +22,55 @@ impl NodeExecutor {
node_inner,
system_contracts,
command_receiver,
future: None,
};
let handle = NodeExecutorHandle { command_sender };
(this, handle)
}

pub async fn run(mut self) -> anyhow::Result<()> {
while let Some(command) = self.command_receiver.recv().await {
match command {
Command::SealBlock(tx_batch, reply) => {
self.seal_block(tx_batch, reply).await;
}
Command::SealBlocks(tx_batches, interval, reply) => {
self.seal_blocks(tx_batches, interval, reply).await;
}
Command::IncreaseTime(delta, reply) => {
self.increase_time(delta, reply).await;
}
Command::EnforceNextTimestamp(timestamp, reply) => {
self.enforce_next_timestamp(timestamp, reply).await;
}
Command::SetCurrentTimestamp(timestamp, reply) => {
self.set_current_timestamp(timestamp, reply).await;
}
Command::SetTimestampInterval(seconds) => {
self.set_timestamp_interval(seconds).await;
}
Command::RemoveTimestampInterval(reply) => {
self.remove_timestamp_interval(reply).await;
}
}
}

tracing::trace!("channel has been closed; stopping node executor");
Ok(())
}
}

impl NodeExecutor {
async fn seal_block(
node_inner: Arc<RwLock<InMemoryNodeInner>>,
txs: Vec<L2Tx>,
base_system_contracts: BaseSystemContracts,
&self,
TxBatch { impersonating, txs }: TxBatch,
reply: Option<oneshot::Sender<anyhow::Result<L2BlockNumber>>>,
) {
let result = node_inner
let base_system_contracts = self
.system_contracts
.contracts(TxExecutionMode::VerifyExecute, impersonating)
.clone();
let result = self
.node_inner
.write()
.await
.seal_block(txs, base_system_contracts)
@@ -69,13 +93,12 @@ impl NodeExecutor {
}

async fn seal_blocks(
node_inner: Arc<RwLock<InMemoryNodeInner>>,
&self,
tx_batches: Vec<TxBatch>,
interval: u64,
system_contracts: SystemContracts,
reply: oneshot::Sender<anyhow::Result<Vec<L2BlockNumber>>>,
) {
let mut node_inner = node_inner.write().await;
let mut node_inner = self.node_inner.write().await;

// Save old interval to restore later: it might get replaced with `interval` below
let old_interval = node_inner.time_writer.get_block_timestamp_interval();
@@ -91,7 +114,8 @@ impl NodeExecutor {
.time_writer
.set_block_timestamp_interval(Some(interval));
}
let base_system_contracts = system_contracts
let base_system_contracts = self
.system_contracts
.contracts(TxExecutionMode::VerifyExecute, impersonating)
.clone();
let number = node_inner.seal_block(txs, base_system_contracts).await?;
@@ -118,24 +142,25 @@ impl NodeExecutor {
}
}

async fn increase_time(
node_inner: Arc<RwLock<InMemoryNodeInner>>,
delta: u64,
reply: oneshot::Sender<()>,
) {
node_inner.write().await.time_writer.increase_time(delta);
async fn increase_time(&self, delta: u64, reply: oneshot::Sender<()>) {
self.node_inner
.write()
.await
.time_writer
.increase_time(delta);
// Reply to sender if we can
if reply.send(()).is_err() {
tracing::info!("failed to reply as receiver has been dropped");
}
}

async fn enforce_next_timestamp(
node_inner: Arc<RwLock<InMemoryNodeInner>>,
&self,
timestamp: u64,
reply: oneshot::Sender<anyhow::Result<()>>,
) {
let result = node_inner
let result = self
.node_inner
.write()
.await
.time_writer
@@ -153,12 +178,9 @@ impl NodeExecutor {
}
}

async fn set_current_timestamp(
node_inner: Arc<RwLock<InMemoryNodeInner>>,
timestamp: u64,
reply: oneshot::Sender<i128>,
) {
let result = node_inner
async fn set_current_timestamp(&self, timestamp: u64, reply: oneshot::Sender<i128>) {
let result = self
.node_inner
.write()
.await
.time_writer
@@ -169,19 +191,17 @@ impl NodeExecutor {
}
}

async fn set_timestamp_interval(node_inner: Arc<RwLock<InMemoryNodeInner>>, delta: u64) {
node_inner
async fn set_timestamp_interval(&self, delta: u64) {
self.node_inner
.write()
.await
.time_writer
.set_block_timestamp_interval(Some(delta));
}

async fn remove_timestamp_interval(
node_inner: Arc<RwLock<InMemoryNodeInner>>,
reply: oneshot::Sender<bool>,
) {
let result = node_inner
async fn remove_timestamp_interval(&self, reply: oneshot::Sender<bool>) {
let result = self
.node_inner
.write()
.await
.time_writer
Expand All @@ -193,88 +213,6 @@ impl NodeExecutor {
}
}

impl Future for NodeExecutor {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if this.future.is_none() {
let command_opt = futures::ready!(this.command_receiver.poll_recv(cx));
let Some(command) = command_opt else {
tracing::trace!("channel has been closed; stopping node executor");
return Poll::Ready(());
};
match command {
Command::SealBlock(tx_batch, reply) => {
let TxBatch { impersonating, txs } = tx_batch;

let base_system_contracts = this
.system_contracts
.contracts(TxExecutionMode::VerifyExecute, impersonating)
.clone();
let node_inner = this.node_inner.clone();
*this.future = Some(Box::pin(Self::seal_block(
node_inner,
txs,
base_system_contracts,
reply,
)));
}
Command::SealBlocks(tx_batches, interval, reply) => {
let node_inner = this.node_inner.clone();
let system_contracts = this.system_contracts.clone();
*this.future = Some(Box::pin(Self::seal_blocks(
node_inner,
tx_batches,
interval,
system_contracts,
reply,
)));
}
Command::IncreaseTime(delta, reply) => {
let node_inner = this.node_inner.clone();
*this.future = Some(Box::pin(Self::increase_time(node_inner, delta, reply)));
}
Command::EnforceNextTimestamp(timestamp, reply) => {
let node_inner = this.node_inner.clone();
*this.future = Some(Box::pin(Self::enforce_next_timestamp(
node_inner, timestamp, reply,
)));
}
Command::SetCurrentTimestamp(timestamp, reply) => {
let node_inner = this.node_inner.clone();
*this.future = Some(Box::pin(Self::set_current_timestamp(
node_inner, timestamp, reply,
)));
}
Command::SetTimestampInterval(seconds) => {
let node_inner = this.node_inner.clone();
*this.future =
Some(Box::pin(Self::set_timestamp_interval(node_inner, seconds)));
}
Command::RemoveTimestampInterval(reply) => {
let node_inner = this.node_inner.clone();
*this.future =
Some(Box::pin(Self::remove_timestamp_interval(node_inner, reply)));
}
}
}

if let Some(future) = this.future.as_mut().as_pin_mut() {
// Clear pending future if it completed
if let Poll::Ready(()) = future.poll(cx) {
*this.future = None;
// Wake yourself up as we might have some unprocessed commands left
cx.waker().wake_by_ref();
}
Poll::Pending
} else {
Poll::Pending
}
}
}

#[derive(Clone, Debug)]
pub struct NodeExecutorHandle {
command_sender: mpsc::Sender<Command>,
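The handle side of the actor is unchanged in spirit: `NodeExecutorHandle` wraps an `mpsc::Sender<Command>`, and each request carries a `oneshot::Sender` on which `run()` posts the result. A hypothetical caller-side helper illustrating the round trip (the real handle methods may differ in naming and error handling):

use tokio::sync::{mpsc, oneshot};

// Stand-in for the `Command` enum in this file; only the variant used
// below is shown.
enum Command {
    IncreaseTime(u64, oneshot::Sender<()>),
}

// Hypothetical helper mirroring what a handle method does: send a command
// carrying a fresh oneshot reply channel, then await the reply.
async fn increase_time(
    command_sender: &mpsc::Sender<Command>,
    delta: u64,
) -> anyhow::Result<()> {
    let (reply_tx, reply_rx) = oneshot::channel();
    command_sender
        .send(Command::IncreaseTime(delta, reply_tx))
        .await
        .map_err(|_| anyhow::anyhow!("node executor has shut down"))?;
    reply_rx
        .await
        .map_err(|_| anyhow::anyhow!("node executor dropped the reply"))
}

If the channel is bounded, `send(...).await` also applies backpressure, suspending callers until the executor drains a slot.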
(Diffs for the remaining 2 changed files are not shown.)
