---
title: Spark Query DSL
tags: Dataiku, Slides
slideOptions:
theme: league
---
# Spark Query DSL
---
## Some context
Some issues Jean-François & I could help **AP-HP** with:
- Rapidly-changing data definitions
- Need for quick rebuilds…
- …While ensuring data quality
- Amidst accumulated technical debt
Note:
AP-HP was struggling with plenty of issues: short on staff, mostly focused on putting out fires and on supporting the people coming to the rescue.
Other issues were better suited to data science skills; Samuel & Léo, among others, took those on.
---
### Some context (cont'd)
Jean-François on a big project: a graphical tool to visualize and edit the data schema, for collaboration.
Me on several small projects:
- Speed up automated document classification
- Quick YAML quality constraints
- <span><!-- .element: class="fragment highlight-blue" -->Auto-join dataframes</span>
Note:
• From 40 min on a suffocating Postgres to 2 min on Spark
Select a class based on a series of sequential regexes
• Time saver when the underlying data keeps changing
Auto-check Spark dataframes for invariants
• Last bit is the subject of this prez
---
### Use case
```mermaid
graph TD
P[Patient 1]
C[Condition 1]
D[Drug 1]
D2[Drug 2]
P -->|"with ▾"| C
C -->|"without ▾"| D
D -->|"with ▾"| D2
subgraph Patient
PC["birth < 2000-01-01
<br>sex = male
<br>[+ add filter…]"]
P --> PC
end
subgraph Condition
CC["diagnosis = COVID-19
<br>[+ add filter…]"]
C --> CC
end
subgraph Drug1["Drug"]
DC["name = chloroquine
<br>[+ add filter…]"]
D --> DC
end
subgraph Drug2["Drug"]
DC2["[+ add filter…]"]
D2 --> DC2
end
```
Note:
• Select entities to join & how to join them
• Field name conventions for joining
----
### Long term: a UI tool
```mermaid
sequenceDiagram
participant U as UI
participant B as Backend
participant T as Solr Translator
participant S as Spark
participant R as Solr
U->>+B: query
B->>B: determine `fl`
B->>+T: translate Solr subqueries
T-->>-B: subqueries
B->>B: build DAG
B->>S: get results
S->>R: get individual dataframes
R-->>S: dataframes
S->>S: magic
S-->>B: result dataframe
B-->>-U: results
```
---
### Articulation
```scala
val patient = Patient(patient_df, "patient")
val condition = Condition(condition_df, "condition")
val drug1 = Drug(drug1_df, "drug1")
val drug2 = Drug(drug2_df, "drug2")

InnerJoin(
  AntiJoin(
    InnerJoin(
      patient,
      condition
    ),
    drug1
  ),
  drug2
).select(patient, drug2)
```
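For intuition, a minimal sketch of what such query nodes could look like; the `Query` trait, the `Source` node and the `autoJoinCond` helper are illustrative assumptions, not the actual branch code:
```scala
import org.apache.spark.sql.DataFrame

// Sketch only: each node wraps a DataFrame and carries an alias.
trait Query { def df: DataFrame; def alias: String }

case class Source(df: DataFrame, alias: String) extends Query

// Joins combine their children with the matching Spark join type;
// autoJoinCond (hypothetical, sketched on the next slide) derives
// the join condition from the field-name convention.
case class InnerJoin(left: Query, right: Query) extends Query {
  val df    = left.df.join(right.df, autoJoinCond(left, right), "inner")
  val alias = left.alias
}

case class AntiJoin(left: Query, right: Query) extends Query {
  val df    = left.df.join(right.df, autoJoinCond(left, right), "left_anti")
  val alias = left.alias
}
```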
Note:
• This is fine for their need, but…
• …not very wieldy
• …and I was a bit less busy on evenings & weekends, so…
----
### Auto-join convention
- Each (sub)query has an alias
- Joins key off the left side's alias, e.g. `left`
- Left-side key: `id` (configurable per class) or `left_id`
- Right-side key: `left_id`, `left_fk` or `left`
- Overrides are possible; a sketch of the lookup follows
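
A sketch of that lookup, assuming nodes expose their DataFrame and alias as in the previous sketch (the real resolution logic may differ):
```scala
import org.apache.spark.sql.Column

// Hypothetical resolution: with left alias "person", the left key is
// "id" (or a per-class override) or "person_id"; the right key is
// "person_id", "person_fk" or "person".
def autoJoinCond(left: Query, right: Query, idField: String = "id"): Column = {
  val a  = left.alias
  val lk = Seq(idField, s"${a}_$idField").find(left.df.columns.contains)
  val rk = Seq(s"${a}_$idField", s"${a}_fk", a).find(right.df.columns.contains)
  (lk, rk) match {
    case (Some(l), Some(r)) => left.df(l) === right.df(r)
    case _ => sys.error(s"no auto-join key between '$a' and '${right.alias}'")
  }
}
```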
---
### Extension: DSL
```scala
val patient = Patient(patient_df, "patient")
val condition = Condition(condition_df, "condition")
val drug1 = Drug(drug1_df, "drug1")
val drug2 = Drug(drug2_df, "drug2")

(((patient + condition) - drug1) + drug2).select(patient, drug2)

// With added filters
( (patient | patient("age") > 20)
  + (condition | condition("diagnosis") === "COVID-19")
  - (drug1 | drug1("name") === "chloroquine").alias("chloroquine")
  + drug2 )
```
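A plausible desugaring, building on the earlier sketches (`QueryOps` and the `Filtered` node are assumptions for illustration):
```scala
import org.apache.spark.sql.Column

// Assumed desugaring: each symbolic operator builds the matching node;
// Scala's precedence rules then make `|` bind loosest, as used above.
trait QueryOps { self: Query =>
  def apply(col: String): Column = self.df(col)       // patient("age")
  def +(right: Query): Query = InnerJoin(self, right)
  def -(right: Query): Query = AntiJoin(self, right)
  def |(pred: Column): Query = Filtered(self, pred)
}

// Hypothetical filter node: pushes the predicate onto the child frame.
case class Filtered(inner: Query, pred: Column) extends Query {
  val df    = inner.df.filter(pred)
  val alias = inner.alias
}
```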
Note:
• …this being Scala, I made a DSL!
• A tree of operators is something that is well expressed by a DSL. There is one in DSS: QueryBuilder.
• And then it was pretty easy to integrate to DSS
---
## Demo 👨🏻‍💻
----
## DSS Integration
Available in `perso/alavoillotte/spark-query-dsl`
```scala
// Get a query handle per Dataiku dataset (backed by a Spark DataFrame)
val message = dkuContext.getQuery(sqlContext, "message")
val topic = dkuContext.getQuery(sqlContext, "topic")
val person = dkuContext.getQuery(sqlContext, "person")
```
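These handles compose directly with the DSL; assuming `select(...)` returns a plain Spark DataFrame as on the other slides:
```scala
// Join the three datasets and materialize the result.
val posts = (person + topic + message).select(person, message)
posts.show()
```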
----
### Examples: joins
```scala
// inner join: authors & their topics
person + topic
// multiple joins: authors, their topics and their messages
person + topic + message
// anti join: people without any message
person - message
// left outer join: people, possibly with their messages
person % message
// left semi join: people that have messages (select only the person)
person ^ message
// full outer join:
person %% message
// right outer join:
person %> message
// cross join: each combination of 2 people (donut)
person * person
// self-join is tricky, requires an alias
message.alias("parent") + message.on("message_id" -> "parent_id")
```
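The operator-to-Spark mapping these examples suggest (assumed; the actual code may differ):
```scala
// Assumed joinType string passed to DataFrame.join(right, cond, joinType):
val joinTypes = Map(
  "+"  -> "inner",       // inner join
  "-"  -> "left_anti",   // anti join
  "%"  -> "left_outer",  // left outer join
  "^"  -> "left_semi",   // left semi join
  "%%" -> "full_outer",  // full outer join
  "%>" -> "right_outer"  // right outer join
)
// "*" takes no condition: presumably df.crossJoin(other).
```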
----
### Examples: filters
```scala
// Full spark.sql.Column expressivity
// (needs org.apache.spark.sql.functions.when)
person | ( (when(person("name") === "Alex", "full quota").otherwise("ok") === "ok")
           && person("born").isNotNull )
// Chainable filters
person | person("name").endsWith("e")
| person("person_id") % 2 === 1
// Filters on the join, useful when both sides have similar fields
person + topic | person("created") > topic("created")
```
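For the last example, the raw Spark equivalent is roughly this (column names and the derived join condition are assumptions):
```scala
// What the DSL saves you from writing by hand:
person_df
  .join(topic_df, person_df("person_id") === topic_df("person_id"))
  .filter(person_df("created") > topic_df("created"))
```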
----
### Examples: selection
```scala
// topic with an author whose name ends with an e
// but don't care about the author, just used for filtering
(topic + person | person("name").endsWith("e")).select(topic)
// select all with auto-prefix
(topic + person).select() // topic_title, person_name, ...
// select all with no prefix, skip duplicates
(topic + person).select(true) // title, name, ...
```
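A sketch of how the auto-prefix select could work, assuming the join node keeps a handle on each source (illustrative only):
```scala
import org.apache.spark.sql.DataFrame

// Qualify every column with its source alias: topic.title -> topic_title.
def selectPrefixed(joined: DataFrame, sources: Seq[Query]): DataFrame =
  joined.select(sources.flatMap { q =>
    q.df.columns.map(c => q.df(c).as(s"${q.alias}_$c"))
  }: _*)
```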
----
### Examples: group by
```scala
// person with count of distinct messages
(person % message) / person
// person with counts of distinct messages & topics (both mandatory: inner joins)
(person + message + topic) / person
```
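In raw Spark, the first example presumably expands to something like this (grouping keys and counted column are assumptions):
```scala
import org.apache.spark.sql.functions.countDistinct

// person with count of distinct messages, written by hand:
person_df
  .join(message_df, person_df("person_id") === message_df("person_id"), "left_outer")
  .groupBy(person_df("person_id"))
  .agg(countDistinct(message_df("message_id")).as("message_count"))
```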
----
### Examples: combinations
```scala
// union of untitled topic authors (left semi join)
// and people who have posted no messages
(person ^ (topic | topic("title").isNull))
& (person - message)
```
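Presumably `&` unions the two sides' frames; a raw Spark sketch with hypothetical frames (note Spark's `union` keeps duplicates, hence `distinct`):
```scala
// Assumed desugaring of `&`: a set union of two same-schema frames.
val eitherWay = untitled_authors_df.union(no_message_people_df).distinct()
```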
---
## Thanks!
- Any questions?
- If you want to check out the branch: `perso/alavoillotte/spark-query-dsl`