Zephyrnet Logosu

AWS Glue ve Delta kullanarak bir veri gölünde yavaş değişen boyutları uygulayın

Tarih:

Bir veri ambarında, bir boyut kullanıcıların işle ilgili soruları yanıtlayabilmelerini sağlamak için gerçekleri ve ölçüleri kategorize eden bir yapıdır. Bir örnek vermek gerekirse, tipik bir satış alanında müşteri, zaman veya ürün boyutlardır ve satış işlemleri bir olgudur. Boyut içindeki öznitelikler zaman içinde değişebilir; bir müşteri adresini değiştirebilir, bir çalışan yüklenici pozisyonundan tam zamanlı bir pozisyona geçebilir veya bir üründe birden çok revizyon olabilir. A Yavaş yavaş değişen boyut (SCD), belirli bir süre içinde yavaşça değişebilen nispeten statik verileri içeren bir veri ambarı konseptidir. Veri ambarında tutulan üç ana SCD türü vardır: Tip 1 (geçmiş yok), Tip 2 (tam geçmiş) ve Tip 3 (sınırlı geçmiş). Değişiklik verisi yakalama (CDC), değişen veriler üzerinde bir işlem gerçekleştirilebilmesi için iki veritabanı yüklemesi arasında değişen verileri belirleme yeteneği sağlayan bir veritabanı özelliğidir.

Dünyanın dört bir yanındaki kuruluşlar veri platformlarını veri gölleriyle modernize ederken Amazon Basit Depolama Hizmeti (Amazon S3), veri göllerinde SCD'leri işlemek zor olabilir. Kaynak sistemler, veri gölü içinde işlenmek üzere değiştirilen verileri tanımlayan bir mekanizma sağlamadığında ve veri kaynağı bir veritabanı yerine yarı yapılandırılmışsa veri işlemeyi oldukça karmaşık hale getirdiğinde durum daha da zorlaşır. Tip 2 SCD'leri işlerken temel amaç, veri gölü içindeki değişiklikleri izlemek için veri kümesinin başlangıç ​​ve bitiş tarihlerini doğru bir şekilde tanımlamaktır çünkü bu, tüketen uygulamalar için belirli bir noktada raporlama yeteneği sağlar.

Bu gönderide, yarı yapılandırılmış bir kaynak (JSON) için değişen verilerin nasıl tanımlanacağını göstermeye ve tüm geçmiş veri değişikliklerini (SCD Tip 2) yakalayıp bunları kullanarak bir S3 veri gölünde depolamaya odaklanacağız. AWS Tutkal ve açık veri gölü formatı delta.io. Bu uygulama aşağıdaki kullanım durumlarını destekler:

  • Mevcut ve tam geçmiş kayıtları tanımlamak için başlangıç ​​ve bitiş tarihleri ​​ve veri gölündeki silinen kayıtları (mantıksal silmeler) tanımlamak için bir işaret ile İzleme Tipi 2 SCD'ler
  • gibi tüketim araçlarını kullanın. Amazon Atina tarihsel kayıtları sorunsuz bir şekilde sorgulamak için

Çözüme genel bakış

Bu gönderi, örnek bir çalışan veri kümesi kullanan uçtan uca bir kullanım durumuyla çözümü gösterir. Veri kümesi, kimlik, ad, adres, telefon numarası, yüklenici olup olmadığı ve daha fazlası gibi çalışan ayrıntılarını temsil eder. SCD uygulamasını göstermek için aşağıdaki varsayımları göz önünde bulundurun:

  • Veri mühendisliği ekibi, kayıtların tam anlık görüntüleri olan ve kaynak kayıt değişikliklerini belirleyecek herhangi bir mekanizma içermeyen günlük dosyalar alır.
  • Ekip, kaynaktan yeni, güncellenmiş ve silinmiş kayıtları belirlemek ve veri gölündeki geçmiş değişiklikleri korumak için SCD Tip 2 işlevselliğini uygulamakla görevlendirilmiştir.
  • Kaynak sistemler CDC yeteneğini sağlamadığından, yeni, güncellenmiş ve silinmiş kayıtları tanımlamak ve bunları veri gölü katmanında kalıcı kılmak için bir mekanizmanın geliştirilmesi gerekir.

Mimari şu şekilde uygulanır:

  • Kaynak sistemler, S3 iniş grubundaki dosyaları alır (bu adım, sağlanan veriler kullanılarak örnek kayıtlar oluşturularak taklit edilir) AWS Lambda iniş kovasına işlev)
  • Bir AWS Glue işi (Delta işi), kaynak veri dosyasını seçer ve önceki dosya yüklemesindeki değiştirilen verileri (yeni ekler, mevcut kayıtlarda yapılan güncellemeler ve kaynaktan silinen kayıtlar) S3 veri gölüne (işlenmiş katman grubu) işler.
  • Mimari, açık veri gölü biçimini (Delta) kullanır ve S3 veri gölünü, yeni değişiklikler güncellenebildiği, yeni ekler eklenebildiği ve kaynak silme işlemleri doğru bir şekilde tanımlanıp işaretlenebildiği için değiştirilebilir bir Delta Gölü olarak oluşturur. Birlikte delete_flag değer
  • Bir AWS Glue gezgini, Athena tarafından sorgulanabilen verileri kataloglar

Aşağıdaki şema mimarimizi göstermektedir.

Önkoşullar

Başlamadan önce, aşağıdaki ön koşullara sahip olduğunuzdan emin olun:

Çözümü dağıtın

Bu çözüm için, tekrarlanabilir konuşlandırmalara olanak sağlamak üzere mimaride yer alan hizmetleri ayarlayan bir CloudFormation şablonu sağlıyoruz. Bu şablon aşağıdaki kaynakları oluşturur:

  • İki S3 grubu: örnek çalışan verilerini depolamak için bir iniş grubu ve değişken veri gölü (Delta Gölü) için işlenmiş bir katman grubu
  • Örnek kayıtlar oluşturmak için bir Lambda işlevi
  • Kaynak verileri iniş grubundan işlenen kovaya kadar işlemek için bir AWS Glue ayıklama, dönüştürme ve yükleme (ETL) işi

Çözümü dağıtmak için aşağıdaki adımları tamamlayın:

  1. Klinik Yığını Başlat CloudFormation yığınını başlatmak için:

  1. Bir yığın adı girin.
  2. seç AWS CloudFormation'ın özel adlarla IAM kaynakları oluşturabileceğini kabul ediyorum.
  3. Klinik Yığın oluştur.

CloudFormation yığın dağıtımı tamamlandıktan sonra, aşağıdaki kaynakları not etmek için AWS CloudFormation konsoluna gidin. Çıkışlar sekmesi:

  • Veri gölü kaynakları – S3 kovaları scd-blog-landing-xxxx ve scd-blog-processed-xxxx (olarak adlandırılır scd-blog-landing ve scd-blog-processed bu yazının sonraki bölümlerinde)
  • Örnek kayıt oluşturucu Lambda işlevi - SampleDataGenaratorLambda-<CloudFormation Stack Name> (olarak adlandırılır SampleDataGeneratorLambda)
  • AWS Tutkal Veri Kataloğu veritabanı - deltalake_xxxxxx (olarak adlandırılır deltalake)
  • AWS Glue Delta işi - <CloudFormation-Stack-Name>-src-to-processed (olarak adlandırılır src-to-processed)

Hesabınızda CloudFormation yığınını dağıtmanın AWS kullanım ücretlerine tabi olduğunu unutmayın.

SCD Tip 2 uygulamasını test edin

Altyapı hazır olduğunda, genel çözüm tasarımını test etmeye ve çalışan veri kümesinden geçmiş kayıtları sorgulamaya hazırsınız. Bu gönderi, günlük olarak tam anlık görüntü verileri aldığınız gerçek bir müşteri kullanım durumu için uygulanmak üzere tasarlanmıştır. SCD uygulamasının aşağıdaki yönlerini test ediyoruz:

  • İlk yükleme için bir AWS Glue işi çalıştırın
  • Kaynakta herhangi bir değişikliğin olmadığı bir senaryoyu simüle edin
  • Yeni kayıtlar ekleyerek ve mevcut kayıtları değiştirerek ve silerek ekleme, güncelleme ve silme senaryolarını simüle edin
  • Silinen kaydın yeni bir ek olarak geri geldiği bir senaryoyu simüle edin

Örnek bir çalışan veri kümesi oluşturun

Çözümü test etmek için ve ilk veri alımınıza başlamadan önce veri kaynağının tanımlanması gerekir. Bu adımı basitleştirmek için az önce devreye aldığınız CloudFormation yığınında bir Lambda işlevi devreye alındı.

İşlevi açın ve varsayılan değerle bir test olayı yapılandırın. hello-world Aşağıdaki ekran görüntüsünde görüldüğü gibi şablon olayı JSON. Şablonda herhangi bir değişiklik yapmadan bir olay adı girin ve test olayını kaydedin.

Klinik test örnek kayıtları oluşturmak için Lambda işlevini çağıran bir test olayını çağırmak için.

Lambda işlevi başlatmayı tamamladığında, aşağıdaki örnek çalışan veri kümesini açılış paketinde görebileceksiniz.

AWS Glue işini çalıştırın

Yolda çalışan veri kümesini görüp görmediğinizi onaylayın s3://scd-blog-landing/dataset/employee/. Veri setini indirebilir ve VS Code gibi bir kod düzenleyicide açabilirsiniz. Aşağıda veri kümesinin bir örneği verilmiştir:

{"emp_id":1,"first_name":"Melissa","last_name":"Parks","Address":"19892 Williamson Causeway Suite 737nKarenborough, IN 11372","phone_number":"001-372-612-0684","isContractor":false}
{"emp_id":2,"first_name":"Laura","last_name":"Delgado","Address":"93922 Rachel Parkways Suite 717nKaylaville, GA 87563","phone_number":"001-759-461-3454x80784","isContractor":false}
{"emp_id":3,"first_name":"Luis","last_name":"Barnes","Address":"32386 Rojas SpringsnDicksonchester, DE 05474","phone_number":"127-420-4928","isContractor":false}
{"emp_id":4,"first_name":"Jonathan","last_name":"Wilson","Address":"682 Pace Springs Apt. 011nNew Wendy, GA 34212","phone_number":"761.925.0827","isContractor":true}
{"emp_id":5,"first_name":"Kelly","last_name":"Gomez","Address":"4780 Johnson TunnelnMichaelland, WI 22423","phone_number":"+1-303-418-4571","isContractor":false}
{"emp_id":6,"first_name":"Robert","last_name":"Smith","Address":"04171 Mitchell Springs Suite 748nNorth Juliaview, CT 87333","phone_number":"261-155-3071x3915","isContractor":true}
{"emp_id":7,"first_name":"Glenn","last_name":"Martinez","Address":"4913 Robert ViewsnWest Lisa, ND 75950","phone_number":"001-638-239-7320x4801","isContractor":false}
{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott ValleynGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}
{"emp_id":9,"first_name":"Karen","last_name":"Spencer","Address":"7284 Coleman Club Apt. 813nAndersonville, AS 86504","phone_number":"484-909-3127","isContractor":true}
{"emp_id":10,"first_name":"Daniel","last_name":"Foley","Address":"621 Sarah Lock Apt. 537nJessicaton, NH 95446","phone_number":"457-716-2354x4945","isContractor":true}
{"emp_id":11,"first_name":"Amy","last_name":"Stevens","Address":"94661 Young Lodge Suite 189nCynthiamouth, PR 01996","phone_number":"241.375.7901x6915","isContractor":true}
{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce MeadowsnLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":true}
{"emp_id":13,"first_name":"John","last_name":"Valdez","Address":"686 Brian Forges Suite 229nSullivanbury, MN 25872","phone_number":"+1-488-011-0464x95255","isContractor":false}
{"emp_id":14,"first_name":"Michael","last_name":"West","Address":"293 Jones Squares Apt. 997nNorth Amandabury, TN 03955","phone_number":"146.133.9890","isContractor":true}
{"emp_id":15,"first_name":"Perry","last_name":"Mcguire","Address":"2126 Joshua Forks Apt. 050nPort Angela, MD 25551","phone_number":"001-862-800-3814","isContractor":true}
{"emp_id":16,"first_name":"James","last_name":"Munoz","Address":"74019 Banks EstatesnEast Nicolefort, GU 45886","phone_number":"6532485982","isContractor":false}
{"emp_id":17,"first_name":"Todd","last_name":"Barton","Address":"2795 Kelly Shoal Apt. 500nWest Lindsaytown, TN 55404","phone_number":"079-583-6386","isContractor":true}
{"emp_id":18,"first_name":"Christopher","last_name":"Noble","Address":"Unit 7816 Box 9004nDPO AE 29282","phone_number":"215-060-7721","isContractor":true}
{"emp_id":19,"first_name":"Sandy","last_name":"Hunter","Address":"7251 Sarah CreeknWest Jasmine, CO 54252","phone_number":"8759007374","isContractor":false}
{"emp_id":20,"first_name":"Jennifer","last_name":"Ballard","Address":"77628 Owens Key Apt. 659nPort Victorstad, IN 02469","phone_number":"+1-137-420-7831x43286","isContractor":true}
{"emp_id":21,"first_name":"David","last_name":"Morris","Address":"192 Leslie Groves Apt. 930nWest Dylan, NY 04000","phone_number":"990.804.0382x305","isContractor":false}
{"emp_id":22,"first_name":"Paula","last_name":"Jones","Address":"045 Johnson Viaduct Apt. 732nNorrisstad, AL 12416","phone_number":"+1-193-919-7527x2207","isContractor":true}
{"emp_id":23,"first_name":"Lisa","last_name":"Thompson","Address":"1295 Judy Ports Suite 049nHowardstad, PA 11905","phone_number":"(623)577-5982x33215","isContractor":true}
{"emp_id":24,"first_name":"Vickie","last_name":"Johnson","Address":"5247 Jennifer Run Suite 297nGlenberg, NC 88615","phone_number":"708-367-4447x9366","isContractor":false}
{"emp_id":25,"first_name":"John","last_name":"Hamilton","Address":"5899 Barnes PlainnHarrisville, NC 43970","phone_number":"341-467-5286x20961","isContractor":false}

Veri kümesini indirin ve hazır tutun, çünkü veri kümesini eklemeleri, güncellemeleri ve silmeleri simüle etmek için gelecekteki kullanım durumları için değiştireceksiniz. Sizin için oluşturulan örnek veri kümesi, önceki örnekte gördüğünüzden tamamen farklı olacaktır.

İşi çalıştırmak için aşağıdaki adımları tamamlayın:

  1. AWS Glue konsolunda seçin Mesleki Öğretiler Gezinti bölmesinde.
  2. işi seç src-to-processed.
  3. Üzerinde Runs sekmesini seçin koşmak.

AWS Glue işi ilk kez çalıştırıldığında, iş çalışan veri kümesini iniş grubu yolundan okur ve verileri bir Delta tablosu olarak işlenen gruba alır.

İş tamamlandığında, ilk veri yüklemesini görmek için bir tarayıcı oluşturabilirsiniz. Aşağıdaki ekran görüntüsü, üzerinde bulunan veritabanını göstermektedir. veritabanları gidin.

  1. Klinik Tarayıcıları Gezinti bölmesinde.
  2. Klinik tarayıcı oluştur.

  1. Tarayıcınıza bir ad verin delta-lake-crawler, Daha sonra seçmek Sonraki.

  1. seç Henüz değil AWS Glue tablolarına zaten eşlenmiş veriler için.
  2. Klinik Veri kaynağı ekle.

  1. Üzerinde Veri kaynağı açılır menüden Delta Gölü.
  2. Delta tablosunun yolunu girin.
  3. seç Yerel tablolar oluşturma.
  4. Klinik Delta Lake veri kaynağı ekleme.

  1. Klinik Sonraki.

  1. CloudFormation şablonu tarafından oluşturulan rolü seçin, ardından Sonraki.

  1. CloudFormation şablonu tarafından oluşturulan veritabanını seçin, ardından Sonraki.

  1. Klinik tarayıcı oluştur.

  1. Tarayıcınızı seçin ve seçin koşmak.

Verileri sorgula

Tarayıcı tamamlandıktan sonra oluşturduğu tabloyu görebilirsiniz.

Verileri sorgulamak için aşağıdaki adımları tamamlayın:

  1. Çalışan tablosunu seçin ve İşlemler menü seç Veriyi gör.

Athena konsoluna yönlendirilirsiniz. En son Athena motoruna sahip değilseniz, en son Athena motoruyla yeni bir Athena çalışma grubu oluşturun.

  1. Altında Yönetim gezinme bölmesinde öğesini seçin. Çalışma Grupları.

  1. Klinik Çalışma grubu oluştur.

  1. Çalışma grubu için aşağıdaki gibi bir ad girin: DeltaWorkgroup.
  2. seç Athena SQL'i motor olarak ve seçin Athena motor versiyonu 3 için Sorgu motoru sürümü.

  1. Klinik Çalışma grubu oluştur.

  1. Çalışma grubunu oluşturduktan sonra, çalışma grubunu seçin (DeltaWorkgroup) Athena sorgu düzenleyicisindeki açılır menüde.

  1. Aşağıdaki sorguyu üzerinde çalıştırın employee tablosu:
SELECT * FROM "deltalake_2438fbd0"."employee";

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktılarından doğru veritabanı adını güncelleyin.

gözlemleyebilirsiniz ki, employee tabloda 25 kayıt var. Aşağıdaki ekran görüntüsü, bazı örnek kayıtlarla birlikte toplam çalışan kayıtlarını göstermektedir.

Delta tablosu bir emp_key, her değişikliğe özeldir ve değişiklikleri izlemek için kullanılır. bu emp_key her ekleme, güncelleme ve silme işlemi için oluşturulur ve tek bir dosyaya ait tüm değişiklikleri bulmak için kullanılabilir. emp_id.

The emp_key aşağıdaki kodda gösterildiği gibi SHA256 karma algoritması kullanılarak oluşturulur:

df.withColumn("emp_key", sha2(concat_ws("||", col("emp_id"), col("first_name"), col("last_name"), col("Address"), col("phone_number"), col("isContractor")), 256))

Eklemeler, güncellemeler ve silmeler gerçekleştirin

Veri setinde değişiklik yapmadan önce aynı işi bir kez daha çalıştıralım. Kaynaktan gelen mevcut yükün ilk yük ile aynı olduğu ve hiçbir değişiklik olmadığı varsayıldığında, AWS Glue işi veri kümesinde herhangi bir değişiklik yapmamalıdır. İş tamamlandıktan sonra, öncekini çalıştırın Select Athena sorgu düzenleyicisinde sorgulayın ve aşağıdaki değerlere sahip 25 aktif kayıt olduğunu onaylayın:

  • Sütun içeren 25 kaydın tümü isCurrent=true
  • Sütun içeren 25 kaydın tümü end_date=Null
  • Sütun içeren 25 kaydın tümü delete_flag=false

Önceki işin bu değerlerle çalıştığını onayladıktan sonra, ilk veri setimizi aşağıdaki değişikliklerle değiştirelim:

  1. Değiştir isContractor bayrak false (Şuna değiştir true veri kümeniz zaten gösteriyorsa false) Için emp_id=12.
  2. Tüm satırı sil emp_id=8 (kaydı bir metin düzenleyicide kaydettiğinizden emin olun, çünkü bu kaydı başka bir kullanım durumunda kullanıyoruz).
  3. için satırı kopyala emp_id=25 ve yeni bir satır ekleyin. Değiştir emp_id olduğu 26ve diğer sütunların değerlerini de değiştirdiğinizden emin olun.

Bu değişiklikleri yaptıktan sonra, çalışan kaynak veri kümesi aşağıdaki koda benzer (okunabilirlik için, önceki üç adımda açıklandığı gibi yalnızca değiştirilen kayıtları dahil ettik):

{"emp_id":12,"first_name":"Nicholas","last_name":"Aguirre","Address":"7474 Joyce MeadowsnLake Billy, WA 40750","phone_number":"495.259.9738","isContractor":false}
{"emp_id":26,"first_name":"John-copied","last_name":"Hamilton-copied","Address":"6000 Barnes PlainnHarrisville-city, NC 5000","phone_number":"444-467-5286x20961","isContractor":true}

  1. Şimdi, değiştirilenleri yükleyin fake_emp_data.json aynı kaynak önekine dosya.

  1. Değiştirilen çalışan veri kümesini Amazon S3'e yükledikten sonra AWS Glue konsoluna gidin ve işi çalıştırın.
  2. İş tamamlandığında, Athena sorgu düzenleyicisinde aşağıdaki sorguyu çalıştırın ve aşağıdaki değerlere sahip toplam 27 kayıt olduğunu onaylayın:
SELECT * FROM "deltalake_2438fbd0"."employee";

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktısından doğru veritabanı adını güncelleyin.

  1. Athena sorgu düzenleyicisinde başka bir sorgu çalıştırın ve aşağıdaki değerlerle döndürülen 4 kayıt olduğunu doğrulayın:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktısından doğru veritabanı adını güncelleyin.

için iki kayıt göreceksiniz. emp_id=12:

  • Bir emp_id=12 aşağıdaki değerlerle kayıt yapın (ilk yükün bir parçası olarak alınan kayıt için):
    • emp_key=44cebb094ef289670e2c9325d5f3e4ca18fdd53850b7ccd98d18c7a57cb6d4b4
    • isCurrent=false
    • delete_flag=false
    • end_date=’2023-03-02’
  • Bir ikinci emp_id=12 aşağıdaki değerlerle kayıt yapın (kaynağa yapılan değişikliğin bir parçası olarak alınan kayıt için):
    • emp_key=b60547d769e8757c3ebf9f5a1002d472dbebebc366bfbc119227220fb3a3b108
    • isCurrent=true
    • delete_flag=false
    • end_date=Null (veya boş dize)

İçin rekor emp_id=8 bu çalıştırmanın bir parçası olarak kaynakta silinmiş olanlar, değerlerde aşağıdaki değişikliklerle birlikte var olmaya devam edecek:

  • isCurrent=false
  • end_date=’2023-03-02’
  • delete_flag=true

Yeni çalışan kaydı aşağıdaki değerlerle eklenecektir:

  • emp_id=26
  • isCurrent=true
  • end_date=NULL (veya boş dize)
  • delete_flag=false

Unutmayın emp_key gerçek tablonuzdaki değerler burada örnek olarak verilenlerden farklı olabilir.

  1. Silme işlemleri için, yeni kaynak dosya ve emp_key ile iç birleştirme ile birlikte temel tablodan emp_id'yi kontrol ederiz.
  2. Koşul doğru olarak değerlendirilirse, emp_key çalışan temel tablosunun yeni emp_key güncellemelerine eşit olup olmadığını kontrol ederiz ve geçerli, silinmemiş kaydı alırız (isCurrent=true ve delete_flag=false).
  3. Yeni dosyadaki silme değişikliklerini, eşleşen tüm silme koşulu satırları için temel tabloyla birleştiriyoruz ve aşağıdakileri güncelliyoruz:
    1. isCurrent=false
    2. delete_flag=true
    3. end_date=current_date

Aşağıdaki koda bakın:

delete_join_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key"
delete_cond = "employee.emp_key == employeeUpdates.emp_key and employee.isCurrent = true and employeeUpdates.delete_flag = true" base_tbl.alias("employee") .merge(union_updates_dels.alias("employeeUpdates"), delete_join_cond) .whenMatchedUpdate(condition=delete_cond, set={"isCurrent": "false", "end_date": current_date(), "delete_flag": "true"}).execute()

  1. Hem güncellemeler hem de ekler için, temel tablonun şu koşulu kontrol ederiz: employee.emp_id eşittir new changes.emp_id ve employee.emp_key eşittir new changes.emp_key, yalnızca mevcut kayıtları alırken.
  2. Bu koşul şu şekilde değerlendirilirse true, daha sonra mevcut kaydı alırız (isCurrent=true ve delete_flag=false).
  3. Aşağıdakileri güncelleyerek değişiklikleri birleştiriyoruz:
    1. İkinci koşul olarak değerlendirilirse true:
      1. isCurrent=false
      2. end_date=current_date
    2. Veya ikinci koşul şu şekilde değerlendirilirse tüm satırı aşağıdaki gibi ekleriz: false:
      1. emp_id=new record’s emp_key
      2. emp_key=new record’s emp_key
      3. first_name=new record’s first_name
      4. last_name=new record’s last_name
      5. address=new record’s address
      6. phone_number=new record’s phone_number
      7. isContractor=new record’s isContractor
      8. start_date=current_date
      9. end_date=NULL (veya boş dize)
      10. isCurrent=true
      11. delete_flag=false

Aşağıdaki koda bakın:

upsert_cond = "employee.emp_id=employeeUpdates.emp_id and employee.emp_key = employeeUpdates.emp_key and employee.isCurrent = true"
upsert_update_cond = "employee.isCurrent = true and employeeUpdates.delete_flag = false" base_tbl.alias("employee").merge(union_updates_dels.alias("employeeUpdates"), upsert_cond) .whenMatchedUpdate(condition=upsert_update_cond, set={"isCurrent": "false", "end_date": current_date() }) .whenNotMatchedInsert( values={ "isCurrent": "true", "emp_id": "employeeUpdates.emp_id", "first_name": "employeeUpdates.first_name", "last_name": "employeeUpdates.last_name", "Address": "employeeUpdates.Address", "phone_number": "employeeUpdates.phone_number", "isContractor": "employeeUpdates.isContractor", "emp_key": "employeeUpdates.emp_key", "start_date": current_date(), "delete_flag": "employeeUpdates.delete_flag", "end_date": "null" }) .execute()

Son adım olarak, önceki değişiklikten silinen kaydı kaynak veri setine geri getirelim ve tekrar veri setine nasıl girdiğini görelim. employee veri gölündeki tabloyu inceleyin ve tüm geçmişin nasıl korunduğunu gözlemleyin.

Önceki adımda değiştirdiğimiz veri setimizi değiştirelim ve aşağıdaki değişiklikleri yapalım.

  1. silinenleri ekle emp_id=8 veri kümesine geri dön.

Bu değişiklikleri yaptıktan sonra, çalışan kaynak veri kümem aşağıdaki koda benziyor (okunabilirlik için, önceki adımda açıklandığı gibi yalnızca eklenen kaydı dahil ettik):

{"emp_id":8,"first_name":"Teresa","last_name":"Estrada","Address":"339 Scott ValleynGonzalesfort, PA 18212","phone_number":"435-600-3162","isContractor":false}

  1. Değiştirilen çalışan veri kümesi dosyasını aynı kaynak önekine yükleyin.
  2. Değiştirilenleri yükledikten sonra fake_emp_data.json veri kümesini Amazon S3'e aktarın, AWS Glue konsoluna gidin ve işi yeniden çalıştırın.
  3. İş tamamlandığında, Athena sorgu düzenleyicisinde aşağıdaki sorguyu çalıştırın ve aşağıdaki değerlere sahip toplam 28 kayıt olduğunu onaylayın:
SELECT * FROM "deltalake_2438fbd0"."employee";

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktısından doğru veritabanı adını güncelleyin.

  1. Aşağıdaki sorguyu çalıştırın ve 5 kayıt olduğunu doğrulayın:
SELECT * FROM "AwsDataCatalog"."deltalake_2438fbd0"."employee" where emp_id in (8, 12, 26)
order by emp_id;

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktısından doğru veritabanı adını güncelleyin.

için iki kayıt göreceksiniz. emp_id=8:

  • Bir emp_id=8 aşağıdaki değerlerle kayıt yapın (silinmiş olan eski kayıt):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=false
    • deleted_flag=true
    • end_date=’2023-03-02
  • Başka emp_id=8 aşağıdaki değerlerle kayıt yapın (son çalıştırmada eklenen yeni kayıt):
    • emp_key=536ba1ba5961da07863c6d19b7481310e64b58b4c02a89c30c0137a535dbf94d
    • isCurrent=true
    • deleted_flag=false
    • end_date=NULL (veya boş dize)

The emp_key gerçek tablonuzdaki değerler burada örnek olarak verilenlerden farklı olabilir. Ayrıca, bu, sonraki yüklemede herhangi bir değişiklik yapılmadan yeniden eklenen aynı silinmiş kayıt olduğundan, kayıtta herhangi bir değişiklik olmayacağını unutmayın. emp_key.

Son kullanıcı örnek sorguları

Aşağıda, çalışan değişiklik verileri geçmişinin raporlama için nasıl geçilebileceğini gösteren bazı örnek son kullanıcı sorguları yer almaktadır:

  • Sorgu 1 – Geçerli ayda kuruluştan ayrılan tüm çalışanların bir listesini alın (örneğin, Mart 2023).
SELECT * FROM "deltalake_2438fbd0"."employee" where delete_flag=true and date_format(CAST(end_date AS date),'%Y/%m') ='2023/03'

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktısından doğru veritabanı adını güncelleyin.

Önceki sorgu, kuruluştan ayrılan iki çalışan kaydı döndürür.

  • Sorgu 2 – Geçerli ayda (örneğin, Mart 2023) kuruluşa katılan yeni çalışanların bir listesini alın.
SELECT * FROM "deltalake_2438fbd0"."employee" where date_format(start_date,'%Y/%m') ='2023/03' and iscurrent=true

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktısından doğru veritabanı adını güncelleyin.

Önceki sorgu, kuruluşa katılan 23 aktif çalışan kaydını döndürür.

  • Sorgu 3 – Kuruluştaki herhangi bir çalışanın geçmişini bulun (bu durumda çalışan 18).
SELECT * FROM "deltalake_2438fbd0"."employee" where emp_id=18

Not: Yukarıdaki sorguyu çalıştırmadan önce CloudFormation çıktısından doğru veritabanı adını güncelleyin.

Önceki sorguda, çalışan 18'in kuruluştan ayrılmadan önce çalışan kayıtlarında iki değişiklik olduğunu gözlemleyebiliriz.

Bu örnekte sağlanan veri sonuçlarının, Lambda işlevi tarafından oluşturulan örnek verilere dayalı olarak belirli kayıtlarınızda göreceğinizden farklı olduğunu unutmayın.

Temizlemek

Bu çözümü denemeyi bitirdiğinizde, AWS ücretlerinin tahakkuk etmesini önlemek için kaynaklarınızı temizleyin:

  1. S3 kaplarını boşaltın.
  2. AWS CloudFormation konsolundan yığını silin.

Sonuç

Bu gönderide, kaynak sistemlerin AWS ile değişiklik verisi yakalama yeteneği sağlayamadığı durumlarda S2 Delta Lake'te yarı yapılandırılmış bir veri kaynağı için değişen verilerin nasıl tanımlanacağını ve geçmiş değişikliklerin (SCD Tip 3) nasıl korunacağını gösterdik. Zamk. Aşağı akış uygulamalarının veri gölünde yakalanan CDC verilerinden ek özelleştirmeler oluşturmasını sağlamak için bu çözümü daha da genişletebilirsiniz.

Ek olarak, bu çözümü kullanarak bir orkestrasyonun parçası olarak genişletebilirsiniz. AWS Basamak İşlevleri veya kuruluşunuzun aşina olduğu diğer yaygın olarak kullanılan düzenleyiciler. Ayrıca, uygun olan yerlerde bölümler ekleyerek bu çözümü genişletebilirsiniz. Delta tablosunu şu şekilde de koruyabilirsiniz: sıkıştırma küçük dosyalar.


yazarlar hakkında

Nith Govindasivan, AWS Profesyonel Hizmetlerine sahip bir Veri Gölü Mimarıdır ve burada Büyük Veri ve Analitik çözümlerini uygulayarak müşterilerin modern veri mimarisi yolculuklarına katılmalarına yardımcı olur. İş dışında, Nith hevesli bir Kriket hayranıdır, boş zamanlarında neredeyse tüm kriketleri izler ve uzun yolculuklardan ve uluslararası seyahatlerden hoşlanır.

Vijay Velpula AWS Profesyonel Hizmetlerine sahip bir Veri Mimarıdır. Müşterilerin Büyük Veri ve Analitik Çözümlerini uygulamalarına yardımcı olur. İş dışında ailesiyle vakit geçirmekten, seyahat etmekten, yürüyüş yapmaktan ve bisiklete binmekten hoşlanır.

Sriharsh Adari Amazon Web Services'de (AWS) Kıdemli Çözüm Mimarıdır ve müşterilerin AWS'de yenilikçi çözümler geliştirmek için iş sonuçlarından geriye doğru çalışmasına yardımcı olur. Yıllar boyunca, endüstri sektörlerinde veri platformu dönüşümlerinde birden fazla müşteriye yardımcı oldu. Temel uzmanlık alanları arasında Teknoloji Stratejisi, Veri Analitiği ve Veri Bilimi bulunmaktadır. Boş zamanlarında spor yapmaktan, aşırı derecede TV şovları izlemekten ve Tabla oynamaktan hoşlanır.

spot_img

En Son İstihbarat

spot_img