2 Star 2 Fork 0

MAMAMA / scratchlink-server

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
scratchlink.py 7.13 KB
AI 代码解读
一键复制 编辑 原始数据 按行查看 历史
MAMAMA 提交于 2020-03-20 15:05 . 完成
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import httpserver
import os.path
import logging
import maLogger
import json
import yaml
import threading
import time
import asyncio
import base64
# Tornado requires a selector-based asyncio event loop. On Windows with
# Python 3.8+ the default loop is the proactor loop, so the selector policy is
# installed explicitly there. The policy class is only defined on Windows, so
# the lookup is guarded to keep this module importable on other platforms.
if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class ScratchBleParser(tornado.websocket.WebSocketHandler):
    """WebSocket handler that answers Scratch's BLE JSON-RPC requests.

    The handler pretends to be Scratch Link talking to a micro:bit: it
    confirms every request, periodically advertises one fake peripheral while
    scanning, and periodically pushes a fixed notification payload once
    notifications have been requested.

    Subclasses override :meth:`on_display` and :meth:`on_display_text` to react
    to the LED-matrix and text commands written by Scratch.
    """

    # First byte of a decoded ``write`` payload selects the command.
    _CMD_DISPLAY_TEXT = 0x81
    _CMD_DISPLAY_LED = 0x82
    # Seconds between two pushes of a background task.
    _PUSH_INTERVAL = 0.1

    def __init__(self, *args, **kargs):
        """Initialise loggers, connection state and the 5x5 LED matrix."""
        super().__init__(*args, **kargs)
        self._logger = maLogger.maGetLogger('scratchlink')
        self._client_logger = maLogger.maGetLogger('scratch3.0')
        self._online = False
        self.ble_scanning = False
        self.msg_cnt = 0  # id of the most recent request that carried one
        self.loop = asyncio.get_running_loop()
        # Handles of the background pushers, used to avoid starting a second
        # loop while the previous one is still running.
        self._discover_future = None
        self._notification_future = None
        self.bleMessage = {
            "jsonrpc": "2.0",
            "method": "nothing",
            "params": {
                "name": "nothing"
            }
        }
        self.led_matrix = [
            ['-', '-', '-', '-', '-'],
            ['-', '-', '-', '-', '-'],
            ['-', '-', '-', '-', '-'],
            ['-', '-', '-', '-', '-'],
            ['-', '-', '-', '-', '-'],
        ]
        self._logger.info("_________init_________")

    def on_display(self, bytelist):
        '''
        Override in a subclass. Called when Scratch updates the LED matrix.

        ``bytelist`` is the complete decoded payload: the leading 0x82
        command byte followed by one byte per LED row.

        bit0 bit1 bit2 bit3 bit4  HEX
          0    1    0    1    0   0x0A
          1    0    1    0    1   0x15
          1    0    0    0    1   0x11
          0    1    0    1    0   0x0A
          0    0    1    0    0   0x04

        Note that the leftmost LED is the lowest bit, which is the opposite
        of the usual convention.
        '''
        pass

    def on_display_text(self, txt):
        '''
        Override in a subclass. ``txt`` is the text sent by Scratch's
        "display text" block.

        Scratch waits for a while when running that block, so the response
        is not as fast as controlling the LEDs.
        '''
        pass

    def check_origin(self, origin):
        """Accept WebSocket connections from any origin.

        NOTE(review): this disables Tornado's same-origin protection; any web
        page can connect to the server.
        """
        return True

    def open(self):
        """Mark the connection as online."""
        self._logger.info('websocket established.')
        self._online = True

    def on_close(self):
        """Mark the connection as offline, which stops the background tasks."""
        self._logger.info('websocket closed.')
        self._online = False

    def on_message(self, message):
        """Dispatch one JSON-RPC request received from Scratch.

        Handles the ``discover``, ``connect``, ``read`` and ``write`` methods;
        anything else is ignored. Every failure is logged rather than raised
        so that a malformed message does not propagate out of the handler.
        """
        self._client_logger.debug(message)
        try:
            # The protocol is JSON-RPC, so parse it with the JSON parser.
            message_dict = json.loads(message)
            if not isinstance(message_dict, dict):
                self._logger.error('unexpected message: %r' % (message_dict,))
                return
            if 'id' in message_dict:
                self.msg_cnt = message_dict['id']
            method = message_dict.get('method')
            params = message_dict.get('params') or {}
            if method == 'discover':
                self.ble_scanning = True
                self.write_message(self.ble_confirm())
                self._discover_future = self._spawn(
                    self._discover_future, self.ble_discover_task)
            elif method == 'connect':
                self.ble_scanning = False
                self.write_message(self.ble_confirm())
            elif method == 'read':
                if params.get('startNotifications'):
                    self.write_message(self.ble_confirm())
                    self._notification_future = self._spawn(
                        self._notification_future, self.ble_notification_task)
            elif method == 'write':
                self._handle_write(params)
        except Exception as e:
            self._logger.error(e)

    def _spawn(self, running, task_factory):
        """Schedule ``task_factory()`` on the loop unless ``running`` is still active.

        Returns the future that tracks the active task, so repeated requests
        do not pile up several identical push loops.
        """
        if running is not None and not running.done():
            return running
        return asyncio.run_coroutine_threadsafe(task_factory(), self.loop)

    def _handle_write(self, params):
        """Decode a ``write`` payload, run its command and confirm the request."""
        rawstr = base64.b64decode(params['message'])
        # An empty payload carries no command byte; it is still confirmed.
        if rawstr:
            command = rawstr[0]
            if command == self._CMD_DISPLAY_TEXT:
                # 0x81: display text
                msg = rawstr[1:].decode('utf-8', 'replace')
                self._logger.info('[receive message]: ' + msg)
                self.on_display_text(msg)
            elif command == self._CMD_DISPLAY_LED:
                # 0x82: control the LED matrix
                self._update_led_matrix(rawstr[1:])
                self.on_display(rawstr)
        self.write_message(self.ble_confirm(result=len(rawstr)))

    def _update_led_matrix(self, row_bytes):
        """Mirror the row bytes into ``self.led_matrix`` ('*' lit, '-' dark).

        Bit ``n`` of a row byte drives column ``n``. Bytes beyond the number
        of matrix rows are ignored.
        """
        for row, value in zip(self.led_matrix, row_bytes):
            # Update the row in place so existing references stay valid.
            row[:] = ['*' if value & (1 << col) else '-'
                      for col in range(len(row))]
            self._logger.debug(row)

    def ble_discover_reply(self):
        '''
        Build the reply sent while Scratch is scanning for devices.

        Template:
        {
            "jsonrpc": "2.0",
            "method": "didDiscoverPeripheral",
            "params": {
                "name": "BBC micro:bit [potez]",
                "rssi": -35,
                "peripheralId": 212235825896020
            }
        }
        '''
        self.bleMessage['method'] = "didDiscoverPeripheral"
        self.bleMessage['params'] = {
            "name": "BBC micro:bit [potez]",
            "rssi": -35,
            "peripheralId": 212235825896020
        }
        return self.__toJson(recoard=False)

    def ble_notification_reply(self):
        '''
        Build the periodic notification message.

        ``message`` is binary data encoded as base64 (it can be inspected with
        a base64-to-hex decoder such as
        http://www.tomeko.net/online_tools/base64.php). It contains the sensor
        and button state; the meaning of the individual bytes has not been
        analysed yet.

        Template:
        {
            "jsonrpc": "2.0",
            "method": "characteristicDidChange",
            "params": {
                "serviceId": "0000f005-0000-1000-8000-00805f9b34fb",
                "characteristicId": "5261da01-fa7e-42ab-850b-7c80220097cc",
                "encoding": "base64",
                "message": "AET/9wAAAAAAAAAAAAAAAAAAAAA="
            }
        }
        '''
        self.bleMessage['method'] = "characteristicDidChange"
        self.bleMessage['params'] = {
            "serviceId": "0000f005-0000-1000-8000-00805f9b34fb",
            "characteristicId": "5261da01-fa7e-42ab-850b-7c80220097cc",
            "encoding": "base64",
            "message": "AET/9wAAAAAAAAAAAAAAAAAAAAA="
        }
        return self.__toJson(recoard=False)

    def ble_confirm(self, result=None):
        """Return the JSON-RPC response for the most recent request id."""
        msg = {"jsonrpc": "2.0", "id": self.msg_cnt, "result": result}
        return self.__toJson(msg)

    def __toJson(self, msg=None, recoard=True):
        """Serialise ``msg`` (default: ``self.bleMessage``) to a JSON string.

        When ``recoard`` is true the serialised text is written to the debug log.
        """
        if msg is None:
            msg = self.bleMessage
        json_ret = json.dumps(msg)
        if recoard:
            self._logger.debug(json_ret)
        return json_ret

    def _push(self, payload):
        """Send an unsolicited message; return False if the socket is closed."""
        try:
            self.write_message(payload)
        except tornado.websocket.WebSocketClosedError:
            return False
        return True

    async def ble_discover_task(self):
        """Simulate BLE discovery: advertise the fake device while scanning."""
        while self._online and self.ble_scanning:
            await asyncio.sleep(self._PUSH_INTERVAL)
            # The state may have changed during the sleep; re-check it so
            # nothing is written to a closed or already-connected socket.
            if not (self._online and self.ble_scanning):
                break
            if not self._push(self.ble_discover_reply()):
                break

    async def ble_notification_task(self):
        """Simulate BLE notifications while the connection is open."""
        while self._online:
            await asyncio.sleep(self._PUSH_INTERVAL)
            # The socket may have closed during the sleep.
            if not self._online:
                break
            if not self._push(self.ble_notification_reply()):
                break
class ScratchLink():
    """HTTPS/WebSocket server that exposes the BLE handler at ``/scratch/ble``."""

    def __init__(self, BleParser=ScratchBleParser, port=20110,
                 certfile="device-manager.scratch.mit.edu.pem",
                 keyfile="device-manager.scratch.mit.edu-key.pem"):
        """Create the server and run the IO loop; this call blocks.

        Args:
            BleParser: WebSocket handler class routed at ``/scratch/ble``.
            port: TCP port to listen on.
            certfile: Path of the TLS certificate (relative paths are resolved
                against the current working directory).
            keyfile: Path of the TLS private key.
        """
        scratchlink_handler = tornado.web.Application([
            (r"/scratch/ble", BleParser),
        ])
        # Configure the HTTPS certificate.
        server = httpserver.HTTPServer(scratchlink_handler, ssl_options={
            "certfile": certfile,
            "keyfile": keyfile,
        })
        server.listen(port)
        _log = maLogger.maGetLogger('tornado')
        _log.info('tornado server start.')
        # IOLoop.instance() is a deprecated alias of IOLoop.current().
        tornado.ioloop.IOLoop.current().start()
# Script entry point: start the server with the default handler and settings.
# Constructing ScratchLink runs the IO loop, so this call does not return.
if __name__ == "__main__":
    ScratchLink()
Python
1
https://gitee.com/MAMAMAisused/scratchlink-server.git
git@gitee.com:MAMAMAisused/scratchlink-server.git
MAMAMAisused
scratchlink-server
scratchlink-server
master

搜索帮助