diff --git a/redis/_parsers/base.py b/redis/_parsers/base.py index 69d7b585dd..177f655f09 100644 --- a/redis/_parsers/base.py +++ b/redis/_parsers/base.py @@ -1,7 +1,16 @@ +import logging import sys from abc import ABC from asyncio import IncompleteReadError, StreamReader, TimeoutError -from typing import Callable, List, Optional, Protocol, Union +from typing import Awaitable, Callable, List, Optional, Protocol, Union + +from redis.maintenance_events import ( + NodeFailedOverEvent, + NodeFailingOverEvent, + NodeMigratedEvent, + NodeMigratingEvent, + NodeMovingEvent, +) if sys.version_info.major >= 3 and sys.version_info.minor >= 11: from asyncio import timeout as async_timeout @@ -50,6 +59,8 @@ "Client sent AUTH, but no password is set": AuthenticationError, } +logger = logging.getLogger(__name__) + class BaseParser(ABC): EXCEPTION_CLASSES = { @@ -158,7 +169,19 @@ async def read_response( raise NotImplementedError() -_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"] +_INVALIDATION_MESSAGE = (b"invalidate", "invalidate") +_MOVING_MESSAGE = (b"MOVING", "MOVING") +_MIGRATING_MESSAGE = (b"MIGRATING", "MIGRATING") +_MIGRATED_MESSAGE = (b"MIGRATED", "MIGRATED") +_FAILING_OVER_MESSAGE = (b"FAILING_OVER", "FAILING_OVER") +_FAILED_OVER_MESSAGE = (b"FAILED_OVER", "FAILED_OVER") + +_MAINTENANCE_MESSAGES = ( + *_MIGRATING_MESSAGE, + *_MIGRATED_MESSAGE, + *_FAILING_OVER_MESSAGE, + *_FAILED_OVER_MESSAGE, +) class PushNotificationsParser(Protocol): @@ -166,16 +189,68 @@ class PushNotificationsParser(Protocol): pubsub_push_handler_func: Callable invalidation_push_handler_func: Optional[Callable] = None + node_moving_push_handler_func: Optional[Callable] = None + maintenance_push_handler_func: Optional[Callable] = None def handle_pubsub_push_response(self, response): """Handle pubsub push responses""" raise NotImplementedError() def handle_push_response(self, response, **kwargs): - if response[0] not in _INVALIDATION_MESSAGE: + msg_type = response[0] + if msg_type not in ( + *_INVALIDATION_MESSAGE, + *_MAINTENANCE_MESSAGES, + *_MOVING_MESSAGE, + ): return self.pubsub_push_handler_func(response) - if self.invalidation_push_handler_func: - return self.invalidation_push_handler_func(response) + + try: + if ( + msg_type in _INVALIDATION_MESSAGE + and self.invalidation_push_handler_func + ): + return self.invalidation_push_handler_func(response) + + if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func: + # Expected message format is: MOVING