Amazonレッドシフト は、フルマネージドでスケーラブルなクラウド データ ウェアハウスであり、大規模な高速かつ簡単かつ安全な分析により、洞察を得るまでの時間を短縮します。数万の顧客が Amazon Redshift を利用してエクサバイト規模のデータを分析し、複雑な分析クエリを実行しており、Amazon Redshift は最も広く使用されているクラウド データ ウェアハウスとなっています。データ ウェアハウス インフラストラクチャを管理することなく、すべてのデータに対して数秒で分析を実行および拡張できます。
あなたが使用することができます AmazonRedshiftストリーミングの取り込み 分析データベースをほぼリアルタイムで更新する機能。 Amazon Redshift ストリーミング インジェストでは、データ ストリーム上にマテリアライズド ビューを直接作成できるため、データ パイプラインが簡素化されます。 Amazon Redshift のこの機能を使用すると、構造化照会言語 (SQL) を使用して、次のようなデータ ストリームに接続し、そこからデータを直接取り込むことができます。 Amazon Kinesisデータストリーム or ApacheKafkaのAmazonマネージドストリーミング (Amazon MSK) データ ストリームを処理し、データを Amazon Redshift に直接プルします。
この投稿では、Amazon MSK での Amazon Redshift ストリーミング取り込みを使用して、ほぼリアルタイムの分析を実装するためのベストプラクティスについて説明します。
ソリューションの概要
Amazon Redshift ストリーミング取り込みを使用して MSK トピックから Amazon Redshift にデータを取り込むパイプラインの例を見ていきます。また、Amazon Redshift でドット表記を使用して JSON データのネストを解除する方法も示します。次の図は、ソリューション アーキテクチャを示しています。
プロセス フローは次のステップで構成されます。
- Redshift クラスターにストリーミング マテリアライズド ビューを作成して、MSK トピックからのライブ ストリーミング データを使用します。
- ストアド プロシージャを使用して、取り込まれた MSK トピックのレコード レベルで Kafka パーティションと Kafka オフセットの一意の組み合わせを使用して変更データ キャプチャ (CDC) を実装します。
- Redshift クラスターにユーザー向けテーブルを作成し、ドット表記を使用して、ストリーミング マテリアライズド ビューからテーブルのデータ列に JSON ドキュメントをネスト解除します。ストアド プロシージャを定期的に呼び出すことで、新しいデータを継続的にロードできます。
- 接続を確立する の間 アマゾンクイックサイト ダッシュボードと Amazon Redshift を使用して、視覚化と洞察を提供します。
この投稿の一部として、次のトピックについても説明します。
- Amazon MSK から Amazon Redshift へのクロスアカウント ストリーミング インジェストを設定する手順
- ストリーミング・マテリアライズド・ビューから最適化されたパフォーマンスを実現するためのベスト・プラクティス
- Amazon Redshift ストリーミング取り込みの失敗を追跡するモニタリング手法
前提条件
次のものが必要です。
- AWSアカウント.
- ユースケースに応じて、次のリソースのいずれか:
- MSK クラスター。手順については、を参照してください。 Amazon MSK クラスターを作成する.
- トピック データ プロデューサーがデータを公開できる MSK クラスター内。
- MSK クラスター内のトピックにデータを書き込むデータ プロデューサー。
MSK トピックを設定する際の考慮事項
MSK トピックを構成するときは、次の考慮事項に留意してください。
- MSK トピックの名前が 128 文字以下であることを確認してください。
- この記事の執筆時点では、圧縮データを含む MSK レコードを Amazon Redshift で直接クエリすることはできません。 Amazon Redshift は、MSK トピック内のクライアント側の圧縮データに対するネイティブの解凍方法をサポートしていません。
- 続きます ベストプラクティス MSK クラスターのセットアップ中。
- ストリーミング取り込みを確認する 制限 その他の考慮事項については。
ストリーミング取り込みを設定する
ストリーミング インジェストを設定するには、次の手順を実行します。
- セットアップ AWS IDおよびアクセス管理 ストリーミング取り込みに必要な (IAM) ロールと信頼ポリシー。手順については、を参照してください。 IAM のセットアップと Kafka からのストリーミング インジェストの実行.
- 次を使用して、データが MSK トピックに流れ込んでいることを確認してください。 アマゾンクラウドウォッチ メトリクス (例えば、 1秒あたりの出力バイト数).
- 次のステップのために、Amazon Redshift コンソールからクエリエディター v2 を起動するか、好みの SQL クライアントを使用して Redshift クラスターに接続します。次の手順はクエリ エディター v2 で実行されました。
- MSK クラスターにマップする外部スキーマを作成します。次のステートメントの IAM ロール ARN と MSK クラスター ARN を置き換えます。
- トピック名で大文字と小文字が区別される場合は、必要に応じて、
enable_case_sensitive_identifier
Amazon Redshift でアクセスできるようになります。大文字と小文字を区別する識別子を使用するには、次のように設定します。enable_case_sensitive_identifier
セッション、ユーザー、またはクラスター レベルのいずれかで true に設定します。 - MSK トピックからのストリーム データを消費するマテリアライズド ビューを作成します。
メタデータ列 kafka_value
Amazon MSK から到着したものは、 ヴァーバイト Amazon Redshift の形式。この投稿では、 JSON_PARSE 変換する関数 kafka_value
へ SUPER データ型。また、 CAN_JSON_PARSE 無効な JSON レコードをスキップし、JSON 解析の失敗によるエラーを防ぐフィルター条件内の関数。今後のデバッグのために無効なデータを保存する方法については、この投稿の後半で説明します。
- ストリーミング マテリアライズド ビューを更新すると、Amazon Redshift が MSK トピックから読み取り、データをマテリアライズド ビューにロードします。
自動更新機能を使用するようにストリーミング マテリアライズド ビューを設定することもできます。 これにより、データがストリームに到着すると、マテリアライズド ビューが自動的に更新されます。 見る マテリアライズドビューを作成する 自動更新を使用してマテリアライズド ビューを作成する手順については、「
JSONドキュメントのネストを解除する
以下は、MSK トピックからストリーミング マテリアライズド ビューの SUPER タイプのデータ列に取り込まれた JSON ドキュメントのサンプルです。 Orders_Stream_MV
:
次のコードに示すようにドット表記を使用して、JSON ペイロードのネストを解除します。
次のスクリーンショットは、ネストを解除した後の結果がどのようになるかを示しています。
JSON ドキュメントに配列がある場合は、次を使用してデータのネストを解除することを検討してください。 パーティQL Amazon Redshift のステートメント。詳細については、「 JSONドキュメントのネストを解除する ポストで Amazon Kinesis Data Streams および Amazon DynamoDB による Amazon Redshift ストリーミング取り込みを使用した準リアルタイム分析.
増分データロード戦略
増分データ ロードを実装するには、次の手順を実行します。
- Amazon Redshift で Orders というテーブルを作成します。エンドユーザーはこのテーブルを視覚化とビジネス分析に使用します。
次に、というストアド プロシージャを作成します。 SP_Orders_Load
ストリーミング マテリアライズド ビューから CDC を実装し、最終的なビューにロードする Orders
テーブル。の組み合わせを使用します Kafka_Partition
& Kafka_Offset
CDC を実装するためのシステム列としてストリーミング マテリアライズド ビューで使用できます。これら 2 つの列の組み合わせは MSK トピック内で常に一意となるため、プロセス中にレコードが失われることはありません。ストアド プロシージャには次のコンポーネントが含まれています。
- 大文字と小文字を区別する識別子を使用するには、次のように設定します。
enable_case_sensitive_identifier
セッション、ユーザー、またはクラスター レベルのいずれかで true に設定します。 - 自動更新が有効になっていない場合は、ストリーミング マテリアライズド ビューを手動で更新します。
- という監査テーブルを作成します。
Orders_Streaming_Audit
ストアド プロシージャの最後の実行中に Orders テーブルに読み込まれたパーティションの最後のオフセットを追跡するためのフィールドが存在しない場合。 - ネストを解除し、新しいデータまたは変更されたデータのみをステージング テーブルに挿入します。
Orders_Staging_Table
、ストリーミング マテリアライズド ビューから読み取るOrders_Stream_MV
ここで、Kafka_Offset
最後に処理されたものよりも大きいですKafka_Offset
監査テーブルに記録されるOrders_Streaming_Audit
Kafka_Partition
処理中です。 - このストアド プロシージャを使用して初めてロードするとき、ファイルにはデータがありません。
Orders_Streaming_Audit
テーブルとすべてのデータOrders_Stream_MV
Orders テーブルにロードされます。 - ビジネス関連の列のみをユーザー向けの列に挿入します。
Orders
テーブル、ステージング テーブルから選択Orders_Staging_Table
. - 最大値を挿入します
Kafka_Offset
ロードされたすべてのKafka_Partition
監査テーブルにOrders_Streaming_Audit
中間ステージングテーブルを追加しました Orders_Staging_Table
このソリューションでは、予期しない障害が発生した場合のデバッグと追跡に役立ちます。ステージング ステップをスキップし、最終テーブルに直接ロードします。 Orders_Stream_MV
ユースケースに応じて、待ち時間を短縮できます。
- 次のコードを使用してストアド プロシージャを作成します。
- ストアド プロシージャを実行してデータを
Orders
テーブル: - Orders テーブルのデータを検証します。
クロスアカウントのストリーミング取り込みを確立する
MSK クラスターが別のアカウントに属している場合は、次の手順を実行して IAM ロールを作成し、クロスアカウント ストリーミング インジェストを設定します。次の図に示すように、Redshift クラスターがアカウント A にあり、MSK クラスターがアカウント B にあると仮定します。
次の手順を完了します。
- アカウント B で、という名前の IAM ロールを作成します。
MyRedshiftMSKRole
これにより、Amazon Redshift (アカウント A) が、という名前の MSK クラスター (アカウント B) と通信できるようになります。MyTestCluster
。 MSK クラスターが接続に IAM 認証を使用するか非認証アクセスを使用するかに応じて、次のポリシーのいずれかを使用して IAM ロールを作成する必要があります。- IAM
policAmazonAmazon
非認証アクセスを使用する MSK: - IAM 認証を使用する場合の Amazon MSK の IAM ポリシー:
- IAM
前の例のリソース セクションでは、 MyTestCluster
MSKクラスター。 IAM ロールを特定のトピックに制限する必要がある場合は、トピック リソースをより制限的なリソース ポリシーに置き換える必要があります。
- アカウント B に IAM ロールを作成したら、IAM ロール ARN をメモします (例:
arn:aws:iam::0123456789:role/MyRedshiftMSKRole
). - アカウント A で、Redshift のカスタマイズ可能な IAM ロールを作成します。
MyRedshiftRole
、Amazon Redshift は Amazon MSK に接続するときにそれを想定します。ロールには次のようなポリシーが必要です。これにより、アカウント A の Amazon Redshift IAM ロールがアカウント B の Amazon MSK ロールを引き受けることができます。 - Amazon Redshift IAM ロールのロール ARN をメモします (例:
arn:aws:iam::9876543210:role/MyRedshiftRole
). - アカウント B に戻り、このロールを IAM ロールの信頼ポリシーに追加します。
arn:aws:iam::0123456789:role/MyRedshiftMSKRole
アカウント B がアカウント A の IAM ロールを信頼できるようにします。信頼ポリシーは次のコードのようになります。 - アカウント A として Amazon Redshift コンソールにサインインします。
- クエリ エディター v2 または任意の SQL クライアントを起動し、次のステートメントを実行してアカウント B の MSK トピックにアクセスします。MSK クラスターにマップするには、次を使用して外部スキーマを作成します。 役割連鎖 IAM ロール ARN を、前後にスペースを入れずにカンマで区切って指定します。 Redshift クラスターに関連付けられたロールは、チェーンの最初に配置されます。
パフォーマンスに関する考慮事項
次のパフォーマンスに関する考慮事項に留意してください。
- ストリーミング マテリアライズド ビューを単純にし、ネスト解除、集計、ケース式などの変換を後のステップに移動します。たとえば、ストリーミング マテリアライズド ビューの上に別のマテリアライズド ビューを作成します。
- 特定の MSK トピックに対して、単一の Redshift クラスターまたはワークグループにストリーミング マテリアライズド ビューを 1 つだけ作成することを検討してください。 MSK トピックごとに複数のマテリアライズド ビューを作成すると、各マテリアライズド ビューがそのトピックのコンシューマーとなり、そのトピックの Amazon MSK 帯域幅を共有するため、取り込みパフォーマンスが低下する可能性があります。ストリーミング マテリアライズド ビュー内のライブ ストリーミング データは、次を使用して複数の Redshift クラスターまたは Redshift サーバーレス ワークグループ間で共有できます。 データ共有.
- ストリーミング マテリアライズド ビューを定義するときは、次の使用を避けてください。 Json_Extract_Path_Text データを事前にシュレッドするため、
Json_extract_path_text
データを行ごとに処理するため、取り込みスループットに大きな影響を与えます。データをストリームからそのまま取得し、後で細断することをお勧めします。 - 可能な場合は、取り込み速度を向上させるために、ストリーミング マテリアライズド ビューでソート キーをスキップすることを検討してください。ストリーミング マテリアライズド ビューにソート キーがある場合、ストリームから取り込まれたデータのバッチごとにソート操作が発生します。並べ替えでは、並べ替えキーのデータ型、並べ替えキー列の数、各バッチで取り込まれたデータの量に応じてパフォーマンスがオーバーヘッドされます。この並べ替え手順により、ストリーミング データをクエリできるようになるまでの待ち時間が長くなる可能性があります。取り込み時のレイテンシとデータのクエリ時のレイテンシのどちらがより重要かを比較検討する必要があります。
- ストリーミング マテリアライズド ビューのパフォーマンスを最適化し、ストレージ使用量を削減するには、次を使用してマテリアライズド ビューからデータを時々パージしてください。 削除, 切り詰めるまたは テーブルの追加を変更する.
- 複数の MSK トピックを Amazon Redshift に並行して取り込む必要がある場合は、少数のストリーミング マテリアライズド ビューから始めて、さらにマテリアライズド ビューを追加し続けて、クラスターまたはワークグループ内の全体的な取り込みパフォーマンスを評価します。
- Redshift でプロビジョニングされたクラスター内のノードの数、または Redshift サーバーレス ワークグループのベース RPU を増やすと、ストリーミング マテリアライズド ビューの取り込みパフォーマンスを向上させることができます。最適なパフォーマンスを得るには、Redshift でプロビジョニングされたクラスター内に MSK トピック内のパーティションと同じ数のスライス、または MSK トピック内の 8 つのパーティションごとに XNUMX RPU を配置することを目指す必要があります。
モニタリング手法
取り込み時にターゲットのマテリアライズド・ビュー列のサイズを超えるトピック内のレコードはスキップされます。マテリアライズド ビューの更新によってスキップされたレコードは、 SYS_STREAM_SCAN_ERRORS システムテーブル。
マテリアライズド・ビュー定義内の計算、データ型変換、またはその他のロジックが原因でレコードを処理するときにエラーが発生すると、問題のレコードがトピックから期限切れになるまでマテリアライズド・ビューのリフレッシュが失敗します。このような問題を回避するには、マテリアライズド ビュー定義のロジックを慎重にテストしてください。それ以外の場合は、レコードをデフォルトの VARBYTE 列に配置し、後で処理します。
利用可能な監視ビューは次のとおりです。
- SYS_MV_REFRESH_HISTORY – このビューを使用して、ストリーミング マテリアライズド ビューの更新履歴に関する情報を収集します。結果には、手動または自動などの更新タイプと、最新の更新のステータスが含まれます。次のクエリは、ストリーミング マテリアライズド ビューの更新履歴を示します。
- SYS_STREAM_SCAN_ERRORS – このビューを使用して、MSK トピックからのストリーミング取り込みを介したレコードのロードに失敗した理由を確認します。この投稿を書いている時点では、Amazon MSK から取り込む場合、このビューはレコードがマテリアライズド ビューの列サイズより大きい場合にのみエラーをログに記録します。このビューでは、位置列に MSK レコードの一意の識別子 (オフセット) も表示されます。次のクエリは、レコードが最大サイズ制限を超えたときのエラー コードとエラーの理由を示します。
- SYS_STREAM_SCAN_STATES – このビューを使用して、特定の Record_time でスキャンされたレコードの数を監視します。このビューは、バッチで読み取られた最後のレコードのオフセットも追跡します。次のクエリは、特定のマテリアライズド ビューのトピック データを示します。
- SYS_QUERY_HISTORY – このビューを使用して、ストリーミング マテリアライズド ビュー更新の全体的なメトリックを確認します。これにより、表示されないエラーについては、error_message 列にもエラーが記録されます。 SYS_STREAM_SCAN_ERRORS。次のクエリは、ストリーミング マテリアライズド ビューの更新失敗を引き起こすエラーを示しています。
実装に関する追加の考慮事項
必要に応じて、ストリーミング マテリアライズド ビューの上にマテリアライズド ビューを生成することもできます。これにより、ネストを解除して、エンド ユーザー向けに結果を事前計算できるようになります。このアプローチにより、ストアド プロシージャを使用して結果を最終テーブルに保存する必要がなくなります。
この投稿では、 CAN_JSON_PARSE 関数 エラーを防ぎ、より正常にデータを取り込むためです。この場合、解析できないストリーミング レコードは Amazon Redshift によってスキップされます。ただし、エラー レコードを追跡したい場合は、ストリーミング マテリアライズド ビューの作成時に次の SQL を使用してエラー レコードを列に保存することを検討してください。
あなたも考慮することができます データのアンロード 眺めから SYS_STREAM_SCAN_ERRORS に Amazon シンプル ストレージ サービス (Amazon S3) バケットを作成し、アラートを取得します 電子メールでレポートを送信する Amazon シンプル通知サービス 新しい S3 オブジェクトが作成されるたびに (Amazon SNS) 通知が送信されます。
最後に、データの鮮度要件に基づいて、次を使用できます。 アマゾンイベントブリッジ データ ウェアハウス内のジョブをスケジュールして、前述のメソッドを呼び出す SP_Orders_Load
ストアド プロシージャを定期的に実行します。 EventBridge はこれを一定の間隔で実行するため、メカニズム (たとえば、 AWSステップ関数 ステート マシン) を使用して、プロシージャへの以前の呼び出しが完了したかどうかを監視します。詳細については、以下を参照してください。 スケジュールに従って実行する Amazon EventBridge ルールの作成。 も参照できます AWS Step Functions と Amazon Redshift Data API を使用して ELT プロセスのオーケストレーションを加速する。 別のオプションは使用することです AmazonRedshiftクエリエディターv2 をクリックして更新をスケジュールします。 詳細については、を参照してください。 クエリ エディター v2 を使用したクエリのスケジュール設定.
まとめ
この投稿では、Amazon MSK での Amazon Redshift ストリーミング取り込みを使用して、ほぼリアルタイムの分析を実装するためのベストプラクティスについて説明しました。ストリーミング取り込みを使用して MSK トピックから Amazon Redshift にデータを取り込むパイプラインの例を示しました。また、Kafka パーティションと Kafka オフセットを使用して、Amazon Redshift への増分ストリーミング データのロードを実行する信頼性の高い戦略も示しました。さらに、Amazon MSK から Amazon Redshift へのクロスアカウント ストリーミング取り込みを設定する手順を示し、取り込み速度を最適化するためのパフォーマンスの考慮事項について説明しました。最後に、Amazon Redshift ストリーミング取り込みの失敗を追跡するモニタリング手法について説明しました。
ご質問がある場合は、コメント欄に残してください。
著者について
プーロミ・ダスグプタ AWS のシニア分析ソリューション アーキテクトです。 彼女は、顧客がクラウドベースの分析ソリューションを構築してビジネス上の問題を解決できるよう支援することに情熱を注いでいます。 仕事以外では、旅行や家族と過ごすことが好きです。
アデクンル・アデドトゥン Amazon Redshift サービスのシニア データベース エンジニアです。彼は、パフォーマンス チューニングに重点を置いて MPP データベースに 6 年間取り組んできました。また、新規および既存のサービス機能について開発チームにガイダンスを提供します。
- SEO を活用したコンテンツと PR 配信。 今日増幅されます。
- PlatoData.Network 垂直生成 Ai。 自分自身に力を与えましょう。 こちらからアクセスしてください。
- プラトアイストリーム。 Web3 インテリジェンス。 知識増幅。 こちらからアクセスしてください。
- プラトンESG。 カーボン、 クリーンテック、 エネルギー、 環境、 太陽、 廃棄物管理。 こちらからアクセスしてください。
- プラトンヘルス。 バイオテクノロジーと臨床試験のインテリジェンス。 こちらからアクセスしてください。
- 情報源: https://aws.amazon.com/blogs/big-data/best-practices-to-implement-near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-msk/