
Commit 19b4d9c

Add public example for SQS Connector (#89)

Authored-by: Gowry Lakshmi <sgowryl@amazon.com>

1 parent 2770316 commit 19b4d9c

File tree

8 files changed: +389 −0 lines changed

java/SQSSink/README.md

Lines changed: 33 additions & 0 deletions
# Flink SQS Sink example

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Flink connectors: SQS Sink

This example demonstrates how to use the [SQS Sink Connector](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/sqs/).

This example application generates random data and sends the output to the [`SQS Sink`](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/sqs/) connector.
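At its core, the job serializes each record to JSON and hands it to the sink builder; a condensed sketch of the relevant calls, excerpted from `SQSStreamingJob.java` in this example (`sqsUrl`, `sinkProperties`, and `input` come from the surrounding job code):

```java
// Convert each record to JSON for the SQS message body
SqsSinkElementConverter<StockPrice> elementConverter =
        SqsSinkElementConverter.<StockPrice>builder()
                .setSerializationSchema(new JsonSerializationSchema<>())
                .build();

// Build the sink, pointing it at the queue URL from the runtime properties
SqsSink<StockPrice> sink = SqsSink.<StockPrice>builder()
        .setSqsSinkElementConverter(elementConverter)
        .setSqsUrl(sqsUrl)
        .setSqsClientProperties(sinkProperties)
        .build();

input.sinkTo(sink);
```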
### Runtime configuration

The application reads the runtime configuration from the Runtime Properties, when running on Amazon Managed Service for Apache Flink,
or, when running locally, from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.

All parameters are case-sensitive.

| Group ID       | Key          | Description                                                                                                                              |
|----------------|--------------|------------------------------------------------------------------------------------------------------------------------------------------|
| `OutputQueue0` | `sqs-url`    | URL of the SQS output queue.                                                                                                              |
| `OutputQueue0` | `aws.region` | (optional) Region of the output queue. If not specified, it uses the application region or, when running locally, the default region of the AWS profile. |

To configure the application on Managed Service for Apache Flink, set these parameters in the *Runtime properties*.

To configure the application for running locally, edit the [JSON file](resources/flink-application-properties-dev.json), as shown below.
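For reference, this is the local configuration file included in this example:

```json
[
  {
    "PropertyGroupId": "OutputQueue0",
    "PropertyMap": {
      "sqs-url": "https://sqs.us-east-1.amazonaws.com/012345678901/MyTestQueue",
      "aws.region": "us-east-1"
    }
  }
]
```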
### Running locally in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.

java/SQSSink/pom.xml

Lines changed: 177 additions & 0 deletions
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.amazonaws</groupId>
    <artifactId>flink-connector-sqs</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <buildDirectory>${project.basedir}/target</buildDirectory>
        <jar.finalName>${project.name}-${project.version}</jar.finalName>
        <target.java.version>11</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <flink.version>1.20.0</flink.version>
        <flink.connector.version>5.0.0-1.20</flink.connector.version>
        <kda.runtime.version>1.2.0</kda.runtime.version>
        <log4j.version>2.23.1</log4j.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-bom</artifactId>
                <!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
                <version>1.12.677</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Flink Core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Library to retrieve runtime application properties in Managed Service for Apache Flink -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>${kda.runtime.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- DataGen connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- SQS Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-sqs</artifactId>
            <version>${flink.connector.version}</version>
        </dependency>

        <!-- Logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <directory>${buildDirectory}</directory>
        <finalName>${jar.finalName}</finalName>

        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build the fat-jar including all required dependencies -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Fully-qualified class name of the entry point, not a file path -->
                                    <mainClass>com.amazonaws.services.msf.SQSStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
java/SQSSink/src/main/java/com/amazonaws/services/msf/SQSStreamingJob.java

Lines changed: 90 additions & 0 deletions
```java
package com.amazonaws.services.msf;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.sqs.sink.SqsSink;
import org.apache.flink.connector.sqs.sink.SqsSinkElementConverter;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;


public class SQSStreamingJob {

    // Name of the local JSON resource with the application properties, in the same format as they are received from the Amazon Managed Service for Apache Flink runtime
    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";

    private static boolean isLocal(StreamExecutionEnvironment env) {
        return env instanceof LocalStreamEnvironment;
    }

    /**
     * Load application properties from the Amazon Managed Service for Apache Flink runtime, or from a local resource when the environment is local
     */
    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
        if (isLocal(env)) {
            return KinesisAnalyticsRuntime.getApplicationProperties(
                    SQSStreamingJob.class.getClassLoader()
                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
        } else {
            return KinesisAnalyticsRuntime.getApplicationProperties();
        }
    }

    private static DataGeneratorSource<StockPrice> getStockPriceDataGeneratorSource() {
        long recordPerSecond = 100;
        return new DataGeneratorSource<>(
                new StockPriceGeneratorFunction(),
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(recordPerSecond),
                TypeInformation.of(StockPrice.class));
    }

    private static <T> SqsSink<T> createSQSSink(
            Properties sinkProperties,
            SqsSinkElementConverter<T> elementConverter) {
        final String sqsUrl = sinkProperties.getProperty("sqs-url");
        return SqsSink.<T>builder()
                .setSqsSinkElementConverter(elementConverter)
                .setSqsUrl(sqsUrl)
                .setSqsClientProperties(sinkProperties)
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final Map<String, Properties> applicationProperties = loadApplicationProperties(env);

        // Source
        DataGeneratorSource<StockPrice> source = getStockPriceDataGeneratorSource();

        // DataStream from Source
        DataStream<StockPrice> input = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "data-generator").setParallelism(1);

        // Convert each StockPrice to JSON for the SQS message body
        SqsSinkElementConverter<StockPrice> elementConverter =
                SqsSinkElementConverter.<StockPrice>builder()
                        .setSerializationSchema(new JsonSerializationSchema<>())
                        .build();

        // Sink
        SqsSink<StockPrice> sink = createSQSSink(
                applicationProperties.get("OutputQueue0"),
                elementConverter
        );

        input.sinkTo(sink);

        env.execute("Flink SQS Sink example");
    }
}
```
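For context, `JsonSerializationSchema` delegates to a Jackson `ObjectMapper`, so each `StockPrice` becomes a JSON message body. A minimal standalone sketch of the equivalent serialization (illustrative only: the class name is hypothetical, it assumes it sits in the same package as `StockPrice`, and it relies on Jackson's default bean mapping, under which `java.sql.Timestamp` serializes as epoch milliseconds):

```java
package com.amazonaws.services.msf;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;

public class SerializationSketch {
    public static void main(String[] args) throws Exception {
        // Roughly what the element converter produces for one record
        ObjectMapper mapper = new ObjectMapper();
        StockPrice price = new StockPrice(
                new Timestamp(Instant.now().toEpochMilli()), "AMZN", 42.0);
        String messageBody = new String(
                mapper.writeValueAsBytes(price), StandardCharsets.UTF_8);
        // Prints something like: {"eventTime":1718000000000,"ticker":"AMZN","price":42.0}
        System.out.println(messageBody);
    }
}
```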
java/SQSSink/src/main/java/com/amazonaws/services/msf/StockPrice.java

Lines changed: 52 additions & 0 deletions
```java
package com.amazonaws.services.msf;

import java.sql.Timestamp;

public class StockPrice {
    private Timestamp eventTime;
    private String ticker;
    private double price;


    public StockPrice() {
    }

    public StockPrice(Timestamp eventTime, String ticker, double price) {
        this.eventTime = eventTime;
        this.ticker = ticker;
        this.price = price;
    }

    public Timestamp getEventTime() {
        return eventTime;
    }

    public void setEventTime(Timestamp eventTime) {
        this.eventTime = eventTime;
    }

    public String getTicker() {
        return ticker;
    }

    public void setTicker(String ticker) {
        this.ticker = ticker;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "StockPrice{" +
                "eventTime=" + eventTime +
                ", ticker='" + ticker + '\'' +
                ", price=" + price +
                '}';
    }
}
```
java/SQSSink/src/main/java/com/amazonaws/services/msf/StockPriceGeneratorFunction.java

Lines changed: 20 additions & 0 deletions
```java
package com.amazonaws.services.msf;

// RandomUtils comes from commons-lang3, which is not declared in the pom and presumably arrives transitively via the Flink dependencies
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

import java.sql.Timestamp;
import java.time.Instant;

public class StockPriceGeneratorFunction implements GeneratorFunction<Long, StockPrice> {
    private static final String[] TICKERS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"};

    @Override
    public StockPrice map(Long aLong) {
        // Generate a random record: current time, a random ticker, and a random price in [10, 100)
        return new StockPrice(
                new Timestamp(Instant.now().toEpochMilli()),
                TICKERS[RandomUtils.nextInt(0, TICKERS.length)],
                RandomUtils.nextDouble(10, 100)
        );
    }
}
```
java/SQSSink/src/main/resources/flink-application-properties-dev.json

Lines changed: 9 additions & 0 deletions
```json
[
  {
    "PropertyGroupId": "OutputQueue0",
    "PropertyMap": {
      "sqs-url": "https://sqs.us-east-1.amazonaws.com/012345678901/MyTestQueue",
      "aws.region": "us-east-1"
    }
  }
]
```
java/SQSSink/src/main/resources/log4j2.properties

Lines changed: 7 additions & 0 deletions
```properties
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```
