شعار زيفيرنت

أنشئ مسار تدفق شامل بدون خادم باستخدام Apache Kafka على Amazon MSK باستخدام Python | خدمات الويب الأمازون

التاريخ:

يستمر حجم البيانات التي يتم إنشاؤها عالميًا في الارتفاع، بدءًا من الألعاب وتجارة التجزئة والتمويل إلى التصنيع والرعاية الصحية والسفر. تبحث المؤسسات عن المزيد من الطرق لاستخدام التدفق المستمر للبيانات بسرعة للابتكار لأعمالها وعملائها. يتعين عليهم التقاط البيانات ومعالجتها وتحليلها وتحميلها بشكل موثوق في عدد لا يحصى من مخازن البيانات، كل ذلك في الوقت الفعلي.

يعد Apache Kafka خيارًا شائعًا لاحتياجات البث في الوقت الفعلي. ومع ذلك، قد يكون من الصعب إعداد مجموعة Kafka جنبًا إلى جنب مع مكونات معالجة البيانات الأخرى التي يتم ضبطها تلقائيًا وفقًا لاحتياجات التطبيق الخاص بك. أنت تخاطر بالنقص في التزويد لحركة المرور القصوى، مما قد يؤدي إلى التوقف، أو الإفراط في التزويد بالحمل الأساسي، مما يؤدي إلى الهدر. تقدم AWS خدمات متعددة بدون خادم مثل Amazon Managed Streaming لأباتشي كافكا (أمازون MSK) ، خرطوم بيانات أمازون, الأمازون DynamoDBو AWS لامدا هذا المقياس تلقائيًا حسب احتياجاتك.

وفي هذه التدوينة، نوضح كيف يمكنك استخدام بعض هذه الخدمات، بما في ذلك MSK بدون خادم، لإنشاء منصة بيانات بدون خادم لتلبية احتياجاتك في الوقت الفعلي.

حل نظرة عامة

دعونا نتخيل السيناريو. أنت مسؤول عن إدارة الآلاف من أجهزة المودم لموفر خدمة الإنترنت المنتشر عبر مناطق جغرافية متعددة. تريد مراقبة جودة اتصال المودم التي لها تأثير كبير على إنتاجية العملاء ورضاهم. يتضمن النشر الخاص بك أجهزة مودم مختلفة تحتاج إلى المراقبة والصيانة لضمان الحد الأدنى من وقت التوقف عن العمل. يرسل كل جهاز آلاف السجلات بحجم 1 كيلو بايت كل ثانية، مثل استخدام وحدة المعالجة المركزية واستخدام الذاكرة والإنذار وحالة الاتصال. أنت تريد الوصول في الوقت الفعلي إلى هذه البيانات حتى تتمكن من مراقبة الأداء في الوقت الفعلي واكتشاف المشكلات وتخفيفها بسرعة. تحتاج أيضًا إلى الوصول على المدى الطويل إلى هذه البيانات لنماذج التعلم الآلي (ML) لإجراء تقييمات الصيانة التنبؤية، والعثور على فرص التحسين، والتنبؤ بالطلب.

عملاؤك الذين يجمعون البيانات في الموقع مكتوبون بلغة Python، ويمكنهم إرسال جميع البيانات كمواضيع Apache Kafka إلى Amazon MSK. للوصول إلى البيانات في الوقت الفعلي وزمن الوصول المنخفض لتطبيقك، يمكنك استخدام لامدا ودينامو دي بي. لتخزين البيانات على المدى الطويل، يمكنك استخدام خدمة الموصل المُدارة بدون خادم خرطوم بيانات أمازون لإرسال البيانات إلى بحيرة البيانات الخاصة بك.

يوضح الرسم البياني التالي كيف يمكنك إنشاء هذا التطبيق الشامل بدون خادم.

تطبيق شامل بدون خادم

دعونا نتبع الخطوات الواردة في الأقسام التالية لتنفيذ هذه البنية.

قم بإنشاء مجموعة كافكا بدون خادم على Amazon MSK

نحن نستخدم Amazon MSK لاستيعاب بيانات القياس عن بعد في الوقت الفعلي من أجهزة المودم. يعد إنشاء مجموعة Kafka بدون خادم أمرًا سهلاً على Amazon MSK. يستغرق الأمر بضع دقائق فقط باستخدام وحدة تحكم إدارة AWS أو AWS SDK. لاستخدام وحدة التحكم، راجع الشروع في استخدام مجموعات MSK Serverless. يمكنك إنشاء مجموعة بدون خادم، إدارة الهوية والوصول AWS (IAM) الدور وجهاز العميل.

قم بإنشاء موضوع كافكا باستخدام بايثون

عندما يصبح نظام المجموعة وجهاز العميل لديك جاهزين، قم بتوصيل SSH إلى جهاز العميل الخاص بك وقم بتثبيت Kafka Python ومكتبة MSK IAM لـ Python.

  • قم بتشغيل الأوامر التالية لتثبيت Kafka Python و مكتبة MSK IAM:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • إنشاء ملف جديد يسمى createTopic.py.
  • انسخ الكود التالي في هذا الملف، مع استبدال bootstrap_servers و region المعلومات مع تفاصيل مجموعتك. للحصول على تعليمات حول استرداد bootstrap_servers معلومات عن مجموعة MSK الخاصة بك، راجع الحصول على وسطاء التمهيد لمجموعة Amazon MSK.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • تشغيل createTopic.py البرنامج النصي لإنشاء موضوع كافكا جديد يسمى mytopic على مجموعتك بدون خادم:
python createTopic.py

إنتاج السجلات باستخدام بايثون

لنقم بإنشاء بعض نماذج بيانات القياس عن بعد للمودم.

  • إنشاء ملف جديد يسمى kafkaDataGen.py.
  • انسخ الكود التالي في هذا الملف لتحديث ملف BROKERS و region المعلومات مع تفاصيل مجموعتك:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • تشغيل kafkaDataGen.py لإنشاء بيانات عشوائية بشكل مستمر ونشرها على موضوع كافكا المحدد:
python kafkaDataGen.py

قم بتخزين الأحداث في Amazon S3

الآن يمكنك تخزين جميع بيانات الأحداث الأولية في ملف خدمة تخزين أمازون البسيطة (Amazon S3) بحيرة البيانات للتحليلات. يمكنك استخدام نفس البيانات لتدريب نماذج تعلم الآلة. ال التكامل مع Amazon Data Firehose يسمح لـ Amazon MSK بتحميل البيانات بسلاسة من مجموعات Apache Kafka الخاصة بك إلى مستودع بيانات S3. أكمل الخطوات التالية لتدفق البيانات بشكل مستمر من Kafka إلى Amazon S3، مما يلغي الحاجة إلى إنشاء تطبيقات الموصل الخاصة بك أو إدارتها:

  • على وحدة تحكم Amazon S3، قم بإنشاء حاوية جديدة. يمكنك أيضًا استخدام مجموعة موجودة.
  • قم بإنشاء مجلد جديد في مجموعة S3 الخاصة بك يسمى streamingDataLake.
  • في وحدة تحكم Amazon MSK، اختر مجموعة MSK Serverless الخاصة بك.
  • على الإجراءات القائمة، اختر تحرير نهج الكتلة.

سياسة الكتلة

  • أختار تضمين مدير خدمة Firehose واختر حفظ التغييرات.

مدير خدمة خرطوم الإطفاء

  • على تسليم S3 علامة التبويب، اختر إنشاء دفق التسليم.

تيار التسليم

  • في حالة مصدر، اختر أمازون MSK.
  • في حالة الرحلات، اختر الأمازون S3.

المصدر والوجهة

  • في حالة اتصال مجموعة Amazon MSK، حدد وسطاء التمهيد الخاصين.
  • في حالة موضوع، أدخل اسم الموضوع (لهذه المشاركة، mytopic).

إعدادات المصدر

  • في حالة دلو S3، اختر تصفح واختر دلو S3 الخاص بك.
  • أدخل streamingDataLake كبادئة دلو S3 الخاصة بك.
  • أدخل streamingDataLakeErr كبادئة إخراج خطأ دلو S3 الخاص بك.

إعدادات الوجهة

  • اختار إنشاء دفق التسليم.

إنشاء دفق التسليم

يمكنك التحقق من كتابة البيانات في حاوية S3 الخاصة بك. يجب أن ترى أن streamingDataLake تم إنشاء الدليل ويتم تخزين الملفات في الأقسام.

الأمازون s3

تخزين الأحداث في DynamoDB

بالنسبة للخطوة الأخيرة، تقوم بتخزين أحدث بيانات المودم في DynamoDB. يتيح ذلك لتطبيق العميل الوصول إلى حالة المودم والتفاعل مع المودم عن بعد من أي مكان، مع زمن وصول منخفض وتوافر عالي. تعمل Lambda بسلاسة مع Amazon MSK. تستقصي Lambda داخليًا الرسائل الجديدة من مصدر الحدث، ثم تستدعي وظيفة Lambda المستهدفة بشكل متزامن. تقرأ Lambda الرسائل على دفعات وتوفرها لوظيفتك كحمولة للحدث.

لنقم أولاً بإنشاء جدول في DynamoDB. تشير إلى أذونات DynamoDB API: مرجع الإجراءات والموارد والشروط للتحقق من أن جهاز العميل الخاص بك لديه الأذونات اللازمة.

  • إنشاء ملف جديد يسمى createTable.py.
  • انسخ الكود التالي في الملف، وقم بتحديث ملف region المعلومات:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • تشغيل createTable.py البرنامج النصي لإنشاء جدول يسمى device_status في دينامو دي بي:
python createTable.py

الآن دعونا نقوم بتكوين وظيفة Lambda.

  • في وحدة تحكم لامدا ، اختر وظائف في جزء التنقل.
  • اختار خلق وظيفة.
  • أختار مؤلف من الصفر.
  • في حالة اسم وظيفة¸ أدخل اسمًا (على سبيل المثال ، my-notification-kafka).
  • في حالة وقت التشغيل، اختر بيثون 3.11.
  • في حالة أذونات، حدد استخدم دورًا موجودًا واختيار دور مع أذونات القراءة من مجموعتك.
  • قم بإنشاء الوظيفة.

في صفحة تكوين وظيفة Lambda، يمكنك الآن تكوين المصادر والوجهات ورمز التطبيق الخاص بك.

  • اختار إضافة الزناد.
  • في حالة تكوين الزناد، أدخل MSK لتكوين Amazon MSK كمشغل لوظيفة مصدر Lambda.
  • في حالة كتلة MSK، أدخل myCluster.
  • إلغاء تنشيط الزناد، لأنك لم تقم بتكوين وظيفة Lambda الخاصة بك بعد.
  • في حالة حجم الدفعة، أدخل 100.
  • في حالة وضع البداية، اختر الأحدث.
  • في حالة اسم الموضوع¸ أدخل اسمًا (على سبيل المثال ، mytopic).
  • اختار أضف.
  • في صفحة تفاصيل وظيفة Lambda، على رمز علامة التبويب ، أدخل الرمز التالي:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • نشر وظيفة لامدا.
  • على الاعداد علامة التبويب، اختر تعديل لتحرير الزناد.

تحرير الزناد

  • حدد المشغل، ثم اختر حفظ.
  • في وحدة تحكم DynamoDB ، اختر استكشف العناصر في جزء التنقل.
  • حدد الجدول device_status.

ستلاحظ أن Lambda تكتب الأحداث التي تم إنشاؤها في موضوع Kafka إلى DynamoDB.

جدول دي دي بي

نبذة عامة

يعد تدفق خطوط أنابيب البيانات أمرًا بالغ الأهمية لبناء التطبيقات في الوقت الفعلي. ومع ذلك، قد يكون إنشاء البنية التحتية وإدارتها أمرًا شاقًا. في هذا المنشور، تناولنا كيفية إنشاء مسار تدفق بدون خادم على AWS باستخدام Amazon MSK، وLambda، وDynamoDB، وAmazon Data Firehose، وخدمات أخرى. وتتمثل المزايا الرئيسية في عدم وجود خوادم يمكن إدارتها، وقابلية التوسع التلقائي للبنية الأساسية، ونموذج الدفع أولاً بأول باستخدام خدمات مُدارة بالكامل.

هل أنت مستعد لبناء خط الأنابيب الخاص بك في الوقت الفعلي؟ ابدأ اليوم باستخدام حساب AWS مجاني. بفضل قوة العمل بدون خادم، يمكنك التركيز على منطق التطبيق الخاص بك بينما تتعامل AWS مع الأحمال الثقيلة غير المتمايزة. دعونا نبني شيئًا رائعًا على AWS!


حول المؤلف

مسعود رحمن صايم هو مهندس بيانات متدفقة في AWS. إنه يعمل مع عملاء AWS على مستوى العالم لتصميم وبناء بنيات تدفق البيانات لحل مشاكل الأعمال في العالم الحقيقي. إنه متخصص في تحسين الحلول التي تستخدم خدمات تدفق البيانات و NoSQL. سايم شغوف جدًا بالحوسبة الموزعة.

مايكل أوجويكي هو مدير المنتج لشركة Amazon MSK. إنه متحمس لاستخدام البيانات للكشف عن الرؤى التي تدفع العمل. إنه يستمتع بمساعدة العملاء من مجموعة واسعة من الصناعات على تحسين أعمالهم باستخدام تدفق البيانات. يحب مايكل أيضًا التعرف على العلوم السلوكية وعلم النفس من الكتب والبودكاست.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة