Zephyrnet logo

Työskentely ikkunatoimintojen kanssa PySparkissa

Treffi:

esittely

PySparkin ikkunatoimintojen oppiminen voi olla haastavaa, mutta vaivan arvoista. Ikkunafunktiot ovat tehokas työkalu tietojen analysointiin ja voivat auttaa sinua saamaan oivalluksia, joita et ehkä muuten olisi nähnyt. Ymmärtämällä kuinka käyttää ikkunatoimintoja Sparkissa; voit ottaa omasi tietojen analysointi taidot seuraavalle tasolle ja tehdä tietoisempia päätöksiä. Työskenteletpä sitten suurten tai pienten kanssa aineistot, Sparkin ikkunatoimintojen oppiminen antaa sinun käsitellä ja analysoida tietoja uusilla ja jännittävillä tavoilla.

Ikkunatoiminnot PySparkissa

Tässä blogissa ymmärrämme ensin ikkunatoimintojen käsitteen ja keskustelemme sitten niiden käytöstä Spark SQL:n ja PySpark DataFrame API:n kanssa. Jotta tämän artikkelin loppuun mennessä ymmärrät, kuinka ikkunatoimintoja käytetään todellisten tietojoukkojen kanssa, ja saat liiketoiminnan kannalta tärkeitä tietoja.

Oppimistavoitteet

  • Ymmärrä ikkunatoimintojen käsite.
  • Työskentely ikkunatoimintojen kanssa tietojoukkojen avulla.
  • Tutustu oivalluksiin ikkunatoimintojen avulla.
  • Käytä Spark SQL:ää ja DataFrame APIa ikkunatoimintojen kanssa työskentelemiseen.

Tämä artikkeli julkaistiin osana Data Science Blogathon.

Sisällysluettelo

Mitä ovat ikkunatoiminnot?

Ikkunafunktiot auttavat analysoimaan tietoja toisiinsa liittyvien rivien joukossa. Niiden avulla käyttäjät voivat suorittaa monimutkaisia ​​muunnoksia tietokehyksen tai tietojoukon riveille, jotka liittyvät toisiinsa tiettyjen osiointi- ja järjestyskriteerien perusteella.

Ikkunafunktiot toimivat osiointisarakkeiden joukon määrittämässä tietokehyksen tai tietojoukon tietyssä osiossa. The TILAUS lauseke osittaa tiedot ikkunafunktiossa järjestääkseen ne tiettyyn järjestykseen. Ikkunafunktiot suorittavat sitten laskutoimituksia liukuvalle rivi-ikkunalle, joka sisältää nykyisen rivin ja alijoukon edeltävistä joko "ja"/"tai" seuraavista riveistä, kuten ikkunakehyksessä on määritetty.

Työskentely ikkunatoimintojen kanssa PySparkissa

Joitakin yleisiä esimerkkejä ikkunafunktioista ovat liukuvien keskiarvojen laskeminen, rivien järjestys tai lajittelu tietyn sarakkeen tai ryhmän perusteella. sarakkeet, laskee juoksevat summat ja etsii ensimmäisen tai viimeisen arvon riviryhmästä. Sparkin tehokkaiden ikkunatoimintojen avulla käyttäjät voivat suorittaa monimutkaisia ​​analyyseja ja aggregaatioita suuret tietokannat suhteellisen helposti, joten se on suosittu työkalu suurille tietojenkäsittely ja analytiikka.

"

Ikkunafunktiot SQL:ssä

Spark SQL tukee kolmenlaisia ​​ikkunatoimintoja:

  • Sijoitustoiminnot: - Nämä funktiot antavat arvon jokaiselle riville tulosjoukon osion sisällä. Esimerkiksi funktio ROW_NUMBER() antaa yksilöllisen järjestysnumeron jokaiselle osion riville.
  • Analyysitoiminnot: - Nämä funktiot laskevat aggregoituja arvoja rivi-ikkunan yli. Esimerkiksi SUM()-funktio laskee sarakkeen summan rivi-ikkunan yli.
  • Arvofunktiot: - Nämä funktiot laskevat analyyttisen arvon jokaiselle osion riville saman osion muiden rivien arvojen perusteella. Esimerkiksi LAG()-funktio palauttaa sarakkeen arvon osion edelliseltä riviltä.

Datakehyksen luominen

Luomme mallitietokehyksen, jotta voimme käytännössä työskennellä erilaisten ikkunatoimintojen kanssa. Yritämme myös vastata joihinkin kysymyksiin näiden tietojen ja ikkunatoimintojen avulla.

Tietokehyksessä on työntekijöiden tiedot, kuten nimi, nimitys, työntekijän numero, palkkauspäivä, palkka jne. Meillä on yhteensä 8 saraketta, jotka ovat seuraavat:

  • 'empno': Tämä sarake sisältää työntekijän numeron.
  • 'nimi': Tässä sarakkeessa on työntekijöiden nimet.
  • 'job': Tämä sarake sisältää tietoja työntekijöiden ammattinimikkeistä.
  • 'palkattu': Tässä sarakkeessa näkyy työntekijän palkkauspäivämäärä.
  • 'sal': Palkkatiedot sisältyvät tähän sarakkeeseen.
  • 'comm': Tässä sarakkeessa on työntekijöiden provisiotiedot, jos sellaisia ​​on.
  • 'deptno': Osaston numero, johon työntekijä kuuluu, on tässä sarakkeessa.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Nyt tarkistamme kaavion:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Luo väliaikainen näkymä DataFramesta 'emp_df' nimellä "emp". Sen avulla voimme tehdä kyselyjä DataFramesta käyttämällä SQL-syntaksia Spark SQL:ssä ikään kuin se olisi taulukko. Väliaikainen näkymä on voimassa vain Spark-istunnon ajan.

emp_df.createOrReplaceTempView("emp")

Ongelmailmoitusten ratkaiseminen ikkunatoimintojen avulla

Tässä ratkaisemme useita ongelmalauseita Windows-toimintojen avulla:

Q1. Aseta palkka kunkin osaston sisällä.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

PySpark-koodin lähestymistapa

  • Ikkuna-toiminto osittaa tiedot osastonumeron mukaan käyttämällä partitionBy(col('deptno')) ja järjestää sitten tiedot kussakin osiossa palkan mukaan laskevassa järjestyksessä komennolla orderBy(col('sal').desc()). Muuttuja windowSpec sisältää lopullisen ikkunan määrityksen.
  • "emp_df" on tietokehys, joka sisältää työntekijätiedot, mukaan lukien sarakkeet empno-, ename-, job-, deptno- ja sal-sarakkeille.
  • Sijoitusfunktiota sovelletaan palkkasarakkeeseen käyttämällä valintalausekkeen F.rank().over(windowSpec)-funktiota. Tuloksena olevan sarakkeen aliaksen nimi on "rank".
  • Se luo datakehyksen, 'ranking_result_df', joka sisältää empno-, ename-, työ-, deptno- ja palkan. Siinä on myös uusi sarake, "rank", joka edustaa työntekijän palkan arvoa osastolla.

lähtö:

Tuloksella on palkkaluokka kullakin osastolla.

Q2. Tiheä palkkaluokka kullakin osastolla.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

PySpark-koodin lähestymistapa

  • Luo ensin ikkunamäärittely Window-funktiolla, joka osittaa 'emp_df' DataFramen deptno:n mukaan ja järjestää sen laskemalla 'sal'-saraketta.
  • Sitten dense_rank()-funktiota sovelletaan ikkunamäärittelyyn, joka määrittää tiheän arvon kunkin osion jokaiselle riville sen lajittelujärjestyksen perusteella.
  • Lopuksi luodaan uusi datakehys nimeltä 'dense_ranking_df' valitsemalla tietyt sarakkeet emp_df:stä (eli 'empno', 'ename', 'job', 'deptno' ja 'sal') ja lisäämällä uusi sarake 'dense_rank', joka sisältää ikkunafunktion laskemat tiheät sijoitusarvot.
  • Näytä lopuksi tuloksena oleva DataFrame taulukkomuodossa.

lähtö:

Tuloksena on palkkatasoltaan tiheä arvo.

Q3. Numeroi kunkin osaston rivi.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark-koodin lähestymistapa

  • Ensimmäinen rivi määrittää ikkunan määrittelyn funktioiden Window.partitionBy() ja Window.orderBy() avulla tapahtuvaa laskutoimitusta varten. Tämä ikkuna on osioitu deptno-sarakkeella ja järjestetty sal-sarakkeen mukaan laskevassa järjestyksessä.
  • Toinen rivi luo uuden datakehyksen nimeltä 'row_num_df', projektion 'emp_df', jossa on lisäsarake nimeltä 'row_num' ja se sisältää rivinumeroiden tiedot.
  • Show()-funktio näyttää tuloksena olevan DataFramen, joka näyttää jokaisen työntekijän empno-, ename-, job-, deptno-, sal- ja row_num-sarakkeet.

lähtö:

Tuotoksessa on kunkin osastonsa työntekijän rivinumero palkan perusteella.

Q4. Palkan juokseva kokonaissumma jokaisessa osastossa.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Lähestymistapa PySpark-koodille

  • Ensin määritetään ikkunan määrittely käyttämällä "Window.partitionBy()"- ja "Window.orderBy()"-menetelmiä. Metodi "partitionBy()" jakaa tiedot "deptno"-sarakkeen mukaan, kun taas "orderBy()"-menetelmä järjestää tiedot "sal"-sarakkeen mukaan laskevassa järjestyksessä.
  • Seuraavaksi "sum()"-funktiota sovelletaan "sal"-sarakkeeseen käyttämällä "over()"-menetelmää kunkin osaston palkkojen juoksevan kokonaissumman laskemiseksi. Tulos on uudessa DataFrame-kehyksessä nimeltä "running_sum_sal_df", joka sisältää sarakkeet 'empno', 'name', 'job', 'deptno', 'sal' ja 'running_total'.
  • Lopuksi "show()"-menetelmää kutsutaan "running_sum_sal_df" DataFrame -kehyksessä kyselyn tulosteen näyttämiseksi. Tuloksena oleva DataFrame näyttää kunkin työntekijän juoksevan palkan ja muut tiedot, kuten nimen, osaston numeron ja työn.

lähtö:

Tulosteessa on kunkin osaston palkkatietojen juokseva summa.

Q5: Seuraava palkka kussakin osastossa.

Löytääksemme seuraavan palkan kussakin osastossa käytämme LEAD-toimintoa. 

Ikkunafunktio lead() auttaa saamaan lausekkeen arvon ikkunaosion seuraavalla rivillä. Se palauttaa sarakkeen jokaiselle syöttösarakkeelle, jossa jokainen sarake sisältää syöttösarakkeen arvon ikkuna-osion nykyisen rivin yläpuolella olevan offset-rivin syöttösarakkeelle. Johtofunktion syntaksi on:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark-koodin lähestymistapa

  • Ensinnäkin ikkunatoiminto auttaa osioimaan DataFramen rivit osastonumeron (deptno) mukaan ja järjestämään palkat laskevassa järjestyksessä kunkin osion sisällä.
  • Lead()-funktiota käytetään sitten kunkin osion määrättyyn "sal"-sarakkeeseen seuraavan työntekijän palkan palauttamiseksi (erotuksen ollessa 1), ja oletusarvo on 0, jos seuraavaa työntekijää ei ole.
  • Tuloksena oleva DataFrame 'next_salary_df' sisältää sarakkeet työntekijän numerolle (empno), nimelle (nimi), tehtävänimikkeelle (työ), osaston numerolle (deptno), nykyiselle palkalle (sal) ja seuraavalle palkalle (next_val).

lähtö:

Tulos sisältää osastolla seuraavan työntekijän palkan alenevassa järjestyksessä. 

Q6. Aikaisempi palkka joka osastolla.

Aiemman palkan laskemiseen käytämme LAG-funktiota.

Viivefunktio palauttaa lausekkeen arvon annetulla siirtymällä ennen nykyistä riviä ikkunaosion sisällä. Viivefunktion syntaksi on:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark-koodin lähestymistapa

  • window.partitionBy(col('deptno')) määrittää ikkunaosion. Tämä tarkoittaa, että ikkunatoiminto toimii erikseen jokaisessa osastossa.
  • Sitten orderBy(col('sal').desc()) määrittää palkan järjestyksen ja järjestää palkat kunkin osaston sisällä laskevassa järjestyksessä.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') luo uuden sarakkeen nimeltä prev_val DataFrame-kehykseen 'prev_sal_df'.
  • Jokaiselle riville tämä sarake sisältää 'sal'-sarakkeen arvon edelliseltä riviltä windowSpec:n määrittämässä ikkunassa.
  • Parametri offset=1 osoittaa, että edellisen rivin tulee olla yksi rivi ennen nykyistä riviä, ja default=0 määrittää kunkin osion ensimmäisen rivin oletusarvon (koska ensimmäisellä rivillä ei ole edellistä riviä).
  • Lopuksi prev_sal_df.show() näyttää tuloksena olevan DataFramen.

lähtö:

Tulos edustaa kunkin osaston kunkin työntekijän edellistä palkkaa, joka perustuu palkkojen järjestykseen laskevassa järjestyksessä.

Q7. Ensimmäinen palkka kussakin osastossa ja jokaiseen osaston jäseneen verrattuna.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark-koodin lähestymistapa

  • Luo ensin WindowSpec-objekti, joka jakaa tiedot osastonumeron (deptno) mukaan ja järjestää ne palkan (sal) mukaan laskevassa järjestyksessä.
  • Sovella sitten analyyttistä first()-funktiota 'sal'-sarakkeeseen windowSpec:n määrittämän ikkunan päällä. Tämä funktio palauttaa kunkin osion (eli jokaisen deptno-ryhmän) "sal" -sarakkeen ensimmäisen arvon laskevassa "sal" -järjestyksessä. Tuloksena olevalla sarakkeella on uusi nimi, 'first_val'.
  • Määrittää nyt tuloksena olevan DataFramen, joka sisältää valitut sarakkeet ja uuden sarakkeen "first_val", joka näyttää kunkin osaston ensimmäisen korkeimman palkan palkkaarvojen laskevassa järjestyksessä, uudelle muuttujalle nimeltä "first_value_df".

lähtö:

Tulos näyttää ensimmäisen korkeimman palkan jokaiselle työntekijän DataFrame-osastolle.

Yhteenveto

Tässä artikkelissa opimme ikkunoiden toiminnoista. Spark SQL:ssä on kolmenlaisia ​​ikkunatoimintoja: Ranking-funktiot, Aggregate-funktiot ja Value-funktiot. Tämän toiminnon avulla kehitimme tietojoukkoa löytääksemme tärkeitä ja arvokkaita oivalluksia. Spark Window Functions tarjoaa tehokkaita data-analyysityökaluja, kuten sijoituksen, analytiikan ja arvolaskennan. Nämä toiminnot tarjoavat tärkeitä työkaluja tehokkaaseen tietojenkäsittelyyn ja analysointiin Sparkissa, analysoimalla palkkatietoja osastoittain tai hyödyntämällä käytännön esimerkkejä PySparkin ja SQL:n kanssa.

Keskeiset ostokset

  • Opimme ikkunoiden toiminnoista ja työskentelimme niiden kanssa Spark SQL:n ja PySpark DataFrame API:n avulla.
  • Käytämme toimintoja, kuten rank, dense_rank, row_number, lag, lead, groupBy, partitionBy ja muita toimintoja, jotta voimme tarjota oikean analyysin.
  • Olemme myös nähneet yksityiskohtaiset vaiheittaiset ratkaisut ongelmaan ja analysoineet tulosteen jokaisen ongelmalauseen lopussa.

Tämä tapaustutkimus auttaa sinua ymmärtämään PySpark-toimintoja paremmin. Jos sinulla on mielipiteitä tai kysymyksiä, kommentoi alle. Ota yhteyttä minuun LinkedIn jatkokeskusteluun. Jatka oppimista!!!

Tässä artikkelissa näkyvä media ei ole Analytics Vidhyan omistuksessa, ja sitä käytetään tekijän harkinnan mukaan.

spot_img

Uusin älykkyys

spot_img