13
13
14
14
import java .time .Duration ;
15
15
import java .util .Collections ;
16
- import java .util .concurrent .ExecutorService ;
17
- import java .util .concurrent .Executors ;
18
- import java .util .concurrent .TimeUnit ;
16
+ import java .util .concurrent .*;
19
17
20
18
public class ProcessingFunction extends RichAsyncFunction <IncomingEvent , ProcessedEvent > {
21
19
private static final Logger LOG = LoggerFactory .getLogger (ProcessingFunction .class );
22
20
23
21
private final String apiUrl ;
24
22
private final String apiKey ;
25
- private final long shutdownWaitTS ;
26
- private final int threadPoolSize ;
23
+ private static ExecutorService executorService ;
24
+
27
25
28
26
private transient AsyncHttpClient client ;
29
- private transient ExecutorService executorService ;
30
27
31
28
public ProcessingFunction (String apiUrl , String apiKey ) {
32
29
Preconditions .checkNotNull (apiUrl , "API URL must not be null" );
@@ -36,70 +33,62 @@ public ProcessingFunction(String apiUrl, String apiKey) {
36
33
37
34
this .apiUrl = apiUrl ;
38
35
this .apiKey = apiKey ;
39
- this .shutdownWaitTS = 20000 ; // Max time (ms) to wait for ExecutorService shutdown before forcing termination
40
- this .threadPoolSize = 30 ; // Number of threads in the ExecutorService pool for async operations
41
36
}
37
+ /**
38
+ * Instantiate the connection to an async client here to use within asyncInvoke
39
+ */
42
40
43
-
44
- // Instantiate the connection to an async client here to use within asyncInvoke
45
41
@ Override
46
42
public void open (Configuration parameters ) throws Exception {
47
43
DefaultAsyncHttpClientConfig .Builder clientBuilder = Dsl .config ().setConnectTimeout (Duration .ofSeconds (10 ));
48
44
client = Dsl .asyncHttpClient (clientBuilder );
49
- executorService = Executors .newFixedThreadPool (threadPoolSize );
50
- LOG .info ("Initialized ExecutorService with {} threads" , threadPoolSize );
45
+
46
+ int numCores = Runtime .getRuntime ().availableProcessors (); // get num cores on node for thread count
47
+ executorService = Executors .newFixedThreadPool (numCores );
48
+
51
49
}
52
50
53
- // close Async Client, Executor Service on shutdown
54
51
@ Override
55
- public void close () throws Exception {
56
- if (client != null ) {
57
- client .close ();
58
- }
59
- if (executorService != null ) {
60
- executorService .shutdown ();
61
- try {
62
- if (!executorService .awaitTermination (shutdownWaitTS , TimeUnit .MILLISECONDS )) {
63
- executorService .shutdownNow ();
64
- }
65
- } catch (InterruptedException e ) {
66
- executorService .shutdownNow ();
67
- }
68
- }
52
+ public void close () throws Exception
53
+ {
54
+ client .close ();
55
+ executorService .shutdown ();
69
56
}
70
57
71
58
@ Override
72
- public void asyncInvoke (final IncomingEvent incomingEvent , final ResultFuture <ProcessedEvent > resultFuture ) {
59
+ public void asyncInvoke (IncomingEvent incomingEvent , ResultFuture <ProcessedEvent > resultFuture ) {
60
+
73
61
// Create a new ProcessedEvent instance
74
62
ProcessedEvent processedEvent = new ProcessedEvent (incomingEvent .getMessage ());
75
63
LOG .debug ("New request: {}" , incomingEvent );
76
64
77
- executorService .submit (new Runnable () {
78
- @ Override
79
- public void run () {
65
+ // Note: The Async Client used must return a Future object or equivalent
66
+ Future <Response > future = client .prepareGet (apiUrl )
67
+ .setHeader ("x-api-key" , apiKey )
68
+ .execute ();
69
+
70
+ // Process the request via a Completable Future, in order to not block request synchronously
71
+ // Notice we are passing executor service for thread management
72
+ CompletableFuture .supplyAsync (() ->
73
+ {
80
74
try {
81
75
LOG .debug ("Trying to get response for {}" , incomingEvent .getId ());
82
- Response response = client .prepareGet (apiUrl )
83
- .setHeader ("x-api-key" , apiKey )
84
- .execute ()
85
- .get ();
86
-
87
- int statusCode = response .getStatusCode ();
88
-
89
- if (statusCode == 200 ) {
90
- LOG .debug ("Success! {}" , incomingEvent .getId ());
91
- resultFuture .complete (Collections .singleton (processedEvent ));
92
- } else if (statusCode == 500 ) { // Retryable error
93
- LOG .error ("Status code 500, retrying shortly..." );
94
- resultFuture .completeExceptionally (new Throwable (String .valueOf (statusCode )));
95
- } else {
96
- LOG .error ("Unexpected status code: {}" , statusCode );
97
- resultFuture .completeExceptionally (new Throwable (String .valueOf (statusCode )));
98
- }
99
- } catch (Exception e ) {
76
+ Response response = future .get ();
77
+ return response .getStatusCode ();
78
+ } catch (InterruptedException | ExecutionException e ) {
100
79
LOG .error ("Error during async HTTP call: {}" , e .getMessage ());
101
- resultFuture . completeExceptionally ( e ) ;
80
+ return - 1 ;
102
81
}
82
+ }, executorService ).thenAccept (statusCode -> {
83
+ if (statusCode == 200 ) {
84
+ LOG .debug ("Success! {}" , incomingEvent .getId ());
85
+ resultFuture .complete (Collections .singleton (processedEvent ));
86
+ } else if (statusCode == 500 ) { // Retryable error
87
+ LOG .error ("Status code 500, retrying shortly..." );
88
+ resultFuture .completeExceptionally (new Throwable (statusCode .toString ()));
89
+ } else {
90
+ LOG .error ("Unexpected status code: {}" , statusCode );
91
+ resultFuture .completeExceptionally (new Throwable (statusCode .toString ()));
103
92
}
104
93
});
105
94
}
0 commit comments