Below are some tips on how to get the best benefits from Sping Boot and RabbitMQ
You can use Spring Boot to create a JSON message converter
| RabbitmqConfig.java | @Configuration
public class RabbitmqConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(converter);
return template;
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
} |
| DummyProducer.java | @Service
public class DummyProducer {
// Will use the JSON convert we have configured in RabbitmqConfig.java
private RabbitTemplate rabbitTemplate;
public DummyProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// DummyMessage is a simple POJO
public void sendDummy(DummyMessage message) {
rabbitTemplate.convertAndSend("x.dummy", "", message);
}
} |
There might be time where you want to distribute the load amoung the consumers when you are using concurrency, there is a property value that you can set, below will prefetch 50 messages to each consumer, this feature is useful if you have fast producers and slow consumers. The default value of prefetch is 250 if not specified.
| Prefetch example | @RabbitListener(queues = "dd.hr.accounting", concurrency = "2")
public void listen(String message) throws IOException {
var emp = objectMapper.readValue(message, Employee.class);
log.info("Accounting Employee is {}", emp);
} |
There might be times when you want to have different prefetch values on different consumers, we can create prefetch container factory to use with our listener.
| RabbitmqConfig.java | @Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
var factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPrefetchCount(50);
return factory;
} |
| Multiple Prefetch Consumer | @RabbitListener(queues = "q.scheduler", concurrency = "2", containerFactory = "prefetchOneContainerFactory")
public void listenScheduler(DummyMessage message) throws InterruptedException {
log.info("Listening scheduler {}", message);
} |
There are times that you might want the listener to send to another exchange/queue, you can use the @SendTo annotation
| SendTo example | @RabbitHandler
@SendTo("x.invoice.cancel/")
public PaymentCancelStatus handleInvoiceCancelled(InvoiceCancelledMessage message) {
var randomStatus = ThreadLocalRandom.current().nextBoolean();
return new PaymentCancelStatus(randomStatus, LocalDate.now(), message.getInvoiceNumber());
} |
You have an option to setup a default handler
| Introduction | @Service
@RabbitListener(queues = "q.invoice")
public class InvoiceConsumer {
.....
@RabbitHandler(isDefault = true)
public void handleDefault(Object message) {
log.info("on handleDefault : {}", message);
}
} |
Using Java to Create Exchanges, Queues and Bindings
You can use Spring Boot to create exchanges and queues and also to perform binding.
| Creating Exchange, Queue and Binding example | @Configuration
public class RabbitmqSchemaConfig {
@Bean
public FanoutExchange createFanoutExchange() {
return new FanoutExchange("dd.another-dummy-exchange", true, false, null);
}
@Bean
public Queue createQueue() {
return new Queue("dd.another-dummy-queue");
}
@Bean
public Binding createBinding() {
return new Binding("dd.another-dummy-queue", DestinationType.QUEUE, "dd.another-dummy-exchange", "", null);
return BindingBuilder.bind(createQueue()).to(createFanoutExchange());
}
// Below will perform the all the methods above in one go
@Bean
public Declarables createRabbitmqSchema() {
return new Declarables(new FanoutExchange("dd.another-dummy-exchange", true, false, null),
new Queue("dd.another-dummy-queue"),
new Binding("dd.another-dummy-queue", DestinationType.QUEUE, "dd.another-dummy-exchange", "", null));
}
} |
You can also configure a consumer to create the queue and bindings if one does not exist
| Introduction | @RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dd.auto-dummy-queue", durable = "true"),
exchange = @Exchange(name = "dd.auto-dummy-exchange",
type = ExchangeTypes.DIRECT, durable = "true"),
key = "routing-key",
ignoreDeclarationExceptions = "true"))
public void listenDummy(DummyMessage message) {
log.info("{}", message);
} |