diff --git a/src/agent.rs b/src/agent.rs index 3f2c299..c3b44c6 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -118,26 +118,18 @@ impl Agent { // Create the channels // TODO: make all components listen to shutdown signal let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown); - let (local_store_tx, local_store_rx) = - mpsc::channel(self.config.channel_capacities.local_store); let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10); let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10); // Create the Pythd Adapter. let adapter = Arc::new( - pythd::adapter::Adapter::new( - self.config.pythd_adapter.clone(), - local_store_tx.clone(), - logger.clone(), - ) - .await, + pythd::adapter::Adapter::new(self.config.pythd_adapter.clone(), logger.clone()).await, ); // Spawn the primary network jhs.extend(network::spawn_network( self.config.primary_network.clone(), network::Network::Primary, - local_store_tx.clone(), primary_keypair_loader_tx, logger.new(o!("primary" => true)), adapter.clone(), @@ -148,7 +140,6 @@ impl Agent { jhs.extend(network::spawn_network( config.clone(), network::Network::Secondary, - local_store_tx.clone(), secondary_keypair_loader_tx, logger.new(o!("primary" => false)), adapter.clone(), @@ -161,9 +152,6 @@ impl Agent { shutdown_tx.subscribe(), ))); - // Spawn the Local Store - jhs.push(store::local::spawn_store(local_store_rx, logger.clone())); - // Spawn the Pythd API Server jhs.push(tokio::spawn(rpc::run( self.config.pythd_api_server.clone(), @@ -175,7 +163,6 @@ impl Agent { // Spawn the metrics server jhs.push(tokio::spawn(metrics::MetricsServer::spawn( self.config.metrics_server.bind_address, - local_store_tx, logger.clone(), adapter, ))); diff --git a/src/agent/dashboard.rs b/src/agent/dashboard.rs index bb4b45f..d7af08c 100644 --- a/src/agent/dashboard.rs +++ b/src/agent/dashboard.rs @@ -1,14 +1,16 @@ use { super::{ - pythd::adapter::global::GlobalStore, + pythd::adapter::{ + global::GlobalStore, + local::{ + LocalStore, + PriceInfo, + }, + }, solana::{ network::Network, oracle::PriceEntry, }, - store::local::{ - Message, - PriceInfo, - }, }, crate::agent::{ metrics::MetricsServer, @@ -34,7 +36,6 @@ use { }, time::Duration, }, - tokio::sync::oneshot, typed_html::{ dom::DOMTree, html, @@ -45,22 +46,11 @@ use { impl MetricsServer { /// Create an HTML view of store data pub async fn render_dashboard(&self) -> Result> { - // Prepare response channel for requests - let (local_tx, local_rx) = oneshot::channel(); - // Request price data from local and global store - self.local_store_tx - .send(Message::LookupAllPriceInfo { - result_tx: local_tx, - }) - .await?; - + let local_data = LocalStore::get_all_price_infos(&*self.adapter).await; let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?; let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?; - // Await the results - let local_data = local_rx.await?; - let symbol_view = build_dashboard_data(local_data, global_data, global_metadata, &self.logger); diff --git a/src/agent/metrics.rs b/src/agent/metrics.rs index 43dd052..921796c 100644 --- a/src/agent/metrics.rs +++ b/src/agent/metrics.rs @@ -1,14 +1,11 @@ use { - super::{ - pythd::adapter::Adapter, - store::local::Message, + super::pythd::adapter::{ + local::PriceInfo, + Adapter, }, crate::agent::{ solana::oracle::PriceEntry, - store::{ - local::PriceInfo, - PriceIdentifier, - }, + store::PriceIdentifier, }, lazy_static::lazy_static, prometheus_client::{ @@ -34,10 +31,7 @@ use { }, time::Instant, }, - tokio::sync::{ - mpsc, - Mutex, - }, + tokio::sync::Mutex, warp::{ hyper::StatusCode, reply, @@ -73,23 +67,19 @@ lazy_static! { /// Internal metrics server state, holds state needed for serving /// dashboard and metrics. pub struct MetricsServer { - /// Used to pull the state of all symbols in local store - pub local_store_tx: mpsc::Sender, - pub start_time: Instant, - pub logger: Logger, - pub adapter: Arc, + pub start_time: Instant, + pub logger: Logger, + pub adapter: Arc, } impl MetricsServer { /// Instantiate a metrics API with a dashboard pub async fn spawn( addr: impl Into + 'static, - local_store_tx: mpsc::Sender, logger: Logger, adapter: Arc, ) { let server = MetricsServer { - local_store_tx, start_time: Instant::now(), logger, adapter, diff --git a/src/agent/pythd/adapter.rs b/src/agent/pythd/adapter.rs index 04cdd32..4a63a51 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/pythd/adapter.rs @@ -1,9 +1,6 @@ use { super::{ - super::store::{ - local, - PriceIdentifier, - }, + super::store::PriceIdentifier, api::{ NotifyPrice, NotifyPriceSched, @@ -29,6 +26,7 @@ use { pub mod api; pub mod global; +pub mod local; pub use api::{ notifier, AdapterApi, @@ -68,14 +66,14 @@ pub struct Adapter { /// The fixed interval at which Notify Price Sched notifications are sent notify_price_sched_interval_duration: Duration, - /// Channel on which to communicate with the local store - local_store_tx: mpsc::Sender, - /// The logger logger: Logger, /// Global store for managing the unified state of Pyth-on-Solana networks. global_store: global::Store, + + /// Local store for managing the unpushed state. + local_store: local::Store, } /// Represents a single Notify Price Sched subscription @@ -95,19 +93,15 @@ struct NotifyPriceSubscription { } impl Adapter { - pub async fn new( - config: Config, - local_store_tx: mpsc::Sender, - logger: Logger, - ) -> Self { + pub async fn new(config: Config, logger: Logger) -> Self { let registry = &mut *PROMETHEUS_REGISTRY.lock().await; Adapter { global_store: global::Store::new(logger.clone(), registry), + local_store: local::Store::new(logger.clone(), registry), subscription_id_seq: 1.into(), notify_price_sched_subscriptions: RwLock::new(HashMap::new()), notify_price_subscriptions: RwLock::new(HashMap::new()), notify_price_sched_interval_duration: config.notify_price_sched_interval_duration, - local_store_tx, logger, } } @@ -128,8 +122,9 @@ mod tests { }, crate::agent::{ pythd::{ - api, + adapter::local::LocalStore, api::{ + self, NotifyPrice, NotifyPriceSched, PriceAccountMetadata, @@ -140,7 +135,6 @@ mod tests { }, }, solana, - store::local, }, iobuffer::IoBuffer, pyth_sdk::Identifier, @@ -172,21 +166,19 @@ mod tests { }; struct TestAdapter { - adapter: Arc, - local_store_rx: mpsc::Receiver, - shutdown_tx: broadcast::Sender<()>, - jh: JoinHandle<()>, + adapter: Arc, + shutdown_tx: broadcast::Sender<()>, + jh: JoinHandle<()>, } async fn setup() -> TestAdapter { // Create and spawn an adapter - let (local_store_tx, local_store_rx) = mpsc::channel(1000); let notify_price_sched_interval_duration = Duration::from_nanos(10); let logger = slog_test::new_test_logger(IoBuffer::new()); let config = Config { notify_price_sched_interval_duration, }; - let adapter = Arc::new(Adapter::new(config, local_store_tx, logger).await); + let adapter = Arc::new(Adapter::new(config, logger).await); let (shutdown_tx, _) = broadcast::channel(1); // Spawn Price Notifier @@ -194,7 +186,6 @@ mod tests { TestAdapter { adapter, - local_store_rx, shutdown_tx, jh, } @@ -1379,7 +1370,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_update_price() { // Start the test adapter - let mut test_adapter = setup().await; + let test_adapter = setup().await; // Send an Update Price message let account = "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t" @@ -1394,18 +1385,14 @@ mod tests { .unwrap(); // Check that the local store indeed received the correct update - match test_adapter.local_store_rx.recv().await.unwrap() { - local::Message::Update { - price_identifier, - price_info, - } => { - assert_eq!(price_identifier, Identifier::new(account.to_bytes())); - assert_eq!(price_info.price, price); - assert_eq!(price_info.conf, conf); - assert_eq!(price_info.status, PriceStatus::Trading); - } - _ => panic!("Uexpected message received by local store from adapter"), - }; + let price_infos = LocalStore::get_all_price_infos(&*test_adapter.adapter).await; + let price_info = price_infos + .get(&Identifier::new(account.to_bytes())) + .unwrap(); + + assert_eq!(price_info.price, price); + assert_eq!(price_info.conf, conf); + assert_eq!(price_info.status, PriceStatus::Trading); let _ = test_adapter.shutdown_tx.send(()); test_adapter.jh.abort(); diff --git a/src/agent/pythd/adapter/api.rs b/src/agent/pythd/adapter/api.rs index f01cd2f..a8a061b 100644 --- a/src/agent/pythd/adapter/api.rs +++ b/src/agent/pythd/adapter/api.rs @@ -5,6 +5,10 @@ use { AllAccountsMetadata, GlobalStore, }, + local::{ + self, + LocalStore, + }, Adapter, NotifyPriceSchedSubscription, NotifyPriceSubscription, @@ -28,10 +32,7 @@ use { network::Network, oracle::PriceEntry, }, - store::{ - local, - PriceIdentifier, - }, + store::PriceIdentifier, }, anyhow::{ anyhow, @@ -357,18 +358,18 @@ impl AdapterApi for Adapter { conf: Conf, status: String, ) -> Result<()> { - self.local_store_tx - .send(local::Message::Update { - price_identifier: pyth_sdk::Identifier::new(account.to_bytes()), - price_info: local::PriceInfo { - status: Adapter::map_status(&status)?, - price, - conf, - timestamp: Utc::now().naive_utc(), - }, - }) - .await - .map_err(|_| anyhow!("failed to send update to local store")) + LocalStore::update( + self, + pyth_sdk::Identifier::new(account.to_bytes()), + local::PriceInfo { + status: Adapter::map_status(&status)?, + price, + conf, + timestamp: Utc::now().naive_utc(), + }, + ) + .await + .map_err(|_| anyhow!("failed to send update to local store")) } // TODO: implement FromStr method on PriceStatus diff --git a/src/agent/pythd/adapter/local.rs b/src/agent/pythd/adapter/local.rs new file mode 100644 index 0000000..4679060 --- /dev/null +++ b/src/agent/pythd/adapter/local.rs @@ -0,0 +1,110 @@ +// The Local Store stores a copy of all the price information this local publisher +// is contributing to the network. The Exporters will then take this data and publish +// it to the networks. +use { + super::{ + Adapter, + AdapterApi, + PriceIdentifier, + }, + crate::agent::metrics::PriceLocalMetrics, + anyhow::{ + anyhow, + Result, + }, + chrono::NaiveDateTime, + prometheus_client::registry::Registry, + pyth_sdk_solana::state::PriceStatus, + slog::Logger, + solana_sdk::bs58, + std::collections::HashMap, + tokio::sync::RwLock, +}; + +#[derive(Clone, Debug)] +pub struct PriceInfo { + pub status: PriceStatus, + pub price: i64, + pub conf: u64, + pub timestamp: NaiveDateTime, +} + +impl PriceInfo { + /// Returns false if any non-timestamp fields differ with `other`. Used for last published state comparison in exporter. + pub fn cmp_no_timestamp(&self, other: &Self) -> bool { + // Prevent forgetting to use a new field if we expand the type. + #[deny(unused_variables)] + let Self { + status, + price, + conf, + timestamp: _, + } = self; + + status == &other.status && price == &other.price && conf == &other.conf + } +} + +pub struct Store { + prices: RwLock>, + metrics: PriceLocalMetrics, + logger: Logger, +} + +impl Store { + pub fn new(logger: Logger, registry: &mut Registry) -> Self { + Store { + prices: RwLock::new(HashMap::new()), + metrics: PriceLocalMetrics::new(registry), + logger, + } + } +} + +#[async_trait::async_trait] +pub trait LocalStore { + async fn update(&self, price_identifier: PriceIdentifier, price_info: PriceInfo) -> Result<()>; + async fn get_all_price_infos(&self) -> HashMap; +} + +// Allow downcasting Adapter into GlobalStore for functions that depend on the `GlobalStore` service. +impl<'a> From<&'a Adapter> for &'a Store { + fn from(adapter: &'a Adapter) -> &'a Store { + &adapter.local_store + } +} + +#[async_trait::async_trait] +impl LocalStore for T +where + for<'a> &'a T: Into<&'a Store>, + T: AdapterApi, + T: Sync, +{ + async fn update(&self, price_identifier: PriceIdentifier, price_info: PriceInfo) -> Result<()> { + debug!(self.into().logger, "local store received price update"; "identifier" => bs58::encode(price_identifier.to_bytes()).into_string()); + + // Drop the update if it is older than the current one stored for the price + if let Some(current_price_info) = self.into().prices.read().await.get(&price_identifier) { + if current_price_info.timestamp > price_info.timestamp { + return Err(anyhow!( + "Received stale timestamp for price {}", + price_identifier + )); + } + } + + self.into().metrics.update(&price_identifier, &price_info); + self.into() + .prices + .write() + .await + .insert(price_identifier, price_info); + + Ok(()) + } + + async fn get_all_price_infos(&self) -> HashMap { + self.into().prices.read().await.clone() + } +} diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 1f810c6..5a822af 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -81,7 +81,6 @@ pub mod network { pub fn spawn_network( config: Config, network: Network, - local_store_tx: Sender, keypair_request_tx: Sender, logger: Logger, adapter: Arc, @@ -110,7 +109,6 @@ pub mod network { config.rpc_timeout, publisher_permissions_rx, KeyStore::new(config.key_store.clone(), &logger)?, - local_store_tx, keypair_request_tx, logger, adapter, diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index ec0d46d..28e9475 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -3,7 +3,6 @@ use { super::{ super::store::{ self, - local::PriceInfo, PriceIdentifier, }, key_store, @@ -13,6 +12,10 @@ use { crate::agent::{ pythd::adapter::{ global::GlobalStore, + local::{ + LocalStore, + PriceInfo, + }, Adapter, }, remote_keypair_loader::{ @@ -181,7 +184,6 @@ pub fn spawn_exporter( HashMap>, >, key_store: KeyStore, - local_store_tx: Sender, keypair_request_tx: mpsc::Sender, logger: Logger, adapter: Arc, @@ -216,7 +218,6 @@ pub fn spawn_exporter( rpc_url, rpc_timeout, key_store, - local_store_tx, network_state_rx, transactions_tx, publisher_permissions_rx, @@ -249,9 +250,6 @@ pub struct Exporter { /// The Key Store key_store: KeyStore, - /// Channel on which to communicate with the local store - local_store_tx: Sender, - /// The last state published for each price identifier. Used to /// rule out stale data and prevent repetitive publishing of /// unchanged prices. @@ -290,7 +288,6 @@ impl Exporter { rpc_url: &str, rpc_timeout: Duration, key_store: KeyStore, - local_store_tx: Sender, network_state_rx: watch::Receiver, inflight_transactions_tx: Sender, publisher_permissions_rx: watch::Receiver< @@ -310,7 +307,6 @@ impl Exporter { network, publish_interval, key_store, - local_store_tx, last_published_state: HashMap::new(), network_state_rx, inflight_transactions_tx, @@ -615,14 +611,7 @@ impl Exporter { } async fn fetch_local_store_contents(&self) -> Result> { - let (result_tx, result_rx) = oneshot::channel(); - self.local_store_tx - .send(store::local::Message::LookupAllPriceInfo { result_tx }) - .await - .map_err(|_| anyhow!("failed to send lookup price info message to local store"))?; - result_rx - .await - .map_err(|_| anyhow!("failed to fetch from local store")) + Ok(LocalStore::get_all_price_infos(&*self.adapter).await) } async fn publish_batch(&self, batch: &[(Identifier, PriceInfo)]) -> Result<()> { diff --git a/src/agent/store.rs b/src/agent/store.rs index bbbcfec..6f0bb24 100644 --- a/src/agent/store.rs +++ b/src/agent/store.rs @@ -1,3 +1 @@ -pub mod local; - pub type PriceIdentifier = pyth_sdk::Identifier; diff --git a/src/agent/store/local.rs b/src/agent/store/local.rs deleted file mode 100644 index c00a8af..0000000 --- a/src/agent/store/local.rs +++ /dev/null @@ -1,135 +0,0 @@ -// The Local Store stores a copy of all the price information this local publisher -// is contributing to the network. The Exporters will then take this data and publish -// it to the networks. -use { - super::PriceIdentifier, - crate::agent::metrics::{ - PriceLocalMetrics, - PROMETHEUS_REGISTRY, - }, - anyhow::{ - anyhow, - Result, - }, - chrono::NaiveDateTime, - pyth_sdk_solana::state::PriceStatus, - slog::Logger, - solana_sdk::bs58, - std::collections::HashMap, - tokio::{ - sync::{ - mpsc, - oneshot, - }, - task::JoinHandle, - }, -}; - -#[derive(Clone, Debug)] -pub struct PriceInfo { - pub status: PriceStatus, - pub price: i64, - pub conf: u64, - pub timestamp: NaiveDateTime, -} - -impl PriceInfo { - /// Returns false if any non-timestamp fields differ with `other`. Used for last published state comparison in exporter. - pub fn cmp_no_timestamp(&self, other: &Self) -> bool { - // Prevent forgetting to use a new field if we expand the type. - #[deny(unused_variables)] - let Self { - status, - price, - conf, - timestamp: _, - } = self; - - status == &other.status && price == &other.price && conf == &other.conf - } -} - -#[derive(Debug)] -pub enum Message { - Update { - price_identifier: PriceIdentifier, - price_info: PriceInfo, - }, - LookupAllPriceInfo { - result_tx: oneshot::Sender>, - }, -} - -pub fn spawn_store(rx: mpsc::Receiver, logger: Logger) -> JoinHandle<()> { - tokio::spawn(async move { Store::new(rx, logger).await.run().await }) -} - -pub struct Store { - prices: HashMap, - metrics: PriceLocalMetrics, - rx: mpsc::Receiver, - logger: Logger, -} - -impl Store { - pub async fn new(rx: mpsc::Receiver, logger: Logger) -> Self { - Store { - prices: HashMap::new(), - metrics: PriceLocalMetrics::new(&mut &mut PROMETHEUS_REGISTRY.lock().await), - rx, - logger, - } - } - - pub async fn run(&mut self) { - while let Some(message) = self.rx.recv().await { - if let Err(err) = self.handle(message) { - error!(self.logger, "{}", err); - debug!(self.logger, "error context"; "context" => format!("{:?}", err)); - } - } - } - - fn handle(&mut self, message: Message) -> Result<()> { - match message { - Message::Update { - price_identifier, - price_info, - } => { - self.update(price_identifier, price_info)?; - Ok(()) - } - Message::LookupAllPriceInfo { result_tx } => result_tx - .send(self.get_all_price_infos()) - .map_err(|_| anyhow!("failed to send LookupAllPriceInfo result")), - } - } - - pub fn update( - &mut self, - price_identifier: PriceIdentifier, - price_info: PriceInfo, - ) -> Result<()> { - debug!(self.logger, "local store received price update"; "identifier" => bs58::encode(price_identifier.to_bytes()).into_string()); - - // Drop the update if it is older than the current one stored for the price - if let Some(current_price_info) = self.prices.get(&price_identifier) { - if current_price_info.timestamp > price_info.timestamp { - return Err(anyhow!( - "Received stale timestamp for price {}", - price_identifier - )); - } - } - - self.metrics.update(&price_identifier, &price_info); - - self.prices.insert(price_identifier, price_info); - - Ok(()) - } - - pub fn get_all_price_infos(&self) -> HashMap { - self.prices.clone() - } -}