代码拉取完成,页面将自动刷新
同步操作将从 LepusGroup/lepus 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#!/bin/env python
# -*-coding:utf-8-*-
import unittest
import pymysql
import os
from include.functions import (get_config,
get_item,
get_mysql_variables,
mysql_exec,
mysql_query,
send_mail,
add_alarm,
check_if_ok,
update_send_mail_status,
check_db_status,
update_db_status_init,
update_db_status,
update_check_time,
get_option,)
class TestFunctions(unittest.TestCase):
    """Smoke tests for the helpers imported from ``include.functions``.

    These tests talk to a live MySQL server on 127.0.0.1:3306 and to whatever
    monitor database the helpers are configured for.  Most of them only check
    that the helper call completes without raising.
    """

    @staticmethod
    def _close_if_open(conn):
        """Close *conn* unless something else has already closed it."""
        if conn.open:
            conn.close()

    def setUp(self):
        """Connect to the local MySQL server and load its variables."""
        conn = pymysql.connect(host='127.0.0.1',
                               port=3306,
                               user='root',
                               passwd='test123')
        # Register the cleanup right after connecting so the connection is
        # released even when the rest of setUp (or the test itself) fails.
        self.addCleanup(self._close_if_open, conn)
        self.data_dict = get_mysql_variables(conn)

    def test_get_config(self):
        """Read every connection setting of the ``monitor_server`` group."""
        group = 'monitor_server'
        keys = ('host', 'port', 'user', 'passwd', 'dbname')
        config_list = [get_config(group, key) for key in keys]
        self.assertEqual(5, len(config_list))

    def test_get_mysql_variables(self):
        """The variables loaded in setUp must not be ``None``."""
        self.assertIsNotNone(self.data_dict)

    def test_get_item(self):
        """Look up the ``version`` entry of the loaded variables."""
        version = get_item(self.data_dict, 'version')
        # NOTE(review): the expected value pins this test to one specific
        # server build; it fails against any other MySQL version.
        self.assertEqual('5.7.25-log', version)

    def test_mysql_exec(self):
        """Run a parameterised INSERT through ``mysql_exec``."""
        sql = 'insert into test(name) values (%s)'
        # The trailing comma matters: ('test') is just the string 'test',
        # whereas ('test',) is the one-element parameter tuple.
        param = ('test',)
        mysql_exec(sql, param)

    def test_mysql_query(self):
        """Run a SELECT through ``mysql_query`` and expect a result object."""
        sql = 'select * from test'
        result = mysql_query(sql)
        self.assertIsNotNone(result)

    def test_send_mail(self):
        """Send a test mail to the ';'-separated recipient option."""
        to_list = get_option('send_mail_to_list')
        mailto_list = to_list.split(';')
        sub = 'noreply'
        content = 'test!'
        result = send_mail(mailto_list, sub, content)
        self.assertTrue(result)

    def test_add_alarm(self):
        """Call ``add_alarm`` with fixed sample values; passes if no error."""
        add_alarm(server_id=1,
                  tags='test',
                  db_host='127.0.0.1',
                  db_port='3306',
                  create_time='2019-11-12 12:12:12',
                  db_type='MySQL',
                  alarm_item='none',
                  alarm_value='none',
                  level='normal',
                  message='naomal',  # (sic) sample text kept unchanged
                  send_mail=0,
                  send_mail_to_list=''
                  )

    def test_check_if_ok(self):
        """Call ``check_if_ok`` with fixed sample values; passes if no error."""
        check_if_ok(server_id=1,
                    tags='test',
                    db_host='127.0.0.1',
                    db_port='3306',
                    create_time='2019-11-13 12:12:12',
                    db_type='mysql',
                    alarm_item='test',
                    alarm_value='test',
                    message='test',
                    send_mail=0,
                    send_mail_to_list='')

    def test_update_send_mail_status(self):
        """Call ``update_send_mail_status``; passes if no error."""
        # NOTE(review): the sibling tests pass ``server_id``; confirm that
        # ``server`` is really the keyword this helper accepts.
        update_send_mail_status(server=1,
                                db_type='mysql',
                                alarm_item='test',
                                send_mail=0,
                                send_mail_max_count=1)

    def test_check_db_status(self):
        """Call ``check_db_status`` for the local server; passes if no error."""
        check_db_status(server_id=1,
                        db_host='127.0.0.1',
                        db_port='3306',
                        tags='test',
                        db_type='mysql')

    def test_update_db_status_init(self):
        """Call ``update_db_status_init``; passes if no error."""
        update_db_status_init(role='dba',
                              version='1.0.0',
                              db_host='127.0.0.1',
                              db_port='3306',
                              tags='test')

    def test_update_db_status(self):
        """Call ``update_db_status`` for ``repl_delay``; passes if no error."""
        update_db_status(field='repl_delay',
                         value=1,
                         db_host='127.0.0.1',
                         db_port='3306',
                         alarm_time='2019-11-14 12:12:12',
                         alarm_item='test',
                         alarm_value='warning',
                         alarm_level='info')

    def test_update_check_time(self):
        """Call ``update_check_time``; passes if no error."""
        update_check_time()

    def test_get_option(self):
        """The ``monitor`` option is expected to be the string ``'1'``."""
        result = get_option(key='monitor')
        self.assertEqual(result, '1')
if __name__ == '__main__':
    # Executed as a script: let unittest discover and run the tests
    # defined in this module.
    unittest.main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。