Python多线程多进程并发控制技巧
本文针对Python多进程/多线程环境下的并发读写挑战,提出了一种**写优先**的解决方案,并符合百度SEO优化。在多代理系统或共享数据资源的应用中,允许多个读取者并发访问,同时确保写入者拥有独占访问权和优先权。通过自定义`RWLock`类,利用`multiprocessing.JoinableQueue`(或`queue.Queue`)和共享变量,实现了**高效并发**和**数据一致性**。该读写锁机制允许并发读取,并在写入者需要独占访问时能及时中断读取操作。文章详细介绍了`RWLock`的设计与实现,包括核心类结构、读取者和写入者的操作方法,并提供了多进程和多线程环境下的示例应用,帮助开发者在Python并发编程中实现**一写多读,写优先**的访问控制,有效平衡并发性与数据一致性的需求。

本文深入探讨了在Python多进程或多线程环境中,如何实现一个写入者(Writer)对多个读取者(Reader)共享资源的并发访问控制,并赋予写入者优先权。通过设计一个自定义的`RWLock`(读写锁)类,利用`multiprocessing.JoinableQueue`(或`queue.Queue`)和共享变量,确保了数据一致性,允许并发读取,并在写入者需要独占访问时能及时中断读取操作。
引言:多进程/多线程环境下的读写并发挑战
在构建多代理系统或任何需要共享数据资源的并发应用时,一个常见的场景是存在一个或少数几个写入者进程/线程,以及多个读取者进程/线程。理想情况下,我们希望在写入者不操作时,多个读取者能够并行访问共享数据以提高效率;而当写入者需要修改数据时,它应获得独占访问权,并确保数据在修改过程中不会被读取,以维护数据的一致性。同时,写入者在需要写入时应具有优先权,能够及时中断正在进行的读取操作。
Python标准库中的multiprocessing.Lock或threading.Lock提供了互斥访问,但它们不允许并发读取。multiprocessing.Condition可以用于更复杂的线程间通信和同步,但直接实现“一写多读,写优先”的模式仍需精心设计,特别是要允许并发读取。
为了解决这一挑战,我们将构建一个自定义的读写锁(Read-Write Lock)机制,它能够:
- 允许任意数量的读取者同时读取数据。
- 确保写入者在写入时拥有独占访问权,且不会有任何读取者同时进行读取。
- 赋予写入者优先权,当它需要写入时,能够“请求”读取者尽快释放资源。
自定义读写锁(RWLock)的设计与实现
核心思想是为每个读取者分配一个独立的JoinableQueue。写入者通过这些队列来通知读取者新数据已准备好,并等待所有读取者完成当前读取周期。读取者则通过其队列来阻塞,等待写入者的通知。此外,我们引入一个共享的“停止”标志,允许写入者在紧急情况下请求读取者立即中断当前读取。
RWLock 类结构
我们首先定义一个RWLock类,它将封装读写锁的逻辑。这个类需要跟踪读取者的数量、为每个读取者分配的队列、一个用于控制紧急停止的标志,以及一个用于初始化队列计数的锁。
from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local # 用于存储每个进程/线程私有的队列
import time
class RWLock:
def __init__(self, num_readers: int):
"""
创建一个支持单写入者和多读取者的读写锁。
num_readers 参数指定了读取者的数量。
"""
if num_readers < 1 or not isinstance(num_readers, int):
raise ValueError('num_readers 必须是一个正整数。')
self._local_storage = local() # 线程/进程局部存储,用于分配队列
self._num_readers = num_readers
self._queue_count = Value('i', 0) # 共享整数,用于为读取者分配队列索引
self._stop = Value('i', 0) # 共享整数,停止标志,写入者设置,读取者检查
self._lock = Lock() # 保护 _queue_count 的互斥锁
self._queues = [JoinableQueue(1) for _ in range(self._num_readers)] # 为每个读取者创建队列关键属性解释:
- _local_storage: threading.local对象,用于在每个进程(或线程)中存储其私有的队列引用,避免跨进程/线程直接共享JoinableQueue对象本身。
- _num_readers: 预期的读取者数量。
- _queue_count: multiprocessing.Value,一个共享的整数值,用于为每个新的读取者分配一个唯一的队列。
- _stop: multiprocessing.Value,一个共享的整数标志,当写入者需要立即独占访问时,会将其设置为1。读取者会周期性检查此标志。
- _lock: multiprocessing.Lock,用于保护_queue_count在多个进程同时初始化时不会出现竞争。
- _queues: 一个JoinableQueue列表,每个读取者对应一个。JoinableQueue(1)表示队列容量为1,确保写入者每次只能放置一个信号。
读取者操作方法
acquire_for_reading()
读取者调用此方法来请求共享读取权限。
def acquire_for_reading(self) -> None:
"""读取者请求对数据的共享读取访问。"""
# 如果尚未分配队列,则分配一个:
queue = getattr(self._local_storage, 'queue', None)
if queue is None:
with self._lock:
queue = self._queues[self._queue_count.value]
self._queue_count.value += 1
self._local_storage.queue = queue
queue.get() # 阻塞,直到写入者放入一个信号(表示有新数据)工作原理:
- 每个读取者首次调用时,会通过_local_storage获取一个专属的JoinableQueue。_queue_count确保每个读取者拿到不同的队列。
- queue.get()是一个阻塞操作。读取者会在此处等待,直到写入者调用release_for_writing()并向其队列中放入一个项(通常是None),表示有新的数据可供读取。
release_for_reading()
读取者完成数据读取后,调用此方法释放权限。
def release_for_reading(self):
"""读取者完成对数据的共享读取访问。"""
self._local_storage.queue.task_done() # 通知队列,已处理一个项工作原理:
- task_done()通知JoinableQueue,之前通过get()获取的项已被处理。这对于写入者通过join()等待所有读取者完成操作至关重要。
is_stop_posted()
读取者周期性调用此方法,检查写入者是否请求立即停止读取。
def is_stop_posted(self) -> bool:
"""读取者周期性调用此函数,查看写入者是否需要立即独占共享资源。"""
return True if self._stop.value else False工作原理:
- 读取者在执行耗时读取任务时,应定期检查_stop标志。如果为True,则应尽快中断当前读取并释放锁。这是一个合作机制,需要读取者主动配合。
写入者操作方法
acquire_for_writing(immediate=True)
写入者调用此方法来请求独占写入权限。
def acquire_for_writing(self, immediate=True):
"""
获取对数据的独占访问权限。
如果 immediate 参数为 True,则请求读取者尽快放弃对数据的访问。
"""
if immediate:
self._stop.value = 1 # 设置停止标志,请求读取者立即中断
for queue in self._queues:
queue.join() # 阻塞,直到所有读取者完成当前周期并调用 task_done()工作原理:
- 如果immediate为True,写入者会设置_stop标志为1,通知所有读取者尽快停止。
- 写入者遍历所有读取者的JoinableQueue,并调用queue.join()。join()方法会阻塞,直到队列中的所有项都被get()并随后task_done()。由于写入者在release_for_writing时会向队列中放入项,所以join()会等待这些项被处理。在初始状态或写入者刚完成写入后,队列为空,join()会立即返回。当写入者需要再次写入时,它会等待所有读取者处理完上一轮的数据。
release_for_writing()
写入者完成数据写入后,调用此方法释放权限。
def release_for_writing(self) -> None:
"""放弃独占写入访问权限。"""
self._stop.value = 0 # 重置停止标志
for queue in self._queues:
queue.put(None) # 向每个读取者的队列中放入一个信号,唤醒它们工作原理:
- 写入者首先重置_stop标志,表示不再需要紧急停止。
- 然后,它向每个读取者的JoinableQueue中放入一个None(或其他任意项)。这将解除之前在acquire_for_reading()中阻塞的读取者。
多进程示例应用
现在,我们结合RWLock类和multiprocessing模块来构建一个实际的读写并发系统。
# 导入必要的模块
from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time
# RWLock 类的定义如上所示,此处省略重复代码
class SharedValue:
"""一个简单的共享数据容器,使用 multiprocessing.Value"""
def __init__(self, initial_value=0):
self.value = Value('i', initial_value, lock=False) # lock=False表示手动管理锁
def reader(rw_lock, id, shared_data):
"""读取者进程的逻辑"""
while True:
rw_lock.acquire_for_reading() # 获取读取权限
# 模拟耗时的读取任务
# 在这里,我们应该周期性检查写入者是否要求停止
sleep_time = id / 10 # 不同的读取者模拟不同的读取时间
for _ in range(10):
time.sleep(sleep_time)
if rw_lock.is_stop_posted():
print(f'reader {id} 收到停止请求,中断读取。', flush=True)
break # 写入者请求停止,中断当前读取
print(f'reader {id} 完成处理数据: {shared_data.value}', flush=True)
rw_lock.release_for_reading() # 释放读取权限
def writer(rw_lock, shared_data):
"""写入者进程的逻辑"""
while True:
# 当 shared_data.value 等于 3 时,写入者将请求立即停止读取者
rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
shared_data.value.value += 1 # 修改共享数据
print(f'wrote {shared_data.value.value} at {time.time()}', flush=True)
rw_lock.release_for_writing() # 释放写入权限
def main_multiprocessing():
num_readers = 3
rw_lock = RWLock(num_readers)
shared_data = SharedValue(0) # 共享数据
# 创建并启动读取者进程
for id in range(1, num_readers + 1):
Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()
# 创建并启动写入者进程
Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()
input('按回车键终止程序:\n')
if __name__ == '__main__':
main_multiprocessing()运行示例输出解释: 当程序运行时,你会观察到读取者会并发地处理数据。写入者在每次写入后,会等待所有读取者完成当前数据的处理。当shared_data.value达到3时,写入者会设置immediate=True,这时读取者会更快地中断其模拟的读取任务,从而让写入者几乎立即获得写入权限。这演示了写入者优先和中断机制。
多线程环境下的适配
RWLock的设计同样适用于多线程环境。主要的区别在于:
- 将multiprocessing.Process替换为threading.Thread。
- 将multiprocessing.Lock替换为threading.Lock。
- 将multiprocessing.Value替换为普通的Python整数(因为线程共享同一进程的内存空间)。
- 将multiprocessing.JoinableQueue替换为queue.Queue。
from threading import Thread, Lock
from queue import Queue
from threading import local
import time
class RWLockMultiThreading:
def __init__(self, num_readers: int):
if num_readers < 1 or not isinstance(num_readers, int):
raise ValueError('num_readers 必须是一个正整数。')
self._local_storage = local()
self._num_readers = num_readers
self._queue_count = 0 # 普通整数,线程共享
self._stop = 0 # 普通整数,线程共享
self._lock = Lock() # threading.Lock
self._queues = [Queue(1) for _ in range(self._num_readers)] # queue.Queue
def acquire_for_reading(self) -> None:
queue = getattr(self._local_storage, 'queue', None)
if queue is None:
with self._lock:
queue = self._queues[self._queue_count]
self._queue_count += 1
self._local_storage.queue = queue
queue.get()
def release_for_reading(self):
self._local_storage.queue.task_done()
def acquire_for_writing(self, immediate=True):
if immediate:
self._stop = 1
for queue in self._queues:
queue.join()
def release_for_writing(self) -> None:
self._stop = 0
for queue in self._queues:
queue.put(None)
def is_stop_posted(self) -> bool:
return True if self._stop else False
class SharedValueThread:
"""一个简单的共享数据容器,适用于多线程"""
def __init__(self, initial_value=0):
self.value = initial_value # 普通整数,线程共享
def reader_thread(rw_lock, id, shared_data):
"""读取者线程的逻辑"""
while True:
rw_lock.acquire_for_reading()
sleep_time = id / 10
for _ in range(10):
time.sleep(sleep_time)
if rw_lock.is_stop_posted():
print(f'reader {id} (thread) 收到停止请求,中断读取。', flush=True)
break
print(f'reader {id} (thread) 完成处理数据: {shared_data.value}', flush=True)
rw_lock.release_for_reading()
def writer_thread(rw_lock, shared_data):
"""写入者线程的逻辑"""
while True:
rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
shared_data.value += 1
print(f'wrote {shared_data.value} at {time.time()} (thread)', flush=True)
rw_lock.release_for_writing()
def main_multithreading():
num_readers = 3
rw_lock = RWLockMultiThreading(num_readers)
shared_data = SharedValueThread(0)
for id in range(1, num_readers + 1):
Thread(target=reader_thread, args=(rw_lock, id, shared_data), daemon=True).start()
Thread(target=writer_thread, args=(rw_lock, shared_data), daemon=True).start()
input('按回车键终止程序:\n')
if __name__ == '__main__':
# 可以选择运行多进程或多线程示例
# main_multiprocessing()
main_multithreading()注意事项与总结
- 合作式中断: is_stop_posted()机制要求读取者是“合作式”的。如果读取者不定期检查此标志,或者在检查后不立即中断,那么写入者的“立即”优先权将无法有效实现。在实际应用中,耗时较长的读取操作应设计成可中断的。
- 队列数量与资源消耗: RWLock为每个读取者创建一个独立的JoinableQueue。当读取者数量非常庞大时,这可能会带来一定的资源开销。对于超大规模的读取者场景,可能需要考虑其他更优化的读写锁实现,例如基于信号量或条件变量的复杂状态机。
- 数据一致性: 本方案确保了写入者在写入时独占资源,从而保证了数据的一致性。读取者在获取读取权限后,读取的是写入者最近一次写入的完整数据。
- 适用场景: 这种自定义RWLock特别适用于一个写入者频繁更新数据,而多个读取者需要并发读取,且写入者对实时性有较高要求(即写入者需要时能快速获得控制权)的场景。
- 进程 vs. 线程: 选择multiprocessing还是threading取决于具体任务。multiprocessing适用于CPU密集型任务,利用多核优势;threading适用于I/O密集型任务,或者需要共享大量内存数据的场景。本教程提供了两种环境下的适配方案。
通过上述RWLock的实现,我们成功地在Python的并发编程中,为共享资源提供了一个高效且具有写入者优先权的一写多读访问机制,有效平衡了并发性与数据一致性的需求。
好了,本文到此结束,带大家了解了《Python多线程多进程并发控制技巧》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多文章知识!
Win10程序固定到开始菜单技巧
- 上一篇
- Win10程序固定到开始菜单技巧
- 下一篇
- CSS空容器检测与布局优化技巧
-
- 文章 · python教程 | 4分钟前 |
- Pandasmerge_asof快速匹配最近时间数据
- 254浏览 收藏
-
- 文章 · python教程 | 23分钟前 |
- 列表推导式与生成器表达式区别解析
- 427浏览 收藏
-
- 文章 · python教程 | 41分钟前 |
- Pythonopen函数使用技巧详解
- 149浏览 收藏
-
- 文章 · python教程 | 44分钟前 |
- Python合并多个列表的几种方法
- 190浏览 收藏
-
- 文章 · python教程 | 53分钟前 |
- Python嵌套if语句使用方法详解
- 264浏览 收藏
-
- 文章 · python教程 | 58分钟前 |
- Python队列判空安全方法详解
- 293浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- RuffFormatter尾随逗号设置方法
- 450浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- Python读取二进制文件的缓冲方法
- 354浏览 收藏
-
- 文章 · python教程 | 2小时前 | Python 数据结构 namedtuple 扑克牌 Card
- Pythonnamedtuple打造扑克牌玩法详解
- 291浏览 收藏
-
- 文章 · python教程 | 3小时前 |
- PythonIQR方法检测异常值详解
- 478浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3186次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3398次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3429次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4535次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3807次使用
-
- Flask框架安装技巧:让你的开发更高效
- 2024-01-03 501浏览
-
- Django框架中的并发处理技巧
- 2024-01-22 501浏览
-
- 提升Python包下载速度的方法——正确配置pip的国内源
- 2024-01-17 501浏览
-
- Python与C++:哪个编程语言更适合初学者?
- 2024-03-25 501浏览
-
- 品牌建设技巧
- 2024-04-06 501浏览

