Data Team Style Guide
===
Table of contents
---
- [Infrastructure](/2aWCJoPJQ4qbLa8PLU16hg#1-Infrastructure)
- [Architecture](/2aWCJoPJQ4qbLa8PLU16hg#11-Architecture)
- [Data Flow](/2aWCJoPJQ4qbLa8PLU16hg#12-DataFlow)
- [Data Engineering Guide](/2aWCJoPJQ4qbLa8PLU16hg#2-Data-Engineering-Guide)
- [Project Structure](/2aWCJoPJQ4qbLa8PLU16hg#21-Project-Structure)
- [Git Flow](/2aWCJoPJQ4qbLa8PLU16hg#22-Git-flow)
- [Data Design](/2aWCJoPJQ4qbLa8PLU16hg#23-Data-design)
- [Coding Structure and Conventions](/2aWCJoPJQ4qbLa8PLU16hg#24-Coding-structure-and-conventions)
- [Pyspark Style Guide](/2aWCJoPJQ4qbLa8PLU16hg#25-Pyspark-Style-Guide)
- [References](/2aWCJoPJQ4qbLa8PLU16hg#3-References)
**1. Infrastructure**
---
### **1.1 Architecture**

**Data Sources:**
- Includes all operational data, such as product, team, ...
- Includes multiple data formats: SQL, NoSQL, CSV, JSON, Google Sheets, ...
**Staging:**
- Contains a 1-1, row-level clone of all operational data plus database metadata, snapshotted daily, hourly, weekly, ....
- Stored on AWS S3 in `avro` format, compressed with `snappy`.
**ETL:**
- Contains all ETL jobs.
- Based on AWS Glue 2.0 (Spark 2.4).
**Mart:**
- Contains data that has been cleaned, transformed, and aggregated.
- Published on AWS RDS.
### **1.2 DataFlow**
**Ingestion:**
- Use [StreamSets](http://streamsets.data.tripi.vn/) to ingest data into the Staging area (S3). See the [Template pipeline](https://) and [StreamSets Naming Conventions](https://).
**ETL:**
- Clean and transform data from staging into dim, fact, and cube tables.
- Use [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python.html).
**Publish:**
- Publish mart data to AWS RDS.
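A rough sketch of what an ETL + publish step looks like on this stack, using plain Spark APIs. It assumes the `avro` reader is available (as on Glue 2.0 / Spark 2.4), and all paths, table names, and connection settings below are hypothetical:
```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('example_etl').getOrCreate()

# Read snappy-compressed Avro from the staging area (hypothetical path)
staging_df = spark.read.format('avro').load('s3://staging-bucket/example_table/')

# Transform into a mart table (hypothetical logic)
mart_df = staging_df.filter(F.col('status').isNotNull())

# Publish to AWS RDS over JDBC (hypothetical endpoint and credentials)
(
    mart_df.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://example-rds-endpoint:5432/mart')
    .option('dbtable', 'mart.example_table')
    .option('user', 'etl_user')
    .option('password', 'etl_password')
    .mode('overwrite')
    .save()
)
```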
**2. Data Engineering Guide**
---
### **2.1 Project Structure**
For more detail, see the project [README](https://bitbucket.org/chodulich/warehouse/src/master/Readme.md).
### **2.2 Git flow**
- Check out a branch from master.
- Develop new features: dim, fact, cube, mapping…
- Resolve conflicts and create a PR to master.
- Review and approve.
- Merge.
### **2.3 Data design**
#### **2.3.1 Schema**
Follow the ***[dimensional database](https://www.ibm.com/support/knowledgecenter/en/SSGU8G_12.1.0/com.ibm.whse.doc/ids_whse_100.htm)*** approach. All schemas are designed as a ***[star schema](https://medium.com/@marcosanchezayala/data-modeling-the-star-schema-c37e7652e206)***.
Why choose a dimensional database: [details](https://www.ibm.com/support/knowledgecenter/en/SSGU8G_14.1.0/com.ibm.whse.doc/ids_ddi_342.htm).
#### **2.3.2 Schema design**
- **Fact table:** contains measurements, metrics, and facts about a business process.
***Build Fact:***
- Determine which dimensions will be included.
- Determine where along the hierarchy of each dimension the information will be kept.
- **Dim table:** contains dimensions of a fact.
***Note:*** Avoid building too many dimensions; doing so can turn a star schema into a snowflake schema.
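For illustration, a minimal star-schema sketch with hypothetical fact and dimension tables; queries join the fact directly to each dimension rather than chaining dimension to dimension:
```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical tables: one fact referencing two dimensions
fact_booking = spark.createDataFrame(
    [(20200101, 1, 2, 500000.0)],
    ['created_date_id', 'customer_id', 'hotel_id', 'gross_amount'],
)
dim_customer = spark.createDataFrame([(1, 'Alice')], ['customer_id', 'customer_name'])
dim_hotel = spark.createDataFrame([(2, 'Hotel A')], ['hotel_id', 'hotel_name'])

# Star schema: the fact joins directly to each dimension
report_df = (
    fact_booking
    .join(dim_customer, on='customer_id', how='left')
    .join(dim_hotel, on='hotel_id', how='left')
)
```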
#### **2.3.3 Convention**
- Column name must be `lowercase`.
- TimestampType columns should end with `_at`. Example: `created_at`.
- DateType columns should end with `_date`. Example: `created_date`.
- Each `date/timestamp` column should have a corresponding `date_id` column that references `dim_date`. Example: `created_at` should have `created_date_id`.
- Phone number columns should end with `_phone`. Example: `customer_phone`. All phone columns must be normalized.
- Money columns should end with `_amount`. Example: `debit_amount`, `discount_amount`.
- Boolean columns should start with `is_`. Example: `is_active`.
- Aggregation on a date range acronym: `{number_of_days}d`. Example: `revenue_7d`, `gross_margin_30d`.
- Metrics calculated on sale orders only (not including refund/return orders) should start with `gross_`. Example: `gross_revenue`.
- `revenue` means net revenue, i.e. with refund/return orders already subtracted.
- Gender: `F - Female`, `M - Male`, `U - Unknown`
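A short sketch applying several of these conventions to a hypothetical fact table (`raw_orders_df` and its source column names are made up):
```
from pyspark.sql import functions as F

fact_order_df = (
    raw_orders_df  # hypothetical source DataFrame prepared earlier
    .withColumnRenamed('phone', 'customer_phone')          # phone number -> *_phone
    .withColumnRenamed('discount', 'discount_amount')      # money -> *_amount
    .withColumn('created_at', F.to_timestamp('created'))   # TimestampType -> *_at
    .withColumn(
        'created_date_id',                                 # every date/timestamp has a *_date_id
        F.date_format('created_at', 'yyyyMMdd').cast('int'),
    )
    .withColumn('is_active', F.col('status') == 'ACTIVE')  # boolean -> is_*
)
```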
#### **2.3.4 Partition**
**Dimension**
Must include `snapshot/date_id=` and `current` folders:
- `snapshot/date_id=`: Contains snapshots (versions) of the data over time (daily, partitioned by date).
- `current`: A copy of the latest snapshot.
```
|_ dim_xyz
|_ snapshot
| |_date_id=20200101
| |_date_id=20200102
|_ current # Data is the same as snapshot/date_id=20200102
```
**Fact/Cube**
Must be partitioned by processing date or created date.
```
|_ fact_xyz
|_ crawled_date_id=20200101
|_ crawled_date_id=20200102
```
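A minimal write sketch for both layouts; the DataFrames, bucket paths, and the Parquet format are assumptions for illustration:
```
# dim_xyz_df / fact_xyz_df: DataFrames prepared earlier (hypothetical)

# Dimension: write today's snapshot, then refresh the `current` copy
dim_xyz_df.write.mode('overwrite').parquet('s3://warehouse/dim_xyz/snapshot/date_id=20200102')
dim_xyz_df.write.mode('overwrite').parquet('s3://warehouse/dim_xyz/current')

# Fact/Cube: partition by processing/created date
(
    fact_xyz_df
    .write
    .mode('overwrite')
    .partitionBy('crawled_date_id')
    .parquet('s3://warehouse/fact_xyz')
)
```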
### **2.4 Coding structure and conventions**
#### 2.4.1 Scalable structure
Generally, any pipeline involves the following stages (see the sketch after this list):
- **Reading**: Read all tables required by the pipeline.
- **Preprocessing**: Clean, filter, and remove invalid or unused records. Example: remove NA values, drop duplicate records, uppercase columns, ....
- **Transformation**: Apply the business/transformation logic.
- **Aggregation**: Aggregate data.
- **Writing**: Write data.
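A minimal skeleton of a job organized around these stages; the function, paths, and column names are hypothetical:
```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def run(spark: SparkSession, input_path: str, output_path: str) -> None:
    # Reading: load every table the pipeline needs
    bookings_df = spark.read.parquet(input_path)

    # Preprocessing: drop invalid or duplicate records
    cleaned_df = bookings_df.dropDuplicates().filter(F.col('booking_id').isNotNull())

    # Transformation: apply the business logic
    transformed_df = cleaned_df.withColumn(
        'created_date_id', F.date_format('created_at', 'yyyyMMdd').cast('int')
    )

    # Aggregation: aggregate data
    aggregated_df = transformed_df.groupBy('created_date_id').agg(F.sum('amount').alias('revenue'))

    # Writing: persist the result
    aggregated_df.write.mode('overwrite').parquet(output_path)
```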
#### 2.4.2 Conventions
- **File name**: Meaningful and normalized
```
# Example: Do one thing, and describe that thing
|_ preprocessing
|_ hotel_payment.py
|_ hotel_booking.py
```
```
# Bad
|_ preprocessing
|_ hotel_payment_preprocessor.py
|_ hotel_booking_preprocessor.py
# Good: No need for a `preprocessor` suffix since these files are inside the `preprocessing` folder
|_ preprocessing
|_ hotel_payment.py
|_ hotel_booking.py
```
- **Coding**:
- A class name should be a noun. Example: `class PhoneNormalizer`
- A method name should be a verb. Example: `def normalize_phone()`
- A variable name should be a noun. Example: `normalized_phone`
- A constant name should be a noun and uppercase. Example: `DATE_FORMAT = "%Y%m%d"`
- A variable name should describe what it does / what it is. Example:
```
# Bad
df = spark.read.parquet(input_path)
df = clean(df)
# Good
original_terminals_df = spark.read.parquet(input_path)
cleaned_terminals_df = clean_terminal(original_terminals_df)
# Or
cleaned_terminals_df = TerminalPreprocessor.clean(original_terminals_df)
```
- Type hints: input and output types must be declared.
```
# Bad
def init_spark(configs):
    pass

# Good
from typing import Dict

from pyspark.sql import SparkSession

def init_spark(configs: Dict[str, str]) -> SparkSession:
    pass
```
- Predefine a constant with a meaningful name instead of using a hard-coded value:
```
# Bad
def extract_success_transactions(transactions: DataFrame) -> DataFrame:
    return transactions.filter(F.col("status") == 0)

# Good
SUCCESS_STATUS = 0

def extract_success_transactions(transactions: DataFrame) -> DataFrame:
    return transactions.filter(F.col("status") == SUCCESS_STATUS)

# Better
class TransactionStatus:
    SUCCESS = 0
    # Other statuses can go here

def extract_success_transactions(transactions: DataFrame) -> DataFrame:
    return transactions.filter(F.col("status") == TransactionStatus.SUCCESS)
```
- **Testing**: Tests should contain at least one happy case.
```
# Good example
def test_clean(spark):
    test_cases = [
        {
            "name": "Happy case",
            "input": cleaned_df,  # Already-clean data needs no cleaning
            "expected": cleaned_df,
        },
        # ...
    ]
    # ...

# Better example - better name
def test_clean(spark):
    test_cases = [
        {
            "name": "Cleaned data does not need cleaning",
            "input": cleaned_df,
            "expected": cleaned_df,
        },
        # ...
    ]
    # ...
```
### **2.5 Pyspark Style Guide**
- **Prefer implicit column selection to direct access, except for disambiguation:**
```
# bad (1)
df = df.select(F.lower(df1.colA), F.upper(df2.colB))

# good (2)
df = df.select(F.lower(F.col('colA')), F.upper(F.col('colB')))
```
```
# bad
df.join(df2, on=(df.key == df2.key), how='left')
# good
df2 = df2.withColumnRenamed('key', 'df_key')
df = df.join(df2, on=(F.col('key') == F.col('df_key')), how='left')
```
- Prefer the second style `(2)` because:
- If the dataframe variable name is long, expressions involving it quickly become unwieldy.
- If the column name has a space or other unsupported character, the bracket operator must be used instead. This generates inconsistency, and `df1['colA']` is just as difficult to write as `F.col('colA')`.
- Column expressions involving the dataframe aren't reusable and can't be used for defining abstract functions (see the sketch below).
- Renaming a dataframe variable can be error-prone, as all column references must be updated in tandem.
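For instance, a small helper built on `F.col` can be applied to any DataFrame, which is not possible with `df.colA`-style references (the function and column names below are hypothetical):
```
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def with_clean_col(df: DataFrame, col_name: str = 'colA') -> DataFrame:
    # The expression is built from the column name only, so it is reusable across DataFrames
    return df.withColumn(col_name, F.trim(F.lower(F.col(col_name))))


df = with_clean_col(df)
df2 = with_clean_col(df2)
```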
- **Refactor complex logical operations:**
```
# bad
F.when( (F.col('prod_status') == 'Delivered') | (((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('currentRegistration') != '') | ((F.datediff('deliveryDate_actual', 'current_date') < 0) & ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')))))), 'In Service')
# good
has_operator = ((F.col('originalOperator') != '') | (F.col('currentOperator') != ''))
delivery_date_passed = (F.datediff('deliveryDate_actual', 'current_date') < 0)
has_registration = (F.col('currentRegistration') != '')
is_delivered = (F.col('prod_status') == 'Delivered')
is_active = (has_registration | has_operator)
F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')
```
- **Empty value:**
```
# bad
df = df.withColumn('foo', F.lit(''))
# bad
df = df.withColumn('foo', F.lit('NA'))
# good
df = df.withColumn('foo', F.lit(None))
```
- **Join:**
- Do not use right joins; they make the logic confusing and complex (see the sketch below).
- Put the big table on the left side of a `join`; the table on the right side may be read multiple times during the join.
- Do not use `dropDuplicates()` or `distinct()` after a join; duplicate rows usually indicate unexpected values in the data or a mistake in the join logic.
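A small sketch of replacing a right join with an equivalent left join that keeps the big table on the left (`big_df` and `small_df` are hypothetical):
```
# big_df is the larger table, small_df the smaller one (both hypothetical)

# bad: right join
result_df = small_df.join(big_df, on='key', how='right')

# good: the equivalent left join, with the big table on the left
result_df = big_df.join(small_df, on='key', how='left')
```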
- **Dealing with nulls:**
Aggregate functions such as `F.first` and `F.last` do not skip nulls by default, which can produce surprising results; avoid this by enabling the `ignorenulls` flag. Examples:
```
# Assumed window spec for these examples (consistent with the outputs shown below)
from pyspark.sql import Window
w4 = Window.partitionBy('key')

# bad
df_nulls = spark.createDataFrame([('a', None), ('a', 1), ('a', 2), ('a', None)], ['key', 'num'])
df_nulls.select('key', F.first('num').over(w4).alias('first')).collect()
# => [Row(key='a', first=None), Row(key='a', first=None), Row(key='a', first=None), Row(key='a', first=None)]
df_nulls.select('key', F.last('num').over(w4).alias('last')).collect()
# => [Row(key='a', last=None), Row(key='a', last=None), Row(key='a', last=None), Row(key='a', last=None)]
# good
df_nulls.select('key', F.first('num', ignorenulls=True).over(w4).alias('first')).collect()
# => [Row(key='a', first=1), Row(key='a', first=1), Row(key='a', first=1), Row(key='a', first=1)]
df_nulls.select('key', F.last('num', ignorenulls=True).over(w4).alias('last')).collect()
# => [Row(key='a', last=2), Row(key='a', last=2), Row(key='a', last=2), Row(key='a', last=2)]
```
- **Expressions:**
- Separate into steps:
```
# bad
df = (
df
.select('a', 'b', 'c', 'key')
.filter(F.col('a') == 'truthiness')
.withColumn('boverc', F.col('b') / F.col('c'))
.join(df2, 'key', how='inner')
.join(df3, 'key', how='left')
.drop('c')
)
# better
df = (
df
.select('a', 'b', 'c', 'key')
.filter(F.col('a') == 'truthiness')
)
df = df.withColumn('boverc', F.col('b') / F.col('c'))
df = (
df
.join(df2, 'key', how='inner')
.join(df3, 'key', how='left')
.drop('c')
)
```
- To keep things consistent, wrap the entire expression in a single parenthesis block and avoid using `\`:
```
# bad
df = df.filter(F.col('event') == 'executing')\
.filter(F.col('has_tests') == True)\
.drop('has_tests')
# good
df = (
df
.filter(F.col('event') == 'executing')
.filter(F.col('has_tests') == True)
.drop('has_tests')
)
```
- One line, one logical step:
```
# bad
customers_with_shipping_address = (
customers_with_shipping_address
.select('a', 'b', 'c', 'key')
.filter(F.col('a') == 'truthiness')
.withColumn('boverc', F.col('b') / F.col('c'))
.join(df2, 'key', how='inner')
)
# good
customers = (
customers
.select('a', 'b', 'c', 'key')
.filter(F.col('a') == 'truthiness')
)
customers = customers.withColumn('boverc', F.col('b') / F.col('c'))
customers = customers.join(df_to_join, 'key', how='inner')
```
- **Others:**
- Be wary of functions that grow too large. As a general rule, a file should not be over 250 lines, and a function should not be over 70 lines.
- Star-import is prohibited in our repository.
```
# Bad
from pyspark.sql.functions import *
# Good
from pyspark.sql.functions import col, lit, when
# Also good
from pyspark.sql import functions as F
```
**3. References**
---
- [pyspark-style-guide](https://github.com/palantir/pyspark-style-guide#dealing-with-nulls)
- [IBM](https://www.ibm.com/support/knowledgecenter/en/SSGU8G_12.1.0/com.ibm.datawhsenode.doc/datawhse.htm)
- [Data Teko](https://data.pages.teko.vn/)
- [Microsoft](https://docs.microsoft.com/en-us/analysis-services/?view=asallproducts-allversions)
###### tags: `tripi` `vntravel` `data` `vnlife`