Skip to content

Commit 76d547e

Browse files
committed
Updating default retry strategy for standalone clients. 3 retries with ExponentialWithJitterBackoff become the default config.
1 parent 6b0abfb commit 76d547e

File tree

4 files changed

+62
-60
lines changed

4 files changed

+62
-60
lines changed

redis/client.py

+40-51
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
PubSubError,
5151
RedisError,
5252
ResponseError,
53-
TimeoutError,
5453
WatchError,
5554
)
5655
from redis.lock import Lock
@@ -601,18 +600,16 @@ def _send_command_parse_response(self, conn, command_name, *args, **options):
601600
conn.send_command(*args, **options)
602601
return self.parse_response(conn, command_name, **options)
603602

604-
def _conditional_disconnect(self, conn, error) -> None:
603+
def _close_connection(self, conn) -> None:
605604
"""
606-
Close the connection if the error is not TimeoutError.
605+
Close the connection before retrying.
606+
607607
The supported exceptions are already checked in the
608608
retry object so we don't need to do it here.
609+
609610
After we disconnect the connection, it will try to reconnect and
610611
do a health check as part of the send_command logic (on connection level).
611612
"""
612-
if isinstance(error, TimeoutError):
613-
# If the error is a TimeoutError, we don't want to
614-
# disconnect the connection. We want to retry the command.
615-
return
616613

617614
conn.disconnect()
618615

@@ -633,7 +630,7 @@ def _execute_command(self, *args, **options):
633630
lambda: self._send_command_parse_response(
634631
conn, command_name, *args, **options
635632
),
636-
lambda error: self._conditional_disconnect(conn, error),
633+
lambda _: self._close_connection(conn),
637634
)
638635
finally:
639636
if self._single_connection_client:
@@ -892,19 +889,14 @@ def clean_health_check_responses(self) -> None:
892889
)
893890
ttl -= 1
894891

895-
def _disconnect_raise_connect(self, conn, error) -> None:
892+
def _reconnect(self, conn) -> None:
896893
"""
897-
Close the connection and raise an exception
898-
if retry_on_error is not set or the error is not one
899-
of the specified error types. Otherwise, try to
900-
reconnect
894+
The supported exceptions are already checked in the
895+
retry object so we don't need to do it here.
896+
897+
In this error handler we are trying to reconnect to the server.
901898
"""
902899
conn.disconnect()
903-
if (
904-
conn.retry_on_error is None
905-
or isinstance(error, tuple(conn.retry_on_error)) is False
906-
):
907-
raise error
908900
conn.connect()
909901

910902
def _execute(self, conn, command, *args, **kwargs):
@@ -917,7 +909,7 @@ def _execute(self, conn, command, *args, **kwargs):
917909
"""
918910
return conn.retry.call_with_retry(
919911
lambda: command(*args, **kwargs),
920-
lambda error: self._disconnect_raise_connect(conn, error),
912+
lambda _: self._reconnect(conn),
921913
)
922914

923915
def parse_response(self, block=True, timeout=0):
@@ -1375,36 +1367,37 @@ def execute_command(self, *args, **kwargs):
13751367
return self.immediate_execute_command(*args, **kwargs)
13761368
return self.pipeline_execute_command(*args, **kwargs)
13771369

1378-
def _disconnect_reset_raise(self, conn, error) -> None:
1370+
def _disconnect_reset_raise_on_watching(
1371+
self,
1372+
conn: AbstractConnection,
1373+
error: Exception,
1374+
) -> None:
13791375
"""
1380-
Close the connection, reset watching state and
1381-
raise an exception if we were watching,
1382-
if retry_on_error is not set or the error is not one
1383-
of the specified error types.
1376+
Close the connection reset watching state and
1377+
raise an exception if we were watching.
1378+
1379+
The supported exceptions are already checked in the
1380+
retry object so we don't need to do it here.
1381+
1382+
After we disconnect the connection, it will try to reconnect and
1383+
do a health check as part of the send_command logic (on connection level).
13841384
"""
13851385
conn.disconnect()
1386+
13861387
# if we were already watching a variable, the watch is no longer
13871388
# valid since this connection has died. raise a WatchError, which
13881389
# indicates the user should retry this transaction.
13891390
if self.watching:
13901391
self.reset()
13911392
raise WatchError(
1392-
"A ConnectionError occurred on while watching one or more keys"
1393+
f"A {type(error).__name__} occurred while watching one or more keys"
13931394
)
1394-
# if retry_on_error is not set or the error is not one
1395-
# of the specified error types, raise it
1396-
if (
1397-
conn.retry_on_error is None
1398-
or isinstance(error, tuple(conn.retry_on_error)) is False
1399-
):
1400-
self.reset()
1401-
raise
14021395

14031396
def immediate_execute_command(self, *args, **options):
14041397
"""
1405-
Execute a command immediately, but don't auto-retry on a
1406-
ConnectionError if we're already WATCHing a variable. Used when
1407-
issuing WATCH or subsequent commands retrieving their values but before
1398+
Execute a command immediately, but don't auto-retry on the supported
1399+
errors for retry if we're already WATCHing a variable.
1400+
Used when issuing WATCH or subsequent commands retrieving their values but before
14081401
MULTI is called.
14091402
"""
14101403
command_name = args[0]
@@ -1418,7 +1411,7 @@ def immediate_execute_command(self, *args, **options):
14181411
lambda: self._send_command_parse_response(
14191412
conn, command_name, *args, **options
14201413
),
1421-
lambda error: self._disconnect_reset_raise(conn, error),
1414+
lambda error: self._disconnect_reset_raise_on_watching(conn, error),
14221415
)
14231416

14241417
def pipeline_execute_command(self, *args, **options) -> "Pipeline":
@@ -1556,32 +1549,28 @@ def load_scripts(self):
15561549
if not exist:
15571550
s.sha = immediate("SCRIPT LOAD", s.script)
15581551

1559-
def _disconnect_raise_reset(
1552+
def _disconnect_raise_on_watching(
15601553
self,
15611554
conn: AbstractConnection,
15621555
error: Exception,
15631556
) -> None:
15641557
"""
1565-
Close the connection, raise an exception if we were watching,
1566-
and raise an exception if retry_on_error is not set or the
1567-
error is not one of the specified error types.
1558+
Close the connection, raise an exception if we were watching.
1559+
1560+
The supported exceptions are already checked in the
1561+
retry object so we don't need to do it here.
1562+
1563+
After we disconnect the connection, it will try to reconnect and
1564+
do a health check as part of the send_command logic (on connection level).
15681565
"""
15691566
conn.disconnect()
15701567
# if we were watching a variable, the watch is no longer valid
15711568
# since this connection has died. raise a WatchError, which
15721569
# indicates the user should retry this transaction.
15731570
if self.watching:
15741571
raise WatchError(
1575-
"A ConnectionError occurred on while watching one or more keys"
1572+
f"A {type(error).__name__} occurred while watching one or more keys"
15761573
)
1577-
# if retry_on_error is not set or the error is not one
1578-
# of the specified error types, raise it
1579-
if (
1580-
conn.retry_on_error is None
1581-
or isinstance(error, tuple(conn.retry_on_error)) is False
1582-
):
1583-
self.reset()
1584-
raise error
15851574

15861575
def execute(self, raise_on_error: bool = True) -> List[Any]:
15871576
"""Execute all the commands in the current pipeline"""
@@ -1605,7 +1594,7 @@ def execute(self, raise_on_error: bool = True) -> List[Any]:
16051594
try:
16061595
return conn.retry.call_with_retry(
16071596
lambda: execute(conn, stack, raise_on_error),
1608-
lambda error: self._disconnect_raise_reset(conn, error),
1597+
lambda error: self._disconnect_raise_on_watching(conn, error),
16091598
)
16101599
finally:
16111600
self.reset()

redis/cluster.py

+17-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from redis._parsers import CommandsParser, Encoder
1111
from redis._parsers.helpers import parse_scan
12-
from redis.backoff import default_backoff
12+
from redis.backoff import NoBackoff, default_backoff
1313
from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
1414
from redis.client import CaseInsensitiveDict, PubSub, Redis
1515
from redis.commands import READ_COMMANDS, RedisClusterCommands
@@ -431,7 +431,7 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
431431
# Choose a primary if the cluster contains different primaries
432432
self.nodes_manager.default_node = random.choice(primaries)
433433
else:
434-
# Otherwise, hoose a primary if the cluster contains different primaries
434+
# Otherwise, choose a primary if the cluster contains different primaries
435435
replicas = [node for node in self.get_replicas() if node != curr_node]
436436
if replicas:
437437
self.nodes_manager.default_node = random.choice(replicas)
@@ -1322,8 +1322,12 @@ def __eq__(self, obj):
13221322
return isinstance(obj, ClusterNode) and obj.name == self.name
13231323

13241324
def __del__(self):
1325-
if self.redis_connection is not None:
1326-
self.redis_connection.close()
1325+
try:
1326+
if self.redis_connection is not None:
1327+
self.redis_connection.close()
1328+
except Exception:
1329+
# Ignore errors when closing the connection
1330+
pass
13271331

13281332

13291333
class LoadBalancingStrategy(Enum):
@@ -1574,17 +1578,26 @@ def create_redis_connections(self, nodes):
15741578
)
15751579

15761580
def create_redis_node(self, host, port, **kwargs):
1581+
# We are configuring the connection pool not to retry connections
1582+
# to avoid retrying connections to nodes that are not reachable
1583+
# and to avoid blocking the connection pool.
1584+
# The retries will be handled on cluster client level
1585+
# where we will have proper handling of the cluster topology
1586+
retry = Retry(backoff=NoBackoff(), retries=0)
1587+
15771588
if self.from_url:
15781589
# Create a redis node with a costumed connection pool
15791590
kwargs.update({"host": host})
15801591
kwargs.update({"port": port})
15811592
kwargs.update({"cache": self._cache})
1593+
kwargs.update({"retry": retry})
15821594
r = Redis(connection_pool=self.connection_pool_class(**kwargs))
15831595
else:
15841596
r = Redis(
15851597
host=host,
15861598
port=port,
15871599
cache=self._cache,
1600+
retry=retry,
15881601
**kwargs,
15891602
)
15901603
return r

tests/test_connection_pool.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -563,9 +563,9 @@ def test_busy_loading_from_pipeline_immediate_command(self, r):
563563
with pytest.raises(redis.BusyLoadingError):
564564
pipe.immediate_execute_command("DEBUG", "ERROR", "LOADING fake message")
565565
pool = r.connection_pool
566-
assert not pipe.connection
567-
assert len(pool._available_connections) == 1
568-
assert not pool._available_connections[0]._sock
566+
assert pipe.connection
567+
assert pipe.connection in pool._in_use_connections
568+
assert not pipe.connection._sock
569569

570570
@pytest.mark.onlynoncluster
571571
@skip_if_server_version_lt("2.8.8")

tests/test_retry.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,15 @@ def test_client_retry_on_error_raise(self, request):
159159

160160
def test_client_retry_on_error_different_error_raised(self, request):
161161
with patch.object(Redis, "parse_response") as parse_response:
162-
parse_response.side_effect = TimeoutError()
162+
parse_response.side_effect = OSError()
163163
retries = 3
164164
r = _get_client(
165165
Redis,
166166
request,
167167
retry_on_error=[ReadOnlyError],
168168
retry=Retry(NoBackoff(), retries),
169169
)
170-
with pytest.raises(TimeoutError):
170+
with pytest.raises(OSError):
171171
try:
172172
r.get("foo")
173173
finally:

0 commit comments

Comments
 (0)