Logo Zephyrnet

Crea una pipeline di streaming serverless end-to-end con Apache Kafka su Amazon MSK utilizzando Python | Servizi Web di Amazon

Data:

Il volume dei dati generati a livello globale continua ad aumentare, dai giochi, alla vendita al dettaglio e alla finanza, alla produzione, all’assistenza sanitaria e ai viaggi. Le organizzazioni sono alla ricerca di nuovi modi per utilizzare rapidamente il costante flusso di dati per innovare per le proprie aziende e i propri clienti. Devono acquisire, elaborare, analizzare e caricare in modo affidabile i dati in una miriade di archivi dati, il tutto in tempo reale.

Apache Kafka è una scelta popolare per queste esigenze di streaming in tempo reale. Tuttavia, può essere difficile configurare un cluster Kafka insieme ad altri componenti di elaborazione dati che si adattano automaticamente in base alle esigenze dell'applicazione. Si rischia di effettuare un provisioning insufficiente per i picchi di traffico, che può portare a tempi di inattività, o un provisioning eccessivo per il carico di base, con conseguenti sprechi. AWS offre più servizi serverless come Streaming gestito da Amazon per Apache Kafka (Amazzonia MSK), Amazon Data Firehose, Amazon DynamoDBe AWS Lambda che si ridimensiona automaticamente in base alle tue esigenze.

In questo post spieghiamo come utilizzare alcuni di questi servizi, tra cui MSK senza server, per creare una piattaforma dati serverless in grado di soddisfare le tue esigenze in tempo reale.

Panoramica della soluzione

Immaginiamo uno scenario. Sei responsabile della gestione di migliaia di modem per un provider di servizi Internet distribuito in più aree geografiche. Desideri monitorare la qualità della connettività del modem che ha un impatto significativo sulla produttività e sulla soddisfazione dei clienti. La tua distribuzione include diversi modem che devono essere monitorati e mantenuti per garantire tempi di inattività minimi. Ogni dispositivo trasmette migliaia di record da 1 KB ogni secondo, come l'utilizzo della CPU, l'utilizzo della memoria, l'allarme e lo stato della connessione. Desideri l'accesso in tempo reale a questi dati in modo da poter monitorare le prestazioni in tempo reale e rilevare e mitigare rapidamente i problemi. È inoltre necessario un accesso a lungo termine a questi dati per i modelli di machine learning (ML) per eseguire valutazioni di manutenzione predittiva, trovare opportunità di ottimizzazione e prevedere la domanda.

I tuoi clienti che raccolgono i dati in loco sono scritti in Python e possono inviare tutti i dati come argomenti Apache Kafka ad Amazon MSK. Per l'accesso ai dati a bassa latenza e in tempo reale della tua applicazione, puoi utilizzare Lambda e DynamoDB. Per l'archiviazione dei dati a lungo termine, è possibile utilizzare il servizio connettore serverless gestito Amazon Data Firehose per inviare dati al tuo data Lake.

Il diagramma seguente mostra come creare questa applicazione serverless end-to-end.

applicazione serverless end-to-end

Seguiamo i passaggi nelle sezioni seguenti per implementare questa architettura.

Crea un cluster Kafka serverless su Amazon MSK

Utilizziamo Amazon MSK per acquisire dati di telemetria in tempo reale dai modem. La creazione di un cluster Kafka serverless è semplice su Amazon MSK. Ci vogliono solo pochi minuti utilizzando il Console di gestione AWS o SDK AWS. Per utilizzare la console, fare riferimento a Iniziare a utilizzare i cluster MSK Serverless. Crei un cluster serverless, Gestione dell'identità e dell'accesso di AWS (IAM) e macchina client.

Crea un argomento Kafka utilizzando Python

Quando il cluster e la macchina client sono pronti, esegui SSH sulla macchina client e installa Kafka Python e la libreria MSK IAM per Python.

  • Esegui i comandi seguenti per installare Kafka Python e il file Libreria MSK IAM:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • Crea un nuovo file chiamato createTopic.py.
  • Copia il seguente codice in questo file, sostituendo il file bootstrap_servers ed region informazioni con i dettagli del tuo cluster. Per istruzioni su come recuperare il file bootstrap_servers informazioni per il tuo cluster MSK, vedi Ottenere i broker bootstrap per un cluster Amazon MSK.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • Corri il createTopic.py script per creare un nuovo argomento Kafka chiamato mytopic sul tuo cluster serverless:
python createTopic.py

Produrre record utilizzando Python

Generiamo alcuni dati di telemetria del modem di esempio.

  • Crea un nuovo file chiamato kafkaDataGen.py.
  • Copia il seguente codice in questo file, aggiornando il file BROKERS ed region informazioni con i dettagli per il tuo cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • Corri il kafkaDataGen.py per generare continuamente dati casuali e pubblicarli nell'argomento Kafka specificato:
python kafkaDataGen.py

Archiviare eventi in Amazon S3

Ora memorizzi tutti i dati grezzi degli eventi in un file Servizio di archiviazione semplice Amazon (Amazon S3) data Lake per l'analisi. Puoi utilizzare gli stessi dati per addestrare modelli ML. IL integrazione con Amazon Data Firehose consente ad Amazon MSK di caricare facilmente i dati dai cluster Apache Kafka in un data Lake S3. Completa i seguenti passaggi per eseguire lo streaming continuo dei dati da Kafka ad Amazon S3, eliminando la necessità di creare o gestire le tue applicazioni di connessione:

  • Nella console Amazon S3, crea un nuovo bucket. Puoi anche utilizzare un bucket esistente.
  • Crea una nuova cartella nel tuo bucket S3 chiamata streamingDataLake.
  • Nella console Amazon MSK, scegli il tuo cluster MSK Serverless.
  • Sulla Azioni menù, scegliere Modifica criterio cluster.

politica del cluster

  • Seleziona Includere l'entità servizio Firehose e scegli Salvare le modifiche.

preside del servizio manichette antincendio

  • Sulla Consegna S3 scheda, scegliere Crea flusso di consegna.

flusso di consegna

  • Nel Fontescegli AmazonMSK.
  • Nel Nei Dintorniscegli Amazon S3.

fonte e destinazione

  • Nel Connettività del cluster Amazon MSK, selezionare Broker di bootstrap privati.
  • Nel Argomento, inserisci il nome di un argomento (per questo post, mytopic).

impostazioni della sorgente

  • Nel Benna S3scegli Scopri la nostra gamma di prodotti e scegli il tuo secchio S3.
  • entrare streamingDataLake come prefisso del bucket S3.
  • entrare streamingDataLakeErr come prefisso di output dell'errore del bucket S3.

impostazioni di destinazione

  • Scegli Crea flusso di consegna.

creare un flusso di consegna

Puoi verificare che i dati siano stati scritti nel tuo bucket S3. Dovresti vedere che il streamingDataLake è stata creata la directory e i file sono archiviati in partizioni.

amazon s3

Archivia gli eventi in DynamoDB

Nell'ultimo passaggio, memorizzi i dati del modem più recenti in DynamoDB. Ciò consente all'applicazione client di accedere allo stato del modem e interagire con il modem in remoto da qualsiasi luogo, con bassa latenza e disponibilità elevata. Lambda funziona perfettamente con Amazon MSK. Lambda esegue internamente il polling dei nuovi messaggi dall'origine dell'evento e quindi richiama in modo sincrono la funzione Lambda di destinazione. Lambda legge i messaggi in batch e li fornisce alla tua funzione come payload dell'evento.

Creiamo innanzitutto una tabella in DynamoDB. Fare riferimento a Autorizzazioni API DynamoDB: riferimento ad azioni, risorse e condizioni per verificare che il computer client disponga delle autorizzazioni necessarie.

  • Crea un nuovo file chiamato createTable.py.
  • Copia il seguente codice nel file, aggiornando il file region informazioni:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • Corri il createTable.py script per creare una tabella chiamata device_status in DynamoDB:
python createTable.py

Ora configuriamo la funzione Lambda.

  • Sulla console Lambda, selezionare funzioni nel pannello di navigazione.
  • Scegli Crea funzione.
  • Seleziona Autore da zero.
  • Nel Nome della funzioneinserire un nome (ad esempio, my-notification-kafka).
  • Nel Runtimescegli Python 3.11.
  • Nel Permessi, selezionare Usa un ruolo esistente e scegli un ruolo con autorizzazioni per leggere dal tuo cluster.
  • Crea la funzione.

Nella pagina di configurazione della funzione Lambda ora puoi configurare origini, destinazioni e il codice dell'applicazione.

  • Scegli Aggiungi trigger.
  • Nel Configurazione trigger, accedere MSK per configurare Amazon MSK come trigger per la funzione di origine Lambda.
  • Nel Cluster MSK, accedere myCluster.
  • Deseleziona Attiva il grilletto, perché non hai ancora configurato la funzione Lambda.
  • Nel Dimensione del lotto, accedere 100.
  • Nel Posizione di partenzascegli Ultime.
  • Nel Nome argomentoinserire un nome (ad esempio, mytopic).
  • Scegli Aggiungi.
  • Nella pagina dei dettagli della funzione Lambda, nel file Code scheda, inserisci il seguente codice:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • Distribuisci la funzione Lambda.
  • Sulla Configurazione scheda, scegliere Modifica per modificare il trigger.

modifica trigger

  • Seleziona il trigger, quindi scegli Risparmi.
  • Sulla console DynamoDB, scegli Esplora gli elementi nel pannello di navigazione.
  • Seleziona la tabella device_status.

Vedrai che Lambda sta scrivendo gli eventi generati nell'argomento Kafka su DynamoDB.

tabella ddb

Sommario

Le pipeline di dati in streaming sono fondamentali per la creazione di applicazioni in tempo reale. Tuttavia, la creazione e la gestione dell’infrastruttura possono essere scoraggianti. In questo post, abbiamo illustrato come creare una pipeline di streaming serverless su AWS utilizzando Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose e altri servizi. I vantaggi principali sono l'assenza di server da gestire, la scalabilità automatica dell'infrastruttura e un modello pay-as-you-go che utilizza servizi completamente gestiti.

Pronto a costruire la tua pipeline in tempo reale? Inizia oggi con un account AWS gratuito. Con la potenza del serverless, puoi concentrarti sulla logica della tua applicazione mentre AWS gestisce il lavoro pesante indifferenziato. Costruiamo qualcosa di fantastico su AWS!


Informazioni sugli autori

Masudur Rahaman Sayem è uno Streaming Data Architect presso AWS. Lavora con i clienti AWS a livello globale per progettare e costruire architetture di streaming di dati per risolvere problemi aziendali reali. È specializzato nell'ottimizzazione di soluzioni che utilizzano servizi di dati in streaming e NoSQL. Sayem è molto appassionato di calcolo distribuito.

Michael Oguike è un Product Manager per Amazon MSK. La sua passione è l'utilizzo dei dati per scoprire informazioni che guidano l'azione. Gli piace aiutare i clienti di un'ampia gamma di settori a migliorare le loro attività utilizzando lo streaming di dati. Michael ama anche conoscere la scienza comportamentale e la psicologia da libri e podcast.

spot_img

L'ultima intelligenza

spot_img