Jedis是基于java开发的连接Redis服务器的包,基本用法包括Jedis连接、JedisPool、Pipeline等
Jedis - 单连接用法
先看一个使用案例:
public class LocalRedisTest {
@Test
public void executeJedisCommand() {
String SCRIPT = "if (redis.call('GET', KEYS[1]) == ARGV[1]) \n" +
"then redis.call('EXPIRE', KEYS[1], ARGV[2]); return 1; end; return 0;";
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.auth("mypassword");
// 使用两次get的场景
String s = jedis.get("whereismyvalue");
System.out.println(s);
jedis.close();
}
}
Jedis
案例中首先构造了一个Jedis实例,看下Jedis里面有啥
核心成员变量
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
// socket的封装
protected final Client client;
……
// redis的pipeline能力
protected Pipeline pipeline = null;
……
Jedis继承自BinaryJedis,里面有一个Client属性,继承Connection接口,继续往下翻
public class Connection implements Closeable {
……
// 套接字实例
private Socket socket;
……
// 套接字读超时的默认值
private int soTimeout = Protocol.DEFAULT_TIMEOUT;
……
// 一个连接是否还存在的标志位
private boolean broken = false;
可以看到Jedis最底层封装的Connection是一个基于Socket开发的连接实例,底层也是基于tcp/ip协议
构造函数
Jedis jedis = new Jedis("127.0.0.1", 6379);
基于这个简单的构造函数看
public Jedis(final String host, final int port) {
super(host, port);
}
public BinaryJedis(final String host, final int port) {
client = new Client(host, port);
}
实际上是在初始化里面的client类,继续向下翻
public Connection(final String host, final int port) {
this(new HostAndPort(host, port), DefaultJedisClientConfig.builder().build());
}
这里因为没有主动传入JedisClinetConfig,因此配置都是使用默认的,其中有几个配置是比较核心的:
private int connectionTimeoutMillis = Protocol.DEFAULT_TIMEOUT;
// public static final int DEFAULT_TIMEOUT = 2000;
即连接超时时间是2s
继续看Connection的构造
public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig));
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
}
这里主要是配置的soTimeout这个属性,但是这里还没有用,因为socket还没开始建立连接
auth
jedis.auth("mypassword");
紧接着调用了Jedis#auth
方法
public String auth(final String password) {
checkIsInMultiOrPipeline();
client.auth(password);
return client.getStatusCodeReply();
}
public void auth(final String password) {
setPassword(password);
sendCommand(AUTH, password);
}
这里的是Connection#sendCommand
以发送命令的形式建立连接,实际上Jedis提供的get、set等方法都是通过此方法提交命令的
再往下看可以看到Connection#sendCommand
方法的逻辑
Connection#sendCommand
public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
// 这里开始建立连接了
connect();
Protocol.sendCommand(outputStream, cmd, args);
} catch (JedisConnectionException ex) {
……
// 当发现有JedisConnectionException,设置broken标志位为true
broken = true;
throw ex;
}
}
继续看Connection#connect
方法
Connection#connect
public void connect() throws JedisConnectionException {
……
if (!isConnected()) {
try {
socket = socketFactory.createSocket();
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ioe) {
broken = true;
throw new JedisConnectionException("Failed to create input/output stream", ioe);
} catch (JedisConnectionException jce) {
broken = true;
throw jce;
} finally {
if (broken) {
IOUtils.closeQuietly(socket);
}
}
}
}
初始化socket、io对象,在这里如果发现了IOException,就会设置broken标志位,并且抛出JedisConnectionException,如果发现了JedisConnectionException,外面会设置broken标志位
继续向下看
DefaultJedisSocketFactory#createSocket
public Socket createSocket() throws JedisConnectionException {
Socket socket = null;
try {
socket = new Socket();
……
HostAndPort hostAndPort = getSocketHostAndPort();
socket.connect(new InetSocketAddress(hostAndPort.getHost(), hostAndPort.getPort()), getConnectionTimeout());
socket.setSoTimeout(getSoTimeout());
if (ssl) {
……
HostnameVerifier hostnameVerifier = getHostnameVerifier();
if (null != hostnameVerifier
&& !hostnameVerifier.verify(hostAndPort.getHost(), ((SSLSocket) socket).getSession())) {
String message = String.format(
"The connection to '%s' failed ssl/tls hostname verification.", hostAndPort.getHost());
throw new JedisConnectionException(message);
}
}
return socket;
} catch (IOException ex) {
IOUtils.closeQuietly(socket);
throw new JedisConnectionException("Failed to create socket.", ex);
}
}
这里就已经开始创建socket握手了,其中关注几个点:
socket.setSoTimeout(getSoTimeout())
:这里把socket超时设置上了,如果没主动传,这里刚刚看到是2s,这个超时设置是为了避免socket持续阻塞的,如果到超时时间还没有执行完,就会自动把自己中断掉。可以参考socket部分使用
HostnameVerifier#verify
进行连接验证,在握手期间,如果URL的主机名和服务器的标识主机名不匹配,则验证机制可以回调此接口实现程序来确定是否应该允许此连接,如果回调内实现不恰当,默认接受所有域名,则有安全风险
在这里如果验证失败了,就会抛出JedisConnectionException给外面设置标志位
get
String s = jedis.get("whereismyvalue");
get方法就到jedis提供的方法了
public void get(final byte[] key) {
sendCommand(GET, key);
}
可见也是走的Connection#sendCommand
方法,这里就可以不再重看了
close
jedis.close();
public void close() {
if (dataSource != null) {
JedisPoolAbstract pool = this.dataSource;
this.dataSource = null;
if (isBroken()) {
pool.returnBrokenResource(this);
} else {
pool.returnResource(this);
}
} else {
super.close();
}
}
首先判断dataSource属性,这里的dataSource在字面意义上有点歧义,它其实不是数据源,而是这个Jedis单连接归属哪个JedisPool的一个引用存储,即:
如果保有JedisPool的引用,则走池化的归还逻辑,这里还会根据broken标志位判断是真的归还进池子,还是直接销毁
如果没保有JedisPool的引用,说明这里不是池化的,直接调用父类的close
看下父类的close方法的调用链路
public void close() {
db = 0;
super.close();
}
这里调用到父类的close方法,是Connection#close
方法
public void close() {
disconnect();
}
Connection#disconnect
public void disconnect() {
if (isConnected()) {
try {
outputStream.flush();
socket.close();
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
} finally {
IOUtils.closeQuietly(socket);
}
}
}
这里做的操作包括:情况outputStream、关闭socket,如果这过程中出现了IOException,设置标志位broken为true
JedisPool - Jedis池化用法
JedisPool
核心成员变量
JedisPool是jedis的池化,基于apache.pool2实现,是GenericObjectPool的典型实现,其中的成员变量就可以看出来
protected GenericObjectPool<T> internalPool;
继承自父类Pool类
构造函数
public JedisPool(final GenericObjectPoolConfig<Jedis> poolConfig, final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(poolConfig, new JedisFactory(hostAndPort, clientConfig));
}
这个构造函数传入的内容包括Jedis的对象池设置,ip端口、jedis的连接设置,其中poolConfig用来构造池对象设置,剩下两个用来构造JedisFactory
调用的父类构造函数一路向上,可以找到Pool类中的构造函数
public Pool(final GenericObjectPoolConfig<T> poolConfig, PooledObjectFactory<T> factory) {
initPool(poolConfig, factory);
}
调用initPool方法
if (this.internalPool != null) {
……
closeInternalPool();
……
this.internalPool = new GenericObjectPool<>(factory, poolConfig);
初始化internalPool这个属性,即如果内部池非空,先关闭当前池子,然后重新构造一个对象池
getResource
public Jedis getResource() {
Jedis jedis = super.getResource();
jedis.setDataSource(this);
return jedis;
}
首先调用父类的getResource方法,获取一个Jedis单连接实例,然后给这个单连接配置上DataSource属性,属性值是自己,实际上是为了让Jedis单连接实例保有一个归属于哪个Pool的记录
这里乍一看不好理解,但是结合前面的Jedis#close
方法就好理解了
父类的getResource方法如下:
public T getResource() {
try {
return internalPool.borrowObject();
} catch ……
}
核心就是这么一行,调用的GenericObjectPool#borrowObject
方法,补链接到对象池
returnResoruce
public void returnResource(final Jedis resource) {
if (resource != null) {
try {
resource.resetState();
returnResourceObject(resource);
} catch (RuntimeException e) {
returnBrokenResource(resource);
log.warn("Resource is returned to the pool as broken", e);
}
}
}
jedisPool的归还方法,这里结合前面Jedis#close
方法看,就比较清晰了
这是一个正常的返回方法,因为调用到returnResourceObject
protected void returnResourceObject(final T resource) {
try {
internalPool.returnObject(resource);
} catch (RuntimeException e) {
throw new JedisException("Could not return the resource to the pool", e);
}
}
看得出来是GenericObjectPool#returnObject
方法,经过一些列校验就可以把Jedis单连接返回池子里面了
returnBrokenResourceObject
这是一个不正常的返回方法,因为上面入口是当Jedis的broken标志位为false的场景
protected void returnBrokenResourceObject(final T resource) {
try {
internalPool.invalidateObject(resource);
} catch (Exception e) {
throw new JedisException("Could not return the broken resource to the pool", e);
}
}
这里调用的是GenericObjectPool#invalidateObject
对对象进行销毁
Pipeline - Jedis的管道用法
redis管道用法的Jedis实现
看一个案例:
Jedis jedis = new Jedis("127.0.0.1");
// 1)生成pipeline对象
Pipeline pipeline = jedis.pipelined();
// 2)pipeline执行命令,注意此时命令并未真正执行
for (String key : keys) {
pipeline.del(key);
}
// 3)执行命令
pipeline.sync();
但是要注意的是,Pipeline只是打包,并不是将这些命令封装成原子操作!
Lua脚本
通过Jedis端提交lua脚本,可以将多个操作封装成一个原子操作执行,先看api:
// Jedis#eval(java.lang.String, java.util.List<java.lang.String>, java.util.List<java.lang.String>)
public Object eval(final String script, final List<String> keys, final List<String> args) {
return eval(script, keys.size(), getParams(keys, args));
}
可以看到eval方法的三个参数:
参数1:script,即lua脚本
参数2:keys,即存储lua脚本使用到的keys
参数3:args,即存储lua脚本使用到的参数
假设要开发一个心跳锁的逻辑:如果key对应的value是预期的value,则给该key续期
对于这个逻辑,如果分开写,get、expire,有可能在expire之前,key被别人修改了,使用lua:
private final static String SCRIPT = "if (redis.call('GET', KEYS[1]) == ARGV[1]) \n" +
"then redis.call('EXPIRE', KEYS[1], ARGV[2]); return 1; end; return 0;";
这里注意,使用end表示if成立的状态后面执行的条件分支结束
然后使用jedis提交命令:
long eval = (long) jedis.eval(SCRIPT, ImmutableList.of("lockKey"), ImmutableList.of("pod1", "300"));
因为lua中是return数字,而数字在jedis是以long形式返回的,因此这里直接用long承接lua脚本结果即可,通过判断是0还是1就可以识别lua走了哪个分支
要注意下:redis是从1开始的,因此没有KEYS[0]这样的写法
评论区