ゼファーネットのロゴ

Python を使用して Amazon MSK 上の Apache Kafka でエンドツーエンドのサーバーレス ストリーミング パイプラインを構築する |アマゾン ウェブ サービス

日付:

ゲーム、小売、金融から製造、ヘルスケア、旅行に至るまで、世界中で生成されるデータの量は急増し続けています。組織は、絶え間なく流入するデータを迅速に利用して、ビジネスや顧客のためにイノベーションを起こすさらなる方法を模索しています。データを確実にキャプチャ、処理、分析し、無数のデータ ストアにすべてリアルタイムでロードする必要があります。

Apache Kafka は、こうしたリアルタイム ストリーミングのニーズによく選ばれています。ただし、アプリケーションのニーズに応じて自動的に拡張する他のデータ処理コンポーネントとともに Kafka クラスターをセットアップするのは困難な場合があります。ピーク トラフィックのプロビジョニングが不足してダウンタイムが発生したり、ベース ロードのプロビジョニングが過剰になって無駄が生じたりするリスクがあります。 AWS は、次のような複数のサーバーレス サービスを提供しています。 ApacheKafkaのAmazonマネージドストリーミング (Amazon MSK)、 Amazon データ ファイアホース, Amazon DynamoDB, AWSラムダ ニーズに応じて自動的にスケールします。

この投稿では、これらのサービスのいくつかを使用する方法について説明します。 MSK サーバーレス、リアルタイムのニーズを満たすサーバーレス データ プラットフォームを構築します。

ソリューションの概要

シナリオを想像してみましょう。あなたは、複数の地域に展開されているインターネット サービス プロバイダーの数千のモデムを管理する責任を負っています。顧客の生産性と満足度に大きな影響を与えるモデムの接続品質を監視したいと考えています。導入環境には、ダウンタイムを最小限に抑えるために監視および保守する必要があるさまざまなモデムが含まれています。各デバイスは、CPU 使用率、メモリ使用量、アラーム、接続ステータスなどの 1 KB のレコードを毎秒数千件送信します。パフォーマンスをリアルタイムで監視し、問題を迅速に検出して軽減できるように、このデータにリアルタイムでアクセスしたいと考えています。また、機械学習 (ML) モデルが予知保全評価を実行し、最適化の機会を見つけて需要を予測するために、このデータに長期的にアクセスする必要もあります。

オンサイトでデータを収集するクライアントは Python で記述されており、すべてのデータを Apache Kafka トピックとして Amazon MSK に送信できます。アプリケーションの低遅延かつリアルタイムのデータ アクセスのために、次を使用できます。 ラムダとDynamoDB。長期のデータ保管には、マネージドサーバーレスコネクタサービスを使用できます。 Amazon データ ファイアホース データをデータレイクに送信します。

次の図は、このエンドツーエンドのサーバーレス アプリケーションを構築する方法を示しています。

エンドツーエンドのサーバーレス アプリケーション

次のセクションの手順に従って、このアーキテクチャを実装してみましょう。

Amazon MSK でサーバーレス Kafka クラスターを作成する

Amazon MSK を使用して、モデムからリアルタイムのテレメトリ データを取り込みます。 Amazon MSK では、サーバーレス Kafka クラスターの作成が簡単です。を使用すると数分しかかかりません AWSマネジメントコンソール またはAWS SDK。コンソールを使用するには、以下を参照してください。 MSK サーバーレス クラスターの使用を開始する。サーバーレスクラスターを作成すると、 AWS IDおよびアクセス管理 (IAM) ロールとクライアント マシン。

Python を使用して Kafka トピックを作成する

クラスターとクライアント マシンの準備ができたら、クライアント マシンに SSH で接続し、Kafka Python と Python 用の MSK IAM ライブラリをインストールします。

pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • 実行する createTopic.py という名前の新しい Kafka トピックを作成するスクリプト mytopic サーバーレスクラスター上で:
python createTopic.py

Python を使用してレコードを生成する

サンプル モデム テレメトリ データを生成してみましょう。

  • という新しいファイルを作成します kafkaDataGen.py.
  • 次のコードをこのファイルにコピーして、 BROKERS & region クラスターの詳細を含む情報:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • 実行する kafkaDataGen.py ランダム データを継続的に生成し、指定された Kafka トピックにパブリッシュするには、次のようにします。
python kafkaDataGen.py

イベントを Amazon S3 に保存する

ここで、すべての生のイベント データを Amazon シンプル ストレージ サービス (Amazon S3) 分析用のデータレイク。同じデータを使用して ML モデルをトレーニングできます。の Amazon Data Firehose との統合 これにより、Amazon MSK は Apache Kafka クラスターから S3 データレイクにデータをシームレスにロードできます。 Kafka から Amazon S3 にデータを継続的にストリーミングするには、次の手順を実行します。これにより、独自のコネクタ アプリケーションを構築または管理する必要がなくなります。

  • Amazon S3 コンソールで、新しいバケットを作成します。既存のバケットを使用することもできます。
  • S3 バケットに次の名前の新しいフォルダーを作成します。 streamingDataLake.
  • Amazon MSK コンソールで、MSK サーバーレスクラスターを選択します。
  • ソフトウェア設定ページで、下図のように メニュー、選択 クラスター ポリシーの編集.

クラスタ ポリシー

  • 選択 Firehose サービス プリンシパルを含める 選択して 変更を保存します.

消防ホース サービス プリンシパル

  • ソフトウェア設定ページで、下図のように S3配信 タブを選択 配信ストリームを作成する.

配信ストリーム

  • ソース、選択する アマゾンMSK.
  • 開催場所、選択する アマゾンS3.

送信元と宛先

  • Amazon MSK クラスターの接続選択 プライベートブートストラップブローカー.
  • ご用件、トピック名を入力します (この投稿では、 mytopic).

ソース設定

  • S3バケット、選択する ブラウズ S3 バケットを選択します。
  • 入力します streamingDataLake S3 バケットのプレフィックスとして使用します。
  • 入力します streamingDataLakeErr S3 バケットのエラー出力プレフィックスとして使用します。

目的地設定

  • 選択する 配信ストリームを作成する.

配信ストリームを作成する

データが S3 バケットに書き込まれたことを確認できます。が表示されるはずです。 streamingDataLake ディレクトリが作成され、ファイルはパーティションに保存されます。

アマゾンs3

イベントを DynamoDB に保存する

最後のステップでは、最新のモデム データを DynamoDB に保存します。これにより、クライアント アプリケーションはモデムのステータスにアクセスし、どこからでもリモートでモデムと対話できるようになり、低遅延と高可用性が実現します。 Lambda は Amazon MSK とシームレスに連携します。 Lambda はイベントソースからの新しいメッセージを内部でポーリングし、ターゲットの Lambda 関数を同期的に呼び出します。 Lambda はメッセージをバッチで読み取り、イベント ペイロードとして関数に提供します。

まずはDynamoDBにテーブルを作成しましょう。参照する DynamoDB API 権限: アクション、リソース、条件のリファレンス クライアント マシンに必要な権限があることを確認します。

  • という新しいファイルを作成します createTable.py.
  • 次のコードをファイルにコピーして、 region 情報:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • 実行する createTable.py というテーブルを作成するスクリプト device_status DynamoDB の場合:
python createTable.py

次に、Lambda 関数を構成しましょう。

  • Lambdaコンソールで、 機能 ナビゲーションペインに表示されます。
  • 選択する 関数を作成する.
  • 選択 最初から作成者.
  • 関数名¸ 名前を入力します (たとえば、 my-notification-kafka).
  • ランタイム、選択する Pythonの3.11.
  • 権限選択 既存の役割を使用する で役割を選択します クラスターから読み取る権限.
  • 関数を作成します。

Lambda 関数の設定ページで、ソース、宛先、アプリケーション コードを設定できるようになりました。

  • 選択する トリガーを追加.
  • トリガー構成、 入る MSK Amazon MSK を Lambda ソース関数のトリガーとして設定します。
  • MSK クラスター、 入る myCluster.
  • 選択を解除 トリガーをアクティブにする、Lambda 関数をまだ設定していないためです。
  • バッチサイズ、 入る 100.
  • 開始位置、選択する 最新.
  • トピック名¸ 名前を入力します (たとえば、 mytopic).
  • 選択する Add.
  • Lambda 関数の詳細ページの Code タブで、次のコードを入力します。
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • Lambda 関数をデプロイします。
  • ソフトウェア設定ページで、下図のように   タブを選択 編集 トリガーを編集します。

トリガーの編集

  • トリガーを選択し、 Save.
  • DynamoDBコンソールで、 アイテムを探索する ナビゲーションペインに表示されます。
  • テーブルを選択します device_status.

Lambda が Kafka トピックで生成されたイベントを DynamoDB に書き込んでいることがわかります。

ddbテーブル

まとめ

ストリーミング データ パイプラインは、リアルタイム アプリケーションを構築するために重要です。ただし、インフラストラクチャのセットアップと管理は困難な場合があります。この投稿では、Amazon MSK、Lambda、DynamoDB、Amazon Data Firehose などのサービスを使用して、AWS でサーバーレス ストリーミング パイプラインを構築する方法を説明しました。主な利点は、管理するサーバーが不要であること、インフラストラクチャの自動拡張性、フルマネージド サービスを使用した従量課金制モデルであることです。

独自のリアルタイム パイプラインを構築する準備はできていますか?無料の AWS アカウントを今すぐ始めましょう。サーバーレスの力を利用すると、AWS が差別化されていない重労働を処理しながら、ユーザーはアプリケーション ロジックに集中できます。 AWS で素晴らしいものを構築しましょう!


著者について

マスドゥール・ラハマン・サイエム AWS のストリーミング データ アーキテクトです。 彼は世界中の AWS のお客様と協力して、現実世界のビジネス上の問題を解決するためのデータ ストリーミング アーキテクチャを設計および構築しています。 ストリーミング データ サービスと NoSQL を使用するソリューションの最適化を専門としています。 Sayem は、分散コンピューティングに非常に情熱を注いでいます。

マイケル・オグイケ Amazon MSK のプロダクトマネージャーです。彼は、データを使用して行動を促す洞察を明らかにすることに情熱を持っています。彼は、幅広い業界の顧客がデータ ストリーミングを使用してビジネスを改善できるよう支援することに喜びを感じています。マイケルは、本やポッドキャストから行動科学や心理学について学ぶことも大好きです。

スポット画像

最新のインテリジェンス

スポット画像