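Messages can sit in a queue forever if they are never consumed. To deal with this you can use a Dead Letter Exchange (DLX), which receives a message when one of the following occurs: a consumer rejects the message with requeue set to false, the message's TTL expires, or the queue exceeds its length limit.
A DLX is a normal exchange, so another consumer can process the dead-lettered messages from a queue bound to it. Below is an example that does not use a DLX: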
MyPictureImageConsumer.java (with exception)

```java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

@Service
public class MyPictureImageConsumer {

    private ObjectMapper objectMapper = new ObjectMapper();
    private final Logger log = LoggerFactory.getLogger(MyPictureImageConsumer.class);

    @RabbitListener(queues = "dd.picture.image")
    public void listen(String message) throws IOException {
        var p = objectMapper.readValue(message, Picture.class);

        if (p.getSize() > 9000) {
            throw new IllegalArgumentException("Picture size is too large: " + p);
        }

        log.info("Image: {}", p);
    }
}
```
As coded above, the consumer ends up in an infinite loop: the listener throws an exception, the message is put back on the queue (Spring AMQP requeues rejected messages by default), it is delivered again, the listener throws again, and so on.
Now we create a DLX (a normal exchange) and a queue whose dead-letter exchange is set to that DLX. In the RabbitMQ management UI this queue shows an extra feature flag labelled DLX.
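A queue is pointed at a DLX through its optional `x-dead-letter-exchange` argument. The sketch below only builds that argument map with plain Java; the exchange name `dd.picture.dlx` is a hypothetical name chosen for illustration, since the actual name is not given here.

```java
import java.util.Map;

class DlxQueueArguments {

    // Hypothetical exchange name, used only for this sketch.
    static final String DLX_NAME = "dd.picture.dlx";

    /** Builds the optional arguments for a queue that dead-letters into the given exchange. */
    static Map<String, Object> deadLetterArguments(String deadLetterExchange) {
        return Map.of("x-dead-letter-exchange", deadLetterExchange);
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = deadLetterArguments(DLX_NAME);
        // With the RabbitMQ Java client this map would be passed as the last parameter of
        // Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments).
        System.out.println(arguments);
    }
}
```

Setting the same argument in the management UI when adding the queue should have the same effect.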
MyPictureImageConsumer.java (using AMQP for exception - AUTOMATIC)

```java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

@Service
public class MyPictureImageConsumer {

    private ObjectMapper objectMapper = new ObjectMapper();
    private final Logger log = LoggerFactory.getLogger(MyPictureImageConsumer.class);

    @RabbitListener(queues = "dd.picture.image")
    public void listen(String message) throws IOException {
        var p = objectMapper.readValue(message, Picture.class);

        if (p.getSize() > 9000) {
            // throw new IllegalArgumentException("Picture size is too large: " + p);
            // Tells Spring AMQP to reject the message without requeueing it
            throw new AmqpRejectAndDontRequeueException("Picture size is too large: " + p);
        }

        log.info("Image: {}", p);
    }
}
```
This time RabbitMQ automatically routes the rejected message to the DLX, which avoids the infinite loop we had above. The message stays in the queue bound to the DLX until something consumes it, but at least it no longer breaks our application.
A second way is to use the RabbitMQ channel to reject the message manually. For this we need to set two properties in the consumer's Spring configuration so that acknowledgements are handled by our own code.
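The difference between requeueing and dead-lettering can be seen in a toy model. The following sketch is not RabbitMQ code: it uses an in-memory deque as a stand-in for the queue and a consumer that fails on every message, and the `maxDeliveries` cap exists only so the simulation terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RequeueVersusDeadLetter {

    /**
     * Delivers messages to a consumer that always fails and returns the number of deliveries.
     * requeue = true puts the failed message back at the head of the queue;
     * requeue = false appends it to deadLettered instead.
     */
    static int deliver(List<String> messages, boolean requeue, int maxDeliveries,
                       List<String> deadLettered) {
        Deque<String> queue = new ArrayDeque<>(messages);
        int deliveries = 0;

        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String message = queue.pollFirst();
            deliveries++;
            // The consumer "throws" for every message in this sketch.
            if (requeue) {
                queue.addFirst(message);   // same message is delivered again
            } else {
                deadLettered.add(message); // stands in for routing to the DLX
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("picture-1", "picture-2");

        List<String> dead = new ArrayList<>();
        int looped = deliver(messages, true, 10, dead);
        System.out.println("requeue=true : deliveries=" + looped + ", dead-lettered=" + dead);

        dead = new ArrayList<>();
        int rejected = deliver(messages, false, 10, dead);
        System.out.println("requeue=false: deliveries=" + rejected + ", dead-lettered=" + dead);
    }
}
```

With requeueing the run only stops because of the artificial cap and the second message is never reached; without requeueing each message is delivered once and set aside.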
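The two properties themselves are not reproduced in this section. A plausible pair, assuming Spring Boot's standard RabbitMQ listener settings are what was meant, switches both listener container types to manual acknowledgement:

```properties
# Assumption: these are the two properties referred to above
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
```

With manual acknowledgement the listener itself has to call `basicAck` or `basicReject` for every delivery.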
MyPictureImageConsumer.java (using RabbitMQ Channel for exception - MANUAL)

```java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

@Service
public class MyPictureImageConsumer {

    private ObjectMapper objectMapper = new ObjectMapper();
    private final Logger log = LoggerFactory.getLogger(MyPictureImageConsumer.class);

    @RabbitListener(queues = "dd.picture.image")
    public void listen(Message message, Channel channel) throws IOException {
        var p = objectMapper.readValue(message.getBody(), Picture.class);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        if (p.getSize() > 9000) {
            // throw new IllegalArgumentException("Picture size is too large: " + p);
            // throw new AmqpRejectAndDontRequeueException("Picture size is too large: " + p);
            // Reject without requeue so RabbitMQ routes the message to the DLX
            channel.basicReject(deliveryTag, false);
            // Stop here: acknowledging a delivery that was already rejected is an error
            return;
        }

        log.info("Image: {}", p);
        channel.basicAck(deliveryTag, false);
    }
}
```
You can also set a Time To Live (TTL) on a queue (the x-message-ttl queue argument, given in milliseconds). With a TTL of 5 seconds, as in this example, a message that has not been consumed within that time is moved to the DLX.
Retry Mechanism using Message Headers
If we get an error we can retry the message after a brief period, and do so a specific number of times before sending it to the DLX. For this example we create a number of exchanges and queues.
The bindings are pretty straightforward; the q.guideline.image.wait queue is bound in the same way as the others.
The wait queue uses the TTL feature: when a message's TTL expires it is dead-lettered back to the work exchange, which puts it on the work queue again after a specific period.
In this example we examine the message headers to see whether the message has been rejected before. The x-death header (with fields such as count), together with x-first-death-exchange, x-first-death-queue and x-first-death-reason, can be used to determine where the message originally came from and how many times it has been rejected.
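As a rough sketch of how such a wait queue could be declared, the arguments below combine a TTL with a dead-letter exchange that points back at the work exchange. The exchange name `x.guideline.work` is taken from the producer code in this example; the helper itself and the TTL value are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class WaitQueueArguments {

    /** Builds arguments for a wait queue that hands expired messages back to the work exchange. */
    static Map<String, Object> waitQueueArguments(String workExchange, int ttlMillis) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        arguments.put("x-message-ttl", ttlMillis);              // how long a message waits
        arguments.put("x-dead-letter-exchange", workExchange);  // where it goes afterwards
        return arguments;
    }

    public static void main(String[] args) {
        // 30 minutes is the wait time mentioned for this example; adjust as needed.
        int thirtyMinutes = 30 * 60 * 1000;
        System.out.println(waitQueueArguments("x.guideline.work", thirtyMinutes));
    }
}
```

No consumer listens on the wait queue, so the only way out for a message is TTL expiry.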
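To make the header layout concrete, here is a minimal, self-contained sketch that reads a retry count out of an `x-death` header. The headers are hand-built in plain Java rather than received from RabbitMQ, the exchange name `x.guideline.wait` is an assumption, and the helper is a simplified stand-in for the RabbitmqHeader class used in this example.

```java
import java.util.List;
import java.util.Map;

class XDeathRetryCount {

    /**
     * Returns the "count" of the first x-death entry whose queue name ends with
     * the given suffix, or 0 if the header is absent or has no such entry.
     */
    @SuppressWarnings("unchecked")
    static long retryCount(Map<String, Object> headers, String queueSuffix) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return 0;
        }
        for (Map<String, Object> entry : (List<Map<String, Object>>) raw) {
            Object queue = entry.get("queue");
            Object count = entry.get("count");
            if (queue != null && queue.toString().endsWith(queueSuffix)
                    && count instanceof Number) {
                return ((Number) count).longValue();
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Imitates a message that was rejected twice and expired twice in the wait queue.
        Map<String, Object> headers = Map.of(
                "x-first-death-queue", "q.guideline.image.work",
                "x-first-death-reason", "rejected",
                "x-death", List.of(
                        Map.of("queue", "q.guideline.image.wait", "reason", "expired",
                                "exchange", "x.guideline.wait", "count", 2L),
                        Map.of("queue", "q.guideline.image.work", "reason", "rejected",
                                "exchange", "x.guideline.work", "count", 2L)));

        System.out.println("retries so far: " + retryCount(headers, "wait"));
    }
}
```

Each distinct queue and reason pair gets its own `x-death` entry, which is why the count is read from the entry for the wait queue.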
Producer | |
RetryPictureProducer.java | package uk.co.datadisk.rabbitmq_producer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import uk.co.datadisk.rabbitmq_producer.entity.Picture; @Service public class RetryPictureProducer { private RabbitTemplate rabbitTemplate; private ObjectMapper objectMapper = new ObjectMapper(); public RetryPictureProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(Picture p) throws JsonProcessingException { var json = objectMapper.writeValueAsString(p); rabbitTemplate.convertAndSend("x.guideline.work", p.getType(), json); } } |
RabbitmqProducerApplication.java | package uk.co.datadisk.rabbitmq_producer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import uk.co.datadisk.rabbitmq_producer.entity.Picture; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @SpringBootApplication public class RabbitmqProducerApplication implements CommandLineRunner { private final RetryPictureProducer retryPictureProducer; private final List<String> SOURCES = List.of("mobile", "types"); private final List<String> TYPES = List.of("jpg", "png", "svg"); public RabbitmqProducerApplication(RetryPictureProducer retryPictureProducer) { this.retryPictureProducer = retryPictureProducer; } public static void main(String[] args) { SpringApplication.run(RabbitmqProducerApplication.class, args); } @Override public void run(String... args) throws Exception { for (int i = 0; i < 10; i++) { var p = new Picture(); p.setName("Picture " + i); p.setSize(ThreadLocalRandom.current().nextLong(8001, 10001)); p.setSource(SOURCES.get(i % SOURCES.size())); p.setType(TYPES.get(i % TYPES.size())); System.out.println("Sending picture size: " + p.getSize()); retryPictureProducer.sendMessage(p); } } } |
Consumer | |
RabbitmqHeader.java | package uk.co.datadisk.rabbitmq_consumer.rabbitmq; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.StringUtils; public class RabbitmqHeader { private static final String KEYWORD_QUEUE_WAIT = "wait"; private List<RabbitmqHeaderXDeath> xDeaths = new ArrayList<>(2); private String xFirstDeathExchange = StringUtils.EMPTY; private String xFirstDeathQueue = StringUtils.EMPTY; private String xFirstDeathReason = StringUtils.EMPTY; @SuppressWarnings("unchecked") public RabbitmqHeader(Map<String, Object> headers) { if (headers != null) { var xFirstDeathExchange = Optional.ofNullable(headers.get("x-first-death-exchange")); var xFirstDeathQueue = Optional.ofNullable(headers.get("x-first-death-queue")); var xFirstDeathReason = Optional.ofNullable(headers.get("x-first-death-reason")); xFirstDeathExchange.ifPresent(s -> this.setxFirstDeathExchange(s.toString())); xFirstDeathQueue.ifPresent(s -> this.setxFirstDeathQueue(s.toString())); xFirstDeathReason.ifPresent(s -> this.setxFirstDeathReason(s.toString())); var xDeathHeaders = (List<Map<String, Object>>) headers.get("x-death"); if (xDeathHeaders != null) { for (Map<String, Object> x : xDeathHeaders) { RabbitmqHeaderXDeath hdrDeath = new RabbitmqHeaderXDeath(); var reason = Optional.ofNullable(x.get("reason")); var count = Optional.ofNullable(x.get("count")); var exchange = Optional.ofNullable(x.get("exchange")); var queue = Optional.ofNullable(x.get("queue")); var routingKeys = Optional.ofNullable(x.get("routing-keys")); var time = Optional.ofNullable(x.get("time")); reason.ifPresent(s -> hdrDeath.setReason(s.toString())); count.ifPresent(s -> hdrDeath.setCount(Integer.parseInt(s.toString()))); exchange.ifPresent(s -> hdrDeath.setExchange(s.toString())); queue.ifPresent(s -> hdrDeath.setQueue(s.toString())); routingKeys.ifPresent(r -> { var listR = (List<String>) r; hdrDeath.setRoutingKeys(listR); }); 
time.ifPresent(d -> hdrDeath.setTime((Date) d)); xDeaths.add(hdrDeath); } } } } public int getFailedRetryCount() { // get from queue "wait" for (var xDeath : xDeaths) { if (xDeath.getExchange().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT) && xDeath.getQueue().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT)) { return xDeath.getCount(); } } return 0; } public List<RabbitmqHeaderXDeath> getxDeaths() { return xDeaths; } public String getxFirstDeathExchange() { return xFirstDeathExchange; } public String getxFirstDeathQueue() { return xFirstDeathQueue; } public String getxFirstDeathReason() { return xFirstDeathReason; } public void setxDeaths(List<RabbitmqHeaderXDeath> xDeaths) { this.xDeaths = xDeaths; } public void setxFirstDeathExchange(String xFirstDeathExchange) { this.xFirstDeathExchange = xFirstDeathExchange; } public void setxFirstDeathQueue(String xFirstDeathQueue) { this.xFirstDeathQueue = xFirstDeathQueue; } public void setxFirstDeathReason(String xFirstDeathReason) { this.xFirstDeathReason = xFirstDeathReason; } } |
RabbitmqHeaderXDeath.java | package uk.co.datadisk.rabbitmq_consumer.rabbitmq; import java.util.Date; import java.util.List; public class RabbitmqHeaderXDeath { private int count; private String exchange; private String queue; private String reason; private List<String> routingKeys; private Date time; @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } RabbitmqHeaderXDeath other = (RabbitmqHeaderXDeath) obj; if (count != other.count) { return false; } if (exchange == null) { if (other.exchange != null) { return false; } } else if (!exchange.equals(other.exchange)) { return false; } if (queue == null) { if (other.queue != null) { return false; } } else if (!queue.equals(other.queue)) { return false; } if (reason == null) { if (other.reason != null) { return false; } } else if (!reason.equals(other.reason)) { return false; } if (routingKeys == null) { if (other.routingKeys != null) { return false; } } else if (!routingKeys.equals(other.routingKeys)) { return false; } if (time == null) { if (other.time != null) { return false; } } else if (!time.equals(other.time)) { return false; } return true; } public int getCount() { return count; } public String getExchange() { return exchange; } public String getQueue() { return queue; } public String getReason() { return reason; } public List<String> getRoutingKeys() { return routingKeys; } public Date getTime() { return time; } @Override public int hashCode() { final int prime = 19; int result = 1; result = prime * result + count; result = prime * result + ((exchange == null) ? 0 : exchange.hashCode()); result = prime * result + ((queue == null) ? 0 : queue.hashCode()); result = prime * result + ((reason == null) ? 0 : reason.hashCode()); result = prime * result + ((routingKeys == null) ? 0 : routingKeys.hashCode()); result = prime * result + ((time == null) ? 
0 : time.hashCode()); return result; } public void setCount(int count) { this.count = count; } public void setExchange(String exchange) { this.exchange = exchange; } public void setQueue(String queue) { this.queue = queue; } public void setReason(String reason) { this.reason = reason; } public void setRoutingKeys(List<String> routingKeys) { this.routingKeys = routingKeys; } public void setTime(Date time) { this.time = time; } } |
DlxProcessingErrorHandler.java | package uk.co.datadisk.rabbitmq_consumer; import uk.co.datadisk.rabbitmq_consumer.rabbitmq.RabbitmqHeader; import com.rabbitmq.client.Channel; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.lang.NonNull; import java.io.IOException; import java.util.Date; public class DlxProcessingErrorHandler { private static final Logger log = LoggerFactory.getLogger(DlxProcessingErrorHandler.class); @NonNull private String deadExchangeName; private int maxRetryCount = 3; // Constructor. Will retry for n times (default is 3) and on the next retry will // consider message as dead, put it on dead exchange with given dlxRoutingKey is null or empty. public DlxProcessingErrorHandler(String deadExchangeName) throws IllegalArgumentException { super(); if (StringUtils.isAnyEmpty(deadExchangeName)) { throw new IllegalArgumentException("Must define dlx exchange name"); } this.deadExchangeName = deadExchangeName; } // Constructor. Will retry for maxRetryCount times and on the next // retry will consider message as dead, put it on dead exchange with given dlxExchangeName and routingKey public DlxProcessingErrorHandler(String deadExchangeName, int maxRetryCount) { this(deadExchangeName); setMaxRetryCount(maxRetryCount); } public String getDeadExchangeName() { return deadExchangeName; } public int getMaxRetryCount() { return maxRetryCount; } /** * Handle AMQP message consume error. This default implementation will put * message to dead letter exchange for maxRetryCount times, thus * two variables are required when creating this object: * dlxExchangeName and dlxRoutingKey. 
* maxRetryCount is 3 by default, but you can set it using * setMaxRetryCount(int) */ public boolean handleErrorProcessingMessage(Message message, Channel channel) { var rabbitMqHeader = new RabbitmqHeader(message.getMessageProperties().getHeaders()); try { if (rabbitMqHeader.getFailedRetryCount() >= maxRetryCount) { // publish to dead and ack log.warn("[DEAD] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount() + " for message " + message); channel.basicPublish(getDeadExchangeName(), message.getMessageProperties().getReceivedRoutingKey(), null, message.getBody()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { log.debug("[REQUEUE] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount() + " for message " + message); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } return true; } catch (IOException e) { log.warn("[HANDLER-FAILED] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount() + " for message " + message); } return false; } public void setMaxRetryCount(int maxRetryCount) throws IllegalArgumentException { if (maxRetryCount > 1000) { throw new IllegalArgumentException("max retry must between 0-1000"); } this.maxRetryCount = maxRetryCount; } } |
RetryImageConsumer.java | package uk.co.datadisk.rabbitmq_consumer; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import uk.co.datadisk.rabbitmq_consumer.entity.Picture; @Service public class RetryImageConsumer { private static final String DEAD_EXCHANGE_NAME = "x.guideline.dead"; private static final Logger log = LoggerFactory.getLogger(RetryImageConsumer.class); private DlxProcessingErrorHandler dlxProcessingErrorHandler; private ObjectMapper objectMapper; public RetryImageConsumer() { this.objectMapper = new ObjectMapper(); this.dlxProcessingErrorHandler = new DlxProcessingErrorHandler(DEAD_EXCHANGE_NAME); } @RabbitListener(queues = "q.guideline.image.work") public void listen(Message message, Channel channel) throws InterruptedException, JsonParseException, JsonMappingException, IOException { try { var p = objectMapper.readValue(message.getBody(), Picture.class); // process the image if (p.getSize() > 9000) { // throw exception, we will use DLX handler for retry mechanism throw new IOException("Size too large"); } else { log.info("Creating thumbnail & publishing : " + p); // you must acknowledge that message already processed channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (IOException e) { log.warn("Error processing message : " + new String(message.getBody()) + " : " + e.getMessage()); dlxProcessingErrorHandler.handleErrorProcessingMessage(message, channel); } } } |
RetryVectorConsumer.java | package uk.co.datadisk.rabbitmq_consumer; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import uk.co.datadisk.rabbitmq_consumer.entity.Picture; @Service public class RetryVectorConsumer { private static final String DEAD_EXCHANGE_NAME = "x.guideline.dead"; private static final Logger log = LoggerFactory.getLogger(RetryVectorConsumer.class); private DlxProcessingErrorHandler dlxProcessingErrorHandler; private ObjectMapper objectMapper; public RetryVectorConsumer() { this.objectMapper = new ObjectMapper(); this.dlxProcessingErrorHandler = new DlxProcessingErrorHandler(DEAD_EXCHANGE_NAME); } @RabbitListener(queues = "q.guideline.vector.work") public void listen(Message message, Channel channel) throws InterruptedException, JsonParseException, JsonMappingException, IOException { try { var p = objectMapper.readValue(message.getBody(), Picture.class); // process the image if (p.getSize() > 9000) { // throw exception, we will use DLX handler for retry mechanism throw new IOException("Size too large"); } else { log.info("Convert to image, creating thumbnail, & publishing : " + p); // you must acknowledge that message already processed channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (IOException e) { log.warn("Error processing message : " + new String(message.getBody()) + " : " + e.getMessage()); dlxProcessingErrorHandler.handleErrorProcessingMessage(message, channel); } } } |
Pictures with a size greater than 9000 are rejected and placed in the wait queue, where they remain for 30 mins.
In this example we are going to use AMQP to handle the retries with a fanout exchange. The exchanges and queues follow the same pattern as before, so there is nothing here we have not seen already, and some of the classes from the example above are reused.
Producer | |
RetryEmployeeProducer.java | package uk.co.datadisk.rabbitmq_producer; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import uk.co.datadisk.rabbitmq_producer.entity.Employee; @Service public class RetryEmployeeProducer { private RabbitTemplate rabbitTemplate; private ObjectMapper objectMapper = new ObjectMapper(); public RetryEmployeeProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(Employee emp) throws JsonProcessingException { var json = objectMapper.writeValueAsString(emp); rabbitTemplate.convertAndSend("x.guideline2.work", "", json); } } |
RabbitmqProducerApplication.java | package uk.co.datadisk.rabbitmq_producer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import uk.co.datadisk.rabbitmq_producer.entity.Employee; import java.time.LocalDate; @SpringBootApplication public class RabbitmqProducerApplication implements CommandLineRunner { private final RetryEmployeeProducer retryEmployeeProducer; public RabbitmqProducerApplication(RetryEmployeeProducer retryEmployeeProducer) { this.retryEmployeeProducer = retryEmployeeProducer; } public static void main(String[] args) { SpringApplication.run(RabbitmqProducerApplication.class, args); } @Override public void run(String... args) throws Exception { for (int i = 0; i < 10; i++) { Employee emp = new Employee("Employee" + i, null, LocalDate.now()); retryEmployeeProducer.sendMessage(emp); } } } |
Consumer | |
DlxFanoutProcessingErrorHandler.java | package uk.co.datadisk.rabbitmq_consumer; import java.io.IOException; import java.util.Date; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.lang.NonNull; import uk.co.datadisk.rabbitmq_consumer.rabbitmq.RabbitmqHeader; import com.rabbitmq.client.Channel; /** * * Generic class to handle RabbitMQ proccessing error that might occur on * try-catch. This will not handle invalid message conversion * though (for example if you has Employee JSON structure to process, but got * Animal JSON structure instead from Rabbit MQ queue). * */ public class DlxFanoutProcessingErrorHandler { private static final Logger log = LoggerFactory.getLogger(DlxFanoutProcessingErrorHandler.class); @NonNull private String deadExchangeName; @NonNull private String routingKey; private int maxRetryCount = 3; public DlxFanoutProcessingErrorHandler(String deadExchangeName, String routingKey) throws IllegalArgumentException { super(); if (StringUtils.isAnyEmpty(deadExchangeName, routingKey)) { throw new IllegalArgumentException("Must define dlx exchange name and routing key"); } this.deadExchangeName = deadExchangeName; this.routingKey = routingKey; } public DlxFanoutProcessingErrorHandler(String deadExchangeName, String routingKey, int maxRetryCount) { this(deadExchangeName, routingKey); setMaxRetryCount(maxRetryCount); } public String getDeadExchangeName() { return deadExchangeName; } public int getMaxRetryCount() { return maxRetryCount; } public String getRoutingKey() { return routingKey; } /** * Handle AMQP message consume error. This default implementation will put * message to dead letter exchange for maxRetryCount times, thus * two variables are required when creating this object: * dlxExchangeName and dlxRoutingKey. 
* maxRetryCount is 3 by default, but you can set it using * setMaxRetryCount(int) */ public boolean handleErrorProcessingMessage(Message message, Channel channel) { var rabbitMqHeader = new RabbitmqHeader(message.getMessageProperties().getHeaders()); try { if (rabbitMqHeader.getFailedRetryCount() >= maxRetryCount) { // publish to dead and ack log.warn("[DEAD] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount() + " for message " + message); channel.basicPublish(getDeadExchangeName(), getRoutingKey(), null, message.getBody()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { log.debug("[REQUEUE] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount() + " for message " + message); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } return true; } catch (IOException e) { log.warn("[HANDLER-FAILED] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount() + " for message " + message); } return false; } public void setMaxRetryCount(int maxRetryCount) throws IllegalArgumentException { if (maxRetryCount > 1000) { throw new IllegalArgumentException("max retry must between 0-1000"); } this.maxRetryCount = maxRetryCount; } } |
RetryAccountingConsumer.java | package uk.co.datadisk.rabbitmq_consumer; import java.io.IOException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import uk.co.datadisk.rabbitmq_consumer.entity.Employee; @Service public class RetryAccountingConsumer { private static final String DEAD_EXCHANGE_NAME = "x.guideline2.dead"; private static final String ROUTING_KEY = "accounting"; private static final Logger log = LoggerFactory.getLogger(RetryAccountingConsumer.class); private DlxFanoutProcessingErrorHandler dlxFanoutProcessingErrorHandler; private ObjectMapper objectMapper; public RetryAccountingConsumer() { this.objectMapper = new ObjectMapper(); this.dlxFanoutProcessingErrorHandler = new DlxFanoutProcessingErrorHandler(DEAD_EXCHANGE_NAME, ROUTING_KEY); } @RabbitListener(queues = "q.guideline2.accounting.work") public void listen(Message message, Channel channel) throws InterruptedException, JsonParseException, JsonMappingException, IOException { try { var emp = objectMapper.readValue(message.getBody(), Employee.class); if (StringUtils.isEmpty(emp.getName())) { throw new IllegalArgumentException("Name is empty"); } else { log.info("On accounting : {}", emp); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (Exception e) { log.warn("Error processing message : {} : {}", new String(message.getBody()), e.getMessage()); dlxFanoutProcessingErrorHandler.handleErrorProcessingMessage(message, channel); } } } |
RetryMarketingConsumer.java | package uk.co.datadisk.rabbitmq_consumer; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import uk.co.datadisk.rabbitmq_consumer.entity.Employee; @Service public class RetryMarketingConsumer { private static final Logger log = LoggerFactory.getLogger(RetryMarketingConsumer.class); private ObjectMapper objectMapper; public RetryMarketingConsumer() { this.objectMapper = new ObjectMapper(); } @RabbitListener(queues = "q.guideline2.marketing.work") public void listen(Message message, Channel channel) throws InterruptedException, JsonParseException, JsonMappingException, IOException { var e = objectMapper.readValue(message.getBody(), Employee.class); log.info("On marketing : " + e); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } |
In this example we will take a look at the Spring Boot retry mechanism, which is configured in the application.properties (or application.yml) file. One point to note is that all messages queued behind a message that is being retried are held up until that message has been processed or its retries are exhausted (at which point it goes to the DLX).
I have set up the exchanges and queues for a direct exchange example.
All you need to do in Spring Boot is add the retry settings to the properties (or YAML) file and that's it; just code the producers and consumers as normal, throwing an IOException when processing fails.
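The exact settings are not reproduced in this section. As a hedged illustration, a retry configuration for the default (simple) listener container could look like the following; the property names are Spring Boot's, but every value is a made-up example.

```properties
# Illustrative values only
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=3s
spring.rabbitmq.listener.simple.retry.max-interval=10s
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.multiplier=2
```

Once the attempts are used up the message is rejected without requeueing, so it ends up on the DLX if the queue has one configured.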
SpringConsumer.java | package uk.co.datadisk.rabbitmq_consumer; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import uk.co.datadisk.rabbitmq_consumer.entity.Picture; import java.io.IOException; @Service public class SpringConsumer { ObjectMapper objectMapper = new ObjectMapper(); private static final Logger log = LoggerFactory.getLogger(SpringConsumer.class); @RabbitListener(queues = "dd.spring.image.work") public void listenImage(String message) throws IOException { var picture = objectMapper.readValue(message, Picture.class); log.info("Consuming image : {} size: {}", picture.getName(), picture.getSize()); if (picture.getSize() > 9000) { throw new IOException("Image " + picture.getName() + " size too large : " + picture.getSize()); } log.info("Creating thumbnail & publishing image : {}", picture.getName()); } @RabbitListener(queues = "dd.spring.vector.work") public void listenVector(String message) throws IOException { var picture = objectMapper.readValue(message, Picture.class); log.info("Consuming vector : {}", picture.getName()); log.info("Converting to image, creating thumbnail & publishing image : {}", picture.getName()); } } |
In the console output you can see that consuming the image is retried several times and that the message eventually dies, all based on the properties we specified in the properties file.
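The spacing of those retries follows from the interval, multiplier and maximum settings. The sketch below computes the waits for a set of hypothetical values (3 s initial interval, multiplier 2, 10 s maximum, 5 attempts), assuming an exponential back-off that is capped at the maximum interval; it is a plain-Java approximation, not Spring's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

class RetryBackoffSchedule {

    /**
     * Returns the waits in milliseconds between attempts. There is one fewer wait
     * than attempts, and each wait is capped at maxIntervalMs.
     */
    static List<Long> waitsBetweenAttempts(long initialIntervalMs, double multiplier,
                                           long maxIntervalMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double interval = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) interval, maxIntervalMs));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Hypothetical settings: 3 s initial interval, multiplier 2, 10 s cap, 5 attempts.
        System.out.println(waitsBetweenAttempts(3000, 2.0, 10000, 5));
    }
}
```

Because other messages wait behind the one being retried, the sum of these waits is roughly how long the queue is held up by a single failing message.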