ゼファーネットのロゴ

知事:Pythonを使用して最初のETLパイプラインを作成およびスケジュールする方法

日付:

ゴールドブログ知事:Pythonを使用して最初のETLパイプラインを作成およびスケジュールする方法

ローカルとクラウドの両方でワークフロー管理システムが簡単になりました。


By DarioRadečić、NEOSのコンサルタント



による写真 ヘレナ・ロペス から Pexels

 

Prefectは、単純な前提に基づいたPythonベースのワークフロー管理システムです。 —コードはおそらく機能しますが、機能しない場合もあります (source)。 すべてが期待どおりに機能する場合、ワークフローシステムについて考える人は誰もいません。 しかし、事態が悪化すると、Prefectはコードが正常に失敗することを保証します。

ワークフロー管理システムとして、Prefectを使用すると、ロギング、再試行、動的マッピング、キャッシング、障害通知などをデータパイプラインに簡単に追加できます。 必要のないときは見えません—すべてが期待どおりに機能するとき、そして必要なときは見えます。 保険のようなもの。

Pythonユーザーが利用できるワークフロー管理システムはPrefectだけではありませんが、間違いなく最も熟練したシステムです。 Apache Airflowなどの代替手段は通常はうまく機能しますが、大きなプロジェクトで作業する場合は多くの頭痛の種になります。 PrefectとAirflowの詳細な比較を読むことができます こちら.

この記事では、タスク、フロー、パラメーター、障害、スケジュールなどのライブラリーの基本について説明し、ローカルとクラウドの両方で環境をセットアップする方法についても説明します。 使用します 土星の雲 その部分については、構成が簡単になるためです。 これはデータサイエンティストによって作成されたクラウドプラットフォームであるため、手間のかかる作業のほとんどはあなたのために行われます。

Saturn Cloudは、汗をかくことなくPrefectワークフローを処理できます。 また、ダッシュボードから分散型機械学習、ディープラーニング、GPUトレーニングまで、あらゆるものに対応する最先端のソリューションです。

今日は、次の方法を学びます。

  • Prefectをローカルにインストールする
  • Pythonで簡単なETLパイプラインを作成する
  • Prefectを使用して、タスク、フロー、パラメーター、スケジュールを宣言し、障害を処理します
  • SaturnCloudでPrefectを実行する

Prefectをローカルにインストールする方法

 
 
Prefectライブラリを仮想環境内にインストールします。 次のコマンドは、という名前の環境を作成してアクティブ化します prefect_env Python 3.8に基づくAnaconda経由:

conda create — name prefect_env python=3.8
conda activate prefect_env

入力する必要があります y Anacondaに続行するように指示するために数回ですが、それはすべてのインストールに当てはまります。 ライブラリに関しては、 パンダ データ操作の場合、 つながり申請 データをダウンロードするため、そしてもちろん、 知事 ワークフロー管理の場合:

conda install requests pandas
conda install -c conda-forge prefect

これで、Pythonコードの記述を開始するために必要なすべてのものが揃いました。 次にそれをしましょう。

Pythonを使用したETLパイプラインの作成

 
 
今日は、Prefectを使用して、比較的単純なタスクを完了します—ETLパイプラインを実行します。 このパイプラインは、ダミーAPIからデータをダウンロードして変換し、CSVとして保存します。 The JSONプレースホルダー WebサイトはダミーAPIとして機能します。 特に、XNUMX人のユーザーの偽のデータが含まれています。

画像1—偽のユーザーデータ(出典: https://jsonplaceholder.typicode.com/users) (作者による画像)


 

Pythonファイルを作成することから始めましょう—私は私の名前を付けました 01_etl_pipeline.py。 また、抽出および変換されたデータが保存されるフォルダーがあることを確認してください。 私はそれを呼んだ data、Pythonスクリプトがある場所にあります。

ETLパイプラインには、データの抽出、変換、読み込みのXNUMXつの機能を実装する必要があります。 この場合、これらの関数は次のように動作します。

  • extract(url: str) -> dict —にGETリクエストを行います url パラメータ。 一部のデータが返されたかどうかをテストします。その場合、データは辞書として返されます。 それ以外の場合は、例外が発生します。
  • transform(data: dict) -> pd.DataFrame — ID、名前、ユーザー名、電子メール、住所、電話番号、会社などの特定の属性のみが保持されるようにデータを変換します。 変換されたデータをPandasDataFrameとして返します。
  • load(data: pd.DataFrame, path: str) -> None —以前に変換されたものを保存します data でCSVファイルに path。 また、ファイル名にタイムスタンプを追加して、ファイルが上書きされないようにします。

関数宣言後、Pythonスクリプトの実行時にXNUMXつすべてが呼び出されます。 完全なコードスニペットは次のとおりです。

これで、ターミナルから次のコマンドを実行してスクリプトを実行できます。

python 01_etl_pipeline.py

すべてが正しく実行された場合、出力は表示されないはずです。 ただし、CSVファイルは data フォルダ(ファイルをXNUMX回実行しました):



画像2— ETLパイプラインをXNUMX回実行した後のデータフォルダー内のCSVファイルのリスト(作成者による画像)

 

ご覧のとおり、ETLパイプラインはエラーなしで実行および終了します。 しかし、パイプラインをスケジュールどおりに実行したい場合はどうでしょうか。 そこが 知事 に入っています。

知事の基本を探る

 
 
このセクションでは、の基本を学びます 知事 タスク、フロー、パラメーター、スケジュールなど。

知事のタスク

 
 
最も単純なタスクから始めましょう。 これは基本的にワークフローのXNUMXつのステップです。 フォローするには、という新しいPythonファイルを作成します 02_task_conversion.py。 からすべてをコピーします 01_etl_pipeline.py、そしてあなたは行く準備ができています。

Python関数をPrefectTaskに変換するには、最初に必要なインポートを行う必要があります— from prefect import task、そして興味のある機能を飾ります。 次に例を示します。

@task
def my_function(): pass

それがあなたがしなければならないすべてです! ETLパイプラインの更新バージョンは次のとおりです。

それを実行して、何が起こるか見てみましょう:

python 02_task_conversion.py



画像3— Prefectを使用した関数からタスクへの変換(作成者による画像)

 

何かがおかしいようです。 それは 知事の仕事 なしで実行することはできません 知事の流れ。 次に実装しましょう。

知事の流れ

 
 
からすべてをコピーします 02_task_conversion.py 新しいファイルに— 03_flow.py。 インポートする必要があります Flow   prefect 宣言する前にライブラリ。

フローを宣言するために、別のPython関数を記述します— prefect_flow()。 パラメータを受け入れず、何も装飾されません。 関数内では、Pythonのコンテキストマネージャーを使用してフローを作成します。 フローには、以前は if __name__ == ‘__main__” コードブロック。

上記のブロックでは、対応するフローを実行する必要があります run() 機能。

このファイルの完全なコードは次のとおりです。

それを実行して、何が起こるか見てみましょう:

python 03_flow.py

画像4—初めてPrefect Flowを実行する(作成者による画像)


 

今それは何かです! ETLパイプラインが実行されるだけでなく、すべてのタスクがいつ開始および終了したかに関する詳細情報も取得します。 ファイルをXNUMX回実行したので、XNUMXつの新しいCSVファイルをに保存する必要があります data フォルダ。 それが事実かどうかを確認しましょう:

画像5— Prefect Flowによって生成されたCSVファイル(作成者による画像)


 

これが、Prefectを使用して単純なETLパイプラインを実行する方法です。 純粋なPython実装に比べてまだ多くの利点はありませんが、すぐに変更します。

知事のパラメータ

 
 
パラメータ値をハードコーディングすることは決して良い考えではありません。 そこが 知事のパラメータ 入ってください。開始するには、からすべてをコピーします 03_flow.py 新しいファイルに— 04_parameters.py。 をインポートする必要があります Parameter からのクラス prefect パッケージ。

このクラスは、フローコンテキストマネージャー内で使用できます。 役立つと思われる引数は次のとおりです。

  • name —パラメータの名前。後でフローを実行するときに使用されます。
  • required —ブール値。フローの実行にパラメーターが必要かどうかを指定します。
  • default —パラメータのデフォルト値を指定します。

API URLのパラメータを宣言します— param_url = Parameter(name=’p_url’, required=True).

パラメータに値を割り当てるには、を指定する必要があります parameters の引数としての辞書 run() 関数。 パラメータ名と値は、キーと値のペアとして記述する必要があります。

このファイルの完全なコードは次のとおりです。

ファイルを実行して、何が起こるか見てみましょう。

python 04_parameters.py

画像6—パラメータを含むPrefect Flowの実行(作成者による画像)


 

ファイルをXNUMX回実行したので、XNUMXつの新しいCSVファイルが data フォルダ。 それが本当かどうかを確認しましょう:



画像7—パラメータを含むPrefect Flowによって生成されたCSVファイル(作成者による画像)

 

そして、あなたはそれを持っています—一箇所でのパラメータ値の指定。 これにより、将来の変更や、より複雑なワークフローの管理が容易になります。

次に、特に便利なPrefectの機能であるスケジュールについて説明します。

知事のスケジュール

 
 
今日は、タスクをスケジュールするXNUMXつの方法を検討します— インターバルスケジュール & cronスケジュール。 CronはUnixでタスクをスケジュールするためのよく知られた方法であるため、XNUMX番目の方法はおなじみのように聞こえるかもしれません。

まずは インターバルスケジューラ。 まず、からすべてをコピーします 04_intervals.py 〜へ 05_interval_scheduler.py。 インポートする必要があります IntervalScheduler から prefect.schedules.

次に、インポートされたクラスのインスタンスを、 prefect_flow() 関数宣言を行い、XNUMX秒ごとに実行するように指示します。 これは、の値を設定することで実行できます。 interval パラメータに一致する最初のデバイスのリモートコントロール URL を返します。

スケジューラをワークフローに接続するには、の値を指定する必要があります schedule 初期化時のパラメータ Flow コンテキストマネージャーを使用したクラス。

スクリプトファイル全体は次のようになります。

ファイルを実行して、何が起こるか見てみましょう。

python 05_interval_scheduler.py



画像8—インターバルスケジュールの使用(作成者による画像)

 

ご覧のとおり、ETLパイプライン全体がXNUMX回実行されました。 知事は、次の実行が発生したときにターミナルに報告します。

それでは、 cronスケジューラ。 からすべてをコピーします 05_interval_scheduler.py 〜へ 06_cron_scheduler.py。 今回はインポートします CronSchedule   IntervalSchedule.

クラスの初期化時に、cronパターンを指定します cron パラメータ。 XNUMXつの星の記号は、ワークフローがXNUMX分ごとに実行されることを保証します。 これは、Cronで可能な限り低い間隔です。

ファイルの残りの部分は同じままです。 コードは次のとおりです。

ファイルを実行してみましょう:

Python 06_cron_scheduler.py




画像9— Cronスケジュールの使用(作成者による画像)

 

ご覧のとおり、ETLパイプラインは、Cronパターンで指定されているように、XNUMX分ごとにXNUMX回実行されました。 このセクションの最後の部分では、障害を処理する方法を探り、障害に常に備える必要がある理由を説明します。

知事の失敗

 
 
遅かれ早かれ、ワークフローで予期しないエラーが発生します。 Prefectは、タスクの実行を再試行するための途方もなく簡単な方法を提供します。 まず、からすべてをコピーします 04_parameters.py 新しいファイルに— 07_failures.py.

  extract() さまざまなネットワーク上の理由で機能が失敗する可能性があります。 たとえば、APIは現在利用できませんが、数秒で利用できるようになる可能性があります。 これらのことは実稼働環境で発生し、アプリケーションを完全にクラッシュさせるべきではありません。

不要なクラッシュを回避するために、 task デコレータ少し。 さまざまなパラメータを受け入れることができ、今日は max_retries & retry_delay。 どちらも一目瞭然なので、これ以上の説明はしません。

唯一の問題は、ワークフローがそのまま失敗しないことです。 ただし、存在しないURLをパラメータ値として内部に配置すると、 flow.run()。 コードは次のとおりです。

ファイルを実行してみましょう:

python 07_failures.py



画像10— Prefectによる障害の防止(作成者による画像)

 

タスクは失敗しましたが、ワークフローはクラッシュしませんでした。 もちろん、XNUMX回再試行するとクラッシュしますが、パラメータの指定はいつでも変更できます。

これで、Prefectをローカルで操作できます。 次に、コードをクラウドに移動して、変更点を調べてみましょう。

SaturnCloudでPrefectを実行する

 
 
すぐに手を汚しましょう。 開始するには、無料版のにサインアップしてください プレフェクトクラウド アカウント。 登録プロセスは簡単で、それ以上の説明は必要ありません。 登録したら、プロジェクトを作成します。 私の名前を付けました SaturnCloudDemo.

に行く前に 土星の雲、XNUMXつを接続するAPIキーをPrefectで作成する必要があります。 あなたは見つけるでしょう APIキー 設定の下のオプション。 ご覧のとおり、私は私の名前を付けました SaturnDemoKey:



画像11— Prefect Cloud APIキーの作成(作成者による画像)

 

これで必要なものがすべて揃ったので、 土星の雲 無料のアカウントを作成します。 ダッシュボードに表示されると、プロジェクトを作成するための複数のオプションが表示されます。 を選択 知事 以下に示すようなオプション:



画像12— Saturn CloudでのPrefectプロジェクトの作成(作成者による画像)

 

Saturn Cloudは、すべての面倒な作業を自動的に実行します。数分後、ボタンをクリックしてJupyterLabインスタンスを開くことができます。



画像13— Saturn CloudでJupyterLabを開く(作成者による画像)

 

XNUMXつのノートブックにアクセスできます。XNUMXつ目は、SaturnCloudでPrefectを使用する簡単なデモンストレーションを示しています。 外観は次のとおりです。



画像14— SaturnCloudのPrefectCloudノートブック(作成者による画像)

 

ノートブックを機能させるには、XNUMXつだけ変更する必要があります。 まず、プロジェクト名をPrefectCloudのプロジェクト名に変更します。 次に、交換します <your_api_key_here> 数分前に生成されたAPIキーを使用します。 すべてを正しく実行すると、次のメッセージが表示されます。



画像15— Saturn Cloudにログイン成功したメッセージ(作成者による画像)

 

テストするには、ノートブックで後続するすべてのセルを実行します。 次に、Prefect Cloudダッシュボードに移動して、プロジェクトを開きます。 数分前のように空にはなりません。



画像16—成功したPrefectタスクのスケジューリング(作成者による画像)

 

そして、それはあなたがしなければならないすべてです! ETLパイプラインをコピーして貼り付け、機能することを確認してください。 そこでSaturnCloudが輝いています。面倒な作業はすべて自動的に構成されるため、最小限の変更でローカルマシンからコードをコピーして貼り付けることができます。

次のセクションでまとめましょう。

最終的な考え

 
 
これで、Prefectの基本がローカルとクラウドの両方で説明されました。 この記事を読む前にトピックについて何も知らなくても、本番アプリケーションのワークフロー管理システムの価値を理解していただければ幸いです。

ロギングとSlack通知の構成など、より高度なガイドについては、以下を参照してください。 公式ドキュメント。 提供されている例は、始めるのに十分すぎるほどです。

フォロー

 
 

  • 私に従ってください M このようなより多くの物語のために
  • 私にサインアップ ニュースレター
  • 接続する LinkedIn

 
バイオ: DarioRadečić NEOSのコンサルタントです。

元の。 許可を得て転載。

関連する


PlatoAi。 Web3の再考。 増幅されたデータインテリジェンス。
アクセスするには、ここをクリックしてください。

ソース:https://www.kdnuggets.com/2021/08/prefect-write-schedule-etl-pipeline-python.html

スポット画像

最新のインテリジェンス

スポット画像