Zephyrnet-logo

Arbeide med vindusfunksjoner i PySpark

Dato:

Introduksjon

Å lære om vindusfunksjoner i PySpark kan være utfordrende, men verdt innsatsen. Vindufunksjoner er et kraftig verktøy for å analysere data og kan hjelpe deg med å få innsikt du kanskje ikke har sett ellers. Ved å forstå hvordan du bruker Window Functions i Spark; du kan ta din dataanalyse ferdigheter til neste nivå og ta mer informerte beslutninger. Enten du jobber med stort eller smått datasett, vil lære Window Functions i Spark at du kan manipulere og analysere data på nye og spennende måter.

Vindufunksjoner i PySpark

I denne bloggen skal vi først forstå konseptet med vindusfunksjoner og deretter diskutere hvordan du bruker dem med Spark SQL og PySpark DataFrame API. Slik at du mot slutten av denne artikkelen vil forstå hvordan du bruker vindusfunksjoner med ekte datasett og får viktig innsikt for virksomheten.

Læringsmål

  • Forstå konseptet med vindusfunksjoner.
  • Arbeide med vindusfunksjoner ved hjelp av datasett.
  • Finn ut innsikten ved hjelp av vindusfunksjonene.
  • Bruk Spark SQL og DataFrame API for å jobbe med vindusfunksjoner.

Denne artikkelen ble publisert som en del av Data Science Blogathon.

Innholdsfortegnelse

Hva er vindusfunksjoner?

Vindusfunksjoner hjelper til med å analysere data innenfor en gruppe rader som er relatert til hverandre. De gjør det mulig for brukere å utføre komplekse transformasjoner på radene i en dataramme eller datasett knyttet til hverandre basert på noen partisjonerings- og bestillingskriterier.

Vindufunksjoner opererer på en spesifikk partisjon av en dataramme eller datasett definert av et sett med partisjoneringskolonner. De REKKEFØLGE ETTER klausul partisjonerer dataene i en vindusfunksjon for å ordne dem i en bestemt rekkefølge. Vindusfunksjoner utfører deretter beregninger på et skyvevindu med rader som inkluderer gjeldende rad og en delmengde av de foregående enten 'og'/'eller' påfølgende rader, som spesifisert i vindusrammen.

Arbeide med vindusfunksjoner i PySpark

Noen vanlige eksempler på vindusfunksjoner inkluderer beregning av glidende gjennomsnitt, rangering eller sortering av rader basert på en bestemt kolonne eller gruppe av kolonner, beregne løpende totaler og finne den første eller siste verdien i en gruppe med rader. Med Sparks kraftige vindusfunksjoner kan brukere utføre komplekse analyser og aggregeringer over store datasett med relativ letthet, noe som gjør det til et populært verktøy for store databehandling og analyse.

"

Vindufunksjoner i SQL

Spark SQL støtter tre typer vindusfunksjoner:

  • Rangeringsfunksjoner:- Disse funksjonene tildeler en rangering til hver rad i en partisjon av resultatsettet. For eksempel gir ROW_NUMBER()-funksjonen et unikt sekvensnummer til hver rad i partisjonen.
  • Analysefunksjoner:- Disse funksjonene beregner aggregerte verdier over et vindu med rader. SUM()-funksjonen beregner for eksempel summen av en kolonne over et vindu med rader.
  • Verdifunksjoner:- Disse funksjonene beregner en analytisk verdi for hver rad i en partisjon, basert på verdiene til andre rader i samme partisjon. For eksempel returnerer LAG()-funksjonen verdien til en kolonne fra forrige rad i partisjonen.

DataFrame Creation

Vi vil lage en eksempeldataramme slik at vi praktisk talt kan jobbe med forskjellige vindusfunksjoner. Vi vil også prøve å svare på noen spørsmål ved hjelp av disse dataene og vindusfunksjonene.

Datarammen har ansattes detaljer som navn, betegnelse, ansattnummer, ansettelsesdato, lønn osv. Totalt har vi 8 kolonner som er som følger:

  • 'empno': Denne kolonnen inneholder den ansattes nummer.
  • 'ename': Denne kolonnen har ansattes navn.
  • 'jobb': Denne kolonnen inneholder informasjon om ansattes stillingsbetegnelser.
  • 'hiredate': Denne kolonnen viser den ansattes ansettelsesdato.
  • 'sal': Lønnsdetaljer inneholder i denne kolonnen.
  • 'comm': Denne kolonnen har ansatt provisjonsdetaljer, hvis noen.
  • 'deptno': Avdelingsnummeret som den ansatte tilhører står i denne kolonnen.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Nå skal vi sjekke skjemaet:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Lag en midlertidig visning av DataFrame 'emp_df' med navnet "emp". Det lar oss spørre DataFrame ved å bruke SQL-syntaks i Spark SQL som om det var en tabell. Den midlertidige visningen er bare gyldig for varigheten av Spark-økten.

emp_df.createOrReplaceTempView("emp")

Løse problemerklæringer ved hjelp av vindusfunksjoner

Her skal vi løse flere problemsetninger ved å bruke Windows-funksjoner:

Q1. Ranger lønnen innenfor hver avdeling.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Tilnærming for PySpark-kode

  • Vindu-funksjonen deler dataene etter avdelingsnummer ved å bruke partitionBy(col('deptno')) og bestiller deretter dataene innenfor hver partisjon etter lønn i synkende rekkefølge ved å bruke orderBy(col('sal').desc()). Variabelen windowSpec inneholder den endelige vindusspesifikasjonen.
  • 'emp_df' er datarammen som inneholder ansattes data, inkludert kolonner for empno, ename, jobb, deptno og sal.
  • Rangeringsfunksjonen brukes på lønnskolonnen ved å bruke 'F.rank().over(windowSpec)' i select-utsagnet. Den resulterende kolonnen har et aliasnavn som 'rang'.
  • Det vil lage en dataramme, 'ranking_result_df', som inkluderer empno, ename, jobb, deptno og lønn. Den har også en ny kolonne, 'rang', som representerer rangeringen av den ansattes lønn innenfor deres avdeling.

Utgang:

Utfallet har lønnsgrad i hver avdeling.

Q2. Tett rangering lønnen innen hver avdeling.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Tilnærming for PySpark-kode

  • Lag først en vindusspesifikasjon ved å bruke Window-funksjonen, som partisjonerer 'emp_df' DataFrame etter deptno og bestiller den ved å gå ned i 'sal'-kolonnen.
  • Deretter blir funksjonen dense_rank() brukt over vindusspesifikasjonen, som tildeler en tett rangering til hver rad i hver partisjon basert på dens sorterte rekkefølge.
  • Til slutt opprettes en ny DataFrame kalt 'dense_ranking_df' ved å velge spesifikke kolonner fra emp_df (dvs. 'empno', 'ename', 'job', 'deptno' og 'sal') og legge til en ny kolonne 'dense_rank' som inneholder de tette rangeringsverdiene beregnet av vindusfunksjonen.
  • Til slutt, vis den resulterende DataFrame i tabellformat.

Utgang:

Utfallet har en lønnsmessig tett rangering.

Q3. Nummer raden innenfor hver avdeling.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Tilnærming for PySpark-kode

  • Den første linjen definerer en vindusspesifikasjon for beregningen ved å bruke funksjonene Window.partitionBy() og Window.orderBy(). Dette vinduet er partisjonert av deptno-kolonnen og sortert etter sal-kolonnen i synkende rekkefølge.
  • Den andre linjen oppretter en ny DataFrame kalt 'row_num_df', en projeksjon av 'emp_df' med en ekstra kolonne kalt 'row_num' og den inneholder radnummerdetaljene.
  • Show()-funksjonen viser den resulterende DataFrame, som viser hver ansatts empno-, ename-, jobb-, deptno-, sal- og row_num-kolonner.

Utgang:

Utgangen vil ha radnummeret til hver ansatt i deres avdeling basert på lønnen deres.

Q4. Løpende totalsum av lønn innen hver avdeling.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Tilnærming for PySpark-kode

  • Først defineres en vindusspesifikasjon ved å bruke metodene "Window.partitionBy()" og "Window.orderBy()". "partitionBy()"-metoden partisjonerer dataene etter "deptno"-kolonnen, mens "orderBy()"-metoden sorterer dataene etter "sal"-kolonnen i synkende rekkefølge.
  • Deretter brukes "sum()"-funksjonen på "sal"-kolonnen ved å bruke "over()"-metoden for å beregne den løpende summen av lønn innen hver avdeling. Resultatet vil være i en ny DataFrame kalt "running_sum_sal_df", som inneholder kolonnene 'empno', 'ename', 'job', 'deptno', 'sal' og 'running_total'.
  • Til slutt kalles "show()"-metoden på "running_sum_sal_df" DataFrame for å vise utdataene fra spørringen. Den resulterende DataFrame viser hver ansatts løpende totalsum av lønn og andre detaljer som navn, avdelingsnummer og jobb.

Utgang:

Utgangen vil ha en løpende sum av hver avdelings lønnsdata.

Q5: Neste lønn innen hver avdeling.

For å finne neste lønn innenfor hver avdeling bruker vi LEAD-funksjonen. 

Lead()-vindusfunksjonen hjelper til med å få verdien av uttrykket i neste rad i vinduspartisjonen. Den returnerer en kolonne for hver inndatakolonne, der hver kolonne vil inneholde verdien til inngangskolonnen for forskyvningsraden over gjeldende rad i vinduspartisjonen. Syntaksen for lead-funksjonen er:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Tilnærming for PySpark-kode

  • For det første hjelper vindusfunksjonen til å partisjonere DataFrames rader etter avdelingsnummer (deptno) og bestille lønningene i synkende rekkefølge innenfor hver partisjon.
  • Lead()-funksjonen brukes deretter på den bestilte 'sal'-kolonnen i hver partisjon for å returnere lønnen til følgende ansatt (med en offset på 1), og standardverdien er 0 i tilfelle det ikke er noen neste ansatt.
  • Den resulterende DataFrame 'next_salary_df' inneholder kolonner for ansattnummer (empno), navn (ename), stillingstittel (jobb), avdelingsnummer (deptno), gjeldende lønn (sal) og neste lønn (neste_val).

Utgang:

Utdataene inneholder lønnen til neste ansatte i avdelingen basert på rekkefølgen på synkende lønn. 

Q6. Tidligere lønn innen hver avdeling.

For å beregne tidligere lønn bruker vi LAG-funksjonen.

Lagfunksjonen returnerer verdien til et uttrykk ved en gitt forskyvning før gjeldende rad i vinduspartisjonen. Syntaksen til lagfunksjonen er:- lag(uttr, offset=1, default=Ingen).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Tilnærming for PySpark-kode

  • window.partitionBy(col('deptno')) spesifiserer vinduspartisjonen. Det betyr at vindusfunksjonen fungerer separat for hver avdeling.
  • Deretter spesifiserer orderBy('sal').desc()) rekkefølgen på lønnen og vil bestille lønningene innenfor hver avdeling i synkende rekkefølge.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') oppretter en ny kolonne kalt prev_val i DataFrame 'prev_sal_df'.
  • For hver rad inneholder denne kolonnen verdien av 'sal'-kolonnen fra den forrige raden i vinduet definert av vinduspesifikasjonen.
  • Parameteren offset=1 indikerer at forrige rad skal være én rad før gjeldende rad, og default=0 spesifiserer standardverdien for den første raden i hver partisjon (siden det ikke er noen tidligere rad for den første raden).
  • Til slutt viser prev_sal_df.show() den resulterende DataFrame.

Utgang:

Utgangen representerer forrige lønn for hver ansatt innenfor hver avdeling, basert på bestilling av lønn i synkende rekkefølge.

Q7. Første lønn innen hver avdeling og sammenligne med hvert medlem innen hver avdeling.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Tilnærming for PySpark-kode

  • Først oppretter du et WindowSpec-objekt som deler dataene etter avdelingsnummer (deptno) og bestiller det etter lønn (sal) i synkende rekkefølge.
  • Bruker deretter den første() analytiske funksjonen på 'sal'-kolonnen over vinduet definert av windowSpec. Denne funksjonen returnerer den første verdien av 'sal'-kolonnen innenfor hver partisjon (dvs. hver deptno-gruppe) sortert etter synkende 'sal'. Den resulterende kolonnen har et nytt navn, 'first_val'.
  • Tildeler nå den resulterende DataFrame, som inneholder de valgte kolonnene og en ny kolonne, 'first_val', som viser den første høyeste lønnen for hver avdeling basert på synkende rekkefølge av lønnsverdier, til en ny variabel kalt 'first_value_df'.

Utgang:

Utdataene viser den første høyeste lønnen for hver avdeling i en ansatt DataFrame.

konklusjonen

I denne artikkelen lærer vi om vindusfunksjoner. Spark SQL har tre typer vindusfunksjoner: Rangeringsfunksjoner, Aggregerte funksjoner og Verdifunksjoner. Ved å bruke denne funksjonen jobbet vi med et datasett for å finne noen viktige og verdifulle innsikter. Spark Window Functions tilbyr kraftige dataanalyseverktøy som rangering, analyser og verdiberegninger. Enten du analyserer lønnsinnsikt etter avdeling eller bruker praktiske eksempler med PySpark og SQL, gir disse funksjonene viktige verktøy for effektiv databehandling og analyse i Spark.

Nøkkelfunksjoner

  • Vi lærte om vindusfunksjonene og jobbet med dem ved å bruke Spark SQL og PySpark DataFrame API.
  • Vi bruker funksjoner som rank, dense_rank, row_number, lag, lead, groupBy, partitionBy og andre funksjoner for å gi riktig analyse.
  • Vi har også sett de detaljerte trinnvise løsningene på problemet og analysert resultatet på slutten av hver problemstilling.

Denne casestudien hjelper deg å forstå PySpark-funksjonene bedre. Hvis du har noen meninger eller spørsmål, så kommenter nedenfor. Ta kontakt med meg på Linkedin for videre diskusjon. Fortsett å lære!!!

Mediene vist i denne artikkelen eies ikke av Analytics Vidhya og brukes etter forfatterens skjønn.

spot_img

Siste etterretning

spot_img