ゼファーネットのロゴ

PySpark でのウィンドウ関数の操作

日付:

概要

PySpark のウィンドウ関数について学ぶのは難しいかもしれませんが、努力する価値はあります。ウィンドウ関数はデータを分析するための強力なツールであり、他の方法では得られなかった洞察を得るのに役立ちます。 Spark でのウィンドウ関数の使用方法を理解する。あなたはあなたのものを取ることができます データ分析 スキルを次のレベルに引き上げ、より多くの情報に基づいた意思決定を行います。大きなものでも小さなものでも データセット, Spark で Window Functions を学習すると、新しくエキサイティングな方法でデータを操作および分析できるようになります。

PySpark のウィンドウ関数

このブログでは、まずウィンドウ関数の概念を理解し、次にそれを Spark SQL および PySpark DataFrame API で使用する方法について説明します。この記事を読み終えるまでに、実際のデータセットでウィンドウ関数を使用する方法を理解し、ビジネスに重要な洞察を得ることができるようになります。

学習目標

  • ウィンドウ関数の概念を理解します。
  • データセットを使用したウィンドウ関数の操作。
  • ウィンドウ関数を使用して洞察を見つけます。
  • Spark SQL と DataFrame API を使用してウィンドウ関数を操作します。

この記事は、の一部として公開されました データサイエンスブログ。

目次

ウィンドウ関数とは何ですか?

ウィンドウ関数は、相互に関連する行のグループ内のデータを分析するのに役立ちます。これにより、ユーザーは、いくつかの分割および順序付け基準に基づいて、相互に関連付けられたデータフレームまたはデータセットの行に対して複雑な変換を実行できるようになります。

ウィンドウ関数は、一連のパーティション化列によって定義されたデータフレームまたはデータセットの特定のパーティションに対して動作します。の ORDER BY 句は、ウィンドウ関数内のデータを分割して特定の順序に配置します。次に、ウィンドウ関数は、ウィンドウ フレームで指定されているように、現在の行と、先行する「および」/「または」後の行のサブセットを含む行のスライディング ウィンドウで計算を実行します。

PySpark でのウィンドウ関数の操作

ウィンドウ関数の一般的な例には、移動平均の計算、特定の列またはグループに基づいた行のランキングまたは並べ替えなどがあります。 コラム、累計を計算し、行のグループ内の最初または最後の値を見つけます。 Spark の強力なウィンドウ関数を使用すると、ユーザーは複雑な分析と集計を実行できます。 大規模なデータセット 比較的簡単にできるため、大企業向けの人気ツールとなっています。 データ処理 と分析。

"

SQL のウィンドウ関数

Spark SQL は 3 種類のウィンドウ関数をサポートしています。

  • ランキング機能:- これらの関数は、結果セットのパーティション内の各行にランクを割り当てます。たとえば、ROW_NUMBER() 関数は、パーティション内の各行に一意の連続番号を与えます。
  • 分析機能:- これらの関数は、行のウィンドウ全体で集計値を計算します。たとえば、SUM() 関数は、行のウィンドウにわたる列の合計を計算します。
  • 値関数:- これらの関数は、同じパーティション内の他の行の値に基づいて、パーティション内の各行の分析値を計算します。たとえば、LAG() 関数は、パーティション内の前の行の列の値を返します。

データフレームの作成

実際にさまざまなウィンドウ関数を操作できるように、サンプル データフレームを作成します。また、このデータとウィンドウ関数を利用して、いくつかの質問に答えてみます。

データフレームには、名前、役職、従業員番号、入社日、給与などの従業員の詳細が含まれています。合計で次の 8 つの列があります。

  • 'empno': この列には従業員番号が含まれます。
  • 'ename': この列には従業員名が含まれます。
  • 「job」: この列には、従業員の役職に関する情報が含まれます。
  • 'hiredate': この列には従業員の雇用日が表示されます。
  • 'sal': この列には給与の詳細が含まれます。
  • 'comm': この列には、従業員のコミッションの詳細が含まれます (ある場合)。
  • 'deptno': 従業員が所属する部門番号がこの列にあります。
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

次にスキーマを確認します。

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

データフレーム「emp_df」の一時ビューを「emp」という名前で作成します。これにより、Spark SQL の SQL 構文を使用して、テーブルであるかのように DataFrame をクエリできるようになります。一時ビューは、Spark セッションの間のみ有効です。

emp_df.createOrReplaceTempView("emp")

ウィンドウ関数を使用した問題ステートメントの解決

ここでは、Windows 関数を使用していくつかの問題ステートメントを解決します。

Q1.各部門内の給与をランク付けします。

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

PySpark コードのアプローチ

  • Window 関数は、partitionBy(col('deptno')) を使用して部門番号ごとにデータを分割し、orderBy(col('sal').desc()) を使用して各パーティション内のデータを給与の降順に並べ替えます。変数 windowSpec は、最終的なウィンドウ仕様を保持します。
  • 「emp_df」は、empno、ename、job、deptno、sal の列を含む従業員データを含むデータフレームです。
  • ランク関数は、select ステートメント内の 'F.rank().over(windowSpec)' を使用して給与列に適用されます。結果の列には、「rank」という別名が付けられます。
  • これにより、empno、ename、job、deptno、salary を含むデータフレーム「ranking_result_df」が作成されます。また、部門内での従業員の給与のランクを表す新しい列「ランク」も追加されました。

出力:

その成果には部門ごとの給与ランクがついています。

Q2.各部門内での給与のランクが密です。

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

PySpark コードのアプローチ

  • まず、Window 関数を使用してウィンドウ仕様を作成します。これは、「emp_df」データフレームを deptno で分割し、「sal」列の降順で並べ替えます。
  • 次に、dense_rank() 関数がウィンドウ仕様に適用され、ソート順に基づいて各パーティション内の各行に密ランクが割り当てられます。
  • 最後に、emp_df から特定の列 (つまり、「empno」、「ename」、「job」、「deptno」、および「sal」) を選択し、新しい列「dense_rank」を追加することによって、「dense_ranking_df」という新しいデータフレームが作成されます。ウィンドウ関数によって計算された密なランキング値が含まれます。
  • 最後に、結果の DataFrame を表形式で表示します。

出力:

結果は給与面での順位が濃い。

Q3.各部門内の行に番号を付けます。

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark コードのアプローチ

  • 最初の行では、Window.partitionBy() 関数と Window.orderBy() 関数を使用して計算のためのウィンドウ仕様を定義します。このウィンドウは deptno 列で分割され、sal 列で降順に並べられています。
  • 2 行目は、「row_num_df」という新しい DataFrame を作成します。これは、「row_num」という追加の列を持つ「emp_df」の射影であり、行番号の詳細が含まれています。
  • show() 関数は、各従業員の empno、ename、job、deptno、sal、および row_num 列を示す結果の DataFrame を表示します。

出力:

出力には、給与に基づいた部門内の各従業員の行番号が含まれます。

Q4.各部門内の給与の合計を計算します。

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

アプローチ PySpark コードの場合

  • まず、「Window.partitionBy()」メソッドと「Window.orderBy()」メソッドを使用してウィンドウ仕様を定義します。 「partitionBy()」メソッドは「deptno」列でデータを分割し、「orderBy()」メソッドは「sal」列でデータを降順に並べます。
  • 次に、「over()」メソッドを使用して「sal」列に「sum()」関数を適用し、各部門内の給与の累計を計算します。結果は、「ru​​nning_sum_sal_df」という名前の新しい DataFrame に入ります。これには、列「empno」、「ename」、「job」、「deptno」、「sal」、「running_total」が含まれます。
  • 最後に、「running_sum_sal_df」データフレームで「show()」メソッドが呼び出され、クエリの出力が表示されます。結果として得られる DataFrame には、各従業員の給与の累計と、名前、部門番号、職務などのその他の詳細が表示されます。

出力:

出力には、各部門の給与データの累計が含まれます。

Q5: 各部門内の次の給与。

各部門内の次の給与を見つけるには、LEAD 関数を使用します。 

lead() ウィンドウ関数は、ウィンドウ パーティションの次の行の式の値を取得するのに役立ちます。各入力列の列を返します。各列には、ウィンドウ パーティション内の現在の行の上のオフセット行の入力列の値が含まれます。 lead 関数の構文は次のとおりです: - lead(col, offset=1,default=None)。

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark コードのアプローチ

  • まず、ウィンドウ関数は、DataFrame の行を部門番号 (deptno) で分割し、各パーティション内で給与を降順に並べ替えるのに役立ちます。
  • 次に、 lead() 関数が各パーティション内の順序付けされた 'sal' 列に適用され、次の従業員の給与 (オフセット 1) が返されます。次の従業員がいない場合のデフォルト値は 0 です。
  • 結果のデータフレーム「next_salary_df」には、従業員番号 (empno)、名前 (ename)、役職 (job)、部門番号 (deptno)、現在の給与 (sal)、および次の給与 (next_val) の列が含まれています。

出力:

出力には、給与の降順に基づいて、部門内の次の従業員の給与が含まれます。 

Q6.各部門における以前の給与。

以前の給与を計算するには、LAG 関数を使用します。

lag 関数は、ウィンドウ パーティション内の現在の行の前の指定されたオフセットにある式の値を返します。 lag 関数の構文は次のとおりです:- lag(expr, offset=1,default=None).over(windowSpec)。

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark コードのアプローチ

  • window.partitionBy(col('deptno')) はウィンドウ パーティションを指定します。部門ごとに窓口機能が別々に働いているということです。
  • 次に、 orderBy(col('sal').desc()) で給与の順序を指定し、各部門内の給与を降順に並べます。
  • F.lag('sal', offset=1,default=0).over(windowSpec).alias('prev_val') は、DataFrame 'prev_sal_df' に prev_val という新しい列を作成します。
  • 各行について、この列には、windowSpec で定義されたウィンドウ内の前の行の「sal」列の値が含まれます。
  • offset=1 パラメータは、前の行が現在の行の 0 行前であることを示し、default=XNUMX は、各パーティションの最初の行のデフォルト値を指定します (最初の行には前の行がないため)。
  • 最後に、 prev_sal_df.show() は結果の DataFrame を表示します。

出力:

出力は、給与の降順に基づいて、各部門内の各従業員の以前の給与を表します。

Q7.各部門内の最初の給与と、各部門内のすべてのメンバーとの比較。

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark コードのアプローチ

  • まず、データを部門番号 (deptno) で分割し、給与 (sal) で降順に並べる WindowSpec オブジェクトを作成します。
  • 次に、first() 分析関数を windowSpec で定義されたウィンドウ上の 'sal' 列に適用します。この関数は、各パーティション (つまり、各 deptno グループ) 内の 'sal' 列の最初の値を 'sal' の降順で返します。結果の列には、「first_val」という新しい名前が付けられます。
  • 次に、選択した列と、給与値の降順に基づいて各部門の最初の最高給与を示す新しい列「first_val」を含む結果のデータフレームを、「first_value_df」という新しい変数に割り当てます。

出力:

出力には、従業員データフレーム内の各部門の最初の最高給与が表示されます。

まとめ

この記事では、ウィンドウ関数について学びます。 Spark SQL には、ランキング関数、集計関数、値関数の 3 種類のウィンドウ関数があります。この関数を使用して、データセットに取り組み、いくつかの重要で貴重な洞察を見つけました。 Spark Window Functions は、ランキング、分析、値の計算などの強力なデータ分析ツールを提供します。部門ごとの給与の洞察を分析する場合でも、PySpark と SQL を使用した実践的な例を使用する場合でも、これらの関数は、Spark での効果的なデータ処理と分析に不可欠なツールを提供します。

主要な取り組み

  • 私たちはウィンドウ関数について学び、Spark SQL と PySpark DataFrame API を使用してそれらを操作しました。
  • 適切な分析を提供するために、rank、dense_rank、row_number、lag、lead、groupBy、partitionBy などの関数を使用します。
  • また、問題に対する詳細な段階的な解決策を確認し、各問題ステートメントの最後にある出力を分析しました。

このケーススタディは、PySpark 関数をより深く理解するのに役立ちます。ご意見やご質問がございましたら、以下にコメントしてください。で私とつながってください LinkedIn さらなる議論のために。学び続けます!!!

この記事に示されているメディアは Analytics Vidhya が所有するものではなく、著者の裁量で使用されています。

スポット画像

最新のインテリジェンス

スポット画像