Streaming data from Kafka to a Database using Kafka Connect JDBC Sink

Tested with Confluent Platform 6.1, ksqlDB 0.15, Kafka Connect JDBC connector 5.5.3

This uses Docker Compose to run the Kafka Connect worker.

  1. Bring the Docker Compose stack up

    docker-compose up -d
  2. Make sure everything is up and running

    ➜ docker-compose ps
         Name                    Command                  State                     Ports
    ---------------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up             0.0.0.0:9092->9092/tcp
    kafka-connect     bash -c cd /usr/share/conf ...   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
    kafkacat          /bin/sh -c apk add jq; wh ...   Up
    ksqldb            /usr/bin/docker/run              Up             0.0.0.0:8088->8088/tcp
    mysql             docker-entrypoint.sh mysqld      Up             0.0.0.0:3306->3306/tcp, 33060/tcp
    schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up             2181/tcp, 2888/tcp, 3888/tcp
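
    The Connect worker can take a minute or two before its REST API responds, even once the container shows healthy. If you want to script the wait, a loop along these lines (assuming the REST port 8083 mapped above) does the job:

    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ]; do
      echo "$(date) Waiting for Kafka Connect to be ready ⏳"
      sleep 5
    done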
  3. Create test topic + data using ksqlDB (but it’s still just a Kafka topic under the covers)

    docker exec -it ksqldb ksql http://ksqldb:8088
    CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
      WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
    INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X',1,'FOO');
    INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y',2,'BAR');
    SHOW TOPICS;
    PRINT test01 FROM BEGINNING LIMIT 2;
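
    Since the stream is declared with VALUE_FORMAT='AVRO', ksqlDB registers a value schema in the Schema Registry. To have a look at it (this assumes the default subject naming, i.e. test01-value):

    curl -s http://localhost:8081/subjects/test01-value/versions/latest | jq '.'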
  4. Create the Sink connector

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
         -H "Content-Type: application/json" -d '{
        "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
        "topics"                             : "test01",
        "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "connection.user"                    : "connect_user",
        "connection.password"                : "asgard",
        "auto.create"                        : true,
        "auto.evolve"                        : true,
        "insert.mode"                        : "insert",
        "pk.mode"                            : "record_key",
        "pk.fields"                          : "MESSAGE_KEY"
    }'

    Things to customise for your environment:

    • topics : the source topic(s) you want to send to the database

    • key.converter : match the serialisation of your source data's keys

    • value.converter : match the serialisation of your source data's values
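
    Before querying the database, check that the connector and its task are both RUNNING:

    curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status | jq '.'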
      Check the data in the target database:

      docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
      mysql> SHOW TABLES;
      +----------------+
      | Tables_in_demo |
      +----------------+
      | test01         |
      +----------------+
      1 row in set (0.00 sec)
      
      mysql> SELECT * FROM test01;
      +------+------+-------------+
      | COL1 | COL2 | MESSAGE_KEY |
      +------+------+-------------+
      |    1 | FOO  | X           |
      |    2 | BAR  | Y           |
      +------+------+-------------+
      2 rows in set (0.00 sec)
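
      Because auto.create is enabled and pk.mode is record_key, the connector created this table itself, with MESSAGE_KEY as the primary key. You can confirm that with:

      mysql> DESCRIBE test01;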
  5. Insert some more data. Notice what happens if you re-use a key.

    INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Z',1,'WOO');
    INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y',4,'PFF');

    The Kafka Connect task status shows that there's a duplicate key error:

    $ curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/status | jq '.'
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "kafka-connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Duplicate entry 'Y' for key 'test01.PRIMARY'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'Y' for key 'test01.PRIMARY'\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\t... 10 more\nCaused by: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Duplicate entry 'Y' for key 'test01.PRIMARY'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'Y' for key 'test01.PRIMARY'\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)\n\t... 11 more\n"
    }
  6. Update the Sink connector to use UPSERT mode

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
         -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "topics": "test01",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "MESSAGE_KEY"
    }'
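
    Updating the config prompts Kafka Connect to restart the connector's task, so the FAILED state from the previous step should clear on its own. If it doesn't, check the state and restart the task explicitly:

    curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status | jq '.tasks[0].state'
    curl -X POST http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/restart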

    Now the row for key Y updates, whilst Z is added:

    mysql> SELECT * FROM test01;
    +------+------+-------------+
    | COL1 | COL2 | MESSAGE_KEY |
    +------+------+-------------+
    |    1 | FOO  | X           |
    |    4 | PFF  | Y           |
    |    1 | WOO  | Z           |
    +------+------+-------------+
    3 rows in set (0.00 sec)
  7. Drop fields, add fields - note how the target schema evolves in-place

    # Drop the existing connector
    curl -X DELETE http://localhost:8083/connectors/sink-jdbc-mysql-01
    
    # Create a new one, reading from the same topic with new config
    # Because it's got a new name, the connector will re-read all the messages
    # from the topic.
    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02/config \
         -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "topics": "test01",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "MESSAGE_KEY",
        "transforms": "dropSome,addSome",
        "transforms.dropSome.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropSome.blacklist": "COL2",
        "transforms.addSome.type":"org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSome.partition.field": "_partition",
        "transforms.addSome.timestamp.field" : "RECORD_TS"
    }'
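
    To see what's now running on the worker in a single call, the connector list can be expanded to include status (the ?expand=status parameter should be supported by the Connect version in this stack):

    curl -s "http://localhost:8083/connectors?expand=status" | jq '.'

    Back in MySQL, the target schema has evolved in place: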
    mysql> describe test01;
    +-------------+--------------+------+-----+---------+-------+
    | Field       | Type         | Null | Key | Default | Extra |
    +-------------+--------------+------+-----+---------+-------+
    | COL1        | int          | YES  |     | NULL    |       |
    | COL2        | varchar(256) | YES  |     | NULL    |       |
    | MESSAGE_KEY | varchar(256) | NO   | PRI | NULL    |       |
    | _partition  | int          | YES  |     | NULL    |       |
    | RECORD_TS   | datetime(3)  | YES  |     | NULL    |       |
    +-------------+--------------+------+-----+---------+-------+
    5 rows in set (0.00 sec)
    
    mysql> select * from test01;
    +------+------+-------------+------------+-------------------------+
    | COL1 | COL2 | MESSAGE_KEY | _partition | RECORD_TS               |
    +------+------+-------------+------------+-------------------------+
    |    1 | FOO  | X           |          0 | 2021-03-11 11:50:00.759 |
    |    4 | PFF  | Y           |          0 | 2021-03-11 11:50:47.761 |
    |    1 | WOO  | Z           |          0 | 2021-03-11 11:50:47.682 |
    +------+------+-------------+------------+-------------------------+
    3 rows in set (0.00 sec)
  8. Write some CSV and JSON to new topics

    docker exec -i kafkacat kafkacat \
            -b broker:29092 -P \
            -t some_json_data <<EOF
    { "ID": 1, "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" }
    { "ID": 2, "Artist": "asdfasd", "Song": "dsfjfghg" }
    EOF
    
    docker exec -i kafkacat kafkacat \
            -b broker:29092 -P \
            -t some_json_data_with_a_schema <<EOF
    { "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true } ] }, "payload": { "ID": "1", "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" } }
    EOF
    
    docker exec -i kafkacat kafkacat \
            -b broker:29092 -P \
            -t some_csv_data <<EOF
    1,Rick Astley,Never Gonna Give You Up
    EOF
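
    To check the messages made it, consume them back with kafkacat (-C to consume, -e to exit once it reaches the end of the topic), for example:

    docker exec -i kafkacat kafkacat \
            -b broker:29092 -C -e \
            -t some_json_data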
  9. Stream the JSON data that has a schema to the database:

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-json/config \
         -H "Content-Type: application/json" -d '{
        "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"                : "jdbc:mysql://mysql:3306/demo",
        "topics"                        : "some_json_data_with_a_schema",
        "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "connection.user"               : "connect_user",
        "connection.password"           : "asgard",
        "auto.create"                   : true,
        "auto.evolve"                   : true,
        "insert.mode"                   : "insert"
    }'
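
    The JDBC sink names the target table after the topic by default (table.name.format defaults to the topic name), so the rows should land in a table called some_json_data_with_a_schema:

    docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    mysql> SELECT * FROM some_json_data_with_a_schema;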
  10. Use ksqlDB to apply a schema to the CSV and schemaless-JSON, and show off INSERT INTO for merging two topics into one with a common schema

    CREATE STREAM SOME_JSON (ID INT, ARTIST VARCHAR, SONG VARCHAR)
      WITH (KAFKA_TOPIC='some_json_data', VALUE_FORMAT='JSON');
    
    SET 'auto.offset.reset' = 'earliest';
    
    CREATE STREAM SOME_JSON_AS_AVRO
      WITH (VALUE_FORMAT='AVRO') AS
        SELECT * FROM SOME_JSON;
    
    CREATE STREAM SOME_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR)
      WITH (KAFKA_TOPIC='some_csv_data', VALUE_FORMAT='DELIMITED');
    
    INSERT INTO SOME_JSON_AS_AVRO SELECT * FROM SOME_CSV;
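
    At this point the SOME_JSON_AS_AVRO topic holds both the re-serialised JSON rows and the CSV row, all with the same Avro schema. Check it with PRINT:

    PRINT SOME_JSON_AS_AVRO FROM BEGINNING LIMIT 3;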
  11. Create a sink for the re-serialised data

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-avro/config \
         -H "Content-Type: application/json" -d '{
        "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
        "topics"                             : "SOME_JSON_AS_AVRO",
        "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "connection.user"                    : "connect_user",
        "connection.password"                : "asgard",
        "auto.create"                        : true,
        "auto.evolve"                        : true,
        "insert.mode"                        : "insert"
    }'
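
    As before, the connector should auto-create a table named after the topic, so check SOME_JSON_AS_AVRO in MySQL:

    docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    mysql> SELECT * FROM SOME_JSON_AS_AVRO;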
