Logo Zéphyrnet

Amazon Managed Service pour Apache Flink prend désormais en charge Apache Flink version 1.18 | Services Web Amazon

Date :

Apache Flink est un moteur de traitement distribué open source, offrant de puissantes interfaces de programmation pour le traitement par flux et par lots, avec une prise en charge de premier ordre du traitement avec état et de la sémantique de l'heure des événements. Apache Flink prend en charge plusieurs langages de programmation, Java, Python, Scala, SQL et plusieurs API avec différents niveaux d'abstraction, qui peuvent être utilisées de manière interchangeable dans la même application.

Service géré Amazon pour Apache Flink, qui offre une expérience sans serveur entièrement gérée pour l'exécution des applications Apache Flink, prend désormais en charge Apache Flink 1.18.1, la dernière version d'Apache Flink au moment de la rédaction.

Dans cet article, nous discutons de certaines des nouvelles fonctionnalités et capacités intéressantes d'Apache Flink, introduites avec les versions majeures les plus récentes, 1.16, 1.17 et 1.18, et désormais prises en charge dans le service géré pour Apache Flink.

Nouveaux connecteurs

Avant de plonger dans les nouvelles fonctionnalités d'Apache Flink disponibles avec la version 1.18.1, explorons les nouvelles capacités issues de la disponibilité de nombreux nouveaux connecteurs open source.

Opensearch

Un dédié Opensearch Le connecteur est désormais disponible pour être inclus dans vos projets, permettant à une application Apache Flink d'écrire des données directement dans OpenSearch, sans recourir au mode de compatibilité Elasticsearch. Ce connecteur est compatible avec Service Amazon OpenSearch provisionné et Service OpenSearch sans serveur.

Ce nouveau connecteur prend en charge API SQL et tables, fonctionnant à la fois avec Java et Python, et avec API DataStream, pour Java uniquement. Prêt à l'emploi, il fournit des garanties au moins une fois, en synchronisant les écritures avec les points de contrôle Flink. Vous pouvez obtenir une sémantique unique en utilisant des identifiants déterministes et la méthode upsert.

Par défaut, le connecteur utilise les bibliothèques clientes OpenSearch version 1.x. Vous pouvez passer à la version 2.x en ajouter les dépendances correctes.

Amazon DynamoDB

Les développeurs Apache Flink peuvent désormais utiliser un connecteur dédié pour écrire des données dans Amazon DynamoDB. Ce connecteur est basé sur le Apache Flink AsyncSink, développé par AWS et désormais partie intégrante du projet Apache Flink, pour simplifier la mise en œuvre de connecteurs récepteurs efficaces, en utilisant des requêtes d'écriture non bloquantes et un traitement par lots adaptatif.

Ce connecteur prend également en charge les deux SQL et tableaux API, Java et Python, et DataStream API, pour Java uniquement. Par défaut, le récepteur écrit par lots pour optimiser le débit. Une fonctionnalité notable de la version SQL est la prise en charge de la clause PARTITIONED BY. En spécifiant une ou plusieurs clés, vous pouvez réaliser une déduplication côté client, en envoyant uniquement le dernier enregistrement par clé à chaque écriture par lot. Un équivalent peut être obtenu avec l'API DataStream en spécifiant une liste de clés de partition à écraser dans chaque lot.

Ce connecteur fonctionne uniquement comme un évier. Vous ne pouvez pas l'utiliser pour lire à partir de DynamoDB. Pour rechercher des données dans DynamoDB, vous devez toujours implémenter une recherche à l'aide de l'outil API d'E/S asynchrone Flink ou implémenter une fonction définie par l'utilisateur (UDF) personnalisée pour SQL.

MongoDB

Un autre connecteur intéressant est pour MongoDB. Dans ce cas, la source et le récepteur sont disponibles, à la fois pour le SQL et tableaux API et DataStream API. Le nouveau connecteur fait désormais officiellement partie du projet Apache Flink et est pris en charge par la communauté. Ce nouveau connecteur remplace directement l'ancien fourni par MongoDB, qui ne prend en charge que les anciennes API Flink Sink et Source.

Comme pour les autres connecteurs de magasin de données, la source peut être utilisée comme source limitée, en mode batch ou pour des recherches. Le récepteur fonctionne à la fois en mode batch et en streaming, prenant en charge les modes upsert et append.

Parmi les nombreuses fonctionnalités notables de ce connecteur, il convient de mentionner la possibilité d'activer la mise en cache lors de l'utilisation de la source pour des recherches. Prêt à l’emploi, l’évier prend en charge les garanties au moins une fois. Lorsqu'une clé primaire est définie, le récepteur peut prendre en charge la sémantique exactement une fois via des upserts idempotents. Le connecteur récepteur prend également en charge la sémantique exactement une fois, avec des upserts idempotents, lorsque la clé primaire est définie.

Nouvelle version du connecteur

Il ne s'agit pas d'une nouvelle fonctionnalité, mais d'un facteur important à prendre en compte lors de la mise à jour d'une ancienne application Apache Flink : la gestion des versions du nouveau connecteur. À partir de la version 1.17 d'Apache Flink, la plupart des connecteurs ont été externalisés de la distribution principale d'Apache Flink et suivent une gestion des versions indépendante.

Pour inclure la bonne dépendance, vous devez spécifier la version de l'artefact avec le formulaire : <connector-version>-<flink-version>

Par exemple, le dernier connecteur Kafka, fonctionnant également avec Amazon Managed Streaming pour Apache Kafka (Amazon MSK), au moment de la rédaction, il s'agit de la version 3.1.0. Si vous utilisez Apache Flink 1.18, la dépendance à utiliser sera la suivante :

<dependency> 
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId> 
    <version>3.1.0-1.18</version>
</dependency>

Pour Amazon Kinésis, la nouvelle version du connecteur est la 4.2.0. La dépendance pour Apache Flink 1.18 sera la suivante :

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId> 
    <version>4.2.0-1.18</version>
</dependency>

Dans les sections suivantes, nous discutons davantage des nouvelles fonctionnalités puissantes désormais disponibles dans Apache Flink 1.18 et prises en charge dans Amazon Managed Service pour Apache Flink.

SQL

Dans Apache Flink SQL, les utilisateurs peuvent fournir indices pour joindre des requêtes qui peuvent être utilisées pour suggérer à l'optimiseur d'avoir un effet sur le plan de requête. En particulier, dans les applications de streaming, recherche de jointures sont utilisés pour enrichir une table, représentant des données en continu, avec des données interrogées à partir d'un système externe, généralement une base de données. Depuis la version 1.16, plusieurs améliorations ont été introduites pour les jointures de recherche, vous permettant d'ajuster le comportement de la jointure et d'améliorer les performances :

  • Cache de recherche est une fonctionnalité puissante, vous permettant de mettre en cache en mémoire les enregistrements les plus fréquemment utilisés, réduisant ainsi la pression sur la base de données. Auparavant, le cache de recherche était spécifique à certains connecteurs. Depuis Apache Flink 1.16, cette option est devenue disponible pour tous les connecteurs prenant en charge en interne la recherche (FLIP-221). Au moment d'écrire ces lignes, JDBC, Rucheet HBase les connecteurs prennent en charge le cache de recherche. Le cache de recherche propose trois modes : FULL, pour un petit ensemble de données pouvant être entièrement conservé en mémoire, PARTIAL, pour un ensemble de données volumineux, mettre en cache uniquement les enregistrements les plus récents, ou NONE, pour désactiver complètement le cache. Pour PARTIAL cache, vous pouvez également configurer le nombre de lignes à mettre en mémoire tampon et la durée de vie.
  • Recherche asynchrone est une autre fonctionnalité qui peut grandement améliorer les performances. La recherche asynchrone fournit dans Apache Flink SQL une fonctionnalité similaire à E/S asynchrones disponible dans l'API DataStream. Il permet à Apache Flink d'émettre de nouvelles requêtes vers la base de données sans bloquer le thread de traitement jusqu'à ce que les réponses aux recherches précédentes aient été reçues. De la même manière que les E/S asynchrones, vous pouvez configurer la recherche asynchrone pour appliquer l'ordre ou autoriser les résultats non ordonnés, ou ajuster la capacité du tampon et le délai d'attente.
  • Vous pouvez également configurer un stratégie de nouvelle tentative de recherche en combinaison avec PARTIAL or NONE cache de recherche, pour configurer le comportement en cas d'échec d'une recherche dans la base de données externe.

Tous ces comportements peuvent être contrôlés à l'aide d'un LOOKUP indice, comme dans l'exemple suivant, où nous montrons une jointure de recherche utilisant la recherche asynchrone :

SELECT 
    /*+ LOOKUP('table'='Customers', 'async'='true', 'output-mode'='allow_unordered') */ 
    O.order_id, O.total, C.address
FROM Orders AS O 
JOIN Customers FOR SYSTEM_TIME AS OF O.proc_time AS C 
  ON O.customer_id = O.customer_id

PyFlink

Dans cette section, nous discutons des nouvelles améliorations et de la prise en charge de PyFlink.

Prise en charge de Python 3.10

Les dernières versions d'Apache Flink ont ​​introduit plusieurs améliorations pour les utilisateurs de PyFlink. Tout d'abord, Python 3.10 est désormais pris en charge et le support de Python 3.6 a été complètement supprimé (FLINK-29421). Le service géré pour Apache Flink utilise actuellement le runtime Python 3.10 pour exécuter les applications PyFlink.

Se rapprocher de la parité des fonctionnalités

Du point de vue de l'API de programmation, PyFlink se rapproche de Java à chaque version. L'API DataStream prend désormais en charge des fonctionnalités telles que les sorties secondaires et l'état de diffusion, et les lacunes de l'API de fenêtrage ont été comblées. PyFlink prend également désormais en charge de nouveaux connecteurs comme Flux de données Amazon Kinesis directement depuis l'API DataStream.

Améliorations du mode fil de discussion

PyFlink est très efficace. La surcharge liée à l'exécution des opérateurs d'API Flink dans PyFlink est minime par rapport à Java ou Scala, car le moteur d'exécution exécute en fait l'implémentation de l'opérateur directement dans la JVM, quel que soit le langage de votre application. Mais lorsque vous disposez d’une fonction définie par l’utilisateur, les choses sont légèrement différentes. Une ligne de code Python aussi simple que lambda x: x + 1, ou aussi complexe qu'une fonction Pandas, doit s'exécuter dans un runtime Python.

Par défaut, Apache Flink exécute un runtime Python sur chaque gestionnaire de tâches, externe à la JVM. Chaque enregistrement est sérialisé, transmis au runtime Python via une communication inter-processus, désérialisé et traité dans le runtime Python. Le résultat est ensuite sérialisé et renvoyé à la JVM, où il est désérialisé. C'est le PyFlink Mode PROCESSUS. C'est très stable mais cela introduit une surcharge et, dans certains cas, cela peut devenir un goulot d'étranglement en termes de performances.

Depuis la version 1.15, Apache Flink prend également en charge Mode FIL pour PyFlink. Dans ce mode, les fonctions Python définies par l'utilisateur sont exécutées au sein de la JVM elle-même, supprimant ainsi la surcharge de sérialisation/désérialisation et de communication inter-processus. Le mode FIL a quelques limitations; par exemple, le mode THREAD ne peut pas être utilisé pour les Pandas ou les UDAF (fonctions d'agrégation définies par l'utilisateur, composées de nombreux enregistrements d'entrée et d'un enregistrement de sortie), mais peut améliorer considérablement les performances d'une application PyFlink.

Avec la version 1.16, la prise en charge du mode THREAD a été considérablement étendue, couvrant également l'API Python DataStream.

Le mode THREAD est pris en charge par Managed Service pour Apache Flink et peut être activé directement depuis votre application PyFlink.

Prise en charge d'Apple Silicon

Si vous utilisez des machines basées sur Apple Silicon pour développer des applications PyFlink, en développement pour PyFlink 1.15, vous avez probablement rencontré certains des problèmes de dépendance Python connus sur Apple Silicon. Ces problèmes ont finalement été résolus (FLINK-25188). Ces limitations n'ont pas affecté les applications PyFlink exécutées sur Managed Service pour Apache Flink. Avant la version 1.16, si vous souhaitiez développer une application PyFlink sur une machine utilisant un chipset M1, M2 ou M3, vous deviez utiliser certains solutions de contournement, car il était impossible d'installer PyFlink 1.15 ou version antérieure directement sur la machine.

Améliorations des points de contrôle non alignés

Apache Flink 1.15 prenait déjà en charge les points de contrôle incrémentiels et le débloquage de tampon. Ces fonctionnalités peuvent être utilisées, en particulier en combinaison, pour améliorer les performances des points de contrôle, rendant ainsi la durée du point de contrôle plus prévisible, notamment en présence de contre-pression. Pour plus d'informations sur ces fonctionnalités, voir Optimisez les points de contrôle dans vos applications Amazon Managed Service pour Apache Flink avec un déballage de tampon et des points de contrôle non alignés.

Avec les versions 1.16 et 1.17, plusieurs changements ont été introduits pour améliorer la stabilité et les performances.

Gestion du biais de données

Apache Flink utilise filigranes pour prendre en charge la sémantique temporelle de l'événement. Les filigranes sont des enregistrements spéciaux, normalement injectés dans le flux depuis l'opérateur source, qui marquent la progression du temps d'événement pour les opérateurs comme les agrégations de fenêtrages de temps d'événement. Une technique courante consiste à retarder les filigranes à partir de l'heure du dernier événement observé, afin de permettre aux événements d'être hors service, au moins dans une certaine mesure.

Cependant, l’utilisation de filigranes présente un défi. Lorsque l'application dispose de plusieurs sources, par exemple si elle reçoit des événements de plusieurs partitions d'un sujet Kafka, les filigranes sont générés indépendamment pour chaque partition. En interne, chaque opérateur attend toujours le même filigrane sur toutes les partitions d'entrée, l'alignant pratiquement sur la partition la plus lente. L'inconvénient est que si l'une des partitions ne reçoit pas de données, les filigranes ne progressent pas, augmentant ainsi la latence de bout en bout. Pour cette raison, un délai d'inactivité facultatif a été introduit dans de nombreuses sources de streaming. Après le délai d'expiration configuré, la génération du filigrane ignore toute partition ne recevant aucun enregistrement et les filigranes peuvent progresser.

Vous pouvez également être confronté à un défi similaire mais opposé si une source reçoit les événements beaucoup plus rapidement que les autres. Les filigranes sont alignés sur la partition la plus lente, ce qui signifie que toute agrégation de fenêtrage attendra le filigrane. Les enregistrements provenant de la source rapide doivent attendre, étant mis en mémoire tampon. Cela peut entraîner la mise en mémoire tampon d'un volume excessif de données et une croissance incontrôlable de l'état de l'opérateur.

Pour résoudre le problème des sources plus rapides, à partir d'Apache Flink 1.17, vous pouvez activer l'alignement des filigranes des fractionnements de sources (FLINK-28853). Ce mécanisme, désactivé par défaut, garantit qu'aucune partition ne fait progresser ses filigranes trop rapidement par rapport aux autres partitions. Vous pouvez lier plusieurs sources, comme plusieurs sujets d'entrée, en attribuant le même ID de groupe d'alignement et en configurant la durée de la dérive maximale par rapport au filigrane actuel. Si une partition spécifique reçoit des événements trop rapidement, l'opérateur source interrompt la consommation de cette partition jusqu'à ce que la dérive soit réduite en dessous du seuil configuré.

Vous pouvez l'activer pour chaque source séparément. Tout ce dont vous avez besoin est de spécifier un ID de groupe d'alignement, qui liera toutes les sources ayant le même ID, ainsi que la durée de la dérive maximale par rapport au filigrane minimal actuel. Cela mettra en pause la consommation de la sous-tâche source qui avance trop rapidement, jusqu'à ce que la dérive soit inférieure au seuil spécifié.

L'extrait de code suivant montre comment configurer l'alignement des filigranes des divisions de source sur une source Kafka émettant des filigranes délimités dans le désordre :

KafkaSource<Event> kafkaSource = ...
DataStream<Event> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<Event>forBoundedOutOfOrderness( Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
    "Kafka source"));

Cette fonctionnalité est uniquement disponible avec FLIP-217 sources compatibles, prenant en charge l’alignement des filigranes des divisions de sources. Au moment de la rédaction, parmi les principaux connecteurs de sources de streaming, seule la source Kafka prend en charge cette fonctionnalité.

Prise en charge directe du format Protobuf

Les API SQL et Table prennent désormais directement en charge Format Protobuf. Pour utiliser ce format, vous devez générer les classes Java Protobuf à partir du .proto fichiers de définition de schéma et incluez-les en tant que dépendances dans votre application.

Le format Protobuf fonctionne uniquement avec les API SQL et Table et uniquement pour lire ou écrire des données sérialisées Protobuf à partir d'une source ou vers un récepteur. Actuellement, Flink ne prend pas directement en charge Protobuf pour sérialiser directement l'état et ne prend pas en charge l'évolution du schéma, comme c'est le cas pour Avro, Par exemple. Vous devez toujours enregistrer un sérialiseur personnalisé avec quelques frais généraux pour votre application.

Garder Apache Flink open source

Apache Flink s'appuie en interne sur Akka pour envoyer des données entre les sous-tâches. En 2022, Lightbend, la société derrière Akka, a annoncé un changement de licence pour les futures versions d'Akka, d'Apache 2.0 à une licence plus restrictive, et qu'Akka 2.6, la version utilisée par Apache Flink, ne recevrait aucune autre mise à jour ou correctif de sécurité.

Bien qu'Akka ait été historiquement très stable et ne nécessite pas de mises à jour fréquentes, ce changement de licence représentait un risque pour le projet Apache Flink. La décision de la communauté Apache Flink a été de remplacer Akka par un fork de la version 2.6, appelé Apache Pekko (FLINK-32468). Ce fork conservera la licence Apache 2.0 et recevra toutes les mises à jour requises par la communauté. En attendant, la communauté Apache Flink réfléchira à l'opportunité de supprimer complètement la dépendance à Akka ou à Pekko.

Compression d'état

Apache Flink propose une compression facultative (par défaut : désactivée) pour tous les points de contrôle et de sauvegarde. Apache Flink a identifié un bug dans Flink 1.18.1 où l'état de l'opérateur n'a pas pu être correctement restauré lorsque la compression des instantanés est activée. Cela pourrait entraîner une perte de données ou une incapacité à restaurer à partir du point de contrôle. Pour résoudre ce problème, Managed Service pour Apache Flink a rétroporté le fixer qui sera inclus dans les futures versions d'Apache Flink.

Mises à niveau de version sur place avec Managed Service pour Apache Flink

Si vous exécutez actuellement une application sur Managed Service pour Apache Flink à l'aide d'Apache Flink 1.15 ou version antérieure, vous pouvez désormais la mettre à niveau sur place vers 1.18 sans perdre l'état, à l'aide de l'option Interface de ligne de commande AWS (AWS CLI), AWS CloudFormation or Kit de développement AWS Cloud (AWS CDK) ou tout outil utilisant l'API AWS.

La Mettre à jour l'application L'action API prend désormais en charge la mise à jour de la version d'exécution Apache Flink d'une application de service géré pour Apache Flink existante. Vous pouvez utiliser UpdateApplication directement sur une application en cours d'exécution.

Avant de procéder à la mise à jour sur place, vous devez vérifier et mettre à jour les dépendances incluses dans votre application, en vous assurant qu'elles sont compatibles avec la nouvelle version d'Apache Flink. En particulier, vous devez mettre à jour toute bibliothèque Apache Flink, tous les connecteurs et éventuellement la version Scala.

Nous vous recommandons également de tester l'application mise à jour avant de procéder à la mise à jour. Nous vous recommandons de tester localement et dans un environnement hors production, en utilisant la version d'exécution cible d'Apache Flink, pour garantir qu'aucune régression n'a été introduite.

Et enfin, si votre application est avec état, nous vous recommandons de prendre un instantané de l’état de l’application en cours d’exécution. Cela vous permettra de revenir à la version précédente de l'application.

Lorsque vous êtes prêt, vous pouvez maintenant utiliser le Mettre à jour l'application Action API ou application de mise à jour Commande AWS CLI pour mettre à jour la version d'exécution de l'application et la pointer vers le nouvel artefact d'application, le fichier JAR ou zip, avec les dépendances mises à jour.

Pour des informations plus détaillées sur le processus et l'API, reportez-vous à Mise à niveau de la version sur place pour Apache Flink. La documentation comprend des instructions étape par étape et une vidéo pour vous guider tout au long du processus de mise à niveau.

Conclusions

Dans cet article, nous avons examiné certaines des nouvelles fonctionnalités d'Apache Flink, prises en charge dans Amazon Managed Service pour Apache Flink. Cette liste n'est pas compréhensible. Apache Flink a également introduit des fonctionnalités très prometteuses, comme le TTL au niveau de l'opérateur pour SQL et l'API Table [FLIP-292] et Voyage dans le temps [FLIP-308], mais ceux-ci ne sont pas encore supportés par l'API, et pas encore vraiment accessibles aux utilisateurs. C’est pour cette raison que nous avons décidé de ne pas les aborder dans cet article.

Avec la prise en charge d'Apache Flink 1.18, Managed Service pour Apache Flink prend désormais en charge la dernière version d'Apache Flink. Nous avons vu certaines des nouvelles fonctionnalités intéressantes et les nouveaux connecteurs disponibles avec Apache Flink 1.18 et comment le service géré pour Apache Flink vous aide à mettre à niveau une application existante en place.

Vous pouvez trouver plus de détails sur les versions récentes sur le blog Apache Flink et les notes de version :

Si vous êtes nouveau sur Apache Flink, nous vous recommandons notre guide pour choisir la bonne API et le bon langage et suivant le Guide de Démarrage pour commencer à utiliser Managed Service pour Apache Flink.


À propos des auteurs

Lorenzo NicoraLorenzo Nicora travaille en tant qu'architecte principal de solutions de streaming chez AWS, aidant les clients de la région EMEA. Il construit des systèmes cloud natifs à forte intensité de données depuis plus de 25 ans, travaillant dans le secteur financier à la fois par le biais de cabinets de conseil et pour des sociétés de produits FinTech. Il a largement exploité les technologies open source et a contribué à plusieurs projets, dont Apache Flink.

Francisco MorilloFrancisco Morillo est architecte de solutions de streaming chez AWS. Francisco travaille avec les clients AWS, les aidant à concevoir des architectures d'analyse en temps réel à l'aide des services AWS, prenant en charge Amazon MSK et Amazon Managed Service pour Apache Flink.

spot_img

Dernières informations

spot_img