ゼファーネットのロゴ

Amazon Managed Service for Apache Flink は、Apache Flink バージョン 1.18 をサポートするようになりました。アマゾン ウェブ サービス

日付:

ApacheFlink は、オープンソースの分散処理エンジンであり、ストリーム処理とバッチ処理の両方に強力なプログラミング インターフェイスを提供し、ステートフル処理とイベント時間セマンティクスの最上級のサポートを備えています。 Apache Flink は、複数のプログラミング言語、Java、Python、Scala、SQL、および異なる抽象化レベルの複数の API をサポートしており、これらは同じアプリケーション内で互換的に使用できます。

Apache Flink 向け Amazon マネージドサービスApache Flink アプリケーションを実行するフルマネージドのサーバーレス エクスペリエンスを提供する、 Apache フリンク 1.18.1、この記事の執筆時点では Apache Flink の最新バージョンです。

この投稿では、最新のメジャー リリース 1.16、1.17、および 1.18 で導入され、Apache Flink のマネージド サービスでサポートされるようになった、Apache Flink の興味深い新機能のいくつかについて説明します。

新しいコネクタ

バージョン 1.18.1 で利用できる Apache Flink の新機能について詳しく説明する前に、多くの新しいオープン ソース コネクタが利用可能になることで得られる新機能について見てみましょう。

Opensearch

専用 Opensearch コネクタをプロジェクトに含めることができるようになり、Apache Flink アプリケーションが Elasticsearch 互換モードに依存せずに OpenSearch にデータを直接書き込むことができるようになりました。このコネクタは以下と互換性があります AmazonOpenSearchサービス プロビジョニングされ、 OpenSearch サービス サーバーレス.

この新しいコネクタはサポートしています SQL およびテーブル API、Java と Python の両方で動作し、 データストリーム API、Java のみ。追加設定なしで、少なくとも 1 回の保証が提供され、Flink チェックポイントと書き込みが同期されます。確定的 ID と更新/挿入メソッドを使用して、1 回限りのセマンティクスを実現できます。

デフォルトでは、コネクタは OpenSearch バージョン 1.x クライアント ライブラリを使用します。次の方法でバージョン 2.x に切り替えることができます。 正しい依存関係を追加する.

Amazon DynamoDB

Apache Flink 開発者は、専用コネクタを使用してデータを書き込むことができるようになりました。 Amazon DynamoDB。このコネクタは、 Apache Flink AsyncSinkAWS によって開発され、現在は Apache Flink プロジェクトの不可欠な部分となっており、ノンブロッキング書き込みリクエストと適応型バッチ処理を使用して効率的なシンク コネクタの実装を簡素化します。

このコネクタは両方ともサポートしています SQLとテーブル API、Java および Python、および データストリーム API、Java のみ。デフォルトでは、シンクはスループットを最適化するためにバッチで書き込みます。 SQL バージョンの注目すべき機能は、PARTITIONED BY 句のサポートです。 1 つ以上のキーを指定すると、バッチ書き込みごとにキーごとの最新レコードのみを送信することで、クライアント側で重複排除を行うことができます。 DataStream API を使用して、各バッチ内で上書きするパーティション キーのリストを指定することで、同等のことを実現できます。

このコネクタはシンクとしてのみ機能します。 DynamoDB からの読み取りには使用できません。 DynamoDB でデータを検索するには、引き続き、 Flink 非同期 I/O API または、SQL 用のカスタム ユーザー定義関数 (UDF) を実装します。

MongoDBの

もう 1 つの興味深いコネクタは、 MongoDBの。この場合、ソースとシンクの両方が利用可能です。 SQLとテーブル API と データストリーム API。新しいコネクタは正式に Apache Flink プロジェクトの一部となり、コミュニティによってサポートされています。この新しいコネクタは、古い Flink Sink および Source API のみをサポートする MongoDB によって提供される古いコネクタを直接置き換えます。

他のデータ ストア コネクタと同様に、ソースは境界付きソースとして、バッチ モードで、またはルックアップとして使用できます。シンクはバッチ モードとストリーミングの両方で動作し、更新/挿入モードと追加モードの両方をサポートします。

このコネクタの多くの注目すべき機能の中で、言及する価値のある機能の 1 つは、ルックアップにソースを使用するときにキャッシュを有効にする機能です。シンクは追加設定なしで少なくとも 1 回の保証をサポートします。主キーが定義されている場合、シンクは冪等な更新/挿入により 1 回限りのセマンティクスをサポートできます。シンク コネクタは、主キーが定義されている場合、冪等の更新/挿入による 1 回限りのセマンティクスもサポートします。

新しいコネクタのバージョン管理

新しい機能ではありませんが、古い Apache Flink アプリケーションを更新するときに考慮すべき重要な要素は、新しいコネクタのバージョン管理です。 Apache Flink バージョン 1.17 以降、ほとんどのコネクタはメインの Apache Flink ディストリビューションから外部化され、独立したバージョン管理に従います。

適切な依存関係を含めるには、次の形式でアーティファクトのバージョンを指定する必要があります。 <connector-version>-<flink-version>

たとえば、最新の Kafka コネクタは、 ApacheKafkaのAmazonマネージドストリーミング (Amazon MSK)、執筆時点ではバージョン 3.1.0 です。 Apache Flink 1.18 を使用している場合、使用する依存関係は次のとおりです。

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

アマゾンキネシス、新しいコネクタのバージョンは 4.2.0 です。 Apache Flink 1.18 の依存関係は次のようになります。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

次のセクションでは、Apache Flink 1.18 で利用可能になり、Apache Flink の Amazon マネージド サービスでサポートされる強力な新機能についてさらに説明します。

SQL

Apache Flink SQL では、ユーザーが提供できるのは ヒント オプティマイザーがクエリ プランに影響を与えることを提案するために使用できるクエリを結合します。特にストリーミング アプリケーションでは、 ルックアップ結合 外部システム (通常はデータベース) からクエリされたデータを使用して、ストリーミング データを表すテーブルを強化するために使用されます。バージョン 1.16 以降、ルックアップ結合にいくつかの改善が導入され、結合の動作を調整してパフォーマンスを向上できるようになりました。

  • ルックアップキャッシュ は強力な機能で、最も頻繁に使用されるレコードをメモリ内にキャッシュして、データベースへの負荷を軽減できます。以前は、ルックアップ キャッシュは一部のコネクタに固有でした。 Apache Flink 1.16 以降、このオプションは、ルックアップを内部的にサポートするすべてのコネクタで利用できるようになりました (FLIP-221)。この記事を書いている時点では、 JDBC, ハイブ, HBase コネクタはルックアップ キャッシュをサポートします。ルックアップ キャッシュには 3 つの使用可能なモードがあります。 FULL、完全にメモリ内に保持できる小さなデータセットの場合、 PARTIAL、大規模なデータセットの場合、最新のレコードのみをキャッシュする、または NONE、キャッシュを完全に無効にします。のために PARTIAL キャッシュを使用する場合、バッファする行数と存続期間を構成することもできます。
  • 非同期ルックアップ これもパフォーマンスを大幅に向上させる機能です。非同期ルックアップは、Apache Flink SQL で次のような機能を提供します。 非同期I/O DataStream API で利用可能です。これにより、Apache Flink は、以前のルックアップに対する応答が受信されるまで処理スレッドをブロックすることなく、新しいリクエストをデータベースに送信できるようになります。非同期 I/O と同様に、非同期ルックアップを構成して順序付けを強制したり、順序付けされていない結果を許可したり、バッファ容量やタイムアウトを調整したりできます。
  • また、 ルックアップ再試行戦略 と組み合わせて PARTIAL or NONE ルックアップ キャッシュ。外部データベースでのルックアップが失敗した場合の動作を設定します。

これらすべての動作は、 LOOKUP 次の例のようなヒントでは、非同期ルックアップを使用したルックアップ結合が示されています。

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address
FROM Orders AS O 
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = O.customer_id

パイフリンク

このセクションでは、PyFlink の新しい改善点とサポートについて説明します。

Python3.10のサポート

Apache Flink の最新バージョンでは、PyFlink ユーザー向けにいくつかの改善が導入されました。何よりもまず、Python 3.10 がサポートされるようになり、Python 3.6 のサポートは完全に削除されました (フリンク-29421)。 Apache Flink のマネージド サービスは現在、Python 3.10 ランタイムを使用して PyFlink アプリケーションを実行します。

機能同等に近づく

プログラミング API の観点から見ると、PyFlink はバージョンごとに Java に近づいています。 DataStream API は副出力やブロードキャスト ステートなどの機能をサポートするようになり、ウィンドウ API のギャップは解消されました。 PyFlink は、次のような新しいコネクタもサポートするようになりました。 Amazon Kinesisデータストリーム DataStream API から直接。

スレッドモードの改善

PyFlink は非常に効率的です。 PyFlink で Flink API 演算子を実行する際のオーバーヘッドは、Java や Scala に比べて最小限です。これは、アプリケーションの言語に関係なく、ランタイムが実際に JVM で演算子の実装を直接実行するためです。ただし、ユーザー定義関数がある場合は、状況が少し異なります。次のような単純な Python コード行 lambda x: x + 1、または Pandas 関数と同じくらい複雑な関数は、Python ランタイムで実行する必要があります。

デフォルトでは、Apache Flink は、JVM の外部の各タスク マネージャーで Python ランタイムを実行します。各レコードはシリアル化され、プロセス間通信を介して Python ランタイムに渡され、逆シリアル化されて、Python ランタイムで処理されます。結果はシリアル化されて JVM に戻され、そこで逆シリアル化されます。これは PyFlink です プロセスモード。非常に安定していますが、オーバーヘッドが発生し、場合によってはパフォーマンスのボトルネックになる可能性があります。

バージョン 1.15 以降、Apache Flink もサポートされています スレッドモード PyFlink用。このモードでは、Python ユーザー定義関数が JVM 自体内で実行され、シリアル化/逆シリアル化およびプロセス間通信のオーバーヘッドが除去されます。 THREADモードには、 いくつかの制限;たとえば、THREAD モードは Pandas や UDAF (多くの入力レコードと 1 つの出力レコードで構成されるユーザー定義の集計関数) には使用できませんが、PyFlink アプリケーションのパフォーマンスを大幅に向上させることができます。

バージョン 1.16 では、THREAD モードのサポートが大幅に拡張され、Python DataStream API もカバーされました。

THREAD モードは、Apache Flink のマネージド サービスでサポートされており、 PyFlink アプリケーションから直接有効化.

Appleシリコンのサポート

Apple Silicon ベースのマシンを使用して PyFlink アプリケーションを開発し、PyFlink 1.15 向けに開発している場合は、Apple Silicon で既知の Python 依存関係の問題のいくつかに遭遇したことがあるでしょう。これらの問題は最終的に解決されました (フリンク-25188)。これらの制限は、Apache Flink のマネージド サービスで実行されている PyFlink アプリケーションには影響しませんでした。バージョン 1.16 より前は、M1、M2、または M3 チップセットを使用するマシン上で PyFlink アプリケーションを開発したい場合は、次のようなものを使用する必要がありました。 回避策、PyFlink 1.15 以前をマシンに直接インストールすることは不可能だったためです。

整列されていないチェックポイントの改善

Apache Flink 1.15 は、すでに増分チェックポイントとバッファー デブローティングをサポートしています。これらの機能を特に組み合わせて使用​​すると、チェックポイントのパフォーマンスが向上し、特にバックプレッシャーが存在する場合にチェックポイントの継続時間をより予測しやすくなります。これらの機能の詳細については、次を参照してください。 バッファーのデブローティングとアライメントされていないチェックポイントを使用して、Apache Flink アプリケーションの Amazon マネージド サービスでのチェックポイント設定を最適化します。.

バージョン 1.16 および 1.17 では、安定性とパフォーマンスを向上させるためにいくつかの変更が導入されました。

データスキューの処理

Apache Flink が使用する イベント時のセマンティクスをサポートするウォーターマーク。ウォーターマークは特別なレコードであり、通常はソース オペレーターからフローに挿入され、イベント時間ウィンドウ集計などのオペレーターのイベント時間の進行状況をマークします。一般的な手法は、ウォーターマークを観測された最新のイベント時間から遅らせて、イベントが少なくともある程度は狂うことを許容することです。

ただし、ウォーターマークの使用には課題​​が伴います。アプリケーションに複数のソースがある場合、たとえば、Kafka トピックの複数のパーティションからイベントを受信する場合、ウォーターマークはパーティションごとに個別に生成されます。内部的には、各オペレーターは常にすべての入力パーティションで同じウォーターマークを待ち、実際には最も遅いパーティションに位置合わせします。欠点は、パーティションの 1 つがデータを受信して​​いない場合、ウォーターマークが処理されず、エンドツーエンドの遅延が増加することです。このため、 オプションのアイドルタイムアウト 多くのストリーミング ソースで導入されています。構成されたタイムアウトの後、ウォーターマークの生成はレコードを受信して​​いないパーティションを無視し、ウォーターマークは続行できます。

また、1 つのソースが他のソースよりもはるかに速くイベントを受信して​​いる場合、同様ではあるが逆の課題に直面する可能性があります。ウォーターマークは最も遅いパーティションに合わせて配置されます。これは、ウィンドウ集計がウォーターマークを待つことを意味します。高速ソースからのレコードはバッファリングされて待機する必要があります。これにより、過剰な量のデータがバッファリングされ、オペレータの状態が制御不能に増大する可能性があります。

ソースの高速化の問題に対処するために、Apache Flink 1.17 以降では、ソース分割のウォーターマーク位置合わせを有効にすることができます (フリンク-28853)。このメカニズムはデフォルトで無効になっており、どのパーティションも他のパーティションと比べてウォーターマークの進行が速すぎることがなくなります。複数の入力トピックなどの複数のソースをバインドし、同じアライメント グループ ID を割り当て、現在のウォーターマークからの最大ドリフトの期間を構成できます。特定のパーティションの 1 つがイベントの受信速度が速すぎる場合、ソース オペレーターは、ドリフトが設定されたしきい値を下回るまで、そのパーティションの使用を一時停止します。

ソースごとに個別に有効にすることができます。必要なのは、アラインメント グループ ID を指定することだけです。これにより、同じ ID を持つすべてのソースがバインドされ、現在の最小ウォーターマークからの最大ドリフトの期間が結合されます。これにより、ドリフトが指定されたしきい値よりも低くなるまで、進行が速すぎるソース サブタスクからの消費が一時停止されます。

次のコード スニペットは、境界不規則なウォーターマークを発行する Kafka ソース上でソース分割のウォーターマーク位置合わせを設定する方法を示しています。

KafkaSource<Event> kafkaSource = ...
DataStream<Event> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness( Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source"));

この機能は以下の場合のみ利用可能です FLIP-217 互換性のあるソース、ソース分割のウォーターマーク配置をサポートします。執筆時点では、主要なストリーミング ソース コネクタの中で、Kafka ソースのみがこの機能をサポートしています。

Protobuf 形式の直接サポート

SQL およびテーブル API が直接サポートされるようになりました。 プロトバッファ形式。この形式を使用するには、Protobuf Java クラスを .proto スキーマ定義ファイルを作成し、アプリケーションに依存関係として含めます。

Protobuf 形式は SQL およびテーブル API でのみ機能し、Protobuf でシリアル化されたデータをソースまたはシンクから読み書きする場合にのみ機能します。現在、Flink は Protobuf による状態の直接シリアル化を直接サポートしておらず、スキーマの進化もサポートしていません。 アブロ、 例えば。まだ登録する必要があります カスタムシリアライザー アプリケーションに多少のオーバーヘッドがかかります。

Apache Flink をオープンソースに維持する

Apache Flink は、サブタスク間のデータ送信を内部的に Akka に依存しています。 2022年には、 Akkaを開発したLightbend社がライセンス変更を発表 将来の Akka バージョンについては、Apache 2.0 からより制限の厳しいライセンスに移行し、Apache Flink で使用されるバージョンである Akka 2.6 には、これ以上のセキュリティ アップデートや修正は適用されません。

Akka は歴史的に非常に安定しており、頻繁な更新を必要としませんが、このライセンス変更は Apache Flink プロジェクトにとってリスクを意味しました。 Apache Flink コミュニティの決定は、Akka をバージョン 2.6 のフォークに置き換えることでした。 アパッチ ペッコ (フリンク-32468)。このフォークは Apache 2.0 ライセンスを保持し、コミュニティから必要なアップデートを受け取ります。それまでの間、Apache Flink コミュニティは、Akka または Pekko への依存関係を完全に削除するかどうかを検討します。

状態圧縮

Apache Flink は、すべてのチェックポイントとセーブポイントに対してオプションの圧縮 (デフォルト: オフ) を提供します。 Apache Flink がバグを特定しました Flink 1.18.1 では、スナップショット圧縮が有効になっているときにオペレーターの状態を適切に復元できませんでした。これにより、データが失われるか、チェックポイントから復元できなくなる可能性があります。これを解決するために、Apache Flink のマネージド サービスは、 修正します これは、Apache Flink の将来のバージョンに含まれる予定です。

Apache Flink のマネージド サービスによるインプレース バージョン アップグレード

現在、Apache Flink 1.15 以前を使用して Apache Flink のマネージド サービスでアプリケーションを実行している場合は、 AWSコマンドラインインターフェイス (AWS CLI)、 AWS CloudFormation or AWSクラウド開発キット (AWS CDK)、または AWS API を使用するツール。

  アプリケーションの更新 API アクションは、Apache Flink アプリケーションの既存のマネージド サービスの Apache Flink ランタイム バージョンの更新をサポートするようになりました。 UpdateApplication は実行中のアプリケーションで直接使用できます。

インプレース更新を続行する前に、アプリケーションに含まれる依存関係を検証して更新し、新しい Apache Flink バージョンと互換性があることを確認する必要があります。特に、Apache Flink ライブラリ、コネクタ、および場合によっては Scala バージョンを更新する必要があります。

また、更新を続行する前に、更新されたアプリケーションをテストすることをお勧めします。回帰が発生していないことを確認するために、ターゲットの Apache Flink ランタイム バージョンを使用して、ローカルおよび非運用環境でテストすることをお勧めします。

最後に、アプリケーションがステートフルである場合は、 スナップショット 実行中のアプリケーションの状態。これにより、以前のアプリケーション バージョンにロールバックできるようになります。

準備ができたら、 アプリケーションの更新 APIアクションまたは アップデートアプリケーション アプリケーションのランタイム バージョンを更新し、更新された依存関係を含む新しいアプリケーション アーティファクト、JAR、または zip ファイルをポイントする AWS CLI コマンド。

プロセスと API の詳細については、以下を参照してください。 Apache Flink のインプレース バージョン アップグレード。ドキュメントには、アップグレード プロセスをガイドするステップバイステップの説明とビデオが含まれています。

結論

この投稿では、Apache Flink の Amazon マネージドサービスでサポートされている Apache Flink の新機能のいくつかを調べました。このリストは包括的なものではありません。 Apache Flink は、SQL やテーブル API のオペレーターレベルの TTL など、非常に有望な機能もいくつか導入しました。FLIP-292] とタイムトラベル [FLIP-308] ですが、これらは API でまだサポートされていないため、ユーザーはまだ実際にアクセスできません。このため、この投稿ではそれらについては取り上げないことにしました。

Apache Flink 1.18 のサポートにより、Apache Flink のマネージド サービスは、最新リリースの Apache Flink バージョンをサポートするようになりました。 Apache Flink 1.18 で利用できる興味深い新機能と新しいコネクタのいくつかと、Apache Flink のマネージド サービスが既存のアプリケーションのアップグレードにどのように役立つかを見てきました。

最近のリリースの詳細については、Apache Flink ブログとリリース ノートをご覧ください。

Apache Flink を初めて使用する場合は、 適切な API と言語を選択するためのガイド そして スタートガイド Apache Flink のマネージド サービスの使用を開始します。


著者について

ロレンツォ・ニコラロレンツォ・ニコラ AWS のシニア ストリーミング ソリューション アーキテクトとして働いており、EMEA 全体の顧客をサポートしています。彼は 25 年以上にわたってクラウドネイティブでデータ集約型のシステムを構築しており、コンサルティング会社や FinTech 製品会社の両方を通じて金融業界で働いています。彼はオープンソース テクノロジーを幅広く活用し、Apache Flink などのいくつかのプロジェクトに貢献してきました。

フランシスコ・モリーロフランシスコ・モリーロ AWS のストリーミング ソリューション アーキテクトです。 Francisco は AWS の顧客と協力し、AWS のサービスを使用したリアルタイム分析アーキテクチャの設計を支援し、Amazon MSK および Amazon Managed Service for Apache Flink をサポートしています。

スポット画像

最新のインテリジェンス

スポット画像