Zephyrnet-logo

Hoe ENGIE hun pijplijnen voor gegevensopname schaalt met Amazon MWAA

Datum:

ENGIE, een van de grootste nutsbedrijven in Frankrijk en een wereldspeler in de koolstofvrije energietransitie, produceert, transporteert en verhandelt elektriciteit, gas en energiediensten. Met 160,000 medewerkers wereldwijd is ENGIE een gedecentraliseerde organisatie met 25 business units met een hoge mate van delegatie en empowerment. Het gedecentraliseerde wereldwijde klantenbestand van ENGIE had veel gegevens verzameld en het had een slimmere, unieke aanpak en oplossing nodig om zijn initiatieven op elkaar af te stemmen en gegevens te leveren die opneembaar, organiseerbaar, beheersbaar, deelbaar en bruikbaar zijn voor alle wereldwijde bedrijfseenheden.

In 2018 besloot de bedrijfsleiding van het bedrijf om zijn digitale transformatie te versnellen door middel van data en innovatie door: een datagedreven bedrijf worden. Yves Le Gélard, chief digital officer bij ENGIE, legt het doel van het bedrijf uit: “Duurzaamheid is voor ENGIE de alfa en de omega van alles. Dit is onze bestaansreden. We helpen grote bedrijven en de grootste steden op aarde in hun pogingen om zo snel mogelijk over te gaan naar nul-koolstof, want het is eigenlijk de belangrijkste vraag voor de mensheid van vandaag.”

ENGIE gebruikt, net als elke andere grote onderneming, meerdere tools voor extraheren, transformeren en laden (ETL) om gegevens op te nemen in hun datameer op AWS. Niettemin hebben ze meestal dure licentieplannen. "Het bedrijf had een uniforme methode nodig voor het verzamelen en analyseren van gegevens om klanten te helpen hun waardeketens te beheren", zegt Gregory Wolowiec, de Chief Technology Officer die het dataprogramma van ENGIE leidt. ENGIE wilde een applicatie met een gratis licentie, goed geïntegreerd met meerdere technologieën en met een continue integratie, continue levering (CI/CD) pijplijn om al hun opnameprocessen gemakkelijker te kunnen schalen.

ENGIE begon te gebruiken Door Amazon beheerde workflows voor Apache Airflow (Amazon MWAA) om dit probleem op te lossen en begonnen met het verplaatsen van verschillende gegevensbronnen van on-premise applicaties en ERP's, AWS-services zoals Amazon roodverschuiving, Amazon relationele databaseservice (Amazone RDS), Amazon DynamoDB, externe services zoals Salesforce en andere cloudproviders naar een gecentraliseerd datameer bovenop Amazon eenvoudige opslagservice (Amazone S3).

Amazon MWAA wordt met name gebruikt om geharmoniseerde operationele en bedrijfsgegevens van verschillende on-premises en SaaS-gegevensbronnen (Software as a Service) te verzamelen en op te slaan in een gecentraliseerd datameer. Het doel van dit datameer is om een ​​"groepsprestatiecockpit" te creëren die een efficiënte, gegevensgestuurde analyse en doordachte besluitvorming door de raad van bestuur van Engie mogelijk maakt.

In dit bericht delen we hoe ENGIE een CI/CD-pijplijn heeft gemaakt voor een Amazon MWAA-projectsjabloon met behulp van een AWS Codecommit repository en aangesloten op AWS CodePipeline om de code en aangepaste plug-ins te bouwen, testen en verpakken. In dit geval hebben we een aangepaste plug-in ontwikkeld om gegevens uit Salesforce op te nemen op basis van de: Airflow Salesforce open-source plug-in.

Overzicht oplossingen

De volgende diagrammen illustreren de oplossingsarchitectuur die de geïmplementeerde Amazon MWAA-omgeving en de bijbehorende pijplijnen definieert. Het beschrijft ook het gebruiksscenario van de klant voor de opname van Salesforce-gegevens in Amazon S3.

Het volgende diagram toont de architectuur van de geïmplementeerde Amazon MWAA-omgeving en de geïmplementeerde pijplijnen.

De voorgaande architectuur wordt volledig ingezet via infrastructure as code (IaC). De uitvoering omvat het volgende:

  • Amazon MWAA-omgeving – Een aanpasbare Amazon MWAA-omgeving verpakt met plug-ins en vereisten en op een veilige manier geconfigureerd.
  • Voorzieningspijplijn – Het admin-team kan de Amazon MWAA-omgeving beheren met behulp van de meegeleverde CI/CD-provisioningpipeline. Deze pijplijn bevat een CodeCommit-repository die is aangesloten op CodePipeline om de omgeving en de bijbehorende plug-ins en vereisten continu bij te werken.
  • Projectpijplijn – Deze CI/CD-pijplijn wordt geleverd met een CodeCommit-repository die CodePipeline activeert om continu door gebruikers ontwikkelde DAG's te bouwen, testen en implementeren. Eenmaal ingezet, worden deze DAG's beschikbaar gesteld in de Amazon MWAA-omgeving.

Het volgende diagram toont de workflow voor gegevensopname, die de volgende stappen omvat:

  1. De DAG wordt handmatig of op basis van een schema geactiveerd door Amazon MWAA.
  2. Amazon MWAA initieert parameters voor gegevensverzameling en berekent batches.
  3. Amazon MWAA verdeelt verwerkingstaken onder zijn werknemers.
  4. Gegevens worden in batches opgehaald uit Salesforce.
  5. Amazon MWAA gaat ervan uit dat een AWS Identiteits- en toegangsbeheer (IAM)-rol met de nodige machtigingen om de verzamelde gegevens op te slaan in de doel-S3-bucket.

Deze AWS Cloud-ontwikkelingskit (AWS CDK)-constructie wordt geïmplementeerd met de volgende best practices voor beveiliging:

  • Met het principe van de minste bevoegdheden verleent u alleen machtigingen aan de bronnen of acties die gebruikers nodig hebben om taken uit te voeren.
  • S3-buckets worden geïmplementeerd met nalevingsregels voor beveiliging: versleuteling, versiebeheer en blokkering van openbare toegang.
  • Authenticatie- en autorisatiebeheer wordt afgehandeld met: AWS eenmalige aanmelding (AWS-SSO).
  • Airflow slaat verbindingen met externe bronnen op een veilige manier op in Airflow's standaard geheimen-backend of een alternatieve geheimen-backend zoals AWS-geheimenmanager or AWS Systems Manager-parameteropslag.

Voor dit bericht doorlopen we een use case waarbij we de gegevens van Salesforce gebruiken om deze in een ENGIE-datameer op te nemen om het te transformeren en bedrijfsrapporten op te bouwen.

Vereisten voor implementatie

Voor deze walkthrough zijn de volgende vereisten:

  • Basiskennis van het Linux-besturingssysteem
  • Toegang tot een AWS-account met beheerder of hoofdgebruiker (of gelijkwaardige) IAM-rol
    polissen bijgevoegd
  • Toegang tot een shell-omgeving of optioneel met AWS-cloudshell

Implementeer de oplossing

Voer de volgende stappen uit om de oplossing te implementeren en uit te voeren:

  1. Installeer AWS CDK.
  2. Bootstrap uw AWS-account.
  3. Definieer uw AWS CDK-omgevingsvariabelen.
  4. Implementeer de stapel.

Installeer AWS CDK

De beschreven oplossing is volledig geïmplementeerd met AWS CDK.

AWS CDK is een open-source softwareontwikkelingsraamwerk om uw cloudtoepassingsbronnen te modelleren en in te richten met behulp van bekende programmeertalen. Als u vertrouwd wilt raken met AWS CDK, kunt u de AWS CDK-workshop is een geweldige plek om te beginnen.

Installeer AWS CDK met behulp van de volgende opdrachten:

npm install -g aws-cdk
# To check the installation
cdk --version

Bootstrap uw AWS-account op

Ten eerste moet u ervoor zorgen dat de omgeving waarin u de oplossing wilt implementeren, is geweest bootstrap. U hoeft dit slechts één keer te doen per omgeving waar u AWS CDK-applicaties wilt implementeren. Als u niet zeker weet of uw omgeving al is opgestart, kunt u de opdracht altijd opnieuw uitvoeren:

cdk bootstrap aws://YOUR_ACCOUNT_ID/YOUR_REGION

Definieer uw AWS CDK-omgevingsvariabelen

Definieer op Linux of MacOS uw omgevingsvariabelen met de volgende code:

export CDK_DEFAULT_ACCOUNT=YOUR_ACCOUNT_ID
export CDK_DEFAULT_REGION=YOUR_REGION

Gebruik in Windows de volgende code:

setx CDK_DEFAULT_ACCOUNT YOUR_ACCOUNT_ID
setx CDK_DEFAULT_REGION YOUR_REGION

De stapel implementeren

Standaard implementeert de stack een standaard Amazon MWAA-omgeving met de bijbehorende pijplijnen die eerder zijn beschreven. Het creëert een nieuwe VPC om de Amazon MWAA-bronnen te hosten.

De stapel kan worden aangepast met behulp van de parameters in de volgende tabel.

Om een ​​parameter aan de constructie door te geven, kun je de . gebruiken AWS CDK runtime-context. Als u van plan bent uw omgeving aan te passen met meerdere parameters, raden we u aan om de cdk.json context-bestand met versiebeheer om onverwachte wijzigingen in uw implementaties te voorkomen. In ons voorbeeld geven we slechts één parameter door aan de constructie. Daarom gebruiken we voor de eenvoud van de tutorial de the --context or -c optie voor de cdk commando, zoals in het volgende voorbeeld:

cdk deploy -c paramName=paramValue -c paramName=paramValue ...

Parameter Omschrijving Standaard Geldige waarden
vpcId VPC-ID waar het cluster is geïmplementeerd. Als er geen is, maakt u een nieuwe aan en heeft u de parameter nodig cidr in dat geval. Geen VPC-ID
cider De CIDR voor de VPC die is gemaakt om Amazon MWAA-bronnen te hosten. Alleen gebruikt als de vpcId is niet gedefinieerd. 172.31.0.0/16 IP-CIDR
subnetId's Door komma's gescheiden lijst met subnet-ID's waar het cluster is geïmplementeerd. Als er geen is, zoekt u naar privé-subnetten in dezelfde beschikbaarheidszone. Geen Lijst met subnet-ID's (gescheiden door komma's)
envNaam Amazon MWAA-omgevingsnaam MwaaEnvironment Draad
envTags Amazon MWAA-omgevingstags Geen Zie het volgende JSON-voorbeeld: '{"Environment":"MyEnv", "Application":"MyApp", "Reason":"Airflow"}'
milieuklasse Amazon MWAA-omgevingsklasse mw1.klein mw1.klein, mw1.medium, mw1.groot
maxWerknemers Amazon MWAA maximale werknemers 1 int
webserverToegangsmodus Toegangsmodus Amazon MWAA-omgeving (privé of openbaar) PUBLIC_ONLY PUBLIC_ONLY, PRIVATE_ONLY
geheimenBackend Amazon MWAA-omgevingsgeheimen backend Luchtstroom SecretsManager

Kloon de GitHub-repository:

git clone https://github.com/aws-samples/cdk-amazon-mwaa-cicd

Implementeer de stapel met de volgende opdracht:

cd mwaairflow && pip install . && cdk synth && cdk deploy -c vpcId=YOUR_VPC_ID

De volgende schermafbeelding toont de stackimplementatie:

De volgende schermafbeelding toont de geïmplementeerde stapel:

Oplossingsbronnen maken

Voor deze walkthrough moet u aan de volgende vereisten voldoen:

Als u geen Salesforce-account hebt, kunt u een SalesForce-ontwikkelaarsaccount maken:

  1. Meld u aan voor een ontwikkelaarsaccount.
  2. Kopieer de host uit de e-mail die u ontvangt.
  3. Log in op uw nieuwe Salesforce-account
  4. Kies het profielpictogram en vervolgens Instellingen.
  5. Kies Reset mijn beveiligingstoken.
  6. Controleer uw e-mail en kopieer de beveiligingstoken die u ontvangt.

Nadat u aan deze vereisten hebt voldaan, bent u klaar om de volgende bronnen te maken:

  • Een S3-bucket voor Salesforce-uitvoergegevens
  • Een IAM-rol en IAM-beleid om de Salesforce-uitvoergegevens op Amazon S3 te schrijven
  • Een Salesforce-verbinding op de Airflow-gebruikersinterface om te kunnen lezen vanuit Salesforce
  • Een AWS-verbinding op de Airflow UI om te kunnen schrijven op Amazon S3
  • Een Airflow-variabele in de Airflow-gebruikersinterface om de naam van de doel-S3-bucket op te slaan

Een S3-bucket maken voor Salesforce-uitvoergegevens

Voer de volgende stappen uit om een ​​output S3-bucket te maken:

  1. Kies op de Amazon S3-console Maak een bucket.

De Maak een bucket wizard wordt geopend.

  1. Voor Bucketnaam, voer een DNS-compatibele naam in voor uw bucket, zoals: airflow-blog-post.
  2. Voor Regio, kies de regio waar u uw Amazon MWAA-omgeving hebt geïmplementeerd, bijvoorbeeld VS Oost (N. Virginia) us-oost-1.
  3. Kies Maak een bucket.

Voor meer informatie, zie Een bucket maken.

Maak een IAM-rol en IAM-beleid om de Salesforce-uitvoergegevens op Amazon S3 te schrijven

In deze stap maken we een IAM-beleid waarmee Amazon MWAA op uw S3-bucket kan schrijven.

  1. Kies op de IAM-console in het navigatievenster Policies.
  2. Kies Maak beleid.
  3. Kies de JSON Tab.
  4. Voer het volgende JSON-beleidsdocument in en vervang airflow-blog-post met uw bucketnaam:
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": ["arn:aws:s3:::airflow-blog-post"] }, { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:DeleteObject" ], "Resource": ["arn:aws:s3:::airflow-blog-post/*"] } ]
    }

  5. Kies Volgende: Tags.
  6. Kies Volgende: Recensie.
  7. Voor Naam, kies een naam voor uw polis (bijvoorbeeld airflow_data_output_policy).
  8. Kies Maak beleid.

Laten we het IAM-beleid koppelen aan een nieuwe IAM-rol die we gebruiken in onze Airflow-verbindingen.

  1. Kies op de IAM-console rollen in het navigatievenster en kies vervolgens Rol creëren.
  2. In het Of selecteer een service om de gebruiksscenario's te bekijken sectie, kies S3.
  3. Voor Selecteer uw gebruiksgeval, kiezen S3.
  4. Zoek naar de naam van het IAM-beleid dat we in de vorige stap hebben gemaakt (airflow_data_output_role) en selecteer het beleid.
  5. Kies Volgende: Tags.
  6. Kies Volgende: Review.
  7. Voor Rol naam, kies een naam voor je rol (airflow_data_output_role).
  8. Bekijk de rol en kies vervolgens Rol creëren.

U wordt doorgestuurd naar de rollen pagina.

  1. Voer in het zoekvak de naam in van de rol die u hebt gemaakt en kies deze.
  2. Kopieer de rol ARN om later te gebruiken om de AWS-verbinding op Airflow te maken.

Maak een Salesforce-verbinding op de Airflow-gebruikersinterface om te kunnen lezen vanuit Salesforce

Om gegevens uit Salesforce te lezen, moeten we een verbinding maken met behulp van de Airflow-gebruikersinterface.

  1. Kies in de Airflow-gebruikersinterface: beheerder.
  2. Kies aansluitingenen vervolgens het plusteken om een ​​nieuwe verbinding te maken.
  3. Vul de velden in met de vereiste informatie.

De volgende tabel bevat meer informatie over elke waarde.

Veld Verplicht Omschrijving Values
Conn-ID Ja Verbindings-ID om te definiëren en later te gebruiken in de DAG Bijvoorbeeld salesforce_connection
Conn-type Ja Connectie type HTTP
gastheer Ja Salesforce-hostnaam host-dev-ed.my.salesforce.com or host.lightning.force.com. Vervang de host door uw Salesforce-host en voeg de . niet toe http:// als voorvoegsel.
Inloggen Ja De Salesforce-gebruikersnaam. De gebruiker moet leestoegang hebben tot de salesforce-objecten. admin@example.com
Wachtwoord Ja Het bijbehorende wachtwoord voor de gedefinieerde gebruiker. MijnWachtwoord123
Haven Nee Salesforce-instantiepoort. Standaard 443. 443
Extra Ja Geef de extra parameters op (als een JSON-woordenboek) die in de Salesforce-verbinding kunnen worden gebruikt. security_token is het Salesforce-beveiligingstoken voor authenticatie. Om het Salesforce-beveiligingstoken in uw e-mail te krijgen, moet u: reset je beveiligingstoken. {"security_token":"AbCdE..."}

Maak een AWS-verbinding in de Airflow UI om te kunnen schrijven op Amazon S3

Een AWS-verbinding is vereist om gegevens naar Amazon S3 te uploaden, dus we moeten een verbinding maken met behulp van de Airflow-gebruikersinterface.

  1. Kies in de Airflow-gebruikersinterface: beheerder.
  2. Kies aansluitingenen kies vervolgens het plusteken om een ​​nieuwe verbinding te maken.
  3. Vul de velden in met de vereiste informatie.

De volgende tabel geeft meer informatie over de velden.

Veld Verplicht Omschrijving Waarde
Conn-ID Ja Verbindings-ID om te definiëren en later te gebruiken in de DAG Bijvoorbeeld aws_connection
Conn-type Ja Connectie type Amazon Web Services
Extra Ja Het is verplicht om de regio te specificeren. U moet ook de rol ARN opgeven die we eerder hebben gemaakt.
{ "region":"eu-west-1", "role_arn":"arn:aws:iam::123456789101:role/airflow_data_output_role "
}

Maak een Airflow-variabele in de Airflow-gebruikersinterface om de naam van de doel-S3-bucket op te slaan

We maken een variabele om de naam van de doel-S3-bucket in te stellen. Deze variabele wordt gebruikt door de DAG. We moeten dus een variabele maken met behulp van de Airflow-gebruikersinterface.

  1. Kies in de Airflow-gebruikersinterface: beheerder.
  2. Kies Variabelenen kies vervolgens het plusteken om een ​​nieuwe variabele te maken.
  3. Voor sleutel, ga naar binnen bucket_name.
  4. Voor Val, voer de naam in van de S3-bucket die u in een vorige stap hebt gemaakt (airflow-blog-post).

Maak en implementeer een DAG in Amazon MWAA

Om gegevens van Salesforce in Amazon S3 te kunnen opnemen, moeten we een DAG (Directed Acyclic Graph) maken. Voer de volgende stappen uit om de DAG te maken en te implementeren:

  1. Maak een lokale Python DAG.
  2. Implementeer uw DAG met behulp van de project CI/CD-pijplijn.
  3. Voer uw DAG uit op de Airflow-gebruikersinterface.
  4. Toon uw gegevens in Amazon S3 (met S3 Select).

Maak een lokale Python DAG

de verstrekte SalesForceToS3Operator stelt u in staat om gegevens van Salesforce-objecten op te nemen in een S3-bucket. Verwijzen naar standaard Salesforce-objecten voor de volledige lijst met objecten waarvan u gegevens kunt opnemen met deze Airflow-operator.

In dit geval nemen we gegevens op van het Opportunity Salesforce-object. We halen de gegevens van de afgelopen 6 maanden op in maandelijkse batches en we filteren op een specifieke lijst met velden.

De DAG in de voorbeeld in GitHub-repository importeert de laatste 6 maanden van het Opportunity-object (één bestand per maand) door de lijst met opgehaalde velden te filteren.

Deze operator neemt twee verbindingen als parameters:

  • Een AWS-verbinding die wordt gebruikt om opgenomen gegevens te uploaden naar Amazon S3.
  • Een Salesforce-verbinding om gegevens uit Salesforce te lezen.

De volgende tabel geeft meer informatie over de parameters.

Parameter Type Verplicht Omschrijving
sf_conn_id snaar Ja Naam van de Airflow-verbinding met de volgende informatie:

  • gebruikersnaam
  • wachtwoord
  • beveiligingstoken
sf_obj snaar Ja Naam van het relevante Salesforce-object (Account, Lead, Opportunity)
s3_conn_id snaar Ja De bestemming S3-verbindings-ID
s3_bucket snaar Ja De bestemming S3-emmer
s3_toets snaar Ja De S3-sleutel van de bestemming
sf_velden snaar Nee De (optionele) lijst met velden die u uit het object wilt halen (Id, Name, enzovoort).
Indien geen (de standaard), dan krijgt dit alle velden voor het object.
fmt snaar Nee Het (optionele) formaat waarin de S3-sleutel van de gegevens moet staan.
Mogelijke waarden zijn CSV (standaard), JSON en NDJSON.
van datum datumnotatie Nee Een specifieke datum-tijd (optioneel) opgemaakte invoer om query's uit te voeren voor incrementele opname.
Geëvalueerd tegen de SystemModStamp attribuut.
Niet compatibel met de queryparameter en moet de notatie datum-tijd hebben (bijvoorbeeld 2021-01-01T00:00:00Z).
Standaard: Geen
daten datumnotatie Nee Een specifieke datum-tijd (optioneel) opgemaakte invoer om query's op uit te voeren voor incrementele opname.
Geëvalueerd tegen de SystemModStamp attribuut.
Niet compatibel met de queryparameter en moet de notatie datum-tijd hebben (bijvoorbeeld 2021-01-01T00:00:00Z).
Standaard: Geen
vraag snaar Nee Een specifieke query (optioneel) die moet worden uitgevoerd voor het opgegeven object.
Dit overschrijft het maken van standaardquery's.
Standaard: Geen
relatie_object snaar Nee Sommige query's vereisen dat relatieobjecten werken, en dit zijn niet dezelfde namen als het Salesforce-object.
Geef dat relatieobject hier op (optioneel).
Standaard: Geen
record_time_toegevoegd boolean Nee Stel deze optionele waarde in op true als u een Unix-tijdstempelveld wilt toevoegen aan de resulterende gegevens die aangeven wanneer de gegevens zijn opgehaald uit Salesforce.
Standaard: False
dwang_naar_tijdstempel boolean Nee Stel deze optionele waarde in op true als u alle velden met datums en datetimes wilt converteren naar Unix timestamp (UTC).
Standaard: False

De eerste stap is om de operator in uw DAG te importeren:

from operators.salesforce_to_s3_operator import SalesforceToS3Operator

Definieer vervolgens uw DAG-standaard ARG's, die u kunt gebruiken voor uw algemene taakparameters:

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = { 'owner': 'user-demo@example.com', 'depends_on_past': False, 'start_date': days_ago(2), 'retries': 0, 'retry_delay': timedelta(minutes=1), 'sf_conn_id': 'salesforce_connection', 's3_conn_id': 'aws_connection', 's3_bucket': 'salesforce-to-s3',
}
...

Ten slotte definieert u de taken om de operator te gebruiken.

De volgende voorbeelden illustreren enkele use-cases.

Volledige opname van Salesforce-object

Deze taak neemt alle inhoud van het Salesforce-object op dat is gedefinieerd in sf_obj. Dit selecteert alle beschikbare velden van het object en schrijft ze in het gedefinieerde formaat in fmt. Zie de volgende code:

...
salesforce_to_s3 = SalesforceToS3Operator( task_id="Opportunity_to_S3", sf_conn_id=default_args["sf_conn_id"], sf_obj="Opportunity", fmt="ndjson", s3_conn_id=default_args["s3_conn_id"], s3_bucket=default_args["s3_bucket"], s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json", dag=salesforce_to_s3_dag,
)
...

Gedeeltelijke opname van Salesforce-objecten op basis van velden

Deze taak neemt specifieke velden van het Salesforce-object op dat is gedefinieerd in sf_obj. De geselecteerde velden worden gedefinieerd in de optionele sf_fields parameter. Zie de volgende code:

...
salesforce_to_s3 = SalesforceToS3Operator( task_id="Opportunity_to_S3", sf_conn_id=default_args["sf_conn_id"], sf_obj="Opportunity", sf_fields=["Id","Name","Amount"], fmt="ndjson", s3_conn_id=default_args["s3_conn_id"], s3_bucket=default_args["s3_bucket"], s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json", dag=salesforce_to_s3_dag,
)
...

Gedeeltelijke opname van Salesforce-objecten op basis van tijdsperiode

Deze taak neemt alle velden van het Salesforce-object op dat is gedefinieerd in sf_obj. De tijdsperiode kan relatief zijn met behulp van from_date or to_date parameters of absoluut door beide parameters te gebruiken.

Het volgende voorbeeld illustreert de relatieve inname vanaf de gedefinieerde datum:

...
salesforce_to_s3 = SalesforceToS3Operator( task_id="Opportunity_to_S3", sf_conn_id=default_args["sf_conn_id"], sf_obj="Opportunity", from_date="YESTERDAY", fmt="ndjson", s3_conn_id=default_args["s3_conn_id"], s3_bucket=default_args["s3_bucket"], s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json", dag=salesforce_to_s3_dag,
)
...

De from_date en to_date parameters ondersteunen de datum-tijdnotatie van Salesforce. Het kan een specifieke datum zijn of een letterlijke (bijvoorbeeld: TODAY, LAST_WEEK, LAST_N_DAYS:5). Voor meer informatie over datumnotaties, zie Datumnotaties en datumletters.

Voor de volledige DAG, zie de voorbeeld in GitHub-repository.

Deze code genereert dynamisch taken die query's uitvoeren om de gegevens van het Opportunity-object op te halen in de vorm van batches van 1 maand.

De sf_fields parameter stelt ons in staat om alleen de geselecteerde velden uit het object te extraheren.

Sla de DAG lokaal op als salesforce_to_s3.py.

Implementeer uw DAG met behulp van de project-CI/CD-pijplijn

Als onderdeel van de CDK-implementatie zijn een CodeCommit-repository en CodePipeline-pijplijn gemaakt om continu DAG's te bouwen, testen en implementeren in uw Amazon MWAA-omgeving.

Om de nieuwe DAG te implementeren, moet de broncode worden vastgelegd in de CodeCommit-repository. Dit activeert een CodePipeline-run die uw nieuwe DAG bouwt, test en implementeert en deze beschikbaar maakt in uw Amazon MWAA-omgeving.

  1. Meld u aan bij de CodeCommit-console in uw implementatieregio.
  2. Onder bron, kiezen Vindplaatsen.

Je zou een nieuwe repository moeten zien mwaaproject.

  1. Duw je nieuwe DAG in de mwaaproject repository onder dags. U kunt hiervoor de CodeCommit-console of de Git-opdrachtregel gebruiken:
    1. CodeCommit-console:
      1. Kies de naam van de CodeCommit-repository van het project mwaaproject en navigeer onder dags.
      2. Kies Bestand toevoegen en Upload bestand en upload je nieuwe DAG.
    2. Git-opdrachtregel:
      1. Om te kunnen klonen en toegang te krijgen tot uw CodeCommit-project met de Git-opdrachtregel, moet u ervoor zorgen dat de Git-client correct is geconfigureerd. Verwijzen naar Instellen voor AWS CodeCommit.
      2. Kloon de repository met de volgende opdracht na het vervangen: met uw project Regio:
        git clone https://git-codecommit.<region>.amazonaws.com/v1/repos/mwaaproject

      3. Kopieer het DAG-bestand onder dags en voeg het toe met het commando:
        git add dags/salesforce_to_s3.py

      4. Leg je nieuwe bestand vast met een bericht:
        git commit -m "add salesforce DAG"

      5. Push het lokale bestand naar de CodeCommit-repository:

De nieuwe commit triggert een nieuwe pijplijn die de nieuwe DAG bouwt, test en implementeert. U kunt de pijplijn volgen op de CodePipeline-console.

  1. Kies op de CodePipeline-console Pijpleiding in het navigatievenster.
  2. Op de Pijpleidingen pagina, je zou het moeten zien mwaaproject-pipeline.
  3. Kies de pijplijn om de details ervan weer te geven.

Nadat u hebt gecontroleerd of de pijplijnuitvoering is gelukt, kunt u controleren of de DAG is geïmplementeerd in de S3-bucket en daarom beschikbaar is op de Amazon MWAA-console.

  1. Zoek op de Amazon S3-console naar een bucket die begint met mwaairflowstack-mwaaenvstackne en ga onder dags.

Je zou de nieuwe DAG moeten zien.

  1. Kies op de Amazon MWAA-console: DAG's.

Je zou de nieuwe DAG moeten kunnen zien.

Voer uw DAG uit op de Airflow-gebruikersinterface

Ga naar de Airflow UI en schakel de DAG in.

Hierdoor wordt uw DAG automatisch geactiveerd.

Later kunt u doorgaan met het handmatig activeren door het pictogram Uitvoeren te kiezen.

Kies de DAG en Grafiekweergave om de uitvoering van uw DAG te zien.

Als u een probleem ondervindt, kunt u de logboeken van de mislukte taken controleren vanuit het contextmenu van de taakinstantie.

Toon uw gegevens in Amazon S3 (met S3 Select)

Voer de volgende stappen uit om uw gegevens weer te geven:

  1. Op de Amazon S3-console, in de Emmers lijst, kiest u de naam van de bucket die de uitvoer van de Salesforce-gegevens bevat (airflow-blog-post).
  2. In het Objecten lijst, kiest u de naam van de map met het object dat u hebt gekopieerd uit Salesforce (opportunity).
  3. Kies de raw-map en de dt map met de laatste tijdstempel.
  4. Selecteer een bestand.
  5. Op de Acties menu, kies Opvragen met S3 Select.
  6. Kies SQL-query uitvoeren om een ​​voorbeeld van de gegevens te bekijken.

Opruimen

Om toekomstige kosten te voorkomen, verwijdert u de AWS CloudFormation-stack en de resources die u als onderdeel van dit bericht hebt ingezet.

  1. Op de AWS CloudFormation-console,
    verwijder de stapel
    MWAAirflowStack.

Om de ingezette bronnen op te schonen met behulp van de AWS-opdrachtregelinterface (AWS CLI), kunt u eenvoudig de volgende opdracht uitvoeren:

cdk destroy MWAAirflowStack

Zorg ervoor dat u zich in het hoofdpad van het project bevindt wanneer u de opdracht uitvoert.

Nadat u hebt bevestigd dat u de CloudFormation-stack wilt vernietigen, worden de bronnen van de oplossing uit uw AWS-account verwijderd.

De volgende schermafbeelding toont het proces van het implementeren van de stapel:

De volgende schermafbeelding bevestigt dat de stapel niet is geïmplementeerd.

  1. Navigeer naar de Amazon S3-console en zoek de twee emmers met: mwaairflowstack-mwaaenvstack en mwaairflowstack-mwaaproj die zijn gemaakt tijdens de implementatie.
  2. Selecteer elke emmer de inhoud verwijderenen verwijder vervolgens de bucket.
  3. Verwijder de IAM-rol die is gemaakt om op de S3-buckets te schrijven.

Conclusie

ENGIE ontdekte significante waarde door Amazon MWAA te gebruiken, waardoor zijn wereldwijde business units data op een productievere manier konden verwerken. Dit bericht presenteerde hoe ENGIE hun pijplijnen voor gegevensopname heeft geschaald met behulp van Amazon MWAA. Het eerste deel van het bericht beschreef de architectuurcomponenten en hoe met succes een CI/CD-pijplijn voor een Amazon MWAA-projectsjabloon kan worden geïmplementeerd met behulp van een CodeCommit-repository en deze in CodePipeline kan worden aangesloten om de code en aangepaste plug-ins te bouwen, testen en verpakken. Het tweede deel leidde u door de stappen om het opnameproces van Salesforce te automatiseren met behulp van Airflow met een voorbeeld. Voor de Airflow-configuratie heb je Airflow-variabelen gebruikt, maar je kunt ook Secrets Manager met Amazon MWAA met de secretsBackend parameter bij het implementeren van de stapel.

De use case die in dit bericht wordt besproken, is slechts één voorbeeld van hoe je Amazon MWAA kunt gebruiken om het op grote schaal gemakkelijker te maken om end-to-end datapijplijnen in de cloud op te zetten en te exploiteren. Voor meer informatie over Amazon MWAA, bekijk de: Gebruikershandleiding.


Over de auteurs

Anouar Zaaber is Senior Engagement Manager bij AWS Professional Services. Hij leidt interne AWS-, externe partner- en klantteams om AWS-cloudservices te leveren waarmee de klanten hun bedrijfsresultaten kunnen realiseren.

Amine El Mallem is een Data/ML Ops Engineer bij AWS Professional Services. Hij werkt met klanten aan het ontwerpen, automatiseren en bouwen van oplossingen op AWS voor hun zakelijke behoeften.

Armando Segnini is een Data Architect bij AWS Professional Services. Hij besteedt zijn tijd aan het bouwen van schaalbare big data- en analyseoplossingen voor AWS Enterprise- en strategische klanten. Armando houdt er ook van om met zijn gezin over de hele wereld te reizen en foto's te maken van de plaatsen die hij bezoekt.

Mohamed-Ali Elouaer is een DevOps-consultant bij AWS Professional Services. Hij maakt deel uit van het AWS ProServe-team en helpt zakelijke klanten bij het oplossen van complexe problemen met betrekking tot automatisering, beveiliging en monitoring met behulp van AWS-services. In zijn vrije tijd houdt hij van reizen en films kijken.

Julien Grinsztajn is architect bij ENGIE. Hij maakt deel uit van het Digital & IT Consulting ENGIE IT-team dat werkt aan de definitie van de architectuur voor complexe projecten met betrekking tot data-integratie en netwerkbeveiliging. In zijn vrije tijd reist hij graag over de oceanen om haaien en andere zeedieren te ontmoeten.

spot_img

Laatste intelligentie

spot_img