Zephyrnet-logo

Combineer AWS Glue en Amazon MWAA om geavanceerde VPC-selectie- en failover-strategieën te bouwen | Amazon-webservices

Datum:

AWS lijm is een serverloze data-integratieservice die het eenvoudig maakt om data uit meerdere bronnen te ontdekken, voor te bereiden, te verplaatsen en te integreren voor analyse, machine learning (ML) en applicatie-ontwikkeling.

AWS Glue-klanten moeten vaak aan strenge beveiligingseisen voldoen, waarbij soms de netwerkconnectiviteit die voor de taak is toegestaan, moet worden vergrendeld of binnen een specifieke VPC moet worden gewerkt om toegang te krijgen tot een andere dienst. Om binnen de VPC te kunnen werken, moeten de taken aan één enkel subnet worden toegewezen, maar het meest geschikte subnet kan in de loop van de tijd veranderen (bijvoorbeeld op basis van het gebruik en de beschikbaarheid). Het kan dus zijn dat u er de voorkeur aan geeft die beslissing tijdens runtime te nemen, op basis op uw eigen strategie.

Door Amazon beheerde workflows voor Apache Airflow (Amazon MWAA) is een AWS-service voor het uitvoeren van beheerde Airflow-workflows, waarmee aangepaste logica kan worden geschreven om te coördineren hoe taken zoals AWS Glue-taken worden uitgevoerd.

In dit bericht laten we zien hoe u een AWS Glue-taak uitvoert als onderdeel van een Airflow-workflow, met dynamisch configureerbare selectie van het VPC-subnet dat tijdens runtime aan de taak is toegewezen.

Overzicht oplossingen

Om binnen een VPC te kunnen werken, moet aan een AWS Glue-taak minimaal een verbinding worden toegewezen die netwerkconfiguratie omvat. Bij elke verbinding kan een VPC, subnet en beveiligingsgroep worden gespecificeerd, maar voor de eenvoud gebruikt dit bericht verbindingen van het type: NETWORK, dat alleen de netwerkconfiguratie definieert en geen externe systemen omvat.

Als aan de taak een vast subnet is toegewezen door één enkele verbinding, kan er in geval van een servicestoring op de Beschikbaarheidszones of als het subnet om andere redenen niet beschikbaar is, kan de taak niet worden uitgevoerd. Bovendien vereist elk knooppunt (bestuurder of werknemer) in een AWS Glue-taak een IP-adres dat is toegewezen vanuit het subnet. Wanneer veel grote taken tegelijkertijd worden uitgevoerd, kan dit leiden tot een tekort aan IP-adressen en kan de taak met minder knooppunten worden uitgevoerd dan bedoeld of helemaal niet worden uitgevoerd.

Met AWS Glue-extractie-, transformatie- en laadtaken (ETL) kunnen meerdere verbindingen worden gespecificeerd met meerdere netwerkconfiguraties. De taak zal echter altijd proberen de netwerkconfiguratie van de verbindingen in de aangegeven volgorde te gebruiken en de eerste te kiezen die slaagt gezondheidchecks en heeft ten minste twee IP-adressen om de taak te starten, wat misschien niet de optimale optie is.

Met deze oplossing kunt u dat gedrag verbeteren en aanpassen door de verbindingen dynamisch opnieuw te ordenen en de selectieprioriteit te definiëren. Als een nieuwe poging nodig is, worden de verbindingen opnieuw geprioriteerd op basis van de strategie, omdat de omstandigheden mogelijk zijn veranderd sinds de laatste run.

Als resultaat hiervan helpt het voorkomen dat de taak niet kan worden uitgevoerd of onder capaciteit wordt uitgevoerd als gevolg van een tekort aan subnet-IP-adressen of zelfs een storing, terwijl wordt voldaan aan de netwerkbeveiligings- en connectiviteitsvereisten.

Het volgende diagram illustreert de oplossingsarchitectuur.

Voorwaarden

Om de stappen van het bericht te volgen, heb je een gebruiker nodig die kan inloggen op de AWS-beheerconsole en toestemming heeft voor toegang tot Amazon MWAA, Amazon virtuele privécloud (Amazon VPC) en AWS-lijm. De AWS-regio waar u de oplossing wilt inzetten, heeft de capaciteit nodig om een ​​VPC en twee elastische IP-adressen te creëren. Het standaard regionale quotum voor beide typen bronnen is vijf, dus het kan zijn dat u via de console een verhoging moet aanvragen.

Je hebt ook een AWS Identiteits- en toegangsbeheer (IAM) rol geschikt om AWS Glue-taken uit te voeren als u er nog geen heeft. Voor instructies, zie Een IAM-rol maken voor AWS Glue.

Implementeer een Airflow-omgeving en VPC

Eerst implementeer je een nieuwe Airflow-omgeving, inclusief de creatie van een nieuwe VPC met twee publieke subnetten en twee private subnetten. Dit komt omdat Amazon MWAA fouttolerantie voor de beschikbaarheidszone vereist, dus het moet op twee subnetten op twee verschillende beschikbaarheidszones in de regio draaien. De openbare subnetten worden gebruikt zodat de NAT Gateway internettoegang kan bieden voor de particuliere subnetten.

Voer de volgende stappen uit:

  1. Maak een AWS CloudFormatie sjabloon op uw computer door de sjabloon uit het volgende te kopiëren Snelstartgids naar een lokaal tekstbestand.
  2. Kies op de AWS CloudFormation-console Stacks in het navigatievenster.
  3. Kies Maak een stapel met de optie Met nieuwe middelen (standaard).
  4. Kies Upload een sjabloonbestand en kies het lokale sjabloonbestand.
  5. Kies Volgende.
  6. Voltooi de installatiestappen, voer een naam in voor de omgeving en laat de rest van de parameters op de standaardwaarde staan.
  7. Erken bij de laatste stap dat er hulpbronnen zullen worden gecreëerd en kies Verzenden.

Het maken kan 20-30 minuten duren, totdat de status van de stapel verandert in CREATE_COMPLETE.

De hulpbron die de meeste tijd in beslag zal nemen, is de Airflow-omgeving. Terwijl het wordt gemaakt, kunt u doorgaan met de volgende stappen, totdat u de Airflow-gebruikersinterface moet openen.

  1. Op de stapel Resources tabblad, noteer de ID's voor de VPC en twee privé-subnetten (PrivateSubnet1 en PrivateSubnet2), om te gebruiken in de volgende stap.

Creëer AWS Glue-verbindingen

De CloudFormation-sjabloon implementeert twee privé-subnetten. In deze stap maakt u met elke verbinding een AWS Glue-verbinding, zodat AWS Glue-taken daarin kunnen worden uitgevoerd. Amazon MWAA heeft onlangs de capaciteit toegevoegd om het Airflow-cluster op gedeelde VPC's te laten draaien, wat de kosten verlaagt en het netwerkbeheer vereenvoudigt. Voor meer informatie, zie Introductie van gedeelde VPC-ondersteuning op Amazon MWAA.

Voer de volgende stappen uit om de verbindingen te maken:

  1. Kies op de AWS Glue-console: Gegevensverbindingen in het navigatievenster.
  2. Kies Verbinding maken.
  3. Kies Netwerk als de gegevensbron.
  4. Kies het VPC en privé-subnet (PrivateSubnet1) gemaakt door de CloudFormation-stack.
  5. Gebruik de standaardbeveiligingsgroep.
  6. Kies Volgende.
  7. Voer de verbindingsnaam in MWAA-Glue-Blog-Subnet1.
  8. Controleer de details en voltooi de creatie.
  9. Herhaal deze stappen met PrivateSubnet2 en geef de verbinding een naam MWAA-Glue-Blog-Subnet2.

Maak de AWS Glue-taak

Nu maakt u de AWS Glue-taak die later door de Airflow-workflow wordt geactiveerd. De taak gebruikt de verbindingen die in de vorige sectie zijn gemaakt, maar in plaats van ze rechtstreeks aan de taak toe te wijzen, zoals u normaal zou doen, laat u in dit scenario de lijst met taakverbindingen leeg en laat u de werkstroom beslissen welke u tijdens runtime wilt gebruiken.

Het taakscript is in dit geval niet significant en is alleen bedoeld om aan te tonen dat de taak in een van de subnetten wordt uitgevoerd, afhankelijk van de verbinding.

  1. Kies op de AWS Glue-console: ETL-banen in het navigatievenster en kies vervolgens Script-editor.
  2. Laat de standaardopties (Spark engine en Verse start) en kies Script maken.
  3. Vervang het placeholder-script door de volgende Python-code:
    import ipaddress
    import socket
    
    subnets = {
        "PrivateSubnet1": "10.192.20.0/24",
        "PrivateSubnet2": "10.192.21.0/24"
    }
    
    ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            subnet_name = subnet
    
    print(f"The driver node has been assigned the ip: {ip}"
          + f" which belongs to the subnet: {subnet_name}")
    

  4. Hernoem de taak naar AirflowBlogJob.
  5. Op de Details van de baan tabblad, voor IAM-rol, kies een willekeurige rol en voer 2 in voor het aantal werknemers (alleen voor soberheid).
  6. Sla deze wijzigingen op zodat de taak wordt gemaakt.

Verleen AWS Glue-machtigingen aan de Airflow-omgevingsrol

De rol die door de CloudFormation-sjabloon voor Airflow is gemaakt, biedt de basisrechten om workflows uit te voeren, maar niet om te communiceren met andere services zoals AWS Glue. In een productieproject zou u uw eigen sjablonen definiëren met deze aanvullende machtigingen, maar in dit bericht voegt u, voor de eenvoud, de aanvullende machtigingen toe als inline-beleid. Voer de volgende stappen uit:

  1. Kies op de IAM-console rollen in het navigatievenster.
  2. Zoek de rol die door de sjabloon is gemaakt; het begint met de naam die u aan de CloudFormation-stack hebt toegewezen en vervolgens -MwaaExecutionRole-.
  3. Op de pagina met roldetails, op de Machtigingen toevoegen menu, kies Inline beleid maken.
  4. Schakel over van de visuele naar de JSON-modus en voer de volgende JSON in het tekstvak in. Er wordt van uitgegaan dat de AWS Glue-rol die je hebt de conventie volgt om mee te beginnen AWSGlueServiceRole. Voor verbeterde beveiliging kunt u de jokertekenbron op het ec2:DescribeSubnets toestemming met de ARN's van de twee privé-subnetten van de CloudFormation-stack.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetConnection"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*",
                    "arn:aws:glue:*:*:catalog"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateJob",
                    "glue:GetJob",
                    "glue:StartJobRun",
                    "glue:GetJobRun"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:job/AirflowBlogJob",
                    "arn:aws:glue:*:*:job/BlogAirflow"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeSubnets"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*"
            }
        ]
    }
    

  5. Kies Volgende.
  6. Enter GlueRelatedPermissions als de beleidsnaam en voltooi het maken.

In dit voorbeeld gebruiken we een ETL-scripttaak; Voor een visuele taak heeft de Airflow-rol toestemming nodig om naar het geconfigureerde scriptpad te schrijven, omdat het script automatisch wordt gegenereerd bij het opslaan. Amazon eenvoudige opslagservice (Amazone S3).

Maak de luchtstroom-DAG

Een Airflow-workflow is gebaseerd op een Directed Acyclic Graph (DAG), die wordt gedefinieerd door een Python-bestand dat programmatisch de verschillende betrokken taken en de onderlinge afhankelijkheden specificeert. Voltooi de volgende scripts om de DAG te maken:

  1. Maak een lokaal bestand met de naam glue_job_dag.py met behulp van een teksteditor.

In elk van de volgende stappen bieden we een codefragment dat in het bestand kan worden ingevoerd en een uitleg van wat het doet.

  1. Het volgende fragment voegt de vereiste import van Python-modules toe. De modules zijn al geïnstalleerd op Airflow; Als dat niet het geval zou zijn, zou je een requirements.txt bestand om aan Airflow aan te geven welke modules moeten worden geïnstalleerd. Het definieert ook de Boto3-clients die de code later zal gebruiken. Standaard gebruiken ze dezelfde rol en regio als Airflow. Daarom stelt u vóór de rol de extra vereiste machtigingen in.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
    glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. Het volgende fragment voegt drie functies toe om de verbindingsvolgordestrategie te implementeren, die definieert hoe de gegeven verbindingen opnieuw moeten worden geordend om hun prioriteit vast te stellen. Dit is slechts een voorbeeld; u kunt uw aangepaste code bouwen om uw eigen logica te implementeren, afhankelijk van uw behoeften. De code controleert eerst de IP's die beschikbaar zijn op elk verbindingssubnet en scheidt de IP's die voldoende IP's beschikbaar hebben om de taak op volledige capaciteit uit te voeren en de IP's die kunnen worden gebruikt omdat ze ten minste twee IP's beschikbaar hebben, wat het minimum is dat een taak nodig heeft. begin. Als de strategie is ingesteld op random, zal het de volgorde binnen elk van de eerder beschreven verbindingsgroepen willekeurig maken en eventuele andere verbindingen toevoegen. Als de strategie dat is capacity, zal het ze bestellen van de meeste gratis IP's naar de minste.
    def get_available_ips_from_connection(glue_connection_name):
        conn_response = glue_client.get_connection(Name=glue_connection_name)
        connection_properties = conn_response['Connection']['PhysicalConnectionRequirements']
        subnet_id = connection_properties['SubnetId']
        subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id])
        return subnet_response['Subnets'][0]['AvailableIpAddressCount']
    
    def get_connections_free_ips(glue_connection_names, num_workers):
        good_connections = []
        usable_connections = []    
        for connection_name in glue_connection_names:
            try:
                available_ips = get_available_ips_from_connection(connection_name)
                # Priority to connections that can hold the full cluster and we haven't just tried
                if available_ips >= num_workers:
                    good_connections.append((connection_name, available_ips))
                elif available_ips >= 2: # The bare minimum to start a Glue job
                    usable_connections.append((connection_name, available_ips))                
            except Exception as e:
                print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}")  
        return good_connections, usable_connections
    
    def prioritize_connections(connection_list, num_workers, strategy):
        (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers)
        print(f"Good connections: {good_connections}")
        print(f"Usable connections: {usable_connections}")
        all_conn = []
        if strategy=="random":
            shuffle(good_connections)
            shuffle(usable_connections)
            # Good connections have priority
            all_conn = good_connections + usable_connections
        elif strategy=="capacity":
            # We can sort both at the same time
            all_conn = good_connections + usable_connections
            all_conn.sort(key=lambda x: -x[1])
        else: 
            raise ValueError(f"Unknown strategy specified: {strategy}")    
        result = [c[0] for c in all_conn] # Just need the name
        # Keep at the end any other connections that could not be checked for ips
        result += [c for c in connection_list if c not in result]
        return result
    

  3. De volgende code maakt de DAG zelf met de taak taak uitvoeren, die de taak bijwerkt met de verbindingsvolgorde die is gedefinieerd door de strategie, deze uitvoert en wacht op de resultaten. De taaknaam, verbindingen en strategie zijn afkomstig van Airflow-variabelen, zodat deze eenvoudig kunnen worden geconfigureerd en bijgewerkt. Er zijn twee nieuwe pogingen met exponentiële uitstel geconfigureerd, dus als de taken mislukken, wordt de volledige taak herhaald, inclusief de verbindingsselectie. Misschien is de beste keuze nu een andere verbinding, of bevindt het eerder willekeurig gekozen subnet zich in een beschikbaarheidszone die momenteel te kampen heeft met een storing, en door een andere te kiezen, kan het herstellen.
    with DAG(
        dag_id='glue_job_dag',
        schedule_interval=None, # Run on demand only
        start_date=datetime(2000, 1, 1), # A start date is required
        max_active_runs=1,
        catchup=False
    ) as glue_dag:
        
        @task(
            task_id="glue_task", 
            retries=2,
            retry_delay=duration(seconds = 30),
            retry_exponential_backoff=True
        )
        def run_job_task(**ctx):    
            glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',')
            glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip()
            strategy= Variable.get('glue_job_dag.strategy', 'random') # random or capacity
            print(f"Connections available: {glue_connections}")
            print(f"Glue job name: {glue_jobname}")
            print(f"Strategy to use: {strategy}")
            job_props = glue_client.get_job(JobName=glue_jobname)['Job']            
            num_workers = job_props['NumberOfWorkers']
            
            glue_connections = prioritize_connections(glue_connections, num_workers, strategy)
            print(f"Running Glue job with the connection order: {glue_connections}")
            existing_connections = job_props.get('Connections',{}).get('Connections', [])
            # Preserve other connections that we don't manage
            other_connections = [con for con in existing_connections if con not in glue_connections]
            job_props['Connections'] = {"Connections": glue_connections + other_connections}
            # Clean up properties so we can reuse the dict for the update request
            for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']:
                del job_props[prop_name]
    
            GlueJobOperator(
                task_id='submit_job',
                job_name=glue_jobname,
                iam_role_name=job_props['Role'].split('/')[-1],
                update_config=True,
                create_job_kwargs=job_props,
                wait_for_completion=True
            ).execute(ctx)   
            
        run_job_task()
    

Maak de Airflow-workflow

Nu maakt u een workflow die de AWS Glue-taak aanroept die u zojuist hebt gemaakt:

  1. Zoek op de Amazon S3-console de bucket die is gemaakt door de CloudFormation-sjabloon, die een naam heeft die begint met de naam van de stapel en vervolgens -environmentbucket- (bijvoorbeeld, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Maak in die bucket een map met de naam dagsen upload in die map het DAG-bestand glue_job_dag.py die u in de vorige sectie hebt gemaakt.
  3. Navigeer op de Amazon MWAA-console naar de omgeving die u hebt geïmplementeerd met de CloudFormation-stack.

Als de status nog niet is Beschikbaar, wacht tot het die status bereikt. Het mag niet langer dan 30 minuten duren sinds u de CloudFormation-stack heeft geïmplementeerd.

  1. Kies de omgevingslink in de tabel om de omgevingsdetails te bekijken.

Het is geconfigureerd om DAG's op te halen uit de bucket en map die u in de vorige stappen hebt gebruikt. Airflow controleert die map op wijzigingen.

  1. Kies Open de luchtstroom-UI om een ​​nieuw tabblad te openen met toegang tot de Airflow-gebruikersinterface, met behulp van de geïntegreerde IAM-beveiliging om u aan te melden.

Als er een probleem is met het DAG-bestand dat u hebt gemaakt, wordt er bovenaan de pagina een fout weergegeven waarin wordt aangegeven om welke regels het gaat. Bekijk in dat geval de stappen en upload opnieuw. Na een paar seconden wordt het geparseerd en wordt de foutbanner bijgewerkt of verwijderd.

  1. Op de beheerder menu, kies Variabelen.
  2. Voeg drie variabelen toe met de volgende sleutels en waarden:
    1. sleutel glue_job_dag.glue_connections met waarde MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. sleutel glue_job_dag.glue_job_name met waarde AirflowBlogJob.
    3. sleutel glue_job_dag.strategy met waarde capacity.

Voer de taak uit met een dynamische subnettoewijzing

Nu bent u klaar om de workflow uit te voeren en te zien hoe de strategie de verbindingen dynamisch herschikt.

  1. Kies in de Airflow-gebruikersinterface: DAG's, en op de rij glue_job_dag, kies het afspeelpictogram.
  2. Op de Blader menu, kies Taakinstanties.
  3. Blader in de instancetabel naar rechts om de Log Url en kies het pictogram erop om het logboek te openen.

Het logboek wordt bijgewerkt terwijl de taak wordt uitgevoerd; u kunt de regel vinden die begint met "Lijmtaak uitvoeren met de verbindingsvolgorde:" en de voorgaande regels met details van de verbindings-IP's en de toegewezen categorie. Als er een fout optreedt, ziet u de details in dit logboek.

  1. Kies op de AWS Glue-console: ETL-banen in het navigatievenster en kies vervolgens de taak AirflowBlogJob.
  2. Op de Runs tabblad, kies het run-exemplaar en vervolgens het Logboeken uitvoeren link, waarmee een nieuw tabblad wordt geopend.
  3. Gebruik op het nieuwe tabblad de logstreamlink om het te openen.

Het zal het IP-adres weergeven waaraan het stuurprogramma is toegewezen en tot welk subnet het behoort, wat moet overeenkomen met de verbinding aangegeven door Airflow (als het log niet wordt weergegeven, kies dan Hervat zodat het wordt bijgewerkt zodra het beschikbaar is).

  1. Bewerk in de Airflow-gebruikersinterface de Airflow-variabele glue_job_dag.strategy om het in te stellen random.
  2. Voer de DAG meerdere keren uit en zie hoe de volgorde verandert.

Opruimen

Als u de implementatie niet langer nodig heeft, verwijdert u de bronnen om verdere kosten te voorkomen:

  1. Verwijder het Python-script dat u heeft geüpload, zodat de S3-bucket in de volgende stap automatisch kan worden verwijderd.
  2. Verwijder de CloudFormation-stack.
  3. Verwijder de AWS Glue-taak.
  4. Verwijder het script dat de taak heeft opgeslagen in Amazon S3.
  5. Verwijder de verbindingen die u als onderdeel van dit bericht hebt gemaakt.

Conclusie

In dit bericht hebben we laten zien hoe AWS Glue en Amazon MWAA kunnen samenwerken om geavanceerdere aangepaste workflows te bouwen, terwijl de operationele en managementoverhead worden geminimaliseerd. Deze oplossing geeft u meer controle over hoe uw AWS Glue-taak wordt uitgevoerd om te voldoen aan speciale operationele, netwerk- of beveiligingsvereisten.

Je eigen Amazon MWAA-omgeving kun je op meerdere manieren inzetten, zoals met de sjabloon gebruikt in dit bericht, op de Amazon MWAA-console, of met behulp van de AWS CLI. U kunt ook uw eigen strategieën implementeren om AWS Glue-taken te orkestreren, op basis van uw netwerkarchitectuur en vereisten (bijvoorbeeld om de taak indien mogelijk dichter bij de gegevens uit te voeren).


Over de auteurs

Michaël Greenshtein is een Analytics Specialist Solutions Architect voor de Publieke Sector.

Gonzalo herreros is een Senior Big Data Architect in het AWS Glue-team.

spot_img

Laatste intelligentie

spot_img