Commit efd3305

Upgrade s3parquet example (#96)
Updated the S3ParquetSink example to Flink 1.20. Removed the CloudFormation template and build script, which are no longer used.
1 parent: 3918d81

18 files changed (+158, -554 lines)

java/S3ParquetSink/README.md

Lines changed: 20 additions & 145 deletions
@@ -1,165 +1,40 @@
# S3 Parquet Sink

- * Flink version: 1.15
+ * Flink version: 1.20
* Flink API: DataStream API
* Language Java (11)
+ * Connectors: FileSystem Sink (and DataGen connector)

- This example demonstrates how to write data coming from a Kinesis Data Stream into an Amazon S3 Bucket in Parquet format using Hive style partitioning.
+ This example demonstrates how to write Parquet files to S3.

- This example uses data generated from the [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)
+ The example generates random stock price data using the DataGen connector and writes it to S3 as Parquet files,
+ bucketed in the format `year=yyyy/month=MM/day=dd/hour=HH/` and rotated on checkpoint.

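For reference, a minimal sketch of what such a Parquet `FileSink` can look like (an illustration, not necessarily the exact code in this example; the Avro-generated `StockPrice` class and the `outputPath` value are assumptions):

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Sketch: StockPrice is assumed to be an Avro-generated SpecificRecord class,
// and outputPath something like "s3a://<bucket-name>/<bucket-path>".
private static FileSink<StockPrice> createParquetS3Sink(String outputPath) {
    return FileSink
            .forBulkFormat(new Path(outputPath), AvroParquetWriters.forSpecificRecord(StockPrice.class))
            // Hive-style partitioning: year=yyyy/month=MM/day=dd/hour=HH/
            .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH"))
            // Bulk formats such as Parquet can only roll part files on checkpoint
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();
}
```

The stream produced by the DataGen source (a `DataStream<StockPrice>` in this sketch) would then be attached with `stream.sinkTo(createParquetS3Sink(outputPath))`.
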
- The Flink application consumes data in String format from a Kinesis Data Streams,
- parses the JSON and performs a count of number of symbols processed in a Tumbling Window of 1 minute,
- using processing time. It sinks the results in parquet format using Avro Writer to an S3 Bucket.
+ Note that the FileSystem sink commits writes on checkpoint. For this reason, checkpointing is programmatically enabled when running locally.
+ When running on Managed Flink, checkpoints are controlled by the application configuration and are enabled by default.

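As an illustration of the local-checkpointing note above (a sketch; the actual interval and detection logic in this example may differ), the job can enable checkpointing only when it detects a local execution environment, for instance inside `main()`:

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// When running locally (e.g. from the IDE) there is no Managed Flink configuration,
// so enable checkpointing in code; otherwise the FileSink would never commit files.
if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(30_000L); // assumed interval: every 30 seconds
}
```
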
- ![Flink Example](images/flink-kinesis-s3.png)
+ ## Prerequisites

- ### Sample Input
- ```json
- {"price": 36, "symbol":"AMZN"}
+ * An S3 bucket for writing data. The application IAM Role must allow writing to the bucket.

- ```
- ### Sample Output
- The data gets written to S3 path.
- Following is the sample output of `aws s3 ls --recursive s3://<BUCKET-NAME>/flink/msf/` if the output gets written to `s3://<BUCKET-NAME>/flink/msf/` -
- ```shell
- 2023-10-13 13:36:01 736 flink/msf/year=2023/month=10/day=13/hour=13/part-06887bc3-7ba4-4c77-8688-5a673a947175-0.parquet
- 2023-10-13 13:39:01 747 flink/msf/year=2023/month=10/day=13/hour=13/part-34cea3b7-7f3e-4039-8ecb-3ba3ab5a5845-0.parquet
- 2023-10-13 13:40:01 747 flink/msf/year=2023/month=10/day=13/hour=13/part-34cea3b7-7f3e-4039-8ecb-3ba3ab5a5845-1.parquet
- 2023-10-13 13:41:01 747 flink/msf/year=2023/month=10/day=13/hour=13/part-34cea3b7-7f3e-4039-8ecb-3ba3ab5a5845-2.parquet
- 2023-10-13 13:36:01 747 flink/msf/year=2023/month=10/day=13/hour=13/part-419f4d33-0c28-40ca-936c-a34d9f6e8c8c-0.parquet
- 2023-10-13 13:39:01 736 flink/msf/year=2023/month=10/day=13/hour=13/part-df8700ad-679c-48f1-8bc3-7fd002efa38b-0.parquet
- 2023-10-13 13:40:01 736 flink/msf/year=2023/month=10/day=13/hour=13/part-df8700ad-679c-48f1-8bc3-7fd002efa38b-1.parquet
- 2023-10-13 13:41:01 736 flink/msf/year=2023/month=10/day=13/hour=13/part-df8700ad-679c-48f1-8bc3-7fd002efa38b-2.parquet
+ ## Runtime Configuration

- ```
+ The application reads the runtime configuration from the Runtime Properties when running on Amazon Managed Service for Apache Flink,
+ or, when running locally, from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file in the resources folder.

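A common way to implement this dual loading is sketched below; the main class name `StreamingJob` and the exact helper used by this example are assumptions:

```java
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

// Sketch: load Runtime Properties from Managed Flink when deployed,
// or from the local dev JSON file when running in the IDE.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
    if (env instanceof LocalStreamEnvironment) {
        // Local run: read flink-application-properties-dev.json from the classpath (resources folder)
        return KinesisAnalyticsRuntime.getApplicationProperties(
                Objects.requireNonNull(StreamingJob.class.getClassLoader()
                        .getResource("flink-application-properties-dev.json")).getPath());
    } else {
        // On Amazon Managed Service for Apache Flink: read the application's Runtime Properties
        return KinesisAnalyticsRuntime.getApplicationProperties();
    }
}
```
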
- The sample content of file is -
- ```
- {
- "symbol": "\"TBV\"",
- "count": 758
- }
- {
- "symbol": "\"AMZN\"",
- "count": 749
- }
- {
- "symbol": "\"AAPL\"",
- "count": 777
- }
+ All parameters are case-sensitive.

- ```
+ | Group ID       | Key           | Description                        |
+ |----------------|---------------|------------------------------------|
+ | `OutputBucket` | `bucket.name` | Name of the destination S3 bucket. |
+ | `OutputBucket` | `bucket.path` | Base path within the bucket.       |

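For illustration, the two properties above would typically be read from the `OutputBucket` group and combined into the sink path (a sketch; the `applicationProperties` map comes from a loader like the one shown earlier, and the `s3a://` scheme and example values are assumptions):

```java
import java.util.Properties;

// Read the destination from the "OutputBucket" property group (see table above)
Properties outputProperties = applicationProperties.get("OutputBucket");
String bucketName = outputProperties.getProperty("bucket.name"); // e.g. "my-output-bucket"
String bucketPath = outputProperties.getProperty("bucket.path"); // e.g. "output"
String outputPath = String.format("s3a://%s/%s", bucketName, bucketPath);
```
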
- ## Pre-requisites
+ To configure the application on Managed Service for Apache Flink, set these parameters in the *Runtime properties*.

- In order for to have this sample running locally or in Amazon Managed Service For Apache Flink, you will need the following:
+ To configure the application for running locally, edit the [json file](resources/flink-application-properties-dev.json).

- * Existing Kinesis Data Stream (Please add Kinesis Data Stream Name and Region in flink-application-properties-dev.json)
- * Existing S3 Bucket (Please add your S3 Bucket Name, including path to which you want the application to write the results, in flink-application-properties-dev.json )
- * JSON producer, for which one of the fields is "Symbol"
-
- You can modify the Flink Application, if you wish to perform the count on a different field.
-
- ## Flink compatibility
-
- **Note:** This project is compatible with Flink 1.15+ and Amazon Managed Service for Apache Flink.
-
- ### Flink API compatibility
-
- It uses the `FlinkKinesisConsumer` and `FileSink` (as opposed to `StreamingFileSink`).
-
- ## Notes about using AVRO with Apache Flink
-
- ### AVRO-generated classes
-
- This project uses classes generated at built-time as data objects.
-
- As a best practice, only the AVRO schema definitions (IDL `.avdl` files in this case) are included in the project source
- code.
-
- AVRO Maven plugin generates the Java classes (source code) at build-time, during the
- [`generate-source`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.
-
- The generated classes are written into `./target/generated-sources/avro` directory and should **not** be committed with
- the project source.
-
- This way, the only dependency is on the schema definition file(s).
- If any change is required, the schema file is modified and the AVRO classes are re-generated automatically in the build.
-
- Code generation is supported by all common IDEs like IntelliJ.
- If your IDE does not see the AVRO classes (`TradeCount`) when you import the project for the
- first time, you may manually run `mvn generate-sources` once of force source code generation from the IDE.
-
- ### AVRO-generated classes (SpecificRecord) in Apache Flink
-
- Using AVRO-generated classes (SpecificRecord) within the flow of the Flink application (between operators) or in the
- Flink state, has an additional benefit.
- Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)
- these objects, without risking of falling back to Kryo.
-
- ### FileSink & S3 dependencies
- The following dependencies related to AVRO and S3 are included (for FLink 1.15.4):
-
- 1. `org.apache.flink:flink-connector-files:1.15.4` - File Sink Connector
- 2. `org.apache.flink:flink-s3-fs-hadoop:1.15.4` - Support for writing to Amazon S3.
- 3. `org.apache.flink:flink-avro:1.15.4` - Support for using Avro with Flink .
-
-
- ### Running in Intellij
+ ### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.
-
-
-
- ## Deploying using CloudFormation to Amazon Managed Service for Apache Flink
- ### Pre-requisite
- 1. Kinesis stream and S3 bucket.
- 2. AWS user credential using which you can create CloudFormation stack from console or CLI.
-
- ### Build and deployment
- 1. The steps below create stack using `./cloudformation/msf-deploy.yaml`.
- 2. The script `deploy.sh` creates the stack using AWS CLI. Ensure that AWS CLI is configured and your user has permissions to create CloudFormation stack.
- 3. Alternatively you can deploy from console using `./cloudformation/msf-deploy.yaml` and pass required parameters.
- 4. Edit `deploy.sh` to modify "Region configuration" . Modify following configurations -
- * region= Deployment region
-
- 5. Edit `deploy.sh` to modify "Kinesis and S3 Sink configuration". Modify following configurations -
- * input_stream= Input Kinesis stream name.
- * s3_bucket_name= S3 Bucket name
- * s3_file_path = S3 folder path. Ex. flink/msf
- Ensure that source stream and sink bucket are created.
-
- 6. To build code, execute the script below which will build the jar and upload the jar to S3 at s3://BUCKET_NAME/flink/flink-kds-s3.jar.
- ```shell
- ./build.sh <BUCKET_NAME>
- ```
- 7. Run `deploy.sh` to deploy the CloudFormation template . Refer the sample CloudFormation template at `./cloudformation/msf-deploy.yaml` .
- The CloudFormation needs the jar to be there at s3://BUCKET_NAME/flink/flink-kds-s3.jar.
-
- ```
- ./deploy.sh <BUCKET_NAME>
- ```
- 8. The template creates following resources -
- * Flink application with application name defined by application_name in deploy.sh.
- * CloudWatch log group with name - /aws/amazon-msf/${application_name}
- * CloudWatch log stream under the log group created above by name amazon-msf-log-stream.
- * IAM execution role for Flink application.
- * IAM managed policy for permission.
-
-
- ## Data generator - Kinesis
- You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator),
- also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html),
- to generate random data to Kinesis Data Stream and test the application.
-
- RecordTemplate:
- ```json
- {
- "symbol":"{{random.arrayElement(["AAPL","AMZN","MSFT","INTC","TBV"])}}"
- }
- ```
-
-

java/S3ParquetSink/build.sh

Lines changed: 0 additions & 25 deletions
This file was deleted.

java/S3ParquetSink/cloudformation/msf-deploy.yaml

Lines changed: 0 additions & 144 deletions
This file was deleted.

0 commit comments
