首页 > Python资料 博客日记

Python的并行任务(进程池、线程池)

2024-10-04 12:00:05Python资料围观23

本篇文章分享Python的并行任务(进程池、线程池),对你有帮助的话记得收藏一下,看Python资料网收获更多编程知识

python的并行任务(进程池、线程池)

在Python中,进程(Process)和线程(Thread)是并发编程的两种主要方式,它们各自适用于不同的场景。了解何时使用进程或线程,可以帮助你更有效地设计并发程序。
使用内置基本库concurrent.futures来实现并发,简单使用这个模块,包括并行线程和并行进程执行器 。

进程(Process)适用场景:

  • CPU密集型任务:当任务主要是计算密集型时,使用进程通常比线程更有效。因为Python的全局解释器锁(GIL)限制了同一时刻只有一个线程可以执行Python字节码。对于CPU密集型任务,使用多进程可以绕过GIL的限制,充分利用多核CPU的计算能力。
  • 需要隔离的应用:如果程序的不同部分需要高度的隔离性(例如,不同的内存空间、文件描述符等),使用进程是更好的选择。进程之间的通信(IPC)虽然比线程间通信(如共享内存)复杂,但提供了更高的安全性。
  • 跨平台兼容性:虽然Python的线程在大多数平台上都能很好地工作,但在某些平台上(特别是Windows),线程的行为可能与预期不同。在这些情况下,使用进程可能是一个更可靠的解决方案。

线程(Thread)适用场景:

  • I/O密集型任务:当任务主要是等待I/O操作(如文件读写、网络请求等)完成时,使用线程是更合适的选择。因为线程之间的切换成本较低,可以更有效地利用等待时间。
  • 需要快速响应的应用:对于需要快速响应用户输入或网络请求的应用,使用线程可以更快地处理这些请求,因为线程之间的切换比进程快得多。
  • 共享数据:如果多个任务需要频繁地访问和修改共享数据,使用线程可能更方便,因为线程可以共享进程的内存空间。然而,这也需要小心处理数据同步和竞争条件的问题。

注意:

  • 全局解释器锁(GIL):Python的GIL限制了同一时刻只有一个线程可以执行Python字节码。这意味着,对于CPU密集型任务,使用多线程可能不会带来性能上的提升。然而,对于I/O密集型任务,多线程仍然可以显著提高程序的效率。
  • 线程和进程的选择:在选择使用线程还是进程时,需要综合考虑任务的性质、系统的资源以及程序的复杂度。对于简单的I/O密集型任务,线程可能是更好的选择;而对于复杂的、需要高度隔离的或CPU密集型任务,进程可能更合适。
  • 并发库:Python提供了多种并发编程的库,如threading(用于线程)、multiprocessing(用于进程)以及concurrent.futures(提供了更高级的并发执行框架)。你可以根据具体需求选择合适的库来实现你的并发程序。

1. 并行线程ThreadPoolExecutor

import concurrent.futures
import numpy as np  
import time  

# 定义矩阵计算  
def complex_matrix_operation(n, iterations):  
    """  
    执行复杂矩阵运算来测试CPU性能。  
      
    参数:  
    n (int): 矩阵的大小,即n x n的矩阵。  
    iterations (int): 重复矩阵运算的次数。  
      
    返回:  
    float: 最终矩阵的迹(对角线元素之和)。  
    """  
    # 初始化随机矩阵  
    A = np.random.rand(n, n)  
    B = np.random.rand(n, n)  
      
    # 开始计时  
    start_time = time.time()  
      
    # 执行多次矩阵乘法  
    result = np.eye(n)  # 初始化为对角线矩阵  
    for _ in range(iterations): 
        # 矩阵乘积
        result = np.dot(result, np.dot(A, B))  
      
    # 计算并返回迹  
    trace = np.trace(result)  
      
    # 结束计时  
    end_time = time.time()  
    print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")  

    return trace  

if __name__ == '__main__':
    # 调用函数  
    n = 16  # 最大矩阵大小  
    iterations = 16  # 迭代次数  
    # 开始时间
    start_time = time.time()  
    # 创建线程池执行器 最多16线程
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
        # 提交任务到线程池
        futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]
        
        # 使用wait函数等待所有任务完成
        # done, not_done = concurrent.futures.wait(futures, timeout=None)
        # for future in done:
        #     result = future.result()
        #     print('wait:',result)
     
        # # 使用as_completed函数按照完成顺序获取结果
        # for future in concurrent.futures.as_completed(futures):
        #     result = future.result()
        #     print('completed:',result)
    
    # 结束计时  
    end_time = time.time()  
    print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")  

执行结果:

计算完成 0,耗时: 0.000437 秒
计算完成 1,耗时: 0.001399 秒
计算完成 2,耗时: 0.001689 秒
计算完成 4,耗时: 0.005548 秒
计算完成 3,耗时: 0.006699 秒
计算完成 6,耗时: 0.000553 秒
计算完成 5,耗时: 0.010176 秒
计算完成 10,耗时: 0.000951 秒
计算完成 9,耗时: 0.000789 秒
计算完成 11,耗时: 0.001857 秒
计算完成 7,耗时: 0.006189 秒
计算完成 15,耗时: 0.004257 秒
计算完成 13,耗时: 0.004155 秒
计算完成 14,耗时: 0.007222 秒
计算完成 12,耗时: 0.006338 秒
计算完成 8,耗时: 0.007463 秒
计算完成,总共耗时: 0.030619

从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。

2. 并行进程ProcessPoolExecutor

import concurrent.futures
import numpy as np  
import time  

# 定义矩阵计算  
def complex_matrix_operation(n, iterations):  
    """  
    执行复杂矩阵运算来测试CPU性能。  
      
    参数:  
    n (int): 矩阵的大小,即n x n的矩阵。  
    iterations (int): 重复矩阵运算的次数。  
      
    返回:  
    float: 最终矩阵的迹(对角线元素之和)。  
    """  
    # 初始化随机矩阵  
    A = np.random.rand(n, n)  
    B = np.random.rand(n, n)  
      
    # 开始计时  
    start_time = time.time()  
      
    # 执行多次矩阵乘法  
    result = np.eye(n)  # 初始化为对角线矩阵  
    for _ in range(iterations): 
        # 矩阵乘积
        result = np.dot(result, np.dot(A, B))  
      
    # 计算并返回迹  
    trace = np.trace(result)  
      
    # 结束计时  
    end_time = time.time()  
    print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")  

    return trace  

if __name__ == '__main__':
    # 调用函数  
    n = 16  # 最大矩阵大小  
    iterations = 16  # 迭代次数  
    # 开始时间
    start_time = time.time()  
    # 创建线程池执行器 最多16进程
    with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:
        # 提交任务到线程池
        futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]
        
        # 使用wait函数等待所有任务完成
        # done, not_done = concurrent.futures.wait(futures, timeout=None)
        # for future in done:
        #     result = future.result()
        #     print('wait:',result)
     
        # # 使用as_completed函数按照完成顺序获取结果
        # for future in concurrent.futures.as_completed(futures):
        #     result = future.result()
        #     print('completed:',result)
    
    # 结束计时  
    end_time = time.time()  
    print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")  

执行结果:

计算完成 1,耗时: 0.000953 秒计算完成 2,耗时: 0.000691 秒
计算完成 0,耗时: 0.000837 秒计算完成 3,耗时: 0.000993 秒
计算完成 6,耗时: 0.001096 秒计算完成 8,耗时: 0.000336 秒计算完成 7,耗时: 0.000955 秒计算完成 4,耗时: 0.000751 秒计算完成 9,耗时: 0.000916 秒


计算完成 13,耗时: 0.000758 秒计算完成 12,耗时: 0.001723 秒计算完成 11,耗时: 0.000999 秒计算完成 10,耗时: 0.001201 秒计算完成 14,耗时: 0.001045 秒计算完成 5,耗时: 0.001212 秒


计算完成 15,耗时: 0.000172 秒

计算完成,总共耗时: 0.432435

从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。
进程用时比线程更多 ,如果把n调到256 ,差距更大。
ThreadPoolExecutor

计算完成,总共耗时: 4.962590

ProcessPoolExecutor

计算完成,总共耗时: 8.072993

以上是在笔记本Windows环境下执行。
在Linux 环境测试了一下,n = 1024 ,结果比较符合预期,因为是主要是计算量大,没有IO,没有网络访问,process的性能应该比thread要好:
process :

计算完成,总共耗时: 147.940822

thread:

计算完成,总共耗时: 263.970811

看Python进程情况,8个process进程,一个thread进程:

# ps -ef | grep python
root      6488  1353  0 16:26 pts/2    00:00:00 python current_process.py
root      6489  6488 81 16:26 pts/2    00:00:40 python current_process.py
root      6490  6488 62 16:26 pts/2    00:00:31 python current_process.py
root      6491  6488 80 16:26 pts/2    00:00:40 python current_process.py
root      6492  6488 81 16:26 pts/2    00:00:40 python current_process.py
root      6493  6488 82 16:26 pts/2    00:00:41 python current_process.py
root      6494  6488 78 16:26 pts/2    00:00:39 python current_process.py
root      6495  6488 64 16:26 pts/2    00:00:32 python current_process.py
root      6496  6488 77 16:26 pts/2    00:00:38 python current_process.py
root      6551 32716 99 16:26 pts/1    00:01:32 python current_thread.py
root      6845 10471  0 16:26 pts/0    00:00:00 grep --color=auto python

3.concurrent.futures 模块

concurrent.futures模块除了提供ThreadPoolExecutor和ProcessPoolExecutor之外,还提供了一些其他的函数来执行异步任务和处理结果。以下是一些常用的concurrent.futures模块函数:

(1)concurrent.futures.as_completed

concurrent.futures.as_completed(fs, timeout=None):
  • 接收一个可迭代的Future对象集合fs,返回一个生成器,在每个Future对象完成时产生结果。
  • 可选地指定timeout参数,用于限制等待结果的最长时间。

(2)concurrent.futures.wait

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
  • 接收一个可迭代的Future对象集合fs,等待所有的Future对象完成。
  • 可选地指定timeout参数,用于限制等待结果的最长时间。
  • 可选地指定return_when参数,用于指定何时返回结果,可选值包括FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETED。

4.Executor 基类

concurrent.futures模块中的Executor是一个抽象基类,用于表示执行器对象。它定义了一些共同的方法和行为,用于管理并发执行的任务。但要通过它的子类调用,而不是直接调用。

Executor类并不直接实例化,而是通过具体的子类如ThreadPoolExecutor和ProcessPoolExecutor来创建实例。

常用的Executor类方法:

(1)submit(fn, *args, **kwargs): 提交一个可调用对象和它的参数给执行器,返回一个Future对象,表示该任务的未来结果。

(2)map(fn, *iterables, timeout=None): 批量提交任务,并按原始迭代器的顺序返回结果。它类似于内置函数map(),但是可以异步地并发执行任务。

(3)shutdown(wait=True): 关闭执行器,不再接受新的任务。如果wait参数为True(默认值),则在所有任务完成后再关闭执行器。

(4)submit_to_executor(fn, executor, *args, **kwargs): 将任务提交给指定的执行器对象,并返回一个Future对象。

(5)map_to_executor(fn, executor, *iterables, timeout=None): 将任务批量提交给指定的执行器对象,并返回结果。

这些方法使得在执行任务时更加方便和灵活。可以根据具体的需求选择合适的方法和执行器类型。

5.Future 对象

concurrent.futures.Future是concurrent.futures模块中的一个类,用于表示一个异步任务的未来结果。它将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。

以下是concurrent.futures.Future的一些常用方法:

(1)result(): 等待并返回异步任务的结果。如果任务尚未完成,该方法会阻塞直到任务完成并返回结果。

(2)done(): 判断异步任务是否已经完成,返回布尔值。

(3)cancel(): 取消异步任务的执行。如果任务已经开始执行或已经完成,则无法取消。

(4)add_done_callback(fn): 添加一个回调函数,当异步任务完成时会调用该回调函数。

参考文章


版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐