Zephyrnet-Logo

Arbeiten mit Fensterfunktionen in PySpark

Datum:

Einleitung

Das Erlernen von Fensterfunktionen in PySpark kann eine Herausforderung sein, aber die Mühe lohnt sich. Fensterfunktionen sind ein leistungsstarkes Tool zum Analysieren von Daten und können Ihnen dabei helfen, Erkenntnisse zu gewinnen, die Sie sonst vielleicht nicht gesehen hätten. Durch das Verständnis, wie man Fensterfunktionen in Spark verwendet; Du kannst deine nehmen Datenanalyse Bringen Sie Ihre Fähigkeiten auf die nächste Ebene und treffen Sie fundiertere Entscheidungen. Egal, ob Sie mit Groß oder Klein arbeiten DatensätzeWenn Sie Fensterfunktionen in Spark erlernen, können Sie Daten auf neue und aufregende Weise bearbeiten und analysieren.

Fensterfunktionen in PySpark

In diesem Blog werden wir zunächst das Konzept der Fensterfunktionen verstehen und dann diskutieren, wie man sie mit Spark SQL und der PySpark DataFrame API verwendet. Damit Sie am Ende dieses Artikels verstehen, wie Sie Fensterfunktionen mit echten Datensätzen nutzen und wichtige Erkenntnisse für Ihr Unternehmen gewinnen.

Lernziele

  • Verstehen Sie das Konzept der Fensterfunktionen.
  • Arbeiten mit Fensterfunktionen mithilfe von Datensätzen.
  • Ermitteln Sie die Erkenntnisse mithilfe der Fensterfunktionen.
  • Verwenden Sie Spark SQL und die DataFrame-API, um mit Fensterfunktionen zu arbeiten.

Dieser Artikel wurde als Teil des veröffentlicht Data Science-Blogathon.

Inhaltsverzeichnis

Was sind Fensterfunktionen?

Fensterfunktionen helfen bei der Analyse von Daten innerhalb einer Gruppe von Zeilen, die miteinander in Beziehung stehen. Sie ermöglichen es Benutzern, anhand bestimmter Partitionierungs- und Sortierkriterien komplexe Transformationen an den miteinander verbundenen Zeilen eines Datenrahmens oder Datensatzes durchzuführen.

Fensterfunktionen arbeiten auf einer bestimmten Partition eines Datenrahmens oder Datensatzes, die durch eine Reihe von Partitionierungsspalten definiert ist. Der SORTIEREN NACH Die Klausel partitioniert die Daten in einer Fensterfunktion, um sie in einer bestimmten Reihenfolge anzuordnen. Fensterfunktionen führen dann Berechnungen für ein gleitendes Zeilenfenster durch, das die aktuelle Zeile und eine Teilmenge der vorhergehenden „und“/„oder“-folgenden Zeilen umfasst, wie im Fensterrahmen angegeben.

Arbeiten mit Fensterfunktionen in PySpark

Zu den gängigen Beispielen für Fensterfunktionen gehören die Berechnung gleitender Durchschnitte sowie das Ordnen oder Sortieren von Zeilen basierend auf einer bestimmten Spalte oder Gruppe Spalten, Berechnen laufender Summen und Ermitteln des ersten oder letzten Werts in einer Gruppe von Zeilen. Mit den leistungsstarken Fensterfunktionen von Spark können Benutzer komplexe Analysen und Aggregationen durchführen große Datensätze mit relativer Leichtigkeit, was es zu einem beliebten Werkzeug für große Unternehmen macht Datenverarbeitung und Analytik.

"

Fensterfunktionen in SQL

Spark SQL unterstützt drei Arten von Fensterfunktionen:

  • Ranking-Funktionen:- Diese Funktionen weisen jeder Zeile innerhalb einer Partition der Ergebnismenge einen Rang zu. Die Funktion ROW_NUMBER() gibt beispielsweise jeder Zeile innerhalb der Partition eine eindeutige fortlaufende Nummer.
  • Analysefunktionen:- Diese Funktionen berechnen Aggregatwerte über ein Zeilenfenster. Beispielsweise berechnet die Funktion SUM() die Summe einer Spalte über ein Zeilenfenster.
  • Wertfunktionen:- Diese Funktionen berechnen einen Analysewert für jede Zeile in einer Partition, basierend auf den Werten anderer Zeilen in derselben Partition. Beispielsweise gibt die Funktion LAG() den Wert einer Spalte aus der vorherigen Zeile in der Partition zurück.

DataFrame-Erstellung

Wir werden einen Beispieldatenrahmen erstellen, damit wir praktisch mit verschiedenen Fensterfunktionen arbeiten können. Außerdem werden wir versuchen, mit Hilfe dieser Daten und Fensterfunktionen einige Fragen zu beantworten.

Der Datenrahmen enthält Mitarbeiterdetails wie Name, Bezeichnung, Mitarbeiternummer, Einstellungsdatum, Gehalt usw. Insgesamt haben wir 8 Spalten, die wie folgt lauten:

  • 'empno': Diese Spalte enthält die Nummer des Mitarbeiters.
  • 'ename': Diese Spalte enthält Mitarbeiternamen.
  • „Job“: Diese Spalte enthält Informationen über die Berufsbezeichnungen der Mitarbeiter.
  • 'hiredate': Diese Spalte zeigt das Einstellungsdatum des Mitarbeiters.
  • 'sal': Gehaltsdetails sind in dieser Spalte enthalten.
  • „comm“: Diese Spalte enthält ggf. Einzelheiten zu den Mitarbeiterprovisionen.
  • 'deptno': In dieser Spalte steht die Abteilungsnummer, zu der der Mitarbeiter gehört.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Jetzt überprüfen wir das Schema:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Erstellen Sie eine temporäre Ansicht des DataFrame „emp_df“ mit dem Namen „emp“. Es ermöglicht uns, den DataFrame mithilfe der SQL-Syntax in Spark SQL abzufragen, als wäre er eine Tabelle. Die temporäre Ansicht ist nur für die Dauer der Spark-Sitzung gültig.

emp_df.createOrReplaceTempView("emp")

Lösen von Problemstellungen mithilfe von Fensterfunktionen

Hier werden wir mehrere Problemstellungen mithilfe von Windows-Funktionen lösen:

Q1. Ordnen Sie das Gehalt innerhalb jeder Abteilung.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Ansatz für PySpark-Code

  • Die Window-Funktion partitioniert die Daten nach Abteilungsnummer mithilfe von partitionBy(col('deptno')) und ordnet die Daten innerhalb jeder Partition dann mithilfe von orderBy(col('sal').desc()) nach Gehalt in absteigender Reihenfolge. Die Variable windowSpec enthält die endgültige Fensterspezifikation.
  • „emp_df“ ist der Datenrahmen, der Mitarbeiterdaten enthält, einschließlich der Spalten für empno, ename, job, deptno und sal.
  • Die Rangfunktion wird mithilfe von „F.rank().over(windowSpec)“ in der SELECT-Anweisung auf die Gehaltsspalte angewendet. Die resultierende Spalte hat den Aliasnamen „Rang“.
  • Es wird ein Datenrahmen namens „ranking_result_df“ erstellt, der empno, ename, job, deptno und Gehalt enthält. Außerdem gibt es eine neue Spalte „Rang“, die den Rang des Gehalts des Mitarbeiters innerhalb seiner Abteilung darstellt.

Ausgang:

Das Ergebnis hat einen Gehaltsrang in jeder Abteilung.

Q2. Dichtes Ranking der Gehälter innerhalb der einzelnen Abteilungen.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Ansatz für PySpark-Code

  • Erstellen Sie zunächst eine Fensterspezifikation mit der Window-Funktion, die den DataFrame „emp_df“ nach deptno partitioniert und ihn nach absteigender Reihenfolge in der Spalte „sal“ sortiert.
  • Anschließend wird die Funktion „dense_rank()“ auf die Fensterspezifikation angewendet, die jeder Zeile innerhalb jeder Partition basierend auf ihrer Sortierreihenfolge einen dichten Rang zuweist.
  • Schließlich wird ein neuer DataFrame namens „dense_ranking_df“ erstellt, indem bestimmte Spalten aus emp_df ausgewählt werden (z. B. „empno“, „ename“, „job“, „deptno“ und „sal“) und eine neue Spalte „dense_rank“ hinzugefügt wird enthält die von der Fensterfunktion berechneten dichten Rangfolgewerte.
  • Zuletzt zeigen Sie den resultierenden DataFrame im Tabellenformat an.

Ausgang:

Das Ergebnis hat einen gehaltsmäßig dichten Rang.

Q3. Nummerieren Sie die Zeile innerhalb jeder Abteilung.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Ansatz für PySpark-Code

  • Die erste Zeile definiert eine Fensterspezifikation für die Berechnung mithilfe der Funktionen Window.partitionBy() und Window.orderBy(). Dieses Fenster ist durch die Spalte „deptno“ unterteilt und in der Spalte „sal“ in absteigender Reihenfolge sortiert.
  • Die zweite Zeile erstellt einen neuen DataFrame namens „row_num_df“, eine Projektion von „emp_df“ mit einer zusätzlichen Spalte namens „row_num“, die die Details zu den Zeilennummern enthält.
  • Die Funktion show() zeigt den resultierenden DataFrame an, der die Spalten „emno“, „ename“, „job“, „deptno“, „sal“ und „row_num“ jedes Mitarbeiters anzeigt.

Ausgang:

Die Ausgabe enthält die Zeilennummer jedes Mitarbeiters innerhalb seiner Abteilung basierend auf seinem Gehalt.

Q4. Laufende Gesamtgehaltssumme in jeder Abteilung.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Ansatz für PySpark-Code

  • Zunächst wird eine Fensterspezifikation mithilfe der Methoden „Window.partitionBy()“ und „Window.orderBy()“ definiert. Die Methode „partitionBy()“ partitioniert die Daten nach der Spalte „deptno“, während die Methode „orderBy()“ die Daten nach der Spalte „sal“ in absteigender Reihenfolge sortiert.
  • Als nächstes wird die Funktion „sum()“ auf die Spalte „sal“ angewendet, wobei die Methode „over()“ verwendet wird, um die laufende Summe der Gehälter innerhalb jeder Abteilung zu berechnen. Das Ergebnis wird in einem neuen DataFrame namens „running_sum_sal_df“ angezeigt, der die Spalten „empno“, „ename“, „job“, „deptno“, „sal“ und „running_total“ enthält.
  • Abschließend wird die Methode „show()“ im DataFrame „running_sum_sal_df“ aufgerufen, um die Ausgabe der Abfrage anzuzeigen. Der resultierende DataFrame zeigt die laufende Summe der Gehälter jedes Mitarbeiters und andere Details wie Name, Abteilungsnummer und Job.

Ausgang:

Die Ausgabe enthält eine laufende Summe der Gehaltsdaten jeder Abteilung.

F5: Das nächste Gehalt innerhalb jeder Abteilung.

Um das nächste Gehalt innerhalb jeder Abteilung zu finden, verwenden wir die LEAD-Funktion. 

Die Fensterfunktion „lead()“ hilft dabei, den Wert des Ausdrucks in der nächsten Zeile der Fensterpartition abzurufen. Für jede Eingabespalte wird eine Spalte zurückgegeben, wobei jede Spalte den Wert der Eingabespalte für die Offsetzeile über der aktuellen Zeile innerhalb der Fensterpartition enthält. Die Syntax für die Lead-Funktion lautet: Lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Ansatz für PySpark-Code

  • Erstens hilft die Fensterfunktion dabei, die Zeilen des DataFrame nach Abteilungsnummer (deptno) zu unterteilen und die Gehälter innerhalb jeder Partition in absteigender Reihenfolge zu ordnen.
  • Die Funktion „lead()“ wird dann auf die geordnete Spalte „sal“ in jeder Partition angewendet, um das Gehalt des folgenden Mitarbeiters zurückzugeben (mit einem Offset von 1). Der Standardwert ist 0, falls es keinen nächsten Mitarbeiter gibt.
  • Der resultierende DataFrame „next_salary_df“ enthält Spalten für die Mitarbeiternummer (empno), den Namen (ename), die Berufsbezeichnung (job), die Abteilungsnummer (deptno), das aktuelle Gehalt (sal) und das nächste Gehalt (next_val).

Ausgang:

Die Ausgabe enthält das Gehalt des nächsten Mitarbeiters in der Abteilung in absteigender Gehaltsreihenfolge. 

F6. Bisheriges Gehalt in jeder Abteilung.

Zur Berechnung des bisherigen Gehalts nutzen wir die LAG-Funktion.

Die Verzögerungsfunktion gibt den Wert eines Ausdrucks an einem bestimmten Offset vor der aktuellen Zeile innerhalb der Fensterpartition zurück. Die Syntax der Lag-Funktion lautet: lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Ansatz für PySpark-Code

  • window.partitionBy(col('deptno')) gibt die Fensterpartition an. Das bedeutet, dass die Fensterfunktion für jede Abteilung separat funktioniert.
  • Dann gibt orderBy(col('sal').desc()) die Reihenfolge des Gehalts an und ordnet die Gehälter innerhalb jeder Abteilung in absteigender Reihenfolge.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') erstellt eine neue Spalte namens prev_val im DataFrame 'prev_sal_df'.
  • Für jede Zeile enthält diese Spalte den Wert der Spalte „sal“ aus der vorherigen Zeile innerhalb des durch die windowSpec definierten Fensters.
  • Der Parameter offset=1 gibt an, dass die vorherige Zeile eine Zeile vor der aktuellen Zeile liegen soll, und default=0 gibt den Standardwert für die erste Zeile in jeder Partition an (da es für die erste Zeile keine vorherige Zeile gibt).
  • Abschließend zeigt prev_sal_df.show() den resultierenden DataFrame an.

Ausgang:

Die Ausgabe stellt das bisherige Gehalt für jeden Mitarbeiter in jeder Abteilung dar, basierend auf der Reihenfolge der Gehälter in absteigender Reihenfolge.

F7. Erstes Gehalt innerhalb jeder Abteilung und Vergleich mit jedem Mitglied innerhalb jeder Abteilung.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Ansatz für PySpark-Code

  • Erstellen Sie zunächst ein WindowSpec-Objekt, das die Daten nach Abteilungsnummer (deptno) partitioniert und sie in absteigender Reihenfolge nach Gehalt (sal) sortiert.
  • Wendet dann die analytische Funktion first() auf die Spalte „sal“ über dem durch windowSpec definierten Fenster an. Diese Funktion gibt den ersten Wert der Spalte „sal“ innerhalb jeder Partition (dh jeder Abteilungsgruppe) zurück, geordnet nach absteigendem „sal“. Die resultierende Spalte hat einen neuen Namen: „first_val“.
  • Weist nun den resultierenden DataFrame, der die ausgewählten Spalten und eine neue Spalte „first_val“ enthält, die das erste höchste Gehalt für jede Abteilung basierend auf der absteigenden Reihenfolge der Gehaltswerte anzeigt, einer neuen Variablen namens „first_value_df“ zu.

Ausgang:

Die Ausgabe zeigt das erste höchste Gehalt für jede Abteilung in einem Mitarbeiter-DataFrame.

Zusammenfassung

In diesem Artikel lernen wir Fensterfunktionen kennen. Spark SQL verfügt über drei Arten von Fensterfunktionen: Ranking-Funktionen, Aggregatfunktionen und Wertfunktionen. Mit dieser Funktion haben wir an einem Datensatz gearbeitet, um einige wichtige und wertvolle Erkenntnisse zu gewinnen. Spark Window Functions bietet leistungsstarke Datenanalysetools wie Ranking, Analyse und Wertberechnungen. Ganz gleich, ob Sie Gehaltseinblicke nach Abteilungen analysieren oder praktische Beispiele mit PySpark und SQL anwenden, diese Funktionen stellen wesentliche Werkzeuge für eine effektive Datenverarbeitung und -analyse in Spark bereit.

Key Take Away

  • Wir lernten die Fensterfunktionen kennen und arbeiteten mit ihnen mithilfe der Spark SQL- und PySpark DataFrame-API.
  • Wir verwenden Funktionen wie Rank, Dense_Rank, Row_Number, Lag, Lead, GroupBy, PartitionBy und andere Funktionen, um eine ordnungsgemäße Analyse bereitzustellen.
  • Wir haben auch die detaillierten Schritt-für-Schritt-Lösungen für das Problem gesehen und die Ausgabe am Ende jeder Problemstellung analysiert.

Diese Fallstudie hilft Ihnen, die PySpark-Funktionen besser zu verstehen. Wenn Sie Meinungen oder Fragen haben, kommentieren Sie diese unten. Verbinde dich mit mir  LinkedIn zur weiteren Diskussion. Lerne weiter!!!

Die in diesem Artikel gezeigten Medien sind nicht Eigentum von Analytics Vidhya und werden nach Ermessen des Autors verwendet.

spot_img

Neueste Intelligenz

spot_img