Commit a4d933c: Deaggregation with KinesisStreamSource (#104)

* De-aggregation with KinesisStreamSource

1 parent 90eb988 commit a4d933c

15 files changed, +889 −0 lines changed
Lines changed: 31 additions & 0 deletions
## KinesisStreamsSource de-aggregation

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Flink connectors: Kinesis Source and Sink

This example demonstrates how to use `KinesisStreamsSource` to consume records published with KPL aggregation.

This folder contains two separate modules:
1. [kpl-producer](kpl-producer): a simple command-line Java application that produces JSON records to a Kinesis Stream using KPL aggregation
2. [flink-app](flink-app): the Flink application demonstrating how to consume the aggregated stream and publish the de-aggregated records to another stream

See the instructions in the subfolders to run the KPL Producer (data generator) and the Flink application.

### Background and motivation

As of version `5.0.0`, `KinesisStreamsSource` does not support de-aggregation yet.
If the connector is used to consume a stream produced with KPL aggregation, the source is not able to deserialize the records out of the box.

This example shows how to implement de-aggregation in the deserialization schema.
In particular, it uses a wrapper that can add de-aggregation to potentially any implementation
of `org.apache.flink.api.common.serialization.DeserializationSchema`.

Implementation:
[KinesisDeaggregatingDeserializationSchemaWrapper.java](flink-app/src/main/java/com/amazonaws/services/msf/deaggregation/KinesisDeaggregatingDeserializationSchemaWrapper.java)
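The gist of the linked implementation, sketched below for illustration: implement the connector's `KinesisDeserializationSchema`, de-aggregate each incoming record, and delegate every resulting sub-record to the wrapped `DeserializationSchema`. This is a minimal sketch, assuming the KCL's `AggregatorUtil` (from `amazon-kinesis-client`, which this example declares as a dependency) performs the de-aggregation; it is not necessarily identical to the actual class in this commit:

```java
package com.amazonaws.services.msf.deaggregation;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.Collector;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

// Sketch: wraps any DeserializationSchema<T>, de-aggregating KPL-aggregated
// records before delegating. For demonstration only; not optimized.
public class KinesisDeaggregatingDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
    private final DeserializationSchema<T> delegate;

    public KinesisDeaggregatingDeserializationSchemaWrapper(DeserializationSchema<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        delegate.open(context);
    }

    @Override
    public void deserialize(Record record, String stream, String shardId, Collector<T> output) throws IOException {
        // Convert the SDK record to a KCL record, then let the KCL split it
        // into its sub-records (a no-op for non-aggregated records).
        // Instantiating AggregatorUtil per record is deliberately simple, not fast.
        KinesisClientRecord kclRecord = KinesisClientRecord.fromRecord(record);
        List<KinesisClientRecord> deaggregated =
                new AggregatorUtil().deaggregate(Collections.singletonList(kclRecord));
        for (KinesisClientRecord subRecord : deaggregated) {
            ByteBuffer buffer = subRecord.data();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            T value = delegate.deserialize(bytes);
            if (value != null) {
                output.collect(value);
            }
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return delegate.getProducedType();
    }
}
```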
> *IMPORTANT*: This implementation of de-aggregation is for demonstration purposes only.
> The code is not meant for production use and is not optimized in terms of performance.
Lines changed: 55 additions & 0 deletions
## Flink job consuming an aggregated stream

This Flink job consumes a Kinesis stream containing KPL-aggregated JSON records and publishes the de-aggregated records to another stream.

### Prerequisites

The application expects two Kinesis Streams:
* An input stream containing the aggregated records
* An output stream where the de-aggregated records are published

The application must have sufficient permissions to read from and write to these streams.
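If the streams do not exist yet, they can be created with the AWS CLI; the stream names below are placeholders (the application itself identifies streams by ARN in the runtime configuration):

```
aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1
aws kinesis create-stream --stream-name ExampleOutputStream --shard-count 1
```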

### Runtime configuration

The application reads the runtime configuration from the Runtime Properties when running on Amazon Managed Service for Apache Flink,
or, when running locally, from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.

All parameters are case-sensitive.
| Group ID | Key | Description |
|-----------------|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `InputStream0`  | `stream.arn` | ARN of the input stream. |
| `InputStream0`  | `aws.region` | Region of the input stream. |
| `InputStream0`  | `source.init.position` | (optional) Starting position when the application starts with no state. Default is `LATEST`. |
| `InputStream0`  | `source.reader.type` | (optional) Choose between standard (`POLLING`) and Enhanced Fan-Out (`EFO`) consumer. Default is `POLLING`. |
| `InputStream0`  | `source.efo.consumer.name` | (optional, for EFO consumer mode only) Name of the EFO consumer. Only used if `source.reader.type=EFO`. |
| `InputStream0`  | `source.efo.consumer.lifecycle` | (optional, for EFO consumer mode only) Lifecycle management mode of the EFO consumer. Choose between `JOB_MANAGED` and `SELF_MANAGED`. Default is `JOB_MANAGED`. |
| `OutputStream0` | `stream.arn` | ARN of the output stream. |
| `OutputStream0` | `aws.region` | Region of the output stream. |

Every parameter in the `InputStream0` group is passed to the Kinesis consumer, and every parameter in the `OutputStream0` group is passed to the Kinesis client of the sink.

See the [Flink Kinesis connector docs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/) for details about configuring the Kinesis connector.

To configure the application on Managed Service for Apache Flink, set these parameters in the *Runtime properties*.

To configure the application for running locally, edit the [json file](resources/flink-application-properties-dev.json), for example as shown below.
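A minimal local configuration might look like the following; the format matches what `KinesisAnalyticsRuntime.getApplicationProperties()` expects, and the ARNs and region are placeholders to replace with your own:

```json
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.arn": "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleInputStream",
      "aws.region": "us-east-1",
      "source.init.position": "LATEST"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.arn": "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```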
### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../../running-examples-locally.md) for details.

### Data Generator

Use the [KPL Producer](../kpl-producer) to generate aggregated StockPrice records to the Kinesis stream.

### Data example

```
{"event_time": "2024-05-28T19:53:17.497201", "ticker": "AMZN", "price": 42.88}
```
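The job (see `StreamingJob.java` below) deserializes these records into a `com.amazonaws.services.msf.model.StockPrice` POJO, which is not included in this excerpt. A hypothetical minimal shape, inferred from the data example above, might be:

```java
package com.amazonaws.services.msf.model;

// Hypothetical sketch of the POJO, inferred from the data example;
// field names match the JSON keys so Jackson can bind them directly.
public class StockPrice {
    private String event_time;
    private String ticker;
    private double price;

    public StockPrice() {} // no-arg constructor required for POJO serialization

    public String getEvent_time() { return event_time; }
    public void setEvent_time(String eventTime) { this.event_time = eventTime; }
    public String getTicker() { return ticker; }
    public void setTicker(String ticker) { this.ticker = ticker; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
}
```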
Lines changed: 184 additions & 0 deletions
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>kinesis-source-deaggregation</artifactId>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <buildDirectory>${project.basedir}/target</buildDirectory>
        <jar.finalName>${project.name}-${project.version}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <flink.version>1.20.0</flink.version>
        <aws.connector.version>5.0.0-1.20</aws.connector.version>
        <kinesis.client.version>2.7.0</kinesis.client.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-bom</artifactId>
                <!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
                <version>1.12.677</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.31.28</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Kinesis Client Library, providing the record de-aggregation utilities -->
        <dependency>
            <groupId>software.amazon.kinesis</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>${kinesis.client.version}</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>retries</artifactId>
        </dependency>

        <!-- Connectors and Formats -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-aws-kinesis-streams</artifactId>
            <version>${aws.connector.version}</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <directory>${buildDirectory}</directory>
        <finalName>${jar.finalName}</finalName>

        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Lines changed: 99 additions & 0 deletions
```java
package com.amazonaws.services.msf;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import com.amazonaws.services.msf.deaggregation.KinesisDeaggregatingDeserializationSchemaWrapper;
import com.amazonaws.services.msf.model.StockPrice;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.shaded.guava31.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;


public class StreamingJob {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    // Name of the local JSON resource with the application properties, in the same format as they are received from the Amazon Managed Service for Apache Flink runtime
    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";

    private static boolean isLocal(StreamExecutionEnvironment env) {
        return env instanceof LocalStreamEnvironment;
    }

    /**
     * Load application properties from the Amazon Managed Service for Apache Flink runtime, or from a local resource when the environment is local
     */
    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        if (isLocal(env)) {
            LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
            return KinesisAnalyticsRuntime.getApplicationProperties(
                    StreamingJob.class.getClassLoader()
                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
        } else {
            LOG.info("Loading application properties from Amazon Managed Service for Apache Flink");
            return KinesisAnalyticsRuntime.getApplicationProperties();
        }
    }

    // Create a source using a KinesisDeserializationSchema
    private static <T> KinesisStreamsSource<T> createKinesisSource(Properties inputProperties, final KinesisDeserializationSchema<T> kinesisDeserializationSchema) {
        final String inputStreamArn = inputProperties.getProperty("stream.arn");
        return KinesisStreamsSource.<T>builder()
                .setStreamArn(inputStreamArn)
                .setSourceConfig(Configuration.fromMap(Maps.fromProperties(inputProperties)))
                .setDeserializationSchema(kinesisDeserializationSchema)
                .build();
    }

    // Create a sink
    private static <T> KinesisStreamsSink<T> createKinesisSink(Properties outputProperties, final SerializationSchema<T> serializationSchema) {
        final String outputStreamArn = outputProperties.getProperty("stream.arn");
        return KinesisStreamsSink.<T>builder()
                .setStreamArn(outputStreamArn)
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(serializationSchema)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
        LOG.warn("Application properties: {}", applicationProperties);

        // Wrap the deserialization schema
        KinesisDeserializationSchema<StockPrice> deaggregatingKinesisDeserializationSchema =
                // Wrapper which takes care of de-aggregation
                new KinesisDeaggregatingDeserializationSchemaWrapper<>(
                        new JsonDeserializationSchema<>(StockPrice.class) // DeserializationSchema to deserialize each de-aggregated record
                );
        KinesisStreamsSource<StockPrice> source = createKinesisSource(applicationProperties.get("InputStream0"), deaggregatingKinesisDeserializationSchema);

        // Set up the source
        DataStream<StockPrice> input = env.fromSource(source,
                WatermarkStrategy.noWatermarks(),
                "Kinesis source",
                TypeInformation.of(StockPrice.class));

        // Send the de-aggregated records to the sink
        KinesisStreamsSink<StockPrice> sink = createKinesisSink(applicationProperties.get("OutputStream0"), new JsonSerializationSchema<>());

        input.sinkTo(sink);

        env.execute("Kinesis de-aggregating Source");
    }
}
```
