โลโก้เซเฟอร์เน็ต

การทำงานกับฟังก์ชั่นหน้าต่างใน PySpark

วันที่:

บทนำ

การเรียนรู้เกี่ยวกับฟังก์ชันของหน้าต่างใน PySpark อาจเป็นเรื่องที่ท้าทายแต่ก็คุ้มค่ากับความพยายาม ฟังก์ชันหน้าต่างเป็นเครื่องมือที่มีประสิทธิภาพสำหรับการวิเคราะห์ข้อมูลและสามารถช่วยให้คุณได้รับข้อมูลเชิงลึกที่คุณอาจไม่เคยเห็นมาก่อน ด้วยการทำความเข้าใจการใช้งาน Window Functions ใน Spark คุณสามารถรับของคุณ การวิเคราะห์ข้อมูล ทักษะไปอีกระดับและทำการตัดสินใจอย่างมีข้อมูลมากขึ้น ไม่ว่าคุณจะทำงานกับขนาดใหญ่หรือเล็ก ชุดข้อมูลการเรียนรู้ฟังก์ชันหน้าต่างใน Spark จะช่วยให้คุณจัดการและวิเคราะห์ข้อมูลด้วยวิธีใหม่และน่าตื่นเต้น

ฟังก์ชั่นหน้าต่างใน PySpark

ในบล็อกนี้ ก่อนอื่นเราจะเข้าใจแนวคิดของฟังก์ชันหน้าต่าง จากนั้นจึงอภิปรายวิธีใช้งานฟังก์ชันเหล่านี้กับ Spark SQL และ PySpark DataFrame API เพื่อที่ในตอนท้ายของบทความนี้ คุณจะเข้าใจวิธีใช้ฟังก์ชันหน้าต่างกับชุดข้อมูลจริง และรับข้อมูลเชิงลึกที่จำเป็นสำหรับธุรกิจ

วัตถุประสงค์การเรียนรู้

  • เข้าใจแนวคิดของฟังก์ชันหน้าต่าง
  • การทำงานกับฟังก์ชันหน้าต่างโดยใช้ชุดข้อมูล
  • ค้นหาข้อมูลเชิงลึกโดยใช้ฟังก์ชันหน้าต่าง
  • ใช้ Spark SQL และ DataFrame API เพื่อทำงานกับฟังก์ชันหน้าต่าง

บทความนี้เผยแพร่โดยเป็นส่วนหนึ่งของไฟล์ Blogathon วิทยาศาสตร์ข้อมูล

สารบัญ

ฟังก์ชั่นหน้าต่างคืออะไร?

ฟังก์ชันหน้าต่างช่วยวิเคราะห์ข้อมูลภายในกลุ่มแถวที่เกี่ยวข้องกัน ช่วยให้ผู้ใช้สามารถดำเนินการแปลงที่ซับซ้อนในแถวของดาต้าเฟรมหรือชุดข้อมูลที่เชื่อมโยงซึ่งกันและกัน โดยขึ้นอยู่กับการแบ่งพาร์ติชันและเกณฑ์การเรียงลำดับ

ฟังก์ชั่นหน้าต่างทำงานบนพาร์ติชันเฉพาะของดาต้าเฟรมหรือชุดข้อมูลที่กำหนดโดยชุดของคอลัมน์การแบ่งพาร์ติชัน ที่ สั่งโดย clause แบ่งพาร์ติชันข้อมูลในฟังก์ชันหน้าต่างเพื่อจัดเรียงตามลำดับเฉพาะ จากนั้นฟังก์ชันหน้าต่างจะทำการคำนวณบนหน้าต่างเลื่อนของแถวที่มีแถวปัจจุบันและเซตย่อยของแถวก่อนหน้า 'และ'/'หรือ' ตามมา ตามที่ระบุไว้ในกรอบหน้าต่าง

การทำงานกับฟังก์ชั่นหน้าต่างใน PySpark

ตัวอย่างทั่วไปของฟังก์ชันหน้าต่าง ได้แก่ การคำนวณค่าเฉลี่ยเคลื่อนที่ การจัดอันดับหรือการเรียงลำดับแถวตามคอลัมน์หรือกลุ่มเฉพาะของ คอลัมน์คำนวณผลรวมที่กำลังรัน และค้นหาค่าแรกหรือค่าสุดท้ายในกลุ่มแถว ด้วยฟังก์ชันหน้าต่างอันทรงพลังของ Spark ผู้ใช้สามารถทำการวิเคราะห์และการรวมกลุ่มที่ซับซ้อนได้ ชุดข้อมูลขนาดใหญ่ ได้อย่างง่ายดายทำให้เป็นเครื่องมือยอดนิยมสำหรับคนตัวใหญ่ การประมวลผล และการวิเคราะห์

"

ฟังก์ชั่นหน้าต่างใน SQL

Spark SQL รองรับฟังก์ชันหน้าต่างสามประเภท:

  • ฟังก์ชั่นการจัดอันดับ:- ฟังก์ชันเหล่านี้จะกำหนดอันดับให้กับแต่ละแถวภายในพาร์ติชันของชุดผลลัพธ์ ตัวอย่างเช่น ฟังก์ชัน ROW_NUMBER() จะให้หมายเลขลำดับที่ไม่ซ้ำกันแก่แต่ละแถวภายในพาร์ติชัน
  • ฟังก์ชั่นการวิเคราะห์:- ฟังก์ชันเหล่านี้จะคำนวณค่ารวมในหน้าต่างแถว ตัวอย่างเช่น ฟังก์ชัน SUM() จะคำนวณผลรวมของคอลัมน์เหนือหน้าต่างของแถว
  • ฟังก์ชั่นค่า:- ฟังก์ชันเหล่านี้จะคำนวณค่าการวิเคราะห์สำหรับแต่ละแถวในพาร์ติชัน โดยยึดตามค่าของแถวอื่นๆ ในพาร์ติชันเดียวกัน ตัวอย่างเช่น ฟังก์ชัน LAG() จะส่งคืนค่าของคอลัมน์จากแถวก่อนหน้าในพาร์ติชัน

การสร้างเฟรมข้อมูล

เราจะสร้าง dataframe ตัวอย่างเพื่อให้เราสามารถทำงานกับฟังก์ชันหน้าต่างต่างๆ ได้จริง นอกจากนี้เราจะพยายามตอบคำถามด้วยความช่วยเหลือของข้อมูลและฟังก์ชันหน้าต่างนี้

Dataframe จะมีรายละเอียดพนักงาน เช่น Name, Designation, Employee Number, Hire Date, Salary เป็นต้น รวมทั้งหมด 8 คอลัมน์ ดังนี้

  • 'empno': คอลัมน์นี้ประกอบด้วยหมายเลขพนักงาน
  • 'ename': คอลัมน์นี้มีชื่อพนักงาน
  • 'งาน': คอลัมน์นี้มีข้อมูลเกี่ยวกับตำแหน่งงานของพนักงาน
  • 'hiredate': คอลัมน์นี้แสดงวันที่จ้างงานของพนักงาน
  • 'sal': รายละเอียดเงินเดือนอยู่ในคอลัมน์นี้
  • 'comm': คอลัมน์นี้มีรายละเอียดค่าคอมมิชชั่นของพนักงาน ถ้ามี
  • 'deptno': หมายเลขแผนกที่พนักงานอยู่ในคอลัมน์นี้
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

ตอนนี้เราจะตรวจสอบสคีมา:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

สร้างมุมมองชั่วคราวของ DataFrame 'emp_df' ด้วยชื่อ "emp" ช่วยให้เราสามารถสืบค้น DataFrame โดยใช้ไวยากรณ์ SQL ใน Spark SQL ราวกับว่าเป็นตาราง มุมมองชั่วคราวใช้ได้เฉพาะในช่วงระยะเวลาของเซสชัน Spark เท่านั้น

emp_df.createOrReplaceTempView("emp")

การแก้ปัญหาคำชี้แจงปัญหาโดยใช้ฟังก์ชันหน้าต่าง

ที่นี่เราจะแก้ไขปัญหาหลายประการโดยใช้ฟังก์ชัน Windows:

ไตรมาสที่ 1 จัดอันดับเงินเดือนในแต่ละแผนก

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

แนวทางสำหรับโค้ด PySpark

  • ฟังก์ชัน Window จะแบ่งพาร์ติชันข้อมูลตามหมายเลขแผนกโดยใช้ partitionBy(col('deptno')) จากนั้นจัดลำดับข้อมูลภายในแต่ละพาร์ติชันตามเงินเดือนจากมากไปน้อยโดยใช้ orderBy(col('sal').desc()) ตัวแปร windowSpec เก็บข้อกำหนดหน้าต่างขั้นสุดท้ายไว้
  • 'emp_df' คือ dataframe ที่มีข้อมูลพนักงาน รวมถึงคอลัมน์สำหรับ empno, ename, job, deptno และ sal
  • ฟังก์ชันอันดับถูกนำไปใช้กับคอลัมน์เงินเดือนโดยใช้ 'F.rank().over(windowSpec)' ภายในคำสั่ง select คอลัมน์ผลลัพธ์มีชื่อนามแฝงว่า 'rank'
  • มันจะสร้าง dataframe 'ranking_result_df' ซึ่งประกอบด้วย empno, ename, job, deptno และเงินเดือน นอกจากนี้ยังมีคอลัมน์ใหม่ 'อันดับ' ซึ่งแสดงถึงอันดับเงินเดือนของพนักงานภายในแผนกของพวกเขา

Output:

ผลลัพธ์มีอันดับเงินเดือนในแต่ละแผนก

ไตรมาสที่ 2 หนาแน่นจัดอันดับเงินเดือนในแต่ละแผนก

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

แนวทางสำหรับโค้ด PySpark

  • ขั้นแรก สร้างข้อกำหนดหน้าต่างโดยใช้ฟังก์ชัน Window ซึ่งจะแบ่งพาร์ติชัน DataFrame 'emp_df' ด้วย deptno และจัดลำดับโดยลดคอลัมน์ 'sal' จากมากไปน้อย
  • จากนั้น ฟังก์ชัน Dens_rank() จะถูกนำไปใช้กับข้อกำหนดเฉพาะของหน้าต่าง ซึ่งจะกำหนดอันดับหนาแน่นให้กับแต่ละแถวภายในแต่ละพาร์ติชันตามลำดับการจัดเรียง
  • ในที่สุด DataFrame ใหม่ที่เรียกว่า 'dense_ranking_df' จะถูกสร้างขึ้นโดยการเลือกคอลัมน์เฉพาะจาก emp_df (เช่น 'empno', 'ename', 'งาน', 'deptno' และ 'sal') และเพิ่มคอลัมน์ใหม่ 'dense_rank' ที่ มีค่าการจัดอันดับหนาแน่นซึ่งคำนวณโดยฟังก์ชันหน้าต่าง
  • สุดท้าย แสดง DataFrame ที่เป็นผลลัพธ์ในรูปแบบตาราง

Output:

ผลลัพธ์มีอันดับหนาแน่นตามเงินเดือน

ไตรมาสที่ 3 ระบุหมายเลขแถวภายในแต่ละแผนก

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

แนวทางสำหรับโค้ด PySpark

  • บรรทัดแรกกำหนดข้อกำหนดหน้าต่างสำหรับการคำนวณโดยใช้ฟังก์ชัน Window.partitionBy() และ Window.orderBy() หน้าต่างนี้ถูกแบ่งพาร์ติชันตามคอลัมน์ deptno และเรียงลำดับตามคอลัมน์ sal ตามลำดับจากมากไปน้อย
  • บรรทัดที่สองสร้าง DataFrame ใหม่ชื่อ 'row_num_df' ซึ่งเป็นเส้นโครงของ 'emp_df' พร้อมด้วยคอลัมน์เพิ่มเติมที่เรียกว่า 'row_num' และมีรายละเอียดหมายเลขแถว
  • ฟังก์ชัน show() จะแสดง DataFrame ผลลัพธ์ ซึ่งแสดงคอลัมน์ empno, ename, job, deptno, sal และ row_num ของพนักงานแต่ละคน

Output:

ผลลัพธ์จะมีหมายเลขแถวของพนักงานแต่ละคนภายในแผนกโดยอิงตามเงินเดือน

ไตรมาสที่ 4 วิ่งผลรวมเงินเดือนในแต่ละแผนก

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

เข้าใกล้ สำหรับโค้ด PySpark

  • ขั้นแรก ข้อกำหนดเฉพาะของหน้าต่างถูกกำหนดโดยใช้วิธี “Window.partitionBy()” และ “Window.orderBy()” เมธอด “partitionBy()” จะแบ่งพาร์ติชันข้อมูลด้วยคอลัมน์ “deptno” ในขณะที่เมธอด “orderBy()” จะเรียงลำดับข้อมูลตามคอลัมน์ “sal” ตามลำดับจากมากไปน้อย
  • จากนั้น ฟังก์ชัน "sum()" จะถูกนำไปใช้กับคอลัมน์ "sal" โดยใช้วิธี "over()" เพื่อคำนวณยอดรวมของเงินเดือนภายในแต่ละแผนก ผลลัพธ์จะอยู่ใน DataFrame ใหม่ที่เรียกว่า “running_sum_sal_df” ซึ่งมีคอลัมน์ 'empno', 'ename', 'job', 'deptno', 'sal' และ 'running_total'
  • สุดท้ายนี้ มีการเรียกเมธอด "show()" บน DataFrame "running_sum_sal_df" เพื่อแสดงผลลัพธ์ของการสืบค้น DataFrame ที่เป็นผลลัพธ์จะแสดงเงินเดือนรวมของพนักงานแต่ละคนและรายละเอียดอื่นๆ เช่น ชื่อ หมายเลขแผนก และงาน

Output:

ผลลัพธ์จะมีข้อมูลเงินเดือนรวมของแต่ละแผนกที่ทำงานอยู่

Q5: เงินเดือนถัดไปในแต่ละแผนก

เพื่อค้นหาเงินเดือนถัดไปในแต่ละแผนก เราใช้ฟังก์ชัน LEAD 

ฟังก์ชันหน้าต่าง lead() ช่วยในการรับค่าของนิพจน์ในแถวถัดไปของพาร์ติชันหน้าต่าง โดยจะส่งกลับคอลัมน์สำหรับแต่ละคอลัมน์อินพุต โดยแต่ละคอลัมน์จะมีค่าของคอลัมน์อินพุตสำหรับแถวออฟเซ็ตที่อยู่เหนือแถวปัจจุบันภายในพาร์ติชันหน้าต่าง ไวยากรณ์สำหรับฟังก์ชัน lead คือ:- lead(col, offset=1, default=None)

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

แนวทางสำหรับโค้ด PySpark

  • ขั้นแรก ฟังก์ชันหน้าต่างจะช่วยแบ่งพาร์ติชันแถวของ DataFrame ตามหมายเลขแผนก (deptno) และเรียงลำดับเงินเดือนจากมากไปน้อยภายในแต่ละพาร์ติชัน
  • จากนั้น ฟังก์ชัน lead() จะถูกนำไปใช้กับคอลัมน์ 'sal' ที่เรียงลำดับภายในแต่ละพาร์ติชันเพื่อส่งคืนเงินเดือนของพนักงานต่อไปนี้ (โดยมีออฟเซ็ตเป็น 1) และค่าเริ่มต้นคือ 0 ในกรณีที่ไม่มีพนักงานคนถัดไป
  • DataFrame ที่เป็นผลลัพธ์ 'next_salary_df' มีคอลัมน์สำหรับหมายเลขพนักงาน (empno), ชื่อ (ename), ตำแหน่งงาน (งาน), หมายเลขแผนก (deptno), เงินเดือนปัจจุบัน (sal) และเงินเดือนถัดไป (next_val)

Output:

ผลลัพธ์ประกอบด้วยเงินเดือนของพนักงานคนถัดไปในแผนกตามลำดับเงินเดือนจากมากไปน้อย 

คำถามที่ 6 เงินเดือนก่อนหน้าในแต่ละแผนก

ในการคำนวณเงินเดือนก่อนหน้า เราใช้ฟังก์ชัน LAG

ฟังก์ชัน lag ส่งกลับค่าของนิพจน์ที่ออฟเซ็ตที่กำหนดก่อนแถวปัจจุบันภายในพาร์ติชันหน้าต่าง ไวยากรณ์ของฟังก์ชัน lag คือ:- lag(expr, offset=1, default=None).over(windowSpec)

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

แนวทางสำหรับโค้ด PySpark

  • window.partitionBy(col('deptno')) ระบุพาร์ติชันหน้าต่าง หมายความว่าฟังก์ชันหน้าต่างทำงานแยกกันสำหรับแต่ละแผนก
  • จากนั้น orderBy(col('sal').desc()) จะระบุลำดับของเงินเดือน และจะเรียงลำดับเงินเดือนภายในแต่ละแผนกจากมากไปน้อย
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') สร้างคอลัมน์ใหม่ที่เรียกว่า prev_val ใน DataFrame 'prev_sal_df'
  • สำหรับแต่ละแถว คอลัมน์นี้มีค่าของคอลัมน์ 'sal' จากแถวก่อนหน้าภายในหน้าต่างที่กำหนดโดย windowSpec
  • พารามิเตอร์ offset=1 บ่งชี้ว่าแถวก่อนหน้าควรเป็นหนึ่งแถวก่อนแถวปัจจุบัน และ default=0 ระบุค่าเริ่มต้นสำหรับแถวแรกในแต่ละพาร์ติชัน (เนื่องจากไม่มีแถวก่อนหน้าสำหรับแถวแรก)
  • สุดท้าย prev_sal_df.show() จะแสดง DataFrame ที่เป็นผลลัพธ์

Output:

ผลลัพธ์แสดงถึงเงินเดือนก่อนหน้าของพนักงานแต่ละคนภายในแต่ละแผนก โดยเรียงลำดับเงินเดือนจากมากไปน้อย

คำถามที่ 7 เงินเดือนแรกในแต่ละแผนกและเปรียบเทียบกับสมาชิกทุกคนในแต่ละแผนก

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

แนวทางสำหรับโค้ด PySpark

  • ขั้นแรก สร้างออบเจ็กต์ WindowSpec ที่แบ่งพาร์ติชันข้อมูลตามหมายเลขแผนก (deptno) และเรียงลำดับตามเงินเดือน (sal) ตามลำดับจากมากไปน้อย
  • จากนั้นใช้ฟังก์ชันการวิเคราะห์ first() กับคอลัมน์ 'sal' เหนือหน้าต่างที่กำหนดโดย windowSpec ฟังก์ชันนี้ส่งคืนค่าแรกของคอลัมน์ 'sal' ภายในแต่ละพาร์ติชัน (เช่น แต่ละกลุ่ม deptno) โดยเรียงลำดับตาม 'sal' จากมากไปหาน้อย คอลัมน์ผลลัพธ์มีชื่อใหม่ 'first_val'
  • ตอนนี้กำหนด DataFrame ผลลัพธ์ซึ่งประกอบด้วยคอลัมน์ที่เลือกและคอลัมน์ใหม่ 'first_val' ซึ่งแสดงเงินเดือนสูงสุดอันดับแรกสำหรับแต่ละแผนกตามลำดับค่าเงินเดือนจากมากไปหาน้อย ให้กับตัวแปรใหม่ที่เรียกว่า 'first_value_df'

Output:

ผลลัพธ์จะแสดงเงินเดือนสูงสุดอันดับแรกสำหรับแต่ละแผนกใน DataFrame ของพนักงาน

สรุป

ในบทความนี้ เราเรียนรู้เกี่ยวกับฟังก์ชันของหน้าต่าง Spark SQL มีฟังก์ชันหน้าต่างสามประเภท: ฟังก์ชันการจัดอันดับ ฟังก์ชันรวม และฟังก์ชันค่า เมื่อใช้ฟังก์ชันนี้ เราทำงานกับชุดข้อมูลเพื่อค้นหาข้อมูลเชิงลึกที่สำคัญและมีคุณค่า ฟังก์ชั่น Spark Window นำเสนอเครื่องมือวิเคราะห์ข้อมูลที่มีประสิทธิภาพ เช่น การจัดอันดับ การวิเคราะห์ และการคำนวณมูลค่า ไม่ว่าจะวิเคราะห์ข้อมูลเชิงลึกด้านเงินเดือนตามแผนก หรือใช้ตัวอย่างเชิงปฏิบัติด้วย PySpark และ SQL ฟังก์ชันเหล่านี้มอบเครื่องมือที่จำเป็นสำหรับการประมวลผลและการวิเคราะห์ข้อมูลอย่างมีประสิทธิภาพใน Spark

ประเด็นที่สำคัญ

  • เราเรียนรู้เกี่ยวกับฟังก์ชันหน้าต่างและทำงานร่วมกับฟังก์ชันเหล่านี้โดยใช้ Spark SQL และ PySpark DataFrame API
  • เราใช้ฟังก์ชันต่างๆ เช่น rank,หนาแน่น_rank,row_number,lag,lead,groupBy, partitionBy และฟังก์ชันอื่นๆ เพื่อให้การวิเคราะห์ที่เหมาะสม
  • นอกจากนี้เรายังได้เห็นวิธีแก้ปัญหาโดยละเอียดทีละขั้นตอนสำหรับปัญหา และวิเคราะห์ผลลัพธ์ในตอนท้ายของแต่ละคำชี้แจงปัญหา

กรณีศึกษานี้ช่วยให้คุณเข้าใจฟังก์ชัน PySpark ได้ดีขึ้น หากคุณมีความคิดเห็นหรือคำถามใด ๆ ให้แสดงความคิดเห็นด้านล่าง เชื่อมต่อกับฉันบน LinkedIn เพื่อหารือเพิ่มเติม เรียนรู้ต่อไป!!!

สื่อที่แสดงในบทความนี้ไม่ได้เป็นของ Analytics Vidhya และถูกใช้ตามดุลยพินิจของผู้เขียน

จุด_img

ข่าวกรองล่าสุด

จุด_img