شعار زيفيرنت

كيفية استخدام Kafka Connect لإنشاء خط أنابيب بيانات مفتوح المصدر لمعالجة البيانات في الوقت الفعلي

التاريخ:

كيفية استخدام Kafka Connect لإنشاء خط أنابيب بيانات مفتوح المصدر لمعالجة البيانات في الوقت الفعلي

توضح لك هذه المقالة كيفية إنشاء مسار بيانات في الوقت الفعلي باستخدام تقنيات مفتوحة المصدر فقط. من بينها كافكا كونيكت وأباتشي كافكا وكيبانا وغيرها.


By بول بريبنر، مبشر التكنولوجيا في Instaclustr

يعد كافكا كونيكت أداة تدفق بيانات قوية ومفتوحة المصدر بشكل خاص تجعل من السهل جدًا إقران كافكا بتقنيات البيانات الأخرى. كتقنية موزعة ، يوفر Kafka Connect توافرًا عاليًا بشكل خاص وقياسًا مرنًا بشكل مستقل عن مجموعات كافكا. باستخدام موصلات المصدر أو الحوض لإرسال البيانات من وإلى موضوعات كافكا ، يتيح كافكا كونيكت التكامل مع العديد من التقنيات غير التابعة لكافكا دون الحاجة إلى تعليمات برمجية.

صورة

تتوفر موصلات كافكا القوية مفتوحة المصدر للعديد من تقنيات البيانات الشائعة ، وكذلك الفرصة لكتابة بنفسك. تتناول هذه المقالة حالة استخدام بيانات حقيقية في العالم الحقيقي لكيفية استخدام Kafka Connect لدمج بيانات التدفق في الوقت الفعلي من كافكا مع Elasticsearch (لتمكين البحث القابل للتوسع في سجلات كافكا المفهرسة) و Kibana (من أجل تصور تلك النتائج). 

صورة

بالنسبة لحالة استخدام مثيرة للاهتمام تسلط الضوء على مزايا كافكا وكافكا كونيكت ، فقد ألهمتني متتبع بيانات COVID-19 الخاص بمركز السيطرة على الأمراض. يجمع المتتبع الممكّن من كافكا بيانات اختبار COVID في الوقت الفعلي من مواقع متعددة ، بتنسيقات متعددة وباستخدام بروتوكولات متعددة ، ويعالج تلك الأحداث إلى نتائج مرئية سهلة الاستهلاك. يمتلك المتعقب أيضًا إدارة البيانات اللازمة للتأكد من وصول النتائج بسرعة ويمكن الوثوق بها.

بدأت في البحث عن حالة استخدام مماثلة معقدة ومقنعة - ولكن من الناحية المثالية حالة أقل خطورة من الوباء. في النهاية ، صادفت مجالًا مثيرًا للاهتمام ، والذي تضمن دفق REST APIs وبيانات غنية بتنسيق JSON بسيط: المد والجزر القمري.
 
 

بيانات المد القمري

 
يتبع المد والجزر اليوم القمري ، وهي فترة مدتها 24 ساعة و 50 دقيقة يدور خلالها الكوكب بالكامل إلى نفس النقطة تحت القمر الذي يدور حوله. كل يوم قمري له مد وجزر مرتفعان واثنان من المد والجزر الناجمين عن جاذبية القمر:

الشكل
المصدر 1 الإدارة الوطنية للمحيطات والغلاف الجوي

 

الإدارة الوطنية للمحيطات والغلاف الجوي (NOAA) يوفر REST API الذي يجعل من السهل استرداد بيانات المستشعر المفصلة من محطات المد والجزر العالمية. 

صورة

على سبيل المثال ، تحدد مكالمة REST التالية معرف المحطة ونوع البيانات (اخترت مستوى سطح البحر) والمرجع (متوسط ​​مستوى سطح البحر) ، وتطلب النتيجة الأكثر حداثة بالوحدات المترية: 

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json


تقوم هذه المكالمة بإرجاع نتيجة JSON مع خط الطول وخط العرض للمحطة والوقت وقيمة مستوى المياه. لاحظ أنه يجب أن تتذكر ماهية مكالمتك من أجل فهم نوع البيانات والمرجع ووحدات النتائج التي تم إرجاعها!
 

{"البيانات الوصفية": {"id": "8724580"، "الاسم": "Key West"، "lat": "24.5508"، "lon": "- 81.8081"}، "data": [{"t" : "2020-09-24 04:18"، "v": "0.597"، "s": "0.005"، "f": "1,0,0,0،XNUMX،XNUMX،XNUMX"، "q": "p"} ]}


بدء خط أنابيب البيانات (باستخدام موصل مصدر REST)

 
لبدء إنشاء خط تدفق بيانات كافكا كونيكت ، يجب علينا أولاً تحضير كتلة كافكا ومجموعة كافكا كونيكت. 

صورة

بعد ذلك ، نقدم موصل REST ، مثل هذا متاح مفتوح المصدر واحد. سنقوم بنشرها في حاوية AWS S3 (استخدمها تعليمات إذا لزم الأمر). ثم سنطلب من مجموعة Kafka Connect استخدام حاوية S3 ، ومزامنتها لتكون مرئية داخل المجموعة ، وتهيئة الموصل ، وأخيراً تشغيلها. يضمن أسلوب "BYOC" (إحضار الموصل الخاص بك) أن لديك خيارات غير محدودة للعثور على موصل يلبي احتياجاتك الخاصة.

صورة

يوضح المثال التالي استخدام أمر "curl" لتكوين نشر كافكا كونيكت مفتوح المصدر بنسبة 100٪ لاستخدام واجهة برمجة تطبيقات REST. لاحظ أنك ستحتاج إلى تغيير عنوان URL والاسم وكلمة المرور لمطابقة النشر الخاص بك:

curl https: // connectorClusterIP: 8083 / connectors -k -u name: password -X POST -H 'Content-Type: application / json' -d '{"name": "source_rest_tide_1"، "config": {"key .converter ":" org.apache.kafka.connect.storage.StringConverter "،" value.converter ":" org.apache.kafka.connect.storage.StringConverter "،" connector.class ":" com.tm.kafka .connect.rest.RestSourceConnector "، "asks.max": "1"، "rest.source.poll.interval.ms": "600000"، "rest.source.method": "GET"، "rest.source .url ":" https://api.tidesandcurrents.noaa.gov/api/prod/datagetter؟date=latest&station=8454000&product=water_level&datum=msl&units=metric&×_zone=gmt&application=instaclustr&format=json ". ":" نوع المحتوى: application / json ، Accept: application / json "،" rest.source.topic.selector ":" com.tm.kafka.connect.rest.selector.SimpleTopicSelector "،" rest.source.destination .topics ":" tides-topic "}}


تقوم مهمة الموصل التي تم إنشاؤها بواسطة هذا الرمز باستقصاء واجهة برمجة تطبيقات REST في فواصل زمنية مدتها 10 دقائق ، وتكتب النتيجة إلى موضوع كافكا "المد والجزر". من خلال الاختيار العشوائي لخمسة أجهزة استشعار للمد والجزر لجمع البيانات بهذه الطريقة ، تملأ بيانات المد والجزر الآن موضوع المد والجزر عبر خمسة تكوينات وخمسة موصلات.

صورة
 
 

إنهاء خط الأنابيب (مع موصل بالوعة Elasticsearch)

 
لإعطاء بيانات المد والجزر في مكان ما للذهاب إليه ، سنقدم مجموعة Elasticsearch و Kibana في نهاية خط الأنابيب. سنقوم بتكوين ملف موصل بالوعة Elasticsearch مفتوح المصدر لإرسال Elasticsearch البيانات.

صورة

يستخدم نموذج التكوين التالي اسم الحوض والفئة وفهرس Elasticsearch وموضوع كافكا. إذا كان الفهرس غير موجود بالفعل ، فسيتم إنشاء فهرس بالتعيينات الافتراضية. 

curl https: // connectorClusterIP: 8083 / connectors -k -u name: password -X POST -H 'Content-Type: application / json' -d '{"name": "elastic-sink-tides"، "config" : {"connector.class": "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector"، "features.max": 3، "مواضيع": "tides"، "connect.elastic.hosts": ”ip"، "connect.elastic.port": 9201، "connect.elastic.kcql": "INSERT IGNORE INTO Tides-index SELECT * FROM Tides-topic"، "connect.elastic.use.http.username": "elasticName"، " connect.elastic.use.http.password ":" elasticPassword "}} '


خط الأنابيب يعمل الآن. ومع ذلك ، فإن جميع بيانات المد والجزر التي تصل إلى فهرس المد والجزر هي سلسلة ، بسبب تعيينات الفهرس الافتراضية.

صورة

مطلوب تعيين مخصص لرسم بيانات السلاسل الزمنية الخاصة بنا بشكل صحيح. سننشئ هذا التعيين المخصص لفهرس المد والجزر أدناه ، باستخدام حقل JSON "t" للتاريخ المخصص ، و "v" كمزدوج ، و "الاسم" ككلمة رئيسية للتجميع:

curl -u elasticName: elasticPassword ”elasticURL: 9201 / tides-index" -X PUT -H 'Content-Type: application / json' -d '{"mappings": {"properties": {"data": {"properties ": {" t ": {" type ":" date "،" format ":" yyyy-MM-dd HH: mm "}،" v ": {" type ":" double "}،" f ": {"type": "text"}، "q": {"type": "text"}، "s": {"type": "text"}}}، "metadata": {"properties": { "id": {"type": "text"}، "lat": {"type": "text"}، "long": {"type": "text"}، "name": {"type" : ”الكلمة الرئيسية"}}}}} '


عادة ما يكون Elasticsearch "reindexing" (حذف الفهرس وإعادة فهرسة جميع البيانات) مطلوبًا في كل مرة تقوم فيها بتغيير تعيين فهرس Elasticsearch. يمكن إعادة البيانات إما من موصل حوض كافكا الحالي ، كما فعلنا في حالة الاستخدام هذه ، أو الحصول عليها باستخدام Elasticsearch الفهرسة العملية. 

 
 

تصور البيانات مع Kibana

 
لتصور بيانات المد ، سنقوم أولاً بإنشاء نمط فهرس في Kibana ، مع تكوين "t" كحقل مرشح الوقت. سنقوم بعد ذلك بإنشاء تصور ، واختيار نوع الرسم البياني الخطي. أخيرًا ، سنقوم بتكوين إعدادات الرسم البياني بحيث يعرض المحور الصادي متوسط ​​مستوى المد على مدى 30 دقيقة ويظهر المحور السيني تلك البيانات بمرور الوقت. 

والنتيجة هي رسم بياني للتغييرات في المد والجزر لمحطات العينات الخمس التي يجمع خط الأنابيب البيانات منها:

صورة

النتائج

 
من الواضح أن الطبيعة الدورية للمد والجزر تظهر في تصوراتنا ، حيث يحدث مدان مرتفعان في كل يوم قمري.

صورة

والأكثر إثارة للدهشة ، أن النطاق بين المد والجزر المرتفع والمنخفض يختلف في كل محطة عالمية. هذا لا يرجع إلى تأثيرات الشمس فقط ، ولكن بسبب الشمس والجغرافيا المحلية والطقس والتغير المناخي. يستخدم خط أنابيب كافكا هذا المثال كافكا وإليستسيرتش وكيبانا لإثبات قوة التصورات بشكل مفيد: يمكنها في كثير من الأحيان الكشف عما لا تستطيع البيانات الخام!

 
السيرة الذاتية: بول بريبنر هو المبشر التكنولوجي في Instaclustr ، والذي يوفر منصة خدمة مُدارة لتقنيات مفتوحة المصدر مثل Apache Cassandra و Apache Spark و OpenSearch و Redis و Apache Kafka.

هذا الموضوع ذو علاقة بـ:


أفلاطون. Web3 مُعاد تصوره. تضخيم ذكاء البيانات.
انقر هنا للوصول.

المصدر: https://www.kdnuggets.com/2021/07/kafka-open-source-data-pipeline-processing-real-time-data.html

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة