1. Hook-up, link to java/go libraries ✔
2. Why Rust (link to driver) ✔
3. Installing ✔
4. Create log reader and consumer ✔
5. Consuming CDC changes (printer - example how to build and run printer) ✔
6. Saving progress ✔?
7. Advanced application: table replication using CDC log ✔
8. Benchmarks (Testing configuration, test cases)
9. Result
10. Summary
## CDC log consuming library in Rust
Let's revisit a useful ScyllaDB feature:
[Change Data Capture (CDC)](https://www.scylladb.com/2020/07/23/using-change-data-capture-cdc-in-scylla/).
It lets you track the operations that modify data in a cluster.
Libraries that simplify consuming the CDC log already exist for Java and Go.
[This post](https://www.scylladb.com/2021/02/09/consuming-cdc-with-java-and-go/) explains why
such a library is necessary and helpful for writing CDC-consuming applications, and which challenges the
libraries overcome.
Recently,
an [internal developer hackathon](https://www.scylladb.com/2021/02/17/scylla-developer-hackathon-rust-driver/) at
ScyllaDB gave rise to the
open-source [ScyllaDB Rust Driver](https://www.scylladb.com/2021/07/13/scylla-rust-driver-update-and-benchmarks/).
Development of the driver has continued with new features, such as support for clusters that require
username and password authentication and configurable load balancing algorithms. As
the [benchmarks](https://www.scylladb.com/2021/07/13/scylla-rust-driver-update-and-benchmarks/) have shown, the driver is
very competitive in terms of scalability and performance.
Naturally, we were curious how a CDC log consuming library written in Rust, and built on top of the Rust
driver, would perform.
In fact, this is an academic project co-organized with the University of Warsaw, in which a group of four students
implemented the Rust library,
built a table replication application on top of it, and ran benchmarks.
This blog post serves as a tutorial for writing an application that processes the CDC log using the Rust
library, and aims to show how the Rust library compares with the Java and Go libraries.
### Getting Started with Rust
Let’s see how to install and use the Rust library in a simple program that will print the changes happening to a
table in real-time.
You can find the source code [here](https://github.com/piodul/scylla-cdc-rust/tree/main/scylla-cdc-printer).
### Installing the library
The latest version of the library is available [here](https://github.com/piodul/scylla-cdc-rust).
The latest release of the library is available on [crates.io](todo).
You can integrate it into your application by adding the following dependency to your `Cargo.toml` file:
```toml
[dependencies]
scylla-cdc = { git = "https://github.com/piodul/scylla-cdc-rust" }
```
You can also build the library from the source code:
```shell
git clone https://github.com/piodul/scylla-cdc-rust
cd scylla-cdc-rust
cargo build
```
### Setting up the CDC consumer
We create a `CDCLogReader` instance step by step, providing the required configuration through
a `CDCLogReaderBuilder` instance.
First, we need to establish a connection to a ScyllaDB cluster using the Rust driver:
```rust
let session: Session = SessionBuilder::new().known_node("127.0.0.1:9042").build().await.unwrap();
```
Next, we need to identify the base table by its keyspace and table name.
Note that these are the names of the base table, not of the CDC log table (e.g. `t.test` and
not `t.test_scylla_cdc_log`):
```rust
let keyspace: &str = "t";
let table: &str = "test";
```
Lastly, we need to provide a change consumer factory that implements the `ConsumerFactory` trait, together with
a consumer type that implements the `Consumer` trait:
```rust
struct DummyConsumer;

#[async_trait]
impl Consumer for DummyConsumer {
    async fn consume_cdc(&mut self, data: CDCRow<'_>) -> anyhow::Result<()> {
        // ... consume received data ...
        Ok(())
    }
}

struct DummyConsumerFactory;

#[async_trait]
impl ConsumerFactory for DummyConsumerFactory {
    async fn new_consumer(&self) -> Box<dyn Consumer> {
        Box::new(DummyConsumer)
    }
}
```
Each consumer instance consumes changes from a single stream in chronological order. The `CDCLogReader` instance
processes generations one by one; when it starts processing the next generation, it stops the consumers of the
previous one. The library creates a new consumer for each stream in the current generation and manages its
lifetime, which is why it needs a change consumer factory rather than a single consumer. The `CDCLogReader`
instance spawns a `tokio` task that periodically fetches the changes for each stream in time windows of a
user-specified size and feeds them to the consumer associated with that stream. Once the `CDCLogReader`
configuration is ready, we can build an instance, which automatically starts the reading process:
```rust
// `keyspace` and `table` are the base table names defined earlier.
let (cdc_log_printer, handle): (CDCLogReader, RemoteHandle<anyhow::Result<()>>) =
    CDCLogReaderBuilder::new()
        .session(Arc::new(session))
        .keyspace(keyspace)
        .table_name(table)
        .consumer_factory(Arc::new(DummyConsumerFactory))
        .build()
        .await?;
```
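The windowed reading described above can be pictured with a small standalone sketch. It does not use the library; `Window` and `next_window` are hypothetical names, and the rule that a window never reaches closer to the present than a safety interval is an assumption based on the `--window-size` and `--safety-interval` options of the replicator, not a description of the library's internals.

```rust
use std::time::Duration;

/// Half-open time window [start_ms, end_ms), in milliseconds since the Unix epoch.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Window {
    start_ms: u64,
    end_ms: u64,
}

/// Returns the next window to read, or `None` when the reader has already
/// caught up with `now - safety_interval` and should sleep instead.
fn next_window(
    last_end_ms: u64,
    now_ms: u64,
    window_size: Duration,
    safety_interval: Duration,
) -> Option<Window> {
    // Assumption: rows newer than this horizon are not read yet.
    let horizon_ms = now_ms.saturating_sub(safety_interval.as_millis() as u64);
    if last_end_ms >= horizon_ms {
        return None;
    }
    // Clamp the window so that it never crosses the horizon.
    let end_ms = (last_end_ms + window_size.as_millis() as u64).min(horizon_ms);
    Some(Window { start_ms: last_end_ms, end_ms })
}

fn main() {
    let window_size = Duration::from_secs(60);
    let safety_interval = Duration::from_secs(30);
    let now_ms = 200_000;

    // Walk forward window by window until the horizon is reached.
    let mut cursor = 0;
    while let Some(w) = next_window(cursor, now_ms, window_size, safety_interval) {
        println!("read changes in [{}, {})", w.start_ms, w.end_ms);
        cursor = w.end_ms;
    }
}
```

Each stream would keep its own cursor in such a scheme, which is what allows every consumer to see its changes in chronological order.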
### Consuming CDC changes
Let's try to implement a printer application that shows what kind of information is available about the change.
The `CDCRow` object represents a single row in a CDC log table.
There are two types of columns in a `CDCRow` object:
* [CDC log columns](https://docs.scylladb.com/using-scylla/cdc/cdc-log-table/) (`cdc$stream_id`, `cdc$time`, etc.)
* base table columns
First, let's show the information that is independent of the schema of the base table (`cdc$stream_id`, `cdc$time`, etc.).
```rust
let stream_id = data.stream_id.to_string();
let (unix_timestamp, _) = data.time.get_timestamp().unwrap().to_unix();
let timestamp = NaiveDateTime::from_timestamp(unix_timestamp as i64, 0).to_string();
let operation = data.operation.to_string();
let batch_seq_no = data.batch_seq_no.to_string();
let end_of_batch = data.end_of_batch.to_string();
let time_to_live = data.ttl.map_or("null".to_string(), |ttl| ttl.to_string());
```
Unfortunately, a `CDCRow` object does not carry details about the schema of the change, such as the data type of a
column or whether a column is part of the primary key.
So, let's simply print the values using Rust's `Debug` formatting (`{:?}`):
```rust
if data.column_exists(column) {
    if let Some(value) = data.get_value(column) {
        row_to_print.push_str(&print_field(
            value_field_name.as_str(),
            format!("{:?}", value).as_str(),
        ));
    } else {
        row_to_print.push_str(&print_field(value_field_name.as_str(), "null"));
    }
}
```
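To see what this formatting produces without connecting to a cluster, here is a minimal standalone sketch of the same pattern. The `print_field` and `format_column` functions below are hypothetical stand-ins written for this example (the printer's real `print_field` may format its output differently), and plain Rust values take the place of CQL values.

```rust
use std::fmt::Debug;

/// Hypothetical helper: renders one "name: value" line of the output.
fn print_field(name: &str, value: &str) -> String {
    format!("{}: {}\n", name, value)
}

/// Formats an optional column value: `Debug` output when the value is
/// present, the literal "null" when it is absent.
fn format_column<T: Debug>(name: &str, value: Option<&T>) -> String {
    match value {
        Some(v) => print_field(name, &format!("{:?}", v)),
        None => print_field(name, "null"),
    }
}

fn main() {
    let mut row_to_print = String::new();
    row_to_print.push_str(&format_column("pk", Some(&42)));
    // Debug formatting keeps the quotes around strings.
    row_to_print.push_str(&format_column("v", Some(&"hello")));
    row_to_print.push_str(&format_column::<i32>("deleted_v", None));
    print!("{}", row_to_print);
}
```

`Debug` formatting is a convenient default here because it works for any value type without knowing the schema, at the cost of output that is meant for developers rather than end users.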
### Full Example
Full source code is available [here](https://github.com/piodul/scylla-cdc-rust/tree/main/scylla-cdc-printer), and you
can run it with the following commands:
```shell
git clone https://github.com/piodul/scylla-cdc-rust.git
cd scylla-cdc-rust
cargo build
cd target/debug
./scylla-cdc-printer --keyspace=t --table=test --hostname="127.0.0.1:9042"
```
Here is an example of what our application prints for a single CDC log row:

### Saving progress
The library supports saving progress and restoring the last saved point in time for a particular stream. Imagine that
there is a sudden power outage and you have to start the whole process again. By periodically saving progress, we
avoid having to start from the very beginning in such cases.
**Note:** It is an optional feature and you need to explicitly enable it.
If saving is enabled, the library creates a new `tokio` task that periodically saves the beginning of the next time window.
To enable this feature, you need an object that implements the `CDCCheckpointSaver` trait. We provide an example
implementation called `TableBackedCheckpointSaver` that saves progress in a ScyllaDB table alongside the base table and
the CDC log table.
To create a `TableBackedCheckpointSaver`:
```rust
let user_checkpoint_saver =
    TableBackedCheckpointSaver::new_with_default_ttl(session, "t", "checkpoints")
        .await
        .unwrap();
```
**Note:** You need to provide a table name that is not already in use in the given keyspace.
Now we can tell `CDCLogReaderBuilder` that we want to save/load progress using `user_checkpoint_saver`:
```rust
let (log_reader, handle) = CDCLogReaderBuilder::new()
    // ...
    .should_save_progress(true) // Mark that we want to save progress.
    .should_load_progress(true) // Mark that we want to start consuming CDC logs from the last saved checkpoint.
    .pause_between_saves(time::Duration::from_millis(100)) // Save progress every 100 ms. If not specified, a default value of 10 seconds is used.
    .checkpoint_saver(user_checkpoint_saver) // Use `user_checkpoint_saver` to manage checkpoints.
    .build()
    .await?;
```
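To make the idea of a checkpoint concrete, the following standalone sketch keeps checkpoints in memory. It is not the library's `CDCCheckpointSaver` trait or `TableBackedCheckpointSaver`; `InMemoryCheckpoints` and its methods are hypothetical names, and the only thing taken from the text is the principle that each stream remembers the beginning of its next time window.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a checkpoint store.
#[derive(Default)]
struct InMemoryCheckpoints {
    /// Stream identifier -> beginning of the next time window (ms since epoch).
    saved: HashMap<String, u64>,
}

impl InMemoryCheckpoints {
    /// Records where reading of `stream_id` should continue.
    fn save(&mut self, stream_id: &str, next_window_start_ms: u64) {
        self.saved.insert(stream_id.to_string(), next_window_start_ms);
    }

    /// Returns where to resume, falling back to `default_start_ms`
    /// for streams that were never checkpointed.
    fn resume_point(&self, stream_id: &str, default_start_ms: u64) -> u64 {
        self.saved.get(stream_id).copied().unwrap_or(default_start_ms)
    }
}

fn main() {
    let mut checkpoints = InMemoryCheckpoints::default();
    checkpoints.save("stream-a", 60_000);
    checkpoints.save("stream-a", 120_000); // A later save overwrites the earlier one.

    // After a restart, each stream resumes from its last saved point.
    println!("stream-a resumes at {}", checkpoints.resume_point("stream-a", 0));
    println!("stream-b resumes at {}", checkpoints.resume_point("stream-b", 0));
}
```

A real saver has to persist this mapping outside the process, for example in a table, since an in-memory map would be lost in exactly the failure it is meant to protect against.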
### Replicator
The replicator is another application based on the CDC Rust library. It replicates a table from one Scylla cluster to
another by reading CDC changes, translating each change into a CQL query, and executing that query on the destination cluster.
#### Install and Run
```shell
git clone git@github.com:piodul/scylla-cdc-rust.git
cd scylla-cdc-rust
cargo build
cd target/debug
./scylla-cdc-replicator -k KEYSPACE -t TABLE -s SRC_ADDRESS -d DST_ADDRESS
```
#### Command-line arguments
```shell
USAGE:
    scylla-cdc-replicator [OPTIONS] --keyspace <KEYSPACE> --table <TABLE> --source <SOURCE> --destination <DESTINATION>

OPTIONS:
    -d, --destination <DESTINATION>            Address of a node in destination cluster
    -h, --help                                 Print help information
    -k, --keyspace <KEYSPACE>                  Keyspace name
    -s, --source <SOURCE>                      Address of a node in source cluster
        --safety-interval <SAFETY_INTERVAL>    Safety interval in seconds [default: 30]
        --sleep-interval <SLEEP_INTERVAL>      Sleep interval in seconds [default: 10]
        --start-datetime <START_DATETIME>      Start datetime as RFC 3339 formatted string
    -t, --table <TABLE>                        Table names provided as a comma delimited string
        --window-size <WINDOW_SIZE>            Window size in seconds [default: 60]
Note: You can provide more than one table name as a command-line argument by separating the table names with commas
(`table1,table2,...`).
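Splitting such a comma-delimited argument takes only a few lines. The sketch below is an illustration, not the replicator's actual parsing code; `parse_tables` is a hypothetical helper, and trimming whitespace and skipping empty entries are assumptions about what a lenient parser might do.

```rust
/// Hypothetical helper: splits a comma-delimited `--table` argument
/// into individual table names.
fn parse_tables(arg: &str) -> Vec<String> {
    arg.split(',')
        .map(str::trim) // Tolerate spaces around the commas.
        .filter(|name| !name.is_empty()) // Ignore empty entries such as a trailing comma.
        .map(String::from)
        .collect()
}

fn main() {
    for table in parse_tables("table1,table2, table3") {
        println!("replicating table {}", table);
    }
}
```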
The `main` function reads the command-line arguments and, for each source table, creates a `ReplicatorConsumer` object
that implements the `Consumer` trait. The consumers process the CDC log changes and, depending on the operation type
(`ROW_INSERT`, `ROW_UPDATE`, etc.), execute a handler that translates the change into a CQL query
and performs it on the destination cluster.
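The dispatch on operation type can be sketched as follows. This is a simplified illustration and not the replicator's code: the `OperationType` enum covers only three operations, `statement_for` is a hypothetical function, and the table is assumed to have a single primary key column `pk` and a single regular column `v`.

```rust
/// A subset of CDC operation types, for illustration only.
#[derive(Debug, Clone, Copy)]
enum OperationType {
    RowInsert,
    RowUpdate,
    RowDelete,
}

/// Builds the CQL statement that replays a change on the destination table.
/// Assumes a hypothetical schema with columns `pk` (primary key) and `v`.
fn statement_for(op: OperationType, keyspace: &str, table: &str) -> String {
    match op {
        OperationType::RowInsert => {
            format!("INSERT INTO {}.{} (pk, v) VALUES (?, ?)", keyspace, table)
        }
        OperationType::RowUpdate => {
            format!("UPDATE {}.{} SET v = ? WHERE pk = ?", keyspace, table)
        }
        OperationType::RowDelete => {
            format!("DELETE FROM {}.{} WHERE pk = ?", keyspace, table)
        }
    }
}

fn main() {
    let ops = [
        OperationType::RowInsert,
        OperationType::RowUpdate,
        OperationType::RowDelete,
    ];
    for op in ops {
        println!("{:?} -> {}", op, statement_for(op, "t", "test"));
    }
}
```

A real handler also has to bind the values from the CDC row and deal with details such as TTLs, timestamps, and collection columns, which this sketch leaves out.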
### Benchmarks

### Results
### Summary