និមិត្តសញ្ញា Zephyrnet

ធ្វើការជាមួយមុខងារ Window នៅក្នុង PySpark

កាលបរិច្ឆេទ:

សេចក្តីផ្តើម

ការរៀនអំពីមុខងារ Window នៅក្នុង PySpark អាចជាបញ្ហាប្រឈម ប៉ុន្តែមានតម្លៃក្នុងការខិតខំប្រឹងប្រែង។ មុខងារ Window គឺជាឧបករណ៍ដ៏មានឥទ្ធិពលសម្រាប់ការវិភាគទិន្នន័យ និងអាចជួយអ្នកឱ្យទទួលបានការយល់ដឹងដែលអ្នកប្រហែលជាមិនធ្លាប់បានឃើញ។ តាមរយៈការយល់ដឹងពីរបៀបប្រើមុខងារ Window នៅក្នុង Spark; អ្នកអាចយករបស់អ្នក។ ការវិភាគ​ទិន្នន័យ ជំនាញទៅកម្រិតបន្ទាប់ និងធ្វើការសម្រេចចិត្តប្រកបដោយការយល់ដឹងបន្ថែមទៀត។ មិនថាអ្នកធ្វើការជាមួយធំឬតូច សំណុំទិន្នន័យការរៀនមុខងារ Window នៅក្នុង Spark នឹងអនុញ្ញាតឱ្យអ្នករៀបចំ និងវិភាគទិន្នន័យតាមរបៀបថ្មី និងគួរឱ្យរំភើប។

មុខងារ Window នៅក្នុង PySpark

នៅក្នុងប្លក់នេះ យើងនឹងយល់អំពីគោលគំនិតនៃមុខងារ window ជាមុនសិន ហើយបន្ទាប់មកពិភាក្សាពីរបៀបប្រើប្រាស់ពួកវាជាមួយ Spark SQL និង PySpark DataFrame API។ ដូច្នេះនៅចុងបញ្ចប់នៃអត្ថបទនេះ អ្នកនឹងយល់ពីរបៀបប្រើប្រាស់មុខងារ Window ជាមួយនឹងសំណុំទិន្នន័យពិតប្រាកដ និងទទួលបានការយល់ដឹងសំខាន់ៗសម្រាប់អាជីវកម្ម។

គោលបំណងនៃការរៀន

  • ស្វែងយល់ពីគំនិតនៃមុខងារបង្អួច។
  • ធ្វើការជាមួយមុខងារបង្អួចដោយប្រើសំណុំទិន្នន័យ។
  • ស្វែងយល់ពីការយល់ដឹងដោយប្រើមុខងារបង្អួច។
  • ប្រើ Spark SQL និង DataFrame API ដើម្បីធ្វើការជាមួយមុខងារបង្អួច។

អត្ថបទនេះត្រូវបានបោះពុម្ពផ្សាយជាផ្នែកមួយនៃព្រះគម្ពីរមរមន Blogathon វិទ្យាសាស្ត្រទិន្នន័យ។

តារាង​មាតិកា

តើមុខងារ Window មានអ្វីខ្លះ?

មុខងារ Window ជួយវិភាគទិន្នន័យក្នុងក្រុមជួរដេកដែលទាក់ទងគ្នាទៅវិញទៅមក។ ពួកវាអាចឱ្យអ្នកប្រើប្រាស់ធ្វើការបំប្លែងដ៏ស្មុគស្មាញនៅលើជួរដេកនៃស៊ុមទិន្នន័យ ឬសំណុំទិន្នន័យដែលទាក់ទងគ្នាទៅវិញទៅមក ដោយផ្អែកលើលក្ខណៈវិនិច្ឆ័យនៃការបែងចែក និងលំដាប់មួយចំនួន។

មុខងារ Window ដំណើរការលើភាគថាសជាក់លាក់នៃ dataframe ឬ dataset ដែលកំណត់ដោយសំណុំនៃ partitioning columns។ នេះ។ បញ្ជាដោយ clause បែងចែកទិន្នន័យនៅក្នុងមុខងារ window ដើម្បីរៀបចំវាតាមលំដាប់ជាក់លាក់មួយ។ បន្ទាប់មក អនុគមន៍​បង្អួច​អនុវត្ត​ការ​គណនា​លើ​បង្អួច​រំកិល​នៃ​ជួរ​ដេក ដែល​រួម​បញ្ចូល​ជួរ​ដេក​បច្ចុប្បន្ន និង​សំណុំ​រង​នៃ​ជួរ​ខាង​មុខ​ទាំង 'និង'/'ឬ' ខាងក្រោម ដូច​ដែល​បាន​បញ្ជាក់​ក្នុង​ស៊ុមបង្អួច។

ធ្វើការជាមួយមុខងារ Window នៅក្នុង PySpark

ឧទាហរណ៍ទូទៅមួយចំនួននៃមុខងារបង្អួចរួមមានការគណនាមធ្យមផ្លាស់ទី ចំណាត់ថ្នាក់ ឬតម្រៀបជួរដោយផ្អែកលើជួរឈរ ឬក្រុមជាក់លាក់នៃ ជួរឈរគណនាចំនួនសរុបដែលកំពុងដំណើរការ និងការស្វែងរកតម្លៃដំបូង ឬចុងក្រោយក្នុងក្រុមជួរ។ ជាមួយនឹងមុខងារបង្អួចដ៏មានអានុភាពរបស់ Spark អ្នកប្រើប្រាស់អាចធ្វើការវិភាគស្មុគស្មាញ និងការបូកសរុប សំណុំទិន្នន័យធំ ជាមួយនឹងភាពងាយស្រួលដែលទាក់ទង ធ្វើឱ្យវាក្លាយជាឧបករណ៍ពេញនិយមសម្រាប់ធំ ដំណើរការទិន្នន័យ និងការវិភាគ។

"

មុខងារ Window ក្នុង SQL

Spark SQL គាំទ្រមុខងារបង្អួចបីប្រភេទ៖

  • មុខងារចំណាត់ថ្នាក់៖ - មុខងារទាំងនេះផ្តល់ចំណាត់ថ្នាក់ទៅជួរនីមួយៗក្នុងភាគថាសនៃសំណុំលទ្ធផល។ ឧទាហរណ៍ អនុគមន៍ ROW_NUMBER() ផ្តល់លេខបន្តបន្ទាប់គ្នាតែមួយគត់ទៅជួរនីមួយៗក្នុងភាគថាស។
  • មុខងារវិភាគ៖ - មុខងារទាំងនេះគណនាតម្លៃសរុបនៅលើបង្អួចនៃជួរដេក។ ឧទាហរណ៍ អនុគមន៍ SUM() គណនាផលបូកនៃជួរឈរនៅលើបង្អួចនៃជួរដេក។
  • មុខងារតម្លៃ៖ - មុខងារទាំងនេះគណនាតម្លៃវិភាគសម្រាប់ជួរនីមួយៗក្នុងភាគថាស ដោយផ្អែកលើតម្លៃនៃជួរផ្សេងទៀតក្នុងភាគថាសដូចគ្នា។ ឧទាហរណ៍ អនុគមន៍ LAG() ត្រឡប់តម្លៃនៃជួរឈរពីជួរមុនក្នុងភាគថាស។

ការបង្កើត DataFrame

យើង​នឹង​បង្កើត​គំរូ​ទិន្នន័យ​មួយ ដូច្នេះ​យើង​អាច​ធ្វើ​ការ​ជាមួយ​មុខងារ​បង្អួច​ផ្សេងៗ​បាន​យ៉ាង​អនុវត្ត។ យើងក៏នឹងព្យាយាមឆ្លើយសំណួរមួយចំនួន ដោយមានជំនួយពីទិន្នន័យនេះ និងមុខងារបង្អួច។

ស៊ុមទិន្នន័យមានព័ត៌មានលំអិតអំពីបុគ្គលិកដូចជា ឈ្មោះ ការកំណត់ លេខនិយោជិត កាលបរិច្ឆេទជួល ប្រាក់បៀវត្សរ៍ជាដើម។ សរុបយើងមាន 8 ជួរដែលមានដូចខាងក្រោម៖

  • 'empno'៖ ជួរឈរនេះមានលេខបុគ្គលិក។
  • 'ename'៖ ជួរ​ឈរ​នេះ​មាន​ឈ្មោះ​បុគ្គលិក។
  • 'job': ជួរនេះមានព័ត៌មានអំពីចំណងជើងការងាររបស់និយោជិត។
  • 'hiredate'៖ ជួរឈរនេះបង្ហាញពីកាលបរិច្ឆេទជួលបុគ្គលិក។
  • 'sal': ព័ត៌មានលម្អិតអំពីប្រាក់ខែមាននៅក្នុងជួរឈរនេះ។
  • 'comm'៖ ជួរឈរនេះមានព័ត៌មានលម្អិតអំពីគណៈកម្មការបុគ្គលិក ប្រសិនបើមាន។
  • 'deptno': លេខនាយកដ្ឋានដែលនិយោជិតជាកម្មសិទ្ធិគឺនៅក្នុងជួរឈរនេះ។
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

ឥឡូវនេះយើងនឹងពិនិត្យមើលគ្រោងការណ៍៖

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

បង្កើតទិដ្ឋភាពបណ្តោះអាសន្ននៃ DataFrame 'emp_df' ដោយប្រើឈ្មោះ "emp" ។ វាអនុញ្ញាតឱ្យយើងសាកសួរ DataFrame ដោយប្រើវាក្យសម្ព័ន្ធ SQL នៅក្នុង Spark SQL ដូចជាវាជាតារាង។ ទិដ្ឋភាពបណ្តោះអាសន្នមានសុពលភាពសម្រាប់តែរយៈពេលនៃវគ្គ Spark Session ប៉ុណ្ណោះ។

emp_df.createOrReplaceTempView("emp")

ការដោះស្រាយបញ្ហាសេចក្តីថ្លែងការណ៍ដោយប្រើមុខងារបង្អួច

ខាងក្រោម​នេះ​យើង​នឹង​កំពុង​ដោះ​ស្រាយ​បញ្ហា​មួយ​ចំនួន​ដោយ​ប្រើ​មុខងារ windows៖

សំណួរទី 1 ចាត់ថ្នាក់ប្រាក់បៀវត្សរ៍ក្នុងផ្នែកនីមួយៗ។

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark

  • មុខងារ Window បែងចែកទិន្នន័យតាមលេខនាយកដ្ឋានដោយប្រើ partitionBy(col('deptno')) ហើយបន្ទាប់មកបញ្ជាទិន្នន័យក្នុង partition នីមួយៗដោយប្រាក់ខែតាមលំដាប់ចុះដោយប្រើ orderBy(col('sal').desc())។ WindowSpec អថេរ​រក្សា​ការ​បញ្ជាក់​វិនដូ​ចុងក្រោយ។
  • 'emp_df' គឺជា dataframe ដែលមានទិន្នន័យបុគ្គលិក រួមទាំងជួរឈរសម្រាប់ empno, ename, job, deptno និង sal ។
  • មុខងារចំណាត់ថ្នាក់ត្រូវបានអនុវត្តទៅជួរប្រាក់ខែដោយប្រើ 'F.rank(.over(windowSpec)' នៅក្នុងសេចក្តីថ្លែងការណ៍ជ្រើសរើស។ ជួរលទ្ធផលមានឈ្មោះក្លែងក្លាយជា 'ចំណាត់ថ្នាក់'។
  • វានឹងបង្កើត dataframe 'ranking_result_df' ដែលរួមមាន empno, ename, job, deptno និងប្រាក់ខែ។ វាក៏មានជួរថ្មីមួយទៀត 'ចំណាត់ថ្នាក់' ដែលតំណាងឱ្យឋានៈនៃប្រាក់ខែរបស់បុគ្គលិកនៅក្នុងនាយកដ្ឋានរបស់ពួកគេ។

លទ្ធផល:

លទ្ធផលមានចំណាត់ថ្នាក់ប្រាក់បៀវត្សរ៍ក្នុងនាយកដ្ឋាននីមួយៗ។

សំណួរទី 2 Dense ចាត់ថ្នាក់ប្រាក់បៀវត្សរ៍នៅក្នុងនាយកដ្ឋាននីមួយៗ។

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark

  • ជាដំបូង បង្កើតការបញ្ជាក់របស់ Window ដោយប្រើប្រាស់មុខងារ Window ដែលបែងចែក 'emp_df' DataFrame ដោយ deptno ហើយបញ្ជាវាដោយចុះក្រោមជួរឈរ 'sal'។
  • បន្ទាប់មក មុខងារ dense_rank() ត្រូវបានអនុវត្តលើការបញ្ជាក់របស់បង្អួច ដែលផ្តល់ចំណាត់ថ្នាក់ក្រាស់ទៅជួរនីមួយៗក្នុងភាគថាសនីមួយៗដោយផ្អែកលើលំដាប់ដែលបានតម្រៀបរបស់វា។
  • ជាចុងក្រោយ DataFrame ថ្មីមួយហៅថា 'dense_ranking_df' ត្រូវបានបង្កើតឡើងដោយជ្រើសរើសជួរឈរជាក់លាក់ពី emp_df (ឧទាហរណ៍ 'empno', 'ename', 'job', 'deptno' និង 'sal') ហើយបន្ថែមជួរឈរថ្មី 'dense_rank' ដែល មាន​តម្លៃ​ចំណាត់ថ្នាក់​ក្រាស់​ដែល​គណនា​ដោយ​មុខងារ​បង្អួច។
  • ចុងក្រោយ បង្ហាញ DataFrame លទ្ធផលជាទម្រង់តារាង។

លទ្ធផល:

លទ្ធផល​មាន​កម្រិត​ប្រាក់​បៀវត្សរ៍​ក្រាស់។

សំណួរទី 3 លេខជួរនៅក្នុងផ្នែកនីមួយៗ។

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark

  • បន្ទាត់ទីមួយកំណត់ការបញ្ជាក់បង្អួចសម្រាប់ការគណនាដោយប្រើមុខងារ Window.partitionBy() និង Window.orderBy() ។ បង្អួចនេះត្រូវបានបែងចែកដោយជួរឈរ deptno ហើយត្រូវបានបញ្ជាដោយជួរឈរ sal តាមលំដាប់ចុះ។
  • ជួរទីពីរបង្កើត DataFrame ថ្មីហៅថា 'row_num_df' ដែលជាការព្យាករនៃ 'emp_df' ជាមួយនឹងជួរឈរបន្ថែមហៅថា 'row_num' ហើយវាមានព័ត៌មានលម្អិតអំពីលេខជួរដេក។
  • មុខងារ show() បង្ហាញ DataFrame លទ្ធផល ដែលបង្ហាញពី empno, ename, job, deptno, sal, និង row_num columns របស់និយោជិតនីមួយៗ។

លទ្ធផល:

លទ្ធផលនឹងមានលេខជួរដេករបស់បុគ្គលិកម្នាក់ៗនៅក្នុងនាយកដ្ឋានរបស់ពួកគេដោយផ្អែកលើប្រាក់ខែរបស់ពួកគេ។

សំណួរទី 4 ដំណើរការសរុបនៃប្រាក់បៀវត្សរ៍នៅក្នុងនាយកដ្ឋាននីមួយៗ។

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

វិធីសាស្រ្ត សម្រាប់លេខកូដ PySpark

  • ដំបូង ការកំណត់បង្អួចត្រូវបានកំណត់ដោយប្រើវិធីសាស្ត្រ "Window.partitionBy()" និង "Window.orderBy()" ។ វិធីសាស្ត្រ "partitionBy()" បែងចែកទិន្នន័យដោយជួរឈរ "deptno" ខណៈពេលដែលវិធីសាស្ត្រ "orderBy()" បញ្ជាទិន្នន័យដោយជួរឈរ "sal" តាមលំដាប់ចុះ។
  • បន្ទាប់មក មុខងារ “sum()” ត្រូវបានអនុវត្តទៅជួរ “sal” ដោយប្រើវិធីសាស្ត្រ “over()” ដើម្បីគណនាប្រាក់បៀវត្សរ៍សរុបដែលកំពុងដំណើរការនៅក្នុងនាយកដ្ឋាននីមួយៗ។ លទ្ធផលនឹងស្ថិតនៅក្នុង DataFrame ថ្មីដែលហៅថា “running_sum_sal_df” ដែលមានជួរឈរ 'empno', 'ename', 'job', 'deptno', 'sal' និង 'running_total'។
  • ជាចុងក្រោយ វិធីសាស្ត្រ "show()" ត្រូវបានហៅនៅលើ "running_sum_sal_df" DataFrame ដើម្បីបង្ហាញលទ្ធផលនៃសំណួរ។ លទ្ធផល DataFrame បង្ហាញពីប្រាក់បៀវត្សរ៍សរុបដែលកំពុងដំណើរការរបស់និយោជិតនីមួយៗ និងព័ត៌មានលម្អិតផ្សេងទៀតដូចជា ឈ្មោះ លេខនាយកដ្ឋាន និងការងារ។

លទ្ធផល:

លទ្ធផលនឹងមានចំនួនសរុបដែលកំពុងដំណើរការនៃទិន្នន័យប្រាក់ខែរបស់នាយកដ្ឋាននីមួយៗ។

សំណួរទី៥៖ ប្រាក់ខែបន្ទាប់នៅក្នុងនាយកដ្ឋាននីមួយៗ។

ដើម្បីស្វែងរកប្រាក់ខែបន្ទាប់នៅក្នុងនាយកដ្ឋាននីមួយៗ យើងប្រើមុខងារ LEAD ។ 

មុខងារបង្អួចនាំមុខ () ជួយឱ្យទទួលបានតម្លៃនៃកន្សោមនៅក្នុងជួរបន្ទាប់នៃភាគថាសបង្អួច។ វាត្រឡប់ជួរឈរសម្រាប់ជួរឈរបញ្ចូលនីមួយៗ ដែលជួរឈរនីមួយៗនឹងមានតម្លៃនៃជួរឈរបញ្ចូលសម្រាប់ជួរអុហ្វសិតខាងលើជួរបច្ចុប្បន្ននៅក្នុងភាគថាសបង្អួច។ វាក្យសម្ព័ន្ធសម្រាប់មុខងារនាំមុខគឺ៖- lead(col, offset=1, default=None)។

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark

  • ទីមួយ មុខងារ window ជួយបែងចែកជួរដេករបស់ DataFrame តាមលេខនាយកដ្ឋាន (deptno) និងបញ្ជាប្រាក់ខែតាមលំដាប់ចុះក្នុង partition នីមួយៗ។
  • បន្ទាប់មកមុខងារ lead() ត្រូវបានអនុវត្តទៅជួរ 'sal' ដែលបានបញ្ជានៅក្នុងភាគថាសនីមួយៗ ដើម្បីប្រគល់ប្រាក់បៀវត្សរ៍របស់និយោជិតខាងក្រោម (ជាមួយអុហ្វសិត 1) ហើយតម្លៃលំនាំដើមគឺ 0 ក្នុងករណីដែលមិនមានបុគ្គលិកបន្ទាប់។
  • លទ្ធផល DataFrame 'next_salary_df' មានជួរឈរសម្រាប់លេខបុគ្គលិក (empno) ឈ្មោះ (ename) ចំណងជើងការងារ (job) លេខនាយកដ្ឋាន (deptno) ប្រាក់ខែបច្ចុប្បន្ន (sal) និងប្រាក់ខែបន្ទាប់ (next_val)។

លទ្ធផល:

លទ្ធផលមានប្រាក់បៀវត្សរ៍របស់និយោជិតបន្ទាប់នៅក្នុងនាយកដ្ឋានដោយផ្អែកលើលំដាប់នៃប្រាក់ខែធ្លាក់ចុះ។ 

សំណួរទី 6 ប្រាក់បៀវត្សរ៍ពីមុននៅក្នុងនាយកដ្ឋាននីមួយៗ។

ដើម្បីគណនាប្រាក់ខែមុន យើងប្រើមុខងារ LAG ។

មុខងារ lag ត្រឡប់តម្លៃនៃកន្សោមនៅអុហ្វសិតដែលបានផ្តល់ឱ្យមុនជួរបច្ចុប្បន្ននៅក្នុងភាគថាសបង្អួច។ វាក្យសម្ព័ន្ធនៃអនុគមន៍ lag គឺ៖- lag(expr, offset=1, default=None).over(windowSpec)។

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark

  • window.partitionBy(col('deptno')) បញ្ជាក់ភាគថាសបង្អួច។ វាមានន័យថាមុខងារបង្អួចធ្វើការដាច់ដោយឡែកសម្រាប់នាយកដ្ឋាននីមួយៗ។
  • បន្ទាប់មក orderBy(col('sal').desc()) បញ្ជាក់លំដាប់នៃប្រាក់ខែ ហើយនឹងបញ្ជាប្រាក់ខែក្នុងផ្នែកនីមួយៗតាមលំដាប់ចុះ។
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') បង្កើតជួរឈរថ្មីមួយហៅថា prev_val នៅក្នុង DataFrame 'prev_sal_df'។
  • សម្រាប់​ជួរ​ដេក​នីមួយៗ ជួរ​ឈរ​នេះ​មាន​តម្លៃ​នៃ​ជួរ​ឈរ 'sal' ពី​ជួរ​មុន​ក្នុង​បង្អួច​ដែល​កំណត់​ដោយ windowSpec ។
  • ប៉ារ៉ាម៉ែត្រ offset=1 បង្ហាញថាជួរមុនគួរតែជាជួរមួយមុនជួរបច្ចុប្បន្ន ហើយ default=0 បញ្ជាក់តម្លៃលំនាំដើមសម្រាប់ជួរទីមួយក្នុងភាគថាសនីមួយៗ (ព្រោះថាមិនមានជួរមុនសម្រាប់ជួរទីមួយទេ)។
  • ទីបំផុត prev_sal_df.show() បង្ហាញ DataFrame លទ្ធផល។

លទ្ធផល:

ទិន្នផលតំណាងឱ្យប្រាក់បៀវត្សរ៍ពីមុនសម្រាប់និយោជិតម្នាក់ៗនៅក្នុងនាយកដ្ឋាននីមួយៗ ដោយផ្អែកលើលំដាប់នៃប្រាក់ខែតាមលំដាប់ចុះ។

សំណួរទី 7 ។ ប្រាក់បៀវត្សរ៍ដំបូងនៅក្នុងនាយកដ្ឋាននីមួយៗ និងប្រៀបធៀបជាមួយសមាជិកនីមួយៗនៅក្នុងនាយកដ្ឋាននីមួយៗ។

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

វិធីសាស្រ្តសម្រាប់លេខកូដ PySpark

  • ដំបូងបង្កើតវត្ថុ WindowSpec ដែលបែងចែកទិន្នន័យតាមលេខនាយកដ្ឋាន (deptno) ហើយបញ្ជាវាដោយប្រាក់ខែ (sal) តាមលំដាប់ចុះ។
  • បន្ទាប់មកអនុវត្តមុខងារវិភាគដំបូង () ទៅជួរ 'sal' នៅលើបង្អួចដែលកំណត់ដោយ windowSpec ។ មុខងារនេះត្រឡប់តម្លៃដំបូងនៃជួរឈរ 'sal' នៅក្នុងភាគថាសនីមួយៗ (ឧ. ក្រុម deptno នីមួយៗ) ដែលបញ្ជាដោយចុះក្រោម 'sal' ។ ជួរលទ្ធផលមានឈ្មោះថ្មី 'first_val'។
  • ឥឡូវនេះកំណត់លទ្ធផល DataFrame ដែលមានជួរឈរដែលបានជ្រើសរើស និងជួរឈរថ្មី 'first_val' ដែលបង្ហាញពីប្រាក់ខែខ្ពស់បំផុតដំបូងគេសម្រាប់នាយកដ្ឋាននីមួយៗដោយផ្អែកលើលំដាប់ចុះនៃតម្លៃប្រាក់បៀវត្សរ៍ទៅជាអថេរថ្មីមួយហៅថា 'first_value_df'។

លទ្ធផល:

លទ្ធផលបង្ហាញពីប្រាក់ខែខ្ពស់បំផុតដំបូងសម្រាប់នាយកដ្ឋាននីមួយៗនៅក្នុង DataFrame របស់បុគ្គលិក។

សន្និដ្ឋាន

នៅក្នុងអត្ថបទនេះ យើងរៀនអំពីមុខងាររបស់បង្អួច។ Spark SQL មានមុខងារ Window បីប្រភេទ៖ មុខងារចំណាត់ថ្នាក់ មុខងារសរុប និងមុខងារតម្លៃ។ ដោយប្រើមុខងារនេះ យើងបានធ្វើការលើសំណុំទិន្នន័យ ដើម្បីស្វែងរកការយល់ដឹងសំខាន់ៗ និងមានតម្លៃមួយចំនួន។ មុខងារ Spark Window ផ្តល់ជូននូវឧបករណ៍វិភាគទិន្នន័យដ៏មានឥទ្ធិពលដូចជា ចំណាត់ថ្នាក់ ការវិភាគ និងការគណនាតម្លៃ។ មិនថាការវិភាគការយល់ដឹងអំពីប្រាក់ខែតាមនាយកដ្ឋាន ឬការប្រើប្រាស់ឧទាហរណ៍ជាក់ស្តែងជាមួយ PySpark & ​​SQL ទេ មុខងារទាំងនេះផ្តល់នូវឧបករណ៍សំខាន់ៗសម្រាប់ដំណើរការទិន្នន័យ និងការវិភាគប្រកបដោយប្រសិទ្ធភាពនៅក្នុង Spark ។

ការយកសំខាន់ៗ

  • យើងបានសិក្សាអំពីមុខងាររបស់ Window ហើយបានធ្វើការជាមួយពួកគេដោយប្រើ Spark SQL និង PySpark DataFrame API។
  • យើងប្រើមុខងារដូចជា rank, dense_rank, row_number, lag, lead, groupBy, partitionBy និងមុខងារផ្សេងទៀតដើម្បីផ្តល់ការវិភាគត្រឹមត្រូវ។
  • យើងក៏បានឃើញដំណោះស្រាយជាជំហាន ៗ យ៉ាងលម្អិតចំពោះបញ្ហា និងវិភាគលទ្ធផលនៅចុងបញ្ចប់នៃសេចក្តីថ្លែងការណ៍បញ្ហានីមួយៗ។

ករណីសិក្សានេះជួយអ្នកឱ្យយល់កាន់តែច្បាស់អំពីមុខងារ PySpark ។ ប្រសិនបើអ្នកមានមតិយោបល់ ឬចម្ងល់ផ្សេងៗ សូម Comment ខាងក្រោម។ ភ្ជាប់ជាមួយខ្ញុំនៅលើ LinkedIn សម្រាប់ការពិភាក្សាបន្ថែម។ បន្តរៀន!!!

ប្រព័ន្ធផ្សព្វផ្សាយដែលបង្ហាញក្នុងអត្ថបទនេះមិនមែនជាកម្មសិទ្ធិរបស់ Analytics Vidhya ហើយត្រូវបានប្រើប្រាស់តាមការសំរេចចិត្តរបស់អ្នកនិពន្ធ។

spot_img

បញ្ញាចុងក្រោយ

spot_img