Kafka is an open-source messaging system used for streaming data processing. It was originally built at LinkedIn and is written in Java and Scala. It is part of the Hadoop ecosystem and is widely used with Hadoop applications. Kafka runs on a cluster (a set of machines).
Kafka Architecture
Kafka parts
1- Records: key/value pairs, each with a timestamp
2- Topic: a named feed (a buffer) that holds the records. Topics are spread across the nodes of the cluster, and a topic can be divided into partitions and segments.
3- Log: each topic has a log. When we divide a topic into partitions, each partition has its own log
4- Producer: the part responsible for sending records to topics. We can have more than one producer.
5- Consumer: the part responsible for receiving records from topics. We can have more than one consumer.
6- Brokers: the nodes in the Kafka cluster. A broker receives the producers' records, stores them in topics, and serves the records to consumers. Brokers are also responsible for replicating the topics.
7- Partitions: each topic can be divided into partitions. Each partition has its own log, and each log is saved on a node.
8- Clusters: the set of machines/nodes that Kafka runs on
Kafka messages in the cluster (brokers)
- The producer can send records round-robin (when no key is provided). Each record carries a message, which can be in different formats such as text, JSON, an Avro object, or XML.
- The producer can also send records by (key, value). Again the message can be text, JSON, an Avro object, or XML. Each partition receives its records according to the key, so records with the same key land in the same partition (see the sketch below).
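A minimal sketch of both sending styles, assuming the kafka-clients Producer API (covered below), a local broker on localhost:9092, and a topic named topictest; the class name and message contents are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordRoutingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key: the partitioner spreads records across partitions
            // (round-robin in older clients, sticky batching in newer ones).
            producer.send(new ProducerRecord<>("topictest", "a keyless text message"));
            // With a key: records sharing a key always go to the same partition.
            producer.send(new ProducerRecord<>("topictest", "user-42", "{\"event\":\"login\"}"));
        }
    }
}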
Kafka and Zookeeper
Kafka uses ZooKeeper to manage the Kafka cluster. ZooKeeper runs leader elections for the topic partition leaders on the brokers. The topic partition leader is responsible for handling the read/write activity. Partitions are replicated; we specify the replication factor when we create the topic. Among the replicas of each partition there is one leader, chosen through an election that ZooKeeper is responsible for. The consumer reads data from the leader partition; if that replica goes down, another replica is elected leader and the consumer reads from it.
ZooKeeper facilitates synchronization in this process by maintaining status on the ZooKeeper servers, which store the information in local log files.
When a consumer subscribes to a topic, Kafka provides the current offset of the topic to the consumer, and the offset is saved in ZooKeeper.
Kafka APIs
- Producer API
The Producer API gives you the ability to send messages to the Kafka cluster and to work with streaming data. The Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
- Consumer API
The Consumer API gives you the ability to receive messages from the Kafka cluster, and it also works with streaming data. The Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
- Stream API
The Kafka Streams API gives you the ability to build real-time applications that are highly scalable, elastic, distributed, and fault-tolerant. It handles both stateful and stateless processing, and it is also used for aggregation applications. The Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
- Connector API
Kafka Connect gives you the ability to connect Kafka to other systems, for example to databases through JDBC connectors, and it can work with the Avro file format. A sketch of a connector configuration follows.
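As a sketch only: a source connector is defined by a properties file like the one below. It assumes Confluent's JDBC source connector has been installed on the Connect plugin path; the connector name, connection URL, and column name are illustrative placeholders.

name=jdbc-source-example
# Assumed: the JDBC source connector class from the separately installed Confluent plugin
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://localhost:5432/exampledb
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbc-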
- Admin API
Kafka has Admin APIs to manage topics, brokers, and Kafka configuration, as in the sketch below.
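A minimal AdminClient sketch using the kafka-clients library; the broker address and topic name are assumptions:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 1 partition and a replication factor of 1.
            admin.createTopics(Collections.singletonList(new NewTopic("topictest", 1, (short) 1)))
                 .all().get();
            // List all topic names in the cluster.
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}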
Kafka with multiple brokers
- Copy server.properties to server1.properties & server2.properties:
cp C:\bigdata\kafka\kafka_2.12-2.5.0\config\server.properties C:\bigdata\kafka\kafka_2.12-2.5.0\config\server1.properties
cp C:\bigdata\kafka\kafka_2.12-2.5.0\config\server.properties C:\bigdata\kafka\kafka_2.12-2.5.0\config\server2.properties
- Change the configuration for server1 & server2
server1.properties file:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=C:\bigdata\kafka\kafka_2.12-2.5.0\kafka-logs1
server2.properties file:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=C:\bigdata\kafka\kafka_2.12-2.5.0\kafka-logs2
- Start the two Kafka servers (broker1 & broker2)
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\server1.properties
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\server2.properties
- Create a topic on the two brokers
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic topictest
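To verify how the partitions and replicas were assigned across the two brokers, we can describe the topic; the output lists the leader, replicas, and in-sync replicas for each partition:

C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic topictest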
Kafka Security
Kerberos security
Kerberos with keytab
Create a kafka-jaas.conf file:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    useTicketCache=true
    principal="userid@HADOOP.CA"
    useKeyTab=true
    serviceName="kafka"
    keyTab="/etc/security/hadoop_ca.keytab"
    client=true;
};
Java code
import java.util.Date;
import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer {
    public static void main(String[] args) {
        String topic = args[0];
        Properties props = new Properties();
        props.put("metadata.broker.list", "127.0.0.1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1"); // -1 = wait for all in-sync replicas
        props.put("security.protocol", "PLAINTEXTSASL");
        ProducerConfig config = new ProducerConfig(props);
        // Fully qualified to avoid a name clash with this class, which is also called Producer
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(config);
        // Send five timestamped test messages to the topic.
        for (int i = 0; i < 5; i++) {
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
        producer.close();
    }
}
Call java with Kerberos config
java -Djava.security.auth.login.config=/tmp/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.kafka.Producer
SSL security
Add this configuration to the consumer/producer properties:
// Configure the following three settings for SSL encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");

// Configure the following three settings for SSL authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password");
Install Kafka on Windows
Configuration:
- Download Kafka from https://kafka.apache.org/downloads and uncompress it (choose the 2.6 or 2.7 version).
- There are two important folders (config & bin).
- Kafka comes with a default ZooKeeper.
- Go to the config folder. It contains two important files (zookeeper.properties & server.properties).
- In the zookeeper.properties file, we have to specify (in case we run ZooKeeper locally):
#the folder to keep the intermediate data for the Zookeeper
dataDir=C:/bigdata/kafka/kafka_2.12-2.5.0/data
#the port at which the clients will connect
clientPort=2181
- In the server.properties file, we have to specify:
#zookeeper server and port
zookeeper.connect=localhost:2181
#change the log for Kafka
log.dirs=C:/bigdata/kafka/kafka_2.12-2.5.0/kafka-logs
Run Zookeeper with Kafka:
- Go to bin\windows
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows
- On any error, or when restarting, remove the files from the log folders
- You need to open one cmd session for ZooKeeper, another for the Kafka server, and two for Kafka commands (one for the producer and one for the consumer)
Start Zookeeper:
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\zookeeper-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\zookeeper.properties
Start Kafka:
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-server-start.bat C:\bigdata\kafka\kafka_2.12-2.5.0\config\server.properties
- In the producer or consumer cmd session, create the topic:
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topictest
Explanation: Kafka will create topictest with one partition and a replication factor of 1.
- We can check the topic list in any consumer or producer cmd session:
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
- We can list the specification of any topic:
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic topictest
- We can delete the topic from Kafka:
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --topic topictest --delete
Run the producer/consumer on Windows
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic topictest
C:\bigdata\kafka\kafka_2.12-2.5.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topictest
Kafka Consumer/Producer examples
We have to build two classes: one for the consumer and another for the producer (see the producer and consumer sketches in the Kafka APIs section above).
- We can test the producer application by running "kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test" to watch the messages it produces
- We can test the consumer application by running "kafka-console-producer.bat --broker-list localhost:9094 --topic test" and typing messages for it to receive