1 Star 2 Fork 1

BingoLee / FileSync

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
fs_master.py 3.15 KB
一键复制 编辑 原始数据 按行查看 历史
BingoLee 提交于 2022-06-16 05:24 . update fs_master.py.
# -*- coding: UTF-8 -*-
"""
事件过滤处理模块
本模块功能:
1. 初始化并启动Slaves线程池管理类,
2. 定时从inotify消息队列中解析事件到任务到队列
"""
from time import sleep
import fs_global as Global
from fs_logger import Logger
from fs_slaves import Slaves
from fs_message import Sender
from fs_util import Singleton, Common
from fs_data import ConfigWrapper, TaskQueue, RetryQueue, StateInfo
class Master(Singleton):
def __init__(self):
self.slaves = None
self.thread_count = None
def steps(self):
try:
self.init_task()
self.handle_event()
self.slaves = Slaves(self.thread_count)
except Exception as e:
Logger.error(e)
return False
else:
return True
def init_task(self):
""" 初始化任务队列 """
count = int(ConfigWrapper.get_key_value('thread_count'))
if count < 1 or count > 100:
raise Exception('[fs_master] thread_count is invalid:%s' % count)
self.thread_count = count
limit_size = int(ConfigWrapper.get_key_value('sync_queue_size'))
TaskQueue.init(limit_size, count)
limit_size = int(ConfigWrapper.get_key_value('fail_queue_size'))
RetryQueue.init(limit_size)
def handle_event(self):
Common.start_thread(target=self.parse_task, args=())
@classmethod
def parse_task(cls, args=None):
"""
事件处理函数
死循环处理inotify原始事件;
如果事件是监控的同步文件,则直接将文件放入队列(同步文件)
否则将该事件的上级目录放入队列(同步目录)
参数: None
返回值: None
"""
event_list = Sender.send(Global.G_INOTIFY_EVENT_MSGID)
while 1:
while 1:
if not len(event_list):
break
event, path = event_list.pop(0).split()
size = Common.get_size(path) if Common.is_exists(path) else 0
Logger.debug("[fs_master] Event %s, Size %s, Path %s " % (event, size, path))
if not ConfigWrapper.is_listen_file(path) and not Common.is_dir(path):
path = Common.dirname(path)
TaskQueue.push_task(path)
""" 防止同一事件频繁同步,每次等待一段时间 """
sleep(float(ConfigWrapper.get_key_value('sync_period')))
def start(self):
self.slaves.start()
def status(self):
syncing, connect = self.slaves.status()
StateInfo.set_syncing_task(syncing)
StateInfo.set_connected_ip(connect)
StateInfo.set_waiting_task(TaskQueue.status())
StateInfo.set_retry_task(RetryQueue.status())
def reload(self):
"""
reload之前不存在的监听目录在reload之后存在,
则此目录需要加入任务队列
"""
Logger.info("[fs_master] appear listen: %s" % Global.G_APPEAR_LISTEN)
[TaskQueue.push_task(path) for path in Global.G_APPEAR_LISTEN]
def pause(self):
self.slaves.pause()
def resume(self):
self.slaves.resume()
1
https://gitee.com/BingoLee1/FileSync.git
git@gitee.com:BingoLee1/FileSync.git
BingoLee1
FileSync
FileSync
master

搜索帮助