Array Aggregation with Flink SQL

This demo shows how to aggregate joined rows into arrays with Flink SQL, using the built-in function JSON_ARRAYAGG() as well as a user-defined aggregate function which emits a fully type-safe data structure. It uses the Upsert Kafka SQL Connector together with the Postgres CDC connector for Apache Flink, which is based on Debezium. Redpanda is used as the data streaming platform.

Prerequisites

Make sure to have the following software installed on your machine:

  • Java and Apache Maven
  • Docker and Docker Compose
  • Redpanda's rpk CLI
  • httpie (for querying Elasticsearch)
  • jq (optional)

Preparation

Build the JAR with the ARRAY_AGGR user-defined aggregate function:

mvn clean verify

Start up all the components using Docker Compose:

docker compose up --build

Obtain a Flink SQL prompt and enable mini-batching, which buffers incoming changes for a short interval and thus reduces the number of intermediate aggregation results emitted downstream:

docker compose run sql-client
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '500 ms';
SET 'table.exec.mini-batch.size' = '1000';

Obtain a Postgres client session:

docker run --tty --rm -i \
  --network array-agg-network \
  quay.io/debezium/tooling:1.2 \
  bash -c 'pgcli postgresql://postgres:postgres@postgres:5432/postgres'
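
Within this session, you can take a look at the source tables in the inventory schema (the schema and table names match the connector configuration used below):

SELECT * FROM inventory.purchase_orders;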

Create two topics in Redpanda:

rpk topic create orders-with-lines orders-with-lines-and-customer

Ingesting Data From Postgres

Create a table in Flink SQL for ingesting the data from the purchase_orders table in Postgres:

CREATE TABLE purchase_orders (
   id INT,
   order_date DATE,
   purchaser_id INT,
   db_name STRING METADATA FROM 'database_name' VIRTUAL,
   operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'postgres-cdc',
   'hostname' = 'postgres',
   'port' = '5432',
   'username' = 'postgres',
   'password' = 'postgres',
   'database-name' = 'postgres',
   'schema-name' = 'inventory',
   'table-name' = 'purchase_orders',
   'slot.name' = 'purchase_orders_slot'
);
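
To verify that change events are being ingested, run a continuous query on the new table; the result should keep updating as the data in Postgres changes:

SELECT * FROM purchase_orders;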

Create a table for order lines:

CREATE TABLE order_lines (
   id INT,
   order_id INT,
   product_id INT,
   quantity INT,
   price DOUBLE,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'postgres-cdc',
   'hostname' = 'postgres',
   'port' = '5432',
   'username' = 'postgres',
   'password' = 'postgres',
   'database-name' = 'postgres',
   'schema-name' = 'inventory',
   'table-name' = 'order_lines',
   'slot.name' = 'order_lines_slot'
);

Register the UDF:

CREATE FUNCTION ARRAY_AGGR AS 'co.decodable.demos.arrayagg.ArrayAggr';

Perform some data changes in Postgres (via pgcli) and observe how the query results in the Flink SQL client change accordingly.
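
For instance, run a continuous query with the ARRAY_AGGR function in the Flink SQL client:

SELECT
  po.id,
  ARRAY_AGGR(ROW(ol.id, ol.product_id, ol.quantity, ol.price)) AS lines
FROM purchase_orders po
  LEFT JOIN order_lines ol ON ol.order_id = po.id
GROUP BY po.id;

Then change one of the order lines in pgcli and watch the corresponding array in the query result update (a sketch; the id value is hypothetical and needs to be adapted to the actual seed data):

UPDATE inventory.order_lines SET quantity = quantity + 1 WHERE id = 100001;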

Emitting Data to Redpanda

Create a table backed by the upsert Kafka connector and populate it via a continuous query, aggregating all lines of each purchase order into an array:

CREATE TABLE orders_with_lines (
  order_id INT,
  order_date DATE,
  purchaser_id INT,
  lines ARRAY<ROW<id INT, product_id INT, quantity INT, price DOUBLE>>,
  PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'orders-with-lines',
  'properties.bootstrap.servers' = 'redpanda:29092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO orders_with_lines
  SELECT
    po.id,
    po.order_date,
    po.purchaser_id,
    ARRAY_AGGR(ROW(ol.id, ol.product_id, ol.quantity, ol.price))
  FROM
    purchase_orders po
      LEFT JOIN order_lines ol ON ol.order_id = po.id
  GROUP BY po.id, po.order_date, po.purchaser_id;
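
For comparison, the built-in JSON_ARRAYAGG() function mentioned in the introduction can produce a similar aggregate, albeit as a JSON string rather than a typed ARRAY<ROW<...>> (a sketch, not part of the pipeline above):

SELECT
  po.id,
  JSON_ARRAYAGG(
    JSON_OBJECT(
      'id' VALUE ol.id,
      'product_id' VALUE ol.product_id,
      'quantity' VALUE ol.quantity,
      'price' VALUE ol.price)) AS lines
FROM purchase_orders po
  LEFT JOIN order_lines ol ON ol.order_id = po.id
GROUP BY po.id;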

Next, observe the data in Redpanda (make some more data changes in Postgres as well to see new events arrive):

rpk topic consume orders-with-lines | jq '.value | fromjson'

Emitting Events to Elasticsearch

Create a table backed by the Elasticsearch connector:

CREATE TABLE orders_with_lines_es (
  order_id INT,
  order_date DATE,
  purchaser_id INT,
  lines ARRAY<ROW<id INT, product_id INT, quantity INT, price DOUBLE>>,
  PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://elasticsearch:9200',
  'index' = 'orders_with_lines'
);
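
To actually feed the index, run the same aggregation query as for the Kafka sink above, this time writing to the Elasticsearch-backed table:

INSERT INTO orders_with_lines_es
  SELECT
    po.id,
    po.order_date,
    po.purchaser_id,
    ARRAY_AGGR(ROW(ol.id, ol.product_id, ol.quantity, ol.price))
  FROM
    purchase_orders po
      LEFT JOIN order_lines ol ON ol.order_id = po.id
  GROUP BY po.id, po.order_date, po.purchaser_id;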
Retrieve one of the aggregated documents from Elasticsearch (e.g. for order 10001):

http http://localhost:9200/orders_with_lines/_doc/10001 | jq .

Or search the entire index:

http 'http://localhost:9200/orders_with_lines/_search?pretty'

Clean-up

Shut down all the components using Docker Compose:

docker compose down
