Adventures in message queues (antirez.com)
350 points by fcambus on March 15, 2015 | 99 comments



In my experience there are three things that will break here:

1) At-most-once is a bridge to an elementary school which has an inter-dimensional connection to a universe filled with pit vipers. Kids will die, and there is nothing you can do to stop it.

2) Messages are removed when acknowledged or memory pressure forces them to be kicked out. Black Perl messages, those that sail in from out of nowhere, and lonely widows (processes that never find out their loved ones are dead) will result.

3) Messages are ordered using wall clock millisecond time. This will leave your messages struggling to find their place in line, and messages that should be dead will not be dead (the missing fragment problem).

Obviously all these are simply probabilistic trade-offs based on most likely scenarios which result in arbitrarily small windows of vulnerability. No window is small enough at scale over time.

Often when these things have bitten me it has been non-programming stuff. For example: a clock that wouldn't follow NTP because it was too far ahead of what NTP thought the time was, which an operator fixed by turning time back 8 seconds. A client library that was told messages arrive at most one time, and so made a file deletion call on the arrival of a message; a restored node holding that message managed to shoot it out before the operator could tell it that it was coming back from a crash, and poof, damaged file. And one of my favorites in ordering: a system that rebooted after an initial crash (resetting its sequence count) and got messages back into flight with the wrong, but still legitimate-looking, sequence values. FWIW, these sorts of things are especially challenging for distributed storage systems because files are, at their most abstract, little finite state machines that walk through a very specific sequence of mutations whose order is critical for correct operation.

My advice for folks building such systems is: never depend on the 'time', always assume at-least-once, and build in-band error detection and correction so you can compute the correct result from message stream 'n' even when two or more invariants in your message protocol have been violated.
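
A rough sketch of what that in-band detection can look like on the consumer side (Python; the message fields and the process()/report_gap() hooks are made up for illustration):

    seen = {}   # producer id -> set of sequence numbers already processed
    high = {}   # producer id -> highest sequence number seen so far

    def handle(msg):
        producer, seq = msg["producer"], msg["seq"]
        done = seen.setdefault(producer, set())
        if seq in done:
            return                 # duplicate redelivery: the work is already done
        if seq > high.get(producer, 0) + 1:
            # A gap means a message is late or lost: ask for it again / alert.
            report_gap(producer, high.get(producer, 0) + 1, seq)
        process(msg["payload"])    # the real work, written to be idempotent
        done.add(seq)
        high[producer] = max(high.get(producer, 0), seq)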

Good luck!


Hello, 1 and 2 are not going to be problems for sure, because:

1) At-most-once must be explicitly asked for with the right API call; the default is at-least-once, and the API has a guard against requesting it by mistake, that is, "retry=0" is only accepted if you specify a single copy for the message, otherwise an error is returned:

> addjob myqueue myjob 0 retry 0

(error) ERR With RETRY set to 0 please explicitly set REPLICATE to 1 (at-most-once delivery)

2) No active messages are removed under memory pressure, only acknowledged messages (already processed at least one time) that are still not garbage collected because not every node that may have a copy was reached. This means that by default Disque tries hard to avoid spurious multiple deliveries, but under memory pressure it only sticks to the main guarantee (that is, at-least-once delivery).

3) Most problems involving a message broker are fine with "fair" ordering, that is, we are not going to deliver messages in random order. Even if there is some clock difference between nodes (for sure there is), when multiple nodes generate messages for the same queue, the queues of nodes with clocks more "in the past" may, under high traffic, see a few milliseconds of delay. Queues are modeled with skiplists, so messages are inserted in the right place when migrated from one node to another by the auto-federation feature. When no message migration is involved, messages are delivered exactly in order, but even in this case the order is violated at some point, because Disque re-delivers messages automatically if they are not acknowledged, which is a violation of ordering per se.
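
To illustrate the insertion-in-the-right-place part (this is just a Python sketch of the idea, not Disque's actual skiplist code; a skiplist does the same job with O(log N) inserts):

    import bisect

    queue = []  # kept sorted by (wall-clock ms, message id)

    def enqueue(ts_ms, msg_id, body):
        # A message migrated from a node whose clock is slightly behind still
        # lands at its proper position instead of being appended at the tail.
        # msg_id is assumed unique, so the comparison never reaches body.
        bisect.insort(queue, (ts_ms, msg_id, body))

    def dequeue():
        return queue.pop(0) if queue else None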


I don't understand the purpose of at-most-once semantics in practice: So, you've got some process where you don't care if the message goes through, but you're willing to spend money on the compute/storage for it anyway? Why bother?

If you're designing a system with those semantics, is that because you're hoping that it's exactly-once: 99.999% of the time -- wink, wink, nudge, nudge so that your handlers don't have to be idempotent? What's the fallback plan for the day that all of your messages go into a networking blackhole?


Soft real time data. Anything that would be out of date by the time you had timed out and resent. For example a packet of range data from a robot. Drop one? Worthless now, but another one will be along in 50 msec or so.


Telemetry from an external data source wouldn't be delivered from a queue, it'd be published into a queue. Think IoT into an AWS Lambda instance -- you've already paid the micro-cents to handle the request, so why not just forward it into Kinesis? (How does putting that data into a lossy queue save you time/money/effort?)

EDIT: Misread your example. The idea is that the robot has an outgoing buffer containing time-series data that it tries to publish on a best-effort basis? In that example, it seems like a ring buffer would be a better solution than a messaging system.
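
For that case a bounded deque is about all you need; a tiny Python sketch (the sensor callback and names are made up):

    from collections import deque

    # Best-effort outgoing buffer: once full, the oldest reading is silently
    # dropped, so consumers only ever see fresh data.
    readings = deque(maxlen=1)          # length one = always just the latest sample

    def on_range_sample(sample):        # called every ~50 ms by the sensor loop
        readings.append(sample)         # overwrites stale data, never blocks

    def latest():
        return readings[-1] if readings else None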


Another example of this sort of behavior was the netcode common to Quake and many other shooters--since lots of history is sent as full state instead of deltas, it's "okay" to have lost packets.

It's not ideal, and it'll screw up certain types of things that really, really want a contiguous history of events, but for normal play it's perfectly fine.

When doing client-side prediction, then it may become useful to have historical data, but as my sibling post suggests, a ring buffer of length one minimizes latency.

It actually might be interesting to see a writeup on multiplayer games viewed through the lens of eventually-consistent distributed databases.


A ring buffer of length one minimizes latency.


Why would you use a queue instead of a topic for a use-case like this?


From the point of view of each of the subscribers to a topic, they see a queue.


> I don't understand the purpose of at-most-once semantics in practice: So, you've got some process where you don't care if the message goes through, but you're willing to spend money on the compute/storage for it anyway? Why bother?

For instance because the time for retransmission is higher than what someone (say an HTTP request) is going to wait for.


I could imagine at most once being good for something like a ping or a heartbeat. Or even a "this is the current version of some piece of data". And if your consumers are prone to failing to ack and you don't want to gum up the queue with retries.

But your point stands that it is certainly not what we want in the lion's share of use cases. I guess that's why the default semantics are at least once, with at most once being a configurable option.


I work in SMS sending, and every link in the chain has to reason about how they fail. If there is a failure, it is better that the message is dropped than that it gets resent duplicated.

It would be a bad system indeed if a failure to write a log line (say) caused a retry loop that resent the message again and again...


Many at-least-once systems support dead letter queues natively, and if not, support isn't hard to add on yourself.


I want a message queue to hold credit card authorization requests. I want to ensure that the authorization requests occur at most once, since having a customer's credit card charged twice is infinitely worse than not charging the card.

If the request never goes through, then I can always get the customer to try again, but if it gets charged more than once, then it's a huge headache, and our customers get extremely angry and lose confidence that we know what we're doing.


That is certainly doable but you need to separate the message semantics from the credit card semantics. So let's say your authorization includes a nonce which is a blend of the transaction id data such that it is unique to the transaction. Now you send that through the message system, which delivers it one or more times, but the nonce identifies all of them as the same transaction, so the credit card processor tosses the 'extra' ones away; it has already done the work. In such a system the message passing infrastructure needs to be at-least-once and the protocol involved ensures that nothing bad happens if you deliver messages twice. Then you can route around damage or non-functioning nodes.

The reasoning at the protocol level then goes, "If I have never seen this nonce, then the transaction has not been authorized" (very durable, easy to implement), and "A request seen multiple times will only be recorded once" (also very durable and easy to implement).
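
A sketch of that reasoning in code (Python; the store and the charge_card() call are placeholders, and in reality the seen-nonce table has to be durable):

    processed = {}   # nonce -> recorded result of the authorization

    def authorize(msg):
        nonce = msg["nonce"]             # unique per transaction, set by the sender
        if nonce in processed:
            return processed[nonce]      # duplicate delivery: same result, no second charge
        result = charge_card(msg)        # placeholder for the actual processor call
        processed[nonce] = result        # "a request seen multiple times is recorded once"
        return result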

But if the message infrastructure tries to guarantee at-most-once, the user writes their protocol as "if I see this authorization I should do it, because the protocol will only send it to me one time." Then there will come a time in that protocol's future where something blew up the invariants in the message infrastructure and the message arrives twice, and the wrong thing happens to the credit card because the protocol trusted the infrastructure to do the right thing.

Back in the RPC wars people were arguing that it was so much "simpler" and more "transparent" if a remote procedure call had the exact same semantics as a local procedure call. And that's true, it would be, if you could manage that similarity; except it has been proven over and over again that network messaging systems can't completely get it correct.


Interesting, thanks for the insight. We are doing something similar to the nonce that you mention, but your argument that at-least-once semantics is more appropriate is very convincing to me, as is your RPC argument. I'm old enough to have had to endure both ONC and DCE rpcs :)


For this kind of issue I stick to at-least-once, and then use a lot of band-aids:

- Each worker has to hook into a database where a record keeps the state of the transaction: ready, processing, success or failure. With a good old SELECT FOR UPDATE a transaction will never be done more than once.

- Timestamps and alerts ensure that if the worker fails dramatically (leaving the task as "processing"), a human will check and update the task accordingly.

It has proven pretty resilient so far... but at very small scales.
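
For the record, the worker side of that pattern looks roughly like this (Python over a generic DB-API connection with %s placeholders; the tasks table and do_work() are invented names):

    def run_one_task(conn):
        cur = conn.cursor()
        # The row lock taken by FOR UPDATE means two workers can never
        # claim the same 'ready' record at the same time.
        cur.execute("SELECT id, payload FROM tasks "
                    "WHERE state = 'ready' LIMIT 1 FOR UPDATE")
        row = cur.fetchone()
        if row is None:
            conn.rollback()
            return
        task_id, payload = row
        cur.execute("UPDATE tasks SET state = 'processing', updated_at = NOW() "
                    "WHERE id = %s", (task_id,))
        conn.commit()   # a crash after this point leaves the row in 'processing',
                        # which is what the timestamp + alert + human check is for
        try:
            do_work(payload)
            cur.execute("UPDATE tasks SET state = 'success' WHERE id = %s", (task_id,))
        except Exception:
            cur.execute("UPDATE tasks SET state = 'failure' WHERE id = %s", (task_id,))
        conn.commit()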


If you've got a worker node that proxies those requests to your payment processor and it dies, then you can be in an unrecoverable state where you won't know whether the authorization request has succeeded or whether the message was lost.

An at-most-once policy doesn't save you from that, but an at-least-once policy would force you to write an idempotent handler that may be able to recover from failure (via the PP API) or safely make progress (with a bit of transactional state).


What happens when the authorization to the credit card processor times out? You don't know if it succeeded or not. You have to have a way to ask the credit card processor "did this charge succeed?" and then process the payment if they respond with "no".


Don't you store the result of the authorization in your persistent storage? If you do, you can check whether you need to do the authorization again.


Credit card authorization might be better suited to a synchronous request, versus going through a message queue.


It doesn't really matter either way. Unless you are using distributed transaction protocols, you pretty much MUST store-and-forward and have idempotent processes in the consumer/receiver to guarantee something happens. Usually the easiest way is to generate a transaction ID, such as a UUID, at the very start and use it end-to-end as indicated by the earlier nonce comment. Then each step along the way can either locally dedupe, or refer to a shared database/API with it.


Well one difference is being able to easily propagate failures end to end.

Agreed about idempotent processes in the "consumer" processes. But I generally prefer marking an end user session in a persistent store before propagating the request for payment (effectively acting as a sticky current transaction ID), and making synchronous requests down the chain versus generating a transaction ID and doing a bunch of async message queuing.

It's just easier to know when something didn't work and display the appropriate user feedback. But your point is valid.


A lot of sensor data is like this. Missing a reading is not ideal but not the end of the world.


How does something like rabbitmq fare here?


Apparently with clustered masters, with network partition failover and healing, it by default drops messages, which means it's not even at-least-once. https://aphyr.com/posts/315-call-me-maybe-rabbitmq

Not so great guarantees there.


Rabbitmq has at-least-once semantics (unless requeue = false, in which case all bets are off, since you could get a message multiple times due to cluster partitions^ or not at all due to a drop without requeue. Note that unlike disque, requeue is set on a rejection/recovery operation, not on a message).

It has transactional publishing (which not all clients support as the Confirm method is non-standard) and uses a per-queue leader election so all publishes have to go to the master queue, guaranteeing ordering. However, requeued messages have undefined order. However however, all messages with the requeued flag unset still have strict ordering relative to each other.

Messages are only removed when acked. If I understand correctly, disque will report back a message publish failure in an OOM condition, whereas RabbitMQ will simply refuse to read from the connection and make TCP do the publisher throttle work, which is a massive hack that occasionally causes problems. I prefer disque's method, especially since clients should never be deluded into thinking a publish can't fail for other reasons.

Overall, the main differences between this and rabbitmq, from my reading, are:

* RabbitMQ has stronger ordering guarantees...until your message gets requeued, in which case things get more complex.

* Disque has a clearer clustering story, stating outright that it is AP. However, this means that messages might or might not always reach their destination, producing (as the grandparent comment calls it) "lonely widows". RabbitMQ can be CP or AP depending on configuration, but its CP mode is somewhat fraught with pitfalls and strange performance issues (as I have personally experienced in production).

* RabbitMQ has the advantage of the whole suite of AMQP features: exchanges, headers, routing rules, as well as some non-standard ones like federation, dead letter queueing. Of course, complexity comes at a cost, and Disque is considerably simpler.

* RabbitMQ supports per-message durability and per-queue mirroring options. This reflects its history as a single-node application which had clustering added later. I prefer Disque's approach of always-clustered with the option to write to disk (especially having that option only apply on graceful restart).

Overall, I might use Disque for high throughput applications where I don't mind messages rarely being lost, such as metrics. However, the AP semantics worry me. Dissatisfied as I am with Rabbit's CP mode, it's still my preferred option of these two in most use-cases.

^ (edit) As the sibling comment notes, the default mode for clustering is AP and messages can be lost (this is the same as disque). A few times in my original post I accidentally defaulted to talking about its CP mode, which is the only mode I've used (IMO, the only mode that should be used).


Thanks for your comment. I think there is some misunderstanding about AP / CP and message semantics. Messages in a (proper) AP system are immutable, replicated multiple times, and never dropped if not acknowledged by a client, so there is no case where a message with a replication level of N can fail to be delivered if just N-1 node failures occurred. This is the story of a Disque message:

1. A client produces the message into some node with replication factor of N.

2. The node replicates it to additional N-1 nodes (or to N nodes under memory pressure, to avoid retaining a copy).

3. When the replication factor is achieved, the client gets notified. Or after a timeout, an error is reported to the client, and a best-effort cluster-wide deletion of the message is performed (only contacting nodes that may have a copy).

4. At this point the message is queued only in a given node, but N nodes have a copy.

5. The message is delivered, but let's imagine it is not acknowledged by the client.

6. Every node that has a copy, after the retry time has elapsed, will attempt to queue it again, using a best-effort algorithm to avoid requeueing it multiple times. However, the algorithm is designed so that under partitions the worst that can happen is multiple nodes putting the message back up for delivery again, not the contrary.

7. Eventually the message gets acknowledged. The ack is propagated across the cluster to the nodes that had a copy of the job, and the ack is retained (if there is no memory pressure) to make sure that no node will try to deliver the message again. When all the nodes report having the acknowledgement, the message is garbage collected and removed from all the nodes holding a copy.

So basically you can count on Disque trying to deliver the message at any cost UNLESS the specified message time to live is reached. You can optionally specify a max life for the message, for example 2 days: if after 2 days no delivery was possible, the message is deleted regardless of whether it was acknowledged or not. This is useful because sometimes delivering a message after a given time does not make sense.

However if you set, for example, retry to 60 seconds and TTL to 2 days, it means that all the nodes having a copy will try every minute to deliver a non-acknowledged message again, for 2 days. Just keep in mind that TTL means messages are destroyed after some time.
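
From the client's point of view the whole lifecycle is three commands; a rough sketch in Python with a hypothetical RESP-speaking client object c (not an official client library):

    # Producer: 3 copies, wait at most 100 ms for replication, re-deliver every
    # 60 seconds if not acknowledged, drop the job after 2 days (the TTL).
    job_id = c.execute_command(
        "ADDJOB", "myqueue", payload, 100,
        "REPLICATE", 3, "RETRY", 60, "TTL", 2 * 24 * 3600)

    # Consumer: fetch, do the (idempotent) work, then acknowledge so the ack
    # propagates and every copy of the job can be garbage collected.
    for queue, jid, body in c.execute_command("GETJOB", "FROM", "myqueue"):
        process(body)                        # placeholder for the actual work
        c.execute_command("ACKJOB", jid)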


So what if, during a partition, I publish another message? Does it get rejected for not being able to reach N servers within the timeout, or has "N" been adjusted down to the number of currently-reachable machines?

ie. is a partitioned cluster effectively "split-brained" where publishes only appear on one side, or does it stop accepting new messages?


N is a number you specify via the API call with the REPLICATE option. By default it is set to 3 to provide reasonable durability. So if you are on any side of the partition with at least 3 nodes, you can continue without issues.

Two sides of a partition are basically two smaller clusters that operate independently, as far as new messages are concerned. But what about old messages? They'll wait (if there is no memory pressure) to get garbage collected if copies are split between the two sides. However, during the partition the side where the message gets acknowledged will stop re-queueing it ASAP.


Note that even in CP mode, committed messages can be dropped as well (one of the pitfalls I assume you've experienced) when the cluster heals from partition. cf link in my sibling comment.


I suspect what happened there was to do with http://www.rabbitmq.com/ha.html#unsynchronised-slaves

When using mirrored queues, Rabbit does ensure all the active mirrors are written to before confirming a publish:

    "in the case of publisher confirms, a message will only be confirmed to the publisher when it has been accepted by all of the mirrors"
So if my understanding is correct, wiping the contents of a re-joining mirror shouldn't matter, since no new messages should have been accepted since the partition (unless the "pause" part of pause-minority only happens after other things like re-election or dropping "dead" slaves, in which case yes, pause-minority is useless; this seems doubtful, however).

Hence I think the problem is slave synchronisation.

Basically, when a slave is created (eg. in response to another slave dying), it only receives NEW messages, not existing messages. So suppose the following sequence of events on a 2-mirror queue:

    Publish A
    Master and slave both contain A
    Slave dies
    New slave created
    Master contains A; Slave contains nothing
    Publish B
    Master contains A,B ; Slave contains B
    Master dies
    Slave promoted and new slave created
    A is lost
The way around this is setting the policy "ha-sync-mode": "automatic", in which case the act of creating a new slave also replicates the current contents of the master. To the best of my knowledge, if the same Call Me Maybe tests were run with that policy in place, no messages should be lost.

But yes, this is precisely what I meant by "fraught with pitfalls". The pause while messages replicate can be disastrous on its own if the queue is large, another issue that has bitten me in production.

I do love RabbitMQ but I wish there was a good, planned-from-the-beginning clustered CP AMQP broker out there. Maybe I'll try to write one.


It is possible to build reliable monotonic counters based on time, and that's actually a sound approach.


I'm very sorry, credit for the questions goes to Jacques Chester, see https://news.ycombinator.com/item?id=8709146 I made an error and cut&pasted Adrian's name by mistake (Hi Adrian, sorry for misquoting you!). Never blog and go to bed I guess, your post may magically be top news on HN...


Seems like a similar design to Apache Kafka, http://kafka.apache.org. AP, partial ordering (Kafka does ordering within "partitions", but not topics).

One difference is that Disque "garbage collects" data once delivery semantics are achieved (client acks) whereas Kafka holds onto all messages within an SLA/TTL, allowing reprocessing. Disque tries to handle at-most-once in the server whereas Kafka leaves it to the client.

Will be good to have some fresh ideas in this space, I think. A Redis approach to message queues will be interesting because the speed and client library support is bound to be pretty good.


Maybe I'm missing something, but if it is important to guarantee that a certain message will be dispatched and processed by a worker, why wouldn't a RDBMS with appropriate transactional logic be the best solution?


It would. There's an argument that things like only-once, guaranteed and in-order delivery are business requirements which therefore have no place in the message layer - http://www.infoq.com/articles/no-reliable-messaging.


Credit for the questions is due to jacques_chester, not me! See https://news.ycombinator.com/item?id=8709146


>a few months ago I saw a comment in Hacker News, written by Adrian Colyer...was commenting how different messaging systems have very different set of features, properties, and without the details it is almost impossible to evaluate the different choices, and to evaluate if one is faster than the other because it has a better implementation, or simple offers a lot less guarantees. So he wrote a set of questions one should ask when evaluating a messaging system.

I can not find the comment by @acolyer on HN. Who can help me?


I think Salvatore mis-remembered. The comment was by @jacques_chester (he made a joke about it below, but people didn't get it). The comment is in this thread (HN doesn't let me link directly): https://news.ycombinator.com/item?id=8708921 . Look for Jacque's comment in there, and Salvatore replied to it.

I also wanted to find the original, and used Jacque's remark below as a starting point to start hunting for it.


Sorry Jacques, Adrian. This great (IMHO) set of questions was written by Jacques indeed.


Thanks!


I wonder what the point is in having "best effort FIFO"? If the application has to be able to deal with unordered messages anyway, you might as well not bother to try to maintain any kind of order.

It's as well to be hung for a sheep as for a lamb.


In general, you want to consume messages "fairly" ie. in a way that minimizes latency introduced by the queuing. Best-effort ordering gives you this most of the time, which is better than none of the time.


Exactly. Imagine you use your message broker to coordinate a web app that needs to apply effects to photos in a web interface. It is definitely a good idea that users arriving first are served first, but violating this by a couple of milliseconds is not going to change the experience.


It lets the consumer use a fixed buffer to re-introduce ordering (possibly dropping any out-of-order messages that come outside of a window).
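
A sketch of such a window in Python (the sequence numbers and window size are assumptions, not something the broker provides):

    import heapq

    WINDOW = 64          # how far out of order we are willing to wait
    buf, next_seq = [], 1

    def on_message(seq, payload, deliver):
        global next_seq
        if seq < next_seq:
            return                        # too late or duplicate: drop it
        heapq.heappush(buf, (seq, payload))
        # Flush whatever is contiguous, or whatever we refuse to keep waiting for.
        while buf and (buf[0][0] == next_seq or len(buf) > WINDOW):
            s, p = heapq.heappop(buf)
            deliver(p)
            next_seq = s + 1              # skips (gives up on) any missing messages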


Ask HN: I'm in the market for a distributed message queue, for scheduling background tasks -

Does anything support "regional" priorities, where jobs are popped based on a combination of age + geographic/latency factors?

Also, what are recommended solutions for distributing job injection? My job injection is basically solely dependent on time, and so I envisage one node (raft consensus?) determining all jobs to inject into the queue.

My queue volume is about 50 items/sec and nodes will be up to 400ms apart.


That sounds like a job for a scheduler, in the cluster sense. I'm actually looking around for a job framework in .NET that supports custom schedulers, but have yet to find something that supports resource-constrained scheduling. It's all either about enqueueing something to be done NOW, or at a future date. I haven't seen anything that supports custom scheduler implementations on a per-job-type basis. They don't really distinguish between logging work to be done and deciding whether or not it can be executed NOW.


This looks very cool. At-least-once semantics are the way to go because most tasks require idempotence anyway, and that helps in dealing with multiple delivery. Strict FIFO ordering is not always needed either, as long as you avoid starvation; most of the time you need "reliable" deferred execution ("durable threads").

I started prototyping something along these lines on top of Riak (it is incomplete; missing leases etc., but that should be straightforward to add): https://github.com/isbo/carousel It is a durable and loosely FIFO queue. It is AP because of Riak+CRDTs. It is a proof of concept; it would be nice to build it on top of riak_core instead of as a client library.


When I first installed Redis years ago I was astounded at how easy it was to get up and running. Compare this to the plethora of message brokers out there: with the vast majority you will spend the better part of a day trying to figure out how to configure the damn thing.

My overall impression of message brokers is that RabbitMQ is a pain in the ass to set up; celery is my go-to these days, with beanstalkd being a close second if I don't want too many of celery's features.


RabbitMQ is ridiculously trivial to set up in a clustered configuration. Even more advanced things like HA policies and user permissions are super easy to do.

RabbitMQ's problem is that it has no tolerance for network partitions, and it also tends to be very buggy in the presence of them.


Agreed, I've used beanstalkd in multiple projects/companies and its simplicity, robustness and especially stability has always impressed me.

There are client drivers for a wide range of languages including Python and Ruby (and also an ActiveJob-compatible library).

Installing it is as simple as downloading the source, running make and then running the binary. http://kr.github.io/beanstalkd/


We use Beanstalkd actively to process 50M emails per month, and it does a great job. We have never had any issues with it, and setup is very easy; you get a nice admin console panel to help you in development and monitor your tubes: https://github.com/ptrofimov/beanstalk_console


Celery isn't a message broker - they always recommend using Celery as a consumer for a RabbitMQ broker.


Is that so? RabbitMQ on a single node is easy, you can get up and running right after installation with just the defaults.

I do agree there's a few gotchas with clustering though, such as firewall rules and inet_dist_listen_min/max, setcookie, etc.


how to set up rabbitmq:

1) sudo apt-get install rabbitmq

2) there is no step 2


While I believe the top poster is being hyperbolic, I think you aren't being entirely honest. Looking over my chef script I have multiple add_user, set_user_tags, add_vhost & set_permission commands. Then there's the application code of creating the queues and adding them to the appropriate hosts. What's even more annoying is there isn't a simple way to backup/move nodes that doesn't involve either imaging the entire OS or exporting every message (why can't I just flush and copy a directory?). All of these permissions are needed when all I really need is a default admin, given that the node is already behind a firewall and anyone with permission to connect to the node either likely knows what they are doing or is an intruder, in which case I should consider the box hosed either way.

Somehow, I found Kafka easier to use than RabbitMQ.


FYI, Salvatore will speak at dotScale in Paris about Disque on June 8: http://dotscale.io


This has me excited for many reasons. Redis is an amazingly powerful, robust and reliable piece of technology. Also, I love reading antirez's blog posts about the decisions behind Redis, so I can't wait to learn more about queueing systems from him when discussing Disque.


This looks like a good effort, congratulations.

Personally I'm torn on the usefulness of generic brokers for all circumstances... there are obvious advantages, but at the same time every messaging problem scales and evolves differently so a broker can quickly become just one more tail trying to wag the dog.

I am also interested in the architecture of tools like ZeroMQ and nanomsg, where they provide messaging "primitives" and patterns that can easily be used to compose larger systems, including having central brokers if that floats your boat.


We recently switched from RabbitMQ to Redis queuing because we were not able to implement a good enough priority queue for highly irregular workloads. Prefetch would not work since 2-minute workloads would block all following messages. Timeout queues would somewhat rebalance msgs, but large blocks of messages would be queued at the same time and therefore be processed as large blocks. Now our workers are listening to 10 queues/lists with different priorities with BRPOP, and so far everything seems to work.
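
For anyone curious, that consumer loop is only a few lines with redis-py (the queue names and the handle() function are made up here):

    import redis

    r = redis.Redis()
    QUEUES = ["jobs:p%d" % i for i in range(10)]   # p0 = highest priority

    while True:
        # BRPOP checks the keys left to right, so anything in a higher-priority
        # list is always returned before items in lower-priority ones.
        item = r.brpop(QUEUES, timeout=5)
        if item is None:
            continue                  # timed out; block again
        queue_name, payload = item
        handle(payload)               # placeholder for the actual worker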


Unordered, in-memory queues shouldn't be anyone's go-to solution. I think there's a time and place for these, and having at-least-once delivery is a huge win over just using Redis, so I'm excited.

Still, unless you know exactly what you're doing, you should pick something with strong ordering guarantees that won't reject messages under memory pressure (although rejecting new messages under memory pressure is A LOT easier/better to handle than dropping old messages).


Some big projects are currently making the switch to DDS-based pub/sub. [1,2]

Now that everybody is making QoS guarantees in pub/sub and message queues, is there a real difference from the 10-year-old tech deployed in boats, trains and tanks?

[1] http://www.omg.org/spec/DDS/1.2/

[2] http://design.ros2.org/articles/ros_on_dds.html


Has anyone personally worked on a DDS project that is in actual production? I have never even seen one, although I've worked in some of the industries where it is supposedly a success.


I think this has a lot of its roots in NSQ. But NSQ has no replication support.

I think built-in replication is very nice to have. Would like to try this once it arrives.


NSQ writes messages sent to a topic onto all channels subscribing to the topic which can be used as a form of replication in a way that meshes well with its at-least-once semantics.


But that all happens inside one NSQ. I mean, what if that NSQ server goes down?

We don't have that kind of guarantee in NSQ. To get around that, we need to maintain the replication on our own by publishing the message to two different NSQ servers.


True, but with NSQ being brokerless, the lines are somewhat blurred between client and server.

Each of our app servers has its own local nsqd. If that nsqd stops responding, we take the app server out of the load balancer. The local nsqd publishes to multiple channels, which get consumed by hosts in different datacenters.

There are still potential failure cases: nsqd loses connection to consumers, messages start building up, then the machine somehow goes away. The only way to prevent it is to take the machine out of the load balancer any time nsqd messages start backing up, but we prioritize serving requests over sending messages.


Yes. I didn't mean to rant about NSQ. This is the reason we chose NSQ: its simplicity.

We maintain NSQ close to the consumer. We don't want our publisher to take responsibility for the message processing. Once it's pushed to the queue, we need to make sure it gets processed regardless.

That's why we publish the same message to multiple queues. All our DB operations are idempotent, so we are okay with processing the same message multiple times.


Good points.


I wish to assure all and sundry that Adrian Colyer is not my secret crime-fighting identity, and vice versa :)


Sorry Jacques :-( Fixed... and thank you again for your comment.


Hey, it gave me chance to crack a lame joke. I love those.


Thanks for your insightful comment at: https://news.ycombinator.com/item?id=8709146


Will there be any way to set up machine affinity? I think Azure Service Bus uses this mechanism (by specifying a partition key for a message) to enable strict FIFO for a given partition.


I didn't see any mention of dead letter queues. Does it support dead letters? This is an extremely useful feature of Amazon SQS.


This reminds me of a talk at SCALE13x about NATS: http://nats.io

It's fast and scalable.



Is guaranteed at-most-once delivery impossible?


Exactly-once is impossible. Example: you build the whole system to be totally consistent and transactional (a CP system basically). Then you deliver the message to the client, and it dies without reporting whether the message was acknowledged or not. You have two options:

1. Re-issue the message. If you do that, it is possible that the now-crashed client already processed it, and you end up with multiple delivery.

2. Drop the message. If you do that, maybe the client did not process the message, and you end up with no delivery.


Exactly-once is possible in practice if you effectively bring the client under the umbrella of your transactions. E.g. you can (ab)use MySQL and other RDBMSs to do something like:

1. Have a table where each row is a "job" or "message" and has a status which when enqueued is status = "unclaimed". You can also have a `last_changed` column.

2. Have workers consuming that table that GET_LOCK() a row and set status = "processing" and hold the lock for the duration of the processing.

3. When they're finished with the task they update status = "finished" and unlock the row (or equivalently, disconnect).

This requires much tighter coupling between the queue and the queue consumer (each client must maintain a lock / connection for the duration of processing items).

But it means that:

* Nothing will ever pick up the item more than once due to the GET_LOCK().

* If the consumer dies the item is either just unlocked, or the status is "processing". You can as a matter of policy re-pick-up "processing" items with a `last_changed` in the distant past, alert on those items and manually inspect them.

* If the consumer processes the item successfully it'll set the status to "finished" and nothing will process the item again.

Now obviously this requires a lot more overhead & maintenance than what you have in mind, in particular it makes some of the failure cases be "the item is delayed due to a consumer dying and won't be processed until a human looks at whether it was actually finished".

But this is the sort of pattern that you might use e.g. if you have a queue for sending out invoices. At work we use MySQL + https://metacpan.org/pod/Data::Consumer::MySQL to do this.
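
A rough Python rendering of that flow (PyMySQL here is just an assumption for the example, Data::Consumer::MySQL is what we actually use, and the table/column names are invented):

    import pymysql

    conn = pymysql.connect(host="db", user="worker", password="secret",
                           database="jobs", autocommit=True)

    def try_process_one(job_id, payload):
        with conn.cursor() as cur:
            # Named lock scoped to this job: only one consumer can hold it.
            cur.execute("SELECT GET_LOCK(%s, 0)", ("job-%d" % job_id,))
            if cur.fetchone()[0] != 1:
                return False                      # someone else is working on it
            try:
                cur.execute("UPDATE jobs SET status='processing', last_changed=NOW() "
                            "WHERE id=%s AND status='unclaimed'", (job_id,))
                if cur.rowcount != 1:
                    return False                  # already claimed or finished
                do_the_work(payload)              # the external side effect (e.g. sendmail)
                cur.execute("UPDATE jobs SET status='finished', last_changed=NOW() "
                            "WHERE id=%s", (job_id,))
                return True
            finally:
                cur.execute("SELECT RELEASE_LOCK(%s)", ("job-%d" % job_id,))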


Even considering the client to be part of the distributed system in an "active" way, cooperating toward the single-delivery goal, the part I don't believe is reasonable is: "it makes some of the failure cases be 'the item is delayed due to a consumer dying and won't be processed until a human looks at whether it was actually finished'".

Moreover, what you describe here is more like at-most-once delivery with a log of delivered messages, so that non-acknowledged entries can be inspected. I don't see how this really qualifies as exactly-once. I guess exactly-once must be, eventually, honored automatically even in the face of failures to qualify.

Isn't it just better to use an at-least-once queue, (trans)action-unique IDs, and a CP store as the source of truth for the current state? That way you turn all the sensible operations into idempotent ones.


Right, you don't have to implement it like that, but the point is that the job is guaranteed to be in one of these states:

* Hasn't been picked up yet

* Has been picked up, and currently has something processing it (because the lock is still there)

* Has been picked up, but whatever picked it up has gone away

* It's finished

Handling the jobs that haven't been picked up yet or are finished is easy. But what do you do about the jobs where the processor has simply gone away?

Well, if they still hold the lock you can give them some more time, and if they don't hold the lock presumably they died.

At that point you can decide how you want to handle that, do you just have something re-queue those jobs after some set amount of time has passed, or does someone manually look at the jobs and decide what to do?

As an example of something we use this for: you might have some transactional E-Mails to be sent out, each recipient is a row in the queue table, and you have to generate an E-Mail and pipe it to sendmail. Just before you shell out to sendmail you mark the item as "processing", then depending on the sendmail return value you either mark the item as processed in the queue or re-queue it.

There's obviously a race condition here where sendmail might have returned OK to you but the DB server you're talking to blows up (so you can't set the status as "finished"). No amount of having unique IDs in external systems is going to help you because that'll just create the same problem. I.e. you have some state outside of your system that needs to be manipulated exactly once, and once that's done you'd like to mark the job as done.

In practice once you get the things that can fail between "processing" and "finished" down to some trivial logic this sort of thing is really reliable as an "exactly once" queue. To the extent that it fails once in a blue moon you can usually just manually repair it, i.e. in this case see what your mail logs say about what mails you sent out.

Redis obviously doesn't have the same strong storage guarantees as a disk-backed RDBMS, but we also have a version of exactly this logic that runs on top of Redis sentinel: https://metacpan.org/pod/Queue::Q::ReliableFIFO::Redis

It has the same queued/in-progress/finished queues using Redis lists, things are moved between the lists atomically and processed by workers, but of course if a worker crashes and burns at the wrong time you're stuck with some items in in-progress state and have to somehow figure out what to do with them. I.e. either you blindly re-queue them (at least once) or manually see whether they finished their work and re-queue them as appropriate (exactly once).
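
The atomic move between those lists is basically Redis's BRPOPLPUSH; a minimal redis-py sketch of one worker iteration (key names invented, and this is not the actual Perl module):

    import redis

    r = redis.Redis()

    # Atomically move the next item from the queue into an in-progress list,
    # so a worker crash leaves it visible there instead of losing it.
    item = r.brpoplpush("q:todo", "q:in-progress", timeout=5)
    if item is not None:
        process(item)                        # placeholder for the actual work
        r.lrem("q:in-progress", 1, item)     # done: remove it from in-progress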


That's not entirely true, since what you describe isn't a CP system. The client is a part of the system!

It is possible to have the client be part of the consensus algorithm and guarantee exactly once delivery, but the latency gets quite high.


Are you assuming the client will eventually recover here? Or that clients are in general able to "lock" the resource before doing some work? Because otherwise, imagine a client may crash and never recover, and you have clients sending a beam of electrons in some device, without feedback about the beam output. Each client is a device that can send the beam. If the client never restarts, then even with all the cooperation and consensus, there is no way to know if the beam of electrons was generated or not.


"> /dev/null" is technically very bad "at most once"


How does ZeroMQ stand up?


ZeroMQ is not a message queue, nor is it a broker. You could build a message queue with it, but it's not really a valid comparison.


I wasn't aware it's not a broker. Thanks for clearing that up.


It's fairly trivial to build an in-memory broker with zmq, though you should rather consider zmq as building blocks for a message queue, or sockets on steroids.

It fills a need for certain applications, and it is very good at what it does.


It would be nice to have a message queue system not built in Erlang or Java.


Apache Qpid has a C++ broker if that helps, plus it talks AMQP...

(http://qpid.apache.org/components/cpp-broker/index.html)




Despite its name, ZeroMQ is an alternative sockets API and nothing, I repeat nothing, more.


ZeroMQ is not a message queue. It's in the name: It has zero message queue capabilities. It does implement socket primitives you can use to build a queue broker, but afaik no one has.




Thanks. Looks unfinished. No persistence yet, according to the author.



