Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(agent): rename adapter->state #124

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,14 @@ pub mod metrics;
pub mod pythd;
pub mod remote_keypair_loader;
pub mod solana;
pub mod state;
pub mod store;
use {
self::{
config::Config,
pythd::{
adapter::notifier,
api::rpc,
},
pythd::api::rpc,
solana::network,
state::notifier,
},
anyhow::Result,
futures_util::future::join_all,
Expand Down Expand Up @@ -122,9 +121,8 @@ impl Agent {
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter = Arc::new(
pythd::adapter::Adapter::new(self.config.pythd_adapter.clone(), logger.clone()).await,
);
let adapter =
Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await);

// Spawn the primary network
jhs.extend(network::spawn_network(
Expand Down Expand Up @@ -197,6 +195,7 @@ pub mod config {
pythd,
remote_keypair_loader,
solana::network,
state,
},
anyhow::Result,
config as config_rs,
Expand All @@ -216,7 +215,7 @@ pub mod config {
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
#[serde(default)]
pub pythd_adapter: pythd::adapter::Config,
pub pythd_adapter: state::Config,
#[serde(default)]
pub pythd_api_server: pythd::api::rpc::Config,
#[serde(default)]
Expand Down
12 changes: 6 additions & 6 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use {
super::{
pythd::adapter::{
solana::{
network::Network,
oracle::PriceEntry,
},
state::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
},
solana::{
network::Network,
oracle::PriceEntry,
},
},
crate::agent::{
metrics::MetricsServer,
pythd::adapter::global::{
state::global::{
AllAccountsData,
AllAccountsMetadata,
PriceAccountMetadata,
Expand Down
12 changes: 4 additions & 8 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::pythd::adapter::{
super::state::{
local::PriceInfo,
Adapter,
State,
},
crate::agent::{
solana::oracle::PriceEntry,
Expand Down Expand Up @@ -69,16 +69,12 @@ lazy_static! {
pub struct MetricsServer {
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
pub adapter: Arc<State>,
}

impl MetricsServer {
/// Instantiate a metrics API with a dashboard
pub async fn spawn(
addr: impl Into<SocketAddr> + 'static,
logger: Logger,
adapter: Arc<Adapter>,
) {
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, adapter: Arc<State>) {
let server = MetricsServer {
start_time: Instant::now(),
logger,
Expand Down
1 change: 0 additions & 1 deletion src/agent/pythd.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod adapter;
pub mod api;
14 changes: 7 additions & 7 deletions src/agent/pythd/api/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

use {
super::{
super::adapter,
Conf,
NotifyPrice,
NotifyPriceSched,
Price,
Pubkey,
SubscriptionID,
},
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn handle_connection<S>(
notify_price_sched_tx_buffer: usize,
logger: Logger,
) where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn handle_next<S>(
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
tokio::select! {
msg = ws_rx.next() => {
Expand Down Expand Up @@ -210,7 +210,7 @@ async fn handle<S>(
msg: Message,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
// Ignore control and binary messages
if !msg.is_text() {
Expand Down Expand Up @@ -296,7 +296,7 @@ async fn dispatch_and_catch_error<S>(
request: &Request<Method, Value>,
) -> Response<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
debug!(
logger,
Expand Down Expand Up @@ -436,7 +436,7 @@ pub async fn run<S>(
adapter: Arc<S>,
shutdown_rx: broadcast::Receiver<()>,
) where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
Expand All @@ -454,7 +454,7 @@ async fn serve<S>(
mut shutdown_rx: broadcast::Receiver<()>,
) -> Result<()>
where
S: adapter::AdapterApi,
S: state::StateApi,
S: Send,
S: Sync,
S: 'static,
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_all_products.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::Result,
};

pub async fn get_all_products<S>(adapter: &S) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let products = adapter.get_all_products().await?;
Ok(serde_json::to_value(products)?)
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
GetProductParams,
Method,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -19,7 +19,7 @@ pub async fn get_product<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: GetProductParams = {
let value = request.params.clone();
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/get_product_list.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::Result,
};

pub async fn get_product_list<S>(adapter: &S) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let product_list = adapter.get_product_list().await?;
Ok(serde_json::to_value(product_list)?)
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/subscribe_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
SubscribePriceParams,
SubscribeResult,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -23,7 +23,7 @@ pub async fn subscribe_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: SubscribePriceParams = serde_json::from_value(
request
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/subscribe_price_sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
SubscribePriceSchedParams,
SubscribeResult,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -23,7 +23,7 @@ pub async fn subscribe_price_sched<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: SubscribePriceSchedParams = serde_json::from_value(
request
Expand Down
4 changes: 2 additions & 2 deletions src/agent/pythd/api/rpc/update_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
Method,
UpdatePriceParams,
},
crate::agent::pythd::adapter,
crate::agent::state,
anyhow::{
anyhow,
Result,
Expand All @@ -19,7 +19,7 @@ pub async fn update_price<S>(
request: &Request<Method, Value>,
) -> Result<serde_json::Value>
where
S: adapter::AdapterApi,
S: state::StateApi,
{
let params: UpdatePriceParams = serde_json::from_value(
request
Expand Down
5 changes: 2 additions & 3 deletions src/agent/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ pub mod network {
oracle,
},
crate::agent::{
pythd::adapter::Adapter,
remote_keypair_loader::KeypairRequest,
store,
state::State,
},
anyhow::Result,
serde::{
Expand Down Expand Up @@ -83,7 +82,7 @@ pub mod network {
network: Network,
keypair_request_tx: Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Result<Vec<JoinHandle<()>>> {
// Publisher permissions updates between oracle and exporter
let (publisher_permissions_tx, publisher_permissions_rx) = watch::channel(<_>::default());
Expand Down
24 changes: 10 additions & 14 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use {
self::transaction_monitor::TransactionMonitor,
super::{
super::store::{
self,
PriceIdentifier,
},
super::store::PriceIdentifier,
key_store,
network::Network,
oracle::PricePublishingMetadata,
},
crate::agent::{
pythd::adapter::{
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
state::{
global::GlobalStore,
local::{
LocalStore,
PriceInfo,
},
Adapter,
},
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
State,
},
},
anyhow::{
Expand Down Expand Up @@ -77,7 +74,6 @@ use {
self,
Sender,
},
oneshot,
watch,
},
task::JoinHandle,
Expand Down Expand Up @@ -186,7 +182,7 @@ pub fn spawn_exporter(
key_store: KeyStore,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Result<Vec<JoinHandle<()>>> {
// Create and spawn the network state querier
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
Expand Down Expand Up @@ -278,7 +274,7 @@ pub struct Exporter {

logger: Logger,

adapter: Arc<Adapter>,
adapter: Arc<State>,
}

impl Exporter {
Expand All @@ -295,7 +291,7 @@ impl Exporter {
>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
adapter: Arc<Adapter>,
adapter: Arc<State>,
) -> Self {
let publish_interval = time::interval(config.publish_interval_duration);
Exporter {
Expand Down
Loading
Loading