目 录CONTENT

文章目录

Jedis

FatFish1
2024-12-28 / 0 评论 / 0 点赞 / 83 阅读 / 0 字 / 正在检测是否收录...

Jedis是基于java开发的连接Redis服务器的包,基本用法包括Jedis连接、JedisPool、Pipeline等

Jedis - 单连接用法

先看一个使用案例:

public class LocalRedisTest {
    @Test
    public void executeJedisCommand() {
        String SCRIPT = "if (redis.call('GET', KEYS[1]) == ARGV[1]) \n" +
                "then redis.call('EXPIRE', KEYS[1], ARGV[2]); return 1; end; return 0;";
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        jedis.auth("mypassword");
        // 使用两次get的场景
        String s = jedis.get("whereismyvalue");
        System.out.println(s);
        jedis.close();
    }
}

Jedis

案例中首先构造了一个Jedis实例,看下Jedis里面有啥

核心成员变量

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
  // socket的封装
  protected final Client client;
  ……
  // redis的pipeline能力
  protected Pipeline pipeline = null;
  ……

Jedis继承自BinaryJedis,里面有一个Client属性,继承Connection接口,继续往下翻

public class Connection implements Closeable {
  ……
  // 套接字实例
  private Socket socket;
  ……
  // 套接字读超时的默认值
  private int soTimeout = Protocol.DEFAULT_TIMEOUT;
  ……
  // 一个连接是否还存在的标志位
  private boolean broken = false;

可以看到Jedis最底层封装的Connection是一个基于Socket开发的连接实例,底层也是基于tcp/ip协议

构造函数

Jedis jedis = new Jedis("127.0.0.1", 6379);

基于这个简单的构造函数看

public Jedis(final String host, final int port) {
  super(host, port);
}

public BinaryJedis(final String host, final int port) {
  client = new Client(host, port);
}

实际上是在初始化里面的client类,继续向下翻

public Connection(final String host, final int port) {
  this(new HostAndPort(host, port), DefaultJedisClientConfig.builder().build());
}

这里因为没有主动传入JedisClinetConfig,因此配置都是使用默认的,其中有几个配置是比较核心的:

private int connectionTimeoutMillis = Protocol.DEFAULT_TIMEOUT;
// public static final int DEFAULT_TIMEOUT = 2000;

即连接超时时间是2s

继续看Connection的构造

public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
  this(new DefaultJedisSocketFactory(hostAndPort, clientConfig));
  this.soTimeout = clientConfig.getSocketTimeoutMillis();
  this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
}

这里主要是配置的soTimeout这个属性,但是这里还没有用,因为socket还没开始建立连接

auth

jedis.auth("mypassword");

紧接着调用了Jedis#auth方法

public String auth(final String password) {
  checkIsInMultiOrPipeline();
  client.auth(password);
  return client.getStatusCodeReply();
}
public void auth(final String password) {
  setPassword(password);
  sendCommand(AUTH, password);
}

这里的是Connection#sendCommand 以发送命令的形式建立连接,实际上Jedis提供的get、set等方法都是通过此方法提交命令的

再往下看可以看到Connection#sendCommand方法的逻辑

Connection#sendCommand

public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
  try {
    // 这里开始建立连接了
    connect();
    Protocol.sendCommand(outputStream, cmd, args);
  } catch (JedisConnectionException ex) {
    ……
    // 当发现有JedisConnectionException,设置broken标志位为true
    broken = true;
    throw ex;
  }
}

继续看Connection#connect方法

Connection#connect

public void connect() throws JedisConnectionException {
  ……
  if (!isConnected()) {
    try {
      socket = socketFactory.createSocket();
      outputStream = new RedisOutputStream(socket.getOutputStream());
      inputStream = new RedisInputStream(socket.getInputStream());
    } catch (IOException ioe) {
      broken = true;
      throw new JedisConnectionException("Failed to create input/output stream", ioe);
    } catch (JedisConnectionException jce) {
      broken = true;
      throw jce;
    } finally {
      if (broken) {
        IOUtils.closeQuietly(socket);
      }
    }
  }
}

初始化socket、io对象,在这里如果发现了IOException,就会设置broken标志位,并且抛出JedisConnectionException,如果发现了JedisConnectionException,外面会设置broken标志位

继续向下看

DefaultJedisSocketFactory#createSocket

public Socket createSocket() throws JedisConnectionException {
  Socket socket = null;
  try {
    socket = new Socket();
    ……
    HostAndPort hostAndPort = getSocketHostAndPort();
    socket.connect(new InetSocketAddress(hostAndPort.getHost(), hostAndPort.getPort()), getConnectionTimeout());
    socket.setSoTimeout(getSoTimeout());
    if (ssl) {
      ……
      HostnameVerifier hostnameVerifier = getHostnameVerifier();
      if (null != hostnameVerifier
          && !hostnameVerifier.verify(hostAndPort.getHost(), ((SSLSocket) socket).getSession())) {
        String message = String.format(
          "The connection to '%s' failed ssl/tls hostname verification.", hostAndPort.getHost());
        throw new JedisConnectionException(message);
      }
    }
    return socket;
  } catch (IOException ex) {
    IOUtils.closeQuietly(socket);
    throw new JedisConnectionException("Failed to create socket.", ex);
  }
}

这里就已经开始创建socket握手了,其中关注几个点:

  • socket.setSoTimeout(getSoTimeout()) :这里把socket超时设置上了,如果没主动传,这里刚刚看到是2s,这个超时设置是为了避免socket持续阻塞的,如果到超时时间还没有执行完,就会自动把自己中断掉。可以参考socket部分

  • 使用HostnameVerifier#verify进行连接验证,在握手期间,如果URL的主机名和服务器的标识主机名不匹配,则验证机制可以回调此接口实现程序来确定是否应该允许此连接,如果回调内实现不恰当,默认接受所有域名,则有安全风险

在这里如果验证失败了,就会抛出JedisConnectionException给外面设置标志位

get

String s = jedis.get("whereismyvalue");

get方法就到jedis提供的方法了

public void get(final byte[] key) {
  sendCommand(GET, key);
}

可见也是走的Connection#sendCommand 方法,这里就可以不再重看了

close

jedis.close();
public void close() {
  if (dataSource != null) {
    JedisPoolAbstract pool = this.dataSource;
    this.dataSource = null;
    if (isBroken()) {
      pool.returnBrokenResource(this);
    } else {
      pool.returnResource(this);
    }
  } else {
    super.close();
  }
}

首先判断dataSource属性,这里的dataSource在字面意义上有点歧义,它其实不是数据源,而是这个Jedis单连接归属哪个JedisPool的一个引用存储,即:

  • 如果保有JedisPool的引用,则走池化的归还逻辑,这里还会根据broken标志位判断是真的归还进池子,还是直接销毁

  • 如果没保有JedisPool的引用,说明这里不是池化的,直接调用父类的close

看下父类的close方法的调用链路

public void close() {
  db = 0;
  super.close();
}

这里调用到父类的close方法,是Connection#close方法

public void close() {
  disconnect();
}

Connection#disconnect

public void disconnect() {
  if (isConnected()) {
    try {
      outputStream.flush();
      socket.close();
    } catch (IOException ex) {
      broken = true;
      throw new JedisConnectionException(ex);
    } finally {
      IOUtils.closeQuietly(socket);
    }
  }
}

这里做的操作包括:情况outputStream、关闭socket,如果这过程中出现了IOException,设置标志位broken为true

JedisPool - Jedis池化用法

JedisPool

核心成员变量

JedisPool是jedis的池化,基于apache.pool2实现,是GenericObjectPool的典型实现,其中的成员变量就可以看出来

protected GenericObjectPool<T> internalPool;

继承自父类Pool类

构造函数

public JedisPool(final GenericObjectPoolConfig<Jedis> poolConfig, final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
  this(poolConfig, new JedisFactory(hostAndPort, clientConfig));
}

这个构造函数传入的内容包括Jedis的对象池设置,ip端口、jedis的连接设置,其中poolConfig用来构造池对象设置,剩下两个用来构造JedisFactory

调用的父类构造函数一路向上,可以找到Pool类中的构造函数

public Pool(final GenericObjectPoolConfig<T> poolConfig, PooledObjectFactory<T> factory) {
  initPool(poolConfig, factory);
}

调用initPool方法

if (this.internalPool != null) {
  ……
    closeInternalPool();
  ……
this.internalPool = new GenericObjectPool<>(factory, poolConfig);

初始化internalPool这个属性,即如果内部池非空,先关闭当前池子,然后重新构造一个对象池

getResource

public Jedis getResource() {
  Jedis jedis = super.getResource();
  jedis.setDataSource(this);
  return jedis;

}

首先调用父类的getResource方法,获取一个Jedis单连接实例,然后给这个单连接配置上DataSource属性,属性值是自己,实际上是为了让Jedis单连接实例保有一个归属于哪个Pool的记录

这里乍一看不好理解,但是结合前面的Jedis#close方法就好理解了

父类的getResource方法如下:

public T getResource() {
  try {
    return internalPool.borrowObject();
  } catch ……
}

核心就是这么一行,调用的GenericObjectPool#borrowObject方法,补链接到对象池

returnResoruce

public void returnResource(final Jedis resource) {
  if (resource != null) {
    try {
      resource.resetState();
      returnResourceObject(resource);
    } catch (RuntimeException e) {
      returnBrokenResource(resource);
      log.warn("Resource is returned to the pool as broken", e);
    }
  }
}

jedisPool的归还方法,这里结合前面Jedis#close方法看,就比较清晰了

这是一个正常的返回方法,因为调用到returnResourceObject

protected void returnResourceObject(final T resource) {
  try {
    internalPool.returnObject(resource);
  } catch (RuntimeException e) {
    throw new JedisException("Could not return the resource to the pool", e);
  }
}

看得出来是GenericObjectPool#returnObject方法,经过一些列校验就可以把Jedis单连接返回池子里面了

returnBrokenResourceObject

这是一个不正常的返回方法,因为上面入口是当Jedis的broken标志位为false的场景

protected void returnBrokenResourceObject(final T resource) {
  try {
    internalPool.invalidateObject(resource);
  } catch (Exception e) {
    throw new JedisException("Could not return the broken resource to the pool", e);
  }
}

这里调用的是GenericObjectPool#invalidateObject 对对象进行销毁

Pipeline - Jedis的管道用法

redis管道用法的Jedis实现

看一个案例:

Jedis jedis = new Jedis("127.0.0.1");
// 1)生成pipeline对象
Pipeline pipeline = jedis.pipelined();
// 2)pipeline执行命令,注意此时命令并未真正执行
for (String key : keys) {
    pipeline.del(key);
}
// 3)执行命令
pipeline.sync();

但是要注意的是,Pipeline只是打包,并不是将这些命令封装成原子操作!

Lua脚本

通过Jedis端提交lua脚本,可以将多个操作封装成一个原子操作执行,先看api:

// Jedis#eval(java.lang.String, java.util.List<java.lang.String>, java.util.List<java.lang.String>)
public Object eval(final String script, final List<String> keys, final List<String> args) {
  return eval(script, keys.size(), getParams(keys, args));
}

可以看到eval方法的三个参数:

  • 参数1:script,即lua脚本

  • 参数2:keys,即存储lua脚本使用到的keys

  • 参数3:args,即存储lua脚本使用到的参数

假设要开发一个心跳锁的逻辑:如果key对应的value是预期的value,则给该key续期

对于这个逻辑,如果分开写,get、expire,有可能在expire之前,key被别人修改了,使用lua:

private final static String SCRIPT = "if (redis.call('GET', KEYS[1]) == ARGV[1]) \n" +
        "then redis.call('EXPIRE', KEYS[1], ARGV[2]); return 1; end; return 0;";

这里注意,使用end表示if成立的状态后面执行的条件分支结束

然后使用jedis提交命令:

long eval = (long) jedis.eval(SCRIPT, ImmutableList.of("lockKey"), ImmutableList.of("pod1", "300"));

因为lua中是return数字,而数字在jedis是以long形式返回的,因此这里直接用long承接lua脚本结果即可,通过判断是0还是1就可以识别lua走了哪个分支

要注意下:redis是从1开始的,因此没有KEYS[0]这样的写法

Redis哨兵

http://www.chymfatfish.cn/archives/redissentinel#%E5%93%A8%E5%85%B5%E6%9E%B6%E6%9E%84%E7%9A%84java-api%E5%AE%9E%E7%8E%B0

0

评论区