代码拉取完成,页面将自动刷新
import time
import json
import paho.mqtt.client as mqtt
from db_method import DataBaseMethod
import logging
from util import get_mac_address
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MQTTClient:
def __init__(self, broker_address, broker_port, client_name):
self.client = mqtt.Client(client_name)
self.broker_address = broker_address
self.broker_port = broker_port
# 设置连接成功和消息接收的回调函数
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
# 连接成功的回调函数
def on_connect(self, client, userdata, flags, rc):
logger.info("Connected with result code " + str(rc))
# 订阅MQTT Topic
self.client.subscribe("rfid_cabinet/cabinet_heartbeat")
self.client.subscribe("rfid_cabinet/cabinet_status")
self.client.subscribe("rfid_cabinet/esp_list")
# 接收MQTT消息的回调函数
def on_message(self, client, userdata, msg):
per_msg = json.loads(msg.payload.decode())
logger.info("MQTT接收信息:{} {}".format(msg.topic, per_msg))
if msg.topic == "rfid_cabinet/cabinet_heartbeat":
DataBaseMethod().insert_cabinet_heartbeat(per_msg)
DataBaseMethod().insert_cabinet_abnormal(per_msg)
DataBaseMethod().check_cabinet_current_status(per_msg) # 判断当前数据中心状态和设备状态是否统一
elif msg.topic == "rfid_cabinet/cabinet_status":
self.cabinet_status_task_processing(per_msg) # 状态任务处理
elif msg.topic == "rfid_cabinet/esp_list":
if per_msg.get("orderid"): # ppd 开机盘点
DataBaseMethod().insert_door_task(per_msg)
DataBaseMethod().update_door_task_inventory_data(per_msg) # esp_list 任务处理
def cabinet_status_task_processing (self, per_msg):
'''rfid_cabinet/cabinet_status 提交来数据任务处理
per_msg: {"mac": mac, "status": "盘点结束"}
'''
dm = DataBaseMethod()
dm.insert_cabinet_status(per_msg) # 插入时间状态
if per_msg["status"] == "正在盘点": # 正在盘点状态提交证明门已经关闭
dm.update_door_task_close_time(per_msg)
# 连接MQTT Broker
def connect(self):
self.client.connect(self.broker_address, self.broker_port, 60)
self.client.loop_forever()
# 发布消息到MQTT Topic
def publish_message(self, topic, message):
self.client.publish(topic, message)
def run_mqtt_client():
mqtt_client = MQTTClient(broker_address="47.96.3.242", broker_port=1883, client_name="server_client: {}".format(get_mac_address()))
mqtt_client.connect()
if __name__ == "__main__":
run_mqtt_client()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。