Apache Kafka Kurulumu ve Temel API'ye Giriş
4 min read

Apache Kafka Kurulumu ve Temel API'ye Giriş

Apache Kafka Kurulumu ve Temel API'ye Giriş

Apache Kafka, binlerce şirket tarafından yüksek performanslı data pipelines, streaming analytics, data integration, and mission-critical applicationlar için kullanılan açık kaynaklı bir distributed event streaming platformudur.


Apache Kafka Windows Kurulumu

Aşağıdaki adresten dönemin güncel api'lerinin indirme bağlantılarına ulaşabiliriz. Yazının yazıldığı tarih için 3.0 sürümü mevcut olmasına rağmen Windows'ta Zookeeper log dosyalarını yaratırken hata ile karşılaşıyoruz. Bunu bypass edebilmek için 2.8.1 sürümü ile devam ediyoruz.

Apache Kafka
Apache Kafka: A Distributed Streaming Platform.

Java sürümümüzü ve ortam değişkenlerimizi dönemin minimum gereksinimlerine göre ayarlamalıyız.


Kafka Server ve Zookeeper Ayarları

Öncelikle Windows'ta çalışacağımız için binary'de gelen windows dizini altındaki bat dosyalarını kullanıyoruz. Parent'taki linux için olan sh'ları değil. C altına kafka klasörümüzü oluşturarak örnek çalışmaya aşağıda devam ediyoruz.

C:\kafka\bin\windows

C:\kafka\config altında server.properties ve zookeeper.properties mevcut. Bunların içerisinde kullanılacak port bilgisinden log dosyalarını çıkaracağımız dizine kadar bunları kendi isteklerimiz doğrultusunda ayarlayabiliriz.


Komutlar ile Basit Mesaj Trafiği

Aşağıdaki linkten Kafka'nın kendi dökümanını geliştirme sürecinde referans alabiliriz.

Apache Kafka
Apache Kafka: A Distributed Streaming Platform.

Aşağıda gösterilecek işlemlerdeki komutlar dosyalara yazılıp batch olarak kullanılabilir, bu daha sağlıklı bir yaklaşım olacaktır. server-starter.bat zookeper-starter.bat gibi.

Hatalar ile karşılaşma durumunda yönetici olarak çalıştırma ve oluşturulmuş log dosyalarını temizleyip tekrar denemek işe yarabilir.

Zookeeper'ı başlatmak için :

C:/kafka/bin/windows/zookeeper-server-start.bat C:/kafka/config/zookeeper.properties

Kafka-Server başlatmak için :

C:/kafka/bin/windows/kafka-server-start.bat C:/kafka/config/server.properties

Topic yaratmak için :

C:/kafka/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-test

--list ile oluşturduğunuz topicleri görebilirsiniz.

Producer başlatmak için :

C:/kafka/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic topic-test

Consumer başlatmak için :

C:/kafka/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 -topic topic-test --from-beginning
--partitions 5 --replication-factor 1

Topic'imizi yaratırken yukarıdaki gibi parametrelerle yapımızı değiştirebiliriz.


Java Demo

maven dependencies

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>2.0.0-alpha0</version>
            <scope>runtime</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.0-alpha0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.0-alpha0</version>
            <scope>test</scope>
        </dependency>

Logger bağımlılıkları hata attığı için eklendi fakat hata mesajların gönderilmesine engel değil.

Sender.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Sender {

    public static void main(String[] args) {
        Properties config = new Properties();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "testid-1");
        Producer<String, String> producer = new KafkaProducer<String, String>(config, new StringSerializer(), new StringSerializer());

        producer.initTransactions();

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic-test", "A", "Message"));
            producer.commitTransaction();
        } catch (Exception e){
            producer.abortTransaction();
        }
        producer.close();
    }
}

Sender'ı çalıştırdığımızda yukarıda komutla oluşturduğumuz consumer'a mesajın geldiğini görebiliriz. Ama aslında consumer tarafında da yapılması gereken bazı ayarlar var (auto-commit, isolation-level ...) transaction mekanızmasının çoklu mesajlarda vs. tam anlamıyla çalışabilmesi için. Detayları konu dışı olacağı için atlıyorum.

Consumer tarafında ise commit'lerin auto ya da manuel offset'leri kontrol ederek yapabileceğimizi, aynı zamanda Kafka'nın thread-safe olmadığını ve bunun yönetimini kendimiz yapmamız gerektiğini belirtmekte fayda var.