当前位置:首页 > 文章列表 > 文章 > python教程 > Python asyncio 超时与取消实战:用 timeout 和 TaskGroup 管住慢任务

Python asyncio 超时与取消实战:用 timeout 和 TaskGroup 管住慢任务

来源:17golang原创 2026-06-12 20:37:12 0浏览 收藏

异步代码最容易出问题的地方,不是语法,而是“慢任务怎么办”。一个接口聚合三个上游服务,任意一个服务变慢,都可能把请求拖到超时;如果取消处理没有写好,还会留下半完成任务、未关闭连接和难以复现的偶发告警。

本文用一个可运行的小例子,梳理 Python asyncio 中常用的超时与取消手段:asyncio.timeout()asyncio.wait_for()TaskGroupshield()。目标不是把 API 背一遍,而是能在真实项目里给异步任务划边界、做清理、让故障更容易定位。

摘要

我们会从一个“订单详情聚合接口”出发,先写出会被慢上游拖住的版本,再逐步加入超时边界、结构化并发、取消清理和关键任务保护。读完后,你可以把同样的思路迁移到爬虫、消息消费、异步 Web 接口和批量任务调度中。

适合人群

适合已经会写基础 async defawait,但在项目里遇到慢请求、任务取消、异步资源释放问题的 Python 开发者。示例推荐使用 Python 3.11 及以上版本运行,因为 TaskGroupasyncio.timeout() 在这些版本里更适合组织结构化并发。

目录

  1. 先复现一个慢上游问题
  2. 用 timeout 给整段流程划边界
  3. wait_for 适合包住单个等待点
  4. TaskGroup 让一组任务同生共退
  5. 取消传播后一定要做资源清理
  6. 什么时候使用 shield
  7. 常见坑和排查思路

一、先复现一个慢上游问题

假设订单详情页需要同时读取订单、库存和优惠信息。为了模拟网络调用,下面的函数用 asyncio.sleep() 代表上游耗时:

import asyncio
from dataclasses import dataclass


@dataclass
class ServiceResult:
    name: str
    value: str


async def query_service(name: str, delay: float) -> ServiceResult:
    await asyncio.sleep(delay)
    return ServiceResult(name=name, value=f"{name}-ok")


async def build_order_detail() -> list[ServiceResult]:
    order_task = asyncio.create_task(query_service("order", 0.2))
    stock_task = asyncio.create_task(query_service("stock", 0.4))
    coupon_task = asyncio.create_task(query_service("coupon", 3.0))
    return await asyncio.gather(order_task, stock_task, coupon_task)


async def main() -> None:
    result = await build_order_detail()
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

这段代码可以正常返回,但 coupon 慢到 3 秒时,整个聚合结果也会被拖到 3 秒。如果这是一个对外接口,请求线程池、网关超时和用户体验都会受到影响。

asyncio 聚合接口的超时边界示意图

二、用 timeout 给整段流程划边界

当你希望“这段业务最多只能花多少时间”时,可以把整段逻辑放进 asyncio.timeout()。它像一个时间盒,时间到了就取消当前等待中的任务,并抛出超时异常。

async def build_order_detail_with_limit() -> list[ServiceResult]:
    try:
        async with asyncio.timeout(1.0):
            return await asyncio.gather(
                query_service("order", 0.2),
                query_service("stock", 0.4),
                query_service("coupon", 3.0),
            )
    except TimeoutError:
        return [
            ServiceResult(name="order", value="fallback"),
            ServiceResult(name="stock", value="fallback"),
            ServiceResult(name="coupon", value="timeout"),
        ]

这里的重点是边界放在哪里。如果把边界放在整段聚合外层,业务表现是“超过 1 秒就走兜底”;如果只给某一个上游设置边界,业务表现就是“慢上游单独降级,其它结果照常返回”。

三、wait_for 适合包住单个等待点

asyncio.wait_for() 更适合包住一个明确的等待点,比如某个上游、某个锁、某个队列读取。下面的写法让优惠服务最多等待 0.8 秒,订单和库存仍然按原计划返回:

async def safe_coupon() -> ServiceResult:
    try:
        return await asyncio.wait_for(query_service("coupon", 3.0), timeout=0.8)
    except TimeoutError:
        return ServiceResult(name="coupon", value="timeout")


async def build_order_detail_partly_degraded() -> list[ServiceResult]:
    return await asyncio.gather(
        query_service("order", 0.2),
        query_service("stock", 0.4),
        safe_coupon(),
    )

外层时间盒强调“整体耗时不能超过多少”,单点等待强调“这个上游不能拖累其它结果”。在接口聚合、批量读取和多数据源组合场景里,通常会同时使用这两种策略。

四、TaskGroup 让一组任务同生共退

如果一组任务属于同一个业务步骤,推荐用 TaskGroup 表达这种关系。它的好处是结构清楚:只要组内某个任务异常退出,组里的其它任务也会被取消,调用者可以在一个地方处理结果。

async def build_with_task_group() -> dict[str, ServiceResult]:
    tasks = {}
    try:
        async with asyncio.timeout(1.2):
            async with asyncio.TaskGroup() as group:
                tasks["order"] = group.create_task(query_service("order", 0.2))
                tasks["stock"] = group.create_task(query_service("stock", 0.4))
                tasks["coupon"] = group.create_task(query_service("coupon", 3.0))
    except TimeoutError:
        return {
            "order": ServiceResult("order", "fallback"),
            "stock": ServiceResult("stock", "fallback"),
            "coupon": ServiceResult("coupon", "timeout"),
        }

    return {name: task.result() for name, task in tasks.items()}

和手动保存多个任务相比,TaskGroup 更像一个任务作用域。作用域结束时,任务要么完成,要么被明确取消,不容易出现“外层函数已经返回,里面还有任务偷偷运行”的情况。

asyncio TaskGroup 取消传播和资源清理流程图

五、取消传播后一定要做资源清理

任务被取消时,会在等待点收到 asyncio.CancelledError。不要把它当作普通异常随手吞掉。更稳妥的做法是:清理资源,然后继续抛出,让上层知道任务已经取消。

async def stream_rows(name: str) -> int:
    opened = False
    try:
        opened = True
        count = 0
        while True:
            await asyncio.sleep(0.2)
            count += 1
    except asyncio.CancelledError:
        print(f"{name}: task cancelled, close cursor")
        raise
    finally:
        if opened:
            print(f"{name}: resource closed")

如果你在 except asyncio.CancelledError 里返回默认值,上层会以为任务正常完成,排查时很难知道它其实被取消过。除非你非常明确地要把取消转成业务结果,否则应当清理后继续抛出。

六、什么时候使用 shield

asyncio.shield() 可以保护某个等待对象,不让外层取消直接打断它。它适合很少数需要“尽量完成”的短操作,比如写审计日志、释放租约、提交本地状态。不要把大段业务逻辑都包进 shield(),否则超时边界会变得难以理解。

async def write_audit_log(order_id: str) -> None:
    await asyncio.sleep(0.2)
    print(f"audit saved: {order_id}")


async def handle_request(order_id: str) -> str:
    try:
        async with asyncio.timeout(1.0):
            await query_service("order", 0.8)
            return "ok"
    finally:
        await asyncio.shield(write_audit_log(order_id))

这段代码表达的是:主流程有 1 秒边界,但收尾日志尽量落下。真实项目里还要控制日志写入自己的超时时间,避免保护动作本身又变成新的慢点。

七、常见坑和排查思路

1. 只设置网关超时,没有设置协程超时

网关断开连接后,如果应用内部任务没有收到明确的取消或超时,它可能还会继续占用连接池、锁和内存。应用层也要有自己的时间边界。

2. 把 CancelledError 当普通异常吞掉

取消是一种控制信号。清理资源可以,直接吞掉要谨慎。吞掉取消信号后,外层任务状态会变得不真实,监控也容易误判。

3. 所有上游共用一个超时时间

核心上游和非核心上游的策略应当不同。订单主数据可以等待更久,推荐信息、优惠角标这类非核心结果可以更快降级。

4. 没有记录慢点来源

建议在每个上游调用旁边记录耗时、超时次数和降级原因。日志里至少要有业务 ID、上游名、耗时、是否降级,排查时会省很多时间。

完整示例

下面把整体超时、单点降级和结构化任务组织合在一起,便于你复制到本地运行:

import asyncio
from dataclasses import dataclass


@dataclass
class ServiceResult:
    name: str
    value: str


async def query_service(name: str, delay: float) -> ServiceResult:
    await asyncio.sleep(delay)
    return ServiceResult(name, f"{name}-ok")


async def safe_call(name: str, delay: float, limit: float) -> ServiceResult:
    try:
        return await asyncio.wait_for(query_service(name, delay), timeout=limit)
    except TimeoutError:
        return ServiceResult(name, "timeout")


async def build_order_detail() -> dict[str, ServiceResult]:
    async with asyncio.timeout(1.5):
        async with asyncio.TaskGroup() as group:
            order = group.create_task(safe_call("order", 0.2, 1.0))
            stock = group.create_task(safe_call("stock", 0.4, 1.0))
            coupon = group.create_task(safe_call("coupon", 3.0, 0.6))

    return {
        "order": order.result(),
        "stock": stock.result(),
        "coupon": coupon.result(),
    }


async def main() -> None:
    try:
        detail = await build_order_detail()
    except TimeoutError:
        detail = {"error": ServiceResult("all", "timeout")}
    print(detail)


if __name__ == "__main__":
    asyncio.run(main())

总结

asyncio 的超时治理可以按三层理解:整体流程用 asyncio.timeout() 划边界,单个等待点用 asyncio.wait_for() 控制慢上游,一组强相关任务用 TaskGroup 管生命周期。任务被取消时,先清理资源,再把取消信号交回上层。这样写出来的异步代码,慢点更可控,故障也更容易定位。

参考资料

本文示例参考了 Python 官方文档中关于 asyncio 任务、超时、任务组和取消行为的说明,并结合接口聚合场景重新组织为原创示例。

版本声明
本文转载于:17golang原创 如有侵犯,请联系study_golang@163.com删除
Docker Compose 本地多服务环境实战:MySQL、Redis、Nginx 一键启动Docker Compose 本地多服务环境实战:MySQL、Redis、Nginx 一键启动
上一篇
Docker Compose 本地多服务环境实战:MySQL、Redis、Nginx 一键启动
Go 问答:defer 为什么不适合直接放在大循环里,资源该怎么释放
下一篇
Go 问答:defer 为什么不适合直接放在大循环里,资源该怎么释放
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    543次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    516次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    500次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    485次学习
查看更多
AI推荐
  • ChatExcel酷表:告别Excel难题,北大团队AI助手助您轻松处理数据
    ChatExcel酷表
    ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
    7929次使用
  • Any绘本:开源免费AI绘本创作工具深度解析
    Any绘本
    探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
    8355次使用
  • 可赞AI:AI驱动办公可视化智能工具,一键高效生成文档图表脑图
    可赞AI
    可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
    8170次使用
  • 星月写作:AI网文创作神器,助力爆款小说速成
    星月写作
    星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
    10090次使用
  • MagicLight.ai:叙事驱动AI动画视频创作平台 | 高效生成专业级故事动画
    MagicLight
    MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
    8943次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码