1 Star 0 Fork 0

zhangpengju / rfid_cabinet_platform

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
mqtt_method.py 2.90 KB
一键复制 编辑 原始数据 按行查看 历史
zhangpengju 提交于 2024-01-30 14:35 . fix a bug
import time
import json
import paho.mqtt.client as mqtt
from db_method import DataBaseMethod
import logging
from util import get_mac_address
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MQTTClient:
def __init__(self, broker_address, broker_port, client_name):
self.client = mqtt.Client(client_name)
self.broker_address = broker_address
self.broker_port = broker_port
# 设置连接成功和消息接收的回调函数
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
# 连接成功的回调函数
def on_connect(self, client, userdata, flags, rc):
logger.info("Connected with result code " + str(rc))
# 订阅MQTT Topic
self.client.subscribe("rfid_cabinet/cabinet_heartbeat")
self.client.subscribe("rfid_cabinet/cabinet_status")
self.client.subscribe("rfid_cabinet/esp_list")
# 接收MQTT消息的回调函数
def on_message(self, client, userdata, msg):
per_msg = json.loads(msg.payload.decode())
logger.info("MQTT接收信息:{} {}".format(msg.topic, per_msg))
if msg.topic == "rfid_cabinet/cabinet_heartbeat":
DataBaseMethod().insert_cabinet_heartbeat(per_msg)
DataBaseMethod().insert_cabinet_abnormal(per_msg)
DataBaseMethod().check_cabinet_current_status(per_msg) # 判断当前数据中心状态和设备状态是否统一
elif msg.topic == "rfid_cabinet/cabinet_status":
self.cabinet_status_task_processing(per_msg) # 状态任务处理
elif msg.topic == "rfid_cabinet/esp_list":
if per_msg.get("orderid"): # ppd 开机盘点
DataBaseMethod().insert_door_task(per_msg)
DataBaseMethod().update_door_task_inventory_data(per_msg) # esp_list 任务处理
def cabinet_status_task_processing (self, per_msg):
'''rfid_cabinet/cabinet_status 提交来数据任务处理
per_msg: {"mac": mac, "status": "盘点结束"}
'''
dm = DataBaseMethod()
dm.insert_cabinet_status(per_msg) # 插入时间状态
if per_msg["status"] == "正在盘点": # 正在盘点状态提交证明门已经关闭
dm.update_door_task_close_time(per_msg)
# 连接MQTT Broker
def connect(self):
self.client.connect(self.broker_address, self.broker_port, 60)
self.client.loop_forever()
# 发布消息到MQTT Topic
def publish_message(self, topic, message):
self.client.publish(topic, message)
def run_mqtt_client():
mqtt_client = MQTTClient(broker_address="47.96.3.242", broker_port=1883, client_name="server_client: {}".format(get_mac_address()))
mqtt_client.connect()
if __name__ == "__main__":
run_mqtt_client()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/zhangpengju/rfid_cabinet_platform.git
git@gitee.com:zhangpengju/rfid_cabinet_platform.git
zhangpengju
rfid_cabinet_platform
rfid_cabinet_platform
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891