Skip to content

Commit

Permalink
refactor(agent): convert local store to an Api (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
Reisen authored May 27, 2024
1 parent 2d074fb commit c5a717e
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 256 deletions.
15 changes: 1 addition & 14 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,18 @@ impl Agent {
// Create the channels
// TODO: make all components listen to shutdown signal
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
let (local_store_tx, local_store_rx) =
mpsc::channel(self.config.channel_capacities.local_store);
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter = Arc::new(
pythd::adapter::Adapter::new(
self.config.pythd_adapter.clone(),
local_store_tx.clone(),
logger.clone(),
)
.await,
pythd::adapter::Adapter::new(self.config.pythd_adapter.clone(), logger.clone()).await,
);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
local_store_tx.clone(),
primary_keypair_loader_tx,
logger.new(o!("primary" => true)),
adapter.clone(),
Expand All @@ -148,7 +140,6 @@ impl Agent {
jhs.extend(network::spawn_network(
config.clone(),
network::Network::Secondary,
local_store_tx.clone(),
secondary_keypair_loader_tx,
logger.new(o!("primary" => false)),
adapter.clone(),
Expand All @@ -161,9 +152,6 @@ impl Agent {
shutdown_tx.subscribe(),
)));

// Spawn the Local Store
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
Expand All @@ -175,7 +163,6 @@ impl Agent {
// Spawn the metrics server
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
self.config.metrics_server.bind_address,
local_store_tx,
logger.clone(),
adapter,
)));
Expand Down
26 changes: 8 additions & 18 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use {
super::{
pythd::adapter::global::GlobalStore,
pythd::adapter::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
},
solana::{
network::Network,
oracle::PriceEntry,
},
store::local::{
Message,
PriceInfo,
},
},
crate::agent::{
metrics::MetricsServer,
Expand All @@ -34,7 +36,6 @@ use {
},
time::Duration,
},
tokio::sync::oneshot,
typed_html::{
dom::DOMTree,
html,
Expand All @@ -45,22 +46,11 @@ use {
impl MetricsServer {
/// Create an HTML view of store data
pub async fn render_dashboard(&self) -> Result<String, Box<dyn std::error::Error>> {
// Prepare response channel for requests
let (local_tx, local_rx) = oneshot::channel();

// Request price data from local and global store
self.local_store_tx
.send(Message::LookupAllPriceInfo {
result_tx: local_tx,
})
.await?;

let local_data = LocalStore::get_all_price_infos(&*self.adapter).await;
let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?;
let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?;

// Await the results
let local_data = local_rx.await?;

let symbol_view =
build_dashboard_data(local_data, global_data, global_metadata, &self.logger);

Expand Down
26 changes: 8 additions & 18 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use {
super::{
pythd::adapter::Adapter,
store::local::Message,
super::pythd::adapter::{
local::PriceInfo,
Adapter,
},
crate::agent::{
solana::oracle::PriceEntry,
store::{
local::PriceInfo,
PriceIdentifier,
},
store::PriceIdentifier,
},
lazy_static::lazy_static,
prometheus_client::{
Expand All @@ -34,10 +31,7 @@ use {
},
time::Instant,
},
tokio::sync::{
mpsc,
Mutex,
},
tokio::sync::Mutex,
warp::{
hyper::StatusCode,
reply,
Expand Down Expand Up @@ -73,23 +67,19 @@ lazy_static! {
/// Internal metrics server state, holds state needed for serving
/// dashboard and metrics.
pub struct MetricsServer {
/// Used to pull the state of all symbols in local store
pub local_store_tx: mpsc::Sender<Message>,
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
}

impl MetricsServer {
/// Instantiate a metrics API with a dashboard
pub async fn spawn(
addr: impl Into<SocketAddr> + 'static,
local_store_tx: mpsc::Sender<Message>,
logger: Logger,
adapter: Arc<Adapter>,
) {
let server = MetricsServer {
local_store_tx,
start_time: Instant::now(),
logger,
adapter,
Expand Down
57 changes: 22 additions & 35 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use {
super::{
super::store::{
local,
PriceIdentifier,
},
super::store::PriceIdentifier,
api::{
NotifyPrice,
NotifyPriceSched,
Expand All @@ -29,6 +26,7 @@ use {

pub mod api;
pub mod global;
pub mod local;
pub use api::{
notifier,
AdapterApi,
Expand Down Expand Up @@ -68,14 +66,14 @@ pub struct Adapter {
/// The fixed interval at which Notify Price Sched notifications are sent
notify_price_sched_interval_duration: Duration,

/// Channel on which to communicate with the local store
local_store_tx: mpsc::Sender<local::Message>,

/// The logger
logger: Logger,

/// Global store for managing the unified state of Pyth-on-Solana networks.
global_store: global::Store,

/// Local store for managing the unpushed state.
local_store: local::Store,
}

/// Represents a single Notify Price Sched subscription
Expand All @@ -95,19 +93,15 @@ struct NotifyPriceSubscription {
}

impl Adapter {
pub async fn new(
config: Config,
local_store_tx: mpsc::Sender<local::Message>,
logger: Logger,
) -> Self {
pub async fn new(config: Config, logger: Logger) -> Self {
let registry = &mut *PROMETHEUS_REGISTRY.lock().await;
Adapter {
global_store: global::Store::new(logger.clone(), registry),
local_store: local::Store::new(logger.clone(), registry),
subscription_id_seq: 1.into(),
notify_price_sched_subscriptions: RwLock::new(HashMap::new()),
notify_price_subscriptions: RwLock::new(HashMap::new()),
notify_price_sched_interval_duration: config.notify_price_sched_interval_duration,
local_store_tx,
logger,
}
}
Expand All @@ -128,8 +122,9 @@ mod tests {
},
crate::agent::{
pythd::{
api,
adapter::local::LocalStore,
api::{
self,
NotifyPrice,
NotifyPriceSched,
PriceAccountMetadata,
Expand All @@ -140,7 +135,6 @@ mod tests {
},
},
solana,
store::local,
},
iobuffer::IoBuffer,
pyth_sdk::Identifier,
Expand Down Expand Up @@ -172,29 +166,26 @@ mod tests {
};

struct TestAdapter {
adapter: Arc<Adapter>,
local_store_rx: mpsc::Receiver<local::Message>,
shutdown_tx: broadcast::Sender<()>,
jh: JoinHandle<()>,
adapter: Arc<Adapter>,
shutdown_tx: broadcast::Sender<()>,
jh: JoinHandle<()>,
}

async fn setup() -> TestAdapter {
// Create and spawn an adapter
let (local_store_tx, local_store_rx) = mpsc::channel(1000);
let notify_price_sched_interval_duration = Duration::from_nanos(10);
let logger = slog_test::new_test_logger(IoBuffer::new());
let config = Config {
notify_price_sched_interval_duration,
};
let adapter = Arc::new(Adapter::new(config, local_store_tx, logger).await);
let adapter = Arc::new(Adapter::new(config, logger).await);
let (shutdown_tx, _) = broadcast::channel(1);

// Spawn Price Notifier
let jh = tokio::spawn(notifier(adapter.clone(), shutdown_tx.subscribe()));

TestAdapter {
adapter,
local_store_rx,
shutdown_tx,
jh,
}
Expand Down Expand Up @@ -1379,7 +1370,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_update_price() {
// Start the test adapter
let mut test_adapter = setup().await;
let test_adapter = setup().await;

// Send an Update Price message
let account = "CkMrDWtmFJZcmAUC11qNaWymbXQKvnRx4cq1QudLav7t"
Expand All @@ -1394,18 +1385,14 @@ mod tests {
.unwrap();

// Check that the local store indeed received the correct update
match test_adapter.local_store_rx.recv().await.unwrap() {
local::Message::Update {
price_identifier,
price_info,
} => {
assert_eq!(price_identifier, Identifier::new(account.to_bytes()));
assert_eq!(price_info.price, price);
assert_eq!(price_info.conf, conf);
assert_eq!(price_info.status, PriceStatus::Trading);
}
_ => panic!("Uexpected message received by local store from adapter"),
};
let price_infos = LocalStore::get_all_price_infos(&*test_adapter.adapter).await;
let price_info = price_infos
.get(&Identifier::new(account.to_bytes()))
.unwrap();

assert_eq!(price_info.price, price);
assert_eq!(price_info.conf, conf);
assert_eq!(price_info.status, PriceStatus::Trading);

let _ = test_adapter.shutdown_tx.send(());
test_adapter.jh.abort();
Expand Down
33 changes: 17 additions & 16 deletions src/agent/pythd/adapter/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use {
AllAccountsMetadata,
GlobalStore,
},
local::{
self,
LocalStore,
},
Adapter,
NotifyPriceSchedSubscription,
NotifyPriceSubscription,
Expand All @@ -28,10 +32,7 @@ use {
network::Network,
oracle::PriceEntry,
},
store::{
local,
PriceIdentifier,
},
store::PriceIdentifier,
},
anyhow::{
anyhow,
Expand Down Expand Up @@ -357,18 +358,18 @@ impl AdapterApi for Adapter {
conf: Conf,
status: String,
) -> Result<()> {
self.local_store_tx
.send(local::Message::Update {
price_identifier: pyth_sdk::Identifier::new(account.to_bytes()),
price_info: local::PriceInfo {
status: Adapter::map_status(&status)?,
price,
conf,
timestamp: Utc::now().naive_utc(),
},
})
.await
.map_err(|_| anyhow!("failed to send update to local store"))
LocalStore::update(
self,
pyth_sdk::Identifier::new(account.to_bytes()),
local::PriceInfo {
status: Adapter::map_status(&status)?,
price,
conf,
timestamp: Utc::now().naive_utc(),
},
)
.await
.map_err(|_| anyhow!("failed to send update to local store"))
}

// TODO: implement FromStr method on PriceStatus
Expand Down
Loading

0 comments on commit c5a717e

Please sign in to comment.