1. 55
  1.  

    1. 12

      Perhaps I just missed it, but neither the article nor the links from it that I read seem to mention what I think is the biggest reason people recommend staying away from Postgres NOTIFY/LISTEN. From the docs:

      There is a queue that holds notifications that have been sent but not yet processed by all listening sessions. If this queue becomes full, transactions calling NOTIFY will fail at commit.

      This is a single, global queue for the entire database. I could be wrong here, but my understanding is that if any client across the entire database executes LISTEN but is unable to process notifications fast enough, eventually all NOTIFY commands across all sessions will roll back any transaction they’re called in.

      1. 19

        LISTEN/NOTIFY is an optimisation; your workers should still poll on a regular schedule. The LISTEN just helps minimise the time until the next tick.
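
        A minimal sketch of that hybrid loop (assuming psycopg2; the DSN, channel name, poll interval, and process_pending_jobs are all hypothetical):

        import select
        import psycopg2

        conn = psycopg2.connect("dbname=app")   # hypothetical DSN
        conn.autocommit = True
        cur = conn.cursor()
        cur.execute("LISTEN job_queue;")        # hypothetical channel

        POLL_INTERVAL = 5  # seconds; the poll runs even if no NOTIFY ever arrives

        while True:
            process_pending_jobs()  # your normal SKIP LOCKED polling query goes here
            # Block until either a notification arrives or the interval elapses.
            if select.select([conn], [], [], POLL_INTERVAL) != ([], [], []):
                conn.poll()
                conn.notifies.clear()  # we only care that *something* woke us up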

      2. 6

        Yep, I think that’s a valid critique and great feedback. Implementers should consider whether they’re comfortable shouldering that risk or would rather opt for periodic polling as a less resource-efficient alternative.

        [edit]

        I’d also add that the risk of this is mitigated by either setting a global idle_in_transaction_session_timeout or explicitly setting it on any session with a LISTENer. Allowing unbounded idle time in transactions is a risk for a number of reasons unrelated to Postgres’ internal NOTIFY queue.
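
        For example (the value is purely illustrative; assuming psycopg2):

        import psycopg2

        conn = psycopg2.connect("dbname=app")   # hypothetical DSN
        cur = conn.cursor()
        # Per-session setting on the connection that LISTENs; a global default
        # can also be set in postgresql.conf or via ALTER SYSTEM.
        cur.execute("SET idle_in_transaction_session_timeout = '60s'")
        cur.execute("LISTEN job_queue;")        # hypothetical channel
        conn.commit()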

        1. 2

          Agreed, long transactions seem bad! I believe (but again, could be wrong) that this is a problem even without transactions, if you just have a client that is slow to process messages at the protocol level.

      3. 2

        There are always limits somewhere. PG makes them known, documents them, and fails reasonably, and there is an easy workaround (for example, do an occasional poll if it’s been X amount of time since you’ve seen a NOTIFY).

        It’s still a great default database. SQLite is also a great default database, provided you can keep your connections limited to a single machine.

    2. 16

      Postgres is the default database. Start with Postgres unless you have some reason you shouldn’t use the default.

    3. 7

      I really like this idea (both using pg for queues specifically, and the idea of avoiding introducing new moving parts if you don’t have to)! And in a world where everyone is obsessed with scale, I really like technology that is simple and designed for the scale it needs to achieve and no more.

      But one of the things that worries me about this approach is the (implied) practice of keeping a database transaction open for the entire duration of a long-running task, so as to hold the row-level lock on the job row you’re processing. (To be fair, most of the time I’ve been burnt by long-running transactions causing problems it’s been to do with holding a table-level lock, but given that any locks you acquire during a transaction are held until you commit, it still makes me uneasy.)

      I feel like the better way to do this might be to have a field on the job table that indicates whether a job is being processed, and have the worker update that field when it starts and ends a job, and then combine that with SKIP LOCKED so that workers don’t end up waiting on each other to pick up new tasks. But then, you’d need to somehow make sure that a worker that falls over while its jobs are being processed doesn’t leave those jobs in limbo indefinitely, so that’s not a complete solution by itself either.

      Of course, this isn’t a criticism of the article—it’s focused more on the what and why than the how, so this level of implementation detail is probably intentionally out of scope for it. But it’s something worth considering!

      1. 4

        Thanks for the thoughtful feedback. I share your concern about having multiple transactions open for the duration of jobs.

        I’ll use neoq [1] as an example here, because I know the relevant code to point you to. The way I think about this is when we’re setting up background workers for a particular type of job, we should be aware of that job’s ~99th runtime percentile. Since transactions are idle when worker application code is in progress, that 99th percentile number (or whatever number you choose) can be used as input to idle_in_transaction_session_timeout [2] to dictate how long the transaction may sit idle while work is in progress (or when work is stuck). This is, of course, the last resort to prevent transactions from remaining open. We should first set a code level timeout [3] on every type of job; again, with that 99th percentile or the number of our choice as the input.

        The second part of this is concurrency control. Our degree of worker concurrency [4] dictates the number of open transactions. With timeouts and concurrency control, we’re guaranteed never to have more than N (the degree of concurrency) open transactions, none of them idle longer than the configured timeout.

        I feel like the better way to do this might be to have a field on the job table that indicates whether a job is being processed, and have the worker update that field when it starts and ends a job, and then combine that with SKIP LOCKED so that workers don’t end up waiting on each other to pick up new tasks

        Neoq does do this, by using transactions and issuing a ROLLBACK any time an error occurs during job handler execution.


        Your comment is why I posted here instead of Hacker News. People here actually want to have a discussion, whereas those folks generally want to make salty comments :)

        [1] https://github.com/acaloiaro/neoq

        [2] https://github.com/acaloiaro/neoq/blob/main/backends/postgres/postgres_backend.go#L181

        [3] https://github.com/acaloiaro/neoq/blob/main/examples/add_job_with_timeout/main.go#L41

        [4] https://github.com/acaloiaro/neoq/blob/main/examples/add_job_with_custom_concurrency/main.go#L37

      2. 3

        I feel like the better way to do this might be to have a field on the job table that indicates whether a job is being processed, and have the worker update that field when it starts and ends a job, and then combine that with SKIP LOCKED so that workers don’t end up waiting on each other to pick up new tasks. But then, you’d need to somehow make sure that a worker that falls over while its jobs are being processed doesn’t leave those jobs in limbo indefinitely, so that’s not a complete solution by itself either.

        That’s essentially what we’re doing and it works quite well. To close the loop we have a last_seen timestamp field on the job that is periodically updated by the process that picked up the job, and a reaper process that re-enqueues jobs whose last_seen timestamp is older than some threshold.
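
        A rough sketch of those two pieces (table, column, status names and the intervals are guesses on my part, not our actual schema; assuming psycopg2):

        import psycopg2

        conn = psycopg2.connect("dbname=app")   # hypothetical DSN
        cur = conn.cursor()
        job_id = 42                             # the job this worker picked up

        # Heartbeat: run periodically by the worker while it holds the job.
        cur.execute("UPDATE jobs SET last_seen = now() WHERE id = %s", (job_id,))
        conn.commit()

        # Reaper: re-enqueue jobs whose owner has gone quiet for too long.
        cur.execute(
            "UPDATE jobs SET status = 'PENDING', last_seen = NULL "
            "WHERE status = 'RUNNING' AND last_seen < now() - interval '5 minutes'"
        )
        conn.commit()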

      3. 3

        But then, you’d need to somehow make sure that a worker that falls over while its jobs are being processed doesn’t leave those jobs in limbo indefinitely, so that’s not a complete solution by itself either.

        Our solution to this at $WORK ended up being an additional lease check-out table, using a lease ID column rather than an “in use” column in the job table to track which worker picked up the job, and triggers to release jobs if the lease is more than a short while old (2 minutes in our case).
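
        I can’t share the exact schema, but the shape of it is roughly this (names are illustrative, and a trigger on lease insert is just one way to do the release; a periodic sweep works too):

        import psycopg2

        conn = psycopg2.connect("dbname=app")   # hypothetical DSN
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS job_leases (
                job_id    bigint PRIMARY KEY REFERENCES jobs (id),
                lease_id  uuid NOT NULL,
                leased_at timestamptz NOT NULL DEFAULT now()
            );

            CREATE OR REPLACE FUNCTION release_stale_leases() RETURNS trigger AS $$
            BEGIN
                -- Any lease older than 2 minutes is considered abandoned.
                DELETE FROM job_leases WHERE leased_at < now() - interval '2 minutes';
                RETURN NULL;  -- return value is ignored for statement-level triggers
            END;
            $$ LANGUAGE plpgsql;

            DROP TRIGGER IF EXISTS release_stale_leases ON job_leases;
            CREATE TRIGGER release_stale_leases
                BEFORE INSERT ON job_leases
                FOR EACH STATEMENT EXECUTE FUNCTION release_stale_leases();
        """)
        conn.commit()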

    4. 6

      I don’t doubt that Postgres works well this way, but I think the main point is that SELECT ... FOR UPDATE SKIP LOCKED counters most of the reasoning behind the idea that databases are always the wrong choice for queues, rather than that Postgres specifically counters that idea.

      1. 2

        MySQL also has SKIP LOCKED. As others have noted, advisory locks are also viable, though with a set of caveats I’m not as familiar with.
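
        For reference, a tiny sketch of the advisory-lock flavour (assuming psycopg2; how you derive the lock key is up to you). It also hints at the main caveat: the lock belongs to the session, not the transaction, so it has to be released explicitly and doesn’t mix well with transaction-level poolers.

        import psycopg2

        conn = psycopg2.connect("dbname=app")   # hypothetical DSN
        cur = conn.cursor()
        job_id = 42                             # hypothetical job key

        cur.execute("SELECT pg_try_advisory_lock(%s)", (job_id,))
        if cur.fetchone()[0]:
            try:
                pass  # process the job here
            finally:
                # Advisory locks are session-scoped: release explicitly
                # (or they go away only when the connection closes).
                cur.execute("SELECT pg_advisory_unlock(%s)", (job_id,))
        conn.commit()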

    5. 5

      I asked this on HN as well: does anyone have actual numbers/benchmarks for a Postgres queue? I don’t know what’s reasonable to expect. 100 qps? 1,000? 1M qps? What about qps under write load? I basically can’t find a single person saying what load they handle, other than some really hand-wavy stuff.

      1. 2

        These are old, but they should give you an idea: https://github.com/lateefj/gq/blob/master/NOTES.rst From memory, I used Reddit comments as the messages. It would be relatively easy to create a database instance per partition. For certain situations (ACID transactions, or non-zero error/retry rates) an RDBMS can make a lot more sense. If you have ever had to shed load in something like Kafka, an RDBMS can be very appealing for certain workloads.

      2. 1

        I’ve done some testing, but I’m not satisfied with my methodology. Feel free to reach out via email if you want to get something concrete.

        My best guess is closer to the 1000qps range.

        1. 1

          I’d be curious to see your queueing system. For reference, this is what I’ve built:

          https://github.com/grapl-security/grapl/blob/main/src/rust/plugin-work-queue/src/psql_queue.rs

          It could use some tweaking with regard to indexing, and it is notably not batch oriented. I never got a chance to benchmark it properly.

          1. 1

            Relevant code is here: https://github.com/acaloiaro/neoq/blob/main/backends/postgres/postgres_backend.go

            Sorry it’s not in Rustacean :)

    6. 2

      I am looking forward to a practical example of this concept in use.

      I’m using LISTEN/NOTIFY in a couple of simple setups where I don’t even have a queue (and so risk losing messages/jobs, both if workers are down and if there are too many jobs, though neither happens in these specific simple setups). It would be interesting to see a production setup, with some numbers on maximum queue sizes, worker processes, etc.

      I have seen situations where a job queue in PostgreSQL (an oooold TheSchwartz setup) started to get unwieldy when the number of jobs exceeded a couple of million, whereas RabbitMQ has started to struggle only at 100x that. I’m sure the specifics matter in both cases, though :-)

      1. 3

        The first time I heard about this approach was Sage Griffin mentioning they designed crates.io’s queuing system like this.

        1. 1

          Good tip! https://github.com/rust-lang/crates.io/blob/main/src/background_jobs.rs

          I see Sage did the original implementation too.

      2. 3

        FWIW, I use LISTEN/NOTIFY for writing updated static files to disk when the info in the database changes:

        https://sive.rs/shc

        (In this case, blog comments are rendered to HTML, so when someone leaves a comment, the static HTML cache of those comments is rewritten, and viewers just see the static cache.)

      3. 3

        I’m currently trialling good_job for $work as a replacement for Sidekiq (it would let us drop Redis, and one less dependency is always tempting; it also avoids a race condition where jobs can start before the transaction that enqueued them has committed).

        So far I haven’t run into any trouble, and my dive through the source code didn’t turn up anything terrifying, but I’d want to wait another month or so before firmly recommending it.

        1. 4

          Don’t enqueue directly to Redis in your transaction; see https://microservices.io/patterns/data/transactional-outbox.html

          It also works for enqueueing tasks.
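
          The gist of it, roughly (table and column names are illustrative, not from the pattern page; assuming psycopg2):

          import json
          import psycopg2

          conn = psycopg2.connect("dbname=app")   # hypothetical DSN
          cur = conn.cursor()

          # The business write and the "enqueue" commit (or roll back) together;
          # a separate relay process later reads the outbox table and pushes to Redis.
          cur.execute("INSERT INTO users (email) VALUES (%s) RETURNING id",
                      ("new@example.com",))
          user_id = cur.fetchone()[0]
          cur.execute("INSERT INTO outbox (topic, payload) VALUES (%s, %s)",
                      ("send_welcome_email", json.dumps({"user_id": user_id})))
          conn.commit()  # either both rows become visible, or neither does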

          1. 2

            I mean, sure, you could adjust your existing software to do this (a senior dev will knock it out in a few weeks’ work, if you count monitoring, documenting what to do if it falls behind, setting up CI, etc. for the new process).

            Or, you could pick a toolset that eliminates the cost / ops work of an extra runtime dependency and lets you lean on the tools you’re already using.

        2. 2

          Read https://honeyryderchuck.gitlab.io/2023/04/29/introducing-tobox.html for more details.

          good_job does not use SKIP LOCKED, btw; rather, it uses connection-level advisory locks to achieve exclusive access, which, among other drawbacks, rules out some options, such as transaction-level connection poolers. That, and being Rails-only, makes it a no-go for me.

          1. 1

            Obviously, if you’re not using Rails then a Rails-only tool is not going to help you.

            Well aware of the tradeoffs between explicit locks and advisory locks; in particular, the tradeoffs good_job makes become very expensive if you have more than 100 or so worker processes.

      4. 1

        django-mailer is an outgoing mail queue that stores mail in the DB and uses SELECT FOR UPDATE as a locking mechanism to ensure each mail gets sent exactly once. I’ve now also added a runmailer_pg command that uses NOTIFY/LISTEN to send mail on the queue immediately, which fixes the major downside of using a queue for mail (i.e. that mail wasn’t sent immediately).

        Source code for reference: https://github.com/pinax/django-mailer/blob/master/src/mailer/postgres.py

        1. 1

          If you flush successful jobs and move failed jobs to a dead letter queue, it should be plenty performant to poll the queue table on 1s ticks, or even faster if necessary. I like NOTIFY/LISTEN for general-purpose systems with heterogeneous queues, but for something like django-mailer, it might make sense to keep using tried and true polling.

        2. 1

          This reminds me of an incident many years ago when MySociety were running the petitions website for 10 Downing Street, and they had a petition that went mega-viral, over a million signatures. They had to send out an official email response after the petition closed.

          Originally the petitions website kept its mail queue in a table in MySQL, but it was not able to deliver a million messages in a reasonable timescale. I think the main problem was lack of parallelism (though the table scanning might have been accidentally quadratic as well).

          One of my friends was working for MySociety at the time, and I was postmaster for Cambridge University and an Exim expert, so I helped out with a bit of advice. It boiled down to dumping the messages into Exim (local, therefore fast) and letting it do the mail delivery in parallel. Much more effective use of the network!

    7. 2

      FYI, the link to the neoq GitHub repository has an errant ) at the end, and thus 404s.

      1. 2

        Thanks, fixed!

    8. 1

      I am not an expert in Postgres. If I get a list of jobs to perform using SELECT ... SKIP LOCKED, how do I ensure that these rows are locked until the jobs are done? Do I have to start a transaction, UPDATE these rows, and commit only when I am done with these jobs?

      1. 5

        You can have a long-running transaction, but the usual way is to have a status column. Say it can have the values NEW, RUNNING, DONE, FAILED, and it starts as NEW. You SELECT … WHERE status='NEW' FOR UPDATE SKIP LOCKED in a transaction and update the status to RUNNING as part of committing that transaction. Then when the job finishes, you update it again. Meanwhile, other jobs can still be taken by other workers while this worker is taking a job. The key thing SKIP LOCKED does is not force your queue to be single-threaded.
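
        Something like this, roughly (assuming psycopg2 and a hypothetical jobs table with those status values; run_job stands in for your handler):

        import psycopg2

        conn = psycopg2.connect("dbname=app")   # hypothetical DSN
        cur = conn.cursor()

        # Claim one NEW job and mark it RUNNING; the commit releases the row lock.
        cur.execute("""
            UPDATE jobs SET status = 'RUNNING'
            WHERE id = (
                SELECT id FROM jobs WHERE status = 'NEW'
                ORDER BY id LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            RETURNING id
        """)
        row = cur.fetchone()
        conn.commit()

        if row is not None:
            job_id = row[0]
            try:
                run_job(job_id)  # hypothetical handler
                cur.execute("UPDATE jobs SET status = 'DONE' WHERE id = %s", (job_id,))
            except Exception:
                conn.rollback()
                cur.execute("UPDATE jobs SET status = 'FAILED' WHERE id = %s", (job_id,))
            conn.commit()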

        1. 1

          I know at some point I had the idea to do this in neoq. Thanks for the reminder!

      2. 5

        You start a transaction, execute select … for update skip locked and hold the transaction open until processing the record is done, then update a status flag or delete the row, and finally close the transaction. If something fails, e.g. the machine crashes, the transaction is rolled back and the row automatically made available to be picked up by another process.

        One benefit of select … for update skip locked is that, in contrast to a scheme where you update a column to say that it’s in a “processing” state, there’s no need to have a sweeper process to identify rows where the processor has crashed, and mark them ready for processing again.
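
        A sketch of that flow (assuming psycopg2 and a hypothetical jobs table with payload and status columns; run_job stands in for your handler):

        import psycopg2

        conn = psycopg2.connect("dbname=app")   # hypothetical DSN
        cur = conn.cursor()

        # The transaction stays open for the duration of the work; if the worker
        # dies before COMMIT, the lock is released and the row becomes visible again.
        cur.execute("""
            SELECT id, payload FROM jobs
            WHERE status = 'PENDING'
            ORDER BY id LIMIT 1
            FOR UPDATE SKIP LOCKED
        """)
        row = cur.fetchone()
        if row is not None:
            run_job(row)  # hypothetical handler
            cur.execute("DELETE FROM jobs WHERE id = %s", (row[0],))
        conn.commit()     # a crash anywhere above ends in rollback instead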

        1. 1

          I had one more question if someone could answer. Let’s say I have run this query to get the jobs I want:

          cursor.execute("SELECT id, info FROM jobs WHERE status='PENDING' ORDER BY ts ASC LIMIT 100 FOR UPDATE SKIP LOCKED")
          jobs_rows = cursor.fetchall()

          Now I perform these jobs. Once done, I have to update the previously selected job rows. For this I can do

          cursor.execute("UPDATE jobs SET status='DONE' WHERE id=ANY(%s)", (<list of ids from jobs_rows>,))
          

          Is there a better way to do this?

          1. 2

            Is there some particular reason you don’t like this? I mean, aside from the fact that it’s a bit odd to fetch multiple jobs instead of a single job at a time: if something crashes, you might end up with several jobs already processed but not marked as such, and the other jobs genuinely not processed yet.

            1. 1

              Nope. I am fine with it. Just wondering if I was making a mistake somewhere.

              1. 1

                Looks all good! You can also add another condition, status='PENDING', when you set them to DONE, and then compare the number of affected rows. If there is a discrepancy, that indicates a bug: maybe jobs got changed by some other process, or even by manual operations.
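
                i.e. something along the lines of (continuing from the cursor and jobs_rows in your snippet above):

                job_ids = [row[0] for row in jobs_rows]
                cursor.execute(
                    "UPDATE jobs SET status='DONE' WHERE status='PENDING' AND id=ANY(%s)",
                    (job_ids,),
                )
                if cursor.rowcount != len(job_ids):
                    ...  # discrepancy: some rows were no longer PENDING; investigate before committing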