确保芹菜的公平加工——第一部分
从现在开始,努力学习吧!本文《确保芹菜的公平加工——第一部分》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!

如果您熟悉 python,您很可能听说过 celery。它通常是异步处理任务的首选,例如图像处理或发送电子邮件。
与一些人交谈时,我开始注意到许多开发人员一开始都觉得 celery 令人印象深刻,但随着他们的项目规模和复杂性的增加,他们的兴奋开始消退。虽然有些人出于正当原因放弃了 celery,但其他人可能只是没有深入探索其核心,无法根据自己的需求进行定制。
在这篇博客中,我想讨论一些开发人员开始寻找替代方案甚至构建自定义后台工作框架的原因之一:公平处理。在用户/租户提交不同大小任务的环境中,一个租户的繁重工作量影响其他租户的风险可能会造成瓶颈并导致挫败感。
我将引导您了解在 celery 中实现公平处理的策略,确保平衡的任务分配,以便没有任何一个租户可以支配您的资源。
问题
让我们深入探讨多租户应用程序面临的常见挑战,特别是那些处理批处理的应用程序。想象一下,您有一个系统,用户可以将其图像处理任务排队,允许他们在短暂等待后收到处理后的图像。此设置不仅可以使您的 api 保持响应,还可以让您根据需要扩展工作线程以有效地处理负载。
一切都运行顺利 - 直到一个租户决定提交大量图像进行处理。您拥有多名工作人员,他们甚至可以自动扩展以满足不断增长的需求,因此您对您的基础设施充满信心。然而,当其他租户尝试对较小的批次(可能只是几张图像)进行排队并突然发现自己面临长时间的等待而没有任何更新时,麻烦就开始了。在您不知不觉中,支持请求开始涌入,用户抱怨您的服务速度缓慢甚至没有响应。
这种情况太常见了,因为 celery 默认情况下按照接收到的顺序处理任务。当一个租户因大量涌入的任务而让您的工作人员不堪重负时,即使是最好的自动扩展策略也可能不足以防止其他租户出现延误。因此,这些用户体验到的服务水平可能达不到承诺或预期的水平。
使用 celery 进行速率限制
确保公平处理的一个有效策略是实施速率限制。它允许您控制每个租户在特定时间范围内可以提交的任务数量。这可以防止任何单个租户垄断您的工人,并确保所有租户都有公平的机会来处理他们的任务。
celery 具有内置的任务级别速率限制功能:
# app.py
from celery import celery
app = celery("app", broker="redis://localhost:6379/0")
@app.task(rate_limit="10/m") # limit to 10 tasks per minute
def process_data(data):
print(f"processing data: {data}")
# call the task
if __name__ == "__main__":
for i in range(20):
process_data.delay(f"data_{i}")
您可以通过执行以下命令来运行工作线程:
celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
现在,运行app.py脚本来触发20个任务:
python app.py
如果您设法在本地运行它,您会注意到每个任务之间存在延迟,以确保执行速率限制。现在您可能认为这并不能真正帮助我们解决问题,您完全正确。 celery 的内置速率限制对于我们的任务可能涉及调用具有严格速率限制的外部服务的场景非常有用。
这个示例强调了内置功能对于复杂场景来说可能过于简单。然而,我们可以通过更深入地探索 celery 的框架来克服这个限制。让我们看看如何为每个租户设置适当的速率限制和自动重试。
我们将使用 redis 来跟踪每个租户的速率限制。 redis 是 celery 的流行数据库和代理,因此让我们利用这个可能已经在您的堆栈中的组件。
让我们导入几个库:
import time import redis from celery import celery, task
现在我们将为我们的速率限制任务实现一个自定义基任务类:
# initialize a redis client
redis_client = redis.strictredis(host="localhost", port=6379, db=0)
class ratelimitedtask(task):
def __init__(self, *args, **kwargs):
# set default rate limit
if not hasattr(self, "custom_rate_limit"):
self.custom_rate_limit = 10
super().__init__(*args, **kwargs)
def __call__(self, tenant_id, *args, **kwargs):
# rate limiting logic
key = f"rate_limit:{tenant_id}:{self.name}"
# increment the count for this minute
current_count = redis_client.incr(key)
if current_count == 1:
# set expiration for the key if it's the first request
redis_client.expire(key, 10)
if current_count > self.custom_rate_limit:
print(f"rate limit exceeded for tenant {tenant_id}. retrying...")
raise self.retry(countdown=10)
return super().__call__(tenant_id, *args, **kwargs)
这个自定义类将跟踪特定租户使用 redis 触发的任务量,并将 ttl 设置为 10 秒。如果超出速率限制,任务将在 10 秒后重试。所以基本上我们的默认速率限制是 10 秒内完成 10 个任务。
让我们定义一个模拟处理的示例任务:
@app.task(base=ratelimitedtask, custom_rate_limit=5)
def process(tenant_id: int, data):
"""
mock processing task that takes 0.3 seconds to complete.
"""
print(f"processing data: {data} for tenant: {tenant_id}")
time.sleep(0.3)
这里我们定义了一个流程任务,你可以看到我可以在任务级别更改custom_rate_limit。如果我们不指定 custom_rate_limit,则将分配默认值 10。 现在我们的速率限制已更改为 10 秒内完成 5 个任务。
现在让我们为不同的租户触发一些任务:
if __name__ == "__main__":
for i in range(20):
process.apply_async(args=(1, f"data_{i}"))
for i in range(10):
process.apply_async(args=(2, f"data_{i}"))
我们为租户 id 1 定义 20 个任务,为租户 id 2 定义 10 个任务。
所以我们完整的代码将如下所示:
# app.py
import time
import redis
from celery import celery, task
app = celery(
"app",
broker="redis://localhost:6379/0",
broker_connection_retry_on_startup=false,
)
# initialize a redis client
redis_client = redis.strictredis(host="localhost", port=6379, db=0)
class ratelimitedtask(task):
def __init__(self, *args, **kwargs):
if not hasattr(self, "custom_rate_limit"):
self.custom_rate_limit = 10
super().__init__(*args, **kwargs)
def __call__(self, tenant_id, *args, **kwargs):
# rate limiting logic
key = f"rate_limit:{tenant_id}:{self.name}"
# increment the count for this minute
current_count = redis_client.incr(key)
if current_count == 1:
# set expiration for the key if it's the first request
redis_client.expire(key, 10)
if current_count > self.custom_rate_limit:
print(f"rate limit exceeded for tenant {tenant_id}. retrying...")
raise self.retry(countdown=10)
return super().__call__(tenant_id, *args, **kwargs)
@app.task(base=ratelimitedtask, custom_rate_limit=5)
def process(tenant_id: int, data):
"""
mock processing task that takes 0.3 seconds to complete.
"""
print(f"processing data: {data} for tenant: {tenant_id}")
time.sleep(0.3)
if __name__ == "__main__":
for i in range(20):
process.apply_async(args=(1, f"data_{i}"))
for i in range(10):
process.apply_async(args=(2, f"data_{i}"))
让我们运行我们的工作线程:
celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
现在,运行 app.py 脚本来触发任务:
python app.py
如您所见,工作人员处理了第一个租户的 5 个任务,并为所有其他任务设置了重试。然后,它会执行第二个租户的 5 个任务,并为其他任务设置重试,然后继续进行。
这种方法允许您定义每个租户的速率限制,但正如您在我们的示例中看到的,对于运行速度非常快的任务,对速率限制过于严格最终会让工作人员在一段时间内无所事事。微调速率限制参数至关重要,并且取决于具体的任务和数量。不要犹豫,不断尝试,直到找到最佳平衡。
结论
我们探讨了 celery 的默认任务处理如何导致多租户环境中的不公平,以及速率限制如何帮助解决此问题。通过实施特定于租户的速率限制,我们可以防止任何单个租户垄断资源,并确保更公平地分配处理能力。
这种方法为在 celery 中实现公平处理提供了坚实的基础。然而,还有其他值得探索的技术来进一步优化多租户应用程序中的任务处理。虽然我最初计划在一篇文章中涵盖所有内容,但事实证明这个主题非常广泛!为了确保清晰度并保持本文的重点,我决定将其分为两部分。
在本系列的下一部分中,我们将深入研究任务优先级作为增强公平性和效率的另一种机制。这种方法允许您根据不同的标准为任务分配不同的优先级,确保即使在高需求时期也能及时处理关键任务。
敬请期待下期!
以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于文章的相关知识,也可关注golang学习网公众号。
ThinkPHP 如何实现会员等级差异化内容展示?
- 上一篇
- ThinkPHP 如何实现会员等级差异化内容展示?
- 下一篇
- SpringBoot 集成 Thymeleaf 库提示无法解析模板,如何解决?
-
- 文章 · python教程 | 11分钟前 |
- PyMongo导入CSV:数值转换技巧分享
- 111浏览 收藏
-
- 文章 · python教程 | 11分钟前 |
- Geopandas地理数据处理入门教程
- 174浏览 收藏
-
- 文章 · python教程 | 8小时前 |
- Pandas列扩展与行值移动方法
- 422浏览 收藏
-
- 文章 · python教程 | 8小时前 |
- FlaskSQLAlchemy更新用户积分教程详解
- 345浏览 收藏
-
- 文章 · python教程 | 8小时前 |
- Pandas行标准差计算方法详解
- 253浏览 收藏
-
- 文章 · python教程 | 9小时前 |
- Python调用srun性能分析与优化
- 263浏览 收藏
-
- 文章 · python教程 | 9小时前 |
- Python指定文件路径的方法及技巧
- 362浏览 收藏
-
- 文章 · python教程 | 10小时前 |
- Pandas统计连续相同值并新增列技巧
- 297浏览 收藏
-
- 文章 · python教程 | 10小时前 |
- DjangoQ对象使用技巧与优化方法
- 245浏览 收藏
-
- 文章 · python教程 | 10小时前 |
- Dagster数据流转与参数配置方法
- 211浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3212次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3425次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3455次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4564次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3832次使用
-
- Flask框架安装技巧:让你的开发更高效
- 2024-01-03 501浏览
-
- Django框架中的并发处理技巧
- 2024-01-22 501浏览
-
- 提升Python包下载速度的方法——正确配置pip的国内源
- 2024-01-17 501浏览
-
- Python与C++:哪个编程语言更适合初学者?
- 2024-03-25 501浏览
-
- 品牌建设技巧
- 2024-04-06 501浏览

