1 Star 0 Fork 0

unitwork / iwechen-thrift

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ClientManager.cpp 9.34 KB
一键复制 编辑 原始数据 按行查看 历史
wshy1121 提交于 2017-12-19 11:45 . mod 使用iwechen配置模块
#include <boost/thread/mutex.hpp>
#include <zlib.h>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include "ClientManager.h"
#include "trace_worker.h"
#include "Base64.h"
#include "conn-pool/ConnPool.h"
#include "JsonHelper.h"
#include "IweChenCfg.h"
// Guards creation of, and every access to, CClientManager::_instance.
static boost::mutex g_insMutexCalc;
// The lazily created singleton; nothing in this file deletes it.
CClientManager* CClientManager::_instance = NULL;

// Returns the process-wide CClientManager, creating it on first use.
//
// The mutex is taken on every call on purpose. _instance is a plain pointer,
// so testing it outside the lock (the "check, lock, check again" shortcut)
// is an unsynchronized read racing with the write done under the lock: a
// second thread may see a non-NULL pointer before the constructed object is
// visible to it. An uncontended lock is cheap compared with that risk.
CClientManager* CClientManager::instance()
{
    boost::unique_lock<boost::mutex> guardMutex(g_insMutexCalc);
    if (NULL == _instance)
    {
        _instance = new CClientManager;
    }
    return _instance;
}
// Builds the manager in three steps: connect the publisher, install the
// built-in handlers ("ping", "websocket", "webview"), then initialise the
// Thrift connection pool together with the handlers listed in the
// configuration.
CClientManager::CClientManager()
    : m_serialNumber(0)
{
    InitPublisher();

    // The built-ins go in before ConnPoolInit(); RegisterMethod overwrites an
    // existing entry, so a configured method with the same name replaces them.
    RegisterMethod("ping", boost::bind(&CClientManager::Ping, this, _1, _2, _3, _4, _5));
    RegisterMethod("websocket", boost::bind(&CClientManager::Websocket, this, _1, _2, _3, _4, _5));
    RegisterMethod("webview", boost::bind(&CClientManager::Webview, this, _1, _2, _3, _4, _5));

    ConnPoolInit();
}
// Destructor. Performs no explicit cleanup of its own; the members are
// released by their own destructors.
CClientManager::~CClientManager()
{
}
void CClientManager::ConnPoolInit()
{
const rapidjson::Value &defaultValue = CWeChenCfg::instance()->Cfg()["THRIFT"]["default"];
CThriftConnPool::instance()->Init(defaultValue);
const rapidjson::Value &serverValue = CWeChenCfg::instance()->Cfg()["THRIFT"]["server"];
for (unsigned int serverIndex=0; serverIndex<serverValue.Size(); ++serverIndex)
{
const rapidjson::Value &configValue = serverValue[serverIndex];
boost::shared_ptr<CThriftClient> thriftClient = boost::shared_ptr<CThriftClient>(new CThriftClient(configValue));
RegisterMethods(configValue["methods"], thriftClient);
m_thriftClientVector.push_back(thriftClient);
}
}
// Registers `method` as the handler for `methodName`, replacing any handler
// already stored under that name.
//
// methodName: NUL-terminated handler name; must not be NULL.
// method:     callable stored in m_methodMap.
// Returns true when the handler was stored, false when methodName is NULL
// (a NULL pointer cannot be turned into a map key, so the table is left
// untouched instead of invoking undefined behaviour).
bool CClientManager::RegisterMethod(const char *methodName, Method method)
{
    if (NULL == methodName)
    {
        return false;
    }
    m_methodMap[methodName] = method;
    return true;
}
// Registers every name in the JSON array `methodsValue` so that it is routed
// to CThriftClient::thriftMethod on `thriftClient`. Each bound handler is
// created from the shared_ptr itself, not from a raw pointer.
void CClientManager::RegisterMethods(const rapidjson::Value &methodsValue, boost::shared_ptr<CThriftClient> &thriftClient)
{
    trace_worker();
    for (rapidjson::Value::ConstValueIterator nameValue = methodsValue.Begin(); nameValue != methodsValue.End(); ++nameValue)
    {
        const char *name = nameValue->GetString();
        trace_printf("methodName %s", name);
        RegisterMethod(name, boost::bind(&CThriftClient::thriftMethod, thriftClient, _1, _2, _3, _4, _5));
    }
}
void CClientManager::InitPublisher()
{ trace_worker();
const rapidjson::Value &redisValue = CWeChenCfg::instance()->Cfg()["REDIS"];
std::string redisHost = redisValue["host"].GetString();
int redisPort = redisValue["port"].GetInt();
m_publisher.connect(redisHost, redisPort);
}
// Copies srcStr into dstStr, dropping every character that occurs in
// cleanChars.
//
// srcStr:     input text (may contain embedded NUL characters).
// cleanChars: set of characters to remove.
// dstStr:     receives the filtered text; may be the same object as srcStr.
//
// The result is built in a local std::string rather than a malloc'ed C
// buffer: there is no unchecked allocation, nothing to free by hand, and the
// output is no longer cut short at the first embedded NUL character.
void CClientManager::CleanStr(const std::string& srcStr, const std::string cleanChars, std::string& dstStr)
{
    std::string kept;
    kept.reserve(srcStr.size());
    for (std::string::size_type i = 0; i < srcStr.size(); ++i)
    {
        if (cleanChars.find(srcStr[i]) == std::string::npos)
        {
            kept.push_back(srcStr[i]);
        }
    }
    // Swap only at the end so that dstStr aliasing srcStr stays safe.
    dstStr.swap(kept);
}
std::string &CClientManager::Compress(const std::string &srcData, std::string &dstData)
{
unsigned long dstBufferLen = srcData.size() + 64;
unsigned char *dstBuffer = (unsigned char *)malloc(dstBufferLen);
::compress(dstBuffer, &dstBufferLen, (const Bytef*)srcData.c_str(), (uLong)srcData.size());
dstData.assign((const char *)dstBuffer, dstBufferLen);
free(dstBuffer);
return dstData;
}
// JSON front end for the string-based thriftMethod overload.
//
// Serialises reqArgsValue, passes the value of its "openId" member (an empty
// string when the member is absent) as the caller id, invokes the handler
// registered for `method`, and parses the textual reply into returnDocument.
// Returns false when the handler call fails, the reply is empty, or the reply
// is not valid JSON.
bool CClientManager::thriftMethod(rapidjson::Document& returnDocument, const std::string& method, const rapidjson::Value& reqArgsValue)
{
    rapidjson::StringBuffer jsonBuffer;

    std::string openId;
    if (reqArgsValue.HasMember("openId"))
    {
        openId = reqArgsValue["openId"].GetString();
    }

    std::string reply;
    if (!thriftMethod(reply, openId, method, CJsonHelper::toString(reqArgsValue, jsonBuffer)))
    {
        return false;
    }
    if (reply.empty())
    {
        return false;
    }
    return !returnDocument.Parse(reply.c_str()).HasParseError();
}
// Invokes the handler registered for `method` in asynchronous mode (the
// handler's last argument is false).
//
// _return: receives the handler's reply.
// openId:  caller identifier forwarded to the handler.
// method:  name used to look the handler up in m_methodMap.
// reqArgs: serialised request arguments forwarded to the handler.
// Returns the handler's result, or false when no usable handler is
// registered under `method`.
//
// The name is tested with count() before operator[] is used: operator[]
// alone would insert an empty entry for every unknown name, growing the
// table on each bad request and modifying it on what should be a read-only
// lookup.
bool CClientManager::thriftMethod(std::string& _return, const std::string &openId, const std::string& method, const std::string& reqArgs)
{
    if (m_methodMap.count(method) == 0)
    {
        return false;
    }
    Method &methodfunction = m_methodMap[method];
    if (methodfunction)
    {
        return methodfunction(_return, openId, method, reqArgs, false);
    }
    return false;
}
// JSON front end for the string-based thriftMethodSyn overload.
//
// Stamps reqArgsValue with a fresh "serialNumber" (pre-incremented from
// m_serialNumber), serialises it, invokes the handler registered for `method`
// in synchronous mode and parses the reply into returnDocument.
//
// returnDocument: receives the parsed reply.
// method:         handler name.
// reqArgsValue:   request object; modified in place (serialNumber is set).
// document:       supplies the allocator used when adding the member.
// Returns true only when the call succeeds and the reply is a JSON object
// whose integer "serialNumber" equals the one that was sent.
bool CClientManager::thriftMethodSyn(rapidjson::Document& returnDocument, const std::string& method, rapidjson::Value& reqArgsValue, rapidjson::Document &document)
{
    trace_worker();
    rapidjson::StringBuffer buffer;
    std::string openId = reqArgsValue.HasMember("openId") ? reqArgsValue["openId"].GetString() : "";

    const int serialNumber = ++m_serialNumber;
    // AddMember never replaces: overwrite an existing member so that a reused
    // request object does not accumulate duplicate "serialNumber" keys.
    if (reqArgsValue.HasMember("serialNumber"))
    {
        reqArgsValue["serialNumber"].SetInt(serialNumber);
    }
    else
    {
        reqArgsValue.AddMember("serialNumber", serialNumber, document.GetAllocator());
    }

    std::string _return;
    if (thriftMethodSyn(_return, openId, method, CJsonHelper::toString(reqArgsValue, buffer)) == false)
    {
        return false;
    }
    if (_return.empty() || returnDocument.Parse(_return.c_str()).HasParseError())
    {
        return false;
    }
    // The reply comes from another component: check its shape before calling
    // HasMember/GetInt, whose preconditions are otherwise only asserted.
    if (!returnDocument.IsObject() || !returnDocument.HasMember("serialNumber"))
    {
        return false;
    }
    const rapidjson::Value &echoedSerial = returnDocument["serialNumber"];
    return echoedSerial.IsInt() && echoedSerial.GetInt() == serialNumber;
}
// Invokes the handler registered for `method` in synchronous mode (the
// handler's last argument is true), tracing the request and the reply.
//
// _return: receives the handler's reply.
// openId:  caller identifier forwarded to the handler.
// method:  name used to look the handler up in m_methodMap.
// reqArgs: serialised request arguments forwarded to the handler.
// Returns the handler's result, or false when no usable handler is
// registered under `method`.
//
// The name is tested with count() before operator[] is used: operator[]
// alone would insert an empty entry for every unknown name, growing the
// table on each bad request and modifying it on what should be a read-only
// lookup.
bool CClientManager::thriftMethodSyn(std::string& _return, const std::string &openId, const std::string& method, const std::string& reqArgs)
{
    trace_worker();
    trace_printf("openId.c_str(), method.c_str(), reqArgs.c_str() %s %s %s", openId.c_str(), method.c_str(), reqArgs.c_str());
    if (m_methodMap.count(method) == 0)
    {
        return false;
    }
    Method &methodfunction = m_methodMap[method];
    if (methodfunction)
    {
        bool bRet = methodfunction(_return, openId, method, reqArgs, true);
        trace_printf("bRet, _return.c_str() %d, %s", bRet, _return.c_str());
        return bRet;
    }
    return false;
}
// Publishes dataValue to the channel named by its "openId" member.
//
// Requires both "openId" and "content" members, otherwise returns false.
// "openId" is removed from dataValue before anything else happens, so it is
// gone even when the function later returns false. Under m_publisherMutex the
// function returns false if the channel has no subscriber; otherwise it
// publishes "message:" followed by the serialised result of CompressData and
// returns what publish() returns.
bool CClientManager::SendWSData(rapidjson::Document &document, rapidjson::Value &dataValue)
{
    const bool hasRequiredMembers = dataValue.HasMember("openId") && dataValue.HasMember("content");
    if (!hasRequiredMembers)
    {
        return false;
    }

    // Copy the channel name first: the member is removed on the next line.
    const std::string channel(dataValue["openId"].GetString());
    dataValue.RemoveMember("openId");

    boost::unique_lock<boost::mutex> publisherLock(m_publisherMutex);
    if (m_publisher.numSub(channel) <= 0)
    {
        return false;
    }

    rapidjson::StringBuffer jsonBuffer;
    rapidjson::Value &payload = CompressData(document, dataValue);
    const std::string message = std::string("message:") + CJsonHelper::toString(payload, jsonBuffer);
    return m_publisher.publish(channel, message);
}
// Single-id convenience wrapper around IsOnLines: returns true when the flag
// reported for openId is non-zero.
bool CClientManager::IsOnLine(const std::string &openId)
{
    const std::vector<std::string> queriedIds(1, openId);
    std::vector<int> flags;
    IsOnLines(queriedIds, flags);
    // IsOnLines appends exactly one flag per queried id.
    return flags.front() != 0;
}
bool CClientManager::IsOnLines(const std::vector<std::string> &openIdVector, std::vector<int> &onLineVector)
{
onLineVector.clear();
boost::unique_lock<boost::mutex> uniqueLock(m_publisherMutex);
for (unsigned int openIdVectorIndex=0; openIdVectorIndex<openIdVector.size(); ++openIdVectorIndex)
{
m_publisher.numSub(openIdVector[openIdVectorIndex]) <= 0 ? onLineVector.push_back(0) : onLineVector.push_back(1);
}
return true;
}
// When dataValue carries "compress": true, replaces its "content" member with
// a string holding the zlib-compressed, base64-encoded serialisation of the
// original content (with CR/LF stripped from the base64 text).
//
// document:  supplies the allocator for the new string.
// dataValue: message object, modified in place.
// Returns dataValue. The value is returned unchanged when it is not an
// object, has no boolean "compress" member set to true, or has no "content".
//
// The existing member is overwritten rather than added again: AddMember never
// replaces, so adding a second "content" would serialise both the original
// and the compressed payload under the same key.
rapidjson::Value &CClientManager::CompressData(rapidjson::Document &document, rapidjson::Value &dataValue)
{
    if (!dataValue.IsObject() || !dataValue.HasMember("compress") || !dataValue.HasMember("content"))
    {
        return dataValue;
    }
    const rapidjson::Value &compressFlag = dataValue["compress"];
    if (!compressFlag.IsBool() || !compressFlag.GetBool())
    {
        return dataValue;
    }

    rapidjson::Value &contentValue = dataValue["content"];

    rapidjson::StringBuffer buffer;
    const std::string srcContent = CJsonHelper::toString(contentValue, buffer);

    std::string compressed;
    Compress(srcContent, compressed);

    std::string base64Str;
    CBase64 base64;
    base64.Encode((const unsigned char *)compressed.c_str(), compressed.size(), base64Str);

    // Strip the line breaks found in the encoder output so the payload is a
    // single-line string.
    std::string encoded;
    CleanStr(base64Str, "\r\n", encoded);

    contentValue.SetString(encoded.c_str(), static_cast<rapidjson::SizeType>(encoded.size()), document.GetAllocator());
    return dataValue;
}
// Built-in "ping" handler: ignores every input, answers with the literal
// text "pong" and reports success.
bool CClientManager::Ping(std::string& _return, const std::string &openId, const std::string &method, const std::string &content, bool isSyn)
{
    _return.assign("pong");
    return true;
}
bool CClientManager::Websocket(std::string& _return, const std::string& openId, const std::string& method, const std::string& content, bool isSyn)
{ trace_worker();
trace_printf("content.c_str() %s", content.c_str());
rapidjson::Document tmpDocument;
if (tmpDocument.Parse(content.c_str()).HasParseError()
|| !tmpDocument.HasMember("code")
|| !tmpDocument.HasMember("url"))
{
return false;
}
std::string url = tmpDocument["url"].GetString();
std::string wsUrl = url.replace(url.find("https"), 5, "wss") + "/" + tmpDocument["code"].GetString();
_return = (boost::format("{\"url\":\"%s\"}") % wsUrl.c_str()).str();
trace_printf("_return.c_str() %s", _return.c_str());
return true;
}
bool CClientManager::Webview(std::string& _return, const std::string& openId, const std::string& method, const std::string& content, bool isSyn)
{ trace_worker();
_return = (boost::format("<meta http-equiv='refresh' content='0;URL=%s'>") % content.c_str()).str();
return true;
}
C++
1
https://gitee.com/unitwork/iwechen-thrift.git
git@gitee.com:unitwork/iwechen-thrift.git
unitwork
iwechen-thrift
iwechen-thrift
master

搜索帮助