لوگوی Zephyrnet

کار با توابع پنجره در PySpark

تاریخ:

معرفی

یادگیری در مورد توابع پنجره در PySpark می تواند چالش برانگیز باشد اما ارزش تلاش را دارد. Window Functions ابزاری قدرتمند برای تجزیه و تحلیل داده ها است و می تواند به شما کمک کند تا بینش هایی را به دست آورید که ممکن است در غیر این صورت ندیده باشید. با درک نحوه استفاده از توابع پنجره در Spark. شما می توانید خود را بردارید تحلیل داده ها مهارت ها را به سطح بعدی برسانید و تصمیمات آگاهانه تری بگیرید. چه با بزرگ و چه کوچک کار کنید مجموعه داده ها، یادگیری توابع پنجره در Spark به شما امکان می دهد داده ها را به روش های جدید و هیجان انگیز دستکاری و تجزیه و تحلیل کنید.

توابع پنجره در PySpark

در این وبلاگ ابتدا مفهوم توابع پنجره را درک می کنیم و سپس نحوه استفاده از آنها با Spark SQL و PySpark DataFrame API را مورد بحث قرار می دهیم. به طوری که در پایان این مقاله، نحوه استفاده از توابع پنجره با مجموعه داده های واقعی و دریافت بینش ضروری برای تجارت را خواهید فهمید.

اهداف یادگیری

  • مفهوم توابع پنجره را درک کنید.
  • کار با توابع پنجره با استفاده از مجموعه داده ها.
  • با استفاده از توابع پنجره اطلاعات بینش را بیابید.
  • برای کار با توابع پنجره از Spark SQL و DataFrame API استفاده کنید.

این مقاله به عنوان بخشی از بلاگاتون علم داده.

جدول محتوا

توابع پنجره چیست؟

توابع پنجره به تجزیه و تحلیل داده ها در یک گروه از ردیف هایی که به یکدیگر مرتبط هستند کمک می کند. آنها کاربران را قادر می سازند تا بر اساس برخی معیارهای پارتیشن بندی و ترتیب، تبدیل های پیچیده را روی ردیف های یک چارچوب داده یا مجموعه داده مرتبط با یکدیگر انجام دهند.

توابع پنجره بر روی یک پارتیشن خاص از یک دیتافریم یا مجموعه داده ای که توسط مجموعه ای از ستون های پارتیشن بندی تعریف شده است، عمل می کنند. را سفارش داده شده توسط بند داده ها را در یک تابع پنجره پارتیشن بندی می کند تا آنها را به ترتیب خاصی مرتب کند. سپس توابع پنجره محاسباتی را بر روی یک پنجره کشویی از ردیف‌ها انجام می‌دهند که شامل ردیف فعلی و زیرمجموعه‌ای از ردیف‌های قبلی «and»/«یا» همانطور که در قاب پنجره مشخص شده است.

کار با توابع پنجره در PySpark

برخی از نمونه‌های متداول توابع پنجره شامل محاسبه میانگین متحرک، رتبه‌بندی یا مرتب‌سازی ردیف‌ها بر اساس یک ستون یا گروه خاص است. ستون ها، محاسبه مجموع در حال اجرا و یافتن اولین یا آخرین مقدار در یک گروه از ردیف ها. با توابع قدرتمند پنجره Spark، کاربران می توانند تجزیه و تحلیل های پیچیده و تجمیع را انجام دهند مجموعه داده های بزرگ با سهولت نسبی، آن را به یک ابزار محبوب برای بزرگ تبدیل می کند پردازش داده ها و تحلیلی

"

توابع پنجره در SQL

Spark SQL از سه نوع عملکرد پنجره پشتیبانی می کند:

  • توابع رتبه بندی: - این توابع به هر سطر در یک پارتیشن از مجموعه نتیجه یک رتبه اختصاص می دهند. برای مثال، تابع ROW_NUMBER() یک عدد متوالی منحصر به فرد به هر سطر در پارتیشن می دهد.
  • توابع تجزیه و تحلیل: - این توابع مقادیر مجموع را در پنجره ای از ردیف ها محاسبه می کنند. به عنوان مثال، تابع SUM() مجموع یک ستون را در پنجره ای از ردیف ها محاسبه می کند.
  • توابع ارزش: - این توابع یک مقدار تحلیلی را برای هر ردیف در یک پارتیشن بر اساس مقادیر سایر ردیف‌های همان پارتیشن محاسبه می‌کنند. به عنوان مثال، تابع LAG() مقدار یک ستون را از ردیف قبلی در پارتیشن برمی گرداند.

ایجاد DataFrame

ما یک دیتافریم نمونه ایجاد خواهیم کرد تا بتوانیم عملاً با توابع مختلف پنجره کار کنیم. همچنین ما سعی خواهیم کرد با کمک این داده ها و توابع پنجره به برخی از سوالات پاسخ دهیم.

چارچوب داده دارای جزئیات کارمندان مانند نام، نام، شماره کارمند، تاریخ استخدام، حقوق و غیره است. در مجموع ما 8 ستون داریم که به شرح زیر است:

  • 'empno': این ستون شامل شماره کارمند است.
  • 'ename': این ستون دارای نام کارکنان است.
  • 'job': این ستون حاوی اطلاعاتی در مورد عناوین شغلی کارکنان است.
  • 'hiredate': این ستون تاریخ استخدام کارمند را نشان می دهد.
  • 'sal': جزئیات حقوق در این ستون موجود است.
  • 'comm': این ستون دارای جزئیات کمیسیون کارکنان است، در صورت وجود.
  • 'deptno': شماره بخشی که کارمند به آن تعلق دارد در این ستون است.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

اکنون طرحواره را بررسی می کنیم:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

یک نمای موقت از DataFrame 'emp_df' با نام "emp" ایجاد کنید. این امکان را به ما می دهد تا DataFrame را با استفاده از نحو SQL در Spark SQL جویا شویم که گویی یک جدول است. نمای موقت فقط برای مدت زمان جلسه Spark معتبر است.

emp_df.createOrReplaceTempView("emp")

حل بیان مسئله با استفاده از توابع پنجره

در اینجا ما چندین بیانیه مشکل را با استفاده از توابع ویندوز حل خواهیم کرد:

Q1. حقوق و دستمزد را در هر بخش رتبه بندی کنید.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

رویکردی برای کد PySpark

  • تابع Window داده ها را بر اساس شماره بخش با استفاده از partitionBy(col('deptno')) پارتیشن بندی می کند و سپس داده ها را در هر پارتیشن بر اساس حقوق به ترتیب نزولی با استفاده از orderBy(col('sal').desc() مرتب می کند. متغیر windowSpec مشخصات نهایی پنجره را نگه می دارد.
  • 'emp_df' چارچوب داده‌ای است که شامل داده‌های کارکنان، از جمله ستون‌هایی برای empno، ename، job، deptno و sal است.
  • تابع رتبه با استفاده از "F.rank().over(windowSpec)" در بیانیه select به ستون حقوق اعمال می شود. ستون حاصل نام مستعار "رتبه" دارد.
  • این یک دیتافریم، 'ranking_result_df' ایجاد می کند که شامل empno، ename، job، deptno و حقوق است. همچنین دارای یک ستون جدید به نام "رتبه" است که نشان دهنده رتبه حقوق کارمند در بخش آنها است.

خروجی:

نتیجه دارای رتبه حقوق در هر بخش است.

Q2. رتبه متراکم حقوق در هر بخش.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

رویکردی برای کد PySpark

  • ابتدا با استفاده از تابع Window یک مشخصات پنجره ایجاد کنید که DataFrame 'emp_df' را بر اساس deptno پارتیشن بندی می کند و با پایین آمدن ستون 'sal' مرتب می کند.
  • سپس، تابع dense_rank() بر روی مشخصات پنجره اعمال می‌شود، که یک رتبه متراکم را به هر ردیف در هر پارتیشن بر اساس ترتیب مرتب شده آن اختصاص می‌دهد.
  • در نهایت، یک DataFrame جدید به نام 'dense_ranking_df' با انتخاب ستون های خاص از emp_df (یعنی 'empno'، 'ename'، 'job'، 'deptno'، و 'sal') و افزودن یک ستون جدید 'dense_rank' ایجاد می شود. حاوی مقادیر متراکم رتبه بندی محاسبه شده توسط تابع پنجره است.
  • در آخر، DataFrame حاصل را در قالب جدول نمایش دهید.

خروجی:

نتیجه دارای رتبه متراکم حقوق و دستمزد است.

Q3. ردیف های داخل هر بخش را شماره گذاری کنید.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

رویکرد برای کد PySpark

  • خط اول با استفاده از توابع Window.partitionBy() و Window.orderBy() مشخصات پنجره ای را برای محاسبه تعریف می کند. این پنجره توسط ستون deptno تقسیم شده و توسط ستون sal به ترتیب نزولی مرتب شده است.
  • خط دوم یک DataFrame جدید به نام "row_num_df" ایجاد می کند، یک طرح از "emp_df" با یک ستون اضافی به نام "row_num" و حاوی جزئیات شماره ردیف است.
  • تابع show() DataFrame حاصل را نمایش می دهد که ستون های empno، ename، job، deptno، sal و row_num هر کارمند را نشان می دهد.

خروجی:

خروجی شماره ردیف هر کارمند در بخش خود را بر اساس حقوق آنها خواهد داشت.

Q4. مجموع حقوق در هر بخش در حال اجرا است.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

روش برای کد PySpark

  • ابتدا با استفاده از متدهای “Window.partitionBy()” و “Window.orderBy()” مشخصات پنجره تعریف می شود. روش "partitionBy()" داده ها را با ستون "deptno" تقسیم می کند، در حالی که روش "orderBy()" داده ها را توسط ستون "sal" به ترتیب نزولی مرتب می کند.
  • سپس، تابع "sum()" با استفاده از روش "over()" برای محاسبه مجموع حقوق در هر بخش به ستون "sal" اعمال می شود. نتیجه در یک DataFrame جدید به نام "running_sum_sal_df" خواهد بود که شامل ستون‌های "empno"، "ename"، "job"، "deptno"، "sal" و "running_total" است.
  • در نهایت متد show() در DataFrame “running_sum_sal_df” فراخوانی می شود تا خروجی پرس و جو را نمایش دهد. DataFrame حاصل، مجموع حقوق در حال اجرا هر کارمند و سایر جزئیات مانند نام، شماره بخش و شغل را نشان می دهد.

خروجی:

خروجی مجموع داده های حقوق هر بخش را در حال اجرا خواهد داشت.

Q5: حقوق بعدی در هر بخش.

برای یافتن حقوق بعدی در هر بخش از تابع LEAD استفاده می کنیم. 

تابع پنجره lead() به دریافت مقدار عبارت در ردیف بعدی پارتیشن پنجره کمک می کند. برای هر ستون ورودی یک ستون برمی گرداند، جایی که هر ستون حاوی مقدار ستون ورودی برای ردیف افست بالای ردیف فعلی در پارتیشن پنجره است. نحو برای تابع lead به این صورت است: - lead(col, offset=1, default=none).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

رویکرد برای کد PySpark

  • ابتدا، تابع پنجره به پارتیشن بندی ردیف های DataFrame بر اساس شماره بخش (deptno) و مرتب کردن حقوق به ترتیب نزولی در هر پارتیشن کمک می کند.
  • سپس تابع lead() به ستون "sal" مرتب شده در هر پارتیشن اعمال می شود تا حقوق کارمند زیر را برگرداند (با افست 1)، و مقدار پیش فرض 0 است در صورتی که کارمند بعدی وجود نداشته باشد.
  • DataFrame حاصل «next_salary_df» شامل ستون‌هایی برای شماره کارمند (empno)، نام (ename)، عنوان شغل (شغل)، شماره بخش (deptno)، حقوق فعلی (sal)، و حقوق بعدی (next_val) است.

خروجی:

خروجی شامل حقوق کارمند بعدی در بخش بر اساس ترتیب نزولی حقوق است. 

Q6. حقوق قبلی در هر بخش.

برای محاسبه حقوق قبلی از تابع LAG استفاده می کنیم.

تابع تاخیر مقدار یک عبارت را در یک افست داده شده قبل از ردیف فعلی در پارتیشن پنجره برمی گرداند. نحو تابع تأخیر:- تأخیر (expr، offset=1، پیش‌فرض=هیچ‌کدام). over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

رویکرد برای کد PySpark

  • window.partitionBy(col('deptno')) پارتیشن پنجره را مشخص می کند. این بدان معناست که عملکرد پنجره به طور جداگانه برای هر بخش کار می کند.
  • سپس orderBy(col('sal').desc()) ترتیب حقوق و دستمزد را مشخص می کند و حقوق هر بخش را به ترتیب نزولی ترتیب می دهد.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') ستون جدیدی به نام prev_val در DataFrame 'prev_sal_df' ایجاد می کند.
  • برای هر ردیف، این ستون حاوی مقدار ستون 'sal' از ردیف قبلی در پنجره تعریف شده توسط windowSpec است.
  • پارامتر offset=1 نشان می دهد که سطر قبل باید یک سطر قبل از سطر فعلی باشد و default=0 مقدار پیش فرض سطر اول در هر پارتیشن را مشخص می کند (زیرا ردیف قبلی برای سطر اول وجود ندارد).
  • در نهایت، prev_sal_df.show() DataFrame حاصل را نمایش می دهد.

خروجی:

خروجی نشان دهنده حقوق قبلی برای هر کارمند در هر بخش، بر اساس ترتیب حقوق به ترتیب نزولی است.

Q7. اولین حقوق در هر بخش و مقایسه با هر عضو در هر بخش.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

رویکرد برای کد PySpark

  • ابتدا یک شی WindowSpec ایجاد کنید که داده ها را بر اساس شماره بخش (deptno) پارتیشن بندی می کند و آن ها را بر اساس حقوق (sal) به ترتیب نزولی مرتب می کند.
  • سپس تابع تحلیلی first() را در ستون 'sal' روی پنجره تعریف شده توسط windowSpec اعمال می کند. این تابع اولین مقدار ستون 'sal' را در هر پارتیشن (یعنی هر گروه deptno) که با نزولی 'sal' مرتب شده است، برمی گرداند. ستون حاصل نام جدیدی دارد، 'first_val'.
  • اکنون DataFrame به دست آمده را که شامل ستون های انتخاب شده و یک ستون جدید، 'first_val' است، که اولین بالاترین حقوق را برای هر بخش بر اساس ترتیب نزولی مقادیر حقوق نشان می دهد، به متغیر جدیدی به نام 'first_value_df' اختصاص می دهد.

خروجی:

خروجی اولین بالاترین حقوق را برای هر بخش در DataFrame کارمند نشان می دهد.

نتیجه

در این مقاله با توابع پنجره آشنا می شویم. Spark SQL دارای سه نوع تابع پنجره است: توابع رتبه بندی، توابع جمع و توابع ارزش. با استفاده از این تابع، ما روی یک مجموعه داده کار کردیم تا برخی از بینش های مهم و ارزشمند را پیدا کنیم. Spark Window Functions ابزارهای قدرتمند تجزیه و تحلیل داده مانند رتبه بندی، تجزیه و تحلیل و محاسبات ارزش را ارائه می دهد. چه تجزیه و تحلیل بینش حقوق و دستمزد توسط بخش یا استفاده از مثال‌های عملی با PySpark و SQL، این توابع ابزارهای ضروری را برای پردازش و تجزیه و تحلیل موثر داده‌ها در Spark فراهم می‌کنند.

گیرنده های کلیدی

  • ما با توابع پنجره آشنا شدیم و با استفاده از Spark SQL و PySpark DataFrame API با آنها کار کردیم.
  • ما از توابعی مانند rank، dense_rank، row_number، lag، lead، groupBy، partitionBy و سایر توابع برای ارائه تحلیل مناسب استفاده می کنیم.
  • ما همچنین راه حل های گام به گام و دقیق برای مشکل را دیده ایم و خروجی را در پایان هر بیانیه مشکل تحلیل کرده ایم.

این مطالعه موردی به شما کمک می کند تا توابع PySpark را بهتر درک کنید. اگر نظر یا سوالی دارید، در زیر کامنت کنید. با من ارتباط برقرار کن لینک برای بحث بیشتر به یادگیری ادامه بده!!!

رسانه نشان داده شده در این مقاله متعلق به Analytics Vidhya نیست و به صلاحدید نویسنده استفاده می شود.

نقطه_img

جدیدترین اطلاعات

نقطه_img