首页 > Python资料 博客日记
Python的并行任务(进程池、线程池)
2024-10-04 12:00:05Python资料围观55次
python的并行任务(进程池、线程池)
在Python中,进程(Process)和线程(Thread)是并发编程的两种主要方式,它们各自适用于不同的场景。了解何时使用进程或线程,可以帮助你更有效地设计并发程序。
使用内置基本库concurrent.futures来实现并发,简单使用这个模块,包括并行线程和并行进程执行器 。
进程(Process)适用场景:
- CPU密集型任务:当任务主要是计算密集型时,使用进程通常比线程更有效。因为Python的全局解释器锁(GIL)限制了同一时刻只有一个线程可以执行Python字节码。对于CPU密集型任务,使用多进程可以绕过GIL的限制,充分利用多核CPU的计算能力。
- 需要隔离的应用:如果程序的不同部分需要高度的隔离性(例如,不同的内存空间、文件描述符等),使用进程是更好的选择。进程之间的通信(IPC)虽然比线程间通信(如共享内存)复杂,但提供了更高的安全性。
- 跨平台兼容性:虽然Python的线程在大多数平台上都能很好地工作,但在某些平台上(特别是Windows),线程的行为可能与预期不同。在这些情况下,使用进程可能是一个更可靠的解决方案。
线程(Thread)适用场景:
- I/O密集型任务:当任务主要是等待I/O操作(如文件读写、网络请求等)完成时,使用线程是更合适的选择。因为线程之间的切换成本较低,可以更有效地利用等待时间。
- 需要快速响应的应用:对于需要快速响应用户输入或网络请求的应用,使用线程可以更快地处理这些请求,因为线程之间的切换比进程快得多。
- 共享数据:如果多个任务需要频繁地访问和修改共享数据,使用线程可能更方便,因为线程可以共享进程的内存空间。然而,这也需要小心处理数据同步和竞争条件的问题。
注意:
- 全局解释器锁(GIL):Python的GIL限制了同一时刻只有一个线程可以执行Python字节码。这意味着,对于CPU密集型任务,使用多线程可能不会带来性能上的提升。然而,对于I/O密集型任务,多线程仍然可以显著提高程序的效率。
- 线程和进程的选择:在选择使用线程还是进程时,需要综合考虑任务的性质、系统的资源以及程序的复杂度。对于简单的I/O密集型任务,线程可能是更好的选择;而对于复杂的、需要高度隔离的或CPU密集型任务,进程可能更合适。
- 并发库:Python提供了多种并发编程的库,如threading(用于线程)、multiprocessing(用于进程)以及concurrent.futures(提供了更高级的并发执行框架)。你可以根据具体需求选择合适的库来实现你的并发程序。
1. 并行线程ThreadPoolExecutor
import concurrent.futures
import numpy as np
import time
# 定义矩阵计算
def complex_matrix_operation(n, iterations):
"""
执行复杂矩阵运算来测试CPU性能。
参数:
n (int): 矩阵的大小,即n x n的矩阵。
iterations (int): 重复矩阵运算的次数。
返回:
float: 最终矩阵的迹(对角线元素之和)。
"""
# 初始化随机矩阵
A = np.random.rand(n, n)
B = np.random.rand(n, n)
# 开始计时
start_time = time.time()
# 执行多次矩阵乘法
result = np.eye(n) # 初始化为对角线矩阵
for _ in range(iterations):
# 矩阵乘积
result = np.dot(result, np.dot(A, B))
# 计算并返回迹
trace = np.trace(result)
# 结束计时
end_time = time.time()
print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")
return trace
if __name__ == '__main__':
# 调用函数
n = 16 # 最大矩阵大小
iterations = 16 # 迭代次数
# 开始时间
start_time = time.time()
# 创建线程池执行器 最多16线程
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
# 提交任务到线程池
futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]
# 使用wait函数等待所有任务完成
# done, not_done = concurrent.futures.wait(futures, timeout=None)
# for future in done:
# result = future.result()
# print('wait:',result)
# # 使用as_completed函数按照完成顺序获取结果
# for future in concurrent.futures.as_completed(futures):
# result = future.result()
# print('completed:',result)
# 结束计时
end_time = time.time()
print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")
执行结果:
计算完成 0,耗时: 0.000437 秒
计算完成 1,耗时: 0.001399 秒
计算完成 2,耗时: 0.001689 秒
计算完成 4,耗时: 0.005548 秒
计算完成 3,耗时: 0.006699 秒
计算完成 6,耗时: 0.000553 秒
计算完成 5,耗时: 0.010176 秒
计算完成 10,耗时: 0.000951 秒
计算完成 9,耗时: 0.000789 秒
计算完成 11,耗时: 0.001857 秒
计算完成 7,耗时: 0.006189 秒
计算完成 15,耗时: 0.004257 秒
计算完成 13,耗时: 0.004155 秒
计算完成 14,耗时: 0.007222 秒
计算完成 12,耗时: 0.006338 秒
计算完成 8,耗时: 0.007463 秒
计算完成,总共耗时: 0.030619 秒
从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。
2. 并行进程ProcessPoolExecutor
import concurrent.futures
import numpy as np
import time
# 定义矩阵计算
def complex_matrix_operation(n, iterations):
"""
执行复杂矩阵运算来测试CPU性能。
参数:
n (int): 矩阵的大小,即n x n的矩阵。
iterations (int): 重复矩阵运算的次数。
返回:
float: 最终矩阵的迹(对角线元素之和)。
"""
# 初始化随机矩阵
A = np.random.rand(n, n)
B = np.random.rand(n, n)
# 开始计时
start_time = time.time()
# 执行多次矩阵乘法
result = np.eye(n) # 初始化为对角线矩阵
for _ in range(iterations):
# 矩阵乘积
result = np.dot(result, np.dot(A, B))
# 计算并返回迹
trace = np.trace(result)
# 结束计时
end_time = time.time()
print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")
return trace
if __name__ == '__main__':
# 调用函数
n = 16 # 最大矩阵大小
iterations = 16 # 迭代次数
# 开始时间
start_time = time.time()
# 创建线程池执行器 最多16进程
with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
# 提交任务到线程池
futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]
# 使用wait函数等待所有任务完成
# done, not_done = concurrent.futures.wait(futures, timeout=None)
# for future in done:
# result = future.result()
# print('wait:',result)
# # 使用as_completed函数按照完成顺序获取结果
# for future in concurrent.futures.as_completed(futures):
# result = future.result()
# print('completed:',result)
# 结束计时
end_time = time.time()
print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")
执行结果:
计算完成 1,耗时: 0.000953 秒计算完成 2,耗时: 0.000691 秒
计算完成 0,耗时: 0.000837 秒计算完成 3,耗时: 0.000993 秒
计算完成 6,耗时: 0.001096 秒计算完成 8,耗时: 0.000336 秒计算完成 7,耗时: 0.000955 秒计算完成 4,耗时: 0.000751 秒计算完成 9,耗时: 0.000916 秒
计算完成 13,耗时: 0.000758 秒计算完成 12,耗时: 0.001723 秒计算完成 11,耗时: 0.000999 秒计算完成 10,耗时: 0.001201 秒计算完成 14,耗时: 0.001045 秒计算完成 5,耗时: 0.001212 秒
计算完成 15,耗时: 0.000172 秒
计算完成,总共耗时: 0.432435 秒
从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。
进程用时比线程更多 ,如果把n调到256 ,差距更大。
ThreadPoolExecutor
计算完成,总共耗时: 4.962590 秒
ProcessPoolExecutor
计算完成,总共耗时: 8.072993 秒
以上是在笔记本Windows环境下执行。
在Linux 环境测试了一下,n = 1024 ,结果比较符合预期,因为是主要是计算量大,没有IO,没有网络访问,process的性能应该比thread要好:
process :
计算完成,总共耗时: 147.940822 秒
thread:
计算完成,总共耗时: 263.970811 秒
看Python进程情况,8个process进程,一个thread进程:
# ps -ef | grep python
root 6488 1353 0 16:26 pts/2 00:00:00 python current_process.py
root 6489 6488 81 16:26 pts/2 00:00:40 python current_process.py
root 6490 6488 62 16:26 pts/2 00:00:31 python current_process.py
root 6491 6488 80 16:26 pts/2 00:00:40 python current_process.py
root 6492 6488 81 16:26 pts/2 00:00:40 python current_process.py
root 6493 6488 82 16:26 pts/2 00:00:41 python current_process.py
root 6494 6488 78 16:26 pts/2 00:00:39 python current_process.py
root 6495 6488 64 16:26 pts/2 00:00:32 python current_process.py
root 6496 6488 77 16:26 pts/2 00:00:38 python current_process.py
root 6551 32716 99 16:26 pts/1 00:01:32 python current_thread.py
root 6845 10471 0 16:26 pts/0 00:00:00 grep --color=auto python
3.concurrent.futures 模块
concurrent.futures模块除了提供ThreadPoolExecutor和ProcessPoolExecutor之外,还提供了一些其他的函数来执行异步任务和处理结果。以下是一些常用的concurrent.futures模块函数:
(1)concurrent.futures.as_completed
concurrent.futures.as_completed(fs, timeout=None):
- 接收一个可迭代的Future对象集合fs,返回一个生成器,在每个Future对象完成时产生结果。
- 可选地指定timeout参数,用于限制等待结果的最长时间。
(2)concurrent.futures.wait
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
- 接收一个可迭代的Future对象集合fs,等待所有的Future对象完成。
- 可选地指定timeout参数,用于限制等待结果的最长时间。
- 可选地指定return_when参数,用于指定何时返回结果,可选值包括FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETED。
4.Executor 基类
concurrent.futures模块中的Executor是一个抽象基类,用于表示执行器对象。它定义了一些共同的方法和行为,用于管理并发执行的任务。但要通过它的子类调用,而不是直接调用。
Executor类并不直接实例化,而是通过具体的子类如ThreadPoolExecutor和ProcessPoolExecutor来创建实例。
常用的Executor类方法:
(1)submit(fn, *args, **kwargs): 提交一个可调用对象和它的参数给执行器,返回一个Future对象,表示该任务的未来结果。
(2)map(fn, *iterables, timeout=None): 批量提交任务,并按原始迭代器的顺序返回结果。它类似于内置函数map(),但是可以异步地并发执行任务。
(3)shutdown(wait=True): 关闭执行器,不再接受新的任务。如果wait参数为True(默认值),则在所有任务完成后再关闭执行器。
(4)submit_to_executor(fn, executor, *args, **kwargs): 将任务提交给指定的执行器对象,并返回一个Future对象。
(5)map_to_executor(fn, executor, *iterables, timeout=None): 将任务批量提交给指定的执行器对象,并返回结果。
这些方法使得在执行任务时更加方便和灵活。可以根据具体的需求选择合适的方法和执行器类型。
5.Future 对象
concurrent.futures.Future是concurrent.futures模块中的一个类,用于表示一个异步任务的未来结果。它将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。
以下是concurrent.futures.Future的一些常用方法:
(1)result(): 等待并返回异步任务的结果。如果任务尚未完成,该方法会阻塞直到任务完成并返回结果。
(2)done(): 判断异步任务是否已经完成,返回布尔值。
(3)cancel(): 取消异步任务的执行。如果任务已经开始执行或已经完成,则无法取消。
(4)add_done_callback(fn): 添加一个回调函数,当异步任务完成时会调用该回调函数。
标签:
相关文章
最新发布
- 光流法结合深度学习神经网络的原理及应用(完整代码都有Python opencv)
- Python 图像处理进阶:特征提取与图像分类
- 大数据可视化分析-基于python的电影数据分析及可视化系统_9532dr50
- 【Python】入门(运算、输出、数据类型)
- 【Python】第一弹---解锁编程新世界:深入理解计算机基础与Python入门指南
- 华为OD机试E卷 --第k个排列 --24年OD统一考试(Java & JS & Python & C & C++)
- Python已安装包在import时报错未找到的解决方法
- 【Python】自动化神器PyAutoGUI —告别手动操作,一键模拟鼠标键盘,玩转微信及各种软件自动化
- Pycharm连接SQL Sever(详细教程)
- Python编程练习题及解析(49题)
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Anaconda版本和Python版本对应关系(持续更新...)
- Python与PyTorch的版本对应
- Windows上安装 Python 环境并配置环境变量 (超详细教程)
- Python pyinstaller打包exe最完整教程