Commit c6efcc1

Merge pull request #63 from alexiteki/main
Java Examples for Exception Handling including Async IO and Side Outputs

2 parents 087f2ac + 1de2557 commit c6efcc1

17 files changed: +1083 −0 lines changed

java/AsyncIO/README.md

Lines changed: 51 additions & 0 deletions
## Sample illustrating how to use Async I/O for Apache Flink with Retries

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Connectors: [Datagen](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/datagen/), [Kinesis](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)

This sample illustrates how to leverage the Async I/O pattern in Apache Flink, with retries on errors and timeouts. More details on Apache Flink's Async I/O functionality can be found [here](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/).

Data is generated by a Datagen source connector, then run through an AsyncWaitOperator, which calls an API Gateway endpoint and collects the response. The infrastructure for the API endpoint can be launched via a CloudFormation template, with instructions below.

The application generates data internally and writes to a Kinesis Stream.
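The operator wiring itself lives in the sample's job class, but the core behaviour it relies on, retrying an asynchronous call when it fails, can be sketched with the plain JDK, no Flink on the classpath. The names below (`RetrySketch`, `withRetries`, the flaky supplier) are illustrative, not classes from this sample:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class RetrySketch {

    // Run an async call, retrying up to maxAttempts times on failure.
    // This mirrors the retry-on-error behaviour the Flink job gets from
    // its async retry strategy; the AsyncWaitOperator itself is not shown.
    static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> call,
                                                int maxAttempts) {
        return call.get()
                .handle((result, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(result);
                    }
                    if (maxAttempts <= 1) {
                        return CompletableFuture.<T>failedFuture(error);
                    }
                    // Re-invoke the supplier for the next attempt
                    return withRetries(call, maxAttempts - 1);
                })
                .thenCompose(f -> f); // flatten the nested future
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Simulated endpoint: fails twice, then succeeds on the 3rd attempt
        Supplier<CompletableFuture<String>> flaky = () -> {
            attempts[0]++;
            return attempts[0] < 3
                    ? CompletableFuture.failedFuture(new RuntimeException("HTTP 500"))
                    : CompletableFuture.completedFuture("OK");
        };
        System.out.println(withRetries(flaky, 5).join()); // prints OK
    }
}
```

In the real job, the equivalent knobs (max attempts, delay, which results count as failures) come from the retry strategy passed to the async operator rather than a hand-rolled loop.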
### Set up Cloud Infrastructure

To run this example, you will need an endpoint to query against. We have provided an [AWS CloudFormation Template](src/main/resources/lambda-cloudformation.yaml) to launch the following resources in your AWS account:

- **IAM Role (LambdaExecutionRole)**: Creates a role allowing Lambda to write logs to CloudWatch.
- **Lambda Function (RandomResponseLambda)**: Deploys a Python Lambda function that randomly returns different HTTP statuses.
- **API Gateway (ApiGateway)**: Exposes the Lambda function via an HTTP endpoint.

**Total Estimated Cost**: Approximately $3.70 per million requests (combined Lambda and API Gateway costs, after the free tier).
### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder.

Runtime parameters:

| Group ID          | Key          | Description                                                                                                                                              |
|-------------------|--------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| `OutputStream0`   | `stream.arn` | ARN of the output stream.                                                                                                                                |
| `OutputStream0`   | `aws.region` | (Optional) Region of the output stream. If not specified, it uses the application region, or the default region of the AWS profile when running locally. |
| `EndpointService` | `api.url`    | API URL for accessing the API Gateway endpoint (found in the CloudFormation Outputs tab).                                                                |
| `EndpointService` | `api.key`    | API key for authentication to the API Gateway endpoint (found in the CloudFormation Outputs tab).                                                        |
| `EndpointService` | `aws.region` | (Optional) Region of the endpoint. If not specified, it uses the application region, or the default region of the AWS profile when running locally.      |

All parameters are case-sensitive.

This simple example assumes the Kinesis Stream is in the same region as the application, or in the default region of the authentication profile when running locally.
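For local runs, the development properties file groups these parameters by Group ID. A minimal sketch, where the ARN, URL, key, and region values are placeholders rather than real resources:

```json
[
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.arn": "arn:aws:kinesis:us-east-1:000000000000:stream/ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "EndpointService",
    "PropertyMap": {
      "api.url": "https://example.execute-api.us-east-1.amazonaws.com/prod",
      "api.key": "YOUR-API-KEY",
      "aws.region": "us-east-1"
    }
  }
]
```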
### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.

java/AsyncIO/pom.xml

Lines changed: 172 additions & 0 deletions
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>aws-msf-async-io</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <buildDirectory>${project.basedir}/target</buildDirectory>
        <jar.finalName>${project.name}-${project.version}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <flink.version>1.20.0</flink.version>
        <flink.connector.version>4.3.0-1.19</flink.connector.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
        <jackson.version>2.16.2</jackson.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-bom</artifactId>
                <!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
                <version>1.12.676</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Amazon Managed Service for Apache Flink (formerly Amazon Kinesis Data Analytics) runtime -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
        </dependency>

        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink Kinesis connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>${flink.connector.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>compile</scope>
        </dependency>

        <!-- HTTP Async Client -->
        <dependency>
            <groupId>org.asynchttpclient</groupId>
            <artifactId>async-http-client</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <directory>${buildDirectory}</directory>
        <finalName>${jar.finalName}</finalName>

        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build the fat-jar including all required dependencies -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.RetriesFlinkJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
IncomingEvent.java

Lines changed: 37 additions & 0 deletions
```java
package com.amazonaws.services.msf;

// Simple POJO carried through the pipeline: a message plus a generated ID.
public class IncomingEvent {

    final String message;
    private String id;

    public IncomingEvent(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "IncomingEvent{" +
                "id='" + id + '\'' +
                ", message='" + message + '\'' +
                '}';
    }
}
```
IncomingEventDataGeneratorFunction.java

Lines changed: 20 additions & 0 deletions
```java
package com.amazonaws.services.msf;

import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.util.UUID;

// Generator function for the Datagen source: emits one IncomingEvent per input counter value.
class IncomingEventDataGeneratorFunction implements GeneratorFunction<Long, IncomingEvent> {

    @Override
    public IncomingEvent map(Long aLong) {
        String message = "Hello World";

        // Assign each event a random, dash-free ID
        IncomingEvent incomingEvent = new IncomingEvent(message);
        incomingEvent.setId(UUID.randomUUID().toString().replace("-", ""));
        return incomingEvent;
    }
}
```
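The generator's ID scheme (a random UUID with the dashes stripped) always produces 32 lowercase hex characters, which can be checked with the plain JDK. The class name `IdFormatDemo` is illustrative, not part of the sample:

```java
import java.util.UUID;

public class IdFormatDemo {
    public static void main(String[] args) {
        // Same ID scheme as IncomingEventDataGeneratorFunction:
        // a random UUID with the dashes removed
        String id = UUID.randomUUID().toString().replace("-", "");
        System.out.println(id.length());                // prints 32
        System.out.println(id.matches("[0-9a-f]{32}")); // prints true
    }
}
```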
ProcessedEvent.java

Lines changed: 35 additions & 0 deletions
```java
package com.amazonaws.services.msf;

// POJO for events that have passed through the async operator:
// the original message plus the processing result.
public class ProcessedEvent {

    final String message;
    public String processed;

    public ProcessedEvent(String message, String processed) {
        this.message = message;
        this.processed = processed;
    }

    public String getMessage() {
        return message;
    }

    public String getProcessed() {
        return processed;
    }

    public void setProcessed(String processed) {
        this.processed = processed;
    }

    @Override
    public String toString() {
        return "ProcessedEvent{" +
                "message='" + message + '\'' +
                ", processed='" + processed + '\'' +
                '}';
    }
}
```
