ゼファーネットのロゴ

ApacheAirflowを使用した優れたETLプラクティス

日付:

この記事は、の一部として公開されました データサイエンスブログ。

Apache エアフロー ETL

ETLの概要

ETLは、XNUMX段階のデータ統合の一種です。抽出、変換、読み込みは処理であり、複数のソースからのデータを結合するために使用されます。 ビッグデータの構築によく使用されます。 このプロセスでは、データはソースシステムからプル(抽出)され、分析可能な形式に移行され、ウェアハウスまたは他のシステムに保存されます。 抽出、ロード、変換(ELT)は、関連はあるものの、パフォーマンスを向上させるためにデータベースに処理をプッシュするように設計された代替アプローチです。

このガイドでは、ApacheAirflowプラットフォームを介して実装されたデータストリームを使用したETL実装のグッドプラクティスについて説明します。

  1. ETLとは何ですか?
  2. ETLの履歴
  3. いくつかのETLの利点
  4. ApacheAirflow
  5. ApacheAirflowをインストールする方法
  6. エアフロー構成
  7. エアフローDAG
  8. エアフローラン
  9. エアフローオルタナティブ
  10. まとめ
  11. 参考文献

 

ETLとは何ですか? 

データエンジニアリングから、パイプラインプロセスとしてETL(抽出、変換、読み込み)を理解できます。 ビッグデータの作成に先行する一連のプロセスとして。 ELTをツールとともに使用して、データベース、スプレッドシート、ビデオファイル、オーディオなどのさまざまなデータソースを統合します。 このプロセス中に、データはデータソースから抽出され、分析可能な形式に変換され、機械学習ツールによる将来のモデリングのためにビッグデータシステムに保存されます。

複数のソースからのデータを組み合わせるために使用されます。 これは通常、データウェアハウスを構築するために使用されます。 このプロセスでは、データはソースシステムからプル(抽出)され、分析可能な形式に変換されて、倉庫または他のシステムに保存されます。 抽出、ロード、変換(ELT)は、パフォーマンスを向上させるためにデータベースに処理をプッシュするように設計された、関連はあるものの、代替のアプローチです。

 

ETLの履歴

ETLの使用は、企業がさまざまな種類のビジネス情報を格納するために複数のリポジトリまたはデータベースを使用し始めた70年代に始まりました。 データベース全体に広がるデータを統合する必要性は急速に高まっています。 ETLの採用は、データウェアハウスまたはビッグデータにロードする前に、さまざまなソースからデータを抽出して移動するプロセスになりました。

1990年代初頭、データウェアハウスは爆発的に増加しました。 別のデータベースモデルとして、メインフレームコンピューター、ミニコンピューター、パーソナルコンピューター、スプレッドシートなど、複数のシステムからのデータへの統合アクセスを提供しました。 ただし、部門が異なれば、倉庫ごとに異なるETLツールを使用することがよくあります。 それを合併や買収に加えると、多くの企業は統合されていない個別のETLソリューションになってしまいます。

現在、データへのアクセスは飛躍的に増加しており、フォーマット、ソース、およびデータシステムの数はほぼ無限です。 ETLは今日、企業にとって重要なプロセスです。

 

いくつかのETLの利点

複数の企業がETLプロセスを採用して、最良のビジネス上の意思決定を促進するデータを取得しました。 今日でも、複数のシステムやソースからのデータを統合することは、ビッグデータのフィードです。

ビッグデータとともに使用すると、ETLは企業に完全な履歴コンテキストを提供します。 統合ビューの提供:

  • ETLを使用すると、ビジネスユーザーは、イニシアチブに関連するデータを簡単に分析してレポートできます。
  • ETLは、ストリーミングデータなどの新たな統合要件をサポートするように進化しました。
  • 組織は、データをマージし、精度を維持し、保存されたデータのコントローラーを提供し、レポートを作成し、分析を実行するために、ETLとELTの両方を必要とします。


Apache Airflow:ワークフロー管理プラットフォーム

Apache Airflow:ワークフロー管理プラットフォーム| ETL

https://airflow.apache.org

Airflowは、ワークフローをプログラムで作成、スケジュール、および監視するためにコミュニティによって作成されたプラットフォームです。

気流の原理:

  • スケーラブル
  • ダイナミック
  • 広範
  • エレガント

エアフロー機能:

  • 純粋なPython
  • 便利なUI
  • 堅牢な統合
  • オープンソース

いくつかの統合:

インテグレーション

https://aws.amazon.com/pt/blogs/media/managing-hybrid-video-processing-workflows-with-apache-airflow/

ApacheAirflowをインストールする方法 

エアフローのインストールとセットアップ

1.サンプルディレクトリ内にairflowディレクトリを作成します。

2. airflowディレクトリに移動し、dagsディレクトリを作成します。

3.イメージをダウンロードし、DockerでApacheAirflowオブジェクトを実行します

3位。 Windowsを使用している場合は、シェルターミナルを開き、次のコマンドを実行します。

 docker run -d -p 8080:8080 -v "$ PWD / airflow / dags:/ opt / airflow / dags /" --entrypoint = / bin / bash --name airflow apache / airflow:2.1.1-python3。 8 -c'(airflow db init && airflow users create --username admin --password bootcamp --firstname Andre --lastname Lastname --role Admin --email [メール保護]); エアフローウェブサーバーとエアフロースケジューラ

3b。 環境に必要なライブラリをインストールします。

以下のコマンドを実行して、エアフローコンテナに接続します。

 docker container exec -it airflowbash

次に、ライブラリをインストールします。

 pip install pymysql xlrd openpyxl minio

3c。 エラーがない場合は、Apache Airflowユーザーインターフェイスのアドレスにアクセスします(*ターミナルを開く前に約5分待ちます)。

 https://localhost:8080

エアフロー構成

次の変数を作成します。

data_lake_server = 172.17.0.4:9001 data_lake_login = minioadmin data_lake_password = minioadmin database_server = 172.17.0.2 database_login = root database_password = bootcamp database_name = employees

エアフローログイン

エアフローログイン| ETL

エアフロー–デモ

気流変数の設定

エアフロー構成

エアフロー構成| ETL
Apacheエアフロー変数の設定

エアフロー–デモ

デモ| ETL

エアフロー–デモ

エアフローDAG


DAG
A DAG (有向非巡回グラフ)は、エアフローのコアコンセプトであり、収集します タスク 一緒に、それらがどのように実行されるべきかを言うために依存関係と関係で組織化されます。

DAG

エアフロー–デモ

DAGPythonコード例

from datetime import datetime、date、timedelta import pandas as pd from io import BytesIO from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.bash import BashOperator from airflow.models import Variable from minio import Minio from sqlalchemy.engine import create_engine DEFAULT_ARGS = {'owner':'Airflow'、'depends_on_past':False、'start_date':datetime(2021、1、13)、} dag = DAG('etl_department_salary_left_att'、default_args = DEFAULT_ARGS、schedule_interval = "@ once ")data_lake_server = Variable.get(" data_lake_server ")data_lake_login = Variable.get(" data_lake_login ")data_lake_password = Variable.get(" data_lake_password ")database_server = Variable.get(" database_server ")database_login = Variable.get(" database_login ")database_password = Variable.get(" database_password ")database_name = Variable.get(" database_name ")url_connection =" mysql + pymysql:// {}:{} @ {} / {} "。format(str(database_login) 、str(database_password)、str(database_se rver)、str(database_name))engine = create_engine(url_connection)client = Minio(data_lake_server、access_key = data_lake_login、secret_key = data_lake_password、secure = False)def extract():#クエリパラコンサルタントosdados。 query = "" "SELECT emp.department as department、sal.salary as salary、emp.left FROM employees emp INNER JOIN salaries sal ON emp.emp_no = sal.emp_id;" "" df_ = pd.read_sql_query(query、engine) #persisteosarquivosnaáreadeStaging。 df_.to_csv( "/ tmp / department_salary_left.csv"、index = False)def load():#carrega os dadosapartirdaáreadestaging。 df_ = pd.read_csv( "/ tmp / department_salary_left.csv")#converte os dados para oformatoparquet。 df_.to_parquet( "/tmp/department_salary_left.parquet"、index = False)#carrega os dados para oDataLake。 client.fput_object( "processing"、 "department_salary_left.parquet"、 "/tmp/department_salary_left.parquet")extract_task = PythonOperator(task_id ='extract_data_from_database'、provide_context = True、python_callable = extract、dag = dag)load_task = PythonOperator( ='load_file_to_data_lake'、provide_context = True、python_callable = load、dag = dag)clean_task = BashOperator(task_id = "clean_files_on_staging"、bash_command = "rm -f /tmp/*.csv;rm -f /tmp/*.json; rm -f /tmp/*.parquet; "、dag = dag)extract_task >> load_task >> clean_task

 

エアフローラン

 

エアフローラン

エアフロー–デモ

デモ

エアフロー–デモ

エアフローの代替– ステッチ

ほとんどの協会は、社内の情報収集からERPソフトウェアまで、地域の組み合わせでデータを処理しています。 資産と用事の全体像を把握するために、彼らはその多数のソースからデータ分散フォーカスまたはデータレイクにデータを移動し、それに対して評価を実行します。 いずれにせよ、彼らはデータパイプラインを形成して認識し続けることを好まないでしょう。

幸いなことに、すべてを社内でコーディングすることは重要ではありません。 これは、そのようなXNUMXつの楽器の相関関係です。

ApacheAirflowについて

Apache Airflowはオープンソースプロジェクトであり、設計者は作業プロセスを整理して、情報を引き出し、変更し、負担をかけ、保存することができます。

コネクタ:データソースと反対意見

デジタルテクノロジーエコシステムでは、いくつかのデバイスに非常に多様なデータとオブジェクトが含まれ、オブジェクトストレージに保存されます。オブジェクトストレージはデータレイクとして定義でき、これらのセットがビッグデータを構成します。

 

まとめ

我々は使用することができます ApacheAirflow、ビッグデータストリーミングを管理するためのワークフローツールとして、さまざまな環境と統合する機能を備えています。 使って ETLのベストプラクティス、 洞察とビジネス予測を生成する機械学習アルゴリズムの開発に集中できるようになります。

参考文献

  • https://hdsr.mitpress.mit.edu/pub/da99kl2q/release/2
  • https://hdsr.mitpress.mit.edu/pub/4vlrf0x2/release/1
  • https://link.springer.com/article/10.1057/jma.2015.5
  • https://link.springer.com/article/10.1057/jma.2015.5
  • https://airflow.apache.org/docs/
  • https://towardsdatascience.com/python-etl-tools-best-8-options-5ef731e70b49

著者リファレンス:

  1. githubの
  2. Twitter
  3. M

この記事に示されているメディアはAnalyticsVidhyaが所有しておらず、作成者の裁量で使用されています

PlatoAi。 Web3の再考。 増幅されたデータインテリジェンス。
アクセスするには、ここをクリックしてください。

ソース:https://www.analyticsvidhya.com/blog/2021/11/good-etl-practices-with-apache-airflow/

スポット画像

最新のインテリジェンス

スポット画像