Logo Zéphyrnet

Une comparaison côte à côte d'Apache Spark et d'Apache Flink pour les cas d'utilisation courants du streaming | Services Web Amazon

Date :

Apache Flink et Apache Spark sont tous deux des frameworks de traitement de données distribués open source largement utilisés pour le traitement et l'analyse de données volumineuses. Spark est connu pour sa facilité d'utilisation, ses API de haut niveau et sa capacité à traiter de grandes quantités de données. Flink brille par sa capacité à gérer le traitement des flux de données dans des calculs avec état en temps réel et à faible latence. Les deux prennent en charge une variété de langages de programmation, des solutions évolutives pour gérer de grandes quantités de données et une large gamme de connecteurs. Historiquement, Spark a commencé comme un framework batch-first et Flink a commencé comme un framework streaming-first.

Dans cet article, nous partageons une étude comparative des modèles de streaming couramment utilisés pour créer des applications de traitement de flux, comment ils peuvent être résolus à l'aide de Spark (principalement Spark Structured Streaming) et Flink, et les variations mineures de leur approche. Les exemples couvrent des extraits de code en Python et SQL pour les deux frameworks sur trois thèmes principaux : la préparation des données, le traitement des données et l'enrichissement des données. Si vous êtes un utilisateur de Spark et que vous cherchez à résoudre vos cas d'utilisation de traitement de flux à l'aide de Flink, cet article est pour vous. Nous n'avons pas l'intention de couvrir le choix de la technologie entre Spark et Flink, car il est important d'évaluer les deux frameworks pour votre charge de travail spécifique et la manière dont le choix s'intègre dans votre architecture ; au lieu de cela, cet article met en évidence les principales différences pour les cas d'utilisation pour lesquels ces deux technologies sont généralement envisagées.

Offres Apache Flink API en couches qui offrent différents niveaux d'expressivité et de contrôle et sont conçus pour cibler différents types de cas d'utilisation. Les trois couches de l'API sont les fonctions de processus (également appelées API de traitement de flux avec état), DataStream et Table et SQL. L'API de traitement de flux avec état nécessite l'écriture de code détaillé, mais offre le meilleur contrôle sur le temps et l'état, qui sont des concepts fondamentaux du traitement de flux avec état. L'API DataStream prend en charge Java, Scala et Python et offre des primitives pour de nombreuses opérations de traitement de flux courantes, ainsi qu'un équilibre entre la verbosité du code ou l'expressivité et le contrôle. Les API Table et SQL sont des API relationnelles prenant en charge Java, Scala, Python et SQL. Ils offrent l'abstraction la plus élevée et un contrôle déclaratif intuitif de type SQL sur les flux de données. Flink permet également une transition et une commutation transparentes entre ces API. Pour en savoir plus sur les API en couches de Flink, consultez API en couches.

Apache Spark Structured Streaming propose les API Dataset et DataFrames, qui fournissent des API de streaming déclaratives de haut niveau pour représenter des données statiques et délimitées ainsi que des données en streaming et illimitées. Les opérations sont prises en charge dans Scala, Java, Python et R. Spark possède un ensemble de fonctions et une syntaxe riches avec des constructions simples pour la sélection, l'agrégation, le fenêtrage, les jointures, etc. Vous pouvez également utiliser l'API Streaming Table pour lire des tables en tant que flux de données DataFrames en tant qu'extension de l'API DataFrames. Bien qu'il soit difficile d'établir des parallèles directs entre Flink et Spark dans toutes les constructions de traitement de flux, à un niveau très élevé, nous pourrions dire que les API Spark Structured Streaming sont équivalentes aux API Table et SQL de Flink. Spark Structured Streaming, cependant, n'offre pas encore (au moment d'écrire ces lignes) d'équivalent aux API de niveau inférieur de Flink qui offrent un contrôle granulaire du temps et de l'état.

Flink et Spark Structured Streaming (ci-après dénommés Spark) sont des projets évolutifs. Le tableau suivant fournit une comparaison simple des fonctionnalités Flink et Spark pour les primitives de streaming courantes (au moment de la rédaction de cet article).

. Flink Spark
Traitement basé sur les lignes Oui Oui
Fonctions définies par l'utilisateur Oui Oui
Accès précis à l'état Oui, via DataStream et les API de bas niveau Non
Contrôler quand l'expulsion de l'État se produit Oui, via DataStream et les API de bas niveau Non
Structures de données flexibles pour le stockage d'état et l'interrogation Oui, via DataStream et les API de bas niveau Non
Minuteries pour le traitement et les opérations avec état Oui, via des API de bas niveau Non

Dans les sections suivantes, nous couvrons les principaux facteurs communs afin que nous puissions montrer comment les utilisateurs de Spark peuvent se rapporter à Flink et vice versa. Pour en savoir plus sur les API de bas niveau de Flink, consultez Fonction de processus. Par souci de simplicité, nous couvrons les quatre cas d'utilisation dans cet article en utilisant l'API Flink Table. Nous utilisons une combinaison de Python et SQL pour une comparaison de pommes à pommes avec Spark.

Préparation des données

Dans cette section, nous comparons les méthodes de préparation des données pour Spark et Flink.

Lecture des données

Nous examinons d'abord les moyens les plus simples de lire des données à partir d'un flux de données. Les sections suivantes supposent le schéma suivant pour les messages :

symbol: string,
price: int,
timestamp: timestamp,
company_info:
{ name: string, employees_count: int
}

Lecture de données à partir d'une source dans Spark Structured Streaming

Dans Spark Structured Streaming, nous utilisons un DataFrame de streaming en Python qui lit directement les données au format JSON :

spark = ... # spark session # specify schema
stock_ticker_schema = ... # Create a streaming DataFrame
df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "mybroker1:port") .option("topic", "stock_ticker") .load() .select(from_json(col("value"), stock_ticker_schema).alias("ticker_data")) .select(col("ticker_data.*"))

Notez que nous devons fournir un objet de schéma qui capture notre schéma boursier (stock_ticker_schema). Comparez cela à l'approche pour Flink dans la section suivante.

Lecture de données à partir d'une source à l'aide de l'API Flink Table

Pour Flink, nous utilisons l'instruction SQL DDL CREATE TABLE. Vous pouvez spécifier le schéma du flux comme vous le feriez pour n'importe quelle table SQL. La clause WITH nous permet de spécifier le connecteur au flux de données (Kafka dans ce cas), les propriétés associées pour le connecteur et les spécifications de format de données. Voir le code suivant :

# Create table using DDL CREATE TABLE stock_ticker ( symbol string, price INT, timestamp TIMESTAMP(3), company_info STRING, WATERMARK FOR timestamp AS timestamp - INTERVAL '3' MINUTE
) WITH ( 'connector' = 'kafka', 'topic' = 'stock_ticker', 'properties.bootstrap.servers' = 'mybroker1:port', 'properties.group.id' = 'testGroup', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true'
)

Aplatissement JSON

L'aplatissement JSON est le processus de conversion d'un objet JSON imbriqué ou hiérarchique en une structure plate à un seul niveau. Cela convertit plusieurs niveaux d'imbrication en un objet où toutes les clés et valeurs sont au même niveau. Les clés sont combinées à l'aide d'un délimiteur tel qu'un point (.) ou un trait de soulignement (_) pour indiquer la hiérarchie d'origine. L'aplatissement JSON est utile lorsque vous devez travailler avec un format plus simplifié. Dans Spark et Flink, les JSON imbriqués peuvent être compliqués à utiliser et peuvent nécessiter un traitement supplémentaire ou des fonctions définies par l'utilisateur pour être manipulés. Les JSON aplatis peuvent simplifier le traitement et améliorer les performances grâce à une surcharge de calcul réduite, en particulier avec des opérations telles que les jointures complexes, les agrégations et le fenêtrage. De plus, les JSON aplatis peuvent faciliter le débogage et le dépannage des pipelines de traitement de données, car il y a moins de niveaux d'imbrication à parcourir.

Aplatissement JSON dans Spark Structured Streaming

L'aplatissement JSON dans Spark Structured Streaming nécessite que vous utilisiez la méthode select et que vous spécifiiez le schéma à aplatir. L'aplatissement JSON dans Spark Structured Streaming implique de spécifier le nom de champ imbriqué que vous souhaitez afficher dans la liste de champs de niveau supérieur. Dans l'exemple suivant, company_info est un champ imbriqué et dans company_info, il y a un champ appelé company_name. Avec la requête suivante, nous aplatissons company_info.name à company_name:

stock_ticker_df = ... # Streaming DataFrame w/ schema shown above stock_ticker_df.select("symbol", "timestamp", "price", "company_info.name" as "company_name")

Aplatissement JSON dans Flink

Dans Flink SQL, vous pouvez utiliser la fonction JSON_VALUE. Notez que vous ne pouvez utiliser cette fonction que dans les versions Flink égales ou supérieures à 1.14. Voir le code suivant :

SELECT symbol, timestamp, price, JSON_VALUE(company_info, 'lax $.name' DEFAULT NULL ON EMPTY) AS company_name
FROM stock_ticker

Le terme laxiste dans la requête précédente concerne la gestion des expressions de chemin JSON dans Flink SQL. Pour plus d'informations, reportez-vous à Fonctions système (intégrées).

Traitement de l'information

Maintenant que vous avez lu les données, nous pouvons examiner quelques modèles courants de traitement des données.

Déduplication

La déduplication des données dans le traitement des flux est cruciale pour maintenir la qualité des données et assurer la cohérence. Il améliore l'efficacité en réduisant la pression sur le traitement des données en double et permet de réaliser des économies sur le stockage et le traitement.

Requête de déduplication Spark Streaming

L'extrait de code suivant est lié à un Spark Streaming DataFrame nommé stock_ticker. Le code effectue une opération pour supprimer les lignes en double en fonction de la symbol colonne. La méthode dropDuplicates est utilisée pour éliminer les lignes en double dans un DataFrame basé sur une ou plusieurs colonnes.

stock_ticker = ... # Streaming DataFrame w/ schema shown above stock_ticker.dropDuplicates("symbol")

Requête de déduplication Flink

Le code suivant montre l'équivalent Flink SQL pour dédupliquer les données en fonction de la symbol colonne. La requête récupère la première ligne pour chaque valeur distincte dans le symbol colonne de la stock_ticker flux, basé sur l'ordre croissant de proctime :

SELECT symbol, timestamp, price
FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY proctime ASC) AS row_num FROM stock_ticker)
WHERE row_num = 1

Fenêtrage

Le fenêtrage dans les flux de données est une construction fondamentale pour traiter les données dans les spécifications. Les fenêtres ont généralement des limites de temps, un nombre d'enregistrements ou d'autres critères. Ces limites temporelles divisent les flux de données continus et illimités en blocs gérables appelés fenêtres pour traitement. Windows aide à analyser les données et à obtenir des informations en temps réel tout en maintenant l'efficacité du traitement. Des analyses ou des opérations sont effectuées sur la mise à jour constante des données en continu dans une fenêtre.

Il existe deux fenêtres temporelles courantes utilisées à la fois dans Diffusion Spark ainsi que Flink que nous détaillerons dans ce billet : les fenêtres basculantes et coulissantes. UN fenêtre qui bascule est une fenêtre basée sur le temps qui est de taille fixe et qui n'a pas d'intervalles qui se chevauchent. UN fenêtre coulissante est une fenêtre basée sur le temps qui est de taille fixe et avance à intervalles fixes qui peuvent se chevaucher.

Requête de fenêtre bascule Spark Streaming

Voici une requête de fenêtre bascule Spark Streaming avec une taille de fenêtre de 10 minutes :

stock_ticker = ... # Streaming DataFrame w/ schema shown above # Get max stock price in tumbling window
# of size 10 minutes
visitsByWindowAndUser = visits .withWatermark("timestamp", "3 minutes") .groupBy( window(stock_ticker.timestamp, "10 minutes"), stock_ticker.symbol) .max(stock_ticker.price)

Requête de la fenêtre bascule de Flink Streaming

Voici une requête de fenêtre bascule équivalente dans Flink avec une taille de fenêtre de 10 minutes :

SELECT symbol, MAX(price) FROM TABLE( TUMBLE(TABLE stock_ticker, DESCRIPTOR(timestamp), INTERVAL '10' MINUTES)) GROUP BY ticker;

Requête de fenêtre glissante Spark Streaming

Voici une requête de fenêtre glissante Spark Streaming avec une taille de fenêtre de 10 minutes et un intervalle de diapositive de 5 minutes :

stock_ticker = ... # Streaming DataFrame w/ schema shown above # Get max stock price in sliding window
# of size 10 minutes and slide interval of size
# 5 minutes visitsByWindowAndUser = visits .withWatermark("timestamp", "3 minutes") .groupBy( window(stock_ticker.timestamp, "10 minutes", "5 minutes"), stock_ticker.symbol) .max(stock_ticker.price)

Requête de fenêtre coulissante Flink Streaming

Voici une requête de fenêtre glissante Flink avec une taille de fenêtre de 10 minutes et un intervalle de diapositives de 5 minutes :

SELECT symbol, MAX(price) FROM TABLE( HOP(TABLE stock_ticker, DESCRIPTOR(timestamp), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) GROUP BY ticker;

Traitement des données en retard

Prise en charge de Spark Structured Streaming et de Flink traitement de l'heure de l'événement, où un champ dans la charge utile peut être utilisé pour définir des fenêtres temporelles distinctes de l'horloge murale des machines effectuant le traitement. Flink et Spark utilisent tous deux le filigrane à cette fin.

Le filigrane est utilisé dans les moteurs de traitement de flux pour gérer les retards. Un filigrane est comme une minuterie qui définit combien de temps le système peut attendre les événements tardifs. Si un événement arrive et se situe dans le délai défini (filigrane), le système l'utilisera pour mettre à jour une demande. S'il est postérieur au filigrane, le système l'ignorera.

Dans les requêtes de fenêtrage précédentes, vous spécifiez le seuil de retard dans Spark à l'aide du code suivant :

.withWatermark("timestamp", "3 minutes")

Cela signifie que tous les enregistrements en retard de 3 minutes, tels qu'ils sont suivis par l'horloge de l'événement, seront supprimés.

En revanche, avec l'API Flink Table, vous pouvez spécifier un seuil de retard analogue directement dans le DDL :

WATERMARK FOR timestamp AS timestamp - INTERVAL '3' MINUTE

Notez que Flink fournit des constructions supplémentaires pour spécifier le retard dans ses différentes API.

L'enrichissement des données

Dans cette section, nous comparons les méthodes d'enrichissement de données avec Spark et Flink.

Appel d'une API externe

L'appel d'API externes à partir de fonctions définies par l'utilisateur (UDF) est similaire dans Spark et Flink. Notez que votre UDF sera appelée pour chaque enregistrement traité, ce qui peut entraîner l'appel de l'API à un taux de demande très élevé. De plus, dans les scénarios de production, votre code UDF est souvent exécuté en parallèle sur plusieurs nœuds, ce qui amplifie encore le taux de demande.

Pour les extraits de code suivants, supposons que l'appel d'API externe implique l'appel de la fonction :

response = my_external_api(request)

Appel d'API externe dans Spark UDF

Le code suivant utilise Spark :

class Predict(ScalarFunction):
def open(self, function_context): with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f) def eval(self, x):
return self.model.predict(x)

Appel d'API externe dans Flink UDF

Pour Flink, supposons que nous définissions l'UDF callExternalAPIUDF, qui prend en entrée le symbole du symbole boursier et renvoie des informations enrichies sur le symbole via un point de terminaison REST. On peut alors S'inscrire et appelez l'UDF comme suit :

callExternalAPIUDF = udf(callExternalAPIUDF(), result_type=DataTypes.STRING()) SELECT symbol, callExternalAPIUDF(symbol) as enriched_symbol
FROM stock_ticker;

Les UDF Flink fournissent une méthode d'initialisation qui s'exécute une seule fois (par opposition à une fois par enregistrement traité).

Notez que vous devez utiliser les UDF judicieusement, car une UDF mal implémentée peut ralentir votre travail, provoquer une contre-pression et éventuellement bloquer votre application de traitement de flux. Il est conseillé d'utiliser les UDF de manière asynchrone pour maintenir un débit élevé, en particulier pour les cas d'utilisation liés aux E/S ou lorsqu'il s'agit de ressources externes telles que des bases de données ou des API REST. Pour en savoir plus sur l'utilisation des E/S asynchrones avec Apache Flink, reportez-vous à Enrichissez votre flux de données de manière asynchrone à l'aide d'Amazon Kinesis Data Analytics pour Apache Flink.

Conclusion

Apache Flink et Apache Spark sont tous deux des projets en évolution rapide et offrent un moyen rapide et efficace de traiter le Big Data. Cet article s'est concentré sur les principaux cas d'utilisation que nous rencontrions couramment lorsque les clients voulaient voir des parallèles entre les deux technologies pour créer des applications de traitement de flux en temps réel. Nous avons inclus les échantillons les plus fréquemment demandés au moment de la rédaction de cet article. Faites-nous savoir si vous souhaitez plus d'exemples dans la section des commentaires.


A propos de l'auteure

Deepthi Mohan est chef de produit principal au sein de l'équipe Amazon Kinesis Data Analytics.

Karthi Thyagarajan était architecte principal de solutions au sein de l'équipe Amazon Kinesis.

spot_img

Dernières informations

spot_img