Streaming joins are hard (estuary.dev)
115 points by danthelion 2 days ago | 54 comments





The correct way to think about the problem is in terms of evaluating joins (or any other queries) over changing datasets. And for that you need an engine designed for *incremental* processing from the ground up: algorithms, data structures, the storage layer, and of course the underlying theory. If you don't have such an engine, you're doomed to build layers of hacks, and still fail to do it well.

We've been building such an engine at Feldera (https://www.feldera.com/), and it can compute joins, aggregates, window queries, and much more fully incrementally. All you have to do is write your queries in SQL, attach your data sources (stream or batch), and watch results get incrementally updated in real-time.
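For concreteness, a minimal sketch of what that looks like (table and column names here are made up, and the source/connector configuration is omitted since it's engine-specific):

    -- Hypothetical schema; connector setup is engine-specific and omitted.
    CREATE TABLE orders (
        order_id    BIGINT,
        customer_id BIGINT,
        amount      DECIMAL(12, 2),
        placed_at   TIMESTAMP
    );

    CREATE TABLE customers (
        customer_id BIGINT,
        region      VARCHAR
    );

    -- A view like this is maintained incrementally: as orders and customer
    -- updates arrive, only the affected output rows are recomputed.
    CREATE VIEW revenue_by_region AS
    SELECT c.region, SUM(o.amount) AS total_revenue
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    GROUP BY c.region;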


Is it related to Differential Dataflow / timely dataflow https://github.com/TimelyDataflow/differential-dataflow

We have our own formal model called DBSP: https://docs.feldera.com/papers

It is indeed inspired by timely/differential, but is not exactly comparable to it. One nice property of DBSP is that the theory is very modular and allows adding new incremental operators with strong correctness guarantees, kind of like LEGO bricks for incremental computation. For example we have a fully incremental implementation of rolling aggregates (https://www.feldera.com/blog/rolling-aggregates), which I don't think any other system can do today.
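To give a feel for it, a rolling aggregate in SQL looks something like the query below (standard window-function syntax; the trades schema is made up for illustration, and exact syntax support varies by engine):

    -- Per-row rolling average over the preceding hour of events, per symbol.
    SELECT
        trade_id,
        symbol,
        ts,
        AVG(price) OVER (
            PARTITION BY symbol
            ORDER BY ts
            RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
        ) AS avg_price_1h
    FROM trades;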


Fast rolling aggregates are swell. I meet a lot of people who are building trading systems and want this sort of thing, but it usually isn't a great choice: the perfectly rectangular kernel probably isn't the best possible version of the feature, and arbitrary kernels can be well approximated using constant-size state rather than a large buffer storing a sliding window.

Are you aware of any efforts to apply DBSP's theory to a general programming language/environment? From my perspective, DDlog was the most inspiring project in the field of incremental computation, but it seems like all of these projects just lead to implementations of streaming databases or other similar commercial products that fit into Data™ pipelines (no offense). Incremental computation pops up everywhere, from databases to business logic to UI rendering and video game graphics, and I have this hunch that if the problem could be solved at a fundamental level and in an accessible way, we could have revolutionary gains for programmers and programs.

Thanks for the kind words about DDlog :)

The reason DBSP and Differential Dataflow work so well is because they are specialized to relational computations. Relational operators have nice properties that allow evaluating them incrementally. Incremental evaluation for a general purpose language like Rust is a much, much harder problem.

FWIW, DBSP is available as a Rust crate (https://crates.io/crates/dbsp), so you can use it as an embedded incremental compute engine inside your program.


Indeed. I've experimented a bit with abusing DD/DBSP for my purposes by modeling various kinds of data structures in terms of Z-sets, but these efforts have not yielded very impressive results. :)

For how elegant DBSP is I still found the paper a tough nut to crack, and it really is one of the more accessible theoretical contributions in the space, at least from this grubby programmer's perspective... I hope to devote some time to study and play around more, but in the meantime I'm rooting for you!


Thanks again!

You may want to check out this tutorial for a hands-on introduction to DBSP: https://docs.rs/dbsp/0.28.0/dbsp/tutorial/index.html


Also there's now a DBSP implementation in pure Python! https://github.com/brurucy/pydbsp

(Not who you are replying to) Not sure if it’s specifically related to DBSP, but check out incremental Datafun (slide ~55 of https://www.rntz.net/files/stl2017-datafun-slides.pdf) and the paper cited there: A Theory of Changes for Higher Order Languages: Incrementalizing Lambda-calculi by Static Differentiation (Cai et al., PLDI 2014).

Does this offer any non-SQL programmatic interfaces or ways to do Complex Event Processing (e.g. https://www.databricks.com/glossary/complex-event-processing )? A lot of those scenarios would be tough to express in SQL.

Yes, you can write Rust UDFs with Feldera and even use the dbsp crate directly if you'd like.

Hi, I’ve read the DBSP paper and it’s a really well-thought-out framework; all the magic seemed so simple with the way the paper laid things out. However, the paper dealt with abelian Z-sets only, and mentioned that in your implementation you also handle the non-abelian aspect of ordering. I was wondering if you guys have published anything about how you did that?

Apologies about the confusion. We indeed only solve incremental computation for Abelian groups, and the paper is making a case that database tables can be modeled as Abelian groups using Z-sets, and all relational operators (plus aggregation, recursion, and more) can be modeled as operations on Z-sets.

Yes, I might have misworded my question. My question is in relation to this paragraph on page 12:

"Note that the SQL ORDER BY directive can be modelled as a non-linear aggregate function that emits a list. However, such an implementation is not efficiently incrementalizable in DBSP. We leave the efficient handling of ORDER BY to future work."

My understanding is that Feldera does indeed support ORDER BY, which I imagine it does incrementally, thus my question.

The statement in the paper that ordering is not efficiently incrementalisable seems to make sense to me. It is clear that even though Z-sets cannot naively represent diffs of ordered relations (since Z-sets are unordered), ordering can be modelled as an aggregate that first builds up the first row, then the second row, and so on. Even formulated this way, however, I fail to see how the entire "incrementalised" computation would still be practically incremental, in the sense that the size of the output diff (Z-set) is small as long as the diff of the input is small.

For example, consider the query `select x from y order by x asc`, where the following values occur in the stream of x, in order: 5, 4, 3, 2, 1. Now, consider the incremental diff for the last value, 1. If one models ORDER BY as a list aggregation, then the Z-set for the entire computation seems to be

    - [ 2, 3, 4, 5 ]
    + [ 1, 2, 3, 4, 5 ]
which grows with the size of the output set rather than the size of the input diff. If one instead models ordering by, e.g., adding an order column, the diff would be

    - 2 (index: 0)
    - 3 (index: 1)
    - 4 (index: 2)
    - 5 (index: 3)
    + 1 (index: 0)
    + 2 (index: 1)
    + 3 (index: 2)
    + 4 (index: 3)
    + 5 (index: 4)
which once again varies with the size of the output set.

I was with you and thinking Postgres over and over until the second paragraph. Which isn’t to say anything bad about your product, it sounds very cool.

But I’d work in “just like Postgres”.


Good point. The goal is indeed to be a Postgres of incremental computing: any SQL query should "just work" out of the box with good performance and standard SQL semantics. You shouldn't need a team of experts to use the tool effectively.

Can someone explain what the use case is for streaming joins in the first place?

I've written my fair share of joins in SQL. They're indispensable.

But I've never come across a situation where I needed to join data from two streams in real time as they're both coming in. I'm not sure I even understand what that's supposed to mean conceptually.

It's easy enough to dump streams into a database and query the database but clearly this isn't about that.

So what's the use case for joins on raw stream data?


Event correlations are a typical one. Think about ad tech: you want every click event to be hydrated with information about the impression or query that led to it. Both of those are high-volume log streams.

You want to end up with the results of:

``` select * from clicks left join impressions on (clicks.impression_id=impressions.id) ```

but you want to see incremental results - for instance, because you want to feed the joined rows into a streaming aggregator to keep counts as up to date as possible.
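As a rough sketch of that second step (campaign_id is an assumed column), the aggregation sitting on top of the join might be:

    -- Keep per-campaign click counts fresh as joined rows arrive.
    SELECT impressions.campaign_id, COUNT(*) AS clicks
    FROM clicks
    JOIN impressions ON clicks.impression_id = impressions.id
    GROUP BY impressions.campaign_id;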


That's helpful, thanks.

I was definitely under the impression that ad impressions and clicks would be written to databases immediately and queried from there.

I'm still having a hard time imagining in what case you'd need a "live" aggregating display that needed to join data from multiple streams, rather than just accumulating from individual streams, but I guess I can imagine that there are circumstances where that would be desired.

Thanks!


Live-updated aggregates are quite common in this area. Consider metered billing ("discontinue this ad after it has been served/clicked/rendered X times"), reactive segmentation ("the owner of a store has decided to offer a discount to anyone that viewed but did not purchase products X, Y, and Z within a 10 minute period"), or intrusion detection ("if the same sequence of routes is accessed quickly in rapid succession across the webserver fleet, regardless of source IP or UA, send an alert").

In a very large number of cases, those streams of data are too large to query effectively (read: cheaply or with low enough latency to satisfy people interested in up-to-date results) at rest. With 100ks or millions of events/second, the "store then query" approach loses fidelity and affordability fast.
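To make the first example concrete, here's a rough sketch of the metered-billing case as incrementally maintained views (all table and column names are assumptions):

    -- Running count of serves per ad.
    CREATE VIEW ad_serve_counts AS
    SELECT ad_id, COUNT(*) AS times_served
    FROM serve_events
    GROUP BY ad_id;

    -- Ads that have hit their cap and should be discontinued.
    CREATE VIEW ads_over_cap AS
    SELECT c.ad_id
    FROM ad_serve_counts c
    JOIN ad_budgets b ON c.ad_id = b.ad_id
    WHERE c.times_served >= b.max_serves;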


I think it can be challenging to get that much data to a single database. For example, you probably don't want to send every "someone moused over this ad" event in Japan to a datacenter in us-east-1. But if you do the aggregation and storage close to the user, you can emit summaries to that central server, backing some web page where you can see your "a 39-year-old white male moused over this ad" count go up in real time.

How important ads are is debatable, but if you're an ad company and this is what your customers want, it's an implementation that you might come up with because of the engineering practicality.


I have worked on systems that used Oracle Materialised Views for this. The aggregates get updated in realtime, and you don't need to run a heavy query every time.
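Roughly like this, from memory (names are made up, and the materialized view log on the base table that Oracle requires for fast refresh is omitted):

    CREATE MATERIALIZED VIEW order_totals
      REFRESH FAST ON COMMIT
    AS
    SELECT customer_id,
           SUM(amount)   AS total_amount,
           COUNT(amount) AS amount_count,  -- needed for fast refresh of SUM
           COUNT(*)      AS order_count
    FROM orders
    GROUP BY customer_id;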

I'll use a contrived example here to explain what the value of streaming the data itself is.

Let's say you run a large installation that has a variety of very important gauges and sensors. Due to the size and complexity of this installation, these gauges and sensors need to be fed back to a console somewhere so that an overseer role of sorts can get that big-picture view and ensure the installation is fully healthy.

For that scenario, if you look at your data in the sense of a typical RDBMS / Data Warehouse, you would probably want to save as much over-the-wire traffic as possible to ensure there are no delays in getting the sensor information fed into the system reliably and on time. So you trim things down to just a station ID and some readings coming into your "fact" table (it could be more transactionally modeled, but mostly it'll fit the same bill).

Basically the streaming is useful so that in near-realtime you can live scroll the recordset as data comes in. Your SQL query becomes more of an infinite Cursor.

Older ways of doing this did exist on SQL databases just fine; typically you'd have some kind of record marker, whether it was ROWID, DateTime, etc., and you'd just reissue an identical query to get the newer records. That introduces some overhead though, and the streaming approach kind of minimizes/eliminates that.
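That older pattern is basically the sketch below (table and column names are assumptions), re-run on a timer with the last marker you saw bound into the query:

    SELECT station_id, reading, recorded_at
    FROM sensor_readings
    WHERE recorded_at > :last_seen_marker
    ORDER BY recorded_at;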


I definitely understand the value of streaming. Your gauges example is great.

What I don't understand is streaming joins. None of your gauge values need to join to anything.

And if they did -- if something needed to join ID values to display names, presumably those would sit in a database, not a different stream?


> And if they did -- if something needed to join ID values to display names, presumably those would sit in a database, not a different stream?

At a high level the push-instead-of-pull benefit here is "you don't have to query the ID values to get the display names every time" which will reduce your latency. (You can cache but then you might get into invalidation issues and start thinking "why not just send the updates directly to my cache instead")

There's also a less cacheable version where both sides are updating more frequently and you have logic like "if X=1 and Y=2 do Z."

For small enough batches streaming and micro-batching do often end up very similar.


Should’ve just cached the output of group bys.

The main benefit isn't necessarily that it's _streaming_ per se, but that it's _incremental_. We typically see people start by just incrementally materializing their data to a destination in more or less the same set of tables that exist in the source system. Then they develop downstream applications on top of the destination tables, and they start to identify queries that could be sped up by pre-computing some portion of them incrementally before materializing the results.

There are also cases where you just want real-time results. For example, if you want to take action based on a joined result set, then in the RDBMS world you might periodically run a query that joins the tables and see if you need to take action. But polling becomes increasingly inefficient at shorter polling intervals. So it can work better to incrementally compute the join results, so you can take action immediately upon seeing something appear in the output. Think use cases like monitoring, fraud detection, etc.


The computational complexity of running an analytical query on a database is, at best, O(N), where N is the size of the database. The computational complexity of evaluating queries incrementally over streaming data with a well-designed query engine is O(delta), where delta is the size of the *new* data. If your use case is well served by a database (i.e., can tolerate the latency), then you're certainly better off relying on the more mature technology. But if you need to do some heavy-weight queries and get fresh results in real-time, no DB I can think of can pull that off (including "real-time" databases).

We apply incremental, streamable "joins" (relational queries) for real-time syncing between application client and server. I think much of the initial research in this space was around data pipelines, but the killer app (no pun intended) is actually in app development.

I agree completely! We've always talked about this, but we haven't really seen a clear way to package it into a good developer UX. We've got some ideas, though, so maybe one day we'll take a stab at it. For now we've been more focused on integrations and just building out the platform.

Interesting, how can I learn more about this?

Anything you can do with stateful streaming technology, you can do with a database and a message handler. It’s just a question of programming model and scaling characteristics. You typically get an in-process embedded DB per shard, with an API that makes it seem closer to managing state in memory.

Isn't the use case just any time you want a client to essentially subscribe to an SQL query and receive message every time the result of that SQL query changes?

This is extremely common in trading systems where real time data is joined against reference data and grouped, etc for a variety of purposes including consumption by algorithms and display.
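As a rough sketch of that pattern (table and column names are made up), this is the kind of continuously updated query a trading screen or algo would subscribe to:

    -- Live trades joined to static reference data, grouped per desk/symbol.
    SELECT r.desk, t.symbol, SUM(t.quantity * t.price) AS notional
    FROM trades t
    JOIN instrument_reference r ON t.symbol = r.symbol
    GROUP BY r.desk, t.symbol;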

Probably related to the fundamental problem of joining distributed data within CAP constraints. Virtually all distributed databases offering full SQL are CP (that is, they assume no nodes will be down; otherwise the data won't return).

If you have distributed data, the join will get calculated by SOME node in the network, and the data will have to be streamed in and joined by the central processor. Even with modern meganodes, to live up to the Big Data marketing you have to handle arbitrarily sized datasets, and that means streaming data into the processing node's working memory.

Of course there are ways to distribute join calculation (sometimes) as well, but you're still talking about merging streams of data coming into processing nodes.

Now, if you have to handle AP/eventually consistent models, then it REALLY gets complicated, and ultimately your huge massive join (I'm assuming a join of tables of data, not just a denormalization join of a single row/primary key and child foreign keys) is a big eventually consistent approximation view, even without the issue of incoming updates/transactions mutating the underlying datasets as you stream and merge/filter them.


A couple of years ago Materialize had all the buzz; not sure what the difference is.

https://materialize.com/


Materialize is powered by timely dataflow and differential dataflow, Estuary Flow uses a streaming mapreduce architecture: https://estuary.dev/why-mapreduce-is-making-a-comeback/

Streams are conceptually infinite, yes, but many streaming use cases are dealing with a finite amount of data that's larger than memory but fits on disk. In those cases, you can typically get away with materializing your inputs to a temporary file in order to implement joins, sorts, percentile aggregations, etc.

Yes, and this is an important point! This is the reason for our current approach for sqlite derivations. You can absolutely just store all the data in the sqlite database, as long as it actually fits. And there are cases where people actually do this on our platform, though I don't think we have an example in our docs.

A lot of people just learning about streaming systems don't come in with useful intuitions about when they can and can't use that approach, or even that it's an option. We're hoping to build up to some documentation that can help new people learn what their options are, and when to use each one.


A large part of my job in the last few months has been figuring out how to optimize joins in Kafka Streams.

Kafka Streams, by default, uses either RocksDB or an in-memory system for the join buffer, which is fine but completely devours your RAM, and so I have been writing something more tuned for our work that actually uses Postgres as the state store.

It works, but optimizing JOINs is almost as much of an art as it is a science. Trying to optimize caches and predict stuff so you can minimize the cost of latency ends up being a lot of “guess and check” work, particularly if you want to keep memory usage reasonable.
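For illustration only, one plausible shape for a Postgres-backed join buffer (a sketch under assumed names, not the actual schema described here):

    -- Buffered records for one side of the join, keyed by join key, with an
    -- event timestamp so old state can be expired by a retention sweep.
    CREATE TABLE join_buffer_left (
        record_id   BIGSERIAL PRIMARY KEY,
        join_key    TEXT NOT NULL,
        event_time  TIMESTAMPTZ NOT NULL,
        payload     JSONB NOT NULL
    );
    CREATE INDEX ON join_buffer_left (join_key);
    CREATE INDEX ON join_buffer_left (event_time);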


Can you explain why streaming joins are necessary? All the examples I've seen are bad. For example, joining books and authors as streams seems ridiculous; why couldn't the author come up with a more realistic example?

Imagine a system monitoring payment transactions. Each transaction stream (e.g., purchase events) could be joined with customer account data (e.g., past purchasing patterns or blacklist flags). Streaming joins enable flagging potentially fraudulent transactions leveraging live context.
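A hedged sketch of that scenario (table and column names are assumptions):

    -- Flag payments from blacklisted accounts or far outside the customer's
    -- usual spending pattern, as a continuously evaluated join.
    SELECT p.payment_id, p.customer_id, p.amount
    FROM payments p
    JOIN customer_profiles c ON p.customer_id = c.customer_id
    WHERE c.blacklisted
       OR p.amount > 10 * c.avg_purchase_amount;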

JOINs are just hard, period. When you're operating at a large scale, you need to think carefully about exactly how to partition + index your data for the types of queries that you want to write with JOINs.

Streaming joins are so hard that they're an anti-pattern. If you're using external storage to make it work, then your architecture has probably gone really wrong or you're using streams for something that you shouldn't.


The ability to express joins in terms of SQL with Estuary is pretty cool. Flink can do a lot of what is described in this post, but you have to set up a lot of intermediate structures, write a lot of Java/Scala, and store your state as protos to support backwards compatibility. Abstracting all of that away would be a huge time saver, but I imagine not having fine grained control over the results and join methods could be frustrating.

Flink does have a SQL join now that you can make work. Streaming joins remain a hard problem, though, and, IMO, SQL doesn't map nicely onto streaming systems.

"Unlike batch tables, streams are infinite. You can't "just wait" for all the rows to arrive before performing a join."

I view batch tables as simply a given state of some set of streams at a point in time. Running the same query against "batch" tables at different points in time yields different results (assuming the table is churning over time).


Your mental model is spot on and described quite well here: https://current.confluent.io/2024-sessions/streaming-queries...

I think it should be possible to create a compiler which transforms arbitrary SQL queries into a set of triggers and temporary tables to get incremental materialized views which are just normal tables. Those can be indexed, joined, etc., with no extra services needed. Such an approach should in theory work for multiple relational database systems if it all adheres to standards.
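For a single aggregate, a minimal sketch of what such a compiler might emit (Postgres-flavored; this only handles inserts, and updates/deletes would need analogous triggers):

    -- Materialized result as a plain table, kept up to date by a trigger.
    CREATE TABLE order_counts (customer_id BIGINT PRIMARY KEY, n BIGINT NOT NULL);

    CREATE FUNCTION bump_order_count() RETURNS trigger AS $$
    BEGIN
        INSERT INTO order_counts (customer_id, n)
        VALUES (NEW.customer_id, 1)
        ON CONFLICT (customer_id) DO UPDATE SET n = order_counts.n + 1;
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER orders_inc AFTER INSERT ON orders
    FOR EACH ROW EXECUTE FUNCTION bump_order_count();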

If both inputs are ordered by a subset of the join key, you can stream the join operation. It depends on your domain whether this can be made the case, of course. If one of the two join operands is much smaller than the other, you can make the join operation streaming for the larger operand.

> Streaming data isn't static like tables in databases—it's unbounded, constantly updating, and poses significant challenges in managing state.

I don't really see the difference between tables & streams. Data in tables changes over time too. You can model a stream as a table with any degree of fidelity you desire. In fact, I believe this could be considered a common approach for implementing streaming abstractions.


When one queries a table, though, it's only a query at one point in time. Querying a stream implies that your result set is a stream as well, which introduces a whole separate set of complexities to worry about, both as an implementor of the query engine and as a client.

It seems intuitive to me that a correct streaming join is impossible without an infinite buffer and strong guarantees on how events are ordered. The number of real world systems offering both of those guarantees is zero. Anyone espousing streaming joins as a general solution should be avoided at all costs, particularly if they have a title that contains "architect" or "enterprise" (god forbid both in the same title).

At best, it is a trick to be applied in very specific circumstances.


A streaming join indeed requires an unbounded buffer in the most general case, when inputs keep growing and any input record on one side of the join can match any record on the other side. However, it does not require inputs to be ordered. An incremental query engine such as Feldera or Materialize can handle out-of-order data and offer strong consistency guarantees (disclaimer: I am a developer of Feldera). In practice, unbounded buffers can often be avoided as well. This may require a specialized join such as an as-of join (https://www.feldera.com/blog/asof-join) and some GC machinery.
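For readers unfamiliar with as-of joins, a hedged sketch (DuckDB-style syntax; other engines spell it differently, and the schema is made up): each trade is matched with the latest quote at or before it, so only a bounded amount of quote history needs to be retained.

    SELECT t.symbol, t.ts, t.price, q.bid, q.ask
    FROM trades t
    ASOF JOIN quotes q
      ON t.symbol = q.symbol AND t.ts >= q.ts;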


