文章内容

一、RabbitMQ消息丢失场景
RabbitMQ丢失的以下三种情况:

- 1)生产者:生产者发送消息至MQ的数据丢失
- 2)RabbitMQ:MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失
- 3)消费者:消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完
二、RabbitMQ消息丢失解决方案
针对这几方问题分别列出解决方案:

1、生产者的两种方案
- 1)开启RabbitMQ事务(不推荐)
- 2)开启confirm模式(异步,推荐)
1)开启RabbitMQ事务
AMQP协议提供了事务机制,在投递消息时开启事务支持,如果消息投递失败,则回滚事务。
a)自定义事务管理器
01 02 03 04 05 06 07 08 09 10 11 12 | @Configuration public class RabbitTranscation { @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ return new RabbitTemplate(connectionFactory); } } |
b)修改yml
1 2 3 4 | spring: rabbitmq: # 消息在未被队列收到的情况下返回 publisher-returns: true |
c)开启事务支持
1 | rabbitTemplate.setChannelTransacted( true ); |
d)消息未接收时调用ReturnCallback
1 | rabbitTemplate.setMandatory( true ); |
e)生产者投递消息
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 | @Service public class ProviderTranscation implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ // 设置channel开启事务 rabbitTemplate.setChannelTransacted( true ); rabbitTemplate.setReturnCallback( this ); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println( "这条消息发送失败了" +message+ ",请处理" ); } @Transactional (rollbackFor = Exception. class ,transactionManager = "rabbitTransactionManager" ) public void publishMessage(String message) throws Exception { rabbitTemplate.setMandatory( true ); rabbitTemplate.convertAndSend( "javatrip" ,message); } } |
但是,很少有人这么干,因为这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ-Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。
2)开启confirm模式
发送消息时将信道设置为confirm模式,消息进入该信道后,都会被指派给一个唯一ID,一旦消息被投递到所匹配的队列后,RabbitMQ就会发送给生产者一个确认。
a)开启消息确认机制
1 2 3 4 5 6 | spring: rabbitmq: # 消息在未被队列收到的情况下返回 publisher-returns: true # 开启消息确认机制 publisher-confirm-type: correlated |
b)消息未接收时调用ReturnCallback
1 | rabbitTemplate.setMandatory( true ); |
c)生产者投递消息
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | @Service public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback( this ); rabbitTemplate.setConfirmCallback( this ); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println( "确认了这条消息:" +correlationData); } else { System.out.println( "确认失败了:" +correlationData+ ";出现异常:" +cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println( "这条消息发送失败了" +message+ ",请处理" ); } public void publisMessage(String message){ rabbitTemplate.setMandatory( true ); rabbitTemplate.convertAndSend( "javatrip" ,message); } } |
d)消息的重试机制
如果消息确认失败后,我们可以进行消息补偿,也就是消息的重试机制。当未收到确认信息时进行消息的重新投递。设置如下配置即可完成。
01 02 03 04 05 06 07 08 09 10 11 12 13 | spring: rabbitmq: # 支持消息发送失败后重返队列 publisher-returns: true # 开启消息确认机制 publisher-confirm-type: correlated listener: simple: retry: # 开启重试 enabled: true # 最大重试次数 max-attempts: 5 |
2、RabbitMQ:开启RabbitMQ持久化,将内存数据持久化到磁盘中
1)持久化队列
创建队列的时候将持久化属性durable设置为true,同时要将autoDelete设置为false。
1 | @Queue (value = "javatrip" ,durable = "true" ,autoDelete = "false" ) |
2)持久化消息
发送消息的时候将消息的deliveryMode设置为2,在Spring Boot中消息默认就是持久化的。
3、消费者:关闭RabbitMQ自动ACK
消费者刚消费了消息,还没有处理业务,结果发生异常。这时候就需要关闭自动确认,改为手动确认消息。
1)修改yml为手动签收模式
1 2 3 4 5 6 7 8 | spring: rabbitmq: listener: simple: # 手动签收模式 acknowledge-mode: manual # 每次签收一条消息 prefetch: 1 |
2)消费者手动签收
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 | @Component @RabbitListener (queuesToDeclare = @Queue (value = "javatrip" , durable = "true" )) public class Consumer { @RabbitHandler public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{ System.out.println(message); // 唯一的消息ID Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 确认该条消息 if (...){ channel.basicAck(deliverTag, false ); } else { // 消费失败,消息重返队列 channel.basicNack(deliverTag, false , true ); } } |
三、RabbitMQ幂等问题
幂等性问题通俗点讲就是保证数据不被重复消费,同时数据也不能少(就是上述的可靠性),也就是数据一致性问题。数据重复的问题简单的多,就是在消费端判断数据是否已经被消费过:
- 1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。1.
- 2)比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
- 3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id(时间+机器编码+应用PID+计数),类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
- 4)比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。