ADB Spark Connector
===
<style>
.reveal {
font-size: 32px;
}
</style>
#### Функционал коннектора
<!-- .slide: style="font-size: 20px;" -->
* чтение данных из ADB
* запись данных в ADB с помощью различных режимов записи
* overwrite
* append
* errorIfExists
* поддержка структурированных данных
* автоматическое формирование схемы данных
* настраиваемое партиционирование
* целочисленные типы
* даты
* push-down операторов:
* отсекание колонок
* push-down фильтров
* извлечение дополнительных метаданных из ADB
* схема распределения данных
* статистика
* оптимизация count-ов
* выполнение произвольного sql через master ADB
* батч режим
---
#### Компонентная архитектура

---
#### Диаграмма чтения данных

---
#### Диаграмма записи данных

---
#### Поддерживаемые типы данных
<!-- .slide: style="font-size: 16px;" -->
### ADB to Spark
| ADB Data Type| Spark Data Type |
| -------- | -------- |
| bigint | LongType |
| bigSerial | LongType |
| bit | StringType |
| bytea | BinaryType |
| boolean | BooleanType |
| char | StringType |
| date | DateType |
| decimal | DecimalType |
| float4 | FloatType |
| float8 | DoubleType |
| int | IntegerType |
| interval | CalendarIntervalType |
| serial | IntegerType |
| smallInt | ShortType |
| text | StringType |
| time | TimeStampType |
| timestamp | TimeStampType |
| timestamptz | TimeStampType |
| timetz | TimeStampType |
| varchar | StringType |
---
#### Поддерживаемые типы данных
<!-- .slide: style="font-size: 16px;" -->
### Spark to ADB
| Spark Data Type | ADB Data Type |
| --------------- | -------- |
| BinaryType | bytea |
| BooleanType | boolean |
| CalendarIntervalType | interval |
| DateType | date |
| DecimalType | numeric |
| DoubleType | float8 |
| FloatType | float4 |
| IntegerType | int |
| LongType | bigint |
| ShortType | smallInt |
| StringType | text |
| TimeStampType | timestamp |
---
#### Вставка данных. Описание режимов работы. Дополнительные опции.
<!-- .slide: style="font-size: 20px;" -->
##### overwrite
Режим перезаписи таблицы
##### Вспомогательные опции:
* adb.create.table.with - with выражение в команде create table ADB
* adb.create.table.distributedby - distributed by выражение в команде create table ADB
* truncate - если true, то просто производим транкейт таблицы
</br>
##### append
Режим append-а данных в уже существующую таблицу
##### errorIfExists
Завершаем работу с ошибкой, если таблица уже существует, в остальном как overwrite
---
#### Примеры кода
Пример инициализации загрузки данных из ADB в Spark
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(// JDBC ADB Master
"url" -> "jdbc:postgresql://10.92.6.137:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes"
)
).load()
.withColumn("lat",$"lat".cast(DecimalType(38,18)))
.withColumn("long",$"long".cast(DecimalType(38,18)))
.as[Crime]
```
---
#### Примеры кода
* Пример инициализации загрузки данных из Spark в ADB
```scala=
finalResult // Существующий DataFrame
.write
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(// JDBC ADB Master
"url" -> "jdbc:postgresql://10.92.6.137:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes_final_result"
))
.mode(SaveMode.Overwrite)
.save()
```
---
#### Пример. Автоматическое формирование схемы.
<!-- .slide: style="font-size: 12px;" -->
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes"
)
).load()
crimes.printSchema()
```


---
#### Пример. Партиционирование. Целочисленные типы.
<!-- .slide: style="font-size: 16px;" -->
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes",
"partition.column" -> "offense_code",
"partition.count" -> "8"
)
).load()
.withColumn("lat",$"lat".cast(DecimalType(38,18)))
.withColumn("long",$"long".cast(DecimalType(38,18)))
.as[Crime]
crimes.show(10)
```
21/01/25 08:01:58 INFO AdbPartitioner: Number of partitions: 8, WHERE clauses of these partitions: "offense_code" < 576 or "offense_code" is null, "offense_code" >= 576 AND "offense_code" < 1041, "offense_code" >= 1041 AND "offense_code" < 1506, "offense_code" >= 1506 AND "offense_code" < 1971, "offense_code" >= 1971 AND "offense_code" < 2436, "offense_code" >= 2436 AND "offense_code" < 2901, "offense_code" >= 2901 AND "offense_code" < 3366, "offense_code" >= 3366

---
#### Пример. Партиционирование. Даты.
<!-- .slide: style="font-size: 16px;" -->
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes",
"partition.column" -> "occured_on_date",
"partition.count" -> "8"
)
).load()
.withColumn("lat",$"lat".cast(DecimalType(38,18)))
.withColumn("long",$"long".cast(DecimalType(38,18)))
.as[Crime]
crimes.show(10)
```
21/01/25 08:10:39 INFO AdbPartitioner: Number of partitions: 8, WHERE clauses of these partitions: "occurred_on_date" < '2015-11-09 02:40:37.5' or "occurred_on_date" is null, "occurred_on_date" >= '2015-11-09 02:40:37.5' AND "occurred_on_date" < '2016-04-04 05:21:15', "occurred_on_date" >= '2016-04-04 05:21:15' AND "occurred_on_date" < '2016-08-29 08:01:52.5', "occurred_on_date" >= '2016-08-29 08:01:52.5' AND "occurred_on_date" < '2017-01-23 10:42:30', "occurred_on_date" >= '2017-01-23 10:42:30' AND "occurred_on_date" < '2017-06-19 13:23:07.5', "occurred_on_date" >= '2017-06-19 13:23:07.5' AND "occurred_on_date" < '2017-11-13 16:03:45', "occurred_on_date" >= '2017-11-13 16:03:45' AND "occurred_on_date" < '2018-04-09 18:44:22.5', "occurred_on_date" >= '2018-04-09 18:44:22.5'

---
#### Пример. push-down операторов. Отсекание колонок.
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes"
)
).load()
crimes.select($"incident_number").collect()
```

---
#### Пример. push-down фильтров.
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes"
)
).load()
crimes.filter(expr("incident_number = 'I182070906'")).show()
```


---
#### Пример. Извлечение дополнительных метаданных из ADB. Схема распределения.
<!-- .slide: style="font-size: 20px;" -->
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes"
)
).load()
crimes.groupBy($"month").agg(expr("count(distinct incident_number)")).show()
```

---
#### Пример. Извлечение дополнительных метаданных из ADB. Статистика.
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes"
)
).load()
crimes.collect()
```

---
#### Пример. Извлечение дополнительных метаданных из ADB. Оптимизация count-ов.
<!-- .slide: style="font-size: 26px;" -->
```scala=
crimes.filter($"month" === 10)
.agg(expr("count(incident_number)"))
.show()
```

---
#### Пример. Выполнение произвольного sql через master ADB.
<!-- .slide: style="font-size: 16px;" -->
```scala=
lazy val spark: SparkSession = {
SparkSession
.builder()
.master("spark://10.92.6.7:7077")
.appName("spark_example")
.config("spark.adb.url","jdbc:postgresql://10.92.3.151:5432/spark")
.config("spark.adb.driver","org.postgresql.Driver")
.config("spark.adb.user","spark")
.config("spark.adb.password","Orion123")
.getOrCreate()
import io.arenadata.spark.adb.implicits
val crimes = spark.executeAdbSelectQueryOnMaster("select * from test_data.crimes;")
spark.executeAdbQueryOnMaster("create table test_data.test_table_query(id int);")
spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(1);")
spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(2);")
spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(3);")
val test = spark.executeAdbSelectQueryOnMaster("select * from test_data.test_table_query;")
crimes.show(10)
test.show(10)
```


---
#### Пример. Batch режим.
```scala=
val crimes = spark
.read
.format("io.arenadata.spark.adb.spark.AdbDataSource")
.options(
Map(
"url" -> "jdbc:postgresql://10.92.3.151:5432/spark",
"user" -> "spark",
"password" -> "Orion123",
"dbschema" -> "test_data",
"dbtable" -> "crimes",
"batch.enable" -> "true",
"batch.memoryMode" -> "ON_HEAP"
)
).load()
crimes.show()
```

---
#### Пример. Схема
* Версия Spark 2.3, ADB 6.8

Данные: https://www.kaggle.com/AnalyzeBoston/crimes-in-boston
---
#### Возможные улучшения
* Поддержка стриминга?
* Более гибкое партиционирование?
Ждем требований от заказчиков!
{"metaMigratedAt":"2023-06-15T14:33:32.208Z","metaMigratedFrom":"Content","title":"ADB Spark Connector","breaks":true,"contributors":"[{\"id\":\"0eed3978-94a3-41ab-9e71-1ce3fda54d8b\",\"add\":1354,\"del\":666},{\"id\":\"3a595fc4-ac64-4bea-8991-096fed009c12\",\"add\":24168,\"del\":13430}]"}