

1.5 Years of Spark Knowledge in 8 Tips
0 — Quick Review
1 — Spark is a Grocery Store
2 — Collect Data to Memory Once
3 — Meet the SLA then Stop
4 — Disk Spill
5 — Use SQL Syntax
6 — Glob Filters
7 — Use Reduce with DataFrame.Union
8 — Use ChatGPT

My learnings from Databricks customer engagements

Towards Data Science
Figure 1: a technical diagram of how to write Apache Spark. Image by author.

At Databricks, I help large retail organizations deploy and scale data and machine learning pipelines. Here are the 8 most important Spark tips/tricks I've learned in the field.

Throughout this post, we assume a general working knowledge of Spark and its structure, but this post should be accessible to all levels.

Let’s dive in!

Quickly, let's review what Spark does…

Spark is a big data processing engine. It takes Python/Java/Scala/R/SQL code and converts it into a highly optimized set of transformations.

Figure 2: Spark driver and worker configuration. Image by author.

At its lowest level, Spark creates tasks, which are parallelizable transformations on data partitions. These tasks are then distributed from a driver node to worker nodes, which are responsible for leveraging their CPU cores to complete the transformations. By distributing tasks to potentially many workers, Spark allows us to horizontally scale and thereby support complex data pipelines that would be impossible on a single machine.

Okay, hopefully not all of that was new information. Either way, in the following sections we'll slow down a bit. These tips should help both novices and intermediates at Spark.

Spark is complex. To help both you and potentially others understand its structure, let's leverage an impressively good analogy borrowed from queueing theory: Spark is a grocery store.

When thinking about the distributed computing component of Spark, there are three main components…

  • Data partitions: subsets of rows of our data. In our grocery store, they're the groceries.
  • Spark tasks: low-level transformations performed on a data partition. In our grocery store, they're the customers.
  • Cores: the parts of your processor(s) that do work in parallel. In our grocery store, they're the cashiers.

That’s it!

Now, let's leverage these concepts to talk through some fundamentals of Spark.

Figure 3: illustration of the cashier analogy, specifically for data skew. Image by author.

As shown in figure 3, our cashiers (cores) can only process one customer (task) at a time. Moreover, some customers have lots of groceries (partition row count), as shown by the first customer at cashier 2. From these simple observations…

  • The more cashiers (cores), the more customers (tasks) you can process in parallel. This is horizontal/vertical scaling.
  • If you don't have enough customers (tasks) to saturate your cashiers (cores), you'll be paying for cashiers to sit there idle. This relates to autoscaling, cluster sizing, and partition sizing.
  • If customers (tasks) have very different amounts of groceries (partition row counts), you'll see uneven utilization of your cashiers. This is data skew.
  • The better your cashiers (cores), the faster they can process a single customer (task). This relates to upgrading your processor.
  • etc.

Given the analogy comes from queueing theory, a field directly related to distributed computing, it’s quite powerful!

Use this analogy to debug, communicate, and develop Spark.

The most common mistake for Spark novices is misunderstanding lazy evaluation.

Lazy evaluation means that no data transformations will be performed until you invoke a collection to memory. Examples of methods that invoke a collection include but are not limited to…

  • .collect(): bring the DataFrame into memory as a Python list.
  • .show(): print the first n rows of your DataFrame.
  • .count(): get the number of rows of your DataFrame.
  • .first(): get the first row of your DataFrame.

The single most common incorrect collection method is leveraging .count() throughout a program. Every time you invoke a collection, all upstream transformations will be recomputed from scratch, so if you have 5 invocations of .count(), your program will asymptotically run 5x as long.
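To make this concrete, here is a minimal sketch of the pattern to avoid, assuming an active SparkSession named spark as in the other snippets; the table and column names are hypothetical.

from pyspark.sql import functions as F

df = spark.read.table("events")                       # hypothetical source table
df = df.filter(F.col("event_date") >= "2023-01-01")

# BAD: each .count() recomputes the read and filter from scratch
print(df.count())
df = df.withColumn("is_mobile", F.col("device") == "mobile")
print(df.count())

# BETTER: build the full chain of transformations and trigger a single action at the end
df.write.mode("overwrite").saveAsTable("events_clean")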

Spark is lazily evaluated! Pipelines should have a single flow from source(s) to target(s).

A surprisingly common issue that comes up when working with large organizations is that they lose sight of the big picture and thereby optimize pipelines in an inefficient manner.

Here's how pipelines should be optimized for the majority of use cases…

  1. Ask if we need to do the project. Put simply, think about what you're actually getting from optimizing a pipeline. If you expect to improve runtime by 20% and the pipeline costs $100 to run, should you invest your extremely expensive data engineer's salary to save $20 per run? Maybe. Maybe not.
  2. Look for low-hanging fruit in the code. After agreeing to do the project, check if the code has obvious flaws. Examples are misuse of lazy evaluation, unnecessary transformations, and incorrect ordering of transformations.
  3. Get the job running under the SLA by leveraging compute. After checking that the code is relatively efficient, just throw compute at the problem so you can 1) meet the SLA and 2) gather statistics from the Spark UI.
  4. Stop. If you're properly saturating your compute and cost isn't egregious, do some last-minute compute improvements, then stop. Your time is valuable. Don't waste it saving dollars when you could be creating thousands of dollars elsewhere.
  5. Deep dive. Finally, if you really need to deep dive because cost is unacceptable, then roll up your sleeves and optimize data, code, and compute.

The beauty of this framework is that steps 1–4 only require cursory knowledge of Spark and are very quick to execute; sometimes you can collect information on steps 1–4 during a 30-minute call. The framework also ensures that we'll stop as soon as we're good enough. Finally, if step 5 is needed, we can delegate it to those on the team who are strongest at Spark.

By finding all the ways to avoid over-optimizing a pipeline, you're saving precious developer hours.

Disk spill is the single most common reason that Spark jobs run slow.

It's a very simple concept. Spark is designed to leverage in-memory processing. If you don't have enough memory, Spark will try to write the extra data to disk to prevent your process from crashing. This is called disk spill.

Figure 4: screenshot of the Spark UI highlighting disk spill. Image by author.

Writing to and reading from disk is slow, so it should be avoided. If you want to learn how to identify and mitigate spill, follow this tutorial. However, some very common and simple methods to mitigate spill are…

  1. Process less data per task, which can be achieved by changing the partition count via spark.sql.shuffle.partitions or repartition, as sketched below.
  2. Increase the RAM-to-core ratio in your compute.
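Here is a minimal sketch of the first option, assuming an active SparkSession named spark and a DataFrame df; the partition count and partitioning column are illustrative, not tuned recommendations.

# Raise the shuffle partition count so each task processes less data
spark.conf.set("spark.sql.shuffle.partitions", 400)

# Or repartition explicitly before a wide transformation
df = df.repartition(400, "customer_id")   # hypothetical partitioning column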

If you want your job to run optimally, prevent spill.

Whether you're using Scala, Java, Python, SQL, or R, Spark will always leverage the same transformations under the hood. So, use the right language for your task.

SQL is the least verbose "language" out of all supported Spark languages for many operations! More tangibly:

  • If you're adding or modifying a column, use selectExpr or expr, especially paired with Python's f-strings.
  • If you need complex SQL, create temp views, then use spark.sql().

Here are two quick examples…

from pyspark.sql import functions as F
from pyspark.sql.functions import expr
from pyspark.sql.window import Window

# Column rename and cast with SQL
df = df.selectExpr([f"{c}::int as {c}_abc" for c in df.columns])

# Column rename and cast with native spark
for c in df.columns:
    df = df.withColumn(f"{c}_abc", F.col(c).cast("int")).drop(c)

# Window functions with SQL
df_with_running_total_sql = df.withColumn("running_total", expr(
    "sum(value) over (order by id rows between unbounded preceding and current row)"
))

# Window functions with native spark
windowSpec = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_running_total_native = df.withColumn("running_total", F.sum("value").over(windowSpec))
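And since the second bullet above mentions temp views, here is a minimal sketch of that pattern; the view name and the aggregation are illustrative.

# Register the DataFrame as a temp view, then query it with full SQL
df.createOrReplaceTempView("my_view")
agg_df = spark.sql("""
    SELECT id, SUM(value) AS total_value
    FROM my_view
    GROUP BY id
""")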

Use SQL.

Do you need to read a bunch of data files stored in a complex directory? If so, use Spark's extremely powerful read options.

The first time I encountered this problem, I rewrote os.walk to work with my cloud provider where the data was stored. I very proudly showed this method to my project partner, who simply said, "let me share my screen," and proceeded to introduce me to glob filters.

# Read all parquet files in the directory (and subdirectories)
df = spark.read.load(
    "examples/src/main/resources/dir1",
    format="parquet",
    pathGlobFilter="*.parquet"
)

When I applied the glob filter shown above instead of my custom os.walk, the ingestion operation was over 10x faster.

Spark has powerful parameters. Check whether your desired functionality already exists before building bespoke implementations.

Loops are almost always detrimental to Spark performance. Here's why…

Spark has two core phases: planning and execution. In the planning phase, Spark creates a directed acyclic graph (DAG) that indicates how your specified transformations will be carried out. The planning phase is relatively expensive and can sometimes take several seconds, so you want to invoke it as infrequently as possible.
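If you want to see the plan Spark builds, you can print it without triggering execution. A small sketch, assuming an active SparkSession named spark and a hypothetical events table:

# Build a lazy DataFrame, then inspect the plan Spark generated for it
df = spark.read.table("events").filter("value > 0").groupBy("id").count()
df.explain(mode="formatted")   # prints the plan without running the job (mode argument requires Spark 3.0+)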

Let's discuss a use case where you need to iterate through many DataFrames, perform expensive transformations, then append them to a table.

First, there's native support for almost all iterative use cases, specifically pandas UDFs, window functions, and joins. But if you truly do need a loop, here's how you invoke a single planning phase and thereby get all transformations in a single DAG.

import functools
from pyspark.sql import DataFrame

paths = get_file_paths()

# BAD: for loop (one plan and one write per path)
for path in paths:
    df = spark.read.load(path)
    df = fancy_transformations(df)
    df.write.mode("append").saveAsTable("xyz")

# GOOD: functools.reduce (a single plan and a single write)
lazily_evaluated_reads = [spark.read.load(path) for path in paths]
lazily_evaluated_transforms = [fancy_transformations(df) for df in lazily_evaluated_reads]
unioned_df = functools.reduce(DataFrame.union, lazily_evaluated_transforms)
unioned_df.write.mode("append").saveAsTable("xyz")

The first solution uses a for loop to iterate over paths, do fancy transformations, then append to our delta table of interest. In the second, we store a list of lazily evaluated DataFrames, apply transformations over them, then reduce them via a union, performing a single Spark plan and write.

We can actually see the difference in architecture on the backend via the Spark UI…

Figure 5: Spark DAG for the for loop vs. functools.reduce. Image by author.

In figure 5, the DAG on the left, corresponding to the for loop, will have 10 stages. However, the DAG on the right, corresponding to functools.reduce, will have a single stage and thereby can be processed more easily in parallel.

For a simple use case of reading 400 unique delta tables then appending to a delta table, this method was 6x faster than a for loop.

Get creative to create a single Spark DAG.

This is not about hype.

Spark is a well-established and thereby well-documented piece of software. LLMs, specifically GPT-4, are really good at distilling complex information into digestible and concise explanations. Since the release of GPT-4, I have not done a complex Spark project where I didn't rely heavily on GPT-4.

Figure 6: example of GPT-4 output on how to impact data partition size in Spark. Image by author.

However, to state the (hopefully) obvious, be careful with LLMs. Anything you send to a closed-source model can become training data for the parent organization, so make sure you don't send anything sensitive. Also, please validate that the output from GPT is legitimate.

When used properly, LLMs are game-changing for Spark learning and development. It's worth the $20/month.
