当前位置: 首页 > news >正文

建设企业网站管理的重要性百度关键词价格

建设企业网站管理的重要性,百度关键词价格,企业网站建设 管理 维护,做360手机网站优化排说明:在RabbitMQ消息传递过程中,有以下问题: 消息没发到交换机 消息没发到队列 MQ宕机,消息在队列中丢失 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除 …

说明:在RabbitMQ消息传递过程中,有以下问题:

  • 消息没发到交换机

  • 消息没发到队列

  • MQ宕机,消息在队列中丢失

  • 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除

针对以上问题,提供以下解决方案:

  • 消息确认:确认消息是否发送到交换机、队列;

  • 消息持久化:持久化消息,以防MQ宕机造成消息丢失;

  • 消费者消息确认:确认消费者已正确消费消息,才把消息从队列中删除;

在这里插入图片描述

消息确认

可以使用Rabbit MQ提供的publisher confirm机制来避免消息发送到MQ过程丢失。具体实现是,publisher-confirm(发送者确定)、publisher-return(发送者回执),前者判断消息到交换机、后者判断交换机到队列


publisher-confirm(发送者确定)

  • 消息成功投递到交换机,返回ack;

  • 消息未投递到交换机,返回nack;

publisher-return(发送者回执)

  • 消息投递到交换机,但没有到队列,返回ack,即失败原因;

在生产者端添加配置

spring:rabbitmq:# rabbitMQ相关配置host: 118.178.228.175port: 5672username: rootpassword: 123456virtual-host: /# 开启生产者确认,correlated为异步,simple为同步publisher-confirm-type: correlated# 开启publish-return功能,基于callback机制publisher-returns: true# 开启消息路由失败的策略,true是调用returnCallback方法,false是丢弃消息template:mandatory: true

publisher-return(发送者回执)代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 发送者回执实现*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 回执信息* @param message 信息对象* @param replyCode 回执码* @param replyText 回执内容* @param exchange 交换机* @param routingKey 路由键值*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息发送队列失败=====replyCode{},replyText{},exchange{},routingKey{},message{}",replyCode,replyText,exchange,routingKey,message);}});}
}

publisher-confirm(发送者确定)代码

    @Testpublic void sendExceptionMessage() {// 路由键值String routingKey = "exception";// 消息String message = "This is a exception message";// 给消息设置一个唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 编写confirmCallBack回调函数correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {if (confirm.isAck()) {// 消息发送交换机成功log.debug("消息送达至交换机成功");} else {// 消息发送交换机失败,打印消息log.error("消息未能送达至交换机,ID{},原因{}", correlationData.getId(), confirm.getReason());}}}, new FailureCallback() {// 消息发送交换机异常@Overridepublic void onFailure(Throwable ex) {log.error("消息发送交换机异常,ID:{},原因{}", correlationData.getId(), ex.getMessage());}});rabbitTemplate.convertAndSend("amq.direct", routingKey, message, correlationData);}

测试,设置一个不存在的routingKey,被发送者确认(publisher-confirm)捕获到;

// 路由键值
String routingKey = "null";

在这里插入图片描述

设置一个不存在的路由,被发送者回执(publisher-return)捕获到;

rabbitTemplate.convertAndSend("null", routingKey, message, correlationData);

在这里插入图片描述

消息持久化

消息持久化,是指把消息保存到磁盘中,在RabbitMQ宕机或者关机时,重启后,消息仍可以保存下来。消息依赖于交换机、队列,因此持久化消息,同时也需要持久化交换机、队列。

创建一个持久化的交换机、队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息持久化*/
@Configuration
public class DurableConfig {/*** 交换机持久化* @return*/@Beanpublic DirectExchange directExchange(){// 三个参数分别是:交换机名、是否持久化、没有队列与之绑定时是否自动删除return new DirectExchange("durable.direct",true,false);}/*** 队列持久化* @return*/@Beanpublic Queue durableQueue(){return QueueBuilder.durable("durable.queue").build();}/*** 交换机与队列绑定* @return*/@Beanpublic Binding binding(){return BindingBuilder.bind(durableQueue()).to(directExchange()).with("durable");}}

发送一个持久化的消息

    /*** 发送持久化消息*/@Testpublic void sendDurableMessage() {String routingKey = "durable";CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Message message = MessageBuilder.withBody("This is a durable message".getBytes(StandardCharsets.UTF_8))// 设置该消息未持久化消息.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("durable.direct", routingKey, message, correlationData);}

打开RabbitMQ管理平台,可以看到"delivery_mode: 2",表示该消息是持久化消息

在这里插入图片描述

(源码:MessageDeliveryMode类)
在这里插入图片描述

实际上,交换机、队列默认就是持久化的(durable: true),所以不用特意设置;

在这里插入图片描述

消费者消息确认

介绍

消费者消息确认,是为了确保消费者已经消费了消息,才让MQ把该消息删除;

可通过在消费者的配置文件中增加下面这行配置实现,备选项有以下三个:

  • none:关闭ack,表示不做处理,消息发给消费者之后就立即被删除;

  • auto:自动ack,表示由Spring检测代码是否出现异常,出现异常则保留消息,没有异常则删除消息;

  • manual:手动ack,可根据业务手动编写代码,返回ack;

spring:rabbitmq:listener:simple:# 设置消息确认模式acknowledge-mode: none

测试:none

可编写代码测试,下面是生产者代码,发送消息

    /*** 发送普通消息*/@Testpublic void sendNoneMessage() {String directName = "none.direct";String routingKey = "none";String message = "This is a test message";rabbitTemplate.convertAndSend(directName, routingKey, message);}

消费者代码有问题,未能正常消费消息

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "none.queue"),exchange = @Exchange(name = "none.direct",type = ExchangeTypes.DIRECT),key = {"none"}))public void getNoneMessage(String normalMessage){System.out.println(1/0);System.out.println("normalMessage = " + normalMessage);}

测试结果,程序报错,消息也没能保留下来

在这里插入图片描述
在这里插入图片描述

测试:auto

更改设置为:auto,重试

在这里插入图片描述

但是消息未被删除

在这里插入图片描述

这种情况,在实际开发中是不能允许,可以通过更改消费失败的重试机制解决。

消费失败重试机制

方法一:设置retry

因为消息被消费失败,消息会一直循环重试,无限循环,导致mq的消息处理飙升,带来不必要的压力,这种情况可以通过在消费者端添加以下配置,限制失败重试的条件来解决:

spring:rabbitmq:listener:simple:retry:# 开启消费者失败重试enabled: true# 初次失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,即后一次等待的时间是前一次等待时间的多少倍multiplier: 1# 最多重试次数max-attempts: 3# true 无状态 false 有状态 如果业务中包含事务 改为falsestateless: true

开启后,控制台可以发现,信息不回一直循环打印,而是打印数条后停止,日志信息中有提示“Retry Policy Exhausted”(重试策略已用尽)

在这里插入图片描述
这种通过配置的方式,并不会重试数次后仍保留消息,而是重试数次仍失败,随即丢弃消息,消息丢失,这在实际开发中也是不能被允许的。

方法二:路由存储消息

因此,可以通过下面这个方法,把消费失败的消息,通过交换机路由到另外的队列中存储起来,等业务代码被修复,再路由回来消费。

在这里插入图片描述

代码如下

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 错误消息队列*/
@Configuration
public class ErrorMessageQueueConfig {/*** 创建一个交换机,用于路由消费失败的消息* @return*/@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}/*** 创建一个队列,用于存储消费失败的消息* @return*/@Beanpublic Queue errorQueue(){return new Queue("error.queue");}/*** 绑定* @return*/@Beanpublic Binding errorBinding(){return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** 路由,当消费失败时,把消费失败的消息路由到此队列中,路由key为"error"* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}

可以看到,消息消费失败后并没有被丢失,而是路由到错误队列中存储了起来。因为错误队列没有设置RabbitListener,所以可以存储消息,等带代码问题被排查出来后,可以再针对该队列设置监听方法,消费这部分错误的消息。

在这里插入图片描述

另外,值得一提的是,消费者这边的控制台会报一个警告,提示路由密钥错误。我们可以理解,在RabbitMQ底层,会把消费失败了的消息,统一路由到一个地方去,而我们这种手动把消费失败的消息路由到自定义的队列中的方式,打破了这种“默认的规则”,所以报了一个这样的警告。这种警告是在可控范围内的。

在这里插入图片描述

总结

RabbitMQ发送消息,为了确保消息的可靠性,保证消息能被交换机、队列收到,消息能被正常消费,而不会因消费失败而丢失,提供了对应的一系列方法,并且最后还提供了两种消费失败重试方法,优化了消费过程,非常Nice。

http://www.jinmujx.cn/news/79443.html

相关文章:

  • 政府部门网站开发项目建设背景营销网站建设选择原则
  • 苏州建设培训中心网站在线一键生成网页
  • 企业网站建设收费标准线上销售渠道有哪几种
  • 专做项目报告的网站网络营销swot分析
  • 做网商必备网站计算机培训机构哪个最好
  • 买服务器做网站 镜像选什么凡科建站教程
  • 网站服务费算什么费用seo网站seo
  • 重庆建立公司网站贵港seo关键词整站优化
  • 怎么做旅游网站框架淘宝客推广有效果吗
  • 网站变成手机网站2022网络热词30个
  • 建设行业个人云网站重庆网站排名公司
  • 自己做装修网站网络营销在哪里学比较靠谱
  • 陕西优秀的企业门户网站建设seo的作用有哪些
  • vb.net 做网站电脑优化大师有用吗
  • 网站模板免费下载云资源企业网站快速建站
  • 网站做导航的地图seo产品优化免费软件
  • 网站设计想法百度搜索风云排行榜
  • 六盘水市住房和城乡建设局网站b2b商务平台
  • 贵州网络公司网站建设微信社群营销
  • wordpress官方主题推荐关键词优化软件排行
  • flash做的网站友情链接免费发布平台
  • 网站域名重定向如何做线上销售和推广
  • 7天酒店网站建设优势兰州网络推广与营销
  • 网站开发量计算seo搜索引擎优化工资多少钱
  • 网站备案信息注销原因百度登录入口百度
  • 长春二道网站建设今日热点
  • 淄博比较好的网站建设公司google浏览器网页版
  • 国外做mg动画的网站大全5g网络优化
  • 网站文件保护怎么做百度联系方式人工客服
  • 江门网站建设推广策划信阳百度推广公司电话