Zephyrnet-logo

Back-up en herstel Kafka-onderwerpgegevens met Amazon MSK Connect

Datum:

U kunt Apache Kafka gebruiken om uw streaming-workloads uit te voeren. Kafka biedt weerstand tegen storingen en beschermt uw gegevens direct uit de doos door gegevens te repliceren over de brokers van het cluster. Dit zorgt ervoor dat de gegevens in het cluster duurzaam zijn. U kunt uw SLA's voor duurzaamheid bereiken door de replicatiefactor van het onderwerp te wijzigen. Streaminggegevens die zijn opgeslagen in Kafka-onderwerpen zijn echter meestal van voorbijgaande aard en hebben doorgaans een bewaartijd van dagen of weken. U kunt om verschillende redenen een back-up maken van de gegevens die zijn opgeslagen in uw Kafka-onderwerp lang nadat de bewaartijd is verstreken. U kunt bijvoorbeeld nalevingsvereisten hebben die vereisen dat u de gegevens meerdere jaren bewaart. Of je hebt misschien synthetische gegevens samengesteld die herhaaldelijk moeten worden gehydrateerd in Kafka-onderwerpen voordat je met de integratietests van je workload begint. Of een upstream-systeem waar u geen controle over heeft, produceert slechte gegevens en u moet uw onderwerp herstellen naar een voorheen goede staat.

Gegevens voor onbepaalde tijd opslaan in Kafka-onderwerpen is een optie, maar soms vraagt ​​de use case om een ​​aparte kopie. Met tools zoals MirrorMaker kunt u een back-up van uw gegevens maken in een ander Kafka-cluster. Dit vereist echter dat een ander actief Kafka-cluster als back-up wordt uitgevoerd, wat de rekenkosten en opslagkosten verhoogt. Een kosteneffectieve en duurzame manier om een ​​back-up te maken van de gegevens van uw Kafka-cluster is het gebruik van een objectopslagservice zoals: Amazon eenvoudige opslagservice (Amazone S3).

In dit bericht bespreken we een oplossing waarmee u een back-up van uw gegevens kunt maken voor koude opslag met behulp van Amazon MSK Connect. We herstellen de back-upgegevens naar een ander Kafka-onderwerp en stellen de consumentencompensaties opnieuw in op basis van uw gebruiksscenario.

Overzicht van de oplossing

Kafka Connect is een onderdeel van Apache Kafka dat het streamen van gegevens tussen Kafka-onderwerpen en externe systemen zoals objectopslag, databases en bestandssystemen vereenvoudigt. Het gebruikt sink-connectors om gegevens van Kafka-onderwerpen naar externe systemen te streamen, en bronconnectoren om gegevens van externe systemen naar Kafka-onderwerpen te streamen. U kunt kant-en-klare connectoren gebruiken die zijn geschreven door derden of uw eigen connectoren schrijven om aan uw specifieke vereisten te voldoen.

MSK Connect is een functie van Amazon Managed Streaming voor Apache Kafka (Amazon MSK) waarmee u volledig beheerde Kafka Connect-workloads kunt uitvoeren. Het werkt met MSK-clusters en met compatibele zelfbeheerde Kafka-clusters. In dit bericht gebruiken we de Lenzen AWS S3-connector om een ​​back-up te maken van de gegevens die zijn opgeslagen in een onderwerp in een Amazon MSK-cluster naar Amazon S3 en deze gegevens terug te zetten naar een ander onderwerp. Het volgende diagram toont onze oplossingsarchitectuur.

Om deze oplossing te implementeren, doorlopen we de volgende stappen:

  1. Maak een back-up van de gegevens met behulp van een MSK Connect-sink-connector op een S3-bucket.
  2. Herstel de gegevens met behulp van een MSK Connect-bronconnector naar een nieuw Kafka-onderwerp.
  3. Reset consumentencompensaties op basis van verschillende scenario's.

Voorwaarden

Zorg ervoor dat u de volgende stappen als vereisten voltooit:

  1. Stel de benodigde bronnen in voor Amazon MSK, Amazon S3 en AWS Identiteits- en toegangsbeheer (IK BEN).
  2. Maak twee Kafka-onderwerpen in het MSK-cluster: source_topic en target_topic.
  3. Een MSK Connect-plug-in maken met de Lenzen AWS S3-connector.
  4. Installeer de Kafka CLI door stap 1 van: Apache Kafka-snelstart.
  5. Installeer de kcat-hulpprogramma om testberichten naar het Kafka-onderwerp te sturen.

Maak een back-up van je onderwerpen

Afhankelijk van de gebruikssituatie wilt u wellicht een back-up maken van alle onderwerpen in uw Kafka-cluster of van enkele specifieke onderwerpen. In dit bericht bespreken we hoe u een back-up kunt maken van een enkel onderwerp, maar u kunt de oplossing uitbreiden om een ​​back-up van meerdere onderwerpen te maken.

Het formaat waarin de gegevens worden opgeslagen in Amazon S3 is belangrijk. Misschien wilt u de gegevens die zijn opgeslagen in Amazon S3 inspecteren om problemen zoals de introductie van slechte gegevens op te lossen. U kunt gegevens bekijken die zijn opgeslagen als JSON of platte tekst door teksteditors te gebruiken en te kijken in de tijdframes die voor u van belang zijn. U kunt ook grote hoeveelheden gegevens onderzoeken die zijn opgeslagen in Amazon S3 als JSON of Parquet met behulp van AWS-services zoals: Amazone Athene. De Lenzen AWS S3-connector ondersteunt het opslaan van objecten als JSON, Avro, Parquet, platte tekst of binair.

In dit bericht sturen we JSON-gegevens naar het Kafka-onderwerp en slaan deze op in Amazon S3. Afhankelijk van het gegevenstype dat aan uw vereisten voldoet, werkt u de connect.s3.kcql verklaring en *.converter configuratie. U kunt verwijzen naar de Documentatie lenzen gootsteenconnector voor details over de ondersteunde formaten en de gerelateerde configuraties. Als de bestaande connectoren niet werken voor uw gebruik, kunt u ook: schrijf je eigen connector of bestaande connectoren uitbreiden. U kunt de gegevens die zijn opgeslagen in Amazon S3 partitioneren op basis van velden van primitieve typen in de berichtkop of payload. We gebruiken de datumvelden die zijn opgeslagen in de koptekst om de gegevens op Amazon S3 te partitioneren.

Volg deze stappen om een ​​back-up van uw onderwerp te maken:

  1. Maak een nieuwe Amazon MSK-sink-connector door de volgende opdracht uit te voeren:
    aws kafkaconnect create-connector --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" --connector-configuration "connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector, key.converter.schemas.enable=false, connect.s3.kcql=INSERT IGNORE INTO <<S3 Bucket Name>>:my_workload SELECT * FROM source_topic PARTITIONBY _header.year,_header.month,_header.day,_header.hour STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5, aws.region=us-east-1, tasks.max=2, topics=source_topic, schema.enable=false, errors.log.enable=true, value.converter=org.apache.kafka.connect.storage.StringConverter, key.converter=org.apache.kafka.connect.storage.StringConverter " --connector-name "backup-msk-to-s3-v1" --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [ <<Security Group>> ],"subnets": [ <<Subnet List>&
    gt; ]}}}' --kafka-cluster-client-authentication "authenticationType=NONE" --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" --kafka-connect-version "2.7.1" --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" --service-execution-role-arn " <<ARN of the IAM Role>> "

  2. Stuur gegevens naar het onderwerp met kcat:
    ./kcat -b <<broker list>> -t source_topic -H "year=$(date +"%Y")" -H "month=$(date +"%m")" -H "day=$(date +"%d")" -H "hour=$(date +"%H")" -P
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}

  3. Controleer de S3-bucket om er zeker van te zijn dat de gegevens worden geschreven.

MSK Connect publiceert statistieken om Amazon Cloud Watch die u kunt gebruiken om uw back-upproces te bewaken. Belangrijke statistieken zijn: SinkRecordReadRate en SinkRecordSendRate, die het gemiddelde aantal records meet dat respectievelijk is gelezen van Kafka en geschreven naar Amazon S3.

Zorg er ook voor dat de back-upconnector gelijke tred houdt met de snelheid waarmee het Kafka-onderwerp berichten ontvangt door de offset-vertraging van de connector te bewaken. Als u Amazon MSK gebruikt, kunt u dit doen door: metrische gegevens op partitieniveau inschakelen op Amazon MSK en het bewaken van de OffsetLag metriek van alle partities voor de consumentengroep van de back-upconnector. U moet dit zo dicht mogelijk bij 0 houden door het maximum aantal MSK Connect worker-instanties aan te passen. De opdracht die we in de vorige stap hebben gebruikt, stelt MSK Connect in om automatisch op te schalen naar twee werkers. Pas de .... aan --capacity instelling om het maximale aantal werknemers van MSK Connect-werknemers te verhogen of te verlagen op basis van de OffsetLag metriek.

Gegevens herstellen naar uw onderwerpen

U kunt uw back-upgegevens herstellen naar een nieuw onderwerp met dezelfde naam in hetzelfde Kafka-cluster, een ander onderwerp in hetzelfde Kafka-cluster of een ander onderwerp in een ander Kafka-cluster. In dit bericht lopen we door het scenario van het herstellen van gegevens waarvan een back-up is gemaakt in Amazon S3 naar een ander onderwerp, target_topic, in hetzelfde Kafka-cluster. U kunt dit uitbreiden naar andere scenario's door het onderwerp en de brokerdetails in de connectorconfiguratie te wijzigen.

Volg deze stappen om de gegevens te herstellen:

  1. Maak een Amazon MSK-bronconnector door de volgende opdracht uit te voeren:
    aws kafkaconnect create-connector --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" --connector-configuration "connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector, key.converter.schemas.enable=false, connect.s3.kcql=INSERT IGNORE INTO target_topic SELECT * FROM <<S3 Bucket Name>>:my_workload PARTITIONBY _header.year,_header.month,_header.day,_header.hour STOREAS `JSON` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5 , aws.region=us-east-1, tasks.max=2, topics=target_topic, schema.enable=false, errors.log.enable=true, value.converter=org.apache.kafka.connect.storage.StringConverter, key.converter=org.apache.kafka.connect.storage.StringConverter " --connector-name "restore-s3-to-msk-v1" --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [<<Security Group>>],"subnets": [ <<Subnet List>> ]}}}' --kafka-cluster-client-authentication "authenticationType=NONE" --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" --kafka-connect-version "2.7.1" --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" --service-execution-role-arn " <<ARN of the IAM Role>> "

De connector leest de gegevens van de S3-bucket en speelt deze terug naar target_topic.

  1. Controleer of de gegevens naar het Kafka-onderwerp worden geschreven door de volgende opdracht uit te voeren:
    ./kafka-console-consumer.sh --bootstrap-server <<MSK broker list>> --topic target_topic --from-beginning

MSK Connect-connectoren werken voor onbepaalde tijd, wachtend op nieuwe gegevens die naar de bron worden geschreven. Tijdens het herstellen moet u de connector echter stoppen nadat alle gegevens naar het onderwerp zijn gekopieerd. MSK Connect publiceert de SourceRecordPollRate en SourceRecordWriteRate metrische gegevens naar CloudWatch, die respectievelijk het gemiddelde aantal records meten dat is ondervraagd vanuit Amazon S3 en het aantal records dat naar het Kafka-cluster is geschreven. U kunt deze metrische gegevens bewaken om de status van het herstelproces bij te houden. Wanneer deze statistieken 0 bereiken, worden de gegevens van Amazon S3 hersteld naar de target_topic. U kunt op de hoogte worden gesteld van de voltooiing door een CloudWatch-alarm in te stellen voor deze statistieken. U kunt de automatisering uitbreiden om een AWS Lambda functie die de connector verwijdert wanneer het herstel is voltooid.

Net als bij het back-upproces kunt u het herstelproces versnellen door het aantal MSK Connect-werknemers uit te schalen. Verander de --capacity parameter om het maximum en minimum aan werkkrachten aan te passen tot een aantal dat voldoet aan de herstel-SLA's van uw werkbelasting.

Vergoedingen voor consumenten resetten

Afhankelijk van de vereisten voor het herstellen van de gegevens naar een nieuw Kafka-onderwerp, moet u mogelijk ook de offsets van de consumentengroep alvorens ze te consumeren of te produceren. Het identificeren van de werkelijke offset waarnaar u wilt resetten, hangt af van uw specifieke zakelijke gebruiksscenario en vereist handmatig werk om dit te identificeren. U kunt hulpmiddelen zoals Amazon S3 Select, Athena of andere aangepaste hulpmiddelen gebruiken om de objecten te inspecteren. De volgende schermafbeelding toont het lezen van de records die eindigen op offset 14 van partitie 2 van onderwerp source_topic met S3 Select.

Nadat u de nieuwe startoffsets voor uw consumentengroepen hebt geïdentificeerd, moet u deze opnieuw instellen op uw Kafka-cluster. U kunt dit doen met behulp van de CLI-tools die bij Kafka worden geleverd.

Bestaande consumentengroepen

Als u dezelfde naam van de consumentengroep wilt gebruiken na het herstellen van het onderwerp, kunt u dit doen door de volgende opdracht uit te voeren voor elke partitie van het herstelde onderwerp:

 ./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Controleer dit door de --describe optie van het commando:

./kafka-consumer-groups.sh --bootstrap-server <<brok
er list>> --group <<consumer group>> --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
source_topic 0 211006 188417765 188206759 ...
source_topic 1 212847 192997707 192784860 ...
source_topic 2 211147 196410627 196199480 ...
target_topic 0 211006 188417765 188206759 ...
target_topic 1 212847 192997707 192784860 ...
target_topic 2 211147 196410627 196199480 ...

Nieuwe consumentengroep

Als u wilt dat uw werklast een nieuwe consumentengroep maakt en aangepaste compensaties zoekt, kunt u dit doen door de zoeken methode in uw Kafka-consument voor elke partitie. U kunt ook de nieuwe consumentengroep maken door de volgende code uit te voeren:

./kafka-console-consumer.sh --bootstrap-server <<broker list>> --topic target_topic --group <<consumer group>> --from-beginning --max-messages 1

Reset de offset naar de gewenste offsets voor elke partitie door de volgende opdracht uit te voeren:

./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<New consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Opruimen

Voer de volgende opschoonstappen uit om lopende kosten te voorkomen:

  1. Verwijder de MSK Connect-connectoren en plug-in.
  2. Verwijder het MSK-cluster.
  3. Verwijder de S3-buckets.
  4. Verwijder alle CloudWatch-resources die u hebt gemaakt.

Conclusie

In dit bericht hebben we u laten zien hoe u een back-up kunt maken van Kafka-onderwerpgegevens en deze kunt herstellen met MSK Connect. U kunt deze oplossing uitbreiden naar meerdere onderwerpen en andere gegevensindelingen op basis van uw werklast. Zorg ervoor dat u verschillende scenario's test waarmee uw workloads te maken kunnen krijgen en documenteer het runbook voor elk van die scenario's.

Raadpleeg de volgende bronnen voor meer informatie:


Over de auteur

Rakshith Rao is Senior Solutions Architect bij AWS. Hij werkt samen met de strategische klanten van AWS om hun belangrijkste workloads op AWS te bouwen en te beheren.

spot_img

Laatste intelligentie

spot_img

Chat met ons

Hallo daar! Hoe kan ik u helpen?