ADB Spark Connector === <style> .reveal { font-size: 32px; } </style> #### Функционал коннектора <!-- .slide: style="font-size: 20px;" --> * чтение данных из ADB * запись данных в ADB с помощью различных режимов записи * overwrite * append * errorIfExists * поддержка структурированных данных * автоматическое формирование схемы данных * настраиваемое партиционирование * целочисленные типы * даты * push-down операторов: * отсекание колонок * push-down фильтров * извлечение дополнительных метаданных из ADB * схема распределения данных * статистика * оптимизация count-ов * выполнение произвольного sql через master ADB * батч режим --- #### Компонентная архитектура ![](https://i.imgur.com/yBWk1Mm.png =800x) --- #### Диаграмма чтения данных ![](https://i.imgur.com/IXD2mIP.png =700x) --- #### Диаграмма записи данных ![](https://i.imgur.com/uDHVKiY.png =700x) --- #### Поддерживаемые типы данных <!-- .slide: style="font-size: 16px;" --> ### ADB to Spark | ADB Data Type| Spark Data Type | | -------- | -------- | | bigint | LongType | | bigSerial | LongType | | bit | StringType | | bytea | BinaryType | | boolean | BooleanType | | char | StringType | | date | DateType | | decimal | DecimalType | | float4 | FloatType | | float8 | DoubleType | | int | IntegerType | | interval | CalendarIntervalType | | serial | IntegerType | | smallInt | ShortType | | text | StringType | | time | TimeStampType | | timestamp | TimeStampType | | timestamptz | TimeStampType | | timetz | TimeStampType | | varchar | StringType | --- #### Поддерживаемые типы данных <!-- .slide: style="font-size: 16px;" --> ### Spark to ADB | Spark Data Type | ADB Data Type | | --------------- | -------- | | BinaryType | bytea | | BooleanType | boolean | | CalendarIntervalType | interval | | DateType | date | | DecimalType | numeric | | DoubleType | float8 | | FloatType | float4 | | IntegerType | int | | LongType | bigint | | ShortType | smallInt | | StringType | text | | TimeStampType | timestamp | --- #### Вставка данных. Описание режимов работы. Дополнительные опции. <!-- .slide: style="font-size: 20px;" --> ##### overwrite Режим перезаписи таблицы ##### Вспомогательные опции: * adb.create.table.with - with выражение в команде create table ADB * adb.create.table.distributedby - distributed by выражение в команде create table ADB * truncate - если true, то просто производим транкейт таблицы </br> ##### append Режим append-а данных в уже существующую таблицу ##### errorIfExists Завершаем работу с ошибкой, если таблица уже существует, в остальном как overwrite --- #### Примеры кода Пример инициализации загрузки данных из ADB в Spark ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map(// JDBC ADB Master "url" -> "jdbc:postgresql://10.92.6.137:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes" ) ).load() .withColumn("lat",$"lat".cast(DecimalType(38,18))) .withColumn("long",$"long".cast(DecimalType(38,18))) .as[Crime] ``` --- #### Примеры кода * Пример инициализации загрузки данных из Spark в ADB ```scala= finalResult // Существующий DataFrame .write .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map(// JDBC ADB Master "url" -> "jdbc:postgresql://10.92.6.137:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes_final_result" )) .mode(SaveMode.Overwrite) .save() ``` --- #### Пример. Автоматическое формирование схемы. <!-- .slide: style="font-size: 12px;" --> ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes" ) ).load() crimes.printSchema() ``` ![](https://i.imgur.com/V4wa6rG.png) ![](https://i.imgur.com/OyEV4Ce.png) --- #### Пример. Партиционирование. Целочисленные типы. <!-- .slide: style="font-size: 16px;" --> ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes", "partition.column" -> "offense_code", "partition.count" -> "8" ) ).load() .withColumn("lat",$"lat".cast(DecimalType(38,18))) .withColumn("long",$"long".cast(DecimalType(38,18))) .as[Crime] crimes.show(10) ``` 21/01/25 08:01:58 INFO AdbPartitioner: Number of partitions: 8, WHERE clauses of these partitions: "offense_code" < 576 or "offense_code" is null, "offense_code" >= 576 AND "offense_code" < 1041, "offense_code" >= 1041 AND "offense_code" < 1506, "offense_code" >= 1506 AND "offense_code" < 1971, "offense_code" >= 1971 AND "offense_code" < 2436, "offense_code" >= 2436 AND "offense_code" < 2901, "offense_code" >= 2901 AND "offense_code" < 3366, "offense_code" >= 3366 ![](https://i.imgur.com/EKkjqPu.png) --- #### Пример. Партиционирование. Даты. <!-- .slide: style="font-size: 16px;" --> ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes", "partition.column" -> "occured_on_date", "partition.count" -> "8" ) ).load() .withColumn("lat",$"lat".cast(DecimalType(38,18))) .withColumn("long",$"long".cast(DecimalType(38,18))) .as[Crime] crimes.show(10) ``` 21/01/25 08:10:39 INFO AdbPartitioner: Number of partitions: 8, WHERE clauses of these partitions: "occurred_on_date" < '2015-11-09 02:40:37.5' or "occurred_on_date" is null, "occurred_on_date" >= '2015-11-09 02:40:37.5' AND "occurred_on_date" < '2016-04-04 05:21:15', "occurred_on_date" >= '2016-04-04 05:21:15' AND "occurred_on_date" < '2016-08-29 08:01:52.5', "occurred_on_date" >= '2016-08-29 08:01:52.5' AND "occurred_on_date" < '2017-01-23 10:42:30', "occurred_on_date" >= '2017-01-23 10:42:30' AND "occurred_on_date" < '2017-06-19 13:23:07.5', "occurred_on_date" >= '2017-06-19 13:23:07.5' AND "occurred_on_date" < '2017-11-13 16:03:45', "occurred_on_date" >= '2017-11-13 16:03:45' AND "occurred_on_date" < '2018-04-09 18:44:22.5', "occurred_on_date" >= '2018-04-09 18:44:22.5' ![](https://i.imgur.com/f5Ijv2g.png) --- #### Пример. push-down операторов. Отсекание колонок. ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes" ) ).load() crimes.select($"incident_number").collect() ``` ![](https://i.imgur.com/iOf2n8K.png) --- #### Пример. push-down фильтров. ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes" ) ).load() crimes.filter(expr("incident_number = 'I182070906'")).show() ``` ![](https://i.imgur.com/IUR07nj.png) ![](https://i.imgur.com/PJaIWWB.png) --- #### Пример. Извлечение дополнительных метаданных из ADB. Схема распределения. <!-- .slide: style="font-size: 20px;" --> ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes" ) ).load() crimes.groupBy($"month").agg(expr("count(distinct incident_number)")).show() ``` ![](https://i.imgur.com/xJVoe2t.png) --- #### Пример. Извлечение дополнительных метаданных из ADB. Статистика. ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes" ) ).load() crimes.collect() ``` ![](https://i.imgur.com/FRaeu3q.png) --- #### Пример. Извлечение дополнительных метаданных из ADB. Оптимизация count-ов. <!-- .slide: style="font-size: 26px;" --> ```scala= crimes.filter($"month" === 10) .agg(expr("count(incident_number)")) .show() ``` ![](https://i.imgur.com/aZGwrk9.png) --- #### Пример. Выполнение произвольного sql через master ADB. <!-- .slide: style="font-size: 16px;" --> ```scala= lazy val spark: SparkSession = { SparkSession .builder() .master("spark://10.92.6.7:7077") .appName("spark_example") .config("spark.adb.url","jdbc:postgresql://10.92.3.151:5432/spark") .config("spark.adb.driver","org.postgresql.Driver") .config("spark.adb.user","spark") .config("spark.adb.password","Orion123") .getOrCreate() import io.arenadata.spark.adb.implicits val crimes = spark.executeAdbSelectQueryOnMaster("select * from test_data.crimes;") spark.executeAdbQueryOnMaster("create table test_data.test_table_query(id int);") spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(1);") spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(2);") spark.executeAdbQueryOnMaster("insert into test_data.test_table_query values(3);") val test = spark.executeAdbSelectQueryOnMaster("select * from test_data.test_table_query;") crimes.show(10) test.show(10) ``` ![](https://i.imgur.com/AKn8hLS.png) ![](https://i.imgur.com/5kjCRKg.png) --- #### Пример. Batch режим. ```scala= val crimes = spark .read .format("io.arenadata.spark.adb.spark.AdbDataSource") .options( Map( "url" -> "jdbc:postgresql://10.92.3.151:5432/spark", "user" -> "spark", "password" -> "Orion123", "dbschema" -> "test_data", "dbtable" -> "crimes", "batch.enable" -> "true", "batch.memoryMode" -> "ON_HEAP" ) ).load() crimes.show() ``` ![](https://i.imgur.com/2SgWhZu.png) --- #### Пример. Схема * Версия Spark 2.3, ADB 6.8 ![](https://i.imgur.com/zPibNRU.png) Данные: https://www.kaggle.com/AnalyzeBoston/crimes-in-boston --- #### Возможные улучшения * Поддержка стриминга? * Более гибкое партиционирование? Ждем требований от заказчиков!
{"metaMigratedAt":"2023-06-15T14:33:32.208Z","metaMigratedFrom":"Content","title":"ADB Spark Connector","breaks":true,"contributors":"[{\"id\":\"0eed3978-94a3-41ab-9e71-1ce3fda54d8b\",\"add\":1354,\"del\":666},{\"id\":\"3a595fc4-ac64-4bea-8991-096fed009c12\",\"add\":24168,\"del\":13430}]"}
    735 views
   Owned this note