当前位置:首页 > 文章列表 > 文章 > python教程 > 确保芹菜的公平加工——第一部分

确保芹菜的公平加工——第一部分

来源:dev.to 2024-11-27 14:04:06 0浏览 收藏

从现在开始,努力学习吧!本文《确保芹菜的公平加工——第一部分》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!

确保芹菜的公平加工——第一部分

如果您熟悉 python,您很可能听说过 celery。它通常是异步处理任务的首选,例如图像处理或发送电子邮件。

与一些人交谈时,我开始注意到许多开发人员一开始都觉得 celery 令人印象深刻,但随着他们的项目规模和复杂性的增加,他们的兴奋开始消退。虽然有些人出于正当原因放弃了 celery,但其他人可能只是没有深入探索其核心,无法根据自己的需求进行定制。

在这篇博客中,我想讨论一些开发人员开始寻找替代方案甚至构建自定义后台工作框架的原因之一:公平处理。在用户/租户提交不同大小任务的环境中,一个租户的繁重工作量影响其他租户的风险可能会造成瓶颈并导致挫败感。

我将引导您了解在 celery 中实现公平处理的策略,确保平衡的任务分配,以便没有任何一个租户可以支配您的资源。

问题

让我们深入探讨多租户应用程序面临的常见挑战,特别是那些处理批处理的应用程序。想象一下,您有一个系统,用户可以将其图像处理任务排队,允许他们在短暂等待后收到处理后的图像。此设置不仅可以使您的 api 保持响应,还可以让您根据需要扩展工作线程以有效地处理负载。

一切都运行顺利 - 直到一个租户决定提交大量图像进行处理。您拥有多名工作人员,他们甚至可以自动扩展以满足不断增长的需求,因此您对您的基础设施充满信心。然而,当其他租户尝试对较小的批次(可能只是几张图像)进行排队并突然发现自己面临长时间的等待而没有任何更新时,麻烦就开始了。在您不知不觉中,支持请求开始涌入,用户抱怨您的服务速度缓慢甚至没有响应。

这种情况太常见了,因为 celery 默认情况下按照接收到的顺序处理任务。当一个租户因大量涌入的任务而让您的工作人员不堪重负时,即使是最好的自动扩展策略也可能不足以防止其他租户出现延误。因此,这些用户体验到的服务水平可能达不到承诺或预期的水平。

使用 celery 进行速率限制

确保公平处理的一个有效策略是实施速率限制。它允许您控制每个租户在特定时间范围内可以提交的任务数量。这可以防止任何单个租户垄断您的工人,并确保所有租户都有公平的机会来处理他们的任务。

celery 具有内置的任务级别速率限制功能:

# app.py
from celery import celery

app = celery("app", broker="redis://localhost:6379/0")

@app.task(rate_limit="10/m") # limit to 10 tasks per minute
def process_data(data):
    print(f"processing data: {data}")

# call the task
if __name__ == "__main__":
    for i in range(20):
        process_data.delay(f"data_{i}")

您可以通过执行以下命令来运行工作线程:

celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

现在,运行app.py脚本来触发20个任务:

python app.py

如果您设法在本地运行它,您会注意到每个任务之间存在延迟,以确保执行速率限制。现在您可能认为这并不能真正帮助我们解决问题,您完全正确。 celery 的内置速率限制对于我们的任务可能涉及调用具有严格速率限制的外部服务的场景非常有用。

这个示例强调了内置功能对于复杂场景来说可能过于简单。然而,我们可以通过更深入地探索 celery 的框架来克服这个限制。让我们看看如何为每个租户设置适当的速率限制和自动重试。

我们将使用 redis 来跟踪每个租户的速率限制。 redis 是 celery 的流行数据库和代理,因此让我们利用这个可能已经在您的堆栈中的组件。

让我们导入几个库:

import time
import redis
from celery import celery, task

现在我们将为我们的速率限制任务实现一个自定义基任务类:

# initialize a redis client
redis_client = redis.strictredis(host="localhost", port=6379, db=0)

class ratelimitedtask(task):
    def __init__(self, *args, **kwargs):
        # set default rate limit
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"rate limit exceeded for tenant {tenant_id}. retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)

这个自定义类将跟踪特定租户使用 redis 触发的任务量,并将 ttl 设置为 10 秒。如果超出速率限制,任务将在 10 秒后重试。所以基本上我们的默认速率限制是 10 秒内完成 10 个任务。

让我们定义一个模拟处理的示例任务:

@app.task(base=ratelimitedtask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    mock processing task that takes 0.3 seconds to complete.
    """
    print(f"processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

这里我们定义了一个流程任务,你可以看到我可以在任务级别更改custom_rate_limit。如果我们不指定 custom_rate_limit,则将分配默认值 10。 现在我们的速率限制已更改为 10 秒内完成 5 个任务。

现在让我们为不同的租户触发一些任务:

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

我们为租户 id 1 定义 20 个任务,为租户 id 2 定义 10 个任务。

所以我们完整的代码将如下所示:

# app.py
import time
import redis
from celery import celery, task

app = celery(
    "app",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=false,
)

# initialize a redis client
redis_client = redis.strictredis(host="localhost", port=6379, db=0)


class ratelimitedtask(task):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "custom_rate_limit"):
            self.custom_rate_limit = 10

        super().__init__(*args, **kwargs)

    def __call__(self, tenant_id, *args, **kwargs):
        # rate limiting logic
        key = f"rate_limit:{tenant_id}:{self.name}"

        # increment the count for this minute
        current_count = redis_client.incr(key)

        if current_count == 1:
            # set expiration for the key if it's the first request
            redis_client.expire(key, 10)

        if current_count > self.custom_rate_limit:
            print(f"rate limit exceeded for tenant {tenant_id}. retrying...")
            raise self.retry(countdown=10)

        return super().__call__(tenant_id, *args, **kwargs)


@app.task(base=ratelimitedtask, custom_rate_limit=5)
def process(tenant_id: int, data):
    """
    mock processing task that takes 0.3 seconds to complete.
    """
    print(f"processing data: {data} for tenant: {tenant_id}")
    time.sleep(0.3)

if __name__ == "__main__":
    for i in range(20):
        process.apply_async(args=(1, f"data_{i}"))

    for i in range(10):
        process.apply_async(args=(2, f"data_{i}"))

让我们运行我们的工作线程:

celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1

现在,运行 app.py 脚本来触发任务:

python app.py

如您所见,工作人员处理了第一个租户的 5 个任务,并为所有其他任务设置了重试。然后,它会执行第二个租户的 5 个任务,并为其他任务设置重试,然后继续进行。

这种方法允许您定义每个租户的速率限制,但正如您在我们的示例中看到的,对于运行速度非常快的任务,对速率限制过于严格最终会让工作人员在一段时间内无所事事。微调速率限制参数至关重要,并且取决于具体的任务和数量。不要犹豫,不断尝试,直到找到最佳平衡。

结论

我们探讨了 celery 的默认任务处理如何导致多租户环境中的不公平,以及速率限制如何帮助解决此问题。通过实施特定于租户的速率限制,我们可以防止任何单个租户垄断资源,并确保更公平地分配处理能力。

这种方法为在 celery 中实现公平处理提供了坚实的基础。然而,还有其他值得探索的技术来进一步优化多租户应用程序中的任务处理。虽然我最初计划在一篇文章中涵盖所有内容,但事实证明这个主题非常广泛!为了确保清晰度并保持本文的重点,我决定将其分为两部分。

在本系列的下一部分中,我们将深入研究任务优先级作为增强公平性和效率的另一种机制。这种方法允许您根据不同的标准为任务分配不同的优先级,确保即使在高需求时期也能及时处理关键任务。

敬请期待下期!

以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于文章的相关知识,也可关注golang学习网公众号。

版本声明
本文转载于:dev.to 如有侵犯,请联系study_golang@163.com删除
ThinkPHP 如何实现会员等级差异化内容展示? 
ThinkPHP 如何实现会员等级差异化内容展示?
上一篇
ThinkPHP 如何实现会员等级差异化内容展示?
SpringBoot 集成 Thymeleaf 库提示无法解析模板,如何解决?
下一篇
SpringBoot 集成 Thymeleaf 库提示无法解析模板,如何解决?
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    543次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    516次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    500次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    485次学习
查看更多
AI推荐
  • ljg-skills -
    ljg-skills
    ljg-skills 是李继刚开源的 AI 技能与提示词集合,面向大模型使用者整理了一批可复用的 prompt、角色设定和任务技能模板,适合用于学习提示词设计、搭建个人 AI 工作流和沉淀团队常用智能体能力。
    3768次使用
  • MELO音乐 - AI 音乐生成平台,支持多模态创作能力
    MELO音乐
    MELO音乐是一站式AI视频与音乐制作助手,对标suno, udio的高品质体验。提供伴奏生成、原创写词、无损导出、哼唱识曲、混音变声等全套音频与短视频编辑工具。无论是流行Kpop、电音说唱、民谣古风、摇滚儿歌还是商用轻音乐,MELO为你免费谱曲,轻松做同款!
    3480次使用
  • UniScribe - AI 免费在线音视频转文字平台
    UniScribe
    UniScribe 是一款 AI 音视频转文字与内容整理工具,支持上传音频、视频文件或粘贴 YouTube 链接,自动生成转写文本、摘要、思维导图和关键问题,并支持多格式导出,适合会议记录、课程学习、访谈整理和内容创作复盘。
    3448次使用
  • 剧云 - 免费 AI 智能中文剧本创作平台
    剧云
    剧云是专业中文剧本创作平台,安全稳定运行十余年,集成AI编剧、剧本医生审核、人物小传、剧情关系图、大纲编写、多人协作、Word导入导出、版权管控功能,数据安全防护,轻松高效创作剧本。
    3639次使用
  • 万象有声 - AI 一站式有声内容创作平台
    万象有声
    万象有声,一个专为有声创作者打造的新一代智能有声内容创作平台。平台提供专业的智能拆章、智能画本编辑、AI配音、AI生成音效、后期制作、智能对轨、智能审听等有声创作全流程工具,可以帮助创作者高效、低成本创作出引人入胜的有声作品。立即体验,让有声书制作更简单!
    3607次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码