# The Ultimate Guide to Data Engineering @ VNLIFE
[https://hackmd.io/@vnlife-data/data-engineering-contribution-guide (0.0.4)](https://hackmd.io/@vnlife-data/data-engineering-contribution-guide)
Change logs:
| version | status | date | by | change log |
|---------|------------|------------|--------|---|
| 0.0.1 | released | 2020-03-04 | NhatNV | create version 0.0.1 |
| 0.0.2 | released | 2020-04-28 | BinhNQ | Update `5. Testing` - `Input/Output data-type testing` |
| 0.0.3 | released | 2020-05-07 | NhatNV | Update `2.3. Data design convention` - `Column naming` |
| 0.0.4 | released | 2021-03-23 | NhatNV | Update `4. Code convention` |
| 0.0.5 | released | 2021-03-23 | NhatNV | Replace date_id columns by date columns (`2.3. Data design convention`) |
| 0.1.0 | TODO | | NhatNV | `data/warehouse.next` and v2 tables |
---
# Table of contents
1. Version control
1.1. Branches & Tags
1.2. Merge request
1.3. Git flow
2. Data design & storage
2.1. Data warehouse schema
2.2. Buckets
2.3. Data design convention
3. Code structure
3.1. Repository structure
3.2. Scalable project structure
3.3. Alterations
3.4. Notes
4. Code convention
4.1. Common rules
4.2. Naming convention
4.3. Type hints
4.4. Docstring
4.5. Notes
4.6. Code quality tools
5. Testing
6. Logging
7. Pipeline set-up
8. Changes announcement
9. Performance
---
## 1. Version control
### 1.1. Branches & Tags
- `master` is the protected, long-lived & default branch.
- It's recommended that commit messages follow the [keepachangelog](https://keepachangelog.com) guide. For example:
- `Added: a new exciting feature`
- `Changed: rename file abc.ipynb to xyz.py`
- `Deprecated: function upload, use upload_file instead`
- `Removed: function upload, which was deprecated`
- `Fixed: Security vulnerability CVE-1234`
- Tags are created from `master` and follow the [SemVer](https://semver.org) standard (e.g. `v1.0.0`, `v2.0.3`).
- The tag message describes all changes since the last release and MUST follow the [keepachangelog](https://keepachangelog.com) guide.
- A pipeline is auto-triggered for each tag to build & publish the artifact.
### 1.2. Merge request (MR)
- Assign the MR to the code owner for review. Owners are defined in the `docs/CODEOWNERS` file.
- The MR's title should be clear and concise, following the [keepachangelog](https://keepachangelog.com) guide. Example: `Added 10 columns to fact QR`
- The MR's description should:
- Match the JIRA task description. For example, list the 10 columns added in the example above.
- Include a checklist that makes clear which items have been skipped, for example:
> - [ ] unit tests
> - [x] integration test
> - [x] comment on every function
- Include any notes necessary for approvers/future contributors who will use this code. Example: sample result of a test run, model benchmark summary, etc.
- The maintainer will only merge once the owner has given approval.
### 1.3. Git flow
We follow a **simplified** git flow, as described below.
#### 1.3.1. Checkout a new feature branch from `master`
```bash
git checkout master
git pull
git checkout -b <new-feature-branch>
```
#### 1.3.2. Make progress: commit your work to your feature branch
```bash
git add qr_processor.py
git commit -m "Added 10 columns to fact QR"
```
#### 1.3.3. Submit your code for review
```bash
git fetch
git rebase origin/master
git push -f
```
And then update the MR's title and description.
#### 1.3.4. Resolve conflicts when rebasing
When rebasing, you may get some conflicts. To make them easier to resolve, you can squash all your commits into one by following these steps:
Step 1: Abort the rebase with `git rebase --abort`
Step 2: Use `git log --graph` to find the branch point (the commit you branched off from). In the example below, it is commit `E`.
```text
A---B---C feature
/
D---E---F---G master
```
Step 3: Squash all commits made since `E` into one
```bash
git reset --soft <E>
git commit -m <your-commit-message>
```
Step 4: Redo rebasing
```bash
git rebase origin/master
```
You may still get conflicts, but now you only need to resolve them once.
## 2. Data design & storage
### 2.1. Data warehouse schema
Our target is to build a **star-schema** data warehouse. Note that we avoid building too many dimensions, which can turn a star schema into a snowflake schema.
We have 3 main layers as below:
- Raw
- ETL
- Fact: Information gained over time. Example: daily transactions
- Dimension (dim): Store entity information
- Status: Store fact's status (transaction status, etc)
- Map: Mapping between DIMs. Example: `qr_promotion_bank_map` is a mapping between `qr_promotion` and `sale_channel` (a.k.a. bank).
- Cube
Each layer must:
- Read from itself and the layer directly before it
- Write to the same layer
**Note**:
- Please avoid multi-row calculations (aggregation, distinct, windowing, etc.) on the ETL layer. If you really need one, build a cube (see the sketch below), or discuss with teammates during the design phase.
- We follow the "Single source of truth" principle: each piece of information on a layer can be found in one and only one column of one table, except foreign keys.
### 2.2. Buckets
- `common`: Non-specific-company data. Example:
- Dim `date`
- Dim `location`: current Vietnamese wards/districts/provinces
- Dim `merchant_business_type`: follow VISA rules
- `master`: Entities that exist across companies and can be merged. Example:
- Employee
- User/customer
- `extend`: External data from `VCM`, `mobio`, `beecost`, etc.
- `develop`: Data in developing phase, free to read and write.
- `publish` (hidden): For publishing data externally
- Company buckets: Entities that exist only in a specific company's data, depending on the company's services. Example:
- `vnpay`
- `vnshop`
- `tripi`
- `teko`
### 2.3. Data design convention
- Table naming:
- Should not contain the parent directory in its name. Example: `fact_qr_payment_vnpay` should be `qr_payment_vnpay` because it is already stored in the `etl/fact/` directory, so the `fact_` prefix is unnecessary.
- ETL-Map should end with `_map`. Example: `qr_promotion_bank_map`.
- Tables that belong to a service should start with the service name. Example: the dim about promotions of the QR service should be named `qr_promotion`.
- Column naming:
- Column name must be lowercase.
- TimestampType columns should end with `_at`. Example: `created_at`.
- DateType columns should end with `_date`. Example: `created_date`.
- Each timestamp column should have a corresponding date column. Example: `tran_at` should have `tran_date`.
- Duration/Timedelta columns should end with `_time`. Example: `processing_time`.
- Action done by something: `{action}_by_{something}`. Example: `send_by_phone`.
- Name of human should have 3 columns: `first_name`, `last_name`, `full_name`.
- A detailed address should have 4 columns: `address`, `house_number`, `street`, `location_id` (references dim `location`). Add `longitude` and `latitude` as well if the information is available.
- Status columns should end with `_status`. Example: `tran_status`.
- Transaction attributes should start with `tran_`. Example: `tran_at`, `tran_id`.
- Phone number columns should end with `_phone`. Example: `customer_phone`.
- Money columns should end with `_amount`. Example: `debit_amount`, `discount_amount`.
- Boolean columns should start with `is_`. Example: `is_developed_by_vnpay`.
- Slow changing dimension (SCD) columns: `is_current`, `updated_date` (or `start_date`, `end_date`), `version`.
- Metrics calculated on sale orders only (not including refund/return orders) should start with `gross_`. Example: `gross_revenue`
- `revenue` means net revenue, i.e. with refund/return orders subtracted.
- Counting number of orders/customers should end with `_count`. Example: `return_count`.
- Number of goods/products should end with `_qty`. Example: `return_qty`.
- Avoid using `total_`. Example: `total_revenue` -> `revenue`.
- Aggregation on a date range acronym: `{number_of_days}d`. Example: `revenue_7d`, `gross_margin_30d`.
- Like for Like acronym: `lfl_`. Example: `lfl_revenue`.
- Datatype:
- Duration/Timedelta: Long, milliseconds.
- Status: Integer
- Any ID column: Long
- Money: Long or Decimal
- `gender`: String
- `house_number`: String in order to store `12A`, `12B`, etc
- Acceptable values / transformation rules:
- Human name, location name, voucher code: uppercase
- Status: 0 must be success
- Phone: must be normalized (we already have a utility for this)
- Gender: F - Female, M - Male, U - Unknown
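A minimal sketch of a fact-table schema that follows the conventions above (the table and column names are hypothetical):
```python
from pyspark.sql import types as T

# Hypothetical etl/fact schema illustrating the naming and datatype conventions
QR_PAYMENT_SCHEMA = T.StructType([
    T.StructField("tran_id", T.LongType(), True),           # ID columns: Long
    T.StructField("tran_at", T.TimestampType(), True),      # timestamps end with _at
    T.StructField("tran_date", T.DateType(), True),         # each _at column has a matching _date column
    T.StructField("tran_status", T.IntegerType(), True),    # statuses: Integer, 0 = success
    T.StructField("debit_amount", T.LongType(), True),      # money columns end with _amount
    T.StructField("processing_time", T.LongType(), True),   # durations: Long, in milliseconds
    T.StructField("customer_phone", T.StringType(), True),  # normalized phone number
    T.StructField("is_refunded", T.BooleanType(), True),    # booleans start with is_
])
```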
### 2.4. Partitioning
#### Dimension
- Snapshot: Contains snapshots (versions) of the data over time (daily, partitioned by date).
- Current: A copy of the latest snapshot
```text
|_ dim_xyz
|_ snapshot
| |_ date=20200101
| |_ date=20200102
|_ current # Data is the same as snapshot/date=20200102
```
#### Fact/Cube: Partitioned by processing date
```text
|_ fact_xyz
|_ date=20200101
|_ date=20200102
```
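A minimal sketch of how a dim job might write a daily snapshot and then refresh `current` (the paths, the date, and the `dim_xyz` DataFrame are hypothetical; real paths come from the path helpers shown in section 4.5):
```python
# Hypothetical paths and processing date
processing_date = "20200102"
snapshot_path = f"etl/dim/xyz/snapshot/date={processing_date}"
current_path = "etl/dim/xyz/current"

# Write today's snapshot partition as a single parquet file
dim_xyz.coalesce(1).write.parquet(path=snapshot_path, mode="overwrite")

# Refresh `current` as a copy of the latest snapshot
spark.read.parquet(snapshot_path).coalesce(1).write.parquet(path=current_path, mode="overwrite")
```
Facts/cubes simply write into their `date=YYYYMMDD` partition, as shown in section 9.1.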
## 3. Code structure
### 3.1. Repository structure
```text
|_ raw
| |_ <project> # Data write to /raw/<project> respectively
|_ etl
| |_ dim
| | |_ <project> # Data write to /etl/dim/<project> respectively
| |_ fact
| | |_ <project> # Data write to /etl/fact/<project> respectively
| |_ status
| |_ <project> # Data write to /etl/status/<project> respectively
|_ cube
| |_ <project> # Data write to /cube/<project> respectively
|_ publisher
| |_ _outbound # Only this can use 'published' runner to send data (report, etc...)
| |_ <project> # Data write to /publish/<project> respectively
|_ model
| |_ <project>
|_ exp # Personal experiments
| |_ <person>/...
|_ pipeline # Pipeline set-up here
|_ etl.yml
|_ cube.yml
|_ publish.yml
|_ ...
```
**Note**:
* Each project should have a `README.md` file that briefly describes the project.
### 3.2. Scalable project structure
Generally any pipeline involves the following stages:
- **Data Reading**: reads from the data sources.
- **Data Preprocessing**: removes invalid rows (e.g. NaN), discards unused columns, dedups & filters rows, etc. It's recommended to filter before any necessary joins.
- **Data Enrichment/Transformation**: After data is clean and ready to be used, it is transformed and new columns are added after some business validations. This is a map phase. No data should be dropped during this phase.
- **Data Aggregation/Calculation**: This phase contains tasks related to aggregations and calculations over the transformed data. It's recommended to avoid join here.
- **Data Writing**: Final phase handles writes of the results to external stores.
Your project structure should be based on these stages.
```text
|_ job # meant for scheduled job executor from CI/CD
|_ full_overwrite.py
|_ daily.py
|_ weekly.py
|_ data # static data files and their reader/parser
|_ public # what the others can re-use your codes
|_ logic # All logic, not including data reading and writing steps
|_ helper/utils
|_ preprocessing
|_ clean_merchant.py
|_ clean_merchant_test.py
|_ clean_terminal.py
|_ clean_terminal_test.py
|_ transformation
|_ populate_merchant_with_terminal.py
|_ populate_merchant_with_terminal_test.py
|_ aggregation/calculation
|_ ...
```
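Under this structure, a `job/daily.py` is typically a thin wrapper that only reads, delegates to `logic`, and writes. A minimal sketch, with hypothetical module and function names:
```python
# job/daily.py - a hypothetical thin entry point; all transformation logic lives in logic/
from pyspark.sql import SparkSession

from your_module.logic.preprocessing.clean_merchant import clean_merchant
from your_module.logic.preprocessing.clean_terminal import clean_terminal
from your_module.logic.transformation.populate_merchant_with_terminal import populate_merchant_with_terminal


def main():
    spark = SparkSession.builder.getOrCreate()

    # Data Reading (paths are hypothetical)
    merchant = spark.read.parquet("raw/merchant")
    terminal = spark.read.parquet("raw/terminal")

    # Data Preprocessing + Transformation: delegated to logic/, no reading or writing inside
    cleaned_merchant = clean_merchant(merchant)
    cleaned_terminal = clean_terminal(terminal)
    dim_merchant = populate_merchant_with_terminal(cleaned_merchant, cleaned_terminal)

    # Data Writing
    dim_merchant.write.parquet("etl/dim/merchant/snapshot/date=20200101", mode="overwrite")


if __name__ == "__main__":
    main()
```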
### 3.3. Alterations
**Alteration 1**: If there is only one job => `main.py` is ok
```text
|_ logic
|_ ...
|_ main.py
```
**Alteration 2**: If preprocessing logic is very simple => `clean.py` is ok
```text
|_ logic
|_ preprocessing
|_ clean.py
|_ clean_test.py
|_ ...
|_ ...
```
**Alteration 3**: If all logic is very simple => No sub-folder in `logic` is ok
```text
|_ logic
|_ processor.py
|_ processor_test.py
|_ main.py
```
### 3.4. Notes
> `/utils` is a place for small snippets you can use throughout the application: small functions to build bigger things with. A utility class should only have static methods and be stateless; you would not create an instance of such a class.
> `/helpers` is more of a place to store architectural code snippets: things essential for bootstrapping components and developer ergonomics. A helper can be a utility class, but it can also be stateful or require an instance to be created.
If some parts of your code are repeated in other module(s), consider moving them into a common helper/utility (`common.helper`, `common.spark.helper`, `etl.common`, `cube.common`, etc.) for reuse.
About `common.spark.extension`:
> This class is meant to maintain API compatibility between the Scala & Python APIs (the missing `.transform()` func, for example). Unfortunately, it has evolved into something more than that compatibility layer. It's advised not to further overload this extension class with any other helper/utils functions without discussing extensively with the team.
## 4. Code convention
### 4.1. Common rules
- S.R.P (Single Responsibility Principle)
```python
# Bad
def remove_invalid_records_and_populate_location_id():
pass
# Good
def remove_invalid_records():
pass
def populate_location_id():
pass
```
- D.R.Y (Don't Repeat Yourself)
```python
# Bad: this code existed in both cube user and cube shop
transactions = qr_fact.filter(F.col("is_effective"))
# Good: write a method for reusing in cube user and cube shop
def extract_effective_txn(qr_fact):
return qr_fact.filter(F.col("is_effective"))
```
### 4.2. Naming convention
#### File naming convention
Filename should be meaningful and normalized.
```text
# Example: A file should do one thing, and its name should describe that thing
|_ preprocessing
|_ terminal.py
|_ merchant.py
```
```text
# Bad
|_ preprocessing
|_ terminal_preprocessor.py
|_ merchant_preprocessor.py
# Good: We don't need `preprocessor` suffix since these files are inside `preprocessing` folder
|_ preprocessing
|_ terminal.py
|_ merchant.py
```
#### Python-stuff naming convention
- A class name should be a noun. Example: `class PhoneNormalizer`
- A method name should be a verb. Example: `def normalize_phone()`
- A variable name should be a noun. Example: `normalized_phone`
- A constant name should be a noun and uppercase. Example: `DATE_FORMAT = "%Y%m%d"`
- Prefix the name with `_` to declare a private method, variable or constant.
- A name should describe what the thing does / what it is. Example:
```python
# Bad
df = spark.read.parquet(input_path)
df = clean(df)
# Good
original_terminals_df = spark.read.parquet(input_path)
cleaned_terminals_df = clean_terminal(original_terminals_df)
# Or
cleaned_terminals_df = TerminalPreprocessor.clean(original_terminals_df)
```
### 4.3. Type hints
```python
# Bad
def init_spark(configs):
pass
# Good
from typing import Dict
from pyspark.sql import SparkSession
def init_spark(configs: Dict[str, str]) -> SparkSession:
pass
```
### 4.4. Docstring/Documentation
To make your helper/utility class/method easier to understand, please provide at least a docstring or documentation describing:
- What is it?
- What does it do?
- Arguments description and type hints
- Return(s) description and type hints
- At least an example of usage (input + usage = output)
```python
class Calculator:
"""
A helper class to demo a good docstring/documentation.
Contain some basic binary operator between two integers.
Example:
from your_module.public import Calculator
a, b = 1, 2
a_add_b = Calculator.add(a, b) # 3
"""
@staticmethod
def add(a: int, b: int) -> int:
"""
Add two integers.
Args:
a: The first integer
b: The second integer
Return: A new integer equals to (a + b)
"""
return a + b
```
### 4.5. Notes
#### Hard-coding is prohibited; please predefine a constant with a meaningful name instead of using a hard-coded value.
```python
# Bad
def extract_success_transactions(transactions: DataFrame) -> DataFrame:
return transactions.filter(F.col("status") == 0)
# Good
SUCCESS_STATUS = 0
def extract_success_transactions(transactions: DataFrame) -> DataFrame:
return transactions.filter(F.col("status") == SUCCESS_STATUS)
# Better
class TransactionStatus:
SUCCESS = 0
# Other statuses can go here
def extract_success_transactions(transactions: DataFrame) -> DataFrame:
return transactions.filter(F.col("status") == TransactionStatus.SUCCESS)
```
#### Star-import is prohibited in our repository.
```python
# Bad
from pyspark.sql.functions import *
# Good
from pyspark.sql.functions import col, lit, when
# Also good
from pyspark.sql import functions as F
```
#### Path of a dim that has current/snapshot directories
```python
class EtlPath:
DIM_XYZ = Path(bucket=Bucket.VNPAY_DATAWAREHOUSE, location="etl/dim/xyz")
# Usage in dim-processing
all_snapshot_path = EtlPath.DIM_XYZ.generate(partition="snapshot")
specific_snapshot_path = EtlPath.DIM_XYZ.generate(partition="snapshot/date=20200101")
# Usage as an input of other things:
dim_xyz_path = EtlPath.DIM_XYZ.generate(partition="current")
```
#### Public code
Your project may have some methods/classes/constants that others need to import. Put/import them into the `public` directory inside your project. These things should be well documented so that anyone can understand them.
Note: Don't import public things from within your own project, to avoid circular imports.
- Bad
```text
your_module
|_ data
| |_ constants.py
|_ public
|_ __init__.py
|_ utils.py
```
```python
# data/constants.py
from your_module.public.utils import generate_location_id # Should not do this
UNKNOWN_PROVINCE_CODE = "99"
UNKNOWN_DISTRICT_CODE = "999"
UNKNOWN_WARD_CODE = "99999"
UNKNOWN_LOCATION = generate_location_id(
province_code=UNKNOWN_PROVINCE_CODE,
district_code=UNKNOWN_DISTRICT_CODE,
ward_code=UNKNOWN_WARD_CODE,
)
```
```python
# public/__init__.py
from your_module.data.constants import UNKNOWN_LOCATION
from .utils import generate_location_id
# public/utils.py
def generate_location_id():
pass
```
- Good
```text
your_module
|_ data
| |_ constants.py
|_ utils
| |_ __init__.py
| |_ generate_location_id.py
|_ public
|_ __init__.py
|_ utils.py
```
```python
# data/constants.py
from your_module.utils import generate_location_id
UNKNOWN_PROVINCE_CODE = "99"
UNKNOWN_DISTRICT_CODE = "999"
UNKNOWN_WARD_CODE = "99999"
UNKNOWN_LOCATION = generate_location_id(
province_code=UNKNOWN_PROVINCE_CODE,
district_code=UNKNOWN_DISTRICT_CODE,
ward_code=UNKNOWN_WARD_CODE,
)
```
```python
# public/__init__.py
from your_module.data.constants import UNKNOWN_LOCATION
from .utils import generate_location_id
# public/utils.py
from your_module.utils import generate_location_id
```
```python
# utils/__init__.py
from .generate_location_id import generate_location_id
# utils/generate_location_id.py
def generate_location_id():
pass
```
### 4.6. Code quality tools
- Auto-formatter:
- Use `black` (recommended)
```bash
black <your_module>
```
- Use `Pycharm`
```text
Ctrl Alt L
```
- PEP 8 warnings:
- Use `flake8`
```bash
flake8 <your_module>
```
- Use `Pycharm`
## 5. Testing
Attribution: https://docs.google.com/document/d/1XBTWa_ZJs1vd4xrOLRgwZvEZtcVjCyh7/
#### The unittests
- The unittests of each file should be placed in the same folder as that file.
- The unittest's filename must end with `_test`.
```text
|_ preprocessing
|_ clean.py
|_ clean_test.py
```
- A unittest method must start with `test_`.
```python
# clean_test.py
def test_clean(spark):
pass
```
- To run the unittests:
```bash
pip install pytest-cov
pytest --cov=<path_of_your_module> <path_of_your_module>
```
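The `spark` argument used by the tests above is a pytest fixture. If your project does not already receive one from a shared `conftest.py`, a minimal sketch (an assumption, not the team's canonical fixture) could look like this:
```python
# conftest.py - a local SparkSession fixture for unittests
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    session = (
        SparkSession.builder
        .master("local[2]")
        .appName("unittest")
        .getOrCreate()
    )
    yield session
    session.stop()
```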
#### Testcases
- Should contain at least one happy case.
```python
# Good example
@pytest.mark.parametrize("testcase", [
{
"name": "Happy case",
"input": cleaned_df, # Cleaned data no need cleaning
"expected": cleaned_df,
},
# ...
])
def test_clean(spark, testcase):
...
# Better example - better name
@pytest.mark.parametrize("testcase", [
{
"name": "Cleaned data do not need cleaning",
"input": cleaned_df,
"expected": cleaned_df,
},
# ...
])
def test_clean(spark, testcase):
...
```
- Each test case should have a clear name describing what it does or expects.
```python
# Bad
@pytest.mark.parametrize("testcase", [
# ...
{
"name": "Unhappy case",
"input": raw_data,
"expected": trimmed_data,
},
# ...
])
def test_clean(spark, testcase):
...
# Good
@pytest.mark.parametrize("testcase", [
# ...
{
"name": "Column X, Y, Z should be trimmed",
"input": raw_data,
"expected": trimmed_data,
},
# ...
])
def test_clean(spark, testcase):
...
```
#### Testable logic
Your logic's input(s) and output(s) should be DataFrame(s), so they can be tested by pytest. The Data-Reading and Data-Writing steps should not be part of the logic.
```python
# logic/main_processor.py
class MainProcessor:
@staticmethod
def process(terminal: DataFrame, merchant: DataFrame) -> DataFrame:
"""
Do all steps: preprocessing -> transformation
"""
# Data Preprocessing
cleaned_terminal = clean_terminal(terminal)
cleaned_merchant = clean_merchant(merchant)
# Data Transformation
dim_terminal = cleaned_terminal.transform(populate_merchant_info, merchant=cleaned_merchant)
# Data Calculation
# In this example we have no calculation
return dim_terminal
# logic/main_processor_test.py
def test_process(spark):
pass
# main.py
def main():
# ...
# Data Reading
terminal_df = spark.read.parquet(path=RAW_TERMINAL_PATH)
merchant_df = spark.read.parquet(path=RAW_MERCHANT_PATH)
    # Logic (Data Preprocessing, Transformation, Calculation, etc.)
dim_terminal = MainProcessor.process(terminal=terminal_df, merchant=merchant_df)
# Data Writing
dim_terminal.write.parquet(path=DIM_TERMINAL_PATH)
# ...
```
#### How to write testcases before coding?
Follow these steps, without thinking about the code yet, and you will know how to write testcases before coding:
- Make a mockup of your input(s); call it `input`.
- Make a mockup of your output; call it `output`.
- Make some temporary data representing your logic steps; call them `cleaned_input`, `enriched_input`, etc.
The logic will be: `input` => `cleaned_input` => `enriched_input` => `output`
- Now write testcases for each transition (the `=>` symbol). If there are many testcases for one transition, it must be a complex step and you should break it down into smaller steps.
- Code each transition as a method. It can call smaller methods for clearer code; those methods start with `_` and may have no testcases if they are simple.
```python
# preprocessor_test.py
def test_clean(spark):
test_cases = [
{
"name": "Happy case, input is cleaned",
# ...
},
{
"name": "Invalid records should be removed",
# ...
},
{
"name": "Test records should be removed",
# ...
},
# ...
]
# ...
# preprocessor.py
def _remove_invalid_records(df: DataFrame) -> DataFrame:
pass
def _remove_test_records(df: DataFrame) -> DataFrame:
pass
def clean(df: DataFrame) -> DataFrame:
return (
df
.transform(_remove_invalid_records)
.transform(_remove_test_records)
)
```
#### Input/Output data-type testing
This is important because:
- Your method's input table is another method's output table, and vice versa.
- Your method assumes a certain table schema / certain column names.
- Unit tests alone are insufficient, since they assume the input does not change.
Instructions:
- Define your output schema so that it matches our Data Dictionary.
```python
# your_module/public/schema.py
from pyspark.sql import types as T
# can name it OUTPUT_SCHEMA if too long
DIM_XYZ_SCHEMA = T.StructType([
T.StructField("id", T.LongType(), True),
# Other fields go here
])
```
- Define the input schema(s) and output schema of your component. They can be imported from other modules' output schemas, or from another component.
```python
# your_module/logic/transform/processor.py
from another_module.public.schema import ANOTHER_SCHEMA
class Processor:
INPUT_SCHEMA = ANOTHER_SCHEMA
OUTPUT_SCHEMA = T.StructType([
# ...
])
```
- Provide at least one testcase that uses the above schemas.
```python
# your_module/logic/transform/processor_test.py
@pytest.mark.parametrize("test", [
{
"name": "Happy case",
"input_schema": Processor.INPUT_SCHEMA,
},
])
def test_schema(spark, test):
expected_df = spark.createDataFrame([], schema=Processor.OUTPUT_SCHEMA)
input_df = spark.createDataFrame([], schema=test["input_schema"])
actual_df = Processor.process(input_df)
assert_dataframe_schema_equals(actual_df, expected_df, message=test["name"])
```
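`assert_dataframe_schema_equals` above is assumed to be a shared test helper. If your project does not have one yet, a minimal sketch comparing column names and data types could look like this:
```python
# A hypothetical helper; the real one may already live in a shared/common test module
from pyspark.sql import DataFrame


def assert_dataframe_schema_equals(actual: DataFrame, expected: DataFrame, message: str = "") -> None:
    """Compare column names and data types of two DataFrames."""
    actual_fields = [(field.name, field.dataType) for field in actual.schema.fields]
    expected_fields = [(field.name, field.dataType) for field in expected.schema.fields]
    assert actual_fields == expected_fields, (
        f"{message}: schema mismatch\nactual:   {actual_fields}\nexpected: {expected_fields}"
    )
```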
## 6. Logging
```python
import logging
from common.logging import setup_logging
setup_logging()
logger = logging.getLogger()
logger.info("It worked!")
```
Your log file will be stored in Minio at path `prod-pipeline-log/{app_name}--{app_id}.log`
## 7. Pipeline set-up
You should have 2 or 3 pipelines as below:
- A pytest pipeline
- A main pipeline
- (Optional) A copy-snapshot-to-current pipeline (for dim)
Please see `pipeline/etl.yml` for some examples.
## 8. Changes announcement
- Any change you make (common changes, data design changes, etc.) that can affect others' work should be announced in our Slack channel `data` or in a specific team channel.
> Hi @channel, dim `date` has just been updated: the data type of `date_id` was changed from StringType to LongType.
- If a method is not good and you want to propose a new one, please deprecate the old one in a backward-compatible way, then fire a warning both in the code and in the channel.
```python
import warnings
def new_method():
pass
def old_method():
warnings.warn(
"'old_method' was deprecated, use 'new_method' instead",
DeprecationWarning
)
# Backward Compatible
new_method()
```
> Hi @channel, `old_method` was deprecated, please use `new_method` instead for better performance. We'll keep a backward compatible version of `old_method` and remove it in the future.
- If the usage changes along with the data, you should provide the new usage.
> Hi @channel, dim `xyz` was changed from type-0 to type-1, with 2 new sub-directories: `current` and `snapshot`. Anyone using this dim should use the `current` sub-directory; see the example below:
```python
# old: vnpay-prod/etl/dim/xyz
EtlVnpayPath.DIM_XYZ.generate(env='prod')
# new: vnpay-prod/etl/dim/xyz/current
EtlVnpayPath.DIM_XYZ.generate(env='prod', partition='current')
```
## 9. Performance
### 9.1. Partitioning
Repartitioning a DataFrame before writing it can avoid skewed and fragmented output.
- Example 1: Given a fact table partitioned by date as below
```text
|_ fact
|_ date=20200101
|_ date=20200102
|_ ...
```
We expect that each partition `date=YYYYMMDD` should contain only one parquet file. To do that, use `fact.repartition('date')` as below:
```python
fact.repartition('date').write.parquet(path='fact', partitionBy=['date'], mode='overwrite')
```
- Example 2: Given a dim partitioned by processing date, but we want to write to a specific partition
```text
|_ dim
|_ date=20200101
|_ date=20200102
|_ ...
```
We expect that each partition `date=YYYYMMDD` should contain only one parquet file. To do that, use `dim.coalesce(1)` as below:
```python
dim.coalesce(1).write.parquet(path='dim/date=20200103', mode='overwrite')
```
### 9.2. Joining
When joining two DataFrames, it's better to place the bigger DataFrame on the left.
```python
bigger_df.join(smaller_df, on='...', how='...')
```
If the right one is small enough (almost every dimension is small, except customer-based dims), you can force a broadcast join.
```python
from pyspark.sql import functions as F
bigger_df.join(F.broadcast(smaller_df), on='...', how='...')
```
When joining multiple DataFrames, small DataFrames should be pre-joined together before joining them with the bigger one.
```python
bigger_df.join(smaller_df_1, on='...').join(smaller_df_2, on='...')
# Better
smaller_df = smaller_df_1.join(smaller_df_2, on='...')
bigger_df.join(smaller_df, on='...')
```