和風網標誌

在 Amazon MWAA 中使用 YAML 和 DAG Factory 動態產生 DAG |亞馬遜網路服務

日期:

適用於 Apache Airflow 的亞馬遜託管工作流 (Amazon MWAA) 是一項託管服務,可讓您使用熟悉的 阿帕奇氣流 具有改進的可擴展性、可用性和安全性的環境,可增強和擴展您的業務工作流程,而無需管理底層基礎設施的營運負擔。在氣流中, 有向無環圖 (DAG) 被定義為 Python 程式碼。 動態 DAG 指的是運行時動態產生 DAG 的能力,通常是基於某些外部條件、配置或參數。動態 DAG 可協助您根據可能隨時間變化的資料和配置在 DAG 內建立、規劃和執行任務。

有多種方法可以在 Airflow DAG 中引入活力(動態DAG生成)使用環境變數和外部文件。其中一種方法是使用 達格工廠 基於 YAML 的設定檔方法。該程式庫旨在透過使用 YAML 中的聲明性參數來促進新 DAG 的建立和配置。它允許預設自訂並且是開源的,使得創建和自訂新功能變得簡單。

在這篇文章中,我們探索使用 YAML 檔案建立動態 DAG 的過程,使用 達格工廠 圖書館.動態 DAG 具有多種優點:

  1. 增強程式碼可重複使用 – 透過 YAML 檔案建構 DAG,我們推廣可重複使用元件,減少工作流程定義中的冗餘。
  2. 簡化維護 – 基於 YAML 的 DAG 產生簡化了修改和更新工作流程的流程,確保維護過程更加順暢。
  3. 靈活的參數化 – 使用 YAML,您可以參數化 DAG 配置,方便根據不同的需求動態調整工作流程。
  4. 提高調度程序效率 – 動態 DAG 可實現更有效率的調度、最佳化資源分配並增強整體工作流程運行
  5. 增強的可擴展性 – YAML 驅動的 DAG 允許並行運行,從而實現可擴展的工作流程,能夠有效處理增加的工作負載。

透過利用 YAML 檔案和 DAG Factory 程式庫的強大功能,我們推出了一種建置和管理 DAG 的通用方法,讓您能夠建立強大、可擴展且可維護的資料管道。

解決方案概述

在本文中,我們將使用一個旨在處理 COVID-19 資料集的範例 DAG 檔案。工作流程涉及處理開源資​​料集 WHO-COVID-19-全球。我們安裝後 DAG工廠 Python 套件中,我們建立一個 YAML 文件,其中包含各種任務的定義。我們透過以下方式處理特定國家/地區的死亡人數: Country 作為一個變量,它創建基於各個國家/地區的 DAG。

下圖說明了整體解決方案以及邏輯區塊內的資料流。

解決方案概述

條件:

對於本演練,您應該具有以下先決條件:

此外,請完成以下步驟(在 AWS地區 亞馬遜 MWAA 可用的地方):

  1. 創建一個 亞馬遜 MWAA 環境 (如果您還沒有)。如果這是您第一次使用 Amazon MWAA,請參閱 推出 Amazon Managed Workflows for Apache Airflow (MWAA).

確保 AWS身份和訪問管理 用於設定環境的 (IAM) 使用者或角色附加了以下權限的 IAM 策略:

這裡提到的存取策略僅用於本文中的範例。在生產環境中,透過行使僅提供所需的細化權限 最小特權原則.

  1. 在建立 Amazon MWAA 環境時建立一個唯一的(在帳戶內)Amazon S3 儲存桶名稱,並建立名為的資料夾 dagsrequirements.
    亞馬遜S3桶
  2. 創建並上傳 requirements.txt 文件包含以下內容 requirements 資料夾。代替 {environment-version} 以及您環境的版本號,以及 {Python-version} 使用與您的環境兼容的 Python 版本:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

僅本文中描述的範例用例需要 Pandas,並且 dag-factory 是唯一需要的插件。建議檢查最新版本的兼容性 dag-factory 與亞馬遜 MWAA。這 botopsycopg2-binary 庫包含在 Apache Airflow v2 基本安裝中,不需要在您的檔案中指定 requirements.txt 文件。

  1. 下載 WHO-COVID-19-全球資料文件 到您的本機機器並將其上傳到 dags 您的 S3 儲存桶的前綴。

確保您指向的是最新的 AWS S3 儲存桶版本 requirements.txt 用於進行附加軟體包安裝的檔案。這通常需要 15 - 20 分鐘,具體取決於您的環境配置。

驗證 DAG

當您的 Amazon MWAA 環境顯示為 可用的 在 Amazon MWAA 控制台上,透過選擇導覽至 Airflow UI 打開氣流使用者介面 靠近您的環境。

驗證 DAG

透過導航至 DAG 選項卡來驗證現有 DAG。

驗證 DAG

配置您的 DAG

完成以下步驟:

  1. 建立名為的空白文件 dynamic_dags.yml, example_dag_factory.pyprocess_s3_data.py 在本地計算機上。
  2. 編輯 process_s3_data.py 檔案並使用以下程式碼內容儲存,然後將檔案上傳回 Amazon S3 儲存桶 dags 資料夾。我們在程式碼中做了一些基本的資料處理:
    1. 從 Amazon S3 位置讀取文件
    2. 重命名 Country_code 列適合該國。
    3. 按給定國家過濾數據。
    4. 將處理後的最終資料寫入CSV格式並上傳回S3前綴。
import boto3
import pandas as pd
import io
   
def process_s3_data(COUNTRY):
### Top level Variables replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
### get csv file ###
   response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
   status = response['ResponseMetadata']['HTTPStatusCode']
   if status == 200:
### read csv file and filter based on the country to write back ###
       df = pd.read_csv(response.get("Body"))
       df.rename(columns={"Country_code": "country"}, inplace=True)
       filtered_df = df[df['country'] == COUNTRY]
       with io.StringIO() as csv_buffer:
                   filtered_df.to_csv(csv_buffer, index=False)
                   response = s3.put_object(
                       Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
                   )
       status = response['ResponseMetadata']['HTTPStatusCode']
       if status == 200:
           print(f"Successful S3 put_object response. Status - {status}")
       else:
           print(f"Unsuccessful S3 put_object response. Status - {status}")
   else:
       print(f"Unsuccessful S3 get_object response. Status - {status}")

  1. 編輯 dynamic_dags.yml 並儲存以下程式碼內容,然後將檔案上傳回 dags 資料夾。我們依照國家拼接各種DAG如下:
    1. 定義傳遞給所有 DAG 的預設參數。
    2. 透過以下方式為各國創建 DAG 定義: op_args
    3. 映射 process_s3_data 具有功能 python_callable_name.
    4. 使用 Python運算符 處理儲存在 Amazon S3 儲存桶中的 csv 檔案資料。
    5. 我們已經設置 schedule_interval 為 10 分鐘,但請根據需要隨意調整該值。
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"

  1. 編輯文件 example_dag_factory.py 並儲存以下程式碼內容,然後將檔案上傳回 dags 資料夾。該程式碼清理現有的 DAG 並生成 clean_dags() 方法並使用建立新的 DAG generate_dags() 的方法 DagFactory 實例。
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())

  1. 上傳檔案後,返回 Airflow UI 控制台並導航至 DAG 選項卡,您將在其中找到新的 DAG。
    列出新的 DAG
  2. 上傳檔案後,返回 Airflow UI 控制台,在 DAG 標籤下,您會發現新的 DAG 出現,如下所示:達格

您可以透過啟動 DAG 並單獨測試它們來啟用 DAG。啟動後,會產生一個名為 count_death_{COUNTRY_CODE}.csv 生成在dags資料夾中。

打掃乾淨

使用本文中討論的各種 AWS 服務可能會產生相關費用。為了防止將來產生費用,請在完成本文中概述的任務後刪除 Amazon MWAA 環境,並清空並刪除 S3 儲存桶。

結論

在這篇文章中,我們示範如何使用 達格工廠 用於建立動態 DAG 的庫。動態 DAG 的特點是能夠根據配置每次解析 DAG 檔案來產生結果。考慮在以下場景中使用動態 DAG:

  • 自動從遺留系統遷移到 Airflow,其中 DAG 產生的靈活性至關重要
  • 不同 DAG 之間僅更改參數的情況,簡化工作流程管理流程
  • 管理依賴於源系統不斷發展的結構的 DAG,提供對變化的適應性
  • 透過創建這些藍圖,在整個團隊或組織中建立 DAG 標準化實踐,從而提高一致性和效率
  • 採用基於 YAML 的聲明而不是複雜的 Python 編碼,簡化 DAG 配置和維護流程
  • 創建數據驅動的工作流程,根據數據輸入進行調整和發展,從而實現高效的自動化

透過將動態 DAG 納入您的工作流程,您可以增強自動化、適應性和標準化,最終提高資料管道管理的效率和有效性。

要了解有關 Amazon MWAA DA​​G Factory 的更多信息,請訪問 Amazon MWAA 分析研討會:DAG Factory。有關 Amazon MWAA 的更多詳細資訊和程式碼範例,請訪問 亞馬遜 MWAA 用戶指南亞馬遜 MWAA 範例 GitHub 庫。


關於作者

 賈耶什·辛德 是 AWS ProServe India 的高級應用程式架構師。他擅長使用無伺服器、DevOps 和分析等現代軟體開發實踐來創建各種以雲端為中心的解決方案。

哈什德·約拉 是 AWS ProServe India 的高級雲端架構師,可協助客戶將基礎架構遷移到 AWS 並實現現代化。他專門使用容器、AIOP 以及 AWS 開發人員工具和服務來建構 DevSecOps 和可擴展基礎架構。

現貨圖片

最新情報

現貨圖片