# Securing Kafka ## Encryption ### 在 kafka 中的重要性 維護資料的安全性和完整性。 - kafka listener 可以設定透過 SSL/SASL_SSL 使用 TLS 做傳輸層的加密。 - [TLS cipher suites](https://medium.com/@clu1022/%E9%82%A3%E4%BA%9B%E9%97%9C%E6%96%BCssl-tls%E7%9A%84%E4%BA%8C%E4%B8%89%E4%BA%8B-%E4%B9%9D-ssl-communication-31a2a8a888a6)可以讓安全度增加,並且符合 Federal Information Processing Standards (FIPS) 規則。 (*聯邦資訊處理標準: 是美國聯邦政府制定給所有軍事機構除外的政府機構及政府的承包商所使用的公開標準。) 除了傳輸階段要將通道做 encryption,硬體層也需要。 - 需將整個硬碟和 volume 做加密,確保當 disk 被偷,含有機敏資料的 kafka logs 的儲存是安全的。 另外,也要特別注意擁有特殊權限的 admin 或是 cloud provider,因為他們是能夠直接接觸到機器,也就可以直接從 disk 裡面取得資料。 - 這時候可以透過 custom encryption provider 作為 kafka client plug-in,以實踐整個 data flow end-to-end encryption。 ### end-to-end encryption encryption 作法 - 可以在 producer serialized message 的時候,結合 encryption。 - consumer deserialized 的時候做 decryption。 - encryption 通常是用對稱式加密演算法(AES)。 - key 通常會存在 KMS 中,producer 和 consumer 就能取用,broker 是無法取得 encrption key 的,也就沒辦法取得加密資料,雲端適合用這種方法。 - decryption 要使用到的 encryption 參數通常會存在於 message header 或是 message payload 中。 - 數位簽章會被包含在 message header 中以確認資料完整性。 ![SmartSelect_20240406_145948_Samsung Notes](https://hackmd.io/_uploads/H14JNdAk0.jpg) - producer 和 consumer 會設定 credential 才能取得 encryption key。 - 要確保不會被 brute-force attack,最好是對 key 定期 rotate,新舊 Kay 都能支援解密資料。 - 很多 KMS 系統都有支援graceful key rotation,而不用特別處理 kafka client。 - 但如果使用 [compacted topic](https://developer.confluent.io/courses/architecture/compaction/),有資料用舊的 key 做加密的,為了避免影響到新訊息,producer 和 consumer 必須要先 offline,才能將那些 message 改用新 key 做加密。 **壓縮加密資料** - Serializer(壓縮)一定要在加密之前做,或是 app 應該要在把資料送到 kafka 之前就把資料壓縮。 - kafka 本身就不要在加密資料之後做壓縮了,因為不會有任何節省空間的好處。 message key 加密 - message key 通常不會有機敏資料,所以無需加密。 - 某些情況下,key不能是明碼,因為 key 會用分類 partition 和 compaction。因此如果 encryption 參數做異動,也要確保key 是一樣的。 - 因為 message key 和 value 是分開 serialize 的,透過儲存原本 key 的 hash 當作 message key,然後將 encrypted key 放在 message payload 或是 header 中,這樣 producer 才能做 key 的 transform。 ## Authorization Authorization 就是決定你可以對 kafka 的資源做什麼樣的操作。 ### 驗證過程 - 只要有 client 連線進來,broker 會確認 client 的身份驗證是否和 kafka principal 一致。 - 當 kafka 確認 client 是有被授權的,才能夠進行操作。 - 例如:Alice 要寫入顧客訂購資料到 customerOrders topic,broker 會確認 User:Alice 是否有被授權可以對這個 topic 做寫入的操作。 - kafka 有內建的 AclAuthorizer,可以透過以下設定 `authorizer.class.name=kafka.security.authorizer.AclAuthorizer ` ### AclAuthorizer 提供對 kafka 資源做 fine-grained 的權限控制(ACLs)。 - ACLs 存在 ZooKeeper 之中,和每個 broker 的 memory 裡面,以增加 request 進到 broker 後確認授權的速度。 - broker 啟動的時候,就會將 ACLs load 到 memory 裡面,並且透過 zookeeper 的 watcher,藉由 event trigger 的方式將 ACLs 更新。 ACL binding 組成如下: - Resource type: Cluster, topic, Group, TransactionalId, DelegationToken - Pattern type: Literal, Prefixed - Resource name: prefix 或是 wildcard * - Operation: Describe, Create, Delete, Alter, Read, Write, DescribeConfigs, AlterConfigs - Permission type: Allow, Deny; Deny 順序優先。 - Principal: principal 格式 <principalType>:<principalName>, e.g., User:Bob or Group:Sales. 可以使用 wildcard User:* 表示該權限給所有 users。 - Host: 可以連線進來的 client ip or * 讓所有人都可以連進來。 **ACLs 說明** - 如果沒有 deny rule,就至少要有一個 allow rule 符合操作才有權限執行。 - 如果 read, write, Alter, delete 權限都核准,那 describe 權限就會被視為核准。 - 如果 AlterConfigs 權限核准,DescribeConfigs 權限就會被視為核准。 - broker 要有 Cluster:ClusterAction 權限,才能讓 controller request 和 replica fetch requests 對其他 brokers 連線。 Producer - producer 要能夠 produce 訊息到 topic,就要有 Topic:Write 權限。 - producer 要有 idempotent 功能,要有 Cluster:IdempotentWrite 權限。 - producer 要有 transaction 功能,要有 TransactionalId:Write 權限。 Consumer - group 要能夠 commit offsets 要有 Group:Read 權限。 - consumer 要能夠從 topic consume 資料,要有 Topic:Read 權限。 Administrative - 要有 Create, Delete, Describe, Alter, DescribeConfigs, or AlterConfig 權限。 ![SmartSelect_20240407_173927_Samsung Notes](https://hackmd.io/_uploads/Hy-JcyegR.jpg) ![SmartSelect_20240407_174014_Samsung Notes](https://hackmd.io/_uploads/r1bQ5yxl0.jpg) kafka 提供 kafka-acl.sh tool,可以直接連 ZooKeeper,這樣當 kafka brokers 還沒被起起來的時候,就可以直接把權限注入。 ``` $ bin/kafka-acls.sh --add --cluster --operation ClusterAction \ --authorizer-properties zookeeper.connect=localhost:2181 \ --allow-principal User:kafka $ bin/kafka-acls.sh --bootstrap-server localhost:9092 \ --command-config admin.props --add --topic customerOrders \ --producer --allow-principal User:Alice $ bin/kafka-acls.sh --bootstrap-server localhost:9092 \ --command-config admin.props --add --resource-pattern-type PREFIXED \ --topic customer --operation Read --allow-principal User:Bob ``` AclAuthorizer 有兩個 config options 可以一次核准所有權限: 當 allow.everyone.if.no.acl.found=true,則所有 users 都自動擁有所有權限,而不用特別設定 ACLs。 *這個設定適合第一次建立 cluster 或是開發環境。* ``` super.users=User:Carol;User:Admin (user 之間用分號分隔 allow.everyone.if.no.acl.found=true ``` super User 可以使用所有操作,而且就算有 deny ACLs 也沒辦法。 如果 super user credential 被洩漏,Carol 就一定要從 super user 中移除,brokers 也要重啟才能套用。 比較安全的做法還是只設定特定的權限,這樣管理也比較容易。 ### Customizing Authorization - 可以自己設計想要什麼樣的權限,例如:role-base access control,或是限制 request 過來是哪種 listener。 ![SmartSelect_20240412_101626_Samsung Notes](https://hackmd.io/_uploads/SJRKFMUgR.jpg) - 也可以搭配 external 系統做 role,group based access control 的設計,例如:ldap ![SmartSelect_20240412_102543_Samsung Notes](https://hackmd.io/_uploads/SJeAszIlA.jpg) - 如果 users 有在 ldap 中的 role 或 group,就可以擁有客製化對 group 和 role 的權限。 - 這樣你就可以用 kafka 的 acl tools,直接對 role 和 group 做權限設定。 ![SmartSelect_20240412_103651_Chrome](https://hackmd.io/_uploads/H12ICzLeA.jpg) ### Security Considerations - acl 是存在 ZooKeeper 中,所以 zookeeper 的使用上也必須要有限制。或是將 acl 另外存放到 external db。 - acl 以最小權限為原則,像是可以以部門來做權限劃分,相同權限 group 用相同的 prefix 命名,較好做權限管理。這種情境就可以用 service credential,而不是個人的 credential。 ### Auditing 主要使用 log4j 控制 log 的設定。 **log4j** - kafka.authorizer.logger: 主要設定 authorization 相關的 log,只有開到 debug level 才會有 allow 的 log,不然 info 只會有被 denied 的 log。 ``` DEBUG Principal = User:Alice is Allowed Operation = Write from host = 127.0.0.1 on resource = Topic:LITERAL:customerOrders for request = Produce with resourceRefCount = 1 (kafka.authorizer.logger) INFO Principal = User:Mallory is Denied Operation = Describe from host = 10.0.0.13 on resource = Topic:LITERAL:customerOrders for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger) ``` - kafka.request.logger: request debug level log 只會有 client host 和 principals 的資訊,要有完整 request detail log,就要設定成 trace level。 ``` DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=producer-1, correlationId=6) -- {acks=-1,timeout=30000,partitionSizes=[customerOrders-0=15514]},response: {responses=[{topic=customerOrders,partition_responses=[{partition=0,error_code=0 ,base_offset=13,log_append_time=-1,log_start_offset=0,record_errors=[],error_mes sage=null}]}],throttle_time_ms=0} from connection 127.0.0.1:9094-127.0.0.1:61040-0;totalTime:2.42,requestQueueTime:0.112,localTime:2.15,remoteTime:0.0,throttleTime:0,responseQueueTime:0.04,sendTime: 0.118,securityProtocol:SASL_SSL,principal:User:Alice,listener:SASL_SSL,clientInf ormation:ClientInformation(softwareName=apache-kafka-java, softwareVersion=2.7.0-SNAPSHOT) (kafka.request.logger) ``` 這兩個都是很重要的指標,可以用來追蹤 request,避免可疑活動。 Authorization log 發現 authorize fail,可以查看攻擊來源。 ### Securing ZooKeeper **SASL** - 如果 zookeeper 以 sasl 設定,sasl 有提供在 jaas configuration file 中加入 java.security.auth.login.config 可以指定 login module。 enable kerberos 設定如下: ``` Server { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/zk.keytab" principal="zookeeper/zk1.example.com@EXAMPLE.COM"; }; ``` enable SASL authentication 設定如下: 設定 authentication 在 zookeeper config 中, ``` authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider kerberos.removeHostFromPrincipal=true kerberos.removeRealmFromPrincipal=true #這兩個 kerberos 設定可以確保所有 brokers 權限設定是一樣的。 ``` kafka 也要設定連進 ZooKeeper 的設定,會在 jaas config file 中加入以下: ``` Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/broker1.keytab" principal="kafka/broker1.example.com@EXAMPLE.COM"; }; ``` **SSL** ZooKeeper 不像 kafka,如果同時設定 ssl 和 SASL,就要兩種連線都成功才允許 access,zookeeper 只要其中一種成功就允許 access。 - 要使用 ssl 就要指定 key stores 的設定 - 如果要做 client authentication,就要加上 trust store,以認證 client 的 certificate。 ``` secureClientPort=2181 serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider ssl.keyStore.location=/path/to/zk.ks.p12 ssl.keyStore.password=zk-ks-password ssl.keyStore.type=PKCS12 ssl.trustStore.location=/path/to/zk.ts.p12 ssl.trustStore.password=zk-ts-password ssl.trustStore.type=PKCS12 ``` - kafka 端也需要做連線到 ZooKeeper 的設定,需要加上 trust store,以驗證 zookeeper。 如果加上需要驗證 client,就要再加上 key store。 ``` zookeeper.ssl.client.enable=true zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty zookeeper.ssl.keystore.location=/path/to/zkclient.ks.p12 zookeeper.ssl.keystore.password=zkclient-ks-password zookeeper.ssl.keystore.type=PKCS12 zookeeper.ssl.truststore.location=/path/to/zkclient.ts.p12 zookeeper.ssl.truststore.password=zkclient-ts-password zookeeper.ssl.truststore.type=PKCS12 ```