# SMS: Spark Migration Service
Este serviço tem o objetivo de replicar dados entre nossos bancos e monitorar a consistência destas replicações.
## Motivação
O SMS tem como sua principal motivação criar uma alternativa in-house para serviços como o DMS, a fim de manter uma replicação de dados entre diferentes bancos. Além da questão financeira, a ideia do SMS é fornecer ferramentas para o acompanhamento dessa replicação e facilitar a detecção de problemas com os dados nos bancos replicados.
Hoje existe uma necessidade enorme de acesso aos dados do transacional para os times de operação e para monitoramentos do nosso produto. Para evitar o acesso ao banco de produção, existem duas replicações que visam sanar essa necessidade, mantendo o delay o menor possível. O problema é que a solução atual, o DMS, tem sofrido com o aumento de volume de dados e também com limitações próprias, gerando uma manutenção pesada e complicada.
Através da captura de mudança de dados (CDC), a ideia é lidar com esse processamento de grande volume de dados com uma ferramenta própria, fazendo uso assim do Spark. Desacoplando o processamento de dados replicados dos bancos (as mudanças são apenas capturadas e enviadas para o S3), o DMS se torna menos propenso a erros e evita sobrecarga nos bancos de origem ou destino. Além disso, o SMS cria a possibilidade de receber dados de outras formas e normalizá-los para serem replicados aos bancos de destino.
## Funcionamento
Inicialmente, os dados são copiados no formato *parquet* do banco de origem para um bucket no S3, utilizando o *Database Migration Service* (DMS), da AWS. A partir disto, nosso serviço, que roda no Airflow, realiza o procedimento de Full Load, CDC (change data capture) e checa a consistência dos dados replicados, conforme é mostrado no diagrama a seguir:

Cada uma destas DAGs é gerada dinamicamente para cada uma das tabelas cadastradas na variável `replication_tables`, cujo exemplo pode ser dado por:
```
[
{
"dms_task_name": "salesforce-middleware-db-postgres-s3",
"source_database_type": "postgres",
"source_database": "salesforce-middleware-db",
"source_schema": "public",
"source_table": "Events",
"source_database_connection_id": "postgres_salesforce_middleware",
"source_database_pool": "pool_db_finops",
"targets": [
{
"target_database": "finops-db",
"target_database_type": "postgres",
"target_schema": "dataops_sandbox",
"target_table": "salesforce_middleware_events",
"target_database_connection_id": "sms_finops_writer",
"target_database_pool": "pool_db_finops"
},
{
"target_database": "datarock",
"target_database_type": "redshift",
"target_schema": "dataops_sandbox",
"target_table": "salesforce_middleware_events",
"target_database_connection_id": "redshift_data",
"target_database_pool": "pool_db_datarock_replication"
}
],
"cdc_dependent_monitorings": ["update", "duplicate_rows"],
"cdc_independent_monitorings": ["missing_rows", "db_delay"],
"schedule": "*/20 * * * *",
"execution_frequency": 1,
"delay_monitoring_interval": 10,
"delay_monitoring_tolerance_interval": 20,
"cdc_look_for_past_interval": 5,
"start_date": "2020-09-25T09:00:00",
"lob_slice": 63000
}
]
```
## Parâmetros de uma tabela
- `dms_task_name`: o nome da task do DMS responsável por replicar as informações do banco source para o s3
- `source_database_type`: o tipo do banco source (em geral `redshift` ou `postgres`)
- `source_database`: o nome do banco de origem, assim como será replicado para o bucket S3 (ex: `finops-db`, `datarock`, `opsdb`)
- `source_schema`: o schema onde se encontra a tabela (ex: `live`, `public`)
- `source_table`: o nome da tabela que será replicada
- `source_database_connection_id` : o id de uma Connection do banco de origem
- `source_database_pool` um Pool para limitar conexões simultâneas ao banco de origem
- `targets`: uma lista dos targets para onde deseja-se replicar. Os parâmetros requeridos para cada target são:
- `target_database_type`: o tipo do banco de destino (em geral `redshift` ou `postgres`)
- `target_database`: o nome do banco de destino (ex: `finops-db`, `datarock`)
- `target_schema`: o schema onde se encontra a tabela target
- `target_table`: o nome da tabela target
- `target_database_connection_id`: o id de uma Connection do banco de destino
- `target_database_pool`: um Pool para limitar conexões simultâneas ao banco de destino
- `cdc_dependent_monitorings`: uma lista com os monitoramentos que serão disparados a partir da dag de CDC.
- `cdc_independent_monitorings`: uma lista com os monitoramentos que terão *schedule* próprio
- `schedule`: o cron de execução da dag de Full Load (Controle)
- `execution_frequency`: em horas, define o quanto de tempo após o `execution_date` da dag que deve ser considerado para buscar arquivos de CDC
- `cdc_look_for_past_interval`: em horas, define o quanto de tempo antes do `execution_date` da dag que deve ser considerado na busca dos arquivos CDC,
- `delay_monitoring_interval`: intervalo de execução do monitoramento de delay,
- `delay_monitoring_tolerance_interval`: intervalo de tolerância de delay do monitoramento,
- `start_date`: data a partir da qual a versão é válida
- `lob_slice`: o DMS trunca os LOB (large object binary) apartir de alguns caracteres. Esta variável possui este valor, para que na comparação entre source e target isto seja considerado.
## As DAGs
### Full Load
A DAG realiza a operação de Full Load, quando necessário, controla a execução do CDC e também das migrations, em caso de nova tabela ou adição de coluna. Abaixo uma imagem do fluxo:

- `get_ddl` obtém a query de create table no source
- `treat_migration` utiliza o resultado de `get_ddl` para gerar um json e converter os tipos para seus correspondentes no target. Além disto, identifica se é necessário criar a tabela ou realizar alter table para adicionar nova coluna. Se alguma destas condições é verdadeira, salva o json gerado no histórico de migrations no bucket s3 target e gera o DDL
- `run_ddl_query` é chamada apenas quando a `treat_migration` identificou que é necessário. Esta task roda a query de DDL que cria a tabela target ou adiciona novas colunas a ela.
- `verify_full_load_and_copy` verifica se há novos arquivos de full load no bucket source no s3.
- Se sim, quer dizer que há um full load a ser realizado, por isto, verifica-se se a task de full load já foi finalizada pelo DMS.
- Se sim, os arquivos de full load anterior são deletados e os novos parquets são copiados para o bucket s3 target para serem processados e a próxima task chamada carregará tais dados na tabela target
- Se não, a próxima task chamada dispara a DAG de CDC
- Se não, a próxima task chamada dispara o CDC
- `run_query` executa query que faz copy dos arquivos de parquet para a tabela target
- `delete_from_source` deleta os arquivos parquet do bucket s3 source, uma vez que estes já foram copiados para o target
- `trigger_cdc` dispara a execução da DAG de CDC desta mesma tabela
### CDC
Esta DAG dispara a redução dos arquivos de CDC no EMR, monitora esta execução, carregando o resultado nas tabelas target. Ao seu fim, dispara monitoramentos dependentes. O fluxo da DAG é o seguinte:

- `copy_cdc_to_target_bucket` busca por arquivos CDC no bucket s3 source pertencentes a uma janela de horários que leva em consideração a frequência de execução e um intervalo extra. Se estes arquivos existirem, estes são copiados para o bucket target, além de colocar nas xcom quais chaves que deverão ser deletadas posteriormente. Se não houver arquivos no bucket source, é feita a busca pelas chaves no bucket target e, caso existam e este seja um reprocessamento, as chaves são copiadas para a pasta da execução atual.
- `send_script_to_s3` renderiza o script Pyspark com os parâmetros da tabela e da execução e o envia para o s3, de onde será obtido pelo cluster EMR.
- `start_spark_job` submete os jobs necessários ao cluster EMR. Os jobs são dois: o primeiro copia o script Pyspark do s3 para o cluster e segundo o executa. Ao fim, coloca nos xcom o id dos steps criados.
- o `sensor` observa o status das steps no cluster EMR. Se as steps terminarem com sucesso, o sensor também terminará. Se alguma da steps for para algum status de falha ou cancelamento, o sensor terá resultado dado como falha.
- `run_delete_query` identifica se dentre os arquivos gerados pelo Spark, há arquivos que indiquem linhas a ser deletadas. Se sim, roda query no target para deletá-los.
- `run_upsert_query` identifica se dentre os arquivos gerados pelo Spark, há arquivos que indiquem linhas a ser inseridas ou atualizadas. Se sim, roda query no target para fazer o upsert.
- `clean_source_bucket` deleta os arquivos do bucket source, uma vez que estes já foram copiados para o bucket target e replicados nas tabelas target.
- `trigger` triga os monitoramentos definidos na variable `replication_table` no campo `cdc_dependent_monitorings`.
### Monitoramento de delay
Esta DAG é disparada em intervalos regulares definidos pela variável `delay_monitoring_interval` e determina o atraso entre os bancos source e target. O grafo dela é o seguinte:

- `run_max_date_query_source` realiza query para identificar, no banco source, a data de criação e de update de uma linha mais recentes.
- `run_max_date_query_target` realiza query para identificar, no banco target, a data de criação e de update de uma linha mais recentes.
- `check_db_delay` calcula o atraso entre os bancos e decide, de acordo com o a tolerância estabelecida na variável `delay_monitoring_tolerance_interval`, se o atraso dever ser informado no slack
- `send_slack` envia mensagem no slack alertando se o atraso for excessivo e qual o atraso. Se não houver atraso significativo, não envia mensagem.
- `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring, caso haja alertas
- `save_database_delay` adiciona o atraso medido na tabela ReplicationDelays
### Monitoramento de duplicate rows
Esta DAG, disparada pela última task do CDC, verifica se há mais de uma linha com um mesmo id na tabela observada. Seu grafo é o seguinte:

- `run_duplicate` realiza query para identificar, no banco target, ids que ocorrem mais de uma vez.
- `send_slack` envia mensagem no slack alertando sobre os ids identificados como duplicados. Se não houver id, não envia mensagem.
- `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring e, caso haja alertas, registra-os em ReplicationMonitoringAlerts
### Monitoramento de missing rows
Esta DAG é executada uma vez por dia e verifica se há ids faltantes na tabela target.

- `run_table_id_type` executa uma query para identificar o tipo de dado do id da tabela
- `create_id_invervals` define datas limites para as quais a DAG deve olhar e busca os ids destes limites.
- `get_source_ids` realiza query no source para obter todos os ids que estão entre os limites definidos por `create_id_invervals`
- `get_target_ids` realiza query no target para obter todos os ids que estão entre os limites definidos por `create_id_invervals`
- `analyze_missing_rows` verifica se todos os ids encontrados em `get_source_ids` também foram encontrados em `get_target_ids`. Caso exista algum id faltante, as ações de correção serão aplicadas, caso contrário, as próximas tasks serão puladas.
- `send_slack` envia mensagem no slack alertando sobre os ids identificados como faltantes. Se não houver id, não envia mensagem.
- `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring e, caso haja alertas, registra-os em ReplicationMonitoringAlerts
- `source_alerted_query` busca os valores das linhas faltantes no banco source
- `run_upsert_query` copia as linhas faltantes para o banco target
- `update_db` atualiza a linha de alerta do monitoramento para marcar a data de resolução do problema
- `update_slack` atualiza a mensagem enviada ao slack para mudar a cor para verde e adicionar a data de resolução
### Monitoramento de updates
Esta DAG faz o monitoramento de consistência entre os valores das colunas de source e target. As tasks da DAG são:

- `get_ids_list` obtém, a partir dos arquivos CDC, os ids que foram afetados pelo fluxo de CDC.
- `run_table_id_type` busca pelo tipo do id da tabela
- `source_query` realiza query em chunks no banco source para obter o estado das linhas identificadas anteriormente
- `target_query`realiza query em chunks no banco target para obter o estado das linhas identificadas anteriormente
- `compare_rows` baixa os arquivos com o resultado das queries acima e verifica se o conteúdo presente no source e no target são correspondentes
- `send_slack` envia mensagem no slack alertando sobre os ids onde foram encontradas divergências. Se não houver id, não envia mensagem.
- `save_db` adiciona a linha referente ao monitoramento na tabela ReplicationMonitoring e, caso haja alertas, registra-os em ReplicationMonitoringAlerts
- `source_alerted_query` busca os valores das linhas divergentes no banco source
- `run_upsert_query` deleta e copia o conteúdo do source das linhas divergêntes para o banco target
- `update_db` atualiza a linha de alerta do monitoramento para marcar a data de resolução do problema
- `update_slack` atualiza a mensagem enviada ao slack para mudar a cor para verde e adicionar a data de resolução
# Métricas de processamento
Considerando o processamento dos arquivos que replicam live."TransactionsNew" do banco `finops-db` entre os dias 26/09/2020 e 28/09/2020, nos quais o ciclo estava configurado para ter duração de 20 minutos, em média, foram processados 19 arquivos por execução, sendo a média da quantidade de dados processada 1.22MB. Abaixo, histogramas que descrevem a quantidade de arquivos processados, a quantidade de dados processados e o tempo de execução por ciclo.



## Acompanhamento das métricas da aplicação
Para acompanhar a saúde desta aplicação, foi criada uma [dashboard no metabase](https://metabase.data.pagarme.net/dashboard/636). A figura abaixo representa o estado das métricas em um dado momento.

O gráfico do canto superior esquerdo ilustra o percentual de tempo diário em que algum dado ficou inconsistente no banco target com relação ao target. O gráfico ao lado, no canto superior direito, mostra quantas linhas foram afetadas por inconsistências em cada dia.
Os dois gráficos da linha inferior representam métricas similares. O primeiro significa a porcentagem de tempo dentro que uma semana que o atraso do banco target com relação ao source foi inferior a duas horas, sendo que a meta, indicada pela linha pontilhada, é de 85%. O gráfico a direita usa a mesma lógica, mas para atraso inferior a quatro horas e com meta de 90%.
Todos os gráficos foram criados usando as métricas relativas apenas a uma das tabelas replicadas pelo serviço.