In this section we look at RabbitMQ exchanges. I will be covering 4 types of exchanges:
Fanout Exchange |
the message will be routed to all queues that are bound to it; this is also known as publish and subscribe |
Direct Exchange |
if the binding key matches exactly then the message will be delivered to that queue |
Topic Exchange |
uses a routing pattern to determine where to route the message |
Headers Exchange |
uses message headers to determine where to route the message (least used) |
When you add a new exchange you can use the type drop-down list to select what type of exchange you want to create. Remember that exchanges are bound to queues (see right-hand screenshot)
 |
 |
Fanout Exchange
Firstly I created two queues dd.hr.accounting and dd.hr.marketing, once the queues are created then you can create the exchange, specifying that it is a fanout exchange.
Once the exchange has been created we can bind the two queues to the fanout exchange
 |
 |
Producer |
HumanResourceProducer.java |
package uk.co.datadisk.rabbitmq_producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Employee;
@Service
public class HumanResourceProducer {
private RabbitTemplate rabbitTemplate;
private ObjectMapper objectMapper = new ObjectMapper();
public HumanResourceProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(Employee emp) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(emp);
// exchange, routingKey, payload
rabbitTemplate.convertAndSend("dd.hr", "", json);
}
} |
RabbitmqProducerApplication.java |
package uk.co.datadisk.rabbitmq_producer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Employee;
import java.time.LocalDate;
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {
private final HumanResourceProducer humanResourceProducer;
public RabbitmqProducerApplication(HumanResourceProducer humanResourceProducer) {
this.humanResourceProducer = humanResourceProducer;
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending 5 X Employees to RabbitMQ");
for (int i = 0; i < 5; i++) {
var e = new Employee("emp " + 1, "Employee " + i, LocalDate.now());
humanResourceProducer.sendMessage(e);
}
}
} |
Consumer |
AccountingConsumer.java |
package uk.co.datadisk.rabbitmq_consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Employee;
import java.io.IOException;
@Service
public class AccountingConsumer {
private ObjectMapper objectMapper = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(AccountingConsumer.class);
@RabbitListener(queues = "dd.hr.accounting")
public void listen(String message) throws IOException {
var emp = objectMapper.readValue(message, Employee.class);
log.info("Accounting Employee is {}", emp);
}
} |
MarketingConsumer.java |
package uk.co.datadisk.rabbitmq_consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Employee;
import java.io.IOException;
@Service
public class MarketingConsumer {
private ObjectMapper objectMapper = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(MarketingConsumer.class);
@RabbitListener(queues = "dd.hr.marketing")
public void listen(String message) throws IOException {
var emp = objectMapper.readValue(message, Employee.class);
log.info("Marketing Employee is {}", emp);
}
} |
RabbitmqConsumerApplication.java |
package uk.co.datadisk.rabbitmq_consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqConsumerApplication.class, args);
}
} |
When you run the producer you can see that both queues get 5 messages (left-hand screenshot), and when you run the consumers you get the output in the screenshot on the right-hand side.
Direct Exchange
A direct exchange uses a routing key to route messages to a specific queue. I have created two queues and bound them to the direct exchange; notice the routing keys.
 |
 |
Producer |
Picture.java |
package uk.co.datadisk.rabbitmq_consumer.entity;
/**
 * Mutable bean holding a picture's name, type, source and size.
 */
public class Picture {

    private String name;
    private String type;
    private String source;
    private long size;

    // ---- getters ----

    public String getName() {
        return this.name;
    }

    public String getType() {
        return this.type;
    }

    public String getSource() {
        return this.source;
    }

    public long getSize() {
        return this.size;
    }

    // ---- setters ----

    public void setName(String name) {
        this.name = name;
    }

    public void setType(String type) {
        this.type = type;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public void setSize(long size) {
        this.size = size;
    }

    /**
     * @return a text form such as
     *         {@code Picture{name='a', type='png', source='web', size=10}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Picture{");
        text.append("name='").append(name).append('\'');
        text.append(", type='").append(type).append('\'');
        text.append(", source='").append(source).append('\'');
        text.append(", size=").append(size);
        text.append('}');
        return text.toString();
    }
}
PictureProducer.java |
package uk.co.datadisk.rabbitmq_producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;
@Service
public class PictureProducer {
private RabbitTemplate rabbitTemplate;
private ObjectMapper objectMapper = new ObjectMapper();
public PictureProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(Picture p) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(p);
rabbitTemplate.convertAndSend("dd.picture", p.getType(), json);
}
} |
RabbitmqProducerApplication.java |
package uk.co.datadisk.rabbitmq_producer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {
private final PictureProducer pictureProducer;
private final List<String> SOURCES = List.of("mobile", "types");
private final List<String> TYPES = List.of("jpg", "png", "svg");
public RabbitmqProducerApplication(PictureProducer pictureProducer) {
this.pictureProducer = pictureProducer;
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 10; i++) {
var p = new Picture();
p.setName("Picture " + i);
p.setSize(ThreadLocalRandom.current().nextLong(1, 10001));
p.setSource(SOURCES.get(i % SOURCES.size()));
p.setType(TYPES.get(i % TYPES.size()));
pictureProducer.sendMessage(p);
}
}
} |
Consumer |
PictureImageConsumer.java |
package uk.co.datadisk.rabbitmq_consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;
import java.io.IOException;
@Service
public class PictureImageConsumer {
private ObjectMapper objectMapper = new ObjectMapper();
private final Logger log = LoggerFactory.getLogger(PictureImageConsumer.class);
@RabbitListener(queues = "dd.picture.image")
public void listen(String message) throws IOException {
var p = objectMapper.readValue(message, Picture.class);
log.info("Image: {}", p);
}
} |
PictureVectorConsumer.java |
package uk.co.datadisk.rabbitmq_consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;
import java.io.IOException;
@Service
public class PictureVectorConsumer {
private ObjectMapper objectMapper = new ObjectMapper();
private final Logger log = LoggerFactory.getLogger(PictureVectorConsumer.class);
@RabbitListener(queues = "dd.picture.vector")
public void listen(String message) throws IOException {
var p = objectMapper.readValue(message, Picture.class);
log.info("Vector: {}", p);
}
} |
You should see the messages being directed to the correct queue (left-hand screenshot); the output is shown in the right-hand screenshot.
Topic Exchange
A topic exchange uses a routing pattern to route messages to a specific queue, and a pattern can combine multiple criteria. You can use the special characters below in the pattern:
- * - substitutes exactly one word
- # - substitutes zero or more words
I have created a number of queues and an exchange and bound them as below; notice the routing keys, I am using a mixture of different patterns.
 |
 |
Producer |
PictureProducer2.java |
package uk.co.datadisk.rabbitmq_producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;
@Service
public class PictureProducer2 {
private RabbitTemplate rabbitTemplate;
private ObjectMapper objectMapper = new ObjectMapper();
public PictureProducer2(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(Picture p) throws JsonProcessingException {
var sb = new StringBuilder();
sb.append(p.getSource());
sb.append(".");
if ( p.getSize() > 4000 ){
sb.append("large");
} else {
sb.append("small");
}
sb.append(".");
sb.append(p.getType());
var json = objectMapper.writeValueAsString(p);
var routingKey = sb.toString();
rabbitTemplate.convertAndSend("dd.picture", routingKey, json);
}
} |
RabbitmqProducerApplication.java |
package uk.co.datadisk.rabbitmq_producer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {
private final PictureProducer2 pictureProducer2;
private final List<String> SOURCES = List.of("mobile", "types");
private final List<String> TYPES = List.of("jpg", "png", "svg");
public RabbitmqProducerApplication(PictureProducer2 pictureProducer2) {
this.pictureProducer2 = pictureProducer2;
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 10; i++) {
var p = new Picture();
p.setName("Picture " + i);
p.setSize(ThreadLocalRandom.current().nextLong(1, 10001));
p.setSource(SOURCES.get(i % SOURCES.size()));
p.setType(TYPES.get(i % TYPES.size()));
pictureProducer2.sendMessage(p);
}
}
} |
Consumer |
PictureImageConsumer.java |
package uk.co.datadisk.rabbitmq_consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;
import java.io.IOException;
@Service
public class PictureImageConsumer {
private ObjectMapper objectMapper = new ObjectMapper();
private final Logger log = LoggerFactory.getLogger(PictureImageConsumer.class);
// Use the below queues to retrieve messages from other queues
// @RabbitListener(queues = "dd.picture.filter")
// @RabbitListener(queues = "dd.picture.log")
// @RabbitListener(queues = "dd.picture.vector")
@RabbitListener(queues = "dd.picture.image")
public void listen(String message) throws IOException {
var p = objectMapper.readValue(message, Picture.class);
log.info("Image: {}", p);
}
} |
Headers Exchange
Lastly we look at the headers exchange, which is the least popular of all the exchanges. It uses the header(s) sent with the message to determine where to route the message. You can see from the headers exchange I have set up (left-hand screenshot) that there are no routing keys, only arguments. First you set x-match, which can be either any or all, and then you set up your arguments, which can have any name; in my example I use item1 and item2, but they can be anything you like. The queues are normal queues as we have seen before.
I am only going to list the producer code, as the consumer code is the same as we have seen many times. I have also coded two options for sending messages to a headers exchange.
Producer |
HeadersProducer.java |
package uk.co.datadisk.rabbitmq_producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;
@Service
public class HeadersProducer {
private RabbitTemplate rabbitTemplate;
private ObjectMapper objectMapper = new ObjectMapper();
public HeadersProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(Picture p) throws JsonProcessingException {
// Option - 1
var json = objectMapper.writeValueAsString(p);
// there are other properties you can send for example priority, etc.
rabbitTemplate.convertAndSend("dd.headers", "", json, m -> {
m.getMessageProperties().getHeaders().put("item1", p.getType());
m.getMessageProperties().getHeaders().put("item2", p.getSource());
return m;
});
// Option - 2
// MessageProperties messageProperties = new MessageProperties();
// messageProperties.setHeader("item1", p.getType());
// messageProperties.setHeader("item2", p.getSource());
// MessageConverter messageConverter = new SimpleMessageConverter();
// Message message = messageConverter.toMessage("Hello World", messageProperties);
// rabbitTemplate.send("dd.headers", "", message);
}
} |
RabbitmqProducerApplication.java |
package uk.co.datadisk.rabbitmq_producer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {
private final HeadersProducer headersProducer;
private final List<String> SOURCES = List.of("mobile", "web");
private final List<String> TYPES = List.of("jpg", "png", "svg");
public RabbitmqProducerApplication(HeadersProducer headersProducer) {
this.headersProducer = headersProducer;
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqProducerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 10; i++) {
var p = new Picture();
p.setName("Picture " + i);
p.setSize(ThreadLocalRandom.current().nextLong(9001, 10001));
p.setSource(SOURCES.get(i % SOURCES.size()));
p.setType(TYPES.get(i % TYPES.size()));
System.out.println("Sending " + p.getType());
headersProducer.sendMessage(p);
}
}
} |