Kafka CLI Komutları

Kafka'da; topic oluşturma, silme, listeleme, detaylarını gösterme, mesaj produce etme, mesaj consume etme, consumer gruplarını oluşturma, silme ve inspect etme gibi  en çok kullanılan CLI komutlarından bazıları hakkında bilgi edineceğiz.

1. Kafka Topic CLI Komutları

Kafka Topics CLI, yani kafka-topics, Kafka'da bir topic oluşturmak, silmek, detaylarını görüntülemek veya değiştirmek için kullanılır.

ℹ️
Windows için kafka-topics.bat, Linux ve Mac için kafka-topics.sh gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.
ℹ️
Kafka'nın v2.2+ sürümüne sahipseniz, komutları --bootstrap-server seçeneğiyle kullanmanızı kesinlikle öneririz. Çünkü Zookeeper seçeneği artık kullanımdan kaldırılmıştır.

1.1 Topic Oluşturma

Bir Kafka topic oluşturmak için şu zorunlu parametreleri sağlamanız gerekiyor:

  • Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. Ör. localhost:2181
  • topic adı, partition sayısı ve replikasyon faktörü.
  • --create parametresi

Örnek

Kafka broker localhost:9092'de çalışırken 3 partition ve 1 replikasyon faktörü ile first_topic isimli bir topic oluşturalım.

kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1

Komut Çıktısı

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic first_topic.

Dikkat Edilmesi Gerekenler

kafka-topics.sh --create komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • Sahip olduğunuz broker sayısından daha büyük bir replikasyon faktörü belirleyemezsiniz.
  • İstediğiniz kadar partition belirtebilirsiniz fakat 3 partition başlamak için iyi bir sayıdır.
  • Bir komutu çalıştırdıktan sonra man/help çıktısını görürseniz bu, komutunuzun yanlış olduğu anlamına gelir. Hata mesajıyla ilgili ayrıntıları görmek için mesajın başıını okumalısınız.
  • Partitionlar ve çoğaltma faktörü için varsayılan dğer yoktur ve bunları açıkça belirtmeniz gerekir.
  • Topic adı yalnızca ASCII alfanümerikleri, '.', '_' ve '-' içermelidir.

Ekstra Komut Parametreleri

  • --config: Topic düzeyinde yapılandırmalar ayarlayabilirsiniz. örneğin --config max.message.bytes=64000. Tüm config seçenekleri için tıklayınız.
  • --disable-rack-aware: Rack aware replika atamasını devre dışı bırakır (önerilmez, yalnızca ne yaptığınızı biliyorsanız ayarlayın)

1.2 Topicleri Listeleme

Kafka topiclerini listelemek için şu zorunlu parametreleri sağlamanız gerekiyor:

  • Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. localhost:2181
  • --list parametresi

Örnek

Kafka broker localhost:9092'de çalışırken tüm topicleri listelemek istiyoruz.

kafka-topics.sh --bootstrap-server localhost:9092 --list

Komut Çıktısı

__consumer_offsets
first_topic
second_topic
test

Dikkat Edilmesi Gerekenler

kafka-topics.sh --list komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • Bu komut, sizin oluşturmadığınız dahili topicleri de listeler (__consumer_offsets gibi). Bu topicleri silmeye çalışmayın. Bu topicleri gizlemek istiyorsanız --exclude-internal parametresini kullanın
  • Bir komutu çalıştırdıktan sonra man/help çıktısını görürseniz bu, komutunuzun yanlış olduğu anlamına gelir. Hata mesajıyla ilgili ayrıntıları görmek için mesajın başıını okumalısınız.

Ekstra Komut Parametreleri

  • --exclude-internal: list veya describe komutunu çalıştırırken internal topicleri hariç tutar (kullanmazsanız varsayılan olarak listelenirler)
  • --at-min-isr-partitions: Topicleri describe ederken ayarlanırsa, yalnızca isr sayısı parametre ile verilen minimum değere eşit olan partitionları gösterir.
  • --unavailable-partitions: Yalnızca lideri mevcut olmayan partitionları göster.
  • --under-min-isr-partitions: Yalnızca isr sayısı yapılandırılan minimum değerden az olan partitionları gösterir.
  • --under-replicated-partitions: Yalnızca replika edilenlerin altındaki partionları gösterir

1.3 Topic Detaylarını Gösterme

Kafka topic detaylarını görüntülemek için şu zorunlu parametreleri sağlamanız gerekiyor:

  • Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. localhost:2181
  • --describe parametresi

Örnek

Kafka broker localhost:9092'de çalışırken first_topic isimli topic'in detaylarını görmek istiyoruz. Tek seferde birden fazla topic detayını görmek için virgülle ayrılmış bir topic listesi de verebilirsiniz.

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first_topic

Komut çıktısı:

Topic: first_topic	TopicId: fN9mf1UDSmiCdKIDFyMEIQ	PartitionCount: 3	ReplicationFactor: 1	Configs: cleanup.policy=delete
Topic: first_topic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
Topic: first_topic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
Topic: first_topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
  • Leader: 1:  Partition 0 için ID'si 1 olan broker'ın lider olduğu anlamına gelir.
  • Replicas: 1: Partition 0 için ID'si 1 olan broker'ın bir replika olduğu anlamına gelir.
  • Isr: 1, Partition 0 için ID'si 1 olan broker'ın in-sync bir replika olduğu anlamına gelir.

Dikkat Edilmesi Gerekenler

kafka-topics.sh --describekomutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • Partition ID'sini ve broker ID'sini birbirine karıştırabilirsiniz. Komut çıktısında sayıların ne anlama geldiğini anlamak için yukarıdaki Komut Çıktısı bölümüne bakın.
  • Bir komutu çalıştırdıktan sonra man/help çıktısını görürseniz bu, komutunuzun yanlış olduğu anlamına gelir. Hata mesajıyla ilgili ayrıntıları görmek için mesajın başıını okumalısınız.
  • Bir topic listesini describe etmek için virgülle ayrılmış bir liste verebilirsiniz.
  • Bir --topic seçeneği belirtmezseniz, tüm topicleri describe edersiniz.

Ekstra Komut Parametreleri

  • --topic: Eğer parametre olarak geçmezseniz tüm topicleri describe edersiniz
  • --at-min-isr-partitions: Topicleri describe ederken ayarlanırsa, yalnızca isr sayısı parametre ile verilen minimum değere eşit olan partitionları gösterir.
  • --unavailable-partitions: Yalnızca lideri mevcut olmayan partitionları göster.
  • --under-min-isr-partitions: Yalnızca isr sayısı yapılandırılan minimum değerden az olan partitionları gösterir.
  • --under-replicated-partitions: Yalnızca replika edilenlerin altındaki partionları gösterir

1.4 Bir Topic İçindeki Partition Sayısını Artırma

Bir topic içindeki partition sayısını artırmak için şu zorunlu parametreleri sağlamanız gerekiyor:

  • Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. Ör. localhost:2181
  • topic adı ve partition sayısı
  • --alter parametresi
ℹ️
Uygulamalarınız key tabanlı sıralamaya dayanıyorsa, bir Kafka topicteki partition sayısını artırmak TEHLİKELİ BİR İŞLEMdir. Böyle bir durumda, keylerin düzgün bir şekilde yeniden dağıtılması için yeni bir topic oluşturmalı ve tüm verileri oraya kopyalamalısınız.

Örnek

Kafka broker localhost:9092'de çalışırken first_topic isimli topic'in partition sayısını 5 yapmak istiyoruz.

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first_topic --partitions 5

Komut çıktısı:

Komut çıktısı mevcut değildir. --describe komutuyla güncel durumu kontrol edebilirsiniz.

Dikkat Edilmesi Gerekenler

kafka-topics.sh --alterkomutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • Partition sayısını değiştirmek key hashing tekniğini değiştirdiğinden, consumerlarınızın key tabanlı sıralamaya güvendiği senaryolarda bu komutun çalıştırılması ÖNERİLMEZ.
  • Yalnızca partitionlar ekleyebilirsiniz, partitionları kaldıramazsınız

1.5 Topic Silme

Bir topic'i silmek için şu zorunlu parametreleri sağlamanız gerekiyor:

  • Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. Ör. localhost:2181
  • --delete parametresi
  • Kafka aracılarının topic silmeye izin verdiğinden emin olmak. delete.topic.enable=true varsayılan olarak izin verir.

Örnek

Kafka broker localhost:9092'de çalışırken first_topic isimli topic'i silmek istiyoruz.

kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first_topic

Komut çıktısı:

Komut çıktısı mevcut değildir. --describe komutuyla güncel durumu kontrol edebilirsiniz. Silme işlemi biraz zaman alacabileceğinden belirli bir süre sonra kontrol ediniz.

Dikkat Edilmesi Gerekenler

kafka-topics.sh --deletekomutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • Topic silmenin etkinleştirilmemesi durumunda (delete.topic.enable broker ayarına bakın), topicler "silinmek üzere işaretlenir" ancak silinmez
  • Kafka'da bir topic'i silmek biraz zaman alabilir ve bu nedenle kafka-topics komutu, topic silinmeden önce bile boş bir çıktı döndürür.
  • Bir topic listesini silmek için virgülle ayrılmış bir liste gönderebilirsiniz.

2. Kafka Producer CLI Komutları

Kafka console producer CLI, kafka-console-producer inputtan veri okumak ve Kafka'ya produce etmek için kullanılır.

ℹ️
Windows için kafka-console-producer.bat, Linux ve Mac için kafka-console-producer.sh gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.
ℹ️
Kafka'nın v2.5+ sürümüne sahipseniz, komutları --bootstrap-server seçeneğiyle, daha eski bir versiyona sahipseniz --broker-list seçeneğiyle çalıştırmalısınız.

2.1 Kafka Topic İçin Mesaj Produce  Etmek

Bir topic'e herhangi bir mesaj produce etmek için şu zorunlu parametreleri sağlamanız gerekiyor:

  • Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Topic adı

Örnek

first_topic isimli topic'in 3 partition ve 1 replikasyon faktörü ile oluşturulduğundan emin olun:

kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1

first_topic için produce etmeye başlayın:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic

Kafka console producer'dan çıkmak için Ctrl+C klavye kombinasyonunu kullanabilirsiniz.

Komut çıktısı:

>

Producer açıldıktan sonra > işaretini görmelisiniz. Daha sonra yazdığınız herhangi bir metin satırı Kafka topic'e gönderilecektir (Enter'a basıldığında)

$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic
>Merhaba
>Burası Kerteriz Blog
>Kafka producer ile topic'e mesaj produce ediyoruz.
>^C  (<- Ctrl + C producer'dan çıkmak için kullanılır)

Dikkat Edilmesi Gerekenler

kafka-console-producer.shkomutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • Mesajlar varsayılan olarak null key ile gönderilir (daha fazla seçenek için aşağıya bakın)
  • Topic yoksa, Kafka tarafından otomatik olarak oluşturulabilir
  • Sağlanan ada sahip bir topic bulunmalıdır. Henüz var olmayan bir topic belirtirseniz, belirtilen ada sahip yeni bir topic, varsayılan partition sayısı ve replikayon faktörü ile oluşturulacaktır. Bu, aşağıdaki varsayılan değerlerle broker tarafındaki (config/server.properties dosyanızdaki) ayarlarla kontrol edilir:
auto.create.topics.enable=true
num.partitions=1
default.replication.factor=1

Ekstra Komut Parametreleri

  • --compression-codec: Varsayılan değeri gzip olan mesaj sıkıştırmayı etkinleştirir. Opsiyonel değerler; 'none', 'gzip', 'snappy', 'lz4' ve 'zstd'
  • --producer-property: acks=all ayarı gibi herhangi bir producer özelliğini iletebilirsiniz
  • --request-required-acks: Acks ayarını doğrudan yapmak için bir alternatif

2.2 Kafka Topic İçin Bir Dosya İçeriğinden Mesaj Produce  Etmek

Örnek dosya topic-input.txt olsun. Ayrıca her iletinin yeni bir satırda olduğundan emin olun.

Merhaba
Burası Kerteriz Blog

Dosyadan topic'e mesaj produce edin:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic < topic-input.txt

2.3 Kafka Topic İçin Bir Key İle Mesaj Produce  Etmek

Varsayılan olarak, bir Kafka topic'e gönderilen iletiler, null keylere sahip olurlar.

Key değerini mesajların yanında göndermek için parse.key ve key.separator parametrelerini kullanmalıyız.

Bu örnekte, key ve değer arasındaki ayırıcı: :

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic --property parse.key=true --property key.separator=:

Örnek input:

>example key:example value
>name:İsmet

Key/value ayırıcınızı her zaman dahil etmeyi unutmayın, aksi takdirde bir hata alırsınız.

3. Kafka Consumer CLI Komutları

Kafka console consumer CLI, kafka-console-consumer Kafka'dan veri okumak için kullanılır.

ℹ️
Windows için kafka-console-consumer.bat, Linux ve Mac için kafka-console-consumer.sh gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.
ℹ️
Kafka v0.10'dan bu yana consumer, Zookeeper'dan değil, bir Kafka bağlantı dizesinden yararlanıyor. Bunun nedeni, consumer offsetlerinin nasıl depolandığıyla alakalıdır. --zookeeper parametresini kullanarak gördüğünüz tüm yazılar güncelliğini yitirmiş kabul edilmelidir ve siz de asla --zookeeper parametresi kullanmamalısınız.

3.1 Kafka Topic'den Mesaj Consume Etmek

Bir topic'den mesaj consume etmek için şu zorunlu parametreleri sağlamanız gerekiyor:

  • Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Topic adı
  • Geçmiş verileri okumanız gerekiyorsa --from-beginning parametresi. Aksi halde sadece bağlandıktan sonra produce edilen mesajları consume edersiniz.

Örnek

first_topic isimli topic'den sadece bağlandığınız andan itibaren produce edilen mesajları consume etmek için:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic

first_topic isimli topic'den tüm historical mesajları consume etmek için:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning

Komut çıktısı:

Merhaba
Burası Kerteriz Blog
Kafka producer ile topic'e mesaj produce ediyoruz
^C Processed a total of 3 messages  (<-- Ctrl+C  kombinasyonu ile çıktığınızda)

Kafka console consumer, siz ondan çıkana kadar açık kalacak ve mesajları ekranda göstermeye devam edecektir. Gelen tüm mesajların metin (String) olarak seri hale getirilebileceğini varsayar.

Varsayılan olarak, Kafka console consumer key veya herhangi bir partition bilgisini göstermez.

Herhangi bir çıktı görmüyorsanız ancak Kafka topic'inizin içinde veri olduğunu biliyorsanız, --from-beginning seçeneğini kullanmayı unutmayın.

ℹ️
Mesajların sırası topic değil, partition başınadır. Bir topic birden fazla partition ile oluşturulabileceğinden, sıralama yalnızca partition düzeyinde garanti edilir. Yalnızca bir partition'a sahipseniz, daima produce ettiğiniz sırayla consumer edersiniz. Fakat birden fazla partition olduğunda mesaj sırası produce ettiğiniz sıradan farklı olacaktır.

Dikkat Edilmesi Gerekenler

kafka-console-consumer.shkomutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • Mesajlar varsayılan olarak key veya meta veri bilgilerini göstermez
  • Bir kafka-console-consumer başlattığınızda, --from-beginning parametresini vermediğiniz sürece, yalnızca o andan itibaren gelen mesajlar görüntülenecek ve okunacaktır.
  • Topic mevcut değilse, console consumer onu otomatik olarak varsayılan olarak oluşturacaktır.
  • Virgülle ayrılmış bir liste veya bir pattern ile aynı anda birden çok topic consume edebilirsiniz.
  • Bir consumer grubu kimliği belirtilmezse, kafka-console-consumer rastgele bir consumer grubu oluşturur.
  • İletiler sırayla görünmüyorsa, sıralamanın topic düzeyinde değil, partition düzeyinde olduğunu hatırlayın.

Ekstra Komut Parametreleri

  • --from-beginning: Tüm geçmiş mesajları okumak için
  • --formatter: Mesajları belirli bir formatta görüntülemek için
  • --consumer-property: allow.auto.create.topics ayarı gibi herhangi bir consumer ayarını iletmek için
  • --group: Varsayılan olarak rastgele bir consumer grubu kimliği seçilir, ancak bu parametreyi kullanarak bunu geçersiz kılabilirsiniz.
  • --max-messages: Çıkmadan önce tüketilecek mesaj sayısı
  • --partition: Yalnızca belirli bir partitiondan consume etmek için

3.2 Kafka Topic'den Key/Value Formatında Mesaj Consume Etmek

Varsayılan olarak, console consumer yalnızca Kafka kaydının değerini gösterir. Bu komutu kullanarak hem key hem de değeri gösterebilirsiniz.

kafka.tools.DefaultMessageFormatter formatter ve print.timestamp=true, print.key=true ve print.value=true parametrelerini kullanarak mesaj consume edelim:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning

CreateTime:1641810588071	null	merhaba
CreateTime:1641823304170	name	İsmet
CreateTime:1641823301294	example key	example value

Aşağıdakiler gibi daha fazla parametre de mevcuttur:

  • print.partition
  • print.offset
  • print.headers
  • key.separator
  • line.separator
  • headers.separator

4. Kafka Consumer Grup CLI Komutları

Kafka console consumer CLI, bir consumer grup içinde yer alan consumerlar için de kafka-console-consumer komutuyla Kafka'dan veri okumak için kullanılır.

ℹ️
Windows için kafka-console-consumer.bat, Linux ve Mac için kafka-console-consumer.sh gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.
ℹ️
Bu bölümdeki komutlar da ekstra --group parametresini kullanacağız.

4.1 Bir Kafka Consumer Grup İçinde Consumer Oluşturma

Bir consumer grup içinde consumer oluşturmak ve bu bölümdeki örnekleri gerçekleştirmek için şu adımları gerçekleştirin:

  • En az 2 partition içeren bir topic oluşturun ve ona veri gönderin
  • İlk kafka-console-consumer'ı oluşturun ve --group ile bir grup adı atayın
  • Yeni bir terminal/shell penceresi açın
  • İkinci bir kafka-console-consumer oluşturun ve aynı --group değişkenini kullanın
  • Topic'e veri gönderin ve consumerların okumaları paylaştığını görün.

Örnek

Bir grupta, Kafka topic'inizdeki partitionlardan daha fazla consumer'a sahip olamazsınız ve bu nedenle önce birkaç partition içeren bir Kafka topicoluşturmamız gerekiyor.

kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1

Ardından, my-first-application adlı bir consumer grubunda bir consumer başlatın.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application 

Yeni bir terminal/shell penceresi açın ve my-first-application ile aynı consumer grubunda ikinci bir consumer başlatın (tamamen aynı komutu kullandığımızı unutmayın)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application 

Yeni bir terminal/shell penceresi açın ve my-first-application ile aynı consumer grubunda üçüncü bir consumer başlatın

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application 

my-first-application consumer grubundaki her consumer'a bir partition atanır. Topic içinde birkaç dizi mesaj üretin.

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic
>first message
>second message
>third message
>fourth message

Her consumer, yalnızca kendisine atanan partition'da üretilen iletileri gösterecektir.

Bir consumer'ı durdurursanız, consumer rebalance gerçekleştirildiğinden, gelen mesajlar kalan consumerlar'a gönderilmeye başlanır.

Eğer tüm consumerları durdurursak ve mesaj produce etmeye devam edersek

>eigth message
>ninth message
>tenth message

Gruptaki bir consumer'ın yeniden başlatılması üzerine, o consumer en son commit edilen offsetleri okuyacak ve yalnızca az önce oluşturduğunuz mesajları okuyacaktır.

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application

eigth message
ninth message
tenth message

Dikkat Edilmesi Gerekenler

  • --group parametresini kullanarak bir consumer grubunda consume ederseniz ve daha sonra aynı grupla --from-beginning seçeneğini kullanmayı denerseniz, bu parametre dikkate alınmaz. Bunun yerine consumer gruplarınızın offsetini bir sonraki başlıkta gösterildiği gibi sıfırlamanız gerekir.
  • Bir --group seçeneği belirtmezseniz, consumer'ın consumer grubu, console-consumer-11984 gibi rastgele bir consumer grubu olacaktır.
  • Tüm mesajları alan bir consumer görürseniz, bu muhtemelen topic'inizin yalnızca kafka-topics --describe komutuyla doğrulayabileceğiniz 1 partitionla oluşturulduğu anlamına gelir.

4.2 Bir Kafka Consumer Grup Resetleme

Bir Kafka consumer grubunu sıfırlamak için şunları yapmalıyız:

  • Kafka hostname ve port bilgisi. Ör. localhost:9092
  • Offset resetleme stratejisi
  • Ofset sıfırlama stratejisini anlayın (to earliest, to latest, to specific offset, shift by...)
  • Çalışan consumer gruplarını durdurun (aksi takdirde komut başarısız olur)
  • --reset-offsets parametresi

Örnek

İlk olarak, consumerların durdurulmasını sağlayın. --describe ile "has no active members" mesajını görmelisiniz.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application

Consumer group 'my-first-application' has no active members.

Consumer grubunuz için geçerli ofsetleri gözlemleyin (yukarıdakiyle aynı komut)

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-first-application first_topic     0          3               3               0               -               -               -
my-first-application first_topic     1          5               5               0               -               -               -
my-first-application first_topic     2          6               6               0               -               -               -

Topic'i tamamen yeniden okumak için offsetleri ilk başlangıç konuma sıfırlayacağız

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-first-application           first_topic                    0          0
my-first-application           first_topic                    1          0
my-first-application           first_topic                    2          0

Gördüğünüz gibi, tüm partitionlar için o consumer grubu yeni ofseti 0 oldu. Bu, o gruptaki bir consumer'ı yeniden başlattığınızda, her partition'ın başından itibaren okuyacağı anlamına gelir:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
third message
fifth message
seventh message
tenth message
first message
fourth message
eigth message
hello
world
second message
sixth message
ninth message

Mesajların topicler için değil, her partitionlar için sırayla okunduğunu unutmayın (bu örnekte 3 bölümümüz var).

Ek olarak, shift ile, ofsetleri belirli bir değere kadar geri veya ileri alabilirsiniz (negatif, mesajlarda geri gitmek için, pozitif, mesajlarda ilerlemek için kullanılır).

Önce tüm consumerların durduğundan emin olalım.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application

Consumer group 'my-first-application' has no active members.
GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-first-application first_topic     0          3               3               0               -               -               -
my-first-application first_topic     1          5               5               0               -               -               -
my-first-application first_topic     2          6               6               0               -               -               -

Ardından, first_topic isimli topic'i subscribe eden my-first-application isimli consumer grubu için offseti -2 ile kaydıralım.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-first-application           first_topic                    0          1
my-first-application           first_topic                    1          3
my-first-application           first_topic                    2          4

Gördüğünüz gibi offsetler her bölüm için 2 azaldı. Kafka console consumer CLI'ı kullanarak first_topic isimli topic'deki iletileri okuyalım. Topic'in her partition'ından yalnızca son 2 mesajın geldiğine dikkat edin.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
seventh message
tenth message
fourth message
eigth message
sixth message
ninth message

Dikkat Edilmesi Gerekenler

kafka-consumer-groups.sh komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:

  • İçinde consumerlar etkinse, bir consumer grubunu sıfırlayamazsınız.
  • Bu komut, bir consumer grubu için verileri yeniden işlemek için kullanılabilir (bir hata düzeltmeniz olması durumunda)
  • Bu komut ayrıca Kafka'da mesaj tüketimini ilerletmek için de kullanılabilir (örneğin, bir mesaj formatı bozuksa (poison pill) veya consumer tüm topic'i consume etmek için çok yavaşsa).

Ekstra Komut Parametreleri

  • --all-groups: İlgili komutu tüm gruplar için geçerli kılar, dikkatli kullanın
  • --all-topics: Offsetleri resetleme sürecinde bir gruba atanan tüm topicleri göz önünde bulundurur, dikkatli kullanın
  • --by-duration: Süreye göre ofsetleri sıfırlar
  • --dry-run: Yalnızca beklenen sonucu gösterir, ancak asıl komutu çalıştırmaz
  • --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current: Offsetleri sıfırlamak için kullanabileceğiniz tüm seçenekler

4.3 Bir Kafka Consumer Grup İçindeki Tüm Consumerları Listeleme

Bir consumer grubtaki tüm Kafka consumerları listelemek, consumerlarınızın ağınızda nereye yerleştirildiğini ve consume konusuna ne kadar dahil olduklarını anlamanıza yardımcı olur.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                          HOST            CLIENT-ID
my-first-application first_topic     0          3               3               0               consumer-my-first-application-1-0237b0a1-911d-45f1-891f-8fd7630a7593 /172.19.0.1     consumer-my-first-application-1
my-first-application first_topic     1          5               5               0               consumer-my-first-application-1-70ddc756-8dbc-45e4-b5b6-1e5a75db9e62 /172.19.0.1     consumer-my-first-application-1
my-first-application first_topic     2          6               6               0               consumer-my-first-application-1-a8ce2af3-97b3-4445-bba3-f4a5f6c4464d /172.19.0.1     consumer-my-first-application-1
  • CONSUMER ID, consumer'ın benzersiz kimliğini Kafka broker için temsil eder.
  • CLIENT ID, consumer gruplarınızdaki bir consumer'ı tanımlamak için isteğe bağlı olarak ayarlayabileceğiniz client-side ayar (client.id consumer özelliği ile)
  • CURRENT-OFFSET, o grup için en son kaydedilen offsettir.
  • LOG-END-OFFSET, consume için topic partition'ında mevcut olan en son mesaj offsetini temsil eder.
  • LAG, LOG-END-OFFSET ve CURRENT-OFFSET arasındaki farktır ve bir consumer'ın bir topic'in kuyruğunda ne kadar geride olduğunu gösterir.
  • HOST, consumer client makinesinin hostname / IP'sidir.

4.4 Tüm Kafka Consumer Grupları Listeleme

Consumer gruplarını listelemek, hangilerinin arızalı veya sorunsuz olup olmadığını anlamanıza yardımcı olur.

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --state
GROUP                STATE
my-first-application Stable

Tüm consumer gruplarını ve durumunu detaylıca önizleyebilirsiniz

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups  --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
my-first-application      127.0.0.1:9092 (1)        range                Stable          3

--all-groups kullanmak yerine sadece bir grup için komut belirtebilirsiniz.

4.5 Tüm Kafka Consumer Grubu Silme

Okuma mekanizmasını tamamen sıfırlamak için bir consumer grubunu silmek isteyebilirsiniz. Bunun için delete parametresini kullanabilirsiniz:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-first-application
Deletion of requested consumer groups ('my-first-application') was successful.

Alternatif olarak, yalnızca belirli bir topic için offsetleri silmek istiyorsanız (consumer grubunuz birden fazla topic'ten okurken faydalıdır), aşağıdaki komutu kullanabilirsiniz:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-first-application --topic first_topic
Request succeed for deleting offsets with topic first_topic group my-first-application

TOPIC                          PARTITION       STATUS
first_topic                    0               Successful
first_topic                    1               Successful
first_topic                    2               Successful