1 Star 0 Fork 6

没什么好笑的 / rpa

forked from thor.lee / rpa 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
rpa.py 2.00 KB
一键复制 编辑 原始数据 按行查看 历史
from runtime.load import load_scripts
from runtime.task import TaskScheduler
from lib import excel
from lib.log import logger
import os
import time
class RPA(object):
def __init__(self, script_dir=None) -> None:
super().__init__()
if script_dir is None:
script_dir = os.path.join(os.getcwd(), "scripts")
flow_cls = load_scripts(script_dir)
self.scheduler = TaskScheduler(flow_cls)
def list_flow(self):
group_map = {}
for script_def in self.scheduler.scripts:
group_name = script_def.group
if group_name not in group_map:
group_map[group_name] = {
"title": group_name, "flows": []
}
d = script_def.to_dict()
del d["cls"]
del d["func"]
group_map[group_name]["flows"].append(d)
return list(group_map.values())
def submit_flow(self, flow_name, flow_args, execute_time=None):
"""
flow_name = 流程的唯一名称
flow_args = 流程的执行参数
execute_time = 定时执行时间, 秒为单位的时间戳
"""
if execute_time is not None:
if execute_time < time.time():
logger.error("定时执行时间不能早于当前时间")
return "FAIL"
#如果是设置位定时执行的任务,需要提前先执行precheck方法确保输入的参数都是正确的
script = list(filter(lambda x: str(x.name) == flow_name, self.scheduler.scripts))[0]
if script.cls:
flow = script.cls(**flow_args)
flow.precheck()
self.scheduler.submit(flow_name, flow_args, execute_time)
return "OK"
def list_user(self):
return excel.loads2dict("c:\\rpa\\账号.xlsx")
def get_password(self, uid):
users = excel.loads2dict("c:\\rpa\\账号.xlsx")
user = list(filter(lambda x: f'{x["序号"]}' == f"{uid}", users))[0]
return user["密码"]
1
https://gitee.com/xintao/rpa.git
git@gitee.com:xintao/rpa.git
xintao
rpa
rpa
main

搜索帮助