1 Star 5 Fork 8

1264644959 / blog

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
handle.py 7.52 KB
一键复制 编辑 原始数据 按行查看 历史
1264644959 提交于 2020-10-16 09:28 . commit
"""
企业信息入库 mongo 脚本
"""
import hashlib
import json
import os
import threading
import time
import uuid
from queue import Queue
import pandas as pd
import docx
import pdfplumber
import redis
from pymongo import MongoClient
from win32com import client as wc
class Handle():
    """Import company/certificate rows from PDF and DOCX tables into MongoDB.

    Files are discovered recursively below ``root_path``. Each file is keyed by
    an MD5 token of its path relative to ``root_path``; tokens of files that
    were processed are kept in the Redis set ``gaoqi_token`` so a file is not
    parsed twice. Individual rows are de-duplicated on ``zhengshu_id`` against
    the ``gaoqi_set`` collection.
    """

    def __init__(self):
        """Create the Word, MongoDB and Redis handles and the work queue."""
        # Root directory that is scanned for source documents.
        self.root_path = r'D:\bfy\my_code\task_3\all'
        # (absolute path, base name) tuples collected by traverse().
        self.wenjian_list = []
        # Word COM automation object; only dox_docx() uses it.
        self.word = wc.Dispatch('Word.Application')
        self.mongo_conn = MongoClient(host = "")
        self.redis_conn = redis.Redis(host="",db=0, port=6379)
        self.gaoqi_db = self.mongo_conn.gaoqi_db
        self.gaoqi_set = self.gaoqi_db.gaoqi_set
        # Number of records inserted so far.
        self.num = 0
        # dxc() runs the parsers on several threads, so the counter update
        # has to be serialised.
        self._num_lock = threading.Lock()
        # Queue feeding the worker threads started by dxc().
        self.wenjan_queue = Queue()

    def main(self,wenjian):
        """Parse one file entry.

        :param wenjian: ``(absolute path, base name)`` tuple as produced by
            ``traverse``; only the path is used.

        ``.pdf`` files go to ``pdf_table`` and ``.docx`` files to
        ``docx_table``; any other extension is ignored. Exceptions are printed,
        never propagated.
        """
        try:
            wenjian = wenjian[0]
            print(wenjian)
            if wenjian.endswith('.pdf'):
                self.pdf_table(wenjian)
            if wenjian.endswith('.docx'):
                self.docx_table(wenjian)
        except Exception as e:
            print('===出错===')
            print(e)
        print('===处理完毕===')

    def _relative_parts(self, name):
        """Split ``name`` (with ``root_path`` removed) into its backslash-separated parts."""
        token_str = name.replace(self.root_path, '')
        print(token_str)
        token_list = token_str.split('\\')
        # The leading backslash left after stripping root_path produces one
        # empty component; drop it. Raises ValueError when there is none
        # (callers report that as a failed file).
        token_list.remove("")
        print(token_list)
        return token_list

    def token(self,name):
        """Derive the record metadata encoded in a file's location.

        :param name: absolute file path below ``root_path``.
        :return: dict with ``province`` (first path component below the root)
            and ``time`` (second component).
        :raises ValueError: if the relative path has no empty leading component.
        :raises IndexError: if there are fewer than two path components.
        """
        token_list = self._relative_parts(name)
        return {
            'province': token_list[0],
            'time': token_list[1],
        }

    def existsd(self,name):
        """Return the MD5 hex digest identifying a file.

        :param name: absolute file path below ``root_path``.
        :return: MD5 (UTF-8) of the path components relative to ``root_path``
            concatenated without separators.
        :raises ValueError: if the relative path has no empty leading component.
        """
        token = "".join(self._relative_parts(name))
        return hashlib.md5(token.encode('utf-8')).hexdigest()

    def _store_record(self, data, company_name, zhengshu_id):
        """Insert one company row unless its ``zhengshu_id`` is already stored."""
        if self.gaoqi_set.find_one({'zhengshu_id': zhengshu_id}) is not None:
            print('===本条数据已经存在于数据库中===')
            return
        # Build a fresh document per row instead of mutating the shared
        # per-file metadata dict.
        record = dict(data)
        record['company_name'] = company_name
        record['zhengshu_id'] = zhengshu_id
        record['_id'] = uuid.uuid1()
        print(record)
        self.gaoqi_set.insert_one(record)
        print('===存储成功===')
        with self._num_lock:
            self.num += 1

    def pdf_table(self,name):
        """Store every table row found in a PDF file.

        :param name: absolute path of the PDF file.

        Column 1 of a row is taken as the company name and column 2 as the
        certificate id. Files whose token is already in the Redis set
        ``gaoqi_token`` are skipped; the token is added once the whole file was
        processed. Exceptions are printed, never propagated.
        """
        try:
            print(name)
            data = self.token(name)
            token = self.existsd(name)
            if self.redis_conn.sismember('gaoqi_token',token):
                print('====此文件已经解析过===')
                return
            print(data)
            with pdfplumber.open(name) as pdf:
                for page in pdf.pages:
                    for table in page.extract_tables():
                        # NOTE(review): every row is processed, including the
                        # first one of each table, whereas docx_table() skips
                        # the first row. Confirm whether a header row should
                        # be skipped here as well.
                        for row in table:
                            self._store_record(data, row[1], row[2])
            self.redis_conn.sadd('gaoqi_token', token)
        except Exception as e:
            print('===存储失败==')
            print(e)

    def docx_table(self,name):
        """Store every table row (except the first of each table) of a DOCX file.

        :param name: absolute path of the DOCX file.

        Cell 1 of a row is taken as the company name and cell 2 as the
        certificate id. Files whose token is already in the Redis set
        ``gaoqi_token`` are skipped; the token is added once the whole file was
        processed. Exceptions are printed, never propagated.
        """
        try:
            print(name)
            data = self.token(name)
            token = self.existsd(name)
            if self.redis_conn.sismember('gaoqi_token', token):
                print('====此文件已经解析过===')
                return
            print(data)
            doc = docx.Document(name)
            for table in doc.tables:
                for row in table.rows[1:]:
                    print('================================')
                    company_name = row.cells[1].text
                    zhengshu_id = row.cells[2].text
                    print(company_name,zhengshu_id)
                    self._store_record(data, company_name, zhengshu_id)
            self.redis_conn.sadd('gaoqi_token', token)
        except Exception as e:
            print('===存储失败==')
            print(e)

    def traverse(self,d):
        """Recursively append every file below ``d`` to ``wenjian_list``.

        :param d: directory to scan.

        Each entry is an ``(absolute path, base name)`` tuple. The list is
        appended to, not reset.
        """
        for item in os.listdir(d):
            abs_path = os.path.join(d, item)
            if os.path.isdir(abs_path):
                self.traverse(abs_path)
            else:
                self.wenjian_list.append((abs_path, os.path.basename(abs_path)))

    def dox_docx(self):
        """Convert every ``.doc`` file below ``root_path`` to ``.docx`` via Word.

        The converted file is written next to the original (same name with an
        ``x`` appended) and the original is then deleted. Failures are printed
        and the loop continues with the next file.
        """
        self.traverse(self.root_path)
        print('===文件总数===')
        print(len(self.wenjian_list))
        for wenjian in self.wenjian_list:
            print(wenjian)
            try:
                if wenjian[0].endswith('.doc'):
                    doc = self.word.Documents.Open(wenjian[0])
                    try:
                        name = os.path.dirname(wenjian[0]) + '\\' + wenjian[1] + 'x'
                        print(name)
                        # 12 is Word's file-format code passed to SaveAs.
                        doc.SaveAs(name, 12, False, "", True, "", False, False, False, False)
                    finally:
                        # Always release the document; otherwise every
                        # converted file stays open in the Word instance.
                        # 0 = close without saving further changes.
                        doc.Close(0)
                    print('===处理成功===')
                    print('===删除文件===')
                    os.remove(wenjian[0])
            except Exception as e:
                print('===出错===')
                print(e)
                continue
        # NOTE(review): self.num counts inserted records and is not changed by
        # this method - confirm this is the figure meant to be reported here.
        print("文件总数{}个,处理了{}个".format(len(self.wenjian_list),self.num))

    def dxc(self,num):
        """Process every file below ``root_path`` with ``num`` worker threads.

        :param num: number of worker threads to start.

        All file entries are queued, the workers drain the queue through
        ``my_target``, and one ``None`` sentinel per worker is queued afterwards
        to make the workers exit.
        """
        self.traverse(self.root_path)
        print(self.wenjian_list)
        threads_num = num
        threads = []
        for url in self.wenjian_list:
            self.wenjan_queue.put(url)
        for _ in range(threads_num):
            t = threading.Thread(target=self.my_target)
            t.start()
            threads.append(t)
        # Wait until every queued file was handled before sending sentinels.
        self.wenjan_queue.join()
        for _ in range(threads_num):
            self.wenjan_queue.put(None)
        for t in threads:
            t.join()

    def my_target(self):
        """Worker loop: handle queued file entries until a ``None`` sentinel arrives."""
        while True:
            wenjian = self.wenjan_queue.get()
            try:
                if wenjian is None:
                    break
                self.main(wenjian)
            finally:
                # Acknowledge every item (sentinel included) exactly once so
                # Queue.join() can never hang on an unacknowledged entry.
                self.wenjan_queue.task_done()
if __name__ == '__main__':
    # Script entry point: parse all PDF/DOCX files with 20 worker threads.
    # The .doc -> .docx conversion pass (Handle.dox_docx) is not run here.
    handle = Handle()
    handle.dxc(num=20)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/super__man/blog.git
git@gitee.com:super__man/blog.git
super__man
blog
blog
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891