לוגו זפירנט

עבודה עם פונקציות חלון ב- PySpark

תאריך:

מבוא

ללמוד על פונקציות חלון ב- PySpark יכול להיות מאתגר אבל שווה את המאמץ. פונקציות חלון הן כלי רב עוצמה לניתוח נתונים ויכולות לעזור לך לקבל תובנות שאולי לא ראית אחרת. על ידי הבנה כיצד להשתמש בפונקציות החלון ב-Spark; אתה יכול לקחת את שלך ניתוח נתונים מיומנויות לשלב הבא ולקבל החלטות מושכלות יותר. בין אם אתה עובד עם גדול או קטן מערכי נתונים, לימוד פונקציות חלון ב-Spark יאפשר לך לתפעל ולנתח נתונים בדרכים חדשות ומרגשות.

פונקציות חלון ב- PySpark

בבלוג זה נבין תחילה את הרעיון של פונקציות חלון ולאחר מכן נדון כיצד להשתמש בהן עם Spark SQL ו-PySpark DataFrame API. כך שעד סוף מאמר זה, תבינו כיצד להשתמש בפונקציות חלונות עם מערכי נתונים אמיתיים ולקבל תובנות חיוניות לעסקים.

מטרות למידה

  • להבין את הרעיון של פונקציות חלון.
  • עבודה עם פונקציות חלונות באמצעות מערכי נתונים.
  • גלה את התובנות באמצעות פונקציות החלון.
  • השתמש ב-Spark SQL וב-DataFrame API כדי לעבוד עם פונקציות חלון.

מאמר זה פורסם כחלק מה- בלוגתון מדעי הנתונים.

תוכן העניינים

מהן פונקציות חלון?

פונקציות חלון עוזרות לנתח נתונים בתוך קבוצת שורות הקשורות זו לזו. הם מאפשרים למשתמשים לבצע טרנספורמציות מורכבות בשורות של מסגרת נתונים או מערך נתונים המשויכים זה לזה בהתבסס על קריטריונים מסוימים של חלוקה וסדר.

פונקציות חלון פועלות על מחיצה ספציפית של מסגרת נתונים או מערך נתונים המוגדרים על ידי קבוצה של עמודות מחיצות. ה מיין לפי סעיף מחלק את הנתונים בפונקציית חלון כדי לסדר אותם בסדר מסוים. לאחר מכן, פונקציות חלון מבצעות חישובים על חלון הזזה של שורות הכולל את השורה הנוכחית וקבוצת משנה של השורות הקודמות 'ו'/'או' הבאות, כפי שצוין במסגרת החלון.

עבודה עם פונקציות חלון ב- PySpark

כמה דוגמאות נפוצות לפונקציות חלון כוללות חישוב ממוצעים נעים, דירוג או מיון שורות על סמך עמודה או קבוצה ספציפית של עמודות, חישוב סכומים רצים ומציאת הערך הראשון או האחרון בקבוצת שורות. עם פונקציות החלונות החזקות של Spark, משתמשים יכולים לבצע ניתוחים מורכבים וצבירות מערכי נתונים גדולים בקלות יחסית, מה שהופך אותו לכלי פופולרי לגדולים עיבוד נתונים וניתוח.

"

פונקציות חלון ב-SQL

Spark SQL תומך בשלושה סוגים של פונקציות חלון:

  • פונקציות דירוג:- פונקציות אלו מקצות דירוג לכל שורה בתוך מחיצה של ערכת התוצאות. לדוגמה, הפונקציה ROW_NUMBER() נותנת מספר רציף ייחודי לכל שורה בתוך המחיצה.
  • פונקציות אנליטיקה:- פונקציות אלה מחשבות ערכים מצטברים על פני חלון של שורות. לדוגמה, הפונקציה SUM() מחשבת את הסכום של עמודה מעל חלון של שורות.
  • פונקציות ערך:- פונקציות אלו מחשבות ערך אנליטי עבור כל שורה במחיצה, בהתבסס על הערכים של שורות אחרות באותה מחיצה. לדוגמה, הפונקציה LAG() מחזירה את הערך של עמודה מהשורה הקודמת במחיצה.

יצירת DataFrame

אנו ניצור מסגרת נתונים לדוגמה כך שנוכל לעבוד בפועל עם פונקציות שונות של חלון. כמו כן, ננסה לענות על כמה שאלות בעזרת נתונים אלה ופונקציות החלון.

למסגרת הנתונים יש פרטי עובדים כמו שמם, ייעוד, מספר עובד, תאריך העסקה, שכר וכו'. בסך הכל יש לנו 8 עמודות שהן כדלקמן:

  • 'empno': עמודה זו מכילה את מספר העובד.
  • 'ename': בעמודה זו יש שמות עובדים.
  • 'עבודה': עמודה זו מכילה מידע על כותרות העבודה של העובדים.
  • 'תאריך שכירה': עמודה זו מציגה את תאריך העסקה של העובד.
  • 'sal': פרטי השכר מכילים בעמודה זו.
  • 'comm': בעמודה זו יש פרטי עמלות עובדים, אם יש כאלה.
  • 'deptno': מספר המחלקה אליה משתייך העובד נמצא בעמודה זו.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

כעת נבדוק את הסכימה:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

צור תצוגה זמנית של ה-DataFrame 'emp_df' עם השם "emp". זה מאפשר לנו לבצע שאילתות על ה-DataFrame באמצעות תחביר SQL ב-Spark SQL כאילו הייתה טבלה. התצוגה הזמנית תקפה רק למשך סשן Spark.

emp_df.createOrReplaceTempView("emp")

פתרון הצהרות בעיות באמצעות פונקציות חלון

כאן נפתור מספר הצהרות בעיה באמצעות פונקציות של Windows:

שאלה 1. דרג את השכר בתוך כל מחלקה.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

גישה לקוד PySpark

  • הפונקציה Window מחלקת את הנתונים לפי מספר מחלקה באמצעות partitionBy(col('deptno')) ולאחר מכן מזמינה את הנתונים בתוך כל מחיצה לפי משכורת בסדר יורד באמצעות orderBy(col('sal').desc()). המשתנה windowSpec מחזיק את מפרט החלון הסופי.
  • 'emp_df' הוא מסגרת הנתונים המכילה נתוני עובדים, כולל עמודות עבור empno, ename, job, deptno ו-sal.
  • פונקציית הדירוג מוחלת על עמודת השכר באמצעות 'F.rank().over(windowSpec)' בתוך הצהרת הבחירה. לעמודה המתקבלת יש שם כינוי בתור 'דרג'.
  • זה יצור מסגרת נתונים, 'ranking_result_df', הכוללת empno, ename, job, deptno ומשכורת. יש לו גם עמודה חדשה, 'דרג', המייצגת את דרגת השכר של העובד בתוך המחלקה שלו.

פלט:

לתוצאה יש דרג שכר בכל מחלקה.

שאלה 2. דירוג צפוף השכר בתוך כל מחלקה.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

גישה לקוד PySpark

  • ראשית, צור מפרט חלון באמצעות הפונקציה Window, אשר מחלקת את DataFrame 'emp_df' לפי deptno ומסדרת אותו על ידי ירידה בעמודה 'sal'.
  • לאחר מכן, הפונקציה dense_rank() מוחלת על מפרט החלון, אשר מקצה דירוג צפוף לכל שורה בתוך כל מחיצה בהתבסס על הסדר הממוין שלה.
  • לבסוף, DataFrame חדש בשם 'dense_ranking_df' נוצר על ידי בחירת עמודות ספציפיות מ-emp_df (כלומר, 'empno', 'ename', 'job', 'deptno' ו-'sal') והוספת עמודה חדשה 'dense_rank' ש מכיל את ערכי הדירוג הצפופים המחושבים על ידי פונקציית החלון.
  • לבסוף, הצג את ה-DataFrame שנוצר בפורמט טבלה.

פלט:

לתוצאה יש דרגה צפופה מבחינת שכר.

שאלה 3. מספר את השורה בתוך כל מחלקה.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

גישה לקוד PySpark

  • השורה הראשונה מגדירה מפרט חלון לחישוב באמצעות הפונקציות Window.partitionBy() ו-Window.orderBy(). חלון זה מחולק לפי עמודת deptno ומסודר לפי עמודת sal בסדר יורד.
  • השורה השנייה יוצרת DataFrame חדש בשם 'row_num_df', השלכה של 'emp_df' עם עמודה נוספת בשם 'row_num' והיא מכילה את פרטי מספרי השורות.
  • הפונקציה show() מציגה את ה-DataFrame שנוצר, המציגה את העמודות empno, ename, job, deptno, sal ו-row_num של כל עובד.

פלט:

הפלט יכלול את מספר השורה של כל עובד במחלקה שלו בהתבסס על השכר שלו.

שאלה 4. ריצה סכום כולל של שכר בתוך כל מחלקה.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

גישה עבור קוד PySpark

  • ראשית, מפרט חלון מוגדר באמצעות השיטות "Window.partitionBy()" ו-"Window.orderBy()". שיטת "partitionBy()" מחלקת את הנתונים לפי העמודה "deptno", בעוד ששיטת "orderBy()" מסדרת את הנתונים לפי העמודה "sal" בסדר יורד.
  • לאחר מכן, הפונקציה "sum()" מוחלת על העמודה "sal" תוך שימוש בשיטת "over()" כדי לחשב את הסכום השוטף של המשכורות בכל מחלקה. התוצאה תהיה ב-DataFrame חדש בשם "running_sum_sal_df", שמכיל את העמודות 'empno', 'ename', 'job', 'deptno', 'sal' ו-'running_total'.
  • לבסוף, השיטה "show()" נקראת ב-"running_sum_sal_df" DataFrame כדי להציג את הפלט של השאילתה. ה-DataFrame שנוצר מציג את סך המשכורות השוטף של כל עובד ופרטים נוספים כמו שם, מספר מחלקה ותפקיד.

פלט:

הפלט יכלול סכום שוטף של נתוני השכר של כל מחלקה.

שאלה 5: השכר הבא בתוך כל מחלקה.

כדי למצוא את השכר הבא בתוך כל מחלקה אנו משתמשים בפונקציית LEAD. 

פונקציית החלון lead() עוזרת לקבל את הערך של הביטוי בשורה הבאה של מחיצת החלון. הוא מחזיר עמודה עבור כל עמודת קלט, כאשר כל עמודה תכיל את הערך של עמודת הקלט עבור שורת ההיסט מעל השורה הנוכחית בתוך מחיצת החלון. התחביר של פונקציית ההובלה הוא:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

גישה לקוד PySpark

  • ראשית, פונקציית החלון עוזרת לחלק את שורות ה-DataFrame לפי מספר מחלקה (deptno) ולסדר את המשכורות בסדר יורד בתוך כל מחיצה.
  • לאחר מכן מופעלת הפונקציה lead() על עמודת ה'sal' המסודרת בתוך כל מחיצה כדי להחזיר את השכר של העובד הבא (עם קיזוז של 1), וערך ברירת המחדל הוא 0 במקרה שאין עובד הבא.
  • ה-DataFrame המתקבל 'next_salary_df' מכיל עמודות עבור מספר העובד (empno), שם (ename), שם המשרה (תפקיד), מספר המחלקה (deptno), השכר הנוכחי (sal) והמשכורת הבאה (next_val).

פלט:

הפלט מכיל את השכר של העובד הבא במחלקה לפי סדר השכר היורד. 

שאלה 6. שכר קודם בכל מחלקה.

כדי לחשב את השכר הקודם, אנו משתמשים בפונקציית LAG.

פונקציית ה-lag מחזירה את הערך של ביטוי בהיסט נתון לפני השורה הנוכחית בתוך מחיצת החלון. התחביר של פונקציית ה-lag הוא:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

גישה לקוד PySpark

  • ה-window.partitionBy(col('deptno')) מציין את מחיצת החלון. זה אומר שפונקציית החלון פועלת בנפרד עבור כל מחלקה.
  • לאחר מכן orderBy(col('sal').desc()) מציין את סדר השכר ויסדר את המשכורות בתוך כל מחלקה בסדר יורד.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') יוצר עמודה חדשה בשם prev_val ב-DataFrame 'prev_sal_df'.
  • עבור כל שורה, עמודה זו מכילה את הערך של העמודה 'sal' מהשורה הקודמת בתוך החלון שהוגדר על ידי ה-windowSpec.
  • הפרמטר offset=1 מציין שהשורה הקודמת צריכה להיות שורה אחת לפני השורה הנוכחית, ו-default=0 מציין את ערך ברירת המחדל עבור השורה הראשונה בכל מחיצה (מכיוון שאין שורה קודמת לשורה הראשונה).
  • לבסוף, prev_sal_df.show() מציג את ה-DataFrame שנוצר.

פלט:

התפוקה מייצגת את השכר הקודם לכל עובד בתוך כל מחלקה, בהתבסס על הזמנת המשכורות בסדר יורד.

שאלה 7. שכר ראשון בכל מחלקה והשוואה מול כל חבר בכל מחלקה.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

גישה לקוד PySpark

  • ראשית, צור אובייקט WindowSpec שמחלק את הנתונים לפי מספר מחלקה (deptno) ומסדר אותם לפי שכר (sal) בסדר יורד.
  • לאחר מכן מחיל את הפונקציה האנליטית first() על העמודה 'sal' מעל החלון שהוגדר על ידי windowSpec. פונקציה זו מחזירה את הערך הראשון של העמודה 'sal' בתוך כל מחיצה (כלומר כל קבוצת deptno) מסודרת לפי 'sal' יורד. לעמודה שהתקבלה יש שם חדש, 'first_val'.
  • כעת מקצה את ה-DataFrame המתקבל, המכילה את העמודות שנבחרו ועמודה חדשה, 'first_val', המציגה את השכר הראשון הגבוה ביותר עבור כל מחלקה בהתבסס על הסדר היורד של ערכי השכר, למשתנה חדש בשם 'first_value_df'.

פלט:

הפלט מציג את השכר הראשון הגבוה ביותר עבור כל מחלקה ב-DataFrame של עובד.

סיכום

במאמר זה נלמד על פונקציות חלונות. ל-Spark SQL יש שלושה סוגים של פונקציות חלון: פונקציות דירוג, פונקציות מצטבר ופונקציות ערך. באמצעות פונקציה זו, עבדנו על מערך נתונים כדי למצוא כמה תובנות חשובות ובעלות ערך. פונקציות חלון Spark מציעות כלי ניתוח נתונים רבי עוצמה כמו דירוג, ניתוח וחישובי ערך. בין אם ניתוח תובנות שכר לפי מחלקות או שימוש בדוגמאות מעשיות עם PySpark ו-SQL, פונקציות אלו מספקות כלים חיוניים לעיבוד וניתוח נתונים יעילים ב-Spark.

המנות העיקריות

  • למדנו על פונקציות החלון ועבדנו איתן באמצעות Spark SQL ו-PySpark DataFrame API.
  • אנו משתמשים בפונקציות כגון rank, dense_rank, row_number, lag, lead, groupBy, partitionBy ופונקציות אחרות כדי לספק ניתוח נכון.
  • ראינו גם את הפתרונות המפורטים שלב אחר שלב לבעיה וניתחנו את הפלט בסוף כל הצהרת בעיה.

תיאור מקרה זה עוזר לך להבין טוב יותר את פונקציות PySpark. אם יש לך דעות או שאלות, הגיבו למטה. התחבר איתי הלאה לינקדין לדיון נוסף. תמשיך ללמוד!!!

המדיה המוצגת במאמר זה אינה בבעלות Analytics Vidhya והיא משמשת לפי שיקול דעתו של המחבר.

ספוט_ימג

המודיעין האחרון

ספוט_ימג