Data Team Style Guide
===

Table of contents
---

- [Infrastructure](/2aWCJoPJQ4qbLa8PLU16hg#1-Infrastructure)
  - [Architecture](/2aWCJoPJQ4qbLa8PLU16hg#11-Architecture)
  - [Data Flow](/2aWCJoPJQ4qbLa8PLU16hg#12-DataFlow)
- [Data Engineering Guide](/2aWCJoPJQ4qbLa8PLU16hg#2-Data-Engineering-Guide)
  - [Project Structure](/2aWCJoPJQ4qbLa8PLU16hg#21-Project-Structure)
  - [Git Flow](/2aWCJoPJQ4qbLa8PLU16hg#22-Git-flow)
  - [Data Design](/2aWCJoPJQ4qbLa8PLU16hg#23-Data-design)
  - [Coding Structure and Conventions](/2aWCJoPJQ4qbLa8PLU16hg#24-Coding-structure-and-conventions)
  - [Pyspark Style Guide](/2aWCJoPJQ4qbLa8PLU16hg#25-Pyspark-Style-Guide)
- [References](/2aWCJoPJQ4qbLa8PLU16hg#3-References)

**1. Infrastructure**
---

### **1.1 Architecture**

![](https://i.imgur.com/jrpt0Wd.png)

**Data Sources:**
- Includes all operational data: product, team, ...
- Includes multiple data formats: SQL, NoSQL, CSV, JSON, Google Sheets, ...

**Staging:**
- Includes a 1-1 (row-level) clone of all operational data plus database metadata, refreshed daily, hourly, or weekly.
- Stored on AWS S3 in `avro` format, compressed with `snappy`.

**ETL:**
- Includes all ETL jobs.
- Based on AWS Glue 2.0 (Spark 2.4).

**Mart:**
- Includes all cleaned, transformed, and aggregated data.
- Published on AWS RDS.

### **1.2 DataFlow**

**Ingestion:**
- Use [StreamSets](http://streamsets.data.tripi.vn/) to ingest data into the Staging area (S3). See the [Template pipeline](https://) and the [Streamsets Naming Conventions](https://).

**ETL:**
- Clean and transform data from Staging into dim, fact, and cube tables.
- Use [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python.html).

**Publish:**
- Publish the mart tables to AWS RDS.

**2. Data Engineering Guide**
---

### **2.1 Project Structure**

For more detail, see the project [README](https://bitbucket.org/chodulich/warehouse/src/master/Readme.md).

### **2.2 Git flow**

- Check out a feature branch from master.
- Develop new features: dim, fact, cube, mapping, ...
- Resolve conflicts and create a PR to master.
- Review and approve.
- Merge.

### **2.3 Data design**

#### **2.3.1 Schema**

Follow the ***[dimensional database](https://www.ibm.com/support/knowledgecenter/en/SSGU8G_12.1.0/com.ibm.whse.doc/ids_whse_100.htm)*** approach. All schemas are designed as a ***[star schema](https://medium.com/@marcosanchezayala/data-modeling-the-star-schema-c37e7652e206)***.

Why choose a dimensional database: [details](https://www.ibm.com/support/knowledgecenter/en/SSGU8G_14.1.0/com.ibm.whse.doc/ids_ddi_342.htm).

#### **2.3.2 Schema design**

- **Fact table:** contains measurements, metrics, and facts about a business process.

  ***Building a fact table:***
  - Determine which dimensions will be included.
  - Determine where along the hierarchy of each dimension the information will be kept.

- **Dim table:** contains the dimensions of a fact.

  ***Note:*** avoid building too many dimensions, which can turn a star schema into a snowflake schema.

#### **2.3.3 Convention**

- Column names must be `lowercase`.
- TimestampType columns should end with `_at`. Example: `created_at`.
- DateType columns should end with `_date`. Example: `created_date`.
- Every `date/timestamp` column should have a corresponding `date_id` column that references `dim_date`. Example: `created_at` should have `created_date_id`.
- Phone number columns should end with `_phone`. Example: `customer_phone`. All phone columns must be normalized.
- Money columns should end with `_amount`. Example: `debit_amount`, `discount_amount`.
- Boolean columns should start with `is_`. Example: `is_active`.
- Metrics aggregated over a date range use the suffix `{number_of_days}d`. Example: `revenue_7d`, `gross_margin_30d`.
- Metrics calculated on sale orders only (excluding refund/return orders) should start with `gross_`. Example: `gross_revenue`.
- `revenue` means net revenue, i.e. with refund/return orders subtracted.
- Gender: `F - Female`, `M - Male`, `U - Unknown`.
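As a quick illustration, here is how these conventions would look on a hypothetical fact table. The table name, columns, and types below are made up for the example only; they are not part of the actual warehouse schema:

```
from pyspark.sql import types as T

# Hypothetical fact table illustrating the naming conventions above.
fact_booking_schema = T.StructType([
    T.StructField("booking_id", T.LongType()),               # lowercase column names
    T.StructField("customer_phone", T.StringType()),         # normalized phone -> `_phone`
    T.StructField("created_at", T.TimestampType()),          # TimestampType -> `_at`
    T.StructField("created_date", T.DateType()),             # DateType -> `_date`
    T.StructField("created_date_id", T.IntegerType()),       # references dim_date
    T.StructField("discount_amount", T.DecimalType(18, 2)),  # money -> `_amount`
    T.StructField("gross_revenue", T.DecimalType(18, 2)),    # sale orders only -> `gross_`
    T.StructField("is_refunded", T.BooleanType()),           # boolean -> `is_`
    T.StructField("gender", T.StringType()),                 # 'F' / 'M' / 'U'
])
```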
#### **2.3.4 Partition**

**Dimension**

Must include `snapshot/date_id=` and `current` folders:
- `snapshot/date_id=`: contains snapshots (versions) of the data over time (daily, partitioned by date).
- `current`: a copy of the latest snapshot.

```
|_ dim_xyz
   |_ snapshot
   |  |_ date_id=20200101
   |  |_ date_id=20200102
   |_ current   # Data is the same as snapshot/date_id=20200102
```

**Fact/Cube**

Must be partitioned by processing date or created date.

```
|_ fact_xyz
   |_ crawled_date_id=20200101
   |_ crawled_date_id=20200102
```
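A minimal sketch of a dimension write job that produces this layout. The bucket name, paths, output format, and the `dim_xyz_df` DataFrame are placeholders, not the real job configuration:

```
# Hypothetical job parameter; normally passed in as a Glue job argument.
date_id = "20200102"

# 1) Write today's snapshot under snapshot/date_id=<date_id>/.
(
    dim_xyz_df  # placeholder: the cleaned dimension DataFrame
    .write
    .mode("overwrite")
    .parquet(f"s3://warehouse-bucket/dim_xyz/snapshot/date_id={date_id}/")
)

# 2) Overwrite `current` with a copy of the latest snapshot.
(
    dim_xyz_df
    .write
    .mode("overwrite")
    .parquet("s3://warehouse-bucket/dim_xyz/current/")
)
```

Facts and cubes follow the same pattern, writing directly to a `crawled_date_id=<date_id>/` path with no `current` copy.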
### **2.4 Coding structure and conventions**

#### 2.4.1 Scalable structure

Generally, any pipeline involves the following stages:
- **Reading**: read all tables required by the pipeline.
- **Preprocessing**: clean, filter, and remove invalid or unused records. Example: remove NA values, drop duplicate records, uppercase columns, ...
- **Transformation**: apply the transformation logic.
- **Aggregation**: aggregate data.
- **Writing**: write data.

#### 2.4.2 Conventions

- **File name**: meaningful and normalized.

```
# Example: Do one thing, and describe that thing
|_ preprocessing
   |_ hotel_payment.py
   |_ hotel_booking.py
```

```
# Bad
|_ preprocessing
   |_ hotel_payment_preprocessor.py
   |_ hotel_booking_preprocessor.py

# Good: no `preprocessor` suffix needed since these files are inside the `preprocessing` folder
|_ preprocessing
   |_ hotel_payment.py
   |_ hotel_booking.py
```

- **Coding**:
  - A class name should be a noun. Example: `class PhoneNormalizer`
  - A method name should be a verb. Example: `def normalize_phone()`
  - A variable name should be a noun. Example: `normalized_phone`
  - A constant name should be a noun and uppercase. Example: `DATE_FORMAT = "%Y%m%d"`
  - A variable name should describe what it does / what it is. Example:

```
# Bad
df = spark.read.parquet(input_path)
df = clean(df)

# Good
original_terminals_df = spark.read.parquet(input_path)
cleaned_terminals_df = clean_terminal(original_terminals_df)
# Or
cleaned_terminals_df = TerminalPreprocessor.clean(original_terminals_df)
```

  - Type hints: input and output variables must have their types defined.

```
# Bad
def init_spark(configs):
    pass

# Good
from typing import Dict

from pyspark.sql import SparkSession

def init_spark(configs: Dict[str, str]) -> SparkSession:
    pass
```

  - Predefine a constant with a meaningful name instead of using a hard-coded value:

```
# Bad
def extract_success_transactions(transactions: DataFrame) -> DataFrame:
    return transactions.filter(F.col("status") == 0)

# Good
SUCCESS_STATUS = 0

def extract_success_transactions(transactions: DataFrame) -> DataFrame:
    return transactions.filter(F.col("status") == SUCCESS_STATUS)

# Better
class TransactionStatus:
    SUCCESS = 0
    # Other statuses can go here

def extract_success_transactions(transactions: DataFrame) -> DataFrame:
    return transactions.filter(F.col("status") == TransactionStatus.SUCCESS)
```

  - Tests should contain at least one happy case.

```
# Good example
def test_clean(spark):
    test_cases = [
        {
            "name": "Happy case",
            "input": cleaned_df,  # Cleaned data needs no cleaning
            "expected": cleaned_df,
        },
        # ...
    ]
    # ...

# Better example - better name
def test_clean(spark):
    test_cases = [
        {
            "name": "Cleaned data does not need cleaning",
            "input": cleaned_df,
            "expected": cleaned_df,
        },
        # ...
    ]
    # ...
```

### **2.5 Pyspark Style Guide**

- **Prefer implicit column selection to direct access, except for disambiguation:**

```
# bad
df = df.select(F.lower(df1.colA), F.upper(df2.colB))  # (1)

# good
df = df.select(F.lower(F.col('colA')), F.upper(F.col('colB')))  # (2)
```

```
# bad
df.join(df2, on=(df.key == df2.key), how='left')

# good
df2 = df2.withColumnRenamed('key', 'df2_key')
df = df.join(df2, on=F.col('key') == F.col('df2_key'), how='left')
```

- Prefer the second style `(2)` because:
  - If the dataframe variable name is long, expressions involving it quickly become unwieldy.
  - If the column name has a space or other unsupported character, the bracket operator must be used instead. This generates inconsistency, and `df1['colA']` is just as difficult to write as `F.col('colA')`.
  - Column expressions involving the dataframe aren't reusable and can't be used for defining abstract functions.
  - Renaming a dataframe variable can be error-prone, as all column references must be updated in tandem.

- **Refactor complex logical operations:**

```
# bad
F.when( (F.col('prod_status') == 'Delivered') | (((F.datediff('deliveryDate_actual', 'current_date') < 0) &
    ((F.col('currentRegistration') != '') |
     ((F.datediff('deliveryDate_actual', 'current_date') < 0) &
      ((F.col('originalOperator') != '') | (F.col('currentOperator') != '')))))), 'In Service')

# good
has_operator = (F.col('originalOperator') != '') | (F.col('currentOperator') != '')
delivery_date_passed = F.datediff('deliveryDate_actual', 'current_date') < 0
has_registration = F.col('currentRegistration') != ''
is_delivered = F.col('prod_status') == 'Delivered'
is_active = has_registration | has_operator

F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')
```

- **Empty value:**

```
# bad
df = df.withColumn('foo', F.lit(''))

# bad
df = df.withColumn('foo', F.lit('NA'))

# good
df = df.withColumn('foo', F.lit(None))
```

- **Join:**
  - Do not use right joins; they make the logic confusing and harder to follow.
  - Put the big table on the left side of a `join`: the right-hand table may be read multiple times during the join (see the sketch below).
  - Do not use `dropDuplicates()` or `distinct()` after a join; duplicates there usually mean rows are being excluded or multiplied incorrectly, i.e. there is a bug in the processing logic.
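A short sketch of the join guidance above; `big_fact_df`, `small_dim_df`, and the `hotel_id` key are placeholder names:

```
from pyspark.sql import functions as F

# bad: small table on the left, expressed as a right join
enriched_df = small_dim_df.join(big_fact_df, on='hotel_id', how='right')

# good: same rows kept (all of the big table), big table on the left as a left join
enriched_df = big_fact_df.join(small_dim_df, on='hotel_id', how='left')

# optionally, hint Spark to broadcast the small dimension to every executor
enriched_df = big_fact_df.join(F.broadcast(small_dim_df), on='hotel_id', how='left')
```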
- **Dealing with nulls:** `F.first`/`F.last` do not skip nulls by default, so they can return null; enable the `ignorenulls` flag to avoid this problem.

Examples:

```
# Assumed setup for the examples below: a window over the whole `key` partition.
from pyspark.sql import Window as W, functions as F

w4 = W.partitionBy('key')

# bad
df_nulls = spark.createDataFrame([('a', None), ('a', 1), ('a', 2), ('a', None)], ['key', 'num'])

df_nulls.select('key', F.first('num').over(w4).alias('first')).collect()
# => [Row(key='a', first=None), Row(key='a', first=None), Row(key='a', first=None), Row(key='a', first=None)]

df_nulls.select('key', F.last('num').over(w4).alias('last')).collect()
# => [Row(key='a', last=None), Row(key='a', last=None), Row(key='a', last=None), Row(key='a', last=None)]

# good
df_nulls.select('key', F.first('num', ignorenulls=True).over(w4).alias('first')).collect()
# => [Row(key='a', first=1), Row(key='a', first=1), Row(key='a', first=1), Row(key='a', first=1)]

df_nulls.select('key', F.last('num', ignorenulls=True).over(w4).alias('last')).collect()
# => [Row(key='a', last=2), Row(key='a', last=2), Row(key='a', last=2), Row(key='a', last=2)]
```

- **Expressions:**
  - Separate into steps:

```
# bad
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
    .withColumn('boverc', F.col('b') / F.col('c'))
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)

# better
df = (
    df
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
)

df = df.withColumn('boverc', F.col('b') / F.col('c'))

df = (
    df
    .join(df2, 'key', how='inner')
    .join(df3, 'key', how='left')
    .drop('c')
)
```

  - To keep things consistent, wrap the entire expression in a single pair of parentheses and avoid using `\`:

```
# bad
df = df.filter(F.col('event') == 'executing')\
    .filter(F.col('has_tests') == True)\
    .drop('has_tests')

# good
df = (
    df
    .filter(F.col('event') == 'executing')
    .filter(F.col('has_tests') == True)
    .drop('has_tests')
)
```

  - One line - one logic:

```
# bad
customers_with_shipping_address = (
    customers_with_shipping_address
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
    .withColumn('boverc', F.col('b') / F.col('c'))
    .join(df2, 'key', how='inner')
)

# good
customers = (
    customers
    .select('a', 'b', 'c', 'key')
    .filter(F.col('a') == 'truthiness')
)

customers = customers.withColumn('boverc', F.col('b') / F.col('c'))

customers = customers.join(df_to_join, 'key', how='inner')
```

- **Others:**
  - Be wary of functions that grow too large. As a general rule, a file should not be over 250 lines, and a function should not be over 70 lines.
  - Star imports are prohibited in our repository.

```
# Bad
from pyspark.sql.functions import *

# Good
from pyspark.sql.functions import col, lit, when

# Also good
from pyspark.sql import functions as F
```

**3. References**
---

- [pyspark-style-guide](https://github.com/palantir/pyspark-style-guide#dealing-with-nulls)
- [IBM](https://www.ibm.com/support/knowledgecenter/en/SSGU8G_12.1.0/com.ibm.datawhsenode.doc/datawhse.htm)
- [Data Teko](https://data.pages.teko.vn/)
- [Microsoft](https://docs.microsoft.com/en-us/analysis-services/?view=asallproducts-allversions)

###### tags: `tripi` `vntravel` `data` `vnlife`