# [PIC] DMS CDC with KDS and Lambda
## AWS Event Engine ##
URL: https://dashboard.eventengine.run/login?hash=6fce-1f5cb570e4-65
選擇 One-time password (OTP)

設定一個 Team Name (e.g. AWS Jack Hsu)


# Resources #
錄影檔:
# 0. Environment Setting #
0. Team Dashboard Download SSH Key
1. Create DMS Replication Instance
* Name: replication-instance (DMS identifiers allow only letters, digits, and hyphens)
* Instance Type: r5.xlarge
* VPC: DmsVpc
* Single-AZ
2. Create Kinesis Data Streams
* player-kds
3. Create ElastiCache - Redis
* Name: player-redis
* Cluster mode: Disabled
* redis-subnet-group-1
* VPC: DmsVpc
* Advanced settings Security: RDSSecurityGroup
4. IAM Role
* DMS Service role: dms-admin-services-role (Attached AdministratorAccess)
* Lambda Services role: lambda-admin-services-role
5. Create Cloud9 (選擇性:只是為了要建立 redis 的 package)
6. Security Group
* InstanceSecurityGroup
Edit inbound rules
Add TCP 3389 (0.0.0.0/0)
* RDSSecurityGroup
Edit inbound rules
Add TCP 6379
# 1. Enable MSSQL CDC #
0. RDP Login on EC2
1. CDC Permission
```
-- One-time SQL Server setup required before DMS can capture ongoing changes:
-- grant the DMS login sysadmin, force FULL recovery on the source DB, and
-- configure this instance as its own replication distributor and publisher.
--Set master database context
use [master]
GO
--Add the awssct login to the sysadmin server role - required for replication
ALTER SERVER ROLE [sysadmin] ADD MEMBER [awssct]
GO
--Set the recovery model to full for dms_sample - required for replication
ALTER DATABASE [dms_sample] SET RECOVERY FULL WITH NO_WAIT
GO
--Configure this SQL Server as its own distributor
-- NOTE(review): distributor password is hard-coded for the lab; do not reuse outside it.
exec sp_adddistributor @distributor = @@SERVERNAME, @password = N'Password1'
-- Create the distribution database; retention: min 0h / max 72h, history 48h,
-- @security_mode = 1 means Windows authentication.
exec sp_adddistributiondb @database = N'distribution', @data_folder = N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\DATA', @log_folder = N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\Data', @log_file_size = 2, @min_distretention = 0, @max_distretention = 72, @history_retention = 48, @security_mode = 1
GO
--Change context to the distribution database
use [distribution]
GO
--Configure replication
-- Record the snapshot folder via the UIProperties extended property,
-- creating the table first if it does not exist yet.
if (not exists (select * from sysobjects where name = 'UIProperties' and type = 'U '))
create table UIProperties(id int)
if (exists (select * from ::fn_listextendedproperty('SnapshotFolder', 'user', 'dbo', 'table', 'UIProperties', null, null)))
EXEC sp_updateextendedproperty N'SnapshotFolder', N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\ReplData', 'user', dbo, 'table', 'UIProperties'
else
EXEC sp_addextendedproperty N'SnapshotFolder', N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\ReplData', 'user', dbo, 'table', 'UIProperties'
GO
-- Register this server as its own publisher (Windows auth, @security_mode = 1)
exec sp_adddistpublisher @publisher = @@SERVERNAME, @distribution_db = N'distribution', @security_mode = 1, @working_directory = N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\ReplData', @trusted = N'false', @thirdparty_flag = 0, @publisher_type = N'MSSQLSERVER'
GO
```
2. CDC Enable DB
```
-- Switch to the source database and enable CDC at the database level;
-- this must be done before any table can be CDC-enabled.
use dms_sample
EXEC sys.sp_cdc_enable_db
```
3. sp_cdc_enable_table
```
-- Enable CDC on dbo.player so DMS can stream its row changes.
-- @role_name = NULL: no gating role is created for reading the change data.
-- @supports_net_changes = 1: requires a primary key or unique index on the table.
exec sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'player',
@role_name = NULL,
@supports_net_changes = 1
GO
```
# 2. DMS
1. Create Source Endpoint for MSSQL
Name: mssql-source
Type: Source
Source Engine: Microsoft SQL Server
Access to endpoint database
Provide access information manually
Server Name: ec2-18-183-45-68.ap-northeast-1.compute.amazonaws.com
Database Name: dms_sample
User Name: awssct
Password: Password1
2. Create Target Endpoint for KDS
Copy KDS ARN: arn:aws:kinesis:ap-northeast-1:025363318636:stream/player-kds
Copy DMS service access role ARN (IAM role, required by the Kinesis target endpoint): arn:aws:iam::025363318636:role/dms-admin-services-role
Name: kds-target
Target engine: Kinesis
3. Create Database migration tasks
Name: mssql-kds-cdc-task
Replication instance
Migration type: Replicate data changes only
Table mapping:
```
{
"rules": [
{
"rule-type": "selection",
"rule-id": "083388160",
"rule-name": "083388160",
"object-locator": {
"schema-name": "dbo",
"table-name": "player"
},
"rule-action": "include",
"filters": []
},
{
"rule-type": "object-mapping",
"rule-id": "2",
"rule-name": "2",
"rule-action": "map-record-to-record",
"target-table-name": "PLAYER",
"object-locator": {
"schema-name": "dbo",
"table-name": "player"
},
"mapping-parameters": {
"partition-key-type": "schema-table"
}
}
]
}
```
# 3. SQL example
```
-- Sample DML that generates one CDC event of each kind (delete / insert / update)
-- on dbo.player, so the downstream KDS -> Lambda -> Redis path can be observed.
DELETE FROM dbo.player
-- NOTE(review): 'HSU' here vs the inserted 'Hsu' — this matches only under a
-- case-insensitive collation (the SQL Server default); confirm on your instance.
WHERE last_name = 'HSU' and first_name = 'Jack';
INSERT INTO dbo.player
(sport_team_id,
last_name,first_name,full_name)
VALUES
('62',
'Hsu','Jack','Jack Hsu');
-- The update writes a timestamp into full_name purely to make the change visible downstream.
UPDATE dbo.player
SET full_name = '2022-09-01 00:45'
WHERE last_name = 'HSU' and first_name = 'Jack';
```
# 4. Lambda #
* Lambda Env
1. Add Layer for Redis package
```
# Build a Lambda layer zip containing the redis client package.
# Lambda layers for Python must place packages under a top-level "python/" directory.
mkdir python                 # BUG FIX: was "mkdir pytho", so the "cd python" below failed
cd python
pip3 install redis -t .      # vendor the redis package into the current directory
ls -ltr
rm -rf *dist-info            # drop package metadata to shrink the layer
ls -ltr
cd ..
zip -r redis-lambda-package.zip python
```
2. Runtime: Python 3.9
3. Layer: Redis Layer
4. VPC: DmsVpc
5. Service Role: lambda-admin-services-role
* Lambda - DMS Record print log
Name: kds-redis-print-log
Add Layer -> Custom Layers ->
Add Trigger: KDS <- kinesis/player-kds
```
from __future__ import print_function
import boto3
import time
import sys
import socket
import json
import base64
import logging
import dateutil.tz
import redis
from datetime import timedelta, date, datetime
asiaTaipei = dateutil.tz.gettz('Asia/Taipei')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
for record in event['Records']:
#Kinesis data is base64 encoded so decode here
kinesis=record["kinesis"]
print("Kinesis payload: %s" %(str(kinesis)))
payload=base64.b64decode(record["kinesis"]["data"])
print("Decoded payload: %s" %(str(payload)))
# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Lambda return statusCode: 200')
}
```
* DMS CDC -> Redis
```
from __future__ import print_function
import boto3
import time
import sys
import socket
import json
import base64
import logging
import dateutil.tz
import redis
from datetime import timedelta, date, datetime
asiaTaipei = dateutil.tz.gettz('Asia/Taipei')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
#elasticache settings
redisClient = redis.StrictRedis(host='player-redis-001.rw9www.0001.apne1.cache.amazonaws.com', port=6379, db=0, password=None, socket_timeout=None, connection_pool=None, charset='utf-8', errors='strict', unix_socket_path=None)
for record in event['Records']:
#Kinesis data is base64 encoded so decode here
payload=base64.b64decode(record["kinesis"]["data"])
print("Decoded payload: %s" %(str(payload)))
cdcdata=json.loads(payload)
oper=str(cdcdata["metadata"]["operation"])
record=cdcdata["data"]
print("metadata.operation: %s" %(oper) )
print("record: %s" %(record) )
playerid=record["id"]
try:
if oper == "insert":
redisClient.set( playerid, json.dumps(record) )
message="operation function insert for PLAYER ID '%s' " %(playerid)
print("Redis get %s" %(redisClient.get(playerid)) )
if oper == "update":
redisClient.set( playerid, json.dumps(record) )
message="operation function update for PLAYER ID '%s' " %(playerid)
print("Redis get %s" %(redisClient.get(playerid)) )
if oper == "delete":
redisClient.delete( playerid )
message="operation function delete for PLAYER ID '%s' " %(playerid)
print("Redis get %s" %(redisClient.get(playerid)) )
except Exception as e:
print("Exception: " + e)
#logging for monitoring
log = {
'created': now.strftime("%Y-%m-%d %H:%M:%S"),
'services': 'USERS', #(USERS/CODE/SALES/INTERACTION)
'transaction': 'kds_redshift', #(kds_dynamodb / kds_aurora_mysql / kds_dynamodb / kds_redshift)
'operation': oper, #( insert / update / delete )
'status': 'failed', #( success / failed )
'message': e,
'tag': 'devax_log_insight',
'data': record
};
print(log)
#logging for monitoring
now = datetime.now(tz=asiaTaipei)
log = {
'created': now.strftime("%Y-%m-%d %H:%M:%S"),
'services': 'USERS', #(USERS/CODE/SALES/INTERACTION)
'transaction': 'kds_redis', #(kds_dynamodb / kds_aurora_mysql / kds_dynamodb / kds_redshift)
'operation': oper, #( insert / update / delete )
'status': 'success', #( success / failed )
'message': message,
'tag': 'devax_log_insight',
'data': record
};
print(log)
# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Lambda return statusCode: 200')
}
```