Zephyrnet Logo

Melhores práticas e atividades de ajuste de desempenho para PySpark

Data:

Recentemente, trabalhei em um projeto de migração do sas onde convertemos todos os lotes de jobs do SAS e os implantamos no EMR. Na fase inicial de desenvolvimento, costumávamos ter poucos erros ambientais que levavam muito tempo de causa, e percebemos que eles podem ser evitados apenas definindo alguns parâmetros e decidi compartilhá-los.

Como lidamos com dados enormes e esses trabalhos em lote envolviam junções, agregações e transformações de dados de várias fontes de dados, encontramos alguns problemas de desempenho e os corrigimos. Portanto, compartilharei algumas maneiras de melhorar o desempenho do código ou reduzir o tempo de execução do processamento em lote.

Inicialize o pyspark:

importar findspark findspark.init()

Deve ser a primeira linha do seu código quando você executa a partir do notebook jupyter. Ele anexa uma faísca a sys. path e inicialize o pyspark para o parâmetro inicial do Spark. Você também pode passar o caminho do spark explicitamente como abaixo:

findspark.init('/usr/****/apache-spark/3.1.1/libexec')

Dessa forma, o mecanismo o reconhece como um trabalho de faísca e o envia para a fila correta. Além disso, quando você continuar importando outros pacotes para o seu código, ele importará uma versão compatível de acordo com o pyspark, caso contrário, você poderá obter o erro de JVM incompatível em uma parte posterior do código que é difícil de depurar.

Crie uma sessão spark com a configuração necessária:

de pyspark.sql import SparkSession,SQLContext sql_jar="/path/to/sql_jar_file/sqljdbc42.jar" spark_snow_jar="/usr/.../snowflake/spark-snowflake_2.11-2.5.5-spark_2.3.jar" snow_jdbc_jar="/usr/.../snowflake/snowflake-jdbc-3.10.3.jar" oracle_jar="/usr/path/to/oracle_jar_file//v12/jdbc/lib/oracle6.jar" spark=(SparkSession . builder .master('yarn') .appName('Spark job new_job') .config('spark.driver.memory','10g') .config('spark.submit.deployMode','client') .config( 'spark.executor.memory','15g') .config('spark.executor.cores',4) .config('spark.yarn.queue','short') .config('spark.jars',' {},{},{},{}'.frmat(sql_jar,spark_snow_jar,snow_jdbc_jar,oracle_jar)) .enableHiveSupport() .getOrCreate())
  1. Você pode dar master como 'local' para fins de desenvolvimento, mas deve ser 'yarn' na implantação.
  2. Quando você usa master como 'local', ele usa 2 núcleos e uma única JVM para driver e trabalhador. Considerando que em 'yarn', você tem uma JVM separada para driver e trabalhadores e pode usar mais núcleos.
  3. Você pode adicionar mais memória de driver e memória de executor para alguns trabalhos, se necessário, para tornar o tempo de execução mais rápido.
  4. Como prática recomendada, você deve passar arquivos jar para todas as conexões de banco de dados disponíveis. Isso pode ser definido na sessão do Spark ou no arquivo de configuração. Isso ocorre porque quando você se conecta a um banco de dados Oracle/SQL/snowflake usando o código abaixo, pode obter o erro de classe “oracle.jdbc.driver.OracleDriver” se o mecanismo selecionar um arquivo jar incorreto.
data=spark.read.format("jdbc") .option("url",tns_path) .option("dbtable",query) .option("user",userid) .option("password",password) .option ("driver","oracle.jdbc.driver.OracleDriver") .load()

O nome do driver “oracle.jdbc.driver.OracleDriver” pode ser diferente para diferentes arquivos jar, pois às vezes muda com uma atualização de python/java. Como quase todos os projetos têm muitas versões instaladas em seus servidores, a cada atualização, haverá vários arquivos jar disponíveis de diferentes versões. Portanto, é aconselhável passar explicitamente o caminho do arquivo jar necessário de acordo com o código. Isso se aplica ao MySQL, floco de neve ou qualquer outra conexão de banco de dados também.

Use a opção fetch size para tornar a leitura do banco de dados mais rápida:

Usando o código de carregamento de dados acima, o spark lê 10 linhas (ou o que está definido no nível do banco de dados) por iteração, o que o torna muito lento ao lidar com dados grandes. Quando os dados de saída da consulta estavam em milhões, usar o tamanho de busca para 100000 por iteração reduziu o tempo de leitura em 20 a 30 minutos. PFB o código:

data=spark.read.format("jdbc") .option("url",tns_path) .option("dbtable",query) .option("user",userid) .option("password",password) .option ("fetchsize","100000") .option("driver","oracle.jdbc.driver.OracleDriver") .load()

Use a opção de tamanho de lote para tornar a gravação no banco de dados mais rápida:

Quando os dados estavam em crores, usar o tamanho do lote para 100000 por iteração reduziu o tempo de gravação de 20 a 30 minutos. PFB o código:

data.write.format("jdbc") .option("url",tns_path) .option("dbtable",schemaname.tablename) .option("user",userid) .option("password",password) .option ("fetchsize","100000") .option("driver","oracle.jdbc.driver.OracleDriver") .option("batchsize","100000") .mode('append').save()

Manipulando a inclinação de forma eficaz:

Skew é a distribuição desigual de dados entre partições. O Spark cria partições nos dados e processa essas partições em paralelo. Com o particionamento padrão do spark, os dados podem ser distorcidos em alguns casos, como join e group by, se a chave não for distribuída uniformemente. Nesses casos, quando uma partição tem 1000 registros, outra partição pode ter milhões de registros e a partição anterior aguarda a conclusão da última, como resultado, ela não pode utilizar o processamento paralelo e demora muito para ser concluída ou, em alguns casos, apenas permanece em um estado suspenso. Para resolver isso, podemos usar a repartição para aumentar o número de partições antes da ingestão.

Práticas do PySpark e distorção de ajuste de desempenho
data = data.repartition(10, "termo") ou data = data.repartition(10)

Você pode usar coalesce para reduzir o número de partições:

dados = dados.coalesce(3)

Cache/Persistir de forma eficiente:

Na solução inicial, ele estava buscando os dados e fazendo a serialização várias vezes e juntando-se à segunda tabela, o que resulta em muitas iterações. Este processo estava levando horas para ser concluído inicialmente.

Persist busca os dados e faz a serialização uma vez e mantém os dados no Cache para uso posterior. Então, da próxima vez que uma ação for chamada, os dados já estarão prontos no cache. Ao usar persist em ambas as tabelas, o processo foi concluído em menos de 5 minutos. O uso de broadcast join melhora ainda mais o tempo de execução. Discutiremos isso em seções posteriores.

Mas você precisa ter cuidado ao usar persist. O uso excessivo de persistência resultará em um erro de memória. Portanto, continue limpando seus dados da memória quando eles não forem mais usados ​​no programa.

Práticas do PySpark e cache de ajuste de desempenho

Você também pode limpar todo o cache no final do trabalho usando o código abaixo:

spark.catalog.clearCache()

Evite usar funções UDF, a menos que essa seja a única opção:

As funções definidas pelo usuário desserializam cada linha para o objeto, aplicam a função lambda e a serializam novamente, resultando em execução mais lenta e mais tempo de coleta de lixo.

Práticas do PySpark e ajuste de desempenho edf

Uso de Thread sempre que necessário:

Se houver várias ações independentes em um trabalho, você poderá usar um encadeamento para chamar essas ações simultaneamente. Por exemplo, em um trabalho estávamos lendo muitas tabelas enormes de um esquema e gravando em outro esquema. Devido à ação sequencial, o trabalho estava demorando mais de 2 horas. Depois que usamos o thread para escrita simultânea, o tempo de carregamento foi reduzido para 30 minutos. Pe, talvez seja necessário aumentar a configuração da sessão do Spark. Para otimizar o uso da configuração atual da sessão do Spark, você pode emparelhar uma tarefa pequena e mais lenta com uma tarefa maior e mais rápida.

Use mapPartitions() em vez de map():

Ambos são operações baseadas em rdd, mas a partição do mapa é preferida sobre o mapa, pois usando mapPartitions() você pode inicializar uma vez em uma partição completa, enquanto no map() ele faz o mesmo em uma linha de cada vez.

Diversos:

  1. Evite usar count() no quadro de dados se não for necessário. Remova todas as ações que você usou para depurar antes de implantar seu código.
  2. Grave arquivos intermediários ou finais no parquet para reduzir o tempo de leitura e gravação.
  3. Se você quiser ler qualquer arquivo do seu local durante o desenvolvimento, use o master como “local” porque no modo “yarn” você não pode ler do local. No modo de fio, ele faz referência ao HDFS. Portanto, você precisa levar esses arquivos para o local do HDFS para implantação.

Por favor, deixe-me saber se você tiver alguma dúvida. Você também pode sugerir práticas recomendadas adicionais para melhorar o desempenho. Você pode se conectar comigo usando este link.

Fonte da imagem usada: https://unsplash.com/photos/MrVEedTZLwM

As mídias mostradas neste artigo não são propriedade da Analytics Vidhya e são usadas a critério do autor.

PlatoAi. Web3 Reimagined. Inteligência de dados amplificada.
Clique aqui para acessar.

Fonte: https://www.analyticsvidhya.com/blog/2021/08/best-practices-and-performance-tuning-activities-for-pyspark/

local_img

Inteligência mais recente

local_img