From bde79abff3396c84ef56bb465bc552a68bb73eef Mon Sep 17 00:00:00 2001 From: ts0yu <120932697+ts0yu@users.noreply.github.com> Date: Tue, 23 Jul 2024 16:23:21 +0000 Subject: [PATCH] feat: generic ito process, general clean up --- Cargo.lock | 1 + Cargo.toml | 3 +- src/arbitrageur.rs | 16 +++------ src/deployer.rs | 22 ++++++------ src/lib.rs | 25 ++++++++----- src/liquidity_admin.rs | 16 ++++----- src/pool_admin.rs | 18 ++++------ src/price_changer.rs | 79 +++++++++++++++--------------------------- src/types/process.rs | 12 +++---- 9 files changed, 77 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76179e0..05a6311 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -660,6 +660,7 @@ version = "0.1.0" dependencies = [ "alloy", "alloy-sol-types", + "alloy-transport-http", "anyhow", "async-trait", "crossbeam-channel", diff --git a/Cargo.toml b/Cargo.toml index 8b86258..e9e3d71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ version = "0.1.0" edition = "2021" [dependencies] -alloy = { version = "0.1.3", features = ["full", "node-bindings", "sol-types", "contract"] } +alloy = { version = "0.1.3", features = ["full", "node-bindings", "sol-types", "contract", "transports", "transport-http"] } tokio = { version = "1.36.0", features = ["full"] } revm = "10.0.0" octane = { git = "https://github.com/arena-rs/octane" } @@ -18,6 +18,7 @@ async-trait = "0.1.81" anyhow = "1.0.86" tracing = "0.1.40" env_logger = "0.11.3" +alloy-transport-http = "0.1" serde_json = "1.0.120" futures = "0.3.30" rand = "0.8.5" diff --git a/src/arbitrageur.rs b/src/arbitrageur.rs index 7f9456a..b84a626 100644 --- a/src/arbitrageur.rs +++ b/src/arbitrageur.rs @@ -26,18 +26,12 @@ impl Behavior for Arbitrageur { let mut stream = messager.clone().stream().unwrap(); while let Some(event) = stream.next().await { - let query: DeploymentResponse = match serde_json::from_str(&event.data) { - Ok(query) => query, - Err(_) => { - eprintln!("Failed to deserialize the event data into a DeploymentResponse"); - continue; + if let Ok(query) = serde_json::from_str::(&event.data) { + match query { + DeploymentResponse::PoolManager(address) => self.deployment = Some(address), + DeploymentResponse::Pool(params) => self.pool = Some(params), + _ => {} } - }; - - match query { - DeploymentResponse::PoolManager(address) => self.deployment = Some(address), - DeploymentResponse::Pool(params) => self.pool = Some(params), - _ => {} } if self.pool.is_some() && self.deployment.is_some() { diff --git a/src/deployer.rs b/src/deployer.rs index 1aaa50e..bd84559 100644 --- a/src/deployer.rs +++ b/src/deployer.rs @@ -1,12 +1,8 @@ use super::*; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct Deployer { - #[serde(skip)] - pub messager: Option, - - #[serde(skip)] - pub client: Option>, + pub base: Base, } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -55,8 +51,8 @@ impl Behavior for Deployer { ) .await?; - self.client = Some(client.clone()); - self.messager = Some(messager.clone()); + self.base.client = Some(client.clone()); + self.base.messager = Some(messager.clone()); Ok(Some(messager.stream().unwrap())) } @@ -80,13 +76,14 @@ impl Behavior for Deployer { decimals, } => { let token = - ArenaToken::deploy(self.client.clone().unwrap(), name, symbol, decimals) + ArenaToken::deploy(self.base.client.clone().unwrap(), name, symbol, decimals) .await .unwrap(); println!("Token deployed at address: {:?}", token.address()); - self.messager + self.base + .messager .clone() .unwrap() .send(To::All, DeploymentResponse::Token(*token.address())) @@ -100,7 +97,7 @@ impl Behavior for Deployer { initial_price, } => { let lex = LiquidExchange::deploy( - self.client.clone().unwrap(), + self.base.client.clone().unwrap(), token_0, token_1, U256::from((initial_price * 10f64.powf(18.0)) as u64), @@ -108,7 +105,8 @@ impl Behavior for Deployer { .await .unwrap(); - self.messager + self.base + .messager .clone() .unwrap() .send(To::All, DeploymentResponse::LiquidExchange(*lex.address())) diff --git a/src/lib.rs b/src/lib.rs index b7c93a0..1ee2813 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use alloy::primitives::{Address, Bytes, Uint, U256}; use anyhow::Result; @@ -23,8 +23,8 @@ use crate::{ pool_admin::PoolParams, price_changer::{PriceChanger, PriceUpdate}, types::process::{OrnsteinUhlenbeck, StochasticProcess}, + LiquidExchange::LiquidExchangeInstance, }; - pub mod arbitrageur; pub mod bindings; pub mod deployer; @@ -33,6 +33,15 @@ pub mod pool_admin; pub mod price_changer; pub mod types; +#[derive(Debug, Default, Deserialize, Serialize, Clone)] +pub struct Base { + #[serde(skip)] + pub messager: Option, + + #[serde(skip)] + pub client: Option>, +} + #[cfg(test)] mod tests { use octane::{agent::Agent, world::World}; @@ -172,10 +181,7 @@ mod tests { client: None, }); - let deployer = Agent::builder("deployer").with_behavior(Deployer { - messager: None, - client: None, - }); + let deployer = Agent::builder("deployer").with_behavior(Deployer::default()); let mock_deployer = Agent::builder("mock_deployer").with_behavior(MockOrchestrator { client: None, @@ -183,9 +189,10 @@ mod tests { tokens: vec![], }); - let changer = Agent::builder("pricechanger").with_behavior(PriceChanger::new( - OrnsteinUhlenbeck::new(1.0, 0.15, 0.0, 0.3, 1.0 / 252.0), - )); + let changer = + Agent::builder("pricechanger").with_behavior(PriceChanger::::new( + OrnsteinUhlenbeck::new(1.0, 0.15, 0.0, 0.3, 1.0 / 252.0), + )); let mut world = World::new("id"); diff --git a/src/liquidity_admin.rs b/src/liquidity_admin.rs index 72ddbd4..cc32d69 100644 --- a/src/liquidity_admin.rs +++ b/src/liquidity_admin.rs @@ -2,11 +2,7 @@ use super::*; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct LiquidityAdmin { - #[serde(skip)] - pub messager: Option, - - #[serde(skip)] - pub client: Option>, + pub base: Base, pub deployment: Option
, } @@ -27,8 +23,8 @@ impl Behavior for LiquidityAdmin { client: Arc, messager: Messager, ) -> Result>> { - self.client = Some(client.clone()); - self.messager = Some(messager.clone()); + self.base.client = Some(client.clone()); + self.base.messager = Some(messager.clone()); let mut stream = messager.clone().stream().unwrap(); @@ -53,21 +49,21 @@ impl Behavior for LiquidityAdmin { Err(_) => return Ok(ControlFlow::Continue), }; - ArenaToken::new(query.pool.currency0, self.client.clone().unwrap()) + ArenaToken::new(query.pool.currency0, self.base.client.clone().unwrap()) .approve(self.deployment.unwrap(), Uint::MAX) .send() .await? .watch() .await?; - ArenaToken::new(query.pool.currency1, self.client.clone().unwrap()) + ArenaToken::new(query.pool.currency1, self.base.client.clone().unwrap()) .approve(self.deployment.unwrap(), Uint::MAX) .send() .await? .watch() .await?; - PoolManager::new(self.deployment.unwrap(), self.client.clone().unwrap()) + PoolManager::new(self.deployment.unwrap(), self.base.client.clone().unwrap()) .modifyLiquidity(query.pool, query.modification, Bytes::default()) .send() .await? diff --git a/src/pool_admin.rs b/src/pool_admin.rs index 811c5fe..42d40fc 100644 --- a/src/pool_admin.rs +++ b/src/pool_admin.rs @@ -5,11 +5,7 @@ use super::*; #[derive(Debug, Serialize, Deserialize)] pub struct PoolAdmin { - #[serde(skip)] - pub messager: Option, - - #[serde(skip)] - pub client: Option>, + pub base: Base, pub deployment: Option
, } @@ -30,9 +26,6 @@ impl Behavior for PoolAdmin { client: Arc, messager: Messager, ) -> Result>> { - self.client = Some(client.clone()); - self.messager = Some(messager.clone()); - let mut stream = messager.clone().stream().unwrap(); while let Some(event) = stream.next().await { @@ -50,8 +43,8 @@ impl Behavior for PoolAdmin { } } - self.client = Some(client.clone()); - self.messager = Some(messager.clone()); + self.base.client = Some(client.clone()); + self.base.messager = Some(messager.clone()); Ok(Some(messager.clone().stream().unwrap())) } @@ -71,7 +64,7 @@ impl Behavior for PoolAdmin { // will never panic as is always Some let pool_manager = - PoolManager::new(self.deployment.unwrap(), self.client.clone().unwrap()); + PoolManager::new(self.deployment.unwrap(), self.base.client.clone().unwrap()); let tx = pool_manager.initialize( pool_creation.key, @@ -81,7 +74,8 @@ impl Behavior for PoolAdmin { tx.send().await?.watch().await?; - self.messager + self.base + .messager .clone() .unwrap() .send(To::All, DeploymentResponse::Pool(key)) diff --git a/src/price_changer.rs b/src/price_changer.rs index 09f5fd3..d30b217 100644 --- a/src/price_changer.rs +++ b/src/price_changer.rs @@ -1,77 +1,53 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + use super::*; -#[derive(Serialize, Deserialize, Debug)] -pub struct PriceChanger { - #[serde(skip)] - pub messager: Option, +#[derive(Debug, Serialize, Deserialize)] +pub struct PriceUpdate; - #[serde(skip)] - pub client: Option>, +#[derive(Serialize, Deserialize, Debug)] +pub struct PriceChanger +where + T: StochasticProcess, +{ + pub base: Base, - pub process: OrnsteinUhlenbeck, + pub process: T, pub lex: Option
, } -impl PriceChanger { - pub fn new(process: OrnsteinUhlenbeck) -> Self { +impl PriceChanger { + pub fn new(process: T) -> Self { Self { - messager: None, - client: None, + base: Base::default(), process, lex: None, } } } -use serde::{ser::SerializeStruct, Deserializer, Serializer}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct PriceUpdate; - -// impl Serialize for PriceUpdate { -// fn serialize(&self, serializer: S) -> Result -// where -// S: Serializer, -// { -// let mut state = serializer.serialize_struct("PriceUpdate", 1)?; -// state.serialize_field("type", "PriceUpdate")?; -// state.end() -// } -// } - -// impl<'de> Deserialize<'de> for PriceUpdate { -// fn deserialize(deserializer: D) -> Result -// where -// D: Deserializer<'de>, -// { -// serde::de::Deserialize::deserialize(deserializer)?; -// Ok(PriceUpdate) -// } -// } - #[async_trait::async_trait] -impl Behavior for PriceChanger { +impl Behavior for PriceChanger +where + T: Debug + Send + Sync + for<'a> Deserialize<'a> + Serialize + StochasticProcess + 'static, +{ async fn startup( &mut self, client: Arc, messager: Messager, ) -> Result>> { - self.client = Some(client.clone()); - self.messager = Some(messager.clone()); + self.base.client = Some(client.clone()); + self.base.messager = Some(messager.clone()); let mut stream = messager.clone().stream().unwrap(); while let Some(event) = stream.next().await { let query: DeploymentResponse = match serde_json::from_str(&event.data) { - Ok(query) => { - println!("heresd"); - query - } - Err(_) => { - // eprintln!("Failed to deserialize the event datfa into a DeploymentResponse"); - continue; - } + Ok(query) => query, + Err(_) => continue, }; if let DeploymentResponse::LiquidExchange(address) = query { @@ -80,7 +56,7 @@ impl Behavior for PriceChanger { } } - Ok(Some(messager.clone().stream().unwrap())) + Ok(Some(messager.stream().unwrap())) } async fn process(&mut self, event: Message) -> Result { @@ -92,7 +68,8 @@ impl Behavior for PriceChanger { } }; - let liquid_exchange = LiquidExchange::new(self.lex.unwrap(), self.client.clone().unwrap()); + let liquid_exchange = + LiquidExchange::new(self.lex.unwrap(), self.base.client.clone().unwrap()); let tx = liquid_exchange.setPrice(alloy::primitives::utils::parse_ether( &self.process.step().to_string(), @@ -100,8 +77,6 @@ impl Behavior for PriceChanger { tx.send().await?.watch().await?; - println!("Price updated to: {}", self.process.current_value()); - Ok(ControlFlow::Continue) } } diff --git a/src/types/process.rs b/src/types/process.rs index 56ce4be..dd0a4af 100644 --- a/src/types/process.rs +++ b/src/types/process.rs @@ -3,11 +3,9 @@ use rand_distr::{Distribution, Normal}; use serde::{Deserialize, Serialize}; pub trait StochasticProcess { - type Value; + fn current_value(&self) -> f64; - fn current_value(&self) -> Self::Value; - - fn step(&mut self) -> Self::Value; + fn step(&mut self) -> f64; } #[derive(Serialize, Deserialize, Debug)] @@ -40,13 +38,11 @@ impl OrnsteinUhlenbeck { } impl StochasticProcess for OrnsteinUhlenbeck { - type Value = f64; - - fn current_value(&self) -> Self::Value { + fn current_value(&self) -> f64 { self.current_value } - fn step(&mut self) -> Self::Value { + fn step(&mut self) -> f64 { let mut rng = thread_rng(); let normal = Normal::new(0.0, 1.0).unwrap();