MQTT Protocol in Java

MQTT (Message Queuing Telemetry Transport) is a messaging transport protocol that works on Client-Server architecture with publish/subscribe messaging pattern. It is lightweight, open, simple, easy to implement and also has encryption techniques for security. The protocol mostly runs on the TCP/IP protocol architecture. These highlights make it perfect for use in many situations, like communication in Machine-to-Machine (M2M) and Internet of Things (IoT) settings where a small code impression or footprint is required with high and prime network bandwidth.

To include the Paho library in Maven project, we add the ‘paho.client.mqtt‘ dependency on our project. The Paho Java Client is an MQTT customer library in Java for creating applications that are compatible on Java platforms or can operate on JVM. MQTT messaging in Java is done by utilizing the libraries given by Eclipse Paho.

To implement the MQTT Protocol, we use Mosquitto server. Eclipse Mosquitto project is an open-source message broker that implements the MQTT protocol. Mosquitto being so lightweight and platform-compatible is suitable to use on all devices from small powered single board computers to full stacked strong and robust servers.

Downloading and Installing the MQTT Protocol via Mosquitto

You will need to install the MQTT protocol in your machine/system in order for this code to work and run successfully. You can download the Eclipse Mosquitto project for implementing the MQTT protocol from the download link here. You can download Mosquitto for Windows, macOS and even Linux distributions as it is platform-friendly. The download setups available will install the MQTT protocol in your machine using the Mosquitto server. The Mosquitto server will connect to your TCP localhost at port number 1883.

After connecting the Mosquitto server with the localhost, we see the basic implementation of MQTT Protocol using Mosquitto in Java through the following code. In this code, the program firstly checks whether it is connected to the broker and then publishes the message after the connection is successfully established.

package sub;
//Name of the package in Eclipse

import java.io.File;
import java.io.IOException;
import java.util.Scanner;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQTTPubPrint{

  public static void main(String[] args) 
  {
    Scanner sc = null;
    try {
      sc = new Scanner(new File("textfile.txt"));
      // Location of the text file is placed here
      // Check if there is another line of input
      while(sc.hasNextLine()){
        String str = sc.nextLine();
        // parse each line using delimiter
        parseData(str);
      }
    } catch (IOException  exp) {
      // TODO Auto-generated catch block
      exp.printStackTrace();
    }finally{
      if(sc != null)
        sc.close();
    }              
  }

  private static void parseData(String str){    
    Scanner lineScanner = new Scanner(str);
    lineScanner.useDelimiter(",");
    while(lineScanner.hasNext()) {
      String topic        = "Pressure";
      String content1     = lineScanner.next() ;
      // String content1  = args[0]+"Pascal";
      int qos             =  1;
      String broker       = "tcp://localhost:1883";
      String PubId        = "127.0.0.1";
      MemoryPersistence persistence = new MemoryPersistence();
      // long startTime = System.nanoTime();
      try {

        MqttClient sampleClient = new MqttClient(broker, PubId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setConnectionTimeout(60);
        connOpts.setKeepAliveInterval(60);
        connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
        System.out.println("Connecting to broker: "+ broker);
        sampleClient.connect(connOpts);
        System.out.println("Connected");
        System.out.println("Publishing message: "+ content1);
        MqttMessage message = new MqttMessage(content1.getBytes());
        message.setQos(qos);
        sampleClient.publish(topic,message);
        System.out.println("Message published");

      } catch(MqttException me) {
        System.out.println("Reason :"+ me.getReasonCode());
        System.out.println("Message :"+ me.getMessage());
        System.out.println("Local :"+ me.getLocalizedMessage());
        System.out.println("Cause :"+ me.getCause());
        System.out.println("Exception :"+ me);
        me.printStackTrace();
      }

    }
    lineScanner.close();
  }


}


Leave a Reply

Your email address will not be published. Required fields are marked *