SpringBoot and RabbitMQ Messaging – AMQP

Today, we will quickly go through a Spring Boot messaging implementation using RabbitMQ – AMQP.

AMQP stands for Advanced Message Queuing Protocol. It is an open-standard, wire-level specification for asynchronous, message-based communication. It describes how a message should be constructed.

The widely used AMQP brokers are RabbitMQ, StormMQ and OpenAMQ. A broker's role is to receive messages from publishers and route them to consumers. For the differences between JMS and AMQP, see https://www.linkedin.com/pulse/jms-vs-amqp-eran-shaham

AMQP Entities:

  1. Exchanges
  2. Bindings
  3. Queues

Exchanges are the entities to which messages are sent; they route these messages to one or more queues. The routing algorithm depends on the exchange type and on rules called bindings. The four exchange types are:

  1. Direct Exchange – the default one (in this blog, I used this)
  2. Fanout Exchange
  3. Topic Exchange
  4. Headers Exchange

Let's start by creating the AMQP clients, i.e. the producer and the consumer. In Spring Boot, select RabbitMQ as the starter pack. In this example, we will see how to publish a JSON message as a string and how to read (consume) the same message.

A. project pom.xml looks like below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<!-- Spring Boot parent POM. The Spring Boot and Jackson dependencies below omit <version>,
	     presumably relying on this parent for version management. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<!-- Coordinates of this demo project -->
	<groupId>com.vkey</groupId>
	<artifactId>AMQP_Client_New</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>AMQP_Client_New</name>
	<description>Demo project for Spring Boot - AMQP</description>

	<properties>
		<!-- Java language level used for the build -->
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<!-- Spring Boot starter for AMQP; supplies the RabbitTemplate and listener classes used by the Java code -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<!-- Test support, limited to the test scope -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- Configuration-properties annotation processor; marked optional so it is not passed on to dependants -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<!-- Jackson data binding; the RabbitMQ configuration class references Jackson2JsonMessageConverter -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
		</dependency>
		<!-- Gson is used by the publisher to build the JSON payload; its version is pinned explicitly here -->
		<dependency>
		    <groupId>com.google.code.gson</groupId>
		    <artifactId>gson</artifactId>
		    <version>2.8.5</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Spring Boot Maven plugin -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Note: the Gson dependency is included because we are going to publish a JSON message; Gson is used to build the message in JSON format.

B. application.properties file – define AMQP connection details as below

 # RabbitMQ connection settings
spring.rabbitmq.host=10.225.91.6
# 5672 is the default AMQP port. This comment must stay on its own line:
# .properties files have no trailing-comment syntax, so text placed after
# the value (including a '#') would become part of the value itself.
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=hal.aero

Spring Boot's AMQP auto-configuration automatically creates the connectionFactory from these properties. We will see it being used in the next sections.

C. Spring Boot main application class – this class also acts as the publisher. In its @PostConstruct method it builds and publishes the message.

import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * Spring Boot entry point that doubles as the demo publisher: after the bean has been
 * constructed and injected, it serialises one sample {@code CampaignStatus} to JSON and
 * sends it through the injected {@link RabbitTemplate}.
 */
@SpringBootApplication
public class AmqpClientNewApplication {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(AmqpClientNewApplication.class, args);
    }

    /**
     * Builds a one-element list holding a sample campaign status, prints its JSON form and
     * publishes that JSON string, passing {@code RobbitMqConfig.publishQName} as the first
     * argument of {@code convertAndSend}.
     */
    @PostConstruct
    public void AmqpClientNewApplication_1() {
        System.out.println(">>>>** Running AMQP Client **<<<<< ");

        List<CampaignStatus> statuses = new ArrayList<>();
        statuses.add(sampleCampaignStatus());

        // serializeNulls() makes Gson write unset fields as explicit JSON nulls.
        Gson serializer = new GsonBuilder().serializeNulls().create();
        String payload = serializer.toJson(statuses);
        System.out.println(">>>> " + payload);

        rabbitTemplate.convertAndSend(RobbitMqConfig.publishQName, payload);
        System.out.println("Is listener returned :" + rabbitTemplate.isReturnListener());
    }

    /** Creates the hard-coded sample record that the demo publishes. */
    private static CampaignStatus sampleCampaignStatus() {
        CampaignStatus sample = new CampaignStatus();
        sample.setCampaignOrRepairID("0123456789-AP");
        sample.setDateOfChange("20192019");
        sample.setVin("1AYP48S81C605935V");
        sample.setExecutor("SAHYADRI");
        sample.setRetailerID("Deccan PUN");
        return sample;
    }
}

D. To read the application.properties file

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

/**
 * Holds the connection settings bound from the {@code spring.rabbitmq.*} entries of
 * {@code application.properties} (host, port, username and password).
 *
 * <p>The class is registered as a plain {@code @Component}; it declares no {@code @Bean}
 * methods, so it does not also need to be a {@code @Configuration} class.
 */
@Component
@ConfigurationProperties(prefix="spring.rabbitmq")
@Validated
public class ConfigInitializer {

	// Kept private: values are read and written only through the accessors below.
	private String host;
	// The port is bound as text, exactly as written in the properties file.
	private String port;
	private String username;
	private String password;

	/** @return the value bound from {@code spring.rabbitmq.host} */
	public String getHost() {
		return host;
	}

	/** @param host the broker host name or address */
	public void setHost(String host) {
		this.host = host;
	}

	/** @return the value bound from {@code spring.rabbitmq.port}, as a string */
	public String getPort() {
		return port;
	}

	/** @param port the broker port, as a string */
	public void setPort(String port) {
		this.port = port;
	}

	/** @return the value bound from {@code spring.rabbitmq.username} */
	public String getUsername() {
		return username;
	}

	/** @param username the user name used to connect to the broker */
	public void setUsername(String username) {
		this.username = username;
	}

	/** @return the value bound from {@code spring.rabbitmq.password} */
	public String getPassword() {
		return password;
	}

	/** @param password the password used to connect to the broker */
	public void setPassword(String password) {
		this.password = password;
	}
}

E. RabbitMQ configuration file, to configure Beans, RabbitTemplate and container

import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ wiring for the demo: the queue-name constants plus factory methods for a JSON
 * message converter, a {@link RabbitTemplate} and a listener container that hands messages
 * to {@code Receiver}.
 *
 * <p>NOTE(review): {@code @Configuration} and {@code @EnableAutoConfiguration} are not applied
 * to this class (they are commented out in the original). Without them the {@code @Bean}
 * methods and the {@code @Autowired} field only take effect if the class is registered with
 * Spring in some other way, so as written it mainly serves as the holder of the queue-name
 * constants. Confirm the intended usage before re-enabling the annotations.
 */
public class RobbitMqConfig {

    /** Queue name the publisher sends to. */
    public static final String publishQName = "prices.campaignstatus.sp";
    /** Queue name the listener container consumes from (currently the same queue). */
    public static final String consumeQName = "prices.campaignstatus.sp";

    @Autowired
    Receiver receiver;

    /** @return a new Jackson-based JSON message converter */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Creates a template bound to the given connection factory that uses the {@code x.prices}
     * exchange and the JSON converter.
     *
     * <p>NOTE(review): {@code setQueue} appears to set the template's default queue for
     * receive operations rather than declaring a queue on the broker; verify against the
     * Spring AMQP version in use.
     *
     * @param connectionFactory the connection factory the template should use
     * @return the configured template
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate amqpTemplate = new RabbitTemplate();
        amqpTemplate.setConnectionFactory(connectionFactory);
        amqpTemplate.setMessageConverter(jsonMessageConverter());
        amqpTemplate.setExchange("x.prices");
        amqpTemplate.setQueue(publishQName);
        return amqpTemplate;
    }

    /**
     * Creates a listener container on {@link #consumeQName} whose listener is the adapter
     * returned by {@link #listenerAdapter(Receiver)} for the injected receiver.
     *
     * @param connectionFactory the connection factory the container should use
     * @return the configured container
     */
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueueNames(consumeQName);
        listenerContainer.setMessageConverter(jsonMessageConverter());
        listenerContainer.setMessageListener(listenerAdapter(this.receiver));
        return listenerContainer;
    }

    /*
     * Disabled alternative: a plain MessageListener bean.
     *
     * @Bean
     * @ConditionalOnMissingBean
     * MessageListener consumer() {
     *     return new Consumer();
     * }
     */

    /**
     * Wraps the receiver so that incoming messages are passed to its {@code receiveMessage}
     * method.
     *
     * @param receiver the object that handles the messages
     * @return an adapter delegating to {@code receiver.receiveMessage}
     */
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

F. Receiver i.e. AMQP Listener class as below

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * AMQP listener for the demo: receives the messages published to the
 * {@code prices.campaignstatus.sp} queue and logs them.
 */
@Component
public class Receiver {
  private static final Logger logger = LoggerFactory.getLogger(Receiver.class);

  /**
   * Handles one message from the {@code prices.campaignstatus.sp} queue.
   *
   * @param cs the message payload as text
   */
  @RabbitListener(queues = "prices.campaignstatus.sp")
  public void receiveMessage(String cs) {
     // Log through the class logger instead of System.out. The parameterised form
     // also copes with a null payload, where the former cs.toString() would throw.
     logger.info("Received <<<<<  {}>>>>>", cs);
  }
}

Note: Don’t forget to change the parameter in the method definition, i.e. receiveMessage(String cs), to the type of message you expect, e.g. String or byte[].

That’s all, folks – your AMQP producer and receiver clients are ready.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s