2 Star 7 Fork 3

SunRains / DianPingSpider

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
utils.py 13.33 KB
一键复制 编辑 原始数据 按行查看 历史
SunRains 提交于 2021-11-08 09:17 . 大众点评商铺信息抓取实现
# _*_coding:utf-8_*_
# Author:sy
# Created:2021/11/6 0006 10:21
# Version:1.0
from dianping.exception import ProxyException, RequestException
import json
import os
import requests
import xlrd
import xlwt
from xlrd import xldate_as_tuple
from fontTools.ttLib import TTFont
import datetime
from lxml import etree
import re
import time
from fake_useragent import UserAgent
# class UserAgent(object):
#
# @classmethod
# def get_user_agent(cls, default_user_agent=""):
# """
# 随机获取UserAgent
# :return:
# """
# if not os.path.isfile("./user_agent.json"):
# raise Exception("User_Agent file is not exists")
# # 读取UserAgent文件中的信息
# try:
# with open("./user_agent.json", "r", encoding="utf-8") as file:
# data = json.loads(file.read())
# data_randomize = list(data['randomize'].values())
# data_browsers = data['browsers']
#
# # 随机获取
# browser = random.choice(data_randomize)
#
# return random.choice(data_browsers[browser])
# except Exception as e:
# # 获取失败返回默认的
# if len(default_user_agent) == 0:
# default_user_agent = "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.93 Safari/537.36"
# return default_user_agent
#
class ProxyUtils(object):
    """Fetch proxy IPs from a JSON proxy-pool API.

    The API is expected to return a JSON object shaped like
    ``{code: ..., msg: ..., data: [{ip: ..., port: ...}, ...]}``;
    the exact key names are configurable through the constructor flags.
    """

    def __init__(self, proxy_url, code_flag="code",
                 success_code=200,
                 data_flag="data",
                 ip_flag="ip",
                 message_code="msg",
                 port_flag="port"):
        """
        :param proxy_url: proxy-pool API endpoint; must be a non-empty str
        :param code_flag: JSON key holding the API status code
        :param success_code: status-code value that means "success"
        :param data_flag: JSON key holding the list of proxy entries
        :param ip_flag: JSON key holding the IP inside each entry
        :param message_code: JSON key holding the API error message
        :param port_flag: JSON key holding the port inside each entry
        :raises ProxyException: if proxy_url is empty
        """
        assert isinstance(proxy_url, str), 'proxy_url must be str'
        if len(proxy_url) == 0:
            raise ProxyException("IP代理地址不能为空")
        self.proxy_url = proxy_url
        # key of the API status field
        self.code_flag = code_flag
        # status value meaning the API call succeeded
        self.success_code = success_code
        # key of the payload field
        self.data_flag = data_flag
        # key of the IP field inside each payload entry
        self.ip_flag = ip_flag
        # key of the port field inside each payload entry
        self.port_flag = port_flag
        # key of the error-message field (comment fixed: was mislabelled
        # as a second "port" flag in the original)
        self.message_code = message_code

    def get_proxy_ip(self, timeout=30):
        """Query the proxy pool and return proxies as ``"ip:port"`` strings.

        :param timeout: seconds before the HTTP request is aborted
            (new keyword; the original call had no timeout and could
            block indefinitely on a dead proxy API)
        :return: list of "ip:port" strings
        :raises ProxyException: on API error code or empty payload
        :raises requests.RequestException: on HTTP-level failure
        """
        time.sleep(5)  # throttle so the proxy API is not hit too frequently
        headers = {
            "User-Agent": UserAgent(path="./user_agent.json", use_cache_server=False).random,
        }
        response = requests.get(self.proxy_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        response.encoding = "utf-8"
        response_data = json.loads(response.text)
        if response_data[self.code_flag] != self.success_code:
            raise ProxyException(response_data[self.message_code])
        if len(response_data[self.data_flag]) == 0:
            raise ProxyException("未获取到任何数据")
        print("获取代理数据:{}".format(response_data[self.data_flag]))
        # FIX: the original hard-coded the literal keys "ip"/"port" here,
        # leaving the configurable ip_flag/port_flag constructor parameters
        # unused. Defaults are identical, so default behavior is unchanged.
        return [entry[self.ip_flag] + ":" + str(entry[self.port_flag])
                for entry in response_data[self.data_flag]]
class ExcelUtils(object):
    """Read and write tabular data via xlrd / xlwt."""

    @classmethod
    def read_excel_data(cls, file_path, sheet_index, sheet_column=None):
        """Read one worksheet into a list of row dicts.

        Row 0 is treated as the header row; every following row becomes a
        dict mapping column name -> converted cell value.

        :param file_path: path of the workbook to open
        :param sheet_index: zero-based index of the sheet to read
        :param sheet_column: optional replacement column names; length must
            equal the sheet's column count
        :return: list of dicts, one per data row
        :raises Exception: if sheet_column length does not match the sheet
        """
        sheet = xlrd.open_workbook(file_path).sheet_by_index(sheet_index)
        # total row / column counts of the sheet
        sheet_rows = sheet.nrows
        sheet_cols = sheet.ncols
        if sheet_column is not None:
            assert isinstance(sheet_column, (list, set, tuple)), \
                'sheet_column must be list\\tuple\\set of strings or unicode'
            if len(sheet_column) != sheet_cols:
                raise Exception("Excel 列名对应关系有误,请检查对应关系")
        header = sheet.row_values(0)
        data = []
        for row_index in range(1, sheet_rows):  # skip the header row
            current_row_data = {}
            # FIX: the original iterated range(1, sheet_cols) and silently
            # dropped column 0 of every row, even though sheet_column is
            # validated against the full column count. Start at 0.
            for col_index in range(sheet_cols):
                cell_value = sheet.cell_value(row_index, col_index)
                cell_type = sheet.cell(row_index, col_index).ctype
                if cell_type == 2 and cell_value % 1 == 0:
                    # numeric cell holding an integral value -> int
                    cell_value = int(cell_value)
                elif cell_type == 3:
                    # date cell -> formatted string
                    date = datetime.datetime(*xldate_as_tuple(cell_value, 0))
                    # NOTE(review): '%Y/%d/%m' puts the day before the month,
                    # which looks like a swapped format string — kept as-is
                    # because downstream consumers may rely on it; confirm.
                    cell_value = date.strftime('%Y/%d/%m %H:%M:%S')
                elif cell_type == 4:
                    # boolean cell: xlrd stores it as 0/1
                    cell_value = cell_value == 1
                column_name = header[col_index] if sheet_column is None \
                    else sheet_column[col_index]
                current_row_data[column_name] = cell_value
            data.append(current_row_data)
        return data

    @classmethod
    def write_excel_data(cls, out_put_data, file_name, sheet_name):
        """Write a list of homogeneous dicts to ``<file_name>.xlsx``.

        The keys of the first dict become the header row; every dict is
        written as one data row in the same key order.

        :param out_put_data: list/tuple of dicts sharing the same keys
        :param file_name: output file name without extension
        :param sheet_name: name of the single sheet to create
        """
        assert isinstance(out_put_data, (list, set, tuple)), \
            'sheet_column must be list\\tuple\\set of strings or unicode'
        if len(out_put_data) == 0:
            return
        title = [item for item in out_put_data[0].keys()]
        file = xlwt.Workbook(encoding="utf-8")
        sheet = file.add_sheet(sheet_name, cell_overwrite_ok=True)
        # header row
        for col_index, column_name in enumerate(title):
            sheet.write(0, col_index, column_name)
        # data rows, starting right under the header
        for row_index, item in enumerate(out_put_data, start=1):
            for col_index, column_name in enumerate(title):
                sheet.write(row_index, col_index, item[column_name])
        # NOTE(review): xlwt emits legacy .xls (BIFF) content; saving it
        # under a .xlsx name yields a mislabeled file that Excel warns
        # about. Kept as-is because callers may expect this exact path.
        file.save("{}.xlsx".format(file_name))
class RequestUtils(object):
    """HTTP helpers; every request carries the Cookie read from ./cookie.txt."""

    @staticmethod
    def __getheader():
        """Build request headers from ./cookie.txt.

        :return: dict with "Cookie" and a fixed desktop "User-Agent"
        :raises RequestException: if the cookie file is empty
        """
        with open("./cookie.txt", "r", encoding="utf-8") as file:
            cookie = file.read()
        # file.read() never returns None, but keep a defensive empty check
        if cookie is None or len(cookie) == 0:
            raise RequestException("Cookie不能为空")
        return {
            "Cookie": cookie,
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
        }

    @classmethod
    def down_href(cls, url, proxy_ip, data=None, method='GET', timeout=60):
        """Fetch *url* through *proxy_ip* and return the response text.

        :param url: address to request
        :param proxy_ip: "ip:port" proxy to route the request through
        :param data: optional payload passed to requests
        :param method: only 'GET' is implemented so far
        :param timeout: seconds before the request is aborted (new keyword;
            the original call had no timeout and could hang forever)
        :return: decoded response body
        :raises RequestException: when proxy_ip is missing or no response
            was produced (unsupported method)
        """
        headers = cls.__getheader()
        if proxy_ip is None:
            raise RequestException("代理IP不能为空。")
        # NOTE(review): only the "http" scheme is proxied here; https
        # traffic would bypass the proxy — confirm that is intended.
        proxy = {
            "http": proxy_ip
        }
        response = None
        if method == 'GET':
            if data is None:
                response = requests.get(url, headers=headers, proxies=proxy,
                                        timeout=timeout)
            else:
                # NOTE(review): `data=` puts the payload into the GET body;
                # `params=` may have been intended. Kept as-is.
                response = requests.get(url, data=data, headers=headers,
                                        proxies=proxy, timeout=timeout)
        else:
            # TODO other request method
            pass
        if response is not None:
            response.raise_for_status()
        else:
            raise RequestException("请求返回结果为空")
        response.encoding = "utf-8"
        return response.text

    @classmethod
    def download_file_text(cls, url, timeout=60):
        """Download *url* (no proxy) and return the body as text.

        :param timeout: seconds before the request is aborted (new keyword)
        """
        headers = cls.__getheader()
        response = requests.get(url=url, headers=headers, timeout=timeout)
        response.encoding = "utf-8"
        return response.text

    @classmethod
    def download_file_content(cls, url, timeout=60):
        """Download *url* (no proxy) and return the raw body bytes.

        :param timeout: seconds before the request is aborted (new keyword)
        """
        headers = cls.__getheader()
        response = requests.get(url=url, headers=headers, timeout=timeout)
        response.encoding = "utf-8"
        return response.content
class FontUtils(object):
    """Decode Dianping's webfont obfuscation (svgtextcss + woff glyph fonts)."""

    @classmethod
    def down_font(cls, html_text):
        """Download the svgtextcss stylesheet and every woff font it declares.

        The stylesheet is cached under ./css and fonts under ./font; both
        directories are assumed to exist.

        :param html_text: raw page HTML containing the <link> to svgtextcss
        :raises RequestException: if no svgtextcss link is found
        """
        html = etree.HTML(html_text)
        # the svgtextcss stylesheet is referenced from a <link href=...>
        link_tag = html.xpath("//link/@href")
        svg_text_css_href = ""
        for item in link_tag:
            if item.find("svgtextcss") > -1:
                svg_text_css_href = item
        if len(svg_text_css_href) == 0:
            raise RequestException("获取svg地址失败")
        svg_href = "http:{}".format(svg_text_css_href)
        # last path segment is used as the local cache file name
        svg_file_name = svg_href.split("/")[-1]
        # NOTE(review): when the css file is already cached we return without
        # verifying the fonts exist, so a partially-failed earlier run leaves
        # fonts missing until the css changes. Confirm this is acceptable.
        if os.path.isfile("./css/{}".format(svg_file_name)):
            return
        svg_content = RequestUtils.download_file_text(svg_href)
        with open("./css/{}".format(svg_file_name), "w", encoding="utf-8") as file:
            file.write(svg_content)
        # each css rule body may declare a font-family plus a woff url
        s = re.findall(r'\{(.*?)\}', svg_content)
        for item in s:
            font_family = re.findall(r'font-family: "(.*?)"', item)
            if len(font_family) == 0:
                continue
            href = re.findall(r',url\("(.*?)"\);', item)
            # font file is named after the last segment of the family name
            file_path_name = "./font/{}.woff".format(font_family[0].split("-")[-1])
            file_href = "https:{}".format(href[0])
            content = RequestUtils.download_file_content(file_href)
            with open(file_path_name, "wb") as file:
                file.write(content)

    @classmethod
    def get_font_word(cls, name, key):
        """Map one obfuscated glyph to its real character.

        :param name: woff font file name (without extension) under ./font
        :param key: glyph name, e.g. "uniE123"
        :return: the decoded character
        """
        tag = TTFont("./font/{}.woff".format(name))
        order_list = tag.getGlyphOrder()
        index = order_list.index(key)
        # Fixed lookup table: glyph order in the woff maps positionally onto
        # this character sequence.
        woff_str = '''
1234567890店中美家馆
小车大市公酒行国品发电金心业商司超
生装园场食有新限天面工服海华水房饰
城乐汽香部利子老艺花专东肉菜学福饭
人百餐茶务通味所山区门药银农龙停尚
安广鑫一容动南具源兴鲜记时机烤
文康信果阳理锅宝达地儿衣特产西批坊
州牛佳化五米修爱北养卖建材三会鸡室
红站德王光名丽油院堂烧江社合星货型
村自科快便日民营和活童明器烟育宾精
屋经居庄石顺林尔县手厅销用好客火雅
盛体旅之鞋辣作粉包楼校鱼平彩上
吧保永万物教吃设医正造丰健点汤网庆
技斯洗料配汇木缘加麻联卫川泰色世方
寓风幼羊烫来高厂兰阿贝皮全女拉成云
维贸道术运都口博河瑞宏京际路祥青镇
厨培力惠连马鸿钢训影甲助窗布富牌头
四多妆吉苑沙恒隆春干饼氏里二管
诚制售嘉长轩杂副清计黄讯太鸭号街交
与叉附近层旁对巷栋环省桥湖段乡厦府
铺内侧元购前幢滨处向座下臬凤港开关
景泉塘放昌线湾政步宁解白田町溪十八
古双胜本单同九迎第台玉锦底后七斜期
武岭松角纪朝峰六振珠局岗洲横边
济井办汉代临弄团外塔杨铁浦字年岛陵
原梅进荣友虹央桂沿事津凯莲丁秀柳集
紫旗张谷的是不了很还个也这我就在以
可到错没去过感次要比觉看得说常真们
但最喜哈么别位能较境非为欢然他挺着
价那意种想出员两推做排实分间甜
度起满给热完格荐喝等其再几只现朋候
样直而买于般豆量选奶打每评少算又因
情找些份置适什蛋师气你姐棒试总定啊
足级整带虾如态且尝主话强当更板知己
无酸让入啦式笑赞片酱差像提队走嫩才
刚午接重串回晚微周值费性桌拍跟
块调糕
'''
        # drop layout whitespace so indexing is positional over characters
        woffs = [i for i in woff_str if i != '\n' and i != ' ']
        # NOTE(review): the -2 offset presumably skips leading glyphs such as
        # .notdef in the font's glyph order — confirm against the fonts used.
        word = woffs[index - 2]
        return word

    @classmethod
    def translate_tag(cls, html, tag_xpath):
        """De-obfuscate the text content under *tag_xpath*.

        Child <e>/<d> elements carry obfuscated glyph placeholders of the
        form ``*XXXX*`` whose class names the correct woff font; those are
        translated via get_font_word, other text is kept verbatim.

        :param html: parsed lxml document
        :param tag_xpath: xpath of the element whose text to decode
        :return: the decoded text joined into one string
        """
        children_tag_div = html.xpath(tag_xpath + "/*")
        children_tag_dict = {}
        # build a raw-text -> decoded-text translation table
        for item in children_tag_div:
            if item.tag != "e" and item.tag != "d":
                continue
            class_name = item.xpath("./@class")[0]
            item_value = item.xpath("./text()")[0]
            if item_value.startswith("*") and item_value.endswith("*"):
                children_tag_dict[item_value] = cls.get_font_word(class_name, "uni" + item_value.strip("*"))
            else:
                children_tag_dict[item_value] = item_value
        # stitch translated fragments back together with the plain text
        char_arr = []
        for item_char in html.xpath(tag_xpath + "//text()"):
            if item_char in children_tag_dict:
                # FIX: the original tested membership with the raw item_char
                # but indexed with item_char.strip(), which could raise
                # KeyError when the text node carried surrounding whitespace.
                # NOTE(review): "\\n" removes a literal backslash-n pair, not
                # newlines — "\n" may have been intended; kept as-is.
                char_arr.append(children_tag_dict[item_char].replace("\\n", ""))
            else:
                char_arr.append(item_char.strip().replace("\\n", ""))
        return "".join(char_arr)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/SunRains/dian-ping-spider.git
git@gitee.com:SunRains/dian-ping-spider.git
SunRains
dian-ping-spider
DianPingSpider
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891