1 Star 0 Fork 2

chenkechao / bit-fist-crawler

forked from 拳倾天下 / bit-fist-crawler 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

源码:bit-fist-crawler

初学 python,还有很多问题待优化,欢迎和我一样的小白一起研究,也欢迎大佬路过指点!

1. 介绍

用于从爬取某系统数据,同步到数据库

  • 数学验证码识别与自动计算
  • 会话保持,验证码识别错误、session 过期,自动重试
  • 架构 python、docker、mysql

2. 安装教程

2.1. docker 打包、推送

确保本地 docker 为启动状态

打开 cmd 或 powershell,cd 到 Dockerfile 同级目录下,执行 docker 打包:

docker build -t bit-fist-crawler:0.1 .
docker tag bit-fist-crawler:0.1 harbor.test.com/cztl/bit-fist-crawler:0.1
dokcer push harbor.test.com/cztl/bit-fist-crawler:0.1

harbor.test.com 替换成自己的 docker 仓库

2.2. 服务器配置信息

docker-compose 配置文件 docker-compose-crawler.yml

version: '2'
services:
  bit-fist-crawler:
    image: harbor.test.com/cztl/bit-fist-crawler:0.1
    container_name: bit-fist-crawler
    volumes:
      - "/data/crawler/config:/home/project/config"
      - "/data/crawler/log:/home/project/logs"
    restart: always
    ports:
      - 11423:11423
    mem_limit: 512m
    networks:
      - chuanzangNetwork
    logging:
      options:
        max-size: "10m"
        max-file: "10" 
networks:
  chuanzangNetwork: 
    external: true

其中 "/data/crawler/config","/data/crawler/log" 分别用于挂载配置文件路径和日志文件路径。

在服务器上创建文件夹,用于存放配置文件覆盖代码中的 /config

sudo mkdir /data/crawler/config

将 config.txt 上传到该文件夹

2.3. 启动

sudo docker-compose -f docker-compose-crawler.yml up -d

启动后,日志挂载在 /data/crawler/log

3. 实现过程

3.0. 背景

开发项目时,遇到了需要用爬虫爬数据的需求。目标网站是个需要登录验证的网站,不能直接获取接口或页面。用户名、账号对方已经给到我们,剩下的是就是通过验证,然后获取数据。 用 java 应该也是可以实现的,但是想借这个契机学习一下 python,所以决定用 python 实现。于是乎在菜鸟学了几天 Python 3 教程,真是入门级的好去处^-^。 看了好多文章,最后实现了这个从 0 到 1 的过程,很开心。

初学 python,代码并不是很规范,文件夹也是随便建的。还有很多问题待优化,欢迎和我一样的小白一起研究,也欢迎大佬路过指点!

源码地址:bit-fist-crawler

3.1. 验证码识别

识别验证码就是这个项目的核心了,第一次接触,还是挺费劲的。这个功能是使用 opencv-python 这个库来做图形处理的。 代码主要在 src/utils/captcha_util.py 中。验证码各式各样,每种验证码的图形处理都会有不同的处理过程,所以这部分代码只能参考思路。

我的验证码是这样的:

识别出 5+1,然后计算结果 6 就是最终结果。好在这个验证码都是个位运算,而且后面的 “=?”固定,不需要识别,所以最终只要识别前三个字符就行。

这部分用到这两个库:

pip install opencv-python
pip install numpy

opencv-python 处理图片的方式,其实是将图片数字化。众所周知,图片是由一个个像素组成,每个像素又是由 r,g,b 三种颜色组成,每种颜色从 0-255 代表其亮度。这样一来就可以用一个三维数组来表示一张图片,前面两维表示坐标,第三维表示色道值。

我的验证码是 60*160 的图片,所以读取之后得到的是 60*160*3 的数组.

识别出字符,需要排除干扰因素,把最需要并且最简单部分交给程序处理。 对于这个验证码,我的处理方式是:

  1. 灰度化:排除颜色信息
import cv2
import numpy as np
# 打开图片,image_path 是验证码下载的文件路径:src/img/cztl-web-captcha.jpeg,每次下载重写这个图片
img = cv2.imread(image_path)
# 灰度处理 或者 分离通道,这两个都可以得到灰度图,可根据世界效果选;分离通道返回值依次是 b,g,r,可根据世界效果挑选,本例使用的是 g
# img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
b, img_gray, r = cv2.split(img)

灰度化过后,图片中只剩下黑白灰,可以理解成只有一个色道,这样三维数组就变成一个 60*160 的二维数组了。 2. 二值化:使图片中只有黑白两种颜色

# 二值化,大于阈值 80 的都转化成 255(黑),否则是 0(白)
ret, img_inv = cv2.threshold(img_gray, 80, 255, cv2.THRESH_BINARY_INV)

  1. 透视拉伸:这该死的验证码是倾斜的,所以要做一下拉伸,尽量使字符摆正(但是这个验证码,每张验证码的倾斜程度都不同,这是比较蛋疼的部分,也是我的程序会误判的元凶之一,也没找到很好的解决办法)
# 透视拉伸
img_dst = img_perspective(img_inv)
def img_perspective(img):
    """
    图片透视拉伸
    :param img: 源图片
    :return: 拉伸后的图片
    """
    pos1 = np.float32([[0, 0], [135, 0], [30, 60], [160, 60]])
    pos2 = np.float32([[25, 0], [160, 0], [30, 60], [160, 60]])
    mm = cv2.getPerspectiveTransform(pos1, pos2)
    return cv2.warpPerspective(img, mm, (160, 60))

pos1 是当前图片中的四个点为像素坐标,按照左上,右上,左下,右下顺序排列。pos2 是想要调整到的目标点位。

  1. 切割图片:把验证码的前三个字符切割成图片,由于会发生字符粘连的情况,所以需要额外进行判断和切割
# 查找轮廓
contours, hierarchy = cv2.findContours(img_dst, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 画出矩形边界,x、y边框起始点的坐标,w、h为宽高
cv2.rectangle(img_dst, (x, y), (x + w, y + h), (255, 255, 255), thickness=1)

画出矩形边界,已办用于在调整的时候用,可以看到边框,方便修改。二标注的时候需要把字符切成一张张小图片

box = np.int0([[x, y], [x + w, y], [x + w, y + h], [x, y + h]])
cv2.drawContours(img_dst, [box], 0, (0, 0, 255), 2)
roi = img_dst[box[0][1]:box[3][1], box[0][0]:box[1][0]]
roi_std = cv2.resize(roi, (30, 30))  # 将字符图片统一调整为30x30的图片大小

但是有时候会出现字符粘连的情况,像下图就是前两个字符粘在一起,没法按照轮廓切割。

对于字符粘连的问题,我的方式简单粗暴,按照宽度平均分割

def get_rect_contours(img_dst):
    """
    获取矩形边界列表,按照x坐标从左向右排序
    :param img_dst:
    :return:
    """
    contours, hierarchy = cv2.findContours(img_dst, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    rects = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        # 排除问号
        if w < 10 and x > 100 or x > 110 and h > 10:
            continue
        # 排除等号
        if 2 < w / h < 6 and x > 90:
            continue
        # 两个字符粘在一起,通常是第二个数字和运算符粘连 或第二个数字和等号粘连 或第一个数字和运算符粘连
        if 28 < w < 50:
            w = 20
            if x < 80:
                rects.append((x + 20, y, w, h))
        # 三个字符粘连,出现在运算符、第二个数字、等号之间 或第一个数字、运算符、第二个数字之间
        if w > 50:
            rects.append((x, y, 20, h))
            rects.append((x + 20, y, 20, h))
            if x < 50:
                rects.append((x + 40, y, w - 40, h))
            continue
        # '*' 被划分太细的情况放弃
        if w < 10 and h < 10:
            continue
        # 矩形切图
        rects.append((x, y, w, h))
    rects.sort(key=None, reverse=False)
    return rects
  1. 人工标注
def mark_img(roi, timestamp):
    """
    人工标注切图
    :param roi:
    :param timestamp:
    :return:
    """
    print("PS:对每张切图输入对应的字符(用于标记切图),回车跳过当前切图,点击关闭退出人工标记切图")
    cv2.imshow("image", roi)
    key = cv2.waitKey(0)
    if key == 27: # 点击关闭,退出
        sys.exit()
    if key == 13: # 回车跳过当前
        return
    char = chr(key)
    print("您输入的key是:", char)
    filename = "%s/%s_%s.jpg" % (img_lib_path, timestamp, char)
    cv2.imwrite(filename, roi)

这个过程就是不断的加载新的验证码,把图片切出来,人工的查看每张切图,用时间戳、下划线、识别字符命名图片,保存起来,用作训练数据。这部分比较无脑,但是要重复好多次,保存足够的训练数据。 6. 训练

所谓训练就是利用已存在的数据(img_lib中的图片),归纳总结出一个规律性的、可借鉴的模型来,后面就可以根据这个模型判断识别新的验证码。

def train_machine():
    """
    机器训练
    :return: id_label_map, model
    """
    # TODO 后续可尝试将返回值缓存和持久化
    filenames = os.listdir(img_lib_path)
    samples = np.empty((0, 900))
    labels = []
    for filename in filenames:
        filepath = "%s/%s" % (img_lib_path, filename)
        label = filename.split(".")[0].split("_")[-1]
        labels.append(label)
        im = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
        roi_std = cv2.resize(im, (30, 30))
        sample = roi_std.reshape((1, 900)).astype(np.float32)
        samples = np.append(samples, sample, 0)
    samples = samples.astype(np.float32)
    unique_labels = list(set(labels))
    unique_ids = list(range(len(unique_labels)))
    label_id_map = dict(zip(unique_labels, unique_ids))
    id_label_map = dict(zip(unique_ids, unique_labels))
    label_ids = list(map(lambda x: label_id_map[x], labels))
    label_ids = np.array(label_ids).reshape((-1, 1)).astype(np.float32)
    model = cv2.ml.KNearest_create()
    model.train(samples, cv2.ml.ROW_SAMPLE, label_ids)
    return id_label_map, model

这个过程是把 img_lib 下的所有图片都用 cv2.imread() 读进来,保存在 samples 中,然后切割文件名,把下划线之后的字符(也就是图片对应的字符,此处称之为 label) 保存在 labels 中。samples 和 labels 是一一对应的关系。

id_label_map 的 value 是 labels 去重的集合,key 是角标,id_label_map 用于后续对应查找 label。 label_ids 是 img_lib 中所有图片依次对应 id_label_map 的 key。

  1. 识别

识别的过程,主要用到的是 model 这个对象,拿到新的验证码图片之后,依次和 samples 中的图片(二维数组)进行对比, 然后找到最接近的图片,返回这张图片对应的 label_ids 的值。最后用这个值去 id_label_map 中找出对应的 label,即识别到的字符。 说到底,是数学问题啊~

id_label_map, model = train_machine()
for image in images:
    sample = image.reshape((1, 900)).astype(np.float32)
    # 找出最相似的图片
    ret, results, neighbours, distances = model.findNearest(sample, k=3)
    # 找出该图片对应的 label_ids 中的值
    label_id = int(results[0, 0])
    # 找出对应的 label,这就是识别结果
    label = id_label_map[label_id]
    cv2.imshow("image", image)
    key = cv2.waitKey(0)
    if key == 27:
        sys.exit()
    if key == 13:
        return
    correct_char = chr(key)
    print("您输入的key是:%s,机器识别的key是:%s" % (correct_char, label))

这部分代码是搬砖来的,个人觉得 train_machine() 可以稍微简化一下的,训练数据时,label_ids 如果存的是 id_label_map 的值,而不是 key 的话,后面就返回一个 model 就行了, 后面识别的 results[0, 0] 直接就是我们想要的结果,也不需要在取 id_label_map 中找了。

3.2. session 保持及自动重试

验证码识别的最终目的是登陆,获取校验所需的信息,从而在之后的请求能通过校验。本例系统使用的是 session、cookie机制,所以此处最开始用到 http.cookiejar 保存 cookie 的方式请求。

import requests
import http.cookiejar
# 设置一个cookie处理器,它负责从服务器下载cookie到本地,并且在发送请求时带上本地的cookie
    cj = http.cookiejar.CookieJar()
    cookie_support = request.HTTPCookieProcessor(cj)
    opener = request.build_opener(cookie_support, request.HTTPHandler)

    request.install_opener(opener)
    raw_data = {"figure": figure, "username": username, "password": password, "imgId": imgId, "code": code}
    post_data = parse.urlencode(raw_data).encode('utf-8')
    cookie = input("输入cookie:")
    headers = {"Cookie": cookie}
    req = request.Request(url=login_url, data=post_data, method='POST')
    # 打开登录主页面(目的是从页面下载cookie,这样我们在再送post数据时就有cookie了,否则发送不成功)
    response = request.urlopen(req)

但是这种方式行不通,因为每次请求都是一个新的会话,导致没次都识别到的验证码都和后端不对应。

解决办法是使用 session = requests.session(),requests库的session会话对象可以跨请求保持某些参数,就是比如你使用session成功的登录了某个网站,再次使用该session对象请求该网站的其他网页都会默认使用该session之前使用的cookie等参数。

import requests
session = requests.session()
wrong_title = "铁路工程管理平台--登录"


def request(url, method=request_method.GET, headers=None, data=None):
    """
    请求数据
    :param url:
    :param method:
    :param headers:
    :param data:
    :return:
    """
    response = session.request(method, url, headers=headers, data=data)
    return response

这样就解决了会话保持的问题,但是还有个问题,session迟早会过期,所以还需要加上 session 过期自动重新登录的功能。 这个系统 session 过期或登陆失败,都会重定向会登录页,所以此处用网页 title 判断是否 session 过期。所以增加了以下方法,这样 session 过期或者验证码识别错了导致登录失败,都可以自动重新登录。


def request_retry(url, method=request_method.GET, headers=None, data=None):
    """
    请求数据
    :param url:
    :param method:
    :param headers:
    :param data:
    :return:
    """
    response = session.request(method, url, headers=headers, data=data)
    if response.headers.get("Content-Type") == "text/html;charset=UTF-8":
        title = get_tile(response.text)
        if title == wrong_title:
            login()
            response = session.request(method, url, headers=headers, data=data)
    return response

3.3 定时任务和爬取数据

定时任务用到了 apscheduler 库

from apscheduler.schedulers.blocking import BlockingScheduler

创建一个 BlockingScheduler 对象,然后把需要定时的方法放入参数中。此处设置的是在 7 点到 23 点之间,每半小时更新一次。

def main():
    # 登录,收集数据
    login.login()
    data_collector()
    # 定时收集数据
    task = BlockingScheduler()
    task.add_job(data_collector, "cron", hour="7-23", minute="*/30")
    task.start()


def data_collector():
    """
    数据收集
    :return:
    """
    logging.info("开始定时任务")
    # 同步进度信息
    progress_collector.get_construct_points()
    logging.info("定时任务完成")

所爬数据分两种,一种是响应格式为 json,另一种响应格式为 html 页面。 json 格式的数据可以借助 json 库转化下,直接取出想要的字段即可,例如:

def get_retry_std(url, headers=None, data=None):
    """
    get方法请求数据,返回json,只适合 responses 是标准输出的情况
    :param url:
    :param headers:
    :param data:
    :return:
    """
    response = get_retry(url, headers=headers, data=data)
    if response.headers.get("Content-Type") != "text/plain;charset=UTF-8":
        log.error("Content-Type 必须是text/plain;charset=UTF-8")
    if response.status_code != 200:
        log.error("请求异常,状态码:", response.status_code)
    response_body = json.loads(response.text)
    return response_body["result"]

html 页面的数据需要借助 lxml 解析 dom,最后获取想要的 dom 的数据即可,例如:

from lxml import etree
def get_tile(text):
    """
    根据文本获取html页面title
    :param text:
    :return:
    """
    tree = etree.HTML(text)
    return tree.xpath("//title")[0].text

4. 使用说明

4.1. 服务器上启动

直接运行 docker 镜像即可

4.2. 本地使用

本地启动 starter.py 即可,半小时刷新一次。

test_main.py 中 test_analyse_accuracy() 方法用于测试识别精确度,自定义循环次数,需要每次手动输入用于判断机器识别是否正确。 test_bound_result() 方法,用于画矩形边框,辅助调整切割图片。 test_mark_images() 人工标注图片,标注好的切图存于 img_lib 中,用作训练数据。

配置文件中的内容需要 [mysql] 自行补充, [crawler] 部分仅适用于本例,需根据实际情况定。

参考文章

Python 3 教程

用Python识别验证码

opencv-python 入门篇

opencv-python 指南

空文件

简介

数学验证码识别与自动计算 验证码识别错误、session 过期,自动重试 展开 收起
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/chenkechao/bit-fist-crawler.git
git@gitee.com:chenkechao/bit-fist-crawler.git
chenkechao
bit-fist-crawler
bit-fist-crawler
master

搜索帮助