Apache Spark Scale: A 60 TB+ production use case (facebook.com)
254 points by rockyex on Sept 1, 2016 | 38 comments



I tried (and failed, after a month of work) earlier this year to do ~20TB shuffles with Spark. I felt both relief and frustration reading this post.

Relief: I'm not an idiot, and the problems in the shuffle were likely in Spark and not just me being a beginner user.

Frustration: I wanted to group about 100 billion 200-byte records into 5 billion groups. This seems like exactly the problem Spark is designed and advertised for. I had great difficulty even getting example Spark SQL code (or my own RDD-based code) working. I hear so many great things about Spark as a tool for big data, and also "your data isn't big". I considered 20TB on the "low end" of big data, but a seemingly popular and widely used big data tool can't shuffle it without numerous bug fixes and pain on the part of the user. Shuffling 90TB was worth a Facebook blog post! This makes me ask: to all the people using Spark for "big data", how painful is it, and how much data are you handling? It appears the answer for >=20TB is "very painful", and for <=5TB I think you're generally in "handle on a single node" territory.


One of the hardest parts for me was getting the cluster sized appropriately so that all data stayed in memory. Overflow to disk slows things down a lot. But, sizing the cluster can be tricky if you're generating a lot of data structures in the tasks.

I only use RDDs, so I put a lot of thought into the processing flow so that unnecessary data wasn't shuffled. If you're reducing PairRDDs, make sure that the data is evenly distributed. Also, I'm guessing you read the optimization docs, but a huge amount of network I/O can be reduced by choosing the right types and collections and optimizing serialization. And, of course, group within partitions first, then within nodes, then across nodes. And, of course, go for fewer, bigger servers with lots of network bandwidth.
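For example, this is roughly the shape I aim for (a toy sketch, not real code - the record type and paths are made up): reduceByKey combines map-side within each partition before anything hits the network, where groupByKey would ship every record, and Kryo keeps the shuffled bytes small.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical record type, purely for illustration.
    case class Event(userId: Long, bytes: Long)

    object ShuffleFriendly {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("shuffle-friendly")
          // Kryo is far more compact than Java serialization for shuffled data.
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .registerKryoClasses(Array(classOf[Event]))
        val sc = new SparkContext(conf)

        // Placeholder input path and a trivially simple record layout.
        val events = sc.textFile("hdfs:///path/to/input").map { line =>
          val cols = line.split('\t')
          Event(cols(0).toLong, cols(1).toLong)
        }

        // reduceByKey merges values map-side within each partition first, so only
        // one partial sum per key per partition crosses the network.
        val totals = events
          .map(e => (e.userId, e.bytes))
          .reduceByKey(_ + _)

        totals.saveAsTextFile("hdfs:///path/to/output")
        sc.stop()
      }
    }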

There are a lot of tricks, unfortunately. And, since I don't know your experience level I won't bore you with things you probably already know.


I have no practical experience with Spark, but I was wondering where Alluxio fits into what you're describing.

Is it only applicable once the "cluster [is] sized appropriately so that all data stayed in memory" as you mention?


Alluxio can cache objects in memory that are used downstream in other steps or even other jobs.

In Spark, the biggest performance gains are realized when all data stays in memory. But, the Spark memory architecture is kind of opaque. It's pretty well documented, and there are some good blogs out there, but there are a lot of factors in play.

For example, let's say we start with 1 TB of compressed data and take a guess that it will uncompress to 10 TB. So, we do a naive calculation and guess that we'll need 20 nodes, each with 500 GB of memory.

For our example, Spark will be managed by YARN. YARN is going to take a portion of each node's memory. I forget how much, but let's say 10%. Then, depending on how many executors we have per node, each will also have memory overhead. Again, I don't know exactly, but let's just say another 10%. Then some memory will be dedicated to system processes, etc. Then there's the Spark driver. So, after all of that, let's say 25-30% of memory is used outside of the executors. That leaves us with 10 executors per node, each with ~35 GB of memory. For maximum performance, we have to ensure that all 10 TB fits across all executors. So, we may actually need 29 nodes, or a lot of data is going to spill to disk.
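To make that concrete, the settings for that layout would look roughly like this (spark-shell / SparkConf style; every number here is illustrative, and the overhead values are knobs you tune, not the fixed percentages I hand-waved above):

    import org.apache.spark.SparkConf

    // Rough sketch: 20 nodes x 10 executors, ~32 GB of heap each, with extra
    // off-heap headroom reserved for YARN. Illustrative numbers, not defaults.
    val conf = new SparkConf()
      .setAppName("sizing-sketch")
      .set("spark.executor.instances", "200")            // 20 nodes x 10 executors
      .set("spark.executor.cores", "4")
      .set("spark.executor.memory", "32g")               // heap per executor
      .set("spark.yarn.executor.memoryOverhead", "3584") // off-heap headroom, in MB
      .set("spark.memory.fraction", "0.6")               // heap share for execution + storage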

Then, let's say that our processing job doesn't evenly distribute data across partitions. Some partitions get 5x as much data as other partitions. Again, data spills to disk - or the VM runs out of heap. Or, maybe our 10% compression estimate is off by 5% - more spills to disk.


I think Alluxio helps with the initial data read in the first stage, but subsequent stages' datasets are stored in memory anyway.


I've been working on Databricks for a month now and it is a unique and satisfying learning experience. I generated a billion n-grams (1-6 words) from 300k docs, and thinking in RDDs and DataFrames reminds me of how I felt when I first learned functional programming and GPU coding. RDD flatMap solves problems I didn't even know I had.
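The n-gram bit boils down to something like this (a toy sketch, not my actual code - the "tokenizer" is just a lowercase/whitespace split, the input path is made up, and it assumes a spark-shell sc):

    // One document per line of the (placeholder) input.
    val docs = sc.textFile("hdfs:///path/to/docs")

    val ngrams = docs.flatMap { doc =>
      val tokens = doc.toLowerCase.split("\\s+").toSeq
      // Emit every 1- to 6-gram as a space-joined string.
      (1 to 6).filter(_ <= tokens.length)
        .flatMap(n => tokens.sliding(n).map(_.mkString(" ")))
    }

    val counts = ngrams.map((_, 1L)).reduceByKey(_ + _)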

I ran a 'small' cluster of 24 nodes (300GB+ of RAM), and while I know I could have done the same thing with super-optimized code on a single machine, not having to worry about each node's performance and instead thinking in pipelines is refreshing. My goal was not to use or build an optimal tokenizer or stop-word remover, but to make sure every part of the chain could be done in parallel.

My biggest complaint? Repeatedly having to restart the cluster because nodes stop responding or throw arcane errors. If these reliability fixes had been in place last week, I would easily have been 20-25% more efficient with my time. Can't wait till the Databricks team deploys this for their users.


What I find most exciting here is that they were able to improve performance by combining the three previous steps into one, in other words asking directly for what they wanted rather than trying to trick Hive into executing it in a certain manner.

Presumably the task had been split up in the past to work around performance limitations of Hive; the fact that they got better performance in a single step shows that the tooling is improving and much less complexity is required to implement this kind of job.


This is very intriguing; I cannot help but wonder how this operation would have performed on something like Google BigQuery. I know it is highly unlikely that Facebook would ever load their data onto the Google Cloud Platform, but it would be an interesting comparison.


While both Spark and BigQuery do the shuffle step in-memory, there are some differences[1]:

- BigQuery's execution is pipelined (don't wait for step 1 to finish to start step 2)

- BigQuery's in-memory shuffler is not co-tenant to compute

And, of course, it's one thing to have the software and hardware. BigQuery provides a fully managed, fully encrypted, HA, redundant, and constantly and seamlessly maintained and upgraded service [2].

[1]https://cloud.google.com/blog/big-data/2016/08/in-memory-que...

[2]https://cloud.google.com/blog/big-data/2016/08/google-bigque...

(disc: work on BigQuery)


Would pipelining help much when the processing job is CPU bound (all cores maxed out)?

Sorry - what does co-tenant to compute mean?


Pipelining helps with "hanging chads": the tail latency of work steps. If you have a slow worker (due to, say, data skew), the entire job slows down. All the other workers sit idle, waiting for that one worker to finish its piece. Read [0] to see what Dataflow does; BigQuery/Dremel do very similar stuff to deal with this issue. BigQuery also doesn't have to wait for ALL workers to finish step 1 before proceeding to step 2.

By co-tenant to compute, I mean that processing nodes themselves handle the shuffle in Spark. This can cause non-obvious bottlenecks. BigQuery handles shuffle outside of the processing nodes [1].

[0] https://cloud.google.com/blog/big-data/2016/05/no-shard-left...

[1] https://cloud.google.com/blog/big-data/2016/08/in-memory-que...


I can't speak for BigQuery, but I can for our open-source BigQuery alternative, EventQL [0].

On the upside, massively parallel SQL databases allow you to express what probably took Facebook a lot of code in a single SQL query.

On the downside, with any database that supports streaming inserts and dynamic resharding, you will eventually have to write each entry more than once. The more manual approach with Spark prevents this.

[0] https://eventql.io


It's exciting to see new innovative open source contributions to the big data space. Since BigQuery has moved on in many ways from the ColumnIO and Dremel concepts described in the Dremel paper[0] with new versions of many of these components (like in-memory shuffle, new storage, new ingest, execution engine, etc), I'd love to learn where EventQL places itself in that gradient.

[0]http://static.googleusercontent.com/media/research.google.co...


My guess is that it wouldn't be too bad, but that at the volumes Facebook are doing, I suspect a lot of their speed is coming from custom optimisations they can make for their use-cases, which Big Query wouldn't have.


Looks awesome - the work on performance especially is much appreciated :D I would love to be able to get flame graphs per node on my Spark cluster too - any ideas on how exactly FB does this?


Looks like they are using the same library as Presto. You would just add the Java agent line to your Spark config, i.e. spark.executor.extraJavaOptions.

https://github.com/prestodb/presto/issues/4004

Although you have to distribute the library to all of the data nodes (if using YARN) or Spark nodes (if running standalone).
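Something like the following gets the executors ready (a sketch, not necessarily FB's exact setup - these are the HotSpot flags commonly needed for accurate Java stacks with perf; you'd still attach perf-map-agent and run perf plus the FlameGraph scripts against the executor PIDs afterwards):

    import org.apache.spark.SparkConf

    // Equivalent to a spark.executor.extraJavaOptions line in spark-defaults.conf
    // or --conf on spark-submit. The flag list is an assumption; adjust it for
    // whichever agent/profiler you use.
    val conf = new SparkConf().set(
      "spark.executor.extraJavaOptions",
      "-XX:+PreserveFramePointer -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints"
    )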


Thanks! I was going to ask where I might find it since Google results were a little thin on the ground.


I knocked up a quick Ansible script this morning. It is _very_ rough right now and is more like 'here's what you could do' (and you have to compile perf-map-agent beforehand), but I got it working on a small cluster: https://github.com/falloutdurham/spark-flame


"While running on 20 TB of input, we discovered that we were generating too many output files (each sized around 100 MB) due to the large number of tasks."

I could be completely missing something here, but to decrease the number of output files you can coalesce() the RDD before you write. For example, let's say you have a 20-node cluster with 10 executors per node, and the RDD action is split into 20,000 tasks. You may end up with 20,000 partitions (or more). However, you can coalesce and reduce the number down to 200 partitions. Or, if necessary, you could even shuffle (across VMs but within the node) down to 20 partitions, if you're really motivated.
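i.e. something like (a sketch; results stands in for whatever RDD the job produces, and the path is a placeholder):

    // Merge existing partitions without a shuffle before the write, so the job
    // emits ~200 files instead of ~20,000.
    val packed = results.coalesce(200)
    // Or, if you really want ~20 evenly sized files: results.coalesce(20, shuffle = true)
    packed.saveAsTextFile("hdfs:///path/to/output")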

What am I missing?


"Remove the two temporary tables and combine all three Hive stages into a single Spark job that reads 60 TB of compressed data and performs a 90 TB shuffle and sort."

"As far as we know, this is the largest real-world Spark job attempted in terms of shuffle data size"

I'm far, far from a world class engineer, but I regularly do 90 TiB shuffle sorts. I must seriously be missing something, here.


Have you run into any of the issues mentioned in the article? Some of them are regressions - which version of Spark were you running?

Out of the linked issues, these all seem like they would be "easy" to hit given enough data:

https://issues.apache.org/jira/browse/SPARK-13279

https://issues.apache.org/jira/browse/SPARK-13850

https://issues.apache.org/jira/browse/SPARK-13958

https://issues.apache.org/jira/browse/SPARK-14363


Are you using Spark? That's the context.


Yes


Coalescing down to a smaller partition number does decrease the number of output files. But it also decreases parallelism, which isn't what you want when processing so large a dataset.

Coalescing makes more sense when some stage of the pipeline dramatically shrinks the amount of data (e.g. grep-ing error logs out of all log files), so that successive stages can easily handle the rest of the data with far fewer executors.
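To illustrate the second case (a sketch with placeholder paths, assuming a spark-shell sc):

    // A grep-like job: the filter discards most of the data, so the survivors can
    // be packed into far fewer partitions before the next stage / the write.
    val errors = sc.textFile("hdfs:///logs/*")   // thousands of input partitions
      .filter(_.contains("ERROR"))               // only a tiny fraction survives
      .coalesce(50)                              // no shuffle; merges co-located partitions

    errors.saveAsTextFile("hdfs:///logs-errors")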

(disc.: Spark committer)


Yeah I didn't quite get that part either.


Judging by the JIRA tickets, most of these changes/fixes landed in 2.0.0 and some landed in 1.6.2.

I have been debating whether I should go ahead and start switching to 2.0.0, and this is pushing me in that direction.


Great write-up. I used Hadoop a lot for two consulting customers, and Google's MapReduce as a contractor. I found map reduce to be a natural way to process data. That said, Spark is a huge leap forward. I like the developer workflow using a REPL, and the machine learning library MLlib is excellent.


> I found map reduce to be a natural way to process data.

It is a natural way to process certain types of data in certain ways. While there are lots of use cases like that, I have seen too many efforts to force things into Hadoop MapReduce/YARN or similar platforms just because it is the parallel processing hotness of the year.


Amazing work and nice article! Congrats to the Facebook and Databrick teams!


60TB sadly isn't that much. We tried to use Spark on a several-PB dataset and it failed miserably.


Do you have any more details? What version?


I want to say it was the highest 1.6.x, around Feb/March of this year, with a few PB (a sample of the real dataset) over an InfiniBand network. It just broke miserably. Also, Java was never terribly good when you want to speak native ibverbs, as the JNI stuff is just slow.


Couldn't they have just used Postgres on one 4/8-socket server with RAID?

A 60TB dataset can fit in one server. Isn't Spark intended for massive clusters, like dozens or hundreds of servers, over petabytes of data?


Note that it's 60TB compressed.

Mostly it's about speed. With multiple machines you can bring more cores to bear for the processing, and have more RAM to cache partial results. Postgres could certainly do the job, but I'd be surprised if it would run within an order of magnitude of these results.


I/O is pretty huge as well. You can spend lots and lots of money to buy one machine a storage system that can read 60 TB fast. Or you can have 100 machines with the cheapest possible hard drives and smoke the total I/O.


Keep in mind that the systems required depend on the actual tasks, not just whether you can fit the data on a disk. I don't think the FB Newsfeed (show more photos from your cousin because you liked their text post last week...) can be built using only SQL.


They never mention Hadoop or Yahoo, but mention that they built Hive a dozen times in one paragraph. Is this how PR people think open-sourced tech should be mentioned/credited? So lame. They can't think beyond their belly buttons.


Not sure what you mean. They only mentioned building Hive once.

And why would they mention Yahoo? They stopped being particularly relevant in the Hadoop space quite a few years ago.



