Logotip Zephyrnet

Delo s funkcijami oken v PySpark

Datum:

Predstavitev

Spoznavanje okenskih funkcij v PySparku je lahko zahtevno, a vredno truda. Funkcije oken so zmogljivo orodje za analizo podatkov in vam lahko pomagajo pridobiti vpoglede, ki jih sicer morda niste videli. Z razumevanjem uporabe okenskih funkcij v Sparku; lahko vzameš svojega Analiza podatkov spretnosti na naslednjo raven in sprejemanje bolj premišljenih odločitev. Ne glede na to, ali delate z velikimi ali majhnimi nabor podatkov, vam bo učenje okenskih funkcij v Sparku omogočilo manipulacijo in analizo podatkov na nove in vznemirljive načine.

Funkcije oken v PySpark

V tem blogu bomo najprej razumeli koncept okenskih funkcij in nato razpravljali o tem, kako jih uporabljati s Spark SQL in PySpark DataFrame API. Tako da boste do konca tega članka razumeli, kako uporabljati okenske funkcije z resničnimi nabori podatkov in pridobili bistvene vpoglede za poslovanje.

Učni cilji

  • Razumeti koncept okenskih funkcij.
  • Delo z okenskimi funkcijami z uporabo naborov podatkov.
  • Poiščite vpoglede z uporabo okenskih funkcij.
  • Uporabite Spark SQL in DataFrame API za delo s funkcijami oken.

Ta članek je bil objavljen kot del Blogaton podatkovne znanosti.

Kazalo

Kaj so okenske funkcije?

Okenske funkcije pomagajo analizirati podatke znotraj skupine vrstic, ki so med seboj povezane. Uporabnikom omogočajo izvajanje zapletenih transformacij v vrsticah podatkovnega okvira ali nabora podatkov, povezanih med seboj, na podlagi nekaterih kriterijev za razdelitev in razvrščanje.

Okenske funkcije delujejo na določeni particiji podatkovnega okvira ali nabora podatkov, ki ga definira niz particijskih stolpcev. The ORDER BY klavzula razdeli podatke v okensko funkcijo, da jih razporedi v določenem vrstnem redu. Okenske funkcije nato izvajajo izračune v drsečem oknu vrstic, ki vključuje trenutno vrstico in podmnožico predhodnih bodisi 'in'/'ali' naslednjih vrstic, kot je določeno v okenskem okviru.

Delo s funkcijami oken v PySpark

Nekateri običajni primeri okenskih funkcij vključujejo izračun drsečih povprečij, razvrščanje ali razvrščanje vrstic na podlagi določenega stolpca ali skupine stolpci, izračunavanje tekočih vsot in iskanje prve ali zadnje vrednosti v skupini vrstic. Z zmogljivimi okenskimi funkcijami Spark lahko uporabniki izvajajo kompleksne analize in združevanja Velikih podatkovnih nizov z relativno lahkoto, zaradi česar je priljubljeno orodje za velike obdelava podatkov in analitiko.

"

Okenske funkcije v SQL

Spark SQL podpira tri vrste okenskih funkcij:

  • Funkcije razvrščanja: - Te funkcije dodelijo rang vsaki vrstici znotraj particije niza rezultatov. Na primer, funkcija ROW_NUMBER() daje edinstveno zaporedno številko vsaki vrstici znotraj particije.
  • Funkcije analitike:- Te funkcije izračunajo agregatne vrednosti v oknu vrstic. Na primer, funkcija SUM() izračuna vsoto stolpca v oknu vrstic.
  • Funkcije vrednosti: - Te funkcije izračunajo analitično vrednost za vsako vrstico v particiji na podlagi vrednosti drugih vrstic v isti particiji. Na primer, funkcija LAG() vrne vrednost stolpca iz prejšnje vrstice v particiji.

Ustvarjanje DataFrame

Ustvarili bomo vzorčni podatkovni okvir, tako da bomo lahko praktično delali z različnimi funkcijami oken. Prav tako bomo poskušali odgovoriti na nekaj vprašanj s pomočjo teh podatkov in okenskih funkcij.

Podatkovni okvir vsebuje podrobnosti zaposlenih, kot so njihovo ime, naziv, številka zaposlenega, datum zaposlitve, plača itd. Skupaj imamo 8 stolpcev, ki so naslednji:

  • 'empno': Ta stolpec vsebuje številko zaposlenega.
  • 'ename': Ta stolpec vsebuje imena zaposlenih.
  • 'zaposlitev': Ta stolpec vsebuje informacije o nazivih delovnih mest zaposlenih.
  • 'hiredate': Ta stolpec prikazuje datum zaposlitve zaposlenega.
  • 'sal': podrobnosti o plači so v tem stolpcu.
  • 'comm': Ta stolpec vsebuje podrobnosti o proviziji zaposlenih, če obstajajo.
  • 'deptno': V tem stolpcu je številka oddelka, kateremu zaposleni pripada.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Zdaj bomo preverili shemo:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Ustvarite začasni pogled DataFrame 'emp_df' z imenom "emp". Omogoča nam poizvedovanje po DataFrame s sintakso SQL v Spark SQL, kot da bi šlo za tabelo. Začasni pogled je veljaven samo za čas trajanja seje Spark.

emp_df.createOrReplaceTempView("emp")

Reševanje izjav o težavah z uporabo okenskih funkcij

Tukaj bomo reševali več stavkov o težavah z uporabo funkcij Windows:

Q1. Razvrstite plačo znotraj vsakega oddelka.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Pristop za kodo PySpark

  • Funkcija Window razdeli podatke po številki oddelka z uporabo partitionBy(col('deptno')) in nato razvrsti podatke znotraj vsake particije glede na plačo v padajočem vrstnem redu z uporabo orderBy(col('sal').desc()). Spremenljivka windowSpec vsebuje končno specifikacijo okna.
  • 'emp_df' je podatkovni okvir, ki vsebuje podatke o zaposlenih, vključno s stolpci za empno, ename, job, deptno in sal.
  • Funkcija ranga se uporabi za stolpec plače z uporabo 'F.rank().over(windowSpec)' v stavku select. Nastali stolpec ima vzdevek 'rank'.
  • Ustvaril bo podatkovni okvir, 'ranking_result_df', ki vključuje empno, ename, job, deptno in salary. Ima tudi nov stolpec, 'rang', ki predstavlja rang plače zaposlenega v njegovem oddelku.

izhod:

Rezultat ima plačni rang v vsakem oddelku.

Q2. Gosto razvrstite plačo znotraj vsakega oddelka.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Pristop za kodo PySpark

  • Najprej ustvarite specifikacijo okna s funkcijo Window, ki particionira 'emp_df' DataFrame po deptno in ga razvrsti tako, da se spusti po stolpcu 'sal'.
  • Nato se nad specifikacijo okna uporabi funkcija dense_rank(), ki vsaki vrstici znotraj vsake particije dodeli gosto uvrstitev glede na njen razvrščeni vrstni red.
  • Končno se ustvari nov DataFrame, imenovan 'dense_ranking_df', tako da se izberejo določeni stolpci iz emp_df (tj. 'empno', 'ename', 'job', 'deptno' in 'sal') in doda nov stolpec 'dense_rank', ki vsebuje vrednosti gostega rangiranja, izračunane z okensko funkcijo.
  • Nazadnje prikažite nastali DataFrame v obliki tabele.

izhod:

Rezultat ima gosto plačo.

Q3. Oštevilčite vrstico znotraj vsakega oddelka.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Pristop za kodo PySpark

  • Prva vrstica definira specifikacijo okna za izračun z uporabo funkcij Window.partitionBy() in Window.orderBy(). To okno je razdeljeno s stolpcem deptno in razvrščeno po stolpcu sal v padajočem vrstnem redu.
  • Druga vrstica ustvari nov DataFrame, imenovan 'row_num_df', projekcijo 'emp_df' z dodatnim stolpcem, imenovanim 'row_num', in vsebuje podrobnosti o številkah vrstic.
  • Funkcija show() prikaže nastali DataFrame, ki prikazuje stolpce empno, ename, job, deptno, sal in row_num vsakega zaposlenega.

izhod:

Izhod bo imel številko vrstice vsakega zaposlenega v njihovem oddelku glede na njihovo plačo.

Q4. Tekoči skupni znesek plač znotraj vsakega oddelka.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Pristop za kodo PySpark

  • Najprej se določi specifikacija okna z metodama »Window.partitionBy()« in »Window.orderBy()«. Metoda “partitionBy()” razdeli podatke po stolpcu “deptno”, medtem ko metoda “orderBy()” razvrsti podatke po stolpcu “sal” v padajočem vrstnem redu.
  • Nato se funkcija »sum()« uporabi za stolpec »sal« z uporabo metode »over()« za izračun tekoče vsote plač v vsakem oddelku. Rezultat bo v novem DataFrame z imenom “running_sum_sal_df”, ki vsebuje stolpce 'empno', 'ename', 'job', 'deptno', 'sal' in 'running_total'.
  • Nazadnje se v DataFrameu »running_sum_sal_df« pokliče metoda »show()« za prikaz izhoda poizvedbe. Dobljeni DataFrame prikazuje trenutno skupno plačo vsakega zaposlenega in druge podrobnosti, kot so ime, številka oddelka in delovno mesto.

izhod:

Rezultat bo vseboval tekočo vsoto podatkov o plačah vsakega oddelka.

V5: Naslednja plača v vsakem oddelku.

Za iskanje naslednje plače znotraj posameznega oddelka uporabljamo funkcijo LEAD. 

Okenska funkcija lead() pomaga pridobiti vrednost izraza v naslednji vrstici okenske particije. Vrne stolpec za vsak vhodni stolpec, kjer bo vsak stolpec vseboval vrednost vhodnega stolpca za vrstico z odmikom nad trenutno vrstico znotraj okenske particije. Sintaksa za funkcijo lead je:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Pristop za kodo PySpark

  • Prvič, okenska funkcija pomaga razdeliti vrstice DataFrame po številki oddelka (deptno) in razporediti plače v padajočem vrstnem redu znotraj vsake particije.
  • Funkcija lead() se nato uporabi za urejen stolpec 'sal' znotraj vsake particije, da vrne plačo naslednjega zaposlenega (z odmikom 1), privzeta vrednost pa je 0, če ni naslednjega zaposlenega.
  • Nastali DataFrame 'next_salary_df' vsebuje stolpce za številko zaposlenega (empno), ime (ename), delovno mesto (job), številko oddelka (deptno), trenutno plačo (sal) in naslednjo plačo (next_val).

izhod:

Izhod vsebuje plačo naslednjega zaposlenega v oddelku glede na vrstni red padajočih plač. 

V6. Prejšnja plača v vsakem oddelku.

Za izračun prejšnje plače uporabljamo funkcijo LAG.

Funkcija zamika vrne vrednost izraza pri danem odmiku pred trenutno vrstico znotraj okenske particije. Sintaksa funkcije lag je:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Pristop za kodo PySpark

  • Window.partitionBy(col('deptno')) določa particijo okna. To pomeni, da funkcija okna deluje ločeno za vsak oddelek.
  • Nato orderBy(col('sal').desc()) podaja vrstni red plače in razvrsti plače znotraj vsakega oddelka v padajočem vrstnem redu.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') ustvari nov stolpec z imenom prev_val v DataFrame 'prev_sal_df'.
  • Za vsako vrstico ta stolpec vsebuje vrednost stolpca 'sal' iz prejšnje vrstice v oknu, ki ga definira windowSpec.
  • Parameter offset=1 označuje, da mora biti prejšnja vrstica eno vrstico pred trenutno vrstico, default=0 pa podaja privzeto vrednost za prvo vrstico v vsaki particiji (ker za prvo vrstico ni prejšnje vrstice).
  • Končno, prev_sal_df.show() prikaže nastali DataFrame.

izhod:

Izhod predstavlja prejšnjo plačo za vsakega zaposlenega v posameznem oddelku na podlagi razvrščanja plač v padajočem vrstnem redu.

V7. Prva plača v vsakem oddelku in primerjava z vsakim članom v vsakem oddelku.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Pristop za kodo PySpark

  • Najprej ustvarite objekt WindowSpec, ki razdeli podatke po številki oddelka (deptno) in jih razvrsti po plači (sal) v padajočem vrstnem redu.
  • Nato uporabi analitično funkcijo first() za stolpec 'sal' nad oknom, ki ga definira windowSpec. Ta funkcija vrne prvo vrednost stolpca 'sal' znotraj vsake particije (tj. vsake skupine deptno), razvrščeno po padajočem 'sal'. Nastali stolpec ima novo ime, 'first_val'.
  • Zdaj dodeli nastali podatkovni okvir, ki vsebuje izbrane stolpce in nov stolpec, 'first_val', ki prikazuje prvo najvišjo plačo za vsak oddelek na podlagi padajočega vrstnega reda vrednosti plače, novi spremenljivki, imenovani 'first_value_df'.

izhod:

Izhod prikazuje prvo najvišjo plačo za vsak oddelek v DataFrame zaposlenih.

zaključek

V tem članku spoznavamo funkcije oken. Spark SQL ima tri vrste okenskih funkcij: funkcije razvrščanja, agregatne funkcije in funkcije vrednosti. S to funkcijo smo delali na naboru podatkov, da bi našli nekaj pomembnih in dragocenih vpogledov. Funkcije Spark Window ponujajo zmogljiva orodja za analizo podatkov, kot so razvrščanje, analitika in izračuni vrednosti. Ne glede na to, ali analizirate vpoglede v plače po oddelkih ali uporabljate praktične primere s PySpark & ​​SQL, te funkcije zagotavljajo osnovna orodja za učinkovito obdelavo in analizo podatkov v Sparku.

Ključni izdelki

  • Spoznali smo okenske funkcije in delali z njimi s pomočjo Spark SQL in PySpark DataFrame API.
  • Za zagotavljanje pravilne analize uporabljamo funkcije, kot so rank, dense_rank, row_number, lag, lead, groupBy, partitionBy in druge funkcije.
  • Ogledali smo si tudi podrobne rešitve problema po korakih in analizirali rezultat na koncu vsake izjave o problemu.

Ta študija primera vam pomaga bolje razumeti funkcije PySpark. Če imate kakršna koli mnenja ali vprašanja, komentirajte spodaj. Poveži se z mano na LinkedIn za nadaljnjo razpravo. Nadaljujte z učenjem!!!

Mediji, prikazani v tem članku, niso v lasti Analytics Vidhya in se uporabljajo po lastni presoji avtorja.

spot_img

Najnovejša inteligenca

spot_img