2 Star 2 Fork 0

绝世尘封 / SpiderWxContent

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
db.py 12.94 KB
一键复制 编辑 原始数据 按行查看 历史
绝世尘封 提交于 2022-08-25 11:06 . add
#!/usr/bin/python
# -*-coding:utf-8-*-
import pymysql
# from DBUtils.PooledDB import PooledDB
from dbutils.pooled_db import PooledDB
import logging
# 连接MySQL
import common
from dotenv import load_dotenv, find_dotenv
import os
load_dotenv(verbose=True)
class DbManager(object):
# 构造函数
def __init__(self):
self.conn = None
self.cur = None
self.POOL = PooledDB(
creator=pymysql,
maxconnections=20, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=0, # 链接池中最多闲置的链接,0和None不限制
maxusage=1, # 一个链接最多被重复使用的次数,None表示无限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
setsession=[],
host=os.getenv('HOSTNAME') if os.getenv('HOSTNAME') is not None else "localhost",
port=int(os.getenv('HOSTPORT')) if int(os.getenv('HOSTPORT')) is not None else 3306,
user=os.getenv('USERNAME') if os.getenv('USERNAME') is not None else "root",
password=os.getenv('PASSWORD') if os.getenv('PASSWORD') is not None else "",
database=os.getenv('DATABASE') if os.getenv('DATABASE') is not None else "schema",
charset=os.getenv('CHARSET') if os.getenv('CHARSET') is not None else "utf8",
)
# 连接数据库
def connectDatabase(self):
try:
self.conn = self.POOL.connection()
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
return True
except:
logging.warning("connectDatabase failed")
return False
# 关闭数据库
def close(self):
if self.conn and self.cur:
self.cur.close()
self.conn.close()
return True
# 基本的执行SQL方法,下面几乎都调用这个
def execute(self, sql, params=None, exe_many=False):
res = self.connectDatabase()
if not res:
return False
cnt = 0
try:
if self.conn and self.cur:
if exe_many:
cnt = self.cur.executemany(sql, params)
else:
cnt = self.cur.execute(sql, params)
self.conn.commit()
except Exception as e:
logging.error("execute failed: " + sql)
logging.error(str(e) + "\n\n")
return False
self.close()
return cnt
################################################################
################ 以下为封装好的执行方法:表、字段方式 ################
################################################################
# 新增并返回新增ID
def table_insert(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
data :必填,更新数据,字典类型,如:data={"aaa": "666'6", "bbb": "888"}
"""
table = kwargs["table"]
data = kwargs["data"]
sql = "insert into %s (" % table
fields = ""
values = []
flag = ""
for k, v in data.items():
fields += "%s," % k
values.append(str(v))
flag += "%s,"
fields = fields.rstrip(",")
values = tuple(values)
flag = flag.rstrip(",")
sql += fields + ") values (" + flag + ");"
logging.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
# 获取自增id
res = self.cur.lastrowid
return res
except:
self.conn.rollback()
# 修改数据并返回影响的行数
def table_update(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
data :必填,更新数据,字典类型,如:data={"aaa": "666'6", "bbb": "888"}
where:必填,更新条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
"""
table = kwargs["table"]
data = kwargs["data"]
where = kwargs["where"]
sql = "update %s set " % table
values = []
for k, v in data.items():
sql += "{}=%s,".format(k)
values.append(str(v))
sql = sql.rstrip(",")
sql += " where 1=1 "
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
sql += ";"
values = tuple(values)
logging.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
rowcount = self.cur.rowcount
return rowcount
except:
self.conn.rollback()
# 删除并返回影响行数
def table_delete(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,删除条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
"""
table = kwargs["table"]
where = kwargs["where"]
sql = "delete from %s where 1=1" % (table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
sql += ";"
values = tuple(values)
logging.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
rowcount = self.cur.rowcount
return rowcount
except:
self.conn.rollback()
# 查一条数据
def table_select_one(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
field: 非必填,查询列名,字符串类型,如:field="aaa, bbb",不填默认*
order: 非必填,排序字段,字符串类型,如:order="ccc"
sort: 非必填,排序方式,字符串类型,如:sort="asc"或者"desc",不填默认asc
"""
table = kwargs["table"]
field = "field" in kwargs and kwargs["field"] or "*"
where = kwargs["where"]
order = "order" in kwargs and "order by " + kwargs["order"] or ""
sort = kwargs.get("sort", "asc")
if order == "":
sort = ""
sql = "select %s from %s where 1=1 " % (field, table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
sql += " %s %s limit 1;" % (order, sort)
values = tuple(values)
logging.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
data = self.cur.fetchone()
return data
except:
self.conn.rollback()
# 查批量数据
def table_select_many(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
field: 非必填,查询列名,字符串类型,如:field="aaa, bbb",不填默认*
order: 非必填,排序字段,字符串类型,如:order="ccc"
sort: 非必填,排序方式,字符串类型,如:sort="asc"或者"desc",不填默认asc
offset:非必填,偏移量,如翻页,不填默认0
limit: 非必填,条数,不填默认100
"""
table = kwargs["table"]
field = "field" in kwargs and kwargs["field"] or "*"
order = "order" in kwargs and "order by " + kwargs["order"] or ""
sort = kwargs.get("sort", "asc")
if order == "":
sort = ""
where = kwargs["where"]
offset = kwargs.get("offset", 0)
limit = kwargs.get("limit", 100)
sql = "select %s from %s where 1=1 " % (field, table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
values = tuple(values)
sql += " %s %s limit %s, %s;" % (order, sort, offset, limit)
logging.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
data = self.cur.fetchall()
return data
except:
self.conn.rollback()
# 查条数
def table_count(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
"""
table = kwargs["table"]
where = kwargs["where"]
sql = "select count(1) as count from %s where 1=1 " % (table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s;" % where
values = tuple(values)
logging.info("sql:\n{} \n".format(sql))
try:
self.execute(sql, values)
data = self.cur.fetchone()
return data
except:
self.conn.rollback()
def count_news():
print("开始统计")
# 示例
mysqldb = DbManager()
# 更新统计 查询当天的记录
spider_news_count = mysqldb.table_count(table="news",
where="create_time >='" + common.TodayTime() + "'")
spider_log = mysqldb.table_count(table="spider_log",
where="create_time >='" + common.TodayTime() + "'")
count = spider_news_count['count']
if spider_log['count']:
mysqldb.table_update(table="spider_log", data={"spider_news_count": count},
where="create_time >='" + common.TodayTime() + "'")
else:
mysqldb.table_insert(table="spider_log",
data={"spider_news_count": count})
if __name__ == "__main__":
# 示例
mysqldb = DbManager()
# 更新统计 查询当天的记录
# mysqldb = db.DbManager()
spider_news_count = mysqldb.table_select_one(table="news",
where="create_time >='" + common.TodayTime() + "'")
print(spider_news_count)
if spider_news_count:
count = spider_news_count['spider_news_count'] + 1
print(count)
mysqldb.table_update(table="spider_log", data={"spider_news_count": count},
where={"id": spider_news_count['id']})
else:
count = 0
mysqldb.table_insert(table="spider_log",
data={"spider_news_count": count})
# ###################### ↓↓↓ 表、字段方式操作示例 ↓↓↓ ######################
# 插入数据
# i = mysqldb.table_insert(table="news", data={"cid": '1', "title": "测试标题", "content": "hello python","addtime":"2022-07-25 09:10:00",'author':"万人迷","source":"央视网",'status':"1"})
# print("自增ID:", i)
# exit()
# 更新数据
# c = mysqldb.table_update(table="spider_log", data={"spider_news_count": 1}, where={"id": 1})
# print("更新行数:", c)
# exit()
# 删除数据
# c = mysqldb.table_delete(table="test_table", where={"aaa": "666'6", "bbb": 777})
# print("删除行数:", c)
# 查询一条 buyPrice >= 90 AND buyPrice <= 100 common.TodayTime()
s = mysqldb.table_select_one(table="spider_log",where="create_time >='"+common.TodayTime()+"'")
if s:
print(1)
else:
count = 0
mysqldb.table_insert(table="spider_log",
data={"spider_news_count": count})
print(s)
exit()
# 批量查询,默认查询100条
# 完整参数示例
l = mysqldb.table_select_many(table="test_table", field="aaa, bbb", where="aaa>333", order="ccc", sort="desc", offset=25, limit=20)
print(l)
# 必填参数示例
l = mysqldb.table_select_many(table="test_table", field="aaa, bbb", where={"aaa": 333, "bbb": 2})
print(l)
# 查条数
count = mysqldb.table_count(table="test_table", where={"aaa": 333, "bbb": 2})
print(count)
Python
1
https://gitee.com/kaifakaixin.com/SpiderWxContent.git
git@gitee.com:kaifakaixin.com/SpiderWxContent.git
kaifakaixin.com
SpiderWxContent
SpiderWxContent
wkf

搜索帮助