Logo Zephyrnet

Lucrul cu funcții de fereastră în PySpark

Data:

Introducere

A învăța despre funcțiile ferestrei în PySpark poate fi o provocare, dar merită efortul. Funcțiile ferestrelor sunt un instrument puternic pentru analiza datelor și vă pot ajuta să obțineți informații pe care poate nu le-ați fi văzut altfel. Înțelegând cum să utilizați funcțiile Window în Spark; iti poti lua analiza datelor abilități la nivelul următor și luați decizii mai informate. Indiferent dacă lucrați cu mari sau mici seturi de date, învățarea Funcțiilor ferestrei în Spark vă va permite să manipulați și să analizați datele în moduri noi și interesante.

Funcțiile ferestrei în PySpark

În acest blog, vom înțelege mai întâi conceptul de funcții de fereastră și apoi vom discuta cum să le folosim cu Spark SQL și PySpark DataFrame API. Astfel încât, până la sfârșitul acestui articol, veți înțelege cum să utilizați funcțiile ferestrei cu seturi de date reale și să obțineți informații esențiale pentru afaceri.

obiective de invatare

  • Înțelegeți conceptul de funcții ale ferestrei.
  • Lucrul cu funcții de fereastră folosind seturi de date.
  • Aflați informațiile folosind funcțiile ferestrei.
  • Utilizați Spark SQL și API-ul DataFrame pentru a lucra cu funcții de fereastră.

Acest articol a fost publicat ca parte a Blogathonul științei datelor.

Cuprins

Ce sunt funcțiile ferestrei?

Funcțiile ferestrelor ajută la analiza datelor dintr-un grup de rânduri care sunt legate între ele. Acestea permit utilizatorilor să efectueze transformări complexe pe rândurile unui cadru de date sau ale unui set de date asociate între ele pe baza unor criterii de partiționare și ordonare.

Funcțiile ferestrei operează pe o anumită partiție a unui cadru de date sau a unui set de date definit de un set de coloane de partiționare. The COMANDA DE clauza partiţionează datele într-o funcţie de fereastră pentru a le aranja într-o anumită ordine. Funcțiile ferestrei efectuează apoi calcule pe o fereastră glisantă de rânduri care include rândul curent și un subset al rândurilor precedente, fie „și”/„sau” următoare, așa cum este specificat în cadrul ferestrei.

Lucrul cu funcții de fereastră în PySpark

Câteva exemple comune de funcții ferestre includ calcularea mediilor mobile, clasarea sau sortarea rândurilor pe baza unei anumite coloane sau a unui grup de coloane, calculând totalurile cumulate și găsirea primei sau ultimei valori dintr-un grup de rânduri. Cu funcțiile puternice ale ferestrei Spark, utilizatorii pot efectua analize complexe și agregari seturi de date mari cu relativă ușurință, făcându-l un instrument popular pentru mari de prelucrare a datelor și analitică.

"

Funcții de fereastră în SQL

Spark SQL acceptă trei tipuri de funcții de fereastră:

  • Funcții de clasare: - Aceste funcții atribuie un rang fiecărui rând dintr-o partiție a setului de rezultate. De exemplu, funcția ROW_NUMBER() oferă un număr secvenţial unic fiecărui rând din partiţie.
  • Funcții de analiză:- Aceste funcții calculează valorile agregate pe o fereastră de rânduri. De exemplu, funcția SUM() calculează suma unei coloane peste o fereastră de rânduri.
  • Funcții valorice:- Aceste funcții calculează o valoare analitică pentru fiecare rând dintr-o partiție, pe baza valorilor altor rânduri din aceeași partiție. De exemplu, funcția LAG() returnează valoarea unei coloane din rândul anterior din partiție.

Creare DataFrame

Vom crea un exemplu de cadru de date astfel încât să putem lucra practic cu diferite funcții de fereastră. De asemenea, vom încerca să răspundem la câteva întrebări cu ajutorul acestor date și funcții de fereastră.

Cadrul de date conține detalii despre angajați, cum ar fi numele, denumirea, numărul de angajat, data angajării, salariul etc. În total, avem 8 coloane care sunt după cum urmează:

  • „empno”: Această coloană conține numărul angajatului.
  • „ename”: această coloană are numele angajaților.
  • „job”: această coloană conține informații despre funcțiile angajaților.
  • „hiredate”: această coloană arată data angajării angajatului.
  • 'sal': detaliile salariale conțin în această coloană.
  • „comm”: această coloană conține detalii despre comisionul angajaților, dacă există.
  • „deptno”: numărul departamentului căruia îi aparține angajatul este în această coloană.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Acum vom verifica schema:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Creați o vizualizare temporară a DataFrame-ului „emp_df” cu numele „emp”. Ne permite să interogăm DataFrame folosind sintaxa SQL în Spark SQL ca și cum ar fi un tabel. Vizualizarea temporară este valabilă numai pe durata Sesiunii Spark.

emp_df.createOrReplaceTempView("emp")

Rezolvarea enunțurilor de problemă utilizând funcțiile ferestrei

Aici vom rezolva mai multe declarații de problemă folosind funcțiile Windows:

Î1. Clasează salariul în fiecare departament.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Abordare pentru codul PySpark

  • Funcția Window partiţionează datele după numărul de departament folosind partitionBy(col('deptno')) și apoi ordonează datele din fiecare partiție în ordine descrescătoare a salariului folosind orderBy(col('sal').desc()). Variabila windowSpec deține specificația finală a ferestrei.
  • „emp_df” este cadrul de date care conține datele angajaților, inclusiv coloanele pentru empno, ename, job, deptno și sal.
  • Funcția de rang este aplicată coloanei de salariu folosind „F.rank().over(windowSpec)” din declarația select. Coloana rezultată are un nume de alias ca „rank”.
  • Acesta va crea un cadru de date, „ranking_result_df”, care include empno, ename, job, deptno și salariu. De asemenea, are o nouă coloană, „rank”, care reprezintă rangul salariului angajatului în cadrul departamentului său.

ieșire:

Rezultatul are rangul salarial în fiecare departament.

Q2. Clasificarea densă a salariului în cadrul fiecărui departament.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Abordare pentru codul PySpark

  • Mai întâi, creați o specificație de fereastră utilizând funcția Window, care partiționează DataFrame „emp_df” după deptno și îl ordonează coborând coloana „sal”.
  • Apoi, funcția dense_rank() este aplicată peste specificația ferestrei, care atribuie un rang dens fiecărui rând din fiecare partiție pe baza ordinii sale sortate.
  • În cele din urmă, un nou DataFrame numit „dense_ranking_df” este creat prin selectarea unor coloane specifice din emp_df (adică, „empno”, „ename”, „job”, „deptno” și „sal”) și adăugând o nouă coloană „dense_rank” care conține valorile dense de clasare calculate de funcția fereastră.
  • În sfârșit, afișați DataFrame rezultat în format tabelar.

ieșire:

Rezultatul are un rang dens din punct de vedere al salariului.

Q3. Numerotați rândul din fiecare departament.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Abordare pentru codul PySpark

  • Prima linie definește o specificație de fereastră pentru calcul folosind funcțiile Window.partitionBy() și Window.orderBy(). Această fereastră este împărțită de coloana deptno și ordonată de coloana sal în ordine descrescătoare.
  • A doua linie creează un nou DataFrame numit „row_num_df”, o proiecție a „emp_df” cu o coloană suplimentară numită „row_num” și conține detaliile numerelor de rând.
  • Funcția show() afișează DataFrame rezultat, care arată coloanele empno, ename, job, deptno, sal și row_num ale fiecărui angajat.

ieșire:

Ieșirea va avea numărul de rând al fiecărui angajat din departamentul său, în funcție de salariu.

Î4. Suma totală a salariului în fiecare departament.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Abordarea pentru codul PySpark

  • Mai întâi, o specificație de fereastră este definită folosind metodele „Window.partitionBy()” și „Window.orderBy()”. Metoda „partitionBy()” partiţionează datele după coloana „deptno”, în timp ce metoda „orderBy()” ordonează datele după coloana „sal” în ordine descrescătoare.
  • Apoi, funcția „sum()” este aplicată coloanei „sal” folosind metoda „over()” pentru a calcula totalul curent al salariilor din fiecare departament. Rezultatul va fi într-un nou DataFrame numit „running_sum_sal_df”, care conține coloanele „empno”, „ename”, „job”, „deptno”, „sal” și „running_total”.
  • În cele din urmă, metoda „show()” este apelată pe DataFrame „running_sum_sal_df” pentru a afișa rezultatul interogării. DataFrame rezultat arată totalul total al salariilor fiecărui angajat și alte detalii precum numele, numărul departamentului și locul de muncă.

ieșire:

Ieșirea va avea un total curent al datelor de salariu ale fiecărui departament.

Q5: Următorul salariu din cadrul fiecărui departament.

Pentru a găsi următorul salariu în cadrul fiecărui departament folosim funcția LEAD. 

Funcția de fereastră lead() ajută la obținerea valorii expresiei din următorul rând al partiției ferestrei. Returnează o coloană pentru fiecare coloană de intrare, unde fiecare coloană va conține valoarea coloanei de intrare pentru rândul de compensare de deasupra rândului curent din partiția ferestrei. Sintaxa pentru funcția lead este:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Abordare pentru codul PySpark

  • În primul rând, funcția fereastră ajută la împărțirea rândurilor DataFrame după numărul departamentului (deptno) și la ordonarea salariilor în ordine descrescătoare în cadrul fiecărei partiții.
  • Funcția lead() este apoi aplicată coloanei ordonate „sal” din cadrul fiecărei partiții pentru a returna salariul următorului angajat (cu un offset de 1), iar valoarea implicită este 0 în cazul în care nu există următorul angajat.
  • DataFrame-ul rezultat „next_salary_df” conține coloane pentru numărul angajatului (empno), numele (ename), titlul postului (job), numărul departamentului (deptno), salariul curent (sal) și următorul salariu (next_val).

ieșire:

Ieșirea conține salariul următorului angajat din departament în funcție de ordinea salariului descendent. 

Î6. Salariu anterior in cadrul fiecarui departament.

Pentru a calcula salariul anterior, folosim funcția LAG.

Funcția lag returnează valoarea unei expresii la un offset dat înainte de rândul curent din partiția ferestrei. Sintaxa funcției lag este:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Abordare pentru codul PySpark

  • window.partitionBy(col('deptno')) specifică partiția ferestrei. Înseamnă că funcția fereastră funcționează separat pentru fiecare departament.
  • Apoi orderBy(col('sal').desc()) specifică ordinea salariului și va ordona salariile în cadrul fiecărui departament în ordine descrescătoare.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') creează o nouă coloană numită prev_val în DataFrame 'prev_sal_df'.
  • Pentru fiecare rând, această coloană conține valoarea coloanei „sal” din rândul anterior din fereastra definită de windowSpec.
  • Parametrul offset=1 indică faptul că rândul anterior trebuie să fie cu un rând înaintea rândului curent, iar default=0 specifică valoarea implicită pentru primul rând din fiecare partiție (din moment ce nu există un rând anterior pentru primul rând).
  • În cele din urmă, prev_sal_df.show() afișează DataFrame rezultat.

ieșire:

Ieșirea reprezintă salariul anterior pentru fiecare angajat din cadrul fiecărui departament, pe baza ordonării salariilor în ordine descrescătoare.

Î7. Primul salariu din fiecare departament și comparație cu fiecare membru din fiecare departament.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Abordare pentru codul PySpark

  • Mai întâi, creați un obiect WindowSpec care parțiază datele după numărul departamentului (deptno) și le ordonează după salariu (sal) în ordine descrescătoare.
  • Apoi aplică prima funcție analitică () la coloana „sal” peste fereastra definită de windowSpec. Această funcție returnează prima valoare a coloanei „sal” din cadrul fiecărei partiții (adică fiecare grup de deptno) ordonată prin „sal” descendent. Coloana rezultată are un nou nume, „first_val”.
  • Acum atribuie DataFrame rezultat, care conține coloanele selectate și o nouă coloană, „first_val”, care arată primul cel mai mare salariu pentru fiecare departament pe baza ordinii descrescătoare a valorilor salariale, unei noi variabile numite „first_value_df”.

ieșire:

Rezultatul arată primul cel mai mare salariu pentru fiecare departament dintr-un DataFrame de angajat.

Concluzie

În acest articol, aflăm despre funcțiile ferestrei. Spark SQL are trei tipuri de funcții de fereastră: funcții de clasare, funcții de agregare și funcții de valoare. Folosind această funcție, am lucrat la un set de date pentru a găsi câteva informații importante și valoroase. Spark Window Functions oferă instrumente puternice de analiză a datelor, cum ar fi clasare, analiză și calcule de valoare. Fie că analizează statisticile salariale în funcție de departament sau folosesc exemple practice cu PySpark și SQL, aceste funcții oferă instrumente esențiale pentru procesarea și analiza eficientă a datelor în Spark.

Intrebari cu cheie

  • Am aflat despre funcțiile ferestrei și am lucrat cu ele folosind Spark SQL și PySpark DataFrame API.
  • Folosim funcții precum rank, dense_rank, row_number, lag, lead, groupBy, partitionBy și alte funcții pentru a oferi o analiză adecvată.
  • Am văzut, de asemenea, soluțiile detaliate pas cu pas ale problemei și am analizat rezultatul la sfârșitul fiecărei enunțuri a problemei.

Acest studiu de caz vă ajută să înțelegeți mai bine funcțiile PySpark. Dacă aveți opinii sau întrebări, atunci comentați mai jos. Conectează-te cu mine LinkedIn pentru discuții ulterioare. Continua sa inveti!!!

Media prezentată în acest articol nu este deținută de Analytics Vidhya și este utilizată la discreția Autorului.

spot_img

Ultimele informații

spot_img