# Debezium and Informix Connector

:construction_worker: Xiaolin Zhang @2022-06-01 :child:

---

## Agenda

#### What is this talk?

- Overview of Debezium
- Internal Architecture of Debezium-Informix-Connector
- Some Details
- Demo & Read some source code

#### What is it **NOT**?

- An explanation of everything about Kafka/Debezium/Informix

Interrupt me **ANYTIME** when you have questions

---

## Who am I?

### Xiaolin

- A software engineer
- Emacs/Vim :heart:
- Markdown/AsciiDoc/Org-mode :heart:
- Linux/Unix/Open Source :heart:

---

### How to Read `Debezium`

https://youtu.be/G7TvRzPQH-U?t=77

{%youtube G7TvRzPQH-U %}

---

## Some Advice

- 3. Blogs & Diagrams & Articles
- 2. [Read the Fucking Manual](https://en.wikipedia.org/wiki/RTFM)
- 1. Read the Fucking Source Code

```
“RTFSC - Read The F**king Source Code.” - Linus Torvalds
```

---

## Change Data Capture

- Incremental From Database

![](https://i.imgur.com/Sa7dkDw.png)

---

## CDC Use Cases: Why Kafka?

- Messages have a **key**
- Guaranteed **ordering** (per partition)
- Pull-based
- Supports compaction
- Scales horizontally

![](https://i.imgur.com/9pPuhBU.png)

---

## Debezium & Kafka-connect

![](https://i.imgur.com/QRbx3sj.png)

---

## Debezium

- Kafka
- Kafka-Connect
- Debezium
- Connectors
  - [MongoDB](https://github.com/debezium/debezium/tree/main/debezium-connector-mongodb)
  - [MySQL](https://github.com/debezium/debezium/tree/main/debezium-connector-mysql)
  - [Oracle](https://github.com/debezium/debezium/tree/main/debezium-connector-oracle)
  - [PostgreSQL](https://github.com/debezium/debezium/tree/main/debezium-connector-postgres)
  - [SQLServer](https://github.com/debezium/debezium/tree/main/debezium-connector-sqlserver)
  - [Db2](https://github.com/debezium/debezium-connector-db2)

---

## Debezium Concepts

- Snapshot
- InformixDatabaseSchema
  - HistorizedRelationalDatabaseSchema
- InformixConnector
  - SourceConnector
- InformixConnectorConfig
  - HistorizedRelationalDatabaseConnectorConfig
- InformixConnectorTask
  - BaseSourceTask

---

## Debezium Concepts 2

- InformixOffsetContext
  - OffsetContext
- InformixSnapshotChangeEventSource
  - RelationalSnapshotChangeEventSource
  - AbstractSnapshotChangeEventSource
  - SnapshotChangeEventSource
- InformixStreamingChangeEventSource
  - StreamingChangeEventSource
  - ChangeEventSource
- Lsn
- SourceInfo

---

Informix API

---

### Informix CDC Official Manual

- https://www.ibm.com/docs/en/informix-servers/14.10?topic=api-change-data-capture

---

### Informix LSN

Reference: https://www.ibm.com/docs/en/informix-servers/12.10?topic=api-cdc-record-sequence-numbers

- The sequence number associated with a CDC record is a BIGINT data type.

```java=
IfxDataSource ds = new IfxDataSource(jdbcUrl);
IfxCDCEngine.Builder builder = new IfxCDCEngine.Builder(ds);
builder.sequenceId(seqId); // <- LSN
```

---

### Informix CDC Prepare

[Preparing to use the Change Data Capture API](https://www.ibm.com/docs/en/informix-servers/14.10?topic=api-preparing-use-change-data-capture)

- **cdc_set_fullrowlogging()**
- Run the script `$INFORMIXDIR/etc/syscdcv1.sql` as user `informix`

---

### Informix CDC : Write an App

[Writing an application to capture data changes](https://www.ibm.com/docs/en/informix-servers/14.10?topic=api-writing-application-capture-data-changes)

1. connect to `syscdcv1`
2. cdc_opensess()
3. cdc_set_fullrowlogging()
4. cdc_startcapture()
5. cdc_activatesess()
6. while(true) { mi_lo_read() }
7. **blah blah blah**
8. cdc_endcapture()
9. cdc_set_fullrowlogging()
10. cdc_closesess()

---

```java
IfxDataSource ds = new IfxDataSource(jdbcUrl);
IfxCDCEngine.Builder builder = new IfxCDCEngine.Builder(ds);
builder.sequenceId(seqId); // resume from this LSN
// Watch a table and the columns to capture: "database:owner:table", columns...
builder.watchTable("testdb:informix:hello", "a", "b");
builder.watchTable("testdb:informix:systables", "tabname", "owner", "partnum");

try (IfxCDCEngine engine = builder.build()) {
    engine.init();
    IfmxStreamRecord record = null;
    // Dispatch on the concrete record type until the stream ends
    while ((record = engine.getRecord()) != null) {
        if (record instanceof IfxCDCOperationRecord) {
            // ...
        } else if (record instanceof IfxCDCMetaDataRecord) {
            // ...
        } else if (record instanceof IfxCDCBeginTransactionRecord) {
            // ...
        } else if (record instanceof IfxCDCCommitTransactionRecord) {
            // ...
        } else if (record instanceof IfxCDCTimeoutRecord) {
            // ...
        } else {
            // ...
        }
    }
}
```

---

## Informix CDC: Format of Record

- [Format of CDC records](https://www.ibm.com/docs/en/informix-servers/14.10?topic=records-format-cdc)
- [The CDC_REC_BEGINTX record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_030.htm)
- [The CDC_REC_INSERT record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_035.htm)
- [The CDC_REC_UPDAFT record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_040.htm)
- [The CDC_REC_UPDBEF record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_041.htm)

---

## The Debezium-Informix-Connector

- https://github.com/laoflch/debezium-informix-connector
- https://github.com/leoncamel/debezium-informix-connector

---

## Local Dev Environment: Kafka

```shell=
$KAFKA_ROOT/bin/zookeeper-server-start.sh $KAFKA_ROOT/config/zookeeper.properties
$KAFKA_ROOT/bin/kafka-server-start.sh $KAFKA_ROOT/config/server.properties
```

```shell=
$KAFKA_ROOT/bin/connect-standalone.sh connect-standalone.properties connect-informix-source.properties
```

---

### Kafka-connect-standalone.properties

```text
bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/home/xiaolin/local/kafka/kafka_2.13-2.4.1/tmp/informix/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/xiaolin/local/kafka/kafka_2.13-2.4.1/connect-plugins/
```

---

### Kafka-informix-source.properties

```text
name=informix-connector-214414
connector.class=laoflch.debezium.connector.informix.InformixConnector
tasks.max=1
database.server.name=informix-214414
database.hostname=127.0.0.1
database.port=9088
database.user=informix
database.password=in4mix
database.dbname=testdb
database.history=io.debezium.relational.history.FileDatabaseHistory
database.history.file.filename=/home/xiaolin/local/kafka/kafka_2.13-2.4.1/tmp/informix/dbhistory.dat

# from embedded's properties
database.server.id=89

# Snapshot Mode: initial_schema_only vs initial
snapshot.mode=initial_schema_only

# TODO: informix.columntype.blacklist=lvarchar,blob, xxxx

# from source code
# schema.whitelist=cdctable
# schema.whitelist=informix
# schema.blacklist=
# table.whitelist=customer
# table.whitelist=testdb.informix.customer
# table.whitelist=hello
# table.blacklist=

# Decimal Handling
# decimal.handling.mode=precise
decimal.handling.mode=string
# decimal.handling.mode=double

provide.transaction.metadata=true
```

---

### Debug from IntelliJ

```shell=
export KAFKA_DEBUG=1
export DEBUG_SUSPEND_FLAG=y
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$SCRIPT_DIR/xiaolin-config/connect-log4j.properties"
$KAFKA_ROOT/bin/connect-standalone.sh connect-standalone.properties connect-informix-source.properties
```

---

Data Path

```plantuml
@startuml
Actor user
box "Informix"
user -> Informix : Insert INTO Values(x, x)
Informix -> Log : Begin Transaction
end box
box "Debezium Informix Connector" #LightBlue
Log -> InformixCDCEngine : Record.Begin
InformixCDCEngine -> InformixStreamingChangeEventSource :
Informix -> Log : Insert
Log -> InformixCDCEngine : Record.Insert
Informix -> Log : Insert
Log -> InformixCDCEngine : Record.Insert
Informix -> Log : Insert
Log -> InformixCDCEngine : Record.Insert
InformixCDCEngine -> InformixStreamingChangeEventSource :
Informix -> Log : Commit Transaction
Log -> InformixCDCEngine : Record.Commit
InformixCDCEngine -> InformixStreamingChangeEventSource :
InformixStreamingChangeEventSource -> EventDispatcher : handleCommit
EventDispatcher -> ChangeEventQueue : enqueue
ChangeEventQueue -> EmbeddedEngine : task.poll()
ChangeEventQueue -> EmbeddedEngine : task.poll()
EmbeddedEngine -> DebeziumEngine.ChangeConsumer : handleBatch()
DebeziumEngine.ChangeConsumer -> RecordCommitter : commit()
RecordCommitter -> OffsetWriter : maybeFlush()
note across: finally
EmbeddedEngine -> OffsetWriter : commitOffset()
end box
@enduml
```

---

### Integration Tests

---

## TODO and Future Jobs - 1

- Handle `TRUNCATE`
- Column-Type-Blacklist: LVARCHAR, SMALLFLOAT, Decimal(x, 8), etc
- Integration Tests
- More Types
- `UPDATE clause`

---

## TODO and Future Jobs - 2

- Metrics
  - Reference: https://github.com/debezium/debezium/blob/main/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceMetricsFactory.java
- Schema Change
  - Watch `systables`, `sysfragments`, `syscolumns`
  - Restart Informix CDC Engine on Schema Changes
- Table attach (Config from properties)
- Handle Various Faults and corner cases

---

### Thank you! :tada:
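---

### Backup: Data Path Sketch

One reading of the Data Path diagram is that row changes are held back until the commit record arrives, and only then handed to the dispatcher. The sketch below illustrates that idea only. `TxBufferSketch`, `Kind`, `CdcRecord`, `replay` and `sampleLog` are hypothetical names invented for this slide and are not classes from the connector or from the Informix JDBC driver. It also assumes a single transaction at a time, while a real log can interleave transactions and would need a buffer per transaction id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: buffer row changes per transaction, emit on commit.
public class TxBufferSketch {

    enum Kind { BEGIN, INSERT, COMMIT, ROLLBACK }

    // Minimal stand-in for a CDC record: a kind, an LSN, and a row payload.
    static final class CdcRecord {
        final Kind kind;
        final long lsn;
        final String payload;

        CdcRecord(Kind kind, long lsn, String payload) {
            this.kind = kind;
            this.lsn = lsn;
            this.payload = payload;
        }
    }

    // Returns the payloads in the order they would reach the dispatcher.
    static List<String> replay(List<CdcRecord> log) {
        List<String> dispatched = new ArrayList<>();
        List<String> pending = new ArrayList<>();
        for (CdcRecord r : log) {
            switch (r.kind) {
                case BEGIN:
                    pending.clear();             // start a fresh buffer
                    break;
                case INSERT:
                    pending.add(r.payload);      // hold until the outcome is known
                    break;
                case COMMIT:
                    dispatched.addAll(pending);  // transaction is durable: emit
                    pending.clear();
                    break;
                case ROLLBACK:
                    pending.clear();             // never emit rolled-back rows
                    break;
            }
        }
        return dispatched;
    }

    // One committed transaction followed by one rolled-back transaction.
    static List<CdcRecord> sampleLog() {
        return Arrays.asList(
                new CdcRecord(Kind.BEGIN, 1L, null),
                new CdcRecord(Kind.INSERT, 2L, "row-1"),
                new CdcRecord(Kind.INSERT, 3L, "row-2"),
                new CdcRecord(Kind.COMMIT, 4L, null),
                new CdcRecord(Kind.BEGIN, 5L, null),
                new CdcRecord(Kind.INSERT, 6L, "row-3"),
                new CdcRecord(Kind.ROLLBACK, 7L, null));
    }

    public static void main(String[] args) {
        System.out.println(replay(sampleLog())); // prints [row-1, row-2]
    }
}
```

Holding rows until commit means a rolled-back transaction never reaches Kafka, at the cost of keeping an open transaction's rows in memory.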