Exchanges

In this section we look at RabbitMQ exchanges. I will be covering the four types of exchange:


Fanout Exchange - the message is routed to all queues that are bound to it; this is also known as publish and subscribe
Direct Exchange - if the routing key of the message exactly matches the binding key, the message is delivered to that queue
Topic Exchange - uses a routing pattern to determine where to route the message
Headers Exchange - uses message headers to determine where to route the message (least used)

When you add a new exchange you can use the type drop-down list to select what type of exchange you want to create. Remember that queues are bound to exchanges (see right-hand screenshot).

Fanout Exchange

First I created two queues, dd.hr.accounting and dd.hr.marketing. Once the queues are created you can create the exchange (named dd.hr in the producer code below), specifying that it is a fanout exchange.

Once the exchange has been created we can bind the two queues to the fanout exchange

Producer
HumanResourceProducer.java
package uk.co.datadisk.rabbitmq_producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Employee;

/**
 * Publishes {@link Employee} objects, serialised as JSON, to the {@code dd.hr} exchange.
 */
@Service
public class HumanResourceProducer {

    /** Name of the exchange every message is published to. */
    private static final String EXCHANGE = "dd.hr";

    /** Routing key sent with every message; this producer always uses an empty key. */
    private static final String ROUTING_KEY = "";

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final RabbitTemplate rabbitTemplate;

    public HumanResourceProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Serialises the employee to a JSON string and sends it to the exchange.
     *
     * @param emp the employee to publish
     * @throws JsonProcessingException if the employee cannot be written as JSON
     */
    public void sendMessage(Employee emp) throws JsonProcessingException {
        final String payload = objectMapper.writeValueAsString(emp);
        // Argument order is: exchange, routing key, payload.
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, payload);
    }
}
RabbitmqProducerApplication.java
package uk.co.datadisk.rabbitmq_producer;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Employee;

import java.time.LocalDate;

/**
 * Spring Boot entry point for the producer. Once the application has started,
 * {@link #run(String...)} publishes five sample employees through the
 * {@link HumanResourceProducer}.
 */
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {

    private final HumanResourceProducer humanResourceProducer;

    public RabbitmqProducerApplication(HumanResourceProducer humanResourceProducer) {
        this.humanResourceProducer = humanResourceProducer;
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }

    /**
     * Creates five employees and sends each one to RabbitMQ.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if an employee cannot be serialised or sent
     */
    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending 5 X Employees to RabbitMQ");
        for (int i = 0; i < 5; i++) {
            // Build the id from the loop index (not a constant) so every employee is distinct.
            var e = new Employee("emp " + i, "Employee " + i, LocalDate.now());
            humanResourceProducer.sendMessage(e);
        }
    }
}
Consumer
AccountingConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Employee;

import java.io.IOException;

/**
 * Consumes messages from the {@code dd.hr.accounting} queue and logs each employee received.
 */
@Service
public class AccountingConsumer {

    private static final Logger log = LoggerFactory.getLogger(AccountingConsumer.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses the JSON message into an {@link Employee} and logs it.
     *
     * @param message the message body, expected to be JSON text
     * @throws IOException if the message cannot be parsed into an {@link Employee}
     */
    @RabbitListener(queues = "dd.hr.accounting")
    public void listen(String message) throws IOException {
        final Employee employee = objectMapper.readValue(message, Employee.class);
        log.info("Accounting Employee is {}", employee);
    }
}
MarketingConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Employee;

import java.io.IOException;

/**
 * Consumes messages from the {@code dd.hr.marketing} queue and logs each employee received.
 */
@Service
public class MarketingConsumer {

    private static final Logger log = LoggerFactory.getLogger(MarketingConsumer.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses the JSON message into an {@link Employee} and logs it.
     *
     * @param message the message body, expected to be JSON text
     * @throws IOException if the message cannot be parsed into an {@link Employee}
     */
    @RabbitListener(queues = "dd.hr.marketing")
    public void listen(String message) throws IOException {
        final Employee employee = objectMapper.readValue(message, Employee.class);
        log.info("Marketing Employee is {}", employee);
    }
}
RabbitmqConsumerApplication.java
package uk.co.datadisk.rabbitmq_consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the consumer application.
 */
@SpringBootApplication
public class RabbitmqConsumerApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed straight through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqConsumerApplication.class, args);
    }
}

When you run the producer you can see that both queues get 5 messages (left-hand screenshot), and when you run the consumers you get the output in the screenshot on the right-hand side.

Direct Exchange

A direct exchange uses a routing key to route messages to a specific queue. I have created two queues and bound them to the direct exchange; notice the routing keys.

Producer
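Picture.java (entity class needed by both projects; the listing below shows the consumer package, the producer project needs the same class under uk.co.datadisk.rabbitmq_producer.entity, which is where PictureProducer imports it from)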
package uk.co.datadisk.rabbitmq_consumer.entity;

/**
 * Mutable bean describing a picture: its name, type, source and size.
 * All fields start at their Java defaults ({@code null} / {@code 0}) until set.
 */
public class Picture {

    private String name;
    private String type;
    private String source;
    private long size;

    /** @return the picture name, or {@code null} if it has not been set */
    public String getName() {
        return this.name;
    }

    /** @param name the picture name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the picture type, or {@code null} if it has not been set */
    public String getType() {
        return this.type;
    }

    /** @param type the picture type */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the picture source, or {@code null} if it has not been set */
    public String getSource() {
        return this.source;
    }

    /** @param source the picture source */
    public void setSource(String source) {
        this.source = source;
    }

    /** @return the picture size, {@code 0} if it has not been set */
    public long getSize() {
        return this.size;
    }

    /** @param size the picture size */
    public void setSize(long size) {
        this.size = size;
    }

    /**
     * Renders the bean as {@code Picture{name='..', type='..', source='..', size=..}};
     * unset string fields are printed as {@code null}.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("Picture{");
        text.append("name='").append(name).append('\'');
        text.append(", type='").append(type).append('\'');
        text.append(", source='").append(source).append('\'');
        text.append(", size=").append(size);
        text.append('}');
        return text.toString();
    }
}
PictureProducer.java
package uk.co.datadisk.rabbitmq_producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

/**
 * Publishes {@link Picture} objects, serialised as JSON, to the {@code dd.picture} exchange,
 * using the picture type as the routing key.
 */
@Service
public class PictureProducer {

    /** Name of the exchange every message is published to. */
    private static final String EXCHANGE = "dd.picture";

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final RabbitTemplate rabbitTemplate;

    public PictureProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Serialises the picture to JSON and sends it with its type as the routing key.
     *
     * @param p the picture to publish
     * @throws JsonProcessingException if the picture cannot be written as JSON
     */
    public void sendMessage(Picture p) throws JsonProcessingException {
        final String payload = objectMapper.writeValueAsString(p);
        final String routingKey = p.getType();
        rabbitTemplate.convertAndSend(EXCHANGE, routingKey, payload);
    }
}
RabbitmqProducerApplication.java
package uk.co.datadisk.rabbitmq_producer;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Spring Boot entry point for the producer. Once the application has started,
 * {@link #run(String...)} publishes ten sample pictures through the {@link PictureProducer}.
 */
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {

    // Sample values cycled through by the loop index. The second source is "web"
    // (it previously read "types", which is not a picture source).
    private static final List<String> SOURCES = List.of("mobile", "web");
    private static final List<String> TYPES = List.of("jpg", "png", "svg");

    private final PictureProducer pictureProducer;

    public RabbitmqProducerApplication(PictureProducer pictureProducer) {
        this.pictureProducer = pictureProducer;
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }

    /**
     * Builds ten pictures with a random size between 1 and 10000 (inclusive) and sends each one.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if a picture cannot be serialised or sent
     */
    @Override
    public void run(String... args) throws Exception {

        for (int i = 0; i < 10; i++) {
            var p = new Picture();
            p.setName("Picture " + i);
            // The upper bound of nextLong is exclusive, hence 10001.
            p.setSize(ThreadLocalRandom.current().nextLong(1, 10001));
            p.setSource(SOURCES.get(i % SOURCES.size()));
            p.setType(TYPES.get(i % TYPES.size()));

            pictureProducer.sendMessage(p);
        }
    }
}
Consumer
PictureImageConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

/**
 * Consumes messages from the {@code dd.picture.image} queue and logs each picture received.
 */
@Service
public class PictureImageConsumer {

    // One logger per class, matching the other consumers, rather than one per instance.
    private static final Logger log = LoggerFactory.getLogger(PictureImageConsumer.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses the JSON message into a {@link Picture} and logs it.
     *
     * @param message the message body, expected to be JSON text
     * @throws IOException if the message cannot be parsed into a {@link Picture}
     */
    @RabbitListener(queues = "dd.picture.image")
    public void listen(String message) throws IOException {
        var p = objectMapper.readValue(message, Picture.class);
        log.info("Image: {}", p);
    }
}
PictureVectorConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

/**
 * Consumes messages from the {@code dd.picture.vector} queue and logs each picture received.
 */
@Service
public class PictureVectorConsumer {

    // One logger per class, matching the other consumers, rather than one per instance.
    private static final Logger log = LoggerFactory.getLogger(PictureVectorConsumer.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses the JSON message into a {@link Picture} and logs it.
     *
     * @param message the message body, expected to be JSON text
     * @throws IOException if the message cannot be parsed into a {@link Picture}
     */
    @RabbitListener(queues = "dd.picture.vector")
    public void listen(String message) throws IOException {
        var p = objectMapper.readValue(message, Picture.class);
        log.info("Vector: {}", p);
    }
}

You should see the messages being directed to the correct queue (left-hand screenshot); the output is shown in the right-hand screenshot.

Topic Exchange

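A topic exchange uses a routing pattern to route messages to a specific queue, so you can match on multiple criteria. The routing key is a list of words separated by dots, and you can use the special characters below in the pattern:

* (star) matches exactly one word
# (hash) matches zero or more words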

I have created a number of queues and an exchange and bound them as below; notice the routing keys, I am using a mixture of different patterns.

Producer
PictureProducer2.java
package uk.co.datadisk.rabbitmq_producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

/**
 * Publishes {@link Picture} objects, serialised as JSON, to the {@code dd.picture} exchange
 * with a three-part routing key of the form {@code <source>.<large|small>.<type>}.
 */
@Service
public class PictureProducer2 {

    /** Pictures with a size strictly greater than this are labelled "large". */
    private static final long LARGE_SIZE_THRESHOLD = 4000;

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final RabbitTemplate rabbitTemplate;

    public PictureProducer2(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Builds the routing key from the picture's source, size label and type, then sends
     * the picture as JSON.
     *
     * @param p the picture to publish
     * @throws JsonProcessingException if the picture cannot be written as JSON
     */
    public void sendMessage(Picture p) throws JsonProcessingException {
        final String source = p.getSource();
        final String sizeLabel = p.getSize() > LARGE_SIZE_THRESHOLD ? "large" : "small";
        final String routingKey = source + "." + sizeLabel + "." + p.getType();

        final String payload = objectMapper.writeValueAsString(p);
        rabbitTemplate.convertAndSend("dd.picture", routingKey, payload);
    }
}
RabbitmqProducerApplication.java
package uk.co.datadisk.rabbitmq_producer;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Spring Boot entry point for the producer. Once the application has started,
 * {@link #run(String...)} publishes ten sample pictures through the {@link PictureProducer2}.
 */
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {

    // Sample values cycled through by the loop index. The second source is "web"
    // (it previously read "types", which is not a picture source).
    private static final List<String> SOURCES = List.of("mobile", "web");
    private static final List<String> TYPES = List.of("jpg", "png", "svg");

    private final PictureProducer2 pictureProducer2;

    public RabbitmqProducerApplication(PictureProducer2 pictureProducer2) {
        this.pictureProducer2 = pictureProducer2;
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }

    /**
     * Builds ten pictures with a random size between 1 and 10000 (inclusive) and sends each one.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if a picture cannot be serialised or sent
     */
    @Override
    public void run(String... args) throws Exception {

        for (int i = 0; i < 10; i++) {
            var p = new Picture();
            p.setName("Picture " + i);
            // The upper bound of nextLong is exclusive, hence 10001.
            p.setSize(ThreadLocalRandom.current().nextLong(1, 10001));
            p.setSource(SOURCES.get(i % SOURCES.size()));
            p.setType(TYPES.get(i % TYPES.size()));

            pictureProducer2.sendMessage(p);
        }
    }
}
Consumer
PictureImageConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

/**
 * Consumes messages from the {@code dd.picture.image} queue and logs each picture received.
 */
@Service
public class PictureImageConsumer {

    // One logger per class, matching the other consumers, rather than one per instance.
    private static final Logger log = LoggerFactory.getLogger(PictureImageConsumer.class);

    private final ObjectMapper objectMapper = new ObjectMapper();

    // To read from one of the other queues, replace the active @RabbitListener
    // annotation on listen() with one of the following:

    // @RabbitListener(queues = "dd.picture.filter")
    // @RabbitListener(queues = "dd.picture.log")
    // @RabbitListener(queues = "dd.picture.vector")

    /**
     * Parses the JSON message into a {@link Picture} and logs it.
     *
     * @param message the message body, expected to be JSON text
     * @throws IOException if the message cannot be parsed into a {@link Picture}
     */
    @RabbitListener(queues = "dd.picture.image")
    public void listen(String message) throws IOException {
        var p = objectMapper.readValue(message, Picture.class);
        log.info("Image: {}", p);
    }
}

Headers Exchange

Lastly we look at the headers exchange, which is the least popular of all the exchanges. It uses the headers that are sent with the message to determine where to route the message. You can see from the headers exchange I have set up (left-hand screenshot) that there are no routing keys, only arguments. First you set x-match, which can be any or all, and then you set up your arguments, which can have any name; in my example I use item1 and item2, but they can be anything you like. The queues are normal queues as we have seen before.

I am only going to list the producer code as the consumer code is the same as we have seen many times. I have also coded two options for sending messages to a headers exchange.

Producer
HeadersProducer.java
package uk.co.datadisk.rabbitmq_producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

/**
 * Publishes {@link Picture} objects, serialised as JSON, to the {@code dd.headers} exchange,
 * attaching the picture type and source as the message headers {@code item1} and {@code item2}.
 */
@Service
public class HeadersProducer {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final RabbitTemplate rabbitTemplate;

    public HeadersProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends the picture as JSON with an empty routing key and two message headers.
     *
     * @param p the picture to publish
     * @throws JsonProcessingException if the picture cannot be written as JSON
     */
    public void sendMessage(Picture p) throws JsonProcessingException {

        // Option 1: let the template convert the payload, then add the headers
        // in a post-processing callback before the message is sent.
        final String payload = objectMapper.writeValueAsString(p);

        // Other message properties (for example priority) can be set in the same callback.
        rabbitTemplate.convertAndSend("dd.headers", "", payload, message -> {
            var headers = message.getMessageProperties().getHeaders();
            headers.put("item1", p.getType());
            headers.put("item2", p.getSource());
            return message;
        });

        // Option 2: build the message and its properties by hand.
        // Note that, as written, this sends the fixed text "Hello World" rather than the picture JSON.
        //        MessageProperties messageProperties = new MessageProperties();
        //        messageProperties.setHeader("item1", p.getType());
        //        messageProperties.setHeader("item2", p.getSource());
        //        MessageConverter messageConverter = new SimpleMessageConverter();
        //        Message message = messageConverter.toMessage("Hello World", messageProperties);
        //        rabbitTemplate.send("dd.headers", "", message);
    }
}
RabbitmqProducerApplication.java
package uk.co.datadisk.rabbitmq_producer;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Spring Boot entry point for the producer. Once the application has started,
 * {@link #run(String...)} publishes ten sample pictures through the {@link HeadersProducer}.
 */
@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {

    // Sample values cycled through by the loop index.
    private static final List<String> SOURCES = List.of("mobile", "web");
    private static final List<String> TYPES = List.of("jpg", "png", "svg");

    /** Number of pictures published on start-up. */
    private static final int PICTURE_COUNT = 10;

    private final HeadersProducer headersProducer;

    public RabbitmqProducerApplication(HeadersProducer headersProducer) {
        this.headersProducer = headersProducer;
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }

    /**
     * Builds ten pictures with a random size between 9001 and 10000 (inclusive),
     * prints the type of each one and sends it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if a picture cannot be serialised or sent
     */
    @Override
    public void run(String... args) throws Exception {
        for (int index = 0; index < PICTURE_COUNT; index++) {
            var picture = new Picture();

            picture.setName("Picture " + index);
            // The upper bound of nextLong is exclusive, hence 10001.
            picture.setSize(ThreadLocalRandom.current().nextLong(9001, 10001));

            final String source = SOURCES.get(index % SOURCES.size());
            final String type = TYPES.get(index % TYPES.size());
            picture.setSource(source);
            picture.setType(type);

            System.out.println("Sending " + picture.getType());
            headersProducer.sendMessage(picture);
        }
    }
}