# A unidirectional Topic Operator

## Context

The Strimzi Topic Operator (TO) provides a Kubernetes API for viewing and modifying topics within a Kafka cluster. It deviates from the standard operator pattern by being bidirectional. That is, it will reconcile changes to a `KafkaTopic` resource both to _and from_ a Kafka cluster. The bidirectionality means applications and users can continue to use Kafka-native APIs (such as the `Admin` client) and tooling (e.g. the scripts provided by Apache Kafka) as required.

The TO makes use of ZooKeeper znode watches to know about changes to topic state within the Kafka cluster. The advent of KRaft means that the TO won't be able to rely on this mechanism.

In order to continue to provide a kube-native API for managing Kafka topics we've considered some alternatives which would work with a KRaft-based Kafka cluster:

* Using polling via the Kafka `Admin` client.
* Using a KRaft observer.

The polling approach, while simple, has significant drawbacks. It would scale poorly (in terms of CPU and memory) with the number of topics, and would also suffer from increased latency between the time a change was made in Kafka and when it was reflected in Kubernetes. The increased latency would widen the window for making conflicting changes.

The observer approach could work, but comes with some serious drawbacks.

* The Kafka project does not provide a publicly-supported API for running a KRaft observer, so we'd need to make use of internal APIs which could change between releases without warning. This represents a potential maintenance burden to the project.
* The operator would require persistent storage (for the replicated `__cluster_metadata` log), which would make the operator significantly more difficult to operate.
  For example, if it were realized as a single-pod `Deployment` then the requirement for a persistent volume would tie it to a single availability zone (AZ), meaning it would not be as highly available as the current operator.
  This could be worked around by having multiple observers with only a single elected leader actually in charge of making modifications at any one time.

Overall, it is clear that the complexity of the operator would be significantly increased by pursuing this direction.

It's also worth reviewing _why_ people use the TO. There are two broad use cases:

1. Declarative deployments, for example in the form of gitops.
2. Using `KafkaTopics` operationally as the API through which to manage all topics.

Based on opened issues and Slack conversations with users, it seems that use case 1 is the most common reason people use the TO. However, the current TO doesn't do a great job at satisfying this use case.

* It only supports a single namespace, which means TO admins cannot easily follow a workspace-per-dev-team model. It also means that infrastructure-level topics (e.g. `__consumer_offsets` and `__transaction_state`) commingle with per-application topics.
* It doesn't *enforce* a single source of truth, allowing for changes done directly in Kafka to conflict with declarative deployments.

Moreover, the existing TO doesn't do a good job with use case 2 either:

* It only surfaces a subset of topic state through the Kube API (e.g. it doesn't include partition leadership, ISR, etc.). While this is intentional (it doesn't make sense to overload etcd to present this kind of dynamic information), it undermines this whole use case of the TO.

For these reasons we are exploring what a unidirectional TO would look like, in terms of implementation, operational complexity, and addressable use cases.

### Goals

* A unidirectional TO which is KRaft compatible
* Support for multiple namespaces

### Non-goals

* Supporting replication factor (RF) changes (this could be added later)

## Proposal

### CR API

We would remain compatible with the existing `KafkaTopic` resource at the API level.
This means that all existing resources could be handled by the new operator and all existing fields of the `status` would still be present for applications that consume the status.

However, the semantics of a `KafkaTopic` would change incompatibly: the `spec` would no longer be updated when Kafka-side configuration changed. Instead, if Kafka-side configuration was changed out-of-band (e.g. by using the `Admin` client, or Kafka shell scripts), those changes would eventually (depending on the timed reconciliation interval) be reverted.

As context for the rest of this document, an example `KafkaTopic` looks like this:

```yaml=
kind: KafkaTopic
metadata:
  generation: 123
  name: consumer.offsets
spec:
  topicName: __consumer_offsets
  replicas: 3
  partitions: 50
  config:
    retention.ms: 123456789
status:
  topicName: __consumer_offsets
  observedGeneration: 123
  conditions:
  - type: Ready
    status: True
    lastTransitionTime: 20230301T103000Z
```

Where:

* `spec.topicName` is used for topic names which are not legal as kube resource names. When this field is absent the name of the topic in Kafka is taken from the `metadata.name`.
* `status.topicName` is used to detect a change to the `spec.topicName`, which results in an error.
* `metadata.generation == status.observedGeneration` implies that the operator has reconciled the latest change to the `spec`.

New fields will be introduced in the following text.

### The "single resource" principal

To avoid ambiguity we will continue to require that a single `KafkaTopic` is used to manage a single topic in a single Kafka cluster.

The fact that there are legal topic names (in Kafka) that are not legal resource names (in Kube) provided the motivation for supporting `spec.topicName`, so that a topic in Kafka could be managed by a `KafkaTopic` with a different `metadata.name`. As such it's not possible for the Kube apiserver to enforce the single resource principle, because multiple `KafkaTopic` resources might refer to the same topic (i.e.
the uniqueness constraint on `metadata.name` is insufficient).

It is therefore left to the operator to detect if there are multiple `KafkaTopics` which point to the same topic in Kafka. To do this, the operator will keep an in-memory mapping of topic name to `<namespace, name>` pairs. This will allow us to detect the case where multiple resources are attempting to manage the same topic in Kafka. When this happens both resources' `Ready` status will change to `False` with a suitable `reason`.

```yaml=
kind: KafkaTopic
metadata:
  generation: 123
  namespace: some-namespace
  name: foo
status:
  topicName: __consumer_offsets
  observedGeneration: 123
  conditions:
  - type: Ready
    status: False
    reason: ResourceConflict
    message: Also managed by some-namespace/bar
    lastTransitionTime: 20230301T103000Z
---
kind: KafkaTopic
metadata:
  generation: 456
  namespace: some-namespace
  name: bar
status:
  topicName: __consumer_offsets
  observedGeneration: 456
  conditions:
  - type: Ready
    status: False
    reason: ResourceConflict
    message: Also managed by some-namespace/foo
    lastTransitionTime: 20230301T103000Z
```

### Unidirectionality

As mentioned, we would only synchronize topic state from Kube to Kafka. Users and applications could create, delete and modify topics using the Kafka protocol (e.g. via the `Admin` client), which would **not** be reflected by a `KafkaTopic` in Kube.

In other words, the existence of at least some `KafkaTopic` resources would be decoupled from the existence of a corresponding topic in Kafka. While users could choose to create a topic in Kafka by creating a `KafkaTopic`, they could also create and eventually delete topics directly without a `KafkaTopic` ever existing.

### "Managed" and "unmanaged" topics

We will refer to a topic which exists in a Kafka cluster and has a corresponding `KafkaTopic` as a _managed topic_. A topic which exists in a Kafka cluster without a matching `KafkaTopic` is an _unmanaged topic_.
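
To make the bookkeeping concrete, the following is a minimal Python sketch of the in-memory mapping from topic name to `<namespace, name>` pairs described under the single resource principle. The `ResourceRef` and `TopicIndex` names are hypothetical and are not part of Strimzi; the real operator may structure this differently. Note that the sketch only knows about `KafkaTopic` resources, so `has_kafka_topic` answers the Kube half of the managed/unmanaged question, not whether the topic actually exists in Kafka.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Set


class ResourceRef(NamedTuple):
    """Identifies a KafkaTopic resource by namespace and metadata.name."""
    namespace: str
    name: str


class TopicIndex:
    """Hypothetical in-memory map: Kafka topic name -> claiming KafkaTopics."""

    def __init__(self) -> None:
        self._by_topic: Dict[str, Set[ResourceRef]] = defaultdict(set)

    def register(self, topic_name: str, ref: ResourceRef) -> List[ResourceRef]:
        """Record that `ref` claims `topic_name`; return conflicting resources."""
        self._by_topic[topic_name].add(ref)
        return self.conflicts(topic_name, ref)

    def unregister(self, topic_name: str, ref: ResourceRef) -> None:
        """Forget a claim, e.g. once the KafkaTopic has been removed."""
        refs = self._by_topic.get(topic_name)
        if refs is None:
            return
        refs.discard(ref)
        if not refs:
            del self._by_topic[topic_name]

    def conflicts(self, topic_name: str, ref: ResourceRef) -> List[ResourceRef]:
        """Other resources claiming the same topic (empty list means no conflict)."""
        return sorted(self._by_topic.get(topic_name, set()) - {ref})

    def has_kafka_topic(self, topic_name: str) -> bool:
        """True if at least one KafkaTopic claims this topic name."""
        return topic_name in self._by_topic
```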
Naturally any topic created due to the creation of a `KafkaTopic` is managed from the outset.

---

#### Example 1: `KafkaTopic` creation when topic does NOT exist in Kafka

> 1. The user creates a `KafkaTopic`:
>
>     ```yaml=
>     metadata:
>       generation: 1
>       name: foo
>     spec:
>       partitions: 12
>       replicas: 50
>     ```
>
> 2. The operator checks its in-memory map to see whether `foo` is already managed by some other resource.
>
>     1. If so, then both resources are updated with error conditions in their `status`.
>     2. If not, it creates the topic in Kafka and updates the `status`:
>
>     ```yaml=
>     metadata:
>       generation: 1
>       name: foo
>     spec:
>       topicName: foo
>       partitions: 12
>       replicas: 50
>     status:
>       observedGeneration: 1
>       topicName: foo
>       conditions:
>       - type: Ready
>         status: True
>         lastTransitionTime: 20230301T103000Z
>     ```
>

---

Because topics can be created directly in Kafka we need a mechanism for converting an unmanaged topic to a managed topic, aka "managing" the topic. This can easily be done by creating a matching `KafkaTopic`: the operator will attempt the `CreateTopics` request and, if it receives a `TOPIC_ALREADY_EXISTS` error, will proceed with reconciliation as it would for a `KafkaTopic` modification. That is, the configuration of the topic in Kafka will be changed to match the `KafkaTopic` (or fail for the same possible reasons, with the same error conditions in the `status`).

---

#### Example 2: `KafkaTopic` creation when topic exists in Kafka

This is exactly the same as Example 1.

---

### Modification to `KafkaTopics` and timed reconciliation

Via a Kube watch, and also based on a timer, the operator will ensure that the state of a managed topic in a Kafka cluster reflects the `spec` of its `KafkaTopic`. The process will first describe the topic and its configs and then make alterations, if required.

* Modification to topic config will be via `incrementalAlterConfigs` using the `SET` operation.
* Creation of partitions will be supported.
* Deletion of partitions is not supported by Kafka and will result in a suitable condition in the `status.conditions` of the `KafkaTopic`.

  ```yaml
  status:
    conditions:
    - type: Ready
      status: False
      reason: NotSupported
      message: Decrease of spec.partitions is not supported by Kafka
      lastTransitionTime: 20230301T103000Z
  ```

  The user will then need to decide how to proceed, but typically they might just revert the `spec.partitions`.
* Changes to `spec.replicas` will be supported via Cruise Control (CC), but not in the first phase of development.

  ```yaml
  status:
    conditions:
    - type: Ready
      status: False
      reason: NotSupported
      message: Changing spec.replicas is not supported by the operator
      lastTransitionTime: 20230301T103000Z
  ```

> Note that both these `NotSupported` reasons can arise either from changes being made to the `KafkaTopic`, or from changes made directly in Kafka. For example, the TO cannot tell whether the `spec.partitions` was decreased, or whether it was unchanged and the conflict arises because someone increased the number of partitions directly in Kafka so that it looks like the `KafkaTopic` is requesting a decrease.

Unlike the existing TO, the unidirectional TO will reconcile batches of the known topics spread over the reconciliation interval, in order to smooth CPU and heap usage, lowering overall resource requirements.

### Topic deletion

We will use a [Kube finalizer](https://kubernetes.io/blog/2021/05/14/using-finalizers-to-control-deletion/) for deletion. This means that deletion via the kube REST API will first mark the resource as scheduled for deletion, allowing the operator to handle the deletion and then update the `KafkaTopic` so that the resource is actually removed from `etcd`.

Using a finalizer:

* Avoids the operator needing to diff sets of existing topics to figure out which `KafkaTopics` have been deleted while the operator was not running.
* Allows errors during deletion to be reported via the `KafkaTopic`'s `status`.
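
The finalizer-driven deletion flow can be sketched as a small decision function. This is an illustrative Python sketch under stated assumptions, not the operator's actual code: `handle_deletion`, `Outcome`, `Action` and the `delete_topic` callback (assumed to return `None` on success or a Kafka error code string on failure) are hypothetical names. The branches follow the cases walked through in Examples 3 and 4 later in this section.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, Optional


class Action(Enum):
    REMOVE_FINALIZER = auto()  # Kube may now remove the KafkaTopic
    KEEP_FINALIZER = auto()    # resource stays; the error is surfaced in status


@dataclass
class Outcome:
    action: Action
    topic_deleted: bool
    condition_reason: Optional[str] = None


def handle_deletion(
    managed: bool,
    has_conflict: bool,
    delete_enabled: bool,
    delete_topic: Callable[[], Optional[str]],
) -> Outcome:
    """Decide what to do for a KafkaTopic that has a deletionTimestamp."""
    # Unmanaged resources, resources in conflict with another KafkaTopic, and
    # clusters with delete.topic.enable=false: leave the topic in Kafka alone.
    if not managed or has_conflict or not delete_enabled:
        return Outcome(Action.REMOVE_FINALIZER, topic_deleted=False)

    error = delete_topic()  # hypothetical: None on success, else an error code
    if error is None:
        return Outcome(Action.REMOVE_FINALIZER, topic_deleted=True)
    if error == "UNKNOWN_TOPIC_OR_PARTITION":
        # The topic did not exist, so there is nothing left to clean up.
        return Outcome(Action.REMOVE_FINALIZER, topic_deleted=False)
    # Any other failure keeps the finalizer so the error stays visible.
    return Outcome(Action.KEEP_FINALIZER, topic_deleted=False,
                   condition_reason="KafkaError")
```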
A failed deletion would be reported like this:

```yaml
status:
  conditions:
  - type: Ready
    status: False
    reason: KafkaError
    message: "Deletion failed: ${Error_Message}"
    lastTransitionTime: 20230301T103000Z
```

---

#### Example 3: `KafkaTopic` deletion propagates to Kafka

> 1. The topic is currently Kube-managed
>
>     ```yaml=
>     metadata:
>       generation: 123
>       name: foo
>       finalizers:
>       - strimzi.io/topic-operator
>     spec:
>       # ...
>     status:
>       observedGeneration: 123 # == generation => it is Kube-managed
>     ```
>
> 2. The user then deletes the resource (e.g. `kubectl delete kafkatopic foo`).
>    Because of the presence of an entry in `metadata.finalizers` the resource remains present.
>    Instead Kube adds the `metadata.deletionTimestamp` field.
>
>     ```yaml=
>     metadata:
>       generation: 124
>       name: foo
>       finalizers:
>       - strimzi.io/topic-operator
>       deletionTimestamp: 20230301T000000.000
>     spec:
>       # ...
>     status:
>       observedGeneration: 124 # == generation => it is not synchronized
>     ```
>
> 3. The operator notices the `metadata.deletionTimestamp` field.
>     1. If there is more than one `KafkaTopic` which is trying to manage this topic then deletion from Kafka is not attempted;
>        the operator will remove the finalizer and Kube will remove the resource; upon timed reconciliation the condition on the other `KafkaTopic` will be removed and reconciliation via that other `KafkaTopic` will proceed as normal.
>     2. If deletion is disallowed via the `delete.topic.enable=false` broker configuration then the operator will remove the finalizer and Kube will remove the resource. The topic in Kafka will thus become unmanaged.
>     3. Otherwise, the operator attempts to delete the topic from Kafka.
>         1. If deletion succeeds the operator will remove `strimzi.io/topic-operator` from `metadata.finalizers`.
>         2. If deletion fails with `UNKNOWN_TOPIC_OR_PARTITION` (i.e. the topic didn't exist) the operator will still remove the finalizer, i.e. this case is not treated as an error.
>         3.
>            If deletion fails with any other error, this is reported via a condition in `status.conditions` and the finalizer is not removed.
>
> Note: deletion will be done via topic id, not topic name.
>
> **TODO** does this entirely prevent create/delete races?
>

---

Similarly to how it is possible to "manage" an existing unmanaged topic in Kafka, we will also provide a mechanism for "unmanaging" a managed topic, via a `spec.managed` flag. This will allow a `KafkaTopic` to be deleted **without** the topic in Kafka being deleted. This can be useful operationally, for example to change the `metadata.name` of a `KafkaTopic`.

> An alternative to `spec.managed` would be "explicit deletion" via a `spec.delete` flag.
> This alternative was rejected because it's incompatible with gitops.

---

#### Example 4: `KafkaTopic` deletion WITHOUT propagation to Kafka

> 1. Assume the topic is currently Kube-managed
>
>     ```yaml=
>     metadata:
>       generation: 123
>       name: foo
>       finalizers:
>       - strimzi.io/topic-operator
>     spec:
>       # ...
>     status:
>       observedGeneration: 123 # == generation => it is Kube-managed
>     ```
>
> 2. The user changes the new `spec.managed` field to `False` so that the topic is no longer Kube-managed.
>
>     ```yaml=
>     metadata:
>       generation: 124
>       name: foo
>       finalizers:
>       - strimzi.io/topic-operator
>     spec:
>       managed: False # do not synchronize to Kafka
>     status:
>       observedGeneration: 123 # != generation => it is still Kube-managed
>     ```
>
> 3. The user waits until the operator has observed that change, indicated by the `status.observedGeneration` matching the `metadata.generation`:
>
>     ```yaml=
>     metadata:
>       generation: 124
>       name: foo
>       finalizers:
>       - strimzi.io/topic-operator
>     spec:
>       managed: False # do not synchronize to Kafka
>     status:
>       observedGeneration: 124 # == generation => it is no longer Kube-managed
>     ```
>
> 4. The user can then delete the resource.
>    Because of the presence of an entry in `metadata.finalizers` the resource is not actually deleted.
>    Instead Kube adds the `metadata.deletionTimestamp` field.
>
>     ```yaml=
>     metadata:
>       generation: 124
>       name: foo
>       finalizers:
>       - strimzi.io/topic-operator
>       deletionTimestamp: 20230301T000000.000
>     spec:
>       managed: False # do not synchronize to Kafka
>     status:
>       observedGeneration: 124 # == generation => it is not synchronized
>     ```
>
> 5. The operator notices the `metadata.deletionTimestamp` field.
>    Since `spec.managed` is `False` it does **not** attempt topic deletion.
>     1. It removes `strimzi.io/topic-operator` from `metadata.finalizers`.
>     2. Kube proceeds to remove the resource.

Note that `spec.managed=False` means that a `KafkaTopic` can exist without there being a corresponding topic in Kafka.

---

### Multi-namespace support

The operator will support multiple namespaces, subject to the _single resource principle_ [described above](#The-%E2%80%9Csingle-resource%E2%80%9D-principal). Detected violations in the multi-namespace case will use the same `Ready=False, reason: ResourceConflict` condition.

It will be possible to move the `KafkaTopic` for a topic in Kafka from one namespace to another by:

* Either
    1. Unmanaging the `KafkaTopic` (as described above).
    2. Deleting it.
    3. Creating a new one in the new namespace.
* Or
    1. Creating a new `KafkaTopic` in the new namespace, making both resources have the `Ready=False, reason: ResourceConflict` condition.
    2. Deleting the old one, which will remove the error condition on the new `KafkaTopic`, as described above.

#### Cluster id

The possibility of multiple operator instances each managing multiple namespaces increases the risk of misconfiguration resulting in two operator instances observing the same namespace. To help prevent this we will add the Kafka cluster id to the `status`:

```yaml!
kind: KafkaTopic
spec:
  # ...
status:
  topicName: foo
  clusterId: abcdef
```

If the operator detects a mismatch it will log an error, increment a metric, and ignore the resource.

> We chose to ignore the `KafkaTopic` rather than add an error condition because we assume there exists another operator instance observing this resource, which would anyway end up resetting the `status` on its timed reconciliation, so a condition is not a good way of detecting this situation.

#### Namespace Policy

To help avoid collisions where two or more `KafkaTopics` are trying to manage the same topic in Kafka, the operator will support an optional policy for which namespaces can manage which topics.

---

#### Example

> Conceptually (i.e. this might not be configured via YAML):
>
> ```yaml=
> policy:
> - namespace: team.a
>   topicNamePrefixes:
>   - foo-app.
>   - bar-app-
>   topicNames:
>   - config-foo
> - namespace: team.b
>   topicNamePrefixes:
>   - quux-app.
>   topicNames:
>   - config-quux
> - namespace: kafka.cluster.admins
>   otherTopics: True
> ```
>
> Thus:
>
> - Topics `foo-app.x` and `foo-app.y` can only be managed by `KafkaTopics` in namespace `team.a` (due to the `foo-app.` prefix), likewise `bar-app-x` and `bar-app-y` (due to the `bar-app-` prefix). Additionally `config-foo` can only be managed from this namespace (due to the `topicNames` list).
> - Topics `quux-app.x` and `quux-app.y` can only be managed by `KafkaTopics` in namespace `team.b` (due to the `quux-app.` prefix).
>   Additionally `config-quux` can only be managed from this namespace (due to the `topicNames` list).
> - Topics `config-gee`, `__consumer_offsets`, and `__transaction_state` could only be managed from the `kafka.cluster.admins` namespace.
>
> Rules for legal policies:
>
> * All `topicNamePrefixes` must be disjoint (i.e. no given prefix is a prefix of any other prefix).
> * No `topicNamePrefix` may be a prefix of any listed `topicNames`.
> * Only one namespace may have `otherTopics: True`.
>
> These rules are sufficient to guarantee that any given topic maps unambiguously to a single namespace.
> If a `KafkaTopic` is created that violates the policy, the operator will update the `Ready` condition with a suitable error explaining the policy violation.
> If two `KafkaTopics` are created, only one of which violates the policy, then only that one will get the `Ready=false` policy violation condition and the `KafkaTopic` which conforms to the policy will be reconciled normally.

---

<!--
### Scheduled auto-management

The operator will support an option to periodically detect topics in Kafka which do not exist in Kube and create `KafkaTopics` for them. This will operate on a different schedule to the timed reconciliation of existing `KafkaTopic` resources.

This requires:

1. being able to map a topic in Kafka back to a unique namespace in Kube
2. a policy on how to name the `KafkaTopic` to be created

For 1: We will use a mapping from sets of disjoint topic name prefixes to namespace.

> Using topic name prefixes is slightly less flexible than regular expressions, but allows the operator to validate disjointness, thus removing the possibility of ambiguity in the mapping.

For 2: We will support a simple template-based mechanism for computing the `metadata.name` from the topic name. Placeholders will be enclosed in `${` and `}`. Supported placeholder expressions are:

* `SUFFIX-` that portion of the lower-cased topic name remaining after removal of the prefix, with illegal characters (e.g. `_`) replaced by `-`
* `SUFFIX.` that portion of the lower-cased topic name remaining after removal of the prefix, with illegal characters (e.g. `_`) replaced by `.`
* `NAME-` the complete lower-cased topic name, with illegal characters (e.g. `_`) replaced by `-`
* `NAME.` the complete lower-cased topic name, with illegal characters (e.g.
  `_`) replaced by `.`
* `HASH` a SHA-1 based hash (or prefix thereof, depending on size constraints) based on the topic name. This makes it possible to disambiguate names which might otherwise collide (e.g. `foo` and `FOO` are distinct topics in Kafka, but would have the same `${NAME-}`).

---

#### Example

> Conceptually (i.e. this might not be configured via YAML):
>
> ```yaml=
> mappings:
> - topicNamePrefix: foo-app.
>   namespace: team.a
>   nameTemplate: foo.${SUFFIX.}
> - topicNamePrefix: bar-app-
>   namespace: team.a
>   nameTemplate: ${NAME-}
> - topicNamePrefix: quux-app.
>   namespace: team.b
>   nameTemplate: ${NAME.}
> defaultNamespace:
>   namespace: kafka.cluster.admins
> ```
>
> Thus:
>
> 0. a _managed topic_ called `foo-app.topic0_blah` would be ignored by auto creation (because it's already a managed topic).
> 1. an _unmanaged topic_ called `foo-app.topic1_blah` would result in the creation of a `KafkaTopic` in namespace `team.a` with name `foo.topic1.blah`
> 2. an _unmanaged topic_ called `bar-app-topicA-B` would result in the creation of a `KafkaTopic` in namespace `team.a` with name `bar-app-topica-b`
> 3. an _unmanaged topic_ called `gee` would result in the creation of a `KafkaTopic` in namespace `kafka.cluster.admins` (because it doesn't match any of the `topicNamePrefixes`) with name `gee-90c6dcf1767802b8bf571d0e31c491120e9b6131`
>

---

Depending on how these rules are configured it is possible for distinct topics in Kafka to map to the same `KafkaTopic`. It may also happen that a similar conflict arises between a user-created `KafkaTopic` and an auto-created one. To detect this, processing for any given `KafkaTopic` will be serialized and we will always check the topic being reconciled against the `status.topicName` prior to reconciliation. In this way a first-in-wins policy will apply: the `KafkaTopic` will continue to manage the same topic in Kafka, but an additional condition will record the conflict:

```yaml!
status:
  topicName: foo
  conditions:
  - type: Ready
    status: False
    reason: KafkaError
    message: "Deletion failed: ${Error_Message}"
    lastTransitionTime: 20230301T103000Z
  - type: UniqueTopic
    status: False
    reason: TopicConflict
    message: "Topic 'FOO' also maps to this resource under auto-creation rules"
    lastTransitionTime: 20230301T103000Z
```

> Alternatively we could just log that we couldn't create a `KafkaTopic` for `FOO`, but that would be harder for the user to discover.

**TODO:**

* How to avoid CPU and memory spikes?
* Would timed reconciliation be able to delete?
* How would we avoid flip-flop?

-->

<!--
### Supporting RF change

---

#### Example

1. The user changes `spec.replicas`
2. The operator describes the topic and determines that the number of replicas has changed
3. The operator uses the [`POST /kafkacruisecontrol/topic_configuration?topic=[topic_regex]&replication_factor=[target_replication_factor]`](https://github.com/linkedin/cruise-control/wiki/REST-APIs#change-kafka-topic-configuration) endpoint

**TODO**:

* Check whether this is async, and the exact response schema from the POST request.
* Describe the state machine.
* Describe the `status` schema needed for the state machine
    - Include what happens if the KafkaTopic is deleted while the operation is ongoing; we would need to cancel the CC operation as well as deleting the topic.
* Describe how this might interact with the `KafkaRebalance` CR
    - What if a rebalance is already in-progress?
    - What if an auto-rebalance starts running concurrently?
* Describe whether/how we might use batching to get better proposals when changing the RF of multiple `KafkaTopics` (CC's `topic` URL parameter is a regex)

---

-->

### Operational considerations

#### Changing `metadata.name`

This WILL be supported by:

1. unmanaging the topic (`spec.managed=false`).
2. deleting the `KafkaTopic`.
3. recreating the `KafkaTopic` with a new name.

#### Changing `spec.topicName`

This will NOT be supported.
It will be detected by a mismatch between `spec.topicName` and `status.topicName`. Changes thus detected will result in an error condition.

#### Decreasing `spec.partitions`

This will NOT be supported, because it's not supported by Kafka. The user will either need to revert the `spec.partitions`, or recreate the topic in Kafka (which they can do via the `KafkaTopic` or not).

#### Changing `spec.replicas`

This will NOT be supported as part of this proposal.

#### Recreating topics in the Kafka cluster

This WILL be supported.

#### Deployment of application with `KafkaTopics`

Consider an application deployed via a manifest containing a `Deployment` and some `KafkaTopics`. If the Kafka cluster is configured with `auto.create.topics.enable=true` there is a race condition between:

1. the operator reacting to the new `KafkaTopics`.
2. the application starting and triggering autocreation of the topics.

**If the operator wins:** then auto-creation will not happen, and the user's intent is honoured.

**If the application wins:** then the topics will be created using the default configuration for the cluster. The operator will later reconcile the `KafkaTopics`, which will result in reconfiguration if the default configuration for auto-created topics differs from the `KafkaTopic.spec`. That reconfiguration will either succeed, or fail (e.g. because a `KafkaTopic` has a lower `spec.partitions` than the auto-created topic).

To avoid this it is **recommended** that the Topic Operator not be used with clusters where `auto.create.topics.enable` is `true`. The operator will emit a `WARN` level log on startup if autocreation is enabled.

Alternatively, applications could be written to wait for their topics to exist, or use an init container to do the same.

### Disadvantages

#### Compatibility with other tooling

This approach is not easily compatible with other topic management tooling.
This includes things like `kafka-configs.sh` and other Apache Kafka scripts, as well as any other tooling which uses the `Admin` client, including applications themselves.

<!--Even using the "scheduled auto creation" option, changes done via the admin server would be reverted by the next timed reconciliation. -->

In the particular case of an "Admin server", to provide CLI or UI functionality in a Strimzi-aware way it _might_ be possible to wrap `Admin` (decorator pattern), or write it using some abstraction over `Admin` which could also support operations on `KafkaTopics` for those interactions which would otherwise result in conflicting sources of truth.

Alternatively it might be possible to use a Kafka-aware proxy to redirect Kafka protocol messages which change topic state to the Kube API, along with a Topic Operator which has a privileged connection that is not proxied in this way. This would result in Kube being the effective source of truth for topic state.
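
To illustrate the decorator idea mentioned above, here is a minimal Python sketch. Everything in it is hypothetical: `InMemoryAdmin` is a toy stand-in for the Kafka `Admin` client (it does not mirror the real API), and `KubeAwareAdmin` models `KafkaTopic` resources as a plain dictionary of topic name to `spec.config`. It only shows the routing decision: reads pass through, while writes to managed topics are redirected to the Kube-side spec so that Kube stays the single source of truth.

```python
from typing import Dict


class InMemoryAdmin:
    """Toy stand-in for an Admin-like client; not the real Kafka API."""

    def __init__(self) -> None:
        self.configs: Dict[str, Dict[str, str]] = {}

    def alter_config(self, topic: str, key: str, value: str) -> None:
        self.configs.setdefault(topic, {})[key] = value

    def describe_config(self, topic: str) -> Dict[str, str]:
        return dict(self.configs.get(topic, {}))


class KubeAwareAdmin:
    """Decorator exposing the same interface as the wrapped admin client."""

    def __init__(self, delegate: InMemoryAdmin,
                 kafka_topic_specs: Dict[str, Dict[str, str]]) -> None:
        self._delegate = delegate
        # Hypothetical stand-in for KafkaTopic resources: topic -> spec.config
        self._specs = kafka_topic_specs

    def describe_config(self, topic: str) -> Dict[str, str]:
        # Reads never conflict with Kube, so they pass straight through.
        return self._delegate.describe_config(topic)

    def alter_config(self, topic: str, key: str, value: str) -> None:
        spec = self._specs.get(topic)
        if spec is not None:
            # Managed topic: change the KafkaTopic spec instead of Kafka;
            # the operator would later reconcile the change into Kafka.
            spec[key] = value
        else:
            # Unmanaged topic: no KafkaTopic exists, so write to Kafka.
            self._delegate.alter_config(topic, key, value)
```

With this shape, a caller altering `retention.ms` on a managed topic would see the change land in the `KafkaTopic` spec first, and only reach Kafka once the operator reconciles it.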