# 1. Oveview
In this work, we created a data pipeline to load data from the S3 bucket onto Snowflake. During the pipeline, we apply the Slowly Changing Dimension (SCD) type 2 technique to track the historical data.
We collect data from [data.world](https://data.world/dr5hn/country-state-city). The dataset contains all countries, states, and cities all over the world with more than 200k records. We assume that City records are like records in the fact table in Data Warehouse, which we insert many times. State and Country are considered as a dimension table and be bulk load directly from the S3 to Snowflake.
# 2. External stage S3
The external stage references data files stored in a location outside of Snowflake (e.g: S3). Here are some steps to create an External Stage S3.
## a. IAM Policy
Snowflake requires some permissions on an S3 bucket and folder to be able to access files in the folder (and sub-folders). So that we need to create a policy in AWS, then Snowflake can access S3.
```ruby=
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3:::luattb-tuvd11-bucket/db/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::luattb-tuvd11-bucket",
"Condition": {
"StringLike": {
"s3:prefix": [
"db/*"
]
}
}
}
]
}
```
This policy gives us permission to read, write and delete objects from the S3 bucket. I named this policy **snowflake policy** to easily use in the next step.
## b. IAM Role
We created a new role **mysnowflakerole** then attach **snowflake policy**. The aim of these steps is for security purposes, the role without permission can't access our source in S3. We modify a bit in the trust relationship section follow this:
```ruby=
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::337570307036:user/k190-s-sgst2910"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "RT97487_SFCRole=3_hIOeN5Y0N+9cZgzFf4K3i40Kn9Q="
}
}
}
]
}
```
> Caution: AWS and ExternalId will be modified later based on Snowflake.
## c. Create a Cloud Storage Integration in Snowflake
> A storage integration is a Snowflake object that stores a generated identity and access management (IAM) user for your S3 cloud storage.
* First of all, we created a storage integration by Snowflake command line. The storage_aws_role-arnn is from **mysnowflakerole**
```ruby=
create or replace storage integration s3_sf_int
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = 'arn:aws:iam::294164302872:role/mysnowflakerole'
storage_allowed_locations = ('s3://luattb-tuvd11-bucket/db/')
```
* Next, we describe the **s3_sf_int** to get AWS and ExternalId that be mentioned above.
```
DESC INTEGRATION s3_sf_int;
```
The output looks like this:

We modified "mysnowflakerole" trust relationship with the red sections
* Then, we created a new role in snowflake that have permission to access integration.
```ruby=
create or replace role mysnowflakerole;
grant create stage on schema dim_fact_db to role mysnowflakerole;
grant usage on integration s3_sf_int to role mysnowflakerole;
```
* Besides that, we created CSV file format to easily use in the future. Cause City.csv file is difficult to separate by a comma, so we created both "," and "|" delimiter CSV file format.
```ruby=
//seperate by comma ","
create or replace file format my_csv_format
type = csv
field_delimiter = ','
skip_header = 1
null_if = ('NULL', 'null')
empty_field_as_null = true
//seperate by "|"
create or replace file format csv_format_col
type = csv
field_delimiter = '|'
skip_header = 1
null_if = ('NULL', 'null')
empty_field_as_null = true
```
* Finally, we created an external stage.
```ruby=
create or replace stage my_s3_stage
storage_integration = s3_sf_int
url = 's3://luattb-tuvd11-bucket/db/'
file_format = csv_format_col;
```
# 3. Steam and task using SCD (Slowly Changing Dimension)
## Perform SCD
SCDs are a common database modeling technique used to capture data in a table and show how it changes over time. Although SCDs are most commonly associated with dimensions.
In this work, we built an SCD in Snowflake is simple using the Streams and Tasks functionalities that Snowflake will use Snowpipe.
## Stream
A stream object captures data manipulation language (DML) changes made to a table, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data (rows & feaures). This process is referred to as change data capture (CDC).
My table stream tracks the changes made to rows in a source table. A table stream (landing table) makes a “change table” available of what changed, at the row level, between two transactional points of time in a table.
When a Stream is being created on a table, it takes a "current_flag" of every row in the source table as the current transactional version of the table. Stream store the "current transactional" of table data and return the "root data" by using versioning history for the source table.
## Task
When combined with streams to make an end-to-end data pipeline, we would need to create Snowflake tasks that can help you to schedule a single SQL or a stored procedure.
We can set up end to end data pipeline to get the data flow from source to master table while maintaining the Type 2 Slowly Changing Dimension.
Specifically part **3b** are the tasks that we will have to do as part of the ETL process.
## a. Create table
### Dimension table
- **Country dim table:** Record countries data including name, code and region of that country. This table functions as a dimension table for the fact table City.
```ruby=
create or replace TABLE SF_DATABASE.DIM_FACT_DB.COUNTRY_DIM (
COUNTRY_CODE NUMBER(38,0) autoincrement,
NAME VARCHAR(50),
REGION VARCHAR(25),
SYMBOL VARCHAR(25)
);
```
- **State dim table:** Record states data including name, id and region of that state of each country. This table functions as a dimension table for the fact table City.
```ruby=
create or replace TABLE SF_DATABASE.DIM_FACT_DB.STATE_DIM (
STATE_ID NUMBER(38,0) autoincrement,
NAME VARCHAR(50),
STATE VARCHAR(25)
);
```
### Fact table
- **City source/landing/root Table**:
Although the attributes in 3 tables are the same, each table has a different task:
+ Source table: contains data loaded from our external stage.
+ Landing table: contain and record the data when performing Stream object captures this data.
+ Root table: contains data of source table without update/delete.
```ruby=
create or replace TABLE SF_DATABASE.DIM_FACT_DB.CITY_SRC (
ID_CITY NUMBER(38,0) NOT NULL,
NAME VARCHAR(100),
ID_COUNTRY NUMBER(38,0),
ID_STATE NUMBER(38,0),
LAT FLOAT,
LONG FLOAT
);
```
- **City History Table:** Record historical data when performing insert/delete/update events in a Stream object captures data.
-- START_TIME:Stream start time.
-- END_TIME: Time of a stream when the update/delete is done.
-- CURRENT_FLAG:
+ Flag = 1, for the new rows with the latest record that has just been inserted/deleted.
+ Flag = 0, for rows with updated or deleted records, along with END_TIME data.
```ruby=
create or replace TABLE SF_DATABASE.DIM_FACT_DB.CITY_HISTORY (
ID_CITY NUMBER(38,0) NOT NULL,
NAME VARCHAR(100),
ID_COUNTRY NUMBER(38,0),
ID_STATE NUMBER(38,0),
LAT FLOAT,
LONG FLOAT,
START_TIME TIMESTAMP_NTZ(9),
END_TIME TIMESTAMP_NTZ(9),
CURRENT_FLAG NUMBER(38,0)
);
```
## b. Pipeline

##### Details of the tasks are as the following:
- **TASK01:** Truncate Source Table before every load.
- **TASK02:** Load data from raw file(in External Stage) to Source Table and remove the file from External stage on successful load.
- **TASK03:** Update data into Landing table with the latest Source table data.
- **TASK04:** Transform Landing to History table, based on tracking object Stream into Landing table.
- **TASK05:** Update Root table from History table , processing only current_flag=1 records.
## c. Perform to stream and tasks.
We use **load_wh** warehouse for loading task, **compute_wh** for transforming task. Here is our work to create pipeline from S3 to Snowflake
* The first task, we truncate the source table to assure that is empty. So that we don't need to merge a lot into a landing table. It is a root task, that happen every 5 minutes.
```ruby=
create or replace task city_history_root
warehouse=compute_wh
schedule='5 minute'
as
truncate table city_src
```
* And we create **city_history_stream** on **landing** table to catch insert/update/delete event.
```ruby=
create or replace task create_city_history_stream
warehouse=compute_wh
after csv_file_to_city_table
as
create or replace stream city_history_stream on table dim_fact_db.city_landing
```
* Next, we load data from S3 to source table. We just use the necessary column from CSV file
```ruby=
create or replace task csv_file_to_city_table
warehouse=load_wh
after create_city_history_stream
as
copy into city_src from(
select t.$1,t.$2,t.$6,t.$3,t.$9,t.$10
from @my_s3_stage/city (pattern=>'.*.csv',file_format=>csv_format_col) t
)
```
* After loading to the source table, we compare it with the landing table. If there is any new records or update, we will insert or update them into the landing.
```ruby=
create or replace task merge_city_from_raw_src
warehouse=compute_wh
after csv_file_to_city_table
as
merge into city_landing ct
using (select * from city_src) cts
on ct.id_city=cts.id_city
when not matched
then
insert(id_city,name,id_country,id_state,lat,long)
values (cts.id_city,cts.name,cts.id_country,cts.id_state,cts.lat,cts.long)
when matched and (ct.id_city!=cts.id_city or ct.name!=cts.name or ct.id_country!=cts.id_country
or ct.id_state!=cts.id_state or ct.lat!=cts.lat or ct.long!=cts.long)
then
update set
ct.id_city=cts.id_city, ct.name=cts.name, ct.id_country=cts.id_country,
ct.id_state=cts.id_state, ct.lat=cts.lat , ct.long=cts.long
```


* Then, The stream will catch events from the landing table (i.e insert/delete/update). Then compare it with the history table, if there are new records, we will insert them into the history table. If there are updated or deleted records, we will change the old records flag to 0 (i.e not in use), and endd_time.
```ruby=
create or replace task merge_final_table_scd2
warehouse=compute_wh
after merge_city_from_raw_src
as
merge into city_history ch
using (select * from city_history_stream) chs
on ch.id_city = chs.id_city and ch.name=chs.name and ch.id_state=chs.id_state and ch.lat=chs.lat and ch.long=chs.long
when not matched and (chs.metadata$action='INSERT') //insert and update in historical
then
insert(id_city,name,id_country,id_state,lat,long,start_time,end_time,current_flag)
values (chs.id_city,chs.name,chs.id_country,chs.id_state,chs.lat,chs.long,to_timestamp(current_timestamp),NULL, 1)
when matched and (chs.metadata$action='DELETE') //update and delete
then
update set ch.current_flag=0,
ch.end_time=to_timestamp(current_timestamp);
```

* After all, we will insert all records with flag=1 (i.e in use) from history table into the master table (city)
```ruby=
create or replace task insert_into_master_city
warehouse=compute_wh
after merge_final_table_scd2
as
insert overwrite into city
select id_city,name,id_country,id_state,lat,long from city_history where current_flag=1;
```
* Finally, we remove all available files in the S3 bucket to avoid redundant inserts in the future.
```ruby=
create or replace task src_file_remove
warehouse=load_wh
after insert_into_master_city
as
remove @MY_S3_STAGE/city pattern='.*.csv';
```
* Resume these task to automatically run the pipeline every 5 minutes
```ruby=
alter task create_city_history_stream resume;
alter task csv_file_to_city_table resume;
alter task merge_city_from_raw_src resume;
alter task merge_final_table_scd2 resume;
alter task insert_into_master_city resume;
alter task src_file_remove resume;
alter task city_history_root resume;
//root should be suspend when we want to alter child task
```
# 4. Visualize
## a. How to connect Snowflake to Power BI
**Step1:** Choose to get data from another source.

**Step2:** Connect to Power BI using your console on Snowflake

## b. Visualizing
#### Complete the following requiremnts:
1. Visualize the locations of countries around the world by a map chart using our Snowflake dataset.
2. Total number of countries in the dataset
2.1 Number of states of each country.
2.2 Total of cities of each states in this country.
3. Visualize a chart to output the list of states for each selected country.
3.1 Count and compare the number of cities in a certain state with many other states of a country.
3.2 Find the location of states and cities on the map chart.
4. Visualize a columns chart to rank the states of that country, which state has the most number of cities.
#### Visualization
For example, we choose the region as Asia, and the country here Vietnam to visualize.

--------------------------------------------