Zephyrnet Logosu

Apache Airflow: Verileri ve E-postayı Dinamik Olarak Nasıl Alırsınız?

Tarih:

Bu makale, Veri Bilimi Blogathon.

Giriş

İş akışı yönetimi araçlarıyla gereksiz işleri otomatikleştirmek, önemli miktarda zaman ve kaynak tasarrufu sağlar. Apache Airflow şu anda iş akışı yönetimi araçlarında pazar lideridir. Airflow açık kaynaklıdır ve çeşitli harici hizmetleri kapsayan birçok operatör, kanca, sensör ve çok daha fazlasıyla önceden paketlenmiş olarak gelir.

Kaynak: Andrew Pons'ın Unsplash'taki fotoğrafı

Airflow, python topluluğu tarafından geliştirilen ve anlam değerlerini analiz etmek ve çıkarmak için çok sayıda veri kaynağının bağlanmasına izin veren bir platformdur. Airflow, DAG'leri kullanarak soyu korur ve verileri basitleştirir/ML mühendislerin işleri, kullanım durumlarını otomatikleştirilmiş iş akışlarına dönüştürmelerine olanak tanır.

Bu gönderi, kullanım durumuna özel eylemler gerçekleştirmek için ekiplere analiz etmek ve e-posta göndermek için çevrimiçi kaynaklardan verilerin nasıl çıkarılacağını göstermeyi amaçlamaktadır.

Önkoşullar

  • Hava akışı ve SMTP kurulumu.
  • python3

gereklilik

Kullanım örneği, veri ekiplerimizin verileri analiz etmek ve veriler etrafında modeller oluşturmak için verileri kaynaktan alması gerektiğini öngörür. Kaynak konumdaki dosyaların ilgili verileri içerip içermediklerini veya boş olup olmadıklarını belirlemek için ayrıştırılması gerekir.

Dosya içeriğinin mevcut olup olmadığına bağlı olarak, dosyaları veri ekiplerimize e-posta ile göndereceğiz.

apache hava akımı

İş Akışı Geliştirme

Dosyaları kaynaklardan okumak, güvenilir ve güvenli bir kimlik doğrulama mekanizması olmadan zor ve karmaşık olabilir. Daha geniş bir kitleye yönelik bu demo ile ilgili olarak, veri çıkarma işlemini daha basit hale getirmek için kamuya açık istatistiksel verileri güvenli bir platformdan çıkaracağız.

Verilerin analiz edilmeden önce referans olması için bir yerde saklanması gerekir. Basitlik ve uygulama kolaylığı için, airflow.cfg günlük yolunu kullanarak verilerimizi kaydetmek üzere bir dosya hedefi oluşturalım.

Apache Airflow, değerleri okumak için python ConfigParser'a benzer bir yapılandırma kitaplığı sağlar. Bu kütüphaneyi kullanarak elementin değer anahtarlarını alabiliriz.

airflow.configuration'dan istekleri içe aktar conf files_dir = conf.get("logging","base_log_folder") def get_file(url: str): resp = request.get(url) ile open(f"{files_dir}/file_name.csv) ","wb") olarak f: f.write(resp.content)

get_files yöntemimiz yerindeyken, bir sonraki adım dosya içeriğini kontrol etmektir. Aldığımız yaklaşım, dosyaları veri ekibine göndermeden önce dosyalarımızın veri içerdiğinden emin olmak için files_dir dosyamızdaki tüm dosyaları açmak, satırları okumak ve uzunluk için doğrulamaktır.

işletim sisteminden listdir'i os.path'den içe aktar isfile, join files = [f için f for listdir(files_dir) if isfile(join(files_dir, f))] def check_all_files(): get_file("https://stats.govt. nz/assets/Yüklemeler/Yıllık-kurumsal-anket/ Yıllık-kurumsal-anket-2021-finansal-yıl-geçici/Download-data/yıllık-kurumsal-anket-2021-finansal-yıl-geçici-boyut-bantları-csv. csv") get_file("https://stats.govt.nz/assets/Uploads/Business-operations-survey/ Business-operations-survey-2021/Download-data/bos2021ModC.csv") get_file("Business-operations- anket/İşletme-operasyonları-survey-2021/ Download-data/bos2021ModC.csv") def is_file_empty(dosya_adı: str): deneyin: dosya olarak open(file_name, 'r+') ile: data = [] dosyadaki satır için: data.append(line.replace('n', '')) if len(data) <= 1: True döndürürse: False döndürür, hariç: 'ERROR: FILE_NOT_FOUND' döndürür

check_all_files yöntemi, herkese açık veri kümelerini indirmek için yinelemeli olarak özel get_files yöntemini çağırır. Check_all_files içinde, indirilen tüm dosyaları tekrar tekrar açarak dosyaların uzunluğunu kontrol eden ve dosya uzunluğuna göre dosyanın durumunu döndüren bir alt yöntemimiz var.

Sonraki adım, check_all_files yöntemimizin dönüş türüne dayalı olarak e-posta uyarısını tetikleyecek bir mekanizma tasarlamaktır. Durum bilgisi için, is_empty_file yöntemi çağrı dönüş türümüzün boole eşdeğerini yakalamak için global değişken sonuçlarını koruyacağız. Şimdi dosyalar üzerinde dolaşacağız ve dönüş türünü sonuç listesine ekleyeceğiz. Dosyalarımızın içeriği olup olmadığını veya durumu günlüğe kaydetmek için boş olup olmadığını kontrol etmek için bu sonuç değişkenini kullanacağız.

sonuçlar = [] dosyalarda dosya için: global sonuçlar sonuçlar.append(is_file_empty(file)) if sonuçlar[0] == Doğru ve sonuçlar[1] == Doğru ve sonuçlar[2] == Doğru : 'GÜNCELLEME YOK' döndürür else: sonuç = [] dizin için, numaralandırmadaki veriler(sonuçlar): if veri == Yanlış: sonuç.append( dosyalar[dizin]) if len(sonuç) == 0: 'ERROR: FILES_NOT_FOUND' döndürür başka: sonuç döndürür

Yukarıdaki işlevi bir PythonOperator kullanarak çağıracağız.

get_files=PythonOperator( task_id='get_files', python_callable=check_all_files )

Şimdi check_all_files koşulundan dönüş durumunu ve BranchPythonOperator airflow mimarisini kullanacağız. check_for_email yöntemi bir görev örneği bekler ve xcom_pull sınıfını kullanarak çalışma zamanı sırasında dosyaları dinamik olarak çeker.

def check_for_email(ti): check_files= ti.xcom_pull(task_ids='get_files',key='check_files') eğer check_files == "GÜNCELLEME YOK": 'no_update' döndürür elif check_files == 'HATA: FILES_NOT_FOUND_'_ bulunamadı: dönüş başka: 'email_alert' döndür

Branch python operatörü, koşullara göre operatörleri tetiklemek ve gerisini atlamak için mükemmel bir uyum olabilir. Operatör, bir task_id döndüren bir python_callable'ı kabul eder ve bu görev_id'ye başvurulur ve yöntemin dallanmasında ana öğe olarak kabul edilir.

check_if_file_is_empty = BranchPythonOperator( task_id="get_and_check_file_contents", python_callable=check_for_email, Provide_context=True )

Başarılı dallanmanın ardından, şube python operatörü aracılığıyla kukla operatöre veya PythonOperator'a gireceğiz. Ardından, e-postaları gerçek dosya içeriğine göre göndereceğiz. Dosyaları bellekten almak için buradaki görev örneğinden yararlanacağız. xcom_pull'dan alınan dosyaları dosyaların durumuyla ekleyerek, aşağıdaki gibi önceden tanımlanmış bir şablonla teslim edilecek bir e-postayı tetikleyebiliriz.

def email_actual_file(ti, **kwargs): check_all_files() pull_files = ti.xcom_pull(task_ids='get_files') fact_files = [] file_status = [] for (content_file, status) zip(pull_files, sonuçlar): if status = = False: fact_files.append(content_file) file_status.append(status) varsa(file_status) == False: email = EmailOperator( task_id="email_alert", to='[e-posta korumalı]', konu="ASP Fiyatlandırması, ASP yaya geçidi ve NOC için Günlük Güncellemede CMS Uyarısı", html_content='''
Merhaba, İşte bir güncelleme. Eki kontrol edin''', #dag = context.get("dag"), files=[*actual_files]) email.execute(context=kwargs)

Fonksiyonlar tanımlanır; şimdi, tüm operatörleri bir DAG'ye eklemek ve bunları yürütülmek üzere değiştirmek, kullanım durumumuzun bir sonraki adımıdır.

Apache Hava Akışı Dağı

Görev ve en iyi uygulama olduğu gibi, varsayılan argümanları tanımlayacağız ve dag içindeki operatörleri bildireceğiz. E-posta operatöründen birden çok dosya almak için render_template_as_native_obj bayrağı True olarak ayarlanmalıdır. Özel e-posta operatörü, bağlam olarak **kwargs bekliyor; çalışma zamanı sırasında **kwargs'ı kabul etmek için Provide_context bayrağını true olarak ayarlamamız gerekiyor.

airflow.operators.email_operator'dan airflow.operators.dummy_operator'dan EmailOperator'u airflow.operators.python'dan içe aktar BranchPythonOperator'ı airflow.models'den içe airflow.operators.python_operator'dan içe aktar Owner":"Jay", "email_on_failure": True, "start_date":"2022, 01, 01"} DAG(dag_id='cms-asp_pricing_pg',default_args=default_agrs, render_template_as_native_obj=Doğru,program_interval='@ ) dag olarak: get_files=PythonOperator( task_id='get_files', python_callable=check_all_files ) check_if_file_is_empty = BranchPythonOperator( task_id="get_and_check_file_contents", python_found_callable=check_con_not_to_date_found_up_to_date_no_de_e_de_tr_tr) task_id='file_not_found' ) email_alert = PythonOperator(task_id = "email_alert", python_callable=email_actual_file, Provide_context=True) get_files >> check_if_file_is _empty >> [no_update,file_not_found, email_alert]

Çıktı

apache hava akımı

Sonuç

Apache Airflow, oradaki en popüler iş akışı yönetim aracıdır. Kuruluşlar, teknoloji yığınlarına hava akışını benimsiyor ve iş akışlarını verimli bir şekilde düzenlemek ve otomatikleştirmek için hava akışını kritik ve kaçınılmaz bir platform olarak ele alıyor.

Gönderi, veri çıkarma elde etmek ve verileri e-posta yoluyla iletmek için Apache Airflow dag'ı nasıl geliştirebileceğimiz konusunda bizi heyecan verici bir yolculuğa çıkardı.

  • Herkese açık veri kümelerini indirmek, ayrıştırmak ve dosya içeriğini doğrulamak için bir mekanizma tasarladık.

  • Kullanım durumuna özel sorunumuzu çözmek için Airflow dal python operatöründen yararlanmak için bir yöntem geliştirdik.

  • Yalnızca veri içeren dosyaları filtrelemek ve e-postayla göndermek için bir yöntem geliştirdik.

  • Sonunda, tüm operatörleri birleştirdik ve bir DAG oluşturduk, python yöntemlerini çağrılabilir olarak geçtik ve görevleri birlikte çalışan bir DAG'ye zincirledik.

Bu makalede gösterilen medya Analytics Vidhya'ya ait değildir ve Yazarın takdirine bağlı olarak kullanılır.

spot_img

En Son İstihbarat

spot_img