Commit 44b26b5

Minor fix of Iceberg source example (#92)
* Minor fixes to Iceberg Source example, removing unused code and config params
1 parent 983b832 commit 44b26b5

File tree

7 files changed: +7 -261 lines

java/Iceberg/IcebergDataStreamSource/README.md

Lines changed: 3 additions & 17 deletions
@@ -4,15 +4,13 @@
 * Flink API: DataStream API
 * Iceberg 1.6.1
 * Language: Java (11)
-* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/)
-  and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
+* Flink connectors: [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
 
 This example demonstrates how to use
 [Flink Iceberg Source Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog.
 
-For simplicity, the application generates synthetic data, random stock prices, internally.
-Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records
-serialized with AVRO.
+For simplicity, the application reads from the Iceberg table as AVRO Generic Records and just prints every record.
+(Note that when running in Managed Flink the output will not be visible.)
 
 ### Prerequisites
 
@@ -43,18 +41,6 @@ Runtime parameters:
 | `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. |
 | `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. |
 | `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. |
-| `Iceberg` | `partition.fields` | `symbol` | Comma separated list of partition fields. |
-| `Iceberg` | `sort.field` | `timestamp` | Sort field. |
-| `Iceberg` | `operation` | `updsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. |
-| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. |
-
-
-### Checkpoints
-
-Checkpointing must be enabled. Iceberg commits writes on checkpoint.
-
-When running locally, the application enables checkpoints programmatically, every 10 seconds.
-When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
 
 
 ### Known limitations
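
A minimal sketch, not part of this commit, of how the three remaining runtime parameters (`bucket.prefix`, `catalog.db`, `catalog.table`) could be turned into an Iceberg TableLoader backed by the Glue Data Catalog. The class name, catalog name, and catalog properties here are illustrative assumptions, not the example's actual code.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class GlueTableLoaderFactory {

    public static TableLoader fromRuntimeProperties(Properties icebergProperties) {
        // Property keys and defaults mirror the README table above
        String warehouse = icebergProperties.getProperty("bucket.prefix");
        String database = icebergProperties.getProperty("catalog.db", "default");
        String table = icebergProperties.getProperty("catalog.table", "prices_iceberg");

        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put("warehouse", warehouse);
        catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

        // GlueCatalog resolves databases and tables through the AWS Glue Data Catalog
        CatalogLoader catalogLoader = CatalogLoader.custom(
                "glue",
                catalogProperties,
                new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");

        return TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database, table));
    }
}

The resulting TableLoader is what the IcebergSource in StreamingJob.java (diffed below) ultimately consumes.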

java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java

Lines changed: 3 additions & 7 deletions
@@ -72,18 +72,16 @@ public static void main(String[] args) throws Exception {
         Schema avroSchema = AvroSchemaUtils.loadSchema();
 
 
-
         // Local dev specific settings
         if (isLocal(env)) {
-            org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
-            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+            // We are disabling operator chaining when running locally, to allow observing every single operator in the
+            // Flink UI, for demonstration purposes.
+            // Disabling operator chaining can harm performance and is not recommended.
             env.disableOperatorChaining();
 
             // Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS
             env.enableCheckpointing(60000);
             env.setParallelism(2);
-
-
         }
 
         Properties icebergProperties = applicationProperties.get("Iceberg");
@@ -120,8 +118,6 @@ public static void main(String[] args) throws Exception {
 
         AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);
 
-
-
         IcebergSource<GenericRecord> source =
                 IcebergSource.<GenericRecord>builder()
                         .tableLoader(tableLoader)
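
A hedged sketch of how the builder shown above is typically completed and wired into the job: a streaming read of the Iceberg table as AVRO Generic Records, printed to stdout. The split assigner, monitor interval, and type information are assumptions, not the commit's actual code; `env`, `tableLoader`, `readerFunction`, and `avroSchema` come from the surrounding main() method.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

import java.time.Duration;

// Fragment continuing the main() method shown in the diff above
IcebergSource<GenericRecord> source =
        IcebergSource.<GenericRecord>builder()
                .tableLoader(tableLoader)
                .readerFunction(readerFunction)
                .assignerFactory(new SimpleSplitAssignerFactory())
                // Continuously monitor the table for newly committed snapshots (interval is illustrative)
                .streaming(true)
                .monitorInterval(Duration.ofSeconds(30))
                .build();

DataStream<GenericRecord> records = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "Iceberg Source",
        new GenericRecordAvroTypeInfo(avroSchema));

// Print every record; as the README notes, this output is not visible on Managed Flink
records.print();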

java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java

Lines changed: 0 additions & 154 deletions
This file was deleted.

java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java

Whitespace-only changes.

java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json

Lines changed: 1 addition & 11 deletions
@@ -1,20 +1,10 @@
 [
-  {
-    "PropertyGroupId": "DataGen",
-    "PropertyMap": {
-      "records.per.sec": 10.0
-    }
-  },
   {
     "PropertyGroupId": "Iceberg",
     "PropertyMap": {
       "bucket.prefix": "s3://<my-bucket>/iceberg",
       "catalog.db": "iceberg",
-      "catalog.table": "prices_iceberg",
-      "partition.fields": "symbol",
-      "sort.field": "timestamp",
-      "operation": "append",
-      "upsert.equality.fields": "symbol"
+      "catalog.table": "prices_iceberg"
     }
   }
 ]
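
As a side note, a minimal sketch of how property groups like the one above are commonly loaded: from this JSON file when running locally, and from the application configuration when running on Amazon Managed Service for Apache Flink. The helper class and the resource file name are assumptions, not code from this repository.

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

public class ApplicationPropertiesLoader {

    // Assumed name of the local dev properties file bundled with the application resources
    private static final String LOCAL_PROPERTIES_FILE = "flink-application-properties-dev.json";

    public static Map<String, Properties> load(boolean isLocal) throws IOException {
        if (isLocal) {
            // When running locally, read the property groups from the bundled JSON file
            String path = Objects.requireNonNull(
                    ApplicationPropertiesLoader.class.getClassLoader()
                            .getResource(LOCAL_PROPERTIES_FILE)).getPath();
            return KinesisAnalyticsRuntime.getApplicationProperties(path);
        }
        // On Amazon Managed Service for Apache Flink, properties come from the application configuration
        return KinesisAnalyticsRuntime.getApplicationProperties();
    }
}

With this in place, load(isLocal).get("Iceberg").getProperty("catalog.table", "prices_iceberg") would return the table name, falling back to the default documented in the README.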

java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments
