Kafka is an open-source messaging and data-streaming platform. It was originally built at LinkedIn and is written in Java and Scala. It is widely used alongside Hadoop-ecosystem applications. Kafka runs as a cluster (a set of machines).

Kafka Architecture

Kafka parts

1- Records: key/value pairs, each with a timestamp
2- Topic: a named feed or category that holds records. A topic can be divided into partitions, and each partition into segments.
3- Log: each topic has a log. When a topic is divided into partitions, each partition has its own log.
4- Producer: the part responsible for sending records to topics. A cluster can have more than one producer.
5- Consumer: the part responsible for receiving records from topics
6- Brokers: the nodes in the Kafka cluster responsible for receiving records from producers, storing them in topics, and serving them to consumers. Brokers are also responsible for replicating topic partitions.
7- Partitions: each topic can be divided into partitions. Each partition has its own log, stored on a node.
8- Clusters: the set of machines/nodes that Kafka runs on

Kafka and Zookeeper

Kafka uses ZooKeeper to manage the Kafka cluster, including leader election for topic partitions. The topic partition leader is responsible for handling the read/write activity for that partition.

Kafka APIs

  • Producer API

The Producer API gives applications the ability to send streams of messages to the Kafka cluster. The Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
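A minimal producer sketch using the kafka-clients API; the broker address and topic name ("test") here are assumptions for a local setup:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {

    // Client configuration; the broker address is an assumption for a local setup.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all"); // wait until all in-sync replicas acknowledge
        return props;
    }

    public static void main(String[] args) {
        // try-with-resources closes the producer and flushes pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps())) {
            for (int i = 0; i < 5; i++) {
                // Records with the same key always go to the same partition
                producer.send(new ProducerRecord<>("test", "key-" + i, "message " + i));
            }
        }
    }
}
```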
  • Consumer API

The Consumer API gives applications the ability to receive streams of messages from the Kafka cluster. The Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
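A matching consumer sketch; the broker address, group id, and topic name are assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {

    // Client configuration; broker address and group id are assumptions.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group"); // consumers in the same group share partitions
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest"); // start from the beginning if no offset exists
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps())) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) { // poll forever; stop with Ctrl-C
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```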

  • Stream API

The Kafka Streams API lets you build real-time stream-processing applications that are highly scalable, elastic, distributed, and fault tolerant. It supports both stateful and stateless processing, and it is commonly used for aggregation applications. The Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
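A minimal Streams sketch (stateless processing): it reads from one topic, upper-cases each value, and writes to another. The topic names and application id are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class UppercaseStream {

    // Build a topology that upper-cases every value from input-topic into output-topic.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(value -> value.toUpperCase())
               .to("output-topic");
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        streams.start();
        // Close the streams application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```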
  • Connector API

Through Kafka Connect, Kafka gives you the ability to connect to other systems such as databases (for example, via JDBC connectors). It can also work with the Avro file format.
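As a sketch, a standalone Kafka Connect worker takes a connector configuration like the one below. This assumes the Confluent JDBC source connector (a separate download, not bundled with Kafka) and a hypothetical MySQL database; every value here is an assumption to adapt:

```properties
# Hypothetical JDBC source connector config (connect-jdbc-source.properties)
name=jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/testdb
connection.user=user
connection.password=password
# Detect new rows via an auto-incrementing id column
mode=incrementing
incrementing.column.name=id
# Each table becomes a topic named jdbc-<table>
topic.prefix=jdbc-
```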

  • Admin API

Kafka also has an Admin API for managing topics, brokers, and other Kafka configuration.
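A small Admin API sketch that creates a topic and lists the topics in the cluster; the broker address, topic name, and partition/replication counts are assumptions:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdmin {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker

        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 3 partitions and replication factor 1
            NewTopic topic = new NewTopic("test", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();

            // Print all topic names known to the cluster
            System.out.println(admin.listTopics().names().get());
        }
    }
}
```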

Kafka Security

Kerberos security

Kerberos with keytab

Create a kafka-jaas.conf file:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="userid@HADOOP.CA"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/hadoop_ca.keytab"
 client=true;
};

Java code

import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    public static void main(String[] args) {
        String topic = args[0];

        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        // Kerberos (SASL) over plaintext; credentials come from the JAAS file
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>(topic, "Test Date: " + new Date()));
            }
        }
    }
}

Call Java with the Kerberos config:

java -Djava.security.auth.login.config=/tmp/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.kafka.Producer

SSL security

We add this configuration to the consumer/producer properties:

//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  "password");

//configure the following three settings for SSL Authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password");

Install Kafka on Windows

Kafka Consumer/Producer examples

We built two classes: one for the consumer and one for the producer.

  • We can test the producer application with the console consumer: "kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test"
  • We can test the consumer application with the console producer: "kafka-console-producer.bat --broker-list localhost:9092 --topic test"

Read more details:

Kafka Streams