ゼファーネットのロゴ

SageMaker Feature Store と Apache Iceberg オフライン ストア圧縮を使用して ML 開発を高速化する

日付:

今日、企業は機能ストアを確立して、ビジネス ユニットやデータ サイエンス チーム全体で ML 開発をスケーリングするための中央リポジトリを提供しています。 特徴データのサイズと複雑さが増すにつれて、データ サイエンティストはこれらの特徴ストアを効率的にクエリして、実験、モデル トレーニング、およびバッチ スコアリング用のデータセットを抽出できる必要があります。

Amazon SageMaker フィーチャーストア は、データ サイエンティストと ML エンジニアが、トレーニングと予測のワークフローで使用される精選されたデータを安全に保存、検出、共有できるようにする、専用の機能管理ソリューションです。 SageMaker Feature Store がサポートされるようになりました アパッチ氷山 機能を格納するためのテーブル形式として。 これにより、ML トレーニング データセットを抽出する際のクエリ パフォーマンスが向上し、Iceberg テーブルの圧縮が利用されるため、モデルの開発が加速されます。 機能グループの設計とその規模に応じて、この新しい機能を使用することで、トレーニング クエリのパフォーマンスが 10 倍から 100 倍向上することを体験できます。

この投稿の終わりまでに、Iceberg 形式を使用して機能グループを作成し、Iceberg のテーブル管理手順を使用して実行する方法がわかります。 アマゾンアテナ、およびこれらのタスクが自律的に実行されるようにスケジュールします。 Spark ユーザーの場合は、Spark を使用して同じ手順を実行し、それらを独自の Spark 環境と自動化に組み込む方法も学習します。

SageMaker Feature Store と Apache Iceberg

Amazon SageMaker フィーチャーストア 機能と関連メタデータの一元化されたストアであり、さまざまなプロジェクトや ML モデルに取り組んでいるデータ サイエンティスト チームが機能を簡単に発見して再利用できるようにします。

SageMaker Feature Store は、機能を管理するためのオンライン モードとオフライン モードで構成されています。 オンライン ストアは、低レイテンシのリアルタイム推論のユース ケースに使用されます。 オフライン ストアは、主にバッチ予測とモデル トレーニングに使用されます。 オフライン ストアは追加専用のストアであり、履歴フィーチャ データの保存とアクセスに使用できます。 オフライン ストアを使用すると、ユーザーは、探索とバッチ スコアリングのために機能を保存して提供し、モデル トレーニングのために特定の時点で正しいデータセットを抽出できます。

オフライン ストアのデータは、AWS アカウントの Amazon Simple Storage Service (Amazon S3) バケットに保存されます。 SageMaker Feature Store は、機能グループの作成中に AWS Glue データカタログを自動的に構築します。 顧客は、Spark ランタイムを使用してオフライン ストア データにアクセスし、ML 機能分析と機能エンジニアリングのユース ケースのためにビッグ データ処理を実行することもできます。

テーブル形式は、データ ファイルをテーブルとして抽象化する方法を提供します。 長年にわたり、ACID トランザクション、ガバナンス、およびカタログのユース ケースをサポートするために、多くのテーブル形式が登場しました。 アパッチ氷山 非常に大規模な分析データセット用のオープン テーブル形式です。 ファイルの大規模なコレクションをテーブルとして管理し、レコード レベルの挿入、更新、削除、タイム トラベル クエリなどの最新の分析データ レイク操作をサポートします。 Iceberg は、個々のデータ ファイルをディレクトリではなくテーブルで追跡します。 これにより、ライターはその場でデータ ファイルを作成し (ファイルは移動または変更されません)、明示的なコミットでのみファイルをテーブルに追加できます。 テーブルの状態は、メタデータ ファイルで維持されます。 テーブルの状態に対するすべての変更により、古いメタデータをアトミックに置き換える新しいメタデータ ファイル バージョンが作成されます。 テーブル メタデータ ファイルは、テーブル スキーマ、パーティション構成、およびその他のプロパティを追跡します。

Iceberg は AWS サービスと統合されています。 たとえば、 AWSグルー Iceberg テーブルのメタストアとしての Data Catalog、および アテナ データに Apache Parquet 形式を使用し、メタストアに AWS Glue カタログを使用する Apache Iceberg テーブルの読み取り、タイムトラベル、書き込み、および DDL クエリをサポートします。

SageMaker Feature Store を使用すると、デフォルトの標準 Glue 形式の代わりに、Iceberg テーブル形式で機能グループを作成できるようになりました。 これにより、お客様は新しいテーブル形式を活用して、Iceberg のファイル圧縮およびデータ プルーニング機能を使用し、ユース ケースと最適化の要件を満たすことができます。 また、Iceberg を使用すると、顧客は削除、タイムトラベル クエリ、高同時実行トランザクション、および高性能クエリを実行できます。

テーブル形式としての Iceberg と圧縮などのテーブル メンテナンス オペレーションを組み合わせることで、お客様は大規模なオフライン機能グループを操作する際のクエリ パフォーマンスが向上し、ML トレーニング データセットをより迅速に構築できるようになります。

次の図は、Iceberg をテーブル形式として使用したオフライン ストアの構造を示しています。

次のセクションでは、Iceberg 形式を使用して機能グループを作成する方法、AWS Athena を使用して Iceberg のテーブル管理手順を実行する方法、および AWS のサービスを使用してこれらのタスクをオンデマンドまたはスケジュールに従って実行するようにスケジュールする方法を学習します。 Spark ユーザーの場合は、Spark を使用して同じ手順を実行する方法も学習します。

段階的な手順については、 サンプルノート、これは GitHub にあります。 この投稿では、最も重要な部分を強調します。

Iceberg テーブル形式を使用した機能グループの作成

新しい機能グループを作成するときは、最初に表形式として Iceberg を選択する必要があります。 新しいオプション パラメータ TableFormat Amazon SageMaker Studio を使用してインタラクティブに設定するか、API または SDK を使用してコードを介して設定できます。 このパラメータは値を受け入れます ICEBERG or GLUE (現在の AWS Glue 形式の場合)。 次のコード スニペットは、Iceberg 形式と FeatureGroup.create SageMaker SDK の API。

orders_feature_group_iceberg.create(
s3_uri=f"s3://{s3_bucket_name}/{prefix}",
record_identifier_name=record_identifier_feature_name,
event_time_feature_name=event_time_feature_name,
role_arn=role,
enable_online_store=True,
table_format=TableFormatEnum.ICEBERG
)

テーブルが作成され、AWS Glue データ カタログに自動的に登録されます。

今で orders_feature_group_iceberg が作成されたら、選択した取り込みパイプラインを使用して機能を取り込むことができます。 この例では、 FeatureGroup.ingest() Pandas DataFrame からレコードを取り込む API。 また、 FeatureGroup().put_record 個々のレコードを取り込むか、ストリーミング ソースを処理するための API。 Spark ユーザーは、私たちを使用して Spark データフレームを取り込むこともできます Sparkコネクタ.

orders_fg = FeatureGroup(name=orders_feature_group_iceberg_name,
sagemaker_session=feature_store_session)
orders_fg.ingest(data_frame=order_data, wait=True)

オフライン フィーチャ ストアに対してクエリを実行することで、レコードが正常に取り込まれたことを確認できます。 S3 の場所に移動して、新しいフォルダー構造を確認することもできます。

Iceberg テーブル管理手順の実行

アマゾンアテナ は、Iceberg 管理手順をネイティブにサポートするサーバーレス SQL クエリ エンジンです。 このセクションでは、Athena を使用して、作成したオフライン機能グループを手動で圧縮します。 Athena エンジン バージョン 3 を使用する必要があることに注意してください。このためには、新しいワークグループを作成するか、既存のワークグループを構成して、推奨される Athena エンジン バージョン 3 を選択します。Athena エンジン バージョンの変更に関する詳細と手順については、次を参照してください。 Athena エンジンのバージョンの変更.

データが Iceberg テーブルに蓄積されると、追加のファイルを開くために必要な処理時間が増加するため、クエリの効率が徐々に低下する可能性があります。 圧縮により、テーブルの内容を変更することなく、テーブルの構造レイアウトが最適化されます。

圧縮を実行するには、 OPTIMIZE table REWRITE DATA Athena のコンパクション テーブル メンテナンス コマンド。 次の構文は、Iceberg テーブル形式を使用して格納されたフィーチャ グループのデータ レイアウトを最適化する方法を示しています。 の sagemaker_featurestore SageMaker Feature Store データベースの名前を表し、 orders-feature-group-iceberg-post-comp-03-14-05-17-1670076334 は機能グループ テーブル名です。

OPTIMIZE sagemaker_featurestore.orders-feature-group-iceberg-post-comp-03-14-05-17-1670076334 REWRITE DATA USING BIN_PACK

最適化コマンドを実行した後、 VACUUM スナップショットの期限切れを実行し、孤立したファイルを削除する手順。 これらのアクションにより、メタデータのサイズが縮小され、現在のテーブルの状態ではなく、テーブルに指定された保持期間よりも古いファイルが削除されます。

VACUUM sagemaker_featurestore.orders-feature-group-iceberg-post-comp-03-14-05-17-1670076334

テーブルのプロパティは、Athena の ALTER TABLE. これを行う方法の例については、 アテナのドキュメント. バキュームの場合、 vacuum_min_snapshots_to_keep および vacuum_max_snapshot_age_seconds スナップショット プルーニング パラメータの設定に使用できます。

サンプルのフィーチャ グループ テーブルで圧縮を実行した場合のパフォーマンスへの影響を見てみましょう。 テスト目的で、同じ注文機能レコードを XNUMX つの機能グループに取り込みました。 orders-feature-group-iceberg-pre-comp-02-11-03-06-1669979003 および orders-feature-group-iceberg-post-comp-03-14-05-17-1670076334Scikit-Learn で並列化された SageMaker 処理ジョブを使用すると、Amazon S49,908,135 に 3 個のオブジェクトが保存され、合計サイズは 106.5 GiB になります。

クエリを実行して、機能グループで重複や削除されたレコードのない最新のスナップショットを選択します orders-feature-group-iceberg-pre-comp-02-11-03-06-1669979003. 圧縮前は、クエリに 1 時間 27 分かかりました。

次に、圧縮を実行します orders-feature-group-iceberg-post-comp-03-14-05-17-1670076334 Athena OPTIMIZE クエリを使用して、機能グループ テーブルを Amazon S109,851 の 3 オブジェクトと合計サイズ 2.5 GiB に圧縮しました。 圧縮後に同じクエリを実行すると、実行時間は 1 分 13 秒に短縮されました。

Iceberg ファイルの圧縮により、クエリの実行時間が大幅に短縮されました。 同じクエリの場合、実行時間は 1 時間 27 分から 1 分 13 秒に短縮され、71 倍高速になりました。

AWS サービスを使用した Iceberg 圧縮のスケジューリング

このセクションでは、テーブル管理手順を自動化してオフライン フィーチャ ストアを圧縮する方法を学習します。 次の図は、Iceberg テーブル形式で機能グループを作成するためのアーキテクチャと、ファイルの圧縮およびクリーンアップ操作を含む完全に自動化されたテーブル管理ソリューションを示しています。

大まかに言えば、Iceberg テーブル形式を使用してフィーチャ グループを作成し、レコードをオンライン フィーチャ ストアに取り込みます。 特徴値は、オンライン ストアから過去のオフライン ストアに自動的に複製されます。 Athena は、Iceberg 管理手順を実行するために使用されます。 手順をスケジュールするには、 AWSグルー Python シェル スクリプトを使用してジョブを作成し、AWS Glue ジョブ スケジュールを作成します。

AWS Glue ジョブのセットアップ

AWS Glue ジョブを使用して、Iceberg テーブルのメンテナンスオペレーションをスケジュールに従って実行します。 まず、AWS Glue が Amazon Athena、Amazon S3、および CloudWatch にアクセスするためのアクセス許可を持つように、IAM ロールを作成する必要があります。

次に、Iceberg プロシージャを実行するための Python スクリプトを作成する必要があります。 あなたは見つけることができます サンプルスクリプト で。 スクリプトは、boto3 を使用して OPTIMIZE クエリを実行します。

optimize_sql = f"optimize {database}.{table} rewrite data using bin_pack"

スクリプトは、AWS Glue を使用してパラメータ化されています getResolvedOptions(args, options) ジョブの実行時にスクリプトに渡される引数にアクセスできるユーティリティ関数。 この例では、AWS リージョン、機能グループの Iceberg データベースとテーブル、Athena ワークグループ、Athena 出力場所の結果フォルダーをパラメーターとしてジョブに渡すことができるため、このスクリプトを環境で再利用できます。

最後に、実際の AWS Glue ジョブを作成して、スクリプトを AWS Glue のシェルとして実行します。

  • AWS Glue コンソールに移動します。
  • 選択する Jobs > Create New Job AWS Glue Studio の下のタブ。
  • 選択 Python シェル スクリプト エディタ.
  • 選択する 既存のスクリプトをアップロードして編集するに設定します。 OK をクリックします。 創造する.
  •   仕事の詳細 ボタンを使用すると、AWS Glue ジョブを設定できます。 以前に作成した IAM ロールを選択する必要があります。 選択する Pythonの3.9 または利用可能な最新の Python バージョン。
  • 同じタブで、次のような他の多くの構成オプションを定義することもできます。 リトライ回数 or ジョブのタイムアウト。 に 高度なプロパティ、下のスクリーンショットの例に示すように、ジョブパラメーターを追加してスクリプトを実行できます。
  • クリック Save.

日程 タブで、フィーチャ ストアのメンテナンス手順を実行するスケジュールを定義できます。 たとえば、次のスクリーンショットは、6 時間ごとのスケジュールでジョブを実行する方法を示しています。

ジョブの実行を監視して、完了ステータス、期間、開始時間などのランタイム メトリックを理解できます。 AWS Glue ジョブの CloudWatch Logs をチェックして、手順が正常に実行されていることを確認することもできます。

Spark を使用した Iceberg テーブル管理タスクの実行

お客様は、Spark を使用して圧縮ジョブとメンテナンス方法を管理することもできます。 Spark プロシージャの詳細については、 Sparkドキュメント.

最初に、いくつかの共通プロパティを構成する必要があります。

%%configure -f
{ "conf": { "spark.sql.catalog.smfs": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.smfs.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "spark.sql.catalog.smfs.warehouse": "<YOUR_ICEBERG_DATA_S3_LOCATION>", "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.smfs.glue.skip-name-validation": "true" }
}

次のコードは、Spark を介して機能グループを最適化するために使用できます。

spark.sql(f"""CALL smfs.system.rewrite_data_files(table => '{DATABASE}.`{ICEBERG_TABLE}`')""")

その後、次の XNUMX つのテーブル メンテナンス手順を実行して、不要になった古いスナップショットとオーファン ファイルを削除できます。

spark.sql(f"""CALL smfs.system.expire_snapshots(table => '{DATABASE}.`{ICEBERG_TABLE}`', older_than => TIMESTAMP '{one_day_ago}', retain_last => 1)""")
spark.sql(f"""CALL smfs.system.remove_orphan_files(table => '{DATABASE}.`{ICEBERG_TABLE}`')""")

次に、上記の Spark コマンドを Spark 環境に組み込むことができます。 たとえば、希望のスケジュールで、または取り込み後にパイプラインで上記の最適化を実行するジョブを作成できます。

完全なコード例を調べて、自分のアカウントで試してみるには、 GitHubレポ.

まとめ

SageMaker Feature Store は、組織がデータ サイエンス チーム間で ML 開発をスケーリングするのに役立つ、専用の機能管理ソリューションを提供します。 この投稿では、Apache Iceberg をテーブル形式として活用し、コンパクト化などのテーブル メンテナンス操作を活用して、大規模なオフライン機能グループで作業するときにクエリを大幅に高速化し、その結果、トレーニング データセットをより迅速に構築する方法について説明しました。 試してみて、コメントでご意見をお聞かせください。


著者について

アルノーラウアー AWS の公共部門チームのシニア パートナー ソリューション アーキテクトです。 彼は、パートナーと顧客が AWS テクノロジーを使用してビジネスニーズをソリューションに変換する最善の方法を理解できるようにします。 彼は、公共部門、エネルギー、消費財など、さまざまな業界でデジタル トランスフォーメーション プロジェクトの提供と設計に 17 年以上の経験を持っています。 Arnaud は、ML Specialty Certification を含む 12 の AWS 認定を保持しています。

イオアンカタナ は、AW​​S の人工知能および機械学習のスペシャリスト ソリューション アーキテクトです。 彼は、顧客が AWS クラウドで ML ソリューションを開発およびスケーリングするのを支援しています。 Ioan は、主にソフトウェア アーキテクチャ設計とクラウド エンジニアリングで 20 年以上の経験があります。

マークロイ はAWSの主要な機械学習アーキテクトであり、お客様がAI / MLソリューションを設計および構築するのを支援しています。 Markの仕事は、コンピュータービジョン、ディープラーニング、企業全体でのMLのスケーリングに主な関心を持って、幅広いMLユースケースをカバーしています。 彼は、保険、金融サービス、メディアとエンターテインメント、ヘルスケア、公益事業、製造業など、多くの業界の企業を支援してきました。 Markは、ML専門認定を含む25つのAWS認定を保持しています。 AWSに参加する前は、金融サービスでの19年間を含め、XNUMX年以上にわたってアーキテクト、開発者、テクノロジーのリーダーを務めていました。

ブランドン・チャタム SageMaker Feature Store チームのソフトウェアエンジニアです。 彼は、ビッグデータと機械学習を人々の指先にもたらすエレガントなシステムを構築することに情熱を注いでいます。

スポット画像

最新のインテリジェンス

スポット画像