
Comparing Performance of Big Data File Formats: A Practical Guide


Environment setup

In this guide, we’re going to use JupyterLab with Docker and MinIO. Think of Docker as a handy tool that simplifies running applications, and MinIO as a versatile storage solution that handles many different kinds of data well. Here’s how we’ll set things up:

I’m not diving deep into every step here, since there’s already an excellent tutorial for that. I suggest checking it out first, then coming back to continue with this one.

Once everything is ready, we’ll start by preparing our sample data. Open a new Jupyter notebook to begin.

First up, we need to install the s3fs Python package, which is essential for working with MinIO from Python.

!pip install s3fs

Following that, we’ll import the needed dependencies and modules.

import os
import s3fs
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T
import datetime
import time

We’ll also set some environment variables that will come in handy when interacting with MinIO.

# Define environment variables
os.environ["MINIO_KEY"] = "minio"
os.environ["MINIO_SECRET"] = "minio123"
os.environ["MINIO_ENDPOINT"] = "http://minio1:9000"

Then we’ll set up our Spark session with the necessary settings.

# Create Spark session
spark = SparkSession.builder \
    .appName("big_data_file_formats") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.spark:spark-avro_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0") \
    .config("spark.hadoop.fs.s3a.endpoint", os.environ["MINIO_ENDPOINT"]) \
    .config("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_KEY"]) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET"]) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

Let’s break this down to understand it better.

  • spark.jars.packages: Downloads the required JAR files from the Maven repository. A Maven repository is a central place for storing build artifacts such as JAR files, libraries, and other dependencies used in Maven-based projects.
  • spark.hadoop.fs.s3a.endpoint: The endpoint URL for MinIO.
  • spark.hadoop.fs.s3a.access.key and spark.hadoop.fs.s3a.secret.key: The access key and secret key for MinIO. Note that these are the same as the username and password used to access the MinIO web interface.
  • spark.hadoop.fs.s3a.path.style.access: Set to true to enable path-style access for the MinIO bucket.
  • spark.hadoop.fs.s3a.impl: The implementation class for the S3A file system.
  • spark.sql.extensions: Registers Delta Lake’s SQL commands and configurations within the Spark SQL parser.
  • spark.sql.catalog.spark_catalog: Sets the Spark catalog to Delta Lake’s catalog, allowing table management and metadata operations to be handled by Delta Lake.

Selecting the right JAR versions is crucial to avoid errors. If you use the same Docker image, the JAR versions mentioned here should work fine. If you run into setup issues, feel free to leave a comment. I’ll do my best to help you 🙂

Our next step is to create a large Spark dataframe. It’ll have 10 million rows, divided into ten columns — half text, half numbers.

# Generate sample data
num_rows = 10000000
df = spark.range(0, num_rows)

# Add columns
for i in range(1, 10):  # We already have one column (id)
    if i % 2 == 0:
        # Integer column
        df = df.withColumn(f"int_col_{i}", (F.randn() * 100).cast(T.IntegerType()))
    else:
        # String column
        df = df.withColumn(f"str_col_{i}", (F.rand() * num_rows).cast(T.IntegerType()).cast("string"))

df.count()

Let’s peek at the first few entries to see what they look like.

# Show rows from sample data
df.show(10,truncate = False)

+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|id |str_col_1|int_col_2|str_col_3|int_col_4|str_col_5|int_col_6|str_col_7|int_col_8|str_col_9|
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|0 |7764018 |128 |1632029 |-15 |5858297 |114 |1025493 |-88 |7376083 |
|1 |2618524 |118 |912383 |235 |6684042 |-115 |9882176 |170 |3220749 |
|2 |6351000 |75 |3515510 |26 |2605886 |89 |3217428 |87 |4045983 |
|3 |4346827 |-70 |2627979 |-23 |9543505 |69 |2421674 |-141 |7049734 |
|4 |9458796 |-106 |6374672 |-142 |5550170 |25 |4842269 |-97 |5265771 |
|5 |9203992 |23 |4818602 |42 |530044 |28 |5560538 |-75 |2307858 |
|6 |8900698 |-130 |2735238 |-135 |1308929 |22 |3279458 |-22 |3412851 |
|7 |6876605 |-35 |6690534 |-41 |273737 |-178 |8789689 |88 |4200849 |
|8 |3274838 |-42 |1270841 |-62 |4592242 |133 |4665549 |-125 |3993964 |
|9 |4904488 |206 |2176042 |58 |1388630 |-63 |9364695 |78 |2657371 |
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
only showing top 10 rows

To understand the structure of our dataframe, we’ll use df.printSchema() to see the types of data it contains. After this, we’ll create four CSV files, one each for Parquet, Avro, ORC, and Delta Lake. We write a separate CSV for each format to avoid any bias in the performance tests — if we reused a single CSV, Spark could cache and optimize things in the background.
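
Calling it looks like this; the output shown in the comments is a sketch of what to expect, and the nullability flags may differ slightly between Spark versions.

# Inspect the dataframe's structure
df.printSchema()

# Expected shape of the output (abridged):
# root
#  |-- id: long (nullable = false)
#  |-- str_col_1: string (nullable = true)
#  |-- int_col_2: integer (nullable = true)
#  |-- ...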

# Write 4 CSVs for comparing performance for each file type
# Include a header row so the header-aware reads below line up with the data
df.write.option("header", True).csv("s3a://mybucket/ten_million_parquet.csv")
df.write.option("header", True).csv("s3a://mybucket/ten_million_avro.csv")
df.write.option("header", True).csv("s3a://mybucket/ten_million_orc.csv")
df.write.option("header", True).csv("s3a://mybucket/ten_million_delta.csv")

Now we’ll make four separate dataframes from these CSVs, one for each file format.

# Read all 4 CSVs to create dataframes
schema = T.StructType([
    T.StructField("id", T.LongType(), nullable=False),
    T.StructField("str_col_1", T.StringType(), nullable=True),
    T.StructField("int_col_2", T.IntegerType(), nullable=True),
    T.StructField("str_col_3", T.StringType(), nullable=True),
    T.StructField("int_col_4", T.IntegerType(), nullable=True),
    T.StructField("str_col_5", T.StringType(), nullable=True),
    T.StructField("int_col_6", T.IntegerType(), nullable=True),
    T.StructField("str_col_7", T.StringType(), nullable=True),
    T.StructField("int_col_8", T.IntegerType(), nullable=True),
    T.StructField("str_col_9", T.StringType(), nullable=True)
])

df_csv_parquet = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_parquet.csv")
df_csv_avro = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_avro.csv")
df_csv_orc = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_orc.csv")
df_csv_delta = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_delta.csv")

And that’s it! We’re all set to explore these big data file formats.

Working with Parquet

Parquet is a column-oriented file format that works very well with Apache Spark, making it a top choice for handling big data. It shines in analytical scenarios, particularly when you’re sifting through data column by column.

One of its neat features is the ability to store data in a compressed format, with snappy compression being the go-to choice. This not only saves space but also enhances performance.

Another cool aspect of Parquet is its flexible approach to data schemas. You can start off with a basic structure and then easily expand by adding more columns as your needs grow. This adaptability makes it super user-friendly for evolving data projects.
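
As a side note (not part of the benchmark), snappy is Spark’s default codec for Parquet, but you can set it explicitly — or swap in another codec such as gzip — through the compression write option. The paths below are placeholders made up purely for illustration.

# Optional: choose the Parquet compression codec explicitly (snappy is Spark's default)
df_csv_parquet.write.option("compression", "snappy").parquet("s3a://mybucket/parquet_snappy_demo.parquet")

# Trade write speed for smaller files with gzip (illustrative path, not used in the benchmark)
# df_csv_parquet.write.option("compression", "gzip").parquet("s3a://mybucket/parquet_gzip_demo.parquet")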

Now that we’ve got a handle on Parquet, let’s put it to the test. We’re going to write 10 million records into a Parquet file and track how long it takes. Instead of using the %timeit Python function, which runs multiple times and can be heavy on resources for big data tasks, we’ll just measure it once.

# Write data as Parquet
start_time = time.time()
df_csv_parquet.write.parquet("s3a://mybucket/ten_million_parquet2.parquet")
end_time = time.time()
print(f"Time taken to write down as Parquet: {end_time - start_time} seconds")

For me, this task took 15.14 seconds, but remember, this time can change depending on your computer. For instance, on a less powerful PC it took longer. So don’t sweat it if your time is different. What’s important here is comparing performance across the different file formats.

Next up, we’ll run an aggregation query on our Parquet data.

# Perform aggregation query using Parquet data
start_time = time.time()
df_parquet = spark.read.parquet("s3a://mybucket/ten_million_parquet2.parquet")
df_parquet \
    .select("str_col_5", "str_col_7", "int_col_2") \
    .groupBy("str_col_5", "str_col_7") \
    .count() \
    .orderBy("count") \
    .limit(1) \
    .show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+

This query finished in 12.33 seconds. Alright, now let’s switch gears and explore the ORC file format.

Working with ORC

The ORC file format, another column-oriented contender, may not be as well-known as Parquet, but it has its own perks. One standout feature is its ability to compress data even more effectively than Parquet, while using the same snappy compression algorithm.

It’s a hit in the Hive world, thanks to its support for ACID operations in Hive tables. ORC is also tailor-made for handling large streaming reads efficiently.

Plus, it’s just as flexible as Parquet when it comes to schemas — you can begin with a basic structure and then add more columns as your project grows. This makes ORC a sturdy choice for evolving big data needs.
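
If you want to control ORC’s codec yourself, Spark exposes it both as a session-level setting and as a per-write option. The snippet below is a small sketch; the path is a made-up placeholder and not part of the benchmark.

# Session-level default for ORC writes (snappy is already the default in recent Spark versions)
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

# Per-write override, e.g. zlib for tighter compression at some CPU cost (illustrative path only)
# df_csv_orc.write.option("compression", "zlib").orc("s3a://mybucket/orc_zlib_demo.orc")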

Let’s dive into testing ORC’s writing performance.

# Write data as ORC
start_time = time.time()
df_csv_orc.write.orc("s3a://mybucket/ten_million_orc2.orc")
end_time = time.time()
print(f"Time taken to write down as ORC: {end_time - start_time} seconds")

It took me 12.94 seconds to finish the task. Another point of interest is the size of the data written to the MinIO bucket. In the ten_million_orc2.orc folder, you’ll find several partition files, each of a consistent size. Each partition ORC file is about 22.3 MiB, and there are 16 files in total.

ORC partition files (Image by author)

Comparing this to Parquet, each Parquet partition file is around 26.8 MiB, also totaling 16 files. This shows that ORC indeed offers better compression than Parquet.
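
If you’d rather verify these sizes in code than click through the MinIO console, the s3fs package we installed earlier can list the objects directly. Here’s a minimal sketch, assuming the bucket layout created above (the object counts include Spark’s _SUCCESS marker file).

# Compare on-disk sizes of the Parquet and ORC outputs via s3fs
fs = s3fs.S3FileSystem(
    key=os.environ["MINIO_KEY"],
    secret=os.environ["MINIO_SECRET"],
    client_kwargs={"endpoint_url": os.environ["MINIO_ENDPOINT"]},
)

for folder in ["mybucket/ten_million_parquet2.parquet", "mybucket/ten_million_orc2.orc"]:
    n_objects = len(fs.ls(folder))
    total_mib = fs.du(folder) / (1024 * 1024)
    print(f"{folder}: {n_objects} objects, {total_mib:.1f} MiB total")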

Next, we’ll test how ORC handles an aggregation query. We’re using the same query for all file formats to keep our benchmarking fair.

# Perform aggregation using ORC data
df_orc = spark.read.orc("s3a://mybucket/ten_million_orc2.orc")
start_time = time.time()
df_orc \
    .select("str_col_5", "str_col_7", "int_col_2") \
    .groupBy("str_col_5", "str_col_7") \
    .count() \
    .orderBy("count") \
    .limit(1) \
    .show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+

The ORC query finished in 13.44 seconds, a tad longer than Parquet’s time. With ORC checked off our list, let’s move on to experimenting with Avro.

Working with Avro

Avro is a row-based file format with its own unique strengths. While it doesn’t compress data as efficiently as Parquet or ORC, it makes up for this with a faster writing speed.

What really sets Avro apart is its excellent schema evolution capabilities. It handles changes like added, removed, or altered fields with ease, making it a go-to choice for scenarios where data structures evolve over time.
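
To make that concrete, here’s a small, hypothetical sketch: spark-avro accepts an explicit reader schema through the avroSchema option, so files written before a field existed can still be read, with the new field filled in from its default. The sketch refers to the ten_million_avro2.avro folder we’ll write a bit further down; the extra_col field is invented purely for illustration, and topLevelRecord is spark-avro’s default record name.

# Hypothetical "evolved" reader schema: keeps two existing columns and adds extra_col with a default
evolved_schema = """
{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "str_col_1", "type": ["string", "null"]},
    {"name": "extra_col", "type": ["null", "string"], "default": null}
  ]
}
"""

# Older Avro files read with the newer schema; extra_col comes back as its default (null)
df_evolved = spark.read.format("avro") \
    .option("avroSchema", evolved_schema) \
    .load("s3a://mybucket/ten_million_avro2.avro")
df_evolved.show(3)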

Avro is especially well-suited for workloads that involve a lot of data writing.

Now, let’s take a look at how Avro does with writing data.

# Write data as Avro
start_time = time.time()
df_csv_avro.write.format("avro").save("s3a://mybucket/ten_million_avro2.avro")
end_time = time.time()
print(f"Time taken to write down as Avro: {end_time - start_time} seconds")

It took me 12.81 seconds, which is actually quicker than both Parquet and ORC. Next, we’ll look at Avro’s performance with an aggregation query.

# Perform aggregation using Avro data
df_avro = spark.read.format("avro").load("s3a://mybucket/ten_million_avro2.avro")
start_time = time.time()
df_avro \
    .select("str_col_5", "str_col_7", "int_col_2") \
    .groupBy("str_col_5", "str_col_7") \
    .count() \
    .orderBy("count") \
    .limit(1) \
    .show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+

This query took about 15.42 seconds. So when it comes to querying, Parquet and ORC are ahead in terms of speed. Alright, it’s time to explore our final and newest file format — Delta Lake.

Working with Delta Lake

Delta Lake is a new star in the big data file format universe, closely related to Parquet in terms of storage size — it’s like Parquet but with some extra features.

When writing data, Delta Lake takes a bit longer than Parquet, mostly due to its _delta_log folder, which is essential to its advanced capabilities. These capabilities include ACID compliance for reliable transactions, time travel for accessing historical data, and small file compaction to keep things tidy.

While it’s a newcomer on the big data scene, Delta Lake has quickly become a favorite on cloud platforms that run Spark, outpacing its use in on-premises systems.

Let’s move on to testing Delta Lake’s performance, starting with a data writing test.

# Write data as Delta
start_time = time.time()
df_csv_delta.write.format("delta").save("s3a://mybucket/ten_million_delta2.delta")
end_time = time.time()
print(f"Time taken to write down as Delta Lake: {end_time - start_time} seconds")

The write operation took 17.78 seconds, which is a bit longer than the other file formats we’ve looked at. A neat thing to note is that in the ten_million_delta2.delta folder, each partition file is actually a Parquet file, similar in size to what we observed with Parquet. Plus, there’s the _delta_log folder.

Writing data as Delta Lake (Image by author)

The _delta_log folder in the Delta Lake format plays a critical role in how Delta Lake manages and maintains data integrity and versioning. It’s a key component that sets Delta Lake apart from other big data file formats. Here’s a simple breakdown of its function:

  1. Transaction Log: The _delta_log folder contains a transaction log that records every change made to the data in the Delta table. This log is a series of JSON files that detail the additions, deletions, and modifications to the data. It acts like a comprehensive diary of all the data transactions.
  2. ACID Compliance: This log enables ACID (Atomicity, Consistency, Isolation, Durability) compliance. Every transaction in Delta Lake, like writing new data or modifying existing data, is atomic and consistent, ensuring data integrity and reliability.
  3. Time Travel and Auditing: The transaction log allows for “time travel”, which means you can easily view and restore earlier versions of the data. This is incredibly useful for data recovery, auditing, and understanding how data has evolved over time.
  4. Schema Enforcement and Evolution: The _delta_log also keeps track of the schema (structure) of the data. It enforces the schema during data writes and allows for safe evolution of the schema over time without corrupting the data.
  5. Concurrency and Merge Operations: It manages concurrent reads and writes, ensuring that multiple users can access and modify the data at the same time without conflicts. This makes it ideal for complex operations like merge, update, and delete.

In summary, the _delta_log folder is the brain behind Delta Lake’s advanced data management features, offering robust transaction logging, version control, and reliability enhancements that are not typically available in simpler file formats like Parquet or ORC.
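
To make the first three points above tangible, here’s a minimal sketch of Delta Lake’s history and time-travel features, run against the table we just wrote (at this point version 0 is the only version, so the time-travel read simply returns the current data).

# Inspect the transaction log recorded in _delta_log
spark.sql(
    "DESCRIBE HISTORY delta.`s3a://mybucket/ten_million_delta2.delta`"
).select("version", "timestamp", "operation").show(truncate=False)

# Time travel: read the table as it was at a specific version (or use "timestampAsOf")
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("s3a://mybucket/ten_million_delta2.delta")
print(df_v0.count())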

Now, it’s time to see how Delta Lake fares with an aggregation query.

# Perform aggregation using Delta data
df_delta = spark.read.format("delta").load("s3a://mybucket/ten_million_delta2.delta")
start_time = time.time()
df_delta \
    .select("str_col_5", "str_col_7", "int_col_2") \
    .groupBy("str_col_5", "str_col_7") \
    .count() \
    .orderBy("count") \
    .limit(1) \
    .show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+

This query finished in about 15.51 seconds. While that’s a tad slower compared to Parquet and ORC, it’s pretty close. It suggests that Delta Lake’s real-world performance is quite similar to Parquet’s.

Awesome! We’ve wrapped up all our experiments. Let’s recap our findings in the next section.

When to use which file format?

We’ve wrapped up our testing, so let’s bring all our findings together. For data writing, Avro takes the top spot; that’s really what it does best in practical scenarios.

When it comes to reading and running aggregation queries, Parquet leads the pack. However, that doesn’t mean ORC and Delta Lake fall short. As columnar file formats, they perform admirably in most situations.

Performance comparison (Image by author)
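
For reference, here are the timings measured in this run (they will vary with your hardware):

+-----------+--------------+--------------+
|File format|Write time (s)|Query time (s)|
+-----------+--------------+--------------+
|Parquet    |15.14         |12.33         |
|ORC        |12.94         |13.44         |
|Avro       |12.81         |15.42         |
|Delta Lake |17.78         |15.51         |
+-----------+--------------+--------------+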

Here’s a quick rundown:

  • Choose ORC for the best compression, especially if you’re using Hive and Pig for analytical tasks.
  • Working with Spark? Parquet and Delta Lake are your go-to choices.
  • For scenarios with a lot of data writing, like landing zone areas, Avro is the best fit.

And that’s a wrap on this tutorial!
