Updating default retry strategy for standalone clients. 3 retries with ExponentialWithJitterBackoff become the default config. #3614

Open · wants to merge 2 commits into master
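In practical terms, constructing a standalone client with no retry arguments now behaves as if it had been configured like the sketch below. This is only an illustration of the new default from the diff; the host/port values are placeholders:

```python
import asyncio

from redis.asyncio import Redis
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialWithJitterBackoff


async def main() -> None:
    # Explicitly spelling out what Redis() now uses by default:
    # 3 retries with exponential backoff plus jitter (base 1s, cap 10s).
    client = Redis(
        host="localhost",
        port=6379,
        retry=Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3),
    )
    print(await client.ping())
    await client.aclose()


asyncio.run(main())
```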
redis/asyncio/client.py (136 changes: 73 additions & 63 deletions)
@@ -39,6 +39,7 @@
)
from redis.asyncio.lock import Lock
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialWithJitterBackoff
from redis.client import (
EMPTY_RESPONSE,
NEVER_DECODE,
@@ -65,14 +66,14 @@
PubSubError,
RedisError,
ResponseError,
TimeoutError,
WatchError,
)
from redis.typing import ChannelT, EncodableT, KeyT
from redis.utils import (
HIREDIS_AVAILABLE,
SSL_AVAILABLE,
_set_info_logger,
deprecated_args,
deprecated_function,
get_lib_version,
safe_str,
@@ -208,6 +209,11 @@ def from_pool(
client.auto_close_connection_pool = True
return client

@deprecated_args(
args_to_warn=["retry_on_timeout"],
reason="TimeoutError is included by default.",
version="6.0.0",
)
def __init__(
self,
*,
@@ -225,6 +231,9 @@ def __init__(
encoding_errors: str = "strict",
decode_responses: bool = False,
retry_on_timeout: bool = False,
retry: Retry = Retry(
backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
),
retry_on_error: Optional[list] = None,
ssl: bool = False,
ssl_keyfile: Optional[str] = None,
@@ -242,7 +251,6 @@ def __init__(
lib_name: Optional[str] = "redis-py",
lib_version: Optional[str] = get_lib_version(),
username: Optional[str] = None,
retry: Optional[Retry] = None,
auto_close_connection_pool: Optional[bool] = None,
redis_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
@@ -251,10 +259,24 @@
):
"""
Initialize a new Redis client.
To specify a retry policy for specific errors, first set
`retry_on_error` to a list of the error/s to retry on, then set
`retry` to a valid `Retry` object.
To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.

To specify a retry policy for specific errors, you have two options:

1. Set `retry_on_error` to a list of the error(s) to retry on, and
optionally set `retry` to a valid `Retry` object (in case the default
one is not appropriate) - with this approach the retries are triggered
on the default errors specified in the `Retry` object, enriched with
the errors specified in `retry_on_error`.

2. Define a `Retry` object with `supported_errors` configured and pass
it as the `retry` parameter - with this approach you completely
redefine the errors on which retries will happen.

`retry_on_timeout` is deprecated - please include `TimeoutError`
either in the `Retry` object or in the `retry_on_error` list.

When `connection_pool` is provided, the retry configuration of the
provided pool is used.
"""
kwargs: Dict[str, Any]
if event_dispatcher is None:
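As a rough illustration of the two options described in the new docstring: `supported_errors` is an existing parameter of the `Retry` constructor in redis-py, the rest follows the names in this diff, and the backoff/retry values are arbitrary placeholders:

```python
from redis.asyncio import Redis
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialWithJitterBackoff
from redis.exceptions import BusyLoadingError, ConnectionError, TimeoutError

# Option 1: keep the Retry object's default supported errors and
# enrich them with extra error types via retry_on_error.
client_one = Redis(
    retry=Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=5),
    retry_on_error=[BusyLoadingError],
)

# Option 2: redefine the retryable errors entirely by configuring
# supported_errors on the Retry object itself.
client_two = Redis(
    retry=Retry(
        backoff=ExponentialWithJitterBackoff(base=1, cap=10),
        retries=3,
        supported_errors=(ConnectionError, TimeoutError),
    ),
)
```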
@@ -280,8 +302,6 @@ def __init__(
# Create internal connection pool, expected to be closed by Redis instance
if not retry_on_error:
retry_on_error = []
if retry_on_timeout is True:
retry_on_error.append(TimeoutError)
kwargs = {
"db": db,
"username": username,
@@ -291,7 +311,6 @@
"encoding": encoding,
"encoding_errors": encoding_errors,
"decode_responses": decode_responses,
"retry_on_timeout": retry_on_timeout,
"retry_on_error": retry_on_error,
"retry": copy.deepcopy(retry),
"max_connections": max_connections,
@@ -403,10 +422,10 @@ def get_connection_kwargs(self):
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs

def get_retry(self) -> Optional["Retry"]:
def get_retry(self) -> Optional[Retry]:
return self.get_connection_kwargs().get("retry")

def set_retry(self, retry: "Retry") -> None:
def set_retry(self, retry: Retry) -> None:
self.get_connection_kwargs().update({"retry": retry})
self.connection_pool.set_retry(retry)

@@ -633,18 +652,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option
await conn.send_command(*args)
return await self.parse_response(conn, command_name, **options)

async def _disconnect_raise(self, conn: Connection, error: Exception):
async def _close_connection(self, conn: Connection):
"""
Close the connection and raise an exception
if retry_on_error is not set or the error
is not one of the specified error types
Close the connection before retrying.

The supported exceptions are already checked in the
retry object, so we don't need to do it here.

After we disconnect the connection, it will try to reconnect and
do a health check as part of the send_command logic (on the connection level).
"""
await conn.disconnect()
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error

# COMMAND EXECUTION AND PROTOCOL PARSING
async def execute_command(self, *args, **options):
@@ -661,7 +679,7 @@ async def execute_command(self, *args, **options):
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
lambda error: self._disconnect_raise(conn, error),
lambda _: self._close_connection(conn),
)
finally:
if self.single_connection_client:
@@ -929,19 +947,14 @@ async def connect(self):
)
)

async def _disconnect_raise_connect(self, conn, error):
async def _reconnect(self, conn):
"""
Close the connection and raise an exception
if retry_on_error is not set or the error is not one
of the specified error types. Otherwise, try to
reconnect
"""
await conn.disconnect()
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
raise error
await conn.connect()

async def _execute(self, conn, command, *args, **kwargs):
@@ -954,7 +967,7 @@ async def _execute(self, conn, command, *args, **kwargs):
"""
return await conn.retry.call_with_retry(
lambda: command(*args, **kwargs),
lambda error: self._disconnect_raise_connect(conn, error),
lambda _: self._reconnect(conn),
)

async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -1245,7 +1258,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
in one transmission. This is convenient for batch processing, such as
saving all the values in a list to Redis.

All commands executed within a pipeline are wrapped with MULTI and EXEC
All commands executed within a pipeline (when running in transactional mode,
which is the default behavior) are wrapped with MULTI and EXEC
calls. This guarantees all commands executed in the pipeline will be
executed atomically.
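Since the docstring now calls out transactional mode explicitly, here is a quick usage sketch of the distinction (not part of this diff; `transaction` is the standard redis-py pipeline flag, and the key name is a placeholder):

```python
import asyncio

from redis.asyncio import Redis


async def main() -> None:
    client = Redis()

    # Default: transactional pipeline, commands wrapped in MULTI/EXEC.
    async with client.pipeline() as pipe:
        pipe.set("counter", 1)
        pipe.incr("counter")
        print(await pipe.execute())

    # Non-transactional: still one batched round trip,
    # but without MULTI/EXEC atomicity.
    async with client.pipeline(transaction=False) as pipe:
        pipe.get("counter")
        print(await pipe.execute())

    await client.aclose()


asyncio.run(main())
```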

@@ -1274,7 +1288,7 @@ def __init__(
self.shard_hint = shard_hint
self.watching = False
self.command_stack: CommandStackT = []
self.scripts: Set["Script"] = set()
self.scripts: Set[Script] = set()
self.explicit_transaction = False

async def __aenter__(self: _RedisT) -> _RedisT:
@@ -1346,36 +1360,36 @@ def execute_command(
return self.immediate_execute_command(*args, **kwargs)
return self.pipeline_execute_command(*args, **kwargs)

async def _disconnect_reset_raise(self, conn, error):
async def _disconnect_reset_raise_on_watching(
self,
conn: Connection,
error: Exception,
):
"""
Close the connection, reset watching state and
raise an exception if we were watching,
if retry_on_error is not set or the error is not one
of the specified error types.
Close the connection, reset the watching state, and
raise an exception if we were watching.

The supported exceptions are already checked in the
retry object, so we don't need to do it here.

After we disconnect the connection, it will try to reconnect and
do a health check as part of the send_command logic (on the connection level).
"""
await conn.disconnect()
# if we were already watching a variable, the watch is no longer
# valid since this connection has died. raise a WatchError, which
# indicates the user should retry this transaction.
if self.watching:
await self.aclose()
await self.reset()
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
f"A {type(error).__name__} occurred while watching one or more keys"
)
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.aclose()
raise

async def immediate_execute_command(self, *args, **options):
"""
Execute a command immediately, but don't auto-retry on a
ConnectionError if we're already WATCHing a variable. Used when
issuing WATCH or subsequent commands retrieving their values but before
Execute a command immediately, but don't auto-retry on the errors
supported by the retry policy if we're already WATCHing a variable.
Used when issuing WATCH or subsequent commands retrieving their values, but before
MULTI is called.
"""
command_name = args[0]
@@ -1389,7 +1403,7 @@ async def immediate_execute_command(self, *args, **options):
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
lambda error: self._disconnect_reset_raise(conn, error),
lambda error: self._disconnect_reset_raise_on_watching(conn, error),
)

def pipeline_execute_command(self, *args, **options):
@@ -1544,28 +1558,24 @@ async def load_scripts(self):
if not exist:
s.sha = await immediate("SCRIPT LOAD", s.script)

async def _disconnect_raise_reset(self, conn: Connection, error: Exception):
async def _disconnect_raise_on_watching(self, conn: Connection, error: Exception):
"""
Close the connection, raise an exception if we were watching,
and raise an exception if retry_on_error is not set or the
error is not one of the specified error types.
Close the connection and raise an exception if we were watching.

The supported exceptions are already checked in the
retry object, so we don't need to do it here.

After we disconnect the connection, it will try to reconnect and
do a health check as part of the send_command logic (on the connection level).
"""
await conn.disconnect()
# if we were watching a variable, the watch is no longer valid
# since this connection has died. raise a WatchError, which
# indicates the user should retry this transaction.
if self.watching:
raise WatchError(
"A ConnectionError occurred on while watching one or more keys"
f"A {type(error).__name__} occurred while watching one or more keys"
)
# if retry_on_error is not set or the error is not one
# of the specified error types, raise it
if (
conn.retry_on_error is None
or isinstance(error, tuple(conn.retry_on_error)) is False
):
await self.reset()
raise

async def execute(self, raise_on_error: bool = True) -> List[Any]:
"""Execute all the commands in the current pipeline"""
@@ -1590,7 +1600,7 @@ async def execute(self, raise_on_error: bool = True) -> List[Any]:
try:
return await conn.retry.call_with_retry(
lambda: execute(conn, stack, raise_on_error),
lambda error: self._disconnect_raise_reset(conn, error),
lambda error: self._disconnect_raise_on_watching(conn, error),
)
finally:
await self.reset()
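For readers unfamiliar with the watching flow these helpers support, the caller-side retry loop typically looks roughly like this. This is illustrative only: `transfer`, the key names, and the funds check are made up, and only the WatchError message wording comes from this diff:

```python
from redis.asyncio import Redis
from redis.exceptions import WatchError


async def transfer(client: Redis, src: str, dst: str, amount: int) -> None:
    async with client.pipeline() as pipe:
        while True:
            try:
                # WATCH puts the pipeline in immediate-execution mode.
                await pipe.watch(src)
                balance = int(await pipe.get(src) or 0)
                if balance < amount:
                    await pipe.unwatch()
                    raise ValueError("insufficient funds")
                # MULTI switches back to buffered, transactional mode.
                pipe.multi()
                pipe.decrby(src, amount)
                pipe.incrby(dst, amount)
                await pipe.execute()
                break
            except WatchError:
                # A watched key changed before EXEC (or the connection died
                # while watching); start the transaction over.
                continue
```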
redis/asyncio/retry.py (12 changes: 12 additions & 0 deletions)
@@ -43,6 +43,18 @@ def update_supported_errors(self, specified_errors: list):
set(self._supported_errors + tuple(specified_errors))
)

def get_retries_count(self) -> int:
"""
Returns the current retry count
"""
return self._retries

def update_retries_count(self, retries: int) -> None:
"""
Updates the retry count to the specified value
"""
self._retries = retries

async def call_with_retry(
self, do: Callable[[], Awaitable[T]], fail: Callable[[RedisError], Any]
) -> T:
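A small sketch of how the new accessors could be used together with the existing `get_retry`/`set_retry` client helpers shown earlier in this diff (client construction details are placeholders):

```python
import asyncio

from redis.asyncio import Redis


async def main() -> None:
    client = Redis()
    retry = client.get_retry()
    if retry is not None:
        # New accessors from this diff: read the configured retry count...
        print("configured retries:", retry.get_retries_count())
        # ...and raise the retry budget, e.g. for a flakier network.
        retry.update_retries_count(5)
        client.set_retry(retry)
    await client.aclose()


asyncio.run(main())
```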