Zephyrnet-logo

Best practices om vrijwel realtime analyses te implementeren met behulp van Amazon Redshift Streaming Ingestion met Amazon MSK | Amazon-webservices

Datum:

Amazon roodverschuiving is een volledig beheerd, schaalbaar datawarehouse in de cloud dat uw tijd tot inzichten versnelt met snelle, eenvoudige en veilige analyses op schaal. Tienduizenden klanten vertrouwen op Amazon Redshift om exabytes aan gegevens te analyseren en complexe analytische queries uit te voeren, waardoor dit het meest gebruikte clouddatawarehouse is. U kunt binnen enkele seconden analyses uitvoeren en schalen op al uw gegevens, zonder dat u uw datawarehouse-infrastructuur hoeft te beheren.

U kunt gebruik maken van de Amazon Redshift streaming-opname mogelijkheid om uw analysedatabases bijna in realtime bij te werken. Amazon Redshift-streaming-opname vereenvoudigt datapijplijnen doordat je gematerialiseerde weergaven rechtstreeks bovenop datastromen kunt creëren. Met deze mogelijkheid in Amazon Redshift kun je Structured Query Language (SQL) gebruiken om verbinding te maken met gegevens uit gegevensstromen en deze direct op te nemen, zoals Amazon Kinesis-gegevensstromen or Amazon Managed Streaming voor Apache Kafka (Amazon MSK) datastromen en trek gegevens rechtstreeks naar Amazon Redshift.

In dit bericht bespreken we de best practices voor het implementeren van bijna-realtime analyses met behulp van Amazon Redshift-streamingopname met Amazon MSK.

Overzicht van de oplossing

We doorlopen een voorbeeldpijplijn om gegevens van een MSK-onderwerp in Amazon Redshift op te nemen met behulp van Amazon Redshift-streamingopname. We laten ook zien hoe je JSON-gegevens kunt ontnesten met behulp van puntnotatie in Amazon Redshift. Het volgende diagram illustreert onze oplossingsarchitectuur.

De processtroom bestaat uit de volgende stappen:

  1. Creëer een streaming-gematerialiseerde weergave in uw Redshift-cluster om live streaming-gegevens van de MSK-onderwerpen te gebruiken.
  2. Gebruik een opgeslagen procedure om Change Data Capture (CDC) te implementeren met behulp van de unieke combinatie van Kafka Partition en Kafka Offset op recordniveau voor het opgenomen MSK-onderwerp.
  3. Maak een op de gebruiker gerichte tabel in het Redshift-cluster en gebruik puntnotatie om het JSON-document te ontkoppelen van de streaming-gematerialiseerde weergave naar gegevenskolommen van de tabel. U kunt voortdurend nieuwe gegevens laden door de opgeslagen procedure met regelmatige tussenpozen aan te roepen.
  4. Breng connectiviteit tot stand tussen een Amazon QuickSight dashboard en Amazon Redshift om visualisatie en inzichten te leveren.

Als onderdeel van dit bericht bespreken we ook de volgende onderwerpen:

  • Stappen om streaming-opname tussen accounts van Amazon MSK naar Amazon Redshift te configureren
  • Best practices om geoptimaliseerde prestaties te bereiken door gematerialiseerde weergaven te streamen
  • Monitoringtechnieken om fouten bij de streaming-opname van Amazon Redshift op te sporen

Voorwaarden

U moet het volgende hebben:

  • Een AWS-account.
  • Een van de volgende bronnen, afhankelijk van uw gebruiksscenario:
  • Een MSK-cluster. Voor instructies, zie Maak een Amazon MSK-cluster.
  • Een onderwerp in uw MSK-cluster waar uw gegevensproducent gegevens kan publiceren.
  • Een gegevensproducent om gegevens naar het onderwerp in uw MSK-cluster te schrijven.

Overwegingen bij het opzetten van uw MSK-onderwerp

Houd rekening met de volgende overwegingen bij het configureren van uw MSK-onderwerp:

  • Zorg ervoor dat de naam van uw MSK-onderwerp niet langer is dan 128 tekens.
  • Op het moment van schrijven kunnen MSK-records die gecomprimeerde gegevens bevatten niet rechtstreeks worden opgevraagd in Amazon Redshift. Amazon Redshift ondersteunt geen eigen decompressiemethoden voor gecomprimeerde gegevens aan de clientzijde in een MSK-onderwerp.
  • Volg 'best practices' tijdens het instellen van uw MSK-cluster.
  • Bekijk de streaming-opname beperkingen voor eventuele andere overwegingen.

Streamingopname instellen

Voer de volgende stappen uit om streamingopname in te stellen:

  1. Stel de AWS Identiteits- en toegangsbeheer (IAM) rol- en vertrouwensbeleid vereist voor streaming-opname. Voor instructies raadpleegt u de IAM instellen en streaming-opname uitvoeren vanuit Kafka.
  2. Zorg ervoor dat er gegevens naar uw MSK-onderwerp stromen met behulp van Amazon Cloud Watch metriek (bijvoorbeeld, BytesOutPerSec).
  3. Start de query-editor v2 vanuit de Amazon Redshift-console of gebruik uw favoriete SQL-client om verbinding te maken met uw Redshift-cluster voor de volgende stappen. De volgende stappen zijn uitgevoerd in de query-editor v2.
  4. Maak een extern schema om toe te wijzen aan het MSK-cluster. Vervang uw IAM-rol ARN en het MSK-cluster ARN in de volgende verklaring:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Als uw onderwerpnamen hoofdlettergevoelig zijn, moet u dit eventueel inschakelen enable_case_sensitive_identifier om er toegang toe te hebben in Amazon Redshift. Als u hoofdlettergevoelige ID's wilt gebruiken, stelt u in enable_case_sensitive_identifier naar true op sessie-, gebruikers- of clusterniveau:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Maak een gerealiseerde weergave om de stroomgegevens uit het MSK-onderwerp te gebruiken:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

De metagegevenskolom kafka_value die afkomstig is van Amazon MSK, wordt opgeslagen in VARBYTE formaat in Amazon Redshift. Voor dit bericht gebruik je de JSON_PARSE functie om te zetten kafka_value een SUPER-gegevenstype. Je maakt ook gebruik van de CAN_JSON_PARSE functie in de filtervoorwaarde om ongeldige JSON-records over te slaan en te beschermen tegen fouten als gevolg van JSON-parseerfouten. We bespreken later in dit bericht hoe u de ongeldige gegevens kunt opslaan voor toekomstige foutopsporing.

  1. Ververs de gematerialiseerde streamingweergave, waardoor Amazon Redshift het MSK-onderwerp leest en gegevens in de gematerialiseerde weergave laadt:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

U kunt uw gematerialiseerde streamingweergave ook zo instellen dat deze automatische vernieuwingsmogelijkheden gebruikt. Hierdoor wordt uw gerealiseerde weergave automatisch vernieuwd zodra er gegevens in de stream binnenkomen. Zien CREËER EEN GEMATERIALISEERD UITZICHT voor instructies om een ​​gerealiseerde weergave te maken met automatisch vernieuwen.

Ontnes het JSON-document

Het volgende is een voorbeeld van een JSON-document dat is opgenomen vanuit het MSK-onderwerp naar de gegevenskolom van het SUPER-type in de streaming-gematerialiseerde weergave Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Gebruik puntnotatie zoals weergegeven in de volgende code om uw JSON-payload te ontnesten:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

De volgende schermafbeelding laat zien hoe het resultaat eruit ziet na het ontnesten.

Als u arrays in uw JSON-document heeft, kunt u overwegen uw gegevens te ontnesten met behulp van PartiQL uitspraken in Amazon Redshift. Raadpleeg de sectie voor meer informatie Ontnes het JSON-document bij de post Bijna realtime analyses met behulp van Amazon Redshift-streamingopname met Amazon Kinesis Data Streams en Amazon DynamoDB.

Incrementele strategie voor het laden van gegevens

Voer de volgende stappen uit om een ​​incrementele gegevensbelasting te implementeren:

  1. Maak een tabel met de naam Orders in Amazon Redshift, die eindgebruikers zullen gebruiken voor visualisatie en bedrijfsanalyse:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Vervolgens maakt u een opgeslagen procedure genaamd SP_Orders_Load om CDC te implementeren vanuit een streaming-gematerialiseerde weergave en in de finale te laden Orders tafel. Je gebruikt de combinatie van Kafka_Partition en Kafka_Offset beschikbaar in de streaming-gematerialiseerde weergave als systeemkolommen om CDC te implementeren. De combinatie van deze twee kolommen zal altijd uniek zijn binnen een MSK-onderwerp, wat ervoor zorgt dat geen van de records tijdens het proces wordt gemist. De opgeslagen procedure bevat de volgende componenten:

  • Als u hoofdlettergevoelige ID's wilt gebruiken, stelt u in enable_case_sensitive_identifier naar true op sessie-, gebruikers- of clusterniveau.
  • Vernieuw de streaming-gerealiseerde weergave handmatig als automatisch vernieuwen niet is ingeschakeld.
  • Maak een audittabel met de naam Orders_Streaming_Audit als het niet bestaat om de laatste offset bij te houden voor een partitie die in de Orders-tabel is geladen tijdens de laatste uitvoering van de opgeslagen procedure.
  • Alleen nieuwe of gewijzigde gegevens ontkoppelen en invoegen in een stagingtabel genaamd Orders_Staging_Table, lezend vanuit de streaming-gematerialiseerde weergave Orders_Stream_MV, Waar Kafka_Offset groter is dan de laatst verwerkte Kafka_Offset vastgelegd in de audittabel Orders_Streaming_Audit voor de Kafka_Partition wordt verwerkt.
  • Wanneer u voor de eerste keer laadt met behulp van deze opgeslagen procedure, staan ​​er geen gegevens in het bestand Orders_Streaming_Audit tabel en alle gegevens uit Orders_Stream_MV wordt in de tabel Orders geladen.
  • Voeg alleen bedrijfsrelevante kolommen in voor de gebruikersinterface Orders tabel, door te selecteren uit de verzameltabel Orders_Staging_Table.
  • Plaats de max Kafka_Offset voor elke geladen Kafka_Partition in de audittabel Orders_Streaming_Audit

We hebben de tussenliggende faseringstabel toegevoegd Orders_Staging_Table in deze oplossing om te helpen bij het debuggen in geval van onverwachte fouten en bij de traceerbaarheid. De faseringsstap overslaan en direct naar de finaletafel gaan Orders_Stream_MV kan een lagere latentie bieden, afhankelijk van uw gebruiksscenario.

  1. Maak de opgeslagen procedure met de volgende code:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. Voer de opgeslagen procedure uit om gegevens in het Orders tafel:
    call SP_Orders_Load();

  3. Valideer gegevens in de tabel Orders.

Breng streaming-opname voor meerdere accounts tot stand

Als uw MSK-cluster tot een ander account behoort, voert u de volgende stappen uit om IAM-rollen te maken om streaming-opname voor meerdere accounts in te stellen. Laten we aannemen dat het Roodverschuivingscluster zich in account A bevindt en het MSK-cluster in account B, zoals weergegeven in het volgende diagram.

Voer de volgende stappen uit:

  1. Maak in account B een IAM-rol aan met de naam MyRedshiftMSKRole waarmee Amazon Redshift (account A) kan communiceren met het genoemde MSK-cluster (account B). MyTestCluster. Afhankelijk van of uw MSK-cluster IAM-verificatie of niet-geverifieerde toegang gebruikt om verbinding te maken, moet u een IAM-rol maken met een van de volgende beleidsregels:
    • Een IAM policAmazonAmazon MSK gebruikt niet-geverifieerde toegang:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • Een IAM-beleid voor Amazon MSK bij gebruik van IAM-authenticatie:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

De resourcesectie in het voorgaande voorbeeld geeft toegang tot alle onderwerpen in de MyTestCluster MSK-cluster. Als u de IAM-rol moet beperken tot specifieke onderwerpen, moet u de onderwerpbron vervangen door een restrictiever bronbeleid.

  1. Nadat u de IAM-rol in account B heeft aangemaakt, noteert u de IAM-rol ARN (bijvoorbeeld arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. Maak in account A een aanpasbare Redshift-IAM-rol aan, genaamd MyRedshiftRole, die Amazon Redshift zal aannemen bij het verbinden met Amazon MSK. De rol moet een beleid als het volgende hebben, waardoor de Amazon Redshift IAM-rol in account A de Amazon MSK-rol in account B kan overnemen:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. Let op de rol ARN voor de Amazon Redshift IAM-rol (bijvoorbeeld arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. Ga terug naar account B en voeg deze rol toe aan het vertrouwensbeleid van de IAM-rol arn:aws:iam::0123456789:role/MyRedshiftMSKRole om account B de IAM-rol van account A te laten vertrouwen. Het vertrouwensbeleid moet er als volgt uitzien:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. Meld u aan bij de Amazon Redshift-console als account A.
  6. Start de query-editor v2 of uw favoriete SQL-client en voer de volgende instructies uit om toegang te krijgen tot het MSK-onderwerp in account B. Om toe te wijzen aan het MSK-cluster, maakt u een extern schema met behulp van rollenkettingen door IAM-rol-ARN's op te geven, gescheiden door een komma zonder spaties eromheen. De rol die verbonden is aan het Redshift-cluster komt op de eerste plaats in de keten.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

Prestatieoverwegingen

Houd rekening met de volgende prestatieoverwegingen:

  • Houd de gematerialiseerde weergave van streaming eenvoudig en verplaats transformaties zoals ontnesten, aggregatie en case-expressies naar een latere stap, bijvoorbeeld door een andere gematerialiseerde weergave te maken bovenop de gematerialiseerde streaming-weergave.
  • Overweeg om slechts één gematerialiseerde weergave voor streaming te maken in één Redshift-cluster of werkgroep voor een bepaald MSK-onderwerp. Het maken van meerdere gerealiseerde weergaven per MSK-onderwerp kan de opnameprestaties vertragen, omdat elke gerealiseerde weergave een consument voor dat onderwerp wordt en de Amazon MSK-bandbreedte voor dat onderwerp deelt. Live streaming-gegevens in een streaming-gematerialiseerde weergave kunnen worden gedeeld tussen meerdere Redshift-clusters of Redshift Serverless-werkgroepen met behulp van het delen van gegevens.
  • Vermijd het gebruik van Json_Extract_Path_Text om gegevens vooraf te versnipperen, omdat Json_extract_path_text werkt rij voor rij op de gegevens, wat een aanzienlijke invloed heeft op de opnamedoorvoer. Het verdient de voorkeur om de gegevens uit de stream te halen en deze later te versnipperen.
  • Overweeg waar mogelijk om de sorteersleutel in de gematerialiseerde weergave van streaming over te slaan om de opnamesnelheid te versnellen. Wanneer een gematerialiseerde streamingweergave een sorteersleutel heeft, vindt er een sorteerbewerking plaats bij elke batch opgenomen gegevens uit de stream. Sorteren heeft een prestatie die wordt waargenomen, afhankelijk van het gegevenstype van de sorteersleutel, het aantal sorteersleutelkolommen en de hoeveelheid gegevens die in elke batch wordt opgenomen. Deze sorteerstap kan de latentie vergroten voordat de streaminggegevens beschikbaar zijn voor query's. U moet afwegen wat belangrijker is: latentie bij opname of latentie bij het opvragen van de gegevens.
  • Voor optimale prestaties van de streaming-gematerialiseerde weergave en om het opslaggebruik te verminderen, kunt u af en toe gegevens uit de gematerialiseerde weergave verwijderen met behulp van verwijderen, afkappenof verander tabel toevoegen.
  • Als je meerdere MSK-onderwerpen parallel in Amazon Redshift moet opnemen, begin dan met een kleiner aantal gematerialiseerde weergaven voor streaming en blijf meer gerealiseerde weergaven toevoegen om de algehele opnameprestaties binnen een cluster of werkgroep te evalueren.
  • Het vergroten van het aantal knooppunten in een door Redshift ingericht cluster of de basis-RPU van een Redshift Serverless-werkgroep kan de opnameprestaties van een gematerialiseerde streamingweergave helpen verbeteren. Voor optimale prestaties moet u ernaar streven om evenveel segmenten in uw door Redshift ingerichte cluster te hebben als er partities zijn in uw MSK-onderwerp, of 8 RPU voor elke vier partities in uw MSK-onderwerp.

Monitoring technieken

Records in het onderwerp die op het moment van opname de grootte van de doelgerealiseerde weergavekolom overschrijden, worden overgeslagen. Records die worden overgeslagen door de gerealiseerde weergavevernieuwing worden geregistreerd in de SYS_STREAM_SCAN_ERRORS systeem tafel.

Fouten die optreden bij het verwerken van een record als gevolg van een berekening of een gegevenstypeconversie of een andere logica in de definitie van de gerealiseerde weergave, zullen ertoe leiden dat het vernieuwen van de gerealiseerde weergave mislukt totdat de overtredende record uit het onderwerp is verlopen. Om dit soort problemen te voorkomen, moet u de logica van uw gematerialiseerde weergavedefinitie zorgvuldig testen; anders plaatst u de records in de standaard VARBYTE-kolom en verwerkt u ze later.

De volgende zijn beschikbare monitoringweergaven:

  • SYS_MV_REFRESH_HISTORY – Gebruik deze weergave om informatie te verzamelen over de vernieuwingsgeschiedenis van uw streaming-gematerialiseerde weergaven. De resultaten omvatten het vernieuwingstype, zoals handmatig of automatisch, en de status van de meest recente vernieuwing. De volgende query toont de vernieuwingsgeschiedenis voor een gematerialiseerde weergave van streaming:
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – Gebruik deze weergave om de reden te controleren waarom een ​​record niet kan worden geladen via streamingopname vanuit een MSK-onderwerp. Op het moment van schrijven van dit bericht registreert deze weergave bij opname vanuit Amazon MSK alleen fouten wanneer de record groter is dan de gerealiseerde weergavekolomgrootte. Deze weergave toont ook de unieke identificatie (offset) van het MSK-record in de positiekolom. De volgende query toont de foutcode en de foutreden wanneer een record de maximale groottelimiet overschrijdt:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – Gebruik deze weergave om het aantal records te controleren dat op een bepaald recordtijdstip is gescand. In deze weergave wordt ook de offset bijgehouden van de laatst gelezen record in de batch. De volgende query toont onderwerpgegevens voor een specifieke gerealiseerde weergave:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Gebruik deze weergave om de algemene statistieken voor een gematerialiseerde weergavevernieuwing te controleren. Hierdoor worden ook fouten geregistreerd in de kolom error_message voor fouten die niet verschijnen in SYS_STREAM_SCAN_ERRORS. De volgende query toont de fout die ervoor zorgt dat het vernieuwen van een gematerialiseerde streamingweergave mislukt:
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

Aanvullende overwegingen voor de implementatie

U heeft de keuze om optioneel een gematerialiseerde weergave te genereren bovenop een gestreamde gematerialiseerde weergave, zodat u de resultaten voor eindgebruikers kunt ontkoppelen en vooraf kunt berekenen. Deze aanpak elimineert de noodzaak om de resultaten op te slaan in een finaletafel met behulp van een opgeslagen procedure.

In dit bericht gebruik je de CAN_JSON_PARSE-functie om te beschermen tegen eventuele fouten om gegevens met meer succes op te nemen. In dit geval worden de streamingrecords die niet kunnen worden geparseerd, overgeslagen door Amazon Redshift. Als u echter uw foutrecords wilt bijhouden, kunt u overwegen deze in een kolom op te slaan met behulp van de volgende SQL bij het maken van de gematerialiseerde streamingweergave:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
JSON_PARSE(kafka_value) as Data 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

Je kunt ook overwegen gegevens uitladen vanuit het zicht SYS_STREAM_SCAN_ERRORS in een Amazon eenvoudige opslagservice (Amazon S3) bucket en ontvang meldingen via het verzenden van een rapport via e-mail gebruik Amazon eenvoudige meldingsservice (Amazon SNS) meldingen wanneer een nieuw S3-object wordt gemaakt.

Ten slotte kunt u, op basis van uw gegevensversheidsvereiste, gebruiken Amazon EventBridge om de taken in uw datawarehouse te plannen om het bovengenoemde te bellen SP_Orders_Load opgeslagen procedure regelmatig. EventBridge doet dit met vaste tussenpozen, en mogelijk heeft u een mechanisme nodig (bijvoorbeeld een AWS Stap Functies state machine) om te controleren of de vorige aanroep van de procedure is voltooid. Voor meer informatie, zie Een Amazon EventBridge-regel maken die volgens een schema wordt uitgevoerd. U kunt ook verwijzen naar Versnel de orkestratie van een ELT-proces met behulp van AWS Step Functions en Amazon Redshift Data API. Een andere optie is gebruiken Amazon Redshift-query-editor v2 om de vernieuwing te plannen. Voor details, zie Een query plannen met de query-editor v2.

Conclusie

In dit bericht hebben we best practices besproken om bijna-realtime analyses te implementeren met behulp van Amazon Redshift-streamingopname met Amazon MSK. We hebben u een voorbeeldpijplijn laten zien om gegevens van een MSK-onderwerp in Amazon Redshift op te nemen met behulp van streaming-opname. We hebben ook een betrouwbare strategie laten zien voor het incrementeel laden van streaminggegevens in Amazon Redshift met behulp van Kafka Partition en Kafka Offset. Daarnaast hebben we de stappen gedemonstreerd voor het configureren van streaming-opname tussen accounts van Amazon MSK naar Amazon Redshift en hebben we prestatieoverwegingen besproken voor een geoptimaliseerde opnamesnelheid. Ten slotte bespraken we monitoringtechnieken om fouten bij de streaming-opname van Amazon Redshift op te sporen.

Als u vragen heeft, kunt u deze achterlaten in het opmerkingengedeelte.


Over de auteurs

Poulomi Dasgupta is Senior Analytics Solutions Architect bij AWS. Ze heeft een passie voor het helpen van klanten bij het bouwen van cloudgebaseerde analyseoplossingen om hun zakelijke problemen op te lossen. Naast haar werk houdt ze van reizen en tijd doorbrengen met haar gezin.

Adekunle Adedotun is een Sr. Database Engineer met Amazon Redshift-service. Hij werkt al 6 jaar aan MPP-databases met een focus op prestatieafstemming. Hij geeft ook leiding aan het ontwikkelingsteam voor nieuwe en bestaande servicefuncties.

spot_img

Laatste intelligentie

spot_img