关于使用Redisson订阅数问题
本篇文章给大家分享《关于使用Redisson订阅数问题》,覆盖了数据库的常见基础知识,其实一个语言的全部知识点一篇文章是不可能说完的,但希望通过这些问题,让读者对自己的掌握程度有一定的认识(B 数),从而弥补自己的不足,更好的掌握它。
一、前提
最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize 的大小不够,需要提高配置才能解决。
二、源码分析
下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:
1、RedissonLock#lock() 方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试获取,如果ttl == null,则表示获取锁成功
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
RFuture<redissonlockentry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
// 后面代码忽略
try {
// 无限循环获取锁,直到获取锁成功
// ...
} finally {
// 取消订阅锁释放事件
unsubscribe(future, threadId);
}
}</redissonlockentry>
总结下主要逻辑:
- 获取当前线程的线程id;
- tryAquire尝试获取锁,并返回ttl
- 如果ttl为空,则结束流程;否则进入后续逻辑;
- this.subscribe(threadId)订阅当前线程,返回一个RFuture;
- 如果在指定时间没有监听到,则会产生如上异常。
- 订阅成功后, 通过while(true)循环,一直尝试获取锁
- fially代码块,会解除订阅
所以上述这情况问题应该出现在subscribe()方法中
2、详细看下subscribe()方法
protected RFuture<redissonlockentry> subscribe(long threadId) {
// entryName 格式:“id:name”;
// channelName 格式:“redisson_lock__channel:name”;
return pubSub.subscribe(getEntryName(), getChannelName());
}</redissonlockentry>
RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
// ....
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
而subscribeService在MasterSlaveConnectionManager的实现中又是通过如下方式构造的
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);
this.config = cfg;
// 初始化
initTimer(cfg);
initSingleEntry();
}
protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
if (minTimeout % 100 != 0) {
minTimeout = (minTimeout % 100) / 2;
} else if (minTimeout == 100) {
minTimeout = 50;
} else {
minTimeout = 100;
}
timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);
connectionWatcher = new IdleConnectionWatcher(this, config);
// 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:
subscribeService = new PublishSubscribeService(this, config);
}
PublishSubscribeService构造函数
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super();
this.connectionManager = connectionManager;
this.config = config;
for (int i = 0; i
<h3>3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面</h3>
<pre class="brush:java;">private final ConcurrentMap<string e> entries = new ConcurrentHashMap();
public RFuture<e> subscribe(String entryName, String channelName) {
// 从PublishSubscribeService获取对应的信号量。 相同的channelName获取的是同一个信号量
// public AsyncSemaphore getSemaphore(ChannelName channelName) {
// return locks[Math.abs(channelName.hashCode() % locks.length)];
// }
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
AtomicReference<runnable> listenerHolder = new AtomicReference<runnable>();
RPromise<e> newPromise = new RedissonPromise<e>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};
Runnable listener = new Runnable() {
@Override
public void run() {
// 如果存在RedissonLockEntry, 则直接利用已有的监听
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<e>(newPromise));
return;
}
E value = createEntry(newPromise);
value.acquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<e>(newPromise));
return;
}
// 创建监听,
RedisPubSubListener<object> listener = createListener(channelName, value);
// 订阅监听
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
// 最终会执行listener.run方法
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}</object></e></e></e></e></runnable></runnable></e></string>
AsyncSemaphore#acquire()方法
public void acquire(Runnable listener) {
acquire(listener, 1);
}
public void acquire(Runnable listener, int permits) {
boolean run = false;
synchronized (this) {
// counter初始化值为1
if (counter
<p>梳理上述逻辑:</p>
<p style="margin-left:40px">1、从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量<br>2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;<br>3、如果已经存在RedissonLockEntry, 则利用已经订阅就行<br>4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。</p>
<p>从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法</p>
<h3>4、PublishSubscribeService#subscribe逻辑如下:</h3>
<pre class="brush:java;">private final ConcurrentMap<channelname pubsubconnectionentry> name2PubSubConnection = new ConcurrentHashMap();
private final Queue<pubsubconnectionentry> freePubSubConnections = new ConcurrentLinkedQueue();
public RFuture<pubsubconnectionentry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener>... listeners) {
RPromise<pubsubconnectionentry> promise = new RedissonPromise<pubsubconnectionentry>();
// 主要逻辑入口, 这里要主要channelName每次都是新对象, 但内部覆写hashCode+equals。
subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise;
}
private void subscribe(Codec codec, ChannelName channelName, RPromise<pubsubconnectionentry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener>... listeners) {
PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
if (connEntry != null) {
// 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中
addListeners(channelName, promise, type, lock, connEntry, listeners);
return;
}
// 没有时,才是最重要的逻辑
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
if (promise.isDone()) {
lock.release();
freePubSubLock.release();
return;
}
// 从队列中取头部元素
PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
// 第一次肯定是没有的需要建立
connect(codec, channelName, promise, type, lock, listeners);
return;
}
// 如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
return;
}
// 如果remainFreeAmount=0, 则从队列中移除
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
// 增加监听
RFuture<void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = freeEntry.psubscribe(codec, channelName);
} else {
future = freeEntry.subscribe(codec, channelName);
}
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
subscribeFuture.cancel(false);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
});
}
});
}
private void connect(Codec codec, ChannelName channelName, RPromise<pubsubconnectionentry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener>... listeners) {
// 根据channelName计算出slot获取PubSubConnection
int slot = connectionManager.calcSlot(channelName.getName());
RFuture<redispubsubconnection> connFuture = nextPubSubConnection(slot);
promise.onComplete((res, e) -> {
if (e != null) {
((RPromise<redispubsubconnection>) connFuture).tryFailure(e);
}
});
connFuture.onComplete((conn, e) -> {
if (e != null) {
freePubSubLock.release();
lock.release();
promise.tryFailure(e);
return;
}
// 这里会从配置中读取subscriptionsPerConnection
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
// 每获取一次,subscriptionsPerConnection就会减直到为0
int remainFreeAmount = entry.tryAcquire();
// 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
freePubSubLock.release();
addListeners(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount > 0) {
// 加入到队列中
freePubSubConnections.add(entry);
}
freePubSubLock.release();
RFuture<void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
// 这里真正的进行订阅(底层与redis交互)
ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
future = entry.psubscribe(codec, channelName);
} else {
future = entry.subscribe(codec, channelName);
}
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
subscribeFuture.cancel(false);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
});
});
}</void></redispubsubconnection></redispubsubconnection></pubsubconnectionentry></void></pubsubconnectionentry></pubsubconnectionentry></pubsubconnectionentry></pubsubconnectionentry></pubsubconnectionentry></channelname>
PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:
public int tryAcquire() {
while (true) {
int value = subscribedChannelsAmount.get();
if (value == 0) {
return -1;
}
if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
return value - 1;
}
}
}
梳理上述逻辑:
1、还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法
2.1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
2.4 后面就是进行底层的subscribe和addListener
3、如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
4、如果remainFreeAmount 5、最后也是进行底层的subscribe和addListener;
三 总结
根因: 从上面代码分析, 导致问题的根因是因为PublishSubscribeService 会使用公共队列中的freePubSubConnections, 如果同一个key一次性请求超过subscriptionsPerConnection它的默认值5时,remainFreeAmount就可能出现-1的情况, 那么就会导致commandExecutor.syncSubscription(future)中等待超时,也就抛出如上异常Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
解决方法: 在初始化Redisson可以可指定这个配置项的值。
相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-Configuration#23-common-settings
今天关于《关于使用Redisson订阅数问题》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!
redis的list数据类型相关命令介绍及使用
- 上一篇
- redis的list数据类型相关命令介绍及使用
- 下一篇
- Redis 使用 List 实现消息队列的优缺点
-
- 数据库 · Redis | 16小时前 |
- 监控Redis集群健康状态的工具与指标
- 112浏览 收藏
-
- 数据库 · Redis | 1星期前 |
- Redis数据安全防护全攻略
- 252浏览 收藏
-
- 数据库 · Redis | 2星期前 |
- Redis主从复制故障排查与修复技巧
- 302浏览 收藏
-
- 数据库 · Redis | 2星期前 |
- Redis与HBase存储方案详解
- 325浏览 收藏
-
- 数据库 · Redis | 2星期前 |
- Redis数据安全防护全攻略
- 157浏览 收藏
-
- 数据库 · Redis | 2星期前 |
- 高并发Redis优化技巧分享
- 257浏览 收藏
-
- 数据库 · Redis | 2星期前 |
- Redis数据安全防护全攻略
- 398浏览 收藏
-
- 数据库 · Redis | 3星期前 |
- Redis配置加密方法与安全设置
- 232浏览 收藏
-
- 数据库 · Redis | 3星期前 |
- RedisHyperLogLog高效统计技巧
- 283浏览 收藏
-
- 数据库 · Redis | 3星期前 |
- Redis与MySQL缓存同步方法详解
- 141浏览 收藏
-
- 数据库 · Redis | 3星期前 |
- Redis布隆过滤器防穿透原理解析
- 312浏览 收藏
-
- 数据库 · Redis | 1个月前 |
- Redis容器化部署实战技巧分享
- 195浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3167次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3380次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3409次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4513次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3789次使用
-
- redis复制有可能碰到的问题汇总
- 2023-01-01 501浏览
-
- 使用lua+redis解决发多张券的并发问题
- 2023-01-27 501浏览
-
- Redis应用实例分享:社交媒体平台设计
- 2023-06-21 501浏览
-
- 使用Python和Redis构建日志分析系统:如何实时监控系统运行状况
- 2023-08-08 501浏览
-
- 如何利用Redis和Python实现消息队列功能
- 2023-08-16 501浏览

