Kafka CLI Komutları
Kafka'da; topic oluşturma, silme, listeleme, detaylarını gösterme, mesaj produce etme, mesaj consume etme, consumer gruplarını oluşturma, silme ve inspect etme gibi en çok kullanılan CLI komutlarından bazıları hakkında bilgi edineceğiz.
1. Kafka Topic CLI Komutları
Kafka Topics CLI, yani kafka-topics
, Kafka'da bir topic oluşturmak, silmek, detaylarını görüntülemek veya değiştirmek için kullanılır.
kafka-topics.bat
, Linux ve Mac için kafka-topics.sh
gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.--bootstrap-server
seçeneğiyle kullanmanızı kesinlikle öneririz. Çünkü Zookeeper seçeneği artık kullanımdan kaldırılmıştır.1.1 Topic Oluşturma
Bir Kafka topic oluşturmak için şu zorunlu parametreleri sağlamanız gerekiyor:
- Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. Ör.
localhost:2181
- topic adı, partition sayısı ve replikasyon faktörü.
--create
parametresi
Örnek
Kafka broker localhost:9092
'de çalışırken 3 partition ve 1 replikasyon faktörü ile first_topic
isimli bir topic oluşturalım.
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1
Komut Çıktısı
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic first_topic.
Dikkat Edilmesi Gerekenler
kafka-topics.sh --create
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- Sahip olduğunuz broker sayısından daha büyük bir replikasyon faktörü belirleyemezsiniz.
- İstediğiniz kadar partition belirtebilirsiniz fakat
3
partition başlamak için iyi bir sayıdır. - Bir komutu çalıştırdıktan sonra man/help çıktısını görürseniz bu, komutunuzun yanlış olduğu anlamına gelir. Hata mesajıyla ilgili ayrıntıları görmek için mesajın başıını okumalısınız.
- Partitionlar ve çoğaltma faktörü için varsayılan dğer yoktur ve bunları açıkça belirtmeniz gerekir.
- Topic adı yalnızca ASCII alfanümerikleri, '
.
', '_
' ve '-
' içermelidir.
Ekstra Komut Parametreleri
--config
: Topic düzeyinde yapılandırmalar ayarlayabilirsiniz. örneğin--config max.message.bytes=64000
. Tüm config seçenekleri için tıklayınız.--disable-rack-aware
: Rack aware replika atamasını devre dışı bırakır (önerilmez, yalnızca ne yaptığınızı biliyorsanız ayarlayın)
1.2 Topicleri Listeleme
Kafka topiclerini listelemek için şu zorunlu parametreleri sağlamanız gerekiyor:
- Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi.
localhost:2181
--list
parametresi
Örnek
Kafka broker localhost:9092
'de çalışırken tüm topicleri listelemek istiyoruz.
kafka-topics.sh --bootstrap-server localhost:9092 --list
Komut Çıktısı
__consumer_offsets
first_topic
second_topic
test
Dikkat Edilmesi Gerekenler
kafka-topics.sh --list
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- Bu komut, sizin oluşturmadığınız dahili topicleri de listeler (
__consumer_offsets
gibi). Bu topicleri silmeye çalışmayın. Bu topicleri gizlemek istiyorsanız--exclude-internal
parametresini kullanın - Bir komutu çalıştırdıktan sonra man/help çıktısını görürseniz bu, komutunuzun yanlış olduğu anlamına gelir. Hata mesajıyla ilgili ayrıntıları görmek için mesajın başıını okumalısınız.
Ekstra Komut Parametreleri
--exclude-internal
: list veya describe komutunu çalıştırırken internal topicleri hariç tutar (kullanmazsanız varsayılan olarak listelenirler)--at-min-isr-partitions
: Topicleri describe ederken ayarlanırsa, yalnızca isr sayısı parametre ile verilen minimum değere eşit olan partitionları gösterir.--unavailable-partitions
: Yalnızca lideri mevcut olmayan partitionları göster.--under-min-isr-partitions
: Yalnızca isr sayısı yapılandırılan minimum değerden az olan partitionları gösterir.--under-replicated-partitions
: Yalnızca replika edilenlerin altındaki partionları gösterir
1.3 Topic Detaylarını Gösterme
Kafka topic detaylarını görüntülemek için şu zorunlu parametreleri sağlamanız gerekiyor:
- Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi.
localhost:2181
--describe
parametresi
Örnek
Kafka broker localhost:9092
'de çalışırken first_topic
isimli topic'in detaylarını görmek istiyoruz. Tek seferde birden fazla topic detayını görmek için virgülle ayrılmış bir topic listesi de verebilirsiniz.
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first_topic
Komut çıktısı:
Topic: first_topic TopicId: fN9mf1UDSmiCdKIDFyMEIQ PartitionCount: 3 ReplicationFactor: 1 Configs: cleanup.policy=delete
Topic: first_topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: first_topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: first_topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Leader: 1
: Partition 0 için ID'si 1 olan broker'ın lider olduğu anlamına gelir.Replicas: 1
: Partition 0 için ID'si 1 olan broker'ın bir replika olduğu anlamına gelir.Isr: 1
, Partition 0 için ID'si 1 olan broker'ın in-sync bir replika olduğu anlamına gelir.
Dikkat Edilmesi Gerekenler
kafka-topics.sh --describe
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- Partition ID'sini ve broker ID'sini birbirine karıştırabilirsiniz. Komut çıktısında sayıların ne anlama geldiğini anlamak için yukarıdaki Komut Çıktısı bölümüne bakın.
- Bir komutu çalıştırdıktan sonra man/help çıktısını görürseniz bu, komutunuzun yanlış olduğu anlamına gelir. Hata mesajıyla ilgili ayrıntıları görmek için mesajın başıını okumalısınız.
- Bir topic listesini describe etmek için virgülle ayrılmış bir liste verebilirsiniz.
- Bir
--topic
seçeneği belirtmezseniz, tüm topicleri describe edersiniz.
Ekstra Komut Parametreleri
--topic
: Eğer parametre olarak geçmezseniz tüm topicleri describe edersiniz--at-min-isr-partitions
: Topicleri describe ederken ayarlanırsa, yalnızca isr sayısı parametre ile verilen minimum değere eşit olan partitionları gösterir.--unavailable-partitions
: Yalnızca lideri mevcut olmayan partitionları göster.--under-min-isr-partitions
: Yalnızca isr sayısı yapılandırılan minimum değerden az olan partitionları gösterir.--under-replicated-partitions
: Yalnızca replika edilenlerin altındaki partionları gösterir
1.4 Bir Topic İçindeki Partition Sayısını Artırma
Bir topic içindeki partition sayısını artırmak için şu zorunlu parametreleri sağlamanız gerekiyor:
- Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. Ör.
localhost:2181
- topic adı ve partition sayısı
--alter
parametresi
Örnek
Kafka broker localhost:9092
'de çalışırken first_topic
isimli topic'in partition sayısını 5
yapmak istiyoruz.
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first_topic --partitions 5
Komut çıktısı:
Komut çıktısı mevcut değildir. --describe
komutuyla güncel durumu kontrol edebilirsiniz.
Dikkat Edilmesi Gerekenler
kafka-topics.sh --alter
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- Partition sayısını değiştirmek key hashing tekniğini değiştirdiğinden, consumerlarınızın key tabanlı sıralamaya güvendiği senaryolarda bu komutun çalıştırılması ÖNERİLMEZ.
- Yalnızca partitionlar ekleyebilirsiniz, partitionları kaldıramazsınız
1.5 Topic Silme
Bir topic'i silmek için şu zorunlu parametreleri sağlamanız gerekiyor:
- Kafka v2.2+ ise, Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Kafka'nın daha eski bir sürümüyse, Zookeeper URL ve port bilgisi. Ör.
localhost:2181
--delete
parametresi- Kafka aracılarının topic silmeye izin verdiğinden emin olmak.
delete.topic.enable=true
varsayılan olarak izin verir.
Örnek
Kafka broker localhost:9092
'de çalışırken first_topic
isimli topic'i silmek istiyoruz.
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first_topic
Komut çıktısı:
Komut çıktısı mevcut değildir. --describe
komutuyla güncel durumu kontrol edebilirsiniz. Silme işlemi biraz zaman alacabileceğinden belirli bir süre sonra kontrol ediniz.
Dikkat Edilmesi Gerekenler
kafka-topics.sh --delete
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- Topic silmenin etkinleştirilmemesi durumunda (
delete.topic.enable
broker ayarına bakın), topicler "silinmek üzere işaretlenir" ancak silinmez - Kafka'da bir topic'i silmek biraz zaman alabilir ve bu nedenle
kafka-topics
komutu, topic silinmeden önce bile boş bir çıktı döndürür. - Bir topic listesini silmek için virgülle ayrılmış bir liste gönderebilirsiniz.
2. Kafka Producer CLI Komutları
Kafka console producer CLI, kafka-console-producer
inputtan veri okumak ve Kafka'ya produce etmek için kullanılır.
kafka-console-producer.bat
, Linux ve Mac için kafka-console-producer.sh
gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.--bootstrap-server
seçeneğiyle, daha eski bir versiyona sahipseniz --broker-list
seçeneğiyle çalıştırmalısınız.2.1 Kafka Topic İçin Mesaj Produce Etmek
Bir topic'e herhangi bir mesaj produce etmek için şu zorunlu parametreleri sağlamanız gerekiyor:
- Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Topic adı
Örnek
first_topic
isimli topic'in 3 partition ve 1 replikasyon faktörü ile oluşturulduğundan emin olun:
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1
first_topic
için produce etmeye başlayın:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic
Kafka console producer'dan çıkmak için Ctrl+C
klavye kombinasyonunu kullanabilirsiniz.
Komut çıktısı:
>
Producer açıldıktan sonra >
işaretini görmelisiniz. Daha sonra yazdığınız herhangi bir metin satırı Kafka topic'e gönderilecektir (Enter'a basıldığında)
$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic
>Merhaba
>Burası Kerteriz Blog
>Kafka producer ile topic'e mesaj produce ediyoruz.
>^C (<- Ctrl + C producer'dan çıkmak için kullanılır)
Dikkat Edilmesi Gerekenler
kafka-console-producer.sh
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- Mesajlar varsayılan olarak
null
key ile gönderilir (daha fazla seçenek için aşağıya bakın) - Topic yoksa, Kafka tarafından otomatik olarak oluşturulabilir
- Sağlanan ada sahip bir topic bulunmalıdır. Henüz var olmayan bir topic belirtirseniz, belirtilen ada sahip yeni bir topic, varsayılan partition sayısı ve replikayon faktörü ile oluşturulacaktır. Bu, aşağıdaki varsayılan değerlerle broker tarafındaki (
config/server.properties
dosyanızdaki) ayarlarla kontrol edilir:
auto.create.topics.enable=true
num.partitions=1
default.replication.factor=1
Ekstra Komut Parametreleri
--compression-codec
: Varsayılan değerigzip
olan mesaj sıkıştırmayı etkinleştirir. Opsiyonel değerler; 'none
', 'gzip
', 'snappy
', 'lz4
' ve 'zstd
'--producer-property
:acks=all
ayarı gibi herhangi bir producer özelliğini iletebilirsiniz--request-required-acks
: Acks ayarını doğrudan yapmak için bir alternatif
2.2 Kafka Topic İçin Bir Dosya İçeriğinden Mesaj Produce Etmek
Örnek dosya topic-input.txt
olsun. Ayrıca her iletinin yeni bir satırda olduğundan emin olun.
Merhaba
Burası Kerteriz Blog
Dosyadan topic'e mesaj produce edin:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic < topic-input.txt
2.3 Kafka Topic İçin Bir Key İle Mesaj Produce Etmek
Varsayılan olarak, bir Kafka topic'e gönderilen iletiler, null
keylere sahip olurlar.
Key değerini mesajların yanında göndermek için parse.key
ve key.separator
parametrelerini kullanmalıyız.
Bu örnekte, key ve değer arasındaki ayırıcı: :
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic --property parse.key=true --property key.separator=:
Örnek input:
>example key:example value
>name:İsmet
Key/value ayırıcınızı her zaman dahil etmeyi unutmayın, aksi takdirde bir hata alırsınız.
3. Kafka Consumer CLI Komutları
Kafka console consumer CLI, kafka-console-consumer
Kafka'dan veri okumak için kullanılır.
kafka-console-consumer.bat
, Linux ve Mac için kafka-console-consumer.sh
gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.--zookeeper
parametresini kullanarak gördüğünüz tüm yazılar güncelliğini yitirmiş kabul edilmelidir ve siz de asla --zookeeper
parametresi kullanmamalısınız.3.1 Kafka Topic'den Mesaj Consume Etmek
Bir topic'den mesaj consume etmek için şu zorunlu parametreleri sağlamanız gerekiyor:
- Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Topic adı
- Geçmiş verileri okumanız gerekiyorsa -
-from-beginning
parametresi. Aksi halde sadece bağlandıktan sonra produce edilen mesajları consume edersiniz.
Örnek
first_topic
isimli topic'den sadece bağlandığınız andan itibaren produce edilen mesajları consume etmek için:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic
first_topic
isimli topic'den tüm historical mesajları consume etmek için:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning
Komut çıktısı:
Merhaba
Burası Kerteriz Blog
Kafka producer ile topic'e mesaj produce ediyoruz
^C Processed a total of 3 messages (<-- Ctrl+C kombinasyonu ile çıktığınızda)
Kafka console consumer, siz ondan çıkana kadar açık kalacak ve mesajları ekranda göstermeye devam edecektir. Gelen tüm mesajların metin (String) olarak seri hale getirilebileceğini varsayar.
Varsayılan olarak, Kafka console consumer key veya herhangi bir partition bilgisini göstermez.
Herhangi bir çıktı görmüyorsanız ancak Kafka topic'inizin içinde veri olduğunu biliyorsanız, --from-beginning
seçeneğini kullanmayı unutmayın.
Dikkat Edilmesi Gerekenler
kafka-console-consumer.sh
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- Mesajlar varsayılan olarak key veya meta veri bilgilerini göstermez
- Bir
kafka-console-consumer
başlattığınızda,--from-beginning
parametresini vermediğiniz sürece, yalnızca o andan itibaren gelen mesajlar görüntülenecek ve okunacaktır. - Topic mevcut değilse, console consumer onu otomatik olarak varsayılan olarak oluşturacaktır.
- Virgülle ayrılmış bir liste veya bir pattern ile aynı anda birden çok topic consume edebilirsiniz.
- Bir consumer grubu kimliği belirtilmezse,
kafka-console-consumer
rastgele bir consumer grubu oluşturur. - İletiler sırayla görünmüyorsa, sıralamanın topic düzeyinde değil, partition düzeyinde olduğunu hatırlayın.
Ekstra Komut Parametreleri
--from-beginning
: Tüm geçmiş mesajları okumak için--formatter
: Mesajları belirli bir formatta görüntülemek için--consumer-property
:allow.auto.create.topics
ayarı gibi herhangi bir consumer ayarını iletmek için--group
: Varsayılan olarak rastgele bir consumer grubu kimliği seçilir, ancak bu parametreyi kullanarak bunu geçersiz kılabilirsiniz.--max-messages
: Çıkmadan önce tüketilecek mesaj sayısı--partition
: Yalnızca belirli bir partitiondan consume etmek için
3.2 Kafka Topic'den Key/Value Formatında Mesaj Consume Etmek
Varsayılan olarak, console consumer yalnızca Kafka kaydının değerini gösterir. Bu komutu kullanarak hem key hem de değeri gösterebilirsiniz.
kafka.tools.DefaultMessageFormatter
formatter ve print.timestamp=true
, print.key=true
ve print.value=true
parametrelerini kullanarak mesaj consume edelim:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning
CreateTime:1641810588071 null merhaba
CreateTime:1641823304170 name İsmet
CreateTime:1641823301294 example key example value
Aşağıdakiler gibi daha fazla parametre de mevcuttur:
print.partition
print.offset
print.headers
key.separator
line.separator
headers.separator
4. Kafka Consumer Grup CLI Komutları
Kafka console consumer CLI, bir consumer grup içinde yer alan consumerlar için de kafka-console-consumer
komutuyla Kafka'dan veri okumak için kullanılır.
kafka-console-consumer.bat
, Linux ve Mac için kafka-console-consumer.sh
gibi platformunuz için uygun uzantılarla CLI komutları kullanmalısınız.--group
parametresini kullanacağız.4.1 Bir Kafka Consumer Grup İçinde Consumer Oluşturma
Bir consumer grup içinde consumer oluşturmak ve bu bölümdeki örnekleri gerçekleştirmek için şu adımları gerçekleştirin:
- En az 2 partition içeren bir topic oluşturun ve ona veri gönderin
- İlk
kafka-console-consumer
'ı oluşturun ve--group
ile bir grup adı atayın - Yeni bir terminal/shell penceresi açın
- İkinci bir
kafka-console-consumer
oluşturun ve aynı--group
değişkenini kullanın - Topic'e veri gönderin ve consumerların okumaları paylaştığını görün.
Örnek
Bir grupta, Kafka topic'inizdeki partitionlardan daha fazla consumer'a sahip olamazsınız ve bu nedenle önce birkaç partition içeren bir Kafka topicoluşturmamız gerekiyor.
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1
Ardından, my-first-application
adlı bir consumer grubunda bir consumer başlatın.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
Yeni bir terminal/shell penceresi açın ve my-first-application
ile aynı consumer grubunda ikinci bir consumer başlatın (tamamen aynı komutu kullandığımızı unutmayın)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
Yeni bir terminal/shell penceresi açın ve my-first-application
ile aynı consumer grubunda üçüncü bir consumer başlatın
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
my-first-application
consumer grubundaki her consumer'a bir partition atanır. Topic içinde birkaç dizi mesaj üretin.
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic
>first message
>second message
>third message
>fourth message
Her consumer, yalnızca kendisine atanan partition'da üretilen iletileri gösterecektir.
Bir consumer'ı durdurursanız, consumer rebalance gerçekleştirildiğinden, gelen mesajlar kalan consumerlar'a gönderilmeye başlanır.
Eğer tüm consumerları durdurursak ve mesaj produce etmeye devam edersek
>eigth message
>ninth message
>tenth message
Gruptaki bir consumer'ın yeniden başlatılması üzerine, o consumer en son commit edilen offsetleri okuyacak ve yalnızca az önce oluşturduğunuz mesajları okuyacaktır.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
eigth message
ninth message
tenth message
Dikkat Edilmesi Gerekenler
--group
parametresini kullanarak bir consumer grubunda consume ederseniz ve daha sonra aynı grupla--from-beginning
seçeneğini kullanmayı denerseniz, bu parametre dikkate alınmaz. Bunun yerine consumer gruplarınızın offsetini bir sonraki başlıkta gösterildiği gibi sıfırlamanız gerekir.- Bir
--group
seçeneği belirtmezseniz, consumer'ın consumer grubu,console-consumer-11984
gibi rastgele bir consumer grubu olacaktır. - Tüm mesajları alan bir consumer görürseniz, bu muhtemelen topic'inizin yalnızca
kafka-topics --describe
komutuyla doğrulayabileceğiniz 1 partitionla oluşturulduğu anlamına gelir.
4.2 Bir Kafka Consumer Grup Resetleme
Bir Kafka consumer grubunu sıfırlamak için şunları yapmalıyız:
- Kafka hostname ve port bilgisi. Ör.
localhost:9092
- Offset resetleme stratejisi
- Ofset sıfırlama stratejisini anlayın (to earliest, to latest, to specific offset, shift by...)
- Çalışan consumer gruplarını durdurun (aksi takdirde komut başarısız olur)
--reset-offsets
parametresi
Örnek
İlk olarak, consumerların durdurulmasını sağlayın. --describe
ile "has no active members" mesajını görmelisiniz.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
Consumer group 'my-first-application' has no active members.
Consumer grubunuz için geçerli ofsetleri gözlemleyin (yukarıdakiyle aynı komut)
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-application first_topic 0 3 3 0 - - -
my-first-application first_topic 1 5 5 0 - - -
my-first-application first_topic 2 6 6 0 - - -
Topic'i tamamen yeniden okumak için offsetleri ilk başlangıç konuma sıfırlayacağız
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic
GROUP TOPIC PARTITION NEW-OFFSET
my-first-application first_topic 0 0
my-first-application first_topic 1 0
my-first-application first_topic 2 0
Gördüğünüz gibi, tüm partitionlar için o consumer grubu yeni ofseti 0 oldu. Bu, o gruptaki bir consumer'ı yeniden başlattığınızda, her partition'ın başından itibaren okuyacağı anlamına gelir:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
third message
fifth message
seventh message
tenth message
first message
fourth message
eigth message
hello
world
second message
sixth message
ninth message
Mesajların topicler için değil, her partitionlar için sırayla okunduğunu unutmayın (bu örnekte 3 bölümümüz var).
Ek olarak, shift
ile, ofsetleri belirli bir değere kadar geri veya ileri alabilirsiniz (negatif, mesajlarda geri gitmek için, pozitif, mesajlarda ilerlemek için kullanılır).
Önce tüm consumerların durduğundan emin olalım.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
Consumer group 'my-first-application' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-application first_topic 0 3 3 0 - - -
my-first-application first_topic 1 5 5 0 - - -
my-first-application first_topic 2 6 6 0 - - -
Ardından, first_topic
isimli topic'i subscribe eden my-first-application
isimli consumer grubu için offseti -2 ile kaydıralım.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic
GROUP TOPIC PARTITION NEW-OFFSET
my-first-application first_topic 0 1
my-first-application first_topic 1 3
my-first-application first_topic 2 4
Gördüğünüz gibi offsetler her bölüm için 2 azaldı. Kafka console consumer CLI'ı kullanarak first_topic
isimli topic'deki iletileri okuyalım. Topic'in her partition'ından yalnızca son 2 mesajın geldiğine dikkat edin.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my-first-application
seventh message
tenth message
fourth message
eigth message
sixth message
ninth message
Dikkat Edilmesi Gerekenler
kafka-consumer-groups.sh
komutuyla ilgili yaygın hatalar ve uyarılar şunlardır:
- İçinde consumerlar etkinse, bir consumer grubunu sıfırlayamazsınız.
- Bu komut, bir consumer grubu için verileri yeniden işlemek için kullanılabilir (bir hata düzeltmeniz olması durumunda)
- Bu komut ayrıca Kafka'da mesaj tüketimini ilerletmek için de kullanılabilir (örneğin, bir mesaj formatı bozuksa (poison pill) veya consumer tüm topic'i consume etmek için çok yavaşsa).
Ekstra Komut Parametreleri
--all-groups
: İlgili komutu tüm gruplar için geçerli kılar, dikkatli kullanın--all-topics
: Offsetleri resetleme sürecinde bir gruba atanan tüm topicleri göz önünde bulundurur, dikkatli kullanın--by-duration
: Süreye göre ofsetleri sıfırlar--dry-run
: Yalnızca beklenen sonucu gösterir, ancak asıl komutu çalıştırmaz--to-datetime
,--by-period
,--to-earliest
,--to-latest
,--shift-by
,--from-file
,--to-current
: Offsetleri sıfırlamak için kullanabileceğiniz tüm seçenekler
4.3 Bir Kafka Consumer Grup İçindeki Tüm Consumerları Listeleme
Bir consumer grubtaki tüm Kafka consumerları listelemek, consumerlarınızın ağınızda nereye yerleştirildiğini ve consume konusuna ne kadar dahil olduklarını anlamanıza yardımcı olur.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-first-application
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-application first_topic 0 3 3 0 consumer-my-first-application-1-0237b0a1-911d-45f1-891f-8fd7630a7593 /172.19.0.1 consumer-my-first-application-1
my-first-application first_topic 1 5 5 0 consumer-my-first-application-1-70ddc756-8dbc-45e4-b5b6-1e5a75db9e62 /172.19.0.1 consumer-my-first-application-1
my-first-application first_topic 2 6 6 0 consumer-my-first-application-1-a8ce2af3-97b3-4445-bba3-f4a5f6c4464d /172.19.0.1 consumer-my-first-application-1
CONSUMER ID
, consumer'ın benzersiz kimliğini Kafka broker için temsil eder.CLIENT ID
, consumer gruplarınızdaki bir consumer'ı tanımlamak için isteğe bağlı olarak ayarlayabileceğiniz client-side ayar (client.id
consumer özelliği ile)CURRENT-OFFSET
, o grup için en son kaydedilen offsettir.LOG-END-OFFSET
, consume için topic partition'ında mevcut olan en son mesaj offsetini temsil eder.LAG
,LOG-END-OFFSET
veCURRENT-OFFSET
arasındaki farktır ve bir consumer'ın bir topic'in kuyruğunda ne kadar geride olduğunu gösterir.HOST
, consumer client makinesinin hostname / IP'sidir.
4.4 Tüm Kafka Consumer Grupları Listeleme
Consumer gruplarını listelemek, hangilerinin arızalı veya sorunsuz olup olmadığını anlamanıza yardımcı olur.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --state
GROUP STATE
my-first-application Stable
Tüm consumer gruplarını ve durumunu detaylıca önizleyebilirsiniz
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups --state
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
my-first-application 127.0.0.1:9092 (1) range Stable 3
--all-groups
kullanmak yerine sadece bir grup için komut belirtebilirsiniz.
4.5 Tüm Kafka Consumer Grubu Silme
Okuma mekanizmasını tamamen sıfırlamak için bir consumer grubunu silmek isteyebilirsiniz. Bunun için delete
parametresini kullanabilirsiniz:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-first-application
Deletion of requested consumer groups ('my-first-application') was successful.
Alternatif olarak, yalnızca belirli bir topic için offsetleri silmek istiyorsanız (consumer grubunuz birden fazla topic'ten okurken faydalıdır), aşağıdaki komutu kullanabilirsiniz:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-first-application --topic first_topic
Request succeed for deleting offsets with topic first_topic group my-first-application
TOPIC PARTITION STATUS
first_topic 0 Successful
first_topic 1 Successful
first_topic 2 Successful