1 Star 0 Fork 24

walkskyer / SerialGenius

forked from edwinfound / SerialGenius 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
SerialUtil.py 5.37 KB
一键复制 编辑 原始数据 按行查看 历史
Eamon 提交于 2015-08-30 03:05 . feature: 初始化版本
#coding:utf-8
import sys,threading,time
import serial
import binascii,encodings
import re
import socket
from struct import *;
import Global;
import ELog;
#界面
PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S'
STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2)
FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)
PARITY_NAMES = {
PARITY_NONE: 'None',
PARITY_EVEN: 'Even',
PARITY_ODD: 'Odd',
PARITY_MARK: 'Mark',
PARITY_SPACE: 'Space',
}
BAUDRATES = ( 50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
9600, 19200, 38400, 57600, 115200, 128000, 192000, 256000,
384000, 512000, 768000, 1024000, 2048000, 4096000)
BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)
STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)
class SerialThread:
def __init__(self, port=0, baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=2):
self.l_serial = None;
self.port=port;
self.baudrate=baudrate;
self.bytesize=bytesize;
self.parity=parity;
self.stopbits=stopbits;
self.timeout=timeout;
self.exceptStop=False;
self.running = False;
self.thread_read = None;
self.thread_write = None;
self.thread_monitor=None;
self.writeBuffer=[];
'''
self.waitEnd = None;
self.port = Port;
'''
def start(self):
try:
self.l_serial = serial.Serial();
self.l_serial.port = self.port;
self.l_serial.baudrate = self.baudrate;
self.l_serial.parity = self.parity;
self.l_serial.stopbits = self.stopbits;
self.l_serial.timeout = self.timeout;
self.l_serial.open();
if self.l_serial.isOpen():
#self.waitEnd = threading.Event();
self.running = True;
self.exceptStop=False;
self.thread_read = threading.Thread(target=self.serialReader);
self.thread_read.setDaemon(True);
self.thread_read.start();
self.thread_write = threading.Thread(target=self.serialWriter);
self.thread_write.setDaemon(True);
self.thread_write.start();
self.thread_monitor = threading.Thread(target=self.serialMonitor);
self.thread_monitor.setDaemon(True);
self.thread_monitor.start();
return True;
else:
return False;
except:
ELog.Trace();
return False;
def stop(self):
self.running = False;
try:
self.thread_read.join();
self.thread_write.join();
if self.l_serial.isOpen():
self.l_serial.close();
except:
pass;
def serialMonitor(self):
self.thread_read.join();
self.thread_write.join();
if self.exceptStop:
self.stop();
Global.var['PanelTool'].OnOpenPort(None, True);
def SendAppend(self, data):
#print data;
self.writeBuffer.extend(data);
def serialWriter(self):
while self.running:
try:
if len(self.writeBuffer):
c=self.writeBuffer.pop(0);
if 0<=c<=255:
self.l_serial.write(chr(c));
#else:
#print '0x%X' % c;
else:
time.sleep(0.01);
except:
self.running=False;
self.exceptStop=True;
ELog.Trace();
def serialReader(self):
while self.running:
try:
n = self.l_serial.inWaiting();
if n:
data = self.l_serial.read(n);
rawData=[];
for i in range(0,n):
rawData.append(ord(data[i]));
for i in range(0, len(Global.Modules)):
if Global.var['PanelMain'].modules[i].isUsing:
Global.var['PanelMain'].modules[i].RecvData(rawData);
else:
time.sleep(0.001);
except:
self.running=False;
self.exceptStop=True;
ELog.Trace();
'''
返回可用的串口
'''
def GetValidPort():
valid_port=[];
t_serial=None;
for i in range(0, 20):
try:
t_serial = serial.Serial();
t_serial.port = i;
t_serial.baudrate = 9600;
t_serial.timeout = 2;
t_serial.bytesize=EIGHTBITS;
t_serial.parity=PARITY_NONE;
t_serial.stopbits=STOPBITS_ONE;
t_serial.open();
except:
pass;
if t_serial.isOpen():
try:
t_serial.close();
except:
pass;
valid_port.append(i);
return valid_port;
if __name__=='__main__':
print GetValidPort();
1
https://gitee.com/walkskyer/SerialGenius.git
git@gitee.com:walkskyer/SerialGenius.git
walkskyer
SerialGenius
SerialGenius
master

搜索帮助