搜索

查看: 3071|回复: 11

[PHP] ThinkPHP基于think-queue的队列插件实现消息推送

[复制链接]
发表于 2023-5-4 17:05:44 | 显示全部楼层 |阅读模式
Editor 2023-5-4 17:05:44 3071 11 看全部
目录
  • 前言
  • 安装
  • 搭建消息队列的存储环境
  • 消息的创建与推送
  • 消息的消费与删除
  • 发布任务
  • 处理任务think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think-queue消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。

    前言
    传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。
    一般来说,可以抽离的任务具有以下的特点:
  • 允许延后|异步|并行处理 (相对于传统的 即时|同步|串行 的执行方式)
  • 允许延后
    抢购活动时,先快速缓冲有限的参与人数到消息队列,后续再排队处理实际的抢购业务;
  • 允许异步
    业务处理过程中的邮件,短信等通知
  • 允许并行
    用户支付成功之后,邮件通知,微信通知,短信通知可以由多个不同的消费者并行执行,通知到达的时间不要求先后顺序。
  • 允许失败和重试
  • 强一致性的业务放入核心流程处理
  • 无一致性要求或最终一致即可的业务放入队列处理
    thinkphp-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:
  • 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
  • 队列的多队列, 内存限制 ,启动,停止,守护等
  • 消息队列可降级为同步执行
    thinkphp-queue 内置了 RedisDatabaseTopthinkSync这四种驱动。本文主要介绍 thinkphp-queue 结合其内置的 redis 驱动的使用方式和基本原理。
    注1:如无特殊说明,下文中的 ‘消息’ 和 ‘任务’两个词指代的是同一个概念,即队列中的一个成员。该成员对消息队列而言是其内部保存的消息; 对业务应用而言是一个待执行的任务。请根据语境区分。

    安装
    首先查看ThinkPHP框架版本,然后进入Packagist官网搜索think-queue,并根据ThinkPHP版本选择对应think-queue版本。
    thinkphp-queue地址:https://packagist.org/packages/topthink/think-queue
    本文采用的ThinkPHP的版本为5.0.23。
    可直接使用Composer为当前项目安装think-queue消息队列插件

    搭建消息队列的存储环境
    不推荐使用数据库,如果使用Redis驱动,那么需要提前安装Redis服务以及PHP的Redis扩展。
    根据选择的存储方式,在 \application\config\queue.php 这个配置文件中,添加消息队列对应的驱动配置

    消息的创建与推送
    我们在控制器中执行测试代码,将数据推送到helloJobQueue队列
    新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob 方法
    time(), 'bizId' => uniqid() , 'a' => 1 ] ;
          
          // 4.将该任务推送到消息队列,等待对应的消费者去执行
          $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );       
          
          // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
          if( $isPushed !== false ){  
              echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."
    ";
          }else{
              echo 'Oops, something went wrong.';
          }
      }
    }
    在这个例子当中,我们是手动指定的 $jobHandlerClassName ,更合理的做法是先定义好消息名称与消费者类名的映射关系,然后由某个可以获取该映射关系的类来推送这个消息。这样,生产者只需要知道消息的名称,而无需指定哪个消费者类来处理。

    消息的消费与删除
    编写 Hello 消费者类,用于处理 helloJobQueue 队列中的任务
    新增 \application\index\job\Hello.php 消费者类,并编写其 fire() 方法
    checkDatabaseToSeeIfJobNeedToBeDone($data);
              if(!$isJobStillNeedToBeDone){
                  $job->delete();
                  return;
              }
            
              $isJobDone = $this->doHelloJob($data);
            
              if ($isJobDone) {
                  // 如果任务执行成功, 记得删除任务
                  $job->delete();
                  print("Hello Job has been done and deleted"."\n");
              }else{
                  if ($job->attempts() > 3) {
                      //通过这个方法可以检查这个任务已经重试了几次了
                      print("Hello Job has been retried more than 3 times!"."\n");
                      
                                              $job->delete();
                      
                      // 也可以重新发布这个任务
                      //print("Hello Job will be availabe again after 2s."."\n");
                      //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
                  }
              }
          }
          
          /**
           * 有些消息在到达消费者时,可能已经不再需要执行了
           * @param array|mixed    $data     发布任务时自定义的数据
           * @return boolean                 任务执行的结果
           */
          private function checkDatabaseToSeeIfJobNeedToBeDone($data){
              return true;
          }
          /**
           * 根据消息中的数据进行实际的业务处理...
           */
          private function doHelloJob($data)
          {
              print("Hello Job Started. job Data is: ".var_export($data,true)." \n");
              print("Hello Job is Fired at " . date('Y-m-d H:i:s') ." \n");
              print("Hello Job is Done!"." \n");
             
              return true;
          }
      }
    发布任务
    在浏览器中访问 http://your.project.domain/index/job_test/actionWithHelloJob ,可以看到消息推送成功。
    消息推送成功后可以用redis可视化工具查看redis数据进行验证

    处理任务
    切换到当前终端到项目根目录
    $ php think queue:work --queue dismiss_job_queue
    查看执行的结果

    至此,成功地使用thinkphp中的thinkphp-queue经历了一个消息的 创建 -> 推送 -> 消费 -> 删除 的基本流程。
    到此这篇关于ThinkPHP基于think-queue的队列插件实现消息推送的文章就介绍到这了,更多相关think-queue消息推送内容请搜索知鸟论坛以前的文章或继续浏览下面的相关文章希望大家以后多多支持知鸟论坛
  • 回复

    使用道具 举报

    发表于 2023-6-28 18:30:29 | 显示全部楼层
    执着等待等wc 2023-6-28 18:30:29 看全部
    楼主发贴辛苦了,谢谢楼主分享!我觉得知鸟论坛是注册对了!
    回复

    使用道具 举报

    发表于 2023-6-28 22:44:28 | 显示全部楼层
    我的苦恼冉 2023-6-28 22:44:28 看全部
    论坛不能没有像楼主这样的人才啊!我会一直支持知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 11:23:25 | 显示全部楼层
    落败的青春阳落s 2023-6-29 11:23:25 看全部
    楼主,我太崇拜你了!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-29 14:10:23 | 显示全部楼层
    dxf17 2023-6-29 14:10:23 看全部
    感谢楼主的无私分享!要想知鸟论坛好 就靠你我他
    回复

    使用道具 举报

    发表于 2023-6-29 22:13:02 | 显示全部楼层
    素色流年783 2023-6-29 22:13:02 看全部
    感谢楼主的无私分享!要想知鸟论坛好 就靠你我他
    回复

    使用道具 举报

    发表于 2023-6-29 23:30:07 | 显示全部楼层
    普通人物怨 2023-6-29 23:30:07 看全部
    既然你诚信诚意的推荐了,那我就勉为其难的看看吧!知鸟论坛不走平凡路。
    回复

    使用道具 举报

    发表于 2023-6-30 00:23:21 | 显示全部楼层
    风来时狂放 2023-6-30 00:23:21 看全部
    楼主,我太崇拜你了!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    发表于 2023-6-30 08:15:34 | 显示全部楼层
    风吹吹蛋蛋疼风w 2023-6-30 08:15:34 看全部
    楼主,大恩不言谢了!知鸟论坛是最棒的!
    回复

    使用道具 举报

    发表于 2023-6-30 23:20:10 | 显示全部楼层
    井底燕雀傥 2023-6-30 23:20:10 看全部
    这个帖子不回对不起自己!我想我是一天也不能离开知鸟论坛
    回复

    使用道具 举报

    • 您可能感兴趣
    点击右侧快捷回复 【请勿灌水】
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则 返回列表

    RSS订阅| SiteMap| 小黑屋| 知鸟论坛
    联系邮箱E-mail:zniao@foxmail.com
    快速回复 返回顶部 返回列表