Zephyrnet-logo

Valideer streaminggegevens via Amazon MSK met behulp van schema's in AWS Glue Schema Registry voor meerdere accounts

Datum:

De bedrijven van vandaag worden geconfronteerd met een ongekende groei van de hoeveelheid gegevens. Een groeiend deel van de gegevens wordt in realtime gegenereerd door IoT-apparaten, websites, bedrijfsapplicaties en verschillende andere bronnen. Bedrijven moeten deze gegevens verwerken en analyseren zodra ze binnenkomen om in realtime zakelijke beslissingen te kunnen nemen. Amazon Managed Streaming voor Apache Kafka (Amazon MSK) is een volledig beheerde service waarmee streamverwerkingsapplicaties kunnen worden gebouwd en uitgevoerd die Apache Kafka gebruiken om gegevens in realtime te verzamelen en te verwerken.

Streamverwerkingsapplicaties die Apache Kafka gebruiken, communiceren niet rechtstreeks met elkaar; ze communiceren via het verzenden en ontvangen van berichten over Kafka-onderwerpen. Om stroomverwerkingsapplicaties efficiënt en zelfverzekerd te laten communiceren, moet een bericht-payload-structuur worden gedefinieerd in termen van attributen en gegevenstypen. Deze structuur beschrijft de schematoepassingen die worden gebruikt bij het verzenden en ontvangen van berichten. Bij een groot aantal producenten- en consumententoepassingen kan zelfs een kleine wijziging in het schema (het verwijderen van een veld, het toevoegen van een nieuw veld of het wijzigen van het gegevenstype) problemen veroorzaken voor downstream-toepassingen die moeilijk te debuggen en op te lossen zijn.

Traditioneel vertrouwden teams op processen voor wijzigingsbeheer (zoals goedkeuringen en onderhoudsvensters) of andere informele mechanismen (documentatie, e-mails, samenwerkingstools, enzovoort) om elkaar te informeren over wijzigingen in gegevensschema's. Deze mechanismen schalen echter niet en zijn vatbaar voor fouten. De AWS Lijm Schema Register stelt u in staat om centraal schema's voor stroomverwerkingstoepassingen te publiceren, ontdekken, controleren, valideren en ontwikkelen. Met de AWS Lijm Schema Registerkunt u schema's beheren en afdwingen in toepassingen voor gegevensstreaming met behulp van Apache Kafka, Amazon MSK, Amazon Kinesis-gegevensstromen, Amazon Kinesis Data Analytics voor Apache Flink en AWS Lambda.

Dit bericht laat zien hoe Apache Kafka-stroomverwerkingsapplicaties berichten valideren met behulp van een Apache Avro schema opgeslagen in de AWS Glue Schema-register woonachtig in een centraal AWS-account. Wij gebruiken de AWS Glue Schema Register SerDe bibliotheek en Avro SpecificRecord om berichten in stroomverwerkingstoepassingen te valideren tijdens het verzenden en ontvangen van berichten van een Kafka-onderwerp op een Amazon MSK-cluster. Hoewel we voor dit bericht een Avro-schema gebruiken, is dezelfde benadering en hetzelfde concept ook van toepassing op JSON-schema's.

Gebruik geval

Laten we uitgaan van een fictief ritbedrijf dat ritjes met eenhoorn aanbiedt. Om bruikbare inzichten te verkrijgen, moeten ze een stroom van berichten over ritverzoeken voor eenhoorns verwerken. Ze verwachten dat ritten erg populair zullen zijn en willen ervoor zorgen dat hun oplossing kan schalen. Ze bouwen ook een centraal datameer waar al hun streaming- en bedrijfsgegevens worden opgeslagen voor analyse. Ze zijn geobsedeerd door klanten, dus ze verwachten nieuwe leuke functies toe te voegen aan toekomstige ritten, zoals het kiezen van de haarkleur van je eenhoorn, en zullen deze kenmerken moeten weergeven in de ritverzoekberichten. Om problemen in downstream-applicaties als gevolg van toekomstige schemawijzigingen te voorkomen, hebben ze een mechanisme nodig om berichten te valideren met een schema dat wordt gehost in een centraal schemaregister. Door schema's in een centraal schemaregister te hebben, kunnen applicatieteams gemakkelijker schema's op één plek publiceren, valideren, ontwikkelen en onderhouden.

Overzicht oplossingen

Het bedrijf gebruikt Amazon MSK om de verzoeken om eenhoornrit op grote schaal vast te leggen en te verspreiden. Ze definiëren een Avro-schema voor verzoeken om eenhoornrit, omdat het uitgebreide gegevensstructuren biedt, directe toewijzing aan JSON ondersteunt, evenals een compact, snel en binair gegevensformaat. Omdat het schema van tevoren was afgesproken, besloten ze om Avro . te gebruiken SpecificRecord.SpecificRecord is een interface van de Avro-bibliotheek die het gebruik van een Avro-record als een POJO mogelijk maakt. Dit wordt gedaan door een Java-klasse (of klassen) uit het schema te genereren, met behulp van avro-maven-plugin. Ze gebruiken AWS Identiteits- en toegangsbeheer (IAM) rollen voor meerdere accounts om producenten- en consumententoepassingen van het andere AWS-account veilig en beveiligd toegang te geven tot schema's in het centrale Schema Registry-account.

Het AWS Glue Schema-register bevindt zich in account B, terwijl het MSK-cluster en Kafka-producenten en -consumententoepassingen zich in account A bevinden. We gebruiken de volgende twee IAM-rollen om toegang tussen verschillende accounts tot het AWS Glue Schema-register mogelijk te maken. Apache Kafka-clients in account A nemen een rol aan in account B met behulp van een op identiteit gebaseerd beleid, omdat het AWS Glue Schema-register geen op bronnen gebaseerd beleid ondersteunt.

  • Account A IAM-rol – Hiermee kunnen producenten- en consumententoepassingen een IAM-rol spelen in account B.
  • IAM-rol account B – Vertrouwt alle IAM-principals van account A en stelt hen in staat om leesacties uit te voeren op het AWS Glue Schema-register in account B. In een reëel gebruiksscenario moeten IAM-principals die meerdere accountrollen kunnen aannemen, specifieker worden bestreken.

Het volgende architectuurdiagram illustreert de oplossing:

De oplossing werkt als volgt:

  1. Een Kafka-producent die wordt uitgevoerd in account A neemt de IAM-rol van het schemaregister voor meerdere accounts in account B over door de AWS-beveiligingstoken-service (AWS STS) assumeRole API.
  2. De Kafka-producent haalt de eenhoornritverzoek Avro-schemaversie-ID op uit het AWS Glue Schema-register voor het schema dat is ingesloten in het eenhoornritverzoek POJO. Het ophalen van de schemaversie-ID wordt intern beheerd door de serializer van de AWS Glue Schema Registry SerDe. De serializer moet worden geconfigureerd als onderdeel van de Kafka-producentconfiguratie.
  3. Als het schema bestaat in het AWS Glue Schema-register, versiert de serializer het gegevensrecord met de schemaversie-ID en serialiseert het vervolgens voordat het wordt afgeleverd bij het Kafka-onderwerp op het MSK-cluster.
  4. De Kafka-consument die in account A wordt uitgevoerd, neemt de IAM-rol van het schemaregister voor meerdere accounts in account B over door de AWS STS te bellen assumeRole API.
  5. De Kafka-consument begint het Kafka-onderwerp op het MSK-cluster te pollen voor gegevensrecords.
  6. De Kafka-consument haalt het Avro-schema voor eenhoornritverzoek op uit het AWS Glue Schema-register, overeenkomend met de versie-ID van het schema die is gecodeerd in het gegevensrecord voor verzoekgegevens voor eenhoornrit. Het schema ophalen
    a wordt intern beheerd door de deserializer van de AWS Glue Schema Registry SerDe. De deserializer moet worden geconfigureerd als onderdeel van de Kafka-consumentenconfiguratie. Als het schema bestaat in het AWS Glue Schema-register, deserialiseert de deserializer het datarecord naar het POJO-verzoek voor eenhoornrit zodat de consument het kan verwerken.

De AWS Glue Schema Registry SerDe-bibliotheek ondersteunt ook optionele compressieconfiguratie om te besparen op gegevensoverdrachten. Voor meer informatie over het Schemaregister, zie: Hoe het Schemaregister werkt.

Eenhoorn rit verzoek Avro schema

Het volgende schema (UnicornRideRequest.avsc) definieert een record dat een ritverzoek van een eenhoorn vertegenwoordigt, dat attributen voor ritverzoeken bevat, samen met de kenmerken van de klant en door het systeem aanbevolen eenhoornattributen:

{ "type": "record", "name": "UnicornRideRequest", "namespace": "demo.glue.schema.registry.avro", "fields": [ {"name": "request_id", "type": "int", "doc": "customer request id"}, {"name": "pickup_address","type": "string","doc": "customer pickup address"}, {"name": "destination_address","type": "string","doc": "customer destination address"}, {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"}, {"name": "ride_duration","type": "int","doc": "ride duration in minutes"}, {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"}, { "name": "recommended_unicorn", "type": { "type": "record", "name": "RecommendedUnicorn", "fields": [ {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"}, {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}}, {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"} ] } }, { "name": "customer", "type": { "type": "record", "name": "Customer", "fields": [ {"name": "customer_account_no","type": "int", "doc": "customer account number"}, {"name": "first_name","type": "string"}, {"name": "middle_name","type": ["null","string"], "default": null}, {"name": "last_name","type": "string"}, {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]}, {"name": "customer_address","type": "string","doc": "customer address"}, {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"}, {"name": "customer_rating", "type": ["null", "int"], "default": null} ] } } ] }

Voorwaarden

Om deze oplossing te gebruiken, moet u twee AWS-accounts hebben:

  • Rekening A – Voor het MSK cluster, Kafka producent en consument Amazon Elastic Compute-cloud (Amazon EC2) instanties, en AWS-Cloud9 milieu
  • Rekening B – Voor het Schemaregister en schema

Voor deze oplossing gebruiken we Regio us-east-1, maar u kunt dit naar eigen wens wijzigen.

Vervolgens maken we de bronnen in elk account met behulp van AWS CloudFormatie sjablonen.

Resources aanmaken in account B

We maken de volgende bronnen aan in account B:

  • Een schemaregister
  • Een Avro-schema
  • Een IAM-functie met de AWSGlueSchemaRegistryReadonlyAccess beheerd beleid en een instantieprofiel, waardoor alle account A IAM-principals dit kunnen aannemen
  • De UnicornRideRequest.avsc Eerder getoond Avro-schema, dat wordt gebruikt als schemadefinitie in de CloudFormation-sjabloon

Zorg ervoor dat u over de juiste machtigingen beschikt om deze bronnen te maken.

  1. Log in op account B.
  2. Start het volgende: CloudFormation-stapel.
  3. Voor Stack naam, ga naar binnen SchemaRegistryStack.
  4. Voor Schema Registernaam, ga naar binnen unicorn-ride-request-registry.
  5. Voor Naam Avro Schema, ga naar binnen unicorn-ride-request-schema-avro.
  6. Voer voor de AWS-account-ID van de Kafka-client uw account A-ID in.
  7. Voor ExterneId, voer een unieke willekeurige ID in (bijvoorbeeld demo10A), die moet worden verstrekt door de Kafka-klanten in account A terwijl ze de IAM-rol in dit account op zich nemen.

Zie voor meer informatie over beveiliging tussen accounts: Het verwarde plaatsvervangend probleem.

  1. Wanneer de stapel compleet is, op de Uitgangen tabblad van de stapel, kopieer de waarde voor CrossAccountGlueSchemaRegistryRoleArn.

De Kafka-producenten- en consumententoepassingen die in account A zijn gemaakt, nemen deze rol op zich om toegang te krijgen tot het schemaregister en het schema in account B.

  1. Om te controleren of de bronnen zijn gemaakt, kiest u op de AWS Glue-console: Schemaregisters in de navigatiebalk en zoek unicorn-ride-request-registry.
  2. Kies het register unicorn-ride-request-registry en controleer of het bevat: unicorn-ride-request-schema-avro in de schema's pagina.
  3. Kies het schema om de inhoud ervan te zien.

De IAM-rol gecreëerd door de SchemaRegistryStack stack stelt alle Account A IAM-principals in staat om het over te nemen en leesacties uit te voeren op het AWS Glue Schema Registry. Laten we eens kijken naar de vertrouwensrelaties van de IAM-rol.

  1. Op de SchemaRegistryStack stack Uitgangen tab, kopieer de waarde voor CrossAccountGlueSchemaRegistryRoleName.
  2. Zoek op de IAM-console naar deze rol.
  3. Kies Vertrouwensrelaties en kijk naar de vertrouwde entiteiten om te bevestigen dat account A wordt vermeld.
  4. In het Algemene voorwaarden sectie, bevestig dat sts:ExternalId heeft dezelfde unieke willekeurige ID die is opgegeven tijdens het maken van de stapel.

Resources maken in account A

We maken de volgende bronnen aan in account A:

  • Een VPC
  • EC2-instanties voor de Kafka-producent en -consument
  • Een AWS Cloud9-omgeving
  • Een MSK-cluster

Maak als eerste vereiste een EC2-sleutelpaar en download dit op uw computer om SSH naar EC2-instanties te kunnen verzenden. Maak ook een MSK-clusterconfiguratie met standaardwaarden. U moet machtigingen hebben om de CloudFormation te maken
stack, EC2-instanties, AWS Cloud9-omgeving, MSK-cluster, MSK-clusterconfiguratie en IAM-rol.

  1. Log in op account A.
  2. Start het volgende: CloudFormation-stapel om de VPC-, EC2-instanties en AWS Cloud9-omgeving te starten.
  3. Voor Stack naam, ga naar binnen MSKClientStack.
  4. Geef de VPC- en subnet-CIDR-bereiken op.
  5. Voor EC2 Sleutelpaar, kies een bestaand EC2-sleutelpaar.
  6. Selecteer de standaardoptie voor de nieuwste EC2 AMI ID.
  7. Gebruik voor de IAM-rol voor meerdere accounts ARN de waarde voor CrossAccountGlueSchemaRegistryRoleArn (beschikbaar op de Uitgangen tabblad van SchemaRegistryStack).
  8. Wacht tot de stapel succesvol is gemaakt.
  9. Start het volgende: CloudFormation-stapel om het MSK-cluster te maken.
  10. Voor Stack naam, ga naar binnen MSKClusterStack.
  11. Gebruik Amazon MSK versie 2.7.1.
  12. Voer voor de MSK-clusterconfiguratie ARN de MSK-clusterconfiguratie ARN in. Een die u hebt gemaakt als onderdeel van de vereiste.
  13. Voer voor het revisienummer van de MSK-clusterconfiguratie 1 in of wijzig het volgens uw versie.
  14. Voer voor de client CloudFormation-stacknaam in: MSKClientStack (de stapelnaam die u vóór deze stapel hebt gemaakt).

Configureer de Kafka-producent

Voer de volgende stappen uit om de Kafka-producent te configureren die toegang heeft tot het Schema-register in het centrale AWS-account:

  1. Log in op account A.
  2. Kies op de AWS Cloud9-console de Cloud9EC2Bastion omgeving gecreëerd door de MSKClientStack stack.
  3. Op de Dien in menu, kies Lokale bestanden uploaden.
  4. Upload het EC2-sleutelpaarbestand dat u eerder hebt gebruikt bij het maken van de stapel.
  5. Open een nieuwe terminal en wijzig de EC2-sleutelpaarrechten:
    chmod 0400 <keypair PEM file>

  6. SSH in de KafkaProducerInstance EC2-instantie en stel de regio in volgens uw vereisten:
    ssh -i <keypair PEM file> ec2-user@<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. Stel de omgevingsvariabele in MSK_CLUSTER_ARN verwijzend naar de ARN van het MSK-cluster:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Verander de .ClusterName waarde in de code als u een andere naam hebt gebruikt voor de MSK-cluster CloudFormation-stack. De clusternaam is hetzelfde als de stapelnaam.

  1. Stel de omgevingsvariabele in BOOTSTRAP_BROKERS verwijzend naar de bootstrap-makelaars:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Controleer de omgevingsvariabelen:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  3. Maak een Kafka-onderwerp met de naam unicorn-ride-request-topic in uw MSK-cluster, dat later wordt gebruikt door de Kafka-producent en consumententoepassingen:
    cd ~/kafka ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --topic unicorn-ride-request-topic --create --partitions 3 --replication-factor 2 ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

De MSKClientStack stack kopieerde het JAR-bestand van de Kafka-producerclient met de naam kafka-cross-account-gsr-producer.jar aan de KafkaProducerInstance voorbeeld. Het bevat de Kafka-producerclient die berichten verzendt naar het Kafka-onderwerp unicorn-ride-request-topic op het MSK-cluster en toegang heeft tot de unicorn-ride-request-schema-avro Avro-schema van de unicorn-ride-request-registry schemaregister in account B. De Kafka-producentcode, die we later in dit bericht behandelen, is beschikbaar op GitHub.

  1. Voer de volgende opdrachten uit en controleer: kafka-cross-account-gsr-producer.jar bestaat:
    cd ~
    ls -ls

  2. Voer de volgende opdracht uit om de Kafka-producer uit te voeren in de KafkaProducerInstance terminal:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka producer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -nm 500 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

De code heeft de volgende parameters:

  • -bs - $BOOTSTRAP_BROKERS (de MSK-cluster bootstrap-makelaars)
  • -rn - The CrossAccountGlueSchemaRegistryRoleArn waarde van de SchemaRegistryStack stapeluitgangen in account B
  • -onderwerp – het Kafka-onderwerp unicorn-ride-request-topic
  • -reg - us-east-1 (wijzig het volgens uw regio, het wordt gebruikt voor het AWS STS-eindpunt en het schemaregister)
  • -nm: 500 (het aantal berichten dat de producer-applicatie naar het Kafka-onderwerp stuurt)
  • -externe id – Dezelfde externe ID (bijvoorbeeld demo10A) die u hebt gebruikt bij het maken van de CloudFormation-stack in Account B

De volgende schermafbeelding toont de Kafka-producerlogboeken: Schema Version Id received..., wat betekent dat het het Avro-schema heeft opgehaald unicorn-ride-request-schema-avro van account B en berichten zijn verzonden naar het Kafka-onderwerp op het MSK-cluster in account A.

Kafka-producentencode

De volledige Kafka producer-implementatie is beschikbaar op: GitHub. In deze sectie splitsen we de code op.

  • getProducerConfig() initialiseert de producer-eigenschappen, zoals weergegeven in de volgende code:
    • VALUE_SERIALIZER_CLASS_CONFIG - The GlueSchemaRegistryKafkaSerializer.class.getName() AWS-serializer-implementatie die gegevensrecords serialiseert (de implementatie is beschikbaar op GitHub)
    • REGISTRY_NAME – Het Schema Register van Account B
    • SCHEMA_NAME – De schemanaam van Account B
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "-1"); props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name()); props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName); props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry"); props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro"); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startProducer() neemt de rol op zich in account B om verbinding te kunnen maken met het schemaregister in account B en stuurt berichten naar het Kafka-onderwerp op het MSK-cluster:
public void startProducer() { assumeGlueSchemaRegistryRole(); KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<String,UnicornRideRequest>(getProducerConfig()); int numberOfMessages = Integer.valueOf(str_numOfMessages); logger.info("Starting to send records..."); for(int i = 0;i < numberOfMessages;i ++) { UnicornRideRequest rideRequest = getRecord(i); String key = "key-" + i; ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest); producer.send(record, new ProducerCallback()); } }

  • assumeGlueSchemaRegistryRole() zoals weergegeven in de volgende code gebruikt AWS STS om de IAM-rol van het Schemaregister voor meerdere accounts in account B op zich te nemen. (Zie voor meer informatie Tijdelijke beveiligingsgegevens in IAM.) Het antwoord van stsClient.assumeRole(roleRequest) bevat de tijdelijke inloggegevens, waaronder: accessKeyId, secretAccessKeyEn een sessionToken. Vervolgens worden de tijdelijke referenties in de systeemeigenschappen ingesteld. De AWS SDK voor Java gebruikt deze inloggegevens bij het openen van het Schema Registry (via de Schema Registry-serializer). Voor meer informatie, zie Inloggegevens gebruiken.
    public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

  • createUnicornRideRequest() gebruikt het door Avro-schema (eenhoorn-ritverzoekschema) gegenereerde klassen om een SpecificRecord. Voor dit bericht zijn de kenmerken van de eenhoornritverzoek-attributen hard gecodeerd in deze methode. Zie de volgende code:
    public UnicornRideRequest getRecord(int requestId){ /* Initialise UnicornRideRequest object of class that is generated from AVRO Schema */ UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder() .setRequestId(requestId) .setPickupAddress("Melbourne, Victoria, Australia") .setDestinationAddress("Sydney, NSW, Aus") .setRideFare(1200.50F) .setRideDuration(120) .setPreferredUnicornColor(UnicornPreferredColor.WHITE) .setRecommendedUnicorn(RecommendedUnicorn.newBuilder() .setUnicornId(requestId*2) .setColor(unicorn_color.WHITE) .setStarsRating(5).build()) .setCustomer(Customer.newBuilder() .setCustomerAccountNo(1001) .setFirstName("Dummy") .setLastName("User") .setEmailAddresses(Arrays.asList("demo@example.com")) .setCustomerAddress("Flinders Street Station") .setModeOfPayment(ModeOfPayment.CARD) .setCustomerRating(5).build()).build(); logger.info(rideRequest.toString()); return rideRequest; }

Configureer de Kafka-consument

De MSKClientStack stapel creëerde de KafkaConsumerInstance bijvoorbeeld voor de Kafka-consumententoepassing. U kunt alle instanties bekijken die door de stapel zijn gemaakt op de Amazon EC2-console.

Voer de volgende stappen uit om de Kafka-consument te configureren die toegang heeft tot het Schema-register in het centrale AWS-account:

  1. Open een nieuwe terminal in de Cloud9EC2Bastion AWS Cloud9-omgeving.
  2. SSH in de KafkaConsumerInstance EC2-instantie en stel de regio in volgens uw vereisten:
    ssh -i <keypair PEM file> ec2-user@<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. Stel de omgevingsvariabele in MSK_CLUSTER_ARN verwijzend naar de ARN van het MSK-cluster:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Verander de .ClusterName waarde als u een andere naam hebt gebruikt voor de MSK-cluster CloudFormation-stack. De clusternaam is hetzelfde als de stapelnaam.

  1. Stel de omgevingsvariabele in BOOTSTRAP_BROKERS verwijzend naar de bootstrap-makelaars:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Controleer de omgevingsvariabelen:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

De MSKClientStack stack kopieerde het JAR-bestand van de Kafka-consumentenclient met de naam kafka-cross-account-gsr-consumer.jar aan de KafkaConsumerInstance voorbeeld. Het bevat de Kafka-consumentenclient die berichten uit het Kafka-onderwerp leest unicorn-ride-request-topic op het MSK-cluster en heeft toegang tot de unicorn-ride-request-schema-avro Avro-schema van de unicorn-ride-request-registry register in account B. De Kafka-consumentencode, die we verderop in dit bericht behandelen, is beschikbaar op GitHub.

  1. Voer de volgende opdrachten uit en controleer: kafka-cross-account-gsr-consumer.jar bestaat:
    cd ~
    ls -ls

  2. Voer de volgende opdracht uit om de Kafka-consument uit te voeren in de KafkaConsumerInstance terminal:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka consumer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

De code heeft de volgende parameters:

  • -bs - $BOOTSTRAP_BROKERS (de MSK-cluster bootstrap-makelaars)
  • -rn - The CrossAccountGlueSchemaRegistryRoleArn waarde van de SchemaRegistryStack stapeluitgangen in account B
  • -onderwerp – Het Kafka-onderwerp unicorn-ride-request-topic
  • -reg - us-east-1 (wijzig het volgens uw regio, het wordt gebruikt voor het AWS STS-eindpunt en het schemaregister)
  • -externe id – Dezelfde externe ID (bijvoorbeeld demo10A) die u hebt gebruikt bij het maken van de CloudFormation-stack in Account B

De volgende schermafbeelding toont de Kafka-consumentenlogboeken die met succes berichten lezen van het Kafka-onderwerp op het MSK-cluster in account A en toegang krijgen tot het Avro-schema unicorn-ride-request-schema-avro van het unicorn-ride-request-registry schemaregister in account B.

Als u vergelijkbare logboeken ziet, betekent dit dat beide Kafka-consumententoepassingen met succes verbinding hebben kunnen maken met het gecentraliseerde Schemaregister in account B en dat ze berichten kunnen valideren terwijl ze berichten van het MSK-cluster in account A verzenden en consumeren.

Kafka-consumentencode

De volledige Kafka-consumentenimplementatie is beschikbaar op: GitHub. In deze sectie splitsen we de code op.

  • getConsumerConfig() initialiseert consumenteneigenschappen, zoals weergegeven in de volgende code:
    • VALUE_DESERIALIZER_CLASS_CONFIG - The GlueSchemaRegistryKafkaDeserializer.class.getName() AWS deserializer-implementatie die de . deserialiseert SpecificRecord volgens de gecodeerde schema-ID van het Schemaregister (de implementatie is beschikbaar op GitHub).
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startConsumer() neemt de rol op zich in account B om verbinding te kunnen maken met het schemaregister in account B en leest berichten uit het Kafka-onderwerp op het MSK-cluster:
public void startConsumer() { logger.info("starting consumer..."); assumeGlueSchemaRegistryRole(); KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig()); consumer.subscribe(Collections.singletonList(topic)); int count = 0; while (true) { final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000)); for (final ConsumerRecord<String, UnicornRideRequest> record : records) { final UnicornRideRequest rideRequest = record.value(); logger.info(String.valueOf(rideRequest.getRequestId())); logger.info(rideRequest.toString()); } }
}

  • assumeGlueSchemaRegistryRole() zoals weergegeven in de volgende code gebruikt AWS STS om de IAM-rol van het Schemaregister voor meerdere accounts in account B op zich te nemen. Het antwoord van stsClient.assumeRole(roleRequest) bevat de tijdelijke inloggegevens, waaronder: accessKeyId, secretAccessKeyEn een sessionToken. Vervolgens worden de tijdelijke referenties in de systeemeigenschappen ingesteld. De SDK voor Java gebruikt deze referenties bij toegang tot het Schema-register (via de Schema-register-serializer). Voor meer informatie, zie Inloggegevens gebruiken.
public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

Compileer en genereer Avro-schemaklassen

Net als elk ander onderdeel van het bouwen en implementeren van uw toepassing, moeten schemacompilatie en het proces van het genereren van Avro-schemaklassen worden opgenomen in uw CI/CD-pijplijn. Er zijn meerdere manieren om Avro-schemaklassen te genereren; we gebruiken avro-maven-plugin voor dit bericht. Het CI/CD-proces kan ook gebruik maken van: avro-tools om Avro-schema te compileren om klassen te genereren. De volgende code is een voorbeeld van hoe u kunt gebruiken avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination> //compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .

Implementatie overzicht

Om samen te vatten, beginnen we met het definiëren en registreren van een Avro-schema voor het eenhoorn-ritverzoekbericht in het AWS Glue Schema-register in account B, het centrale datameer-account. In account A maken we een MSK-cluster en Kafka-producenten en consumenten EC2-instanties met hun respectievelijke applicatiecode (kafka-cross-account-gsr-consumer.jar en kafka-cross-account-gsr-producer.jar) en daarin geïmplementeerd met behulp van de CloudFormation-stack.

Wanneer we de producer-toepassing in account A uitvoeren, wordt de serializer (GlueSchemaRegistryKafkaSerializer) uit de AWS Glue Schema Registry SerDe-bibliotheek die wordt verstrekt als de configuratie het verzoekschema voor eenhoornrit krijgt (UnicornRideRequest.avsc) van het centrale Schemaregister dat zich in account B bevindt om het verzoekbericht voor een eenhoornrit te serialiseren. Het gebruikt de IAM-rol (tijdelijke referenties) in account B en regio, schemaregisternaam (unicorn-ride-request-registry), en schemanaam (unicorn-ride-request-schema-avro) opgegeven als de configuratie om verbinding te maken met het centrale Schemaregister. Nadat het bericht succesvol is geserialiseerd, stuurt de producer-applicatie het naar het Kafka-onderwerp (unicorn-ride-request-topic) op het MSK-cluster.

Wanneer we de consumententoepassing in account A uitvoeren, wordt de deserializer (GlueSchemaRegistryKafkaDeserializer) uit de Schema Registry SerDe-bibliotheek die wordt geleverd terwijl de configuratie de gecodeerde schema-ID extraheert uit het bericht dat is gelezen uit het Kafka-onderwerp (unicorn-ride-request-topic) en haalt het schema voor dezelfde ID uit het centrale Schemaregister in Account B. Vervolgens wordt het bericht gedeserialiseerd. Het gebruikt de IAM-rol (tijdelijke inloggegevens) in account B en de opgegeven regio als de configuratie om verbinding te maken met het centrale schemaregister. De consumententoepassing configureert ook Avro's SPECIFIC_RECORD om de deserializer te informeren dat het bericht van een specifiek type is (eenhoornritverzoek). Nadat het bericht met succes is gedeserialiseerd, verwerkt de consumententoepassing het volgens de vereisten.

Opruimen

De laatste stap is opruimen. Om onnodige kosten te voorkomen, moet u alle bronnen verwijderen die zijn gemaakt door de CloudFormation-stacks die voor dit bericht zijn gebruikt. De eenvoudigste manier om dit te doen, is door de stapels te verwijderen. Verwijder eerst de MSKClusterStack gevolgd door MSKClientStack van account A. Verwijder vervolgens de SchemaRegistryStack van rekening B.

Conclusie

In dit bericht hebben we laten zien hoe je AWS Glue Schema Registry kunt gebruiken met Amazon MSK en streamverwerkingsapplicaties om berichten te valideren met behulp van een Avro-schema. We hebben een gedistribueerde architectuur gecreëerd waarbij het Schema-register zich in een centraal AWS-account (data lake-account) bevindt en Kafka-producenten en consumententoepassingen in een afzonderlijk AWS-account. We hebben een Avro-schema gemaakt in het schemaregister in het centrale account om het voor de toepassingsteams efficiënt te maken om schema's op één plek te onderhouden. Omdat AWS Glue Schema Registry identiteitsgebaseerd toegangsbeleid ondersteunt, hebben we de IAM-rol voor meerdere accounts gebruikt om de Kafka-producer en consumententoepassingen die in een afzonderlijk account worden uitgevoerd, veilig toegang te geven tot het schema vanuit het centrale account om berichten te valideren. Omdat het Avro-schema van tevoren was afgesproken, hebben we Avro . gebruikt SpecificRecord om typeveiligheid tijdens het compileren te garanderen en problemen met de validatie van runtimeschema's aan de clientzijde te voorkomen. De code die voor dit bericht is gebruikt, is beschikbaar op GitHub als referentie.

Raadpleeg voor meer informatie over de services en bronnen in deze oplossing: AWS Lijm Schema Register Amazon MSK-ontwikkelaarsgids AWS Glue Schema Register SerDe bibliotheek en IAM-zelfstudie: Toegang delegeren tussen AWS-accounts met behulp van IAM-rollen.


Over de auteur

Vika Bajaj is Principal Solutions Architect bij Amazon Web Service. Vikas werkt met digital native-klanten en adviseert hen over technologie-architectuur en -modellering, en opties en oplossingen om strategische bedrijfsdoelstellingen te behalen. Hij zorgt ervoor dat ontwerpen en oplossingen efficiënt, duurzaam en geschikt zijn voor huidige en toekomstige zakelijke behoeften. Afgezien van discussies over architectuur en technologie, kijkt en speelt hij graag cricket.

spot_img

Laatste intelligentie

spot_img