Logo Zéphyrnet

Présentation de la prise en charge d'Amazon MWAA pour Apache Airflow version 2.8.1 | Services Web Amazon

Date :

Flux de travail gérés par Amazon pour Apache Airflow (Amazon MWAA) est un service d'orchestration géré pour Flux d'air Apache cela facilite la configuration et l’exploitation de pipelines de données de bout en bout dans le cloud.

Les organisations utilisent Amazon MWAA pour améliorer leurs flux de travail commerciaux. Par exemple, Génomique C2i utilise Amazon MWAA dans sa plate-forme de données pour orchestrer la validation des algorithmes traitant les données génomiques du cancer dans des milliards d'enregistrements. Twitch, plateforme de streaming en direct, gère et orchestre la formation et le déploiement de ses modèles de recommandation pour plus de 140 millions d'utilisateurs actifs. Ils utilisent Amazon MWAA pour évoluer, tout en améliorant considérablement la sécurité et en réduisant les frais de gestion de l'infrastructure.

Aujourd'hui, nous annonçons la disponibilité des environnements Apache Airflow version 2.8.1 sur Amazon MWAA. Dans cet article, nous vous présentons certaines des nouvelles fonctionnalités et capacités d'Airflow désormais disponibles dans Amazon MWAA, et comment vous pouvez configurer ou mettre à niveau votre environnement Amazon MWAA vers la version 2.8.1.

Stockage d'objets

À mesure que les pipelines de données évoluent, les ingénieurs ont du mal à gérer le stockage sur plusieurs systèmes avec des API, des méthodes d'authentification et des conventions uniques pour accéder aux données, nécessitant une logique personnalisée et des opérateurs spécifiques au stockage. Airflow propose désormais une couche d'abstraction de stockage d'objets unifiée qui gère ces détails, permettant aux ingénieurs de se concentrer sur leurs pipelines de données. Utilisations du stockage d'objets Airflow fsspec pour permettre un code d'accès aux données cohérent sur différents systèmes de stockage d'objets, rationalisant ainsi la complexité de l'infrastructure.

Voici quelques-uns des principaux avantages de cette fonctionnalité :

  • Flux de travail portables – Vous pouvez changer de service de stockage avec des modifications minimes dans vos graphiques acycliques dirigés (DAG)
  • Transferts de données efficaces – Vous pouvez diffuser des données au lieu de les charger en mémoire
  • Maintenance réduite – Vous n'avez pas besoin d'opérateurs distincts, ce qui rend la maintenance de vos pipelines simple
  • Expérience de programmation familière – Vous pouvez utiliser des modules Python, comme fermer, pour les opérations sur les fichiers

Pour utiliser le stockage d'objets avec Service de stockage simple Amazon (Amazon S3), vous devez installer le package supplémentaire s3fs avec le fournisseur Amazon (apache-airflow-providers-amazon[s3fs]==x.x.x).

Dans l'exemple de code ci-dessous, vous pouvez voir comment déplacer des données directement depuis Google Cloud Storage vers Amazon S3. Parce que le stockage objet d'Airflow utilise shutil.copyfileobj, les données des objets sont lues par morceaux à partir de gcs_data_source et diffusé en streaming vers amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Pour plus d'informations sur le stockage d'objets Airflow, reportez-vous à Stockage d'objets.

Interface utilisateur XCom

XCom (cross-communications) permet la transmission de données entre les tâches, facilitant la communication et la coordination entre elles. Auparavant, les développeurs devaient passer à une vue différente pour voir les XCom liés à une tâche. Avec Airflow 2.8, les valeurs-clés XCom sont rendues directement sur un onglet de la vue Airflow Grid, comme indiqué dans la capture d'écran suivante.

Le nouveau système d’ xcom L'onglet offre les avantages suivants :

  • Visibilité XCom améliorée – Un onglet dédié dans l'interface utilisateur offre un moyen pratique et convivial de voir tous les XCom associés à un DAG ou à une tâche.
  • Débogage amélioré – Pouvoir voir les valeurs XCom directement dans l'interface utilisateur est utile pour déboguer les DAG. Vous pouvez voir rapidement le résultat des tâches en amont sans avoir besoin de les extraire et de les inspecter manuellement à l'aide du code Python.

Enregistreur de contexte de tâche

La gestion des cycles de vie des tâches est cruciale pour le bon fonctionnement des pipelines de données dans Airflow. Cependant, certains défis persistent, notamment dans les scénarios où les tâches sont arrêtées de manière inattendue. Cela peut se produire pour diverses raisons, notamment les délais d'attente du planificateur, zombie tâches (tâches qui restent en cours d'exécution sans envoyer de battements de cœur) ou instances dans lesquelles le travailleur manque de mémoire.

Traditionnellement, de telles pannes, en particulier celles déclenchées par les composants principaux d'Airflow tels que le planificateur ou l'exécuteur, n'étaient pas enregistrées dans les journaux de tâches. Cette limitation obligeait les utilisateurs à résoudre les problèmes en dehors de l'interface utilisateur d'Airflow, ce qui compliquait le processus d'identification et de résolution des problèmes.

Airflow 2.8 a introduit une amélioration significative qui résout ce problème. Les composants Airflow, y compris le planificateur et l'exécuteur, peuvent désormais utiliser le nouveau TaskContextLogger pour transmettre les messages d'erreur directement aux journaux de tâches. Cette fonctionnalité vous permet de voir tous les messages d'erreur pertinents liés à l'exécution d'une tâche en un seul endroit. Cela simplifie le processus permettant de déterminer pourquoi une tâche a échoué, offrant une perspective complète de ce qui n'a pas fonctionné dans une seule vue de journal.

La capture d'écran suivante montre comment la tâche est détectée comme zombie, et le journal du planificateur est inclus dans le journal des tâches.

Vous devez définir le paramètre de configuration de l'environnement enable_task_context_logger à True, pour activer la fonctionnalité. Une fois activé, Airflow peut envoyer les journaux du planificateur, de l'exécuteur ou du contexte d'exécution de rappel aux journaux de tâches et les rendre disponibles dans l'interface utilisateur d'Airflow.

Hooks d’écoute pour les ensembles de données

Jeux de données ont été introduits dans Airflow 2.4 en tant que regroupement logique de sources de données pour créer une planification et des dépendances tenant compte des données entre les DAG. Par exemple, vous pouvez planifier l'exécution d'un DAG consommateur lorsqu'un DAG producteur met à jour un ensemble de données. Les auditeurs permettre aux utilisateurs d'Airflow de créer des abonnements à certains événements se déroulant dans l'environnement. Dans Airflow 2.8, des écouteurs sont ajoutés pour deux événements d'ensembles de données : on_dataset_created ainsi que on_dataset_changed, permettant ainsi aux utilisateurs d'Airflow d'écrire du code personnalisé pour réagir aux opérations de gestion des ensembles de données. Par exemple, vous pouvez déclencher un système externe ou envoyer une notification.

L’utilisation de hooks d’écoute pour les ensembles de données est simple. Effectuez les étapes suivantes pour créer un écouteur pour on_dataset_changed:

  1. Créez l'écouteur (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Créez un plugin pour enregistrer l'écouteur dans votre environnement Airflow (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

Pour plus d'informations sur la façon d'installer des plug-ins dans Amazon MWAA, reportez-vous à Installation de plugins personnalisés.

Configurer un nouvel environnement Airflow 2.8.1 dans Amazon MWAA

Vous pouvez lancer le installation dans votre compte et votre région préférée en utilisant le Console de gestion AWS, API ou Interface de ligne de commande AWS (AWS CLI). Si vous adoptez l'infrastructure en tant que code (IaC), vous pouvez automatiser la configuration à l'aide de AWS CloudFormation, Kit de développement AWS Cloud (AWS CDK) ou des scripts Terraform.

Lors de la création réussie d'un environnement Airflow version 2.8.1 dans Amazon MWAA, certains packages sont automatiquement installés sur les nœuds de planification et de travail. Pour une liste complète des packages installés et de leurs versions, reportez-vous à Packages du fournisseur Apache Airflow installés sur les environnements Amazon MWAA. Vous pouvez installer des packages supplémentaires à l'aide d'un fichier d'exigences.

Mise à niveau des anciennes versions d'Airflow vers la version 2.8.1

Vous pouvez profiter de ces dernières fonctionnalités en mettant à niveau vos anciens environnements basés sur Airflow version 2.x vers la version 2.8.1 à l'aide de mises à niveau de version sur place. Pour en savoir plus sur les mises à niveau des versions sur place, reportez-vous à Mettre à jour la version d'Apache Airflow or Présentation des mises à niveau de version sur place avec Amazon MWAA.

Conclusion

Dans cet article, nous avons discuté de certaines fonctionnalités importantes introduites dans Airflow version 2.8, telles que le stockage d'objets, le nouvel onglet XCom ajouté à la vue grille, la journalisation du contexte des tâches, les hooks d'écoute pour les ensembles de données et la façon dont vous pouvez commencer à les utiliser. Nous avons également fourni des exemples de code pour montrer les implémentations dans Amazon MWAA. Pour la liste complète des modifications, reportez-vous à Notes de version d'Airflow.

Pour plus de détails et des exemples de code sur Amazon MWAA, visitez le Guide de l'utilisateur Amazon MWAA et par Exemples de dépôt GitHub d'Amazon MWAA.

Apache, Apache Airflow et Airflow sont soit des marques déposées, soit des marques commerciales de Apache Software Foundation aux États-Unis et/ou dans d'autres pays.


À propos des auteurs

Mansi Bhutada est un architecte de solutions ISV basé aux Pays-Bas. Elle aide les clients à concevoir et à mettre en œuvre des solutions bien architecturées dans AWS qui répondent à leurs problèmes commerciaux. Elle est passionnée par l'analyse de données et les réseaux. Au-delà du travail, elle aime expérimenter la nourriture, jouer au pickleball et se lancer dans des jeux de société amusants.

Hernán García est un architecte de solutions senior chez AWS basé aux Pays-Bas. Il travaille dans le secteur des services financiers, accompagnant les entreprises dans leur adoption du cloud. Il est passionné par les technologies sans serveur, la sécurité et la conformité. Il aime passer du temps avec sa famille et ses amis et essayer de nouveaux plats de différentes cuisines.

spot_img

Dernières informations

spot_img