Show HN: Lightweight Threads, Channels and Actors for the JVM (paralleluniverse.co)
116 points by pron on May 2, 2013 | 54 comments



Ignorant newbie here, but would love to hear views on this: I always treat with great skepticism claims that you can implement an inherent operating system function (eg: threads) on top of said operating system more efficiently than can be done by the OS itself. Usually it means the implementor simply didn't understand the next level down (eg: how the kernel works) and therefore couldn't tune it to their needs. But they understand the next level up extremely well. And there's nothing wrong with this, especially since one is platform independent and the other is (usually) very tied to specifics.

The one thing that seems evident to me is that native threads in Java each require their own stack, and this consumes memory. At some point millions of threads will, if nothing else, require gigabytes of memory for stack space. However, in this era when the kind of computers that are likely to run these applications can easily have hundreds of GB of memory, I'm not even sure if that is an inherent limitation.

So, tl;dr: can someone give me some insight into what makes this more efficient at the language layer than at the kernel layer? Why can a high-level language magically scale to millions of threads when the OS cannot? What is it that allows this?


I don't disagree with other answers, but I believe the situation can be described more easily: it isn't the same primitive, so, it isn't actually the case that someone is attempting to provide the same thing but faster: the OS is giving you "pre-emptive threads", whereas these alternatives are giving you "cooperative threads".

The argument is that for restricted use cases "cooperative threads" (which are the "dual", in a mathematical sense, to "evented" execution) are going to be faster than pre-emptive threads (which would require locks around even very short shared data usage, due to the unpredictability of the scheduler).

If the OS provided true cooperative multitasking, maybe it could do it faster, but that's something that has been pretty much decided to be a flawed OS primitive between processes ("OMG Mac OS 9 / Windows 3.1" ;P), and within a single process may as well be implemented in userland with little performance loss.
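
To make the distinction concrete, here is a minimal sketch of cooperative scheduling on a single OS thread, in plain Java. The `CooperativeScheduler` class and the step-function shape of a task are assumptions made for illustration; this is not how Quasar implements fibers. A task gives up the CPU only when its step returns, so the shared counter can be updated without a lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BooleanSupplier;

// Hypothetical illustration: a round-robin cooperative scheduler on one OS thread.
final class CooperativeScheduler {
    // Each task is a step function: it runs until its next yield point and
    // returns true if it wants to be scheduled again, false when finished.
    private final Deque<BooleanSupplier> runQueue = new ArrayDeque<>();

    void spawn(BooleanSupplier task) {
        runQueue.addLast(task);
    }

    // Runs tasks in turn until all of them have finished.
    void runAll() {
        while (!runQueue.isEmpty()) {
            BooleanSupplier task = runQueue.pollFirst();
            if (task.getAsBoolean()) {
                runQueue.addLast(task); // the task yielded; requeue it
            }
        }
    }

    // Shared, unsynchronized state. This is safe here only because a task
    // can lose the CPU solely at its own yield points.
    static int sharedCounter = 0;

    // Spawns `tasks` tasks that each increment the counter `steps` times
    // (steps >= 1), yielding after every increment. Returns the final count.
    static int demo(int tasks, int steps) {
        sharedCounter = 0;
        CooperativeScheduler scheduler = new CooperativeScheduler();
        for (int t = 0; t < tasks; t++) {
            int[] remaining = {steps};
            scheduler.spawn(() -> {
                sharedCounter++;           // read-modify-write without a lock
                return --remaining[0] > 0; // yield; run again if work remains
            });
        }
        scheduler.runAll();
        return sharedCounter;
    }

    public static void main(String[] args) {
        System.out.println(demo(1000, 100)); // prints 100000
    }
}
```

With pre-emptive threads the same unsynchronized increment could lose updates, which is the locking cost referred to above.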


GHC's runtime (an implementation of Haskell) uses pre-emptive scheduling for its threads and has no problem handling millions of lightweight threads.


This starts turning into a semantics problem. GHC is a compiler that modifies the code it has compiled to add "synchronization points" that are used under a cooperative threading model, allowing it to know with certainty that certain kinds of operations or even functions it detects to have certain properties will not be pre-empted. Given that the overall language semantics are then pure functional, the costs associated with pre-emptive scheduling are removed, but again this is only because they "cheated" and didn't build a real "preemptive" scheduler: tasks must cooperate.

In fact, there are trivial kinds of operations you can perform (such as involving FFI) that simply will never pre-empt. The primitive the operating system provides, which I maintain is a fundamentally different primitive than what people are building in these user-space modifications, is "no matter what you do, whether on accident or on purpose, whether with benign or malicious intent, you will be time-sliced; your time-slicing will thereby happen at the discretion of the system, and you will not be trusted to request or demand excessive modifications to your time". GHC is not some magic exception: it is still cooperative multitasking.


Good question. In general the non-preemptive green thread/fiber/lightweight thread is a pretty well understood notion. Some OSes support it, e.g. Windows supports manually scheduled fibers. However, OS support is not universal. It will take time, just like the transition from processes to threads. Most support comes in the form of user-mode libraries, since it doesn't need kernel-mode support.

A native thread needs a stack, which can be substantial, usually ~1 MB. A lightweight thread just needs a data structure to hold its data, usually hundreds of bytes to a few KB. That's a factor of 1000x to 10000x.
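
As a back-of-the-envelope check of those figures, the arithmetic can be sketched as below. The 512-byte fiber size is an assumed value picked from the "hundreds of bytes" range, and `ThreadMemoryEstimate` is a hypothetical helper, not a measurement of any real runtime. Note also that a native stack is typically reserved address space that the OS commits lazily, so the first number is an upper bound rather than resident memory.

```java
// Back-of-the-envelope arithmetic only; the per-thread sizes are rough
// figures from the discussion, not measurements.
final class ThreadMemoryEstimate {
    static final long MIB = 1024L * 1024L;

    // Total bytes needed for `count` threads at `bytesPerThread` each.
    static long totalBytes(long count, long bytesPerThread) {
        return Math.multiplyExact(count, bytesPerThread); // throws on overflow
    }

    public static void main(String[] args) {
        long threads = 1_000_000L;
        long nativeStacks = totalBytes(threads, MIB); // ~1 MB of stack each
        long fibers = totalBytes(threads, 512);       // assumed 512 bytes each
        System.out.println("native stacks: " + nativeStacks / MIB + " MiB"); // 1000000 MiB
        System.out.println("fibers:        " + fibers / MIB + " MiB");       // 488 MiB
        System.out.println("ratio:         " + nativeStacks / fibers + "x"); // 2048x
    }
}
```

So a million native stacks at 1 MB each come to roughly a terabyte of reserved stack space, against under half a gigabyte for the assumed fiber size.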

Besides the memory advantage, lightweight threads can have a performance advantage over native threads, since switching between lightweight threads on the same CPU doesn't need a context switch, which can carry a substantial performance penalty as the L1/L2 cache and all the registers of the CPU need to be flushed and reloaded. On a multi-CPU system, memory barriers need to be crossed, caches need to be synced across CPUs, and the TLB might be flushed as well, depending on how the OS implements the memory model of a thread.

Lightweight threads do require more attention from app developers, since they now need to worry about yielding manually.

Java has great NIO support, which, when used with lightweight threads, can provide amazing scalability and performance boosts. See Netty and its friends.


Think of threads as state machines. At the OS level, you have to track a lot more, because you have to service any process that can create a thread. Within your constrained environment, your 'thread' can be something as simple as a small data structure or record that does something when a message is passed to it or a function is called on it. The services you provide to each of your 'threads' can be lightweight, intelligently aggregated and prioritized, and can have only the memory allocated that is needed for the representation of that structure.

Think of a thread as if it were an object in C#, Java, whatever. Stuff happens to it, and it does stuff, but it's all based on propagation of state changes. It can be processing in serial, but because of the abstraction, it will appear to be concurrent, and therefore, effectively BE concurrent.


http://stackoverflow.com/questions/2708033/technically-why-i...

No concrete benchmarks, but everything I can find makes the same claim: a context switch in Erlang takes approx. 20 ns, while an OS-level context switch takes between 1000 and 2000 ns, mostly because Erlang can make more assumptions about state, needs less processor state flushing, etc.


In general, higher levels have more knowledge of the specific problem domain, and can therefore make more assumptions, while the kernel has to cater to all needs.

In this particular case the fork/join scheduler supports the constant creation/destruction of new fibers, and of one fiber constantly "waking-up" other blocked fibers. Even if OS task switching is good, OS threads weren't meant to be short-lived and quick to start.

That said, OS task-switching can be quite good. That's why I've left the option of running Quasar actors tied to a thread rather than a fiber. I've tried to hide the implementation details as much as possible from the user of the API.


Selective Receive sounds great. Any idea how to handle messages that continue to pile up behind your actor when they're never handled? Do they get culled somehow after a period of time? If not, how do you handle the inherent memory leaking where every actor piles up messages that were never handled, and wastes processing time by replaying them every time you do handle a message? Works okay when you have lightweight processes that are completely independent, like in Erlang - on a monolithic process like the JVM, not so much.

Also, with the lightweight threads using CPS - how do you prevent the kernel from deciding to do some housekeeping tasks on your core? Using APIC or something in Linux to assign the JVM process exclusively to cores so you're guaranteed no interruptions? User-level hardware affinity is not exactly the JVM's strong point.


Regarding selective receive: the messages aren't replayed whenever a new message comes along. The receive operation keeps a pointer into the queue to the last message scanned. Whenever the inner receive returns, the outer receive continues to scan the queue wherever it left off. Now, in general it's a good idea to use bounded queues so messages don't pile up indefinitely. When the queue overflows, the queue's owning actor (the receiver) will get an exception. When it gets the exception, it will either want to terminate or flush the queue.
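
The behaviour described above can be sketched roughly as follows. This is a simplified, single-threaded illustration and not Quasar's API: `SelectiveMailbox`, `send` and `tryReceive` are hypothetical names, the receive does not block, the scan restarts from the head instead of keeping a pointer, and the overflow exception is thrown to the caller of `send` rather than delivered to the receiving actor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of a bounded mailbox with selective receive.
// Single-threaded and non-blocking for brevity; not Quasar's API.
final class SelectiveMailbox<T> {
    private final List<T> queue = new ArrayList<>();
    private final int capacity;

    SelectiveMailbox(int capacity) {
        this.capacity = capacity;
    }

    // Appends a message, or throws if the bounded mailbox is full.
    // (Simplification: the exception goes to the caller of send.)
    void send(T message) {
        Objects.requireNonNull(message);
        if (queue.size() >= capacity) {
            throw new IllegalStateException("mailbox overflow");
        }
        queue.add(message);
    }

    // Removes and returns the oldest message that matches. Messages that
    // are skipped stay queued, in order, for a later receive.
    Optional<T> tryReceive(Predicate<? super T> matches) {
        for (int i = 0; i < queue.size(); i++) {
            if (matches.test(queue.get(i))) {
                return Optional.of(queue.remove(i));
            }
        }
        return Optional.empty();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        SelectiveMailbox<String> box = new SelectiveMailbox<>(3);
        box.send("data:1");
        box.send("ack");
        box.send("data:2");
        // Take the "ack" first, skipping over the earlier data message.
        System.out.println(box.tryReceive(m -> m.equals("ack")).get()); // prints ack
        // The skipped message is still at the head of the queue.
        System.out.println(box.tryReceive(m -> true).get());            // prints data:1
    }
}
```

The bound is what keeps unmatched messages from accumulating without limit; what the receiver does on overflow (terminate or flush) is left out of the sketch.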

If you need affinity, I recommend Peter Lawrey's Java-Thread-Affinity library (https://github.com/peter-lawrey/Java-Thread-Affinity).


If you're not replaying unhandled messages, you're not doing selective receive. To quote LYSEFGG, "Ignoring some messages to handle them later in the manner described above is the essence of selective receives" (http://learnyousomeerlang.com/more-on-multiprocessing). Erlang also doesn't limit mailbox size, so while it's great that you offer bounded mailboxes (which is also great for performance since they can be array-based), it's not quite as flexible. And since you haven't begun to implement supervision or OTP, you'll have to handle failure as well when you break your bounds.

I'm a big fan of Martin Thompson's Mechanical Sympathy concepts, and I'm very intrigued by Peter Lawrey's work with Chronicle as well. That said, that hardware affinity library relies on native C code, and you'd better know what you're doing when you put it in. The topology of the CPUs and locality mean you have to be smart in your assignments, lest you end up message passing via QPI/HyperTransport between sockets at a latency of ~20ns/message. Point being, either be intimately familiar with your box and reconfigure for each kind on which you deploy, don't ever use a hypervisor, or pin and pray.

Are you able to introduce bulkheads and failure zones with your lightweight threads via CPS? If not, isolation of dangerous tasks on a thread that could impact other actors could be an issue. Akka does this by allowing you to specify what thread pool (preferably forkjoin-based) you want to use for each actor.

Look, this is neat stuff you're doing. I'm not concerned that you don't like Scala, but Akka can be used from Java as well so that's a non-argument. It's merely another approach. And while you certainly CAN block in an Akka application, there are plenty of tools for asynchronous coding in Scala (Futures, Async) to help you avoid that and only block when you absolutely must.


The skipped messages will be replayed in the "outer" receive. Obviously, selective receive has its drawbacks, but it's part of what makes Erlang simple, and it can significantly help in modeling complex state transitions.

And yes, you can assign a fiber to a ForkJoinPool of your choosing (although I'm interested in what a "dangerous task" may be).


I agree that using selective receive helps in dealing with messages that arrive out of the order of a specific state transition. Akka gives users the ability to stash messages if they want to. On the JVM, a long-running actor-based application (which is one of the reasons for using actors in the first place) can struggle with it. It's one of the reasons the original Scala Actor library is no longer in use, though there are other important reasons - such as Akka's use of ActorRef, analogous to Erlang's PIDs, which mask the instance of an actor from those who wish to communicate with it, as well as its physical location. As you scale actors across a cluster of machines, that becomes really useful.

That's great about assigning the fiber to a FJP. A dangerous task would be anything that could take down an actor, which can be worrisome depending on what state the actor is holding. There are varying kinds of such state, including that which can easily be retrieved again from an external source, that which is "scratch" data and inconsequential if lost, and that which cannot be recovered if lost. In actor-based applications, we want to encapsulate mutable state and use message-handling single-threaded interaction to prevent concurrency issues, right? If we're going to do something that could cause the actor to fail and risk losing data, we want to export that work along with the data needed to perform the task to another actor and let IT fail rather than risk the important one. There are ways to pass such data between incarnations of an actor on the JVM by carrying it with the Exception, but it's not free and you have to know when to use it.

So a dangerous task could be asking for data across an unreliable network or non-replicated source, it could be dividing by 0, anything that could cause typing errors (even in Erlang), you name it.
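
A rough sketch of that "export the risky work" idea, using only `java.util.concurrent`, might look like this. `RiskyDelegation` is a hypothetical stand-in for an actor that owns important state; a real actor would receive the outcome as a message instead of blocking on `join()`, and the fallback value of 0 is an arbitrary choice for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical owner of important state that delegates risky work.
final class RiskyDelegation {
    // State that must survive; only the owner ever touches it.
    private int importantTotal = 100;

    // Hands a risky computation, plus the data it needs, to a worker and
    // folds the outcome back into the owner's state.
    int divideAndAdd(int numerator, int divisor, ExecutorService workers) {
        CompletableFuture<Integer> result = CompletableFuture
            .supplyAsync(() -> numerator / divisor, workers) // may throw ArithmeticException
            .exceptionally(failure -> 0);                    // failure becomes a fallback value
        importantTotal += result.join(); // blocking here only to keep the sketch short
        return importantTotal;
    }

    // Runs one successful and one failing delegation on a small worker pool.
    static int[] demo() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            RiskyDelegation owner = new RiskyDelegation();
            int afterSuccess = owner.divideAndAdd(10, 2, workers);
            int afterFailure = owner.divideAndAdd(10, 0, workers);
            return new int[] {afterSuccess, afterFailure};
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] totals = demo();
        System.out.println(totals[0]); // prints 105
        System.out.println(totals[1]); // prints 105: the failed division left the state intact
    }
}
```

The division by zero fails inside the worker, and the owner's state is untouched apart from the fallback it chose to apply.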


But how would a dangerous task affect the entire pool?

Also, I don't know if there should even be "important actors". Like in Erlang, we want to let it fail. Important data should be kept in a shared data structure that supports good concurrency, not in actor state. Like I said in the post, I don't think every aspect of the application should be modeled with actors.


With a thread pool shared by actors, even yours, if one of the actors fails, that thread is gone until the pool creates a new one (as needed). That's one less available thread until the recreation occurs. To minimize the impact on other actors, you put known dangerous tasks on their own small pool so that their probable thread death has no impact on others.

What you're not seeing is the relevance of a supervisor hierarchy and OTP. Yes, you want to let it crash. But you want isolation of failure as well, and only with failures of increasing criticality do you want it to be escalated through the hierarchy. There is a difference between a database call failing because a SQL command failed and all database interactions failing due to network partition. OTP via Erlang and Akka allow you to model that in your actor tree.

Important data kept in a shared data structure? Globally visible? Managed with what, STM? That won't fly at scale - STM is great only when you're dealing with small datasets that don't change very often. Immutable, persistent data structures? Also not good at scale due to fixed size allocations constantly happening as structural sharing is enforced. Allocations are cheap and hopefully the objects are short-lived and GC-friendly, but it's still far from a free ride.

The whole point of actors is a concurrency abstraction. They are meant for isolating mutable state from being affected by multiple threads at the same time. OTP and supervision is just a nice way of organizing them and learning when failure occurs on another thread.

Should an entire app be modeled with actors? Probably not. A subsystem certainly can be. Depends on what you're doing, of course.


How would a failed actor (huh :)) take down the thread? All exceptions thrown by the FJTasks tasks are caught.
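
This can be checked against a plain `java.util.concurrent.ForkJoinPool` with a sketch like the one below. It uses only the standard library and says nothing about Quasar's own scheduler; `ForkJoinFailureDemo` is a hypothetical name. A task that throws is submitted to a pool with one worker, followed by a healthy task on the same pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical demo: an exception thrown inside a ForkJoinTask is captured
// by the task, and the pool keeps accepting and running work afterwards.
final class ForkJoinFailureDemo {
    // Returns the healthy task's result, prefixed by whether the first
    // task was recorded as having completed abnormally.
    static String run() {
        ForkJoinPool pool = new ForkJoinPool(1); // parallelism of one worker
        try {
            Callable<String> failingWork = () -> {
                throw new IllegalStateException("actor failed");
            };
            ForkJoinTask<String> failing = pool.submit(failingWork);
            try {
                failing.join(); // rethrows the task's exception to the joiner
            } catch (RuntimeException contained) {
                // The failure surfaces here, at the join, not in the pool.
            }
            Callable<String> healthyWork = () -> "still running";
            String healthy = pool.submit(healthyWork).join();
            return failing.isCompletedAbnormally() + ":" + healthy;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true:still running
    }
}
```

The failure is recorded on the task and reported to whoever joins it, while the pool goes on to run the second task.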

The shared data structure was a reference to my previous post, not to STM: http://blog.paralleluniverse.co/post/44146699200/spaceships.


Am I the only one who had to smile at the sentence "At Parallel Universe we develop complex data structures and distributed data grids, which require a lot of low-level programming, so I do most of my work in Java."? :)

Also, I wish they had some Java code on the page. As I understand it, it's available for Java as well? I love the actor model, but I have no intention of learning Clojure.


Quasar is Java (Pulsar is a thin Clojure API to Quasar). There's no documentation yet (there will be soon), but the project page directs you to some examples.

All the code examples in the post are in Clojure because I wanted to demonstrate how the Erlang model is fully implemented, and the Clojure API easily mimics Erlang's. The Java API doesn't have pattern matching, and so would make it harder to see the resemblance to Erlang.

But as good as Java is for low-level, high-performance code, and even though I don't find its verbosity to be problematic in the least, Clojure is a beautiful, elegant language worth learning even if only for its concurrency and state-management philosophy.


I think the OP was smiling at the notion of Java as low-level. It's anything but.


How does this compare with http://code.google.com/p/jetlang/ ?

How does this work with garbage collection? Is the GC per-process ("fiber"), like in Erlang?


Jetlang doesn't offer lightweight-threads. It uses an event-driven processing of messages (like Akka), so fibers can't block and you can't do selective receives.

The JVM's GC isn't per actor (I know actors are called processes in Erlang, but let's keep the nomenclature consistent), but its GC is extremely advanced, and some implementations work on a per-thread basis. The ramifications are that we can't offer the same level of isolation as Erlang, at least not on HotSpot. An actor could theoretically produce a particular kind of garbage that will cause a GC pause to the entire system. But other than isolation, the JVM is very performant (much more than Beam), and handles concurrency extremely well.

I don't understand your last question (on load distribution).


Re: distribution, I just deleted this question from my comment. I had not paid attention to the item in "next steps" in your post which mentions Galaxy.

I'm excited to see how you guys go about trapping errors and such, as you mention in the "next steps."

Thanks for answering my small questions.


You can do selective receives by creating and disposing channel subscriptions according to actor state. And a fiber can always spawn a conventional thread to block and publish a message when done.


Jetlang is most analogous to Go's goroutines - typed unbuffered channels backed by a pool of OS threads, where a channel send always "happens before" the corresponding channel receive.


  There have been several attempts of porting actors to the 
  JVM. Quasar and Pulsar’s main contribution — from which 
  many of their advantages stem — is true lightweight 
  threads[1]. Lightweight threads provide many of the same 
  benefits “regular”, OS threads do, namely a single, simple 
  control flow and the ability to block and wait for some 
  resource to become available while in the meantime 
  allowing other threads to run on the CPU. Unlike regular 
  threads, lightweight threads are not scheduled by the 
  operating system so their context-switch is often faster, 
  and they require far less system resources. As a result, a 
  single machine can handle millions of them.
http://blog.paralleluniverse.co/post/49445260575/quasar-puls...

Excellent area to contribute to. In 2006-2008 there was Kilim, which uses some code-rewriting to accomplish similar green threading on the JVM to support an actor system with extremely lightweight lock-free message passing.

  Kilim comfortably scales to handle hundreds of thousands of actors
  and messages on modest hardware. It is fast as well – task-switching
  is 1000x faster than Java threads and 60x faster than other lightweight
  tasking frameworks, and message-passing is 3x faster than Erlang
  (currently the gold standard for concurrency-oriented programming).

from https://github.com/kilim/kilim/raw/master/docs/kilim_ecoop08...

Kilim: https://github.com/kilim/kilim


fwiw, he did in fact mention Kilim in the footnotes. Sriram also chimes in on the comments.


His point about Kilim being a monolithic solution is well taken- getting all three of these right (from both the end-user perspective and transformational perspective) is hard!

  (i) ultra-lightweight, cooperatively-scheduled threads (actors),
  (ii) a message-passing framework (no shared memory, no locks) and
  (iii) isolation-aware messaging.

There's a Stack Overflow thread on continuation libraries for the JVM. There's some coverage of the unnamed coroutine library in Quasar-

http://stackoverflow.com/a/4687050/72070

Thanks for the note swannodette, I'd missed the reference.


Could you enumerate the specific perceived shortcomings of Akka that you allude to in the blog post?


Elsewhere in this discussion-

  Jetlang doesn't offer lightweight-threads. It uses an 
  event-driven processing of messages (like Akka), so fibers 
  can't block and you can't do selective receives.
https://news.ycombinator.com/item?id=5646097

Akka relies on operating system threads- when you block in Akka, you block in real life.


According to this Akka document - http://doc.akka.io/docs/akka/1.3.1/scala/dispatchers.html

"Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread."

I, too, would like to see the perceived shortcomings of Akka, since lightweight threads can't be the problem.


Akka calls them lightweight threads, but they really aren't as they can't be blocked. In short - they're not implemented as continuations.

Basing the implementation on real lightweight threads gives you selective receive and other goodies mentioned in the post, while keeping the API simple. I think Quasar/Pulsar are much simpler than Akka, and they will stay simpler for said reason.

All in all, Akka feels a lot more complicated than Erlang. Also, Scala isn't everyone's cup of tea, and Akka doesn't mesh well with Clojure.

Quasar tries to join the power of Erlang with the power of the JVM. Pulsar tries to join the beauty and elegance of Erlang with the beauty and elegance of Clojure.



All well and good. Another measure of "lightweight" that is often quoted is a little over 300 words of overhead per Erlang BEAM process and 300 bytes per Akka actor. Do you have comparable figures for Quasar?

http://www.erlang.org/doc/efficiency_guide/processes.html

http://doc.akka.io/docs/akka/snapshot/general/actor-systems.... (300 bytes: way at the bottom)


Quasar actors consume even less memory. An idle actor occupies about 500 bytes.


http://thornydev.blogspot.com/2013/01/go-concurrency-constru...

Michael Peterson has also done some good work implementing Go-style concurrency in Clojure.


The subtitle is a better fitting title:

"Lightweight Threads, Channels and Actors for the JVM".


Interesting! Regarding your own queue implementations, did you consider using the Disruptor (http://lmax-exchange.github.io/disruptor/ - which is actually an advanced ring buffer)? Martin Thompson was also involved in that project.


Yes. I've been asked that elsewhere. See my reply here: http://www.java-gaming.org/index.php/topic,29466


I am a newbie to Go, but running the Go version took about 186.053167 ms. I wonder how to explain this behavior: http://play.golang.org/p/flK5QV-mDC (Go version: devel +740d244b2047 Thu May 02 18:59:39 2013 -0700)


What was GOMAXPROCS set to? It's a lot faster to run this particular benchmark with only one thread.


1


Yeah, that's what I figured :) Set it to the number of (virtual) cores on your machine.


I didn't see anything in there about ensuring that the messages passed between actors are immutable. Did I miss it?


I've already mentioned Kilim in this discussion- it's a JVM bytecode rewriting framework that implements lightweight greenthreaded & isolated actors. Perhaps their introductory text will be of value for highlighting the value of immutability & its reciprocal side, isolated messaging-

  The “Actor” model, espoused by Erlang, Singularity and the 
  Unix process+pipe model, offers an alternative:
  independent communicating sequential entities that share
  nothing and communicate by passing messages. Address-space
  isolation engenders several desirable properties: component-
  oriented testing, elimination of data races, unification of
  local and distributed programming models and better optimisation
  opportunities for compilers and garbage collectors. Finally,
  data-independence promotes failure-independence: an exception in
  one actor cannot fatally affect another.
https://github.com/kilim/kilim/raw/master/docs/kilim_ecoop08...

I personally am not a fan of prohibiting people from doing things (blame Larry Wall, "first postmodern computer language"), and prefer making good tools available for those who do naturally seek immutability or purity or transferable ownership messages (Transferables in web messaging) or other optional ways of restricting themselves, but the actor model is pretty serious about isolation and Kilim is true to that perspective.


It's not enforced by Quasar, and Java obviously can't ensure that, but in Clojure everything is immutable.
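
In Java this is a matter of convention, so a message type has to be written to be immutable. A minimal sketch, assuming a hypothetical `OrderPlaced` message (nothing in Quasar requires or checks this shape):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical message type. Immutability here is by construction, not
// something the JVM or an actor library verifies for you.
final class OrderPlaced {
    private final String orderId;
    private final List<String> items;

    OrderPlaced(String orderId, List<String> items) {
        this.orderId = orderId;
        // Defensive copy, wrapped in an unmodifiable view, so neither the
        // sender nor the receiver can change the list after construction.
        this.items = Collections.unmodifiableList(new ArrayList<>(items));
    }

    String orderId() {
        return orderId;
    }

    List<String> items() {
        return items;
    }

    public static void main(String[] args) {
        List<String> cart = new ArrayList<>(Arrays.asList("book", "pen"));
        OrderPlaced message = new OrderPlaced("o-1", cart);
        cart.add("laptop"); // the sender keeps mutating its own list
        System.out.println(message.items().size()); // prints 2
    }
}
```

The element type still has to be immutable itself (`String` is) for the message as a whole to be safe to share between actors.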


Ah, thanks for the clarification, I missed that Quasar was aimed at Java, which threw me off. (Apparently I can't be expected to read the first sentence carefully.)

After enjoying the benefits of immutability and lightweight message passing, I'd hate to give up the former -- but if you're stuck with Java, I suppose all this is gravy.


You're not "stuck with Java". Sometimes it's the best tool around. In any case, Quasar is the foundation. Pulsar wraps it with a nifty Clojure API. Perhaps we or someone else will come up with APIs in other JVM languages (I have my eyes on Kotlin).


Looks cool, but what's the distribution story? I'm not sure I understand the comparisons to Akka without one.


There are lots of case studies on Akka in production, and the people using it are generally happy but also honest about the bottlenecks they hit (more than a few thousand messages in the message box, etc.):

http://www.addthis.com/blog/2013/04/16/building-a-distribute...

http://marakana.com/s/akka_hammer_scala_nails,1129/index.htm...

http://corp.klout.com/blog/2012/10/scaling-the-klout-api-wit...

http://blog.kreuzverweis.com/uncategorized/moving-from-osgi-...


We intend to provide distribution on top of Galaxy (http://puniverse.github.io/galaxy/).


Is Pulsar + Galaxy + Zookeeper + presumably some new code/project still simpler than Akka? It sounds kind of complicated.


Galaxy doesn't require Zookeeper (it's an optional dependency). And Galaxy isn't there just to support Quasar/Pulsar. It's there to support a distributed, concurrent, consistent in-memory database. Actors are just part of the story.

To simplify, Quasar/Pulsar + Galaxy is, in Erlang parlance, like Erlang + Mnesia or Erlang + Riak (only with a consistent database that assists in parallelization)


Cool, thanks for clarifying pron. Looking forward to giving it a spin!


Great work Ron!



