Skip to content

Commit

Permalink
exporter: Use market hours information in price filtering (#95)
Browse files Browse the repository at this point in the history
* exporter: Use market hours information in price filtering

* Rename MarketHours->WeeklySchedule, market_hours->weekly_schedule

* market_hours: verify DST behavior using EU and US lag example

* market_hours.rs: true positives where markets agree on market hours
  • Loading branch information
drozdziak1 authored Nov 29, 2023
1 parent 75497df commit 3bc2d5a
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 56 deletions.
35 changes: 35 additions & 0 deletions integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"nasdaq_symbol": "AAPL",
"symbol": "Equity.US.AAPL/USD",
"base": "AAPL",
"weekly_schedule": "America/New_York,C,C,C,C,C,C,C" # Should never be published due to all-closed market hours
},
"metadata": {"jump_id": "186", "jump_symbol": "AAPL", "price_exp": -5, "min_publishers": 1},
}
Expand Down Expand Up @@ -732,3 +733,37 @@ async def test_agent_migrate_config(self,
# Continue with the simple test case, which must succeed
await self.test_update_price_simple(client_no_spawn)
await client_no_spawn.close()

@pytest.mark.asyncio
async def test_agent_respects_market_hours(self, client: PythAgentClient):
'''
Similar to test_update_price_simple, but using AAPL_USD and
asserting that nothing is published due to the symbol's
all-closed market hours.
'''

# Fetch all products
products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()}

# Find the product account ID corresponding to the AAPL/USD symbol
product = products[AAPL_USD["attr_dict"]["symbol"]]
product_account = product["account"]

# Get the price account with which to send updates
price_account = product["price_accounts"][0]["account"]

# Send an "update_price" request
await client.update_price(price_account, 42, 2, "trading")
time.sleep(2)

# Send another "update_price" request to trigger aggregation
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has been updated with the values from the first "update_price" request
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"
160 changes: 133 additions & 27 deletions src/agent/market_hours.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use {
chrono::{
naive::NaiveTime,
DateTime,
Datelike,
Duration,
TimeZone,
Utc,
Weekday,
},
chrono_tz::Tz,
Expand All @@ -27,8 +28,8 @@ lazy_static! {
}

/// Weekly market hours schedule
#[derive(Default, Debug, Eq, PartialEq)]
pub struct MarketHours {
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct WeeklySchedule {
pub timezone: Tz,
pub mon: MHKind,
pub tue: MHKind,
Expand All @@ -39,7 +40,7 @@ pub struct MarketHours {
pub sun: MHKind,
}

impl MarketHours {
impl WeeklySchedule {
pub fn all_closed() -> Self {
Self {
timezone: Default::default(),
Expand All @@ -53,13 +54,11 @@ impl MarketHours {
}
}

pub fn can_publish_at<Tz: TimeZone>(&self, when: &DateTime<Tz>) -> Result<bool> {
pub fn can_publish_at(&self, when: &DateTime<Utc>) -> bool {
// Convert to time local to the market
let when_market_local = when.with_timezone(&self.timezone);

// NOTE(2023-11-21): Strangely enough, I couldn't find a
// method that gets the programmatic Weekday from a DateTime.
let market_weekday: Weekday = when_market_local.format("%A").to_string().parse()?;
let market_weekday: Weekday = when_market_local.date_naive().weekday();

let market_time = when_market_local.time();

Expand All @@ -73,11 +72,11 @@ impl MarketHours {
Weekday::Sun => self.sun.can_publish_at(market_time),
};

Ok(ret)
ret
}
}

impl FromStr for MarketHours {
impl FromStr for WeeklySchedule {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self> {
let mut split_by_commas = s.split(",");
Expand Down Expand Up @@ -163,7 +162,7 @@ impl FromStr for MarketHours {
}

/// Helper enum for denoting per-day schedules: time range, all-day open and all-day closed.
#[derive(Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MHKind {
Open,
Closed,
Expand Down Expand Up @@ -236,9 +235,9 @@ mod tests {
// Mon-Fri 9-5, inconsistent leading space on Tuesday, leading 0 on Friday (expected to be fine)
let s = "Europe/Warsaw,9:00-17:00, 9:00-17:00,9:00-17:00,9:00-17:00,09:00-17:00,C,C";

let parsed: MarketHours = s.parse()?;
let parsed: WeeklySchedule = s.parse()?;

let expected = MarketHours {
let expected = WeeklySchedule {
timezone: Tz::Europe__Warsaw,
mon: MHKind::TimeRange(
NaiveTime::from_hms_opt(9, 0, 0).unwrap(),
Expand Down Expand Up @@ -274,7 +273,7 @@ mod tests {
// Valid but missing a timezone
let s = "O,C,O,C,O,C,O";

let parsing_result: Result<MarketHours> = s.parse();
let parsing_result: Result<WeeklySchedule> = s.parse();

dbg!(&parsing_result);
assert!(parsing_result.is_err());
Expand All @@ -285,7 +284,7 @@ mod tests {
// One day short
let s = "Asia/Hong_Kong,C,O,C,O,C,O";

let parsing_result: Result<MarketHours> = s.parse();
let parsing_result: Result<WeeklySchedule> = s.parse();

dbg!(&parsing_result);
assert!(parsing_result.is_err());
Expand All @@ -295,7 +294,7 @@ mod tests {
fn test_parsing_gibberish_timezone_is_error() {
// Pretty sure that one's extinct
let s = "Pangea/New_Dino_City,O,O,O,O,O,O,O";
let parsing_result: Result<MarketHours> = s.parse();
let parsing_result: Result<WeeklySchedule> = s.parse();

dbg!(&parsing_result);
assert!(parsing_result.is_err());
Expand All @@ -304,7 +303,7 @@ mod tests {
#[test]
fn test_parsing_gibberish_day_schedule_is_error() {
let s = "Europe/Amsterdam,mondays are alright I guess,O,O,O,O,O,O";
let parsing_result: Result<MarketHours> = s.parse();
let parsing_result: Result<WeeklySchedule> = s.parse();

dbg!(&parsing_result);
assert!(parsing_result.is_err());
Expand All @@ -314,7 +313,7 @@ mod tests {
fn test_parsing_too_many_days_is_error() {
// One day too many
let s = "Europe/Lisbon,O,O,O,O,O,O,O,O,C";
let parsing_result: Result<MarketHours> = s.parse();
let parsing_result: Result<WeeklySchedule> = s.parse();

dbg!(&parsing_result);
assert!(parsing_result.is_err());
Expand All @@ -323,7 +322,7 @@ mod tests {
#[test]
fn test_market_hours_happy_path() -> Result<()> {
// Prepare a schedule of narrow ranges
let mh: MarketHours = "America/New_York,00:00-1:00,1:00-2:00,2:00-3:00,3:00-4:00,4:00-5:00,5:00-6:00,6:00-7:00".parse()?;
let wsched: WeeklySchedule = "America/New_York,00:00-1:00,1:00-2:00,2:00-3:00,3:00-4:00,4:00-5:00,5:00-6:00,6:00-7:00".parse()?;

// Prepare UTC datetimes that fall before, within and after market hours
let format = "%Y-%m-%d %H:%M";
Expand Down Expand Up @@ -357,7 +356,7 @@ mod tests {
NaiveDateTime::parse_from_str("2023-11-26 12:30", format)?.and_utc(),
];

dbg!(&mh);
dbg!(&wsched);

for ((before_dt, ok_dt), after_dt) in bad_datetimes_before
.iter()
Expand All @@ -368,9 +367,9 @@ mod tests {
dbg!(&ok_dt);
dbg!(&after_dt);

assert!(!mh.can_publish_at(before_dt)?);
assert!(mh.can_publish_at(ok_dt)?);
assert!(!mh.can_publish_at(after_dt)?);
assert!(!wsched.can_publish_at(before_dt));
assert!(wsched.can_publish_at(ok_dt));
assert!(!wsched.can_publish_at(after_dt));
}

Ok(())
Expand All @@ -380,7 +379,8 @@ mod tests {
#[test]
fn test_market_hours_midnight_00_24() -> Result<()> {
// Prepare a schedule of midnight-neighboring ranges
let mh: MarketHours = "Europe/Amsterdam,23:00-24:00,00:00-01:00,O,C,C,C,C".parse()?;
let wsched: WeeklySchedule =
"Europe/Amsterdam,23:00-24:00,00:00-01:00,O,C,C,C,C".parse()?;

let format = "%Y-%m-%d %H:%M";
let ok_datetimes = vec![
Expand Down Expand Up @@ -408,16 +408,122 @@ mod tests {
.unwrap(),
];

dbg!(&mh);
dbg!(&wsched);

for (ok_dt, bad_dt) in ok_datetimes.iter().zip(bad_datetimes.iter()) {
dbg!(&ok_dt);
dbg!(&bad_dt);

assert!(mh.can_publish_at(ok_dt)?);
assert!(!mh.can_publish_at(bad_dt)?);
assert!(wsched.can_publish_at(&ok_dt.with_timezone(&Utc)));
assert!(!wsched.can_publish_at(&bad_dt.with_timezone(&Utc)));
}

Ok(())
}

/// Performs a scenario on 2023 autumn DST change. During that
/// time, most of the EU switched on the weekend one week earlier
/// (Oct 28-29) than most of the US (Nov 4-5).
#[test]
fn test_market_hours_dst_shenanigans() -> Result<()> {
// The Monday schedule is equivalent between Amsterdam and
// Chicago for most of 2023 (7h difference), except for two
// instances of Amsterdam/Chicago DST change lag:
// * Spring 2023: Mar12(US)-Mar26(EU) (clocks go forward 1h,
// CDT/CET 6h offset in use for 2 weeks, CDT/CEST 7h offset after)
// * Autumn 2023: Oct29(EU)-Nov5(US) (clocks go back 1h,
// CDT/CET 6h offset in use 1 week, CST/CET 7h offset after)
let wsched_eu: WeeklySchedule = "Europe/Amsterdam,9:00-17:00,O,O,O,O,O,O".parse()?;
let wsched_us: WeeklySchedule = "America/Chicago,2:00-10:00,O,O,O,O,O,O".parse()?;

let format = "%Y-%m-%d %H:%M";

// Monday after EU change, before US change, from Amsterdam
// perspective. Okay for publishing Amsterdam market, outside hours for Chicago market
let dt1 = NaiveDateTime::parse_from_str("2023-10-30 16:01", format)?
.and_local_timezone(Tz::Europe__Amsterdam)
.unwrap();
dbg!(&dt1);

assert!(wsched_eu.can_publish_at(&dt1.with_timezone(&Utc)));
assert!(!wsched_us.can_publish_at(&dt1.with_timezone(&Utc)));

// Same point in time, from Chicago perspective. Still okay
// for Amsterdam, still outside hours for Chicago.
let dt2 = NaiveDateTime::parse_from_str("2023-10-30 10:01", format)?
.and_local_timezone(Tz::America__Chicago)
.unwrap();
dbg!(&dt2);

assert!(wsched_eu.can_publish_at(&dt2.with_timezone(&Utc)));
assert!(!wsched_us.can_publish_at(&dt2.with_timezone(&Utc)));

assert_eq!(dt1, dt2);

// Monday after EU change, before US change, from Chicago
// perspective. Okay for publishing Chicago market, outside
// hours for publishing Amsterdam market.
let dt3 = NaiveDateTime::parse_from_str("2023-10-30 02:01", format)?
.and_local_timezone(Tz::America__Chicago)
.unwrap();
dbg!(&dt3);

assert!(!wsched_eu.can_publish_at(&dt3.with_timezone(&Utc)));
assert!(wsched_us.can_publish_at(&dt3.with_timezone(&Utc)));

// Same point in time, from Amsterdam perspective. Still okay
// for Chicago, still outside hours for Amsterdam.
let dt4 = NaiveDateTime::parse_from_str("2023-10-30 08:01", format)?
.and_local_timezone(Tz::Europe__Amsterdam)
.unwrap();
dbg!(&dt4);

assert!(!wsched_eu.can_publish_at(&dt4.with_timezone(&Utc)));
assert!(wsched_us.can_publish_at(&dt4.with_timezone(&Utc)));

assert_eq!(dt3, dt4);

// Monday after both Amsterdam and Chicago get over their DST
// change, from Amsterdam perspective. Okay for publishing
// both markets.
let dt5 = NaiveDateTime::parse_from_str("2023-11-06 09:01", format)?
.and_local_timezone(Tz::Europe__Amsterdam)
.unwrap();
dbg!(&dt5);
assert!(wsched_eu.can_publish_at(&dt5.with_timezone(&Utc)));
assert!(wsched_us.can_publish_at(&dt5.with_timezone(&Utc)));

// Same point in time, from Chicago perspective
let dt6 = NaiveDateTime::parse_from_str("2023-11-06 02:01", format)?
.and_local_timezone(Tz::America__Chicago)
.unwrap();
dbg!(&dt6);
assert!(wsched_eu.can_publish_at(&dt6.with_timezone(&Utc)));
assert!(wsched_us.can_publish_at(&dt6.with_timezone(&Utc)));

assert_eq!(dt5, dt6);

// Monday after both Amsterdam and Chicago get over their DST
// change, from Amsterdam perspective. Outside both markets'
// hours.
let dt7 = NaiveDateTime::parse_from_str("2023-11-06 17:01", format)?
.and_local_timezone(Tz::Europe__Amsterdam)
.unwrap();
dbg!(&dt7);
assert!(!wsched_eu.can_publish_at(&dt7.with_timezone(&Utc)));
assert!(!wsched_us.can_publish_at(&dt7.with_timezone(&Utc)));

// Same point in time, from Chicago perspective, still outside
// hours for both markets.
let dt8 = NaiveDateTime::parse_from_str("2023-11-06 10:01", format)?
.and_local_timezone(Tz::America__Chicago)
.unwrap();
dbg!(&dt8);
assert!(!wsched_eu.can_publish_at(&dt8.with_timezone(&Utc)));
assert!(!wsched_us.can_publish_at(&dt8.with_timezone(&Utc)));

assert_eq!(dt7, dt8);

Ok(())
}
}
10 changes: 6 additions & 4 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 6,
atype: 4,
Expand Down Expand Up @@ -992,7 +992,8 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
price_accounts: vec![
weekly_schedule: Default::default(),
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
)
Expand All @@ -1014,7 +1015,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 5,
atype: 3,
Expand Down Expand Up @@ -1051,7 +1052,8 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
price_accounts: vec![
weekly_schedule: Default::default(),
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD",
)
Expand Down
Loading

0 comments on commit 3bc2d5a

Please sign in to comment.