1 Star 0 Fork 0

jwang70s / TestSerial

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
testSerial.py 12.79 KB
一键复制 编辑 原始数据 按行查看 历史
jwang70s 提交于 2019-07-25 19:52 . 说明和显示
# -*- coding: utf-8 -*-
""" testSerial.py
串口设备通讯校验与时序图绘制
-----------------------------
发送(本机)串口指令数据,接收设备(从机)回复,并比对设备回复与期待回复是否相同以验证串口通讯是否有效。
同时在控制台输出发送指令数据与期待回复数据的波型,可以用来与示波器显示的波型进行比对。
在离线状态可以仅用于显示数据波型。
Test equipment by sending/receiving data via a serial port and show timing-voltage notes.
--
Send command data, read the response from the equipment, and compare the response with the expected value to verify the serial communication.
The command/response data are shown as timing-voltage notes, which can be compared with the waveform captured by an oscilloscope on the bus.
An offline mode, which sends no real command, is also available to show the timing-voltage notes for the command only.
Written by J.wang(Wang Jian)
Source opened at : https://gitee.com/jwang70s/TestSerial with license of Apache 2.0
jwang70s@sina.com
"""
import math
import json
import serial
import re
import argparse
from schema import Schema,And,Or,SchemaError
import os
import sys
import time
# Symbolic line levels used by SerialTiming before they are rendered as notes.
HIGH = 'HIGH'
LOW = 'LOW'
# Name of the JSON configuration file read and written by this script.
CONF_FILE = 'config.json'


class SerialTiming:
    """Render serial frames as text "timing volts notes".

    Each instance fixes which line level (HIGH/LOW) stands for a logical 0,
    a logical 1, the start bit and the stop bit, depending on the bus type.
    """

    TYPELIST = ['RS485', 'RS422', 'TTL', 'RS232']
    # Class-level placeholders; __init__ overwrites them on every instance.
    zero_volt = ''
    one_volt = ''
    start_volt = ''
    stop_volt = ''

    def __init__(self, type_name='TTL'):
        """Select the level mapping for a bus type.

        |Para:
            type_name:str  bus type (case-insensitive), one of
                           'RS485', 'RS422', 'RS232', 'TTL'
        |Exception:
            ValueError  when type_name is not a supported bus type
        """
        bus = type_name.upper()
        if bus in ('RS485', 'RS422', 'TTL'):
            self.zero_volt = LOW
            self.one_volt = HIGH
            self.start_volt = LOW
            self.stop_volt = HIGH
        elif bus == 'RS232':
            # RS232 uses the inverted mapping.
            self.zero_volt = HIGH
            self.one_volt = LOW
            self.start_volt = HIGH
            self.stop_volt = LOW
        else:
            raise ValueError("Unsupported type name '{0}',the name should be in {1}".format(type_name,SerialTiming.TYPELIST))

    def timing_notes(self, datas: list, conf: dict):
        """Build the timing volts notes string for a sequence of data values.

        Every value becomes one frame: start bit, `byte_size` data bits
        (bit 0 first), optional parity bit, stop bit(s), then the separator.
        Each bit is drawn with `note_times` symbols; the stop period is drawn
        with ceil(note_times * stop_bits) symbols.

        |Para:
            datas:list(int)  values to draw, like [1,10,7]; only the low
                             `byte_size` bits of each value are drawn
            conf:dict        configuration dictionary (see conf_default)
        |Return:
            :str  notes like:
                  '__--______________--|____--__--________--|__------__________--|'
        |Exception:
            ValueError  when the configured parity is not one of N/O/E/M/S
        """
        # note parameters
        high_note = conf["high_volt_note"]
        low_note = conf["low_volt_note"]
        separator = conf["seperator_note"]
        times = conf["note_times"]
        # serial parameters
        byte_size = conf['serial']['byte_size']
        parity = conf['serial']['parity']
        stop_bits = conf['serial']['stop_bits']

        parity_kind = parity.upper()
        if parity_kind not in ('N', 'O', 'E', 'M', 'S'):
            raise ValueError("Wrong Parity:{0}".format(parity))

        # Levels are translated to symbols exactly once, frame by frame, so a
        # symbol that happens to read 'HIGH' or 'LOW' is never translated twice.
        note_of = {HIGH: high_note, LOW: low_note}
        level_of_bit = (self.zero_volt, self.one_volt)
        stop_cells = math.ceil(times * stop_bits)

        frames = []
        for data in datas:
            # Shifting and masking also gives negative ints a well-defined
            # (two's complement) bit pattern instead of leaking a '-' sign.
            bit_values = [(data >> pos) & 1 for pos in range(byte_size)]
            ones_odd = sum(bit_values) % 2  # 1 when the count of 1-bits is odd

            if parity_kind == 'O':
                bit_values.append((ones_odd + 1) % 2)  # make the total odd
            elif parity_kind == 'E':
                bit_values.append(ones_odd)            # make the total even
            elif parity_kind == 'M':
                bit_values.append(1)                   # mark: always 1
            elif parity_kind == 'S':
                bit_values.append(0)                   # space: always 0

            levels = [self.start_volt]
            levels.extend(level_of_bit[bit] for bit in bit_values)

            cells = []
            for level in levels:
                cells.extend(times * [note_of.get(level, level)])
            cells.extend(stop_cells * [note_of.get(self.stop_volt, self.stop_volt)])

            frames.append(''.join(cells) + separator)
        return ''.join(frames)
def conf_default():
    """
    Build a fresh copy of the default configuration.
    |Return:
        :dict  configuration dictionary with the 'serial' port settings and
               the display settings for the timing volts notes
    """
    # Port settings; an empty 'ser_name' means no port is configured.
    serial_defaults = {}
    serial_defaults['ser_name'] = ''
    serial_defaults['baud_rate'] = 115200
    serial_defaults['byte_size'] = 8
    serial_defaults['parity'] = 'N'
    serial_defaults['stop_bits'] = 1
    serial_defaults['read_timeOut'] = 2
    serial_defaults['rw_delay'] = 0.5

    # Key order is kept as-is because it is the order written to the file.
    defaults = {'serial': serial_defaults}
    defaults['ser_type'] = 'TTL'
    defaults['low_volt_note'] = '_'
    defaults['high_volt_note'] = '-'
    defaults['seperator_note'] = '|'
    defaults['note_times'] = 2
    return defaults
def varify_conf(conf):
    """
    Check a configuration dictionary against the expected schema.
    |Para:
        conf:dict  configuration dictionary to check
    |Return:
        :dict  the validated configuration dictionary
    |Exception:
        SchemaError  when the dictionary does not match the schema
    """
    # Rules for the nested 'serial' section.
    serial_rules = {
        'ser_name': str,
        'baud_rate': And(int, lambda v: v > 0),
        'byte_size': And(int, lambda v: v in (5, 6, 7, 8)),
        'parity': And(str, lambda v: v.upper() in ('N', 'O', 'E', 'M', 'S')),
        'stop_bits': And(Or(int, float), lambda v: v in (1, 1.5, 2)),
        'read_timeOut': And(Or(int, float), lambda v: v > 0),
        'rw_delay': And(Or(int, float), lambda v: v >= 0),
    }
    # Rules for the top level; the separator may be an empty string.
    rules = {
        'serial': serial_rules,
        'ser_type': And(str, lambda v: v.upper() in SerialTiming.TYPELIST),
        'low_volt_note': And(str, lambda v: len(v) > 0),
        'high_volt_note': And(str, lambda v: len(v) > 0),
        'seperator_note': str,
        'note_times': And(int, lambda v: v > 0),
    }
    validator = Schema(rules)
    return validator.validate(conf)
def save_conf(conf):
    """Save the configuration dict to the JSON file named by CONF_FILE.

    |Para:
        conf:dict  configuration dictionary to store
    |Exception:
        TypeError/ValueError  when conf cannot be serialised to JSON
        OSError               when the file cannot be written
    """
    # Serialise before opening the file: opening in 'w' mode truncates it, so
    # a serialisation error must not be able to wipe an existing config.
    payload = json.dumps(conf, indent=4)
    with open(CONF_FILE, 'wt', encoding='utf-8') as f:
        f.write(payload)
def load_conf():
    """Load the configuration dict from the JSON file named by CONF_FILE.

    |Return:
        :dict  the parsed (not yet validated) configuration
    |Exception:
        OSError     when the file cannot be read
        ValueError  when the file is not valid JSON (json.JSONDecodeError)
    """
    # Decode explicitly instead of relying on the locale default; 'utf-8-sig'
    # also accepts a file saved with a UTF-8 byte order mark.
    with open(CONF_FILE, 'rt', encoding='utf-8-sig') as f:
        return json.load(f)
def input2intlist(input_str):
    """Transform an input string into a list of byte values.

    Tokens are separated by whitespace and/or ','. If the first token starts
    with '0x'/'0X' the whole input is read as hexadecimal, otherwise as
    decimal integers.

    |Para:
        input_str:str  The input string can be like:
            hex string: '0x680x740x68' or '0x687468' or '0x68 74 6874' or '0x 68 74 68'
            int string: '104,116 104' or '104'
    |Return:
        :list(int)  byte values (0..255) of the command/respond
    |Exception:
        ValueError  for empty input, malformed numbers, an odd number of hex
                    digits in one segment, or a value outside 0..255
    """
    tokens = [tok for tok in re.split(r'[\s,]+', input_str) if tok]
    if not tokens:
        raise ValueError("Empty input")

    if tokens[0][:2].upper() == '0X':
        values = []
        for tok in tokens:
            # A token may carry several prefixed segments ('0x680x74'); each
            # segment is decoded on its own so none is silently dropped.
            for segment in re.split(r'0[xX]', tok):
                if segment:
                    values.extend(bytearray.fromhex(segment))
    else:
        values = [int(tok) for tok in tokens]
        for value in values:
            if not 0 <= value <= 255:
                raise ValueError("Value out of byte range 0..255: {0}".format(value))

    if not values:
        # e.g. a bare '0x' without any digits
        raise ValueError("No data in input")
    return values
if __name__ == "__main__":
    # Interactive console tool: read commands, draw their timing volts notes
    # and, when a serial port is open, send them and check the reply.
    print("\nTest equipment by send/respond data via serial port and show timing volts notes")
    print("-----------------------------------------------------------------------------------")
    print("Command line: input command to be send vai serial port.")
    print("\tOnly show timing vlolts notes without real send in Offline model.")
    print("\tPress Enter (without input) will use the last input value of BOTH Command and Respond.")
    print("Respond line: input expected respond data (including endings)")
    print("\tNot available in Offline model.")
    print("\nType 'quit' in command line for quit.")
    print("-----------------------------------------\n")

    # Configuration: validated file content, else the built-in defaults.
    online = False
    if os.path.exists(CONF_FILE):
        try:
            conf = varify_conf(load_conf())
        except Exception as e:
            print("Invalide configration file:",repr(e),"\ndefault configration is used:")
            conf = conf_default()
    else:
        conf = conf_default()

    # Command-line options override the configuration.
    parser = argparse.ArgumentParser()
    parser.description='Test equipment by send/respond data via serial port and show timing volts notes'
    parser.add_argument("-p","--portname",help="serial port name for connection,none for offline",type=str)
    parser.add_argument("-s","--serialtype",help="type name from 'RS485','RS422','RS232','TTL'(default)",type=str)
    args = parser.parse_args()
    if args.portname:
        conf['serial']['ser_name'] = args.portname
    if args.serialtype:
        conf['ser_type'] = args.serialtype

    try:
        tm = SerialTiming(type_name=conf['ser_type'])
    except ValueError as e:
        # An unsupported --serialtype ends with a usage error, not a traceback.
        parser.error(str(e))

    # Open the serial port; an empty name means offline use.
    ser = None
    ser_conf = conf['serial']
    if ser_conf['ser_name'] != '':
        try:
            # All settings go to the constructor so that an invalid setting is
            # rejected before the port is opened and no open port is leaked.
            ser = serial.Serial(
                port=ser_conf['ser_name'],
                baudrate=ser_conf['baud_rate'],
                bytesize=ser_conf['byte_size'],
                parity=ser_conf['parity'].upper(),  # config allows lower case
                stopbits=ser_conf['stop_bits'],
                timeout=ser_conf['read_timeOut'],
            )
            online = True
        except Exception as e:
            # Best effort: any failure to open falls back to offline use.
            print("Serial port '{0}' opend Fail!".format(ser_conf['ser_name']),repr(e))
            ser = None
            online = False
    if online:
        print("OnLine model: Serial '{0}' is opened at port:'{1}'".format(conf['ser_type'],ser.port))
    else:
        print("OffLine model:{0}".format(conf['ser_type']))

    # Accept input and show the timing volts notes.
    cmd_last = ''  # last command as list(int); '' until a command was given
    res_last = ''  # last expected respond as list(int)
    try:
        while True:
            prompt = "Command(Enter for last):"
            cmd_input = input(prompt)
            if cmd_input.upper() == "QUIT":
                yorn = input("Save the configration to file (yes/no(Enter))?:")
                if yorn.upper() == 'YES':
                    save_conf(conf)
                    print("Configration is saved to file: {0}".format(CONF_FILE))
                sys.exit()
            elif cmd_input == '':
                # Enter repeats the last command (and the last expected respond).
                if cmd_last != '':
                    cmd_int = cmd_last
                else:
                    continue
            else:
                try:
                    cmd_int = input2intlist(cmd_input)
                except Exception as e:
                    print("Invalid input",repr(e))
                    continue
            cmd_last = cmd_int

            if online:
                if cmd_input == '':
                    res_int = res_last
                else:
                    while True:
                        res_input = input("Expected Respond:")
                        if res_input == '':
                            continue
                        try:
                            res_int = input2intlist(res_input)
                            break
                        except Exception as e:
                            # Say why the prompt is repeated.
                            print("Invalid input",repr(e))
                            continue
                res_last = res_int

            notes = tm.timing_notes(cmd_int,conf)
            if not online:
                notes += '\n'
            print("Will be send:",*cmd_int,"\nTiming Volts:",notes)

            if online:
                notes = tm.timing_notes(res_int,conf)
                print("Expected Respond:",*res_int,"\nTiming Volts:",notes)
                try:
                    ser.write(bytes(cmd_int))
                    time.sleep(conf['serial']['rw_delay'])
                    ser_res = ser.read(len(res_int))
                    ser.reset_input_buffer()  # make it clear for next receive
                except (ValueError, serial.SerialException) as e:
                    print("Serial communication failed:",repr(e))
                    continue
                if list(ser_res) == res_int:
                    result_str = 'Meet the expected.\n'
                else:
                    result_str = 'NOT MATCH the expected.\n'
                    if len(ser_res) == 0:
                        result_str = result_str[:-1] +'(TIME OUT)\n'
                print("Serial Recieve:",*list(ser_res),",",result_str)
    finally:
        # Runs on 'quit' (SystemExit), Ctrl-C and errors: release the port.
        if ser is not None:
            ser.close()
Python
1
https://gitee.com/jwang70s/TestSerial.git
git@gitee.com:jwang70s/TestSerial.git
jwang70s
TestSerial
TestSerial
master

搜索帮助