搜索

查看: 3068|回复: 11

[ASP.NET] .Net Core和RabbitMQ限制循环消费的方法

[复制链接]
发表于 2023-5-4 11:24:49 | 显示全部楼层 |阅读模式
Editor 2023-5-4 11:24:49 3068 11 看全部
目录
  • 前言
  • 循环场景
  • 解决方案
  • 一次消费
  • 消息不重入队列
  • 限定重试次数
  • 消息头设定次数
  • 存储重试次数
  • 队列使用Quorum类型
  • 队列消息过期
  • 参考资料
    前言
    当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。

    2022103011192153.jpg

    2022103011192153.jpg


    循环场景
    生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为false时,设置了死信队列则进入死信队列,否则移除消息)。
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine("拒收");
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    当第50条消息拒收,则仍在队列中且处在队列头部,重新推送给消费者,再次拒收,再次推送,反反复复。

    2022103011192154.jpg

    2022103011192154.jpg


    最终其他消息全部消费完毕,仅剩第50条消息往复间不断消费,拒收,消费,这将可能导致RabbitMQ出现内存泄漏问题。

    2022103011192155.jpg

    2022103011192155.jpg


    解决方案
    RabbitMQ及AMQP协议本身没有提供这类重试功能,但可以利用一些已有的功能来间接实现重试限定(以下只考虑基于手动确认模式情况)。此处只想到或是只查到了如下几种方案解决消息循环消费问题。
  • 一次消费
  • 无论成功与否,消费者都对外返回ack,将拒收原因或是异常信息catch存入本地或是新队列中另作重试。
  • 消费者拒绝消息或是出现异常,返回Nack或Reject,消息进入死信队列或丢弃(requeue设定为false)。
  • 限定重试次数
  • 在消息的头中添加重试次数,并将消息重新发送出去,再每次重新消费时从头中判断重试次数,递增或递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
  • 可以在Redis、Memcache或其他存储中存储消息唯一键(例如Guid、雪花Id等,但必须在发布消息时手动设置它),甚至在mysql中连同重试次数一起存储,然后在每次重新消费时递增/递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。
  • 队列使用Quorum类型,限制投递次数,超过次数消息被删除。
  • 队列消息过期
  • 设置过期时间,给队列或是消息设置TTL,重试一定次数消息达到过期时间后进入死信队列或丢弃(requeue设定为true)。
  • 也许还有更多好的方案...
    一次消费
    对外总是Ack
    消息到达了消费端,可因某些原因消费失败了,对外可以发送Ack,而在内部走额外的方式去执行补偿操作,比如将消息转发到内部的RabbitMQ或是其他处理方式,终归是只消费一次。
    var queueName = "alwaysack_queue";
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(0, 5, false);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        try
        {
            var message = ea.Body;
            Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
            if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
            {
                throw new Exception("模拟异常");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
        }
    };
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    当消费端收到消息,处理时出现异常,可以另想办法去处理,而对外保持着ack的返回,以避免消息的循环消费。

    2022103011192156.jpg

    2022103011192156.jpg


    消息不重入队列
    在消费者端,因异常或是拒收消息时,对requeue设置为false时,如果设置了死信队列,则符合“消息被拒绝且不重入队列”这一进入死信队列的情况,从而避免消息反复重试。如未设置死信队列,则消息被丢失。

    2022103011192157.jpg

    2022103011192157.jpg


    此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false。
    var dlxExchangeName = "dlx_exchange";
    channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
    var dlxQueueName = "dlx_queue";
    channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
    var queueName = "nackorreject_queue";
    var arguments = new Dictionary
    {
        { "x-dead-letter-exchange", dlxExchangeName }
    };
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
    channel.BasicQos(0, 5, false);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine("拒收");
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//关键在于requeue=false
            return;
        }
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    如此一来,拒收消息不会重入队列,并且现有队列绑定了死信交换机,因此,消息进入到死信队列中,如不绑定,则消息丢失。

    2022103011192258.jpg

    2022103011192258.jpg


    限定重试次数
    设置重试次数,限定循环消费的次数,允许短暂的循环,但最终打破循环。

    消息头设定次数
    在消息头中设置次数记录作为标记,但是,消费端无法对接收到的消息修改消息头然后将原消息送回MQ,因此,需要将原消息内容重新发送消息到MQ,具体步骤如下
  • 原消息设置不重入队列。
  • 再发送新的消息其内容与原消息一致,可设置新消息的消息头来携带重试次数。
  • 消费端再次消费时,便可从消息头中查看消息被消费的次数。

    2022103011192259.jpg

    2022103011192259.jpg


    此处假定接收10条消息,在接收到第5条消息时设置拒收, 当消息头中重试次数未超过设定的3次时,消息可以重入队列,再次被消费。
    var queueName = "messageheaderretrycount_queue";
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(0, 5, false);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))
        {
            var maxRetryCount = 3;
            Console.WriteLine($"拒收 {DateTime.Now}");
            //初次消费
            if (ea.BasicProperties.Headers == null)
            {
                //原消息设置为不重入队列
                ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
                //发送新消息到队列中
                RetryPublishMessage(channel, queueName, message.ToArray(), 1);
                return;
            }
            //获取重试次数
            var retryCount = ParseRetryCount(ea);
            if (retryCount ();
        basicProperties.Headers.Add("retryCount", retryCount);
        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
    }
    static int ParseRetryCount(BasicDeliverEventArgs ea)
    {
        var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);
        if (!existRetryRecord)
        {
            throw new Exception("没有设置重试次数");
        }
        return (int)retryCount;
    }
    消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中,如此来控制消费次数,但是这种场景下,新消息排在了队列的尾部,而不是原消息排在队列头部。

    2022103011192260.jpg

    2022103011192260.jpg


    存储重试次数
    在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在。

    2022103011192261.jpg

    2022103011192261.jpg


    与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了。需要注意的是,消息发送端需要设置消息的唯一标识(MessageId属性)
    //模拟外部存储服务
    var MessageRetryCounts = new Dictionary();
    var queueName = "storageretrycount_queue";
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(0, 5, false);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            var maxRetryCount = 3;
            Console.WriteLine("拒收");
       
            //重试次数判断
            var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);
            if (!existRetryRecord)
            {
                //重入队列,继续重试
                MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);
                ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
                return;
            }
       
            if (MessageRetryCounts[ea.BasicProperties.MessageId]
    除第一次拒收外,允许三次重试机会,三次重试完毕后,设置requeue为false,消息丢失或进入死信队列(如有设置的话)。

    2022103011192262.jpg

    2022103011192262.jpg


    队列使用Quorum类型
    第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用Quorum,由MQ来限定消息的投递次数,也就控制了重试次数。

    2022103011192263.jpg

    2022103011192263.jpg


    设置队列类型为quorum,设置投递最大次数,当超过投递次数后,消息被丢弃。
    var queueName = "quorumtype_queue";
    var arguments = new Dictionary()
    {
        { "x-queue-type", "quorum"},
        { "x-delivery-limit", 3 }
    };
    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
    channel.BasicQos(0, 5, false);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine($"拒收 {DateTime.Now}");
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    第一次消费被拒收重入队列后,经最大三次投递后,消费端不再收到消息,如此一来也限制了消息的循环消费。

    2022103011192264.jpg

    2022103011192264.jpg


    队列消息过期
    当为消息设置了过期时间时,当消息没有受到Ack,且还在队列中,受到过期时间的限制,反复消费但未能成功时,消息将走向过期,进入死信队列或是被丢弃。
    聚焦于过期时间的限制,因此在消费者端,因异常或是拒收消息时,需要对requeue设置为true,将消息再次重入到原队列中。

    2022103011192265.jpg

    2022103011192265.jpg


    设定消费者端第五十条消息会被拒收,且队列的TTL设置为5秒。
    //死信交换机和死信队列
    var dlxExchangeName = "dlx_exchange";
    channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
    var dlxQueueName = "dlx_queue";
    channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
    //常规队列
    var queueName = "normalmessage_queue";
    var arguments = new Dictionary
    {
        { "x-message-ttl", 5000},
        { "x-dead-letter-exchange", dlxExchangeName }
    };
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
    channel.BasicQos(0, 5, false);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var message = ea.Body;
        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));
        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            Console.WriteLine($"拒收 {DateTime.Now}");
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    当消费者端拒收消息后消息重入队列,再次消费,反复进行超过5秒后,消息在队列中达到了过期时间,则被挪入到死信队列中。

    2022103011192266.jpg

    2022103011192266.jpg


    从Web管理中死信队列中可查看该条过期的消息。

    2022103011192267.jpg

    2022103011192267.jpg


    参考资料
    https://www.jianshu.com/p/f77a0b10c140
    https://www.jianshu.com/p/4904c609632f
    https://stackoverflow.com/questions/23158310/how-do-i-set-a-number-of-retry-attempts-in-rabbitmq
    到此这篇关于.Net Core和RabbitMQ限制循环消费的文章就介绍到这了,更多相关.net core rabbitmq循环消费内容请搜索知鸟论坛以前的文章或继续浏览下面的相关文章希望大家以后多多支持知鸟论坛
  • 回复

    使用道具 举报

    发表于 2023-6-28 23:47:09 | 显示全部楼层
    向往草原403 2023-6-28 23:47:09 看全部
    论坛不能没有像楼主这样的人才啊!我会一直支持知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 11:06:51 | 显示全部楼层
    啤酒瓶空了缓 2023-6-29 11:06:51 看全部
    楼主,我太崇拜你了!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 11:25:53 | 显示全部楼层
    落败的青春阳落s 2023-6-29 11:25:53 看全部
    论坛不能没有像楼主这样的人才啊!我会一直支持知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 14:17:19 | 显示全部楼层
    幸福341 2023-6-29 14:17:19 看全部
    这个帖子不回对不起自己!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 15:59:36 | 显示全部楼层
    风吹吹蛋蛋疼风w 2023-6-29 15:59:36 看全部
    这东西我收了!谢谢楼主!知鸟论坛真好!
    回复

    使用道具 举报

    发表于 2023-6-29 19:33:57 | 显示全部楼层
    戏做顿 2023-6-29 19:33:57 看全部
    论坛不能没有像楼主这样的人才啊!我会一直支持知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-30 17:53:08 | 显示全部楼层
    我是的十八簿 2023-6-30 17:53:08 看全部
    这个帖子不回对不起自己!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    发表于 2023-7-1 00:25:37 | 显示全部楼层
    462710480 2023-7-1 00:25:37 看全部
    感谢楼主的无私分享!要想知鸟论坛好 就靠你我他
    回复

    使用道具 举报

    发表于 2023-7-4 01:43:25 | 显示全部楼层
    米老鼠和蓝精鼠v 2023-7-4 01:43:25 看全部
    楼主太厉害了!楼主,I*老*虎*U!我觉得知鸟论坛真是个好地方!
    回复

    使用道具 举报

    • 您可能感兴趣
    点击右侧快捷回复 【请勿灌水】
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则 返回列表

    RSS订阅| SiteMap| 小黑屋| 知鸟论坛
    联系邮箱E-mail:zniao@foxmail.com
    快速回复 返回顶部 返回列表