Logo Zephyrnet

Tham gia nguồn dữ liệu phát trực tuyến với dữ liệu CDC để phân tích dữ liệu không cần máy chủ theo thời gian thực bằng AWS Glue, AWS DMS và Amazon DynamoDB | Dịch vụ web của Amazon

Ngày:

Khách hàng đã và đang sử dụng các giải pháp kho dữ liệu để thực hiện các nhiệm vụ phân tích truyền thống của họ. Gần đây, các hồ dữ liệu đã đạt được nhiều sức hút để trở thành nền tảng cho các giải pháp phân tích, bởi vì chúng đi kèm với các lợi ích như khả năng mở rộng, khả năng chịu lỗi và hỗ trợ cho các bộ dữ liệu có cấu trúc, bán cấu trúc và phi cấu trúc.

Hồ dữ liệu không phải là giao dịch theo mặc định; tuy nhiên, có nhiều khung nguồn mở giúp tăng cường hồ dữ liệu với các thuộc tính ACID, cung cấp giải pháp tốt nhất cho cả hai thế giới giữa cơ chế lưu trữ giao dịch và phi giao dịch.

Quy trình xử lý và nhập hàng loạt truyền thống liên quan đến các hoạt động như làm sạch dữ liệu và kết hợp với dữ liệu tham chiếu rất dễ tạo và tiết kiệm chi phí để duy trì. Tuy nhiên, có một thách thức đối với việc nhập các bộ dữ liệu, chẳng hạn như Internet of Things (IoT) và luồng nhấp chuột, với tốc độ nhanh với SLA phân phối gần như theo thời gian thực. Bạn cũng sẽ muốn áp dụng các bản cập nhật gia tăng với thu thập dữ liệu thay đổi (CDC) từ hệ thống nguồn đến đích. Để kịp thời đưa ra quyết định dựa trên dữ liệu, bạn cần tính đến các bản ghi bị bỏ sót và áp lực ngược, đồng thời duy trì thứ tự và tính toàn vẹn của sự kiện, đặc biệt nếu dữ liệu tham chiếu cũng thay đổi nhanh chóng.

Trong bài đăng này, chúng tôi mong muốn giải quyết những thách thức này. Chúng tôi cung cấp hướng dẫn từng bước để tham gia truyền dữ liệu vào bảng tham chiếu thay đổi theo thời gian thực bằng cách sử dụng Keo AWS, Máy phát điện AmazonDịch vụ di chuyển cơ sở dữ liệu AWS (AWS DMS). Chúng tôi cũng trình bày cách nhập dữ liệu phát trực tuyến vào kho dữ liệu giao dịch bằng cách sử dụng Apache Hudi để đạt được các cập nhật gia tăng với các giao dịch ACID.

Tổng quan về giải pháp

Đối với trường hợp sử dụng ví dụ của chúng tôi, truyền dữ liệu đang đi qua Luồng dữ liệu Amazon Kinesisvà dữ liệu tham chiếu được quản lý trong MySQL. Dữ liệu tham chiếu liên tục được sao chép từ MySQL sang DynamoDB thông qua AWS DMS. Yêu cầu ở đây là làm phong phú thêm dữ liệu luồng thời gian thực bằng cách kết hợp với dữ liệu tham chiếu trong thời gian gần như thực và làm cho nó có thể truy vấn được từ một công cụ truy vấn, chẳng hạn như amazon Athena trong khi vẫn giữ tính nhất quán. Trong trường hợp sử dụng này, dữ liệu tham chiếu trong MySQL có thể được cập nhật khi yêu cầu thay đổi, sau đó các truy vấn cần trả về kết quả bằng cách phản ánh các cập nhật trong dữ liệu tham chiếu.

Giải pháp này giải quyết vấn đề người dùng muốn tham gia các luồng bằng cách thay đổi tập dữ liệu tham chiếu khi kích thước của tập dữ liệu tham chiếu nhỏ. Dữ liệu tham chiếu được duy trì trong các bảng DynamoDB và tác vụ truyền trực tuyến sẽ tải toàn bộ bảng vào bộ nhớ cho từng lô vi mô, kết hợp một luồng thông lượng cao với một tập dữ liệu tham chiếu nhỏ.

Sơ đồ sau minh họa kiến ​​trúc giải pháp.

Kiến trúc

Điều kiện tiên quyết

Đối với hướng dẫn này, bạn nên có các điều kiện tiên quyết sau:

Tạo vai trò IAM và bộ chứa S3

Trong phần này, bạn tạo một Dịch vụ lưu trữ đơn giản của Amazon thùng (Amazon S3) và hai Quản lý truy cập và nhận dạng AWS (IAM): một vai trò cho công việc AWS Glue và một cho AWS DMS. Chúng tôi làm điều này bằng cách sử dụng một Hình thành đám mây AWS mẫu. Hoàn thành các bước sau:

  1. Đăng nhập vào bảng điều khiển AWS CloudFormation.
  2. Chọn Khởi chạy Stack::
  3. Chọn Sau.
  4. Trong Tên ngăn xếp, nhập tên cho ngăn xếp của bạn.
  5. Trong Tên bảng DynamoDB, đi vào tgt_country_lookup_table. Đây là tên của bảng DynamoDB mới của bạn.
  6. Trong Tiền tố tên nhóm S3, hãy nhập tiền tố của bộ chứa S3 mới của bạn.
  7. Chọn Tôi xác nhận rằng AWS CloudFormation có thể tạo tài nguyên IAM với tên tùy chỉnh.
  8. Chọn Tạo ngăn xếp.

Quá trình tạo ngăn xếp có thể mất khoảng 1 phút.

Tạo luồng dữ liệu Kinesis

Trong phần này, bạn tạo luồng dữ liệu Kinesis:

  1. Trên bảng điều khiển Kinesis, chọn luồng dữ liệu trong khung điều hướng.
  2. Chọn Tạo luồng dữ liệu.
  3. Trong Tên luồng dữ liệu, hãy nhập tên luồng của bạn.
  4. Để các cài đặt còn lại làm mặc định và chọn Tạo luồng dữ liệu.

Luồng dữ liệu Kinesis được tạo bằng chế độ theo yêu cầu.

Tạo và đặt cấu hình cụm Aurora MySQL

Trong phần này, bạn tạo và đặt cấu hình cụm Aurora MySQL làm cơ sở dữ liệu nguồn. Đầu tiên, định cấu hình cụm cơ sở dữ liệu Aurora MySQL nguồn của bạn để bật CDC thông qua AWS DMS đến DynamoDB.

Tạo một nhóm tham số

Hoàn thành các bước sau để tạo một nhóm thông số mới:

  1. Trên bảng điều khiển Amazon RDS, chọn Nhóm thông số trong khung điều hướng.
  2. Chọn Tạo nhóm thông số.
  3. Trong Họ nhóm thông số, lựa chọn aurora-mysql5.7.
  4. Trong Kiểu, chọn Nhóm tham số cụm DB.
  5. Trong Tên nhóm, đi vào my-mysql-dynamodb-cdc.
  6. Trong Mô tả, đi vào Parameter group for demo Aurora MySQL database.
  7. Chọn Tạo.
  8. Chọn my-mysql-dynamodb-cdc, và lựa chọn Chỉnh sửa Dưới Hành động của nhóm thông số.
  9. Chỉnh sửa nhóm tham số như sau:
Họ tên Giá trị
binlog_row_image Full
binlog_format HÀNG
binlog_checksum NONE
log_slave_updates 1
  1. Chọn Lưu các thay đổi.

Nhóm thông số RDS

Tạo cụm Aurora MySQL

Hoàn thành các bước sau để tạo cụm Aurora MySQL:

  1. Trên bảng điều khiển Amazon RDS, chọn Cơ sở dữ liệu trong khung điều hướng.
  2. Chọn Tạo cơ sở dữ liệu.
  3. Trong Chọn phương pháp tạo cơ sở dữ liệu, chọn tạo tiêu chuẩn.
  4. Theo tùy chọn động cơ, Cho Loại động cơ, chọn Aurora (Tương thích với MySQL).
  5. Trong Phiên bản động cơ, chọn Cực quang (MySQL 5.7) 2.11.2.
  6. Trong Templates, chọn Sản lượng.
  7. Theo Cài đặt, Cho mã định danh cụm DB, hãy nhập tên cho cơ sở dữ liệu của bạn.
  8. Trong tên người dùng chính, hãy nhập tên người dùng chính của bạn.
  9. Trong Mật khẩu cấp caoXác nhận mật khẩu chính, hãy nhập mật khẩu chính của bạn.
  10. Theo cấu hình phiên bản, Cho lớp phiên bản cơ sở dữ liệu, chọn Các lớp ổn định (bao gồm t lớp) Và chọn db.t3.small.
  11. Theo Sẵn có & độ bền, Cho Triển khai Multi-AZ, chọn Không tạo Bản sao Aurora.
  12. Theo Kết nối, Cho tài nguyên máy tính, chọn Không kết nối với tài nguyên điện toán EC2.
  13. Trong loại mạng, chọn IPv4.
  14. Trong Đám mây riêng ảo (VPC), hãy chọn VPC của bạn.
  15. Trong Nhóm mạng con DB, hãy chọn mạng con công cộng của bạn.
  16. Trong Quyền truy cập công khai, chọn .
  17. Trong Nhóm bảo mật VPC (tường lửa), hãy chọn nhóm bảo mật cho mạng con công khai của bạn.
  18. Theo xác thực cơ sở dữ liệu, Cho Tùy chọn xác thực cơ sở dữ liệu, chọn Xác thực mật khẩu.
  19. Theo Cấu hình bổ sung, Cho Nhóm tham số cụm DB, hãy chọn nhóm tham số cụm mà bạn đã tạo trước đó.
  20. Chọn Tạo cơ sở dữ liệu.

Cấp quyền cho cơ sở dữ liệu nguồn

Bước tiếp theo là cấp quyền cần thiết trên cơ sở dữ liệu Aurora MySQL nguồn. Bây giờ bạn có thể kết nối với cụm DB bằng cách sử dụng tiện ích MySQL. Bạn có thể chạy truy vấn để hoàn thành các tác vụ sau:

  • Tạo cơ sở dữ liệu demo và bảng và chạy các truy vấn trên dữ liệu
  • Cấp quyền cho người dùng được sử dụng bởi điểm cuối AWS DMS

Hoàn thành các bước sau:

  1. Đăng nhập vào phiên bản EC2 mà bạn đang sử dụng để kết nối với cụm DB của mình.
  2. Nhập lệnh sau tại dấu nhắc lệnh để kết nối với phiên bản DB chính của cụm DB của bạn:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p

  1. Chạy lệnh SQL sau để tạo cơ sở dữ liệu:
> CREATE DATABASE mydev;

  1. Chạy lệnh SQL sau để tạo bảng:
> use mydev; > CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);

  1. Chạy lệnh SQL sau để điền dữ liệu vào bảng:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');

  1. Chạy lệnh SQL sau để tạo người dùng cho điểm cuối AWS DMS và cấp quyền cho nhiệm vụ CDC (thay thế trình giữ chỗ bằng mật khẩu ưa thích của bạn):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

Tạo và định cấu hình tài nguyên AWS DMS để tải dữ liệu vào bảng tham chiếu DynamoDB

Trong phần này, bạn tạo và định cấu hình AWS DMS để sao chép dữ liệu vào bảng tham chiếu DynamoDB.

Tạo phiên bản sao chép AWS DMS

Trước tiên, hãy tạo một phiên bản sao chép AWS DMS bằng cách hoàn thành các bước sau:

  1. Trên bảng điều khiển AWS DMS, hãy chọn trường hợp sao chép trong khung điều hướng.
  2. Chọn Tạo phiên bản sao chép.
  3. Theo Cài đặt, Cho Họ tên, hãy nhập tên cho phiên bản của bạn.
  4. Theo cấu hình phiên bản, Cho Tính sẵn sàng cao, chọn Khối lượng công việc dành cho nhà phát triển hoặc thử nghiệm (Một vùng sẵn sàng).
  5. Theo Kết nối và bảo mật, Cho Nhóm bảo mật VPC, chọn mặc định.
  6. Chọn Tạo phiên bản sao chép.

Tạo điểm cuối Amazon VPC

Theo tùy chọn, bạn có thể tạo Điểm cuối Amazon VPC cho DynamoDB khi bạn cần kết nối với bảng DynamoDB của mình từ phiên bản AWS DMS trong một mạng riêng. Cũng đảm bảo rằng bạn kích hoạt Truy cập công cộng khi bạn cần kết nối với cơ sở dữ liệu bên ngoài VPC của mình.

Tạo điểm cuối nguồn AWS DMS

Tạo điểm cuối nguồn AWS DMS bằng cách hoàn tất các bước sau:

  1. Trên bảng điều khiển AWS DMS, hãy chọn Điểm cuối trong khung điều hướng.
  2. Chọn Tạo điểm cuối.
  3. Trong loại điểm cuối, chọn điểm cuối nguồn.
  4. Theo cấu hình điểm cuối, Cho định danh điểm cuối, hãy nhập tên cho điểm cuối của bạn.
  5. Trong công cụ nguồn, chọn Amazon AuroraMySQL.
  6. Trong Truy cập vào cơ sở dữ liệu điểm cuối, chọn Cung cấp thông tin truy cập theo cách thủ công.
  7. Trong Name Server, hãy nhập tên điểm cuối của phiên bản trình ghi Aurora của bạn (ví dụ: mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. Trong Hải cảng, đi vào 3306.
  9. Trong Tên người dùng, hãy nhập tên người dùng cho tác vụ AWS DMS của bạn.
  10. Trong Mật khẩu, gõ mật khẩu.
  11. Chọn Tạo điểm cuối.

Tạo một điểm cuối mục tiêu AWS DMS

Tạo điểm cuối đích AWS DMS bằng cách hoàn tất các bước sau:

  1. Trên bảng điều khiển AWS DMS, hãy chọn Điểm cuối trong khung điều hướng.
  2. Chọn Tạo điểm cuối.
  3. Trong loại điểm cuối, chọn Điểm cuối mục tiêu.
  4. Theo cấu hình điểm cuối, Cho định danh điểm cuối, hãy nhập tên cho điểm cuối của bạn.
  5. Trong động cơ mục tiêu, chọn Máy phát điện Amazon.
  6. Trong Vai trò truy cập dịch vụ ARN, hãy nhập vai trò IAM cho tác vụ AWS DMS của bạn.
  7. Chọn Tạo điểm cuối.

Tạo tác vụ di chuyển AWS DMS

Tạo tác vụ di chuyển cơ sở dữ liệu AWS DMS bằng cách hoàn thành các bước sau:

  1. Trên bảng điều khiển AWS DMS, hãy chọn Nhiệm vụ di chuyển cơ sở dữ liệu trong khung điều hướng.
  2. Chọn Tạo nhiệm vụ.
  3. Theo cấu hình tác vụ, Cho định danh nhiệm vụ, hãy nhập tên cho nhiệm vụ của bạn.
  4. Trong Ví dụ sao chép, hãy chọn phiên bản sao chép của bạn.
  5. Trong Điểm cuối cơ sở dữ liệu nguồn, hãy chọn điểm cuối nguồn của bạn.
  6. Trong Điểm cuối cơ sở dữ liệu đích, chọn điểm cuối mục tiêu của bạn.
  7. Trong loại di chuyển, chọn Di chuyển dữ liệu hiện có và sao chép các thay đổi đang diễn ra.
  8. Theo Cài đặt tác vụ, Cho Chế độ chuẩn bị bảng mục tiêu, chọn không làm gì cả.
  9. Trong Dừng tác vụ sau khi hoàn thành tải đầy đủ, chọn Đừng dừng lại.
  10. Trong Cài đặt cột LOB, chọn Chế độ LOB hạn chế.
  11. Trong Nhật ký tác vụ, cho phép Bật nhật ký CloudWatchBật áp dụng tối ưu hóa hàng loạt.
  12. Theo ánh xạ bảng, chọn Trình soạn thảo JSON và nhập các quy tắc sau.

Tại đây bạn có thể thêm giá trị vào cột. Với các quy tắc sau, tác vụ AWS DMS CDC trước tiên sẽ tạo một bảng DynamoDB mới với tên được chỉ định trong target-table-name. Sau đó, nó sẽ sao chép tất cả các bản ghi, ánh xạ các cột trong bảng DB với các thuộc tính trong bảng DynamoDB.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "2", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "target-table-name": "tgt_country_lookup_table", "mapping-parameters": { "partition-key-name": "code", "sort-key-name": "countryname", "exclude-columns": [ "code", "countryname" ], "attribute-mappings": [ { "target-attribute-name": "code", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${code}" }, { "target-attribute-name": "countryname", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${countryname}" } ], "apply-during-cdc": true } } ]
}

Ánh xạ bảng DMS

  1. Chọn Tạo nhiệm vụ.

Giờ đây, tác vụ sao chép AWS DMS đã được bắt đầu.

  1. Chờ Trạng thái để hiển thị như Tải hoàn tất.

tác vụ DMS

  1. Trên bảng điều khiển DynamoDB, chọn Bàn trong khung điều hướng.
  2. Chọn bảng tham chiếu DynamoDB và chọn Khám phá các mục bảng để xem xét các hồ sơ sao chép.

Ban đầu bảng tham chiếu DynamoDB

Tạo bảng Danh mục dữ liệu AWS Glue và tác vụ ETL truyền phát AWS Glue

Trong phần này, bạn tạo bảng Danh mục dữ liệu AWS Glue và tác vụ trích xuất, chuyển đổi và tải (ETL) luồng AWS Glue.

Tạo bảng Danh mục dữ liệu

Tạo bảng AWS Glue Data Catalog cho luồng dữ liệu Kinesis nguồn theo các bước sau:

  1. Trên bảng điều khiển AWS Glue, hãy chọn Cơ sở dữ liệu Dưới Danh mục dữ liệu trong khung điều hướng.
  2. Chọn Thêm cơ sở dữ liệu.
  3. Trong Họ tên, đi vào my_kinesis_db.
  4. Chọn Tạo cơ sở dữ liệu.
  5. Chọn Bàn Dưới Cơ sở dữ liệu, sau đó chọn Thêm bảng.
  6. Trong Họ tên, đi vào my_stream_src_table.
  7. Trong Cơ sở dữ liệu, chọn my_kinesis_db.
  8. Trong Chọn loại nguồn, chọn chuyển động.
  9. Trong Luồng dữ liệu Kinesis nằm trong, chọn tài khoản của tôi.
  10. Trong Tên luồng Kinesis, hãy nhập tên cho luồng dữ liệu của bạn.
  11. Trong phân loại, lựa chọn JSON.
  12. Chọn Sau.
  13. Chọn Chỉnh sửa lược đồ dưới dạng JSON, nhập JSON sau, sau đó chọn Lưu.
[ { "Name": "uuid", "Type": "string", "Comment": "" }, { "Name": "country", "Type": "string", "Comment": "" }, { "Name": "itemtype", "Type": "string", "Comment": "" }, { "Name": "saleschannel", "Type": "string", "Comment": "" }, { "Name": "orderpriority", "Type": "string", "Comment": "" }, { "Name": "orderdate", "Type": "string", "Comment": "" }, { "Name": "region", "Type": "string", "Comment": "" }, { "Name": "shipdate", "Type": "string", "Comment": "" }, { "Name": "unitssold", "Type": "string", "Comment": "" }, { "Name": "unitprice", "Type": "string", "Comment": "" }, { "Name": "unitcost", "Type": "string", "Comment": "" }, { "Name": "totalrevenue", "Type": "string", "Comment": "" }, { "Name": "totalcost", "Type": "string", "Comment": "" }, { "Name": "totalprofit", "Type": "string", "Comment": "" }, { "Name": "impressiontime", "Type": "string", "Comment": "" }
]

Lược đồ bảng Danh mục keo

    1. Chọn Sau, sau đó chọn Tạo.

Tạo một công việc ETL phát trực tuyến AWS Glue

Tiếp theo, bạn tạo một tác vụ phát trực tuyến AWS Glue. AWS Glue 3.0 trở lên hỗ trợ Apache Hudi nguyên bản, vì vậy chúng tôi sử dụng tích hợp gốc này để nhập vào bảng Hudi. Hoàn thành các bước sau để tạo tác vụ phát trực tuyến AWS Glue:

  1. Trên bảng điều khiển AWS Glue Studio, hãy chọn Trình chỉnh sửa tập lệnh Spark Và chọn Tạo.
  2. Theo Chi tiết công việc tab, cho Họ tên, hãy nhập tên cho công việc của bạn.
  3. Trong Vai trò IAM, hãy chọn vai trò IAM cho tác vụ AWS Glue của bạn.
  4. Trong Kiểu, lựa chọn truyền phát tia lửa.
  5. Trong Phiên bản keo, chọn Keo 4.0 - Hỗ trợ spark 3.3, Scala 2, Python 3.
  6. Trong Số lượng công nhân yêu cầu, đi vào 3.
  7. Theo Thuộc tính nâng cao, Cho Thông số công việc, chọn Thêm thông số mới.
  8. Trong Key, đi vào --conf.
  9. Trong Giá trị, đi vào spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Chọn Thêm thông số mới.
  11. Trong Key, đi vào --datalake-formats.
  12. Trong Giá trị, đi vào hudi.
  13. Trong Đường dẫn tập lệnh, đi vào s3://<S3BucketName>/scripts/.
  14. Trong Đường dẫn tạm thời, đi vào s3://<S3BucketName>/temporary/.
  15. Tùy chọn, cho Đường dẫn nhật ký giao diện người dùng Spark, đi vào s3://<S3BucketName>/sparkHistoryLogs/.

Tham số công việc keo

  1. trên Script tab, hãy nhập tập lệnh sau vào trình chỉnh sửa AWS Glue Studio và chọn Tạo.

Công việc truyền trực tuyến gần thời gian thực làm phong phú thêm dữ liệu bằng cách kết hợp luồng dữ liệu Kinesis với bảng DynamoDB chứa dữ liệu tham chiếu được cập nhật thường xuyên. Tập dữ liệu đã làm giàu được tải vào bảng Hudi đích trong kho dữ liệu. Thay thế với bộ chứa mà bạn đã tạo qua AWS CloudFormation:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job args = getResolvedOptions(sys.argv,["JOB_NAME"]) # Initialize spark session and Glue context
sc = SparkContext() glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args) # job paramters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" kin_src_table_name = "my_stream_src_table" hudi_write_operation = "upsert" hudi_record_key = "uuid" hudi_precomb_key = "orderdate" checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db" # hudi options additional_options={ "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.write.recordkey.field": hudi_record_key, "hoodie.datasource.hive_sync.database": hudi_database, "hoodie.table.name": hudi_table, "hoodie.consistency.check.enabled": "true", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor", "hoodie.datasource.write.hive_style_partitioning": "false", "hoodie.datasource.write.precombine.field": hudi_precomb_key, "hoodie.bulkinsert.shuffle.parallelism": "4", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.write.operation": hudi_write_operation, "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
} # Scan and load the reference data table from DynamoDB into AWS Glue DynamicFrames using boto3 API.
def readDynamoDb(): dynamodb = boto3.resource(“dynamodb”) table = dynamodb.Table(dydb_lookup_table) response = table.scan() items = response[“Items”] jsondata = sc.parallelize(items) lookupDf = glueContext.read.json(jsondata) return lookupDf # Load the Amazon Kinesis data stream from Amazon Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog( database=kin_src_database_name, table_name=kin_src_table_name, transformation_ctx=”source_df”, additional_options={“startingPosition”: “TRIM_HORIZON”},
) # As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId): if data_frame.count() > 0: # Refresh the dymanodb table to pull latest snapshot for each microbatch country_lookup_df = readDynamoDb() final_frame = data_frame.join( country_lookup_df, data_frame["country"] == country_lookup_df["countryname"], 'left' ).drop( "countryname", "country", "unitprice", "unitcost", "totalrevenue", "totalcost", "totalprofit" ) # Script generated for node my-lab-hudi-connector final_frame.write.format("hudi") .options(**additional_options) .mode("append") .save(s3_output_folder) try: glueContext.forEachBatch( frame=source_df, batch_function=processBatch, options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path}, )
except Exception as e: print(f"Error is @@@ ....{e}")

  1. Chọn chạy để bắt đầu công việc phát trực tuyến.

Ảnh chụp màn hình sau đây hiển thị các ví dụ về DataFrames data_frame, country_lookup_dffinal_frame.

Đầu ra nhật ký công việc dán keo ban đầu

Công việc AWS Glue đã nối thành công các bản ghi đến từ luồng dữ liệu Kinesis và bảng tham chiếu trong DynamoDB, sau đó nhập các bản ghi đã nối vào Amazon S3 ở định dạng Hudi.

Tạo và chạy tập lệnh Python để tạo dữ liệu mẫu và tải vào luồng dữ liệu Kinesis

Trong phần này, bạn tạo và chạy Python để tạo dữ liệu mẫu và tải dữ liệu đó vào luồng dữ liệu Kinesis nguồn. Hoàn thành các bước sau:

  1. Đăng nhập vào AWS Cloud9, phiên bản EC2 của bạn hoặc bất kỳ máy chủ lưu trữ điện toán nào khác đặt bản ghi vào luồng dữ liệu của bạn.
  2. Tạo một tệp Python có tên generate-data-for-kds.py:
$ python3 generate-data-for-kds.py

  1. Mở tệp Python và nhập đoạn mã sau:
import json
import random
import boto3
import time STREAM_NAME = "<mystreamname>" def get_data(): return { "uuid": random.randrange(0, 1000001, 1), "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ), "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "orderpriority": random.choice(["H", "L", "M", "C"]), "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "region": random.choice( ["Asia" "Europe", "Americas", "Middle Eastern", "Africa"] ), "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", "8072", "65", "7864", "9778", ] ), "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", "5255275.28", "463966.1", ] ), "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06", "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", "3951974.56", "310842.62", ] ), "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7", "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ), "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", "2022-15-24T02:27:41Z", ] ), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) time.sleep(2) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Tập lệnh này đặt bản ghi luồng dữ liệu Kinesis cứ sau 2 giây.

Mô phỏng cập nhật bảng tham chiếu trong cụm Aurora MySQL

Bây giờ tất cả các tài nguyên và cấu hình đã sẵn sàng. Đối với ví dụ này, chúng tôi muốn thêm mã quốc gia gồm 3 chữ số vào bảng tham chiếu. Hãy cập nhật các bản ghi trong bảng Aurora MySQL để mô phỏng các thay đổi. Hoàn thành các bước sau:

  1. Đảm bảo rằng công việc phát trực tuyến AWS Glue đang chạy.
  2. Kết nối lại với phiên bản CSDL chính, như được mô tả trước đó.
  3. Nhập các lệnh SQL của bạn để cập nhật bản ghi:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

Giờ đây, bảng tham chiếu trong cơ sở dữ liệu nguồn Aurora MySQL đã được cập nhật. Sau đó, các thay đổi sẽ tự động được sao chép vào bảng tham chiếu trong DynamoDB.

Đã cập nhật bảng tham chiếu DynamoDB

Các bảng sau hiển thị các bản ghi trong data_frame, country_lookup_dffinal_frame. Trong country_lookup_dffinal_frame, Các combinedname cột có các giá trị được định dạng là <2-digit-country-code>-<3-digit-country-code>-<country-name>, điều này cho thấy rằng các bản ghi đã thay đổi trong bảng được tham chiếu được phản ánh trong bảng mà không cần khởi động lại công việc truyền phát AWS Glue. Điều đó có nghĩa là công việc AWS Glue sẽ kết hợp thành công các bản ghi đến từ luồng dữ liệu Kinesis với bảng tham chiếu ngay cả khi bảng tham chiếu đang thay đổi.
Đã cập nhật đầu ra nhật ký công việc dán keo

Truy vấn bảng Hudi bằng Athena

Hãy truy vấn bảng Hudi bằng Athena để xem các bản ghi trong bảng đích. Hoàn thành các bước sau:

  1. Đảm bảo rằng tập lệnh và công việc AWS Glue Streaming vẫn đang hoạt động:
    1. Tập lệnh Python (generate-data-for-kds.py) vẫn đang chạy.
    2. Dữ liệu được tạo đang được gửi đến luồng dữ liệu.
    3. Công việc phát trực tuyến AWS Glue vẫn đang chạy.
  2. Trên bảng điều khiển Athena, hãy chạy SQL sau trong trình chỉnh sửa truy vấn:
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;

Kết quả truy vấn sau đây hiển thị các bản ghi được xử lý trước khi thay đổi bảng được tham chiếu. Kỷ lục trong combinedname cột tương tự như <2-digit-country-code>-<country-name>.

Kết quả truy vấn Athena ban đầu

Kết quả truy vấn sau đây hiển thị các bản ghi được xử lý sau khi thay đổi bảng được tham chiếu. Kỷ lục trong combinedname cột tương tự như <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Đã cập nhật kết quả truy vấn Athena

Bây giờ, bạn hiểu rằng dữ liệu tham chiếu đã thay đổi được phản ánh thành công trong bảng Hudi đích kết hợp các bản ghi từ luồng dữ liệu Kinesis và dữ liệu tham chiếu trong DynamoDB.

Làm sạch

Bước cuối cùng, dọn sạch tài nguyên:

  1. Xóa luồng dữ liệu Kinesis.
  2. Xóa nhiệm vụ di chuyển AWS DMS, điểm cuối và phiên bản sao chép.
  3. Dừng và xóa tác vụ phát trực tuyến AWS Glue.
  4. Xóa môi trường AWS Cloud9.
  5. Xóa mẫu CloudFormation.

Kết luận

Việc xây dựng và duy trì hồ dữ liệu giao dịch liên quan đến việc nhập và xử lý dữ liệu theo thời gian thực có nhiều thành phần khác nhau và các quyết định cần được đưa ra, chẳng hạn như sử dụng dịch vụ nhập nào, cách lưu trữ dữ liệu tham chiếu của bạn và sử dụng khung hồ dữ liệu giao dịch nào. Trong bài đăng này, chúng tôi đã cung cấp chi tiết triển khai của một quy trình như vậy, sử dụng các thành phần gốc của AWS làm khối xây dựng và Apache Hudi làm khung nguồn mở cho hồ dữ liệu giao dịch.

Chúng tôi tin rằng giải pháp này có thể là điểm khởi đầu cho các tổ chức muốn triển khai hồ dữ liệu mới với các yêu cầu như vậy. Ngoài ra, các thành phần khác nhau hoàn toàn có thể cắm được và có thể được kết hợp và khớp với các hồ dữ liệu hiện có để nhắm mục tiêu các yêu cầu mới hoặc di chuyển các yêu cầu hiện có, giải quyết các điểm khó khăn của chúng.


Giới thiệu về tác giả

Manish Kola là Kiến trúc sư giải pháp phòng thí nghiệm dữ liệu tại AWS, nơi anh hợp tác chặt chẽ với khách hàng trong nhiều ngành khác nhau để kiến ​​trúc các giải pháp dựa trên đám mây cho nhu cầu phân tích dữ liệu và AI của họ. Anh ấy hợp tác với khách hàng trên hành trình AWS của họ để giải quyết các vấn đề kinh doanh của họ và xây dựng các nguyên mẫu có thể mở rộng. Trước khi gia nhập AWS, kinh nghiệm của Manish bao gồm giúp khách hàng triển khai các dự án kho dữ liệu, BI, tích hợp dữ liệu và hồ dữ liệu.

Santosh Kotagiri là Kiến trúc sư giải pháp tại AWS có kinh nghiệm về phân tích dữ liệu và giải pháp đám mây dẫn đến kết quả kinh doanh hữu hình. Chuyên môn của ông nằm ở việc thiết kế và triển khai các giải pháp phân tích dữ liệu có thể mở rộng cho khách hàng trong các ngành, tập trung vào các dịch vụ mã nguồn mở và dựa trên nền tảng đám mây. Anh đam mê tận dụng công nghệ để thúc đẩy tăng trưởng kinh doanh và giải quyết các vấn đề phức tạp.

Chiho Sugimoto là Kỹ sư hỗ trợ đám mây trong nhóm Hỗ trợ dữ liệu lớn AWS. Cô ấy đam mê giúp khách hàng xây dựng hồ dữ liệu bằng cách sử dụng khối lượng công việc ETL. Cô ấy yêu khoa học hành tinh và thích nghiên cứu tiểu hành tinh Ryugu vào cuối tuần.

Noritaka Sekiyama là Kiến trúc sư dữ liệu lớn chính trong nhóm AWS Glue. Ông chịu trách nhiệm xây dựng các hiện vật phần mềm để giúp đỡ khách hàng. Khi rảnh rỗi, anh ấy thích đạp xe bằng chiếc xe đạp địa hình mới của mình.

tại chỗ_img

Tin tức mới nhất

tại chỗ_img

Trò chuyện trực tiếp với chúng tôi (chat)

Chào bạn! Làm thế nào để tôi giúp bạn?