There are some excellent choices in this design, but using Kafka in the middle makes no sense. The Cassandra change-data segments are written in batches, and BigQuery loads in batches. So the author is splitting change-data segments into individual rows, feeding those rows into Kafka, and then re-batching them on the BQ side for ingestion.
The use of a streaming system adds a tremendous amount of complexity for no benefit, because the original data source is batch. A simpler solution would be to process the data in 5-minute batches and use a BigQuery MERGE statement to apply that "patch" to the existing table.
You really shouldn't use Kafka unless you have a true streaming data source and a hard latency requirement of < 5 minutes. It's much easier to work with data in small batches, using local disk and blob stores like S3 as your queue.
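Not the author's pipeline, just a rough sketch of the batch-and-MERGE alternative described above, assuming hypothetical project/dataset/table names and an `id`/`op` column convention for the change rows; it uses the google-cloud-bigquery client to load a 5-minute batch into a staging table and then apply it as a "patch":

```python
from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical names: adjust project/dataset/table and key columns to taste.
STAGING = "my-project.cdc.payments_staging"
TARGET = "my-project.cdc.payments"

def load_batch(gcs_uri: str) -> None:
    """Load one 5-minute batch of change rows (newline-delimited JSON) into staging."""
    job = client.load_table_from_uri(
        gcs_uri,
        STAGING,
        job_config=bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        ),
    )
    job.result()  # wait for the load to finish

def apply_patch() -> None:
    """MERGE the staged changes into the target table."""
    sql = f"""
    MERGE `{TARGET}` t
    USING `{STAGING}` s
    ON t.id = s.id
    WHEN MATCHED AND s.op = 'delete' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.payload = s.payload, t.updated_at = s.updated_at
    WHEN NOT MATCHED AND s.op != 'delete' THEN
      INSERT (id, payload, updated_at) VALUES (s.id, s.payload, s.updated_at)
    """
    client.query(sql).result()
```

Run on a 5-minute schedule (cron, Airflow, whatever), with blob storage acting as the queue.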
Thanks for the feedback! I totally agree with you that a streaming approach seems like overkill. Actually, making the pipeline truly real-time is part of our future work for Cassandra CDC (this is mentioned in Part 2 of this blog post, which is expected to be published next week). In Cassandra 4.0, the CDC feature is improved to allow real-time parsing of commit logs. We are running 3.X in production; however, we plan to add 4.0 support in the future.
What does the 4.0 API look like? I would hope that they would expose a high-level interface for change data, similar to SQL Server or Snowflake streams. Most databases leave this as an afterthought, which is so unfortunate.
The high-level idea is hard-linking the commit log segment file into the cdc directory as it is being written, plus having a separate index file to keep track of the offset.
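For the curious, a minimal sketch of what consuming that could look like, assuming the 4.0 behaviour where each segment in the cdc_raw directory gets a `_cdc.idx` sidecar holding the flushed offset (plus a COMPLETED marker when the segment is finished); paths and file parsing here are illustrative only:

```python
import time

def poll_segment(idx_path: str, on_new_bytes, poll_s: float = 1.0) -> None:
    """Tail one commit log segment using its _cdc.idx sidecar file.

    Assumption: the idx file holds the offset up to which the segment has
    been flushed, and a COMPLETED marker once the segment is done.
    """
    log_path = idx_path.replace("_cdc.idx", ".log")
    consumed = 0
    while True:
        with open(idx_path) as f:
            fields = f.read().split()
        flushed = int(fields[0])
        done = len(fields) > 1 and fields[1] == "COMPLETED"
        if flushed > consumed:
            with open(log_path, "rb") as log:
                log.seek(consumed)
                # Hand the newly flushed bytes to a mutation parser.
                on_new_bytes(log.read(flushed - consumed))
            consumed = flushed
        if done:
            return
        time.sleep(poll_s)
```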
Relying on an eventually consistent database sounds like a huge challenge in terms of getting absolute consistency between Cassandra and Kafka. There seem to be many edge cases that could cause the data in Kafka to get out of sync, so it doesn't sound very reliable, especially if you want to rely on Kafka for event sourcing later on, which it sounds like you're planning to do in the future. For a payments company that sounds very risky.
What happens if you write a row to Cassandra but that row is invalidated because of failed writes to the other nodes in the cluster (e.g. the write was QUORUM and it succeeded on the node that you're reading from but failed on all other nodes)? In Cassandra, when an error occurs, there are no rollbacks, and the row gets read-repaired. It sounds like that row will get output to Kafka. How do you "disappear" data written into Kafka if it gets read-repaired?
Great question, actually. There is a nuance in Cassandra regarding read repair. If you issue a QUORUM write and the write itself fails (say, because 2/3 nodes are down) but succeeds on 1 node, and later on you read when all three nodes are up, the failed write will actually be resurrected during read repair due to last-write-wins conflict resolution.
This is why it is recommended to always retry when a write fails. This way data might arrive at BQ slightly before Cassandra itself for a failed write, but eventually they will be consistent.
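A minimal sketch of that retry-on-failure pattern with the DataStax Python driver (the keyspace, statement, and backoff are made up); it leans on Cassandra writes being idempotent upserts under last-write-wins:

```python
import time

from cassandra import ConsistencyLevel, Unavailable, WriteTimeout
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

session = Cluster(["127.0.0.1"]).connect("payments")  # hypothetical keyspace

def write_with_retry(cql: str, params: tuple, attempts: int = 5):
    """Retry a QUORUM write with exponential backoff.

    Safe because the write is an idempotent upsert: under last-write-wins,
    a partially applied write either converges via repair or is simply
    overwritten by the retry.
    """
    stmt = SimpleStatement(cql, consistency_level=ConsistencyLevel.QUORUM)
    last_error = None
    for attempt in range(attempts):
        try:
            return session.execute(stmt, params)
        except (WriteTimeout, Unavailable) as exc:
            last_error = exc
            time.sleep(0.1 * 2 ** attempt)
    raise last_error
```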
That said, at WePay Cassandra is being used by services which do not have to be strongly consistent at the moment. Not to say it's not possible, as there are companies that use Cassandra's lightweight transaction feature for CAS operations.
As for consistency between Kafka and Cassandra, I agree that it's hard to make it 100% consistent (due to a number of limitations Cassandra poses). This is why we tend to be pretty careful about which services should use Cassandra, and specify clearly what the expectations and SLA are for Cassandra CDC.
I recently had a junior dev propose using Kafka as a business ‘event’ system, to trigger or be triggered by webhooks, in order to integrate our API with a third party CRM.
What's your point? Your comment comes off as dismissive and incomplete. Why was the junior's suggestion bad? It may have totally been a garbage suggestion, but if it was, explain the reasoning.
Not the author of the relevant post, but assuming they're tailing off of the first response, I can only surmise they meant that in their case Kafka was unnecessary/a bad choice given the constraints posed (< 5 minute delay, streaming input, etc.). Agree that without context it's an incomplete comment that does come off as dismissive; "junior dev" seems to be meant as belittling/insulting.
Aurora and Redshift do not scale well enough for companies at their scale (only 64TB for Aurora last I checked, and Redshift falls over near the 100TB level). They'd be looking at DynamoDB.
I don't think any managed AWS service scales well for a bigger enterprise that has to handle a decent amount of traffic. Once you reach a certain threshold you soon hit some throttling and limits BS, and AWS's solution is just to throw more money at them.
DynamoDB scales a lot. It falls into the "throw money at it" bucket, though, and you need an access pattern that shards well with it. If you have a reason, they will basically always increase your limits, too.
There's a re:Invent presentation of them having an in-house 1PB+ cluster. What they don't tell you is that 101 8xl nodes is half a million dollars a year in reserved instances, before you include any other costs associated with it; and that particular workload (log scanning) is very nicely suited to any columnar store.
Operations are also a disaster with Redshift; anything where you have to touch the cluster itself at any scale past a handful of nodes typically requires a support ticket with "hey, when this breaks, please work your magic and fix it?" There's also the issue of tuning your queues, which is a whole extra layer that you, the customer, must tune. Their suggestion tools are getting better on that front though.
Just use something else if you have more than a few TB, or you have a ton of time and money to just throw around.
Out of curiosity, is the binlog accessible on Aurora/MySQL? We've got lots of users of the Debezium MySQL connector on AWS, but it would be interesting to know whether it'd work with Aurora, too.
(Disclaimer: I'm the lead of Debezium, which is the open-source CDC platform to which WePay are contributing their Cassandra work)
I'm not saying nosql dbs don't have their place, but...
Articles like this are exactly why my current company moved to Cassandra, and why I regret not having tried harder to stop them (but I was new, didn't know Cassandra, and I'm digressing).
Cassandra is (very probably) wrong for your use case.
The only model that makes sense in cassandra is where you insert data, wait for minutes for the propagation, hope it was propagated, then read it, while knowing either the primary/clustering key or the time of the last read data (assuming everything has the timestamp in the key, and has been replicated)
Anything else in cassandra feels like a hack.
Deleting stuff is kinda bad: it will remain there (marked as a tombstone) until the "grace period" has expired (default 10 days). It is really bad unless you put the timestamp in the keys, which you basically must do in Cassandra anyway. Scan past the default 100k tombstones and your query will be dropped.
You want a big grace period for the data, 'cause if a node goes down that is exactly the maximum time you have to bring it back up and hope all data has finished resyncing. Otherwise deleted data will come back, and that node will be missing data. If the grace period has passed or you are not confident in the replication speed, absolutely delete that node and add a new one.
Bigger grace period means more tombstones (unless you never, ever delete data). Even if all the data/tombstones have been replicated to all nodes, the tombstones remain. Running manual compaction once prevents the automatic compaction from running ever again.
You need to pay attention to never scan tombstones: no queries like SELECT without a WHERE; always use a LIMIT or a WHERE clause that ensures you will not scan deleted data.
(But don't use WHERE on columns that are not keys... unless X, unless Y on top of unless X, and so on for everything.)
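To make the "timestamp in the key, always bounded reads" point concrete, here is a hypothetical table and query shape (all names invented) that keeps every read inside one partition and under a LIMIT, so a scan never wanders across piles of tombstones:

```python
# Hypothetical schema: partition by a coarse time bucket, cluster by event time,
# so the "when" is always part of the key.
SCHEMA = """
CREATE TABLE IF NOT EXISTS events_by_day (
    day        date,
    event_time timeuuid,
    payload    text,
    PRIMARY KEY ((day), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC)
"""

# Reads always name the partition and are bounded, so the query only walks a
# narrow, recent slice instead of an unbounded range full of deleted data.
READ_RECENT = """
SELECT payload FROM events_by_day
WHERE day = %s AND event_time > maxTimeuuid(%s)
LIMIT 500
"""
```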
They need Kafka because they probably need a queue, and there is no way to implement one in semi-real-time in Cassandra without the tombstone problem killing your queries.
Features have been added and then deprecated (see materialized views), or are very limited in their use (secondary indexes, which basically work only per partition).
Queries that are not plain select/insert (or update, which is the same as insert) have loads of special cases and special cases on the special cases.
LWTs are much slower, and are the only transaction/consensus mechanism you will find, and only for single rows.
COPY TO/FROM works with CSV, but will not properly escape all kinds of data, giving you all kinds of fun. Hell, manually copy-pasting a timestamp from a SELECT into an INSERT does not work; the format is a tiny bit different.
Backup/restore procedures seem complex and amateurish, especially when you have to restore into another cluster.
You can select how many nodes to query for an operation, but things like QUORUM are not a transaction or consensus. BATCH is atomic, not isolated. Read-after-write works only on exactly the same node (unless you write at ALL, and I have not checked what happens if one node is down).
And a sh*tload of other stuff.
I could go on, but even if this fits your use case, don't use cassandra.
Try ScyllaDB first. Same protocol, missing materialized views and LWT (until next year, apparently), but 10x as fast, probably due to being C++ vs. Java.
This, especially, is what my team also found to be true: "The only model that makes sense in cassandra is where you insert data, wait for minutes for the propagation, hope it was propagated, then read it, while knowing either the primary/clustering key or the time of the last read data."
We use Cassandra in production at my company, but only after much scope reduction to the use case.
We needed a distributed data store for real-time writes and delayed reads for a 24x7 high-throughput shared-nothing highly-parallel distributed processing plant (implemented atop Apache Storm). Cassandra fit the bill because writes could come from any of 500+ parallel processes on 30+ physical nodes (and growing), potentially across AZs in AWS. And because the writes were a form of "distributed grouping" where we'd later look up data by row key.
At first, we tried to also use Cassandra for counters, but that had loads of problems. So we cut that. We briefly looked at LWTs and determined they were a non-starter.
Then we had some reasonable cost/storage constraints, and discovered that Cassandra's storage format with its "native" map/list types left a lot to be desired. We had some client performance requirements, and found the same thing that hosed storage also hosed client performance. So then we did a lot of performance hacks and whittled down our use case to just the "grouping" one described above, and even still, hit a thorny production issue around "effective wide row limits".
I wrote up a number of these issues in this detailed blog post.
The good news about Cassandra is that it has been a rock-solid production cluster for us, given the whittled-down use case and performance enhancements. That is, operationally, it really is nice to have a system that thoughtfully and correctly handles data sharding, node zombification, and node failure. That's really Cassandra's legacy, IMO: that it is possible to build a distributed, masterless cluster system that lets your engineers and ops team sleep at night. But I think the quirky/limited data model may hold the project back long-term, when it comes to multi-decade popularity.
We moved away from datastax for a number of reasons, one of the biggest was the lifecycle manager. For us, it silently failed to make backups, created backups that were unrecoverable, and other generally nasty stuff.
The worst part was that with every update, something broke that should have been caught by QA. It felt like nobody was regularly testing their backups, or they rolled their own (which defeats the point of buying support).
Having said that, if you wanted to stream things like log data, cassandra was brilliant.
However, it collapsed like a sack of poop as soon as you wanted to do any kind of reading. In the end we replaced a 6 node r4.4xlarge 4TB cluster with a single t2.medium Postgres instance.
Can you please elaborate a little bit more on how you replaced it with Postgres? It is strange that a single Postgres box on a much less powerful instance type would perform the same as your Cassandra cluster.
It kind of seems that the first solution was way over-engineered or was built for different requirements.
Oh, it was a total mistake. Fortunately it wasn't mine, but I did have to support it and migrate away from it.
Cassandra sessions are quite heavy, and we have a large farm that spins up, does stuff, and closes down. So that's the first problem. (Yes, we used Kafka to pipeline the data in, and that worked well, but...)
It _used_ to be a very heavy write/read ratio. But as time went on, we needed to read more and more things concurrently.
Because it's "distributed" and basically a glorified token ring system, throughput drops dramatically as load increases.
We are not inserting that much data, just lots and lots of records. We then do a geospatial query later on to pull that data back. PostGIS is far better at handling this compared to the DataStax graph layer + Solr (i.e., the full DataStax "stack").
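A guess at the shape of that geospatial lookup on the Postgres/PostGIS side (table, column, and radius invented; `geom` is assumed to be a geography column with a GiST index), which is the kind of query the Cassandra/Solr stack struggled with:

```python
import psycopg2

conn = psycopg2.connect("dbname=tracking")  # hypothetical database
cur = conn.cursor()

# Find recent records within a radius of a point; the GiST index on the
# geography column does the heavy lifting.
cur.execute(
    """
    SELECT id, recorded_at
    FROM positions
    WHERE ST_DWithin(
        geom,
        ST_SetSRID(ST_MakePoint(%s, %s), 4326)::geography,
        %s
    )
    ORDER BY recorded_at DESC
    LIMIT 100
    """,
    (-122.4, 37.8, 500),  # lon, lat, radius in metres
)
rows = cur.fetchall()
```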
But honestly, we could have coped with that if the backups had worked. That, and shipping code with a 4-year-old CVE that could have been easily remedied if they'd bothered to do an automated scan.
Every point release involved 1-5 days of hard work from me. Considering the support cost was > my wage, that stung quite a lot.
I'm really curious why MySQL was dropped instead of being optimised?
SQL databases can (and do) scale to high volumes, and would have avoided a lot of re-engineering. What was the final blocker for WePay that meant that optimising SQL could go no further?
SQL on its own scales to a point, that point being however large you can make a single machine, which can be quite small depending on the data sizes in question. Beyond that you need to use something like Vitess (which the article mentions), but that comes with its own overheads and caveats. You no longer really have SQL; you have Vitess. A couple of years ago a company I was at tried to use Vitess but found too many edge cases and too much operational overhead.
They don't really scale, since MySQL/PG don't have built-in features for horizontal scaling/sharding etc. If you want to make them scale, you have to do a lot of things manually or use third-party tools/solutions.
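For example, "doing it manually" often ends up looking something like this hypothetical application-level shard router (names and shard count invented), with resharding, cross-shard joins, and multi-shard transactions left as your problem:

```python
import hashlib

# Hypothetical shard map; in practice this lives in config or service discovery.
SHARDS = [
    "mysql://db-shard-0.internal/payments",
    "mysql://db-shard-1.internal/payments",
    "mysql://db-shard-2.internal/payments",
    "mysql://db-shard-3.internal/payments",
]

def shard_for(account_id: str) -> str:
    """Pick a shard by hashing the account id.

    Everything the database would normally give you for free now lands on
    the application: resharding when you add nodes, cross-shard joins,
    multi-shard transactions, and keeping the map in sync everywhere.
    """
    digest = hashlib.sha1(account_id.encode("utf-8")).hexdigest()
    return SHARDS[int(digest, 16) % len(SHARDS)]
```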
It's still a "double write" but you may not need distributed transactions if you're happy with at-least-once writes to Kafka and deduplication on the consuming side.
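A minimal sketch of that consumer-side deduplication with kafka-python (topic name, `event_id` field, and the sink are made up); the producer retries freely, and the consumer drops anything it has already applied:

```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "cassandra.payments.cdc",                     # hypothetical topic
    bootstrap_servers="localhost:9092",
    enable_auto_commit=False,
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

def apply_to_sink(event: dict) -> None:
    """Idempotent upsert into the downstream store (stubbed for the sketch)."""
    print(event)

seen = set()  # in production: a bounded cache or keyed store, not an unbounded set

for message in consumer:
    event_id = message.value["event_id"]          # idempotency key set by the producer
    if event_id in seen:
        continue                                   # duplicate from an at-least-once retry
    apply_to_sink(message.value)
    seen.add(event_id)
    consumer.commit()
```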
Redundancy. Only one of the scrollbars is actually active at the moment, but if it fails, a leadership election will take place, and one of the others will take over as the active.
This was a remarkably detailed and, honestly, confusing post from Tencent, until I noticed the Chase logo at the top and realised it was not by Tencent at all. (The WeChat team inside Tencent is very small, close, and integrated.)
Pretty major name clash here: WeChat pay, a.k.a. WePay (or perhaps WeChat Pay), and, well, another WePay.
For those not familiar, WeChat, somewhat the WhatsApp/Facebook Messenger of China (probably an earlier mover, though riding on QQ's user base), transacts billions of yuan in payments every day and, together with AliPay, is promoting a cashless society (which I think is wrong; I like cash).