Zephyrnet-Logo

Erstellen Sie mit Apache Kafka auf Amazon MSK unter Verwendung von Python | eine serverlose End-to-End-Streaming-Pipeline Amazon Web Services

Datum:

Das weltweit generierte Datenvolumen nimmt weiter zu, von Spielen, Einzelhandel und Finanzen bis hin zu Produktion, Gesundheitswesen und Reisen. Unternehmen suchen nach mehr Möglichkeiten, den ständigen Datenzufluss schnell für Innovationen für ihre Unternehmen und Kunden zu nutzen. Sie müssen die Daten zuverlässig erfassen, verarbeiten, analysieren und in unzählige Datenspeicher laden – und das alles in Echtzeit.

Apache Kafka ist eine beliebte Wahl für diese Echtzeit-Streaming-Anforderungen. Allerdings kann es schwierig sein, einen Kafka-Cluster zusammen mit anderen Datenverarbeitungskomponenten einzurichten, die je nach den Anforderungen Ihrer Anwendung automatisch skaliert werden. Es besteht die Gefahr einer unzureichenden Bereitstellung für Spitzendatenverkehr, was zu Ausfallzeiten führen kann, oder einer übermäßigen Bereitstellung für die Grundlast, was zu Verschwendung führen kann. AWS bietet mehrere serverlose Dienste wie Amazon Managed Streaming für Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB und AWS Lambda Diese skaliert automatisch entsprechend Ihren Anforderungen.

In diesem Beitrag erklären wir, wie Sie einige dieser Dienste nutzen können, darunter MSK Serverlos, um eine serverlose Datenplattform aufzubauen, die Ihren Echtzeitanforderungen entspricht.

Lösungsüberblick

Stellen wir uns ein Szenario vor. Sie sind für die Verwaltung Tausender Modems für einen Internetdienstanbieter verantwortlich, der in mehreren Regionen eingesetzt wird. Sie möchten die Qualität der Modemverbindung überwachen, die einen erheblichen Einfluss auf die Produktivität und Zufriedenheit der Kunden hat. Ihre Bereitstellung umfasst verschiedene Modems, die überwacht und gewartet werden müssen, um minimale Ausfallzeiten zu gewährleisten. Jedes Gerät überträgt jede Sekunde Tausende von 1-KB-Datensätzen, z. B. CPU-Auslastung, Speichernutzung, Alarm und Verbindungsstatus. Sie möchten Echtzeitzugriff auf diese Daten, damit Sie die Leistung in Echtzeit überwachen und Probleme schnell erkennen und beheben können. Sie benötigen außerdem einen längerfristigen Zugriff auf diese Daten für Modelle des maschinellen Lernens (ML), um vorausschauende Wartungsbewertungen durchzuführen, Optimierungsmöglichkeiten zu finden und den Bedarf zu prognostizieren.

Ihre Clients, die die Daten vor Ort sammeln, sind in Python geschrieben und können alle Daten als Apache Kafka-Themen an Amazon MSK senden. Für die geringe Latenz und den Echtzeit-Datenzugriff Ihrer Anwendung können Sie Folgendes verwenden Lambda und DynamoDB. Für die längerfristige Datenspeicherung können Sie den verwalteten serverlosen Connector-Dienst verwenden Amazon Data Firehose um Daten an Ihren Data Lake zu senden.

Das folgende Diagramm zeigt, wie Sie diese serverlose End-to-End-Anwendung erstellen können.

End-to-End-Anwendung ohne Server

Befolgen wir die Schritte in den folgenden Abschnitten, um diese Architektur zu implementieren.

Erstellen Sie einen serverlosen Kafka-Cluster auf Amazon MSK

Wir verwenden Amazon MSK, um Echtzeit-Telemetriedaten von Modems aufzunehmen. Das Erstellen eines serverlosen Kafka-Clusters ist auf Amazon MSK unkompliziert. Die Verwendung dauert nur wenige Minuten AWS-Managementkonsole oder AWS SDK. Informationen zur Verwendung der Konsole finden Sie unter Erste Schritte mit MSK Serverless-Clustern. Sie erstellen einen serverlosen Cluster, AWS Identity and Access Management and (IAM)-Rolle und Client-Rechner.

Erstellen Sie ein Kafka-Thema mit Python

Wenn Ihr Cluster und Ihr Client-Computer bereit sind, stellen Sie eine SSH-Verbindung zu Ihrem Client-Computer her und installieren Sie Kafka Python und die MSK IAM-Bibliothek für Python.

  • Führen Sie die folgenden Befehle aus, um Kafka Python und zu installieren MSK IAM-Bibliothek:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • Erstellen Sie eine neue Datei mit dem Namen createTopic.py.
  • Kopieren Sie den folgenden Code in diese Datei und ersetzen Sie den bootstrap_servers und region Informationen mit den Details für Ihren Cluster. Anweisungen zum Abrufen der bootstrap_servers Informationen zu Ihrem MSK-Cluster finden Sie unter Abrufen der Bootstrap-Broker für einen Amazon MSK-Cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • Führen Sie die createTopic.py Skript zum Erstellen eines neuen Kafka-Themas namens mytopic auf Ihrem serverlosen Cluster:
python createTopic.py

Erstellen Sie Datensätze mit Python

Lassen Sie uns einige Beispielmodem-Telemetriedaten generieren.

  • Erstellen Sie eine neue Datei mit dem Namen kafkaDataGen.py.
  • Kopieren Sie den folgenden Code in diese Datei und aktualisieren Sie die BROKERS und region Informationen mit den Details zu Ihrem Cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • Führen Sie die kafkaDataGen.py um kontinuierlich Zufallsdaten zu generieren und diese im angegebenen Kafka-Thema zu veröffentlichen:
python kafkaDataGen.py

Speichern Sie Ereignisse in Amazon S3

Jetzt speichern Sie alle rohen Ereignisdaten in einem Amazon Simple Storage-Service (Amazon S3) Data Lake für Analysen. Sie können dieselben Daten verwenden, um ML-Modelle zu trainieren. Der Integration mit Amazon Data Firehose ermöglicht Amazon MSK das nahtlose Laden von Daten aus Ihren Apache Kafka-Clustern in einen S3-Datensee. Führen Sie die folgenden Schritte aus, um kontinuierlich Daten von Kafka an Amazon S3 zu streamen, sodass Sie keine eigenen Connector-Anwendungen erstellen oder verwalten müssen:

  • Erstellen Sie auf der Amazon S3-Konsole einen neuen Bucket. Sie können auch einen vorhandenen Bucket verwenden.
  • Erstellen Sie in Ihrem S3-Bucket einen neuen Ordner mit dem Namen streamingDataLake.
  • Wählen Sie in der Amazon MSK-Konsole Ihren MSK Serverless-Cluster aus.
  • Auf dem Aktionen Menü, wählen Sie Clusterrichtlinie bearbeiten.

Cluster-Richtlinie

  • Auswählen Schließen Sie den Firehose-Dienstprinzipal ein und wählen Sie Änderungen speichern.

Feuerwehrdienstleiter

  • Auf dem S3-Lieferung Tab, wählen Sie Lieferstrom erstellen.

Lieferstrom

  • Aussichten für Quelle, wählen Amazon MSK.
  • Aussichten für Reiseziel, wählen Amazon S3.

Quelle und Ziel

  • Aussichten für Amazon MSK-Cluster-KonnektivitätWählen Private Bootstrap-Broker.
  • Aussichten für Betreff, geben Sie einen Themennamen ein (für diesen Beitrag, mytopic).

Quelleneinstellungen

  • Aussichten für S3-Eimer, wählen Entdecken und wählen Sie Ihren S3-Bucket aus.
  • Enter streamingDataLake als Ihr S3-Bucket-Präfix.
  • Enter streamingDataLakeErr als Ihr S3-Bucket-Fehlerausgabepräfix.

Zieleinstellungen

  • Auswählen Lieferstrom erstellen.

Lieferstrom erstellen

Sie können überprüfen, ob die Daten in Ihren S3-Bucket geschrieben wurden. Das solltest du sehen streamingDataLake Das Verzeichnis wurde erstellt und die Dateien werden in Partitionen gespeichert.

amazon s3

Speichern Sie Ereignisse in DynamoDB

Im letzten Schritt speichern Sie die aktuellsten Modemdaten in DynamoDB. Dadurch kann die Client-Anwendung von überall aus auf den Modemstatus zugreifen und mit dem Modem interagieren, mit geringer Latenz und hoher Verfügbarkeit. Lambda arbeitet nahtlos mit Amazon MSK zusammen. Lambda fragt intern nach neuen Nachrichten von der Ereignisquelle und ruft dann synchron die Ziel-Lambda-Funktion auf. Lambda liest die Nachrichten stapelweise und stellt sie Ihrer Funktion als Ereignisnutzlast zur Verfügung.

Erstellen wir zunächst eine Tabelle in DynamoDB. Beziehen auf DynamoDB-API-Berechtigungen: Referenz zu Aktionen, Ressourcen und Bedingungen um zu überprüfen, ob Ihr Client-Computer über die erforderlichen Berechtigungen verfügt.

  • Erstellen Sie eine neue Datei mit dem Namen createTable.py.
  • Kopieren Sie den folgenden Code in die Datei und aktualisieren Sie die region Informationen:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • Führen Sie die createTable.py Skript zum Erstellen einer Tabelle namens device_status in DynamoDB:
python createTable.py

Lassen Sie uns nun die Lambda-Funktion konfigurieren.

  • Wählen Sie auf der Lambda-Konsole Funktionen im Navigationsbereich.
  • Auswählen Funktion erstellen.
  • Auswählen Autor von Grund auf neu.
  • Aussichten für Funktionsname¸ Geben Sie einen Namen ein (z. B. my-notification-kafka).
  • Aussichten für Laufzeit, wählen Python 3.11.
  • Aussichten für BerechtigungenWählen Verwenden Sie eine vorhandene Rolle und wähle eine Rolle mit Berechtigungen zum Lesen aus Ihrem Cluster.
  • Erstellen Sie die Funktion.

Auf der Konfigurationsseite der Lambda-Funktion können Sie jetzt Quellen, Ziele und Ihren Anwendungscode konfigurieren.

  • Auswählen Trigger hinzufügen.
  • Aussichten für Konfiguration auslösen, eingeben MSK um Amazon MSK als Auslöser für die Lambda-Quellfunktion zu konfigurieren.
  • Aussichten für MSK-Cluster, eingeben myCluster.
  • Deaktivieren Auslöser aktivieren, weil Sie Ihre Lambda-Funktion noch nicht konfiguriert haben.
  • Aussichten für Chargengröße, eingeben 100.
  • Aussichten für Startposition, wählen Aktuelle.
  • Aussichten für Themenname¸ Geben Sie einen Namen ein (z. B. mytopic).
  • Auswählen Speichern.
  • Auf der Lambda-Funktionsdetailseite auf der Code Geben Sie auf der Registerkarte den folgenden Code ein:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • Stellen Sie die Lambda-Funktion bereit.
  • Auf dem Konfiguration Tab, wählen Sie Bearbeiten um den Auslöser zu bearbeiten.

Auslöser bearbeiten

  • Wählen Sie den Auslöser aus und wählen Sie dann Speichern.
  • Wählen Sie in der DynamoDB-Konsole Gegenstände erkunden im Navigationsbereich.
  • Wähle den Tisch aus device_status.

Sie werden sehen, dass Lambda im Kafka-Thema generierte Ereignisse in DynamoDB schreibt.

ddb-Tabelle

Zusammenfassung

Streaming-Datenpipelines sind für die Erstellung von Echtzeitanwendungen von entscheidender Bedeutung. Allerdings kann die Einrichtung und Verwaltung der Infrastruktur entmutigend sein. In diesem Beitrag haben wir erläutert, wie Sie mithilfe von Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose und anderen Diensten eine serverlose Streaming-Pipeline auf AWS erstellen. Die Hauptvorteile sind, dass keine Server verwaltet werden müssen, die Infrastruktur automatisch skalierbar ist und ein Pay-as-you-go-Modell mit vollständig verwalteten Diensten zur Verfügung steht.

Sind Sie bereit, Ihre eigene Echtzeit-Pipeline aufzubauen? Beginnen Sie noch heute mit einem kostenlosen AWS-Konto. Mit der Leistung von Serverless können Sie sich auf Ihre Anwendungslogik konzentrieren, während AWS die undifferenzierte Schwerarbeit übernimmt. Lassen Sie uns etwas Großartiges auf AWS aufbauen!


Über die Autoren

Masudur Rahaman Sayem ist Streaming Data Architect bei AWS. Er arbeitet weltweit mit AWS-Kunden zusammen, um Daten-Streaming-Architekturen zu entwerfen und aufzubauen, um reale Geschäftsprobleme zu lösen. Er ist spezialisiert auf die Optimierung von Lösungen, die Streaming-Datendienste und NoSQL verwenden. Sayem hat eine große Leidenschaft für verteiltes Rechnen.

Michael Oguike ist Produktmanager für Amazon MSK. Seine Leidenschaft besteht darin, Daten zu nutzen, um Erkenntnisse zu gewinnen, die zum Handeln führen. Es macht ihm Spaß, Kunden aus den unterschiedlichsten Branchen dabei zu helfen, ihr Geschäft mithilfe von Daten-Streaming zu verbessern. Michael lernt auch gerne aus Büchern und Podcasts etwas über Verhaltenswissenschaft und Psychologie.

spot_img

Neueste Intelligenz

spot_img