Zephyrnet-logo

Sluit u aan bij een streaming gegevensbron met CDC-gegevens voor realtime serverloze gegevensanalyse met behulp van AWS Glue, AWS DMS en Amazon DynamoDB | Amazon-webservices

Datum:

Klanten gebruiken datawarehousing-oplossingen om hun traditionele analytische taken uit te voeren. De laatste tijd hebben datameren veel aan populariteit gewonnen om de basis te worden voor analytische oplossingen, omdat ze voordelen bieden zoals schaalbaarheid, fouttolerantie en ondersteuning voor gestructureerde, semi-gestructureerde en ongestructureerde datasets.

Datalakes zijn niet standaard transactioneel; er zijn echter meerdere opensource-frameworks die datameren verbeteren met ACID-eigenschappen, waardoor een beste van beide werelden wordt geboden tussen transactionele en niet-transactionele opslagmechanismen.

Traditionele pijplijnen voor batchopname en verwerking, waarbij bewerkingen zoals het opschonen van gegevens en het samenvoegen met referentiegegevens betrokken zijn, zijn eenvoudig te maken en kostenefficiënt te onderhouden. Het is echter een uitdaging om datasets, zoals Internet of Things (IoT) en clickstreams, snel op te nemen met SLA's voor bijna realtime levering. U wilt ook incrementele updates toepassen met change data capture (CDC) van het bronsysteem naar de bestemming. Om tijdig datagestuurde beslissingen te nemen, moet u rekening houden met gemiste records en tegendruk, en de ordening en integriteit van gebeurtenissen handhaven, vooral als de referentiegegevens ook snel veranderen.

In dit bericht proberen we deze uitdagingen aan te pakken. We bieden een stapsgewijze handleiding om streaminggegevens samen te voegen met een referentietabel die in realtime verandert AWS lijm, Amazon DynamoDB en AWS-databasemigratieservice (AWS-DMS). We laten ook zien hoe u streaminggegevens kunt opnemen in een transactioneel datameer met behulp van Apache Hudi om incrementele updates te bereiken met ACID-transacties.

Overzicht oplossingen

Voor ons voorbeeldgebruik komen er streaminggegevens door Amazon Kinesis-gegevensstromen, en referentiegegevens worden beheerd in MySQL. De referentiegegevens worden continu gerepliceerd van MySQL naar DynamoDB via AWS DMS. De vereiste hier is om de real-time stroomgegevens te verrijken door ze bijna in realtime samen te voegen met de referentiegegevens, en om ze bevraagbaar te maken vanuit een query-engine zoals Amazone Athene met behoud van consistentie. In dit geval kunnen referentiegegevens in MySQL worden bijgewerkt wanneer de vereiste wordt gewijzigd, en vervolgens moeten query's resultaten opleveren door updates in de referentiegegevens weer te geven.

Deze oplossing verhelpt het probleem dat gebruikers zich willen aansluiten bij streams met veranderende referentiedatasets wanneer de grootte van de referentiedataset klein is. De referentiegegevens worden bijgehouden in DynamoDB-tabellen en de streamingtaak laadt de volledige tabel in het geheugen voor elke microbatch, waarbij een high-throughput-stream wordt samengevoegd met een kleine referentiegegevensset.

Het volgende diagram illustreert de oplossingsarchitectuur.

Architectuur

Voorwaarden

Voor deze walkthrough moet u aan de volgende vereisten voldoen:

IAM-rollen en S3-bucket maken

In deze sectie maakt u een Amazon eenvoudige opslagservice (Amazon S3) emmer en twee AWS Identiteits- en toegangsbeheer (IAM) rollen: één voor de AWS Glue-taak en één voor AWS DMS. Dit doen we aan de hand van een AWS CloudFormatie sjabloon. Voer de volgende stappen uit:

  1. Meld u aan bij de AWS CloudFormation-console.
  2. Kies Start Stack::
  3. Kies Volgende.
  4. Voor Stack naam, voer een naam in voor uw stapel.
  5. Voor DynamoDBTabelnaam, ga naar binnen tgt_country_lookup_table. Dit is de naam van uw nieuwe DynamoDB-tabel.
  6. Voor S3BucketNamePrefix, voer het voorvoegsel van uw nieuwe S3-bucket in.
  7. kies Ik erken dat AWS CloudFormation IAM-bronnen met aangepaste namen kan maken.
  8. Kies Maak een stapel.

Het maken van een stapel kan ongeveer 1 minuut duren.

Een Kinesis-gegevensstroom maken

In deze sectie maakt u een Kinesis-gegevensstroom:

  1. Kies op de Kinesis-console Gegevensstromen in het navigatievenster.
  2. Kies Maak een datastroom.
  3. Voor Naam gegevensstroom, voer je streamnaam in.
  4. Laat de overige instellingen als standaard staan ​​en kies Maak een datastroom.

Er wordt een Kinesis-gegevensstroom gemaakt met de on-demand-modus.

Maak en configureer een Aurora MySQL-cluster

In deze sectie maakt en configureert u een Aurora MySQL-cluster als de brondatabase. Eerst, configureer uw bron Aurora MySQL-databasecluster om CDC in te schakelen via AWS DMS naar DynamoDB.

Maak een parametergroep

Voer de volgende stappen uit om een ​​nieuwe parametergroep te maken:

  1. Kies op de Amazon RDS-console Parametergroepen in het navigatievenster.
  2. Kies Parametergroep maken.
  3. Voor Familie van parametergroepenselecteer aurora-mysql5.7.
  4. Voor Type, kiezen DB-clusterparametergroep.
  5. Voor Groepsnaam, ga naar binnen my-mysql-dynamodb-cdc.
  6. Voor Omschrijving, ga naar binnen Parameter group for demo Aurora MySQL database.
  7. Kies creëren.
  8. kies my-mysql-dynamodb-cdcen kies Edit voor Parametergroepacties.
  9. Bewerk de parametergroep als volgt:
Naam Waarde
binlog_row_image vol
binlog_format RIJ
binlog_checksum GEEN
log_slave_updates 1
  1. Kies Wijzigingen opslaan.

RDS-parametergroep

Maak het Aurora MySQL-cluster

Voer de volgende stappen uit om het Aurora MySQL-cluster te maken:

  1. Kies op de Amazon RDS-console databases in het navigatievenster.
  2. Kies Maak een database.
  3. Voor Kies een methode voor het maken van een database, kiezen Standaard maken.
  4. Onder Motor optiesvoor Aandrijving, kiezen Aurora (MySQL-compatibel).
  5. Voor Motorversie, kiezen Aurora (MySQL 5.7) 2.11.2.
  6. Voor Sjablonen, kiezen Productie.
  7. Onder Instellingenvoor DB-cluster-ID, voer een naam in voor uw database.
  8. Voor Master-gebruikersnaam, voer uw primaire gebruikersnaam in.
  9. Voor Master wachtwoord en Bevestig het hoofdwachtwoord, voer je primaire wachtwoord in.
  10. Onder Instantie configuratievoor DB-instantieklasse, kiezen Burstable-klassen (inclusief t-klassen) En kies db.t3.klein.
  11. Onder Beschikbaarheid & duurzaamheidvoor Multi-AZ-implementatie, kiezen Maak geen Aurora-replica.
  12. Onder Connectiviteitvoor Rekenbron, kiezen Maak geen verbinding met een EC2-rekenresource.
  13. Voor Netwerktype, kiezen IPv4.
  14. Voor Virtuele privécloud (VPC), kies uw VPC.
  15. Voor DB-subnetgroep, kies uw openbare subnet.
  16. Voor Publieke toegang, kiezen Ja.
  17. Voor VPC-beveiligingsgroep (firewall), kiest u de beveiligingsgroep voor uw openbare subnet.
  18. Onder Database-authenticatievoor Opties voor database-authenticatie, kiezen Wachtwoord authenticatie.
  19. Onder Aanvullende configuratievoor DB-clusterparametergroep, kiest u de clusterparametergroep die u eerder hebt gemaakt.
  20. Kies Maak een database.

Machtigingen verlenen aan de brondatabase

De volgende stap is het verlenen van de vereiste toestemming voor de bron Aurora MySQL-database. Nu kunt u verbinding maken met het DB-cluster met behulp van de MySQL-hulpprogramma. U kunt query's uitvoeren om de volgende taken uit te voeren:

  • Maak een demodatabase en -tabel en voer query's uit op de gegevens
  • Verleen toestemming voor een gebruiker die wordt gebruikt door het AWS DMS-eindpunt

Voer de volgende stappen uit:

  1. Meld u aan bij de EC2-instantie die u gebruikt om verbinding te maken met uw DB-cluster.
  2. Voer de volgende opdracht in bij de opdrachtprompt om verbinding te maken met de primaire DB-instantie van uw DB-cluster:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p

  1. Voer de volgende SQL-opdracht uit om een ​​database te maken:
> CREATE DATABASE mydev;

  1. Voer de volgende SQL-opdracht uit om een ​​tabel te maken:
> use mydev; > CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);

  1. Voer de volgende SQL-opdracht uit om de tabel met gegevens te vullen:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');

  1. Voer de volgende SQL-opdracht uit om een ​​gebruiker voor het AWS DMS-eindpunt te maken en machtigingen verlenen voor CDC-taken (vervang de tijdelijke aanduiding door uw voorkeurswachtwoord):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

Maak en configureer AWS DMS-bronnen om gegevens in de DynamoDB-referentietabel te laden

In deze sectie maakt en configureert u AWS DMS om gegevens te repliceren naar de DynamoDB-referentietabel.

Maak een AWS DMS-replicatie-instantie

Maak eerst een AWS DMS-replicatie-instantie door de volgende stappen uit te voeren:

  1. Kies op de AWS DMS-console: Replicatie-exemplaren in het navigatievenster.
  2. Kies Maak een replicatie-exemplaar.
  3. Onder Instellingenvoor Naam, voer een naam in voor uw instantie.
  4. Onder Instantie configuratievoor Hoge beschikbaarheid, kiezen Dev- of testworkload (Single-AZ).
  5. Onder Connectiviteit en beveiligingvoor VPC-beveiligingsgroepen, kiezen verzuim.
  6. Kies Maak een replicatie-exemplaar.

Creëer Amazon VPC-eindpunten

Optioneel kunt u creëren Amazon VPC-eindpunten voor DynamoDB wanneer u verbinding moet maken met uw DynamoDB-tabel vanuit de AWS DMS-instantie in een particulier netwerk. Zorg er ook voor dat u inschakelen Publiek toegankelijk wanneer u verbinding moet maken met een database buiten uw VPC.

Maak een AWS DMS-broneindpunt

Maak een AWS DMS-broneindpunt door de volgende stappen uit te voeren:

  1. Kies op de AWS DMS-console: Eindpunten in het navigatievenster.
  2. Kies Eindpunt maken.
  3. Voor Type eindpunt, kiezen Bron eindpunt.
  4. Onder Eindpunt configuratievoor Eindpunt-ID, voer een naam in voor uw eindpunt.
  5. Voor Bron-engine, kiezen Amazon Aurora MySQL.
  6. Voor Toegang tot eindpuntdatabase, kiezen Geef toegangsinformatie handmatig op.
  7. Voor Server Naam, voert u de eindpuntnaam van uw Aurora-schrijverinstantie in (bijvoorbeeld mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. Voor Haven, ga naar binnen 3306.
  9. Voor gebruikersnaam, voer een gebruikersnaam in voor uw AWS DMS-taak.
  10. Voor Wachtwoord, Vul een wachtwoord in.
  11. Kies Eindpunt maken.

Crate een AWS DMS-doeleindpunt

Maak een AWS DMS-doeleindpunt door de volgende stappen uit te voeren:

  1. Kies op de AWS DMS-console: Eindpunten in het navigatievenster.
  2. Kies Eindpunt maken.
  3. Voor Type eindpunt, kiezen Doeleindpunt.
  4. Onder Eindpunt configuratievoor Eindpunt-ID, voer een naam in voor uw eindpunt.
  5. Voor Doel motor, kiezen Amazon DynamoDB.
  6. Voor Servicetoegangsrol ARN, voer de IAM-rol in voor uw AWS DMS-taak.
  7. Kies Eindpunt maken.

AWS DMS-migratietaken maken

Maak AWS DMS-databasemigratietaken door de volgende stappen uit te voeren:

  1. Kies op de AWS DMS-console: Databasemigratietaken in het navigatievenster.
  2. Kies Taak maken.
  3. Onder Taak configuratievoor Taak-ID, voer een naam in voor uw taak.
  4. Voor Replicatie-exemplaar, kies uw replicatie-exemplaar.
  5. Voor Brondatabase-eindpunt, kies uw broneindpunt.
  6. Voor Doeldatabase-eindpunt, kies uw doeleindpunt.
  7. Voor Migratie type, kiezen Migreer bestaande gegevens en repliceer lopende wijzigingen.
  8. Onder Taakinstellingenvoor Doeltabel voorbereidingsmodus, kiezen Doe niets.
  9. Voor Stop de taak nadat de volledige belasting is voltooid, kiezen Stop niet.
  10. Voor LOB-kolominstellingen, kiezen Beperkte LOB-modus.
  11. Voor Taaklogboeken, inschakelen Schakel CloudWatch-logboeken in en Schakel batch-geoptimaliseerde toepassing in.
  12. Onder Tabeltoewijzingen, kiezen JSON-editor en voer de volgende regels in.

Hier kunt u waarden aan de kolom toevoegen. Met de volgende regels maakt de AWS DMS CDC-taak eerst een nieuwe DynamoDB-tabel met de opgegeven naam in target-table-name. Dan zal het alle records repliceren, de kolommen in de DB-tabel toewijzen aan de attributen in de DynamoDB-tabel.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "2", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "target-table-name": "tgt_country_lookup_table", "mapping-parameters": { "partition-key-name": "code", "sort-key-name": "countryname", "exclude-columns": [ "code", "countryname" ], "attribute-mappings": [ { "target-attribute-name": "code", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${code}" }, { "target-attribute-name": "countryname", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${countryname}" } ], "apply-during-cdc": true } } ]
}

DMS-tabeltoewijzing

  1. Kies Taak maken.

Nu is de AWS DMS-replicatietaak gestart.

  1. Wacht op de Status laten zien als Laden voltooid.

DMS-taak

  1. Kies op de DynamoDB-console Tafels in het navigatievenster.
  2. Selecteer de DynamoDB-referentietabel en kies Verken tafelitems om de gerepliceerde records te bekijken.

DynamoDB-referentietabel initiaal

Maak een AWS Glue Data Catalog-tabel en een AWS Glue streaming ETL-taak

In deze sectie maakt u een AWS Glue Data Catalog-tabel en een AWS Glue streaming extractie-, transformatie- en laadtaak (ETL).

Maak een Data Catalog-tabel

Maak een AWS Glue Data Catalog-tabel voor de Kinesis-brongegevensstroom met de volgende stappen:

  1. Kies op de AWS Glue-console: databases voor Gegevenscatalogus in het navigatievenster.
  2. Kies Voeg database toe.
  3. Voor Naam, ga naar binnen my_kinesis_db.
  4. Kies Maak een database.
  5. Kies Tafels voor databases, kies dan Tabel toevoegen.
  6. Voor Naam, ga naar binnen my_stream_src_table.
  7. Voor Database, kiezen my_kinesis_db.
  8. Voor Selecteer het type bron, kiezen Kinesis.
  9. Voor Kinesis-gegevensstroom bevindt zich in, kiezen mijn rekening.
  10. Voor Naam Kinesis-stream, voer een naam in voor uw gegevensstroom.
  11. Voor Classificatieselecteer JSON.
  12. Kies Volgende.
  13. Kies Schema bewerken als JSON, voer de volgende JSON in en kies vervolgens Bespaar.
[ { "Name": "uuid", "Type": "string", "Comment": "" }, { "Name": "country", "Type": "string", "Comment": "" }, { "Name": "itemtype", "Type": "string", "Comment": "" }, { "Name": "saleschannel", "Type": "string", "Comment": "" }, { "Name": "orderpriority", "Type": "string", "Comment": "" }, { "Name": "orderdate", "Type": "string", "Comment": "" }, { "Name": "region", "Type": "string", "Comment": "" }, { "Name": "shipdate", "Type": "string", "Comment": "" }, { "Name": "unitssold", "Type": "string", "Comment": "" }, { "Name": "unitprice", "Type": "string", "Comment": "" }, { "Name": "unitcost", "Type": "string", "Comment": "" }, { "Name": "totalrevenue", "Type": "string", "Comment": "" }, { "Name": "totalcost", "Type": "string", "Comment": "" }, { "Name": "totalprofit", "Type": "string", "Comment": "" }, { "Name": "impressiontime", "Type": "string", "Comment": "" }
]

Glue Catalog-tabelschema

    1. Kies Volgende, kies dan creëren.

Maak een AWS Glue streaming ETL-taak

Vervolgens maakt u een AWS Glue-streamingtaak aan. AWS Glue 3.0 en hoger ondersteunt native Apache Hudi, dus gebruiken we deze native integratie om op te nemen in een Hudi-tabel. Voer de volgende stappen uit om de AWS Glue-streamingtaak te maken:

  1. Kies op de AWS Glue Studio-console: Spark-scripteditor En kies creëren.
  2. Onder Details van de baan tabblad, voor Naam, voer een naam in voor uw job.
  3. Voor IAM-rol, kies de IAM-rol voor uw AWS Glue-taak.
  4. Voor Typeselecteer Spark-streaming.
  5. Voor Lijm versie, kiezen Glue 4.0 – Ondersteunt spark 3.3, Scala 2, Python 3.
  6. Voor Gevraagd aantal werknemers, ga naar binnen 3.
  7. Onder Geavanceerde eigenschappenvoor Taakparameters, kiezen Nieuwe parameter toevoegen.
  8. Voor sleutel, ga naar binnen --conf.
  9. Voor Waarde, ga naar binnen spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Kies Nieuwe parameter toevoegen.
  11. Voor sleutel, ga naar binnen --datalake-formats.
  12. Voor Waarde, ga naar binnen hudi.
  13. Voor Scriptpad, ga naar binnen s3://<S3BucketName>/scripts/.
  14. Voor Tijdelijk pad, ga naar binnen s3://<S3BucketName>/temporary/.
  15. Optioneel, voor Pad naar Spark UI-logboeken, ga naar binnen s3://<S3BucketName>/sparkHistoryLogs/.

Lijmtaakparameter

  1. Op de Script tab, voer het volgende script in de AWS Glue Studio-editor in en kies creëren.

De bijna realtime streamingtaak verrijkt gegevens door een Kinesis-gegevensstroom samen te voegen met een DynamoDB-tabel die regelmatig bijgewerkte referentiegegevens bevat. De verrijkte dataset wordt geladen in de doel Hudi-tabel in het datameer. Vervangen met je bucket die je hebt gemaakt via AWS CloudFormation:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job args = getResolvedOptions(sys.argv,["JOB_NAME"]) # Initialize spark session and Glue context
sc = SparkContext() glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args) # job paramters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" kin_src_table_name = "my_stream_src_table" hudi_write_operation = "upsert" hudi_record_key = "uuid" hudi_precomb_key = "orderdate" checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db" # hudi options additional_options={ "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.write.recordkey.field": hudi_record_key, "hoodie.datasource.hive_sync.database": hudi_database, "hoodie.table.name": hudi_table, "hoodie.consistency.check.enabled": "true", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor", "hoodie.datasource.write.hive_style_partitioning": "false", "hoodie.datasource.write.precombine.field": hudi_precomb_key, "hoodie.bulkinsert.shuffle.parallelism": "4", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.write.operation": hudi_write_operation, "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
} # Scan and load the reference data table from DynamoDB into AWS Glue DynamicFrames using boto3 API.
def readDynamoDb(): dynamodb = boto3.resource(“dynamodb”) table = dynamodb.Table(dydb_lookup_table) response = table.scan() items = response[“Items”] jsondata = sc.parallelize(items) lookupDf = glueContext.read.json(jsondata) return lookupDf # Load the Amazon Kinesis data stream from Amazon Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog( database=kin_src_database_name, table_name=kin_src_table_name, transformation_ctx=”source_df”, additional_options={“startingPosition”: “TRIM_HORIZON”},
) # As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId): if data_frame.count() > 0: # Refresh the dymanodb table to pull latest snapshot for each microbatch country_lookup_df = readDynamoDb() final_frame = data_frame.join( country_lookup_df, data_frame["country"] == country_lookup_df["countryname"], 'left' ).drop( "countryname", "country", "unitprice", "unitcost", "totalrevenue", "totalcost", "totalprofit" ) # Script generated for node my-lab-hudi-connector final_frame.write.format("hudi") .options(**additional_options) .mode("append") .save(s3_output_folder) try: glueContext.forEachBatch( frame=source_df, batch_function=processBatch, options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path}, )
except Exception as e: print(f"Error is @@@ ....{e}")

  1. Kies lopen om de streamingtaak te starten.

De volgende schermafbeelding toont voorbeelden van de DataFrames data_frame, country_lookup_df en final_frame.

Lijm taakloguitvoer aanvankelijk

De AWS Glue-taak heeft met succes records samengevoegd die afkomstig zijn van de Kinesis-gegevensstroom en de referentietabel in DynamoDB, en vervolgens de samengevoegde records opgenomen in Amazon S3 in Hudi-indeling.

Maak en voer een Python-script uit om voorbeeldgegevens te genereren en laad deze in de Kinesis-gegevensstroom

In deze sectie maakt en voert u een Python uit om voorbeeldgegevens te genereren en deze in de Kinesis-brongegevensstroom te laden. Voer de volgende stappen uit:

  1. Log in op AWS Cloud9, uw EC2-instantie of een andere computerhost die records in uw gegevensstroom plaatst.
  2. Maak een Python-bestand met de naam generate-data-for-kds.py:
$ python3 generate-data-for-kds.py

  1. Open het Python-bestand en voer het volgende script in:
import json
import random
import boto3
import time STREAM_NAME = "<mystreamname>" def get_data(): return { "uuid": random.randrange(0, 1000001, 1), "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ), "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "orderpriority": random.choice(["H", "L", "M", "C"]), "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "region": random.choice( ["Asia" "Europe", "Americas", "Middle Eastern", "Africa"] ), "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", "8072", "65", "7864", "9778", ] ), "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", "5255275.28", "463966.1", ] ), "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06", "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", "3951974.56", "310842.62", ] ), "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7", "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ), "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", "2022-15-24T02:27:41Z", ] ), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) time.sleep(2) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Dit script plaatst elke 2 seconden een Kinesis-gegevensstroomrecord.

Simuleer het bijwerken van de referentietabel in het Aurora MySQL-cluster

Nu zijn alle bronnen en configuraties klaar. Voor dit voorbeeld willen we toevoegen een 3-cijferige landcode naar de referentietabel. Laten we records in de Aurora MySQL-tabel bijwerken om wijzigingen te simuleren. Voer de volgende stappen uit:

  1. Zorg ervoor dat de AWS Glue-streamingtaak al actief is.
  2. Maak opnieuw verbinding met het primaire DB-exemplaar, zoals eerder beschreven.
  3. Voer uw SQL-opdrachten in om records bij te werken:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

Nu is de referentietabel in de Aurora MySQL-brondatabase bijgewerkt. Vervolgens worden de wijzigingen automatisch gerepliceerd naar de referentietabel in DynamoDB.

DynamoDB-referentietabel bijgewerkt

De volgende tabellen tonen records in data_frame, country_lookup_df en final_frame. in country_lookup_df en final_frame combinedname kolom heeft waarden die zijn opgemaakt als <2-digit-country-code>-<3-digit-country-code>-<country-name>, waaruit blijkt dat de gewijzigde records in de tabel waarnaar wordt verwezen, worden weerspiegeld in de tabel zonder de AWS Glue-streamingtaak opnieuw te starten. Het betekent dat de AWS Glue-taak met succes de inkomende records uit de Kinesis-gegevensstroom samenvoegt met de referentietabel, zelfs wanneer de referentietabel verandert.
Uitvoer van lijmtaaklogboek bijgewerkt

Voer een query uit op de Hudi-tabel met behulp van Athena

Laten we de Hudi-tabel opvragen met behulp van Athena om de records in de bestemmingstabel te bekijken. Voer de volgende stappen uit:

  1. Zorg ervoor dat het script en de AWS Glue Streaming-taak nog steeds werken:
    1. Het Python-script (generate-data-for-kds.py) loopt nog steeds.
    2. De gegenereerde data wordt naar de datastroom gestuurd.
    3. De AWS Glue-streamingtaak is nog steeds actief.
  2. Voer op de Athena-console de volgende SQL uit in de query-editor:
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;

Het volgende queryresultaat toont de records die zijn verwerkt voordat de tabel waarnaar wordt verwezen, is gewijzigd. Opnames in de combinedname kolom zijn vergelijkbaar met <2-digit-country-code>-<country-name>.

Athena-queryresultaat initiaal

Het volgende queryresultaat toont de records die zijn verwerkt nadat de tabel waarnaar wordt verwezen, is gewijzigd. Opnames in de combinedname kolom zijn vergelijkbaar met <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Athena-queryresultaat bijgewerkt

U begrijpt nu dat de gewijzigde referentiegegevens met succes worden weerspiegeld in de doel-Hudi-tabel die records samenvoegt uit de Kinesis-gegevensstroom en de referentiegegevens in DynamoDB.

Opruimen

Ruim als laatste stap de bronnen op:

  1. Verwijder de Kinesis-gegevensstroom.
  2. Verwijder de AWS DMS-migratietaak, het eindpunt en de replicatie-instantie.
  3. Stop en verwijder de AWS Glue-streamingtaak.
  4. Verwijder de AWS Cloud9-omgeving.
  5. Verwijder de CloudFormation-sjabloon.

Conclusie

Het bouwen en onderhouden van een transactioneel datameer dat realtime gegevensopname en -verwerking omvat, heeft meerdere variabele componenten en er moeten beslissingen worden genomen, zoals welke opnameservice moet worden gebruikt, hoe uw referentiegegevens moeten worden opgeslagen en welk raamwerk voor transactiegegevensmeer moet worden gebruikt. In dit bericht hebben we de implementatiedetails van een dergelijke pijplijn gegeven, met behulp van AWS-native componenten als bouwstenen en Apache Hudi als het open-sourceframework voor een transactioneel datameer.

Wij zijn van mening dat deze oplossing een startpunt kan zijn voor organisaties die een nieuw datameer met dergelijke vereisten willen implementeren. Bovendien zijn de verschillende componenten volledig plugbaar en kunnen ze worden gecombineerd met bestaande datalakes om nieuwe vereisten aan te pakken of bestaande vereisten te migreren en hun pijnpunten aan te pakken.


Over de auteurs

Manische Kola is een Data Lab Solutions Architect bij AWS, waar hij nauw samenwerkt met klanten in verschillende sectoren om cloud-native oplossingen te ontwerpen voor hun data-analyse en AI-behoeften. Hij werkt samen met klanten op hun AWS-reis om hun bedrijfsproblemen op te lossen en schaalbare prototypes te bouwen. Voordat hij bij AWS kwam, heeft Manish onder meer ervaring met het helpen van klanten bij het implementeren van datawarehouse-, BI-, data-integratie- en datalake-projecten.

Santosh Kotagiri is een Solutions Architect bij AWS met ervaring in data-analyse en cloudoplossingen die leiden tot tastbare bedrijfsresultaten. Zijn expertise ligt in het ontwerpen en implementeren van schaalbare data-analyseoplossingen voor klanten in verschillende sectoren, met een focus op cloud-native en open-source services. Hij is gepassioneerd door het gebruik van technologie om bedrijfsgroei te stimuleren en complexe problemen op te lossen.

Chiho Sugimoto is een Cloud Support Engineer in het AWS Big Data Support-team. Ze is gepassioneerd om klanten te helpen bij het bouwen van datameren met behulp van ETL-workloads. Ze houdt van planetaire wetenschap en geniet ervan om in het weekend de asteroïde Ryugu te bestuderen.

Noritaka Sekiyama is een Principal Big Data Architect in het AWS Glue-team. Hij is verantwoordelijk voor het bouwen van software-artefacten om klanten te helpen. In zijn vrije tijd fietst hij graag met zijn nieuwe racefiets.

spot_img

Laatste intelligentie

spot_img

Chat met ons

Hallo daar! Hoe kan ik u helpen?