Postgres Events Pub Sub

The goal is to provide notification of interesting events (record creation, record deletion) and to enable subscription to those events.

Options

There are a few options for doing this in postgres, though each fundamentally relies on a trigger to invoke some action that sends a message:

Option 1. Use a language like plpython3u

In this case the trigger calls a procedure defined in plpython3u (or a similar scripting language). This procedure can execute any Python code (within the environment of the Python interpreter linked to the plpython3u language), and so can send messages to a consumer application that may be local or remote. The protocol used to send the messages is defined by the plpython script implementation.

The plpython3u script runs within postgres and is executed each time the trigger fires, so any slow or blocking work in the script adds directly to the cost of the triggering write.

This approach is a bit invasive in that the Python language extension needs to be installed. In practice, the plpython3u language setup has been a bit cumbersome. Note, however, that the extension is available by default in pg 13+.
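
As a rough sketch of the kind of Python such a trigger procedure might run, the functions below build a small JSON event from a dict shaped like plpython's `TD` trigger data and send it as a single UDP datagram. The transport, the host and port, and the function names are assumptions for illustration only; the `guid` column is taken from the examples later in this note.

```python
import json
import socket


def build_event(td):
    """Build a small JSON event from a dict shaped like plpython's TD."""
    # TD["new"] is None for DELETE, so fall back to TD["old"].
    row = td.get("new") or td.get("old") or {}
    return json.dumps({
        "op": td["event"],          # 'INSERT', 'UPDATE', 'DELETE'
        "table": td["table_name"],
        "pid": row.get("guid"),     # key only; the consumer can query for details
    })


def send_event(td, host="127.0.0.1", port=9999):
    """Send the event as one UDP datagram (hypothetical fire-and-forget transport)."""
    message = build_event(td).encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message, (host, port))
    return len(message)
```

Inside a plpython3u trigger function the body would reduce to something like `send_event(TD)`. UDP is used here only because it does not block the trigger while waiting for a consumer; a real deployment might prefer a protocol with delivery guarantees.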

Option 2. Use NOTIFY/LISTEN

In this case the trigger calls the pg_notify() function or executes a NOTIFY SQL statement. A consumer application running on an event loop with a connection to the postgres instance uses LISTEN to detect events emitted by the NOTIFY operation.

A benefit of this approach is minimal change to postgres - no extensions or additions are required beyond the trigger to call NOTIFY.

NOTIFY payloads must be shorter than 8000 bytes in the default configuration, so generally only a key is sent, with the expectation that the consumer will issue a query to get more information associated with the notification.
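
The size constraint can be illustrated with a small helper that includes optional detail only when the encoded payload still fits, and otherwise falls back to the keys alone. This is a sketch in Python for illustration (the trigger in this note is written in plpgsql); the helper name and the `extra` field are hypothetical.

```python
import json

# NOTIFY payloads must be shorter than 8000 bytes in the default configuration.
MAX_NOTIFY_BYTES = 7999


def build_notify_payload(op, pid, docid=None, extra=None):
    """Return JSON text for NOTIFY, dropping optional detail if it is too large."""
    event = {"op": op, "pid": pid, "docid": docid}
    if extra:
        full = json.dumps({**event, "extra": extra})
        if len(full.encode("utf-8")) <= MAX_NOTIFY_BYTES:
            return full
    minimal = json.dumps(event)
    if len(minimal.encode("utf-8")) > MAX_NOTIFY_BYTES:
        raise ValueError("key-only payload exceeds the NOTIFY size limit")
    return minimal
```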

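If the listener is not running, postgres continues with normal operations. If there are no listeners, the NOTIFY is effectively a no-op. Notifications are not queued for listeners that are not connected, so events sent while a listener is down are missed.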

--
-- Send a NOTIFY message to the 'sysm_watch' channel as JSON
--
CREATE OR REPLACE FUNCTION sysmeta_notify() RETURNS trigger as $sysmeta_notify$
DECLARE
  doc_id varchar;
BEGIN
  SELECT docid INTO doc_id FROM identifier WHERE guid=NEW.guid; 
  PERFORM pg_notify(
    'sysm_watch',
    json_build_object('op', TG_OP, 'pid', NEW.guid, 'docid', doc_id)::text
  );
  RETURN NULL;
END;
$sysmeta_notify$ LANGUAGE plpgsql;

CREATE TRIGGER sysmeta_notify AFTER INSERT OR UPDATE ON systemmetadata
 	FOR EACH ROW EXECUTE PROCEDURE sysmeta_notify();

To drop the trigger:

DROP TRIGGER sysmeta_notify ON systemmetadata;
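
A consumer for the `sysm_watch` channel might look roughly like the following sketch. It assumes the psycopg2 driver is installed and follows its select/poll pattern for asynchronous notifications. The function names and the `dsn` argument are illustrative, and the follow-up query assumes the consumer has read access to `systemmetadata`.

```python
import json
import select


def parse_event(payload):
    """Decode the JSON text sent by the sysmeta_notify() trigger."""
    event = json.loads(payload)
    return event["op"], event["pid"], event.get("docid")


def listen_forever(dsn, timeout=5.0):
    """Block on LISTEN and print each event as it arrives."""
    import psycopg2  # third-party; imported here so parse_event works without it

    conn = psycopg2.connect(dsn)
    conn.autocommit = True  # LISTEN takes effect immediately outside a transaction
    cur = conn.cursor()
    cur.execute("LISTEN sysm_watch;")
    while True:
        # Wait until the connection's socket is readable or the timeout expires.
        if select.select([conn], [], [], timeout) == ([], [], []):
            continue
        conn.poll()
        while conn.notifies:
            note = conn.notifies.pop(0)
            op, pid, docid = parse_event(note.payload)
            # The payload carries only keys; fetch the full row when needed.
            cur.execute("SELECT * FROM systemmetadata WHERE guid = %s", (pid,))
            print(op, pid, docid, cur.fetchone())
```

It could be started with something like `listen_forever("dbname=metacat")`, where the connection string is a placeholder.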

Option 3. Use an extension like pg_amqp

pg_amqp is a small extension that provides a mechanism to send a message to an AMQP compatible queue service such as RabbitMQ.

The pg_amqp extension is easy to install, and using a well-established messaging protocol significantly reduces the amount of custom code needed. A service like RabbitMQ has broad language support and can be scaled across multiple servers.

This approach allows much richer messages than the NOTIFY mechanism (RabbitMQ suggests up to 128 MB for messages).

Postgres issues a warning if amqp.publish is called without the AMQP service running, but there does not appear to be any disruption of normal operations.

-- Add the extension
CREATE EXTENSION amqp;

-- Insert a queue target
INSERT INTO amqp.broker 
	(host, port, vhost, username, password) 
VALUES 
	('localhost','5672','/','guest','guest') 
RETURNING broker_id;

-- Send a test message. Parameters, in order:
-- broker id, exchange, routing key, message,
-- delivery mode, content type, reply-to, correlation id
--
-- SELECT amqp.publish(
-- 	1, '', 'pg_test',
--	json_build_object('key','value','key2',2)::text,
--	1, 'application/json', 'reply to', 'correlation' );

Grant permissions to the amqp schema:

GRANT USAGE ON SCHEMA amqp TO metacat;
GRANT SELECT ON ALL TABLES IN SCHEMA amqp TO metacat;
--
-- Send each new or updated row as a JSON AMQP message via broker 1
--
CREATE OR REPLACE FUNCTION sysmeta_amqp() RETURNS trigger as $sysmeta_amqp$
BEGIN
  PERFORM amqp.publish(
    1,                       -- broker id
    '',                      -- exchange ('' = default exchange)
    'sysm_watch',            -- routing key (queue name on the default exchange)
    row_to_json(NEW)::text,  -- payload
    1,                       -- delivery mode: 1=not persistent, 2=persistent
    'application/json',      -- content type
    '',                      -- reply_to
    TG_OP                    -- correlation_id
  );
  RETURN NULL;
END;
$sysmeta_amqp$ LANGUAGE plpgsql;

CREATE TRIGGER sysmeta_amqp AFTER INSERT OR UPDATE ON systemmetadata
 	FOR EACH ROW EXECUTE PROCEDURE sysmeta_amqp();

To drop the trigger:

DROP TRIGGER sysmeta_amqp ON systemmetadata;
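
On the consuming side, a minimal sketch using the pika client (assumed installed) could read the messages published by the `sysmeta_amqp()` trigger. The function names are illustrative. Because the trigger publishes to the default exchange with routing key `sysm_watch`, a queue of that name has to exist for messages to be retained, which is why the sketch declares it.

```python
import json


def decode_message(body, correlation_id=None):
    """Turn an AMQP message body (row_to_json text) into an event dict."""
    row = json.loads(body)
    # The trigger places TG_OP in the correlation_id property.
    return {"op": correlation_id, "row": row}


def consume(host="localhost", queue="sysm_watch"):
    """Consume events from RabbitMQ and print them."""
    import pika  # third-party; imported here so decode_message works without it

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.queue_declare(queue=queue)  # creates the queue if it does not exist

    def on_message(ch, method, properties, body):
        event = decode_message(body, properties.correlation_id)
        print(event["op"], event["row"].get("guid"))

    channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)
    channel.start_consuming()
```

Unlike the NOTIFY approach, the broker holds messages in the queue while the consumer is offline, provided the queue has already been declared.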

There is also pg-amqp-bridge, which is similar to pg_amqp but is implemented with a pattern like Option 2 above. pgsql-listen-exchange likewise follows the Option 2 pattern.