Logo Zephyrnet

Praca z funkcjami okna w PySpark

Data:

Wprowadzenie

Poznanie funkcji okna w PySpark może być trudne, ale warte wysiłku. Funkcje okna są potężnym narzędziem do analizowania danych i mogą pomóc Ci uzyskać informacje, których w innym przypadku być może nie widziałbyś. Rozumiejąc, jak używać funkcji okna w Spark; możesz zabrać swoje analiza danych umiejętności na wyższy poziom i podejmować bardziej świadome decyzje. Niezależnie od tego, czy pracujesz z dużymi, czy małymi zbiory danychnauka funkcji okien w platformie Spark umożliwi manipulowanie i analizowanie danych w nowy, ekscytujący sposób.

Funkcje okna w PySpark

Na tym blogu najpierw zrozumiemy koncepcję funkcji okiennych, a następnie omówimy, jak z nich korzystać w Spark SQL i PySpark DataFrame API. Dzięki temu pod koniec tego artykułu zrozumiesz, jak używać funkcji okna z rzeczywistymi zbiorami danych i uzyskasz niezbędne spostrzeżenia dla biznesu.

Cele kształcenia

  • Zrozumienie koncepcji funkcji okna.
  • Praca z funkcjami okna przy użyciu zbiorów danych.
  • Poznaj spostrzeżenia, korzystając z funkcji okna.
  • Użyj Spark SQL i DataFrame API do pracy z funkcjami okna.

Ten artykuł został opublikowany jako część Blogathon nauki o danych.

Spis treści

Jakie są funkcje okna?

Funkcje okna pomagają analizować dane w grupie wierszy, które są ze sobą powiązane. Umożliwiają użytkownikom wykonywanie złożonych transformacji wierszy ramki danych lub zbioru danych powiązanych ze sobą w oparciu o pewne kryteria partycjonowania i porządkowania.

Funkcje okna działają na określonej partycji ramki danych lub zbioru danych zdefiniowanej przez zestaw kolumn podziału. The ZAMÓW PRZEZ klauzula dzieli dane w funkcji okna, aby uporządkować je w określonej kolejności. Funkcje okna wykonują następnie obliczenia w przesuwanym oknie wierszy, które zawiera bieżący wiersz i podzbiór poprzedzających wierszy „i”/„lub” zgodnie z określeniem w ramce okna.

Praca z funkcjami okna w PySpark

Niektóre typowe przykłady funkcji okna obejmują obliczanie średnich kroczących, rankingowanie lub sortowanie wierszy na podstawie określonej kolumny lub grupy kolumny, obliczanie sum bieżących i znajdowanie pierwszej lub ostatniej wartości w grupie wierszy. Dzięki potężnym funkcjom okna Spark użytkownicy mogą wykonywać złożone analizy i agregacje dużych zbiorów danych ze względną łatwością, co czyni go popularnym narzędziem dla dużych analiza danych i analityka.

"

Funkcje okna w SQL

Spark SQL obsługuje trzy rodzaje funkcji okna:

  • Funkcje rankingowe: - Funkcje te przypisują rangę każdemu wierszowi w obrębie podziału zbioru wyników. Na przykład funkcja ROW_NUMBER() nadaje unikalny numer kolejny każdemu wierszowi w partycji.
  • Funkcje analityczne: - Funkcje te obliczają zagregowane wartości w oknie wierszy. Na przykład funkcja SUM() oblicza sumę kolumny w oknie wierszy.
  • Funkcje wartości: - Funkcje te obliczają wartość analityczną dla każdego wiersza w partycji na podstawie wartości innych wierszy w tej samej partycji. Przykładowo funkcja LAG() zwraca wartość kolumny z poprzedniego wiersza partycji.

Tworzenie ramki danych

Stworzymy przykładową ramkę danych, abyśmy mogli praktycznie pracować z różnymi funkcjami okna. Spróbujemy także odpowiedzieć na niektóre pytania za pomocą tych danych i funkcji okna.

Ramka danych zawiera szczegółowe informacje o pracownikach, takie jak imię i nazwisko, oznaczenie, numer pracownika, data zatrudnienia, wynagrodzenie itp. Łącznie mamy 8 kolumn, które wyglądają następująco:

  • 'empno': Ta kolumna zawiera numer pracownika.
  • „ename”: w tej kolumnie znajdują się nazwiska pracowników.
  • „praca”: Ta kolumna zawiera informacje o stanowiskach pracowników.
  • „data zatrudnienia”: ta kolumna pokazuje datę zatrudnienia pracownika.
  • „sal”: Szczegóły wynagrodzenia zawarte w tej kolumnie.
  • „comm”: Ta kolumna zawiera szczegółowe informacje na temat prowizji pracowniczych, jeśli takie istnieją.
  • „deptno”: W tej kolumnie znajduje się numer działu, do którego należy pracownik.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Teraz sprawdzimy schemat:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Utwórz tymczasowy widok ramki danych „emp_df” o nazwie „emp”. Pozwala nam wysyłać zapytania do DataFrame przy użyciu składni SQL w Spark SQL tak, jakby była to tabela. Widok tymczasowy jest ważny tylko przez czas trwania sesji Spark.

emp_df.createOrReplaceTempView("emp")

Rozwiązywanie problemów przy użyciu funkcji okna

Tutaj rozwiążemy kilka stwierdzeń problemów za pomocą funkcji systemu Windows:

Pytanie 1. Oceń wynagrodzenie w każdym dziale.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Podejście do kodu PySpark

  • Funkcja Window dzieli dane według numeru działu za pomocą partycjiBy(col('deptno')), a następnie porządkuje dane w każdej partycji według wynagrodzenia w kolejności malejącej za pomocą metody OrderBy(col('sal').desc()). Zmienna windowSpec przechowuje ostateczną specyfikację okna.
  • „emp_df” to ramka danych zawierająca dane pracownika, w tym kolumny empno, ename, job, deptno i sal.
  • Funkcja rangi jest stosowana do kolumny wynagrodzenia za pomocą „F.rank().over(windowSpec)” w instrukcji Select. Wynikowa kolumna ma alias „rank”.
  • Utworzy ramkę danych „ranking_result_df”, która zawiera empno, ename, job, deptno i wynagrodzenie. Zawiera także nową kolumnę „ranking”, która reprezentuje stopień wynagrodzenia pracownika w jego dziale.

Wyjście:

Wynik ma rangę wynagrodzeń w każdym dziale.

Pytanie 2. Gęsty ranking wynagrodzeń w każdym dziale.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Podejście do kodu PySpark

  • Najpierw utwórz specyfikację okna za pomocą funkcji Window, która dzieli ramkę danych „emp_df” według deptno i porządkuje ją w dół według kolumny „sal”.
  • Następnie do specyfikacji okna stosowana jest funkcja gęsta_rank(), która przypisuje gęstą rangę każdemu wierszowi w każdej partycji na podstawie jego kolejności posortowania.
  • Na koniec tworzona jest nowa ramka danych o nazwie „dense_ranking_df” poprzez wybranie określonych kolumn z emp_df (tj. „empno”, „ename”, „job”, „deptno” i „sal”) i dodanie nowej kolumny „dense_rank”, która zawiera gęste wartości rankingu obliczone przez funkcję okna.
  • Na koniec wyświetl wynikową ramkę danych w formacie tabelarycznym.

Wyjście:

Wynik ma gęstą rangę pod względem wynagrodzeń.

Pytanie 3. Ponumeruj wiersze w każdym dziale.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Podejście do kodu PySpark

  • Pierwsza linia definiuje specyfikację okna do obliczeń przy użyciu funkcji Window.partitionBy() i Window.orderBy(). Okno to jest podzielone według kolumny deptno i uporządkowane według kolumny sal w kolejności malejącej.
  • Druga linia tworzy nową ramkę danych o nazwie „row_num_df”, rzut „emp_df” z dodatkową kolumną o nazwie „row_num” i zawiera szczegóły numerów wierszy.
  • Funkcja show() wyświetla wynikową ramkę danych zawierającą kolumny empno, ename, job, deptno, sal i row_num każdego pracownika.

Wyjście:

Dane wyjściowe będą zawierały numer wiersza każdego pracownika w jego dziale na podstawie jego wynagrodzenia.

Pytanie 4. Bieżąca łączna suma wynagrodzeń w każdym dziale.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Podejście dla kodu PySpark

  • Najpierw definiuje się specyfikację okna za pomocą metod „Window.partitionBy()” i „Window.orderBy()”. Metoda „partitionBy()” dzieli dane według kolumny „deptno”, natomiast metoda „orderBy()” porządkuje dane według kolumny „sal” w kolejności malejącej.
  • Następnie do kolumny „sal” stosowana jest funkcja „sum()” przy użyciu metody „over()” w celu obliczenia bieżącej sumy wynagrodzeń w każdym dziale. Wynik będzie znajdował się w nowej ramce danych o nazwie „running_sum_sal_df”, która zawiera kolumny „empno”, „ename”, „job”, „deptno”, „sal” i „running_total”.
  • Na koniec wywoływana jest metoda „show()” na ramce danych „running_sum_sal_df” w celu wyświetlenia wyniku zapytania. Wynikowa ramka DataFrame pokazuje bieżącą sumę wynagrodzeń każdego pracownika i inne szczegóły, takie jak imię i nazwisko, numer działu i stanowisko.

Wyjście:

Dane wyjściowe będą zawierały sumę danych o wynagrodzeniach każdego działu.

Pytanie 5: Kolejne wynagrodzenie w każdym dziale.

Aby znaleźć kolejną pensję w każdym dziale, używamy funkcji LEAD. 

Funkcja okienkowa lead() pomaga uzyskać wartość wyrażenia w kolejnym wierszu partycji okiennej. Zwraca kolumnę dla każdej kolumny wejściowej, przy czym każda kolumna będzie zawierać wartość kolumny wejściowej dla wiersza przesunięcia powyżej bieżącego wiersza w partycji okna. Składnia funkcji lead jest następująca: - lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Podejście do kodu PySpark

  • Po pierwsze, funkcja okna pomaga podzielić wiersze DataFrame według numeru działu (deptno) i uporządkować wynagrodzenia w kolejności malejącej w ramach każdej partycji.
  • Następnie do uporządkowanej kolumny „sal” w każdej partycji stosowana jest funkcja lead(), aby zwrócić wynagrodzenie kolejnego pracownika (z przesunięciem 1), a wartością domyślną jest 0 w przypadku, gdy nie ma kolejnego pracownika.
  • Wynikowa ramka danych „next_salary_df” zawiera kolumny dla numeru pracownika (empno), nazwiska (ename), stanowiska (job), numeru działu (deptno), aktualnego wynagrodzenia (sal) i następnego wynagrodzenia (next_val).

Wyjście:

Dane wyjściowe zawierają wynagrodzenie kolejnego pracownika w dziale w kolejności malejącej. 

Pytanie 6. Poprzednie wynagrodzenie w każdym dziale.

Aby obliczyć poprzednią pensję, używamy funkcji LAG.

Funkcja opóźnienia zwraca wartość wyrażenia w danym przesunięciu przed bieżącym wierszem w partycji okna. Składnia funkcji opóźnienia jest następująca: - lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Podejście do kodu PySpark

  • Parametr window.partitionBy(col('deptno')) określa partycję okna. Oznacza to, że funkcja okna działa oddzielnie dla każdego działu.
  • Następnie OrderBy(col('sal').desc()) określa kolejność wynagrodzeń i porządkuje wynagrodzenia w każdym dziale w kolejności malejącej.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') tworzy nową kolumnę o nazwie prev_val w ramce danych 'prev_sal_df'.
  • Dla każdego wiersza kolumna ta zawiera wartość kolumny „sal” z poprzedniego wiersza w obrębie okna zdefiniowanego przez windowSpec.
  • Parametr offset=1 wskazuje, że poprzedni wiersz powinien znajdować się o jeden wiersz przed bieżącym wierszem, a parametr default=0 określa wartość domyślną dla pierwszego wiersza w każdej partycji (ponieważ nie ma poprzedniego wiersza dla pierwszego wiersza).
  • Na koniec prev_sal_df.show() wyświetla wynikową ramkę danych.

Wyjście:

Dane wyjściowe reprezentują poprzednie wynagrodzenie każdego pracownika w każdym dziale, w oparciu o uporządkowanie wynagrodzeń w kolejności malejącej.

Pytanie 7. Pierwsza pensja w każdym dziale i porównanie z każdym członkiem w każdym dziale.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Podejście do kodu PySpark

  • Najpierw utwórz obiekt WindowSpec, który dzieli dane według numeru działu (deptno) i porządkuje je według wynagrodzenia (sal) w kolejności malejącej.
  • Następnie stosuje funkcję analityczną First() do kolumny „sal” w oknie zdefiniowanym przez windowSpec. Ta funkcja zwraca pierwszą wartość kolumny „sal” w obrębie każdej partycji (tj. każdej grupy deptno) uporządkowanej malejąco „sal”. Wynikowa kolumna ma nową nazwę „first_val”.
  • Teraz przypisuje wynikową ramkę DataFrame, która zawiera wybrane kolumny i nową kolumnę „first_val”, która pokazuje pierwszą najwyższą pensję w każdym dziale na podstawie malejącej kolejności wartości wynagrodzeń, do nowej zmiennej o nazwie „first_value_df”.

Wyjście:

Dane wyjściowe pokazują pierwszą najwyższą pensję dla każdego działu w ramce DataFrame pracownika.

Wnioski

W tym artykule dowiemy się o funkcjach okna. Spark SQL ma trzy rodzaje funkcji okna: funkcje rankingowe, funkcje agregujące i funkcje wartości. Korzystając z tej funkcji, pracowaliśmy nad zbiorem danych, aby znaleźć ważne i cenne spostrzeżenia. Funkcje okna Spark oferują potężne narzędzia do analizy danych, takie jak ranking, analiza i obliczenia wartości. Niezależnie od tego, czy analizujesz statystyki wynagrodzeń według działów, czy wykorzystujesz praktyczne przykłady z PySpark i SQL, funkcje te zapewniają niezbędne narzędzia do efektywnego przetwarzania i analizy danych w Spark.

Na wynos

  • Poznaliśmy funkcje okna i pracowaliśmy z nimi przy użyciu Spark SQL i PySpark DataFrame API.
  • Używamy funkcji takich jak rank, gęsty_rank, numer_wiersza, opóźnienie, lead, groupBy, PartitionBy i innych funkcji, aby zapewnić odpowiednią analizę.
  • Zobaczyliśmy także szczegółowe rozwiązania problemu krok po kroku i przeanalizowaliśmy wyniki na końcu każdego opisu problemu.

To studium przypadku pomoże Ci lepiej zrozumieć funkcje PySpark. Jeśli masz jakieś opinie lub pytania, skomentuj poniżej. Połącz się ze mną LinkedIn do dalszej dyskusji. Ucz się!!!

Media pokazane w tym artykule nie są własnością Analytics Vidhya i są wykorzystywane według uznania Autora.

spot_img

Najnowsza inteligencja

spot_img