Zephyrnet-logo

Beste praksis og ytelsesjusteringsaktiviteter for PySpark

Dato:

Nylig jobbet jeg med et sas-migrasjonsprosjekt der vi konverterte alle SAS batchjobber og distribuerte dem på EMR. I den innledende utviklingsfasen pleide vi å få noen miljøfeil som tok mye tid på grunn av årsaken, og innså at disse kan unngås bare ved å sette noen få parametere, og jeg bestemte meg for å dele dem.

Da vi behandlet enorme data og disse batchjobbene involverte sammenføyninger, aggregering og transformasjoner av data fra forskjellige datakilder, møtte vi noen ytelsesproblemer og fikset dem. Så jeg vil dele noen måter å forbedre ytelsen til koden på eller redusere utførelsestiden for batchbehandling.

Initialiser pyspark:

import findspark findspark.init()

Det bør være den første linjen i koden når du kjører fra Jupyter-notebooken. Den fester en gnist til sys. bane og initialiser pyspark til Spark home parameter. Du kan også passere gnistbanen eksplisitt som nedenfor:

findspark.init('/usr/****/apache-spark/3.1.1/libexec')

På denne måten gjenkjenner motoren den som en gnistjobb og sender den til riktig kø. Også når du fortsetter med å importere andre pakker til koden din vil den importere en kompatibel versjon i henhold til pyspark, ellers kan du få den inkompatible JVM-feilen i en senere del av koden som er vanskelig å feilsøke.

Opprett gnistøkt med nødvendig konfigurasjon:

fra pyspark.sql importer SparkSession,SQLContext sql_jar="/path/to/sql_jar_file/sqljdbc42.jar" spark_snow_jar="/usr/.../snowflake/spark-snowflake_2.11-2.5.5-spark"_2.3. snow_jdbc_jar="/usr/.../snowflake/snowflake-jdbc-3.10.3.jar" oracle_jar="/usr/path/to/oracle_jar_file//v12/jdbc/lib/oracle6.jar" spark=(SparkSession . builder .master('yarn') .appName('Spark job new_job') .config('spark.driver.memory','10g') .config('spark.submit.deployMode','client') .config( 'spark.executor.memory','15g') .config('spark.executor.cores',4) .config('spark.yarn.queue','short') .config('spark.jars',' {},{},{},{}'.framat(sql_jar,spark_snow_jar,snow_jdbc_jar,oracle_jar)) .enableHiveSupport() .getOrCreate())
  1. Du kan gi master som "lokal" for utviklingsformål, men det bør være "garn" i distribusjon.
  2. Når du bruker master som 'lokal', bruker den 2 kjerner og en enkelt JVM for både sjåfør og arbeider. Mens i 'garn' har du separat JVM for sjåfør og arbeidere, og du kan bruke flere kjerner.
  3. Du kan legge til mer driverminne og eksekveringsminne for enkelte jobber hvis det er nødvendig for å gjøre utførelsestiden raskere.
  4. Som en beste praksis bør du sende jar-filer for alle tilgjengelige databasetilkoblinger. Dette kan settes enten i gnist-økten eller konfigurasjonsfilen. Dette er fordi når du kobler til en Oracle/SQL/snøfnugg-database ved å bruke koden nedenfor, kan du få klassen "oracle.jdbc.driver.OracleDriver"-klassen ikke funnet hvis motoren velger en feil jar-fil.
data=spark.read.format("jdbc") .option("url",tns_path) .option("dbtable",query) .option("bruker",brukerid) .option("passord",passord) .option ("driver","oracle.jdbc.driver.OracleDriver") .load()

Drivernavnet "oracle.jdbc.driver.OracleDriver" kan være forskjellig for forskjellige jar-filer, da det noen ganger endres med en oppdatering fra python/java. Siden nesten alle prosjekter har mange versjoner installert på serveren med hver oppdatering, vil det være flere jar-filer tilgjengelig fra forskjellige versjoner. Så det er tilrådelig å eksplisitt sende den nødvendige jar-filbanen i henhold til koden. Dette gjelder også for MySQL, snowflake eller andre DB-forbindelser.

Bruk alternativet for hentestørrelse for å gjøre lesing fra DB raskere:

Ved å bruke databelastningskoden ovenfor, leser gnisten 10 rader (eller det som er satt på DB-nivå) per iterasjon, noe som gjør det veldig tregt når man arbeider med store data. Når spørringens utdata var i crores, reduserte lesetiden med 100000-20 minutter ved bruk av hentestørrelse til 30 XNUMX per iterasjon. PFB koden:

data=spark.read.format("jdbc") .option("url",tns_path) .option("dbtable",query) .option("bruker",brukerid) .option("passord",passord) .option ("fetchsize","100000") .option("driver","oracle.jdbc.driver.OracleDriver") .load()

Bruk batchstørrelsesalternativet for å gjøre skriving til DB raskere:

Når dataene var i crores, reduserte bruk av batchstørrelse til 100000 20 per iterasjon skrivetiden med 30-XNUMX minutter. PFB koden:

data.write.format("jdbc") .option("url",tns_path) .option("dbtable",skjemanavn.tabellnavn) .option("bruker",brukerid) .option("passord",passord) .option ("fetchsize","100000") .option("driver","oracle.jdbc.driver.OracleDriver") .option("batchsize","100000") .mode('append').save()

Håndtere skjevhet effektivt:

Skew er den ujevne fordelingen av data på tvers av partisjoner. Spark lager partisjoner i data og behandler disse partisjonene parallelt. Med standardpartisjonering av gnist, kan dataene bli skjevt i noen tilfeller, for eksempel bli med og gruppere etter hvis nøkkelen ikke er jevnt fordelt. I slike tilfeller, når en partisjon har 1000 poster, kan en annen partisjon ha millioner av poster og den førstnevnte partisjonen venter på at sistnevnte skal fullføres, som et resultat kan den ikke bruke parallell behandling og tar for lang tid å fullføre, eller i noen tilfeller kan den forblir bare i en hengt tilstand. For å løse dette kan vi bruke ompartisjon for å øke antall partisjoner før inntak.

PySpark-øvelser og ytelsesjustering er skjev
data = data.repartition(10, "term") eller data = data.repartition(10)

Du kan bruke coalesce for å redusere antall partisjoner:

data = data.coalesce(3)

Buffer/vedvarer effektivt:

I den opprinnelige løsningen hentet den dataene og gjorde serialisering flere ganger, og ble sammen med den andre tabellen som resulterer i mye iterasjon. Denne prosessen tok timer å fullføre i utgangspunktet.

Persist henter dataene og gjør serialisering én gang og beholder dataene i Cache for videre bruk. Så neste gang en handling kalles er dataene allerede klare i hurtigbufferen. Ved å bruke persist på begge tabellene ble prosessen fullført på mindre enn 5 minutter. Bruk av broadcast join forbedrer utførelsestiden ytterligere. Vi vil diskutere det i senere avsnitt.

Men du må være forsiktig mens du bruker persist. Overforbruk av vedvarende vil resultere i en minnefeil. Så fortsett å tømme dataene dine fra minnet når de ikke lenger brukes i programmet.

PySpark-øvelser og cache for ytelsesjustering

Du kan også tømme all hurtigbufferen på slutten av jobben ved å bruke koden nedenfor:

spark.catalog.clearCache()

Unngå å bruke UDF-funksjoner med mindre det er det eneste alternativet:

Brukerdefinerte funksjoner de-serialiserer hver rad til objekt, bruker lambda-funksjonen og re-serialiserer den, noe som resulterer i tregere utførelse og mer søppelhentingstid.

PySpark-øvelser og ytelsesjustering edf

Bruk av tråd når det er nødvendig:

Hvis det er flere uavhengige handlinger i en jobb, kan du bruke en tråd til å kalle disse handlingene samtidig. For eksempel, i en jobb leste vi mange enorme tabeller fra ett skjema og skrev til et annet skjema. På grunn av sekvensiell handling tok jobben mer enn 2 timer. Etter at vi brukte tråden til samtidig skriving, ble lastetiden redusert til 30 minutter. Du må kanskje øke konfigurasjonen av gnistøkten. For optimal bruk av den nåværende konfigurasjonen av gnistøkten, kan du pare en liten tregere oppgave med en større, raskere oppgave.

Bruk mapPartitions() i stedet for map():

Begge er rdd-baserte operasjoner, men kartpartisjon foretrekkes fremfor kartet ettersom du bruker mapPartitions() kan initialisere en gang på en komplett partisjon, mens i map() gjør den det samme på en rad hver gang.

Diverse:

  1. Unngå å bruke count() på datarammen hvis det ikke er nødvendig. Fjern alle handlingene du brukte for feilsøking før du distribuerer koden din.
  2. Skriv mellom- eller sluttfiler til parkett for å redusere lese- og skrivetiden.
  3. Hvis du vil lese en fil fra din lokale under utvikling, bruk masteren som "lokal" fordi i "garn"-modus kan du ikke lese fra lokal. I garnmodus refererer det til HDFS. Så du må få disse filene til HDFS-plasseringen for distribusjon.

Gi meg beskjed hvis du har spørsmål. Du kan også foreslå ekstra gode fremgangsmåter for å forbedre ytelsen. Du kan få kontakt med meg ved å bruke dette link.

Brukt bildekilde: https://unsplash.com/photos/MrVEedTZLwM

Media vist i denne artikkelen eies ikke av Analytics Vidhya og brukes etter forfatterens skjønn.

PlatonAi. Web3 Reimagined. Data Intelligence Amplified.
Klikk her for å få tilgang.

Kilde: https://www.analyticsvidhya.com/blog/2021/08/best-practices-and-performance-tuning-activities-for-pyspark/

spot_img

VC kafé

VC kafé

Siste etterretning

spot_img