If you are running an e-commerce website with transactional data stored in an OLTP database such as MySQL, adding an OLAP database like Databend to your architecture lets you keep the data continuously synchronized and extract valuable insights such as conversion funnels. Apache Flink CDC (Change Data Capture) refers to the capability of Apache Flink to capture and process real-time data changes from various sources using SQL-based queries. CDC allows you to monitor and capture data modifications (inserts, updates, and deletes) happening in a database or streaming system and react to those changes in real time.

![](https://hackmd.io/_uploads/Hy6Asm5F3.png)

In this tutorial, we will guide you through the process of using Flink CDC to import data from MySQL to Databend in real time. All you need is Docker and basic SQL knowledge; there is no need to write Java/Scala code or install an IDE.

> Hold on, have you heard of Databend? Databend is a modern cloud data warehouse written in Rust. If you're itching to learn more, head over to databend.rs and prepare to be amazed by the power of Databend!

## Step 1: Set Up Environment

In this step, we will use `docker-compose` to deploy MySQL and Databend. Please ensure that you have `docker` and `docker-compose` installed on your system before you begin.

1. Save each of the following snippets as a separate `docker-compose.yml` file, and move each file to its own folder.

   **debezium-MySQL**

   ```yaml
   version: '2.1'
   services:
     postgres:
       image: debezium/example-postgres:1.1
       ports:
         - "5432:5432"
       environment:
         - POSTGRES_DB=postgres
         - POSTGRES_USER=postgres
         - POSTGRES_PASSWORD=postgres
     mysql:
       image: debezium/example-mysql:1.1
       ports:
         - "3306:3306"
       environment:
         - MYSQL_ROOT_PASSWORD=123456
         - MYSQL_USER=mysqluser
         - MYSQL_PASSWORD=mysqlpw
   ```

   **Databend**

   ```yaml
   version: '3'
   services:
     databend:
       image: datafuselabs/databend
       volumes:
         - {PATH}/databend-query.toml:/etc/databend/query.toml
       environment:
         QUERY_DEFAULT_USER: databend
         QUERY_DEFAULT_PASSWORD: databend
         MINIO_ENABLED: 'true'
       ports:
         - '8000:8000'
         - '9000:9000'
         - '3307:3307'
         - '8124:8124'
   ```

   Locate the `databend-query.toml` file in the [Databend release](https://databend.rs/download) and move it to this folder. Then, replace `{PATH}` with the correct path.

2. Navigate to each folder where a `docker-compose.yml` file was saved and run:

   ```bash
   docker-compose up -d
   ```

   The command starts all containers defined in the Docker Compose configuration in detached mode. You can use `docker ps` to check whether the containers have started successfully.

## Step 2: Prepare Data

Create a table in MySQL and populate it with sample data:

```sql
CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","black wind breaker"),
       (default,"cloud","test for databend"),
       (default,"spare tire","24 inch spare tire");
```
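Before moving on, you can run a quick sanity check in MySQL to confirm the sample data is in place. This is a minimal sketch; the expected row count and starting id simply follow from the `INSERT` and `AUTO_INCREMENT` statements above.

```sql
-- Sanity check in MySQL (database mydb): the INSERT above creates 10 rows,
-- with ids starting at 10 because of the AUTO_INCREMENT setting.
SELECT COUNT(*) FROM products;                      -- expected: 10
SELECT id, name FROM products ORDER BY id LIMIT 3;  -- expected: ids 10, 11, 12
```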
Then, create a corresponding target table in Databend:

```sql
CREATE TABLE products (
  id INT NOT NULL,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
```

## Step 3: Install Flink

Download [Flink](https://flink.apache.org/downloads/) and the following SQL connectors to your system:

- Flink SQL connector for Databend: https://github.com/databendcloud/flink-connector-databend/releases
- Flink SQL connector for MySQL: https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar

Move both connector JAR files to the `lib` folder in your Flink installation directory.

**Start Flink:**

```bash
cd flink-1.16.0
./bin/start-cluster.sh
```

You can now open the Apache Flink Dashboard by visiting `http://localhost:8081` in your browser:

![](https://hackmd.io/_uploads/Hk1axFsF2.png)

**Start the Flink SQL Client:**

```bash
./bin/sql-client.sh
```

![](https://hackmd.io/_uploads/SJMMWYiYn.png)

## Step 4: Sync Data

Set the checkpointing interval to 3 seconds, and create corresponding tables with the MySQL and Databend connectors in the Flink SQL Client. For the available connection parameters, see https://github.com/databendcloud/flink-connector-databend#connector-options:

```sql
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE mysql_products (id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED)
           WITH ('connector' = 'mysql-cdc',
                 'hostname' = 'localhost',
                 'port' = '3306',
                 'username' = 'root',
                 'password' = '123456',
                 'database-name' = 'mydb',
                 'table-name' = 'products',
                 'server-time-zone' = 'UTC');
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE databend_products (id INT, name STRING, description STRING, PRIMARY KEY (`id`) NOT ENFORCED)
           WITH ('connector' = 'databend',
                 'url' = 'databend://localhost:8000',
                 'username' = 'databend',
                 'password' = 'databend',
                 'database-name' = 'default',
                 'table-name' = 'products',
                 'sink.batch-size' = '5',
                 'sink.flush-interval' = '1000',
                 'sink.ignore-delete' = 'false',
                 'sink.max-retries' = '3');
[INFO] Execute statement succeed.
```

In the Flink SQL Client, synchronize the data from the *mysql_products* table to the *databend_products* table:

```sql
Flink SQL> INSERT INTO databend_products SELECT * FROM mysql_products;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b14645f34937c7cf3672ffba35733734
```

You can now see a running job in the Apache Flink Dashboard:

![](https://hackmd.io/_uploads/H1JFZKjY3.png)

You're all set! If you query the `products` table in Databend, you will see that the data from MySQL has been synchronized successfully.

![](https://hackmd.io/_uploads/ByJAWFit2.png)

Feel free to make insertions, updates, or deletions in MySQL; Databend will seamlessly reflect the corresponding changes in real time, as shown in the example below.
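The following sketch exercises all three change types and then checks the result on the Databend side. It only assumes the `products` schema created earlier; the specific rows and values used here are arbitrary examples.

```sql
-- In MySQL (database mydb): insert, update, and delete a few rows.
INSERT INTO products VALUES (default, 'helmet', 'bike helmet, CDC smoke test');
UPDATE products SET description = 'water-resistant black wind breaker' WHERE name = 'jacket';
DELETE FROM products WHERE name = 'cloud';

-- In Databend: the same changes should appear after the next checkpoint
-- and sink flush (a few seconds with the settings above).
SELECT id, name, description FROM products ORDER BY id;
```

Because `sink.ignore-delete` is set to `false` and the checkpointing interval is 3 seconds, even the delete should show up in Databend within a few seconds of the change in MySQL.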