1 Star 0 Fork 0

胖纸_张毅 / tlib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
teventloop.cxx 6.50 KB
一键复制 编辑 原始数据 按行查看 历史
肉肉 提交于 2015-11-16 21:55 . no commit message
#include "teventloop.h"
#include "tserver.h"
#include "tsocket.h"

#include <cerrno>   // EEXIST for epoll_ctl duplicate-add handling
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
namespace tlib
{
//static TMutex g_mutex;
#ifdef WIN32
// WIN32 event loop: start with empty select() descriptor sets.
// m_timeout (0.5 ms) is initialized here but RunLoop currently passes a
// NULL timeout to select() and blocks indefinitely instead.
TEventLoop::TEventLoop()
{
    FD_ZERO(&m_connSet);
    FD_ZERO(&m_recvSet);
    FD_ZERO(&m_sendSet);
    m_timeout.tv_sec  = 0;
    m_timeout.tv_usec = 500;
}
// WIN32: fd_sets need no cleanup; sockets are closed by RunLoop/TSocket.
TEventLoop::~TEventLoop() = default;
//IO线程, BS线程都会调用
bool TEventLoop::AddLoop(int fd, EventMonitor em)
{
//TAutoLock lock(&g_mutex);
if (em == Read)
FD_SET(fd, &m_recvSet);
if (em == Write)
FD_SET(fd, &m_sendSet);
FD_SET(fd, &m_connSet);
return true;
}
//IO线程, BS线程都会调用
bool TEventLoop::DelLoop(int fd, EventMonitor em)
{
//TAutoLock lock(&g_mutex);
if (em == Read)
FD_CLR(fd, &m_recvSet);
if (em == Write)
FD_CLR(fd, &m_sendSet);
FD_CLR(fd, &m_connSet);
return true;
}
//运行线程: m_ioThreadPool
//调用路径: TServer.Start()->TEventLoop.RunLoop()
// IO thread entry point (runs on m_ioThreadPool).
// Call path: TServer.Start() -> TEventLoop.RunLoop()
// Waits on select() and dispatches accept/read/write events; all business
// callbacks (OnConn/OnRecv/OnClose/SendFd) are queued to the BS threads,
// never executed on this thread.
void TEventLoop::RunLoop(TServer* srv)
{
    log_debug("io线程[%d]启动..", CurrThreadId());
    fd_set recvFdSet, sendFdSet;
    for (;;)
    {
        // select() mutates its arguments, so hand it copies of the masters.
        recvFdSet = m_recvSet;
        sendFdSet = m_sendSet;
        // NULL timeout: block until at least one fd is ready (a zero timeout
        // would spin; see the retired m_timeout member).
        int ret = ::select(0, &recvFdSet, &sendFdSet, NULL, NULL);
        if (ret <= 0)
        {
            // SOCKET_ERROR or spurious wakeup: nothing usable this round.
            continue;
        }
        // Iterate a snapshot of the connection set: DelLoop() below calls
        // FD_CLR on m_connSet, which compacts fd_array and would make the
        // index skip entries if we iterated the live set.
        fd_set connSnapshot = m_connSet;
        for (unsigned int i = 0; i < connSnapshot.fd_count; i++)
        {
            SOCKET curFd = connSnapshot.fd_array[i];
            bool closedHere = false;
            if (FD_ISSET(curFd, &recvFdSet))
            {
                if (curFd == srv->m_nSrvFd)
                { // Listening socket: accept a new connection.
                    TClient* pConn = new TClient;
                    pConn->pSrv = srv;
                    pConn->fd = TSocket::Accept(srv->m_nSrvFd);
                    pConn->m_inputBuffer.Bind(&srv->m_pBufferPool);
                    pConn->m_outputBuffer.Bind(&srv->m_pBufferPool);
                    TSocket::SetNonBlock(pConn->fd, true);
                    //TODO: AddLoop() across IO threads needs synchronization
                    srv->m_eventLoop[IO_IDX(pConn->fd)].m_conns[pConn->fd] = pConn;
                    srv->m_eventLoop[IO_IDX(pConn->fd)].AddLoop(pConn->fd, Read);
                    srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TServer::OnConn, srv, pConn));
                }
                else
                { // Established connection: read incoming data.
                    TClient* pConn = m_conns[curFd];
                    int len = pConn->ReadFd();
                    if (len > 0)
                    {
                        void* msg = NULL;
                        if (srv->PreRecv(pConn, msg))
                        {
                            srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TServer::OnRecv, srv, pConn, msg));
                        }
                    }
                    else if (len == 0)
                    { // ReadFd() signals disconnect with a negative value,
                      // not 0 — nothing to do here.
                    }
                    else
                    { // Peer disconnected: unregister, close, notify BS thread.
                        //TODO: DelLoop() across IO threads needs synchronization
                        srv->m_eventLoop[IO_IDX(pConn->fd)].DelLoop(pConn->fd, Read);
                        srv->m_eventLoop[IO_IDX(pConn->fd)].DelLoop(pConn->fd, Write);
                        TSocket::Close(pConn->fd);
                        srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TServer::OnClose, srv, pConn));
                        closedHere = true; // fd is dead — skip the send check
                    }
                }
            }
            if (!closedHere && FD_ISSET(curFd, &sendFdSet))
            { // Socket writable: hand pending output to the BS thread.
                TClient* pConn = m_conns[curFd];
                srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TClient::SendFd, pConn));
            }
        }
    }
}
#else
// Linux event loop: create the epoll instance up front.
// MAX_EVENTS is just the (historical) size hint to epoll_create; modern
// kernels ignore it but it must be > 0.
TEventLoop::TEventLoop()
{
    const int fd = ::epoll_create(MAX_EVENTS);
    if (fd < 0)
        log_exit("epoll_create出错, exit");
    m_epollfd = fd;
}
// Release the epoll instance; client sockets themselves are closed by
// RunLoop/TSocket, not here.
TEventLoop::~TEventLoop()
{
    ::close(m_epollfd);
}
// Register fd with the epoll instance for the given event type.
// Called from both IO and BS threads; note the shared m_ev member is
// written without locking (see the commented-out g_mutex) — racy if two
// threads add concurrently.
// Returns true; an already-registered fd (EEXIST) is tolerated, any other
// epoll_ctl failure terminates via log_exit().
bool TEventLoop::AddLoop(int fd, EventMonitor em)
{
    //TAutoLock lock(&g_mutex);
    if (em == Read)
        m_ev.events = EPOLLIN;
    if (em == Write)
        m_ev.events = EPOLLOUT;
    m_ev.data.fd = fd;
    if (-1 == ::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &m_ev))
    {
        // EEXIST (17): fd already in the epoll set — treat as success.
        if (errno != EEXIST)
        {
            // Fixed copy-pasted "del" in the message: this is the add path.
            log_exit("epoll[%d] add fd[%d] failed, errno: %d", m_epollfd, fd, errno);
        }
        return true;
    }
    log_debug("epoll[%d] add fd[%d] success", m_epollfd, fd);
    return true; // was missing: falling off a non-void function is UB
}
// Unregister fd from the epoll instance.
// NOTE(review): EPOLL_CTL_DEL removes the fd for ALL events regardless of
// em, so a second DelLoop() on the same fd fails with ENOENT and
// log_exit()s the process — callers must not delete Read and Write
// separately for one fd.
// Returns true; any epoll_ctl failure terminates via log_exit().
bool TEventLoop::DelLoop(int fd, EventMonitor em)
{
    //TAutoLock lock(&g_mutex);
    // m_ev is filled for pre-2.6.9 kernels, which reject a NULL event ptr.
    if (em == Read)
        m_ev.events = EPOLLIN;
    if (em == Write)
        m_ev.events = EPOLLOUT;
    m_ev.data.fd = fd;
    if (-1 == ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &m_ev))
        log_exit("epoll[%d] del fd[%d] failed, errno: %d", m_epollfd, fd, errno);
    log_debug("epoll[%d] del fd[%d] success", m_epollfd, fd);
    return true; // was missing: falling off a non-void function is UB
}
// IO thread entry point (runs on m_ioThreadPool).
// Call path: TServer.Start() -> TEventLoop.RunLoop()
// Waits on epoll and dispatches accept/read/write events; all business
// callbacks (OnConn/OnRecv/OnClose/SendFd) are queued to the BS threads,
// never executed on this thread.
void TEventLoop::RunLoop(TServer* srv)
{
    log_debug("io线程[%d]启动..", CurrThreadId());
    for (;;)
    {
        int fds = ::epoll_wait(m_epollfd, m_events, MAX_EVENTS, -1);
        if (fds < 0)
        {
            // EINTR (interrupted by a signal) is benign; retry the wait.
            continue;
        }
        for (int i = 0; i < fds; i++)
        {
            bool closedHere = false;
            if (m_events[i].events & EPOLLIN)
            {
                if (m_events[i].data.fd == srv->m_nSrvFd)
                { // Listening socket: accept a new connection.
                    TClient* pConn = new TClient;
                    pConn->pSrv = srv;
                    pConn->fd = TSocket::Accept(srv->m_nSrvFd);
                    pConn->m_inputBuffer.Bind(&srv->m_pBufferPool);
                    pConn->m_outputBuffer.Bind(&srv->m_pBufferPool);
                    TSocket::SetNonBlock(pConn->fd, true);
                    //TODO: AddLoop() across IO threads needs synchronization
                    TEventLoop& loop = srv->m_eventLoop[IO_IDX(pConn->fd)];
                    // resize, not reserve: vector::operator[] past size() is
                    // UB even when capacity is large enough. Grow in
                    // 10240-slot chunks as before.
                    if ((int)loop.m_conns.size() <= pConn->fd)
                    {
                        loop.m_conns.resize(pConn->fd + 10240, NULL);
                    }
                    loop.m_conns[pConn->fd] = pConn;
                    loop.AddLoop(pConn->fd, Read);
                    srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TServer::OnConn, srv, pConn));
                }
                else
                { // Established connection: read incoming data.
                    TClient* pConn = m_conns[m_events[i].data.fd];
                    int len = pConn->ReadFd();
                    if (len > 0)
                    {
                        void* msg = NULL;
                        if (srv->PreRecv(pConn, msg))
                        {
                            srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TServer::OnRecv, srv, pConn, msg));
                        }
                    }
                    else if (len == 0)
                    { // ReadFd() signals disconnect with a negative value,
                      // not 0 — nothing to do here.
                    }
                    else
                    { // Peer disconnected: unregister, close, notify BS thread.
                        //TODO: DelLoop() across IO threads needs synchronization
                        // A single DelLoop suffices: EPOLL_CTL_DEL removes the
                        // fd for all events; a second call would fail with
                        // ENOENT and log_exit() the whole process.
                        srv->m_eventLoop[IO_IDX(pConn->fd)].DelLoop(pConn->fd, Read);
                        TSocket::Close(pConn->fd);
                        srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TServer::OnClose, srv, pConn));
                        closedHere = true; // fd is dead — skip the send check
                    }
                }
            }
            if (!closedHere && (m_events[i].events & EPOLLOUT))
            { // Socket writable: hand pending output to the BS thread.
                TClient* pConn = m_conns[m_events[i].data.fd];
                srv->m_bsQueues[BS_IDX(pConn->fd)].Push(bind(&TClient::SendFd, pConn));
            }
        }
    }
}
#endif
} //namespace tlib
1
https://gitee.com/osczyup/tlib.git
git@gitee.com:osczyup/tlib.git
osczyup
tlib
tlib
master

搜索帮助