1 Star 0 Fork 1

茶密 / DGWLMsgServer

forked from zz89522185 / DGWLMsgServer 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
SocketThread.cpp 7.11 KB
一键复制 编辑 原始数据 按行查看 历史
zz89522185 提交于 2015-12-09 15:07 . 境界飞升与极品暗金装备
#include "SocketThread.h"
#include "ResPonseThread.h"
#if(CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID)
#include "platform/android/jni/JniHelper.h"
#include<jni.h>
#endif
using namespace chrono;
using namespace this_thread;
USING_NS_CC;
ByteBuf SocketThread::send_buf;
ByteBuf SocketThread::recv_buf;
SocketThread::SocketThread(void){
//pthread_mutex_init(&_messageQueueMutex, NULL);
//_messageQueueMutex
quit = false;
csocket = NULL;
state = Disconnected;
onConnected = nullptr;
onConnectFailed = nullptr;
onConnectionClosed = nullptr;
NeedReConnection = false;
reconnectTimes = 0;
onNetworkError = nullptr;
}
/*开启一个独立的线程,负责创建客户端套接字,connect sever, start a send thread and a receive thread*/
int SocketThread::start(string& ip, int port){
quit = false;
this->ip = ip;
this->port = port;
int errCode = 0;
m_pInstance->messages.clear();
// log("ip:%s port:%d", ip.c_str(), port);
do{
_thread = thread(start_thread, this);
_thread.detach();
} while (0);
return errCode;
}
int SocketThread::sendmessage(Message* msg, bool insertFirst){
if (m_pInstance->state == Disconnected){
// log("sendmessage: socketthread disconnected");
return -1;
}
// log("send message:%d", msg->getId());
m_pInstance->_messageQueueMutex.lock();
if (insertFirst && !m_pInstance->messages.empty()){
m_pInstance->messages.push_front(msg);
}
else{
m_pInstance->messages.push_back(msg);
}
m_pInstance->_messageQueueMutex.unlock();
return 0;
}
void* SocketThread::send_fun(void*){
bool issend = true;
while (true){
if (quit){
return NULL;
}
if (m_pInstance->state==Disconnected){
/*if (!InBackground){
Director::getInstance()->getScheduler()->performFunctionInCocosThread([&]{
if (m_pInstance->onConnectionClosed!=nullptr){
m_pInstance->onConnectionClosed();
}
});
}*/
if (!m_pInstance->messages.empty()){
m_pInstance->NeedReConnection = true;
if (!m_pInstance->Reconnection()){
if (m_pInstance->reconnectTimes<4){
m_pInstance->reconnectTimes++;
}
else{
if (m_pInstance->onNetworkError!=nullptr){
m_pInstance->onNetworkError();
}
}
sleep_for(milliseconds((int)(1000*pow(2,m_pInstance->reconnectTimes))));
}
else{
if (m_pInstance->onConnected!=nullptr){
m_pInstance->onConnected();
}
SocketThread::getInstance()->state = Connected;
}
}
sleep_for(milliseconds(500));
continue;
}
m_pInstance->NeedReConnection = false;
if (!m_pInstance->messages.empty()&&SocketThread::getInstance()->state==Connected){
m_pInstance->_messageQueueMutex.lock();
Message* msg = m_pInstance->messages.front();
if (issend){
ByteBuf msgBuffer;
msgBuffer.reset();
msgBuffer.writeInt32(msg->getId());
msgBuffer.writeInt32(0);
msg->write_to(msgBuffer);
issend = false;
send_buf.reset();
send_buf.writeInt32(msgBuffer.readableSize() + 4);
int order = m_pInstance->SendCount;
order ^= 512;
order ^= msgBuffer.readableSize();
send_buf.writeInt32(order);
send_buf.writeInt32(msg->getId());
send_buf.writeInt32(0);
msg->write_to(send_buf);
int a = m_pInstance->csocket.Send((char*)send_buf.getBuf(), send_buf.readableSize());
if (a==-1){
m_pInstance->state = Disconnected;
if (m_pInstance->Reconnection()){
m_pInstance->_messageQueueMutex.unlock();
continue;
}
else{
Director::getInstance()->getScheduler()->performFunctionInCocosThread([&]{
if (m_pInstance->onConnectionClosed!=nullptr){
m_pInstance->onConnectionClosed();
}
});
}
sleep_for(milliseconds(1000));
}
if (a==send_buf.readableSize()){
auto str = bytestohexstring((char*)send_buf.getBuf(), a);
m_pInstance->SendCount++;
m_pInstance->messages.pop_front();
delete msg;
}
else{
auto str = bytestohexstring((char*)send_buf.getBuf(), a);
}
struct timeval now;
gettimeofday(&now, NULL);
m_pInstance->lastsend_time = now.tv_sec;
issend = true;
}
m_pInstance->_messageQueueMutex.unlock();
}
sleep_for(milliseconds(100));
}
return NULL;
}
void SocketThread::start_send_thread(){
threadSend = thread(send_fun, this);
threadSend.detach();
}
//发送消息线程
void* SocketThread::start_thread(void *arg){
#if CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID
JavaVM *vm;
JNIEnv *env;
vm = JniHelper::getJavaVM();
JavaVMAttachArgs thread_args;
thread_args.name = "Socket Load";
thread_args.version = JNI_VERSION_1_4;
thread_args.group = NULL;
vm->AttachCurrentThread(&env, &thread_args);
#endif
SocketThread* thred = (SocketThread*)arg;
ODSocket cdSocket;
cdSocket.Init();
bool isok = cdSocket.Create(AF_INET, SOCK_STREAM, 0);
if (isok) {
}
bool iscon = cdSocket.Connect(thred->ip.c_str(), thred->port);
thred->csocket = cdSocket;
log("start rec thread");
ResPonseThread::getInstance()->start();//?a???óê????¢??3ì
log("start send thread");
m_pInstance->start_send_thread();//?a??·¢?í???¢??3ì
if (iscon){
thred->state = Connected;
log("invoke onConnected in start_thread");
Director::getInstance()->getScheduler()->performFunctionInCocosThread([&, thred]{
if (thred->onConnected != nullptr){
thred->onConnected();
}
});
}
else{
thred->state = Disconnected;
log("can not connect to server");
Director::getInstance()->getScheduler()->performFunctionInCocosThread([&, thred]{
if (thred->onConnectFailed != nullptr){
thred->onConnectFailed();
}
});
}
#if(CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID)
vm->DetachCurrentThread();
#endif
return NULL;
}
string SocketThread::bytestohexstring(char* bytes, int bytelength){
string str("");
string str2("0123456789abcdef");
for (auto i = 0; i < bytelength; i++) {
int b;
b = 0x0f & (bytes[i] >> 4);
char s1 = str2.at(b);
str.append(1, str2.at(b));
b = 0x0f & bytes[i];
str.append(1, str2.at(b));
char s2 = str2.at(b);
}
return str;
}
ODSocket SocketThread::getSocket(){
return this->csocket;
}
SocketThread* SocketThread::m_pInstance = NULL;// new SocketThread;
SocketThread* SocketThread::getInstance(){
if (m_pInstance == NULL){
m_pInstance = new SocketThread();
}
return m_pInstance;
}
//线程停止 并且关闭连接
void SocketThread::stop(){
quit = true;
/*_thread.detach();
threadSend.detach();*/
m_pInstance->messages.clear();
csocket.Close();
m_pInstance = NULL;
log("scoket closed");
}
SocketThread::~SocketThread(void){
/*if(m_pInstance!=NULL){
delete m_pInstance;
}*/
}
bool SocketThread::Reconnection(){
SendCount = 0;
if (csocket){
csocket.Close();
//delete oldSocket;
}
ODSocket cdSocket;
cdSocket.Init();
csocket = cdSocket;
bool isok = csocket.Create(AF_INET, SOCK_STREAM, 0);
if (isok) {
}
bool iscon = csocket.Connect(ip.c_str(), port);
if (iscon){
state = Connected;
log("invoke onConnected in Reconnection");
Director::getInstance()->getScheduler()->performFunctionInCocosThread([&]{
if (onConnected != nullptr){
onConnected();
}
});
return true;
}
return false;
}
bool SocketThread::InBackground=false;
bool SocketThread::quit = false;
int SocketThread::SendCount = 0;
1
https://gitee.com/long33/DGWLMsgServer.git
git@gitee.com:long33/DGWLMsgServer.git
long33
DGWLMsgServer
DGWLMsgServer
master

搜索帮助