Go Concurrency Patterns: Pipelines and cancellation (golang.org)
247 points by enneff on March 13, 2014 | 32 comments



The beauty of Go is, I have been writing these pipeline functions without even being explicitly taught the pattern.


Likewise, it simply makes sense if you have a proper understanding of channels and goroutines.


Note the use of receive-only channels:

    func merge(cs ...<-chan int) <-chan int {
Inside the function you cannot send to a member of cs; doing so is a compile error.

Outside the function you cannot send to its return value.

An example to play with: http://play.golang.org/p/6AJLpZkLBe
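
For anyone who wants the whole thing in one place, here is a minimal sketch of a fan-in `merge` built on that signature. The helpers `gen` and `collect` are names made up for this illustration, and the body is one common way to write it, not necessarily what the playground link contains.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// gen (hypothetical helper) emits nums on a channel and then closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// merge fans several receive-only channels into one receive-only channel.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int) // bidirectional inside merge
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
			// c <- 1 // would not compile: c is receive-only
		}(c)
	}
	go func() {
		wg.Wait()
		close(out) // close only after every input is exhausted
	}()
	return out // callers see it as <-chan int and cannot send on it
}

// collect (hypothetical helper) drains c into a sorted slice.
func collect(c <-chan int) []int {
	var got []int
	for n := range c {
		got = append(got, n)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(collect(merge(gen(1, 2), gen(3, 4)))) // [1 2 3 4]
}
```

The result is sorted only to make the output deterministic; `merge` itself gives no ordering guarantee across inputs.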


I've cleanly used that in a lil' game I'm working on.

Players send their move to the game on a channel they expose through Move(). They receive regular updates of the game state on a channel they expose through Update().

    type Player interface {
        Name() string
        Move() <-chan Move
        Update() chan<- State
    }
> https://github.com/aybabtme/bomberman/blob/master/player/pla...

The game reads the player moves, if moves are available[1]:

    for pState, player := range g.Players {
        if pState.Alive {
            select {
            case m := <-player.Move():
                movePlayer(g, board, pState, m)
            default:
            }
        }
    }
> https://github.com/aybabtme/bomberman/blob/master/bomberman....

The game sends each turn's update to players that are ready to listen for it:

    for pState, player := range game.Players {
        pState.Board = board.Clone()
        pState.Turn = game.Turn()
        select {
        case player.Update() <- *pState:
        default:
        }
    }
> https://github.com/aybabtme/bomberman/blob/master/bomberman....

This leads to surprisingly clean implementations:

  * A websocket player [2]
  * A keyboard player [3]
  * A random AI player [4]
Note that the rest of the game (`bomberman.go`) is a hairy ball that needs refactoring.

[1]: this is to prevent an unresponsive player from hanging the game, or an AI player from unfairly computing longer than its opponent

[2]: https://github.com/aybabtme/bomberweb/blob/master/player/pla...

[3]: https://github.com/aybabtme/bomberman/blob/master/player/inp...

[4]: https://github.com/aybabtme/bomberman/blob/master/player/ai/...
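
The two `select` blocks with an empty `default` are the non-blocking receive and non-blocking send idioms. A stripped-down sketch, separate from the game code: `tryRecv`, `trySend`, the `string` and `int` element types, and the buffer size of 1 are all assumptions made for the illustration.

```go
package main

import "fmt"

// tryRecv returns a pending move without blocking; ok is false if none is ready.
func tryRecv(moves <-chan string) (m string, ok bool) {
	select {
	case m = <-moves:
		return m, true
	default:
		return "", false
	}
}

// trySend delivers an update only if the receiving side can take it right now.
func trySend(updates chan<- int, turn int) bool {
	select {
	case updates <- turn:
		return true
	default:
		return false // receiver not ready: this update is dropped
	}
}

func main() {
	moves := make(chan string, 1)
	updates := make(chan int, 1)

	_, ok := tryRecv(moves)
	fmt.Println(ok) // false: no move queued

	moves <- "up"
	m, ok := tryRecv(moves)
	fmt.Println(m, ok) // up true

	fmt.Println(trySend(updates, 1)) // true: buffer has room
	fmt.Println(trySend(updates, 2)) // false: buffer full
}
```

The trade-off is that a slow player simply misses updates, which is the behaviour footnote [1] is after.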


> In Go, we can do this by closing a channel, because a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.

    for n := range c {
        select {
        case out <- n:
        case <-done:
        }
    }
This is brilliant.
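
A small sketch of why closing works as a broadcast: one `close(done)` releases every goroutine blocked on `<-done`, however many there are. The function `waitAll` is a name invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// waitAll starts n goroutines that block on done, closes done once,
// and returns how many goroutines were released.
func waitAll(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until done is closed
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}
	close(done) // a single close unblocks every receiver
	wg.Wait()
	return released
}

func main() {
	fmt.Println(waitAll(5)) // 5

	// A receive on a closed channel proceeds immediately with the zero value.
	c := make(chan int)
	close(c)
	v, ok := <-c
	fmt.Println(v, ok) // 0 false
}
```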


I wasn't quite getting that code when I first read it.

Now I can see it allows you to "discard" all the pending reads from the input channel (and avoid writing to a possibly-closed 'out' channel), but wouldn't it be better to break the loop in this case for an immediate exit?

i.e. go for 'interrupt' semantics, rather than 'drain'?


Yes, interrupt is better, and the article explains how to do that a few paragraphs later:

        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
If you allow pipeline stages to interrupt their receive loop instead of draining the inbound channel, then _all_ send operations need to be governed by done. Otherwise the interrupted receive loop may block the upstream sender.
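
To make that concrete, here is a sketch of a two-stage pipeline where every send is guarded by `done`. The `sync.WaitGroup` parameter and the `firstSquare` wrapper are additions for this example (they are not in the article) so that the program can confirm both stages really exit after cancellation.

```go
package main

import (
	"fmt"
	"sync"
)

// gen sends nums downstream until it runs out or done is closed.
func gen(done <-chan struct{}, wg *sync.WaitGroup, nums ...int) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// sq squares its input and returns as soon as done is closed.
func sq(done <-chan struct{}, wg *sync.WaitGroup, in <-chan int) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// firstSquare reads one result, cancels, and waits for both stages to exit.
func firstSquare(nums ...int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	out := sq(done, &wg, gen(done, &wg, nums...))
	v := <-out
	close(done)
	wg.Wait() // would hang if any stage were stuck on an unguarded send
	return v
}

func main() {
	fmt.Println(firstSquare(2, 3, 4)) // 4
}
```

If the send in `gen` were a plain `out <- n`, `gen` could block forever once `sq` has returned, and `wg.Wait()` would never finish.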


The loop (specifically `range c`) keeps taking elements out of `c` and assigning them to `n`. Once it's "done", `n` is no longer sent into `out`, but values are still read from `c`. In other words, it's the loop itself that makes sure everything sent into `c` is consumed.

If `break` were used, the for loop would terminate as soon as it's "done", so nothing would be reading from `c` any more. The previous stage in the pipeline would then block on its send.


I think it's so that the channel is empty, and will have no references so it will be garbage collected.


I use channels to single-file data to a SQLite db. Data gets pushed into channels from multiple users / locations of the web app. A single goroutine (concurrently running function) reads from the channels one channel at a time and, for each channel, writes everything in the channel to the db in a single transaction. SQLite isn't the best at concurrency. Go's channels help resolve that issue nicely.
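
A rough sketch of the single-writer idea, with loud caveats: there is no SQLite here. The `store` type and its `writeBatch` method are stand-ins for the database and a transaction, the batch size of 8 is arbitrary, and everything goes through one shared channel rather than several.

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for the database; only the writer goroutine touches it.
type store struct{ rows []string }

// writeBatch stands in for "write everything in a single transaction".
func (s *store) writeBatch(batch []string) {
	s.rows = append(s.rows, batch...)
}

// run starts `producers` goroutines that each submit `perProducer` rows,
// funnels them through one writer goroutine, and returns the rows stored.
func run(producers, perProducer int) int {
	in := make(chan string, 16)
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				in <- fmt.Sprintf("user%d-row%d", p, i)
			}
		}(p)
	}
	go func() { wg.Wait(); close(in) }()

	s := &store{}
	written := make(chan struct{})
	go func() { // the only goroutine that writes to the store
		defer close(written)
		var batch []string
		for row := range in {
			batch = append(batch, row)
			if len(batch) == 8 {
				s.writeBatch(batch)
				batch = batch[:0]
			}
		}
		s.writeBatch(batch) // flush whatever is left
	}()
	<-written
	return len(s.rows)
}

func main() {
	fmt.Println(run(4, 10)) // 40
}
```

Because only one goroutine ever calls `writeBatch`, the store needs no lock of its own.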


Cool. Can the concurrency gurus out there tell me what the difference is between the "Go Concurrency Pattern" and the Java SingleThreadExecutor? (http://docs.oracle.com/javase/6/docs/api/java/util/concurren...)

  // uses java.util.concurrent.*
  ExecutorService executor = Executors.newSingleThreadExecutor();

  FutureTask<String> step1 =
       new FutureTask<String>(new Callable<String>() {
         public String call() {
           return "step1";
         }
       });

  executor.execute(step1); // execute single-step pipeline

  step1.cancel(true); // interrupt the task


There is no "Go Concurrency Pattern" (singular). The article demonstrates some ways to use Go's concurrency primitives.

Obviously there is a massive syntactic difference between your Java example and the equivalent Go code, but the other major difference that jumps out at me is that cancellation is baked into the Java library, whereas in Go the programmer determines how each of their pipeline stages should be shut down.


> Obviously there is a massive syntactic difference between your Java example and the equivalent Go code

With Java 8 and lambdas, syntax will be much cleaner, i.e. you can write something like: executor.submit(() -> "step1");


To me, the major difference is that the Java solution gives you one solution. The Go solution gives you the building blocks.

The latter has the advantage you get to recompose the system in new ways, yielding greater flexibility.

In Java, a better compositional framework is probably RxJava by Erik Meijer et al. Haskell also provides better compositionality due to its functional structure and lazy evaluation semantics.


I guess an obvious difference is that Go has this stuff built into the runtime and syntax, which makes those lightweight threads / tasks a first-class thing.

Furthermore, is there a way for the "Java SingleThreadExecutor" and its related components to automatically schedule tasks on different threads? For example, you are on a quad-core and you want to have 4 threads running together, with millions of tasks mapped to those 4 threads and dynamically balanced.
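
For comparison, this is roughly what "many tasks, four workers, dynamically balanced" looks like on the Go side. It is only a sketch: `sumSquares` and the squaring workload are invented for the example, and the workers are goroutines, which the Go runtime maps onto OS threads by itself.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares distributes the tasks 1..n over `workers` goroutines
// and returns the sum of their squares.
func sumSquares(n, workers int) int {
	tasks := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			partial := 0
			for t := range tasks { // whichever worker is idle takes the next task
				partial += t * t
			}
			results <- partial
		}()
	}
	go func() {
		for i := 1; i <= n; i++ {
			tasks <- i
		}
		close(tasks)
	}()
	go func() { wg.Wait(); close(results) }()

	total := 0
	for p := range results {
		total += p
	}
	return total
}

func main() {
	fmt.Println(sumSquares(1000, 4)) // 333833500
}
```

Balancing falls out of the shared `tasks` channel: a worker that finishes early just receives the next value.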


I believe that's what the ForkJoinPool is for.

http://docs.oracle.com/javase/7/docs/api/java/util/concurren...


Sure, you can replace SingleThreadExecutor by Executors.newFixedThreadPool(4)


I think one huge difference is that a channel in Go is a stream of data, in which each element can cause the consuming code to wait for new responses. In your example, the future can only return one bit of data.

Afaik, what Go does is just an implementation of http://en.wikipedia.org/wiki/Communicating_sequential_proces... which is also available in some other languages (Clojure is the one that comes to mind for me). In Java, I think a closer analogy is a http://docs.oracle.com/javase/7/docs/api/java/util/concurren... which lets the producer and consumer threads automatically work in lock step as well.


> I think one huge difference is that a channel in Go is a stream of data, in which each element can cause the consuming code to wait for new responses. In your example, the future can only return one bit of data.

In Java you can also do something like:

    List<Future<Long>> futures = new ArrayList<>();
    for (int i = 0; i < n; i++)
        futures.add(/* some parallel execution logic */);

    long count = 0;
    for (Future<Long> f : futures)
        count += f.get();


I think the main difference is the way the language and runtime collaborate to schedule multiple events. Note the use of the 'select...case' in the example code.

These slides go into some more examples: http://talks.golang.org/2013/advconc.slide

You end up with a really nice "actor model", where you have a goroutine which owns some state, responding to incoming messages over channels and sending replies.

Slide 24 gives the general structure, but it's a good read.
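
A tiny sketch of that shape, not taken from the slides: a goroutine that owns a counter and answers requests over a channel. The `request` type, `counter`, and `add` are names made up here.

```go
package main

import "fmt"

// request asks the counter goroutine to add delta and reply with the new total.
type request struct {
	delta int
	reply chan int
}

// counter owns total; no other goroutine reads or writes it.
func counter(reqs <-chan request) {
	total := 0
	for r := range reqs {
		total += r.delta
		r.reply <- total
	}
}

// add sends one request and waits for the reply.
func add(reqs chan<- request, delta int) int {
	reply := make(chan int)
	reqs <- request{delta: delta, reply: reply}
	return <-reply
}

func main() {
	reqs := make(chan request)
	go counter(reqs)
	fmt.Println(add(reqs, 2)) // 2
	fmt.Println(add(reqs, 3)) // 5
	close(reqs) // lets the counter goroutine exit
}
```

Since all access to `total` goes through the one goroutine, no mutex is needed.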


I'm a C++ developer by day, and I haven't tried Go yet. This looks really interesting. There is a massive gap for some dataflow-programming-based languages; not sure Go is quite there yet, but we're moving in the right direction.


The future of programming is somewhere in the field of dataflow and (functional) reactive programming. It brings easy state to the declarative programming world.


Said often and probably false. Neither FRP nor dataflow is very expressive. Probably something still imperative but more reactive is the future.


Great explanation in document form of these patterns. Also very succinct and easy to reference.

For those that would rather read this in presentation form: http://talks.golang.org/2012/concurrency.slide#1

And for those that would rather watch a video presentation: https://www.youtube.com/watch?v=f6kdp27TYZs


Both those links are in the original article, but they are not the same content.


Node.js has another beautiful way of doing this. It's called streams there.

https://github.com/substack/stream-handbook

http://nodejs.org/api/stream.html


Interesting, I'll have to compare pipelines in Go to pipelines in Haskell.


> Interesting, I'll have to compare pipelines in Go to pipelines in Haskell

check out the "Power Series, Power Serious" paper by Doug McIlroy. He does that...


As jerf said, this is a little different since it's comparing to lazy io rather than pipes or conduit.


The "Squinting at Power Series" paper by McIlroy is a very cool demonstration of this technique using Newsqueak.


To help the lazy:

http://swtch.com/~rsc/thread/squint.pdf

(it's also linked at the bottom of the article)


That appears to compare Go channels to "simple" lazy Haskell. A more apt Haskell comparison would be to something like "pipes" or "conduit", which will be quite different.



