当前仓库属于关闭状态,部分功能使用受限,详情请查阅 仓库状态说明
3 Star 4 Fork 4

风酒 / YOLO_v1_tensorflow
关闭

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
yolo.py 17.28 KB
一键复制 编辑 原始数据 按行查看 历史
风酒 提交于 2019-09-08 11:52 . repair waitkey
# -*- coding:utf-8 -*-
import numpy as np
import cv2
import tensorflow as tf
import time
import os
def save_img(img_cp, to_file):
    """
    Write an image (typically one with detection boxes drawn on it) to disk.

    The outcome is reported on stdout; nothing is raised when writing fails.

    :param img_cp: image to save
    :param to_file: destination path
    :return: None
    """
    # cv2.imwrite reports failure through its return value.
    if not cv2.imwrite(to_file, img_cp):
        print("Saving error!")
        return
    print("Saved success in:", to_file)
def save_txt(content, to_file):
    """
    Write text to a file, replacing any previous content.

    :param content: text to write
    :param to_file: destination path
    :return: None
    """
    # The context manager closes the handle even if write() raises.
    with open(to_file, 'w') as txt:
        txt.write(content)
    print('txt file write to: ' + to_file)
class Yolo:
    """
    YOLO v1 ("YOLO_small") object detector built with the TensorFlow 1.x
    graph API and OpenCV.

    Creating an instance builds the network graph, opens a session, restores
    the weights from ``self.weights_file`` and builds the post-processing ops
    (score filtering and non-maximum suppression).
    """

    # Title of the OpenCV preview window used by every detector entry point.
    _WINDOW_NAME = 'YOLO_small detection'

    def __init__(self):
        # Class names, indexed by the class id produced by the network.
        self.classes = [
            "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
            "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"
        ]
        # Number of classes.
        self.class_num = len(self.classes)
        # Checkpoint restored into the session.
        self.weights_file = 'weights/YOLO_small.ckpt'
        # Whether detection results are shown in an OpenCV window.
        self.imshow = True
        # Whether progress / timing information is printed.
        self.disp_console = True
        # Negative-side slope of the leaky ReLU activation.
        self.alpha = 0.1
        # Boxes whose best class score is below this value are discarded.
        self.threshold = 0.2
        # NMS: a box overlapping a higher-scoring box by more than this IoU is discarded.
        self.iou_threshold = 0.5
        # Number of boxes predicted per grid cell.
        self.B = 2
        # The image is divided into an S x S grid.
        self.S = 7
        self.person_detected = 0  # number of images/frames in which a person was detected
        self.overall_pics = 0  # number of images processed by multi_img_detector
        self._build_networks()
        self._non_maxima_suppression()

    def _build_networks(self):
        """
        Build the network graph, open a session and restore the weights.

        The graph is a stack of convolution / max-pooling layers followed by
        three fully connected layers; the last one outputs 1470 values per
        image.

        NOTE(review): variables are created without explicit names, so
        restoring the checkpoint presumably relies on TensorFlow's automatic
        naming (creation order) -- keep the layer order unchanged.

        :return: None
        """
        if self.disp_console:
            print("Building YOLO graph...")
        self.x = tf.placeholder('float32', [None, 448, 448, 3])
        conv_1 = self._conv_layer(1, self.x, 64, 7, 2)
        pool_2 = self._pooling_layer(2, conv_1, 2, 2)
        conv_3 = self._conv_layer(3, pool_2, 192, 3, 1)
        pool_4 = self._pooling_layer(4, conv_3, 2, 2)
        conv_5 = self._conv_layer(5, pool_4, 128, 1, 1)
        conv_6 = self._conv_layer(6, conv_5, 256, 3, 1)
        conv_7 = self._conv_layer(7, conv_6, 256, 1, 1)
        conv_8 = self._conv_layer(8, conv_7, 512, 3, 1)
        pool_9 = self._pooling_layer(9, conv_8, 2, 2)
        conv_10 = self._conv_layer(10, pool_9, 256, 1, 1)
        conv_11 = self._conv_layer(11, conv_10, 512, 3, 1)
        conv_12 = self._conv_layer(12, conv_11, 256, 1, 1)
        conv_13 = self._conv_layer(13, conv_12, 512, 3, 1)
        conv_14 = self._conv_layer(14, conv_13, 256, 1, 1)
        conv_15 = self._conv_layer(15, conv_14, 512, 3, 1)
        conv_16 = self._conv_layer(16, conv_15, 256, 1, 1)
        conv_17 = self._conv_layer(17, conv_16, 512, 3, 1)
        conv_18 = self._conv_layer(18, conv_17, 512, 1, 1)
        conv_19 = self._conv_layer(19, conv_18, 1024, 3, 1)
        pool_20 = self._pooling_layer(20, conv_19, 2, 2)
        conv_21 = self._conv_layer(21, pool_20, 512, 1, 1)
        conv_22 = self._conv_layer(22, conv_21, 1024, 3, 1)
        conv_23 = self._conv_layer(23, conv_22, 512, 1, 1)
        conv_24 = self._conv_layer(24, conv_23, 1024, 3, 1)
        conv_25 = self._conv_layer(25, conv_24, 1024, 3, 1)
        conv_26 = self._conv_layer(26, conv_25, 1024, 3, 2)
        conv_27 = self._conv_layer(27, conv_26, 1024, 3, 1)
        conv_28 = self._conv_layer(28, conv_27, 1024, 3, 1)
        fc_29 = self._fc_layer(29, conv_28, 512, flat=True, linear=False)
        fc_30 = self._fc_layer(30, fc_29, 4096, flat=False, linear=False)
        fc_32 = self._fc_layer(32, fc_30, 1470, flat=False, linear=True)  # skip dropout_31
        self.predicts = fc_32
        self.sess = tf.Session()
        # tf.initialize_all_variables is deprecated in favour of this call.
        self.sess.run(tf.global_variables_initializer())
        self.saver = tf.train.Saver()
        self.saver.restore(self.sess, self.weights_file)
        if self.disp_console:
            print("Loading complete!" + '\n')

    def _conv_layer(self, idx, inputs, out_channel, size, stride):
        """
        Convolution layer followed by a leaky ReLU.

        :param idx: layer index (used in op names and log output)
        :param inputs: input tensor; its 4th dimension is read as the channel count
        :param out_channel: number of output channels
        :param size: kernel size (size x size)
        :param stride: convolution stride
        :return: activated output tensor
        """
        input_channels = inputs.get_shape()[3]  # number of input channels
        weight = tf.Variable(tf.truncated_normal([size, size, int(input_channels), out_channel], stddev=0.1))
        biases = tf.Variable(tf.constant(0.1, shape=[out_channel]))
        # Pad dimensions 1 and 2 explicitly by size // 2, then convolve with 'VALID'.
        pad_size = size // 2
        pad_mat = np.array([[0, 0], [pad_size, pad_size], [pad_size, pad_size], [0, 0]])
        inputs_pad = tf.pad(inputs, pad_mat)
        conv = tf.nn.conv2d(inputs_pad, weight, strides=[1, stride, stride, 1], padding='VALID', name=str(idx) + '_conv')
        conv_biased = tf.add(conv, biases, name=str(idx) + '_conv_biased')
        if self.disp_console:
            print(
                'layer-%d (Conv): size = %d*%d, stride = %d, input-out channels = %d-%d'
                % (idx, size, size, stride, int(input_channels), out_channel)
            )
        # Leaky ReLU: max(alpha * x, x).
        return tf.maximum(self.alpha * conv_biased, conv_biased, name=str(idx) + '_leaky_relu')

    def _pooling_layer(self, idx, inputs, size, stride):
        """
        Max-pooling layer.

        :param idx: layer index (used in the op name and log output)
        :param inputs: input tensor
        :param size: pooling window size (size x size)
        :param stride: pooling stride
        :return: pooled tensor
        """
        if self.disp_console:
            print('layer-%d (Pool): size = %d*%d, stride = %d' % (idx, size, size, stride))
        return tf.nn.max_pool(inputs, ksize=[1, size, size, 1], strides=[1, stride, stride, 1], padding='SAME', name=str(idx) + '_pool')

    def _fc_layer(self, idx, inputs, hiddens, flat=False, linear=False):
        """
        Fully connected layer.

        :param idx: layer index (used in the op name and log output)
        :param inputs: input tensor
        :param hiddens: number of output units
        :param flat: True when ``inputs`` is a 4-D feature map that must be flattened first
        :param linear: True for no activation, False for a leaky ReLU
        :return: output tensor
        """
        input_shape = inputs.get_shape().as_list()
        if flat:
            dim = input_shape[1] * input_shape[2] * input_shape[3]
            # Move the channel axis in front of the spatial axes before flattening.
            inputs_transposed = tf.transpose(inputs, (0, 3, 1, 2))
            inputs_processed = tf.reshape(inputs_transposed, [-1, dim])
        else:
            dim = input_shape[1]
            inputs_processed = inputs
        weight = tf.Variable(tf.truncated_normal([dim, hiddens], stddev=0.1))
        biases = tf.Variable(tf.constant(0.1, shape=[hiddens]))
        if self.disp_console:
            print(
                'layer-%d (Fc): hidden = %d, input dimension = %d, flat = %d, activation = %d'
                % (idx, hiddens, int(dim), int(flat), 1 - int(linear))
            )
        if linear:  # no activation
            return tf.add(tf.matmul(inputs_processed, weight), biases, name=str(idx) + '_fc')
        ip = tf.add(tf.matmul(inputs_processed, weight), biases)
        return tf.maximum(self.alpha * ip, ip, name=str(idx) + '_fc')

    def _non_maxima_suppression(self):
        """
        Build the post-processing ops: decode boxes, filter by class score and
        apply non-maximum suppression.

        Results are exposed as ``self.scores``, ``self.boxes`` (centre x,
        centre y, width, height, scaled by the ``w_img`` / ``h_img``
        placeholders) and ``self.box_classes``. Only the first image of the
        batch is decoded.

        :return: None
        """
        self.w_img = tf.placeholder(tf.float32, name="w_img")
        self.h_img = tf.placeholder(tf.float32, name="h_img")
        idx1 = self.S * self.S * self.class_num
        idx2 = idx1 + self.S * self.S * self.B
        # [0, idx1): S*S*class_num values -- class probabilities of every grid cell.
        class_probs = tf.reshape(self.predicts[0, :idx1], (self.S, self.S, self.class_num))
        # [idx1, idx2): S*S*B values -- confidence of each of the B boxes of every cell.
        confidence = tf.reshape(self.predicts[0, idx1:idx2], (self.S, self.S, self.B))
        # [idx2, end): S*S*B*4 values -- 4 parameters for each box of every cell.
        boxes = tf.reshape(self.predicts[0, idx2:], (self.S, self.S, self.B, 4))
        # x offset of each cell (column index), shape (S, S, B).
        x_offset = np.transpose(np.reshape(np.array([np.arange(self.S)] * self.S * self.B), [self.B, self.S, self.S]), [1, 2, 0])
        # y offset of each cell (row index).
        y_offset = np.transpose(x_offset, [1, 0, 2])
        # Centre: add the cell offset, divide by the grid size and scale by the
        # supplied image size. Width/height: square the prediction and scale by
        # the supplied image size.
        boxes = tf.stack([
            (boxes[:, :, :, 0] + tf.constant(x_offset, dtype=tf.float32)) / self.S * self.w_img,
            (boxes[:, :, :, 1] + tf.constant(y_offset, dtype=tf.float32)) / self.S * self.h_img,
            tf.square(boxes[:, :, :, 2]) * self.w_img,
            tf.square(boxes[:, :, :, 3]) * self.h_img
        ], axis=3)
        # Per-box class scores: box confidence times the cell's class probabilities.
        scores = tf.expand_dims(confidence, -1) * tf.expand_dims(class_probs, 2)
        scores = tf.reshape(scores, [-1, self.class_num])  # [S*S*B, class_num]
        boxes = tf.reshape(boxes, [-1, 4])  # [S*S*B, 4]
        # Keep only boxes whose best class score reaches the threshold.
        max_score_idx_in_bbox = tf.argmax(scores, axis=1)  # best class id of each box
        max_score_num_in_bbox = tf.reduce_max(scores, axis=1)  # best class score of each box
        filter_mask = max_score_num_in_bbox >= self.threshold
        bbox_score = tf.boolean_mask(max_score_num_in_bbox, filter_mask)
        bbox_classes = tf.boolean_mask(max_score_idx_in_bbox, filter_mask)
        boxes = tf.boolean_mask(boxes, filter_mask)
        # Non-maximum suppression (applied across all classes together).
        _boxes = tf.stack(  # box (x, y, w, h) -> box (x1, y1, x2, y2)
            [boxes[:, 0] - 0.5 * boxes[:, 2], boxes[:, 1] - 0.5 * boxes[:, 3],
             boxes[:, 0] + 0.5 * boxes[:, 2], boxes[:, 1] + 0.5 * boxes[:, 3]],
            axis=1)
        # NOTE(review): the third argument (max number of boxes kept) is class_num -- confirm intended.
        nms_indices = tf.image.non_max_suppression(_boxes, bbox_score, self.class_num, self.iou_threshold)
        self.scores = tf.gather(bbox_score, nms_indices)
        self.boxes = tf.gather(boxes, nms_indices)
        self.box_classes = tf.gather(bbox_classes, nms_indices)

    def _show(self, image, delay):
        """
        Show ``image`` in the preview window when ``self.imshow`` is enabled.

        :param image: image to display
        :param delay: value passed to ``cv2.waitKey`` (0 blocks until a key is pressed)
        :return: low byte of the ``cv2.waitKey`` result, or -1 when display is disabled
        """
        if not self.imshow:
            return -1
        cv2.imshow(self._WINDOW_NAME, image)
        return cv2.waitKey(delay) & 0xFF

    @staticmethod
    def _open_writer(to_file, fps, frame_size):
        """
        Create the video writer used by the camera / video detectors.

        :param to_file: output video path
        :param fps: frame rate of the output
        :param frame_size: (width, height) of the frames that will be written
        :return: the ``cv2.VideoWriter`` (a warning is printed if it failed to open)
        """
        writer = cv2.VideoWriter(to_file, cv2.VideoWriter_fourcc('I', '4', '2', '0'), fps, frame_size)
        if not writer.isOpened():
            print('Warning: cannot open video writer for: ' + to_file)
        return writer

    @staticmethod
    def _format_detection(class_name, x, y, w, h, score):
        """
        Format one detection as a text line.

        :return: ``"class,x,y,w,h,score"`` terminated by a newline
        """
        return ','.join([class_name, str(x), str(y), str(w), str(h), str(score)]) + '\n'

    def detect(self, img):
        """
        Run the network on one image.

        :param img: image array (height, width, channels); it is converted with
            ``cv2.COLOR_BGR2RGB`` before being fed to the network
        :return: (scores, boxes, box_classes) of the boxes kept after NMS
        :raises ValueError: if ``img`` is None
        """
        if img is None:
            raise ValueError('detect() received an empty image (None)')
        s1 = time.time()
        h_img, w_img, _ = img.shape
        img_resized = cv2.resize(img, (448, 448))  # network input size
        img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
        img_resized_np = np.asarray(img_rgb)
        _images = np.zeros((1, 448, 448, 3), dtype=np.float32)
        _images[0] = (img_resized_np / 255.0) * 2.0 - 1.0  # scale pixel values to [-1, 1]
        s2 = time.time()
        if self.disp_console:
            print('Deal with picture cost time : ' + str(s2 - s1) + ' s' + '\n')
        # The original image size is fed so boxes come back in source-image pixels.
        scores, boxes, box_classes = self.sess.run(
            [self.scores, self.boxes, self.box_classes],
            feed_dict={self.x: _images, self.w_img: w_img, self.h_img: h_img}
        )
        if self.disp_console:
            print('Detect one picture cost time : ' + str(time.time() - s2) + ' s' + '\n')
        return scores, boxes, box_classes

    def draw_region(self, img, scores, boxes, box_classes):
        """
        Draw the detected boxes and labels on a copy of the image.

        Also increments ``self.person_detected`` when at least one "person"
        box is present.

        :param img: source image (left untouched)
        :param scores: score of each box
        :param boxes: array of (centre x, centre y, width, height) rows
        :param box_classes: class id of each box
        :return: (annotated image copy, text with one
            ``class,x,y,w,h,score`` line per box)
        """
        s = time.time()
        img_cp = img.copy()
        class_results_set = set()  # names of all detected classes
        txt_lines = []
        line_type = cv2.LINE_AA
        for class_id, box, score in zip(box_classes, boxes, scores):
            class_name = self.classes[class_id]
            x = int(box[0])
            y = int(box[1])
            # Half extents: the box is drawn around its centre.
            w = int(box[2] / 2)
            h = int(box[3] / 2)
            full_w = int(box[2])
            full_h = int(box[3])
            class_results_set.add(class_name)
            if self.disp_console:
                print(
                    'class: %s , [x,y,w,h]=[%s,%s,%s,%s], Confidence= %s'
                    % (class_name, str(x), str(y), str(full_w), str(full_h), str(score))
                )
            # Box outline, filled label background above it, then the label text.
            cv2.rectangle(img_cp, (x - w, y - h), (x + w, y + h), (0, 255, 0), 2)
            cv2.rectangle(img_cp, (x - w, y - h - 20), (x + w, y - h), (125, 125, 125), -1)
            cv2.putText(
                img_cp, class_name + ' : %.2f' % score, (x - w + 5, y - h - 7), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0, 0, 0), 1, lineType=line_type
            )
            # Collect the line; str.join() on an empty string would discard it.
            txt_lines.append(self._format_detection(class_name, x, y, full_w, full_h, score))
        if "person" in class_results_set:
            self.person_detected += 1
        if self.disp_console:
            print('Draw region cost time : ' + str(time.time() - s) + ' s' + '\n')
        return img_cp, ''.join(txt_lines)

    def image_detector(self, from_file, img_to_file, txt_to_file):
        """
        Detect objects in a single image file.

        :param from_file: path of the image to process
        :param img_to_file: path where the annotated image is saved
        :param txt_to_file: path where the detection text is saved
        :return: None
        :raises IOError: if the image cannot be read
        """
        image = cv2.imread(from_file)
        if image is None:
            raise IOError('Cannot read image: ' + from_file)
        scores, boxes, box_classes = self.detect(image)
        image_detected, txt_content = self.draw_region(image, scores, boxes, box_classes)
        # Save the results.
        save_img(image_detected, img_to_file)
        save_txt(txt_content, txt_to_file)
        # Show the result and wait for a key press.
        self._show(image_detected, 0)

    def multi_img_detector(self, from_folder, to_folder):
        """
        Detect objects in every readable image of a folder.

        For each image the annotated picture is saved under the same file name
        in ``to_folder`` and the detection text next to it with a ``.txt``
        extension. Finally prints the share of processed images in which no
        person was detected ("Fooling_rate").

        :param from_folder: folder containing the input images
        :param to_folder: folder receiving the results
        :return: None
        """
        for filename in os.listdir(from_folder):
            image = cv2.imread(os.path.join(from_folder, filename))
            if image is None:
                # Sub-directories and non-image files cannot be decoded.
                print('Skip unreadable file: ' + filename)
                continue
            self.overall_pics += 1
            scores, boxes, box_classes = self.detect(image)
            image_detected, txt_content = self.draw_region(image, scores, boxes, box_classes)
            # Save the results; the text goes to its own .txt file so it does
            # not overwrite the image.
            img_to_file = os.path.join(to_folder, filename)
            txt_to_file = os.path.join(to_folder, os.path.splitext(filename)[0] + '.txt')
            save_img(image_detected, img_to_file)
            save_txt(txt_content, txt_to_file)
            # Show the result and wait for a key press.
            self._show(image_detected, 0)
        if self.overall_pics:
            print("Fooling_rate:", (self.overall_pics - self.person_detected) / float(self.overall_pics))
        else:
            print('No readable images found in: ' + from_folder)

    def camera_detector(self, to_file):
        """
        Detect objects on frames from camera 0 and record the annotated frames.

        Runs until a frame cannot be read or, when the preview window is
        enabled, until 'q' is pressed.

        :param to_file: output video path
        :return: None
        """
        cap = cv2.VideoCapture(0)
        video = None
        try:
            while True:
                ret, frame = cap.read()
                if not ret or frame is None:
                    break
                if video is None:
                    # Size the writer from the first frame actually delivered.
                    height, width = frame.shape[:2]
                    video = self._open_writer(to_file, 30, (width, height))
                scores, boxes, box_classes = self.detect(frame)
                image_detected, _ = self.draw_region(frame, scores, boxes, box_classes)
                video.write(image_detected)
                if self._show(image_detected, 1) == ord('q'):
                    break
        finally:
            cap.release()
            if video is not None:
                video.release()
            cv2.destroyAllWindows()

    def vedio_detector(self, from_vedio, to_file):
        """
        Detect objects on every frame of a video file and record the annotated
        frames.

        Runs until the video ends or, when the preview window is enabled,
        until 'q' is pressed.

        :param from_vedio: input video path
        :param to_file: output video path
        :return: None
        """
        cap = cv2.VideoCapture(from_vedio)
        frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        video_write = self._open_writer(to_file, cap.get(cv2.CAP_PROP_FPS), frame_size)
        try:
            while True:
                ret, frame = cap.read()
                if not ret or frame is None:
                    break
                scores, boxes, box_classes = self.detect(frame)
                image_detected, _ = self.draw_region(frame, scores, boxes, box_classes)
                if self._show(image_detected, 1) == ord('q'):
                    break
                video_write.write(image_detected)
        finally:
            cap.release()
            video_write.release()
            cv2.destroyAllWindows()
if __name__ == '__main__':
    # Script entry point: build the detector, then run one of the modes below.
    detector = Yolo()
    # Other modes (uncomment the one to run):
    # detector.image_detector('test/sample/person.jpg', 'test/result/person.jpg', 'test/result/person.txt')
    # detector.multi_img_detector('test/sample','test/result')
    # detector.camera_detector('test/result/camera.mp4')
    detector.vedio_detector(from_vedio='test/sample/1.mp4', to_file='test/result/1.mp4')
    # List the devices visible to TensorFlow:
    # from tensorflow.python.client import device_lib
    # print(device_lib.list_local_devices())
Python
1
https://gitee.com/windandwine/YOLO_v1_tensorflow.git
git@gitee.com:windandwine/YOLO_v1_tensorflow.git
windandwine
YOLO_v1_tensorflow
YOLO_v1_tensorflow
master

搜索帮助