Zephyrnet Logosu

Gerçek Zamanlı Verileri İşlemek için Açık Kaynak Veri Hattı Oluşturmak İçin Kafka Connect Nasıl Kullanılır

Tarih:

Gerçek Zamanlı Verileri İşlemek için Açık Kaynak Veri Hattı Oluşturmak İçin Kafka Connect Nasıl Kullanılır

Bu makale, yalnızca saf açık kaynak teknolojilerini kullanarak gerçek zamanlı bir veri hattının nasıl oluşturulacağını gösterir. Bunlara Kafka Connect, Apache Kafka, Kibana ve daha fazlası dahildir.


By Paul Brebner, Instaclustr'da Teknoloji Evangelisti

Kafka Connect, Kafka'yı diğer veri teknolojileriyle eşleştirmeyi oldukça zahmetsiz hale getiren, özellikle güçlü bir açık kaynaklı veri akış aracıdır. Dağıtılmış bir teknoloji olarak Kafka Connect, Kafka kümelerinden bağımsız olarak özellikle yüksek kullanılabilirlik ve esnek ölçeklendirme sunar. Kafka konularına veri göndermek için kaynak veya havuz bağlayıcılarını kullanan Kafka Connect, kod gerektirmeden birden fazla Kafka dışı teknolojiyle entegrasyona olanak tanır.

Resim

Pek çok popüler veri teknolojisi için sağlam açık kaynak Kafka bağlayıcıları ve kendi bağlantınızı yazma fırsatı mevcuttur. Bu makale, Kafka'dan gerçek zamanlı akış verilerini Elasticsearch (dizine alınmış Kafka kayıtlarının ölçeklenebilir aramasını etkinleştirmek için) ve Kibana (bunları görselleştirmek için) ile entegre etmek için Kafka Connect'in nasıl kullanılacağına ilişkin gerçek dünyadaki, gerçek veri kullanım durumunu ele almaktadır. sonuçlar). 

Resim

Kafka ve Kafka Connect'in avantajlarını vurgulayan ilginç bir kullanım örneği için şunlardan ilham aldım: CDC'nin COVID-19 veri izleyicisi. Kafka özellikli izleyici, birden çok konumdan, birden çok formatta ve birden çok protokol kullanarak gerçek zamanlı COVID test verilerini toplar ve bu olayları kolayca tüketilebilir, görselleştirilmiş sonuçlara dönüştürür. İzleyici aynı zamanda sonuçların hızla ulaştığından ve güvenilir olduğundan emin olmak için gerekli veri yönetimine de sahiptir.

Benzer şekilde karmaşık ve zorlayıcı bir kullanım senaryosu aramaya başladım; ancak ideal olarak pandemiden daha az endişe verici bir kullanım senaryosu. Sonunda, halka açık akış REST API'lerini ve basit bir JSON formatında zengin verileri içeren ilginç bir alanla karşılaştım: Ay gelgitleri.
 
 

Ay gelgit verileri

 
Gelgitler, gezegenin yörüngedeki ayın altında aynı noktaya kadar tamamen döndüğü 24 saat 50 dakikalık bir dönem olan ay gününü takip eder. Her ay gününde, ayın çekim kuvvetinin neden olduğu iki gelgit ve iki gelgit yaşanır:

şekil
Kaynak 1 Ulusal Okyanus ve Atmosfer İdaresi

 

The Ulusal Okyanus ve Atmosfer İdaresi (NOAA) küresel gelgit istasyonlarından ayrıntılı sensör verilerini almayı kolaylaştıran bir REST API sağlar. 

Resim

Örneğin, aşağıdaki REST çağrısı istasyon kimliğini, veri tipini (deniz seviyesini seçtim) ve veriyi (ortalama deniz seviyesi) belirtir ve metrik birimler cinsinden daha yeni tek sonucu ister: 

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json


Bu çağrı, istasyonun enlem ve boylamını, saati ve su seviyesi değerini içeren bir JSON sonucu döndürür. Döndürülen sonuçların veri türünü, verilerini ve birimlerini anlamak için çağrınızın ne olduğunu hatırlamanız gerektiğini unutmayın!
 

{"meta veriler": { "id":"8724580", "name":"Key West", "lat":"24.5508”, "lon":"-81.8081"}, "data":[{ "t" :"2020-09-24 04:18", "v":"0.597", "s":"0.005", "f":"1,0,0,0", "q":"p"} ]}


Veri ardışık düzenini başlatma (REST kaynak bağlayıcısıyla)

 
Kafka Connect akış verileri ardışık düzenini oluşturmaya başlamak için öncelikle bir Kafka kümesi ve bir Kafka Connect kümesi hazırlamamız gerekir. 

Resim

Daha sonra aşağıdaki gibi bir REST bağlayıcı tanıtıyoruz: bu mevcut açık kaynak olanı. Bunu bir AWS S3 klasörüne dağıtacağız (bunları kullanın) talimatlar gerekirse). Ardından Kafka Connect kümesine S3 kümesini kullanmasını, küme içinde görünür olacak şekilde eşitlemesini, bağlayıcıyı yapılandırmasını ve son olarak onu çalıştırmasını söyleyeceğiz. Bu "BYOC" (Kendi Bağlayıcınızı Getirin) yaklaşımı, özel ihtiyaçlarınızı karşılayan bir bağlayıcı bulmak için sınırsız seçeneğe sahip olmanızı sağlar.

Resim

Aşağıdaki örnek, %100 açık kaynaklı bir Kafka Connect dağıtımını REST API kullanacak şekilde yapılandırmak için "curl" komutunun kullanımını gösterir. Kendi dağıtımınızla eşleşecek şekilde URL'yi, adı ve şifreyi değiştirmeniz gerekeceğini unutmayın:

curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d ' { "name": "source_rest_tide_1", "config": { "anahtar .converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.storage.StringConverter", "connector.class": "com.tm.kafka .connect.rest.RestSourceConnector", "tasks.max": "1", "rest.source.poll.interval.ms": "600000", "rest.source.method": "GET", "rest.source .url": "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&×_zone=gmt&application=instaclustr&format=json", "rest.source.headers ": "Content-Type:application/json,Accept:application/json", "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector", "rest.source.destination .topics": "gelgit-konusu" } }


Bu kod tarafından oluşturulan bağlayıcı görevi, REST API'sini 10 dakikalık aralıklarla yoklar ve sonucu "tides-topic" Kafka konusuna yazar. Verileri bu şekilde toplamak için rastgele beş toplam gelgit sensörünün seçilmesiyle, gelgit verileri artık beş konfigürasyon ve beş konektör aracılığıyla gelgit konusunu dolduruyor.

Resim
 
 

İşlem hattını sonlandırma (Elasticsearch havuz konektörüyle)

 
Bu gelgit verilerine gidecek bir yer sağlamak için üretim hattının sonuna bir Elasticsearch kümesi ve Kibana ekleyeceğiz. Bir yapılandıracağız açık kaynak Elasticsearch havuz konektörü Elasticsearch'e veri göndermek için.

Resim

Aşağıdaki örnek yapılandırma havuz adını, sınıfı, Elasticsearch dizinini ve Kafka konumuzu kullanır. Zaten bir dizin yoksa, varsayılan eşlemelere sahip bir dizin oluşturulacaktır. 

curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d ' { "name" : "elastic-sink-tides", "config" : { "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector", "tasks.max" : 3, "topics" : "tides", "connect.elastic.hosts" : ”ip", "connect.elastic.port": 9201, "connect.elastic.kcql": "tides-index'e GÖZALDIĞINI EKLEYİN tides-topic'ten SEÇ *", "connect.elastic.use.http.username": ”elasticName", " connect.elastic.use.http.password" : ”elasticPassword" } }'


Boru hattı şu anda çalışır durumda. Ancak, varsayılan indeks eşlemeleri nedeniyle Gelgit indeksine gelen tüm gelgit verileri bir dizedir.

Resim

Zaman serisi verilerimizin grafiğini doğru bir şekilde çizmek için özel eşleme gereklidir. Aşağıdaki gelgit endeksi için bu özel eşlemeyi, özel tarih için JSON "t" alanını, çift olarak "v"yi ve toplama için anahtar kelime olarak "ad"ı kullanarak oluşturacağız:

curl -u elasticName:elasticPassword ”elasticURL:9201/tides-index" -X PUT -H 'Content-Type: application/json' -d' { "mappings" : { "properties" : { "data" : { "properties " : { "t" : { "type" : "date", "format" : "yyyy-AA-gg SS:dd" }, "v" : { "type" : "double" }, "f" : { "type" : "text" }, "q" : { "type" : "text" }, "s" : { "type" : "text" } } }, "meta veriler" : { "properties" : { "id" : { "type" : "text" }, "lat" : { "type" : "text" }, "long" : { "type" : "text" }, "name" : { "type" : ”anahtar kelime" } }}}} }'


Elasticsearch'ün "yeniden indekslenmesi" (dizinin silinmesi ve tüm verilerin yeniden indekslenmesi), genellikle bir Elasticsearch indeks eşlemesini her değiştirdiğinizde gereklidir. Veriler, bu kullanım örneğinde olduğu gibi mevcut bir Kafka havuz bağlayıcısından yeniden oynatılabilir veya Elasticsearch kullanılarak elde edilebilir. reindex çalışma. 

 
 

Kibana ile verileri görselleştirme

 
Gelgit verilerini görselleştirmek için öncelikle Kibana'da zaman filtresi alanı olarak "t" yapılandırılmış bir indeks modeli oluşturacağız. Daha sonra çizgi grafik türünü seçerek bir görselleştirme oluşturacağız. Son olarak grafik ayarlarını, y ekseni 30 dakikadaki ortalama gelgit seviyesini, x ekseni ise zaman içindeki verileri gösterecek şekilde yapılandıracağız. 

Sonuç, boru hattının veri topladığı beş örnek istasyon için gelgitlerdeki değişikliklerin bir grafiğidir:

Resim

Sonuçlar

 
Gelgitlerin periyodik doğası görselleştirmemizde açıkça görülüyor; her ay gününde iki yüksek gelgit meydana geliyor.

Resim

Daha da şaşırtıcı olanı, yüksek ve alçak gelgit arasındaki aralık, her küresel istasyonda farklıdır. Bunun nedeni sadece ayın değil, güneşin, yerel coğrafyanın, hava durumunun ve iklim değişikliğinin etkisidir. Bu örnek Kafka Connect işlem hattı, görselleştirmelerin gücünü yararlı bir şekilde göstermek için Kafka, Elasticsearch ve Kibana'dan yararlanıyor: genellikle ham verilerin ortaya çıkaramayacağını ortaya çıkarabilirler!

 
Bio: Paul Brebner Apache Cassandra, Apache Spark, OpenSearch, Redis ve Apache Kafka gibi açık kaynak teknolojilerinin yönetilen hizmet platformunu sağlayan Instaclustr'da Teknoloji Evangelistidir.

İlgili:


Plato Ai. Web3 Yeniden Düşünüldü. Güçlendirilmiş Veri Zekası.
Erişmek için buraya tıklayın.

Kaynak: https://www.kdnuggets.com/2021/07/kafka-open-source-data-pipeline-processing-real-time-data.html

spot_img

En Son İstihbarat

spot_img

Bizimle sohbet

Merhaba! Size nasıl yardım edebilirim?