深圳罗湖网站设计公司,企业做网站有什么作用,网站建设绿茶,网站之间的差异RabbitMQ是一个开源的消息队列系统#xff0c;实现了高级消息队列协议#xff08;AMQP#xff09;。它提供了强大的消息传递功能#xff0c;支持多种消息传递模式#xff0c;是分布式系统中常用的消息中间件。 RabbitMQ核心概念
消息中间件
消息中间件是分布式系统中重要…RabbitMQ是一个开源的消息队列系统实现了高级消息队列协议AMQP。它提供了强大的消息传递功能支持多种消息传递模式是分布式系统中常用的消息中间件。RabbitMQ核心概念消息中间件消息中间件是分布式系统中重要的组件用于解耦生产者和消费者之间不需要直接通信异步提高系统的响应性和吞吐量削峰缓冲瞬时高并发请求可靠确保消息不丢失基本概念Producer生产者发送消息的应用程序Consumer消费者接收消息的应用程序Queue队列存储消息的缓冲区Exchange交换机接收生产者发送的消息根据路由规则将消息转发到队列Binding绑定交换机和队列之间的连接关系Routing Key路由键消息发送时指定的路由信息Connection连接应用程序与RabbitMQ服务器之间的TCP连接Channel通道建立在连接之上的虚拟连接工作模式1. 简单模式Simple一个生产者对应一个消费者最简单的消息传递模式。生产者代码ConfigurationpublicclassRabbitConfig{BeanpublicQueuesimpleQueue(){returnnewQueue(simple.queue,true);}}ServicepublicclassSimpleProducer{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend(simple.queue,message);System.out.println(发送消息message);}}消费者代码ComponentRabbitListener(queuessimple.queue)publicclassSimpleConsumer{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(接收消息message);// 处理消息逻辑}}2. 工作队列模式Work Queue一个生产者对应多个消费者消费者竞争消费消息实现负载均衡。生产者代码ConfigurationpublicclassRabbitConfig{BeanpublicQueueworkQueue(){returnnewQueue(work.queue,true);}}ServicepublicclassWorkProducer{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend(work.queue,message);System.out.println(发送消息message);}}消费者代码ComponentpublicclassWorkConsumer1{RabbitListener(queueswork.queue)publicvoidreceiveMessage(Stringmessage){System.out.println(消费者1接收消息message);try{// 模拟处理时间Thread.sleep(1000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println(消费者1处理完成message);}}ComponentpublicclassWorkConsumer2{RabbitListener(queueswork.queue)publicvoidreceiveMessage(Stringmessage){System.out.println(消费者2接收消息message);try{// 模拟处理时间Thread.sleep(2000);}catch(InterruptedExceptione){Thread.currentThread().interrupt();}System.out.println(消费者2处理完成message);}}3. 发布订阅模式Publish/Subscribe一个生产者发送消息通过交换机广播给多个队列多个消费者分别处理。配置代码ConfigurationpublicclassRabbitConfig{BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange(fanout.exchange);}BeanpublicQueuefanoutQueue1(){returnnewQueue(fanout.queue1,true);}BeanpublicQueuefanoutQueue2(){returnnewQueue(fanout.queue2,true);}BeanpublicBindingbinding1(FanoutExchangefanoutExchange,QueuefanoutQueue1){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}BeanpublicBindingbinding2(FanoutExchangefanoutExchange,QueuefanoutQueue2){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}生产者代码ServicepublicclassFanoutProducer{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){rabbitTemplate.convertAndSend(fanout.exchange,,message);System.out.println(发布消息message);}}消费者代码ComponentRabbitListener(queuesfanout.queue1)publicclassFanoutConsumer1{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(消费者1接收消息message);}}ComponentRabbitListener(queuesfanout.queue2)publicclassFanoutConsumer2{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(消费者2接收消息message);}}4. 路由模式Routing根据路由键将消息发送到指定的队列。配置代码ConfigurationpublicclassRabbitConfig{BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange(direct.exchange);}BeanpublicQueuedirectQueue1(){returnnewQueue(direct.queue1,true);}BeanpublicQueuedirectQueue2(){returnnewQueue(direct.queue2,true);}BeanpublicBindingbindingDirect1(DirectExchangedirectExchange,QueuedirectQueue1){returnBindingBuilder.bind(directQueue1).to(directExchange).with(info);}BeanpublicBindingbindingDirect2(DirectExchangedirectExchange,QueuedirectQueue2){returnBindingBuilder.bind(directQueue2).to(directExchange).with(error);}}生产者代码ServicepublicclassDirectProducer{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendInfoMessage(Stringmessage){rabbitTemplate.convertAndSend(direct.exchange,info,INFO: message);System.out.println(发送INFO消息message);}publicvoidsendErrorMessage(Stringmessage){rabbitTemplate.convertAndSend(direct.exchange,error,ERROR: message);System.out.println(发送ERROR消息message);}}消费者代码ComponentRabbitListener(queuesdirect.queue1)publicclassDirectConsumer1{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(INFO消费者接收消息message);}}ComponentRabbitListener(queuesdirect.queue2)publicclassDirectConsumer2{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(ERROR消费者接收消息message);}}5. 主题模式Topic根据通配符匹配的路由键将消息发送到相应的队列。配置代码ConfigurationpublicclassRabbitConfig{BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(topic.exchange);}BeanpublicQueuetopicQueue1(){returnnewQueue(topic.queue1,true);}BeanpublicQueuetopicQueue2(){returnnewQueue(topic.queue2,true);}BeanpublicBindingbindingTopic1(TopicExchangetopicExchange,QueuetopicQueue1){returnBindingBuilder.bind(topicQueue1).to(topicExchange).with(user.*);}BeanpublicBindingbindingTopic2(TopicExchangetopicExchange,QueuetopicQueue2){returnBindingBuilder.bind(topicQueue2).to(topicExchange).with(user.#);}}生产者代码ServicepublicclassTopicProducer{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendUserCreateMessage(Stringmessage){rabbitTemplate.convertAndSend(topic.exchange,user.create,创建用户message);System.out.println(发送用户创建消息message);}publicvoidsendUserUpdateMessage(Stringmessage){rabbitTemplate.convertAndSend(topic.exchange,user.update,更新用户message);System.out.println(发送用户更新消息message);}publicvoidsendUserDeleteMessage(Stringmessage){rabbitTemplate.convertAndSend(topic.exchange,user.delete,删除用户message);System.out.println(发送用户删除消息message);}}消费者代码ComponentRabbitListener(queuestopic.queue1)publicclassTopicConsumer1{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(用户操作消费者1接收消息message);}}ComponentRabbitListener(queuestopic.queue2)publicclassTopicConsumer2{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(用户操作消费者2接收消息message);}}高级特性消息确认机制生产者确认ConfigurationpublicclassRabbitConfig{BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplatenewRabbitTemplate(connectionFactory);// 启用发布者确认rabbitTemplate.setConfirmCallback((correlationData,ack,cause)-{if(ack){System.out.println(消息发送成功correlationData.getId());}else{System.out.println(消息发送失败cause);}});// 启用发布者返回rabbitTemplate.setReturnsCallback(returned-{System.out.println(消息无法路由returned.getMessage());});returnrabbitTemplate;}}消费者确认ComponentRabbitListener(queuesconfirm.queue)publicclassConfirmConsumer{RabbitHandlerpublicvoidreceiveMessage(Stringmessage,Channelchannel,Messagemsg)throwsIOException{try{System.out.println(接收消息message);// 处理消息逻辑// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);}catch(Exceptione){// 处理失败拒绝消息并重新入队channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);}}}死信队列处理无法被正常消费的消息。配置代码ConfigurationpublicclassRabbitConfig{BeanpublicQueuenormalQueue(){MapString,ObjectargsnewHashMap();args.put(x-dead-letter-exchange,dead.letter.exchange);args.put(x-dead-letter-routing-key,dead.letter);args.put(x-message-ttl,60000);// 消息TTL 60秒returnnewQueue(normal.queue,true,false,false,args);}BeanpublicQueuedeadLetterQueue(){returnnewQueue(dead.letter.queue,true);}BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange(dead.letter.exchange);}BeanpublicBindingdeadLetterBinding(DirectExchangedeadLetterExchange,QueuedeadLetterQueue){returnBindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(dead.letter);}}死信消费者ComponentRabbitListener(queuesdead.letter.queue)publicclassDeadLetterConsumer{RabbitHandlerpublicvoidreceiveMessage(Stringmessage){System.out.println(处理死信消息message);// 处理死信消息的逻辑}}消息持久化ConfigurationpublicclassRabbitConfig{BeanpublicQueuedurableQueue(){// durabletrue 表示队列持久化returnnewQueue(durable.queue,true);}BeanpublicExchangedurableExchange(){// durabletrue 表示交换机持久化returnnewDirectExchange(durable.exchange,true,false);}}ServicepublicclassDurableProducer{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendMessage(Stringmessage){// MessageProperties 设置消息持久化MessagePropertiespropertiesnewMessageProperties();properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);MessagemsgnewMessage(message.getBytes(),properties);rabbitTemplate.send(durable.exchange,durable.key,msg);}}监控和管理管理界面功能RabbitMQ管理界面提供了丰富的监控和管理功能Overview查看整体状态和统计信息Connections查看连接信息Channels查看通道信息Exchanges管理交换机Queues管理队列Admin用户和权限管理常用监控命令# 查看队列信息 rabbitmqctl list_queues name messages consumers # 查看交换机信息 rabbitmqctl list_exchanges # 查看绑定关系 rabbitmqctl list_bindings # 查看连接信息 rabbitmqctl list_connectionsSpring Boot Actuator集成dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-actuator/artifactId /dependencymanagement:endpoints:web:exposure:include:health,info,rabbitendpoint:rabbit:enabled:true性能优化1. 连接复用ConfigurationpublicclassRabbitConfig{BeanpublicConnectionFactoryconnectionFactory(){CachingConnectionFactoryconnectionFactorynewCachingConnectionFactory();connectionFactory.setHost(localhost);connectionFactory.setPort(5672);connectionFactory.setUsername(admin);connectionFactory.setPassword(admin123);// 设置连接缓存connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);connectionFactory.setConnectionCacheSize(10);// 设置通道缓存connectionFactory.setChannelCacheSize(25);returnconnectionFactory;}}2. 批量发送ServicepublicclassBatchProducer{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendBatchMessages(ListStringmessages){ListMessagemessageListmessages.stream().map(msg-MessageBuilder.withBody(msg.getBytes()).build()).collect(Collectors.toList());// 批量发送消息for(Messagemessage:messageList){rabbitTemplate.send(batch.exchange,batch.routing.key,message);}}}3. 消费者优化ConfigurationpublicclassRabbitConfig{BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactorynewSimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置消费者并发数factory.setConcurrentConsumers(3);factory.setMaxConcurrentConsumers(10);// 设置预取数量factory.setPrefetchCount(5);// 设置手动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);returnfactory;}}故障排查常见问题消息丢失检查队列和消息是否持久化确认生产者和消费者确认机制是否正确配置消息重复消费确保消费者处理逻辑的幂等性使用消息ID进行去重连接断开检查网络连接稳定性配置连接重试机制性能问题监控队列长度和消费者数量调整预取数量和并发消费者数日志配置dependency groupIdorg.springframework.amqp/groupId artifactIdspring-rabbit/artifactId /dependencylogging:level:com.rabbitmq:DEBUGorg.springframework.amqp:DEBUG