当前位置:首页 > 文章列表 > 文章 > python教程 > Python多进程多线程锁实现详解

Python多进程多线程锁实现详解

2025-11-09 21:45:54 0浏览 收藏

本文深入解析了如何在Python多进程或多线程环境下,构建一个写优先、支持并发读写的读写锁机制。针对传统同步原语在复杂场景下的局限性,提出了一种基于`JoinableQueue`和共享变量的`RWLock`自定义类实现。该方案允许读者并发访问共享资源,同时确保写者在必要时能优先中断读操作,获得独占访问权。文章详细阐述了`RWLock`的设计原理和实现细节,包括读者如何获取和释放访问权,写者如何实现独占和中断,以及如何保证数据一致性。此外,还提供了多进程和多线程环境下的具体示例,展示了如何利用该机制解决共享资源访问的同步问题,兼顾数据一致性与系统响应性,为Python并发编程提供了实用的解决方案。

Python多进程/多线程读写锁实现:高效管理一写多读并发访问

本教程深入探讨了在Python多进程或多线程环境中,如何高效地实现一个写优先、多读并发的读写锁机制。通过自定义`RWLock`类,利用`JoinableQueue`和共享变量,确保读操作可以并发进行,而写操作在获得独占访问权时能优先中断读操作,从而解决共享资源访问的复杂同步问题,并兼顾数据一致性与系统响应性。

在并发编程中,处理共享资源的访问是核心挑战之一。尤其是在“一写多读”的场景下,我们通常希望读者能够并发地访问数据以提高效率,而写者在更新数据时必须获得独占访问权,以保证数据的一致性。更进一步,如果写者需要紧急写入,它应该能够优先获得控制权,甚至中断正在进行的读操作。

传统同步机制的局限

Python的multiprocessing模块提供了Lock、Semaphore、Condition等多种同步原语。虽然它们能够解决基本的互斥和通信问题,但在实现“一写多读,写优先且可中断读”的复杂场景时,仅使用这些基本原语可能不够灵活或效率不高。例如,Condition对象通常用于线程或进程间的事件通知,但它并不能直接提供读写并发和写者中断读者的机制。一个简单的Condition可能导致所有读者在等待写者通知时阻塞,无法实现并发读。

为了满足这些特定需求,我们需要一种更高级的同步机制——读写锁(Read-Write Lock)。

读写锁(RWLock)设计原理

自定义的RWLock旨在解决以下问题:

  1. 并发读:允许多个读者同时访问共享数据。
  2. 写者独占:写者在写入时必须独占资源,防止读者读取到不一致的数据。
  3. 写者优先:写者可以请求立即获得控制权,中断正在进行的读操作。
  4. 数据一致性:读者总是读取到写者完成写入后的最新、一致的数据版本。

本方案的核心思想是为每个读者进程(或线程)分配一个独立的multiprocessing.JoinableQueue。写者通过这些队列来协调读者的行为。

  • 读者获取访问权:每个读者在尝试读取数据时,会从它自己的队列中执行一个阻塞的get()操作。这意味着读者会一直等待,直到写者将一个项放入其队列中,表明有新数据可读。
  • 读者释放访问权:读者完成数据处理后,会调用其队列的task_done()方法,通知写者它已完成当前批次数据的读取。
  • 写者获取独占权:当写者需要写入时,它会遍历所有读者的队列并调用join()方法。join()会阻塞,直到所有读者都对队列中的所有项(由写者之前放入)调用了task_done()。这确保了写者在写入之前,所有读者都已处理完上一批数据。
  • 写者释放独占权:写者完成写入后,会向每个读者的队列中放入一个项(例如None),从而唤醒所有等待的读者,告知它们有新数据可读。
  • 写者优先中断:为了实现写者优先中断,引入了一个共享的_stop标志(multiprocessing.Value)。当写者需要立即获得控制权时,它可以设置此标志。读者进程在执行读取任务期间,应定期检查此_stop标志。如果发现标志被设置,读者应立即停止当前读取任务,并释放其访问权(调用task_done()),从而允许写者尽快获得独占权。

RWLock类实现详解 (多进程版)

以下是基于multiprocessing模块实现的RWLock类:

from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time

class RWLock:
    def __init__(self, num_readers: int):
        """
        创建一个支持单写多读的读写锁。
        num_readers: 读者进程的数量。
        """
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是正整数。')
        self._local_storage = local()  # 用于存储每个进程/线程的私有数据,如其专属队列
        self._num_readers = num_readers
        self._queue_count = Value('i', 0)  # 计数器,用于为读者分配队列
        self._stop = Value('i', 0)         # 停止标志,写者设置,读者检查
        self._lock = Lock()                # 用于保护_queue_count的互斥锁
        self._queues = [JoinableQueue(1) for _ in range(self._num_readers)] # 每个读者一个队列

    def acquire_for_reading(self) -> None:
        """读者请求共享读访问权限。"""
        # 如果当前进程/线程尚未分配队列,则分配一个
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count.value]
                self._queue_count.value += 1
            self._local_storage.queue = queue
        queue.get()  # 阻塞,等待写者写入新数据并发出信号

    def release_for_reading(self):
        """读者完成共享读访问。"""
        self._local_storage.queue.task_done() # 通知写者当前批次数据已处理完毕

    def acquire_for_writing(self, immediate=True):
        """
        获取独占写访问权限。
        如果 immediate 为 True,则请求读者尽快释放访问权限。
        """
        if immediate:
            self._stop.value = 1  # 设置停止标志,通知读者尽快停止

        for queue in self._queues:
            queue.join()  # 阻塞,直到所有读者完成当前数据处理

    def release_for_writing(self) -> None:
        """释放独占写访问权限。"""
        self._stop.value = 0  # 重置停止标志
        for queue in self._queues:
            queue.put(None)  # 向每个读者队列放入一个项,唤醒等待的读者

    def is_stop_posted(self) -> bool:
        """
        读者定期调用此函数,检查写者是否请求立即独占控制。
        """
        return True if self._stop.value else False

代码解析:

  • __init__: 初始化时创建指定数量的JoinableQueue,每个队列对应一个读者。_queue_count用于确保每个读者进程只分配一个唯一的队列。_stop是一个multiprocessing.Value,用于进程间共享的停止标志。
  • acquire_for_reading: 读者首次调用时,会从_queues中获取一个队列并存储在_local_storage中,确保每个读者进程有自己的专属队列。然后调用queue.get()阻塞,等待写者释放新数据。
  • release_for_reading: 读者完成读取后,调用queue.task_done()。这是JoinableQueue的关键特性,它会减少队列中未完成任务的计数,当计数为零时,join()操作将解除阻塞。
  • acquire_for_writing: 写者调用此方法来获取写锁。如果immediate为True,它会设置_stop标志,通知读者尽快停止。然后,它对所有读者的队列调用join(),这将阻塞直到所有读者都完成了它们当前批次的数据处理。
  • release_for_writing: 写者完成写入后,重置_stop标志,然后向每个读者的队列中放入一个None,这会唤醒所有在acquire_for_reading中等待的读者。
  • is_stop_posted: 读者进程在处理数据时应周期性地调用此方法,检查_stop标志。如果返回True,表示写者请求立即停止,读者应尽快完成当前操作并释放锁。

多进程示例应用

以下示例展示了如何在多进程环境中使用RWLock来协调一个写者和多个读者:

# 共享数据类,用于在进程间传递数据
class SharedData:
    def __init__(self, initial_value=0):
        self.value = Value('i', initial_value, lock=False) # 使用multiprocessing.Value作为共享数据

def reader(rw_lock, id, shared_data):
    """读者进程函数"""
    while True:
        rw_lock.acquire_for_reading() # 获取读锁
        # 模拟长时间的读取任务
        # 在读取过程中,定期检查写者是否请求中断
        sleep_time = id / 10 # 不同读者有不同的模拟读取时间
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'reader {id} 收到停止请求,提前中断。', flush=True)
                break # 收到停止请求,立即中断

        # 实际读取共享数据
        current_value = shared_data.value.value
        print(f'reader {id} 完成处理数据: {current_value}', flush=True)
        rw_lock.release_for_reading() # 释放读锁
        time.sleep(0.1) # 模拟处理完后的小间歇

def writer(rw_lock, shared_data):
    """写者进程函数"""
    while True:
        # 当共享数据值为3时,写者请求立即写入
        # 否则,等待所有读者完成当前批次读取
        rw_lock.acquire_for_writing(immediate=(shared_data.value.value == 3))
        shared_data.value.value += 1 # 写入新数据
        print(f'wrote {shared_data.value.value} at {time.time()}', flush=True)
        rw_lock.release_for_writing() # 释放写锁
        time.sleep(1) # 模拟写完后的小间歇

def main():
    num_readers = 3
    rw_lock = RWLock(num_readers)
    shared_data = SharedData() # 共享数据实例

    # 启动读者进程
    for id in range(1, num_readers + 1):
        Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()

    # 启动写者进程
    Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()

    input('按 Enter 键终止程序:\n')

if __name__ == '__main__':
    main()

示例输出分析:

Hit enter to terminate:
wrote 1 at 1704820185.6386113
reader 1 done processing 1
reader 2 done processing 1
reader 3 done processing 1
wrote 2 at 1704820188.7424514
reader 1 done processing 2
reader 2 done processing 2
reader 3 done processing 2
wrote 3 at 1704820191.8461268
reader 1 done processing 3
reader 2 done processing 3
reader 3 done processing 3
wrote 4 at 1704820192.1564832  <-- 注意这里,写操作几乎立即发生
reader 1 done processing 4
reader 2 done processing 4
reader 3 done processing 4
wrote 5 at 1704820195.2668517
...

从输出中可以看出,当shared_data.value达到3时,写者调用acquire_for_writing(immediate=True)。此时,读者进程会因为is_stop_posted()返回True而中断其模拟的长时间读取任务,从而使写者能够更快地获取写锁并写入4。这证明了写者优先和中断机制的有效性。在通常情况下,写者会等待所有读者完成当前数据处理(由最慢的读者决定),但在immediate=True的场景下,写者可以大大缩短等待时间。

多线程环境下的优化

上述RWLock类是为多进程设计的,使用了multiprocessing模块中的同步原语(Process, Lock, Value, JoinableQueue)。如果您的应用是基于多线程而不是多进程,可以进行一些优化,将这些原语替换为threading模块和标准queue模块的对应实现,因为线程间的通信开销通常小于进程间通信。

主要变化包括:

  • multiprocessing.Process 替换为 threading.Thread。
  • multiprocessing.Lock 替换为 threading.Lock。
  • multiprocessing.Value 替换为普通的Python int变量(因为线程共享内存)。
  • multiprocessing.JoinableQueue 替换为 queue.Queue。

以下是适用于多线程的RWLockMultiThreading类:

from threading import Thread, Lock
from queue import Queue
from threading import local
import time

class RWLockMultiThreading:
    def __init__(self, num_readers: int):
        """
        创建一个支持单写多读的读写锁,适用于多线程环境。
        num_readers: 读者线程的数量。
        """
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是正整数。')
        self._local_storage = local()
        self._num_readers = num_readers
        self._queue_count = 0  # 普通整数变量,线程共享
        self._stop = 0         # 普通整数变量,线程共享
        self._lock = Lock()    # threading.Lock
        self._queues = [Queue(1) for _ in range(self._num_readers)] # queue.Queue

    def acquire_for_reading(self) -> None:
        """读者线程请求共享读访问权限。"""
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count]
                self._queue_count += 1
            self._local_storage.queue = queue
        queue.get()  # 阻塞,等待写者写入新数据并发出信号

    def release_for_reading(self):
        """读者线程完成共享读访问。"""
        self._local_storage.queue.task_done() # 通知写者当前批次数据已处理完毕

    def acquire_for_writing(self, immediate=True):
        """
        获取独占写访问权限。
        如果 immediate 为 True,则请求读者线程尽快释放访问权限。
        """
        if immediate:
            self._stop = 1  # 设置停止标志

        for queue in self._queues:
            queue.join()  # 阻塞,直到所有读者完成当前数据处理

    def release_for_writing(self) -> None:
        """释放独占写访问权限。"""
        self._stop = 0  # 重置停止标志
        for queue in self._queues:
            queue.put(None)  # 向每个读者队列放入一个项,唤醒等待的读者

    def is_stop_posted(self) -> bool:
        """
        读者线程定期调用此函数,检查写者是否请求立即独占控制。
        """
        return True if self._stop else False

# 多线程示例应用
class SharedValue: # 简单的共享值类,线程共享
    def __init__(self):
        self.value = 0

def reader_thread(rw_lock, id, shared_data):
    """读者线程函数"""
    while True:
        rw_lock.acquire_for_reading()
        sleep_time = id / 10
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'reader {id} (thread) 收到停止请求,提前中断。', flush=True)

今天带大家了解了的相关知识,希望对你有所帮助;关于文章的技术知识我们会一点点深入介绍,欢迎大家关注golang学习网公众号,一起学习编程~

剪映PC版跟踪方法及教程详解剪映PC版跟踪方法及教程详解
上一篇
剪映PC版跟踪方法及教程详解
HTML5拖拽实现教程详解
下一篇
HTML5拖拽实现教程详解
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    543次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    516次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    500次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    485次学习
查看更多
AI推荐
  • ChatExcel酷表:告别Excel难题,北大团队AI助手助您轻松处理数据
    ChatExcel酷表
    ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
    3173次使用
  • Any绘本:开源免费AI绘本创作工具深度解析
    Any绘本
    探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
    3386次使用
  • 可赞AI:AI驱动办公可视化智能工具,一键高效生成文档图表脑图
    可赞AI
    可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
    3415次使用
  • 星月写作:AI网文创作神器,助力爆款小说速成
    星月写作
    星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
    4520次使用
  • MagicLight.ai:叙事驱动AI动画视频创作平台 | 高效生成专业级故事动画
    MagicLight
    MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
    3793次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码