首页 > Python资料 博客日记

【Python系列】异步任务的终止

2024-09-27 14:00:05Python资料围观25

这篇文章介绍了【Python系列】异步任务的终止,分享给大家做个参考,收藏Python资料网收获更多编程知识

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。

非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

一.需求说明

1.需求

我有一个很耗时的异步任务,我希望能通过接口的方式终止这个异步任务,在 python 中如何实现呢?

2.实现方式

可以使用 asyncio 相关的 api 进行实现

  1. 创建并运行异步任务
  2. 取消异步任务
  3. api 地址

3.asyncio 介绍

asyncio 是 Python 标准库中的一个模块,用于编写单线程的并发代码。它使用 async/await 语法来编写异步代码,允许你以非阻塞的方式执行 I/O 操作,如网络请求、文件读写等。

以下是 asyncio 的一些关键特性:

  1. 事件循环(Event Loop)asyncio 使用事件循环来管理所有异步操作。事件循环是一个运行在后台的无限循环,它不断地检查是否有新的事件(如 I/O 完成、定时器到期等)需要处理。
  2. 协程(Coroutines):协程是 asyncio 中的并发单元,它们是使用 async def 定义的函数。协程在遇到 await 表达式时会暂停执行,直到等待的任务完成,然后继续执行。
  3. 任务(Tasks):任务是协程的执行单元。你可以使用 asyncio.create_task() 将协程包装成任务,然后将其放入事件循环中执行。
  4. FuturesFuture 是一个低级别的并发机制,它代表了一个尚未完成的操作。Future 可以被等待,当操作完成时,它会通知等待者。
  5. 同步原语asyncio 提供了同步原语,如锁(Lock)、事件(Event)、条件变量(Condition)和信号量(Semaphore),用于在异步代码中处理共享资源的同步。
  6. 流(Streams)asyncio 还提供了流(如 StreamReaderStreamWriter),它们用于处理网络通信中的二进制数据流。
  7. 子进程asyncio 允许你异步地创建和管理子进程。
  8. 超时和取消:你可以为异步操作设置超时,或者在必要时取消正在进行的操作。

二.实现步骤

1.异步终止任务

from typing import Any

from manager.graph.manager import GraphManager
from manager.schemas.manager import SchemasManager
from utils import logger
from fastapi import APIRouter
from manager.graph.query_graph import LocalQueryGraph
import time
import asyncio

GraphRouter = APIRouter(prefix="/test", tags=["测试管理"])
global task_map
task_map: dict[Any, Any] = {}

class GraphRouterMap:

    # 长时间运行的异步任务
    @staticmethod
    async def long_running_task():
        try:
            while True:
                # 模拟任务运行
                print("长时间运行的任务正在运行.....")
                await asyncio.sleep(2)
        except asyncio.CancelledError:
            print("任务被取消")

    @staticmethod
    @GraphRouter.get("/start", summary="启动任务", description="启动任务")
    async def start_task():
        # 启动长时间任务,并返回任务对象
        run_id = time.strftime("%Y%m%d-%H%M%S")
        task = asyncio.create_task(GraphRouterMap.long_running_task())
        task_map[run_id] = task
        return run_id

    @staticmethod
    @GraphRouter.get("/stop", summary="中止任务", description="中止任务")
    async def stop_task(run_id: str):
        task = task_map.get(run_id)
        task.cancel()
        return {"message": "任务取消请求已发送"}

觉得有用的话点个赞 👍🏻 呗。
❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄

💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍

🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙


版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐