Producer example
The producer is the application responsible for sending messages to the Kafka system.
The steps to create a producer
- Create properties
- Create a Producer using KafkaProducer
- Create Record from ProducerRecord
- Send the record as message
Ways to send messages (synchronous or asynchronous)
Synchronous: Send a message and wait for the response (success or failure) before sending the next one, so every send pays the network round-trip delay.
Example:
kafkaProducer.send(new ProducerRecord(topicName, Integer.toString(i), "test message - " + i )).get();
Note: It is better to wrap the send call in a try/catch block, because get() can throw InterruptedException and ExecutionException.
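Asynchronous: Send messages without waiting for each response; the result of every send is delivered later to a callback. The number of unacknowledged requests that may be in flight on one connection is limited by max.in.flight.requests.per.connection. The default value is 5.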
Example:
producer.send(record, new myclass());
class myclass implement Callback {
@Override
Public void onCompletion(RecordMetadata recordMetaData,Exception e) {
}
}
Acknowledgment types
Producer Acks
1- acks=0 (NONE): The producer sends a message without waiting for any acknowledgment
2- acks=1 (LEADER): The producer sends a message and waits for an acknowledgment from the partition leader only
3- acks=all (or acks=-1): The producer sends a message and waits for an acknowledgment from all in-sync replicas
Maven source:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal Kafka producer example: sends 100 string messages, keyed by their
 * index, to the "test" topic.
 */
public class producer {
    // Not used by this example; kept so existing references to the field still compile.
    static KafkaProducer<String, String> producer = null;

    /**
     * Configures a producer, sends the messages and closes the producer.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        String topicName = "test";

        Properties producerConfig = new Properties();
        // bootstrap.servers must point at a Kafka broker (9092, as in the consumer
        // example), not at ZooKeeper's port 2181.
        producerConfig.put("bootstrap.servers", "localhost:9092");
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // send() only buffers the record; closing the producer at the end of the
        // try block waits for the buffered records to be sent before main returns.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerConfig)) {
            for (int i = 0; i < 100; i++) {
                System.out.println(i);
                kafkaProducer.send(new ProducerRecord<>(topicName, Integer.toString(i), "test message - " + i));
            }
        }
        System.out.println("Producer complete");
    }
}
Consumer example
Maven source
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer example: subscribes to the "test" topic as part of
 * the "testgroup" consumer group and prints every record it receives.
 */
public class consumer {
    /**
     * Configures a consumer and polls the topic forever, committing offsets
     * after each batch has been printed.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Keys and values are Strings because both deserializers above are
        // StringDeserializer. The loop never ends normally, so the
        // try-with-resources closes the consumer only when an exception escapes.
        try (KafkaConsumer<String, String> consumerR = new KafkaConsumer<>(consumerConfig)) {
            ReceiveConsumerRebalanceListener rebalanceListener = new ReceiveConsumerRebalanceListener();
            consumerR.subscribe(Collections.singletonList("test"), rebalanceListener);
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumerR.poll(java.time.Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                // Commit only after the whole batch has been printed.
                consumerR.commitSync();
            }
        }
    }

    /** Logs partition assignment changes during consumer group rebalances. */
    private static class ReceiveConsumerRebalanceListener implements ConsumerRebalanceListener {
        /**
         * Prints the partitions passed to the revocation callback.
         *
         * @param partitions the partitions reported as revoked
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsRevoked with partitions:" + partitions);
        }

        /**
         * Prints the partitions passed to the assignment callback.
         *
         * @param partitions the partitions reported as assigned
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsAssigned with partitions:" + partitions);
        }
    }
}