ゼファーネットのロゴ

Amazon EMR で Apache Flink を使用してデータレイクを構築する

日付:

データ駆動型のビジネスを構築するには、データ カタログ内のエンタープライズ データ資産を民主化することが重要です。 統合されたデータ カタログを使用すると、データセットをすばやく検索して、データ スキーマ、データ形式、場所を把握できます。 の AWSGlueデータカタログ 異なるシステムがメタデータを保存および検索して、データサイロ内のデータを追跡できる統一リポジトリを提供します。

ApacheFlink は、スケーラブルなストリーミング ETL、分析、およびイベント駆動型アプリケーションに広く使用されているデータ処理エンジンです。 耐障害性を備えた正確な時間と状態の管理を提供します。 Flink は、統合された API またはアプリケーションを使用して、バインドされたストリーム (バッチ) と無制限のストリーム (ストリーム) を処理できます。 データが Apache Flink で処理された後、ダウンストリーム アプリケーションは、統合されたデータ カタログを使用してキュレートされたデータにアクセスできます。 統一されたメタデータを使用すると、データ処理アプリケーションとデータ消費アプリケーションの両方が、同じメタデータを使用してテーブルにアクセスできます。

この投稿では、Amazon EMR の Apache Flink を AWS Glue Data Catalog と統合して、リアルタイムでストリーミング データを取り込み、ビジネス分析のためにほぼリアルタイムでデータにアクセスできるようにする方法を示します。

Apache Flink コネクタとカタログ アーキテクチャ

Apache Flink は、コネクタとカタログを使用して、データとメタデータを操作します。 次の図は、データの読み取り/書き込み用の Apache Flink コネクタと、メタデータの読み取り/書き込み用のカタログのアーキテクチャを示しています。

Flink Glue アーキテクチャ

データの読み取り/書き込みのために、Flink にはインターフェースがあります DynamicTableSourceFactory 読み取り用と DynamicTableSinkFactory 書き込み用。 別の Flink コネクタは、異なるストレージ内のデータにアクセスするための XNUMX つのインターフェイスを実装します。 たとえば、Flink FileSystem コネクタには FileSystemTableFactory Hadoop Distributed File System (HDFS) または Amazon シンプル ストレージ サービス (Amazon S3)、Flink HBase コネクタには HBase2DynamicTableFactory HBase でデータの読み取り/書き込みを行い、Flink Kafka コネクタには KafkaDynamicTableFactory Kafka でデータを読み書きする。 参照できます テーブルと SQL コネクタ

メタデータの読み取り/書き込みのために、Flink にはカタログ インターフェースがあります。 Flink には、カタログ用の XNUMX つの組み込み実装があります。 GenericInMemoryCatalog カタログ データをメモリに保存します。 JdbcCatalog JDBC がサポートするリレーショナル データベースにカタログ データを格納します。 この記事の執筆時点では、MySQL および PostgreSQL データベースが JDBC カタログでサポートされています。 HiveCatalog カタログ データを Hive メタストアに格納します。 HiveCatalog 使用されます HiveShim 異なる Hive バージョンの互換性を提供します。 Hive メタストアまたは AWS Glue データ カタログを使用するように、さまざまなメタストア クライアントを設定できます。 この記事では、 アマゾンEMR 財産 hive.metastore.client.factory.class 〜へ com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory (参照してください AWS Glue Data Catalog を Hive のメタストアとして使用する) AWS Glue Data Catalog を使用して Flink カタログ データを保存できるようにします。 参照する カタログ

Kafka などのほとんどの Flink 組み込みコネクタ アマゾンキネシス, Amazon DynamoDB、Elasticsearch、または FileSystem は、Flink を使用できます HiveCatalog メタデータを AWS Glue データカタログに保存します。 ただし、Apache Iceberg などの一部のコネクタ実装には、独自のカタログ管理メカニズムがあります。 FlinkCatalog in Iceberg は、Flink でカタログ インターフェイスを実装します。 FlinkCatalog in Iceberg には、独自のカタログ実装へのラッパーがあります。 次の図は、Apache Flink、Iceberg コネクタ、およびカタログの関係を示しています。 詳細については、次を参照してください。 カタログの作成とカタログの使用 & カタログ.

Flink Iceberg Glue アーキテクチャ

Apache Hudi には、独自のカタログ管理機能もあります。 両方 HoodieCatalog & HoodieHiveCatalog Flink でカタログ インターフェイスを実装します。 HoodieCatalog HDFS などのファイル システムにメタデータを格納します。 HoodieHiveCatalog 設定するかどうかに応じて、Hive メタストアまたは AWS Glue データカタログにメタデータを保存します hive.metastore.client.factory.class 使用する com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory. 次の図は、Apache Flink、Hudi コネクタ、およびカタログの関係を示しています。 詳細については、次を参照してください。 カタログを作成.

Flink Hudi Glue アーキテクチャ

Iceberg と Hudi はカタログ管理メカニズムが異なるため、この投稿では Flink と AWS Glue Data Catalog の統合の XNUMX つのシナリオを示します。

  • Glue Data Catalog のメタデータを使用して Flink の Iceberg テーブルに読み取り/書き込み
  • Glue Data Catalog のメタデータを使用した Flink の Hudi テーブルへの読み取り/書き込み
  • Glue Data Catalog のメタデータを使用して、Flink の他のストレージ形式に読み取り/書き込み

ソリューションの概要

次の図は、この投稿で説明したソリューションの全体的なアーキテクチャを示しています。

Flink グルーの統合

このソリューションでは、 MySQL用AmazonRDS binlog を使用して、トランザクションの変更をリアルタイムで抽出します。 Amazon EMR Flink CDC コネクタは、binlog データを読み取り、データを処理します。 変換されたデータは Amazon S3 に保存できます。 AWS Glue Data Catalog を使用して、テーブル スキーマやテーブルの場所などのメタデータを保存します。 次のようなダウンストリーム データ コンシューマ アプリケーション アマゾンアテナ または Amazon EMR Trino は、ビジネス分析のためにデータにアクセスします。

以下は、このソリューションをセットアップする大まかな手順です。

  1. 有効にします binlog Amazon RDS for MySQL の場合、データベースを初期化します。
  2. AWS Glue データカタログを使用して EMR クラスターを作成します。
  3. Amazon EMR の Apache Flink CDC を使用して変更データキャプチャ (CDC) データを取り込みます。
  4. 処理されたデータをメタデータとともに Amazon S3 に保存し、AWS Glue Data Catalog に保存します。
  5. すべてのテーブル メタデータが AWS Glue データ カタログに保存されていることを確認します。
  6. ビジネス分析のために Athena または Amazon EMR Trino でデータを消費します。
  7. Amazon RDS for MySQL のソース レコードを更新および削除し、データ レイク テーブルの反映を検証します。

前提条件

この投稿では AWS IDおよびアクセス管理 次のサービスに対するアクセス許可を持つ (IAM) ロール:

  • Amazon RDS for MySQL (5.7.40)
  • アマゾン EMR (6.9.0)
  • アマゾンアテナ
  • AWSGlueデータカタログ
  • アマゾンS3

Amazon RDS for MySQL の binlog を有効にしてデータベースを初期化する

Amazon RDS for MySQL で CDC を有効にするには、Amazon RDS for MySQL のバイナリログを設定する必要があります。 参照する MySQL バイナリ ログの構成 詳細については。 データベースも作成します salesdb MySQLでテーブルを作成します customer, orderなどを使用して、データ ソースを設定します。

  1. Amazon RDS コンソールで、 パラメータグループ ナビゲーションペインに表示されます。
  2. MySQL の新しいパラメーター グループを作成します。
  3. 作成したパラメータ グループを編集して設定します binlog_format=ROW.

RDS-Binlog-フォーマット

  1. 作成したパラメータ グループを編集して設定します binlog_row_image=full.

RDS-Binlog-Row-Image

  1. パラメータグループを使用して RDS for MySQL DB インスタンスを作成します。
  2. の値を書き留めます。 hostname, username, password、後で使用します。
  3. 次のコマンドを実行して、Amazon S3 から MySQL データベース初期化スクリプトをダウンロードします。
aws s3 cp s3://emr-workshops-us-west-2/glue_immersion_day/scripts/salesdb.sql ./salesdb.sql

  1. RDS for MySQL データベースに接続し、 salesdb.sql コマンドを実行してデータベースを初期化し、RDS for MySQL データベース構成に従ってホスト名とユーザー名を指定します。
mysql -h <hostname> -u <username> -p
mysql> source salesdb.sql

AWS Glue データカタログを使用して EMR クラスターを作成する

Amazon EMR 6.9.0 から、Flink テーブル API/SQL は AWS Glue Data Catalog と統合できます。 Flink と AWS Glue の統合を使用するには、Amazon EMR 6.9.0 以降のバージョンを作成する必要があります。

  1. ファイルを作成する iceberg.properties Amazon EMR Trino と Data Catalog の統合用。 テーブル形式が Iceberg の場合、ファイルには次の内容が含まれている必要があります。
iceberg.catalog.type=glue
connector.name=iceberg

  1. アップロード iceberg.properties たとえば、S3バケットに DOC-EXAMPLE-BUCKET.

Amazon EMR Trino と Iceberg を統合する方法の詳細については、次を参照してください。 Trino で Iceberg クラスターを使用する.

  1. ファイルを作成する trino-glue-catalog-setup.sh Trino と Data Catalog の統合を構成します。 使用する trino-glue-catalog-setup.sh ブートストラップ スクリプトとして。 ファイルには次の内容が含まれている必要があります ( DOC-EXAMPLE-BUCKET S3バケット名を使用):
set -ex sudo aws s3 cp s3://DOC-EXAMPLE-BUCKET/iceberg.properties /etc/trino/conf/catalog/iceberg.properties

  1. アップロード trino-glue-catalog-setup.sh S3 バケットに (DOC-EXAMPLE-BUCKET).

参照する 追加のソフトウェアをインストールするためのブートストラップ アクションを作成する ブートストラップ スクリプトを実行します。

  1. ファイルを作成する flink-glue-catalog-setup.sh Flink と Data Catalog の統合を構成します。
  2. スクリプト ランナーを使用して、 flink-glue-catalog-setup.sh ステップ関数としてのスクリプト。

ファイルには次の内容が含まれている必要があります (ここでの JAR ファイル名は Amazon EMR 6.9.0 を使用しています。それ以降のバージョンの JAR 名は変更される可能性があるため、Amazon EMR のバージョンに従って必ず更新してください)。

ここでは、ブートストラップではなく Amazon EMR ステップを使用して、このスクリプトを実行していることに注意してください。 Amazon EMR Flink がプロビジョニングされた後、Amazon EMR ステップ スクリプトが実行されます。

set -ex sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib
sudo cp /usr/lib/hive/lib/hive-exec.jar /lib/flink/lib
sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib
sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar
sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar
sudo chmod 755 /usr/lib/flink/lib/hive-exec.jar
sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar sudo wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar -O /lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
sudo chmod 755 /lib/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar sudo ln -s /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar /usr/lib/flink/lib/
sudo ln -s /usr/lib/hudi/hudi-flink-bundle.jar /usr/lib/flink/lib/ sudo mv /usr/lib/flink/opt/flink-table-planner_2.12-1.15.2.jar /usr/lib/flink/lib/
sudo mv /usr/lib/flink/lib/flink-table-planner-loader-1.15.2.jar /usr/lib/flink/opt/

  1. アップロード flink-glue-catalog-setup.sh S3 バケットに (DOC-EXAMPLE-BUCKET).

参照する Amazon EMR で Flink to Hive メタストアを設定する Flink および Hive メタストアの構成方法の詳細については、 を参照してください。 参照する Amazon EMR クラスターでコマンドとスクリプトを実行する Amazon EMR ステップ スクリプトの実行の詳細については、

  1. アプリケーション Hive、Flink、および Trino を使用して EMR 6.9.0 クラスターを作成します。

を使用して EMR クラスターを作成できます。 AWSコマンドラインインターフェイス (AWS CLI)または AWSマネジメントコンソール. 手順については、該当するサブセクションを参照してください。

AWS CLI で EMR クラスターを作成する

AWS CLI を使用するには、次の手順を完了します。

  1. ファイルを作成する emr-flink-trino-glue.json Data Catalog を使用するように Amazon EMR を設定します。 ファイルには次の内容が含まれている必要があります。
[
{ "Classification": "hive-site", "Properties": { "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
},
{ "Classification": "trino-connector-hive", "Properties": { "hive.metastore": "glue"
}
}
]

  1. 次のコマンドを実行して、EMR クラスターを作成します。 あなたの地元を提供してください emr-flink-trino-glue.json 親フォルダーのパス、S3 バケット、EMR クラスターのリージョン、EC2 キー名、および EMR ログの S3 バケット。
aws emr create-cluster --release-label emr-6.9.0 --applications Name=Hive Name=Flink Name=Spark Name=Trino --region us-west-2 --name flink-trino-glue-emr69 --configurations "file:///<your configuration path>/emr-flink-trino-glue.json" --bootstrap-actions '[{"Path":"s3://DOC-EXAMPLE-BUCKET/trino-glue-catalog-setup.sh","Name":"Add iceberg.properties for Trino"}]' --steps '[{"Args":["s3://DOC-EXAMPLE-BUCKET/flink-glue-catalog-setup.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Flink-glue-integration"}]' --instance-groups InstanceGroupType=MASTER,InstanceType=m6g.2xlarge,InstanceCount=1 
InstanceGroupType=CORE,InstanceType=m6g.2xlarge,InstanceCount=2 
--use-default-roles --ebs-root-volume-size 30 --ec2-attributes KeyName=<keyname> 
--log-uri s3://<s3-bucket-for-emr>/elasticmapreduce/

コンソールで EMR クラスターを作成する

コンソールを使用するには、次の手順を実行します。

  1. Amazon EMR コンソールで、EMR クラスターを作成し、 Hive テーブルのメタデータに使用 for AWSGlueデータカタログ 設定を行います。
  2. 次のコードを使用して構成設定を追加します。
[
{ "Classification": "trino-connector-hive", "Properties": { "hive.metastore": "glue"
}
}
]

EMR-6.9-フリンクハイブグルー-1

  1. ステップ セクションに、というステップを追加します。 カスタム JAR.
  2. 作成セッションプロセスで JAR の場所 〜へ s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jarここで、 EMR クラスターが存在するリージョンです。
  3. 作成セッションプロセスで Arguments 以前にアップロードした S3 パスに。

EMR-6.9-フリンクハイブグルー-2

  1. ブートストラップアクション セクションでは、選択 カスタムアクション.
  2. 作成セッションプロセスで スクリプトの場所 アップロードした S3 パスに。

EMR-6.9-フリンクハイブグルー-3

  1. 以降の手順を続行して、EMR クラスターの作成を完了します。

Amazon EMR で Apache Flink CDC を使用して CDC データを取り込む

Flink CDC コネクタは、データベース スナップショットの読み取りをサポートし、構成されたテーブルの更新をキャプチャします。 ダウンロードにより、MySQL 用の Flink CDC コネクタをデプロイしました。 flink-sql-connector-mysql-cdc-2.2.1.jar EMR クラスターを作成するときに Flink ライブラリに入れます。 Flink CDC コネクタは、Flink Hive カタログを使用して、Flink CDC テーブル スキーマを Hive メタストアまたは AWS Glue データ カタログに保存できます。 この投稿では、Data Catalog を使用して Flink CDC テーブルを保存します。

Flink CDC を使用して RDS for MySQL データベースとテーブルを取り込み、Data Catalog にメタデータを保存するには、次の手順を実行します。

  1. EMR プライマリ ノードに SSH で接続します。
  2. 次のコマンドを実行し、S3 バケット名を指定して、YARN セッションで Flink を開始します。
flink-yarn-session -d -jm 2048 -tm 4096 -s 2 -D state.backend=rocksdb -D state.backend.incremental=true -D state.checkpoint-storage=filesystem -D state.checkpoints.dir=s3://<flink-glue-integration-bucket>/flink-checkponts/ -D state.checkpoints.num-retained=10 -D execution.checkpointing.interval=10s -D execution.checkpointing.mode=EXACTLY_ONCE -D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION -D execution.checkpointing.max-concurrent-checkpoints=1

  1. 次のコマンドを実行して、Flink SQL クライアント CLI を開始します。
/usr/lib/flink/bin/sql-client.sh embedded

  1. カタログ タイプを次のように指定して、Flink Hive カタログを作成します。 hive S3 バケット名を指定します。
CREATE CATALOG glue_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/etc/hive/conf.dist'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_cdc_db WITH ('hive.database.location-uri'= 's3://<flink-glue-integration-bucket>/flink-glue-for-hive/warehouse/')
use flink_cdc_db;

AWS Glue Data Catalog を使用して EMR Hive カタログを構成しているため、Flink Hive カタログで作成されたすべてのデータベースとテーブルは Data Catalog に保存されます。

  1. 前に作成した RDS for MySQL インスタンスのホスト名、ユーザー名、およびパスワードを指定して、Flink CDC テーブルを作成します。

RDS for MySQL のユーザー名とパスワードはテーブル プロパティとしてデータ カタログに保存されるため、有効にする必要があることに注意してください。 AWS Lake Formation による AWS Glue データベース/テーブル認証 機密データを保護します。

CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_cdc` (
`CUST_ID` double NOT NULL,
`NAME` STRING NOT NULL,
`MKTSEGMENT` STRING NOT NULL,
PRIMARY KEY (`CUST_ID`) NOT ENFORCED
) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '<hostname>', 'port' = '3306', 'username' = '<username>', 'password' = '<password>', 'database-name' = 'salesdb', 'table-name' = 'CUSTOMER'
); CREATE TABLE `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` (
`SITE_ID` double NOT NULL,
`CUST_ID` double NOT NULL,
`ADDRESS` STRING NOT NULL,
`CITY` STRING NOT NULL,
`STATE` STRING NOT NULL,
`COUNTRY` STRING NOT NULL,
`PHONE` STRING NOT NULL,
PRIMARY KEY (`SITE_ID`) NOT ENFORCED
) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '<hostname>', 'port' = '3306', 'username' = '<username>', 'password' = '<password>', 'database-name' = 'salesdb', 'table-name' = 'CUSTOMER_SITE'
); CREATE TABLE `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` (
`ORDER_ID` int NOT NULL,
`SITE_ID` double NOT NULL,
`ORDER_DATE` TIMESTAMP NOT NULL,
`SHIP_MODE` STRING NOT NULL
) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '<hostname>', 'port' = '3306', 'username' = '<username>', 'password' = '<password>', 'database-name' = 'salesdb', 'table-name' = 'SALES_ORDER_ALL', 'scan.incremental.snapshot.enabled' = 'FALSE'
);

  1. 作成したテーブルに対してクエリを実行します。
SELECT count(O.ORDER_ID) AS ORDER_COUNT,
C.CUST_ID,
C.NAME,
C.MKTSEGMENT
FROM   customer_cdc C
JOIN customer_site_cdc CS
ON C.CUST_ID = CS.CUST_ID
JOIN sales_order_all_cdc O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT;

次のスクリーンショットのようなクエリ結果が得られます。

Flink-SQL-CDC-テスト

処理されたデータを Amazon S3 に保存し、メタデータを Data Catalog に保存する

Amazon RDS for MySQL でリレーショナル データベース データを取り込んでいるため、生データが更新または削除される場合があります。 データの更新と削除をサポートするために、Apache Iceberg や Apache Hudi などのデータ レイク テクノロジを選択して、処理されたデータを保存できます。 前述したように、Iceberg と Hudi ではカタログ管理が異なります。 Flink を使用して、AWS Glue データカタログのメタデータを含む Iceberg テーブルと Hudi テーブルを読み書きする両方のシナリオを示します。

非 Iceberg および非 Hudi の場合、FileSystem Parquet ファイルを使用して、Flink 組み込みコネクタがデータ カタログを使用する方法を示します。

Glue Data Catalog のメタデータを使用して Flink の Iceberg テーブルに読み取り/書き込み

次の図は、この構成のアーキテクチャを示しています。

Iceberg の Flink Glue 統合

  1. 指定することにより、Data Catalog を使用して Flink Iceberg カタログを作成します。 catalog-impl as org.apache.iceberg.aws.glue.GlueCatalog.

Iceberg の Flink と Data Catalog の統合の詳細については、次を参照してください。 のりカタログ.

  1. Flink SQL クライアント CLI で、S3 バケット名を指定して次のコマンドを実行します。
CREATE CATALOG glue_catalog_for_iceberg WITH ( 'type'='iceberg', 'warehouse'='s3://<flink-glue-integration-bucket>/flink-glue-for-iceberg/warehouse/', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager', 'lock.table'='FlinkGlue4IcebergLockTable' );

  1. 処理されたデータを格納する Iceberg テーブルを作成します。
USE CATALOG glue_catalog_for_iceberg;
CREATE DATABASE IF NOT EXISTS flink_glue_iceberg_db;
USE flink_glue_iceberg_db;
CREATE TABLE `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH ( 'format-version'='2', 'write.upsert.enabled'='true');

  1. 処理されたデータを Iceberg に挿入します。
INSERT INTO `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Glue Data Catalog のメタデータを使用した Flink の Hudi テーブルへの読み取り/書き込み

次の図は、この構成のアーキテクチャを示しています。

Hudi の Flink Glue 統合

次の手順を完了します。

  1. 指定して、Hive カタログを使用する Hudi のカタログを作成します。 mode as hms.

EMR クラスターを作成したときに Data Catalog を使用するように Amazon EMR を設定済みであるため、この Hudi Hive カタログは内部で Data Catalog を使用します。 Hudi の Flink と Data Catalog の統合の詳細については、次を参照してください。 カタログを作成.

  1. Flink SQL クライアント CLI で、S3 バケット名を指定して次のコマンドを実行します。
CREATE CATALOG glue_catalog_for_hudi WITH ( 'type' = 'hudi', 'mode' = 'hms', 'table.external' = 'true', 'default-database' = 'default', 'hive.conf.dir' = '/etc/hive/conf.dist', 'catalog.path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/'
);

  1. Data Catalog を使用して Hudi テーブルを作成し、S3 バケット名を指定します。
USE CATALOG glue_catalog_for_hudi;
CREATE DATABASE IF NOT EXISTS flink_glue_hudi_db;
use flink_glue_hudi_db;
CREATE TABLE `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT,
PRIMARY KEY (`CUSTOMER_ID`) NOT Enforced
)
WITH ( 'connector' = 'hudi', 'write.tasks' = '4', 'path' = 's3://<flink-glue-integration-bucket>/flink-glue-for-hudi/warehouse/customer_summary', 'table.type' = 'COPY_ON_WRITE', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '1'
);

  1. 処理されたデータを Hudi に挿入します。
INSERT INTO `glue_catalog_for_hudi`.`flink_glue_hudi_db`.`customer_summary`
SELECT CAST(C.CUST_ID AS BIGINT) CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY,
count(O.ORDER_ID) AS ORDER_COUNT
FROM   `glue_catalog`.`flink_cdc_db`.`customer_cdc` C
JOIN `glue_catalog`.`flink_cdc_db`.`customer_site_cdc` CS
ON C.CUST_ID = CS.CUST_ID
JOIN `glue_catalog`.`flink_cdc_db`.`sales_order_all_cdc` O
ON O.SITE_ID = CS.SITE_ID
GROUP  BY C.CUST_ID,
C.NAME,
C.MKTSEGMENT,
CS.COUNTRY;

Glue Data Catalog のメタデータを使用して、Flink の他のストレージ形式に読み取り/書き込み

次の図は、この構成のアーキテクチャを示しています。

Parquet の Flink Glue 統合

前のステップで既に Flink Hive カタログを作成しているので、そのカタログを再利用します。

  1. Flink SQL クライアント CLI で、次のコマンドを実行します。
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_hive_parquet_db;
use flink_hive_parquet_db;

SQL ダイアレクトを Hive に変更して、Hive 構文でテーブルを作成します。

  1. 次の SQL でテーブルを作成し、S3 バケット名を指定します。
SET table.sql-dialect=hive; CREATE TABLE `customer_summary` (
`CUSTOMER_ID` bigint,
`NAME` STRING,
`MKTSEGMENT` STRING,
`COUNTRY` STRING,
`ORDER_COUNT` BIGINT
)
STORED AS parquet
LOCATION 's3://<flink-glue-integration-bucket>/flink-glue-for-hive-parquet/warehouse/customer_summary';

Parquet ファイルは更新された行をサポートしていないため、CDC データからデータを使用することはできません。 ただし、Iceberg または Hudi からのデータを使用することはできます。

  1. 次のコードを使用して、Iceberg テーブルにクエリを実行し、Parquet テーブルにデータを挿入します。
SET table.sql-dialect=default;
SET execution.runtime-mode = batch;
INSERT INTO `glue_catalog`.`flink_hive_parquet_db`.`customer_summary`
SELECT * from `glue_catalog_for_iceberg`.`flink_glue_iceberg_db`.`customer_summary`;

すべてのテーブル メタデータが Data Catalog に格納されていることを確認する

AWS Glue コンソールに移動して、すべてのテーブルが Data Catalog に保存されていることを確認できます。

  1. AWS Glue コンソールで、選択します データベース をクリックして、作成したすべてのデータベースを一覧表示します。

グルーデータベース

  1. データベースを開き、すべてのテーブルがそのデータベースにあることを確認します。

グルーテーブル

ビジネス分析のために Athena または Amazon EMR Trino でデータを消費する

Athena または Amazon EMR Trino を使用して、結果データにアクセスできます。

Athena を使用してデータをクエリする

Athena を使用してデータにアクセスするには、次の手順を実行します。

  1. Athena クエリ エディターを開きます。
  2. 選択する flink_glue_iceberg_db for データベース.

あなたは customer_summary テーブルがリストされています。

  1. 次の SQL スクリプトを実行して、Iceberg 結果テーブルを照会します。
select * from customer_summary order by order_count desc limit 10

クエリの結果は、次のスクリーンショットのようになります。

アテナ-アイスバーグ-クエリ

  1. Hudi テーブルの場合は、変更します データベース 〜へ flink_glue_hudi_db 同じ SQL クエリを実行します。

Athena-Hudi-クエリ

  1. Parquet テーブルの場合は、変更します データベース 〜へ flink_hive_parquet_db 同じ SQL クエリを実行します。

Athena-Parquet-クエリ

Amazon EMR Trino でデータをクエリする

Amazon EMR Trino で Iceberg にアクセスするには、EMR プライマリ ノードに SSH 接続します。

  1. 次のコマンドを実行して、Trino CLI を開始します。
trino-cli --catalog iceberg

Amazon EMR Trino は、AWS Glue データカタログのテーブルをクエリできるようになりました。

  1. 次のコマンドを実行して、結果テーブルをクエリします。
show schemas;
use flink_glue_iceberg_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

クエリの結果は、次のスクリーンショットのようになります。

EMR-Trino-Iceberg-クエリ

  1. Trino CLI を終了します。
  2. Trino CLI を開始します。 hive Hudi テーブルを照会するためのカタログ:
trino-cli --catalog hive

  1. 次のコマンドを実行して、Hudi テーブルを照会します。
show schemas;
use flink_glue_hudi_db;
show tables;
select * from customer_summary order by order_count desc limit 10;

Amazon RDS for MySQL のソース レコードを更新および削除し、データ レイク テーブルの反映を検証する

RDS for MySQL データベースの一部のレコードを更新および削除し、変更が Iceberg および Hudi テーブルに反映されていることを確認できます。

  1. RDS for MySQL データベースに接続し、次の SQL を実行します。
update CUSTOMER set NAME = 'updated_name' where CUST_ID=7; delete from CUSTOMER where CUST_ID=11;

  1. クエリ customer_summary Athena または Amazon EMR Trino を使用したテーブル。

更新および削除されたレコードは、Iceberg および Hudi テーブルに反映されます。

Athena-Iceberg-Query-Updated

クリーンアップ

この演習が終了したら、次の手順を実行してリソースを削除し、コストの発生を防ぎます。

  1. RDS for MySQL データベースを削除します。
  2. EMR クラスターを削除します。
  3. Data Catalog で作成されたデータベースとテーブルを削除します。
  4. Amazon S3 のファイルを削除します。

まとめ

この投稿では、Amazon EMR の Apache Flink を AWS Glue データカタログと統合する方法を示しました。 Flink SQL コネクタを使用して、Kafka、CDC、HBase、Amazon S3、Iceberg、または Hudi などの別のストアでデータを読み書きできます。 メタデータを Data Catalog に保存することもできます。 Flink テーブル API には、同じコネクタとカタログの実装メカニズムがあります。 XNUMX つのセッションで、さまざまなタイプを指す複数のカタログ インスタンスを使用できます。 IcebergCatalog & HiveCatalog、クエリで then を同じ意味で使用します。 Flink テーブル API を使用してコードを記述し、Flink と Data Catalog を統合する同じソリューションを開発することもできます。

私たちのソリューションでは、RDS for MySQL バイナリ ログを Flink CDC で直接使用しました。 使用することもできます アマゾンMSKコネクト でバイナリログを消費する MySQL デベジム データを保存します ApacheKafkaのAmazonマネージドストリーミング (アマゾン MSK)。 参照する Amazon MSK Connect、Apache Flink、およびApache Hudiを使用して、低レイテンシのソースからデータレイクへのパイプラインを作成します

Amazon EMR Flink の統合されたバッチおよびストリーミング データ処理機能を使用すると、XNUMX つのコンピューティング エンジンでデータの取り込みと処理を行うことができます。 Amazon EMR に統合された Apache Iceberg と Hudi を使用すると、進化可能でスケーラブルなデータレイクを構築できます。 AWS Glue Data Catalog を使用すると、すべてのエンタープライズ データ カタログを統一された方法で管理し、データを簡単に使用できます。

この投稿の手順に従って、Amazon EMR Flink と AWS Glue データ カタログを使用して統合バッチおよびストリーミング ソリューションを構築します。 ご不明な点がございましたら、コメントを残してください。


著者について

ジャンウェイ・リー シニア アナリティクス スペシャリストの TAM です。 彼は、AWS エンタープライズ サポートの顧客が最新のデータ プラットフォームを設計および構築するためのコンサルタント サービスを提供しています。


サムラット・デブ Amazon EMR のソフトウェア開発エンジニアです。 余暇には、新しい場所、異なる文化、食べ物を探索するのが大好きです。


プラブ ジョセフラジ は、Amazon EMR で働くシニア ソフトウェア開発エンジニアです。 彼は、Apache Hadoop と Apache Flink でソリューションを構築するチームを率いることに専念しています。 余暇には、プラブーは家族と過ごす時間を楽しんでいます。

スポット画像

最新のインテリジェンス

スポット画像