Logo Zephyrnet

Lavorare con le funzioni finestra in PySpark

Data:

Introduzione

Imparare a conoscere le funzioni delle finestre in PySpark può essere impegnativo ma ne vale la pena. Le funzioni finestra sono un potente strumento per analizzare i dati e possono aiutarti a ottenere informazioni che altrimenti non avresti potuto vedere. Comprendendo come utilizzare le funzioni Finestra in Spark; puoi prendere il tuo analisi dei dati competenze al livello successivo e prendere decisioni più informate. Sia che tu stia lavorando con grandi o piccoli dataset, l'apprendimento delle funzioni finestra in Spark ti consentirà di manipolare e analizzare i dati in modi nuovi ed entusiasmanti.

Funzioni della finestra in PySpark

In questo blog, comprenderemo prima il concetto di funzioni finestra e poi discuteremo come utilizzarle con Spark SQL e l'API PySpark DataFrame. In questo modo, entro la fine di questo articolo, capirai come utilizzare le funzioni della finestra con set di dati reali e ottenere informazioni essenziali per il business.

obiettivi formativi

  • Comprendere il concetto di funzioni finestra.
  • Lavorare con le funzioni della finestra utilizzando i set di dati.
  • Scopri gli approfondimenti utilizzando le funzioni della finestra.
  • Utilizza Spark SQL e l'API DataFrame per lavorare con le funzioni della finestra.

Questo articolo è stato pubblicato come parte di Blogathon sulla scienza dei dati.

Sommario

Cosa sono le funzioni della finestra?

Le funzioni della finestra aiutano ad analizzare i dati all'interno di un gruppo di righe correlate tra loro. Consentono agli utenti di eseguire trasformazioni complesse sulle righe di un dataframe o di un set di dati associati tra loro in base ad alcuni criteri di partizionamento e ordinamento.

Le funzioni della finestra operano su una partizione specifica di un dataframe o di un set di dati definito da un insieme di colonne di partizionamento. IL ORDINATO DA La clausola suddivide i dati in una funzione finestra per disporli in un ordine specifico. Le funzioni della finestra eseguono quindi i calcoli su una finestra scorrevole di righe che include la riga corrente e un sottoinsieme delle righe precedenti "e"/"o" successive, come specificato nel riquadro della finestra.

Lavorare con le funzioni finestra in PySpark

Alcuni esempi comuni di funzioni della finestra includono il calcolo delle medie mobili, la classificazione o l'ordinamento delle righe in base a una colonna o a un gruppo specifico di colonne, calcolando i totali parziali e trovando il primo o l'ultimo valore in un gruppo di righe. Con le potenti funzioni della finestra di Spark, gli utenti possono eseguire analisi e aggregazioni complesse su grandi quantità di dati con relativa facilità, rendendolo uno strumento popolare per i big elaborazione dati e analisi.

"

Funzioni della finestra in SQL

Spark SQL supporta tre tipi di funzioni finestra:

  • Funzioni di classifica: - Queste funzioni assegnano un rango a ciascuna riga all'interno di una partizione del set di risultati. Ad esempio, la funzione ROW_NUMBER() assegna un numero sequenziale univoco a ciascuna riga all'interno della partizione.
  • Funzioni di analisi: - Queste funzioni calcolano valori aggregati su una finestra di righe. Ad esempio, la funzione SUM() calcola la somma di una colonna su una finestra di righe.
  • Funzioni valore: - Queste funzioni calcolano un valore analitico per ogni riga in una partizione, in base ai valori di altre righe nella stessa partizione. Ad esempio, la funzione LAG() restituisce il valore di una colonna della riga precedente nella partizione.

Creazione di frame di dati

Creeremo un dataframe di esempio in modo da poter praticamente lavorare con diverse funzioni della finestra. Inoltre proveremo a rispondere ad alcune domande con l'aiuto di questi dati e delle funzioni della finestra.

Il dataframe contiene i dettagli dei dipendenti come nome, designazione, numero del dipendente, data di assunzione, stipendio ecc. In totale abbiamo 8 colonne che sono le seguenti:

  • 'empno': questa colonna contiene il numero del dipendente.
  • 'ename': questa colonna contiene i nomi dei dipendenti.
  • 'lavoro': questa colonna contiene informazioni sui titoli professionali dei dipendenti.
  • 'data di assunzione': questa colonna mostra la data di assunzione del dipendente.
  • 'sal': i dettagli dello stipendio sono contenuti in questa colonna.
  • 'comm': questa colonna contiene i dettagli delle commissioni dei dipendenti, se presenti.
  • 'deptno': in questa colonna si trova il numero del dipartimento a cui appartiene il dipendente.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Ora controlleremo lo schema:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Crea una vista temporanea del DataFrame 'emp_df' con il nome "emp". Ci consente di interrogare il DataFrame utilizzando la sintassi SQL in Spark SQL come se fosse una tabella. La visualizzazione temporanea è valida solo per la durata della sessione Spark.

emp_df.createOrReplaceTempView("emp")

Risoluzione delle dichiarazioni di problemi utilizzando le funzioni della finestra

Qui risolveremo diverse affermazioni di problemi utilizzando le funzioni di Windows:

Q1. Classifica lo stipendio all'interno di ciascun dipartimento.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Approccio per il codice PySpark

  • La funzione Window suddivide i dati in base al numero del dipartimento utilizzando distributionBy(col('deptno')) e quindi ordina i dati all'interno di ciascuna partizione in base allo stipendio in ordine decrescente utilizzando orderBy(col('sal').desc()). La variabile windowSpec contiene la specificazione finale della finestra.
  • 'emp_df' è il dataframe che contiene i dati dei dipendenti, incluse le colonne per empno, ename, job, deptno e sal.
  • La funzione di classificazione viene applicata alla colonna stipendio utilizzando 'F.rank().over(windowSpec)' all'interno dell'istruzione select. La colonna risultante ha un nome alias come "rank".
  • Creerà un dataframe, "ranking_result_df", che include empno, ename, job, deptno e stipendio. Ha anche una nuova colonna, "rango", che rappresenta il rango dello stipendio del dipendente all'interno del suo dipartimento.

Produzione:

Il risultato ha un livello salariale in ciascun dipartimento.

Q2. Classifica densa dello stipendio all'interno di ciascun dipartimento.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Approccio per il codice PySpark

  • Innanzitutto, crea una specifica della finestra utilizzando la funzione Window, che partiziona il DataFrame "emp_df" per deptno e lo ordina discendendo nella colonna "sal".
  • Quindi, la funzione dense_rank() viene applicata alla specifica della finestra, che assegna un rango denso a ciascuna riga all'interno di ciascuna partizione in base al suo ordinamento.
  • Infine, viene creato un nuovo DataFrame chiamato 'dense_ranking_df' selezionando colonne specifiche da emp_df (ad esempio, 'empno', 'ename', 'job', 'deptno' e 'sal') e aggiungendo una nuova colonna 'dense_rank' che contiene i valori di classificazione densa calcolati dalla funzione finestra.
  • Infine, visualizza il DataFrame risultante in formato tabellare.

Produzione:

Il risultato ha un rango denso dal punto di vista salariale.

Q3. Numerare la riga all'interno di ciascun reparto.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Approccio per il codice PySpark

  • La prima riga definisce una specifica della finestra per il calcolo utilizzando le funzioni Window.partitionBy() e Window.orderBy(). Questa finestra è partizionata dalla colonna deptno e ordinata dalla colonna sal in ordine decrescente.
  • La seconda riga crea un nuovo DataFrame chiamato 'row_num_df', una proiezione di 'emp_df' con una colonna aggiuntiva chiamata 'row_num' e contiene i dettagli dei numeri di riga.
  • La funzione show() visualizza il DataFrame risultante, che mostra le colonne empno, ename, job, deptno, sal e row_num di ciascun dipendente.

Produzione:

L'output avrà il numero di riga di ciascun dipendente all'interno del proprio dipartimento in base al suo stipendio.

Q4. Somma totale corrente dello stipendio all'interno di ciascun dipartimento.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Approccio per il codice PySpark

  • Innanzitutto, viene definita la specifica di una finestra utilizzando i metodi “Window.partitionBy()” e “Window.orderBy()”. Il metodo “partitionBy()” suddivide i dati in base alla colonna “deptno”, mentre il metodo “orderBy()” ordina i dati in base alla colonna “sal” in ordine decrescente.
  • Successivamente, la funzione “sum()” viene applicata alla colonna “sal” utilizzando il metodo “over()” per calcolare il totale parziale degli stipendi all’interno di ciascun dipartimento. Il risultato sarà in un nuovo DataFrame chiamato "running_sum_sal_df", che contiene le colonne "empno", "ename", "job", "deptno", "sal" e "running_total".
  • Infine, il metodo "show()" viene chiamato sul DataFrame "running_sum_sal_df" per visualizzare l'output della query. Il DataFrame risultante mostra il totale parziale degli stipendi di ciascun dipendente e altri dettagli come nome, numero di dipartimento e lavoro.

Produzione:

L'output avrà un totale parziale dei dati sugli stipendi di ciascun dipartimento.

D5: Lo stipendio successivo all'interno di ciascun dipartimento.

Per trovare lo stipendio successivo all'interno di ciascun dipartimento utilizziamo la funzione LEAD. 

La funzione finestra lead() aiuta a ottenere il valore dell'espressione nella riga successiva della partizione della finestra. Restituisce una colonna per ogni colonna di input, dove ciascuna colonna conterrà il valore della colonna di input per la riga di offset sopra la riga corrente all'interno della partizione della finestra. La sintassi per la funzione lead è:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Approccio per il codice PySpark

  • Innanzitutto, la funzione window aiuta a partizionare le righe del DataFrame in base al numero del dipartimento (deptno) e a ordinare gli stipendi in ordine decrescente all'interno di ciascuna partizione.
  • La funzione lead() viene quindi applicata alla colonna ordinata 'sal' all'interno di ciascuna partizione per restituire lo stipendio del dipendente successivo (con un offset pari a 1) e il valore predefinito è 0 nel caso in cui non vi sia alcun dipendente successivo.
  • Il DataFrame risultante "next_salary_df" contiene colonne per il numero del dipendente (empno), il nome (ename), la qualifica lavorativa (job), il numero del dipartimento (deptno), lo stipendio corrente (sal) e lo stipendio successivo (next_val).

Produzione:

L'output contiene lo stipendio del dipendente successivo nel dipartimento in base all'ordine decrescente dello stipendio. 

Q6. Stipendio precedente all'interno di ciascun dipartimento.

Per calcolare lo stipendio precedente, utilizziamo la funzione LAG.

La funzione lag restituisce il valore di un'espressione ad un dato offset prima della riga corrente all'interno della partizione della finestra. La sintassi della funzione lag è:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Approccio per il codice PySpark

  • window.partitionBy(col('deptno')) specifica la partizione della finestra. Ciò significa che la funzione finestra funziona separatamente per ciascun reparto.
  • Quindi orderBy(col('sal').desc()) specifica l'ordine dello stipendio e ordinerà gli stipendi all'interno di ciascun dipartimento in ordine decrescente.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') crea una nuova colonna chiamata prev_val nel DataFrame 'prev_sal_df'.
  • Per ogni riga, questa colonna contiene il valore della colonna 'sal' della riga precedente all'interno della finestra definita da windowSpec.
  • Il parametro offset=1 indica che la riga precedente dovrebbe essere una riga prima della riga corrente e default=0 specifica il valore predefinito per la prima riga in ogni partizione (poiché non esiste una riga precedente per la prima riga).
  • Infine, prev_sal_df.show() visualizza il DataFrame risultante.

Produzione:

L'output rappresenta lo stipendio precedente di ciascun dipendente all'interno di ciascun reparto, in base all'ordinamento degli stipendi in ordine decrescente.

D7. Primo stipendio all'interno di ciascun dipartimento e confronto con ogni membro all'interno di ciascun dipartimento.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Approccio per il codice PySpark

  • Innanzitutto, crea un oggetto WindowSpec che suddivide i dati in base al numero del dipartimento (deptno) e li ordina in base allo stipendio (sal) in ordine decrescente.
  • Quindi applica la funzione analitica first() alla colonna 'sal' sulla finestra definita da windowSpec. Questa funzione restituisce il primo valore della colonna 'sal' all'interno di ciascuna partizione (cioè ciascun gruppo reparto) ordinato in ordine discendente 'sal'. La colonna risultante ha un nuovo nome, "first_val".
  • Ora assegna il DataFrame risultante, che contiene le colonne selezionate e una nuova colonna, "first_val", che mostra il primo stipendio più alto per ciascun dipartimento in base all'ordine decrescente dei valori di stipendio, a una nuova variabile chiamata "first_value_df".

Produzione:

L'output mostra il primo stipendio più alto per ciascun dipartimento in un DataFrame del dipendente.

Conclusione

In questo articolo impareremo le funzioni della finestra. Spark SQL dispone di tre tipi di funzioni finestra: funzioni di classificazione, funzioni di aggregazione e funzioni di valore. Utilizzando questa funzione, abbiamo lavorato su un set di dati per trovare alcuni spunti importanti e preziosi. Le funzioni Spark Window offrono potenti strumenti di analisi dei dati come classificazione, analisi e calcolo del valore. Che si tratti di analizzare informazioni dettagliate sugli stipendi per dipartimento o di utilizzare esempi pratici con PySpark e SQL, queste funzioni forniscono strumenti essenziali per un'efficace elaborazione e analisi dei dati in Spark.

Punti chiave

  • Abbiamo appreso le funzioni della finestra e abbiamo lavorato con esse utilizzando Spark SQL e l'API PySpark DataFrame.
  • Utilizziamo funzioni come rango, dense_rank, row_number, lag, lead, groupBy, partitionBy e altre funzioni per fornire un'analisi corretta.
  • Abbiamo anche visto le soluzioni dettagliate passo passo al problema e analizzato l'output alla fine di ogni dichiarazione del problema.

Questo case study ti aiuta a comprendere meglio le funzioni PySpark. Se hai opinioni o domande, commenta qui sotto. Connettiti con me su LinkedIn per ulteriore discussione. Continua ad imparare!!!

I media mostrati in questo articolo non sono di proprietà di Analytics Vidhya e vengono utilizzati a discrezione dell'autore.

spot_img

L'ultima intelligenza

spot_img