ゼファーネットのロゴ

オープンソースのトランザクションテーブル形式に Amazon Athena と Spark SQL を使用する |アマゾン ウェブ サービス

日付:

AWS を活用したデータレイクは、比類のない可用性によってサポートされています。 Amazon シンプル ストレージ サービス (Amazon S3) は、さまざまなデータと分析アプローチを組み合わせるために必要なスケール、俊敏性、柔軟性を処理できます。データ レイクのサイズが増大し、使用方法が成熟するにつれて、データとビジネス イベントの整合性を維持するために多大な労力が費やされる可能性があります。ファイルがトランザクション的に一貫した方法で更新されるようにするために、次のようなオープンソースのトランザクション テーブル形式を使用する顧客が増えています。 アパッチ氷山, アパッチ・フディ, Linux Foundation デルタレイク これは、高圧縮率でデータを保存し、アプリケーションやフレームワークとネイティブにインターフェイスし、Amazon S3 上に構築されたデータレイクでの増分データ処理を簡素化するのに役立ちます。これらの形式により、ACID (アトミック性、整合性、分離性、耐久性) トランザクション、アップサート、削除、および以前はデータ ウェアハウスでのみ利用可能であったタイム トラベルやスナップショットなどの高度な機能が可能になります。各ストレージ形式は、この機能をわずかに異なる方法で実装します。比較については、を参照してください。 AWS のトランザクション データ レイクにオープン テーブル形式を選択する.

2023年には、 AWS が一般提供を発表 Apache Iceberg、Apache Hudi、および Linux Foundation Delta Lake の場合 Apache Spark 用 Amazon Athenaこれにより、別個のコネクタまたは関連する依存関係をインストールしてバージョンを管理する必要がなくなり、これらのフレームワークの使用に必要な構成手順が簡素化されます。

この投稿では、Spark SQL を使用する方法を説明します。 アマゾンアテナ ノートブックを作成し、Iceberg、Hudi、Delta Lake テーブル形式で動作します。 Athena の Spark SQL を使用した、データベースとテーブルの作成、テーブルへのデータの挿入、データのクエリ、Amazon S3 でのテーブルのスナップショットの確認などの一般的な操作を示します。

前提条件

次の前提条件を完了します。

Amazon S3 からサンプルノートブックをダウンロードしてインポートする

手順を進めるには、この投稿で説明したノートブックを次の場所からダウンロードしてください。

ノートブックをダウンロードした後、次の手順に従ってノートブックを Athena Spark 環境にインポートします。 ノートブックをインポートするには のセクション ノートブック ファイルの管理.

特定の「Open Table Format」セクションに移動します

Iceberg テーブル形式に興味がある場合は、次の場所に移動してください。 Apache Iceberg テーブルの操作 のセクションから無料でダウンロードできます。

Hudi テーブル形式に興味がある場合は、次の場所に移動してください。 Apache Hudi テーブルの操作 のセクションから無料でダウンロードできます。

Delta Lake テーブル形式に興味がある場合は、次の場所に移動してください。 Linux 基盤の Delta Lake テーブルの操作 のセクションから無料でダウンロードできます。

Apache Iceberg テーブルの操作

Athena で Spark ノートブックを使用する場合、PySpark を使用せずに SQL クエリを直接実行できます。これを行うには、セル マジックを使用します。セル マジックは、セルの動作を変更するノートブック セル内の特別なヘッダーです。 SQL の場合、次を追加できます。 %%sql マジックは、セルの内容全体を Athena 上で実行される SQL ステートメントとして解釈します。

このセクションでは、Apache Spark for Athena で SQL を使用して Apache Iceberg テーブルを作成、分析、管理する方法を示します。

ノートブックセッションをセットアップする

Athena で Apache Iceberg を使用するには、セッションの作成または編集中に、 アパッチ氷山 オプションを展開して、 Apache Spark のプロパティ セクション。次のスクリーンショットに示すように、プロパティが事前に設定されます。

この画像は、Athena で Spak セッションを作成するときに設定された Apache Iceberg プロパティを示しています。

手順については、を参照してください。 セッションの詳細の編集 or 自分だけのノートを作成する.

このセクションで使用されているコードは、次の場所にあります。 SparkSQL_iceberg.ipynb ファイルを参照してください。

データベースと Iceberg テーブルを作成する

まず、AWS Glue データカタログにデータベースを作成します。次の SQL を使用すると、次のようなデータベースを作成できます。 icebergdb:

%%sql
CREATE DATABASE icebergdb

次にデータベース内で icebergdbという名前の Iceberg テーブルを作成します。 noaa_iceberg データをロードする Amazon S3 内の場所を指します。次のステートメントを実行し、場所を置き換えます s3://<your-S3-bucket>/<prefix>/ S3 バケットとプレフィックスを使用して:

%%sql
CREATE TABLE icebergdb.noaa_iceberg(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING iceberg
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaiceberg/'

テーブルにデータを挿入する

を設定するには、 noaa_iceberg Iceberg テーブル、Parquet テーブルからデータを挿入します sparkblogdb.noaa_pq これは前提条件の一部として作成されました。これを行うには、 INSERT INTO Spark でのステートメント:

%%sql
INSERT INTO icebergdb.noaa_iceberg select * from sparkblogdb.noaa_pq

また、 SELECTとしてのテーブルの作成 USING Iceberg 句を使用して、1 ステップで Iceberg テーブルを作成し、ソース テーブルからデータを挿入します。

%%sql
CREATE TABLE icebergdb.noaa_iceberg
USING iceberg
PARTITIONED BY (year)
AS SELECT * FROM sparkblogdb.noaa_pq

Iceberg テーブルにクエリを実行する

データが Iceberg テーブルに挿入されたので、分析を開始できます。 Spark SQL を実行して、年ごとに記録された最低気温を見つけてみましょう。 'SEATTLE TACOMA AIRPORT, WA US' 場所:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

次の出力が得られます。

画像は最初の選択クエリの出力を示しています

Iceberg テーブルのデータを更新する

テーブル内のデータを更新する方法を見てみましょう。駅名を更新したい 'SEATTLE TACOMA AIRPORT, WA US' 〜へ 'Sea-Tac'。 Spark SQL を使用すると、 UPDATE Iceberg テーブルに対するステートメント:

%%sql
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

次に、前の SELECT クエリを実行して、その場所で記録された最低温度を見つけることができます。 'Sea-Tac' 場所:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'Sea-Tac'
group by 1,2

次の出力が得られます。

画像は 2 番目の選択クエリの出力を示しています

コンパクトなデータファイル

Iceberg のようなオープン テーブル フォーマットは、ファイル ストレージ内にデルタ変更を作成し、マニフェスト ファイルを通じて行のバージョンを追跡することによって機能します。データファイルが増えるとマニフェストファイルに保存されるメタデータも多くなり、データファイルが小さいと不要な量のメタデータが発生することが多く、その結果クエリの効率が低下し、Amazon S3 アクセスコストが高くなります。アイスバーグのランニング rewrite_data_files Spark for Athena のプロシージャは、多くの小さなデルタ変更ファイルを、読み取りに最適化された小さな Parquet ファイルのセットに結合して、データ ファイルを圧縮します。ファイルを圧縮すると、クエリ時の読み取り操作が高速化されます。テーブルで圧縮を実行するには、次の Spark SQL を実行します。

%%sql
CALL spark_catalog.system.rewrite_data_files
(table => 'icebergdb.noaa_iceberg', strategy=>'sort', sort_order => 'zorder(name)')

rewrite_data_files にはオプションがあります をクリックして並べ替え戦略を指定します。これは、データの再編成と圧縮に役立ちます。

テーブルのスナップショットをリストする

Iceberg テーブルに対する書き込み、更新、削除、アップサート、圧縮の各操作により、テーブルの新しいスナップショットが作成されますが、スナップショットの分離とタイム トラベルのために古いデータとメタデータは保持されます。 Iceberg テーブルのスナップショットを一覧表示するには、次の Spark SQL ステートメントを実行します。

%%sql
SELECT *
FROM spark_catalog.icebergdb.noaa_iceberg.snapshots

古いスナップショットを期限切れにする

不要になったデータ ファイルを削除し、テーブル メタデータのサイズを小さく保つために、スナップショットを定期的に期限切れにすることをお勧めします。期限切れになっていないスナップショットにまだ必要なファイルは削除されません。 Spark for Athena で、次の SQL を実行してテーブルのスナップショットを期限切れにします icebergdb.noaa_iceberg 特定のタイムスタンプより古いもの:

%%sql
CALL spark_catalog.system.expire_snapshots
('icebergdb.noaa_iceberg', TIMESTAMP '2023-11-30 00:00:00.000')

タイムスタンプ値は、次の形式の文字列として指定されることに注意してください。 yyyy-MM-dd HH:mm:ss.fff。出力には、削除されたデータ ファイルとメタデータ ファイルの数が表示されます。

テーブルとデータベースを削除します

次の Spark SQL を実行して、この演習から Amazon S3 内の Iceberg テーブルと関連データをクリーンアップできます。

%%sql
DROP TABLE icebergdb.noaa_iceberg PURGE

次の Spark SQL を実行して、データベース Icebergdb を削除します。

%%sql
DROP DATABASE icebergdb

Spark for Athena を使用して Iceberg テーブルで実行できるすべての操作の詳細については、「」を参照してください。 スパーククエリ & スパーク手順 Iceberg のドキュメントに記載されています。

Apache Hudi テーブルの操作

次に、Spark for Athena で SQL を使用して Apache Hudi テーブルを作成、分析、管理する方法を示します。

ノートブックセッションをセットアップする

Athena で Apache Hudi を使用するには、セッションの作成または編集中に、 アパッチ・フディ オプションを展開して、 Apache Spark のプロパティ のセクションから無料でダウンロードできます。

この画像は、Athena で Spak セッションを作成するときに設定された Apache Hudi プロパティを示しています。

手順については、を参照してください。 セッションの詳細の編集 or 自分だけのノートを作成する.

このセクションで使用されるコードは、 SparkSQL_hudi.ipynb ファイルを参照してください。

データベースとHudiテーブルを作成する

まず、というデータベースを作成します。 hudidb これは AWS Glue データ カタログに保存され、その後 Hudi テーブルが作成されます。

%%sql
CREATE DATABASE hudidb

データをロードする Amazon S3 内の場所を指す Hudi テーブルを作成します。表は次のものであることに注意してください。 コピーオンライト タイプ。それは次のように定義されます type= 'cow' テーブル DDL 内。 station と date を複数の主キーとして定義し、preCombinedField を年として定義しました。また、テーブルは年ごとにパーティション化されます。次のステートメントを実行し、場所を置き換えます s3://<your-S3-bucket>/<prefix>/ S3 バケットとプレフィックスを使用して:

%%sql
CREATE TABLE hudidb.noaa_hudi(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING HUDI
PARTITIONED BY (year)
TBLPROPERTIES(
primaryKey = 'station, date',
preCombineField = 'year',
type = 'cow'
)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaahudi/'

テーブルにデータを挿入する

Iceberg と同様に、 INSERT INTO からデータを読み取ってテーブルにデータを設定するステートメント sparkblogdb.noaa_pq 前回の投稿で作成したテーブル:

%%sql
INSERT INTO hudidb.noaa_hudi select * from sparkblogdb.noaa_pq

Hudi テーブルをクエリする

テーブルが作成されたので、クエリを実行して、その場所で記録された最高温度を見つけてみましょう。 'SEATTLE TACOMA AIRPORT, WA US' 場所:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Hudi テーブルのデータを更新する

駅名を変えましょう 'SEATTLE TACOMA AIRPORT, WA US' 〜へ 'Sea–Tac'。 Spark for Athena で UPDATE ステートメントを実行すると、 update の記録 noaa_hudi テーブル:

%%sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

前の SELECT クエリを実行して、記録された最高温度を見つけます。 'Sea-Tac' 場所:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'Sea-Tac'
group by 1,2

タイムトラベルクエリを実行する

Athena 上の SQL でタイムトラベル クエリを使用して、過去のデータ スナップショットを分析できます。例えば:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi timestamp as of '2023-12-01 23:53:43.100'
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

このクエリは、過去の特定の時点でのシアトル空港の気温データをチェックします。タイムスタンプ句を使用すると、現在のデータを変更せずに戻ることができます。タイムスタンプ値は、次の形式の文字列として指定されることに注意してください。 yyyy-MM-dd HH:mm:ss.fff.

クラスタリングによるクエリ速度の最適化

クエリのパフォーマンスを向上させるには、次のことを実行できます。 クラスタリング Spark for Athena で SQL を使用する Hudi テーブル:

%%sql
CALL run_clustering(table => 'hudidb.noaa_hudi', order => 'name')

コンパクトなテーブル

コンパクションは、Hudi が特に Merge On Read (MOR) テーブルで使用するテーブル サービスで、行ベースのログ ファイルから対応する列ベースのベース ファイルに更新を定期的にマージして、新しいバージョンのベース ファイルを生成します。圧縮はコピー オン ライト (COW) テーブルには適用されず、MOR テーブルにのみ適用されます。 Spark for Athena で次のクエリを実行して、MOR テーブルで圧縮を実行できます。

%%sql
CALL run_compaction(op => 'run', table => 'hudi_table_mor');

テーブルとデータベースを削除します

次の Spark SQL を実行して、作成した Hudi テーブルと関連データを Amazon S3 の場所から削除します。

%%sql
DROP TABLE hudidb.noaa_hudi PURGE

次の Spark SQL を実行してデータベースを削除します hudidb:

%%sql
DROP DATABASE hudidb

Spark for Athena を使用して Hudi テーブルで実行できるすべての操作については、「」を参照してください。 SQL DDL & 手順 Hudi のドキュメントに記載されています。

Linux 基盤の Delta Lake テーブルの操作

次に、Spark for Athena で SQL を使用して Delta Lake テーブルを作成、分析、管理する方法を示します。

ノートブックセッションをセットアップする

Spark for Athena で Delta Lake を使用するには、セッションの作成または編集中に、 Linux Foundation デルタレイク 展開することにより Apache Spark のプロパティ のセクションから無料でダウンロードできます。

この画像は、Athena で Spak セッションを作成するときに設定された Delta Lake プロパティを示しています。

手順については、を参照してください。 セッションの詳細の編集 or 自分だけのノートを作成する.

このセクションで使用されるコードは、 SparkSQL_delta.ipynb ファイルを参照してください。

データベースと Delta Lake テーブルを作成する

このセクションでは、AWS Glue データ カタログにデータベースを作成します。次の SQL を使用して、というデータベースを作成できます。 deltalakedb:

%%sql
CREATE DATABASE deltalakedb

次にデータベース内で deltalakedbという名前の Delta Lake テーブルを作成します。 noaa_delta データをロードする Amazon S3 内の場所を指します。次のステートメントを実行し、場所を置き換えます s3://<your-S3-bucket>/<prefix>/ S3 バケットとプレフィックスを使用して:

%%sql
CREATE TABLE deltalakedb.noaa_delta(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING delta
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadelta/'

テーブルにデータを挿入する

私たちは使用します INSERT INTO からデータを読み取ってテーブルにデータを設定するステートメント sparkblogdb.noaa_pq 前回の投稿で作成したテーブル:

%%sql
INSERT INTO deltalakedb.noaa_delta select * from sparkblogdb.noaa_pq

CREATE TABLE AS SELECT を使用して Delta Lake テーブルを作成し、1 つのクエリでソース テーブルからデータを挿入することもできます。

Delta Lake テーブルをクエリする

データが Delta Lake テーブルに挿入されたので、分析を開始できます。 Spark SQL を実行して、最低記録温度を見つけてみましょう。 'SEATTLE TACOMA AIRPORT, WA US' 場所:

%%sql
select name, year, max(MAX) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

デルタレイクテーブルのデータを更新する

駅名を変えましょう 'SEATTLE TACOMA AIRPORT, WA US' 〜へ 'Sea–Tac'。実行できます UPDATE Spark for Athena のレコードを更新するためのステートメント noaa_delta テーブル:

%%sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

前の SELECT クエリを実行して、その場所で記録された最低温度を見つけることができます。 'Sea-Tac' 場所を指定すると、結果は前と同じになるはずです。

%%sql
select name, year, max(MAX) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'Sea-Tac'
group by 1,2

コンパクトなデータファイル

Spark for Athena では、Delta Lake テーブルで OPTIMIZE を実行できます。これにより、小さなファイルが大きなファイルに圧縮されるため、小さなファイルのオーバーヘッドによってクエリに負担がかかりません。圧縮操作を実行するには、次のクエリを実行します。

%%sql
OPTIMIZE deltalakedb.noaa_delta

参照する 最適化 OPTIMIZE の実行中に使用できるさまざまなオプションについては、Delta Lake のドキュメントを参照してください。

Delta Lake テーブルによって参照されなくなったファイルを削除する

Spark for Athena を使用してテーブルに対して VACCUM コマンドを実行することで、Delta Lake テーブルで参照されなくなった、保持しきい値より古い Amazon S3 に保存されているファイルを削除できます。

%%sql
VACUUM deltalakedb.noaa_delta

参照する デルタテーブルによって参照されなくなったファイルを削除します VACUUM で使用できるオプションについては、Delta Lake のドキュメントを参照してください。

テーブルとデータベースを削除します

次の Spark SQL を実行して、作成した Delta Lake テーブルを削除します。

%%sql
DROP TABLE deltalakedb.noaa_delta

次の Spark SQL を実行してデータベースを削除します deltalakedb:

%%sql
DROP DATABASE deltalakedb

Delta Lake テーブルとデータベースで DROP TABLE DDL を実行すると、これらのオブジェクトのメタデータが削除されますが、Amazon S3 内のデータ ファイルは自動的に削除されません。ノートブックのセルで次の Python コードを実行して、S3 の場所からデータを削除できます。

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('<your-S3-bucket>')
bucket.objects.filter(Prefix="<prefix>/noaadelta/").delete()

Spark for Athena を使用して Delta Lake テーブルで実行できる SQL ステートメントの詳細については、「 クイックスタート Delta Lake のドキュメントに記載されています。

まとめ

この投稿では、Athena ノートブックで Spark SQL を使用してデータベースとテーブルを作成し、データの挿入とクエリを実行し、Hudi、Delta Lake、Iceberg テーブルで更新、圧縮、タイムトラベルなどの一般的な操作を実行する方法を説明しました。オープン テーブル フォーマットは、ACID トランザクション、アップサート、削除をデータ レイクに追加し、生のオブジェクト ストレージの制限を克服します。 Spark on Athena の組み込み統合により、個別のコネクタをインストールする必要がなくなるため、Amazon S3 で信頼性の高いデータレイクを構築するためのこれらの一般的なフレームワークを使用する際の設定手順と管理オーバーヘッドが削減されます。データ レイク ワークロードにオープン テーブル形式を選択する方法の詳細については、次を参照してください。 AWS のトランザクション データ レイクにオープン テーブル形式を選択する.


著者について

パティック・シャー Amazon Athena のシニア分析アーキテクトです。 彼は 2015 年に AWS に入社し、それ以来ビッグデータ分析分野に注力し、AWS 分析サービスを使用して顧客がスケーラブルで堅牢なソリューションを構築できるよう支援しています。

ラジ・デヴナス は、Amazon Athena の AWS のプロダクト マネージャーです。 彼は、顧客に愛される製品を構築し、顧客がデータから価値を抽出できるよう支援することに情熱を注いでいます。 彼の経歴は、金融、小売、スマート ビルディング、ホーム オートメーション、データ通信システムなどの複数の最終市場向けのソリューションの提供にあります。

スポット画像

最新のインテリジェンス

スポット画像