Logo Zéphyrnet

Génération DAG dynamique avec YAML et DAG Factory dans Amazon MWAA | Services Web Amazon

Date :

Flux de travail géré par Amazon pour Apache Airflow (Amazon MWAA) est un service géré qui vous permet d'utiliser un Flux d'air Apache avec une évolutivité, une disponibilité et une sécurité améliorées pour améliorer et faire évoluer vos flux de travail d'entreprise sans la charge opérationnelle liée à la gestion de l'infrastructure sous-jacente. Dans le flux d'air, Graphes acycliques dirigés (DAG) sont définis comme du code Python. DAG dynamiques fait référence à la possibilité de générer des DAG à la volée pendant l'exécution, généralement en fonction de certaines conditions, configurations ou paramètres externes. Les DAG dynamiques vous aident à créer, planifier et exécuter des tâches au sein d'un DAG en fonction de données et de configurations qui peuvent changer au fil du temps.

Il existe différentes manières d'introduire du dynamisme dans les DAG Airflow (génération dynamique de DAG) en utilisant des variables d'environnement et des fichiers externes. Une des approches consiste à utiliser le Usine DAG Méthode de fichier de configuration basée sur YAML. Cette bibliothèque vise à faciliter la création et la configuration de nouveaux DAG en utilisant des paramètres déclaratifs en YAML. Il permet des personnalisations par défaut et est open source, ce qui simplifie la création et la personnalisation de nouvelles fonctionnalités.

Dans cet article, nous explorons le processus de création de DAG dynamiques avec des fichiers YAML, à l'aide de l'outil Usine DAG bibliothèque. Les DAG dynamiques offrent plusieurs avantages :

  1. Réutilisabilité améliorée du code – En structurant les DAG via des fichiers YAML, nous favorisons les composants réutilisables, réduisant ainsi la redondance dans vos définitions de workflow.
  2. Maintenance rationalisée – La génération de DAG basée sur YAML simplifie le processus de modification et de mise à jour des flux de travail, garantissant ainsi des procédures de maintenance plus fluides.
  3. Paramétrage flexible – Avec YAML, vous pouvez paramétrer les configurations DAG, facilitant ainsi les ajustements dynamiques des flux de travail en fonction de différentes exigences.
  4. Efficacité améliorée du planificateur – Les DAG dynamiques permettent une planification plus efficace, optimisant l'allocation des ressources et améliorant les exécutions globales du flux de travail
  5. Évolutivité améliorée – Les DAG pilotés par YAML permettent des exécutions parallèles, ce qui permet des flux de travail évolutifs capables de gérer efficacement des charges de travail accrues.

En exploitant la puissance des fichiers YAML et de la bibliothèque DAG Factory, nous proposons une approche polyvalente de création et de gestion de DAG, vous permettant de créer des pipelines de données robustes, évolutifs et maintenables.

Présentation de la solution

Dans cet article, nous utiliserons un exemple de fichier DAG conçu pour traiter un ensemble de données COVID-19. Le processus de workflow implique le traitement d'un ensemble de données open source proposé par OMS-COVID-19-Monde. Après avoir installé le Usine DAG Package Python, nous créons un fichier YAML contenant les définitions de diverses tâches. Nous traitons le décompte des décès par pays en passant Country en tant que variable, ce qui crée des DAG individuels par pays.

Le diagramme suivant illustre la solution globale ainsi que les flux de données au sein des blocs logiques.

Présentation de la solution

Pré-requis

Pour cette procédure pas à pas, vous devez disposer des prérequis suivants:

De plus, effectuez les étapes suivantes (exécutez l'installation dans un Région AWS où Amazon MWAA est disponible) :

  1. Créer un Environnement Amazon MWAA (si vous n'en avez pas déjà un). Si c'est la première fois que vous utilisez Amazon MWAA, reportez-vous à Présentation d'Amazon Managed Workflows pour Apache Airflow (MWAA).

Assurez-vous que le Gestion des identités et des accès AWS L'utilisateur ou le rôle (IAM) utilisé pour configurer l'environnement est associé à des stratégies IAM pour les autorisations suivantes :

Les politiques d'accès mentionnées ici ne sont qu'à titre d'exemple dans cet article. Dans un environnement de production, fournissez uniquement les autorisations granulaires nécessaires en exerçant principes du moindre privilège.

  1. Créez un nom de compartiment Amazon S3 unique (au sein d'un compte) lors de la création de votre environnement Amazon MWAA et créez des dossiers appelés dags ainsi que requirements.
    Bucket Amazon S3
  2. Créer et télécharger un requirements.txt fichier avec le contenu suivant dans le requirements dossier. Remplacer {environment-version} avec le numéro de version de votre environnement, et {Python-version} avec la version de Python compatible avec votre environnement :
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas est nécessaire uniquement pour l'exemple de cas d'utilisation décrit dans cet article, et dag-factory est le seul plug-in requis. Il est recommandé de vérifier la compatibilité de la dernière version de dag-factory avec Amazon MWAA. Le boto ainsi que psycopg2-binary les bibliothèques sont incluses avec l'installation de base d'Apache Airflow v2 et n'ont pas besoin d'être spécifiées dans votre requirements.txt fichier.

  1. Télécharger Fichier de données mondiales OMS-COVID-19 sur votre ordinateur local et téléchargez-le sous le dags préfixe de votre compartiment S3.

Assurez-vous que vous pointez vers la dernière version du compartiment AWS S3 de votre requirements.txt fichier pour que l'installation du package supplémentaire ait lieu. Cela devrait généralement prendre entre 15 et 20 minutes en fonction de la configuration de votre environnement.

Valider les DAG

Lorsque votre environnement Amazon MWAA s'affiche comme Disponible sur la console Amazon MWAA, accédez à l'interface utilisateur Airflow en choisissant Ouvrir l’interface utilisateur d’Airflow à côté de votre environnement.

Valider le DAG

Vérifiez les DAG existants en accédant à l’onglet DAG.

Vérifier le DAG

Configurez vos DAG

Effectuez les étapes suivantes:

  1. Créez des fichiers vides nommés dynamic_dags.yml, example_dag_factory.py ainsi que process_s3_data.py sur votre machine locale.
  2. Modifiez le process_s3_data.py fichier et enregistrez-le avec le contenu du code suivant, puis téléchargez à nouveau le fichier dans le compartiment Amazon S3 dags dossier. Nous effectuons quelques traitements de données de base dans le code :
    1. Lire le fichier à partir d'un emplacement Amazon S3
    2. Renommez le Country_code colonne en fonction du pays.
    3. Filtrer les données par pays donné.
    4. Écrivez les données finales traitées au format CSV et téléchargez-les vers le préfixe S3.
import boto3
import pandas as pd
import io
   
def process_s3_data(COUNTRY):
### Top level Variables replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
### get csv file ###
   response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
   status = response['ResponseMetadata']['HTTPStatusCode']
   if status == 200:
### read csv file and filter based on the country to write back ###
       df = pd.read_csv(response.get("Body"))
       df.rename(columns={"Country_code": "country"}, inplace=True)
       filtered_df = df[df['country'] == COUNTRY]
       with io.StringIO() as csv_buffer:
                   filtered_df.to_csv(csv_buffer, index=False)
                   response = s3.put_object(
                       Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
                   )
       status = response['ResponseMetadata']['HTTPStatusCode']
       if status == 200:
           print(f"Successful S3 put_object response. Status - {status}")
       else:
           print(f"Unsuccessful S3 put_object response. Status - {status}")
   else:
       print(f"Unsuccessful S3 get_object response. Status - {status}")

  1. Modifiez le dynamic_dags.yml et enregistrez-le avec le contenu du code suivant, puis téléchargez à nouveau le fichier sur le dags dossier. Nous cousons différents DAG en fonction du pays comme suit :
    1. Définissez les arguments par défaut transmis à tous les DAG.
    2. Créez une définition DAG pour chaque pays en passant op_args
    3. Cartographier le process_s3_data fonctionner avec python_callable_name.
    4. Utilisez Opérateur Python pour traiter les données du fichier CSV stockées dans le compartiment Amazon S3.
    5. Nous avons mis schedule_interval 10 minutes, mais n'hésitez pas à ajuster cette valeur si nécessaire.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"

  1. Modifier le fichier example_dag_factory.py et enregistrez-le avec le contenu du code suivant, puis téléchargez à nouveau le fichier sur dags dossier. Le code nettoie les DAG existants et génère clean_dags() méthode et la création de nouveaux DAG à l'aide de la generate_dags() méthode de la DagFactory exemple.
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())

  1. Après avoir téléchargé les fichiers, revenez à la console de l'interface utilisateur Airflow et accédez à l'onglet DAG, où vous trouverez de nouveaux DAG.
    Lister les nouveaux DAG
  2. Une fois que vous avez téléchargé les fichiers, revenez à la console de l'interface utilisateur Airflow et sous l'onglet DAG, vous constaterez que de nouveaux DAG apparaissent comme indiqué ci-dessous :DAG

Vous pouvez activer les DAG en les rendant actifs et en les testant individuellement. Lors de l'activation, un fichier CSV supplémentaire nommé count_death_{COUNTRY_CODE}.csv est généré dans le dossier dags.

Nettoyer

L'utilisation des différents services AWS abordés dans cet article peut entraîner des coûts. Pour éviter d'encourir des frais futurs, supprimez l'environnement Amazon MWAA après avoir terminé les tâches décrites dans cet article, puis videz et supprimez le compartiment S3.

Conclusion

Dans cet article de blog, nous avons montré comment utiliser le usine-jour bibliothèque pour créer des DAG dynamiques. Les DAG dynamiques se caractérisent par leur capacité à générer des résultats à chaque analyse du fichier DAG en fonction des configurations. Envisagez d'utiliser des DAG dynamiques dans les scénarios suivants :

  • Automatisation de la migration d'un système existant vers Airflow, où la flexibilité dans la génération de DAG est cruciale
  • Situations où seul un paramètre change entre différents DAG, rationalisant ainsi le processus de gestion du flux de travail
  • Gestion des DAG qui dépendent de la structure évolutive d'un système source, offrant une adaptabilité aux changements
  • Établir des pratiques standardisées pour les DAG au sein de votre équipe ou organisation en créant ces plans, favorisant la cohérence et l'efficacité
  • Adopter les déclarations basées sur YAML sur le codage Python complexe, simplifiant ainsi les processus de configuration et de maintenance du DAG
  • Créer des flux de travail basés sur les données qui s'adaptent et évoluent en fonction des entrées de données, permettant une automatisation efficace

En incorporant des DAG dynamiques dans votre flux de travail, vous pouvez améliorer l'automatisation, l'adaptabilité et la standardisation, améliorant ainsi l'efficience et l'efficacité de la gestion de votre pipeline de données.

Pour en savoir plus sur Amazon MWAA DAG Factory, visitez Atelier Amazon MWAA pour Analytics : DAG Factory. Pour plus de détails et des exemples de code sur Amazon MWAA, visitez le Guide de l'utilisateur Amazon MWAA et par Exemples Amazon MWAA GitHub dépôt.


À propos des auteurs

 Jayesh Shindé est architecte d'applications principal chez AWS ProServe India. Il se spécialise dans la création de diverses solutions centrées sur le cloud en utilisant des pratiques de développement de logiciels modernes telles que le sans serveur, le DevOps et l'analyse.

Dur Yeola est Sr. Cloud Architect chez AWS ProServe India, aidant les clients à migrer et à moderniser leur infrastructure vers AWS. Il se spécialise dans la création de DevSecOps et d'infrastructures évolutives à l'aide de conteneurs, d'AIOP et d'outils et services de développement AWS.

spot_img

Dernières informations

spot_img