RabbitMQ - 消息确认

Clojure括号侠
• 阅读 5396

RabbitMQ - 队列中提到,接收消息的时候,有两个方式,一个是consume,一个是get,这两个方法都有一个autoAck的参数。当我们设置为true的时候,说明消费者会通过AMQP显示的向rabbitmq发送一个确认,rabbitmq自动视其确认了消息,然后把消息从队列中删除。下面用consume的方式做些例子来理解autoAck的参数设置。

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

不确认

先往ack队列发送5条数据,可以看到ready是5,total是5。
RabbitMQ - 消息确认
运行以下代码,autoAck设置为false,且不对消息确认。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与rabbitmq服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个Channel
    Channel channel = connection.createChannel();
    // 通过Channel定义队列
    channel.queueDeclare("ack", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ack Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    // 接收消息
    channel.basicConsume("ack", false, deliverCallback, consumerTag -> {
    });
}

运行结果如下,打印了5条消息:
RabbitMQ - 消息确认
在web控制台可以看出,ready是0,unacked是5,即未确认的消息数是5条。
RabbitMQ - 消息确认
把应用停止掉,即关闭消费者和rabbitmq的连接,web控制台如下,unacked的5条数据回到了ready。
RabbitMQ - 消息确认
综上,当autoAck为false时,消息分为两个部分,一个是未投放放消费者的(ready),一个是投放给消费者但未确认的。如果未确认信息的消费者断开了连接,这部分消息会回到ready重新投递给消费者,确保了消息的可靠性。需要注意的是,如果消费者一直没有断开连接,也没有进行确认,那这个消息会一直等待确认中。

确认

确认有两种,一个是自动确认,一个是手动确认。自动确认的话,就是把autoAck设置为true。
当rabbitmq向消费者传递消息的时候,会带有一个deliveryTag的传递标识,标记惟一地标识信道上的传递,交付标记的作用域是每个信道,所以必须在接收消息的信道上进行确认。交付标记是递增的正整数,所以我们看到是1,2,3这样的递增数字。
上面例子中,有注释了一行代码,就是用来手动确认的,第一个参数就是传递标记,第二个参数是是否批量确认。
上面的打印结果输出了deliveryTag的值,从1到5。把注释删了再运行后,可以看到rabbitmq把确认后的消息删除。

void basicAck(long deliveryTag, boolean multiple) throws IOException;

批量确认

basicAck方法中,有个参数是multiple,是用来批量确认的。当设置为true的时候,RabbitMQ将确认所有未完成的传递标记,包括确认中指定的标记。与其他与确认相关的内容一样,这是按每个信道确定范围的。比如收到标记为1、2的没确认,标记为3的确认,那前面两个也会一起确认。如果multiple设置为false,则仅确认当前的消息。
我们看看下面的例子,每处理三个消息确认一次。

public static void main(String[] args) throws IOException, TimeoutException {
    // 声明一个连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 创建一个与rabbitmq服务器的连接
    Connection connection = factory.newConnection();
    // 创建一个Channel
    Channel channel = connection.createChannel();
    // 通过Channel定义队列
    channel.queueDeclare("multiple", false, false, false, null);
    // 异步回调处理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("multiple Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        }
    };
    // 接收消息
    channel.basicConsume("multiple", false, deliverCallback, consumerTag -> {
    });
}

启动后,发送两个消息,可以看到打印了两次
RabbitMQ - 消息确认
控制台显示未确认2个
RabbitMQ - 消息确认
再发送一条数据,打印了第三个
RabbitMQ - 消息确认
web控制台看出,已经确认并删除了队列的消息
RabbitMQ - 消息确认

使用哪种确认方式

自动确认,这种模式通常被称为“发了就忘”,当消费端处理异常时,则服务器发送的消息将得不到正确的处理。因此,自动消息确认应该被认为是不安全的。
手动确认模式中,通常使用消息预取,它限制了通道上未完成(“进行中”)交付的数量。但是,对于自动确认,没有这样的限制,所以消费者有可能由于处理消息太慢,导致内存积压、堆耗尽,导致程序无法运行。因此,自动确认模式仅适用于能够高效、稳定地处理配送的消费者。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
RabbitMQ学习总结(7)——Spring整合RabbitMQ实例
1.RabbitMQ简介RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。 官网:http://www.rabbitmq.com/(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fwww.rabbi
Stella981 Stella981
3年前
RabbitMQ_消息队列基本使用_2
简介RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者也可以从一个队列中接受消息。pika&使用rabbitmq使用的协议是amqp,用于python的推荐客户端是pikapipinstallpika
Stella981 Stella981
3年前
MQ对比之RabbitMQ & Redis
消息队列选择:RabbitMQ&RedisRabbitMQRabbitMQ是一个由erlang开发的AMQP(AdvancedMessageQueue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message
Stella981 Stella981
3年前
RabbitMQ基本示例,轮询机制,no_ack作用
一、RabbitMQ简介:'''RabbitMQ就是消息队列之前不是学了Queue了吗,都是队列还学RabbitMQ干嘛?干的事情是一样的Python的Queue有两个,一个线程Queue生产者消费者模型,一个进程Queue用于父进程与子进程交互两个完全独
Stella981 Stella981
3年前
C#队列学习笔记:RabbitMQ基础知识
  一、引言  RabbitMQ是RabbitMessageQueue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ是一个由Erlang语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下:!RabbitMQ内部结构(https://oscimg.oschina.net/osc
Stella981 Stella981
3年前
RabbitMQ 消息中间件搭建详解
1.RabbitMQ简介消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包
Stella981 Stella981
3年前
RabbitMQ 基础概念介绍
AMQP消息模型RabbitMQ是基于AMQP(高级消息队列协议)的一个开源实现,其内部实际也是AMQP的基本概念。AMQP的消息发送流程有如下几个步骤:1.消息生产者(producer)将消息发布到Exchange中;2.Exchange根据队列的绑定关系将消息分发到不同的队列(Queue
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Stella981 Stella981
3年前
RabbitMQ 三种方式的TTL
TTL说明RabbitMQ支持三种方式PerQueueMessageTTL(为进入队列的每一条消息设置一个TTL)QueueTTL(队列的TTL,如果在设置的TTL时间内,没有消费者连接,没有消息发送,RabbitMQ会默认其是将要抛弃不用的,会考虑在TTL到期后删除掉该队列)PerM
美凌格栋栋酱 美凌格栋栋酱
5个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(