> Guaranteed "exactly once" delivery of messages to a consumer within a visibility timeout
That's not going to be true. It might be true when things are running well, but when it fails, it'll either be at most once or at least once. You don't build for the steady state, you build against the failure mode. That's an important deciding factor in which system you choose: either you can accept duplicates gracefully or you can accept some amount of data loss.
Without reviewing all of the code, it's not possible to say what this actually is, but since it seems like it's up to the implementor to set up replication, I suspect this is an at-most-once queue (if the client receives a response before the server has replicated the data and the server is destroyed, the data is lost). But depending on the diligence of the developer, it could be that this provides no real guarantees (0-N deliveries).
It will be true because of the "within a visibility timeout" right? Of course that makes the claim way less interesting.
I took a peek at the code and it looks like their visibility timeout is pretty much a lock on a message. So it's not exactly once for any meaningful definition, but it does prevent the same message from being consumed multiple times within the visibility timeout.
> it does prevent the same message from being consumed multiple times within the visibility timeout.
... When there is no failure of the underlying system. The expectation of any queue is that messages are only delivered once. But that's not what's interesting: what matters is what happens when there's a system failure: either the message gets delivered more than once, the message gets delivered zero times, or a little of column A and a little of column B (which is the worst of both worlds and is a bug). If you have one queue node, it can fail and lose your data. If you have multiple queue nodes, you can have a network partition. In all cases, it's possible at some point to not know whether the message was processed or not.
The Two Generals Problem is a great thought experiment that describes how distributed consensus is impossible when communication between nodes has the possibility of failing.
Distributed consensus within a quorum is possible. With only two nodes there is no quorum. With three nodes and at most one offline, consensus is possible.
If the message never reaches the queue (network error, database is down, app is down, etc), then yes that is a 0 delivery scenario. Once the message reaches the queue though, it is guaranteed that only a single consumer can read the message for the duration of the visibility timeout. FOR UPDATE guarantees only a single consumer can read the record, and the visibility timeout means we don't have to hold a lock. After that visibility timeout expires, it is an at-least-once scenario. Any suggestion for how we could change the verbiage on the readme to make that more clear?
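Roughly, the read path looks like this (a simplified sketch of the pattern, not the exact SQL inside the extension; the table and column names here are illustrative):

```sql
-- A read with a visibility timeout, in spirit: claim one visible message,
-- push its visibility timeout into the future, and return it.
WITH next_msg AS (
    SELECT msg_id
    FROM my_queue
    WHERE vt <= clock_timestamp()      -- only messages whose timeout has expired
    ORDER BY msg_id
    LIMIT 1
    FOR UPDATE SKIP LOCKED             -- only one consumer can claim this row
)
UPDATE my_queue q
SET vt = clock_timestamp() + interval '30 seconds',
    read_ct = read_ct + 1
FROM next_msg
WHERE q.msg_id = next_msg.msg_id
RETURNING q.msg_id, q.read_ct, q.message;
```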
You are talking about distributed systems; nobody expects to read "exactly once" delivery. If I read that in the docs, I consider it a huge red flag.
And the fact is that what you describe is a performance optimization: I still have to write my code so that it is idempotent, so that optimization does not affect me in any other way, because exactly once is not a thing.
All of this to say, I'm not even sure it's worth mentioning?
You should let the Apache Flink team know, they mention exactly-once processing on their home page (under "correctness guarantees") and in their list of features.
Exactly once delivery is not the same as exactly once processing.
Exactly once delivery is an impossibility; see the two generals problem. Exactly once processing is usually at-least-once delivery coupled with deduplication or an otherwise idempotent action.
You CAN have "at most once" processing, since there is always the option of processing zero messages due to systems being down, but it's a joint effort; nobody is able to offer it "for free" (as in, no effort to implement it).
> Once the message reaches the queue though, it is guaranteed that only a single consumer can read the message for the duration of the visibility timeout.
But does the message get persisted and replicated before any consumer gets the message (and after the submission of the message is acked)? If it's a single node, the answer is simply "no": the hard drive can melt before anyone reads the message and the data is lost. It's not "exactly once" if nobody gets the message.
And if the message is persisted and replicated, but there's subsequently a network partition, do multiple consumers get to read the message? What happens if writing the confirmation from the consumer fails, does the visibility timeout still expire?
> After that visibility timeout expires, it is an at-least-once scenario.
That's not really what "at least once" refers to. That's normal operation, and sets the normal expectations of how the system should work under normal conditions. What matters is what happens under abnormal conditions.
> FOR UPDATE guarantees only a single consumer can read the record, and the visibility timeout means we don't have to hold a lock
This is not always true!
Postgres does snapshot isolation within transactions, and the snapshot begins on the first statement within the transaction. This means that some isolation levels like REPEATABLE READ can feel misleadingly “safer” but actually break your guarantee. From the docs:
> Note also that if one is relying on explicit locking to prevent concurrent changes, one should either use Read Committed mode, or in Repeatable Read mode be careful to obtain locks before performing queries. A lock obtained by a repeatable read transaction guarantees that no other transactions modifying the table are still running, but if the snapshot seen by the transaction predates obtaining the lock, it might predate some now-committed changes in the table. A repeatable read transaction's snapshot is actually frozen at the start of its first query or data-modification command (SELECT, INSERT, UPDATE, DELETE, or MERGE), so it is possible to obtain locks explicitly before the snapshot is frozen.
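Concretely, the pitfall looks something like this (a sketch; `queue` and `processed` are illustrative names):

```sql
-- Broken: the snapshot is frozen by the first query, before the lock is taken.
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT count(*) FROM queue;                    -- snapshot frozen here
LOCK TABLE queue IN SHARE MODE;                -- waits for in-flight writers to finish...
SELECT * FROM queue WHERE processed = false;   -- ...but still cannot see what they committed
COMMIT;

-- Safer: take the lock before the snapshot is frozen.
BEGIN ISOLATION LEVEL REPEATABLE READ;
LOCK TABLE queue IN SHARE MODE;                -- lock first
SELECT * FROM queue WHERE processed = false;   -- snapshot is taken now, after the lock
COMMIT;
```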
Well, that's only a problem with locks if you actually need locks to satisfy your guarantee and you acquire the lock after your first read. That said, repeatable reads are never a good guarantee on their own without locking.
As u/Fire-Dragon-DoL says, you can have an exactly-once guarantee within this small system, but you can't extend it beyond that. And even then, I don't think you can have it. If the DB is full, you can't get new messages and they might get dropped, and if the threads picking up events get stuck or die then messages might go unprocessed and once again the guarantee is violated. And if processing an event requires having external side-effects that cannot be rolled back then you're violating the at-most-once part of the exactly-once guarantee.
> you can have an exactly-once guarantee within this small system
It's actually harder to do this in a small system. I submit a message to the queue, and it's saved and acknowledged. Then the hard drive fails before the message is requested by a consumer. It's gone. Zero deliveries, or "at most once".
> If the DB is full, you can't get new messages and they might get dropped
In this case, I'd expect the producer to receive a failure, so technically there's nothing to deliver.
> if the threads picking up events get stuck or die
While this obviously affects delivery, this is a concurrency bug and not a fundamental design choice. The failures folks usually refer to in this context are ones that are outside of your control, like hardware failures or power outages.
> That's not going to be true. It might be true when things are running well, but when it fails, it'll either be at most once or at least once.
Silly question as somebody not very deep in the details on this.
It's not easy to make distributed systems idempotent across the board (POST vs PUT, etc.)
Distributed rollbacks are also hard once you reach interacting with 3rd party APIs, databases, cache, etc.
What is the trick in your "on message received handler" from the queue to achieve "exactly once"? Some kind of "message hash ID" and then you check in Redis if it has already been processed either fully successfully, partially, or with failures? That has drawbacks/problems too, no? Is it an impossible problem?
> What is the trick in your "on message received handler" from the queue to achieve "exactly once"?
There's no trick. The problem isn't "how to build a robust enough system" it's "how can you possibly know whether the message was successfully processed or not". If you have one node that stores data for the queue, loss of that node means the data is gone. You don't know you don't have it. If you have multiple nodes running the queue and they stop talking to each other, how do you know whether one of the other nodes already allowed someone to process a particular message (e.g., in the face of a network partition), or didn't let someone else process the message (e.g., that node experienced hardware failure).
At some point, you find yourself staring down the Byzantine generals problem. You can make systems pretty robust (and some folks have made extremely robust systems), but there's not really a completely watertight solution. And in most cases, the cost of watertightness simply isn't worth it compared to just making your system idempotent or accepting some reasonably small amount of data loss.
You don’t achieve exactly once at the protocol/queue level, but at the service/consumer level. This is why SQS guarantees at-least-once.
It’s generally what you described, but the term I’ve seen is “nonce” which is essentially an ID on the message that’s unique, and you can check against an in-memory data store or similar to see if you’ve already processed the message, and simply return / stop processing the message/job if so.
And just to add a small clarification since I had to double take: this isn't exactly-once delivery (which isn't possible), this is exactly-once processing. But even exactly-once processing generally has issues, so it's better to assume at least once processing as the thing to design for and try to make everything within your processing ~idempotent.
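A minimal sketch of that dedup check in plain SQL (table and column names are made up for illustration); doing the nonce insert and the work in the same transaction is what makes the processing effectively once:

```sql
-- One row per message we have ever processed; the primary key is the nonce.
CREATE TABLE IF NOT EXISTS processed_messages (
    nonce        text PRIMARY KEY,
    processed_at timestamptz NOT NULL DEFAULT now()
);

-- In the consumer, claim the nonce and do the work in the same transaction.
BEGIN;
INSERT INTO processed_messages (nonce)
VALUES ('msg-abc-123')
ON CONFLICT (nonce) DO NOTHING;
-- If the INSERT affected 0 rows, the message was already handled: skip the work.
-- Otherwise perform the side effects here, then:
COMMIT;
```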
I'm curious about the same thing. I use a dead-letter queue in SQS and even SNS, and here I can see an archiver, but it's not clear if I need to roll out my own dead-letter behaviour here. Not sure I would be too confident in it either.
A nice novel project here but I'm a bit skeptical of its application for me at least.
pgmq.archive() gives you an API to retain messages from your queue; it's an alternative to pgmq.delete(). For me as a long-time Redis user, message retention was always important and was always extra work to implement.
DLQ isn't a built-in feature of PGMQ yet. We run PGMQ for our SaaS at Tembo.io, and the way we implement the DLQ is by checking the message's read_ct value. When it exceeds some value, we send the message to another queue rather than processing it. Successfully processed messages end up getting pgmq.archive()'d.
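In SQL it's roughly this (a sketch; the queue names, the threshold, and the exact pgmq function signatures here are illustrative and may differ by version):

```sql
-- Read a batch: queue name, visibility timeout in seconds, batch size.
SELECT msg_id, read_ct, message
FROM pgmq.read('tasks', 30, 10);

-- For any message whose read_ct has hit the limit, the worker dead-letters it
-- instead of processing it again (msg_id 42 and the payload come from the row above):
SELECT pgmq.send('tasks_dlq', '{"original_msg_id": 42}'::jsonb);
SELECT pgmq.archive('tasks', 42);
```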
To be honest, I like the at-least-once constraint. It causes you to think of background jobs in an idempotent way and makes for better designs. So ultimately I never found the removal of it a mitzvah.
Also, if you don't have the constraint, and say it works but you need to change systems for any reason, now you've got to rewrite your workers.
I agree. But it can be useful to have a guarantee, even for a specified period of time, that the message will only be seen once. For example, if the processing of that message is very expensive, such as if that message results in API requests to a very expensive SaaS service. It may be idempotent to process that message more times than necessary, but doing so may be cost prohibitive if you are billed per request. I think this is a case where using the VT to help you only process that message one time could help out quite a bit.
Agreed that this property is useful for efficiency. The danger comes from users believing this property is a guarantee and making correctness decisions based on it. There is no such thing as a distributed lock or exclusive hold.
This is more or less where my brain was heading. The utility of an exactly-once system is quite high. But coding with the knowledge that it is at least once, hopefully, and that if not we'll do a catch-up, means that I will write my code knowing that my work must be safe in an environment with these constraints.
The best argument I have for a Postgres background worker queue is simplicity. You're already managing a DB, so why not have the DB simplify your startup's infrastructure? Honestly, with Heroku, Sidekiq, and Redis it isn't even simplifying. But for startups simplicity is useful, so they can scale when they need to and avoid complexity when possible. But that's the only argument I'd make.
Quite. The problem with a visibility timeout is that there can be delays in handling a message, and if the original winner fails to do it in time then some other process/thread will, yes, but if processing has external side-effects that can't be undone then the at-most-once guarantee will definitely be violated. And if you run out of storage space and messages have to get dropped then you'll definitely violate the at-least-once guarantee.
One big advantage of using a queue inside the DB is that you can actually use queue operations in the same transaction as your data operations. It makes everything so much simpler when it comes to failure modes.
IMO 90% of software which uses external queues is buggy when it comes to edge cases.
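For example (a sketch; the `orders` table is made up, and I'm assuming a pgmq-style send function):

```sql
BEGIN;

-- The data change and the event that announces it commit or roll back together.
WITH new_order AS (
    INSERT INTO orders (customer_id, total)
    VALUES (123, 49.99)
    RETURNING id
)
SELECT pgmq.send('order_created',
                 jsonb_build_object('order_id', id))
FROM new_order;

COMMIT;
-- If anything above fails, neither the row nor the message exists, so there is
-- no "row without event" or "event without row" failure mode to handle.
```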
A fair point. If you do need an external queue for any reason (legacy/already have one, advanced routing semantics, integrations for external stream processors, etc.) the "Transactional Outbox" pattern provides a way to have your cake and eat it too here--but only for produce operations.
In this pattern, publishers write to an RDBMS table on publish, and then best-effort publish to the message broker after RDBMS transaction commit, deleting the row on publish success (optionally doing this in a failure-swallowing/background-threaded way). An external scheduled job polls the "publishes" table and republishes any rows that failed to make it to the message broker later on. When coupled with inbound message deduplication (a feature many message brokers now support to some degree) and/or consumer idempotency, this is a pretty robust way to reduce the reliability hit of an external message broker being in your transaction processing path.
It's not a panacea, in that it doesn't help with transactional processing/consumption and imposes some extra DB load, but is fairly easy to adopt in an ad-hoc/don't-have-to-rewrite-the-whole-app way.
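A rough sketch of the produce side (table and column names are illustrative):

```sql
-- The outbox lives in the same database as the business data.
CREATE TABLE outbox (
    id         bigserial   PRIMARY KEY,
    topic      text        NOT NULL,
    payload    jsonb       NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now()
);

-- Publish = write the business row and the outbox row in one transaction,
-- then best-effort push to the broker after commit.
BEGIN;
INSERT INTO orders (customer_id, total) VALUES (123, 49.99);
INSERT INTO outbox (topic, payload)
VALUES ('order_created', '{"customer_id": 123}'::jsonb);
COMMIT;

-- The background republisher claims stragglers without blocking other pollers,
-- publishes them to the broker, and deletes them on success.
BEGIN;
SELECT id, topic, payload
FROM outbox
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED;
-- ...publish the returned rows to the message broker...
DELETE FROM outbox WHERE id IN (1, 2, 3);   -- the ids that were published successfully
COMMIT;
```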
Sure, not having to spin up a separate server is nice, but this aspect is underappreciated, imo. We eliminated a whole class of errors and edge cases at my day job just by switching the event enqueue to the same DB transaction as the things that triggered them. It does create a bottleneck at the DB, but as others have commented, you probably aren't going to need the scalability as much as you think you do.
The advantage of using your DB as a queue is that a traditional DB is easier to interact with (for example using SQL queries to view or edit the state of your queue).
In most business applications the message payload is a "job_id" pointing to a DB table so you always need and have to go back to the database to do something useful anyway. With this setup it's one less thing to worry about and you can take full advantage of SQL and traditional database features.
The only downside and bottleneck of having your DB act as a queue is if the worker processes are hitting the DB too frequently to reserve their next job.
Most applications will not reach the level of scale for that to be a problem.
However, if it does become a problem, there is an elegant solution where you can continuously populate a real queue by querying the DB and putting items in it (a "feeder process"). Now you can let the workers reserve their jobs from the real queue so the DB is not being hit as frequently.
The workers will still interact with the DB as part of doing their work of course. However they will not ask their "give me my next job ID" question to the DB. They get it from the real queue which is more efficient for that kind of QPOP operation.
This solution has the best of both worlds: you get the best features of something like Postgres as the storage backend for your jobs without the downside of hammering the DB to get the next available job (but in general the DB alone can scale quite well for 95% of the businesses out there).
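The feeder's claim query can be as simple as this (a sketch; the `jobs` table and status values are illustrative):

```sql
-- Claim the next batch of pending jobs without blocking other feeders,
-- then hand the returned ids to the real queue.
UPDATE jobs
SET status = 'queued'
WHERE id IN (
    SELECT id
    FROM jobs
    WHERE status = 'pending'
    ORDER BY id
    LIMIT 100
    FOR UPDATE SKIP LOCKED
)
RETURNING id;
```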
Simplicity is one of the reasons we started this project. IMO, there's far less maintenance overhead to running Postgres compared to RabbitMQ, especially if you are already running Postgres in your application stack. If PGMQ fits your requirements, then you do not need to introduce a new technology.
There are definitely use cases where PGMQ won't compare to RabbitMQ or Kafka, though.
PGMQ doesn't give you a way to deliver the same message to concurrent consumers the same way that you can with Kafka via consumer groups.
To get this with PGMQ, you'd need to do something like creating multiple queues, then sending messages to all the queues within a transaction, e.g. `begin; pgmq.send('queue_a'...); pgmq.send('queue_b'...); commit;`
PGMQ doesn't seem to have functionality for returning a payload to the task submitter.
For example, let's say the task is something like:
Run a SELECT query on database FOO, returning the results
There would be workarounds (ie store the results in a "results" table or similar) but just being able to directly return the result in a message to the caller is conceptually simpler.
Yeah, I've written a few of these and should probably release a package at some point but each version has been somewhat domain specific.
The last time we measured, we saw an immediate 99% performance improvement over SNS+SQS. It was so dramatic that we were able to reduce job resources simply due to the queue implementation change.
There's a lot of useful and almost trivial features you can throw in as well. SQS hasn't changed much in a long time.
This seems like a lot of fluff for basically SELECT ... FROM queue FOR UPDATE SKIP LOCKED? Why is the extension needed when all it does is run some management-type SQL?
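i.e. something along these lines (a sketch; note the lock is held for the duration of processing, which is exactly what the visibility-timeout approach avoids):

```sql
BEGIN;
-- Claim one unprocessed row; concurrent workers skip it instead of blocking.
SELECT id, payload
FROM queue
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- ...do the work while the row lock is held...
DELETE FROM queue WHERE id = 1;   -- the id returned above
COMMIT;
```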
Perhaps we were all just not good at database'ing, but at a previous job, "using RDBMS as a queue" became a meme/shorthand for "terrible idea that needs to be stamped out immediately".
Does Postgres have some features that make it not entirely unsuitable to use for queuing?
I think this is one of the cases where "You don't have Google problems, so you don't need Google solutions" applies. Using Postgres or a RDBMS for queuing is perfectly fine and it'll get you a long way before you have to worry about scaling or optimizing it.
The benefits are easy to see: you already know how to operate a database, you can easily see what's in the queue, you can easily insert items into the queue with a simple "insert" query, you can use triggers to enqueue items that got changed, etc.
In the end, a queue is relatively simple to switch out later.
I think it would be tough to compare. There are client libraries for several languages, but the project is mostly a SQL API to the queue operations like send, read, archive, delete using the same semantics as SQS/RSMQ.
Any language that can connect to Postgres can use PGMQ, whereas it seems River is Go only?
I’m not sure what the benefits are for a microservice architecture. Do you expect other services/domains to connect to your database to listen for events? How does it scale if you have several microservices that need to publish events?
Or do you expect a dedicated database to be maintained for this queue? It's worth comparing it with other queue systems that persist messages and can help you scale message processing, like Kafka with topic partitions.
Who said anything about microservice architecture?
If you're building a new MVP that needs background processing, you have a queue, and your monolith listens to the queue. Sticking to one database during the MVP keeps things simple until you validate both the product and expected scale. You can migrate to something heavier later if the product gets traction.
We talk a little bit in https://tembo.io/blog/managed-postgres-rust about how we use PGMQ to run our SaaS at Tembo.io. We could have run a Redis instance and used RSMQ, but it simplified our architecture to stick with Postgres rather than bringing in Redis.
As for scaling - normal Postgres scaling rules apply. max_connections will determine how many concurrent applications can connect. The queue workload (many insert, read, update, delete) is very OLTP-like IMO, and Postgres handles that very well. We wrote some about dealing with bloat in this blog: https://tembo.io/blog/optimizing-postgres-auto-vacuum
IMO, it is most valuable when you are looking for ways of reducing complexity. For a lot of projects, if you're already running Postgres then it is maybe not worth the added complexity of bringing in another technology.
I agree in general, but there will always be certain requirements and team structures where stuff like this makes sense.
For me, I work in a small team of 6 devs on an ever-growing app and feature set. I 100% will leverage managed services where cost and complexity allow. SQS is one of the most stable and cheapest AWS services, and the ability to just use it and not have to sysops it means we can spend more time building features.
Indeed. I’ve relied heavily on SQS for years and never regretted it. I question the comparison to SQS for this add on — it’s not really in the same ballpark.
When you're building an MVP, you want to keep the architecture as utterly simple as possible. SQS helps at scale, but your product may never reach scale. In the meantime, you're vendor-locked into AWS, with all its attendant needs like setting up auth to AWS, account vending, etc. If your MVP already needs stuff like object storage, and you're setting up AWS anyway, then sure, prefer SQS. But a lot of MVPs are better set up as monolith + Postgres to start with, only complicating the architecture after product traction is found.
For the longest time the common advice was that using a database as a message queue/broker was a bad idea. Now, everyone seems keen to use a DB for this instead of tools dedicated to this purpose. Why?
The circle of life. We used to track jobs in a db, but that would eventually hit performance issues (locks, noisy neighbors, etc). So more nosql solutions and services showed up. New devs gobbled up the concepts and APIs but were frustrated that distributed nosql solutions were SaaSified and wanted to bring control back in house, in a single db, and we are back to square one. But with better tooling and hardware in theory. Hopefully you have a dedicated db at least to avoid noisy neighbor workloads but you may still have task queue work that hits scaling limits on the single db causing tasks to interfere with one another--so you can shard the db and isolate task streams, and the circle continues
A client is supposed to poll the queue for new items (i.e. issue pop requests in a loop), or is there some better event-oriented approach for this (via pg_notify?)
pop() or read() in a loop, yes. You can read one message or many messages at a time.
What we do at Tembo in our infrastructure is pause for up to a few seconds if a read() returns no messages. When there are messages, we read() with no pause in between. When the queues are empty, it amounts to less than one query per second. There is not much cost to reading frequently if you use a client-side connection pool, or a server-side pool like PgBouncer.
I've thought about this too. But I can't even tell what the good default would be. At low load events seem nicer, but at high load polling seems necessary?
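The event-driven variant would look roughly like this (a sketch; the trigger targets a hypothetical queue table, not PGMQ's actual internals):

```sql
-- Fire a notification whenever a message is enqueued.
CREATE OR REPLACE FUNCTION notify_new_message() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('queue_events', NEW.msg_id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER queue_insert_notify
AFTER INSERT ON my_queue
FOR EACH ROW EXECUTE FUNCTION notify_new_message();

-- Consumers then wait for wakeups instead of polling:
LISTEN queue_events;
```

A common middle ground is to treat the notification only as a wakeup and still read() in a loop afterwards, which degrades gracefully to plain polling under high load.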
That's normal, yes. Name a queuing system, and with very few exceptions it will have clients for a variety of languages.
It is also normal to exchange messages with content as JSON, protobuf, or a similar format, which again can be processed by any language that aims to be widely used.
In fact, are there any queues that aren't language-agnostic? I had the idea that ZeroMQ was a C/C++ only thing, but I checked the docs and it's got the usual plethora of language bindings https://zeromq.org/get-started/
So right now I can't name any queue systems that are single language. They're all aimed at interop.
Most queuing systems that I have used, you can send text or bytes, so the most common denominator is DTOs encoded as json, encoded as UTF-8 text.
You could do likewise, (i.e. send text containing a representation of what you want) and this would allow other languages to consume? But that's an edge case (sending text) of an edge case (RQ).
RQ never publishes a real protocol for sending and receiving jobs. Also, the `data` field could be anything because it's pickled, so you could send a whole Python object there.
See [1] for efforts to make RQ cross language compatible
> RQ never publishes a real protocol for sending and receiving jobs.
Then, RQ sounds substandard.
> Also the `data` field could be anything
"the sender could send anything" is true in theory, but in practice it is both useless and false. If you want to make it work, sender and receiver need to agree on a lot.
JSON could take any form too, but the sender and receiver typically have to agree, implicitly or explicitly, on many of the details of the message. Like what fields are needed, what the allowable values are, etc.
Interesting. I've been looking for a much simpler system than a Celery queue to publish jobs from Golang and consume jobs from the Python side (AI/ML stuff). This thread gave a lot of names and links for further investigation.
As mentioned, there are a plethora of queuing systems that have cross platform clients.
If you’re interested specifically in a background job system, you may want to check out Faktory. It’s Mike Perham’s language-agnostic follow-up to Sidekiq.
I'm yet to find a use case for "WITH TIME ZONE"; in all cases it's better to use "WITHOUT TIME ZONE". All it does is display the date in the SQL client's local timezone, which should never matter for a well-designed service. Would be glad to learn otherwise.
Yes, it is quite confusing and I dread to think how many have got it wrong and store local times like the GP.
But it's also not as simple as "always use WITH TIME ZONE". That also leads to a mistake.
The reason is (just to reiterate what you said) the TIMESTAMP WITH TIME ZONE does not store the time zone! If you ever want to get local time back (e.g. ask a question like "how many users log on before lunch time") then you need to store either local time in a TIMESTAMP WITHOUT TIME ZONE field, or the time zone, and get local time like: SELECT recorded_at AT TIME ZONE time_zone AS local_time ... (I prefer the latter).
Look carefully. The SQL client does not just "display it in local time", it displays it with a UTC offset. You can be sure whenever you see a UTC offset that UTC is fully recoverable. In this way the TIMESTAMP WITH TIME ZONE field is context independent. It's just UTC.
Conversely, if there is no offset, time zone, or something to distinguish it as UTC (like the Z in ISO8601) then you are just storing "local time", that is the time on the clock in someone's kitchen, somewhere. This is the TIMESTAMP WITHOUT TIME ZONE field and is highly context dependent (in particular, what clock was used?)
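In practice that looks something like this (a sketch; table and column names are illustrative):

```sql
CREATE TABLE logins (
    user_id     bigint      NOT NULL,
    recorded_at timestamptz NOT NULL DEFAULT now(),  -- an unambiguous instant (stored as UTC)
    time_zone   text        NOT NULL                 -- e.g. 'Europe/Berlin', captured at write time
);

-- "How many users log on before lunch time", in each user's local time:
SELECT count(*)
FROM logins
WHERE (recorded_at AT TIME ZONE time_zone)::time < '12:00';
```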
Note there are a number of background job processors for specific languages/frameworks that use PostgreSQL as the broker, for example GoodJob and the upcoming SolidQueue in Ruby and Rails.
This is neat. Would be cool if there was support for a dead letter or retry queue. The idea of deleting an event transactionally with the result of processing said event is pretty nice.
This is exactly how we do it in our SaaS at Tembo.io. We check read_ct, and move the message if >= N. I think it would be awesome if this were a built-in feature though.
On the contrary, creating new HTTP connections introduces an irreducible source of latency compared to establishing and reusing a persistent connection.
You may end up building a single-tenant architecture where each tenant gets its own database and there are relatively few consumers, which can respond more quickly because they stick with a long-lived connection model.
If you look at other HTTP-based databases, like DynamoDB or S3, the latency involved in setting up new connections is a downside of those databases (not arguing that it's never worth it, architectural decisions are all trade-offs, but that is a trade-off).
Trying to build on persistent HTTP connections that don't close is a recipe for frustration when scaling horizontally, which is something you plan to do, since the reason why not to go with Postgres is so you can have more than one instance, right?
You can't juggle the connection between different servers. If a server drops out (because it's being scaled in), then you lose the connection, and you have to establish a new one, which introduces a hiccup/latency. No free lunches.
People considering this project should also probably consider Graphile Worker [1]. I've scaled Graphile Worker to 10m daily jobs just fine.
The behavior of this library is a bit different and in some ways a bit lower level. If you are using something like this, expect to get very intimate with it as you scale: a lot of times your custom workload would really benefit from a custom index, and it's handy to understand how the underlying system works.
I have also used/introduced this at several places I've worked and it's been great each time. My only qualm is that it's not particularly easy to modify the exponential back-off timing without hacky solutions. Have you ever found a good way to do that?