Fun story time: I was at twitter for a few years and tended to write quick hacks that people wanted to replace with better engineering.
We never had scala thrift bindings, and the Java ones were awkward from Scala, so I wrote a thrift plugin in JRuby that used the Ruby thrift parser and ERb web templates to output some Scala code. Integrated with our build pipeline, worked great for the company.
I also wrote one era of twitter's service deploy system on a hacked up Capistrano.
These projects took a few days because they were dirty hacks, but I still got a below perf review for getting easily distracted, because I didn't yet know how to sell those company-wide projects.
Anyhow, about a month before that team kicked off Parquet, I showed them a columnar format I made for a hackweek based on Lucene's codec packages, and was using to power a mixpanel-alike analytics system.
I'm not sure whether they were inspired or terrified that my hack would reach production, but I like to think I had a small hand in getting Parquet kickstarted.
Tangential to the topic but regarding the supposed Snowball Effect there is in real life no such thing. I have pushed large 'snowballs' down slopes --in reality they are snow cylinders as shown in the photo-- and they invariably do not get far. The reason being that when one side of the cylinder randomly thickens slightly with respect to the other side this causes the whole thing to turn in the opposite direction.
For example, if the RHS of your cylinder has a slightly larger radius than the LHS the cylinder will commence turning to the left.
The upshot is the thick side picks up more snow than the thin side and the disparity in radii increases more rapidly still. The cylinder becomes a truncated cone which turns sideways and halts!
It is highly dependent on the snow conditions and the recent weather. Sometimes just a couple of hours are enough to change the conditions and give a good chance of rollerballs. The climate also has an impact; in my experience, more coastal areas have more periods when they form.
And in some cases the rollerballs get too tall for the bonding strength of the snow, so they break into parts that can restart the cycle if the slope is steep enough.
Reading through this blog, it seems to me that Parquet is a lot like ClickHouse's native data format.
The best part of ClickHouse's native data format is that I can run the same ClickHouse queries locally or on a remote server/cluster and let ClickHouse decide how to use the available resources in the most performant way.
ClickHouse has native (and the fastest) integration with Parquet, so I can:
- Query local/s3 parquet data from command line using clickhouse-local.
- Query large amount of local/s3 data programmatically by offloading it to clickhouse server/cluster which can do processing in distributed fashion.
I've been struggling with a tough parquet problem for a few months now.
I have a 15gb parquet file in a s3 bucket and I need to "unzip" and extract every row from the file to write into my database. The contents of the file are emails and I need to integrate them into our search function.
Is this possible to do without an unreasonable amount of RAM? Are there any affordable services that can help here?
Feel free to contact me (email in bio), happy to pay for a consult at the minimum.
I work with pyspark and parquet quite a lot. I never had to deal with parquet outside spark, but this is how I would do this:
- Write a pandas_udf function in pyspark.
- Partition your data into smaller bits so that the pandas_udf does not get too much data at the same time.
Something like:
```
import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

@f.pandas_udf("long")
def ingest(doc: pd.Series) -> pd.Series:
    # doc is a pandas Series holding one batch of rows
    # your processing goes here -> write to DB etc.
    # Return a Series of zeros, one per input row, just to make Spark happy
    return pd.Series([0] * len(doc))

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3 path")
df = df.repartition(1000)  # bump up this number if you run into memory issues
# Spark is lazy: the UDF only runs once it is applied to a column and an action is triggered
```
Now the trick is, you can limit how much data is given to your pandas_udf by repartitioning your data. The more the partitions, the smaller the pd.Series that your pandas_udf gets. There's also the `spark.sql.execution.arrow.maxRecordsPerBatch` config that you can set in spark to limit memory consumption.
^ Probably overkill to bring spark into the equation, but this is one way to do it.
You can use a normal udf (i.e `f.udf()`) instead of a pandas_udf, but apparently that's slower due to java <-> python serialization
I just wanted to mention that AWS Athena eats 15G parquet files for breakfast.
It is trivial to map the file into Athena.
But you can't connect it to anything other than file output. It can, however, help you write the data out in smaller chunks, for example, or choose another output format such as CSV (although arbitrary email content in a CSV feels like a setup for parsing errors).
The benefit is that there is virtually no setup cost. And processing cost for a 15G file will be just a few cents.
Athena is probably my best bet tbh, especially if I can do a few clicks and just get smaller files. Processing smaller files is a no brainer / pretty easy and could be outsourced to lambda.
Yeah the big benefit is that it requires very little setup.
You create a new partitioned table/location from the originally mapped file using a CTAS like so:
CREATE TABLE new_table_name
WITH (
  format = 'PARQUET',
  parquet_compression = 'SNAPPY',
  external_location = 's3://your-bucket/path/to/output/',
  partitioned_by = ARRAY['partition_column_name']
) AS
-- the partition column must be the last column in the SELECT list
SELECT *
FROM original_table_name
You can probably create a hash and partition by the last character if you want 16 evenly sized partitions. Unless you already have a dimension to partition by.
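To make the hash idea concrete, here is a minimal Python sketch (not Athena SQL) of how the last hex character of a hash gives 16 buckets. The function name `bucket_for`, the use of MD5, and the `message-N` keys are assumptions for illustration only; any well-mixed hash should behave similarly.

```python
import hashlib
from collections import Counter


def bucket_for(key: str) -> str:
    """Return one of 16 bucket labels ('0'..'f') for a row key."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    # The last hex character takes 16 values, roughly uniformly distributed.
    return digest[-1]


# Hypothetical row keys, just to look at how evenly the buckets fill up.
counts = Counter(bucket_for(f"message-{i}") for i in range(16_000))
for label in sorted(counts):
    print(label, counts[label])
```

The same key always lands in the same bucket, so the partition column can be recomputed later without storing a lookup table.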
It's been a while (~5yr) since I've done anything with Spark, but IIRC it used to be very difficult to make reliable jobs with the Java or Python APIs due to the impedance mismatch between Scala's lazy evaluation semantics and the eager evaluation of Java and Python. I'd encounter perplexing OOMs whenever I tried to use the Python or Java APIs, so I (reluctantly) learned enough Scala to make the Spark go brr and all was well. Is it still like this?
Same for me; the only reason I learned Scala was Spark. The Java API was messy. I still like Scala, and many functional languages, but they are a nightmare when jumping between projects, as everything is dense and cluttered.
Our company (scratchdata.com, open source) is literally built to solve the problem of schlepping large amounts of data between sources and destinations, so I have worked on this problem a lot personally and happy to nerd out about what works.
Coming from an HPC background, I'm wondering how 15 GB files came to be considered large data. I'm not a heavy Parquet user, but:
- does this decompress to giant sizes?
- can't you split the file easily, because it includes row-based segments?
- why does it take months to solve this for one file?
As a fellow HPC user, I tried a couple of years ago to do a tricky data format conversion using these newfangled tools. I was essentially just taking a huge (multi-terabyte) 3D dataset, transposing it and changing the endianness.
The solutions I was able to put together using Dask and Spark and such were all insanely slow, they just got killed by Slurm without getting anywhere. In the end I went back to good ole' shell scripting with xxd to handle most of the heavy lifting. Finished in under an hour.
The appeal of these newfangled tools is that you can work with data sizes that are infeasible to people who only know Excel, yet you don't need to understand a single thing about how your data is actually stored.
If you can be bothered to read the file format specification, open up some files in a hex editor to understand the layout, and write low-level code to parse the data - then you can achieve several orders of magnitude higher performance.
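As a small taste of what that looks like for Parquet: an unencrypted Parquet file begins and ends with the 4-byte magic `PAR1`, and the 4 bytes before the trailing magic hold the footer length as a little-endian 32-bit integer. The sketch below reads just that; the function name `read_footer_length` is made up for illustration, and decoding the Thrift footer itself is left out.

```python
import struct

MAGIC = b"PAR1"


def read_footer_length(path: str) -> int:
    """Return the size in bytes of the footer metadata of a Parquet file."""
    with open(path, "rb") as fh:
        if fh.read(4) != MAGIC:
            raise ValueError("missing leading PAR1 magic")
        # The last 8 bytes are: 4-byte little-endian footer length + 4-byte magic.
        fh.seek(-8, 2)
        tail = fh.read(8)
    if tail[4:] != MAGIC:
        raise ValueError("missing trailing PAR1 magic")
    (footer_len,) = struct.unpack("<I", tail[:4])
    return footer_len
```

The footer is where the row-group offsets live, which is why a reader can in principle fetch only the tail of a file and then seek to the parts it needs.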
I think command line tools are going to be fine if all you do is process one row at a time, or if your data has a known order.
But if you want to do some kind of grouping, or for example pivot rows to columns, I think you will still benefit from a distributed tool like Spark or Trino, which can do the map/reduce job for you in a distributed way.
Because most people don’t have an HPC background, aren’t familiar with parquet internals, don’t know how to make their language stream data instead of buffering it all in memory, have slow internet connections at home, are running out of disk space on their laptops, and only have 4 GB of ram to work with after Chrome and Slack take up the other 12 GB.
15 GB is a real drag to do anything with. So it’s a real pain when someone says “I’ll just give you 1 TB worth of parquet in S3”, the equivalent of dropping a billion dollars on someone’s doorstep in $1 bills.
How do you see the competition from Trino and Athena in your case?
Depends a lot on what you want to do with the data of course, but if you want to filter and slice/dice it, my experience is that it is really fast and stable. And if you already have it on s3, the threshold for using it is extremely small.
what is your point? They talked about 15GB of parquet - what does this have to do with 1TB of parquet?
Also: How does the tool you sell here solve the problem - the data is already there and can't be processed (15GB - funny that seems to be the scale of YC startups?)? How does a tool to transfer the data into a new database help here?
It might not be fast, but a quick 1-off solution that you let run for a while would probably do that job. There shouldn't be a need to load the whole file into memory.
Have you given DuckDB a try? I'm using it to shuttle some hefty data between postgres and some parquet files on S3 and it's only a couple lines. Haven't noted any memory issues so far
Took your advice and tried DuckDB. Here's what I've got so far:
```
import duckdb

def _get_duck_db_arrow_results(s3_key):
    con = duckdb.connect(config={'threads': 1, 'memory_limit': '1GB'})
    con.install_extension("aws")
    con.install_extension("httpfs")
    con.load_extension("aws")
    con.load_extension("httpfs")
    con.sql("CALL load_aws_credentials('hadrius-dev', set_region=true);")
    con.sql("CREATE SECRET (TYPE S3,PROVIDER CREDENTIAL_CHAIN);")
    # Ask for the result as Arrow record batches of 1024 rows each
    results = con \
        .execute(f"SELECT * FROM read_parquet('{s3_key}');") \
        .fetch_record_batch(1024)
    for index, result in enumerate(results):
        print(index)
    # Note: the reader has already been consumed by the loop above
    return results
```
I ran the above on a 1.4 GB parquet file and 15 min later, all of the results were printed at once. This suggests to me that the whole file was loaded into memory at once.
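```
import json

# df (apparently a polars LazyFrame) and DUMP_NAME are defined elsewhere
with open('tmp/'+DUMP_NAME+'_cleaned.jsonl', mode='w', newline='\n', encoding='utf8') as f:
    for row in df.collect(streaming=True).iter_rows(named=True):
        # Drop empty fields before writing one JSON object per line
        row = {k: v for k, v in row.items() if (v is not None and v != [] and v != '')}
        f.write(json.dumps(row, default=str) + '\n')
```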
My suggestion is to load each row group individually, as they will generally be much smaller than the total file size. You can do this via pyarrow.parquet.ParquetFile.read_row_group. To truly optimize this for reading from S3, you could use fsspec's open_parquet_file function, which would let you fetch only one row group at a time.
I had a similar issue, but for aggregations. The use case was to "compress" large datasets into smaller aggregations for insertion into a costly DB.
At first we used DuckDB, but memory became an issue there and we also bumped into a couple of issues with how DuckDB handles arrays.
We then moved this workload to clickhouse-local, which was faster and had more fine-tuning options to our liking. In this case that meant limiting RAM usage with, e.g., max_bytes_before_external_group_by.
I'm puzzled as to why this is a problem that has lasted months. My phone has enough RAM to work with this file in memory. Do not use pyspark, it is unbelievably slow and memory hogging if you hold it even slightly wrong. Spark is for tb-sized data, at minimum.
Have you tried downloading the file from s3 to /tmp, opening it with pandas, iterating through 1000 row chunks, pushing to DB? The default DF to SQL built into pandas doesn't batch the inserts so it will be about 10x slower than necessary, but speeding that up is a quick google->SO away.
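The batching part is the bit that matters for speed, so here is a rough sketch of it using only the standard library. It swaps in `sqlite3` as a stand-in for the real database, and the `emails` table, its columns, and the helper names `chunked` and `load_rows` are all hypothetical.

```python
import sqlite3
from itertools import islice


def chunked(rows, size=1000):
    """Yield lists of at most `size` rows from any iterable."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def load_rows(conn, rows, size=1000):
    """Insert rows in batches, committing once per batch; return the row count."""
    conn.execute("CREATE TABLE IF NOT EXISTS emails (id INTEGER, body TEXT)")
    total = 0
    for chunk in chunked(rows, size):
        # One executemany call per batch instead of one INSERT per row.
        conn.executemany("INSERT INTO emails (id, body) VALUES (?, ?)", chunk)
        conn.commit()
        total += len(chunk)
    return total


conn = sqlite3.connect(":memory:")
inserted = load_rows(conn, ((i, f"body {i}") for i in range(2500)))
```

Because `rows` can be a generator, nothing forces the whole dataset into memory at once. In pandas itself, `DataFrame.to_sql` accepts `chunksize` and `method="multi"`, which is roughly the same idea.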
I've spent many many hours trying these suggestions, didn't have much luck. iter_batches loads the whole file (or some very large amount of it) into memory.
It sounds like maybe your parquet file has no partitioning. Apart from the iterating over row groups like someone else suggested, I suspect there is no better solution than downloading the whole thing to your computer, partitioning it in a sane way, and uploading it again. It's only 15 GB so it should be fine even on an old laptop.
Of course then you might as well do all the processing you're interested in while the file is on your local disk, since it is probably much faster than the cloud service disk.
This is the answer for a one-off or occasional problem unless your time is worthless.
$200 to rent a machine that can run the naive solution for an entire day is peanuts compared to the dev time for a “better” solution. Running that machine for eight hours would only cost enough to purchase about a half day of junior engineer time.
Perhaps look into using dlt from https://dlthub.com, using pyarrow or polars. It handles large datasets well, especially when using generators to process the data in chunks.
It's very interesting to see how a new "enterprise open source" project is born. The part where right at the start the author knows that they should have more than one company on board, and how the other companies each contribute their part.
No one has had the combination of motivation, time and skills needed to get it into Debian. Someone wanted to get a Python implementation in, but it looks like they never found the time.
If you follow that link, you'll see polars and parquet are a large highly configurable collection of tools for format manipulations across many HPC formats. Debian maintainers possibly don't want to bundle the entirety, as it would be vast.
My question is "why isn't it in Debian?" I ask because Debian has rather high standards, and the absence from Debian suggests some quality issue in the available libraries for the format or in the format itself.
Parquet is what, 12 years old? Hardly cutting edge. What you say may well be true for polars (I'm not familiar with it); if/when it (or something else) does get packaged I'll give parquet another look ...
Pandas is probably in Debian and it can read parquet files. Polars is fairly new and under active development. It's a python library, I install those in $HOME/.local, as opposed to system wide. One can also install it in a venv. With pip you can also uninstall packages and keep things fairly tidy.
Pandas is in Debian but it cannot read parquet files itself, it uses 3rd party "engines" for that purpose and those are not available in Debian
Python 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pandas
>>> pandas.read_parquet('sample3.parquet')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3/dist-packages/pandas/io/parquet.py", line 493, in read_parquet
    impl = get_engine(engine)
  File "/usr/lib/python3/dist-packages/pandas/io/parquet.py", line 53, in get_engine
    raise ImportError(
ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Yes, i wasn't clear: it's the polars library that's actively changing, so that might be the issue, or just the vast set of optional components configurable on installation, which isn't the normal package manager experience.
FWIW i think i share your general aversion to _not_ using packages, just for the tidiness of installs and removals, though i'm on fedora and macos.
pandas is a python-centric, tabular data handler that works well in clouds (and desktop Debian). Pandas can read parquet data today, among other libs mentioned. The binary dot-so driver style is single-host centric and not the emphasis of these cloudy projects (and their cloudy funders)
Perhaps more alarm is called for when this python+pandas and parquet does not work on Debian, but that is not the case today.
ps- data access in clouds often uses the S3:// endpoint . Contrast to a POSIX endpoint using _fread()_ or similar.. many parquet-aware clients prefer the cloudy, un-POSIX method to access data and that is another reason it is not a simple package in Debian today.
As I understand it, pandas can read parquet if the pyarrow or fastparquet packages are available, but that's not the case in Debian, and attempts to fix that have been underway for several years.