Commit 0a5642d

Add example for DynamoDB Stream
1 parent c6efcc1 commit 0a5642d

File tree

9 files changed (+469, -0 lines changed)


java/DynamoDBStreamSource/README.md

Lines changed: 45 additions & 0 deletions
# DynamoDB Streams Source example

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Flink connectors: DynamoDB Streams Source

This example demonstrates how to use the Flink DynamoDB Streams source.

It uses the `DynamoDbStreamsSource` provided in Apache Flink's connector ecosystem.
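The following is a minimal sketch of how this source can be wired into a job. It is not part of this commit: the builder calls follow the Flink DynamoDB connector documentation, `ChangeEventDeserializationSchema` is the class defined in this example, and the class name, stream ARN, and job name are placeholders.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.dynamodb.source.DynamoDbStreamsSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceWiringSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder: in the application this value comes from the InputStream0 property group.
        String streamArn = "arn:aws:dynamodb:...";

        // Build the DynamoDB Streams source with the deserialization schema from this example.
        DynamoDbStreamsSource<ChangeEvent> source = DynamoDbStreamsSource.<ChangeEvent>builder()
                .setStreamArn(streamArn)
                .setSourceConfig(new Configuration())
                .setDeserializationSchema(new ChangeEventDeserializationSchema())
                .build();

        // Consume the change events and print them, e.g. for local testing.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "DynamoDB Streams source")
                .print();

        env.execute("DynamoDB Streams Source example");
    }
}
```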
### Pre-requisite set up

To run this example, the following resources need to be created:

1. A DynamoDB table - the example uses a table schema documented using `@DynamoDbBean`. See `DdbTableItem`.
2. Set up a DynamoDB Stream on the created table. See the [DynamoDB Streams documentation](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html). Note that the deserialization schema in this example reads both old and new item images, which requires the `NEW_AND_OLD_IMAGES` stream view type. A sketch of enabling the stream programmatically follows this list.
3. Add items to the DynamoDB table, following the schema above, using the console.
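The stream can also be enabled with the AWS SDK for Java v2 instead of the console. This is a hypothetical sketch, not part of this commit; the table name is a placeholder.

```java
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;

public class EnableStreamSketch {
    public static void main(String[] args) {
        try (DynamoDbClient ddb = DynamoDbClient.create()) {
            // Enable a stream emitting both old and new images on an existing table.
            ddb.updateTable(UpdateTableRequest.builder()
                    .tableName("ExampleTable") // placeholder table name
                    .streamSpecification(StreamSpecification.builder()
                            .streamEnabled(true)
                            .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES)
                            .build())
                    .build());
        }
    }
}
```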
### Runtime configuration

The application reads the runtime configuration from the Runtime Properties, when running on Amazon Managed Service for Apache Flink,
or, when running locally, from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.

All parameters are case-sensitive.

| Group ID       | Key          | Description              |
|----------------|--------------|--------------------------|
| `InputStream0` | `stream.arn` | ARN of the input stream. |

Every parameter in the `InputStream0` group is passed to the DynamoDB Streams consumer, for example `flink.stream.initpos`.

See the [Flink DynamoDB connector docs](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/) for details about configuring the DynamoDB connector.

To configure the application on Managed Service for Apache Flink, set these parameters in the *Runtime properties*.

To configure the application for running locally, edit the [json file](resources/flink-application-properties-dev.json).
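As a rough sketch of how these properties are typically loaded (assumed, not part of this README; the class name is a placeholder), the `aws-kinesisanalytics-runtime` library declared in the pom exposes `KinesisAnalyticsRuntime.getApplicationProperties()`, with a file-based overload that can be used for local runs:

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

public final class PropertiesLoaderSketch {
    static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        if (env instanceof LocalStreamEnvironment) {
            // Local run: read the JSON file from the resources folder.
            return KinesisAnalyticsRuntime.getApplicationProperties(
                    Objects.requireNonNull(PropertiesLoaderSketch.class.getClassLoader()
                            .getResource("flink-application-properties-dev.json")).getPath());
        } else {
            // On Amazon Managed Service for Apache Flink: read the Runtime Properties.
            return KinesisAnalyticsRuntime.getApplicationProperties();
        }
    }
}
```

The stream ARN can then be read with `properties.get("InputStream0").getProperty("stream.arn")`.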
### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.

java/DynamoDBStreamSource/pom.xml

Lines changed: 145 additions & 0 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-source</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jar.finalName>${project.name}-${project.version}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <flink.version>1.20.0</flink.version>
        <aws.connector.version>5.0.0-1.20</aws.connector.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Connectors and Formats -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-dynamodb</artifactId>
            <version>${aws.connector.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-aws-kinesis-streams</artifactId>
            <version>${aws.connector.version}</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${jar.finalName}</finalName>

        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
ChangeEvent.java

Lines changed: 59 additions & 0 deletions
package com.amazonaws.services.msf;

import java.time.Instant;

/**
 * A change captured from the DynamoDB stream: the type of change, its approximate
 * creation time, and the old and/or new item images mapped to {@link DdbTableItem}.
 */
public class ChangeEvent {

    private Instant timestamp;
    private Type eventType;
    private DdbTableItem oldItem;
    private DdbTableItem newItem;

    public Instant getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Instant timestamp) {
        this.timestamp = timestamp;
    }

    public Type getEventType() {
        return eventType;
    }

    public void setEventType(Type eventType) {
        this.eventType = eventType;
    }

    public DdbTableItem getOldItem() {
        return oldItem;
    }

    public void setOldItem(DdbTableItem oldItem) {
        this.oldItem = oldItem;
    }

    public DdbTableItem getNewItem() {
        return newItem;
    }

    public void setNewItem(DdbTableItem newItem) {
        this.newItem = newItem;
    }

    @Override
    public String toString() {
        return "ChangeEvent{" +
                "timestamp=" + timestamp +
                ", eventType=" + eventType +
                ", oldItem=" + oldItem +
                ", newItem=" + newItem +
                '}';
    }

    public enum Type {
        INSERT,
        MODIFY,
        REMOVE
    }
}
ChangeEventDeserializationSchema.java

Lines changed: 45 additions & 0 deletions
package com.amazonaws.services.msf;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.dynamodb.source.serialization.DynamoDbStreamsDeserializationSchema;
import org.apache.flink.util.Collector;

import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.services.dynamodb.model.Record;

import java.io.IOException;

/**
 * Deserializes DynamoDB Streams records into {@link ChangeEvent} objects, mapping the
 * old and new item images through the DynamoDB Enhanced Client {@link TableSchema}.
 */
public class ChangeEventDeserializationSchema implements DynamoDbStreamsDeserializationSchema<ChangeEvent> {

    private static final TableSchema<DdbTableItem> TABLE_ITEM_TABLE_SCHEMA = TableSchema.fromBean(DdbTableItem.class);

    @Override
    public void deserialize(Record record, String s, String s1, Collector<ChangeEvent> collector) throws IOException {
        ChangeEvent changeEvent = new ChangeEvent();
        changeEvent.setTimestamp(record.dynamodb().approximateCreationDateTime());

        switch (record.eventName()) {
            case INSERT:
                changeEvent.setEventType(ChangeEvent.Type.INSERT);
                changeEvent.setNewItem(TABLE_ITEM_TABLE_SCHEMA.mapToItem(record.dynamodb().newImage()));
                break;
            case MODIFY:
                changeEvent.setEventType(ChangeEvent.Type.MODIFY);
                changeEvent.setOldItem(TABLE_ITEM_TABLE_SCHEMA.mapToItem(record.dynamodb().oldImage()));
                changeEvent.setNewItem(TABLE_ITEM_TABLE_SCHEMA.mapToItem(record.dynamodb().newImage()));
                break;
            case REMOVE:
                changeEvent.setEventType(ChangeEvent.Type.REMOVE);
                changeEvent.setOldItem(TABLE_ITEM_TABLE_SCHEMA.mapToItem(record.dynamodb().oldImage()));
                break;
            default:
                throw new IllegalStateException("Unknown event name " + record.eventName());
        }
        collector.collect(changeEvent);
    }

    @Override
    public TypeInformation<ChangeEvent> getProducedType() {
        return TypeInformation.of(ChangeEvent.class);
    }
}
DdbTableItem.java

Lines changed: 62 additions & 0 deletions
package com.amazonaws.services.msf;

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey;

/**
 * Bean describing the schema of the source DynamoDB table, used to map stream record
 * images back to typed items.
 */
@DynamoDbBean
public class DdbTableItem {

    private String partitionKey;
    private String sortKey;
    private String fieldOne;
    private String fieldTwo;

    @DynamoDbPartitionKey
    @DynamoDbAttribute("partition-key")
    public String getPartitionKey() {
        return partitionKey;
    }

    public void setPartitionKey(String partitionKey) {
        this.partitionKey = partitionKey;
    }

    @DynamoDbSortKey
    @DynamoDbAttribute("sort-key")
    public String getSortKey() {
        return sortKey;
    }

    public void setSortKey(String sortKey) {
        this.sortKey = sortKey;
    }

    public String getFieldOne() {
        return fieldOne;
    }

    public void setFieldOne(String fieldOne) {
        this.fieldOne = fieldOne;
    }

    public String getFieldTwo() {
        return fieldTwo;
    }

    public void setFieldTwo(String fieldTwo) {
        this.fieldTwo = fieldTwo;
    }

    @Override
    public String toString() {
        return "DdbTableItem{" +
                "partitionKey='" + partitionKey + '\'' +
                ", sortKey='" + sortKey + '\'' +
                ", fieldOne='" + fieldOne + '\'' +
                ", fieldTwo='" + fieldTwo + '\'' +
                '}';
    }
}
