Logo Zéphyrnet

Bonnes pratiques pour mettre en œuvre des analyses en temps quasi réel à l'aide de l'ingestion de streaming Amazon Redshift avec Amazon MSK | Services Web Amazon

Date :

Redshift d'Amazon est un entrepôt de données cloud entièrement géré et évolutif qui accélère votre accès aux informations grâce à des analyses rapides, simples et sécurisées à grande échelle. Des dizaines de milliers de clients s'appuient sur Amazon Redshift pour analyser des exaoctets de données et exécuter des requêtes analytiques complexes, ce qui en fait l'entrepôt de données cloud le plus largement utilisé. Vous pouvez exécuter et faire évoluer des analyses en quelques secondes sur toutes vos données, sans avoir à gérer votre infrastructure d'entrepôt de données.

Vous pouvez utiliser le Ingestion de flux Amazon Redshift capacité de mettre à jour vos bases de données analytiques en temps quasi réel. L'ingestion de streaming Amazon Redshift simplifie les pipelines de données en vous permettant de créer des vues matérialisées directement au-dessus des flux de données. Grâce à cette fonctionnalité dans Amazon Redshift, vous pouvez utiliser le langage de requête structuré (SQL) pour vous connecter et ingérer directement des données à partir de flux de données, tels que Flux de données Amazon Kinesis or Amazon Managed Streaming pour Apache Kafka (Amazon MSK) et extrayez les données directement vers Amazon Redshift.

Dans cet article, nous discutons des meilleures pratiques pour mettre en œuvre des analyses en temps quasi réel à l'aide de l'ingestion de streaming Amazon Redshift avec Amazon MSK.

Présentation de la solution

Nous passons en revue un exemple de pipeline pour ingérer des données d'une rubrique MSK dans Amazon Redshift à l'aide de l'ingestion de streaming Amazon Redshift. Nous montrons également comment désimbriquer les données JSON à l'aide de la notation par points dans Amazon Redshift. Le diagramme suivant illustre notre architecture de solution.

Le flux de processus comprend les étapes suivantes :

  1. Créez une vue matérialisée en streaming dans votre cluster Redshift pour consommer les données de streaming en direct à partir des sujets MSK.
  2. Utilisez une procédure stockée pour implémenter la capture des données modifiées (CDC) à l'aide de la combinaison unique de Kafka Partition et Kafka Offset au niveau de l'enregistrement pour la rubrique MSK ingérée.
  3. Créez une table destinée à l'utilisateur dans le cluster Redshift et utilisez la notation par points pour dissocier le document JSON de la vue matérialisée en streaming dans les colonnes de données de la table. Vous pouvez charger en permanence de nouvelles données en appelant la procédure stockée à intervalles réguliers.
  4. Établir la connectivité entre un Amazon QuickSight tableau de bord et Amazon Redshift pour fournir une visualisation et des informations.

Dans le cadre de cet article, nous abordons également les sujets suivants :

  • Étapes pour configurer l'ingestion de streaming entre comptes d'Amazon MSK vers Amazon Redshift
  • Bonnes pratiques pour obtenir des performances optimisées à partir de vues matérialisées en streaming
  • Techniques de surveillance pour suivre les échecs de l'ingestion de streaming Amazon Redshift

Pré-requis

Vous devez avoir les éléments suivants:

  • Un compte AWS.
  • L'une des ressources suivantes, selon votre cas d'utilisation :
  • Un cluster MSK. Pour obtenir des instructions, reportez-vous à Créer un cluster Amazon MSK.
  • Un sujet dans votre cluster MSK où votre producteur de données peut publier des données.
  • Un producteur de données pour écrire des données dans le sujet de votre cluster MSK.

Considérations lors de la configuration de votre sujet MSK

Gardez à l’esprit les considérations suivantes lors de la configuration de votre sujet MSK :

  • Assurez-vous que le nom de votre sujet MSK ne dépasse pas 128 caractères.
  • Au moment d'écrire ces lignes, les enregistrements MSK contenant des données compressées ne peuvent pas être directement interrogés dans Amazon Redshift. Amazon Redshift ne prend en charge aucune méthode de décompression native pour les données compressées côté client dans une rubrique MSK.
  • FOLLOW les meilleures pratiques lors de la configuration de votre cluster MSK.
  • Vérifier l'ingestion de streaming limites pour toute autre considération.

Configurer l'ingestion de streaming

Pour configurer l'ingestion de streaming, procédez comme suit :

  1. Mettre en place le Gestion des identités et des accès AWS (IAM) et stratégie de confiance requises pour l’ingestion de streaming. Pour obtenir des instructions, reportez-vous au Configuration d'IAM et réalisation de l'ingestion de streaming à partir de Kafka.
  2. Assurez-vous que les données circulent dans votre sujet MSK en utilisant Amazon Cloud Watch métrique (par exemple, OctetsOutPerSec).
  3. Lancez l'éditeur de requêtes v2 à partir de la console Amazon Redshift ou utilisez votre client SQL préféré pour vous connecter à votre cluster Redshift pour les étapes suivantes. Les étapes suivantes ont été exécutées dans l'éditeur de requêtes v2.
  4. Créez un schéma externe à mapper au cluster MSK. Remplacez l'ARN de votre rôle IAM et l'ARN du cluster MSK dans l'instruction suivante :
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. Facultativement, si les noms de vos sujets sont sensibles à la casse, vous devez activer enable_case_sensitive_identifier pour pouvoir y accéder dans Amazon Redshift. Pour utiliser des identifiants sensibles à la casse, définissez enable_case_sensitive_identifier sur true au niveau de la session, de l'utilisateur ou du cluster :
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. Créez une vue matérialisée pour consommer les données de flux de la rubrique MSK :
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

La colonne métadonnées kafka_value qui arrive d'Amazon MSK est stocké dans VARBYTE format dans Amazon Redshift. Pour cet article, vous utilisez le JSON_PARSE fonction pour convertir kafka_value à Type de données SUPER. Vous utilisez également le CAN_JSON_PARSE fonctionne dans la condition de filtre pour ignorer les enregistrements JSON non valides et se prémunir contre les erreurs dues aux échecs d’analyse JSON. Nous expliquons plus loin dans cet article comment stocker les données non valides pour un débogage futur.

  1. Actualisez la vue matérialisée en streaming, ce qui déclenche la lecture par Amazon Redshift de la rubrique MSK et le chargement des données dans la vue matérialisée :
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

Vous pouvez également configurer votre vue matérialisée de streaming pour qu'elle utilise les fonctionnalités d'actualisation automatique. Cela actualisera automatiquement votre vue matérialisée à mesure que les données arriveront dans le flux. Voir CRÉER UNE VUE MATÉRIALISÉE pour obtenir des instructions permettant de créer une vue matérialisée avec actualisation automatique.

Désimbriquer le document JSON

Voici un exemple de document JSON ingéré depuis la rubrique MSK vers la colonne Données de type SUPER dans la vue matérialisée en streaming. Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

Utilisez la notation par points comme indiqué dans le code suivant pour désimbriquer votre charge utile JSON :

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

La capture d'écran suivante montre à quoi ressemble le résultat après le désimbrication.

Si vous avez des tableaux dans votre document JSON, envisagez de dissocier vos données en utilisant PartiQL déclarations dans Amazon Redshift. Pour plus d'informations, reportez-vous à la rubrique Désimbriquer le document JSON dans la poste Analyses en temps quasi réel à l'aide de l'ingestion de streaming Amazon Redshift avec Amazon Kinesis Data Streams et Amazon DynamoDB.

Stratégie de chargement de données incrémentielle

Effectuez les étapes suivantes pour implémenter un chargement de données incrémentiel :

  1. Créez un tableau appelé Commandes dans Amazon Redshift, que les utilisateurs finaux utiliseront pour la visualisation et l'analyse commerciale :
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

Ensuite, vous créez une procédure stockée appelée SP_Orders_Load pour implémenter CDC à partir d'une vue matérialisée en streaming et charger dans le fichier final Orders tableau. Vous utilisez la combinaison de Kafka_Partition ainsi que Kafka_Offset disponible dans la vue matérialisée en streaming sous forme de colonnes système pour implémenter CDC. La combinaison de ces deux colonnes sera toujours unique au sein d'un sujet MSK, ce qui garantit qu'aucun enregistrement n'est oublié pendant le processus. La procédure stockée contient les composants suivants :

  • Pour utiliser des identifiants sensibles à la casse, définissez enable_case_sensitive_identifier sur true au niveau de la session, de l'utilisateur ou du cluster.
  • Actualisez manuellement la vue matérialisée en streaming si l'actualisation automatique n'est pas activée.
  • Créez une table d'audit appelée Orders_Streaming_Audit s'il n'existe pas pour garder une trace du dernier décalage d'une partition chargée dans la table Commandes lors de la dernière exécution de la procédure stockée.
  • Désimbriquez et insérez uniquement les données nouvelles ou modifiées dans une table intermédiaire appelée Orders_Staging_Table, lecture à partir de la vue matérialisée en streaming Orders_Stream_MV, Où Kafka_Offset est supérieur au dernier traité Kafka_Offset enregistré dans le tableau d'audit Orders_Streaming_Audit pour le Kafka_Partition être en cours de traitement.
  • Lors du premier chargement à l'aide de cette procédure stockée, il n'y aura aucune donnée dans le Orders_Streaming_Audit tableau et toutes les données de Orders_Stream_MV sera chargé dans le tableau des commandes.
  • Insérez uniquement les colonnes pertinentes pour l'entreprise dans le message destiné aux utilisateurs. Orders table, sélection dans la table intermédiaire Orders_Staging_Table.
  • Insérez le maximum Kafka_Offset pour chaque chargé Kafka_Partition dans la table d'audit Orders_Streaming_Audit

Nous avons ajouté la table de mise en scène intermédiaire Orders_Staging_Table dans cette solution pour aider au débogage en cas de pannes inattendues et de traçabilité. Sauter l'étape de préparation et charger directement dans la table finale à partir de Orders_Stream_MV peut fournir une latence plus faible en fonction de votre cas d'utilisation.

  1. Créez la procédure stockée avec le code suivant :
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. Exécutez la procédure stockée pour charger les données dans le Orders table:
    call SP_Orders_Load();

  3. Validez les données dans le tableau Commandes.

Établir l'ingestion de streaming entre comptes

Si votre cluster MSK appartient à un autre compte, suivez les étapes suivantes pour créer des rôles IAM afin de configurer l'ingestion de streaming entre comptes. Supposons que le cluster Redshift se trouve dans le compte A et que le cluster MSK se trouve dans le compte B, comme indiqué dans le diagramme suivant.

Effectuez les étapes suivantes:

  1. Dans le compte B, créez un rôle IAM appelé MyRedshiftMSKRole qui permet à Amazon Redshift (compte A) de communiquer avec le cluster MSK (compte B) nommé MyTestCluster. Selon que votre cluster MSK utilise l'authentification IAM ou un accès non authentifié pour se connecter, vous devez créer un rôle IAM avec l'une des stratégies suivantes :
    • Un MAI policAmazonAmazon MSK utilisant un accès non authentifié :
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • Une stratégie IAM pour Amazon MSK lors de l'utilisation de l'authentification IAM :
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

La section ressources de l'exemple précédent donne accès à toutes les rubriques du MyTestCluster Cluster MSK. Si vous devez restreindre le rôle IAM à des sujets spécifiques, vous devez remplacer la ressource de sujet par une stratégie de ressources plus restrictive.

  1. Après avoir créé le rôle IAM dans le compte B, prenez note de l'ARN du rôle IAM (par exemple, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. Dans le compte A, créez un rôle IAM personnalisable Redshift appelé MyRedshiftRole, qu'Amazon Redshift assumera lors de la connexion à Amazon MSK. Le rôle doit avoir une stratégie comme celle-ci, qui permet au rôle IAM Amazon Redshift dans le compte A d'assumer le rôle Amazon MSK dans le compte B :
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. Prenez note de l'ARN du rôle Amazon Redshift IAM (par exemple, arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. Revenez au compte B et ajoutez ce rôle dans la politique de confiance du rôle IAM arn:aws:iam::0123456789:role/MyRedshiftMSKRole pour permettre au compte B d'approuver le rôle IAM du compte A. La stratégie d'approbation doit ressembler au code suivant :
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. Connectez-vous à la console Amazon Redshift en tant que compte A.
  6. Lancez l'éditeur de requêtes v2 ou votre client SQL préféré et exécutez les instructions suivantes pour accéder à la rubrique MSK dans le compte B. Pour mapper au cluster MSK, créez un schéma externe à l'aide de enchaînement de rôles en spécifiant les ARN du rôle IAM, séparés par une virgule sans aucun espace autour. Le rôle attaché au cluster Redshift arrive en premier dans la chaîne.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

Considérations relatives aux performances

Gardez à l’esprit les considérations de performances suivantes :

  • Gardez la vue matérialisée de streaming simple et déplacez les transformations telles que la désimbrication, l'agrégation et les expressions de cas à une étape ultérieure, par exemple en créant une autre vue matérialisée par-dessus la vue matérialisée de streaming.
  • Envisagez de créer une seule vue matérialisée en streaming dans un seul cluster ou groupe de travail Redshift pour un sujet MSK donné. La création de plusieurs vues matérialisées par rubrique MSK peut ralentir les performances d'ingestion, car chaque vue matérialisée devient un consommateur pour cette rubrique et partage la bande passante Amazon MSK pour cette rubrique. Les données de diffusion en direct dans une vue matérialisée en streaming peuvent être partagées entre plusieurs clusters Redshift ou groupes de travail Redshift Serverless à l'aide de partage de données.
  • Lors de la définition de votre vue matérialisée en streaming, évitez d'utiliser Json_Extract_Path_Text pour pré-déchiqueter les données, car Json_extract_path_text fonctionne sur les données ligne par ligne, ce qui a un impact significatif sur le débit d'ingestion. Il est préférable de récupérer les données telles quelles à partir du flux, puis de les détruire ultérieurement.
  • Dans la mesure du possible, envisagez d’ignorer la clé de tri dans la vue matérialisée en streaming pour accélérer la vitesse d’ingestion. Lorsqu'une vue matérialisée en streaming possède une clé de tri, une opération de tri se produit avec chaque lot de données ingérées à partir du flux. Le tri a des performances entendues en fonction du type de données de clé de tri, du nombre de colonnes de clé de tri et de la quantité de données ingérées dans chaque lot. Cette étape de tri peut augmenter la latence avant que les données de streaming ne soient disponibles pour interrogation. Vous devez déterminer ce qui est le plus important : la latence lors de l'ingestion ou la latence lors de l'interrogation des données.
  • Pour optimiser les performances de la vue matérialisée en streaming et réduire l'utilisation du stockage, purgez occasionnellement les données de la vue matérialisée à l'aide de effacer, tronquerou modifier le tableau ajouter.
  • Si vous devez ingérer plusieurs sujets MSK en parallèle dans Amazon Redshift, commencez avec un plus petit nombre de vues matérialisées en streaming et continuez à ajouter davantage de vues matérialisées pour évaluer les performances globales d'ingestion au sein d'un cluster ou d'un groupe de travail.
  • L'augmentation du nombre de nœuds dans un cluster provisionné par Redshift ou du RPU de base d'un groupe de travail Redshift Serverless peut contribuer à améliorer les performances d'ingestion d'une vue matérialisée en streaming. Pour des performances optimales, vous devez viser à avoir autant de tranches dans votre cluster provisionné Redshift qu'il y a de partitions dans votre rubrique MSK, ou 8 RPU pour quatre partitions dans votre rubrique MSK.

Techniques de surveillance

Les enregistrements de la rubrique qui dépassent la taille de la colonne de vue matérialisée cible au moment de l'ingestion seront ignorés. Les enregistrements ignorés par l'actualisation de la vue matérialisée seront consignés dans le SYS_STREAM_SCAN_ERRORS tableau système.

Les erreurs qui se produisent lors du traitement d'un enregistrement en raison d'un calcul, d'une conversion de type de données ou d'une autre logique dans la définition de la vue matérialisée entraîneront un échec d'actualisation de la vue matérialisée jusqu'à ce que l'enregistrement incriminé ait expiré de la rubrique. Pour éviter ce type de problèmes, testez soigneusement la logique de votre définition de vue matérialisée ; sinon, placez les enregistrements dans la colonne VARBYTE par défaut et traitez-les plus tard.

Les vues de surveillance suivantes sont disponibles :

  • SYS_MV_REFRESH_HISTORY – Utilisez cette vue pour collecter des informations sur l'historique d'actualisation de vos vues matérialisées en streaming. Les résultats incluent le type d'actualisation, tel que manuel ou automatique, et l'état de l'actualisation la plus récente. La requête suivante affiche l'historique d'actualisation pour une vue matérialisée en streaming :
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – Utilisez cette vue pour vérifier la raison pour laquelle un enregistrement n'a pas pu être chargé via l'ingestion de streaming à partir d'une rubrique MSK. Au moment de la rédaction de cet article, lors de l'ingestion depuis Amazon MSK, cette vue enregistre uniquement les erreurs lorsque l'enregistrement est plus grand que la taille de la colonne de la vue matérialisée. Cette vue affichera également l'identifiant unique (offset) de l'enregistrement MSK dans la colonne de position. La requête suivante affiche le code d'erreur et la raison de l'erreur lorsqu'un enregistrement dépasse la limite de taille maximale :
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – Utilisez cette vue pour surveiller le nombre d'enregistrements analysés à une heure d'enregistrement donnée. Cette vue suit également le décalage du dernier enregistrement lu dans le lot. La requête suivante affiche les données de rubrique pour une vue matérialisée spécifique :
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – Utilisez cette vue pour vérifier les métriques globales pour une actualisation de vue matérialisée en streaming. Cela enregistrera également les erreurs dans la colonne error_message pour les erreurs qui n'apparaissent pas dans SYS_STREAM_SCAN_ERRORS. La requête suivante affiche l'erreur provoquant l'échec de l'actualisation d'une vue matérialisée en streaming :
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

Considérations supplémentaires pour la mise en œuvre

Vous avez le choix de générer éventuellement une vue matérialisée au-dessus d'une vue matérialisée en continu, ce qui vous permet de désimbriquer et de précalculer les résultats pour les utilisateurs finaux. Cette approche élimine le besoin de stocker les résultats dans une table finale à l'aide d'une procédure stockée.

Dans cet article, vous utilisez le Fonction CAN_JSON_PARSE pour se prémunir contre toute erreur permettant d'ingérer plus efficacement les données : dans ce cas, les enregistrements de streaming qui ne peuvent pas être analysés sont ignorés par Amazon Redshift. Toutefois, si vous souhaitez conserver une trace de vos enregistrements d'erreurs, envisagez de les stocker dans une colonne à l'aide du code SQL suivant lors de la création de la vue matérialisée en continu :

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
JSON_PARSE(kafka_value) as Data 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

Vous pouvez aussi envisager déchargement de données de la vue SYS_STREAM_SCAN_ERRORS dans un Service de stockage simple Amazon (Amazon S3) et recevez des alertes par envoyer un rapport par e-mail en utilisant Service de notification simple d'Amazon (Amazon SNS) chaque fois qu'un nouvel objet S3 est créé.

Enfin, en fonction de vos besoins en matière de fraîcheur des données, vous pouvez utiliser Amazon Event Bridge pour planifier les travaux dans votre entrepôt de données pour appeler ce qui précède SP_Orders_Load procédure stockée de manière régulière. EventBridge le fait à intervalles fixes et vous devrez peut-être disposer d'un mécanisme (par exemple, un Fonctions d'étape AWS machine à états) pour surveiller si l’appel précédent à la procédure est terminé. Pour plus d'informations, reportez-vous à Création d'une règle Amazon EventBridge qui s'exécute selon un calendrier. Vous pouvez également vous référer à Accélérez l'orchestration d'un processus ELT à l'aide d'AWS Step Functions et de l'API Amazon Redshift Data. Une autre option consiste à utiliser Éditeur de requête Amazon Redshift v2 pour planifier l'actualisation. Pour plus de détails, reportez-vous à Planifier une requête avec l'éditeur de requêtes v2.

Conclusion

Dans cet article, nous avons discuté des meilleures pratiques pour mettre en œuvre des analyses en temps quasi réel à l'aide de l'ingestion de streaming Amazon Redshift avec Amazon MSK. Nous vous avons montré un exemple de pipeline pour ingérer des données d'une rubrique MSK dans Amazon Redshift à l'aide de l'ingestion de streaming. Nous avons également montré une stratégie fiable pour effectuer un chargement incrémentiel de données en streaming dans Amazon Redshift à l'aide de Kafka Partition et Kafka Offset. De plus, nous avons démontré les étapes de configuration de l'ingestion de streaming entre comptes d'Amazon MSK vers Amazon Redshift et discuté des considérations de performances pour un taux d'ingestion optimisé. Enfin, nous avons discuté des techniques de surveillance permettant de suivre les échecs d'ingestion de streaming Amazon Redshift.

Si vous avez des questions, laissez-les dans la section commentaires.


À propos des auteurs

Poulomi Dasgupta est architecte senior de solutions analytiques chez AWS. Elle se passionne pour aider les clients à créer des solutions d'analyse basées sur le cloud pour résoudre leurs problèmes commerciaux. En dehors du travail, elle aime voyager et passer du temps avec sa famille.

Adekunle Adedotun est un ingénieur de base de données senior avec le service Amazon Redshift. Il travaille sur les bases de données MPP depuis 6 ans, en se concentrant sur l'optimisation des performances. Il fournit également des conseils à l'équipe de développement pour les fonctionnalités de service nouvelles et existantes.

spot_img

Dernières informations

spot_img