首页 > Python资料 博客日记
Python中websockets服务端从客户端接收消息并发送给多线程
2024-03-20 17:00:06Python资料围观374次
目录
思路:
1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程
2.消息中心需要独立出来,websockets服务端放消息,主线程去消息
3.根据思路设计模块:
1.消息库
2.服务端
3.主线程
4.多线程
先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name
修改websocket_client.py中data的信息,发送不同消息
一、消息队列
message_base.py
根据设备,创建储存设备消息,提取设备消息的功能
from queue import Queue
class MessageBase:
def __init__(self):
self.data = dict()
def add(self, device, data):
if device in self.data:
self.data[device].put(data)
else:
self.data[device] = Queue()
self.data[device].put(data)
def get(self, device):
data_queue: Queue = self.data.get(device)
if not data_queue or data_queue.empty():
return None
data = data_queue.get()
return data
if __name__ == '__main__':
mb = MessageBase()
mb.add("a", "asdasd")
mb.add("a", "11111111")
print(mb.data)
data = mb.get("a")
print(data)
二、服务端
websocket_server.py
从客户端接收消息,并存到消息队列
import asyncio
import json
import threading
import websockets
##
from message_base import MessageBase
class WebServer:
def __init__(self, host, port, message_base: MessageBase):
self.host = host
self.port = port
self.clients = []
self.message_base = message_base
async def echo(self, websocket, path):
self.clients.append(websocket)
client_ip, client_port = websocket.remote_address
print(f"连接到:{client_ip}:{client_port}")
while True:
try:
recv_text = await websocket.recv()
data = json.loads(recv_text)
device = data.get("device")
if device:
self.message_base.add(device, data)
else:
continue
except websockets.ConnectionClosed:
print("ConnectionClosed...") # 链接断开
self.clients.remove(websocket)
break
except websockets.InvalidState:
print("InvalidState...") # 无效状态
self.clients.remove(websocket)
break
except Exception as e:
print(e)
def connect(self):
print("连接成功!")
asyncio.set_event_loop(asyncio.new_event_loop())
start_server = websockets.serve(self.echo, self.host, self.port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
def run(self):
t = threading.Thread(target=self.connect)
t.start()
print("已启动!")
if __name__ == '__main__':
mb = MessageBase()
ws = WebServer("192.168.6.28", 8001, mb)
ws.run()
三、设备功能
device_function.py
每个设备对应的线程功能,可以统一也可以写多个功能
class DeviceFunc:
def __init__(self, device_name, data):
self.device_name = device_name
self.data = data
def show_data(self):
if self.data:
print(self.device_name, "收到消息:", self.data.get("value"))
四、主线程
main.py
初始化所有功能模块,并运行主线程
from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFunc
class MainThread:
def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):
self.message_base = message_base
self.websocket_server = websocket_server
self.device_list = device_list
def run_server(self):
self.websocket_server.run()
def run(self):
self.run_server()
while True:
for device in self.device_list:
try:
# 开始根据设备即功能处理消息
data = self.message_base.get(device)
if not data:
continue
df = DeviceFunc(device, data)
df.show_data()
except Exception as err:
pass
if __name__ == '__main__':
mb = MessageBase()
ws = WebServer("192.168.6.28", 8000, mb)
device_list = ["aa", "bb", "cc"]
mt = MainThread(mb, ws, device_list)
mt.run()
五、客户端
webscoket_client.py
给服务端发送消息,测试用
import json
import websocket
class WebClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.conn = None
self.flag = False
def connect(self):
try:
url = f"ws://{self.host}:{self.port}"
self.conn = websocket.create_connection(url)
self.flag = True
print("连接成功")
except Exception as err:
self.flag = False
print("连接失败", err)
def close(self):
self.conn.close()
def recv(self):
data = self.conn.recv(1024)
print(data)
def send(self, data):
self.conn.send(data)
print("发送成功")
if __name__ == '__main__':
host = "192.168.6.28"
port = 8000
ws = WebClient(host, port)
if not ws.flag:
ws.connect()
devices = ["aa", "bb", "cc"]
while True:
device = random.choice(devices)
s = ""
for i in range(random.randint(0, 100)):
s += chr(random.randint(65, 122))
data = {"device": device, "value": s}
data = json.dumps(data)
ws.send(data)
time.sleep(1)
六、更新
队列在获取消息时有阻塞的现象,我们上面避免阻塞用了下面注释的内容,
def get(self, device): data_queue: Queue = self.data.get(device) if not data_queue: # 阻塞 return None # if not data_queue or data_queue.empty(): # return None data = data_queue.get() return data
我们如果不用注释的内容,让获取消息直接进入阻塞,但是阻塞是在设备线程里面,不影响主线程和其他线程。这样,我们需要把设备功能封装到线程里面。这样设备工作不会相互影响
DeviceFunc对象改为DeviceThread
import threading
class DeviceThread(threading.Thread):
def __init__(self, device_name, message_base):
super().__init__(target=self.process)
self.device_name = device_name
self.message_base = message_base
def get_data(self):
data = self.message_base.get(self.device_name)
return data
def process(self):
while True:
data = self.get_data()
if data:
print(self.device_name, "收到消息:", data.get("value"))
主线程的run方法修改为:
def run(self): self.run_server() # while True: # for device in self.device_list: # try: # data = self.message_base.get(device) # if not data: # continue # df = DeviceFunc(device, data) # df.show_data() # except Exception as err: # pass for device in self.device_list: t = DeviceThread(device, self.message_base) t.start()
标签:
相关文章
最新发布
- 光流法结合深度学习神经网络的原理及应用(完整代码都有Python opencv)
- Python 图像处理进阶:特征提取与图像分类
- 大数据可视化分析-基于python的电影数据分析及可视化系统_9532dr50
- 【Python】入门(运算、输出、数据类型)
- 【Python】第一弹---解锁编程新世界:深入理解计算机基础与Python入门指南
- 华为OD机试E卷 --第k个排列 --24年OD统一考试(Java & JS & Python & C & C++)
- Python已安装包在import时报错未找到的解决方法
- 【Python】自动化神器PyAutoGUI —告别手动操作,一键模拟鼠标键盘,玩转微信及各种软件自动化
- Pycharm连接SQL Sever(详细教程)
- Python编程练习题及解析(49题)
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Anaconda版本和Python版本对应关系(持续更新...)
- Python与PyTorch的版本对应
- Windows上安装 Python 环境并配置环境变量 (超详细教程)
- Python pyinstaller打包exe最完整教程