搜索

查看: 3091|回复: 11

[ASP.NET] .NETCore基于RabbitMQ实现延时队列的两方法

[复制链接]
发表于 2023-5-4 11:31:09 | 显示全部楼层 |阅读模式
Editor 2023-5-4 11:31:09 3091 11 看全部
目录
  • 前言
  • 实现延时队列的两种方式
  • 利用rabbitmq死信队列x-dead-letter-exchange和x-dead-letter-routing-key
  • .NETCore实现方式
  • rabbitmq通过安装插件的形式实现(推荐)
  • .NET Core 实现
  • 第一种方式的缺陷以及解决方案
    前言
    此文章用来记录自己学习延时队列过程的文章,并用.NET这两种方式实现了简单的Demo。
    延时队列的应用场景 应用下单后,30分钟没有支付的话,则自动取消订单活动开始前30分钟,提醒参赛者参加活动。活动结束后,30分钟后提醒未进行评价的参赛人员进行评价…
    上述的场景都可以使用延时队列进行对应的处理。
    上面的场景虽说可以通过定时器也可以处理,但有点浪费资源, 而上述的场景时间是不定的,例如有两个活动需要提醒参赛者参加,一个是7点开始 ,另一个是8点开始,那么触发处理的一个是6点半,一个是7点半。

    实现延时队列的两种方式
    使用Rabbitmq实现延时队列可以让消息持久化,也支持分布式
    缺点
    第一种第一种方式的缺陷以及解决方案
    第二种这个插件的当前设计并不真正适合具有大量延迟消息(例如成百上千或数百万)的场景。详情信息

    利用rabbitmq死信队列x-dead-letter-exchange和x-dead-letter-routing-key
    实现需要创建两对交换机和队列,其中需要对其中一对的队列进行设置x-dead-letter-exchange和x-dead-letter-routing-key属性,属性指定转发到另一对的交换机,
    随后实现流程图如下:

    2022092511055210.png

    2022092511055210.png


    .NETCore实现方式
    项目:.NET Core 控制台项目
    install-package RabbitMQ.Client

    生产者代码:
                ConnectionFactory connectionFactory = new ConnectionFactory
                {
                    UserName = "guest",
                    Password = "guest",
                    HostName = "127.0.0.1"
                };
                //创建连接
                var connection = connectionFactory.CreateConnection();
                //创建通道
                var channl = connection.CreateModel();
               //指定队列的x-dead-letter-exchange和x-dead-letter-routing-key
                Dictionary queueArgs = new Dictionary()
                {
                    { "x-dead-letter-exchange","exchange.business.test" },
                    {"x-dead-letter-routing-key","businessRoutingkey" }
                };
                //延时的交换机和队列绑定
                channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);
                channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);
                channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");
                //业务的交换机和队列绑定
                channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);
                channl.QueueDeclare("queue.business.test", true, false, false, null);
                channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);
                Console.WriteLine("生产者开始发送消息");
                while (true)
                {
                    string message = Console.ReadLine();
                    var body = Encoding.UTF8.GetBytes(message);
                    var properties = channl.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Expiration = "5000";
                    //发送一条延时5秒的消息
                    channl.BasicPublish("exchange.business.dlx", "", properties, body);
                }
    消费者
                ConnectionFactory connectionFactory = new ConnectionFactory
                {
                    UserName = "guest",
                    Password = "guest",
                    HostName = "127.0.0.1"
                };
                //创建连接
                var connection = connectionFactory.CreateConnection();
                var channel = connection.CreateModel();
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                //给消费时添加一个委托
                consumer.Received += (obj, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    //打印消费的消息
                    Console.WriteLine(message);
                    channel.BasicAck(ea.DeliveryTag, false);
                };
                //消费queue.business.test队列的消息
                channel.BasicConsume("queue.business.test", false, consumer);
                Console.ReadKey();
                channel.Dispose();
                connection.Close();
    实现效果:

    2022092511055211.gif

    2022092511055211.gif


    rabbitmq通过安装插件的形式实现(推荐)
    使用rabbitmq_delayed_message_exchange 插件提供的x-delayed-message类型的交换机
    下载插件的地址:https://www.rabbitmq.com/community-plugins.html
    选中rabbitmq_delayed_message_exchange插件

    该插件使用只需要声明交换机的时候,指定x-delayed-message类型,然后添加x-delayed-type参数即可

    .NET Core 实现
    生产者
                ConnectionFactory connectionFactory = new ConnectionFactory()
                {
                    UserName = "guest",
                    Password = "guest",
                    HostName = "127.0.0.1"
                };
                var connection = connectionFactory.CreateConnection();
                var channel = connection.CreateModel();
                Dictionary exchangeArgs = new Dictionary()
                {
                    {"x-delayed-type","direct" }
                };
                //指定x-delayed-message 类型的交换机,并且添加x-delayed-type属性
                channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs);
                channel.QueueDeclare("plug.delay.queue", true, false, false, null);
                channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay");
                var properties = channel.CreateBasicProperties();
                Console.WriteLine("生产者开始发送消息");
                Dictionary headers = new Dictionary()
                {
                    {"x-delay","5000" }
                };
                properties.Persistent = true;
                properties.Headers = headers;
                while (true)
                {
                    string message = Console.ReadLine();
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body);
                }
    消费者:
                ConnectionFactory connectionFactory = new ConnectionFactory
                {
                    UserName = "guest",
                    Password = "guest",
                    HostName = "127.0.0.1"
                };
                //创建连接
                var connection = connectionFactory.CreateConnection();
                var channel = connection.CreateModel();
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (obj, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine(message);
                    channel.BasicAck(ea.DeliveryTag, false);
                };
                channel.BasicConsume("plug.delay.queue", false, consumer);
                Console.ReadKey();
                channel.Dispose();
                connection.Close();
    实现效果:

    2022092511055212.gif

    2022092511055212.gif


    第一种方式的缺陷以及解决方案
    如果存在A、B消息进入了队列中,A在前,B在后,如果B消息的过期时间比A的过期时间要早,消费的时候,并不会先消费B,再消费A,而是B会等A先消费,即使A要晚过期

    举例
    生产者代码修改成如下:
                ConnectionFactory connectionFactory = new ConnectionFactory
                {
                    UserName = "guest",
                    Password = "guest",
                    HostName = "127.0.0.1"
                };
                //创建连接
                var connection = connectionFactory.CreateConnection();
                //创建通道
                var channl = connection.CreateModel();
                Dictionary queueArgs = new Dictionary()
                {
                    { "x-dead-letter-exchange","exchange.business.test" },
                    {"x-dead-letter-routing-key","businessRoutingkey" }
                };
                //延时的交换机和队列绑定
                channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);
                channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);
                channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");
                //业务的交换机和队列绑定
                channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);
                channl.QueueDeclare("queue.business.test", true, false, false, null);
                channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);
                string message1 = "Hello Word!1";
                string message2 = "Hello Word!2";
                var body1 = Encoding.UTF8.GetBytes(message1);
                var body2 = Encoding.UTF8.GetBytes(message2);
                var properties = channl.CreateBasicProperties();
                properties.Persistent = true;
                //先发送过期时间5秒的消息
                properties.Expiration = "5000";
                channl.BasicPublish("exchange.business.dlx", "", properties, body2);
                //再发送过期时间3秒的消息
                properties.Expiration = "3000";
                channl.BasicPublish("exchange.business.dlx", "", properties, body1);
    结果:

    2022092511055313.gif

    2022092511055313.gif


    这里先发了延时20秒的A消息,然后又发了延时10秒的B消息,但是最终结果并不是先消费了B消息,而是等A消息过期后,立刻再去消费B。
    这个会影响什么业务呢?好比两个C、D活动,C活动开始时间是7点,D活动开始时间是5点,那么D活动提醒需要等到C活动提醒后,才会立刻提醒,这明显不符合我们的业务需求。
    解决方案 每个活动都是单独的创建自己的交换机和队列使用第二种实现方式,即使用插件的形式。
    第一种不太现实,因为如果活动多的话,则会创建很多的队列,而且只会使用一次。
    业务上还是推荐使用插件的实现方式。
    第二种方式的效果

    2022092511055314.gif

    2022092511055314.gif


    github地址:
    https://github.com/MDZZ3/RabbitmqDelay

    到此这篇关于.NETCore基于RabbitMQ实现延时队列的两方法的文章就介绍到这了,更多相关.NETCore RabbitMQ 内容请搜索知鸟论坛以前的文章或继续浏览下面的相关文章希望大家以后多多支持知鸟论坛
  • 回复

    使用道具 举报

    发表于 2023-6-28 22:22:06 | 显示全部楼层
    知足常乐77 2023-6-28 22:22:06 看全部
    这东西我收了!谢谢楼主!知鸟论坛真好!
    回复

    使用道具 举报

    发表于 2023-6-29 18:13:42 | 显示全部楼层
    李志敏 2023-6-29 18:13:42 看全部
    论坛不能没有像楼主这样的人才啊!我会一直支持知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 18:23:43 | 显示全部楼层
    123456809 2023-6-29 18:23:43 看全部
    感谢楼主的无私分享!要想知鸟论坛好 就靠你我他
    回复

    使用道具 举报

    发表于 2023-6-30 00:14:32 | 显示全部楼层
    普通人物怨 2023-6-30 00:14:32 看全部
    这个帖子不回对不起自己!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-30 09:34:50 | 显示全部楼层
    墙和鸡蛋 2023-6-30 09:34:50 看全部
    我看不错噢 谢谢楼主!知鸟论坛越来越好!
    回复

    使用道具 举报

    发表于 2023-6-30 09:41:12 | 显示全部楼层
    术数古籍专卖疤 2023-6-30 09:41:12 看全部
    楼主,大恩不言谢了!知鸟论坛是最棒的!
    回复

    使用道具 举报

    发表于 2023-6-30 21:42:33 | 显示全部楼层
    永远爱你冰塘 2023-6-30 21:42:33 看全部
    楼主,我太崇拜你了!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    发表于 2023-7-3 10:21:22 | 显示全部楼层
    462710480 2023-7-3 10:21:22 看全部
    其实我一直觉得楼主的品味不错!呵呵!知鸟论坛太棒了!
    回复

    使用道具 举报

    发表于 2023-7-4 05:21:18 | 显示全部楼层
    小妖花满楼满fx 2023-7-4 05:21:18 看全部
    这个帖子不回对不起自己!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    • 您可能感兴趣
    点击右侧快捷回复 【请勿灌水】
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则 返回列表

    RSS订阅| SiteMap| 小黑屋| 知鸟论坛
    联系邮箱E-mail:zniao@foxmail.com
    快速回复 返回顶部 返回列表