10 Star 43 Fork 16

lantsang / smart-dtu

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
bluestone_mqtt.py 8.57 KB
一键复制 编辑 原始数据 按行查看 历史
lantsang 提交于 2021-04-23 11:04 . #DTU3878 删除TODO注释
'''
File: bluestone_mqtt.py
Project: bluestone
Author: daniel dong
Email: dongzhenguo@lantsang.cn
Copyright 2021 - 2021 bluestone tech
'''
import net
import utime
import ujson
import log
import _thread
from misc import Power
from umqtt import MQTTClient
from usr import bluestone_config
from usr import bluestone_common
from usr import bluestone_gpio
from usr import bluestone_pwm
from usr import bluestone_fota
from usr import bluestone_uart
log.basicConfig(level = log.INFO)
_mqtt_log = log.getLogger("MQTT")
class BluestoneMqtt(object):
inst = None
def __init__(self, client_id, server, port, user, password, sub_topic, pub_topic):
BluestoneMqtt.inst = self
self.bs_config = None
self.bs_gpio = None
self.bs_pwm = None
self.bs_fota = None
self.bs_uart = None
self.sn = bluestone_common.BluestoneCommon.get_sn()
self.client_id = client_id
self.server = server
self.port = port
self.user = user
self.password = password
self.subscribe_topic = sub_topic
self.publish_topic = pub_topic
self.client = None
self._is_sub_callback_running = False
self._is_message_published = False
def _init_mqtt(self):
self.bs_config = bluestone_config.BluestoneConfig('bluestone_config.json')
self.bs_data_config = bluestone_config.BluestoneConfig('bluestone_data.json')
self.bs_gpio = bluestone_gpio.BluestoneGPIO()
self.bs_pwm = bluestone_pwm.BluestonePWM()
self.bs_fota = bluestone_fota.BluestoneFOTA()
self.bs_uart = bluestone_uart.BlueStoneUart(None)
# 创建一个MQTT实例
self.client = MQTTClient(
client_id = self.client_id,
server = self.server,
port = self.port,
user = self.user,
password = self.password,
keepalive = 30)
_mqtt_log.info("Start a new mqtt client, id:{}, server:{}, port:{}".format(self.client_id, self.server, self.port))
self.client.set_callback(self._sub_callback) # 设置消息回调
self.client.connect() # 建立连接
#sub_topic = self.subscribe_topic.format(self.sn)
_mqtt_log.info("Subscribe topic is {}".format(self.subscribe_topic))
self.client.subscribe(self.subscribe_topic) # 订阅主题
def _update_gpio_status(self, io_level_list):
try:
self.bs_data_config.update_config('gpio', io_level_list)
message = {}
message['gpio'] = io_level_list
_mqtt_log.info("Data configuration is {}".format(message))
self.publish(message)
except Exception as err:
_mqtt_log.error("Cannot update gpio level list, the error is {}".format(err))
def _handle_callback(self, key, config):
result = False
try:
if key.startswith('uart'):
# first payload then config
payload = self.bs_config.get_value(config, "payload")
if payload:
self.bs_uart.uart_write(key, ujson.dumps(payload))
uart_config = self.bs_config.get_value(config, "config")
if uart_config:
self.bs_config.update_config(key, uart_config)
self.bs_data_config.update_config(key, uart_config)
result = True
elif key.startswith('pwm'):
id = self.bs_pwm.get_id_by_name(key)
is_breathe = self.bs_config.get_int_value(config, "breathe")
frequency = self.bs_config.get_int_value(config, "frequency")
duty = self.bs_config.get_float_value(config, "duty")
if is_breathe:
self.bs_pwm.start_breathe(id, frequency)
else:
self.bs_pwm.start_once(id, frequency, duty)
elif key.startswith('timer'):
self.bs_config.update_config(key, config)
self.bs_data_config.update_config(key, config)
result = True
elif key == 'gpio':
io_level_list = self.bs_gpio.read_all()
io_name_list = self.bs_gpio.get_io_name_list()
for gpio_key in config.keys():
if gpio_key not in io_name_list:
continue;
level = self.bs_config.get_int_value(config, gpio_key)
if level is not None:
id = self.bs_gpio.get_id_by_name(gpio_key)
self.bs_gpio.write(id, level)
io_level_list[gpio_key] = level
self._update_gpio_status(io_level_list)
elif key == 'fota':
mode = self.bs_config.get_int_value(config, "mode")
if mode == 0:
url_list = self.bs_config.get_value(config, "url")
self.bs_fota.start_fota_app(url_list)
elif mode == 1:
url = self.bs_config.get_value(config, "url")
self.bs_fota.start_fota_firmware(url)
result = True
except Exception as err:
_mqtt_log.error("Cannot handle callback for mqtt, the error is {}".format(err))
return result
def _sub_callback_internal(self, topic, msg):
try:
message = msg.decode()
_mqtt_log.info("Subscribe received, topic={}, message={}".format(topic.decode(), message))
restart = False
config_setting = ujson.loads(message)
config_keys = config_setting.keys()
for key in config_setting:
config = config_setting[key]
key_exist = self.bs_config.check_key_exist(key)
if key_exist:
if not restart:
restart = self.bs_config.mqtt_check_key_restart(key)
result = self._handle_callback(key, config)
if result:
restart = True
if restart:
restart = False
_mqtt_log.info("New configuration was received from mqtt, restarting system to take effect")
Power.powerRestart()
except Exception as err:
_mqtt_log.error("Cannot handle subscribe callback for mqtt, the error is {}".format(err))
finally:
self._is_sub_callback_running = False
# 云端消息响应回调函数
def _sub_callback(self, topic, msg):
if self._is_sub_callback_running:
_mqtt_log.error("Subscribe callback function is running, skipping the new request")
return
self._is_sub_callback_running = True
_thread.start_new_thread(self._sub_callback_internal, (topic, msg))
def _mqtt_publish(self, message):
#pub_topic = self.publish_topic.format(self.sn)
#message = {"Config":{},"message":"MQTT hello from Bluestone"}
if self.client is not None:
self.client.publish(self.publish_topic, message)
self._is_message_published = True;
_mqtt_log.info("Publish topic is {}, message is {}".format(self.publish_topic, message))
def _wait_msg(self):
while True:
if self.client is not None:
self.client.wait_msg()
utime.sleep_ms(300)
def is_message_published(self):
return self._is_message_published
def start(self):
self._init_mqtt()
_thread.start_new_thread(self._wait_msg, ())
def publish(self, message):
network_state = bluestone_common.BluestoneCommon.get_network_state()
if network_state != 1:
_mqtt_log.error("Cannot publish mqtt message, the network state is {}".format(network_state))
return
#_mqtt_log.info("Publish message is {}".format(message))
#self._mqtt_publish(ujson.dumps(message))
self._is_message_published = False
_thread.start_new_thread(self._mqtt_publish, ([ujson.dumps(message)]))
def connect(self):
if self.client is not None:
self.client.connect()
_mqtt_log.info("MQTT connected")
def disconnect(self):
if self.client is not None:
self.client.disconnect()
_mqtt_log.info("MQTT disconnected")
def close(self):
self.disconnect()
self.client = None
_mqtt_log.info("MQTT closed")
Python
1
https://gitee.com/lantsang/smart-dtu.git
git@gitee.com:lantsang/smart-dtu.git
lantsang
smart-dtu
smart-dtu
master

搜索帮助