Zephyrnet-logo

Doorbreek datasilo's en stream uw CDC-gegevens met Amazon Redshift-streaming en Amazon MSK | Amazon-webservices

Datum:

Gegevens verliezen na verloop van tijd hun waarde. We horen van onze klanten dat ze de zakelijke transacties graag realtime willen analyseren. Traditioneel gebruikten klanten batchgebaseerde benaderingen voor het verplaatsen van gegevens van operationele systemen naar analytische systemen. Batchbelasting kan één of meerdere keren per dag plaatsvinden. Een batchgebaseerde aanpak kan latentie in de gegevensbeweging introduceren en de waarde van gegevens voor analyses verminderen. Op Change Data Capture (CDC) gebaseerde benadering is naar voren gekomen als alternatief voor batchgebaseerde benaderingen. Een op CDC gebaseerde aanpak legt de gegevenswijzigingen vast en maakt ze beschikbaar in datawarehouses voor verdere analyse in realtime.

CDC houdt wijzigingen bij die zijn aangebracht in de brondatabase, zoals invoegingen, updates en verwijderingen, en werkt deze wijzigingen voortdurend bij in de doeldatabase. Wanneer de CDC hoogfrequent is, verandert de brondatabase snel en moet de doeldatabase (dat wil zeggen meestal een datawarehouse) deze veranderingen vrijwel in realtime weerspiegelen.

Met de explosie van data is het aantal datasystemen in organisaties gegroeid. Datasilo’s zorgen ervoor dat data zich in verschillende bronnen bevinden, wat het lastig maakt om analyses uit te voeren.

Om diepere en rijkere inzichten te krijgen, kunt u alle wijzigingen uit verschillende datasilo's op één plek samenbrengen, zoals een datawarehouse. Dit post laat zien hoe u streaming-opname kunt gebruiken om gegevens naar toe te brengen Amazon roodverschuiving.

Roodverschuiving streaming-opname biedt gegevensopname met lage latentie en hoge doorvoer, waardoor klanten binnen enkele seconden in plaats van minuten inzichten kunnen verkrijgen. Het is eenvoudig in te stellen en neemt rechtstreeks streaminggegevens op in uw datawarehouse Amazon Kinesis-gegevensstromen en Amazon beheerde streaming voor Kafka (Amazon MSK) zonder dat je hoeft in te grijpen Amazon eenvoudige opslagservice (Amazone S3). U kunt gerealiseerde weergaven maken met behulp van SQL-instructies. Daarna kunt u met behulp van de gematerialiseerde weergavevernieuwing honderden megabytes aan gegevens per seconde opnemen.

Overzicht oplossingen

In dit bericht creëren we een gegevensreplicatie met lage latentie tussen Amazon Aurora MySQL naar Amazon Redshift Data Warehouse, met behulp van Roodverschuiving streaming-opname van Amazon MSK. Met Amazon MSK streamen we veilig gegevens met een volledig beheerde, zeer beschikbare Apache Kafka-service. Apache Kafka is een open-source gedistribueerd platform voor het streamen van evenementen dat door duizenden bedrijven wordt gebruikt voor hoogwaardige datapijplijnen, streaminganalyses, data-integratie en bedrijfskritische applicaties. We slaan CDC-gebeurtenissen voor een bepaalde tijd op in Amazon MSK, waardoor het mogelijk is om CDC-gebeurtenissen naar extra bestemmingen te leveren, zoals het Amazon S3-datameer.

Wij zetten in Debezium MySQL bron Kafka-connector op Amazon MSK Connect. Amazon MSK Connect maakt het eenvoudig om connectoren te implementeren, monitoren en automatisch te schalen die gegevens verplaatsen tussen Apache Kafka-clusters en externe systemen zoals databases, bestandssystemen en zoekindices. Amazon MSK Connect is volledig compatibel met Apache Kafka Connect, waarmee u uw Apache Kafka Connect-applicaties kunt tillen en verplaatsen zonder codewijzigingen.

Deze oplossing maakt gebruik van Amazon Aurora MySQL die de voorbeelddatabase host salesdb. Gebruikers van de database kunnen de bewerkingen INSERT, UPDATE en DELETE op rijniveau uitvoeren om de wijzigingsgebeurtenissen in het voorbeeld te produceren salesdb database. Debezium MySQL bron Kafka Connector leest deze wijzigingsgebeurtenissen en verzendt deze naar de Kafka-onderwerpen in Amazon MSK. Amazon Redshift leest vervolgens de berichten uit de Kafka-onderwerpen van Amazon MSK met behulp van de Amazon Redshift Streaming-functie. Amazon Redshift slaat deze berichten op met behulp van gematerialiseerde weergaven en verwerkt ze zodra ze binnenkomen.

U kunt zien hoe CDC een gebeurtenis maakt door naar dit voorbeeld te kijken hier. We gaan het OP-veld gebruiken. De verplichte string beschrijft het type bewerking dat ervoor zorgde dat de connector de gebeurtenis genereerde, in onze oplossing voor verwerking. In dit voorbeeld geeft c aan dat de bewerking een rij heeft gemaakt. Geldige waarden voor het OP-veld zijn:

  • c = creëren
  • u = bijwerken
  • d = verwijderen
  • r = lezen (geldt alleen voor snapshots)

Het volgende diagram illustreert de oplossingsarchitectuur:

Deze afbeelding toont de architectuur van de oplossing. we lezen van Amazon Aurora met behulp van de Debezium-connector voor MySQL. Debezium Connector voor MySQL wordt geïmplementeerd op Amazon MSK Connect en neemt de gebeurtenissen binnen Amazon MSK op die verder worden opgenomen in Amazon Redshift MV

De workflow van de oplossing bestaat uit de volgende stappen:

  • Amazon Aurora MySQL heeft een binair logboek (dwz binlog) dat alle bewerkingen (INSERT, UPDATE, DELETE) registreert in de volgorde waarin ze in de database zijn vastgelegd.
  • Amazon MSK Connect voert de bron-Kafka-connector uit, genaamd Debezium-connector voor MySQL, leest de binlog, produceert wijzigingsgebeurtenissen voor INSERT-, UPDATE- en DELETE-bewerkingen op rijniveau en verzendt de wijzigingsgebeurtenissen naar Kafka-onderwerpen in Amazon MSK.
  • Een door Amazon Redshift ingericht cluster is de streamconsument en kan berichten lezen van Kafka-onderwerpen van Amazon MSK.
  • Een gematerialiseerde weergave in Amazon Redshift is het landingsgebied voor gegevens die uit de stream worden gelezen en die worden verwerkt zodra deze binnenkomen.
  • Wanneer de gerealiseerde weergave wordt vernieuwd, wijzen Amazon Redshift-rekenknooppunten een groep Kafka-partities toe aan een rekensegment.
  • Elk segment verbruikt gegevens van de toegewezen partities totdat de weergave pariteit bereikt met de laatste Offset voor het Kafka-onderwerp.
  • Daaropvolgende gerealiseerde weergave vernieuwt leesgegevens vanaf de laatste offset van de vorige vernieuwing totdat deze pariteit bereikt met de onderwerpgegevens.
  • Binnen de Amazon Redshift hebben we een opgeslagen procedure gemaakt om CDC-records te verwerken en de doeltabel bij te werken.

Voorwaarden

In dit bericht wordt ervan uitgegaan dat je een actieve Amazon MSK Connect-stack in je omgeving hebt met de volgende componenten:

  • Aurora MySQL die een database host. In dit bericht gebruik je de voorbeelddatabase salesdb.
  • De Debezium MySQL-connector die draait op Amazon MSK Connect en die Amazon MSK in uw Amazon virtuele privécloud (Amazone VPC).
  • Amazon MSK-cluster

Als je geen Amazon MSK Connect-stack hebt, volg dan de instructies in de MSK Connect lab-opstelling en controleer of uw bronconnector repliceert gegevenswijzigingen in de Amazon MSK-onderwerpen.

U moet het Amazon Redshift-cluster inrichten in dezelfde VPC als het Amazon MSK-cluster. Als u er nog geen heeft geïmplementeerd, volgt u de stappen hier in de AWS-documentatie.

We gebruiken AWS Identity en Access Management (AWS IAM) authenticatie voor communicatie tussen Amazon MSK en Amazon Redshift-cluster. Zorg ervoor dat u een AWS IAM-rol heeft aangemaakt met een vertrouwensbeleid waarmee uw Amazon Redshift-cluster de rol op zich kan nemen. Zie voor informatie over het configureren van het vertrouwensbeleid voor de AWS IAM-rol Amazon Redshift autoriseren om namens u toegang te krijgen tot andere AWS-services. Nadat deze is gemaakt, moet de rol het volgende AWS IAM-beleid hebben, dat toestemming geeft voor communicatie met het Amazon MSK-cluster.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MSKIAMpolicy",
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:Connect"
            ],
            "Resource": [
                "arn:aws:kafka:*:0123456789:cluster/xxx/xxx",
                "arn:aws:kafka:*:0123456789:topic/*/*/*"
            ]
        },
        {
            "Sid": "MSKPolicy",
            "Effect": "Allow",
            "Action": [
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": "arn:aws:kafka:*:0123456789:cluster/xxx/xxx"
        }
    ]
}

Vervang het ARN met xxx uit het bovenstaande voorbeeldbeleid door de ARN van uw Amazon MSK-cluster.

  • Controleer ook of het Amazon Redshift-cluster toegang heeft tot het Amazon MSK-cluster. Voeg in de beveiligingsgroep van Amazon Redshift Cluster de inkomende regel toe voor de MSK-beveiligingsgroep die poort 9098 toestaat. Om te zien hoe u de roodverschuivingsclusterbeveiligingsgroep beheert, raadpleegt u VPC-beveiligingsgroepen beheren voor een cluster.

afbeelding laat zien hoe u de inkomende regel kunt toevoegen voor de MSK-beveiligingsgroep die poort 9098 toestaat, in de beveiligingsgroep van Amazon Redshift Cluster

  • En voeg in de beveiligingsgroep van het Amazon MSK-cluster de inkomende regel toe die poort 9098 toestaat als leider-IP-adres van uw Amazon Redshift Cluster, zoals weergegeven in het volgende diagram. U kunt het IP-adres voor het leiderknooppunt van uw Amazon Redshift Cluster vinden op het tabblad Eigenschappen van het Amazon Redshift-cluster AWS-beheerconsole.

afbeelding laat zien hoe u de inkomende regel kunt toevoegen die poort 9098 toestaat voor het leider-IP-adres van uw Amazon Redshift Cluster, in de beveiligingsgroep van het Amazon MSK-cluster

walkthrough

Navigeer naar de Amazon Redshift-service vanuit de AWS Management Console en stel vervolgens Amazon Redshift-streamingopname voor Amazon MSK in door de volgende stappen uit te voeren:

  1. Schakel_case_sensitive_identifier in op true – Als u de standaardparametergroep voor Amazon Redshift Cluster gebruikt, kunt u deze niet instellen enable_case_sensitive_identifier naar waar. Je kunt nieuwe creëren parametergroep Met enable_case_sensitive_identifier op true en koppel het aan het Amazon Redshift-cluster. Nadat u parameterwaarden hebt gewijzigd, moet u alle clusters die aan de gewijzigde parametergroep zijn gekoppeld opnieuw opstarten. Het kan enkele minuten duren voordat het Amazon Redshift-cluster opnieuw opstart.

Deze configuratie waarde die bepaalt of naam-ID's van databases, tabellen en kolommen hoofdlettergevoelig zijn. Als u klaar bent, opent u een nieuwe Amazon Redshift Query Editor V2, zodat de door ons aangebrachte configuratiewijzigingen worden weergegeven en volgt u vervolgens de volgende stappen.

  1. Maak een extern schema dat is toegewezen aan de streaminggegevensbron.
CREATE EXTERNAL SCHEMA MySchema
FROM MSK
IAM_ROLE 'arn:aws:iam::YourRole:role/msk-redshift-streaming'
AUTHENTICATION IAM
CLUSTER_ARN 'arn:aws:kafka:us-east-1:2073196*****:cluster/MSKCluster-msk-connect-lab/849b47a0-65f2-439e-b181-1038ea9d4493-10'; // Replace last part with your cluster ARN, this is just for example.//

Als u klaar bent, controleert u of u onderstaande tabellen ziet die zijn gemaakt met MSK-onderwerpen:

afbeelding toont tabellen gemaakt op basis van MSK Topics

  1. Maak een gerealiseerde weergave die verwijst naar het externe schema.
CREATE MATERIALIZED VIEW customer_debezium AUTO REFRESH YES AS
SELECT
*,
json_parse(kafka_value) as payload
from
"dev"."myschema"."salesdb.salesdb.CUSTOMER" ; // Replace myshecma with name you have given to your external schema in step 2 //

Nu kunt u de nieuw gemaakte gematerialiseerde weergave customer_debezium opvragen met behulp van de onderstaande opdracht.

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

Controleer of de gerealiseerde weergave is gevuld met de CDC-records

  1. VERNIEUW GEMATERIALISEERDE WEERGAVE (optioneel). Deze stap is optioneel zoals we al hebben gespecificeerd AUTO REFRESH AS YES tijdens het maken van MV (gematerialiseerde weergave).
REFRESH MATERIALIZED VIEW "dev"."public"."customer_debezium";

OPMERKING: Bovenstaande weergave wordt automatisch vernieuwd, wat betekent dat als u de records niet onmiddellijk ziet, u een paar seconden moet wachten en de select-instructie opnieuw moet uitvoeren. De Amazon Redshift streaming-opnameweergave wordt ook geleverd met de optie van handmatig vernieuwen, waarmee u het object handmatig kunt vernieuwen. U kunt de volgende query gebruiken die streaminggegevens onmiddellijk naar het Redshift-object haalt.

SELECT * FROM "dev"."public"."customer_debezium" order by refresh_time desc;

images shows records from the customer_debezium MV

Verwerk CDC-records in Amazon Redshift

In de volgende stappen maken we de verzameltabel om de CDC-gegevens op te slaan. Dit is de doeltabel die de laatste momentopname bevat en de opgeslagen procedure om CDC-records te verwerken en bij te werken in de doeltabel.

  1. Verzameltabel maken: De opsteltafel is een tijdelijke tabel die alle gegevens bevat die zullen worden gebruikt om wijzigingen aan te brengen in de doel tafel, inclusief zowel updates als inserts.
CREATE TABLE public.customer_stg (
customer_id character varying(256) ENCODE raw
distkey
,
customer_name character varying(256) ENCODE lzo,
market_segment character varying(256) ENCODE lzo,
ts_ms bigint ENCODE az64,
op character varying(2) ENCODE lzo,
record_rank smallint ENCODE az64,
refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY
SORTKEY
(customer_id); // In this particular example, we have used LZO encoding as LZO encoding works well for CHAR and VARCHAR columns that store very long character strings. You can use BYTEDICT as well if it matches your use case. //

  1. Doeltabel maken

Wij gebruiken customer_target tabel om de verwerkte CDC-gebeurtenissen te laden.

CREATE TABLE public.customer_target (
customer_id character varying(256) ENCODE raw
distkey
,
customer_name character varying(256) ENCODE lzo,
market_segment character varying(256) ENCODE lzo,
refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE KEY
SORTKEY
(customer_id);

  1. creëren Last_extract_time debezium-tabel en Dummy-waarde invoegen.

We moeten de tijdstempel van de laatst geëxtraheerde CDC-gebeurtenissen opslaan. Wij gebruiken van debezium_last_extract tafel voor dit doel. Voor de eerste registratie voegen we een dummywaarde in, waardoor we een vergelijking kunnen maken tussen de huidige en de volgende CDC-verwerkingstijdstempel.

CREATE TABLE public.debezium_last_extract (
process_name character varying(256) ENCODE lzo,
latest_refresh_time timestamp without time zone ENCODE az64
) DISTSTYLE AUTO;

Insert into public.debezium_last_extract VALUES ('customer','1983-01-01 00:00:00');

SELECT * FROM "dev"."public"."debezium_last_extract";

  1. Maak een opgeslagen procedure

Deze opgeslagen procedure verwerkt de CDC-records en werkt de doeltabel bij met de laatste wijzigingen.

CREATE OR REPLACE PROCEDURE public.incremental_sync_customer()

LANGUAGE plpgsql

AS $$

DECLARE

sql VARCHAR(MAX) := '';

max_refresh_time TIMESTAMP;

staged_record_count BIGINT :=0;

BEGIN

-- Get last loaded refresh_time number from target table

sql := 'SELECT MAX(latest_refresh_time) FROM debezium_last_extract where process_name =''customer'';';

EXECUTE sql INTO max_refresh_time;

-- Truncate staging table

EXECUTE 'TRUNCATE customer_stg;';

-- Insert (and transform) latest change record for member with sequence number greater than last loaded sequence number into temp staging table

EXECUTE 'INSERT INTO customer_stg ('||

'select coalesce(payload.after."CUST_ID",payload.before."CUST_ID") ::varchar as customer_id,payload.after."NAME"::varchar as customer_name,payload.after."MKTSEGMENT" ::varchar as market_segment, payload.ts_ms::bigint,payload."op"::varchar, rank() over (partition by coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar order by payload.ts_ms::bigint desc) as record_rank, refresh_time from CUSTOMER_debezium where refresh_time > '''||max_refresh_time||''');';

sql := 'SELECT COUNT(*) FROM customer_stg;';

EXECUTE sql INTO staged_record_count;

RAISE INFO 'Staged member records: %', staged_record_count;

// replace customer_stg with your staging table name //

-- Delete records from target table that also exist in staging table (updated/deleted records)

EXECUTE 'DELETE FROM customer_target using customer_stg WHERE customer_target.customer_id = customer_stg.customer_id';

// replace customer_target with your target table name //

-- Insert all records from staging table into target table

EXECUTE 'INSERT INTO customer_target SELECT customer_id,customer_name, market_segment, refresh_time FROM customer_stg where record_rank =1 and op <> ''d''';

-- Insert max refresh time to control table

EXECUTE 'INSERT INTO debezium_last_extract SELECT ''customer'', max(refresh_time) from customer_target ';

END

$$

afbeeldingen tonen de opgeslagen procedure met de naam incremental_sync_customer gemaakt in de bovenstaande stap

Test de oplossing

Voorbeeld bijwerken salesdb gehost op Amazon Aurora

  1. Dit wordt jouw Amazon Aurora database en we hebben er toegang toe vanuit Amazon Elastic Compute Cloud (Amazon EC2) exemplaar met Name= KafkaClientInstance.
  2. Alstublieft vervang het Amazon Aurora-eindpunt door de waarde van uw Amazon Aurora-eindpunt en voer de volgende opdracht uit en de use salesdb.
mysql -f -u master -h mask-lab-salesdb.xxxx.us-east-1.rds.amazonaws.com --password=S3cretPwd99

image shows the details of the RDS for MySQL

  1. Voer een update uit, voeg in en verwijder een van de gemaakte tabellen. Je kunt de update ook meerdere keren uitvoeren om de laatst bijgewerkte record later in Amazon Redshift te controleren.

afbeelding toont de invoeg-, update- en verwijderbewerkingen die worden uitgevoerd op RDS voor MySQL

  1. Roep de opgeslagen procedure aan incrementele_synchronisatie_klant gemaakt in de bovenstaande stappen vanuit Amazon Redshift Query Editor v2. U kunt proc handmatig uitvoeren met de volgende opdracht of plan het in.
call incremental_sync_customer();
  1. Controleer de doeltabel voor de laatste wijzigingen. Deze stap is bedoeld om de nieuwste waarden in de doeltabel te controleren. U zult zien dat alle updates en verwijderingen die u in de brontabel hebt uitgevoerd, bovenaan worden weergegeven als resultaatvolgorde refresh_time.
SELECT * FROM "dev"."public"."customer_target" order by refresh_time desc;

image shows the records from from customer_target table in descending order

De oplossing uitbreiden

In deze oplossing hebben we CDC-verwerking voor de klantentabel laten zien, en u kunt dezelfde aanpak gebruiken om deze uit te breiden naar andere tabellen in het voorbeeld salesdb database of voeg meer databases toe MSK Connect-configuratie eigendom database.include.list.

Onze voorgestelde aanpak kan werken met elke MySQL-bron die wordt ondersteund door Debezium MySQL bron Kafka-connector. Op dezelfde manier moet u, om dit voorbeeld uit te breiden naar uw werkbelastingen en gebruiksscenario's, de staging- en doeltabellen maken volgens het schema van de brontabel. Dan moet je de coalesce(payload.after."CUST_ID",payload.before."CUST_ID")::varchar as customer_id instructies met de kolomnamen en typen in uw bron- en doeltabellen. Zoals in het voorbeeld in dit bericht hebben we LZO-codering gebruikt als LZO-codering, wat goed werkt voor CHAR- en VARCHAR-kolommen die zeer lange tekenreeksen opslaan. U kunt BYTEDICT ook gebruiken als dit overeenkomt met uw gebruiksscenario. Een andere overweging waarmee u rekening moet houden bij het maken van doel- en faseringstabellen is het kiezen van een distributiestijl en -sleutel op basis van gegevens in de brondatabase. Hier hebben we de distributiestijl als sleutel gekozen met Customer_id, die gebaseerd is op brongegevens en schema-updates door de genoemde best practices te volgen hier.

Schoonmaken

  1. Verwijder alle Amazon Redshift-clusters
  2. Verwijder Amazon MSK Cluster en MSK Connect Cluster
  3. Als u Amazon Redshift-clusters niet wilt verwijderen, kunt u MV en tabellen die tijdens dit bericht zijn gemaakt handmatig verwijderen met behulp van onderstaande opdrachten:
drop MATERIALIZED VIEW customer_debezium;
drop TABLE public.customer_stg;
drop TABLE public.customer_target;
drop TABLE public.debezium_last_extract;

Verwijder ook de inkomende beveiligingsregels die zijn toegevoegd aan uw Amazon Redshift- en Amazon MSK-clusters, samen met de AWS IAM-rollen die zijn aangemaakt in het gedeelte Vereisten.

Conclusie

In dit bericht hebben we je laten zien hoe Amazon Redshift-streaming-opname zorgde voor een hoge doorvoer en lage latentie-opname van streaminggegevens van Amazon Kinesis-gegevensstromen en Amazon MSK naar een gematerialiseerde visie van Amazon Redshift. We hebben de snelheid verhoogd en de kosten voor het streamen van gegevens naar Amazon Redshift verlaagd door de noodzaak om gebruik te maken van tussenliggende diensten te elimineren.

Verder hebben we ook laten zien hoe CDC-gegevens na generatie snel kunnen worden verwerkt, met behulp van een eenvoudige SQL-interface waarmee klanten bijna realtime analyses kunnen uitvoeren op verschillende gegevensbronnen (bijv. Internet-of-Things [IoT]-apparaten, systeemtelemetrie gegevens of clickstreamgegevens) van een drukke website of applicatie.

Terwijl u de opties verkent om bijna realtime analyses voor uw CDC-gegevens te vereenvoudigen en mogelijk te maken,

We hopen dat dit bericht u waardevolle begeleiding biedt. We verwelkomen alle gedachten of vragen in het opmerkingengedeelte.


Over de auteurs

Umesh Chaudhari is een streamingoplossingenarchitect bij AWS. Hij werkt samen met AWS-klanten om realtime gegevensverwerkingssystemen te ontwerpen en bouwen. Hij heeft 13 jaar werkervaring in software-engineering, waaronder het ontwerpen, ontwerpen en ontwikkelen van data-analysesystemen.

Vishal Khatri is Sr. Technical Account Manager en Analytics-specialist bij AWS. Vishal werkt samen met de staats- en lokale overheid en helpt bij het opleiden en delen van best practices met klanten door leiding te geven aan en eigenaar te zijn van de ontwikkeling en levering van technische inhoud, terwijl ze end-to-end klantoplossingen ontwerpen.

spot_img

Laatste intelligentie

spot_img