代码拉取完成,页面将自动刷新
# -*- coding: UTF-8 -*-
"""
事件过滤处理模块
本模块功能:
1. 初始化并启动Slaves线程池管理类,
2. 定时从inotify消息队列中解析事件到任务到队列
"""
from time import sleep
import fs_global as Global
from fs_logger import Logger
from fs_slaves import Slaves
from fs_message import Sender
from fs_util import Singleton, Common
from fs_data import ConfigWrapper, TaskQueue, RetryQueue, StateInfo
class Master(Singleton):
def __init__(self):
self.slaves = None
self.thread_count = None
def steps(self):
try:
self.init_task()
self.handle_event()
self.slaves = Slaves(self.thread_count)
except Exception as e:
Logger.error(e)
return False
else:
return True
def init_task(self):
""" 初始化任务队列 """
count = int(ConfigWrapper.get_key_value('thread_count'))
if count < 1 or count > 100:
raise Exception('[fs_master] thread_count is invalid:%s' % count)
self.thread_count = count
limit_size = int(ConfigWrapper.get_key_value('sync_queue_size'))
TaskQueue.init(limit_size, count)
limit_size = int(ConfigWrapper.get_key_value('fail_queue_size'))
RetryQueue.init(limit_size)
def handle_event(self):
Common.start_thread(target=self.parse_task, args=())
@classmethod
def parse_task(cls, args=None):
"""
事件处理函数
死循环处理inotify原始事件;
如果事件是监控的同步文件,则直接将文件放入队列(同步文件)
否则将该事件的上级目录放入队列(同步目录)
参数: None
返回值: None
"""
event_list = Sender.send(Global.G_INOTIFY_EVENT_MSGID)
while 1:
while 1:
if not len(event_list):
break
event, path = event_list.pop(0).split()
size = Common.get_size(path) if Common.is_exists(path) else 0
Logger.debug("[fs_master] Event %s, Size %s, Path %s " % (event, size, path))
if not ConfigWrapper.is_listen_file(path) and not Common.is_dir(path):
path = Common.dirname(path)
TaskQueue.push_task(path)
""" 防止同一事件频繁同步,每次等待一段时间 """
sleep(float(ConfigWrapper.get_key_value('sync_period')))
def start(self):
self.slaves.start()
def status(self):
syncing, connect = self.slaves.status()
StateInfo.set_syncing_task(syncing)
StateInfo.set_connected_ip(connect)
StateInfo.set_waiting_task(TaskQueue.status())
StateInfo.set_retry_task(RetryQueue.status())
def reload(self):
"""
reload之前不存在的监听目录在reload之后存在,
则此目录需要加入任务队列
"""
Logger.info("[fs_master] appear listen: %s" % Global.G_APPEAR_LISTEN)
[TaskQueue.push_task(path) for path in Global.G_APPEAR_LISTEN]
def pause(self):
self.slaves.pause()
def resume(self):
self.slaves.resume()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。