제퍼넷 로고

Python을 사용하여 Amazon MSK에서 Apache Kafka로 엔드투엔드 서버리스 스트리밍 파이프라인 구축 | 아마존 웹 서비스

시간

전 세계적으로 생성되는 데이터의 양은 게임, 소매, 금융부터 제조, 의료, 여행에 이르기까지 계속해서 급증하고 있습니다. 조직은 지속적으로 유입되는 데이터를 신속하게 활용하여 비즈니스와 고객을 혁신할 수 있는 더 많은 방법을 찾고 있습니다. 그들은 데이터를 실시간으로 안정적으로 캡처, 처리, 분석하고 수많은 데이터 저장소에 로드해야 합니다.

Apache Kafka는 이러한 실시간 스트리밍 요구 사항에 널리 사용되는 선택입니다. 그러나 애플리케이션의 요구 사항에 따라 자동으로 확장되는 다른 데이터 처리 구성 요소와 함께 Kafka 클러스터를 설정하는 것은 어려울 수 있습니다. 최대 트래픽에 대한 과소 프로비저닝으로 인해 다운타임이 발생하거나 기본 로드에 대한 과잉 프로비저닝으로 인해 낭비가 발생할 위험이 있습니다. AWS는 다음과 같은 여러 서버리스 서비스를 제공합니다. Apache Kafka 용 Amazon Managed Streaming (아마존 MSK), 아마존 데이터 파이어호스, 아마존 DynamoDBAWS 람다 필요에 따라 자동으로 확장됩니다.

이 게시물에서는 다음을 포함하여 이러한 서비스 중 일부를 사용하는 방법을 설명합니다. MSK 서버리스, 실시간 요구 사항을 충족하는 서버리스 데이터 플랫폼을 구축합니다.

솔루션 개요

시나리오를 상상해 봅시다. 귀하는 여러 지역에 배포된 인터넷 서비스 공급자를 위해 수천 개의 모뎀을 관리할 책임이 있습니다. 고객 생산성과 만족도에 큰 영향을 미치는 모뎀 연결 품질을 모니터링하려고 합니다. 배포에는 가동 중지 시간을 최소화하기 위해 모니터링하고 유지 관리해야 하는 다양한 모뎀이 포함되어 있습니다. 각 장치는 CPU 사용량, 메모리 사용량, 알람, 연결 상태 등의 1KB 레코드 수천 개를 매초 전송합니다. 실시간으로 성능을 모니터링하고 문제를 신속하게 감지 및 완화할 수 있도록 이 데이터에 대한 실시간 액세스가 필요합니다. 또한 예측 유지 관리 평가를 실행하고, 최적화 기회를 찾고, 수요를 예측하려면 기계 학습(ML) 모델용 데이터에 장기간 액세스해야 합니다.

현장에서 데이터를 수집하는 클라이언트는 Python으로 작성되며 모든 데이터를 Apache Kafka 주제로 Amazon MSK에 보낼 수 있습니다. 애플리케이션의 짧은 대기 시간과 실시간 데이터 액세스를 위해 다음을 사용할 수 있습니다. 람다와 DynamoDB. 장기간 데이터 저장을 위해 관리형 서버리스 커넥터 서비스를 사용할 수 있습니다. 아마존 데이터 파이어호스 데이터 레이크로 데이터를 전송합니다.

다음 다이어그램은 이 엔드투엔드 서버리스 애플리케이션을 구축하는 방법을 보여줍니다.

엔드투엔드 서버리스 애플리케이션

이 아키텍처를 구현하려면 다음 섹션의 단계를 따르세요.

Amazon MSK에서 서버리스 Kafka 클러스터 생성

우리는 Amazon MSK를 사용하여 모뎀에서 실시간 원격 측정 데이터를 수집합니다. Amazon MSK에서는 서버리스 Kafka 클러스터를 생성하는 것이 간단합니다. 사용하는 데 몇 분 밖에 걸리지 않습니다. AWS 관리 콘솔 또는 AWS SDK. 콘솔을 사용하려면 다음을 참조하세요. MSK 서버리스 클러스터 사용 시작하기. 서버리스 클러스터를 생성합니다. AWS 자격 증명 및 액세스 관리 (IAM) 역할 및 클라이언트 머신.

Python을 사용하여 Kafka 주제 만들기

클러스터와 클라이언트 머신이 준비되면 SSH를 통해 클라이언트 머신에 연결하고 Kafka Python과 Python용 MSK IAM 라이브러리를 설치합니다.

pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • 라는 새 파일을 만듭니다 createTopic.py.
  • 다음 코드를 이 파일에 복사하여 bootstrap_serversregion 클러스터에 대한 세부정보가 포함된 정보입니다. 검색에 대한 지침은 bootstrap_servers MSK 클러스터에 대한 정보는 다음을 참조하세요. Amazon MSK 클러스터용 부트스트랩 브로커 가져오기.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))

  • 실행 createTopic.py 라는 새로운 Kafka 주제를 생성하는 스크립트 mytopic 서버리스 클러스터에서:
python createTopic.py

Python을 사용하여 레코드 생성

몇 가지 샘플 모뎀 원격 측정 데이터를 생성해 보겠습니다.

  • 라는 새 파일을 만듭니다 kafkaDataGen.py.
  • 다음 코드를 이 파일에 복사하여 업데이트합니다. BROKERSregion 클러스터 세부정보가 포함된 정보:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())

  • 실행 kafkaDataGen.py 지속적으로 임의의 데이터를 생성하고 이를 지정된 Kafka 주제에 게시하려면 다음을 수행합니다.
python kafkaDataGen.py

Amazon S3에 이벤트 저장

이제 모든 원시 이벤트 데이터를 아마존 단순 스토리지 서비스 (Amazon S3) 분석을 위한 데이터 레이크. 동일한 데이터를 사용하여 ML 모델을 교육할 수 있습니다. 그만큼 Amazon Data Firehose와의 통합 Amazon MSK를 사용하면 Apache Kafka 클러스터의 데이터를 S3 데이터 레이크로 원활하게 로드할 수 있습니다. 다음 단계를 완료하면 Kafka에서 Amazon S3로 데이터를 지속적으로 스트리밍하여 자체 커넥터 애플리케이션을 구축하거나 관리할 필요가 없습니다.

  • Amazon S3 콘솔에서 새 버킷을 생성합니다. 기존 버킷을 사용할 수도 있습니다.
  • S3 버킷에 다음이라는 새 폴더를 만듭니다. streamingDataLake.
  • Amazon MSK 콘솔에서 MSK 서버리스 클러스터를 선택합니다.
  • 행위 메뉴, 선택 클러스터 정책 수정.

클러스터 정책

  • 선택 Firehose 서비스 주체 포함 선택하고 변경 사항을 저장.

Firehose 서비스 주체

  • S3 배송 탭에서 배달 스트림 만들기.

배달 스트림

  • 럭셔리 출처선택한다. 아마존 MSK.
  • 럭셔리 목적지선택한다. 아마존 S3.

소스와 대상

  • 럭셔리 Amazon MSK 클러스터 연결, 고르다 개인 부트스트랩 브로커.
  • 럭셔리 주제, 주제 이름을 입력하세요(이 게시물의 경우 mytopic).

소스 설정

  • 럭셔리 S3 버킷선택한다. 검색 S3 버킷을 선택합니다.
  • 엔터 버튼 streamingDataLake S3 버킷 접두사로 사용됩니다.
  • 엔터 버튼 streamingDataLakeErr S3 버킷 오류 출력 접두사로 사용됩니다.

목적지 설정

  • 왼쪽 메뉴에서 배달 스트림 만들기.

전송 스트림 생성

데이터가 S3 버킷에 기록되었는지 확인할 수 있습니다. 당신은 streamingDataLake 디렉토리가 생성되고 파일이 파티션에 저장됩니다.

아마존 s3

DynamoDB에 이벤트 저장

마지막 단계에서는 가장 최근의 모뎀 데이터를 DynamoDB에 저장합니다. 이를 통해 클라이언트 응용 프로그램은 모뎀 상태에 액세스하고 낮은 대기 시간과 높은 가용성으로 어디서나 원격으로 모뎀과 상호 작용할 수 있습니다. Lambda는 Amazon MSK와 원활하게 작동합니다.. Lambda는 내부적으로 이벤트 소스의 새 메시지를 폴링한 다음 대상 Lambda 함수를 동기식으로 호출합니다. Lambda는 메시지를 일괄적으로 읽고 이를 이벤트 페이로드로 함수에 제공합니다.

먼저 DynamoDB에서 테이블을 생성해 보겠습니다. 인용하다 DynamoDB API 권한: 작업, 리소스 및 조건 참조 클라이언트 컴퓨터에 필요한 권한이 있는지 확인합니다.

  • 라는 새 파일을 만듭니다 createTable.py.
  • 다음 코드를 파일에 복사하여 업데이트합니다. region 정보 :
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • 실행 createTable.py 라는 테이블을 생성하는 스크립트 device_status DynamoDB에서:
python createTable.py

이제 Lambda 함수를 구성해 보겠습니다.

  • Lambda 콘솔에서 기능 탐색 창에서
  • 왼쪽 메뉴에서 기능 만들기.
  • 선택 처음부터 저자.
  • 럭셔리 기능 명¸ 이름을 입력하십시오 (예 : my-notification-kafka).
  • 럭셔리 런타임선택한다. 파이썬 3.11.
  • 럭셔리 권한, 고르다 기존 역할 사용 그리고 역할을 선택하세요 클러스터에서 읽을 수 있는 권한.
  • 함수를 만듭니다.

이제 Lambda 함수 구성 페이지에서 소스, 대상 및 애플리케이션 코드를 구성할 수 있습니다.

  • 왼쪽 메뉴에서 트리거 추가.
  • 럭셔리 트리거 구성, 입력 MSK Amazon MSK를 Lambda 소스 함수에 대한 트리거로 구성합니다.
  • 럭셔리 MSK 클러스터, 입력 myCluster.
  • 선택 취소 트리거 활성화, 아직 Lambda 함수를 구성하지 않았기 때문입니다.
  • 럭셔리 배치 크기, 입력 100.
  • 럭셔리 시작 위치선택한다. 최근 소식.
  • 럭셔리 주제 이름¸ 이름을 입력하십시오 (예 : mytopic).
  • 왼쪽 메뉴에서 추가.
  • Lambda 함수 세부 정보 페이지에서 암호 탭에서 다음 코드를 입력하십시오.
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")

  • Lambda 함수를 배포합니다.
  • 구성 탭에서 편집 트리거를 편집합니다.

트리거 편집

  • 트리거를 선택한 다음 찜하기.
  • DynamoDB 콘솔에서 항목 탐색 탐색 창에서
  • 표 선택 device_status.

Lambda가 Kafka 주제에서 생성된 이벤트를 DynamoDB에 쓰는 것을 볼 수 있습니다.

DDB 테이블

요약

스트리밍 데이터 파이프라인은 실시간 애플리케이션을 구축하는 데 중요합니다. 그러나 인프라를 설정하고 관리하는 것은 어려울 수 있습니다. 이 게시물에서는 Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose 및 기타 서비스를 사용하여 AWS에서 서버리스 스트리밍 파이프라인을 구축하는 방법을 살펴보았습니다. 주요 이점은 관리할 서버가 없으며 인프라의 자동 확장성, 완전 관리형 서비스를 사용하는 종량제 모델입니다.

자신만의 실시간 파이프라인을 구축할 준비가 되셨나요? 지금 무료 AWS 계정으로 시작해보세요. 서버리스의 강력한 기능을 사용하면 AWS가 획일적인 무거운 작업을 처리하는 동안 사용자는 애플리케이션 로직에 집중할 수 있습니다. AWS에서 멋진 것을 만들어 봅시다!


저자에 관하여

마수두르 라하마 사엠 AWS의 스트리밍 데이터 아키텍트입니다. 그는 실제 비즈니스 문제를 해결하기 위해 데이터 스트리밍 아키텍처를 설계하고 구축하기 위해 전 세계 AWS 고객과 협력합니다. 그는 스트리밍 데이터 서비스와 NoSQL을 사용하는 솔루션 최적화를 전문으로 합니다. Sayem은 분산 컴퓨팅에 매우 열정적입니다.

마이클 오기케 Amazon MSK의 제품 관리자입니다. 그는 데이터를 사용하여 행동을 유도하는 통찰력을 찾는 데 열정을 갖고 있습니다. 그는 다양한 업계의 고객이 데이터 스트리밍을 사용하여 비즈니스를 개선하도록 돕는 것을 좋아합니다. Michael은 또한 책과 팟캐스트를 통해 행동 과학과 심리학에 대해 배우는 것을 좋아합니다.

spot_img

최신 인텔리전스

spot_img