PythonAsyncio:后台任务顺序执行方法
来到golang学习网的大家,相信都是编程学习爱好者,希望在这里学习文章相关编程知识。下面本篇文章就来带大家聊聊《Python Asyncio:后台任务顺序执行技巧》,介绍一下,希望对大家的知识积累有所帮助,助力实战开发!
在开发高性能的异步应用时,我们经常会遇到需要并发执行任务以提高效率,但某些特定操作又必须严格按顺序进行的情况。一个典型的例子是数据收集与数据保存:应用可以持续地、异步地收集数据批次,但为了数据完整性和避免竞态条件,数据保存操作(例如写入文件或数据库)通常需要按批次顺序进行,即前一个批次的数据保存完成,下一个批次才能开始保存。
考虑以下场景:一个应用持续收集数据批次,并尝试在后台保存它们。如果收集速度快于保存速度,或者不同批次的数据大小差异导致保存时间不一,就可能出现后收集的小批次数据先于前收集的大批次数据保存完成,从而导致数据混乱。
以下是一个简化的示例,展示了这种潜在的问题:
import asyncio import random async def save_data(): """模拟数据保存操作,耗时2秒。""" print("我正在保存一个批次的数据...") await asyncio.sleep(2) print("一个批次的数据保存完毕。") async def collect_data(): """模拟数据收集操作,耗时随机。""" event_loop = asyncio.get_event_loop() while True: print("我正在收集数据...") await asyncio.sleep(random.randint(1, 5)) # 模拟收集耗时 # 直接创建后台任务保存数据,可能导致多个保存任务同时运行 event_loop.create_task(save_data()) # 运行主程序 # asyncio.run(collect_data()) # 如果运行此代码,会发现"我正在保存一个批次的数据..."可能重复出现,且完成顺序不确定
上述代码的问题在于,event_loop.create_task(save_data())会立即启动一个新的后台任务,而不会等待上一个save_data()任务完成。这导致多个save_data()实例可能同时运行,违背了“一次只保存一个批次”的需求。
策略一:显式等待前一个保存任务完成
解决此问题的一种直接方法是,在启动新的保存任务之前,先等待前一个保存任务的完成。这类似于双缓冲机制,确保了保存操作的原子性和顺序性。
实现原理: 通过维护一个对上一个保存任务的引用。在每次准备启动新的保存任务时,检查是否存在未完成的旧任务。如果存在,则使用await等待其完成,然后再启动新的保存任务。
示例代码:
import asyncio import random async def save_data_batch(): """模拟数据保存操作,耗时2秒。""" print("我正在保存一个批次的数据...") await asyncio.sleep(2) print("一个批次的数据保存完毕。") async def collect_data_sequential_save(): """数据收集,确保保存任务顺序执行。""" event_loop = asyncio.get_event_loop() last_save_task = None # 用于存储上一个保存任务的引用 while True: print("我正在收集数据...") await asyncio.sleep(random.randint(1, 5)) # 模拟收集耗时 # 如果存在上一个未完成的保存任务,则等待其完成 if last_save_task: print("等待上一个批次保存完成...") await last_save_task print("上一个批次已保存完成,准备启动新的保存任务。") # 启动新的保存任务,并保存其引用 last_save_task = event_loop.create_task(save_data_batch()) # 为了演示,可以设置一个退出条件 # if random.random() < 0.1: # break # 循环结束后,确保最后一个保存任务也完成 if last_save_task: await last_save_task # 运行示例 # asyncio.run(collect_data_sequential_save())
优缺点:
- 优点: 实现简单直观,直接控制保存任务的顺序。
- 缺点: 这种方法在某种程度上会“阻塞”数据收集的流程。如果一个批次的保存时间特别长,而同时有多个小批次数据被收集,那么这些小批次将不得不等待大批次保存完成才能开始自己的保存,这可能会导致收集到的数据在内存中堆积,甚至影响整体吞吐量。它确保的是下一个保存任务的启动在上一个保存任务完成之后,而不是真正意义上的收集和保存完全并行。
策略二:使用 asyncio.Queue 实现生产者-消费者模型
为了更好地解耦数据收集和数据保存过程,并实现更高效的并发,我们可以采用生产者-消费者模型,利用 asyncio.Queue 来缓冲待保存的数据批次。
实现原理:
- 生产者(数据收集器): 负责收集数据,并将收集到的数据(或指示需要保存的信号)放入一个 asyncio.Queue 中。
- 消费者(数据保存器): 启动一个独立的后台任务,持续从队列中取出数据,并执行保存操作。由于只有一个消费者任务,它会自然地按顺序处理队列中的数据。
- asyncio.Queue 的 maxsize 参数可以限制队列中允许的最大项目数,从而防止收集速度过快导致内存耗尽。当队列满时,生产者(put操作)会自动等待,直到队列中有空间。
示例代码:
import asyncio import random async def save_data_batch(): """模拟数据保存操作,耗时2秒。""" print("我正在保存一个批次的数据...") await asyncio.sleep(2) print("一个批次的数据保存完毕。") async def save_all_batches(queue: asyncio.Queue): """消费者:从队列中取出数据并保存。""" while True: try: # 从队列中获取一个批次,如果队列为空则等待 batch = await queue.get() print(f"消费者:从队列中取出批次 {batch},准备保存。") await save_data_batch() # 执行保存操作 queue.task_done() # 标记此任务已完成 except asyncio.CancelledError: # 当消费者任务被取消时,优雅退出 print("消费者任务被取消,退出。") break except Exception as e: print(f"消费者:保存过程中发生错误: {e}") queue.task_done() # 即使出错也要标记完成,避免死锁 async def collect_data_with_queue(): """生产者:收集数据并放入队列。""" event_loop = asyncio.get_event_loop() # 创建一个有最大容量的队列,防止内存溢出 queue = asyncio.Queue(maxsize=4) # 启动消费者任务 saving_task = event_loop.create_task(save_all_batches(queue)) batch_id = 0 try: while True: print(f"生产者:我正在收集数据 (批次 {batch_id})...") await asyncio.sleep(random.randint(1, 3)) # 模拟收集耗时 # 将收集到的数据(这里用批次ID代替)放入队列 # 如果队列已满,此put操作会等待直到有空间 await queue.put(f"Batch-{batch_id}") print(f"生产者:批次 {batch_id} 已放入队列。队列当前大小: {queue.qsize()}") batch_id += 1 # 为了演示,在收集一定数量批次后停止 if batch_id >= 10: print("生产者:已收集足够批次,停止收集。") break finally: # 收集完成后,等待队列中的所有任务完成 print("生产者:等待所有队列中的批次保存完成...") await queue.join() print("生产者:所有队列中的批次已保存完成。") # 取消消费者任务,使其优雅退出 saving_task.cancel() # 确保消费者任务被取消并完成清理 await saving_task # 运行示例 # asyncio.run(collect_data_with_queue())
优缺点:
- 优点:
- 真正的并发: 数据收集和数据保存可以同时进行,最大化利用CPU和I/O资源。
- 顺序保证: 队列天然保证了数据的先进先出(FIFO)顺序,因此保存操作总是按收集顺序进行。
- 流量控制: maxsize 参数提供了内置的背压机制,防止生产者速度过快压垮消费者或耗尽内存。
- 解耦: 收集逻辑和保存逻辑完全分离,易于维护和扩展。
- 缺点:
- 复杂度增加: 需要管理队列、消费者任务的生命周期、取消和异常处理。
- 死锁风险: 如果消费者任务因未处理的异常而意外终止,而队列中仍有未处理的项,queue.join() 可能会永远等待,导致死锁。因此,健壮的异常处理至关重要。
关于异常处理的注意事项: 在生产环境中,确保消费者任务(如 save_all_batches)的健壮性至关重要。如果消费者任务因未捕获的异常而退出,那么 queue.join() 可能会永远阻塞,因为 task_done() 不会被调用。一种更健壮的方法是,在生产者尝试 queue.put() 时,也同时监控消费者任务的状态。如果消费者任务意外完成(例如,因为它崩溃了),生产者应该停止生产并妥善处理。
# 生产者在put时监控消费者状态的更健壮示例片段 # (这只是概念性代码,实际可能更复杂) # putting_task = event_loop.create_task(queue.put(f"Batch-{batch_id}")) # done, pending = await asyncio.wait({putting_task, saving_task}, # return_when=asyncio.FIRST_COMPLETED) # if saving_task in done: # # 消费者任务已完成(可能因错误),取消put操作并退出 # putting_task.cancel() # print("消费者任务已意外退出,停止生产。") # break # else: # # put操作完成,继续生产 # await putting_task # 确保put操作的异常被捕获
总结与选择
- 如果你对性能要求不是极致,或者保存任务的耗时相对稳定且不长,策略一(显式等待) 是一个简单有效的选择。它易于理解和实现,适合快速开发和维护。
- 如果你需要最大化并发吞吐量,并且数据收集和保存之间存在显著的性能差异,或者你需要精细控制数据流,那么 策略二(asyncio.Queue) 是更优的选择。它提供了更强的解耦和流量控制能力,但需要更仔细地处理任务生命周期和异常情况,以确保系统的稳定性和健壮性。
在实际应用中,选择哪种策略取决于具体的业务需求、性能目标以及对代码复杂度的接受程度。理解这两种模式的优缺点,将帮助你在 asyncio 应用中构建出既高效又可靠的后台任务处理机制。
今天关于《PythonAsyncio:后台任务顺序执行方法》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

- 上一篇
- JavaScript异步错误处理技巧详解

- 下一篇
- 360智图优化移动端广告图方法
-
- 文章 · python教程 | 9分钟前 | Python 数据准备 mplfinance 股票图表 K线图
- Python股票图表制作:mplfinance绘图教程详解
- 367浏览 收藏
-
- 文章 · python教程 | 11分钟前 | Kubernetes 高可用性 数据一致性 异常检测 自动扩缩
- Kubernetes异常检测扩展方法解析
- 195浏览 收藏
-
- 文章 · python教程 | 24分钟前 |
- Python子类避免重复初始化技巧
- 411浏览 收藏
-
- 文章 · python教程 | 29分钟前 |
- Python中len函数的作用是什么
- 182浏览 收藏
-
- 文章 · python教程 | 41分钟前 |
- Python正则表达式数据验证技巧
- 316浏览 收藏
-
- 文章 · python教程 | 42分钟前 |
- Python脚本:GitLab多项目文件检测方法
- 370浏览 收藏
-
- 文章 · python教程 | 43分钟前 |
- Python语言种类及特性对比解析
- 225浏览 收藏
-
- 文章 · python教程 | 54分钟前 |
- Python快速计算数组唯一差值技巧
- 103浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- 如何用正则匹配日期格式YYYY-MM-DD
- 334浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- Python嵌套循环优化技巧分享
- 296浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- PyCharm安装步骤详解教程
- 489浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 511次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 498次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- 千音漫语
- 千音漫语,北京熠声科技倾力打造的智能声音创作助手,提供AI配音、音视频翻译、语音识别、声音克隆等强大功能,助力有声书制作、视频创作、教育培训等领域,官网:https://qianyin123.com
- 117次使用
-
- MiniWork
- MiniWork是一款智能高效的AI工具平台,专为提升工作与学习效率而设计。整合文本处理、图像生成、营销策划及运营管理等多元AI工具,提供精准智能解决方案,让复杂工作简单高效。
- 112次使用
-
- NoCode
- NoCode (nocode.cn)是领先的无代码开发平台,通过拖放、AI对话等简单操作,助您快速创建各类应用、网站与管理系统。无需编程知识,轻松实现个人生活、商业经营、企业管理多场景需求,大幅降低开发门槛,高效低成本。
- 128次使用
-
- 达医智影
- 达医智影,阿里巴巴达摩院医疗AI创新力作。全球率先利用平扫CT实现“一扫多筛”,仅一次CT扫描即可高效识别多种癌症、急症及慢病,为疾病早期发现提供智能、精准的AI影像早筛解决方案。
- 121次使用
-
- 智慧芽Eureka
- 智慧芽Eureka,专为技术创新打造的AI Agent平台。深度理解专利、研发、生物医药、材料、科创等复杂场景,通过专家级AI Agent精准执行任务,智能化工作流解放70%生产力,让您专注核心创新。
- 126次使用
-
- Flask框架安装技巧:让你的开发更高效
- 2024-01-03 501浏览
-
- Django框架中的并发处理技巧
- 2024-01-22 501浏览
-
- 提升Python包下载速度的方法——正确配置pip的国内源
- 2024-01-17 501浏览
-
- Python与C++:哪个编程语言更适合初学者?
- 2024-03-25 501浏览
-
- 品牌建设技巧
- 2024-04-06 501浏览