Zephyrnet লোগো

PySpark এ উইন্ডো ফাংশনগুলির সাথে কাজ করা

তারিখ:

ভূমিকা

PySpark এ উইন্ডো ফাংশন সম্পর্কে শেখা চ্যালেঞ্জিং কিন্তু প্রচেষ্টার মূল্য হতে পারে। উইন্ডো ফাংশনগুলি ডেটা বিশ্লেষণের জন্য একটি শক্তিশালী হাতিয়ার এবং আপনাকে এমন অন্তর্দৃষ্টি পেতে সাহায্য করতে পারে যা আপনি অন্যথায় দেখেননি। স্পার্ক-এ উইন্ডো ফাংশন কীভাবে ব্যবহার করবেন তা বোঝার মাধ্যমে; আপনি আপনার নিতে পারেন তথ্য বিশ্লেষণ পরবর্তী স্তরে দক্ষতা এবং আরও জ্ঞাত সিদ্ধান্ত নিতে। আপনি বড় বা ছোট সঙ্গে কাজ করছেন কিনা ডেটাসেট, স্পার্ক-এ উইন্ডো ফাংশন শেখা আপনাকে নতুন এবং উত্তেজনাপূর্ণ উপায়ে ডেটা ম্যানিপুলেট এবং বিশ্লেষণ করার অনুমতি দেবে।

PySpark এ উইন্ডো ফাংশন

এই ব্লগে, আমরা প্রথমে উইন্ডো ফাংশনগুলির ধারণাটি বুঝব এবং তারপরে স্পার্ক SQL এবং PySpark DataFrame API এর সাথে কীভাবে সেগুলি ব্যবহার করতে হয় তা নিয়ে আলোচনা করব। যাতে এই নিবন্ধের শেষে, আপনি বাস্তব ডেটাসেটের সাথে উইন্ডো ফাংশনগুলি কীভাবে ব্যবহার করবেন এবং ব্যবসার জন্য প্রয়োজনীয় অন্তর্দৃষ্টি পাবেন তা বুঝতে পারবেন।

শিক্ষার উদ্দেশ্য

  • উইন্ডো ফাংশন ধারণা বুঝতে.
  • ডেটাসেট ব্যবহার করে উইন্ডো ফাংশনগুলির সাথে কাজ করা।
  • উইন্ডো ফাংশন ব্যবহার করে অন্তর্দৃষ্টি খুঁজে বের করুন.
  • উইন্ডো ফাংশনগুলির সাথে কাজ করতে স্পার্ক SQL এবং DataFrame API ব্যবহার করুন।

এই নিবন্ধটি একটি অংশ হিসাবে প্রকাশিত হয়েছিল ডেটা সায়েন্স ব্লগাথন।

সুচিপত্র

উইন্ডো ফাংশন কি?

উইন্ডো ফাংশনগুলি একে অপরের সাথে সম্পর্কিত সারিগুলির একটি গ্রুপের মধ্যে ডেটা বিশ্লেষণ করতে সহায়তা করে। তারা ব্যবহারকারীদের কিছু বিভাজন এবং অর্ডারের মানদণ্ডের ভিত্তিতে একে অপরের সাথে যুক্ত ডেটাফ্রেম বা ডেটাসেটের সারিগুলিতে জটিল রূপান্তর করতে সক্ষম করে।

উইন্ডো ফাংশনগুলি পার্টিশন কলামগুলির একটি সেট দ্বারা সংজ্ঞায়িত ডেটাফ্রেম বা ডেটাসেটের একটি নির্দিষ্ট পার্টিশনে কাজ করে। দ্য আদেশ দ্বারা clause একটি উইন্ডো ফাংশনে ডেটাকে একটি নির্দিষ্ট ক্রমে সাজানোর জন্য পার্টিশন করে। উইন্ডো ফাংশন তারপরে সারিগুলির একটি স্লাইডিং উইন্ডোতে গণনা সম্পাদন করে যাতে বর্তমান সারি এবং পূর্ববর্তী 'এবং'/'অথবা' নিম্নলিখিত সারিগুলির একটি উপসেট অন্তর্ভুক্ত থাকে, যেমন উইন্ডো ফ্রেমে উল্লেখ করা হয়েছে।

PySpark এ উইন্ডো ফাংশনগুলির সাথে কাজ করা

উইন্ডো ফাংশনের কিছু সাধারণ উদাহরণের মধ্যে রয়েছে চলমান গড় গণনা করা, নির্দিষ্ট কলাম বা গোষ্ঠীর উপর ভিত্তি করে সারি বাছাই করা কলাম, চলমান মোট গণনা করা, এবং সারিগুলির একটি গ্রুপে প্রথম বা শেষ মান খুঁজে বের করা। স্পার্কের শক্তিশালী উইন্ডো ফাংশনগুলির সাথে, ব্যবহারকারীরা জটিল বিশ্লেষণ এবং একত্রিতকরণ করতে পারে বড় ডেটাসেট আপেক্ষিক স্বাচ্ছন্দ্যের সাথে, এটি বড়দের জন্য একটি জনপ্রিয় হাতিয়ার করে তোলে তথ্য প্রক্রিয়াজাতকরণ এবং বিশ্লেষণ।

"

SQL এ উইন্ডো ফাংশন

স্পার্ক এসকিউএল তিন ধরনের উইন্ডো ফাংশন সমর্থন করে:

  • র‌্যাঙ্কিং ফাংশন:- এই ফাংশনগুলি ফলাফল সেটের একটি পার্টিশনের মধ্যে প্রতিটি সারিতে একটি র‌্যাঙ্ক নির্ধারণ করে। উদাহরণস্বরূপ, ROW_NUMBER() ফাংশন পার্টিশনের মধ্যে প্রতিটি সারিতে একটি অনন্য ক্রমিক সংখ্যা দেয়।
  • বিশ্লেষণ ফাংশন:- এই ফাংশনগুলি সারিগুলির একটি উইন্ডোতে সমষ্টিগত মানগুলি গণনা করে। উদাহরণস্বরূপ, SUM() ফাংশন সারিগুলির একটি উইন্ডোতে একটি কলামের যোগফল গণনা করে।
  • মান ফাংশন:- এই ফাংশনগুলি একই পার্টিশনের অন্যান্য সারির মানগুলির উপর ভিত্তি করে একটি পার্টিশনের প্রতিটি সারির জন্য একটি বিশ্লেষণাত্মক মান গণনা করে। উদাহরণস্বরূপ, LAG() ফাংশন পার্টিশনের পূর্ববর্তী সারি থেকে একটি কলামের মান প্রদান করে।

ডেটাফ্রেম তৈরি

আমরা একটি নমুনা ডেটাফ্রেম তৈরি করব যাতে আমরা কার্যত বিভিন্ন উইন্ডো ফাংশনের সাথে কাজ করতে পারি। এছাড়াও আমরা এই ডেটা এবং উইন্ডো ফাংশনগুলির সাহায্যে কিছু প্রশ্নের উত্তর দেওয়ার চেষ্টা করব।

ডেটাফ্রেমে কর্মচারীদের বিশদ বিবরণ রয়েছে যেমন তাদের নাম, পদবী, কর্মচারী নম্বর, নিয়োগের তারিখ, বেতন ইত্যাদি। আমাদের মোট 8টি কলাম রয়েছে যা নিম্নরূপ:

  • 'empno': এই কলামে কর্মচারীর নম্বর রয়েছে।
  • 'ename': এই কলামে কর্মচারীর নাম আছে।
  • 'জব': এই কলামে কর্মচারীদের চাকরির শিরোনাম সম্পর্কে তথ্য রয়েছে।
  • 'hiredate': এই কলামটি কর্মচারীর নিয়োগের তারিখ দেখায়।
  • 'sal': বেতনের বিবরণ এই কলামে রয়েছে।
  • 'comm': এই কলামে কর্মচারী কমিশনের বিবরণ আছে, যদি থাকে।
  • 'deptno': কর্মচারী যে বিভাগের নম্বর এই কলামে রয়েছে।
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

এখন আমরা স্কিমা পরীক্ষা করব:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

DataFrame 'emp_df'-এর একটি অস্থায়ী ভিউ তৈরি করুন যার নাম “emp”। এটি আমাদেরকে স্পার্ক এসকিউএল-এ এসকিউএল সিনট্যাক্স ব্যবহার করে ডেটাফ্রেমকে প্রশ্ন করার অনুমতি দেয় যেন এটি একটি টেবিল। অস্থায়ী দৃশ্য শুধুমাত্র স্পার্ক সেশনের সময়কালের জন্য বৈধ।

emp_df.createOrReplaceTempView("emp")

উইন্ডো ফাংশন ব্যবহার করে সমস্যার বিবৃতি সমাধান করা

এখানে আমরা উইন্ডোজ ফাংশন ব্যবহার করে বেশ কয়েকটি সমস্যা বিবৃতি সমাধান করব:

প্রশ্ন ১. প্রতিটি বিভাগের মধ্যে বেতন র্যাঙ্ক.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

PySpark কোডের জন্য পদ্ধতি

  • উইন্ডো ফাংশন partitionBy(col('deptno')) ব্যবহার করে ডিপার্টমেন্ট নম্বর অনুসারে ডেটা বিভাজন করে এবং তারপর orderBy(col('sal').desc() ব্যবহার করে নিচের ক্রমে বেতন অনুসারে প্রতিটি পার্টিশনের মধ্যে ডেটা অর্ডার করে। পরিবর্তনশীল windowSpec চূড়ান্ত উইন্ডো স্পেসিফিকেশন ধারণ করে।
  • 'emp_df' হল ডেটাফ্রেম যা কর্মচারীর ডেটা ধারণ করে, যার মধ্যে empno, ename, job, deptno এবং sal এর জন্য কলাম রয়েছে।
  • র‌্যাঙ্ক ফাংশনটি সিলেক্ট স্টেটমেন্টের মধ্যে 'F.rank(.over(windowSpec)' ব্যবহার করে বেতন কলামে প্রয়োগ করা হয়। ফলস্বরূপ কলামে 'র্যাঙ্ক' হিসাবে একটি উপনাম আছে।
  • এটি একটি ডেটাফ্রেম তৈরি করবে, 'র্যাঙ্কিং_রেজাল্ট_ডিএফ', যার মধ্যে রয়েছে empno, ename, job, deptno এবং বেতন। এটিতে একটি নতুন কলাম রয়েছে, 'র্যাঙ্ক', যা তাদের বিভাগের মধ্যে কর্মচারীর বেতনের পদমর্যাদার প্রতিনিধিত্ব করে।

আউটপুট:

ফলাফল প্রতিটি বিভাগে বেতন পদমর্যাদা আছে.

প্রশ্ন ২. প্রতিটি বিভাগের মধ্যে বেতনের ঘনত্ব।

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

PySpark কোডের জন্য পদ্ধতি

  • প্রথমে, উইন্ডো ফাংশন ব্যবহার করে একটি উইন্ডো স্পেসিফিকেশন তৈরি করুন, যা deptno দ্বারা 'emp_df' ডেটাফ্রেমকে পার্টিশন করে এবং 'সাল' কলামে নেমে অর্ডার দেয়।
  • তারপর, dense_rank() ফাংশনটি উইন্ডো স্পেসিফিকেশনের উপর প্রয়োগ করা হয়, যা প্রতিটি পার্টিশনের মধ্যে প্রতিটি সারিকে তার সাজানো ক্রম অনুসারে একটি ঘন র‌্যাঙ্ক বরাদ্দ করে।
  • অবশেষে, emp_df (যেমন, 'empno', 'ename', 'job', 'deptno', এবং 'sal') থেকে নির্দিষ্ট কলাম নির্বাচন করে এবং একটি নতুন কলাম 'dense_rank' যোগ করে 'dense_ranking_df' নামে একটি নতুন ডেটাফ্রেম তৈরি করা হয়। উইন্ডো ফাংশন দ্বারা গণনা করা ঘন র‌্যাঙ্কিং মান রয়েছে।
  • অবশেষে, সারণী বিন্যাসে ফলস্বরূপ ডেটাফ্রেম প্রদর্শন করুন।

আউটপুট:

ফলাফল একটি বেতন-ভিত্তিক ঘন র্যাঙ্ক আছে.

Q3. প্রতিটি বিভাগের মধ্যে সারি সংখ্যা.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

PySpark কোডের জন্য পদ্ধতি

  • প্রথম লাইনটি Window.partitionBy() এবং Window.orderBy() ফাংশন ব্যবহার করে গণনার জন্য একটি উইন্ডো স্পেসিফিকেশন সংজ্ঞায়িত করে। এই উইন্ডোটি deptno কলাম দ্বারা বিভাজন করা হয় এবং সাল কলাম দ্বারা অবরোহ ক্রমে ক্রম করা হয়।
  • দ্বিতীয় লাইনটি 'row_num_df' নামে একটি নতুন DataFrame তৈরি করে, 'emp_df'-এর একটি অভিক্ষেপ যার নাম 'row_num' নামে একটি অতিরিক্ত কলাম এবং এতে সারি সংখ্যার বিবরণ রয়েছে।
  • show() ফাংশন ফলস্বরূপ ডেটাফ্রেম প্রদর্শন করে, যা প্রতিটি কর্মীর empno, ename, job, deptno, sal এবং row_num কলাম দেখায়।

আউটপুট:

আউটপুটে তাদের বেতনের উপর ভিত্তি করে তাদের বিভাগের মধ্যে প্রতিটি কর্মচারীর সারি নম্বর থাকবে।

Q4. প্রতিটি বিভাগের মধ্যে বেতনের মোট যোগফল চলছে।

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

অভিগমন PySpark কোডের জন্য

  • প্রথমত, একটি উইন্ডো স্পেসিফিকেশন “Window.partitionBy()” এবং “Window.orderBy()” পদ্ধতি ব্যবহার করে সংজ্ঞায়িত করা হয়। "partitionBy()" পদ্ধতিটি "deptno" কলাম দ্বারা ডেটাকে বিভাজন করে, যখন "orderBy()" পদ্ধতিটি "sal" কলাম দ্বারা নিম্নোক্ত ক্রমে ডেটা অর্ডার করে।
  • এরপর, প্রতিটি বিভাগের মধ্যে চলমান মোট বেতন গণনা করতে "ওভার()" পদ্ধতি ব্যবহার করে "সাল" কলামে "সমষ্টি()" ফাংশন প্রয়োগ করা হয়। ফলাফলটি "running_sum_sal_df" নামে একটি নতুন ডেটাফ্রেমে আসবে, যেখানে 'empno', 'ename', 'job', 'deptno', 'sal', এবং 'running_total' কলাম রয়েছে।
  • অবশেষে, কোয়েরির আউটপুট প্রদর্শনের জন্য "রানিং_সম_সাল_ডিএফ" ডেটাফ্রেমে "শো()" পদ্ধতিটি কল করা হয়। ফলস্বরূপ ডেটাফ্রেম প্রতিটি কর্মচারীর চলমান মোট বেতন এবং অন্যান্য বিবরণ যেমন নাম, বিভাগ নম্বর এবং চাকরি দেখায়।

আউটপুট:

আউটপুটে প্রতিটি বিভাগের বেতন ডেটার একটি চলমান মোট থাকবে।

প্রশ্ন 5: প্রতিটি বিভাগের মধ্যে পরবর্তী বেতন।

প্রতিটি বিভাগের মধ্যে পরবর্তী বেতন খুঁজে পেতে আমরা LEAD ফাংশন ব্যবহার করি। 

লিড() উইন্ডো ফাংশন উইন্ডো পার্টিশনের পরবর্তী সারিতে এক্সপ্রেশনের মান পেতে সাহায্য করে। এটি প্রতিটি ইনপুট কলামের জন্য একটি কলাম প্রদান করে, যেখানে প্রতিটি কলামে উইন্ডো পার্টিশনের মধ্যে বর্তমান সারির উপরে অফসেট সারির জন্য ইনপুট কলামের মান থাকবে। লিড ফাংশনের জন্য সিনট্যাক্স হল:- লিড(কল, অফসেট=1, ডিফল্ট=কোনও নয়)।

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

PySpark কোডের জন্য পদ্ধতি

  • প্রথমত, উইন্ডো ফাংশন ডেটাফ্রেমের সারিগুলিকে ডিপার্টমেন্ট নম্বর (deptno) দ্বারা বিভাজন করতে সাহায্য করে এবং প্রতিটি পার্টিশনের মধ্যে নিচের ক্রমে বেতন অর্ডার করে।
  • নিম্নলিখিত কর্মচারীর বেতন ফেরত দেওয়ার জন্য প্রতিটি পার্টিশনের মধ্যে নির্দেশিত 'সাল' কলামে লিড() ফাংশন প্রয়োগ করা হয় (1 এর অফসেট সহ), এবং পরবর্তী কর্মচারী না থাকলে ডিফল্ট মান 0 হয়।
  • ফলস্বরূপ ডেটাফ্রেম 'next_salary_df'-এ কর্মচারী নম্বর (empno), নাম (ename), চাকরির শিরোনাম (চাকরি), বিভাগ নম্বর (deptno), বর্তমান বেতন (sal), এবং পরবর্তী বেতন (next_val) এর জন্য কলাম রয়েছে।

আউটপুট:

আউটপুটে নিচের বেতনের ক্রম অনুসারে বিভাগের পরবর্তী কর্মচারীর বেতন থাকে। 

প্রশ্ন ৬. প্রতিটি বিভাগের মধ্যে আগের বেতন।

পূর্ববর্তী বেতন গণনা করতে, আমরা LAG ফাংশন ব্যবহার করি।

ল্যাগ ফাংশন উইন্ডো পার্টিশনের মধ্যে বর্তমান সারির আগে প্রদত্ত অফসেটে একটি এক্সপ্রেশনের মান প্রদান করে। ল্যাগ ফাংশনের সিনট্যাক্স হল:- lag(expr, offset=1, default=None).over(windowSpec)।

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

PySpark কোডের জন্য পদ্ধতি

  • window.partitionBy(col('deptno')) উইন্ডো পার্টিশন নির্দিষ্ট করে। এর মানে হল যে উইন্ডো ফাংশন প্রতিটি বিভাগের জন্য আলাদাভাবে কাজ করে।
  • তারপর OrderBy(col('sal').desc()) বেতনের ক্রম নির্দিষ্ট করে এবং প্রতিটি বিভাগের মধ্যে বেতন নিম্নক্রম অনুসারে অর্ডার করবে।
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') ডেটাফ্রেম 'prev_sal_df'-এ prev_val নামে একটি নতুন কলাম তৈরি করে।
  • প্রতিটি সারির জন্য, এই কলামে windowSpec দ্বারা সংজ্ঞায়িত উইন্ডোর মধ্যে পূর্ববর্তী সারির 'সাল' কলামের মান রয়েছে।
  • অফসেট=1 প্যারামিটার নির্দেশ করে যে পূর্ববর্তী সারিটি বর্তমান সারির আগে একটি সারি হওয়া উচিত, এবং default=0 প্রতিটি পার্টিশনে প্রথম সারির জন্য ডিফল্ট মান নির্দিষ্ট করে (যেহেতু প্রথম সারির জন্য কোনো পূর্ববর্তী সারি নেই)।
  • অবশেষে, prev_sal_df.show() ফলে ডেটাফ্রেম প্রদর্শন করে।

আউটপুট:

আউটপুট প্রতিটি বিভাগের মধ্যে প্রতিটি কর্মচারীর জন্য পূর্ববর্তী বেতনের প্রতিনিধিত্ব করে, বেতনের ক্রমক্রমের উপর ভিত্তি করে।

প্রশ্ন ৭. প্রতিটি বিভাগের মধ্যে প্রথম বেতন এবং প্রতিটি বিভাগের মধ্যে প্রতিটি সদস্যের সাথে তুলনা করা।

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

PySpark কোডের জন্য পদ্ধতি

  • প্রথমে, একটি WindowSpec অবজেক্ট তৈরি করুন যা ডিপার্টমেন্ট নম্বর (deptno) দ্বারা ডেটা বিভাজন করে এবং এটিকে বেতন (sal) দ্বারা অবরোহ ক্রমে অর্ডার করে।
  • তারপর windowSpec দ্বারা সংজ্ঞায়িত উইন্ডোর উপরে 'sal' কলামে প্রথম() বিশ্লেষণাত্মক ফাংশন প্রয়োগ করে। এই ফাংশনটি প্রতিটি পার্টিশনের মধ্যে 'সাল' কলামের প্রথম মান প্রদান করে (অর্থাৎ প্রতিটি ডিপ্টনো গ্রুপের) 'সাল' অবতরণ করে। ফলস্বরূপ কলামের একটি নতুন নাম আছে, 'first_val'।
  • এখন ফলস্বরূপ ডেটাফ্রেম বরাদ্দ করে, যার মধ্যে নির্বাচিত কলাম রয়েছে এবং একটি নতুন কলাম, 'first_val', যা বেতন মানের অবরোহ ক্রম অনুসারে প্রতিটি বিভাগের জন্য প্রথম সর্বোচ্চ বেতন দেখায়, 'first_value_df' নামে একটি নতুন পরিবর্তনশীলকে।

আউটপুট:

আউটপুট একজন কর্মচারী ডেটাফ্রেমে প্রতিটি বিভাগের জন্য প্রথম সর্বোচ্চ বেতন দেখায়।

উপসংহার

এই নিবন্ধে, আমরা উইন্ডো ফাংশন সম্পর্কে শিখেছি। স্পার্ক এসকিউএল-এর তিন ধরনের উইন্ডো ফাংশন রয়েছে: র‌্যাঙ্কিং ফাংশন, অ্যাগ্রিগেট ফাংশন এবং ভ্যালু ফাংশন। এই ফাংশনটি ব্যবহার করে, আমরা কিছু গুরুত্বপূর্ণ এবং মূল্যবান অন্তর্দৃষ্টি খুঁজে পেতে একটি ডেটাসেটে কাজ করেছি। স্পার্ক উইন্ডো ফাংশনগুলি শক্তিশালী ডেটা বিশ্লেষণ সরঞ্জামগুলি অফার করে যেমন র্যাঙ্কিং, বিশ্লেষণ এবং মান গণনা। বিভাগ দ্বারা বেতন অন্তর্দৃষ্টি বিশ্লেষণ করা হোক বা PySpark এবং SQL এর সাথে ব্যবহারিক উদাহরণ নিয়োগ করা হোক না কেন, এই ফাংশনগুলি স্পার্ক-এ কার্যকর ডেটা প্রক্রিয়াকরণ এবং বিশ্লেষণের জন্য প্রয়োজনীয় সরঞ্জাম সরবরাহ করে।

কী Takeaways

  • আমরা উইন্ডো ফাংশন সম্পর্কে শিখেছি এবং Spark SQL এবং PySpark DataFrame API ব্যবহার করে তাদের সাথে কাজ করেছি।
  • আমরা সঠিক বিশ্লেষণ প্রদানের জন্য র‌্যাঙ্ক, ঘনত্ব_র‍্যাঙ্ক, রো_নম্বর, ল্যাগ, লিড, গ্রুপবাই, পার্টিশনবাই এবং অন্যান্য ফাংশন ব্যবহার করি।
  • আমরা সমস্যার বিস্তারিত ধাপে ধাপে সমাধানও দেখেছি এবং প্রতিটি সমস্যার বিবৃতির শেষে আউটপুট বিশ্লেষণ করেছি।

এই কেস স্টাডি আপনাকে PySpark ফাংশনগুলি আরও ভালভাবে বুঝতে সাহায্য করে। আপনার যদি কোন মতামত বা প্রশ্ন থাকে, তাহলে নিচে কমেন্ট করুন। আমার সাথে সংযোগ করুন লিঙ্কডইন আরও আলোচনার জন্য। শিখতে থাকুন!!!

এই নিবন্ধে দেখানো মিডিয়া Analytics বিদ্যার মালিকানাধীন নয় এবং লেখকের বিবেচনার ভিত্তিতে ব্যবহার করা হয়।

স্পট_আইএমজি

সর্বশেষ বুদ্ধিমত্তা

স্পট_আইএমজি