Kafka is open source application that deals with messaging services. It’s used for streaming data process. It’s built by Linkedin. It’s written by Java and Scala. It’s part of Hadoop ecosystem. It’s used widely with Hadoop applications. Kafka works under a cluster (set of machines).

Kafka Architecture

Kafka parts

1- Records: A set of records in key/value with timestamp
2- Topic: is Array or buffer to keep the records. The topic is any node in the cluster. The topic can divide into partitions and segments.
3- Log: Each topic has log. When we divide the topic to different partitions, each partition has log
4- Producer: this part is responsible of sending the records into topics. We could have more than one consumer.
5- Consumer: this part is responsible to receive the records from the topic
6- Brokers: is the node in the Kafka cluster that responsible for get the producer’s records and put these records in the topic and send the records to the consumers. Also It’s responsible for replication the topics.
7- Partitions: Each topic can divide into different partitions. Each partition has log. Each log saves on the node.
8- Clusters: is a set of machines/nodes that Kafka runs on them

Kafka message in the cluster(brokers)

  • The producer can send records round-robin. Each record should contain at least one message. This message could be in a different format like Text, Json, Avro object or XML
Records round-robin
  • The procedure can send the records depending on the (key, value). The producer could send the record (message). This message will send in a different format like Text, Json, Avro object or XML. Each partition will have the records according to the key.
Records (key, value)

Kafka and Zookeeper

Kafka uses Zookeeper to manage the Kafka cluster. The Zookeeper uses a leadership election for Broker Topic Partition Leaders. The topic partition leader is responsible for handing the read/writes activities. There is a duplication in partition. We can specify the number of deplication when we create the topic. In each deplicated partitions, there is a leader. The leader partition is happening through elections. Zookeeper is resposible for this task. The consumer tries to get the data from leader partition. If the partition is down, the consumer will get the data from another partition.

ZooKeeper facilities synchronization in the process by maintaining a status on ZooKeeper servers that store information in local log files.

When the consumer subscribes to a topic, Kafka provides the current offset of the topic to the consumer and the offset is saved in the ZooKeeper.

Kafka API’s

  • Producer API

Producer API gives the ability to send the messages to Kafka cluster and deals with streaming. The maven repository

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
  • Consumer API

Consumer API gives the ability to receive the messages from Kafka cluster. Also it deals with streaming. The maven repository

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

  • Stream API

Kafka streams allows you to build real time applications. Kafka steams API gives you the ability to build the real time applications. Kafka stream gives Highly scalable, elastic, distributed, and fault-tolerant application. Also has the ability to deal Stateful and stateless processing. Also Kafka use for aggregation applications. Read more about Kafka stream

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
  • Connector API

Kafka gives you the ability to connect to another databases through JDBC. Also you can connect to AVRO file format.

  • Admin API

Kafka has APIs to manage topics, to manage the brokers and Kafka configuration

Kafka multiple brockers

  • Copy server to server1 & server2:
    cp C:\bigdata\kafka\kafka_2.12-2.5.0\config\server.properties C:\bigdata\kafka\kafka_2.12-2.5.0\config\server1.properties
    cp C:\bigdata\kafka\kafka_2.12-2.5.0\config\server.properties C:\bigdata\kafka\kafka_2.12-2.5.0\config\server2.properties
  • Change configuration for server1 & server2

    server1.properties file:
    brocker.id = 1
    listeners=PLAINTEXT://:9093
    log.dirs=C:\bigdata\kafka\kafka_2.12-2.5.0\kafka-logs1

    server2.properties file:
    file:brocker.id = 2
    listeners=PLAINTEXT://:9094
    log.dirs=C:\bigdata\kafka\kafka_2.12-2.5.0\kafka-logs2

  • Start two Kafka servers (brocker1 & brocker2)

    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\kafka-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\server1.properties

    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\kafka-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\server2.properties

  • Create topic for two brockers

    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 2 –partitions 2 –topic topictest

Kafka Security

Kerberos security

Kerberos with keytab

create kafka-jaas.conf file

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="userid@HADOOP.CA"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/hadoop_ca.keytab"
 client=true;
};

Java code

import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer {

    public static void main(String[] args) {
        String topic = args[0];

        Properties props = new Properties();
        props.put("metadata.broker.list", "127.0.0.1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "all");
        props.put("security.protocol", "PLAINTEXTSASL");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (int i = 0; i < 5; i++){
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
    }
}

Call java with Kerberos config

java -Djava.security.auth.login.config=/tmp/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.kafka.Producer

SSL security

We add this configuration to consumer/producer properties

//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  "password");

//configure the following three settings for SSL Authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password");

Install Kafka on windows

Configuration:

  • Download Kafka from https://kafka.apache.org/downloads an uncopress it (Choose 2.6 or 2.7 version).
  • There is two importnat folder (config & bin)
  • The Kafka comes with default Zookeeper.
  • Go to conf folder. In conf folder, we have two important files (zookeeper.properties & server.properties)
  • In zookeeper.properties file, we have to specifiy (in case of run the Zookeeper locally)
    #the folder to keep the intermediate data for the Zookeeper
    dataDir=C:/bigdata/kafka/kafka_2.12-2.5.0/data
    #the port at which the clients will connect
    clientPort=2181
  • In server.properties file, we have to sepcify
    #zookeeper server and port
    zookeeper.connect=localhost:2181
    #change the log for Kafka
    log.dirs=C:/bigdata/kafka/kafka_2.12-2.5.0/kafka-logs

Run Zookeeper with Kafka:

  • Go to bin\windows
    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows
  • For any error or when restart, remove the files from log folders
  • You need to open one cmd windows session for Zookeeper, another one for Kafka start server and two for Kafka commands (one for producer and one for consumer)
    Start Zookeeper:
    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\zookeeper-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\zookeeper.properties
    Start Kafka:
    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\kafka-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\server.properties
  • In any cmd of consumer or producer, run create topic
    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topictest
    Explanation: Kafka will create topictest with one partition and 1 copy of duplication.
  • We can check the topic list in any consumer or producer cmd session
    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat –list –zookeeper localhost:2181
  • We can list the specification for any topic
    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics –zookeeper localhost:2181 –describe –topic topictest
  • We can delete the topic from Kafka
    C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat –zookeeper localhost:2181 –topic topictest –delete

Run producer/consumer on windows

C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-console-producer.bat –bootstrap-server localhost:9092 –topic topictest

C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topictest

Kafka Consumer/Producer examples

We have to build two classes. One class is for consumer and another class is for producer.

  • We can run the producer application with “kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic test”
  • We can run the consumer application with “kafka-console-producer.bat –broker-list localhost:9094 –topic test”

Read for more details

Kafka stream