1 Star 0 Fork 0

imi 开发组 / imi-queue

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

imi-queue

Latest Version Php Version Swoole Version IMI License

介绍

imi 框架的队列组件,使用 Redis 作为持久化

支持的特性:

  • 消息持久化
  • 分布式
  • 延时消息
  • ACK 机制
  • 消费超时机制
  • 失败/超时消息重新消费

Composer

本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:

{
    "require": {
        "imiphp/imi-queue": "^1.0.0"
    }
}

然后执行 composer update 安装。

使用说明

可以参考 exampletests 目录示例。

项目配置文件:

[
    'components'    =>  [
        'Queue'  =>  'Imi\Queue',
    ],
    'beans' =>  [
        'AutoRunProcessManager' =>  [
            'processes' =>  [
                // 加入队列消费进程,非必须,你也可以自己写进程消费
                'QueueConsumer',
            ],
        ],
        'imiQueue'  =>  [
            // 默认队列
            'default'   =>  'test1',
            // 队列列表
            'list'  =>  [
                // 队列名称
                'test1' =>  [
                    // 使用的队列驱动
                    'driver'        =>  \Imi\Queue\Driver\RedisQueueDriver::class,
                    // 消费协程数量
                    'co'            =>  1,
                    // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
                    'process'       =>  1,
                    // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
                    'timespan'      =>  0.1,
                    // 进程分组名称
                    'processGroup'  =>  'a',
                    // 自动消费
                    'autoConsumer'  =>  true,
                    // 消费者类
                    'consumer'      =>  'AConsumer',
                    // 驱动类所需要的参数数组
                    'config'        =>  [
                        // redis 连接池名
                        'poolName'  =>  'redis',
                        // redis 键前缀
                        'prefix'    =>  'imi:queue:test:',
                        // 消费循环尝试 pop 的时间间隔,单位:秒(手动调用pop()方法有效)
                        'timespan'  =>  0.1,
                    ]
                ],
            ],
        ],
    ]
]

消费者类

<?php
namespace ImiApp\Consumer;

use Imi\Log\Log;
use Imi\Bean\Annotation\Bean;
use Imi\Queue\Contract\IMessage;
use Imi\Queue\Driver\IQueueDriver;
use Imi\Queue\Service\BaseQueueConsumer;

/**
 * @Bean("AConsumer")
 */
class AConsumer extends BaseQueueConsumer
{
    /**
     * 处理消费
     * 
     * @param \Imi\Queue\Contract\IMessage $message
     * @param \Imi\Queue\Driver\IQueueDriver $queue
     * @return void
     */
    protected function consume(IMessage $message, IQueueDriver $queue)
    {
        Log::info(sprintf('[%s]%s:%s', $queue->getName(), $message->getMessageId(), $message->getMessage()));
        $queue->success($message);
    }

}

获取队列对象

use \Imi\Queue\Facade\Queue;
$queue = Queue::getQueue('队列名称');

推送消息到队列

返回消息ID

$message = new \Imi\Queue\Model\Message;
$message->setMessage('字符串的消息内容');
$message->setWorkingTimeout(0); // 设置工作超时时间,单位:秒,为0不限制
$queue->push($message);
// 延时消息,单位:秒
$queue->push($message, 1.5);

从队列弹出一个消息

$message = $queue->pop();
if(null !== $message)
{
    // 将消息标记为成功
    $queue->success($message);

    // 将消息标记为失败
    $queue->fail($message);

    // 将消息标记为失败,并重回队列
    $queue->fail($message, true);
}

删除消息

$message = $queue->pop();
if(null !== $message)
{
    $queue->delete($message);
}

清空队列

use \Imi\Queue\Enum\QueueType;

$queue->clear(); // 清空全部

// 清空指定类型
$queue->clear([
    QueueType::READY,   // 准备就绪
    QueueType::WORKING, // 工作中
    QueueType::FAIL,    // 失败
    QueueType::TIMEOUT, // 超时
    QueueType::DELAY,   // 准备就绪延时
]);

获取队列状态

// 返回 \Imi\Queue\Model\QueueStatus 类型
$status = $queue->status();
$status->getReady();    // 准备就绪数量
$status->getWorking();  // 工作中数量
$status->getFail();     // 失败数量
$status->getTimeout();  // 超时数量
$status->getDelay();    // 延时数量

将失败消息恢复到队列

让失败消息可以被重新消费

$queue->restoreFailMessages();

将超时消息恢复到队列

让超时消息可以被重新消费

$queue->restoreTimeoutMessages();

命令行工具

获取队列状态

命令:bin/imi queue/status -queue 队列名称

返回 JSON:

{
    "ready": 0,
    "working": 0,
    "fail": 0,
    "timeout": 0,
    "delay": 0
}

将失败消息恢复到队列

命令:bin/imi queue/restoreFail -queue 队列名称

返回恢复的消息数量:

0

将超时消息恢复到队列

命令:bin/imi queue/restoreTimeout -queue 队列名称

返回恢复的消息数量:

0

免费技术支持

QQ群:17916227 点击加群,如有问题会有人解答和修复。

运行环境

版权信息

imi-queue 遵循 MIT 开源协议发布,并提供免费使用。

捐赠

开源不求盈利,多少都是心意,生活不易,随缘随缘……

MIT License Copyright (c) 2019 imi 开发组 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

暂无描述 展开 收起
MIT
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/imiphp/imi-queue.git
git@gitee.com:imiphp/imi-queue.git
imiphp
imi-queue
imi-queue
master

搜索帮助