# main.py
"""
Main program to run the detection and TCP
"""
from argparse import ArgumentParser
import cv2
import mediapipe as mp
import numpy as np
# for the UDP connection with Unity
import socket
# face detection and facial landmarks
from facial_landmark import FaceMeshDetector
from hand_landmark import HandDetector
# pose estimation and stabilization
from pose_estimator import PoseEstimator
from filters.stabilizer import Stabilizer
from filters.Kalman3D import Kalman3D
# Miscellaneous detections (eyes/ mouth...)
from facial_features import FacialFeatures, Eyes
# utility helpers
from utils.cvfpscalc import CvFpsCalc
from utils.dataset_utils import load_dataset, load_reference_signs
from sign_language.sign_recorder import SignRecorder
from sign_language.webcam_manager import WebcamManager
port = 5061  # must be the same port as on the Unity side
# init TCP connection with unity
# return the socket connected
# def init_TCP():
# port = args.port
#
# # '127.0.0.1' = 'localhost' = your computer internal data transmission IP
# address = ('127.0.0.1', port)
# # address = ('192.168.0.107', port)
#
# try:
# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# s.connect(address)
# # print(socket.gethostbyname(socket.gethostname()) + "::" + str(port))
# print("Connected to address:", socket.gethostbyname(socket.gethostname()) + ":" + str(port))
# return s
# except OSError as e:
# print("Error while connecting :: %s" % e)
#
# # quit the script if connection fails (e.g. Unity server side quits suddenly)
# sys.exit()
#
# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# # print(socket.gethostbyname(socket.gethostname()))
# s.connect(address)
# return s
# def send_info_to_unity(s, args, data):
# msg = '%.4f ' * len(args) % args + str(data)
# # print(msg)
#
# try:
# s.send(bytes(msg, "utf-8"))
# except socket.error as e:
# print("error while sending :: " + str(e))
#
# # quit the script if connection fails (e.g. Unity server side quits suddenly)
# sys.exit()
# def print_debug_msg(args):
# msg = '%.4f ' * len(args) % args
# print(msg)
def main():
    # the previous version's TCP transport was dropped in favor of UDP
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    serverAddressPort = ("127.0.0.1", port)  # reuse the module-level port (5061)
    # use the internal webcam / USB camera (index taken from the --cam argument)
    cap = cv2.VideoCapture(args.cam)
# IP cam (android only), with the app "IP Webcam"
# url = 'http://192.168.0.102:4747/video'
# url = 'https://192.168.0.102:8080/video'
# cap = cv2.VideoCapture(url)
    # quadratic fit (and its coefficients) mapping hand size in pixels (x) to distance in cm (y)
x = [300, 245, 200, 170, 145, 130, 112, 103, 93, 87, 80, 75, 70, 67, 62, 59, 57]
y = [20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100]
coff = np.polyfit(x, y, 2)
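    # (keypoint_cal presumably evaluates this fit to estimate the hand's distance;
    # e.g. np.polyval(coff, 200) gives roughly 30, i.e. ~30 cm at a 200 px hand size)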
    # initialize the Kalman-filter stabilizers for the eyes, mouth and head pose
eyes_stabilizers = [Stabilizer(
state_num=2,
measure_num=1,
cov_process=0.1,
cov_measure=0.1) for _ in range(6)]
mouth_dist_stabilizer = Stabilizer(
state_num=2,
measure_num=1,
cov_process=0.1,
cov_measure=0.1
)
pose_stabilizers = [Stabilizer(
state_num=2,
measure_num=1,
cov_process=0.1,
cov_measure=0.1) for _ in range(6)]
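    # (each Stabilizer appears to wrap a scalar Kalman filter: state_num=2
    # presumably tracks the value and its velocity, and measure_num=1 takes a
    # single scalar measurement per update; see filters/stabilizer.py)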
    # initialize a 3D Kalman filter for each of the 21 landmarks of each hand
left_hand_filters = [Kalman3D() for _ in range(21)]
right_hand_filters = [Kalman3D() for _ in range(21)]
    # initialize MediaPipe's Holistic module
mp_holistic = mp.solutions.holistic
holistic = mp_holistic.Holistic(
model_complexity=1,
static_image_mode=False,
refine_face_landmarks=True,
min_detection_confidence=0.8,
min_tracking_confidence=0.5
)
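    # (refine_face_landmarks=True enables MediaPipe's iris model, which adds
    # the 10 extra landmarks used below on top of the 468 face-mesh points)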
    # initialize the Face and Hand detector classes
Face_detector = FaceMeshDetector()
Hands_detector = HandDetector()
    # FPS measurement helper
cvFpsCalc = CvFpsCalc(buffer_len=10)
    # get a sample frame for the pose-estimation image size
    success, img = cap.read()
    if not success:
        raise RuntimeError("Could not read a frame from the camera")
    # Pose estimation related
    # img.shape[0]: image height; img.shape[1]: image width
    pose_estimator = PoseEstimator((img.shape[0], img.shape[1]))
    # matrix holding the 2D coordinates of the 468 FaceMesh facial landmarks
image_points = np.zeros((pose_estimator.model_points_full.shape[0], 2))
# Create dataset of the videos where landmarks have not been extracted yet
videos = load_dataset()
# Create a DataFrame of reference signs (name: str, model: SignModel, distance: int)
reference_signs = load_reference_signs(videos)
# Object that stores mediapipe results and computes sign similarities
sign_recorder = SignRecorder(reference_signs)
# Object that draws keypoints & displays results
webcam_manager = WebcamManager()
# Initialize TCP connection
# if args.connect:
# socket = init_TCP()
while cap.isOpened():
success, img = cap.read()
if not success:
print("Ignoring empty camera frame.")
continue
display_fps = cvFpsCalc.get()
# Pose estimation by 3 steps:
# 1. detect face/hands;
# 2. detect landmarks;
# 3. estimate pose
# first two steps
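        # (MediaPipe's recommended pattern: mark the frame as not writeable so
        # process() can pass it by reference instead of copying it)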
img = cv2.cvtColor(cv2.flip(img, 1), cv2.COLOR_BGR2RGB)
img.flags.writeable = False
results = holistic.process(img)
img.flags.writeable = True
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
imgH = img.shape[0]
imgW = img.shape[1]
        # pass the Holistic results to the sign-language recognition classes
webcam_manager.sign_detected, is_recording = sign_recorder.process_results(results)
face_landmarks = results.face_landmarks
# print(face_landmarks)
        # img_facemesh is used for drawing
        # the input image was already flipped above to match what FaceMesh detects
        # img = cv2.flip(img, 1)
        img_facemesh, faces = Face_detector.findFaceMesh(img, face_landmarks)
# print(faces)
        # raw, unfiltered hand landmarks; index 0: left, 1: right
        hands = [[], []]
        # draw / process the hand landmarks
left_hand_landmarks = results.left_hand_landmarks
right_hand_landmarks = results.right_hand_landmarks
        # data holds the final hand data sent to the Unity side
data = []
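        # layout per detected hand: the tag "Left"/"Right", then for each of
        # the 21 landmarks: x, y (flipped to a bottom-left origin via imgH - y),
        # z, and the estimated hand distance in cm (repeated for every landmark)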
        if left_hand_landmarks:
            img_facemesh, hands[0] = Hands_detector.findHands(img, left_hand_landmarks)
            # print(hands[0])
            HandLeft = Hands_detector.hand_filter(hands[0], imgW, imgH, left_hand_filters)
            # print(HandLeft)
            LdistanceCM = Hands_detector.keypoint_cal(HandLeft, coff)
            data.extend(["Left"])
            for lm in HandLeft:
                data.extend([round(lm[0], 5), round(imgH - lm[1], 5), round(lm[2], 5), round(LdistanceCM, 5)])
        if right_hand_landmarks:
            img_facemesh, hands[1] = Hands_detector.findHands(img, right_hand_landmarks)
            HandRight = Hands_detector.hand_filter(hands[1], imgW, imgH, right_hand_filters)
            # print(HandRight)
            RdistanceCM = Hands_detector.keypoint_cal(HandRight, coff)
            data.extend(["Right"])
            for lm in HandRight:
                data.extend([round(lm[0], 5), round(imgH - lm[1], 5), round(lm[2], 5), round(RdistanceCM, 5)])
        # the 10 extra facial landmarks from the iris-tracking module
iris_image_points = np.zeros((10, 2))
# if there is any face detected
        if faces:
            # only get the first face
            # to drive multiple models from multiple faces, modify here
            # store the 2D coordinates of the 468 facial landmarks
for i in range(len(image_points)):
image_points[i, 0] = faces[0][i][0]
image_points[i, 1] = faces[0][i][1]
            # store the 10 extra iris landmarks
for j in range(len(iris_image_points)):
iris_image_points[j, 0] = faces[0][j + 468][0]
iris_image_points[j, 1] = faces[0][j + 468][1]
            # The third step: pose estimation
            # pose is returned as [[rvec], [tvec]]
            # solve_pose_by_all_points already holds the 3D model points
            # corresponding to the 468 facial landmarks, so only the matrix
            # of 2D image points needs to be passed in
            pose = pose_estimator.solve_pose_by_all_points(image_points)
# print(pose)
            # process the eye/mouth landmarks and compute their ratios/thresholds
x_ratio_left, y_ratio_left = FacialFeatures.detect_iris(image_points, iris_image_points, Eyes.LEFT)
x_ratio_right, y_ratio_right = FacialFeatures.detect_iris(image_points, iris_image_points, Eyes.RIGHT)
ear_left = FacialFeatures.eye_aspect_ratio(image_points, Eyes.LEFT)
ear_right = FacialFeatures.eye_aspect_ratio(image_points, Eyes.RIGHT)
pose_eye = [ear_left, ear_right, x_ratio_left, y_ratio_left, x_ratio_right, y_ratio_right]
mar = FacialFeatures.mouth_aspect_ratio(image_points)
mouth_distance = FacialFeatures.mouth_distance(image_points)
# print("left eye: %.2f, %.2f" % (x_ratio_left, y_ratio_left))
# print("right eye: %.2f, %.2f" % (x_ratio_right, y_ratio_right))
#
# print("rvec (y) = (%f): " % (pose[0][1]))
            # the rotation vector's x/y/z components are in radians
# print("rvec (x, y, z) = (%f, %f, %f): " % (pose[0][0], pose[0][1], pose[0][2]))
# print("tvec (x, y, z) = (%f, %f, %f): " % (pose[1][0], pose[1][1], pose[1][2]))
            # filtering of the pose / eye / mouth values
steady_pose = []
pose_np = np.array(pose).flatten()
# print(pose_np)
for value, ps_stb in zip(pose_np, pose_stabilizers):
ps_stb.update([value])
steady_pose.append(ps_stb.state[0])
steady_pose = np.reshape(steady_pose, (-1, 3))
# stabilize the eyes value
steady_pose_eye = []
for value, ps_stb in zip(pose_eye, eyes_stabilizers):
ps_stb.update([value])
steady_pose_eye.append(ps_stb.state[0])
mouth_dist_stabilizer.update([mouth_distance])
steady_mouth_dist = mouth_dist_stabilizer.state[0]
# uncomment the rvec line to check the raw values
# print("rvec steady (x, y, z) = (%f, %f, %f): " % (steady_pose[0][0], steady_pose[0][1], steady_pose[0][2]))
# print("tvec steady (x, y, z) = (%f, %f, %f): " % (steady_pose[1][0], steady_pose[1][1], steady_pose[1][2]))
            # calculate the roll / pitch / yaw (Euler angles)
# roll: +ve when the axis pointing upward
# pitch: +ve when we look upward
# yaw: +ve when we look left
            # np.degrees converts radians to degrees; np.clip limits the angles to -90..90 degrees
roll = np.clip(np.degrees(steady_pose[0][1]), -90, 90)
pitch = np.clip(-(180 + np.degrees(steady_pose[0][0])), -90, 90)
yaw = np.clip(np.degrees(steady_pose[0][2]), -90, 90)
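            # (the 180-degree offset on pitch presumably compensates for the
            # coordinate convention of the solvePnP-based pose estimator)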
# print(roll,pitch,yaw)
# print("Roll: %.2f, Pitch: %.2f, Yaw: %.2f" % (roll, pitch, yaw))
# print("left eye: %.2f, %.2f; right eye %.2f, %.2f"
# % (steady_pose_eye[0], steady_pose_eye[1], steady_pose_eye[2], steady_pose_eye[3]))
# print("EAR_LEFT: %.2f; EAR_RIGHT: %.2f" % (ear_left, ear_right))
# print("MAR: %.2f; Mouth Distance: %.2f" % (mar, steady_mouth_dist))
# send info to unity
# test = (roll, pitch, yaw,
# ear_left, ear_right, x_ratio_left, y_ratio_left, x_ratio_right, y_ratio_right,
# mar, mouth_distance)
# msg = '%.4f ' * len(test) % test + str(data)
# print(msg)
            # pack all the face/head-related data and send it to the Unity side
face_data = (roll, pitch, yaw,
ear_left, ear_right, x_ratio_left, y_ratio_left, x_ratio_right, y_ratio_right,
mar, mouth_distance)
msg = '%.4f ' * len(face_data) % face_data + str(data)
sock.sendto(str.encode(str(msg)), serverAddressPort)
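            # the UDP payload is plain text: 11 face values (roll, pitch, yaw,
            # two EARs, four iris ratios, MAR, mouth distance), each formatted
            # with '%.4f ', followed by the Python repr of the hand list, e.g.
            # "1.2345 -0.5678 ... 25.0000 ['Left', 0.51234, ...]"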
# if args.connect:
# # for sending to live2d model (Hiyori)
# send_info_to_unity(socket,
# (roll, pitch, yaw,
# ear_left, ear_right, x_ratio_left, y_ratio_left, x_ratio_right, y_ratio_right,
# mar, mouth_distance) , str(data)
# )
#
# # print the sent values in the terminal
# if args.debug:
# print_debug_msg((roll, pitch, yaw,
# ear_left, ear_right, x_ratio_left, y_ratio_left, x_ratio_right, y_ratio_right,
# mar, mouth_distance))
# pose_estimator.draw_annotation_box(img, pose[0], pose[1], color=(255, 128, 128))
# pose_estimator.draw_axis(img, pose[0], pose[1])
# pose_estimator.draw_axes(img_facemesh, steady_pose[0], steady_pose[1])
else:
# reset our pose estimator
pose_estimator = PoseEstimator((img_facemesh.shape[0], img_facemesh.shape[1]))
# Update the frame (draw landmarks & display result)
img_facemesh = webcam_manager.draw_text(img_facemesh, imgW, imgH, is_recording)
        # display the FPS
cv2.putText(img_facemesh, "FPS:" + str(display_fps), (500, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2, cv2.LINE_AA)
cv2.imshow('Facial landmark', img_facemesh)
# press "q" to leave
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
pressedKey = cv2.waitKey(1) & 0xFF
if pressedKey == ord("r"): # Record pressing r
sign_recorder.record()
elif pressedKey == ord("q"): # Break pressing q
break
    cap.release()
    cv2.destroyAllWindows()
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--connect", action="store_true",
help="connect to unity character",
default=False)
parser.add_argument("--port", type=int,
help="specify the port of the connection to unity. Have to be the same as in Unity",
default=5066)
parser.add_argument("--cam", type=int,
help="specify the camera number if you have multiple cameras",
default=0)
parser.add_argument("--debug", action="store_true",
help="showing raw values of detection in the terminal",
default=False)
args = parser.parse_args()
    # --connect, --port and --debug belong to the legacy TCP path above and
    # are currently unused; main() sends over UDP to 127.0.0.1:5061
main()
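    # example invocation (assuming a single webcam at index 0):
    #   python main.py --cam 0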