当前位置:首页 > 文章列表 > 文章 > python教程 > 提升Python数据处理性能:从多线程到多进程的优化实践

提升Python数据处理性能:从多线程到多进程的优化实践

2025-10-17 10:36:12 0浏览 收藏

珍惜时间,勤奋学习!今天给大家带来《提升Python数据处理性能:从多线程到多进程的优化实践 》,正文内容主要涉及到等等,如果你正在学习文章,或者是对文章有疑问,欢迎大家关注我!后面我会持续更新相关内容的,希望都能帮到正在学习的大家!

提升Python数据处理性能:从多线程到多进程的优化实践

本文探讨了在Python中处理大规模数据列表匹配和筛选时的性能瓶颈。针对传统多线程在CPU密集型任务中受限于GIL的局限性,文章提出并详细阐述了如何利用Python的multiprocessing模块,通过创建独立的进程来并行化任务,从而显著提升数据处理效率。文章提供了完整的代码示例和专业解析,帮助读者理解并应用多进程技术优化Python程序的性能。

1. Python并发编程的挑战:GIL与CPU密集型任务

在Python中,处理大量数据(例如超过23,000条JSON记录与3,000多个标记进行匹配)往往会面临性能挑战。当需要对这些数据进行复杂计算或字符串相似度比较等CPU密集型操作时,程序的执行时间可能会非常长。

一个常见的优化思路是使用并发编程,例如Python的threading模块。然而,对于CPU密集型任务,Python的全局解释器锁(Global Interpreter Lock, GIL)是一个显著的限制。GIL确保在任何给定时刻,只有一个线程能够执行Python字节码。这意味着即使创建了多个线程,它们也无法真正地并行执行CPU密集型任务,因为它们必须轮流获取GIL,导致多线程在CPU密集型场景下并不能带来显著的性能提升,甚至可能因为线程切换的开销而略微降低性能。

2. 问题场景与初始尝试

假设我们有两个列表:一个包含字典的json_list(例如用户数据,每个字典含有一个"code"字段),另一个是marking列表(包含需要匹配的字符串)。我们的目标是从json_list中找出与marking列表中的每个元素具有高相似度"code"的项,并将匹配到的标记和数据收集起来。

初始的尝试可能如下所示,使用threading模块来尝试并行化匹配过程:

import math
import threading
from difflib import SequenceMatcher

# 示例数据(实际数据量远大于此)
json_list = [
    {"code": "001123", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    {"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    {"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    {"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""},
    # ... 更多数据
]
marking = ["654564", "hj876", "8768"] # ... 更多标记

def find_marking(x, y):
    """
    比较标记x与数据y的'code'字段的相似度。
    """
    text_match = SequenceMatcher(None, x, y.get('code')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None

def eliminate_marking_threaded(marking_list, json_list):
    result, result_mark = [], []
    # 这里的内部函数及对data_scrap的修改存在并发问题和GIL限制
    # 实际场景中,对共享列表的pop/remove操作需要更复杂的同步机制
    # 且因为GIL,多线程在此处并不能带来性能提升
    def __process_eliminate(marking_item, data_scrap_copy):
        for data in data_scrap_copy: # 遍历副本
            result_data = find_marking(marking_item, data)
            if result_data:
                # 注意:这里的append操作如果直接对外部result/result_mark进行,
                # 需要加锁。且data_scrap_copy的remove只影响副本。
                # 实际代码中需要更严谨的共享数据处理。
                # 此处仅为说明多线程尝试的局限性。
                # result_mark.append(marking_item)
                # result.append(result_data)
                return

    threads = []
    # 针对每个marking创建线程,但由于GIL,实际不会并行执行
    for m in marking_list:
        # 传递json_list的副本以避免部分并发问题,但仍受GIL限制
        th = threading.Thread(target=__process_eliminate, args=(m, json_list[:]))
        th.start()
        threads.append(th)

    for thread in threads:
        thread.join()

    return result_mark, result # 在这个简单的多线程示例中,result/result_mark不会被正确填充

# 运行此代码会发现性能提升不明显,甚至可能更慢
# eliminated_markings, eliminated_data = eliminate_marking_threaded(marking, json_list)

如上所述,尽管使用了threading,但由于GIL的存在,这种方法在CPU密集型任务中无法实现真正的并行计算,耗时依然较长。

3. 利用multiprocessing实现真正的并行计算

为了克服GIL的限制,Python提供了multiprocessing模块。它允许程序创建独立的进程,每个进程都有自己的Python解释器和内存空间,因此它们可以真正地并行执行CPU密集型任务,不受GIL的影响。

3.1 核心思路

  1. 进程而非线程:使用Process代替Thread。
  2. 数据共享:由于每个进程有独立的内存空间,共享数据需要特殊机制,例如multiprocessing.Manager来创建可在进程间共享的数据结构(如列表、字典)。
  3. 任务分发:将大型任务(如marking_list)分割成更小的块(chunk),然后将这些块分发给不同的进程进行处理。

3.2 multiprocessing实现示例

import math
from difflib import SequenceMatcher
from multiprocessing import Process, Manager
import time # 用于计时演示

# 模拟大规模数据
# 注意:实际运行时请替换为您的真实数据
json_list_large = []
for i in range(25000):
    json_list_large.append({"code": f"{i:06d}", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "654564", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "hj876", "phone_number": "...", "email": "...", "address": "...", "note": ""})
json_list_large.append({"code": "876890", "phone_number": "...", "email": "...", "address": "...", "note": ""})

marking_large = []
for i in range(3500):
    marking_large.append(f"{i:06d}")
marking_large.extend(["654564", "hj876", "8768"])

def find_marking(x, y):
    """
    比较标记x与数据y的'code'字段的相似度。
    """
    text_match = SequenceMatcher(None, x, y.get('code')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None

def eliminate_marking_multiprocess(marking_list, json_list):
    """
    使用多进程并行处理标记列表,从json_list中查找匹配项。
    """
    manager = Manager()
    result_mark = manager.list() # 共享列表,用于存储匹配的标记
    result = manager.list()      # 共享列表,用于存储匹配的数据

    def __process_eliminate_chunk(sub_marking_list, data_scrap_copy, shared_result_mark, shared_result):
        """
        每个进程执行的函数,处理一部分标记列表。
        data_scrap_copy 是 json_list 的一个副本,进程对其的修改不会影响原始 json_list。
        """
        for marking_item in sub_marking_list:
            for data in data_scrap_copy: # 遍历json_list的副本
                result_data = find_marking(marking_item, data)
                if result_data:
                    # 将结果添加到共享列表中
                    shared_result_mark.append(marking_item)
                    shared_result.append(result_data)
                    # 注意:这里从data_scrap_copy中移除元素,只影响当前进程的副本,
                    # 且为了避免重复匹配,一旦找到一个匹配就跳出内层循环。
                    # 如果需要从原始json_list中“消除”,则需要更复杂的同步机制或在主进程中处理。
                    # data_scrap_copy.remove(data) # 如果需要确保每个标记只匹配一次,且从副本中移除
                    break # 找到匹配后,当前marking_item处理完毕,检查下一个marking_item

    processes = []
    # 根据CPU核心数或经验值设置chunk_size和num_processes
    # chunk_size决定了每个进程处理多少个marking
    chunk_size = max(1, len(marking_list) // (2 * (len(marking_list) // 1000 + 1))) # 动态调整chunk_size
    num_processes = math.ceil(len(marking_list) / chunk_size)

    print(f"Total markings: {len(marking_list)}, Chunk size: {chunk_size}, Number of processes: {num_processes}")

    for i in range(num_processes):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(marking_list))
        sub_marking_list = marking_list[start_idx:end_idx]

        if not sub_marking_list:
            continue

        p = Process(
            target=__process_eliminate_chunk,
            # 传递json_list的副本给每个进程,避免进程间直接修改原始大列表的复杂同步问题
            args=(sub_marking_list, json_list[:], result_mark, result)
        )
        processes.append(p)
        p.start() # 启动进程

    for p in processes:
        p.join() # 等待所有进程完成

    manager.shutdown() # 关闭Manager,释放资源
    return list(result_mark), list(result) # 将Manager.list转换为普通Python列表

# 运行多进程版本
print("Starting multiprocessing elimination...")
start_time = time.time()
eliminated_markings, eliminated_data = eliminate_marking_multiprocess(marking_large, json_list_large)
end_time = time.time()
print(f"Multiprocessing finished in {end_time - start_time:.2f} seconds.")

print(f"Found {len(eliminated_markings)} matches.")
# print("Eliminated Markings:", eliminated_markings[:5]) # 打印前5个示例
# print("Eliminated Data:", eliminated_data[:5]) # 打印前5个示例

3.3 代码解析与注意事项

  1. multiprocessing.Manager:
    • Manager() 创建一个管理器对象,它允许你创建可在不同进程间共享的Python对象。
    • manager.list() 创建一个可以在多个进程中安全访问和修改的列表。这解决了传统列表在多进程环境下修改时可能出现的竞争条件和数据不一致问题。result_mark 和 result 就是通过这种方式创建的共享列表。
  2. 任务分块 (chunk_size):
    • marking_list 被分割成若干个子列表(sub_marking_list)。每个进程负责处理一个子列表。
    • 合理设置 chunk_size 很重要。过小的块可能导致进程创建和管理的开销过大;过大的块可能导致某些进程负载不均。可以根据实际CPU核心数和任务特性进行调整。
  3. 进程创建与执行:
    • Process(target=__process_eliminate_chunk, args=(...)) 创建一个新进程,并指定其执行的函数和传递的参数。
    • p.start() 启动进程。
    • p.join() 等待子进程完成。主进程会阻塞,直到所有子进程都执行完毕。
  4. json_list[:] 的作用:
    • 在 args=(sub_marking_list, json_list[:], ...) 中,json_list[:] 创建了 json_list 的一个浅拷贝。这意味着每个子进程都会收到 json_list 的一个独立副本。
    • 重要提示:如果子进程内部对 data_scrap_copy(即 json_list 的副本)进行 remove 操作,这只会影响该进程自身的副本,而不会修改原始的 json_list。如果目标是实际从原始 json_list 中移除匹配项,则需要更复杂的策略,例如让每个进程返回其匹配到的项的索引,然后在主进程中统一处理移除,或者使用 Manager().list() 来包装 json_list 并进行同步操作,但这会引入更多的复杂性和潜在的性能瓶颈。在当前示例中,我们主要关注的是收集匹配的标记和数据,而不是原地修改原始 json_list。
  5. manager.shutdown():
    • 在所有进程完成工作后,调用 manager.shutdown() 来关闭管理器并释放其资源。
  6. 结果转换:
    • Manager().list() 返回的对象是特殊的代理对象。为了在主进程中像普通列表一样操作它们,通常需要将其转换为标准的Python列表,例如 list(result_mark)。

4. 总结

当Python程序遇到CPU密集型任务,且多线程无法带来性能提升时,multiprocessing模块是更优的选择。通过创建独立的进程,multiprocessing能够绕过GIL的限制,实现真正的并行计算,从而显著缩短程序的执行时间。在使用multiprocessing时,需要注意进程间数据共享的机制(如Manager)以及任务分发策略,以确保程序的正确性和高效性。合理地应用多进程技术,可以有效提升Python在处理大规模数据和计算密集型任务时的性能表现。

今天关于《提升Python数据处理性能:从多线程到多进程的优化实践 》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!

win10无法弹出U盘怎么办_解决Win10安全删除硬件并弹出媒体功能失效的问题win10无法弹出U盘怎么办_解决Win10安全删除硬件并弹出媒体功能失效的问题
上一篇
win10无法弹出U盘怎么办_解决Win10安全删除硬件并弹出媒体功能失效的问题
将有符号字节数组转换为无符号整数数组:Java 实现指南
下一篇
将有符号字节数组转换为无符号整数数组:Java 实现指南
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    543次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    516次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    500次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    485次学习
查看更多
AI推荐
  • ChatExcel酷表:告别Excel难题,北大团队AI助手助您轻松处理数据
    ChatExcel酷表
    ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
    3179次使用
  • Any绘本:开源免费AI绘本创作工具深度解析
    Any绘本
    探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
    3390次使用
  • 可赞AI:AI驱动办公可视化智能工具,一键高效生成文档图表脑图
    可赞AI
    可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
    3418次使用
  • 星月写作:AI网文创作神器,助力爆款小说速成
    星月写作
    星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
    4524次使用
  • MagicLight.ai:叙事驱动AI动画视频创作平台 | 高效生成专业级故事动画
    MagicLight
    MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
    3798次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码