| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- from models import *
- import requests
- import json
- from logger import logger
- from serial.tools import list_ports
- from utils.hlm_http_request import forward_request
- from sockets.socket_client import socket_manager
- from mcu.DeviceControl import DeviceControl
- import time
- @app.get("/")
- async def index():
- # await socket_manager.send_message(msg="测试")
- return {"message": "Hello World"}
- @app.get("/send_test")
- async def index():
- data = {"data1":1,"data2":2,"data3":3,"data4":4}
- await socket_manager.send_message(msg="测试",data=data)
- return {"message": "Hello World"}
- @app.get("/scan_serials", description="扫描可用的设备端口")
- async def scanSerials():
- """扫描串口"""
- ports = list_ports.comports()
- print("Scanning", ports)
- return {"message": "Hello World"}
- @app.get("/test_conndevice")
- def test_conndevice():
- device_control = DeviceControl()
- p_list = []
- temp_ports_dict = {}
- # while 1:
- time.sleep(1)
- ports_dict = device_control.scan_serial_port()
- temp_ports_dict = ports_dict
- if not ports_dict:
- # 全部清空 移除所有串口
- if p_list:
- _p = p_list.pop()
- device_control.remove_port(_p)
- # continue
- if ports_dict:
- # print(plist)
- for index, _i in enumerate(p_list):
- if _i not in ports_dict:
- _p = p_list.pop(index)
- device_control.remove_port(_p)
- for _port_name, _port_value in ports_dict.items():
- if _port_name not in p_list:
- try:
- p_list.append(_port_name)
- device_control.add_port_by_linkage(_port_name)
- except BaseException as e:
- print(e.__traceback__.tb_frame.f_globals["__file__"]) # 发生异常所在的文件
- print(e.__traceback__.tb_lineno) # 发生异常所在的行数
- print("串口不存在{} {}".format(_port_name, e))
- # threading.Thread(target=self.add_port, args=(_port_name, _port_value)).start()
- # self.add_port(_p)
- @app.api_route(
- "/forward_request", methods=["GET", "POST"], description="代理转发hlm项目得请求"
- )
- async def forwardRequest(request: HlmForwardRequest):
- """
- 转发HTTP请求到目标URL
- :param request: FastAPI Request对象
- :return: 目标接口的响应
- """
- try:
- if request.method == "GET":
- params = request.query_params
- elif request.method == "POST":
- params = json.dump(request.query_params)
- else:
- raise UnicornException("仅支持GET和POST方法")
- target_url = request.target_url
- method = request.method.upper()
- headers = request.headers
- if not target_url:
- raise UnicornException("目标url地址不能为空")
- # 调用 hlm_http_request 中的 forward_request 函数
- response = forward_request(
- target_url, params=params, method=method, headers=headers
- )
- return response
- except requests.RequestException as e:
- raise UnicornException(e)
- except Exception as e:
- raise UnicornException(e)
|