1 Star 2 Fork 0

宇宙蒙面侠X / antlinker-flow

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
engine.go 16.07 KB
一键复制 编辑 原始数据 按行查看 历史
Lyric 提交于 2019-01-17 16:57 . 优化流程查询
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
package flow
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/antlinker/flow/bll"
"github.com/antlinker/flow/register"
"github.com/antlinker/flow/schema"
"github.com/antlinker/flow/service/db"
"github.com/antlinker/flow/util"
"github.com/facebookgo/inject"
"github.com/pkg/errors"
)
// Logger 定义日志接口
type Logger interface {
Errorf(format string, args ...interface{})
}
// AutoCallbackHandler 自动执行节点回调处理
type AutoCallbackHandler func(action, flag, userID string, input []byte, result *HandleResult) error
// Engine 流程引擎
type Engine struct {
flowBll *bll.Flow
parser Parser
execer Execer
logger Logger
timingStart bool
timingTicker *time.Ticker
timingWg *sync.WaitGroup
getDBContext func(flag string) context.Context
autoCallback AutoCallbackHandler
}
// Init 初始化流程引擎
func (e *Engine) Init(parser Parser, execer Execer, sqlDB *sql.DB, trace bool) (*Engine, error) {
var (
g inject.Graph
flowBll bll.Flow
)
db := db.NewMySQLWithDB(sqlDB, trace)
err := g.Provide(&inject.Object{Value: db},
&inject.Object{Value: &flowBll})
if err != nil {
return e, err
}
err = g.Populate()
if err != nil {
return e, err
}
register.FlowDBMap(db)
err = db.CreateTablesIfNotExists()
if err != nil {
return e, err
}
e.flowBll = &flowBll
e.parser = parser
e.execer = execer
return e, nil
}
// SetParser 设定解析器
func (e *Engine) SetParser(parser Parser) {
e.parser = parser
}
// SetExecer 设定表达式执行器
func (e *Engine) SetExecer(execer Execer) {
e.execer = execer
}
// SetLogger 设定日志接口
func (e *Engine) SetLogger(logger Logger) {
e.logger = logger
}
// SetGetDBContext 设定获取DB上下文
func (e *Engine) SetGetDBContext(fn func(flag string) context.Context) {
e.getDBContext = fn
}
// SetAutoCallback 设定自动节点回调函数
func (e *Engine) SetAutoCallback(callback AutoCallbackHandler) {
e.autoCallback = callback
}
// FlowBll 流程业务
func (e *Engine) FlowBll() *bll.Flow {
return e.flowBll
}
func (e *Engine) errorf(format string, args ...interface{}) {
if e.logger != nil {
e.logger.Errorf(format, args...)
}
}
// StartTiming 启动定时器
func (e *Engine) StartTiming(interval time.Duration) {
if e.timingStart {
return
}
e.timingStart = true
e.timingWg = new(sync.WaitGroup)
e.timingTicker = time.NewTicker(interval)
go func() {
for range e.timingTicker.C {
err := e.handleTiming()
if err != nil {
e.errorf("处理定时任务发生错误:%v", err)
}
}
}()
}
func (e *Engine) handleTiming() error {
defer func() {
if err := recover(); err != nil {
e.errorf("处理定时任务发生崩溃:%v", err)
}
}()
items, err := e.flowBll.QueryExpiredNodeTiming()
if err != nil {
return err
}
for _, item := range items {
err = e.handleExpiredNodeTiming(item)
if err != nil {
return err
}
}
return nil
}
// 处理定时节点
func (e *Engine) handleExpiredNodeTiming(item *schema.NodeTiming) error {
e.timingWg.Add(1)
defer e.timingWg.Done()
ni, err := e.flowBll.GetNodeInstance(item.NodeInstanceID)
if err != nil {
return err
} else if ni == nil || ni.Status != 1 {
return nil
}
ctx := context.Background()
if fn := e.getDBContext; fn != nil {
ctx = fn(item.Flag)
}
if item.Input != "" {
var v map[string]interface{}
_ = json.Unmarshal([]byte(item.Input), &v)
if ni.InputData != "" {
iv := make(map[string]interface{})
_ = json.Unmarshal([]byte(ni.InputData), &iv)
for key, val := range v {
iv[key] = val
}
v = iv
}
buf, _ := json.Marshal(v)
ni.InputData = string(buf)
}
result, err := e.HandleFlow(ctx, item.NodeInstanceID, item.Processor, []byte(ni.InputData))
if err != nil {
return err
}
err = e.flowBll.DeleteNodeTiming(item.NodeInstanceID)
if err != nil {
return err
}
if fn := e.autoCallback; fn != nil {
err = fn("", item.Flag, item.Processor, []byte(ni.InputData), result)
if err != nil {
return err
}
}
return nil
}
// StopTiming 停止定时器
func (e *Engine) StopTiming() {
if !e.timingStart {
return
}
e.timingStart = false
e.timingTicker.Stop()
e.timingWg.Wait()
}
// 读取XML文件
func (e *Engine) parseFile(name string) ([]byte, error) {
fullName, err := filepath.Abs(name)
if err != nil {
return nil, err
}
file, err := os.Open(fullName)
if err != nil {
return nil, err
}
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil {
return nil, err
}
return data, nil
}
// LoadFile 加载文件数据
func (e *Engine) LoadFile(name string) error {
data, err := e.parseFile(name)
if err != nil {
return err
}
_, err = e.CreateFlow(data)
return err
}
func (e *Engine) parseFormOperating(formOperating *schema.FormOperating, flow *schema.Flow, node *schema.Node, formResult *NodeFormResult) {
if formResult.ID == "" {
return
}
for _, f := range formOperating.FormGroup {
if f.Code == formResult.ID {
node.FormID = f.RecordID
return
}
}
form := &schema.Form{
RecordID: util.UUID(),
FlowID: flow.RecordID,
Code: formResult.ID,
TypeCode: "META",
Created: flow.Created,
}
// 解析URL类型
if fields := formResult.Fields; len(fields) == 2 {
if fields[0].ID == "type_code" &&
fields[0].DefaultValue == "URL" &&
fields[1].ID == "data" {
form.TypeCode = "URL"
form.Data = fields[1].DefaultValue
formOperating.FormGroup = append(formOperating.FormGroup, form)
node.FormID = form.RecordID
return
}
}
meta, _ := json.Marshal(formResult.Fields)
form.Data = string(meta)
for _, ff := range formResult.Fields {
field := &schema.FormField{
RecordID: util.UUID(),
FormID: form.RecordID,
Code: ff.ID,
Label: ff.Label,
TypeCode: ff.Type,
DefaultValue: ff.DefaultValue,
Created: flow.Created,
}
for _, item := range ff.Values {
formOperating.FieldOptionGroup = append(formOperating.FieldOptionGroup, &schema.FieldOption{
RecordID: util.UUID(),
FieldID: field.RecordID,
ValueID: item.ID,
ValueName: item.Name,
Created: flow.Created,
})
}
for _, item := range ff.Properties {
formOperating.FieldPropertyGroup = append(formOperating.FieldPropertyGroup, &schema.FieldProperty{
RecordID: util.UUID(),
FieldID: field.RecordID,
Code: item.ID,
Value: item.Value,
Created: flow.Created,
})
}
for _, item := range ff.Validations {
formOperating.FieldValidationGroup = append(formOperating.FieldValidationGroup, &schema.FieldValidation{
RecordID: util.UUID(),
FieldID: field.RecordID,
ConstraintName: item.Name,
ConstraintConfig: item.Config,
Created: flow.Created,
})
}
formOperating.FormFieldGroup = append(formOperating.FormFieldGroup, field)
}
formOperating.FormGroup = append(formOperating.FormGroup, form)
node.FormID = form.RecordID
}
// 创建节点操作
func (e *Engine) parseOperating(flow *schema.Flow, nodeResults []*NodeResult) (*schema.NodeOperating, *schema.FormOperating) {
nodeOperating := &schema.NodeOperating{
NodeGroup: make([]*schema.Node, len(nodeResults)),
}
formOperating := &schema.FormOperating{}
for i, n := range nodeResults {
node := &schema.Node{
RecordID: util.UUID(),
FlowID: flow.RecordID,
Code: n.NodeID,
Name: n.NodeName,
TypeCode: n.NodeType.String(),
OrderNum: strconv.FormatInt(int64(i+10), 10),
Created: flow.Created,
}
if n.FormResult != nil {
e.parseFormOperating(formOperating, flow, node, n.FormResult)
}
for _, exp := range n.CandidateExpressions {
nodeOperating.AssignmentGroup = append(nodeOperating.AssignmentGroup, &schema.NodeAssignment{
RecordID: util.UUID(),
NodeID: node.RecordID,
Expression: exp,
Created: flow.Created,
})
}
nodeOperating.NodeGroup[i] = node
}
var getNodeRecordID = func(nodeCode string) string {
for _, n := range nodeOperating.NodeGroup {
if n.Code == nodeCode {
return n.RecordID
}
}
return ""
}
for _, n := range nodeResults {
for _, r := range n.Routers {
nodeOperating.RouterGroup = append(nodeOperating.RouterGroup, &schema.NodeRouter{
RecordID: util.UUID(),
SourceNodeID: getNodeRecordID(n.NodeID),
TargetNodeID: getNodeRecordID(r.TargetNodeID),
Expression: r.Expression,
Explain: r.Explain,
Created: flow.Created,
})
}
// 增加节点属性
for _, p := range n.Properties {
nodeOperating.PropertyGroup = append(nodeOperating.PropertyGroup, &schema.NodeProperty{
RecordID: util.UUID(),
NodeID: getNodeRecordID(n.NodeID),
Name: p.Name,
Value: p.Value,
Created: flow.Created,
})
}
}
return nodeOperating, formOperating
}
// CreateFlow 创建流程数据
func (e *Engine) CreateFlow(data []byte) (string, error) {
result, err := e.parser.Parse(context.Background(), data)
if err != nil {
return "", err
}
// 检查流程是否存在,如果存在则检查版本号是否一致,如果不一致则创建新流程
oldFlow, err := e.flowBll.GetFlowByCode(result.FlowID)
if err != nil {
return "", err
} else if oldFlow != nil {
if result.FlowVersion <= oldFlow.Version {
return oldFlow.RecordID, nil
}
}
flow := &schema.Flow{
RecordID: util.UUID(),
Code: result.FlowID,
Name: result.FlowName,
Version: result.FlowVersion,
XML: string(data),
Status: result.FlowStatus,
Created: time.Now().Unix(),
}
nodeOperating, formOperating := e.parseOperating(flow, result.Nodes)
// 解析节点表单数据
for _, node := range result.Nodes {
// 查找表单ID不为空并且不包含表单字段的节点
if node.FormResult != nil && node.FormResult.ID != "" && len(node.FormResult.Fields) == 0 {
// 查找表单ID
var formID string
for _, form := range formOperating.FormGroup {
if form.Code == node.FormResult.ID {
formID = form.RecordID
break
}
}
if formID != "" {
for i, ns := range nodeOperating.NodeGroup {
if ns.Code == node.NodeID {
nodeOperating.NodeGroup[i].FormID = formID
break
}
}
}
}
}
err = e.flowBll.CreateFlow(flow, nodeOperating, formOperating)
if err != nil {
return "", err
}
return flow.RecordID, nil
}
// HandleResult 处理结果
type HandleResult struct {
IsEnd bool `json:"is_end"` // 是否结束
NextNodes []*NextNode `json:"next_nodes"` // 下一处理节点
FlowInstance *schema.FlowInstance `json:"flow_instance"` // 流程实例
}
func (r *HandleResult) String() string {
b, _ := json.Marshal(r)
return string(b)
}
// NextNode 下一节点
type NextNode struct {
Node *schema.Node // 节点信息
CandidateIDs []string // 节点候选人
NodeInstance *schema.NodeInstance // 节点实例
}
func (e *Engine) nextFlowHandle(ctx context.Context, nodeInstanceID, userID string, inputData []byte) (*HandleResult, error) {
var result HandleResult
var onNextNode = OnNextNodeOption(func(node *schema.Node, nodeInstance *schema.NodeInstance, nodeCandidates []*schema.NodeCandidate) {
var cids []string
for _, nc := range nodeCandidates {
cids = append(cids, nc.CandidateID)
}
result.NextNodes = append(result.NextNodes, &NextNode{
Node: node,
NodeInstance: nodeInstance,
CandidateIDs: cids,
})
})
var onFlowEnd = OnFlowEndOption(func(_ *schema.FlowInstance) {
result.IsEnd = true
})
nr, err := new(NodeRouter).Init(ctx, e, nodeInstanceID, inputData, onNextNode, onFlowEnd)
if err != nil {
return nil, err
}
err = nr.Next(userID)
if err != nil {
return nil, err
}
result.FlowInstance = nr.GetFlowInstance()
if !result.IsEnd {
for _, item := range result.NextNodes {
prop, verr := e.flowBll.GetNodeProperty(item.Node.RecordID)
if verr != nil {
return nil, verr
}
// 检查节点是否设定定时器,如果设定则加入定时
if v := prop["timing"]; v != "" {
expired, verr := strconv.Atoi(v)
if verr == nil && expired > 0 {
nt := &schema.NodeTiming{
NodeInstanceID: item.NodeInstance.RecordID,
Processor: item.CandidateIDs[0],
Input: prop["timing_input"],
ExpiredAt: time.Now().Add(time.Duration(expired) * time.Minute).Unix(),
Created: time.Now().Unix(),
}
if v, ok := FromFlagContext(ctx); ok {
nt.Flag = v
}
err = e.flowBll.CreateNodeTiming(nt)
if err != nil {
e.errorf("%+v", err)
}
}
}
}
}
return &result, nil
}
// StartFlow 启动流程
// flowCode 流程编号
// nodeCode 开始节点编号
// userID 发起人
// inputData 输入数据
func (e *Engine) StartFlow(ctx context.Context, flowCode, nodeCode, userID string, inputData []byte) (*HandleResult, error) {
nodeInstance, err := e.flowBll.LaunchFlowInstance(flowCode, nodeCode, userID, inputData)
if err != nil {
return nil, err
} else if nodeInstance == nil {
return nil, errors.New("未找到流程信息")
}
return e.nextFlowHandle(ctx, nodeInstance.RecordID, userID, inputData)
}
// LaunchFlow 发起流程(基于流程ID)
func (e *Engine) LaunchFlow(ctx context.Context, flowID, userID string, inputData []byte) (*HandleResult, error) {
_, ni, err := e.flowBll.LaunchFlowInstance2(flowID, userID, 1, inputData)
if err != nil {
return nil, err
}
return e.nextFlowHandle(ctx, ni.RecordID, userID, inputData)
}
// HandleFlow 处理流程节点
// nodeInstanceID 节点实例内码
// userID 处理人
// inputData 输入数据
func (e *Engine) HandleFlow(ctx context.Context, nodeInstanceID, userID string, inputData []byte) (*HandleResult, error) {
// 检查是否是节点候选人
exists, err := e.flowBll.CheckNodeCandidate(nodeInstanceID, userID)
if err != nil {
return nil, err
} else if !exists {
return nil, fmt.Errorf("无效的节点处理人")
}
nodeInstance, err := e.flowBll.GetNodeInstance(nodeInstanceID)
if err != nil {
return nil, err
} else if nodeInstance == nil || nodeInstance.Status != 1 {
return nil, fmt.Errorf("无效的处理节点")
}
return e.nextFlowHandle(ctx, nodeInstanceID, userID, inputData)
}
// StopFlow 停止流程
func (e *Engine) StopFlow(nodeInstanceID string, allowStop func(*schema.FlowInstance) bool) error {
flowInstance, err := e.flowBll.GetFlowInstanceByNode(nodeInstanceID)
if err != nil {
return err
} else if flowInstance == nil {
return errors.New("流程不存在")
}
if allowStop != nil && !allowStop(flowInstance) {
return errors.New("不允许停止流程")
}
return e.flowBll.StopFlowInstance(flowInstance.RecordID)
}
// StopFlowInstance 停止流程实例
func (e *Engine) StopFlowInstance(flowInstanceID string, allowStop func(*schema.FlowInstance) bool) error {
flowInstance, err := e.flowBll.GetFlowInstance(flowInstanceID)
if err != nil {
return err
}
if allowStop != nil && !allowStop(flowInstance) {
return errors.New("不允许停止流程")
}
return e.flowBll.StopFlowInstance(flowInstanceID)
}
// QueryTodoFlows 查询流程待办数据
// flowCode 流程编号
// userID 待办人
func (e *Engine) QueryTodoFlows(flowCode, userID string) ([]*schema.FlowTodoResult, error) {
return e.flowBll.QueryTodo("", flowCode, userID, 100)
}
// QueryFlowHistory 查询流程历史数据
// flowInstanceID 流程实例内码
func (e *Engine) QueryFlowHistory(flowInstanceID string) ([]*schema.FlowHistoryResult, error) {
return e.flowBll.QueryHistory(flowInstanceID)
}
// QueryDoneFlowIDs 查询已办理的流程实例ID列表
func (e *Engine) QueryDoneFlowIDs(flowCode, userID string) ([]string, error) {
return e.flowBll.QueryDoneIDs(flowCode, userID)
}
// QueryNodeCandidates 查询节点实例的候选人ID列表
func (e *Engine) QueryNodeCandidates(nodeInstanceID string) ([]string, error) {
candidates, err := e.flowBll.QueryNodeCandidates(nodeInstanceID)
if err != nil {
return nil, err
}
ids := make([]string, len(candidates))
for i, c := range candidates {
ids[i] = c.CandidateID
}
return ids, nil
}
// GetNodeInstance 获取节点实例
func (e *Engine) GetNodeInstance(nodeInstanceID string) (*schema.NodeInstance, error) {
return e.flowBll.GetNodeInstance(nodeInstanceID)
}
1
https://gitee.com/awol2010ex/antlinker-flow.git
git@gitee.com:awol2010ex/antlinker-flow.git
awol2010ex
antlinker-flow
antlinker-flow
develop

搜索帮助