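# NOTE: stray banner copied from the code-hosting web page ("code pull finished, the page will refresh automatically"); it is not part of the script.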
"""
企业信息入库 mongo 脚本
"""
import hashlib
import json
import os
import threading
import time
import uuid
from queue import Queue
import pandas as pd
import docx
import pdfplumber
import redis
from pymongo import MongoClient
from win32com import client as wc
class Handle():
    """Load company records from PDF/DOCX tables under a directory tree into MongoDB.

    Attributes:
        root_path: Directory that is scanned recursively for input files.
        wenjian_list: ``(absolute_path, basename)`` tuples found by the last scan.
        word: Word COM application object, used only by ``dox_docx``.
        mongo_conn / gaoqi_db / gaoqi_set: MongoDB client, database and
            collection that receive the records.
        redis_conn: Redis client; the set ``gaoqi_token`` holds one MD5 token
            per file that has already been parsed.
        num: Number of records inserted by this instance.
        wenjan_queue: Work queue shared by the threads started in ``dxc``.
    """

    # Guards ``num``, which is incremented from the worker threads of ``dxc``.
    _num_lock = threading.Lock()

    def __init__(self, root_path=r'D:\bfy\my_code\task_3\all', mongo_host="", redis_host=""):
        """Open the Word, MongoDB and Redis connections.

        Args:
            root_path: Directory to scan; defaults to the original hard-coded path.
            mongo_host: Host passed to ``MongoClient``.
            redis_host: Host passed to ``redis.Redis`` (db 0, port 6379).
        """
        self.root_path = root_path
        self.wenjian_list = []
        self.word = wc.Dispatch('Word.Application')
        self.mongo_conn = MongoClient(host=mongo_host)
        self.redis_conn = redis.Redis(host=redis_host, db=0, port=6379)
        self.gaoqi_db = self.mongo_conn.gaoqi_db
        self.gaoqi_set = self.gaoqi_db.gaoqi_set
        self.num = 0
        self.wenjan_queue = Queue()

    def main(self, wenjian):
        """Parse one queued file, dispatching on its extension.

        Args:
            wenjian: ``(absolute_path, basename)`` tuple as produced by ``traverse``.

        Any exception is printed and swallowed so that a worker thread keeps
        running after a bad file.
        """
        try:
            wenjian = wenjian[0]
            print(wenjian)
            if wenjian.endswith('.pdf'):
                self.pdf_table(wenjian)
            if wenjian.endswith('.docx'):
                self.docx_table(wenjian)
        except Exception as e:
            print('===出错===')
            print(e)
        print('===处理完毕===')

    def _path_parts(self, name):
        """Return the non-empty backslash-separated components of ``name`` below ``root_path``."""
        relative = name.replace(self.root_path, '')
        print(relative)
        # Filtering (instead of ``list.remove("")``) also copes with a relative
        # path that has no leading separator, which used to raise ValueError.
        parts = [part for part in relative.split('\\') if part]
        print(parts)
        return parts

    def token(self, name):
        """Derive the per-file metadata from the file's location.

        Args:
            name: Absolute path of a file below ``root_path``.

        Returns:
            A dict with ``province`` (first path component below the root)
            and ``time`` (second component).

        Raises:
            IndexError: If the path has fewer than two components below the root.
        """
        parts = self._path_parts(name)
        return {'province': parts[0], 'time': parts[1]}

    def existsd(self, name):
        """Return the MD5 hex digest that identifies ``name`` in the Redis set.

        The digest is taken over the concatenated path components below
        ``root_path``, encoded as UTF-8.
        """
        joined = "".join(self._path_parts(name))
        return hashlib.md5(joined.encode('utf-8')).hexdigest()

    def _store_rows(self, data, rows):
        """Insert every ``(company_name, zhengshu_id)`` pair not yet in MongoDB.

        Args:
            data: Base fields (from ``token``) copied into each record.
            rows: Iterable of ``(company_name, zhengshu_id)`` pairs.
        """
        for company_name, zhengshu_id in rows:
            if self.gaoqi_set.find_one({'zhengshu_id': zhengshu_id}) is not None:
                print('===本条数据已经存在于数据库中===')
                continue
            # A fresh dict per row keeps one record's fields from leaking into the next.
            record = dict(
                data,
                company_name=company_name,
                zhengshu_id=zhengshu_id,
                _id=uuid.uuid1(),
            )
            print(record)
            self.gaoqi_set.insert_one(record)
            print('===存储成功===')
            with self._num_lock:
                self.num += 1

    def pdf_table(self, name):
        """Store the rows of every table in the PDF file ``name``.

        Columns 1 and 2 of each table are taken as company name and
        certificate ID. The file's token is added to the Redis set once all
        tables were processed without an exception; a file whose token is
        already present is skipped. Errors are printed, not raised.
        """
        try:
            print(name)
            data = self.token(name)
            token = self.existsd(name)
            if self.redis_conn.sismember('gaoqi_token', token):
                print('====此文件已经解析过===')
                return
            print(data)
            with pdfplumber.open(name) as pdf:
                for page in pdf.pages:
                    for table in page.extract_tables():
                        if not table:
                            continue
                        # NOTE(review): the first row serves as column labels AND
                        # stays in the data, so a header row is stored like any
                        # other row - confirm whether that is intended.
                        df = pd.DataFrame(table[:], columns=table[0])
                        rows = ((df.iloc[i, 1], df.iloc[i, 2]) for i in df.index)
                        self._store_rows(data, rows)
            self.redis_conn.sadd('gaoqi_token', token)
        except Exception as e:
            print('===存储失败==')
            print(e)

    def docx_table(self, name):
        """Store the rows of every table in the DOCX file ``name``.

        The first row of each table is skipped; cells 1 and 2 of the
        remaining rows are taken as company name and certificate ID. Token
        handling and error handling match ``pdf_table``.
        """
        def iter_rows(document):
            # Yields (company_name, zhengshu_id) for each non-header table row.
            for table in document.tables:
                for row in table.rows[1:]:
                    print('================================')
                    company_name = row.cells[1].text
                    zhengshu_id = row.cells[2].text
                    print(company_name, zhengshu_id)
                    yield company_name, zhengshu_id

        try:
            print(name)
            data = self.token(name)
            token = self.existsd(name)
            if self.redis_conn.sismember('gaoqi_token', token):
                print('====此文件已经解析过===')
                return
            print(data)
            self._store_rows(data, iter_rows(docx.Document(name)))
            self.redis_conn.sadd('gaoqi_token', token)
        except Exception as e:
            print('===存储失败==')
            print(e)

    def traverse(self, d):
        """Recursively append ``(absolute_path, basename)`` for every file under ``d``.

        Results accumulate in ``self.wenjian_list``; callers that want a
        fresh scan must reset the list first.
        """
        for item in os.listdir(d):
            abs_path = os.path.join(d, item)
            if os.path.isdir(abs_path):
                self.traverse(abs_path)
            else:
                self.wenjian_list.append((abs_path, os.path.basename(abs_path)))

    def dox_docx(self):
        """Convert every ``.doc`` file under ``root_path`` to ``.docx`` via Word.

        Each source file is deleted after a successful conversion. A failure
        on one file is printed and the loop moves on to the next file.
        """
        # Start from an empty list so repeated scans do not pile up duplicates.
        self.wenjian_list = []
        self.traverse(self.root_path)
        print('===文件总数===')
        print(len(self.wenjian_list))
        converted = 0
        for wenjian in self.wenjian_list:
            print(wenjian)
            path, basename = wenjian
            if not path.endswith('.doc'):
                continue
            try:
                doc = self.word.Documents.Open(path)
                try:
                    name = os.path.dirname(path) + '\\' + basename + 'x'
                    print(name)
                    # 12 is the FileFormat argument given to Word's SaveAs.
                    doc.SaveAs(name, 12, False, "", True, "", False, False, False, False)
                finally:
                    # Always release the document; 0 = close without saving again.
                    doc.Close(0)
                print('===处理成功===')
                print('===删除文件===')
                os.remove(path)
                converted += 1
            except Exception as e:
                print('===出错===')
                print(e)
                continue
        print("文件总数{}个,处理了{}个".format(len(self.wenjian_list), converted))

    def dxc(self, num):
        """Scan ``root_path`` and parse all files with ``num`` worker threads.

        Args:
            num: Number of worker threads to start; must be at least 1.

        Raises:
            ValueError: If ``num`` is smaller than 1 (the queue would never drain).
        """
        if num < 1:
            raise ValueError("num must be at least 1")
        # Start from an empty list so an earlier scan's entries are not re-queued.
        self.wenjian_list = []
        self.traverse(self.root_path)
        print(self.wenjian_list)
        for wenjian in self.wenjian_list:
            self.wenjan_queue.put(wenjian)
        threads = []
        for _ in range(num):
            t = threading.Thread(target=self.my_target)
            t.start()
            threads.append(t)
        # Wait until every queued file is handled, then send one stop marker per worker.
        self.wenjan_queue.join()
        for _ in range(num):
            self.wenjan_queue.put(None)
        for t in threads:
            t.join()

    def my_target(self):
        """Worker loop: handle queued files until a ``None`` stop marker arrives."""
        while True:
            wenjian = self.wenjan_queue.get()
            try:
                if wenjian is None:
                    break
                self.main(wenjian)
            finally:
                # Exactly one task_done per get(), including the stop marker.
                self.wenjan_queue.task_done()
if __name__ == '__main__':
    # Script entry point: build the handler and parse all files with 20
    # worker threads. The .doc -> .docx conversion pass is kept disabled.
    runner = Handle()
    # runner.dox_docx()
    runner.dxc(20)
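# NOTE: stray content-moderation notice copied from the code-hosting web page; it is not part of the script.
# Translation: "This section may contain content unsuitable for display and is hidden; you can review and edit it, or submit an appeal if you believe it does not violate the content rules."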