제퍼넷 로고

PySpark에서 창 함수 작업

시간

개요

PySpark의 창 기능을 배우는 것은 어려울 수 있지만 노력할 가치가 있습니다. 창 함수는 데이터 분석을 위한 강력한 도구이며 다른 방법으로는 볼 수 없었던 통찰력을 얻는 데 도움이 될 수 있습니다. Spark에서 Window 함수를 사용하는 방법을 이해함으로써; 당신은 당신의 걸릴 수 있습니다 데이터 분석 기술을 한 단계 더 발전시키고 더 많은 정보를 바탕으로 결정을 내릴 수 있습니다. 당신이 큰 일을 하든 작은 일을 하든 데이터 세트, Spark의 Window 함수를 학습하면 새롭고 흥미로운 방식으로 데이터를 조작하고 분석할 수 있습니다.

PySpark의 창 기능

이번 블로그에서는 먼저 윈도우 함수의 개념을 이해한 다음 이를 Spark SQL 및 PySpark DataFrame API와 함께 사용하는 방법에 대해 논의하겠습니다. 따라서 이 기사를 마치면 실제 데이터 세트에서 창 기능을 사용하는 방법을 이해하고 비즈니스에 필수적인 통찰력을 얻을 수 있습니다.

학습 목표

  • 창 함수의 개념을 이해합니다.
  • 데이터 세트를 사용하여 창 기능으로 작업합니다.
  • 창 기능을 사용하여 통찰력을 알아보세요.
  • Spark SQL 및 DataFrame API를 사용하여 창 기능을 사용하세요.

이 기사는 데이터 과학 블로그.

차례

창 기능이란 무엇입니까?

창 기능은 서로 관련된 행 그룹 내의 데이터를 분석하는 데 도움이 됩니다. 이를 통해 사용자는 일부 분할 및 순서 지정 기준에 따라 서로 연결된 데이터 프레임 또는 데이터 세트의 행에 대해 복잡한 변환을 수행할 수 있습니다.

창 함수는 분할 열 집합으로 정의된 데이터 프레임 또는 데이터 세트의 특정 파티션에서 작동합니다. 그만큼 주문 절은 윈도우 함수에서 데이터를 분할하여 특정 순서로 정렬합니다. 그런 다음 창 함수는 창 프레임에 지정된 대로 현재 행과 이전 '및'/'또는' 다음 행의 하위 집합을 포함하는 행의 슬라이딩 창에서 계산을 수행합니다.

PySpark에서 창 함수 작업

창 함수의 일반적인 예로는 이동 평균 계산, 특정 열이나 그룹을 기준으로 행 순위 지정 또는 정렬 등이 있습니다. , 누계 계산, 행 그룹의 첫 번째 또는 마지막 값 찾기 등이 있습니다. Spark의 강력한 창 기능을 통해 사용자는 복잡한 분석 및 집계를 수행할 수 있습니다. 대형 데이터 세트 비교적 쉽게 사용할 수 있어 대규모 작업에 널리 사용되는 도구입니다. 데이터 처리 그리고 분석.

"

SQL의 창 함수

Spark SQL은 세 가지 종류의 창 함수를 지원합니다.

  • 순위 기능:- 이러한 함수는 결과 집합의 파티션 내 각 행에 순위를 할당합니다. 예를 들어, ROW_NUMBER() 함수는 파티션 내의 각 행에 고유한 일련 번호를 제공합니다.
  • 분석 기능:- 이러한 함수는 행 창에 걸쳐 집계 값을 계산합니다. 예를 들어 SUM() 함수는 행 창에 걸쳐 열의 합계를 계산합니다.
  • 가치 기능:- 이러한 함수는 동일한 파티션에 있는 다른 행의 값을 기반으로 파티션의 각 행에 대한 분석 값을 계산합니다. 예를 들어 LAG() 함수는 파티션의 이전 행에서 열 값을 반환합니다.

DataFrame 생성

다양한 창 기능을 실제로 사용할 수 있도록 샘플 데이터프레임을 생성하겠습니다. 또한 이 데이터와 창 기능을 사용하여 몇 가지 질문에 답해 보겠습니다.

데이터 프레임에는 이름, 직위, 직원 번호, 고용 날짜, 급여 등과 같은 직원 세부 정보가 있습니다. 총계에는 다음과 같은 8개의 열이 있습니다.

  • 'empno': 이 열에는 직원 번호가 포함됩니다.
  • 'ename': 이 열에는 직원 이름이 있습니다.
  • '직업': 이 열에는 직원의 직위에 대한 정보가 포함됩니다.
  • 'hireddate': 이 열에는 직원의 채용 날짜가 표시됩니다.
  • 'sal': 이 열에는 급여 세부정보가 포함되어 있습니다.
  • 'comm': 이 열에는 직원 커미션 세부정보가 있습니다(있는 경우).
  • 'deptno': 직원이 속한 부서 번호가 이 열에 있습니다.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

이제 스키마를 확인하겠습니다.

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

이름이 "emp"인 DataFrame 'emp_df'의 임시 뷰를 만듭니다. 이를 통해 마치 테이블인 것처럼 Spark SQL의 SQL 구문을 사용하여 DataFrame을 쿼리할 수 있습니다. 임시 보기는 Spark 세션 기간 동안에만 유효합니다.

emp_df.createOrReplaceTempView("emp")

창 함수를 사용하여 문제 설명 해결

여기서는 Windows 기능을 사용하여 몇 가지 문제를 해결합니다.

Q1. 각 부서 내 급여 순위를 매기세요.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

PySpark 코드 접근 방식

  • Window 함수는 partitionBy(col('deptno'))를 사용하여 부서 번호별로 데이터를 분할한 다음 orderBy(col('sal').desc())를 사용하여 급여별로 내림차순으로 각 파티션 내의 데이터를 정렬합니다. windowSpec 변수는 최종 창 사양을 보유합니다.
  • 'emp_df'는 empno, ename, job, deptno 및 sal에 대한 열을 포함하여 직원 데이터를 포함하는 데이터 프레임입니다.
  • select 문 내에서 'F.rank().over(windowSpec)'를 사용하여 급여 열에 순위 함수를 적용합니다. 결과 열에는 'rank'라는 별칭 이름이 있습니다.
  • empno, ename, job, deptno 및 급여를 포함하는 'ranking_result_df' 데이터프레임을 생성합니다. 또한 해당 부서 내에서 직원의 급여 순위를 나타내는 새로운 열 '순위'가 있습니다.

출력:

결과는 부서별 급여 순위입니다.

Q2. 각 부서 내에서 급여 순위를 촘촘하게 매깁니다.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

PySpark 코드 접근 방식

  • 먼저 'emp_df' DataFrame을 deptno별로 분할하고 'sal' 열의 내림차순으로 정렬하는 Window 함수를 사용하여 창 사양을 만듭니다.
  • 그런 다음, dark_rank() 함수가 창 사양에 적용되어 정렬된 순서에 따라 각 파티션 내의 각 행에 밀집 순위를 할당합니다.
  • 마지막으로 emp_df에서 특정 열(예: 'empno', 'ename', 'job', 'deptno' 및 'sal')을 선택하고 'dense_rank'라는 새 열을 추가하여 'dense_ranking_df'라는 새 DataFrame이 생성됩니다. 윈도우 함수로 계산된 밀집 순위 값을 포함합니다.
  • 마지막으로 결과 DataFrame을 표 형식으로 표시합니다.

출력:

결과는 급여 측면에서 밀집 순위를 나타냅니다.

Q3. 각 부서 내의 행에 번호를 매깁니다.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark 코드 접근 방식

  • 첫 번째 줄은 Window.partitionBy() 및 Window.orderBy() 함수를 사용하여 계산을 위한 창 사양을 정의합니다. 이 창은 deptno 열로 분할되고 sal 열을 기준으로 내림차순으로 정렬됩니다.
  • 두 번째 줄은 'row_num'이라는 추가 열이 있는 'emp_df'의 투영인 'row_num_df'라는 새 DataFrame을 생성하며 여기에는 행 번호 세부 정보가 포함됩니다.
  • show() 함수는 각 직원의 empno, ename, job, deptno, sal 및 row_num 열을 보여주는 결과 DataFrame을 표시합니다.

출력:

출력에는 급여를 기준으로 부서 내 각 직원의 행 번호가 포함됩니다.

Q4. 각 부서 내 급여의 합계를 실행합니다.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Approach PySpark 코드의 경우

  • 먼저 “Window.partitionBy()” 및 “Window.orderBy()” 메서드를 사용하여 창 사양을 정의합니다. "partitionBy()" 메서드는 "deptno" 열을 기준으로 데이터를 분할하는 반면, "orderBy()" 메서드는 "sal" 열을 기준으로 내림차순으로 데이터를 정렬합니다.
  • 다음으로, "over()" 메서드를 사용하여 "sal" 열에 "sum()" 함수를 적용하여 각 부서 내 급여 누계를 계산합니다. 결과는 'empno', 'ename', 'job', 'deptno', 'sal' 및 'running_total' 열을 포함하는 'running_sum_sal_df'라는 새 DataFrame에 있습니다.
  • 마지막으로 “running_sum_sal_df” DataFrame에서 “show()” 메서드가 호출되어 쿼리 출력을 표시합니다. 결과 DataFrame에는 각 직원의 급여 누계와 이름, 부서 번호, 직업과 같은 기타 세부 정보가 표시됩니다.

출력:

출력에는 각 부서의 급여 데이터의 누계가 포함됩니다.

Q5: 부서별 차기 연봉은 어떻게 되나요?

각 부서 내 다음 급여를 찾기 위해 LEAD 함수를 사용합니다. 

Lead() 창 함수는 창 파티션의 다음 행에 있는 표현식의 값을 가져오는 데 도움이 됩니다. 각 입력 열에 대한 열을 반환하며, 각 열에는 창 파티션 내 현재 행 위의 오프셋 행에 대한 입력 열 값이 포함됩니다. 리드 함수의 구문은 다음과 같습니다.- 리드(col, 오프셋=1, 기본값=없음).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark 코드 접근 방식

  • 첫째, 창 기능은 DataFrame의 행을 부서 번호(deptno)별로 분할하고 각 분할 내에서 급여를 내림차순으로 정렬하는 데 도움이 됩니다.
  • 그런 다음 각 파티션 내의 정렬된 'sal' 열에 Lead() 함수를 적용하여 다음 직원의 급여를 반환합니다(오프셋 1). 다음 직원이 없는 경우 기본값은 0입니다.
  • 결과 DataFrame 'next_salary_df'에는 직원 번호(empno), 이름(ename), 직위(job), 부서 번호(deptno), 현재 급여(sal) 및 다음 급여(next_val)에 대한 열이 포함됩니다.

출력:

출력에는 급여가 내림차순으로 부서 내 다음 직원의 급여가 포함됩니다. 

Q6. 각 부서 내 이전 급여.

이전 급여를 계산하려면 LAG 함수를 사용합니다.

lag 함수는 창 파티션 내 현재 행 앞의 지정된 오프셋에서 표현식 값을 반환합니다. lag 함수의 구문은 다음과 같습니다:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark 코드 접근 방식

  • window.partitionBy(col('deptno')) 는 창 파티션을 지정합니다. 부서별로 윈도우 기능이 별도로 작동한다는 뜻이다.
  • 그런 다음 orderBy(col('sal').desc())는 급여 순서를 지정하고 각 부서 내 급여를 내림차순으로 정렬합니다.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val')은 DataFrame 'prev_sal_df'에 prev_val이라는 새 열을 생성합니다.
  • 각 행에 대해 이 열에는 windowSpec으로 정의된 창 내 이전 행의 'sal' 열 값이 포함됩니다.
  • offset=1 매개변수는 이전 행이 현재 행보다 한 행 앞에 있어야 함을 나타내며, default=0은 각 파티션의 첫 번째 행에 대한 기본값을 지정합니다(첫 번째 행에 대한 이전 행이 없으므로).
  • 마지막으로 prev_sal_df.show()는 결과 DataFrame을 표시합니다.

출력:

출력은 급여를 내림차순으로 정렬하여 각 부서 내 각 직원의 이전 급여를 나타냅니다.

Q7. 각 부서 내 첫 번째 급여를 계산하고 각 부서 내 모든 구성원과 비교합니다.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark 코드 접근 방식

  • 먼저 부서 번호(deptno)별로 데이터를 분할하고 급여(sal)별로 내림차순으로 정렬하는 WindowSpec 개체를 만듭니다.
  • 그런 다음 windowSpec에 정의된 창의 'sal' 열에 first() 분석 함수를 적용합니다. 이 함수는 'sal'을 내림차순으로 정렬하여 각 파티션(즉, 각 부서 그룹) 내 'sal' 열의 첫 번째 값을 반환합니다. 결과 열에는 'first_val'이라는 새 이름이 지정됩니다.
  • 이제 선택한 열과 급여 값의 내림차순을 기준으로 각 부서의 첫 번째 최고 급여를 표시하는 새 열 'first_val'을 포함하는 결과 DataFrame을 'first_value_df'라는 새 변수에 할당합니다.

출력:

출력에는 직원 DataFrame의 각 부서에 대한 첫 번째 최고 급여가 표시됩니다.

결론

이번 글에서는 윈도우 기능에 대해 알아봅니다. Spark SQL에는 순위 함수, 집계 함수, 값 함수의 세 가지 종류의 윈도우 함수가 있습니다. 이 기능을 사용하여 데이터 세트를 작업하여 중요하고 귀중한 통찰력을 찾았습니다. Spark Window Functions는 순위, 분석, 가치 계산과 같은 강력한 데이터 분석 도구를 제공합니다. 부서별 급여 인사이트를 분석하든 PySpark 및 SQL을 사용하여 실제 사례를 사용하든 이러한 기능은 Spark에서 효과적인 데이터 처리 및 분석을 위한 필수 도구를 제공합니다.

주요 요점

  • 우리는 윈도우 함수에 대해 배웠고 Spark SQL 및 PySpark DataFrame API를 사용하여 작업했습니다.
  • 우리는 적절한 분석을 제공하기 위해 순위, 밀도_순위, 행_번호, 지연, 리드, 그룹별, 파티션별 등의 함수를 사용합니다.
  • 또한 문제에 대한 자세한 단계별 솔루션을 확인하고 각 문제 설명의 끝 부분에 있는 결과를 분석했습니다.

이 사례 연구는 PySpark 기능을 더 잘 이해하는 데 도움이 됩니다. 의견이나 질문이 있으시면 아래에 댓글을 남겨주세요. 나랑 연결해줘 링크드인 추가 논의를 위해. 계속 공부하다!!!

이 기사에 표시된 미디어는 Analytics Vidhya의 소유가 아니며 작성자의 재량에 따라 사용됩니다.

spot_img

최신 인텔리전스

spot_img