Zephyrnet logó

Az ablakfüggvények használata a PySparkban

Találka:

Bevezetés

A PySpark ablakfunkcióinak megismerése kihívást jelenthet, de megéri az erőfeszítést. A Window Functions hatékony eszköz az adatok elemzéséhez, és segíthet olyan betekintést nyerni, amelyet egyébként nem látott volna. A Spark ablakfunkcióinak használatának megértésével; viheti a sajátját adatelemzés készségeket a következő szintre emelni, és megalapozottabb döntéseket hozni. Akár nagy, akár kicsivel dolgozik adatkészletek, A Spark ablakfunkcióinak megtanulása lehetővé teszi az adatok új és izgalmas módokon történő kezelését és elemzését.

Ablakfunkciók a PySparkban

Ebben a blogban először megértjük az ablakfüggvények fogalmát, majd megvitatjuk, hogyan használhatjuk őket a Spark SQL-lel és a PySpark DataFrame API-val. Ennek érdekében a cikk végére megérti, hogyan használhatja az ablakfüggvényeket valós adatkészletekkel, és alapvető betekintést nyerhet az üzleti élet számára.

Tanulási célok

  • Ismerje meg az ablakfüggvények fogalmát.
  • Munka ablakfüggvényekkel adatkészletek segítségével.
  • Ismerje meg a betekintést az ablakfunkciók segítségével.
  • Használja a Spark SQL-t és a DataFrame API-t az ablakfüggvények kezeléséhez.

Ez a cikk részeként jelent meg Adattudományi Blogaton.

Tartalomjegyzék

Mik azok az ablakfunkciók?

Az ablakfüggvények segítenek elemezni az egymáshoz kapcsolódó sorok csoportján belüli adatokat. Lehetővé teszik a felhasználók számára, hogy összetett átalakításokat hajtsanak végre egy adatkeret vagy adatkészlet egymáshoz társított sorain bizonyos particionálási és rendezési feltételek alapján.

Az ablakfüggvények a particionáló oszlopok által meghatározott adatkeret vagy adatkészlet egy adott partícióján működnek. A RENDEZÉS záradék particionálja az adatokat egy ablakfüggvényben, hogy meghatározott sorrendbe rendezze azokat. Az ablakfüggvények ezután számításokat hajtanak végre a sorokból álló csúszó ablakon, amely tartalmazza az aktuális sort és a megelőző „és”/„vagy” következő sorok egy részhalmazát, az ablakkeretben meghatározottak szerint.

Az ablakfüggvények használata a PySparkban

Néhány gyakori példa az ablakfunkciókra: a mozgóátlagok kiszámítása, a sorok rangsorolása vagy rendezése egy adott oszlop vagy csoport alapján. oszlopok, a futó összegek kiszámítása és az első vagy utolsó érték megtalálása egy sorcsoportban. A Spark hatékony ablakfunkcióival a felhasználók összetett elemzéseket és összesítéseket hajthatnak végre nagy adatkészletek viszonylag könnyen, így népszerű eszköz a nagyok számára adatfeldolgozás és elemzés.

"

Ablakfunkciók SQL-ben

A Spark SQL háromféle ablakfunkciót támogat:

  • Rangsorolási funkciók: - Ezek a függvények rangot rendelnek az eredményhalmaz egy partícióján belül minden sorhoz. Például a ROW_NUMBER() függvény egyedi sorszámot ad a partíción belül minden sorhoz.
  • Analitikai funkciók: - Ezek a függvények összesített értékeket számítanak ki egy sorablakon keresztül. Például a SUM() függvény kiszámítja egy oszlop összegét egy sorablak felett.
  • Értékfüggvények: - Ezek a függvények egy analitikai értéket számítanak ki a partíció minden sorához, ugyanazon partíció többi sorának értékei alapján. Például a LAG() függvény egy oszlop értékét adja vissza a partíció előző sorából.

DataFrame létrehozása

Létrehozunk egy minta adatkeretet, hogy gyakorlatilag különböző ablakfüggvényekkel dolgozhassunk. Ezen adatok és ablakfüggvények segítségével néhány kérdésre is megpróbálunk választ adni.

Az adatkeret az alkalmazottak adatait tartalmazza, például a nevüket, a beosztásukat, az alkalmazottak számát, a felvétel dátumát, a fizetést stb. Összesen 8 oszlopunk van, amelyek a következők:

  • 'empno': Ez az oszlop az alkalmazott számát tartalmazza.
  • 'név': Ebben az oszlopban az alkalmazottak nevei vannak.
  • 'job': Ez az oszlop az alkalmazottak munkaköri megnevezéseiről tartalmaz információkat.
  • 'hiredate': Ez az oszlop a munkavállaló felvételi dátumát mutatja.
  • 'sal': Ebben az oszlopban a fizetési adatok szerepelnek.
  • 'comm': Ebben az oszlopban találhatók az alkalmazotti jutalék részletei, ha vannak ilyenek.
  • 'deptno': Ebben az oszlopban az a részlegszám szerepel, amelyhez a munkavállaló tartozik.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Most ellenőrizzük a sémát:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Hozzon létre egy ideiglenes nézetet a DataFrame „emp_df”-ről „emp” néven. Lehetővé teszi a DataFrame lekérdezését SQL szintaxis használatával a Spark SQL-ben, mintha az egy tábla lenne. Az ideiglenes nézet csak a Spark Session időtartamára érvényes.

emp_df.createOrReplaceTempView("emp")

Problémanyilatkozatok megoldása ablakfüggvények segítségével

Itt számos problémamegoldást fogunk megoldani a Windows függvények segítségével:

Q1. Az egyes osztályokon belül rangsorolja a fizetést.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

A PySpark kód megközelítése

  • Az Ablak függvény részlegszám szerint particionálja az adatokat a partitionBy(col('deptno')) segítségével, majd az orderBy(col('sal').desc()) használatával csökkenő sorrendbe rendezi az adatokat az egyes partíciókon belül fizetés szerint. A windowSpec változó tartalmazza a végső ablak specifikációt.
  • Az 'emp_df' az alkalmazottak adatait tartalmazó adatkeret, beleértve az empno, ename, job, deptno és sal oszlopokat.
  • A rangfüggvény a fizetés oszlopban az 'F.rank().over(windowSpec)' használatával kerül alkalmazásra a select utasításban. Az eredményül kapott oszlop álnévvel rendelkezik: „rang”.
  • Ez létrehoz egy adatkeretet, a 'ranking_result_df', amely tartalmazza az empno-t, az ename-t, a job-ot, a deptno-t és a fizetést. Ezen kívül van egy új „rang” oszlopa is, amely az alkalmazott fizetésének rangját jelzi az osztályon belül.

output:

Az eredmény minden osztályon fizetési fokozattal rendelkezik.

Q2. Sűrű rangsorolja a fizetést az egyes osztályokon belül.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

A PySpark kód megközelítése

  • Először hozzon létre egy ablakspecifikációt a Window függvény segítségével, amely az 'emp_df' DataFrame-et deptno szerint particionálja, és a 'sal' oszlopban csökkenő sorrendben rendezi.
  • Ezután a dense_rank() függvény alkalmazásra kerül az ablak specifikációján, amely sűrű rangot rendel az egyes partíciókon belüli minden sorhoz a rendezett sorrend alapján.
  • Végül egy új adatkeret jön létre, a 'dense_ranking_df' néven az emp_df bizonyos oszlopainak kiválasztásával (azaz 'empno', 'ename', 'job', 'deptno' és 'sal'), majd hozzáad egy új, 'dense_rank' oszlopot, amely az ablakfüggvénnyel számított sűrű rangsorértékeket tartalmazza.
  • Végül jelenítse meg az eredményül kapott DataFrame-et táblázatos formátumban.

output:

Az eredmény fizetési szempontból sűrű rangú.

Q3. Számozza meg a sort az egyes részlegeken belül.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark kód megközelítése

  • Az első sor meghatározza az ablak specifikációját a Window.partitionBy() és Window.orderBy() függvények használatával történő számításhoz. Ezt az ablakot a deptno oszlop particionálja, és a sal oszlop rendezi csökkenő sorrendben.
  • A második sor létrehoz egy új DataFrame-et 'row_num_df' néven, az 'emp_df' vetületét egy további 'row_num' nevű oszloppal, és tartalmazza a sorszámok részleteit.
  • A show() függvény megjeleníti az eredményül kapott DataFrame-et, amely megjeleníti az egyes alkalmazottak empno, ename, job, deptno, sal és row_num oszlopait.

output:

A kimenetben minden egyes alkalmazott sorszáma lesz az osztályon belül, a fizetésük alapján.

Q4. A fizetés teljes összege az egyes részlegeken belül.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Megközelítés PySpark kódhoz

  • Először egy ablak specifikációt kell megadni a „Window.partitionBy()” és a „Window.orderBy()” metódusokkal. A „partitionBy()” metódus a „deptno” oszlop szerint particionálja az adatokat, míg az „orderBy()” metódus a „sal” oszlop szerint rendezi az adatokat csökkenő sorrendben.
  • Ezután a „sum()” függvényt alkalmazzuk a „sal” oszlopra a „over()” módszerrel, hogy kiszámítsuk az egyes részlegeken belüli fizetések teljes összegét. Az eredmény egy új, „running_sum_sal_df” nevű DataFrame-ben lesz, amely az „empno”, „ename”, „job”, „deptno”, „sal” és „running_total” oszlopokat tartalmazza.
  • Végül a „show()” metódus meghívásra kerül a „running_sum_sal_df” DataFrame-en, hogy megjelenítse a lekérdezés kimenetét. Az eredményül kapott DataFrame megmutatja az egyes alkalmazottak teljes fizetését és egyéb részleteket, például a nevet, a részlegszámot és a munkakört.

output:

A kimenet az egyes részlegek fizetési adatainak futó összegét tartalmazza.

5. kérdés: A következő fizetés az egyes részlegeken belül.

Az egyes osztályokon belüli következő fizetés megtalálásához a LEAD funkciót használjuk. 

A lead() window függvény segít lekérni a kifejezés értékét az ablakpartíció következő sorában. Minden bemeneti oszlophoz egy oszlopot ad vissza, ahol minden oszlop tartalmazza az ablakpartíción belüli aktuális sor feletti eltolási sor bemeneti oszlopának értékét. A lead függvény szintaxisa:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark kód megközelítése

  • Először is, az ablak funkció segít felosztani a DataFrame sorait osztályszám (deptno) szerint, és a fizetéseket az egyes partíciókon belül csökkenő sorrendbe rendezni.
  • A lead() függvény ezután minden partíción belül a rendezett „sal” oszlopra kerül, hogy visszaadja a következő alkalmazott fizetését (1-es beszámítással), és az alapértelmezett érték 0, ha nincs következő alkalmazott.
  • Az eredményül kapott DataFrame 'next_salary_df' oszlopokat tartalmaz az alkalmazotti szám (empno), név (név), munkakör (munkakör), osztályszám (deptno), aktuális fizetés (sal) és következő fizetés (next_val) oszlopai.

output:

A kimenet az osztály következő munkatársának fizetését tartalmazza a fizetés csökkenő sorrendjében. 

Q6. Korábbi fizetés az egyes osztályokon belül.

Az előző fizetés kiszámításához a LAG függvényt használjuk.

A lag függvény egy kifejezés értékét adja vissza egy adott eltolásnál az ablakpartíción belüli aktuális sor előtt. A lag függvény szintaxisa:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark kód megközelítése

  • A window.partitionBy(col('deptno')) határozza meg az ablakpartíciót. Ez azt jelenti, hogy az ablak funkció minden részlegnél külön működik.
  • Ezután az orderBy(col('sal').desc()) meghatározza a fizetés sorrendjét, és az egyes részlegeken belül csökkenő sorrendben rendezi a fizetéseket.
  • Az F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') létrehoz egy új oszlopot prev_val néven a DataFrame 'prev_sal_df'-ben.
  • Minden sornál ez az oszlop tartalmazza a windowSpec által meghatározott ablakon belül az előző sor 'sal' oszlopának értékét.
  • Az offset=1 paraméter azt jelzi, hogy az előző sornak egy sorral az aktuális sor előtt kell lennie, a default=0 pedig az egyes partíciók első sorának alapértelmezett értékét adja meg (mivel az első sorhoz nincs előző sor).
  • Végül a prev_sal_df.show() megjeleníti az eredményül kapott DataFrame-et.

output:

A kimenet az egyes részlegeken belül az egyes alkalmazottak korábbi fizetését jelenti, a fizetések csökkenő sorrendben történő rendezése alapján.

Q7. Első fizetés az egyes osztályokon belül, és az egyes osztályokon belüli tagokhoz képest.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark kód megközelítése

  • Először hozzon létre egy WindowSpec objektumot, amely az adatokat részlegszám (deptno) szerint particionálja, és fizetés szerint (sal) rendezi csökkenő sorrendben.
  • Ezután alkalmazza a first() elemző függvényt a 'sal' oszlopra a windowSpec által meghatározott ablak felett. Ez a függvény minden partíción (azaz minden deptno csoporton) belül a „sal” oszlop első értékét adja vissza, csökkenő „sal” sorrendben. Az eredményül kapott oszlop új nevet kapott: 'first_val'.
  • Most hozzárendeli az eredményül kapott DataFrame-et, amely tartalmazza a kiválasztott oszlopokat és egy új oszlopot, a „first_val”, amely az egyes osztályok első legmagasabb fizetését mutatja a fizetési értékek csökkenő sorrendje alapján, egy új „first_value_df” változóhoz.

output:

A kimenet az első legmagasabb fizetést mutatja az egyes részlegeknél az alkalmazotti DataFrame-ben.

Következtetés

Ebben a cikkben az ablakfunkciókról tanulunk. A Spark SQL háromféle ablakfunkcióval rendelkezik: rangsoroló függvények, összesítő függvények és érték függvények. Ezzel a funkcióval egy adatkészleten dolgoztunk, hogy fontos és értékes információkat kapjunk. A Spark Window Functions olyan hatékony adatelemző eszközöket kínál, mint a rangsorolás, az elemzés és az értékszámítás. Akár részlegenként elemzi a fizetési ismereteket, akár gyakorlati példákat alkalmaz PySpark és SQL segítségével, ezek a funkciók alapvető eszközöket biztosítanak a hatékony adatfeldolgozáshoz és -elemzéshez a Sparkban.

Kulcs elvezetések

  • Megismertük az ablakfunkciókat, és a Spark SQL és a PySpark DataFrame API segítségével dolgoztunk velük.
  • A megfelelő elemzés érdekében olyan függvényeket használunk, mint a rang, sűrű_rang, sor_szám, lag, lead, groupBy, partitionBy és más függvények.
  • Láttuk a probléma részletes, lépésenkénti megoldásait is, és elemeztük a kimenetet minden problémamegnyilatkozás végén.

Ez az esettanulmány segít jobban megérteni a PySpark funkcióit. Ha bármilyen véleménye vagy kérdése van, írja meg kommentben alább. Csatlakozz hozzám LinkedIn további megbeszélésre. Tanulj tovább!!!

A cikkben bemutatott média nem az Analytics Vidhya tulajdona, és a szerző saját belátása szerint használja.

spot_img

Legújabb intelligencia

spot_img