Logo Zephyrnet

Bekerja dengan Fungsi Jendela di PySpark

Tanggal:

Pengantar

Mempelajari Fungsi Jendela di PySpark bisa jadi menantang, namun sepadan dengan usaha yang dilakukan. Fungsi Jendela adalah alat yang ampuh untuk menganalisis data dan dapat membantu Anda mendapatkan wawasan yang mungkin belum pernah Anda lihat sebelumnya. Dengan memahami cara menggunakan Fungsi Jendela di Spark; kamu bisa mengambil milikmu analisis data keterampilan ke tingkat berikutnya dan membuat keputusan yang lebih tepat. Baik Anda bekerja dengan skala besar atau kecil kumpulan data, mempelajari Fungsi Jendela di Spark akan memungkinkan Anda memanipulasi dan menganalisis data dengan cara baru dan menarik.

Fungsi Jendela di PySpark

Di blog ini, pertama-tama kita akan memahami konsep fungsi jendela dan kemudian membahas cara menggunakannya dengan Spark SQL dan PySpark DataFrame API. Sehingga di akhir artikel ini, Anda akan memahami cara menggunakan fungsi jendela dengan kumpulan data nyata dan mendapatkan wawasan penting untuk bisnis.

Tujuan Pembelajaran

  • Memahami konsep fungsi jendela.
  • Bekerja dengan fungsi jendela menggunakan kumpulan data.
  • Temukan wawasan menggunakan fungsi jendela.
  • Gunakan Spark SQL dan DataFrame API untuk bekerja dengan fungsi jendela.

Artikel ini diterbitkan sebagai bagian dari Blogathon Ilmu Data.

Daftar Isi

Apa itu Fungsi Jendela?

Fungsi jendela membantu menganalisis data dalam sekelompok baris yang terkait satu sama lain. Mereka memungkinkan pengguna untuk melakukan transformasi kompleks pada baris kerangka data atau kumpulan data yang terkait satu sama lain berdasarkan beberapa kriteria partisi dan pengurutan.

Fungsi jendela beroperasi pada partisi tertentu dari kerangka data atau kumpulan data yang ditentukan oleh sekumpulan kolom partisi. Itu DIPESAN OLEH klausa mempartisi data dalam fungsi jendela untuk mengaturnya dalam urutan tertentu. Fungsi jendela kemudian melakukan perhitungan pada jendela geser baris yang mencakup baris saat ini dan subset dari baris 'dan'/'atau' berikutnya, seperti yang ditentukan dalam bingkai jendela.

Bekerja dengan Fungsi Jendela di PySpark

Beberapa contoh umum fungsi jendela mencakup penghitungan rata-rata pergerakan, pemeringkatan, atau pengurutan baris berdasarkan kolom atau grup tertentu kolom, menghitung total berjalan, dan menemukan nilai pertama atau terakhir dalam sekelompok baris. Dengan fungsi jendela Spark yang canggih, pengguna dapat melakukan analisis dan agregasi yang kompleks dataset besar dengan relatif mudah, menjadikannya alat yang populer bagi kalangan besar pengolahan data dan analitik.

"

Fungsi Jendela di SQL

Spark SQL mendukung tiga jenis fungsi jendela:

  • Fungsi Pemeringkatan: - Fungsi-fungsi ini menetapkan peringkat untuk setiap baris dalam partisi kumpulan hasil. Misalnya, fungsi ROW_NUMBER() memberikan nomor urut unik untuk setiap baris dalam partisi.
  • Fungsi Analisis:- Fungsi-fungsi ini menghitung nilai agregat pada jendela baris. Misalnya, fungsi SUM() menghitung jumlah kolom pada jendela baris.
  • Fungsi Nilai: - Fungsi-fungsi ini menghitung nilai analitik untuk setiap baris dalam sebuah partisi, berdasarkan nilai baris lain dalam partisi yang sama. Misalnya, fungsi LAG() mengembalikan nilai kolom dari baris sebelumnya di partisi.

Pembuatan DataFrame

Kami akan membuat contoh kerangka data sehingga kami dapat bekerja secara praktis dengan fungsi jendela yang berbeda. Kami juga akan mencoba menjawab beberapa pertanyaan dengan bantuan data dan fungsi jendela ini.

Kerangka data memiliki detail karyawan seperti Nama, Jabatan, Nomor Karyawan, Tanggal Dipekerjakan, Gaji, dll. Total kami memiliki 8 kolom yaitu sebagai berikut:

  • 'empno' : Kolom ini berisi nomor pegawai.
  • 'ename': Kolom ini berisi nama karyawan.
  • 'pekerjaan': Kolom ini berisi informasi tentang jabatan karyawan.
  • 'hiredate': Kolom ini menunjukkan tanggal perekrutan karyawan.
  • 'sal': Rincian gaji terdapat di kolom ini.
  • 'comm': Kolom ini berisi rincian komisi karyawan, jika ada.
  • 'deptno': Nomor departemen tempat karyawan tersebut berada ada di kolom ini.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Sekarang kita akan memeriksa skemanya:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Buat tampilan sementara DataFrame 'emp_df' dengan nama “emp”. Ini memungkinkan kita untuk menanyakan DataFrame menggunakan sintaks SQL di Spark SQL seolah-olah itu adalah sebuah tabel. Tampilan sementara hanya valid selama durasi Sesi Spark.

emp_df.createOrReplaceTempView("emp")

Memecahkan Pernyataan Masalah Menggunakan Fungsi Jendela

Di sini kita akan menyelesaikan beberapa pernyataan masalah menggunakan fungsi windows:

Q1. Urutkan gaji dalam setiap departemen.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Pendekatan untuk Kode PySpark

  • Fungsi Window mempartisi data berdasarkan nomor departemen menggunakan partisiBy(col('deptno')) dan kemudian mengurutkan data dalam setiap partisi berdasarkan gaji dalam urutan menurun menggunakan orderBy(col('sal').desc()). Variabel windowSpec menyimpan spesifikasi jendela akhir.
  • 'emp_df' adalah dataframe yang berisi data pegawai, termasuk kolom empno, ename, job, deptno dan sal.
  • Fungsi peringkat diterapkan ke kolom gaji menggunakan 'F.rank().over(windowSpec)' dalam pernyataan pilih. Kolom yang dihasilkan memiliki nama alias 'peringkat'.
  • Ini akan membuat kerangka data, 'ranking_result_df', yang mencakup empno, ename, pekerjaan, departemen, dan gaji. Ini juga memiliki kolom baru, 'peringkat', yang mewakili peringkat gaji karyawan dalam departemennya.

Keluaran:

Hasilnya memiliki peringkat gaji di setiap departemen.

Q2. Padat memberi peringkat gaji di setiap departemen.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Pendekatan untuk Kode PySpark

  • Pertama, buat spesifikasi jendela menggunakan fungsi Window, yang mempartisi DataFrame 'emp_df' menurut deptno dan mengurutkannya dengan menurunkan kolom 'sal'.
  • Kemudian, fungsi dose_rank() diterapkan pada spesifikasi jendela, yang memberikan peringkat padat pada setiap baris dalam setiap partisi berdasarkan urutan pengurutannya.
  • Terakhir, DataFrame baru bernama 'dense_ranking_df' dibuat dengan memilih kolom tertentu dari emp_df (yaitu, 'empno', 'ename', 'job', 'deptno', dan 'sal') dan menambahkan kolom baru 'dense_rank' yang berisi nilai peringkat padat yang dihitung oleh fungsi jendela.
  • Terakhir, tampilkan DataFrame yang dihasilkan dalam format tabel.

Keluaran:

Hasilnya memiliki peringkat yang padat dari segi gaji.

Q3. Beri nomor pada baris dalam setiap departemen.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Pendekatan untuk kode PySpark

  • Baris pertama mendefinisikan spesifikasi jendela untuk perhitungan menggunakan fungsi Window.partitionBy() dan Window.orderBy(). Jendela ini dipartisi oleh kolom deptno dan diurutkan berdasarkan kolom sal dalam urutan menurun.
  • Baris kedua membuat DataFrame baru bernama 'row_num_df', proyeksi 'emp_df' dengan kolom tambahan bernama 'row_num' dan berisi detail nomor baris.
  • Fungsi show() menampilkan DataFrame yang dihasilkan, yang memperlihatkan kolom empno, ename, job, deptno, sal, dan row_num setiap karyawan.

Keluaran:

Outputnya akan memiliki nomor baris setiap karyawan dalam departemennya berdasarkan gaji mereka.

Q4. Menjalankan jumlah total gaji di setiap departemen.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Pendekatan untuk kode PySpark

  • Pertama, spesifikasi jendela ditentukan menggunakan metode “Window.partitionBy()” dan “Window.orderBy()”. Metode “partitionBy()” mempartisi data berdasarkan kolom “deptno”, sedangkan metode “orderBy()” mengurutkan data berdasarkan kolom “sal” dalam urutan menurun.
  • Selanjutnya, fungsi “sum()” diterapkan pada kolom “sal” menggunakan metode “over()” untuk menghitung total gaji berjalan di setiap departemen. Hasilnya akan berupa DataFrame baru bernama “running_sum_sal_df”, yang berisi kolom 'empno', 'ename', 'job', 'deptno', 'sal', dan 'running_total'.
  • Terakhir, metode “show()” dipanggil pada DataFrame “running_sum_sal_df” untuk menampilkan output kueri. DataFrame yang dihasilkan menunjukkan total gaji setiap karyawan dan detail lainnya seperti nama, nomor departemen, dan pekerjaan.

Keluaran:

Outputnya akan berisi total data gaji masing-masing departemen.

Q5: Gaji berikutnya di setiap departemen.

Untuk mencari gaji berikutnya di setiap departemen kami menggunakan fungsi LEAD. 

Fungsi jendela lead() membantu mendapatkan nilai ekspresi di baris berikutnya dari partisi jendela. Ini mengembalikan kolom untuk setiap kolom masukan, di mana setiap kolom akan berisi nilai kolom masukan untuk baris offset di atas baris saat ini dalam partisi jendela. Sintaks untuk fungsi lead adalah:- lead(col, offset=1, default=None).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Pendekatan untuk kode PySpark

  • Pertama, fungsi jendela membantu mempartisi baris DataFrame berdasarkan nomor departemen (deptno) dan mengurutkan gaji dalam urutan menurun dalam setiap partisi.
  • Fungsi lead() kemudian diterapkan pada kolom 'sal' yang diurutkan dalam setiap partisi untuk mengembalikan gaji karyawan berikutnya (dengan offset 1), dan nilai defaultnya adalah 0 jika tidak ada karyawan berikutnya.
  • DataFrame 'next_salary_df' yang dihasilkan berisi kolom untuk nomor karyawan (empno), nama (ename), jabatan (job), nomor departemen (deptno), gaji saat ini (sal), dan gaji berikutnya (next_val).

Keluaran:

Outputnya berisi gaji karyawan berikutnya di departemen berdasarkan urutan gaji menurun. 

Q6. Gaji sebelumnya di setiap departemen.

Untuk menghitung gaji sebelumnya, kita menggunakan fungsi LAG.

Fungsi lag mengembalikan nilai ekspresi pada offset tertentu sebelum baris saat ini dalam partisi jendela. Sintaks fungsi lag adalah:- lag(expr, offset=1, default=None).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Pendekatan untuk kode PySpark

  • window.partitionBy(col('deptno')) menentukan partisi jendela. Artinya fungsi jendela bekerja secara terpisah untuk setiap departemen.
  • Kemudian orderBy(col('sal').desc()) menentukan urutan gaji dan akan mengurutkan gaji di setiap departemen dalam urutan menurun.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') membuat kolom baru bernama prev_val di DataFrame 'prev_sal_df'.
  • Untuk setiap baris, kolom ini berisi nilai kolom 'sal' dari baris sebelumnya dalam jendela yang ditentukan oleh windowSpec.
  • Parameter offset=1 menunjukkan bahwa baris sebelumnya harus satu baris sebelum baris saat ini, dan default=0 menentukan nilai default untuk baris pertama di setiap partisi (karena tidak ada baris sebelumnya untuk baris pertama).
  • Terakhir, prev_sal_df.show() menampilkan DataFrame yang dihasilkan.

Keluaran:

Outputnya mewakili gaji sebelumnya untuk setiap karyawan di setiap departemen, berdasarkan pengurutan gaji dalam urutan menurun.

Q7. Gaji Pertama di setiap departemen dan membandingkannya dengan setiap anggota di setiap departemen.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Pendekatan untuk kode PySpark

  • Pertama, buat objek WindowSpec yang mempartisi data berdasarkan nomor departemen (deptno) dan mengurutkannya berdasarkan gaji (sal) dalam urutan menurun.
  • Kemudian terapkan fungsi analitik first() ke kolom 'sal' di atas jendela yang ditentukan oleh windowSpec. Fungsi ini mengembalikan nilai pertama kolom 'sal' dalam setiap partisi (yaitu setiap grup deptno) yang diurutkan berdasarkan 'sal' menurun. Kolom yang dihasilkan memiliki nama baru, 'first_val'.
  • Sekarang tetapkan DataFrame yang dihasilkan, yang berisi kolom yang dipilih dan kolom baru, 'first_val', yang menunjukkan gaji tertinggi pertama untuk setiap departemen berdasarkan urutan nilai gaji, ke variabel baru yang disebut 'first_value_df'.

Keluaran:

Outputnya menunjukkan gaji tertinggi pertama untuk setiap departemen di DataFrame karyawan.

Kesimpulan

Pada artikel ini, kita belajar tentang fungsi jendela. Spark SQL memiliki tiga jenis fungsi jendela: Fungsi peringkat, Fungsi agregat, dan Fungsi nilai. Dengan menggunakan fungsi ini, kami mengerjakan kumpulan data untuk menemukan beberapa wawasan penting dan berharga. Spark Window Functions menawarkan alat analisis data yang canggih seperti pemeringkatan, analitik, dan penghitungan nilai. Baik menganalisis wawasan gaji berdasarkan departemen atau menggunakan contoh praktis dengan PySpark & ​​SQL, fungsi-fungsi ini menyediakan alat penting untuk pemrosesan dan analisis data yang efektif di Spark.

Pengambilan Kunci

  • Kami mempelajari tentang fungsi jendela dan mengerjakannya menggunakan Spark SQL dan PySpark DataFrame API.
  • Kami menggunakan fungsi seperti peringkat, peringkat_padat, nomor_baris, lag, prospek, grupBy, partisiBy, dan fungsi lainnya untuk memberikan analisis yang tepat.
  • Kita juga telah melihat solusi langkah demi langkah yang terperinci untuk masalah tersebut dan menganalisis keluaran di akhir setiap pernyataan masalah.

Studi kasus ini membantu Anda lebih memahami fungsi PySpark. Jika Anda memiliki pendapat atau pertanyaan, beri komentar di bawah. Terhubung dengan saya LinkedIn untuk diskusi lebih lanjut. Terus belajar!!!

Media yang ditampilkan dalam artikel ini bukan milik Analytics Vidhya dan digunakan atas kebijaksanaan Penulis.

tempat_img

Intelijen Terbaru

tempat_img