
zhaoyao / yamq

forked from RonxBulld / yamq 
RonxBulld committed on 2021-05-02 16:03: Fix some bugs, and add README.

yamq - Yet Another Message Queue

yamq is a single-header library that implements a message queue framework.

Basics

#include <yamq.h>

Create an Observer class that inherits from yamq::ObserverBase and overrides Invoke.

class Observer : public yamq::ObserverBase {
public:
    Observer() = default ;
    ~Observer() override = default ;
    void Invoke(const yamq::KVdb &kvdb) override {
        std::cout << kvdb[""] << std::endl;
    }
};

Then, create a PubSub instance,

yamq::PubSub pubsub;

and an Observer instance.

Observer obi;

After that, register this observer instance.

pubsub.Subscribe("", obi);

Finally, broadcast a message by calling yamq::PubSub::Publish.

yamq::KVdb kvdb;
kvdb[""] = "Resolved.";
pubsub.Publish("", kvdb);

To subscribe to a different message channel, change the first argument passed to Subscribe.

Likewise, an observer only receives messages whose channel name, the first argument of Publish, matches the one it subscribed to.

pubsub.Subscribe("other", obi);
pubsub.Publish("other", kvdb);
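
The routing rule above can be illustrated with a small self-contained sketch. It does not use yamq and is not yamq's implementation: `MiniPubSub`, `Message`, `Handler`, and `RoutingDemo` are hypothetical names, and the sketch assumes that delivery is synchronous and that channels are matched by exact name.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in types; these are NOT yamq's classes.
using Message = std::map<std::string, std::string>;
using Handler = std::function<void(const Message &)>;

class MiniPubSub {
public:
    // Register a handler under a channel name.
    void Subscribe(const std::string &channel, Handler handler) {
        assert(handler && "handler must be callable");
        handlers_[channel].push_back(std::move(handler));
    }

    // Deliver msg to the handlers of exactly this channel.
    // Returns how many handlers were called.
    std::size_t Publish(const std::string &channel, const Message &msg) const {
        auto it = handlers_.find(channel);
        if (it == handlers_.end()) return 0;
        for (const Handler &h : it->second) h(msg);
        return it->second.size();
    }

private:
    std::map<std::string, std::vector<Handler>> handlers_;
};

// Counts deliveries per channel to show that routing is by exact name.
std::vector<int> RoutingDemo() {
    MiniPubSub hub;
    int on_default = 0;
    int on_other = 0;
    hub.Subscribe("", [&](const Message &) { ++on_default; });
    hub.Subscribe("other", [&](const Message &) { ++on_other; });

    Message msg{{"", "Resolved."}};
    hub.Publish("", msg);         // reaches only the "" subscriber
    hub.Publish("other", msg);    // reaches only the "other" subscriber
    hub.Publish("other", msg);
    hub.Publish("unknown", msg);  // no subscribers, nothing is called
    return {on_default, on_other};
}
```

Calling `RoutingDemo()` returns one count per channel, so a message published on "other" never reaches the handler registered on "".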

Deregister observer

When the observer object is released, ObserverBase will automatically notify all related PubSub objects to deregister it.

You can also call Disconnect or DisconnectAll to unbind the observer manually, either from one PubSub object or from all of them.

obi.DisconnectAll();
obi.Disconnect(&pubsub);
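
One way such automatic deregistration can work is for each observer to remember the hubs it has joined and to remove itself from them in its destructor. The sketch below assumes that design; `AutoObserver`, `Hub`, and `DeregisterDemo` are hypothetical names, and the code is not taken from yamq.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <vector>

class Hub;  // forward declaration

// Hypothetical observer that tracks the hubs it is attached to.
class AutoObserver {
public:
    AutoObserver() = default;
    AutoObserver(const AutoObserver &) = delete;
    AutoObserver &operator=(const AutoObserver &) = delete;
    ~AutoObserver() { DisconnectAll(); }  // automatic deregistration

    void Disconnect(Hub *hub);  // leave one hub
    void DisconnectAll();       // leave every hub

private:
    friend class Hub;
    std::set<Hub *> hubs_;
};

// Hypothetical hub that holds non-owning pointers to its observers.
class Hub {
public:
    Hub() = default;
    Hub(const Hub &) = delete;
    Hub &operator=(const Hub &) = delete;
    // Make sure surviving observers forget this hub.
    ~Hub() {
        for (AutoObserver *o : observers_) o->hubs_.erase(this);
    }

    void Subscribe(AutoObserver &obs) {
        observers_.insert(&obs);
        obs.hubs_.insert(this);
    }
    std::size_t Count() const { return observers_.size(); }

private:
    friend class AutoObserver;
    std::set<AutoObserver *> observers_;
};

inline void AutoObserver::Disconnect(Hub *hub) {
    assert(hub != nullptr);
    if (hubs_.erase(hub) > 0) hub->observers_.erase(this);
}

inline void AutoObserver::DisconnectAll() {
    for (Hub *hub : hubs_) hub->observers_.erase(this);
    hubs_.clear();
}

// Records observer counts after a manual and an automatic disconnect.
std::vector<std::size_t> DeregisterDemo() {
    Hub a;
    Hub b;
    std::vector<std::size_t> counts;
    {
        AutoObserver obs;
        a.Subscribe(obs);
        b.Subscribe(obs);
        obs.Disconnect(&a);           // manual: leaves only hub a
        counts.push_back(a.Count());
        counts.push_back(b.Count());
    }                                 // obs destroyed: leaves hub b too
    counts.push_back(b.Count());
    return counts;
}
```

Because both sides keep a back-reference, neither a hub nor an observer is left holding a dangling pointer, whichever one is destroyed first.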

On destruction

Before an observer is destroyed, make sure its pending tasks have stopped; otherwise unpredictable problems may occur.

Call WaitAndDisable on the observer before destroying it to avoid this.

obk->WaitAndDisable();
delete obk;
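
The README does not describe how WaitAndDisable works internally. As a rough illustration of the general "refuse new work, then wait for running work" pattern, here is a minimal sketch under that assumption; `DrainGate` and `DrainDemo` are hypothetical names and none of this is yamq's code.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical gate that counts callbacks currently running.
class DrainGate {
public:
    // Try to start a callback; fails once the gate is disabled.
    bool Enter() {
        std::lock_guard<std::mutex> lock(mu_);
        if (disabled_) return false;
        ++in_flight_;
        return true;
    }

    // Mark a previously entered callback as finished.
    void Leave() {
        std::lock_guard<std::mutex> lock(mu_);
        assert(in_flight_ > 0);
        if (--in_flight_ == 0) cv_.notify_all();
    }

    // Refuse new callbacks, then block until running ones finish.
    void WaitAndDisable() {
        std::unique_lock<std::mutex> lock(mu_);
        disabled_ = true;
        cv_.wait(lock, [this] { return in_flight_ == 0; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int in_flight_ = 0;
    bool disabled_ = false;
};

// Returns true if the running callback had finished when WaitAndDisable
// returned and a later Enter was refused.
bool DrainDemo() {
    DrainGate gate;
    std::atomic<bool> finished{false};
    // Enter on this thread so the callback is in flight before the wait.
    bool entered = gate.Enter();
    std::thread worker([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        finished = true;  // simulated callback work is done
        gate.Leave();
    });
    gate.WaitAndDisable();  // blocks until the worker calls Leave
    bool done_before_return = finished.load();
    bool refused = !gate.Enter();
    worker.join();
    return entered && done_before_return && refused;
}
```

Only after a call like `WaitAndDisable` returns is it safe to destroy the object the callbacks use, which is why the snippet above calls it before `delete`.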

Example

Putting it all together:


#include <iostream>
#include <string>
#include <yamq.h>

class ConsoleOut {
private:
    std::string prompt_;
public:
    explicit ConsoleOut(const std::string &prompt = "") : prompt_(prompt) {}
    void WriteOut(const std::string &str) {
        std::cout << prompt_ << ": " << str << std::endl;
    }
};

template <class T>
class Observer : public yamq::ObserverBase {
private:
    ConsoleOut &console_out_;

public:
    explicit Observer(ConsoleOut &CO) : console_out_(CO) {}

    void Invoke(const yamq::KVdb &kvdb) override {
        console_out_.WriteOut(kvdb[""] + kvdb["i"]);
    }

    ~Observer() override = default ;
};

int main() {
    ConsoleOut co("Test");
    ConsoleOut ck("Echo");

    Observer<std::string> obi(co);
    Observer<long> *obk = new Observer<long>(ck);

    yamq::PubSub pubsub;
    pubsub.Subscribe("aaa", obi);
    pubsub.Subscribe("aaa", *obk);
    pubsub.Subscribe("bbb", [](const yamq::KVdb &kvdb){
        std::cout << "Lambda received: " << kvdb[""] << std::endl;
    });
    bool obi_reg = true;

    yamq::KVdb kvdb;
    kvdb[""] = "Resolved.";

    int i = 0;
    while(i++ < 10000) {
        kvdb["i"] = std::to_string(i);
        pubsub.Publish("aaa", kvdb);
        if ((i >= 5000) && obk) {
            obk->WaitAndDisable();
            delete obk;
            obk = nullptr;
        }
        if (obi_reg && (i >= 1000)) {
            obi.Disconnect(&pubsub);
            obi_reg = false;
        }
    }

    return 0;
}

Linking

yamq is a header-only library, so there is nothing to link against; just include yamq.h.
