
Postgres Listen / Notify Real-time Notifications in Go

If you don’t know, Postgres LISTEN / NOTIFY is a simple pub/sub mechanism. A database client can connect and listen on any channel, and Postgres can notify on any channel. It’s a lot like Redis pub/sub, or whatever MongoDB has.
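
A quick way to see it in action is two psql sessions:

-- session 1: subscribe to the 'bridge' channel (any name works)
LISTEN bridge;

-- session 2: every listener on 'bridge' receives the payload
NOTIFY bridge, 'hello';
-- or, the function form used later in this post:
SELECT pg_notify('bridge', 'hello');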

So, the easiest way to turn whatever row gets updated into a text payload is to convert the row to JSON.

CREATE OR REPLACE FUNCTION bridge_notify() RETURNS trigger AS $$
DECLARE
    -- ...
    payload jsonb;
BEGIN
    -- ...
    payload = jsonb_build_object(
        'op', to_jsonb(TG_OP),
        'bucket', bucket
    );
    PERFORM pg_notify('bridge', payload::TEXT);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Personally, I store all of my data in two tables, event-sourced style.

It looks like this:

CREATE TABLE IF NOT EXISTS bridge_snaps (
	bucket	VARCHAR,
	id 		VARCHAR,
	version INT,
	ts		BIGINT,
	val 	JSONB
);

CREATE TABLE IF NOT EXISTS bridge_events (
	bucket	VARCHAR,
	id 		VARCHAR,
	version INT,
	type	INT,
	ts		BIGINT,
	val 	JSONB
);

The bridge_snaps table contains the latest version of any piece of data, JSON-serialized in the val column.

The bridge_events table contains all the events leading up to that latest version: a create event followed by a few update events, say. The val column in this case holds the JSON-serialized event parameters… I should name that better.
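
As a purely hypothetical example (bucket name, UUID, and payloads are made up), a single todo and its history might look like:

-- the current state of one todo
INSERT INTO bridge_snaps (bucket, id, version, ts, val)
VALUES ('todos', 'b4a3c2d1-0000-4000-8000-000000000001', 2, 1580000002,
        '{"text": "write blog post", "done": true}');

-- the events that produced it (type is an application-defined event code)
INSERT INTO bridge_events (bucket, id, version, type, ts, val)
VALUES ('todos', 'b4a3c2d1-0000-4000-8000-000000000001', 1, 0, 1580000001,
        '{"text": "write blog post"}'),
       ('todos', 'b4a3c2d1-0000-4000-8000-000000000001', 2, 1, 1580000002,
        '{"done": true}');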

So in my case, I want any insert, update, or delete on the bridge_snaps table to show up as a usable object in my Go program, via a pg_notify notification.

On the Go side, things are easy:

package main

import (
	"context"
	"encoding/json"
	"os"
	"strconv"
	"time"

	"github.com/jackc/pgx"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
)

func main() {
	ctx := context.Background()

	pool, err := startPostgres(ctx)
	if err != nil {
		log.Fatalln(err)
	}
	defer pool.Close()

	// keep the process alive long enough to receive some notifications
	time.Sleep(time.Second * 10)
}

func envPGConfig() pgx.ConnConfig {
	port, _ := strconv.Atoi(os.Getenv("DB_PORT")) // note: a missing or malformed DB_PORT silently becomes 0
	return pgx.ConnConfig{
		Host:     os.Getenv("DB_HOST"),
		Port:     uint16(port),
		Database: os.Getenv("DB_DATABASE"),
		User:     os.Getenv("DB_USER"),
		Password: os.Getenv("DB_PASSWORD"),
	}
}

var tablesSQL = `
CREATE TABLE IF NOT EXISTS bridge_snaps ( ...
`

type notification struct {
	Op     string                 `json:"op"`
	Bucket string                 `json:"bucket"`
	ID     string                 `json:"id"`
	Val    map[string]interface{} `json:"val"`
	Old    map[string]interface{} `json:"old"`
}
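
For reference, here is the kind of payload the trigger further down produces for an UPDATE (the row data is hypothetical):

{
    "op": "UPDATE",
    "bucket": "todos",
    "id": "b4a3c2d1-0000-4000-8000-000000000001",
    "val": {"text": "write blog post", "done": true},
    "old": {"text": "write blog post", "done": false}
}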

func startPostgres(ctx context.Context) (*pgx.ConnPool, error) {
	pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
		ConnConfig: envPGConfig(),
		AfterConnect: func(c *pgx.Conn) error {
			_, err := c.Exec(tablesSQL)
			if err != nil {
				return errors.Wrap(err, "error running initial SQL statements")
			}

			// this subscribes our connection to the 'bridge' channel; the channel name can be whatever you want
			err = c.Listen("bridge")
			if err != nil {
				return err
			}

			go func() {
				for {
					// if ctx is done, err will be non-nil and this func will return
					msg, err := c.WaitForNotification(ctx)
					if err != nil {
						log.Errorln(errors.Wrap(err, "WaitForNotification error"))
						return
					}
					meta := &notification{}
					err = json.Unmarshal([]byte(msg.Payload), meta)
					if err != nil {
						log.Errorln(errors.Wrap(err, "json.Unmarshal error"))
						return
					}

					log.Println("Got a message from postgres!!!", meta)
				}
			}()

			return nil
		},
	})
	return pool, err
}

And to generate the notification from Postgres, we need to write a procedure and create a few triggers for it on our table.

DO $$
BEGIN
	IF NOT EXISTS (SELECT 1 FROM pg_proc WHERE proname = 'bridge_notify') THEN

        CREATE OR REPLACE FUNCTION bridge_notify() RETURNS trigger AS $FN$
        DECLARE
            hasNew bool = false;
            hasOld bool = false;
            -- size, txtload, and counter are only used by the chunking version further down
            size int;
            newval jsonb;
            oldval jsonb; -- avoid naming this 'old': it would collide with the trigger's OLD record
            bucket jsonb;
            id TEXT;
            payload jsonb;
            txtload TEXT;
            counter INTEGER = 1; -- substr() is 1-indexed
        BEGIN
            IF TG_OP = 'INSERT' THEN
                hasNew = true;
                newval = NEW.val;
            ELSEIF TG_OP = 'UPDATE' THEN
                hasNew = true;
                hasOld = true;
                newval = NEW.val;
                oldval = OLD.val;
            ELSE -- DELETE
                hasOld = true;
                oldval = OLD.val;
            END IF;
            
            IF hasOld THEN
                bucket = to_jsonb(OLD.bucket);
                id = OLD.id;
            ELSE
                bucket = to_jsonb(NEW.bucket);
                id = NEW.id;
            END IF;
            
            payload = jsonb_build_object(
                'op', to_jsonb(TG_OP),
                'id', to_jsonb(id),
                'bucket', bucket
            );
            IF hasNew THEN
                payload = jsonb_set(payload, '{val}', newval, true);
            END IF;
            IF hasOld THEN
                payload = jsonb_set(payload, '{old}', oldval, true);
            END IF;

            PERFORM pg_notify('bridge', payload::TEXT);
            
            RETURN NEW; -- the return value of an AFTER trigger is ignored

        END;
        $FN$ LANGUAGE plpgsql;

	END IF;

	IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'bridge_notify_update') THEN
		CREATE TRIGGER bridge_notify_update AFTER UPDATE ON bridge_snaps FOR EACH ROW EXECUTE PROCEDURE bridge_notify();
	END IF;
	IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'bridge_notify_insert') THEN
		CREATE TRIGGER bridge_notify_insert AFTER INSERT ON bridge_snaps FOR EACH ROW EXECUTE PROCEDURE bridge_notify();
	END IF;
	IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'bridge_notify_delete') THEN
		CREATE TRIGGER bridge_notify_delete AFTER DELETE ON bridge_snaps FOR EACH ROW EXECUTE PROCEDURE bridge_notify();
	END IF;
END $$;
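
With the function and triggers in place, a quick smoke test (again with made-up data) should make the running Go program log a notification:

INSERT INTO bridge_snaps (bucket, id, version, ts, val)
VALUES ('todos', 'b4a3c2d1-0000-4000-8000-000000000001', 1,
        extract(epoch from now())::BIGINT, '{"text": "write blog post"}');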

Large Payloads via Chunking

Our Postgres function has a constraint that needs to be addressed: a notification payload must be shorter than 8000 bytes, and pg_notify errors out on anything bigger.

So in order to send large payloads, we either need to notify with just the row's key and run another SELECT from Go, or send a bunch of notifications each carrying a small piece of the payload.
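
For completeness, the first option would look roughly like this on the Go side, once a notification's bucket and id have been parsed into meta. This is just a sketch; it assumes the trigger is changed to send only op, bucket, and id, and that a second connection or pool is available for queries, since the listening connection is busy inside WaitForNotification:

    // sketch: the trigger sends only {"op": ..., "bucket": ..., "id": ...},
    // and we fetch the full row ourselves on a separate connection
    var val []byte
    err := pool.QueryRow(
        "SELECT val FROM bridge_snaps WHERE bucket = $1 AND id = $2",
        meta.Bucket, meta.ID,
    ).Scan(&val)
    if err != nil {
        log.Errorln(errors.Wrap(err, "fetching row for notification"))
        return
    }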

Also, in testing I ran into a problem: Postgres deduplicates identical notifications sent within the same transaction, so if two notifications share the same channel and payload, listeners may only receive one of them. So for instance, if I split {"val":{"todo":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}} every 10 characters (ridiculously small), several chunks become identical strings of x's, and I could end up with missing x's on the Go side.
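
The deduplication is easy to reproduce in psql while the Go program is listening:

BEGIN;
SELECT pg_notify('bridge', 'same payload');
SELECT pg_notify('bridge', 'same payload'); -- collapsed: listeners receive this once, not twice
COMMIT;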

So in order to chunk things up, we're going to send 1000 characters of payload at a time, prefixed with the row's ID and a counter, so that no two chunk strings are ever identical (avoiding the second problem above).

A few payloads like:

<36-char uuid><counter>:<1000 chars of payload>
<36-char uuid>EOF

and a final EOF payload to signal to the Go side that it has received everything and can start deserializing.
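
With a chunk size of 10 instead of 1000 (just for illustration), the {"val":...} payload from the dedup example above would go over the wire roughly as:

b4a3c2d1-0000-4000-8000-0000000000011:{"val":{"t
b4a3c2d1-0000-4000-8000-00000000000111:odo":"xxxx
...
b4a3c2d1-0000-4000-8000-000000000001EOF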

On the Go side, we can change our for loop to handle chunking by introducing an in-memory map that holds partial payloads until we get an EOF, like this:

    payloadMap := map[string]*bytes.Buffer{}
    // ...

    // first 36 characters are the UUID, bytes up to ':' are the counter,
    // the rest is up to 1000 characters of JSON
    id := msg.Payload[:36]
    buff, ok := payloadMap[id]
    if !ok {
        buff = bytes.NewBuffer([]byte{})
        payloadMap[id] = buff
    }
    // for the final "<uuid>EOF" payload there is no ':', so Index returns -1
    // and colonI lands back on 36, leaving chunk == "EOF"
    colonI := strings.Index(msg.Payload[36:], ":") + 37
    chunk := msg.Payload[colonI:]
    if chunk != "EOF" {
        buff.WriteString(chunk)
        continue
    }
    delete(payloadMap, id)

    meta := &notification{}
    err = json.Unmarshal(buff.Bytes(), meta)

    // ...

and the corresponding SQL change:

    -- ...
	txtload = payload::TEXT;

	size = length(txtload);
	WHILE counter <= size LOOP
		PERFORM pg_notify(
			'bridge',
			-- prefix each chunk with id and counter so no two payload strings are identical
			concat(id, counter::TEXT, ':', substr(txtload, counter, 1000))
		);
		counter = counter + 1000;
	END LOOP;
	-- final sentinel tells the Go side the payload is complete
	PERFORM pg_notify('bridge', concat(id, 'EOF'));

	RETURN NEW;
END;