Transactionally Staged Job Drains in Postgres (brandur.org)
115 points by johns on Sept 20, 2017 | 31 comments



(Author here.)

I've taken fire before for suggesting that any job should go into a database, but when you're using this sort of pattern with an ACID-compliant store like Postgres it is so convenient. Jobs stay invisible until they're committed with other data and ready to be worked. Transactions that roll back discard jobs along with everything else. You avoid so many edge cases and gain so much in terms of correctness and reliability.

Worker contention while locking can cause a variety of bad operational problems for a job queue that's put directly in a database (for the likes of delayed_job, Que, and queue_classic). The idea of staging the jobs first is meant as a compromise: all the benefits of transactional isolation but with significantly less operational trouble, at the cost of only slightly delayed jobs as an enqueuer moves them out of the database and into a job queue.

I'd be curious to hear what people think.


> I've taken fire before for suggesting that any job should go into a database, but when you're using this sort of pattern with an ACID-compliant store like Postgres it is so convenient.

+1 to in-database queues that are implemented correctly. The sanity of transactional consistency of enqueuing alone is worth it. I've used similar patterns as a staging area for many years.

This also allows for transactionally consistent error handling as well. If a job is repeatedly failing you can transactionally remove it from the main queue and add it to a dead letter queue.


> This also allows for transactionally consistent error handling as well. If a job is repeatedly failing you can transactionally remove it from the main queue and add it to a dead letter queue.

Totally. This also leads to other operational tricks that you hope you never need, but are great the day you do. For example, a bad deploy queues a bunch of jobs with invalid arguments which will never succeed. You can open a transaction and go in and fix them in bulk using an `UPDATE` with jsonb select and manipulation operators. You can then even issue a `SELECT` to make sure that things look right before running `COMMIT`.

Again, something that you hope no one ever does in production, but a life saver in an emergency.


We do this and have great success with a small SMS/Email application doing ~2 million jobs a day (mostly over a 6-hour peak period).

Except, we use PostgreSQL's LISTEN/NOTIFY, which amazingly is also transaction aware (NOTIFY does not happen until the transaction is committed, and even more amazingly, sorta de-dupes itself!) to move the jobs from the database to the message queue.

This way we never lock the queue table. We run a simple Go program on the PostgreSQL server that LISTENs, queries, pushes to Redis and then deletes, with a .25 second delay so we group the IO instead of processing each row individually.

This also allowed us to create jobs via `INSERT ... SELECT`, which is awesome when you're creating 50k jobs at a time.
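
A rough sketch of that batching step, in Ruby to match the article's snippet. It is not the Go program described above: `collect_batch` and its `delay` parameter are hypothetical names, and an in-process `Queue` stands in for whatever a `LISTEN` connection would deliver. It assumes a single consumer.

```ruby
# Block until at least one notification arrives, wait briefly so that
# notifications from nearby commits can pile up, then drain everything
# that is pending and return it as one batch.
def collect_batch(notifications, delay: 0.25)
  batch = [notifications.pop]  # blocks until the first notification
  sleep(delay)                 # give neighbouring commits time to notify
  # Safe with a single consumer: nothing else can empty the queue
  # between the empty? check and the pop.
  batch << notifications.pop until notifications.empty?
  batch
end
```

The caller would then run one query and one delete per batch rather than per notification, which is where the grouped IO comes from.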


Perhaps I'm missing something here, but in your example:

> Sidekiq.enqueue(job.job_name, *job.job_args)

You're doing all your enqueueing in a transaction, so if any enqueue call fails (e.g. network error) you'll break the transaction, and requeue all of your jobs (even those that were successfully delivered).

Given that you're lobbing the job outside of the DB transaction boundary, why have that transaction at all? It's not clear to me why all the jobs should share the same fate.

If you want at-least once message delivery, can't you configure that in your queue? (For example, RabbitMQ supports both modes; only ACKing after the task completes, or ACKing as soon as the worker dequeues the message).

I'm not familiar with Sidekiq, so maybe that's not an option there. But in that case, it's still not clear why you'd requeue all the tasks if one of them fails to be enqueued (or the delete fails); you could just decline to delete the row for the individual job that failed.


> Given that you're lobbing the job outside of the DB transaction boundary, why have that transaction at all? It's not clear to me why all the jobs should share the same fate.

Good question. I should have probably put a comment to annotate that.

The transaction is there purely to wrap the `SELECT` statement, then later the `DELETE`. If it fails midway through, the process will restart, and you will get doubled up jobs inserted into your queue. This isn't very desirable, but it's better than the alternative, which is to `DELETE` the jobs too early and then lose jobs when your process dies.

> If you want at-least once message delivery, can't you configure that in your queue? (For example, RabbitMQ supports both modes; only ACKing after the task completes, or ACKing as soon as the worker dequeues the message).

Your queue will also be configured to do this (i.e. in the case of Sidekiq, it won't fully free a job until it's been confirmed to have succeeded or failed on a worker), but you basically need to have at-least-once delivery between any two systems. So the enqueuer will hand jobs off to a queue with this guarantee, and the queue will then hand them off to its own workers.

> I'm not familiar with Sidekiq, so maybe that's not an option there. But in that case, it's still not clear why you'd requeue all the tasks if one of them fails to be enqueued (or the delete fails); you could just decline to delete the row for the individual job that failed.

Yeah, keep in mind that this is a pretty naive code sample for purposes of simplicity. You can and should be more clever about it like you suggest if you're putting it in production. However, in practice, enqueuing a job is likely to have a very high success rate, so this simple worker will probably do the trick in most cases.
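
For illustration, one way the "more clever" version could look: a minimal sketch in which a failed enqueue only affects its own row. `StagedJob`, `drain_batch`, and the `enqueue` callable are hypothetical stand-ins, not Sidekiq's or the article's API.

```ruby
# Hypothetical in-memory shape of a row in the staging table.
StagedJob = Struct.new(:id, :job_name, :job_args)

# Try to hand every staged job to the queue and return the ids that
# were accepted. Jobs whose enqueue raised are skipped, so their rows
# stay in the staging table and are retried on the next pass.
def drain_batch(staged_jobs, enqueue)
  enqueued_ids = []
  staged_jobs.each do |job|
    begin
      enqueue.call(job.job_name, *job.job_args)
      enqueued_ids << job.id
    rescue StandardError => e
      warn "enqueue failed for staged job #{job.id}: #{e.message}"
    end
  end
  enqueued_ids
end
```

In the drain loop, the `DELETE` would then target only the returned ids (with Sequel, something like `StagedJobs.where(id: enqueued_ids).delete`) rather than the whole batch. Delivery stays at-least-once: a crash between the enqueue and the delete still produces duplicates.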


I do the same thing. Small projects start with the job queue in postgres.

As things eventually scale up, I move the queue to its own dedicated postgres node.

Once that starts to be too slow, I finally move to redis/kafka. 99% of things never make it to this stage.


Content aside, I've never seen a blog article as carefully typeset as this one:

- Font choices and sizes

- TOC

- Figures

- Code samples

... all look perfect. It even includes a carefully spaced initial.

I'd love to be able to replicate this on my Jekyll blog. But it looks like most of this is hand-crafted HTML/CSS: https://github.com/brandur/sorg


+1 - i wish it were made in Hugo (which is written in Go as well and 100x as fast as Jekyll).


Is static page generation performance really a thing for you?


here's the thing - i think the definition of fast is in two different dimensions and both of them count.

One is the generation: i actually run my startup's website on Hugo. There are quite a few pages - including landing pages - and Jekyll was sloooow.

Second, we have marketing people on Windows + Mac, dev people on Linux. The time taken for someone to set up Hugo on their laptops is 5 minutes: it's a single binary.

Jekyll - upwards of 30 minutes after fighting ruby and setting %PATH% variables in windows.


Faster page generation makes novel use cases like generating websites to be stored in cloud object storage with function calls actually possible and potentially much cheaper than a CMS like Wordpress. I'm sure there's some CMS POCs out there like that but the landscape is so chock full of plugins and really nontechnical users I can't really see it taking off.


It is really nice to be able to see your changes reflected "instantly" rather than having to "wait for a compile". Iteration is great!


Time to install is much faster, bundle install is just so damn slow.

And now that I'm not a ruby dev any more, I don't install a modern ruby on my computer.


I think it is great that PostgreSQL is strong enough to allow people to build robust queuing systems, but I still think that you are better off in the long run to use a real message queuing system like RabbitMQ to do this job.

Start out by running RabbitMQ on the same server as PostgreSQL but do limit its use of cores and RAM. Then when your business grows you can easily scale to a separate RabbitMQ server, to a cluster of MQ servers and to a distributed RabbitMQ service using clusters in multiple data centers with global queues synchronized using a RabbitMQ plugin.

The benefit of using RabbitMQ is that you begin to learn how message queuing fits into a system architecture and that you will not run into corner cases and weird behaviors as long as you heed the advice of moving to a dedicated RabbitMQ server when your usage gets large enough.

An additional benefit is that when you learn how to integrate functionality by using a message queue (actor model) rather than a link editor, you can avoid the monolithic big ball of mud problem entirely and easily integrate both monolithic functions and microservices in your app.

Background jobs are just one part of what a robust message queue gives you. In my opinion, the desire for background jobs is a design smell that indicates a flaw in your architecture which you can fix by adding a message queue system.


I appreciate the writeup!

One thing that I probably should have been clearer on: I used Sidekiq as a queue example here, but this pattern generalizes to anything. RabbitMQ is just as plausible.

> Background jobs are just one part of what a robust message queue gives you. In my opinion, the desire for background jobs is a design smell that indicates a flaw in your architecture which you can fix by adding a message queue system.

Possibly ... one thing to consider is that a fair number of us are writing various types of web services, and in web services there are so many obvious tasks that should be moved out of band to a background job. It's not even so much about distributing workload (although that's a nice aspect) as it is about moving expensive operations out-of-band so that a user's request finishes faster.

Here's a couple examples:

* Process an uploaded image into a variety of thumbnail sizes.

* Fire a webhook.

* Send a welcome email.

* Duplicate added/updated information to an internal microservice.

* Start a file download (like synchronizing a remote RSS feed).

In all these cases there's no reason for the request that initiated the action to wait on the job being finished. Moving the work to the background is purely a win from the user's perspective.


As much as I like queues, RabbitMQ has some downsides compared to a database.

First, you get zero visibility into what's in the queue. There's literally no way to peek inside a queue without taking messages from it. Let's say one of the fields of your messages is customer_id. There's no way to get a count of how many messages are waiting that are related to customer 123.

This leads to the next problem: if customer_id is something you want to partition by, you could create one queue per customer and then use a routing key to route the messages. But Rabbit queues are very rigid, as opposed to fluid. It's pretty inconvenient to move stuff between queues. So if you have one queue, and you want to split it into N queues, the only way is to drain the queue and republish each message back to the exchange. Rabbit provides no command line or management tools to do this, and neither does anyone else that I know of.

Lastly, Rabbit deletes acked messages. To get any visibility into the history of your processing -- or indeed play back old messages -- you have to build that into your topology/apps, e.g. by having an exchange that dupes all messages into a queue and then run a consumer that drains it into a database table or log file.

I much like the "log" approach to queueing, as popularized by Apache Kafka. However, Kafka has its issues, and sometimes a database table is better.

The pattern I rather like to use is to use Rabbit purely for queue orchestration. Make a task table, use NOTIFY to signal that a row has been added (with ID as payload), have a worker use LISTEN and stuff each task's ID into Rabbit. Then have consumers get the Rabbit message, read (and lock!) the corresponding task, perform the task, then mark the task as done. If you need to replay or retry failed tasks, just use SQL to emit NOTIFYs again.


Bonus: you can use pg_amqp[1] to publish to RabbitMQ with transactional semantics

1: https://github.com/omniti-labs/pg_amqp


A queue is good for relieving the burden of processing the correct items from the list (so you don't have to query the database to know what to process next).

Apart from that, I would say that you should still have a serious ACID DB like Postgres for state management.

Then push the unique identifier of the task to the queue, and let the actual data live on the database.

Queues are good for being queues, not for being data stores.

I have seen some horrific cases of people mistaking a queue software for a database.


I tend to agree with you but I haven't found any good way to put things in a queue from within Postgres - from a trigger for instance. Doing so would open up a lot of possibilities - do you have any suggestions, even if it's just for things to google?


I haven't personally used either of these, but they look interesting and I'm hoping to test them out at some point:

https://github.com/gmr/pgsql-listen-exchange

https://github.com/subzerocloud/pg-amqp-bridge

In theory, these let you use postgres NOTIFY to add messages to queues (which can be done from inside triggers).


    loop do
      DB.transaction do
        # pull jobs in large batches
        job_batch = StagedJobs.order('id').limit(1000)

        if job_batch.count > 0
          # insert each one into the real job queue
          job_batch.each do |job|
            Sidekiq.enqueue(job.job_name, *job.job_args)
          end

          # and in the same transaction remove these records
          StagedJobs.where('id <= ?', job_batch.last).delete
        end
      end
    end

Isn't this essentially a busy loop? You can achieve something much more performant by using `LISTEN` and `NOTIFY` to fire an event every time a row is inserted.

Then the enqueuer can do a preliminary scan of the table when it boots up and then just a `LISTEN` instead of polling the DB.


Note the code snippet is mostly meant as a demonstration rather than something that's really production-grade.

Using `LISTEN`/`NOTIFY` is certainly workable. It's worth considering though that once you've got non-trivial traffic, you're going to have new jobs showing up all the time, so the loop is likely going to be cycling endlessly no matter what mechanism you use to feed it.

At the very least though you'd want a sleep statement with exponential backoff so that it's not churning away on an empty database at the low end.
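
A minimal sketch of that backoff, assuming the drain step is wrapped in a callable that returns how many jobs it moved. `backoff_delay`, `run_enqueuer`, and the `base`/`max` defaults are illustrative choices, not values from the article.

```ruby
# Seconds to wait before the next poll. The delay doubles with each
# consecutive empty pass and is capped at `max`. After a pass that
# found jobs there is no delay at all.
def backoff_delay(empty_passes, base: 0.1, max: 5.0)
  return 0.0 if empty_passes.zero?
  [base * (2**(empty_passes - 1)), max].min
end

# Run the drain step `passes` times, sleeping between empty passes.
# `drain_once` is a hypothetical callable returning the number of jobs
# moved; `sleeper` is injectable so the loop can be tested without
# actually sleeping.
def run_enqueuer(drain_once, passes:, sleeper: method(:sleep))
  empty_passes = 0
  passes.times do
    moved = drain_once.call
    empty_passes = moved.zero? ? empty_passes + 1 : 0
    delay = backoff_delay(empty_passes)
    sleeper.call(delay) if delay > 0
  end
end
```

Under steady traffic the counter keeps resetting and the loop never sleeps, so the backoff only costs latency when the table has been empty for a while.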


Interesting.

I'm working on delivering a Postgres based job system right now; we cycle through states from an ENUM, landing eventually on a terminal state. Worker jobs (containers on a cluster) don't directly manipulate the state of the table, there's a controller system for that. Each controller in the (3-node) cluster has 2 connections to Postgres. Old jobs are DELETE'd when it's been "long ago enough".

Prior to addressing deadlocks from doing too much per transaction, initial load testing for this system suggested that the database was not the bounding factor in the system throughput, but rather worker throughput. Initial load is estimated to be under 500/day (*yawn*), but pushing the load to 100K/day didn't alter the outcome, although it made the cluster admin mildly annoyed.

One key reason I prefer to have the state machine switching / enum approach is that it's logically obvious. At a certain point, I am sure it'd have to change. I'm not sure how many concurrent mutations to separate rows a Postgres table can tolerate, but that serves as a hard upper bound.

Author: what kind of volume do you tolerate with this kind of design?


This pattern would basically be a clean migration away from a pure Postgres queue if either table bloat or locking becomes a performance problem. You maintain the benefits of transactional job enqueueing while only slightly worsening edge cases that could cause jobs to be run multiple times.

Just be sure to run your enqueueing process as a singleton, or each worker would be redundantly enqueueing lots of jobs. This can be guarded with a session advisory lock or a redis lock.

Knowing that this easy transition exists makes me even more confident in just using Que and not adding another service dependency (like Redis) until it’s really needed.


> Just be sure to run your enqueueing process as a singleton, or each worker would be redundantly enqueueing lots of jobs. This can be guarded with a session advisory lock or a redis lock.

If you're using PostgreSQL 9.5+ you can also use the new SKIP LOCKED capability, which is perfect for this sort of usage: https://blog.2ndquadrant.com/what-is-select-skip-locked-for-...


> Just be sure to run your enqueueing process as a singleton, or each worker would be redundantly enqueueing lots of jobs. This can be guarded with a session advisory lock or a redis lock.

Yeah, I'd forgotten to mention that in the article, but yep, something should be guaranteeing that only one enqueuer is running at a time.

> Knowing that this easy transition exists makes me even more confident in just using Que and not adding another service dependency (like Redis) until it’s really needed.

Yeah, so many stacks have a Redis for something these days anyway that you can probably start with it in a lot of cases, but if you're running really lean that seems like a good strategy.


> by selecting primed jobs in bulk and feeding them into another store like Redis

Doesn't this just mean a bunch of lost jobs when Redis fails?

Why not keep jobs, with job state (wait, done, etc.), in the reliable ACID store?


> Doesn't this just mean a bunch of lost jobs when Redis fails?

You need to be a little careful about configuration, but believe it or not, as long as Redis is configured with an append-only file [1], any jobs that were in there when it crashed are recovered the next time it starts up. How close that gets to Postgres-style durability depends on the `appendfsync` setting: `always` syncs on every write, while the default `everysec` can lose roughly the last second of writes.

From the docs:

> From now on, every time Redis receives a command that changes the dataset (e.g. SET) it will append it to the AOF. When you restart Redis it will re-play the AOF to rebuild the state.

---

[1] https://redis.io/topics/persistence


this is so awesome. for a small team building infrastructure on the cheap, building background jobs on postgres is so much nicer than using more complex tools like rabbitmq, etc.

are you planning on productizing this?


Sequence allocations occur globally and outside your transaction.

    StagedJobs.where('id <= ?', job_batch.last).delete

This will end up deleting a job that was never enqueued. A transaction reserves a job id, your enqueuer then kicks off and fetches the committed jobs, and the transaction commits its row to the `staged_jobs` table just in time for the enqueuer's range delete to remove it without ever queueing it.

You need to delete the specifically queued ids and not a numeric range.
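
The race can be walked through with an in-memory stand-in for the table. This is a toy sketch, not the article's code: `simulate`, the hash-backed table, and the fixed ids are made up for illustration. It assumes the enqueuer runs at Postgres's default READ COMMITTED isolation, where the `DELETE` can see rows committed after the earlier `SELECT` ran.

```ruby
# Replays one interleaving: ids 1 and 3 are committed, id 2 was taken
# from the sequence by a transaction that commits late.
def simulate(delete_strategy)
  table = { 1 => "job-1", 3 => "job-3" }  # committed rows: id => job name
  batch_ids = table.keys.sort             # what the enqueuer's SELECT sees
  enqueued = batch_ids.dup                # both visible jobs go to the queue

  table[2] = "job-2"                      # the slow transaction commits now

  case delete_strategy
  when :range  # mirrors `id <= last id in the batch`
    table.delete_if { |id, _| id <= batch_ids.last }
  when :ids    # deletes only the rows that were actually enqueued
    table.delete_if { |id, _| batch_ids.include?(id) }
  end

  { enqueued: enqueued, remaining: table.keys.sort }
end
```

With `:range`, job 2 disappears from the table without ever appearing in `enqueued`. With `:ids`, it is still in the table for the next pass.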



