SpringBoot整合RabbitMQ实现两种延时队列(订单延迟取消)
AI-摘要
Tianli GPT
AI初始化中...
介绍自己
生成本文简介
推荐相关文章
前往主页
前往tianli博客
一、延时队列概念篇
1、什么是延时队列,延时队列应用于什么场景
延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景:
- 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)
- 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会
2、 两种延迟队列形式
- 第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
- 第二种方式:利用rabbitmq中的插件x-delay-message
2.1 TTL
- RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
2.2 DXL
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
- x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange。
- x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
二、案例:订单超时未支付
1、方式一: TTL+DLX
1.1 导入Maven依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2 在application.yml配置文件中引入RabbitMq配置信息
#配置rabbitmq
rabbitmq:
host: 192.168.100.101
port: 5672
username: wanglei
password: 123456
virtual-host: /
1.3 封装RabbiMq常量类
public class MqConstants {
/**
* 死信交换机名称
*/
public static final String DELAY_EXCHANGE = "user.order.delay_exchange";
/**
* 死信接收交换机
*/
public static final String DELAY_RECEIVE_EXCHANGE = "user.order.receive_exchange";
/**
* 死信队列名称
*/
public static final String DELAY_QUEUE = "user.order.delay_queue";
/**
* 死信接收队列
*/
public static final String DELAY_RECEIVE_QUEUE = "user.order.receive_queue";
/**
* 死信 bindingKey
*/
public static final String DELAY_KEY="user.order.delay_key";
/**
* 死信接收 bindingKey
*/
public static final String DELAY_RECEIVE_KEY="user.order.receive_key";
}
1.4 配置RabbitMq配置类
@Configuration
public class RabbitmqConfig {
/**
* 死信交换机
*/
@Bean
public DirectExchange userOrderDelayExchange() {
return new DirectExchange(MqConstants.DELAY_EXCHANGE);
}
/**
* 死信队列
*/
@Bean
public Queue userOrderDelayQueue() {
Map<String, Object> map = new HashMap<>(16);
// 添加死信接收交换机名称
map.put("x-dead-letter-exchange", MqConstants.DELAY_RECEIVE_EXCHANGE);
// 添加死信接收key
map.put("x-dead-letter-routing-key", MqConstants.DELAY_RECEIVE_KEY);
// 绑定死信队列
return new Queue(MqConstants.DELAY_QUEUE, true, false, false, map);
}
/**
* 死信交换机绑定死信队列
* .with(死信key)
*/
@Bean
public Binding userOrderDelayBinding() {
return BindingBuilder.bind(userOrderDelayQueue()).to(userOrderDelayExchange()).with(MqConstants.DELAY_KEY);
}
/**
* 死信接收交换机
*/
@Bean
public DirectExchange userOrderReceiveExchange() {
return new DirectExchange(MqConstants.DELAY_RECEIVE_EXCHANGE);
}
/**
* 死信接收队列
*/
@Bean
public Queue userOrderReceiveQueue() {
return new Queue(MqConstants.DELAY_RECEIVE_QUEUE);
}
/**
* 死信接收交换机绑定消费队列
* .with(死信接收key)
*/
@Bean
public Binding userOrderReceiveBinding() {
return BindingBuilder.bind(userOrderReceiveQueue()).to(userOrderReceiveExchange()).with(MqConstants.DELAY_RECEIVE_KEY);
}
}
1.5 发送消息
/*
当用户提交订单后,携带订单id发送消息,设置延迟时间
*/
rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_KEY, order.getId(), message -> {
message.getMessageProperties().setExpiration("20000");
return message;
});
1.6 接收消息
/**
* @author leaflei
* 监听酒店数据的增删改
*/
@Slf4j
@Component
public class OrderListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderDetailMapper orderDetailMapper;
@Autowired
private ItemClient itemClient;
/**
* 绑定死信接收队列
*
* @param id 订单id
*/
@RabbitListener(queues = MqConstants.DELAY_RECEIVE_QUEUE)
public void insertListen(Long id) {
log.info("监听到了消息 ===> {}", id);
// 根据订单id查询指定的订单数据
Order order = orderMapper.selectById(id);
// 判断订单状态:
if (order.getStatus() == 1) {
//订单未支付,修改订单状态为:取消 5
Order newOrder = new Order();
newOrder.setId(id);
newOrder.setStatus(5);
// 修改数据库中的订单数据
orderMapper.updateById(newOrder);
//根据订单id,获取订单详情数据
OrderDetail orderDetail = orderDetailMapper
.selectOne(new LambdaQueryWrapper<OrderDetail>()
.eq(OrderDetail::getOrderId, id));
//获取订单中的购买的商品数量
Integer num = orderDetail.getNum();
//获取订单详情中的商品id
Long itemId = orderDetail.getItemId();
//远程调用,恢复库存数据
itemClient.updateItemStock(itemId, 1, num);
}
log.info("消息已消费");
}
}
2、 方式二:使用DLX延迟队列插件
- DLX的延时队列需要您在RabbitMq服务器上安装 rabbitmq_delayed_message_exchange-3.10.2.ez
- 在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。
- 我这里 MQ 的版本是 3.9.11,现在去 GitHub 上根据版本号下载插件
2.1 安装插件并启动
-我用的Docker部署的mq,所以把下载好的插件复制到容器中的plugins目录
docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins
- 进入容器
docker exec -it rabbit /bin/bash
- 查看插件
cd plugins
ls |grep delay
- 在plugins中启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 退出容器,并重启
exit
docker restart rabbit
- 容器启动成功之后,登录RabbitMQ的管理页面,找到ExchangesTab页。点击Add a new exchange,在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

2.2 引入依赖和修改配置与方式一相同
略
2.3 配置DLX方式的RabbitMq延时队列
package com.bean.springcloudproduct.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Description //TODO
* @Date $ $
* @Author huangwb
**/
@Configuration
public class RabbitmqConfig {
/**
* 延时队列交换机
* 注意这里的交换机类型:CustomExchange
*
* @return
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//属性参数 交换机名称 交换机类型 是否持久化 是否自动删除 配置参数
return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
}
/**
* 延时队列
*
* @return
*/
@Bean
public Queue delayQueue() {
//属性参数 队列名称 是否持久化
return new Queue("delay_queue", true);
}
/**
* 给延时队列绑定交换机
*
* @return
*/
@Bean
public Binding cfgDelayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay_key").noargs();
}
}
2.4 发送消息
@GetMapping("test/{time}/{name}")
public String createOrderTest(@PathVariable("time") Integer time, @PathVariable("name") String name) {
OrderMaster orderMaster = new OrderMaster();
//订单未完成
orderMaster.setOrderStatus(0);
//未付款
orderMaster.setPayStatus(0);
orderMaster.setBuyerName(name);
orderMaster.setBuyerAddress("湖南长沙");
orderMaster.setBuyerPhone("手机号");
orderMaster.setOrderAmount(BigDecimal.ZERO);
orderMaster.setCreateTime(DateUtils.getCurrentDate());
orderMaster.setOrderId(UUID.randomUUID().toString().replaceAll("-", ""));
orderMasterService.insert(orderMaster);
//第一个参数是前面RabbitMqConfig的交换机名称 第二个参数的路由名称 第三个参数是传递的参数 第四个参数是配置属性
this.rabbitTemplate.convertAndSend(
"delay_exchange",
"delay_key",
orderMaster,
message -> {
//配置消息的过期时间
message.getMessageProperties().setDelay(time);
return message;
}
);
return "创建订单成功";
}
2.5 接收消息
package com.bean.springcloudproduct.service;
import com.bean.springcloudcommon.model.OrderMaster;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
/**
* @Description //TODO
* @Author huangwb
**/
@Component
public class OrderReceiver {
@Autowired
private OrderMasterService orderMasterService;
//监听消息队列
@RabbitListener(queues = "delay_queue")
public void consumeMessage(OrderMaster order) throws IOException {
try {
//如果订单状态不是0 说明订单已经被其他消费队列改动过了 加一个状态用来判断集群状态的情况
if (Objects.equals(0,order.getOrderStatus())) {
//设置订单过去状态
order.setOrderStatus(-1);
System.out.println(order.getBuyerName());
orderMasterService.updateByPrimaryKeySelective(order);
}
} catch (Exception e) {
}
}
}
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 leaflei
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果