# PubSub

**Author**: @aat

## Description

Event streaming in FTL will provide Kafka-like persistent topic support. It will initially be implemented entirely within FTL but will be forward compatible with Kafka.

## Prior Reading

See the [FTL Primitives](/OKNVpxm1QkmYnvGCvd8N1w) doc, which contains a description of the different PubSub messaging patterns.

## Goals

- Automatically provision topics.
- Automatically connect subscribers to topics.
- Simple, straightforward, but flexible topic creation.
- [Retries](/FiZFN6PQRDqrYAdDwyK0-g)
- Simple mechanism for creating a consumer.
- Forward compatible with Kafka.
- Delivery will be at least once.
- Error handling - a Verb designated to handle any subscriber failures (?)

## Non-Goals (optional)

Initially, we won't support:

- Dead letter queues.
- Complex configuration.
- At-most-once delivery, locking, etc.

## Design

Topics will initially be modelled in PostgreSQL to avoid the overhead of having to manage Kafka or similar infrastructure. The schema is described below.

As with most resources in FTL, PubSub objects are described declaratively. The FTL tooling will analyse the source code and extract the definitions of topics and subscriptions from these declarations. These will then be defined in the schema, and provisioned based on that definition.

### Schema

Topics and subscriptions will look something like the following in the FTL schema:

```javascript
module payments {
  topic payins PaymentEvent
  subscription paymentProcessing payins

  verb payin(PayinRequest) PayinResponse {
    +ingress http POST /payment
  }

  sink processPayin(PaymentEvent) {
    +subscribe paymentProcessing
  }
}
```

### Programmatic interface

#### Define a topic

Go:

```go
var payins = ftl.RegisterTopic[PayinEvent]("payins")
```

#### Create a subscription

Go:

```go
var paymentProcessing = ftl.RegisterSubscription(payins, "paymentProcessing")
```

#### Consume from a subscription

This is not type-safe in Go's type system, but is in FTL's.
```go
//ftl:subscribe paymentProcessing
func ProcessPayin(ctx context.Context, req PayinEvent) error {
	// ...
}
```

Publish to a topic:

```go
//ftl:verb
//ftl:ingress POST /payin
func Payin(ctx context.Context, req PayinRequest) (PayinResponse, error) {
	if err := payins.Publish(PayinEvent{ /* ... */ }); err != nil {
		return PayinResponse{}, fmt.Errorf("failed to publish event: %w", err)
	}
	return PayinResponse{}, nil
}
```

## Implementation

Topic/subscription creation will occur when a deployment is encountered that contains a resource that doesn't already exist. Creation of the resource will occur synchronously for now, as it's a few simple SQL row insertions.

A new singleton scheduled task will be added to the controller that will periodically:

1. Enumerate all subscribers
2. For each subscription, use the hash ring to determine if this controller owns it.
3. If it does,

### Data model

```sql
CREATE TABLE topics (
  id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

  -- Each topic is associated with an owning module?
  module BIGINT NOT NULL REFERENCES modules(id),
  -- Name of the topic.
  name VARCHAR NOT NULL,
  -- Data reference to the payload data type in the owning module's schema.
  type VARCHAR NOT NULL
);

CREATE UNIQUE INDEX topics_module_name_idx ON topics(module, name);

-- This table contains the actual topic data.
CREATE TABLE topic_events (
  id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

  topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE,
  payload BYTEA NOT NULL
);

-- A subscription to a topic.
--
-- Multiple subscribers can consume from a single subscription.
CREATE TABLE topic_subscriptions (
  id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

  topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE,
  -- Name of the subscription.
  name VARCHAR UNIQUE NOT NULL,
  -- Cursor pointing into the topic_events table.
  cursor BIGINT NOT NULL REFERENCES topic_events(id) ON DELETE CASCADE
);

-- A subscriber to a topic.
--
-- A subscriber is a 1:1 mapping between a subscription and a sink.
CREATE TABLE topic_subscribers (
  id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

  topic_subscriptions_id BIGINT NOT NULL REFERENCES topic_subscriptions(id),
  -- Fully-qualified reference to the consumer sink.
  sink_ref VARCHAR NOT NULL
);

-- Mapping functions for a topic.
CREATE TABLE topic_mappers (
  id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

  input_topic_subscriptions_id BIGINT NOT NULL REFERENCES topic_subscriptions(id),
  output_topic_id BIGINT NOT NULL REFERENCES topics(id),
  verb_ref VARCHAR NOT NULL
);
```