> While investigating issues like KAFKA-17754, we also encountered unseen writes in Kafka. Owing to time constraints we have not investigated this behavior, but unseen writes could be a sign of hanging transactions, stuck consumers, or even data loss. We are curious whether a delayed Produce message could slide into a future transaction, violating transactional guarantees. We also suspect that the Kafka Java Client may reuse a sequence number when a request times out, causing writes to be acknowledged but silently discarded. More Kafka testing is warranted.
Seems like Jepsen should do another Kafka deep-dive. Last time was in 2013 (https://aphyr.com/posts/293-call-me-maybe-kafka, Kafka version 0.8 beta) and seems like they're on the verge of discovering a lot of issues in Kafka itself. Things like "causing writes to be acknowledged but silently discarded" sounds very scary.
Not that I'm a Kafka user, but I greatly appreciate your posts, so thank you :)
Maybe Kafka users should do a crowdfund for it if the companies aren't willing. Realistically, what would the goal of the crowdfund have to be for you to consider it?
I don’t think aphyr would disagree with me when I say that FDB’s testing regime is the gold standard and Jepsen is trying to get there, not the other way around.
I'm not sure. I've worked on a few projects now which employed simulation testing and passed, only to discover serious bugs using Jepsen. State space exploration and oracle design are hard problems, and I'm not convinced there's a single, ideal path for DB testing that subsumes all others. I prefer more of a "complete breakfast" approach.
On another axis: Jepsen isn't "trying to get there [to FDB's testing]" because Jepsen and FDB's tests are solving different problems. Jepsen exists to test arbitrary, third-party databases without their cooperation, or even access to the source. FoundationDB's test suite is designed to test FoundationDB, and they have political and engineering buy-in to design the database from the ground up to cooperate with a deterministic (and, I suspect, protocol-aware) simulation framework.
To some extent Antithesis may be able to bridge the gap by rendering arbitrary distributed binaries deterministic. Something I'd like to explore!
Has your opinion changed on that in the last few years? I could have sworn you were on record as saying this about foundation in the past but I couldn’t find it in my links.
I don't think so, but I've said a lot about databases in the last fifteen years haha.
Sometimes I look at what people say about FDB and it feels like... folks are putting words in my mouth that I don't recognize. I was very impressed by a short phone conversation with their engineers ~12 years ago. That's good, but that's not, like, a substantive experimental evaluation. That's "I focus my unpaid efforts on databases which seem more likely to yield fun, interesting results".
Hey mate, think we interacted briefly on the Confluent Slack while you were working on this, something about outstanding TXes potentially interfering with consumption in the same process IIRC?
This isn't the first time you've discussed how parlous the Kafka tx spec is - not that that's even really a spec as such. I think this came up in your Redpanda analysis.
(And totally agree with you btw, some of the worst ever customer Kafka issues I dealt with at RH involved transactions.)
So was wondering what your ideal spec would look like, because I'd be interested in trying to capture the tx semantics in something like TLA+ as a learning experience - and because it would only help FOSS Kafka and FOSS clients improve, especially now that Confluent has withdrawn so much from Apache Kafka development.
I'm not really sure how to answer this question, but even a few chapters worth of clear prose would go a long way. We lay out a bunch of questions in the discussion section that would be really helpful in firming up intended txn semantics.
> [with the default enable.auto.commit=true] Kafka consumers may automatically mark offsets as committed, regardless of whether they have actually been processed by the application. This means that a consumer can poll a series of records, mark them as committed, then crash—effectively causing those records to be lost
That's never been my understanding of auto-commit, that would be a crazy default wouldn't it?
The docs say this:
> when auto-commit is enabled, every time the poll method is called and data is fetched, the consumer is ready to automatically commit the offsets of messages that have been returned by the poll. If the processing of these messages is not completed before the next auto-commit interval, there’s a risk of losing the message’s progress if the consumer crashes or is otherwise restarted. In this case, when the consumer restarts, it will begin consuming from the last committed offset. When this happens, the last committed position can be as old as the auto-commit interval. Any messages that have arrived since the last commit are read again. If you want to reduce the window for duplicates, you can reduce the auto-commit interval
I don't find it amazingly clear, but overall my understanding from this is that offsets are committed _only_ if the processing finishes. Tuning the auto-commit interval helps with duplicate processing, not with lost messages, as you'd expect for at-least-once processing.
It is a little surprising, and I agree, the docs here are not doing a particularly good job of explaining it. It might help to ask: if you don't explicitly commit, how does Kafka know when you've processed the messages it gave you? It doesn't! It assumes any message it hands you is instantaneously processed.
Auto-commit is a bit like handing someone an ice cream cone, then immediately walking away and assuming they ate it. Sometimes people drop their ice cream immediately after you hand it to them, and never get a bite.
Information on the internet about this seems unreliable, confusing and contradictory... It's crazy for something so critical, especially when it's enabled by default.
Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that you must consume all data returned from each call to poll(Duration) before any subsequent calls, or before closing the consumer.
E.g. the following commits every 10s - on each call to `poll`, it doesn't automagically commit every 5 s.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "5000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        Thread.sleep(10_000);
    }
Just a note: I am not claiming it is working correctly, only saying there is a clear and documented way how the client knows when to commit, and that it works as expected in a simple scenario.
> if you don't explicitly commit, how does Kafka know when you've processed the messages it gave you?
I did expect that auto-commit still involved an explicit commit. I expected that it meant that the consumer side would commit _after_ processing a message/batch _if_ it had been >= autocommit_interval since the last commit. In other words, that it was a functionality baked into the Kafka client library (which does know when a message has been processed by the application). I don't know if it really makes sense, I never really thought hard about it before!
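To make that expectation concrete, here is a minimal sketch in plain Java (no Kafka dependency) of the "commit after processing, but only if the interval has elapsed" behaviour described above. The class `IntervalCommitSketch` and its methods are hypothetical names invented for illustration. This models what the commenter expected, not what the Kafka client is documented to do, and it records the highest processed offset rather than Kafka's "next offset to read" convention.

```java
/** Toy model (hypothetical): commit only after processing, and only once the interval has elapsed. */
final class IntervalCommitSketch {
    private final long intervalMs;
    private long lastCommitMs;
    private long processedOffset = -1; // highest offset the application has finished processing
    private long committedOffset = -1; // highest offset recorded as committed (-1 means none yet)

    IntervalCommitSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastCommitMs = startMs;
    }

    /** Called by the application AFTER it has finished processing the record at this offset. */
    void markProcessed(long offset, long nowMs) {
        processedOffset = Math.max(processedOffset, offset);
        // Commit only if at least intervalMs has passed since the previous commit.
        if (nowMs - lastCommitMs >= intervalMs) {
            committedOffset = processedOffset;
            lastCommitMs = nowMs;
        }
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        IntervalCommitSketch committer = new IntervalCommitSketch(5_000, 0);
        committer.markProcessed(0, 1_000); // interval not elapsed, nothing committed
        committer.markProcessed(1, 4_000); // still under 5 s since start
        System.out.println(committer.committedOffset()); // prints -1
        committer.markProcessed(2, 6_000); // 6 s since last commit, commit through offset 2
        System.out.println(committer.committedOffset()); // prints 2
    }
}
```

Under this model a crash can only cause reprocessing (offsets 0 to 2 would be replayed if the process died before the 6 s mark), never a skipped record, because nothing is committed that the application has not reported as processed.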
I'm still a bit skeptical... I'm pretty sure (although not positive) that I've seen consumers with autocommit being stuck because of timeouts that were much greater than the autocommit interval, and yet retrying the same message in a loop
Auto commit has always seemed super shady. Manual commit I have assumed is safe though - something something vector clocks - and it’d be really interesting to know if that trust is misplaced.
What is the process and cost for having you do a Jepsen test for something like that?
It is a bit of splitting hairs in some sense, but the key concept here is that just because the message was delivered to the Kafka client successfully, it does not mean it was processed by the application.
You will have to explicitly ack if you want that guarantee. For a concrete example, let's say all you do with a message is write it to a database. As soon as that message is in your client handler callback, that message is ack'ed. But you probably only want that ack to happen after a successful insert into the DB. The most likely scenario here to cause unprocessed messages is that the DB is down for whatever reason (maybe a network link is down, or k8s or even a firewall config now prevents you from accessing it), and at some point during this your client goes down, maybe because an eng attempts a restart to see if the problem goes away.
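The "ack only after a successful insert" idea can be sketched without any Kafka dependency. In the toy model below, `AckAfterWriteSketch`, `Sink`, and `processBatch` are all hypothetical names; the list stands in for a partition and the returned integer for the committed offset. With a real consumer the equivalent would presumably be disabling auto-commit and committing explicitly after the write, but that is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

final class AckAfterWriteSketch {
    /** Hypothetical sink standing in for the database. */
    interface Sink {
        void insert(String record) throws Exception;
    }

    /**
     * Processes records starting at the committed offset and returns the new
     * committed offset. It stops at the first failed insert, so that record
     * is not acknowledged and will be retried on the next attempt.
     */
    static int processBatch(List<String> log, int committed, Sink sink) {
        int next = committed;
        while (next < log.size()) {
            try {
                sink.insert(log.get(next));
            } catch (Exception e) {
                break; // insert failed: do not advance past this record
            }
            next++; // acknowledge only after the insert succeeded
        }
        return next;
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c");
        List<String> db = new ArrayList<>();
        boolean[] dbUp = {false};
        Sink sink = record -> {
            if (!dbUp[0]) throw new IllegalStateException("db down");
            db.add(record);
        };

        int committed = processBatch(log, 0, sink);
        System.out.println(committed + " " + db); // prints 0 []

        dbUp[0] = true; // the DB comes back, possibly after a client restart
        committed = processBatch(log, committed, sink);
        System.out.println(committed + " " + db); // prints 3 [a, b, c]
    }
}
```

The trade-off is the usual at-least-once one: if the process dies after an insert but before the offset is recorded, that record is written again on restart, so the sink needs to tolerate duplicates.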
My understanding is that this exists for high-performance situations. You have some other system that can figure out if something failed, but with this feature you can move the high water mark so that you don't have to redo as much. If you get the timing right and there is a failure, you can assume that when you restart you'll be getting some stuff that you already processed. The problem is when you don't have this for mailing before the auto commit. It is meant to be done far after processing in my reading of it, but it does certainly seem like there's a contradiction: it should auto commit, but only stuff from so many milliseconds before the auto commit time?
I can maybe give some justification for why this feature exists. It's designed for synchronous, single-threaded consumers which do something like this:
    loop {
        1. Call poll
        2. Durably process the messages
    }
I think a point of confusion here is that the auto-commit check happens on the next call to poll—not asynchronously after the timeout. So you should only be able to drop writes if you are storing the messages without durably processing them (which includes any kind of async/defer/queues/etc.) before calling poll again.
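A toy model of that "commit on the next poll" behaviour, in plain Java with no Kafka dependency, may help show where records can go missing. Everything here is a hypothetical simplification: `ToyConsumer` commits on every poll (as if the auto-commit interval had always elapsed), and the "crash" happens right after the final poll returns. It illustrates the documented behaviour as described above, not the actual client implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitOnPollSketch {
    /** Toy consumer (hypothetical): each poll first commits what earlier polls returned. */
    static final class ToyConsumer {
        private final List<String> log;
        private int position = 0;  // next offset to hand out
        private int committed = 0; // offset a restarted consumer would resume from

        ToyConsumer(List<String> log) {
            this.log = log;
        }

        List<String> poll(int maxRecords) {
            committed = position; // assumption: the commit interval has always elapsed
            int end = Math.min(log.size(), position + maxRecords);
            List<String> batch = new ArrayList<>(log.subList(position, end));
            position = end;
            return batch;
        }

        int committed() {
            return committed;
        }
    }

    /** Returns the records that are committed but were never processed when the crash hits. */
    static List<String> lostAfterCrash(List<String> log, int polls, boolean processBeforeNextPoll) {
        ToyConsumer consumer = new ToyConsumer(log);
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            List<String> batch = consumer.poll(2);
            boolean crashNow = (i == polls - 1); // crash right after the final poll returns
            if (processBeforeNextPoll && !crashNow) {
                processed.addAll(batch); // synchronous: done before the next poll
            }
            // Otherwise the batch only sits in an in-memory queue, which the crash destroys.
        }
        List<String> lost = new ArrayList<>(log.subList(0, consumer.committed()));
        lost.removeAll(processed);
        return lost;
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c", "d");
        System.out.println(lostAfterCrash(log, 2, true));  // prints []
        System.out.println(lostAfterCrash(log, 2, false)); // prints [a, b]
    }
}
```

In the synchronous case the second poll commits only records that were already processed, so the crash costs nothing. In the deferred case the second poll commits `a` and `b` while they are still waiting in memory, and a restart resumes from `c`.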
(I should say—this is the documented behavior for the Java client library[0]—it's possible that it's not actually the behavior that's implemented today.)
The Kafka protocol is torn between being high-level and low-level, and as a result it does neither particularly well. Auto commit is a high-level feature that aims to make it easier to build simple applications without needing to really understand all of the moving pieces, but obviously can fail if you don't use it as expected.
I'd argue that today end users shouldn't be using the Kafka client directly—use a proper high level implementation that will get the details right for you (for data use cases this is probably a stream processing engine, for application use cases it's something like a durable execution engine).
I’m looking at the product page [0] and wondering how those two statements are compatible:
> Bufstream runs fully within your AWS or GCP VPC, giving you complete control over your data, metadata, and uptime. Unlike the alternatives, Bufstream never phones home.
> Bufstream pricing is simple: just $0.002 per uncompressed GiB written (about $2 per TiB). We don't charge any per-core, per-agent, or per-call fees.
Surely they wouldn’t run their entire business on the honor system?
> As of October 2024, Bufstream was deployed only with select customers.
My assumption would be that an honor system might be doable. They are exposing themselves to risk of abuse of course, but it might be a worthy trade-off for getting certain clients on board.
That's correct. We hop onto Zoom calls with our customers on an agreed cadence, and they share a billing report with us to confirm usage/metering. For enterprise customers specifically, it works great. They don't want to violate contracts, and it also gives us a natural check-in point to ensure things are going smoothly with their deployment.
If a company makes unambiguous claims in their advertising which turn out to be false, they will get sued and maybe even fined by regulators. Much of the world does in fact operate on this kind of trust.
Firewalls solve problems like this. Relying on a secret callback mechanism to run your business is high risk, when you have sophisticated customers. A number of industries have low risk tolerance and default deny traffic, which would make those industries inaccessible to a company that tried to operate that way.
If you are looking for fun targets, may I suggest KubeMQ too? Its author claims that it’s better than Kafka, Redis and RabbitMQ. It’s also "kubernetes native" but the open source version refuses to start if it detects kubernetes.
They seem to have pivoted from protobuf tools to kafka alternatives. I don't think bufstream is OSS (yet). Or at least, they have very much de-emphasized their original offering on their site.
Nope! We're still heavily investing in scaling Protobuf. In fact, our data quality guarantees built into Bufstream are powered by Protobuf! This is simply an extension of what we do...Connect RPC, Buf CLI, etc.
I don't think bufstream itself is open source but there's https://github.com/bufbuild/bufstream-demo which may be close to what you want (but is also unlicensed, bizarrely)
That's correct. Bufstream is not open source, but we do have a demo that you can try. I've asked the team to include a proper LICENSE file as well. Thanks for catching that!
After reading thru the relevant blog posts and docs, my understanding is that Kafka defines "exactly-once delivery" as a property of what they call a "read-process-write operation", where workers read-from topic 1, and write-to topic 2, where both topics are in the same logical Kafka system. Is that correct? If so, isn't that better described as a transaction?
Kafka actually does call these transactions! However (and this is a loooong discussion I can't really dig into right now) there's sort of two ways to look at "exactly once". One is in the sense that DB transactions are "exactly once"; a transaction's effects shouldn't be duplicated or lost. But in another sense "exactly once" is a sort of dataflow graph property that relates messages across topic-partitions. That's a little more akin to ACID "consistency".
You can use transactions to get to that dataflow property, in the same sort of way that Serializable transaction systems guarantee certain kinds of domain-level consistency. For example, Serializability guarantees that any invariant preserved by a set of transactions, considered purely in isolation, is also preserved by concurrent histories of those transactions. I think you can argue Kafka intends to reach "exactly-once semantics" through transactions in that way.
This cuts both ways, choosing to not implement flawed portions of the spec could be seen as a good thing. I've always been a bit suspicious of the value of "bug for bug compatibility". You don't actually need transactions in Kafka in normal operation IME. I've never tried to use "streams" before and have never encountered a case where I thought they were a good trade. Better to implement that kind of stuff in a way I can control.
Both are true, but we use "transactions" for clarity, since the semantics of consumers outside transactions is even murkier. Every read in this workload takes place in the context of a transaction, and goes through the transactional offset commit path.
Ah, got it; I was assuming that “transactions” was referring to the transactions mentioned as the subject of the previous sentence, not the transactions active in consumers observing those. My mistake!
Jepsen is for making you cry if you didn't know they're testing the database you develop. Of course, those are tears of joy, because having jepsen's attention is an achievement in itself.
> We would like to combine Jepsen’s workload generation and history checking with Antithesis’ deterministic and replayable environment to make our tests more reproducible.
Furthermore, I'm aware of multiple banks currently using Kafka. One would hope that they're not using it in their core banking system given Kyle's findings.
Maybe they'd be interested in funding a Jepsen ~attack~ experiment on Kafka
I see. I never trusted transactions and advised our app teams to not rely on them, at least without outside verification of them.
The situation is actually far worse with any client relying on librdkafka. Some of this has been fixed, but my company found at least a half dozen bugs/uncompleted features in librdkafka, mostly around retryable errors that were sometimes ignored, sometimes caused exceptions, and other times just straight hung clients.
Despite our company leaning heavily on Confluent to force librdkafka to get to parity with the Java client, it was always behind, and in general we started adopting a stance of not implementing any business critical functions on any feature implemented in the past year or major release.