Logo Zéphyrnet

Valider les données de diffusion sur Amazon MSK à l'aide de schémas dans le registre de schémas AWS Glue entre comptes

Date :

Les entreprises d'aujourd'hui font face à une croissance sans précédent du volume de données. Une part croissante des données est générée en temps réel par des appareils IoT, des sites Web, des applications métier et diverses autres sources. Les entreprises doivent traiter et analyser ces données dès leur arrivée pour prendre des décisions commerciales en temps réel. Amazon Managed Streaming pour Apache Kafka (Amazon MSK) est un service entièrement géré qui permet de créer et d'exécuter des applications de traitement de flux qui utilisent Apache Kafka pour collecter et traiter des données en temps réel.

Les applications de traitement de flux utilisant Apache Kafka ne communiquent pas directement entre elles ; ils communiquent via l'envoi et la réception de messages sur des sujets Kafka. Pour que les applications de traitement de flux communiquent efficacement et en toute confiance, une structure de charge utile de message doit être définie en termes d'attributs et de types de données. Cette structure décrit le schéma utilisé par les applications lors de l'envoi et de la réception de messages. Cependant, avec un grand nombre d'applications de producteurs et de consommateurs, même une petite modification du schéma (suppression d'un champ, ajout d'un nouveau champ ou modification du type de données) peut entraîner des problèmes pour les applications en aval qui sont difficiles à déboguer et à corriger.

Traditionnellement, les équipes s'appuyaient sur des processus de gestion des modifications (tels que les approbations et les fenêtres de maintenance) ou sur d'autres mécanismes informels (documentation, e-mails, outils de collaboration, etc.) pour s'informer mutuellement des modifications du schéma de données. Cependant, ces mécanismes ne sont pas évolutifs et sont sujets à des erreurs. le Registre de schémas AWS Glue vous permet de publier, découvrir, contrôler, valider et faire évoluer de manière centralisée des schémas pour les applications de traitement de flux. Avec le Registre de schémas AWS Glue, vous pouvez gérer et appliquer des schémas sur les applications de streaming de données à l'aide Apache Kafka, AmazonMSK, Flux de données Amazon Kinesis, Amazon Kinesis Data Analytics pour Apache Flinket AWS Lambda.

Cet article montre comment les applications de traitement de flux Apache Kafka valident les messages à l'aide d'un Apache Euro schéma stocké dans le Registre de schéma AWS Glue résidant dans un compte AWS central. Nous utilisons le Bibliothèque AWS Glue Schema Registry SerDe et Avro SpecificRecord pour valider les messages dans les applications de traitement de flux lors de l'envoi et de la réception de messages à partir d'une rubrique Kafka sur un cluster Amazon MSK. Bien que nous utilisions un schéma Avro pour cet article, la même approche et le même concept s'appliquent également aux schémas JSON.

Cas d'utilisation

Supposons une société de covoiturage fictive qui propose des promenades en licorne. Pour tirer des informations exploitables, ils doivent traiter un flux de messages de demande de trajet de licorne. Ils s'attendent à ce que les manèges soient très populaires et veulent s'assurer que leur solution peut évoluer. Ils construisent également un lac de données central où toutes leurs données de streaming et d'exploitation sont stockées pour analyse. Ils sont obsédés par les clients, ils s'attendent donc à ajouter de nouvelles fonctionnalités amusantes aux futurs trajets, comme le choix de la couleur des cheveux de votre licorne, et devront refléter ces attributs dans les messages de demande de trajet. Pour éviter les problèmes dans les applications en aval en raison de futures modifications de schéma, elles ont besoin d'un mécanisme pour valider les messages avec un schéma hébergé dans un registre de schéma central. Le fait d'avoir des schémas dans un registre de schémas central permet aux équipes d'application de publier, valider, faire évoluer et gérer plus facilement les schémas en un seul endroit.

Vue d'ensemble de la solution

L'entreprise utilise Amazon MSK pour capturer et distribuer à grande échelle les messages de demande de trajet de la licorne. Ils définissent un schéma Avro pour les demandes de trajet de licorne, car il fournit des structures de données riches, prend en charge le mappage direct vers JSON, ainsi qu'un format de données compact, rapide et binaire. Parce que le schéma avait été convenu à l'avance, ils ont décidé d'utiliser Avro SpecificRecord.SpecificRecord est une interface de la bibliothèque Avro qui permet d'utiliser un enregistrement Avro comme POJO. Cela se fait en générant une classe (ou des classes) Java à partir du schéma, en utilisant avro-maven-plugin. Ils utilisent Gestion des identités et des accès AWS (JE SUIS) rôles entre comptes pour permettre aux applications producteur et consommateur de l'autre compte AWS d'accéder en toute sécurité aux schémas du compte Schema Registry central.

Le registre de schéma AWS Glue se trouve dans le compte B, tandis que le cluster MSK et les applications de producteur et de consommateur Kafka se trouvent dans le compte A. Nous utilisons les deux rôles IAM suivants pour permettre l'accès entre comptes au registre de schéma AWS Glue. Les clients Apache Kafka du compte A assument un rôle dans le compte B à l'aide d'une stratégie basée sur l'identité, car le registre de schémas AWS Glue ne prend pas en charge les stratégies basées sur les ressources.

  • Compte Un rôle IAM – Permet aux applications producteur et consommateur d'assumer un rôle IAM dans le compte B.
  • Rôle IAM du compte B – Approuve tous les mandataires IAM du compte A et leur permet d'effectuer des actions de lecture sur le registre de schémas AWS Glue dans le compte B. Dans un scénario de cas d'utilisation réel, les mandataires IAM qui peuvent assumer des rôles entre comptes doivent être définis plus spécifiquement.

Le schéma d'architecture suivant illustre la solution :

La solution fonctionne comme suit :

  1. Un producteur Kafka s'exécutant dans le compte A assume le rôle IAM de registre de schémas entre comptes dans le compte B en appelant le Service de jeton de sécurité AWS (AWS SST) assumeRole API.
  2. Le producteur Kafka récupère l'ID de version du schéma Avro de demande de trajet unicorn à partir du registre de schémas AWS Glue pour le schéma intégré dans le POJO de demande de trajet unicorn. La récupération de l'ID de version du schéma est gérée en interne par le sérialiseur d'AWS Glue Schema Registry SerDe. Le sérialiseur doit être configuré dans le cadre de la configuration du producteur Kafka.
  3. Si le schéma existe dans AWS Glue Schema Registry, le sérialiseur décore l'enregistrement de données avec l'ID de version du schéma, puis le sérialise avant de le transmettre à la rubrique Kafka sur le cluster MSK.
  4. Le consommateur Kafka s'exécutant dans le compte A assume le rôle IAM du registre de schémas entre comptes dans le compte B en appelant AWS STS assumeRole API.
  5. Le consommateur Kafka commence à interroger la rubrique Kafka sur le cluster MSK pour les enregistrements de données.
  6. Le consommateur Kafka récupère le schéma Avro de demande de trajet unicorn à partir du registre de schémas AWS Glue, correspondant à l'ID de version de schéma qui est encodé dans l'enregistrement de données de demande de trajet unicorn. Récupération du schéma
    a est géré en interne par le désérialiseur d'AWS Glue Schema Registry SerDe. Le désérialiseur doit être configuré dans le cadre de la configuration du consommateur Kafka. Si le schéma existe dans le registre de schémas AWS Glue, le désérialiseur désérialise l'enregistrement de données dans la demande de trajet licorne POJO pour que le consommateur le traite.

La bibliothèque AWS Glue Schema Registry SerDe prend également en charge la configuration de compression facultative pour économiser sur les transferts de données. Pour plus d'informations sur le registre de schémas, consultez Fonctionnement du registre de schémas.

Licorne demande de trajet Schéma Avro

Le schéma suivant (UnicornRideRequest.avsc) définit un enregistrement représentant une demande de course unicorn, qui contient des attributs de demande de course ainsi que les attributs client et les attributs unicorn recommandés par le système :

{ "type": "record", "name": "UnicornRideRequest", "namespace": "demo.glue.schema.registry.avro", "fields": [ {"name": "request_id", "type": "int", "doc": "customer request id"}, {"name": "pickup_address","type": "string","doc": "customer pickup address"}, {"name": "destination_address","type": "string","doc": "customer destination address"}, {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"}, {"name": "ride_duration","type": "int","doc": "ride duration in minutes"}, {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"}, { "name": "recommended_unicorn", "type": { "type": "record", "name": "RecommendedUnicorn", "fields": [ {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"}, {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}}, {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"} ] } }, { "name": "customer", "type": { "type": "record", "name": "Customer", "fields": [ {"name": "customer_account_no","type": "int", "doc": "customer account number"}, {"name": "first_name","type": "string"}, {"name": "middle_name","type": ["null","string"], "default": null}, {"name": "last_name","type": "string"}, {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]}, {"name": "customer_address","type": "string","doc": "customer address"}, {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"}, {"name": "customer_rating", "type": ["null", "int"], "default": null} ] } } ] }

Pré-requis

Pour utiliser cette solution, vous devez avoir deux comptes AWS :

Pour cette solution, nous utilisons Region us-east-1, mais vous pouvez modifier cela selon vos besoins.

Ensuite, nous créons les ressources dans chaque compte en utilisant AWS CloudFormation modèles.

Créer des ressources dans le compte B

Nous créons les ressources suivantes dans le compte B :

  • Un registre de schéma
  • Un schéma Avro
  • Un rôle IAM avec le AWSGlueSchemaRegistryReadonlyAccess politique gérée et un profil d'instance, qui permet à tous les mandataires IAM du compte A de l'assumer
  • La UnicornRideRequest.avsc Schéma Avro illustré précédemment, qui est utilisé comme définition de schéma dans le modèle CloudFormation

Assurez-vous que vous disposez des autorisations appropriées pour créer ces ressources.

  1. Connectez-vous au compte B.
  2. Lancez ce qui suit Pile CloudFormation.
  3. Pour Nom de la pile, Entrer SchemaRegistryStack.
  4. Pour Nom du registre de schéma, Entrer unicorn-ride-request-registry.
  5. Pour Nom du schéma Avro, Entrer unicorn-ride-request-schema-avro.
  6. Pour l'ID de compte AWS du client Kafka, entrez votre ID de compte A.
  7. Pour ID externe, saisissez un ID aléatoire unique (par exemple, demo10A), qui doit être fourni par les clients Kafka du compte A tout en assumant le rôle IAM dans ce compte.

Pour plus d'informations sur la sécurité entre comptes, consultez Le problème de l'adjoint confus.

  1. Lorsque la pile est complète, sur le Sortie onglet de la pile, copiez la valeur pour CrossAccountGlueSchemaRegistryRoleArn.

Les applications producteur et consommateur Kafka créées dans le compte A assument ce rôle pour accéder au registre de schémas et au schéma dans le compte B.

  1. Pour vérifier que les ressources ont été créées, sur la console AWS Glue, choisissez Registres de schéma dans la barre de navigation et localisez unicorn-ride-request-registry.
  2. Choisissez le registre unicorn-ride-request-registry et vérifier qu'il contient unicorn-ride-request-schema-avro dans l' Schémas .
  3. Choisissez le schéma pour voir son contenu.

Le rôle IAM créé par le SchemaRegistryStack stack permet à tous les mandataires IAM du compte A de l'assumer et d'effectuer des actions de lecture sur le registre de schémas AWS Glue. Examinons les relations d'approbation du rôle IAM.

  1. Sur le SchemaRegistryStack empiler Sortie tab, copiez la valeur de CrossAccountGlueSchemaRegistryRoleName.
  2. Sur la console IAM, recherchez ce rôle.
  3. Selectionnez Relations de confiance et regardez ses entités de confiance pour confirmer que le compte A est répertorié.
  4. Dans le Conditions section, confirmez que sts:ExternalId a le même ID aléatoire unique fourni lors de la création de la pile.

Créer des ressources dans le compte A

Nous créons les ressources suivantes dans le compte A :

  • Un VPC
  • Instances EC2 pour le producteur et le consommateur Kafka
  • Un environnement AWS Cloud9
  • Un cluster MSK

Au préalable, créez une paire de clés EC2 et téléchargez-la sur votre machine pour pouvoir vous connecter en SSH aux instances EC2. Créez également un Configuration du cluster MSK avec des valeurs par défaut. Vous devez avoir des autorisations pour créer le CloudFormation
pile, instances EC2, environnement AWS Cloud9, cluster MSK, configuration de cluster MSK et rôle IAM.

  1. Connectez-vous au compte A.
  2. Lancez ce qui suit Pile CloudFormation pour lancer le VPC, les instances EC2 et l'environnement AWS Cloud9.
  3. Pour Nom de la pile, Entrer MSKClientStack.
  4. Fournissez les plages CIDR de VPC et de sous-réseau.
  5. Pour Paire de clés EC2, choisissez une paire de clés EC2 existante.
  6. Pour le dernier ID d'AMI EC2, sélectionnez l'option par défaut.
  7. Pour l'ARN du rôle IAM entre comptes, utilisez la valeur pour CrossAccountGlueSchemaRegistryRoleArn (disponible sur le Sortie onglet de SchemaRegistryStack).
  8. Attendez que la pile soit créée avec succès.
  9. Lancez ce qui suit Pile CloudFormation pour créer le cluster MSK.
  10. Pour Nom de la pile, Entrer MSKClusterStack.
  11. Utilisez Amazon MSK version 2.7.1.
  12. Pour l'ARN de configuration du cluster MSK, entrez l'ARN de configuration du cluster MSK. Celui que vous avez créé dans le cadre du prérequis.
  13. Pour le numéro de révision de la configuration du cluster MSK, entrez 1 ou modifiez-le en fonction de votre version.
  14. Pour le nom de la pile CloudFormation du client, entrez MSKClientStack (le nom de la pile que vous avez créé avant cette pile).

Configurer le producteur Kafka

Pour configurer le producteur Kafka accédant au Schema Registry dans le compte AWS central, procédez comme suit :

  1. Connectez-vous au compte A.
  2. Sur la console AWS Cloud9, choisissez le Cloud9EC2Bastion environnement créé par le MSKClientStack association.
  3. Sur le Déposez votre dernière attestation menu, choisissez Télécharger des fichiers locaux.
  4. Chargez le fichier de paire de clés EC2 que vous avez utilisé précédemment lors de la création de la pile.
  5. Ouvrez un nouveau terminal et modifiez les autorisations de la paire de clés EC2 :
    chmod 0400 <keypair PEM file>

  6. SSH dans le KafkaProducerInstance instance EC2 et définissez la région selon vos besoins :
    ssh -i <keypair PEM file> ec2-user@<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. Définir la variable d'environnement MSK_CLUSTER_ARN pointant vers l'ARN du cluster MSK :
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Changez le .ClusterName valeur dans le code si vous avez utilisé un nom différent pour la pile CloudFormation du cluster MSK. Le nom du cluster est le même que le nom de la pile.

  1. Définir la variable d'environnement BOOTSTRAP_BROKERS pointant vers les courtiers bootstrap :
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Vérifiez les variables d'environnement :
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  3. Créez un sujet Kafka appelé unicorn-ride-request-topic dans votre cluster MSK, qui sera ensuite utilisé par les applications Kafka producteur et consommateur :
    cd ~/kafka ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --topic unicorn-ride-request-topic --create --partitions 3 --replication-factor 2 ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

La MSKClientStack la pile a copié le fichier JAR du client producteur Kafka appelé kafka-cross-account-gsr-producer.jar à la KafkaProducerInstance exemple. Il contient le client producteur Kafka qui envoie des messages au sujet Kafka unicorn-ride-request-topic sur le cluster MSK et accède au unicorn-ride-request-schema-avro Schéma Avro du unicorn-ride-request-registry registre de schéma dans le compte B. Le code du producteur Kafka, que nous aborderons plus loin dans cet article, est disponible sur GitHub.

  1. Exécutez les commandes suivantes et vérifiez kafka-cross-account-gsr-producer.jar existe:
    cd ~
    ls -ls

  2. Exécutez la commande suivante pour exécuter le producteur Kafka dans le KafkaProducerInstance Terminal:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka producer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -nm 500 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

Le code a les paramètres suivants :

  • -bs - $BOOTSTRAP_BROKERS (les courtiers d'amorçage du cluster MSK)
  • -rn - L' CrossAccountGlueSchemaRegistryRoleArn valeur de la SchemaRegistryStack empiler les sorties dans le compte B
  • -sujet – le sujet Kafka unicorn-ride-request-topic
  • -reg - us-east-1 (modifiez-le en fonction de votre région, il est utilisé pour le point de terminaison AWS STS et Schema Registry)
  • -nm : 500 (le nombre de messages que l'application producteur envoie au topic Kafka)
  • -ID externe – Le même ID externe (par exemple, demo10A) que vous avez utilisé lors de la création de la pile CloudFormation dans le compte B

La capture d'écran suivante montre les journaux du producteur Kafka montrant Schema Version Id received..., ce qui signifie qu'il a récupéré le schéma Avro unicorn-ride-request-schema-avro du compte B et des messages ont été envoyés au sujet Kafka sur le cluster MSK du compte A.

Code producteur Kafka

L'implémentation complète du producteur Kafka est disponible sur GitHub. Dans cette section, nous décomposons le code.

  • getProducerConfig() initialise les propriétés du producteur, comme illustré dans le code suivant :
    • VALUE_SERIALIZER_CLASS_CONFIG - L' GlueSchemaRegistryKafkaSerializer.class.getName() Implémentation du sérialiseur AWS qui sérialise les enregistrements de données (l'implémentation est disponible sur GitHub)
    • REGISTRY_NAME – Le registre des schémas du compte B
    • SCHEMA_NAME – Le nom du schéma du compte B
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "-1"); props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name()); props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName); props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry"); props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro"); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startProducer() assume le rôle dans le compte B pour pouvoir se connecter au Schema Registry du compte B et envoie des messages au sujet Kafka sur le cluster MSK :
public void startProducer() { assumeGlueSchemaRegistryRole(); KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<String,UnicornRideRequest>(getProducerConfig()); int numberOfMessages = Integer.valueOf(str_numOfMessages); logger.info("Starting to send records..."); for(int i = 0;i < numberOfMessages;i ++) { UnicornRideRequest rideRequest = getRecord(i); String key = "key-" + i; ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest); producer.send(record, new ProducerCallback()); } }

  • assumeGlueSchemaRegistryRole() comme indiqué dans le code suivant, utilise AWS STS pour assumer le rôle IAM de Schema Registry entre comptes dans le compte B. (Pour plus d'informations, consultez Identifiants de sécurité temporaires dans IAM.) La réponse de stsClient.assumeRole(roleRequest) contient les informations d'identification temporaires, qui incluent accessKeyId, secretAccessKeyEt un sessionToken. Il définit ensuite les informations d'identification temporaires dans les propriétés système. le SDK AWS pour Java utilise ces informations d'identification lors de l'accès au registre de schéma (via le sérialiseur de registre de schéma). Pour plus d'informations, voir Utilisation des informations d'identification.
    public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

  • createUnicornRideRequest() utilise les classes générées par le schéma Avro (schéma de demande de trajet licorne) pour créer un SpecificRecord. Pour cet article, les valeurs des attributs de demande de trajet de licorne sont codées en dur dans cette méthode. Voir le code suivant :
    public UnicornRideRequest getRecord(int requestId){ /* Initialise UnicornRideRequest object of class that is generated from AVRO Schema */ UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder() .setRequestId(requestId) .setPickupAddress("Melbourne, Victoria, Australia") .setDestinationAddress("Sydney, NSW, Aus") .setRideFare(1200.50F) .setRideDuration(120) .setPreferredUnicornColor(UnicornPreferredColor.WHITE) .setRecommendedUnicorn(RecommendedUnicorn.newBuilder() .setUnicornId(requestId*2) .setColor(unicorn_color.WHITE) .setStarsRating(5).build()) .setCustomer(Customer.newBuilder() .setCustomerAccountNo(1001) .setFirstName("Dummy") .setLastName("User") .setEmailAddresses(Arrays.asList("demo@example.com")) .setCustomerAddress("Flinders Street Station") .setModeOfPayment(ModeOfPayment.CARD) .setCustomerRating(5).build()).build(); logger.info(rideRequest.toString()); return rideRequest; }

Configurer le consommateur Kafka

La MSKClientStack la pile a créé le KafkaConsumerInstance instance pour l'application consommateur Kafka. Vous pouvez afficher toutes les instances créées par la pile sur la console Amazon EC2.

Pour configurer le consommateur Kafka accédant au Schema Registry dans le compte AWS central, procédez comme suit :

  1. Ouvrez un nouveau terminal dans le Cloud9EC2Bastion Environnement AWS Cloud9.
  2. SSH dans le KafkaConsumerInstance instance EC2 et définissez la région selon vos besoins :
    ssh -i <keypair PEM file> ec2-user@<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. Définir la variable d'environnement MSK_CLUSTER_ARN pointant vers l'ARN du cluster MSK :
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Changez le .ClusterName valeur si vous avez utilisé un nom différent pour la pile CloudFormation du cluster MSK. Le nom du cluster est le même que le nom de la pile.

  1. Définir la variable d'environnement BOOTSTRAP_BROKERS pointant vers les courtiers bootstrap :
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Vérifiez les variables d'environnement :
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

La MSKClientStack pile a copié le fichier JAR du client client Kafka appelé kafka-cross-account-gsr-consumer.jar à la KafkaConsumerInstance exemple. Il contient le client client Kafka qui lit les messages du sujet Kafka unicorn-ride-request-topic sur le cluster MSK et accède au unicorn-ride-request-schema-avro Schéma Avro du unicorn-ride-request-registry registre dans le compte B. Le code de consommation Kafka, que nous aborderons plus loin dans cet article, est disponible sur GitHub.

  1. Exécutez les commandes suivantes et vérifiez kafka-cross-account-gsr-consumer.jar existe:
    cd ~
    ls -ls

  2. Exécutez la commande suivante pour exécuter le consommateur Kafka dans le KafkaConsumerInstance Terminal:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka consumer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

Le code a les paramètres suivants :

  • -bs - $BOOTSTRAP_BROKERS (les courtiers d'amorçage du cluster MSK)
  • -rn - L' CrossAccountGlueSchemaRegistryRoleArn valeur de la SchemaRegistryStack empiler les sorties dans le compte B
  • -sujet – Le sujet Kafka unicorn-ride-request-topic
  • -reg - us-east-1 (modifiez-le en fonction de votre région, il est utilisé pour le point de terminaison AWS STS et Schema Registry)
  • -ID externe – Le même ID externe (par exemple, demo10A) que vous avez utilisé lors de la création de la pile CloudFormation dans le compte B

La capture d'écran suivante montre que les journaux du consommateur Kafka lisent avec succès les messages de la rubrique Kafka sur le cluster MSK dans le compte A et accèdent au schéma Avro unicorn-ride-request-schema-avro du unicorn-ride-request-registry registre de schéma dans le compte B.

Si vous voyez des journaux similaires, cela signifie que les applications client Kafka ont pu se connecter avec succès au registre de schémas centralisé dans le compte B et sont capables de valider les messages tout en envoyant et en consommant des messages du cluster MSK dans le compte A.

Code de la consommation Kafka

L'implémentation complète du consommateur Kafka est disponible sur GitHub. Dans cette section, nous décomposons le code.

  • getConsumerConfig() initialise les propriétés du consommateur, comme illustré dans le code suivant :
    • VALUE_DESERIALIZER_CLASS_CONFIG - L' GlueSchemaRegistryKafkaDeserializer.class.getName() Implémentation du désérialiseur AWS qui désérialise le SpecificRecord selon l'ID de schéma encodé du Schema Registry (l'implémentation est disponible sur GitHub).
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startConsumer() assume le rôle dans le compte B pour pouvoir se connecter au Schema Registry du compte B et lit les messages du sujet Kafka sur le cluster MSK :
public void startConsumer() { logger.info("starting consumer..."); assumeGlueSchemaRegistryRole(); KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig()); consumer.subscribe(Collections.singletonList(topic)); int count = 0; while (true) { final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000)); for (final ConsumerRecord<String, UnicornRideRequest> record : records) { final UnicornRideRequest rideRequest = record.value(); logger.info(String.valueOf(rideRequest.getRequestId())); logger.info(rideRequest.toString()); } }
}

  • assumeGlueSchemaRegistryRole() comme indiqué dans le code suivant, utilise AWS STS pour assumer le rôle IAM de Schema Registry entre comptes dans le compte B. La réponse de stsClient.assumeRole(roleRequest) contient les informations d'identification temporaires, qui incluent accessKeyId, secretAccessKeyEt un sessionToken. Il définit ensuite les informations d'identification temporaires dans les propriétés système. Le SDK pour Java utilise ces informations d'identification lors de l'accès au Schema Registry (via le sérialiseur Schema Registry). Pour plus d'informations, voir Utilisation des informations d'identification.
public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

Compiler et générer des classes de schéma Avro

Comme toute autre partie de la création et du déploiement de votre application, la compilation du schéma et le processus de génération des classes de schéma Avro doivent être inclus dans votre pipeline CI/CD. Il existe plusieurs façons de générer des classes de schéma Avro ; nous utilisons avro-maven-plugin pour ce poste. Le processus CI/CD peut également utiliser avro-tools pour compiler le schéma Avro pour générer des classes. Le code suivant est un exemple de la façon dont vous pouvez utiliser avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination> //compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .

Vue d'ensemble

Pour récapituler, nous commençons par définir et enregistrer un schéma Avro pour le message de demande de trajet licorne dans le registre de schémas AWS Glue dans le compte B, le compte central du lac de données. Dans le compte A, nous créons un cluster MSK et des instances EC2 producteur et consommateur Kafka avec leur code d'application respectif (kafka-cross-account-gsr-consumer.jar et kafka-cross-account-gsr-producer.jar) et déployés à l'aide de la pile CloudFormation.

Lorsque nous exécutons l'application du producteur dans le compte A, le sérialiseur (GlueSchemaRegistryKafkaSerializer) à partir de la bibliothèque AWS Glue Schema Registry SerDe fournie lorsque la configuration obtient le schéma de demande de trajet licorne (UnicornRideRequest.avsc) du registre central des schémas résidant dans le compte B pour sérialiser le message de demande de trajet licorne. Il utilise le rôle IAM (informations d'identification temporaires) dans le compte B et la région, le nom du registre de schéma (unicorn-ride-request-registry) et le nom du schéma (unicorn-ride-request-schema-avro) fourni comme configuration pour se connecter au registre de schémas central. Une fois le message sérialisé avec succès, l'application producteur l'envoie à la rubrique Kafka (unicorn-ride-request-topic) sur le cluster MSK.

Lorsque nous exécutons l'application consommateur dans le compte A, le désérialiseur (GlueSchemaRegistryKafkaDeserializer) de la bibliothèque Schema Registry SerDe fournie lorsque la configuration extrait l'ID de schéma codé du message lu à partir du sujet Kafka (unicorn-ride-request-topic) et obtient le schéma pour le même ID à partir du registre de schémas central du compte B. Il désérialise ensuite le message. Il utilise le rôle IAM (informations d'identification temporaires) dans le compte B et la région fournie comme configuration pour se connecter au registre central des schémas. L'application grand public configure également Avro SPECIFIC_RECORD pour informer le désérialiseur que le message est d'un type spécifique (demande de trajet licorne). Une fois le message désérialisé avec succès, l'application consommateur le traite conformément aux exigences.

Nettoyer

La dernière étape consiste à nettoyer. Pour éviter des frais inutiles, vous devez supprimer toutes les ressources créées par les piles CloudFormation utilisées pour cette publication. La façon la plus simple de le faire est de supprimer les piles. Supprimez d'abord le MSKClusterStack suivie par MSKClientStack du compte A. Supprimez ensuite le SchemaRegistryStack du compte B.

Conclusion

Dans cet article, nous avons montré comment utiliser AWS Glue Schema Registry avec Amazon MSK et les applications de traitement de flux pour valider les messages à l'aide d'un schéma Avro. Nous avons créé une architecture distribuée dans laquelle le Schema Registry réside dans un compte AWS central (compte de lac de données) et les applications producteur et consommateur Kafka résident dans un compte AWS distinct. Nous avons créé un schéma Avro dans le registre de schémas du compte central pour permettre aux équipes d'application de gérer efficacement les schémas à un seul endroit. Étant donné qu'AWS Glue Schema Registry prend en charge les stratégies d'accès basées sur l'identité, nous avons utilisé le rôle IAM entre comptes pour permettre aux applications producteur et consommateur Kafka exécutées dans un compte séparé d'accéder en toute sécurité au schéma à partir du compte central pour valider les messages. Parce que le schéma Avro a été convenu à l'avance, nous avons utilisé Avro SpecificRecord pour garantir la sécurité du type au moment de la compilation et éviter les problèmes de validation du schéma d'exécution côté client. Le code utilisé pour ce post est disponible sur GitHub pour référence.

Pour en savoir plus sur les services et les ressources de cette solution, reportez-vous à Registre de schémas AWS Glue, Guide du développeur Amazon MSK, Bibliothèque AWS Glue Schema Registry SerDeet Tutoriel IAM : Déléguez l'accès entre les comptes AWS à l'aide des rôles IAM.


À propos de l’auteur

Vikas Bajaj est architecte principal de solutions chez Amazon Web Service. Vikas travaille avec des clients natifs numériques et les conseille sur l'architecture et la modélisation technologiques, ainsi que sur les options et les solutions pour atteindre les objectifs stratégiques de l'entreprise. Il s'assure que les conceptions et les solutions sont efficaces, durables et adaptées aux besoins actuels et futurs de l'entreprise. Outre les discussions sur l'architecture et la technologie, il aime regarder et jouer au cricket.

spot_img

Dernières informations

spot_img