From 9014e0e24344177098ce2f7c3d4ffa59d9f147ab Mon Sep 17 00:00:00 2001 From: zizare Date: Wed, 19 Jun 2024 23:26:22 +0200 Subject: [PATCH 1/3] Use error handler for reactive cache aspect --- .../cache/interceptor/CacheAspectSupport.java | 27 ++++- .../annotation/ReactiveCachingTests.java | 106 ++++++++++++++++-- 2 files changed, 121 insertions(+), 12 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index b77b8ca6f80d..a09540d1fd33 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -506,7 +506,10 @@ private Object findInCaches(CacheOperationContext context, Object key, for (Cache cache : context.getCaches()) { if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { - CompletableFuture result = cache.retrieve(key); + CompletableFuture result = Optional.ofNullable(cache.retrieve(key)).map(c -> c.exceptionally(ex -> { + getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); + return null; + })).orElse(null); if (result != null) { return result.thenCompose(value -> (CompletableFuture) evaluate( (value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null), @@ -1136,12 +1139,30 @@ public Object findInCaches(CacheOperationContext context, Cache cache, Object ke if (adapter.isMultiValue()) { return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture)) .switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts))) - .flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))); + .flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts)) + .onErrorResume(RuntimeException.class, ex -> { + try { + getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); + return Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)); + } + catch (RuntimeException exception) { + return Flux.error(exception); + } + })); } else { return adapter.fromPublisher(Mono.fromFuture(cachedFuture) .switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts))) - .flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))); + .flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)) + .onErrorResume(RuntimeException.class, ex -> { + try { + getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); + return Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)); + } + catch (RuntimeException exception) { + return Mono.error(exception); + } + })); } } return NOT_HANDLED; diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index 06130951af90..5820e266cf88 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -18,8 +18,10 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import reactor.core.publisher.Flux; @@ -29,12 +31,15 @@ import org.springframework.cache.CacheManager; import org.springframework.cache.concurrent.ConcurrentMapCache; import org.springframework.cache.concurrent.ConcurrentMapCacheManager; +import org.springframework.cache.interceptor.CacheErrorHandler; +import org.springframework.cache.interceptor.LoggingCacheErrorHandler; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; /** * Tests for annotation-based caching methods that use reactive operators. @@ -46,10 +51,11 @@ class ReactiveCachingTests { @ParameterizedTest - @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, - EarlyCacheHitDeterminationWithoutNullValuesConfig.class, - LateCacheHitDeterminationConfig.class, - LateCacheHitDeterminationWithValueWrapperConfig.class}) + @ValueSource( + classes = {EarlyCacheHitDeterminationConfig.class, + EarlyCacheHitDeterminationWithoutNullValuesConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) void cacheHitDetermination(Class configClass) { AnnotationConfigApplicationContext ctx = @@ -113,11 +119,57 @@ void cacheHitDetermination(Class configClass) { ctx.close(); } + @Test + void cacheErrorHandlerWithLoggingCacheErrorHandler() { + AnnotationConfigApplicationContext ctx = + new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class, ErrorHandlerCachingConfiguration.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Object key = new Object(); + Long r1 = service.cacheFuture(key).join(); + + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFuture").isEqualTo(0L); + + key = new Object(); + + r1 = service.cacheMono(key).block(); + + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheMono").isEqualTo(1L); + + key = new Object(); + + r1 = service.cacheFlux(key).blockFirst(); + + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFlux blockFirst").isEqualTo(2L); + } + + @Test + void cacheErrorHandlerWithSimpleCacheErrorHandler() { + AnnotationConfigApplicationContext ctx = + new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Throwable completableFuturThrowable = catchThrowable(() -> service.cacheFuture(new Object()).join()); + assertThat(completableFuturThrowable).isInstanceOf(CompletionException.class) + .extracting(Throwable::getCause) + .isInstanceOf(UnsupportedOperationException.class); + + Throwable monoThrowable = catchThrowable(() -> service.cacheMono(new Object()).block()); + assertThat(monoThrowable).isInstanceOf(UnsupportedOperationException.class); + + Throwable fluxThrowable = catchThrowable(() -> service.cacheFlux(new Object()).blockFirst()); + assertThat(fluxThrowable).isInstanceOf(UnsupportedOperationException.class); + } + @ParameterizedTest - @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, - EarlyCacheHitDeterminationWithoutNullValuesConfig.class, - LateCacheHitDeterminationConfig.class, - LateCacheHitDeterminationWithValueWrapperConfig.class}) + @ValueSource( + classes = {EarlyCacheHitDeterminationConfig.class, + EarlyCacheHitDeterminationWithoutNullValuesConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) void fluxCacheDoesntDependOnFirstRequest(Class configClass) { AnnotationConfigApplicationContext ctx = @@ -139,7 +191,6 @@ void fluxCacheDoesntDependOnFirstRequest(Class configClass) { ctx.close(); } - @CacheConfig(cacheNames = "first") static class ReactiveCacheableService { @@ -242,4 +293,41 @@ public void put(Object key, @Nullable Object value) { } } + @Configuration + static class ErrorHandlerCachingConfiguration implements CachingConfigurer { + + @Bean + @Override + public CacheErrorHandler errorHandler() { + return new LoggingCacheErrorHandler(); + } + } + + @Configuration(proxyBeanMethods = false) + @EnableCaching + static class ExceptionCacheManager { + + @Bean + CacheManager cacheManager() { + return new ConcurrentMapCacheManager("first") { + @Override + protected Cache createConcurrentMapCache(String name) { + return new ConcurrentMapCache(name, isAllowNullValues()) { + @Override + public CompletableFuture retrieve(Object key) { + return CompletableFuture.supplyAsync(() -> { + throw new UnsupportedOperationException("Test exception on retrieve"); + }); + } + + @Override + public void put(Object key, @Nullable Object value) { + throw new UnsupportedOperationException("Test exception on put"); + } + }; + } + }; + } + } + } From ea7ea2a1a1cd7c4ee25da79e9a074793e4bdba6b Mon Sep 17 00:00:00 2001 From: zizare Date: Wed, 26 Jun 2024 10:56:20 +0200 Subject: [PATCH 2/3] remove unnecessary null check --- .../cache/interceptor/CacheAspectSupport.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index a09540d1fd33..1e47fa0c25e8 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -506,12 +506,12 @@ private Object findInCaches(CacheOperationContext context, Object key, for (Cache cache : context.getCaches()) { if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { - CompletableFuture result = Optional.ofNullable(cache.retrieve(key)).map(c -> c.exceptionally(ex -> { - getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); - return null; - })).orElse(null); + CompletableFuture result = cache.retrieve(key); if (result != null) { - return result.thenCompose(value -> (CompletableFuture) evaluate( + return result.exceptionally(ex -> { + getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); + return null; + }).thenCompose(value -> (CompletableFuture) evaluate( (value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null), invoker, method, contexts)); } From 7acbe17bfed99419edcc076f7821c21cdba26061 Mon Sep 17 00:00:00 2001 From: zizare Date: Tue, 2 Jul 2024 08:11:19 +0200 Subject: [PATCH 3/3] remove unnecessary defer --- .../cache/interceptor/CacheAspectSupport.java | 6 +++--- .../cache/annotation/ReactiveCachingTests.java | 18 ++++++++---------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index 1e47fa0c25e8..ff01be10870b 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -506,7 +506,7 @@ private Object findInCaches(CacheOperationContext context, Object key, for (Cache cache : context.getCaches()) { if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { - CompletableFuture result = cache.retrieve(key); + CompletableFuture result = cache.retrieve(key); if (result != null) { return result.exceptionally(ex -> { getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); @@ -1143,7 +1143,7 @@ public Object findInCaches(CacheOperationContext context, Cache cache, Object ke .onErrorResume(RuntimeException.class, ex -> { try { getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); - return Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)); + return evaluate(null, invoker, method, contexts); } catch (RuntimeException exception) { return Flux.error(exception); @@ -1157,7 +1157,7 @@ public Object findInCaches(CacheOperationContext context, Cache cache, Object ke .onErrorResume(RuntimeException.class, ex -> { try { getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); - return Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)); + return evaluate(null, invoker, method, contexts); } catch (RuntimeException exception) { return Mono.error(exception); diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index 5820e266cf88..6adf6a2ebf0e 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -51,11 +51,10 @@ class ReactiveCachingTests { @ParameterizedTest - @ValueSource( - classes = {EarlyCacheHitDeterminationConfig.class, - EarlyCacheHitDeterminationWithoutNullValuesConfig.class, - LateCacheHitDeterminationConfig.class, - LateCacheHitDeterminationWithValueWrapperConfig.class}) + @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, + EarlyCacheHitDeterminationWithoutNullValuesConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) void cacheHitDetermination(Class configClass) { AnnotationConfigApplicationContext ctx = @@ -165,11 +164,10 @@ void cacheErrorHandlerWithSimpleCacheErrorHandler() { } @ParameterizedTest - @ValueSource( - classes = {EarlyCacheHitDeterminationConfig.class, - EarlyCacheHitDeterminationWithoutNullValuesConfig.class, - LateCacheHitDeterminationConfig.class, - LateCacheHitDeterminationWithValueWrapperConfig.class}) + @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, + EarlyCacheHitDeterminationWithoutNullValuesConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) void fluxCacheDoesntDependOnFirstRequest(Class configClass) { AnnotationConfigApplicationContext ctx =