Commit 8f5e0f9

Iceberg Source + S3 Table Sink (#86)
* iceberg with s3 tables
* moved other Iceberg examples into the same folder
1 parent 19b4d9c commit 8f5e0f9

34 files changed: +1516 −1 lines

.gitattributes

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+flink-application-properties-dev.json filter=arn-filter

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -15,3 +15,6 @@ venv/
 __pycache__/

 /.run/
+
+clean.sh
+smudge.sh
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
# Flink Iceberg Source using DataStream API

* Flink version: 1.20.0
* Flink API: DataStream API
* Iceberg: 1.6.1
* Language: Java (11)
* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
  and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)

This example demonstrates how to use the
[Flink Iceberg Source Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog.
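As an illustration (this is a sketch with assumed variable names and example values, not necessarily the repository's exact code), the Iceberg catalog can be wired to the Glue Data Catalog through a `CatalogLoader` from `iceberg-flink`, using the `GlueCatalog` and `S3FileIO` implementations provided by `iceberg-aws`:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.CatalogLoader;

import java.util.HashMap;
import java.util.Map;

Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("warehouse", "s3://my-bucket/iceberg");            // S3 bucket prefix for the table data
catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");  // store data and metadata files on S3

// Catalog backed by the AWS Glue Data Catalog (GlueCatalog is provided by iceberg-aws)
CatalogLoader glueCatalogLoader = CatalogLoader.custom(
        "glue",
        catalogProperties,
        new Configuration(),
        "org.apache.iceberg.aws.glue.GlueCatalog");
```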
For simplicity, the application internally generates synthetic data: random stock prices.
Data is generated as AVRO Generic Records, simulating a real source, for example a Kafka source receiving AVRO-serialized records.
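A data generator producing AVRO `GenericRecord`s at a configurable rate could look roughly like the following sketch (the `randomPriceRecord` helper and the `avroSchema` and `env` variables are hypothetical placeholders):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

// Generator function producing a random stock price record for each generated index
GeneratorFunction<Long, GenericRecord> generator = index -> randomPriceRecord(avroSchema);

DataGeneratorSource<GenericRecord> source = new DataGeneratorSource<>(
        generator,
        Long.MAX_VALUE,                             // effectively unbounded
        RateLimiterStrategy.perSecond(10.0),        // `DataGen` / `records.per.sec`
        new GenericRecordAvroTypeInfo(avroSchema)); // Flink type information for AVRO GenericRecord

DataStream<GenericRecord> prices = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "DataGen");
```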
### Prerequisites

The application expects the following resources:
* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default").
  The application creates the table, but the database must already exist.
* An S3 bucket to write the Iceberg table.
#### IAM Permissions

The application must have IAM permissions to:
* Show and alter Glue Data Catalog databases, and show and create Glue Data Catalog tables.
  See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
* Read from and write to the S3 bucket.
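For illustration only, a policy granting these permissions might look like the following sketch (the action list and resource ARNs are examples; scope them to your actual Glue resources and S3 bucket):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetDatabases",
        "glue:UpdateDatabase",
        "glue:GetTable",
        "glue:GetTables",
        "glue:CreateTable",
        "glue:UpdateTable"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::my-bucket", "arn:aws:s3:::my-bucket/*"]
    }
  ]
}
```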
### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from Runtime Properties.

When running locally, the configuration is read from the
[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.

Runtime parameters:

| Group ID  | Key                      | Default          | Description                                                                                                              |
|-----------|--------------------------|------------------|--------------------------------------------------------------------------------------------------------------------------|
| `DataGen` | `records.per.sec`        | `10.0`           | Records per second generated.                                                                                              |
| `Iceberg` | `bucket.prefix`          | (mandatory)      | S3 bucket prefix, for example `s3://my-bucket/iceberg`.                                                                    |
| `Iceberg` | `catalog.db`             | `default`        | Name of the Glue Data Catalog database.                                                                                    |
| `Iceberg` | `catalog.table`          | `prices_iceberg` | Name of the Glue Data Catalog table.                                                                                       |
| `Iceberg` | `partition.fields`       | `symbol`         | Comma-separated list of partition fields.                                                                                  |
| `Iceberg` | `sort.field`             | `timestamp`      | Sort field.                                                                                                                |
| `Iceberg` | `operation`              | `upsert`         | Iceberg operation. One of `upsert`, `append` or `overwrite`.                                                               |
| `Iceberg` | `upsert.equality.fields` | `symbol`         | Comma-separated list of fields used for upsert. It must match the partition fields. Required if `operation` = `upsert`.   |
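For local development, the content of [flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) may look like the following (the values are examples; the group IDs and keys are the ones listed in the table above):

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": "10.0"
    }
  },
  {
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
      "bucket.prefix": "s3://my-bucket/iceberg",
      "catalog.db": "default",
      "catalog.table": "prices_iceberg",
      "partition.fields": "symbol",
      "sort.field": "timestamp",
      "operation": "upsert",
      "upsert.equality.fields": "symbol"
    }
  }
]
```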
### Checkpoints

Checkpointing must be enabled. Iceberg commits writes on checkpoint.

When running locally, the application enables checkpoints programmatically, every 10 seconds.
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
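A minimal sketch of the local-only checkpoint setup (not necessarily the repository's exact code):

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
if (env instanceof LocalStreamEnvironment) {
    // Only when running locally: checkpoint (and therefore commit Iceberg writes) every 10 seconds.
    // On Managed Service for Apache Flink, checkpointing is driven by the application configuration.
    env.enableCheckpointing(10_000L);
}
```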
### Known limitations

At the moment, the Flink Iceberg integration has the following limitations:
* It doesn't support Iceberg tables with hidden partitioning.
* It doesn't support adding, removing, renaming, or otherwise changing columns.
### Schema and schema evolution

The application must "know" the AVRO schema on start.
The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry.
This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront.

This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible.
Schema changes are not propagated to Iceberg.
As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using
the schema it knows. Any new field in the incoming record is discarded.

In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc), embedded
with the application.
It is technically possible to fetch the schema on application start from an external source, like a schema registry or a
schema definition file in an S3 bucket. This is beyond the scope of this example.
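A sketch of loading the schema from the embedded resource at application start (exception handling omitted; the actual code in the example may differ):

```java
import org.apache.avro.Schema;

import java.io.InputStream;

// Parse the AVRO schema from the price.avsc file packaged as a classpath resource
Schema avroSchema;
try (InputStream schemaStream = StreamingJob.class.getResourceAsStream("/price.avsc")) {
    avroSchema = new Schema.Parser().parse(schemaStream);
}
```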
### Running locally, in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>iceberg-datastream-source</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <flink.version>1.20.0</flink.version>
        <avro.version>1.11.3</avro.version>
        <hadoop.version>3.4.0</hadoop.version>
        <iceberg.version>1.6.1</iceberg.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
        <junit5.version>5.8.1</junit5.version>
    </properties>

    <dependencies>
        <!-- Flink Core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink Iceberg uses DropWizard metrics -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- AVRO -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Iceberg dependencies -->
        <!-- DO NOT include the iceberg-flink-runtime-* dependency, because it contains a shaded version of Avro -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.19</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws-bundle</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
                <!-- Exclude to prevent multiple SLF4J binding conflicts -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Tests -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${junit5.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build the fat-jar including all required dependencies -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
