Zephyrnet Logosu

AWS Glue, AWS DMS ve Amazon DynamoDB kullanarak gerçek zamanlı sunucusuz veri analitiği için CDC verileriyle bir akış veri kaynağına katılın | Amazon Web Hizmetleri

Tarih:

Müşteriler, geleneksel analitik görevlerini yerine getirmek için veri ambarı çözümleri kullanıyor. Son zamanlarda veri gölleri, ölçeklenebilirlik, hataya dayanıklılık ve yapılandırılmış, yarı yapılandırılmış ve yapılandırılmamış veri kümeleri için destek gibi avantajlarla birlikte geldiklerinden, analitik çözümlerin temeli olmak için büyük ilgi topladı.

Veri gölleri varsayılan olarak işlemsel değildir; ancak, veri göllerini ACID özellikleriyle zenginleştiren ve işlemsel ve işlemsel olmayan depolama mekanizmaları arasında her iki dünyanın da en iyisini sunan çok sayıda açık kaynaklı çerçeve vardır.

Veri temizleme ve referans verilerle birleştirme gibi işlemleri içeren geleneksel toplu alım ve işlem ardışık düzenlerinin oluşturulması basittir ve sürdürülmesi uygun maliyetlidir. Ancak, Nesnelerin İnterneti (IoT) ve tıklama akışları gibi veri kümelerini neredeyse gerçek zamanlı teslimat SLA'ları ile hızlı bir oranda almanın zorluğu vardır. Ayrıca, kaynak sistemden hedefe değişiklik verisi yakalama (CDC) ile artımlı güncellemeler uygulamak isteyeceksiniz. Zamanında veriye dayalı kararlar almak için, kaçırılan kayıtları ve karşı basıncı hesaba katmanız ve özellikle referans verileri de hızla değişiyorsa, olay sıralamasını ve bütünlüğünü korumanız gerekir.

Bu yazıda, bu zorlukları ele almayı amaçlıyoruz. Akış verilerini kullanarak gerçek zamanlı olarak değişen bir referans tablosuna birleştirmek için adım adım bir kılavuz sunuyoruz. AWS Tutkal, Amazon DinamoDB, ve AWS Veritabanı Geçiş Hizmeti (AWS DMS'si). Ayrıca, kullanarak bir işlemsel veri gölüne akış verilerinin nasıl alınacağını da gösteriyoruz. Apaçi Hudi ACID işlemleriyle artımlı güncellemeler elde etmek için.

Çözüme genel bakış

Örnek kullanım durumumuz için, akış verileri geliyor Amazon Kinesis Veri Akışlarıve referans verileri MySQL'de yönetilir. Referans verileri, AWS DMS aracılığıyla MySQL'den DynamoDB'ye sürekli olarak çoğaltılır. Buradaki gereklilik, gerçek zamanlı akış verilerini, referans verilerle neredeyse gerçek zamanlı olarak birleştirerek zenginleştirmek ve aşağıdaki gibi bir sorgu motorundan sorgulanabilir hale getirmektir. Amazon Atina tutarlılığı korurken. Bu kullanım durumunda, gereksinim değiştiğinde MySQL'deki referans verileri güncellenebilir ve ardından sorguların, referans verilerdeki güncellemeleri yansıtarak sonuçları döndürmesi gerekir.

Bu çözüm, referans veri setinin boyutu küçük olduğunda değişen referans veri setleriyle akışlara katılmak isteyen kullanıcıların sorununa yöneliktir. Referans verileri DynamoDB tablolarında tutulur ve akış işi, yüksek verimli bir akışı küçük bir referans veri kümesiyle birleştirerek her bir mikro parti için tüm tabloyu belleğe yükler.

Aşağıdaki şemada çözüm mimarisi gösterilmektedir.

mimari

Önkoşullar

Bu izlenecek yol için aşağıdaki ön koşullara sahip olmalısınız:

IAM rolleri ve S3 grubu oluşturma

Bu bölümde, bir Amazon Basit Depolama Hizmeti (Amazon S3) kovası ve iki AWS Kimlik ve Erişim Yönetimi (IAM) rolleri: biri AWS Glue işi için, diğeri ise AWS DMS içindir. Bunu kullanarak yapıyoruz AWS CloudFormation şablon. Aşağıdaki adımları tamamlayın:

  1. AWS CloudFormation konsolunda oturum açın.
  2. Klinik Yığını Başlat::
  3. Klinik Sonraki.
  4. İçin Yığın adı, yığınız için bir ad girin.
  5. İçin DinamoDBTabloAdı, girmek tgt_country_lookup_table. Bu, yeni DynamoDB tablonuzun adıdır.
  6. İçin S3BucketNamePrefix, yeni S3 klasörünüzün ön ekini girin.
  7. seç AWS CloudFormation'ın özel adlarla IAM kaynakları oluşturabileceğini kabul ediyorum.
  8. Klinik Yığın oluştur.

Yığın oluşturma yaklaşık 1 dakika sürebilir.

Kinesis veri akışı oluşturun

Bu bölümde, bir Kinesis veri akışı oluşturacaksınız:

  1. Kinesis konsolunda, Veri akışları Gezinti bölmesinde.
  2. Klinik Veri akışı oluşturun.
  3. İçin Veri akışı adı, akış adınızı girin.
  4. Kalan ayarları varsayılan olarak bırakın ve Veri akışı oluşturun.

İsteğe bağlı modda bir Kinesis veri akışı oluşturulur.

Bir Aurora MySQL kümesi oluşturun ve yapılandırın

Bu bölümde, kaynak veritabanı olarak bir Aurora MySQL kümesi oluşturacak ve yapılandıracaksınız. Birinci, CDC'yi etkinleştirmek için kaynak Aurora MySQL veritabanı kümenizi yapılandırın AWS DMS aracılığıyla DynamoDB'ye.

Bir parametre grubu oluşturun

Yeni bir parametre grubu oluşturmak için aşağıdaki adımları tamamlayın:

  1. Amazon RDS konsolunda, Parametre grupları Gezinti bölmesinde.
  2. Klinik Parametre grubu oluştur.
  3. İçin Parametre grubu ailesiseçin aurora-mysql5.7.
  4. İçin Tip, seçmek DB Kümesi Parametre Grubu.
  5. İçin Grup ismi, girmek my-mysql-dynamodb-cdc.
  6. İçin Açıklama, girmek Parameter group for demo Aurora MySQL database.
  7. Klinik oluşturmak.
  8. seç my-mysql-dynamodb-cdc, ve Seç Düzenle altında Parametre grubu işlemleri.
  9. Parametre grubunu aşağıdaki gibi düzenleyin:
Name Özellik
binlog_row_image tam
binlog_format SIRA
binlog_checksum YOK
log_slave_updates 1
  1. Klinik Değişiklikleri Kaydet.

RDS parametre grubu

Aurora MySQL kümesini oluşturun

Aurora MySQL kümesini oluşturmak için aşağıdaki adımları tamamlayın:

  1. Amazon RDS konsolunda, veritabanları Gezinti bölmesinde.
  2. Klinik Veritabanı oluştur.
  3. İçin Bir veritabanı oluşturma yöntemi seçin, seçmek standart oluşturma.
  4. Altında motor seçenekleri, Için Motor tipi, seçmek Aurora (MySQL Uyumlu).
  5. İçin Motor versiyonu, seçmek Aurora (MySQL 5.7) 2.11.2.
  6. İçin Şablonlar, seçmek üretim.
  7. Altında Ayarlar, Için DB kümesi tanımlayıcısı, veritabanınız için bir ad girin.
  8. İçin ana kullanıcı adı, birincil kullanıcı adınızı girin.
  9. İçin Ana parola ve Ana şifreyi onaylayın, birincil şifrenizi girin.
  10. Altında Örnek yapılandırma, Için veritabanı bulut sunucusu sınıfı, seçmek Burstable sınıfları (t sınıfları içerir) Ve seç db.t3.küçük.
  11. Altında Kullanılabilirlik ve dayanıklılık, Için Multi-AZ dağıtımı, seçmek Bir Aurora Replikası oluşturmayın.
  12. Altında Bağlantı, Için İşlem kaynağı, seçmek Bir EC2 bilgi işlem kaynağına bağlanmayın.
  13. İçin Ağ türü, seçmek IPv4.
  14. İçin Sanal özel bulut (VPC), VPC'nizi seçin.
  15. İçin DB alt ağ grubu, genel alt ağınızı seçin.
  16. İçin Kamu erişim, seçmek Evet.
  17. İçin VPC güvenlik grubu (güvenlik duvarı), genel alt ağınız için güvenlik grubunu seçin.
  18. Altında Veritabanı kimlik doğrulaması, Için Veritabanı kimlik doğrulama seçenekleri, seçmek Parola kimlik doğrulaması.
  19. Altında Ek yapılandırma, Için DB kümesi parametre grubu, daha önce oluşturduğunuz küme parametre grubunu seçin.
  20. Klinik Veritabanı oluştur.

Kaynak veritabanına izin verme

Bir sonraki adım, kaynak Aurora MySQL veritabanında gerekli izni vermektir. Artık DB kümesine şunu kullanarak bağlanabilirsiniz: MySQL yardımcı programı. Aşağıdaki görevleri tamamlamak için sorgular çalıştırabilirsiniz:

  • Bir demo veritabanı ve tablo oluşturun ve veriler üzerinde sorgular çalıştırın
  • AWS DMS uç noktası tarafından kullanılan bir kullanıcı için izin verme

Aşağıdaki adımları tamamlayın:

  1. Veritabanı kümenize bağlanmak için kullandığınız EC2 bulut sunucusunda oturum açın.
  2. Veritabanı kümenizin birincil veritabanı eşgörünümüne bağlanmak için komut istemine aşağıdaki komutu girin:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p

  1. Bir veritabanı oluşturmak için aşağıdaki SQL komutunu çalıştırın:
> CREATE DATABASE mydev;

  1. Bir tablo oluşturmak için aşağıdaki SQL komutunu çalıştırın:
> use mydev; > CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);

  1. Tabloyu verilerle doldurmak için aşağıdaki SQL komutunu çalıştırın:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');

  1. AWS DMS uç noktası için bir kullanıcı oluşturmak üzere aşağıdaki SQL komutunu çalıştırın ve CDC görevleri için izin verme (yer tutucuyu tercih ettiğiniz parola ile değiştirin):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

DynamoDB referans tablosuna veri yüklemek için AWS DMS kaynakları oluşturun ve yapılandırın

Bu bölümde, verileri DynamoDB referans tablosuna çoğaltmak için AWS DMS oluşturup yapılandıracaksınız.

Bir AWS DMS çoğaltma örneği oluşturun

Öncelikle, aşağıdaki adımları tamamlayarak bir AWS DMS çoğaltma örneği oluşturun:

  1. AWS DMS konsolunda seçin Çoğaltma örnekleri Gezinti bölmesinde.
  2. Klinik Çoğaltma örneği oluştur.
  3. Altında Ayarlar, Için Name, örneğiniz için bir ad girin.
  4. Altında Örnek yapılandırma, Için Yüksek kullanılabilirlik, seçmek Geliştirme veya test iş yükü (Single-AZ).
  5. Altında Bağlantı ve güvenlik, Için VPC güvenlik grupları, seçmek varsayılan.
  6. Klinik Çoğaltma örneği oluştur.

Amazon VPC uç noktaları oluşturun

İsteğe bağlı olarak oluşturabilirsiniz DynamoDB için Amazon VPC uç noktaları özel bir ağdaki AWS DMS örneğinden DynamoDB tablonuza bağlanmanız gerektiğinde. Ayrıca etkinleştirdiğinizden emin olun. Herkese açık VPC'nizin dışında bir veritabanına bağlanmanız gerektiğinde.

Bir AWS DMS kaynak uç noktası oluşturun

Aşağıdaki adımları tamamlayarak bir AWS DMS kaynak uç noktası oluşturun:

  1. AWS DMS konsolunda seçin Uç noktalar Gezinti bölmesinde.
  2. Klinik Bitiş noktası oluştur.
  3. İçin Uç nokta türü, seçmek Kaynak uç noktası.
  4. Altında Uç nokta yapılandırması, Için uç nokta tanımlayıcısı, uç noktanız için bir ad girin.
  5. İçin Kaynak motor, seçmek Amazon Aurora MySQL'i.
  6. İçin Uç nokta veritabanına erişim, seçmek Erişim bilgilerini manuel olarak sağlayın.
  7. İçin Sunucu Adı, Aurora yazıcı örneğinizin uç nokta adını girin (örneğin, mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. İçin Liman, girmek 3306.
  9. İçin kullanıcı adı, AWS DMS göreviniz için bir kullanıcı adı girin.
  10. İçin Şifre, Bir parola girin.
  11. Klinik Bitiş noktası oluştur.

Bir AWS DMS hedef uç noktası oluşturun

Aşağıdaki adımları tamamlayarak bir AWS DMS hedef uç noktası oluşturun:

  1. AWS DMS konsolunda seçin Uç noktalar Gezinti bölmesinde.
  2. Klinik Bitiş noktası oluştur.
  3. İçin Uç nokta türü, seçmek Hedef uç nokta.
  4. Altında Uç nokta yapılandırması, Için uç nokta tanımlayıcısı, uç noktanız için bir ad girin.
  5. İçin Hedef motor, seçmek Amazon DinamoDB.
  6. İçin Hizmet erişim rolü ARN, AWS DMS göreviniz için IAM rolünü girin.
  7. Klinik Bitiş noktası oluştur.

AWS DMS geçiş görevleri oluşturma

Aşağıdaki adımları tamamlayarak AWS DMS veritabanı geçiş görevleri oluşturun:

  1. AWS DMS konsolunda seçin Veritabanı taşıma görevleri Gezinti bölmesinde.
  2. Klinik Görev oluştur.
  3. Altında Görev yapılandırması, Için görev tanımlayıcı, göreviniz için bir ad girin.
  4. İçin Çoğaltma örneği, çoğaltma örneğinizi seçin.
  5. İçin Kaynak veritabanı uç noktası, kaynak uç noktanızı seçin.
  6. İçin Hedef veritabanı uç noktası, hedef uç noktanızı seçin.
  7. İçin Taşıma türü, seçmek Mevcut verileri taşıyın ve devam eden değişiklikleri çoğaltın.
  8. Altında Görev ayarları, Için Hedef tablo hazırlama modu, seçmek Hiçbir şey yapma.
  9. İçin Tam yük tamamlandıktan sonra görevi durdurun, seçmek durma.
  10. İçin LOB sütun ayarları, seçmek Sınırlı LOB modu.
  11. İçin Görev günlükleri, etkinleştirme CloudWatch günlüklerini aç ve Toplu iş için optimize edilmiş uygulamayı açın.
  12. Altında Tablo eşlemeleri, seçmek JSON Editörü ve aşağıdaki kuralları girin.

Burada sütuna değer ekleyebilirsiniz. Aşağıdaki kurallarla, AWS DMS CDC görevi önce içinde belirtilen ada sahip yeni bir DynamoDB tablosu oluşturacaktır. target-table-name. Sonra tüm kayıtları çoğaltacak, DB tablosundaki sütunları DynamoDB tablosundaki niteliklerle eşleştirme.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "2", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "mydev", "table-name": "country_lookup_table" }, "target-table-name": "tgt_country_lookup_table", "mapping-parameters": { "partition-key-name": "code", "sort-key-name": "countryname", "exclude-columns": [ "code", "countryname" ], "attribute-mappings": [ { "target-attribute-name": "code", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${code}" }, { "target-attribute-name": "countryname", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${countryname}" } ], "apply-during-cdc": true } } ]
}

DMS tablosu eşleme

  1. Klinik Görev oluştur.

Artık AWS DMS replikasyon görevi başlatılmıştır.

  1. Bekle Durum olarak göstermek yükleme tamamlandı.

DMS görevi

  1. DynamoDB konsolunda, tablolar Gezinti bölmesinde.
  2. DynamoDB referans tablosunu seçin ve Tablo öğelerini keşfedin Çoğaltılan kayıtları gözden geçirmek için.

DynamoDB referans tablosu başlangıcı

Bir AWS Glue Data Catalog tablosu ve bir AWS Glue akışlı ETL işi oluşturun

Bu bölümde, bir AWS Glue Data Catalog tablosu ve bir AWS Glue akış ayıklama, dönüştürme ve yükleme (ETL) işi oluşturacaksınız.

Veri Kataloğu tablosu oluşturma

Aşağıdaki adımlarla kaynak Kinesis veri akışı için bir AWS Glue Data Catalog tablosu oluşturun:

  1. AWS Glue konsolunda seçin veritabanları altında Veri Kataloğu Gezinti bölmesinde.
  2. Klinik Veritabanı ekle.
  3. İçin Name, girmek my_kinesis_db.
  4. Klinik Veritabanı oluştur.
  5. Klinik tablolar altında veritabanları, Daha sonra seçmek Tablo ekle.
  6. İçin Name, girmek my_stream_src_table.
  7. İçin veritabanı, seçmek my_kinesis_db.
  8. İçin kaynak türünü seçin, seçmek Kinesis.
  9. İçin Kinesis veri akışı şu konumda bulunur:, seçmek hesabım.
  10. İçin Kinesis akışı adı, veri akışınız için bir ad girin.
  11. İçin Sınıflandırmaseçin JSON.
  12. Klinik Sonraki.
  13. Klinik Şemayı JSON olarak düzenle, aşağıdaki JSON'u girin ve ardından İndirim.
[ { "Name": "uuid", "Type": "string", "Comment": "" }, { "Name": "country", "Type": "string", "Comment": "" }, { "Name": "itemtype", "Type": "string", "Comment": "" }, { "Name": "saleschannel", "Type": "string", "Comment": "" }, { "Name": "orderpriority", "Type": "string", "Comment": "" }, { "Name": "orderdate", "Type": "string", "Comment": "" }, { "Name": "region", "Type": "string", "Comment": "" }, { "Name": "shipdate", "Type": "string", "Comment": "" }, { "Name": "unitssold", "Type": "string", "Comment": "" }, { "Name": "unitprice", "Type": "string", "Comment": "" }, { "Name": "unitcost", "Type": "string", "Comment": "" }, { "Name": "totalrevenue", "Type": "string", "Comment": "" }, { "Name": "totalcost", "Type": "string", "Comment": "" }, { "Name": "totalprofit", "Type": "string", "Comment": "" }, { "Name": "impressiontime", "Type": "string", "Comment": "" }
]

Tutkal Kataloğu tablo şeması

    1. Klinik Sonraki, Daha sonra seçmek oluşturmak.

Bir AWS Glue akışı ETL işi oluşturun

Ardından, bir AWS Glue akış işi oluşturursunuz. AWS Glue 3.0 ve sonrası, yerel olarak Apache Hudi'yi destekler, bu yüzden bir Hudi tablosuna almak için bu yerel entegrasyonu kullanıyoruz. AWS Glue akış işini oluşturmak için aşağıdaki adımları tamamlayın:

  1. AWS Glue Studio konsolunda şunu seçin: Spark komut dosyası düzenleyicisi Ve seç oluşturmak.
  2. Altında İş detayları sekme için Name, işiniz için bir isim girin.
  3. İçin IAM Rolü, AWS Glue işiniz için IAM rolünü seçin.
  4. İçin Tipseçin Kıvılcım Akışı.
  5. İçin Tutkal versiyonu, seçmek Tutkal 4.0 – Spark 3.3, Scala 2, Python 3'ü destekler.
  6. İçin İstenen işçi sayısı, girmek 3.
  7. Altında Gelişmiş özellikler, Için İş parametreleri, seçmek Yeni parametre ekle.
  8. İçin anahtar, girmek --conf.
  9. İçin Özellik, girmek spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Klinik Yeni parametre ekle.
  11. İçin anahtar, girmek --datalake-formats.
  12. İçin Özellik, girmek hudi.
  13. İçin Komut dosyası yolu, girmek s3://<S3BucketName>/scripts/.
  14. İçin geçici yol, girmek s3://<S3BucketName>/temporary/.
  15. İsteğe bağlı olarak Spark UI günlük yolu, girmek s3://<S3BucketName>/sparkHistoryLogs/.

Tutkal işi parametresi

  1. Üzerinde Senaryo sekmesinde, aşağıdaki betiği AWS Glue Studio düzenleyicisine girin ve seçin oluşturmak.

Neredeyse gerçek zamanlı akış işi, bir Kinesis veri akışını sık sık güncellenen referans verilerini içeren bir DynamoDB tablosuyla birleştirerek verileri zenginleştirir. Zenginleştirilmiş veri kümesi, veri gölündeki hedef Hudi tablosuna yüklenir. Yer değiştirmek AWS CloudFormation aracılığıyla oluşturduğunuz klasörünüzle:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job args = getResolvedOptions(sys.argv,["JOB_NAME"]) # Initialize spark session and Glue context
sc = SparkContext() glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args) # job paramters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" kin_src_table_name = "my_stream_src_table" hudi_write_operation = "upsert" hudi_record_key = "uuid" hudi_precomb_key = "orderdate" checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db" # hudi options additional_options={ "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.write.recordkey.field": hudi_record_key, "hoodie.datasource.hive_sync.database": hudi_database, "hoodie.table.name": hudi_table, "hoodie.consistency.check.enabled": "true", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor", "hoodie.datasource.write.hive_style_partitioning": "false", "hoodie.datasource.write.precombine.field": hudi_precomb_key, "hoodie.bulkinsert.shuffle.parallelism": "4", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.write.operation": hudi_write_operation, "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
} # Scan and load the reference data table from DynamoDB into AWS Glue DynamicFrames using boto3 API.
def readDynamoDb(): dynamodb = boto3.resource(“dynamodb”) table = dynamodb.Table(dydb_lookup_table) response = table.scan() items = response[“Items”] jsondata = sc.parallelize(items) lookupDf = glueContext.read.json(jsondata) return lookupDf # Load the Amazon Kinesis data stream from Amazon Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog( database=kin_src_database_name, table_name=kin_src_table_name, transformation_ctx=”source_df”, additional_options={“startingPosition”: “TRIM_HORIZON”},
) # As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId): if data_frame.count() > 0: # Refresh the dymanodb table to pull latest snapshot for each microbatch country_lookup_df = readDynamoDb() final_frame = data_frame.join( country_lookup_df, data_frame["country"] == country_lookup_df["countryname"], 'left' ).drop( "countryname", "country", "unitprice", "unitcost", "totalrevenue", "totalcost", "totalprofit" ) # Script generated for node my-lab-hudi-connector final_frame.write.format("hudi") .options(**additional_options) .mode("append") .save(s3_output_folder) try: glueContext.forEachBatch( frame=source_df, batch_function=processBatch, options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path}, )
except Exception as e: print(f"Error is @@@ ....{e}")

  1. Klinik koşmak Akış işini başlatmak için

Aşağıdaki ekran görüntüsü, DataFrame'lerin örneklerini gösterir. data_frame, country_lookup_df, ve final_frame.

Yapıştırıcı iş günlük çıktısı ilk

AWS Glue işi, DynamoDB'deki Kinesis veri akışından ve referans tablosundan gelen kayıtları başarıyla birleştirdi ve ardından birleştirilen kayıtları Hudi formatında Amazon S3'e aldı.

Örnek veriler oluşturmak ve Kinesis veri akışına yüklemek için bir Python betiği oluşturun ve çalıştırın

Bu bölümde, örnek veriler oluşturmak ve bunu kaynak Kinesis veri akışına yüklemek için bir Python oluşturup çalıştıracaksınız. Aşağıdaki adımları tamamlayın:

  1. AWS Cloud9'da, EC2 bulut sunucunuzda veya veri akışınıza kayıt koyan diğer herhangi bir bilgi işlem ana bilgisayarında oturum açın.
  2. adlı bir Python dosyası oluşturun. generate-data-for-kds.py:
$ python3 generate-data-for-kds.py

  1. Python dosyasını açın ve aşağıdaki betiği girin:
import json
import random
import boto3
import time STREAM_NAME = "<mystreamname>" def get_data(): return { "uuid": random.randrange(0, 1000001, 1), "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ), "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ), "orderpriority": random.choice(["H", "L", "M", "C"]), "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "region": random.choice( ["Asia" "Europe", "Americas", "Middle Eastern", "Africa"] ), "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ), "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", "8072", "65", "7864", "9778", ] ), "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ), "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", "5255275.28", "463966.1", ] ), "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06", "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", "3951974.56", "310842.62", ] ), "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7", "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ), "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", "2022-15-24T02:27:41Z", ] ), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) time.sleep(2) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Bu komut dosyası, her 2 saniyede bir Kinesis veri akışı kaydı koyar.

Aurora MySQL kümesindeki referans tablosunun güncellenmesini simüle edin

Artık tüm kaynaklar ve yapılandırmalar hazır. Bu örnek için, eklemek istiyoruz 3 haneli ülke kodu referans tablosuna Değişiklikleri simüle etmek için Aurora MySQL tablosundaki kayıtları güncelleyelim. Aşağıdaki adımları tamamlayın:

  1. AWS Glue akış işinin zaten çalışıyor olduğundan emin olun.
  2. Daha önce açıklandığı gibi birincil veritabanı bulut sunucusuna yeniden bağlanın.
  3. Kayıtları güncellemek için SQL komutlarınızı girin:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

Şimdi Aurora MySQL kaynak veritabanındaki referans tablosu güncellendi. Ardından, değişiklikler otomatik olarak DynamoDB'deki referans tablosuna kopyalanır.

DynamoDB referans tablosu güncellendi

Aşağıdaki tablolar, içindeki kayıtları göstermektedir. data_frame, country_lookup_df, ve final_frame. içinde country_lookup_df ve final_frame, combinedname sütun olarak biçimlendirilmiş değerlere sahip <2-digit-country-code>-<3-digit-country-code>-<country-name>, başvurulan tablodaki değiştirilen kayıtların AWS Glue akış işini yeniden başlatmadan tabloya yansıtıldığını gösterir. Bu, AWS Glue işinin Kinesis veri akışından gelen kayıtları, referans tablosu değişirken bile referans tablosuyla başarıyla birleştirdiği anlamına gelir.
Tutkal işi günlük çıktısı güncellendi

Athena kullanarak Hudi tablosunu sorgulama

Hedef tablodaki kayıtları görmek için Athena kullanarak Hudi tablosunu sorgulayalım. Aşağıdaki adımları tamamlayın:

  1. Komut dosyasının ve AWS Glue Streaming işinin hala çalıştığından emin olun:
    1. Python betiği (generate-data-for-kds.py) hala çalışıyor.
    2. Üretilen veriler veri akışına gönderiliyor.
    3. AWS Glue akış işi hala çalışıyor.
  2. Athena konsolunda, sorgu düzenleyicide aşağıdaki SQL'i çalıştırın:
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;

Aşağıdaki sorgu sonucu, başvurulan tablo değiştirilmeden önce işlenen kayıtları gösterir. Kayıtlar combinedname sütun benzer <2-digit-country-code>-<country-name>.

Athena sorgu sonucu ilk

Aşağıdaki sorgu sonucu, başvurulan tablo değiştirildikten sonra işlenen kayıtları gösterir. Kayıtlar combinedname sütun benzer <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Athena sorgu sonucu güncellendi

Artık, değiştirilen referans verilerinin, Kinesis veri akışındaki kayıtları ve DynamoDB'deki referans verilerini birleştiren hedef Hudi tablosuna başarıyla yansıtıldığını anlıyorsunuz.

Temizlemek

Son adım olarak, kaynakları temizleyin:

  1. Kinesis veri akışını silin.
  2. AWS DMS geçiş görevini, uç noktayı ve çoğaltma örneğini silin.
  3. AWS Glue akış işini durdurun ve silin.
  4. AWS Cloud9 ortamını silin.
  5. CloudFormation şablonunu silin.

Sonuç

Gerçek zamanlı veri alımını ve işlenmesini içeren bir işlemsel veri gölü oluşturmak ve sürdürmek, hangi alım hizmetinin kullanılacağı, referans verilerinizin nasıl depolanacağı ve hangi işlemsel veri gölü çerçevesinin kullanılacağı gibi alınması gereken çok sayıda değişken bileşene ve karara sahiptir. Bu gönderide, işlemsel bir veri gölü için yapı taşları olarak AWS yerel bileşenlerini ve açık kaynak çerçevesi olarak Apache Hudi'yi kullanarak böyle bir işlem hattının uygulama ayrıntılarını sağladık.

Bu çözümün, bu tür gereksinimlerle yeni bir veri gölü uygulamak isteyen kuruluşlar için bir başlangıç ​​noktası olabileceğine inanıyoruz. Ek olarak, farklı bileşenler tamamen takılabilir ve yeni gereksinimleri hedeflemek veya mevcut olanları geçirerek sorunlu noktaları ele almak için mevcut veri gölleriyle karıştırılabilir ve eşleştirilebilir.


yazarlar hakkında

Maniş Kola AWS'de bir Veri Laboratuvarı Çözümleri Mimarıdır ve burada veri analitiği ve yapay zeka ihtiyaçları için bulutta yerel çözümler tasarlamak üzere çeşitli sektörlerdeki müşterilerle yakın işbirliği içinde çalışır. İş sorunlarını çözmek ve ölçeklenebilir prototipler oluşturmak için AWS yolculuklarında müşterilerle iş birliği yapıyor. Manish'in AWS'ye katılmadan önceki deneyimi, müşterilerin veri ambarı, BI, veri entegrasyonu ve veri gölü projelerini uygulamalarına yardımcı olmayı içerir.

Santosh Kotagiri AWS'de somut iş sonuçlarına götüren veri analitiği ve bulut çözümleri deneyimine sahip bir Çözüm Mimarıdır. Uzmanlığı, bulutta yerel ve açık kaynaklı hizmetlere odaklanarak, sektörlerdeki müşteriler için ölçeklenebilir veri analitiği çözümleri tasarlama ve uygulamada yatmaktadır. İşi büyütmek ve karmaşık sorunları çözmek için teknolojiden yararlanma konusunda tutkulu.

Çiho Sugimoto AWS Büyük Veri Destek ekibinde bir Bulut Destek Mühendisidir. Müşterilerin ETL iş yüklerini kullanarak veri gölleri oluşturmasına yardımcı olma konusunda tutkulu. Gezegen bilimini seviyor ve hafta sonları asteroit Ryugu'yu incelemekten hoşlanıyor.

Noritaka Sekiyama AWS Glue ekibinde Baş Büyük Veri Mimarıdır. Müşterilere yardımcı olmak için yazılım yapıları oluşturmaktan sorumludur. Boş zamanlarında yeni yol bisikletiyle bisiklete binmekten keyif alıyor.

spot_img

En Son İstihbarat

spot_img

Bizimle sohbet

Merhaba! Size nasıl yardım edebilirim?