Zephyrnet-logotyp

Introducerar Amazon MWAA-stöd för Apache Airflow version 2.8.1 | Amazon webbtjänster

Datum:

Amazon Managed Workflows för Apache Airflow (Amazon MWAA) är en hanterad orkestreringstjänst för Apache luftflöde som gör det enkelt att sätta upp och driva end-to-end datapipelines i molnet.

Organisationer använder Amazon MWAA för att förbättra sina arbetsflöden. Till exempel, C2i Genomics använder Amazon MWAA i sin dataplattform för att orkestrera valideringen av algoritmer som bearbetar cancergenomikdata i miljarder poster. Twitch, en livestreamingplattform, hanterar och orkestrerar utbildningen och distributionen av sina rekommendationsmodeller för över 140 miljoner aktiva användare. De använder Amazon MWAA för att skala, samtidigt som de förbättrar säkerheten avsevärt och minskar omkostnader för infrastrukturhantering.

Idag tillkännager vi tillgängligheten av Apache Airflow version 2.8.1-miljöer på Amazon MWAA. I det här inlägget går vi igenom några av de nya funktionerna och funktionerna i Airflow som nu är tillgängliga i Amazon MWAA, och hur du kan ställa in eller uppgradera din Amazon MWAA-miljö till version 2.8.1.

Objektlagring

När datapipelines skalas, kämpar ingenjörer för att hantera lagring över flera system med unika API:er, autentiseringsmetoder och konventioner för åtkomst till data, vilket kräver anpassad logik och lagringsspecifika operatörer. Airflow erbjuder nu ett enhetligt objektlagringsabstraktionslager som hanterar dessa detaljer, vilket låter ingenjörer fokusera på sina datapipelines. Används för lagring av luftflödesobjekt fsspec för att möjliggöra konsekvent dataåtkomstkod över olika objektlagringssystem, och därigenom effektivisera infrastrukturens komplexitet.

Följande är några av funktionens viktigaste fördelar:

  • Bärbara arbetsflöden – Du kan byta lagringstjänster med minimala ändringar i dina riktade acykliska grafer (DAG)
  • Effektiva dataöverföringar – Du kan strömma data istället för att ladda in i minnet
  • Minskat underhåll – Du behöver inga separata operatörer, vilket gör dina pipelines enkla att underhålla
  • Bekant programmeringserfarenhet – Du kan använda Python-moduler, som shutil, för filoperationer

Att använda objektlagring med Amazon enkel lagringstjänst (Amazon S3), du måste installera paketet extra s3fs med Amazon-leverantören (apache-airflow-providers-amazon[s3fs]==x.x.x).

I exempelkoden nedan kan du se hur du flyttar data direkt från Google Cloud Storage till Amazon S3. Eftersom Airflows objektlagring använder shutil.copyfileobj, objektens data läses i bitar från gcs_data_source och strömmade till amazon_s3_data_target.

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default ")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,    
    tags=["2.8", "ObjectStorage"],
) as dag:

    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):    
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

För mer information om lagring av Airflow-objekt, se Objektförvaring.

XCom UI

XCom (korskommunikation) gör det möjligt att överföra data mellan uppgifter, vilket underlättar kommunikation och koordinering mellan dem. Tidigare var utvecklare tvungna att byta till en annan vy för att se XComs relaterade till en uppgift. Med Airflow 2.8 renderas XCom-nyckelvärden direkt på en flik i Airflow Grid-vyn, som visas i följande skärmdump.

Den nya xcom fliken ger följande fördelar:

  • Förbättrad XCom-synlighet – En dedikerad flik i användargränssnittet ger ett bekvämt och användarvänligt sätt att se alla XComs associerade med en DAG eller uppgift.
  • Förbättrad felsökning – Att kunna se XCom-värden direkt i användargränssnittet är användbart för att felsöka DAG:er. Du kan snabbt se resultatet av uppströmsuppgifter utan att manuellt behöva dra och inspektera dem med Python-kod.

Uppgiftskontextlogger

Att hantera uppgiftslivscykler är avgörande för att datapipelines ska fungera smidigt i Airflow. Vissa utmaningar har dock kvarstått, särskilt i scenarier där uppgifter oväntat stoppas. Detta kan uppstå på grund av olika orsaker, inklusive timeouts för schemaläggaren, zombie uppgifter (uppgifter som förblir i körläge utan att skicka hjärtslag), eller tillfällen där arbetaren får slut på minne.

Traditionellt registrerades inte sådana fel, särskilt de som utlöstes av kärna Airflow-komponenter som schemaläggaren eller executorn i uppgiftsloggarna. Denna begränsning krävde användare att felsöka utanför Airflow UI, vilket komplicerade processen att lokalisera och lösa problem.

Airflow 2.8 introducerade en betydande förbättring som åtgärdar detta problem. Luftflödeskomponenter, inklusive schemaläggaren och executorn, kan nu använda den nya TaskContextLogger för att vidarebefordra felmeddelanden direkt till uppgiftsloggarna. Den här funktionen låter dig se alla relevanta felmeddelanden relaterade till en aktivitets körning på ett ställe. Detta förenklar processen att ta reda på varför en uppgift misslyckades, vilket ger ett komplett perspektiv på vad som gick fel i en enda loggvy.

Följande skärmdump visar hur uppgiften upptäcks som zombie, och schemaläggningsloggen inkluderas som en del av uppgiftsloggen.

Du måste ställa in miljökonfigurationsparametern enable_task_context_logger till True, för att aktivera funktionen. När det väl har aktiverats kan Airflow skicka loggar från schemaläggaren, executorn eller callback-körningskontexten till uppgiftsloggarna och göra dem tillgängliga i Airflow UI.

Lyssnarkrokar för datauppsättningar

dataset introducerades i Airflow 2.4 som en logisk gruppering av datakällor för att skapa datamedveten schemaläggning och beroenden mellan DAG:er. Du kan till exempel schemalägga en konsument-DAG att köras när en producent-DAG uppdaterar en datauppsättning. lyssnare gör det möjligt för Airflow-användare att skapa prenumerationer på vissa händelser som händer i miljön. I Airflow 2.8 läggs lyssnare till för två datauppsättningshändelser: on_dataset_created och on_dataset_changed, vilket effektivt tillåter Airflow-användare att skriva anpassad kod för att reagera på datahanteringsoperationer. Du kan till exempel trigga ett externt system eller skicka ett meddelande.

Att använda lyssnarkrokar för datauppsättningar är enkelt. Slutför följande steg för att skapa en lyssnare för on_dataset_changed:

  1. Skapa lyssnaren (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl
    
    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")
    
        """Validating a specific dataset"""
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print ("Execute specific/different action for this dataset")

  2. Skapa ett plugin för att registrera lyssnaren i din Airflow-miljö (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import listener_code
    
    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

För mer information om hur man installerar plugins i Amazon MWAA, se Installerar anpassade plugins.

Konfigurera en ny Airflow 2.8.1-miljö i Amazon MWAA

Du kan initiera inställning i ditt konto och önskad region med hjälp av AWS Management Console, API eller AWS-kommandoradsgränssnitt (AWS CLI). Om du använder infrastruktur som kod (IaC) kan du automatisera installationen med hjälp av AWS molnformation, den AWS Cloud Development Kit (AWS CDK), eller Terraform-skript.

Efter framgångsrikt skapande av en Airflow version 2.8.1-miljö i Amazon MWAA, installeras vissa paket automatiskt på schemaläggaren och arbetarnoderna. För en komplett lista över installerade paket och deras versioner, se Apache Airflow-leverantörspaket installerade i Amazon MWAA-miljöer. Du kan installera ytterligare paket med hjälp av en kravfil.

Uppgradera från äldre versioner av Airflow till version 2.8.1

Du kan dra nytta av dessa senaste funktioner genom att uppgradera dina äldre Airflow version 2.x-baserade miljöer till version 2.8.1 med hjälp av inplacerade versionsuppgraderingar. För att lära dig mer om uppgraderingar av versioner på plats, se Uppgradering av Apache Airflow-versionen or Introducerar in-place versionsuppgraderingar med Amazon MWAA.

Slutsats

I det här inlägget diskuterade vi några viktiga funktioner som introducerats i Airflow version 2.8, såsom objektlagring, den nya XCom-fliken som lagts till i rutnätsvyn, loggning av uppgiftskontext, lyssnarkrokar för datauppsättningar och hur du kan börja använda dem. Vi tillhandahöll också lite exempelkod för att visa implementeringar i Amazon MWAA. För den fullständiga listan över ändringar, se Airflows release notes.

För ytterligare information och kodexempel på Amazon MWAA, besök Amazon MWAA användarhandbok och Amazon MWAA exempel på GitHub-repo.

Apache, Apache Airflow och Airflow är antingen registrerade varumärken eller varumärken som tillhör Apache Software Foundation i USA och / eller andra länder.


Om författarna

Mansi Bhutada är en ISV Solutions Architect baserad i Nederländerna. Hon hjälper kunder att designa och implementera väldesignade lösningar i AWS som löser deras affärsproblem. Hon brinner för dataanalys och nätverkande. Utöver jobbet tycker hon om att experimentera med mat, spela pickleball och dyka in i roliga brädspel.

Hernan Garcia är Senior Solutions Architect på AWS baserad i Nederländerna. Han arbetar inom finansbranschen och stöttar företag i deras molnintroduktion. Han brinner för serverlös teknologi, säkerhet och efterlevnad. Han tycker om att umgås med familj och vänner, och att prova nya rätter från olika kök.

plats_img

Senaste intelligens

plats_img