# Pub/Sub for multiple webhook subscribers and single & multiple publishers
## Requirements
- [ ] API keys can be included in the request header instead of just being included as a URL parameter
### Primary
- [ ] Implement a mechanism to register multiple custom webhook URLs in a single-tenant scenario [existing implementation].
-- Should be backward compatible, built on top of existing implementation
-- Can be invoked using existing --multitenant argument or additional startup arguments or be loaded as a module using --plugin argument
### Secondary
- [ ] Implement a mechanism to register multiple custom webhooks for multiple tenants
-- Should be backward compatible, built on top of existing implementation
-- Can be invoked using existing --multitenant argument or additional startup arguments or be loaded as a module using --plugin argument
## Current Status
- Using --webhook-url CLI parameter to specify a single target_url
- Webhook is dispatched whenever a record is created or state is updated.
- Topics
-- connections
-- basicmessages
-- issue_credentials
-- present_proof
## High-level understanding of how multi-tenancy will work?
- ~~Start the agent with --multitenant and --jwt-secret argument.~~
- ~~Base wallet [root] is created~~
- ~~Can create subwallets using POST /multitenancy/wallet. We provide wallet name and key. In the end, we are provided with an auth_token and also wallet route is added~~
- ~~In the inbound message handler, we use the extracted recipient key [to be implemented] to retrieve the wallet record and switch context.~~
- ~~Specified subwallet will now be used.~~
## Approach [Draft v2]
:::info
**ACA-Py start-up**
- Enable the pubsub service with the --pubsub-enabled argument. During ACA-Py initialization, the pubsub service is built and run in a container on port 9040, unless another port is specified with the --pubsub-port parameter. If port 9040 is already in use, we increment the port by 1 until binding succeeds and save the result as an environment variable. We then use ngrok to set up a tunnel to this host URL [http://host.docker.internal:9040 or 172.17.0.1:9040] and show the tunnel URL in the terminal along with other details. The Pub/Sub service APIs are accessible through this URL.
- Override the default port for the pubsub service by specifying a port with the --pubsub-port CLI parameter.
:::
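The port fallback described above could be sketched roughly as follows. This is only an illustration using the standard library: the function names, the `max_tries` limit, and the `PUBSUB_PORT` environment variable name are assumptions, not part of the proposal.

```python
import os
import socket


def find_free_port(start: int = 9040, max_tries: int = 100,
                   host: str = "127.0.0.1") -> int:
    """Return the first port >= start that can currently be bound on host."""
    for port in range(start, min(start + max_tries, 65536)):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # port already in use, increment by 1 and retry
            return port
    raise RuntimeError(f"no free port found starting at {start}")


def reserve_pubsub_port(start: int = 9040) -> int:
    """Pick a port and save it in an environment variable (hypothetical name)."""
    port = find_free_port(start)
    os.environ["PUBSUB_PORT"] = str(port)
    return port
```

Note that the probe releases the port before the service binds it, so another process could still take it in between; the service start-up should be prepared to retry.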
:::info
**Publisher registration**
**Note**: No API key is required in the header
- During the ACA-Py initialization process, once the pubsub service is running successfully and the tunnel to it is set up, we automatically register the default publisher [single tenant, name: root] with the Pub/Sub service. If subscribers do not specify a publisher, it defaults to "root".

- Additional publishers [tenants] can be registered using the above workflow by the admin of ACA-Py.
:::
<!-- :::info
**Generate new subscriber API keys**

::: -->
:::info
**Subscriber Registration**

- expiration_window [optional] is the time window during which the pub/sub service will try to resend the notification to the subscriber in case of failure [i.e. the response does not have an HTTP status code starting with 2]. If a retry inside the failure handler fails again, the record is popped and added back to the queue until the expiration_window has elapsed.
:::
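A registration could be stored along the lines of the sketch below, using the two dictionaries this proposal describes (subscribers keyed by subscriber_id, subscriptions keyed by publisher_name.topic_name). The field names, the `register_subscriber` signature, and the assumption that expiration_window is given in seconds are illustrative only.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

DEFAULT_PUBLISHER = "root"


@dataclass
class Subscriber:
    webhook_url: str
    topics: List[str]
    publisher: str = DEFAULT_PUBLISHER
    expiration_window: Optional[int] = None  # assumed to be seconds


# In-memory stores: subscriber_id -> record, and "publisher.topic" -> ids
subscribers: Dict[str, Subscriber] = {}
subscriptions: Dict[str, Set[str]] = {}


def register_subscriber(webhook_url: str, topics: List[str],
                        publisher: Optional[str] = None,
                        expiration_window: Optional[int] = None) -> str:
    """Store a subscriber and index it under each publisher.topic key."""
    subscriber_id = uuid.uuid4().hex
    record = Subscriber(webhook_url, list(topics),
                        publisher or DEFAULT_PUBLISHER, expiration_window)
    subscribers[subscriber_id] = record
    for topic in record.topics:
        key = f"{record.publisher}.{topic}"
        # Create the set on first use, then add this subscriber to it
        subscriptions.setdefault(key, set()).add(subscriber_id)
    return subscriber_id
```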
:::info
**Workflow**
- ~~Persistent~~ Storage
-- Subscribers and Subscriptions are dictionaries.
-- Transaction and Failure Queues
-- Topics [already predefined], Completed and Publishers Sets
- Components of Pub/Sub service
-- API Service to support registration and publication functions.
-- Celery App with RabbitMQ and Redis backend
--- A call_webhook task to dispatch webhooks asynchronously and update the failure queue when needed.
--- A failure_handler task to retry failed requests.
--- A transaction_handler which pops records from the transaction queue, creates multiple call_webhook celery tasks [one for each subscriber_id] and assigns them to workers.
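The fan-out performed by the transaction_handler might look like the sketch below. It uses plain Python containers and a `dispatch` callable as a stand-in; in a Celery deployment, `dispatch` would presumably wrap something like `call_webhook.delay(...)`. The record layout (`key`, `data`) follows the workflow description, while the sample data is made up.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, Set

# Stand-ins for the service's transaction queue and subscription dict
transaction_queue: Deque[Dict[str, Any]] = deque()
subscriptions: Dict[str, Set[str]] = {"root.connections": {"sub-1", "sub-2"}}


def transaction_handler(dispatch: Callable[[str, Dict[str, Any]], None]) -> int:
    """Drain the queue, creating one dispatch call per subscriber_id.

    Returns the number of tasks created.
    """
    created = 0
    while transaction_queue:
        record = transaction_queue.popleft()
        # Records whose key has no subscribers simply produce no tasks
        for subscriber_id in sorted(subscriptions.get(record["key"], set())):
            dispatch(subscriber_id, record["data"])
            created += 1
    return created
```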

- Single Tenant
1. ACA-Py is started in the terminal with --pubsub-enabled
2. In the background, the pubsub service is built and starts running on http://host.docker.internal:9040. An ngrok tunnel is set up to this host URL and is accessible via https://5057493e.ngrok.io. This URL is presented to users as the pubsub_base_url in the ACA-Py terminal. The default publisher, named root, is also registered.
3. On the pubsub service side, call_webhook and failure_handler celery workers are started as subprocesses
celery -A call_webhook worker --concurrency n-1
celery -A failure_handler worker --concurrency 1
where n is the number of cores
4. SubscriberA can subscribe to the pub/sub service using POST https://5057493e.ngrok.io/register_subscriber. Additional details such as the webhook_URL and the topics to subscribe to must be included. The optional publisher argument can be set to "root" [if it is missing, it defaults to root]. A new record, keyed by a generated subscriber_id, is added to the subscriber dictionary. If the publisher_name.topic_name key already exists in the subscription dict, the subscriber_id is added to its set; otherwise a new key is added to the dict with a set containing only this subscriber_id as its value.
5. On the ACA-Py agent side, the dispatcher will publish using POST https://5057493e.ngrok.io/publish, with payload data, topic name and registered publisher_name included in the request body. As the recipient key to the registered name map has only 1 record, the default value "root" is used.
6. A new dictionary record is added to the transaction queue, with publisher_name.topic_name as the key attribute and the payload as the data.
7. The transaction_handler pops a record from the transaction queue. Using the key from the record, we look up the associated subscriber_ids and add tasks to the call_webhook workers [one for each subscriber_id] to dispatch the webhooks asynchronously.
8. Inside the call_webhook celery task, on a successful response [status code starting with 2], we do nothing. On failure, we add a dictionary record in the following format to the failure queue.
```
{
"key": "publisher_name.topic_name",
"subscriber_id": "....",
"payload" : {
},
"added_datetime": "....",
"expiration_window": 600
}
```
9. Similar to the transaction_handler, the failure_handler pops records from the failure queue and assigns them as tasks to the failure_handler worker.
10. Inside the failure_handler task, we first check whether current_datetime < added_datetime + expiration_window. If not, we end that particular task right there. Otherwise, we retry dispatching the webhook: on success we do nothing and return, but on another failure we add the record back to the failure queue.
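The failure record and the expiration check from the last steps can be sketched as follows. This is a minimal illustration, assuming expiration_window is in seconds and added_datetime is an ISO 8601 string; the function names and the `send` callable (which returns True on a 2xx response) are hypothetical.

```python
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Deque, Dict, Optional

failure_queue: Deque[Dict[str, Any]] = deque()


def make_failure_record(key: str, subscriber_id: str, payload: Dict[str, Any],
                        expiration_window: int,
                        now: Optional[datetime] = None) -> Dict[str, Any]:
    """Build a record in the format shown in the workflow above."""
    if now is None:
        now = datetime.now(timezone.utc)
    return {
        "key": key,
        "subscriber_id": subscriber_id,
        "payload": payload,
        "added_datetime": now.isoformat(),
        "expiration_window": expiration_window,
    }


def is_expired(record: Dict[str, Any], now: datetime) -> bool:
    """True once current_datetime >= added_datetime + expiration_window."""
    added = datetime.fromisoformat(record["added_datetime"])
    return now >= added + timedelta(seconds=record["expiration_window"])


def handle_failure(record: Dict[str, Any],
                   send: Callable[[Dict[str, Any]], bool],
                   now: Optional[datetime] = None) -> str:
    """Retry one failed delivery; requeue it if the retry fails again."""
    if now is None:
        now = datetime.now(timezone.utc)
    if is_expired(record, now):
        return "expired"  # window elapsed: drop the record
    if send(record):
        return "delivered"
    failure_queue.append(record)  # still inside the window: try again later
    return "requeued"
```

Because added_datetime is kept unchanged when a record is requeued, the total retry time stays bounded by the expiration_window.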
- Multi Tenant
In case of multi-tenancy, it is consistent with the above workflow. The only major differences are:
1. Agent admin manually registers additional publishers/tenants with the pubsub service
2. In the background, on the ACA-Py agent side, inside the dispatcher, we extract the recipient key from the wallet record in use [from the context] and look up the registered name of the publisher. This name is used when publishing messages/notifications to the pubsub service.
:::
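On the dispatcher side, resolving the publisher name from the recipient key could be as simple as the sketch below. The mapping, the function names, the body field names other than publisher_name, and the choice to fall back to "root" for an unknown key are all assumptions made for illustration.

```python
from typing import Any, Dict, Optional

DEFAULT_PUBLISHER = "root"


def resolve_publisher(recipient_key: Optional[str],
                      registered: Dict[str, str]) -> str:
    """Map a wallet's recipient key to its registered publisher name."""
    if recipient_key is None:
        return DEFAULT_PUBLISHER  # single-tenant case
    # Assumption: unknown keys fall back to the default publisher
    return registered.get(recipient_key, DEFAULT_PUBLISHER)


def build_publish_body(topic: str, payload: Dict[str, Any],
                       recipient_key: Optional[str],
                       registered: Dict[str, str]) -> Dict[str, Any]:
    """Assemble the request body for POST <pubsub_base_url>/publish."""
    return {
        "publisher_name": resolve_publisher(recipient_key, registered),
        "topic": topic,
        "payload": payload,
    }
```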
## Expectations
- The above proposal is scalable and will meet both primary and secondary requirements
- High throughput by having multiple [n-1] workers dedicated to calling the webhook_url asynchronously.
- Can handle failures where subscribers do not receive a notification by using
-- retry logic working within a limited expiration_window
-- A dedicated worker to handle retrying
## Timeline
Based on feedback, I will finalize the approach and design, and then provide an estimated timeline for the implementation.
| Deliverable | ETA |
| ----------------- |:----------------------- |