Saul's blog Saul's blog
首页
后端
分布式
前端
更多
分类
标签
归档
友情链接
关于
GitHub (opens new window)

Saul.J.Wu

立身之本,不在高低。
首页
后端
分布式
前端
更多
分类
标签
归档
友情链接
关于
GitHub (opens new window)
  • 安全框架(认证、授权)

  • 分布式事务

  • 消息队列

    • RabbitMQ

      • RabbitMQ入门
        • docker安装RabbitMQ
          • 导入和到处配置
          • 连接
          • 交换机
          • 管理用户
          • 虚拟机主机
        • 运行机制
        • Exchange类型
          • topic匹配示例:
        • springboot整合RabbitMQ
          • maven
          • yml
          • 开启RabbitMQ
        • 测试
          • 创建交换机
          • 创建队列
          • 创建绑定关系
          • 发送消息
          • 发送消息对象
        • 配置RabbitMQ json序列化
        • 监听队列
          • @RabbitListener
          • @RabbitHandler
        • RabbitMQ消息确认机制-可靠抵达
          • 发送端确认-ConfirmCallback(生产者)
          • 消费端确认-消费者
        • spirngboot自动创建交换机和队列
        • 持久化
        • 消息丢失、重复、积压等解决方案
          • 1、消息丢失
          • 2、消息重复
          • 3、消息积压
  • K8S

  • 分布式
  • 消息队列
  • RabbitMQ
SaulJWu
2021-07-29

RabbitMQ入门

# docker安装RabbitMQ

docker run -d --name rabbitmq \
-p 5671:5671 \
-p 5672:5672 \
-p 4369:4369 \
-p 25672:25672 \
-p 15671:15671 \
-p 15672:15672 \
-v /opt/rabbitmq/data:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
1
2
3
4
5
6
7
8
9
10
11
12
  • 4369,25672:Erlang发现&集群端口
  • 5672,5671:服务端口,AMQP端口
  • 15672:web管理后台端口
  • 661613,61614:STOMP协议端口
  • 1883,883:MQTT协议端口

启动后,只需要访问15672端口

默认账号密码都是guest

image-20210729124531208

# 导入和到处配置

image-20210729124626042

则两个都是用来导入导出,方便迁移。

# 连接

image-20210729124650657

一个客户端,只有一个连接,一个连接里面有多个通道

# 交换机

image-20210729124714791

这些是默认的交换机

# 管理用户

image-20210729124814344

# 虚拟机主机

image-20210729124831226

默认是一个/主机

虽然你可以根据不同环境建立不同虚拟机,但是其实不建议这么做,现在跑多一个docker容器也很简单。

# 运行机制

image-20210729125139169

# Exchange类型

四种类型:

  • direct
  • fanout
  • topic
  • headers
Exchange类型 通信方式 路由键匹配 说明
direct 点对点 直接交换机、完全匹配、单播的模式 消息中的路由键(Routing key)如果和Binding中的binding key一致,
交换机就将消息发到对应的队列中。
路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,
则只转发routing key标记为“dog”的消息,不会转发“dog.puppy”等等信息
fanout 发布/订阅 子网广播、无视路由键 每个发到fanout类型交换器的消息都回分到所有绑定的队列上去。
fanout交换器不处理路由键,只是简单的将对垒绑定到交换器上,
每个发送到交换器的消息都回被转发到该交换器绑定的所有对队列上。
很像子网广播,每台之王内的主机都获得了一份复制的消息。
fanout类型转发消息是最快的
topic 发布/订阅 模式匹配分配消息、部分广播 topic交换器通过模式匹配分配消息的路由键属性,
将路由键其和某个模式进行匹配,此时队列需要绑定到一个模式上。
它将路由键和路由键和和绑定键的字符串切分陈单词,这些单词之间用点隔开。
它同样也回识别两个通配符:符号“#”和符号“”。
“#” 匹配0个或多个单词,
“*” 匹配1个单词
headers 点对点 完全匹配、单播的模式 headers匹配AMQP消息的header而不是路由键
headers交换器和dircet交换器完全一致,但是性能差很多,目前几乎用不到

# topic匹配示例:

image-20210729135251887

# springboot整合RabbitMQ

# maven

<!--rabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5

# yml

# rabbitMq配置
spring:
  rabbitmq:
    host: 192.168.31.250
    port: 5672
    # 虚拟主机
    virtual-host: /
1
2
3
4
5
6
7

引入了amqp,RabbitAutoConfiguration就会自动生效

给容器中自动配置了

1、RabbitTemplate

2、AmapAdmin

3、CachingConnectionFactory

4、RabbitMessagingTemplate

配置文件是:

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties
1
2

# 开启RabbitMQ

@EnableRabbit
1

# 测试

# 创建交换机

@Autowired
AmqpAdmin amqpAdmin;

@Test
void createExchange() {
    /**
         *  name交换机名字,
         *  durable是否持久化,
         *  autoDelete是否自动删除
         */

    DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
    amqpAdmin.declareExchange(directExchange);
    log.info("[{}]创建成功",directExchange);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

image-20210730140719580

重启docker容器,发现这个交换机确实做到持久化了

image-20210730140811108

# 创建队列

@Test
void createQueue() {
    /**
         * name队列名字,
         * durable是否持久化,
         * exclusive是否排他性(只能有一个连接)
         * autoDelete是否自动删除
         */
    Queue queue = new Queue("hello-java-queue",true,false,false);
    amqpAdmin.declareQueue(queue);
    log.info("[{}]创建成功",queue);
}
1
2
3
4
5
6
7
8
9
10
11
12

image-20210730141257899

重启docker,发现队列也持久化了。

# 创建绑定关系

@Test
void createBinding() {
    /**
         * destination 目的地,
         * destinationType 目的地类型,
         * exchange 交换机,
         * routingKey 路由键
         * 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为路由键
         * arguments:参数,可以为null
         */
    Binding binding = new Binding("hello-java-queue",
                                  Binding.DestinationType.QUEUE,
                                  "hello-java-exchange",
                                  "hello.java",
                                  null);
    amqpAdmin.declareBinding(binding);
    log.info("[{}]创建成功",binding);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

image-20210730142104533

交换机和队列都持久化了,那么绑定关系自然就是持久化。

# 发送消息

@Autowired
RabbitTemplate rabbitTemplate;

@Test
    void sendMessage() {
        String msg = "HelloWorld!";
        //还可以传第4个参数,配合本地消息表使用
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", msg);
        log.info("消息发送完成[{}]", msg);
    }

1
2
3
4
5
6
7
8
9
10
11

发送完之后,可以去服务器端的客户端页面获取信息

在服务器端的客户端页面从队列中获取消息是一个危险的动作,生产环境一定要了解业务之后再做操作

image-20210730143741010

可以看到这里已经有一条信息,点击进去

image-20210730143821925

接收消息,有几种模式:

image-20210730143843268

  • Nack message requeue true

获取消息,但是不做ack应答确认,消息重新入队

  • Automatic ack

获取消息,自动做ack应答确认,消息将会删除

  • reject requeue true

拒绝获取消息,消息重新入队

  • reject requeue false

拒绝获取消息,消息不重新入队,将会被删除

image-20210730144401446

可以看到,消息已经被取出来,当我们第二次去签收时,发现队列已经为空

image-20210730144433619

# 发送消息对象

@Test
void sendMessage2() {
    //如果发送消息时对象必须Serializable
    SysOrgEntity orgEntity = new SysOrgEntity();
    orgEntity.setOrgCode("dev");
    orgEntity.setOrgName("测试机构");
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orgEntity);
    log.info("消息发送完成[{}]", orgEntity);
}
1
2
3
4
5
6
7
8
9

image-20210730144755648

发现对象消息是序列化进去的,如果想要以json发送出去

# 配置RabbitMQ json序列化

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

配置完后,再次发送一个消息对象

image-20210730145342721

发现已经成功序列化了。

可以看到,content_type支持json,另外也可以看到TypeId看到类名,到时也可以转换过来。

# 监听队列

注解类型 @RabbitListener @RabbitHandler
注解位置 类+方法上 方法
说明监听哪些队列即可 只能注解在方法上,一般要配合@RabbitListener使用,然后重载方法,对一个队列接收不同类型的数据

# @RabbitListener

监听消息使用@RabbitListener,必须要有@enableRabbit,另外这个类,必须注入到容器中,可以是Service Component等等

@RabbitListener(queues = {"hello-java-queue"})
public void recieveMessage(Object message) {
    log.info("接收到消息...内容,{}==>类型:{}",message,message);
}
1
2
3
4
@RabbitListener(queues = {"hello-java-queue"})
public void recieveMessage(Message message) {
    //消息头
    MessageProperties messageProperties = message.getMessageProperties();
    //消息体
    byte[] body = message.getBody();
    log.info("接收到消息...内容,{}==>类型:{}",message,message);
}
1
2
3
4
5
6
7
8
@RabbitListener(queues = {"hello-java-queue"})
public void recieveMessage(Message message, SysOrgVo content) {
    //消息头
    MessageProperties messageProperties = message.getMessageProperties();
    //消息体
    byte[] body = message.getBody();
    log.info("接收到消息...内容,{}==>类型:{}", message, content);
}
1
2
3
4
5
6
7
8
接收参数可以写以下类型:
1、message import org.springframework.amqp.core.Message;
2、T 接收的消息类型,发的是什么类型的消息,收的也可以是什么信息
3、Channel 通道,当前传输数据的通道 import com.rabbitmq.client.Channel;
1
2
3
4

队列可以很多人监听,但是最终只有一个服务消费,消费了消息就会被删除。

而且只有一个消息完全处理完,方法运行结束,才可以接收下一个消息。

# @RabbitHandler

只能注解在方法上,一般要配合@RabbitListener使用,然后重载方法,对一个队列接收不同类型的数据

# RabbitMQ消息确认机制-可靠抵达

在分布式系统中,在关键环节一定要保证消息不丢失,特别容易网络抖动或者不稳定,导致丢失

  • 保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
  • pulisher confirmCallback 确认模式
  • publisher returnCallback 未投递到queue退回模式
  • consumer ack机制

image-20210802095941412

# 发送端确认-ConfirmCallback(生产者)

  • spring.rabbitmq.publisher-confirms=true
    • 在开启connectionFactory的时候设置PublisherConfirms(true)选项,开启confirmcallback
    • CorrelationData:用来表示当前消息唯一性
    • 消息只要被broker接收到会执行confirmCallback,如果是cluster模式,需要所有broker接收到才会调用confirmCallback。
    • 被broker接收到只能表示message已经到达服务器,并不能保证消息一定会被投递到目标queue里。所以需要用到接下来的returnCallback。
#Deprecated configuration property 'spring.rabbitmq.publisher-confirms' 
#已经被弃用
1
2
rabbitmq.publisher-confirm-type=correlated
1

如果该属性会触发以下方法:

/**
  * 设置生产者消息publish-confirm回调函数
  */
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if(!ack){
        LoggerUtil.error(RabbitConfig.class, StringUtils.join("publishConfirm消息发送到交换器被退回,Id:", correlationData.getId(), ";退回原因是:", cause));
    } else {
        LoggerUtil.info(RabbitConfig.class, "发送消息到交换器成功,MessageId:"+correlationData.getId());
    }
});
1
2
3
4
5
6
7
8
9
10
# spring.rabbitmq.publisher-confirm-type新版发布确认属性有三种确认类型
	/**
	 * The type of publisher confirms to use.
	 */
	public enum ConfirmType {

		/**
		 * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
		 * within scoped operations.
		 */
		SIMPLE,

		/**
		 * Use with {@code CorrelationData} to correlate confirmations with sent
		 * messsages.
		 */
		CORRELATED,

		/**
		 * Publisher confirms are disabled (default).
		 */
		NONE

	}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  • NONE值是禁用发布确认模式,是默认值
  • CORRELATED值是发布消息成功到交换器后会触发回调方法
  • SIMPLE值经测试有两种效果,
    • 其一效果和CORRELATED值一样会触发回调方法,
    • 其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,
    • 根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

# 可靠抵达队列 ReturnCallBack

spring.rabbitmq.publisher-returns=true

# 异步优先回调确认

所以mq的配置是

spring:
    rabbitmq:
        host: 192.168.31.250
        port: 5672
        # 虚拟主机
        virtual-host: /
        # 生产者发送确认
        publisher-confirm-type: correlated
        # 消息抵达队列确认
        publisher-returns: true
        # 只要抵达队列,以异步方式优先回调return确认
        template:
          mandatory: true
1
2
3
4
5
6
7
8
9
10
11
12
13

所以发送端最终确认:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

@Configuration
public class MyRabbitConfig {

  @Autowired
  RabbitTemplate rabbitTemplate;

  /**
   * 使用JSON序列化机制,进行消息转换
   *
   * @return
   */
  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  /**
   * 定制RabbitTemplate
   * 1、服务收到消息就会回调
   * 1、spring.rabbitmq.publisher-confirms: true
   * 2、设置确认回调
   * 2、消息正确抵达队列就会进行回调
   * 1、spring.rabbitmq.publisher-returns: true
   * spring.rabbitmq.template.mandatory: true
   * 2、设置确认回调ReturnCallback
   * <p>
   * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
   */
  @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
  public void initRabbitTemplate() {

    /**
     * 1、只要消息抵达Broker就ack=true
     * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
     * ack:消息是否成功收到
     * cause:失败的原因
     */
    //设置确认回调
    rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
      System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
    });

     /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 消费端确认-消费者

# 手动确认模式

spring:
    rabbitmq:
        # 手动ack消息
        listener:
          simple:
            acknowledge-mode: manual
1
2
3
4
5
6

当消费者宕机时,没手动签收的消息,回重新回到队列。并且是Ready状态。

long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
    /**
     * 签收当前消息,非批量模式
     * long deliveryTag 消息标识
     * boolean multiple 是否批量操作
     */
	channel.basicAck(deliveryTag,false);
}catch(Excetion e){
    //网络中断
}
1
2
3
4
5
6
7
8
9
10
11

# 拒接消息

/**
 * long deliveryTag 消息标识
 * boolean multiple 是否批量操作
 * boolean requeue 是否重新进入队列
 */
channel.basicNack(deliveryTag,false,true);
/**
 * long deliveryTag 消息标识
 * boolean requeue 是否重新进入队列
 */
channel.basicReject(deliveryTag,true);
1
2
3
4
5
6
7
8
9
10
11

只要是消费者宕机了,或者没一个个确认,那么就重新重入队列

# spirngboot自动创建交换机和队列

@Configuration
public class OrderMqConfig {

    @Value("${scenic-mq-config.order-delay-queue-milliseconds}")
    private int delayQueueMilliseconds;

    /**
     * 订单交换机
     * String name 交换机名字
     * boolean durable 是否持久化
     * boolean autoDelete 是否自动删除
     * Map<String, Object> arguments 自定义参数
     */
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange(
                OrderMqConstans.ORDER_EVENT_EXCHANGE,
                true,
                false
        );
    }

    /**
     * 订单专用延迟队列(死信队列)
     * String name 队列名字
     * boolean durable, 是否持久化
     * boolean exclusive, 是否排他,是否只能有一个连接
     * boolean autoDelete, 是否自动删除
     * Map<String, Object> arguments 自定义参数
     */
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> args = new HashMap<>();
        //死信交换机
        args.put("x-dead-letter-exchange", OrderMqConstans.ORDER_EVENT_EXCHANGE);
        //死信路由键
        args.put("x-dead-letter-routing-key", OrderMqConstans.ORDER_RELEASE_ROUTING_KEY);
        //死信时间(毫秒)
        args.put("x-message-ttl", delayQueueMilliseconds);
        Queue queue = new Queue(
                OrderMqConstans.ORDER_DELAY_QUEUE,
                true,
                false,
                false,
                args
        );
        return queue;
    }

    /**
     * 订单完成的队列
     */
    @Bean
    public Queue orderFinishQueue() {
        return new Queue(
                OrderMqConstans.ORDER_FINISH_QUEUE,
                true,
                false,
                false
        );
    }

    /**
     * 订单释放的队列
     */
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue(
                OrderMqConstans.ORDER_RELEASE_QUEUE,
                true,
                false,
                false
        );
    }

    /**
     * 订单创建成功的绑定关系
     * 将队列与交换机绑定,使用指定路由健
     * <p>
     * String destination,目的地名字
     * Binding.DestinationType destinationType, 目的地类型:交换机/队列
     * String exchange, 交换机
     * String routingKey, 绑定关系使用的路由健
     * Map<String, Object> arguments
     */
    @Bean
    public Binding orderCreateBinding() {
        return new Binding(
                OrderMqConstans.ORDER_DELAY_QUEUE,
                Binding.DestinationType.QUEUE,
                OrderMqConstans.ORDER_EVENT_EXCHANGE,
                OrderMqConstans.ORDER_CREATE_ROUTING_KEY,
                null
        );
    }

    /**
     * 订单完成的绑定关系
     */
    @Bean
    public Binding orderFinishBinding() {
        return new Binding(
                OrderMqConstans.ORDER_FINISH_QUEUE,
                Binding.DestinationType.QUEUE,
                OrderMqConstans.ORDER_EVENT_EXCHANGE,
                OrderMqConstans.ORDER_FINISH_ROUTING_KEY,
                null
        );
    }

    /**
     * 订单释放的绑定关系
     */
    @Bean
    public Binding orderReleaseBinding() {
        return new Binding(
                OrderMqConstans.ORDER_RELEASE_QUEUE,
                Binding.DestinationType.QUEUE,
                OrderMqConstans.ORDER_EVENT_EXCHANGE,
                OrderMqConstans.ORDER_RELEASE_ROUTING_KEY,
                null
        );
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125

# 持久化

  • 队列持久化
  • 交换机持久化
  • 消息持久化

要三个一起持久化,才是真正的持久化!

# 消息丢失、重复、积压等解决方案

# 1、消息丢失

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有容错机制,可记录到数据库,采用定期扫描重发的方式。
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发

# 容错方法

代理示例:

/**
     * 关闭订单
     * @param orderEntity
     */
    @Override
    public void closeOrder(OrderEntity orderEntity) {

        //关闭订单之前先查询一下数据库,判断此订单状态是否已支付
        OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().
            eq("order_sn",orderEntity.getOrderSn()));

        if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
            //代付款状态进行关单
            OrderEntity orderUpdate = new OrderEntity();
            orderUpdate.setId(orderInfo.getId());
            orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());
            this.updateById(orderUpdate);

            // 发送消息给MQ
            OrderTo orderTo = new OrderTo();
            BeanUtils.copyProperties(orderInfo, orderTo);

            try {
                //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息
                rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
            } catch (Exception e) {
                //TODO 定期扫描数据库,重新发送失败的消息
                                // while() 重试次数
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 日志记录

创建消息日志记录表:

CREATE TABLE `mq_message` (
  `message_id` char(32) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '主键ID',
  `content` text COLLATE utf8mb4_unicode_ci COMMENT '消息内容(json)',
  `to_exchange` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '目标交换机',
  `routing_key` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '路由健',
  `class_type` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '标志类型(方便转换)',
  `message_state` tinyint(4) NOT NULL DEFAULT '0' COMMENT '消息状态[0->新建,1->已发送,2->错误抵达,3->已抵达]',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='mq本地消息表';
1
2
3
4
5
6
7
8
9
10
11

消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。

  • publisher 也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

生产者消息确认回调应该增加日志记录,确认回调成功后修改记录日志的状态: gulimall-order/xxx/order/config/MyRabbitConfig.java

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

@Configuration
public class MyRabbitConfig {

  @Autowired
  RabbitTemplate rabbitTemplate;

  /**
   * 使用JSON序列化机制,进行消息转换
   *
   * @return
   */
  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  /**
   * 定制RabbitTemplate
   * 1、服务收到消息就会回调
   * 1、spring.rabbitmq.publisher-confirms: true
   * 2、设置确认回调
   * 2、消息正确抵达队列就会进行回调
   * 1、spring.rabbitmq.publisher-returns: true
   * spring.rabbitmq.template.mandatory: true
   * 2、设置确认回调ReturnCallback
   * <p>
   * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
   */
  @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
  public void initRabbitTemplate() {

    /**
     * 1、只要消息抵达Broker就ack=true
     * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
     * ack:消息是否成功收到
     * cause:失败的原因
     */
    //设置确认回调
    rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
      //服务器收到消息,更新本地消息表....
      System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
    });

     /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            //报错误了,更新本地消息表...
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 定期重发

自动ACK的状态下。消费者收到消息,但没来得及消费然后宕机。

  • 一定开启手动ACK,消费成功才移除,失败或者还没来得及处理就 noAck并重新入队。

防止消息丢失记住这两条: 1、做好消息确认机制(publisher,consumer【手动ack】】) 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍

# 2、消息重复

# 出现重复的几种情况

  • 1、消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功。Broker的消息重新由 unack 变为ready,并发送给其他消费者
  • 2、消息消费失败,由于重试机制,自动又将消息发送出去。
  • 3、成功消费,ack时宕机,消息又unack变为ready,Broker又重新发送

# 解决方案

  • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标识。(最重要)
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用再处理。
  • rabbitMQ的每一个消息都有 redilivered字段,可以获取是否被重新投递过来的,而不是第一次被投递过来的。
    • 虽然是重新派过来的,但是上一次处理可能是失败的,如果单纯地是否重新派发来拒绝消息,可能会导致有些业务没处理,所以不推荐使用这种做法

# 3、消息积压

  • 消费者宕机
  • 消费者消费能力不足
  • 发送者发送流量太大
    • 上线更多的消费者,进行正常的消费
    • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
帮我改善此页面 (opens new window)
上次更新: 2021/09/08, 06:15:12
docker运行Seata
K8S搭建

← docker运行Seata K8S搭建→

最近更新
01
zabbix学习笔记二
02-28
02
zabbix学习笔记一
02-10
03
Linux访问不了github
12-08
更多文章>
Theme by Vdoing | Copyright © 2020-2022 Saul.J.Wu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式