1 Star 0 Fork 97

Ayrz / pyfree-IotEdge

forked from py-zxj-free / pyfree-IotEdge 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 18.74 KB
一键复制 编辑 原始数据 按行查看 历史

pyfree-gather-master

介绍

用于物联网数据采集的轻量级软件,通过xml配置采集信道,通过lua脚本进行信息报文数据解码或编码。考虑到目前主要部署在工控机上,暂时支持串口、网口(WIFI)的数据采集,后期会陆续扩展USB、蓝牙、4G等模块。涉及数传、lora、ZigBee通信的设备暂时可以通过转换设备转换成串口、网口等接入。

软件说明

架构概述:
  • 1)结构说明 采集软件具备规约转换能力,将采集数据汇聚并以通用标准通讯规约转发及受控,并转换采集和转发角色则可实现级联采控。
    数据聚合

  • 2)数据流:
    各种采集信道(网口、串口等)<->(报文编解码,lua脚本)<->采集中台(数据清洗、转换、映射)<->(报文编解码,lua脚本)<->各种转发信道(网口、串口等)
    数据传输

  • 3)类图:
    采集软件主要类

功能概述:
  • 1)数据采集:
    设置轮询间隔,进行总召唤,已从采集端获取设备态势;
    接收上层应用的控制信息,构建下控指令写入采集信道,并从采集信道等待设备端的响应指令,由于总召存在,有时不一定需要等待响应指令。 通过计算公式实现计算点信息采集,生成第一批孪生数据。
    当前版本支持通过通讯接口(如RS232、RS485、RJ45等)采集终端设备信息。

  • 2)报文解析:报文解析主要实现3点功能:

    • 其一,将一段字符串报文(ascii hex等格式)解析为点-值这样的数据对,当然这个点-值对是相对于一个采集(转发)信道而言,在某些情况可能会解析设备地址、点地址、值这样的数据对。在lua脚本的getReciveIDVAL函数中将根据下行指令和上行指令一并生成点-值的数据对集合
    • 其二,将点-值这样的数据对或数据对集转换成设备或采集识别的报文指令。在lua脚本的函数getDownControl中,将根据执行类型、点类型、点地址、数值等生成下行指令。由于考虑到业务逻辑的需要,暂时不支持将多个点-值数据对转换成下行指令,后续会考虑加入逻辑信息实现多点下控。
    • 其三,设备端一般作为从站端,大多采用应答式响应,为了实时获取设备态势,大多数情况下,需要主站端,即我们的采集软件不断下发总召指令已获取设备态势,通常总召指令一般就是查询指令,可以单点或多点查询,当然也需要配置指令的编码、格式等。具体的召唤指令在脚本(lua)的getTotalCallCMD函数进行配置。
    • 更具体的脚本信息可以参看demo-project/gather/modbus1.lua和结合源码理解。
    • 当前脚本库存储在本地,后期会架设云端脚本库服务,根据设备类型、厂家编号等获取响应脚本,并加载在本地,或手动配置指定脚本连接地址来获取
  • 3)实时数据:
    直接从采集信道获取的报文数据为原生数据,而报文解析得到的点-值数据对,以及根据这些数据对集加权计算得到的点-值数据对等都属于孪生数据。
    在采集软件中,设备态势的实时数据是以信道-点-值记录,由各自采集线程维护,实时数据会在数值变化或定期向转发通道推送。

  • 4)数据转发:
    转发信道可以有多个或不同类型,但目前是所有采集信道的实时数据(包括虚拟点、计算点)统一推送到转发信道,因此在采集信息点与转发信息点的映射中,并没有指定转发信道的编号
    当前版本支持通过网络接口(RJ45)与第三方平台进行监控数据交互通信。

  • 5)级联:
    某个采集软件的转发信道可以作为另一个采集软件的采集信道,从而形成级联,用于多级采集和数据汇总
    下级采集软件<->上级采集软件<->调度软件
    当前版本支持通过通讯接口(RJ45)进行自行级联通信。
    在下级采集软件的gather.xml配置文件设置转发端口到上级采集模块的采集接口上,例如:

    <!--下级采集软件的配置-->
    <nettype id="2" type="1" ptype="2" name="转发上级采集软件" ip="127.0.0.1" port="60100" />

    在上级采集软件的gather.xml配置文件设置采集信息,例如:

    <!--上级级采集软件的配置-->
    <!--采集端口与下级采集的转发端口匹配-->
    <nettype id="3" type="1" ptype="2" name="下级采集软件" ip="127.0.0.1" port="60100" />
    <!--设置来自下级采集的信息点映射-->
    <gather id="3" channeltype="3" channelid="3" name="NET" protocolid="4">
        <P id="1" type="1" addr="1" ip="127.0.0.1" />
        <P id="2" type="1" addr="2" ip="127.0.0.1" />
        <P id="4" type="2" addr="4" ip="127.0.0.1" />
        <P id="5" type="2" addr="5" ip="127.0.0.1" />
    </gather>
    <!--其中addr就是下级采集软件gmap.xml配置的转发信息点编号。-->
  • 6)运维配置:
    (1)在gather.xml配置采集端口参数、指令、采集点;
    (2)在gather.xml配置转发端口参数,在gmap.xml配置点集映射关系;
    (3)配置规约编码和解码脚本(lua),在gather.xml指定采集端口或转发端口调用;
    (4)在appconf.xml配置软件运行、日志记录等参数;

编译

一、依赖:
  1. acl_master acl_master是一个跨平台c/c++库,提供了网络通信库及服务器编程框架,同时提供更多的实用功能库及示例,具体编译与实现请参考其说明文档。
    项目地址: https://gitee.com/acl-dev/aclhttps://github.com/acl-dev/acl
    技术博客:https://www.iteye.com/blog/user/zsxxsz

做了两处源码调整:
由于acl提供的日志记录的时间只到秒级别,本项目需要毫秒的级别,因此修改了一下其源码,
在lib_acl/src/stdlib/acl_mylog.c文件中,将acl_logtime_fmt函数实现调整为:

void acl_logtime_fmt(char *buf, size_t size)  
{  
    //time_t	now;  
	struct timeval tm0;  
	gettimeofday(&tm0, NULL);  
	time_t	now = tm0.tv_sec;  
#ifdef	ACL_UNIX  
	struct tm local_time;  

	//(void) time (&now);  
	(void) localtime_r(&now, &local_time);  
	strftime(buf, size, "%Y/%m/%d %H:%M:%S", &local_time);  
	sprintf(buf, "%s.%03d ", buf,(int)(tm0.tv_usec/1000));  
#elif	defined(ACL_WINDOWS)  
	struct tm *local_time;  

	//(void) time (&now);  
	local_time = localtime(&now);  
	strftime(buf, size, "%Y/%m/%d %H:%M:%S", local_time);  
	sprintf(buf, "%s.%03d ",buf, (int)(tm0.tv_usec/1000));  
#else  
# error "unknown OS type"  
#endif  
}   

如果不需要毫秒级的时间格式,就不必修改。
其二:
需要修改acl库的源码,在lib_acl/src/stdlib/iostuff/acl_readable.c的int acl_readable(ACL_SOCKET fd)函数中

fds.events = POLLIN | POLLPRI;

由于win中调用了WSAPoll函数,而Winsock provider不支持POLLPRI与POLLWRBAND,调整为:

fds.events = POLLIN | POLLPRI;
#ifdef ACL_WINDOWS
fds.events &=~(POLLPRI|POLLWRBAND);
#endif
  1. lua_c++
    lua_c++是一个方便c++调用lua脚本的程序库,其下载地址:http://www.lua.org/ftp/ 也可以直接采用本项目编译好的库(Lua 5.3.0)。

  2. muparser
    muparser是一个计算公式库,快速和简单地解析数学表达式及进行计算,本项目主要用来处理计算点数值计算, 其源码本项目经过整理为cmake工程,直接进入到muparser目录按cmake编译即可

  3. libctb
    libctb是一个c++实现串口通信的程序库,具体编译请看起文档说明。
    在实践中发现win串口数据读取时部分数被截断的问题,因此对
    libctb\src\win32\serport.cpp的Read函数做了调整,具体如下,

int SerialPort::Read(char* buf,size_t len)  
{  
    DWORD read;  
    int m = m_fifo->items();  
    while(len) {  
        if(m_fifo->get(buf) == 1) {  
            len--;  
            buf++;  
        }  
        else {  
            break;  
        }  
    }  
    //printf("SerialPort::Read(%d,%d)..1..\n", m, len);  
    //byte _buf[64] = { 0 };  
    if(!ReadFile(fd,(LPVOID)buf,len,&read,&m_ov)) {  
        // if we use a asynchrone reading, ReadFile gives always  
        // FALSE  
        // ERROR_IO_PENDING means ok, other values show an error  
        if(GetLastError() != ERROR_IO_PENDING) {  
            // oops..., error in communication  
            //printf("SerialPort::Read error(%d)..2..\n", (int)GetLastError());  
            return -1;  
        }  
    }  
    else {  
        //memcpy(buf, _buf, (int)read + m);  
        //printf("SerialPort::Read(%d,%d,%d,%d)..3..\n", m, len,(int)read,strlen(buf));  
        // ok, we have read all wanted bytes  
        return (int)read + m;  
    }  
    //调整,防止NULL字符被丢弃  
    int size = strlen(buf);  
    for (int i = size; i < len; i++)  
    {  
        if (0X00 != buf[i])  
            size = i+1;  
    }  

    //memcpy(buf, _buf, (int)read + m);  
    //printf("SerialPort::Read(%d,%d,%d,%d)..4..\n", m, len, (int)read, strlen(buf));  
    return size;  
};   

另外由于指定更严格的编译要求,其libctb\include\ctb-0.16\iobase.h和libctb\src\iobase.cpp的ReadUntilEOS函数做了调整 调整为一下,即为其字符指针添加const修饰符:

virtual int ReadUntilEOS(char*& readbuf  
    ,size_t* readedBytes  
    ,const char* eosString = "\n"  
    ,long timeout_in_ms = 1000L  
    ,char quota = 0);   
  
int IOBase::ReadUntilEOS(char*& readbuf,  
                        size_t* readedBytes,  
                        const char* eosString, //调整  
                        long timeout_in_ms,  
                        char quota)  
{  
    int n = 0;  
    int timeout = 0;  
    int bufsize = DELTA_BUFSIZE;  
    int result = 0;  
    int quoted = 0;  
    char* buf = new char[bufsize];  
    char* des = buf;  
    char* eos = (char*)eosString; //调整  
    char ch;  
  
    Timer t(timeout_in_ms,&timeout,NULL);  
    t.start();  
  
    while(!timeout) {  
        if(des >= &buf[bufsize]) {  
            // buffer full, realloc more memory  
            char* tmp = new char[bufsize + DELTA_BUFSIZE + 1];  
            memcpy(tmp,buf,bufsize);  
            delete[] buf;  
            buf = tmp;  
            des = &buf[bufsize];  
            bufsize += DELTA_BUFSIZE;  
        }  
        // read next byte  
        n = Read(&ch,1);  
        if(n < 0) {  
            // an error occured  
            result = -1;  
            break;  
        }  
        else if(n == 0) {  
            // no data available, give up the processor for some time  
            // to reduce the cpu last  
            sleepms(10);  
            continue;  
        }  
        // if eos is composed of more than one char, and the current  
        // byte doesn't match the next eos character, we handle the   
        // readed byte as a normal char (and not an eos)  
        if((eos != (char*)eosString) && (ch != *eos)) { //调整  
            // FIXME!  
            // write all characters, which was matched the eos string  
            // until now (with the first wrong character all received  
            // eos characters are invalid and must handled as normal  
            // characters).  

            // This doesn't work right and is only a little workaround  
            // because the received eos chars are lost  
            PutBack(ch);  
            // because we doesn't match the eos string, we must 'reset'   
            // the eos match  
            eos = (char*)eosString; //调整  
            continue;  
        }  
        else {  
            if((ch == *eos) && !quoted) {  
            if(*++eos == 0) {  
                // the eos string is complete  
                result = 1;  
                break;  
            }  
            continue;  
            }  
        }  
        if(ch == quota) {  
            quoted ^= 1;  
        }  
        *des++ = ch;  
    }  
    *des = 0;  
    readbuf = buf;  
    *readedBytes = des - buf;  
    return result;  
};   
二、项目编译:

Linux编译 项目编译需要cmake+gcc支持,目前本项目的编译环境是centos7.3,由于需要将git相关信息写入程序版本描述,需要安装git
或在CMakeLists.txt中注销:"execute_process(COMMAND /bin/bash ../build.sh)"
编译命令类似如下:

cd gather-master  
mkdir build-linux  
cd build-linux  
cmake ..  
make  

win编译 当前win编译测试采用vs2015+cmake编译,例子如下
备注:若需要32为支持请去掉Win64指定,msbuild为vs的命令编译工具,可以直接vs打开工程文件编译

cd gather-master  
mkdir build-win  
cd build-win  

cmake -G "Visual Studio 14 2015 Win64" -DCMAKE_BUILD_TYPE=Release ..  
msbuild pyfree-gather.sln /p:Configuration="Release" /p:Platform="x64"

cmake -G "Visual Studio 14 2015 Win64" -DCMAKE_BUILD_TYPE=Debug ..  
msbuild pyfree-gather.sln /p:Configuration="Debug" /p:Platform="x64"
又或
cmake -G"NMake Makefiles" -DCMAKE_BUILD_TYPE=Release ..   
#or 
cmake -G"NMake Makefiles" -DCMAKE_BUILD_TYPE=Debug ..   
#then
nmake

由于acl第三方库win静态库有点大,本项目debug版本库不上传编译好的库,请自行编译,需要做一处调整 需要修改acl库的源码,在lib_acl/src/stdlib/iostuff/acl_readable.c的int acl_readable(ACL_SOCKET fd)函数中

fds.events = POLLIN | POLLPRI;

由于win中调用了WSAPoll函数,而Winsock provider不支持POLLPRI与POLLWRBAND,调整为:

fds.events = POLLIN | POLLPRI;
#ifdef ACL_WINDOWS
fds.events &=~(POLLPRI|POLLWRBAND);
#endif

demo示例

Linux

  1. 环境搭建
    主机win64系统
    采用"tool/VSPD虚拟串口.zip"串口工具安装并创建一个串口对COM4<->COM5
    采用"tool/mbslave.exe"工具模拟设备端,打开并设置其端口选择COM5,19200 8 N 1
    采用demo-project\player_test\player.exe模拟网络采集,需要安装其目录下的LAVFilters-0.65.exe媒体库支持
    虚拟机
    采用VMware工具创建虚拟机,centos7.3系统,本样例的网络地址设置192.168.174.130,
    开启虚拟机的共享文件夹功能,将项目挂载到虚拟机上
    在虚拟机关闭时,进入虚拟机设置页面,添加串行端口,
    指定使用物理串行端口,选择COM4(前提需要创建虚拟串口对)

  2. 程序配置
    编译的采集软件输出在demo-project/gather,并会拷贝到bgahter目录下
    bgahter为下位采集,作为gather一个采集端

程序启动有根据磁盘或网卡校验的License约束
因此需要向生成License的sn.txt输出文件,启动虚拟机,进入其目录执行指令构建:
./SWL 0 22 或 ./SWL 1 22
其会生成sn.txt文件,采集软件在启动时会读取该文件进行License验证
如果没用SWL程序,去顶级目录下的swLicense项目编译生成,支持cmake编译

  1. 项目配置
    appconf.xml:主要配置磁盘空间、日志路径以及采集配置文件和数据转发映射配置文件
    gather.xml:采集配置文件,其中配置了采集信道、转发信道以及信道涉及的端口信息、协议信息,具体看配置文件说明
    gmap.xml:采集信道数据到转发信道数据的映射关系配置,具体看配置说明

本demo中,bgahter作为一个下级采集数据,配置了一个串口采集端口(/dev/ttyS0),即是虚拟机配置的COM4串口
协议为modbus,将与mbslave.exe工具模拟设备端进行串口通信,modbus01.lua为其报文解析脚本
bgahter通过tcp/ip与上级采集通信,tcpPCS.lua为其解析脚本

gahter作为上级采集软件,有两个采集信道,其一,将通过tcp/ip网络通信获取bgahter采集的采集数据,其解析脚本tcpPCS.lua
其二,通过tcp/ip网络通信获取一个播放工具的采集数据,其解析脚本为tcpPlay.lua。
这两个采集信道的数据汇聚转发信道与上级应用调度服务通信
仅仅测试不需要安装部署gather服务,直接命令行进入下(上)级采集软件目录直接启动pyfree-gather即可。
需要安装为linux服务,请参考demo-project/gather/install_linux.sh文件。

win64

  1. 环境搭建
    采用"tool/VSPD虚拟串口.zip"串口工具安装并创建一个串口对COM4<->COM5
    采用"tool/mbslave.exe"工具模拟设备端,打开并设置其端口选择COM5,19200 8 N 1
    采用demo-project\player_test\player.exe模拟网络采集,需要安装其目录下的LAVFilters-0.65.exe媒体库支持

服务安装,管理员启动cmd,进入demo-project/gather目录:
安装:pyfree-gather.exe install
卸载:pyfree-gather.exe uninstall
在任务管理器或服务管理页面可以启停服务或配置其服务相关信息
更新:在任务管理器停止服务,拷贝pyfree-gather.exe覆盖完成更新

  1. 程序配置
    编译的采集软件输出在demo-project/gather,并拷贝到bgahter目录下
    bgahter为下位采集,作为gather一个采集端

程序启动有根据磁盘或网卡校验的License约束
因此需要向生成License的sn.txt输出文件,进入其目录执行指令构建:
SWL.exe 0 22 或 SWL.exe 1 22
其会生成sn.txt文件,采集软件在启动时会读取该文件进行License验证
如果没用SWL.exe程序,去顶级目录下的swLicense项目编译生成,支持cmake编译

  1. 项目配置
    appconf.xml:主要配置磁盘空间、日志路径以及采集配置文件和数据转发映射配置文件
    gather.xml:采集配置文件,其中配置了采集信道、转发信道以及信道涉及的端口信息、协议信息,具体看配置文件说明
    gmap.xml:采集信道数据到转发信道数据的映射关系配置,具体看配置说明

本demo中,bgahter作为一个下级采集数据,配置了一个串口采集端口(COM4,19200 8 N 1)
协议为modbus,将与mbslave.exe工具模拟设备端进行串口通信,modbus01.lua为其报文解析脚本
bgahter通过tcp/ip与上级采集通信,tcpPCS.lua为其解析脚本

gahter作为上级采集软件,有两个采集信道,其一,将通过tcp/ip网络通信获取bgahter采集的采集数据,其解析脚本tcpPCS.lua
其二,通过tcp/ip网络通信获取一个播放工具的采集数据,其解析脚本为tcpPlay.lua。
这两个采集信道的数据汇聚转发信道与上级应用调度服务通信
如仅仅用于测试可以直接命令行进入下(上)级采集软件目录运行pyfree-gather.exe即可。

C++
1
https://gitee.com/ayrz2010/pyfree-IotEdge.git
git@gitee.com:ayrz2010/pyfree-IotEdge.git
ayrz2010
pyfree-IotEdge
pyfree-IotEdge
master

搜索帮助