首页 > Python资料 博客日记
python asyncio (协程、异步编程)
2024-10-27 10:00:06Python资料围观42次
文章目录
1. 简介
asyncio (Asynchronous I/O)模块是一个异步代码库,它提供了一种基于协程(coroutine)和事件循环(event loop)的编程模型。是很多python异步架构的基础,多用于处理高并发网络请求方面的问题
2. 什么是协程
协程(coroutine) 也被称为轻量级线程,是一种可以在单线程中实现多任务的编程方式(在用户态切换上下文)。它是一种特殊的函数或者方法,可以在执行过程中暂停,将控制权交还给调用者。
这使得协程能够在执行过程中等待某些事件发生,比如等待 IO 操作完成,而不会阻塞其他任务的执行
3. 为何异步
- 提高程序性能:异步编程可以充分利用计算资源,减少因为等待I/O操作而导致的阻塞
- 增强用户体验:在设计网络操作,用户界面或任何涉及等待耗时操作的情况下,异步编程可以使程序响应更加灵敏
- 节省资源:相对于创建大量线程或进程来处理并发任务,异步编程使用的资源更少
4. 如何异步
注意:下面使用的例子都是基于 Python3.5+ 来实现的,不适用于之前的 Python 版本。
4.1 简单示例
在学习异步之前,先看如下两个例子的对比,可以更直观的看出使用异步的优势。
例子一:
import time
def func1():
print('1')
time.sleep(1)
print('2')
def func2():
print('3')
time.sleep(1)
print('4')
func1()
func2()
输出如下:这里立即输出了 1,等待了 1s 后,又输出了 2 和 3,然后又等待了 1s, 最后输出了 4
1 # 这里立马输出 1,等待 1s 后才输出 2
2
3 # 2 和 3 基本同时输出,然后等待 1s 后才输出 4
4
例子二:
import asyncio
async def func1():
print('1')
await asyncio.sleep(1)
print('2')
async def func2():
print('3')
await asyncio.sleep(1)
print('4')
task_list = [func1(), func2()]
asyncio.run( asyncio.wait(task_list))
输出如下:这里立即输出了 3 和 1,等待了 1s 后,输出了 4 和 2
3 # 这里 3 和 1基本同时输出,等待 1s 后,4 和 2
1
4
2
第一个例子想必都能看懂,函数依次调用,遇到 time.sleep() 函数,则等待指定时间,然后继续执行,第一个例子大概用时 2s
第二个例子则是使用了 asyncio 异步库,遇到 asyncio.sleep() 函数,则切换任务执行(先暂停执行这个函数,切换到别的函数执行,当指定等待的时间间隔过去后,在继续执行这个函数),第二个例子大概用时 1s
4.2 事件循环
可以简单的理解为一个 while True 循环(死循环),它会循环检测执行某些代码。
只有当任务列表中的所有任务全部执行完,才会退出循环.
import asyncio
async def func():
print("123456")
# 获取一个事件循环的函数,异步事件循环可以管理和调度异步任务的执行
loop = asyncio.get_event_loop()
# 将任务放到 '任务列表',它会一直运行事件循环,直到 result 被完成为止
# 这里的 result 应该是一个可等待对象(协程对象 或者 Future对象)
# loop.run_until_complete(result) # 这行是伪代码,真正的代码示例如下
loop.run_until_complete(func())
# 输出如下:
123456
4.3 协程函数和协程对象
协程函数:使用 async def
语法定义,例如:async def func():
协程对象:调用 协程函数 返回一个协程对象.
# 这是一个协程函数
async def func():
pass
# result 是一个协程对象 (注意:函数内部代码不会执行,只是返回一个协程对象)
result = func()
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理
import asyncio
async def func():
print("执行协程函数 func")
result = func()
"""
loop = asyncio.get_event_loop() # 获取事件循环
loop.run_until_comlete( result ) # 运行事件循环
"""
# 在 python3.7 之后,可以省略上面两行,直接执行下面一行函数即可
asyncio.run(result)
4.4 await 关键字
await
关键字会暂停当前协程的执行(这个关键字只能在协程函数中使用
),让出事件循环的控制权,以便其它协程或异步任务可以执行,一旦被等待的对象完成,await 表达式将返回对象的结果,并且当前协程将继续执行。
await
+ 可等待的对象(协程对象、Task对象、Future对象)
同一个协程任务中,多个await
, 会依次等待可等待对象执行完成;不同协程任务中,遇到await会交替执行。
例子一:
import asyncio
async def func():
print("func() start")
await asyncio.sleep(2) # 这里模拟IO操作
print("func() end")
return "我是 func()"
async def main():
print("执行协程函数 main()")
# 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务)
result = await func()
print("io请求结果:", result)
asyncio.run(main())
执行结果如下:
执行协程函数 main()
func() start
func() end
io请求结果: 我是 func()
例子二:
一个协程函数中可以有多个 await
,但是 await
是等待对应的函数得到结果后,才继续向下执行(同一协程任务中)
import asyncio
async def func(flags):
print("func() start")
await asyncio.sleep(2) # 这里模拟IO操作
print("func() end")
return "我是 func(" + flags + ")"
async def main():
print("执行协程函数 main()")
# 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务)
result = await func("1")
print("IO第一次的请求结果:", result)
result = await func("2")
print("IO第二次的请求结果:", result)
asyncio.run(main())
执行结果如下:
执行协程函数 main()
func() start
func() end
IO第一次的请求结果: 我是 func(1)
func() start
func() end
IO第二次的请求结果: 我是 func(2)
4.5 Task 对象
Task 对象用于表示一个可并发执行的异步任务。它是 asyncio 中的一个重要概念,用于管理协程的执行。Task对象用于包装协程。
说简单点就是让协程加入事件循环中等待被调度执行,或者说在事件循环中添加任务等待被调度执行。事件循环会在适当的时机执行协程,并在协程遇到阻塞操作时挂起,转而执行其他可运行的任务,当阻塞操作完成,事件循环会恢复挂起的协程执行。
asyncio.create_task() 函数是将协程对象包装成Task对象,该任务将被添加到默认的事件循环中,如果当前没有活动的事件循环,则会引发 RuntimeError 异常。
除了asyncio.create_task() 函数,还可以用低层级的 loop.create_task() 或 asyncio.ensure_future() 函数。
● asyncio.create_task() 是一个全局函数,不依赖特定的事件循环。且它在Python3.7 中才被引入
● loop.create_task() 是事件循环的方法,需要先获取事件循环的引用
● asyncio.ensure_future() 虽然也是一个全局函数,但是它返回的是Future对象。python3.7 之前使用这个函数
例子一:
这里和 3.4 await 关键字: 例子二 中不同的是,这里通过 asyncio.create_task()
函数将多个协程任务添加到事件循环中,这里事件循环中是不同的协程任务,所以这里会交替执行。
import asyncio
async def func(arg):
print(1)
await asyncio.sleep(2)
print(2)
return "返回值: " + arg
async def main():
print("开始创建Task对象")
# 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环
task1 = asyncio.create_task(func("task1"))
task2 = asyncio.create_task(func("task2"))
print("创建Task对象结束")
# 当执行某协程遇到IO操作时, 会自动切换到其他任务
# 此处的 await 是等待Task对象全都执行完毕并获取结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
执行结果如下:
开始创建Task对象
创建Task对象结束
1
1
2
2
返回值: task1 返回值: task2
例子二:
asyncio.wait()
是asyncio
模块中的一个函数,用于等待一组可等待对象完成,它返回一个Future
对象,表示等待任务完成的结果。可以将任务列表(或者其他可迭代对象)作为参数传递给asyncio.wait()
函数。
import asyncio
async def func(arg):
print(1)
await asyncio.sleep(2)
print(2)
return "返回值" + arg
async def main():
print("开始创建Task对象")
# 创建任务列表 task_list,列表中的每个元素都是一个Task对象
task_list = [
asyncio.create_task(func("task1")),
asyncio.create_task(func("task2"))
]
print("创建Task对象结束")
# done: 是完成的 Task 对象,
# pending: 是未完成的 Task 对象
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
print(pending)
asyncio.run( main() )
执行结果如下
开始创建Task对象
创建Task对象结束
1
1
2
2
# 这是已经完成的Task对象
{<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>}
# 这是未完成的Task对象,set() 表示集合为空(没有未完成的任务)
set()
asyncio.create_task()
函数有一个 name 的字段可以为创建的任务指定一个可选的名称,这样可以更容易和的识别和追踪特定的任务。
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("开始创建Task对象")
# 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环
# name 给对象任务列表起名字,方便区分哪个任务结束
task_list = [
asyncio.create_task(func(), name="n1"),
asyncio.create_task(func(), name="n2")
]
print("创建Task对象结束")
# done 是完成的 Task 对象,
# pending 是未完成的 Task 对象
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
asyncio.run( main() )
# 输出如下:
开始创建Task对象
创建Task对象结束
1
1
2
2
{<Task finished name='n1' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>}
例子三:
例子三 和 例子二 相比,task_list 中直接写任务名称,没有通过 asyncio.create_task()
创建Task对象,这是因为create_task()
函数创建任务之后会自动添加到事件循环中。这里定义的 task_list 列表还没有创建事件循环。并且 task_list 列表中的任务也并不需要手动调用 asyncio.create_task()
函数,因为 asyncio.task()
函数会隐式的将协程对象转换为任务(Task对象)。
在asyncio.wait()
的参数中,可以直接包含协程对象,而不必显示的将其转换为任务,它会自动的将协程对象封装成任务,并进行等待和管理。
import asyncio
async def func(arg):
print(1)
await asyncio.sleep(2)
print(2)
return "返回值" + arg
task_list = [
func("task1"),
func("task2")
]
"""
这里这样写是错误的,因为asyncio.create_task()是将协程对象包装成任务,并添加到
事件循环中,但是这里事件循环还没有创建(asyncio.run() 才会创建事件循环),所以会报错
task_list = [
asyncio.create_task(func("task1")),
asyncio.create_task(func("task2"))
]
"""
done, pending = asyncio.run( asyncio.wait(task_list))
print(done)
执行结果如下:
1
1
2
2
{<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>}
4.6 asyncio 的 Future 对象
asyncio
中的 Future
对象是一个更偏向底层的可等待对象,代表异步任务的最终结果。通常不会直接用到这个对象,而是直接使用Task对象来完成任务的创建和状态的追踪。
它是 Task 对象的基类,Task 继承 Future, Task 对象内部 await
结果的处理基 于Future
对象来的。
import asyncio
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么都不做
fut = loop.create_future()
# 等待任务结果(Future 对象),没结果会一直等待下去
await fut
asyncio.run( main() )
例子一:
import asyncio
async def set_after(fut: asyncio.Future):
await asyncio.sleep(2)
#
fut.set_result("success")
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),没有绑定任何行为,这个任务永远不知道什么时候结束
fut = loop.create_future()
# 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在等待 2s 后,会给 fut 赋值
# 即手动设置 Future 任务的最终结果,那么 fut 就可以结束了
await loop.create_task(set_after(fut))
# 等待 Future 对象获取最终结果,否则一直等待下去
# 这里等待 Future 对象的时候,遇到阻塞操作(因为 Future 对象没有结果会一直等待下去), 所以会切换到 set_after 任务去执行
# 而 set_after 任务在等待 2s 后,会给 Future 对象赋值,所以此时 Future 对象返回, 代码执行结束
data = await fut
print(data)
asyncio.run( main() )
例子二:
在 asyncio
模块中,还可以使用 asyncio.Future()
函数来创建一个 Future
对象。它与 loop.create_future()
创建的 Future
对象不同的是,loop.create_future()
创建的对象与特定的事件循环相关联,这意味着,该 Future
对象只能在创建它的事件循环中使用,而 asyncio.Future()
创建的 Future
对象没有特定事件循环想关联,所以可以在任何事件循环中使用。
import asyncio
async def task1(future):
await asyncio.sleep(2)
future.set_result('Task 1 completed')
async def task2(future):
await asyncio.sleep(3)
future.set_result('Task 2 completed')
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# 启动两个任务,并共享同一个 Future 对象
asyncio.create_task(task1(fut))
asyncio.create_task(task2(fut))
# 等待 Future 对象的结果
result = await fut
print(result)
asyncio.run(main())
在上述示例中,我们定义了两个任务函数 task1() 和 task2(),它们分别会在一段时间后设置共享的Future对象的结果。在main()函数中,我们获取当前的事件循环对象,并使用loop.create_future()创建了一个 Future 对象 fut。然后,通过 asyncio.create_task() 函数启动了两个任务,它们共享同一个 Future 对象 fut。最后,使用 await 关键字等待 fut 的结果,并输出结果。
在这个示例中,loop.create_future() 的优势是可以将一个 Future 对象传递给多个协程,使得多个协程可以共享同一个 Future 对象,并在不同的时间点设置该对象的结果。这样可以实现更灵活的协同处理,例如在一个协程中等待多个任务完成后再继续执行。
4.7 concurrent.futures 的 Future 对象
首先,这个对象和 asyncio.Future 对象没有任何关系,它是使用线程池、进程池实现异步操作时用到的对象.
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)
他们为不同的应用场景设计,但是 Python 提供了一个将 futures.Future
对象包装成 asyncio.Future
对象的函数 asyncio.warp_future
。
import time
import asyncio
import concurrent.futures
def func1():
# 某个耗时操作
time.sleep(2)
return "123"
async def main():
loop = asyncio.get_running_loop()
# 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
# 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
# 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
# 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
fut = loop.run_in_executor(None, func1)
result = await fut
print('default thread pool', result)
# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom thread pool', result)
# 3. Run in a custom process pool:
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom process pool', result)
asyncio.run(main())
loop.run_in_executor()
函数上面注释已经说清楚了,其实就是内部调用了 线程池或者进程池的submit
方法,然后调用了 asyncio.wrap_future()
,将起包装成了一个 asyncio 的 Future 对象。
loop.run_in_executor()
的作用就是将阻塞的函数委托给线程或进程池,在异步环境中运行它,以避免阻塞事件循环。它返回一个asyncio.Future
对象,表示在执行器中运行的函数的结果。
注意:我通过查看代码,发现里面调用的是 futures.wrap_future() 函数,而不是 asyncio.wrap_future()函数,可能不同版本之间不一样吧,但是这两个函数的功能是一样的,都是将concurrent.futures.Future对象包装为asyncio.Future对象,以便在 asyncio 的事件循环中进行异步处理,两者之间的主要区别在于模块的不同,一个是concurrent.futures模块中的函数,另一个是asyncio模块中的函数。
下面这个例子是 asyncio 模块加上不支持异步模块的混合使用。
import requests
import asyncio
import os
async def download_images(url):
# 发送网络请求,下载图片,遇到网络IO,自动切换其他任务
print("开始下载:", url)
loop = asyncio.get_event_loop()
# requests 模块默认不支持异步操作,所以使用线程池来配合实现
future = loop.run_in_executor(None, requests.get, url)
response = await future
print("下载完成")
file_name = url.rsplit("_")[-1]
with open(file_name, mode = 'wb') as f:
f.write(response.content)
if __name__ == '__main__':
url_list = [
"https://www3.autoimg.cn/newsdfs/g26/M04/B7/07/1488x0_1_autohomecar__CjIFVmR2EjmAV2uOAAJsOzOueIg341.jpg",
"https://car2.autoimg.cn/cardfs/product/g24/M06/76/C9/1488x0_1_autohomecar__Chtk3WQQgK-AA6DDACIn_NIMSLw990.jpg",
"https://www2.autoimg.cn/newsdfs/g26/M08/82/C6/1488x0_1_autohomecar__ChxkjmR2BUuADxtRAAVgtryBmXQ532.jpg"
]
tasks = [ download_images(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
4.8 异步迭代器
异步迭代器: 实现了 __aiter__()
和 __anext__()
方法的对象。__anext__
必须返回一个 awaitable
对象。async for
会处理异步迭代器的__anext__()
方法所返回的可等待对象,直到其引发一个 StopAsyncIteration
异常。由 PEP 492 引入。
awaitable
对象 是指在异步编程中可以使用 await 关键字进行等待操作的对象
异步可迭代对象: 可在 async for
语句中被使用的对象。必须通过它的 __aiter__()
方法返回一个 asynchronous iterator
。由 PEP 492 引入。
以上的概念从该链接复制:python 术语对照表
import asyncio
class Reader(object):
""" 自定义异步迭代器, 同时也是异步可迭代对象"""
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val is None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
# async for 语句必须写在协程函数里面
async for item in obj:
print(item)
asyncio.run(func())
4.9 异步上下文管理器
上下文管理器 是Python 中用于管理资源的一种机制,它提供了一种方便的方式来管理资源的获取和释放,无论是在正常情况下还是发生异常的情况。它可以使用 with 语句来创建一个上下文,并确保在离开该上下文时正确处理资源.
异步上下文管理器 它通过定义 __aenter__()
和 __aexit__()
方法来对 async with
语句中的环境进行控制。由 PEP 492 引入。
import asyncio
class AsyncContextManger:
def __init__(self, conn):
self.conn = conn
async def do_something(self):
# 异步操作数据库
return 666
async def __aenter__(self):
# 异步链接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
# 异步关闭数据库链接
await asyncio.sleep(1)
# async with 方法需要在异步函数内进行使用
# 当使用是会执行当前类中得__aenter__ 这个方法返回什么那么f就是什么[可以进行设置数据库链接]
# 当上下文完成后 就会自动使用__aexit__方法[关闭数据库链接]
async def func():
async with AsyncContextManger() as f:
result = await f.do_something()
print (result)
asyncio.run(func())
4.10 uvloop
uvloop
是 asyncio
的事件循环的替代方案。是第三方的人员写的。它的事件循环效率是大于默认 asyncio
的事件循环的, 性能更高.
注意:windows 下面并没有 uvloop,在 Linux 上面是可以安装成功的
pip3 install uvloop
import asyncio
import uvloop
# TODO:将 asyncio 里面的事件循环替换为 uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写 asyncio 的代码
# 内部循环事件会自动化变为 uvloop
asyncio.run( ... )
4.11 异步库
在一些情况下,我们可能需要将 asyncio
与其他异步框架一起使用,asyncio
支持与其它库一起使用,例如aioredis、aiomysql、aiohttp
等,这些库都实现了 asyncio
的协议,并且能够与 asyncio
无缝的协作。
pip3 install aioredis # 异步操作redis的库
pip3 install aiomysql # 异步操作mysql的库
pip3 install aiohttp # 异步编写http的库
标签:
相关文章
最新发布
- 【Python】selenium安装+Microsoft Edge驱动器下载配置流程
- Python 中自动打开网页并点击[自动化脚本],Selenium
- Anaconda基础使用
- 【Python】成功解决 TypeError: ‘<‘ not supported between instances of ‘str’ and ‘int’
- manim边学边做--三维的点和线
- CPython是最常用的Python解释器之一,也是Python官方实现。它是用C语言编写的,旨在提供一个高效且易于使用的Python解释器。
- Anaconda安装配置Jupyter(2024最新版)
- Python中读取Excel最快的几种方法!
- Python某城市美食商家爬虫数据可视化分析和推荐查询系统毕业设计论文开题报告
- 如何使用 Python 批量检测和转换 JSONL 文件编码为 UTF-8
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Python与PyTorch的版本对应
- Anaconda版本和Python版本对应关系(持续更新...)
- Python pyinstaller打包exe最完整教程
- Could not build wheels for llama-cpp-python, which is required to install pyproject.toml-based proj