Skip to content

Commit 972784f

Browse files
authored
Add BiConsumer to spring cloud stream plugin (#1077)
* Add BiConsumer to spring cloud stream plugin
1 parent f3dbccb commit 972784f

File tree

8 files changed

+99
-17
lines changed

8 files changed

+99
-17
lines changed

springwolf-core/src/main/java/io/github/springwolf/core/asyncapi/scanners/beans/DefaultBeanMethodsScanner.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package io.github.springwolf.core.asyncapi.scanners.beans;
33

4-
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ConfigurationClassScanner;
4+
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
55
import io.github.springwolf.core.asyncapi.scanners.common.annotation.AnnotationUtil;
66
import lombok.RequiredArgsConstructor;
77
import org.springframework.context.annotation.Bean;
@@ -16,11 +16,11 @@
1616
@RequiredArgsConstructor
1717
public class DefaultBeanMethodsScanner implements BeanMethodsScanner {
1818

19-
private final ConfigurationClassScanner configurationClassScanner;
19+
private final ComponentClassScanner componentClassScanner;
2020

2121
@Override
2222
public Set<Method> getBeanMethods() {
23-
return configurationClassScanner.scan().stream()
23+
return componentClassScanner.scan().stream()
2424
.map(Class::getDeclaredMethods)
2525
.map(Arrays::asList)
2626
.flatMap(List::stream)

springwolf-core/src/main/java/io/github/springwolf/core/configuration/SpringwolfScannerConfiguration.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.github.springwolf.core.asyncapi.scanners.channels.annotations.AsyncAnnotationMethodLevelChannelsScanner;
1717
import io.github.springwolf.core.asyncapi.scanners.classes.SpringwolfClassScanner;
1818
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
19-
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ConfigurationClassScanner;
2019
import io.github.springwolf.core.asyncapi.scanners.common.AsyncAnnotationProvider;
2120
import io.github.springwolf.core.asyncapi.scanners.common.channel.AsyncAnnotationChannelService;
2221
import io.github.springwolf.core.asyncapi.scanners.common.message.AsyncAnnotationMessageService;
@@ -55,14 +54,8 @@ public ComponentClassScanner componentClassScanner(
5554

5655
@Bean
5756
@ConditionalOnMissingBean
58-
public ConfigurationClassScanner configurationClassScanner(ComponentClassScanner componentClassScanner) {
59-
return new ConfigurationClassScanner(componentClassScanner);
60-
}
61-
62-
@Bean
63-
@ConditionalOnMissingBean
64-
public BeanMethodsScanner beanMethodsScanner(ConfigurationClassScanner configurationClassScanner) {
65-
return new DefaultBeanMethodsScanner(configurationClassScanner);
57+
public BeanMethodsScanner beanMethodsScanner(ComponentClassScanner componentClassScanner) {
58+
return new DefaultBeanMethodsScanner(componentClassScanner);
6659
}
6760

6861
@Bean

springwolf-core/src/test/java/io/github/springwolf/core/asyncapi/scanners/beans/DefaultBeanMethodsScannerIntegrationTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package io.github.springwolf.core.asyncapi.scanners.beans;
33

4-
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ConfigurationClassScanner;
4+
import io.github.springwolf.core.asyncapi.scanners.classes.spring.ComponentClassScanner;
55
import org.junit.jupiter.api.Test;
66
import org.junit.jupiter.api.extension.ExtendWith;
77
import org.springframework.beans.factory.annotation.Autowired;
@@ -26,11 +26,11 @@ class DefaultBeanMethodsScannerIntegrationTest {
2626
private DefaultBeanMethodsScanner beanMethodsScanner;
2727

2828
@MockBean
29-
private ConfigurationClassScanner configurationClassScanner;
29+
private ComponentClassScanner componentClassScanner;
3030

3131
@Test
3232
void name() {
33-
when(configurationClassScanner.scan()).thenReturn(Set.of(ConfigurationClass.class));
33+
when(componentClassScanner.scan()).thenReturn(Set.of(ConfigurationClass.class));
3434

3535
Set<String> beanMethods = beanMethodsScanner.getBeanMethods().stream()
3636
.map(Method::getName)

springwolf-examples/springwolf-cloud-stream-example/src/main/java/io/github/springwolf/examples/cloudstream/configuration/CloudstreamConfiguration.java

+8
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.springframework.context.annotation.Bean;
1313
import org.springframework.context.annotation.Configuration;
1414

15+
import java.util.Map;
16+
import java.util.function.BiConsumer;
1517
import java.util.function.Consumer;
1618
import java.util.function.Function;
1719

@@ -33,6 +35,12 @@ public Consumer<AnotherPayloadDto> consumerMethod() {
3335
return input -> log.info("Received new message in another-topic: {}", input.toString());
3436
}
3537

38+
@Bean
39+
public BiConsumer<AnotherPayloadDto, Map<String, Object>> biConsumerMethod() {
40+
return (input, headers) ->
41+
log.info("Received new message in biconsumer-topic: {}. Headers {}.", input.toString(), headers);
42+
}
43+
3644
@GooglePubSubAsyncChannelBinding(
3745
schemaSettings = @GooglePubSubAsyncSchemaSetting(encoding = "BINARY", name = "project/test"))
3846
@GooglePubSubAsyncMessageBinding(schema = @GooglePubSubAsyncMessageSchema(name = "project/test"))

springwolf-examples/springwolf-cloud-stream-example/src/main/resources/application.properties

+4-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ spring.cloud.stream.default-binder=kafka
1010
spring.cloud.stream.binders.kafka.type=kafka
1111
spring.cloud.stream.binders.kafka.environment.spring.kafka.bootstrap-servers=${spring.kafka.bootstrap-servers}
1212
spring.cloud.stream.kafka.binder.autoCreateTopics=true
13-
spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod
13+
spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod;biConsumerMethod
1414

1515
spring.cloud.stream.bindings.process-in-0.destination=example-topic
1616
spring.cloud.stream.bindings.process-in-0.group=springwolf
@@ -19,6 +19,9 @@ spring.cloud.stream.bindings.process-out-0.destination=another-topic
1919
spring.cloud.stream.bindings.consumerMethod-in-0.destination=consumer-topic
2020
spring.cloud.stream.bindings.consumerMethod-in-0.group=springwolf
2121

22+
spring.cloud.stream.bindings.biConsumerMethod-in-0.destination=biconsumer-topic
23+
spring.cloud.stream.bindings.biConsumerMethod-in-0.group=springwolf
24+
2225
spring.cloud.stream.bindings.consumerClass-in-0.destination=consumer-class-topic
2326
spring.cloud.stream.bindings.consumerClass-in-0.group=springwolf
2427

springwolf-examples/springwolf-cloud-stream-example/src/test/resources/asyncapi.json

+26
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@
3333
"kafka": { }
3434
}
3535
},
36+
"biconsumer-topic": {
37+
"address": "biconsumer-topic",
38+
"messages": {
39+
"AnotherPayloadDto": {
40+
"$ref": "#/components/messages/AnotherPayloadDto"
41+
}
42+
},
43+
"bindings": {
44+
"kafka": { }
45+
}
46+
},
3647
"consumer-class-topic": {
3748
"address": "consumer-class-topic",
3849
"messages": {
@@ -278,6 +289,21 @@
278289
}
279290
]
280291
},
292+
"biconsumer-topic_publish_biConsumerMethod": {
293+
"action": "receive",
294+
"channel": {
295+
"$ref": "#/channels/biconsumer-topic"
296+
},
297+
"description": "Auto-generated description",
298+
"bindings": {
299+
"kafka": { }
300+
},
301+
"messages": [
302+
{
303+
"$ref": "#/channels/biconsumer-topic/messages/AnotherPayloadDto"
304+
}
305+
]
306+
},
281307
"consumer-class-topic_publish_ConsumerClass": {
282308
"action": "receive",
283309
"channel": {

springwolf-plugins/springwolf-cloud-stream-plugin/src/main/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilder.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.Collections;
1313
import java.util.List;
1414
import java.util.Set;
15+
import java.util.function.BiConsumer;
1516
import java.util.function.Consumer;
1617
import java.util.function.Function;
1718
import java.util.function.Predicate;
@@ -24,7 +25,7 @@ public class FunctionalChannelBeanBuilder {
2425
public Set<FunctionalChannelBeanData> build(AnnotatedElement element) {
2526
Class<?> type = getRawType(element);
2627

27-
if (Consumer.class.isAssignableFrom(type)) {
28+
if (Consumer.class.isAssignableFrom(type) || BiConsumer.class.isAssignableFrom(type)) {
2829
Type payloadType = getTypeGenerics(element).get(0);
2930
return Set.of(ofConsumer(element, payloadType));
3031
}

springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/channels/CloudStreamFunctionChannelsScannerIntegrationTest.java

+51
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.Collections;
5555
import java.util.List;
5656
import java.util.Map;
57+
import java.util.function.BiConsumer;
5758
import java.util.function.Consumer;
5859
import java.util.function.Function;
5960
import java.util.function.Supplier;
@@ -163,6 +164,51 @@ void testConsumerBinding() {
163164
assertThat(componentsService.getMessages()).contains(Map.entry(String.class.getName(), message));
164165
}
165166

167+
@Test
168+
void testBiConsumerBinding() {
169+
// Given a binding "spring.cloud.stream.bindings.testBiConsumer-in-0.destination=test-consumer-input-topic"
170+
BindingProperties testBiConsumerInBinding = new BindingProperties();
171+
String topicName = "test-biconsumer-input-topic";
172+
testBiConsumerInBinding.setDestination(topicName);
173+
when(bindingServiceProperties.getBindings()).thenReturn(Map.of("testBiConsumer-in-0", testBiConsumerInBinding));
174+
175+
// When scan is called
176+
Map<String, ChannelObject> actualChannels = channelsScanner.scan();
177+
Map<String, Operation> actualOperations = operationsScanner.scan();
178+
179+
// Then the returned channels contain a ChannelItem with the correct data
180+
MessageObject message = MessageObject.builder()
181+
.name(String.class.getName())
182+
.title("string")
183+
.payload(MessagePayload.of(MultiFormatSchema.builder()
184+
.schema(SchemaObject.builder().type(SchemaType.STRING).build())
185+
.build()))
186+
.headers(MessageHeaders.of(
187+
MessageReference.toSchema(AsyncHeadersNotDocumented.NOT_DOCUMENTED.getTitle())))
188+
.bindings(Map.of("kafka", new EmptyMessageBinding()))
189+
.build();
190+
191+
ChannelObject expectedChannel = ChannelObject.builder()
192+
.channelId(topicName)
193+
.address(topicName)
194+
.bindings(channelBinding)
195+
.messages(Map.of(message.getMessageId(), MessageReference.toComponentMessage(message)))
196+
.build();
197+
198+
Operation expectedOperation = Operation.builder()
199+
.action(OperationAction.RECEIVE)
200+
.bindings(operationBinding)
201+
.description("Auto-generated description")
202+
.channel(ChannelReference.fromChannel(topicName))
203+
.messages(List.of(MessageReference.toChannelMessage(topicName, message)))
204+
.build();
205+
206+
assertThat(actualChannels).containsExactly(Map.entry(topicName, expectedChannel));
207+
assertThat(actualOperations)
208+
.containsExactly(Map.entry("test-biconsumer-input-topic_publish_testBiConsumer", expectedOperation));
209+
assertThat(componentsService.getMessages()).contains(Map.entry(String.class.getName(), message));
210+
}
211+
166212
@Test
167213
void testSupplierBinding() {
168214
// Given a binding "spring.cloud.stream.bindings.testSupplier-out-0.destination=test-supplier-output-topic"
@@ -488,5 +534,10 @@ public Function<String, Integer> testFunction() {
488534
public Function<KStream<Void, String>, KStream<Void, Integer>> kStreamTestFunction() {
489535
return stream -> stream.mapValues(s -> 1);
490536
}
537+
538+
@Bean
539+
public BiConsumer<String, Map<String, Object>> testBiConsumer() {
540+
return (value, header) -> System.out.println(value);
541+
}
491542
}
492543
}

0 commit comments

Comments
 (0)