代码拉取完成,页面将自动刷新
同步操作将从 edwinfound/SerialGenius 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#coding:utf-8
import sys,threading,time
import serial
import binascii,encodings
import re
import socket
from struct import *;
import Global;
import ELog;
#界面
PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S'
STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2)
FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)
PARITY_NAMES = {
PARITY_NONE: 'None',
PARITY_EVEN: 'Even',
PARITY_ODD: 'Odd',
PARITY_MARK: 'Mark',
PARITY_SPACE: 'Space',
}
BAUDRATES = ( 50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
9600, 19200, 38400, 57600, 115200, 128000, 192000, 256000,
384000, 512000, 768000, 1024000, 2048000, 4096000)
BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)
STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)
class SerialThread:
def __init__(self, port=0, baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=2):
self.l_serial = None;
self.port=port;
self.baudrate=baudrate;
self.bytesize=bytesize;
self.parity=parity;
self.stopbits=stopbits;
self.timeout=timeout;
self.exceptStop=False;
self.running = False;
self.thread_read = None;
self.thread_write = None;
self.thread_monitor=None;
self.writeBuffer=[];
'''
self.waitEnd = None;
self.port = Port;
'''
def start(self):
try:
self.l_serial = serial.Serial();
self.l_serial.port = self.port;
self.l_serial.baudrate = self.baudrate;
self.l_serial.parity = self.parity;
self.l_serial.stopbits = self.stopbits;
self.l_serial.timeout = self.timeout;
self.l_serial.open();
if self.l_serial.isOpen():
#self.waitEnd = threading.Event();
self.running = True;
self.exceptStop=False;
self.thread_read = threading.Thread(target=self.serialReader);
self.thread_read.setDaemon(True);
self.thread_read.start();
self.thread_write = threading.Thread(target=self.serialWriter);
self.thread_write.setDaemon(True);
self.thread_write.start();
self.thread_monitor = threading.Thread(target=self.serialMonitor);
self.thread_monitor.setDaemon(True);
self.thread_monitor.start();
return True;
else:
return False;
except:
ELog.Trace();
return False;
def stop(self):
self.running = False;
try:
self.thread_read.join();
self.thread_write.join();
if self.l_serial.isOpen():
self.l_serial.close();
except:
pass;
def serialMonitor(self):
self.thread_read.join();
self.thread_write.join();
if self.exceptStop:
self.stop();
Global.var['PanelTool'].OnOpenPort(None, True);
def SendAppend(self, data):
#print data;
self.writeBuffer.extend(data);
def serialWriter(self):
while self.running:
try:
if len(self.writeBuffer):
c=self.writeBuffer.pop(0);
if 0<=c<=255:
self.l_serial.write(chr(c));
#else:
#print '0x%X' % c;
else:
time.sleep(0.01);
except:
self.running=False;
self.exceptStop=True;
ELog.Trace();
def serialReader(self):
while self.running:
try:
n = self.l_serial.inWaiting();
if n:
data = self.l_serial.read(n);
rawData=[];
for i in range(0,n):
rawData.append(ord(data[i]));
for i in range(0, len(Global.Modules)):
if Global.var['PanelMain'].modules[i].isUsing:
Global.var['PanelMain'].modules[i].RecvData(rawData);
else:
time.sleep(0.001);
except:
self.running=False;
self.exceptStop=True;
ELog.Trace();
'''
返回可用的串口
'''
def GetValidPort():
valid_port=[];
t_serial=None;
for i in range(0, 20):
try:
t_serial = serial.Serial();
t_serial.port = i;
t_serial.baudrate = 9600;
t_serial.timeout = 2;
t_serial.bytesize=EIGHTBITS;
t_serial.parity=PARITY_NONE;
t_serial.stopbits=STOPBITS_ONE;
t_serial.open();
except:
pass;
if t_serial.isOpen():
try:
t_serial.close();
except:
pass;
valid_port.append(i);
return valid_port;
if __name__=='__main__':
print GetValidPort();
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。