Error Handling

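Messages can sit in a queue forever if they are never consumed. To deal with this you can use a Dead Letter Exchange (DLX), which receives a message when any of the following occurs: the message is rejected without being requeued, the message's TTL expires, or the queue exceeds its length limit.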

A DLX is a normal exchange, so another consumer can process the dead-lettered messages from a queue bound to it. Below is an example that does not use a DLX:

MyPictureImageConsumer.java (with exception)
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

@Service
public class MyPictureImageConsumer {

    private ObjectMapper objectMapper = new ObjectMapper();

    private final Logger log = LoggerFactory.getLogger(MyPictureImageConsumer.class);

    @RabbitListener(queues = "dd.picture.image")
    public void listen(String message) throws IOException {
        var p = objectMapper.readValue(message, Picture.class);

        if (p.getSize() > 9000) {
            throw new IllegalArgumentException("Picture size is too large: " + p);
        }
        log.info("Image: {}", p);
    }
}

Coded this way, the consumer ends up in an infinite loop: the listener throws an exception, the message remains on the queue and is delivered again, the listener throws again, and so on.
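To make the loop concrete, here is a toy in-memory model of it. It is only a sketch and does not talk to RabbitMQ: the class `RequeueLoopDemo`, the method `deliver` and the safety cap are all made up for illustration, and the only thing taken from the listener above is the size check.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy in-memory model (not RabbitMQ itself) of a listener that keeps failing
// on the same message while the broker keeps requeueing it.
final class RequeueLoopDemo {

    // Delivers messages until the queue is empty or the safety cap is reached,
    // and returns how many deliveries took place.
    static int deliver(Deque<Long> queue, boolean requeueOnFailure, int maxDeliveries) {
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            long size = queue.pollFirst();
            deliveries++;
            boolean failed = size > 9000;   // same check as the listener above
            if (failed && requeueOnFailure) {
                queue.addFirst(size);       // the message goes straight back on the queue
            }
            // otherwise the message leaves the queue (acknowledged or rejected for good)
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Deque<Long> requeued = new ArrayDeque<>();
        requeued.add(9500L);
        // The cap of 1000 exists only so that this demo terminates.
        System.out.println("with requeue:    " + deliver(requeued, true, 1000));

        Deque<Long> rejected = new ArrayDeque<>();
        rejected.add(9500L);
        System.out.println("without requeue: " + deliver(rejected, false, 1000));
    }
}
```

With requeueing the delivery count is limited only by the artificial cap; without it the oversized message is delivered once and then leaves the queue.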


Now we create a new DLX (a normal exchange) and a new queue that points to it. If you look at the queue you can see that it has an extra feature called DLX.
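The DLX feature on a queue is just an optional queue argument. As a rough sketch, the map below holds the key/value pair you would enter in the management UI or pass as the arguments map when declaring the queue from code. The class name `DlxQueueArguments` and the exchange name `dd.picture.dlx` are hypothetical; only the argument key `x-dead-letter-exchange` comes from RabbitMQ.

```java
import java.util.HashMap;
import java.util.Map;

final class DlxQueueArguments {

    // Builds the optional queue arguments that tell RabbitMQ which exchange
    // should receive messages dead-lettered from this queue.
    static Map<String, Object> deadLetterArgs(String deadLetterExchange) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        return args;
    }

    public static void main(String[] args) {
        // "dd.picture.dlx" is a made-up exchange name used for illustration only.
        System.out.println(deadLetterArgs("dd.picture.dlx"));
    }
}
```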

MyPictureImageConsumer.java (using AMQP for exception - AUTOMATIC)
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

@Service
public class MyPictureImageConsumer {

    private ObjectMapper objectMapper = new ObjectMapper();

    private final Logger log = LoggerFactory.getLogger(MyPictureImageConsumer.class);

    @RabbitListener(queues = "dd.picture.image")
    public void listen(String message) throws IOException {
        var p = objectMapper.readValue(message, Picture.class);

        if (p.getSize() > 9000) {
            //throw new IllegalArgumentException("Picture size is too large: " + p);

            throw new AmqpRejectAndDontRequeueException("Picture size is too large: " + p);
        }
        log.info("Image: {}", p);
    }
}

This time RabbitMQ automatically passes the message to the DLX, which avoids the infinite loop we had above. The message remains in the queue bound to the DLX until we consume it, but at least it does not break our application.


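A second way is to use the RabbitMQ channel to reject the message. For this we need to set two properties in the consumer's Spring configuration so that acknowledgements are handled manually (in Spring Boot these are typically spring.rabbitmq.listener.simple.acknowledge-mode and spring.rabbitmq.listener.direct.acknowledge-mode, both set to manual)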


MyPictureImageConsumer.java (using RabbitMQ Channel for exception - MANUAL)
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;
import com.rabbitmq.client.Channel;

import java.io.IOException;

@Service
public class MyPictureImageConsumer {

    private ObjectMapper objectMapper = new ObjectMapper();

    private final Logger log = LoggerFactory.getLogger(MyPictureImageConsumer.class);

    @RabbitListener(queues = "dd.picture.image")
    public void listen(Message message, Channel channel) throws IOException {
        var p = objectMapper.readValue(message.getBody(), Picture.class);

        if (p.getSize() > 9000) {
            // throw new IllegalArgumentException("Picture size is too large: " + p);
            // throw new AmqpRejectAndDontRequeueException("Picture size is too large: " + p);

            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            // stop here: a rejected delivery must not be acknowledged as well
            return;
        }
        log.info("Image: {}", p);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

You can also add a Time To Live (TTL) to a queue. In this example the TTL is 5 seconds, so a message is moved to the DLX if it has not been consumed within that time.
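Like the DLX, the TTL is set through an optional queue argument, and its value is in milliseconds. The sketch below builds both arguments together; the class name `TtlQueueArguments` and the exchange name `dd.picture.dlx` are hypothetical, while `x-message-ttl` and `x-dead-letter-exchange` are the RabbitMQ argument keys.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

final class TtlQueueArguments {

    // Builds queue arguments for a per-queue message TTL combined with a DLX,
    // so that expired messages are dead-lettered instead of silently dropped.
    static Map<String, Object> ttlWithDeadLetter(Duration ttl, String deadLetterExchange) {
        Map<String, Object> args = new HashMap<>();
        // RabbitMQ expects the TTL as a number of milliseconds.
        args.put("x-message-ttl", Math.toIntExact(ttl.toMillis()));
        args.put("x-dead-letter-exchange", deadLetterExchange);
        return args;
    }

    public static void main(String[] args) {
        // 5 seconds matches the text above; the exchange name is made up.
        System.out.println(ttlWithDeadLetter(Duration.ofSeconds(5), "dd.picture.dlx"));
    }
}
```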


Retry Mechanism using Message Headers

If we get an error we can retry the message after a brief delay, a specific number of times, before sending it to the DLX. For this example we create a number of exchanges and queues.

The bindings are pretty straightforward; below are the bindings for the q.guideline.image.wait queue


The wait queue uses the TTL feature to put a message back on the work queue after a specific period


In this example we examine the message header properties to see whether the message has been rejected before. The x-death header entries (with fields such as count), together with the x-first-death-exchange, x-first-death-queue and x-first-death-reason headers, can be used to determine where the message originally came from, how many times it has been rejected, and so on.
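It may help to see the shape of these headers before reading the classes that parse them. The sketch below builds a sample header map by hand and reads the count for one queue. The values are illustrative rather than captured from a broker, the exchange name `x.guideline.wait` is an assumption, and `XDeathHeaderDemo` and `deathCount` are hypothetical names.

```java
import java.util.Date;
import java.util.List;
import java.util.Map;

final class XDeathHeaderDemo {

    // Returns the "count" of the x-death entry recorded for the given queue,
    // or 0 if the message has never been dead-lettered from that queue.
    @SuppressWarnings("unchecked")
    static long deathCount(Map<String, Object> headers, String queue) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return 0;
        }
        for (Map<String, Object> entry : (List<Map<String, Object>>) raw) {
            if (queue.equals(entry.get("queue")) && entry.get("count") instanceof Number) {
                return ((Number) entry.get("count")).longValue();
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Hand-built sample: x-death is a list with one map per queue and reason.
        Map<String, Object> waitDeath = Map.of(
                "count", 2L,
                "reason", "expired",
                "queue", "q.guideline.image.wait",
                "exchange", "x.guideline.wait",      // assumed exchange name
                "routing-keys", List.of("jpg"),
                "time", new Date());
        Map<String, Object> headers = Map.of(
                "x-death", List.of(waitDeath),
                "x-first-death-exchange", "x.guideline.work",
                "x-first-death-queue", "q.guideline.image.work",
                "x-first-death-reason", "rejected");

        System.out.println("first death: " + headers.get("x-first-death-reason"));
        System.out.println("wait count:  " + deathCount(headers, "q.guideline.image.wait"));
    }
}
```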


Producer
RetryPictureProducer.java
package uk.co.datadisk.rabbitmq_producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

@Service
public class RetryPictureProducer {

    private RabbitTemplate rabbitTemplate;
    private ObjectMapper objectMapper = new ObjectMapper();

    public RetryPictureProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(Picture p) throws JsonProcessingException {
        var json = objectMapper.writeValueAsString(p);

        rabbitTemplate.convertAndSend("x.guideline.work", p.getType(), json);
    }
}
RabbitmqProducerApplication.java
package uk.co.datadisk.rabbitmq_producer;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Picture;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {

    private final RetryPictureProducer retryPictureProducer;

    private final List<String> SOURCES = List.of("mobile", "types");
    private final List<String> TYPES = List.of("jpg", "png", "svg");

    public RabbitmqProducerApplication(RetryPictureProducer retryPictureProducer) {
        this.retryPictureProducer = retryPictureProducer;
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        for (int i = 0; i < 10; i++) {
            var p = new Picture();
            p.setName("Picture " + i);
            p.setSize(ThreadLocalRandom.current().nextLong(8001, 10001));
            p.setSource(SOURCES.get(i % SOURCES.size()));
            p.setType(TYPES.get(i % TYPES.size()));

            System.out.println("Sending picture size: " + p.getSize());
            retryPictureProducer.sendMessage(p);
        }
    }
}
Consumer
RabbitmqHeader.java
package uk.co.datadisk.rabbitmq_consumer.rabbitmq;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;

public class RabbitmqHeader {

	private static final String KEYWORD_QUEUE_WAIT = "wait";
	private List<RabbitmqHeaderXDeath> xDeaths = new ArrayList<>(2);
	private String xFirstDeathExchange = StringUtils.EMPTY;
	private String xFirstDeathQueue = StringUtils.EMPTY;
	private String xFirstDeathReason = StringUtils.EMPTY;

	@SuppressWarnings("unchecked")
	public RabbitmqHeader(Map<String, Object> headers) {
		if (headers != null) {
			var xFirstDeathExchange = Optional.ofNullable(headers.get("x-first-death-exchange"));
			var xFirstDeathQueue = Optional.ofNullable(headers.get("x-first-death-queue"));
			var xFirstDeathReason = Optional.ofNullable(headers.get("x-first-death-reason"));

			xFirstDeathExchange.ifPresent(s -> this.setxFirstDeathExchange(s.toString()));
			xFirstDeathQueue.ifPresent(s -> this.setxFirstDeathQueue(s.toString()));
			xFirstDeathReason.ifPresent(s -> this.setxFirstDeathReason(s.toString()));

			var xDeathHeaders = (List<Map<String, Object>>) headers.get("x-death");

			if (xDeathHeaders != null) {
				for (Map<String, Object> x : xDeathHeaders) {
					RabbitmqHeaderXDeath hdrDeath = new RabbitmqHeaderXDeath();
					var reason = Optional.ofNullable(x.get("reason"));
					var count = Optional.ofNullable(x.get("count"));
					var exchange = Optional.ofNullable(x.get("exchange"));
					var queue = Optional.ofNullable(x.get("queue"));
					var routingKeys = Optional.ofNullable(x.get("routing-keys"));
					var time = Optional.ofNullable(x.get("time"));

					reason.ifPresent(s -> hdrDeath.setReason(s.toString()));
					count.ifPresent(s -> hdrDeath.setCount(Integer.parseInt(s.toString())));
					exchange.ifPresent(s -> hdrDeath.setExchange(s.toString()));
					queue.ifPresent(s -> hdrDeath.setQueue(s.toString()));
					routingKeys.ifPresent(r -> {
						var listR = (List<String>) r;
						hdrDeath.setRoutingKeys(listR);
					});
					time.ifPresent(d -> hdrDeath.setTime((Date) d));

					xDeaths.add(hdrDeath);
				}
			}
		}
	}

	public int getFailedRetryCount() {
		// get from queue "wait"
		for (var xDeath : xDeaths) {
			if (xDeath.getExchange().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT)
					&& xDeath.getQueue().toLowerCase().endsWith(KEYWORD_QUEUE_WAIT)) {
				return xDeath.getCount();
			}
		}

		return 0;
	}

	public List<RabbitmqHeaderXDeath> getxDeaths() {
		return xDeaths;
	}

	public String getxFirstDeathExchange() {
		return xFirstDeathExchange;
	}

	public String getxFirstDeathQueue() {
		return xFirstDeathQueue;
	}

	public String getxFirstDeathReason() {
		return xFirstDeathReason;
	}

	public void setxDeaths(List<RabbitmqHeaderXDeath> xDeaths) {
		this.xDeaths = xDeaths;
	}

	public void setxFirstDeathExchange(String xFirstDeathExchange) {
		this.xFirstDeathExchange = xFirstDeathExchange;
	}

	public void setxFirstDeathQueue(String xFirstDeathQueue) {
		this.xFirstDeathQueue = xFirstDeathQueue;
	}

	public void setxFirstDeathReason(String xFirstDeathReason) {
		this.xFirstDeathReason = xFirstDeathReason;
	}

}
RabbitmqHeaderXDeath.java
package uk.co.datadisk.rabbitmq_consumer.rabbitmq;

import java.util.Date;
import java.util.List;

public class RabbitmqHeaderXDeath {

	private int count;
	private String exchange;
	private String queue;
	private String reason;
	private List<String> routingKeys;
	private Date time;

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null) {
			return false;
		}
		if (getClass() != obj.getClass()) {
			return false;
		}
		RabbitmqHeaderXDeath other = (RabbitmqHeaderXDeath) obj;
		if (count != other.count) {
			return false;
		}
		if (exchange == null) {
			if (other.exchange != null) {
				return false;
			}
		} else if (!exchange.equals(other.exchange)) {
			return false;
		}
		if (queue == null) {
			if (other.queue != null) {
				return false;
			}
		} else if (!queue.equals(other.queue)) {
			return false;
		}
		if (reason == null) {
			if (other.reason != null) {
				return false;
			}
		} else if (!reason.equals(other.reason)) {
			return false;
		}
		if (routingKeys == null) {
			if (other.routingKeys != null) {
				return false;
			}
		} else if (!routingKeys.equals(other.routingKeys)) {
			return false;
		}
		if (time == null) {
			if (other.time != null) {
				return false;
			}
		} else if (!time.equals(other.time)) {
			return false;
		}
		return true;
	}

	public int getCount() {
		return count;
	}

	public String getExchange() {
		return exchange;
	}

	public String getQueue() {
		return queue;
	}

	public String getReason() {
		return reason;
	}

	public List<String> getRoutingKeys() {
		return routingKeys;
	}

	public Date getTime() {
		return time;
	}

	@Override
	public int hashCode() {
		final int prime = 19;
		int result = 1;
		result = prime * result + count;
		result = prime * result + ((exchange == null) ? 0 : exchange.hashCode());
		result = prime * result + ((queue == null) ? 0 : queue.hashCode());
		result = prime * result + ((reason == null) ? 0 : reason.hashCode());
		result = prime * result + ((routingKeys == null) ? 0 : routingKeys.hashCode());
		result = prime * result + ((time == null) ? 0 : time.hashCode());
		return result;
	}

	public void setCount(int count) {
		this.count = count;
	}

	public void setExchange(String exchange) {
		this.exchange = exchange;
	}

	public void setQueue(String queue) {
		this.queue = queue;
	}

	public void setReason(String reason) {
		this.reason = reason;
	}

	public void setRoutingKeys(List<String> routingKeys) {
		this.routingKeys = routingKeys;
	}

	public void setTime(Date time) {
		this.time = time;
	}

}
DlxProcessingErrorHandler.java
package uk.co.datadisk.rabbitmq_consumer;

import uk.co.datadisk.rabbitmq_consumer.rabbitmq.RabbitmqHeader;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.lang.NonNull;

import java.io.IOException;
import java.util.Date;

public class DlxProcessingErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(DlxProcessingErrorHandler.class);

    @NonNull
    private String deadExchangeName;

    private int maxRetryCount = 3;

    // Constructor. Will retry n times (default is 3) and on the next retry will
    // consider the message dead and publish it to the given dead exchange.
    // deadExchangeName must not be null or empty.
    public DlxProcessingErrorHandler(String deadExchangeName) throws IllegalArgumentException {
        super();

        if (StringUtils.isAnyEmpty(deadExchangeName)) {
            throw new IllegalArgumentException("Must define dlx exchange name");
        }
        this.deadExchangeName = deadExchangeName;
    }


    // Constructor. Will retry maxRetryCount times and on the next retry will
    // consider the message dead and publish it to the dead exchange deadExchangeName.
    public DlxProcessingErrorHandler(String deadExchangeName, int maxRetryCount) {
        this(deadExchangeName);
        setMaxRetryCount(maxRetryCount);
    }

    public String getDeadExchangeName() {
        return deadExchangeName;
    }

    public int getMaxRetryCount() {
        return maxRetryCount;
    }

    /**
     * Handle an AMQP message consume error. This default implementation rejects
     * the message (so it can be retried) up to maxRetryCount times, then
     * publishes it to the dead exchange using the received routing key.
     * Only deadExchangeName is required when creating this object.
     * maxRetryCount is 3 by default, but you can set it using
     * setMaxRetryCount(int)
     */
    public boolean handleErrorProcessingMessage(Message message, Channel channel) {
        var rabbitMqHeader = new RabbitmqHeader(message.getMessageProperties().getHeaders());

        try {
            if (rabbitMqHeader.getFailedRetryCount() >= maxRetryCount) {
                // publish to dead and ack
                log.warn("[DEAD] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
                        + " for message " + message);

                channel.basicPublish(getDeadExchangeName(), message.getMessageProperties().getReceivedRoutingKey(),
                        null, message.getBody());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                log.debug("[REQUEUE] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
                        + " for message " + message);

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            }
            return true;
        } catch (IOException e) {
            log.warn("[HANDLER-FAILED] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
                    + " for message " + message);
        }
        return false;
    }

    public void setMaxRetryCount(int maxRetryCount) throws IllegalArgumentException {
        if (maxRetryCount < 0 || maxRetryCount > 1000) {
            throw new IllegalArgumentException("max retry must be between 0-1000");
        }
        this.maxRetryCount = maxRetryCount;
    }
}
RetryImageConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

@Service
public class RetryImageConsumer {

	private static final String DEAD_EXCHANGE_NAME = "x.guideline.dead";

	private static final Logger log = LoggerFactory.getLogger(RetryImageConsumer.class);
	private DlxProcessingErrorHandler dlxProcessingErrorHandler;

	private ObjectMapper objectMapper;

	public RetryImageConsumer() {
		this.objectMapper = new ObjectMapper();
		this.dlxProcessingErrorHandler = new DlxProcessingErrorHandler(DEAD_EXCHANGE_NAME);
	}

	@RabbitListener(queues = "q.guideline.image.work")
	public void listen(Message message, Channel channel)
			throws InterruptedException, JsonParseException, JsonMappingException, IOException {
		try {
			var p = objectMapper.readValue(message.getBody(), Picture.class);
			// process the image
			if (p.getSize() > 9000) {
				// throw exception, we will use DLX handler for retry mechanism
				throw new IOException("Size too large");
			} else {
				log.info("Creating thumbnail & publishing : " + p);
				// you must acknowledge that message already processed
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			}
		} catch (IOException e) {
			log.warn("Error processing message : " + new String(message.getBody()) + " : " + e.getMessage());
			dlxProcessingErrorHandler.handleErrorProcessingMessage(message, channel);
		}
	}
}
RetryVectorConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

@Service
public class RetryVectorConsumer {

	private static final String DEAD_EXCHANGE_NAME = "x.guideline.dead";

	private static final Logger log = LoggerFactory.getLogger(RetryVectorConsumer.class);
	private DlxProcessingErrorHandler dlxProcessingErrorHandler;

	private ObjectMapper objectMapper;

	public RetryVectorConsumer() {
		this.objectMapper = new ObjectMapper();
		this.dlxProcessingErrorHandler = new DlxProcessingErrorHandler(DEAD_EXCHANGE_NAME);
	}

	@RabbitListener(queues = "q.guideline.vector.work")
	public void listen(Message message, Channel channel)
			throws InterruptedException, JsonParseException, JsonMappingException, IOException {
		try {
			var p = objectMapper.readValue(message.getBody(), Picture.class);
			// process the image
			if (p.getSize() > 9000) {
				// throw exception, we will use DLX handler for retry mechanism
				throw new IOException("Size too large");
			} else {
				log.info("Convert to image, creating thumbnail, & publishing : " + p);
				// you must acknowledge that message already processed
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			}
		} catch (IOException e) {
			log.warn("Error processing message : " + new String(message.getBody()) + " : " + e.getMessage());
			dlxProcessingErrorHandler.handleErrorProcessingMessage(message, channel);
		}
	}
}

Images with a size greater than 9000 are placed in the wait queue, where they remain for 30 mins before being retried
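The retry decision itself is a single comparison. The sketch below isolates it from the handler above and counts how often a message that always fails is delivered. `RetryDecisionDemo`, `Action` and `totalDeliveries` are hypothetical names used only for this illustration.

```java
final class RetryDecisionDemo {

    enum Action { REJECT_TO_WAIT, PUBLISH_TO_DEAD }

    // Mirrors the branch in handleErrorProcessingMessage: once the number of
    // earlier failed attempts reaches the limit, the message is given up on.
    static Action decide(int failedRetryCount, int maxRetryCount) {
        return failedRetryCount >= maxRetryCount ? Action.PUBLISH_TO_DEAD : Action.REJECT_TO_WAIT;
    }

    // Counts the deliveries of a message that fails every single time.
    static int totalDeliveries(int maxRetryCount) {
        int failedRetryCount = 0;
        int deliveries = 1;                 // the first delivery
        while (decide(failedRetryCount, maxRetryCount) == Action.REJECT_TO_WAIT) {
            failedRetryCount++;             // one more pass through the wait queue
            deliveries++;                   // the message comes back to the work queue
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("deliveries with maxRetryCount=3: " + totalDeliveries(3));
    }
}
```

With the default limit of 3 the message is delivered four times in total: the original attempt plus three retries, after which it is published to the dead exchange.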


Retry Mechanism using AMQP

In this example we are going to use AMQP to handle the retries. We will use a fanout exchange and create the queues as per below; this is nothing we have not seen before, and some of the classes from the above example are reused

Producer
RetryEmployeeProducer.java
package uk.co.datadisk.rabbitmq_producer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_producer.entity.Employee;

@Service
public class RetryEmployeeProducer {

    private RabbitTemplate rabbitTemplate;
    private ObjectMapper objectMapper = new ObjectMapper();

    public RetryEmployeeProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(Employee emp) throws JsonProcessingException {
        var json = objectMapper.writeValueAsString(emp);
        rabbitTemplate.convertAndSend("x.guideline2.work", "", json);
    }
}
RabbitmqProducerApplication.java
package uk.co.datadisk.rabbitmq_producer;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import uk.co.datadisk.rabbitmq_producer.entity.Employee;

import java.time.LocalDate;

@SpringBootApplication
public class RabbitmqProducerApplication implements CommandLineRunner {

    private final RetryEmployeeProducer retryEmployeeProducer;

    public RabbitmqProducerApplication(RetryEmployeeProducer retryEmployeeProducer) {
        this.retryEmployeeProducer = retryEmployeeProducer;
    }

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        for (int i = 0; i < 10; i++) {
            Employee emp = new Employee("Employee" + i, null, LocalDate.now());
            retryEmployeeProducer.sendMessage(emp);
        }
    }
}
Consumer
DlxFanoutProcessingErrorHandler.java
package uk.co.datadisk.rabbitmq_consumer;

import java.io.IOException;
import java.util.Date;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.lang.NonNull;

import uk.co.datadisk.rabbitmq_consumer.rabbitmq.RabbitmqHeader;
import com.rabbitmq.client.Channel;

/**
 *
 * Generic class to handle RabbitMQ processing errors that might occur in a
 * try-catch. This will not handle invalid message conversion
 * though (for example if you have an Employee JSON structure to process, but got
 * an Animal JSON structure instead from the RabbitMQ queue).
 *
 */
public class DlxFanoutProcessingErrorHandler {

	private static final Logger log = LoggerFactory.getLogger(DlxFanoutProcessingErrorHandler.class);
	
	@NonNull
	private String deadExchangeName;

	@NonNull
	private String routingKey;

	private int maxRetryCount = 3;
	
	public DlxFanoutProcessingErrorHandler(String deadExchangeName, String routingKey) throws IllegalArgumentException {
		super();

		if (StringUtils.isAnyEmpty(deadExchangeName, routingKey)) {
			throw new IllegalArgumentException("Must define dlx exchange name and routing key");
		}

		this.deadExchangeName = deadExchangeName;
		this.routingKey = routingKey;
	}

	public DlxFanoutProcessingErrorHandler(String deadExchangeName, String routingKey, int maxRetryCount) {
		this(deadExchangeName, routingKey);
		setMaxRetryCount(maxRetryCount);
	}

	public String getDeadExchangeName() {
		return deadExchangeName;
	}

	public int getMaxRetryCount() {
		return maxRetryCount;
	}

	public String getRoutingKey() {
		return routingKey;
	}

	/**
	 * Handle AMQP message consume error. This default implementation will put
	 * message to dead letter exchange for maxRetryCount times, thus
	 * two variables are required when creating this object:
	 * dlxExchangeName and dlxRoutingKey.
	 * maxRetryCount is 3 by default, but you can set it using
	 * setMaxRetryCount(int)
	 */
	public boolean handleErrorProcessingMessage(Message message, Channel channel) {
		var rabbitMqHeader = new RabbitmqHeader(message.getMessageProperties().getHeaders());

		try {
			if (rabbitMqHeader.getFailedRetryCount() >= maxRetryCount) {
				// publish to dead and ack
				log.warn("[DEAD] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
						+ " for message " + message);

				channel.basicPublish(getDeadExchangeName(), getRoutingKey(), null, message.getBody());
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			} else {
				log.debug("[REQUEUE] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
						+ " for message " + message);

				channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
			}
			return true;
		} catch (IOException e) {
			log.warn("[HANDLER-FAILED] Error at " + new Date() + " on retry " + rabbitMqHeader.getFailedRetryCount()
					+ " for message " + message);
		}

		return false;
	}

	public void setMaxRetryCount(int maxRetryCount) throws IllegalArgumentException {
		if (maxRetryCount < 0 || maxRetryCount > 1000) {
			throw new IllegalArgumentException("max retry must be between 0-1000");
		}

		this.maxRetryCount = maxRetryCount;
	}
}
RetryAccountingConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import uk.co.datadisk.rabbitmq_consumer.entity.Employee;

@Service
public class RetryAccountingConsumer {

	private static final String DEAD_EXCHANGE_NAME = "x.guideline2.dead";
	private static final String ROUTING_KEY = "accounting";

	private static final Logger log = LoggerFactory.getLogger(RetryAccountingConsumer.class);
	private DlxFanoutProcessingErrorHandler dlxFanoutProcessingErrorHandler;

	private ObjectMapper objectMapper;

	public RetryAccountingConsumer() {
		this.objectMapper = new ObjectMapper();
		this.dlxFanoutProcessingErrorHandler = new DlxFanoutProcessingErrorHandler(DEAD_EXCHANGE_NAME, ROUTING_KEY);
	}

	@RabbitListener(queues = "q.guideline2.accounting.work")
	public void listen(Message message, Channel channel)
			throws InterruptedException, JsonParseException, JsonMappingException, IOException {
		try {
			var emp = objectMapper.readValue(message.getBody(), Employee.class);

			if (StringUtils.isEmpty(emp.getName())) {
				throw new IllegalArgumentException("Name is empty");
			} else {
				log.info("On accounting : {}", emp);
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			}
		} catch (Exception e) {
			log.warn("Error processing message : {} : {}", new String(message.getBody()), e.getMessage());
			dlxFanoutProcessingErrorHandler.handleErrorProcessingMessage(message, channel);
		}
	}
}
RetryMarketingConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import uk.co.datadisk.rabbitmq_consumer.entity.Employee;

@Service
public class RetryMarketingConsumer {

	private static final Logger log = LoggerFactory.getLogger(RetryMarketingConsumer.class);
	private ObjectMapper objectMapper;

	public RetryMarketingConsumer() {
		this.objectMapper = new ObjectMapper();
	}

	@RabbitListener(queues = "q.guideline2.marketing.work")
	public void listen(Message message, Channel channel)
			throws InterruptedException, JsonParseException, JsonMappingException, IOException {
		var e = objectMapper.readValue(message.getBody(), Employee.class);
		log.info("On marketing : " + e);
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	}
}

Spring Retry Mechanism

In this example we take a look at the Spring Boot retry mechanism, which we configure in the application.properties (or application.yml) file. One point to note about the Spring Boot retry mechanism is that all messages behind the one being retried are held up until it has either been processed or its retries are exhausted (and it goes to the DLX).


I have set up the below exchanges and queues; I am showing a direct exchange example


All you need to do in Spring Boot is add the below to the properties (or YAML) file and that's it; just code the producers and consumers as normal, throwing an IOException on failure
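In Spring Boot the listener retry settings live under spring.rabbitmq.listener.simple.retry (enabled, initial-interval, multiplier, max-interval and max-attempts). As a rough sketch of how such settings turn into waiting times, the code below computes an exponential backoff schedule. It approximates the behaviour rather than reproducing Spring's implementation, the values in `main` are made up, and `BackoffScheduleDemo` and `waits` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class BackoffScheduleDemo {

    // Returns the pauses between attempts: the first pause is the initial
    // interval, each later pause is multiplied, and none exceeds the maximum.
    // maxAttempts counts the first attempt, so there are maxAttempts - 1 pauses.
    static List<Duration> waits(Duration initial, double multiplier, Duration max, int maxAttempts) {
        List<Duration> result = new ArrayList<>();
        long next = initial.toMillis();
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Duration.ofMillis(Math.min(next, max.toMillis())));
            next = (long) (next * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative values only: 3s initial, doubling, capped at 10s, 5 attempts.
        for (Duration d : waits(Duration.ofSeconds(3), 2.0, Duration.ofSeconds(10), 5)) {
            System.out.println("wait " + d.toMillis() + " ms");
        }
    }
}
```

Because the pauses happen inside the listener, the messages queued behind the failing one wait for the whole schedule to finish, which is the hold-up described earlier in this section.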


SpringConsumer.java
package uk.co.datadisk.rabbitmq_consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import uk.co.datadisk.rabbitmq_consumer.entity.Picture;

import java.io.IOException;

@Service
public class SpringConsumer {
    ObjectMapper objectMapper = new ObjectMapper();
    private static final Logger log = LoggerFactory.getLogger(SpringConsumer.class);

    @RabbitListener(queues = "dd.spring.image.work")
    public  void listenImage(String message) throws IOException {

        var picture = objectMapper.readValue(message, Picture.class);
        log.info("Consuming image : {} size: {}", picture.getName(), picture.getSize());

        if (picture.getSize() > 9000) {
            throw new IOException("Image " + picture.getName() + " size too large : " + picture.getSize());
        }

        log.info("Creating thumbnail & publishing image : {}", picture.getName());
    }

    @RabbitListener(queues = "dd.spring.vector.work")
    public  void listenVector(String message) throws IOException {

        var picture = objectMapper.readValue(message, Picture.class);
        log.info("Consuming vector : {}", picture.getName());
        log.info("Converting to image, creating thumbnail & publishing image : {}", picture.getName());
    }
}

If you look at the console output you can see that consuming the image is retried multiple times and the message eventually dies, all based on the properties we have specified in the properties file.