ゼファーネットのロゴ

Apache Airflow バージョン 2.8.1 の Amazon MWAA サポートの紹介 |アマゾン ウェブ サービス

日付:

ApacheAirflowのAmazonマネージドワークフロー (Amazon MWAA) は、マネージド オーケストレーション サービスです。 ApacheAirflow これにより、クラウドでのエンドツーエンドのデータ パイプラインのセットアップと運用が簡単になります。

組織は Amazon MWAA を使用してビジネスワークフローを強化します。例えば、 C2iゲノミクス は、データプラットフォームで Amazon MWAA を使用して、数十億レコードのがんゲノミクスデータを処理するアルゴリズムの検証を調整しています。 Twitchは、ライブ ストリーミング プラットフォームであり、140 億 XNUMX 万人を超えるアクティブ ユーザーを対象としたレコメンデーション モデルのトレーニングと展開を管理および調整しています。 Amazon MWAA を使用して拡張すると同時に、セキュリティを大幅に向上させ、インフラストラクチャ管理のオーバーヘッドを削減します。

本日、Amazon MWAA で Apache Airflow バージョン 2.8.1 環境が利用可能になったことを発表します。この投稿では、Amazon MWAA で利用できるようになった Airflow の新機能のいくつかと、Amazon MWAA 環境をセットアップまたはバージョン 2.8.1 にアップグレードする方法について説明します。

オブジェクトストレージ

データ パイプラインが拡大するにつれて、エンジニアは独自の API、認証方法、データにアクセスするための規則を使用して複数のシステムにわたるストレージを管理するのに苦労しており、カスタム ロジックとストレージ固有のオペレーターが必要になります。 Airflow は、これらの詳細を処理する統合オブジェクト ストレージ抽象化レイヤーを提供するようになり、エンジニアがデータ パイプラインに集中できるようになりました。 Airflow オブジェクト ストレージの用途 FSSPEC 異なるオブジェクト ストレージ システム間で一貫したデータ アクセス コードを有効にし、インフラストラクチャの複雑さを合理化します。

この機能の主な利点の一部を次に示します。

  • ポータブルなワークフロー – 有向非巡回グラフ (DAG) を最小限の変更でストレージ サービスを切り替えることができます。
  • 効率的なデータ転送 – メモリにロードする代わりにデータをストリーミングできます
  • メンテナンスの削減 – 個別のオペレーターが必要ないため、パイプラインの保守が簡単になります
  • 慣れ親しんだプログラミング経験 – 次のような Python モジュールを使用できます シャティル、ファイル操作の場合

オブジェクトストレージを使用するには Amazon シンプル ストレージ サービス (Amazon S3)、次のことが必要です。 追加のパッケージをインストールする s3fs と Amazon プロバイダー (apache-airflow-providers-amazon[s3fs]==x.x.x).

以下のサンプル コードでは、データを直接移動する方法を確認できます。 Googleのクラウドストレージ Amazon S3に。 Airflow のオブジェクト ストレージでは shutil.copyfileobj、オブジェクトのデータはチャンクで読み取られます。 gcs_data_source そしてストリーミング配信された amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Airflow オブジェクト ストレージの詳細については、以下を参照してください。 オブジェクト記憶域.

XCom UI

XCom (相互通信) により、タスク間でのデータの受け渡しが可能になり、タスク間の通信と調整が容易になります。以前は、開発者はタスクに関連する XCom を表示するには、別のビューに切り替える必要がありました。 Airflow 2.8 では、次のスクリーンショットに示すように、XCom Key-Value が Airflow Grid ビュー内のタブに直接レンダリングされます。

新しい エックスコム タブには次の利点があります。

  • XCom の可視性の向上 – UI の専用タブは、DAG またはタスクに関連付けられたすべての XCom を表示する便利でユーザーフレンドリーな方法を提供します。
  • デバッグの改善 – UI で XCom 値を直接確認できるため、DAG のデバッグに役立ちます。 Python コードを使用して手動でプルして検査する必要がなく、アップストリーム タスクの出力をすぐに確認できます。

タスクコンテキストロガー

タスクのライフサイクルを管理することは、Airflow でデータ パイプラインをスムーズに運用するために重要です。ただし、特にタスクが予期せず停止されるシナリオでは、特定の課題が残ります。これは、スケジューラのタイムアウトなど、さまざまな理由で発生する可能性があります。 ゾンビ タスク (ハートビートを送信せずに実行状態のままのタスク)、またはワーカーがメモリを使い果たしたインスタンス。

従来、このような障害、特にスケジューラーやエグゼキューターなどのコア Airflow コンポーネントによってトリガーされた障害は、タスク ログ内に記録されませんでした。この制限により、ユーザーは Airflow UI の外部でトラブルシューティングを行う必要があり、問題を特定して解決するプロセスが複雑になりました。

Airflow 2.8 では、この問題に対処する大幅な改善が導入されました。スケジューラやエグゼキュータを含むエアフロー コンポーネントは、新しい TaskContextLogger エラー メッセージをタスク ログに直接転送します。この機能を使用すると、タスクの実行に関連するすべての関連エラー メッセージを 1 か所で確認できます。これにより、タスクが失敗した理由を解明するプロセスが簡素化され、単一のログ ビュー内で何が問題になったのかを完全に把握できるようになります。

次のスクリーンショットは、タスクがどのように検出されるかを示しています。 zombie、スケジューラ ログはタスク ログの一部として含まれています。

環境構成パラメータを設定する必要があります enable_task_context_logger 〜へ True、機能を有効にします。これを有効にすると、Airflow はスケジューラー、エグゼキューター、またはコールバック実行コンテキストからタスク ログにログを送信し、Airflow UI で使用できるようになります。

データセットのリスナーフック

データセット データを意識したスケジューリングと DAG 間の依存関係を作成するためのデータ ソースの論理グループとして Airflow 2.4 で導入されました。たとえば、プロデューサー DAG がデータセットを更新するときにコンシューマー DAG が実行されるようにスケジュールできます。 リスナー Airflow ユーザーが環境内で発生する特定のイベントへのサブスクリプションを作成できるようにします。 Airflow 2.8 では、次の XNUMX つのデータセット イベントに対してリスナーが追加されます。 on_dataset_created & on_dataset_changedこれにより、Airflow ユーザーがデータセット管理操作に反応するカスタム コードを効率的に作成できるようになります。たとえば、外部システムをトリガーしたり、通知を送信したりできます。

データセットのリスナー フックの使用は簡単です。リスナーを作成するには、次の手順を実行します。 on_dataset_changed:

  1. リスナーを作成します (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Airflow 環境にリスナーを登録するプラグインを作成します (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

Amazon MWAA にプラグインをインストールする方法の詳細については、を参照してください。 カスタムプラグインのインストール.

Amazon MWAA で新しい Airflow 2.8.1 環境をセットアップする

を開始できます。 アカウントと優先リージョンで、 AWSマネジメントコンソール、API、または AWSコマンドラインインターフェイス (AWS CLI)。 コードとしてのインフラストラクチャ (IaC) を採用している場合は、次を使用してセットアップを自動化できます。 AWS CloudFormation AWSクラウド開発キット (AWS CDK)、または Terraform スクリプト。

Amazon MWAA で Airflow バージョン 2.8.1 環境が正常に作成されると、特定のパッケージがスケジューラー ノードとワーカー ノードに自動的にインストールされます。インストールされているパッケージとそのバージョンの完全なリストについては、以下を参照してください。 Amazon MWAA 環境にインストールされた Apache Airflow プロバイダー パッケージ。要件ファイルを使用して追加のパッケージをインストールできます。

Airflow の古いバージョンからバージョン 2.8.1 にアップグレードします。

インプレース バージョン アップグレードを使用して、古い Airflow バージョン 2.x ベースの環境をバージョン 2.8.1 にアップグレードすることで、これらの最新機能を活用できます。インプレース バージョン アップグレードの詳細については、次を参照してください。 Apache Airflow バージョンのアップグレード or Amazon MWAA によるインプレースバージョンアップグレードの導入.

まとめ

この投稿では、オブジェクト ストレージ、グリッド ビューに追加された新しい XCom タブ、タスク コンテキスト ログ、データセットのリスナー フックなど、Airflow バージョン 2.8 で導入されたいくつかの重要な機能と、それらの使用を開始する方法について説明しました。 Amazon MWAA での実装を示すサンプルコードも提供しました。変更点の完全なリストについては、以下を参照してください。 Airflow のリリースノート.

Amazon MWAA の詳細とコード例については、次のサイトを参照してください。 Amazon MWAA ユーザーガイドAmazon MWAA の例 GitHub リポジトリ.

Apache、Apache Airflow、および Airflow は、Apache の登録商標または商標です。 Apache Software Foundation 米国および/または他の国で。


著者について

マンシ ブタダ は、オランダを拠点とする ISV ソリューション アーキテクトです。彼女は、お客様がビジネス上の問題に対処する、適切に設計されたソリューションを AWS で設計および実装できるよう支援します。彼女はデータ分析とネットワーキングに情熱を持っています。仕事以外にも、食べ物を試したり、ピックルボールをしたり、楽しいボードゲームに飛び込むことを楽しんでいます。

エルナン・ガルシア オランダを拠点とする AWS のシニア ソリューション アーキテクトです。彼は金融サービス業界で働いており、企業のクラウド導入をサポートしています。彼はサーバーレス テクノロジー、セキュリティ、コンプライアンスに熱心に取り組んでいます。彼は家族や友人と時間を過ごし、さまざまな料理の新しい料理を試すことを楽しんでいます。

スポット画像

最新のインテリジェンス

スポット画像