Apache Kafka Kurulumu ve Temel API'ye Giriş
Apache Kafka, binlerce şirket tarafından yüksek performanslı data pipelines, streaming analytics, data integration, and mission-critical applicationlar için kullanılan açık kaynaklı bir distributed event streaming platformudur.
Apache Kafka Windows Kurulumu
Aşağıdaki adresten dönemin güncel api'lerinin indirme bağlantılarına ulaşabiliriz. Yazının yazıldığı tarih için 3.0 sürümü mevcut olmasına rağmen Windows'ta Zookeeper log dosyalarını yaratırken hata ile karşılaşıyoruz. Bunu bypass edebilmek için 2.8.1 sürümü ile devam ediyoruz.
Java sürümümüzü ve ortam değişkenlerimizi dönemin minimum gereksinimlerine göre ayarlamalıyız.
Kafka Server ve Zookeeper Ayarları
Öncelikle Windows'ta çalışacağımız için binary'de gelen windows dizini altındaki bat dosyalarını kullanıyoruz. Parent'taki linux için olan sh'ları değil. C altına kafka klasörümüzü oluşturarak örnek çalışmaya aşağıda devam ediyoruz.
C:\kafka\bin\windows
C:\kafka\config altında server.properties ve zookeeper.properties mevcut. Bunların içerisinde kullanılacak port bilgisinden log dosyalarını çıkaracağımız dizine kadar bunları kendi isteklerimiz doğrultusunda ayarlayabiliriz.
Komutlar ile Basit Mesaj Trafiği
Aşağıdaki linkten Kafka'nın kendi dökümanını geliştirme sürecinde referans alabiliriz.
Aşağıda gösterilecek işlemlerdeki komutlar dosyalara yazılıp batch olarak kullanılabilir, bu daha sağlıklı bir yaklaşım olacaktır. server-starter.bat zookeper-starter.bat gibi.
Hatalar ile karşılaşma durumunda yönetici olarak çalıştırma ve oluşturulmuş log dosyalarını temizleyip tekrar denemek işe yarabilir.
Zookeeper'ı başlatmak için :
C:/kafka/bin/windows/zookeeper-server-start.bat C:/kafka/config/zookeeper.properties
Kafka-Server başlatmak için :
C:/kafka/bin/windows/kafka-server-start.bat C:/kafka/config/server.properties
Topic yaratmak için :
C:/kafka/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-test
--list ile oluşturduğunuz topicleri görebilirsiniz.
Producer başlatmak için :
C:/kafka/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic topic-test
Consumer başlatmak için :
C:/kafka/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 -topic topic-test --from-beginning
--partitions 5 --replication-factor 1
Topic'imizi yaratırken yukarıdaki gibi parametrelerle yapımızı değiştirebiliriz.
Java Demo
maven dependencies
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.8.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>2.0.0-alpha0</version>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.0-alpha0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.0-alpha0</version>
<scope>test</scope>
</dependency>
Logger bağımlılıkları hata attığı için eklendi fakat hata mesajların gönderilmesine engel değil.
Sender.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Sender {
public static void main(String[] args) {
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "testid-1");
Producer<String, String> producer = new KafkaProducer<String, String>(config, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic-test", "A", "Message"));
producer.commitTransaction();
} catch (Exception e){
producer.abortTransaction();
}
producer.close();
}
}
Sender'ı çalıştırdığımızda yukarıda komutla oluşturduğumuz consumer'a mesajın geldiğini görebiliriz. Ama aslında consumer tarafında da yapılması gereken bazı ayarlar var (auto-commit, isolation-level ...) transaction mekanızmasının çoklu mesajlarda vs. tam anlamıyla çalışabilmesi için. Detayları konu dışı olacağı için atlıyorum.
Consumer tarafında ise commit'lerin auto ya da manuel offset'leri kontrol ederek yapabileceğimizi, aynı zamanda Kafka'nın thread-safe olmadığını ve bunun yönetimini kendimiz yapmamız gerektiğini belirtmekte fayda var.