Zephyrnet-logo

Evolueer JSON-schema's in Amazon MSK- en Amazon Kinesis-gegevensstromen met het AWS Glue Schema-register

Datum:

Gegevens worden in een enorm tempo geproduceerd, gestreamd en geconsumeerd, en naar verwachting zal dat tempo in de toekomst exponentieel groeien. JSON is met name het meest gebruikte gegevensformaat voor streamingtechnologieën en workloads. Naarmate toepassingen, websites en machines steeds vaker gebruikmaken van technologieën voor gegevensstreaming, zoals Apache Kafka en Amazon Kinesis-gegevensstromen, die dienen als een zeer beschikbare transportlaag die de gegevensproducenten loskoppelt van gegevensconsumenten, kan het voor teams steeds uitdagender worden om JSON-schema's te coördineren en te ontwikkelen. Het toevoegen of verwijderen van een veld of het wijzigen van het gegevenstype op een of meer bestaande velden kan problemen met de gegevenskwaliteit en stroomafwaartse toepassingsfouten veroorzaken zonder zorgvuldige gegevensverwerking. Teams vertrouwen op aangepaste tools, complexe code, vervelende processen of onbetrouwbare documentatie om zich te beschermen tegen deze schemawijzigingen. Dit maakt een grote afhankelijkheid van menselijk toezicht, wat de veranderingsbeheerprocessen foutgevoelig kan maken. Een veelvoorkomende oplossing is een schemaregister waarmee gegevensproducenten en consumenten schemawijzigingen op een gecoördineerde manier kunnen valideren. Dit zorgt voor risicovrije evolutie als de zakelijke eisen in de loop van de tijd veranderen.

De AWS Lijm Schema Register, een serverloze functie van AWS lijm, stelt u nu in staat om streaminggegevens te valideren en betrouwbaar te ontwikkelen tegen JSON-schema's. Het Schemaregister is een gratis functie die de gegevenskwaliteit en de productiviteit van ontwikkelaars aanzienlijk kan verbeteren. Hiermee kunt u defensieve codering en coördinatie tussen teams elimineren, uitval van downstream-applicaties verminderen en een register gebruiken dat is geïntegreerd in meerdere AWS-services. Van elk schema kan een versie worden gemaakt binnen de vangrails van een compatibiliteitsmodus, waardoor ontwikkelaars de flexibiliteit hebben om JSON-schema's op betrouwbare wijze te ontwikkelen. Bovendien kan het Schema-register gegevens serialiseren in een gecomprimeerd formaat, wat u helpt te besparen op kosten voor gegevensoverdracht en opslag.

Dit bericht laat zien hoe u het Schema-register voor JSON-schema's gebruikt en geeft voorbeelden van hoe u het kunt gebruiken met zowel Kinesis-gegevensstromen als Apache Kafka of Amazon Managed Streaming voor Apache Kafka (Amazone MSK).

Overzicht van de oplossing

In dit bericht leiden we je door een oplossing voor het opslaan, valideren en ontwikkelen van een JSON-schema in het AWS Glue Schema-register. Het schema wordt gebruikt door Apache Kafka en Kinesis Data Streams-applicaties terwijl ze produceren en consumeren JSON-objecten. We laten u ook zien wat er gebeurt als een nieuwe versie van het schema wordt gemaakt met een nieuw veld.

Het volgende diagram illustreert onze oplossingsworkflow:

De stappen om deze oplossing te implementeren zijn als volgt:

  1. Maak een nieuw register en registreer een schema met een AWS CloudFormatie sjabloon.
  2. Maak een nieuwe versie van het schema met behulp van de AWS Glue-console die achterwaarts compatibel is met de vorige versie.
  3. Bouw een producer-applicatie om het volgende te doen:
    1. Genereer JSON-objecten die voldoen aan een van de schemaversies.
    2. Serialiseer de JSON-objecten in een array van bytes.
    3. Verkrijg de bijbehorende schemaversie-ID uit het Schemaregister en codeer de bytearray met hetzelfde.
    4. Verzend de gecodeerde byte-array via een Kinesis-gegevensstroom of Apache Kafka-onderwerp.
  4. Bouw een consumententoepassing om het volgende te doen:
    1. Ontvang de gecodeerde byte-array via een Kinesis-gegevensstroom of Apache Kafka-onderwerp.
    2. Decodeer de schemaversie-ID en verkrijg het bijbehorende schema uit het Schemaregister.
    3. Deserialiseer de array van bytes in het oorspronkelijke JSON-object.
    4. Gebruik het JSON-object indien nodig.

Beschrijving van het gebruikte schema

Voor dit bericht beginnen we met het volgende schema. Het schema is van een weerrapportobject dat drie hoofdgegevens bevat: location, temperature en timestamp. Alle drie zijn verplichte velden, maar het schema staat extra velden toe (aangegeven door de additionalProperties vlag) zoals windSpeed or precipitation als de producent ze wil opnemen. De location field is een object met twee stringvelden: city en state. Beide zijn verplichte velden en het schema staat geen extra velden toe binnen dit object.

{ "$id": "https://example.com/weather-report.schema.json", "$schema": "http://json-schema.org/draft-07/schema#", "title": "WeatherReport", "type": "object", "properties": { "location": { "type": "object", "properties": { "city": { "type": "string", "description": "Name of the city where the weather is being reported." }, "state": { "type": "string", "description": "Name of the state where the weather is being reported." } }, "additionalProperties": false, "required": [ "city", "state" ] }, "temperature": { "type": "integer", "description": "Temperature in Farenheit." }, "timestamp": { "description": "Timestamp in epoch format at which the weather was noted.", "type": "integer" } }, "additionalProperties": true, "required": [ "location", "temperature", "timestamp" ]
}

Met behulp van het bovenstaande schema zou een geldig JSON-object er als volgt uitzien:

{ "location": { "city": "Phoenix", "state": "Arizona" }, "temperature": 115, "windSpeed": 50, "timestamp": 1627335205
}

Implementeren met AWS CloudFormation

Voor een snelle start kunt u de meegeleverde CloudFormation-stack implementeren. De CloudFormation-sjabloon genereert de volgende bronnen in uw account:

  • register – Een register is een container met schema's. Met registers kunt u uw schema's ordenen en de toegangscontrole voor uw toepassingen beheren. Een register heeft een Amazon Resource Name (ARN) waarmee u verschillende toegangsmachtigingen voor schemabewerkingen binnen het register kunt organiseren en instellen.
  • Schema – Een schema definieert de structuur en het formaat van een gegevensrecord. Een schema is een versiespecificatie voor betrouwbare gegevenspublicatie, -verbruik of -opslag. Elk schema kan meerdere versies hebben. Versiebeheer wordt bepaald door een compatibiliteitsregel die wordt toegepast op een schema. Verzoeken om nieuwe schemaversies te registreren worden door het Schemaregister aan deze regel getoetst voordat ze kunnen slagen.

Om deze bronnen handmatig aan te maken zonder AWS CloudFormation te gebruiken, raadpleegt u: Een register maken en Een schema maken.

Voorwaarden

Zorg ervoor dat u de volgende stappen als vereisten voltooit:

  1. Maak een AWS-account aan. Voor dit bericht configureer je de vereiste AWS-bronnen in de us-east-1 or us-west-2 Regio. Als u zich nog niet heeft aangemeld, voert u de volgende taken uit:
    1. Account aanmaken. Voor instructies, zie Meld u aan voor AWS.
    2. Maak een AWS Identiteits- en toegangsbeheer (IAM) gebruiker. Voor instructies, zie Een IAM-gebruiker maken in uw AWS-account.
  2. Kies Start Stack om de CloudFormation-stack te starten:

Bekijk het nieuw geregistreerde schema

Laten we de register en schema op de AWS Glue-console.

  1. Meld u aan bij de AWS Glue-console en kies de juiste regio.
  2. Onder Gegevenscatalogus, kiezen Schemaregisters.
  3. Kies de GsrBlogRegistry schema register.
  4. Kies de GsrBlogSchema schema.
  5. Kies versie 1.

We kunnen de details van de JSON Schema-versie en de definitie ervan zien. Merk op dat de compatibiliteitsmodus gekozen is Achterwaartse compatibiliteit. Het doel daarvan zien we in de volgende paragraaf.

Evolueer het schema door een nieuwe achterwaarts compatibele versie te maken

In deze sectie nemen we wat er tot nu toe is gemaakt en een nieuwe schemaversie toevoegen om te laten zien hoe we ons schema kunnen ontwikkelen terwijl we de integriteit intact houden.

Om een ​​nieuwe schemaversie toe te voegen, voert u de volgende stappen uit en gaat u verder vanaf de vorige sectie:

  1. Op de Schema versie details pagina, kies Nieuwe versie registreren.
  2. Binnen properties object binnen de location object (na de state veld), voeg een nieuwe . toe country veld als volgt:
    "country": { "type": "string", "description": "Name of the country where the weather is being reported." }

Omdat de gekozen compatibiliteitsmodus voor het schema achterwaartse compatibiliteit is, is het belangrijk dat we van dit nieuwe veld geen verplicht veld maken. Als we dat doen, mislukt het Schemaregister deze nieuwe versie.

  1. Kies Registreer versie.

We hebben nu een nieuwe versie van het schema waarmee de producenten een optioneel landveld kunnen opnemen in het locatieobject als ze dat willen.

Gebruik het AWS Glue Schema-register

In deze sectie doorlopen we de stappen om het Schema-register te gebruiken met Kinesis Data Streams of Apache Kafka.

Voorwaarden

Zorg ervoor dat u de volgende stappen als vereisten voltooit:

  1. Configureer uw AWS-inloggegevens in uw lokale machine.
  2. Install Maven op de lokale machine.
  3. Download de applicatiecode van de GitHub rest.
  4. Stel het pakket samen:
    mvn clean package

Gebruik het schemaregister met Kinesis-gegevensstromen

Voer de Kinesis-producercode uit om JSON-berichten te produceren die zijn gekoppeld aan een schema-ID die is toegewezen door het Schemaregister:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kinesis.RunKinesisProducer" -Dexec.args="<<KINESIS_DATA_STREAM_NAME>>"

Deze opdracht retourneert de volgende uitvoer:

Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 0
Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 1
Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 2
Successfully produced 3 messages to a stream called <<KINESIS_DATA_STREAM_NAME>>

Voer de Kinesis-consumentencode uit om JSON-berichten met de schema-ID te ontvangen, haal het schema op uit het Schemaregister en valideer:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kinesis.RunKinesisConsumer" -Dexec.args="<<KINESIS_DATA_STREAM_NAME>>"

Deze opdracht retourneert de volgende uitvoer met de ontvangen en gedecodeerde JSON-records:

Number of Records received: 1
[JsonDataWithSchema(schema={"$schema":"http://json-schema.org/draft-07/schema#","additionalProperties":true,"title":"WeatherReport","type":"object","properties":{"temperature":{"description":"Temperature in Farenheit.","type":"integer"},"location":{"additionalProperties":false,"type":"object","properties":{"city":{"description":"Name of the city where the weather is being reported.","type":"string"},"state":{"description":"Name of the state where the weather is being reported.","type":"string"}},"required":["city","state"]},"timestamp":{"description":"Timestamp in epoch format at which the weather was noted.","type":"integer"}},"required":["location","temperature","timestamp"],"$id":"https://example.com/weather-report.schema.json"}, payload={"temperature":89,"location":{"city":"Orlando","state":"Florida"},"timestamp":1627335205})]

Gebruik het schemaregister met Apache Kafka

Maak in de hoofdmap van de gedownloade GitHub-repomap een configuratiebestand met de verbindingsparameters voor het Kafka-cluster:

# Kafka
bootstrap.servers=localhost:9092

Voer de Kafka-producercode uit om JSON-berichten te produceren die zijn gekoppeld aan een schema-ID die is toegewezen door het Schemaregister:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kafka.RunKafkaProducer" -Dexec.args="<<CONFIG_FILE_NAME>><< TOPIC_NAME>>"

Deze opdracht retourneert de volgende uitvoer:

Sent message 0
Sent message 1
Sent message 2
Successfully produced 3 messages to a topic called <<TOPIC_NAME>>

Voer de Kafka-consumentencode uit om JSON-berichten met de schema-ID te gebruiken, haal het schema op uit het Schemaregister en valideer:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kafka.RunKafkaConsumer" -Dexec.args="<<CONFIG_FILE_NAME>> <<TOPIC_NAME>>"

Deze opdracht retourneert de volgende uitvoer met de ontvangen en gedecodeerde JSON-records:

Received message: key = message-0, value = JsonDataWithSchema(schema={"$schema":"http://json-schema.org/draft-07/schema#","additionalProperties":true,"title":"WeatherReport","type":"object","properties":{"temperature":{"description":"Temperature in Farenheit.","type":"integer"},"location":{"additionalProperties":false,"type":"object","properties":{"city":{"description":"Name of the city where the weather is being reported.","type":"string"},"state":{"description":"Name of the state where the weather is being reported.","type":"string"}},"required":["city","state"]},"timestamp":{"description":"Timestamp in epoch format at which the weather was noted.","type":"integer"}},"required":["location","temperature","timestamp"],"$id":"https://example.com/weather-report.schema.json"}, payload={"temperature":115,"location":{"city":"Phoenix","state":"Arizona"},"windSpeed":50,"timestamp":1627335205})

Opruimen

Nu naar de laatste stap, het opruimen van de middelen. Verwijder de CloudFormation-stack om alle resources te verwijderen die u hebt gemaakt als onderdeel van deze walkthrough.

Schema Register-functies

Laten we de functies bespreken die het Schema-register te bieden heeft:

  • Schema-ontdekking - Wanneer een producent een schemawijziging registreert, kunnen metadata worden toegepast als een sleutel-waardepaar om beheerders of ontwikkelaars doorzoekbare informatie te bieden. Deze metagegevens kunnen de oorspronkelijke bron van de gegevens aangeven (source=MSK_west), de naam van het team waarmee contact moet worden opgenomen (owner=DataEngineering), of AWS-tags (environment=Production). U kunt mogelijk een veld in uw gegevens op de producerende client versleutelen en metagegevens gebruiken om aan potentiële consumentenclients te specificeren welke openbare sleutel-vingerafdruk moet worden gebruikt voor de ontsleuteling.
  • Schemacompatibiliteit – Het versiebeheer van elk schema wordt bepaald door een compatibiliteitsmodus. Als wordt verzocht om een ​​nieuwe versie van een schema te registreren die de opgegeven compatibiliteitsmodus verbreekt, mislukt de aanvraag en wordt er een uitzondering gegenereerd. Compatibiliteitscontroles stellen ontwikkelaars die downstream-applicaties bouwen in staat om een ​​beperkte set scenario's te hebben om applicaties tegen te bouwen, wat helpt om zich probleemloos voor te bereiden op de wijzigingen. Veelgebruikte modi zijn VOORUIT, ACHTERUIT en VOLLEDIG. Voor meer informatie over modusdefinities, zie Schemaversie en compatibiliteit.
  • Schema validatie – Schema Register-serializers valideren dat de geproduceerde gegevens compatibel zijn met het toegewezen schema. Als dit niet het geval is, ontvangt de gegevensproducent een uitzondering van de serializer. Dit zorgt ervoor dat potentieel verbrekende wijzigingen eerder in ontwikkelingscycli worden gevonden en kan ook onbedoelde schemawijzigingen als gevolg van menselijke fouten helpen voorkomen.
  • Automatische registratie van schema's – Indien geconfigureerd om dit te doen, kan de gegevensproducent automatisch schemawijzigingen registreren terwijl ze in de gegevensstroom stromen. Dit is met name handig voor gebruikssituaties waarbij de bron van de gegevens wordt gegenereerd door een change data capture process (CDC) uit de database.
  • IAM-ondersteuning - Dankzij geïntegreerde IAM-ondersteuning kunnen alleen geautoriseerde producenten bepaalde schema's wijzigen. Bovendien kunnen alleen die consumenten die gemachtigd zijn om het schema te lezen, dit doen. Schemawijzigingen worden doorgaans opzettelijk en met zorg uitgevoerd, dus het is belangrijk om IAM te gebruiken om te bepalen wie deze wijzigingen uitvoert. Bovendien is toegangscontrole tot schema's belangrijk in situaties waarin gevoelige informatie in de schemadefinitie zelf kan worden opgenomen. In de vorige voorbeelden IAM-rollen worden afgeleid via de AWS SDK voor Java, dus ze zijn geërfd van de Amazon Elastic Compute-cloud (Amazon EC2) de rol van de instantie waarop de toepassing wordt uitgevoerd, bij gebruik van Amazon EC2. U kunt IAM-rollen ook toepassen op elke andere AWS-service die deze code zou kunnen bevatten, zoals containers of AWS Lambda functies.
  • Secundaire deserializer - Als u al schema's in een ander schemaregister hebt geregistreerd, is er een optie om een ​​secundaire deserializer op te geven bij het uitvoeren van schema-lookups. Dit maakt migraties vanuit andere schemaregisters mogelijk zonder helemaal opnieuw te hoeven beginnen. Elke schema-ID die niet bekend is in het Schemaregister, wordt opgezocht in het register dat is gekoppeld aan de secundaire deserializer.
  • Compressie – Het gebruik van een schemaregister kan de gegevensbelasting verminderen doordat er niet langer bij elk bericht schema's hoeven te worden verzonden en ontvangen. Schema Registry-bibliotheken bieden ook een optie voor zlib-compressie, die de gegevensvereisten nog verder kan verminderen door de payload van het bericht te comprimeren. Dit verschilt per gebruikssituatie, maar compressie kan de omvang van het bericht aanzienlijk verkleinen.
  • Meerdere gegevensformaten – Het Schema Register ondersteunt momenteel AVRO (v1.10.2) gegevensformaat, JSON-gegevensindeling met JSON Schema-indeling voor het schema (specificaties Draft-04, Draft-06 en Draft-07), en Java-taalondersteuning, met andere gegevensindelingen en talen die nog volgen.

Conclusie

In dit bericht hebben we de voordelen besproken van het gebruik van het AWS Glue Schema Registry om JSON-schema's voor datastromen te registreren, valideren en ontwikkelen naarmate de bedrijfsbehoeften veranderen. We hebben ook voorbeelden gegeven van het gebruik van het Schemaregister.

Lees verder over Integratie met AWS Glue Schema Registry.


Over de auteur

Aditya Challa is Senior Solutions Architect bij Amazon Web Services. Aditya helpt klanten graag bij hun AWS-reizen, omdat hij weet dat reizen altijd beter is als er gezelschap is. Hij is een grote fan van reizen, geschiedenis, technische wonderen en elke dag iets nieuws leren.

spot_img

Laatste intelligentie

spot_img

Chat met ons

Hallo daar! Hoe kan ik u helpen?