شعار زيفيرنت

نقدم لكم دعم Amazon MWAA لإصدار Apache Airflow 2.8.1 | خدمات الويب الأمازون

التاريخ:

تدفقات عمل أمازون المدارة لتدفق أباتشي (Amazon MWAA) هي خدمة تنسيق مُدارة لـ أباتشي تدفق الهواء مما يجعل من السهل إعداد وتشغيل خطوط أنابيب البيانات الشاملة في السحابة.

تستخدم المؤسسات Amazon MWAA لتحسين سير عمل أعمالها. على سبيل المثال، علم الجينوم C2i تستخدم Amazon MWAA في منصة البيانات الخاصة بها لتنسيق التحقق من صحة الخوارزميات التي تعالج بيانات جينوم السرطان في مليارات السجلات. تويتش، وهي منصة للبث المباشر، تدير وتنسق التدريب ونشر نماذج التوصيات الخاصة بها لأكثر من 140 مليون مستخدم نشط. إنهم يستخدمون Amazon MWAA للتوسع، مع تحسين الأمان بشكل كبير وتقليل النفقات العامة لإدارة البنية التحتية.

نعلن اليوم عن توفر بيئات Apache Airflow الإصدار 2.8.1 على Amazon MWAA. في هذا المنشور، نرشدك عبر بعض الميزات والإمكانيات الجديدة لـ Airflow المتوفرة الآن في Amazon MWAA، وكيف يمكنك إعداد أو ترقية بيئة Amazon MWAA إلى الإصدار 2.8.1.

تخزين الكائنات

مع توسع خطوط البيانات، يواجه المهندسون صعوبة في إدارة التخزين عبر أنظمة متعددة باستخدام واجهات برمجة تطبيقات فريدة وطرق مصادقة واتفاقيات للوصول إلى البيانات، مما يتطلب منطقًا مخصصًا وعوامل تشغيل خاصة بالتخزين. يوفر Airflow الآن طبقة تجريد موحدة لتخزين الكائنات تتعامل مع هذه التفاصيل، مما يسمح للمهندسين بالتركيز على خطوط أنابيب البيانات الخاصة بهم. استخدامات تخزين كائن تدفق الهواء com.fsspec لتمكين كود الوصول المتسق للبيانات عبر أنظمة تخزين الكائنات المختلفة، وبالتالي تبسيط تعقيد البنية التحتية.

وفيما يلي بعض الفوائد الرئيسية لهذه الميزة:

  • سير العمل المحمول – يمكنك تبديل خدمات التخزين مع الحد الأدنى من التغييرات في الرسوم البيانية غير الدورية الموجهة (DAGs)
  • عمليات نقل البيانات بكفاءة – يمكنك دفق البيانات بدلاً من تحميلها في الذاكرة
  • صيانة منخفضة - لا تحتاج إلى مشغلين منفصلين، مما يجعل صيانة خطوط الأنابيب الخاصة بك سهلة
  • تجربة برمجة مألوفة – يمكنك استخدام وحدات بايثون، مثل Shutil، لعمليات الملف

لاستخدام تخزين الكائنات مع خدمة تخزين أمازون البسيطة (أمازون S3)، تحتاج إلى ذلك قم بتثبيت الحزمة الإضافية s3fs مع موفر أمازون (apache-airflow-providers-amazon[s3fs]==x.x.x).

في نموذج التعليمات البرمجية أدناه، يمكنك معرفة كيفية نقل البيانات مباشرة من جوجل سحابة التخزين إلى أمازون S3. لأن تخزين كائن Airflow يستخدم shutil.copyfileobj، تتم قراءة بيانات الكائنات على أجزاء من gcs_data_source وتدفق إلى amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

لمزيد من المعلومات حول تخزين كائن Airflow، راجع تخزين الكائنات.

واجهة المستخدم XCom

تسمح تقنية XCom (الاتصالات المتقاطعة) بتمرير البيانات بين المهام، مما يسهل الاتصال والتنسيق فيما بينها. في السابق، كان على المطورين التبديل إلى طريقة عرض مختلفة لرؤية XComs المرتبطة بمهمة ما. باستخدام Airflow 2.8، يتم عرض قيم مفتاح XCom مباشرةً على علامة تبويب ضمن عرض Airflow Grid، كما هو موضح في لقطة الشاشة التالية.

الجديد xcom توفر علامة التبويب الفوائد التالية:

  • تحسين رؤية XCom - توفر علامة التبويب المخصصة في واجهة المستخدم طريقة مريحة وسهلة الاستخدام لرؤية جميع XComs المرتبطة بـ DAG أو مهمة.
  • تحسين التصحيح - تعد القدرة على رؤية قيم XCom مباشرة في واجهة المستخدم مفيدة لتصحيح أخطاء DAGs. يمكنك رؤية مخرجات المهام الأولية بسرعة دون الحاجة إلى سحبها وفحصها يدويًا باستخدام كود Python.

مسجل سياق المهمة

تعد إدارة دورات حياة المهام أمرًا بالغ الأهمية للتشغيل السلس لخطوط أنابيب البيانات في Airflow. ومع ذلك، استمرت بعض التحديات، خاصة في السيناريوهات التي يتم فيها إيقاف المهام بشكل غير متوقع. يمكن أن يحدث هذا لأسباب مختلفة، بما في ذلك مهلات المجدول، الاموات الاحياء المهام (المهام التي تظل في حالة تشغيل دون إرسال نبضات)، أو الحالات التي نفدت فيها ذاكرة العامل.

تقليديًا، لم يتم تسجيل مثل هذه الإخفاقات، خاصة تلك الناجمة عن مكونات Airflow الأساسية مثل المجدول أو المنفذ، ضمن سجلات المهام. يتطلب هذا القيد من المستخدمين استكشاف الأخطاء وإصلاحها خارج واجهة مستخدم Airflow، مما يعقد عملية تحديد المشكلات وحلها.

قدم Airflow 2.8 تحسينًا كبيرًا يعالج هذه المشكلة. يمكن الآن لمكونات تدفق الهواء، بما في ذلك المجدول والمنفذ، استخدام الإصدار الجديد TaskContextLogger لإعادة توجيه رسائل الخطأ مباشرة إلى سجلات المهام. تتيح لك هذه الميزة رؤية جميع رسائل الخطأ ذات الصلة المتعلقة بتشغيل المهمة في مكان واحد. يعمل هذا على تبسيط عملية اكتشاف سبب فشل المهمة، مما يوفر منظورًا كاملاً للخطأ الذي حدث في عرض سجل واحد.

توضح لقطة الشاشة التالية كيفية اكتشاف المهمة zombie، ويتم تضمين سجل المجدول كجزء من سجل المهام.

تحتاج إلى تعيين معلمة تكوين البيئة enable_task_context_logger إلى True، لتفعيل الميزة. بمجرد تمكينه، يمكن لـ Airflow إرسال السجلات من المجدول أو المنفذ أو سياق تشغيل رد الاتصال إلى سجلات المهام، وإتاحتها في واجهة مستخدم Airflow.

خطافات المستمع لمجموعات البيانات

قواعد البيانات تم تقديمها في Airflow 2.4 كتجميع منطقي لمصادر البيانات لإنشاء جدولة وتبعيات مدركة للبيانات بين DAGs. على سبيل المثال، يمكنك جدولة DAG للمستهلك ليتم تشغيله عندما يقوم منتج DAG بتحديث مجموعة بيانات. المستمعين تمكين مستخدمي Airflow من إنشاء اشتراكات في أحداث معينة تحدث في البيئة. في Airflow 2.8، تتم إضافة المستمعين لحدثين لمجموعتي البيانات: on_dataset_created و on_dataset_changedمما يسمح لمستخدمي Airflow بكتابة تعليمات برمجية مخصصة للتفاعل مع عمليات إدارة مجموعة البيانات. على سبيل المثال، يمكنك تشغيل نظام خارجي، أو إرسال إشعار.

يعد استخدام خطافات المستمع لمجموعات البيانات أمرًا بسيطًا. أكمل الخطوات التالية لإنشاء مستمع لـ on_dataset_changed:

  1. إنشاء المستمع (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. قم بإنشاء مكون إضافي لتسجيل المستمع في بيئة Airflow الخاصة بك (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

لمزيد من المعلومات حول كيفية تثبيت المكونات الإضافية في Amazon MWAA، راجع تثبيت المكونات الإضافية المخصصة.

قم بإعداد بيئة Airflow 2.8.1 جديدة في Amazon MWAA

يمكنك الشروع في الإعداد في حسابك والمنطقة المفضلة باستخدام وحدة تحكم إدارة AWSأو API أو واجهة سطر الأوامر AWS (AWS CLI). إذا كنت تستخدم البنية الأساسية كرمز (IaC)، فيمكنك أتمتة الإعداد باستخدام تكوين سحابة AWSأطلقت حملة مجموعة تطوير سحابة AWS (AWS CDK) أو نصوص Terraform.

عند الإنشاء الناجح لبيئة Airflow الإصدار 2.8.1 في Amazon MWAA، يتم تثبيت حزم معينة تلقائيًا على المجدول وعقد العامل. للحصول على قائمة كاملة بالحزم المثبتة وإصداراتها، راجع حزم موفر Apache Airflow مثبتة على بيئات Amazon MWAA. يمكنك تثبيت حزم إضافية باستخدام ملف المتطلبات.

قم بالترقية من الإصدارات الأقدم من Airflow إلى الإصدار 2.8.1

يمكنك الاستفادة من هذه الإمكانات الأحدث عن طريق ترقية البيئات القديمة المستندة إلى إصدار Airflow 2.x إلى الإصدار 2.8.1 باستخدام ترقيات الإصدار الموضعية. لمعرفة المزيد حول ترقيات الإصدار الموضعي، راجع ترقية إصدار Apache Airflow or تقديم ترقيات الإصدار الموضعية باستخدام Amazon MWAA.

وفي الختام

في هذا المنشور، ناقشنا بعض الميزات المهمة المقدمة في الإصدار 2.8 من Airflow، مثل تخزين الكائنات، وعلامة التبويب XCom الجديدة المضافة إلى عرض الشبكة، وتسجيل سياق المهام، وربطات المستمع لمجموعات البيانات، وكيف يمكنك البدء في استخدامها. لقد قدمنا ​​أيضًا بعض نماذج التعليمات البرمجية لإظهار عمليات التنفيذ في Amazon MWAA. للحصول على القائمة الكاملة للتغييرات، راجع ملاحظات إصدار Airflow.

للحصول على تفاصيل إضافية وأمثلة على التعليمات البرمجية على Amazon MWAA ، قم بزيارة دليل مستخدم Amazon MWAA و أمثلة Amazon MWAA GitHub repo.

تُعد Apache و Apache Airflow و Airflow إما علامات تجارية مسجلة أو علامات تجارية لشركة مؤسسة اباتشي للبرمجيات في الولايات المتحدة و / أو دول أخرى.


حول المؤلف

منسي بوتادا هو مهندس حلول ISV ومقره في هولندا. إنها تساعد العملاء على تصميم وتنفيذ حلول مصممة جيدًا في AWS لمعالجة مشكلات أعمالهم. إنها شغوفة بتحليلات البيانات والشبكات. بعيدًا عن العمل، تستمتع بتجربة الطعام ولعب كرة المخلل والغوص في ألعاب الطاولة الممتعة.

هرنان جارسيا هو مهندس حلول أول في AWS ومقره في هولندا. وهو يعمل في مجال الخدمات المالية، حيث يدعم المؤسسات في اعتماد السحابة الخاصة بها. إنه شغوف بالتقنيات بدون خادم والأمان والامتثال. يستمتع بقضاء الوقت مع العائلة والأصدقاء، وتجربة أطباق جديدة من مطابخ مختلفة.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة