ゼファーネットのロゴ

Amazon MSK と Amazon EventBridge を使用してイベント駆動型のアーキテクチャを構築する | アマゾン ウェブ サービス

日付:

不変の事実 (イベント) に基づいて、イベント駆動型アーキテクチャ (EDA) を使用すると、企業は顧客の行動についてより深い洞察を得ることができ、より正確で迅速な意思決定プロセスが可能になり、顧客エクスペリエンスの向上につながります。 EDA では、次のような最新のイベント ブローカーが使用されます。 アマゾンイベントブリッジ および Apache Kafka は、イベントのパブリッシュとサブスクライブを行う重要な役割を果たします。 EventBridge は、独自のアプリ、サービスとしてのソフトウェア (SaaS) アプリ、AWS サービスからデータを取り込み、そのデータをターゲットにルーティングするサーバーレス イベント バスです。 EDA のバックボーンとしての役割には重複がありますが、両方のソリューションは異なる問題ステートメントから生まれ、特定の使用例を解決するための独自の機能を提供します。 両方のテクノロジーとその主な使用例をしっかりと理解することで、開発者は使いやすく、保守可能で、進化可能な EDA を作成できます。

ユースケースが明確に定義されており、ストリーミング イベントを使用したイベント ストリーミングと分析 (Kafka) や、個別のイベントに対する簡略化された一貫性のあるイベント フィルタリング、変換、ルーティングを使用したアプリケーション統合 ​​(EventBridge) など、XNUMX つのイベント バスに直接マッピングされている場合、特定のブローカー テクノロジーの場合は簡単です。 ただし、組織やビジネスの要件はより複雑であることが多く、XNUMX つのブローカー テクノロジーの能力を超えています。 ほとんどの場合、イベント ブローカーの選択は二者択一で決定すべきではありません。 補完的なブローカー テクノロジーを組み合わせ、その独自の強みを活用することは、使いやすく、保守可能で、進化可能な EDA を構築するための確実なアプローチです。 Kafka と EventBridge の統合をさらにスムーズにするために、AWS は Apache Kafka に基づいた EventBridge コネクタをオープンソース化しました。 これにより、Kafka Connect の既存の知識とツールセットを使用しながら、オンプレミスの Kafka デプロイメントから利用し、ポイントツーポイント通信を回避できます。

ストリーミング アプリケーションにより、バインドされていないデータセットに対するステートフルな計算が可能になります。 これにより、異常検出、イベント時の計算などのリアルタイムのユースケースが可能になります。 これらのアプリケーションは、Apache Flink、Apache Spark、Kafka Streams などのフレームワークを使用して構築できます。 これらのフレームワークの一部は、Apache Kafka 以外のダウンストリーム システムへのイベントの送信をサポートしていますが、フレームワーク全体でこれを行うための標準化された方法はありません。 各アプリケーション所有者は、イベントをダウンストリームに送信するための独自のロジックを構築する必要があります。 EDA では、このようなシナリオを処理するための推奨される方法は、イベントをイベント バスに発行し、それをダウンストリームに送信することです。

Apache Kafka から EventBridge にイベントを送信するには XNUMX つの方法があります。 Amazon EventBridge パイプ または、Kafka Connect の EventBridge シンク コネクタ。 この投稿では、どのオプションをいつ使用するか、および EventBridge シンク コネクタを使用して EDA を構築する方法について説明します。 ApacheKafkaのAmazonマネージドストリーミング (Amazon MSK)。

EventBridge シンク コネクタと EventBridge パイプ

EventBridge Pipes は、ポイントツーポイント統合でソースとターゲットを接続し、イベントのフィルタリング、変換、エンリッチメント、および API 宛先を使用した 14 を超える AWS サービスと外部 HTTPS ベースのターゲットへのイベント配信をサポートします。 これは、快適な開発者エクスペリエンスでセットアップと操作を簡素化するため、Kafka から EventBridge にイベントを送信するための最も簡単な方法として推奨されます。

あるいは、次の状況では、EventBridge シンク コネクタを選択して、Kafka から EventBridge イベント バスにイベントを直接送信することもできます。

  • 他のシステムやサービスと統合するために選択したプラットフォームとして、Kafka Connect フレームワークを中心としたプロセスとツールにすでに投資しています。
  • Kafka 互換のスキーマ レジストリと統合する必要があります。 AWSGlueスキーマレジストリ、イベントのシリアル化と逆シリアル化のための Avro および Protobuf データ形式をサポート
  • オンプレミスの Kafka 環境から EventBridge イベント バスにイベントを直接送信したい

ソリューションの概要

この投稿では、Kafka Connect と EventBridge シンク コネクタを使用して、Avro でシリアル化されたイベントを送信する方法を説明します。 Apache Kafka の Amazon マネージド ストリーミング (Amazon MSK) イベントブリッジへ。 これにより、Apache Kafka から、EventBridge API 宛先を使用して、Amazon CloudWatch、Amazon SQS、AWS Lambda、Salesforce、Datadog、Snowflake などの HTTPS ターゲットなど、サポートされている数十の EventBridge AWS およびパートナー ターゲットへのシームレスで一貫したデータ フローが可能になります。 次の図は、このブログ投稿で使用されている、Amazon MSK と EventBridge に基づくイベント駆動型アーキテクチャを示しています。

アーキテクチャ図

ワークフローは次の手順で構成されます。

  1. デモ アプリケーションはクレジット カード トランザクションを生成し、Avro 形式を使用して Amazon MSK に送信します。
  2. Amazon Elastic Container Service (Amazon ECS) 上で実行されている分析アプリケーションはトランザクションを消費し、異常がある場合は分析します。
  3. 異常が検出された場合、アプリケーションは不正検出イベントを MSK 通知トピックに送り返します。
  4. EventBridge コネクタは、Amazon MSK からの不正検出イベントを Avro 形式で消費します。
  5. コネクタはイベントを JSON に変換し、EventBridge に送信します。
  6. EventBridge では、JSON フィルタリング ルールを使用してイベントをフィルタリングし、他のサービスまたは別のサービスに送信します。 イベントバス。 この例では、不正検出イベントが次の宛先に送信されます。 Amazon CloudWatchログ 監査とイントロスペクションのために、また、サードパーティ API との統合がいかに簡単であるかを示すためにサードパーティ SaaS プロバイダーに送信します。 Salesforce.

前提条件

このチュートリアルでは、次の前提条件を満たしている必要があります。

AWS CDK スタックをデプロイする

このチュートリアルでは、AWS CDK スタックをアカウントにデプロイする必要があります。 フルスタックをエンドツーエンドでデプロイすることも、この投稿に沿って必要なリソースだけをデプロイすることもできます。

  1. ターミナルで次のコマンドを実行します。
    git clone https://github.com/awslabs/eventbridge-kafka-connector/

  2. cdk ディレクトリに移動します。
    cd eventbridge-kafka-connector/cdk

  3. 好みに基づいて AWS CDK スタックをデプロイします。
  4. この投稿で説明されている完全なセットアップを確認したい場合は、次のコマンドを実行します。
    cdk deploy —context deployment=FULL

  5. コネクタを独自にデプロイしたいが、MSK クラスターなどの必要なリソースがすでにある場合は、 AWS IDおよびアクセス管理 (IAM) ロール、セキュリティ グループ、データ ジェネレーターなどを設定するには、次のコマンドを実行します。
    cdk deploy —context deployment=PREREQ

EventBridge シンクコネクタを Amazon MSK Connect にデプロイする

CDK スタックを FULL モードでデプロイした場合は、このセクションをスキップして次のセクションに進むことができます。 EventBridge ルールを作成します。

コネクタには、MSK クラスターからデータを読み取り、レコードを下流の EventBridge に送信できる IAM ロールが必要です。

コネクタコードを Amazon S3 にアップロードする

コネクタ コードをアップロードするには、次の手順を実行します。 Amazon シンプル ストレージ サービス (Amazon S3):

  1. に移動します GitHubレポ.
  2. ダウンロード リリース 1.0.0 には、AWS Glue Schema Registry の依存関係が含まれています。
  3. Amazon S3コンソールで、 バケット ナビゲーションペインに表示されます。
  4. 選択する バケットを作成する.
  5. バケット名、 入る eventbridgeconnector-bucket-${AWS_ACCOUNT_ID}.

S3 バケットはグローバルに一意である必要があるため、 ${AWS_ACCOUNT_ID} 実際の AWS アカウント ID を使用します。 例えば、 eventbridgeconnector-bucket-123456789012.

  1. バケットを開いて、 アップロード.
  2. GitHub リポジトリからダウンロードした .jar ファイルを選択し、 アップロード.

S3 ファイルアップロードコンソール

カスタム プラグインを作成する

これで、アプリケーション コードが Amazon S3 に追加されました。 次のステップとして、カスタム プラグインを作成します。 アマゾンMSKコネクト。 次の手順を実行します。

  1. Amazon MSKコンソールで、 カスタムプラグイン 下のナビゲーションペインで MSKコネクト.
  2. 選択する カスタムプラグインを作成する.
  3. S3 URI – カスタム プラグイン オブジェクトという名前のファイルを参照します。 kafka-eventbridge-sink-with-gsr-dependencies.jar S3バケットで eventbridgeconnector-bucket-${AWS_ACCOUNT_ID} EventBridge コネクタの場合。
  4. カスタムプラグイン名、 入る msk-eventBridge-sink-plugin-v1.
  5. オプションの説明を入力します。
  6. 選択する カスタムプラグインを作成する.

MSK Connectプラグイン画面

  1. プラグインがステータスに遷移するまで待ちます アクティブ.

コネクタを作成する

MSK Connect でコネクタを作成するには、次の手順を実行します。

  1. Amazon MSKコンソールで、 コネクタ 下のナビゲーションペインで MSKコネクト.
  2. 選択する コネクタを作成する.
  3. 選択 既存のカスタム プラグインを使用 と下 カスタムプラグイン、プラグインを選択します msk-eventBridge-sink-plugin-v1 以前に作成したもの。
  4. 選択する Next.
  5. コネクタ名、 入る msk-eventBridge-sink-connector.
  6. オプションの説明を入力します。
  7. クラスタ type選択 MSK クラスター.
  8. MSKクラスターで、前に作成したクラスターを選択します。
  9. 認証、選択する IAM.
  10. コネクタ構成、次の設定を入力します (構成の詳細については、 GitHubリポジトリ):
    auto.offset.reset=earliest
    connector.class=software.amazon.event.kafkaconnector.EventBridgeSinkConnector
    topics=notifications
    aws.eventbridge.connector.id=avro-test-connector
    aws.eventbridge.eventbus.arn=arn:aws:events:us-east-1:123456789012:event-bus/eventbridge-sink-eventbus
    aws.eventbridge.region=us-east-1
    tasks.max=1
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter.region=us-east-1
    value.converter.registry.name=default-registry
    value.converter.avroRecordType=GENERIC_RECORD

  11. ユーザーとの強い関係を維持するために、 replace aws.eventbridge.eventbus.arn, aws.eventbridge.region, value.converter.region 前提条件スタックの値を使用します。
  12. コネクタ容量 セクション、選択 自動スケール for 容量タイプ.
  13. デフォルト値の 1 のままにしておきます。 ワーカーあたりの MCU 数.
  14. すべてのデフォルト値を保持します コネクタ容量.
  15. ワーカー構成選択 MSK のデフォルト構成を使用する.
  16. アクセス許可、カスタム IAM ロールを選択します KafkaEventBridgeSinkStack-connectorRole、AWS CDK スタックのデプロイ中に作成したものです。
  17. 選択する Next.
  18. 選択する Next 再び。
  19. ログ配信選択 Amazon CloudWatch Logs に配信する.
  20. ロググループ、選択する /aws/mskconnect/eventBridgeSinkConnector.
  21. 選択する Next.
  22. レビューと作成、すべての設定を検証し、選択します コネクタを作成する.

コネクタは次の状態になります。 作成。 コネクタが次のステータスに移行するまでに最大で数分かかる場合があります。 Running:.

EventBridge ルールを作成する

コネクタがイベントを EventBridge に転送するようになったので、EventBridge ルールを使用してイベントをフィルターし、他のターゲットに送信できます。 ルールを作成するには、次の手順を実行します。

  1. EventBridge コンソールで、 キャンペーンのルール ナビゲーションペインに表示されます。
  2. 選択する ルールを作成.
  3. 入力します eb-to-cloudwatch-logs-and-webhook for 名前 .
  4. イベントブリッジ-シンク-イベントバスを選択します イベントバス.
  5. 選択する Next.
  6. 選択 カスタムパターン(JSONエディター)、選択する インセット、イベント パターンを次のコードに置き換えます。
    { "detail": { "value": { "eventType": ["suspiciousActivity"], "source": ["transactionAnalyzer"] } }, "detail-type": [{ "prefix": "kafka-connect-notification" }]
    }
    

  7. 選択する Next.
  8. ターゲット1選択 CloudWatch ログ グループ 入力してください kafka-events for ロググループ.
  9. 選択する 別のターゲットを追加する.
  10. (オプション: API 宛先を作成します) ターゲット2選択 EventBridge API の宛先 for ターゲットの種類.
  11. 選択 新しい API 宛先を作成する.
  12. わかりやすい名前を入力してください 名前 .
  13. URLを追加して次のように入力します API宛先エンドポイント。 (これは、Datadog、Salesforce などのエンドポイントの URL にすることができます)
  14. 選択 POST for HTTPメソッド.
  15. 選択 新しい接続を作成する for 接続.
  16. 接続名、名前を入力します。
  17. 選択 その他 as 宛先タイプ をクリックして APIキー as 認可タイプ.
  18. APIキー名 および 、キーを入力します。
  19. 選択する Next.
  20. 入力を検証して選択します ルールを作成.

イベントブリッジルール

CloudWatch Logs コンソールの次のスクリーンショットは、EventBridge からのいくつかのイベントを示しています。

CloudWatchログ

実稼働環境でコネクタを実行する

このセクションでは、コネクタの運用面について詳しく説明します。 具体的には、コネクタをスケーリングする方法と、CloudWatch を使用してコネクタを監視する方法について説明します。

コネクタを拡大縮小する

Kafka コネクタは、タスクの数に合わせて拡張します。 EventBridge シンク コネクタのコード設計では、実行できるタスクの数が制限されません。 MSK Connect は、タスクを実行するためのコンピューティング能力を提供します。 Provisioned or 自動スケール タイプ。 コネクタの展開中に、容量タイプを選択します 自動スケール ワーカーごとに 1 MCU (1vCPU と 4GiB のメモリを表します)。 つまり、MSK Connect はタスクを実行するためにインフラストラクチャを拡張しますが、タスクの数は拡張しません。 タスクの数はコネクタによって定義されます。 デフォルトでは、コネクタはコネクタ構成のtasks.maxで定義されたタスクの数で開始します。 この値が処理されたトピックのパーティション数より大きい場合、タスクの数は、Kafka Connect のリバランス中にパーティションの数に設定されます。

コネクタを監視する

MSK Connect はメトリクスを発行します デフォルトでモニタリングのために CloudWatch に送信されます。 MSK Connect メトリクスに加えて、実稼働環境ではコネクタのオフセットも監視する必要があります。 オフセットを監視すると、コネクタが Kafka クラスターで生成されたデータに追いつくことができるかどうかを把握できます。

クリーンアップ

リソースをクリーンアップして継続的な料金を回避するには、次の手順を実行します。

  1. Amazon MSKコンソールで、 コネクタ 下のナビゲーションペインで MSKコネクト.
  2. 作成したコネクタを選択し、 削除.
  3. 選択する クラスター ナビゲーションペインに表示されます。
  4. 作成したクラスターを選択し、 削除 メニュー。
  5. EventBridge コンソールで、 キャンペーンのルール ナビゲーションペインに表示されます。
  6. イベント バス「eventbridge-sink-eventbus」を選択します。
  7. 作成したすべてのルールを選択し、 削除.
  8. 「delete」と入力して削除を確認し、選択します 削除.

コンテキスト PREREQ を使用して AWS CDK スタックをデプロイした場合は、コネクタの .jar ファイルを削除します。

  1. Amazon S3コンソールで、 バケット ナビゲーションペインに表示されます。
  2. コネクタをアップロードしたバケットに移動し、 kafka-eventbridge-sink-with-gsr-dependencies.jar ファイルにソフトウェアを指定する必要があります。

選択したデプロイメントモードとは関係なく、他のすべての AWS リソースは AWS CDK または AWS CloudFormation。 走る cdk destroy リポジトリ ディレクトリから CloudFormation スタックを削除します。

または、AWS CloudFormation コンソールでスタックを選択します KafkaEventBridgeSinkStack 選択して 削除.

まとめ

この投稿では、MSK Connect を使用して EventBridge 用の AWS オープンソース Kafka コネクタを実行する方法、Kafka トピックを EventBridge に転送するようにコネクタを設定する方法、EventBridge ルールを使用してイベントをフィルタリングして CloudWatch に転送する方法を説明しました。ログと Webhook。

EventBridge の Kafka コネクタの詳細については、以下を参照してください。 Amazon EventBridge が Kafka Connect 用のオープンソースコネクタを発表、並びに MSKConnect開発者ガイド とコネクタのコード GitHubレポ.


著者について

フロリアン・メア AWS のシニア ソリューション アーキテクトおよびデータ ストリーミングの専門家です。 彼は、AWS クラウド サービスを使用してビジネス上の課題を解決することで、ドイツの顧客の成功と革新を支援する技術者です。 ソリューション アーキテクトとして働くことに加えて、Florian は情熱的な登山家でもあり、ヨーロッパ各地の最も高い山のいくつかに登っています。

ベンジャミン・マイヤー AWS のシニア ソリューション アーキテクトであり、AWS クラウド サービスを使用してビジネス上の課題を解決するためにドイツのゲーム ビジネスに重点を置いています。 ベンジャミンは 7 年間熱心な技術者であり、顧客をサポートしていないときは、モバイル アプリを開発したり、電子機器を構築したり、サボテンの世話をしたりしています。

スポット画像

最新のインテリジェンス

スポット画像