当前位置:首页 > 文章列表 > 数据库 > Redis > 基于redis乐观锁实现并发排队

基于redis乐观锁实现并发排队

来源:脚本之家 2023-02-25 10:17:50 0浏览 收藏

大家好,我们又见面了啊~本文《基于redis乐观锁实现并发排队》的内容中将会涉及到redis乐观锁等等。如果你正在学习数据库相关知识,欢迎关注我,以后会给大家带来更多数据库相关文章,希望我们能一起进步!下面就开始本文的正式内容~

有个需求场景是这样的,使用redis控制scrapy运行的数量。当系统的后台设置为4时,只允许scapry启动4个任务,多余的任务则进行排队。

概况

最近做了一个django + scrapy + celery + redis 的爬虫系统,客户购买的主机除了跑其他程序外,还要跑我开发的这套程序,所以需要手动控制scrapy的实例数量,避免过多的爬虫给系统造成负担。

流程设计

1、爬虫任务由用户以请求的方式发起,所有的用户的请求统一进入到celery进行排队;
2、任务数量控制的执行就交给reids,经由celery保存到redis,包含了爬虫启动所需要的必要信息,从redis取一条信息即可启动一个爬虫;
3、通过scrapyd的接口来获取当前在运行的爬虫数量,以便决定下一步流程:如果小于4,则从redis中取相应数量的信息来启动爬虫,如果大于等于4,则继续等待;
4、如果在运行爬虫的数量有所减少,则及时从reids中取相应数量的信息来启动爬虫。

代码实现

业务代码有点复杂和啰嗦,此处使用伪代码来演示

import redis

# 实例化一个redis连接池
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='')

r = redis.Redis(connection_pool=pool)
# 爬虫实例限制为4 即只允许4个scrapy实例在运行
limited = 4

# 声明redis的乐观锁
lock = r.Lock()

# lock.acquire中有while循环,即它会线程阻塞,直到当前线程获得redis的lock,才会继续往下执行代码
if lock.acquire():
	# 1、从reids中取一条爬虫信息
	info = redis.get() 
	
	# 2、while循环监听爬虫运行的数量
	while True:
		req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
		# 统计当前有多少个爬虫在运行
		running = req.get('running') + req.get('pending')
		
		# 3、判断是否等待还是要增加爬虫数量
		# 3.1 如果在运行的数量大于等于设置到量 则继续等待
		if running >= limited:
			continue
		
		# 3.2 如果小于 则启动爬虫
		start_scrapy(info)
		# 3.3 将info从redis中删除
		redis.delete(info)
		# 3.4 释放锁
		lock.release()
		break
		

当前,这只是伪代码而已,实际的业务逻辑可能是非常复杂的,如:

@shared_task
def scrapy_control(key_uuid):

    r = redis.Redis(connection_pool=pool)
    db = MysqlDB()
    speed_limited = db.fetch_config('REPTILE_SPEED')
    speed_limited = int(speed_limited[0])

    keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM')
    keywords_num = int(keywords_num[0])


    # while True:
    lock = r.lock('lock')
    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 进入处理环节' +  '\n')
    try:
        # acquire默认阻塞 如果获取不到锁时 会一直阻塞在这个函数的while循环中
        if lock.acquire():
            with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁' +  '\n')
            # 1 从redis中获取信息
            redis_obj = json.loads(r.get(key_uuid))
            user_id = redis_obj.get('user_id')
            contents = redis_obj.get('contents')
            
            # 2 使用while循环处理核心逻辑          
            is_hold_print = True
            while True:
                req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
                running = req.get('running') + req.get('pending')
                # 3 如果仍然有足够的爬虫在运行 则hold住redis锁,等待有空余的爬虫位置让出
                if running >= speed_limited:
                    if is_hold_print:
                        with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬虫在运行,线程等待中' +  '\n')
                        is_hold_print = False
                    time.sleep(1)
                    continue
                
                # 4 有空余的爬虫位置 则往下走
                # 4.1 处理完所有的内容后 释放锁
                if len(contents) == 0:
                    r.delete(key_uuid)
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任务已完成,从redis中删除' +  '\n')
                    lock.release()
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 释放锁' +  '\n')
                    break

                # 4.2 创建task任务
                task_uuid = str(uuid.uuid4())
                article_obj = contents.pop()
                article_id = article_obj.get('article_id')
                article = article_obj.get('content')
                try:
                    Task.objects.create(
                        task_uuid = task_uuid,
                        user_id = user_id,
                        article_id = article_id,
                        content = article
                    )
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 创建Task出错: ' + str(e) +  '\n')
                # finally:
                # 4.3 启动爬虫任务 即便创建task失败也会启动
                try:
                    task_chain(user_id, article, task_uuid, keywords_num)
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 启动任务链失败: ' + str(e) +  '\n')
                
                # 加入sleep 防止代码执行速度快于爬虫启动速度而导致当前线程启动额外的爬虫
                time.sleep(5)

    except Exception as e:
        with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁之后的操作出错: ' + str(e) +  '\n')
        lock.release()

小坑
scrapy启动速度相对较慢,所以while循环中,代码中执行到了爬虫的启动,需要sleep一下再去通过scrapyd接口获取爬虫运行的数量,如果立刻读取,可能会造成误判。

文中关于redis的知识介绍,希望对你的学习有所帮助!若是受益匪浅,那就动动鼠标收藏这篇《基于redis乐观锁实现并发排队》文章吧,也可关注golang学习网公众号了解相关技术文章。

版本声明
本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
redis中的配置以及密码设置方式redis中的配置以及密码设置方式
上一篇
redis中的配置以及密码设置方式
Redis配置外网可访问(redis远程连接不上)的方法
下一篇
Redis配置外网可访问(redis远程连接不上)的方法
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • 笔灵AI生成答辩PPT:高效制作学术与职场PPT的利器
    笔灵AI生成答辩PPT
    探索笔灵AI生成答辩PPT的强大功能,快速制作高质量答辩PPT。精准内容提取、多样模板匹配、数据可视化、配套自述稿生成,让您的学术和职场展示更加专业与高效。
    20次使用
  • 知网AIGC检测服务系统:精准识别学术文本中的AI生成内容
    知网AIGC检测服务系统
    知网AIGC检测服务系统,专注于检测学术文本中的疑似AI生成内容。依托知网海量高质量文献资源,结合先进的“知识增强AIGC检测技术”,系统能够从语言模式和语义逻辑两方面精准识别AI生成内容,适用于学术研究、教育和企业领域,确保文本的真实性和原创性。
    29次使用
  • AIGC检测服务:AIbiye助力确保论文原创性
    AIGC检测-Aibiye
    AIbiye官网推出的AIGC检测服务,专注于检测ChatGPT、Gemini、Claude等AIGC工具生成的文本,帮助用户确保论文的原创性和学术规范。支持txt和doc(x)格式,检测范围为论文正文,提供高准确性和便捷的用户体验。
    35次使用
  • 易笔AI论文平台:快速生成高质量学术论文的利器
    易笔AI论文
    易笔AI论文平台提供自动写作、格式校对、查重检测等功能,支持多种学术领域的论文生成。价格优惠,界面友好,操作简便,适用于学术研究者、学生及论文辅导机构。
    43次使用
  • 笔启AI论文写作平台:多类型论文生成与多语言支持
    笔启AI论文写作平台
    笔启AI论文写作平台提供多类型论文生成服务,支持多语言写作,满足学术研究者、学生和职场人士的需求。平台采用AI 4.0版本,确保论文质量和原创性,并提供查重保障和隐私保护。
    37次使用
查看更多
相关文章
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码