Skip to content

Commit

Permalink
refactor(agent): rename adapter->state (#124)
Browse files Browse the repository at this point in the history
* refactor(agent): move adapter out of pythd

* refactor(agent): fix adapter imports

* refactor(agent): rename adapter->state
  • Loading branch information
Reisen authored May 29, 2024
1 parent c5a717e commit e2071ac
Show file tree
Hide file tree
Showing 18 changed files with 91 additions and 104 deletions.
15 changes: 7 additions & 8 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,14 @@ pub mod metrics;
pub mod pythd;
pub mod remote_keypair_loader;
pub mod solana;
pub mod state;
pub mod store;
use {
self::{
config::Config,
pythd::{
adapter::notifier,
api::rpc,
},
pythd::api::rpc,
solana::network,
state::notifier,
},
anyhow::Result,
futures_util::future::join_all,
Expand Down Expand Up @@ -122,9 +121,8 @@ impl Agent {
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter = Arc::new(
pythd::adapter::Adapter::new(self.config.pythd_adapter.clone(), logger.clone()).await,
);
let adapter =
Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await);

// Spawn the primary network
jhs.extend(network::spawn_network(
Expand Down Expand Up @@ -197,6 +195,7 @@ pub mod config {
pythd,
remote_keypair_loader,
solana::network,
state,
},
anyhow::Result,
config as config_rs,
Expand All @@ -216,7 +215,7 @@ pub mod config {
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
#[serde(default)]
pub pythd_adapter: pythd::adapter::Config,
pub pythd_adapter: state::Config,
#[serde(default)]
pub pythd_api_server: pythd::api::rpc::Config,
#[serde(default)]
Expand Down
12 changes: 6 additions & 6 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use {
super::{
pythd::adapter::{
solana::{
network::Network,
oracle::PriceEntry,
},
state::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
},
solana::{
network::Network,
oracle::PriceEntry,
},
},
crate::agent::{
metrics::MetricsServer,
pythd::adapter::global::{
state::global::{
AllAccountsData,
AllAccountsMetadata,
PriceAccountMetadata,
Expand Down
12 changes: 4 additions & 8 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::pythd::adapter::{
super::state::{
local::PriceInfo,
Adapter,
State,
},
crate::agent::{
solana::oracle::PriceEntry,
Expand Down Expand Up @@ -69,16 +69,12 @@ lazy_static! {
pub struct MetricsServer {
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
pub adapter: Arc<State>,
}

impl MetricsServer {
/// Instantiate a metrics API with a dashboard
pub async fn spawn(
addr: impl Into<SocketAddr> + 'static,
logger: Logger,
adapter: Arc<Adapter>,
) {
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, adapter: Arc<State>) {
let server = MetricsServer {
start_time: Instant::now(),
logger,
Expand Down
1 change: 0 additions & 1 deletion src/agent/pythd.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod adapter;
pub mod api;
14 changes: 7 additions & 7 deletions src/agent/pythd/api/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

use {
super::{
super::adapter,
Conf,
NotifyPrice,
NotifyPriceSched,
Price,
Pubkey,
SubscriptionID,
},
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn handle_connection<S>(
notify_price_sched_tx_buffer: usize,
logger: Logger,
) where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn handle_next<S>(
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
tokio::select! {
msg = ws_rx.next() => {
Expand Down Expand Up @@ -210,7 +210,7 @@ async fn handle<S>(
msg: Message,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
// Ignore control and binary messages
if !msg.is_text() {
Expand Down Expand Up @@ -296,7 +296,7 @@ async fn dispatch_and_catch_error<S>(
request: &Request<Method, Value>,
) -> Response<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
debug!(
logger,
Expand Down Expand Up @@ -436,7 +436,7 @@ pub async fn run<S>(
adapter: Arc<S>,
shutdown_rx: broadcast::Receiver<()>,
) where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
Expand All @@ -454,7 +454,7 @@ async fn serve<S>(
mut shutdown_rx: broadcast::Receiver<()>,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_all_products.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::Result,
};

pub async fn get_all_products<S>(adapter: &S) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let products = adapter.get_all_products().await?;
Ok(serde_json::to_value(products)?)
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
GetProductParams,
Method,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -19,7 +19,7 @@ pub async fn get_product<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: GetProductParams = {
let value = request.params.clone();
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_product_list.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::Result,
};

pub async fn get_product_list<S>(adapter: &S) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let product_list = adapter.get_product_list().await?;
Ok(serde_json::to_value(product_list)?)
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/subscribe_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
SubscribePriceParams,
SubscribeResult,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -23,7 +23,7 @@ pub async fn subscribe_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: SubscribePriceParams = serde_json::from_value(
request
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/subscribe_price_sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
SubscribePriceSchedParams,
SubscribeResult,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -23,7 +23,7 @@ pub async fn subscribe_price_sched<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: SubscribePriceSchedParams = serde_json::from_value(
request
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/update_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
Method,
UpdatePriceParams,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -19,7 +19,7 @@ pub async fn update_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: UpdatePriceParams = serde_json::from_value(
request
Expand Down
5 changes: 2 additions & 3 deletions src/agent/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ pub mod network {
oracle,
},
crate::agent::{
pythd::adapter::Adapter,
remote_keypair_loader::KeypairRequest,
store,
state::State,
},
anyhow::Result,
serde::{
Expand Down Expand Up @@ -83,7 +82,7 @@ pub mod network {
network: Network,
keypair_request_tx: Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Result<Vec<JoinHandle<()>>> {
// Publisher permissions updates between oracle and exporter
let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default());
Expand Down
24 changes: 10 additions & 14 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use {
self::transaction_monitor::TransactionMonitor,
super::{
super::store::{
self,
PriceIdentifier,
},
super::store::PriceIdentifier,
key_store,
network::Network,
oracle::PricePublishingMetadata,
},
crate::agent::{
pythd::adapter::{
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
state::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
Adapter,
},
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
State,
},
},
anyhow::{
Expand Down Expand Up @@ -77,7 +74,6 @@ use {
self,
Sender,
},
oneshot,
watch,
},
task::JoinHandle,
Expand Down Expand Up @@ -186,7 +182,7 @@ pub fn spawn_exporter(
key_store: KeyStore,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Result<Vec<JoinHandle<()>>> {
// Create and spawn the network state querier
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
Expand Down Expand Up @@ -278,7 +274,7 @@ pub struct Exporter {

logger: Logger,

adapter: Arc<Adapter>,
adapter: Arc<State>,
}

impl Exporter {
Expand All @@ -295,7 +291,7 @@ impl Exporter {
>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Self {
let publish_interval = time::interval(config.publish_interval_duration);
Exporter {
Expand Down
Loading

0 comments on commit e2071ac

Please sign in to comment.