# [PIC] DMS CDC with KDS and Lambda

## AWS Event Engine

## URL: https://dashboard.eventengine.run/login?hash=6fce-1f5cb570e4-65

Select One-time password (OTP)

![](https://i.imgur.com/Jf3cEhi.png)

Set a Team Name (e.g. AWS Jack Hsu)

![](https://i.imgur.com/1QP4COx.png)

![](https://i.imgur.com/eOXYYQQ.png)

# Resources

# Recording:

# 0. Environment Setting

0. Team Dashboard: download the SSH key
1. Create DMS Replication Instance
    * Name: replication-instance
    * Instance class: dms.r5.xlarge
    * VPC: DmsVpc
    * Single-AZ
2. Create Kinesis Data Streams
    * player-kds
3. Create ElastiCache - Redis
    * Name: player-redis
    * Cluster mode: Disabled
    * Subnet group: redis-subnet-group-1
    * VPC: DmsVpc
    * Advanced settings > Security: RDSSecurityGroup
4. IAM Role
    * DMS service role: dms-admin-services-role (AdministratorAccess attached)
    * Lambda service role: lambda-admin-services-role
5. Create Cloud9 (optional: only needed to build the redis package)
6. Security Group
    * InstanceSecurityGroup: edit inbound rules, add TCP 3389 (0.0.0.0/0)
    * RDSSecurityGroup: edit inbound rules, add TCP 6379

# 1. Enable MSSQL CDC

0. RDP login to the EC2 instance
1. CDC permissions

```
--Set master database context
use [master]
GO

--Add the awssct login to the sysadmin server role - required for replication
ALTER SERVER ROLE [sysadmin] ADD MEMBER [awssct]
GO

--Set the recovery model to full for dms_sample - required for replication
ALTER DATABASE [dms_sample] SET RECOVERY FULL WITH NO_WAIT
GO

--Configure this SQL Server as its own distributor
exec sp_adddistributor @distributor = @@SERVERNAME, @password = N'Password1'

exec sp_adddistributiondb @database = N'distribution',
    @data_folder = N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\DATA',
    @log_folder = N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\Data',
    @log_file_size = 2, @min_distretention = 0, @max_distretention = 72,
    @history_retention = 48, @security_mode = 1
GO

--Change context to the distribution database
use [distribution]
GO

--Configure replication
if (not exists (select * from sysobjects where name = 'UIProperties' and type = 'U '))
    create table UIProperties(id int)
if (exists (select * from ::fn_listextendedproperty('SnapshotFolder', 'user', 'dbo', 'table', 'UIProperties', null, null)))
    EXEC sp_updateextendedproperty N'SnapshotFolder', N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\ReplData', 'user', dbo, 'table', 'UIProperties'
else
    EXEC sp_addextendedproperty N'SnapshotFolder', N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\ReplData', 'user', dbo, 'table', 'UIProperties'
GO

exec sp_adddistpublisher @publisher = @@SERVERNAME, @distribution_db = N'distribution',
    @security_mode = 1,
    @working_directory = N'C:\Program Files\Microsoft SQL Server\MSSQL14.MSSQLSERVER\MSSQL\ReplData',
    @trusted = N'false', @thirdparty_flag = 0, @publisher_type = N'MSSQLSERVER'
GO
```

2. Enable CDC on the database

```
use dms_sample
EXEC sys.sp_cdc_enable_db
```

3. Enable CDC on the table (sp_cdc_enable_table)

```
exec sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'player',
    @role_name = NULL,
    @supports_net_changes = 1
GO
```
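Before wiring up DMS, it is worth confirming that CDC really took effect. A minimal verification sketch in Python, assuming pyodbc and the Microsoft ODBC driver are installed on a machine that can reach the instance; the host and credentials are the workshop values used for the source endpoint in the next section:

```
import pyodbc

# Workshop values reused from the DMS source endpoint; adjust to your environment.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=ec2-18-183-45-68.ap-northeast-1.compute.amazonaws.com;"
    "DATABASE=dms_sample;UID=awssct;PWD=Password1"
)
cur = conn.cursor()

# Database-level flag set by sys.sp_cdc_enable_db
cur.execute("SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'dms_sample'")
print(cur.fetchone())  # expect is_cdc_enabled = 1

# Table-level flag set by sys.sp_cdc_enable_table
cur.execute("SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'player'")
print(cur.fetchone())  # expect is_tracked_by_cdc = 1
```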
# 2. DMS

1. Create Source Endpoint for MSSQL
    * Name: mssql-source
    * Type: Source
    * Source engine: Microsoft SQL Server
    * Access to endpoint database: provide access information manually
    * Server Name: ec2-18-183-45-68.ap-northeast-1.compute.amazonaws.com
    * Database Name: dms_sample
    * User Name: awssct
    * Password: Password1
2. Create Target Endpoint for KDS
    * Name: kds-target
    * Target engine: Kinesis
    * Kinesis Stream ARN: arn:aws:kinesis:ap-northeast-1:025363318636:stream/player-kds
    * Service access role ARN: arn:aws:iam::025363318636:role/dms-admin-services-role
3. Create database migration task
    * Name: mssql-kds-cdc-task
    * Replication instance: replication-instance
    * Migration type: Replicate data changes only
    * Table mapping:

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "083388160",
            "rule-name": "083388160",
            "object-locator": {
                "schema-name": "dbo",
                "table-name": "player"
            },
            "rule-action": "include",
            "filters": []
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "target-table-name": "PLAYER",
            "object-locator": {
                "schema-name": "dbo",
                "table-name": "player"
            },
            "mapping-parameters": {
                "partition-key-type": "schema-table"
            }
        }
    ]
}
```

# 3. SQL example

```
DELETE FROM dbo.player WHERE last_name = 'Hsu' and first_name = 'Jack';

INSERT INTO dbo.player (sport_team_id, last_name, first_name, full_name)
VALUES ('62', 'Hsu', 'Jack', 'Jack Hsu');

UPDATE dbo.player SET full_name = '2022-09-01 00:45' WHERE last_name = 'Hsu' and first_name = 'Jack';
```

# 4. Lambda

* Lambda Env
    1. Add Layer for the Redis package (build it on Cloud9):

```
mkdir python
cd python
pip3 install redis -t .
ls -ltr
rm -rf *dist-info
ls -ltr
cd ..
zip -r redis-lambda-package.zip python
```

    2. Runtime: Python 3.9
    3. Layer: Redis Layer
    4. VPC: DmsVpc
    5. Service role: lambda-admin-services-role

* Lambda - DMS Record print log
    * Name: kds-redis-print-log
    * Add Layer -> Custom Layers -> Redis Layer
    * Add Trigger: Kinesis <- player-kds

```
import json
import base64
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis data is base64 encoded, so decode here
        kinesis = record["kinesis"]
        print("Kinesis payload: %s" % (str(kinesis)))
        payload = base64.b64decode(record["kinesis"]["data"])
        print("Decoded payload: %s" % (str(payload)))

    return {
        'statusCode': 200,
        'body': json.dumps('Lambda return statusCode: 200')
    }
```
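Before pointing the trigger at live traffic, the handler can be exercised locally with a hand-built event. The sketch below fabricates a Kinesis record whose payload follows the data/metadata envelope that the Redis handler below parses (`cdcdata["data"]`, `cdcdata["metadata"]["operation"]`); all field values are made up for illustration:

```
import base64
import json

# Fabricated DMS-style change record (illustrative values only).
dms_payload = {
    "data": {"id": 9999, "sport_team_id": 62, "last_name": "Hsu",
             "first_name": "Jack", "full_name": "Jack Hsu"},
    "metadata": {"operation": "insert", "schema-name": "dbo",
                 "table-name": "player", "record-type": "data"}
}

# Kinesis delivers the payload base64-encoded under Records[*].kinesis.data.
event = {
    "Records": [{
        "kinesis": {
            "data": base64.b64encode(json.dumps(dms_payload).encode("utf-8")).decode("utf-8")
        }
    }]
}

lambda_handler(event, None)  # prints the raw and decoded payloads
```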
* DMS CDC -> Redis

```
import json
import base64
import logging
import dateutil.tz
import redis
from datetime import datetime

asiaTaipei = dateutil.tz.gettz('Asia/Taipei')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # ElastiCache settings
    redisClient = redis.StrictRedis(
        host='player-redis-001.rw9www.0001.apne1.cache.amazonaws.com',
        port=6379, db=0, password=None,
        socket_timeout=None, connection_pool=None, unix_socket_path=None)

    for record in event['Records']:
        # Kinesis data is base64 encoded, so decode here
        payload = base64.b64decode(record["kinesis"]["data"])
        print("Decoded payload: %s" % (str(payload)))

        cdcdata = json.loads(payload)
        oper = str(cdcdata["metadata"]["operation"])
        cdcrecord = cdcdata["data"]
        print("metadata.operation: %s" % (oper))
        print("record: %s" % (cdcrecord))
        playerid = cdcrecord["id"]

        message = "no operation matched for PLAYER ID '%s' " % (playerid)
        try:
            if oper == "insert":
                redisClient.set(playerid, json.dumps(cdcrecord))
                message = "operation function insert for PLAYER ID '%s' " % (playerid)
                print("Redis get %s" % (redisClient.get(playerid)))
            if oper == "update":
                redisClient.set(playerid, json.dumps(cdcrecord))
                message = "operation function update for PLAYER ID '%s' " % (playerid)
                print("Redis get %s" % (redisClient.get(playerid)))
            if oper == "delete":
                redisClient.delete(playerid)
                message = "operation function delete for PLAYER ID '%s' " % (playerid)
                print("Redis get %s" % (redisClient.get(playerid)))
        except Exception as e:
            print("Exception: " + str(e))
            # logging for monitoring
            now = datetime.now(tz=asiaTaipei)
            log = {
                'created': now.strftime("%Y-%m-%d %H:%M:%S"),
                'services': 'USERS',          # (USERS / CODE / SALES / INTERACTION)
                'transaction': 'kds_redis',   # (kds_dynamodb / kds_aurora_mysql / kds_redis / kds_redshift)
                'operation': oper,            # (insert / update / delete)
                'status': 'failed',           # (success / failed)
                'message': str(e),
                'tag': 'devax_log_insight',
                'data': cdcrecord
            }
            print(log)
            continue  # skip the success log for this record

        # logging for monitoring
        now = datetime.now(tz=asiaTaipei)
        log = {
            'created': now.strftime("%Y-%m-%d %H:%M:%S"),
            'services': 'USERS',          # (USERS / CODE / SALES / INTERACTION)
            'transaction': 'kds_redis',   # (kds_dynamodb / kds_aurora_mysql / kds_redis / kds_redshift)
            'operation': oper,            # (insert / update / delete)
            'status': 'success',          # (success / failed)
            'message': message,
            'tag': 'devax_log_insight',
            'data': cdcrecord
        }
        print(log)

    return {
        'statusCode': 200,
        'body': json.dumps('Lambda return statusCode: 200')
    }
```
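Once the task is running, the SQL statements from section 3 should show up in Redis within a few seconds. A quick end-to-end check, assuming it runs from a host inside DmsVpc (e.g. the Cloud9 instance), that TCP 6379 is open as configured in section 0, and that you substitute your own endpoint and player id:

```
import json
import redis

# Endpoint copied from the Lambda above; replace with your primary endpoint.
client = redis.StrictRedis(
    host='player-redis-001.rw9www.0001.apne1.cache.amazonaws.com',
    port=6379, db=0)

raw = client.get(9999)  # hypothetical player id written by the CDC pipeline
if raw is None:
    print("key not found - check the DMS task status and the Lambda logs")
else:
    print(json.loads(raw))
```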