제퍼넷 로고

AWS Glue, AWS DMS 및 Amazon DynamoDB를 사용하여 실시간 서버리스 데이터 분석을 위해 스트리밍 데이터 소스를 CDC 데이터와 결합 | 아마존 웹 서비스

시간

고객은 기존의 분석 작업을 수행하기 위해 데이터 웨어하우징 솔루션을 사용해 왔습니다. 최근 데이터 레이크는 확장성, 내결함성, 정형, 반정형 및 비정형 데이터 세트 지원과 같은 이점을 제공하기 때문에 분석 솔루션의 기반이 되기 위해 많은 주목을 받고 있습니다.

데이터 레이크는 기본적으로 트랜잭션이 아닙니다. 그러나 ACID 속성으로 데이터 레이크를 향상시키는 여러 오픈 소스 프레임워크가 있어 트랜잭션 및 비트랜잭션 스토리지 메커니즘 간에 최상의 솔루션을 제공합니다.

데이터 정리 및 참조 데이터와의 조인과 같은 작업이 포함된 기존의 배치 수집 및 처리 파이프라인은 만들기가 간단하고 유지 관리 비용이 효율적입니다. 그러나 거의 실시간 제공 SLA를 통해 빠른 속도로 사물 인터넷(IoT) 및 클릭스트림과 같은 데이터 세트를 수집하는 데 어려움이 있습니다. 또한 원본 시스템에서 대상 시스템으로 변경 데이터 캡처(CDC)를 사용하여 증분 업데이트를 적용할 수 있습니다. 적시에 데이터 기반 의사 결정을 내리려면 특히 참조 데이터도 빠르게 변경되는 경우 누락된 레코드 및 배압을 설명하고 이벤트 순서 및 무결성을 유지해야 합니다.

이 게시물에서는 이러한 문제를 해결하는 것을 목표로 합니다. 다음을 사용하여 실시간으로 변경되는 참조 테이블에 스트리밍 데이터를 조인하는 단계별 가이드를 제공합니다. AWS 접착제, 아마존 DynamoDBAWS 데이터베이스 마이그레이션 서비스 (AWS DMS). 또한 다음을 사용하여 스트리밍 데이터를 트랜잭션 데이터 레이크로 수집하는 방법을 시연합니다. 아파치 후디 ACID 트랜잭션으로 증분 업데이트를 달성합니다.

솔루션 개요

예시 사용 사례의 경우 스트리밍 데이터가 Amazon Kinesis 데이터 스트림, 참조 데이터는 MySQL에서 관리합니다. 참조 데이터는 AWS DMS를 통해 MySQL에서 DynamoDB로 지속적으로 복제됩니다. 여기서 요구 사항은 거의 실시간으로 참조 데이터와 조인하여 실시간 스트림 데이터를 강화하고 다음과 같은 쿼리 엔진에서 쿼리할 수 있도록 만드는 것입니다. 아마존 아테나 일관성을 유지하면서. 이 사용 사례에서는 요구 사항이 변경될 때 MySQL의 참조 데이터를 업데이트할 수 있으며 쿼리는 참조 데이터에 업데이트를 반영하여 결과를 반환해야 합니다.

이 솔루션은 참조 데이터 세트의 크기가 작을 때 변경되는 참조 데이터 세트로 스트림에 참여하려는 사용자의 문제를 해결합니다. 참조 데이터는 DynamoDB 테이블에서 유지 관리되며 스트리밍 작업은 각 마이크로 배치에 대해 전체 테이블을 메모리에 로드하여 높은 처리량 스트림을 작은 참조 데이터 세트에 결합합니다.

다음 다이어그램은 솔루션 아키텍처를 보여줍니다.

아키텍처

사전 조건

이 연습에서는 다음과 같은 전제 조건이 있어야합니다.

IAM 역할 및 S3 버킷 생성

이 섹션에서는 아마존 단순 스토리지 서비스 (Amazon S3) 버킷 및 XNUMX개 AWS 자격 증명 및 액세스 관리 (IAM) 역할: 하나는 AWS Glue 작업용이고 다른 하나는 AWS DMS용입니다. 우리는 이것을 사용하여 AWS 클라우드 포메이션 주형. 다음 단계를 완료하십시오.

  1. AWS CloudFormation 콘솔에 로그인합니다.
  2. 왼쪽 메뉴에서 발사 스택::
  3. 왼쪽 메뉴에서 다음 보기.
  4. 럭셔리 스택 이름스택 이름을 입력하십시오.
  5. 럭셔리 DynamoDB 테이블 이름, 입력 tgt_country_lookup_table. 새 DynamoDB 테이블의 이름입니다.
  6. 럭셔리 S3BucketName접두사, 새 S3 버킷의 접두사를 입력합니다.
  7. 선택 AWS CloudFormation이 사용자 지정 이름으로 IAM 리소스를 생성 할 수 있음을 인정합니다.
  8. 왼쪽 메뉴에서 스택 생성.

스택 생성에는 약 1분이 소요될 수 있습니다.

Kinesis 데이터 스트림 생성

이 섹션에서는 Kinesis 데이터 스트림을 생성합니다.

  1. Kinesis 콘솔에서 다음을 선택합니다. 데이터 스트림 탐색 창에서
  2. 왼쪽 메뉴에서 데이터 스트림 생성.
  3. 럭셔리 데이터 스트림 이름, 스트림 이름을 입력하십시오.
  4. 나머지 설정은 기본값으로두고 데이터 스트림 생성.

Kinesis 데이터 스트림은 온디맨드 모드로 생성됩니다.

Aurora MySQL 클러스터 생성 및 구성

이 섹션에서는 Aurora MySQL 클러스터를 소스 데이터베이스로 생성하고 구성합니다. 첫 번째, CDC를 활성화하도록 소스 Aurora MySQL 데이터베이스 클러스터 구성 AWS DMS를 통해 DynamoDB로.

파라미터 그룹 생성

새 파라미터 그룹을 생성하려면 다음 단계를 완료하십시오.

  1. Amazon RDS 콘솔에서 다음을 선택합니다. 매개변수 그룹 탐색 창에서
  2. 왼쪽 메뉴에서 매개변수 그룹 만들기.
  3. 럭셔리 매개변수 그룹 제품군, 고르다 aurora-mysql5.7.
  4. 럭셔리 타입선택한다. DB 클러스터 파라미터 그룹.
  5. 럭셔리 그룹 이름, 입력 my-mysql-dynamodb-cdc.
  6. 럭셔리 상품 설명, 입력 Parameter group for demo Aurora MySQL database.
  7. 왼쪽 메뉴에서 만들기.
  8. 선택 my-mysql-dynamodb-cdc, 선택 편집 아래에 매개변수 그룹 작업.
  9. 다음과 같이 매개변수 그룹을 편집합니다.
성함 가치관
binlog_row_image 가득 찬
binlog_format
binlog_checksum 없음
log_slave_updates 1
  1. 왼쪽 메뉴에서 변경 사항을 저장.

RDS 매개변수 그룹

Aurora MySQL 클러스터 생성

Aurora MySQL 클러스터를 생성하려면 다음 단계를 완료하십시오.

  1. Amazon RDS 콘솔에서 다음을 선택합니다. 데이터베이스 탐색 창에서
  2. 왼쪽 메뉴에서 데이터베이스 생성.
  3. 럭셔리 데이터베이스 생성 방법 선택선택한다. 표준 생성.
  4. $XNUMX Million 미만 엔진 옵션에 대한 엔진 종류선택한다. 오로라(MySQL 호환).
  5. 럭셔리 엔진 버전선택한다. 오로라(MySQL 5.7) 2.11.2.
  6. 럭셔리 Canva의 제작된 채널아트 템플릿을선택한다. 생산.
  7. $XNUMX Million 미만 설정에 대한 DB 클러스터 식별자, 데이터베이스 이름을 입력하십시오.
  8. 럭셔리 마스터 사용자 이름, 기본 사용자 이름을 입력하십시오.
  9. 럭셔리 마스터 비밀번호마스터 비밀번호 확인, 기본 비밀번호를 입력하십시오.
  10. $XNUMX Million 미만 인스턴스 구성에 대한 DB 인스턴스 클래스선택한다. 버스트 가능한 클래스(t 클래스 포함) 선택하고 db.t3.소형.
  11. $XNUMX Million 미만 가용성 및 내구성에 대한 다중 AZ 배포선택한다. Aurora 복제본을 생성하지 마십시오.
  12. $XNUMX Million 미만 입/출력 라인에 대한 컴퓨팅 리소스선택한다. EC2 컴퓨팅 리소스에 연결하지 마십시오..
  13. 럭셔리 네트워크 유형선택한다. IPv4.
  14. 럭셔리 가상 사설 클라우드(VPC), VPC를 선택합니다.
  15. 럭셔리 DB 서브넷 그룹, 퍼블릭 서브넷을 선택합니다.
  16. 럭셔리 공공 액세스선택한다. 가능.
  17. 럭셔리 VPC 보안 그룹(방화벽), 퍼블릭 서브넷에 대한 보안 그룹을 선택합니다.
  18. $XNUMX Million 미만 데이터베이스 인증에 대한 데이터베이스 인증 옵션선택한다. 비밀번호 인증.
  19. $XNUMX Million 미만 추가 구성에 대한 DB 클러스터 파라미터 그룹에서 이전에 생성한 클러스터 파라미터 그룹을 선택합니다.
  20. 왼쪽 메뉴에서 데이터베이스 생성.

원본 데이터베이스에 권한 부여

다음 단계는 소스 Aurora MySQL 데이터베이스에 필요한 권한을 부여하는 것입니다. 이제 다음을 사용하여 DB 클러스터에 연결할 수 있습니다. MySQL 유틸리티. 쿼리를 실행하여 다음 작업을 완료할 수 있습니다.

  • 데모 데이터베이스 및 테이블 생성 및 데이터에 대한 쿼리 실행
  • AWS DMS 엔드포인트에서 사용하는 사용자에게 권한 부여

다음 단계를 완료하십시오.

  1. DB 클러스터에 연결하는 데 사용 중인 EC2 인스턴스에 로그인합니다.
  2. 명령 프롬프트에 다음 명령을 입력하여 DB 클러스터의 기본 DB 인스턴스에 연결합니다.
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p

  1. 다음 SQL 명령을 실행하여 데이터베이스를 생성합니다.
> CREATE DATABASE mydev;

  1. 다음 SQL 명령을 실행하여 테이블을 생성합니다.
> use mydev; > CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);

  1. 다음 SQL 명령을 실행하여 테이블을 데이터로 채웁니다.
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');

  1. 다음 SQL 명령을 실행하여 AWS DMS 엔드포인트에 대한 사용자를 생성하고 CDC 작업에 대한 권한 부여 (자리 표시자를 원하는 비밀번호로 교체):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

DynamoDB 참조 테이블에 데이터를 로드하도록 AWS DMS 리소스 생성 및 구성

이 단원에서는 DynamoDB 참조 테이블에 데이터를 복제하도록 AWS DMS를 생성하고 구성합니다.

AWS DMS 복제 인스턴스 생성

먼저 다음 단계를 완료하여 AWS DMS 복제 인스턴스를 생성합니다.

  1. AWS DMS 콘솔에서 다음을 선택합니다. 복제 인스턴스 탐색 창에서
  2. 왼쪽 메뉴에서 복제 인스턴스 생성.
  3. $XNUMX Million 미만 설정에 대한 성함, 인스턴스 이름을 입력합니다.
  4. $XNUMX Million 미만 인스턴스 구성에 대한 고 가용성선택한다. 개발 또는 테스트 워크로드(단일 AZ).
  5. $XNUMX Million 미만 연결 및 보안에 대한 VPC 보안 그룹선택한다. 디폴트 값.
  6. 왼쪽 메뉴에서 복제 인스턴스 생성.

Amazon VPC 엔드포인트 생성

선택적으로 생성할 수 있습니다. DynamoDB용 Amazon VPC 엔드포인트 프라이빗 네트워크의 AWS DMS 인스턴스에서 DynamoDB 테이블에 연결해야 하는 경우. 또한 활성화했는지 확인하십시오. 공개적으로 접근 가능 VPC 외부의 데이터베이스에 연결해야 하는 경우.

AWS DMS 소스 엔드포인트 생성

다음 단계를 완료하여 AWS DMS 소스 엔드포인트를 생성합니다.

  1. AWS DMS 콘솔에서 다음을 선택합니다. 종점 탐색 창에서
  2. 왼쪽 메뉴에서 엔드 포인트 생성.
  3. 럭셔리 끝점 유형선택한다. 소스 엔드포인트.
  4. $XNUMX Million 미만 끝점 구성에 대한 끝점 식별자, 끝점의 이름을 입력합니다.
  5. 럭셔리 소스 엔진선택한다. 아마존 오로라 MySQL.
  6. 럭셔리 엔드포인트 데이터베이스에 대한 액세스선택한다. 수동으로 접근 정보 제공.
  7. 럭셔리 서버 이름, Aurora 작성자 인스턴스의 엔드포인트 이름을 입력합니다(예: mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. 럭셔리 포트, 입력 3306.
  9. 럭셔리 사용자 이름에서 AWS DMS 작업의 사용자 이름을 입력합니다.
  10. 럭셔리 비밀번호, 암호를 입력하십시오.
  11. 왼쪽 메뉴에서 엔드 포인트 생성.

AWS DMS 대상 엔드포인트 생성

다음 단계를 완료하여 AWS DMS 대상 엔드포인트를 생성합니다.

  1. AWS DMS 콘솔에서 다음을 선택합니다. 종점 탐색 창에서
  2. 왼쪽 메뉴에서 엔드 포인트 생성.
  3. 럭셔리 끝점 유형선택한다. 대상 엔드포인트.
  4. $XNUMX Million 미만 끝점 구성에 대한 끝점 식별자, 끝점의 이름을 입력합니다.
  5. 럭셔리 대상 엔진선택한다. 아마존 DynamoDB.
  6. 럭셔리 서비스 액세스 역할 ARN, AWS DMS 작업에 대한 IAM 역할을 입력합니다.
  7. 왼쪽 메뉴에서 엔드 포인트 생성.

AWS DMS 마이그레이션 작업 생성

다음 단계를 완료하여 AWS DMS 데이터베이스 마이그레이션 작업을 생성합니다.

  1. AWS DMS 콘솔에서 다음을 선택합니다. 데이터베이스 마이그레이션 작업 탐색 창에서
  2. 왼쪽 메뉴에서 작업 만들기.
  3. $XNUMX Million 미만 작업 구성에 대한 작업 식별자, 작업 이름을 입력합니다.
  4. 럭셔리 복제 인스턴스, 복제 인스턴스를 선택합니다.
  5. 럭셔리 소스 데이터베이스 엔드포인트, 소스 엔드포인트를 선택합니다.
  6. 럭셔리 대상 데이터베이스 끝점, 대상 엔드포인트를 선택합니다.
  7. 럭셔리 마이그레이션 유형선택한다. 기존 데이터 마이그레이션 및 지속적인 변경 복제.
  8. $XNUMX Million 미만 작업 설정에 대한 대상 테이블 준비 모드선택한다. 아무것도하지 않고.
  9. 럭셔리 전체 로드 완료 후 작업 중지선택한다. 멈추지 마.
  10. 럭셔리 LOB 열 설정선택한다. 제한된 LOB 모드.
  11. 럭셔리 작업 로그, 활성화 CloudWatch 로그 켜기일괄 최적화 적용 켜기.
  12. $XNUMX Million 미만 테이블 매핑선택한다. JSON 편집기 다음 규칙을 입력합니다.

여기에서 열에 값을 추가할 수 있습니다. 다음 규칙을 사용하여 AWS DMS CDC 작업은 먼저 지정된 이름으로 새 DynamoDB 테이블을 생성합니다. target-table-name. 그런 다음 모든 레코드를 복제합니다. DB 테이블의 열을 DynamoDB 테이블의 속성에 매핑.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "2", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "target-table-name": "tgt_country_lookup_table", "mapping-parameters": { "partition-key-name": "code", "sort-key-name": "countryname", "exclude-columns": [ "code", "countryname" ], "attribute-mappings": [ { "target-attribute-name": "code", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${code}" }, { "target-attribute-name": "countryname", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${countryname}" } ], "apply-during-cdc": true } } ]
}

DMS 테이블 매핑

  1. 왼쪽 메뉴에서 작업 만들기.

이제 AWS DMS 복제 작업이 시작되었습니다.

  1. 기다려라. Status 로 표시 로드 완료.

DMS 작업

  1. DynamoDB 콘솔에서 테이블 탐색 창에서
  2. DynamoDB 참조 테이블을 선택하고 다음을 선택합니다. 테이블 항목 탐색 복제된 레코드를 검토합니다.

DynamoDB 참조 테이블 초기

AWS Glue 데이터 카탈로그 테이블 및 AWS Glue 스트리밍 ETL 작업 생성

이 단원에서는 AWS Glue 데이터 카탈로그 테이블과 AWS Glue 스트리밍 추출, 변환 및 로드(ETL) 작업을 생성합니다.

데이터 카탈로그 테이블 생성

다음 단계에 따라 원본 Kinesis 데이터 스트림에 대한 AWS Glue 데이터 카탈로그 테이블을 생성합니다.

  1. AWS Glue 콘솔에서 데이터베이스 아래에 데이터 카탈로그 탐색 창에서
  2. 왼쪽 메뉴에서 데이터베이스 추가.
  3. 럭셔리 성함, 입력 my_kinesis_db.
  4. 왼쪽 메뉴에서 데이터베이스 생성.
  5. 왼쪽 메뉴에서 테이블 아래에 데이터베이스다음을 선택 테이블 추가.
  6. 럭셔리 성함, 입력 my_stream_src_table.
  7. 럭셔리 데이터베이스선택한다. my_kinesis_db.
  8. 럭셔리 소스 유형 선택선택한다. 운동성.
  9. 럭셔리 Kinesis 데이터 스트림은 다음 위치에 있습니다.선택한다. 내 계정.
  10. 럭셔리 Kinesis 스트림 이름, 데이터 스트림의 이름을 입력합니다.
  11. 럭셔리 분류, 고르다 JSON.
  12. 왼쪽 메뉴에서 다음 보기.
  13. 왼쪽 메뉴에서 스키마를 JSON으로 편집, 다음 JSON을 입력한 다음 선택 찜하기.
[ { "Name": "uuid", "Type": "string", "Comment": "" }, { "Name": "country", "Type": "string", "Comment": "" }, { "Name": "itemtype", "Type": "string", "Comment": "" }, { "Name": "saleschannel", "Type": "string", "Comment": "" }, { "Name": "orderpriority", "Type": "string", "Comment": "" }, { "Name": "orderdate", "Type": "string", "Comment": "" }, { "Name": "region", "Type": "string", "Comment": "" }, { "Name": "shipdate", "Type": "string", "Comment": "" }, { "Name": "unitssold", "Type": "string", "Comment": "" }, { "Name": "unitprice", "Type": "string", "Comment": "" }, { "Name": "unitcost", "Type": "string", "Comment": "" }, { "Name": "totalrevenue", "Type": "string", "Comment": "" }, { "Name": "totalcost", "Type": "string", "Comment": "" }, { "Name": "totalprofit", "Type": "string", "Comment": "" }, { "Name": "impressiontime", "Type": "string", "Comment": "" }
]

접착제 카탈로그 테이블 스키마

    1. 왼쪽 메뉴에서 다음 보기다음을 선택 만들기.

AWS Glue 스트리밍 ETL 작업 생성

다음으로 AWS Glue 스트리밍 작업을 생성합니다. AWS Glue 3.0 이상은 기본적으로 Apache Hudi를 지원합니다., 그래서 우리는 이 기본 통합을 사용하여 Hudi 테이블로 수집합니다. AWS Glue 스트리밍 작업을 생성하려면 다음 단계를 완료하십시오.

  1. AWS Glue Studio 콘솔에서 다음을 선택합니다. 스파크 스크립트 편집기 선택하고 만들기.
  2. $XNUMX Million 미만 직업 세부 정보 탭, 성함, 작업 이름을 입력하십시오.
  3. 럭셔리 IAM 역할, AWS Glue 작업에 대한 IAM 역할을 선택합니다.
  4. 럭셔리 타입, 고르다 스파크 스트리밍.
  5. 럭셔리 접착제 버전선택한다. Glue 4.0 – spark 3.3, Scala 2, Python 3 지원.
  6. 럭셔리 요청된 작업자 수, 입력 3.
  7. $XNUMX Million 미만 고급 속성에 대한 작업 매개 변수선택한다. 새 매개변수 추가.
  8. 럭셔리 , 입력 --conf.
  9. 럭셔리 가치관, 입력 spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. 왼쪽 메뉴에서 새 매개변수 추가.
  11. 럭셔리 , 입력 --datalake-formats.
  12. 럭셔리 가치관, 입력 hudi.
  13. 럭셔리 스크립트 경로, 입력 s3://<S3BucketName>/scripts/.
  14. 럭셔리 임시 경로, 입력 s3://<S3BucketName>/temporary/.
  15. 선택적으로 Spark UI 로그 경로, 입력 s3://<S3BucketName>/sparkHistoryLogs/.

접착제 작업 매개변수

  1. 스크립트 탭에서 AWS Glue Studio 편집기에 다음 스크립트를 입력하고 선택합니다. 만들기.

실시간에 가까운 스트리밍 작업은 Kinesis 데이터 스트림을 자주 업데이트되는 참조 데이터가 포함된 DynamoDB 테이블과 조인하여 데이터를 보강합니다. 보강된 데이터 세트는 데이터 레이크의 대상 Hudi 테이블에 로드됩니다. 바꾸다 AWS CloudFormation을 통해 생성한 버킷으로:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job args = getResolvedOptions(sys.argv,["JOB_NAME"]) # Initialize spark session and Glue context
sc = SparkContext() glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args) # job paramters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" kin_src_table_name = "my_stream_src_table" hudi_write_operation = "upsert" hudi_record_key = "uuid" hudi_precomb_key = "orderdate" checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db" # hudi options additional_options={ "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.write.recordkey.field": hudi_record_key, "hoodie.datasource.hive_sync.database": hudi_database, "hoodie.table.name": hudi_table, "hoodie.consistency.check.enabled": "true", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor", "hoodie.datasource.write.hive_style_partitioning": "false", "hoodie.datasource.write.precombine.field": hudi_precomb_key, "hoodie.bulkinsert.shuffle.parallelism": "4", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.write.operation": hudi_write_operation, "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
} # Scan and load the reference data table from DynamoDB into AWS Glue DynamicFrames using boto3 API.
def readDynamoDb(): dynamodb = boto3.resource(“dynamodb”) table = dynamodb.Table(dydb_lookup_table) response = table.scan() items = response[“Items”] jsondata = sc.parallelize(items) lookupDf = glueContext.read.json(jsondata) return lookupDf # Load the Amazon Kinesis data stream from Amazon Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog( database=kin_src_database_name, table_name=kin_src_table_name, transformation_ctx=”source_df”, additional_options={“startingPosition”: “TRIM_HORIZON”},
) # As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId): if data_frame.count() > 0: # Refresh the dymanodb table to pull latest snapshot for each microbatch country_lookup_df = readDynamoDb() final_frame = data_frame.join( country_lookup_df, data_frame["country"] == country_lookup_df["countryname"], 'left' ).drop( "countryname", "country", "unitprice", "unitcost", "totalrevenue", "totalcost", "totalprofit" ) # Script generated for node my-lab-hudi-connector final_frame.write.format("hudi") .options(**additional_options) .mode("append") .save(s3_output_folder) try: glueContext.forEachBatch( frame=source_df, batch_function=processBatch, options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path}, )
except Exception as e: print(f"Error is @@@ ....{e}")

  1. 왼쪽 메뉴에서 달리기 스트리밍 작업을 시작합니다.

다음 스크린샷은 DataFrames의 예를 보여줍니다. data_frame, country_lookup_dffinal_frame.

Glue 작업 로그 출력 초기

AWS Glue 작업은 DynamoDB의 Kinesis 데이터 스트림과 참조 테이블에서 오는 레코드를 성공적으로 조인한 다음 조인된 레코드를 Hudi 형식으로 Amazon S3에 수집했습니다.

샘플 데이터를 생성하고 Kinesis 데이터 스트림에 로드하는 Python 스크립트 생성 및 실행

이 섹션에서는 Python을 생성하고 실행하여 샘플 데이터를 생성하고 소스 Kinesis 데이터 스트림에 로드합니다. 다음 단계를 완료하십시오.

  1. AWS Cloud9, EC2 인스턴스 또는 데이터 스트림에 기록을 저장하는 다른 컴퓨팅 호스트에 로그인합니다.
  2. 라는 Python 파일을 만듭니다. generate-data-for-kds.py:
$ python3 generate-data-for-kds.py

  1. Python 파일을 열고 다음 스크립트를 입력합니다.
import json
import random
import boto3
import time STREAM_NAME = "<mystreamname>" def get_data(): return { "uuid": random.randrange(0, 1000001, 1), "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ), "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "orderpriority": random.choice(["H", "L", "M", "C"]), "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "region": random.choice( ["Asia" "Europe", "Americas", "Middle Eastern", "Africa"] ), "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", "8072", "65", "7864", "9778", ] ), "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", "5255275.28", "463966.1", ] ), "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06", "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", "3951974.56", "310842.62", ] ), "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7", "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ), "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", "2022-15-24T02:27:41Z", ] ), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) time.sleep(2) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

이 스크립트는 2초마다 Kinesis 데이터 스트림 레코드를 넣습니다.

Aurora MySQL 클러스터에서 참조 테이블 업데이트 시뮬레이션

이제 모든 리소스와 구성이 준비되었습니다. 이 예에서는 추가하고 싶습니다. 3자리 국가 코드 참조 테이블에. Aurora MySQL 테이블의 레코드를 업데이트하여 변경 사항을 시뮬레이션해 보겠습니다. 다음 단계를 완료하십시오.

  1. AWS Glue 스트리밍 작업이 이미 실행 중인지 확인합니다.
  2. 앞에서 설명한 대로 기본 DB 인스턴스에 다시 연결합니다.
  3. SQL 명령을 입력하여 레코드를 업데이트하십시오.
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

이제 Aurora MySQL 소스 데이터베이스의 참조 테이블이 업데이트되었습니다. 그러면 변경 사항이 DynamoDB의 참조 테이블에 자동으로 복제됩니다.

DynamoDB 참조 테이블 업데이트됨

다음 표는 레코드를 보여줍니다. data_frame, country_lookup_dffinal_frame. 에 country_lookup_dffinal_frameWalk Through California 프로그램, combinedname 열에는 다음과 같은 형식의 값이 있습니다. <2-digit-country-code>-<3-digit-country-code>-<country-name>, AWS Glue 스트리밍 작업을 다시 시작하지 않고도 참조 테이블의 변경된 레코드가 테이블에 반영됨을 보여줍니다. 즉, 참조 테이블이 변경되는 경우에도 AWS Glue 작업이 Kinesis 데이터 스트림에서 들어오는 레코드를 참조 테이블과 성공적으로 조인합니다.
Glue 작업 로그 출력 업데이트됨

Athena를 사용하여 Hudi 테이블 쿼리

대상 테이블의 레코드를 보기 위해 Athena를 사용하여 Hudi 테이블을 쿼리해 보겠습니다. 다음 단계를 완료하십시오.

  1. 스크립트와 AWS Glue 스트리밍 작업이 계속 작동하는지 확인합니다.
    1. 파이썬 스크립트 (generate-data-for-kds.py) 아직 실행 중입니다.
    2. 생성된 데이터는 데이터 스트림으로 전송됩니다.
    3. AWS Glue 스트리밍 작업이 아직 실행 중입니다.
  2. Athena 콘솔의 쿼리 편집기에서 다음 SQL을 실행합니다.
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;

다음 쿼리 결과는 참조 테이블이 변경되기 전에 처리된 레코드를 보여줍니다. 기록 combinedname 열은 다음과 유사합니다. <2-digit-country-code>-<country-name>.

Athena 쿼리 결과 초기

다음 쿼리 결과는 참조 테이블이 변경된 후 처리된 레코드를 보여줍니다. 기록 combinedname 열은 다음과 유사합니다. <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Athena 쿼리 결과 업데이트됨

이제 변경된 참조 데이터가 Kinesis 데이터 스트림의 레코드와 DynamoDB의 참조 데이터를 조인하는 대상 Hudi 테이블에 성공적으로 반영되었음을 이해합니다.

정리

마지막 단계로 리소스를 정리합니다.

  1. Kinesis 데이터 스트림을 삭제합니다.
  2. AWS DMS 마이그레이션 작업, 엔드포인트 및 복제 인스턴스를 삭제합니다.
  3. AWS Glue 스트리밍 작업을 중지하고 삭제합니다.
  4. AWS Cloud9 환경을 삭제합니다.
  5. CloudFormation 템플릿을 삭제합니다.

결론

실시간 데이터 수집 및 처리와 관련된 트랜잭션 데이터 레이크를 구축하고 유지 관리하려면 사용할 수집 서비스, 참조 데이터를 저장하는 방법, 사용할 트랜잭션 데이터 레이크 프레임워크와 같은 여러 변수 구성 요소와 결정을 내려야 합니다. 이 게시물에서는 AWS 기본 구성 요소를 빌딩 블록으로 사용하고 Apache Hudi를 트랜잭션 데이터 레이크용 오픈 소스 프레임워크로 사용하여 이러한 파이프라인의 구현 세부 정보를 제공했습니다.

우리는 이 솔루션이 그러한 요구 사항을 가진 새로운 데이터 레이크를 구현하려는 조직을 위한 출발점이 될 수 있다고 믿습니다. 또한 서로 다른 구성 요소는 완전히 연결 가능하며 기존 데이터 레이크에 혼합 및 일치시켜 새로운 요구 사항을 대상으로 하거나 기존 요구 사항을 마이그레이션하여 문제점을 해결할 수 있습니다.


저자 소개

매니쉬 콜라 AWS의 Data Lab Solutions Architect로서 다양한 업계의 고객과 긴밀히 협력하여 데이터 분석 및 AI 요구 사항에 맞는 클라우드 네이티브 솔루션을 설계합니다. 그는 AWS 여정에서 고객과 협력하여 비즈니스 문제를 해결하고 확장 가능한 프로토타입을 구축합니다. AWS에 합류하기 전에 Manish는 고객이 데이터 웨어하우스, BI, 데이터 통합 ​​및 데이터 레이크 프로젝트를 구현하도록 지원한 경험이 있습니다.

산토시 코타기리 가시적인 비즈니스 결과를 이끌어내는 데이터 분석 및 클라우드 솔루션 경험이 있는 AWS의 솔루션 아키텍트입니다. 그의 전문 지식은 클라우드 네이티브 및 오픈 소스 서비스에 중점을 두고 산업 전반의 클라이언트를 위한 확장 가능한 데이터 분석 솔루션을 설계하고 구현하는 데 있습니다. 그는 비즈니스 성장을 주도하고 복잡한 문제를 해결하기 위해 기술을 활용하는 데 열정적입니다.

스기모토 치호 AWS 빅 데이터 지원 팀의 클라우드 지원 엔지니어입니다. 그녀는 고객이 ETL 워크로드를 사용하여 데이터 레이크를 구축하도록 돕는 데 열정적입니다. 그녀는 행성 과학을 좋아하고 주말에 소행성 류구를 연구하는 것을 즐깁니다.

노리 타카 세키 야마 AWS Glue 팀의 수석 빅 데이터 설계자입니다. 그는 고객을 돕기 위해 소프트웨어 아티팩트를 구축하는 일을 담당하고 있습니다. 여가 시간에는 새 로드 바이크로 자전거를 즐깁니다.

spot_img

최신 인텔리전스

spot_img

우리와 함께 채팅

안녕하세요! 어떻게 도와 드릴까요?