# Spark Query DSL

---

## Some context

Some issues Jean-François & I could help **AP-HP** with:

- Rapidly changing data definitions
- Need for quick rebuilds…
- …While ensuring data quality
- Amidst accumulated technical debt

Note:
AP-HP was struggling with plenty of issues, short on staff, mostly focused on putting out fires and providing for people coming to the rescue. Other issues were better suited to people with Data Science skills, including Samuel & Léo.

---

### Some context (cont'd)

Jean-François on a big project: a graphical tool to visualize and edit the data schema, for collaboration.

Me on several small projects:

- Speed up automated document classification
- Quick YAML quality constraints
- <span><!-- .element: class="fragment highlight-blue" -->Auto-join dataframes</span>

Note:
• From 40 min on a suffocating Postgres to 2 min on Spark
  Select a class based on a series of sequential regexes
• Time saver when the underlying data keeps changing
  Auto-check Spark dataframes for invariants
• Last bit is the subject of this prez

---

### Use case

```mermaid
graph TD
  P[Patient 1]
  C[Condition 1]
  D[Drug 1]
  D2[Drug 2]
  P -->|"with ▾"| C
  C -->|"without ▾"| D
  D -->|"with ▾"| D2
  subgraph Patient
    PC["birth < 2000-01-01 <br>sex = male <br>[+ add filter…]"]
    P --> PC
  end
  subgraph Condition
    CC["diagnosis = COVID-19 <br>[+ add filter…]"]
    C --> CC
  end
  subgraph Drug
    DC["name = chloroquine <br>[+ add filter…]"]
    D --> DC
  end
  subgraph Drug
    DC2["[+ add filter…]"]
    D2 --> DC2
  end
```

Note:
• Select entities to join & how to join them
• Field name conventions for joining

----

### Long term: a UI tool

```mermaid
sequenceDiagram
  participant U as UI
  participant B as Backend
  participant T as Solr Translator
  participant S as Spark
  participant R as Solr
  U->>+B: query
  B->>B: determine `fl`
  B->>+T: translate Solr subqueries
  T-->>-B: subqueries
  B->>B: build DAG
  B->>S: get results
  S->>R: get individual dataframes
  R-->>S: dataframes
  S->>S: magic
  S-->>B: result dataframe
  B-->>-U: results
```

---

### Articulation

```scala
val patient = Patient(patient_df, "patient")
val condition = Condition(condition_df, "condition")
val drug1 = Drug(drug1_df, "drug1")
val drug2 = Drug(drug2_df, "drug2")

InnerJoin(
  AntiJoin(
    InnerJoin(
      patient,
      condition
    ),
    drug1
  ),
  drug2
).select(patient, drug2)
```

Note:
• This is fine for their need, but…
• …rather unwieldy
• …and I had some less busy evenings & weekends, so…

----

### Auto-join convention

- Each (sub)query has an alias
- Can join on left alias `left`
  - If `left` has `id` (configurable per class), or `left_id`
  - If `right` has `left_id`, `left_fk` or `left`
- Override possible

---

### Extension: DSL

```scala
val patient = Patient(patient_df, "patient")
val condition = Condition(condition_df, "condition")
val drug1 = Drug(drug1_df, "drug1")
val drug2 = Drug(drug2_df, "drug2")

(((patient + condition) - drug1) + drug2).select(patient, drug2)

// With added filters
(patient | patient("age") > 20) +
  (condition | condition("diagnosis") === "COVID-19") -
  (drug1 | drug1("name") === "chloroquine").alias("chloroquine") +
  drug2
```

Note:
• …this being Scala, I made a DSL!
• A tree of operators is something that is well expressed by a DSL. There is one in DSS: QueryBuilder.
• And then it was pretty easy to integrate into DSS

---

## Demo 👨🏻‍💻

----

## DSS Integration

Available in `perso/alavoillotte/spark-query-dsl`

```scala
// Example: Load a Dataiku dataset into a Spark DataFrame
val message = dkuContext.getQuery(sqlContext, "message")
val topic = dkuContext.getQuery(sqlContext, "topic")
val person = dkuContext.getQuery(sqlContext, "person")
```

----

### Examples: joins

```scala
// inner join: authors & their topics
person + topic

// multiple joins: authors, their topics and their messages
person + topic + message

// anti join: people without any message
person - message

// left outer join: people, possibly with their messages
person % message

// left semi join: people that have messages (select only the person)
person ^ message

// full outer join:
person %% message

// right outer join:
person %> message

// cross join: each combination of 2 people (donut)
person * person

// self-join is tricky, requires alias
message.alias("parent") + message.on("message_id" -> "parent_id")
```

----

### Examples: filters

```scala
// Full spark.sql.Column expressivity
person | (
  person("name").when(lit("Alex"), "full quota").otherwise("ok") &&
  person("born").isNotNull
)

// Chainable filters
person | person("name").endsWith("e") | person("person_id") % 2 === 1

// Filters on join, useful for similar fields
person + topic | person("created") > topic("created")
```

----

### Examples: selection

```scala
// topic with an author whose name finishes with an e
// but don't care about the author, just used for filtering
(topic + person | person("name").endsWith("e")).select(topic)

// select all with auto-prefix
(topic + person).select() // topic_title, person_name, ...

// select all with no prefix, skip duplicates
(topic + person).select(true) // title, name, ...
```

----

### Examples: group by

```scala
// person with count of distinct messages
(person % message) / person

// person with count of mandatory distinct messages & topics
(person + message + topic) / person
```

----

### Examples: combinations

```scala
// union of untitled topic authors (left semi join)
// and people who have posted no messages
(person ^ (topic | topic("title").isNull)) & (person - message)
```

---

## Thanks!

- Any questions?
- If you want to check out the branch: `perso/alavoillotte/spark-query-dsl`
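
----

### Appendix: how the operators could work

A minimal, Spark-free sketch of the two ideas from the earlier slides: operators that build a join tree, and the auto-join column lookup. It is not the code from the branch. Every name in it (`Query`, `Table`, `AutoJoin.keys`) is hypothetical, and the key lookup encodes just one reading of the convention: `left` exposes `id` or `<alias>_id`, and `right` references it as `<alias>_id`, `<alias>_fk` or `<alias>`.

```scala
// Hypothetical names throughout: an illustration, not the branch's code.
sealed trait Query {
  def +(right: Query): Query = InnerJoin(this, right) // inner join
  def -(right: Query): Query = AntiJoin(this, right)  // anti join
}

// A leaf: an aliased table, described here only by its column names.
final case class Table(alias: String, columns: Set[String]) extends Query
final case class InnerJoin(left: Query, right: Query) extends Query
final case class AntiJoin(left: Query, right: Query) extends Query

object AutoJoin {
  // Assumed reading of the convention. Returns the pair
  // (column on left, column on right) when both sides match, else None.
  def keys(left: Table, right: Table): Option[(String, String)] = {
    val l = left.alias
    for {
      lk <- Seq("id", s"${l}_id").find(left.columns.contains)
      rk <- Seq(s"${l}_id", s"${l}_fk", l).find(right.columns.contains)
    } yield (lk, rk)
  }
}

object Demo {
  val patient   = Table("patient", Set("id", "birth", "sex"))
  val condition = Table("condition", Set("id", "patient_id", "diagnosis"))
  val drug1     = Table("drug1", Set("id", "patient_id", "name"))

  // `+` and `-` share the same precedence and associate to the left,
  // so this builds AntiJoin(InnerJoin(patient, condition), drug1).
  val tree: Query = patient + condition - drug1
}
```

Because Scala derives operator precedence from the first character, `|` binds more loosely than `+` or `-`, which is why a filter written after a join applies to the whole join unless parenthesised.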