Λογότυπο Zephyrnet

Εργασία με Λειτουργίες παραθύρου στο PySpark

Ημερομηνία:

Εισαγωγή

Η εκμάθηση σχετικά με τις Λειτουργίες παραθύρου στο PySpark μπορεί να είναι δύσκολη, αλλά αξίζει τον κόπο. Οι Λειτουργίες παραθύρου είναι ένα ισχυρό εργαλείο για την ανάλυση δεδομένων και μπορούν να σας βοηθήσουν να αποκτήσετε πληροφορίες που μπορεί να μην είχατε δει διαφορετικά. Κατανοώντας πώς να χρησιμοποιείτε τις Λειτουργίες παραθύρου στο Spark. μπορείτε να πάρετε το δικό σας ανάλυση δεδομένων δεξιότητες στο επόμενο επίπεδο και λήψη πιο τεκμηριωμένων αποφάσεων. Είτε εργάζεστε με μεγάλα είτε με μικρά σύνολα δεδομένων, η εκμάθηση των Λειτουργιών παραθύρου στο Spark θα σας επιτρέψει να χειριστείτε και να αναλύσετε δεδομένα με νέους και συναρπαστικούς τρόπους.

Λειτουργίες παραθύρου στο PySpark

Σε αυτό το ιστολόγιο, θα κατανοήσουμε πρώτα την έννοια των συναρτήσεων παραθύρου και στη συνέχεια θα συζητήσουμε πώς να τις χρησιμοποιήσουμε με το Spark SQL και το PySpark DataFrame API. Έτσι, μέχρι το τέλος αυτού του άρθρου, θα κατανοήσετε πώς να χρησιμοποιείτε τις συναρτήσεις παραθύρου με πραγματικά σύνολα δεδομένων και να λαμβάνετε βασικές πληροφορίες για τις επιχειρήσεις.

Στόχοι μάθησης

  • Κατανοήστε την έννοια των συναρτήσεων παραθύρου.
  • Εργασία με συναρτήσεις παραθύρου χρησιμοποιώντας σύνολα δεδομένων.
  • Μάθετε τις πληροφορίες χρησιμοποιώντας τις λειτουργίες παραθύρου.
  • Χρησιμοποιήστε το Spark SQL και το DataFrame API για να εργαστείτε με συναρτήσεις παραθύρου.

Αυτό το άρθρο δημοσιεύθηκε ως μέρος του Data Science Blogathon.

Πίνακας περιεχομένων

Τι είναι οι Λειτουργίες παραθύρου;

Οι συναρτήσεις παραθύρου βοηθούν στην ανάλυση δεδομένων μέσα σε μια ομάδα σειρών που σχετίζονται μεταξύ τους. Επιτρέπουν στους χρήστες να εκτελούν σύνθετους μετασχηματισμούς στις σειρές ενός πλαισίου δεδομένων ή ενός συνόλου δεδομένων που σχετίζονται μεταξύ τους με βάση ορισμένα κριτήρια κατάτμησης και ταξινόμησης.

Οι συναρτήσεις παραθύρου λειτουργούν σε ένα συγκεκριμένο διαμέρισμα ενός πλαισίου δεδομένων ή ενός συνόλου δεδομένων που ορίζεται από ένα σύνολο στηλών κατάτμησης. ο ΤΑΞΙΝΟΜΗΣΗ ΚΑΤΑ Ο όρος χωρίζει τα δεδομένα σε μια συνάρτηση παραθύρου για να τα τακτοποιήσει με μια συγκεκριμένη σειρά. Στη συνέχεια, οι συναρτήσεις παραθύρου εκτελούν υπολογισμούς σε ένα συρόμενο παράθυρο σειρών που περιλαμβάνει την τρέχουσα σειρά και ένα υποσύνολο των προηγούμενων σειρών "και"/"ή" που ακολουθούν, όπως καθορίζεται στο πλαίσιο παραθύρου.

Εργασία με Λειτουργίες παραθύρου στο PySpark

Μερικά κοινά παραδείγματα συναρτήσεων παραθύρου περιλαμβάνουν τον υπολογισμό των κινητών μέσων, την κατάταξη ή την ταξινόμηση σειρών με βάση μια συγκεκριμένη στήλη ή ομάδα στήλες, υπολογίζοντας τα τρέχοντα σύνολα και βρίσκοντας την πρώτη ή την τελευταία τιμή σε μια ομάδα σειρών. Με τις ισχυρές λειτουργίες παραθύρου του Spark, οι χρήστες μπορούν να εκτελούν σύνθετες αναλύσεις και συναθροίσεις μεγάλα σύνολα δεδομένων με σχετική ευκολία, καθιστώντας το δημοφιλές εργαλείο για μεγάλα επεξεργασία δεδομένων και την ανάλυση.

"

Λειτουργίες παραθύρου σε SQL

Το Spark SQL υποστηρίζει τρία είδη συναρτήσεων παραθύρου:

  • Λειτουργίες κατάταξης: - Αυτές οι συναρτήσεις εκχωρούν μια κατάταξη σε κάθε σειρά μέσα σε ένα διαμέρισμα του συνόλου αποτελεσμάτων. Για παράδειγμα, η συνάρτηση ROW_NUMBER() δίνει έναν μοναδικό διαδοχικό αριθμό σε κάθε σειρά μέσα στο διαμέρισμα.
  • Λειτουργίες Analytics: - Αυτές οι συναρτήσεις υπολογίζουν συγκεντρωτικές τιμές σε ένα παράθυρο σειρών. Για παράδειγμα, η συνάρτηση SUM() υπολογίζει το άθροισμα μιας στήλης σε ένα παράθυρο γραμμών.
  • Λειτουργίες αξίας: - Αυτές οι συναρτήσεις υπολογίζουν μια αναλυτική τιμή για κάθε γραμμή σε ένα διαμέρισμα, με βάση τις τιμές άλλων σειρών στο ίδιο διαμέρισμα. Για παράδειγμα, η συνάρτηση LAG() επιστρέφει την τιμή μιας στήλης από την προηγούμενη σειρά στο διαμέρισμα.

Δημιουργία DataFrame

Θα δημιουργήσουμε ένα δείγμα πλαισίου δεδομένων, ώστε να μπορούμε πρακτικά να εργαστούμε με διαφορετικές λειτουργίες παραθύρου. Επίσης, θα προσπαθήσουμε να απαντήσουμε σε ορισμένες ερωτήσεις με τη βοήθεια αυτών των δεδομένων και των συναρτήσεων παραθύρου.

Το πλαίσιο δεδομένων έχει στοιχεία εργαζομένων, όπως Όνομα, Διεύθυνση, Αριθμός Υπαλλήλου, Ημερομηνία πρόσληψης, Μισθός κ.λπ. Συνολικά έχουμε 8 στήλες που είναι οι εξής:

  • 'empno': Αυτή η στήλη περιέχει τον αριθμό του υπαλλήλου.
  • 'ename': Αυτή η στήλη έχει ονόματα υπαλλήλων.
  • 'job': Αυτή η στήλη περιέχει πληροφορίες σχετικά με τους τίτλους εργασίας των εργαζομένων.
  • 'Hiredate': Αυτή η στήλη δείχνει την ημερομηνία πρόσληψης του υπαλλήλου.
  • 'sal': Τα στοιχεία μισθού περιλαμβάνονται σε αυτήν τη στήλη.
  • 'comm': Αυτή η στήλη περιέχει στοιχεία προμήθειας υπαλλήλων, εάν υπάρχουν.
  • 'deptno': Ο αριθμός τμήματος στο οποίο ανήκει ο υπάλληλος βρίσκεται σε αυτήν τη στήλη.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Τώρα θα ελέγξουμε το σχήμα:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Δημιουργήστε μια προσωρινή προβολή του DataFrame 'emp_df' με το όνομα "emp". Μας επιτρέπει να κάνουμε ερώτημα στο DataFrame χρησιμοποιώντας τη σύνταξη SQL στο Spark SQL σαν να ήταν πίνακας. Η προσωρινή προβολή ισχύει μόνο για τη διάρκεια της περιόδου λειτουργίας Spark.

emp_df.createOrReplaceTempView("emp")

Επίλυση δηλώσεων προβλημάτων με χρήση συναρτήσεων παραθύρου

Εδώ θα λύσουμε πολλές δηλώσεις προβλημάτων χρησιμοποιώντας τις συναρτήσεις των Windows:

Q1. Κατάταξη του μισθού σε κάθε τμήμα.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Προσέγγιση για τον κώδικα PySpark

  • Η συνάρτηση Παράθυρο χωρίζει τα δεδομένα κατά αριθμό τμήματος χρησιμοποιώντας partitionBy(col('deptno')) και στη συνέχεια ταξινομεί τα δεδομένα σε κάθε διαμέρισμα ανά μισθό σε φθίνουσα σειρά χρησιμοποιώντας orderBy(col('sal').desc()). Η μεταβλητή windowSpec περιέχει την τελική προδιαγραφή παραθύρου.
  • Το 'emp_df' είναι το πλαίσιο δεδομένων που περιέχει δεδομένα εργαζομένων, συμπεριλαμβανομένων στηλών για empno, ename, job, deptno και sal.
  • Η συνάρτηση κατάταξης εφαρμόζεται στη στήλη μισθού χρησιμοποιώντας το 'F.rank().over(windowSpec)' μέσα στη δήλωση Select. Η στήλη που προκύπτει έχει ψευδώνυμο ως «κατάταξη».
  • Θα δημιουργήσει ένα πλαίσιο δεδομένων, το 'ranking_result_df', το οποίο περιλαμβάνει empno, ename, job, deptno και μισθό. Έχει επίσης μια νέα στήλη, «κατάταξη», που αντιπροσωπεύει την κατάταξη του μισθού του υπαλλήλου στο τμήμα τους.

Παραγωγή:

Το αποτέλεσμα έχει μισθολογική κατάταξη σε κάθε τμήμα.

Ε2. Πυκνή κατάταξη του μισθού σε κάθε τμήμα.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Προσέγγιση για τον κώδικα PySpark

  • Αρχικά, δημιουργήστε μια προδιαγραφή παραθύρου χρησιμοποιώντας τη συνάρτηση Window, η οποία χωρίζει το DataFrame 'emp_df' κατά deptno και το ταξινομεί κατεβαίνοντας τη στήλη 'sal'.
  • Στη συνέχεια, η συνάρτηση dense_rank() εφαρμόζεται πάνω από την προδιαγραφή παραθύρου, η οποία εκχωρεί μια πυκνή κατάταξη σε κάθε σειρά μέσα σε κάθε διαμέρισμα με βάση τη σειρά ταξινόμησης του.
  • Τέλος, δημιουργείται ένα νέο DataFrame που ονομάζεται "dense_ranking_df" επιλέγοντας συγκεκριμένες στήλες από το emp_df (δηλαδή, "empno", "ename", "job", "deptno" και "sal") και προσθέτοντας μια νέα στήλη "dense_rank" που περιέχει τις πυκνές τιμές κατάταξης που υπολογίζονται από τη συνάρτηση παραθύρου.
  • Τέλος, εμφανίστε το DataFrame που προκύπτει σε μορφή πίνακα.

Παραγωγή:

Το αποτέλεσμα έχει μια μισθολογική πυκνή κατάταξη.

Ε3. Αριθμήστε τη σειρά σε κάθε τμήμα.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Προσέγγιση για τον κώδικα PySpark

  • Η πρώτη γραμμή ορίζει μια προδιαγραφή παραθύρου για τον υπολογισμό χρησιμοποιώντας τις συναρτήσεις Window.partitionBy() και Window.orderBy(). Αυτό το παράθυρο διαιρείται από τη στήλη deptno και ταξινομείται από τη στήλη sal με φθίνουσα σειρά.
  • Η δεύτερη γραμμή δημιουργεί ένα νέο DataFrame που ονομάζεται 'row_num_df', μια προβολή του 'emp_df' με μια πρόσθετη στήλη που ονομάζεται 'row_num' και περιέχει τις λεπτομέρειες των αριθμών σειρών.
  • Η συνάρτηση show() εμφανίζει το DataFrame που προκύπτει, το οποίο δείχνει τις στήλες empno, ename, job, deptno, sal και row_num κάθε υπαλλήλου.

Παραγωγή:

Η παραγωγή θα έχει τον αριθμό σειράς κάθε υπαλλήλου στο τμήμα του με βάση τον μισθό του.

Q4. Συνολικό ποσό μισθού σε κάθε τμήμα.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Προσέγγιση για τον κώδικα PySpark

  • Αρχικά, ορίζεται μια προδιαγραφή παραθύρου χρησιμοποιώντας τις μεθόδους "Window.partitionBy()" και "Window.orderBy()". Η μέθοδος "partitionBy()" διαμερίζει τα δεδομένα με τη στήλη "deptno", ενώ η μέθοδος "orderBy()" ταξινομεί τα δεδομένα από τη στήλη "sal" με φθίνουσα σειρά.
  • Στη συνέχεια, η συνάρτηση "sum()" εφαρμόζεται στη στήλη "sal" χρησιμοποιώντας τη μέθοδο "over()" για τον υπολογισμό του τρέχοντος συνόλου των μισθών σε κάθε τμήμα. Το αποτέλεσμα θα είναι σε ένα νέο DataFrame που ονομάζεται "running_sum_sal_df", το οποίο περιέχει τις στήλες 'empno', 'ename', 'job', 'deptno', 'sal' και 'running_total'.
  • Τέλος, η μέθοδος "show()" καλείται στο DataFrame "running_sum_sal_df" για να εμφανίσει την έξοδο του ερωτήματος. Το DataFrame που προκύπτει δείχνει το τρέχον σύνολο των μισθών κάθε υπαλλήλου και άλλες λεπτομέρειες όπως όνομα, αριθμός τμήματος και θέση εργασίας.

Παραγωγή:

Η έξοδος θα έχει ένα τρέχον σύνολο των δεδομένων μισθού κάθε τμήματος.

Ε5: Ο επόμενος μισθός σε κάθε τμήμα.

Για να βρούμε τον επόμενο μισθό σε κάθε τμήμα χρησιμοποιούμε τη λειτουργία LEAD. 

Η συνάρτηση παραθύρου lead() βοηθά να λάβουμε την τιμή της έκφρασης στην επόμενη σειρά του διαμερίσματος παραθύρου. Επιστρέφει μια στήλη για κάθε στήλη εισόδου, όπου κάθε στήλη θα περιέχει την τιμή της στήλης εισόδου για τη γραμμή μετατόπισης πάνω από την τρέχουσα σειρά εντός του διαμερίσματος παραθύρου. Η σύνταξη για τη συνάρτηση lead είναι: - lead(col, offset=1, default=none).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Προσέγγιση για τον κώδικα PySpark

  • Πρώτον, η συνάρτηση παραθύρου βοηθά στη διαμέριση των σειρών του DataFrame κατά αριθμό τμήματος (deptno) και στη σειρά των μισθών με φθίνουσα σειρά σε κάθε διαμέρισμα.
  • Η συνάρτηση lead() εφαρμόζεται στη συνέχεια στη διατεταγμένη στήλη 'sal' μέσα σε κάθε διαμέρισμα για να επιστρέψει το μισθό του επόμενου υπαλλήλου (με μετατόπιση 1) και η προεπιλεγμένη τιμή είναι 0 σε περίπτωση που δεν υπάρχει επόμενος υπάλληλος.
  • Το προκύπτον DataFrame 'next_salary_df' περιέχει στήλες για τον αριθμό υπαλλήλου (empno), το όνομα (ename), τον τίτλο εργασίας (δουλειά), τον αριθμό τμήματος (deptno), τον τρέχοντα μισθό (sal) και τον επόμενο μισθό (next_val).

Παραγωγή:

Η έξοδος περιέχει τον μισθό του επόμενου υπαλλήλου στο τμήμα με βάση τη σειρά φθίνουσας αμοιβής. 

Ε6. Προηγούμενος μισθός σε κάθε τμήμα.

Για τον υπολογισμό του προηγούμενου μισθού, χρησιμοποιούμε τη συνάρτηση ΟΤΔ.

Η συνάρτηση καθυστέρησης επιστρέφει την τιμή μιας έκφρασης σε μια δεδομένη μετατόπιση πριν από την τρέχουσα σειρά μέσα στο διαμέρισμα παραθύρου. Η σύνταξη της συνάρτησης καθυστέρησης είναι:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Προσέγγιση για τον κώδικα PySpark

  • Το window.partitionBy(col('deptno')) καθορίζει το διαμέρισμα παραθύρου. Σημαίνει ότι η λειτουργία παραθύρου λειτουργεί ξεχωριστά για κάθε τμήμα.
  • Στη συνέχεια, το orderBy(col('sal').desc()) καθορίζει τη σειρά του μισθού και θα ταξινομήσει τους μισθούς σε κάθε τμήμα με φθίνουσα σειρά.
  • Το F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') δημιουργεί μια νέα στήλη που ονομάζεται prev_val στο DataFrame 'prev_sal_df'.
  • Για κάθε γραμμή, αυτή η στήλη περιέχει την τιμή της στήλης 'sal' από την προηγούμενη σειρά μέσα στο παράθυρο που ορίζεται από το windowSpec.
  • Η παράμετρος offset=1 υποδεικνύει ότι η προηγούμενη σειρά πρέπει να είναι μία σειρά πριν από την τρέχουσα σειρά και η default=0 καθορίζει την προεπιλεγμένη τιμή για την πρώτη σειρά σε κάθε διαμέρισμα (καθώς δεν υπάρχει προηγούμενη σειρά για την πρώτη σειρά).
  • Τέλος, η prev_sal_df.show() εμφανίζει το DataFrame που προκύπτει.

Παραγωγή:

Η παραγωγή αντιπροσωπεύει τον προηγούμενο μισθό για κάθε εργαζόμενο σε κάθε τμήμα, με βάση την ταξινόμηση των μισθών σε φθίνουσα σειρά.

Ε7. Πρώτος μισθός σε κάθε τμήμα και σύγκριση με κάθε μέλος σε κάθε τμήμα.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Προσέγγιση για τον κώδικα PySpark

  • Αρχικά, δημιουργήστε ένα αντικείμενο WindowSpec που χωρίζει τα δεδομένα κατά αριθμό τμήματος (deptno) και τα ταξινομεί κατά μισθό (sal) με φθίνουσα σειρά.
  • Στη συνέχεια, εφαρμόζει την αναλυτική συνάρτηση first() στη στήλη 'sal' πάνω από το παράθυρο που ορίζεται από το windowSpec. Αυτή η συνάρτηση επιστρέφει την πρώτη τιμή της στήλης 'sal' σε κάθε διαμέρισμα (δηλαδή κάθε ομάδα deptno) ταξινομημένη με φθίνουσα σειρά 'sal'. Η στήλη που προκύπτει έχει ένα νέο όνομα, "first_val".
  • Τώρα εκχωρεί το DataFrame που προκύπτει, το οποίο περιέχει τις επιλεγμένες στήλες και μια νέα στήλη, "first_val", που δείχνει τον πρώτο υψηλότερο μισθό για κάθε τμήμα με βάση τη φθίνουσα σειρά των τιμών μισθού, σε μια νέα μεταβλητή που ονομάζεται "first_value_df".

Παραγωγή:

Η έξοδος δείχνει τον πρώτο υψηλότερο μισθό για κάθε τμήμα σε ένα DataFrame υπαλλήλου.

Συμπέρασμα

Σε αυτό το άρθρο, μαθαίνουμε για τις λειτουργίες παραθύρου. Το Spark SQL έχει τρία είδη συναρτήσεων παραθύρου: συναρτήσεις κατάταξης, συναρτήσεις συγκεντρωτικών συναρτήσεων και συναρτήσεις τιμής. Χρησιμοποιώντας αυτήν τη συνάρτηση, εργαστήκαμε σε ένα σύνολο δεδομένων για να βρούμε ορισμένες σημαντικές και πολύτιμες πληροφορίες. Το Spark Window Functions προσφέρει ισχυρά εργαλεία ανάλυσης δεδομένων όπως κατάταξη, αναλυτικά στοιχεία και υπολογισμούς τιμών. Είτε αναλύουν πληροφορίες μισθών ανά τμήμα είτε χρησιμοποιούν πρακτικά παραδείγματα με το PySpark & ​​SQL, αυτές οι λειτουργίες παρέχουν βασικά εργαλεία για αποτελεσματική επεξεργασία και ανάλυση δεδομένων στο Spark.

Βασικές τακτικές

  • Μάθαμε για τις λειτουργίες του παραθύρου και δουλέψαμε μαζί τους χρησιμοποιώντας το Spark SQL και το PySpark DataFrame API.
  • Χρησιμοποιούμε συναρτήσεις όπως rank, dense_rank, row_number, lag, lead, groupBy, partitionBy και άλλες συναρτήσεις για να παρέχουμε σωστή ανάλυση.
  • Είδαμε επίσης τις αναλυτικές, βήμα προς βήμα λύσεις στο πρόβλημα και αναλύσαμε τα αποτελέσματα στο τέλος κάθε δήλωσης προβλήματος.

Αυτή η μελέτη περίπτωσης σάς βοηθά να κατανοήσετε καλύτερα τις συναρτήσεις PySpark. Εάν έχετε οποιεσδήποτε απόψεις ή ερωτήσεις, σχολιάστε παρακάτω. Συνδεθείτε μαζί μου LinkedIn για περαιτέρω συζήτηση. Συνέχισε να μαθαίνεις!!!

Τα μέσα που εμφανίζονται σε αυτό το άρθρο δεν ανήκουν στο Analytics Vidhya και χρησιμοποιούνται κατά την κρίση του συγγραφέα.

spot_img

Τελευταία Νοημοσύνη

spot_img