សេចក្តីផ្តើម
ការរៀនអំពីមុខងារ Window នៅក្នុង PySpark អាចជាបញ្ហាប្រឈម ប៉ុន្តែមានតម្លៃក្នុងការខិតខំប្រឹងប្រែង។ មុខងារ Window គឺជាឧបករណ៍ដ៏មានឥទ្ធិពលសម្រាប់ការវិភាគទិន្នន័យ និងអាចជួយអ្នកឱ្យទទួលបានការយល់ដឹងដែលអ្នកប្រហែលជាមិនធ្លាប់បានឃើញ។ តាមរយៈការយល់ដឹងពីរបៀបប្រើមុខងារ Window នៅក្នុង Spark; អ្នកអាចយករបស់អ្នក។ ការវិភាគទិន្នន័យ ជំនាញទៅកម្រិតបន្ទាប់ និងធ្វើការសម្រេចចិត្តប្រកបដោយការយល់ដឹងបន្ថែមទៀត។ មិនថាអ្នកធ្វើការជាមួយធំឬតូច សំណុំទិន្នន័យការរៀនមុខងារ Window នៅក្នុង Spark នឹងអនុញ្ញាតឱ្យអ្នករៀបចំ និងវិភាគទិន្នន័យតាមរបៀបថ្មី និងគួរឱ្យរំភើប។
នៅក្នុងប្លក់នេះ យើងនឹងយល់អំពីគោលគំនិតនៃមុខងារ window ជាមុនសិន ហើយបន្ទាប់មកពិភាក្សាពីរបៀបប្រើប្រាស់ពួកវាជាមួយ Spark SQL និង PySpark DataFrame API។ ដូច្នេះនៅចុងបញ្ចប់នៃអត្ថបទនេះ អ្នកនឹងយល់ពីរបៀបប្រើប្រាស់មុខងារ Window ជាមួយនឹងសំណុំទិន្នន័យពិតប្រាកដ និងទទួលបានការយល់ដឹងសំខាន់ៗសម្រាប់អាជីវកម្ម។
គោលបំណងនៃការរៀន
- ស្វែងយល់ពីគំនិតនៃមុខងារបង្អួច។
- ធ្វើការជាមួយមុខងារបង្អួចដោយប្រើសំណុំទិន្នន័យ។
- ស្វែងយល់ពីការយល់ដឹងដោយប្រើមុខងារបង្អួច។
- ប្រើ Spark SQL និង DataFrame API ដើម្បីធ្វើការជាមួយមុខងារបង្អួច។
អត្ថបទនេះត្រូវបានបោះពុម្ពផ្សាយជាផ្នែកមួយនៃព្រះគម្ពីរមរមន Blogathon វិទ្យាសាស្ត្រទិន្នន័យ។
តារាងមាតិកា
តើមុខងារ Window មានអ្វីខ្លះ?
មុខងារ Window ជួយវិភាគទិន្នន័យក្នុងក្រុមជួរដេកដែលទាក់ទងគ្នាទៅវិញទៅមក។ ពួកវាអាចឱ្យអ្នកប្រើប្រាស់ធ្វើការបំប្លែងដ៏ស្មុគស្មាញនៅលើជួរដេកនៃស៊ុមទិន្នន័យ ឬសំណុំទិន្នន័យដែលទាក់ទងគ្នាទៅវិញទៅមក ដោយផ្អែកលើលក្ខណៈវិនិច្ឆ័យនៃការបែងចែក និងលំដាប់មួយចំនួន។
មុខងារ Window ដំណើរការលើភាគថាសជាក់លាក់នៃ dataframe ឬ dataset ដែលកំណត់ដោយសំណុំនៃ partitioning columns។ នេះ។ បញ្ជាដោយ clause បែងចែកទិន្នន័យនៅក្នុងមុខងារ window ដើម្បីរៀបចំវាតាមលំដាប់ជាក់លាក់មួយ។ បន្ទាប់មក អនុគមន៍បង្អួចអនុវត្តការគណនាលើបង្អួចរំកិលនៃជួរដេក ដែលរួមបញ្ចូលជួរដេកបច្ចុប្បន្ន និងសំណុំរងនៃជួរខាងមុខទាំង 'និង'/'ឬ' ខាងក្រោម ដូចដែលបានបញ្ជាក់ក្នុងស៊ុមបង្អួច។
ឧទាហរណ៍ទូទៅមួយចំនួននៃមុខងារបង្អួចរួមមានការគណនាមធ្យមផ្លាស់ទី ចំណាត់ថ្នាក់ ឬតម្រៀបជួរដោយផ្អែកលើជួរឈរ ឬក្រុមជាក់លាក់នៃ ជួរឈរគណនាចំនួនសរុបដែលកំពុងដំណើរការ និងការស្វែងរកតម្លៃដំបូង ឬចុងក្រោយក្នុងក្រុមជួរ។ ជាមួយនឹងមុខងារបង្អួចដ៏មានអានុភាពរបស់ Spark អ្នកប្រើប្រាស់អាចធ្វើការវិភាគស្មុគស្មាញ និងការបូកសរុប សំណុំទិន្នន័យធំ ជាមួយនឹងភាពងាយស្រួលដែលទាក់ទង ធ្វើឱ្យវាក្លាយជាឧបករណ៍ពេញនិយមសម្រាប់ធំ ដំណើរការទិន្នន័យ និងការវិភាគ។
មុខងារ Window ក្នុង SQL
Spark SQL គាំទ្រមុខងារបង្អួចបីប្រភេទ៖
- មុខងារចំណាត់ថ្នាក់៖ - មុខងារទាំងនេះផ្តល់ចំណាត់ថ្នាក់ទៅជួរនីមួយៗក្នុងភាគថាសនៃសំណុំលទ្ធផល។ ឧទាហរណ៍ អនុគមន៍ ROW_NUMBER() ផ្តល់លេខបន្តបន្ទាប់គ្នាតែមួយគត់ទៅជួរនីមួយៗក្នុងភាគថាស។
- មុខងារវិភាគ៖ - មុខងារទាំងនេះគណនាតម្លៃសរុបនៅលើបង្អួចនៃជួរដេក។ ឧទាហរណ៍ អនុគមន៍ SUM() គណនាផលបូកនៃជួរឈរនៅលើបង្អួចនៃជួរដេក។
- មុខងារតម្លៃ៖ - មុខងារទាំងនេះគណនាតម្លៃវិភាគសម្រាប់ជួរនីមួយៗក្នុងភាគថាស ដោយផ្អែកលើតម្លៃនៃជួរផ្សេងទៀតក្នុងភាគថាសដូចគ្នា។ ឧទាហរណ៍ អនុគមន៍ LAG() ត្រឡប់តម្លៃនៃជួរឈរពីជួរមុនក្នុងភាគថាស។
ការបង្កើត DataFrame
យើងនឹងបង្កើតគំរូទិន្នន័យមួយ ដូច្នេះយើងអាចធ្វើការជាមួយមុខងារបង្អួចផ្សេងៗបានយ៉ាងអនុវត្ត។ យើងក៏នឹងព្យាយាមឆ្លើយសំណួរមួយចំនួន ដោយមានជំនួយពីទិន្នន័យនេះ និងមុខងារបង្អួច។
ស៊ុមទិន្នន័យមានព័ត៌មានលំអិតអំពីបុគ្គលិកដូចជា ឈ្មោះ ការកំណត់ លេខនិយោជិត កាលបរិច្ឆេទជួល ប្រាក់បៀវត្សរ៍ជាដើម។ សរុបយើងមាន 8 ជួរដែលមានដូចខាងក្រោម៖
- 'empno'៖ ជួរឈរនេះមានលេខបុគ្គលិក។
- 'ename'៖ ជួរឈរនេះមានឈ្មោះបុគ្គលិក។
- 'job': ជួរនេះមានព័ត៌មានអំពីចំណងជើងការងាររបស់និយោជិត។
- 'hiredate'៖ ជួរឈរនេះបង្ហាញពីកាលបរិច្ឆេទជួលបុគ្គលិក។
- 'sal': ព័ត៌មានលម្អិតអំពីប្រាក់ខែមាននៅក្នុងជួរឈរនេះ។
- 'comm'៖ ជួរឈរនេះមានព័ត៌មានលម្អិតអំពីគណៈកម្មការបុគ្គលិក ប្រសិនបើមាន។
- 'deptno': លេខនាយកដ្ឋានដែលនិយោជិតជាកម្មសិទ្ធិគឺនៅក្នុងជួរឈរនេះ។
# Create Sample Dataframe
employees = [
(7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
(7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
(7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
(7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
(7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
(7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
(7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
(7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
(7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
(7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
(7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
(7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)
]
# create dataframe
emp_df = spark.createDataFrame(employees,
["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()
# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename| job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH| CLERK|17-Dec-80| 800| 20| 10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300| 30|
| 7521| WARD| SALESMAN|22-Feb-81|1250| 500| 30|
| 7566| JONES| MANAGER| 2-Apr-81|2975| 0| 20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400| 30|
| 7698| BLAKE| MANAGER| 1-May-81|2850| 0| 30|
| 7782| CLARK| MANAGER| 9-Jun-81|2450| 0| 10|
| 7788| SCOTT| ANALYST|19-Apr-87|3000| 0| 20|
| 7629| ALEX| SALESMAN|28-Sep-79|1150|1400| 30|
| 7839| KING|PRESIDENT|17-Nov-81|5000| 0| 10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500| 0| 30|
| 7876| ADAMS| CLERK|23-May-87|1100| 0| 20|
+-----+------+---------+---------+----+----+------+
ឥឡូវនេះយើងនឹងពិនិត្យមើលគ្រោងការណ៍៖
# Checking the schema
emp_df.printSchema()
# Output:-
root
|-- empno: long (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: long (nullable = true)
|-- comm: long (nullable = true)
|-- deptno: long (nullable = true)
បង្កើតទិដ្ឋភាពបណ្តោះអាសន្ននៃ DataFrame 'emp_df' ដោយប្រើឈ្មោះ "emp" ។ វាអនុញ្ញាតឱ្យយើងសាកសួរ DataFrame ដោយប្រើវាក្យសម្ព័ន្ធ SQL នៅក្នុង Spark SQL ដូចជាវាជាតារាង។ ទិដ្ឋភាពបណ្តោះអាសន្នមានសុពលភាពសម្រាប់តែរយៈពេលនៃវគ្គ Spark Session ប៉ុណ្ណោះ។
emp_df.createOrReplaceTempView("emp")
ការដោះស្រាយបញ្ហាសេចក្តីថ្លែងការណ៍ដោយប្រើមុខងារបង្អួច
ខាងក្រោមនេះយើងនឹងកំពុងដោះស្រាយបញ្ហាមួយចំនួនដោយប្រើមុខងារ windows៖
សំណួរទី 1 ចាត់ថ្នាក់ប្រាក់បៀវត្សរ៍ក្នុងផ្នែកនីមួយៗ។
# Using spark sql
rank_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()
# Output:-
+-----+------+---------+------+----+----+
|empno| ename| job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 4|
| 7629| ALEX| SALESMAN| 30|1150| 6|
+-----+------+---------+------+----+----+
វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark
- មុខងារ Window បែងចែកទិន្នន័យតាមលេខនាយកដ្ឋានដោយប្រើ partitionBy(col('deptno')) ហើយបន្ទាប់មកបញ្ជាទិន្នន័យក្នុង partition នីមួយៗដោយប្រាក់ខែតាមលំដាប់ចុះដោយប្រើ orderBy(col('sal').desc())។ WindowSpec អថេររក្សាការបញ្ជាក់វិនដូចុងក្រោយ។
- 'emp_df' គឺជា dataframe ដែលមានទិន្នន័យបុគ្គលិក រួមទាំងជួរឈរសម្រាប់ empno, ename, job, deptno និង sal ។
- មុខងារចំណាត់ថ្នាក់ត្រូវបានអនុវត្តទៅជួរប្រាក់ខែដោយប្រើ 'F.rank(.over(windowSpec)' នៅក្នុងសេចក្តីថ្លែងការណ៍ជ្រើសរើស។ ជួរលទ្ធផលមានឈ្មោះក្លែងក្លាយជា 'ចំណាត់ថ្នាក់'។
- វានឹងបង្កើត dataframe 'ranking_result_df' ដែលរួមមាន empno, ename, job, deptno និងប្រាក់ខែ។ វាក៏មានជួរថ្មីមួយទៀត 'ចំណាត់ថ្នាក់' ដែលតំណាងឱ្យឋានៈនៃប្រាក់ខែរបស់បុគ្គលិកនៅក្នុងនាយកដ្ឋានរបស់ពួកគេ។
លទ្ធផល:
លទ្ធផលមានចំណាត់ថ្នាក់ប្រាក់បៀវត្សរ៍ក្នុងនាយកដ្ឋាននីមួយៗ។
សំណួរទី 2 Dense ចាត់ថ្នាក់ប្រាក់បៀវត្សរ៍នៅក្នុងនាយកដ្ឋាននីមួយៗ។
# Using Spark SQL
dense_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC)
AS dense_rank FROM emp""")
dense_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()
# Output:-
+-----+------+---------+------+----+----------+
|empno| ename| job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 4|
| 7629| ALEX| SALESMAN| 30|1150| 5|
+-----+------+---------+------+----+----------+
វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark
- ជាដំបូង បង្កើតការបញ្ជាក់របស់ Window ដោយប្រើប្រាស់មុខងារ Window ដែលបែងចែក 'emp_df' DataFrame ដោយ deptno ហើយបញ្ជាវាដោយចុះក្រោមជួរឈរ 'sal'។
- បន្ទាប់មក មុខងារ dense_rank() ត្រូវបានអនុវត្តលើការបញ្ជាក់របស់បង្អួច ដែលផ្តល់ចំណាត់ថ្នាក់ក្រាស់ទៅជួរនីមួយៗក្នុងភាគថាសនីមួយៗដោយផ្អែកលើលំដាប់ដែលបានតម្រៀបរបស់វា។
- ជាចុងក្រោយ DataFrame ថ្មីមួយហៅថា 'dense_ranking_df' ត្រូវបានបង្កើតឡើងដោយជ្រើសរើសជួរឈរជាក់លាក់ពី emp_df (ឧទាហរណ៍ 'empno', 'ename', 'job', 'deptno' និង 'sal') ហើយបន្ថែមជួរឈរថ្មី 'dense_rank' ដែល មានតម្លៃចំណាត់ថ្នាក់ក្រាស់ដែលគណនាដោយមុខងារបង្អួច។
- ចុងក្រោយ បង្ហាញ DataFrame លទ្ធផលជាទម្រង់តារាង។
លទ្ធផល:
លទ្ធផលមានកម្រិតប្រាក់បៀវត្សរ៍ក្រាស់។
សំណួរទី 3 លេខជួរនៅក្នុងផ្នែកនីមួយៗ។
# Using Spark SQL
row_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
AS row_num FROM emp """)
row_df.show()
# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()
# Output:-
+-----+------+---------+------+----+-------+
|empno| ename| job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 5|
| 7629| ALEX| SALESMAN| 30|1150| 6|
+-----+------+---------+------+----+-------+
វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark
- បន្ទាត់ទីមួយកំណត់ការបញ្ជាក់បង្អួចសម្រាប់ការគណនាដោយប្រើមុខងារ Window.partitionBy() និង Window.orderBy() ។ បង្អួចនេះត្រូវបានបែងចែកដោយជួរឈរ deptno ហើយត្រូវបានបញ្ជាដោយជួរឈរ sal តាមលំដាប់ចុះ។
- ជួរទីពីរបង្កើត DataFrame ថ្មីហៅថា 'row_num_df' ដែលជាការព្យាករនៃ 'emp_df' ជាមួយនឹងជួរឈរបន្ថែមហៅថា 'row_num' ហើយវាមានព័ត៌មានលម្អិតអំពីលេខជួរដេក។
- មុខងារ show() បង្ហាញ DataFrame លទ្ធផល ដែលបង្ហាញពី empno, ename, job, deptno, sal, និង row_num columns របស់និយោជិតនីមួយៗ។
លទ្ធផល:
លទ្ធផលនឹងមានលេខជួរដេករបស់បុគ្គលិកម្នាក់ៗនៅក្នុងនាយកដ្ឋានរបស់ពួកគេដោយផ្អែកលើប្រាក់ខែរបស់ពួកគេ។
សំណួរទី 4 ដំណើរការសរុបនៃប្រាក់បៀវត្សរ៍នៅក្នុងនាយកដ្ឋាននីមួយៗ។
# Using Spark SQL
running_sum_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC)
AS running_total FROM emp
""")
running_sum_df.show()
# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()
# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename| job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839| KING|PRESIDENT| 10|5000| 5000|
| 7782| CLARK| MANAGER| 10|2450| 7450|
| 7369| SMITH| CLERK| 10| 800| 8250|
| 7788| SCOTT| ANALYST| 20|3000| 3000|
| 7566| JONES| MANAGER| 20|2975| 5975|
| 7876| ADAMS| CLERK| 20|1100| 7075|
| 7698| BLAKE| MANAGER| 30|2850| 2850|
| 7499| ALLEN| SALESMAN| 30|1600| 4450|
| 7844|TURNER| SALESMAN| 30|1500| 5950|
| 7521| WARD| SALESMAN| 30|1250| 8450|
| 7654|MARTIN| SALESMAN| 30|1250| 8450|
| 7629| ALEX| SALESMAN| 30|1150| 9600|
+-----+------+---------+------+----+-------------+
វិធីសាស្រ្ត សម្រាប់លេខកូដ PySpark
- ដំបូង ការកំណត់បង្អួចត្រូវបានកំណត់ដោយប្រើវិធីសាស្ត្រ "Window.partitionBy()" និង "Window.orderBy()" ។ វិធីសាស្ត្រ "partitionBy()" បែងចែកទិន្នន័យដោយជួរឈរ "deptno" ខណៈពេលដែលវិធីសាស្ត្រ "orderBy()" បញ្ជាទិន្នន័យដោយជួរឈរ "sal" តាមលំដាប់ចុះ។
- បន្ទាប់មក មុខងារ “sum()” ត្រូវបានអនុវត្តទៅជួរ “sal” ដោយប្រើវិធីសាស្ត្រ “over()” ដើម្បីគណនាប្រាក់បៀវត្សរ៍សរុបដែលកំពុងដំណើរការនៅក្នុងនាយកដ្ឋាននីមួយៗ។ លទ្ធផលនឹងស្ថិតនៅក្នុង DataFrame ថ្មីដែលហៅថា “running_sum_sal_df” ដែលមានជួរឈរ 'empno', 'ename', 'job', 'deptno', 'sal' និង 'running_total'។
- ជាចុងក្រោយ វិធីសាស្ត្រ "show()" ត្រូវបានហៅនៅលើ "running_sum_sal_df" DataFrame ដើម្បីបង្ហាញលទ្ធផលនៃសំណួរ។ លទ្ធផល DataFrame បង្ហាញពីប្រាក់បៀវត្សរ៍សរុបដែលកំពុងដំណើរការរបស់និយោជិតនីមួយៗ និងព័ត៌មានលម្អិតផ្សេងទៀតដូចជា ឈ្មោះ លេខនាយកដ្ឋាន និងការងារ។
លទ្ធផល:
លទ្ធផលនឹងមានចំនួនសរុបដែលកំពុងដំណើរការនៃទិន្នន័យប្រាក់ខែរបស់នាយកដ្ឋាននីមួយៗ។
សំណួរទី៥៖ ប្រាក់ខែបន្ទាប់នៅក្នុងនាយកដ្ឋាននីមួយៗ។
ដើម្បីស្វែងរកប្រាក់ខែបន្ទាប់នៅក្នុងនាយកដ្ឋាននីមួយៗ យើងប្រើមុខងារ LEAD ។
មុខងារបង្អួចនាំមុខ () ជួយឱ្យទទួលបានតម្លៃនៃកន្សោមនៅក្នុងជួរបន្ទាប់នៃភាគថាសបង្អួច។ វាត្រឡប់ជួរឈរសម្រាប់ជួរឈរបញ្ចូលនីមួយៗ ដែលជួរឈរនីមួយៗនឹងមានតម្លៃនៃជួរឈរបញ្ចូលសម្រាប់ជួរអុហ្វសិតខាងលើជួរបច្ចុប្បន្ននៅក្នុងភាគថាសបង្អួច។ វាក្យសម្ព័ន្ធសម្រាប់មុខងារនាំមុខគឺ៖- lead(col, offset=1, default=None)។
# Using Spark SQL
next_sal_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal, LEAD(sal, 1)
OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
""")
next_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 2450|
| 7782| CLARK| MANAGER| 10|2450| 800|
| 7369| SMITH| CLERK| 10| 800| null|
| 7788| SCOTT| ANALYST| 20|3000| 2975|
| 7566| JONES| MANAGER| 20|2975| 1100|
| 7876| ADAMS| CLERK| 20|1100| null|
| 7698| BLAKE| MANAGER| 30|2850| 1600|
| 7499| ALLEN| SALESMAN| 30|1600| 1500|
| 7844|TURNER| SALESMAN| 30|1500| 1250|
| 7521| WARD| SALESMAN| 30|1250| 1250|
| 7654|MARTIN| SALESMAN| 30|1250| 1150|
| 7629| ALEX| SALESMAN| 30|1150| null|
+-----+------+---------+------+----+--------+
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 2450|
| 7782| CLARK| MANAGER| 10|2450| 800|
| 7369| SMITH| CLERK| 10| 800| 0|
| 7788| SCOTT| ANALYST| 20|3000| 2975|
| 7566| JONES| MANAGER| 20|2975| 1100|
| 7876| ADAMS| CLERK| 20|1100| 0|
| 7698| BLAKE| MANAGER| 30|2850| 1600|
| 7499| ALLEN| SALESMAN| 30|1600| 1500|
| 7844|TURNER| SALESMAN| 30|1500| 1250|
| 7521| WARD| SALESMAN| 30|1250| 1250|
| 7654|MARTIN| SALESMAN| 30|1250| 1150|
| 7629| ALEX| SALESMAN| 30|1150| 0|
+-----+------+---------+------+----+--------+
វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark
- ទីមួយ មុខងារ window ជួយបែងចែកជួរដេករបស់ DataFrame តាមលេខនាយកដ្ឋាន (deptno) និងបញ្ជាប្រាក់ខែតាមលំដាប់ចុះក្នុង partition នីមួយៗ។
- បន្ទាប់មកមុខងារ lead() ត្រូវបានអនុវត្តទៅជួរ 'sal' ដែលបានបញ្ជានៅក្នុងភាគថាសនីមួយៗ ដើម្បីប្រគល់ប្រាក់បៀវត្សរ៍របស់និយោជិតខាងក្រោម (ជាមួយអុហ្វសិត 1) ហើយតម្លៃលំនាំដើមគឺ 0 ក្នុងករណីដែលមិនមានបុគ្គលិកបន្ទាប់។
- លទ្ធផល DataFrame 'next_salary_df' មានជួរឈរសម្រាប់លេខបុគ្គលិក (empno) ឈ្មោះ (ename) ចំណងជើងការងារ (job) លេខនាយកដ្ឋាន (deptno) ប្រាក់ខែបច្ចុប្បន្ន (sal) និងប្រាក់ខែបន្ទាប់ (next_val)។
លទ្ធផល:
លទ្ធផលមានប្រាក់បៀវត្សរ៍របស់និយោជិតបន្ទាប់នៅក្នុងនាយកដ្ឋានដោយផ្អែកលើលំដាប់នៃប្រាក់ខែធ្លាក់ចុះ។
សំណួរទី 6 ប្រាក់បៀវត្សរ៍ពីមុននៅក្នុងនាយកដ្ឋាននីមួយៗ។
ដើម្បីគណនាប្រាក់ខែមុន យើងប្រើមុខងារ LAG ។
មុខងារ lag ត្រឡប់តម្លៃនៃកន្សោមនៅអុហ្វសិតដែលបានផ្តល់ឱ្យមុនជួរបច្ចុប្បន្ននៅក្នុងភាគថាសបង្អួច។ វាក្យសម្ព័ន្ធនៃអនុគមន៍ lag គឺ៖- lag(expr, offset=1, default=None).over(windowSpec)។
# Using Spark SQL
preious_sal_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal, LAG(sal, 1)
OVER (PARTITION BY deptno ORDER BY sal DESC)
AS prev_val FROM emp
""")
preious_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| null|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 2450|
| 7788| SCOTT| ANALYST| 20|3000| null|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 2975|
| 7698| BLAKE| MANAGER| 30|2850| null|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 1600|
| 7521| WARD| SALESMAN| 30|1250| 1500|
| 7654|MARTIN| SALESMAN| 30|1250| 1250|
| 7629| ALEX| SALESMAN| 30|1150| 1250|
+-----+------+---------+------+----+--------+
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 0|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 2450|
| 7788| SCOTT| ANALYST| 20|3000| 0|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 2975|
| 7698| BLAKE| MANAGER| 30|2850| 0|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 1600|
| 7521| WARD| SALESMAN| 30|1250| 1500|
| 7654|MARTIN| SALESMAN| 30|1250| 1250|
| 7629| ALEX| SALESMAN| 30|1150| 1250|
+-----+------+---------+------+----+--------+
វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark
- window.partitionBy(col('deptno')) បញ្ជាក់ភាគថាសបង្អួច។ វាមានន័យថាមុខងារបង្អួចធ្វើការដាច់ដោយឡែកសម្រាប់នាយកដ្ឋាននីមួយៗ។
- បន្ទាប់មក orderBy(col('sal').desc()) បញ្ជាក់លំដាប់នៃប្រាក់ខែ ហើយនឹងបញ្ជាប្រាក់ខែក្នុងផ្នែកនីមួយៗតាមលំដាប់ចុះ។
- F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') បង្កើតជួរឈរថ្មីមួយហៅថា prev_val នៅក្នុង DataFrame 'prev_sal_df'។
- សម្រាប់ជួរដេកនីមួយៗ ជួរឈរនេះមានតម្លៃនៃជួរឈរ 'sal' ពីជួរមុនក្នុងបង្អួចដែលកំណត់ដោយ windowSpec ។
- ប៉ារ៉ាម៉ែត្រ offset=1 បង្ហាញថាជួរមុនគួរតែជាជួរមួយមុនជួរបច្ចុប្បន្ន ហើយ default=0 បញ្ជាក់តម្លៃលំនាំដើមសម្រាប់ជួរទីមួយក្នុងភាគថាសនីមួយៗ (ព្រោះថាមិនមានជួរមុនសម្រាប់ជួរទីមួយទេ)។
- ទីបំផុត prev_sal_df.show() បង្ហាញ DataFrame លទ្ធផល។
លទ្ធផល:
ទិន្នផលតំណាងឱ្យប្រាក់បៀវត្សរ៍ពីមុនសម្រាប់និយោជិតម្នាក់ៗនៅក្នុងនាយកដ្ឋាននីមួយៗ ដោយផ្អែកលើលំដាប់នៃប្រាក់ខែតាមលំដាប់ចុះ។
សំណួរទី 7 ។ ប្រាក់បៀវត្សរ៍ដំបូងនៅក្នុងនាយកដ្ឋាននីមួយៗ និងប្រៀបធៀបជាមួយសមាជិកនីមួយៗនៅក្នុងនាយកដ្ឋាននីមួយៗ។
# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal,
FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC)
AS first_val FROM emp """)
first_val_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()
# Output:-
+-----+------+---------+------+----+---------+
|empno| ename| job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839| KING|PRESIDENT| 10|5000| 5000|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 5000|
| 7788| SCOTT| ANALYST| 20|3000| 3000|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 3000|
| 7698| BLAKE| MANAGER| 30|2850| 2850|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 2850|
| 7521| WARD| SALESMAN| 30|1250| 2850|
| 7654|MARTIN| SALESMAN| 30|1250| 2850|
| 7629| ALEX| SALESMAN| 30|1150| 2850|
+-----+------+---------+------+----+---------+
វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark
- ដំបូងបង្កើតវត្ថុ WindowSpec ដែលបែងចែកទិន្នន័យតាមលេខនាយកដ្ឋាន (deptno) ហើយបញ្ជាវាដោយប្រាក់ខែ (sal) តាមលំដាប់ចុះ។
- បន្ទាប់មកអនុវត្តមុខងារវិភាគដំបូង () ទៅជួរ 'sal' នៅលើបង្អួចដែលកំណត់ដោយ windowSpec ។ មុខងារនេះត្រឡប់តម្លៃដំបូងនៃជួរឈរ 'sal' នៅក្នុងភាគថាសនីមួយៗ (ឧ. ក្រុម deptno នីមួយៗ) ដែលបញ្ជាដោយចុះក្រោម 'sal' ។ ជួរលទ្ធផលមានឈ្មោះថ្មី 'first_val'។
- ឥឡូវនេះកំណត់លទ្ធផល DataFrame ដែលមានជួរឈរដែលបានជ្រើសរើស និងជួរឈរថ្មី 'first_val' ដែលបង្ហាញពីប្រាក់ខែខ្ពស់បំផុតដំបូងគេសម្រាប់នាយកដ្ឋាននីមួយៗដោយផ្អែកលើលំដាប់ចុះនៃតម្លៃប្រាក់បៀវត្សរ៍ទៅជាអថេរថ្មីមួយហៅថា 'first_value_df'។
លទ្ធផល:
លទ្ធផលបង្ហាញពីប្រាក់ខែខ្ពស់បំផុតដំបូងសម្រាប់នាយកដ្ឋាននីមួយៗនៅក្នុង DataFrame របស់បុគ្គលិក។
សន្និដ្ឋាន
នៅក្នុងអត្ថបទនេះ យើងរៀនអំពីមុខងាររបស់បង្អួច។ Spark SQL មានមុខងារ Window បីប្រភេទ៖ មុខងារចំណាត់ថ្នាក់ មុខងារសរុប និងមុខងារតម្លៃ។ ដោយប្រើមុខងារនេះ យើងបានធ្វើការលើសំណុំទិន្នន័យ ដើម្បីស្វែងរកការយល់ដឹងសំខាន់ៗ និងមានតម្លៃមួយចំនួន។ មុខងារ Spark Window ផ្តល់ជូននូវឧបករណ៍វិភាគទិន្នន័យដ៏មានឥទ្ធិពលដូចជា ចំណាត់ថ្នាក់ ការវិភាគ និងការគណនាតម្លៃ។ មិនថាការវិភាគការយល់ដឹងអំពីប្រាក់ខែតាមនាយកដ្ឋាន ឬការប្រើប្រាស់ឧទាហរណ៍ជាក់ស្តែងជាមួយ PySpark & SQL ទេ មុខងារទាំងនេះផ្តល់នូវឧបករណ៍សំខាន់ៗសម្រាប់ដំណើរការទិន្នន័យ និងការវិភាគប្រកបដោយប្រសិទ្ធភាពនៅក្នុង Spark ។
ការយកសំខាន់ៗ
- យើងបានសិក្សាអំពីមុខងាររបស់ Window ហើយបានធ្វើការជាមួយពួកគេដោយប្រើ Spark SQL និង PySpark DataFrame API។
- យើងប្រើមុខងារដូចជា rank, dense_rank, row_number, lag, lead, groupBy, partitionBy និងមុខងារផ្សេងទៀតដើម្បីផ្តល់ការវិភាគត្រឹមត្រូវ។
- យើងក៏បានឃើញដំណោះស្រាយជាជំហាន ៗ យ៉ាងលម្អិតចំពោះបញ្ហា និងវិភាគលទ្ធផលនៅចុងបញ្ចប់នៃសេចក្តីថ្លែងការណ៍បញ្ហានីមួយៗ។
ករណីសិក្សានេះជួយអ្នកឱ្យយល់កាន់តែច្បាស់អំពីមុខងារ PySpark ។ ប្រសិនបើអ្នកមានមតិយោបល់ ឬចម្ងល់ផ្សេងៗ សូម Comment ខាងក្រោម។ ភ្ជាប់ជាមួយខ្ញុំនៅលើ LinkedIn សម្រាប់ការពិភាក្សាបន្ថែម។ បន្តរៀន!!!
ប្រព័ន្ធផ្សព្វផ្សាយដែលបង្ហាញក្នុងអត្ថបទនេះមិនមែនជាកម្មសិទ្ធិរបស់ Analytics Vidhya ហើយត្រូវបានប្រើប្រាស់តាមការសំរេចចិត្តរបស់អ្នកនិពន្ធ។
- SEO ដែលដំណើរការដោយមាតិកា និងការចែកចាយ PR ។ ទទួលបានការពង្រីកថ្ងៃនេះ។
- PlatoData.Network Vertical Generative Ai. ផ្តល់អំណាចដល់ខ្លួនអ្នក។ ចូលប្រើទីនេះ។
- PlatoAiStream Web3 Intelligence ។ ចំណេះដឹងត្រូវបានពង្រីក។ ចូលប្រើទីនេះ។
- ផ្លាតូអេសជី។ កាបូន CleanTech, ថាមពល, បរិស្ថាន, ពន្លឺព្រះអាទិត្យ ការគ្រប់គ្រងកាកសំណល់។ ចូលប្រើទីនេះ។
- ផ្លាតូសុខភាព។ ជីវបច្ចេកវិទ្យា និង ភាពវៃឆ្លាត សាកល្បងគ្លីនិក។ ចូលប្រើទីនេះ។
- ប្រភព: https://www.analyticsvidhya.com/blog/2024/03/working-with-window-functions-in-pyspark/