# Data synchronization solution
Synchronize MySQL data to Elasticsearch (ES) in real time, driven by binlog events.
## Enable Binlog
The binlog is MySQL's binary log. It is typically used for replication and backup, and generally serves two purposes:
* MySQL master-slave replication: the master enables the binlog and ships it to the slaves, which replay it to keep their data consistent with the master's.
* Data recovery: data can be restored with the `mysqlbinlog` tool, as shown below.
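For example, a point-in-time recovery can be performed by replaying part of a binlog file through the `mysql` client; the file name and positions below are placeholders:
> mysqlbinlog --start-position=4 --stop-position=1500 /var/lib/mysql/mysql-bin.000001 | mysql -u root -p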
1. Edit `mysqld.cnf`
```cnf
# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
log_bin = /var/lib/mysql/mysql-bin
server-id = 123456
binlog-format = row
```
* log_bin: defines the base name of the log files. mysqld appends an incrementing sequence number to the base name (e.g. `mysql-bin.000001`, `mysql-bin.000002`) whenever one of the following happens:
  1. the server starts or restarts
  2. the server flushes its logs
  3. the current log reaches [max_binlog_size](https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_max_binlog_size) (range 4KB-1GB, default 1GB)
* server-id: a unique identifier for this server within the replication topology
* binlog-format: defines the format of the events recorded in the log file. It can be set to one of three formats; row-based logging is required here because the sync program needs the actual row data, not just the SQL text:
  1. row-based logging: records each modified `row`
  2. statement-based logging: records each `SQL` statement that modifies rows
  3. mixed-based logging: a combination of the two (`row + SQL`)
2. Restart `mysql`
> service mysql restart
Once the configuration is in place, run `show variables like "%log_bin%"` to check whether the binlog has been enabled successfully.
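The same check can also be done programmatically. Below is a minimal sketch using PyMySQL (already a dependency of the replication library used later); the connection values are placeholders:
```python
import pymysql

# Placeholder connection values -- adjust to your environment
conn = pymysql.connect(host='127.0.0.1', user='root', password='secret')
with conn.cursor() as cursor:
    cursor.execute("SHOW VARIABLES LIKE 'log_bin'")
    name, value = cursor.fetchone()
    print(name, value)  # expected: log_bin ON
conn.close()
```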

## Python implementation
### python-mysql-replication[[GitHub](https://github.com/noplay/python-mysql-replication)]
A pure-Python implementation of the MySQL replication protocol, built on top of PyMySQL. It lets you receive events such as insert, update, and delete, together with their row data and raw SQL queries.
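The package is published on PyPI under the distribution name `mysql-replication` (imported as `pymysqlreplication`):
> pip install mysql-replication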
### Resume
Stores the position of the last successfully synchronized event, so that re-running the program does not import duplicate data into ES:
* log_file
* log_pos
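The snippets below assume a few configuration constants (`ES_HOST`, `ES_RESUME_INFO_INDEX`, `MYSQL_SETTINGS`, `MYSQL_SERVER_ID`). A minimal sketch with placeholder values:
```python
# Placeholder configuration -- all values are example assumptions
ES_HOST = 'http://localhost:9200'
ES_RESUME_INFO_INDEX = 'sync-resume-info'
MYSQL_SETTINGS = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': 'secret',
}
# Must be unique in the replication topology (different from the server-id above)
MYSQL_SERVER_ID = 2
```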
```python
from elasticsearch import Elasticsearch, NotFoundError


class Resume:
    """Last synchronized binlog position, persisted in ES."""

    def __init__(self, log_file=None, log_pos=None):
        self.log_file = log_file
        self.log_pos = log_pos

    @staticmethod
    def get():
        """Load the saved resume point; return an empty Resume if none exists yet."""
        resume = Resume()
        try:
            es = Elasticsearch(ES_HOST)
            doc = es.get(index=ES_RESUME_INFO_INDEX, id=1)
            resume.log_file = doc['_source']['log_file']
            resume.log_pos = doc['_source']['log_pos']
        except NotFoundError:
            # No resume point stored in ES yet
            pass
        return resume

    def set(self):
        """Persist the current resume point (always overwrites document id 1)."""
        es = Elasticsearch(ES_HOST)
        doc = {
            'log_file': self.log_file,
            'log_pos': self.log_pos
        }
        es.index(index=ES_RESUME_INFO_INDEX, body=doc, id=1)
```
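A quick usage sketch of the class above (example values only):
```python
# First run: nothing stored yet, both fields are None
resume = Resume.get()
print(resume.log_file, resume.log_pos)   # -> None None

# Save a checkpoint; a later Resume.get() will return these values
Resume(log_file='mysql-bin.000002', log_pos=4).set()
```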
### Main
The main program:
```python
from elasticsearch import Elasticsearch
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import RotateEvent
from pymysqlreplication.row_event import WriteRowsEvent

# Resume from the last saved binlog position, if there is one
init_resume = Resume.get()
if init_resume.log_file is None or init_resume.log_pos is None:
    # First run: start streaming from the server's current position
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                only_events=[RotateEvent, WriteRowsEvent],
                                server_id=MYSQL_SERVER_ID,
                                blocking=True)
else:
    # Re-run: continue from the saved log_file / log_pos
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                                only_events=[RotateEvent, WriteRowsEvent],
                                server_id=MYSQL_SERVER_ID,
                                resume_stream=True,
                                log_file=init_resume.log_file,
                                log_pos=init_resume.log_pos,
                                blocking=True)

log_file = init_resume.log_file
for binlogevent in stream:
    if isinstance(binlogevent, RotateEvent):
        # The server switched to a new binlog file
        log_file = binlogevent.next_binlog
    elif isinstance(binlogevent, WriteRowsEvent):
        log_pos = binlogevent.packet.log_pos
        # Use "<schema>.<table>" as the ES index name
        index = binlogevent.schema + '.' + binlogevent.table
        es = Elasticsearch(ES_HOST)
        es.index(index=index, body=binlogevent.rows[0]["values"])
        # Save the position we just processed
        updating_resume = Resume(log_file, log_pos=log_pos)
        updating_resume.set()
```
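The loop above only syncs inserts (`WriteRowsEvent`). Since the library also delivers update and delete events, a hedged sketch of how they could be applied is shown below; it assumes every synced table has an `id` primary-key column that is reused as the ES document id (the snippet above lets ES auto-generate ids, so it can only append):
```python
from elasticsearch import Elasticsearch
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


def apply_row_event(es: Elasticsearch, binlogevent):
    """Apply a single row event to ES, keyed by the row's `id` column (assumed)."""
    index = binlogevent.schema + '.' + binlogevent.table
    for row in binlogevent.rows:
        if isinstance(binlogevent, WriteRowsEvent):
            es.index(index=index, id=row['values']['id'], body=row['values'])
        elif isinstance(binlogevent, UpdateRowsEvent):
            # Update events carry both the old and the new row values
            es.index(index=index, id=row['after_values']['id'], body=row['after_values'])
        elif isinstance(binlogevent, DeleteRowsEvent):
            es.delete(index=index, id=row['values']['id'])
```
To use it, add `UpdateRowsEvent` and `DeleteRowsEvent` to `only_events` and call `apply_row_event(es, binlogevent)` in place of the single `es.index(...)` call in the loop.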
## Reference
https://dev.mysql.com/doc/refman/8.0/en/binary-log.html