Zephyrnet Logosu

Python kullanarak Amazon MSK'da Apache Kafka ile uçtan uca sunucusuz bir akış hattı oluşturun | Amazon Web Hizmetleri

Tarih:

Oyun, perakende ve finanstan üretime, sağlık hizmetlerine ve seyahate kadar küresel olarak üretilen veri hacmi artmaya devam ediyor. Kuruluşlar, işletmeleri ve müşterileri için yenilik yapmak amacıyla sürekli veri akışını hızla kullanmanın daha fazla yolunu arıyor. Verileri güvenilir bir şekilde gerçek zamanlı olarak yakalamaları, işlemeleri, analiz etmeleri ve sayısız veri deposuna yüklemeleri gerekiyor.

Apache Kafka, bu gerçek zamanlı akış ihtiyaçları için popüler bir seçimdir. Ancak uygulamanızın ihtiyaçlarına göre otomatik olarak ölçeklenen diğer veri işleme bileşenleriyle birlikte bir Kafka kümesi oluşturmak zor olabilir. Yoğun trafik için yetersiz kaynak sağlama riskiyle karşı karşıya kalırsınız, bu da kesinti süresine yol açabilir veya temel yük için aşırı kaynak ayırarak israfa yol açabilir. AWS, aşağıdakiler gibi birden fazla sunucusuz hizmet sunar: Apache Kafka için Amazon Tarafından Yönetilen Akış (Amazon MSK), Amazon Veri Firehose, Amazon DinamoDB, ve AWS Lambda ihtiyaçlarınıza göre otomatik olarak ölçeklenir.

Bu yazıda, bu hizmetlerden bazılarını nasıl kullanabileceğinizi açıklıyoruz: MSK Sunucusuz, gerçek zamanlı ihtiyaçlarınızı karşılayacak sunucusuz bir veri platformu oluşturmak için.

Çözüme genel bakış

Bir senaryo hayal edelim. Birden fazla coğrafyaya dağıtılan bir internet servis sağlayıcısının binlerce modemini yönetmek sizin sorumluluğunuzdadır. Müşteri verimliliği ve memnuniyeti üzerinde önemli etkisi olan modem bağlantı kalitesini izlemek istiyorsunuz. Dağıtımınız, minimum kesinti süresini sağlamak için izlenmesi ve bakımı yapılması gereken farklı modemler içerir. Her cihaz, CPU kullanımı, hafıza kullanımı, alarm ve bağlantı durumu gibi her saniye binlerce 1 KB kayıt iletir. Performansı gerçek zamanlı olarak izleyebilmek ve sorunları hızla tespit edip azaltabilmek için bu verilere gerçek zamanlı erişim istiyorsunuz. Tahmine dayalı bakım değerlendirmeleri yürütmek, optimizasyon fırsatlarını bulmak ve talebi tahmin etmek amacıyla makine öğrenimi (ML) modelleri için de bu verilere daha uzun vadeli erişime ihtiyacınız var.

Verileri yerinde toplayan istemcileriniz Python'da yazılmıştır ve tüm verileri Apache Kafka konuları olarak Amazon MSK'ya gönderebilirler. Uygulamanızın düşük gecikmeli ve gerçek zamanlı veri erişimi için şunları kullanabilirsiniz: Lambda ve DynamoDB. Daha uzun vadeli veri depolama için yönetilen sunucusuz bağlayıcı hizmetini kullanabilirsiniz Amazon Veri Firehose Veri gölünüze veri göndermek için.

Aşağıdaki şemada bu uçtan uca sunucusuz uygulamayı nasıl oluşturabileceğiniz gösterilmektedir.

uçtan uca sunucusuz uygulama

Bu mimariyi uygulamak için aşağıdaki bölümlerdeki adımları takip edelim.

Amazon MSK'da sunucusuz Kafka kümesi oluşturma

Modemlerden gerçek zamanlı telemetri verilerini almak için Amazon MSK'yı kullanıyoruz. Amazon MSK'da sunucusuz bir Kafka kümesi oluşturmak basittir. Kullanımı yalnızca birkaç dakika sürer AWS Yönetim Konsolu veya AWS SDK'sı. Konsolu kullanmak için bkz. MSK Sunucusuz kümelerini kullanmaya başlama. Sunucusuz bir küme oluşturursunuz, AWS Kimlik ve Erişim Yönetimi (IAM) rolü ve istemci makinesi.

Python kullanarak Kafka konusu oluşturma

Kümeniz ve istemci makineniz hazır olduğunda, istemci makinenize SSH uygulayın ve Kafka Python'u ve Python için MSK IAM kitaplığını yükleyin.

pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • Adlı yeni bir dosya oluşturun createTopic.py.
  • Aşağıdaki kodu bu dosyaya kopyalayın ve yerine bootstrap_servers ve region kümenizin ayrıntılarını içeren bilgiler. Geri alma talimatları için bootstrap_servers MSK kümenize ilişkin bilgiler için bkz. Bir Amazon MSK kümesi için önyükleme aracılarını alma.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • Çalıştır createTopic.py adında yeni bir Kafka konusu oluşturmak için komut dosyası mytopic sunucusuz kümenizde:
python createTopic.py

Python kullanarak kayıtlar üretin

Bazı örnek modem telemetri verileri oluşturalım.

  • Adlı yeni bir dosya oluşturun kafkaDataGen.py.
  • Aşağıdaki kodu bu dosyaya kopyalayıp güncelleyin. BROKERS ve region Kümenizin ayrıntılarını içeren bilgiler:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • Çalıştır kafkaDataGen.py sürekli olarak rastgele veriler oluşturmak ve bunları belirtilen Kafka konusuna yayınlamak için:
python kafkaDataGen.py

Etkinlikleri Amazon S3'te saklayın

Artık tüm ham olay verilerini bir Amazon Basit Depolama Hizmeti (Amazon S3) analitik için veri gölü. ML modellerini eğitmek için aynı verileri kullanabilirsiniz. Amazon Data Firehose ile entegrasyon Amazon MSK'nın Apache Kafka kümelerinizdeki verileri S3 veri gölüne sorunsuz bir şekilde yüklemesine olanak tanır. Kendi bağlayıcı uygulamalarınızı oluşturma veya yönetme ihtiyacını ortadan kaldırarak Kafka'dan Amazon S3'e sürekli veri akışı sağlamak için aşağıdaki adımları tamamlayın:

  • Amazon S3 konsolunda yeni bir paket oluşturun. Mevcut bir paketi de kullanabilirsiniz.
  • S3 paketinizde adı verilen yeni bir klasör oluşturun streamingDataLake.
  • Amazon MSK konsolunda MSK Sunucusuz kümenizi seçin.
  • Üzerinde İşlemler menü seç Küme politikasını düzenle.

küme politikası

  • seç Firehose hizmet sorumlusunu dahil et Ve seç Değişiklikleri Kaydet.

itfaiye hortumu servis sorumlusu

  • Üzerinde S3 teslimatı sekmesini seçin Teslimat akışı oluştur.

teslimat akışı

  • İçin Kaynak, seçmek Amazon MSK'sı.
  • İçin Varış yeri, seçmek Amazon S3.

kaynak ve hedef

  • İçin Amazon MSK kümesi bağlantısıseçin Özel önyükleme komisyoncuları.
  • İçin konu, bir konu adı girin (bu gönderi için, mytopic).

kaynak ayarları

  • İçin S3 kepçe, seçmek Araştır ve S3 kovanızı seçin.
  • Keşfet streamingDataLake S3 kova önekiniz olarak.
  • Keşfet streamingDataLakeErr S3 grup hatası çıktı önekiniz olarak.

hedef ayarları

  • Klinik Teslimat akışı oluştur.

teslimat akışı oluştur

Verilerin S3 klasörünüze yazıldığını doğrulayabilirsiniz. Şunu görmelisiniz: streamingDataLake dizin oluşturuldu ve dosyalar bölümlerde saklandı.

amazon s3

Etkinlikleri DynamoDB'de saklayın

Son adımda en güncel modem verilerini DynamoDB'de saklarsınız. Bu, istemci uygulamasının modem durumuna erişmesine ve düşük gecikme ve yüksek kullanılabilirlik ile her yerden modemle uzaktan etkileşime girmesine olanak tanır. Lambda, Amazon MSK ile sorunsuz bir şekilde çalışır. Lambda, olay kaynağından gelen yeni mesajları dahili olarak yoklar ve ardından eşzamanlı olarak hedef Lambda işlevini çağırır. Lambda, mesajları toplu olarak okur ve bunları bir olay yükü olarak fonksiyonunuza sunar.

Öncelikle DynamoDB'de bir tablo oluşturalım. Bakınız DynamoDB API izinleri: Eylemler, kaynaklar ve koşullar referansı istemci makinenizin gerekli izinlere sahip olduğunu doğrulamak için.

  • Adlı yeni bir dosya oluşturun createTable.py.
  • Aşağıdaki kodu güncelleyerek dosyaya kopyalayın. region bilgi:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • Çalıştır createTable.py adında bir tablo oluşturmak için komut dosyası device_status DynamoDB'de:
python createTable.py

Şimdi Lambda fonksiyonunu yapılandıralım.

  • Lambda konsolunda şunu seçin: fonksiyonlar Gezinti bölmesinde.
  • Klinik İşlev oluştur.
  • seç Sıfırdan yazar.
  • İçin Fonksiyon adı¸ bir isim girin (örneğin, my-notification-kafka).
  • İçin Süre, seçmek Python 3.11.
  • İçin İzinlerseçin Mevcut bir rolü kullan ve bir rol seçin kümenizden okuma izinleri.
  • Fonksiyonu oluşturun.

Lambda işlevi yapılandırma sayfasında artık kaynakları, hedefleri ve uygulama kodunuzu yapılandırabilirsiniz.

  • Klinik Tetikleyici ekle.
  • İçin Tetik yapılandırması, girmek MSK Amazon MSK'yı Lambda kaynak işlevi için tetikleyici olarak yapılandırmak için.
  • İçin MSK kümesi, girmek myCluster.
  • Kaldırın Tetiği etkinleştir, çünkü henüz Lambda işlevinizi yapılandırmadınız.
  • İçin Parti boyutu, girmek 100.
  • İçin Başlangıç ​​pozisyonu, seçmek En Son Eklenenler.
  • İçin Konu adı¸ bir isim girin (örneğin, mytopic).
  • Klinik Ekle.
  • Lambda işlevi ayrıntıları sayfasında, Kod sekmesine aşağıdaki kodu girin:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • Lambda işlevini dağıtın.
  • Üzerinde yapılandırma sekmesini seçin Düzenle Tetikleyiciyi düzenlemek için.

tetikleyiciyi düzenle

  • Tetikleyiciyi seçin ve ardından İndirim.
  • DynamoDB konsolunda, Öğeleri keşfedin Gezinti bölmesinde.
  • tabloyu seçin device_status.

Lambda'nın Kafka konusunda oluşturulan olayları DynamoDB'ye yazdığını göreceksiniz.

ddb tablosu

Özet

Akış veri hatları, gerçek zamanlı uygulamalar oluşturmak için kritik öneme sahiptir. Ancak altyapıyı kurmak ve yönetmek göz korkutucu olabilir. Bu yazıda Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose ve diğer hizmetleri kullanarak AWS'de sunucusuz bir akış hattının nasıl oluşturulacağını anlattık. Temel avantajlar, yönetilecek sunucuların olmaması, altyapının otomatik ölçeklenebilirliği ve tamamen yönetilen hizmetleri kullanan kullandıkça öde modelidir.

Kendi gerçek zamanlı işlem hattınızı oluşturmaya hazır mısınız? Ücretsiz bir AWS hesabıyla bugün başlayın. Sunucusuzluğun gücüyle siz uygulama mantığınıza odaklanabilir, AWS ise farklı olmayan ağır işleri halleder. AWS'de muhteşem bir şey inşa edelim!


Yazarlar Hakkında

Masudur Rahaman Sayem AWS'de Akış Veri Mimarıdır. Gerçek dünyadaki iş sorunlarını çözmek için veri akışı mimarileri tasarlamak ve oluşturmak için dünyanın her yerindeki AWS müşterileriyle birlikte çalışır. Akışlı veri hizmetleri ve NoSQL kullanan çözümleri optimize etme konusunda uzmandır. Sayem, dağıtılmış bilgi işlem konusunda çok tutkulu.

Michael Oguike Amazon MSK'nın Ürün Yöneticisidir. Eylemi teşvik eden içgörüleri ortaya çıkarmak için verileri kullanma konusunda tutkulu. Çok çeşitli sektörlerden müşterilerin veri akışını kullanarak işlerini geliştirmelerine yardımcı olmaktan hoşlanıyor. Michael ayrıca kitaplardan ve podcast'lerden davranış bilimi ve psikoloji hakkında bilgi almayı da seviyor.

spot_img

En Son İstihbarat

spot_img