Spark Query DSL


Some context

Some issues Jean-François & I could help AP-HP with:

  • Rapidly-changing data definitions
  • Need for quick rebuilds…
  • …While ensuring data quality
  • Amidst accumulated technical debt

Note:
AP-HP was struggling with plenty of issues: short on staff, mostly focused on putting out fires and supporting the people coming to the rescue.
Other issues were better suited to Data Science skills, and were covered by Samuel & Léo.


Some context (cont'd)

Jean-François on a big project: a graphical tool to visualize and edit the data schema, for collaboration.

Me on several small projects:

  • Speed up automated document classification
  • Quick YAML quality constraints
  • Auto-join dataframes

Note:
• From 40' on a suffocating Postgres to 2' on Spark
  Select a class based on a series of sequential regexes
• Time saver when the underlying data keeps changing
  Auto-check Spark dataframes for invariants
• Last bit is the subject of this prez


Use case

graph TD
P[Patient 1]
C[Condition 1]
D[Drug 1]
D2[Drug 2]
P -->|"with ▾"| C
C -->|"without ▾"| D
D -->|"with ▾"| D2

subgraph Patient
PC["birth < 2000-01-01
<br>sex = male
<br>[+ add filter…]"]
P --> PC
end

subgraph Condition
CC["diagnosis = COVID-19
<br>[+ add filter…]"]
C --> CC
end

subgraph Drug
DC["name = chloroquine
<br>[+ add filter…]"]
D --> DC
end

subgraph Drug2[Drug]
DC2["[+ add filter…]"]
D2 --> DC2
end

Note:
• Select entities to join & how to join them
• Field name conventions for joining


Long term: a UI tool

sequenceDiagram
    participant U as UI
    participant B as Backend
    participant T as Solr Translator
    participant S as Spark
    participant R as Solr

    U->>+B: query

    B->>B: determine `fl`

    B->>+T: translate solr subqueries
    T-->>-B: subqueries

    B->>B: build DAG

    B->>S: get results
    S->>R: get individual dataframes
    R-->>S: dataframes
    S->>S: magic
    S-->>B: result dataframe

    B-->>-U: results

Articulation

val patient = Patient(patient_df, "patient")
val condition = Condition(condition_df, "condition")
val drug1 = Drug(drug1_df, "drug1")
val drug2 = Drug(drug2_df, "drug2")

// patients with the condition but without drug1, joined with drug2;
// keep only the patient & drug2 columns
InnerJoin(
  AntiJoin(
    InnerJoin(
      patient,
      condition
    ),
    drug1
  ),
  drug2
).select(patient, drug2)

Note:
• This is fine for their need, but…
• …not very wieldy
• …and I had some less busy evenings & weekends, so…


Auto-join convention

  • Each (sub)query has an alias
  • Joining is automatic for a left (sub)query aliased left:
    • if left has id (configurable per class) or left_id
    • and right has left_id, left_fk or left
  • Override possible with explicit keys (sketch below)
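
A minimal sketch of the convention in action; the schemas and the subject_id override column here are hypothetical:

// Hypothetical schemas:
//   patient_df:   patient_id, birth, sex
//   condition_df: condition_id, patient_id, diagnosis
val patient = Patient(patient_df, "patient")         // alias "patient"
val condition = Condition(condition_df, "condition") // alias "condition"

// Auto-join: the left alias is "patient" and patient_df has patient_id;
// condition_df also has patient_id, so the join key is inferred
InnerJoin(patient, condition)

// Explicit override when the convention doesn't fit
InnerJoin(patient, condition.on("patient_id" -> "subject_id"))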

Extension: DSL

val patient = Patient(patient_df, "patient")
val condition = Condition(condition_df, "condition")
val drug1 = Drug(drug1_df, "drug1")
val drug2 = Drug(drug2_df, "drug2")

(((patient + condition) - drug1) + drug2).select(patient, drug2)

// With added filters
( (patient | patient("age") > 20)
  + (condition | condition("diagnosis") === "COVID-19")
  - (drug1 | drug1("name") === "chloroquine").alias("chloroquine")
  + drug2 )
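
A minimal sketch of how the operators could desugar into the join tree from the Articulation slide (not the actual implementation):

// Sketch only: each operator wraps its operands
// in the matching join node
trait Query {
  def +(right: Query): Query = InnerJoin(this, right) // inner join
  def -(right: Query): Query = AntiJoin(this, right)  // anti join
}
case class InnerJoin(left: Query, right: Query) extends Query
case class AntiJoin(left: Query, right: Query) extends Query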

Note:
• …this being Scala, I made a DSL!
• A tree of operators is something that is well expressed by a DSL. There is one in DSS: QueryBuilder.
• And then it was pretty easy to integrate into DSS


Demo 👨🏻‍💻


DSS Integration

Available in perso/alavoillotte/spark-query-dsl

// Load Dataiku datasets as queries backed by Spark DataFrames
val message = dkuContext.getQuery(sqlContext, "message")
val topic = dkuContext.getQuery(sqlContext, "topic")
val person = dkuContext.getQuery(sqlContext, "person")

Examples: joins

// inner join: authors & their topics
person + topic
// multiple joins: authors, their topics and their messages
person + topic + message
// anti join: people without any message
person - message
// left outer join: people, possibly with their messages
person % message
// left semi join: people that have messages (select only the person)
person ^ message
// full outer join:
person %% message
// right outer join:
person %> message
// cross join: each combination of 2 people (donut)
person * person
// self-join is tricky, requires an alias
message.alias("parent") + message.on("message_id" -> "parent_id")
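
For intuition, rough plain-Spark equivalents of three of these operators, assuming the convention inferred person_id as the join key (dataframe names hypothetical):

// person + topic
person_df.join(topic_df, Seq("person_id"), "inner")
// person - message
person_df.join(message_df, Seq("person_id"), "left_anti")
// person ^ message
person_df.join(message_df, Seq("person_id"), "left_semi")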

Examples: filters

// Full spark.sql.Column expressivity (here functions.when)
person | ( (when(person("name") === "Alex", "full quota")
              .otherwise("ok") === "ok")
        && person("born").isNotNull )

// Chainable filters
( person | person("name").endsWith("e")
         | person("person_id") % 2 === 1 )

// Filters on the join, useful to compare fields present on both sides
person + topic | person("created") > topic("created")
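
Under the hood these are plain filter calls; a rough equivalent of the chained filters above (dataframe name hypothetical):

import org.apache.spark.sql.functions.col

person_df
  .filter(col("name").endsWith("e"))
  .filter(col("person_id") % 2 === 1)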

Examples: selection

// topic with an author whose name finishes with an e
// but don't care about the author, just used for filtering
(topic + person | person("name").endsWith("e")).select(topic)

// select all with auto-prefix
(topic + person).select()  // topic_title, person_name, ...

// select all with no prefix, skip duplicates
(topic + person).select(true)  // title, name, ...

Examples: group by

// person with count of distinct messages
(person % message) / person

// person with counts of distinct messages & topics (inner joins: both mandatory)
(person + message + topic) / person
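
A rough plain-Spark sketch of what (person % message) / person amounts to, with hypothetical key and column names:

import org.apache.spark.sql.functions.countDistinct

person_df
  .join(message_df, Seq("person_id"), "left_outer")
  .groupBy("person_id", "name")
  .agg(countDistinct("message_id").as("message_count"))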

Examples: combinations

// union of untitled topic authors (left semi join)
// and people who have posted no messages
( (person ^ (topic | topic("title").isNull))
  & (person - message) )

Thanks!

  • Any questions?
  • If you want to check out the branch: perso/alavoillotte/spark-query-dsl