8 Star 22 Fork 10

leo / supervisor

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
supervisor.cc 15.93 KB
一键复制 编辑 原始数据 按行查看 历史
leo 提交于 2019-10-25 17:10 . 增加Windows平台支持
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
#include "supervisor.h"
#include "resource.h"
#include "logger.h"
#include "watcher.h"
#include <csignal>
#include <cstring>
#include <ctime>
#include <iostream>
#include <fstream>
#include <regex>
#if defined(_WIN32)
# include <Windows.h>
#else
# include <sys/sysinfo.h>
# include <unistd.h>
#endif
#define HEADER_HTML "Content-type:text/html"
#define HEADER_JSON "Content-type:application/json"
#define MSG_FORBIDDEN "<div style=\"text-align:center;margin-top:10%;margin-left:4%;margin-right:4%;\"><h2>403 Forbidden</h2><hr><p></p></div>"
#define MSG_SUCCESS "{\"success\": true }"
#define MSG_BADPARAM "{\"err\": \"Bad parameters!\"}"
#define MSG_BADNAME "{\"err\": \"A command with same name exists already!\"}"
#define MSG_NEEDSTOP "{\"err\": \"Need to stop command before editing it!\"}"
#define MSG_RUNNING "{\"err\": \"Command is running already!\"}"
void Supervisor::Dispatch(mg_connection * pConn, int nEvent, void * pData) {
Supervisor * pIns = (Supervisor *)pConn->mgr->user_data;
switch (nEvent) {
case MG_EV_ACCEPT:
if (!pIns->CheckConnect(pConn)) pIns->Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
break;
case MG_EV_CLOSE:
pIns->ResponseClose(pConn);
break;
case MG_EV_HTTP_REQUEST: {
http_message * pMsg = (http_message *)pData;
if (!pIns->CheckAuth(pConn, pMsg)) {
pIns->ResponseNeedAuth(pConn);
} else if (pMsg->uri.len == 1) {
pIns->ResponseResource(pConn, "res/index.html");
} else if (pMsg->uri.len > 4 && std::string(pMsg->uri.p, 5) == "/res/") {
std::string sPath(pMsg->uri.p + 1, pMsg->uri.len - 1);
pIns->ResponseResource(pConn, sPath);
} else if (pMsg->uri.len > 4 && std::string(pMsg->uri.p, 5) == "/api/") {
pIns->ResponseApi(pConn, pMsg);
} else {
pIns->Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
}
break; }
case MG_EV_WEBSOCKET_HANDSHAKE_DONE: {
http_message * pMsg = (http_message *)pData;
std::string sUrl(pMsg->uri.p, pMsg->uri.len);
pIns->ResponseWebsocketOpened(pConn, sUrl);
break; }
default:
break;
}
}
void Supervisor::Start(Command & rCmd) {
if (rCmd.Has("help")) {
std::cout
<< "\nSupervisor - System process controller for Unix. Version 1.0\n"
<< "Copyright(c) longshuang@msn.cn 2015-2016. All rights reserved.\n\n"
<< "[Usage]\n"
<< "\thelp\tShow this message.\n"
<< "\tport=N\tSet listen port.\n"
<< "\tdaemon\tRun in daemon mode.\n"
<< std::endl;
return;
}
#if defined(_WIN32)
if (_access("supervisor.json", 0) != 0) SaveSetting();
#else
if (access("supervisor.json", 0) != 0) SaveSetting();
#endif
if (!LoadSetting()) return;
if (!GResource.Load()) {
LOG_ERR("Supervisor start failed due to load resource!");
return;
}
#if !defined(_WIN32)
signal(SIGPIPE, SIG_IGN);
signal(SIGKILL, [](int nSig) { GWatcher.StopAll(); exit(0); });
#endif
signal(SIGINT, [](int nSig) { GWatcher.StopAll(); exit(0); });
signal(SIGTERM, [](int nSig) { GWatcher.StopAll(); exit(0); });
mg_mgr_init(&_iMgr, this);
std::string sPort = rCmd.Has("port") ? rCmd.Get("port") : "8088";
mg_connection * pConn = mg_bind(&_iMgr, sPort.c_str(), Supervisor::Dispatch);
if (!pConn) {
LOG_ERR("Supervisor start failed : mg_bind!");
return;
}
mg_set_protocol_http_websocket(pConn);
LOG_INFO("Supervisor started on :%s", sPort.data());
Register("/api/add", EMethod::POST, &Supervisor::AddWatcher);
Register("/api/edit", EMethod::POST, &Supervisor::EditWatcher);
Register("/api/reload", EMethod::GET, &Supervisor::Reload);
Register("/api/start", EMethod::GET, &Supervisor::StartWatcher);
Register("/api/stop", EMethod::GET, &Supervisor::StopWatcher);
Register("/api/delete", EMethod::GET, &Supervisor::DeleteWatcher);
GWatcher.SetNotifier([this](const std::string & sScope, const Json::Value & rMsg) {
Broadcast(sScope, rMsg.toStyledString());
});
if (rCmd.Has("daemon")) {
#if defined(_WIN32)
HWND hWnd = ::FindWindowA("ConsoleWindowClass", NULL);
if (hWnd) ::ShowWindow(hWnd, SW_HIDE);
#else
daemon(1, 0);
#endif
}
while (true) {
mg_mgr_poll(&_iMgr, 10);
GWatcher.Breath();
}
mg_mgr_free(&_iMgr);
}
bool Supervisor::CheckConnect(mg_connection * pConn) {
int nIPCount = _iConf["iptables"].size();
if (nIPCount <= 0) return true;
const char * pAddr = inet_ntoa(pConn->sa.sin.sin_addr);
for (int i = 0; i < nIPCount; ++i) {
std::regex iMatch(_iConf["iptables"][i].asString(), std::regex::extended);
if (std::regex_match(pAddr, iMatch)) {
return true;
}
}
return false;
}
bool Supervisor::CheckAuth(mg_connection * pConn, http_message * pMsg) {
if (!_pAuth) return true;
fseek(_pAuth, SEEK_SET, 0);
return mg_http_check_digest_auth(pMsg, "supervisor", _pAuth) == 1;
}
void Supervisor::ResponseClose(mg_connection *pConn) {
char * pScope = (char *)pConn->user_data;
if (pScope) delete[] pScope;
for (auto it = _vSockets.begin(); it != _vSockets.end(); ++it) {
if (*it == pConn) {
_vSockets.erase(it);
break;
}
}
}
void Supervisor::ResponseResource(mg_connection * pConn, const std::string & sPath) {
Asset * pAsset = GResource.Get(sPath);
if (!pAsset) {
Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
} else {
std::string sHeader("Content-type:"), sContent(pAsset->pData, pAsset->nSize);
sHeader.append(pAsset->pType);
Response(pConn, 200, sHeader, sContent);
}
}
void Supervisor::ResponseWebsocketOpened(mg_connection *pConn, const std::string & sUrl) {
char * pScope = new char[sUrl.size() + 1];
memcpy(pScope, sUrl.data(), sUrl.size());
pScope[sUrl.size()] = '\0';
pConn->user_data = pScope;
_vSockets.push_back(pConn);
if (sUrl == "/ws") {
for (size_t i = 0; i < _iConf["watcher"].size(); ++i) {
Json::Value iWatcher = _iConf["watcher"][(int)i];
std::string sName = iWatcher["name"].asString();
Watcher::Info * pInfo = GWatcher.Get(sName);
Json::Value iMsg;
iMsg["action"] = "add";
iMsg["watcher"] = iWatcher;
iMsg["watcher"]["status"] = pInfo ? pInfo->emStatus : Watcher::EStatus::Stopped;
iMsg["watcher"]["pid"] = pInfo ? (int)pInfo->nPid : -1;
iMsg["watcher"]["start_time"] = pInfo ? pInfo->GetStartTime() : "----";
std::string sMsg = iMsg.toStyledString();
mg_send_websocket_frame(pConn, WEBSOCKET_OP_TEXT, sMsg.data(), sMsg.size());
}
} else if (sUrl.substr(0, 6) == "/tail/") {
std::string sName = sUrl.substr(6, sUrl.size() - 6);
Watcher::Info * pInfo = GWatcher.Get(sName);
if (pInfo) {
Json::Value iMsg;
iMsg["action"] = "tail_sync";
iMsg["data"] = pInfo->pTail;
Broadcast(sUrl, iMsg.toStyledString());
}
}
}
void Supervisor::ResponseApi(mg_connection * pConn, http_message * pMsg) {
std::string sUrl(pMsg->uri.p, pMsg->uri.len);
auto it = _mApis.find(sUrl);
if (it == _mApis.end()) {
Response(pConn, 403, HEADER_HTML, MSG_FORBIDDEN);
return;
}
if (it->second.emMethod == EMethod::GET) {
char * pBuf = new char[pMsg->query_string.len + 1];
size_t nSize = mg_url_decode(pMsg->query_string.p, pMsg->query_string.len, pBuf, pMsg->query_string.len + 1, true);
Json::Value iParam = Url2Json(pBuf, nSize);
delete[] pBuf;
(this->*(it->second.fProc))(pConn, iParam);
} else {
char * pBuf = new char[pMsg->body.len + 1];
size_t nSize = mg_url_decode(pMsg->body.p, pMsg->body.len, pBuf, pMsg->body.len + 1, true);
Json::Value iParam = Url2Json(pBuf, nSize);
delete[] pBuf;
(this->*(it->second.fProc))(pConn, iParam);
}
}
void Supervisor::ResponseNeedAuth(mg_connection * pConn) {
std::string sHeader;
char pMD5[33] = {0};
sHeader.append("WWW-Authenticate: Digest realm=\"supervisor\",");
sHeader.append("qop=\"auth\",");
sHeader.append("nonce=\"");
sHeader.append(std::to_string((uint32_t)time(NULL)));
sHeader.append("\",opaque=\"");
sHeader.append(cs_md5(pMD5, "supervisor", 10, NULL));
sHeader.append("\"");
Response(pConn, 401, sHeader, "");
}
void Supervisor::Reload(mg_connection * pConn, const Json::Value & rParam) {
LoadSetting();
Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}
void Supervisor::AddWatcher(mg_connection * pConn, const Json::Value & rParam) {
if (rParam["name"].isNull() || rParam["cmd"].isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
std::string sName = rParam["name"].asString();
if (!GetConf(sName).isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADNAME);
return;
}
Json::Value iProgram;
iProgram["name"] = rParam["name"];
iProgram["cmd"] = rParam["cmd"];
iProgram["dir"] = rParam["dir"].isNull() ? "/" : rParam["dir"];
iProgram["retry"] = rParam["retry"].isNull() ? 0 : atoi(rParam["retry"].asCString());
if (_iConf["watcher"].isNull()) _iConf["watcher"] = Json::Value(Json::arrayValue);
_iConf["watcher"].append(iProgram);
SaveSetting();
Json::Value iMsg;
iMsg["action"] = "add";
iMsg["watcher"] = iProgram;
iMsg["watcher"]["status"] = Watcher::EStatus::Stopped;
Broadcast("/ws", iMsg.toStyledString());
Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}
void Supervisor::EditWatcher(mg_connection * pConn, const Json::Value & rParam) {
if (rParam["org-name"].isNull() || rParam["name"].isNull() || rParam["cmd"].isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
std::string sOldName = rParam["org-name"].asString();
std::string sNewName = rParam["name"].asString();
Watcher::Info * pStatus = GWatcher.Get(sOldName);
if (pStatus && pStatus->emStatus == Watcher::EStatus::Running) {
Response(pConn, 200, HEADER_JSON, MSG_NEEDSTOP);
return;
}
Json::Value iFind = Json::Value::null;
int nIdx = 0;
for (int i = 0; i < (int)_iConf["watcher"].size(); ++i) {
std::string sFind = _iConf["watcher"][i]["name"].asString();
if (sFind == sOldName) {
iFind = _iConf["watcher"][i];
nIdx = i;
} else if (sFind == sNewName) {
Response(pConn, 200, HEADER_JSON, MSG_BADNAME);
return;
}
}
if (iFind.isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
iFind["name"] = sNewName;
iFind["cmd"] = rParam["cmd"];
iFind["dir"] = rParam["dir"].isNull() ? "/" : rParam["dir"];
iFind["retry"] = rParam["retry"].isNull() ? 0 : atoi(rParam["retry"].asCString());
Json::Value iUnused;
_iConf["watcher"][nIdx] = iFind;
GWatcher.Remove(sOldName);
SaveSetting();
Json::Value iMsg;
iMsg["action"] = "add";
iMsg["watcher"] = iFind;
iMsg["watcher"]["status"] = Watcher::EStatus::Stopped;
if (sNewName != sOldName) iMsg["rename"] = sOldName;
Broadcast("/ws", iMsg.toStyledString());
Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}
void Supervisor::StartWatcher(mg_connection * pConn, const Json::Value & rParam) {
if (rParam["name"].isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
std::string sName = rParam["name"].asString();
Json::Value iProg = GetConf(sName);
if (iProg.isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
std::string sCmd = iProg["cmd"].asString();
std::string sDir = iProg["dir"].asString();
int nRetry = iProg["retry"].asInt();
GWatcher.Start(sName, sDir, sCmd, nRetry);
Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}
void Supervisor::StopWatcher(mg_connection * pConn, const Json::Value & rParam) {
if (rParam["name"].isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
std::string sName = rParam["name"].asString();
Json::Value iProg = GetConf(sName);
if (iProg.isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
GWatcher.Stop(sName);
Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}
void Supervisor::DeleteWatcher(mg_connection * pConn, const Json::Value & rParam) {
if (rParam["name"].isNull()) {
Response(pConn, 200, HEADER_JSON, MSG_BADPARAM);
return;
}
std::string sName = rParam["name"].asString();
Watcher::Info * pStatus = GWatcher.Get(sName);
if (pStatus && pStatus->emStatus == Watcher::EStatus::Running) {
Response(pConn, 200, HEADER_JSON, MSG_NEEDSTOP);
return;
}
for (int i = 0; i < (int)_iConf["watcher"].size(); ++i) {
std::string sFind = _iConf["watcher"][i]["name"].asString();
if (sFind == sName) {
Json::Value iUnused;
_iConf["watcher"].removeIndex(i, &iUnused);
}
}
GWatcher.Remove(sName);
SaveSetting();
Json::Value iMsg;
iMsg["action"] = "delete";
iMsg["watcher"] = sName;
Broadcast("/ws", iMsg.toStyledString());
Response(pConn, 200, HEADER_JSON, MSG_SUCCESS);
}
bool Supervisor::LoadSetting() {
std::ifstream ifs("supervisor.json");
Json::Reader iReader;
Json::Value iTemp;
if (!iReader.parse(ifs, iTemp, false)) {
LOG_ERR("Load configuration failed : %s", iReader.getFormattedErrorMessages().c_str());
return false;
}
if (_pAuth) {
fclose(_pAuth);
_pAuth = NULL;
}
int nUsers = iTemp["user"].size();
if (nUsers > 0) {
_pAuth = tmpfile();
char * pBuf = new char[256];
char * pMD5 = new char[33];
for (int i = 0; i < nUsers; ++i) {
Json::Value & r = iTemp["user"][i];
std::string sAccount = r["account"].asString();
std::string sPswd = r["pswd"].asString();
std::string sData = sAccount + ":supervisor:" + sPswd;
char * pH1 = cs_md5(pMD5, sData.data(), sData.size(), NULL);
snprintf(pBuf, 256, "%s:supervisor:%s\n", sAccount.data(), pH1);
fprintf(_pAuth, "%s", pBuf);
}
delete[] pBuf;
delete[] pMD5;
}
_iConf = iTemp;
return true;
}
void Supervisor::SaveSetting() {
std::ofstream ofs("supervisor.json");
Json::StyledWriter iWriter;
ofs << iWriter.write(_iConf) << std::endl;
ofs.flush();
ofs.close();
}
void Supervisor::Register(const std::string & sUrl, EMethod emMethod, Proc fProc) {
if (_mApis.find(sUrl) != _mApis.end()) return;
Processor iProc;
iProc.emMethod = emMethod;
iProc.fProc = fProc;
_mApis[sUrl] = iProc;
}
Json::Value Supervisor::Url2Json(const char * pData, size_t nSize) {
std::string sUrl(pData, nSize);
Json::Value iParam;
size_t nStart = -1;
size_t nEnd = sUrl.find_first_of('=');
if (nEnd == std::string::npos || nEnd == nSize - 1 || nEnd <= 0)
return std::move(iParam);
do {
size_t nNext = sUrl.find_first_of('&', nEnd + 1);
if (nNext == std::string::npos) nNext = nSize;
std::string sKey = sUrl.substr(nStart + 1, nEnd - nStart - 1);
std::string sVal = sUrl.substr(nEnd + 1, nNext - nEnd - 1);
if (!sVal.empty()) iParam[sKey] = sVal;
if (nNext == nSize) break;
nStart = nNext;
nEnd = sUrl.find_first_of('=', nStart);
} while (nStart < nEnd && nEnd != std::string::npos && nEnd < nSize - 1);
return std::move(iParam);
}
Json::Value Supervisor::GetWatcherInfo(const std::string & sName) {
Watcher::Info * pInfo = GWatcher.Get(sName);
Json::Value iInfo = GetConf(sName);
iInfo["name"] = sName;
iInfo["status"] = pInfo ? pInfo->emStatus : Watcher::EStatus::Stopped;
return std::move(iInfo);
}
Json::Value Supervisor::GetConf(const std::string & sName) {
int nWatcher = _iConf["watcher"].size();
for (int n = 0; n < nWatcher; ++n) {
Json::Value iWatcher = _iConf["watcher"][n];
if (iWatcher["name"].asString() == sName) return iWatcher;
}
return Json::Value::null;
}
void Supervisor::Response(mg_connection * pConn, int nHttpCode, const std::string & sHeader, const std::string & sContent) {
mg_send_head(pConn, nHttpCode, sContent.size(), sHeader.empty() ? "Content-type:text/plain" : sHeader.data());
if (!sContent.empty()) mg_send(pConn, sContent.data(), sContent.size());
pConn->flags |= MG_F_SEND_AND_CLOSE;
}
void Supervisor::Broadcast(const std::string & sScope, const std::string & sJson) {
for (mg_connection * p : _vSockets) {
char * pScope = (char *)p->user_data;
if (sScope == pScope) mg_send_websocket_frame(p, WEBSOCKET_OP_TEXT, sJson.data(), sJson.size());
}
}
int main(int nArgc, char * pArgv[]) {
Command iCmd(nArgc, pArgv);
Supervisor iService;
iService.Start(iCmd);
return 0;
}
C++
1
https://gitee.com/love_linger/supervisor.git
git@gitee.com:love_linger/supervisor.git
love_linger
supervisor
supervisor
master

搜索帮助