Zephyrnet Logosu

PySpark'ta Pencere İşlevleriyle Çalışmak

Tarih:

Giriş

PySpark'ta Pencere İşlevlerini öğrenmek zor olabilir ancak çabaya değer. Pencere İşlevleri, verileri analiz etmek için güçlü bir araçtır ve başka türlü göremeyebileceğiniz içgörüler elde etmenize yardımcı olabilir. Spark'ta Pencere Fonksiyonlarının nasıl kullanılacağını anlayarak; seninkini alabilirsin veri analizi becerilerini bir sonraki seviyeye taşıyın ve daha bilinçli kararlar alın. İster büyük ister küçük parçalarla çalışıyor olun veri kümeleriSpark'ta Pencere İşlevlerini öğrenmek, verileri yeni ve heyecan verici yollarla değiştirmenize ve analiz etmenize olanak tanır.

PySpark'ta Pencere İşlevleri

Bu blogda öncelikle pencere fonksiyonlarının kavramını anlayacağız ve ardından bunların Spark SQL ve PySpark DataFrame API ile nasıl kullanılacağını tartışacağız. Böylece bu makalenin sonunda pencere işlevlerini gerçek veri kümeleriyle nasıl kullanacağınızı anlayacak ve iş için gerekli içgörüleri elde edeceksiniz.

Öğrenme hedefleri

  • Pencere fonksiyonları kavramını anlayın.
  • Veri kümelerini kullanarak pencere işlevleriyle çalışma.
  • Pencere işlevlerini kullanarak öngörüleri öğrenin.
  • Pencere işlevleriyle çalışmak için Spark SQL ve DataFrame API'yi kullanın.

Bu makale, Veri Bilimi Blogatonu.

İçindekiler

Pencere İşlevleri Nelerdir?

Pencere işlevleri, birbiriyle ilişkili bir grup satır içindeki verilerin analiz edilmesine yardımcı olur. Kullanıcıların, bir veri çerçevesinin veya veri kümesinin birbiriyle ilişkili satırları üzerinde bazı bölümleme ve sıralama kriterlerine göre karmaşık dönüşümler gerçekleştirmesine olanak tanır.

Pencere işlevleri, bir dizi bölümleme sütunu tarafından tanımlanan bir veri çerçevesinin veya veri kümesinin belirli bir bölümü üzerinde çalışır. TARAFINDAN SİPARİŞ cümlesi, verileri belirli bir sıraya göre düzenlemek için bir pencere işlevinde bölümlere ayırır. Pencere işlevleri daha sonra, pencere çerçevesinde belirtildiği gibi, geçerli satırı ve önceki 've'/' veya sonraki satırların bir alt kümesini içeren kayan bir satır penceresinde hesaplamalar gerçekleştirir.

PySpark'ta Pencere İşlevleriyle Çalışmak

Pencere fonksiyonlarının bazı yaygın örnekleri, hareketli ortalamaların hesaplanmasını, satırların belirli bir sütuna veya gruba göre sıralanmasını veya sıralanmasını içerir. sütunlar, değişen toplamları hesaplama ve bir grup satırdaki ilk veya son değeri bulma. Spark'ın güçlü pencere işlevleriyle kullanıcılar karmaşık analizler ve toplamalar gerçekleştirebilir. büyük veri setleri nispeten kolay olması onu büyük şirketler için popüler bir araç haline getiriyor veri işleme ve analitik.

"

SQL'de Pencere İşlevleri

Spark SQL üç tür pencere işlevini destekler:

  • Sıralama İşlevleri: - Bu işlevler, sonuç kümesinin bir bölümündeki her satıra bir sıra atar. Örneğin ROW_NUMBER() işlevi, bölüm içindeki her satıra benzersiz bir sıralı sayı verir.
  • Analitik İşlevleri: - Bu işlevler, toplam değerleri bir satır penceresi üzerinden hesaplar. Örneğin, SUM() işlevi, bir satır penceresindeki bir sütunun toplamını hesaplar.
  • Değer Fonksiyonları: - Bu işlevler, aynı bölümdeki diğer satırların değerlerine dayalı olarak bölümdeki her satır için analitik bir değer hesaplar. Örneğin, LAG() işlevi bölümdeki önceki satırdaki bir sütunun değerini döndürür.

DataFrame Oluşturma

Farklı pencere fonksiyonlarıyla pratik olarak çalışabilmemiz için örnek bir veri çerçevesi oluşturacağız. Ayrıca bu veriler ve pencere fonksiyonları yardımıyla bazı soruları cevaplamaya çalışacağız.

Veri çerçevesinde çalışanların Adı, Unvanı, Çalışan Numarası, İşe Alınma Tarihi, Maaş vb. gibi ayrıntılar bulunur. Toplamda aşağıdaki gibi 8 sütunumuz vardır:

  • 'empno': Bu sütun çalışanın numarasını içerir.
  • 'ename': Bu sütunda çalışan adları bulunur.
  • 'iş': Bu sütun çalışanların iş unvanlarına ilişkin bilgileri içerir.
  • 'işe alınma tarihi': Bu sütun, çalışanın işe alınma tarihini gösterir.
  • 'sal': Maaş detayları bu sütunda yer almaktadır.
  • 'comm': Bu sütunda, varsa çalışan komisyon ayrıntıları bulunur.
  • 'deptno': Çalışanın ait olduğu departman numarası bu sütunda yer alır.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Şimdi şemayı kontrol edeceğiz:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

DataFrame 'emp_df'nin "emp" adıyla geçici bir görünümünü oluşturun. DataFrame'i Spark SQL'de SQL sözdizimini kullanarak sanki bir tablomuş gibi sorgulamamıza olanak tanır. Geçici görünüm yalnızca Spark Oturumu süresince geçerlidir.

emp_df.createOrReplaceTempView("emp")

Pencere İşlevlerini Kullanarak Sorun İfadelerini Çözme

Burada Windows işlevlerini kullanarak birkaç problem ifadesini çözeceğiz:

S1. Her departmandaki maaşı sıralayın.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

PySpark Koduna Yaklaşım

  • Window işlevi, partitionBy(col('deptno')) kullanarak verileri departman numarasına göre bölümlere ayırır ve daha sonra orderBy(col('sal').desc()) kullanarak her bölüm içindeki verileri maaşa göre azalan sırada sıralar. windowSpec değişkeni son pencere spesifikasyonunu tutar.
  • 'emp_df', empno, ename, job, deptno ve sal sütunları dahil olmak üzere çalışan verilerini içeren veri çerçevesidir.
  • Sıralama işlevi, select ifadesi içindeki 'F.rank().over(windowSpec)' kullanılarak maaş sütununa uygulanır. Ortaya çıkan sütunun 'rank' olarak bir takma adı vardır.
  • Emno, ename, iş, borç ve maaşı içeren bir 'ranking_result_df' veri çerçevesi oluşturacaktır. Ayrıca, çalışanın kendi departmanındaki maaşının sırasını temsil eden yeni bir 'rütbe' sütunu da vardır.

Çıktı:

Sonuç, her departmanda maaş sıralamasına sahiptir.

Q2. Her departmandaki maaşı yoğun bir şekilde sıralayın.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

PySpark Koduna Yaklaşım

  • Öncelikle, 'emp_df' DataFrame'i derinlik bazında bölümleyen ve onu 'sal' sütunundan aşağıya doğru sıralayan Window işlevini kullanarak bir pencere spesifikasyonu oluşturun.
  • Daha sonra, pencere spesifikasyonu üzerine, her bölümdeki her satıra, sıralama düzenine göre yoğun bir sıralama atayan, yoğun_rank() işlevi uygulanır.
  • Son olarak, emp_df'den belirli sütunlar (örneğin, 'empno', 'ename', 'job', 'deptno' ve 'sal') seçilerek ve 'dense_rank' adlı yeni bir sütun eklenerek 'dense_ranking_df' adı verilen yeni bir DataFrame oluşturulur. pencere fonksiyonu tarafından hesaplanan yoğun sıralama değerlerini içerir.
  • Son olarak, ortaya çıkan DataFrame'i tablo biçiminde görüntüleyin.

Çıktı:

Sonuç, maaş açısından yoğun bir sıralamaya sahiptir.

S3. Her departmanın içindeki satırı numaralandırın.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark koduna yaklaşım

  • İlk satır, Window.partitionBy() ve Window.orderBy() işlevlerini kullanan hesaplamaya yönelik bir pencere spesifikasyonunu tanımlar. Bu pencere, deptno sütununa göre bölümlenir ve sal sütununa göre azalan sırada sıralanır.
  • İkinci satır, 'row_num_df' adında yeni bir DataFrame oluşturur, 'emp_df'nin 'row_num' adlı ek bir sütunla izdüşümünü oluşturur ve satır numaraları ayrıntılarını içerir.
  • show() işlevi, her çalışanın empno, ename, job, deptno, sal ve row_num sütunlarını gösteren sonuçta ortaya çıkan DataFrame'i görüntüler.

Çıktı:

Çıktı, maaşlarına göre kendi departmanındaki her çalışanın satır numarasını içerecektir.

S4. Her departmandaki toplam maaş toplamının çalıştırılması.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Yaklaşım PySpark kodu için

  • İlk olarak “Window.partitionBy()” ve “Window.orderBy()” yöntemleri kullanılarak bir pencere spesifikasyonu tanımlanır. "partitionBy()" yöntemi, verileri "deptno" sütununa göre bölerken, "orderBy()" yöntemi, verileri "sal" sütununa göre azalan sırada sıralar.
  • Daha sonra, her departmandaki maaşların toplamını hesaplamak için “over()” yöntemi kullanılarak “sal” sütununa “sum()” fonksiyonu uygulanır. Sonuç, 'empno', 'ename', 'job', 'deptno', 'sal' ve 'running_total' sütunlarını içeren "running_sum_sal_df" adlı yeni bir DataFrame'de olacaktır.
  • Son olarak sorgunun çıktısını görüntülemek için “running_sum_sal_df” DataFrame üzerinde “show()” metodu çağrılır. Ortaya çıkan DataFrame, her çalışanın toplam maaşını ve adı, departman numarası ve işi gibi diğer ayrıntılarını gösterir.

Çıktı:

Çıktı, her departmanın maaş verilerinin toplamını içerecektir.

S5: Her departmandaki bir sonraki maaş.

Her departmandaki bir sonraki maaşı bulmak için LEAD fonksiyonunu kullanıyoruz. 

Lead() pencere işlevi, pencere bölümünün bir sonraki satırındaki ifadenin değerinin alınmasına yardımcı olur. Her giriş sütunu için bir sütun döndürür; burada her sütun, pencere bölümündeki geçerli satırın üzerindeki uzaklık satırının giriş sütununun değerini içerecektir. Lead işlevinin sözdizimi şöyledir: - lead(sütun, ofset=1, varsayılan=Yok).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark koduna yaklaşım

  • İlk olarak, pencere işlevi DataFrame'in satırlarını departman numarasına (deptno) göre bölümlemeye ve maaşları her bölüm içinde azalan sırada sıralamaya yardımcı olur.
  • Daha sonra lead() işlevi, sonraki çalışanın maaşını (1 ofseti ile) döndürmek için her bölüm içindeki sıralı 'sal' sütununa uygulanır ve sonraki çalışanın olmaması durumunda varsayılan değer 0'dır.
  • Ortaya çıkan DataFrame 'next_salary_df', çalışan numarası (empno), isim (ename), iş unvanı (iş), departman numarası (deptno), mevcut maaş (sal) ve sonraki maaş (next_val) için sütunlar içerir.

Çıktı:

Çıktı, azalan maaş sırasına göre departmandaki bir sonraki çalışanın maaşını içerir. 

S6. Her departmandaki önceki maaş.

Önceki maaşı hesaplamak için LAG fonksiyonunu kullanıyoruz.

Gecikme işlevi, pencere bölümündeki geçerli satırdan önceki belirli bir konumdaki bir ifadenin değerini döndürür. Gecikme fonksiyonunun sözdizimi şöyledir: - lag(ifade, ofset=1, varsayılan=Yok).over(windowSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark koduna yaklaşım

  • window.partitionBy(col('deptno')) pencere bölümünü belirtir. Bu, pencere fonksiyonunun her departman için ayrı ayrı çalıştığı anlamına gelir.
  • Daha sonra orderBy(col('sal').desc()) maaşın sırasını belirtir ve her departmandaki maaşları azalan sırada sıralar.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'), DataFrame 'prev_sal_df' içinde prev_val adında yeni bir sütun oluşturur.
  • Her satır için bu sütun, windowSpec tarafından tanımlanan pencere içindeki önceki satırdaki 'sal' sütununun değerini içerir.
  • offset=1 parametresi, önceki satırın geçerli satırdan bir satır önce olması gerektiğini belirtir ve default=0, her bölümdeki ilk satır için varsayılan değeri belirtir (ilk satır için önceki satır olmadığından).
  • Son olarak prev_sal_df.show(), elde edilen DataFrame'i görüntüler.

Çıktı:

Çıktı, maaşların azalan sıraya göre sıralanmasına dayalı olarak her departmandaki her çalışanın önceki maaşını temsil eder.

S7. Her departmandaki İlk Maaş ve her departmandaki her üyeyle karşılaştırma.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark koduna yaklaşım

  • İlk olarak, verileri departman numarasına (deptno) göre bölümleyen ve maaşa (sal) göre azalan sırada sıralayan bir WindowSpec nesnesi oluşturun.
  • Daha sonra, windowSpec tarafından tanımlanan pencere üzerindeki 'sal' sütununa ilk() analitik fonksiyonunu uygular. Bu işlev, her bölüm içindeki (yani her deptno grubu) 'sal' sütununun ilk değerini 'sal'dan azalarak sıralayarak döndürür. Ortaya çıkan sütunun yeni bir adı var: 'first_val'.
  • Artık, seçilen sütunları ve maaş değerlerinin azalan sırasına göre her departman için ilk en yüksek maaşı gösteren yeni bir sütun olan 'first_val'ı içeren sonuçta ortaya çıkan DataFrame'i 'first_value_df' adı verilen yeni bir değişkene atar.

Çıktı:

Çıktı, bir çalışanın DataFrame'indeki her departman için ilk en yüksek maaşı gösterir.

Sonuç

Bu yazımızda pencere fonksiyonlarını öğreneceğiz. Spark SQL'de üç tür pencere işlevi vardır: Sıralama işlevleri, Toplama işlevleri ve Değer işlevleri. Bu işlevi kullanarak bazı önemli ve değerli bilgiler bulmak için bir veri kümesi üzerinde çalıştık. Spark Window Functions sıralama, analiz ve değer hesaplamaları gibi güçlü veri analizi araçları sunar. İster departmanlara göre maaş öngörülerini analiz edin ister PySpark ve SQL ile pratik örnekler kullanın, bu işlevler Spark'ta etkili veri işleme ve analiz için temel araçları sağlar.

Önemli Noktalar

  • Pencere fonksiyonlarını öğrendik ve Spark SQL ve PySpark DataFrame API'yi kullanarak onlarla çalıştık.
  • Doğru analiz sağlamak için rütbe, yoğun_rank, satır_numarası, gecikme, kurşun, groupBy, partitionBy gibi işlevleri ve diğer işlevleri kullanırız.
  • Ayrıca sorunun ayrıntılı adım adım çözümlerini gördük ve her sorun ifadesinin sonundaki çıktıyı analiz ettik.

Bu örnek olay çalışması PySpark fonksiyonlarını daha iyi anlamanıza yardımcı olur. Herhangi bir fikriniz veya sorunuz varsa aşağıya yorum yapın. Benimle iletişime geç  LinkedIn daha fazla tartışma için. Öğrenmeye devam et!!!

Bu makalede gösterilen medya Analytics Vidhya'ya ait değildir ve Yazarın takdirine bağlı olarak kullanılır.

spot_img

En Son İstihbarat

spot_img