集群的概述
集群和其他架构的区别
分布式缓存是由至少三个master节点及每个master节点用于的一个或多个slave节点组成的redis集群。master节点是活跃节点,slave是备份,用于在master宕机时竞选成为master。master和自己的slave在一个真实(或虚拟的)node上面,或者一个node上面部署多个主从组。
集群和主从、哨兵两种架构的本质区别是,主从和哨兵都是有特定的master节点,连接也是针对这个特定master并且这个master持有全量的数据可以进行读写(哨兵的master可以切换),而其他节点只是通过复制能力同步了一份相同的数据,而集群架构是多个master,每个节点是平级的,它们只持有这个redis集群中全量数据的一部分
集群数据分布的几种模式
RedisCluster采用的是哈希分区,将数据哈希到不同的节点上,有以下几种方案:
节点取余
最简单的哈希模式,使用特定的数据,如Redis的键或用户ID,再根据节点数量N使用公式: hash(key)% N
计算出哈希值
如果有3台,则N=3
这种方案很简单,但是如果需要扩缩容,比如3变成6,那么N=6,这样历史数据都需要全部进行再哈希,重算分布,这样扩容的影响非常大
一致性哈希
一致性哈希算法用于解决普通哈希算法解决不了的扩缩容问题
一致性哈希算法通过构造环的形式,解决这一问题。首先构造232个槽点,即0~232-1,再让节点分布在环上。计算哈希值时固定除232次方取模,落在环上的对应槽点上,顺时针旋转遇到的第一个node即要分配的node。
具体算法知识参考算法部分补链接
这种方式相比节点取余最大的好处在于加入和删除节点只影响哈希环中相邻的节点,对其他节点无影响,因为上面的的算法是找到槽,顺时针找到第一个node
但是使用这种算法后,一旦扩容还是有少部分数据无法读取到,如果是2倍扩节点,那么读不到的就更多了
虚拟槽分区
RedisCluster最终使用的方案
这种方案中,有16384个虚拟槽(0~16383),数据通过哈希算法分到不同的槽中,
把数据分散到槽中的算法是slot=CRC16(key) & 16383
CRC即循环冗余校验码(Cyclic Redundancy Check):是数据通信领域中最常用的一种查错校验码,其特征是信息字段和校验字段的长度可以任意选定,CRC16即使用16位CRC码做校验的算法
RedisCluster只维护一个node与slot的对应关系,而不维护与数据的对应关系,这样就使得node与data解耦了,这样做的好处包括:
如果需要扩容或缩容,只需要重新计算node与slot的对应关系即可,而不需要重算数据分配在哪个slot,因此顶多也就16384条数据变化,这种成本是非常小的
节点自身维护槽的映射关系,不需要客户端或者代理服务维护槽分区 元数据
支持节点、槽、键之间的映射查询,用于数据路由、在线伸缩等场景
但是这样也存在问题:
key批量操作支持有限。如mset、mget,目前只支持具有相同slot值的key执行批量操作。如果批量操作的key分散在不同的slot上面,则无法操作
事务操作只支持同slot操作,例如lua脚本,如果脚本中的key分散在不同slot上,也是操作不了的
一个key只能被分散在一个slot上,因此如果是一个大key,则可能将一个slot占满,导致RedisCluster看起来内存占用不高,但是频发触发OOM或key逐出,可能就是因为单个slot满了
那么如果我就是需要对一些无法分配到相同slot的key做事务操作怎么办?
实际上也是有办法的,那就是在这些mset、mget命令中加hash_tag,通过hash_tag让它们强制得到同一个hash结果
mget user:{10086}:frends user:{10086}:videos
其中方法中的{}就是hash_tag
RedisCluster节点间的通信
在分布式存储中需要提供维护节点元数据信息的机制,所谓元数据是指:节点负责哪些数据,是否出现故障等状态信息。常见的元数据维护方式分为:集中式和P2P方式。Redis集群采用P2P的Gossip(流言)协议
Gossip消息即ping、pong、meet、fail消息,用于查询其他节点状态
请求重定向和smart客户端
在集群模式下,Redis接收任何键相关命令时首先计算键对应的槽,再根据槽找出所对应的节点,如果节点是自身,则处理键命令;否则回复 MOVED重定向错误,通知客户端请求正确的节点。这个过程称为MOVED重 定向
而如果是使用smart客户端,例如Jedis中的JedisCluster,则不需要在开发侧主动处理重定向消息,由Jedis客户端完成内部的重定向流程,这里可以看JedisCluster相关源码
JedisCluster源码分析
JedisCluster的构造函数与槽映射关系的初始化
单纯看JedisCluster好像没啥东西,其实里面的东西都在其父类中
public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName,
final GenericObjectPoolConfig<Jedis> poolConfig, boolean ssl,
SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName,
poolConfig, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
}
跟踪父类
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
int infiniteSoTimeout, int maxAttempts, String user, String password, String clientName,
GenericObjectPoolConfig poolConfig, boolean ssl, SSLSocketFactory sslSocketFactory,
SSLParameters sslParameters, HostnameVerifier hostnameVerifier,
JedisClusterHostAndPortMap hostAndPortMap, Duration maxTotalRetriesDuration) {
this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
connectionTimeout, soTimeout, infiniteSoTimeout, user, password, clientName, ssl,
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
this.maxAttempts = maxAttempts;
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
}
可以看到这里维护了一个成员变量:
protected JedisClusterConnectionHandler connectionHandler;
继续跟踪进去
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final JedisClientConfig seedNodesClientConfig,
final GenericObjectPoolConfig<Jedis> poolConfig,
final JedisClientConfig clusterNodesClientConfig) {
this.cache = new JedisClusterInfoCache(poolConfig, clusterNodesClientConfig, nodes);
initializeSlotsCache(nodes, seedNodesClientConfig);
}
分析JedisClusterInfoCache
public class JedisClusterInfoCache {
// 连接池
private final Map<String, JedisPool> nodes = new HashMap<>();
private final Map<Integer, JedisPool> slots = new HashMap<>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
private final Lock rediscoverLock = new ReentrantLock();
private final GenericObjectPoolConfig<Jedis> poolConfig;
private final JedisClientConfig clientConfig;
private final Set<HostAndPort> startNodes;
private static final int MASTER_NODE_INDEX = 2;
……
}
可以看到其针对node和slot分别一对一维护了JedisPool连接池
然后在JedisClusterConnectionHandler的构造函数中执行JedisClusterConnectionHandler#initializeSlotsCache
初始化JedisClusterInfoCache 中的连接池,看下里面的代码:
private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig clientConfig) {
ArrayList<HostAndPort> startNodeList = new ArrayList<>(startNodes);
Collections.shuffle(startNodeList);
for (HostAndPort hostAndPort : startNodeList) {
try (Jedis jedis = new Jedis(hostAndPort, clientConfig)) {
cache.discoverClusterNodesAndSlots(jedis);
return;
} catch (JedisConnectionException e) {
// try next nodes
}
}
}
遍历传进来的startNodes列表,即每个Node的连接信息,然后调用JedisClusterInfoCache#discoverClusterNodesAndSlots
跟进看下
// redis.clients.jedis.JedisClusterInfoCache#discoverClusterNodesAndSlots
public void discoverClusterNodesAndSlots(Jedis jedis) {
List<Object> slots = jedis.clusterSlots();
// 加写锁
w.lock();
try {
reset();
for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= MASTER_NODE_INDEX) {
continue;
}
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
// hostInfos
int size = slotInfo.size();
for (int i = MASTER_NODE_INDEX; i < size; i++) {
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
if (hostInfos.isEmpty()) {
continue;
}
HostAndPort targetNode = generateHostAndPort(hostInfos);
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
}
}
}
} finally {
w.unlock();
}
}
核心方法
w.lock()
这里是加了一个写锁,通过读写锁降低锁竞争的风险
这里传进来的Jedis是一个node的Jedis连接,获取到其所有的slots,然后遍历这些slots,下面两个核心方法
setupNodeIfNotExist(targetNode);
……
assignSlotsToNode(slotNums, targetNode);
分别初始化了nodes和slots两个map
JedisCluster中命令执行流程与JedisClusterMaxAttemptsException异常的分析
可以以最简单的set命令跟进
// JedisCluster#set(java.lang.String, java.lang.String)
public String set(final String key, final String value) {
return new JedisClusterCommand<String>(connectionHandler, maxAttempts, maxTotalRetriesDuration) {
@Override
public String execute(Jedis connection) {
return connection.set(key, value);
}
}.run(key);
}
这里通过匿名内部类构造了一个JedisClusterCommand的实现,调用其run方法
其中对execute的实现就是执行命令,很简单,可知核心应该在下面run方法中
// JedisClusterCommand#run(java.lang.String)
public T run(String key) {
return runWithRetries(JedisClusterCRC16.getSlot(key));
}
这里可以看到,是基于重试执行的,执行参数是JedisClusterCRC16.getSlot(key)
,即已经通过CRC16算法获取到了对应的slot
核心方法在runWithRetries中
// redis.clients.jedis.JedisClusterCommand#runWithRetries
private T runWithRetries(final int slot) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);
JedisRedirectionException redirect = null;
int consecutiveConnectionFailures = 0;
Exception lastException = null;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Jedis connection = null;
try {
if (redirect != null) {
connection = connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
connection = connectionHandler.getConnectionFromSlot(slot);
}
return execute(connection);
} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
lastException = jce;
++consecutiveConnectionFailures;
LOG.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
redirect = null;
}
} catch (JedisRedirectionException jre) {
// avoid updating lastException if it is a connection exception
if (lastException == null || lastException instanceof JedisRedirectionException) {
lastException = jre;
}
LOG.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
redirect = jre;
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
} finally {
releaseConnection(connection);
}
if (Instant.now().isAfter(deadline)) {
throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
}
}
JedisClusterMaxAttemptsException maxAttemptsException
= new JedisClusterMaxAttemptsException("No more cluster attempts left.");
maxAttemptsException.addSuppressed(lastException);
throw maxAttemptsException;
}
主体是一个for循环,其中的循环条件attemptsLeft表示重试上限,如果重试上限都到了,还没正确返回,下面就会抛出JedisClusterMaxAttemptsException异常
JedisClusterMaxAttemptsException maxAttemptsException
= new JedisClusterMaxAttemptsException("No more cluster attempts left.");
maxAttemptsException.addSuppressed(lastException);
throw maxAttemptsException;
那么attempsLeft从哪里来呢,可以顺着构造方法往上找,发现是在JedisCluster的父类BinaryJedisCluster中
public static final int DEFAULT_MAX_ATTEMPTS = 5;
即最多重试5次
当然也可以通过构造函数调整
继续看for循环中的内容
Jedis connection = null;
try {
if (redirect != null) {
connection = connectionHandler.getConnectionFromNode(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.asking();
}
} else {
connection = connectionHandler.getConnectionFromSlot(slot);
}
return execute(connection);
} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch ...
try主体方法就是基于slot获取对于的连接,跟进看下
// redis.clients.jedis.JedisSlotBasedConnectionHandler#getConnectionFromSlot
public Jedis getConnectionFromSlot(int slot) {
JedisPool connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
// It can't guaranteed to get valid connection because of node assignment
return connectionPool.getResource();
} else {
// 如果获取不到,更新slot缓存
renewSlotCache();
connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
return connectionPool.getResource();
} else {
// no choice, fallback to new connection to random node
return getConnection();
}
}
}
核心方法在cache.getSlotPool(slot)
,继续看下
// redis.clients.jedis.JedisClusterInfoCache#getSlotPool
public JedisPool getSlotPool(int slot) {
r.lock();
try {
return slots.get(slot);
} finally {
r.unlock();
}
}
对比前面JedisClusterInfoCache#discoverClusterNodesAndSlots
方法,这里获取pool只是拿了读锁,这里也是为了减少锁冲突
回到for循环主体方法,拿到slot对应的connection之后,就是执行execute方法,即外面重写的,很简单
然后就是对异常的处理,其中有些逻辑在里面
首先第一个异常是JedisNoReachableClusterNodeException,该异常直接抛了
catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
}
然后是第二个异常JedisConnectionException
} catch (JedisConnectionException jce) {
lastException = jce;
++consecutiveConnectionFailures;
LOG.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
redirect = null;
}
} catch ...
这里consecutiveConnectionFailures是一个重试的计数器,每次发现JedisConnectionException异常,自增1次,然后调用handleConnectionProblem方法处理连接异常
private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) {
if (this.maxAttempts < 3) {
if (attemptsLeft == 0) {
this.connectionHandler.renewSlotCache();
return true;
}
return false;
}
if (consecutiveConnectionFailures < 2) {
return false;
}
sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline));
this.connectionHandler.renewSlotCache();
return true;
}
该方法的入参是attemptsLeft,在外侧传入的是attemptsLeft-1,即如果已经重试到了倒数第二次还发现该异常,则通过this.connectionHandler.renewSlotCache()
更新槽的缓存,跟进renewSlotCache方法,可以看到底层本质是调用的是JedisClusterInfoCache#discoverClusterNodesAndSlots
方法,即要申请写锁,就会阻塞所有读操作
如果没有到倒数第二次,则看consecutiveConnectionFailures,重试小于2次则不刷新缓存
这样做本质上都是为了减少slot缓存的刷新频率,因为那边刷新要加写锁,就会阻塞所有线程的读操作,在不确定连接异常是否是slot导致的时候,不需要反复刷新缓存
第三个异常是JedisRedirectionException重定向异常,即收到了MOVE返回字符
} catch (JedisRedirectionException jre) {
// avoid updating lastException if it is a connection exception
if (lastException == null || lastException instanceof JedisRedirectionException) {
lastException = jre;
}
LOG.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
redirect = jre;
// if MOVED redirection occurred,
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
this.connectionHandler.renewSlotCache(connection);
}
} finally {
releaseConnection(connection);
}
当触发这个异常,就一定要刷缓存了
最后不管成功失败,都释放连接
还记得前面重试到达上限抛出JedisClusterMaxAttemptsException异常,且异常信息非常模糊,但是看了代码可知,能让RedisCluster重试到上限的可能性非常多,例如:节点宕机或请求超时都会抛出JedisConnectionException,导致反复重试到上限
而JedisConnectionException抛出的场景也很多:
Jedis连接节点发生socket错误
所有命令、lua脚本超时
JedisPool连接池获取可用Jedis对象超时
这也是RedisCluster多次重试后才更新slot缓存的原因,因为例如连接池不足这种问题是完全不需要走更新操作去修复的,频繁的更新就会导致频发加w锁,从而阻塞所有线程,即发生slot风暴
基于hashtag实现RedisCluster的批量操作能力
看一个案例:
// hashtag
String hastag = "{user}";
// 用户A的关注表
String userAFollowKey = hastag + ":a:follow";
// 用户B的粉丝表
String userBFanKey = hastag + ":b:fans";
// 计算hashtag对应的slot
int slot = JedisClusterCRC16.getSlot(hastag);
// 获取指定slot的JedisPool
JedisPool jedisPool = jedisCluster.getConnectionHandler().getJedisPoolFromSlot(slot);
// 在当个节点上执行事务
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
// 用户A的关注表加入用户B,用户B的粉丝列表加入用户A
Transaction transaction = jedis.multi();
transaction.sadd(userAFollowKey, "user:b");
transaction.sadd(userBFanKey, "user:a");
transaction.exec();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (jedis!= null)
jedis.close();
}
RedisCluster数据倾斜问题
集群倾斜指不同节点之间数据量和请求量出现明显差异
出现倾斜的原因主要有:
节点和槽分配严重不均匀
不同槽对应键数量差异过大
集合对象包含大量元素
内存相关配置不一致
评论区