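A message queue is a method of communication between applications: the sender publishes messages to a queue and the receiver reads them from it, which decouples the two sides.
RabbitMQ is a popular open-source message broker, originally developed by Rabbit Technologies Ltd. (a joint venture between LShift and CohesiveFT), and it is widely used at internet companies and in small and medium-sized projects.
This article walks through a hands-on example built on RabbitMQ's Direct Exchange, combined with message acknowledgements and a Redis-based guard against duplicate consumption.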
Install RabbitMQ with Docker as follows:
# Pull the image
docker pull rabbitmq:management
# Start RabbitMQ
docker run -d -p 15672:15672 -p 5672:5672 rabbitmq:management
Management UI: http://localhost:15672. The default username and password are both guest.
Add RabbitMQ support to pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Create RabbitMQConfig.java:
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_A = "exchange-A";
    public static final String QUEUE_A = "queue-a";
    public static final String ROUTING_KEY_A = "routing-key-A";

    @Bean
    public DirectExchange exchangeA() {
        return new DirectExchange(EXCHANGE_A);
    }

    // Durable queue: the queue definition survives a broker restart.
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true);
    }

    // Bind queue-a to exchange-A under routing-key-A.
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueA()).to(exchangeA()).with(ROUTING_KEY_A);
    }

    // Prototype scope: a new RabbitTemplate instance is created each time the bean is requested.
    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // mandatory=true: unroutable messages are returned to the publisher instead of being dropped.
        template.setMandatory(true);
        template.setMessageConverter(new SerializerMessageConverter());
        return template;
    }
}
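The binding declared above relies on direct-exchange routing: a message reaches a queue only when its routing key exactly matches the key the queue was bound with. The following is a minimal in-memory sketch of that rule, not the RabbitMQ client API; the class `DirectExchangeSketch` and its methods are hypothetical names introduced purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of direct-exchange routing (hypothetical class, not part of RabbitMQ or Spring AMQP).
class DirectExchangeSketch {
    // routing key -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queue, String routingKey) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns how many queues received the message.
    // 0 means the message was unroutable; with mandatory=true a real broker
    // would hand such a message back to the publisher's return callback.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }
}
```

In this model, publishing with `routing-key-A` delivers to `queue-a`, while a key that differs in any way (including letter case) matches no binding and the message counts as unroutable.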
Edit application.yml:
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
Create ConfirmCallbackService.java:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    private static final Logger log = LoggerFactory.getLogger(ConfirmCallbackService.class);

    // Invoked when the broker confirms (ack) or rejects (nack) a published message.
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("Message publish failed: cause={}", cause);
        } else {
            // correlationData is null when the message was sent without one.
            String id = correlationData != null ? correlationData.getId() : null;
            log.info("Message confirmed: correlationId={}, ack={}", id, ack);
        }
    }
}
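Create ReturnCallbackService.java:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

// Note: newer Spring AMQP versions replace ReturnCallback with ReturnsCallback;
// this class targets the older interface used throughout this article.
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    private static final Logger log = LoggerFactory.getLogger(ReturnCallbackService.class);

    // Invoked when a mandatory message could not be routed to any queue.
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("Message returned: replyCode={}, replyText={}, exchange={}, routingKey={}",
                replyCode, replyText, exchange, routingKey);
    }
}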
Edit ProducerService.java:
import java.util.UUID;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        rabbitTemplate.setReturnCallback(returnCallbackService);
        rabbitTemplate.convertAndSend(
                exchange,
                routingKey,
                msg,
                message -> {
                    // Persist the message so it survives a broker restart.
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                // A unique id that ties the broker's confirm back to this message.
                new CorrelationData(UUID.randomUUID().toString())
        );
    }
}
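The confirm callback in this article only logs the outcome. One common extension is to remember each outgoing message under its correlation id so that a negative confirm can be matched back to the payload that needs re-sending. The sketch below shows that bookkeeping in plain Java; `PendingConfirmTracker` and its methods are hypothetical names and are not part of Spring AMQP.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks messages that have been published but not yet confirmed.
class PendingConfirmTracker {
    // correlation id -> payload awaiting a broker confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before publishing; the returned id would be passed to CorrelationData.
    String register(String payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    // Call from the confirm callback.
    // Returns the payload to re-send on a nack, or null on an ack or an unknown id.
    String onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return null; // message was sent without correlation data
        }
        if (ack) {
            pending.remove(correlationId);
            return null;
        }
        return pending.get(correlationId);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Whether a nacked message is retried immediately, retried with a delay, or only alerted on is a design decision this sketch deliberately leaves open.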
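Edit RabbitMQController.java:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitMQController {

    private static final String SUCCESS = "success";

    @Autowired
    private ProducerService producerService;

    // GET /send publishes one test message to exchange-A.
    @GetMapping("send")
    public String send() {
        producerService.sendMessage(
                RabbitMQConfig.EXCHANGE_A,
                RabbitMQConfig.ROUTING_KEY_A,
                "你好!");
        return SUCCESS;
    }
}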
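Create ReceiverMessage.java:
import java.io.IOException;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component
public class ReceiverMessage {

    private static final Logger log = LoggerFactory.getLogger(ReceiverMessage.class);

    @RabbitListener(queues = "queue-a")
    public void process(String msg, Message<?> message, Channel channel) throws IOException {
        MessageHeaders headers = message.getHeaders();
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            // TODO: business logic
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("Message processing failed: {}", e.getMessage());
            boolean redelivered = Boolean.TRUE.equals(headers.get(AmqpHeaders.REDELIVERED));
            if (redelivered) {
                // Already retried once: acknowledge to drop it and stop the retry loop.
                log.warn("Message was already redelivered; giving up on it.");
                channel.basicAck(tag, false);
            } else {
                // First failure: return the message to the queue for one more attempt.
                log.debug("Requeueing the message...");
                channel.basicNack(tag, false, true);
            }
        }
    }
}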
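The listener above encodes a small retry policy: acknowledge on success, requeue on the first failure, and drop the message once a redelivered copy fails again. Pulling that decision into a pure function makes it easy to test without a broker. The sketch below is one way to express it; `AckPolicySketch` and its `Action` enum are hypothetical names introduced for illustration.

```java
// Hypothetical helper that mirrors the listener's ack/nack decision as a pure function.
class AckPolicySketch {
    enum Action {
        ACK,      // processing succeeded: basicAck
        REQUEUE,  // first failure: basicNack with requeue=true
        DISCARD   // failed again after redelivery: basicAck to drop the message
    }

    static Action decide(boolean processedOk, boolean redelivered) {
        if (processedOk) {
            return Action.ACK;
        }
        return redelivered ? Action.DISCARD : Action.REQUEUE;
    }
}
```

A listener could then switch on `AckPolicySketch.decide(ok, redelivered)` and issue the matching channel call, which keeps the broker interaction in one place.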
Create DelayRabbitConfig.java:
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayRabbitConfig {

    /** Delay queue: messages wait here, with no consumer, until their TTL expires. */
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    /** Exchange that producers publish delayed messages to. */
    private static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    private static final String ORDER_DELAY_ROUTING_KEY = "order_delay";

    /** Queue that consumers actually listen on. */
    private static final String ORDER_QUEUE_NAME = "user.order.queue";
    /** Dead-letter exchange (DLX) that expired messages are forwarded to. */
    private static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    private static final String ORDER_ROUTING_KEY = "order";

    @Bean
    public Queue delayOrderQueue() {
        Map<String, Object> params = new HashMap<>();
        // Expired messages are re-published to this exchange with this routing key.
        params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
        params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
        return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
    }

    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }

    @Bean
    public Queue orderQueue() {
        return new Queue(ORDER_QUEUE_NAME, true);
    }

    @Bean
    public TopicExchange orderTopicExchange() {
        return new TopicExchange(ORDER_EXCHANGE_NAME);
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }
}
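Edit ProducerService.java:
// Add inside the message post-processor lambda in sendMessage:
// a per-message TTL of 30 seconds (the value is a string of milliseconds).
message.getMessageProperties().setExpiration(String.valueOf(1000 * 30));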
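The delay mechanism combines two pieces: a per-message TTL and a queue whose expired messages are dead-lettered to the real work queue. One detail worth knowing is that RabbitMQ expires per-message-TTL messages only when they reach the head of the queue, so a short-TTL message can be held up behind a long-TTL one. The following is a minimal in-memory simulation of that behavior under a caller-supplied clock; `DelayQueueSketch` and its methods are hypothetical and only model the idea.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy simulation of a TTL delay queue with dead-lettering (hypothetical, not broker code).
class DelayQueueSketch {
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    // Messages waiting out their TTL; nothing consumes from this queue.
    private final Deque<Entry> delayQueue = new ArrayDeque<>();
    // Messages that expired and were dead-lettered to the work queue.
    private final List<String> workQueue = new ArrayList<>();

    void publish(String body, long nowMillis, long ttlMillis) {
        delayQueue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    // Moves expired messages to the work queue, checking only the head of the
    // delay queue, which is how per-message TTL behaves in this model.
    void tick(long nowMillis) {
        while (!delayQueue.isEmpty() && delayQueue.peekFirst().expiresAt <= nowMillis) {
            workQueue.add(delayQueue.pollFirst().body);
        }
    }

    List<String> workQueue() {
        return workQueue;
    }
}
```

In this model, publishing a 30-second message followed by a 1-second message leaves both in the delay queue until the first one expires. Using the same TTL for every message in a delay queue, as this article does, avoids that ordering problem.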
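Edit ReceiverMessage.java:
// Order is the application's own payload class. The queue name matches
// ORDER_QUEUE_NAME in DelayRabbitConfig, where expired messages are dead-lettered.
@RabbitListener(queues = "user.order.queue")
public void process(Order order, Message<?> message, Channel channel) throws IOException {
    MessageHeaders headers = message.getHeaders();
    Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    try {
        // Business logic goes here.
        channel.basicAck(tag, false);
    } catch (Exception e) {
        log.error("Message processing failed: {}", e.getMessage());
        boolean redelivered = Boolean.TRUE.equals(headers.get(AmqpHeaders.REDELIVERED));
        if (redelivered) {
            log.warn("Message was already redelivered; giving up on it.");
            channel.basicAck(tag, false);
        } else {
            log.debug("Requeueing the message...");
            channel.basicNack(tag, false, true);
        }
    }
}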
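Add the Redis dependency to the consumer project's pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>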
Edit application.yml:
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        concurrency: 5
        max-concurrency: 10
        prefetch: 1
        acknowledge-mode: manual
        default-requeue-rejected: true
  redis:
    database: 0
    host: 192.168.0.150
    port: 6379
    password:
    timeout: 3000
Edit ReceiverMessage.java:
@Autowired
private RedisTemplate redisTemplate;

@RabbitListener(queues = "queue-a")
public void process(String msg, Message<?> message, Channel channel) throws IOException {
    MessageHeaders headers = message.getHeaders();
    Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    // Correlation id that the producer attached through CorrelationData.
    String msgId = (String) headers.get("spring_returned_message_correlation");
    try {
        // Skip messages whose id is already recorded in the Redis hash "test".
        if (redisTemplate.opsForHash().hasKey("test", msgId)) {
            log.info("Message already handled: msgId={}", msgId);
            channel.basicAck(tag, false);
            return;
        }
        redisTemplate.opsForHash().put("test", msgId, "processing...");
        int i = 1 / 0; // Deliberately throw to exercise the failure path.
        channel.basicAck(tag, false);
    } catch (Exception e) {
        log.error("Message processing failed: {}", e.getMessage());
        boolean redelivered = Boolean.TRUE.equals(headers.get(AmqpHeaders.REDELIVERED));
        if (redelivered) {
            log.warn("Message was already redelivered; giving up on it.");
            channel.basicAck(tag, false);
        } else {
            log.debug("Requeueing the message...");
            channel.basicNack(tag, false, true);
        }
    }
}
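With this setup, a message whose processing fails is not consumed over and over: its id is recorded in Redis, and a redelivered copy is acknowledged and skipped, which avoids an endless requeue loop.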
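The listener's `hasKey` check followed by `put` is a check-then-act sequence, so with several concurrent consumers two threads could both pass the check for the same id. A single atomic "claim" step closes that gap. The sketch below illustrates the idea with a `ConcurrentHashMap` standing in for Redis; `IdempotentConsumerSketch` is a hypothetical class. With Spring Data Redis, the analogous call would be `opsForHash().putIfAbsent(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotent-consumer helper; the map is an in-memory stand-in for Redis.
class IdempotentConsumerSketch {
    // message id -> processing state
    private final Map<String, String> processed = new ConcurrentHashMap<>();

    // Runs the handler at most once per message id.
    // Returns true if the handler ran, false if the id had already been claimed.
    boolean handleOnce(String msgId, String body, Consumer<String> handler) {
        // putIfAbsent is atomic: only one caller can claim a given id.
        if (processed.putIfAbsent(msgId, "processing") != null) {
            return false;
        }
        handler.accept(body);
        processed.put(msgId, "done");
        return true;
    }
}
```

If the handler throws, the id stays claimed in this sketch, which matches the article's behavior of skipping the redelivered copy. Releasing the claim on failure instead would allow a genuine retry, at the cost of possible repeated side effects.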
mvn spring-boot:run -Dabc=123
Open http://localhost:8080/send. A response of success means the message has been sent.
Open http://localhost:15672 to inspect published messages and their status.
mvn spring-boot:run -Dabc=123
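The console shows the message-received log, which means the message was consumed successfully.
Open http://localhost:8080/sendDelay and wait about 1 minute; the consumption log then appears in the console.
When processing fails, the Redis record prevents the message from being processed again.
The steps above cover a quick start and a hands-on walkthrough of RabbitMQ: producing and consuming messages, delay queues, and Redis-based protection against duplicate consumption. This series of articles is aimed at developers learning RabbitMQ and is meant to help you get up to speed on its core features and use cases.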
Reposted from: http://ldvkk.baihongyu.com/