当前仓库属于关闭状态,部分功能使用受限,详情请查阅 仓库状态说明
1 Star 0 Fork 7

shengzhe8688 / RabbitmqConnect
关闭

forked from 寻根 / RabbitmqConnect
关闭
 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
RabbitmqConnect.h 3.86 KB
一键复制 编辑 原始数据 按行查看 历史
寻根 提交于 2019-06-11 20:00 . 新文件: Example.cpp
#ifndef XG_RABITTMQCONNECT_H
#define XG_RABITTMQCONNECT_H
////////////////////////////////////////////////////////////////
#include <string>
#include <functional>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#define CHECK_FALSE_RETURN(FUNC) if (FUNC){} else return false;
using namespace std;
class RabbitmqConnect
{
protected:
amqp_socket_t* sock;
amqp_rpc_reply_t res;
amqp_connection_state_t conn;
public:
RabbitmqConnect()
{
res.reply_type = AMQP_RESPONSE_NORMAL;
sock = NULL;
}
~RabbitmqConnect()
{
close();
}
bool getResponse()
{
res = amqp_get_rpc_reply(conn);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
return true;
}
int getErrorCode() const
{
return res.library_error;
}
string getErrorString() const
{
switch (res.reply_type)
{
case AMQP_RESPONSE_NORMAL: return "SUCCESS";
case AMQP_RESPONSE_NONE: return "RESPONSE_NONE";
case AMQP_RESPONSE_LIBRARY_EXCEPTION: return amqp_error_string2(res.library_error);
case AMQP_RESPONSE_SERVER_EXCEPTION:
if (res.reply.id == AMQP_CONNECTION_CLOSE_METHOD || res.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
{
auto msg = ((amqp_connection_close_t*)(res.reply.decoded))->reply_text;
return string((char*)(msg.bytes), (char*)(msg.bytes) + msg.len);
}
return "RESPONSE_SERVER_EXCEPTION";
default: return "UNKNOWN ERROR";
}
}
public:
void close()
{
if (sock)
{
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
sock = NULL;
}
}
bool connect(const string& host, int port, bool ssl = false)
{
close();
conn = amqp_new_connection();
sock = ssl ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
if (sock == NULL)
{
amqp_destroy_connection(conn);
return false;
}
if (amqp_socket_open(sock, host.c_str(), port))
{
amqp_destroy_connection(conn);
sock = NULL;
return false;
}
return true;
}
int send(const string& exchange, const string& qname, const string& data)
{
amqp_basic_properties_t props;
props.delivery_mode = 2;
props.content_type = amqp_cstring_bytes("text/plain");
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
return amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(qname.c_str()), 0, 0, &props, amqp_cstring_bytes(data.c_str()));
}
bool login(const string& usr, const string& pwd, amqp_sasl_method_enum method = AMQP_SASL_METHOD_PLAIN)
{
res = amqp_login(conn, "/", 0, 128 * 1024, 0, method, usr.c_str(), pwd.c_str());
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
amqp_channel_open(conn, 1);
return getResponse();
}
bool recv(const string& exchange, const string& qname, std::function<void(const char*, int)> func, int timeout = 5, bool aotodelete = false)
{
timeval tv;
amqp_bytes_t mq = amqp_cstring_bytes(qname.c_str());
amqp_queue_declare(conn, 1, mq, 0, 1, 0, aotodelete ? 1 : 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_queue_bind(conn, 1, mq, amqp_cstring_bytes(exchange.c_str()), mq, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_basic_consume(conn, 1, mq, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
tv.tv_sec = timeout;
tv.tv_usec = 0;
while (true)
{
amqp_envelope_t data;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &data, &tv, 0);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
func((char*)(data.message.body.bytes), data.message.body.len);
amqp_destroy_envelope(&data);
}
return false;
}
};
////////////////////////////////////////////////////////////////
#endif
C++
1
https://gitee.com/shengzhe8688/rabbitmqconnect.git
git@gitee.com:shengzhe8688/rabbitmqconnect.git
shengzhe8688
rabbitmqconnect
RabbitmqConnect
master

搜索帮助