1 Star 1 Fork 1

zhen / hyperf-rocketmq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

hyperf_rocketmq

基于阿里云 rocketmq SDK 封装,实现了协程、连接池、消息可靠投递 等功能

基于 https://github.com/thefair-net/hyperf_rocketmq 进行封装

1、安装

composer require zhen/hyperf-rocketmq

producer、consumer 创建命令

需要先安装依赖

composer require zhen/hyperf-devtool --dev
php bin/hyperf.php ext-gen:rocketmq-consumer 领域名 消费者名称   
php bin/hyperf.php ext-gen:rocketmq-producer 领域名 生产者名称   

2、配置

发布配置

php bin/hyperf.php vendor:publish zhen/hyperf-rocketmq

配置说明

配置 类型 默认值 备注
host string HTTP协议客户端接入点
access_key string AccessKey ID
secret_key string AccessKey Secret
instance_id string 实例id
pool array 连接池配置
return [
    'default' => [ // 分组名,基于 host、port、scheme 进行区分
        'host' => env('ROCKETMQ_HTTP_HOST'),
        'access_key' => env('ROCKETMQ_HTTP_ACCESS_KEY_ID'),
        'secret_key' => env('ROCKET_MQ_HTTP_ACCESS_KEY_SECRET'),
        'instance_id' => env('ROCKET_MQ_HTTP_INSTANCE_ID'),
        'pool' => [
            'min_connections' => 50,
            'max_connections' => 300,
            'connect_timeout' => 3.0,
            'wait_timeout' => 30.0,
            'heartbeat' => -1,
            'max_idle_time' => 60.0,
        ],
    ],
];

3、创建相关数据表

如果不需要记录日志 或 消息不需要可靠投递,可以忽略这步

php bin/hyperf.php migrate --path=migrations/rocketmq

表说明:

mq_status_log:消息生产状态表

mq_produce_status_log:生成消息状态

mq_consume_log:消费日志

4、投递消息

Producer注解参数

字段名 类型 描述 默认值
poolName string 连接池名称。对应配置文件 rocketmq.php 中的key default
dbConnection string 数据库连接名称(用于记录生产日志) default
topic string topic
messageKey string 消息key 随机生成
messageTag string 消息标签

4.1 定义生产者相关信息

在 DemoProducer 文件中,我们可以修改 @Producer 注解对应的字段来替换对应的 poolNametopicmessageTag。就是最终投递到消息队列中的数据,所以我们可以随意改写 __construct 方法,只要最后赋值 payload 即可。

使用 @Producer 注解时需 use Zhen\HyperfRocketMQ\Annotation\Producer; 命名空间;

<?php
declare(strict_types=1);

namespace App\Test\Queue\Producer;

use Zhen\HyperfRocketMQ\Annotation\Producer;
use Zhen\HyperfRocketMQ\Message\ProducerMessage;

#[Producer(topic:"Topic_03_test", messageTag:"tMsgKey")]
class DemoProducer extends ProducerMessage
{
    public function __construct(array $data)
    {
        // 设置消息内容
        $this->setPayload($data);
        // 自定义messageKey(不定义,会自动生成)
        $this->setMessageKey('xxxxx');
    }
}

4.2 普通投递方式

通过Zhen\HyperfRocketMQ\Producer实例,即可投递消息。

<?php

declare(strict_types=1);

namespace App\Controller;

use App\Producer\DemoProducer;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\RequestMapping;
use Zhen\HyperfRocketMQ\Producer;

#[Controller]
class IndexController extends AbstractController
{
    #[Inject(Producer::class)]
    protected Producer $producer;

    #[RequestMapping("index")]
    public function index()
    {
        $message = new DemoProducer(['a' => 1, 'b' =>2]);
        $this->producer->produce($message);
        return $this->response->json([]);
    }
}

4.3 消息可靠投递方式

目前,消息投递时Rocketmq返回成功响应,就视为投递成功(暂不考虑Rocketmq缓存丢失的问题)。

实现原理:先将需要投递的消息入库处理,然后再进行发送操作

  1. 执行以下命令,生成相关的数据表

    php bin/hyperf.php migrate --path=migrations/rocketmq
  2. 使用示例

    $demoProducer = new BarProducer(['test' => 12345, 'name' => '张三1231']);
    
    Db::beginTransaction();
    try{
        // todo 业务逻辑
    
        // 记录消息状态
        $demoProducer->saveMessageStatus();
        Db::commit();
    } catch(\Throwable $ex){
        Db::rollBack();
    }
    
    // 推送消息
    $this->producer->produce($demoProducer);
  3. 投递失败的消息,可以通过守护进程监听 mq_status_log 数据表status不等于3的消息,进行重新投递(后面实现)

5、消息消费

Consumer注解属性说明

属性 类型 描述 默认值
name string 消费名称
poolName string 连接池名称。对应配置文件 rocketmq.php 中的key default
topic string topic
groupId string 消费组id
messageTag string 消息标签
numOfMessage int 每次拉取消息数 3
waitSeconds int 轮询等待时间 3
processNums int 启动消费进程数 1
enable bool 是否初始化启动进程 true

在 DemoConsumer文件中,我们可以修改 @Consumer 注解对应的字段来替换对应的 topicgroupIdmessageTag

使用 @Consumer 注解时需 use Zhen\HyperfRocketMQ\Annotation\Consumer; 命名空间;

use Zhen\HyperfRocketMQ\Annotation\Consumer;
use Zhen\HyperfRocketMQ\Library\Model\Message as RocketMQMessage;
use Zhen\HyperfRocketMQ\Message\ConsumerMessage;
use Zhen\HyperfRocketMQ\Result;

#[Consumer(topic: "Topic_03_test", groupId: "test_test", messageTag: "tMsgKey||tMsgKey_bar")]
class DemoCounser extends ConsumerMessage
{
    public function consumeMessage(RocketMQMessage $rocketMQMessage): string
    {
        $msgTag = $rocketMQMessage->getMessageTag(); // 消息标签
        $msgKey = $rocketMQMessage->getMessageKey(); // 消息唯一标识
        $msgBody = $rocketMQMessage->toArray(); // 消息体
        $msgId = $rocketMQMessage->getMessageId();

        var_dump('消费端接收到消息:', $msgBody);

        return Result::ACK;
    }
}

6、事件说明

下面事件都在 Zhen\HyperfRocketMQ\Event 命名空间下

事件 说明
AfterProduce 消息生产成功触发的事件
BeforeConsume 开启消费前触发的事件
AfterConsume 成功消费后触发的事件
FailToConsume 消费失败触发的事件
MIT License Copyright (c) 2022 zhen Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

基于阿里云 rocketmq SDK 封装,实现了协程、连接池、消息可靠投递 等功能 展开 收起
PHP
MIT
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
PHP
1
https://gitee.com/easy_code/hyperf-rocketmq.git
git@gitee.com:easy_code/hyperf-rocketmq.git
easy_code
hyperf-rocketmq
hyperf-rocketmq
master

搜索帮助