Zephyrnet Logo

Apache Spark Performance Optimization for Data Engineers

Date:

This article was published as a part of the Data Science Blogathon

Introduction

Apache Spark is a big data processing framework that has long become one of the most popular and frequently encountered in all kinds of projects related to Big Data. It successfully combines the speed of work and the simplicity of the developer expressing his thoughts.

The developer works with data at a high enough level and it seems that there is nothing difficult in, for example, connecting two data sets by writing just one line of code:

 ordersDF.join(customersDF, ordersDF ["customer_id"] == customersDF["id"], "left_outer")

But just think: what happens in a cluster when two datasets are connected, which may or may not be entirely on any of the cluster nodes? Usually, Apache Spark copes with everything quickly, but sometimes, especially if there is really a lot of data, you still need to understand what is happening at the level below and use this knowledge to help Apache Spark work to its fullest.

Today we are going to talk about how to make your application run fast and use all the resources that you requested for it. This article will focus on mainly the Spark SQL module, running an Apache Spark application on a Yarn cluster with static provisioning. But the general ideas can be applied to other initial data as well. We are looking at Spark 2.3 / 2.4 here to better understand all the innovations in Spark 3.

Data and where it lives

Let’s start with the abstraction that Spark provides us for working with data – this is RDD ( Resilient Distributed Dataset ). For the purpose of this article, it doesn’t matter that we are working with a DataFrame or DataSet.

Apache spark-RDD

Image 1

Thus, for developers, a set of data is presented as a single object, and it is processed in portions (blocks) separately in some thread on some executor in the cluster. A block is the minimum unit of processing, the executor receives a block and an instruction that tells him what needs to be done with this block of data.

How an Apache Spark app works in a cluster

At a high level, every Spark application at the time of its operation consists of a driver – a program that executes the main () function and executors that run on the cluster nodes. Executors are universal soldiers, they receive a chunk of data (block) and an instruction, execute it, and report completion to the driver to receive the next instruction. Each executor can have more than one processing thread running, in which case each thread processes its own block of data independently of the others. Thus, if, when starting our application, we ordered five executors with four cores (threads) from the cluster manager, then at each moment of time we have 5 * 4 = 20 threads and at best we can process 20 data blocks simultaneously.

So, each task gets to execute:

  • num_executors – the number of separate JVM processes in which data processing threads will be launched (they can be located both on the same cluster node or on different ones). The processes will run until the end of the application;

  • executor_cores The number of concurrent threads running in each executor. One thread processes one block of data at a time.

How the Apache Spark application works

Image 2

How the Apache Spark application works

In Spark History (a Web server for displaying the execution logs of Spark applications in a convenient form) it looks like this:

Apache Spark History: Stage

We see here two executors, each of which has four processing threads.

Shuffle Apache Spark Performance Optimization

So, we figured out that we have N data blocks and P threads (workers) that can process these data blocks in parallel.

And everything would be fine with us if these blocks lived until the end of the application, but almost any application will have processing that entails a complete reshuffling of our blocks. This, for example, joining two tables by key (JOIN), grouping by key (GROUP BY). In this case, the well-known MapReduce pattern works for everyone., in which the data of the entire set by key is redistributed into new data blocks so that rows with the same key are in only one block. This process is called Shuffle in Spark. Why did I capitalize it? Because it is a very complex and expensive process, which increases the memory consumption on the performers, the consumption of disk memory on the cluster nodes, and the network exchange between the cluster nodes. It is very reminiscent of the transformation of a caterpillar into a butterfly – everything falls apart to be reassembled in a new guise, and is also energy-intensive.

Division of Tasks into Stages

In Spark, processing blocks from one Shuffle to another is called a Stage. Note that before shuffling, all blocks are processed in parallel, after shuffling, they are also processed in parallel, but a new stage will not start until all the blocks at the end of the previous stage have passed this process. Thus, the boundary between the stages is a waiting place when processing blocks in parallel. Note also that within one stage, all tasks (tasks) over one block occur sequentially within one thread. That is, the block is not transmitted anywhere over the network, but all blocks are processed in parallel. It turns out that the number of blocks within the boundaries of the stage is unchanged.

Image 3

The task is divided into stages

We came to the following picture: all tasks are divided into stages, and within each stage, the number of blocks is constant and equal…. And this is where the fun begins. We know the number of workers (P = executors * cores), but how many blocks there will be at each stage is a question that directly affects the performance of our application. After all, if there are a lot of blocks, and there are few performers, then each performer will process several blocks sequentially and vice versa: if there are few blocks, and there are more performers, then some performers will be idle while the rest are working tirelessly. The most interesting thing here is that when the application is running slowly, they try to give it more executors, but the performance, in this case, does not increase.

Let’s start by figuring out the amount of work in stages. Herein after, for the sake of simplicity, we will consider blocks of only one data set. At any given time, performers can process several unrelated stages. For example, before the JOIN, the two datasets will be processed independently of each other and thus divide the executors among themselves. In this case, the number of processing units will be their sum. But for our purposes, it is necessary to understand what is happening with one set of data. In the first step, everything will depend on where your dataset came from. For example, if you are reading a directory of parquet files from HDFS, then the number of blocks in the first step will generally be equal to (the number of HDFS blocks that make up all .parquet files from the directory being loaded). That is, in this case, each HDFS block will represent a separate block of data for processing. But do not forget that this block distribution will be maintained until the end of the stage. Here’s a great example.

We are reading a small file from HDFS with 150,000 entries. The entire file fits into one HDFS block. Thus, at the first stage, we have only one data block, so only one performer can work with it. But according to the logic of the transformation, each line contains a field duration(the number of viewing seconds), and we need to multiply each line in the output into as many lines as there are seconds of watching in this line.

viDF = spark.read.parquet("/tst/vi/")
viDF.createOrReplaceTempView("ViewingInterval")
spark.sql("""select t.*, explode(get_list_of_seconds(duatation)) as secondNumber from ViewingInterval""")

The transformation of test data does not work quickly. Looking at Spark History, we see:

At the first stage, one block of data

At the first stage, one block of data

Tasks = 1 means that at this stage there is only one task since there is only one data block. We see 2 MB of data at the input, and at the output, there is already an expanded set of 1 GB of data. And all this is done by one thread, the rest are idle since there are no more tasks at this stage. What should we do, after all, explode- a narrow dependence and for this reason does not interrupt the stage, but is performed at the same stage at which the data is read. Within the framework of the stage, as we already know, the number of blocks is unchanged. In this case, we can easily (since the input data set is small, and the shuffling will take place quickly) break this stage in two using the function repartition(N), which leads to shuffling in random order, producing N blocks of data at the output, approximately equal in size. And since it shuffles (Shuffle), it means that a new stage begins.

viDF = spark.read.parquet("/tst/vi/")
viDF.repartition(60).createOrReplaceTempView("ViewingInterval")
spark.sql("""select t.*, explode(get_list_of_seconds(duatation)) as secondNumber from ViewingInterval""")

Let’s look at Spark History:

Now processing is running in parallel | Apache spark

Now processing is running in parallel

At the second stage – which I now got explode after repartition, we have 60 tasks (data blocks) and all the performers are now working and not idle. The transformation time has been cut by almost half. Our task is to ensure that there is no downtime, and all the performers work, otherwise, why are we taking resources from the cluster that we do not use later.

We figured out the first stage and even learned how to break any stage in two with the help of repartition(N). Let’s deal with the internal stages that are between the two shuffles. Here everything is decided by the spark spark.sql.shuffle.partitions (default 200) parameter . More precisely, I decided, since with the introduction of AQE Spark learned to regulate this amount itself. So any internal stage will consist of spark.sql.shuffle.partitions of data blocks. But here, too, not everything is so smooth: if you do not have a lot of data, then you need to decrease this parameter, and if you have a lot, increase it. And in the case of Spark 2.3, you need to look for some kind of middle ground, depending on your data.

I will give an example when we have little data, and spark.sql.shuffle.partitions = 200 by default.Looking at Spark History, we see that our dataset consists of only 185 lines, and was divided into 200 blocks during shuffling (but here it will not be enough for 200 blocks). Note that the really useful work of the performer is colored green here. That is, it turns out that of the total time of the performer’s work to process one data block from one record, the useful time was <10%. The rest of the time is waiting and deserializing.

Apache spark shuffle

aggregate matrics

What happens at the last stage? This again depends on where we output our transformation data. For example, we want to write everything to a directory as parquet files. If we do this after the shuffle, without doing anything, then we will find 200 files in this directory after the execution of our program. Why? Because after shuffling, we got by default spark.sql.shuffle.partitions = 200 blocks, and since one block is processed by one thread, it will write it itself to a separate file.

Typically, this is where developers want to control the number of files in the HDFS and call the method when saving to the DataFrame coalesce(N). This method simply enters each block in our set into one of N’s new blocks. That has coalesced (), in reality, unlike repartition(), it does not lead to shuffling and therefore does not break the stage, it just makes it so that at our stage there will be N data blocks. But what this leads to is that at this stage there will be work only for N performers. What if we decided to save everything in one file – only one stream will work. Let us recall the reasoning about the first stage and, if the last stage is quite serious in terms of calculations, then immediately before saving, coalesce(N)it makes sense to do repartition(N)to split the last stage into two: the penultimate one, which will perform heavy calculations in spark.sql.shuffle.partitions of threads in parallel (if before that there was a join, for example) and the last one, which will directly save to the number of files we need (N) already without resource-intensive calculations. Here you need to think about what will be faster – to leave everything as it is or, adding repartition(N), to perform a shuffle, which is also not free, but it is potentially possible to parallelize complex calculations.

dataDF.repartition(1) .write .format("parquet") .mode("overwrite") .option("compression","snappy") .save("/tst/problem_4/result")

Now that we have figured out the relationship between the number of blocks on a stage and the number of performers, I will give a small example. At the input stage, we have 20 data blocks, and there are only 10 executors (5 executors * 2 cores). We see that almost every executor, after processing one block, is forced to make another block for processing, since, on average, one executor has two data blocks that need to be processed. But, remembering that all data blocks at one stage can be processed in parallel, we request 20 executors for our task (5 executors * 4 cores), we get that each executor will now process only one block and the time of the entire stage will ideally be halved. This is exactly the case when increasing resources works and increases speed,

Increased resources - works faster Apache spark

Increased resources – works faster

By the way, one of the interesting points of applying the method of breaking the last stage while maintaining, described in the previous paragraph:

dataDF.repartition(N).write. …

If we compare the indicators before and after the break of the last stage, then everything seems fine: the transformation time has decreased several times (since the last calculations were performed in parallel by all performers), Shuffle Spill disappeared (this is when the performer does not have enough memory and he arranges a kind of swap with local Of course, in this case, all the data came in several large blocks and the performers had difficulty digesting them).

STOP! Let’s take a closer look at the size of the files received when saving. Was 5.9 GB, now 10.3 GB, the number of records is the same, the composition of the data is the same. Why? That’s a fly in the ointment!

Pay attention to the size of the Output | Apache spark

Pay attention to the size of the Output

Only added. We repartition()already found out that it distributes data in a random way. That is, instead of partially ordered data by key after the last shuffle (JOIN was in our case), we get data distributed randomly. Recall that parquet is a columnar file storage format, and the data in it is compressed, taking advantage of the fact that they can be partially ordered in the column. It turns out that we introduced randomness into the distribution of rows and thus degraded the data compressibility by almost two times. What can you do about it? It is possible to return the order but inside each data block.

dataDF.repartition(20). sortWithinPartitions(asc("id")).write. …

The function sortWithinPartitions()sorts by field or several fields within each block, i.e. no shuffling happens everything works within one performer in his memory. After applying this function to our transformation to sort by several fields, the total size of the output files became even slightly smaller than initially. Now everything works for us quickly, the size of the output files suits us. In addition, in this case, we recorded files of approximately the same size in HDFS (this is a consequence repartition()), which may be convenient for further processing.

Optimizer for Apache Spark Performance Optimization

Since we have touched the parquet format file, we will see how the Spark optimizer works on the example of such an optimizer rule as predicate pushdown and projection pushdown.

In the case of projection pushdown, the columnar form parquet wins especially. Let me remind you that the actual execution of the query tree begins only at the time when the action is performed, that is, an operation that outputs data: transfers to the main program (driver) ( collect, count, ..), stores the file transfers in a database, etc. … In doing so, Spark builds a query tree and optimizes it. Thus, when building a query, the optimizer already knows which fields are needed to get the result, and will read only these fields from the file. Since in the columnar file format the data is stored in the context of columns, only these fields are read from the file.

Consider the optimizer’s predicate pushdown rule. The principle of this optimization is quite simple: we have a lot of data and there is no need to process them if they are not useful in the end, for example, they must be filtered at the end of the execution of our query tree. The optimizer tries to lower all conditions and filters as much as possible to levels lower – closer to data sources, ideally before directly reading the file (or, for example, a query to an RDBMS).

Let’s consider an example:

example Apache spark

Here is the physical execution plan for the query that is generated:

example

Let’s pay attention to the block of direct reading from the file (FileScan parquet) and the PushedFilters block – these are the conditions that will be imposed during the physical reading of the file. We see that three conditions got here:

  • for the ValueDatecondition IsNotNulland LessThanOrEqual- with the latter it is clear, this is reflected in our SQL. Where did it come from IsNotNull? It is clear that we have a condition in our request ValueDate <= constant and the NULL values ​​do not satisfy this condition, that is, logically everything is correct. But why does the parquet file optimizer make this condition separately? More about this in the next paragraph;

  • for SubjectID condition IsNotNull. But we do not have such a condition in the request and in general there is no condition for SubjectID. There is only a LEFT JOIN on this field, where our table is joined to the main one. Yes, exactly: with such a JOIN, all rows where SubjectID is NULLwill not be included in the resulting selection. We see that the optimizer takes this into account and does not even read such lines from the file at the very beginning.

Let’s still figure out what is so interesting about the condition IsNotNullthat the optimizer adds it separately. To do this, let’s look at the structure of the parquet file. You can use parquet tools for this. The thing is that the parquet file, along with the schema, also stores some statistics on fields in the context of row groups.

Parquet file insidec| Apache spark

Parquet file inside

We see that for all integer types there is the number of values ​​( Values), the number of NULLs in this block ( Null Values), as well as the Min and Max values ​​of the column in this group of rows. We immediately recall our condition on the field IsNotNull. That is, if the field SubjectID in this group had Values = Null Values, then we could conclude that all values ​​in this group of rows are NULL and not read this block at all. The same applies to the conditions more, less, equal – here you can use the Min and Max values ​​of the column and draw a conclusion – whether you need to read this group of rows at all.

It is important to understand that a condition can be lowered to levels below only if it is known in advance before starting the execution of the query.

Real-life example for Apache Spark Performance Optimization

The parquet file directory is partitioned by field. A string with the filter values ​​for this field, separated by a comma, was passed to the transformation. The developer made explode(split(filter)), that is, a small table from this row with values ​​, and with a clear conscience, made an INNER JOIN with the main table that needs to be filtered. The transformation worked slowly. Let’s look at the query plan:

All partitions are read from HDFS | Apache spark

All partitions are read from HDFS

Strange, but at the first stage, Spark subtracts all partitions ( PartitionCount =121), although we pass a filter that consists of only one value. This is exactly the case when, when building a query tree, Spark does not know about the filter at all, because it is hidden behind the JOIN.

Instead of building a table with filter values, we simply use the standard Spark SQL function find_in_set (). It looks for the position of the substring in the string, which is a comma-separated list.

That is, the filter now represents a simple expression:
where find_in_set(surveyprogectid, )

And if you look at the query execution plan, since when building it, the optimizer already knows the filter string and the condition, it lowers this condition to the level of reading from the file. In addition, knowing that this is a partitioning field, it applies the partition pruning rule, that is, it throws out partitions that do not match the filter from consideration.

Only one partition is read from HDFS | Apache spark

Only one partition is read from HDFS

Please note that our condition is now in the PartitionFilters block, since the field is partitioning, only the partition we need is subtracted from HDFS ( PartitionCount = 1).

Therefore, if you have a large table with partitioning and you select some partitions via JOIN, then it may be better to form a separate action to form a list of values ​​from this filtered table as a string and pass it as a constant to the condition of the main query.

Pain Point of Apache Spark Optimizer

Great job of the optimizer … But sometimes its tendency to bring the condition as low as possible to the source can be harmful. UDF (user-defined function ) enters the scene. The user-defined function is a black box for the Spark optimizer.

Consider the following example:

We have a large file with several billion lines. We want to select only unique IDs and apply our UDF to them, then select only those results that will be Null. The sequence of requests:

T1=> select distinct id from T
T2=> select UDF(ID) as newID from T1
T3=> select * from T2 where newID is null

There are only a few thousand unique id values ​​in the table, and our UDF does not work fast – it goes to HBase. That is, we, having built such a query tree, expect that our UDF will be called several thousand times. We launch the request and wait a long, long time.

Let’s look at the query execution plan:

The condition with UDF went down almost to the lowest level The condition with UDF went down almost to the lowest level

… Oh! The optimizer did its best: it honestly lowered our condition isNull(UDF(id))to the level immediately after the direct reading of the file, even up to the moment when we select only unique id. This means that our heavy UDF has tried billions of times instead of thousands.

What can you think of here? For example, do cache(persist)after calculating unique id (T1). Or use lateral view, through which the optimizer will not pass the condition further.

select udf_res as newID from T1 lateral view explode (array(UDF(ID))) as udf_res

We got what we wanted at the beginning – UDF is calculated only for unique ids:

Conclusion

Outside the scope of this article, there are questions related to JOIN optimization: broadcast, data skew, with the pros and cons of coalescing and repartition. Some points are described in sufficient detail here, and some are not. So, visit AnalyticsVidhya for the basics, and the performance is highly optimized using this. Refer attached screenshots for the same!

Reference :

Image 1: https://mallikarjuna_g.gitbooks.io/spark/content/diagrams/spark-rdds.png

Image 2: https://media.springernature.com/lw685/springer-static/image/art%3A10.1007%2Fs11227-019-03093-0/MediaObjects/11227_2019_3093_Fig1_HTML.png

Image 3: https://xlera8.com/wp-content/uploads/2021/10/apache-spark-performance-optimization-for-data-engineers-3.png

The media shown in this article are not owned by Analytics Vidhya and are used at the Author’s discretion.

PlatoAi. Web3 Reimagined. Data Intelligence Amplified.
Click here to access.

Source: https://www.analyticsvidhya.com/blog/2021/09/apache-spark-performance-optimization-for-data-engineers/

spot_img

VC Cafe

LifeSciVC

Latest Intelligence

spot_img

Chat with us

Hi there! How can I help you?