I work with pyspark and parquet quite a lot. I never had to deal with parquet outside spark, but this is how I would do this:
- Write a pandas_udf function in pyspark.
- Partition your data into smaller bits so that the pandas_udf does not get too much data at the same time.
Something like:
```
import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql import SparkSession


@f.pandas_udf(returnType="long")  # the return value is only a dummy status column
def ingest(doc: pd.Series) -> pd.Series:
    # doc is a pandas Series now (one batch of rows)
    # your processing goes here -> write to DB etc.
    # return one 0 per input row to make Spark happy (lengths must match)
    return pd.Series([0] * len(doc))


spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3 path")
df = df.repartition(1000)  # bump up this number if you run into memory issues
df = df.withColumn("foo", ingest(f.col("doc_column")))
# Spark is lazy: nothing runs until an action consumes "foo"
df.agg(f.sum("foo")).collect()
```
Now the trick is, you can limit how much data is given to your pandas_udf by repartitioning your data. The more partitions there are, the smaller the pd.Series that your pandas_udf gets. There's also the `spark.sql.execution.arrow.maxRecordsPerBatch` config that you can set in Spark to cap the number of rows per Arrow batch and so limit memory consumption.
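To make the interplay between the partition count and `spark.sql.execution.arrow.maxRecordsPerBatch` concrete, here is a rough back-of-the-envelope sketch in plain Python (no Spark needed). It assumes rows are spread evenly across partitions, which `repartition(n)` roughly does, and that each pandas_udf call sees at most one Arrow batch. The function name `max_rows_per_udf_call` and the row counts are made up for illustration; 10,000 is Spark's documented default for the setting.

```python
import math

# Spark's documented default for spark.sql.execution.arrow.maxRecordsPerBatch.
DEFAULT_MAX_RECORDS_PER_BATCH = 10_000


def max_rows_per_udf_call(
    total_rows: int,
    num_partitions: int,
    max_records_per_batch: int = DEFAULT_MAX_RECORDS_PER_BATCH,
) -> int:
    """Rough upper bound on the length of the pd.Series one pandas_udf call gets.

    Assumes evenly sized partitions (hypothetical simplification).
    """
    if total_rows < 0 or num_partitions <= 0:
        raise ValueError("total_rows must be >= 0 and num_partitions must be > 0")
    rows_per_partition = math.ceil(total_rows / num_partitions)
    # Spark treats zero or a negative value as "no limit" on the batch size.
    if max_records_per_batch <= 0:
        return rows_per_partition
    return min(rows_per_partition, max_records_per_batch)


# Hypothetical dataset of 50 million rows.
for partitions in (1_000, 10_000):
    bound = max_rows_per_udf_call(50_000_000, partitions)
    print(f"{partitions} partitions -> at most ~{bound} rows per call")
```

Under these assumptions, adding partitions only shrinks the Series once a partition holds fewer rows than the batch cap; above that point the config is the binding limit.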
^ Probably overkill to bring spark into the equation, but this is one way to do it.
You can use a normal udf (i.e. `f.udf()`) instead of a pandas_udf, but apparently that's slower due to Java <-> Python serialization.
I just wanted to mention that AWS Athena eats 15G parquet files for breakfast.
It is trivial to map the file into Athena.
But you can't connect it to anything other than file output. It can still help you, for example by writing the data out in smaller chunks, or in another output format such as CSV (although arbitrary email content in a CSV feels like you are setting yourself up for parsing errors).
The benefit is that there is virtually no setup cost. And processing cost for a 15G file will be just a few cents.
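For a sense of where "a few cents" comes from, here is a tiny worked estimate. It assumes Athena's on-demand list price of 5 USD per TB scanned and 1024 GB per TB; both are assumptions to check against current pricing for your region, and the function name `athena_scan_cost_usd` is made up for illustration.

```python
# Assumption: on-demand list price of 5 USD per TB scanned.
# Check current pricing for your region before relying on this.
ASSUMED_USD_PER_TB_SCANNED = 5.0
GB_PER_TB = 1024  # assumption: binary terabytes


def athena_scan_cost_usd(
    gb_scanned: float, usd_per_tb: float = ASSUMED_USD_PER_TB_SCANNED
) -> float:
    """Estimated cost of one query that scans `gb_scanned` gigabytes."""
    if gb_scanned < 0:
        raise ValueError("gb_scanned must be non-negative")
    return gb_scanned / GB_PER_TB * usd_per_tb


full_scan = athena_scan_cost_usd(15)
print(f"one full scan of a 15 GB file: ~${full_scan:.3f}")
```

Under those assumptions a full scan of the 15G file lands at roughly seven cents, so even a handful of exploratory queries plus one rewrite into smaller files stays well under a dollar.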
Athena is probably my best bet tbh, especially if I can do a few clicks and just get smaller files. Processing smaller files is a no brainer / pretty easy and could be outsourced to lambda.
Yeah the big benefit is that it requires very little setup.
You create a new partitioned table/location from the originally mapped file using a CTAS like so:
CREATE TABLE new_table_name
WITH (
  format = 'PARQUET',
  parquet_compression = 'SNAPPY',
  external_location = 's3://your-bucket/path/to/output/',
  partitioned_by = ARRAY['partition_column_name']
) AS
SELECT *  -- the partition column must come last in the SELECT list
FROM original_table_name
You can probably create a hex-encoded hash of some column and partition by its last character if you want 16 evenly sized partitions, unless you already have a dimension to partition by.
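The hash trick is easy to sanity-check outside Athena. The sketch below is plain Python, not Athena SQL: it takes the last hex character of an MD5 digest as the bucket and counts how evenly some made-up keys spread over the 16 possible values. MD5 and the `message-<n>` keys are assumptions picked for illustration; any well-mixed hash of a reasonably unique column should behave similarly.

```python
import hashlib
from collections import Counter


def bucket_for(key: str) -> str:
    """Return the last hex character of the key's MD5 digest (one of 0-9, a-f)."""
    return hashlib.md5(key.encode("utf-8")).hexdigest()[-1]


# Hypothetical row identifiers standing in for a real column.
keys = [f"message-{i}" for i in range(16_000)]
counts = Counter(bucket_for(k) for k in keys)

for bucket in sorted(counts):
    print(bucket, counts[bucket])
```

The same key always maps to the same bucket, so the assignment is stable across runs, and with enough distinct keys each bucket ends up with close to one sixteenth of the rows.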
It's been a while (~5yr) since I've done anything with Spark, but IIRC it used to be very difficult to make reliable jobs with the Java or Python APIs due to the impedance mismatch between Scala's lazy evaluation semantics and the eager evaluation of Java and Python. I'd encounter perplexing OOMs whenever I tried to use the Python or Java APIs, so I (reluctantly) learned enough Scala to make the Spark go brr and all was well. Is it still like this?
Same for me, the only reason to learn Scala was Spark. The Java API was messy. And still today, I like Scala, well, many functional languages, but for jumping between projects they are a nightmare, as everything is dense and cluttered.