I'm floored by this. The last time I spoke with that team, a few people in leadership (but not everyone) got it, and everyone else asked why we would care. They've come a long way. GCP just got substantially more interesting.
Edit: by this I only mean to say well done to the Google team.
By the way, in addition to ordered messages, Pub/Sub has recently gotten a number of new features including message filtering, dead letter queues, and retry policy (some GA, some beta). You can find out more here: https://cloud.google.com/pubsub/docs/release-notes
Kafka can be configured to store messages forever. This means it can be used as the canonical store of both current and past data. It is attractive for some applications.
Any plans to add the ability to retain messages forever? I know I can store them in GCS and replay, but that's not what I'm looking for.
I can see how this would be useful. You could use the snapshot/seek features to replay messages from a particular point. It'd be useful to retain messages for a long time so you could replay messages from any point. I could even imagine a feature for replaying only the messages from a given time range. We're also aware of requests for more seamless integration with other GCP services; they may augment or provide a similar set of features. I can't comment on timelines unfortunately. :)
Disclaimer: I work on Cloud Pub/Sub, but this is my own opinion.
That makes sense. To add some color to the use case:
We'd like to use a pub sub system to store binlogs from databases (as done in projects like https://debezium.io/).
We'd like any team to be able to bring a replica database online by starting from the beginning of time and playing back the binlogs. And then to keep playing any future binlog messages to keep the replica current.
For usage like this, we could in theory keep all past messages on GCS: read the old messages from GCS first, then switch to consuming from pubsub once we're caught up. Or we could keep them on GCS and replay them through pubsub later if we wanted to.
But in a perfect world, I'd prefer the pubsub system to handle this, and allow a consumer to start at the beginning of time and get up to date, and stay up to date in a simple way.
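A minimal sketch of the bootstrap-then-tail pattern described above, assuming each binlog event carries a monotonically increasing log sequence number (`lsn`, a hypothetical field). The `archive` and `live` lists are stand-ins for a GCS dump and a live subscription; no real client API is used. The LSN check is what makes the handoff between the two sources safe.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass(frozen=True)
class BinlogEvent:
    lsn: int              # hypothetical: monotonically increasing log sequence number
    key: str              # primary key of the affected row
    value: Optional[str]  # None means the row was deleted


class Replica:
    """In-memory replica that applies each binlog event at most once, in LSN order."""

    def __init__(self) -> None:
        self.rows: Dict[str, str] = {}
        self.applied_lsn = 0  # highest LSN applied so far

    def apply(self, event: BinlogEvent) -> bool:
        # Skip anything already applied: the overlap between the archive and the
        # start of the live stream, or a redelivered message.
        if event.lsn <= self.applied_lsn:
            return False
        if event.value is None:
            self.rows.pop(event.key, None)
        else:
            self.rows[event.key] = event.value
        self.applied_lsn = event.lsn
        return True

    def catch_up(self, archive: Iterable[BinlogEvent], live: Iterable[BinlogEvent]) -> None:
        # Replay history from the beginning of time, then keep tailing live events.
        for event in archive:
            self.apply(event)
        for event in live:
            self.apply(event)


archive = [BinlogEvent(1, "a", "x"), BinlogEvent(2, "b", "y")]
live = [BinlogEvent(2, "b", "y"), BinlogEvent(3, "a", None)]  # LSN 2 overlaps
replica = Replica()
replica.catch_up(archive, live)
print(replica.rows, replica.applied_lsn)  # {'b': 'y'} 3
```

The "skip anything at or below the last LSN" rule is only safe because the live stream is assumed to be in order; with unordered delivery, a late event would be silently discarded.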
Whoa! This is a game changer! I was looking into both Kafka and Google Pub/Sub for an event-oriented system my team was designing. Google Pub/Sub looked very promising, but no guarantee of ordering was a deal breaker for us. I’ll consider this more strongly for the next system we build.
We used Pub/Sub very extensively (50B messages a day) but moved to Pulsar [0]. It performs equally well and has some nice features. And also no vendor lock-in.
Pulsar seems operationally quite complex, as it has a dependency on both BookKeeper and ZooKeeper (which BK also needs). ZooKeeper is particularly notorious for being difficult. What's your experience been like?
It definitely is on the more complex side of management. That's why we partnered with Kafkaesque to do the maintenance for us. We were fine handling it ourselves but decided to outsource it as it's less critical for us than many other internal tasks.
They have an open ticket [0] to reduce the dependence on ZooKeeper, but as far as I know it's still pending.
The company I work for had the same stance 5 years ago. We regretted it a hundred times and now we stand the gaff. Nothing better than maintaining an EC2-based Cassandra cluster instead of simply using DynamoDB, huh...
I don't believe I advocated for the use of Cassandra. Owning vs being owned by, we are not arguing about the same things.
Stances are not strategies. When we use another's API we form a bond, but the other is free to break it, so we are automatically in a weaker position. We have acquiesced. But if we make a strategically worse choice, we have not only acquiesced but done ourselves harm.
I agree with this. One of the best parts of the cloud is abstracting away a lot of the basic maintenance of these applications. There is lock-in to some platform no matter what you do, but hopefully you can design your application so that if you do need to rearchitect, you can do it in phases.
"What's a use case where strict ordering is critically important?"
In general, and increasingly as the use case grows: every use case where the developers did not make explicit and careful provision, with quite non-trivial effort, to ensure that order is not important.
Even a lot of systems whose developers think they have no ordering dependencies are wrong in at least one subtle way without realizing it.
If you need to megascale, you're going to have to bite the bullet and build a system that can handle out-of-order, but there's a lot of systems out there where you don't need megascale, and you can get rid of that "quite non-trivial effort" to deal with out-of-orderness by asking for messages to arrive in order.
To get a sense of just how useful that can be... bear in mind that every time you open a TCP socket instead of a UDP one, you just made exactly that choice, to use an ordered message system when you didn't "need" one. Take a look at everything you do with a TCP socket and think about trying to run it over UDP, and not with something like QUIC that basically adds half of TCP back on top, but with UDP straight-up. That's the range of things that can use in-order delivery... lots of things.
Almost everything can be simplified by guaranteed in-order delivery. It's just that some things can't afford the downsides.
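To make the "non-trivial effort" concrete: without in-order delivery, each consumer ends up carrying something like the resequencing buffer below, which is roughly what TCP's sequence numbers do for you. This is a minimal sketch assuming every message carries a gap-free sequence number starting at 0; it does not handle loss or retransmission.

```python
from typing import Dict, List


class Resequencer:
    """Releases messages strictly in sequence-number order."""

    def __init__(self) -> None:
        self.next_seq = 0
        self.pending: Dict[int, str] = {}  # out-of-order arrivals held back

    def receive(self, seq: int, payload: str) -> List[str]:
        """Accept one message and return every message that is now deliverable."""
        if seq < self.next_seq or seq in self.pending:
            return []  # duplicate: already delivered or already buffered
        self.pending[seq] = payload
        ready = []
        # Drain the contiguous run that starts at next_seq.
        while self.next_seq in self.pending:
            ready.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return ready


r = Resequencer()
out: List[str] = []
for seq, msg in [(1, "b"), (0, "a"), (3, "d"), (2, "c"), (2, "c")]:
    out.extend(r.receive(seq, msg))
print(out)  # ['a', 'b', 'c', 'd']
```

Note the downside this hides: if one sequence number never arrives, everything behind it waits in `pending` indefinitely, which is the same head-of-line trade-off discussed further down the thread.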
I can understand your post, but I don't quite buy the TCP thing. I don't think anyone is using TCP for ordering, they're using it because they don't want their packet dropped.
I guess all of the systems I build are just built to assume no ordering, or to leverage causal ordering, because that feels much easier to reason about. Enforcing ordering feels really hard, and like something a message bus can only do part of the work for.
> I can understand your post, but I don't quite buy the TCP thing. I don't think anyone is using TCP for ordering, they're using it because they don't want their packet dropped.
Think of (almost) any modern protocol built on top of TCP, and you'll see that ordering is critical. (http, smtp, telnet/ssh, etc.)
There's no ordering in HTTP. If you could send a whole HTTP request as a UDP packet you'd get exactly the same protocol (obviously sans WebSockets - but you could work around that).
I'm not sure that ordering doesn't matter for most TCP data. For example, HTTP depends on ordering. Any time you are transmitting messages larger than the size of a packet, you need some degree of ordering, if only to reconstruct the individual messages, even when you don't care about the order of the messages themselves.
There are a couple of options without needing guaranteed ordering:
- jobs can have ever increasing ids, workers record the last seen id in one place, and ignore jobs with ids less than last seen
- job results are returned for each job to a supervisor. if a job result doesn't match current expected state, resend job. jobs should be idempotent in case a job is sent multiple times
If the create job is expensive, the latter solution would be less ideal, though.
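The two options in the list above can be sketched roughly as follows, assuming job IDs come from a single monotonic source (which, as the next reply points out, is itself hard at scale). `Worker` and `IdempotentStore` are illustrative names, not any library's API, and `payload.upper()` stands in for real work.

```python
from typing import Dict, List


class Worker:
    """Option 1: drop any job whose ID is not newer than the last ID seen."""

    def __init__(self) -> None:
        self.last_seen = -1
        self.applied: List[int] = []

    def handle(self, job_id: int) -> bool:
        if job_id <= self.last_seen:
            return False  # stale or duplicate job: ignore it
        self.last_seen = job_id
        self.applied.append(job_id)
        return True


class IdempotentStore:
    """Option 2: resending a job is harmless because results are keyed by job ID."""

    def __init__(self) -> None:
        self.results: Dict[int, str] = {}

    def run(self, job_id: int, payload: str) -> str:
        if job_id not in self.results:
            self.results[job_id] = payload.upper()  # stand-in for the real work
        return self.results[job_id]


w = Worker()
for job in [1, 3, 2, 4]:  # job 2 arrives late and is dropped
    w.handle(job)
print(w.applied)  # [1, 3, 4]

store = IdempotentStore()
first = store.run(7, "build")
again = store.run(7, "build")  # the supervisor resent job 7; the cached result is returned
print(first, again)  # BUILD BUILD
```

Option 1 is only correct when a newer job fully supersedes older ones (for example, "set the state to X"); otherwise dropping job 2 loses work.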
Creating ever increasing ids reliably at scale is not trivial. You will probably end up having a single server generating these ids which will then become a single point of failure.
That doesn’t necessarily require that the entire queue is totally ordered, but the alternatives (such as Virtual Synchrony) are still considered arcane / research topics.
Now you need to queue up events for some time, reorder them using the timestamp, and then process them. It’s possible, but it has overhead, both in performance and in custom code you’ll have to maintain. If there is no guarantee of order, two separate systems consuming the same events might also get different results depending on the implementation, which can be problematic.
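A minimal sketch of that buffer-and-reorder step, assuming every event carries a timestamp and that no event arrives more than `max_delay` time units late. That bound is an assumption you have to choose yourself; anything later than the bound is recorded as dropped here rather than emitted out of order.

```python
import heapq
from typing import List, Tuple


class ReorderBuffer:
    """Holds events until the watermark passes them, then emits them in timestamp order."""

    def __init__(self, max_delay: float) -> None:
        self.max_delay = max_delay
        self.heap: List[Tuple[float, int, str]] = []
        self.max_ts_seen = float("-inf")
        self.last_emitted = float("-inf")
        self.counter = 0  # tie-breaker so equal timestamps keep arrival order
        self.dropped: List[str] = []

    def push(self, ts: float, event: str) -> List[str]:
        if ts < self.last_emitted:
            self.dropped.append(event)  # too late: later events were already emitted
            return []
        heapq.heappush(self.heap, (ts, self.counter, event))
        self.counter += 1
        self.max_ts_seen = max(self.max_ts_seen, ts)
        watermark = self.max_ts_seen - self.max_delay
        out = []
        while self.heap and self.heap[0][0] <= watermark:
            ts_out, _, ev = heapq.heappop(self.heap)
            self.last_emitted = ts_out
            out.append(ev)
        return out

    def flush(self) -> List[str]:
        out = [ev for _, _, ev in sorted(self.heap)]
        self.heap.clear()
        return out


buf = ReorderBuffer(max_delay=5)
emitted: List[str] = []
for ts, ev in [(10, "a"), (12, "c"), (11, "b"), (20, "d"), (9, "late")]:
    emitted += buf.push(ts, ev)
emitted += buf.flush()
print(emitted, buf.dropped)  # ['a', 'b', 'c', 'd'] ['late']
```

Both costs mentioned above are visible here: every event is delayed by up to `max_delay`, and choosing that value is a correctness decision the application now owns.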
For a single process on one box with one thread you can use something like that.
If you involve more than one box, that goes out the window. Sometimes you can still get "one timestamp" by making something else the owner of the timestamp. It also depends on your time resolution and on the process that does the ingesting. For example, if that ingest process has more than one thread handling things, you can still get out-of-order or identical timestamps if it's not coded correctly.
If events are generated by different processes you cannot really guarantee that time is exactly the same for them, unless you do something fancy to ensure that.
Interesting.
Is the ordering here based on when the event was generated, or when the event entered the queue?
I think the latter, so I think the examples here don’t apply without something on top, and a trade-off.
If you have one message source (a single thread or some kind of coordination), and the messages have lower frequency than the timestamp resolution, yes.
The farther you get from that, the more the answer is no.
That won’t work in all cases. For instance, if you get messages from devices which can be reimaged they may have clock skew in a period of time before they’re synchronized again.
But in any case, you can't rely on the order of message ingress to your system to represent anything meaningful either, can you? The key defining the order would need some hard logical ordering purpose for which time is not relevant or useful.
The order of message ingress can still be meaningful even if device clocks are skewed or jump due to rebooting, reimaging, network time sync, frequency drift, etc.
A hard logical order arises from interactions. E.g. if the device receives a message, does something locally, goes through a clock change, and then sends a message dependent on one it fetched earlier, that's a logical order with out-of-order clock.
Or if a device gets a message, processes, sends something to another device, that one processes too then sends another message back to the original source, there's a logical order but with three different clocks. Even if the clocks are synchronised, there will be some drift and the messages may be processed fast enough that the drift puts their timestamps out of order.
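One standard way to capture that kind of logical order without trusting wall clocks is a Lamport clock. The sketch below is that textbook technique, not something Pub/Sub provides, and the device names are illustrative: each device keeps a counter, stamps outgoing messages with it, and on receipt jumps past the stamp it saw, so a reply is always stamped after the message that caused it, whatever the physical clocks say.

```python
from dataclasses import dataclass


@dataclass
class Message:
    sender: str
    body: str
    stamp: int  # Lamport timestamp, not wall-clock time


class Device:
    def __init__(self, name: str) -> None:
        self.name = name
        self.clock = 0

    def send(self, body: str) -> Message:
        self.clock += 1  # a local event advances the counter
        return Message(self.name, body, self.clock)

    def receive(self, msg: Message) -> None:
        # Jump past anything already seen, so later sends are ordered after it.
        self.clock = max(self.clock, msg.stamp) + 1


a, b, c = Device("a"), Device("b"), Device("c")
m1 = a.send("request")   # a -> b
b.receive(m1)
m2 = b.send("forward")   # b -> c
c.receive(m2)
m3 = c.send("reply")     # c -> a, back to the original source
print(m1.stamp, m2.stamp, m3.stamp)  # 1 3 5
```

Lamport stamps respect causality (if A led to B, A's stamp is smaller), but not the converse: two unrelated events get an arbitrary relative order.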
That guarantee/assumption can never really be made.
Message A on event a' from system a might be sent to system b, effecting event b' and thus causing message B to be sent by <insert medium here>, and consumed and correlated by consumer software MC on hardware mc.
However, system B might take longer to flush its hardware/software buffer, and the message arrives at mc before message A, for example.
I've encountered this many times. That data has no meaning in itself except in the meta.
> Even if the clocks are synchronised, there will be some drift and the messages may be processed fast enough that the drift puts their timestamps out of order.
If you are consuming from sources which you cannot control the accuracy of the clocks, then you must inherently either reduce your reliance on the need for accuracy (many Windows systems have horrendous clock discipline in their software implementation) or find a way of ensuring accuracy. E.g. close proximity NTP or close proximity PTP etc etc.
I think you and I are agreeing, but it's not obvious ;-)
> However, system B might take longer to flush its hardware/software buffer, and the message arrives at mc before message A, for example.
There are two message As in your system, the one sent to system b, and the one consumed by hardware mc. Let's call them Ab and Amc.
In that situation, message B is a consequence of message Ab which can be tracked, and at system mc (depending on semantics) it might be necessary to use that tracking to process message Amc before message B, at least logically.
For example, message Ab/Amc might be "add new user Foo to our database with standard policy", and system B might react with "ok, my job is to set the policy for new users, I'll tell mc to add permission Bar to user Foo then activate Foo".
That works out fine as long as the logical order relation is maintained, and system mc, the database, processes message Amc first, regardless of arrival order.
Dependency tracking can ensure that (without clocks or timestamps), even if messages are transmitted independently. For example by message B containing a header that says it comes logically after message A.
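A minimal sketch of that dependency-header idea, assuming each message has a unique `id` and an optional `after` field naming the message it must follow (both field names are hypothetical). The consumer parks a message until its dependency has been processed, so arrival order stops mattering.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass
class Msg:
    id: str
    body: str
    after: Optional[str] = None  # hypothetical header: process only after this id


class DependencyAwareConsumer:
    def __init__(self) -> None:
        self.done: Set[str] = set()
        self.parked: Dict[str, List[Msg]] = {}  # dependency id -> waiting messages
        self.log: List[str] = []

    def receive(self, msg: Msg) -> None:
        if msg.after is not None and msg.after not in self.done:
            self.parked.setdefault(msg.after, []).append(msg)  # wait for dependency
            return
        self._process(msg)

    def _process(self, msg: Msg) -> None:
        self.log.append(msg.body)
        self.done.add(msg.id)
        # Release anything that was waiting on this message.
        for waiting in self.parked.pop(msg.id, []):
            self.receive(waiting)


db = DependencyAwareConsumer()
# Message B (from system b) overtakes message A on the wire.
db.receive(Msg("B", "grant Bar to Foo, activate Foo", after="A"))
db.receive(Msg("A", "add user Foo"))
print(db.log)  # ['add user Foo', 'grant Bar to Foo, activate Foo']
```

The cost moves to the consumer: it must remember which IDs are done, and a dependency that never arrives leaves its dependents parked forever.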
The pubsub queue can also guarantee that order (without clocks or timestamps or dependency tracking), provided all messages go through the same pubsub system, and Ab+Amc are fanned-out by the pubsub system rather than sent independently by A to each destination. All bets are off if Ab and Amc take other routes.
> If you are consuming from sources which you cannot control the accuracy of the clocks, then you must inherently either reduce your reliance on the need for accuracy (many Windows systems have horrendous clock discipline in their software implementation) or find a way of ensuring accuracy. E.g. close proximity NTP or close proximity PTP etc etc.
If you think Windows is bad, try just about any cloud VM, which has a virtual clock and is stalled all the time in the guest, including just after the guest reads its virtual clock and before using the value :-)
I prefer systems which rely on logical ordering guarantees as much as possible, so clock drift doesn't matter.
When you rely on a global clock order to ensure correct behaviour, you have to slow down some operations to accommodate clock variance across the system, as well as network latency variance (because it affects clock synchronisation).
If you rely on logical order, then there's no need for time delays; no upper speed limit. Instead you have to keep track of dependencies, or have implicit causal dependencies. And it's more robust on real systems because clock drift and jumps don't matter.
In practice you need some clock dependence for timeouts anyway, and there are protocol advantages when you can depend on a well synchronised clock. So I prefer to mix the two for different layers of the protocol, to get advantages of both. Unfortunately for correct behaviour, timeouts are often the "edge case" that isn't well tested or correctly implemented at endpoints.
Human conscious thought is local and single-threaded. It takes a lot of experience and training to be able to reason intuitively about non-local, multi-threaded computation. If you're smart and humble, you can try to simplify the problem by making individual messages independent of each other, e.g. by employing redundancy, but you still have to be aware that it's a problem to begin with.
I see it a lot when integrating legacy healthcare systems that effectively operate on state-transition queues with an assumption of in-order processing.
Supports zero message loss, no headaches around topic partitions, in-order messaging, support for open apis/protocols, in-memory AND persistent quality of service, support for event mesh etc
This is definitely a surprising, if welcome, development from GCP. We used to be pretty significant users of Pub/Sub but migrated to Cloud Tasks after several discussions with our account manager indicated this wasn't the direction they wanted to go with PubSub.
The implementation here also seems to be somewhat unusual at first glance, in particular that a retry of any given task appears to also retry any subsequent message with the same "ordering key".
I do wonder what use cases this targets that wouldn't require pretty extensive work on the application side to ensure good idempotency. Does anyone have any ideas about problems this would solve in and of itself?
The ordering keys feature supports a large number of keys (though since the throughput limit is 1 MB/sec per key, many applications shouldn't have issues scaling up on a given key).
Imagine you have an order processing system where you have to 1) write to a database, 2) write to a metrics log, and 3) send an email to the customer. You can publish a message for each step with the ordering key being the user who initiated the order. This means you are guaranteed to see message 1 before 2, which is seen before 3.
You do have to account for possible message redeliveries. In this example, you can 1) write to the database keyed by the order's unique ID (to prevent duplicate rows), 2) be fine with duplicates for metrics, since a bit of duplication is okay (or maybe you have a job later that removes duplicates offline), and 3) be okay with sending emails to customers twice (pretty harmless). You may also keep a side-cache of processed messages to reduce duplicate processing, but that's a bit heavy and may not be necessary.
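A minimal sketch of a subscriber handler for the three steps above that tolerates redelivery. The in-memory `orders`, `metrics`, and `sent_emails` are hypothetical stand-ins for a real database, metrics log, and mail service, and the message shape is assumed; no Pub/Sub client API is shown.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Step:
    ordering_key: str  # e.g. the user who placed the order
    order_id: str      # unique per order
    kind: str          # "db", "metric" or "email"


orders: Dict[str, str] = {}  # stand-in for the database table
metrics: List[str] = []      # stand-in for the metrics log
sent_emails: List[str] = []  # stand-in for the mail service


def handle(step: Step) -> None:
    if step.kind == "db":
        # Upsert keyed by the order ID, so a redelivery cannot add a second row.
        orders.setdefault(step.order_id, step.ordering_key)
    elif step.kind == "metric":
        metrics.append(step.order_id)  # duplicates tolerated or removed offline
    elif step.kind == "email":
        sent_emails.append(step.ordering_key)  # a second email is harmless here
    else:
        raise ValueError(f"unknown step kind: {step.kind}")


steps = [Step("alice", "order-1", k) for k in ("db", "metric", "email")]
# Simulate an ordered redelivery that replays the whole sequence for the key.
for step in steps + steps:
    handle(step)
print(len(orders), len(metrics), len(sent_emails))  # 1 2 2
```

Only the step whose duplicates would actually hurt (the database write) pays for idempotency; the other two simply accept the occasional repeat.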
What Cloud Pub/Sub with ordering keys gets you in this scenario is 1) durability of published messages 2) scalability across keys 3) ordering between messages in a key 4) retries in case one step fails 5) buffering in case your subscribers are slow or down 6) a fully hosted service (no dealing with your own cluster, scales automatically) 7) global availability (no need to shard your subscription by region, simplifying your app).
Disclaimer: I work on Cloud Pub/Sub, but this explanation is my own.
Thanks for this explanation. Can you give a similarly concrete example of how, according to the docs: "When you receive messages in order and the Pub/Sub service redelivers a message with an ordering key, Pub/Sub maintains order by also redelivering the subsequent messages with the same ordering key. The Pub/Sub service redelivers these messages in the order that it originally received them." I'm a little confused about what scenario with ordering would lead to the need to re-send multiple messages.
If you're using pubsub to propagate state, then this lets you replicate with eventual consistency. (Think db replication, instant messaging, video game scores, etc.)
Seriously, I can't shake the feeling that most people who want this are likely very, very wrong and that this "feature" will turn out to be a footgun and a trap more often than a sniper rifle...
Why? It's just exposing the partitions underneath (which all horizontally scaled event/log systems use) and letting you order by the key within each partition.
This is exactly like Kafka, Pulsar, AWS Kinesis, Azure Event Hubs, and many others, which are all being used by thousands of applications without issue.
Actually, it's not exactly exposing the partition underneath. The ordering key is the only unit a user has to deal with: think user ID or database row primary key. Throughput is limited to 1MB/s per ordering key, but the number of ordering keys is limited only by what can be represented in a 1KB string. One can have millions of keys if desired and they don't need to be known in advance.
The system scales automatically as needed, so there is no need to think in terms of partitions or worry about repartitioning when load increases. Order is preserved through this scaling without the user having to do anything.
Disclaimer: I work on Cloud Pub/Sub and am the primary engineer on the ordering feature.
There is a mapping from ordering key to internal storage structures, yes. The goal is that users are abstracted away from any notion of partition and the system handles scaling and ordering on its own, e.g., when there are hot shards.
Technically I agree, but I think in most cases, the alternative is the endpoints need to handle unordered messages correctly and that won't happen either.
So which is better: The queue does it, hiding subtle footguns because doing distributed systems reliably is hard; or the queue doesn't do it and the applications hanging off have failures more often (but not often enough for people to fix) because distributed systems reliability is still hard.
All of the uses of Kafka I have encountered in my professional career have been deployed simply to make something asynchronous; that is, Kafka exists to be a low-impedance sink of things to do later in some other process. I've not run across one that actually needed those items to be processed in order. Therefore it is my impression that ordering can be a false requirement.
Agreed, also "processing in order" != "consuming from the sink in order". If everything upstream and downstream of the sink doesn't also happen in order then using an ordered sink is pointless.
Elaborate, please: why is receiving messages in an ordered fashion a footgun and a trap? Isn’t it totally acceptable to rely on this type of behavior if your use case demands it?
Because this will break in situations where you have multiple consumers or if there's any sort of redelivery/failure scenario.
Ordering only works if there's a single-threaded consumer somewhere that processes exactly one message at a time and blocks processing as soon as there is an error of any kind. The number of scenarios that this works for is probably very small.
The footgun is that people don't realize the above constraints and assume that ordering will magically fix a lot of ordering issues (some more examples of this in the rest of this comments section).
To ensure ordering, systems like these have to block consumption on any given partition until it gets an ACK. If publishers of the data don't understand that, they can publish with too coarse a partition key which greatly decreases the amount of concurrent consumption you can have.
Not knocking how these systems work, it's more that people generally don't "read the fine print", especially these days with cloud services.
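A small simulation of that fine print, under simplifying assumptions: every message takes exactly one tick to process, and the ordered system allows at most one unacked message per key at a time. The scheduling model is hypothetical, not a description of any particular service. With one coarse key everything is serialized; with per-user keys the same backlog drains in parallel.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple


def ticks_to_drain(messages: List[Tuple[str, str]], workers: int) -> int:
    """Count ticks needed to process (key, payload) pairs when each key has at
    most one message in flight and each message takes exactly one tick."""
    queues: Dict[str, Deque[str]] = defaultdict(deque)
    for key, payload in messages:
        queues[key].append(payload)
    ticks = 0
    while any(queues.values()):
        # Each tick, up to `workers` distinct keys can process their head message.
        busy = [k for k, q in queues.items() if q][:workers]
        for k in busy:
            queues[k].popleft()  # processed and acked: the key's next message unblocks
        ticks += 1
    return ticks


users = [f"user-{i}" for i in range(8)]
backlog = [(u, f"event-{n}") for n in range(3) for u in users]  # 24 messages
coarse = ticks_to_drain([("all", p) for _, p in backlog], workers=8)
fine = ticks_to_drain(backlog, workers=8)
print(coarse, fine)  # 24 3
```

Eight workers make no difference with the coarse key: seven of them sit idle while the single key's messages go through one at a time.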
Head-of-line blocking has two aspects: crashes and long processing times. If the first message for an ordered key can't be acked because it crashes subscribers, we can't make progress on the messages for that key. If the first message, likewise, takes too long, then it delays processing of subsequent messages for that key.
Even without ordering, we still have to deal with variations of these issues. A message may still crash subscribers, be redelivered, and crash yet more subscribers. A message may delay the processing of other messages on the subscription.
Here's one way to deal with crashing messages: keep a side-cache of processing attempts for every message ID and ack those that have been processed too many times before processing them. Cloud Pub/Sub has a dead-letter queues feature to do this automatically. (It is not yet implemented for subscriptions with ordering keys - one can't enable both features.) We can do something similar for long messages - acking the message on timeout or processing more messages in parallel.
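A minimal sketch of the side-cache approach for subscriptions where the built-in dead-letter feature can't be enabled (such as ordered ones, per the comment above). It assumes message IDs are stable across redeliveries, and `process`/`dead_letter` are hypothetical application callbacks. The attempt is recorded before processing so that a crash still counts; in a real system the counter would have to live in a shared store outside the subscriber process, and the in-memory `Counter` here is a simplification.

```python
from collections import Counter
from typing import Callable, List


class PoisonGuard:
    def __init__(self, process: Callable[[str, str], None],
                 dead_letter: Callable[[str, str], None], max_attempts: int = 5) -> None:
        self.process = process
        self.dead_letter = dead_letter
        self.max_attempts = max_attempts
        self.attempts: Counter = Counter()  # message ID -> attempts so far

    def on_message(self, msg_id: str, data: str) -> bool:
        """Return True to ack, False to nack (let the service redeliver)."""
        if self.attempts[msg_id] >= self.max_attempts:
            self.dead_letter(msg_id, data)  # set it aside, then ack so the key can move on
            return True
        self.attempts[msg_id] += 1  # record the attempt first, so a crash still counts
        try:
            self.process(msg_id, data)
        except Exception:
            return False
        del self.attempts[msg_id]  # success: forget this message
        return True


def always_fails(msg_id: str, data: str) -> None:
    raise RuntimeError("message of death")


parked: List[str] = []
guard = PoisonGuard(always_fails, lambda i, d: parked.append(i), max_attempts=3)
results = [guard.on_message("m1", "boom") for _ in range(4)]
print(results, parked)  # [False, False, False, True] ['m1']
```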
So, with ordering keys, as it stands, you have the extra problem of dealing with messages-of-death (crash-inducing messages) yourself, and the long-processing-time issue still exists in a slightly different form. Hopefully, we can add DLQs for ordering keys if there's sufficient customer demand.
Disclaimer: I work on the Cloud Pub/Sub team, but this is my own explanation.
This looks like a naive implementation of adding ordering keys on one side, and waiting until all messages arrive on the other in order, without improving the underlying delivery transport to support ordering.
Given the documentation now clearly states "Publishing messages with ordering keys might increase latency", I think what Google is effectively doing is putting lipstick on their service that does not support ordering, and instead offering a message reconstruction capability on the subscriber end.
This approach, whilst it may work a lot of the time, is pretty flawed given one failed published message could cause the entire stream to stall indefinitely. And given Google Pub/Sub does not support idempotency, and does not support exactly-once delivery, they recommend "In general, accommodating more-than-once delivery requires your subscriber to be idempotent when processing messages."
Google, I am afraid whilst it's nice to welcome you to the exactly-once semantic party, I think you may need to go back to the drawing board and bake ordering and idempotency into the transport layer, in the same way Kafka and Ably do so that true ordering is supported.
Disclaimer: I am the co-founder of ably.com, a far edge enterprise messaging solution, with exactly-once semantics and real ordering :)
> the publisher is responsible for now maintaining arbitrary ordering IDs and passing this in the publish method
Ah, thanks for pointing this out. The "ordering ID" bit is actually a remnant of how messages were published before we officially had ordering key support (and it only shows up in Node, not in our other samples https://cloud.google.com/pubsub/docs/publisher#using_orderin...). We'll be fixing this sample shortly.
Disclaimer: I work on Cloud Pub/Sub. This explanation is my own.
Out of interest then, what is the reasoning behind the warning about additional latency? Is Google Pub/Sub doing some kind of reconstruction on the receiving end to piece together an unordered stream, or has Google Pub/Sub changed how the underlying streaming primitives work to natively support ordering?
The throughput limitation is 1MB/s per ordering key. The number of unique ordering keys allowed on a topic is limited only by what can be represented by a 1KB string, so very high throughput on a topic is still possible.
Disclaimer: I work on Cloud Pub/Sub and am the primary engineer on the ordering feature.
In the unordered case, there should be no noticeable performance degradation. This was something that we looked into extensively from both the server side as well as with our client libraries.
If anyone is looking for a solid alternative, check out Solace PubSub+. Been around for more than a decade and allows you to be cloud agnostic.
https://solace.com/products/event-broker/software/
(Can use in prod for free as well)
How does this feature work? Is the ordering key like a Kinesis partition key? Does it somehow assign an ID to each subscriber and make sure that all messages for a key are sent to a single subscriber?
Yes, the idea is each ordering key is assigned to a single subscriber client, and all messages for that key are sent in order to that client. There is effectively a cursor which is advanced by acking. When we reassign the key to another client, we replay all messages starting from the cursor.
CPS doesn't use a notion of partitions under the covers like Kinesis. Each ordering key can be thought of as constituting a "virtual" partition. You can have as many keys as you'd like, without specifying them ahead of time.
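A toy model of the cursor behaviour described above, which also illustrates the documentation passage quoted earlier in the thread about subsequent messages being redelivered. It assumes a single ordering key and an in-memory log; `KeyCursor` is an illustrative name, not part of any client library, and for simplicity only in-order acks advance the cursor. Messages are handed out in order, acking advances the cursor, and whenever delivery restarts (the key is reassigned, or an outstanding message is redelivered) everything from the cursor onward is sent again.

```python
from typing import List


class KeyCursor:
    def __init__(self, log: List[str]) -> None:
        self.log = log   # messages for one ordering key, in publish order
        self.cursor = 0  # index of the oldest unacked message
        self.sent = 0    # index of the next message to hand out

    def deliver(self, n: int) -> List[str]:
        batch = self.log[self.sent:self.sent + n]
        self.sent += len(batch)
        return batch

    def ack(self, msg: str) -> None:
        # In this toy, only the oldest outstanding message advances the cursor.
        if self.cursor < self.sent and self.log[self.cursor] == msg:
            self.cursor += 1

    def restart(self) -> None:
        # Reassignment or redelivery: resend everything from the cursor onward.
        self.sent = self.cursor


k = KeyCursor(["m1", "m2", "m3", "m4"])
first = k.deliver(3)   # ['m1', 'm2', 'm3']
k.ack("m1")            # m2 is processed too slowly and never acked
k.restart()
second = k.deliver(3)  # ['m2', 'm3', 'm4']: m3 comes again, still after m2
print(first, second)
```

This is why a single slow or failed message causes a batch of redeliveries: sending m3 again without m2 in front of it would break the per-key order.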
Disclaimer: I work on Cloud Pub/Sub, but this is my own explanation.
As my colleague, kamalaboulhosn, posted elsewhere in this thread:
"The throughput limitation is 1MB/s per ordering key. The number of unique ordering keys allowed on a topic is limited only by what can be represented by a 1KB string, so very high throughput on a topic is still possible."
So, it's more of a throughput limit, not a message limit, per key. With small messages, you can get high "TPS".
Is this just a buffer? Feels like any MQ could offer this within a given latency/timeout window. I’m pretty naive about this though, is it more involved?
Cloud Pub/Sub offers global topics/subscriptions that scale automatically and are completely managed for you (no dealing with your own cluster). With the ordering keys feature, there's no need to specify the number or ID of the ordering keys ahead of time (as one may need to do for traditional message queue partitions). In general, there's no provisioning ahead of time, so topics/subscriptions will scale as your usage grows. These are a few of the properties Cloud Pub/Sub offers compared to traditional self-hosted message queues.
Disclaimer: I work on the Cloud Pub/Sub team, but this explanation is my own.
I'm less concerned about scale or not managing my own cluster, and more about the mechanics of how this is implemented. Is it something like:
- User configures an ordering key
- Application sends a message w/ ordering key "4"
- Pub/Sub gets message w/ ordering key "4"
- Pub/Sub buffers "4" until it gets a message > "4"
- Application sends a message w/ ordering key "13"
- Pub/Sub sends "4"
- Pub/Sub buffers "13" until it gets a message > "13"
I just thought the lack of ordering stemmed from an inherent tension with performance, because you need a buffer to order things. Am I wrong about that (probably)?
1) Application sends a message w/ ordering key "abc" (let's call it msg1)
2) Pub/Sub gets msg1.
3) Pub/Sub acknowledges that msg1 was received. Now, you can be secure we have stored this message durably.
4) Pub/Sub sends msg1 to the subscriber ASAP if there's one available.
5) A subscriber client assigned to ordering key "abc" gets msg1.
6) The subscriber client processes msg1 and sends an ack to Pub/Sub.
7) Pub/Sub receives the ack and removes the message from storage. (It's possible to retain acked messages, but let's not talk about that for simplicity.)
Another message for the same ordering key "abc" arriving at Pub/Sub any time after step 4 will be delivered to the subscriber only after msg1. To summarize, the timestamp of a successful publish response is the timestamp by which we order messages sent to the client.
Pub/Sub may not send messages immediately if the subscriber is too slow to ack the message (perhaps each message takes a while to process), if the subscriber is down, or if there are too many messages in the backlog for the ordering key. In these cases, you can be assured that they're safely stored on disk and will be resent later (after some backoff period).
So, to answer your question directly, we only stall the sending of ordered messages to subscribers if there's a backlog for that ordering key or if the subscriber is not fast enough.
A better name would be partitioning or shard key. It's not physically ordering the items based on the key, just hashing them into a partition. The physical time they are received by the service is the order they're stored and returned in.
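For comparison, this is roughly the hash-into-a-partition model used by systems such as Kafka: a stable hash of the key, modulo the partition count, picks the partition, and order is only preserved within a partition. CRC32 is used here for brevity (Kafka's default partitioner uses murmur2). Per the Pub/Sub engineers earlier in the thread, Pub/Sub does not expose a fixed partition count like this, so treat the sketch as the generic model rather than Pub/Sub's internals.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple


def partition_for(key: str, num_partitions: int) -> int:
    # Python's built-in hash() is salted per process, so use a stable hash instead.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(messages: List[Tuple[str, str]], num_partitions: int) -> Dict[int, List[str]]:
    partitions: Dict[int, List[str]] = defaultdict(list)
    for key, payload in messages:
        # Same key -> same partition, so arrival order is preserved per key.
        partitions[partition_for(key, num_partitions)].append(payload)
    return dict(partitions)


msgs = [("alice", "a1"), ("bob", "b1"), ("alice", "a2"), ("carol", "c1")]
parts = route(msgs, num_partitions=4)
alice_partition = parts[partition_for("alice", 4)]
print(alice_partition.index("a1") < alice_partition.index("a2"))  # True
```

Changing `num_partitions` remaps most keys to different partitions, which is why repartitioning a classic partitioned log can temporarily break per-key order.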
This kind of confuses me a bit. Why would you want to turn Pub/Sub into a queue?
How badly does this affect message acknowledgment and retries? I assume just a huge hit to latency. This seems like a horrible idea for anyone expecting to use multiple subscribers or expecting to chunk multiple messages per request.
Services relying on Pub/Sub should be idempotent anyway. If you need to work around that for some reason, you are better off dumping messages from your subscriber into RabbitMQ or Redis for processing and using a Subscriber/Scheduler/Worker pattern.
I've used both types. Turning the pub/sub into a queue has some advantages in debugging and processing. Kafka has the idea of each queue being partitioned and having hash keys, which means you can have a bunch of processes reading from the same queue without really stepping on each other: basically sharding at the data stream level, with guaranteed ordering. It is a neat concept. Another is playback. Kafka uses a group ID and offset to keep track of where you are. Another nice bit is that messages are fairly hard to lose, as they stick around and you can play them back just by moving the offset. The update is maybe 10 bytes into a memory-backed file store. At first I too was skeptical of performance, but it can scale very nicely and lets you scale a topic horizontally as well as vertically. In the background, messages have an expiry time, so maybe you only keep them for one week, or you can set it to last years. For something like that you would be better off putting it in a DB table, though.
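A toy version of the offset bookkeeping described above, assuming a single partition held in memory. `Partition` and the group names are illustrative, not Kafka's client API (where the real equivalents are committed offsets per consumer group and the consumer's `seek`). Each group stores only an integer position, so replay is just moving that integer back; retention and expiry are not modeled.

```python
from typing import Dict, List


class Partition:
    def __init__(self) -> None:
        self.log: List[str] = []           # append-only; reads never remove entries
        self.offsets: Dict[str, int] = {}  # consumer group -> next offset to read

    def append(self, msg: str) -> int:
        self.log.append(msg)
        return len(self.log) - 1  # offset of the new message

    def poll(self, group: str, max_msgs: int = 10) -> List[str]:
        start = self.offsets.get(group, 0)
        batch = self.log[start:start + max_msgs]
        self.offsets[group] = start + len(batch)  # "commit" the new position
        return batch

    def seek(self, group: str, offset: int) -> None:
        self.offsets[group] = offset  # replay is just moving the position back


part = Partition()
for m in ["e0", "e1", "e2"]:
    part.append(m)
billing_first = part.poll("billing")  # ['e0', 'e1', 'e2']
analytics = part.poll("analytics")    # ['e0', 'e1', 'e2'], independent position
part.seek("billing", 1)
billing_replay = part.poll("billing") # ['e1', 'e2']
print(billing_first, analytics, billing_replay)
```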
Idempotency is a good idea even in a system like this. But it is not always possible, as your upstream data sources may be something very different.
> Services relying on Pub/Sub should be idempotent anyway. If you need to work around that for some reason, you are better off dumping messages from your subscriber into RabbitMQ or Redis for processing and using a Subscriber/Scheduler/Worker pattern.
Exactly. Pub/Sub is made to send messages to any consumer. To bring in order, you direct messages to a specific consumer. Alright, well, then Pub/Sub wasn't the right tool.