首页 > Python资料 博客日记
Python的并行任务(进程池、线程池)
2024-10-04 12:00:05Python资料围观31次
python的并行任务(进程池、线程池)
在Python中,进程(Process)和线程(Thread)是并发编程的两种主要方式,它们各自适用于不同的场景。了解何时使用进程或线程,可以帮助你更有效地设计并发程序。
使用内置基本库concurrent.futures来实现并发,简单使用这个模块,包括并行线程和并行进程执行器 。
进程(Process)适用场景:
- CPU密集型任务:当任务主要是计算密集型时,使用进程通常比线程更有效。因为Python的全局解释器锁(GIL)限制了同一时刻只有一个线程可以执行Python字节码。对于CPU密集型任务,使用多进程可以绕过GIL的限制,充分利用多核CPU的计算能力。
- 需要隔离的应用:如果程序的不同部分需要高度的隔离性(例如,不同的内存空间、文件描述符等),使用进程是更好的选择。进程之间的通信(IPC)虽然比线程间通信(如共享内存)复杂,但提供了更高的安全性。
- 跨平台兼容性:虽然Python的线程在大多数平台上都能很好地工作,但在某些平台上(特别是Windows),线程的行为可能与预期不同。在这些情况下,使用进程可能是一个更可靠的解决方案。
线程(Thread)适用场景:
- I/O密集型任务:当任务主要是等待I/O操作(如文件读写、网络请求等)完成时,使用线程是更合适的选择。因为线程之间的切换成本较低,可以更有效地利用等待时间。
- 需要快速响应的应用:对于需要快速响应用户输入或网络请求的应用,使用线程可以更快地处理这些请求,因为线程之间的切换比进程快得多。
- 共享数据:如果多个任务需要频繁地访问和修改共享数据,使用线程可能更方便,因为线程可以共享进程的内存空间。然而,这也需要小心处理数据同步和竞争条件的问题。
注意:
- 全局解释器锁(GIL):Python的GIL限制了同一时刻只有一个线程可以执行Python字节码。这意味着,对于CPU密集型任务,使用多线程可能不会带来性能上的提升。然而,对于I/O密集型任务,多线程仍然可以显著提高程序的效率。
- 线程和进程的选择:在选择使用线程还是进程时,需要综合考虑任务的性质、系统的资源以及程序的复杂度。对于简单的I/O密集型任务,线程可能是更好的选择;而对于复杂的、需要高度隔离的或CPU密集型任务,进程可能更合适。
- 并发库:Python提供了多种并发编程的库,如threading(用于线程)、multiprocessing(用于进程)以及concurrent.futures(提供了更高级的并发执行框架)。你可以根据具体需求选择合适的库来实现你的并发程序。
1. 并行线程ThreadPoolExecutor
import concurrent.futures
import numpy as np
import time
# 定义矩阵计算
def complex_matrix_operation(n, iterations):
"""
执行复杂矩阵运算来测试CPU性能。
参数:
n (int): 矩阵的大小,即n x n的矩阵。
iterations (int): 重复矩阵运算的次数。
返回:
float: 最终矩阵的迹(对角线元素之和)。
"""
# 初始化随机矩阵
A = np.random.rand(n, n)
B = np.random.rand(n, n)
# 开始计时
start_time = time.time()
# 执行多次矩阵乘法
result = np.eye(n) # 初始化为对角线矩阵
for _ in range(iterations):
# 矩阵乘积
result = np.dot(result, np.dot(A, B))
# 计算并返回迹
trace = np.trace(result)
# 结束计时
end_time = time.time()
print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")
return trace
if __name__ == '__main__':
# 调用函数
n = 16 # 最大矩阵大小
iterations = 16 # 迭代次数
# 开始时间
start_time = time.time()
# 创建线程池执行器 最多16线程
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
# 提交任务到线程池
futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]
# 使用wait函数等待所有任务完成
# done, not_done = concurrent.futures.wait(futures, timeout=None)
# for future in done:
# result = future.result()
# print('wait:',result)
# # 使用as_completed函数按照完成顺序获取结果
# for future in concurrent.futures.as_completed(futures):
# result = future.result()
# print('completed:',result)
# 结束计时
end_time = time.time()
print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")
执行结果:
计算完成 0,耗时: 0.000437 秒
计算完成 1,耗时: 0.001399 秒
计算完成 2,耗时: 0.001689 秒
计算完成 4,耗时: 0.005548 秒
计算完成 3,耗时: 0.006699 秒
计算完成 6,耗时: 0.000553 秒
计算完成 5,耗时: 0.010176 秒
计算完成 10,耗时: 0.000951 秒
计算完成 9,耗时: 0.000789 秒
计算完成 11,耗时: 0.001857 秒
计算完成 7,耗时: 0.006189 秒
计算完成 15,耗时: 0.004257 秒
计算完成 13,耗时: 0.004155 秒
计算完成 14,耗时: 0.007222 秒
计算完成 12,耗时: 0.006338 秒
计算完成 8,耗时: 0.007463 秒
计算完成,总共耗时: 0.030619 秒
从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。
2. 并行进程ProcessPoolExecutor
import concurrent.futures
import numpy as np
import time
# 定义矩阵计算
def complex_matrix_operation(n, iterations):
"""
执行复杂矩阵运算来测试CPU性能。
参数:
n (int): 矩阵的大小,即n x n的矩阵。
iterations (int): 重复矩阵运算的次数。
返回:
float: 最终矩阵的迹(对角线元素之和)。
"""
# 初始化随机矩阵
A = np.random.rand(n, n)
B = np.random.rand(n, n)
# 开始计时
start_time = time.time()
# 执行多次矩阵乘法
result = np.eye(n) # 初始化为对角线矩阵
for _ in range(iterations):
# 矩阵乘积
result = np.dot(result, np.dot(A, B))
# 计算并返回迹
trace = np.trace(result)
# 结束计时
end_time = time.time()
print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")
return trace
if __name__ == '__main__':
# 调用函数
n = 16 # 最大矩阵大小
iterations = 16 # 迭代次数
# 开始时间
start_time = time.time()
# 创建线程池执行器 最多16进程
with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
# 提交任务到线程池
futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]
# 使用wait函数等待所有任务完成
# done, not_done = concurrent.futures.wait(futures, timeout=None)
# for future in done:
# result = future.result()
# print('wait:',result)
# # 使用as_completed函数按照完成顺序获取结果
# for future in concurrent.futures.as_completed(futures):
# result = future.result()
# print('completed:',result)
# 结束计时
end_time = time.time()
print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")
执行结果:
计算完成 1,耗时: 0.000953 秒计算完成 2,耗时: 0.000691 秒
计算完成 0,耗时: 0.000837 秒计算完成 3,耗时: 0.000993 秒
计算完成 6,耗时: 0.001096 秒计算完成 8,耗时: 0.000336 秒计算完成 7,耗时: 0.000955 秒计算完成 4,耗时: 0.000751 秒计算完成 9,耗时: 0.000916 秒
计算完成 13,耗时: 0.000758 秒计算完成 12,耗时: 0.001723 秒计算完成 11,耗时: 0.000999 秒计算完成 10,耗时: 0.001201 秒计算完成 14,耗时: 0.001045 秒计算完成 5,耗时: 0.001212 秒
计算完成 15,耗时: 0.000172 秒
计算完成,总共耗时: 0.432435 秒
从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。
进程用时比线程更多 ,如果把n调到256 ,差距更大。
ThreadPoolExecutor
计算完成,总共耗时: 4.962590 秒
ProcessPoolExecutor
计算完成,总共耗时: 8.072993 秒
以上是在笔记本Windows环境下执行。
在Linux 环境测试了一下,n = 1024 ,结果比较符合预期,因为是主要是计算量大,没有IO,没有网络访问,process的性能应该比thread要好:
process :
计算完成,总共耗时: 147.940822 秒
thread:
计算完成,总共耗时: 263.970811 秒
看Python进程情况,8个process进程,一个thread进程:
# ps -ef | grep python
root 6488 1353 0 16:26 pts/2 00:00:00 python current_process.py
root 6489 6488 81 16:26 pts/2 00:00:40 python current_process.py
root 6490 6488 62 16:26 pts/2 00:00:31 python current_process.py
root 6491 6488 80 16:26 pts/2 00:00:40 python current_process.py
root 6492 6488 81 16:26 pts/2 00:00:40 python current_process.py
root 6493 6488 82 16:26 pts/2 00:00:41 python current_process.py
root 6494 6488 78 16:26 pts/2 00:00:39 python current_process.py
root 6495 6488 64 16:26 pts/2 00:00:32 python current_process.py
root 6496 6488 77 16:26 pts/2 00:00:38 python current_process.py
root 6551 32716 99 16:26 pts/1 00:01:32 python current_thread.py
root 6845 10471 0 16:26 pts/0 00:00:00 grep --color=auto python
3.concurrent.futures 模块
concurrent.futures模块除了提供ThreadPoolExecutor和ProcessPoolExecutor之外,还提供了一些其他的函数来执行异步任务和处理结果。以下是一些常用的concurrent.futures模块函数:
(1)concurrent.futures.as_completed
concurrent.futures.as_completed(fs, timeout=None):
- 接收一个可迭代的Future对象集合fs,返回一个生成器,在每个Future对象完成时产生结果。
- 可选地指定timeout参数,用于限制等待结果的最长时间。
(2)concurrent.futures.wait
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
- 接收一个可迭代的Future对象集合fs,等待所有的Future对象完成。
- 可选地指定timeout参数,用于限制等待结果的最长时间。
- 可选地指定return_when参数,用于指定何时返回结果,可选值包括FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETED。
4.Executor 基类
concurrent.futures模块中的Executor是一个抽象基类,用于表示执行器对象。它定义了一些共同的方法和行为,用于管理并发执行的任务。但要通过它的子类调用,而不是直接调用。
Executor类并不直接实例化,而是通过具体的子类如ThreadPoolExecutor和ProcessPoolExecutor来创建实例。
常用的Executor类方法:
(1)submit(fn, *args, **kwargs): 提交一个可调用对象和它的参数给执行器,返回一个Future对象,表示该任务的未来结果。
(2)map(fn, *iterables, timeout=None): 批量提交任务,并按原始迭代器的顺序返回结果。它类似于内置函数map(),但是可以异步地并发执行任务。
(3)shutdown(wait=True): 关闭执行器,不再接受新的任务。如果wait参数为True(默认值),则在所有任务完成后再关闭执行器。
(4)submit_to_executor(fn, executor, *args, **kwargs): 将任务提交给指定的执行器对象,并返回一个Future对象。
(5)map_to_executor(fn, executor, *iterables, timeout=None): 将任务批量提交给指定的执行器对象,并返回结果。
这些方法使得在执行任务时更加方便和灵活。可以根据具体的需求选择合适的方法和执行器类型。
5.Future 对象
concurrent.futures.Future是concurrent.futures模块中的一个类,用于表示一个异步任务的未来结果。它将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。
以下是concurrent.futures.Future的一些常用方法:
(1)result(): 等待并返回异步任务的结果。如果任务尚未完成,该方法会阻塞直到任务完成并返回结果。
(2)done(): 判断异步任务是否已经完成,返回布尔值。
(3)cancel(): 取消异步任务的执行。如果任务已经开始执行或已经完成,则无法取消。
(4)add_done_callback(fn): 添加一个回调函数,当异步任务完成时会调用该回调函数。
标签:
相关文章
最新发布
- 【Python】selenium安装+Microsoft Edge驱动器下载配置流程
- Python 中自动打开网页并点击[自动化脚本],Selenium
- Anaconda基础使用
- 【Python】成功解决 TypeError: ‘<‘ not supported between instances of ‘str’ and ‘int’
- manim边学边做--三维的点和线
- CPython是最常用的Python解释器之一,也是Python官方实现。它是用C语言编写的,旨在提供一个高效且易于使用的Python解释器。
- Anaconda安装配置Jupyter(2024最新版)
- Python中读取Excel最快的几种方法!
- Python某城市美食商家爬虫数据可视化分析和推荐查询系统毕业设计论文开题报告
- 如何使用 Python 批量检测和转换 JSONL 文件编码为 UTF-8
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Python与PyTorch的版本对应
- Anaconda版本和Python版本对应关系(持续更新...)
- Python pyinstaller打包exe最完整教程
- Could not build wheels for llama-cpp-python, which is required to install pyproject.toml-based proj