搜索

查看: 3098|回复: 11

[Python] django+celery+RabbitMQ自定义多个消息队列的实现

[复制链接]
发表于 2023-5-4 17:20:22 | 显示全部楼层 |阅读模式
Editor 2023-5-4 17:20:22 3098 11 看全部
关于django celery的使用网上有很多文章,本文就不多做更多的说明。
本文使用版本
  • python==3.8.15
  • Django==3.2.4
  • celery==5.2.7
    celery.py
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    from kombu import Exchange, Queue
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zkcelery.settings')
    app = Celery('zkcelery')
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    # 看了一篇文章说,如果使用redis做broker,exchange可以不配置;但如果使用rabbitMQ做broker,就必须要配置。
    queue = (
        Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
        Queue('q1', exchange=Exchange('e1', type='direct'), routing_key='r1'),
        Queue('q2', exchange=Exchange('e2', type='direct'), routing_key='r2'),
        Queue('q3', exchange=Exchange('e3', type='fanout'), routing_key='r3'),
    )
    # 一旦配置了route后,所有的任务名都必须要指定route,否则任务无法执行。
    # 经过测试,route匹配是最长匹配规则。
    route = {
        'apps.zhiding.tasks.add': {'queue': 'q1', 'routing_key': 'r1'},
        'apps.zhiding.tasks.multiply': {'queue': 'q2', 'routing_key': 'r2'},
        # 其它的任务名称,匹配这条路由
        # 如果以上队列的worker服务器坏了,这些任务会被全部放进这个队列里,该队列的worker将继续处理这些任务
        # 下面这条队列一定要配置,否则其它任务无法处理。
        '*': {'queue': 'default', 'routing_key': 'default'},
    }
    app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)
    tasks.py
    from celery import shared_task
    import time

    @shared_task
    def add(x, y):
        time.sleep(2)
        print('任务睡眠2秒后执行了')
        return x + y

    @shared_task
    def multiply(x, y):
        time.sleep(5)
        print('任务睡眠5秒后执行了')
        return x * y

    @shared_task
    def sub(x, y):
        time.sleep(4)
        print('任务睡眠4秒后执行了')
        return x - y
    笔者也看了很多博文,在settings.py配置文件中写入CELERY_QUEUESCELERY_ROUTES,上面的配置对应下来就是如下代码块:
    CELERY_QUEUES = (
        Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
        Queue('sq1', exchange=Exchange('sq1', type='direct'), routing_key='sq1'),
        Queue('sq2', exchange=Exchange('sq2', type='direct'), routing_key='sq2'),
        Queue('sq3', exchange=Exchange('sq3', type='fanout'), routing_key='sq3'),
    )
    CELERY_ROUTES = {
        'apps.zhiding.tasks.add': {'queue': 'sq1', 'routing_key': 'sq1'},
        'apps.zhiding.tasks.multiply': {'queue': 'sq2', 'routing_key': 'sq2'},
        '*': {'queue': 'default', 'routing_key': 'default'},
    }
    但是笔者在实际使用中发现后面这种方式配置始终未生效,不知道是不是笔者版本的不同,没有做更多的研究,如果你能找到问题的原因,欢迎评论交流。
    启动worker
    # 笔者使用的windows,启动时需要加上-P eventlet
    celery -A zkcelery worker -l info -P eventlet
    启动后队列中出现配置中的个队列

    2023022215475248.jpg

    2023022215475248.jpg


    同时会在rabbitmq中创建(如果不存在)4个队列,交换机和相应的绑定关系(当然也可以直接通过rabbitmq管理端直接创建自己需要的队列、交换机和绑定,具体根据个人习惯或者视工作场景而定选择)

    2023022215475249.jpg

    2023022215475249.jpg


    以队列q1示例:

    2023022215475350.jpg

    2023022215475350.jpg


    暂时先关闭worker,便于观察消息队列中的消息。
    向队列中发送几条消息,消息均进入到配置中指定的queue中

    2023022215475351.jpg

    2023022215475351.jpg


    再次启动worker,队列中的消息立马被消费

    2023022215475352.jpg

    2023022215475352.jpg


    如何做到消费指定的队列中的消息,只需要启动的时候加上参数Q
    # -Q指定消费的队列
    # -n 指定worker节点的名称,避免启动多个时的重名冲突
    celery -A zkcelery worker -l info -Q q1 -n node1 -P eventlet
    可以看到终端中queues只有q1了

    2023022215475453.jpg

    2023022215475453.jpg


    q1中的消息被消费掉了,其他队列没有变化

    2023022215475454.jpg

    2023022215475454.jpg


    也可以同时指定多个消费队列
    celery -A zkcelery worker -l info -Q q2,default -n node2 -P eventlet

    2023022215475455.jpg

    2023022215475455.jpg


    当然也可以在生产方指定推送的队列,举例如下:

    2023022215475456.jpg

    2023022215475456.jpg


    到此这篇关于django+celery+RabbitMQ自定义多个消息队列的实现的文章就介绍到这了,更多相关django celery RabbitMQ消息队列内容请搜索知鸟论坛以前的文章或继续浏览下面的相关文章希望大家以后多多支持知鸟论坛
  • 回复

    使用道具 举报

    发表于 2023-6-28 22:40:59 | 显示全部楼层
    素色流年783 2023-6-28 22:40:59 看全部
    论坛不能没有像楼主这样的人才啊!我会一直支持知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 00:32:38 | 显示全部楼层
    贺老师 2023-6-29 00:32:38 看全部
    楼主发贴辛苦了,谢谢楼主分享!我觉得知鸟论坛是注册对了!
    回复

    使用道具 举报

    发表于 2023-6-29 07:12:09 | 显示全部楼层
    普通人物怨 2023-6-29 07:12:09 看全部
    既然你诚信诚意的推荐了,那我就勉为其难的看看吧!知鸟论坛不走平凡路。
    回复

    使用道具 举报

    发表于 2023-6-29 19:42:55 | 显示全部楼层
    冀苍鸾 2023-6-29 19:42:55 看全部
    感谢楼主的无私分享!要想知鸟论坛好 就靠你我他
    回复

    使用道具 举报

    发表于 2023-6-29 23:27:18 | 显示全部楼层
    哈哈SE7 2023-6-29 23:27:18 看全部
    论坛不能没有像楼主这样的人才啊!我会一直支持知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 23:37:38 | 显示全部楼层
    戏做顿 2023-6-29 23:37:38 看全部
    其实我一直觉得楼主的品味不错!呵呵!知鸟论坛太棒了!
    回复

    使用道具 举报

    发表于 2023-6-30 09:22:10 | 显示全部楼层
    小妖花满楼满fx 2023-6-30 09:22:10 看全部
    楼主太厉害了!楼主,I*老*虎*U!我觉得知鸟论坛真是个好地方!
    回复

    使用道具 举报

    发表于 2023-7-1 01:14:43 | 显示全部楼层
    十二音阶囤 2023-7-1 01:14:43 看全部
    楼主发贴辛苦了,谢谢楼主分享!我觉得知鸟论坛是注册对了!
    回复

    使用道具 举报

    发表于 2023-7-3 11:15:45 | 显示全部楼层
    永远就三年疗 2023-7-3 11:15:45 看全部
    楼主太厉害了!楼主,I*老*虎*U!我觉得知鸟论坛真是个好地方!
    回复

    使用道具 举报

    • 您可能感兴趣
    点击右侧快捷回复 【请勿灌水】
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则 返回列表

    RSS订阅| SiteMap| 小黑屋| 知鸟论坛
    联系邮箱E-mail:zniao@foxmail.com
    快速回复 返回顶部 返回列表