ゼファーネットのロゴ

Amazon MSK での Amazon Redshift ストリーミング取り込みを使用して、ほぼリアルタイムの分析を実装するためのベストプラクティス |アマゾン ウェブ サービス

日付:

Amazonレッドシフト は、フルマネージドでスケーラブルなクラウド データ ウェアハウスであり、大規模な高速かつ簡単かつ安全な分析により、洞察を得るまでの時間を短縮します。数万の顧客が Amazon Redshift を利用してエクサバイト規模のデータを分析し、複雑な分析クエリを実行しており、Amazon Redshift は最も広く使用されているクラウド データ ウェアハウスとなっています。データ ウェアハウス インフラストラクチャを管理することなく、すべてのデータに対して数秒で分析を実行および拡張できます。

あなたが使用することができます AmazonRedshiftストリーミングの取り込み 分析データベースをほぼリアルタイムで更新する機能。 Amazon Redshift ストリーミング インジェストでは、データ ストリーム上にマテリアライズド ビューを直接作成できるため、データ パイプラインが簡素化されます。 Amazon Redshift のこの機能を使用すると、構造化照会言語 (SQL) を使用して、次のようなデータ ストリームに接続し、そこからデータを直接取り込むことができます。 Amazon Kinesisデータストリーム or ApacheKafkaのAmazonマネージドストリーミング (Amazon MSK) データ ストリームを処理し、データを Amazon Redshift に直接プルします。

この投稿では、Amazon MSK での Amazon Redshift ストリーミング取り込みを使用して、ほぼリアルタイムの分析を実装するためのベストプラクティスについて説明します。

ソリューションの概要

Amazon Redshift ストリーミング取り込みを使用して MSK トピックから Amazon Redshift にデータを取り込むパイプラインの例を見ていきます。また、Amazon Redshift でドット表記を使用して JSON データのネストを解除する方法も示します。次の図は、ソリューション アーキテクチャを示しています。

プロセス フローは次のステップで構成されます。

  1. Redshift クラスターにストリーミング マテリアライズド ビューを作成して、MSK トピックからのライブ ストリーミング データを使用します。
  2. ストアド プロシージャを使用して、取り込まれた MSK トピックのレコード レベルで Kafka パーティションと Kafka オフセットの一意の組み合わせを使用して変更データ キャプチャ (CDC) を実装します。
  3. Redshift クラスターにユーザー向けテーブルを作成し、ドット表記を使用して、ストリーミング マテリアライズド ビューからテーブルのデータ列に JSON ドキュメントをネスト解除します。ストアド プロシージャを定期的に呼び出すことで、新しいデータを継続的にロードできます。
  4. 接続を確立する の間 アマゾンクイックサイト ダッシュボードと Amazon Redshift を使用して、視覚化と洞察を提供します。

この投稿の一部として、次のトピックについても説明します。

  • Amazon MSK から Amazon Redshift へのクロスアカウント ストリーミング インジェストを設定する手順
  • ストリーミング・マテリアライズド・ビューから最適化されたパフォーマンスを実現するためのベスト・プラクティス
  • Amazon Redshift ストリーミング取り込みの失敗を追跡するモニタリング手法

前提条件

次のものが必要です。

  • AWSアカウント.
  • ユースケースに応じて、次のリソースのいずれか:
  • MSK クラスター。手順については、を参照してください。 Amazon MSK クラスターを作成する.
  • トピック データ プロデューサーがデータを公開できる MSK クラスター内。
  • MSK クラスター内のトピックにデータを書き込むデータ プロデューサー。

MSK トピックを設定する際の考慮事項

MSK トピックを構成するときは、次の考慮事項に留意してください。

  • MSK トピックの名前が 128 文字以下であることを確認してください。
  • この記事の執筆時点では、圧縮データを含む MSK レコードを Amazon Redshift で直接クエリすることはできません。 Amazon Redshift は、MSK トピック内のクライアント側の圧縮データに対するネイティブの解凍方法をサポートしていません。
  • 続きます ベストプラクティス MSK クラスターのセットアップ中。
  • ストリーミング取り込みを確認する 制限 その他の考慮事項については。

ストリーミング取り込みを設定する

ストリーミング インジェストを設定するには、次の手順を実行します。

  1. セットアップ AWS IDおよびアクセス管理 ストリーミング取り込みに必要な (IAM) ロールと信頼ポリシー。手順については、を参照してください。 IAM のセットアップと Kafka からのストリーミング インジェストの実行.
  2. 次を使用して、データが MSK トピックに流れ込んでいることを確認してください。 アマゾンクラウドウォッチ メトリクス (例えば、 1秒あたりの出力バイト数).
  3. 次のステップのために、Amazon Redshift コンソールからクエリエディター v2 を起動するか、好みの SQL クライアントを使用して Redshift クラスターに接続します。次の手順はクエリ エディター v2 で実行されました。
  4. MSK クラスターにマップする外部スキーマを作成します。次のステートメントの IAM ロール ARN と MSK クラスター ARN を置き換えます。
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. トピック名で大文字と小文字が区別される場合は、必要に応じて、 enable_case_sensitive_identifier Amazon Redshift でアクセスできるようになります。大文字と小文字を区別する識別子を使用するには、次のように設定します。 enable_case_sensitive_identifier セッション、ユーザー、またはクラスター レベルのいずれかで true に設定します。
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. MSK トピックからのストリーム データを消費するマテリアライズド ビューを作成します。
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

メタデータ列 kafka_value Amazon MSK から到着したものは、 ヴァーバイト Amazon Redshift の形式。この投稿では、 JSON_PARSE 変換する関数 kafka_valueSUPER データ型。また、 CAN_JSON_PARSE 無効な JSON レコードをスキップし、JSON 解析の失敗によるエラーを防ぐフィルター条件内の関数。今後のデバッグのために無効なデータを保存する方法については、この投稿の後半で説明します。

  1. ストリーミング マテリアライズド ビューを更新すると、Amazon Redshift が MSK トピックから読み取り、データをマテリアライズド ビューにロードします。
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

自動更新機能を使用するようにストリーミング マテリアライズド ビューを設定することもできます。 これにより、データがストリームに到着すると、マテリアライズド ビューが自動的に更新されます。 見る マテリアライズドビューを作成する 自動更新を使用してマテリアライズド ビューを作成する手順については、「

JSONドキュメントのネストを解除する

以下は、MSK トピックからストリーミング マテリアライズド ビューの SUPER タイプのデータ列に取り込まれた JSON ドキュメントのサンプルです。 Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

次のコードに示すようにドット表記を使用して、JSON ペイロードのネストを解除します。

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

次のスクリーンショットは、ネストを解除した後の結果がどのようになるかを示しています。

JSON ドキュメントに配列がある場合は、次を使用してデータのネストを解除することを検討してください。 パーティQL Amazon Redshift のステートメント。詳細については、「 JSONドキュメントのネストを解除する ポストで Amazon Kinesis Data Streams および Amazon DynamoDB による Amazon Redshift ストリーミング取り込みを使用した準リアルタイム分析.

増分データロード戦略

増分データ ロードを実装するには、次の手順を実行します。

  1. Amazon Redshift で Orders というテーブルを作成します。エンドユーザーはこのテーブルを視覚化とビジネス分析に使用します。
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

次に、というストアド プロシージャを作成します。 SP_Orders_Load ストリーミング マテリアライズド ビューから CDC を実装し、最終的なビューにロードする Orders テーブル。の組み合わせを使用します Kafka_Partition & Kafka_Offset CDC を実装するためのシステム列としてストリーミング マテリアライズド ビューで使用できます。これら 2 つの列の組み合わせは MSK トピック内で常に一意となるため、プロセス中にレコードが失われることはありません。ストアド プロシージャには次のコンポーネントが含まれています。

  • 大文字と小文字を区別する識別子を使用するには、次のように設定します。 enable_case_sensitive_identifier セッション、ユーザー、またはクラスター レベルのいずれかで true に設定します。
  • 自動更新が有効になっていない場合は、ストリーミング マテリアライズド ビューを手動で更新します。
  • という監査テーブルを作成します。 Orders_Streaming_Audit ストアド プロシージャの最後の実行中に Orders テーブルに読み込まれたパーティションの最後のオフセットを追跡するためのフィールドが存在しない場合。
  • ネストを解除し、新しいデータまたは変更されたデータのみをステージング テーブルに挿入します。 Orders_Staging_Table、ストリーミング マテリアライズド ビューから読み取る Orders_Stream_MVここで、 Kafka_Offset 最後に処理されたものよりも大きいです Kafka_Offset 監査テーブルに記録される Orders_Streaming_Audit Kafka_Partition 処理中です。
  • このストアド プロシージャを使用して初めてロードするとき、ファイルにはデータがありません。 Orders_Streaming_Audit テーブルとすべてのデータ Orders_Stream_MV Orders テーブルにロードされます。
  • ビジネス関連の列のみをユーザー向けの列に挿入します。 Orders テーブル、ステージング テーブルから選択 Orders_Staging_Table.
  • 最大値を挿入します Kafka_Offset ロードされたすべての Kafka_Partition 監査テーブルに Orders_Streaming_Audit

中間ステージングテーブルを追加しました Orders_Staging_Table このソリューションでは、予期しない障害が発生した場合のデバッグと追跡に役立ちます。ステージング ステップをスキップし、最終テーブルに直接ロードします。 Orders_Stream_MV ユースケースに応じて、待ち時間を短縮できます。

  1. 次のコードを使用してストアド プロシージャを作成します。
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. ストアド プロシージャを実行してデータを Orders テーブル:
    call SP_Orders_Load();

  3. Orders テーブルのデータを検証します。

クロスアカウントのストリーミング取り込みを確立する

MSK クラスターが別のアカウントに属している場合は、次の手順を実行して IAM ロールを作成し、クロスアカウント ストリーミング インジェストを設定します。次の図に示すように、Redshift クラスターがアカウント A にあり、MSK クラスターがアカウント B にあると仮定します。

次の手順を完了します。

  1. アカウント B で、という名前の IAM ロールを作成します。 MyRedshiftMSKRole これにより、Amazon Redshift (アカウント A) が、という名前の MSK クラスター (アカウント B) と通信できるようになります。 MyTestCluster。 MSK クラスターが接続に IAM 認証を使用するか非認証アクセスを使用するかに応じて、次のポリシーのいずれかを使用して IAM ロールを作成する必要があります。
    • IAM policAmazonAmazon 非認証アクセスを使用する MSK:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • IAM 認証を使用する場合の Amazon MSK の IAM ポリシー:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

前の例のリソース セクションでは、 MyTestCluster MSKクラスター。 IAM ロールを特定のトピックに制限する必要がある場合は、トピック リソースをより制限的なリソース ポリシーに置き換える必要があります。

  1. アカウント B に IAM ロールを作成したら、IAM ロール ARN をメモします (例: arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. アカウント A で、Redshift のカスタマイズ可能な IAM ロールを作成します。 MyRedshiftRole、Amazon Redshift は Amazon MSK に接続するときにそれを想定します。ロールには次のようなポリシーが必要です。これにより、アカウント A の Amazon Redshift IAM ロールがアカウント B の Amazon MSK ロールを引き受けることができます。
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. Amazon Redshift IAM ロールのロール ARN をメモします (例: arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. アカウント B に戻り、このロールを IAM ロールの信頼ポリシーに追加します。 arn:aws:iam::0123456789:role/MyRedshiftMSKRole アカウント B がアカウント A の IAM ロールを信頼できるようにします。信頼ポリシーは次のコードのようになります。
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. アカウント A として Amazon Redshift コンソールにサインインします。
  6. クエリ エディター v2 または任意の SQL クライアントを起動し、次のステートメントを実行してアカウント B の MSK トピックにアクセスします。MSK クラスターにマップするには、次を使用して外部スキーマを作成します。 役割連鎖 IAM ロール ARN を、前後にスペースを入れずにカンマで区切って指定します。 Redshift クラスターに関連付けられたロールは、チェーンの最初に配置されます。
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

パフォーマンスに関する考慮事項

次のパフォーマンスに関する考慮事項に留意してください。

  • ストリーミング マテリアライズド ビューを単純にし、ネスト解除、集計、ケース式などの変換を後のステップに移動します。たとえば、ストリーミング マテリアライズド ビューの上に別のマテリアライズド ビューを作成します。
  • 特定の MSK トピックに対して、単一の Redshift クラスターまたはワークグループにストリーミング マテリアライズド ビューを 1 つだけ作成することを検討してください。 MSK トピックごとに複数のマテリアライズド ビューを作成すると、各マテリアライズド ビューがそのトピックのコンシューマーとなり、そのトピックの Amazon MSK 帯域幅を共有するため、取り込みパフォーマンスが低下する可能性があります。ストリーミング マテリアライズド ビュー内のライブ ストリーミング データは、次を使用して複数の Redshift クラスターまたは Redshift サーバーレス ワークグループ間で共有できます。 データ共有.
  • ストリーミング マテリアライズド ビューを定義するときは、次の使用を避けてください。 Json_Extract_Path_Text データを事前にシュレッドするため、 Json_extract_path_text データを行ごとに処理するため、取り込みスループットに大きな影響を与えます。データをストリームからそのまま取得し、後で細断することをお勧めします。
  • 可能な場合は、取り込み速度を向上させるために、ストリーミング マテリアライズド ビューでソート キーをスキップすることを検討してください。ストリーミング マテリアライズド ビューにソート キーがある場合、ストリームから取り込まれたデータのバッチごとにソート操作が発生します。並べ替えでは、並べ替えキーのデータ型、並べ替えキー列の数、各バッチで取り込まれたデータの量に応じてパフォーマンスがオーバーヘッドされます。この並べ替え手順により、ストリーミング データをクエリできるようになるまでの待ち時間が長くなる可能性があります。取り込み時のレイテンシとデータのクエリ時のレイテンシのどちらがより重要かを比較検討する必要があります。
  • ストリーミング マテリアライズド ビューのパフォーマンスを最適化し、ストレージ使用量を削減するには、次を使用してマテリアライズド ビューからデータを時々パージしてください。 削除, 切り詰めるまたは テーブルの追加を変更する.
  • 複数の MSK トピックを Amazon Redshift に並行して取り込む必要がある場合は、少数のストリーミング マテリアライズド ビューから始めて、さらにマテリアライズド ビューを追加し続けて、クラスターまたはワークグループ内の全体的な取り込みパフォーマンスを評価します。
  • Redshift でプロビジョニングされたクラスター内のノードの数、または Redshift サーバーレス ワークグループのベース RPU を増やすと、ストリーミング マテリアライズド ビューの取り込みパフォーマンスを向上させることができます。最適なパフォーマンスを得るには、Redshift でプロビジョニングされたクラスター内に MSK トピック内のパーティションと同じ数のスライス、または MSK トピック内の 8 つのパーティションごとに XNUMX RPU を配置することを目指す必要があります。

モニタリング手法

取り込み時にターゲットのマテリアライズド・ビュー列のサイズを超えるトピック内のレコードはスキップされます。マテリアライズド ビューの更新によってスキップされたレコードは、 SYS_STREAM_SCAN_ERRORS システムテーブル。

マテリアライズド・ビュー定義内の計算、データ型変換、またはその他のロジックが原因でレコードを処理するときにエラーが発生すると、問題のレコードがトピックから期限切れになるまでマテリアライズド・ビューのリフレッシュが失敗します。このような問題を回避するには、マテリアライズド ビュー定義のロジックを慎重にテストしてください。それ以外の場合は、レコードをデフォルトの VARBYTE 列に配置し、後で処理します。

利用可能な監視ビューは次のとおりです。

  • SYS_MV_REFRESH_HISTORY – このビューを使用して、ストリーミング マテリアライズド ビューの更新履歴に関する情報を収集します。結果には、手動または自動などの更新タイプと、最新の更新のステータスが含まれます。次のクエリは、ストリーミング マテリアライズド ビューの更新履歴を示します。
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS – このビューを使用して、MSK トピックからのストリーミング取り込みを介したレコードのロードに失敗した理由を確認します。この投稿を書いている時点では、Amazon MSK から取り込む場合、このビューはレコードがマテリアライズド ビューの列サイズより大きい場合にのみエラーをログに記録します。このビューでは、位置列に MSK レコードの一意の識別子 (オフセット) も表示されます。次のクエリは、レコードが最大サイズ制限を超えたときのエラー コードとエラーの理由を示します。
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – このビューを使用して、特定の Record_time でスキャンされたレコードの数を監視します。このビューは、バッチで読み取られた最後のレコードのオフセットも追跡します。次のクエリは、特定のマテリアライズド ビューのトピック データを示します。
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY – このビューを使用して、ストリーミング マテリアライズド ビュー更新の全体的なメトリックを確認します。これにより、表示されないエラーについては、error_message 列にもエラーが記録されます。 SYS_STREAM_SCAN_ERRORS。次のクエリは、ストリーミング マテリアライズド ビューの更新失敗を引き起こすエラーを示しています。
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

実装に関する追加の考慮事項

必要に応じて、ストリーミング マテリアライズド ビューの上にマテリアライズド ビューを生成することもできます。これにより、ネストを解除して、エンド ユーザー向けに結果を事前計算できるようになります。このアプローチにより、ストアド プロシージャを使用して結果を最終テーブルに保存する必要がなくなります。

この投稿では、 CAN_JSON_PARSE 関数 エラーを防ぎ、より正常にデータを取り込むためです。この場合、解析できないストリーミング レコードは Amazon Redshift によってスキップされます。ただし、エラー レコードを追跡したい場合は、ストリーミング マテリアライズド ビューの作成時に次の SQL を使用してエラー レコードを列に保存することを検討してください。

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
JSON_PARSE(kafka_value) as Data 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

あなたも考慮することができます データのアンロード 眺めから SYS_STREAM_SCAN_ERRORSAmazon シンプル ストレージ サービス (Amazon S3) バケットを作成し、アラートを取得します 電子メールでレポートを送信する Amazon シンプル通知サービス 新しい S3 オブジェクトが作成されるたびに (Amazon SNS) 通知が送信されます。

最後に、データの鮮度要件に基づいて、次を使用できます。 アマゾンイベントブリッジ データ ウェアハウス内のジョブをスケジュールして、前述のメソッドを呼び出す SP_Orders_Load ストアド プロシージャを定期的に実行します。 EventBridge はこれを一定の間隔で実行するため、メカニズム (たとえば、 AWSステップ関数 ステート マシン) を使用して、プロシージャへの以前の呼び出しが完了したかどうかを監視します。詳細については、以下を参照してください。 スケジュールに従って実行する Amazon EventBridge ルールの作成。 も参照できます AWS Step Functions と Amazon Redshift Data API を使用して ELT プロセスのオーケストレーションを加速する。 別のオプションは使用することです AmazonRedshiftクエリエディターv2 をクリックして更新をスケジュールします。 詳細については、を参照してください。 クエリ エディター v2 を使用したクエリのスケジュール設定.

まとめ

この投稿では、Amazon MSK での Amazon Redshift ストリーミング取り込みを使用して、ほぼリアルタイムの分析を実装するためのベストプラクティスについて説明しました。ストリーミング取り込みを使用して MSK トピックから Amazon Redshift にデータを取り込むパイプラインの例を示しました。また、Kafka パーティションと Kafka オフセットを使用して、Amazon Redshift への増分ストリーミング データのロードを実行する信頼性の高い戦略も示しました。さらに、Amazon MSK から Amazon Redshift へのクロスアカウント ストリーミング取り込みを設定する手順を示し、取り込み速度を最適化するためのパフォーマンスの考慮事項について説明しました。最後に、Amazon Redshift ストリーミング取り込みの失敗を追跡するモニタリング手法について説明しました。

ご質問がある場合は、コメント欄に残してください。


著者について

プーロミ・ダスグプタ AWS のシニア分析ソリューション アーキテクトです。 彼女は、顧客がクラウドベースの分析ソリューションを構築してビジネス上の問題を解決できるよう支援することに情熱を注いでいます。 仕事以外では、旅行や家族と過ごすことが好きです。

アデクンル・アデドトゥン Amazon Redshift サービスのシニア データベース エンジニアです。彼は、パフォーマンス チューニングに重点を置いて MPP データベースに 6 年間取り組んできました。また、新規および既存のサービス機能について開発チームにガイダンスを提供します。

スポット画像

最新のインテリジェンス

スポット画像