
ksqlDB & Kafka Connect JDBC Sink in action

Tested with Confluent Platform 6.1, ksqlDB 0.15, Kafka Connect JDBC connector 5.5.3

Create source connector (data generator)

CREATE SOURCE CONNECTOR SOURCE_DATA WITH (
    'connector.class'                = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',

    'genkp.customers.with'                 = '#{Internet.uuid}',
    'genv.customers.name.with'             = '#{HitchhikersGuideToTheGalaxy.character}',
    'genv.customers.email.with'            = '#{Internet.emailAddress}',
    'genv.customers.location->city.with'   = '#{HitchhikersGuideToTheGalaxy.location}',
    'genv.customers.location->planet.with' = '#{HitchhikersGuideToTheGalaxy.planet}',
    'topic.customers.records.exactly'      = 10,

    'genkp.transactions.with'                = '#{Internet.uuid}',
    'genv.transactions.customer_id.matching' = 'customers.key',
    'genv.transactions.cost.with'            = '#{Commerce.price}',
    'genv.transactions.card_type.with'       = '#{Business.creditCardType}',
    'genv.transactions.item.with'            = '#{Beer.name}',
    'topic.transactions.throttle.ms'         = 500
);
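
Quick sanity check that the generator is producing data (the topic names come from the connector config above):

SHOW TOPICS;
PRINT 'customers' FROM BEGINNING LIMIT 2;
PRINT 'transactions' FROM BEGINNING LIMIT 2;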

Declare Stream & Table over the source topics

CREATE TABLE CUSTOMER (ID VARCHAR PRIMARY KEY,
                       NAME VARCHAR,
                       EMAIL VARCHAR,
                       LOCATION STRUCT<CITY VARCHAR,
                                       PLANET VARCHAR>)
                 WITH (KAFKA_TOPIC='customers',
                       VALUE_FORMAT='JSON');

CREATE STREAM TXNS (ID VARCHAR KEY,
                    CUSTOMER_ID VARCHAR,
                    COST DOUBLE,
                    CARD_TYPE VARCHAR,
                    ITEM VARCHAR)
              WITH (KAFKA_TOPIC='transactions',
                    VALUE_FORMAT='JSON');
ksql> SELECT * FROM CUSTOMER EMIT CHANGES LIMIT 2;
+---------------------+---------------------+---------------------+---------------------+
|ID                   |NAME                 |EMAIL                |LOCATION             |
+---------------------+---------------------+---------------------+---------------------+
|84e2c8d1-3ac6-461b-8a|Agrajag              |benny.skiles@hotmail.|{CITY=Lamuella, PLANE|
|6c-2a765be6bd82      |                     |com                  |T=Nano}              |
|ee4cb838-cbf3-4cb7-8d|Pasta Fasta          |peter.nikolaus@yahoo.|{CITY=Betelgeuse, PLA|
|d2-b7b3a9e9da11      |                     |com                  |NET=Krikkit}         |
Limit Reached
Query terminated

ksql> SELECT * FROM TXNS EMIT CHANGES LIMIT 5;
+-----------------+-----------------+-----------------+-----------------+-----------------+
|ID               |CUSTOMER_ID      |COST             |CARD_TYPE        |ITEM             |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|1a14a135-d9c3-42b|84e2c8d1-3ac6-461|44.05            |visa             |Alpha King Pale A|
|e-b692-9bfd58beec|b-8a6c-2a765be6bd|                 |                 |le               |
|63               |82               |                 |                 |                 |
|a01765a8-6d1e-40d|ee4cb838-cbf3-4cb|25.62            |visa             |Samuel Smith's Im|
|8-9bc2-8d8166e020|7-8dd2-b7b3a9e9da|                 |                 |perial IPA       |
|14               |11               |                 |                 |                 |
|d6047271-d28e-4c2|d83c7c98-c2f1-449|81.8             |jcb              |Storm King Stout |
|7-bfc0-8dbf31855c|e-b80a-19c38d06b4|                 |                 |                 |
|03               |76               |                 |                 |                 |
|aaf68dbd-48d0-48e|9058ef36-3ab8-491|37.54            |visa             |Nugget Nectar    |
|1-a1c2-c28d9365ae|a-8142-c059b74f06|                 |                 |                 |
|e9               |f9               |                 |                 |                 |
|a4ea8cb5-b89f-48e|9058ef36-3ab8-491|27.8             |jcb              |Racer 5 India Pal|
|5-a023-7ab33a55b5|a-8142-c059b74f06|                 |                 |e Ale, Bear Repub|
|9f               |f9               |                 |                 |lic Bre          |
Limit Reached
Query terminated

Enrich transactions with customer info

Note the PARTITION BY T.ID below: it moves the transaction ID into the message key, which is why AS_VALUE(T.ID) is used to copy it back into the value as TXN_ID, where downstream consumers (like the JDBC sink) can see it.

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM TXNS_ENRICHED WITH (FORMAT='AVRO') AS
    SELECT T.ID,
           AS_VALUE(T.ID) AS TXN_ID,
           T.ROWTIME AS TXN_TS,
           T.ITEM,
           T.COST,
           T.CARD_TYPE,
           C.ID AS CUST_ID,
           C.NAME,
           C.EMAIL,
           C.LOCATION
      FROM TXNS T
             LEFT OUTER JOIN
           CUSTOMER C
             ON T.CUSTOMER_ID=C.ID
      PARTITION BY T.ID;
ksql> SELECT * FROM TXNS_ENRICHED EMIT CHANGES LIMIT 2;
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|T_ID            |TXN_ID          |TXN_TS          |ITEM            |COST            |CARD_TYPE       |CUST_ID         |NAME            |EMAIL           |LOCATION        |
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|8640648b-961f-48|8640648b-961f-48|1615464205090   |Edmund Fitzgeral|87.52           |diners_club     |dae12f7a-3b49-43|Vroomfondel     |shaun.sauer@gmai|{CITY=Seventh Ga|
|64-82d7-5606c7cf|64-82d7-5606c7cf|                |d Porter        |                |                |4a-9b92-2e4ea087|                |l.com           |laxy of Light an|
|3a68            |3a68            |                |                |                |                |a2b1            |                |                |d Ingenuity, PLA|
|                |                |                |                |                |                |                |                |                |NET=Kria}       |
|07626615-d656-4f|07626615-d656-4f|1615464205468   |Nugget Nectar   |7.21            |laser           |c87d82e3-0f38-4e|Pizpot Gargravar|tristan.upton@ho|{CITY=Evildrome |
|0f-8e32-ed02365c|0f-8e32-ed02365c|                |                |                |                |4b-8bd3-9ec61648|r               |tmail.com       |Boozarama, PLANE|
|011e            |011e            |                |                |                |                |1813            |                |                |T=Xaxis}        |
Limit Reached
Query terminated

Stream enriched transactions to database

CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'TXNS_ENRICHED',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'auto.evolve'                         = 'true'
  );

This doesn't work:

ksql> DESCRIBE CONNECTOR SINK_TXNS_ENRICHED_PG;

Name                 : SINK_TXNS_ENRICHED_PG
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
---------------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: io.confluent.ksql.avro_schemas.KsqlDataSourceSchema_LOCATION (STRUCT) type doesn't have a mapping to the SQL database column type
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1753)
        at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:221)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1669)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$33(GenericDatabaseDialect.java:1658)
        at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:558)
        at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:597)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1660)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1583)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:91)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:120)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        ... 10 more

---------------------------------------------------------------------------------------------------------------------------------------------
ksql>

Use SMT to flatten LOCATION

As the error trace shows, the JDBC sink has no mapping from the nested LOCATION STRUCT to a SQL column type. So drop the connector and recreate it with the Flatten Single Message Transform (SMT), which hoists the nested CITY and PLANET fields up to the top level:

DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'TXNS_ENRICHED',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'auto.evolve'                         = 'true',
    'table.name.format'                   = '${topic}',
    'transforms'                          = 'flatten',
    'transforms.flatten.type'             = 'org.apache.kafka.connect.transforms.Flatten$Value'
  );
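
This time the connector should come up cleanly; worth checking before heading over to Postgres:

DESCRIBE CONNECTOR SINK_TXNS_ENRICHED_PG;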

✅ Table is created and populated in Postgres:

docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
postgres=# \d+ "TXNS_ENRICHED"
                                        Table "public.TXNS_ENRICHED"
     Column      |       Type       | Collation | Nullable | Default | Storage  | Stats target | Description
-----------------+------------------+-----------+----------+---------+----------+--------------+-------------
 TXN_ID          | text             |           |          |         | extended |              |
 TXN_TS          | bigint           |           |          |         | plain    |              |
 ITEM            | text             |           |          |         | extended |              |
 COST            | double precision |           |          |         | plain    |              |
 CARD_TYPE       | text             |           |          |         | extended |              |
 CUST_ID         | text             |           |          |         | extended |              |
 NAME            | text             |           |          |         | extended |              |
 EMAIL           | text             |           |          |         | extended |              |
 LOCATION.CITY   | text             |           |          |         | extended |              |
 LOCATION.PLANET | text             |           |          |         | extended |              |
Access method: heap

postgres=# SELECT * FROM "TXNS_ENRICHED" LIMIT 2;
                TXN_ID                |    TXN_TS     |           ITEM           | COST  |  CARD_TYPE  |               CUST_ID                |       NAME        |           EMAIL           |             LOCATION.CITY             | LOCATION.PLANET
--------------------------------------+---------------+--------------------------+-------+-------------+--------------------------------------+-------------------+---------------------------+---------------------------------------+-----------------
 8640648b-961f-4864-82d7-5606c7cf3a68 | 1615464205090 | Edmund Fitzgerald Porter | 87.52 | diners_club | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel       | shaun.sauer@gmail.com     | Seventh Galaxy of Light and Ingenuity | Kria
 07626615-d656-4f0f-8e32-ed02365c011e | 1615464205468 | Nugget Nectar            |  7.21 | laser       | c87d82e3-0f38-4e4b-8bd3-9ec616481813 | Pizpot Gargravarr | tristan.upton@hotmail.com | Evildrome Boozarama                   | Xaxis
(2 rows)
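
As an aside: Flatten uses . as its delimiter by default, which is why the column names above ("LOCATION.CITY") need quoting in Postgres. The SMT takes a delimiter option if you would rather get LOCATION_CITY and LOCATION_PLANET; a one-line addition to the connector config above:

    'transforms.flatten.delimiter' = '_'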

Note that TXN_TS is a bigint (epoch milliseconds), not an actual timestamp type.

Use SMT to handle timestamp

The TimestampConverter Single Message Transform converts the epoch-millisecond TXN_TS field into a Connect Timestamp type, which the JDBC sink can map to a proper SQL timestamp column:

DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
    'connector.class'                         = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                          = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                         = 'postgres',
    'connection.password'                     = 'postgres',
    'topics'                                  = 'TXNS_ENRICHED',
    'key.converter'                           = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'       = 'http://schema-registry:8081',
    'value.converter'                         = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'     = 'http://schema-registry:8081',
    'auto.create'                             = 'true',
    'auto.evolve'                             = 'true',
    'table.name.format'                       = '${topic}',
    'transforms'                              = 'flatten,setTimestampType',
    'transforms.flatten.type'                 = 'org.apache.kafka.connect.transforms.Flatten$Value',
    'transforms.setTimestampType.type'        = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'       = 'TXN_TS',
    'transforms.setTimestampType.target.type' = 'Timestamp'
);

Connector doesn't work though (Status is WARNING):

ksql> SHOW CONNECTORS;

 Connector Name        | Type   | Class                                       | Status
------------------------------------------------------------------------------------------------------------
 SOURCE_DATA           | SOURCE | io.mdrogalis.voluble.VolubleSourceConnector | RUNNING (1/1 tasks RUNNING)
 SINK_TXNS_ENRICHED_PG | SINK   | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING)
------------------------------------------------------------------------------------------------------------

Check the error:

ksql> DESCRIBE CONNECTOR SINK_TXNS_ENRICHED_PG;

Name                 : SINK_TXNS_ENRICHED_PG
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "TXNS_ENRICHED"("TXN_TS","ITEM","COST","CARD_TYPE","CUST_ID","NAME","EMAIL","LOCATION.CITY","LOCATION.PLANET") VALUES('2020-03-27 23:58:30.431+00','Shakespeare Oatmeal',69.72,'mastercard','ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11','Pasta Fasta','peter.nikolaus@yahoo.com','Betelgeuse','Krikkit') was aborted: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "TXNS_ENRICHED"("TXN_TS","ITEM","COST","CARD_TYPE","CUST_ID","NAME","EMAIL","LOCATION.CITY","LOCATION.PLANET") VALUES('2020-03-27 23:58:30.431+00','Shakespeare Oatmeal',69.72,'mastercard','ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11','Pasta Fasta','peter.nikolaus@yahoo.com','Betelgeuse','Krikkit') was aborted: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"

        ... 12 more

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ksql>

Now that TXN_TS is coming through as a timestamp, the Postgres INSERT fails because we're trying to write it to the existing bigint column. auto.evolve can add missing columns, but it won't change an existing column's type:

ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"

So here we'll ditch the previous table and instead populate a new one (taking advantage of table.name.format to modify the target table name), using all of the existing data in the source Kafka topic:

DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG_01 WITH (
    'connector.class'                         = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                          = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                         = 'postgres',
    'connection.password'                     = 'postgres',
    'topics'                                  = 'TXNS_ENRICHED',
    'key.converter'                           = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'       = 'http://schema-registry:8081',
    'value.converter'                         = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'     = 'http://schema-registry:8081',
    'auto.create'                             = 'true',
    'auto.evolve'                             = 'true',
    'table.name.format'                       = '${topic}-01',
    'transforms'                              = 'flatten,setTimestampType',
    'transforms.flatten.type'                 = 'org.apache.kafka.connect.transforms.Flatten$Value',
    'transforms.setTimestampType.type'        = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'       = 'TXN_TS',
    'transforms.setTimestampType.target.type' = 'Timestamp'
);

Now Postgres table is built and populated with Timestamp column:

postgres=# \d+ "TXNS_ENRICHED-01"
                                            Table "public.TXNS_ENRICHED-01"
     Column      |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description
-----------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
 TXN_ID          | text                        |           |          |         | extended |              |
 TXN_TS          | timestamp without time zone |           |          |         | plain    |              |
 ITEM            | text                        |           |          |         | extended |              |
 COST            | double precision            |           |          |         | plain    |              |
 CARD_TYPE       | text                        |           |          |         | extended |              |
 CUST_ID         | text                        |           |          |         | extended |              |
 NAME            | text                        |           |          |         | extended |              |
 EMAIL           | text                        |           |          |         | extended |              |
 LOCATION.CITY   | text                        |           |          |         | extended |              |
 LOCATION.PLANET | text                        |           |          |         | extended |              |
Access method: heap

postgres=# SELECT * FROM "TXNS_ENRICHED-01" LIMIT 2;
                TXN_ID                |         TXN_TS          |           ITEM           | COST  |  CARD_TYPE  |               CUST_ID                |       NAME        |           EMAIL           |             LOCATION.CITY             | LOCATION.PLANET
--------------------------------------+-------------------------+--------------------------+-------+-------------+--------------------------------------+-------------------+---------------------------+---------------------------------------+-----------------
 8640648b-961f-4864-82d7-5606c7cf3a68 | 2021-03-11 12:03:25.09  | Edmund Fitzgerald Porter | 87.52 | diners_club | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel       | shaun.sauer@gmail.com     | Seventh Galaxy of Light and Ingenuity | Kria
 07626615-d656-4f0f-8e32-ed02365c011e | 2021-03-11 12:03:25.468 | Nugget Nectar            |  7.21 | laser       | c87d82e3-0f38-4e4b-8bd3-9ec616481813 | Pizpot Gargravarr | tristan.upton@hotmail.com | Evildrome Boozarama                   | Xaxis
(2 rows)
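
An alternative to converting the timestamp in the connector is to format it in ksqlDB itself, if a string column in the database is acceptable. A minimal sketch, not what this walkthrough uses (the stream name TXNS_ENRICHED_STR is just for illustration):

CREATE STREAM TXNS_ENRICHED_STR WITH (FORMAT='AVRO') AS
    SELECT ID,
           TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss.SSS') AS TXN_TS,
           ITEM,
           COST,
           CARD_TYPE
      FROM TXNS;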

Build aggregate (beers sold per hour)

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE BEERS_HOUR_AGG WITH (FORMAT='AVRO') AS
    SELECT WINDOWSTART AS WINDOW_START_TS,
           ITEM,
           SUM(COST) AS TOTAL_SOLD,
           COUNT(*) AS NUMBER_SOLD
      FROM TXNS WINDOW TUMBLING (SIZE 1 HOUR)
      GROUP BY ITEM;
  • Push query (stream of aggregate changes)

    SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
           ITEM,
           TOTAL_SOLD,
           NUMBER_SOLD
      FROM BEERS_HOUR_AGG
      EMIT CHANGES;
  • Pull query (aggregate current state; a window-bounded variant is sketched after this list)

    SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
           TOTAL_SOLD,
           NUMBER_SOLD
      FROM BEERS_HOUR_AGG
     WHERE ITEM='Duvel';
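
Pull queries on a windowed table can also bound the window range with WINDOWSTART; a hedged variant of the pull query above (adjust the datetime to match your data):

SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       TOTAL_SOLD,
       NUMBER_SOLD
  FROM BEERS_HOUR_AGG
 WHERE ITEM='Duvel'
   AND WINDOWSTART > '2021-03-11T00:00:00';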

Build aggregate (customer summary) to push to database

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE CUSTOMER_SUMMARY WITH (FORMAT='AVRO') AS
    SELECT CUST_ID,
           NAME,
           MAX(TXN_TS) AS MOST_RECENT_ORDER_TS,
           COUNT(*) AS NUM_ORDERS,
           COUNT_DISTINCT(ITEM) AS UNIQUE_ITEMS,
           SUM(COST) AS TOTAL_COST
      FROM TXNS_ENRICHED
      GROUP BY CUST_ID, NAME;
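
Sanity-check the aggregate before sinking it (optional):

SELECT * FROM CUSTOMER_SUMMARY EMIT CHANGES LIMIT 5;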

Stream ksqlDB to database

CREATE SINK CONNECTOR SINK_CUSTOMER_SUMMARY_PG_01 WITH (
    'connector.class'                         = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                          = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                         = 'postgres',
    'connection.password'                     = 'postgres',
    'topics'                                  = 'CUSTOMER_SUMMARY',
    'key.converter'                           = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'       = 'http://schema-registry:8081',
    'value.converter'                         = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'     = 'http://schema-registry:8081',
    'auto.create'                             = 'true',
    'auto.evolve'                             = 'true',
    'table.name.format'                       = '${topic}-01',
    'transforms'                              = 'setTimestampType',
    'transforms.setTimestampType.type'        = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'       = 'MOST_RECENT_ORDER_TS',
    'transforms.setTimestampType.target.type' = 'Timestamp'
);

You'll notice you don't get the key columns in the data, only the aggregate values. The GROUP BY columns (CUST_ID, NAME) live in the Kafka message key, and by default the sink writes only fields from the message value:

postgres=# select * from "CUSTOMER_SUMMARY-01" LIMIT 5;
  MOST_RECENT_ORDER_TS   | NUM_ORDERS | UNIQUE_ITEMS |     TOTAL_COST
-------------------------+------------+--------------+--------------------
 2021-03-11 12:04:21.399 |         13 |           13 |  642.1600000000001
 2021-03-11 12:04:45.403 |         19 |           16 |            1013.25
 2021-03-11 12:04:48.869 |         15 |           13 |  747.2699999999999
 2021-03-11 12:04:49.369 |         17 |           14 | 1117.6200000000001
 2021-03-11 12:04:49.869 |         21 |           14 | 1057.1499999999999
(5 rows)

Use pk.mode to include the key values when pushing table state from ksqlDB to database

CREATE SINK CONNECTOR SINK_CUSTOMER_SUMMARY_PG_02 WITH (
    'connector.class'                         = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                          = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                         = 'postgres',
    'connection.password'                     = 'postgres',
    'topics'                                  = 'CUSTOMER_SUMMARY',
    'key.converter'                           = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'       = 'http://schema-registry:8081',
    'value.converter'                         = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'     = 'http://schema-registry:8081',
    'auto.create'                             = 'true',
    'auto.evolve'                             = 'true',
    'insert.mode'                             = 'upsert',
    'pk.mode'                                 = 'record_key',
    'pk.fields'                               = 'CUST_ID,NAME',
    'table.name.format'                       = '${topic}-02',
    'transforms'                              = 'setTimestampType',
    'transforms.setTimestampType.type'        = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'       = 'MOST_RECENT_ORDER_TS',
    'transforms.setTimestampType.target.type' = 'Timestamp'
);

Important settings:

  • insert.mode is upsert (not the default insert; see the sketch after this list for the shape of statement this produces)

  • pk.mode is record_key, which says we're going to define the target table's primary key based on field(s) from the record's key

  • pk.fields specifies which field(s) from the record's key we'd like to use as the PK in the target table (for a ksqlDB aggregate table this is going to be whichever columns you declared in the GROUP BY)
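
Under the covers the Postgres dialect implements upsert with INSERT ... ON CONFLICT. Roughly this shape of statement (illustrative only, not captured from the connector):

INSERT INTO "CUSTOMER_SUMMARY-02" ("CUST_ID","NAME","MOST_RECENT_ORDER_TS","NUM_ORDERS","UNIQUE_ITEMS","TOTAL_COST")
VALUES (?,?,?,?,?,?)
ON CONFLICT ("CUST_ID","NAME") DO UPDATE SET
    "MOST_RECENT_ORDER_TS"=EXCLUDED."MOST_RECENT_ORDER_TS",
    "NUM_ORDERS"=EXCLUDED."NUM_ORDERS",
    "UNIQUE_ITEMS"=EXCLUDED."UNIQUE_ITEMS",
    "TOTAL_COST"=EXCLUDED."TOTAL_COST";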

Note target database table is created by the sink with a primary key:

postgres=# \d+ "CUSTOMER_SUMMARY-02"
                                             Table "public.CUSTOMER_SUMMARY-02"
        Column        |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description
----------------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
 MOST_RECENT_ORDER_TS | timestamp without time zone |           |          |         | plain    |              |
 NUM_ORDERS           | bigint                      |           |          |         | plain    |              |
 UNIQUE_ITEMS         | bigint                      |           |          |         | plain    |              |
 TOTAL_COST           | double precision            |           |          |         | plain    |              |
 CUST_ID              | text                        |           | not null |         | extended |              |
 NAME                 | text                        |           | not null |         | extended |              |
Indexes:
    "CUSTOMER_SUMMARY-02_pkey" PRIMARY KEY, btree ("CUST_ID", "NAME")
Access method: heap

One row per unique key, with the aggregate values updated in place (run the same query twice a few seconds apart to see the values change):

postgres=# SELECT * FROM "CUSTOMER_SUMMARY-02" WHERE "NAME" = 'Vroomfondel' ;
  MOST_RECENT_ORDER_TS   | NUM_ORDERS | UNIQUE_ITEMS |     TOTAL_COST     |               CUST_ID                |    NAME
-------------------------+------------+--------------+--------------------+--------------------------------------+-------------
 2021-03-11 12:29:51.282 |        331 |           53 | 16004.209999999983 | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel
(1 row)

postgres=# SELECT * FROM "CUSTOMER_SUMMARY-02" WHERE "NAME" = 'Vroomfondel' ;
  MOST_RECENT_ORDER_TS   | NUM_ORDERS | UNIQUE_ITEMS |     TOTAL_COST     |               CUST_ID                |    NAME
-------------------------+------------+--------------+--------------------+--------------------------------------+-------------
 2021-03-11 12:30:11.786 |        336 |           53 | 16269.619999999983 | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel
(1 row)