Zephyrnet Logosu

Hesaplar arası AWS Glue Schema Registry'deki şemaları kullanarak Amazon MSK üzerinden akış verilerini doğrulayın

Tarih:

Günümüzün işletmeleri veri hacminde benzeri görülmemiş bir büyüme ile karşı karşıyadır. Verilerin giderek artan bir kısmı, IoT cihazları, web siteleri, iş uygulamaları ve diğer çeşitli kaynaklar tarafından gerçek zamanlı olarak üretiliyor. İşletmelerin, gerçek zamanlı iş kararları verebilmek için bu verileri ulaşır ulaşmaz işlemesi ve analiz etmesi gerekir. Apache Kafka için Amazon Tarafından Yönetilen Akış (Amazon MSK), verileri gerçek zamanlı olarak toplamak ve işlemek için Apache Kafka'yı kullanan akış işleme uygulamalarının oluşturulmasına ve çalıştırılmasına olanak tanıyan, tam olarak yönetilen bir hizmettir.

Apache Kafka kullanan akış işleme uygulamaları birbirleriyle doğrudan iletişim kurmaz; Kafka konuları üzerinden mesaj gönderip alarak iletişim kurarlar. Akış işleme uygulamalarının verimli ve güvenli bir şekilde iletişim kurabilmesi için, nitelikler ve veri türleri açısından bir mesaj veri yükü yapısının tanımlanması gerekir. Bu yapı, mesaj gönderip alırken kullanılan şema uygulamalarını açıklar. Ancak çok sayıda üretici ve tüketici uygulamasında şemadaki küçük bir değişiklik bile (bir alanın kaldırılması, yeni bir alanın eklenmesi veya veri türünde değişiklik) aşağı akış uygulamalarında hata ayıklaması ve düzeltilmesi zor sorunlara neden olabilir.

Geleneksel olarak ekipler, birbirlerini veri şeması değişiklikleri hakkında bilgilendirmek için değişiklik yönetimi süreçlerinden (onaylar ve bakım pencereleri gibi) veya diğer resmi olmayan mekanizmalardan (belgeler, e-postalar, işbirliği araçları vb.) yararlanır. Ancak bu mekanizmalar ölçeklenmez ve hatalara açıktır. AWS Tutkal Şeması Kayıt Defteri akış işleme uygulamalarına yönelik şemaları merkezi olarak yayınlamanıza, keşfetmenize, kontrol etmenize, doğrulamanıza ve geliştirmenize olanak tanır. İle AWS Tutkal Şeması Kayıt Defterikullanarak veri akışı uygulamalarındaki şemaları yönetebilir ve uygulayabilirsiniz. Apache KafkaAmazon MSK, Amazon Kinesis Veri Akışları, Apache Flink için Amazon Kinesis Veri Analizi, ve AWS Lambda.

Bu gönderi, Apache Kafka akış işleme uygulamalarının, bir iletiyi kullanarak iletileri nasıl doğruladığını gösterir. Apaçi Avro şemada saklandı AWS Glue Schema kaydı merkezi bir AWS hesabında ikamet ediyor. biz kullanıyoruz AWS Glue Schema Registry SerDe kitaplığı ve Avro SpecificRecord Amazon MSK kümesindeki bir Kafka konusundan mesaj gönderip alırken akış işleme uygulamalarındaki mesajları doğrulamak için. Bu yazı için Avro şemasını kullansak da aynı yaklaşım ve konsept JSON şemaları için de geçerlidir.

Kullanım örneği

Tek boynuzlu at gezileri sunan hayali bir araç paylaşımı şirketini varsayalım. Eyleme geçirilebilir bilgiler elde etmek için, tek boynuzlu ata binme isteği mesajlarının akışını işlemeleri gerekiyor. Yolculukların çok popüler olmasını bekliyorlar ve çözümlerinin ölçeklenebilir olduğundan emin olmak istiyorlar. Ayrıca tüm akış ve operasyon verilerinin analiz için saklandığı merkezi bir veri gölü de inşa ediyorlar. Müşteri takıntılıdırlar, bu nedenle gelecekteki yolculuklara tek boynuzlu atınızın saç rengini seçmek gibi yeni eğlenceli özellikler eklemeyi bekliyorlar ve bu özellikleri yolculuk isteği mesajlarına yansıtmaları gerekecek. Gelecekteki şema değişiklikleri nedeniyle aşağı akış uygulamalarında sorunları önlemek için, mesajları merkezi bir şema kayıt defterinde barındırılan bir şema ile doğrulayacak bir mekanizmaya ihtiyaçları vardır. Şemaların merkezi bir şema kaydında bulunması, uygulama ekiplerinin şemaları tek bir yerde yayınlamasını, doğrulamasını, geliştirmesini ve bakımını yapmasını kolaylaştırır.

Çözüme genel bakış

Şirket, tek boynuzlu ata binme isteği mesajlarını geniş ölçekte yakalamak ve dağıtmak için Amazon MSK'yı kullanıyor. Zengin veri yapıları sağlaması, JSON'a doğrudan eşlemeyi desteklemesi ve kompakt, hızlı ve ikili veri formatını desteklemesi nedeniyle tek boynuzlu yolculuk istekleri için bir Avro şeması tanımlarlar. Şema üzerinde önceden anlaşmaya varıldığı için Avro'yu kullanmaya karar verdiler SpecificRecord.SpecificRecord Avro kütüphanesinden bir Avro kaydının POJO olarak kullanılmasına izin veren bir arayüzdür. Bu, şemadan bir Java sınıfı (veya sınıfları) oluşturularak yapılır. avro-maven-plugin. Onlar kullanırlar AWS Kimlik ve Erişim Yönetimi (BEN) hesaplar arası roller diğer AWS hesabındaki üretici ve tüketici uygulamalarının merkezi Schema Registry hesabındaki şemalara güvenli ve güvenli bir şekilde erişmesine izin vermek.

AWS Glue Schema Registry, Hesap B'de yer alırken, MSK kümesi ile Kafka üretici ve tüketici uygulamaları Hesap A'dadır. AWS Glue Schema Registry'ye hesaplar arası erişimi etkinleştirmek için aşağıdaki iki IAM rolünü kullanıyoruz. AWS Glue Schema Registry'nin kaynak tabanlı politikaları desteklememesi nedeniyle A Hesabındaki Apache Kafka istemcileri, kimlik tabanlı bir politika kullanarak B Hesabında bir rol üstlenir.

  • Hesap A IAM rolü – Üretici ve tüketici uygulamalarının Hesap B'de bir IAM rolü üstlenmesine olanak tanır.
  • B Hesabı IAM rolü – A Hesabındaki tüm IAM sorumlularına güvenir ve B Hesabındaki AWS Glue Schema Kayıt Defterinde okuma eylemleri gerçekleştirmelerine izin verir. Gerçek bir kullanım senaryosunda, hesaplar arası rolleri üstlenebilen IAM sorumlularının kapsamı daha spesifik olarak belirlenmelidir.

Aşağıdaki mimari diyagram çözümü göstermektedir:

Çözüm şu şekilde çalışır:

  1. Hesap A'da çalışan bir Kafka üreticisi, Hesap B'de hesaplar arası Şema Kayıt Defteri IAM rolünü şu çağrıyı yaparak üstlenir: AWS Security Token Hizmeti (AWS STS) assumeRole API.
  2. Kafka üreticisi, tek boynuzlu at sürüş isteği POJO'ya gömülü şema için AWS Glue Schema Registry'den tek boynuzlu at sürüş isteği Avro şema sürüm kimliğini alır. Şema sürüm kimliğinin getirilmesi, AWS Glue Schema Registry SerDe'nin serileştiricisi tarafından dahili olarak yönetilir. Seri hale getiricinin Kafka üretici konfigürasyonunun bir parçası olarak yapılandırılması gerekir.
  3. Şema AWS Glue Schema Registry'de mevcutsa seri hale getirici, veri kaydını şema sürüm kimliğiyle süsler ve ardından onu MSK kümesindeki Kafka konusuna teslim etmeden önce serileştirir.
  4. A Hesabında çalışan Kafka tüketicisi, AWS STS'yi çağırarak B Hesabında hesaplar arası Şema Kayıt Defteri IAM rolünü üstlenir. assumeRole API.
  5. Kafka tüketicisi, veri kayıtları için MSK kümesinde Kafka konusunu yoklamaya başlar.
  6. Kafka tüketicisi, tek boynuzlu at yolculuğu isteği Avro şemasını AWS Glue Schema Registry'den alır ve tek boynuzlu at yolculuğu isteği veri kaydında kodlanan şema sürüm kimliğiyle eşleşir. Şema getiriliyor
    a, AWS Glue Schema Registry SerDe'nin seri durumdan çıkarma aracı tarafından dahili olarak yönetilir. Seri durumdan çıkarıcının Kafka tüketici yapılandırmasının bir parçası olarak yapılandırılması gerekir. Şema, AWS Glue Schema Registry'de mevcutsa seri durumdan çıkarıcı, tüketicinin bunu işlemesi için veri kaydını unicorn sürüş isteği POJO'ya seri durumdan çıkarır.

AWS Glue Schema Registry SerDe kitaplığı, veri aktarımlarından tasarruf etmek için isteğe bağlı sıkıştırma yapılandırmasını da destekler. Şema Kaydı hakkında daha fazla bilgi için bkz. Şema Kaydı nasıl çalışır?.

Tek boynuzlu at sürüş isteği Avro şeması

Aşağıdaki şema (UnicornRideRequest.avsc), müşteri özellikleri ve sistem tarafından önerilen tek boynuzlu at niteliklerinin yanı sıra yolculuk isteği niteliklerini içeren, tek boynuzlu at yolculuğu talebini temsil eden bir kaydı tanımlar:

{ "type": "record", "name": "UnicornRideRequest", "namespace": "demo.glue.schema.registry.avro", "fields": [ {"name": "request_id", "type": "int", "doc": "customer request id"}, {"name": "pickup_address","type": "string","doc": "customer pickup address"}, {"name": "destination_address","type": "string","doc": "customer destination address"}, {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"}, {"name": "ride_duration","type": "int","doc": "ride duration in minutes"}, {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"}, { "name": "recommended_unicorn", "type": { "type": "record", "name": "RecommendedUnicorn", "fields": [ {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"}, {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}}, {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"} ] } }, { "name": "customer", "type": { "type": "record", "name": "Customer", "fields": [ {"name": "customer_account_no","type": "int", "doc": "customer account number"}, {"name": "first_name","type": "string"}, {"name": "middle_name","type": ["null","string"], "default": null}, {"name": "last_name","type": "string"}, {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]}, {"name": "customer_address","type": "string","doc": "customer address"}, {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"}, {"name": "customer_rating", "type": ["null", "int"], "default": null} ] } } ] }

Önkoşullar

Bu çözümü kullanmak için iki AWS hesabınızın olması gerekir:

Bu çözüm için Bölgeyi kullanıyoruz us-east-1, ancak bunu gereksinimlerinize göre değiştirebilirsiniz.

Daha sonra, her hesaptaki kaynakları aşağıdakileri kullanarak oluştururuz: AWS CloudFormation şablonlar.

Hesap B'de kaynak oluşturun

B Hesabında aşağıdaki kaynakları oluşturuyoruz:

  • Bir şema kaydı
  • Bir Avro şeması
  • Bir IAM rolü AWSGlueSchemaRegistryReadonlyAccess yönetilen politika ve tüm A Hesabı IAM sorumlularının bunu üstlenmesine olanak tanıyan bir örnek profili
  • The UnicornRideRequest.avsc CloudFormation şablonunda şema tanımı olarak kullanılan, daha önce gösterilen Avro şeması

Bu kaynakları oluşturmak için uygun izinlere sahip olduğunuzdan emin olun.

  1. B Hesabına giriş yapın.
  2. Aşağıdakileri başlatın CloudFormation yığını.
  3. İçin Yığın adı, girmek SchemaRegistryStack.
  4. İçin Şema Kayıt Defteri adı, girmek unicorn-ride-request-registry.
  5. İçin Avro Şema adı, girmek unicorn-ride-request-schema-avro.
  6. Kafka istemcisinin AWS hesap kimliği için Hesap A Kimliğinizi girin.
  7. İçin Harici Kimlikbenzersiz bir rastgele kimlik girin (örneğin, demo10A), bu hesapta IAM rolünü üstlenirken Hesap A'daki Kafka müşterileri tarafından sağlanmalıdır.

Hesaplar arası güvenlik hakkında daha fazla bilgi için bkz. Karışık milletvekili sorunu.

  1. Yığın tamamlandığında, Çıkışlar yığının sekmesindeki değeri kopyalayın CrossAccountGlueSchemaRegistryRoleArn.

Hesap A'da oluşturulan Kafka üretici ve tüketici uygulamaları, Hesap B'deki Şema Kaydına ve şemaya erişmek için bu rolü üstlenir.

  1. Kaynakların oluşturulduğunu doğrulamak için AWS Glue konsolunda şunu seçin: Şema kayıtları Gezinme çubuğunda ve bulun unicorn-ride-request-registry.
  2. Kayıt defterini seçin unicorn-ride-request-registry ve içerdiğini doğrulayın unicorn-ride-request-schema-avro içinde Şemalar Bölüm.
  3. İçeriğini görmek için şemayı seçin.

Tarafından oluşturulan IAM rolü SchemaRegistryStack yığın, tüm A Hesabı IAM sorumlularının bunu üstlenmesine ve AWS Glue Schema Registry'de okuma eylemleri gerçekleştirmesine olanak tanır. IAM rolünün güven ilişkilerine bakalım.

  1. Üzerinde SchemaRegistryStack yığın Çıkışlar sekmesi, değerini kopyalayın CrossAccountGlueSchemaRegistryRoleName.
  2. IAM konsolunda bu rolü arayın.
  3. Klinik Güven ilişkileri ve A Hesabının listelendiğini doğrulamak için güvenilir varlıklarına bakın.
  4. içinde Koşullar bölümünde bunu onaylayın sts:ExternalId Yığın oluşturma sırasında sağlanan aynı benzersiz rastgele kimliğe sahiptir.

A Hesabında kaynak oluşturun

A Hesabında aşağıdaki kaynakları oluşturuyoruz:

  • Bir VPC
  • Kafka üreticisi ve tüketicisi için EC2 bulut sunucuları
  • Bir AWS Cloud9 ortamı
  • MSK kümesi

Ön koşul olarak, EC2 bulut sunucularına SSH yapabilmek için bir EC2 anahtar çifti oluşturun ve bunu makinenize indirin. Ayrıca bir oluştur MSK kümesi yapılandırması varsayılan değerlerle. CloudFormation'ı oluşturmak için izinlere sahip olmanız gerekir
yığın, EC2 bulut sunucuları, AWS Cloud9 ortamı, MSK kümesi, MSK kümesi yapılandırması ve IAM rolü.

  1. A Hesabına giriş yapın.
  2. Aşağıdakileri başlatın CloudFormation yığını VPC, EC2 bulut sunucularını ve AWS Cloud9 ortamını başlatmak için.
  3. İçin Yığın adı, girmek MSKClientStack.
  4. VPC ve alt ağ CIDR aralıklarını sağlayın.
  5. İçin EC2 Anahtar Çiftimevcut bir EC2 anahtar çiftini seçin.
  6. En son EC2 AMI Kimliği için varsayılan seçeneği seçin.
  7. Hesaplar arası IAM rolü ARN'si için şu değeri kullanın: CrossAccountGlueSchemaRegistryRoleArn (üzerinde mevcut Çıkışlar of tab SchemaRegistryStack).
  8. Yığın başarıyla oluşturulmasını bekleyin.
  9. Aşağıdakileri başlatın CloudFormation yığını MSK kümesini oluşturmak için.
  10. İçin Yığın adı, girmek MSKClusterStack.
  11. Amazon MSK 2.7.1 sürümünü kullanın.
  12. MSK kümesi yapılandırması ARN'si için MSK kümesi yapılandırması ARN'sini girin. Önkoşulun bir parçası olarak oluşturduğunuz bir tane.
  13. MSK kümesi yapılandırması revizyon numarası için 1 girin veya sürümünüze göre değiştirin.
  14. İstemci CloudFormation yığın adı için şunu girin: MSKClientStack (bu yığından önce oluşturduğunuz yığın adı).

Kafka yapımcısını yapılandırma

Kafka üreticisinin merkezi AWS hesabındaki Şema Kaydına erişmesini yapılandırmak için aşağıdaki adımları tamamlayın:

  1. A Hesabına giriş yapın.
  2. AWS Cloud9 konsolunda şunu seçin: Cloud9EC2Bastion tarafından oluşturulan ortam MSKClientStack yığını.
  3. Üzerinde fileto menü seç Yerel Dosyaları Yükle.
  4. Yığını oluştururken daha önce kullandığınız EC2 anahtar çifti dosyasını yükleyin.
  5. Yeni bir terminal açın ve EC2 anahtar çifti izinlerini değiştirin:
    chmod 0400 <keypair PEM file>

  6. içine SSH KafkaProducerInstance EC2 örneğini seçin ve Bölgeyi gereksiniminize göre ayarlayın:
    ssh -i <keypair PEM file> ec2-user@<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. Ortam değişkenini ayarlayın MSK_CLUSTER_ARN MSK kümesinin ARN'sine işaret ederek:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Değiştir .ClusterName MSK kümesi CloudFormation yığını için farklı bir ad kullandıysanız koddaki değer. Küme adı yığın adıyla aynıdır.

  1. Ortam değişkenini ayarlayın BOOTSTRAP_BROKERS önyükleme komisyoncularına işaret ederek:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Ortam değişkenlerini doğrulayın:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  3. adında bir Kafka konusu oluşturun unicorn-ride-request-topic Kafka üreticisi ve tüketici uygulamaları tarafından daha sonra kullanılan MSK kümenizde:
    cd ~/kafka ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --topic unicorn-ride-request-topic --create --partitions 3 --replication-factor 2 ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

The MSKClientStack yığın, Kafka yapımcı istemcisi JAR dosyasını kopyaladı kafka-cross-account-gsr-producer.jar için KafkaProducerInstance misal. MSK kümesindeki Kafka konusu unicorn-ride-request-topic'e mesaj gönderen Kafka yapımcı istemcisini içerir ve unicorn-ride-request-schema-avro Avro şeması unicorn-ride-request-registry Hesap B'deki şema kaydı. Bu yazının ilerleyen kısımlarında ele alacağımız Kafka yapımcı koduna şuradan ulaşılabilir: GitHub.

  1. Aşağıdaki komutları çalıştırın ve doğrulayın kafka-cross-account-gsr-producer.jar var:
    cd ~
    ls -ls

  2. Kafka yapımcısını çalıştırmak için aşağıdaki komutu çalıştırın. KafkaProducerInstance terminal:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka producer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -nm 500 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

Kod aşağıdaki parametrelere sahiptir:

  • -bs - $BOOTSTRAP_BROKERS (MSK kümesi önyükleme aracıları)
  • -rn - CrossAccountGlueSchemaRegistryRoleArn değer SchemaRegistryStack Hesap B'deki yığın çıktıları
  • -başlık – Kafka konusu unicorn-ride-request-topic
  • -reg - us-east-1 (Bölgenize göre değiştirin, AWS STS uç noktası ve Şema Kaydı için kullanılır)
  • -nm: 500 (prodüktör uygulamasının Kafka konusuna gönderdiği mesaj sayısı)
  • -harici kimlik – Aynı harici kimlik (örneğin, demo10A) Hesap B'de CloudFormation yığınını oluştururken kullandığınız

Aşağıdaki ekran görüntüsünde Kafka yapımcı günlükleri gösterilmektedir: Schema Version Id received...bu, Avro şemasını aldığı anlamına gelir unicorn-ride-request-schema-avro Hesap B'den ve Hesap A'daki MSK kümesindeki Kafka konusuna iletiler gönderildi.

Kafka yapımcı kodu

Kafka yapımcı uygulamasının tamamı şu adreste mevcuttur: GitHub. Bu bölümde kodu parçalara ayırıyoruz.

  • getProducerConfig() Aşağıdaki kodda gösterildiği gibi üretici özelliklerini başlatır:
    • VALUE_SERIALIZER_CLASS_CONFIG - GlueSchemaRegistryKafkaSerializer.class.getName() Veri kayıtlarını serileştiren AWS serileştirici uygulaması (uygulama şu adreste mevcuttur: GitHub)
    • REGISTRY_NAME – B Hesabındaki Şema Kaydı
    • SCHEMA_NAME – Hesap B'deki şema adı
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "-1"); props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name()); props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName); props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry"); props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro"); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startProducer() Hesap B'deki Şema Kaydına bağlanabilmek için B Hesabındaki rolü üstlenir ve MSK kümesindeki Kafka konusuna mesajlar gönderir:
public void startProducer() { assumeGlueSchemaRegistryRole(); KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<String,UnicornRideRequest>(getProducerConfig()); int numberOfMessages = Integer.valueOf(str_numOfMessages); logger.info("Starting to send records..."); for(int i = 0;i < numberOfMessages;i ++) { UnicornRideRequest rideRequest = getRecord(i); String key = "key-" + i; ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest); producer.send(record, new ProducerCallback()); } }

  • assumeGlueSchemaRegistryRole() aşağıdaki kodda gösterildiği gibi, Hesap B'de hesaplar arası Schema Registry IAM rolünü üstlenmek için AWS STS'yi kullanır. (Daha fazla bilgi için bkz. IAM'de geçici güvenlik kimlik bilgileri.) Yanıt stsClient.assumeRole(roleRequest) aşağıdakileri içeren geçici kimlik bilgilerini içerir: accessKeyId, secretAccessKeyVe sessionToken. Daha sonra sistem özelliklerinde geçici kimlik bilgilerini ayarlar. Java için AWS SDK Schema Registry'ye erişirken (Schema Registry seri hale getirici aracılığıyla) bu kimlik bilgilerini kullanır. Daha fazla bilgi için bakınız Kimlik Bilgilerini Kullanma.
    public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

  • createUnicornRideRequest() oluşturmak için Avro şeması (tek boynuzlu at sürüş isteği şeması) tarafından oluşturulan sınıfları kullanır. SpecificRecord. Bu yazı için, tek boynuzlu ata binme isteği öznitelik değerleri bu yöntemde sabit kodlanmıştır. Aşağıdaki koda bakın:
    public UnicornRideRequest getRecord(int requestId){ /* Initialise UnicornRideRequest object of class that is generated from AVRO Schema */ UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder() .setRequestId(requestId) .setPickupAddress("Melbourne, Victoria, Australia") .setDestinationAddress("Sydney, NSW, Aus") .setRideFare(1200.50F) .setRideDuration(120) .setPreferredUnicornColor(UnicornPreferredColor.WHITE) .setRecommendedUnicorn(RecommendedUnicorn.newBuilder() .setUnicornId(requestId*2) .setColor(unicorn_color.WHITE) .setStarsRating(5).build()) .setCustomer(Customer.newBuilder() .setCustomerAccountNo(1001) .setFirstName("Dummy") .setLastName("User") .setEmailAddresses(Arrays.asList("demo@example.com")) .setCustomerAddress("Flinders Street Station") .setModeOfPayment(ModeOfPayment.CARD) .setCustomerRating(5).build()).build(); logger.info(rideRequest.toString()); return rideRequest; }

Kafka tüketicisini yapılandırma

The MSKClientStack yığın oluşturdu KafkaConsumerInstance Kafka tüketici uygulaması için örnek. Yığın tarafından oluşturulan tüm bulut sunucularını Amazon EC2 konsolunda görüntüleyebilirsiniz.

Kafka tüketicisinin merkezi AWS hesabındaki Şema Kayıt Defterine erişmesini yapılandırmak için aşağıdaki adımları tamamlayın:

  1. Yeni bir terminal açın Cloud9EC2Bastion AWS Cloud9 ortamı.
  2. içine SSH KafkaConsumerInstance EC2 örneğini seçin ve Bölgeyi gereksiniminize göre ayarlayın:
    ssh -i <keypair PEM file> ec2-user@<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. Ortam değişkenini ayarlayın MSK_CLUSTER_ARN MSK kümesinin ARN'sine işaret ederek:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

Değiştir .ClusterName MSK kümesi CloudFormation yığını için farklı bir ad kullandıysanız değer. Küme adı yığın adıyla aynıdır.

  1. Ortam değişkenini ayarlayın BOOTSTRAP_BROKERS önyükleme komisyoncularına işaret ederek:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Ortam değişkenlerini doğrulayın:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

The MSKClientStack yığın, Kafka tüketici istemcisi JAR dosyasını kopyaladı. kafka-cross-account-gsr-consumer.jar için KafkaConsumerInstance misal. Kafka konusundaki mesajları okuyan Kafka tüketici istemcisini içerir. unicorn-ride-request-topic MSK kümesinde bulunur ve unicorn-ride-request-schema-avro Avro şeması unicorn-ride-request-registry B Hesabındaki kayıt. Bu yazının ilerleyen kısımlarında ele alacağımız Kafka tüketici kodu şu adreste mevcuttur: GitHub.

  1. Aşağıdaki komutları çalıştırın ve doğrulayın kafka-cross-account-gsr-consumer.jar var:
    cd ~
    ls -ls

  2. Kafka tüketicisini çalıştırmak için aşağıdaki komutu çalıştırın. KafkaConsumerInstance terminal:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka consumer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

Kod aşağıdaki parametrelere sahiptir:

  • -bs - $BOOTSTRAP_BROKERS (MSK kümesi önyükleme aracıları)
  • -rn - CrossAccountGlueSchemaRegistryRoleArn değer SchemaRegistryStack Hesap B'deki yığın çıktıları
  • -başlık – Kafka konusu unicorn-ride-request-topic
  • -reg - us-east-1 (Bölgenize göre değiştirin, AWS STS uç noktası ve Şema Kaydı için kullanılır)
  • -harici kimlik – Aynı harici kimlik (örneğin, demo10A) Hesap B'de CloudFormation yığınını oluştururken kullandığınız

Aşağıdaki ekran görüntüsü, Kafka tüketici günlüklerinin, Hesap A'daki MSK kümesindeki Kafka konusundaki mesajları başarıyla okuduğunu ve Avro şemasına eriştiğini göstermektedir. unicorn-ride-request-schema-avro itibaren unicorn-ride-request-registry B Hesabındaki şema kaydı.

Benzer günlükleri görüyorsanız, bu, hem Kafka tüketici uygulamalarının, Hesap B'deki merkezi Şema Kaydı ile başarılı bir şekilde bağlantı kurabildiği, hem de Hesap A'daki MSK kümesinden mesaj gönderip tüketirken mesajları doğrulayabildiği anlamına gelir.

Kafka tüketici kodu

Kafka tüketici uygulamasının tamamı şu adreste mevcuttur: GitHub. Bu bölümde kodu parçalara ayırıyoruz.

  • getConsumerConfig() aşağıdaki kodda gösterildiği gibi tüketici özelliklerini başlatır:
    • VALUE_DESERİALIZER_CLASS_CONFIG - GlueSchemaRegistryKafkaDeserializer.class.getName() AWS seri durumdan çıkarıcı uygulaması, SpecificRecord Şema Kayıt Defterindeki kodlanmış şema kimliğine göre (uygulama şu adreste mevcuttur: GitHub).
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startConsumer() Hesap B'deki Şema Kaydına bağlanabilmek için Hesap B'deki rolü üstlenir ve MSK kümesindeki Kafka konusundan gelen mesajları okur:
public void startConsumer() { logger.info("starting consumer..."); assumeGlueSchemaRegistryRole(); KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig()); consumer.subscribe(Collections.singletonList(topic)); int count = 0; while (true) { final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000)); for (final ConsumerRecord<String, UnicornRideRequest> record : records) { final UnicornRideRequest rideRequest = record.value(); logger.info(String.valueOf(rideRequest.getRequestId())); logger.info(rideRequest.toString()); } }
}

  • assumeGlueSchemaRegistryRole() aşağıdaki kodda gösterildiği gibi, Hesap B'de hesaplar arası Schema Registry IAM rolünü üstlenmek için AWS STS'yi kullanır. stsClient.assumeRole(roleRequest) aşağıdakileri içeren geçici kimlik bilgilerini içerir: accessKeyId, secretAccessKeyVe sessionToken. Daha sonra sistem özelliklerinde geçici kimlik bilgilerini ayarlar. Java SDK'sı, Schema Registry'ye erişirken (Schema Registry serileştiricisi aracılığıyla) bu kimlik bilgilerini kullanır. Daha fazla bilgi için bakınız Kimlik Bilgilerini Kullanma.
public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

Avro şema sınıflarını derleyin ve oluşturun

Uygulamanızı oluşturma ve dağıtmanın diğer tüm aşamalarında olduğu gibi, şema derlemesi ve Avro şema sınıfları oluşturma süreci de CI/CD ardışık düzeninize dahil edilmelidir. Avro şema sınıfları oluşturmanın birden fazla yolu vardır; kullanırız avro-maven-plugin Bu yazı için. CI/CD işlemi ayrıca şunları kullanabilir: avro-tools sınıflar oluşturmak için Avro şemasını derlemek. Aşağıdaki kod nasıl kullanabileceğinizi gösteren bir örnektir avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination> //compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .

Uygulamaya genel bakış

Özetlemek gerekirse, merkezi data lake hesabı olan Hesap B'deki AWS Glue Schema Registry'de unicorn yolculuk istek mesajı için bir Avro şeması tanımlayıp kaydetmekle başlıyoruz. Hesap A'da, ilgili uygulama kodlarıyla birlikte bir MSK kümesi ve Kafka üreticisi ve tüketici EC2 örnekleri oluşturuyoruz (kafka-cross-account-gsr-consumer.jar ve kafka-cross-account-gsr-producer.jar) ve CloudFormation yığınını kullanarak bunlara konuşlandırın.

Üretici uygulamasını Hesap A'da çalıştırdığımızda serileştirici (GlueSchemaRegistryKafkaSerializer) yapılandırma tek boynuzlu at sürüş isteği şemasını alırken sağlanan AWS Glue Schema Registry SerDe kitaplığından (UnicornRideRequest.avsc) tek boynuzlu ata binme isteği mesajını serileştirmek için Hesap B'de bulunan merkezi Şema Kaydı'ndan. Hesap B ve Bölgede IAM rolünü (geçici kimlik bilgileri), şema kayıt defteri adını (unicorn-ride-request-registry) ve şema adı (unicorn-ride-request-schema-avro) merkezi Şema Kaydına bağlanmak için yapılandırma olarak sağlanır. Mesaj başarıyla serileştirildikten sonra yapımcı uygulaması onu Kafka konusuna (unicorn-ride-request-topic) MSK kümesinde.

Tüketici uygulamasını Hesap A'da çalıştırdığımızda seri durumdan çıkarıcı (GlueSchemaRegistryKafkaDeserializer) yapılandırma, Kafka konusundan okunan iletiden kodlanmış şema kimliğini çıkarırken sağlanan Schema Registry SerDe kitaplığından (unicorn-ride-request-topic) ve aynı kimliğe ait şemayı B Hesabındaki merkezi Şema Kaydı'ndan alır. Daha sonra mesajı seri durumdan çıkarır. Merkezi Şema Kayıt Defterine bağlanmak için yapılandırma olarak sağlanan Hesap B'deki ve Bölgedeki IAM rolünü (geçici kimlik bilgileri) kullanır. Tüketici uygulaması aynı zamanda Avro'nun SPECIFIC_RECORD seri durumdan çıkarıcıya mesajın belirli bir türde olduğu konusunda bilgi vermek için (tek boynuzlu at sürüş isteği). Mesaj seri durumdan başarıyla çıkarıldıktan sonra tüketici uygulaması onu gereksinimlere göre işler.

Temizlemek

Son adım temizlemektir. Gereksiz ücretlendirmelerden kaçınmak için bu gönderi için kullanılan CloudFormation yığınları tarafından oluşturulan tüm kaynakları kaldırmalısınız. Bunu yapmanın en basit yolu yığınları silmektir. İlk önce şunu sil MSKClusterStack ardından MSKClientStack A Hesabından. Ardından şunu silin: SchemaRegistryStack Hesap B'den.

Sonuç

Bu yazıda, Avro şeması kullanarak mesajları doğrulamak için AWS Glue Schema Registry'nin Amazon MSK ve akış işleme uygulamalarıyla nasıl kullanılacağını gösterdik. Schema Registry'nin merkezi bir AWS hesabında (data lake hesabı) bulunduğu ve Kafka üretici ve tüketici uygulamalarının ayrı bir AWS hesabında bulunduğu dağıtılmış bir mimari oluşturduk. Uygulama ekiplerinin şemaları tek bir yerde tutmasını verimli hale getirmek için merkezi hesaptaki şema kayıt defterinde bir Avro şeması oluşturduk. AWS Glue Schema Registry kimlik tabanlı erişim politikalarını desteklediğinden, ayrı bir hesapta çalışan Kafka üretici ve tüketici uygulamalarının mesajları doğrulamak amacıyla merkezi hesaptan şemaya güvenli bir şekilde erişmesine izin vermek için hesaplar arası IAM rolünü kullandık. Avro şeması önceden kararlaştırıldığı için Avro'yu kullandık SpecificRecord derleme zamanında tür güvenliğini sağlamak ve istemci tarafında çalışma zamanı şeması doğrulama sorunlarını önlemek için. Bu gönderi için kullanılan kod şu adreste mevcuttur: GitHub referans için.

Bu çözümdeki hizmetler ve kaynaklar hakkında daha fazla bilgi edinmek için bkz. AWS Tutkal Şeması Kayıt Defteri, Amazon MSK Geliştirici Kılavuzu, AWS Glue Schema Registry SerDe kitaplığı, ve IAM öğreticisi: IAM rollerini kullanarak AWS hesaplarında erişim yetkisi verin.


Yazar Hakkında

Vikas Bajaj Amazon Web Service'te Baş Çözüm Mimarıdır. Vikas, dijital yerli müşterilerle çalışır ve onlara teknoloji mimarisi ve modelleme ile stratejik iş hedeflerini karşılamaya yönelik seçenekler ve çözümler konusunda tavsiyelerde bulunur. Tasarımların ve çözümlerin verimli, sürdürülebilir ve mevcut ve gelecekteki iş ihtiyaçlarına uygun olmasını sağlar. Mimarlık ve teknoloji tartışmalarının yanı sıra kriket izlemekten ve oynamaktan hoşlanıyor.

spot_img

En Son İstihbarat

spot_img

Bizimle sohbet

Merhaba! Size nasıl yardım edebilirim?