Dynamic DAG generation with YAML and DAG Factory in Amazon MWAA | Amazon Web Services

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that lets you use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows, without the operational burden of managing the underlying infrastructure. In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs refer to the ability to generate DAGs on the fly during runtime, typically based on some external conditions, configurations, or parameters. Dynamic DAGs help you to create, schedule, and run tasks within a DAG based on data and configurations that may change over time.
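The core idea can be sketched in plain Python: a loop builds one DAG object per configuration entry and registers it in the module's globals(), which is how the Airflow scheduler discovers dynamically generated DAGs when it parses the file. In this minimal sketch a plain dict stands in for an airflow.DAG object so the snippet runs without Airflow installed, and the function and country names are made up for illustration:

```python
# Minimal sketch of parse-time dynamic DAG generation. A plain dict stands
# in for an airflow.DAG object so this runs without Airflow installed.
COUNTRIES = ["Australia", "Brazil", "India"]

def build_dag(country):
    # With Airflow this would return DAG(dag_id=..., default_args=...)
    return {"dag_id": f"process_{country.lower()}", "country": country}

# Airflow discovers DAGs by scanning module-level globals, so each
# generated object is registered under its dag_id.
for _country in COUNTRIES:
    _dag = build_dag(_country)
    globals()[_dag["dag_id"]] = _dag
```

With real Airflow code, each registered object would be a fully configured DAG, and changing the configuration list changes the set of DAGs the scheduler sees on the next parse.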

There are various ways to introduce dynamism in Airflow DAGs (dynamic DAG generation) using environment variables and external files. One of the approaches is to use the DAG Factory YAML-based configuration file method. This library aims to facilitate the creation and configuration of new DAGs by using parameters defined in YAML. It allows default customizations and is open source, making it simple to create and customize new functionalities.

In this post, we explore the process of creating dynamic DAGs with YAML files, using the DAG Factory library. Dynamic DAGs offer several benefits:

  1. Enhanced code reusability – By structuring DAGs through YAML files, we promote reusable components, reducing redundancy in your workflow definitions.
  2. Streamlined maintenance – YAML-based DAG generation simplifies the process of modifying and updating workflows, ensuring smoother maintenance procedures.
  3. Flexible parameterization – With YAML, you can parameterize the DAG configurations, facilitating dynamic adjustments to workflows based on varying requirements.
  4. Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and enhancing overall workflow runs.
  5. Enhanced scalability – YAML-driven DAGs allow for parallel runs, enabling scalable workflows capable of handling increased workloads efficiently.

By harnessing the power of YAML files and the DAG Factory library, we unveil a versatile approach to building and managing DAGs, empowering you to create robust, scalable, and maintainable data pipelines.

Solution overview

In this post, we use an example DAG file that is designed to process a COVID-19 dataset. The workflow involves processing an open source dataset offered by WHO-COVID-19-Global. After we install the dag-factory Python package, we create a YAML file that has definitions of various tasks. We process the country-specific death count by passing Country as a variable, which creates individual country-based DAGs.

The following diagram illustrates the overall solution along with the data flows within logical blocks.

Overview of solution

Prerequisites

For this walkthrough, you should have the following prerequisites:

Additionally, complete the following steps (run the setup in an AWS Region where Amazon MWAA is available):

  1. Create an Amazon MWAA environment (if you don't have one already). If this is your first time using Amazon MWAA, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Make sure the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions:

The access policies mentioned here are just examples for this post. In a production environment, provide only the needed granular permissions by exercising least privilege principles.

  2. Create a unique (within an account) Amazon S3 bucket name while creating your Amazon MWAA environment, and create folders called dags and requirements.
    Amazon S3 bucket
  3. Create and upload a requirements.txt file with the following content to the requirements folder. Replace {Airflow-version} with your environment's Airflow version number, and {Python-version} with the Python version compatible with your environment:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas is needed just for the example use case described in this post, and dag-factory is the only required plug-in. It is recommended to check the compatibility of the latest version of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base install and don't need to be specified in your requirements.txt file.

  4. Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.

Make sure that you are pointing to the latest Amazon S3 bucket version of your requirements.txt file for the additional package installation to happen. This should typically take between 15 – 20 minutes depending on your environment configuration.

Validate the DAGs

When your Amazon MWAA environment displays as Available on the Amazon MWAA console, navigate to the Airflow UI by choosing Open Airflow UI next to your environment.

Validate DAG

Verify the existing DAGs by navigating to the DAGs tab.

Verify DAGs

Configure your DAGs

Complete the following steps:

  1. Create empty files named dynamic_dags.yml, example_dag_factory.py, and process_s3_data.py on your local machine.
  2. Edit the process_s3_data.py file and save it with the following code content, then upload the file back to the Amazon S3 bucket dags folder. We are doing some basic data processing in the code:
    1. Read the file from an Amazon S3 location.
    2. Rename the Country_code column as appropriate for the country.
    3. Filter the data by the given country.
    4. Write the processed final data into CSV format and upload it back to the S3 prefix.
import io

import boto3
import pandas as pd


def process_s3_data(COUNTRY):
    ### Top-level variables: replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
    ### Get the CSV file ###
    response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
    status = response['ResponseMetadata']['HTTPStatusCode']
    if status == 200:
        ### Read the CSV file and filter based on the country to write back ###
        df = pd.read_csv(response.get("Body"))
        df.rename(columns={"Country_code": "country"}, inplace=True)
        filtered_df = df[df['country'] == COUNTRY]
        with io.StringIO() as csv_buffer:
            filtered_df.to_csv(csv_buffer, index=False)
            response = s3.put_object(
                Bucket=S3_BUCKET,
                Key=OUTPUT_KEY + '_' + COUNTRY + '.csv',
                Body=csv_buffer.getvalue(),
            )
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status == 200:
            print(f"Successful S3 put_object response. Status - {status}")
        else:
            print(f"Unsuccessful S3 put_object response. Status - {status}")
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
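To sanity-check the rename-and-filter steps locally without touching S3, you can run the same pandas operations on an in-memory CSV. The sample rows below are made up and use a simplified schema, not the full WHO file:

```python
import io

import pandas as pd

# Made-up sample with a simplified schema; the real WHO file has more columns.
sample_csv = """Country_code,New_deaths
Australia,2
Brazil,5
Brazil,3
"""

# Same steps as process_s3_data: read, rename the column, filter by country.
df = pd.read_csv(io.StringIO(sample_csv))
df.rename(columns={"Country_code": "country"}, inplace=True)
filtered_df = df[df["country"] == "Brazil"]
print(filtered_df)
```

This isolates the transformation logic from the S3 calls, which is useful when iterating on the DAG code before uploading it to Amazon MWAA.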

  3. Edit the dynamic_dags.yml file and save it with the following code content, then upload the file back to the dags folder. We are stitching various DAGs based on the country as follows:
    1. Define the default arguments that are passed to all DAGs.
    2. Create a DAG definition for each individual country by passing op_args.
    3. Map the process_s3_data function with python_callable_name.
    4. Use the Python operator to process the CSV file data stored in the Amazon S3 bucket.
    5. We have set schedule_interval as 10 minutes, but feel free to adjust this value as needed.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"
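The per-country blocks above differ only in the country name, so if you need many countries it can be convenient to generate the YAML from a template rather than copying blocks by hand. The following stdlib-only sketch is our own convenience helper, not part of dag-factory:

```python
# Generate the repetitive per-country YAML blocks from a single template.
COUNTRIES = ["Australia", "Brazil", "India", "Japan", "Mexico", "Russia", "Spain"]

BLOCK_TEMPLATE = """module3_dynamic_dag_{country}:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "{country}"
"""

# Join the blocks; prepend your 'default' section before writing the file.
yaml_body = "\n".join(BLOCK_TEMPLATE.format(country=c) for c in COUNTRIES)
```

Adding or removing a country then becomes a one-line change to the COUNTRIES list instead of editing a YAML block.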

  4. Edit the example_dag_factory.py file and save it with the following code content, then upload the file back to the dags folder. The code cleans up any existing DAGs through the clean_dags() method and creates new DAGs using the generate_dags() method from the DagFactory instance.
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())
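Before uploading, it can help to verify that every DAG entry in dynamic_dags.yml carries the keys the PythonOperator tasks need. The following is a lightweight, stdlib-only check of our own (dag-factory performs its own validation at parse time; this only catches missing keys earlier, on your local machine):

```python
import re

REQUIRED_KEYS = ("task_id", "operator", "python_callable_name", "python_callable_file")

def find_missing_keys(config_text):
    """Return (dag_name, missing_key) pairs for each DAG block lacking a key."""
    # Split at top-level keys: lines with no leading whitespace ending in ':'.
    blocks = re.split(r"(?m)^(?=\S.*:\s*$)", config_text)
    problems = []
    for block in blocks:
        lines = block.splitlines()
        if not lines:
            continue
        name = lines[0].rstrip().rstrip(":")
        if not name.startswith("module3_dynamic_dag_"):
            continue  # skip the shared 'default' block
        for key in REQUIRED_KEYS:
            if f"{key}:" not in block:
                problems.append((name, key))
    return problems
```

Running find_missing_keys on the contents of dynamic_dags.yml should return an empty list; anything else points at a DAG block to fix before upload.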

  5. After you upload the files, go back to the Airflow UI console and navigate to the DAGs tab, where you will find the new DAGs.
    List the new DAGs

You can enable the DAGs by making them active and testing them individually. Upon activation, an additional CSV file named count_death_{COUNTRY_CODE}.csv is generated in the dags folder.

Clean up

There may be costs associated with using the various AWS services discussed in this post. To prevent incurring future charges, delete the Amazon MWAA environment after you have completed the tasks outlined in this post, and empty and delete the S3 bucket.

Conclusion

In this blog post, we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are characterized by their ability to generate results with each parsing of the DAG file based on configurations. Consider using dynamic DAGs in the following scenarios:

  • Automating migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
  • Situations where only a parameter changes between different DAGs, streamlining the workflow management process
  • Managing DAGs that depend on the evolving structure of a source system, providing adaptability to changes
  • Establishing standardized practices for DAGs across your team or organization by creating these blueprints, promoting consistency and efficiency
  • Embracing YAML-based declarations over complex Python coding, simplifying DAG configuration and maintenance processes
  • Creating data-driven workflows that adapt and evolve based on the data inputs, enabling efficient automation

By incorporating dynamic DAGs into your workflow, you can enhance automation, adaptability, and standardization, ultimately improving the efficiency and effectiveness of your data pipeline management.

To learn more about Amazon MWAA DAG Factory, visit Amazon MWAA for Analytics Workshop: DAG Factory. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repository.


About the authors

Jayesh Shinde is a Sr. Application Architect with AWS ProServe India. He specializes in creating various cloud-based solutions using modern software development practices like serverless, DevOps, and analytics.

Harshd Yeola is a Sr. Cloud Architect with AWS ProServe India, helping customers migrate and modernize their infrastructure into AWS. He specializes in building DevSecOps and scalable infrastructure using containers, AIOps, and AWS Developer Tools and services.
