# Postgres Events Pub Sub

The goal is to provide notification of interesting events (record creation, record deletion) and to enable subscription to those events.

## Options

There are a couple of options for doing this in postgres, though each fundamentally relies on a trigger to invoke some action that sends a message:

```plantuml
actor user
user -> table: insert
table -> messenger: trigger
messenger -> subscriber: message
```

### Option 1. Use a language like `plpython3u`

In this case the `trigger` is used to call a procedure defined in `plpython3u` (or a similar scripting language). The procedure can execute arbitrary Python code (within the environment of the Python interpreter linked to the `plpython3u` language), and so can communicate messages to a consumer application that may be local or remote. The protocol used to send the messages is defined by the plpython script implementation.

```plantuml
actor user
user -> table: insert
table -> plpython: trigger
plpython -> subscriber: message
```

The `plpython3u` script runs within postgres and is executed on each trigger firing, so there are obvious performance factors to consider. This approach is a bit invasive in that the Python language extension needs to be installed, and in practice the `plpython3u` language setup has been a bit cumbersome. Note, however, that the extension is available by default in pg 13+ (which ships only the Python 3 variant).

### Option 2. Use `NOTIFY`/`LISTEN`

In this case the `trigger` runs the `pg_notify()` function or executes a `NOTIFY` SQL statement. A consumer application running on an event loop with a connection to the postgres instance uses `LISTEN` to detect events emitted by the `NOTIFY` operation.

```plantuml
actor user
participant table
participant pg_notify
subscriber -> pg_notify: LISTEN
note right: Subscriber has an\nongoing connection to\npostgres with LISTEN.
user -> table: insert
table -> pg_notify: trigger
pg_notify -> subscriber: NOTIFY.message
```

A benefit of this approach is minimal change to postgres - no extensions or additions are required beyond the trigger to call `NOTIFY`. `NOTIFY` messages are limited to 8000 bytes, so generally a key is sent, with the expectation that the consumer will issue a query to get more information associated with the notification. If no listener is operating, postgres continues with normal operations; notifications with no listeners are effectively a NOOP.

```sql
--
-- send a NOTIFY message to 'sysm_watch' channel as JSON
--
CREATE OR REPLACE FUNCTION sysmeta_notify()
RETURNS trigger AS $sysmeta_notify$
DECLARE
  doc_id varchar;
BEGIN
  SELECT docid INTO doc_id FROM identifier WHERE guid = NEW.guid;
  PERFORM pg_notify(
    'sysm_watch',
    json_build_object('op', TG_OP, 'pid', NEW.guid, 'docid', doc_id)::text
  );
  RETURN NULL;
END;
$sysmeta_notify$ LANGUAGE plpgsql;

CREATE TRIGGER sysmeta_notify AFTER INSERT OR UPDATE ON systemmetadata
  FOR EACH ROW EXECUTE PROCEDURE sysmeta_notify();
```

To drop the trigger:

```sql
DROP TRIGGER sysmeta_notify ON systemmetadata;
```

### Option 3. Use an extension like `pg_amqp`

`pg_amqp` is a small extension that provides a mechanism to send a message to an AMQP compatible queue service such as RabbitMQ.

```plantuml
actor user
participant table
participant amqp
user -> table: insert
table -> amqp: trigger
amqp -> RabbitMQ: AMQP message
```

The `pg_amqp` extension is easy to install, and use of a well established messaging protocol significantly reduces the amount of custom code needed. A service like RabbitMQ has broad language support and can be scaled across multiple servers. The approach also supports much richer messages than the NOTIFY mechanism (RabbitMQ suggests up to 128MB for messages).

A warning is issued by postgres if `amqp.publish` is called without the AMQP service running; however, there does not appear to be any disruption of normal operations.
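The size contrast noted above can be sketched in a few lines of Python (the row and key values here are made up for illustration): a key-only payload like the one built by `sysmeta_notify` fits comfortably under the 8000-byte `NOTIFY` cap, while a full-row payload such as `row_to_json(NEW)` produces can easily exceed it.

```python
import json

NOTIFY_LIMIT = 8000  # bytes; the payload cap for NOTIFY/pg_notify

# Key-only payload, shaped like the one built by the sysmeta_notify trigger.
key_payload = json.dumps(
    {"op": "INSERT", "pid": "urn:uuid:1234", "docid": "autogen.2021.1"}
)
print(len(key_payload.encode("utf-8")))  # a few dozen bytes; fits easily

# A hypothetical full row with one large text column, as row_to_json might emit.
row = {"guid": "urn:uuid:1234", "checksum": "sha256:...", "blob": "x" * 20000}
row_payload = json.dumps(row)
print(len(row_payload.encode("utf-8")) > NOTIFY_LIMIT)  # True: too big for NOTIFY
```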
```sql
-- Add the extension
CREATE EXTENSION amqp;

-- Insert a queue target
INSERT INTO amqp.broker (host, port, vhost, username, password)
  VALUES ('localhost', '5672', '/', 'guest', 'guest')
  RETURNING broker_id;

-- Send a message, using the broker_id of the queue target:
SELECT amqp.publish(
  1, '', 'pg_test',
  json_build_object('key', 'value', 'key2', 2)::text,
  1, 'application/json', 'reply to', 'correlation'
);
```

Grant permissions to the `amqp` schema:

```sql
GRANT USAGE ON SCHEMA amqp TO metacat;
GRANT SELECT ON ALL TABLES IN SCHEMA amqp TO metacat;
```

```sql
--
-- send an AMQP message to broker 1 as a JSON record
--
CREATE OR REPLACE FUNCTION sysmeta_amqp()
RETURNS trigger AS $sysmeta_amqp$
BEGIN
  PERFORM amqp.publish(
    1,                       -- broker id
    '',                      -- exchange name
    'sysm_watch',            -- routing key (queue name)
    row_to_json(NEW)::text,  -- payload
    1,                       -- 1=not persistent, 2=persistent
    'application/json',      -- content type
    '',                      -- reply_to
    TG_OP                    -- correlation_id
  );
  RETURN NULL;
END;
$sysmeta_amqp$ LANGUAGE plpgsql;

CREATE TRIGGER sysmeta_amqp AFTER INSERT OR UPDATE ON systemmetadata
  FOR EACH ROW EXECUTE PROCEDURE sysmeta_amqp();
```

To drop the trigger:

```sql
DROP TRIGGER sysmeta_amqp ON systemmetadata;
```

There's also `pg-amqp-bridge`, which is similar to `pg_amqp` though implemented with a pattern like option 2 above. `pgsql-listen-exchange` is likewise implemented with the option 2 pattern.
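Whichever transport delivers it, the payload emitted by the trigger functions above is JSON, so the consumer's first step is the same: decode the payload and dispatch on the operation. A minimal consumer-side sketch (the payload shape follows `sysmeta_notify`; the function name and example values are hypothetical):

```python
import json

def handle_event(payload: str) -> str:
    """Decode a JSON event payload as produced by the triggers above
    and return a one-line summary (a stand-in for real dispatch logic)."""
    event = json.loads(payload)
    op = event.get("op", "UNKNOWN")  # TG_OP: INSERT or UPDATE
    pid = event.get("pid")           # NEW.guid from the trigger
    docid = event.get("docid")       # looked up from the identifier table
    return f"{op} pid={pid} docid={docid}"

# Example payload matching what sysmeta_notify would emit (values made up):
summary = handle_event('{"op": "INSERT", "pid": "urn:uuid:abcd", "docid": "doc.1.1"}')
print(summary)  # INSERT pid=urn:uuid:abcd docid=doc.1.1
```

Since a `NOTIFY` payload carries only a key, a real handler would typically follow the decode with a query back to postgres for the full record; an AMQP consumer receiving `row_to_json(NEW)` already has the whole row.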