شعار زيفيرنت

ابدأ بتكامل البيانات من Amazon S3 إلى Amazon Redshift باستخدام جلسات AWS Glue التفاعلية

التاريخ:

تضع المنظمات أولوية عالية لتكامل البيانات ، خاصة لدعم التحليلات والتعلم الآلي (ML) وذكاء الأعمال (BI) ومبادرات تطوير التطبيقات. تتزايد البيانات بشكل كبير ويتم إنشاؤها بواسطة مصادر بيانات متنوعة بشكل متزايد. يصبح تكامل البيانات أمرًا صعبًا عند معالجة البيانات على نطاق واسع والرفع الثقيل المتأصل المرتبط بالبنية التحتية المطلوبة لإدارتها. هذا هو أحد الأسباب الرئيسية وراء بحث المؤسسات باستمرار عن حلول تكامل بيانات سهلة الاستخدام وقليلة الصيانة لنقل البيانات من موقع إلى آخر أو لتوحيد بيانات أعمالهم من عدة مصادر في موقع مركزي لاتخاذ قرارات عمل إستراتيجية .

تستخدم معظم المؤسسات Spark لتلبية احتياجات معالجة البيانات الضخمة. إذا كنت تبحث عن تبسيط تكامل البيانات ، ولا تريد متاعب تدوير الخوادم أو إدارة الموارد أو إعداد مجموعات Spark ، فلدينا الحل المناسب لك.

غراء AWS هي خدمة تكامل بيانات بدون خادم تسهل اكتشاف البيانات وإعدادها ودمجها لأغراض التحليلات والتعلم الآلي وتطوير التطبيقات. يوفر AWS Glue كلاً من الواجهات المرئية والقائمة على الكود لجعل تكامل البيانات بسيطًا ويمكن للجميع الوصول إليه.

إذا كنت تفضل تجربة قائمة على التعليمات البرمجية وترغب في تأليف وظائف تكامل البيانات بشكل تفاعلي ، فإننا نوصي بجلسات تفاعلية. الجلسات التفاعلية هي إحدى ميزات AWS Glue التي تم إطلاقها مؤخرًا والتي تتيح لك تطوير عمليات AWS Glue بشكل تفاعلي وتشغيل واختبار كل خطوة وعرض النتائج.

هناك خيارات مختلفة لاستخدام الجلسات التفاعلية. يمكنك إنشاء جلسات تفاعلية والعمل معها من خلال واجهة سطر الأوامر AWS (AWS CLI) وواجهة برمجة التطبيقات. يمكنك أيضًا استخدام أجهزة الكمبيوتر المحمولة المتوافقة مع Jupyter للتأليف المرئي واختبار البرامج النصية لدفتر ملاحظاتك. توفر الجلسات التفاعلية نواة Jupyter التي تتكامل تقريبًا في أي مكان تقوم به Jupyter ، بما في ذلك التكامل مع IDEs مثل PyCharm و IntelliJ و Visual Studio Code. يمكّنك هذا من تأليف التعليمات البرمجية في بيئتك المحلية وتشغيلها بسلاسة على الواجهة الخلفية للجلسة التفاعلية. يمكنك أيضًا بدء تشغيل جهاز كمبيوتر محمول من خلال AWS Glue Studio ؛ يتم تنفيذ جميع خطوات التكوين من أجلك حتى تتمكن من استكشاف بياناتك والبدء في تطوير نص وظيفتك بعد بضع ثوانٍ فقط. عندما يكون الرمز جاهزًا ، يمكنك تكوين وجدولة ومراقبة دفاتر المهام كمهام AWS Glue.

إذا لم تكن قد جربت جلسات AWS Glue التفاعلية من قبل ، فيوصى بشدة بهذه المشاركة. نحن نعمل من خلال سيناريو بسيط حيث قد تحتاج إلى تحميل البيانات بشكل متزايد من خدمة تخزين أمازون البسيطة (Amazon S3) إلى الأمازون الأحمر أو تحويل وإثراء بياناتك قبل تحميلها في Amazon Redshift. في هذا المنشور ، نستخدم جلسات تفاعلية داخل دفتر ملاحظات AWS Glue Studio لتحميل ملف مجموعة بيانات NYC Taxi إلى أمازون Redshift Serverless الكتلة ، والاستعلام عن مجموعة البيانات المحملة ، وحفظ دفتر Jupyter الخاص بنا كمهمة ، وجدولته للتشغيل باستخدام تعبير cron. هيا بنا نبدأ.

حل نظرة عامة

نوجهك عبر الخطوات التالية:

  1. قم بإعداد دفتر AWS Glue Jupyter مع جلسات تفاعلية.
  2. استخدم سحر دفتر الملاحظات ، بما في ذلك اتصال AWS Glue والإشارات المرجعية.
  3. اقرأ البيانات من Amazon S3 ، وقم بتحويلها وتحميلها إلى Redshift Serverless.
  4. احفظ الكمبيوتر الدفتري كمهمة AWS Glue وجدولته للتشغيل.

المتطلبات الأساسية المسبقة

لهذه الإرشادات ، يجب علينا إكمال المتطلبات الأساسية التالية:

  1. تحميل سجلات رحلة تاكسي يلو تاكسي و جدول بحث منطقة سيارات الأجرة مجموعات البيانات في Amazon S3. يتم سرد خطوات القيام بذلك في القسم التالي.
  2. جهز ما يلزم إدارة الهوية والوصول AWS (IAM) وأدوار للعمل مع دفاتر AWS Glue Studio Jupyter وجلسات تفاعلية و AWS Glue.
  3. أنشئ اتصال AWS Glue لـ Redshift Serverless.

تحميل مجموعات البيانات إلى Amazon S3

تحميل سجلات رحلة تاكسي يلو تاكسي و بيانات جدول بحث منطقة سيارات الأجرة لبيئتك المحلية. بالنسبة لهذا المنشور ، نقوم بتنزيل بيانات يناير 2022 لبيانات سجلات رحلة التاكسي الصفراء بتنسيق باركيه. بيانات البحث عن منطقة سيارات الأجرة بتنسيق CSV. يمكنك أيضًا تنزيل ملف قاموس البيانات لمجموعة بيانات سجل الرحلة.

  1. على وحدة تحكم Amazon S3 ، إنشاء دلو تسمى my-first-aws-glue-is-project-<random number> في ال us-east-1 منطقة لتخزين البيانات.يجب أن تكون أسماء حاويات S3 فريدة عبر جميع حسابات AWS في جميع المناطق.
  2. إنشاء المجلدات nyc_yellow_taxi و taxi_zone_lookup في الحاوية التي أنشأتها للتو وقم بتحميل الملفات التي قمت بتنزيلها.
    يجب أن تبدو هياكل المجلدات الخاصة بك مثل لقطات الشاشة التالية.s3 بيانات سيارات الأجرة الصفراءبيانات البحث s3

إعداد سياسات IAM ودورها

دعنا نجهز سياسات ودور IAM الضروريين للعمل مع دفاتر AWS Glue Studio Jupyter وجلسات تفاعلية. لبدء استخدام أجهزة الكمبيوتر المحمولة في AWS Glue Studio ، يرجى الرجوع إلى بدء استخدام أجهزة الكمبيوتر المحمولة في AWS Glue Studio.

قم بإنشاء سياسات IAM لدور دفتر ملاحظات AWS Glue

قم بإنشاء السياسة AWSGlueInteractiveSessionPassRolePolicy بالأذونات التالية:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iam:PassRole", "Resource":"arn:aws:iam::<AWS account ID>:role/AWSGlueServiceRole-GlueIS" } ]
}

تسمح هذه السياسة لدور دفتر ملاحظات AWS Glue بالتمرير إلى الجلسات التفاعلية بحيث يمكن استخدام نفس الدور في كلا المكانين. لاحظ أن AWSGlueServiceRole-GlueIS هو الدور الذي أنشأناه لدفتر AWS Glue Studio Jupyter في خطوة لاحقة. بعد ذلك ، قم بإنشاء السياسة AmazonS3Access-MyFirstGlueISProject بالأذونات التالية:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::<your s3 bucket name>", "arn:aws:s3:::<your s3 bucket name>/*" ] } ]
}

تسمح هذه السياسة لدور دفتر ملاحظات AWS Glue بالوصول إلى البيانات الموجودة في حاوية S3.

أنشئ دور IAM لدفتر AWS Glue

أنشئ دور AWS Glue جديدًا يسمى AWSGlueServiceRole-GlueIS مع السياسات التالية المرفقة به:

أنشئ اتصال AWS Glue لـ Redshift Serverless

نحن الآن جاهزون لذلك تكوين مجموعة أمان Redshift Serverless للتواصل مع مكونات AWS Glue.

  1. في وحدة التحكم Redshift Serverless ، افتح مجموعة العمل التي تستخدمها.
    يمكنك العثور على جميع مساحات الأسماء ومجموعات العمل على لوحة القيادة Redshift Serverless.
  2. تحت الدخول الى البيانات، اختر الشبكة والأمن.
  3. اختر الارتباط لمجموعة أمان Redshift Serverless VPC.redshift مجموعة أمان vpc serverlessتتم إعادة توجيهك إلى الأمازون الحوسبة المرنة السحابية (Amazon EC2).
  4. في تفاصيل مجموعة الأمان Redshift Serverless ، ضمن قواعد الداخل، اختر تحرير القواعد الواردة.
  5. أضف قاعدة مرجعية ذاتية للسماح لمكونات AWS Glue بالتواصل:
    1. في حالة النوع، اختر كل TCP.
    2. في حالة بروتوكول، اختر TCP.
    3. في حالة نطاق المنفذ ، تشمل جميع المنافذ.
    4. في حالة مصدر، استخدم نفس مجموعة الأمان مثل معرف المجموعة.
      مجموعة الأمان ذات الانزياح الأحمر للداخل
  6. وبالمثل ، أضف القواعد الصادرة التالية:
    1. قاعدة مرجعية ذاتية مع النوع as كل TCP, بروتوكول as TCP, نطاق المنفذ بما في ذلك جميع المنافذ و الرحلات كمجموعة الأمان نفسها مثل معرف المجموعة.
    2. قاعدة HTTPS للوصول إلى Amazon S3. ال s3-prefix-list-id القيمة مطلوبة في قاعدة مجموعة الأمان للسماح بحركة المرور من VPC إلى نقطة نهاية Amazon S3 VPC.
      مجموعة الأمان ذات الانزياح الأحمر للخارج

إذا لم يكن لديك نقطة نهاية Amazon S3 VPC ، فيمكنك إنشاء واحدة على سحابة أمازون الافتراضية الخاصة (Amazon VPC).

نقطة نهاية s3 vpc

يمكنك التحقق من قيمة s3-prefix-list-id على قوائم البادئات المُدارة صفحة على وحدة تحكم Amazon VPC.

قائمة بادئة s3

بعد ذلك ، اذهب إلى الموصلات صفحة على AWS Glue Studio و إنشاء اتصال JDBC جديد تسمى redshiftServerless إلى مجموعة Redshift Serverless الخاصة بك (ما لم تكن موجودة بالفعل). يمكنك العثور على تفاصيل نقطة نهاية Redshift Serverless ضمن مجموعة العمل الخاصة بك معلومات عامة الجزء. يبدو إعداد الاتصال مثل لقطة الشاشة التالية.

الانزياح الأحمر لصفحة الاتصال بدون خادم

اكتب رمزًا تفاعليًا على كمبيوتر محمول AWS Glue Studio Jupyter مدعوم بجلسات تفاعلية

يمكنك الآن البدء في كتابة تعليمات برمجية تفاعلية باستخدام دفتر AWS Glue Studio Jupyter المدعوم بجلسات تفاعلية. لاحظ أنه من الممارسات الجيدة الاستمرار في حفظ الكمبيوتر الدفتري على فترات منتظمة أثناء العمل من خلاله.

  1. في وحدة تحكم AWS Glue Studio ، أنشئ وظيفة جديدة.
  2. أختار مفكرة Jupyter وحدد قم بإنشاء دفتر ملاحظات جديد من البداية.
  3. اختار إنشاء.
    جلسة تفاعلية الغراء إنشاء دفتر ملاحظات
  4. في حالة اسم العمل، أدخل اسمًا (على سبيل المثال ، myFirstGlueISProject).
  5. في حالة دور IAM ، اختر الدور الذي قمت بإنشائه (AWSGlueServiceRole-GlueIS).
  6. اختار ابدأ مهمة الكمبيوتر المحمول.
    الغراء إعداد دفتر الجلسة التفاعليةبعد تهيئة دفتر الملاحظات ، يمكنك رؤية بعض العناصر السحرية المتاحة وخلية برمز معياري. لعرض جميع العناصر السحرية للجلسات التفاعلية ، قم بتشغيل %help في خلية لطباعة قائمة كاملة. فيما عدا %%sql، فإن تشغيل خلية من العناصر السحرية فقط لا يؤدي إلى بدء الجلسة ، ولكنه يحدد التكوين للجلسة التي تبدأ عند تشغيل الخلية الأولى من التعليمات البرمجية.جلسة تفاعلية بالغراء لتهيئة دفتر jupyterبالنسبة لهذا المنشور ، قمنا بتكوين AWS Glue بالإصدار 3.0 ، وثلاثة عمال G.1X ، ومهلة الخمول ، واتصال Amazon Redshift بمساعدة العناصر السحرية المتاحة.
  7. دعنا ندخل العناصر السحرية التالية في خليتنا الأولى ونشغلها:
    %glue_version 3.0
    %number_of_workers 3
    %worker_type G.1X
    %idle_timeout 60
    %connections redshiftServerless

    حصلنا على الرد التالي:

    Welcome to the Glue Interactive Sessions Kernel
    For more information on available magic commands, please type %help in any new cell. Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
    Installed kernel version: 0.35 Setting Glue version to: 3.0
    Previous number of workers: 5
    Setting new number of workers to: 3
    Previous worker type: G.1X
    Setting new worker type to: G.1X
    Current idle_timeout is 2880 minutes.
    idle_timeout has been set to 60 minutes.
    Connections to be included:
    redshiftServerless

  8. لنقم بتشغيل خلية الكود الأولى (رمز معياري) لبدء جلسة دفتر ملاحظات تفاعلية في غضون ثوانٍ قليلة:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

    حصلنا على الرد التالي:

    Authenticating with environment variables and user-defined glue_role_arn:arn:aws:iam::xxxxxxxxxxxx:role/AWSGlueServiceRole-GlueIS
    Attempting to use existing AssumeRole session credentials.
    Trying to create a Glue session for the kernel.
    Worker Type: G.1X
    Number of Workers: 3
    Session ID: 7c9eadb1-9f9b-424f-9fba-d0abc57e610d
    Applying the following default arguments:
    --glue_kernel_version 0.35
    --enable-glue-datacatalog true
    --job-bookmark-option job-bookmark-enable
    Waiting for session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d to get into ready status...
    Session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d has been created

  9. بعد ذلك ، اقرأ بيانات سيارة الأجرة الصفراء في مدينة نيويورك من حاوية S3 في إطار AWS Glue الديناميكي:
    nyc_taxi_trip_input_dyf = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = { "paths": ["s3://<your-s3-bucket-name>/nyc_yellow_taxi/"] }, format = "parquet", transformation_ctx = "nyc_taxi_trip_input_dyf"
    )

    دعنا نحسب عدد الصفوف ، ونلقي نظرة على المخطط وبعض صفوف مجموعة البيانات.

  10. عد الصفوف بالكود التالي:
    nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    nyc_taxi_trip_input_df.count()

    حصلنا على الرد التالي:

  11. اعرض المخطط مع الكود التالي:
    nyc_taxi_trip_input_df.printSchema()

    حصلنا على الرد التالي:

    root |-- VendorID: long (nullable = true) |-- tpep_pickup_datetime: timestamp (nullable = true) |-- tpep_dropoff_datetime: timestamp (nullable = true) |-- passenger_count: double (nullable = true) |-- trip_distance: double (nullable = true) |-- RatecodeID: double (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- PULocationID: long (nullable = true) |-- DOLocationID: long (nullable = true) |-- payment_type: long (nullable = true) |-- fare_amount: double (nullable = true) |-- extra: double (nullable = true) |-- mta_tax: double (nullable = true) |-- tip_amount: double (nullable = true) |-- tolls_amount: double (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- total_amount: double (nullable = true) |-- congestion_surcharge: double (nullable = true) |-- airport_fee: double (nullable = true)

  12. اعرض بضعة صفوف من مجموعة البيانات بالشفرة التالية:
    nyc_taxi_trip_input_df.show(5)

    حصلنا على الرد التالي:

    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    | 2| 2022-01-18 15:04:43| 2022-01-18 15:12:51| 1.0| 1.13| 1.0| N| 141| 229| 2| 7.0| 0.0| 0.5| 0.0| 0.0| 0.3| 10.3| 2.5| 0.0|
    | 2| 2022-01-18 15:03:28| 2022-01-18 15:15:52| 2.0| 1.36| 1.0| N| 237| 142| 1| 9.5| 0.0| 0.5| 2.56| 0.0| 0.3| 15.36| 2.5| 0.0|
    | 1| 2022-01-06 17:49:22| 2022-01-06 17:57:03| 1.0| 1.1| 1.0| N| 161| 229| 2| 7.0| 3.5| 0.5| 0.0| 0.0| 0.3| 11.3| 2.5| 0.0|
    | 2| 2022-01-09 20:00:55| 2022-01-09 20:04:14| 1.0| 0.56| 1.0| N| 230| 230| 1| 4.5| 0.5| 0.5| 1.66| 0.0| 0.3| 9.96| 2.5| 0.0|
    | 2| 2022-01-24 16:16:53| 2022-01-24 16:31:36| 1.0| 2.02| 1.0| N| 163| 234| 1| 10.5| 1.0| 0.5| 3.7| 0.0| 0.3| 18.5| 2.5| 0.0|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    only showing top 5 rows

  13. الآن ، اقرأ بيانات البحث عن منطقة سيارات الأجرة من حاوية S3 في إطار AWS Glue الديناميكي:
    nyc_taxi_zone_lookup_dyf = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = { "paths": ["s3://<your-s3-bucket-name>/taxi_zone_lookup/"] }, format = "csv", format_options= { 'withHeader': True }, transformation_ctx = "nyc_taxi_zone_lookup_dyf"
    )

    دعنا نحسب عدد الصفوف ، ونلقي نظرة على المخطط وبعض صفوف مجموعة البيانات.

  14. عد الصفوف بالكود التالي:
    nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    nyc_taxi_zone_lookup_df.count()

    حصلنا على الرد التالي:

  15. اعرض المخطط مع الكود التالي:
    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    حصلنا على الرد التالي:

    root |-- LocationID: string (nullable = true) |-- Borough: string (nullable = true) |-- Zone: string (nullable = true) |-- service_zone: string (nullable = true)

  16. اعرض بعض الصفوف بالكود التالي:
    nyc_taxi_zone_lookup_df.show(5)

    حصلنا على الرد التالي:

    +----------+-------------+--------------------+------------+
    |LocationID| Borough| Zone|service_zone|
    +----------+-------------+--------------------+------------+
    | 1| EWR| Newark Airport| EWR|
    | 2| Queens| Jamaica Bay| Boro Zone|
    | 3| Bronx|Allerton/Pelham G...| Boro Zone|
    | 4| Manhattan| Alphabet City| Yellow Zone|
    | 5|Staten Island| Arden Heights| Boro Zone|
    +----------+-------------+--------------------+------------+
    only showing top 5 rows

  17. استنادًا إلى قاموس البيانات ، يتيح إعادة معايرة أنواع بيانات السمات في الإطارات الديناميكية المقابلة لكلا الإطارات الديناميكية:
    nyc_taxi_trip_apply_mapping_dyf = ApplyMapping.apply( frame = nyc_taxi_trip_input_dyf, mappings = [ ("VendorID","Long","VendorID","Integer"), ("tpep_pickup_datetime","Timestamp","tpep_pickup_datetime","Timestamp"), ("tpep_dropoff_datetime","Timestamp","tpep_dropoff_datetime","Timestamp"), ("passenger_count","Double","passenger_count","Integer"), ("trip_distance","Double","trip_distance","Double"), ("RatecodeID","Double","RatecodeID","Integer"), ("store_and_fwd_flag","String","store_and_fwd_flag","String"), ("PULocationID","Long","PULocationID","Integer"), ("DOLocationID","Long","DOLocationID","Integer"), ("payment_type","Long","payment_type","Integer"), ("fare_amount","Double","fare_amount","Double"), ("extra","Double","extra","Double"), ("mta_tax","Double","mta_tax","Double"), ("tip_amount","Double","tip_amount","Double"), ("tolls_amount","Double","tolls_amount","Double"), ("improvement_surcharge","Double","improvement_surcharge","Double"), ("total_amount","Double","total_amount","Double"), ("congestion_surcharge","Double","congestion_surcharge","Double"), ("airport_fee","Double","airport_fee","Double") ], transformation_ctx = "nyc_taxi_trip_apply_mapping_dyf"
    )

    nyc_taxi_zone_lookup_apply_mapping_dyf = ApplyMapping.apply( frame = nyc_taxi_zone_lookup_dyf, mappings = [ ("LocationID","String","LocationID","Integer"), ("Borough","String","Borough","String"), ("Zone","String","Zone","String"), ("service_zone","String", "service_zone","String") ], transformation_ctx = "nyc_taxi_zone_lookup_apply_mapping_dyf"
    )

  18. الآن دعنا نتحقق من مخططهم:
    nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()

    حصلنا على الرد التالي:

    root |-- VendorID: integer (nullable = true) |-- tpep_pickup_datetime: timestamp (nullable = true) |-- tpep_dropoff_datetime: timestamp (nullable = true) |-- passenger_count: integer (nullable = true) |-- trip_distance: double (nullable = true) |-- RatecodeID: integer (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- PULocationID: integer (nullable = true) |-- DOLocationID: integer (nullable = true) |-- payment_type: integer (nullable = true) |-- fare_amount: double (nullable = true) |-- extra: double (nullable = true) |-- mta_tax: double (nullable = true) |-- tip_amount: double (nullable = true) |-- tolls_amount: double (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- total_amount: double (nullable = true) |-- congestion_surcharge: double (nullable = true) |-- airport_fee: double (nullable = true)

    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    حصلنا على الرد التالي:

    root |-- LocationID: integer (nullable = true) |-- Borough: string (nullable = true) |-- Zone: string (nullable = true) |-- service_zone: string (nullable = true)

  19. دعونا نضيف العمود trip_duration لحساب مدة كل رحلة بالدقائق إلى الإطار الديناميكي لرحلة التاكسي:
    # Function to calculate trip duration in minutes
    def trip_duration(start_timestamp,end_timestamp): minutes_diff = (end_timestamp - start_timestamp).total_seconds() / 60.0 return(minutes_diff)

    # Transformation function for each record
    def transformRecord(rec): rec["trip_duration"] = trip_duration(rec["tpep_pickup_datetime"], rec["tpep_dropoff_datetime"]) return rec
    nyc_taxi_trip_final_dyf = Map.apply( frame = nyc_taxi_trip_apply_mapping_dyf, f = transformRecord, transformation_ctx = "nyc_taxi_trip_final_dyf"
    )

    دعنا نحسب عدد الصفوف ، ونلقي نظرة على المخطط وعدد قليل من صفوف مجموعة البيانات بعد تطبيق التحويل أعلاه.

  20. احصل على عدد السجلات مع الكود التالي:
    nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    nyc_taxi_trip_final_df.count()

    حصلنا على الرد التالي:

  21. اعرض المخطط مع الكود التالي:
    nyc_taxi_trip_final_df.printSchema()

    حصلنا على الرد التالي:

    root |-- extra: double (nullable = true) |-- tpep_dropoff_datetime: timestamp (nullable = true) |-- trip_duration: double (nullable = true) |-- trip_distance: double (nullable = true) |-- mta_tax: double (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- DOLocationID: integer (nullable = true) |-- congestion_surcharge: double (nullable = true) |-- total_amount: double (nullable = true) |-- airport_fee: double (nullable = true) |-- payment_type: integer (nullable = true) |-- fare_amount: double (nullable = true) |-- RatecodeID: integer (nullable = true) |-- tpep_pickup_datetime: timestamp (nullable = true) |-- VendorID: integer (nullable = true) |-- PULocationID: integer (nullable = true) |-- tip_amount: double (nullable = true) |-- tolls_amount: double (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- passenger_count: integer (nullable = true)

  22. اعرض بعض الصفوف بالكود التالي:
    nyc_taxi_trip_final_df.show(5)

    حصلنا على الرد التالي:

    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |extra|tpep_dropoff_datetime| trip_duration|trip_distance|mta_tax|improvement_surcharge|DOLocationID|congestion_surcharge|total_amount|airport_fee|payment_type|fare_amount|RatecodeID|tpep_pickup_datetime|VendorID|PULocationID|tip_amount|tolls_amount|store_and_fwd_flag|passenger_count|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    | 0.0| 2022-01-18 15:12:51| 8.133333333333333| 1.13| 0.5| 0.3| 229| 2.5| 10.3| 0.0| 2| 7.0| 1| 2022-01-18 15:04:43| 2| 141| 0.0| 0.0| N| 1|
    | 0.0| 2022-01-18 15:15:52| 12.4| 1.36| 0.5| 0.3| 142| 2.5| 15.36| 0.0| 1| 9.5| 1| 2022-01-18 15:03:28| 2| 237| 2.56| 0.0| N| 2|
    | 3.5| 2022-01-06 17:57:03| 7.683333333333334| 1.1| 0.5| 0.3| 229| 2.5| 11.3| 0.0| 2| 7.0| 1| 2022-01-06 17:49:22| 1| 161| 0.0| 0.0| N| 1|
    | 0.5| 2022-01-09 20:04:14| 3.316666666666667| 0.56| 0.5| 0.3| 230| 2.5| 9.96| 0.0| 1| 4.5| 1| 2022-01-09 20:00:55| 2| 230| 1.66| 0.0| N| 1|
    | 1.0| 2022-01-24 16:31:36|14.716666666666667| 2.02| 0.5| 0.3| 234| 2.5| 18.5| 0.0| 1| 10.5| 1| 2022-01-24 16:16:53| 2| 163| 3.7| 0.0| N| 1|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    only showing top 5 rows

  23. بعد ذلك ، قم بتحميل كل من الإطارات الديناميكية في مجموعة Amazon Redshift Serverless الخاصة بنا:
    nyc_taxi_trip_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf( frame = nyc_taxi_trip_final_dyf, catalog_connection = "redshiftServerless", connection_options = {"dbtable": "public.f_nyc_yellow_taxi_trip","database": "dev"}, redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", transformation_ctx = "nyc_taxi_trip_sink_dyf"
    )

    nyc_taxi_zone_lookup_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf( frame = nyc_taxi_zone_lookup_apply_mapping_dyf, catalog_connection = "redshiftServerless", connection_options = {"dbtable": "public.d_nyc_taxi_zone_lookup", "database": "dev"}, redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", transformation_ctx = "nyc_taxi_zone_lookup_sink_dyf"
    )

    الآن دعنا نتحقق من صحة البيانات التي تم تحميلها في مجموعة Amazon Redshift Serverless عن طريق تشغيل بعض الاستعلامات في محرر استعلام Amazon Redshift v2. يمكنك أيضًا استخدام محرر الاستعلام المفضل لديك.

  24. أولاً ، نحسب عدد السجلات ونحدد بضعة صفوف في كلا الجدولين الهدف (f_nyc_yellow_taxi_trip و d_nyc_taxi_zone_lookup):
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    الانزياح الأحمر الجدول سجل حساب الناتج الاستعلام

    عدد السجلات في f_nyc_yellow_taxi_trip (2,463,931) و d_nyc_taxi_zone_lookup (265) تطابق عدد السجلات في إطار الإدخال الديناميكي الخاص بنا. هذا يؤكد أن جميع السجلات من الملفات في Amazon S3 قد تم تحميلها بنجاح في Amazon Redshift.

    يمكنك عرض بعض السجلات لكل جدول بالأوامر التالية:

    SELECT * FROM public.f_nyc_yellow_taxi_trip LIMIT 10;

    استعلام تحديد الانزياح الأحمر لبيانات الحقائق

    SELECT * FROM public.d_nyc_taxi_zone_lookup LIMIT 10;

    استعلام تحديد بيانات البحث عن الانزياح الأحمر

  25. تتمثل إحدى الأفكار التي نرغب في تكوينها من مجموعات البيانات في الحصول على أفضل خمسة مسارات مع مدة رحلتهم. لنقم بتشغيل SQL لذلك على Amazon Redshift:
    SELECT CASE WHEN putzl.zone >= dotzl.zone THEN putzl.zone || ' - ' || dotzl.zone ELSE dotzl.zone || ' - ' || putzl.zone END AS "Route", COUNT(1) AS "Frequency", ROUND(SUM(trip_duration),1) AS "Total Trip Duration (mins)"
    FROM public.f_nyc_yellow_taxi_trip ytt
    INNER JOIN public.d_nyc_taxi_zone_lookup putzl ON ytt.pulocationid = putzl.locationid
    INNER JOIN public.d_nyc_taxi_zone_lookup dotzl ON ytt.dolocationid = dotzl.locationid
    GROUP BY "Route"
    ORDER BY "Frequency" DESC, "Total Trip Duration (mins)" DESC
    LIMIT 5;

    الانزياح الأحمر أعلى 5 استعلام عن المسار

قم بتحويل الكمبيوتر الدفتري إلى مهمة AWS Glue وجدولتها

الآن بعد أن قمنا بتأليف الكود واختبرنا وظائفه ، دعنا نحفظه كوظيفة ونقوم بجدولته.

دعونا أولا تمكين إشارات مرجعية للوظيفة. تساعد الإشارات المرجعية للوظائف AWS Glue في الحفاظ على معلومات الحالة ومنع إعادة معالجة البيانات القديمة. باستخدام الإشارات المرجعية للوظائف ، يمكنك معالجة البيانات الجديدة عند إعادة التشغيل في فترة زمنية مجدولة.

  1. أضف الأمر السحري التالي بعد الخلية الأولى التي تحتوي على أوامر سحرية أخرى تمت تهيئتها أثناء تأليف الكود:
    %%configure
    { "--job-bookmark-option": "job-bookmark-enable"
    }

    لتهيئة الإشارات المرجعية للوظيفة ، نقوم بتشغيل الكود التالي باسم الوظيفة كمتغير افتراضي (myFirstGlueISProject لهذا المنصب). إشارات مرجعية للوظيفة تخزن الولايات لوظيفة ما. يجب أن يكون لديك دائمًا job.init() في بداية البرنامج النصي و job.commit() في نهاية البرنامج النصي. يتم استخدام هاتين الوظيفتين لتهيئة خدمة الإشارات المرجعية وتحديث تغيير الحالة إلى الخدمة. لن تعمل الإشارات المرجعية دون الاتصال بها.

  2. أضف الجزء التالي من الكود بعد الكود المعياري:
    params = []
    if '--JOB_NAME' in sys.argv: params.append('JOB_NAME')
    args = getResolvedOptions(sys.argv, params)
    if 'JOB_NAME' in args: jobname = args['JOB_NAME']
    else: jobname = "myFirstGlueISProject"
    job.init(jobname, args)

  3. ثم قم بالتعليق على جميع أسطر الكود التي تم تأليفها للتحقق من النتيجة المرجوة وليست ضرورية للوظيفة لتحقيق غرضها:
    #nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    #nyc_taxi_trip_input_df.count()
    #nyc_taxi_trip_input_df.printSchema()
    #nyc_taxi_trip_input_df.show(5) #nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    #nyc_taxi_zone_lookup_df.count()
    #nyc_taxi_zone_lookup_df.printSchema()
    #nyc_taxi_zone_lookup_df.show(5) #nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()
    #nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema() #nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    #nyc_taxi_trip_final_df.count()
    #nyc_taxi_trip_final_df.printSchema()
    #nyc_taxi_trip_final_df.show(5)

  4. احفظ دفتر الملاحظات.
    جلسة تفاعلية الغراء حفظ الوظيفة
    يمكنك التحقق من البرنامج النصي المقابل على ملف سيناريو علامة التبويب.علامة تبويب البرنامج النصي للجلسة التفاعلية الغراءنلاحظ أن job.commit() تتم إضافته تلقائيًا في نهاية البرنامج النصي ، فلنقم بتشغيل دفتر الملاحظات كوظيفة.
  5. أولاً ، اقتطاع f_nyc_yellow_taxi_trip و d_nyc_taxi_zone_lookup الجداول في Amazon Redshift باستخدام محرر الاستعلام v2 حتى لا يكون لدينا تكرارات في كلا الجدولين:
    truncate "public"."f_nyc_yellow_taxi_trip";
    truncate "public"."d_nyc_taxi_zone_lookup";

  6. اختار يجري لتشغيل الوظيفة.
    وظيفة تشغيل جلسة تفاعلية بالغراءيمكنك التحقق من حالته على أشواط علامة التبويب.الغراء حالة تشغيل وظيفة جلسة تفاعليةاكتملت المهمة في أقل من 5 دقائق مع G1.x 3 DPU.
  7. دعنا نتحقق من عدد السجلات في f_nyc_yellow_taxi_trip و d_nyc_taxi_zone_lookup الجداول في Amazon Redshift:
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    الانزياح الأحمر العد إخراج الاستعلام

    مع تمكين الإشارات المرجعية للوظائف ، حتى إذا قمت بتشغيل الوظيفة مرة أخرى بدون وجود ملفات جديدة في المجلدات المقابلة في حاوية S3 ، فإنها لا تعالج نفس الملفات مرة أخرى. تُظهر لقطة الشاشة التالية مهمة لاحقة يتم تشغيلها في بيئتي ، والتي اكتملت في أقل من دقيقتين نظرًا لعدم وجود ملفات جديدة لمعالجتها.

    إعادة تشغيل وظيفة الجلسة التفاعلية الغراء

    الآن دعنا نحدد الوظيفة.

  8. على جداول علامة التبويب، اختر إنشاء جدول.
    جلسة تفاعلية الغراء إنشاء جدول
  9. في حالة الاسم¸ أدخل اسمًا (على سبيل المثال ، myFirstGlueISProject-testSchedule).
  10. في حالة تردد، اختر Custom.
  11. أدخل تعبير cron حتى يتم تشغيل الوظيفة كل يوم اثنين الساعة 6:00 صباحًا.
  12. أضف وصفًا اختياريًا.
  13. اختار إنشاء جدول.
    جلسة تفاعلية الغراء إضافة جدول

تم حفظ الجدول الزمني وتفعيله. يمكنك تعديل الجدول أو إيقافه مؤقتًا أو استئنافه أو حذفه من ملف الإجراءات القائمة.

عمل جدول الجلسة التفاعلية الغراء

تنظيف

لتجنب تكبد رسوم في المستقبل ، احذف موارد AWS التي أنشأتها.

  • حذف مهمة AWS Glue (myFirstGlueISProject لهذا المنصب).
  • احذف عناصر ودلو Amazon S3 (my-first-aws-glue-is-project-<random number> لهذا المنصب).
  • حذف سياسات وأدوار AWS IAM (AWSGlueInteractiveSessionPassRolePolicy, AmazonS3Access-MyFirstGlueISProject و AWSGlueServiceRole-GlueIS).
  • احذف جداول Amazon Redshift (f_nyc_yellow_taxi_trip و d_nyc_taxi_zone_lookup).
  • احذف اتصال AWS Glue JDBC (redshiftServerless).
  • احذف أيضًا مجموعة الأمان Redshift Serverless ذات المرجع الذاتي ، ونقطة نهاية Amazon S3 (إذا قمت بإنشائها أثناء اتباع الخطوات الخاصة بهذا المنشور).

وفي الختام

في هذا المنشور ، أوضحنا كيفية القيام بما يلي:

  • قم بإعداد دفتر AWS Glue Jupyter مع جلسات تفاعلية
  • استخدم العناصر السحرية للكمبيوتر الدفتري ، بما في ذلك اتصال AWS Glue على متن الطائرة والإشارات المرجعية
  • اقرأ البيانات من Amazon S3 ، وقم بتحويلها وتحميلها إلى Amazon Redshift Serverless
  • تكوين السحر لتمكين الإشارات المرجعية للوظائف ، وحفظ دفتر الملاحظات كمهمة AWS Glue ، وجدولتها باستخدام تعبير cron

الهدف من هذا المنشور هو تزويدك بأساسيات خطوة بخطوة لمساعدتك في استخدام أجهزة الكمبيوتر المحمولة AWS Glue Studio Jupyter والجلسات التفاعلية. يمكنك إعداد دفتر AWS Glue Jupyter في دقائق ، وبدء جلسة تفاعلية في ثوانٍ ، وتحسين تجربة التطوير بشكل كبير مع وظائف AWS Glue. الجلسات التفاعلية لها حد أدنى للفواتير مدته دقيقة واحدة مع ميزات التحكم في التكلفة التي تقلل من تكلفة تطوير تطبيقات إعداد البيانات. يمكنك إنشاء واختبار التطبيقات من البيئة التي تختارها ، حتى في بيئتك المحلية ، باستخدام الواجهة الخلفية للجلسات التفاعلية.

توفر الجلسات التفاعلية طريقة أسرع وأرخص وأكثر مرونة لإنشاء تطبيقات تحضير البيانات والتحليلات وتشغيلها. لمعرفة المزيد حول الجلسات التفاعلية ، ارجع إلى تطوير الوظائف (جلسات تفاعلية)، وابدأ في استكشاف تجربة تطوير جديدة بالكامل باستخدام AWS Glue. بالإضافة إلى ذلك ، تحقق من المنشورات التالية لتصفح المزيد من الأمثلة على استخدام الجلسات التفاعلية مع خيارات مختلفة:


حول المؤلف

مدونة فيكاس الموافقة المسبقة عن علمفيكاس عمر هو مهندس حلول متخصص رئيسي في التحليلات في Amazon Web Services. تتمتع Vikas بخلفية قوية في التحليلات وإدارة تجربة العملاء (CEM) واستثمار البيانات ، مع أكثر من 13 عامًا من الخبرة في هذا المجال على مستوى العالم. من خلال ست شهادات AWS ، بما في ذلك تخصص التحليلات ، فهو مدافع موثوق به في مجال التحليلات لعملاء وشركاء AWS. يحب السفر ومقابلة العملاء ومساعدتهم على النجاح في ما يفعلونه.

نوري الشخصي الموافقة المسبقة عن علمنوريتاكا سيكياما هو مهندس رئيسي للبيانات الضخمة في فريق AWS Glue. إنه يستمتع بالتعاون مع فرق مختلفة لتقديم نتائج مثل هذه المشاركة. في أوقات فراغه ، يستمتع بلعب ألعاب الفيديو مع عائلته.

غال بلوق الموافقة المسبقة عن علمغال هاين هو مدير منتج في AWS Glue ولديه أكثر من 15 عامًا من الخبرة كمدير منتج ومهندس بيانات ومهندس بيانات. إنها متحمسة لتطوير فهم عميق لاحتياجات عمل العملاء والتعاون مع المهندسين لتصميم منتجات بيانات أنيقة وقوية وسهلة الاستخدام. حصلت غال على درجة الماجستير في علوم البيانات من جامعة كاليفورنيا في بيركلي وهي تستمتع بالسفر ولعب ألعاب الطاولة والذهاب إلى الحفلات الموسيقية.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة