Logo Zephyrnet

Presentazione del supporto Amazon MWAA per Apache Airflow versione 2.8.1 | Servizi Web di Amazon

Data:

Flussi di lavoro gestiti da Amazon per Apache Airflow (Amazon MWAA) è un servizio di orchestrazione gestito per Flusso d'aria Apache ciò semplifica la configurazione e la gestione di pipeline di dati end-to-end nel cloud.

Le organizzazioni utilizzano Amazon MWAA per migliorare i flussi di lavoro aziendali. Per esempio, Genomica C2i utilizza Amazon MWAA nella propria piattaforma dati per orchestrare la convalida degli algoritmi che elaborano i dati sulla genomica del cancro in miliardi di record. Twitch, una piattaforma di live streaming, gestisce e orchestra la formazione e l'implementazione dei suoi modelli di raccomandazione per oltre 140 milioni di utenti attivi. Utilizzano Amazon MWAA per scalare, migliorando significativamente la sicurezza e riducendo i costi di gestione dell'infrastruttura.

Oggi annunciamo la disponibilità degli ambienti Apache Airflow versione 2.8.1 su Amazon MWAA. In questo post ti guideremo attraverso alcune delle nuove funzionalità e funzionalità di Airflow ora disponibili in Amazon MWAA e come configurare o aggiornare il tuo ambiente Amazon MWAA alla versione 2.8.1.

Archiviazione di oggetti

Man mano che le pipeline di dati crescono, gli ingegneri hanno difficoltà a gestire lo storage su più sistemi con API, metodi di autenticazione e convenzioni unici per l'accesso ai dati, che richiedono logica personalizzata e operatori specifici dello storage. Airflow offre ora un livello di astrazione unificato per l'archiviazione di oggetti che gestisce questi dettagli, consentendo agli ingegneri di concentrarsi sulle proprie pipeline di dati. Usi di archiviazione degli oggetti del flusso d'aria fsspec per consentire un codice di accesso ai dati coerente tra diversi sistemi di storage di oggetti, semplificando così la complessità dell'infrastruttura.

Di seguito sono riportati alcuni dei principali vantaggi della funzionalità:

  • Flussi di lavoro portatili – Puoi cambiare servizio di archiviazione con modifiche minime nei tuoi grafici aciclici diretti (DAG)
  • Trasferimenti di dati efficienti – È possibile eseguire lo streaming dei dati invece di caricarli in memoria
  • Manutenzione ridotta – Non sono necessari operatori separati, il che rende semplice la manutenzione delle pipeline
  • Esperienza di programmazione familiare – Puoi usare moduli Python, come shutil, per le operazioni sui file

Per utilizzare l'archiviazione di oggetti con Servizio di archiviazione semplice Amazon (Amazon S3), è necessario installa il pacchetto extra s3fs con il fornitore Amazon (apache-airflow-providers-amazon[s3fs]==x.x.x).

Nel codice di esempio riportato di seguito puoi vedere come spostare i dati direttamente da Google Cloud Storage ad Amazon S3. Poiché l'archiviazione degli oggetti di Airflow utilizza shutil.copyfileobj, i dati degli oggetti vengono letti in blocchi da gcs_data_source e trasmesso in streaming su amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Per ulteriori informazioni sull'archiviazione di oggetti Airflow, fare riferimento a Storage degli oggetti.

Interfaccia utente XCom

XCom (comunicazioni incrociate) consente il passaggio di dati tra attività, facilitando la comunicazione e il coordinamento tra di loro. In precedenza, gli sviluppatori dovevano passare a una visualizzazione diversa per visualizzare gli XCom relativi a un'attività. Con Airflow 2.8, i valori-chiave XCom vengono renderizzati direttamente in una scheda all'interno della vista Airflow Grid, come mostrato nello screenshot seguente.

Il nuovo xcom La scheda offre i seguenti vantaggi:

  • Visibilità XCom migliorata – Una scheda dedicata nell'interfaccia utente fornisce un modo comodo e intuitivo per visualizzare tutti gli XCom associati a un DAG o a un'attività.
  • Debug migliorato – Essere in grado di vedere i valori XCom direttamente nell'interfaccia utente è utile per il debug dei DAG. Puoi visualizzare rapidamente l'output delle attività upstream senza doverli estrarre e ispezionare manualmente utilizzando il codice Python.

Registratore del contesto dell'attività

La gestione dei cicli di vita delle attività è fondamentale per il corretto funzionamento delle pipeline di dati in Airflow. Tuttavia, alcune sfide persistono, in particolare negli scenari in cui le attività vengono interrotte inaspettatamente. Ciò può verificarsi per vari motivi, inclusi timeout dello scheduler, zombie attività (attività che rimangono in uno stato di esecuzione senza inviare heartbeat) o istanze in cui il lavoratore esaurisce la memoria.

Tradizionalmente, tali errori, in particolare quelli innescati dai componenti principali di Airflow come lo scheduler o l'esecutore, non venivano registrati nei registri delle attività. Questa limitazione richiedeva agli utenti di risolvere i problemi al di fuori dell'interfaccia utente di Airflow, complicando il processo di individuazione e risoluzione dei problemi.

Airflow 2.8 ha introdotto un miglioramento significativo che risolve questo problema. I componenti del flusso d'aria, inclusi lo scheduler e l'esecutore, ora possono utilizzare il nuovo TaskContextLogger per inoltrare i messaggi di errore direttamente ai registri delle attività. Questa funzionalità ti consente di visualizzare tutti i messaggi di errore rilevanti relativi all'esecuzione di un'attività in un unico posto. Ciò semplifica il processo di determinazione del motivo per cui un'attività non è riuscita, offrendo una prospettiva completa di ciò che è andato storto all'interno di un'unica visualizzazione del registro.

La schermata seguente mostra come viene rilevata l'attività zombiee il registro dello scheduler viene incluso come parte del registro delle attività.

È necessario impostare il parametro di configurazione dell'ambiente enable_task_context_logger a True, per abilitare la funzione. Una volta abilitato, Airflow può inviare i log dallo scheduler, dall'esecutore o dal contesto di esecuzione della richiamata ai log delle attività e renderli disponibili nell'interfaccia utente di Airflow.

Hook di ascolto per set di dati

Dataset sono stati introdotti in Airflow 2.4 come raggruppamento logico di origini dati per creare pianificazione e dipendenze sensibili ai dati tra DAG. Ad esempio, è possibile pianificare l'esecuzione di un DAG consumer quando un DAG produttore aggiorna un set di dati. Gli ascoltatori consentire agli utenti Airflow di creare abbonamenti a determinati eventi che si verificano nell'ambiente. In Airflow 2.8, gli ascoltatori vengono aggiunti per due eventi di set di dati: on_dataset_creato ed on_dataset_changed, consentendo effettivamente agli utenti di Airflow di scrivere codice personalizzato per reagire alle operazioni di gestione del set di dati. Ad esempio, puoi attivare un sistema esterno o inviare una notifica.

L'uso degli hook di ascolto per i set di dati è semplice. Completa i passaggi seguenti per creare un listener per on_dataset_changed:

  1. Crea l'ascoltatore (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Crea un plug-in per registrare l'ascoltatore nel tuo ambiente Airflow (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

Per ulteriori informazioni su come installare i plug-in in Amazon MWAA, fare riferimento a Installazione di plugin personalizzati.

Configura un nuovo ambiente Airflow 2.8.1 in Amazon MWAA

Puoi avviare il flessibile. nel tuo account e nella regione preferita utilizzando il file Console di gestione AWS, API o Interfaccia della riga di comando di AWS (AWS CLI). Se stai adottando l'infrastruttura come codice (IaC), puoi automatizzare la configurazione utilizzando AWS CloudFormazione, le Kit di sviluppo cloud AWS (AWS CDK) o script Terraform.

Dopo aver creato con successo un ambiente Airflow versione 2.8.1 in Amazon MWAA, alcuni pacchetti vengono installati automaticamente sullo scheduler e sui nodi di lavoro. Per un elenco completo dei pacchetti installati e delle relative versioni, fare riferimento a Pacchetti del provider Apache Airflow installati su ambienti Amazon MWAA. È possibile installare pacchetti aggiuntivi utilizzando un file dei requisiti.

Aggiorna dalle versioni precedenti di Airflow alla versione 2.8.1

Puoi sfruttare queste funzionalità più recenti aggiornando i tuoi ambienti basati sulla versione 2.x di Airflow precedente alla versione 2.8.1 utilizzando gli aggiornamenti della versione sul posto. Per ulteriori informazioni sugli aggiornamenti della versione sul posto, fare riferimento a Aggiornamento della versione Apache Airflow or Presentazione degli aggiornamenti della versione sul posto con Amazon MWAA.

Conclusione

In questo post, abbiamo discusso alcune importanti funzionalità introdotte in Airflow versione 2.8, come l'archiviazione di oggetti, la nuova scheda XCom aggiunta alla visualizzazione griglia, la registrazione del contesto delle attività, gli hook del listener per i set di dati e come iniziare a utilizzarli. Abbiamo anche fornito del codice di esempio per mostrare le implementazioni in Amazon MWAA. Per l'elenco completo delle modifiche fare riferimento a Note sulla versione di Airflow.

Per ulteriori dettagli ed esempi di codice su Amazon MWAA, visita il Guida per l'utente di Amazon MWAA e la Amazon MWAA esempi repository GitHub.

Apache, Apache Airflow e Airflow sono marchi o marchi registrati di Apache Software Foundation negli Stati Uniti e/o in altri paesi.


Informazioni sugli autori

Mansi Bhutada è un ISV Solutions Architect con sede nei Paesi Bassi. Aiuta i clienti a progettare e implementare soluzioni ben architettate in AWS che risolvano i loro problemi aziendali. È appassionata di analisi dei dati e networking. Oltre al lavoro, le piace sperimentare il cibo, giocare a pickleball e tuffarsi in divertenti giochi da tavolo.

Hernan Garcia è un Senior Solutions Architect presso AWS con sede nei Paesi Bassi. Lavora nel settore dei servizi finanziari, supportando le aziende nell'adozione del cloud. È appassionato di tecnologie serverless, sicurezza e conformità. Gli piace passare il tempo con la famiglia e gli amici e provare nuovi piatti di cucine diverse.

spot_img

L'ultima intelligenza

spot_img