Zephyrnet-logo

Introductie van Amazon MWAA-ondersteuning voor Apache Airflow versie 2.8.1 | Amazon-webservices

Datum:

Door Amazon beheerde workflows voor Apache Airflow (Amazon MWAA) is een beheerde orkestratieservice voor Apache-luchtstroom Dat maakt het eenvoudig om end-to-end datapijplijnen in de cloud op te zetten en te exploiteren.

Organisaties gebruiken Amazon MWAA om hun zakelijke workflows te verbeteren. Bijvoorbeeld, C2i Genomica gebruikt Amazon MWAA in hun dataplatform om de validatie te orkestreren van algoritmen die gegevens over kankergenomica in miljarden records verwerken. Trekken, een livestreamplatform, beheert en orkestreert de training en implementatie van zijn aanbevelingsmodellen voor meer dan 140 miljoen actieve gebruikers. Ze gebruiken Amazon MWAA om te schalen, terwijl ze de beveiliging aanzienlijk verbeteren en de overhead voor infrastructuurbeheer verminderen.

Vandaag kondigen we de beschikbaarheid aan van Apache Airflow versie 2.8.1-omgevingen op Amazon MWAA. In dit bericht leiden we u door enkele van de nieuwe functies en mogelijkheden van Airflow die nu beschikbaar zijn in Amazon MWAA, en hoe u uw Amazon MWAA-omgeving kunt instellen of upgraden naar versie 2.8.1.

Objectopslag

Naarmate de datapijplijnen zich uitbreiden, hebben ingenieurs moeite met het beheren van opslag over meerdere systemen met unieke API's, authenticatiemethoden en conventies voor toegang tot gegevens, waarvoor aangepaste logica en opslagspecifieke operators nodig zijn. Airflow biedt nu een uniforme abstractielaag voor objectopslag die deze details verwerkt, zodat ingenieurs zich kunnen concentreren op hun datapijplijnen. Gebruik van luchtstroomobjectopslag fsspec om consistente datatoegangscode voor verschillende objectopslagsystemen mogelijk te maken, waardoor de complexiteit van de infrastructuur wordt gestroomlijnd.

Hier volgen enkele van de belangrijkste voordelen van de functie:

  • Draagbare werkstromen – U kunt van opslagservice wisselen met minimale wijzigingen in uw Directed Acyclic Graphs (DAG's)
  • Efficiënte gegevensoverdracht – U kunt gegevens streamen in plaats van in het geheugen te laden
  • Minder onderhoud – U heeft geen aparte operators nodig, waardoor uw leidingen eenvoudig te onderhouden zijn
  • Bekende programmeerervaring – Je kunt Python-modules gebruiken, zoals stil, voor bestandsbewerkingen

Om objectopslag te gebruiken Amazon eenvoudige opslagservice (Amazon S3), dat is nodig installeer het pakket extra s3fs met de Amazon-provider (apache-airflow-providers-amazon[s3fs]==x.x.x).

In de onderstaande voorbeeldcode ziet u hoe u gegevens rechtstreeks vanuit Google Cloud Storage naar Amazon S3. Omdat Airflow's objectopslag gebruik maakt van shutil.copyfileobj, worden de gegevens van de objecten in stukjes uitgelezen gcs_data_source en gestreamd naar amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Voor meer informatie over de opslag van Airflow-objecten, zie Object Opslag.

XCom-gebruikersinterface

XCom (cross-communicatie) maakt het doorgeven van gegevens tussen taken mogelijk, waardoor de communicatie en coördinatie daartussen wordt vergemakkelijkt. Voorheen moesten ontwikkelaars naar een andere weergave overschakelen om XComs gerelateerd aan een taak te zien. Met Airflow 2.8 worden XCom-sleutelwaarden rechtstreeks weergegeven op een tabblad in de Airflow Grid-weergave, zoals weergegeven in de volgende schermafbeelding.

De nieuwe xcom tabblad biedt de volgende voordelen:

  • Verbeterde XCom-zichtbaarheid – Een speciaal tabblad in de gebruikersinterface biedt een handige en gebruiksvriendelijke manier om alle XComs te zien die aan een DAG of taak zijn gekoppeld.
  • Verbeterde foutopsporing – Het direct kunnen zien van XCom-waarden in de gebruikersinterface is handig bij het debuggen van DAG’s. U kunt snel de uitvoer van upstream-taken zien zonder dat u deze handmatig hoeft op te halen en te inspecteren met behulp van Python-code.

Taakcontextlogger

Het beheren van taaklevenscycli is cruciaal voor de soepele werking van datapijplijnen in Airflow. Er zijn echter bepaalde problemen blijven bestaan, vooral in scenario's waarin taken onverwacht worden stopgezet. Dit kan verschillende oorzaken hebben, waaronder time-outs van de planner, zombie taken (taken die actief blijven zonder hartslagen te verzenden), of gevallen waarbij de werker onvoldoende geheugen heeft.

Traditioneel werden dergelijke fouten, vooral die veroorzaakt door belangrijke Airflow-componenten zoals de planner of uitvoerder, niet geregistreerd in de taaklogboeken. Deze beperking vereiste dat gebruikers problemen buiten de Airflow-gebruikersinterface moesten oplossen, wat het proces van het opsporen en oplossen van problemen bemoeilijkte.

Airflow 2.8 introduceerde een aanzienlijke verbetering die dit probleem aanpakt. Luchtstroomcomponenten, inclusief de planner en uitvoerder, kunnen nu gebruik maken van de nieuwe TaskContextLogger om foutmeldingen rechtstreeks naar de taaklogboeken door te sturen. Met deze functie kunt u alle relevante foutmeldingen met betrekking tot de uitvoering van een taak op één plek bekijken. Dit vereenvoudigt het proces om uit te zoeken waarom een ​​taak is mislukt, en biedt een compleet perspectief van wat er mis is gegaan binnen één logweergave.

De volgende schermafbeelding laat zien hoe de taak wordt gedetecteerd zombieen het plannerlogboek wordt opgenomen als onderdeel van het takenlogboek.

U moet de omgevingsconfiguratieparameter instellen enable_task_context_logger naar True, om de functie in te schakelen. Zodra het is ingeschakeld, kan Airflow logboeken van de planner, de uitvoerder of de callback-runcontext naar de taaklogboeken verzenden en deze beschikbaar maken in de Airflow-gebruikersinterface.

Luisteraarshaken voor datasets

datasets werden in Airflow 2.4 geïntroduceerd als een logische groepering van gegevensbronnen om gegevensbewuste planning en afhankelijkheden tussen DAG's te creëren. U kunt bijvoorbeeld plannen dat een consumenten-DAG wordt uitgevoerd wanneer een producenten-DAG een gegevensset bijwerkt. luisteraars stel Airflow-gebruikers in staat abonnementen te maken op bepaalde evenementen die in de omgeving plaatsvinden. In Airflow 2.8 worden luisteraars toegevoegd voor twee gegevenssetgebeurtenissen: on_dataset_created en on_dataset_changed, waardoor Airflow-gebruikers effectief aangepaste code kunnen schrijven om te reageren op gegevenssetbeheerbewerkingen. Zo kun je bijvoorbeeld een extern systeem activeren, of een notificatie versturen.

Het gebruik van luisteraarshaken voor datasets is eenvoudig. Voer de volgende stappen uit om een ​​luisteraar voor te maken on_dataset_changed:

  1. Maak de luisteraar (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Maak een plug-in om de luisteraar in uw Airflow-omgeving te registreren (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

Voor meer informatie over het installeren van plug-ins in Amazon MWAA raadpleegt u Aangepaste plug-ins installeren.

Zet een nieuwe Airflow 2.8.1-omgeving op in Amazon MWAA

U kunt de setup in uw account en voorkeursregio met behulp van de AWS-beheerconsole, API, of AWS-opdrachtregelinterface (AWS CLI). Als u infrastructuur als code (IaC) gebruikt, kunt u de installatie automatiseren met behulp van AWS CloudFormatie AWS Cloud-ontwikkelingskit (AWS CDK) of Terraform-scripts.

Na het succesvol aanmaken van een Airflow versie 2.8.1-omgeving in Amazon MWAA, worden bepaalde pakketten automatisch geïnstalleerd op de planner- en werkknooppunten. Voor een volledige lijst van geïnstalleerde pakketten en hun versies, zie Apache Airflow-providerpakketten geïnstalleerd op Amazon MWAA-omgevingen. U kunt extra pakketten installeren met behulp van een vereistenbestand.

Upgrade van oudere versies van Airflow naar versie 2.8.1

U kunt profiteren van deze nieuwste mogelijkheden door uw oudere op Airflow versie 2.x gebaseerde omgevingen te upgraden naar versie 2.8.1 met behulp van interne versie-upgrades. Voor meer informatie over interne versie-upgrades raadpleegt u De Apache Airflow-versie upgraden or Introductie van interne versie-upgrades met Amazon MWAA.

Conclusie

In dit bericht hebben we enkele belangrijke functies besproken die zijn geïntroduceerd in Airflow versie 2.8, zoals objectopslag, het nieuwe XCom-tabblad toegevoegd aan de rasterweergave, taakcontextregistratie, luisteraarhooks voor datasets en hoe u deze kunt gaan gebruiken. We hebben ook enkele voorbeeldcode geleverd om implementaties in Amazon MWAA te tonen. Voor de volledige lijst met wijzigingen, zie Releaseopmerkingen van Airflow.

Ga voor meer informatie en codevoorbeelden op Amazon MWAA naar de Amazon MWAA-gebruikershandleiding en Amazon MWAA-voorbeelden GitHub-repo.

Apache, Apache Airflow en Airflow zijn gedeponeerde handelsmerken of handelsmerken van de Apache Software Foundation in de Verenigde Staten en/of andere landen.


Over de auteurs

Mansi Bhutada is een ISV Solutions Architect gevestigd in Nederland. Ze helpt klanten bij het ontwerpen en implementeren van goed ontworpen oplossingen in AWS die hun zakelijke problemen aanpakken. Ze heeft een passie voor data-analyse en netwerken. Naast haar werk experimenteert ze graag met eten, speelt ze pickleball en duikt ze in leuke bordspellen.

Hernán Garcia is een Senior Solutions Architect bij AWS gevestigd in Nederland. Hij werkt in de financiële dienstverlening en ondersteunt bedrijven bij hun adoptie van de cloud. Hij heeft een passie voor serverloze technologieën, beveiliging en compliance. Hij brengt graag tijd door met familie en vrienden en probeert graag nieuwe gerechten uit verschillende keukens.

spot_img

Laatste intelligentie

spot_img