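Basic Concepts of Sentinel
Redis Sentinel is an upgrade to the plain Redis master-replica architecture.
In a plain master-replica setup, if the master fails, an operator may have to promote a new master by hand.
The Sentinel architecture adds automatic failure detection and failover, which is what makes the deployment highly available. When the master fails, the Sentinel processes agree that it is down, elect a leader among themselves, and that leader promotes one of the replicas to master. The Sentinels then publish the address of the new master to clients.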
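Java API for the Sentinel Architecture
The Jedis library provides JedisSentinelPool, a Redis connection pool designed specifically for Sentinel deployments.
Let's walk through part of its source code:
Constructor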
public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
    final GenericObjectPoolConfig<Jedis> poolConfig, final JedisFactory factory,
    final JedisClientConfig sentinelClientConfig) {
  super(poolConfig, factory);
  this.poolConfig = poolConfig;
  this.factory = factory;
  this.sentinelClientConfig = sentinelClientConfig;
  HostAndPort master = initSentinels(sentinels, masterName);
  initMaster(master);
}
As the constructor shows, most of the work in building the Sentinel connection pool happens in the initSentinels method.
initSentinels
private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String masterName) {
  HostAndPort master = null;
  boolean sentinelAvailable = false;
  log.info("Trying to find master from available Sentinels...");
  for (HostAndPort sentinel : sentinels) {
    log.debug("Connecting to Sentinel {}", sentinel);
    try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) {
      // Call the Jedis method that asks this Sentinel for the current master address
      List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
      // connected to sentinel...
      sentinelAvailable = true;
      if (masterAddr == null || masterAddr.size() != 2) {
        log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel);
        continue;
      }
      master = toHostAndPort(masterAddr);
      log.debug("Found Redis master at {}", master);
      break;
    } catch (JedisException e) {
      // resolves #1036, it should handle JedisException there's another chance
      // of raising JedisDataException
      log.warn(
        "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.",
        sentinel, e);
    }
  }
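The first part obtains the master's address.
It calls Jedis#sentinelGetMasterAddrByName directly.
Apart from the null/size check, the result needs no further validation: the method issues the Redis command SENTINEL GET-MASTER-ADDR-BY-NAME <master-name>,
and once a failover is in progress or has completed, that command returns the address of the promoted replica, i.e. the new usable master.
Note that sentinels here is a set of host/port pairs for the Sentinel processes themselves, not for the Redis master and replica data nodes. Any reachable Sentinel that monitors masterName should be able to return the master's address, which is why the loop stops at the first valid answer.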
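The "ask each Sentinel until one gives a usable answer" loop can be sketched without Jedis at all. The following is a minimal simulation, not the library's code: the class MasterDiscoverySketch, the method findMaster, and the lookup function are hypothetical stand-ins, with lookup playing the role of Jedis#sentinelGetMasterAddrByName.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class MasterDiscoverySketch {

    /**
     * Asks each sentinel in order and returns the first well-formed [host, port] answer,
     * or null if no sentinel could provide one.
     * `lookup` is a hypothetical stand-in for Jedis#sentinelGetMasterAddrByName.
     */
    static List<String> findMaster(Set<String> sentinels,
                                   Function<String, List<String>> lookup) {
        for (String sentinel : sentinels) {
            try {
                List<String> addr = lookup.apply(sentinel);
                if (addr == null || addr.size() != 2) {
                    continue; // sentinel answered, but has no address for this master name
                }
                return addr; // first valid answer wins
            } catch (RuntimeException e) {
                // sentinel unreachable or errored: try the next one
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Set<String> sentinels = new LinkedHashSet<>(
                Arrays.asList("s1:26379", "s2:26379", "s3:26379"));
        // Simulated cluster: s1 is down, s2 does not know the master, s3 answers.
        Function<String, List<String>> lookup = s -> {
            if (s.equals("s1:26379")) throw new IllegalStateException("connection refused");
            if (s.equals("s2:26379")) return null;
            return Arrays.asList("10.0.0.5", "6379");
        };
        System.out.println(findMaster(sentinels, lookup)); // prints [10.0.0.5, 6379]
    }
}
```

The sketch returns null when no Sentinel answers; how the real pool reacts in that case is not shown in the excerpt above, so it is left out here.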
  for (HostAndPort sentinel : sentinels) {
    MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
    // whether MasterListener threads are alive or not, process can be stopped
    masterListener.setDaemon(true);
    masterListeners.add(masterListener);
    masterListener.start();
  }
  return master;
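The second part iterates over every Sentinel node and starts a dedicated thread for each one. Each thread uses Redis pub/sub to listen on its Sentinel for the message published when a failover promotes a new master, and then replaces the master host and port that the pool currently points to. The threads are marked as daemon threads, so they do not keep the JVM from exiting.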
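The one-daemon-thread-per-Sentinel pattern can be illustrated with plain JDK threads. This is only a sketch under simplifying assumptions: ListenerThreadsSketch, Listener, and startAndAwait are hypothetical names, and each thread delivers a single simulated notification instead of blocking in a real pub/sub subscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class ListenerThreadsSketch {

    // Shared view of the current master, updated by whichever listener hears about a change.
    static final AtomicReference<String> currentMaster =
            new AtomicReference<>("10.0.0.5:6379");

    /** Hypothetical stand-in for MasterListener: one thread per sentinel. */
    static class Listener extends Thread {
        private final String announcedMaster;

        Listener(String sentinel, String announcedMaster) {
            super("Listener-" + sentinel);
            this.announcedMaster = announcedMaster;
        }

        @Override
        public void run() {
            // A real listener would block in subscribe(); here one notification is simulated.
            currentMaster.set(announcedMaster);
        }
    }

    /** Starts one daemon listener per sentinel, waits for them, and returns the master seen. */
    static String startAndAwait(List<String> sentinels, String announcedMaster) {
        List<Listener> listeners = new ArrayList<>();
        for (String sentinel : sentinels) {
            Listener listener = new Listener(sentinel, announcedMaster);
            listener.setDaemon(true); // daemon threads never block JVM shutdown
            listeners.add(listener);
            listener.start();
        }
        for (Listener listener : listeners) {
            try {
                listener.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                break;
            }
        }
        return currentMaster.get();
    }

    public static void main(String[] args) {
        String master = startAndAwait(Arrays.asList("s1:26379", "s2:26379"), "10.0.0.6:6379");
        System.out.println(master); // prints 10.0.0.6:6379
    }
}
```

Because every Sentinel gets its own listener, the update still arrives if some Sentinels are unreachable, at the cost of receiving the same announcement several times.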
MasterListener
protected class MasterListener extends Thread {
  ……
  public MasterListener(String masterName, String host, int port) {
    super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
    this.masterName = masterName;
    this.host = host;
    this.port = port;
  }
  ……
}
MasterListener extends the Thread class, so let's look at what its run method actually does.
MasterListener#run
public void run() {
  running.set(true);
  while (running.get()) {
    try {
      ……
      final HostAndPort hostPort = new HostAndPort(host, port);
      j = new Jedis(hostPort, sentinelClientConfig);
      // code for active refresh
      List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
      if (masterAddr == null || masterAddr.size() != 2) {
        log.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName,
          hostPort);
      } else {
        initMaster(toHostAndPort(masterAddr));
      }
      j.subscribe(new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
          log.debug("Sentinel {} published: {}.", hostPort, message);
          String[] switchMasterMsg = message.split(" ");
          if (switchMasterMsg.length > 3) {
            if (masterName.equals(switchMasterMsg[0])) {
              initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
            } else {
              log.debug(
                "Ignoring message on +switch-master for master name {}, our master name is {}",
                switchMasterMsg[0], masterName);
            }
          } else {
            log.error("Invalid message received on Sentinel {} on channel +switch-master: {}",
              hostPort, message);
          }
        }
      }, "+switch-master");
    } catch (JedisException e) {
      ……
    } finally {
      if (j != null) {
        j.close();
      }
    }
  }
}
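As shown above, the listener again calls List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
to fetch the current master, and passes a valid result to initMaster.
Once that initial refresh is done, it calls Jedis#subscribe
to listen on the +switch-master channel.
The message payload has the form <master-name> <old-ip> <old-port> <new-ip> <new-port>, and the check masterName.equals(switchMasterMsg[0])
determines whether the message concerns the master name this pool monitors. If it does, initMaster is called with the new address taken from fields 3 and 4; if not, the message is ignored.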
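The message handling can be tried in isolation with a small parser. This is a minimal sketch rather than the Jedis implementation: SwitchMasterParser and newMasterFor are hypothetical names, and unlike the excerpt above, the sketch requires all five fields to be present before it reads index 4.

```java
class SwitchMasterParser {

    /**
     * Parses a +switch-master payload of the form
     *   <master-name> <old-ip> <old-port> <new-ip> <new-port>
     * and returns "host:port" of the new master, or null when the message is
     * malformed or refers to a different master name.
     */
    static String newMasterFor(String masterName, String message) {
        String[] parts = message.split(" ");
        if (parts.length < 5) {
            return null; // malformed: not enough fields to read the new ip and port
        }
        if (!masterName.equals(parts[0])) {
            return null; // announcement for some other monitored master
        }
        int port = Integer.parseInt(parts[4]); // throws NumberFormatException on a bad port
        return parts[3] + ":" + port;
    }

    public static void main(String[] args) {
        String msg = "mymaster 127.0.0.1 6379 127.0.0.1 6380";
        System.out.println(newMasterFor("mymaster", msg)); // prints 127.0.0.1:6380
        System.out.println(newMasterFor("other", msg));    // prints null
    }
}
```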