Zephyrnet-Logo

Best Practices zur Implementierung von Analysen nahezu in Echtzeit mithilfe von Amazon Redshift Streaming Ingestion mit Amazon MSK | Amazon Web Services

Datum:

Amazon RedShift ist ein vollständig verwaltetes, skalierbares Cloud-Data-Warehouse, das Ihre Zeit zu Erkenntnissen durch schnelle, unkomplizierte und sichere Analysen in großem Maßstab beschleunigt. Zehntausende Kunden verlassen sich auf Amazon Redshift, um Exabytes an Daten zu analysieren und komplexe Analyseabfragen auszuführen, was es zum am weitesten verbreiteten Cloud-Data-Warehouse macht. Sie können in Sekundenschnelle Analysen aller Ihrer Daten durchführen und skalieren, ohne Ihre Data-Warehouse-Infrastruktur verwalten zu müssen.

Sie können die Verwendung Amazon Redshift-Streaming-Aufnahme Möglichkeit, Ihre Analysedatenbanken nahezu in Echtzeit zu aktualisieren. Die Streaming-Aufnahme von Amazon Redshift vereinfacht Datenpipelines, indem Sie materialisierte Ansichten direkt auf Datenströmen erstellen können. Mit dieser Funktion in Amazon Redshift können Sie Structured Query Language (SQL) verwenden, um eine Verbindung zu Datenströmen herzustellen und diese direkt zu erfassen, z Amazon Kinesis-Datenströme or Amazon Managed Streaming für Apache Kafka (Amazon MSK) Datenströme und ziehen Daten direkt an Amazon Redshift.

In diesem Beitrag besprechen wir die Best Practices zur Implementierung von Analysen nahezu in Echtzeit mithilfe der Amazon Redshift-Streaming-Aufnahme mit Amazon MSK.

Lösungsübersicht

Wir gehen eine Beispielpipeline durch, um Daten aus einem MSK-Thema mithilfe der Amazon-Redshift-Streaming-Aufnahme in Amazon Redshift aufzunehmen. Wir zeigen auch, wie man JSON-Daten mithilfe der Punktnotation in Amazon Redshift entschachtelt. Das folgende Diagramm veranschaulicht unsere Lösungsarchitektur.

Der Prozessablauf besteht aus folgenden Schritten:

  1. Erstellen Sie eine materialisierte Streaming-Ansicht in Ihrem Redshift-Cluster, um Live-Streaming-Daten aus den MSK-Themen zu nutzen.
  2. Verwenden Sie eine gespeicherte Prozedur, um Change Data Capture (CDC) mithilfe der einzigartigen Kombination aus Kafka-Partition und Kafka-Offset auf Datensatzebene für das aufgenommene MSK-Thema zu implementieren.
  3. Erstellen Sie eine benutzerorientierte Tabelle im Redshift-Cluster und verwenden Sie die Punktnotation, um die Verschachtelung des JSON-Dokuments aus der materialisierten Streaming-Ansicht in Datenspalten der Tabelle aufzuheben. Sie können kontinuierlich neue Daten laden, indem Sie die gespeicherte Prozedur in regelmäßigen Abständen aufrufen.
  4. Konnektivität herstellen zwischen einem Amazon QuickSight Dashboard und Amazon Redshift zur Bereitstellung von Visualisierung und Erkenntnissen.

Im Rahmen dieses Beitrags besprechen wir auch folgende Themen:

  • Schritte zum Konfigurieren der kontoübergreifenden Streaming-Aufnahme von Amazon MSK zu Amazon Redshift
  • Best Practices zur Erzielung einer optimierten Leistung durch Streaming materialisierter Ansichten
  • Überwachungstechniken zur Verfolgung von Fehlern bei der Amazon-Redshift-Streaming-Aufnahme

Voraussetzungen:

Sie müssen Folgendes haben:

  • Ein AWS-Konto.
  • Abhängig von Ihrem Anwendungsfall eine der folgenden Ressourcen:
  • Ein MSK-Cluster. Anweisungen finden Sie unter Erstellen Sie einen Amazon MSK-Cluster.
  • Ein Thema in Ihrem MSK-Cluster, wo Ihr Datenproduzent Daten veröffentlichen kann.
  • Ein Datenproduzent zum Schreiben von Daten in das Thema in Ihrem MSK-Cluster.

Überlegungen beim Einrichten Ihres MSK-Themas

Beachten Sie beim Konfigurieren Ihres MSK-Themas die folgenden Überlegungen:

  • Stellen Sie sicher, dass der Name Ihres MSK-Themas nicht länger als 128 Zeichen ist.
  • Zum jetzigen Zeitpunkt können MSK-Datensätze mit komprimierten Daten nicht direkt in Amazon Redshift abgefragt werden. Amazon Redshift unterstützt keine nativen Dekomprimierungsmethoden für clientseitig komprimierte Daten in einem MSK-Thema.
  • Folgen Sie uns Best Practices beim Einrichten Ihres MSK-Clusters.
  • Überprüfen Sie die Streaming-Aufnahme Einschränkungen für alle anderen Überlegungen.

Richten Sie die Streamingaufnahme ein

Führen Sie die folgenden Schritte aus, um die Streamingaufnahme einzurichten:

  1. Richten Sie die AWS Identity and Access Management and (IAM)-Rolle und Vertrauensrichtlinie, die für die Streaming-Aufnahme erforderlich sind. Anweisungen finden Sie im Einrichten von IAM und Durchführen der Streaming-Aufnahme von Kafka.
  2. Stellen Sie sicher, dass Daten in Ihr MSK-Thema fließen Amazon CloudWatch Metriken (zum Beispiel, BytesOutPerSec).
  3. Starten Sie den Abfrageeditor v2 über die Amazon Redshift-Konsole oder verwenden Sie Ihren bevorzugten SQL-Client, um für die nächsten Schritte eine Verbindung zu Ihrem Redshift-Cluster herzustellen. Die folgenden Schritte wurden im Abfrageeditor v2 ausgeführt.
  4. Erstellen Sie ein externes Schema zur Zuordnung zum MSK-Cluster. Ersetzen Sie Ihren IAM-Rollen-ARN und den MSK-Cluster-ARN in der folgenden Anweisung:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Wenn bei Ihren Themennamen die Groß-/Kleinschreibung beachtet werden muss, müssen Sie dies optional aktivieren enable_case_sensitive_identifier um in Amazon Redshift darauf zugreifen zu können. Um Bezeichner zu verwenden, bei denen die Groß-/Kleinschreibung beachtet wird, legen Sie fest enable_case_sensitive_identifier entweder auf Sitzungs-, Benutzer- oder Clusterebene auf „true“ setzen:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Erstellen Sie eine materialisierte Ansicht, um die Stream-Daten aus dem MSK-Thema zu nutzen:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

Die Metadatenspalte kafka_value das von Amazon ankommt, wird in MSK gespeichert VARBYTE Format in Amazon Redshift. Für diesen Beitrag verwenden Sie die JSON_PARSE Funktion zum Konvertieren kafka_value zum SUPER-Datentyp. Sie verwenden auch die CAN_JSON_PARSE Funktion in der Filterbedingung, um ungültige JSON-Datensätze zu überspringen und Fehler aufgrund von JSON-Analysefehlern zu verhindern. Wir besprechen später in diesem Beitrag, wie die ungültigen Daten für zukünftige Debugging-Vorgänge gespeichert werden.

  1. Aktualisieren Sie die materialisierte Streaming-Ansicht, wodurch Amazon Redshift veranlasst wird, aus dem MSK-Thema zu lesen und Daten in die materialisierte Ansicht zu laden:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

Sie können Ihre materialisierte Streaming-Ansicht auch so einstellen, dass sie automatische Aktualisierungsfunktionen verwendet. Dadurch wird Ihre materialisierte Ansicht automatisch aktualisiert, sobald Daten im Stream eintreffen. Sehen ERSTELLEN SIE EINE MATERIALISIERTE ANSICHT Anweisungen zum Erstellen einer materialisierten Ansicht mit automatischer Aktualisierung finden Sie hier.

Entschachteln Sie das JSON-Dokument

Das Folgende ist ein Beispiel eines JSON-Dokuments, das aus dem MSK-Thema in die Datenspalte vom Typ SUPER in der materialisierten Streaming-Ansicht aufgenommen wurde Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Verwenden Sie die Punktnotation, wie im folgenden Code gezeigt, um die Verschachtelung Ihrer JSON-Nutzlast aufzuheben:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

Der folgende Screenshot zeigt, wie das Ergebnis nach dem Aufheben der Verschachtelung aussieht.

Wenn Ihr JSON-Dokument Arrays enthält, sollten Sie die Verschachtelung Ihrer Daten mit aufheben PartiQL Anweisungen in Amazon Redshift. Weitere Informationen finden Sie im Abschnitt Entschachteln Sie das JSON-Dokument in der Post Nahezu-Echtzeit-Analysen durch Amazon Redshift-Streaming-Aufnahme mit Amazon Kinesis Data Streams und Amazon DynamoDB.

Inkrementelle Datenladestrategie

Führen Sie die folgenden Schritte aus, um einen inkrementellen Datenladevorgang zu implementieren:

  1. Erstellen Sie in Amazon Redshift eine Tabelle mit dem Namen „Bestellungen“, die Endbenutzer zur Visualisierung und Geschäftsanalyse verwenden:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Als Nächstes erstellen Sie eine gespeicherte Prozedur namens SP_Orders_Load um CDC aus einer materialisierten Streaming-Ansicht zu implementieren und in das Finale zu laden Orders Tisch. Sie verwenden die Kombination von Kafka_Partition und Kafka_Offset in der materialisierten Streaming-Ansicht als Systemspalten zur Implementierung von CDC verfügbar. Die Kombination dieser beiden Spalten ist innerhalb eines MSK-Themas immer eindeutig, wodurch sichergestellt wird, dass während des Prozesses keiner der Datensätze übersehen wird. Die gespeicherte Prozedur enthält die folgenden Komponenten:

  • Um Bezeichner zu verwenden, bei denen die Groß-/Kleinschreibung beachtet wird, legen Sie fest enable_case_sensitive_identifier entweder auf Sitzungs-, Benutzer- oder Clusterebene auf „true“ setzen.
  • Aktualisieren Sie die materialisierte Streaming-Ansicht manuell, wenn die automatische Aktualisierung nicht aktiviert ist.
  • Erstellen Sie eine Prüftabelle mit dem Namen Orders_Streaming_Audit Wenn es nicht vorhanden ist, um den letzten Offset für eine Partition zu verfolgen, die während der letzten Ausführung der gespeicherten Prozedur in die Orders-Tabelle geladen wurde.
  • Heben Sie die Verschachtelung auf und fügen Sie nur neue oder geänderte Daten in eine Staging-Tabelle namens ein Orders_Staging_Table, Lesen aus der materialisierten Streaming-Ansicht Orders_Stream_MV, Wobei Kafka_Offset ist größer als der zuletzt verarbeitete Kafka_Offset in der Audit-Tabelle aufgezeichnet Orders_Streaming_Audit für die Kafka_Partition verarbeitet wird.
  • Beim ersten Laden mit dieser gespeicherten Prozedur sind keine Daten darin vorhanden Orders_Streaming_Audit Tabelle und alle Daten aus Orders_Stream_MV wird in die Tabelle „Bestellungen“ geladen.
  • Fügen Sie auf der Benutzeroberfläche nur geschäftsrelevante Spalten ein Orders Tabelle, Auswahl aus der Staging-Tabelle Orders_Staging_Table.
  • Geben Sie die max Kafka_Offset für jede geladene Kafka_Partition in die Audit-Tabelle Orders_Streaming_Audit

Wir haben die Zwischen-Staging-Tabelle hinzugefügt Orders_Staging_Table in dieser Lösung, um beim Debuggen bei unerwarteten Fehlern und der Nachverfolgbarkeit zu helfen. Den Staging-Schritt überspringen und direkt in den Finaltisch laden Orders_Stream_MV kann je nach Anwendungsfall eine geringere Latenz bieten.

  1. Erstellen Sie die gespeicherte Prozedur mit dem folgenden Code:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. Führen Sie die gespeicherte Prozedur aus, um Daten in die zu laden Orders Tabelle:
    call SP_Orders_Load();

  3. Validieren Sie die Daten in der Tabelle „Bestellungen“.

Richten Sie eine kontoübergreifende Streaming-Aufnahme ein

Wenn Ihr MSK-Cluster zu einem anderen Konto gehört, führen Sie die folgenden Schritte aus, um IAM-Rollen zu erstellen und die kontoübergreifende Streaming-Aufnahme einzurichten. Nehmen wir an, dass sich der Redshift-Cluster in Konto A und der MSK-Cluster in Konto B befindet, wie im folgenden Diagramm dargestellt.

Führen Sie die folgenden Schritte aus:

  1. Erstellen Sie in Konto B eine IAM-Rolle mit dem Namen MyRedshiftMSKRole Dadurch kann Amazon Redshift (Konto A) mit dem genannten MSK-Cluster (Konto B) kommunizieren MyTestCluster. Abhängig davon, ob Ihr MSK-Cluster für die Verbindung IAM-Authentifizierung oder nicht authentifizierten Zugriff verwendet, müssen Sie eine IAM-Rolle mit einer der folgenden Richtlinien erstellen:
    • Ein IAM policAmazonAmazon MSK nutzt nicht authentifizierten Zugriff:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • Eine IAM-Richtlinie für Amazon MSK bei Verwendung der IAM-Authentifizierung:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

Der Ressourcenabschnitt im vorherigen Beispiel ermöglicht den Zugriff auf alle Themen im MyTestCluster MSK-Cluster. Wenn Sie die IAM-Rolle auf bestimmte Themen beschränken müssen, müssen Sie die Themenressource durch eine restriktivere Ressourcenrichtlinie ersetzen.

  1. Nachdem Sie die IAM-Rolle in Konto B erstellt haben, notieren Sie sich den ARN der IAM-Rolle (z. B. arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. Erstellen Sie in Konto A eine anpassbare Redshift-IAM-Rolle mit dem Namen MyRedshiftRole, die Amazon Redshift bei der Verbindung mit Amazon MSK annimmt. Die Rolle sollte eine Richtlinie wie die folgende haben, die es der Amazon Redshift IAM-Rolle in Konto A ermöglicht, die Amazon MSK-Rolle in Konto B zu übernehmen:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. Notieren Sie sich den Rollen-ARN für die Amazon Redshift IAM-Rolle (z. B. arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. Gehen Sie zurück zu Konto B und fügen Sie diese Rolle in der Vertrauensrichtlinie der IAM-Rolle hinzu arn:aws:iam::0123456789:role/MyRedshiftMSKRole um Konto B zu erlauben, der IAM-Rolle von Konto A zu vertrauen. Die Vertrauensrichtlinie sollte wie der folgende Code aussehen:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. Melden Sie sich als Konto A bei der Amazon Redshift-Konsole an.
  6. Starten Sie den Abfrageeditor v2 oder Ihren bevorzugten SQL-Client und führen Sie die folgenden Anweisungen aus, um auf das MSK-Thema in Konto B zuzugreifen. Erstellen Sie zum Zuordnen zum MSK-Cluster ein externes Schema mit Rollenverkettung durch Angabe von IAM-Rollen-ARNs, getrennt durch ein Komma ohne Leerzeichen. Die dem Redshift-Cluster zugeordnete Rolle steht an erster Stelle in der Kette.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

Leistungsüberlegungen

Beachten Sie die folgenden Leistungsaspekte:

  • Halten Sie die materialisierte Streaming-Ansicht einfach und verschieben Sie Transformationen wie Aufheben der Verschachtelung, Aggregation und Fallausdrücke in einen späteren Schritt – indem Sie beispielsweise eine weitere materialisierte Ansicht über der materialisierten Streaming-Ansicht erstellen.
  • Erwägen Sie die Erstellung nur einer materialisierten Streaming-Ansicht in einem einzelnen Redshift-Cluster oder einer einzelnen Redshift-Arbeitsgruppe für ein bestimmtes MSK-Thema. Die Erstellung mehrerer materialisierter Ansichten pro MSK-Thema kann die Aufnahmeleistung verlangsamen, da jede materialisierte Ansicht zum Verbraucher für dieses Thema wird und die Amazon MSK-Bandbreite für dieses Thema teilt. Live-Streaming-Daten in einer materialisierten Streaming-Ansicht können über mehrere Redshift-Cluster oder Redshift Serverless-Arbeitsgruppen gemeinsam genutzt werden Datenübertragung.
  • Vermeiden Sie beim Definieren Ihrer materialisierten Streaming-Ansicht die Verwendung Json_Extract_Path_Text Daten vorab zu zerkleinern, weil Json_extract_path_text bearbeitet die Daten Zeile für Zeile, was sich erheblich auf den Aufnahmedurchsatz auswirkt. Es ist vorzuziehen, die Daten unverändert aus dem Stream zu übertragen und sie später zu vernichten.
  • Erwägen Sie nach Möglichkeit, den Sortierschlüssel in der materialisierten Streaming-Ansicht zu überspringen, um die Aufnahmegeschwindigkeit zu beschleunigen. Wenn eine materialisierte Streaming-Ansicht über einen Sortierschlüssel verfügt, wird bei jedem Stapel erfasster Daten aus dem Stream ein Sortiervorgang ausgeführt. Beim Sortieren kommt es zu Leistungseinbußen, die vom Datentyp des Sortierschlüssels, der Anzahl der Sortierschlüsselspalten und der in jedem Stapel erfassten Datenmenge abhängen. Dieser Sortierschritt kann die Latenz erhöhen, bevor die Streaming-Daten zur Abfrage verfügbar sind. Sie sollten abwägen, was wichtiger ist: Latenz bei der Aufnahme oder Latenz bei der Abfrage der Daten.
  • Um die Leistung der materialisierten Streaming-Ansicht zu optimieren und die Speichernutzung zu reduzieren, löschen Sie gelegentlich Daten aus der materialisierten Ansicht mit löschen, abschneiden, oder Tabelle ändern, anhängen.
  • Wenn Sie mehrere MSK-Themen parallel in Amazon Redshift aufnehmen müssen, beginnen Sie mit einer kleineren Anzahl von Streaming-Materialized Views und fügen Sie immer mehr Materialized Views hinzu, um die Gesamtleistung der Aufnahme innerhalb eines Clusters oder einer Arbeitsgruppe zu bewerten.
  • Durch Erhöhen der Anzahl der Knoten in einem von Redshift bereitgestellten Cluster oder der Basis-RPU einer Redshift Serverless-Arbeitsgruppe kann die Aufnahmeleistung einer materialisierten Streaming-Ansicht gesteigert werden. Für eine optimale Leistung sollten Sie darauf abzielen, so viele Slices in Ihrem von Redshift bereitgestellten Cluster zu haben, wie Partitionen in Ihrem MSK-Thema vorhanden sind, oder 8 RPU für jeweils vier Partitionen in Ihrem MSK-Thema.

Überwachungstechniken

Datensätze im Thema, die zum Zeitpunkt der Aufnahme die Größe der Zielspalte der materialisierten Ansicht überschreiten, werden übersprungen. Datensätze, die durch die Aktualisierung der materialisierten Ansicht übersprungen werden, werden im protokolliert SYS_STREAM_SCAN_ERRORS Systemtabelle.

Fehler, die bei der Verarbeitung eines Datensatzes aufgrund einer Berechnung, einer Datentypkonvertierung oder einer anderen Logik in der Definition der materialisierten Ansicht auftreten, führen dazu, dass die Aktualisierung der materialisierten Ansicht fehlschlägt, bis der fehlerhafte Datensatz aus dem Thema abgelaufen ist. Um diese Art von Problemen zu vermeiden, testen Sie die Logik Ihrer materialisierten Ansichtsdefinition sorgfältig. Andernfalls landen Sie die Datensätze in der Standardspalte VARBYTE und verarbeiten Sie sie später.

Die folgenden Überwachungsansichten sind verfügbar:

  • SYS_MV_REFRESH_HISTORY – Verwenden Sie diese Ansicht, um Informationen über den Aktualisierungsverlauf Ihrer materialisierten Streaming-Ansichten zu sammeln. Die Ergebnisse umfassen den Aktualisierungstyp, z. B. manuell oder automatisch, und den Status der letzten Aktualisierung. Die folgende Abfrage zeigt den Aktualisierungsverlauf für eine materialisierte Streaming-Ansicht:
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – Verwenden Sie diese Ansicht, um den Grund zu überprüfen, warum ein Datensatz nicht über die Streaming-Aufnahme aus einem MSK-Thema geladen werden konnte. Zum Zeitpunkt des Verfassens dieses Beitrags protokolliert diese Ansicht bei der Aufnahme aus Amazon MSK nur dann Fehler, wenn der Datensatz größer als die Spaltengröße der materialisierten Ansicht ist. In dieser Ansicht wird auch die eindeutige Kennung (Offset) des MSK-Datensatzes in der Positionsspalte angezeigt. Die folgende Abfrage zeigt den Fehlercode und die Fehlerursache an, wenn ein Datensatz die maximale Größenbeschränkung überschreitet:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – Verwenden Sie diese Ansicht, um die Anzahl der zu einem bestimmten Datensatzzeitpunkt gescannten Datensätze zu überwachen. Diese Ansicht verfolgt auch den Offset des letzten im Stapel gelesenen Datensatzes. Die folgende Abfrage zeigt Themendaten für eine bestimmte materialisierte Ansicht:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Verwenden Sie diese Ansicht, um die Gesamtmetriken für eine Streaming-Aktualisierung einer materialisierten Ansicht zu überprüfen. Dadurch werden auch Fehler in der Spalte „error_message“ für Fehler protokolliert, die nicht in angezeigt werden SYS_STREAM_SCAN_ERRORS. Die folgende Abfrage zeigt den Fehler, der den Aktualisierungsfehler einer materialisierten Streaming-Ansicht verursacht:
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

Zusätzliche Überlegungen zur Implementierung

Sie haben die Möglichkeit, optional eine materialisierte Ansicht zusätzlich zu einer materialisierten Streaming-Ansicht zu generieren, sodass Sie die Verschachtelung aufheben und Ergebnisse für Endbenutzer vorab berechnen können. Durch diesen Ansatz entfällt die Notwendigkeit, die Ergebnisse mithilfe einer gespeicherten Prozedur in einer Abschlusstabelle zu speichern.

In diesem Beitrag verwenden Sie die CAN_JSON_PARSE-Funktion Um sich vor Fehlern zu schützen und Daten erfolgreicher aufzunehmen – in diesem Fall werden die Streaming-Datensätze, die nicht analysiert werden können, von Amazon Redshift übersprungen. Wenn Sie jedoch den Überblick über Ihre Fehlerdatensätze behalten möchten, sollten Sie diese beim Erstellen der materialisierten Streaming-Ansicht mithilfe der folgenden SQL in einer Spalte speichern:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
JSON_PARSE(kafka_value) as Data 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

Das können Sie auch überlegen Daten entladen aus der Sicht SYS_STREAM_SCAN_ERRORS In ein Amazon Simple Storage-Service (Amazon S3) Bucket und erhalten Sie Benachrichtigungen von Senden eines Berichts per E-Mail Verwendung von Amazon Simple Notification Service (Amazon SNS)-Benachrichtigungen, wenn ein neues S3-Objekt erstellt wird.

Schließlich können Sie je nach Ihren Anforderungen an die Datenaktualität Folgendes verwenden Amazon EventBridge um die Jobs in Ihrem Data Warehouse zu planen, um die oben genannten aufzurufen SP_Orders_Load gespeicherte Prozedur regelmäßig. EventBridge führt dies in festen Abständen durch, und möglicherweise benötigen Sie einen Mechanismus (z. B. eine AWS Step-Funktionen Zustandsmaschine), um zu überwachen, ob der vorherige Aufruf der Prozedur abgeschlossen wurde. Weitere Informationen finden Sie unter Erstellen einer Amazon EventBridge-Regel, die nach einem Zeitplan ausgeführt wird. Sie können sich auch darauf beziehen Beschleunigen Sie die Orchestrierung eines ELT-Prozesses mit AWS Step Functions und der Amazon Redshift Data API. Eine andere Möglichkeit ist die Verwendung Amazon Redshift-Abfrage-Editor v2 um die Aktualisierung zu planen. Einzelheiten finden Sie unter Planen einer Abfrage mit dem Abfrageeditor v2.

Zusammenfassung

In diesem Beitrag haben wir Best Practices zur Implementierung von Analysen nahezu in Echtzeit mithilfe der Streaming-Aufnahme von Amazon Redshift mit Amazon MSK besprochen. Wir haben Ihnen eine Beispielpipeline gezeigt, um Daten aus einem MSK-Thema mithilfe der Streaming-Aufnahme in Amazon Redshift aufzunehmen. Wir haben auch eine zuverlässige Strategie zum inkrementellen Laden von Streaming-Daten in Amazon Redshift mithilfe von Kafka Partition und Kafka Offset gezeigt. Darüber hinaus haben wir die Schritte zum Konfigurieren der kontoübergreifenden Streaming-Aufnahme von Amazon MSK zu Amazon Redshift demonstriert und Leistungsüberlegungen für eine optimierte Aufnahmerate besprochen. Abschließend haben wir Überwachungstechniken besprochen, um Fehler bei der Streaming-Aufnahme von Amazon Redshift zu verfolgen.

Wenn Sie Fragen haben, hinterlassen Sie diese im Kommentarbereich.


Über die Autoren

Poulomi Dasgupta ist Senior Analytics Solutions Architect bei AWS. Sie unterstützt leidenschaftlich gerne Kunden beim Aufbau cloudbasierter Analyselösungen zur Lösung ihrer Geschäftsprobleme. Außerhalb der Arbeit reist sie gerne und verbringt Zeit mit ihrer Familie.

Adekunle Adedotun ist Senior Database Engineer beim Amazon Redshift-Service. Er arbeitet seit 6 Jahren an MPP-Datenbanken mit Schwerpunkt auf Leistungsoptimierung. Außerdem berät er das Entwicklungsteam bei neuen und bestehenden Servicefunktionen.

spot_img

VC-Café

VC-Café

Neueste Intelligenz

spot_img