1 Star 2 Fork 0

王勇涛 / littledog

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
littledog.py 8.32 KB
一键复制 编辑 原始数据 按行查看 历史
王勇涛 提交于 2021-11-15 10:13 . 2021-11-15
#!/usr/bin/env python3
"""
自用小工具
Author: 王勇涛
"""
import argparse
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import re
import os
import zipfile
from datetime import datetime
import shutil
from multiprocessing import Process
import psutil
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
import smtplib
import socket
def iszip(newfile):
if re.match(r'(.*-auto.zip)', newfile):
return newfile
else:
return 'not'
def deploy(zipFilePath):
"""
解压 -auto.zip 为后缀的压缩包,并将其移动到当前用户家目录下
:param zipFilePath: 目标压缩包绝对路径
:return: None
"""
with zipfile.ZipFile(zipFilePath, 'r') as newZip:
newZip.extractall(path=path)
shutil.move(zipFilePath, os.path.join(os.path.expanduser('~'), os.path.split(zipFilePath)[1]))
def deleteOld(path, withoutReruler):
dir = os.listdir(path)
for file in dir:
if not re.match(withoutReruler, file):
if os.path.isdir(os.path.join(path, file)):
shutil.rmtree(os.path.join(path, file), )
else:
os.remove(os.path.join(path, file))
def dumpOld(path, out2path):
"""
将目录中原来的非 -auto.zip 结尾的文件打包
:param path: 被监控的目录
:param out2path: 打包后存放的目录
:return: None
"""
oldFiles = []
oldDirs = []
zip = zipfile.ZipFile(out2path, 'w', allowZip64=True)
reruler = r'.*-auto.zip'
for file in os.listdir(path):
if not re.match(reruler, file):
# 如果目标不是目录
if not os.path.isdir(os.path.join(path, file)):
oldFiles.append(os.path.join(path, file))
# 如果目标是目录
else:
for dir, subdirs, files in os.walk(os.path.join(path, file)):
for fileItem in files:
oldDirs.append(os.path.join(dir, fileItem))
for dirItem in subdirs:
oldDirs.append(os.path.join(dir, dirItem))
for i in oldDirs:
zip.write(i, os.path.join(file, os.path.split(i)[1]))
for file in oldFiles:
zip.write(file, os.path.split(file)[1], zipfile.ZIP_DEFLATED)
zip.close()
# 删除被打包完的文件
deleteOld(path, reruler)
def autoDeploy(eventsrc_path):
"""
自动部署
:param eventsrc_path:填写事件路径event.src_path
:return:
"""
if iszip(eventsrc_path) != 'not':
oldProjCompressPath = os.path.join(backupPath, datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.zip')
dumpOld(path, oldProjCompressPath)
time.sleep(3)
deploy(eventsrc_path)
class Handler1(FileSystemEventHandler):
def on_modified(self, event):
pass
def on_created(self, event):
autoDeploy(event.src_path)
# print('created')
def on_moved(self, event):
autoDeploy(event.src_path)
def on_closed(self, event):
pass
def on_deleted(self, event):
pass
def Handler_start():
event_handler = Handler1()
obs = Observer()
obs.schedule(event_handler, path, recursive=False)
obs.start()
try:
while True:
time.sleep(1)
finally:
obs.stop()
obs.join()
def iscpuoverload():
if psutil.cpu_percent() > args.cpu:
c = 0
for i in range(10):
time.sleep(1)
if psutil.cpu_percent() > args.cpu:
c = c + 1
if c >= 7:
return 'is'
def ismemoryoverload():
if psutil.virtual_memory()[2] > float(args.memory):
c = 0
for i in range(10):
time.sleep(1)
if psutil.virtual_memory()[2] > float(args.memory):
c = c + 1
if c >= 7:
return 'is'
def _format_addr(s):
"""格式化邮件地址,不能简单地传入`name <addr@example.com>`,因为如果包含中文,需要通过`Header`对象进行编码。"""
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
def mail2admin(from_addr: str, password: str, to_addr: str, text: str):
"""
发送邮件给管理员
:param from_addr: 发送方邮件地址
:param password: 发送方邮箱密码
:param to_addr: 目标邮箱地址
:param text: 邮件内容,文本格式
"""
msg = MIMEText(text, 'plain', 'utf-8')
msg['From'] = _format_addr('服务器告警 <%s>' % from_addr)
msg['To'] = _format_addr('管理员 <%s>' % to_addr)
msg['Subject'] = Header('服务器告警', 'utf-8').encode()
# 输入SMTP服务器地址:
smtp_server = 'smtp.qq.com'
smtp_port = 465
server = smtplib.SMTP_SSL(smtp_server, smtp_port)
# server.set_debuglevel(1)
server.login(from_addr, password)
server.sendmail(from_addr, [to_addr], msg.as_string())
server.quit()
def report():
while True:
if iscpuoverload() and ismemoryoverload():
text = '''
机器:%s : %s
cpu 使用率大于 %s%% 连续 10 秒!!!
内存占用率大于 %s%% 连续 10 秒!!!
''' % (socket.gethostname(), socket.gethostbyname(socket.gethostname()), args.cpu, args.memory)
mail2admin('wangyongtao2000@qq.com', 'mxtsyhepwgeobbaf', 'wangyongtao2000@qq.com', text)
time.sleep(3600)
continue
if iscpuoverload():
text = '''
机器:%s : %s
cpu 使用率大于 %s%% 连续 10 秒!!!
''' % (socket.gethostname(), socket.gethostbyname(socket.gethostname()), args.cpu)
mail2admin('wangyongtao2000@qq.com', 'mxtsyhepwgeobbaf', 'wangyongtao2000@qq.com', text)
time.sleep(3600)
continue
elif ismemoryoverload():
text = '''
机器:%s : %s
内存占用率大于 %s%% 连续 10 秒!!!
''' % (socket.gethostname(), socket.gethostbyname(socket.gethostname()), args.memory)
mail2admin('wangyongtao2000@qq.com', 'mxtsyhepwgeobbaf', 'wangyongtao2000@qq.com', text)
time.sleep(3600)
continue
else:
continue
# 命令参数信息
parser = argparse.ArgumentParser(description='wyt自用小工具')
parser.add_argument('-a', '--deploy', action='store_true',
help='开启自动部署')
parser.add_argument('-d', '--dir', nargs=1, metavar='绝对路径',
help='被监控的目录')
parser.add_argument('-b', '--backup', nargs=1, metavar='绝对路径', # default=os.path.expanduser('~'),
help='被监控目录的原本文件被打包后存放的目录')
parser.add_argument('-r', '--report', action='store_true',
help='开启服务器负载警告')
parser.add_argument('--memory', nargs='?', metavar='单位%', type=float, default=90.0,
help='内存占用百分比警告,默认90%%')
parser.add_argument('--cpu', nargs='?', metavar='单位%', type=float, default=80.0,
help='cpu 占用百分比警告,默认80%%')
args = parser.parse_args()
try:
if args.deploy:
# 被监控的目录
path = r'%s' % args.dir[0]
# 被监控目录的原本文件被打包后存放的目录
backupPath = r'%s' % args.backup[0]
except TypeError as e:
print('缺少参数-d以及-b')
exit(-1)
if __name__ == '__main__':
print('主进程ID:%s' % os.getpid())
if args.deploy:
p1 = Process(target=Handler_start)
p1.start()
print('开启自动部署')
if args.dir:
print('被监控目录为 %s' % args.dir[0])
if args.backup:
print('被监控目录的原本文件被打包后存放的目录 %s' % args.backup[0])
if args.report:
p2 = Process(target=report)
p2.start()
print('开启服务器负载警告')
if args.memory >= 0:
print('内存占用百分比警告 %s %% ' % args.memory)
if args.cpu >= 0:
print('cpu 占用百分比警告 %s %%' % args.cpu)
# p1 = Process(target=Handler_start)
# p2 = Process(target=report)
# p1.start()
# p1.join()
# p2.start()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/M4n5ter/littledog.git
git@gitee.com:M4n5ter/littledog.git
M4n5ter
littledog
littledog
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891