1 Star 0 Fork 51

haohailuo / KMQueue

forked from m631521383 / KMQueue 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

KMQueue

Build Status

该框架是基于redis实现的分布式队列,简单灵活。

下面简单介绍下该队列的一些设计,如果还有其他不懂得地方可以参考源码和注释,代码中我加入了详尽的注释。

还有其他问题可以提issue。

更新历史

2018年1月23日:新增健康检测 防止执行耗时久的任务被备份队列监听检测到并当作失败任务重试。 但需要用户自行实现健康检测的逻辑,后续考虑通过zookeeper实现健康上报。

设计

序列图

基于Redis的分布式消息队列设计.png

队列模式

KMQueue队列分为两种模式:

  • default - 简单队列
  • safe - 安全队列

其中默认为default

可以以queueName:queueMode格式设置队列的模式。

  • queueName 队列名称

    default 为默认队列,可以不指定,默认值。 特性:队列任务可能会丢失,队列任务没有超时限制。

  • queueMode 队列模式,可选值有:default、safe。

    safe 为安全队列,任务有重试策略,达到重试次数依旧失败或者任务存活超时(这里说的超时是指AliveTimeout)(这两者都称为最终失败),Monitor会发出通知, 这样可以根据业务做一些处理,推荐将这些失败的任务持久化到数据库作为日志记录。当然或许你还有更好的处理方式。

    注意:需要开启备份队列监听程序BackupQueueMonitor,否则安全队列中最终失败的任务只会存储在备份队列中,而没有消费者去消费处理,这是很危险的行为

new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
...

worker1_queue为简单队列,worker2_queue为安全队列。

注意:为了更好的支持业务(将已存在的某个队列的DEFAULT改为SAFE,并重启服务的情况),做如下处理: 当new KMQueueManager.Builder队列名称参数中,只要有一个队列指定了SAFE模式,就会创建备份队列(用于队列任务监控,设置任务超时、失败任务重试等), 并且该备份队列的名称基于传入的所有队列名称生成(无论其队列是否是SAFE模式)。

上面的例子中,备份队列的生成策略为:

base64(md5("worker1_queue" + "worker2_queue"))

Task(任务)

构造方法声明如下:

public Task(String queue,
            String uid,
            boolean isUnique,
            String type,
            String data,
            Task.TaskStatus status)
  • uid:如果业务需要区分队列任务的唯一性,请自行生成uid参数, 否则队列默认使用uuid生成策略,这会导致即使data数据完全相同的任务也会被当作两个不同的任务处理。

  • 是否是唯一任务,即队列中同一时刻只存在一个该任务。

  • type:用于业务逻辑的处理,你可以根据不同的type任务类型,调用不同的handler去处理,可以不传。

KMQueueManager(队列管理器)

有三种方式获取Redis连接,详情查看KMQueueManager.Builder构造方法的三种重载形式。 如果你使用spring,建议获取spring中配置的redis连接池对象,并通过如下构造方法创建队列管理器:

public Builder(Pool<Jedis> pool, String... queues)

RedisTaskQueue(任务队列)

  • 1.采用阻塞队列,以阻塞的方式(brpop)获取任务队列中的任务;
  • 2.判断任务存活时间是否超时(对应的是大于aliveTimeout);
  • 3.更新任务的执行时间戳,放入备份队列的队首(lpush);

BackupQueueMonitor(备份队列监控)

因为初始化备份队列时设置了循环标记; 所以Monitor这里采用定时Job策略,使用brpoplpush backupQueue backupQueue循环遍历备份队列,遇到循环标记结束循环遍历。 对执行超时(对应的是大于protectedTimeout)或者存活时间超时(对应的是大于aliveTimeout)的任务做处理。

分为两种情况:

  • 任务存活时间超时 || (任务执行超时&任务重试次数大于RetryTimes):任务不再重试从备份队列删除该任务。 相应的可以通过实现Pipeline,决定这些任务的一些额外处理,比如持久化到数据库做日志记录。
    // 任务彻底失败后的处理,需要实现Pipeline接口,自行实现处理逻辑
    TaskPipeline taskPipeline = new TaskPipeline();
    BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
                    ...
                    .setPipeline(taskPipeline).build();
  • 任务执行超时&任务重试次数小于RetryTimes:即超时并且重复执行次数小于RetryTimes的任务重新放回任务队列执行,同时更新任务状态:
    • 放入任务队列,优先处理();
    • 任务state标记为"retry";
    • 重试次数+1;

健康检查

使用方式:

// 健康检测
MyAliveDetectHandler detectHandler = new MyAliveDetectHandler();
...
// 构造Monitor监听器
BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
        ...
        .registerAliveDetectHandler(detectHandler)
        .build();
// 执行监听
backupQueueMonitor.monitor();

registerAliveDetectHandler()方法可以设置Null,则不会启用健康检测。

检查正在执行的任务是否还在执行(存活),

为了防止耗时比较久的任务(任务的执行时间超出了通过队列管理器配置的任务执行超时时间 - 默认值:com.kingsoft.wps.mail.queue.config.Constant.PROTECTED_TIMEOUT) 会被备份队列监听器检测到并重新放入任务队列执行(因为备份队列监听器会把超出通过队列管理器配置的任务执行超时时间的任务当作是失败的任务(参考 什么是失败任务?)并进行重试)。

通过这种检测机制,可以保证check(Task)返回为true的任务(任务还在执行)不会被备份队列监听器重新放入任务队列重试。 这里只是提供一个接口,用户需要自己实现执行任务的健康检测。

目前健康检测机制还只是处于初步阶段,核心的检测逻辑还需要用户自行实现,这里只是提供一个接口。

一个比较简单的实现方式就是起一个定时job,每隔n毫秒检查线程中正在执行任务的状态,在redis中以 "任务的id + ALIVE_KEY_SUFFIX" 为key,ttl 为 n+m 毫秒(m < n, m用于保证两次job的空窗期),标记正在执行的任务。 然后AliveDetectHandler的实现类根据task去检查redis中是否存在该key,如果存在,返回true

使用Demo

生产任务

@Test
public void pushTaskTest() {
    KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
            .setMaxWaitMillis(-1L)
            .setMaxTotal(600)
            .setMaxIdle(300)
            .setAliveTimeout(Constant.ALIVE_TIMEOUT)
            .build();
    // 初始化队列
    kmQueueManager.init();

    // 1.获取队列
    TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue");
    // 2.创建任务
    JSONObject ob = new JSONObject();
    ob.put("data", "mail proxy task");
    String data = JSON.toJSONString(ob);
    // 参数 uid:如果业务需要区分队列任务的唯一性,请自行生成uid参数,
    // 否则队列默认使用uuid生成策略,这会导致即使data数据完全相同的任务也会被当作两个不同的任务处理。
    // 参数 type:用于业务逻辑的处理,你可以根据不同的type任务类型,调用不同的handler去处理,可以不传。
    Task task = new Task(taskQueue.getName(), "", true, "", data, new Task.TaskStatus());
    // 3.将任务加入队列
    taskQueue.pushTask(task);
}

消费任务

@Test
public void popTaskTest() {
    KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe")
            .setMaxWaitMillis(-1L)
            .setMaxTotal(600)
            .setMaxIdle(300)
            .setAliveTimeout(Constant.ALIVE_TIMEOUT)
            .build();
    // 初始化队列
    kmQueueManager.init();

    // 1.获取队列
    TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker1_queue");
    // 2.获取任务
    Task task = taskQueue.popTask();
    // 业务处理放到TaskConsumersHandler里
    if (task != null) {
        task.doTask(kmQueueManager, MyTaskHandler.class);
    }
}

你可以自行实现TaskHandler接口,创建适合你自己业务逻辑的任务处理类,并通过下面代码执行任务处理。

task.doTask(kmQueueManager, TaskHandler.class)

此外,doTask方法还支持业务传参,通过第三个参数实现params

task.doTask(kmQueueManager, TaskHandler.class, params)

如果业务处理抛出异常,队列也将其当作任务执行完成处理,

并通过taskQueue.finishTask(this)完成任务。

public void doTask(KMQueueManager kmQueueManager, Class clazz, Object... params) {

    // 获取任务所属队列
    TaskQueue taskQueue = kmQueueManager.getTaskQueue(this.getQueue());
    String queueMode = taskQueue.getMode();
    if (KMQueueManager.SAFE.equals(queueMode)) {// 安全队列
        try {
            handleTask(clazz, params);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        // 任务执行完成,删除备份队列的相应任务
        taskQueue.finishTask(this);
    } else {// 普通队列
        handleTask(clazz);
    }
}

不会再进行任务重试操作。

这点可能不太容易理解,为什么任务抛出异常失败了,队列不会执行重试呢?

因为任务执行抛出异常是业务级的错误,队列不做干预。

队列的重试只是针对消费任务的线程被kill掉或者服务器宕机等情况,此时该任务还没执行完,任务的消费者还没告诉队列任务执行完成了。 此时备份队列监控会执行任务的重试。

如果你想在任务抛出异常失败时执行任务重试,可以不使用task.doTask,当任务抛出异常时,不执行任务的taskQueue.finishTask(this)操作。 这样备份队列监控会在下一个job对该任务进行检查处理。

taskQueue.finishTask(this)是一个非常方便的工具方法。

备份队列监控

@Test
public void monitorTaskTest() {

    // 健康检测
    MyAliveDetectHandler detectHandler = new MyAliveDetectHandler();
    // 任务彻底失败后的处理,需要实现Pipeline接口,自行实现处理逻辑
    MyPipeline pipeline = new MyPipeline();
    // 根据任务队列的名称构造备份队列的名称,注意:这里的任务队列参数一定要和KMQueueManager构造时传入的一一对应。
    String backUpQueueName = KMQUtils.genBackUpQueueName("worker1_queue", "worker2_queue:safe");
    // 构造Monitor监听器
    BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName)
            .setMaxWaitMillis(-1L)
            .setMaxTotal(600)
            .setMaxIdle(300)
            .setAliveTimeout(Constant.ALIVE_TIMEOUT)
            .setProtectedTimeout(Constant.PROTECTED_TIMEOUT)
            .setRetryTimes(Constant.RETRY_TIMES)
            .registerAliveDetectHandler(detectHandler)
            .setPipeline(pipeline).build();
    // 执行监听
    backupQueueMonitor.monitor();
}

重要的事情说三遍:

如果指定了队列的模式为安全队列,一定要开启备份队列监控!!!一定要开启备份队列监控!!!一定要开启备份队列监控!!!

QA

什么是失败任务?

任务执行抛出异常是业务级的错误,队列不做干预,队列依旧把它当作是成功的任务。

队列的重试只是针对消费任务的线程被kill掉或者服务器宕机等情况,此时该任务还没执行完,任务的消费者还没告诉队列任务执行完成了。 此时备份队列监控会执行任务的重试。在这种情况下,任务才能定义为失败任务。

Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

基于Redis实现的分布式队列,队列任务监控,任务超时、失败任务重试等特性 展开 收起
Java
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/laurence/KMQueue.git
git@gitee.com:laurence/KMQueue.git
laurence
KMQueue
KMQueue
master

搜索帮助

14c37bed 8189591 565d56ea 8189591