Commit 336a970

Merge pull request #82 from alexiteki/main
Updates to Async and Side Output based on Eng Feedback
2 parents a64439e + db9cbcb

7 files changed (+58 -41 lines)

.gitignore
Lines changed: 1 addition & 1 deletion

@@ -12,4 +12,4 @@ env/
 venv/
 .java-version
 /pyflink/
-/.run/
+

java/AsyncIO/src/main/java/com/amazonaws/services/msf/ProcessedEvent.java
Lines changed: 1 addition & 14 deletions

@@ -3,33 +3,20 @@
 public class ProcessedEvent {
 
     final String message;
-    public String processed;
 
     @Override
     public String toString() {
         return "ProcessedEvent{" +
                 ", message='" + message + '\'' +
-                ", processed='" + processed + '\'' +
                 '}';
     }
 
-    public ProcessedEvent(String message, String processed) {
+    public ProcessedEvent(String message) {
         this.message = message;
-        this.processed = processed;
     }
 
     // Getter methods
     public String getMessage() {
         return message;
     }
-
-    public String getProcessed() {
-        return processed;
-    }
-
-    // Setter method for processed field
-    public void setProcessed(String processed) {
-        this.processed = processed;
-    }
-
 }

java/AsyncIO/src/main/java/com/amazonaws/services/msf/ProcessingFunction.java
Lines changed: 29 additions & 18 deletions

@@ -14,15 +14,15 @@
 
 import java.time.Duration;
 import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 public class ProcessingFunction extends RichAsyncFunction<IncomingEvent, ProcessedEvent> {
     private static final Logger LOG = LoggerFactory.getLogger(ProcessingFunction.class);
 
     private final String apiUrl;
     private final String apiKey;
+    private static ExecutorService executorService;
+
 
     private transient AsyncHttpClient client;
 
@@ -43,41 +43,52 @@ public ProcessingFunction(String apiUrl, String apiKey) {
     public void open(Configuration parameters) throws Exception {
         DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config().setConnectTimeout(Duration.ofSeconds(10));
         client = Dsl.asyncHttpClient(clientBuilder);
+
+        int numCores = Runtime.getRuntime().availableProcessors(); // get num cores on node for thread count
+        executorService = Executors.newFixedThreadPool(numCores);
+
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+        client.close();
+        executorService.shutdown();
     }
 
     @Override
     public void asyncInvoke(IncomingEvent incomingEvent, ResultFuture<ProcessedEvent> resultFuture) {
 
         // Create a new ProcessedEvent instance
-        ProcessedEvent processedEvent = new ProcessedEvent(incomingEvent.getMessage(), null);
+        ProcessedEvent processedEvent = new ProcessedEvent(incomingEvent.getMessage());
         LOG.debug("New request: {}", incomingEvent);
 
+        // Note: The Async Client used must return a Future object or equivalent
         Future<Response> future = client.prepareGet(apiUrl)
                 .setHeader("x-api-key", apiKey)
                 .execute();
 
-        // Asynchronously calling API and handling response via Completable Future
-        CompletableFuture.supplyAsync(() -> {
-            try {
-                LOG.debug("Trying to get response for {}", incomingEvent.getId());
-                Response response = future.get();
-                return response.getStatusCode();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Error during async HTTP call: {}", e.getMessage());
-                return -1;
-            }
-        }).thenAccept(statusCode -> {
+        // Process the request via a Completable Future, in order to not block request synchronously
+        // Notice we are passing executor service for thread management
+        CompletableFuture.supplyAsync(() ->
+        {
+            try {
+                LOG.debug("Trying to get response for {}", incomingEvent.getId());
+                Response response = future.get();
+                return response.getStatusCode();
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Error during async HTTP call: {}", e.getMessage());
                return -1;
+            }
+        }, executorService).thenAccept(statusCode -> {
             if (statusCode == 200) {
-                processedEvent.setProcessed("SUCCESS");
                 LOG.debug("Success! {}", incomingEvent.getId());
                 resultFuture.complete(Collections.singleton(processedEvent));
             } else if (statusCode == 500) { // Retryable error
                 LOG.error("Status code 500, retrying shortly...");
-                processedEvent.setProcessed("FAIL");
                 resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
             } else {
                 LOG.error("Unexpected status code: {}", statusCode);
-                processedEvent.setProcessed("FAIL");
                 resultFuture.completeExceptionally(new Throwable(statusCode.toString()));
             }
         });
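
For context on the executor change above: CompletableFuture.supplyAsync without a second argument runs its supplier on the shared ForkJoinPool.commonPool(), which the blocking future.get() call could tie up; the commit instead passes a dedicated ExecutorService sized to the host's cores. A minimal standalone sketch of that two-argument overload, with hypothetical blockingCall() and handle() stand-ins for the HTTP call and status handling:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SupplyAsyncSketch {
        public static void main(String[] args) {
            // Size the pool to the host's cores, mirroring the open() change above.
            ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

            CompletableFuture
                    .supplyAsync(SupplyAsyncSketch::blockingCall, pool) // runs on 'pool', not commonPool()
                    .thenAccept(SupplyAsyncSketch::handle)
                    .join();

            pool.shutdown();
        }

        // Hypothetical stand-in for the blocking HTTP call made in ProcessingFunction.
        private static int blockingCall() { return 200; }

        // Hypothetical stand-in for the status-code handling.
        private static void handle(int statusCode) { System.out.println("status=" + statusCode); }
    }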

java/AsyncIO/src/main/java/com/amazonaws/services/msf/RetriesFlinkJob.java
Lines changed: 1 addition & 1 deletion

@@ -81,7 +81,7 @@ public static void main(String[] args) throws Exception {
         Preconditions.checkArgument(!outputStreamArn.isEmpty(), "Output stream ARN must not be empty");
 
         processedStream
-                .map(value -> String.format("%s,%s", value.message, value.processed))
+                .map(value -> String.format("%s", value.message))
                 .sinkTo(createSink(outputProperties));
 
         LOGGER.debug("Starting flink job: {}", "Async I/O Retries");
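
Upstream of the map/sink chain above, processedStream is produced by attaching the ProcessingFunction with Flink's AsyncDataStream. A minimal sketch of that wiring with a fixed-delay retry strategy, placed inside the job's main method; the timeout, capacity, retry values, and the inputStream/apiUrl/apiKey names are illustrative assumptions, not taken from this commit:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
    import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
    import org.apache.flink.streaming.util.retryable.RetryPredicates;

    // Retry any element whose ResultFuture completed exceptionally (e.g. the 500 branch above):
    // 3 attempts with a 1-second fixed delay.
    AsyncRetryStrategy<ProcessedEvent> retryStrategy =
            new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<ProcessedEvent>(3, 1000L)
                    .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                    .build();

    DataStream<ProcessedEvent> processedStream = AsyncDataStream.unorderedWaitWithRetry(
            inputStream,                              // DataStream<IncomingEvent>, hypothetical name
            new ProcessingFunction(apiUrl, apiKey),   // the RichAsyncFunction from this commit
            10_000L, TimeUnit.MILLISECONDS,           // per-request timeout
            100,                                      // max in-flight requests
            retryStrategy);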

java/SideOutputs/src/main/java/com/amazonaws/services/msf/IncomingEvent.java
Lines changed: 2 additions & 0 deletions

@@ -17,4 +17,6 @@ public IncomingEvent(String message) {
 
 
 
+
+
 }

java/SideOutputs/src/main/java/com/amazonaws/services/msf/ProcessingOutcome.java (new file)
Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+package com.amazonaws.services.msf;
+
+public enum ProcessingOutcome {
+    SUCCESS("SUCCESS"),
+    ERROR("ERROR");
+
+    private final String text;
+
+    ProcessingOutcome(final String text) {
+        this.text = text;
+    }
+
+    @Override
+    public String toString() {
+        return text;
+    }
+}

java/SideOutputs/src/main/java/com/amazonaws/services/msf/SideOutputsFlinkJob.java
Lines changed: 7 additions & 7 deletions

@@ -68,20 +68,20 @@ public static void main(String[] args) throws Exception {
                 .setParallelism(1);
 
         // Validate stream for invalid messages
-        SingleOutputStreamOperator<Tuple2<IncomingEvent, Boolean>> validatedStream = source
+        SingleOutputStreamOperator<Tuple2<IncomingEvent, ProcessingOutcome>> validatedStream = source
                 .map(incomingEvent -> {
-                    boolean isPoisoned = "Poison".equals(incomingEvent.message);
-                    return Tuple2.of(incomingEvent, isPoisoned);
-                }, TypeInformation.of(new TypeHint<Tuple2<IncomingEvent, Boolean>>() {
+                    ProcessingOutcome result = "Poison".equals(incomingEvent.message)?ProcessingOutcome.ERROR: ProcessingOutcome.SUCCESS;
+                    return Tuple2.of(incomingEvent, result);
+                }, TypeInformation.of(new TypeHint<Tuple2<IncomingEvent, ProcessingOutcome>>() {
                 }));
 
         // Split the stream based on validation
         SingleOutputStreamOperator<IncomingEvent> mainStream = validatedStream
-                .process(new ProcessFunction<Tuple2<IncomingEvent, Boolean>, IncomingEvent>() {
+                .process(new ProcessFunction<Tuple2<IncomingEvent, ProcessingOutcome>, IncomingEvent>() {
                     @Override
-                    public void processElement(Tuple2<IncomingEvent, Boolean> value, Context ctx,
+                    public void processElement(Tuple2<IncomingEvent, ProcessingOutcome> value, Context ctx,
                             Collector<IncomingEvent> out) throws Exception {
-                        if (value.f1) {
+                        if (value.f1.equals(ProcessingOutcome.ERROR)) {
                             // Invalid event (true), send to DLQ sink
                             ctx.output(invalidEventsTag, value.f0);
                         } else {
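
The invalidEventsTag referenced above is a Flink OutputTag, and the flagged events are read back from the main stream's side output once process() has run. A minimal sketch of that plumbing, assuming a tag named "invalid-events"; the actual tag name and DLQ sink wiring in SideOutputsFlinkJob may differ:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.OutputTag;

    // Declared as an anonymous subclass so Flink can capture the element type.
    final OutputTag<IncomingEvent> invalidEventsTag = new OutputTag<IncomingEvent>("invalid-events") {};

    // After the process() call shown in the diff, events routed via ctx.output(invalidEventsTag, ...)
    // are retrieved from the side output, while mainStream keeps only the valid events.
    DataStream<IncomingEvent> invalidEventsStream = mainStream.getSideOutput(invalidEventsTag);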
