---
title: ADB Spark Connector
---

ADB Spark Connector
===
<style>
.reveal {
  font-size: 32px;
}
</style>

#### Connector features
<!-- .slide: style="font-size: 20px;" -->
* reading data from ADB
* writing data to ADB in several save modes
    * overwrite
    * append
    * errorIfExists
* support for structured data
* automatic schema inference
* configurable partitioning
    * integer types
    * dates
* operator push-down:
    * column pruning
    * filter push-down
* extraction of additional metadata from ADB
    * data distribution scheme
    * statistics
    * count optimization
* execution of arbitrary SQL on the ADB master
* batch mode


---

#### Component architecture
![](https://i.imgur.com/yBWk1Mm.png =800x)

---

#### Data read diagram
![](https://i.imgur.com/IXD2mIP.png =700x)

---

#### Data write diagram
![](https://i.imgur.com/uDHVKiY.png =700x)

---

#### Supported data types
<!-- .slide: style="font-size: 16px;" -->
### ADB to Spark

| ADB Data Type | Spark Data Type |
| -------- | -------- |
| bigint | LongType |
| bigserial | LongType |
| bit | StringType |
| bytea | BinaryType |
| boolean | BooleanType |
| char | StringType |
| date | DateType |
| decimal | DecimalType |
| float4 | FloatType |
| float8 | DoubleType |
| int | IntegerType |
| interval | CalendarIntervalType |
| serial | IntegerType |
| smallint | ShortType |
| text | StringType |
| time | TimestampType |
| timestamp | TimestampType |
| timestamptz | TimestampType |
| timetz | TimestampType |
| varchar | StringType |

---

#### Supported data types
<!-- .slide: style="font-size: 16px;" -->
### Spark to ADB

| Spark Data Type | ADB Data Type |
| --------------- | -------- |
| BinaryType | bytea |
| BooleanType | boolean |
| CalendarIntervalType | interval |
| DateType | date |
| DecimalType | numeric |
| DoubleType | float8 |
| FloatType | float4 |
| IntegerType | int |
| LongType | bigint |
| ShortType | smallint |
| StringType | text |
| TimestampType | timestamp |

---

#### Writing data. Save modes and additional options.
<!-- .slide: style="font-size: 20px;" -->
##### overwrite

Overwrites the target table.

##### Additional options:
* adb.create.table.with - WITH clause of the ADB CREATE TABLE command
* adb.create.table.distributedby - DISTRIBUTED BY clause of the ADB CREATE TABLE command
* truncate - if true, the existing table is simply truncated instead of dropped and recreated

##### append

Appends data to an already existing table.

##### errorIfExists

Fails with an error if the table already exists; otherwise behaves like overwrite.
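The options above can be combined in a single write. A minimal sketch (the WITH and DISTRIBUTED BY values and the table name are illustrative, not taken from a real deployment):

```scala=
    finalResult // an existing DataFrame
      .write
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.6.137:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes_final_result",
          // illustrative values for the generated CREATE TABLE clauses
          "adb.create.table.with" -> "(appendonly=true, orientation=column)",
          "adb.create.table.distributedby" -> "incident_number"
        ))
      .mode(SaveMode.Overwrite)
      .save()
```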

---

#### Code examples
Example: loading data from ADB into Spark
```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(// JDBC ADB Master
          "url" -> "jdbc:postgresql://10.92.6.137:5432/spark", 
          "user" -> "spark", 
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes"
        )
      ).load()
      .withColumn("lat",$"lat".cast(DecimalType(38,18)))
      .withColumn("long",$"long".cast(DecimalType(38,18)))
      .as[Crime]
```

---

#### Code examples

* Example: writing data from Spark to ADB

```scala=
    finalResult // An existing DataFrame
      .write
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(// JDBC ADB Master
          "url" -> "jdbc:postgresql://10.92.6.137:5432/spark", 
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes_final_result"
        ))
      .mode(SaveMode.Overwrite)
      .save()
```

---


#### Example. Automatic schema inference.
<!-- .slide: style="font-size: 12px;" -->
```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes"
        )
      ).load()
    crimes.printSchema()
```

![](https://i.imgur.com/V4wa6rG.png)
![](https://i.imgur.com/OyEV4Ce.png)

---

#### Example. Partitioning. Integer types.
<!-- .slide: style="font-size: 16px;" -->
```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes",
          "partition.column" -> "offense_code",
          "partition.count" -> "8"
        )
      ).load()
      .withColumn("lat",$"lat".cast(DecimalType(38,18)))
      .withColumn("long",$"long".cast(DecimalType(38,18)))
      .as[Crime]

    crimes.show(10)
```

21/01/25 08:01:58 INFO AdbPartitioner: Number of partitions: 8, WHERE clauses of these partitions: "offense_code" < 576 or "offense_code" is null, "offense_code" >= 576 AND "offense_code" < 1041, "offense_code" >= 1041 AND "offense_code" < 1506, "offense_code" >= 1506 AND "offense_code" < 1971, "offense_code" >= 1971 AND "offense_code" < 2436, "offense_code" >= 2436 AND "offense_code" < 2901, "offense_code" >= 2901 AND "offense_code" < 3366, "offense_code" >= 3366


![](https://i.imgur.com/EKkjqPu.png)

---

#### Example. Partitioning. Dates.
<!-- .slide: style="font-size: 16px;" -->
```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes",
          "partition.column" -> "occured_on_date",
          "partition.count" -> "8"
        )
      ).load()
      .withColumn("lat",$"lat".cast(DecimalType(38,18)))
      .withColumn("long",$"long".cast(DecimalType(38,18)))
      .as[Crime]

    crimes.show(10)
```

21/01/25 08:10:39 INFO AdbPartitioner: Number of partitions: 8, WHERE clauses of these partitions: "occurred_on_date" < '2015-11-09 02:40:37.5' or "occurred_on_date" is null, "occurred_on_date" >= '2015-11-09 02:40:37.5' AND "occurred_on_date" < '2016-04-04 05:21:15', "occurred_on_date" >= '2016-04-04 05:21:15' AND "occurred_on_date" < '2016-08-29 08:01:52.5', "occurred_on_date" >= '2016-08-29 08:01:52.5' AND "occurred_on_date" < '2017-01-23 10:42:30', "occurred_on_date" >= '2017-01-23 10:42:30' AND "occurred_on_date" < '2017-06-19 13:23:07.5', "occurred_on_date" >= '2017-06-19 13:23:07.5' AND "occurred_on_date" < '2017-11-13 16:03:45', "occurred_on_date" >= '2017-11-13 16:03:45' AND "occurred_on_date" < '2018-04-09 18:44:22.5', "occurred_on_date" >= '2018-04-09 18:44:22.5'

![](https://i.imgur.com/f5Ijv2g.png)

---

#### Example. Operator push-down. Column pruning.

```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes"
        )
      ).load()

     crimes.select($"incident_number").collect()
```
![](https://i.imgur.com/iOf2n8K.png)

---

#### Example. Filter push-down.

```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes"
        )
      ).load()

     crimes.filter(expr("incident_number = 'I182070906'")).show()
```
![](https://i.imgur.com/IUR07nj.png)
![](https://i.imgur.com/PJaIWWB.png)


---

#### Example. Extracting additional metadata from ADB. Distribution scheme.
<!-- .slide: style="font-size: 20px;" -->
```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes"
        )
      ).load()

     crimes.groupBy($"month").agg(expr("count(distinct incident_number)")).show()
```
![](https://i.imgur.com/xJVoe2t.png)

---

#### Example. Extracting additional metadata from ADB. Statistics.

```scala=
    val crimes  = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes"
        )
      ).load()

     crimes.collect()
```
![](https://i.imgur.com/FRaeu3q.png)

---

#### Example. Extracting additional metadata from ADB. Count optimization.
<!-- .slide: style="font-size: 26px;" -->
```scala=
    crimes.filter($"month" === 10)
    .agg(expr("count(incident_number)"))
    .show()
```
![](https://i.imgur.com/aZGwrk9.png)

---

#### Example. Executing arbitrary SQL on the ADB master.
<!-- .slide: style="font-size: 16px;" -->
```scala=
  lazy val spark: SparkSession =
    SparkSession
      .builder()
      .master("spark://10.92.6.7:7077")
      .appName("spark_example")
      .config("spark.adb.url", "jdbc:postgresql://10.92.3.151:5432/spark")
      .config("spark.adb.driver", "org.postgresql.Driver")
      .config("spark.adb.user", "spark")
      .config("spark.adb.password", "Orion123")
      .getOrCreate()

  import io.arenadata.spark.adb.implicits._

  // Run arbitrary SQL on the ADB master; SELECTs come back as DataFrames
  val crimes = spark.executeAdbSelectQueryOnMaster("select * from test_data.crimes;")
  spark.executeAdbQueryOnMaster("create table test_data.test_table_query(id int);")
  spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(1);")
  spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(2);")
  spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(3);")
  val test = spark.executeAdbSelectQueryOnMaster("select * from test_data.test_table_query;")
  crimes.show(10)
  test.show(10)
```
![](https://i.imgur.com/AKn8hLS.png)
![](https://i.imgur.com/5kjCRKg.png)

---

#### Example. Batch mode.
```scala=
    val crimes = spark
      .read
      .format("io.arenadata.spark.adb.spark.AdbDataSource")
      .options(
        Map(
          "url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
          "user" -> "spark",
          "password" -> "Orion123",
          "dbschema" -> "test_data",
          "dbtable" -> "crimes",
          "batch.enable" -> "true",
          "batch.memoryMode" -> "ON_HEAP"
        )
      ).load()

    crimes.show()
```

![](https://i.imgur.com/2SgWhZu.png)

---

#### Example. Environment

* Versions: Spark 2.3, ADB 6.8
![](https://i.imgur.com/zPibNRU.png)

Data: https://www.kaggle.com/AnalyzeBoston/crimes-in-boston


---

#### Possible improvements

* Streaming support?
* More flexible partitioning?

We are waiting for customer requirements!

