57 Star 134 Fork 41

K. / GoTasks

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
task_service.go 9.50 KB
一键复制 编辑 Web IDE 原始数据 按行查看 历史
K. 提交于 2015-10-07 20:32 . 修改golang版本
package GoTasks
import (
"flag"
"encoding/json"
"time"
"os"
"git.oschina.net/janpoem/go-logger.git"
"github.com/chuckpreslar/emission"
"net/http"
)
import (
_ "net/http/pprof"
"strconv"
)
type MainConfig struct {
TasksUrl string
Cycle int
TraceCycle int
LogDir string
LogLevel int
ShowComplete int
IsTest bool
IsConsole bool
PprofPort int
}
type Task struct {
Name string
Url string
Cycle int
PostUrl string
Delay int
EventName string
}
type Tasks struct {
Tasks[] Task
}
type TaskResult struct {
Id time.Time
Name string
Resp string
}
const (
CLEAN_NONE int = iota
CLEAN_START
CLEAN_COMPLETE
)
var runtimeDir = GetRuntimeDir()
//var configFile string
var globalConfig = MainConfig{"", 0, 0, "", logger.DEBUG, 0, false, false, 0 }
var globalEE = emission.NewEmitter()
var prepared = false
////////////////////////////////////////////////////////////////////////////////////////////
// 前置准备函数
////////////////////////////////////////////////////////////////////////////////////////////
func Prepare() {
if prepared {
return
}
// 命令行参数预定义
flag.StringVar(&globalConfig.LogDir, "log-dir", runtimeDir, "日志输出目录")
flag.IntVar(&globalConfig.LogLevel, "log-level", logger.DEBUG, "日志输出级别")
flag.IntVar(&globalConfig.Cycle, "cycle", 3600 * 12, "重新加载任务的周期,单位秒")
flag.IntVar(&globalConfig.TraceCycle, "trace-cycle", 600, "输出心跳信息的周期,单位秒")
flag.StringVar(&globalConfig.TasksUrl, "tasks", "", "任务列表")
flag.IntVar(&globalConfig.ShowComplete, "show-complete", 0, "显示请求完成的信息")
flag.BoolVar(&globalConfig.IsTest, "test", false, "是否测试命令的参数")
flag.BoolVar(&globalConfig.IsConsole, "console", true, "是否在命令行输出调试")
flag.IntVar(&globalConfig.PprofPort, "pprof-port", 0, "pprof服务器端口号,为零不启用")
// 解析命令行参数
if len(os.Args) > 1 && os.Args[1] == "--" {
// 使用linux的start-stop-daemon来启动进程的时候,--会导致参数的解析出错
// 比如:start-stop-daemon --start --quiet --pidfile --make-pidfile ${PIDFILE} --exec ${DAEMON} -- ${DAEMON_ARGS}
flag.CommandLine.Parse(os.Args[2:])
} else {
flag.CommandLine.Parse(os.Args[1:])
}
// 标记已经准备好了
prepared = true
// 现在版本的logger貌似有点问题,日志文件不指定而调用输出方法会抛出一个空指针的错误
logger.SetConsole(globalConfig.IsConsole)
logger.SetRollingFile(globalConfig.LogDir, "task_service.log", 10, 5, logger.MB)
logger.SetLevel(globalConfig.LogLevel)
if len(globalConfig.TasksUrl) <= 0 {
// linux系统可能没有中文...
logger.Error("Please specify a valid tasks url!")
os.Exit(1)
}
if !Exist(globalConfig.LogDir) {
logger.Error("Log direstory does not exist!")
os.Exit(1)
}
// 增加命令参数的调试
if globalConfig.IsTest {
os.Exit(0)
}
logger.Log("程序主目录:", runtimeDir)
logger.Log("日志目录:", globalConfig.LogDir, "日志级别:", globalConfig.LogLevel)
// if len(configFile) > 0 {
// loadConfig(configFile)
// }
}
// 本来想额外读取一个json文件,现在已经完全不需要了
//func loadConfig(path string) (bool) {
// logger.Log("加载配置文件:", path)
// raw, err := ReadFileByte(path)
// if err != nil {
// logger.Warn("加载配置文件出错", err)
// return false
// }
// var inter interface{}
// json.Unmarshal(raw, &inter)
// config, ok := inter.(map[string]interface{})
// if ok {
// for k, v := range config {
// js := JsonValue{v }
// if k == "TasksUrl" {
// url := js.AsString()
// if len(url) > 0 {
// globalConfig.TasksUrl = url
// }
// }
// if k == "Cycle" {
// cycle := js.AsNumber()
// if cycle > 0 {
// globalConfig.Cycle = cycle
// }
// }
// }
// logger.Log("解析配置成功:", globalConfig)
// } else {
// logger.Warn("解析配置失败", ok)
// }
// return true
//}
////////////////////////////////////////////////////////////////////////////////////////////
// 启动主函数
////////////////////////////////////////////////////////////////////////////////////////////
// 主计时器
var mainTicker *time.Ticker
// 主心跳计数器
var mainCounter = 0
// 任务的计时器
var taskTickers = make(map[string]*time.Ticker)
// 全部的任务清单
var taskList Tasks
// 清理计时器的步骤,默认无
var clearTaskTickersStep = CLEAN_NONE
func TaskServiceStart() {
if !prepared {
Prepare()
}
ch := make(chan int)
go startMainTicker()
go loadTasksList()
if globalConfig.PprofPort > 0 {
port := strconv.Itoa(globalConfig.PprofPort)
logger.Log("启动pprof调试服务器:", "localhost:" + port)
go func() {
http.ListenAndServe("localhost:" + port, nil)
}()
}
<-ch
// close(ch)
}
// 启动主计时器
func startMainTicker() {
logger.Log("主计时器启动")
// 如果主计时器不为空,先停止掉主计时器
stopMainMainTicker()
// 启动一个全局的计时器
mainTicker = time.NewTicker(time.Duration(1) * time.Second)
for {
select {
case <-mainTicker.C:
mainCounter += 1
// 10分钟输出一次心跳,表明自己没死
if mainCounter % globalConfig.TraceCycle == 0 {
logger.Warn("心跳计时器:", mainCounter, "任务数:", len(taskList.Tasks), "计时器数:", len(taskTickers))
}
if clearTaskTickersStep == CLEAN_NONE && mainCounter >= int(globalConfig.Cycle) {
// 开始清理计时器
clearTaskTickersStep = CLEAN_START
clearTickers()
// 这里的操作是阻塞的
}
// 已经完成清空计时器了
if clearTaskTickersStep == CLEAN_COMPLETE {
clearTaskTickersStep = CLEAN_NONE
logger.Debug("重启所有任务", "清理进程步骤:", clearTaskTickersStep)
ch := make(chan int)
go startMainTicker()
go loadTasksList()
<-ch
// close(ch)
}
}
}
}
// 增加一个停止主计时器的方法
func stopMainMainTicker() bool {
if mainTicker != nil {
mainTicker.Stop()
return true
}
return false
}
// 清理全部的计时器
func clearTickers() {
l := len(taskTickers)
logger.Log("需要清理的计时器数量", l)
if clearTaskTickersStep == CLEAN_START && l > 0 {
for key, ticker := range taskTickers {
logger.Debug("清空" + key + "任务计时器")
ticker.Stop()
delete(taskTickers, key)
}
}
mainCounter = 0
clearTaskTickersStep = CLEAN_COMPLETE
}
func loadTasksList() {
start := time.Now()
resp, _ := HttpGet(globalConfig.TasksUrl)
err := json.Unmarshal([]byte(resp), &taskList)
complete := time.Now()
if err != nil {
// 解析任务清单失败,就直接退出主进程。
logger.Error("解析任务清单失败", complete.Sub(start), err)
stopMainMainTicker()
os.Exit(1)
} else {
logger.Log("解析任务清单成功", complete.Sub(start))
t := taskList.Tasks
l := len(taskList.Tasks)
var names = make([]string, l)
for i := 0; i < len(t); i++ {
names[i] = taskList.Tasks[i].Name
}
// 开始执行任务
taskList.start()
}
}
func (self *Tasks) start() {
mainCounter = 0
logger.Log("全任务启动")
count := len(self.Tasks)
ch := make(chan int)
for i := 0; i < count; i++ {
go self.Tasks[i].start()
}
<-ch
// close(ch)
}
func (task *Task) start() {
if task.Delay <= 0 {
ch := make(chan int)
go task.request()
go task.startTicker()
<-ch
} else {
logger.Log(task.Name, "延迟", task.Delay, "秒开始")
ch := make(chan int)
timer := time.NewTimer(time.Duration(task.Delay) * time.Second)
<- timer.C
go task.request()
go task.startTicker()
task.Delay = 0
<-ch
}
// close(ch)
}
func (task *Task) GetEventName() string {
if len(task.EventName) > 0 {
return task.EventName
}
return task.Name
}
func (task *Task) request() {
start := time.Now()
logger.Log(task.Name, "开始", start)
resp, _ := HttpGet(task.Url)
complete := time.Now()
eventName := task.GetEventName()
if globalConfig.ShowComplete > 0 {
logger.Log(task.Name, "完成:", complete.Sub(start), "响应内容长度:", len(resp))
}
Emit("get:" + eventName, &TaskResult{ Id: start, Name: task.Name, Resp: resp })
if len(task.PostUrl) > 0 {
HttpPost(task.PostUrl, resp)
Emit("post:" + eventName, &TaskResult{ Id: start, Name: task.Name, Resp: resp })
}
}
func (task *Task) startTicker() {
if taskTickers[task.Name] != nil {
taskTickers[task.Name].Stop()
delete(taskTickers, task.Name)
}
ticker := time.NewTicker(time.Duration(task.Cycle) * time.Second)
taskTickers[task.Name] = ticker
for {
select {
case <-ticker.C:
// 这里如果不用go,则是堵塞的方式,他会堵塞计时器的触发
// task.request()
// 下面这个是非堵塞的模式,完全严格按照任务清单给的时间周期去执行
ch := make(chan int)
go task.request()
go task.startTicker()
<-ch
// close(ch)
}
}
}
func On(event, listener interface{}) *emission.Emitter {
return globalEE.On(event, listener)
}
func Once(event, listener interface{}) *emission.Emitter {
return globalEE.Once(event, listener)
}
func Off(event, listener interface{}) *emission.Emitter {
return globalEE.Off(event, listener)
}
func Emit(event interface{}, arguments ...interface{}) *emission.Emitter {
return globalEE.Emit(event, arguments...)
}
Go
1
https://gitee.com/janpoem/GoTasks.git
git@gitee.com:janpoem/GoTasks.git
janpoem
GoTasks
GoTasks
master

搜索帮助

14c37bed 8189591 565d56ea 8189591