首页 > Python资料 博客日记

Python 多线程编程详解:用 Threading 优雅提升程序并发性能

2024-11-05 01:00:08Python资料围观27

Python资料网推荐Python 多线程编程详解:用 Threading 优雅提升程序并发性能这篇文章给大家,欢迎收藏Python资料网享受知识的乐趣

Python 多线程编程详解:用 Threading 优雅提升程序并发性能

本文深入探讨了 Python 中多线程(Threading)的原理与应用,帮助读者更好地理解多线程在程序优化中的重要性。文章从多线程的基本概念入手,详细讲解了线程的创建与管理、join 方法的使用、线程之间的数据共享与同步、以及如何使用 Queue 实现线程间的数据传递。通过生动的代码示例,演示了如何在 Python 中有效利用多线程处理 I/O 密集型任务,以及如何通过线程锁(Lock)避免数据竞争问题。还解析了 GIL(全局解释器锁)对多线程性能的影响,为开发者提供了多线程编程的实战指导和性能优化的最佳实践。

一 多线程

Python 中的多线程(Threading)是一种允许程序在同一时间内执行多个操作的方式。它的核心思想是在程序中创建多个线程(Thread),每个线程可以独立地执行任务。

1 基本概念
  • 线程(Thread):线程是操作系统能够进行运算调度的最小单位,它包含在进程中,是进程的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和资源。
  • 多线程(Multithreading):在一个程序中创建多个线程,使它们并行运行,从而提升程序的效率。多线程特别适用于 I/O 密集型操作(如网络请求、文件读写)或多个独立任务需要同时执行的场景。
2 注意事项
  • 全局解释器锁(GIL):Python 的线程虽然可以实现并发,但由于 GIL 的存在,在 CPU 密集型任务中,多线程的性能提升可能不明显。对于 I/O 密集型任务,多线程仍然非常有用。
  • 数据竞争:多线程访问共享数据时,如果没有合适的同步机制(如锁),可能会出现数据竞争问题。

多线程的核心目标是提高程序的并发性,但在 Python 中使用时需要特别注意线程安全问题和 GIL 的限制。

二 线程的常用功能

1 线程模块
import threading
2 常规操作
# 获取已激活的线程数
print("已激活的线程数 ", threading.active_count())
# 查看所有线程信息
print("所有线程信息 ", threading.enumerate())
# 查看现在正在运行的线程
print("现在正在运行的线程 ", threading.current_thread())
3 创建线程
def thread_job():
    print(' 1 This is a thread of %s' % threading.current_thread())

def my_thread():
    # target 为目标函数,想要调用的任务方法。name 为当前线程名称。
    thread = threading.Thread(target=thread_job, name='我新建的线程')  # 定义线程
    thread.start()  # 让线程开始工作
    # time.sleep(10)

三 join 功能

1 不加 join 功能
def thread_job02():
    print(' 2 This is a thread of %s' % threading.current_thread())
    print("thread_job02 T1 start\n")
    for i in range(10):
        time.sleep(0.1)  # 任务间隔0.1s
    print("thread_job02 T1 finish\n")

def job02_thread():
    # join 功能 
    added_thread = threading.Thread(target=thread_job02, name='T1')
    added_thread.start()
    # 加 join 功能,放开注释就是加 join 的功能,和预想的结果一致。
    # added_thread.join()
    print("job02_thread all done\n")
    # join 控制多个线程的执行顺序

结果对比

# 预想的结果
thread_job02 T1 start
thread_job02 T1 finish
job02_thread all done
# 实际的结果
thread_job02 T1 start
job02_thread all done
thread_job02 T1 finish
2 加 join 的具体示例
def T1_job():
    print("T1_job T1 start\n")
    for i in range(10):
        time.sleep(0.1)
    print("T1_job T1 finish\n")


def T2_job():
    print("T2_job T2 start\n")
    print("T2_job T2 finish\n")


def T1T2_job_thread():
    thread_1 = threading.Thread(target=T1_job, name='T1')
    thread_2 = threading.Thread(target=T2_job, name='T2')
    thread_1.start()  # 开启T1
    thread_2.start()  # 开启T2
    # 不加 join 打印信息的前后顺序,取决去线程处理数据的速度
    # 在写 join 的时候可以把处理数据量少的写在前面,主要目的是减少主线程或其他依赖线程的等待时间
    thread_2.join()
    thread_1.join()
    print("T1T2_job_thread all done\n")

不加 join 打印信息的前后顺序,取决去线程处理数据的速度;在写 join 的时候可以把处理数据量少的写在前面,主要目的是减少主线程或其他依赖线程的等待时间

四 储存线程结果 Queue

1 导入线程队列的标准模块
from queue import Queue
2 定义被线程调用的函数
# 参数 一个列表l和一个队列q
def job(l, q):
    for i in range(len(l)):
        # 给列表元素作平方计算
        l[i] = l[i] ** 2
    q.put(l)  

注:多线程调用的函数不能用return返回值

3 定义多线程任务函数
def multithreading():
    q = Queue()  # q中存放返回值,代替return的返回值
    threads = []
    data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]

    for i in range(4):
        # 定义四个线程, Thread首字母要大写,被调用的job函数没有括号,只是一个索引,参数在后面
        t = threading.Thread(target=job, args=(data[i], q))
        # 开始线程
        t.start()
        # 把每个线程append到线程列表中
        threads.append(t)

    for thread in threads:
        # 阻塞当前线程,主线程暂停执行(或调用 join() 的线程)
        thread.join()
    results = []
    for _ in range(4):
        results.append(q.get())  # q.get()按顺序从q中拿出一个值
    print(results)

五 GIL 全局解释器锁

GIL(Global Interpreter Lock,全局解释器锁)是 Python 的一种机制,它限制了同一时刻只有一个线程可以执行 Python 字节码。这意味着在多线程的情况下,即使系统有多个 CPU 核心,也只能有一个线程在同一时间执行 Python 代码。

def job02(l, q):
    res = sum(l)
    q.put(res)


def multithreading02(l):
    q = Queue()
    threads = []
    for i in range(4):
        t = threading.Thread(target=job02, args=(copy.copy(l), q), name='T%i' % i)
        t.start()
        threads.append(t)
    [t.join() for t in threads]
    total = 0
    for _ in range(4):
        total += q.get()
    print(total)


def normal02(l):
    total = sum(l)
    print(total)

运行时间对比

if __name__ == '__main__':
    l = list(range(1000000))
    s_t = time.time()
    normal02(l * 4)
    print('normal: ', time.time() - s_t)
    s_t = time.time()
    multithreading02(l)
    print('multithreading: ', time.time() - s_t)

GIL 不一定有效率,multithreading02normal02 运行了一样多次的运算,multithreading02` 并没有快多少,甚至有时候会慢。运行结果如下

1999998000000
normal02:  0.03085780143737793
1999998000000
multithreading02:  0.033220767974853516
划重点

线程安全: GIL 的主要目的是在 CPython(Python 的主流解释器实现)中简化内存管理,避免多线程访问共享数据时的竞争问题。GIL 确保了同一时刻只有一个线程能够执行 Python 代码,从而避免了多线程操作内存时的冲突。

六 线程锁 Lock

1 不使用 Lock 的情况
A = 0

def job1_G():
    global A
    for i in range(10):
        A += 1
        print('job1', A)

def job2_G():
    global A
    for i in range(10):
        A += 10
        print('job2', A)

def job_1_2_G():
    t1 = threading.Thread(target=job1_G)
    t2 = threading.Thread(target=job2_G)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

运行结果无序资源竞争激烈,如下

job1 1
job1 job2 12
job2 22
job2 32
job2 42
job2 52
2
job1 53
job1 54
job1 55
job1 56
job1 57
job1job2 68 
58
job1job2 79
job2  69
89
job2 99
job2 109
job1 110
2 使用 Lock 的情况

划重点

Lock 在同一时刻只用一个线程访问共享内存,防止 “竞态条件” 问题,它是把互斥锁。

  • 竞态条件: 当多个线程同时操作共享资源时,如果没有适当的同步机制,线程间的执行顺序可能会相互交错,导致数据竞争或错误。
  • 互斥锁(Lock): 互斥锁是一种同步原语,确保在同一时刻只有一个线程可以访问某个资源,其他线程在锁被占用时无法访问受保护的资源。
A = 0
lock = threading.Lock()

def job1_L():
    global A, lock
    lock.acquire()
    try:
        for i in range(10):
            A += 1
            print('job1', A)
    finally:
        lock.release()

def job2_L():
    global A, lock
    # 使用 with 语句可以简化 acquire 和 release 的过程,使代码更简洁清晰。推荐
    with lock:
        for i in range(10):
            A += 10
            print('job2', A)

def job_1_2_L():
    t1 = threading.Thread(target=job1_L)
    t2 = threading.Thread(target=job2_L)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

运行结果有序资源使用合理,如下

job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110

七 完整代码示例

:建议在运行 main.py 对应的代码功能时,逐行使用注释进行操作。

# This is a sample Python script.

# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.

import threading
import time
from queue import Queue
import copy


def print_hi(name):
    # Use a breakpoint in the code line below to debug your script.
    print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.

    # 添加线程
    # 获取已激活的线程数
    print("已激活的线程数 ", threading.active_count())
    # 查看所有线程信息
    print("所有线程信息 ", threading.enumerate())
    # 查看现在正在运行的线程
    print("现在正在运行的线程 ", threading.current_thread())


def thread_job():
    print(' 1 This is a thread of %s' % threading.current_thread())


def my_thread():
    # target 为目标函数,想要调用的任务方法。name 为当前线程名称。
    thread = threading.Thread(target=thread_job, name='我新建的线程')  # 定义线程
    thread.start()  # 让线程开始工作
    # time.sleep(10)


def thread_job02():
    print(' 2 This is a thread of %s' % threading.current_thread())
    print("thread_job02 T1 start\n")
    for i in range(10):
        time.sleep(0.1)  # 任务间隔0.1s
    print("thread_job02 T1 finish\n")


def job02_thread():
    # join 功能
    # 不加 join 功能
    # 预想的结果
    # T1 start
    # T1 finish
    # all done
    # 实际的结果
    # T1 start
    # all done
    # T1 finish
    added_thread = threading.Thread(target=thread_job02, name='T1')
    added_thread.start()
    # 加 join 功能
    added_thread.join()
    print("job02_thread all done\n")
    # join 控制多个线程的执行顺序


def T1_job():
    print("T1_job T1 start\n")
    for i in range(10):
        time.sleep(0.1)
    print("T1_job T1 finish\n")


def T2_job():
    print("T2_job T2 start\n")
    print("T2_job T2 finish\n")


def T1T2_job_thread():
    thread_1 = threading.Thread(target=T1_job, name='T1')
    thread_2 = threading.Thread(target=T2_job, name='T2')
    thread_1.start()  # 开启T1
    thread_2.start()  # 开启T2
    # 不加 join 打印信息的前后顺序,取决去线程处理数据的速度
    # 在写 join 的时候可以把处理数据量少的写在前面,主要目的是减少主线程或其他依赖线程的等待时间
    thread_2.join()
    thread_1.join()
    print("T1T2_job_thread all done\n")


# 参数 一个列表l和一个队列q
def job(l, q):
    for i in range(len(l)):
        # 给列表元素作平方计算
        l[i] = l[i] ** 2
    q.put(l)  # 多线程调用的函数不能用return返回值


def multithreading():
    q = Queue()  # q中存放返回值,代替return的返回值
    threads = []
    data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]

    for i in range(4):
        # 定义四个线程, Thread首字母要大写,被调用的job函数没有括号,只是一个索引,参数在后面
        t = threading.Thread(target=job, args=(data[i], q))
        # 开始线程
        t.start()
        # 把每个线程append到线程列表中
        threads.append(t)

    for thread in threads:
        # 阻塞当前线程,主线程暂停执行(或调用 join() 的线程)
        thread.join()
    results = []
    for _ in range(4):
        results.append(q.get())  # q.get()按顺序从q中拿出一个值
    print(results)


def job02(l, q):
    res = sum(l)
    q.put(res)


def multithreading02(l):
    q = Queue()
    threads = []
    for i in range(4):
        t = threading.Thread(target=job02, args=(copy.copy(l), q), name='T%i' % i)
        t.start()
        threads.append(t)
    [t.join() for t in threads]
    total = 0
    for _ in range(4):
        total += q.get()
    print(total)


def normal02(l):
    total = sum(l)
    print(total)


def job1_G():
    global A
    for i in range(10):
        A += 1
        print('job1', A)


def job2_G():
    global A
    for i in range(10):
        A += 10
        print('job2', A)


A = 0
lock = threading.Lock()


def job_1_2_G():
    t1 = threading.Thread(target=job1_G)
    t2 = threading.Thread(target=job2_G)
    t1.start()
    t2.start()
    t1.join()
    t2.join()


def job1_L():
    global A, lock
    lock.acquire()
    try:
        for i in range(10):
            A += 1
            print('job1', A)
    finally:
        lock.release()


def job2_L():
    global A, lock
    # 使用 with 语句可以简化 acquire 和 release 的过程,使代码更简洁清晰。推荐
    with lock:
        for i in range(10):
            A += 10
            print('job2', A)


def job_1_2_L():
    t1 = threading.Thread(target=job1_L)
    t2 = threading.Thread(target=job2_L)
    t1.start()
    t2.start()
    t1.join()
    t2.join()


if __name__ == '__main__':
    print_hi('什么是多线程 Threading')
    my_thread()
    job02_thread()
    T1T2_job_thread()
    multithreading()
    
    l = list(range(1000000))
    s_t = time.time()
    normal02(l * 4)
    print('normal02: ', time.time() - s_t)
    s_t = time.time()
    multithreading02(l)
    print('multithreading02: ', time.time() - s_t)

    job_1_2_G()
    job_1_2_L()

# See PyCharm help at https://www.jetbrains.com/help/pycharm/

复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。

Hi, 什么是多线程 Threading
已激活的线程数  1
所有线程信息  [<_MainThread(MainThread, started 8006736960)>]
现在正在运行的线程  <_MainThread(MainThread, started 8006736960)>
 1 This is a thread of <Thread(我新建的线程, started 6150959104)>
 2 This is a thread of <Thread(T1, started 6150959104)>
thread_job02 T1 start

thread_job02 T1 finish

job02_thread all done

T1_job T1 start

T2_job T2 start

T2_job T2 finish

T1_job T1 finish

T1T2_job_thread all done

[[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]
1999998000000
normal02:  0.042340993881225586
1999998000000
multithreading02:  0.027905941009521484
job1 1
job1 2
job1 3
job1 4
job1 5
job1job2 16
job2 26
job2 36
job2 46
job2 56
job2 66
job2 76
job2 86
job2 96
job2 106
 6
job1 107
job1 108
job1 109
job1 110
job1 111
job1 112
job1 113
job1 114
job1 115
job1 116
job1 117
job1 118
job1 119
job1 120
job2 130
job2 140
job2 150
job2 160
job2 170
job2 180
job2 190
job2 200
job2 210
job2 220

八 源码地址

代码地址:

国内看 Gitee什么是多线程 Threading.py

国外看 GitHub什么是多线程 Threading.py

引用 莫烦 Python


版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐