diff --git a/pyth_observer/check/publisher.py b/pyth_observer/check/publisher.py index 39e6676..6a20ad4 100644 --- a/pyth_observer/check/publisher.py +++ b/pyth_observer/check/publisher.py @@ -293,13 +293,17 @@ def run(self) -> bool: current_time = int(time.time()) publisher_key = (self.__state.publisher_name, self.__state.symbol) - PUBLISHER_CACHE[publisher_key].append( - PriceUpdate(current_time, self.__state.price) - ) updates = PUBLISHER_CACHE[publisher_key] + # Only cache new prices, let repeated prices grow stale. + # These will be caught as an exact stall in the detector. + is_repeated_price = updates and updates[-1].price == self.__state.price + cur_update = PriceUpdate(current_time, self.__state.price) + if not is_repeated_price: + PUBLISHER_CACHE[publisher_key].append(cur_update) + # Analyze for stalls - result = self.__detector.analyze_updates(list(updates)) + result = self.__detector.analyze_updates(list(updates), cur_update) logger.debug(f"Stall detection result: {result}") self.__last_analysis = result # For error logging diff --git a/pyth_observer/check/stall_detection.py b/pyth_observer/check/stall_detection.py index 56135cb..8fc1197 100644 --- a/pyth_observer/check/stall_detection.py +++ b/pyth_observer/check/stall_detection.py @@ -78,33 +78,36 @@ def __init__( self.noise_threshold = noise_threshold self.min_noise_samples = min_noise_samples - def analyze_updates(self, updates: List[PriceUpdate]) -> StallDetectionResult: + def analyze_updates( + self, updates: List[PriceUpdate], cur_update: PriceUpdate + ) -> StallDetectionResult: """ Assumes that the cache has been recently updated since it takes the latest cached timestamp as the current time. Args: updates: List of price updates to analyze + cur_update: The update currently being processed. If it's a repeated price, + the update won't be in `updates`, so we need it as a separate parameter. Returns: StallDetectionResult with detection details """ - # Need at least 2 samples - if not updates or len(updates) < 2: + # Need at least 1 sample + if not updates: return StallDetectionResult.no_stall() ## Check for exact stall # The latest 2 updates are sufficient to detect an exact stall - latest_updates = updates[-2:] - duration = latest_updates[1].timestamp - latest_updates[0].timestamp + duration = cur_update.timestamp - updates[-1].timestamp if duration <= self.stall_time_limit: return StallDetectionResult.no_stall() - elif latest_updates[1].price == latest_updates[0].price: + elif cur_update.price == updates[-1].price: return StallDetectionResult( is_stalled=True, stall_type="exact", - base_price=latest_updates[1].price, + base_price=cur_update.price, noise_magnitude=0.0, duration=duration, confidence=1.0, diff --git a/tests/test_checks_publisher.py b/tests/test_checks_publisher.py index 7af8750..4676b1f 100644 --- a/tests/test_checks_publisher.py +++ b/tests/test_checks_publisher.py @@ -114,6 +114,24 @@ def test_exact_stall_fails_check(self): check_b = self.setup_check(state_b, stall_time_limit=5) self.run_check(check_b, 6, False) # Should fail as it exceeds the limit + PUBLISHER_CACHE.clear() + state_c = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) + check_c = self.setup_check(state_c, stall_time_limit=5) + self.run_check(check_c, 2, True) # Initial check should pass + state_c.price = 105.0 # Change the price + self.run_check(check_c, 3, True) # Should pass as price changes + state_c.price = 100.0 # Change back to original price + # Simulate a stall -- send the same price repeatedly. + self.run_check(check_c, 2, True) + state_c.price = 100.0 + self.run_check(check_c, 2, True) + state_c.price = 100.0 + self.run_check(check_c, 2, True) + state_c.price = 100.0 + self.run_check( + check_c, 2, False + ) # Should fail since we breached the stall time limit + PUBLISHER_CACHE.clear() state_c = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0) check_c = self.setup_check(state_c, stall_time_limit=5)