From 9e09a57ca7b52d482c50c579a02dab896979ea25 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 14 Dec 2023 20:38:05 +0100 Subject: [PATCH] fix: fire and forget update tx The old behaviour causes the exporter not to publishing prices regularly. This change spawns a separate thread to unblock the exporter loop. Also, because of the batch staggering behaviour to avoid calling rpc for all batches at the same time, the old approach would always results in waiting longer than the publishing period. --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/agent/solana/exporter.rs | 52 +++++++++++++++++++++++++----------- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e21c1f..3246fda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2731,7 +2731,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "2.4.3" +version = "2.4.4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index c3605a7..0b30856 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.4.3" +version = "2.4.4" edition = "2021" [[bin]] diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index dfc507e..dee0bb3 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -61,6 +61,7 @@ use { BTreeMap, HashMap, }, + sync::Arc, time::Duration, }, tokio::{ @@ -228,7 +229,7 @@ pub fn spawn_exporter( /// Exporter is responsible for exporting data held in the local store /// to the global Pyth Network. pub struct Exporter { - rpc_client: RpcClient, + rpc_client: Arc, config: Config, @@ -292,7 +293,10 @@ impl Exporter { ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { - rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout), + rpc_client: Arc::new(RpcClient::new_with_timeout( + rpc_url.to_string(), + rpc_timeout, + )), config, network, publish_interval, @@ -536,7 +540,7 @@ impl Exporter { let mut batch_send_interval = time::interval( self.config .publish_interval_duration - .div_f64(num_batches as f64), + .div_f64((num_batches + 1) as f64), // +1 to give enough time for the last batch ); let mut batch_state = HashMap::new(); let mut batch_futures = vec![]; @@ -796,19 +800,37 @@ impl Exporter { network_state.blockhash, ); - let signature = self - .rpc_client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - skip_preflight: true, - ..RpcSendTransactionConfig::default() - }, - ) - .await?; - debug!(self.logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts)); + let tx = self.inflight_transactions_tx.clone(); + let logger = self.logger.clone(); + let rpc_client = self.rpc_client.clone(); + + // Fire this off in a separate task so we don't block the main thread of the exporter + tokio::spawn(async move { + let signature = match rpc_client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(signature) => signature, + Err(err) => { + error!(logger, "{}", err); + debug!(logger, "error context"; "context" => format!("{:?}", err)); + return; + } + }; + + debug!(logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts)); - self.inflight_transactions_tx.send(signature).await?; + if let Err(err) = tx.send(signature).await { + error!(logger, "{}", err); + debug!(logger, "error context"; "context" => format!("{:?}", err)); + } + }); Ok(()) }