39
39
)
40
40
from redis .asyncio .lock import Lock
41
41
from redis .asyncio .retry import Retry
42
+ from redis .backoff import ExponentialWithJitterBackoff
42
43
from redis .client import (
43
44
EMPTY_RESPONSE ,
44
45
NEVER_DECODE ,
65
66
PubSubError ,
66
67
RedisError ,
67
68
ResponseError ,
68
- TimeoutError ,
69
69
WatchError ,
70
70
)
71
71
from redis .typing import ChannelT , EncodableT , KeyT
72
72
from redis .utils import (
73
73
HIREDIS_AVAILABLE ,
74
74
SSL_AVAILABLE ,
75
75
_set_info_logger ,
76
+ deprecated_args ,
76
77
deprecated_function ,
77
78
get_lib_version ,
78
79
safe_str ,
@@ -208,6 +209,11 @@ def from_pool(
208
209
client .auto_close_connection_pool = True
209
210
return client
210
211
212
+ @deprecated_args (
213
+ args_to_warn = ["retry_on_timeout" ],
214
+ reason = "TimeoutError is included by default." ,
215
+ version = "6.0.0" ,
216
+ )
211
217
def __init__ (
212
218
self ,
213
219
* ,
@@ -225,6 +231,9 @@ def __init__(
225
231
encoding_errors : str = "strict" ,
226
232
decode_responses : bool = False ,
227
233
retry_on_timeout : bool = False ,
234
+ retry : Retry = Retry (
235
+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ), retries = 3
236
+ ),
228
237
retry_on_error : Optional [list ] = None ,
229
238
ssl : bool = False ,
230
239
ssl_keyfile : Optional [str ] = None ,
@@ -242,7 +251,6 @@ def __init__(
242
251
lib_name : Optional [str ] = "redis-py" ,
243
252
lib_version : Optional [str ] = get_lib_version (),
244
253
username : Optional [str ] = None ,
245
- retry : Optional [Retry ] = None ,
246
254
auto_close_connection_pool : Optional [bool ] = None ,
247
255
redis_connect_func = None ,
248
256
credential_provider : Optional [CredentialProvider ] = None ,
@@ -251,10 +259,24 @@ def __init__(
251
259
):
252
260
"""
253
261
Initialize a new Redis client.
254
- To specify a retry policy for specific errors, first set
255
- `retry_on_error` to a list of the error/s to retry on, then set
256
- `retry` to a valid `Retry` object.
257
- To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
262
+
263
+ To specify a retry policy for specific errors, you have two options:
264
+
265
+ 1. Set the `retry_on_error` to a list of the error/s to retry on, and
266
+ you can also set `retry` to a valid `Retry` object(in case the default
267
+ one is not appropriate) - with this approach the retries will be triggered
268
+ on the default errors specified in the Retry object enriched with the
269
+ errors specified in `retry_on_error`.
270
+
271
+ 2. Define a `Retry` object with configured 'supported_errors' and set
272
+ it to the `retry` parameter - with this approach you completely redefine
273
+ the errors on which retries will happen.
274
+
275
+ `retry_on_timeout` is deprecated - please include the TimeoutError
276
+ either in the Retry object or in the `retry_on_error` list.
277
+
278
+ When 'connection_pool' is provided - the retry configuration of the
279
+ provided pool will be used.
258
280
"""
259
281
kwargs : Dict [str , Any ]
260
282
if event_dispatcher is None :
@@ -280,8 +302,6 @@ def __init__(
280
302
# Create internal connection pool, expected to be closed by Redis instance
281
303
if not retry_on_error :
282
304
retry_on_error = []
283
- if retry_on_timeout is True :
284
- retry_on_error .append (TimeoutError )
285
305
kwargs = {
286
306
"db" : db ,
287
307
"username" : username ,
@@ -291,7 +311,6 @@ def __init__(
291
311
"encoding" : encoding ,
292
312
"encoding_errors" : encoding_errors ,
293
313
"decode_responses" : decode_responses ,
294
- "retry_on_timeout" : retry_on_timeout ,
295
314
"retry_on_error" : retry_on_error ,
296
315
"retry" : copy .deepcopy (retry ),
297
316
"max_connections" : max_connections ,
@@ -403,10 +422,10 @@ def get_connection_kwargs(self):
403
422
"""Get the connection's key-word arguments"""
404
423
return self .connection_pool .connection_kwargs
405
424
406
- def get_retry (self ) -> Optional [" Retry" ]:
425
+ def get_retry (self ) -> Optional [Retry ]:
407
426
return self .get_connection_kwargs ().get ("retry" )
408
427
409
- def set_retry (self , retry : " Retry" ) -> None :
428
+ def set_retry (self , retry : Retry ) -> None :
410
429
self .get_connection_kwargs ().update ({"retry" : retry })
411
430
self .connection_pool .set_retry (retry )
412
431
@@ -633,18 +652,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option
633
652
await conn .send_command (* args )
634
653
return await self .parse_response (conn , command_name , ** options )
635
654
636
- async def _disconnect_raise (self , conn : Connection , error : Exception ):
655
+ async def _close_connection (self , conn : Connection ):
637
656
"""
638
- Close the connection and raise an exception
639
- if retry_on_error is not set or the error
640
- is not one of the specified error types
657
+ Close the connection before retrying.
658
+
659
+ The supported exceptions are already checked in the
660
+ retry object so we don't need to do it here.
661
+
662
+ After we disconnect the connection, it will try to reconnect and
663
+ do a health check as part of the send_command logic(on connection level).
641
664
"""
642
665
await conn .disconnect ()
643
- if (
644
- conn .retry_on_error is None
645
- or isinstance (error , tuple (conn .retry_on_error )) is False
646
- ):
647
- raise error
648
666
649
667
# COMMAND EXECUTION AND PROTOCOL PARSING
650
668
async def execute_command (self , * args , ** options ):
@@ -661,7 +679,7 @@ async def execute_command(self, *args, **options):
661
679
lambda : self ._send_command_parse_response (
662
680
conn , command_name , * args , ** options
663
681
),
664
- lambda error : self ._disconnect_raise (conn , error ),
682
+ lambda _ : self ._close_connection (conn ),
665
683
)
666
684
finally :
667
685
if self .single_connection_client :
@@ -929,19 +947,14 @@ async def connect(self):
929
947
)
930
948
)
931
949
932
- async def _disconnect_raise_connect (self , conn , error ):
950
+ async def _reconnect (self , conn ):
933
951
"""
934
952
Close the connection and raise an exception
935
953
if retry_on_error is not set or the error is not one
936
954
of the specified error types. Otherwise, try to
937
955
reconnect
938
956
"""
939
957
await conn .disconnect ()
940
- if (
941
- conn .retry_on_error is None
942
- or isinstance (error , tuple (conn .retry_on_error )) is False
943
- ):
944
- raise error
945
958
await conn .connect ()
946
959
947
960
async def _execute (self , conn , command , * args , ** kwargs ):
@@ -954,7 +967,7 @@ async def _execute(self, conn, command, *args, **kwargs):
954
967
"""
955
968
return await conn .retry .call_with_retry (
956
969
lambda : command (* args , ** kwargs ),
957
- lambda error : self ._disconnect_raise_connect (conn , error ),
970
+ lambda _ : self ._reconnect (conn ),
958
971
)
959
972
960
973
async def parse_response (self , block : bool = True , timeout : float = 0 ):
@@ -1245,7 +1258,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
1245
1258
in one transmission. This is convenient for batch processing, such as
1246
1259
saving all the values in a list to Redis.
1247
1260
1248
- All commands executed within a pipeline are wrapped with MULTI and EXEC
1261
+ All commands executed within a pipeline(when running in transactional mode,
1262
+ which is the default behavior) are wrapped with MULTI and EXEC
1249
1263
calls. This guarantees all commands executed in the pipeline will be
1250
1264
executed atomically.
1251
1265
@@ -1274,7 +1288,7 @@ def __init__(
1274
1288
self .shard_hint = shard_hint
1275
1289
self .watching = False
1276
1290
self .command_stack : CommandStackT = []
1277
- self .scripts : Set [" Script" ] = set ()
1291
+ self .scripts : Set [Script ] = set ()
1278
1292
self .explicit_transaction = False
1279
1293
1280
1294
async def __aenter__ (self : _RedisT ) -> _RedisT :
@@ -1346,36 +1360,36 @@ def execute_command(
1346
1360
return self .immediate_execute_command (* args , ** kwargs )
1347
1361
return self .pipeline_execute_command (* args , ** kwargs )
1348
1362
1349
- async def _disconnect_reset_raise (self , conn , error ):
1363
+ async def _disconnect_reset_raise_on_watching (
1364
+ self ,
1365
+ conn : Connection ,
1366
+ error : Exception ,
1367
+ ):
1350
1368
"""
1351
- Close the connection, reset watching state and
1352
- raise an exception if we were watching,
1353
- if retry_on_error is not set or the error is not one
1354
- of the specified error types.
1369
+ Close the connection reset watching state and
1370
+ raise an exception if we were watching.
1371
+
1372
+ The supported exceptions are already checked in the
1373
+ retry object so we don't need to do it here.
1374
+
1375
+ After we disconnect the connection, it will try to reconnect and
1376
+ do a health check as part of the send_command logic(on connection level).
1355
1377
"""
1356
1378
await conn .disconnect ()
1357
1379
# if we were already watching a variable, the watch is no longer
1358
1380
# valid since this connection has died. raise a WatchError, which
1359
1381
# indicates the user should retry this transaction.
1360
1382
if self .watching :
1361
- await self .aclose ()
1383
+ await self .reset ()
1362
1384
raise WatchError (
1363
- "A ConnectionError occurred on while watching one or more keys"
1385
+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
1364
1386
)
1365
- # if retry_on_error is not set or the error is not one
1366
- # of the specified error types, raise it
1367
- if (
1368
- conn .retry_on_error is None
1369
- or isinstance (error , tuple (conn .retry_on_error )) is False
1370
- ):
1371
- await self .aclose ()
1372
- raise
1373
1387
1374
1388
async def immediate_execute_command (self , * args , ** options ):
1375
1389
"""
1376
- Execute a command immediately, but don't auto-retry on a
1377
- ConnectionError if we're already WATCHing a variable. Used when
1378
- issuing WATCH or subsequent commands retrieving their values but before
1390
+ Execute a command immediately, but don't auto-retry on the supported
1391
+ errors for retry if we're already WATCHing a variable.
1392
+ Used when issuing WATCH or subsequent commands retrieving their values but before
1379
1393
MULTI is called.
1380
1394
"""
1381
1395
command_name = args [0 ]
@@ -1389,7 +1403,7 @@ async def immediate_execute_command(self, *args, **options):
1389
1403
lambda : self ._send_command_parse_response (
1390
1404
conn , command_name , * args , ** options
1391
1405
),
1392
- lambda error : self ._disconnect_reset_raise (conn , error ),
1406
+ lambda error : self ._disconnect_reset_raise_on_watching (conn , error ),
1393
1407
)
1394
1408
1395
1409
def pipeline_execute_command (self , * args , ** options ):
@@ -1544,28 +1558,24 @@ async def load_scripts(self):
1544
1558
if not exist :
1545
1559
s .sha = await immediate ("SCRIPT LOAD" , s .script )
1546
1560
1547
- async def _disconnect_raise_reset (self , conn : Connection , error : Exception ):
1561
+ async def _disconnect_raise_on_watching (self , conn : Connection , error : Exception ):
1548
1562
"""
1549
- Close the connection, raise an exception if we were watching,
1550
- and raise an exception if retry_on_error is not set or the
1551
- error is not one of the specified error types.
1563
+ Close the connection, raise an exception if we were watching.
1564
+
1565
+ The supported exceptions are already checked in the
1566
+ retry object so we don't need to do it here.
1567
+
1568
+ After we disconnect the connection, it will try to reconnect and
1569
+ do a health check as part of the send_command logic(on connection level).
1552
1570
"""
1553
1571
await conn .disconnect ()
1554
1572
# if we were watching a variable, the watch is no longer valid
1555
1573
# since this connection has died. raise a WatchError, which
1556
1574
# indicates the user should retry this transaction.
1557
1575
if self .watching :
1558
1576
raise WatchError (
1559
- "A ConnectionError occurred on while watching one or more keys"
1577
+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
1560
1578
)
1561
- # if retry_on_error is not set or the error is not one
1562
- # of the specified error types, raise it
1563
- if (
1564
- conn .retry_on_error is None
1565
- or isinstance (error , tuple (conn .retry_on_error )) is False
1566
- ):
1567
- await self .reset ()
1568
- raise
1569
1579
1570
1580
async def execute (self , raise_on_error : bool = True ) -> List [Any ]:
1571
1581
"""Execute all the commands in the current pipeline"""
@@ -1590,7 +1600,7 @@ async def execute(self, raise_on_error: bool = True) -> List[Any]:
1590
1600
try :
1591
1601
return await conn .retry .call_with_retry (
1592
1602
lambda : execute (conn , stack , raise_on_error ),
1593
- lambda error : self ._disconnect_raise_reset (conn , error ),
1603
+ lambda error : self ._disconnect_raise_on_watching (conn , error ),
1594
1604
)
1595
1605
finally :
1596
1606
await self .reset ()
0 commit comments