บทนำ
การเรียนรู้เกี่ยวกับฟังก์ชันของหน้าต่างใน PySpark อาจเป็นเรื่องที่ท้าทายแต่ก็คุ้มค่ากับความพยายาม ฟังก์ชันหน้าต่างเป็นเครื่องมือที่มีประสิทธิภาพสำหรับการวิเคราะห์ข้อมูลและสามารถช่วยให้คุณได้รับข้อมูลเชิงลึกที่คุณอาจไม่เคยเห็นมาก่อน ด้วยการทำความเข้าใจการใช้งาน Window Functions ใน Spark คุณสามารถรับของคุณ การวิเคราะห์ข้อมูล ทักษะไปอีกระดับและทำการตัดสินใจอย่างมีข้อมูลมากขึ้น ไม่ว่าคุณจะทำงานกับขนาดใหญ่หรือเล็ก ชุดข้อมูลการเรียนรู้ฟังก์ชันหน้าต่างใน Spark จะช่วยให้คุณจัดการและวิเคราะห์ข้อมูลด้วยวิธีใหม่และน่าตื่นเต้น
ในบล็อกนี้ ก่อนอื่นเราจะเข้าใจแนวคิดของฟังก์ชันหน้าต่าง จากนั้นจึงอภิปรายวิธีใช้งานฟังก์ชันเหล่านี้กับ Spark SQL และ PySpark DataFrame API เพื่อที่ในตอนท้ายของบทความนี้ คุณจะเข้าใจวิธีใช้ฟังก์ชันหน้าต่างกับชุดข้อมูลจริง และรับข้อมูลเชิงลึกที่จำเป็นสำหรับธุรกิจ
วัตถุประสงค์การเรียนรู้
- เข้าใจแนวคิดของฟังก์ชันหน้าต่าง
- การทำงานกับฟังก์ชันหน้าต่างโดยใช้ชุดข้อมูล
- ค้นหาข้อมูลเชิงลึกโดยใช้ฟังก์ชันหน้าต่าง
- ใช้ Spark SQL และ DataFrame API เพื่อทำงานกับฟังก์ชันหน้าต่าง
บทความนี้เผยแพร่โดยเป็นส่วนหนึ่งของไฟล์ Blogathon วิทยาศาสตร์ข้อมูล
สารบัญ
ฟังก์ชั่นหน้าต่างคืออะไร?
ฟังก์ชันหน้าต่างช่วยวิเคราะห์ข้อมูลภายในกลุ่มแถวที่เกี่ยวข้องกัน ช่วยให้ผู้ใช้สามารถดำเนินการแปลงที่ซับซ้อนในแถวของดาต้าเฟรมหรือชุดข้อมูลที่เชื่อมโยงซึ่งกันและกัน โดยขึ้นอยู่กับการแบ่งพาร์ติชันและเกณฑ์การเรียงลำดับ
ฟังก์ชั่นหน้าต่างทำงานบนพาร์ติชันเฉพาะของดาต้าเฟรมหรือชุดข้อมูลที่กำหนดโดยชุดของคอลัมน์การแบ่งพาร์ติชัน ที่ สั่งโดย clause แบ่งพาร์ติชันข้อมูลในฟังก์ชันหน้าต่างเพื่อจัดเรียงตามลำดับเฉพาะ จากนั้นฟังก์ชันหน้าต่างจะทำการคำนวณบนหน้าต่างเลื่อนของแถวที่มีแถวปัจจุบันและเซตย่อยของแถวก่อนหน้า 'และ'/'หรือ' ตามมา ตามที่ระบุไว้ในกรอบหน้าต่าง
ตัวอย่างทั่วไปของฟังก์ชันหน้าต่าง ได้แก่ การคำนวณค่าเฉลี่ยเคลื่อนที่ การจัดอันดับหรือการเรียงลำดับแถวตามคอลัมน์หรือกลุ่มเฉพาะของ คอลัมน์คำนวณผลรวมที่กำลังรัน และค้นหาค่าแรกหรือค่าสุดท้ายในกลุ่มแถว ด้วยฟังก์ชันหน้าต่างอันทรงพลังของ Spark ผู้ใช้สามารถทำการวิเคราะห์และการรวมกลุ่มที่ซับซ้อนได้ ชุดข้อมูลขนาดใหญ่ ได้อย่างง่ายดายทำให้เป็นเครื่องมือยอดนิยมสำหรับคนตัวใหญ่ การประมวลผล และการวิเคราะห์
ฟังก์ชั่นหน้าต่างใน SQL
Spark SQL รองรับฟังก์ชันหน้าต่างสามประเภท:
- ฟังก์ชั่นการจัดอันดับ:- ฟังก์ชันเหล่านี้จะกำหนดอันดับให้กับแต่ละแถวภายในพาร์ติชันของชุดผลลัพธ์ ตัวอย่างเช่น ฟังก์ชัน ROW_NUMBER() จะให้หมายเลขลำดับที่ไม่ซ้ำกันแก่แต่ละแถวภายในพาร์ติชัน
- ฟังก์ชั่นการวิเคราะห์:- ฟังก์ชันเหล่านี้จะคำนวณค่ารวมในหน้าต่างแถว ตัวอย่างเช่น ฟังก์ชัน SUM() จะคำนวณผลรวมของคอลัมน์เหนือหน้าต่างของแถว
- ฟังก์ชั่นค่า:- ฟังก์ชันเหล่านี้จะคำนวณค่าการวิเคราะห์สำหรับแต่ละแถวในพาร์ติชัน โดยยึดตามค่าของแถวอื่นๆ ในพาร์ติชันเดียวกัน ตัวอย่างเช่น ฟังก์ชัน LAG() จะส่งคืนค่าของคอลัมน์จากแถวก่อนหน้าในพาร์ติชัน
การสร้างเฟรมข้อมูล
เราจะสร้าง dataframe ตัวอย่างเพื่อให้เราสามารถทำงานกับฟังก์ชันหน้าต่างต่างๆ ได้จริง นอกจากนี้เราจะพยายามตอบคำถามด้วยความช่วยเหลือของข้อมูลและฟังก์ชันหน้าต่างนี้
Dataframe จะมีรายละเอียดพนักงาน เช่น Name, Designation, Employee Number, Hire Date, Salary เป็นต้น รวมทั้งหมด 8 คอลัมน์ ดังนี้
- 'empno': คอลัมน์นี้ประกอบด้วยหมายเลขพนักงาน
- 'ename': คอลัมน์นี้มีชื่อพนักงาน
- 'งาน': คอลัมน์นี้มีข้อมูลเกี่ยวกับตำแหน่งงานของพนักงาน
- 'hiredate': คอลัมน์นี้แสดงวันที่จ้างงานของพนักงาน
- 'sal': รายละเอียดเงินเดือนอยู่ในคอลัมน์นี้
- 'comm': คอลัมน์นี้มีรายละเอียดค่าคอมมิชชั่นของพนักงาน ถ้ามี
- 'deptno': หมายเลขแผนกที่พนักงานอยู่ในคอลัมน์นี้
# Create Sample Dataframe
employees = [
(7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
(7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
(7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
(7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
(7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
(7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
(7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
(7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
(7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
(7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
(7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
(7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)
]
# create dataframe
emp_df = spark.createDataFrame(employees,
["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()
# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename| job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH| CLERK|17-Dec-80| 800| 20| 10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300| 30|
| 7521| WARD| SALESMAN|22-Feb-81|1250| 500| 30|
| 7566| JONES| MANAGER| 2-Apr-81|2975| 0| 20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400| 30|
| 7698| BLAKE| MANAGER| 1-May-81|2850| 0| 30|
| 7782| CLARK| MANAGER| 9-Jun-81|2450| 0| 10|
| 7788| SCOTT| ANALYST|19-Apr-87|3000| 0| 20|
| 7629| ALEX| SALESMAN|28-Sep-79|1150|1400| 30|
| 7839| KING|PRESIDENT|17-Nov-81|5000| 0| 10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500| 0| 30|
| 7876| ADAMS| CLERK|23-May-87|1100| 0| 20|
+-----+------+---------+---------+----+----+------+
ตอนนี้เราจะตรวจสอบสคีมา:
# Checking the schema
emp_df.printSchema()
# Output:-
root
|-- empno: long (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: long (nullable = true)
|-- comm: long (nullable = true)
|-- deptno: long (nullable = true)
สร้างมุมมองชั่วคราวของ DataFrame 'emp_df' ด้วยชื่อ "emp" ช่วยให้เราสามารถสืบค้น DataFrame โดยใช้ไวยากรณ์ SQL ใน Spark SQL ราวกับว่าเป็นตาราง มุมมองชั่วคราวใช้ได้เฉพาะในช่วงระยะเวลาของเซสชัน Spark เท่านั้น
emp_df.createOrReplaceTempView("emp")
การแก้ปัญหาคำชี้แจงปัญหาโดยใช้ฟังก์ชันหน้าต่าง
ที่นี่เราจะแก้ไขปัญหาหลายประการโดยใช้ฟังก์ชัน Windows:
ไตรมาสที่ 1 จัดอันดับเงินเดือนในแต่ละแผนก
# Using spark sql
rank_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()
# Output:-
+-----+------+---------+------+----+----+
|empno| ename| job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 4|
| 7629| ALEX| SALESMAN| 30|1150| 6|
+-----+------+---------+------+----+----+
แนวทางสำหรับโค้ด PySpark
- ฟังก์ชัน Window จะแบ่งพาร์ติชันข้อมูลตามหมายเลขแผนกโดยใช้ partitionBy(col('deptno')) จากนั้นจัดลำดับข้อมูลภายในแต่ละพาร์ติชันตามเงินเดือนจากมากไปน้อยโดยใช้ orderBy(col('sal').desc()) ตัวแปร windowSpec เก็บข้อกำหนดหน้าต่างขั้นสุดท้ายไว้
- 'emp_df' คือ dataframe ที่มีข้อมูลพนักงาน รวมถึงคอลัมน์สำหรับ empno, ename, job, deptno และ sal
- ฟังก์ชันอันดับถูกนำไปใช้กับคอลัมน์เงินเดือนโดยใช้ 'F.rank().over(windowSpec)' ภายในคำสั่ง select คอลัมน์ผลลัพธ์มีชื่อนามแฝงว่า 'rank'
- มันจะสร้าง dataframe 'ranking_result_df' ซึ่งประกอบด้วย empno, ename, job, deptno และเงินเดือน นอกจากนี้ยังมีคอลัมน์ใหม่ 'อันดับ' ซึ่งแสดงถึงอันดับเงินเดือนของพนักงานภายในแผนกของพวกเขา
Output:
ผลลัพธ์มีอันดับเงินเดือนในแต่ละแผนก
ไตรมาสที่ 2 หนาแน่นจัดอันดับเงินเดือนในแต่ละแผนก
# Using Spark SQL
dense_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC)
AS dense_rank FROM emp""")
dense_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()
# Output:-
+-----+------+---------+------+----+----------+
|empno| ename| job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 4|
| 7629| ALEX| SALESMAN| 30|1150| 5|
+-----+------+---------+------+----+----------+
แนวทางสำหรับโค้ด PySpark
- ขั้นแรก สร้างข้อกำหนดหน้าต่างโดยใช้ฟังก์ชัน Window ซึ่งจะแบ่งพาร์ติชัน DataFrame 'emp_df' ด้วย deptno และจัดลำดับโดยลดคอลัมน์ 'sal' จากมากไปน้อย
- จากนั้น ฟังก์ชัน Dens_rank() จะถูกนำไปใช้กับข้อกำหนดเฉพาะของหน้าต่าง ซึ่งจะกำหนดอันดับหนาแน่นให้กับแต่ละแถวภายในแต่ละพาร์ติชันตามลำดับการจัดเรียง
- ในที่สุด DataFrame ใหม่ที่เรียกว่า 'dense_ranking_df' จะถูกสร้างขึ้นโดยการเลือกคอลัมน์เฉพาะจาก emp_df (เช่น 'empno', 'ename', 'งาน', 'deptno' และ 'sal') และเพิ่มคอลัมน์ใหม่ 'dense_rank' ที่ มีค่าการจัดอันดับหนาแน่นซึ่งคำนวณโดยฟังก์ชันหน้าต่าง
- สุดท้าย แสดง DataFrame ที่เป็นผลลัพธ์ในรูปแบบตาราง
Output:
ผลลัพธ์มีอันดับหนาแน่นตามเงินเดือน
ไตรมาสที่ 3 ระบุหมายเลขแถวภายในแต่ละแผนก
# Using Spark SQL
row_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
AS row_num FROM emp """)
row_df.show()
# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()
# Output:-
+-----+------+---------+------+----+-------+
|empno| ename| job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 5|
| 7629| ALEX| SALESMAN| 30|1150| 6|
+-----+------+---------+------+----+-------+
แนวทางสำหรับโค้ด PySpark
- บรรทัดแรกกำหนดข้อกำหนดหน้าต่างสำหรับการคำนวณโดยใช้ฟังก์ชัน Window.partitionBy() และ Window.orderBy() หน้าต่างนี้ถูกแบ่งพาร์ติชันตามคอลัมน์ deptno และเรียงลำดับตามคอลัมน์ sal ตามลำดับจากมากไปน้อย
- บรรทัดที่สองสร้าง DataFrame ใหม่ชื่อ 'row_num_df' ซึ่งเป็นเส้นโครงของ 'emp_df' พร้อมด้วยคอลัมน์เพิ่มเติมที่เรียกว่า 'row_num' และมีรายละเอียดหมายเลขแถว
- ฟังก์ชัน show() จะแสดง DataFrame ผลลัพธ์ ซึ่งแสดงคอลัมน์ empno, ename, job, deptno, sal และ row_num ของพนักงานแต่ละคน
Output:
ผลลัพธ์จะมีหมายเลขแถวของพนักงานแต่ละคนภายในแผนกโดยอิงตามเงินเดือน
ไตรมาสที่ 4 วิ่งผลรวมเงินเดือนในแต่ละแผนก
# Using Spark SQL
running_sum_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC)
AS running_total FROM emp
""")
running_sum_df.show()
# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()
# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename| job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839| KING|PRESIDENT| 10|5000| 5000|
| 7782| CLARK| MANAGER| 10|2450| 7450|
| 7369| SMITH| CLERK| 10| 800| 8250|
| 7788| SCOTT| ANALYST| 20|3000| 3000|
| 7566| JONES| MANAGER| 20|2975| 5975|
| 7876| ADAMS| CLERK| 20|1100| 7075|
| 7698| BLAKE| MANAGER| 30|2850| 2850|
| 7499| ALLEN| SALESMAN| 30|1600| 4450|
| 7844|TURNER| SALESMAN| 30|1500| 5950|
| 7521| WARD| SALESMAN| 30|1250| 8450|
| 7654|MARTIN| SALESMAN| 30|1250| 8450|
| 7629| ALEX| SALESMAN| 30|1150| 9600|
+-----+------+---------+------+----+-------------+
เข้าใกล้ สำหรับโค้ด PySpark
- ขั้นแรก ข้อกำหนดเฉพาะของหน้าต่างถูกกำหนดโดยใช้วิธี “Window.partitionBy()” และ “Window.orderBy()” เมธอด “partitionBy()” จะแบ่งพาร์ติชันข้อมูลด้วยคอลัมน์ “deptno” ในขณะที่เมธอด “orderBy()” จะเรียงลำดับข้อมูลตามคอลัมน์ “sal” ตามลำดับจากมากไปน้อย
- จากนั้น ฟังก์ชัน "sum()" จะถูกนำไปใช้กับคอลัมน์ "sal" โดยใช้วิธี "over()" เพื่อคำนวณยอดรวมของเงินเดือนภายในแต่ละแผนก ผลลัพธ์จะอยู่ใน DataFrame ใหม่ที่เรียกว่า “running_sum_sal_df” ซึ่งมีคอลัมน์ 'empno', 'ename', 'job', 'deptno', 'sal' และ 'running_total'
- สุดท้ายนี้ มีการเรียกเมธอด "show()" บน DataFrame "running_sum_sal_df" เพื่อแสดงผลลัพธ์ของการสืบค้น DataFrame ที่เป็นผลลัพธ์จะแสดงเงินเดือนรวมของพนักงานแต่ละคนและรายละเอียดอื่นๆ เช่น ชื่อ หมายเลขแผนก และงาน
Output:
ผลลัพธ์จะมีข้อมูลเงินเดือนรวมของแต่ละแผนกที่ทำงานอยู่
Q5: เงินเดือนถัดไปในแต่ละแผนก
เพื่อค้นหาเงินเดือนถัดไปในแต่ละแผนก เราใช้ฟังก์ชัน LEAD
ฟังก์ชันหน้าต่าง lead() ช่วยในการรับค่าของนิพจน์ในแถวถัดไปของพาร์ติชันหน้าต่าง โดยจะส่งกลับคอลัมน์สำหรับแต่ละคอลัมน์อินพุต โดยแต่ละคอลัมน์จะมีค่าของคอลัมน์อินพุตสำหรับแถวออฟเซ็ตที่อยู่เหนือแถวปัจจุบันภายในพาร์ติชันหน้าต่าง ไวยากรณ์สำหรับฟังก์ชัน lead คือ:- lead(col, offset=1, default=None)
# Using Spark SQL
next_sal_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal, LEAD(sal, 1)
OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
""")
next_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 2450|
| 7782| CLARK| MANAGER| 10|2450| 800|
| 7369| SMITH| CLERK| 10| 800| null|
| 7788| SCOTT| ANALYST| 20|3000| 2975|
| 7566| JONES| MANAGER| 20|2975| 1100|
| 7876| ADAMS| CLERK| 20|1100| null|
| 7698| BLAKE| MANAGER| 30|2850| 1600|
| 7499| ALLEN| SALESMAN| 30|1600| 1500|
| 7844|TURNER| SALESMAN| 30|1500| 1250|
| 7521| WARD| SALESMAN| 30|1250| 1250|
| 7654|MARTIN| SALESMAN| 30|1250| 1150|
| 7629| ALEX| SALESMAN| 30|1150| null|
+-----+------+---------+------+----+--------+
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 2450|
| 7782| CLARK| MANAGER| 10|2450| 800|
| 7369| SMITH| CLERK| 10| 800| 0|
| 7788| SCOTT| ANALYST| 20|3000| 2975|
| 7566| JONES| MANAGER| 20|2975| 1100|
| 7876| ADAMS| CLERK| 20|1100| 0|
| 7698| BLAKE| MANAGER| 30|2850| 1600|
| 7499| ALLEN| SALESMAN| 30|1600| 1500|
| 7844|TURNER| SALESMAN| 30|1500| 1250|
| 7521| WARD| SALESMAN| 30|1250| 1250|
| 7654|MARTIN| SALESMAN| 30|1250| 1150|
| 7629| ALEX| SALESMAN| 30|1150| 0|
+-----+------+---------+------+----+--------+
แนวทางสำหรับโค้ด PySpark
- ขั้นแรก ฟังก์ชันหน้าต่างจะช่วยแบ่งพาร์ติชันแถวของ DataFrame ตามหมายเลขแผนก (deptno) และเรียงลำดับเงินเดือนจากมากไปน้อยภายในแต่ละพาร์ติชัน
- จากนั้น ฟังก์ชัน lead() จะถูกนำไปใช้กับคอลัมน์ 'sal' ที่เรียงลำดับภายในแต่ละพาร์ติชันเพื่อส่งคืนเงินเดือนของพนักงานต่อไปนี้ (โดยมีออฟเซ็ตเป็น 1) และค่าเริ่มต้นคือ 0 ในกรณีที่ไม่มีพนักงานคนถัดไป
- DataFrame ที่เป็นผลลัพธ์ 'next_salary_df' มีคอลัมน์สำหรับหมายเลขพนักงาน (empno), ชื่อ (ename), ตำแหน่งงาน (งาน), หมายเลขแผนก (deptno), เงินเดือนปัจจุบัน (sal) และเงินเดือนถัดไป (next_val)
Output:
ผลลัพธ์ประกอบด้วยเงินเดือนของพนักงานคนถัดไปในแผนกตามลำดับเงินเดือนจากมากไปน้อย
คำถามที่ 6 เงินเดือนก่อนหน้าในแต่ละแผนก
ในการคำนวณเงินเดือนก่อนหน้า เราใช้ฟังก์ชัน LAG
ฟังก์ชัน lag ส่งกลับค่าของนิพจน์ที่ออฟเซ็ตที่กำหนดก่อนแถวปัจจุบันภายในพาร์ติชันหน้าต่าง ไวยากรณ์ของฟังก์ชัน lag คือ:- lag(expr, offset=1, default=None).over(windowSpec)
# Using Spark SQL
preious_sal_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal, LAG(sal, 1)
OVER (PARTITION BY deptno ORDER BY sal DESC)
AS prev_val FROM emp
""")
preious_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| null|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 2450|
| 7788| SCOTT| ANALYST| 20|3000| null|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 2975|
| 7698| BLAKE| MANAGER| 30|2850| null|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 1600|
| 7521| WARD| SALESMAN| 30|1250| 1500|
| 7654|MARTIN| SALESMAN| 30|1250| 1250|
| 7629| ALEX| SALESMAN| 30|1150| 1250|
+-----+------+---------+------+----+--------+
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 0|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 2450|
| 7788| SCOTT| ANALYST| 20|3000| 0|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 2975|
| 7698| BLAKE| MANAGER| 30|2850| 0|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 1600|
| 7521| WARD| SALESMAN| 30|1250| 1500|
| 7654|MARTIN| SALESMAN| 30|1250| 1250|
| 7629| ALEX| SALESMAN| 30|1150| 1250|
+-----+------+---------+------+----+--------+
แนวทางสำหรับโค้ด PySpark
- window.partitionBy(col('deptno')) ระบุพาร์ติชันหน้าต่าง หมายความว่าฟังก์ชันหน้าต่างทำงานแยกกันสำหรับแต่ละแผนก
- จากนั้น orderBy(col('sal').desc()) จะระบุลำดับของเงินเดือน และจะเรียงลำดับเงินเดือนภายในแต่ละแผนกจากมากไปน้อย
- F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') สร้างคอลัมน์ใหม่ที่เรียกว่า prev_val ใน DataFrame 'prev_sal_df'
- สำหรับแต่ละแถว คอลัมน์นี้มีค่าของคอลัมน์ 'sal' จากแถวก่อนหน้าภายในหน้าต่างที่กำหนดโดย windowSpec
- พารามิเตอร์ offset=1 บ่งชี้ว่าแถวก่อนหน้าควรเป็นหนึ่งแถวก่อนแถวปัจจุบัน และ default=0 ระบุค่าเริ่มต้นสำหรับแถวแรกในแต่ละพาร์ติชัน (เนื่องจากไม่มีแถวก่อนหน้าสำหรับแถวแรก)
- สุดท้าย prev_sal_df.show() จะแสดง DataFrame ที่เป็นผลลัพธ์
Output:
ผลลัพธ์แสดงถึงเงินเดือนก่อนหน้าของพนักงานแต่ละคนภายในแต่ละแผนก โดยเรียงลำดับเงินเดือนจากมากไปน้อย
คำถามที่ 7 เงินเดือนแรกในแต่ละแผนกและเปรียบเทียบกับสมาชิกทุกคนในแต่ละแผนก
# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal,
FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC)
AS first_val FROM emp """)
first_val_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()
# Output:-
+-----+------+---------+------+----+---------+
|empno| ename| job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839| KING|PRESIDENT| 10|5000| 5000|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 5000|
| 7788| SCOTT| ANALYST| 20|3000| 3000|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 3000|
| 7698| BLAKE| MANAGER| 30|2850| 2850|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 2850|
| 7521| WARD| SALESMAN| 30|1250| 2850|
| 7654|MARTIN| SALESMAN| 30|1250| 2850|
| 7629| ALEX| SALESMAN| 30|1150| 2850|
+-----+------+---------+------+----+---------+
แนวทางสำหรับโค้ด PySpark
- ขั้นแรก สร้างออบเจ็กต์ WindowSpec ที่แบ่งพาร์ติชันข้อมูลตามหมายเลขแผนก (deptno) และเรียงลำดับตามเงินเดือน (sal) ตามลำดับจากมากไปน้อย
- จากนั้นใช้ฟังก์ชันการวิเคราะห์ first() กับคอลัมน์ 'sal' เหนือหน้าต่างที่กำหนดโดย windowSpec ฟังก์ชันนี้ส่งคืนค่าแรกของคอลัมน์ 'sal' ภายในแต่ละพาร์ติชัน (เช่น แต่ละกลุ่ม deptno) โดยเรียงลำดับตาม 'sal' จากมากไปหาน้อย คอลัมน์ผลลัพธ์มีชื่อใหม่ 'first_val'
- ตอนนี้กำหนด DataFrame ผลลัพธ์ซึ่งประกอบด้วยคอลัมน์ที่เลือกและคอลัมน์ใหม่ 'first_val' ซึ่งแสดงเงินเดือนสูงสุดอันดับแรกสำหรับแต่ละแผนกตามลำดับค่าเงินเดือนจากมากไปหาน้อย ให้กับตัวแปรใหม่ที่เรียกว่า 'first_value_df'
Output:
ผลลัพธ์จะแสดงเงินเดือนสูงสุดอันดับแรกสำหรับแต่ละแผนกใน DataFrame ของพนักงาน
สรุป
ในบทความนี้ เราเรียนรู้เกี่ยวกับฟังก์ชันของหน้าต่าง Spark SQL มีฟังก์ชันหน้าต่างสามประเภท: ฟังก์ชันการจัดอันดับ ฟังก์ชันรวม และฟังก์ชันค่า เมื่อใช้ฟังก์ชันนี้ เราทำงานกับชุดข้อมูลเพื่อค้นหาข้อมูลเชิงลึกที่สำคัญและมีคุณค่า ฟังก์ชั่น Spark Window นำเสนอเครื่องมือวิเคราะห์ข้อมูลที่มีประสิทธิภาพ เช่น การจัดอันดับ การวิเคราะห์ และการคำนวณมูลค่า ไม่ว่าจะวิเคราะห์ข้อมูลเชิงลึกด้านเงินเดือนตามแผนก หรือใช้ตัวอย่างเชิงปฏิบัติด้วย PySpark และ SQL ฟังก์ชันเหล่านี้มอบเครื่องมือที่จำเป็นสำหรับการประมวลผลและการวิเคราะห์ข้อมูลอย่างมีประสิทธิภาพใน Spark
ประเด็นที่สำคัญ
- เราเรียนรู้เกี่ยวกับฟังก์ชันหน้าต่างและทำงานร่วมกับฟังก์ชันเหล่านี้โดยใช้ Spark SQL และ PySpark DataFrame API
- เราใช้ฟังก์ชันต่างๆ เช่น rank,หนาแน่น_rank,row_number,lag,lead,groupBy, partitionBy และฟังก์ชันอื่นๆ เพื่อให้การวิเคราะห์ที่เหมาะสม
- นอกจากนี้เรายังได้เห็นวิธีแก้ปัญหาโดยละเอียดทีละขั้นตอนสำหรับปัญหา และวิเคราะห์ผลลัพธ์ในตอนท้ายของแต่ละคำชี้แจงปัญหา
กรณีศึกษานี้ช่วยให้คุณเข้าใจฟังก์ชัน PySpark ได้ดีขึ้น หากคุณมีความคิดเห็นหรือคำถามใด ๆ ให้แสดงความคิดเห็นด้านล่าง เชื่อมต่อกับฉันบน LinkedIn เพื่อหารือเพิ่มเติม เรียนรู้ต่อไป!!!
สื่อที่แสดงในบทความนี้ไม่ได้เป็นของ Analytics Vidhya และถูกใช้ตามดุลยพินิจของผู้เขียน
- เนื้อหาที่ขับเคลื่อนด้วย SEO และการเผยแพร่ประชาสัมพันธ์ รับการขยายวันนี้
- PlatoData.Network Vertical Generative Ai เพิ่มพลังให้กับตัวเอง เข้าถึงได้ที่นี่.
- เพลโตไอสตรีม. Web3 อัจฉริยะ ขยายความรู้ เข้าถึงได้ที่นี่.
- เพลโตESG. คาร์บอน, คลีนเทค, พลังงาน, สิ่งแวดล้อม แสงอาทิตย์, การจัดการของเสีย. เข้าถึงได้ที่นี่.
- เพลโตสุขภาพ เทคโนโลยีชีวภาพและข่าวกรองการทดลองทางคลินิก เข้าถึงได้ที่นี่.
- ที่มา: https://www.analyticsvidhya.com/blog/2024/03/working-with-window-functions-in-pyspark/