代码拉取完成,页面将自动刷新
#pragma once
#include <string>
#include "hiredis.h"
#include "async.h"
#include <functional>
#include <memory>
#include <atomic>
#include "utils.hpp"
#include <iostream>
#include <chrono>
#include <thread>
#include <hiredis.h>
#include <async.h>
#include <adapters/libev.h>
#include "logger.hpp"
#include "event_queue.hpp"
// Lifecycle states tracked for a single Redis connection.
enum ConnStatus
{
    CONN_INIT  = 0, // initial state set by the RedisConn constructor
    CONN_AUTH  = 1, // accepted by is_connected() alongside CONN_OPEN
    CONN_OPEN  = 2, // connection usable for commands
    CONN_CLOSE = 3  // connection closed
};
// Event codes carried in RedisResult::type.
enum RedisEvent
{
    EVT_CONNECT      = 0, // pushed when a password-less connection becomes usable
    EVT_AUTH         = 1, // pushed when the AUTH command has completed
    EVT_DISCONNECTED = 2, // connection lost / closed
    EVT_RESULT       = 3  // default type of a command result
};
// Callable taking a RedisEvent code.
using RedisEvtHandle = std::function<void(RedisEvent)>;
// Callable taking a raw hiredis reply pointer.
using RedisHandle = std::function<void(redisReply *)>;
/**
 * One unit of work / notification travelling through the result queues.
 *
 * Holds the command text, an optional user callback, an optional hiredis
 * reply (freed by the destructor when non-null) and begin/end timestamps
 * taken with GetMilliSeconds().
 *
 * NOTE(review): the destructor frees `reply`, but copy operations are still
 * the compiler-generated ones. Copying an instance whose `reply` is non-null
 * would free the same reply twice; pass instances by pointer.
 */
struct RedisResult{
    /// Build an event-only result (e.g. EVT_CONNECT / EVT_AUTH) of type `t`.
    RedisResult(int t):type(t)
    {
        // `end` starts equal to `begin` so get_duration() is well defined
        // (0) until a completion timestamp is written.
        begin = end = GetMilliSeconds();
    }

    /// Build a command result with optional callback `h` and reply `r`.
    RedisResult(const std::string &cmd = "",RedisHandle h = nullptr,
            redisReply *r= nullptr):command(cmd),handle(h),reply(r)
    {
        begin = end = GetMilliSeconds();
    }

    int type = EVT_RESULT;
    std::string command;
    RedisHandle handle = nullptr;
    redisReply * reply = nullptr;
    void * data = nullptr;      // opaque user pointer; never touched by this struct

    ~RedisResult()
    {
        if (reply != nullptr)
        {
            freeReplyObject(reply);
            reply = nullptr;
        }
    }

    double begin = 0;           // creation timestamp from GetMilliSeconds()
    double end = 0;             // completion timestamp, written by the connection

    /// Difference between the completion and creation timestamps.
    double get_duration() const
    {
        return end - begin ;
    }
};
// Callable taking a pointer to a RedisResult.
using ResultHandler = std::function<void(RedisResult *)>;
// Queue of RedisResult pointers.
using RedisResultQueue = EventQueue<RedisResult *>;
class RedisConn
{
public:
RedisConn(int idx = 0)
{
m_index = idx;
m_status = CONN_INIT;
//recv_start_time = std::chrono::system_clock::now();
recv_start_time = GetMilliSeconds();
}
int init(const std::string & host = "127.0.0.1",
int port = 6379,const std::string & passwd = "")
{
m_loop = ev_loop_new(EVFLAG_AUTO);
ev_set_userdata(m_loop, (void*)this);
m_host = host;
m_port = port;
m_passwd = passwd ;
connect();
m_thread = std::thread(&RedisConn::run,this);
return 0;
}
static void on_async_command(redisAsyncContext *ctx, void *r, void *privdata) {
RedisConn * self = (RedisConn*)ctx->data;
if (self->m_speed_count == 0)
{
//DLog("[%d] when first command",self->m_index);
//self->recv_start_time = std::chrono::system_clock::now();
self->recv_start_time = GetMilliSeconds();
}
if (self->m_speed_count %10000 == 9999)
{
//DLog("[%d] after finished 10000 command ",self->m_index);
//auto escaped_time = std::chrono::system_clock::now() - self->recv_start_time;
auto escaped_time = GetMilliSeconds() - self->recv_start_time;
//std::cout << "Conn[" << self->m_index << "] 10000 commands time " << escaped_time.count() << std::endl;
std::cout << "Conn[" << self->m_index << "] 10000 commands time " << escaped_time<< std::endl;
self->m_speed_count = 0;
}
else {
self->m_speed_count ++;
}
redisReply *reply = (redisReply*) r;
if (reply == NULL)
{
RedisResult * pResult = (RedisResult*) privdata;
pResult->end = GetMilliSeconds();
self->result_queue.push(pResult);
return;
}
RedisResult * pResult = (RedisResult*) privdata;
pResult->end = GetMilliSeconds();
self->result_queue.push(pResult);
}
static void on_auth(redisAsyncContext *ctx, void *r, void *privdata) {
redisReply *reply = (redisReply*) r;
if (reply == NULL)
{
DLog("on auth failed ");
return;
}
RedisConn * self = (RedisConn*)ctx->data;
self->m_status = CONN_OPEN;
DLog("on auth finished");
RedisResult * pResult = new RedisResult(EVT_AUTH);
self->result_queue.push(pResult);
}
static void on_connected(const redisAsyncContext *ctx, int status) {
if (status != REDIS_OK) {
DLog("Error: %s", ctx->errstr);
return;
}
DLog("Connected...");
RedisConn * self = (RedisConn*)ctx->data;
self->auth();
}
static void on_disconnect(const redisAsyncContext *ctx, int status) {
if (status != REDIS_OK) {
DLog("Error: %s", ctx->errstr);
return;
}
RedisConn * self = (RedisConn*)ctx->data;
DLog("Disconnected...");
}
int connect()
{
DLog("connect to server %s:%d",m_host.c_str(),m_port);
m_ctx = redisAsyncConnect(m_host.c_str() , m_port);
if (m_ctx->err) {
DLog("Error: %s", m_ctx->errstr);
return -1;
}
m_ctx->data = this;
redisLibevAttach(m_loop,m_ctx);
redisAsyncSetConnectCallback(m_ctx,&RedisConn::on_connected );
redisAsyncSetDisconnectCallback(m_ctx,&RedisConn::on_disconnect);
m_isRunning = true;
return 0;
}
void auth()
{
if (!m_passwd.empty())
{
redisAsyncCommand(m_ctx, on_auth, this, "AUTH %s",m_passwd.c_str() );
}
else
{
m_status = CONN_OPEN;
RedisResult * pResult = new RedisResult(EVT_CONNECT);
this->result_queue.push(pResult);
}
}
void execute(const std::string & query,RedisResult * pResult )
{
pResult->command = query;
m_inner_queue.push(pResult);
ev_async_send(m_loop,&m_evwatch);
}
static void ev_proc(struct ev_loop * loop,ev_async * async,int )
{
RedisConn * self = (RedisConn*) ev_userdata(loop);
if (self->is_connected())
{
self->m_inner_queue.process([&](RedisResult* pRst){
self->process_count ++;
redisAsyncCommand(self->m_ctx, &RedisConn::on_async_command, pRst, pRst->command.c_str() );
});
}
}
void run()
{
ev_run(m_loop, EVRUN_ONCE);
ev_run(m_loop, EVRUN_NOWAIT);
ev_async_init(&m_evwatch ,ev_proc);
ev_async_start(m_loop,&m_evwatch);
while (m_isRunning) {
ev_run(m_loop,EVRUN_NOWAIT);
}
DLog("############quit run ################");
//ev_loop(EV_DEFAULT_ 0);
}
void stop()
{
redisAsyncDisconnect(m_ctx);
m_isRunning = false;
ev_break(m_loop);
m_thread.join();
}
bool is_connected()
{
return m_status == CONN_OPEN || m_status == CONN_AUTH;
}
RedisResultQueue result_queue;
std::atomic<uint64_t> process_count ;
//std::chrono::time_point<std::chrono::system_clock> recv_start_time ;
double recv_start_time;
private:
EventQueue<RedisResult* > m_inner_queue;
int m_index;
int m_speed_count = 0;
std::thread m_thread;
std::string m_host;
struct ev_loop *m_loop;
ev_async m_evwatch;
int m_port;
std::string m_passwd;
redisAsyncContext *m_ctx ;
bool m_isRunning = false;
ConnStatus m_status;
};
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。