# Debezium and Informix Connector
:construction_worker: Xiaolin Zhang @2022-06-01 :child:
---
## Agenda
#### What is this talk?
- Overview of Debezium
- Internal Architecture of Debezium-Informix-Connector
- Some Details
- Demo & Read some source code
#### What is it **NOT**?
- An explanation of everything about Kafka/Debezium/Informix
Interrupt me **ANYTIME** if you have questions
---
## Who am I?
### Xiaolin
- A software engineer
- Emacs/Vim :heart:
- Markdown/AsciiDoc/Org-mode :heart:
- Linux/Unix/Open Source :heart:
---
### How to Pronounce `Debezium`
https://youtu.be/G7TvRzPQH-U?t=77
{%youtube G7TvRzPQH-U %}
---
## Some Advice
- 3. Blogs & Diagrams & Articles
- 2. [Read the Fucking Manual](https://en.wikipedia.org/wiki/RTFM)
- 1. Read the Fucking Source Code
```
“RTFSC—Read The F**king Source Code.”
- Linus Torvalds
```
---
## Change Data Capture
- Capture incremental changes from the database

---
## CDC Use Cases: Why Kafka?
- Messages have a **key**
- Guaranteed **ordering** (per partition)
- Pull-based
- Supports compaction
- Scales horizontally
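
To make the **key** and compaction bullets concrete, here is a toy model in plain Java. It is not Kafka code, and `CompactionSketch` and `compact` are hypothetical names. It only shows the idea that, per key, the most recent value survives, so a compacted CDC topic can still rebuild the latest state of each row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of log compaction: NOT Kafka's implementation, just the idea.
final class CompactionSketch {

    // Keeps only the latest value for each key, as a compacted topic
    // eventually does. A null value models a tombstone (delete).
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] entry : log) {
            String key = entry[0];
            String value = entry[1];
            latest.remove(key); // re-insert so iteration order follows the last write
            if (value != null) {
                latest.put(key, value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = new ArrayList<>();
        log.add(new String[] {"customer:1", "name=Ann"});
        log.add(new String[] {"customer:2", "name=Bob"});
        log.add(new String[] {"customer:1", "name=Anna"}); // update of customer:1
        log.add(new String[] {"customer:2", null});        // tombstone for customer:2
        System.out.println(compact(log));
    }
}
```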

---
## Debezium & Kafka-connect

---
## Debezium
- Kafka
- Kafka-Connect
- Debezium
- Connectors
  - [MongoDB](https://github.com/debezium/debezium/tree/main/debezium-connector-mongodb)
  - [MySQL](https://github.com/debezium/debezium/tree/main/debezium-connector-mysql)
  - [Oracle](https://github.com/debezium/debezium/tree/main/debezium-connector-oracle)
  - [PostgreSQL](https://github.com/debezium/debezium/tree/main/debezium-connector-postgres)
  - [SQLServer](https://github.com/debezium/debezium/tree/main/debezium-connector-sqlserver)
  - [Db2](https://github.com/debezium/debezium-connector-db2)
---
## Debezium Concepts
- Snapshot
- InformixDatabaseSchema
  - HistorizedRelationalDatabaseSchema
- InformixConnector
  - SourceConnector
- InformixConnectorConfig
  - HistorizedRelationalDatabaseConnectorConfig
- InformixConnectorTask
  - BaseSourceTask
---
## Debezium Concepts 2
- InformixOffsetContext
  - OffsetContext
- InformixSnapshotChangeEventSource
  - RelationalSnapshotChangeEventSource
    - AbstractSnapshotChangeEventSource
      - SnapshotChangeEventSource
- InformixStreamingChangeEventSource
  - StreamingChangeEventSource
    - ChangeEventSource
- Lsn
- SourceInfo
---
## Informix API
---
### Informix CDC Official Manual
- https://www.ibm.com/docs/en/informix-servers/14.10?topic=api-change-data-capture
---
### Informix LSN
Reference: https://www.ibm.com/docs/en/informix-servers/12.10?topic=api-cdc-record-sequence-numbers
- The sequence number associated with a CDC record is a BIGINT data type.
```java=
IfxDataSource ds = new IfxDataSource(jdbcUrl);
IfxCDCEngine.Builder builder = new IfxCDCEngine.Builder(ds);
builder.sequenceId(seqId); // <- LSN
```
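
Because the sequence number is a BIGINT, it fits in a Java `long`. The sketch below shows one way to wrap it so it can be stored as text and compared. `LsnSketch` is a hypothetical class, and the connector's own `Lsn` may look different. It also assumes that a larger sequence number means a later position in the log.

```java
// Hypothetical value type; the connector's real Lsn class may differ.
final class LsnSketch implements Comparable<LsnSketch> {
    private final long value; // BIGINT maps to a 64-bit signed long

    private LsnSketch(long value) {
        this.value = value;
    }

    static LsnSketch of(long value) {
        return new LsnSketch(value);
    }

    // An offset store may keep the value as text, so parse it back.
    static LsnSketch parse(String text) {
        return new LsnSketch(Long.parseLong(text.trim()));
    }

    long longValue() {
        return value;
    }

    // Assumption: a larger sequence number is a later log position.
    @Override
    public int compareTo(LsnSketch other) {
        return Long.compare(value, other.value);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof LsnSketch && ((LsnSketch) o).value == value;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(value);
    }

    @Override
    public String toString() {
        return Long.toString(value);
    }

    public static void main(String[] args) {
        LsnSketch stored = LsnSketch.parse("42949672960");
        LsnSketch incoming = LsnSketch.of(42949673000L);
        // Only records after the stored offset would need processing.
        System.out.println(incoming.compareTo(stored) > 0);
    }
}
```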
---
### Informix CDC Prepare
[Preparing to use the Change Data Capture API](https://www.ibm.com/docs/en/informix-servers/14.10?topic=api-preparing-use-change-data-capture)
- **cdc_set_fullrowlogging()**
- Run the script `$INFORMIXDIR/etc/syscdcv1.sql` as user informix
---
### Informix CDC : Write an App
[Writing an application to capture data changes](https://www.ibm.com/docs/en/informix-servers/14.10?topic=api-writing-application-capture-data-changes)
1. connect to `syscdcv1`
2. cdc_opensess()
3. cdc_set_fullrowlogging() to enable full-row logging
4. cdc_startcapture()
5. cdc_activatesess()
6. while(true) { mi_lo_read() }
7. **blah blah blah**
8. cdc_endcapture()
9. cdc_set_fullrowlogging() to disable full-row logging
10. cdc_closesess()
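
The ordering of these steps matters, so here is a small sketch that models the list as a state machine and rejects out-of-order calls. It does not talk to Informix and deliberately leaves out the functions' arguments. `CdcLifecycleSketch` and its `Step` names are hypothetical, and reading the two `cdc_set_fullrowlogging()` steps as "enable" and "disable" is my interpretation of the list.

```java
// Models the call order from the list above; it does not call Informix.
final class CdcLifecycleSketch {

    // One entry per step, declared in the order it must run.
    enum Step {
        CONNECT("connect to syscdcv1"),
        OPEN_SESSION("cdc_opensess"),
        ENABLE_FULL_ROW_LOGGING("cdc_set_fullrowlogging"),
        START_CAPTURE("cdc_startcapture"),
        ACTIVATE_SESSION("cdc_activatesess"),
        READ_RECORDS("mi_lo_read"),
        END_CAPTURE("cdc_endcapture"),
        DISABLE_FULL_ROW_LOGGING("cdc_set_fullrowlogging"),
        CLOSE_SESSION("cdc_closesess");

        final String call;

        Step(String call) {
            this.call = call;
        }
    }

    private int next = 0; // index of the next expected step

    // Accepts a step only if it is the next one in declaration order.
    void perform(Step step) {
        Step[] order = Step.values();
        // Reading is a loop, so READ_RECORDS may repeat while it is the latest step.
        boolean repeatedRead = step == Step.READ_RECORDS
                && next > 0 && order[next - 1] == Step.READ_RECORDS;
        if (repeatedRead) {
            return;
        }
        if (next >= order.length || order[next] != step) {
            throw new IllegalStateException("unexpected step: " + step.call);
        }
        next++;
    }

    boolean isClosed() {
        return next == Step.values().length;
    }

    public static void main(String[] args) {
        CdcLifecycleSketch session = new CdcLifecycleSketch();
        for (Step step : Step.values()) {
            session.perform(step);
            System.out.println("ok: " + step.call);
        }
        System.out.println("closed: " + session.isClosed());
    }
}
```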
---
```java
IfxDataSource ds = new IfxDataSource(jdbcUrl);
IfxCDCEngine.Builder builder = new IfxCDCEngine.Builder(ds);
builder.sequenceId(seqId); // <- LSN to start from
// Tables are named "database:owner:table", followed by the columns to watch
builder.watchTable("testdb:informix:hello", "a", "b");
builder.watchTable("testdb:informix:systables", "tabname", "owner", "partnum");
try (IfxCDCEngine engine = builder.build()) {
    engine.init();
    IfmxStreamRecord record = null;
    while ((record = engine.getRecord()) != null) {
        if (record instanceof IfxCDCOperationRecord) {
            // row change
        } else if (record instanceof IfxCDCMetaDataRecord) {
            // table metadata
        } else if (record instanceof IfxCDCBeginTransactionRecord) {
            // transaction begin
        } else if (record instanceof IfxCDCCommitTransactionRecord) {
            // transaction commit
        } else if (record instanceof IfxCDCTimeoutRecord) {
            // no record arrived before the timeout
        } else {
            // ...
        }
    }
}
```
---
## Informix CDC: Format of Record
- [Format of CDC records](https://www.ibm.com/docs/en/informix-servers/14.10?topic=records-format-cdc)
- [The CDC_REC_BEGINTX record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_030.htm)
- [The CDC_REC_INSERT record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_035.htm)
- [The CDC_REC_UPDAFT record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_040.htm)
- [The CDC_REC_UPDBEF record](https://www.ibm.com/docs/en/SSGU8G_14.1.0/com.ibm.cdc.doc/ids_cdc_041.htm)
---
## The Debezium-Informix-Connector
- https://github.com/laoflch/debezium-informix-connector
- https://github.com/leoncamel/debezium-informix-connector
---
## Local Dev Environment: Kafka
```shell=
$KAFKA_ROOT/bin/zookeeper-server-start.sh $KAFKA_ROOT/config/zookeeper.properties
$KAFKA_ROOT/bin/kafka-server-start.sh $KAFKA_ROOT/config/server.properties
```
```shell=
$KAFKA_ROOT/bin/connect-standalone.sh connect-standalone.properties connect-informix-source.properties
```
---
### Kafka: `connect-standalone.properties`
```text
bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/home/xiaolin/local/kafka/kafka_2.13-2.4.1/tmp/informix/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/xiaolin/local/kafka/kafka_2.13-2.4.1/connect-plugins/
```
---
### Kafka: `connect-informix-source.properties`
```text
name=informix-connector-214414
connector.class=laoflch.debezium.connector.informix.InformixConnector
tasks.max=1
database.server.name=informix-214414
database.hostname=127.0.0.1
database.port=9088
database.user=informix
database.password=in4mix
database.dbname=testdb
database.history=io.debezium.relational.history.FileDatabaseHistory
database.history.file.filename=/home/xiaolin/local/kafka/kafka_2.13-2.4.1/tmp/informix/dbhistory.dat
# from embedded's properties
database.server.id=89
# Snapshot Mode: Initial_Schema_Only vs Initial
snapshot.mode=initial_schema_only
# TODO:
informix.columntype.blacklist=lvarchar,blob, xxxx
# from source code
# schema.whitelist=cdctable
# schema.whitelist=informix
# schema.blacklist=
# table.whitelist=customer
# table.whitelist=testdb.informix.customer
# table.whitelist=hello
# table.blacklist=
# Decimal Handling
# decimal.handling.mode=precise
decimal.handling.mode=string
# decimal.handling.mode=double
provide.transaction.metadata=true
```
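
The three `decimal.handling.mode` values trade precision for convenience. As a rough illustration of the difference, the sketch below reads the setting from a properties snippet and converts one value accordingly. `DecimalModeSketch` and `convert` are hypothetical names; the connector's real conversion code lives elsewhere and may differ in detail.

```java
import java.io.IOException;
import java.io.StringReader;
import java.math.BigDecimal;
import java.util.Properties;

// Illustrates the three modes; not the connector's actual converter.
final class DecimalModeSketch {

    // Returns the Java value a DECIMAL column would roughly map to in each mode.
    static Object convert(BigDecimal value, String mode) {
        switch (mode) {
            case "precise":
                return value;                 // exact, kept as BigDecimal
            case "string":
                return value.toPlainString(); // exact, as plain decimal text
            case "double":
                return value.doubleValue();   // convenient, but may lose precision
            default:
                throw new IllegalArgumentException("unknown decimal.handling.mode: " + mode);
        }
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader("decimal.handling.mode=string\n"));
        String mode = props.getProperty("decimal.handling.mode", "precise");

        BigDecimal amount = new BigDecimal("12345678901234567890.12345678");
        Object converted = convert(amount, mode);
        System.out.println(mode + " -> " + converted.getClass().getSimpleName() + " " + converted);
    }
}
```

With `string`, consumers get the exact digits without needing to decode a binary decimal, which is presumably why the slide's config selects it.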
---
### Debug from IntelliJ
```shell=
export KAFKA_DEBUG=1
export DEBUG_SUSPEND_FLAG=y
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$SCRIPT_DIR/xiaolin-config/connect-log4j.properties"
$KAFKA_ROOT/bin/connect-standalone.sh connect-standalone.properties connect-informix-source.properties
```
---
## Data Path
```plantuml
@startuml
Actor user
box "Informix"
user -> Informix : Insert INTO Values(x, x)
Informix -> Log : Begin Transaction
end box
box "Debezium Informix Connector" #LightBlue
Log -> InformixCDCEngine : Record.Begin
InformixCDCEngine -> InformixStreamingChangeEventSource :
Informix -> Log : Insert
Log -> InformixCDCEngine : Record.Insert
Informix -> Log : Insert
Log -> InformixCDCEngine : Record.Insert
Informix -> Log : Insert
Log -> InformixCDCEngine : Record.Insert
InformixCDCEngine -> InformixStreamingChangeEventSource :
Informix -> Log : Commit Transaction
Log -> InformixCDCEngine : Record.Commit
InformixCDCEngine -> InformixStreamingChangeEventSource :
InformixStreamingChangeEventSource -> EventDispatcher : handleCommit
EventDispatcher -> ChangeEventQueue : enqueue
ChangeEventQueue -> EmbeddedEngine : task.poll()
ChangeEventQueue -> EmbeddedEngine : task.poll()
EmbeddedEngine -> DebeziumEngine.ChangeConsumer : handleBatch()
DebeziumEngine.ChangeConsumer -> RecordCommitter : commit()
RecordCommitter -> OffsetWriter : maybeFlush()
note across: finally
EmbeddedEngine -> OffsetWriter : commitOffset()
end box
@enduml
```
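
The diagram shows row changes being held until the commit record arrives and only then enqueued. A minimal sketch of that buffering idea follows, using stand-in types rather than the Informix or Debezium classes. `TxBufferSketch`, `Rec`, and `Kind` are hypothetical, and the rollback branch is my addition to show why buffering helps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Buffers row changes per transaction and releases them on commit.
final class TxBufferSketch {

    enum Kind { BEGIN, INSERT, COMMIT, ROLLBACK }

    // Stand-in for a CDC record: kind, transaction id, optional row payload.
    static final class Rec {
        final Kind kind;
        final int txId;
        final String row;

        Rec(Kind kind, int txId, String row) {
            this.kind = kind;
            this.txId = txId;
            this.row = row;
        }
    }

    private final Map<Integer, List<String>> open = new HashMap<>();
    private final List<String> queue = new ArrayList<>(); // stands in for ChangeEventQueue

    void handle(Rec rec) {
        switch (rec.kind) {
            case BEGIN: {
                open.put(rec.txId, new ArrayList<>());
                break;
            }
            case INSERT: {
                List<String> rows = open.get(rec.txId);
                if (rows != null) {
                    rows.add(rec.row); // held back until the commit is seen
                }
                break;
            }
            case COMMIT: {
                List<String> committed = open.remove(rec.txId);
                if (committed != null) {
                    queue.addAll(committed); // emit in original order
                }
                break;
            }
            case ROLLBACK: {
                open.remove(rec.txId); // rolled-back rows are never emitted
                break;
            }
        }
    }

    List<String> queue() {
        return queue;
    }

    public static void main(String[] args) {
        TxBufferSketch buffer = new TxBufferSketch();
        buffer.handle(new Rec(Kind.BEGIN, 1, null));
        buffer.handle(new Rec(Kind.INSERT, 1, "row-a"));
        buffer.handle(new Rec(Kind.INSERT, 1, "row-b"));
        System.out.println("before commit: " + buffer.queue());
        buffer.handle(new Rec(Kind.COMMIT, 1, null));
        System.out.println("after commit:  " + buffer.queue());
    }
}
```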
---
### Integration Tests
---
## TODO and Future Jobs - 1
- Handle `TRUNCATE`
- Column-Type-Blacklist: LVARCHAR, SMALLFLOAT, Decimal(x, 8), etc
- Integration Tests
- More Types
- `UPDATE clause`
---
## TODO and Future Jobs - 2
- Metrics
- Reference: https://github.com/debezium/debezium/blob/main/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceMetricsFactory.java
- Schema Change
- Watch `systables`, `sysfragments`, `syscolumns`
- Restart Informix CDC Engine on Schema Changes
- Table attach (config from properties)
- Handle Various Faults and corner cases
---
### Thank you! :tada: