Siberite: A Simple LevelDB-Backed Message Queue in Go (github.com/bogdanovich)
77 points by Bogdanovich on Oct 18, 2015 | 35 comments



Why would you choose an LSM-tree-based storage mechanism for a message queue?

The only reason I can come up with is that it's a ready-to-use library you can just plug in, which gives OK performance and some handy features because you can use the KV store for other things. But it doesn't scale well, and backups with LevelDB are not easy either (close the DB, copy all files).

Message queues when they are ordered (at least on the local node/queue level) usually just need some kind of append-only log file. You don't do random reads or writes into the middle of the queue, you only modify the head and tail.
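To make the append-only idea concrete, here is a minimal sketch in Go. It is not Kafka's or Siberite's format: the 4-byte length prefix and the helper names (appendRecord, readRecord, roundTrip) are assumptions made for illustration, and an in-memory buffer stands in for a file opened in append mode.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// appendRecord writes one message as a 4-byte big-endian length followed by
// the payload. Nothing already in the log is ever rewritten.
func appendRecord(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readRecord reads the next message in order; it returns io.EOF once the log
// is exhausted.
func readRecord(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip appends every message to an in-memory log, then replays the log
// from the start, the way a consumer would read it sequentially.
func roundTrip(msgs []string) ([]string, error) {
	var logBuf bytes.Buffer // assumption: stands in for an append-mode file
	for _, m := range msgs {
		if err := appendRecord(&logBuf, []byte(m)); err != nil {
			return nil, err
		}
	}
	var out []string
	for {
		rec, err := readRecord(&logBuf)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, string(rec))
	}
}

func main() {
	out, err := roundTrip([]string{"first", "second", "third"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

A real log would also need fsync, segment rotation and a way to drop consumed segments, none of which is shown here.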

InfluxDB, although a time series DB, has write patterns similar to a message queue's. They learned this the hard way: they first tried an LSM tree database (LevelDB), then switched to a B+tree (BoltDB/LMDB), but that also doesn't scale once the DB gets big and the tree gets deep. They kindly wrote up their journey: https://influxdb.com/docs/v0.9/concepts/storage_engine.html

Why not keep it simple and use append-only files without complex structure and management?

Check out Kafka for a better storage format for message queues of this kind.

PS: every message queue should first clearly explain what guarantees it provides.


An LSM tree is actually a good idea if you think about it.

The R/W patterns for a message queue are simple:

- Messages are key/value

- key is an autoincrementing id

- Writes are at the end, Reads are from the beginning

- Once a message is processed, it's deleted

So in practice this means that the items are written in an append-only fashion, get merged in bigger chunks, and then get progressively deleted. So at higher levels you don't see the huge latencies due to compaction because all records are deleted. Knowing that keys are only incrementing could also lead to a simple optimization: the compaction phase can be a simple concatenation of files.
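The access pattern listed above can be sketched in a few lines of Go. This is an illustration only, not Siberite's code: the Queue type and encodeKey helper are hypothetical, and a plain map stands in for the LSM store. The one detail worth noting is the big-endian key encoding, which makes byte-wise key order match numeric id order.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeKey turns a message id into an 8-byte big-endian key, so the
// byte-wise ordering used by an ordered store matches numeric ordering.
func encodeKey(id uint64) string {
	var k [8]byte
	binary.BigEndian.PutUint64(k[:], id)
	return string(k[:])
}

// Queue is a hypothetical FIFO layered on a key/value store. The map stands
// in for the store; head and tail are the only state the queue itself needs.
type Queue struct {
	kv   map[string][]byte
	head uint64 // id of the next message to read
	tail uint64 // id the next write will receive
}

func NewQueue() *Queue { return &Queue{kv: make(map[string][]byte)} }

// Enqueue writes at the tail under the next autoincrementing id.
func (q *Queue) Enqueue(msg []byte) {
	q.kv[encodeKey(q.tail)] = msg
	q.tail++
}

// Dequeue reads from the head and deletes the message once it is handed out.
func (q *Queue) Dequeue() ([]byte, bool) {
	if q.head == q.tail {
		return nil, false
	}
	k := encodeKey(q.head)
	msg := q.kv[k]
	delete(q.kv, k) // in an LSM store this delete would be a tombstone
	q.head++
	return msg, true
}

func main() {
	q := NewQueue()
	q.Enqueue([]byte("a"))
	q.Enqueue([]byte("b"))
	for {
		msg, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Println(string(msg))
	}
	// Keys sort the same way the ids do.
	fmt.Println(encodeKey(255) < encodeKey(256))
}
```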

So you get an append-only system that progressively removes older entries as they are deleted without resorting to mad science hackery [1]. Why didn't it work for InfluxDB? All I can guess is that individual entries for each series are all mixed together (InfluxDB wants to be able to manage many series with many tags) and older entries are not deleted as frantically, so you get the latencies we all know with compaction and unpredictable reads.

Now, this is purely theoretical and of course further experiments are needed to make sure this is correct, but LSM is in my opinion a correct pattern here.

[1] https://gist.github.com/CAFxX/571a1558db9a7b393579


A queue is the correct pattern for a queue. A tree, of any form, offers no advantage.

The InfluxDB experience is definitely illuminating. Their problems with LMDB were mainly due to misuse of the API. https://disqus.com/home/discussion/influxdb/benchmarking_lev...

For batched sequential writes, there is no other DB anywhere near as fast as LMDB http://symas.com/mdb/microbench/ (Section E, Batched Writes)

But even so - the reason LMDB can do this so quickly is because for batched sequential writes it cheats - it's just performing Appends, there's no complicated tree construction/balancing/splitting of any kind going on.

If you know that your workload will only be producer/consumer, with sequentially generated data that is sequentially consumed, it's a stupid waste of time to mess with any other structure than a pure linear queue. (Or a circular queue, when you know the upper bounds of how much data is outstanding.)
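For the bounded case, a circular queue is only a few lines. The sketch below is a generic fixed-capacity ring buffer in Go, with hypothetical names (Ring, Push, Pop); it is in-memory only and says nothing about how any of the databases mentioned here lay out data.

```go
package main

import "fmt"

// Ring is a fixed-capacity FIFO: the capacity is the known upper bound on
// outstanding messages.
type Ring struct {
	buf   [][]byte
	head  int // index of the oldest message
	count int // number of messages currently stored
}

func NewRing(capacity int) *Ring { return &Ring{buf: make([][]byte, capacity)} }

// Push appends at the tail; it reports false when the ring is full, which is
// the producer's signal to back off.
func (r *Ring) Push(msg []byte) bool {
	if r.count == len(r.buf) {
		return false
	}
	r.buf[(r.head+r.count)%len(r.buf)] = msg
	r.count++
	return true
}

// Pop removes the oldest message; the freed slot is reused by later pushes.
func (r *Ring) Pop() ([]byte, bool) {
	if r.count == 0 {
		return nil, false
	}
	msg := r.buf[r.head]
	r.buf[r.head] = nil
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return msg, true
}

func main() {
	r := NewRing(2)
	fmt.Println(r.Push([]byte("a")), r.Push([]byte("b")), r.Push([]byte("c")))
	msg, _ := r.Pop()
	fmt.Println(string(msg))
}
```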

As for your initial statement - no, an LSM tree is not a correct pattern here. If your consumers are actually running as fast (or faster) than your producer then it should never flush from Level0/memory to Level1/disk. In that case all you've got is an in-memory queue that evaporates on a system crash.

If your consumers are running slower, that means data is accumulating in the DB, which means you will have compaction delays. And the compaction delays will only get longer over time, as more and more levels need to be merged. (Remember that merge operations are O(N). Then remember that there are N of them to do. O(N^2) is a horrible algorithmic complexity.) LSM is never a correct pattern.


> In that case all you've got is an in-memory queue that evaporates on a system crash.

https://www.cs.berkeley.edu/~brewer/cs262/Aries.pdf

> Remember that merge operations are O(N). Then remember that there are N of them to do. O(N^2) is a horrible algorithmic complexity.

No. Mountains of actual math refute this. LSM-tree merges are O(N log N). This is an Actual Fact.

Read more, kids.


Ah yes, you're absolutely right. O(N log N) because there are log N chunks to be merged.

O(N log N) is still untenable in the long run, nobody has exponentially growing compute resources.


May I also mention that N log N is the total cost of compaction for a DB of size N. You don't perform a compaction on every single write. Amortised per write the cost is more like N log(N)/N == log(N).

Also, N log(N) is nowhere near exponential. O(2^N) would be exponential, and that's not what you have here.


"Amortised per write" - now you're getting down into the constant factors, which Big-O disregards. But you can't ignore them in real implementations. First the actual writes have a 2x constant factor, since you're writing to a WAL in addition to the DB itself.

The original LSM paper claims that writes to Level 0 are free because that's all in-memory. But that's not really true; if you have a stream of incoming writes then everything that goes into Level 0 must eventually be pushed out to Level 1. Buffering doesn't make writes free, it only displaces their occurrence in time.

So you have a rolling merge every M writes. As far as Big-O goes, that's N log(N) / M == N log(N) because Big-O disregards constant factors!

In the context of an implementation like LevelDB, theory and reality diverge even further. Since it's chunking data into 2MB files and deleting them during each merge operation, and also writing a bunch of bookkeeping into Manifest files and other stuff, the number of actual I/Os is much higher. A lot of wasted activity in allocating and deallocating space - filesystem metadata overhead that's also not transactionally safe.

In LevelDB a single merge reads 26MB and writes 26MB at a time to push 2MB of data from a level L to level L+1. So now instead of a single merge op costing only N, it actually costs 13*N. Again, if you're only talking about Big-O complexity you sweep this under the rug. But in reality, this is a huge cost.


Stated another way - assume you want to sustain a user workload writing 20MB/sec, and you don't do any throttling. Level 0 consists of four 1MB files - it will fill in 1/5th of a second, and then compaction will reduce it by 1MB. After that it will be compacting continuously every 1/20th of a second. Sustaining this workload for the 1st second will thus require 17 compactions to Level 1. Assuming an already populated Level 1 and worst-case key distribution, that means in 1 second it will trigger compactions that read 238MB and write 238MB to store the incoming 20MB.

Level 1 is only 10MB, so if it was empty it would fill in the first 1/2 second. For the remaining 1/2 second it would trigger 5 more compactions to Level 2, reading 130MB and writing 130MB. If it started out full then this would be 260MB/260MB respectively.

So for a 20MB/sec input workload you would need a disk subsystem capable of sustaining 498MB/sec of reads concurrent with 498MB/sec of writes. And that's only for a small DB, only Level 0-2 present (smaller than 110MB), and excluding the actual cost of filesystem operations (create/delete/etc).

That's only for the 1st second of load. For every second after that, you're dumping from Level 0 to Level 1 at 280MB read and 280MB write/sec. And dumping from Level 1 to Level 2 at 260/260 as before. 540/540 - so a disk capable of 1080MB/sec I/O is needed to sustain a 20MB/sec workload. And this is supposed to be HDD-optimized? Write-optimized? O(N logN) - what a laugh.
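The arithmetic above can be replayed in a few lines of Go. This only reproduces the figures quoted in this comment; it does not measure LevelDB. One constant is an inference rather than a quote: 14MB per Level 0 compaction comes from dividing the stated 238MB by the stated 17 compactions. Level 1 is assumed to start full.

```go
package main

import "fmt"

const (
	ingestMBps     = 20 // user write rate from the comment
	level0MB       = 4  // four 1MB files
	l0CompactionMB = 14 // inferred: 238MB / 17 compactions
	l1CompactionMB = 26 // "reads 26MB and writes 26MB"
	l1PushMB       = 2  // data pushed down per Level 1 -> Level 2 compaction
)

// l0Compactions returns how many Level 0 -> Level 1 compactions run in one
// second of sustained load.
func l0Compactions(firstSecond bool) int {
	if firstSecond {
		// One compaction when Level 0 fills, then one per further 1MB written.
		return 1 + (ingestMBps - level0MB)
	}
	return ingestMBps // steady state: one compaction per incoming 1MB
}

// diskMBps returns the read bandwidth (and, equally, the write bandwidth)
// needed per second, assuming Level 1 starts out full.
func diskMBps(firstSecond bool) int {
	l0 := l0Compactions(firstSecond) * l0CompactionMB
	l1 := (ingestMBps / l1PushMB) * l1CompactionMB
	return l0 + l1
}

func main() {
	fmt.Println("first second, MB/sec each way:", diskMBps(true))
	fmt.Println("steady state, MB/sec each way:", diskMBps(false))
	fmt.Println("steady state, combined MB/sec:", 2*diskMBps(false))
}
```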

Maybe LSMs in general can be more efficient than this. LevelDB is pretty horrible though.


It would only trigger compaction if SST files have overlapping keys. If you only write new items, the goleveldb implementation just creates 3.7MB SST files by default without trying to merge them into bigger chunks (what's the point? They are all sorted and non-overlapping). With a queue consumption workload it starts merging tombstones with SST files, and since tombstones are also in sorted order it does not pick up multiple SST files at a time; it just removes stale SST files, either completely or partially. I added some more benchmarks, including packing a queue with 200M messages of 64 bytes each and consuming those 200M messages. The speed is sustainable. https://github.com/bogdanovich/siberite/blob/master/docs/ben...



And you can find a fantastic list of questions about queue guarantees/properties here: https://news.ycombinator.com/item?id=8709146


Yes, goleveldb was chosen because it's a ready-to-use library with decent write and read performance and no external non-Go dependencies. It can also be used to store offsets for multiple consumers in the future.

Regarding the guarantees provided: with simple 'get work_queue' reads it provides at-most-once delivery. With two-phase reliable reads ('get work_queue/open', 'get work_queue/close') it provides at-least-once delivery, although the message is kept in server memory during a reliable read and will be lost if you SIGKILL siberite. On SIGTERM and SIGINT siberite will gracefully abort the read and save the message.
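The difference between the two read modes can be modelled in a small Go sketch. This is not Siberite's implementation: ReliableQueue and its methods are hypothetical, everything lives in memory, and putting an aborted message back at the head of the queue is an assumption of this model.

```go
package main

import "fmt"

// ReliableQueue models the semantics only: items is the queue, pending holds
// the message each client has opened but not yet confirmed.
type ReliableQueue struct {
	items   []string
	pending map[string]string
}

func NewReliableQueue() *ReliableQueue {
	return &ReliableQueue{pending: make(map[string]string)}
}

func (q *ReliableQueue) Put(msg string) { q.items = append(q.items, msg) }

func (q *ReliableQueue) pop() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	msg := q.items[0]
	q.items = q.items[1:]
	return msg, true
}

// Get models 'get work_queue': the message is gone as soon as it is returned,
// so a client crash loses it (at-most-once).
func (q *ReliableQueue) Get() (string, bool) { return q.pop() }

// Open models 'get work_queue/open': the message is handed out but remembered
// until the client confirms it.
func (q *ReliableQueue) Open(client string) (string, bool) {
	if msg, ok := q.pending[client]; ok {
		return msg, true // still unconfirmed: hand the same message back
	}
	msg, ok := q.pop()
	if ok {
		q.pending[client] = msg
	}
	return msg, ok
}

// Close models 'get work_queue/close': the client confirms the message.
func (q *ReliableQueue) Close(client string) { delete(q.pending, client) }

// Abort models a disconnect before confirmation: the message becomes
// available again, so another client may see it (at-least-once).
func (q *ReliableQueue) Abort(client string) {
	if msg, ok := q.pending[client]; ok {
		delete(q.pending, client)
		q.items = append([]string{msg}, q.items...)
	}
}

func main() {
	q := NewReliableQueue()
	q.Put("job-1")
	msg, _ := q.Open("worker-a")
	q.Abort("worker-a") // worker-a disconnects without confirming
	again, _ := q.Open("worker-b")
	q.Close("worker-b")
	fmt.Println(msg, again)
}
```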


I'm puzzled by your mention of consumer offsets.

Indeed, either Siberite is a queue system whose purpose is to dispatch each message to one and only one consumer for further processing, and which requires the consumers to acknowledge fully processed messages;

or Siberite is a journal system (in the spirit of Kafka) whose purpose is to replay the full log to any consumer asking for it, and which offers the consumers a watermark mechanism to keep track of their progress.

In the former case, the queue system is responsible for what to do in case of a missing or late acknowledgement (choosing between "at least once" and "at most once" message delivery). In the latter case, the consumers are responsible for maintaining an atomic view of message consumption and message processing (for instance using a transaction to persist an offset with a state).


Right now it doesn't store any consumer offsets. And you can get either at-most-once or at-least-once guarantees.

But I found the idea of multiple consumer groups per queue very interesting. So basically you would still be able to fetch queue messages as you can now, and that would delete dequeued items, but you would also be able to use something like 'get queue_name:consumer_name', which would create a consumer group internally with a stored offset and serve messages using that offset. In case of a reliable read failure, each consumer group will keep its own queue of failed deliveries, and will check that queue and serve those failed items first. If the source queue head has moved past the consumer group offset, the consumer group would just start from the source queue head.

This way you can get Kafka-like multiple consumer groups per queue as an additional feature.
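A rough Go sketch of that idea, to show how the offset and the source queue head interact. Nothing here exists in Siberite today: the Log type and the Fetch method are hypothetical, new groups starting at the current head is an assumption, and the per-group queue of failed deliveries is left out.

```go
package main

import "fmt"

// Log is a hypothetical queue that supports both destructive reads and
// per-group offsets.
type Log struct {
	msgs    map[uint64]string
	head    uint64            // oldest id still stored
	tail    uint64            // id the next append will receive
	offsets map[string]uint64 // consumer group -> next id to serve
}

func NewLog() *Log {
	return &Log{msgs: make(map[uint64]string), offsets: make(map[string]uint64)}
}

func (l *Log) Append(msg string) {
	l.msgs[l.tail] = msg
	l.tail++
}

// Dequeue is the plain destructive read: it deletes the item and advances
// the source queue head.
func (l *Log) Dequeue() (string, bool) {
	if l.head == l.tail {
		return "", false
	}
	msg := l.msgs[l.head]
	delete(l.msgs, l.head)
	l.head++
	return msg, true
}

// Fetch models something like 'get queue_name:consumer_name'. If the source
// head has moved past the group's offset, the group resumes from the head.
func (l *Log) Fetch(group string) (string, bool) {
	off := l.offsets[group]
	if off < l.head {
		off = l.head // items before head were already deleted
	}
	if off >= l.tail {
		l.offsets[group] = off
		return "", false
	}
	l.offsets[group] = off + 1
	return l.msgs[off], true
}

func main() {
	l := NewLog()
	for _, m := range []string{"a", "b", "c"} {
		l.Append(m)
	}
	first, _ := l.Fetch("analytics")
	l.Dequeue() // removes "a"
	l.Dequeue() // removes "b"
	next, _ := l.Fetch("analytics")
	fmt.Println(first, next)
}
```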


Why is it the queue's responsibility to store consumer offsets? The consumer is the only side that knows how far along its processing is. Why is the queue storing this data, when all the consumer has to do is tell it: send me events for topic X from point P forward?


If you wanted the consumers to be stateless, assuming they otherwise had a deterministic identity, then you could have the queue operate like a journal internally, but present a unified queue API to consumers.

So the queue keeps track of the high-watermark on a per consumer basis and all the consumer has to do is show up, tell the queue its deterministic name/id (might be driven by imaging, configuration, or SDN), and the queue will serve up the next new item that consumer hasn't seen yet.

This would be handy for really dynamic transient worker topologies because it keeps the mutable state and state tracking concerns entirely outside the transient worker.

That said, I still wouldn't use LevelDB. Unless I was expecting to do multi-attribute range queries or something (now we're well outside queue territory), but even then you're still folding over the data for knowable start/end markers and a linear scan over a binary term file will be faster than the multiple seeks + segment scans that LevelDB requires.


If the consumer is stateless then it needs to acknowledge every received event for it to be reliable. Otherwise the producer may think something is sent when it actually never arrived (tcp connection was closed).

So it's either unreliable or slow.

Also if you have dynamic transient worker topologies, you have to remember those positions. You are saving data for later use, that may never arrive. How long do you keep this data?

Seems like a pretty messy way of doing things.

Completely agree about LevelDB.


TCP would guarantee delivery, but you're right in that you wouldn't know if the consumer actually did anything with the message. It could have crashed on parsing or something.

But moving the concern to the consumer to track the cursor doesn't make the protocol any more stable. To keep a stable cursor, the consumer would need to persist that someplace, which just pushes the acknowledgement to that persistence component instead. If a stable cursor is what you're after, then co-locating it with the durable queue provides a simpler solution with a slightly better consistency guarantee.

The garbage collection problem is a real one, but realistically how many consumers is an infrastructure service like this going to have? Tens? Hundreds? Thousands? Millions? Billions?

No matter which one of those you pick, it's a trivially small secondary index to maintain even if you never reaped it. I mean it's a K/V problem (consumer_id -> queue_offset) and there's a K/V store already sitting there. If you didn't want it to grow forever then you could establish a TTL policy via configuration.
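As a sketch of how small that index is, here is a consumer_id -> offset map with a TTL-based reaper in Go. The names (OffsetIndex, Commit, Reap) are hypothetical, the map stands in for the K/V store, and timestamps are passed in as plain Unix seconds so the example does not depend on a clock.

```go
package main

import "fmt"

type entry struct {
	offset   uint64
	lastSeen int64 // Unix seconds of the consumer's last commit
}

// OffsetIndex is a hypothetical secondary index: consumer id -> queue offset.
type OffsetIndex struct {
	ttlSeconds int64
	entries    map[string]entry
}

func NewOffsetIndex(ttlSeconds int64) *OffsetIndex {
	return &OffsetIndex{ttlSeconds: ttlSeconds, entries: make(map[string]entry)}
}

// Commit records how far a consumer has read and refreshes its TTL.
func (ix *OffsetIndex) Commit(consumer string, offset uint64, now int64) {
	ix.entries[consumer] = entry{offset: offset, lastSeen: now}
}

// Lookup returns the stored offset for a consumer, if it is still known.
func (ix *OffsetIndex) Lookup(consumer string) (uint64, bool) {
	e, ok := ix.entries[consumer]
	return e.offset, ok
}

// Reap drops consumers that have been idle longer than the TTL and reports
// how many were removed.
func (ix *OffsetIndex) Reap(now int64) int {
	removed := 0
	for id, e := range ix.entries {
		if now-e.lastSeen > ix.ttlSeconds {
			delete(ix.entries, id)
			removed++
		}
	}
	return removed
}

func main() {
	ix := NewOffsetIndex(60)
	ix.Commit("worker-1", 42, 0)
	ix.Commit("worker-2", 7, 100)
	fmt.Println("reaped:", ix.Reap(100))
	_, known := ix.Lookup("worker-1")
	fmt.Println("worker-1 still known:", known)
}
```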

The problem you would have is consumers that don't have stable or bounded IDs. Like a system that assigns a new ID every time the consumer makes a request or the consumer is restarted.


> TCP would guarantee delivery

Calling send just copies the buffer to the kernel/driver. When the call returns you do not know how much of it is actually sent. You might have the situation of the producer thinking it was sent, when it in fact never actually made it onto the network.


In case of a reliable fetch failure, each consumer group will keep its own queue of failed deliveries (persisted on disk), and will check that queue and serve those failed items first.


This couldn't have come at a better time - I was actually looking for a durable message queue written in Go. Is there any way to read more about the architecture of this system? I find systems like these quite fascinating, but going through the code can be very time-consuming. It would be awesome if more projects had a writeup as detailed as cockroachdb[0]!

Aside: There used to be a site some time back that distributed compiled binaries of Go code for all platforms. Is it still up, by any chance?

[0] - https://github.com/cockroachdb/cockroach#architecture


http://nsq.io/

http://nsq.io/overview/internals.html

The service you are thinking of might be https://github.com/ddollar/godist

You can see it in use in the download links on https://github.com/ddollar/forego.


It's really simple. Each queue is a separate leveldb database on disk. Messages are stored as key/value using incremental ids. Head and tail of the queue are kept in memory and get initialized on startup via db scan.
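A sketch of what that startup scan could look like in Go. This is not the actual Siberite code: the 8-byte big-endian key layout, the recoverBounds helper, and the convention that tail is one past the largest id are all assumptions, and a slice of keys stands in for a database iterator.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeKey is the assumed key layout: the message id as 8 big-endian bytes.
func encodeKey(id uint64) []byte {
	k := make([]byte, 8)
	binary.BigEndian.PutUint64(k, id)
	return k
}

// recoverBounds scans every key and returns head (smallest id present) and
// tail (one past the largest id). ok is false when no message keys exist.
func recoverBounds(keys [][]byte) (head, tail uint64, ok bool) {
	for _, k := range keys {
		if len(k) != 8 {
			continue // skip anything that is not a message key
		}
		id := binary.BigEndian.Uint64(k)
		if !ok || id < head {
			head = id
		}
		if !ok || id >= tail {
			tail = id + 1
		}
		ok = true
	}
	return head, tail, ok
}

func main() {
	// Ids 0..4 were already consumed and deleted; 5..7 are still queued.
	keys := [][]byte{encodeKey(5), encodeKey(6), encodeKey(7)}
	head, tail, ok := recoverBounds(keys)
	fmt.Println(head, tail, ok)
}
```

With an ordered store the iterator already yields keys in sorted order, so reading only the first and last key would give the same answer without a full scan.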


Also, you have to be paying one hell of a compaction penalty if this isn't a grow-only dataset. By ordering your keys you're at least minimizing the overhead of compaction on write by utilizing the happy-path for how LevelDB moves data out of the write buffer and into the SSTs.

But deletes are going to have a big impact still, and (working from my failing memory of LevelDB internals) I think might actually be the pathologically sad case.


Why don't you just store the head and tail as K/V entries? You have a durable K/V store at your disposal.


There is no benefit in that except faster startup time. As a downside, you'll get a lot of head/tail DB key updates.


Fast start times are a valuable thing for a service component.

Stick about 10GB of small entries in it (should be enough to create all the levels) and then see what happens.

Also, you could reserve the persisted [H|T] for controlled shutdown scenarios. Basically anything that isn't complete system failure if you're properly trapping signals.


I added some more benchmarks, including packing a queue with 200M small 64-byte messages (20GB) and consuming that queue. There is no slowdown from mass deletes. https://github.com/bogdanovich/siberite/blob/master/docs/ben...


Sounds interesting. For my use cases, which require few (< 10) messages/sec and no clustering, would I gain anything by using Siberite over Beanstalk?


You can have large queues (larger than RAM) and siberite would still use only a small amount of resident memory. You basically don't need a separate server with a decent amount of memory for it. You can also benefit from two-phase reliable fetch: if your client gets disconnected without confirming a message, the message will be served to another client (very convenient if you use Amazon spot instances for your workers).


Note that this also means that messages can be delivered more than once and/or that the clients need to remember the messages that they processed. In some setups that can be a showstopper.


Reliable fetch is a feature, not a protocol requirement. You can use the simple 'get work_queue' command to just get a message, or you can use the two-phase fetch ('get work_queue/open', 'get work_queue/close') if you need a reliable fetch. You can also use the 'get work_queue/close/open' command to acknowledge the previous message and read a new one.


Ok so you can switch between at-most-once and at-least-once guarantees. While nice to have both options in a message queue, my point still stands.

Each of these has trade-offs, and the way it is architected here, in the at-least-once case you will have to either remember all the processed messages or be prepared to process a message multiple times, whatever that means in your specific use case.
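The "remember all the processed messages" option can be sketched as follows in Go. The Deduper type is hypothetical, and it assumes every message carries a unique id chosen by the producer, which nothing in this thread says Siberite provides. The seen set is kept in memory here; a real consumer would have to persist it.

```go
package main

import "fmt"

// Deduper remembers which message ids have already been handled so that a
// redelivered message is not processed twice.
type Deduper struct {
	seen map[uint64]struct{}
}

func NewDeduper() *Deduper { return &Deduper{seen: make(map[uint64]struct{})} }

// Process runs handle only the first time an id is seen and reports whether
// it ran. The id is recorded after handle returns, so a crash inside handle
// leaves the message eligible for redelivery.
func (d *Deduper) Process(id uint64, handle func()) bool {
	if _, dup := d.seen[id]; dup {
		return false
	}
	handle()
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	handled := 0
	for _, id := range []uint64{1, 2, 1} { // id 1 is delivered twice
		d.Process(id, func() { handled++ })
	}
	fmt.Println("handled:", handled)
}
```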


Can you describe how the queue was represented as key/value?


Yes, as id/value with autoincrement key. Head and tail ids are kept in memory and get initialized on startup via leveldb database scan.



