شعار زيفيرنت

العمل مع وظائف النافذة في PySpark

التاريخ:

المُقدّمة

قد يكون التعرف على وظائف النافذة في PySpark أمرًا صعبًا ولكنه يستحق الجهد المبذول. تعد وظائف النافذة أداة قوية لتحليل البيانات ويمكن أن تساعدك في الحصول على رؤى ربما لم تكن تراها بطريقة أخرى. من خلال فهم كيفية استخدام وظائف النافذة في Spark؛ يمكنك أن تأخذ الخاص بك تحليل البيانات المهارات إلى المستوى التالي واتخاذ قرارات أكثر استنارة. سواء كنت تعمل مع كبير أو صغير قواعد البيانات، سيسمح لك تعلم وظائف النافذة في Spark بمعالجة البيانات وتحليلها بطرق جديدة ومثيرة.

وظائف النافذة في PySpark

في هذه المدونة، سنفهم أولاً مفهوم وظائف النافذة ثم نناقش كيفية استخدامها مع Spark SQL وPySpark DataFrame API. وبحلول نهاية هذه المقالة، ستفهم كيفية استخدام وظائف النافذة مع مجموعات البيانات الحقيقية والحصول على رؤى أساسية للأعمال.

أهداف التعلم

  • فهم مفهوم وظائف النافذة.
  • العمل مع وظائف النافذة باستخدام مجموعات البيانات.
  • تعرف على الرؤى باستخدام وظائف النافذة.
  • استخدم Spark SQL وDataFrame API للعمل مع وظائف النافذة.

تم نشر هذه المقالة كجزء من مدونة علوم البيانات.

جدول المحتويات

ما هي وظائف النافذة؟

تساعد وظائف النافذة في تحليل البيانات ضمن مجموعة من الصفوف المرتبطة ببعضها البعض. إنها تمكن المستخدمين من إجراء تحويلات معقدة على صفوف إطار البيانات أو مجموعة البيانات المرتبطة ببعضها البعض بناءً على بعض معايير التقسيم والترتيب.

تعمل وظائف النافذة على قسم محدد من إطار البيانات أو مجموعة البيانات المحددة بواسطة مجموعة من أعمدة التقسيم. ال ترتيب حسب تقوم الجملة بتقسيم البيانات في وظيفة النافذة لترتيبها بترتيب معين. تقوم وظائف النافذة بعد ذلك بإجراء العمليات الحسابية على نافذة منزلقة من الصفوف التي تتضمن الصف الحالي ومجموعة فرعية من الصفوف "و"/"أو" السابقة، كما هو محدد في إطار النافذة.

العمل مع وظائف النافذة في PySpark

تتضمن بعض الأمثلة الشائعة لوظائف النافذة حساب المتوسطات المتحركة أو ترتيب أو فرز الصفوف بناءً على عمود أو مجموعة محددة من الأعمدةوحساب الإجماليات الجارية وإيجاد القيمة الأولى أو الأخيرة في مجموعة من الصفوف. بفضل وظائف نافذة Spark القوية، يمكن للمستخدمين إجراء تحليلات وتجميعات معقدة مجموعات كبيرة من البيانات بسهولة نسبية، مما يجعلها أداة شائعة للشركات الكبيرة معالجة المعلومات والتحليلات.

"

وظائف النافذة في SQL

يدعم Spark SQL ثلاثة أنواع من وظائف النافذة:

  • وظائف التصنيف:- تقوم هذه الوظائف بتعيين رتبة لكل صف داخل قسم من مجموعة النتائج. على سبيل المثال، توفر الدالة ROW_NUMBER() رقمًا تسلسليًا فريدًا لكل صف داخل القسم.
  • وظائف التحليلات:- تقوم هذه الوظائف بحساب القيم المجمعة عبر نافذة من الصفوف. على سبيل المثال، تقوم الدالة SUM() بحساب مجموع عمود فوق نافذة من الصفوف.
  • وظائف القيمة:- تحسب هذه الوظائف قيمة تحليلية لكل صف في القسم، بناءً على قيم الصفوف الأخرى في نفس القسم. على سبيل المثال، تقوم الدالة LAG() بإرجاع قيمة عمود من الصف السابق في القسم.

إنشاء إطار البيانات

سنقوم بإنشاء نموذج بيانات حتى نتمكن عمليًا من العمل مع وظائف النوافذ المختلفة. سنحاول أيضًا الإجابة على بعض الأسئلة بمساعدة هذه البيانات ووظائف النافذة.

يحتوي إطار البيانات على تفاصيل الموظفين مثل الاسم والمنصب ورقم الموظف وتاريخ التوظيف والراتب وما إلى ذلك. المجموع لدينا 8 أعمدة وهي كما يلي:

  • 'empno': يحتوي هذا العمود على رقم الموظف.
  • "ename": يحتوي هذا العمود على أسماء الموظفين.
  • "الوظيفة": يحتوي هذا العمود على معلومات حول المسميات الوظيفية للموظفين.
  • "تاريخ التعيين": يعرض هذا العمود تاريخ تعيين الموظف.
  • 'sal': تفاصيل الراتب موجودة في هذا العمود.
  • 'comm': يحتوي هذا العمود على تفاصيل عمولة الموظف، إن وجدت.
  • 'deptno': رقم القسم الذي ينتمي إليه الموظف موجود في هذا العمود.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

الآن سوف نتحقق من المخطط:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

قم بإنشاء عرض مؤقت لـ DataFrame 'emp_df' بالاسم "emp". يسمح لنا بالاستعلام عن DataFrame باستخدام بناء جملة SQL في Spark SQL كما لو كان جدولاً. العرض المؤقت صالح فقط طوال مدة جلسة Spark.

emp_df.createOrReplaceTempView("emp")

حل بيانات المشكلة باستخدام وظائف النافذة

سنقوم هنا بحل العديد من بيانات المشكلات باستخدام وظائف Windows:

س1. ترتيب الراتب داخل كل قسم.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

نهج لرمز PySpark

  • تقوم وظيفة Window بتقسيم البيانات حسب رقم القسم باستخدام PartitionBy(col('deptno')) ثم ترتيب البيانات داخل كل قسم حسب الراتب بترتيب تنازلي باستخدام orderBy(col('sal').desc()). يحمل المتغير windowSpec المواصفات النهائية للنافذة.
  • "emp_df" هو إطار البيانات الذي يحتوي على بيانات الموظف، بما في ذلك أعمدة empno وename وjob وdeptno وsal.
  • يتم تطبيق وظيفة التصنيف على عمود الراتب باستخدام "F.rank().over(windowSpec)" ضمن عبارة التحديد. يحتوي العمود الناتج على اسم مستعار باسم "الرتبة".
  • سيتم إنشاء إطار بيانات، "ranking_result_df"، والذي يتضمن الموظف والاسم والوظيفة والقسم والراتب. كما يحتوي أيضًا على عمود جديد، "الرتبة"، يمثل رتبة راتب الموظف داخل قسمه.

الإخراج:

النتيجة لها رتبة الراتب في كل قسم.

س2. رتبة كثيفة الراتب داخل كل قسم.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

نهج لرمز PySpark

  • أولاً، قم بإنشاء مواصفات نافذة باستخدام وظيفة Window، التي تقوم بتقسيم DataFrame 'emp_df' بواسطة deptno وترتيبها عن طريق نزول العمود 'sal'.
  • بعد ذلك، يتم تطبيق وظيفة Density_rank () على مواصفات النافذة، والتي تقوم بتعيين رتبة كثيفة لكل صف داخل كل قسم بناءً على ترتيبه المفرز.
  • أخيرًا، يتم إنشاء DataFrame جديد يسمى "dense_ranking_df" عن طريق تحديد أعمدة محددة من emp_df (على سبيل المثال، "empno"، و"ename"، و"job"، و"deptno"، و"sal") وإضافة عمود جديد "dense_rank" والذي يحتوي على قيم الترتيب الكثيفة المحسوبة بواسطة وظيفة النافذة.
  • أخيرًا، قم بعرض DataFrame الناتج بتنسيق جدولي.

الإخراج:

النتيجة لها رتبة كثيفة من حيث الراتب.

س3. رقم الصف داخل كل قسم.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

نهج لرمز PySpark

  • يحدد السطر الأول مواصفات النافذة للعملية الحسابية باستخدام الدالتين Window.partitionBy() وWindow.orderBy(). يتم تقسيم هذه النافذة بواسطة عمود deptno ويتم ترتيبها بواسطة عمود sal بترتيب تنازلي.
  • يقوم السطر الثاني بإنشاء DataFrame جديد يسمى "row_num_df"، وهو إسقاط لـ "emp_df" مع عمود إضافي يسمى "row_num" ويحتوي على تفاصيل أرقام الصفوف.
  • تعرض الدالة show() DataFrame الناتجة، والتي تعرض أعمدة empno وename وjob وdeptno وsal وrow_num لكل موظف.

الإخراج:

سيحتوي الناتج على رقم صف كل موظف داخل قسمه بناءً على راتبه.

س 4. تشغيل المبلغ الإجمالي للرواتب داخل كل قسم.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

الرسالة لرمز PySpark

  • أولاً، يتم تعريف مواصفات النافذة باستخدام طريقتي "Window.partitionBy()" و"Window.orderBy()". تقوم طريقة "partitionBy()" بتقسيم البيانات حسب عمود "deptno"، بينما تقوم طريقة "orderBy()" بترتيب البيانات حسب عمود "sal" بترتيب تنازلي.
  • بعد ذلك، يتم تطبيق الدالة "sum()" على عمود "sal" باستخدام طريقة "over()" لحساب إجمالي الرواتب الجارية داخل كل قسم. ستكون النتيجة في DataFrame جديد يسمى "running_sum_sal_df"، والذي يحتوي على الأعمدة "empno"، و"ename"، و"job"، و"deptno"، و"sal"، و"running_total".
  • أخيرًا، يتم استدعاء الأسلوب "show()" في DataFrame "running_sum_sal_df" لعرض مخرجات الاستعلام. يُظهر DataFrame الناتج إجمالي رواتب كل موظف وتفاصيل أخرى مثل الاسم ورقم القسم والوظيفة.

الإخراج:

سيكون الناتج إجماليًا جاريًا لبيانات الرواتب لكل قسم.

س5: الراتب التالي داخل كل قسم.

للعثور على الراتب التالي داخل كل قسم نستخدم وظيفة LEAD. 

تساعد وظيفة نافذة Lead () في الحصول على قيمة التعبير في الصف التالي من قسم النافذة. تقوم بإرجاع عمود لكل عمود إدخال، حيث سيحتوي كل عمود على قيمة عمود الإدخال لصف الإزاحة أعلى الصف الحالي داخل قسم النافذة. بناء جملة الدالة الرئيسية هو: - Lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

نهج لرمز PySpark

  • أولا، تساعد وظيفة window على تقسيم صفوف DataFrame حسب رقم القسم (deptno) وترتيب الرواتب تنازليا داخل كل قسم.
  • يتم بعد ذلك تطبيق الدالة Lead() على عمود "sal" المرتب داخل كل قسم لإرجاع راتب الموظف التالي (مع إزاحة 1)، والقيمة الافتراضية هي 0 في حالة عدم وجود موظف تالي.
  • يحتوي DataFrame الناتج 'next_salary_df' على أعمدة لرقم الموظف (empno)، والاسم (ename)، والمسمى الوظيفي (job)، ورقم القسم (deptno)، والراتب الحالي (sal)، والراتب التالي (next_val).

الإخراج:

تحتوي المخرجات على راتب الموظف التالي في القسم حسب الترتيب التنازلي للراتب. 

س6. الراتب السابق داخل كل قسم.

لحساب الراتب السابق نستخدم دالة LAG.

تقوم الدالة lag بإرجاع قيمة التعبير عند إزاحة معينة قبل الصف الحالي داخل قسم النافذة. بناء جملة الدالة lag هو: - lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

نهج لرمز PySpark

  • يحدد window.partitionBy(col('deptno')) قسم النافذة. وهذا يعني أن وظيفة النافذة تعمل بشكل منفصل لكل قسم.
  • ثم يحدد orderBy(col('sal').desc()) ترتيب الراتب وسيقوم بترتيب الرواتب داخل كل قسم بترتيب تنازلي.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') ينشئ عمودًا جديدًا يسمى prev_val في DataFrame 'prev_sal_df'.
  • لكل صف، يحتوي هذا العمود على قيمة العمود "sal" من الصف السابق داخل النافذة المحددة بواسطة windowSpec.
  • تشير المعلمة offset=1 إلى أن الصف السابق يجب أن يكون صفًا واحدًا قبل الصف الحالي، وتحدد المعلمة default=0 القيمة الافتراضية للصف الأول في كل قسم (نظرًا لعدم وجود صف سابق للصف الأول).
  • وأخيرًا، يعرض prev_sal_df.show() DataFrame الناتج.

الإخراج:

يمثل الناتج الراتب السابق لكل موظف داخل كل قسم بناء على ترتيب الرواتب تنازليا.

س7. الراتب الأول داخل كل قسم ومقارنته بكل عضو داخل كل قسم.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

نهج لرمز PySpark

  • أولاً، قم بإنشاء كائن WindowSpec الذي يقوم بتقسيم البيانات حسب رقم القسم (deptno) وترتيبها حسب الراتب (sal) بترتيب تنازلي.
  • ثم يتم تطبيق الوظيفة التحليلية الأولى () على عمود "sal" الموجود فوق النافذة المحددة بواسطة windowSpec. تقوم هذه الدالة بإرجاع القيمة الأولى لعمود "sal" داخل كل قسم (أي كل مجموعة deptno) مرتبة حسب "sal" تنازليًا. العمود الناتج له اسم جديد، "first_val".
  • يقوم الآن بتعيين DataFrame الناتج، الذي يحتوي على الأعمدة المحددة وعمود جديد، "first_val"، الذي يعرض أول أعلى راتب لكل قسم بناءً على الترتيب التنازلي لقيم الرواتب، إلى متغير جديد يسمى "first_value_df".

الإخراج:

يُظهر الإخراج أول أعلى راتب لكل قسم في DataFrame الخاص بالموظف.

وفي الختام

في هذه المقالة نتعرف على وظائف النافذة. يحتوي Spark SQL على ثلاثة أنواع من وظائف النوافذ: وظائف التصنيف، والوظائف التجميعية، ووظائف القيمة. باستخدام هذه الوظيفة، عملنا على مجموعة بيانات للعثور على بعض الأفكار المهمة والقيمة. توفر وظائف Spark Window أدوات قوية لتحليل البيانات مثل التصنيف والتحليلات وحسابات القيمة. سواء كان تحليل رؤى الرواتب حسب القسم أو استخدام أمثلة عملية مع PySpark وSQL، توفر هذه الوظائف أدوات أساسية لمعالجة البيانات وتحليلها بشكل فعال في Spark.

الوجبات السريعة الرئيسية

  • لقد تعرفنا على وظائف النافذة وعملنا معها باستخدام Spark SQL وPySpark DataFrame API.
  • نحن نستخدم وظائف مثل الترتيب، وكثافة_الرتبة، ورقم الصف، والتأخر، والرصاص، وgroupBy، وpartitionBy، وغيرها من الوظائف لتوفير التحليل المناسب.
  • لقد رأينا أيضًا الحلول التفصيلية للمشكلة خطوة بخطوة وقمنا بتحليل المخرجات في نهاية كل بيان مشكلة.

تساعدك دراسة الحالة هذه على فهم وظائف PySpark بشكل أفضل. إذا كان لديك أي آراء أو أسئلة، ثم التعليق أدناه. تواصل معي على لينكدين: لمزيد من المناقشة. استمر في التعلم!!!

الوسائط الموضحة في هذه المقالة ليست مملوكة لـ Analytics Vidhya ويتم استخدامها وفقًا لتقدير المؤلف.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة