
Working with Window Functions in PySpark


Introduction

Learning about window functions in PySpark can be challenging, but it is worth the effort. Window functions are a powerful tool for data analysis and can help you find insights you might otherwise miss. Once you understand how to use window functions in Spark, you can take your data analysis skills to the next level and make better-informed decisions. Whether you are working with large or small datasets, learning window functions in Spark lets you manipulate and analyze data in new ways.


In this blog, we will first look at the concept of window functions and then discuss how to use them with Spark SQL and the PySpark DataFrame API. By the end of this article, you will know how to apply window functions to real datasets and extract useful business insights.

Learning Objectives

  • Understand the concept of window functions.
  • Work with window functions on datasets.
  • Find insights using window functions.
  • Use Spark SQL and the DataFrame API to work with window functions.

This article was published as a part of the Data Science Blogathon.


What are Window Functions?

Window functions help analyze data within a group of rows that are related to each other. They let users perform complex transformations on the rows of a dataframe or dataset that belong together according to some partitioning and ordering criteria.

A window function operates on a partition of a dataframe or dataset, and a set of partitioning columns defines that partition. The ORDER BY clause sorts the rows within each partition in a specific order. The window function then performs its calculation over a sliding window of rows. As specified in the window frame, this window includes the current row plus some of the preceding and/or following rows.
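The partition, order and frame steps described above can be sketched in plain Python, without Spark. The sketch shows what a moving average such as AVG(sal) OVER (PARTITION BY deptno ORDER BY sal ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) computes for each row. It is a hypothetical teaching aid with the following assumptions:

  • The function name moving_average is made up for this illustration.
  • The six rows are a subset of the sample employee data from the DataFrame Creation section.
  • Spark evaluates window functions in a distributed way, not with a loop like this.

In PySpark, a comparable frame would be expressed roughly as F.avg('sal').over(Window.partitionBy('deptno').orderBy('sal').rowsBetween(-1, 1)).

```python
from collections import defaultdict

# (ename, deptno, sal) rows: a subset of the sample employee data used in this article
rows = [
    ("SMITH", 10, 800), ("CLARK", 10, 2450), ("KING", 10, 5000),
    ("ADAMS", 20, 1100), ("JONES", 20, 2975), ("SCOTT", 20, 3000),
]

def moving_average(rows, preceding=1, following=1):
    """Mimic AVG(sal) OVER (PARTITION BY deptno ORDER BY sal
    ROWS BETWEEN <preceding> PRECEDING AND <following> FOLLOWING)."""
    partitions = defaultdict(list)
    for name, dept, sal in rows:                   # 1. PARTITION BY deptno
        partitions[dept].append((name, sal))
    result = {}
    for members in partitions.values():
        members.sort(key=lambda m: m[1])           # 2. ORDER BY sal within each partition
        for i, (name, _) in enumerate(members):
            lo = max(0, i - preceding)             # 3. frame bounds, clipped at the
            hi = min(len(members), i + following + 1)  #    edges of the partition
            frame = [sal for _, sal in members[lo:hi]]
            result[name] = sum(frame) / len(frame)
    return result

averages = moving_average(rows)
print(averages["CLARK"])   # → 2750.0, the average of 800, 2450 and 5000
```

The first and last rows of each department have a smaller frame because the frame never crosses a partition boundary.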


Common uses of window functions include:

  • calculating moving averages;
  • ranking or ordering rows by a specific column or group of columns;
  • calculating running totals;
  • finding the first or last value in a group of rows.

Spark's window functions let users run complex analyses and aggregations over large datasets with relative ease. This makes Spark a popular tool for big data processing and analytics.

Window Functions in SQL

Spark SQL supports three kinds of window functions:

  • Ranking functions: These functions assign a rank to each row within a partition of the result set. For example, the ROW_NUMBER() function assigns a unique sequential number to each row within the partition.
  • Aggregate functions: These functions compute aggregate values over a window of rows. For example, the SUM() function computes the sum of a column over a window of rows.
  • Value (analytic) functions: These functions compute a value for each row in a partition from the values of other rows in the same partition. For example, the LAG() function returns the value of a column from a previous row in the partition.
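As a rough plain-Python illustration (not Spark code), the snippet below computes one function from each family over the department 30 salaries of the sample data, already sorted by salary descending. The variable names are hypothetical. The snippet also shows one useful detail: SUM(sal) OVER (PARTITION BY deptno) with no ORDER BY uses the whole partition as its frame, so every row receives the department total.

```python
# Department 30 salaries from the sample employee data, already ordered by sal DESC
dept30 = [("BLAKE", 2850), ("ALLEN", 1600), ("TURNER", 1500),
          ("WARD", 1250), ("MARTIN", 1250), ("ALEX", 1150)]
salaries = [sal for _, sal in dept30]

# Ranking family, like ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
row_numbers = list(range(1, len(dept30) + 1))

# Aggregate family, like SUM(sal) OVER (PARTITION BY deptno) with no ORDER BY:
# the frame is the whole partition, so every row gets the department total
dept_total = [sum(salaries)] * len(dept30)

# Value family, like LAG(sal, 1) OVER (PARTITION BY deptno ORDER BY sal DESC):
# the previous row's salary, None (SQL null) for the first row
previous = [None] + salaries[:-1]

for (name, sal), rn, total, prev in zip(dept30, row_numbers, dept_total, previous):
    print(name, sal, rn, total, prev)
```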

DataFrame Creation

We will create a sample dataframe so that we can work hands-on with different window functions. We will also use this data and window functions to answer some questions.

The dataframe contains employee details such as name, job title, employee number, hire date, and salary. In total, it has 7 columns:

  • 'empno': This column contains the employee number.
  • 'ename': This column contains the employee names.
  • 'job': This column contains the employees' job titles.
  • 'hiredate': This column shows the employee's hire date.
  • 'sal': This column contains the salary details.
  • 'comm': This column contains the employee's commission, if any.
  • 'deptno': This column contains the number of the department the employee belongs to.
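# Imports and Spark session used by all examples below
# (getOrCreate() reuses an existing session, e.g. in a notebook)
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("window-functions").getOrCreate()

# Create Sample Dataframe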
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Now let's check the schema:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Create a temporary view of the DataFrame 'emp_df' named "emp". This lets us query the DataFrame with SQL syntax in Spark SQL as if it were a table. The temporary view is valid only for the lifetime of the current Spark session.

emp_df.createOrReplaceTempView("emp")

Solving Problem Statements Using Window Functions

Here we will solve several problem statements using window functions:

Q1. Rank the salaries within each department.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                                  F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Approach for PySpark code

  • The window specification partitions the data by department number using partitionBy(col('deptno')). It then orders the rows within each partition by salary in descending order using orderBy(col('sal').desc()). The variable windowSpec holds the final window specification.
  • 'emp_df' is the dataframe that holds the employee data, including the columns empno, ename, job, deptno and sal.
  • The rank function is applied inside the select statement as F.rank().over(windowSpec). It takes no argument and ranks each row according to the window's salary ordering. The resulting column is aliased as 'rank'.
  • This produces a dataframe, 'ranking_result_df', with empno, ename, job, deptno, and sal. It also has a new column, 'rank', which holds the rank of the employee's salary within their department.

Output:

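The output shows the salary rank within each department. RANK() leaves a gap after ties. WARD and MARTIN both earn 1250, so they share rank 4, and the next employee, ALEX, gets rank 6.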
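To see where the gap comes from, here is a plain-Python sketch (no Spark needed) that mimics RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) on the same twelve employees. The function rank_within_dept is a hypothetical helper written for this illustration, not a Spark API.

```python
from collections import defaultdict

# (empno, ename, deptno, sal) taken from the sample employee data
employees = [
    (7369, "SMITH", 10, 800), (7499, "ALLEN", 30, 1600), (7521, "WARD", 30, 1250),
    (7566, "JONES", 20, 2975), (7654, "MARTIN", 30, 1250), (7698, "BLAKE", 30, 2850),
    (7782, "CLARK", 10, 2450), (7788, "SCOTT", 20, 3000), (7629, "ALEX", 30, 1150),
    (7839, "KING", 10, 5000), (7844, "TURNER", 30, 1500), (7876, "ADAMS", 20, 1100),
]

def rank_within_dept(rows):
    """Mimic RANK() OVER (PARTITION BY deptno ORDER BY sal DESC); returns {ename: rank}."""
    by_dept = defaultdict(list)
    for row in rows:
        by_dept[row[2]].append(row)                      # PARTITION BY deptno
    ranks = {}
    for members in by_dept.values():
        members.sort(key=lambda r: r[3], reverse=True)   # ORDER BY sal DESC
        current_rank = 0
        for position, row in enumerate(members, start=1):
            if position == 1 or row[3] != members[position - 2][3]:
                current_rank = position   # new salary value: rank jumps to its position
            ranks[row[1]] = current_rank  # tied salary: keep the previous rank
    return ranks

ranks = rank_within_dept(employees)
print(ranks["WARD"], ranks["MARTIN"], ranks["ALEX"])   # → 4 4 6
```

Because the rank jumps to the row's position whenever the salary changes, a tie always leaves a gap equal to the number of extra tied rows.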

Q2. Dense rank the salaries within each department.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Approach for PySpark code

  • First, create a window specification with Window that partitions the 'emp_df' DataFrame by deptno and orders it by the 'sal' column in descending order.
  • Then apply the dense_rank() function over that window specification. It assigns a dense rank to each row within each partition based on the sort order.
  • Next, create a new DataFrame called 'dense_ranking_df'. It selects specific columns from emp_df ('empno', 'ename', 'job', 'deptno', and 'sal') and adds a new column, 'dense_rank', with the dense ranking values computed by the window function.
  • Finally, show() displays the resulting DataFrame in tabular format.

Output:

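The output shows a dense rank by salary within each department. Unlike RANK(), DENSE_RANK() leaves no gap after ties: WARD and MARTIN share rank 4, and ALEX gets rank 5.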
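The difference between the two ranking functions fits in a few lines of plain Python. This sketch assumes a single partition whose salaries are already sorted in descending order. The helper name rank_and_dense_rank is illustrative only.

```python
def rank_and_dense_rank(salaries):
    """Return (RANK, DENSE_RANK) lists for one partition already sorted by sal DESC."""
    ranks, dense = [], []
    for i, sal in enumerate(salaries):
        if i > 0 and sal == salaries[i - 1]:
            ranks.append(ranks[-1])                 # tie: both functions repeat the value
            dense.append(dense[-1])
        else:
            ranks.append(i + 1)                     # RANK jumps to the row's position
            dense.append(dense[-1] + 1 if dense else 1)  # DENSE_RANK goes up by one
    return ranks, dense

dept30 = [2850, 1600, 1500, 1250, 1250, 1150]      # dept 30 salaries, sal DESC
print(rank_and_dense_rank(dept30))
# → ([1, 2, 3, 4, 4, 6], [1, 2, 3, 4, 4, 5])
```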

Q3. Number the rows within each department.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Approach for PySpark code

  • The first line defines a window specification with Window.partitionBy() followed by .orderBy(). This window is partitioned by the deptno column and ordered by the sal column in descending order.
  • The second line creates a new DataFrame called 'row_num_df'. It is a projection of 'emp_df' with an additional column, 'row_num', that holds the row numbers.
  • The show() function displays the resulting DataFrame with each employee's empno, ename, job, deptno, sal, and row_num columns.

Output:

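The output gives each employee's row number within their department, based on salary. Unlike RANK() and DENSE_RANK(), ROW_NUMBER() gives tied rows distinct numbers (WARD 4, MARTIN 5). Because their salaries are equal, which of the two gets 4 is not guaranteed unless you add a tie-breaking column to the ORDER BY.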
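One way to make the numbering deterministic is to add a second sort key. The plain-Python sketch below assumes empno as the tie-breaker; that choice is hypothetical, and any unique column would do. In PySpark the equivalent idea would be Window.partitionBy(col('deptno')).orderBy(col('sal').desc(), col('empno')).

```python
# Department 30 rows as (empno, ename, sal), in no particular order
dept30 = [(7499, "ALLEN", 1600), (7521, "WARD", 1250), (7654, "MARTIN", 1250),
          (7698, "BLAKE", 2850), (7629, "ALEX", 1150), (7844, "TURNER", 1500)]

# ORDER BY sal DESC, empno ASC: the second key breaks the WARD/MARTIN tie
ordered = sorted(dept30, key=lambda r: (-r[2], r[0]))

# ROW_NUMBER() is simply the 1-based position in the ordered partition
row_number = {name: i for i, (_, name, _) in enumerate(ordered, start=1)}
print(row_number)
# → {'BLAKE': 1, 'ALLEN': 2, 'TURNER': 3, 'WARD': 4, 'MARTIN': 5, 'ALEX': 6}
```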

Q4. Running total of salaries within each department.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Approach for PySpark code

  • First, a window specification is defined with Window.partitionBy() and .orderBy(). The partitionBy() method partitions the data by the "deptno" column, and orderBy() orders it by the "sal" column in descending order.
  • Next, the sum() function is applied to the "sal" column with the over() method to compute the running total of salaries within each department. The result is stored in a new DataFrame called "running_sum_sal_df", which contains the columns 'empno', 'ename', 'job', 'deptno', 'sal', and 'running_total'.
  • Finally, show() is called on the "running_sum_sal_df" DataFrame to display the query output. The resulting DataFrame shows each employee's running total of salaries together with other details such as name, department number, and job.

Output:

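The output shows the running total of salaries for each department. WARD and MARTIN both show 8450 rather than 7200 and 8450. When a window has an ORDER BY but no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Rows with equal salaries are therefore treated as peers and summed together.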
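The plain-Python sketch below reproduces both behaviours for department 30, assuming the salaries are already sorted in descending order. The helper name running_totals is hypothetical.

  • A ROWS frame stops at the current physical row.
  • A RANGE frame also includes every row that ties with the current one.

For the row-by-row behaviour in PySpark, the window would need an explicit frame, e.g. windowSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow). Even then, the split between tied rows is arbitrary unless orderBy has a tie-breaking column.

```python
def running_totals(salaries_desc):
    """Running SUM over one partition whose salaries are already sorted DESC.

    Returns (rows_frame, range_frame):
      rows_frame  ~ ROWS  BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
      range_frame ~ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                    (Spark's default frame when ORDER BY is given)
    """
    rows_frame = []
    total = 0
    for sal in salaries_desc:
        total += sal
        rows_frame.append(total)          # stop at the current physical row
    # With DESC ordering, the RANGE frame holds every row whose salary is >= the
    # current one: all preceding rows plus all peers that tie with the current row.
    range_frame = [sum(s for s in salaries_desc if s >= sal) for sal in salaries_desc]
    return rows_frame, range_frame

dept30 = [2850, 1600, 1500, 1250, 1250, 1150]
rows_frame, range_frame = running_totals(dept30)
print(rows_frame)    # → [2850, 4450, 5950, 7200, 8450, 9600]
print(range_frame)   # → [2850, 4450, 5950, 8450, 8450, 9600]
```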

Q5: Next salary within each department.

To find the next salary within each department, we use the LEAD function. 

The lead() window function returns the value of an expression from a row that comes after the current row within the window partition. It returns the value that is offset rows after the current row, or default if fewer than offset rows follow the current row. The syntax of the lead function is: lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Approach for PySpark code

  • First, the window specification partitions the DataFrame rows by department number (deptno) and sorts the salaries in descending order within each partition.
  • The lead() function is then applied to the 'sal' column within each ordered partition. It returns the salary of the next employee (offset 1), or the default value 0 when there is no next employee.
  • The resulting DataFrame 'next_salary_df' has columns for employee number (empno), name (ename), job title (job), department number (deptno), current salary (sal), and next salary (next_val).

Output:

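The output shows the salary of the next employee in the department, in descending salary order. The Spark SQL query passes no default, so it shows null for the last employee in each department. The PySpark version passes default=0 and shows 0 instead.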
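The offset and default arguments can be mimicked in a few lines of plain Python over one already-ordered partition. The helper lead is a hypothetical stand-in, not Spark's function, and it assumes offset >= 1.

```python
def lead(values, offset=1, default=None):
    """Mimic LEAD(col, offset, default) over one partition that is already ordered.

    Assumes offset >= 1; rows with no row `offset` positions ahead get `default`.
    """
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]

dept10 = [5000, 2450, 800]            # dept 10 salaries ordered by sal DESC
print(lead(dept10))                   # → [2450, 800, None]  (like the SQL query: null)
print(lead(dept10, default=0))        # → [2450, 800, 0]     (like default=0 in PySpark)
print(lead(dept10, offset=2))         # → [800, None, None]
```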

Q6. Previous salary within each department.

To get the previous salary, we use the LAG function.

The lag function returns the value of an expression at a given offset before the current row within the window partition. The syntax of the lag function is: lag(col, offset=1, default=None).over(windowSpec).

# Using Spark SQL
previous_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
previous_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Approach for PySpark code

  • Window.partitionBy(col('deptno')) specifies the window partitions, so the window function operates separately for each department.
  • Then orderBy(col('sal').desc()) specifies the ordering, so salaries within each department are sorted in descending order.
  • F.lag('sal', offset=1, default=0) fills each row with the 'sal' value from the previous row within the window defined by windowSpec.
  • The offset=1 parameter means the previous row is one row before the current row. The default=0 parameter sets the value for the first row in each partition, which has no previous row.
  • Finally, prev_sal_df.show() displays the resulting DataFrame.

Output:

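The output shows the previous salary for each employee within each department, with salaries ordered in descending order. As with LEAD, the Spark SQL query returns null for the first row of each department, while the PySpark version returns the default of 0.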
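A plain-Python sketch of LAG over one ordered partition looks like this. The helper lag is hypothetical and assumes offset >= 1. The example also computes how far each employee trails the next-higher-paid colleague. That gap column is an illustrative extension and not part of the original solution.

```python
def lag(values, offset=1, default=None):
    """Mimic LAG(col, offset, default) over one partition that is already ordered.

    Assumes offset >= 1; the first `offset` rows have no earlier row and get `default`.
    """
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

dept20 = [3000, 2975, 1100]                 # dept 20 salaries ordered by sal DESC
prev = lag(dept20)                          # like the SQL query: null for the first row
# Difference to the previous (higher) salary; None for the top earner
gap = [None if p is None else p - s for s, p in zip(dept20, prev)]
print(prev, gap)                            # → [None, 3000, 2975] [None, 25, 1875]
```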

Q7. First salary within each department, compared with every member of that department.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Approach for PySpark code

  • First, create a WindowSpec object that partitions the data by department number (deptno) and orders it by salary (sal) in descending order.
  • Then apply the first() function to the 'sal' column over the window defined by windowSpec. It returns the first 'sal' value within each partition (i.e., each deptno group), ordered by descending 'sal'. The resulting column is named 'first_val'.
  • Finally, assign the resulting DataFrame to a new variable called 'first_value_df'. It contains the selected columns plus the new 'first_val' column, which holds the highest salary in each department based on descending salary order.

Output:

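The output shows the first (highest) salary of each department in the employee DataFrame. This value is repeated on every row, so each employee's salary can be compared with it.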
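To turn the repeated first value into an actual comparison, one option is to subtract each salary from it. The plain-Python sketch below mimics FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) and adds that shortfall. The helper compare_with_first and the gap idea are hypothetical additions. In PySpark, a comparable column could probably be written as (F.first('sal').over(windowSpec) - col('sal')).alias('gap_to_top'), reusing windowSpec from above.

```python
from collections import defaultdict

# (ename, deptno, sal): a subset of the sample employee data
employees = [("SMITH", 10, 800), ("CLARK", 10, 2450), ("KING", 10, 5000),
             ("ADAMS", 20, 1100), ("JONES", 20, 2975), ("SCOTT", 20, 3000)]

def compare_with_first(rows):
    """Mimic FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) and
    return {ename: (first_val, first_val - sal)}."""
    by_dept = defaultdict(list)
    for name, dept, sal in rows:
        by_dept[dept].append((name, sal))               # PARTITION BY deptno
    result = {}
    for members in by_dept.values():
        members.sort(key=lambda m: m[1], reverse=True)  # ORDER BY sal DESC
        first_val = members[0][1]                       # first row of the partition
        for name, sal in members:
            result[name] = (first_val, first_val - sal)
    return result

comparison = compare_with_first(employees)
print(comparison["CLARK"], comparison["ADAMS"])   # → (5000, 2550) (3000, 1900)
```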

Conclusion

In this article, we learned about window functions. Spark SQL has three kinds of window functions: ranking functions, aggregate functions and value functions. We applied them to a dataset to find some useful insights. Spark window functions offer powerful tools for data analysis, such as ranking, aggregation, and value calculations. Whether you are analyzing salaries by department or working through practical examples with PySpark and SQL, these functions are essential for efficient data processing and analysis in Spark.

Key Takeaways

  • We learned about window functions and worked with them using Spark SQL and the PySpark DataFrame API.
  • We used functions such as rank, dense_rank, row_number, lag, lead, sum and first, together with partitionBy and orderBy, to produce useful analyses.
  • We walked through a step-by-step solution to each problem and analyzed the output at the end of each problem statement.

This case study should help you understand PySpark window functions well. If you have any suggestions or questions, leave a comment below. Connect with me on LinkedIn for further discussion. Keep Learning!!!

The media shown in this article is not owned by Analytics Vidhya and is used at the Author's discretion.
