Logo Zephyrnet

Làm việc với các hàm cửa sổ trong PySpark

Ngày:

Giới thiệu

Tìm hiểu về Chức năng cửa sổ trong PySpark có thể khó khăn nhưng đáng nỗ lực. Hàm cửa sổ là một công cụ mạnh mẽ để phân tích dữ liệu và có thể giúp bạn hiểu rõ hơn mà có thể bạn chưa từng thấy. Bằng cách hiểu cách sử dụng Hàm cửa sổ trong Spark; bạn có thể lấy của bạn phân tích dữ liệu kỹ năng lên cấp độ tiếp theo và đưa ra quyết định sáng suốt hơn. Cho dù bạn đang làm việc với quy mô lớn hay nhỏ bộ dữ liệu, Học các hàm cửa sổ trong Spark sẽ cho phép bạn thao tác và phân tích dữ liệu theo những cách mới và thú vị.

Chức năng cửa sổ trong PySpark

Trong blog này, trước tiên chúng ta sẽ hiểu khái niệm về các hàm cửa sổ và sau đó thảo luận cách sử dụng chúng với API Spark SQL và PySpark DataFrame. Để đến cuối bài viết này, bạn sẽ hiểu cách sử dụng các hàm cửa sổ với bộ dữ liệu thực và có được những hiểu biết cần thiết cho doanh nghiệp.

Mục tiêu học tập

  • Hiểu khái niệm về các hàm của cửa sổ.
  • Làm việc với các hàm cửa sổ bằng cách sử dụng bộ dữ liệu.
  • Tìm hiểu những hiểu biết sâu sắc bằng cách sử dụng các chức năng của cửa sổ.
  • Sử dụng API Spark SQL và DataFrame để làm việc với các hàm cửa sổ.

Bài báo này đã được xuất bản như một phần của Blogathon Khoa học Dữ liệu.

Mục lục

Chức năng cửa sổ là gì?

Các hàm cửa sổ giúp phân tích dữ liệu trong một nhóm hàng có liên quan với nhau. Chúng cho phép người dùng thực hiện các phép biến đổi phức tạp trên các hàng của khung dữ liệu hoặc tập dữ liệu được liên kết với nhau dựa trên một số tiêu chí phân vùng và sắp xếp.

Các hàm cửa sổ hoạt động trên một phân vùng cụ thể của khung dữ liệu hoặc tập dữ liệu được xác định bởi một tập hợp các cột phân vùng. Các ĐẶT BỞI mệnh đề phân vùng dữ liệu trong hàm cửa sổ để sắp xếp nó theo một thứ tự cụ thể. Sau đó, các hàm cửa sổ thực hiện các phép tính trên một cửa sổ trượt gồm các hàng bao gồm hàng hiện tại và một tập hợp con của các hàng tiếp theo 'và'/'hoặc' trước đó, như được chỉ định trong khung cửa sổ.

Làm việc với các hàm cửa sổ trong PySpark

Một số ví dụ phổ biến về hàm cửa sổ bao gồm tính toán đường trung bình động, xếp hạng hoặc sắp xếp các hàng dựa trên một cột hoặc nhóm cụ thể. cột, tính tổng số đang chạy và tìm giá trị đầu tiên hoặc cuối cùng trong một nhóm hàng. Với các chức năng cửa sổ mạnh mẽ của Spark, người dùng có thể thực hiện các phân tích và tổng hợp phức tạp trên bộ dữ liệu lớn một cách tương đối dễ dàng, khiến nó trở thành một công cụ phổ biến cho các doanh nghiệp lớn xử lý dữ liệu và phân tích.

"

Hàm cửa sổ trong SQL

Spark SQL hỗ trợ ba loại chức năng cửa sổ:

  • Chức năng xếp hạng: - Các hàm này gán thứ hạng cho mỗi hàng trong một phân vùng của tập kết quả. Ví dụ: hàm ROW_NUMBER() cung cấp một số thứ tự duy nhất cho mỗi hàng trong phân vùng.
  • Chức năng phân tích: - Các hàm này tính toán các giá trị tổng hợp trên một cửa sổ hàng. Ví dụ: hàm SUM() tính tổng của một cột trên một cửa sổ các hàng.
  • Hàm giá trị: - Các hàm này tính toán giá trị phân tích cho mỗi hàng trong một phân vùng, dựa trên giá trị của các hàng khác trong cùng phân vùng. Ví dụ: hàm LAG() trả về giá trị của một cột ở hàng trước đó trong phân vùng.

Tạo khung dữ liệu

Chúng tôi sẽ tạo một khung dữ liệu mẫu để chúng tôi có thể làm việc thực tế với các chức năng cửa sổ khác nhau. Ngoài ra, chúng tôi sẽ cố gắng trả lời một số câu hỏi với sự trợ giúp của dữ liệu này và các chức năng của cửa sổ.

Khung dữ liệu có các thông tin chi tiết về nhân viên như Tên, Chức danh, Mã số nhân viên, Ngày thuê, Mức lương, v.v. Tổng cộng chúng tôi có 8 cột như sau:

  • 'empno': Cột này chứa mã số của nhân viên.
  • 'ename': Cột này chứa tên nhân viên.
  • 'job': Cột này chứa thông tin về chức danh công việc của nhân viên.
  • 'ngày thuê': Cột này hiển thị ngày thuê của nhân viên.
  • 'sal': Chi tiết lương có trong cột này.
  • 'comm': Cột này có chi tiết hoa hồng của nhân viên, nếu có.
  • 'deptno': Mã số bộ phận mà nhân viên thuộc về nằm trong cột này.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Bây giờ chúng ta sẽ kiểm tra lược đồ:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Tạo chế độ xem tạm thời của DataFrame 'emp_df' với tên “emp”. Nó cho phép chúng ta truy vấn DataFrame bằng cú pháp SQL trong Spark SQL như thể nó là một bảng. Chế độ xem tạm thời chỉ có hiệu lực trong suốt thời gian của Phiên Spark.

emp_df.createOrReplaceTempView("emp")

Giải quyết các câu lệnh vấn đề bằng cách sử dụng các hàm cửa sổ

Ở đây chúng ta sẽ giải quyết một số câu lệnh bằng cách sử dụng các hàm của Windows:

Q1. Xếp hạng mức lương trong mỗi bộ phận.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Cách tiếp cận mã PySpark

  • Hàm Window phân vùng dữ liệu theo mã số phòng ban bằng cách sử dụng phân vùngBy(col('deptno')) rồi sắp xếp dữ liệu trong mỗi phân vùng theo mức lương theo thứ tự giảm dần bằng cách sử dụng orderBy(col('sal').desc()). Biến windowSpec chứa đặc tả cửa sổ cuối cùng.
  • 'emp_df' là khung dữ liệu chứa dữ liệu nhân viên, bao gồm các cột cho empno, ename, job, deptno và sal.
  • Hàm xếp hạng được áp dụng cho cột lương bằng cách sử dụng 'F.rank().over(windowSpec)' trong câu lệnh chọn. Cột kết quả có tên bí danh là 'xếp hạng'.
  • Nó sẽ tạo một khung dữ liệu, 'ranking_result_df', bao gồm empno, ename, job, deptno và lương. Nó cũng có một cột mới, 'cấp bậc', đại diện cho cấp bậc lương của nhân viên trong bộ phận của họ.

Đầu ra:

Kết quả có bậc lương ở từng bộ phận.

Q2. Xếp hạng dày đặc mức lương trong mỗi bộ phận.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Cách tiếp cận mã PySpark

  • Đầu tiên, tạo một đặc tả cửa sổ bằng cách sử dụng hàm Window, hàm này phân vùng DataFrame 'emp_df' theo deptno và sắp xếp nó bằng cách giảm dần cột 'sal'.
  • Sau đó, hàmdense_rank() được áp dụng trên đặc tả cửa sổ, hàm này gán thứ hạng dày đặc cho mỗi hàng trong mỗi phân vùng dựa trên thứ tự sắp xếp của nó.
  • Cuối cùng, một DataFrame mới có tên là 'dense_ranking_df' được tạo bằng cách chọn các cột cụ thể từ emp_df (tức là 'empno', 'ename', 'job', 'deptno' và 'sal') và thêm một cột mới 'dense_rank' chứa các giá trị xếp hạng dày đặc được tính toán bởi hàm cửa sổ.
  • Cuối cùng, hiển thị DataFrame kết quả ở định dạng bảng.

Đầu ra:

Kết quả có một thứ hạng dày đặc về lương.

Q3. Đánh số hàng trong mỗi bộ phận.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Cách tiếp cận mã PySpark

  • Dòng đầu tiên xác định đặc tả cửa sổ cho phép tính bằng cách sử dụng các hàm Window.partitionBy() và Window.orderBy(). Cửa sổ này được phân chia theo cột deptno và được sắp xếp theo cột sal theo thứ tự giảm dần.
  • Dòng thứ hai tạo một DataFrame mới có tên là 'row_num_df', một hình chiếu của 'emp_df' với một cột bổ sung có tên là 'row_num' và nó chứa chi tiết về số hàng.
  • Hàm show() hiển thị DataFrame kết quả, hiển thị các cột empno, ename, job, deptno, sal và row_num của mỗi nhân viên.

Đầu ra:

Đầu ra sẽ có số hàng của từng nhân viên trong bộ phận của họ dựa trên mức lương của họ.

Q4. Chạy tổng tiền lương trong từng bộ phận.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Phương pháp tiếp cận cho mã PySpark

  • Đầu tiên, đặc tả cửa sổ được xác định bằng cách sử dụng các phương thức “Window.partitionBy()” và “Window.orderBy()”. Phương thức “partitionBy()” phân vùng dữ liệu theo cột “deptno”, trong khi phương thức “orderBy()” sắp xếp dữ liệu theo cột “sal” theo thứ tự giảm dần.
  • Tiếp theo, hàm “sum()” được áp dụng cho cột “sal” bằng phương pháp “over()” để tính tổng lương hiện có trong mỗi bộ phận. Kết quả sẽ ở trong một DataFrame mới có tên là “running_sum_sal_df”, chứa các cột 'empno', 'ename', 'job', 'deptno', 'sal' và 'running_total'.
  • Cuối cùng, phương thức “show()” được gọi trên DataFrame “running_sum_sal_df” để hiển thị kết quả đầu ra của truy vấn. DataFrame kết quả hiển thị tổng tiền lương hiện tại của mỗi nhân viên và các chi tiết khác như tên, mã số phòng ban và công việc.

Đầu ra:

Đầu ra sẽ có tổng số dữ liệu lương của từng bộ phận.

Q5: Mức lương tiếp theo của từng bộ phận.

Để tìm mức lương tiếp theo trong mỗi bộ phận, chúng tôi sử dụng hàm LEAD. 

Hàm cửa sổ lead() giúp lấy giá trị của biểu thức ở hàng tiếp theo của phân vùng cửa sổ. Nó trả về một cột cho mỗi cột đầu vào, trong đó mỗi cột sẽ chứa giá trị của cột đầu vào cho hàng offset phía trên hàng hiện tại trong phân vùng cửa sổ. Cú pháp của hàm lead là:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Cách tiếp cận mã PySpark

  • Đầu tiên, chức năng cửa sổ giúp phân vùng các hàng của DataFrame theo số phòng ban (deptno) và sắp xếp mức lương theo thứ tự giảm dần trong mỗi phân vùng.
  • Sau đó, hàm lead() được áp dụng cho cột 'sal' có thứ tự trong mỗi phân vùng để trả về lương của nhân viên sau (với offset là 1) và giá trị mặc định là 0 trong trường hợp không có nhân viên tiếp theo.
  • DataFrame 'next_salary_df' kết quả chứa các cột cho mã số nhân viên (empno), tên (ename), chức danh (công việc), mã số phòng ban (deptno), mức lương hiện tại (sal) và mức lương tiếp theo (next_val).

Đầu ra:

Kết quả đầu ra chứa mức lương của nhân viên tiếp theo trong bộ phận theo thứ tự lương giảm dần. 

Q6. Mức lương trước đây trong mỗi bộ phận.

Để tính mức lương trước đó, chúng ta sử dụng hàm LAG.

Hàm lag trả về giá trị của một biểu thức tại một khoảng lệch nhất định trước hàng hiện tại trong phân vùng cửa sổ. Cú pháp của hàm lag là:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Cách tiếp cận mã PySpark

  • window.partitionBy(col('deptno')) chỉ định phân vùng cửa sổ. Điều đó có nghĩa là chức năng cửa sổ hoạt động riêng biệt cho từng bộ phận.
  • Sau đó orderBy(col('sal').desc()) chỉ định thứ tự lương và sẽ sắp xếp lương trong mỗi bộ phận theo thứ tự giảm dần.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') tạo một cột mới có tên là prev_val trong DataFrame 'prev_sal_df'.
  • Đối với mỗi hàng, cột này chứa giá trị của cột 'sal' từ hàng trước trong cửa sổ được xác định bởi windowSpec.
  • Tham số offset=1 chỉ ra rằng hàng trước phải là một hàng trước hàng hiện tại và default=0 chỉ định giá trị mặc định cho hàng đầu tiên trong mỗi phân vùng (vì không có hàng trước cho hàng đầu tiên).
  • Cuối cùng, prev_sal_df.show() hiển thị DataFrame kết quả.

Đầu ra:

Kết quả đầu ra thể hiện mức lương trước đó của từng nhân viên trong mỗi bộ phận, dựa trên việc sắp xếp mức lương theo thứ tự giảm dần.

Q7. Mức lương đầu tiên trong mỗi bộ phận và so sánh với mọi thành viên trong mỗi bộ phận.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Cách tiếp cận mã PySpark

  • Đầu tiên, tạo một đối tượng WindowSpec để phân vùng dữ liệu theo mã số phòng ban (deptno) và sắp xếp dữ liệu theo mức lương (sal) theo thứ tự giảm dần.
  • Sau đó áp dụng hàm phân tích first() cho cột 'sal' trên cửa sổ được xác định bởi windowSpec. Hàm này trả về giá trị đầu tiên của cột 'sal' trong mỗi phân vùng (tức là mỗi nhóm deptno) được sắp xếp theo thứ tự 'sal' giảm dần. Cột kết quả có tên mới, 'first_val'.
  • Bây giờ chỉ định DataFrame kết quả, chứa các cột đã chọn và một cột mới, 'first_val', hiển thị mức lương cao nhất đầu tiên cho mỗi bộ phận dựa trên thứ tự giảm dần của các giá trị lương, cho một biến mới có tên là 'first_value_df'.

Đầu ra:

Đầu ra hiển thị mức lương cao nhất đầu tiên cho mỗi bộ phận trong DataFrame của nhân viên.

Kết luận

Trong bài này chúng ta tìm hiểu về các hàm của cửa sổ. Spark SQL có ba loại hàm cửa sổ: Hàm xếp hạng, Hàm tổng hợp và Hàm giá trị. Bằng cách sử dụng chức năng này, chúng tôi đã làm việc trên một tập dữ liệu để tìm ra một số thông tin chi tiết quan trọng và có giá trị. Spark Window Functions cung cấp các công cụ phân tích dữ liệu mạnh mẽ như xếp hạng, phân tích và tính toán giá trị. Cho dù phân tích thông tin chi tiết về lương theo bộ phận hay sử dụng các ví dụ thực tế với PySpark & ​​SQL, các chức năng này đều cung cấp các công cụ thiết yếu để xử lý và phân tích dữ liệu hiệu quả trong Spark.

Chìa khóa chính

  • Chúng tôi đã tìm hiểu về các chức năng của cửa sổ và làm việc với chúng bằng API Spark SQL và PySpark DataFrame.
  • Chúng tôi sử dụng các hàm như xếp hạng,dense_rank, row_number, lag, lead, groupBy, phân vùngBy và các hàm khác để cung cấp phân tích thích hợp.
  • Chúng tôi cũng đã xem các giải pháp chi tiết từng bước cho vấn đề và phân tích kết quả đầu ra ở cuối mỗi báo cáo vấn đề.

Nghiên cứu điển hình này giúp bạn hiểu rõ hơn về các chức năng của PySpark. Nếu các bạn có ý kiến ​​hay thắc mắc gì thì hãy bình luận bên dưới nhé. Kết nối với tôi trên LinkedIn để thảo luận thêm. Hãy tiếp tục học hỏi!!!

Phương tiện hiển thị trong bài viết này không thuộc sở hữu của Analytics Vidhya và được sử dụng theo quyết định riêng của Tác giả.

tại chỗ_img

Tin tức mới nhất

tại chỗ_img