जेफिरनेट लोगो

Python | का उपयोग करके Amazon MSK पर Apache Kafka के साथ एक एंड-टू-एंड सर्वर रहित स्ट्रीमिंग पाइपलाइन बनाएं अमेज़न वेब सेवाएँ

दिनांक:

गेमिंग, खुदरा और वित्त से लेकर विनिर्माण, स्वास्थ्य सेवा और यात्रा तक वैश्विक स्तर पर उत्पन्न डेटा की मात्रा में वृद्धि जारी है। संगठन अपने व्यवसायों और ग्राहकों के लिए नवाचार करने के लिए डेटा के निरंतर प्रवाह का त्वरित उपयोग करने के और तरीके तलाश रहे हैं। उन्हें वास्तविक समय में डेटा को विश्वसनीय रूप से कैप्चर करना, संसाधित करना, विश्लेषण करना और असंख्य डेटा स्टोर में लोड करना होगा।

अपाचे काफ्का इन वास्तविक समय की स्ट्रीमिंग आवश्यकताओं के लिए एक लोकप्रिय विकल्प है। हालाँकि, अन्य डेटा प्रोसेसिंग घटकों के साथ काफ्का क्लस्टर स्थापित करना चुनौतीपूर्ण हो सकता है जो आपके एप्लिकेशन की आवश्यकताओं के आधार पर स्वचालित रूप से स्केल करते हैं। आप चरम ट्रैफ़िक के लिए कम प्रावधान करने का जोखिम उठाते हैं, जिससे डाउनटाइम हो सकता है, या बेस लोड के लिए अधिक प्रावधान हो सकता है, जिससे बर्बादी हो सकती है। AWS कई सर्वर रहित सेवाएँ प्रदान करता है जैसे Apache Kafka के लिए Amazon प्रबंधित स्ट्रीमिंग (अमेज़ॅन एमएसके), अमेज़ॅन डेटा फ़ायरहोज़, अमेज़ॅन डायनेमोडीबी, तथा AWS लाम्बा वह पैमाना आपकी आवश्यकताओं के आधार पर स्वचालित रूप से होता है।

इस पोस्ट में, हम बताते हैं कि आप इनमें से कुछ सेवाओं का उपयोग कैसे कर सकते हैं एमएसके सर्वर रहित, आपकी वास्तविक समय की जरूरतों को पूरा करने के लिए एक सर्वर रहित डेटा प्लेटफ़ॉर्म बनाने के लिए।

समाधान अवलोकन

आइए एक परिदृश्य की कल्पना करें. आप कई भौगोलिक क्षेत्रों में तैनात एक इंटरनेट सेवा प्रदाता के लिए हजारों मॉडेम के प्रबंधन के लिए जिम्मेदार हैं। आप मॉडेम कनेक्टिविटी गुणवत्ता की निगरानी करना चाहते हैं जिसका ग्राहक उत्पादकता और संतुष्टि पर महत्वपूर्ण प्रभाव पड़ता है। आपकी तैनाती में विभिन्न मॉडेम शामिल हैं जिन्हें न्यूनतम डाउनटाइम सुनिश्चित करने के लिए निगरानी और रखरखाव की आवश्यकता होती है। प्रत्येक डिवाइस हर सेकंड हजारों 1 केबी रिकॉर्ड प्रसारित करता है, जैसे सीपीयू उपयोग, मेमोरी उपयोग, अलार्म और कनेक्शन स्थिति। आप इस डेटा तक वास्तविक समय में पहुंच चाहते हैं ताकि आप वास्तविक समय में प्रदर्शन की निगरानी कर सकें, और समस्याओं का तुरंत पता लगा सकें और उन्हें कम कर सकें। पूर्वानुमानित रखरखाव मूल्यांकन चलाने, अनुकूलन के अवसर खोजने और मांग का पूर्वानुमान लगाने के लिए आपको मशीन लर्निंग (एमएल) मॉडल के लिए इस डेटा तक दीर्घकालिक पहुंच की भी आवश्यकता है।

आपके ग्राहक जो डेटा ऑनसाइट एकत्र करते हैं, वे पायथन में लिखे गए हैं, और वे सभी डेटा को अपाचे काफ्का विषयों के रूप में अमेज़ॅन एमएसके को भेज सकते हैं। अपने एप्लिकेशन की कम-विलंबता और वास्तविक समय डेटा पहुंच के लिए, आप इसका उपयोग कर सकते हैं लैम्ब्डा और डायनेमोडीबी. लंबी अवधि के डेटा भंडारण के लिए, आप प्रबंधित सर्वर रहित कनेक्टर सेवा का उपयोग कर सकते हैं अमेज़ॅन डेटा फ़ायरहोज़ अपने डेटा लेक में डेटा भेजने के लिए।

निम्नलिखित चित्र दिखाता है कि आप इस एंड-टू-एंड सर्वर रहित एप्लिकेशन को कैसे बना सकते हैं।

एंड-टू-एंड सर्वर रहित एप्लिकेशन

आइए इस आर्किटेक्चर को लागू करने के लिए निम्नलिखित अनुभागों में दिए गए चरणों का पालन करें।

Amazon MSK पर सर्वर रहित काफ्का क्लस्टर बनाएं

हम मॉडेम से वास्तविक समय टेलीमेट्री डेटा प्राप्त करने के लिए अमेज़ॅन एमएसके का उपयोग करते हैं। Amazon MSK पर सर्वर रहित काफ्का क्लस्टर बनाना सीधा है। इसका उपयोग करने में केवल कुछ मिनट लगते हैं एडब्ल्यूएस प्रबंधन कंसोल या एडब्ल्यूएस एसडीके। कंसोल का उपयोग करने के लिए, देखें MSK सर्वर रहित क्लस्टर का उपयोग करना प्रारंभ करना. आप एक सर्वर रहित क्लस्टर बनाते हैं, AWS पहचान और अभिगम प्रबंधन (IAM) भूमिका, और क्लाइंट मशीन।

पायथन का उपयोग करके एक काफ्का विषय बनाएं

जब आपका क्लस्टर और क्लाइंट मशीन तैयार हो जाए, तो अपने क्लाइंट मशीन पर एसएसएच करें और पायथन के लिए काफ्का पायथन और एमएसके आईएएम लाइब्रेरी स्थापित करें।

pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • नामक एक नई फ़ाइल बनाएँ createTopic.py.
  • इस फ़ाइल में निम्न कोड को प्रतिस्थापित करते हुए कॉपी करें bootstrap_servers और region आपके क्लस्टर के विवरण के साथ जानकारी। पुनः प्राप्त करने के निर्देशों के लिए bootstrap_servers आपके MSK क्लस्टर के लिए जानकारी, देखें Amazon MSK क्लस्टर के लिए बूटस्ट्रैप ब्रोकर प्राप्त करना.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • चलाएं createTopic.py एक नया काफ्का विषय बनाने के लिए स्क्रिप्ट कहा जाता है mytopic आपके सर्वर रहित क्लस्टर पर:
python createTopic.py

पायथन का उपयोग करके रिकॉर्ड तैयार करें

आइए कुछ नमूना मॉडेम टेलीमेट्री डेटा उत्पन्न करें।

  • नामक एक नई फ़ाइल बनाएँ kafkaDataGen.py.
  • अद्यतन करते हुए, निम्न कोड को इस फ़ाइल में कॉपी करें BROKERS और region आपके क्लस्टर के विवरण सहित जानकारी:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • चलाएं kafkaDataGen.py लगातार यादृच्छिक डेटा उत्पन्न करने और इसे निर्दिष्ट काफ्का विषय पर प्रकाशित करने के लिए:
python kafkaDataGen.py

Amazon S3 में इवेंट स्टोर करें

अब आप सभी कच्चे ईवेंट डेटा को एक में संग्रहीत करते हैं अमेज़न सरल भंडारण सेवा (अमेज़ॅन S3) एनालिटिक्स के लिए डेटा लेक। आप एमएल मॉडल को प्रशिक्षित करने के लिए उसी डेटा का उपयोग कर सकते हैं। अमेज़ॅन डेटा फ़ायरहोज़ के साथ एकीकरण अमेज़ॅन MSK को आपके अपाचे काफ्का क्लस्टर से डेटा को S3 डेटा लेक में निर्बाध रूप से लोड करने की अनुमति देता है। अपने स्वयं के कनेक्टर एप्लिकेशन बनाने या प्रबंधित करने की आवश्यकता को समाप्त करते हुए, काफ्का से अमेज़ॅन S3 तक डेटा को लगातार स्ट्रीम करने के लिए निम्नलिखित चरणों को पूरा करें:

  • Amazon S3 कंसोल पर, एक नई बकेट बनाएं। आप मौजूदा बाल्टी का भी उपयोग कर सकते हैं।
  • अपने S3 बकेट में एक नया फ़ोल्डर बनाएँ जिसे कहा जाता है streamingDataLake.
  • Amazon MSK कंसोल पर, अपना MSK सर्वर रहित क्लस्टर चुनें।
  • पर क्रियाएँ मेनू, चुनें क्लस्टर नीति संपादित करें.

क्लस्टर नीति

  • चुनते हैं फ़ायरहोज़ सेवा प्रिंसिपल शामिल करें और चुनें परिवर्तन सहेजें.

फ़ायरहोज़ सेवा प्रिंसिपल

  • पर S3 डिलिवरी टैब चुनें वितरण स्ट्रीम बनाएं.

वितरण धारा

  • के लिए स्रोत, चुनें अमेज़ॅन एमएसके.
  • के लिए गंतव्य, चुनें अमेज़न S3.

स्रोत और गंतव्य

  • के लिए अमेज़ॅन एमएसके क्लस्टर कनेक्टिविटी, चुनते हैं निजी बूटस्ट्रैप दलाल.
  • के लिए विषय, विषय का नाम दर्ज करें (इस पोस्ट के लिए, mytopic).

स्रोत सेटिंग्स

  • के लिए S3 बाल्टी, चुनें ब्राउज और अपनी S3 बाल्टी चुनें।
  • दर्ज streamingDataLake आपके S3 बकेट उपसर्ग के रूप में।
  • दर्ज streamingDataLakeErr आपके S3 बकेट त्रुटि आउटपुट उपसर्ग के रूप में।

गंतव्य सेटिंग्स

  • चुनें वितरण स्ट्रीम बनाएं.

डिलीवरी स्ट्रीम बनाएं

आप सत्यापित कर सकते हैं कि डेटा आपके S3 बकेट में लिखा गया था। आपको यह देखना चाहिए कि streamingDataLake निर्देशिका बनाई गई थी और फ़ाइलें विभाजन में संग्रहीत हैं।

अमेजन s3

DynamoDB में ईवेंट संग्रहीत करें

अंतिम चरण के लिए, आप नवीनतम मॉडेम डेटा को DynamoDB में संग्रहीत करते हैं। यह क्लाइंट एप्लिकेशन को मॉडेम स्थिति तक पहुंचने और कम विलंबता और उच्च उपलब्धता के साथ कहीं से भी मॉडेम के साथ बातचीत करने की अनुमति देता है। लैम्ब्डा अमेज़ॅन एमएसके के साथ सहजता से काम करता है. लैम्ब्डा आंतरिक रूप से इवेंट स्रोत से नए संदेशों के लिए सर्वेक्षण करता है और फिर समकालिक रूप से लक्ष्य लैम्ब्डा फ़ंक्शन को आमंत्रित करता है। लैम्ब्डा संदेशों को बैचों में पढ़ता है और इन्हें आपके फ़ंक्शन को इवेंट पेलोड के रूप में प्रदान करता है।

आइए सबसे पहले DynamoDB में एक तालिका बनाएं। को देखें DynamoDB API अनुमतियाँ: क्रियाएँ, संसाधन और शर्तें संदर्भ यह सत्यापित करने के लिए कि आपकी क्लाइंट मशीन के पास आवश्यक अनुमतियाँ हैं।

  • नामक एक नई फ़ाइल बनाएँ createTable.py.
  • अद्यतन करते हुए, निम्न कोड को फ़ाइल में कॉपी करें region जानकारी:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • चलाएं createTable.py तालिका बनाने के लिए स्क्रिप्ट कहा जाता है device_status डायनेमोडीबी में:
python createTable.py

अब लैम्ब्डा फ़ंक्शन को कॉन्फ़िगर करते हैं।

  • लैम्ब्डा कंसोल पर, चुनें कार्य नेविगेशन फलक में
  • चुनें फ़ंक्शन बनाएं.
  • चुनते हैं खरोंच से लेखक.
  • के लिए कार्य का नामएक नाम दर्ज करें (उदाहरण के लिए, my-notification-kafka).
  • के लिए क्रम, चुनें अजगर 3.11.
  • के लिए अनुमतियाँ, चुनते हैं किसी मौजूदा भूमिका का उपयोग करें और एक भूमिका चुनें आपके क्लस्टर से पढ़ने की अनुमति.
  • फ़ंक्शन बनाएं.

लैम्ब्डा फ़ंक्शन कॉन्फ़िगरेशन पृष्ठ पर, अब आप स्रोतों, गंतव्यों और अपने एप्लिकेशन कोड को कॉन्फ़िगर कर सकते हैं।

  • चुनें ट्रिगर जोड़ें.
  • के लिए ट्रिगर कॉन्फ़िगरेशन, दर्ज MSK लैम्ब्डा स्रोत फ़ंक्शन के लिए ट्रिगर के रूप में अमेज़ॅन एमएसके को कॉन्फ़िगर करने के लिए।
  • के लिए एमएसके क्लस्टर, दर्ज myCluster.
  • अचयनित ट्रिगर सक्रिय करें, क्योंकि आपने अभी तक अपना लैम्ब्डा फ़ंक्शन कॉन्फ़िगर नहीं किया है।
  • के लिए बैच का आकार, दर्ज 100.
  • के लिए शुरुआत का स्थान, चुनें Latest.
  • के लिए विषय का नामएक नाम दर्ज करें (उदाहरण के लिए, mytopic).
  • चुनें .
  • लैम्ब्डा फ़ंक्शन विवरण पृष्ठ पर, पर कोड टैब, निम्नलिखित कोड दर्ज करें:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • लैम्ब्डा फ़ंक्शन तैनात करें।
  • पर विन्यास टैब चुनें संपादित करें ट्रिगर को संपादित करने के लिए.

ट्रिगर संपादित करें

  • ट्रिगर चुनें, फिर चुनें सहेजें.
  • डायनॉम्बीडी कंसोल पर, चुनें वस्तुओं का अन्वेषण करें नेविगेशन फलक में
  • तालिका का चयन करें device_status.

आप देखेंगे कि लैम्ब्डा काफ्का विषय में उत्पन्न घटनाओं को DynamoDB पर लिख रहा है।

डीडीबी तालिका

सारांश

वास्तविक समय के अनुप्रयोगों के निर्माण के लिए स्ट्रीमिंग डेटा पाइपलाइन महत्वपूर्ण हैं। हालाँकि, बुनियादी ढांचे की स्थापना और प्रबंधन कठिन हो सकता है। इस पोस्ट में, हमने Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose और अन्य सेवाओं का उपयोग करके AWS पर सर्वर रहित स्ट्रीमिंग पाइपलाइन बनाने का तरीका बताया। मुख्य लाभ प्रबंधन के लिए कोई सर्वर नहीं होना, बुनियादी ढांचे की स्वचालित स्केलेबिलिटी और पूरी तरह से प्रबंधित सेवाओं का उपयोग करते हुए भुगतान करने वाला मॉडल है।

क्या आप अपनी स्वयं की रीयल-टाइम पाइपलाइन बनाने के लिए तैयार हैं? निःशुल्क AWS खाते के साथ आज ही शुरुआत करें। सर्वर रहित की शक्ति के साथ, आप अपने एप्लिकेशन लॉजिक पर ध्यान केंद्रित कर सकते हैं जबकि AWS अविभाजित भारी भारोत्तोलन को संभालता है। आइए AWS पर कुछ अद्भुत बनाएं!


लेखक के बारे में

मसूदुर रहमान सईम AWS में एक स्ट्रीमिंग डेटा आर्किटेक्ट है। वह वास्तविक दुनिया की व्यावसायिक समस्याओं को हल करने के लिए डेटा स्ट्रीमिंग आर्किटेक्चर के डिजाइन और निर्माण के लिए विश्व स्तर पर AWS ग्राहकों के साथ काम करता है। वह स्ट्रीमिंग डेटा सेवाओं और NoSQL का उपयोग करने वाले समाधानों को अनुकूलित करने में माहिर हैं। सईम डिस्ट्रीब्यूटेड कंप्यूटिंग को लेकर बहुत जुनूनी है।

माइकल ओगुइके अमेज़न MSK के लिए उत्पाद प्रबंधक हैं। उन्हें कार्रवाई को प्रेरित करने वाली अंतर्दृष्टि को उजागर करने के लिए डेटा का उपयोग करने का शौक है। उन्हें डेटा स्ट्रीमिंग का उपयोग करके विभिन्न प्रकार के उद्योगों के ग्राहकों को उनके व्यवसाय को बेहतर बनाने में मदद करने में आनंद आता है। माइकल को किताबों और पॉडकास्ट से व्यवहार विज्ञान और मनोविज्ञान के बारे में सीखना भी पसंद है।

स्पॉट_आईएमजी

नवीनतम खुफिया

स्पॉट_आईएमजी