# SMS: Spark Migration Service Este serviço tem o objetivo de replicar dados entre nossos bancos e monitorar a consistência destas replicações. ## Motivação O SMS tem como sua principal motivação criar uma alternativa in-house para serviços como o DMS, a fim de manter uma replicação de dados entre diferentes bancos. Além da questão financeira, a ideia do SMS é fornecer ferramentas para o acompanhamento dessa replicação e facilitar a detecção de problemas com os dados nos bancos replicados. Hoje existe uma necessidade enorme de acesso aos dados do transacional para os times de operação e para monitoramentos do nosso produto. Para evitar o acesso ao banco de produção, existem duas replicações que visam sanar essa necessidade, mantendo o delay o menor possível. O problema é que a solução atual, o DMS, tem sofrido com o aumento de volume de dados e também com limitações próprias, gerando uma manutenção pesada e complicada. Através da captura de mudança de dados (CDC), a ideia é lidar com esse processamento de grande volume de dados com uma ferramenta própria, fazendo uso assim do Spark. Desacoplando o processamento de dados replicados dos bancos (as mudanças são apenas capturadas e enviadas para o S3), o DMS se torna menos propenso a erros e evita sobrecarga nos bancos de origem ou destino. Além disso, o SMS cria a possibilidade de receber dados de outras formas e normalizá-los para serem replicados aos bancos de destino. ## Funcionamento Inicialmente, os dados são copiados no formato *parquet* do banco de origem para um bucket no S3, utilizando o *Database Migration Service* (DMS), da AWS. A partir disto, nosso serviço, que roda no Airflow, realiza o procedimento de Full Load, CDC (change data capture) e checa a consistência dos dados replicados, conforme é mostrado no diagrama a seguir: ![Diagrama de disparo das DAGs do DMS](https://i.imgur.com/06LTfRJ.png) Cada uma destas DAGs é gerada dinamicamente para cada uma das tabelas cadastradas na variável `replication_tables`, cujo exemplo pode ser dado por: ``` [ { "dms_task_name": "salesforce-middleware-db-postgres-s3", "source_database_type": "postgres", "source_database": "salesforce-middleware-db", "source_schema": "public", "source_table": "Events", "source_database_connection_id": "postgres_salesforce_middleware", "source_database_pool": "pool_db_finops", "targets": [ { "target_database": "finops-db", "target_database_type": "postgres", "target_schema": "dataops_sandbox", "target_table": "salesforce_middleware_events", "target_database_connection_id": "sms_finops_writer", "target_database_pool": "pool_db_finops" }, { "target_database": "datarock", "target_database_type": "redshift", "target_schema": "dataops_sandbox", "target_table": "salesforce_middleware_events", "target_database_connection_id": "redshift_data", "target_database_pool": "pool_db_datarock_replication" } ], "cdc_dependent_monitorings": ["update", "duplicate_rows"], "cdc_independent_monitorings": ["missing_rows", "db_delay"], "schedule": "*/20 * * * *", "execution_frequency": 1, "delay_monitoring_interval": 10, "delay_monitoring_tolerance_interval": 20, "cdc_look_for_past_interval": 5, "start_date": "2020-09-25T09:00:00", "lob_slice": 63000 } ] ``` ## Parâmetros de uma tabela - `dms_task_name`: o nome da task do DMS responsável por replicar as informações do banco source para o s3 - `source_database_type`: o tipo do banco source (em geral `redshift` ou `postgres`) - `source_database`: o nome do banco de origem, assim como será replicado para o bucket S3 (ex: `finops-db`, `datarock`, `opsdb`) - `source_schema`: o schema onde se encontra a tabela (ex: `live`, `public`) - `source_table`: o nome da tabela que será replicada - `source_database_connection_id` : o id de uma Connection do banco de origem - `source_database_pool` um Pool para limitar conexões simultâneas ao banco de origem - `targets`: uma lista dos targets para onde deseja-se replicar. Os parâmetros requeridos para cada target são: - `target_database_type`: o tipo do banco de destino (em geral `redshift` ou `postgres`) - `target_database`: o nome do banco de destino (ex: `finops-db`, `datarock`) - `target_schema`: o schema onde se encontra a tabela target - `target_table`: o nome da tabela target - `target_database_connection_id`: o id de uma Connection do banco de destino - `target_database_pool`: um Pool para limitar conexões simultâneas ao banco de destino - `cdc_dependent_monitorings`: uma lista com os monitoramentos que serão disparados a partir da dag de CDC. - `cdc_independent_monitorings`: uma lista com os monitoramentos que terão *schedule* próprio - `schedule`: o cron de execução da dag de Full Load (Controle) - `execution_frequency`: em horas, define o quanto de tempo após o `execution_date` da dag que deve ser considerado para buscar arquivos de CDC - `cdc_look_for_past_interval`: em horas, define o quanto de tempo antes do `execution_date` da dag que deve ser considerado na busca dos arquivos CDC, - `delay_monitoring_interval`: intervalo de execução do monitoramento de delay, - `delay_monitoring_tolerance_interval`: intervalo de tolerância de delay do monitoramento, - `start_date`: data a partir da qual a versão é válida - `lob_slice`: o DMS trunca os LOB (large object binary) apartir de alguns caracteres. Esta variável possui este valor, para que na comparação entre source e target isto seja considerado. ## As DAGs ### Full Load A DAG realiza a operação de Full Load, quando necessário, controla a execução do CDC e também das migrations, em caso de nova tabela ou adição de coluna. Abaixo uma imagem do fluxo: ![full load](https://i.imgur.com/vsfcLzk.png) - `get_ddl` obtém a query de create table no source - `treat_migration` utiliza o resultado de `get_ddl` para gerar um json e converter os tipos para seus correspondentes no target. Além disto, identifica se é necessário criar a tabela ou realizar alter table para adicionar nova coluna. Se alguma destas condições é verdadeira, salva o json gerado no histórico de migrations no bucket s3 target e gera o DDL - `run_ddl_query` é chamada apenas quando a `treat_migration` identificou que é necessário. Esta task roda a query de DDL que cria a tabela target ou adiciona novas colunas a ela. - `verify_full_load_and_copy` verifica se há novos arquivos de full load no bucket source no s3. - Se sim, quer dizer que há um full load a ser realizado, por isto, verifica-se se a task de full load já foi finalizada pelo DMS. - Se sim, os arquivos de full load anterior são deletados e os novos parquets são copiados para o bucket s3 target para serem processados e a próxima task chamada carregará tais dados na tabela target - Se não, a próxima task chamada dispara a DAG de CDC - Se não, a próxima task chamada dispara o CDC - `run_query` executa query que faz copy dos arquivos de parquet para a tabela target - `delete_from_source` deleta os arquivos parquet do bucket s3 source, uma vez que estes já foram copiados para o target - `trigger_cdc` dispara a execução da DAG de CDC desta mesma tabela ### CDC Esta DAG dispara a redução dos arquivos de CDC no EMR, monitora esta execução, carregando o resultado nas tabelas target. Ao seu fim, dispara monitoramentos dependentes. O fluxo da DAG é o seguinte: ![cdc](https://i.imgur.com/tI8AxAj.png) - `copy_cdc_to_target_bucket` busca por arquivos CDC no bucket s3 source pertencentes a uma janela de horários que leva em consideração a frequência de execução e um intervalo extra. Se estes arquivos existirem, estes são copiados para o bucket target, além de colocar nas xcom quais chaves que deverão ser deletadas posteriormente. Se não houver arquivos no bucket source, é feita a busca pelas chaves no bucket target e, caso existam e este seja um reprocessamento, as chaves são copiadas para a pasta da execução atual. - `send_script_to_s3` renderiza o script Pyspark com os parâmetros da tabela e da execução e o envia para o s3, de onde será obtido pelo cluster EMR. - `start_spark_job` submete os jobs necessários ao cluster EMR. Os jobs são dois: o primeiro copia o script Pyspark do s3 para o cluster e segundo o executa. Ao fim, coloca nos xcom o id dos steps criados. - o `sensor` observa o status das steps no cluster EMR. Se as steps terminarem com sucesso, o sensor também terminará. Se alguma da steps for para algum status de falha ou cancelamento, o sensor terá resultado dado como falha. - `run_delete_query` identifica se dentre os arquivos gerados pelo Spark, há arquivos que indiquem linhas a ser deletadas. Se sim, roda query no target para deletá-los. - `run_upsert_query` identifica se dentre os arquivos gerados pelo Spark, há arquivos que indiquem linhas a ser inseridas ou atualizadas. Se sim, roda query no target para fazer o upsert. - `clean_source_bucket` deleta os arquivos do bucket source, uma vez que estes já foram copiados para o bucket target e replicados nas tabelas target. - `trigger` triga os monitoramentos definidos na variable `replication_table` no campo `cdc_dependent_monitorings`. ### Monitoramento de delay Esta DAG é disparada em intervalos regulares definidos pela variável `delay_monitoring_interval` e determina o atraso entre os bancos source e target. O grafo dela é o seguinte: ![delay monitoring](https://i.imgur.com/0Kv4uXi.png) - `run_max_date_query_source` realiza query para identificar, no banco source, a data de criação e de update de uma linha mais recentes. - `run_max_date_query_target` realiza query para identificar, no banco target, a data de criação e de update de uma linha mais recentes. - `check_db_delay` calcula o atraso entre os bancos e decide, de acordo com o a tolerância estabelecida na variável `delay_monitoring_tolerance_interval`, se o atraso dever ser informado no slack - `send_slack` envia mensagem no slack alertando se o atraso for excessivo e qual o atraso. Se não houver atraso significativo, não envia mensagem. - `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring, caso haja alertas - `save_database_delay` adiciona o atraso medido na tabela ReplicationDelays ### Monitoramento de duplicate rows Esta DAG, disparada pela última task do CDC, verifica se há mais de uma linha com um mesmo id na tabela observada. Seu grafo é o seguinte: ![duplicate rows monitoring](https://i.imgur.com/Ri1P0gl.png) - `run_duplicate` realiza query para identificar, no banco target, ids que ocorrem mais de uma vez. - `send_slack` envia mensagem no slack alertando sobre os ids identificados como duplicados. Se não houver id, não envia mensagem. - `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring e, caso haja alertas, registra-os em ReplicationMonitoringAlerts ### Monitoramento de missing rows Esta DAG é executada uma vez por dia e verifica se há ids faltantes na tabela target. ![missing rows monitoring](https://i.imgur.com/uh5M564.png) - `run_table_id_type` executa uma query para identificar o tipo de dado do id da tabela - `create_id_invervals` define datas limites para as quais a DAG deve olhar e busca os ids destes limites. - `get_source_ids` realiza query no source para obter todos os ids que estão entre os limites definidos por `create_id_invervals` - `get_target_ids` realiza query no target para obter todos os ids que estão entre os limites definidos por `create_id_invervals` - `analyze_missing_rows` verifica se todos os ids encontrados em `get_source_ids` também foram encontrados em `get_target_ids`. Caso exista algum id faltante, as ações de correção serão aplicadas, caso contrário, as próximas tasks serão puladas. - `send_slack` envia mensagem no slack alertando sobre os ids identificados como faltantes. Se não houver id, não envia mensagem. - `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring e, caso haja alertas, registra-os em ReplicationMonitoringAlerts - `source_alerted_query` busca os valores das linhas faltantes no banco source - `run_upsert_query` copia as linhas faltantes para o banco target - `update_db` atualiza a linha de alerta do monitoramento para marcar a data de resolução do problema - `update_slack` atualiza a mensagem enviada ao slack para mudar a cor para verde e adicionar a data de resolução ### Monitoramento de updates Esta DAG faz o monitoramento de consistência entre os valores das colunas de source e target. As tasks da DAG são: ![updates monitoring](https://i.imgur.com/TTQNzC3.png) - `get_ids_list` obtém, a partir dos arquivos CDC, os ids que foram afetados pelo fluxo de CDC. - `run_table_id_type` busca pelo tipo do id da tabela - `source_query` realiza query em chunks no banco source para obter o estado das linhas identificadas anteriormente - `target_query`realiza query em chunks no banco target para obter o estado das linhas identificadas anteriormente - `compare_rows` baixa os arquivos com o resultado das queries acima e verifica se o conteúdo presente no source e no target são correspondentes - `send_slack` envia mensagem no slack alertando sobre os ids onde foram encontradas divergências. Se não houver id, não envia mensagem. - `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring e, caso haja alertas, registra-os em ReplicationMonitoringAlerts - `source_alerted_query` busca os valores das linhas divergentes no banco source - `run_upsert_query` deleta e copia o conteúdo do source das linhas divergêntes para o banco target - `update_db` atualiza a linha de alerta do monitoramento para marcar a data de resolução do problema - `update_slack` atualiza a mensagem enviada ao slack para mudar a cor para verde e adicionar a data de resolução # Métricas de processamento Considerando o processamento dos arquivos que replicam live."TransactionsNew" do banco `finops-db` entre os dias 26/09/2020 e 28/09/2020, nos quais o ciclo estava configurado para ter duração de 20 minutos, em média, foram processados 19 arquivos por execução, sendo a média da quantidade de dados processada 1.22MB. Abaixo, histogramas que descrevem a quantidade de arquivos processados, a quantidade de dados processados e o tempo de execução por ciclo. ![arquivos processados](https://i.imgur.com/hc5VIp9.png) ![dados processados](https://i.imgur.com/vmevbOl.png) ![tempo de execução](https://i.imgur.com/Xuv1MN1.png) ## Acompanhamento das métricas da aplicação Para acompanhar a saúde desta aplicação, foi criada uma [dashboard no metabase](https://metabase.data.pagarme.net/dashboard/636). A figura abaixo representa o estado das métricas em um dado momento. ![](https://i.imgur.com/yMJE7SZ.png) O gráfico do canto superior esquerdo ilustra o percentual de tempo diário em que algum dado ficou inconsistente no banco target com relação ao target. O gráfico ao lado, no canto superior direito, mostra quantas linhas foram afetadas por inconsistências em cada dia. Os dois gráficos da linha inferior representam métricas similares. O primeiro significa a porcentagem de tempo dentro que uma semana que o atraso do banco target com relação ao source foi inferior a duas horas, sendo que a meta, indicada pela linha pontilhada, é de 85%. O gráfico a direita usa a mesma lógica, mas para atraso inferior a quatro horas e com meta de 90%. Todos os gráficos foram criados usando as métricas relativas apenas a uma das tabelas replicadas pelo serviço.