Skip to content

Commit 4f8e88b

Browse files
update datastreamsink iceberg version (#100)
* update DataStreamSink Iceberg version
* remove sort order from table creation
1 parent 86d940f commit 4f8e88b

File tree

1 file changed

+2
-5
lines changed

1 file changed

+2
-5
lines changed

java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
public class IcebergSinkBuilder {
3636
private static final String DEFAULT_GLUE_DB = "default";
3737
private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg";
38-
private static final String DEFAULT_ICEBERG_SORT_ORDER_FIELD = "accountNr";
3938
private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol";
4039
private static final String DEFAULT_ICEBERG_OPERATION = "upsert";
4140
private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol";
@@ -45,7 +44,7 @@ public class IcebergSinkBuilder {
4544
* If the Iceberg Table has not been previously created, we will create it using the Partition Fields specified in the
4645
* Properties
4746
*/
48-
private static void createTable(Catalog catalog, TableIdentifier outputTable, org.apache.iceberg.Schema icebergSchema, PartitionSpec partitionSpec, String sortField) {
47+
private static void createTable(Catalog catalog, TableIdentifier outputTable, org.apache.iceberg.Schema icebergSchema, PartitionSpec partitionSpec) {
4948
// If table has been previously created, we do not do any operation or modification
5049
if (!catalog.tableExists(outputTable)) {
5150
Table icebergTable = catalog.createTable(outputTable, icebergSchema, partitionSpec);
@@ -79,8 +78,6 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data
7978
String partitionFields = icebergProperties.getProperty("partition.fields", DEFAULT_ICEBERG_PARTITION_FIELDS);
8079
List<String> partitionFieldList = Arrays.asList(partitionFields.split("\\s*,\\s*"));
8180

82-
String sortField = icebergProperties.getProperty("sort.field", DEFAULT_ICEBERG_SORT_ORDER_FIELD);
83-
8481
// Iceberg you can perform Appends, Upserts and Overwrites.
8582
String icebergOperation = icebergProperties.getProperty("operation", DEFAULT_ICEBERG_OPERATION);
8683
Preconditions.checkArgument(icebergOperation.equals("append") || icebergOperation.equals("upsert") || icebergOperation.equals("overwrite"), "Invalid Iceberg Operation");
@@ -119,7 +116,7 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data
119116
// Based on how many fields we want to partition, we create the Partition Spec
120117
PartitionSpec partitionSpec = getPartitionSpec(icebergSchema, partitionFieldList);
121118
// We create the Iceberg Table, using the Iceberg Catalog, Table Identifier, Schema parsed in Iceberg Schema Format and the partition spec
122-
createTable(catalog, outputTable, icebergSchema, partitionSpec, sortField);
119+
createTable(catalog, outputTable, icebergSchema, partitionSpec);
123120
// Once the table has been created in the job or before, we load it
124121
TableLoader tableLoader = TableLoader.fromCatalog(glueCatalogLoader, outputTable);
125122
// Get RowType Schema from Iceberg Schema

0 commit comments

Comments
 (0)