Skip to content

Commit

Permalink
address feedbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-bahjati committed Nov 30, 2023
1 parent 5289b35 commit da315fe
Showing 1 changed file with 10 additions and 19 deletions.
29 changes: 10 additions & 19 deletions src/agent/solana/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,7 @@ pub fn spawn_oracle(
let (updates_tx, updates_rx) = mpsc::channel(config.updates_channel_capacity);
if config.subscriber_enabled {
let subscriber = Subscriber::new(
rpc_url.to_string(),
wss_url.to_string(),
rpc_timeout,
config.commitment,
key_store.program_key,
updates_tx,
Expand Down Expand Up @@ -658,14 +656,9 @@ mod subscriber {
/// Subscriber subscribes to all changes on the given account, and sends those changes
/// on updates_tx. This is a convenience wrapper around the Blockchain Shadow crate.
pub struct Subscriber {
/// HTTP RPC endpoint
rpc_url: String,
/// WSS RPC endpoint
wss_url: String,

/// Timeout for RPC requests
rpc_timeout: Duration,

/// Commitment level used to read account data
commitment: CommitmentLevel,

Expand All @@ -681,18 +674,14 @@ mod subscriber {

impl Subscriber {
pub fn new(
rpc_url: String,
wss_url: String,
rpc_timeout: Duration,
commitment: CommitmentLevel,
program_key: Pubkey,
updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>,
logger: Logger,
) -> Self {
Subscriber {
rpc_url,
wss_url,
rpc_timeout,
commitment,
program_key,
updates_tx,
Expand All @@ -707,7 +696,8 @@ mod subscriber {
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
if current_time.elapsed() < Duration::from_secs(30) {
tracing::error!(
warn!(
self.logger,
"Subscriber restarting too quickly. Sleeping for 1 second."
);
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -717,15 +707,13 @@ mod subscriber {
}

pub async fn start(&self) -> Result<()> {
debug!(self.logger, "subscribed to program account updates"; "program_key" => self.program_key.to_string());

let client = PubsubClient::new(self.wss_url.as_str())
.await
.expect("failed to create pubsub client");
let client = PubsubClient::new(self.wss_url.as_str()).await?;

let config = RpcProgramAccountsConfig {
account_config: RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::confirmed()),
commitment: Some(CommitmentConfig {
commitment: self.commitment,
}),
encoding: Some(UiAccountEncoding::Base64Zstd),
..Default::default()
},
Expand All @@ -737,6 +725,8 @@ mod subscriber {
.program_subscribe(&self.program_key, Some(config))
.await?;

debug!(self.logger, "subscribed to program account updates"; "program_key" => self.program_key.to_string());

loop {
match tokio_stream::StreamExt::next(&mut notif).await {
Some(update) => {
Expand All @@ -754,7 +744,8 @@ mod subscriber {
.map_err(|_| anyhow!("failed to send update to oracle"))?;
}
None => {
return Err(anyhow!("Subscriber closed connection"));
debug!(self.logger, "subscriber closed connection");
return Ok(());
}
}
}
Expand Down

0 comments on commit da315fe

Please sign in to comment.