# SQL Server Merge Statement deadlock
## Background
Our JDBC Sink Connector mainly runs with the following settings:
- `insert.mode` => `upsert`
- `pk.mode` => `record_key`
- `tasks.max` => 3
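Put together, those settings correspond to a connector configuration roughly like the following (a sketch; the topic, table, and connection values are placeholders, not from the actual deployment):

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics": "my-topic",
  "connection.url": "jdbc:sqlserver://my-host:1433;databaseName=myDb",
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "tasks.max": "3"
}
```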
A few weeks ago our DBA reported deadlocks on the account used by our JDBC Sink Connector. The deadlocking SQL the DBA provided was the MERGE statement generated by the connector. Checking the messages in the record topic around the time of the deadlock, there were no records with duplicate PKs in the keys.
Without thinking too hard, I reasoned that the number of sink worker tasks normally equals the number of partitions in the record topic: since records are partitioned by PK, different worker threads always handle records with different PKs. So I just dropped the workers to 1; one task handling 3 partitions seemed tolerable. A rather hand-wavy fix, admittedly.
At the time I assumed MERGE only takes row locks or key locks, so I couldn't see how a deadlock could happen. I blindly set `tasks.max` to 1 and asked the DBA to keep monitoring for further deadlocks. (Postscript: when the connector hits a deadlock it also logs it; you will see a deadlock-related stack trace in the Connect logs.)
While googling, I found a discussion asking whether MERGE can take table locks, which led me to check the Microsoft documentation.
Oracle's MERGE statement should only take row locks ([ref](https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/Automatic-Locks-in-DML-Operations.html)).
## Lock
Relevant passages from the SQL Server MERGE statement documentation:
- [Concurrency considerations for MERGE](https://learn.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?view=sql-server-ver16#concurrency-considerations-for-merge)
### HOLDLOCK hint
This hint exists to prevent primary-key violations (e.g. two concurrent MERGEs both deciding to insert the same key).
> **HOLDLOCK** is a synonym for the SERIALIZABLE transaction isolation level, which doesn't allow for other concurrent transactions to modify data that this transaction has read. SERIALIZABLE is the safest isolation level but provides for the least concurrency with other transactions that retains locks on ranges of data to prevent phantom rows from being inserted or updated while reads are in progress.
### SERIALIZABLE level
> <span style="color:red">Range locks</span> are placed in the range of key values that match the search conditions of each statement executed in a transaction. This blocks other transactions from updating or inserting any rows that would qualify for any of the statements executed by the current transaction. This means that if any of the statements in a transaction are executed a second time, they will read the same set of rows. The range locks are held until the transaction completes. This is the most restrictive of the isolation levels because it locks entire ranges of keys and holds the locks until the transaction completes. Because concurrency is lower, use this option only when necessary. This option has the same effect as setting HOLDLOCK on all tables in all SELECT statements in a transaction.
The JDBC Sink Connector is configured to generate the upsert (MERGE) statement from the PK: [SqlServerDatabaseDialect.java#buildUpsertQueryStatement()](https://github.com/spinatelli/kafka-connect-jdbc/blob/81ef5d1380b340b07a240e883367728a24371903/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java#L320).
By default, the generated SQL statement includes the HOLDLOCK hint. (You can also see this hint in the deadlock messages printed by the DBA's monitoring.)
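For reference, the statement built by that dialect method has roughly the following shape (a sketch; the table and column names here are illustrative placeholders, not from the source):

```sql
merge into [myTable] with (HOLDLOCK) AS target
using (select ? AS [id], ? AS [name]) AS incoming
on (target.[id] = incoming.[id])
when matched then
  update set [name] = incoming.[name]
when not matched then
  insert ([id], [name]) values (incoming.[id], incoming.[name]);
```

Note the `with (HOLDLOCK)` hint on the target table: every upsert runs its seek under SERIALIZABLE semantics, which is what makes range locks possible.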
### Confluent JDBC Sink Connector
When `insert.mode` is set to `upsert`, the generated SQL includes the HOLDLOCK hint by default. The hint is toggled by the following parameter:
[`mssql.use.merge.holdlock`](https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html)
`mssql.use.merge.holdlock`
Whether to use HOLDLOCK when performing a MERGE INTO upsert statement. Note that this configuration property is specific to Microsoft SQL Server only.
* Type: boolean
* Default: true
* Importance: low
[SqlServerDatabaseDialect.java#buildUpsertQueryStatement()](https://github.com/spinatelli/kafka-connect-jdbc/blob/81ef5d1380b340b07a240e883367728a24371903/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java#L328)
## Useful Resources
Later I found the following article, which explains range locks in more depth and proposes some fixes:
[How MERGE on two different rows can still deadlock you](https://sqlsunday.com/2021/05/04/how-merge-can-deadlock-you/)
> **Key lock vs. Range lock**
>
> If the Seek hits an existing row, SQL Server will place a key lock on that specific row. This prevents other processes from changing this exact row until we're finished with it. Remember, because we're in Serializable, we expect the row we saw to be there next time we need it, and to look just the way we found it.
>
> But if we didn't find a row, there's nothing specific to lock, because locks can only be placed on keys (rows), pages, partitions, tables and schemas. SQL Server cannot lock an arbitrary search criteria like "WHERE id=123", because that would create a computational nightmare. To solve this, SQL Server will place a range lock, which holds not a specific key, but everything between two existing keys.
>
> As you can see, our single-point lookup just got a little more invasive.

If we have a table with two rows, id=100 and id=10000, a Seek on id=123 would place a Range lock covering 100 < id ≤ 10000.
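The range that gets locked can be computed from the neighbouring existing keys. The following is a minimal sketch (my own illustration, not SQL Server code) of that rule: a seek on a missing key locks the whole gap between the previous and next existing keys.

```python
def locked_range(existing_keys, seek_key):
    """Approximate the key range a SERIALIZABLE seek on a missing key
    would lock: everything between the two neighbouring existing keys.
    (low is exclusive, high is inclusive; None means no neighbour.)"""
    keys = sorted(existing_keys)
    low = max((k for k in keys if k < seek_key), default=None)   # previous existing key
    high = min((k for k in keys if k > seek_key), default=None)  # next existing key
    return low, high

# A table with rows id=100 and id=10000: seeking the missing id=123
# locks the whole gap between the two neighbouring keys.
print(locked_range([100, 10000], 123))  # → (100, 10000)
```

This is why two MERGEs on two *different*, non-existing ids (say 123 and 456) can still collide: both seeks fall into the same gap and take overlapping range locks.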
The root cause: what you assume is a key lock may actually be a range lock.
### Fixes
#### The simplest fix
Wait and retry on failure => alternatively, `tasks.max=1` leaves only one writer holding locks, but then you lose the advantage that when one Connector worker fails, another worker thread takes over that partition's records.
The JDBC Connector already has retry-count and backoff parameters with default values ([ref](https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html#retries)). In `JdbcSinkTask` they take effect when the received `SinkRecord`s are written via the JDBC writer ([ref](https://github.com/confluentinc/kafka-connect-jdbc/blob/v10.7.6/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java#L78)). Judging from the code, most data should eventually be written during retries, but DBA and log monitoring will still flag the deadlocks the application hits. Perhaps, in a CDC scenario where records with the same PK are already processed sequentially by the same partition's consumer, the second option below may be a good choice.
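The retry behavior described above can be sketched as follows. This is a hedged illustration of the pattern (the parameter names mirror the connector's `max.retries` and `retry.backoff.ms` settings; the function itself is hypothetical, not connector code):

```python
import time

def write_with_retries(write_batch, max_retries=10, retry_backoff_ms=3000,
                       sleep=time.sleep):
    """On a retriable error (e.g. being chosen as a deadlock victim),
    wait retry.backoff.ms and try the batch again, giving up after
    max.retries additional attempts."""
    attempts = 0
    while True:
        try:
            return write_batch()
        except Exception:
            attempts += 1
            if attempts > max_retries:
                raise  # exhausted retries: surface the error to the framework
            sleep(retry_backoff_ms / 1000.0)
```

With this pattern a transient deadlock resolves itself on retry, which matches the observation that the data still lands despite the deadlock alerts.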
#### The simple fix
Remove HOLDLOCK => viable if you are sure your workload won't be broken by losing that lock protection.
How to configure this in the JDBC Sink Connector:
`mssql.use.merge.holdlock`=false
#### Rewriting the MERGE
Approaches that achieve the same result without using MERGE. If you are using the JDBC Connector you probably won't go this route; see the article if you are interested.
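For context, the usual shape of such a rewrite is the well-known update-then-insert pattern. This is a hedged sketch of that pattern, not the article's exact code; table, column, and variable names are illustrative:

```sql
BEGIN TRANSACTION;

UPDATE dbo.myTable WITH (UPDLOCK, SERIALIZABLE)
SET name = @name
WHERE id = @id;

IF @@ROWCOUNT = 0
    INSERT INTO dbo.myTable (id, name) VALUES (@id, @name);

COMMIT TRANSACTION;
```

The explicit `UPDLOCK` makes the initial seek take an update lock instead of a shared lock, which is the key to avoiding the shared-then-exclusive lock conversion that produces the deadlock.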
#### A possible approach within the JDBC Sink Connector
Another, more complex route (which I haven't tested) is to use an SMT to route insert, update, and delete events to dedicated topics, then create separate sink connectors: one in insert mode, one in update mode, and one with delete enabled.
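The routing step above could look something like this. A minimal sketch of the idea only (the function, topic names, and the `op` field are my assumptions; the `c`/`u`/`d` codes follow the Debezium change-event convention, so adjust for your CDC source):

```python
def route_by_operation(record, base_topic="orders"):
    """Re-route a change event to a per-operation topic so that separate
    sink connectors can handle inserts, updates, and deletes."""
    suffix = {"c": "insert", "u": "update", "d": "delete"}
    op = record.get("op")
    if op not in suffix:
        raise ValueError(f"unknown operation: {op!r}")
    return f"{base_topic}.{suffix[op]}"

# An update event lands on the topic consumed by the update-mode connector.
print(route_by_operation({"op": "u"}))  # → orders.update
```

In a real deployment this logic would live in a Single Message Transform (or a stream processor) rather than application code, and each target topic would get its own sink connector configuration.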
Below are the official Confluent JDBC Connector options and descriptions for `insert.mode`:
`insert.mode`
The insertion mode to use.
* Type: string
* Default: insert
* Valid Values: [insert, upsert, update]
* Importance: high
The supported modes are as follows:
`insert`
Use standard SQL INSERT statements.
`upsert`
Use the appropriate upsert semantics for the target database if it is supported by the connector–for example, INSERT OR IGNORE. When using upsert mode, you must add and define the pk.mode and pk.fields properties in the connector configuration. For example:
```json
{
  ...
  "pk.mode": "record_value",
  "pk.fields": "id"
  ...
}
```
In the previous example, pk.fields should contain your primary key.
`update`
Use the appropriate update semantics for the target database if it is supported by the connector–for example, UPDATE.