# api.py (3.3 KB)
  1. from models import *
  2. import requests
  3. import json
  4. from logger import logger
  5. from serial.tools import list_ports
  6. from utils.hlm_http_request import forward_request
  7. from sockets.socket_client import socket_manager
  8. from mcu.DeviceControl import DeviceControl
  9. import time
  10. @app.get("/")
  11. async def index():
  12. # await socket_manager.send_message(msg="测试")
  13. return {"message": "Hello World"}
  14. @app.get("/send_test")
  15. async def index():
  16. data = {"data1":1,"data2":2,"data3":3,"data4":4}
  17. await socket_manager.send_message(msg="测试",data=data)
  18. return {"message": "Hello World"}
  19. @app.get("/scan_serials", description="扫描可用的设备端口")
  20. async def scanSerials():
  21. """扫描串口"""
  22. ports = list_ports.comports()
  23. print("Scanning", ports)
  24. return {"message": "Hello World"}
  25. @app.get("/test_conndevice")
  26. def test_conndevice():
  27. device_control = DeviceControl()
  28. p_list = []
  29. temp_ports_dict = {}
  30. # while 1:
  31. time.sleep(1)
  32. ports_dict = device_control.scan_serial_port()
  33. temp_ports_dict = ports_dict
  34. if not ports_dict:
  35. # 全部清空 移除所有串口
  36. if p_list:
  37. _p = p_list.pop()
  38. device_control.remove_port(_p)
  39. # continue
  40. if ports_dict:
  41. # print(plist)
  42. for index, _i in enumerate(p_list):
  43. if _i not in ports_dict:
  44. _p = p_list.pop(index)
  45. device_control.remove_port(_p)
  46. for _port_name, _port_value in ports_dict.items():
  47. if _port_name not in p_list:
  48. try:
  49. p_list.append(_port_name)
  50. device_control.add_port_by_linkage(_port_name)
  51. except BaseException as e:
  52. print(e.__traceback__.tb_frame.f_globals["__file__"]) # 发生异常所在的文件
  53. print(e.__traceback__.tb_lineno) # 发生异常所在的行数
  54. print("串口不存在{} {}".format(_port_name, e))
  55. # threading.Thread(target=self.add_port, args=(_port_name, _port_value)).start()
  56. # self.add_port(_p)
  57. @app.api_route(
  58. "/forward_request", methods=["GET", "POST"], description="代理转发hlm项目得请求"
  59. )
  60. async def forwardRequest(request: HlmForwardRequest):
  61. """
  62. 转发HTTP请求到目标URL
  63. :param request: FastAPI Request对象
  64. :return: 目标接口的响应
  65. """
  66. try:
  67. if request.method == "GET":
  68. params = request.query_params
  69. elif request.method == "POST":
  70. params = json.dump(request.query_params)
  71. else:
  72. raise UnicornException("仅支持GET和POST方法")
  73. target_url = request.target_url
  74. method = request.method.upper()
  75. headers = request.headers
  76. if not target_url:
  77. raise UnicornException("目标url地址不能为空")
  78. # 调用 hlm_http_request 中的 forward_request 函数
  79. response = forward_request(
  80. target_url, params=params, method=method, headers=headers
  81. )
  82. return response
  83. except requests.RequestException as e:
  84. raise UnicornException(e)
  85. except Exception as e:
  86. raise UnicornException(e)