1. 16
    1. 6

      This is very interesting. I have solved this problem a different way.

      Instead of firing NOTIFY for the data directly, I write a row into a queue table with the table name and row metadata. The insert into the queue fires NOTIFY, and the listeners can look up the correct record from the metadata. Every message has a monotonic sequence number, and listeners track the newest sequence they’ve seen. When they start up, they select all rows newer than the latest sequence they saw. A poor man’s Kafka, basically.
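
      A minimal sketch of how this might be wired up (table, channel, and column names are made up; it assumes the watched table has an id column and PostgreSQL 11+ for EXECUTE FUNCTION):

          -- Event queue; seq comes from an ordinary Postgres sequence (bigserial).
          CREATE TABLE event_queue (
              seq        bigserial PRIMARY KEY,
              table_name text        NOT NULL,
              row_id     bigint      NOT NULL,
              op         text        NOT NULL,              -- INSERT / UPDATE / DELETE
              created_at timestamptz NOT NULL DEFAULT now()
          );

          -- Trigger function: queue a row describing the change, then NOTIFY listeners.
          CREATE FUNCTION enqueue_event() RETURNS trigger AS $$
          DECLARE
              v_id  bigint;
              v_seq bigint;
          BEGIN
              IF TG_OP = 'DELETE' THEN v_id := OLD.id; ELSE v_id := NEW.id; END IF;

              INSERT INTO event_queue (table_name, row_id, op)
              VALUES (TG_TABLE_NAME, v_id, TG_OP)
              RETURNING seq INTO v_seq;

              PERFORM pg_notify('events', v_seq::text);  -- payload is just the metadata key
              RETURN NULL;                               -- AFTER trigger; return value is ignored
          END;
          $$ LANGUAGE plpgsql;

          CREATE TRIGGER my_table_events
          AFTER INSERT OR UPDATE OR DELETE ON my_table
          FOR EACH ROW EXECUTE FUNCTION enqueue_event();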

      1. 1

        This is a cool pattern!

        1. 1

          Presumably listeners store their most-recently-seen sequence in Postgres too?

          1. 1

            There’s more to the picture, but I’m leaving stuff out for brevity. But yes, you should persist the monotonic sequence. This value will essentially become part of a vector clock once this grows to multiple systems.

          2. 1

            We do this exact thing all the time, works great.

            1. 1

              How do you deal with downtime of all listeners in combination with ill-ordered sequence numbers due to failed transactions?

              Basically, a listener could read number 2, then go down. Then number 1 is written (because its transaction took longer than the one for number 2), and the notify does not reach the listener because it’s down. Then the listener starts up again, reads from 2, and gets notifications for 3 and 4 etc., but misses 1.

              1. 3

                Okay, I’ll paint a more detailed picture. The goal is to replicate data changes from one system to another in real-time. They live in entirely different datacenters.

                System 1: A traditional, Postgres-backed UI application

                1. An UPDATE happens in our Postgres database related to data we care about
                2. This fires a trigger that writes a row into our event queue table, together with a sequence number. The sequence number comes from a Postgres sequence.
                3. The INSERT fires a NOTIFY that alerts a publisher process.
                4. The publisher uses the NOTIFY metadata to query the changed data and produces an idempotent JSON payload
                5. The payload is published to an ordered GCP pubsub topic using the sequence number as ordering key
                6. When the event is successfully published to the topic, the event row is deleted from the queue

                System 1 (restart)

                1. The publisher queries the event queue for any unprocessed events
                2. LISTEN for new events (see the sketch below)
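
                A minimal sketch of that restart sequence (table and channel names are assumptions; since published events are deleted, anything still in the queue is unprocessed):

                    -- 1. Drain anything that was queued but never published.
                    SELECT seq, table_name, row_id, op
                    FROM event_queue
                    ORDER BY seq;

                    -- 2. Then wait for new work; each queue INSERT wakes the publisher via NOTIFY.
                    LISTEN events;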

                System 2: An in-memory policy engine with a dedicated PostgreSQL DB

                1. A pool of journaler processes watch the pubsub topic for events
                2. When an event happens, a journaler writes it to the journal table and acknowledges the message
                3. The journal INSERT fires a NOTIFY
                4. Every replica of the system has a replicator process that receives the NOTIFY
                5. The replicator consumes the event payload and updates the in-memory data
                6. The specific record retains the sequence number that created it, and a global sequence is updated monotonically
                7. A heartbeat process checks that the in-memory system is live, and updates its heartbeat row in the PostgreSQL DB with its current sequence
                8. A reaper process runs on an interval that deletes stale heartbeat records (a process restart will have a different identity)
                9. A pg_cron job finds the oldest current sequence in the heartbeat table and flushes any journal entries below it
                10. Flushed journal entries are written to a permanent cache that enforces monotonic updates, so that only the newest version of each record is retained (see the sketch after this list)
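
                A rough sketch of what steps 8–10 could look like in SQL, with the reaper and the flush folded into one scheduled statement for brevity (table and column names are guesses):

                    -- Run on a schedule, e.g. with pg_cron's cron.schedule().

                    -- Reaper: drop heartbeats from replicas that have gone away.
                    DELETE FROM heartbeats
                    WHERE updated_at < now() - interval '5 minutes';

                    -- Flush: everything below the slowest live replica's sequence is safe to move.
                    WITH flush_floor AS (
                        SELECT min(current_seq) AS min_seq FROM heartbeats
                    ),
                    flushed AS (
                        DELETE FROM journal j
                        USING flush_floor f
                        WHERE j.seq < f.min_seq
                        RETURNING j.record_id, j.seq, j.payload
                    )
                    INSERT INTO permanent_cache AS c (record_id, seq, payload)
                    SELECT DISTINCT ON (record_id) record_id, seq, payload
                    FROM flushed
                    ORDER BY record_id, seq DESC            -- newest flushed version of each record
                    ON CONFLICT (record_id) DO UPDATE
                        SET seq = EXCLUDED.seq, payload = EXCLUDED.payload
                        WHERE EXCLUDED.seq > c.seq;         -- monotonic: never go backwards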

                System 2 (restart): An in-memory dataset means a restart is a blank slate.

                1. Begin by loading the permanent cache into memory
                2. Query the journal for all records newer than the current global sequence
                3. LISTEN for new journal writes

                Because the sequence is atomically incremented by Postgres, this means that a failed transaction results in a discarded sequence. That’s fine.

                We are using DB tables as durable buffers so that we don’t lose messages during a system restart or crash.

                If there is a race condition where a record is updated again before the previous event is published, that’s also fine because the event is idempotent so whatever changes have taken place will be serialized.

                We use an ordered pubsub topic on GCP to reduce the possibility of out-of-order events on the consuming side, and we enforce monotonic updates at the DB level.

                1. 1

                  First of all, thanks a lot for the detailed reply. I appreciate it!

                  I think you actually have a potential race-condition (or “transaction-condition”) bug in your application. You are saying:

                  Because the sequence is atomically incremented by Postgres, this means that a failed transaction results in a discarded sequence. That’s fine.

                  It is atomically incremented, but the resulting events do not become visible in sequence order. I made a mistake by mentioning “failed transactions”; what I actually meant was a long-lasting transaction. So indeed, a discarded sequence number does not matter, I agree with you on that. But what does matter is a transaction that takes a long time. Let’s say 1 hour, just for fun, even if it’s unrealistic.

                  In that case, when the long transaction starts, its sequence number is picked (number 1). Another transaction starts and picks 2. The second transaction finishes and commits. The trigger runs, the notify goes out and the listener picks it up and writes it somewhere. It remembers (and persists?) the number 2. NOW the listener goes down. Alternatively, it just has a temporary network issue. During that time, the first transaction finally commits. It triggers the notify, but the listener does not get it. Now the listener starts again (or the network issue is resolved). A third transaction commits with number 3. The listener receives the notify with number 3. It checks its last sequence number, which is 2, so it is happy to just read in 3. Transaction 1 and its event will now be forgotten.

                  A possible solution that I can see: if skipped sequence numbers are detected, they need to be tracked and retried for a certain time period T. Also, on restart, the listener needs to read events from the past for that same time period T. This T must be longer than any transaction could possibly take.

                  Alternatively: on a notify, don’t query by the metadata from the notify. Use timestamps and always query by timestamp > now() - T. To improve efficiency, keep track of the sequence numbers processed during the last query and add them to the query: AND sequence_number NOT IN (...processed sequence numbers).
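
                  For example, a sketch of that query shape (column names and the :processed_seqs parameter are assumptions; T = 1 hour here):

                      -- Re-scan a sliding time window instead of trusting the last-seen sequence.
                      SELECT seq, table_name, row_id
                      FROM event_queue
                      WHERE created_at > now() - interval '1 hour'    -- T: longer than any transaction
                        AND seq <> ALL (:processed_seqs)              -- already handled in earlier scans
                      ORDER BY seq;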

                  There also seems to be a way to work with transaction IDs directly and detect such cases, but that really relies on Postgres internals.

                  1. 1

                    I had a similar issue with a log (basically Kafka for lazy folk who like Postgres), and ended up annotating log entries with the current transaction ID, and on each notification, querying for all rows whose transaction IDs were smaller than that of the longest-running open transaction. I wrote it up too.

                    1. 1

                      Oh, that’s very interesting. Don’t you have to handle txid wraparound in that case?

                      1. 2

                        Yeah, you’re right, but the functions I mention return a 64-bit value (transaction ID and an epoch), and I was pretty happy that it’d take several million years for that to wrap around. At which point it would be someone else’s problem (if at all).
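
                        For anyone curious, a sketch of that idea with the 64-bit functions (pg_current_xact_id(), pg_current_snapshot(), and pg_snapshot_xmin() exist since PostgreSQL 13; the log table name is an assumption, not necessarily how the write-up does it):

                            -- Tag each log entry with the writing transaction's 64-bit xid.
                            ALTER TABLE log
                                ADD COLUMN xid xid8 NOT NULL DEFAULT pg_current_xact_id();

                            -- On each notification, read only entries whose writers must have
                            -- finished: every xid below the snapshot's xmin is committed or
                            -- aborted, so no "older" row can still appear later.
                            -- (In practice you would also track a processed watermark; omitted here.)
                            SELECT *
                            FROM log
                            WHERE xid < pg_snapshot_xmin(pg_current_snapshot())
                            ORDER BY xid;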

                    2. 1

                      Thanks for going into more detail. I don’t think the scenario you’re describing is actually a problem for us, I’ll bring you even further into the weeds and let’s see if you agree.

                      The event queue is populated by a trigger that runs AFTER INSERT OR UPDATE OR DELETE, which detects changes in certain data we care about and INSERTs a row into the event queue table.

                      The INSERT triggers the sequence increment, because the sequence provides the default value of the seq column.

                      So, the only sort of race condition that I can think of is multiple changes hitting the same table row in rapid succession, but remember that the publisher emits idempotent event payloads. So we might end up with multiple events with the same data… but we don’t care, as long as it’s fresh.

                      The ordered delivery from Google pubsub means that rapid updates for one row will most likely arrive in order, but even if not, we enforce monotonic updates on the receiving end. So in the unlikely event that we do process events for a row out of order, we just drop the older event and use the newer one.
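
                      That monotonic guard can be a single guarded upsert on the receiving side, something like this (table and parameter names assumed):

                          -- Apply an event only if it is newer than what we already hold.
                          INSERT INTO records AS r (id, seq, payload)
                          VALUES (:id, :seq, :payload)
                          ON CONFLICT (id) DO UPDATE
                              SET seq = EXCLUDED.seq, payload = EXCLUDED.payload
                              WHERE EXCLUDED.seq > r.seq;   -- a late, older event becomes a no-op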

                      1. 2

                        The SQL statement (including the trigger and everything it does) runs within the same transaction - and it has to, so that either the insert/update/delete happens AND the insert into the event queue table happens, or none of it happens.

                        So then, the problem with a “hanging” transaction is possible (depending on what your queries look like and how they come in, I suppose), and that means there will be points in time where a listener will first see an event with number X added and then, afterwards(!), will see an event with number X-1 or X-2 added. Unless I somehow misunderstand how you generate the number in the seq column, but this is just a serial, right?

                        The ordered delivery from Google pubsub means that rapid updates for one row will most likely arrive in order, but even if not, we enforce monotonic updates on the receiving end. So in the unlikely event that we do process events for a row out of order, we just drop the older event and use the newer one.

                        So if you have only one producer and your producer basically only runs one transaction at a time before starting a new one, then the problem does not exist because there are no concurrent transactions. So if one transaction were hanging, nothing would be written anymore. If this is your setup, then you indeed don’t have the problem. It would only arise if there are e.g. two producers writing things and whatever they write increases the same serial (even if they write completely different event types and even insert into different tables). I kind of assumed you would have concurrent writes potentially.

                        Also see the response and the link that noncrap just made here - I think it explains it quite well. From his post:

                        The current implementation is perfectly fine when there is only a single producer; when you start to have multiple producers logging to the same place using batched operations (transactions), it turns out that this can fail quite badly.

                        1. 1

                          The SQL statement (including the trigger and everything it does) runs within the same transaction - and it has to, so that either the insert/update/delete happens AND the insert into the event queue table happens, or none of it happens.

                          So then, the problem with a “hanging” transaction is possible (depending on what your queries look like and how they come in, I suppose), and that means there will be points in time where a listener will first see an event with number X added and then, afterwards(!), will see an event with number X-1 or X-2 added.

                          What you’re describing here appears to contradict PostgreSQL’s documentation on the READ COMMITTED isolation level:

                          UPDATE, DELETE, SELECT FOR UPDATE, and SELECT FOR SHARE commands behave the same as SELECT in terms of searching for target rows: they will only find target rows that were committed as of the command start time. However, such a target row might have already been updated (or deleted or locked) by another concurrent transaction by the time it is found. In this case, the would-be updater will wait for the first updating transaction to commit or roll back (if it is still in progress). If the first updater rolls back, then its effects are negated and the second updater can proceed with updating the originally found row. If the first updater commits, the second updater will ignore the row if the first updater deleted it, otherwise it will attempt to apply its operation to the updated version of the row.

                          https://www.postgresql.org/docs/current/transaction-iso.html#XACT-READ-COMMITTED

                          It is certainly possible that the documentation does not accurately reflect reality, so let’s look at Kyle Kingsbury’s analysis of PG:

                          In most respects, PostgreSQL behaved as expected: both read uncommitted and read committed prevent write skew and aborted reads.

                          https://jepsen.io/analyses/postgresql-12.3

                          So as far as I can tell, my design will work exactly as intended.

                          1. 1

                            I guess I’m misunderstanding your setting. I was thinking you have multiple processes writing into one table A (insert or update or delete) potentially in parallel / concurrently, then you have a trigger that writes an event into table B. And then you have one or more listener processes that read from table B.

                            If you remove the “parallel / concurrently” from the whole setting, then the problem I mentioned does not exist. But that is at the cost of performance / availability. So if you lock the whole table A for writes when making an update, then the situation I’m talking about will not happen. But that is only if you lock the whole table! Otherwise, it will only be true for each row - and that is what the documentation you quoted is talking about. It’s talking about the “target row” and not “the target table”.

                            1. 1

                              I was thinking you have multiple processes writing into one table A (insert or update or delete) potentially in parallel / concurrently, then you have a trigger that writes an event into table B. And then you have one or more listener processes that read from table B.

                              Yes, that is correct. The publisher LISTENs to Table B for new events, and the event payload indicates what data has been mutated. It queries the necessary data to generate an idempotent payload, which is published to pubsub. So if there are multiple, overlapping transactions writing the same row, READ COMMITTED will prevent stale reads. There is no need to lock entire tables; standard transaction isolation is all we need here.

                              If we were to introduce multiple publisher processes, with multiple LISTEN subscriptions, then we would need to lock the event rows as they are being processed and query with SKIP LOCKED.
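
                              A sketch of what that multi-publisher claim could look like (batch size and the :published_seqs parameter are assumptions):

                                  -- Each publisher claims a batch; SKIP LOCKED makes concurrent
                                  -- publishers skip rows someone else is already working on.
                                  BEGIN;

                                  SELECT seq, table_name, row_id, op
                                  FROM event_queue
                                  ORDER BY seq
                                  LIMIT 100
                                  FOR UPDATE SKIP LOCKED;

                                  -- ...publish the claimed events to pubsub...

                                  DELETE FROM event_queue
                                  WHERE seq = ANY (:published_seqs);   -- the seqs just published (assumed parameter)

                                  COMMIT;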

              2. 4

                This is possible if you look at it from the PostgreSQL wire-protocol level. In the protocol design, the backend responds to a frontend query with a row description, followed by the result set row by row. So it’s totally possible for a frontend to send a SQL subscribe query (a live query, so to speak) and wait for the backend to stream results row by row until a ReadyForQuery message indicates the end. I think it’s due to Postgres’ connection management and process model that this technique isn’t widely adopted.

                I created an example in my library pgwire to demo this: https://github.com/sunng87/pgwire/blob/test/example-streaming/examples/streaming.rs In psql you will see the results arrive after 10 seconds, but at the network level, rows are actually available to the client every 1 second.

                1. [Comment removed by author]
