I am going to look at RabbitMQ using Java and the Spring Framework, both of which have very good support for RabbitMQ. There are other message broker systems such as Kafka, ActiveMQ and Kestrel, all of which have similar aspects. RabbitMQ is an application-to-application messaging system. You can deploy it on-prem or in the cloud (it is offered by both Azure and AWS), it can be configured to be highly scalable, and it can be installed on many operating systems. It also offers the below:
I am not going to detail how to install RabbitMQ and will instead point you to the RabbitMQ website for the installation guide. I am going to use RabbitMQ version 3.8.1 on a Windows 10 laptop, with Java 11 and the IntelliJ IDE. Note that RabbitMQ can also be installed on Linux, in Docker, etc, and that it has many plugins which add additional features to RabbitMQ.
RabbitMQ is basically a broker in the middle of a number of applications. In order for these applications to pass data between themselves, they pass messages to RabbitMQ, which the other applications pick up and process. This means that you don't have to configure each application to talk to every other application individually. The setup can be a simple one or a very complex one where many applications are involved.
Before we get into coding, here is some terminology that you need to be familiar with regarding RabbitMQ:
Publisher | is an application that sends messages to a broker (RabbitMQ) |
Subscriber | is an application that subscribes to a broker, in particular to an exchange/queue |
Consumer | is an application that will consume (pull) the messages from the broker and process them |
Queue | a buffer that stores messages |
Exchange | routes messages to a specific queue or queues |
Routing Key | a key that the exchange uses to decide on how to route a specific message |
Binding | the link between an exchange and a queue |
Basically the flow is that the producer (publisher) sends a message to RabbitMQ, the exchange examines the message and, depending on the routing key and the bindings, routes the message to the specific queue or queues. The consumer will have subscribed to the queue and thus will consume the message.
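To make the flow above concrete, here is a minimal sketch of it as a toy in-memory model. It is not the RabbitMQ API and does not talk to a broker; the class `ToyDirectExchange`, its methods and the routing key `hello.key` are all made up for illustration, and it only mimics the simplest case where a message goes to the queues whose binding matches the routing key exactly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of an exchange with exact-match routing.
// This is NOT the RabbitMQ API, only an illustration of how the
// exchange, binding, routing key and queue relate to each other.
class ToyDirectExchange {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> buffered messages
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Create the queue if it does not exist yet.
    void declareQueue(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Binding: link an existing queue to this exchange under a routing key.
    void bind(String queueName, String routingKey) {
        if (!queues.containsKey(queueName)) {
            throw new IllegalArgumentException("Unknown queue: " + queueName);
        }
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Producer side: copy the message to every queue bound with this key.
    // Returns the number of queues that received it (0 means it was dropped).
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, List.of());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Consumer side: take the oldest message, or null if there is none.
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.declareQueue("dd.hello");
        exchange.bind("dd.hello", "hello.key");
        exchange.publish("hello.key", "Hello Paul");
        System.out.println("Consuming " + exchange.consume("dd.hello"));
    }
}
```

A real broker adds a lot on top of this (other exchange types, acknowledgements, persistence), but the producer, exchange, binding, queue and consumer roles are the same.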
I have created a project called rabbitmq_dd and then created two modules called rabbitmq_producer and rabbitmq_consumer, both of which use the Spring for RabbitMQ dependencies. I have used Maven but you can use any build tool that you like. I have kept the two modules separate so that you can run them on different servers, etc.
Lastly we need to configure both the producer and the consumer to point to the installed RabbitMQ. We use the application.properties file (or the application.yml file) as per below; change it to reflect where RabbitMQ is set up and any credentials that you have set up:
I have created a simple queue called dd.hello using the web GUI, just so that we can make a simple connection, create a message and consume it.
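As a rough sketch of what such an application.properties could contain, assuming a local broker on the default AMQP port with the default guest account (all four values are assumptions, so adjust them to your own setup):

```properties
# Assumed values: a local RabbitMQ install using its defaults
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
```

The same file is needed in both the producer and the consumer module.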
Basically we have to create a producer and then configure the application to use that producer:
HelloProducer.java | package uk.co.datadisk.rabbitmq_producer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; @Service public class HelloProducer { private final RabbitTemplate rabbitTemplate; public HelloProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendHello(String message) { /* routing key (the queue name when using the default exchange), message to send */ rabbitTemplate.convertAndSend("dd.hello", "Hello " + message); } } |
RabbitmqProducerApplication.java | package uk.co.datadisk.rabbitmq_producer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitmqProducerApplication implements CommandLineRunner { private final HelloProducer helloProducer; public RabbitmqProducerApplication(HelloProducer helloProducer) { this.helloProducer = helloProducer; } public static void main(String[] args) { SpringApplication.run(RabbitmqProducerApplication.class, args); } @Override public void run(String... args) throws Exception { System.out.println("Sending Message to RabbitMQ"); helloProducer.sendHello("Paul Valle"); } } |
You can see the console output in IntelliJ connecting to RabbitMQ
If we log in to RabbitMQ, go to the dd.hello queue and select Get messages, we can see our message
Now let's create a simple consumer. As you can see, not much code is actually required; that's the beauty of the Spring Framework.
HelloConsumer.java | package uk.co.datadisk.rabbitmq_consumer; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class HelloConsumer { /* you can also add a concurrency value to @RabbitListener for multiple consumers */ @RabbitListener(queues = "dd.hello") public void listen(String message) { System.out.println("Consuming " + message); } } |
We can see that the message is retrieved and displayed in the console window in IntelliJ
If you then take a look at the dd.hello queue it will now be empty
Most web applications use JavaScript Object Notation (JSON), which is a lightweight data-interchange format for exchanging data between applications. It uses a human-readable form to transmit data objects consisting of attribute-value pairs and array data types.
I have added some dependencies to Maven and created a dd.employee queue
There are many ways to code RabbitMQ and JSON. This is just one way, and I have kept the code to a minimum, solely to focus on the RabbitMQ code.
Producer | |
Employee.java | package uk.co.datadisk.rabbitmq_producer.entity; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import uk.co.datadisk.rabbitmq_producer.CustomLocalDateSerializer; import java.time.LocalDate; public class Employee { @JsonProperty("employee_Id") private String employeeId; private String name; @JsonProperty("Date_of_Birth") @JsonSerialize(using = CustomLocalDateSerializer.class) private LocalDate dob; public Employee(String employeeId, String name, LocalDate dob) { this.employeeId = employeeId; this.name = name; this.dob = dob; } public String getEmployeeId() { return employeeId; } public void setEmployeeId(String employeeId) { this.employeeId = employeeId; } public String getName() { return name; } public void setName(String name) { this.name = name; } public LocalDate getDob() { return dob; } public void setDob(LocalDate dob) { this.dob = dob; } } |
CustomLocalDateSerializer.java | package uk.co.datadisk.rabbitmq_producer; import java.io.IOException; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; public class CustomLocalDateSerializer extends StdSerializer<LocalDate> { private static final long serialVersionUID = 1L; private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MMM-dd"); public CustomLocalDateSerializer() { this(null); } public CustomLocalDateSerializer(Class<LocalDate> t) { super(t); } @Override public void serialize(LocalDate value, JsonGenerator gen, SerializerProvider arg2) throws IOException, JsonProcessingException { gen.writeString(formatter.format(value)); } } |
EmployeeJsonProducer.java | package uk.co.datadisk.rabbitmq_producer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import uk.co.datadisk.rabbitmq_producer.entity.Employee; @Service public class EmployeeJsonProducer { private RabbitTemplate rabbitTemplate; private ObjectMapper objectMapper = new ObjectMapper(); public EmployeeJsonProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(Employee emp) throws JsonProcessingException { var json = objectMapper.writeValueAsString(emp); rabbitTemplate.convertAndSend("dd.employee", json); } } |
RabbitmqProducerApplication.java | package uk.co.datadisk.rabbitmq_producer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import uk.co.datadisk.rabbitmq_producer.entity.Employee; import java.time.LocalDate; @SpringBootApplication public class RabbitmqProducerApplication implements CommandLineRunner { private final EmployeeJsonProducer employeeJsonProducer; public RabbitmqProducerApplication(EmployeeJsonProducer employeeJsonProducer) { this.employeeJsonProducer = employeeJsonProducer; } public static void main(String[] args) { SpringApplication.run(RabbitmqProducerApplication.class, args); } @Override public void run(String... args) throws Exception { System.out.println("Sending 5 X Employees to RabbitMQ"); for (int i = 0; i < 5; i++) { /* use the loop counter so that each employee gets a distinct id */ var e = new Employee("emp " + i, "Employee " + i, LocalDate.now()); employeeJsonProducer.sendMessage(e); } } } |
Consumer | |
Employee.java | package uk.co.datadisk.rabbitmq_consumer.entity; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import uk.co.datadisk.rabbitmq_consumer.CustomLocalDateDeserializer; import java.time.LocalDate; public class Employee { @JsonProperty("employee_Id") private String employeeId; private String name; @JsonProperty("Date_of_Birth") @JsonDeserialize(using = CustomLocalDateDeserializer.class) private LocalDate dob; public Employee() { } public Employee(String employeeId, String name, LocalDate dob) { this.employeeId = employeeId; this.name = name; this.dob = dob; } public String getEmployeeId() { return employeeId; } public void setEmployeeId(String employeeId) { this.employeeId = employeeId; } public String getName() { return name; } public void setName(String name) { this.name = name; } public LocalDate getDob() { return dob; } public void setDob(LocalDate dob) { this.dob = dob; } @Override public String toString() { return "Employee{" + "employeeId='" + employeeId + '\'' + ", name='" + name + '\'' + ", dob=" + dob + '}'; } } |
CustomLocalDateDeserializer.java | package uk.co.datadisk.rabbitmq_consumer; import java.io.IOException; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; public class CustomLocalDateDeserializer extends StdDeserializer<LocalDate> { private static final long serialVersionUID = 1L; private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MMM-dd"); public CustomLocalDateDeserializer() { super(LocalDate.class); } @Override public LocalDate deserialize(JsonParser parser, DeserializationContext context) throws IOException { return LocalDate.parse(parser.readValueAs(String.class), formatter); } } |
EmployeeJsonConsumer.java | package uk.co.datadisk.rabbitmq_consumer; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import uk.co.datadisk.rabbitmq_consumer.entity.Employee; import java.io.IOException; @Service public class EmployeeJsonConsumer { private ObjectMapper objectMapper = new ObjectMapper(); private static final Logger log = LoggerFactory.getLogger(EmployeeJsonConsumer.class); @RabbitListener(queues = "dd.employee") public void listen(String message) throws IOException { var emp = objectMapper.readValue(message, Employee.class); log.info("Employee is {}", emp); } } |
RabbitmqConsumerApplication.java | package uk.co.datadisk.rabbitmq_consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitmqConsumerApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqConsumerApplication.class, args); } } |
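The custom serializer and deserializer above both rely on the pattern yyyy-MMM-dd, so the producer and the consumer must agree on how the month name is written. Because MMM is a text month, the result depends on the locale, and two servers with different default locales could disagree. The sketch below shows the round trip using only the JDK; pinning `Locale.ENGLISH` is my own addition and is not in the original classes, and the class name `DobFormatDemo` is just for illustration.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Round trip of a date through the same pattern used by the custom
// Jackson serializer/deserializer, with the locale fixed explicitly
// (an assumption added here; the original code uses the JVM default).
class DobFormatDemo {
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MMM-dd", Locale.ENGLISH);

    // What the producer would write into the JSON string.
    static String toWire(LocalDate date) {
        return FORMATTER.format(date);
    }

    // What the consumer would do with the JSON string it receives.
    static LocalDate fromWire(String text) {
        return LocalDate.parse(text, FORMATTER);
    }

    public static void main(String[] args) {
        LocalDate dob = LocalDate.of(1990, 1, 15);
        String wire = toWire(dob);
        System.out.println(wire);                       // prints 1990-Jan-15
        System.out.println(fromWire(wire).equals(dob)); // prints true
    }
}
```

If both applications always run with the same locale the original formatter works as it is; a purely numeric pattern such as yyyy-MM-dd would avoid the question altogether.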
Below is the dd.employee queue
When you run the consumer the logger output is below
In this section I cover how to publish and subscribe to RabbitMQ using a standard Java application (Java 11). Firstly you need to add a dependency as per below
I got a few errors in IntelliJ along the lines of "error: release version 5 not supported"; try the below to resolve them
Let's take a look at the producer and consumer code. I have given a couple of options for the consumer.
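The standalone code in this section imports classes from com.rabbitmq.client, which come from the RabbitMQ Java client library. Assuming Maven is used as the build tool, the dependency would look roughly like this; the version number is an assumption on my part, so check Maven Central for the release that suits your broker.

```xml
<!-- RabbitMQ Java client; the version shown is an assumption, not taken from the original article -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>
```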
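This error usually means the build is compiling for an old Java language level. One common fix, assuming a Maven project on Java 11, is to state the compiler level in pom.xml as sketched here (you may also need to check that the project and module language level in IntelliJ is set to 11):

```xml
<!-- Tell the Maven compiler plugin to build for Java 11 (assumed target version) -->
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>
```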
Producer | |
Producer example | package uk.co.datadisk.producer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); /* Only needed if RabbitMQ does not use the defaults (localhost, port 5672, guest/guest): factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); */ /* try-with-resources closes the channel and then the connection, even if publishing fails */ try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { /* You have the option to create the queue: channel.queueDeclare("dd.standalone", false, false, false, null); */ String message = "Testing Java standalone"; /* default exchange (""), routing key = queue name, no properties, message body */ channel.basicPublish("", "dd.standalone", null, message.getBytes(StandardCharsets.UTF_8)); } } } |
Consumer | |
Consumer example | package uk.co.datadisk.consumer; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* Option 1: subclass DefaultConsumer. DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println("Message: " + message); } }; channel.basicConsume("dd.standalone", true, consumer); */ /* Option 2: use a DeliverCallback lambda */ DeliverCallback deliverCallBack = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Message: " + message); }; /* queue, autoAck, deliverCallback, cancelCallback */ channel.basicConsume("dd.standalone", true, deliverCallBack, consumerTag -> {}); } } |
Note: make sure you don't close the channel or the connection for the consumer, otherwise it will stop receiving messages.
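RabbitMQ carries the message body as raw bytes, so the producer and the consumer have to agree on the character encoding. On Java 11, calling getBytes() or new String(bytes) without a charset uses the platform default, which can differ between machines. A minimal sketch of the round trip with UTF-8 stated explicitly, using only the JDK (the class and method names are illustrative only):

```java
import java.nio.charset.StandardCharsets;

// Encode and decode a message body with an explicit charset, so that the
// producer and the consumer do not depend on each machine's default encoding.
class MessageBytesDemo {
    // Producer side: text -> bytes handed to the broker.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes received from the broker -> text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Testing Java standalone");
        System.out.println("Message: " + decode(body)); // prints Message: Testing Java standalone
    }
}
```

For plain ASCII text the default charset rarely causes trouble, but accented characters or currency symbols can be garbled when the two sides use different defaults.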