和风网标志

在 PySpark 中使用窗口函数

日期:

介绍

了解 PySpark 中的窗口函数可能具有挑战性,但值得付出努力。窗口函数是分析数据的强大工具,可以帮助您获得在其他情况下可能看不到的见解。通过了解如何在 Spark 中使用窗口函数;你可以带着你的 数据分析 技能提升到新的水平并做出更明智的决策。无论您是与大型企业还是小型企业合作 数据集,学习 Spark 中的窗口函数将使您能够以令人兴奋的新方式操作和分析数据。

PySpark 中的窗口函数

在本博客中,我们将首先了解窗口函数的概念,然后讨论如何将它们与 Spark SQL 和 PySpark DataFrame API 一起使用。因此,在本文结束时,您将了解如何将窗口函数与真实数据集结合使用,并获得重要的业务见解。

学习目标

  • 理解窗函数的概念。
  • 使用数据集处理窗口函数。
  • 使用窗口函数找出见解。
  • 使用 Spark SQL 和 DataFrame API 来处理窗口函数。

这篇文章是作为 数据科学博客马拉松。

目录

什么是窗函数?

窗口函数有助于分析一组彼此相关的行中的数据。它们使用户能够根据某些分区和排序标准对彼此关联的数据框或数据集的行执行复杂的转换。

窗口函数对由一组分区列定义的数据框或数据集的特定分区进行操作。这 ORDER BY 子句对窗口函数中的数据进行分区,以按特定顺序排列数据。然后,窗口函数对行滑动窗口执行计算,其中包括当前行以及前面“和”/“或”后面行的子集,如窗口框架中指定的。

在 PySpark 中使用窗口函数

窗口函数的一些常见示例包括计算移动平均值、基于特定列或组对行进行排名或排序 ,计算运行总计,并查找一组行中的第一个或最后一个值。借助Spark强大的窗口函数,用户可以进行复杂的分析和聚合 大型数据集 相对轻松,使其成为大型企业的流行工具 数据处理 和分析。

"

SQL 中的窗口函数

Spark SQL支持三种窗口函数:

  • 排名功能:- 这些函数为结果集分区内的每一行分配一个排名。例如,ROW_NUMBER() 函数为分区内的每一行提供唯一的序列号。
  • 分析功能:- 这些函数计算行窗口上的聚合值。例如,SUM() 函数计算行窗口中列的总和。
  • 价值函数:- 这些函数根据同一分区中其他行的值计算分区中每一行的分析值。例如,LAG() 函数返回分区中前一行的列值。

数据框创建

我们将创建一个示例数据框,以便我们可以实际使用不同的窗口函数。我们还将尝试借助这些数据和窗口函数来回答一些问题。

数据框包含员工详细信息,例如姓名、职务、员工编号、雇用日期、薪资等。总共有 8 列,如下所示:

  • 'empno':此列包含员工的编号。
  • 'ename':此列包含员工姓名。
  • “职位”:此列包含有关员工职位的信息。
  • “雇用日期”:此列显示员工的雇用日期。
  • 'sal':此列中包含薪资详细信息。
  • “comm”:此列包含员工佣金详细信息(如果有)。
  • 'deptno':员工所属的部门号在此列中。
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

现在我们将检查架构:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

创建名为“emp”的数据帧“emp_df”的临时视图。它允许我们在 Spark SQL 中使用 SQL 语法查询 DataFrame,就好像它是一个表一样。临时视图仅在 Spark 会话期间有效。

emp_df.createOrReplaceTempView("emp")

使用窗口函数解决问题陈述

在这里,我们将使用 Windows 函数解决几个问题陈述:

Q1.对每个部门的薪资进行排名。

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

PySpark 代码的方法

  • Window 函数使用partitionBy(col('deptno')) 按部门编号对数据进行分区,然后使用orderBy(col('sal').desc()) 按工资降序对每个分区内的数据进行排序。变量 windowSpec 保存最终的窗口规格。
  • 'emp_df' 是包含员工数据的数据框,包括 empno、ename、job、deptno 和 sal 列。
  • 在 select 语句中使用“F.rank().over(windowSpec)”将排名函数应用于工资列。生成的列的别名为“rank”。
  • 它将创建一个数据框“ranking_result_df”,其中包括 empno、ename、job、deptno 和薪资。它还具有一个新列“等级”,代表员工在其部门内的工资等级。

输出:

结果有每个部门的薪资排名。

Q2。各部门内的薪资等级密集。

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

PySpark 代码的方法

  • 首先,使用 Window 函数创建一个窗口规范,该函数按 deptno 分区“emp_df”DataFrame,并按“sal”列降序排列。
  • 然后,dense_rank() 函数应用于窗口规范,它根据排序顺序为每个分区内的每一行分配密集排名。
  • 最后,通过从 emp_df 中选择特定列(即“empno”、“ename”、“job”、“deptno”和“sal”)并添加一个新列“dense_rank”来创建一个名为“dense_ranking_df”的新 DataFrame包含由窗口函数计算的密集排名值。
  • 最后,以表格格式显示生成的 DataFrame。

输出:

结果在薪资方面有一个密集的排名。

Q3。每个部门内的行编号。

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark 代码的方法

  • 第一行使用 Window.partitionBy() 和 Window.orderBy() 函数定义计算的窗口规范。该窗口由 deptno 列分区,并按 sal 列降序排列。
  • 第二行创建一个名为“row_num_df”的新 DataFrame,它是“emp_df”的投影以及一个名为“row_num”的附加列,它包含行号详细信息。
  • show() 函数显示生成的 DataFrame,其中显示每个员工的 empno、ename、job、deptno、sal 和 row_num 列。

输出:

输出将根据每个员工的工资显示其部门内每个员工的行号。

Q4。运行各部门内的工资总额。

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

途径 对于 PySpark 代码

  • 首先,使用“Window.partitionBy()”和“Window.orderBy()”方法定义窗口规范。 “partitionBy()”方法按“deptno”列对数据进行分区,而“orderBy()”方法按“sal”列按降序对数据进行排序。
  • 接下来,使用“over()”方法将“sum()”函数应用于“sal”列,以计算每个部门内工资的运行总计。结果将位于一个名为“running_sum_sal_df”的新 DataFrame 中,其中包含列“empno”、“ename”、“job”、“deptno”、“sal”和“running_total”。
  • 最后,在“running_sum_sal_df”DataFrame 上调用“show()”方法来显示查询的输出。生成的 DataFrame 显示每个员工的工资总额和其他详细信息,例如姓名、部门编号和工作。

输出:

输出将包含每个部门薪资数据的运行总计。

Q5:各部门内的下一个薪资。

为了查找每个部门内的下一个薪水,我们使用 LEAD 函数。 

Lead() 窗口函数有助于获取窗口分区的下一行中的表达式的值。它为每个输入列返回一列,其中每列将包含窗口分区内当前行上方的偏移行的输入列的值。 Lead 函数的语法为:- Lead(col, offset=1, default=None)。

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark 代码的方法

  • 首先,窗口函数有助于按部门编号 (deptno) 对 DataFrame 的行进行分区,并在每个分区内按降序排列工资。
  • 然后,lead() 函数应用于每个分区中有序的“sal”列,以返回下一个员工的工资(偏移量为 1),如果没有下一个员工,则默认值为 0。
  • 生成的 DataFrame 'next_salary_df' 包含员工编号 (empno)、姓名 (ename)、职务 (job)、部门编号 (deptno)、当前工资 (sal) 和下一个工资 (next_val) 的列。

输出:

输出包含按工资降序排列的部门中下一位员工的工资。 

Q6.各部门以前的工资。

为了计算之前的工资,我们使用LAG函数。

lag 函数返回窗口分区内当前行之前给定偏移处的表达式的值。 lag 函数的语法为:lag(expr, offset=1, default=None).over(windowSpec)。

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark 代码的方法

  • window.partitionBy(col('deptno')) 指定窗口分区。这意味着窗口功能是为每个部门单独工作的。
  • 然后 orderBy(col('sal').desc()) 指定工资的顺序,并将按降序排列每个部门内的工资。
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') 在 DataFrame 'prev_sal_df' 中创建一个名为 prev_val 的新列。
  • 对于每一行,此列包含 windowSpec 定义的窗口内前一行的“sal”列的值。
  • offset=1 参数表示前一行应该是当前行之前的一行,default=0 指定每个分区中第一行的默认值(因为第一行没有前一行)。
  • 最后,prev_sal_df.show() 显示生成的 DataFrame。

输出:

输出表示每个部门内每个员工之前的工资(基于工资降序排列)。

Q7.每个部门内的第一薪水并与每个部门内的每个成员进行比较。

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark 代码的方法

  • 首先,创建一个 WindowSpec 对象,该对象按部门编号 (deptno) 对数据进行分区,并按工资 (sal) 降序排列数据。
  • 然后将first() 分析函数应用于windowSpec 定义的窗口上的“sal”列。此函数返回每个分区(即每个 deptno 组)内“sal”列的第一个值,按“sal”降序排序。生成的列有一个新名称“first_val”。
  • 现在将生成的 DataFrame(其中包含所选列和新列“first_val”)分配给名为“first_value_df”的新变量,该新列根据工资值的降序显示每个部门的第一高工资。

输出:

输出显示员工 DataFrame 中每个部门的第一高薪。

结论

在这篇文章中,我们学习了窗口函数。 Spark SQL 具有三种窗口函数:排名函数、聚合函数和值函数。使用此函数,我们对数据集进行了研究,以找到一些重要且有价值的见解。 Spark Window Functions 提供强大的数据分析工具,例如排名、分析和价值计算。无论是按部门分析薪资洞察还是使用 PySpark 和 SQL 来使用实际示例,这些函数都为 Spark 中有效的数据处理和分析提供了必要的工具。

关键精华

  • 我们了解了窗口函数,并使用 Spark SQL 和 PySpark DataFrame API 来使用它们。
  • 我们使用诸如rank、dense_rank、row_number、lag、lead、groupBy、partitionBy 等函数来提供正确的分析。
  • 我们还看到了问题的详细逐步解决方案,并分析了每个问题陈述末尾的输出。

本案例研究可帮助您更好地理解 PySpark 功能。如果您有任何意见或疑问,请在下面评论。与我联系 LinkedIn 以供进一步讨论。保持学习!!!

本文中显示的媒体不属于 Analytics Vidhya 所有,其使用由作者自行决定。

现货图片

VC咖啡厅

生命科学VC

最新情报

VC咖啡厅

生命科学VC

现货图片