Maybe I didn't get the point. Of course we can't have exactly-once delivery directly in the layer of an unreliable network - but it seems pretty easy to construct it on a higher layer if your network stack supports at-least-once delivery: just assign each message a unique ID when sending, then track those IDs on the receiver side and discard duplicates. And you need those IDs anyway so you can generate ACKs.
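Roughly this, as a sketch (Python; names are made up, and it assumes the set of seen IDs fits in memory and never has to survive a crash):

```python
import uuid

def make_message(payload):
    # The sender attaches a unique ID; the same ID is reused on every retry.
    return {"id": str(uuid.uuid4()), "payload": payload}

class Receiver:
    def __init__(self):
        self.seen_ids = set()  # grows forever; kept in memory only

    def handle(self, message, send_ack, process):
        if message["id"] in self.seen_ids:
            send_ack(message["id"])   # duplicate: re-ACK but don't reprocess
            return
        process(message["payload"])   # crash here and the ID was never recorded
        self.seen_ids.add(message["id"])
        send_ack(message["id"])       # crash here and the sender will retry
```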
Isn't this basically what every "reliable" transport (TCP, HTTP3, message queues...) does?
Wherever you construct it, you must necessarily have a machine whose failure mode is that "exactly once" degrades into either "at most once" or "at least once".
What determines which failure mode you get is whether that machine fails over to a machine that retries uncertain messages (giving you "at least once") or to one that doesn't (giving you "at most once").
But, you say, why can't we have it fail over to a machine that asks recipients what they have got and goes from there? Well, we can, but the recipients don't know which messages are still in the network, on their way to them.
But, you say, why not have the recipients disregard those inbound messages once they know about the replacement machine? Well, you can do that, but now the *recipients* become the machines whose job is to ensure deduplication. And now *they* become the machines with the bad failure mode.
But, you say, does this not reduce the odds of failures? Why yes, it does. Which is why people do things like this. And there has to come a point where we accept SOME failure rate.
If the recipient fails after telling the deduping machine what it has seen, the recipient's failover will be in an unknown, different state. And now the failover machine has to try to figure out what its state should be, what is going on, and so on. You can add ways of addressing each possibility here, and you'll get ever more obscure chains of events that can still result in failure. At the cost of ever-increasing complexity, you'll make failures ever more complicated.
Frequently, developers who implement these manage to do two bad things. First, they introduce a lot of complex code which rarely triggers except in a disaster, and so whose bugs tend to survive. And second, they manage to convince themselves that they have accomplished the impossible, and make reliability promises that other developers unwisely believe. Exactly how unreliable most systems were, and how little the documentation could be trusted, was underappreciated until https://jepsen.io/ came along and started proving how bad most distributed software was.
Now it may seem bizarrely unlikely that you'll ever see this kind of situation. But failures often start from network congestion due to a packet storm. And the failures lead to chatty Byzantine fault tolerance protocols adding significant traffic. This causes cascading failures. And so a small, simple outage can escalate into a series of outages as ever more confused servers continue overwhelming the network with their futile attempts to discover what is supposed to be true. So complex combinations of failures occur together more often than most of us would expect.
Certainly the recipient can fail; that seems obvious: if a user's phone dies, then your app is not going to work. Perhaps I have the wrong model/framing here, but I was thinking from the perspective of being resilient to any failure outside of the recipient device.
Ah, but distributed systems tend to be hooked together. So the recipient device may itself feed into something else. And in the case of a message queue, generally does.
Now you need a database. Do you also need exactly-once delivery to the database? The service is also no longer stateless, which makes scalability a problem. Maybe you decide to make it just an in-process cache for de-duping, but that cache needs expiring, and now the semantics are exactly-once within a given time window, and not across service restarts.
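A minimal sketch of that in-process, expiring cache (hypothetical names), which makes the weakened semantics concrete: you only de-dup within the TTL, and only for the lifetime of this one process.

```python
import time

class ExpiringDedupCache:
    """De-dupe within a time window; forgets everything on restart."""
    def __init__(self, ttl_seconds=300):
        self.ttl = ttl_seconds
        self.seen = {}  # message id -> time first seen

    def is_duplicate(self, msg_id, now=None):
        if now is None:
            now = time.monotonic()
        # Expire old entries; a duplicate arriving after the TTL slips through.
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.ttl}
        if msg_id in self.seen:
            return True
        self.seen[msg_id] = now
        return False
```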
We can definitely solve this with higher level constructs, but they're not free, and they can introduce the same issues themselves.
> Isn't this basically what every "reliable" transport (TCP, HTTP3, message queues...) does?
TCP does this to solve retries at the TCP layer. HTTP3 does this to solve issues at the HTTP3 layer. Message queues might solve this for the message queue, depends. But none of these solve it at the product level, or the user experience level, or other higher levels where these issues still crop up. They're issues you have to solve at every layer in some way.
Or I just use a monotonically increasing ID and track the highest ID I've received in order. I might have to buffer a certain number of packets/messages/whatever to deal with out-of-order arrivals, but the entire state fits into RAM. (Edit: Actually I don't even have to. It's probably a good thing to do for efficiency, but in principle I can just drop out-of-order messages and wait until they are redelivered, hopefully in the correct order)
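Something like this rough sketch of the strict-ordering variant (hypothetical names; anything that isn't the next expected sequence number gets dropped and has to be redelivered):

```python
class InOrderReceiver:
    """Accept only the next expected sequence number; drop everything else."""
    def __init__(self):
        self.next_expected = 0   # the only state that has to be kept

    def handle(self, seq, payload, process):
        if seq != self.next_expected:
            # Duplicate or out-of-order: drop it and wait for redelivery.
            return False
        process(payload)
        self.next_expected += 1
        return True
```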
But yes, even if I needed some stuff to achieve it, that doesn't make it impossible as the OP claimed.
> But none of these solve it at the product level, or the user experience level, or other higher levels where these issues still crop up.
I don't understand this point. Do you have some examples?
How do you know what the last message you received is? You can crash in the middle of receiving a message. You can crash after you've written the ID to disk but before you've processed it, or you can crash after you've processed it but before you've written the ID to storage.
If you're a distribution box (which is quite, quite common in these message queue systems), you can get the message, and send it to a box that just powered off. You saw it, you recorded it, and now you're not sure if you forwarded it successfully or not.
Fun thing about power outages, not every box turns off at exactly the same nanosecond. PSUs are full of capacitors and inductors. Sometimes that's just enough to float through a brownout, too (and a bunch of machines booting can also cause a brownout)
> Or I just use a monotonically increasing ID and track the highest ID I've received in order.
This assumes you can generate monotonically increasing numbers. If you have many clients, now they all need to share a data source and may be performance bound by generating those numbers.
> Actually I don't even have to. It's probably a good thing to do for efficiency, but in principle I can just drop out-of-order messages and wait until they are redelivered, hopefully in the correct order
True (modulo the first problem), but efficiency may be necessary here. With many clients, you may end up in a state where only a small fraction of messages get through successfully, and most traffic is unsuccessful, which is bad. This also makes performance commitments hard to maintain, as it's perhaps just luck when a client manages to get a message through. Clients also now need more buffering, more state, etc.
>> But none of these solve it at the product level, or the user experience level, or other higher levels where these issues still crop up.
>I don't understand this point. Do you have some examples?
Let's assume a simple client->server instant messaging app. As a user, if I send a message, I expect it to arrive exactly once. It's going over TCP, which is "reliable", but that doesn't stop the HTTP request from failing and needing to be retried. It's using HTTP3, but that doesn't stop the server from generating a 503 and the POST request (or whatever) needing to be retried. Maybe the server puts the message in a message queue, but the connection fails after sending a transaction commit: did it get committed?
Idempotency tokens or an equivalent mechanism do solve this, but there isn't one magic trick to solving it in some base layer technology like TCP, this needs to be solved again and again whenever you have distributed systems.
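For concreteness, a rough sketch of an idempotency-token handler on the server side (hypothetical names; in reality the `handled` map would need to live in shared, durable storage, which brings back the same problems one layer down):

```python
def deliver_message(body):
    # Stand-in for the real side effect (store the message, fan it out, etc.).
    return {"status": "delivered", "body": body}

handled = {}  # idempotency key -> stored response

def handle_send_message(idempotency_key, body):
    if idempotency_key in handled:
        # A retry of a request we already handled: return the same response.
        return handled[idempotency_key]
    response = deliver_message(body)
    handled[idempotency_key] = response  # crash between these two lines and
    return response                      # you're back to at-least-once
```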
Also, this isn't just networking. Two processes on a single machine communicating via IPC may be effectively a distributed system! I've got some experience doing this on Android, and it's still hard.
> This assumes you can generate monotonically increasing numbers. If you have many clients, now they all need to share a data source and may be performance bound by generating those numbers.
You can assign an ID to your nodes and let them generate increasing numbers on their own. The node ID decides on a tie, and if one node sees a larger counter value appear, it adjusts its own counter so that it doesn't stay behind.
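Something like this rough sketch (Lamport-clock style; hypothetical names, and the counter isn't persisted across restarts):

```python
class IdGenerator:
    """Generates IDs as (counter, node_id), ordered lexicographically."""
    def __init__(self, node_id):
        self.node_id = node_id
        self.counter = 0          # not persisted; a restart can reuse IDs

    def next_id(self):
        self.counter += 1
        return (self.counter, self.node_id)   # node_id breaks ties

    def observe(self, seen_id):
        # On seeing a larger counter from another node, jump forward
        # so our future IDs don't fall behind.
        seen_counter, _ = seen_id
        self.counter = max(self.counter, seen_counter)
```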
This is a logical solution but not a full practical solution.
With this approach you'd still need to communicate the current clock value back to clients, as otherwise one client will get ahead and have all its traffic accepted, while others fall behind and are unable to get any traffic accepted. Even if an error causes a client to bump its counter forward and retry, by the time it has done that, the number it is about to retry with may already have been used.
Additionally, the aim is still exactly-once delivery, so clients need to be able to differentiate between an error caused by reusing an ID that was rejected to enforce exactly-once delivery, and an error caused by another client having taken that ID.
Basically, this issue is easy to solve with low traffic and reliable persistent storage everywhere, but hard to solve with high traffic, or the acceptance that all persistent storage brings additional reliability challenges.
You don't "just" do anything when networking is involved. You can check out a course on TCP/IP if you want to see some reasons that is not enough. What you're describing is essentially a part of the TCP and isn't sufficient for a basic implementation of that.
Yeah often the best way to tackle exactly-once delivery to a database is a uniqueness constraint, but that isn't free – there's the index cost, additional write cost, and the error needs to be handled when it's thrown back to the client on a collision (something many applications don't handle well).
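As a rough sketch of what that looks like (SQLite here just for illustration; the point is the unique key plus handling the constraint violation as "already done" rather than as an error):

```python
import sqlite3

db = sqlite3.connect("app.db")
db.execute("""CREATE TABLE IF NOT EXISTS payments (
                  message_id TEXT PRIMARY KEY,   -- the uniqueness constraint
                  amount_cents INTEGER NOT NULL)""")

def record_payment(message_id, amount_cents):
    try:
        with db:  # one transaction: the insert either lands once or not at all
            db.execute("INSERT INTO payments (message_id, amount_cents) VALUES (?, ?)",
                       (message_id, amount_cents))
        return "applied"
    except sqlite3.IntegrityError:
        # Collision on the unique key: this message was already applied.
        # The caller has to treat this as success, not as a failure.
        return "duplicate"
```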
Stateful services are far harder to scale than stateless ones. Typically a stateless service can be scaled out horizontally with relative ease, but when there's state storage involved this becomes harder. You can scale vertically, but only so far. You can scale horizontally, but then typically need to introduce some sort of sharding/consistent hashing/etc to ensure that the service has a consistent view of the world without needing to connect to every database instance.
Not sure where the expectation of things being free comes from.
If your starting point is stateless, then you can consider the tradeoff of introducing state vs. processing the same request multiple times.
It's probably more accurate to say that toy systems can maintain the illusion of exactly-once for a while, but it doesn't scale. You can't keep a record of every message ever seen in a message-based system. Message-passing systems exist to handle rates of traffic that cannot be achieved by rolling your own event queue as a thin wrapper around other tools like databases. It's not just the storage, it's the throughput.
The first time I encountered RabbitMQ it could only handle 60 messages per second with delivery guarantees. We already had our own bespoke system that used a database to handle a couple multiples of that. So we ended up limping along with what we had.
So when does the receiver record the IDs? When it receives the message but before processing it, or after it has processed the message? If the former, then what if it goes down during processing? Then the other receivers will keep rejecting the message even though it was never processed. So now it's less than once. If the receiver records the ID after it's done processing, then it could go down after processing but before recording it in the DB. So now you have more than once.
Also, isn't the assumption here that you will have a reliable connection to a shared DB?
You can have engineered solutions that get pragmatically close to delivering exactly once, but it's not "pure" -- there are still scenarios, however unlikely, in which they will fail.
Even if you record both when it arrives and when it's completed, a power failure midway will still leave everything in between in a partial state, unless everything has XA transactional behaviour, and as we know that scales quite poorly.
XA scales well enough for many workloads. I believe that too many developers discard solutions like XA because it "scales quite poorly" without doing serious analysis of how much scalability they are likely to need and whether XA scales well enough to support it. On the flip side I believe that too many developers underestimate the complexity of managing state and failure in distributed systems.
The context is exactly once in a distributed system. When you construct that higher layer you will make your system highly coordinated, thus no longer a distributed system.
I mean, the property of distributed systems is that they crop up everywhere you have an unreliable transport and generally only bring downsides. If you could un-distribute your system that easily, I bet that would make a lot of people very happy.
The OP defines "distributed systems" like this:
> Web browser and server? Distributed. Server and database? Distributed. Server and message queue? Distributed.
By that definition, as soon as I have a server, a client and an unreliable connection between them, I have a distributed system. In that context, nothing stops me from counting IDs.
When you receive a message in your scheme, you have to do the following:
1. Some action with a side-effect (ex: update an entry on disk, send a message out to some third party, etc.). This might be a bank transaction, or a note saying "you gotta ship package X to person Y".
2. Some action to note that you've received ID X (ex: write to disk, send a message out to your DB, etc.)
How do you set up your server to deterministically do neither in the event of a crash, and both in the event that your code runs to completion?
If the action is sending a message, I’d say the answer is to just send it again, and defer the “exactly-once” problem to the point where the message is actually used. (It seems like we’re in a context where sending messages is always unreliable, so that send could fail, so there needs to be some protocol for retrying it anyway.)
So the action is some local action. If it’s purely digital, it’s fiddly but surely not impossible to ensure that the action and the record of the action either both take place or both don’t. It’s a database with a transaction log.
If the action is some irrevocable physical thing - remotely controlling a printer, say - you need to make a best effort to handle errors gracefully, sure.
I’d concede that it’s impossible to ensure that a document is printed out exactly once, say - maybe there’s a paper jam, and it’s debatable whether the jammed paper counts as a valid printout. But that’s not very surprising and I don’t think it tells you much. It’s mostly a problem for printer manufacturers rather than distributed system architects.
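For the purely digital case, a minimal sketch of what I mean (SQLite for illustration, hypothetical tables): the side effect and the record that we've seen the ID commit in the same transaction, so a crash leaves either both or neither.

```python
import sqlite3

db = sqlite3.connect("state.db")
db.executescript("""
    CREATE TABLE IF NOT EXISTS seen_ids  (message_id TEXT PRIMARY KEY);
    CREATE TABLE IF NOT EXISTS shipments (message_id TEXT, package TEXT, person TEXT);
""")

def handle(message_id, package, person):
    try:
        with db:  # one transaction: both rows commit, or neither does
            db.execute("INSERT INTO seen_ids VALUES (?)", (message_id,))
            db.execute("INSERT INTO shipments VALUES (?, ?, ?)",
                       (message_id, package, person))
        return "applied"
    except sqlite3.IntegrityError:
        # Duplicate delivery: the ID was already recorded, so nothing is redone.
        return "duplicate"
```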
These issues are in the context of distributed systems where you want to be able to recover from losing a receiver (e.g., we want to be able to reassign partitions for a Kafka topic when a consumer goes down). If you don't mind your system grinding to a halt whenever you lose one of your receivers (that's perfectly fine in some circumstances!), then your proposed solution works great.
Edit: also, I should be fair and acknowledge that you're effectively describing idempotency (I'm guessing you already knew that ;P ), which the article's author eventually points out is a way to recover "exactly-once" semantics. The point, maybe, is that someone needs to explicitly do this somewhere; you can't really rely on your protocol to do it for you.
Yes, but the receiver can be faulty. If it acknowledges the message and then crashes before handling it, you've got at-most-once, and if it handles the message and then crashes before acknowledging it, you've got at-least-once. You can avoid this if the receiver handles and acknowledges in a single transaction, but I only know of one platform that implements this and everyone hates it (hence the throwaway).
Even with a transaction, if the processing involves external side effects, e.g. sending an email, the rollback won't matter and you still get at least once.
There's no rollback, it's an atomic transaction. The certainty that messages are always handled completely or fail completely is one of the big design constraints that made the whole thing so hinky.
What if the receiver process fails? How do you know which messages it processed successfully? You can shuffle the problem around indefinitely but it doesn’t go away.
If your processing doesn't have any external side effects (making external API calls, sending emails, charging credit cards, etc.) then one option is to put your message queue in a relational DB alongside all your other data. Then you can pull a message, process it, write the results to the DB, and mark the message as processed, all inside one big transaction. But not many use cases can fit these constraints, and it also has low throughput.
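A rough sketch of that pattern (SQLite for illustration, hypothetical tables, single consumer assumed): the results and the "processed" flag commit in the same transaction.

```python
import sqlite3

db = sqlite3.connect("app.db")
db.executescript("""
    CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY,
                                      payload TEXT, processed INTEGER DEFAULT 0);
    CREATE TABLE IF NOT EXISTS results (queue_id INTEGER PRIMARY KEY, outcome TEXT);
""")

def process_next():
    with db:  # one transaction: results and the processed flag commit together
        row = db.execute("SELECT id, payload FROM queue WHERE processed = 0 "
                         "ORDER BY id LIMIT 1").fetchone()
        if row is None:
            return
        msg_id, payload = row
        outcome = payload.upper()            # stand-in for DB-only processing
        db.execute("INSERT INTO results VALUES (?, ?)", (msg_id, outcome))
        db.execute("UPDATE queue SET processed = 1 WHERE id = ?", (msg_id,))
```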