首页 > Python资料 博客日记
Python中websockets服务端从客户端接收消息并发送给多线程
2024-03-20 17:00:06Python资料围观282次
目录
思路:
1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程
2.消息中心需要独立出来,websockets服务端放消息,主线程去消息
3.根据思路设计模块:
1.消息库
2.服务端
3.主线程
4.多线程
先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name
修改websocket_client.py中data的信息,发送不同消息
一、消息队列
message_base.py
根据设备,创建储存设备消息,提取设备消息的功能
from queue import Queue
class MessageBase:
def __init__(self):
self.data = dict()
def add(self, device, data):
if device in self.data:
self.data[device].put(data)
else:
self.data[device] = Queue()
self.data[device].put(data)
def get(self, device):
data_queue: Queue = self.data.get(device)
if not data_queue or data_queue.empty():
return None
data = data_queue.get()
return data
if __name__ == '__main__':
mb = MessageBase()
mb.add("a", "asdasd")
mb.add("a", "11111111")
print(mb.data)
data = mb.get("a")
print(data)
二、服务端
websocket_server.py
从客户端接收消息,并存到消息队列
import asyncio
import json
import threading
import websockets
##
from message_base import MessageBase
class WebServer:
def __init__(self, host, port, message_base: MessageBase):
self.host = host
self.port = port
self.clients = []
self.message_base = message_base
async def echo(self, websocket, path):
self.clients.append(websocket)
client_ip, client_port = websocket.remote_address
print(f"连接到:{client_ip}:{client_port}")
while True:
try:
recv_text = await websocket.recv()
data = json.loads(recv_text)
device = data.get("device")
if device:
self.message_base.add(device, data)
else:
continue
except websockets.ConnectionClosed:
print("ConnectionClosed...") # 链接断开
self.clients.remove(websocket)
break
except websockets.InvalidState:
print("InvalidState...") # 无效状态
self.clients.remove(websocket)
break
except Exception as e:
print(e)
def connect(self):
print("连接成功!")
asyncio.set_event_loop(asyncio.new_event_loop())
start_server = websockets.serve(self.echo, self.host, self.port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
def run(self):
t = threading.Thread(target=self.connect)
t.start()
print("已启动!")
if __name__ == '__main__':
mb = MessageBase()
ws = WebServer("192.168.6.28", 8001, mb)
ws.run()
三、设备功能
device_function.py
每个设备对应的线程功能,可以统一也可以写多个功能
class DeviceFunc:
def __init__(self, device_name, data):
self.device_name = device_name
self.data = data
def show_data(self):
if self.data:
print(self.device_name, "收到消息:", self.data.get("value"))
四、主线程
main.py
初始化所有功能模块,并运行主线程
from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFunc
class MainThread:
def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):
self.message_base = message_base
self.websocket_server = websocket_server
self.device_list = device_list
def run_server(self):
self.websocket_server.run()
def run(self):
self.run_server()
while True:
for device in self.device_list:
try:
# 开始根据设备即功能处理消息
data = self.message_base.get(device)
if not data:
continue
df = DeviceFunc(device, data)
df.show_data()
except Exception as err:
pass
if __name__ == '__main__':
mb = MessageBase()
ws = WebServer("192.168.6.28", 8000, mb)
device_list = ["aa", "bb", "cc"]
mt = MainThread(mb, ws, device_list)
mt.run()
五、客户端
webscoket_client.py
给服务端发送消息,测试用
import json
import websocket
class WebClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.conn = None
self.flag = False
def connect(self):
try:
url = f"ws://{self.host}:{self.port}"
self.conn = websocket.create_connection(url)
self.flag = True
print("连接成功")
except Exception as err:
self.flag = False
print("连接失败", err)
def close(self):
self.conn.close()
def recv(self):
data = self.conn.recv(1024)
print(data)
def send(self, data):
self.conn.send(data)
print("发送成功")
if __name__ == '__main__':
host = "192.168.6.28"
port = 8000
ws = WebClient(host, port)
if not ws.flag:
ws.connect()
devices = ["aa", "bb", "cc"]
while True:
device = random.choice(devices)
s = ""
for i in range(random.randint(0, 100)):
s += chr(random.randint(65, 122))
data = {"device": device, "value": s}
data = json.dumps(data)
ws.send(data)
time.sleep(1)
六、更新
队列在获取消息时有阻塞的现象,我们上面避免阻塞用了下面注释的内容,
def get(self, device): data_queue: Queue = self.data.get(device) if not data_queue: # 阻塞 return None # if not data_queue or data_queue.empty(): # return None data = data_queue.get() return data
我们如果不用注释的内容,让获取消息直接进入阻塞,但是阻塞是在设备线程里面,不影响主线程和其他线程。这样,我们需要把设备功能封装到线程里面。这样设备工作不会相互影响
DeviceFunc对象改为DeviceThread
import threading
class DeviceThread(threading.Thread):
def __init__(self, device_name, message_base):
super().__init__(target=self.process)
self.device_name = device_name
self.message_base = message_base
def get_data(self):
data = self.message_base.get(self.device_name)
return data
def process(self):
while True:
data = self.get_data()
if data:
print(self.device_name, "收到消息:", data.get("value"))
主线程的run方法修改为:
def run(self): self.run_server() # while True: # for device in self.device_list: # try: # data = self.message_base.get(device) # if not data: # continue # df = DeviceFunc(device, data) # df.show_data() # except Exception as err: # pass for device in self.device_list: t = DeviceThread(device, self.message_base) t.start()
标签:
相关文章
最新发布
- 【Python】selenium安装+Microsoft Edge驱动器下载配置流程
- Python 中自动打开网页并点击[自动化脚本],Selenium
- Anaconda基础使用
- 【Python】成功解决 TypeError: ‘<‘ not supported between instances of ‘str’ and ‘int’
- manim边学边做--三维的点和线
- CPython是最常用的Python解释器之一,也是Python官方实现。它是用C语言编写的,旨在提供一个高效且易于使用的Python解释器。
- Anaconda安装配置Jupyter(2024最新版)
- Python中读取Excel最快的几种方法!
- Python某城市美食商家爬虫数据可视化分析和推荐查询系统毕业设计论文开题报告
- 如何使用 Python 批量检测和转换 JSONL 文件编码为 UTF-8
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Python与PyTorch的版本对应
- Anaconda版本和Python版本对应关系(持续更新...)
- Python pyinstaller打包exe最完整教程
- Could not build wheels for llama-cpp-python, which is required to install pyproject.toml-based proj