Skip to content

Commit 3d6eeac

Browse files
authored
New examples: S3 AVRO Source and Sink (#103)
* S3 AVRO Sink * S3 AVRO Source
1 parent f155441 commit 3d6eeac

16 files changed

+942
-0
lines changed

java/S3AvroSink/README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# S3 Avro Sink
2+
3+
* Flink version: 1.20
4+
* Flink API: DataStream API
5+
* Language: Java (11)
6+
* Connectors: FileSystem Sink (and DataGen connector)
7+
8+
This example demonstrates how to write AVRO files to S3.
9+
10+
The example generates random stock price data using the DataGen connector and writes to S3 as AVRO files with
11+
a bucketing in the format `year=yyyy/month=MM/day=dd/hour=HH/` and rotating files on checkpoint.
12+
13+
Note that the FileSystem sink commits the writes on checkpoint. For this reason, checkpointing is programmatically enabled when running locally.
14+
When running on Managed Flink checkpoints are controlled by the application configuration and enabled by default.
15+
16+
This application can be used in combination with the [S3AvroSource](../S3AvroSource) example application, which reads AVRO data with the same schema from S3.
17+
18+
## Prerequisites
19+
20+
* An S3 bucket for writing data. The application IAM Role must allow writing to the bucket
21+
22+
23+
## Runtime Configuration
24+
25+
The application reads the runtime configuration from the Runtime Properties, when running on Amazon Managed Service for Apache Flink,
26+
or, when running locally, from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
27+
28+
All parameters are case-sensitive.
29+
30+
| Group ID | Key | Description |
31+
|----------------|---------------|------------------------------------|
32+
| `OutputBucket` | `bucket.name` | Name of the destination S3 bucket. |
33+
| `OutputBucket` | `bucket.path` | Base path within the bucket. |
34+
35+
To configure the application on Managed Service for Apache Flink, set up these parameters in the *Runtime properties*.
36+
37+
To configure the application for running locally, edit the [json file](resources/flink-application-properties-dev.json).
38+
39+
### Running in IntelliJ
40+
41+
You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
42+
43+
See [Running examples locally](../running-examples-locally.md) for details.
44+
45+
## AVRO Specific Record usage
46+
47+
The AVRO schema definition (`avdl` file) is included as part of the source code in the `./src/main/resources/avro` folder.
48+
The AVRO Maven Plugin is used to generate the Java object `StockPrice` at compile time.
49+
50+
If IntelliJ cannot find the `StockPrice` class when you import the project:
51+
1. Run `mvn generate-sources` manually once
52+
2. Ensure that the IntelliJ module configuration, in Project settings, also includes `target/generated-sources/avro` as Sources. If IntelliJ does not auto-detect it, add the path manually
53+
54+
These operations are only needed once.

java/S3AvroSink/pom.xml

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>s3-avro-sink</artifactId>
    <version>1.0</version>

    <properties>
        <!-- Declared exactly once (was previously duplicated in this block) -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <buildDirectory>${project.basedir}/target</buildDirectory>
        <jar.finalName>${project.name}-${project.version}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <flink.version>1.20.0</flink.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <avro.version>1.11.3</avro.version>
        <!-- NOTE: a parquet.avro.version property was removed — no Parquet dependency
             or plugin in this AVRO-only pom referenced it -->

        <log4j.version>2.23.1</log4j.version>
        <junit5.version>5.8.1</junit5.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-bom</artifactId>
                <!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
                <version>1.12.782</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- AVRO -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

        <!-- Tests -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${junit5.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>

        <plugins>
            <!-- AVRO source generator: generates the StockPrice specific record
                 from the avdl file at compile time -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
                            <fieldVisibility>private</fieldVisibility>
                            <stringType>String</stringType>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                    <forceJavacCompilerUse>true</forceJavacCompilerUse>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package com.amazonaws.services.msf;
2+
3+
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
4+
import com.amazonaws.services.msf.avro.StockPrice;
5+
import com.amazonaws.services.msf.datagen.StockPriceGeneratorFunction;
6+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
7+
import org.apache.flink.api.common.typeinfo.TypeInformation;
8+
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
9+
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
10+
import org.apache.flink.connector.file.sink.FileSink;
11+
import org.apache.flink.core.fs.Path;
12+
import org.apache.flink.formats.avro.AvroWriters;
13+
import org.apache.flink.streaming.api.datastream.DataStream;
14+
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
15+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
16+
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
17+
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
18+
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
19+
import org.apache.flink.util.Preconditions;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.io.IOException;
24+
import java.util.Map;
25+
import java.util.Properties;
26+
27+
public class StreamingJob {
28+
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJob.class);
29+
30+
// Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime
31+
private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
32+
33+
private static boolean isLocal(StreamExecutionEnvironment env) {
34+
return env instanceof LocalStreamEnvironment;
35+
}
36+
37+
/**
38+
* Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local
39+
*/
40+
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
41+
if (isLocal(env)) {
42+
LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
43+
return KinesisAnalyticsRuntime.getApplicationProperties(
44+
StreamingJob.class.getClassLoader()
45+
.getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
46+
} else {
47+
LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
48+
return KinesisAnalyticsRuntime.getApplicationProperties();
49+
}
50+
}
51+
52+
private static DataGeneratorSource<StockPrice> getStockPriceDataGeneratorSource() {
53+
long recordPerSecond = 100;
54+
return new DataGeneratorSource<>(
55+
new StockPriceGeneratorFunction(),
56+
Long.MAX_VALUE,
57+
RateLimiterStrategy.perSecond(recordPerSecond),
58+
TypeInformation.of(StockPrice.class));
59+
}
60+
61+
private static FileSink<StockPrice> getParquetS3Sink(String s3UrlPath) {
62+
return FileSink
63+
.forBulkFormat(new Path(s3UrlPath), AvroWriters.forSpecificRecord(StockPrice.class))
64+
// Bucketing
65+
.withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
66+
// Part file rolling - this is actually the default, rolling on checkpoint
67+
.withRollingPolicy(OnCheckpointRollingPolicy.build())
68+
.withOutputFileConfig(OutputFileConfig.builder()
69+
.withPartSuffix(".avro")
70+
.build())
71+
.build();
72+
}
73+
74+
public static void main(String[] args) throws Exception {
75+
// set up the streaming execution environment
76+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
77+
78+
// Local dev specific settings
79+
if (isLocal(env)) {
80+
// Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS
81+
env.enableCheckpointing(30000);
82+
env.setParallelism(2);
83+
}
84+
85+
// Application configuration
86+
Properties applicationProperties = loadApplicationProperties(env).get("OutputBucket");
87+
String bucketName = Preconditions.checkNotNull(applicationProperties.getProperty("bucket.name"), "Bucket for S3 not defined");
88+
String bucketPath = Preconditions.checkNotNull(applicationProperties.getProperty("bucket.path"), "Path in S3 not defined");
89+
90+
// Build S3 URL. Strip any initial fwd slash from bucket path
91+
String s3UrlPath = String.format("s3a://%s/%s", bucketName.trim(), bucketPath.trim().replaceFirst("^/+", ""));
92+
LOGGER.info("Output URL: {}", s3UrlPath);
93+
94+
// Source (data generator)
95+
DataGeneratorSource<StockPrice> source = getStockPriceDataGeneratorSource();
96+
97+
// DataStream from source
98+
DataStream<StockPrice> stockPrices = env.fromSource(
99+
source, WatermarkStrategy.noWatermarks(), "data-generator").setParallelism(1);
100+
101+
// Sink (Parquet files to S3)
102+
FileSink<StockPrice> sink = getParquetS3Sink(s3UrlPath);
103+
104+
stockPrices.sinkTo(sink).name("avro-s3-sink");
105+
106+
// Also print the output
107+
// (This is for illustration purposes only and used when running locally. No output is printed when running on Managed Flink)
108+
stockPrices.print();
109+
110+
env.execute("Sink Avro to S3");
111+
}
112+
}

0 commit comments

Comments
 (0)