شعار زيفيرنت

قم بتنسيق مسار ETL الشامل باستخدام Amazon S3 وAWS Glue وAmazon Redshift Serverless باستخدام Amazon MWAA | خدمات الويب الأمازون

التاريخ:

تدفقات عمل أمازون المدارة لتدفق أباتشي (Amazon MWAA) هي خدمة تنسيق مُدارة لـ أباتشي تدفق الهواء التي يمكنك استخدامها لإعداد وتشغيل خطوط أنابيب البيانات في السحابة على نطاق واسع. Apache Airflow هي أداة مفتوحة المصدر تُستخدم لتأليف تسلسل العمليات والمهام وجدولتها ومراقبتها برمجيًا، ويشار إليها باسم سير العمل. باستخدام Amazon MWAA، يمكنك استخدام Apache Airflow وPython لإنشاء مسارات عمل دون الحاجة إلى إدارة البنية التحتية الأساسية لقابلية التوسع والتوفر والأمان.

باستخدام حسابات AWS المتعددة، يمكن للمؤسسات توسيع نطاق أعباء عملها وإدارة تعقيداتها بشكل فعال أثناء نموها. يوفر هذا النهج آلية قوية للتخفيف من التأثير المحتمل للاضطرابات أو حالات الفشل، مع التأكد من بقاء أعباء العمل الحيوية قيد التشغيل. بالإضافة إلى ذلك، فإنه يتيح تحسين التكلفة من خلال مواءمة الموارد مع حالات استخدام محددة، والتأكد من التحكم في النفقات بشكل جيد. ومن خلال عزل أعباء العمل مع متطلبات أمان محددة أو احتياجات الامتثال، يمكن للمؤسسات الحفاظ على أعلى مستويات خصوصية البيانات وأمانها. علاوة على ذلك، تتيح لك القدرة على تنظيم حسابات AWS المتعددة بطريقة منظمة مواءمة عملياتك التجارية ومواردك وفقًا لمتطلباتك التشغيلية والتنظيمية والميزانية الفريدة. ويعزز هذا النهج الكفاءة والمرونة وقابلية التوسع، مما يمكّن المؤسسات الكبيرة من تلبية احتياجاتها المتطورة وتحقيق أهدافها.

يوضح هذا المنشور كيفية تنسيق خط أنابيب الاستخراج والتحويل والتحميل (ETL) من طرف إلى طرف باستخدام خدمة تخزين أمازون البسيطة (Amazon S3) ، غراء AWSو أمازون Redshift Serverless مع أمازون MWAA.

حل نظرة عامة

بالنسبة لهذا المنشور، فإننا نعتبر حالة استخدام حيث يريد فريق هندسة البيانات إنشاء عملية ETL وتقديم أفضل تجربة للمستخدمين النهائيين عندما يريدون الاستعلام عن أحدث البيانات بعد إضافة ملفات أولية جديدة إلى Amazon S3 في المركز المركزي. الحساب (الحساب أ في مخطط البنية التالي). يريد فريق هندسة البيانات فصل البيانات الأولية في حساب AWS الخاص به (الحساب ب في الرسم التخطيطي) لزيادة الأمان والتحكم. يريدون أيضًا تنفيذ أعمال معالجة البيانات وتحويلها في حسابهم الخاص (الحساب ب) لتقسيم الواجبات ومنع أي تغييرات غير مقصودة على البيانات الأولية المصدر الموجودة في الحساب المركزي (الحساب أ). يتيح هذا الأسلوب للفريق معالجة البيانات الأولية المستخرجة من الحساب "أ" إلى الحساب "ب"، المخصص لمهام معالجة البيانات. وهذا يضمن إمكانية الحفاظ على البيانات الأولية والمعالجة منفصلة بشكل آمن عبر حسابات متعددة، إذا لزم الأمر، لتعزيز إدارة البيانات وأمانها.

يستخدم الحل الخاص بنا مسار ETL شامل يتم تنسيقه بواسطة Amazon MWAA والذي يبحث عن ملفات تزايدية جديدة في موقع Amazon S3 في الحساب أ، حيث توجد البيانات الأولية. يتم ذلك عن طريق استدعاء مهام AWS Glue ETL والكتابة إلى كائنات البيانات في مجموعة Redshift Serverless في الحساب B. ثم يبدأ تشغيل المسار الإجراءات المخزنة وأوامر SQL على Redshift Serverless. عند انتهاء تشغيل الاستعلامات، سيتم تفريغ يتم استدعاء العملية من مستودع بيانات Redshift إلى حاوية S3 في الحساب أ.

نظرًا لأهمية الأمان، يغطي هذا المنشور أيضًا كيفية تكوين اتصال Airflow باستخدام مدير أسرار AWS لتجنب تخزين بيانات اعتماد قاعدة البيانات ضمن اتصالات ومتغيرات Airflow.

يوضح الرسم التخطيطي التالي النظرة العامة المعمارية للمكونات المشاركة في تنسيق سير العمل.

يتكون سير العمل من المكونات التالية:

  • توجد حاويات S3 المصدر والهدف في حساب مركزي (الحساب أ)، بينما توجد Amazon MWAA وAWS Glue وAmazon Redshift في حساب مختلف (الحساب ب). تم إعداد الوصول عبر الحسابات بين مجموعات S3 في الحساب أ مع الموارد الموجودة في الحساب ب لتتمكن من تحميل البيانات وإلغاء تحميلها.
  • في الحساب الثاني، تتم استضافة Amazon MWAA في VPC واحد وRedshift Serverless في VPC مختلف، وهما متصلان من خلال نظير VPC. يتم تأمين مجموعة عمل Redshift Serverless داخل شبكات فرعية خاصة عبر ثلاث مناطق توافر خدمات.
  • يتم تخزين أسرار مثل اسم المستخدم وكلمة المرور ومنفذ قاعدة البيانات ومنطقة AWS لـ Redshift Serverless في Secrets Manager.
  • يتم إنشاء نقاط نهاية VPC لـ Amazon S3 وSecrets Manager للتفاعل مع الموارد الأخرى.
  • عادةً ما يقوم مهندسو البيانات بإنشاء رسم بياني دوري موجه لتدفق الهواء (DAG) وتنفيذ تغييراتهم على GitHub. باستخدام إجراءات GitHub، يتم نشرها في حاوية S3 في الحساب B (في هذا المنشور، نقوم بتحميل الملفات إلى حاوية S3 مباشرةً). يقوم دلو S3 بتخزين الملفات المتعلقة بـ Airflow مثل ملفات DAG، requirements.txt الملفات والمكونات الإضافية. يتم تخزين نصوص وأصول AWS Glue ETL في حاوية S3 أخرى. يساعد هذا الفصل في الحفاظ على التنظيم وتجنب الارتباك.
  • يستخدم Airflow DAG العديد من المشغلين وأجهزة الاستشعار والاتصالات والمهام والقواعد لتشغيل خط أنابيب البيانات حسب الحاجة.
  • يتم تسجيل الدخول إلى سجلات Airflow الأمازون CloudWatchويمكن تكوين التنبيهات لمراقبة المهام. لمزيد من المعلومات، راجع مراقبة لوحات المعلومات والإنذارات على Amazon MWAA.

المتطلبات الأساسية المسبقة

نظرًا لأن هذا الحل يتمحور حول استخدام Amazon MWAA لتنسيق مسار ETL، فإنك تحتاج إلى إعداد موارد أساسية معينة عبر الحسابات مسبقًا. على وجه التحديد، تحتاج إلى إنشاء مجموعات ومجلدات S3 وموارد AWS Glue وموارد Redshift Serverless في حساباتها الخاصة قبل تنفيذ التكامل الكامل لسير العمل باستخدام Amazon MWAA.

انشر الموارد في الحساب أ باستخدام AWS CloudFormation

في الحساب أ، قم بتشغيل المقدم تكوين سحابة AWS كومة لإنشاء الموارد التالية:

  • مجموعات ومجلدات S3 المصدر والهدف. كأفضل ممارسة، يتم تنسيق هياكل دلو الإدخال والإخراج بتقسيم نمط الخلية كـ s3://<bucket>/products/YYYY/MM/DD/.
  • مجموعة بيانات عينة تسمى products.csvالتي نستخدمها في هذا المنصب.

قم بتحميل مهمة AWS Glue إلى Amazon S3 في الحساب ب

في الحساب ب، قم بإنشاء موقع Amazon S3 يسمى aws-glue-assets-<account-id>-<region>/scripts (إذا لم يكن موجودا). استبدل المعلمات الخاصة بمعرف الحساب والمنطقة في ملف Sample_glue_job.py البرنامج النصي وتحميل ملف مهمة AWS Glue إلى موقع Amazon S3.

انشر الموارد في الحساب "ب" باستخدام AWS CloudFormation

في الحساب ب، قم بتشغيل قالب حزمة CloudFormation المقدم لإنشاء الموارد التالية:

  • دلو S3 airflow-<username>-bucket لتخزين الملفات المتعلقة بـ Airflow بالبنية التالية:
    • الخناجر - مجلد ملفات DAG.
    • الإضافات - الملف الخاص بأي ملحقات Airflow مخصصة أو مجتمعية.
    • المتطلبات - و requirements.txt ملف لأي حزم بايثون.
    • مخطوطات - أي نصوص SQL مستخدمة في DAG.
    • البيانات - أي مجموعات بيانات مستخدمة في DAG.
  • بيئة Redshift بدون خادم. اسم مجموعة العمل ومساحة الاسم مسبوقة بـ sample.
  • بيئة AWS Glue، والتي تحتوي على ما يلي:
    • غراء AWS الزاحف، الذي يقوم بالزحف إلى البيانات من مجموعة مصدر S3 sample-inp-bucket-etl-<username> في الحساب أ
    • قاعدة بيانات تسمى products_db في كتالوج بيانات AWS Glue.
    • تعليم اللغة الإنجليزية وظيفة تسمى sample_glue_job. يمكن لهذه الوظيفة قراءة الملفات من products الجدول في كتالوج البيانات وتحميل البيانات في جدول Redshift products.
  • نقطة نهاية بوابة VPC إلى Amazon S3.
  • بيئة أمازون MWAA. للحصول على خطوات تفصيلية لإنشاء بيئة Amazon MWAA باستخدام وحدة تحكم Amazon MWAA، راجع تقديم مهام سير العمل المُدارة من Amazon لتدفق هواء Apache (MWAA).

إطلاق المكدس 1

قم بإنشاء موارد Amazon Redshift

قم بإنشاء جدولين وإجراء مخزن في مجموعة عمل Redshift Serverless باستخدام Products.sql ملف.

في هذا المثال، قمنا بإنشاء جدولين يسمى products و products_f. اسم الإجراء المخزن هو sp_products.

تكوين أذونات تدفق الهواء

بعد إنشاء بيئة Amazon MWAA بنجاح، ستظهر الحالة كـ متوفرة . أختر افتح واجهة مستخدم تدفق الهواء لعرض واجهة مستخدم تدفق الهواء. تتم مزامنة DAGs تلقائيًا من مجموعة S3 وتكون مرئية في واجهة المستخدم. ومع ذلك، في هذه المرحلة، لا توجد DAGs في المجلد S3.

أضف السياسة التي يديرها العميل AmazonMWAAFullConsoleAccess، والذي يمنح مستخدمي Airflow أذونات الوصول إدارة الهوية والوصول AWS (IAM)، وإرفاق هذه السياسة بدور Amazon MWAA. لمزيد من المعلومات، راجع الوصول إلى بيئة Amazon MWAA.

تتمتع السياسات المرتبطة بدور Amazon MWAA بإمكانية الوصول الكامل ويجب استخدامها فقط لأغراض الاختبار في بيئة اختبار آمنة. بالنسبة لعمليات نشر الإنتاج، اتبع مبدأ الامتياز الأقل.

هيئ البيئة

يوضح هذا القسم خطوات تكوين البيئة. تتضمن العملية الخطوات عالية المستوى التالية:

  1. قم بتحديث أي مقدمي خدمات ضروريين.
  2. قم بإعداد الوصول عبر الحسابات.
  3. أنشئ اتصال نظير VPC بين Amazon MWAA VPC وAmazon Redshift VPC.
  4. قم بتكوين Secrets Manager للتكامل مع Amazon MWAA.
  5. تحديد اتصالات تدفق الهواء.

قم بتحديث مقدمي الخدمة

اتبع الخطوات الواردة في هذا القسم إذا كان إصدار Amazon MWAA الخاص بك أقل من 2.8.1 (أحدث إصدار حتى كتابة هذا المنشور).

مقدمي هي حزم يتم صيانتها بواسطة المجتمع وتتضمن جميع المشغلين الأساسيين والخطافات وأجهزة الاستشعار لخدمة معينة. يتم استخدام موفر Amazon للتفاعل مع خدمات AWS مثل Amazon S3 وAmazon Redshift Serverless وAWS Glue والمزيد. هناك أكثر من 200 وحدة داخل مزود أمازون.

على الرغم من أن إصدار Airflow المدعوم في Amazon MWAA هو 2.6.3، والذي يأتي مرفقًا مع إصدار الحزمة المقدمة من Amazon 8.2.0، لم تتم إضافة دعم Amazon Redshift Serverless حتى قدمت Amazon إصدار الحزمة 8.4.0. نظرًا لأن إصدار الموفر المجمع الافتراضي أقدم مما كان عليه عندما تم تقديم دعم Redshift Serverless، فيجب ترقية إصدار الموفر لاستخدام هذه الوظيفة.

الخطوة الأولى هي تحديث ملف القيود و requirements.txt الملف بالإصدارات الصحيحة. تشير إلى تحديد حزم الموفر الأحدث للتعرف على خطوات تحديث حزمة موفر Amazon.

  1. حدد المتطلبات على النحو التالي:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. قم بتحديث الإصدار الموجود في ملف القيود إلى 8.4.0 أو أعلى.
  3. إضافة القيود-3.11-updated.txt ملف ل /dags المجلد.

الرجوع إلى إصدارات Apache Airflow على مسارات العمل المُدارة من Amazon لـ Apache Airflow للحصول على الإصدارات الصحيحة من ملف القيود اعتمادًا على إصدار Airflow.

  1. انتقل إلى بيئة Amazon MWAA واختر تعديل.
  2. تحت رمز DAG في Amazon S3، ل ملف المتطلبات، اختر الإصدار الأحدث.
  3. اختار حفظ.

سيؤدي هذا إلى تحديث البيئة وسيصبح الموفرون الجدد ساريين.

  1. للتحقق من إصدار الموفرين، انتقل إلى مقدمي تحت إداري الجدول.

يجب أن يكون إصدار حزمة موفر Amazon هو 8.4.0، كما هو موضح في لقطة الشاشة التالية. إذا لم يكن الأمر كذلك، فقد حدث خطأ أثناء التحميل requirements.txt. لتصحيح أي أخطاء، انتقل إلى وحدة تحكم CloudWatch وافتح الملف requirements_install_ip بتسجيل الدخول سجل التدفقات، حيث يتم سرد الأخطاء. تشير إلى تمكين السجلات على وحدة تحكم Amazon MWAA لمزيد من التفاصيل.

قم بإعداد الوصول عبر الحسابات

يتعين عليك إعداد سياسات وأدوار عبر الحسابات بين الحساب "أ" والحساب "ب" للوصول إلى حاويات S3 لتحميل البيانات وإلغاء تحميلها. أكمل الخطوات التالية:

  1. في الحساب أ، قم بتكوين سياسة الحاوية الخاصة بالحاوية sample-inp-bucket-etl-<username> لمنح أذونات لأدوار AWS Glue وAmazon MWAA في الحساب B للكائنات الموجودة في المجموعة sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of- AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
    "s3:PutObject",
    		   "s3:PutObjectAcl",
    		   "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. وبالمثل، قم بتكوين سياسة الحاوية الخاصة بالحاوية sample-opt-bucket-etl-<username> لمنح أذونات لأدوار Amazon MWAA في الحساب B لوضع الكائنات في هذه المجموعة:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. في الحساب أ، قم بإنشاء سياسة IAM تسمى policy_for_roleA، والذي يسمح بإجراءات Amazon S3 الضرورية على مجموعة الإخراج:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. قم بإنشاء دور IAM جديد يسمى RoleA مع الحساب "ب" باعتباره دور الكيان الموثوق به وأضف هذه السياسة إلى الدور. يتيح ذلك للحساب "ب" تولي الدور "أ" لتنفيذ إجراءات Amazon S3 الضرورية على حاوية المخرجات.
  5. في الحساب ب، قم بإنشاء سياسة IAM تسمى s3-cross-account-access مع الإذن بالوصول إلى الكائنات الموجودة في المجموعة sample-inp-bucket-etl-<username>، الموجود في الحساب أ.
  6. أضف هذه السياسة إلى دور AWS Glue ودور Amazon MWAA:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. في الحساب ب، قم بإنشاء سياسة IAM policy_for_roleB تحديد الحساب "أ" ككيان موثوق به. فيما يلي سياسة الثقة التي يجب افتراضها RoleA في الحساب أ:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. قم بإنشاء دور IAM جديد يسمى RoleB مع Amazon Redshift كنوع الكيان الموثوق به وإضافة هذه السياسة إلى الدور. هذا يسمح RoleB لنفترض RoleA في الحساب أ وأيضًا يمكن افتراضه بواسطة Amazon Redshift.
  9. تعلق RoleB إلى مساحة الاسم Redshift Serverless، حتى يتمكن Amazon Redshift من كتابة الكائنات إلى حاوية إخراج S3 في الحساب A.
  10. إرفاق السياسة policy_for_roleB إلى دور Amazon MWAA، الذي يسمح لـ Amazon MWAA بالوصول إلى مجموعة المخرجات في الحساب أ.

الرجوع إلى كيف يمكنني توفير الوصول عبر الحسابات إلى العناصر الموجودة في حاويات Amazon S3؟ لمزيد من التفاصيل حول إعداد الوصول عبر الحسابات إلى العناصر في Amazon S3 من AWS Glue وAmazon MWAA. تشير إلى كيف يمكنني نسخ أو إلغاء تحميل البيانات من Amazon Redshift إلى حاوية Amazon S3 في حساب آخر؟ لمزيد من التفاصيل حول إعداد الأدوار لتفريغ البيانات من Amazon Redshift إلى Amazon S3 من Amazon MWAA.

قم بإعداد نظير VPC بين Amazon MWAA وAmazon Redshift VPCs

نظرًا لأن Amazon MWAA وAmazon Redshift موجودان في حاسبين افتراضيين خاصين (VPC) منفصلين، فأنت بحاجة إلى إعداد نظير VPC بينهما. يجب عليك إضافة مسار إلى جداول المسار المرتبطة بالشبكات الفرعية لكلا الخدمتين. تشير إلى العمل مع اتصالات نظير VPC للحصول على تفاصيل حول نظير VPC.

تأكد من أن نطاق CIDR الخاص بـ Amazon MWAA VPC مسموح به في مجموعة أمان Redshift وأن نطاق CIDR الخاص بـ Amazon Redshift VPC مسموح به في مجموعة أمان Amazon MWAA، كما هو موضح في لقطة الشاشة التالية.

إذا تم تكوين أي من الخطوات السابقة بشكل غير صحيح، فمن المحتمل أن تواجه خطأ "مهلة الاتصال" أثناء تشغيل DAG.

قم بتكوين اتصال Amazon MWAA مع Secrets Manager

عندما يتم تكوين مسار Amazon MWAA لاستخدام Secrets Manager، فإنه سيبحث أولاً عن الاتصالات والمتغيرات في واجهة خلفية بديلة (مثل Secrets Manager). إذا كانت الواجهة الخلفية البديلة تحتوي على القيمة المطلوبة، فسيتم إرجاعها. وإلا، فإنه سيتحقق من قاعدة بيانات البيانات التعريفية بحثًا عن القيمة ويعيدها بدلاً من ذلك. لمزيد من التفاصيل، راجع تكوين اتصال Apache Airflow باستخدام سر AWS Secrets Manager.

أكمل الخطوات التالية:

  1. تكوين أ نقطة نهاية VPC لربط Amazon MWAA و Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

يتيح ذلك لـ Amazon MWAA الوصول إلى بيانات الاعتماد المخزنة في Secrets Manager.

  1. لتزويد Amazon MWAA بالإذن للوصول إلى المفاتيح السرية لـ Secrets Manager، قم بإضافة السياسة المسماة SecretsManagerReadWrite لدور IAM للبيئة.
  2. لإنشاء الواجهة الخلفية لـ Secrets Manager كخيار تكوين Apache Airflow، انتقل إلى خيارات تكوين Airflow، وأضف أزواج القيمة الرئيسية التالية، واحفظ إعداداتك.

يؤدي هذا إلى تكوين Airflow للبحث عن سلاسل الاتصال والمتغيرات في ملف airflow/connections/* و airflow/variables/* مسارات:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend Secrets.backend_kwargs: {"connections_prefix" : "تدفق الهواء/الاتصالات"، "variables_prefix" : "تدفق الهواء/المتغيرات"}

  1. لإنشاء سلسلة URI لاتصال Airflow، انتقل إلى أوس كلاودشيل والدخول في قذيفة بايثون.
  2. قم بتشغيل التعليمة البرمجية التالية لإنشاء سلسلة URI للاتصال:
    import urllib.parse
    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin' #Specify the username to use for authentication with Amazon Redshift
    password = '<password>' #Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1' #YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)
    

يجب إنشاء سلسلة الاتصال على النحو التالي:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

  1. أضف الاتصال في Secrets Manager باستخدام الأمر التالي في ملف واجهة سطر الأوامر AWS (AWS CLI).

يمكن القيام بذلك أيضًا من وحدة تحكم Secrets Manager. ستتم إضافة هذا في Secrets Manager كنص عادي.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

استخدم الاتصال airflow/connections/secrets_redshift_connection في داج. عند تشغيل DAG، سيبحث عن هذا الاتصال ويستعيد الأسرار من Secrets Manager. في حالة RedshiftDataOperator، مرر ال secret_arn كمعلمة بدلاً من اسم الاتصال.

يمكنك أيضًا إضافة أسرار باستخدام وحدة تحكم Secrets Manager كأزواج قيمة المفتاح.

  1. أضف سرًا آخر في Secrets Manager واحفظه باسم airflow/connections/redshift_conn_test.

قم بإنشاء اتصال Airflow من خلال قاعدة بيانات التعريف

يمكنك أيضًا إنشاء اتصالات في واجهة المستخدم. في هذه الحالة، سيتم تخزين تفاصيل الاتصال في قاعدة بيانات تعريف Airflow. إذا لم يتم تكوين بيئة Amazon MWAA لاستخدام الواجهة الخلفية لـ Secrets Manager، فسوف تتحقق من قاعدة بيانات البيانات التعريفية بحثًا عن القيمة وتعيدها. يمكنك إنشاء اتصال Airflow باستخدام واجهة المستخدم، أو AWS CLI، أو API. نعرض في هذا القسم كيفية إنشاء اتصال باستخدام واجهة مستخدم Airflow.

  1. في حالة معرف الاتصال، أدخل اسمًا للاتصال.
  2. في حالة نوع الاتصال، اختر الأمازون الأحمر.
  3. في حالة مضيف، أدخل نقطة نهاية Redshift (بدون منفذ وقاعدة بيانات) لـ Redshift Serverless.
  4. في حالة قاعدة البيانات، أدخل dev.
  5. في حالة مستخدم، أدخل اسم المستخدم المسؤول الخاص بك.
  6. في حالة كلمة المرور، ادخل رقمك السري.
  7. في حالة ميناءاستخدم المنفذ 5439.
  8. في حالة إكسترا، تعيين region و timeout المعلمات.
  9. اختبر الاتصال، ثم احفظ إعداداتك.

إنشاء وتشغيل DAG

في هذا القسم، نوضح كيفية إنشاء DAG باستخدام مكونات مختلفة. بعد إنشاء DAG وتشغيله، يمكنك التحقق من النتائج عن طريق الاستعلام عن جداول Redshift والتحقق من مجموعات S3 المستهدفة.

قم بإنشاء DAG

في Airflow، يتم تعريف خطوط أنابيب البيانات في كود Python على أنها DAGs. نقوم بإنشاء DAG يتكون من عوامل تشغيل وأجهزة استشعار واتصالات ومهام وقواعد مختلفة:

  • يبدأ DAG بالبحث عن الملفات المصدر في حاوية S3 sample-inp-bucket-etl-<username> ضمن الحساب "أ" لليوم الحالي باستخدام S3KeySensor. يتم استخدام S3KeySensor لانتظار وجود مفتاح واحد أو عدة مفاتيح في حاوية S3.
    • على سبيل المثال، تم تقسيم حاوية S3 الخاصة بنا إلى s3://bucket/products/YYYY/MM/DD/، لذلك يجب أن يقوم المستشعر بالتحقق من المجلدات التي تحتوي على التاريخ الحالي. لقد اشتقنا التاريخ الحالي في DAG وقمنا بتمريره إلى S3KeySensor، والذي يبحث عن أي ملفات جديدة في مجلد اليوم الحالي.
    • نحن أيضا نضع wildcard_match as True، والتي تمكن عمليات البحث على bucket_key ليتم تفسيرها على أنها نمط البدل يونكس. تعيين mode إلى reschedule بحيث تقوم مهمة المستشعر بتحرير فتحة العامل عند عدم استيفاء المعايير وإعادة جدولتها في وقت لاحق. كأفضل ممارسة، استخدم هذا الوضع عندما poke_interval أكثر من دقيقة واحدة لمنع التحميل الزائد على المجدول.
  • بعد توفر الملف في حاوية S3، يتم تشغيل زاحف AWS Glue باستخدام GlueCrawlerOperator للزحف إلى مجموعة مصدر S3 sample-inp-bucket-etl-<username> ضمن الحساب أ ويقوم بتحديث بيانات تعريف الجدول ضمن products_db قاعدة البيانات في كتالوج البيانات. يستخدم الزاحف دور AWS Glue وقاعدة بيانات كتالوج البيانات التي تم إنشاؤها في الخطوات السابقة.
  • يستخدم DAG GlueCrawlerSensor لانتظار اكتمال الزاحف.
  • عند اكتمال مهمة الزاحف، GlueJobOperator يتم استخدامه لتشغيل مهمة AWS Glue. اسم البرنامج النصي AWS Glue (مع الموقع) ويتم تمريره إلى المشغل مع دور AWS Glue IAM. معلمات أخرى مثل GlueVersion, NumberofWorkersو WorkerType يتم تمريرها باستخدام create_job_kwargs المعلمة.
  • يستخدم DAG GlueJobSensor لانتظار اكتمال مهمة AWS Glue. عند اكتماله، يظهر جدول التدريج الخاص بالتحول نحو الأحمر products سيتم تحميله بالبيانات من ملف S3.
  • يمكنك الاتصال بـ Amazon Redshift من Airflow باستخدام ثلاثة أنواع مختلفة مشغلي:
    • PythonOperator.
    • SQLExecuteQueryOperator، والذي يستخدم اتصال PostgreSQL و redshift_default كالاتصال الافتراضي.
    • RedshiftDataOperator، والذي يستخدم Redshift Data API و aws_default كالاتصال الافتراضي.

في DAG لدينا، نستخدم SQLExecuteQueryOperator و RedshiftDataOperator لإظهار كيفية استخدام هذه العوامل. يتم تشغيل إجراءات Redshift المخزنة RedshiftDataOperator. يقوم DAG أيضًا بتشغيل أوامر SQL في Amazon Redshift لحذف البيانات من الجدول المرحلي باستخدام SQLExecuteQueryOperator.

نظرًا لأننا قمنا بتكوين بيئة Amazon MWAA الخاصة بنا للبحث عن الاتصالات في Secrets Manager، فعند تشغيل DAG، فإنه يسترد تفاصيل اتصال Redshift مثل اسم المستخدم وكلمة المرور والمضيف والمنفذ والمنطقة من Secrets Manager. إذا لم يتم العثور على الاتصال في Secrets Manager، فسيتم استرداد القيم من الاتصالات الافتراضية.

In SQLExecuteQueryOperator، نقوم بتمرير اسم الاتصال الذي أنشأناه في Secrets Manager. إنه يبحث عن airflow/connections/secrets_redshift_connection ويستعيد الأسرار من مدير الأسرار. إذا لم يتم إعداد Secrets Manager، فسيتم إنشاء الاتصال يدويًا (على سبيل المثال، redshift-conn-id) يمكن تمريرها.

In RedshiftDataOperator، نمرر سر_الـ airflow/connections/redshift_conn_test تم إنشاء الاتصال في Secrets Manager كمعلمة.

  • كمهمة أخيرة، RedshiftToS3Operator يُستخدم لتفريغ البيانات من جدول Redshift إلى حاوية S3 sample-opt-bucket-etl في الحساب ب. airflow/connections/redshift_conn_test من Secrets Manager يستخدم لتفريغ البيانات.
  • TriggerRule ومن المقرر أن ALL_DONE، والذي يتيح تشغيل الخطوة التالية بعد اكتمال جميع المهام الأولية.
  • يتم تعريف تبعية المهام باستخدام chain() الوظيفة، والتي تسمح بتشغيل المهام بشكل متوازي إذا لزم الأمر. في حالتنا، نريد تنفيذ جميع المهام بالتسلسل.

فيما يلي رمز DAG الكامل. ال dag_id يجب أن يتطابق مع اسم البرنامج النصي DAG، وإلا فلن تتم مزامنته مع واجهة مستخدم Airflow.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
        )
    

التحقق من تشغيل DAG

بعد إنشاء ملف DAG (استبدل المتغيرات في البرنامج النصي DAG) وتحميله إلى ملف s3://sample-airflow-instance/dags المجلد، ستتم مزامنته تلقائيًا مع واجهة مستخدم Airflow. تظهر جميع DAGs على DAGs فاتورة غير مدفوعة. تبديل ON خيار لجعل DAG قابلاً للتشغيل. لأنه تم ضبط DAG الخاص بنا على schedule="@once"، فأنت بحاجة إلى تشغيل المهمة يدويًا عن طريق اختيار أيقونة التشغيل الموجودة أسفل الإجراءات. عند اكتمال DAG، يتم تحديث الحالة باللون الأخضر، كما هو موضح في لقطة الشاشة التالية.

في مجلة روابط القسم، هناك خيارات لعرض الكود والرسم البياني والشبكة والسجل والمزيد. يختار رسم بياني لتصور DAG بتنسيق رسم بياني. كما هو موضح في لقطة الشاشة التالية، يشير كل لون في العقدة إلى عامل تشغيل محدد، ويشير لون مخطط العقدة إلى حالة محددة.

تحقق من النتائج

في وحدة تحكم Amazon Redshift ، انتقل إلى ملف محرر الاستعلام الإصدار 2 وحدد البيانات الموجودة في products_f طاولة. يجب أن يتم تحميل الجدول وأن يحتوي على نفس عدد السجلات الموجودة في ملفات S3.

على وحدة تحكم Amazon S3، انتقل إلى حاوية S3 s3://sample-opt-bucket-etl في الحساب ب product_f يجب إنشاء الملفات ضمن بنية المجلد s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

تنظيف

قم بتنظيف الموارد التي تم إنشاؤها كجزء من هذا المنشور لتجنب تكبد الرسوم المستمرة:

  1. احذف مكدسات CloudFormation وحاوية S3 التي قمت بإنشائها كمتطلبات أساسية.
  2. احذف اتصالات نظير VPCs وVPC، والسياسات والأدوار عبر الحسابات، والأسرار في Secrets Manager.

وفي الختام

باستخدام Amazon MWAA، يمكنك إنشاء مسارات عمل معقدة باستخدام Airflow وPython دون إدارة المجموعات أو العقد أو أي حمل تشغيلي آخر يرتبط عادةً بنشر Airflow وتوسيع نطاقه في الإنتاج. في هذا المنشور، أظهرنا كيف توفر Amazon MWAA طريقة تلقائية لاستيعاب البيانات وتحويلها وتحليلها وتوزيعها بين الحسابات والخدمات المختلفة داخل AWS. لمزيد من الأمثلة على مشغلي AWS الآخرين، راجع ما يلي مستودع جيثب; نحن نشجعك على معرفة المزيد من خلال تجربة بعض هذه الأمثلة.


حول المؤلف


راديكا جاكولا هو مهندس حلول النماذج الأولية للبيانات الضخمة في AWS. إنها تساعد العملاء على بناء نماذج أولية باستخدام خدمات تحليلات AWS وقواعد البيانات المصممة لهذا الغرض. وهي متخصصة في تقييم مجموعة واسعة من المتطلبات وتطبيق خدمات AWS ذات الصلة وأدوات البيانات الضخمة وأطر العمل لإنشاء بنية قوية.

سيدهانث موراليدهار هو مدير الحساب الفني الرئيسي في AWS. إنه يعمل مع عملاء المؤسسات الكبيرة الذين يديرون أعباء العمل الخاصة بهم على AWS. إنه متحمس للعمل مع العملاء ومساعدتهم في تصميم أعباء العمل فيما يتعلق بالتكاليف والموثوقية والأداء والتميز التشغيلي على نطاق واسع في رحلتهم السحابية. لديه اهتمام كبير بتحليلات البيانات أيضًا.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة