首页 > Python资料 博客日记
python asyncio (协程、异步编程)
2024-10-27 10:00:06Python资料围观87次
文章目录
1. 简介
asyncio (Asynchronous I/O)模块是一个异步代码库,它提供了一种基于协程(coroutine)和事件循环(event loop)的编程模型。是很多python异步架构的基础,多用于处理高并发网络请求方面的问题
2. 什么是协程
协程(coroutine) 也被称为轻量级线程,是一种可以在单线程中实现多任务的编程方式(在用户态切换上下文)。它是一种特殊的函数或者方法,可以在执行过程中暂停,将控制权交还给调用者。
这使得协程能够在执行过程中等待某些事件发生,比如等待 IO 操作完成,而不会阻塞其他任务的执行
3. 为何异步
- 提高程序性能:异步编程可以充分利用计算资源,减少因为等待I/O操作而导致的阻塞
- 增强用户体验:在设计网络操作,用户界面或任何涉及等待耗时操作的情况下,异步编程可以使程序响应更加灵敏
- 节省资源:相对于创建大量线程或进程来处理并发任务,异步编程使用的资源更少
4. 如何异步
注意:下面使用的例子都是基于 Python3.5+ 来实现的,不适用于之前的 Python 版本。
4.1 简单示例
在学习异步之前,先看如下两个例子的对比,可以更直观的看出使用异步的优势。
例子一:
import time
def func1():
print('1')
time.sleep(1)
print('2')
def func2():
print('3')
time.sleep(1)
print('4')
func1()
func2()
输出如下:这里立即输出了 1,等待了 1s 后,又输出了 2 和 3,然后又等待了 1s, 最后输出了 4
1 # 这里立马输出 1,等待 1s 后才输出 2
2
3 # 2 和 3 基本同时输出,然后等待 1s 后才输出 4
4
例子二:
import asyncio
async def func1():
print('1')
await asyncio.sleep(1)
print('2')
async def func2():
print('3')
await asyncio.sleep(1)
print('4')
task_list = [func1(), func2()]
asyncio.run( asyncio.wait(task_list))
输出如下:这里立即输出了 3 和 1,等待了 1s 后,输出了 4 和 2
3 # 这里 3 和 1基本同时输出,等待 1s 后,4 和 2
1
4
2
第一个例子想必都能看懂,函数依次调用,遇到 time.sleep() 函数,则等待指定时间,然后继续执行,第一个例子大概用时 2s
第二个例子则是使用了 asyncio 异步库,遇到 asyncio.sleep() 函数,则切换任务执行(先暂停执行这个函数,切换到别的函数执行,当指定等待的时间间隔过去后,在继续执行这个函数),第二个例子大概用时 1s
4.2 事件循环
可以简单的理解为一个 while True 循环(死循环),它会循环检测执行某些代码。
只有当任务列表中的所有任务全部执行完,才会退出循环.
import asyncio
async def func():
print("123456")
# 获取一个事件循环的函数,异步事件循环可以管理和调度异步任务的执行
loop = asyncio.get_event_loop()
# 将任务放到 '任务列表',它会一直运行事件循环,直到 result 被完成为止
# 这里的 result 应该是一个可等待对象(协程对象 或者 Future对象)
# loop.run_until_complete(result) # 这行是伪代码,真正的代码示例如下
loop.run_until_complete(func())
# 输出如下:
123456
4.3 协程函数和协程对象
协程函数:使用 async def
语法定义,例如:async def func():
协程对象:调用 协程函数 返回一个协程对象.
# 这是一个协程函数
async def func():
pass
# result 是一个协程对象 (注意:函数内部代码不会执行,只是返回一个协程对象)
result = func()
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理
import asyncio
async def func():
print("执行协程函数 func")
result = func()
"""
loop = asyncio.get_event_loop() # 获取事件循环
loop.run_until_comlete( result ) # 运行事件循环
"""
# 在 python3.7 之后,可以省略上面两行,直接执行下面一行函数即可
asyncio.run(result)
4.4 await 关键字
await
关键字会暂停当前协程的执行(这个关键字只能在协程函数中使用
),让出事件循环的控制权,以便其它协程或异步任务可以执行,一旦被等待的对象完成,await 表达式将返回对象的结果,并且当前协程将继续执行。
await
+ 可等待的对象(协程对象、Task对象、Future对象)
同一个协程任务中,多个await
, 会依次等待可等待对象执行完成;不同协程任务中,遇到await会交替执行。
例子一:
import asyncio
async def func():
print("func() start")
await asyncio.sleep(2) # 这里模拟IO操作
print("func() end")
return "我是 func()"
async def main():
print("执行协程函数 main()")
# 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务)
result = await func()
print("io请求结果:", result)
asyncio.run(main())
执行结果如下:
执行协程函数 main()
func() start
func() end
io请求结果: 我是 func()
例子二:
一个协程函数中可以有多个 await
,但是 await
是等待对应的函数得到结果后,才继续向下执行(同一协程任务中)
import asyncio
async def func(flags):
print("func() start")
await asyncio.sleep(2) # 这里模拟IO操作
print("func() end")
return "我是 func(" + flags + ")"
async def main():
print("执行协程函数 main()")
# 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务)
result = await func("1")
print("IO第一次的请求结果:", result)
result = await func("2")
print("IO第二次的请求结果:", result)
asyncio.run(main())
执行结果如下:
执行协程函数 main()
func() start
func() end
IO第一次的请求结果: 我是 func(1)
func() start
func() end
IO第二次的请求结果: 我是 func(2)
4.5 Task 对象
Task 对象用于表示一个可并发执行的异步任务。它是 asyncio 中的一个重要概念,用于管理协程的执行。Task对象用于包装协程。
说简单点就是让协程加入事件循环中等待被调度执行,或者说在事件循环中添加任务等待被调度执行。事件循环会在适当的时机执行协程,并在协程遇到阻塞操作时挂起,转而执行其他可运行的任务,当阻塞操作完成,事件循环会恢复挂起的协程执行。
asyncio.create_task() 函数是将协程对象包装成Task对象,该任务将被添加到默认的事件循环中,如果当前没有活动的事件循环,则会引发 RuntimeError 异常。
除了asyncio.create_task() 函数,还可以用低层级的 loop.create_task() 或 asyncio.ensure_future() 函数。
● asyncio.create_task() 是一个全局函数,不依赖特定的事件循环。且它在Python3.7 中才被引入
● loop.create_task() 是事件循环的方法,需要先获取事件循环的引用
● asyncio.ensure_future() 虽然也是一个全局函数,但是它返回的是Future对象。python3.7 之前使用这个函数
例子一:
这里和 3.4 await 关键字: 例子二 中不同的是,这里通过 asyncio.create_task()
函数将多个协程任务添加到事件循环中,这里事件循环中是不同的协程任务,所以这里会交替执行。
import asyncio
async def func(arg):
print(1)
await asyncio.sleep(2)
print(2)
return "返回值: " + arg
async def main():
print("开始创建Task对象")
# 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环
task1 = asyncio.create_task(func("task1"))
task2 = asyncio.create_task(func("task2"))
print("创建Task对象结束")
# 当执行某协程遇到IO操作时, 会自动切换到其他任务
# 此处的 await 是等待Task对象全都执行完毕并获取结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
执行结果如下:
开始创建Task对象
创建Task对象结束
1
1
2
2
返回值: task1 返回值: task2
例子二:
asyncio.wait()
是asyncio
模块中的一个函数,用于等待一组可等待对象完成,它返回一个Future
对象,表示等待任务完成的结果。可以将任务列表(或者其他可迭代对象)作为参数传递给asyncio.wait()
函数。
import asyncio
async def func(arg):
print(1)
await asyncio.sleep(2)
print(2)
return "返回值" + arg
async def main():
print("开始创建Task对象")
# 创建任务列表 task_list,列表中的每个元素都是一个Task对象
task_list = [
asyncio.create_task(func("task1")),
asyncio.create_task(func("task2"))
]
print("创建Task对象结束")
# done: 是完成的 Task 对象,
# pending: 是未完成的 Task 对象
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
print(pending)
asyncio.run( main() )
执行结果如下
开始创建Task对象
创建Task对象结束
1
1
2
2
# 这是已经完成的Task对象
{<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>}
# 这是未完成的Task对象,set() 表示集合为空(没有未完成的任务)
set()
asyncio.create_task()
函数有一个 name 的字段可以为创建的任务指定一个可选的名称,这样可以更容易和的识别和追踪特定的任务。
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("开始创建Task对象")
# 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环
# name 给对象任务列表起名字,方便区分哪个任务结束
task_list = [
asyncio.create_task(func(), name="n1"),
asyncio.create_task(func(), name="n2")
]
print("创建Task对象结束")
# done 是完成的 Task 对象,
# pending 是未完成的 Task 对象
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
asyncio.run( main() )
# 输出如下:
开始创建Task对象
创建Task对象结束
1
1
2
2
{<Task finished name='n1' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>}
例子三:
例子三 和 例子二 相比,task_list 中直接写任务名称,没有通过 asyncio.create_task()
创建Task对象,这是因为create_task()
函数创建任务之后会自动添加到事件循环中。这里定义的 task_list 列表还没有创建事件循环。并且 task_list 列表中的任务也并不需要手动调用 asyncio.create_task()
函数,因为 asyncio.task()
函数会隐式的将协程对象转换为任务(Task对象)。
在asyncio.wait()
的参数中,可以直接包含协程对象,而不必显示的将其转换为任务,它会自动的将协程对象封装成任务,并进行等待和管理。
import asyncio
async def func(arg):
print(1)
await asyncio.sleep(2)
print(2)
return "返回值" + arg
task_list = [
func("task1"),
func("task2")
]
"""
这里这样写是错误的,因为asyncio.create_task()是将协程对象包装成任务,并添加到
事件循环中,但是这里事件循环还没有创建(asyncio.run() 才会创建事件循环),所以会报错
task_list = [
asyncio.create_task(func("task1")),
asyncio.create_task(func("task2"))
]
"""
done, pending = asyncio.run( asyncio.wait(task_list))
print(done)
执行结果如下:
1
1
2
2
{<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>}
4.6 asyncio 的 Future 对象
asyncio
中的 Future
对象是一个更偏向底层的可等待对象,代表异步任务的最终结果。通常不会直接用到这个对象,而是直接使用Task对象来完成任务的创建和状态的追踪。
它是 Task 对象的基类,Task 继承 Future, Task 对象内部 await
结果的处理基 于Future
对象来的。
import asyncio
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么都不做
fut = loop.create_future()
# 等待任务结果(Future 对象),没结果会一直等待下去
await fut
asyncio.run( main() )
例子一:
import asyncio
async def set_after(fut: asyncio.Future):
await asyncio.sleep(2)
#
fut.set_result("success")
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),没有绑定任何行为,这个任务永远不知道什么时候结束
fut = loop.create_future()
# 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在等待 2s 后,会给 fut 赋值
# 即手动设置 Future 任务的最终结果,那么 fut 就可以结束了
await loop.create_task(set_after(fut))
# 等待 Future 对象获取最终结果,否则一直等待下去
# 这里等待 Future 对象的时候,遇到阻塞操作(因为 Future 对象没有结果会一直等待下去), 所以会切换到 set_after 任务去执行
# 而 set_after 任务在等待 2s 后,会给 Future 对象赋值,所以此时 Future 对象返回, 代码执行结束
data = await fut
print(data)
asyncio.run( main() )
例子二:
在 asyncio
模块中,还可以使用 asyncio.Future()
函数来创建一个 Future
对象。它与 loop.create_future()
创建的 Future
对象不同的是,loop.create_future()
创建的对象与特定的事件循环相关联,这意味着,该 Future
对象只能在创建它的事件循环中使用,而 asyncio.Future()
创建的 Future
对象没有特定事件循环想关联,所以可以在任何事件循环中使用。
import asyncio
async def task1(future):
await asyncio.sleep(2)
future.set_result('Task 1 completed')
async def task2(future):
await asyncio.sleep(3)
future.set_result('Task 2 completed')
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# 启动两个任务,并共享同一个 Future 对象
asyncio.create_task(task1(fut))
asyncio.create_task(task2(fut))
# 等待 Future 对象的结果
result = await fut
print(result)
asyncio.run(main())
在上述示例中,我们定义了两个任务函数 task1() 和 task2(),它们分别会在一段时间后设置共享的Future对象的结果。在main()函数中,我们获取当前的事件循环对象,并使用loop.create_future()创建了一个 Future 对象 fut。然后,通过 asyncio.create_task() 函数启动了两个任务,它们共享同一个 Future 对象 fut。最后,使用 await 关键字等待 fut 的结果,并输出结果。
在这个示例中,loop.create_future() 的优势是可以将一个 Future 对象传递给多个协程,使得多个协程可以共享同一个 Future 对象,并在不同的时间点设置该对象的结果。这样可以实现更灵活的协同处理,例如在一个协程中等待多个任务完成后再继续执行。
4.7 concurrent.futures 的 Future 对象
首先,这个对象和 asyncio.Future 对象没有任何关系,它是使用线程池、进程池实现异步操作时用到的对象.
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)
他们为不同的应用场景设计,但是 Python 提供了一个将 futures.Future
对象包装成 asyncio.Future
对象的函数 asyncio.warp_future
。
import time
import asyncio
import concurrent.futures
def func1():
# 某个耗时操作
time.sleep(2)
return "123"
async def main():
loop = asyncio.get_running_loop()
# 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
# 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
# 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
# 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
fut = loop.run_in_executor(None, func1)
result = await fut
print('default thread pool', result)
# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom thread pool', result)
# 3. Run in a custom process pool:
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom process pool', result)
asyncio.run(main())
loop.run_in_executor()
函数上面注释已经说清楚了,其实就是内部调用了 线程池或者进程池的submit
方法,然后调用了 asyncio.wrap_future()
,将起包装成了一个 asyncio 的 Future 对象。
loop.run_in_executor()
的作用就是将阻塞的函数委托给线程或进程池,在异步环境中运行它,以避免阻塞事件循环。它返回一个asyncio.Future
对象,表示在执行器中运行的函数的结果。
注意:我通过查看代码,发现里面调用的是 futures.wrap_future() 函数,而不是 asyncio.wrap_future()函数,可能不同版本之间不一样吧,但是这两个函数的功能是一样的,都是将concurrent.futures.Future对象包装为asyncio.Future对象,以便在 asyncio 的事件循环中进行异步处理,两者之间的主要区别在于模块的不同,一个是concurrent.futures模块中的函数,另一个是asyncio模块中的函数。
下面这个例子是 asyncio 模块加上不支持异步模块的混合使用。
import requests
import asyncio
import os
async def download_images(url):
# 发送网络请求,下载图片,遇到网络IO,自动切换其他任务
print("开始下载:", url)
loop = asyncio.get_event_loop()
# requests 模块默认不支持异步操作,所以使用线程池来配合实现
future = loop.run_in_executor(None, requests.get, url)
response = await future
print("下载完成")
file_name = url.rsplit("_")[-1]
with open(file_name, mode = 'wb') as f:
f.write(response.content)
if __name__ == '__main__':
url_list = [
"https://www3.autoimg.cn/newsdfs/g26/M04/B7/07/1488x0_1_autohomecar__CjIFVmR2EjmAV2uOAAJsOzOueIg341.jpg",
"https://car2.autoimg.cn/cardfs/product/g24/M06/76/C9/1488x0_1_autohomecar__Chtk3WQQgK-AA6DDACIn_NIMSLw990.jpg",
"https://www2.autoimg.cn/newsdfs/g26/M08/82/C6/1488x0_1_autohomecar__ChxkjmR2BUuADxtRAAVgtryBmXQ532.jpg"
]
tasks = [ download_images(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
4.8 异步迭代器
异步迭代器: 实现了 __aiter__()
和 __anext__()
方法的对象。__anext__
必须返回一个 awaitable
对象。async for
会处理异步迭代器的__anext__()
方法所返回的可等待对象,直到其引发一个 StopAsyncIteration
异常。由 PEP 492 引入。
awaitable
对象 是指在异步编程中可以使用 await 关键字进行等待操作的对象
异步可迭代对象: 可在 async for
语句中被使用的对象。必须通过它的 __aiter__()
方法返回一个 asynchronous iterator
。由 PEP 492 引入。
以上的概念从该链接复制:python 术语对照表
import asyncio
class Reader(object):
""" 自定义异步迭代器, 同时也是异步可迭代对象"""
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val is None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
# async for 语句必须写在协程函数里面
async for item in obj:
print(item)
asyncio.run(func())
4.9 异步上下文管理器
上下文管理器 是Python 中用于管理资源的一种机制,它提供了一种方便的方式来管理资源的获取和释放,无论是在正常情况下还是发生异常的情况。它可以使用 with 语句来创建一个上下文,并确保在离开该上下文时正确处理资源.
异步上下文管理器 它通过定义 __aenter__()
和 __aexit__()
方法来对 async with
语句中的环境进行控制。由 PEP 492 引入。
import asyncio
class AsyncContextManger:
def __init__(self, conn):
self.conn = conn
async def do_something(self):
# 异步操作数据库
return 666
async def __aenter__(self):
# 异步链接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
# 异步关闭数据库链接
await asyncio.sleep(1)
# async with 方法需要在异步函数内进行使用
# 当使用是会执行当前类中得__aenter__ 这个方法返回什么那么f就是什么[可以进行设置数据库链接]
# 当上下文完成后 就会自动使用__aexit__方法[关闭数据库链接]
async def func():
async with AsyncContextManger() as f:
result = await f.do_something()
print (result)
asyncio.run(func())
4.10 uvloop
uvloop
是 asyncio
的事件循环的替代方案。是第三方的人员写的。它的事件循环效率是大于默认 asyncio
的事件循环的, 性能更高.
注意:windows 下面并没有 uvloop,在 Linux 上面是可以安装成功的
pip3 install uvloop
import asyncio
import uvloop
# TODO:将 asyncio 里面的事件循环替换为 uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写 asyncio 的代码
# 内部循环事件会自动化变为 uvloop
asyncio.run( ... )
4.11 异步库
在一些情况下,我们可能需要将 asyncio
与其他异步框架一起使用,asyncio
支持与其它库一起使用,例如aioredis、aiomysql、aiohttp
等,这些库都实现了 asyncio
的协议,并且能够与 asyncio
无缝的协作。
pip3 install aioredis # 异步操作redis的库
pip3 install aiomysql # 异步操作mysql的库
pip3 install aiohttp # 异步编写http的库
标签:
相关文章
最新发布
- 光流法结合深度学习神经网络的原理及应用(完整代码都有Python opencv)
- Python 图像处理进阶:特征提取与图像分类
- 大数据可视化分析-基于python的电影数据分析及可视化系统_9532dr50
- 【Python】入门(运算、输出、数据类型)
- 【Python】第一弹---解锁编程新世界:深入理解计算机基础与Python入门指南
- 华为OD机试E卷 --第k个排列 --24年OD统一考试(Java & JS & Python & C & C++)
- Python已安装包在import时报错未找到的解决方法
- 【Python】自动化神器PyAutoGUI —告别手动操作,一键模拟鼠标键盘,玩转微信及各种软件自动化
- Pycharm连接SQL Sever(详细教程)
- Python编程练习题及解析(49题)
点击排行
- 版本匹配指南:Numpy版本和Python版本的对应关系
- 版本匹配指南:PyTorch版本、torchvision 版本和Python版本的对应关系
- Python 可视化 web 神器:streamlit、Gradio、dash、nicegui;低代码 Python Web 框架:PyWebIO
- 相关性分析——Pearson相关系数+热力图(附data和Python完整代码)
- Anaconda版本和Python版本对应关系(持续更新...)
- Python与PyTorch的版本对应
- Windows上安装 Python 环境并配置环境变量 (超详细教程)
- Python pyinstaller打包exe最完整教程