شعار زيفيرنت

تحقق من صحة البيانات المتدفقة عبر Amazon MSK باستخدام المخططات في سجل AWS Glue Schema متعدد الحسابات

التاريخ:

تواجه الأعمال التجارية اليوم نموًا غير مسبوق في حجم البيانات. يتم إنشاء جزء متزايد من البيانات في الوقت الفعلي بواسطة أجهزة إنترنت الأشياء والمواقع الإلكترونية وتطبيقات الأعمال ومصادر أخرى متنوعة. تحتاج الشركات إلى معالجة هذه البيانات وتحليلها بمجرد وصولها لاتخاذ قرارات العمل في الوقت الفعلي. Amazon Managed Streaming لأباتشي كافكا (Amazon MSK) هي خدمة مُدارة بالكامل تتيح إنشاء تطبيقات معالجة التدفق وتشغيلها التي تستخدم Apache Kafka لجمع البيانات ومعالجتها في الوقت الفعلي.

تطبيقات المعالجة المتدفقة باستخدام Apache Kafka لا تتواصل مع بعضها البعض بشكل مباشر ؛ يتواصلون عبر إرسال واستقبال الرسائل حول مواضيع كافكا. لكي تتواصل تطبيقات معالجة الدفق بكفاءة وثقة ، يجب تحديد بنية حمولة الرسائل من حيث السمات وأنواع البيانات. يصف هذا الهيكل تطبيقات المخطط التي تستخدمها عند إرسال الرسائل واستلامها. ومع ذلك ، مع وجود عدد كبير من تطبيقات المنتج والمستهلك ، حتى التغيير البسيط في المخطط (إزالة حقل أو إضافة حقل جديد أو تغيير نوع البيانات) قد يتسبب في حدوث مشكلات للتطبيقات النهائية التي يصعب تصحيحها وإصلاحها.

تقليديًا ، اعتمدت الفرق على عمليات إدارة التغيير (مثل الموافقات ونافذة الصيانة) أو الآليات غير الرسمية الأخرى (الوثائق ورسائل البريد الإلكتروني وأدوات التعاون وما إلى ذلك) لإبلاغ بعضها البعض بتغييرات مخطط البيانات. ومع ذلك ، فإن هذه الآليات لا تتسع وتكون عرضة للأخطاء. ال سجل مخطط AWS Glue يسمح لك بنشر المخططات واكتشافها والتحكم فيها والتحقق منها وتطويرها مركزيًا لتطبيقات معالجة الدفق. مع ال سجل مخطط AWS Glue، يمكنك إدارة وتنفيذ المخططات على تطبيقات دفق البيانات باستخدام اباتشي كافكا، أمازون MSK ، الأمازون كينسيس دفق البيانات, تحليلات بيانات Amazon Kinesis لـ Apache Flinkو AWS لامدا.

يوضح هذا المنشور كيف تتحقق تطبيقات معالجة دفق Apache Kafka من صحة الرسائل باستخدام ملف اباتشي افرو تم تخزين المخطط في ملف سجل AWS Glue Schema مقيم في حساب AWS مركزي. نحن نستخدم ال مكتبة SerDe لسجل مخطط الغراء في AWS و أفرو SpecificRecord للتحقق من صحة الرسائل في تطبيقات معالجة الدفق أثناء إرسال الرسائل واستلامها من موضوع كافكا على مجموعة Amazon MSK. على الرغم من أننا نستخدم مخطط Avro لهذا المنشور ، إلا أن نفس النهج والمفهوم ينطبقان على مخططات JSON أيضًا.

حالة الاستخدام

دعنا نفترض شركة مشاركة خيالية تقدم رحلات أحادية القرن. لرسم رؤى قابلة للتنفيذ ، يحتاجون إلى معالجة دفق من رسائل طلب ركوب يونيكورن. إنهم يتوقعون أن تحظى المشاوير بشعبية كبيرة ويريدون التأكد من أن حلولهم يمكن توسيع نطاقها. يقومون أيضًا ببناء بحيرة بيانات مركزية حيث يتم تخزين جميع بيانات التدفق والتشغيل لتحليلها. إنهم مهووسون بالعملاء ، لذلك يتوقعون إضافة ميزات ممتعة جديدة إلى الرحلات المستقبلية ، مثل اختيار لون شعر وحيد القرن الخاص بك ، وسيحتاجون إلى عكس هذه السمات في رسائل طلب الركوب. لتجنب المشكلات في التطبيقات المتلقية للمعلومات بسبب تغييرات المخطط المستقبلية ، يحتاجون إلى آلية للتحقق من صحة الرسائل باستخدام مخطط مستضاف في سجل مخطط مركزي. إن وجود المخططات في سجل مخطط مركزي يسهل على فرق التطبيق نشر المخططات والتحقق منها وتطويرها وصيانتها في مكان واحد.

حل نظرة عامة

تستخدم الشركة Amazon MSK لالتقاط وتوزيع رسائل طلب ركوب يونيكورن على نطاق واسع. يعرّفون مخطط Avro لطلبات ركوب الأحادي القرن لأنه يوفر هياكل بيانات غنية ، ويدعم التعيين المباشر إلى JSON ، بالإضافة إلى تنسيق بيانات مضغوط وسريع وثنائي. نظرًا لأنه تم الاتفاق على المخطط مسبقًا ، فقد قرروا استخدام Avro SpecificRecord.SpecificRecord هي واجهة من مكتبة Avro تسمح باستخدام سجل Avro باعتباره POJO. يتم ذلك عن طريق إنشاء فئة (أو فئات) Java من المخطط ، باستخدام avro-maven-plugin. هم يستخدمون إدارة الهوية والوصول AWS (انا) عبر الأدوار للسماح لتطبيقات المنتج والمستهلك من حساب AWS الآخر بالوصول الآمن والآمن إلى المخططات في حساب Schema Registry المركزي.

يوجد AWS Glue Schema Registry في الحساب ب ، بينما توجد مجموعة MSK وتطبيقات المنتج والمستهلك من كافكا في الحساب أ. نستخدم أدوار IAM التالية لتمكين الوصول عبر الحسابات إلى AWS Glue Schema Registry. يفترض عملاء Apache Kafka في الحساب A دورًا في الحساب B باستخدام سياسة قائمة على الهوية لأن AWS Glue Schema Registry لا يدعم السياسات القائمة على الموارد.

  • الحساب A دور IAM - يسمح لتطبيقات المنتج والمستهلك بتولي دور IAM في الحساب ب.
  • دور الحساب ب IAM - يثق في جميع مبادئ IAM من الحساب A ويسمح لهم بتنفيذ إجراءات القراءة على AWS Glue Schema Registry في الحساب ب. في سيناريو حالة الاستخدام الحقيقي ، يجب تحديد نطاق مبادئ IAM التي يمكنها تولي الأدوار عبر الحسابات بشكل أكثر تحديدًا.

يوضح مخطط العمارة التالي الحل:

الحل يعمل على النحو التالي:

  1. يفترض منتج كافكا الذي يعمل في الحساب أ دور IAM لسجل المخطط عبر الحسابات في الحساب ب عن طريق استدعاء خدمة رمز الأمان من AWS (أوس ستس) assumeRole API.
  2. يسترد منتج كافكا معرّف إصدار مخطط Avro لطلب ركوب أحادي القرن من AWS Glue Schema Registry للمخطط المضمّن في POJO لطلب ركوب أحادي القرن. تتم إدارة جلب معرف إصدار المخطط داخليًا بواسطة مُسلسل AWS Glue Schema Registry SerDe. يجب تكوين المسلسل كجزء من تكوين منتج كافكا.
  3. إذا كان المخطط موجودًا في AWS Glue Schema Registry ، يقوم المُسلسل بتزيين سجل البيانات بمعرف إصدار المخطط ثم يقوم بتسلسله قبل تسليمه إلى موضوع كافكا في مجموعة MSK.
  4. يفترض مستهلك كافكا الذي يعمل في الحساب أ دور IAM لسجل المخطط عبر الحسابات في الحساب ب من خلال استدعاء AWS STS assumeRole API.
  5. يبدأ مستهلك كافكا في استطلاع موضوع كافكا في كتلة MSK لسجلات البيانات.
  6. يسترد مستهلك كافكا طلب ركوب أحادي القرن مخطط Avro من AWS Glue Schema Registry ، والذي يطابق معرف إصدار المخطط المشفر في سجل بيانات طلب ركوب يونيكورن. إحضار المخطط
    تتم إدارة a داخليًا بواسطة أداة إزالة التسلسل من AWS Glue Schema Registry SerDe. يجب تكوين جهاز إلغاء التسلسل كجزء من تكوين مستهلك كافكا. إذا كان المخطط موجودًا في AWS Glue Schema Registry ، يقوم برنامج إلغاء التسلسل بإلغاء تسلسل سجل البيانات في طلب ركوب أحادي القرن POJO للمستهلك لمعالجته.

تدعم مكتبة AWS Glue Schema Registry SerDe أيضًا تكوين الضغط الاختياري لحفظ عمليات نقل البيانات. لمزيد من المعلومات حول Schema Registry ، راجع كيف يعمل Schema Registry.

طلب ركوب يونيكورن مخطط أفرو

المخطط التالي (UnicornRideRequest.avsc) سجل يمثل طلب ركوب وحيد القرن ، والذي يحتوي على سمات طلب الركوب جنبًا إلى جنب مع سمات العميل وسمات يونيكورن الموصى بها من قبل النظام:

{ "type": "record", "name": "UnicornRideRequest", "namespace": "demo.glue.schema.registry.avro", "fields": [ {"name": "request_id", "type": "int", "doc": "customer request id"}, {"name": "pickup_address","type": "string","doc": "customer pickup address"}, {"name": "destination_address","type": "string","doc": "customer destination address"}, {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"}, {"name": "ride_duration","type": "int","doc": "ride duration in minutes"}, {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"}, { "name": "recommended_unicorn", "type": { "type": "record", "name": "RecommendedUnicorn", "fields": [ {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"}, {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}}, {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"} ] } }, { "name": "customer", "type": { "type": "record", "name": "Customer", "fields": [ {"name": "customer_account_no","type": "int", "doc": "customer account number"}, {"name": "first_name","type": "string"}, {"name": "middle_name","type": ["null","string"], "default": null}, {"name": "last_name","type": "string"}, {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]}, {"name": "customer_address","type": "string","doc": "customer address"}, {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"}, {"name": "customer_rating", "type": ["null", "int"], "default": null} ] } } ] }

المتطلبات الأساسية المسبقة

لاستخدام هذا الحل ، يجب أن يكون لديك حسابان على AWS:

لهذا الحل ، نستخدم المنطقة us-east-1، ولكن يمكنك تغيير هذا وفقًا لمتطلباتك.

بعد ذلك ، نقوم بإنشاء الموارد في كل حساب باستخدام تكوين سحابة AWS القوالب.

إنشاء الموارد في الحساب ب

نقوم بإنشاء الموارد التالية في الحساب ب:

  • سجل مخطط
  • مخطط أفرو
  • دور IAM مع AWSGlueSchemaRegistryReadonlyAccess السياسة المُدارة وملف تعريف المثيل ، والذي يسمح لجميع مديري الحساب A IAM بتوليها
  • UnicornRideRequest.avsc تم عرض مخطط Avro سابقًا ، والذي يتم استخدامه كتعريف مخطط في قالب CloudFormation

تأكد من أن لديك الأذونات المناسبة لإنشاء هذه الموارد.

  1. سجّل الدخول إلى الحساب ب.
  2. قم بتشغيل ما يلي مكدس CloudFormation.
  3. في حالة اسم المكدس، أدخل SchemaRegistryStack.
  4. في حالة اسم سجل المخطط، أدخل unicorn-ride-request-registry.
  5. في حالة اسم مخطط أفرو، أدخل unicorn-ride-request-schema-avro.
  6. بالنسبة لمعرف حساب AWS لعميل كافكا ، أدخل معرف حسابك.
  7. في حالة المعرف الخارجي، أدخل معرفًا عشوائيًا فريدًا (على سبيل المثال ، demo10A) ، والتي يجب أن يقدمها عملاء كافكا في الحساب أ أثناء تولي دور IAM في هذا الحساب.

لمزيد من المعلومات حول الأمان عبر الحسابات ، راجع نائب المرتبك مشكلة.

  1. عند اكتمال المكدس ، يتم تثبيت ملف النواتج علامة تبويب المكدس ، انسخ قيمة CrossAccountGlueSchemaRegistryRoleArn.

يتولى منتج كافكا وتطبيقات المستهلك التي تم إنشاؤها في الحساب أ هذا الدور للوصول إلى سجل المخطط والمخطط في الحساب ب.

  1. للتحقق من إنشاء الموارد ، في وحدة تحكم AWS Glue ، اختر سجلات المخطط في شريط التنقل ، وحدد موقع unicorn-ride-request-registry.
  2. اختر التسجيل unicorn-ride-request-registry وتحقق من أنه يحتوي على unicorn-ride-request-schema-avro في ال المخططات والقسم الخاص به.
  3. اختر المخطط لمعرفة محتواه.

دور IAM الذي أنشأه SchemaRegistryStack المكدس يسمح لجميع مبادئ Account A IAM بافتراض ذلك وتنفيذ إجراءات القراءة على AWS Glue Schema Registry. لنلق نظرة على علاقات الثقة لدور IAM.

  1. على SchemaRegistryStack كومة النواتج علامة التبويب ، انسخ قيمة CrossAccountGlueSchemaRegistryRoleName.
  2. في وحدة تحكم IAM ، ابحث عن هذا الدور.
  3. اختار علاقات الثقة وانظر إلى الكيانات الموثوقة بها لتأكيد أن الحساب أ مدرج.
  4. في مجلة الظروف قسم تأكيد ذلك sts:ExternalId له نفس المعرف العشوائي الفريد المقدم أثناء إنشاء المكدس.

إنشاء الموارد في الحساب أ

نقوم بإنشاء الموارد التالية في الحساب أ:

  • VPC
  • مثيلات EC2 لمنتج ومستهلك كافكا
  • بيئة AWS Cloud9
  • كتلة MSK

كشرط أساسي ، قم بإنشاء زوج مفاتيح EC2 وقم بتنزيله على جهازك لتتمكن من SSH في مثيلات EC2. قم أيضًا بإنشاء ملف تكوين كتلة MSK مع القيم الافتراضية. يجب أن يكون لديك أذونات لإنشاء CloudFormation
المكدس ومثيلات EC2 وبيئة AWS Cloud9 ومجموعة MSK وتكوين مجموعة MSK ودور IAM.

  1. سجّل الدخول إلى الحساب أ.
  2. قم بتشغيل ما يلي مكدس CloudFormation لتشغيل مثيلات VPC و EC2 و AWS Cloud9.
  3. في حالة اسم المكدس، أدخل MSKClientStack.
  4. قم بتوفير نطاقات CIDR والشبكة الفرعية VPC.
  5. في حالة زوج المفاتيح EC2، اختر زوج مفاتيح EC2 موجود.
  6. للحصول على أحدث معرف EC2 AMI ، حدد الخيار الافتراضي.
  7. بالنسبة لدور IAM عبر الحسابات ARN ، استخدم قيمة CrossAccountGlueSchemaRegistryRoleArn (متوفر على النواتج علامة التبويب SchemaRegistryStack).
  8. انتظر حتى يتم إنشاء المكدس بنجاح.
  9. قم بتشغيل ما يلي مكدس CloudFormation لإنشاء كتلة MSK.
  10. في حالة اسم المكدس، أدخل MSKClusterStack.
  11. استخدم الإصدار 2.7.1 من Amazon MSK.
  12. بالنسبة لتكوين كتلة MSK ARN ، أدخل تكوين مجموعة MSK ARN. واحد قمت بإنشائه كجزء من المتطلبات المسبقة.
  13. بالنسبة إلى رقم مراجعة تكوين مجموعة MSK ، أدخل 1 أو قم بتغييره وفقًا لإصدارك.
  14. بالنسبة إلى اسم مكدس CloudFormation للعميل ، أدخل MSKClientStack (اسم المكدس الذي قمت بإنشائه قبل هذا المكدس).

تكوين منتج كافكا

لتكوين منتج Kafka الذي يصل إلى Schema Registry في حساب AWS المركزي ، أكمل الخطوات التالية:

  1. سجّل الدخول إلى الحساب أ.
  2. في وحدة تحكم AWS Cloud9 ، اختر ملف Cloud9EC2Bastion البيئة التي أنشأتها MSKClientStack كومة.
  3. على قم بتقديم القائمة، اختر تحميل الملفات المحلية.
  4. قم بتحميل ملف EC2 keypair الذي استخدمته سابقًا أثناء إنشاء الحزمة.
  5. افتح محطة طرفية جديدة وقم بتغيير أذونات EC2 keypair:
    chmod 0400 <keypair PEM file>

  6. SSH في ملف KafkaProducerInstance مثيل EC2 وقم بتعيين المنطقة وفقًا لمتطلباتك:
    ssh -i <keypair PEM file> ec2-user@<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. اضبط متغير البيئة MSK_CLUSTER_ARN مشيرًا إلى ARN الخاص بمجموعة MSK:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

تغيير .ClusterName القيمة في الكود إذا استخدمت اسمًا مختلفًا لمكدس CloudFormation العنقودي MSK. اسم الكتلة هو نفس اسم المكدس.

  1. اضبط متغير البيئة BOOTSTRAP_BROKERS مشيرا إلى وسطاء التمهيد:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. تحقق من متغيرات البيئة:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  3. أنشئ موضوع كافكا يسمى unicorn-ride-request-topic في مجموعة MSK الخاصة بك ، والتي يستخدمها منتج كافكا وتطبيقات المستهلك لاحقًا:
    cd ~/kafka ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --topic unicorn-ride-request-topic --create --partitions 3 --replication-factor 2 ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

MSKClientStack قام المكدس بنسخ ملف JAR الخاص بعميل منتج كافكا المسمى kafka-cross-account-gsr-producer.jar إلى KafkaProducerInstance نموذج. يحتوي على عميل منتج كافكا الذي يرسل رسائل إلى موضوع كافكا موضوع طلب ركوب أحادي القرن على مجموعة MSK والوصول إلى unicorn-ride-request-schema-avro مخطط Avro من unicorn-ride-request-registry سجل المخطط في الحساب ب. يتوفر كود منتج كافكا ، الذي نغطيه لاحقًا في هذا المنشور ، على GitHub جيثب:.

  1. قم بتشغيل الأوامر التالية وتحقق kafka-cross-account-gsr-producer.jar موجود:
    cd ~
    ls -ls

  2. قم بتشغيل الأمر التالي لتشغيل منتج كافكا في ملف KafkaProducerInstance طرفية:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka producer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -nm 500 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

يحتوي الكود على المعلمات التالية:

  • - $BOOTSTRAP_BROKERS (وسطاء التمهيد العنقودي MSK)
  • -rn - و CrossAccountGlueSchemaRegistryRoleArn قيمة من SchemaRegistryStack كومة المخرجات في الحساب ب
  • -عنوان - موضوع كافكا unicorn-ride-request-topic
  • -ريغ - us-east-1 (قم بتغييره وفقًا لمنطقتك ، حيث يتم استخدامه لنقطة نهاية AWS STS وسجل المخطط)
  • -نانومتر: 500 (عدد الرسائل التي يرسلها تطبيق المنتج إلى موضوع كافكا)
  • معرف خارجي - نفس المعرف الخارجي (على سبيل المثال ، demo10A) التي استخدمتها أثناء إنشاء مكدس CloudFormation في الحساب ب

تُظهر لقطة الشاشة التالية عرض سجلات منتج كافكا Schema Version Id received...، مما يعني أنه قام باسترداد مخطط Avro unicorn-ride-request-schema-avro من الحساب B وتم إرسال الرسائل إلى موضوع كافكا في مجموعة MSK في الحساب أ.

كود منتج كافكا

تطبيق منتج كافكا الكامل متاح على GitHub جيثب:. في هذا القسم ، نقوم بتفصيل الكود.

  • getProducerConfig() يهيئ خصائص المنتج ، كما هو موضح في الكود التالي:
    • VALUE_SERIALIZER_CLASS_CONFIG - و GlueSchemaRegistryKafkaSerializer.class.getName() تطبيق AWS المتسلسل الذي يسلسل سجلات البيانات (التنفيذ متاح في GitHub جيثب:)
    • التسجيل_NAME - سجل المخطط من الحساب ب
    • SCHEMA_NAME - اسم المخطط من الحساب ب
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "-1"); props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name()); props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName); props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry"); props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro"); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startProducer() يفترض الدور في الحساب B ليكون قادرًا على الاتصال بـ Schema Registry في الحساب B ويرسل رسائل إلى موضوع كافكا في مجموعة MSK:
public void startProducer() { assumeGlueSchemaRegistryRole(); KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<String,UnicornRideRequest>(getProducerConfig()); int numberOfMessages = Integer.valueOf(str_numOfMessages); logger.info("Starting to send records..."); for(int i = 0;i < numberOfMessages;i ++) { UnicornRideRequest rideRequest = getRecord(i); String key = "key-" + i; ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest); producer.send(record, new ProducerCallback()); } }

  • assumeGlueSchemaRegistryRole() كما هو موضح في الكود التالي ، يستخدم AWS STS لتولي دور IAM لـ Schema Registry عبر الحسابات في الحساب ب (لمزيد من المعلومات ، راجع أوراق اعتماد الأمان المؤقتة في IAM.) الرد من stsClient.assumeRole(roleRequest) يحتوي على بيانات الاعتماد المؤقتة ، والتي تشمل accessKeyId, secretAccessKey، و sessionToken. ثم يقوم بتعيين بيانات الاعتماد المؤقتة في خصائص النظام. ال AWS SDK لـ Java يستخدم بيانات الاعتماد هذه أثناء الوصول إلى Schema Registry (من خلال Schema Registry serializer). لمزيد من المعلومات، راجع باستخدام أوراق الاعتماد.
    public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

  • createUnicornRideRequest() يستخدم مخطط Avro (مخطط طلب ركوب أحادي القرن) الفئات التي تم إنشاؤها لإنشاء ملف SpecificRecord. بالنسبة إلى هذا المنشور ، يتم ترميز قيم سمات طلب ركوب أحادي القرن في هذه الطريقة. انظر الكود التالي:
    public UnicornRideRequest getRecord(int requestId){ /* Initialise UnicornRideRequest object of class that is generated from AVRO Schema */ UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder() .setRequestId(requestId) .setPickupAddress("Melbourne, Victoria, Australia") .setDestinationAddress("Sydney, NSW, Aus") .setRideFare(1200.50F) .setRideDuration(120) .setPreferredUnicornColor(UnicornPreferredColor.WHITE) .setRecommendedUnicorn(RecommendedUnicorn.newBuilder() .setUnicornId(requestId*2) .setColor(unicorn_color.WHITE) .setStarsRating(5).build()) .setCustomer(Customer.newBuilder() .setCustomerAccountNo(1001) .setFirstName("Dummy") .setLastName("User") .setEmailAddresses(Arrays.asList("demo@example.com")) .setCustomerAddress("Flinders Street Station") .setModeOfPayment(ModeOfPayment.CARD) .setCustomerRating(5).build()).build(); logger.info(rideRequest.toString()); return rideRequest; }

تكوين مستهلك كافكا

MSKClientStack كومة خلق KafkaConsumerInstance على سبيل المثال لتطبيق كافكا للمستهلك. يمكنك عرض جميع المثيلات التي تم إنشاؤها بواسطة المكدس على وحدة تحكم Amazon EC2.

لتكوين مستهلك كافكا الذي يصل إلى سجل المخطط في حساب AWS المركزي ، أكمل الخطوات التالية:

  1. افتح محطة طرفية جديدة في Cloud9EC2Bastion بيئة AWS Cloud9.
  2. SSH في ملف KafkaConsumerInstance مثيل EC2 وقم بتعيين المنطقة وفقًا لمتطلباتك:
    ssh -i <keypair PEM file> ec2-user@<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. اضبط متغير البيئة MSK_CLUSTER_ARN مشيرًا إلى ARN الخاص بمجموعة MSK:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters | jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d ")

تغيير .ClusterName القيمة إذا استخدمت اسمًا مختلفًا لمكدس CloudFormation الخاص بمجموعة MSK. اسم الكتلة هو نفس اسم المكدس.

  1. اضبط متغير البيئة BOOTSTRAP_BROKERS مشيرا إلى وسطاء التمهيد:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. تحقق من متغيرات البيئة:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

MSKClientStack قام المكدس بنسخ ملف JAR لعميل كافكا المسمى kafka-cross-account-gsr-consumer.jar إلى KafkaConsumerInstance نموذج. يحتوي على عميل مستهلك كافكا الذي يقرأ رسائل من موضوع كافكا unicorn-ride-request-topic على الكتلة MSK والوصول إلى unicorn-ride-request-schema-avro مخطط Avro من unicorn-ride-request-registry التسجيل في الحساب "ب". رمز المستهلك الخاص بكافكا ، الذي سنغطيه لاحقًا في هذا المنشور ، متاح على GitHub جيثب:.

  1. قم بتشغيل الأوامر التالية وتحقق kafka-cross-account-gsr-consumer.jar موجود:
    cd ~
    ls -ls

  2. قم بتشغيل الأمر التالي لتشغيل مستهلك كافكا في ملف KafkaConsumerInstance طرفية:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS -rn <Account B IAM role arn that Kafka consumer application needs to assume> -topic unicorn-ride-request-topic -reg us-east-1 -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

يحتوي الكود على المعلمات التالية:

  • - $BOOTSTRAP_BROKERS (وسطاء التمهيد العنقودي MSK)
  • -rn - و CrossAccountGlueSchemaRegistryRoleArn قيمة من SchemaRegistryStack كومة المخرجات في الحساب ب
  • -عنوان - موضوع كافكا unicorn-ride-request-topic
  • -ريغ - us-east-1 (قم بتغييره وفقًا لمنطقتك ، حيث يتم استخدامه لنقطة نهاية AWS STS وسجل المخطط)
  • معرف خارجي - نفس المعرف الخارجي (على سبيل المثال ، demo10A) التي استخدمتها أثناء إنشاء مكدس CloudFormation في الحساب ب

تُظهر لقطة الشاشة التالية سجلات مستهلك كافكا تقرأ بنجاح الرسائل من موضوع كافكا على مجموعة MSK في الحساب أ والوصول إلى مخطط Avro unicorn-ride-request-schema-avro من unicorn-ride-request-registry سجل المخطط في الحساب ب.

إذا رأيت السجلات المماثلة ، فهذا يعني أن كلا من تطبيقات مستهلك كافكا تمكنت من الاتصال بنجاح بسجل المخطط المركزي في الحساب ب وقادرة على التحقق من صحة الرسائل أثناء إرسال واستهلاك الرسائل من مجموعة MSK في الحساب أ.

كود مستهلك كافكا

التطبيق الكامل للمستهلك كافكا متاح على GitHub جيثب:. في هذا القسم ، نقوم بتفصيل الكود.

  • getConsumerConfig() تهيئة خصائص المستهلك ، كما هو موضح في الكود التالي:
    • VALUE_DESERIALIZER_CLASS_CONFIG - و GlueSchemaRegistryKafkaDeserializer.class.getName() تنفيذ برنامج إلغاء التسلسل من AWS الذي يقوم بإلغاء تسلسل ملفات SpecificRecord وفقًا لمعرّف المخطط المشفر من Schema Registry (يتوفر التطبيق على GitHub جيثب:).
    • AVRO_RECORD_TYPE - AvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName); props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName()); return props;
}

  • startConsumer() يفترض الدور في الحساب B ليكون قادرًا على الاتصال بـ Schema Registry في الحساب B ويقرأ الرسائل من موضوع كافكا في مجموعة MSK:
public void startConsumer() { logger.info("starting consumer..."); assumeGlueSchemaRegistryRole(); KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig()); consumer.subscribe(Collections.singletonList(topic)); int count = 0; while (true) { final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000)); for (final ConsumerRecord<String, UnicornRideRequest> record : records) { final UnicornRideRequest rideRequest = record.value(); logger.info(String.valueOf(rideRequest.getRequestId())); logger.info(rideRequest.toString()); } }
}

  • assumeGlueSchemaRegistryRole() كما هو موضح في الكود التالي ، يستخدم AWS STS لتولي دور IAM لـ Schema Registry عبر الحسابات في الحساب ب. الاستجابة من stsClient.assumeRole(roleRequest) يحتوي على بيانات الاعتماد المؤقتة ، والتي تشمل accessKeyId, secretAccessKey، و sessionToken. ثم يقوم بتعيين بيانات الاعتماد المؤقتة في خصائص النظام. يستخدم SDK لـ Java بيانات الاعتماد هذه أثناء الوصول إلى Schema Registry (من خلال برنامج تسلسل سجل المخطط). لمزيد من المعلومات، راجع باستخدام أوراق الاعتماد.
public void assumeGlueSchemaRegistryRole() { try { Region region = Region.of(regionName); if(!Region.regions().contains(region)) throw new RuntimeException("Region : " + regionName + " is invalid."); StsClient stsClient = StsClient.builder().region(region).build(); AssumeRoleRequest roleRequest = AssumeRoleRequest.builder() .roleArn(this.assumeRoleARN) .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo") .externalId(this.externalId) .build(); AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest); Credentials myCreds = roleResponse.credentials(); System.setProperty("aws.accessKeyId", myCreds.accessKeyId()); System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey()); System.setProperty("aws.sessionToken", myCreds.sessionToken()); stsClient.close(); } catch (StsException e) { logger.error(e.getMessage()); System.exit(1); } }

تجميع وإنشاء فئات مخطط Avro

مثل أي جزء آخر من بناء ونشر تطبيقك ، يجب تضمين تجميع المخطط وعملية إنشاء فئات مخطط Avro في خط أنابيب CI / CD. هناك طرق متعددة لإنشاء فئات مخطط Avro ؛ نحن نستخدم avro-maven-plugin لهذا المنصب. يمكن أيضًا استخدام عملية CI / CD avro-tools لتجميع مخطط Avro لإنشاء فئات. الكود التالي هو مثال على كيفية استخدامك avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination> //compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .

نظرة عامة على التنفيذ

للتلخيص ، نبدأ بتحديد وتسجيل مخطط Avro لرسالة طلب ركوب يونيكورن في AWS Glue Schema Registry في الحساب B ، حساب بحيرة البيانات المركزية. في الحساب أ ، نقوم بإنشاء مجموعة MSK ومثيلات منتج كافكا ومستهلك EC2 مع كود التطبيق الخاص بها (kafka-cross-account-gsr-consumer.jar و kafka-cross-account-gsr-producer.jar) ونشرها فيها باستخدام مكدس CloudFormation.

عندما نقوم بتشغيل تطبيق المنتج في الحساب أ ، فإن المسلسل (GlueSchemaRegistryKafkaSerializer) من مكتبة AWS Glue Schema Registry SerDe المقدمة عندما يحصل التكوين على مخطط طلب ركوب أحادي القرن (UnicornRideRequest.avsc) من سجل المخطط المركزي المقيم في الحساب "ب" لإجراء تسلسل لرسالة طلب ركوب يونيكورن. يستخدم دور IAM (بيانات الاعتماد المؤقتة) في الحساب B والمنطقة ، واسم تسجيل المخطط (unicorn-ride-request-registry) ، واسم المخطط (unicorn-ride-request-schema-avro) يتم توفيره كتكوين للاتصال بسجل المخطط المركزي. بعد أن يتم تسلسل الرسالة بنجاح ، يرسلها تطبيق المنتج إلى موضوع كافكا (unicorn-ride-request-topic) على كتلة MSK.

عندما نقوم بتشغيل تطبيق المستهلك في الحساب أ ، فإن أداة إلغاء التسلسل (GlueSchemaRegistryKafkaDeserializer) من مكتبة Schema Registry SerDe المقدمة حيث يستخرج التكوين معرف المخطط المشفر من الرسالة المقروءة من موضوع كافكا (unicorn-ride-request-topic) ويحصل على المخطط لنفس المعرف من سجل المخطط المركزي في الحساب ب. ثم يقوم بإلغاء تسلسل الرسالة. يستخدم دور IAM (بيانات الاعتماد المؤقتة) في الحساب B والمنطقة المقدمة كتكوين للاتصال بسجل المخطط المركزي. يقوم تطبيق المستهلك أيضًا بتكوين ملفات Avro SPECIFIC_RECORD لإبلاغ جهاز إلغاء التسلسل أن الرسالة من نوع معين (طلب ركوب وحيد القرن). بعد إلغاء تسلسل الرسالة بنجاح ، يقوم تطبيق المستهلك بمعالجتها وفقًا للمتطلبات.

تنظيف

الخطوة الأخيرة هي التنظيف. لتجنب الرسوم غير الضرورية ، يجب عليك إزالة جميع الموارد التي تم إنشاؤها بواسطة مكدسات CloudFormation المستخدمة لهذا المنشور. إن أبسط طريقة للقيام بذلك هي حذف الحزم. قم أولاً بحذف ملف MSKClusterStack تليها MSKClientStack من الحساب أ. ثم احذف ملف SchemaRegistryStack من الحساب ب.

وفي الختام

في هذا المنشور ، أوضحنا كيفية استخدام AWS Glue Schema Registry مع Amazon MSK وتطبيقات المعالجة المتدفقة للتحقق من صحة الرسائل باستخدام مخطط Avro. لقد أنشأنا بنية موزعة حيث يوجد Schema Registry في حساب AWS مركزي (حساب بحيرة البيانات) ويوجد منتج كافكا وتطبيقات المستهلك في حساب AWS منفصل. قمنا بإنشاء مخطط Avro في سجل المخطط في الحساب المركزي لجعله فعالاً لفرق التطبيق للحفاظ على المخططات في مكان واحد. نظرًا لأن AWS Glue Schema Registry يدعم سياسات الوصول القائمة على الهوية ، فقد استخدمنا دور IAM عبر الحسابات للسماح لمنتج Kafka وتطبيقات المستهلك التي تعمل في حساب منفصل بالوصول الآمن إلى المخطط من الحساب المركزي للتحقق من صحة الرسائل. نظرًا لأنه تم الاتفاق على مخطط Avro مسبقًا ، استخدمنا Avro SpecificRecord لضمان أمان النوع في وقت التجميع وتجنب مشكلات التحقق من صحة مخطط وقت التشغيل من جانب العميل. الرمز المستخدم لهذا المنشور متاح في GitHub جيثب: كمرجع.

لمعرفة المزيد حول الخدمات والموارد في هذا الحل ، يرجى الرجوع إلى سجل مخطط AWS Glueأطلقت حملة دليل مطور Amazon MSKأطلقت حملة مكتبة SerDe لسجل مخطط الغراء في AWSو برنامج IAM التعليمي: تفويض الوصول عبر حسابات AWS باستخدام أدوار IAM.


عن المؤلف

فيكاس باجاج هو مهندس حلول رئيسي في Amazon Web Service. تعمل Vikas مع العملاء الأصليين الرقميين وتقدم لهم المشورة بشأن هندسة التكنولوجيا والنمذجة ، والخيارات والحلول لتلبية أهداف العمل الاستراتيجية. إنه يتأكد من أن التصميمات والحلول فعالة ومستدامة ومناسبة للغرض لاحتياجات العمل الحالية والمستقبلية. بصرف النظر عن مناقشات الهندسة المعمارية والتكنولوجيا ، فإنه يستمتع بمشاهدة الكريكيت ولعبه.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة

الدردشة معنا

أهلاً! كيف يمكنني مساعدك؟