Producer example

The producer is the application responsible for sending messages to the Kafka system.

The steps to create a producer

  • Create properties
  • Create a Producer from KafkaProducer
  • Create Record from ProducerRecord
  • Send the record as message

Types of send (synchronous or asynchronous)

Synchronous: Send a message and wait for the success response before continuing (each send pays the network round-trip delay).

Example:
kafkaProducer.send(new ProducerRecord(topicName, Integer.toString(i), "test message - " + i )).get();

Note: It’s better to wrap send(...).get() in a try/catch block, because get() can throw InterruptedException and ExecutionException.

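Asynchronous: Send messages without blocking; the response for each message arrives later through a callback. The number of unacknowledged requests that may be in flight on one connection is limited by max.in.flight.requests.per.connection. The default value is 5.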

Example:

producer.send(record, new myclass());
class myclass implement Callback {
   @Override
   Public void onCompletion(RecordMetadata recordMetaData,Exception e) {
   }
 }

Acknowledgment types

Producer Acks
1- acks=0 (NONE): The producer sends a message without waiting for any acknowledgment
2- acks=1 (LEADER): The producer sends a message and waits for an acknowledgment from the partition leader
3- acks=all or acks=-1: The producer sends a message and waits for an acknowledgment from all in-sync replicas

Maven source:

 <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
</dependency>
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Minimal Kafka producer example.
 *
 * <p>Sends 100 string messages (keys "0".."99") to the topic {@code test}
 * asynchronously and then closes the producer so buffered records are
 * delivered before the program exits.
 */
public class producer {

    // Not used by main; retained so existing references to this field keep compiling.
    static KafkaProducer<String, String> producer=null;

    /**
     * Entry point: configures the producer, sends the messages and closes it.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args){

        String topicName = "test";

        Properties producerConfig = new Properties();
        // bootstrap.servers must point at a Kafka broker (9092 by default);
        // 2181 is the ZooKeeper port and the producer cannot connect through it.
        producerConfig.put("bootstrap.servers", "localhost:9092");
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // send() only queues the record; closing the producer (done here by
        // try-with-resources) waits for the queued records to be sent.
        try (Producer<String, String> kafkaProducer = new KafkaProducer<>(producerConfig)) {
            for (int i = 0; i < 100; i++) {
                System.out.println(i);
                kafkaProducer.send(new ProducerRecord<>(topicName, Integer.toString(i), "test message - " + i));
            }
        }

        System.out.println("Producer complete");

    }

}

Consumer example

Maven source

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer example.
 *
 * <p>Subscribes to the topic {@code test} as part of the group
 * {@code testgroup}, prints every record it receives and commits the
 * offsets synchronously after each poll.
 */
public class consumer {

    /**
     * Entry point: configures the consumer and polls in an endless loop.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {

        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Key/value types match the StringDeserializer configured above.
        // try-with-resources closes the consumer if the loop ends with an exception.
        try (KafkaConsumer<String, String> consumerR = new KafkaConsumer<>(consumerConfig)) {
            ReceiveConsumerRebalanceListener rebalanceListener = new ReceiveConsumerRebalanceListener();
            consumerR.subscribe(Collections.singletonList("test"), rebalanceListener);

            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumerR.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }

                // Commit the offsets returned by the poll above before polling again.
                consumerR.commitSync();
            }
        }

    }

    /** Logs the partitions passed to each rebalance callback. */
    private static class  ReceiveConsumerRebalanceListener implements ConsumerRebalanceListener {
        /**
         * Prints the partitions being revoked.
         *
         * @param partitions the partitions passed to the callback
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsRevoked with partitions:" + partitions);
        }

        /**
         * Prints the partitions being assigned.
         *
         * @param partitions the partitions passed to the callback
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Called onPartitionsAssigned with partitions:" + partitions);
        }
    }

}