
Best Practices and Performance Tuning Activities for PySpark


Recently I worked on a SAS migration project where we converted all the SAS batch jobs to PySpark and deployed them on EMR. In the initial development phase, we used to get a few environmental errors that took a lot of time to root cause, and I realized these could have been avoided just by setting a few parameters, so I decided to share those.

As we dealt with huge data and these batch jobs involved joins, aggregations, and transformations of data from various sources, we encountered some performance issues and fixed them. So I will also be sharing a few ways to improve the performance of the code and reduce execution time for batch processing.

Initialize pyspark:

import findspark
findspark.init()

This should be the first line of your code when you run from a Jupyter notebook. It adds PySpark to sys.path and initializes PySpark against the Spark home. You can also pass the Spark path explicitly, like below:

findspark.init('/usr/****/apache-spark/3.1.1/libexec')

This way the engine recognizes it as a Spark job and sends it to the correct queue. Also, when you proceed to import other packages into your code, versions compatible with PySpark will be picked up; otherwise you might get an incompatible JVM error later in the code, which is hard to debug.
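Putting the two steps together, a minimal sketch of the intended order (the Spark path below is just a placeholder):

import findspark
findspark.init("/path/to/apache-spark/3.1.1/libexec")  # placeholder path; omit the argument to let findspark auto-detect

# Import pyspark modules only after findspark.init(), so a compatible version is picked up
from pyspark.sql import SparkSession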

Create a Spark session with the required configuration:

from pyspark.sql import SparkSession, SQLContext

sql_jar = "/path/to/sql_jar_file/sqljdbc42.jar"
spark_snow_jar = "/usr/.../snowflake/spark-snowflake_2.11-2.5.5-spark_2.3.jar"
snow_jdbc_jar = "/usr/.../snowflake/snowflake-jdbc-3.10.3.jar"
oracle_jar = "/usr/path/to/oracle_jar_file/v12/jdbc/lib/oracle6.jar"

spark = (SparkSession
         .builder
         .master('yarn')
         .appName('Spark job new_job')
         .config('spark.driver.memory', '10g')
         .config('spark.submit.deployMode', 'client')
         .config('spark.executor.memory', '15g')
         .config('spark.executor.cores', 4)
         .config('spark.yarn.queue', 'short')
         .config('spark.jars', '{},{},{},{}'.format(sql_jar, spark_snow_jar, snow_jdbc_jar, oracle_jar))
         .enableHiveSupport()
         .getOrCreate())
  1. You can give the master as 'local' for development purposes, but it should be 'yarn' in deployment.
  2. When you use 'local' as the master, the driver and workers share a single JVM and a limited number of cores (for example, local[2] gives you 2), whereas with 'yarn' the driver and workers get separate JVMs and you can use more cores.
  3. You can add more driver memory and executor memory for some jobs, if required, to make the execution time faster.
  4. As a best practice, you should pass jar files for all the available database connections. This can be set either in the Spark session or in a config file. Otherwise, when you connect to an Oracle/SQL Server/Snowflake database using the code below, you might get the "oracle.jdbc.driver.OracleDriver" class-not-found error if the engine picks up an incorrect jar file.
data = (spark.read.format("jdbc")
        .option("url", tns_path)
        .option("dbtable", query)
        .option("user", userid)
        .option("password", password)
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .load())

The driver name "oracle.jdbc.driver.OracleDriver" can differ between jar files, as it sometimes changes with a Python/Java update. Since almost every project has multiple versions installed on its servers, with each update there will be multiple jar files available from different versions. So it is advisable to explicitly pass the path of the jar file required by your code. This applies to MySQL, Snowflake, or any other DB connection as well.

Use the fetchsize option to make reading from the DB faster:

With the above data-load code, Spark reads 10 rows (or whatever is set at the DB level) per round trip, which makes it very slow when dealing with large data. When the query output ran into crores of rows (tens of millions), setting the fetch size to 100,000 per round trip reduced the reading time by 20-30 minutes. The code is below:

data = (spark.read.format("jdbc")
        .option("url", tns_path)
        .option("dbtable", query)
        .option("user", userid)
        .option("password", password)
        .option("fetchsize", "100000")
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .load())

Use the batchsize option to make writing to the DB faster:

When the data ran into crores of rows, setting the batch size to 100,000 per round trip reduced the writing time by 20-30 minutes. The code is below:

(data.write.format("jdbc")
     .option("url", tns_path)
     .option("dbtable", "schemaname.tablename")
     .option("user", userid)
     .option("password", password)
     .option("driver", "oracle.jdbc.driver.OracleDriver")
     .option("batchsize", "100000")
     .mode("append")
     .save())

Handling Skew effectively:

Skew is the uneven distribution of data across partitions. Spark creates partitions from the data and processes those partitions in parallel. With Spark's default partitioning, the data can become skewed in operations like join and group by if the key is not evenly distributed. In such cases, one partition may hold 1,000 records while another holds millions, and the smaller partitions have to wait for the largest one to finish; as a result the job cannot fully utilize parallel processing and takes too long to complete, or in some cases simply hangs. To resolve this, we can use repartition to increase the number of partitions before ingestion.

data = data.repartition(10, "term")
# or
data = data.repartition(10)

You can use coalesce to reduce the number of partitions:

data = data.coalesce(3)
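Before deciding how many partitions to use, it can help to check how evenly the data is currently distributed. A small diagnostic sketch (assuming data is the DataFrame from above):

from pyspark.sql import functions as F

# Count rows per partition; a few very large counts indicate a skewed key
partition_counts = (data
                    .withColumn("partition_id", F.spark_partition_id())
                    .groupBy("partition_id")
                    .count()
                    .orderBy(F.col("count").desc()))
partition_counts.show()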

Cache/Persist Efficiently:

In the initial solution, we were fetching the data and serializing it multiple times before joining with the second table, which resulted in a lot of repeated work. This process was initially taking hours to complete.

Persist fetches the data, serializes it once, and keeps it in cache for further use, so the next time an action is called the data is already available in the cache. By using persist on both tables, the process completed in less than 5 minutes. Using a broadcast join improves the execution time further; we will discuss that in a later section.

But you need to be careful while using persist. Overusing it will result in memory errors, so keep clearing data from memory when it is no longer used in the program.
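A minimal sketch of this pattern (the table names, join key, and output path below are placeholders):

from pyspark import StorageLevel

# Persist both tables once so repeated actions reuse the cached, already-serialized data
table1 = spark.table("schema.table1").persist(StorageLevel.MEMORY_AND_DISK)
table2 = spark.table("schema.table2").persist(StorageLevel.MEMORY_AND_DISK)

joined = table1.join(table2, "key_column")
joined.write.mode("overwrite").parquet("/path/to/output")

# Release the memory as soon as the cached data is no longer needed
table1.unpersist()
table2.unpersist()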


You can also clear all the cache at the end of the job by using the below code:

spark.catalog.clearCache()

Avoid using UDFs unless they are the only option:

User-defined functions deserialize each row to a Python object, apply the lambda function, and re-serialize the result, which leads to slower execution and more garbage-collection time.
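For example, a lambda UDF that upper-cases a column can usually be replaced by the equivalent built-in function, which stays inside the JVM and avoids the serialization round trip (the column names below are only illustrative):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Slower: every row is deserialized to a Python object, the lambda runs, and the result is re-serialized
upper_udf = F.udf(lambda s: s.upper() if s is not None else None, StringType())
data = data.withColumn("term_upper", upper_udf(F.col("term")))

# Faster: the built-in function runs entirely inside the JVM
data = data.withColumn("term_upper", F.upper(F.col("term")))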


Use threads wherever necessary:

If there are multiple independent actions in one job, you can use threads to run those actions simultaneously. For example, in one job we were reading many huge tables from one schema and writing them to another schema. Because the actions ran sequentially, the job was taking more than 2 hours; after we used threads for concurrent writes, the load time was reduced to 30 minutes. Depending on the workload, you might need to increase the Spark session configuration, and for optimum use of the current configuration you can pair a small, slower task with a bigger, faster task.
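A sketch of this pattern using Python's standard ThreadPoolExecutor (the schema and table names are placeholders):

from concurrent.futures import ThreadPoolExecutor

tables = ["schema_a.table1", "schema_a.table2", "schema_a.table3"]

def copy_table(table_name):
    # Each thread submits an independent read-and-write action to the shared SparkSession
    df = spark.table(table_name)
    df.write.mode("overwrite").saveAsTable(table_name.replace("schema_a", "schema_b"))

# Run the independent writes concurrently instead of one after another
with ThreadPoolExecutor(max_workers=3) as executor:
    list(executor.map(copy_table, tables))

Spark schedules the jobs submitted from the different threads concurrently, subject to the executor resources available in the session.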

Use mapPartitions() instead of map():

Both are RDD-based operations, yet mapPartitions() is preferred over map() because with mapPartitions() you can run initialization once for a complete partition, whereas map() would do it once per row.
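A minimal runnable sketch of the difference (the per-partition setup here is a trivial stand-in for something expensive like a DB connection or model load):

# Toy RDD with 2 partitions
rdd = spark.sparkContext.parallelize(range(10), 2)

def process_partition(numbers):
    # This setup runs once per partition (imagine an expensive connection or model load here)
    offset = 100
    for n in numbers:
        yield n + offset

# mapPartitions: the setup executes once per partition
result = rdd.mapPartitions(process_partition).collect()

# map: the same setup written inside the lambda would execute once per element
result_map = rdd.map(lambda n: n + 100).collect()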

Miscellaneous:

  1. Avoid using count() on the data frame if it is not necessary, and remove any actions you used only for debugging before deploying your code.
  2. Write intermediate or final files to Parquet to reduce the read and write time (see the sketch after this list).
  3. If you want to read a file from your local machine during development, use 'local' as the master, because in 'yarn' mode you can't read from local; yarn mode references HDFS, so you have to put those files in an HDFS location for deployment.
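For point 2, a minimal sketch of writing an intermediate result as Parquet and reading it back (the HDFS path is a placeholder):

# Write the intermediate result once as Parquet (columnar, compressed, schema preserved)
intermediate_path = "/user/your_user/intermediate/data.parquet"  # placeholder path
data.write.mode("overwrite").parquet(intermediate_path)

# Downstream steps read it back much faster than CSV or text
data = spark.read.parquet(intermediate_path)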

Please let me know if you have any queries. You can also suggest additional best practices to improve performance. You can connect with me using this link.

Picture source: https://unsplash.com/photos/MrVEedTZLwM

The media shown in this article are not owned by Analytics Vidhya and are used at the author's discretion.


Source: https://www.analyticsvidhya.com/blog/2021/08/best-practices-and-performance-tuning-activities-for-pyspark/
