和風網標誌

在 PySpark 中使用視窗函數

日期:

簡介

了解 PySpark 中的視窗函數可能具有挑戰性,但值得付出努力。視窗函數是分析資料的強大工具,可以幫助您獲得其他方式可能看不到的見解。透過了解如何在 Spark 中使用視窗函數;你可以帶著你的 數據分析 技能提升到新的水平並做出更明智的決策。無論您是與大型企業還是小型企業合作 數據集,學習 Spark 中的視窗函數將使您能夠以令人興奮的新方式操作和分析資料。

PySpark 中的視窗函數

在本部落格中,我們將首先了解視窗函數的概念,然後討論如何將它們與 Spark SQL 和 PySpark DataFrame API 一起使用。因此,在本文結束時,您將了解如何將視窗函數與真實資料集結合使用,並獲得重要的業務見解。

學習目標

  • 理解窗函數的概念。
  • 使用資料集處理視窗函數。
  • 使用視窗函數找出見解。
  • 使用 Spark SQL 和 DataFrame API 來處理視窗函數。

這篇文章是作為 數據科學博客馬拉松。

目錄

什麼是窗函數?

視窗函數有助於分析一組彼此相關的行中的資料。它們使用戶能夠根據某些分區和排序標準對彼此關聯的資料框或資料集的行執行複雜的轉換。

視窗函數對由一組分區列定義的資料框或資料集的特定分區進行操作。這 訂購 子句對視窗函數中的資料進行分區,以按特定順序排列資料。然後,視窗函數對行的滑動視窗執行計算,其中包括當前行以及前面“和”/“或”後面行的子集,如視窗框架中所指定。

在 PySpark 中使用視窗函數

視窗函數的一些常見範例包括計算移動平均值、基於特定列或群組對行進行排名或排序 ,計算運行總計,並找出一組行中的第一個或最後一個值。借助Spark強大的視窗函數,使用者可以進行複雜的分析和聚合 大型數據集 相對輕鬆,使其成為大型企業的流行工具 數據處理 和分析。

"

SQL 中的視窗函數

Spark SQL支援三種視窗函數:

  • 排名功能:- 這些函數為結果集分區內的每一行分配一個排名。例如,ROW_NUMBER() 函數為分割區內的每一行提供唯一的序號。
  • 分析功能:- 這些函數計算行視窗上的聚合值。例如,SUM() 函數計算行視窗中列的總和。
  • 價值函數:- 這些函數根據同一分區中其他行的值計算分區中每一行的分析值。例如,LAG() 函數傳回分區中前一行的列值。

資料框創建

我們將建立一個範例資料框,以便我們可以實際使用不同的視窗函數。我們也將嘗試借助這些數據和視窗函數來回答一些問題。

資料框包含員工詳細信息,例如姓名、職務、員工編號、僱用日期、薪資等。總共有 8 列,如下所示:

  • 'empno':此欄位包含員工的編號。
  • 'ename':此欄位包含員工姓名。
  • 「職位」:此列包含有關員工職位的資訊。
  • 「僱用日期」:此欄位顯示員工的僱用日期。
  • 'sal':此列中包含薪資詳細資料。
  • 「comm」:此列包含員工佣金詳細資料(如果有)。
  • 'deptno':員工所屬的部門號碼在此列。
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

現在我們將檢查架構:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

建立名為“emp”的資料幀“emp_df”的臨時視圖。它允許我們在 Spark SQL 中使用 SQL 語法查詢 DataFrame,就好像它是一個表一樣。臨時視圖僅在 Spark 會話期間有效。

emp_df.createOrReplaceTempView("emp")

使用視窗函數解決問題陳述

在這裡,我們將使用 Windows 函數解決幾個問題陳述:

Q1.對每個部門的薪資進行排名。

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

PySpark 程式碼的方法

  • Window 函數使用partitionBy(col('deptno')) 依部門編號對資料進行分區,然後使用orderBy(col('sal').desc()) 依工資降序對每個分區內的資料進行排序。變數 windowSpec 保存最終的視窗規格。
  • 'emp_df' 是包含員工資料的資料框,包括 empno、ename、job、deptno 和 sal 欄位。
  • 在 select 語句中使用「F.rank().over(windowSpec)」將排名函數應用於薪資列。產生的列的別名為「rank」。
  • 它將建立一個資料框“ranking_result_df”,其中包括 empno、ename、job、deptno 和薪資。它還具有一個新列“等級”,代表員工在其部門內的工資等級。

輸出:

結果有每個部門的薪資排名。

Q2。各部門內的薪資等級密集。

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

PySpark 程式碼的方法

  • 首先,使用 Window 函數建立一個視窗規範,該函數按 deptno 分區「emp_df」DataFrame,並按「sal」列降序排列。
  • 然後,dense_rank() 函數應用於視窗規範,它根據排序順序為每個分區內的每一行分配密集排名。
  • 最後,透過從 emp_df 中選擇特定列(即「empno」、「ename」、「job」、「deptno」和「sal」)並新增一個新欄位「dense_rank」來建立一個名為「dense_ranking_df」的新 DataFrame包含由視窗函數計算的密集排名值。
  • 最後,以表格格式顯示產生的 DataFrame。

輸出:

結果在薪資方面有一個密集的排名。

Q3。每個部門內的行編號。

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark 程式碼的方法

  • 第一行使用 Window.partitionBy() 和 Window.orderBy() 函數定義計算的視窗規格。此視窗由 deptno 列分區,並按 sal 列降序排列。
  • 第二行建立一個名為「row_num_df」的新 DataFrame,它是「emp_df」的投影以及一個名為「row_num」的附加列,它包含行號詳細資訊。
  • show() 函數顯示產生的 DataFrame,其中顯示每位員工的 empno、ename、job、deptno、sal 和 row_num 欄位。

輸出:

產出將根據每位員工的薪資顯示其部門內每位員工的行號。

Q4。運作各部門內的總薪資。

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

途徑 對於 PySpark 代碼

  • 首先,使用「Window.partitionBy()」和「Window.orderBy()」方法定義視窗規格。 「partitionBy()」方法以「deptno」欄位對資料進行分區,而「orderBy()」方法則依「sal」列依降序對資料進行排序。
  • 接下來,使用「over()」方法將「sum()」函數應用於「sal」列,以計算每個部門內工資的運行總計。結果將位於一個名為「running_sum_sal_df」的新 DataFrame 中,其中包含欄位「empno」、「ename」、「job」、「deptno」、「sal」和「running_total」。
  • 最後,在「running_sum_sal_df」DataFrame 上呼叫「show()」方法來顯示查詢的輸出。產生的 DataFrame 顯示每位員工的總薪資以及其他詳細信息,例如姓名、部門編號和工作。

輸出:

輸出將包含每個部門薪資資料的運行總計。

Q5:各部門內的下一個薪資。

為了找出每個部門內的下一個薪水,我們使用 LEAD 函數。 

Lead() 視窗函數有助於取得視窗分區的下一行中的表達式的值。它為每個輸入列傳回一列,其中每列將包含視窗分區內目前行上方的偏移行的輸入列的值。 Lead 函數的語法為:- Lead(col, offset=1, default=None)。

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark 程式碼的方法

  • 首先,視窗函數有助於按部門編號 (deptno) 對 DataFrame 的行進行分區,並在每個分區內按降序排列工資。
  • 然後,lead() 函數會套用於每個分區中有序的「sal」列,以傳回下一個員工的薪資(偏移量為 1),如果沒有下一個員工,則預設值為 0。
  • 產生的 DataFrame 'next_salary_df' 包含員工編號 (empno)、姓名 (ename)、職位 (job)、部門編號 (deptno)、目前薪資 (sal) 和下一個薪資 (next_val) 的欄位。

輸出:

產出包含依薪資降序排列的部門中下一位員工的薪資。 

Q6.各部門以前的工資。

為了計算之前的工資,我們使用LAG函數。

lag 函數傳回視窗分區內目前行之前給定偏移處的表達式的值。 lag 函式的語法為:lag(expr, offset=1, default=None).over(windowSpec)。

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark 程式碼的方法

  • window.partitionBy(col('deptno')) 指定視窗分區。這意味著視窗功能是為每個部門單獨工作的。
  • 然後 orderBy(col('sal').desc()) 指定工資的順序,並將按降序排列每個部門內的工資。
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') 在 DataFrame 'prev_sal_df' 中建立一個名為 prev_val 的新欄位。
  • 對於每一行,此列包含 windowSpec 定義的視窗內前一行的「sal」列的值。
  • offset=1 參數表示前一行應該是目前行之前的一行,default=0 指定每個分區中第一行的預設值(因為第一行沒有前一行)。
  • 最後,prev_sal_df.show() 顯示產生的 DataFrame。

輸出:

產出表示每個部門內每位員工之前的薪資(基於薪資降序排列)。

Q7.每個部門內的第一薪水並與每個部門內的每個成員進行比較。

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark 程式碼的方法

  • 首先,建立一個 WindowSpec 對象,該物件按部門編號 (deptno) 對資料進行分區,並按工資 (sal) 降序排列資料。
  • 然後將first() 分析函數應用於windowSpec 定義的視窗上的「sal」欄位。此函數傳回每個分區(即每個 deptno 組)內「sal」列的第一個值,按「sal」降序排序。產生的欄位有一個新名稱“first_val”。
  • 現在將產生的 DataFrame(其中包含所選列和新列“first_val”)分配給名為“first_value_df”的新變量,該列根據工資值的降序顯示每個部門的第一高工資。

輸出:

輸出顯示員工 DataFrame 中每個部門的第一高薪。

結論

在這篇文章中,我們學習了視窗函數。 Spark SQL 有三種視窗函數:排名函數、聚合函數和值函數。使用此函數,我們對數據集進行了研究,以找到一些重要且有價值的見解。 Spark Window Functions 提供強大的資料分析工具,例如排名、分析和價值計算。無論是按部門分析薪資洞察還是使用 PySpark 和 SQL 來使用實際範例,這些函數都為 Spark 中有效的資料處理和分析提供了必要的工具。

關鍵要點

  • 我們了解了視窗函數,並使用 Spark SQL 和 PySpark DataFrame API 來使用它們。
  • 我們使用諸如rank、dense_rank、row_number、lag、lead、groupBy、partitionBy 等函數來提供正確的分析。
  • 我們還看到了問題的詳細逐步解決方案,並分析了每個問題陳述末尾的輸出。

本案例研究可幫助您更好地理解 PySpark 功能。如果您有任何意見或疑問,請在下面評論。與我聯繫 LinkedIn 以供進一步討論。保持學習!!!

本文所示的媒體不屬於 Analytics Vidhya 所有,其使用由作者自行決定。

現貨圖片

最新情報

現貨圖片