Today, will quickly go through Spring Boot Messaging implementation using RabbitMQ – AMQP.
AMQP stands for Advanced Messaging Queuing Protocol. It is an open standard wire specification for asynchronous messaging based communication. It provides a description on how a message should be constructed.
The widely used AMQP brokers are RabbitMQ, StormMQ and OpenAMQ. Brokers role is to receive message from publishers and route them back to consumers. For difference between JMS v/s AMQP see https://www.linkedin.com/pulse/jms-vs-amqp-eran-shaham
AMQP Entities:
- Exchanges
- bindings
- Queues
Exchanges are entities where messages are sent and route these messages to one or more queues. The routing algorithm depends on exchange type and rule called bindings. The four exchange types are
- Direct Exchange – the default one (in this blog, I used this)
- Fanout Exchange
- Topic Exchange
- Headers Exchange
Lets start with Creating a AMQP clients i.e. Producer and Consumers. In spring boot select RabbitMQ as starter pack. In this example, will see how to publish a JSON message as a string and how to read (consume) same message.
A. project pom.xml looks like below:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.vkey</groupId>
<artifactId>AMQP_Client_New</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>AMQP_Client_New</name>
<description>Demo project for Spring Boot - AMQP</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Note: the GSON dependency included because we supposed to publish JSON message. To build message in JSON format, gson dependency introduced.
B. application.properties file – define AMQP connection details as below
#Rabbitmq configuration
spring.rabbitmq.host=10.225.91.6
spring.rabbitmq.port=5672 #default port for AMQP
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=hal.aero
Spring-AMQP automatically creates connectionFactory from property file. Will see in next section.
C. SpringBoot main application class – this class also acts as publisher, In PostConstruct it build and publish the message.
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@SpringBootApplication
public class AmqpClientNewApplication {
@Autowired
private RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(AmqpClientNewApplication.class, args);
}
@PostConstruct
public void AmqpClientNewApplication_1() {
System.out.println(">>>>** Running AMQP Client **<<<<< ");
CampaignStatus cs = new CampaignStatus();
cs.setCampaignOrRepairID("0123456789-AP");
cs.setDateOfChange("20192019");
cs.setVin("1AYP48S81C605935V");
cs.setExecutor("SAHYADRI");
cs.setRetailerID("Deccan PUN");
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.serializeNulls();
Gson gson = gsonBuilder.create();
List<CampaignStatus> csList =new ArrayList<>();
csList.add(cs);
System.out.println(">>>> "+ gson.toJson(csList).toString());
rabbitTemplate.convertAndSend(RobbitMqConfig.publishQName, gson.toJson(csList).toString());
System.out.println("Is listener returned :"+rabbitTemplate.isReturnListener());
}
}
D. To read the application.properties file
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
@Component
@Configuration
@ConfigurationProperties(prefix="spring.rabbitmq")
@Validated
public class ConfigInitializer {
String host;
String port;
String username;
String password;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
E. RabbitMQ configuration file, to configure Beans, RabbitTemplate and container
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*@Configuration
@EnableAutoConfiguration*/
public class RobbitMqConfig {
@Autowired
Receiver receiver;
public final static String publishQName ="prices.campaignstatus.sp";
public final static String consumeQName ="prices.campaignstatus.sp";
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter(){
Jackson2JsonMessageConverter con= new Jackson2JsonMessageConverter();
return con;
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate templete=new RabbitTemplate();
templete.setExchange("x.prices");//Set exchange
templete.setQueue(publishQName);//Declare queue
templete.setMessageConverter(jsonMessageConverter());
templete.setConnectionFactory(connectionFactory);
return templete;
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(consumeQName);
container.setMessageConverter(jsonMessageConverter());
container.setMessageListener(listenerAdapter(receiver));
return container;
}
/*
* @Bean
*
* @ConditionalOnMissingBean MessageListener consumer(){
return new Consumer();
* }
*/
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) { return new
MessageListenerAdapter(receiver, "receiveMessage"); }
}
F. Receiver i.e. AMQP Listener class as below
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
@RabbitListener(queues = "prices.campaignstatus.sp")
public void receiveMessage(String cs) {
System.out.println("Received <<<<< " + cs.toString() + ">>>>>");
}
}
Note: Don’t forget to change parameter to the type of message you expect, i.e. String or byte[] etc in method definition i.e. receiveMessage(String cs)
That’s all Folks – Your AMQP producers and receiver clients are ready.