一、初始化项目

1、引入依赖

        <!--RabbitMq自动配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--消息转换器,用于配置消息序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>

2、配置文件

server:
  port: 8080
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: root
    password: 123456

3、配置消息转化器

  • 在SpringBoot启动类中注入
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

4、RabbitMq配置类

/**
 * @Description: mq配置类,配置交换机 队列 和 绑定
 * @Date: 2024/3/26
 * @Author: tajiaoyezi
 */
@Configuration
public class RabbitConfig {
    /**
     * 创建 Queue
     */
    @Bean
    public Queue demoQueue() {
        return new Queue(MqMessage.QUEUE, // Queue 名字
                true, // durable: 是否持久化
                false, // exclusive: 是否排它
                false); // autoDelete: 是否自动删除
    }

    /**
     * 创建 Direct Exchange
     */
    @Bean
    public DirectExchange demoExchange() {
        return new DirectExchange(MqMessage.EXCHANGE,
                true,  // durable: 是否持久化
                false);  // exclusive: 是否排它
    }


    /**
     * 创建 Binding
     * Exchange:MqMessage.EXCHANGE
     * Routing key:MqMessage.ROUTING_KEY
     * Queue:MqMessage.QUEUE
     */
    @Bean
    public Binding demoBinding() {
        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(MqMessage.ROUTING_KEY);
    }

}

5、消息实体类

/**
 * @Description: 消息
 * @Date: 2024/3/26
 * @Author: tajiaoyezi
 */
@Data
@Builder
public class MqMessage implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;
    /**
     * 交换机
     */
    public static final String EXCHANGE = "DEMO_EXCHANGE";
    /**
     * 消息队列
     */
    public static final String QUEUE = "DEMO_QUEUE";
    /**
     * routingKey
     */
    public static final String ROUTING_KEY = "DEMO_ROUTING_KEY";
    /**
     * 内容
     */
    private String content;
    /**
     * 类型
     */
    private String type;
}

6、生产者

/**
 * @Description: 生产者
 * @Date: 2024/3/26
 * @Author: tajiaoyezi
 */
@Component
@RequiredArgsConstructor
public class DemoProducer {

    private final RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     */
    public void sendMessage(String content,String fileType) {
        MqMessage mqMessage = MqMessage.builder()
                .content(content)
                .type(fileType)
                .build();
        // 发送消息
        rabbitTemplate.convertAndSend(MqMessage.EXCHANGE, MqMessage.ROUTING_KEY, mqMessage);
    }
}

7、消费者

/**
 * @Description: 消费者
 * @Date: 2024/3/26
 * @Author: tajiaoyezi
 */
@Slf4j
@Component
public class DemoConsumer {


    /**
     * 消费者
     *
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(MqMessage.QUEUE),
            exchange = @Exchange(name = MqMessage.EXCHANGE),
            key = MqMessage.ROUTING_KEY
    ), concurrency = "1")
    @RabbitHandler
    public void onMessage(MqMessage message, Channel channel, Message messageStatus)  {
        // 获取消息状态信息
        long deliveryTag = messageStatus.getMessageProperties().getDeliveryTag();
        try {
            log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().threadId(), message);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

二、自动ack

  • 测试自动ack,这里生产者不进行修改,修改配置文件和消费者

1、配置文件

server:
  port: 8080
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: root
    password: 123456
    listener:
      simple:
        default-requeue-rejected: false # 消息拒绝后是否重新入队
        retry:
          enabled: true # 是否启动消息重试机制
          initial-interval: 1000 # 重试间隔 1000毫秒
          max-attempts: 3 # 重试次数 3次
          max-interval: 10000 # 重试间隔 10000毫秒
          multiplier: 2.0 # 重试间隔倍数
          stateless: true  # 是否无状态,true:无状态,false:有状态

2、消费者

  • 手动抛出异常,测试自动ack,重试次数为3
/**
 * @Description: 消费者
 * @Date: 2024/3/26
 * @Author: tajiaoyezi
 */
@Slf4j
@Component
public class DemoConsumer {


    /**
     * 消费者
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(MqMessage.QUEUE),
            exchange = @Exchange(name = MqMessage.EXCHANGE),
            key = MqMessage.ROUTING_KEY
    ), concurrency = "1")
    @RabbitHandler
    public void onMessage(MqMessage message, Channel channel, Message messageStatus) {
        // 获取消息状态信息
        long deliveryTag = messageStatus.getMessageProperties().getDeliveryTag();
        log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().threadId(), message);
        // 手动制造异常,模拟消息处理失败
        int i = 1 / 0;
    }
}
  • 这里可以看到消息重试了三次后抛出异常,符合配置文件中的重试次数为3

1-自动ack.png

三、手动ack

1、配置文件

  • 新增配置acknowledge-mode: manual开启手动ack
server:
  port: 8080
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: root
    password: 123456
    listener:
      simple:
        default-requeue-rejected: false # 消息拒绝后是否重新入队
        acknowledge-mode: manual # 消息确认模式,manual:手动确认,不设置默认自动

2、消费者

/**
 * @Description: 消费者
 * @Date: 2024/3/26
 * @Author: tajiaoyezi
 */
@Slf4j
@Component
public class DemoConsumer {


    /**
     * 消费者
     *
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(MqMessage.QUEUE),
            exchange = @Exchange(name = MqMessage.EXCHANGE),
            key = MqMessage.ROUTING_KEY
    ), concurrency = "1")
    @RabbitHandler
    public void onMessage(MqMessage message, Channel channel, Message messageStatus)  {
        // 获取消息状态信息
        long deliveryTag = messageStatus.getMessageProperties().getDeliveryTag();
        try {
            log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().threadId(), message);
            int a=1/0;
        } catch (Exception e) {
            // 判断消息状态进行手动ack
            if (messageStatus.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收");
                try {
                    channel.basicReject(deliveryTag, false);
                } catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
                // 再拒绝消息后,可以进行一些其他操作,让消息进入死信队列或者其他的消息补偿机制
                // ......
            } else {
                log.info("手动消息重发");
                try {
                    channel.basicNack(deliveryTag, false, true);
                } catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }
}
  • 这里手动进行了一次消息重发,再次失败后禁止了消息重发

2-手动ack.png