Zephyrnet Logo

Trabalhando com funções de janela no PySpark

Data:

Introdução

Aprender sobre funções de janela no PySpark pode ser desafiador, mas vale a pena o esforço. As funções de janela são uma ferramenta poderosa para analisar dados e podem ajudá-lo a obter insights que você não teria visto de outra forma. Ao compreender como usar funções de janela no Spark; você pode levar o seu análise de dados habilidades para o próximo nível e tomar decisões mais informadas. Esteja você trabalhando com grandes ou pequenos conjuntos de dados, aprender funções de janela no Spark permitirá que você manipule e analise dados de maneiras novas e interessantes.

Funções de janela no PySpark

Neste blog, primeiro entenderemos o conceito de funções de janela e, em seguida, discutiremos como usá-las com Spark SQL e PySpark DataFrame API. Para que ao final deste artigo você entenda como utilizar funções de janela com conjuntos de dados reais e obtenha insights essenciais para os negócios.

Objetivos de aprendizagem

  • Compreenda o conceito de funções de janela.
  • Trabalhando com funções de janela usando conjuntos de dados.
  • Descubra os insights usando as funções da janela.
  • Use Spark SQL e API DataFrame para trabalhar com funções de janela.

Este artigo foi publicado como parte do Blogatona de Ciência de Dados.

Índice

O que são funções de janela?

As funções de janela ajudam a analisar dados dentro de um grupo de linhas relacionadas entre si. Eles permitem que os usuários executem transformações complexas nas linhas de um dataframe ou conjunto de dados associados entre si com base em alguns critérios de particionamento e ordenação.

As funções de janela operam em uma partição específica de um dataframe ou conjunto de dados definido por um conjunto de colunas de particionamento. O ORDENAR POR cláusula particiona os dados em uma função de janela para organizá-los em uma ordem específica. As funções de janela executam cálculos em uma janela deslizante de linhas que inclui a linha atual e um subconjunto das linhas anteriores 'e'/'ou' seguintes, conforme especificado no quadro da janela.

Trabalhando com funções de janela no PySpark

Alguns exemplos comuns de funções de janela incluem cálculo de médias móveis, classificação ou classificação de linhas com base em uma coluna ou grupo específico de colunas, calculando totais acumulados e localizando o primeiro ou o último valor em um grupo de linhas. Com as poderosas funções de janela do Spark, os usuários podem realizar análises e agregações complexas em grandes conjuntos de dados com relativa facilidade, tornando-se uma ferramenta popular para grandes informática e análises.

"

Funções de janela em SQL

Spark SQL oferece suporte a três tipos de funções de janela:

  • Funções de classificação: - Essas funções atribuem uma classificação a cada linha em uma partição do conjunto de resultados. Por exemplo, a função ROW_NUMBER() fornece um número sequencial exclusivo para cada linha da partição.
  • Funções analíticas: - Essas funções calculam valores agregados em uma janela de linhas. Por exemplo, a função SUM() calcula a soma de uma coluna em uma janela de linhas.
  • Funções de valor: - Essas funções calculam um valor analítico para cada linha em uma partição, com base nos valores de outras linhas na mesma partição. Por exemplo, a função LAG() retorna o valor de uma coluna da linha anterior na partição.

Criação de DataFrame

Criaremos um dataframe de amostra para que possamos trabalhar praticamente com diferentes funções de janela. Também tentaremos responder algumas perguntas com a ajuda desses dados e funções de janela.

O dataframe contém detalhes dos funcionários, como nome, designação, número do funcionário, data de contratação, salário, etc. No total, temos 8 colunas que são as seguintes:

  • 'empno': Esta coluna contém o número do funcionário.
  • 'ename': Esta coluna contém nomes de funcionários.
  • 'cargo': esta coluna contém informações sobre os cargos dos funcionários.
  • 'hiredate': Esta coluna mostra a data de contratação do funcionário.
  • 'sal': os detalhes do salário estão contidos nesta coluna.
  • 'comm': Esta coluna contém detalhes de comissões de funcionários, se houver.
  • 'deptno': O número do departamento ao qual o funcionário pertence está nesta coluna.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Agora vamos verificar o esquema:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Crie uma visualização temporária do DataFrame 'emp_df' com o nome “emp”. Ele nos permite consultar o DataFrame usando a sintaxe SQL no Spark SQL como se fosse uma tabela. A visualização temporária é válida apenas durante a sessão Spark.

emp_df.createOrReplaceTempView("emp")

Resolvendo declarações de problemas usando funções de janela

Aqui estaremos resolvendo várias declarações de problemas usando funções do Windows:

Q1. Classifique o salário dentro de cada departamento.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Abordagem para código PySpark

  • A função Window particiona os dados por número de departamento usando partitionBy(col('deptno')) e então ordena os dados dentro de cada partição por salário em ordem decrescente usando orderBy(col('sal').desc()). A variável windowSpec contém a especificação final da janela.
  • 'emp_df' é o dataframe que contém dados de funcionários, incluindo colunas para empno, ename, job, deptno e sal.
  • A função de classificação é aplicada à coluna salário usando 'F.rank().over(windowSpec)' na instrução select. A coluna resultante tem um nome alternativo como 'classificação'.
  • Ele criará um dataframe, 'ranking_result_df', que inclui empno, ename, job, deptno e salário. Também possui uma nova coluna, 'classificação', que representa a classificação do salário do funcionário em seu departamento.

Saída:

O resultado tem classificação salarial em cada departamento.

Q2. Classificação densa do salário dentro de cada departamento.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Abordagem para código PySpark

  • Primeiro, crie uma especificação de janela usando a função Window, que particiona o DataFrame 'emp_df' por deptno e o ordena descendentemente na coluna 'sal'.
  • Em seguida, a função densa_rank() é aplicada sobre a especificação da janela, que atribui uma classificação densa a cada linha dentro de cada partição com base em sua ordem de classificação.
  • Finalmente, um novo DataFrame chamado 'dense_ranking_df' é criado selecionando colunas específicas de emp_df (ou seja, 'empno', 'ename', 'job', 'deptno' e 'sal') e adicionando uma nova coluna 'dense_rank' que contém os valores de classificação densos calculados pela função de janela.
  • Por último, exiba o DataFrame resultante em formato tabular.

Saída:

O resultado tem uma classificação densa em termos salariais.

Q3. Numere a linha dentro de cada departamento.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Abordagem para código PySpark

  • A primeira linha define uma especificação de janela para o cálculo usando as funções Window.partitionBy() e Window.orderBy(). Esta janela é particionada pela coluna deptno e ordenada pela coluna sal em ordem decrescente.
  • A segunda linha cria um novo DataFrame chamado 'row_num_df', uma projeção de 'emp_df' com uma coluna adicional chamada 'row_num' e contém os detalhes dos números das linhas.
  • A função show() exibe o DataFrame resultante, que mostra as colunas empno, ename, job, deptno, sal e row_num de cada funcionário.

Saída:

A saída terá o número da linha de cada funcionário de seu departamento com base em seu salário.

Q4. A soma total do salário dentro de cada departamento.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Abordagem para código PySpark

  • Primeiro, uma especificação de janela é definida usando os métodos “Window.partitionBy()” e “Window.orderBy()”. O método “partitionBy()” particiona os dados pela coluna “deptno”, enquanto o método “orderBy()” ordena os dados pela coluna “sal” em ordem decrescente.
  • Em seguida, a função “sum()” é aplicada à coluna “sal” usando o método “over()” para calcular o total acumulado de salários dentro de cada departamento. O resultado estará em um novo DataFrame chamado “running_sum_sal_df”, que contém as colunas 'empno', 'ename', 'job', 'deptno', 'sal' e 'running_total'.
  • Finalmente, o método “show()” é chamado no DataFrame “running_sum_sal_df” para exibir a saída da consulta. O DataFrame resultante mostra o total de salários de cada funcionário e outros detalhes como nome, número do departamento e cargo.

Saída:

A saída terá um total contínuo dos dados salariais de cada departamento.

Q5: O próximo salário dentro de cada departamento.

Para encontrar o próximo salário dentro de cada departamento usamos a função LEAD. 

A função de janela lead() ajuda a obter o valor da expressão na próxima linha da partição da janela. Ele retorna uma coluna para cada coluna de entrada, onde cada coluna conterá o valor da coluna de entrada para a linha de deslocamento acima da linha atual na partição da janela. A sintaxe para a função lead é: - lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Abordagem para código PySpark

  • Primeiro, a função de janela ajuda a particionar as linhas do DataFrame por número de departamento (deptno) e ordenar os salários em ordem decrescente dentro de cada partição.
  • A função lead() é então aplicada à coluna ordenada 'sal' dentro de cada partição para retornar o salário do funcionário seguinte (com um deslocamento de 1), e o valor padrão é 0 caso não haja nenhum próximo funcionário.
  • O DataFrame resultante 'next_salary_df' contém colunas para o número do funcionário (empno), nome (ename), cargo (cargo), número do departamento (deptno), salário atual (sal) e próximo salário (next_val).

Saída:

A saída contém o salário do próximo funcionário do departamento com base na ordem decrescente de salário. 

Q6. Salário anterior dentro de cada departamento.

Para calcular o salário anterior, utilizamos a função LAG.

A função lag retorna o valor de uma expressão em um determinado deslocamento antes da linha atual na partição da janela. A sintaxe da função lag é: - lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Abordagem para código PySpark

  • O window.partitionBy(col('deptno')) especifica a partição da janela. Isso significa que a função janela funciona separadamente para cada departamento.
  • Então orderBy(col('sal').desc()) especifica a ordem do salário e ordenará os salários dentro de cada departamento em ordem decrescente.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') cria uma nova coluna chamada prev_val no DataFrame 'prev_sal_df'.
  • Para cada linha, esta coluna contém o valor da coluna 'sal' da linha anterior dentro da janela definida pelo windowSpec.
  • O parâmetro offset=1 indica que a linha anterior deve estar uma linha antes da linha atual e default=0 especifica o valor padrão para a primeira linha em cada partição (já que não há linha anterior para a primeira linha).
  • Finalmente, prev_sal_df.show() exibe o DataFrame resultante.

Saída:

A saída representa o salário anterior de cada funcionário de cada departamento, com base na ordenação dos salários em ordem decrescente.

Q7. Primeiro salário dentro de cada departamento e comparando com todos os membros de cada departamento.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Abordagem para código PySpark

  • Primeiro, crie um objeto WindowSpec que particione os dados por número de departamento (deptno) e os ordene por salário (sal) em ordem decrescente.
  • Em seguida, aplica a função analítica first() à coluna 'sal' na janela definida por windowSpec. Esta função retorna o primeiro valor da coluna 'sal' dentro de cada partição (ou seja, cada grupo deptno) ordenado por 'sal' decrescente. A coluna resultante tem um novo nome, 'first_val'.
  • Agora atribui o DataFrame resultante, que contém as colunas selecionadas e uma nova coluna, 'first_val', que mostra o primeiro salário mais alto de cada departamento com base na ordem decrescente dos valores salariais, a uma nova variável chamada 'first_value_df'.

Saída:

A saída mostra o primeiro salário mais alto para cada departamento em um DataFrame de funcionário.

Conclusão

Neste artigo, aprendemos sobre as funções da janela. Spark SQL tem três tipos de funções de janela: funções de classificação, funções agregadas e funções de valor. Usando esta função, trabalhamos em um conjunto de dados para encontrar alguns insights importantes e valiosos. As funções do Spark Window oferecem ferramentas poderosas de análise de dados, como classificação, análise e cálculos de valor. Seja analisando insights salariais por departamento ou empregando exemplos práticos com PySpark e SQL, essas funções fornecem ferramentas essenciais para processamento e análise de dados eficazes no Spark.

Principais lições

  • Aprendemos sobre as funções da janela e trabalhamos com elas usando Spark SQL e PySpark DataFrame API.
  • Usamos funções como rank, denso_rank, row_number, lag, lead, groupBy, partiçãoBy e outras funções para fornecer uma análise adequada.
  • Também vimos as soluções detalhadas passo a passo para o problema e analisamos o resultado no final de cada definição do problema.

Este estudo de caso ajuda você a entender melhor as funções do PySpark. Se você tiver alguma opinião ou dúvida, comente abaixo. Conecte-se comigo em LinkedIn para uma discussão mais aprofundada. Continue aprendendo!!!

A mídia mostrada neste artigo não é propriedade da Analytics Vidhya e é usada a critério do Autor.

local_img

Inteligência mais recente

local_img