# Postgres Events Pub Sub
The goal is to provide notification of interesting events (e.g., record creation, record deletion) and to enable subscription to those events.
## Options
There are a couple of options for doing this in postgres, though each fundamentally relies on a trigger to invoke some action that sends a message:
```plantuml
actor user
user -> table: insert
table -> messenger: trigger
messenger -> subscriber: message
```
### Option 1. Use a language like `plpython3u`
In this case the `trigger` calls a procedure defined in `plpython3u` (or a similar scripting language). This procedure can execute arbitrary python code (within the environment of the python interpreter linked to the `plpython3u` language), and so can communicate messages to a consumer application that may be local or remote. The protocol used to send the messages is defined by the plpython script implementation.
```plantuml
actor user
user -> table: insert
table -> plpython: trigger
plpython -> subscriber: message
```
The `plpython3u` script runs within postgres and is executed each time the trigger fires, so there are obvious performance factors to consider.
This approach is somewhat invasive in that the python language extension needs to be installed, and in practice the `plpython3u` language setup has been a bit cumbersome. Note, however, that the extension ships with standard postgres distributions; as of pg 13 the python 3 variant (`plpython3u`) is the only one available, python 2 support having been removed.
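As a sketch of the approach, a `plpython3u` trigger function might forward each row change to a local HTTP endpoint. The endpoint URL, payload shape, and function name below are illustrative assumptions, not part of any existing schema:

```sql
-- Hypothetical sketch: forward each row change to a local HTTP endpoint.
-- The endpoint URL and payload shape are assumptions for illustration only.
CREATE OR REPLACE FUNCTION sysmeta_pynotify() RETURNS trigger AS $$
import json
import urllib.request
# TD is the trigger dictionary provided by plpython:
# TD["event"] is the operation (INSERT/UPDATE), TD["new"] the new row.
payload = json.dumps({"op": TD["event"], "guid": TD["new"]["guid"]})
req = urllib.request.Request(
    "http://localhost:8080/events",
    data=payload.encode(),
    headers={"Content-Type": "application/json"})
urllib.request.urlopen(req, timeout=2)
return None
$$ LANGUAGE plpython3u;

CREATE TRIGGER sysmeta_pynotify AFTER INSERT OR UPDATE ON systemmetadata
    FOR EACH ROW EXECUTE PROCEDURE sysmeta_pynotify();
```

Note that the HTTP request runs inside the transaction's trigger execution, which is part of the performance concern mentioned above.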
### Option 2. Use `NOTIFY`/`LISTEN`
In this case the `trigger` calls the `pg_notify()` function or executes a `NOTIFY` SQL statement. A consumer application running an event loop with a connection to the postgres instance uses `LISTEN` to detect events emitted by the `NOTIFY` operation.
```plantuml
actor user
participant table
participant pg_notify
subscriber -> pg_notify: LISTEN
note right: Subscriber has an\nongoing connection to\npostgres with LISTEN.
user -> table: insert
table -> pg_notify: trigger
pg_notify -> subscriber: NOTIFY.message
```
A benefit of this approach is minimal change to postgres - no extensions or additions are required beyond the trigger to call `NOTIFY`.
`NOTIFY` payloads are limited to slightly under 8000 bytes by default, so generally only a key is sent, with the expectation that the subscriber will issue a query to get more information associated with the notification.
If no listener is connected, postgres continues normal operations and the notifications are effectively a no-op.
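For example, a consumer could listen on the `sysm_watch` channel used by the trigger below. This is a minimal sketch assuming the `psycopg2` driver and a local database named `metacat` (both assumptions); the payload decoding is factored out so the connection loop stays simple.

```python
import json
import select

def parse_notification(payload: str) -> dict:
    """Decode the JSON payload sent by pg_notify()."""
    return json.loads(payload)

def listen_loop(dsn: str = "dbname=metacat"):
    """Block on the connection and handle NOTIFY events as they arrive.

    Assumes the psycopg2 driver; the DSN and channel name are examples.
    """
    import psycopg2  # third-party driver (assumption)
    conn = psycopg2.connect(dsn)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = conn.cursor()
    cur.execute("LISTEN sysm_watch;")
    while True:
        # Wait until the socket is readable, then drain pending notifications.
        if select.select([conn], [], [], 60) == ([], [], []):
            continue  # timeout; loop and wait again
        conn.poll()
        while conn.notifies:
            note = conn.notifies.pop(0)
            event = parse_notification(note.payload)
            print(event["op"], event.get("pid"))
```

The `op`/`pid`/`docid` keys match the JSON object built by the `sysmeta_notify()` trigger function below.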
```sql
--
-- send a NOTIFY message to 'sysm_watch' channel as JSON
--
CREATE OR REPLACE FUNCTION sysmeta_notify() RETURNS trigger as $sysmeta_notify$
DECLARE
doc_id varchar;
BEGIN
SELECT docid INTO doc_id FROM identifier WHERE guid=NEW.guid;
PERFORM pg_notify(
'sysm_watch',
json_build_object('op', TG_OP, 'pid', NEW.guid, 'docid', doc_id)::text
);
RETURN NULL;
END;
$sysmeta_notify$ LANGUAGE plpgsql;
CREATE TRIGGER sysmeta_notify AFTER INSERT OR UPDATE ON systemmetadata
FOR EACH ROW EXECUTE PROCEDURE sysmeta_notify();
```
To drop the trigger:
```sql
DROP TRIGGER sysmeta_notify ON systemmetadata;
```
### Option 3. Use an extension like `pg_amqp`
`pg_amqp` is a small extension that provides a mechanism to send a message to an AMQP compatible queue service such as RabbitMQ.
```plantuml
actor user
participant table
participant amqp
user -> table: insert
table -> amqp: trigger
amqp -> RabbitMQ: AMQP message
```
The `pg_amqp` extension is easy to install, and use of a well-established messaging protocol significantly reduces the amount of custom code needed. A service like RabbitMQ has broad language support and can be scaled across multiple servers.
This approach supports sending much richer messages than the `NOTIFY` mechanism (RabbitMQ allows messages up to 128MB by default).
Postgres issues a warning if `amqp.publish` is called while the AMQP service is not running; however, there does not appear to be any disruption of normal operations.
```sql
-- Add the extension
CREATE EXTENSION amqp;
-- Insert a queue target
INSERT INTO amqp.broker
(host, port, vhost, username, password)
VALUES
('localhost','5672','/','guest','guest')
RETURNING broker_id;
-- Send a test message. Parameters: broker_id, exchange ('' = default),
-- routing key, payload, delivery mode (1=transient), content type,
-- reply_to, correlation_id.
--
-- SELECT amqp.publish(
--   1, '', 'pg_test',
--   json_build_object('key','value','key2',2)::text,
--   1, 'application/json', 'reply to', 'correlation' );
```
Grant permissions to the `amqp` schema:
```sql
GRANT USAGE ON SCHEMA amqp TO metacat;
GRANT SELECT ON ALL TABLES IN SCHEMA amqp TO metacat;
```
```sql
--
-- send an AMQP message to channel 1 as JSON record
--
CREATE OR REPLACE FUNCTION sysmeta_amqp() RETURNS trigger as $sysmeta_amqp$
BEGIN
PERFORM amqp.publish(
1, -- broker id
'', -- exchange name ('' = default exchange)
'sysm_watch', -- routing key
row_to_json(NEW)::text, -- payload
1, -- 1=not persistent, 2=persistent
'application/json', -- message type
'', -- reply_to
TG_OP -- correlation_id
);
RETURN NULL;
END;
$sysmeta_amqp$ LANGUAGE plpgsql;
CREATE TRIGGER sysmeta_amqp AFTER INSERT OR UPDATE ON systemmetadata
FOR EACH ROW EXECUTE PROCEDURE sysmeta_amqp();
```
To drop the trigger:
```sql
DROP TRIGGER sysmeta_amqp ON systemmetadata;
```
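On the consuming side, messages published through `pg_amqp` can be read with any AMQP client. Below is a minimal sketch assuming the python `pika` library and the `sysm_watch` routing key used by the trigger above; the queue declaration and connection details are assumptions.

```python
import json

def decode_sysmeta_message(body: bytes, correlation_id: str) -> dict:
    """Decode the row_to_json payload sent by the sysmeta_amqp trigger,
    attaching the trigger operation carried in correlation_id."""
    record = json.loads(body)
    record["_op"] = correlation_id  # TG_OP: INSERT or UPDATE
    return record

def consume(host: str = "localhost", queue: str = "sysm_watch"):
    """Consume messages from RabbitMQ. Assumes the pika client library."""
    import pika  # third-party AMQP client (assumption)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=False)

    def on_message(ch, method, properties, body):
        record = decode_sysmeta_message(body, properties.correlation_id)
        print(record["_op"], record.get("guid"))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue=queue, on_message_callback=on_message)
    channel.start_consuming()
```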
There is also `pg-amqp-bridge`, which achieves a similar result to `pg_amqp` but is implemented with the pattern of option 2 above: it `LISTEN`s for notifications and forwards them to RabbitMQ. Likewise, `pgsql-listen-exchange` is a RabbitMQ plugin implemented with the option 2 pattern.