diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 3f35fdd59e..ac907b0c10 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -39,6 +39,7 @@ ) from redis.asyncio.lock import Lock from redis.asyncio.retry import Retry +from redis.backoff import ExponentialWithJitterBackoff from redis.client import ( EMPTY_RESPONSE, NEVER_DECODE, @@ -65,7 +66,6 @@ PubSubError, RedisError, ResponseError, - TimeoutError, WatchError, ) from redis.typing import ChannelT, EncodableT, KeyT @@ -73,6 +73,7 @@ HIREDIS_AVAILABLE, SSL_AVAILABLE, _set_info_logger, + deprecated_args, deprecated_function, get_lib_version, safe_str, @@ -208,6 +209,11 @@ def from_pool( client.auto_close_connection_pool = True return client + @deprecated_args( + args_to_warn=["retry_on_timeout"], + reason="TimeoutError is included by default.", + version="6.0.0", + ) def __init__( self, *, @@ -225,6 +231,9 @@ def __init__( encoding_errors: str = "strict", decode_responses: bool = False, retry_on_timeout: bool = False, + retry: Retry = Retry( + backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 + ), retry_on_error: Optional[list] = None, ssl: bool = False, ssl_keyfile: Optional[str] = None, @@ -242,7 +251,6 @@ def __init__( lib_name: Optional[str] = "redis-py", lib_version: Optional[str] = get_lib_version(), username: Optional[str] = None, - retry: Optional[Retry] = None, auto_close_connection_pool: Optional[bool] = None, redis_connect_func=None, credential_provider: Optional[CredentialProvider] = None, @@ -251,10 +259,24 @@ def __init__( ): """ Initialize a new Redis client. - To specify a retry policy for specific errors, first set - `retry_on_error` to a list of the error/s to retry on, then set - `retry` to a valid `Retry` object. - To retry on TimeoutError, `retry_on_timeout` can also be set to `True`. + + To specify a retry policy for specific errors, you have two options: + + 1. Set the `retry_on_error` to a list of the error/s to retry on, and + you can also set `retry` to a valid `Retry` object(in case the default + one is not appropriate) - with this approach the retries will be triggered + on the default errors specified in the Retry object enriched with the + errors specified in `retry_on_error`. + + 2. Define a `Retry` object with configured 'supported_errors' and set + it to the `retry` parameter - with this approach you completely redefine + the errors on which retries will happen. + + `retry_on_timeout` is deprecated - please include the TimeoutError + either in the Retry object or in the `retry_on_error` list. + + When 'connection_pool' is provided - the retry configuration of the + provided pool will be used. """ kwargs: Dict[str, Any] if event_dispatcher is None: @@ -280,8 +302,6 @@ def __init__( # Create internal connection pool, expected to be closed by Redis instance if not retry_on_error: retry_on_error = [] - if retry_on_timeout is True: - retry_on_error.append(TimeoutError) kwargs = { "db": db, "username": username, @@ -291,7 +311,6 @@ def __init__( "encoding": encoding, "encoding_errors": encoding_errors, "decode_responses": decode_responses, - "retry_on_timeout": retry_on_timeout, "retry_on_error": retry_on_error, "retry": copy.deepcopy(retry), "max_connections": max_connections, @@ -403,10 +422,10 @@ def get_connection_kwargs(self): """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs - def get_retry(self) -> Optional["Retry"]: + def get_retry(self) -> Optional[Retry]: return self.get_connection_kwargs().get("retry") - def set_retry(self, retry: "Retry") -> None: + def set_retry(self, retry: Retry) -> None: self.get_connection_kwargs().update({"retry": retry}) self.connection_pool.set_retry(retry) @@ -633,18 +652,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option await conn.send_command(*args) return await self.parse_response(conn, command_name, **options) - async def _disconnect_raise(self, conn: Connection, error: Exception): + async def _close_connection(self, conn: Connection): """ - Close the connection and raise an exception - if retry_on_error is not set or the error - is not one of the specified error types + Close the connection before retrying. + + The supported exceptions are already checked in the + retry object so we don't need to do it here. + + After we disconnect the connection, it will try to reconnect and + do a health check as part of the send_command logic(on connection level). """ await conn.disconnect() - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - raise error # COMMAND EXECUTION AND PROTOCOL PARSING async def execute_command(self, *args, **options): @@ -661,7 +679,7 @@ async def execute_command(self, *args, **options): lambda: self._send_command_parse_response( conn, command_name, *args, **options ), - lambda error: self._disconnect_raise(conn, error), + lambda _: self._close_connection(conn), ) finally: if self.single_connection_client: @@ -929,19 +947,11 @@ async def connect(self): ) ) - async def _disconnect_raise_connect(self, conn, error): + async def _reconnect(self, conn): """ - Close the connection and raise an exception - if retry_on_error is not set or the error is not one - of the specified error types. Otherwise, try to - reconnect + Try to reconnect """ await conn.disconnect() - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - raise error await conn.connect() async def _execute(self, conn, command, *args, **kwargs): @@ -954,7 +964,7 @@ async def _execute(self, conn, command, *args, **kwargs): """ return await conn.retry.call_with_retry( lambda: command(*args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), + lambda _: self._reconnect(conn), ) async def parse_response(self, block: bool = True, timeout: float = 0): @@ -1245,7 +1255,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] in one transmission. This is convenient for batch processing, such as saving all the values in a list to Redis. - All commands executed within a pipeline are wrapped with MULTI and EXEC + All commands executed within a pipeline(when running in transactional mode, + which is the default behavior) are wrapped with MULTI and EXEC calls. This guarantees all commands executed in the pipeline will be executed atomically. @@ -1274,7 +1285,7 @@ def __init__( self.shard_hint = shard_hint self.watching = False self.command_stack: CommandStackT = [] - self.scripts: Set["Script"] = set() + self.scripts: Set[Script] = set() self.explicit_transaction = False async def __aenter__(self: _RedisT) -> _RedisT: @@ -1346,36 +1357,36 @@ def execute_command( return self.immediate_execute_command(*args, **kwargs) return self.pipeline_execute_command(*args, **kwargs) - async def _disconnect_reset_raise(self, conn, error): + async def _disconnect_reset_raise_on_watching( + self, + conn: Connection, + error: Exception, + ): """ - Close the connection, reset watching state and - raise an exception if we were watching, - if retry_on_error is not set or the error is not one - of the specified error types. + Close the connection reset watching state and + raise an exception if we were watching. + + The supported exceptions are already checked in the + retry object so we don't need to do it here. + + After we disconnect the connection, it will try to reconnect and + do a health check as part of the send_command logic(on connection level). """ await conn.disconnect() # if we were already watching a variable, the watch is no longer # valid since this connection has died. raise a WatchError, which # indicates the user should retry this transaction. if self.watching: - await self.aclose() + await self.reset() raise WatchError( - "A ConnectionError occurred on while watching one or more keys" + f"A {type(error).__name__} occurred while watching one or more keys" ) - # if retry_on_error is not set or the error is not one - # of the specified error types, raise it - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - await self.aclose() - raise async def immediate_execute_command(self, *args, **options): """ - Execute a command immediately, but don't auto-retry on a - ConnectionError if we're already WATCHing a variable. Used when - issuing WATCH or subsequent commands retrieving their values but before + Execute a command immediately, but don't auto-retry on the supported + errors for retry if we're already WATCHing a variable. + Used when issuing WATCH or subsequent commands retrieving their values but before MULTI is called. """ command_name = args[0] @@ -1389,7 +1400,7 @@ async def immediate_execute_command(self, *args, **options): lambda: self._send_command_parse_response( conn, command_name, *args, **options ), - lambda error: self._disconnect_reset_raise(conn, error), + lambda error: self._disconnect_reset_raise_on_watching(conn, error), ) def pipeline_execute_command(self, *args, **options): @@ -1544,11 +1555,15 @@ async def load_scripts(self): if not exist: s.sha = await immediate("SCRIPT LOAD", s.script) - async def _disconnect_raise_reset(self, conn: Connection, error: Exception): + async def _disconnect_raise_on_watching(self, conn: Connection, error: Exception): """ - Close the connection, raise an exception if we were watching, - and raise an exception if retry_on_error is not set or the - error is not one of the specified error types. + Close the connection, raise an exception if we were watching. + + The supported exceptions are already checked in the + retry object so we don't need to do it here. + + After we disconnect the connection, it will try to reconnect and + do a health check as part of the send_command logic(on connection level). """ await conn.disconnect() # if we were watching a variable, the watch is no longer valid @@ -1556,16 +1571,8 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception): # indicates the user should retry this transaction. if self.watching: raise WatchError( - "A ConnectionError occurred on while watching one or more keys" + f"A {type(error).__name__} occurred while watching one or more keys" ) - # if retry_on_error is not set or the error is not one - # of the specified error types, raise it - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - await self.reset() - raise async def execute(self, raise_on_error: bool = True) -> List[Any]: """Execute all the commands in the current pipeline""" @@ -1590,7 +1597,7 @@ async def execute(self, raise_on_error: bool = True) -> List[Any]: try: return await conn.retry.call_with_retry( lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), + lambda error: self._disconnect_raise_on_watching(conn, error), ) finally: await self.reset() diff --git a/redis/client.py b/redis/client.py index fda927507a..138f561974 100755 --- a/redis/client.py +++ b/redis/client.py @@ -11,6 +11,7 @@ List, Mapping, Optional, + Set, Type, Union, ) @@ -22,6 +23,7 @@ _RedisCallbacksRESP3, bool_ok, ) +from redis.backoff import ExponentialWithJitterBackoff from redis.cache import CacheConfig, CacheInterface from redis.commands import ( CoreCommands, @@ -29,6 +31,7 @@ SentinelCommands, list_or_args, ) +from redis.commands.core import Script from redis.connection import ( AbstractConnection, ConnectionPool, @@ -49,7 +52,6 @@ PubSubError, RedisError, ResponseError, - TimeoutError, WatchError, ) from redis.lock import Lock @@ -57,6 +59,7 @@ from redis.utils import ( HIREDIS_AVAILABLE, _set_info_logger, + deprecated_args, get_lib_version, safe_str, str_if_bytes, @@ -188,6 +191,11 @@ def from_pool( client.auto_close_connection_pool = True return client + @deprecated_args( + args_to_warn=["retry_on_timeout"], + reason="TimeoutError is included by default.", + version="6.0.0", + ) def __init__( self, host: str = "localhost", @@ -204,6 +212,9 @@ def __init__( encoding_errors: str = "strict", decode_responses: bool = False, retry_on_timeout: bool = False, + retry: Retry = Retry( + backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 + ), retry_on_error: Optional[List[Type[Exception]]] = None, ssl: bool = False, ssl_keyfile: Optional[str] = None, @@ -227,7 +238,6 @@ def __init__( lib_name: Optional[str] = "redis-py", lib_version: Optional[str] = get_lib_version(), username: Optional[str] = None, - retry: Optional[Retry] = None, redis_connect_func: Optional[Callable[[], None]] = None, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, @@ -237,10 +247,24 @@ def __init__( ) -> None: """ Initialize a new Redis client. - To specify a retry policy for specific errors, first set - `retry_on_error` to a list of the error/s to retry on, then set - `retry` to a valid `Retry` object. - To retry on TimeoutError, `retry_on_timeout` can also be set to `True`. + + To specify a retry policy for specific errors, you have two options: + + 1. Set the `retry_on_error` to a list of the error/s to retry on, and + you can also set `retry` to a valid `Retry` object(in case the default + one is not appropriate) - with this approach the retries will be triggered + on the default errors specified in the Retry object enriched with the + errors specified in `retry_on_error`. + + 2. Define a `Retry` object with configured 'supported_errors' and set + it to the `retry` parameter - with this approach you completely redefine + the errors on which retries will happen. + + `retry_on_timeout` is deprecated - please include the TimeoutError + either in the Retry object or in the `retry_on_error` list. + + When 'connection_pool' is provided - the retry configuration of the + provided pool will be used. Args: @@ -255,8 +279,6 @@ def __init__( if not connection_pool: if not retry_on_error: retry_on_error = [] - if retry_on_timeout is True: - retry_on_error.append(TimeoutError) kwargs = { "db": db, "username": username, @@ -378,10 +400,10 @@ def get_connection_kwargs(self) -> Dict: """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs - def get_retry(self) -> Optional["Retry"]: + def get_retry(self) -> Optional[Retry]: return self.get_connection_kwargs().get("retry") - def set_retry(self, retry: "Retry") -> None: + def set_retry(self, retry: Retry) -> None: self.get_connection_kwargs().update({"retry": retry}) self.connection_pool.set_retry(retry) @@ -581,18 +603,18 @@ def _send_command_parse_response(self, conn, command_name, *args, **options): conn.send_command(*args, **options) return self.parse_response(conn, command_name, **options) - def _disconnect_raise(self, conn, error): + def _close_connection(self, conn) -> None: """ - Close the connection and raise an exception - if retry_on_error is not set or the error - is not one of the specified error types + Close the connection before retrying. + + The supported exceptions are already checked in the + retry object so we don't need to do it here. + + After we disconnect the connection, it will try to reconnect and + do a health check as part of the send_command logic(on connection level). """ + conn.disconnect() - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - raise error # COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): @@ -611,7 +633,7 @@ def _execute_command(self, *args, **options): lambda: self._send_command_parse_response( conn, command_name, *args, **options ), - lambda error: self._disconnect_raise(conn, error), + lambda _: self._close_connection(conn), ) finally: if self._single_connection_client: @@ -870,19 +892,14 @@ def clean_health_check_responses(self) -> None: ) ttl -= 1 - def _disconnect_raise_connect(self, conn, error) -> None: + def _reconnect(self, conn) -> None: """ - Close the connection and raise an exception - if retry_on_error is not set or the error is not one - of the specified error types. Otherwise, try to - reconnect + The supported exceptions are already checked in the + retry object so we don't need to do it here. + + In this error handler we are trying to reconnect to the server. """ conn.disconnect() - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - raise error conn.connect() def _execute(self, conn, command, *args, **kwargs): @@ -895,7 +912,7 @@ def _execute(self, conn, command, *args, **kwargs): """ return conn.retry.call_with_retry( lambda: command(*args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), + lambda _: self._reconnect(conn), ) def parse_response(self, block=True, timeout=0): @@ -1264,7 +1281,8 @@ class Pipeline(Redis): in one transmission. This is convenient for batch processing, such as saving all the values in a list to Redis. - All commands executed within a pipeline are wrapped with MULTI and EXEC + All commands executed within a pipeline(when running in transactional mode, + which is the default behavior) are wrapped with MULTI and EXEC calls. This guarantees all commands executed in the pipeline will be executed atomically. @@ -1285,9 +1303,10 @@ def __init__(self, connection_pool, response_callbacks, transaction, shard_hint) self.response_callbacks = response_callbacks self.transaction = transaction self.shard_hint = shard_hint - self.watching = False - self.reset() + self.command_stack = [] + self.scripts: Set[Script] = set() + self.explicit_transaction = False def __enter__(self) -> "Pipeline": return self @@ -1353,36 +1372,37 @@ def execute_command(self, *args, **kwargs): return self.immediate_execute_command(*args, **kwargs) return self.pipeline_execute_command(*args, **kwargs) - def _disconnect_reset_raise(self, conn, error) -> None: + def _disconnect_reset_raise_on_watching( + self, + conn: AbstractConnection, + error: Exception, + ) -> None: """ - Close the connection, reset watching state and - raise an exception if we were watching, - if retry_on_error is not set or the error is not one - of the specified error types. + Close the connection reset watching state and + raise an exception if we were watching. + + The supported exceptions are already checked in the + retry object so we don't need to do it here. + + After we disconnect the connection, it will try to reconnect and + do a health check as part of the send_command logic(on connection level). """ conn.disconnect() + # if we were already watching a variable, the watch is no longer # valid since this connection has died. raise a WatchError, which # indicates the user should retry this transaction. if self.watching: self.reset() raise WatchError( - "A ConnectionError occurred on while watching one or more keys" + f"A {type(error).__name__} occurred while watching one or more keys" ) - # if retry_on_error is not set or the error is not one - # of the specified error types, raise it - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - self.reset() - raise def immediate_execute_command(self, *args, **options): """ - Execute a command immediately, but don't auto-retry on a - ConnectionError if we're already WATCHing a variable. Used when - issuing WATCH or subsequent commands retrieving their values but before + Execute a command immediately, but don't auto-retry on the supported + errors for retry if we're already WATCHing a variable. + Used when issuing WATCH or subsequent commands retrieving their values but before MULTI is called. """ command_name = args[0] @@ -1396,7 +1416,7 @@ def immediate_execute_command(self, *args, **options): lambda: self._send_command_parse_response( conn, command_name, *args, **options ), - lambda error: self._disconnect_reset_raise(conn, error), + lambda error: self._disconnect_reset_raise_on_watching(conn, error), ) def pipeline_execute_command(self, *args, **options) -> "Pipeline": @@ -1534,15 +1554,19 @@ def load_scripts(self): if not exist: s.sha = immediate("SCRIPT LOAD", s.script) - def _disconnect_raise_reset( + def _disconnect_raise_on_watching( self, conn: AbstractConnection, error: Exception, ) -> None: """ - Close the connection, raise an exception if we were watching, - and raise an exception if retry_on_error is not set or the - error is not one of the specified error types. + Close the connection, raise an exception if we were watching. + + The supported exceptions are already checked in the + retry object so we don't need to do it here. + + After we disconnect the connection, it will try to reconnect and + do a health check as part of the send_command logic(on connection level). """ conn.disconnect() # if we were watching a variable, the watch is no longer valid @@ -1550,16 +1574,8 @@ def _disconnect_raise_reset( # indicates the user should retry this transaction. if self.watching: raise WatchError( - "A ConnectionError occurred on while watching one or more keys" + f"A {type(error).__name__} occurred while watching one or more keys" ) - # if retry_on_error is not set or the error is not one - # of the specified error types, raise it - if ( - conn.retry_on_error is None - or isinstance(error, tuple(conn.retry_on_error)) is False - ): - self.reset() - raise error def execute(self, raise_on_error: bool = True) -> List[Any]: """Execute all the commands in the current pipeline""" @@ -1583,7 +1599,7 @@ def execute(self, raise_on_error: bool = True) -> List[Any]: try: return conn.retry.call_with_retry( lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), + lambda error: self._disconnect_raise_on_watching(conn, error), ) finally: self.reset() diff --git a/redis/connection.py b/redis/connection.py index 08e980e866..ffb1e37ba3 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -1611,7 +1611,7 @@ def close(self) -> None: """Close the pool, disconnecting all connections""" self.disconnect() - def set_retry(self, retry: "Retry") -> None: + def set_retry(self, retry: Retry) -> None: self.connection_kwargs.update({"retry": retry}) for conn in self._available_connections: conn.retry = retry diff --git a/tests/test_asyncio/test_connection_pool.py b/tests/test_asyncio/test_connection_pool.py index 3d120e4ca7..09409e04a8 100644 --- a/tests/test_asyncio/test_connection_pool.py +++ b/tests/test_asyncio/test_connection_pool.py @@ -614,9 +614,9 @@ async def test_busy_loading_from_pipeline_immediate_command(self, r): "DEBUG", "ERROR", "LOADING fake message" ) pool = r.connection_pool - assert not pipe.connection - assert len(pool._available_connections) == 1 - assert not pool._available_connections[0]._reader + assert pipe.connection + assert pipe.connection in pool._in_use_connections + assert not pipe.connection._reader @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.8.8") diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 0ec77a4fff..d97c9063ac 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -563,9 +563,9 @@ def test_busy_loading_from_pipeline_immediate_command(self, r): with pytest.raises(redis.BusyLoadingError): pipe.immediate_execute_command("DEBUG", "ERROR", "LOADING fake message") pool = r.connection_pool - assert not pipe.connection - assert len(pool._available_connections) == 1 - assert not pool._available_connections[0]._sock + assert pipe.connection + assert pipe.connection in pool._in_use_connections + assert not pipe.connection._sock @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.8.8") diff --git a/tests/test_retry.py b/tests/test_retry.py index e1e4c414a4..cb001fbbd5 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -159,7 +159,7 @@ def test_client_retry_on_error_raise(self, request): def test_client_retry_on_error_different_error_raised(self, request): with patch.object(Redis, "parse_response") as parse_response: - parse_response.side_effect = TimeoutError() + parse_response.side_effect = OSError() retries = 3 r = _get_client( Redis, @@ -167,7 +167,7 @@ def test_client_retry_on_error_different_error_raised(self, request): retry_on_error=[ReadOnlyError], retry=Retry(NoBackoff(), retries), ) - with pytest.raises(TimeoutError): + with pytest.raises(OSError): try: r.get("foo") finally: