Creating a User Defined Kafka Producer Application in Java

Apache Kafka is a distributed, replicated commit log service commonly used for publish/subscribe type messaging, website activity handling, metric & log aggregation and stream processing. So here we are going to learn how to create a user defined Kafka Producer Application in Java.

Create a User Defined Kafka Producer Application in Java

It relinquishes the traditional server-client model in a manner that it uses the new Big Data computation effectively. It was developed by Neha Narkhede at Linkedin as the way to handle the heavy website activities and as a commit log service and uses Zookeeper in establishing a cluster(called broker) between the producers and the consumers. Some important keywords are explained below :
Topics: Feeds of messages are organized into topics.
Brokers: Kafka runs on a cluster of servers.
Producers: Processes the published messages to a Kafka topic.
Consumers: Processes that subscribe to topic and processes the feed of the published message.

Steps to be followed :

  1. Start the zookeeper service. Default zookeeper script is given in the bin folder.Run the following in the console
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Start the Kafka broker service by running the following in a different console
    bin/kafka-server-start.sh config/server.properties
  3. Create a topic named anand by running in the console. This will create a topic named anand, connect it with the zookeeper running at port 2181 with replication factor set to 1 meaning that the topic shall have no other replicas on different clusters and is also set to have a single partition.
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic anand
  4. Creating our own custom Producer application in Java using Kafka APIs and then executing it.
  5. Running the default provided consumer script by running the following in the console :
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic anand

Step 4 in detail :

Create a Java project in and add the following jars :

jopt-simple-3.2.jar, kafka_2.11-0.9.0.0.jar, kafka-clients-0.9.0.0.jar, log4j-1.2.17.jar, metrics-core-2.2.0.jar, scala-library-2.11.7.jar, slf4j-api-1.7.6.jar, slf4j-log4j12-1.7.6.jar, snappy-java-1.1.1.7.jar, zkclient-0.7.jar, zookeeper-3.4.6.jar as the referenced jars.

Java program to create a User Defined Kafka Producer Application

Create a class named MyProducer and write the code in Java as :

import java.io.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;

public class MyProducer{
  @SuppressWarnings("resource")
  public static void main(String[] args)throws InterruptedException,ExecutionException, IOException{
    Properties props = new Properties();
    final String topic = "anand";
    InputStreamReader imp = new InputStreamReader(System.in);
    BufferedReader br = new BufferedReader(imp);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost.localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
    System.out.println("Enter your messages from here now");
    int i =1;
    while(true) {
      String msg = br.readLine();
      String key = "key"+i;
      if(msg.equals(""))
        break;
      ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,key,msg);
      producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata,Exception exception) {
          if(exception == null) {
            System.out.println("Message delivered successfully : "+ msg);
          }
          else {
            System.out.println("Message not delivered : "+ exception.getMessage());
          }
        }
      });
      //System.out.println("Message sent : "+msg);
      System.out.println("Message sent : "+msg);
    }
    System.out.println("Re-run your code to again become producer client !!!");
  }
}

Explanation of the code :

  • The line import org.apache.kafka.clients.producer.KafkaProducer is used in order to create an instance of the KafkaProducer inside your application.The second line in this main function snippet creates a String called “topic” and assigns a topic name to the variable. We set topic to equal “anand”, since that is the name of the topic we would like to write messages to in this application.
  • The first line of main creates a new Properties object. Lines 5-7 create properties that are put into the prop object. Line 5 sets up the bootstrap server config – this is the connection information to our Kafka broker. Make sure that you update this to use the full hostname and port that YOUR broker is running on. As an example, “iopbeta42.localdomain:6667”. Lines 6 and 7 set the value and key serializer properties. The serializers are both setup for String here.
  • In Line 8 we actually create the KafkaProducer. The props object that contains all the properties you just set is passed in and used in the creation of the producer object. <String, String> specifies that we will be handling keys and values both in String format in the KafkaProducer object.
  • We then take in input in the msg variable and then create the ProducerRecord which is created with the topic we’d like to write to, the key, and the message that we just created a few lines earlier in the code.
  • Then we invoke the send method on the producer and pass in the new ProducerRecord object (named “record”) that we just created. We also create the callback – the on completion method will be called once the record that was sent is acknowledged.

Finally, run the application and see the working yourself !!!

Also, learn:

Leave a Reply

Your email address will not be published. Required fields are marked *