---
title: Asynchronous Notifications in Postgres
tags: postgresql, yoctol, workshop
---

# Asynchronous Notifications in Postgres

https://www.postgresql.org/docs/11/tcn.html

First, install the tcn (triggered_change_notification) extension:

```
CREATE EXTENSION tcn;
```

Create the trigger:

```
CREATE TRIGGER actions_notification_trigger
AFTER INSERT OR UPDATE OR DELETE ON actions
FOR EACH ROW EXECUTE PROCEDURE triggered_change_notification ();
```

Listen on the `tcn` channel:

```
LISTEN tcn;
```

Perform an operation on the table:

```
kurator=# INSERT INTO "public"."actions" ("order", "type", "descriptor", "actionable_id", "actionable_type", "platform") VALUES ('1', 'text', '{"text": "已經在專人線上回覆模式囉!欲切換回機器人模式,請點選按鈕。", "buttons": [{"id": "qjeOd64yTZ", "type": "postback", "title": "回到機器人模式", "payload": "466303161796137991", "defaultVisible": true, "webviewSetting": {"extension": false, "heightRatio": "full"}, "trackingUrlEnabled": false}]}', '466303161695474694', 'composite_actions', 'messenger');
INSERT 0 1
Asynchronous notification "tcn" with payload ""actions",I,"id"='814150981284533461'" received from server process with PID 4791.
```

In another process:

```
kurator=# listen tcn;
LISTEN
Asynchronous notification "tcn" with payload ""actions",I,"id"='814150981284533461'" received from server process with PID 4791.
```

https://citizen428.net/blog/asynchronous-notifications-in-postgres/

A more powerful notify, which sends the whole row as JSON:

```
kurator=# CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
  DECLARE
    record RECORD;
    payload JSON;
  BEGIN
    IF (TG_OP = 'DELETE') THEN
      record = OLD;
    ELSE
      record = NEW;
    END IF;

    payload = json_build_object('table', TG_TABLE_NAME,
                                'action', TG_OP,
                                'data', row_to_json(record));

    PERFORM pg_notify('events', payload::text);

    RETURN NULL;
  END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION
```

Create the trigger:

```
CREATE TRIGGER actions_better_notification_trigger
AFTER INSERT OR UPDATE OR DELETE ON actions
FOR EACH ROW EXECUTE PROCEDURE notify_event ();
```

Listen on the `events` channel:

```
LISTEN events;
```

Perform an operation on the table:

```
kurator=# INSERT INTO "public"."actions" ("order", "type", "descriptor", "actionable_id", "actionable_type", "platform") VALUES ('1', 'text', '{"text": "已經在專人線上回覆模式囉!欲切換回機器人模式,請點選按鈕。", "buttons": [{"id": "qjeOd64yTZ", "type": "postback", "title": "回到機器人模式", "payload": "466303161796137991", "defaultVisible": true, "webviewSetting": {"extension": false, "heightRatio": "full"}, "trackingUrlEnabled": false}]}', '466303161695474694', 'composite_actions', 'messenger');
INSERT 0 1
Asynchronous notification "tcn" with payload ""actions",I,"id"='814156125556446423'" received from server process with PID 4791.
Asynchronous notification "events" with payload "{"table" : "actions", "action" : "INSERT", "data" : {"id":814156125556446423,"order":1,"type":"text","descriptor":{"text": "已經在專人線上回覆模式囉!欲切換回機器人模式,請點選按鈕。", "buttons": [{"id": "qjeOd64yTZ", "type": "postback", "title": "回到機器人模式", "payload": "466303161796137991", "defaultVisible": true, "webviewSetting": {"extension": false, "heightRatio": "full"}, "trackingUrlEnabled": false}]},"actionable_id":466303161695474694,"actionable_type":"composite_actions","platform":"messenger","created_at":"2020-09-26T07:42:53.310387+00:00","updated_at":"2020-09-26T07:42:53.310387+00:00","deleted_at":null}}" received from server process with PID 4791.
```

In another process:

```
kurator=# listen events;
LISTEN
Asynchronous notification "tcn" with payload ""actions",I,"id"='814158846627943642'" received from server process with PID 4791.
Asynchronous notification "events" with payload "{"table" : "actions", "action" : "INSERT", "data" : {"id":814158846627943642,"order":1,"type":"text","descriptor":{"text": "已經在專人線上回覆模式囉!欲切換回機器人模式,請點選按鈕。", "buttons": [{"id": "qjeOd64yTZ", "type": "postback", "title": "回到機器人模式", "payload": "466303161796137991", "defaultVisible": true, "webviewSetting": {"extension": false, "heightRatio": "full"}, "trackingUrlEnabled": false}]},"actionable_id":466303161695474694,"actionable_type":"composite_actions","platform":"messenger","created_at":"2020-09-26T07:48:17.688478+00:00","updated_at":"2020-09-26T07:48:17.688478+00:00","deleted_at":null}}" received from server process with PID 4791.
Asynchronous notification "tcn" with payload ""actions",I,"id"='814158854186079451'" received from server process with PID 4791.
Asynchronous notification "events" with payload "{"table" : "actions", "action" : "INSERT", "data" : {"id":814158854186079451,"order":1,"type":"text","descriptor":{"text": "已經在專人線上回覆模式囉!欲切換回機器人模式,請點選按鈕。", "buttons": [{"id": "qjeOd64yTZ", "type": "postback", "title": "回到機器人模式", "payload": "466303161796137991", "defaultVisible": true, "webviewSetting": {"extension": false, "heightRatio": "full"}, "trackingUrlEnabled": false}]},"actionable_id":466303161695474694,"actionable_type":"composite_actions","platform":"messenger","created_at":"2020-09-26T07:48:18.584534+00:00","updated_at":"2020-09-26T07:48:18.584534+00:00","deleted_at":null}}" received from server process with PID 4791.
```

Build a pg event listener in five minutes:

```
> mkdir pg-notify-listener
> cd pg-notify-listener
> yarn add pg
```

Copy this into index.js and fill in your DB parameters:

```javascript=
const { Client } = require('pg')

// Connection settings: replace these with your own database parameters.
const client = new Client({
  user: 'kurator',
  host: '127.0.0.1',
  database: 'kurator',
  password: 'kurator',
  port: 5430,
})

;(async () => {
  await client.connect()

  // Register the handler before issuing LISTEN so no notification is missed.
  client.on('notification', (msg) => {
    const payload = JSON.stringify(JSON.parse(msg.payload), null, 2)
    console.log(`heard ${payload}, from channel: ${msg.channel}`)
  })

  await client.query('LISTEN events')
  console.log("start listening channel 'events'")
})().catch((err) => {
  // Surface connection or query failures instead of an unhandled rejection.
  console.error(err)
  process.exit(1)
})
```

Run it:

```
node index.js
start listening channel 'events'
```

https://github.com/emilbayes/pg-ipc/blob/6884a2925d3f38ad443f5829ba9d625b79fd4323/index.js#L23-L30
https://github.com/brianc/node-postgres/blob/f0fc470d88b782607563040eb126455a7fbfb3b1/packages/pg/lib/client.js#L371-L373
https://github.com/brianc/node-postgres/blob/master/packages/pg/lib/connection.js
https://github.com/emilbayes/pg-ipc
https://github.com/brianc/node-postgres/blob/master/packages/pg/lib/client.js

Of course, the trigger, LISTEN, and NOTIFY building blocks above can be used for other applications as well.
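
The listener above only decodes the JSON payload of the `events` channel. If you also listen on `tcn`, the payload is a compact string such as `"actions",I,"id"='814150981284533461'` and needs its own parser. The following is a minimal sketch, not part of tcn or node-postgres: `parseTcnPayload` and `OPERATIONS` are hypothetical names introduced here, and the parsing rules assume the format described in the tcn documentation (names in double quotes, values in single quotes, embedded quotes doubled).

```javascript
// Hypothetical helper: turns a tcn payload into { table, operation, key }.
const OPERATIONS = { I: 'INSERT', U: 'UPDATE', D: 'DELETE' }

function parseTcnPayload(payload) {
  let pos = 0

  // Read a token wrapped in `quote`; a doubled quote inside is an escaped quote.
  function readQuoted(quote) {
    if (payload[pos] !== quote) {
      throw new Error(`expected ${quote} at position ${pos}`)
    }
    pos += 1
    let value = ''
    while (pos < payload.length) {
      if (payload[pos] === quote) {
        if (payload[pos + 1] === quote) {
          value += quote
          pos += 2
          continue
        }
        pos += 1
        return value
      }
      value += payload[pos]
      pos += 1
    }
    throw new Error('unterminated quoted token')
  }

  // Consume one literal separator character.
  function expect(ch) {
    if (payload[pos] !== ch) {
      throw new Error(`expected ${ch} at position ${pos}`)
    }
    pos += 1
  }

  const table = readQuoted('"')
  expect(',')
  const operation = OPERATIONS[payload[pos]]
  if (!operation) {
    throw new Error(`unknown operation code at position ${pos}`)
  }
  pos += 1

  // Remaining parts are primary key pairs: ,"column"='value'
  // Values stay strings on purpose; see the note below the block.
  const key = {}
  while (pos < payload.length) {
    expect(',')
    const column = readQuoted('"')
    expect('=')
    key[column] = readQuoted("'")
  }
  return { table, operation, key }
}

// Example with a payload taken from the psql output in this note.
console.log(parseTcnPayload(`"actions",I,"id"='814150981284533461'`))
```

Key values are kept as strings because the ids shown in this note are larger than `Number.MAX_SAFE_INTEGER`, so converting them to JavaScript numbers would silently round them. The same caveat applies to the numeric `id` inside the `events` JSON payload once it goes through `JSON.parse`. In the listener, you could branch on `msg.channel` and call this helper for `tcn` messages.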