yamq is a single-header library that implements a message queue framework.
#include <yamq.h>
Create an Observer class that inherits from ObserverBase and overrides Invoke.
class Observer : public yamq::ObserverBase {
public:
    Observer() = default;
    ~Observer() override = default;
    void Invoke(const yamq::KVdb &kvdb) override {
        std::cout << kvdb[""] << std::endl;
    }
};
Then create a PubSub instance,
yamq::PubSub pubsub;
and an Observer instance.
Observer obi;
After that, register this observer instance.
pubsub.Subscribe("", obi);
Finally, broadcast a message of your own by calling yamq::PubSub::Publish.
yamq::KVdb kvdb;
kvdb[""] = "Resolved.";
pubsub.Publish("", kvdb);
To subscribe to a different message channel, change the first argument passed to Subscribe.
The observer then receives only messages published with the same first argument to Publish.
pubsub.Subscribe("other", obi);
pubsub.Publish("other", kvdb);
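The channel matching described above can be illustrated with a minimal, self-contained sketch. It does not use yamq: `ToyPubSub`, `Message`, `Handler`, and `CountDeliveries` are hypothetical names made up for this example, and the real library may route messages differently.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for illustration only; this is NOT yamq's implementation.
using Message = std::map<std::string, std::string>;
using Handler = std::function<void(const Message &)>;

class ToyPubSub {
public:
    // Register a handler under a channel name.
    void Subscribe(const std::string &channel, Handler handler) {
        handlers_[channel].push_back(std::move(handler));
    }

    // Call only the handlers registered on this channel.
    // Returns how many handlers were called.
    std::size_t Publish(const std::string &channel, const Message &msg) const {
        auto it = handlers_.find(channel);
        if (it == handlers_.end()) return 0;
        for (const auto &handler : it->second) handler(msg);
        return it->second.size();
    }

private:
    std::map<std::string, std::vector<Handler>> handlers_;
};

// Subscribes one handler to "other", then publishes on two channels.
// Returns the number of messages the handler actually received.
int CountDeliveries() {
    ToyPubSub pubsub;
    int received = 0;
    pubsub.Subscribe("other", [&received](const Message &) { ++received; });
    Message msg{{"", "Resolved."}};
    pubsub.Publish("other", msg);  // delivered: channel matches
    pubsub.Publish("", msg);       // not delivered: different channel
    return received;
}
```

In this sketch the handler sees only the message published on "other"; the one published on the empty-string channel is dropped because no handler is registered there.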
When an observer object is destroyed, ObserverBase automatically notifies all related
PubSub objects to deregister it.
You can also call DisconnectAll or Disconnect to unbind manually from every PubSub object
or from one specific PubSub object.
obi.DisconnectAll();
obi.Disconnect(&pubsub);
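Automatic deregistration on destruction is a common RAII pattern. The following is a rough sketch of that general pattern, not of yamq's internals: `ToyHub`, `ToyObserver`, and `CountAfterScopeExit` are hypothetical names, and the sketch assumes each hub outlives its observers.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical names; this shows the general pattern, NOT yamq's internals.
class ToyHub;

class ToyObserver {
public:
    // Destruction unbinds the observer from every hub it joined.
    ~ToyObserver() { DisconnectAll(); }
    void Disconnect(ToyHub *hub);
    void DisconnectAll();

private:
    friend class ToyHub;
    std::vector<ToyHub *> hubs_;  // hubs this observer is registered with
};

class ToyHub {
public:
    // Record the link on both sides so either side can break it later.
    void Subscribe(ToyObserver &ob) {
        observers_.push_back(&ob);
        ob.hubs_.push_back(this);
    }
    std::size_t Count() const { return observers_.size(); }

private:
    friend class ToyObserver;
    void Remove(ToyObserver *ob) {
        observers_.erase(std::remove(observers_.begin(), observers_.end(), ob),
                         observers_.end());
    }
    std::vector<ToyObserver *> observers_;
};

inline void ToyObserver::Disconnect(ToyHub *hub) {
    hub->Remove(this);
    hubs_.erase(std::remove(hubs_.begin(), hubs_.end(), hub), hubs_.end());
}

inline void ToyObserver::DisconnectAll() {
    for (ToyHub *hub : hubs_) hub->Remove(this);
    hubs_.clear();
}

// Returns the hub's observer count after a scoped observer is destroyed.
std::size_t CountAfterScopeExit() {
    ToyHub hub;
    {
        ToyObserver ob;
        hub.Subscribe(ob);
    }  // ob is destroyed here and deregisters itself
    return hub.Count();
}
```

Because the observer keeps a list of the hubs it joined, its destructor can remove it from each one, so the hub never holds a dangling pointer to a destroyed observer.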
Before destroying an observer, make sure its tasks have stopped; otherwise problems that are hard to predict may occur.
Call WaitAndDisable before destruction to avoid this.
obk->WaitAndDisable();
delete obk;
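One plausible reading of this step is "refuse new calls, then block until running ones finish". The sketch below shows that idea with a mutex and a condition variable. It is an assumption about the intent, not a description of how yamq implements WaitAndDisable; `ToyTarget`, `TryEnter`, `Leave`, and `DemoWaitAndDisable` are hypothetical names.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical illustration; NOT yamq's implementation of WaitAndDisable.
class ToyTarget {
public:
    // A worker calls this before running a task; fails once disabled.
    bool TryEnter() {
        std::lock_guard<std::mutex> lock(mu_);
        if (disabled_) return false;
        ++in_flight_;
        return true;
    }

    // A worker calls this when its task is done.
    void Leave() {
        std::lock_guard<std::mutex> lock(mu_);
        --in_flight_;
        cv_.notify_all();
    }

    // Refuse new tasks, then block until every running task has left.
    void WaitAndDisable() {
        std::unique_lock<std::mutex> lock(mu_);
        disabled_ = true;
        cv_.wait(lock, [this] { return in_flight_ == 0; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int in_flight_ = 0;
    bool disabled_ = false;
};

// Simulates one running task, waits for it, then destroys the target.
// Returns true if the task was admitted and later calls were rejected.
bool DemoWaitAndDisable() {
    ToyTarget *target = new ToyTarget;
    bool entered = target->TryEnter();  // a task has started
    std::thread worker([target] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        target->Leave();  // the task finishes a little later
    });
    target->WaitAndDisable();            // blocks until the worker leaves
    bool rejected = !target->TryEnter(); // new tasks are now refused
    worker.join();
    delete target;                       // safe: nothing is using it anymore
    return entered && rejected;
}
```

The order matters: the object is deleted only after the wait returns and the worker thread has been joined, so no task can touch freed memory.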
Putting it all together:
#include <iostream>
#include <string>
#include <yamq.h>

// Writes a line to stdout, prefixed with a fixed prompt.
class ConsoleOut {
private:
    std::string prompt_;
public:
    explicit ConsoleOut(const std::string &prompt = "") : prompt_(prompt) {}
    void WriteOut(const std::string &str) {
        std::cout << prompt_ << ": " << str << std::endl;
    }
};

// Observer that forwards each received message to a ConsoleOut.
template <class T>
class Observer : public yamq::ObserverBase {
private:
    ConsoleOut &console_out_;
public:
    explicit Observer(ConsoleOut &CO) : console_out_(CO) {}
    void Invoke(const yamq::KVdb &kvdb) override {
        console_out_.WriteOut(kvdb[""] + kvdb["i"]);
    }
    ~Observer() override = default;
};

int main() {
    ConsoleOut co("Test");
    ConsoleOut ck("Echo");
    Observer<std::string> obi(co);
    Observer<long> *obk = new Observer<long>(ck);
    yamq::PubSub pubsub;
    pubsub.Subscribe("aaa", obi);
    pubsub.Subscribe("aaa", *obk);
    pubsub.Subscribe("bbb", [](const yamq::KVdb &kvdb){
        std::cout << "Lambda received: " << kvdb[""] << std::endl;
    });
    bool obi_reg = true;
    yamq::KVdb kvdb;
    kvdb[""] = "Resolved.";
    int i = 0;
    while (i++ < 10000) {
        kvdb["i"] = std::to_string(i);
        pubsub.Publish("aaa", kvdb);
        // Destroy the heap-allocated observer once, after its tasks have stopped.
        if ((i >= 5000) && obk) {
            obk->WaitAndDisable();
            delete obk;
            obk = nullptr;
        }
        // Manually unbind the stack-allocated observer once.
        if (obi_reg && (i >= 1000)) {
            obi.Disconnect(&pubsub);
            obi_reg = false;
        }
    }
    return 0;
}
This is a header-only library.