大规模网站开发语言,批量做网站引流,自己做网站项目,合肥外贸网站建设本文介绍#xff1a;消息可靠性保障需从发送者、MQ、消费者三方面着手#xff1a;发送者通过重连机制和确认机制#xff08;PublisherConfirm/Return#xff09;确保消息投递#xff1b;MQ通过数据持久化和LazyQueue优化存储#xff1b;消费者采用确认机制#xff08;ac…本文介绍消息可靠性保障需从发送者、MQ、消费者三方面着手发送者通过重连机制和确认机制PublisherConfirm/Return确保消息投递MQ通过数据持久化和LazyQueue优化存储消费者采用确认机制ack/nack/reject、失败重试及幂等处理唯一约束/业务判断防止重复消费。兜底方案使用延迟消息死信队列/DelayExchange插件处理异常情况形成完整的可靠性保障体系。如何保证消息可靠性保证消息可靠性要从发送者、MQ、消费者这三个方面考虑。发送者的可靠性发送者重连机制有时候可能会因为网络的问题导致发送者没能连接上MQ这时就需要对发送者进行发送者重连可以通过开启Spring AMQP提供的失败后重连的配置因为它默认是false关闭的。spring: rabbitmq: connection-timeout: 1s #设置MO的连接超时时间 template: retry: enabled: true #开启超时重试机制 initial-interval: 200ms #失败后的初始等待时间 multiplier: 1 # 失败后下次的等待时长倍数 max-attempts: 3 #最大重试次数不过呢这个重试机制是阻塞的它在重试等待的过程中当前线程会阻塞从而影响其它业务进行的性能。因此如果追求高性能可以不开启重试机制如果一定要保证发送者消息的可靠性可以开启重试但是一定要合理设置重连等待时长和次数等待时间不要太长了。当然了也可以采用异步线程的方式我可以将发送消息的代码放在单独线程进行这样在触发重试时就不会影响阻塞主线程执行的任务了。发送者确认机制Spring AMOP提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后当发送者发送消息给MQ后MQ就会返回确认结果给发送者。返回的结果有以下3种情况1、消息到达了MQ但是通过交换机进行路由失败时可能是没绑定到队列或绑定路径错误导致路由失败此时会通过Publisher Return返回路由异常原因然后通过Publisher Confirm返回ACK告知投递成功2、临时消息到达了MQ并且入队成功返回ACK告知投递成功否则返回NACK告知投递失败3、持久消息到达了MQ入队成功将消息写入磁盘完成持久化返回ACK告知投递成功否则返回NACK告知投递失败ACK Acknowledge character是确认字符在数据通信中接收站发给发送站的一种传输类控制字符。 表示发来的数据已确认接收无误。 在TCP/IP协议中如果接收方成功的接收到数据那么会回复一个ACK数据。 通常ACK信号有自己固定的格式,长度大小,由接收方回复给发送方。如何开启确认机制来实现发送者确认spring: rabbitmq: publisher-confirm-type: correlated # 开启publisher confirm机制并设置confirm类型默认是none关闭的 publisher-returns: true #开publisher return机制publisher-confirm-type有三种模式1、none关闭confirm机制默认2、simple同步阻塞等待MQ的回执消息3、correlatedMQ异步回调方式返回回执消息ReturnCallback每个RabbitTemplate只能配置一个ReturnCallback因此在项目启动的时候配置就可以了而不是在每发一次消息进行配置Slf4j AllArgsConstructor Configuration public class MqConfig{ private final RabbitTemplate rabbitTemplate; PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.Returnscallback(){ Override public void returnedMessage(ReturnedMessage returned){ log.error(触发return callback): } }); } }ConfirmCallback发送消息在每次发消息时都要进行一次配置指定消息ID识别哪个消息的关键标识、消息ConfirmCallbackSlf4j SpringBootTest public class RabbitMQPublisherConfirmTest { Autowired private RabbitTemplate rabbitTemplate; Test void testPublisherConfirm() throws InterruptedException { // 1.创建CorrelationData CorrelationData cd new CorrelationData(); // 2.给Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() { Override public void onFailure(Throwable ex) { // 2.1.Future发生异常时的处理逻辑基本不会触发 log.error(Handle message ack fail, ex); } Override public void onSuccess(CorrelationData.Confirm result) { // 2.2.Future接收到回执的处理逻辑参数中的result就是回执内容 if (result.isAck()) { // result.isAck(), boolean类型, true代表ack回执, false 代表 nack回执 log.debug(发送消息成功收到 ack!); } else { // result.getReason(), String类型返回nack时的异常描述 log.error(发送消息失败收到 nack, reason : {}, result.getReason()); } } }); // 3.发送消息 rabbitTemplate.convertAndSend(hmall.direct, redl, hello, cd); } }MQ的可靠性默认情况下RabbitMQ会将接收到的信息保存在内存中。有时候可能会出现下面两个问题1、MQ宕机内存中的消息会丢失2、内存空间有限当消费者故障或处理过慢时会导致消息积压MQ阻塞如果还用重发的方法来解决那MQ的性能会变差因此要保证MQ的可靠性要从存储上解决。数据持久化对RabbitMQ提前进行数据持久化到磁盘从以下3个方面进行持久化1、交换机持久化2、队列持久化3、消息持久化Spring AMQP创建交换机和发消息的持久化默认是支持的因此不需要额外做什么只要对队列持久化额外处理。进行了持久化发的MQ消息在传统的队列是会在内存中保存一份然后再在磁盘写一份那这样每一条处理消息处理的耗时是不是就变长了导致整体的一个并发能力是有点下降的。那要让接收到的消息直接写入磁盘不需要存储到内存中就需要用到LazyQueue。LazyQueue懒惰队列RabbitMQ从3.6.0版本开始增加了LazyQueue的概念。在3.12版本后所有队列都是Lazy Queue模式无法更改。LazyQueue是什么呢它是 RabbitMQ 中的一种队列模式它通过尽可能将消息存储在磁盘而不是内存中来优化消息的持久化处理从而解决内存压力和大量消息堆积的问题。LazyQueue的两个特性1、接收到消息后直接存入磁盘不再存储到内存2、消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存最多2048条)声明LazyQueue的三种方式方式1通过Bean声明:Bean public Queue lazyQueue(){ return QueueBuilder .durable(lazy.queue) .lazy()// 开启Lazy模式 .build(); }方式2通过注解形式声明RabbitListener(queuesToDeclare Queue( name lazy.queue, durable true, arguments Argument(name x-queue-mode,value lazy ) )) public void listenLazyQueue(string msg){ log.info(接收到 lazy.queue的消息:{}msg); }方式3通过控制台的方式声明消费者的可靠性确认机制消费者确认机制是为了确认消费者是否成功处理消息。当消费者处理消息结束后应该向RabbitMQ发送一个回执告知RabbitMQ自己消息处理状态:ack成功处理消息RabbitMQ从队列中删除该消息nack消息处理失败RabbitMQ要再次向队列发送消息reject消息处理失败并拒绝接收该消息RabbitMQ从队列中删除该消息Spring AMQP有三种消息确认方式1、不处理none即消息投递给消费者后立刻ack消息会立刻从MQ删除。非常不安全不建议使用2、手动模式manual需要自己在业务代码中调用api发送ack或reject存在业务入侵但更灵活3、自动模式autoSpring AMQP利用AOP对我们的消息处理逻辑做了环绕增强当业务正常执行时则自动返回ack当业务出现异常时根据异常判断返回不同结果如果是业务异常会自动返回nack如果是消息处理或校验异常自动返回reject配置方式spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none失败重试机制Spring AMOP提供了消费者失败重试机制在消费者出现异常时利用本地重试而不是无限的重新投递消息一直重新入队。通过在application.yaml文件中给消费者添加配置来开启重试机制:spring: rabbitmg: listener: simple: prefetch: 1 retry: enabled: true #开启消费者失败重试 initial-interval: 1000ms #初始的失败等待时长为1秒 multiplier: 1 #下次失败的等待时长倍数下次等待时长multiplier *last-interval max-attempts: 3 #最大重试次数 stateless: true #true无状态:false有状态如果业务中包含事务这里改为false但是这种设置max-attempts最大重试次数耗尽时失败重试机制默认会拒绝并且将消息从队列中丢弃。因此要解决这个问题就要用到MessageRecoverer接口的另外两个实现类来解决1、ImmediateRequestMessageRecoverer在重试次数耗尽后会立即返回nack消息重新入队2、RepublishMessageRecoverer在重试次数耗尽后会将失败的消息投递到指定的交换机配置失败重试处理策略Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate , direct , queue); }幂等处理幂等是什么就是做一次和做多次的效果是一样的。如支付1元和重复支付账户只扣1元多扣的要退款。那为什么需要幂等呢因为消息可能重复1、网络问题导致确认没收到MQ重新发送2、消费者处理超时MQ认为失败重新发送3、手动重试操作如何实现幂等方法1据库唯一约束或全局id作为唯一标识-- 创建订单处理记录表 CREATE TABLE order_process ( id BIGINT PRIMARY KEY, order_id VARCHAR(50) UNIQUE, -- 唯一约束 status VARCHAR(20), created_time TIMESTAMP );原理1、第一次处理插入成功继续业务2、第二次处理插入失败已存在直接返回方法2基于业务逻辑判断来保证幂等性。虽然对发送者、MQ、消费者都进行了消息的可靠性保证但是没有绝对100%能够保证不会出现消息丢失哪怕是99.9%也还有0.1%的可能性发生丢失因此我们还需要做一个兜底方案。兜底方案延迟消息延迟消息是什么发送者发送消息时指定一个时间消费者不会立即收到消息而是在指定时间到了才收到消息。那怎么实现延迟消息呢可以通过死信队列或使用延迟消息的插件来实现延迟消息方案。死信队列死信是什么无法被正常投递的消息消息成为死信的三种情况1、消息被拒绝消费者说我不要这个2、消息过期超过生存时间TTL3、队列满了装不下了死信队列的作用1、收集问题消息统一处理异常情况2、实现延迟消息通过消息过期机制3、问题排查分析为什么消息成为死信工作原理1. 创建一个延迟队列设置消息30分钟后过期2. 消息进入延迟队列等待30分钟3. 30分钟后消息过期自动转到死信队列4. 消费者从死信队列获取消息并处理一个普通的交换机和一个绑定了死信交换机的普通队列然后给死信交换机绑定死信队列绑定消费者延迟消息插件DelayExchange插件RabbitMQ 提供的延迟消息插件可以简化延迟消息的处理过程。该插件通过设计一种特殊的交换机当消息投递到这种交换机时它能够暂存一段时间直到达到设定的延迟时间后再将消息投递到相应的队列。DelayExchange在消息发布时检查是否设置了x-delay头如果有则会将消息存储在内部数据库中直到延迟时间到达后才转发到目标队列。Docker安装方式1、下载插件的地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases2、上传插件到mqdocker cp rabbitmq_delayed_message_exchange-3.8.0.ez container_id:/plugins3、安装启用插件docker exec -it container_id rabbitmq-plugins enable rabbitmq_delayed_message_exchange4、重启容器docker restart container_id然后通过注解的方式声明队列、交换机、绑定队列交换机并设置交换机为延迟交换机来实现延迟消息RabbitListener(bindings QueueBinding( value Queue(name delay.queue, durable true), exchange Exchange(name delay.direct, delayed true), key delay )) public void listenDelayQueue(String msg) { log.info(接收到延迟消息{} , msg); }基于Bean的方式声明队列、交换机、绑定队列交换机并设置交换机为延迟交换机来实现延迟消息Configuration public class DirectConfiguration { //设置延迟交换机 Bean public DirectExchange delayExchange() { return ExchangeBuilder .directExchange(delay.direct) .delayed() .durable(true) .build(); } //声明队列 Bean public Queue delayedQueue() { return new Queue(delay.queue); } //绑定队列与交换机 Bean public Binding delayQueueBinding() { return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with(delay); } }然后发送消息时需要通过消息头x-delay设置过期时间Test void testPublisherDelayMessage(){ // 1.创建消息 String message 延迟消息; //2.发送消息利用消息后置处理器MessagePostProcessor添加消息头 rabbitTemplate.convertAndSend(delay,direct,delay, message, new MessagePostProcessor(){ Override public Message postProcessMessage(Message message)throws AmqpException { // 添加延迟消息属性 message.getMessageProperties().setDelay(5000); return message; } }); }