Reids客户端Jedis的访问模式(下)

Jedis

Jedis是Redis的Java生态的客户端之一,其他还有Redisson、Lettuce

Jedis客户端支持单机模式、分片模式、集群模式的访问模式

  • 单机模式:创建Jedis对象来操作单节点的Redis,只适用于访问单个Redis节点。
  • 分片模式:创建ShardedJedisPool对象来访问分片模式的多个Redis节点,是Redis没有集群功能之前客户端实现的一个数据分布式方案,本质上是客户端通过一致性哈希来实现数据分布式存储。
  • 集群模式:创建JedisCluster对象来访问集群模式下的多个Redis节点,是Redis3.0引入集群模式后客户端实现的集群访问访问,本质上是通过引入槽(slot)概念以及通过CRC16哈希槽算法来实现数据分布式存储。

Jedis客户端支持单命令和Pipeline方式访问Redis集群,通过Pipeline的方式能够提高集群访问的效率

Jedis集群模式访问

  • Jedis能够实现key和哈希槽的定位的核心机制在于哈希槽和Redis节点的映射,而这个发现过程基于Redis的cluster slot命令。

    关于Redis集群操作的命令:

    Redis通过cluster slots会返回Redis集群的整体状况。

    返回每一个Redis节点的信息包含:

    1. 哈希槽起始编号
    2. 哈希槽结束编号
    3. 哈希槽对应master节点,节点使用IP/Port表示
    4. master节点的第一个副本
    5. master节点的第二个副本
127.0.0.1:30001> cluster slots
1) 1) (integer) 0 // 开始槽位
   2) (integer) 5460 // 结束槽位
   3) 1) "127.0.0.1" // master节点的host
      2) (integer) 30001 // master节点的port
      3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" // 节点的编码
   4) 1) "127.0.0.1" // slave节点的host
      2) (integer) 30004 // slave节点的port
      3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // 节点的编码
2) 1) (integer) 5461
   2) (integer) 10922
   3) 1) "127.0.0.1"
      2) (integer) 30002
      3) "c9d93d9f2c0c524ff34cc11838c2003d8c29e013"
   4) 1) "127.0.0.1"
      2) (integer) 30005
      3) "faadb3eb99009de4ab72ad6b6ed87634c7ee410f"
3) 1) (integer) 10923
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 30003
      3) "044ec91f325b7595e76dbcb18cc688b6a5b434a1"
   4) 1) "127.0.0.1"
      2) (integer) 30006
      3) "58e6e48d41228013e5d9c1c37c5060693925e97e"
  • 核心的流程包含JedisCluster对象的创建以及通过JedisCluster对象实现Redis的访问。
  • JedisCluster对象的创建核心在于创建JedisClusterInfoCache对象并通过集群发现来建立slot和集群节点的映射关系。
  • JedisCluster对Redis集群的访问在于获取key所在的Redis节点并通过Jedis对象进行访问。
  • JedisCluster的父类BinaryJedisCluster创建了JedisSlotBasedConnectionHandler对象,该对象负责和Redis的集群进行通信。
public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {
  public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {

    // 访问父类BinaryJedisCluster
    super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
  }
}

public class BinaryJedisCluster implements BinaryJedisClusterCommands,
    MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
  public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
      int maxAttempts, String user, String password, String clientName, GenericObjectPoolConfig poolConfig,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {

    // 创建JedisSlotBasedConnectionHandler对象
    this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
        connectionTimeout, soTimeout, user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);

    this.maxAttempts = maxAttempts;
  }
}
  • JedisSlotBasedConnectionHandler的核心在于创建并初始化JedisClusterInfoCache对象,该对象缓存了Redis集群的信息
  • JedisClusterInfoCache对象的初始化过程通过initializeSlotsCache来完成,主要目的用于实现集群节点和槽位发现。
public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {
  public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {

    super(nodes, poolConfig, connectionTimeout, soTimeout, user, password, clientName,
        ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
  }
}

public abstract class JedisClusterConnectionHandler implements Closeable {
  public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {

    // 创建JedisClusterInfoCache对象
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);

    // 初始化jedis的Slot信息
    initializeSlotsCache(nodes, connectionTimeout, soTimeout, infiniteSoTimeout,
        user, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
  }

  private void initializeSlotsCache(Set<HostAndPort> startNodes,
      int connectionTimeout, int soTimeout, int infiniteSoTimeout, String user, String password, String clientName,
      boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
    for (HostAndPort hostAndPort : startNodes) {

      try (Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
          soTimeout, infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier)) {

        // 通过discoverClusterNodesAndSlots进行集群发现
        cache.discoverClusterNodesAndSlots(jedis);
        return;
      } catch (JedisConnectionException e) {
      }
    }
  }
}
  • JedisClusterInfoCache的nodes用来保存Redis集群的节点信息,slots用来保存槽位和集群节点的信息。
  • nodes和slots维持的对象都是JedisPool对象,该对象维持了和Redis的连接信息。集群的发现过程由discoverClusterNodesAndSlots来实现,本质是执行Redis的集群发现命令cluster slots实现的。
public class JedisClusterInfoCache {
  // 负责保存redis集群的节点信息
  private final Map<String, JedisPool> nodes = new HashMap<>();
  // 负责保存redis的槽位和redis节点的映射关系
  private final Map<Integer, JedisPool> slots = new HashMap<>();

  // 负责集群的发现逻辑
  public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();

    try {
      reset();
      List<Object> slots = jedis.clusterSlots();

      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;

        if (slotInfo.size() <= MASTER_NODE_INDEX) {
          continue;
        }
        // 获取redis节点对应的槽位信息
        List<Integer> slotNums = getAssignedSlotArray(slotInfo);

        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.isEmpty()) {
            continue;
          }

          HostAndPort targetNode = generateHostAndPort(hostInfos);
          // 负责保存redis节点信息
          setupNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            // 负责保存槽位和redis节点的映射关系
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }

  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = setupNodeIfNotExist(targetNode);
      // 保存槽位和对应的JedisPool对象
      for (Integer slot : targetSlots) {
        slots.put(slot, targetPool);
      }
    } finally {
      w.unlock();
    }
  }

  public JedisPool setupNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
      // 生产redis节点对应的nodeKey
      String nodeKey = getNodeKey(node);
      JedisPool existingPool = nodes.get(nodeKey);
      if (existingPool != null) return existingPool;
      // 生产redis节点对应的JedisPool
      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
          connectionTimeout, soTimeout, infiniteSoTimeout, user, password, 0, clientName,
          ssl, sslSocketFactory, sslParameters, hostnameVerifier);
      // 保存redis节点的key和对应的JedisPool对象
      nodes.put(nodeKey, nodePool);
      return nodePool;
    } finally {
      w.unlock();
    }
  }
}
  • 内部internalPool是通过apache common pool来实现的池化
  • JedisPool内部的internalPool通过JedisFactory的makeObject来创建Jedis对象
  • 每个Redis节点都会对应一个JedisPool对象,通过JedisPool来管理Jedis的申请释放复用等
public class JedisPool extends JedisPoolAbstract {

  public JedisPool() {
    this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
  }
}

public class JedisPoolAbstract extends Pool<Jedis> {

  public JedisPoolAbstract() {
    super();
  }
}

public abstract class Pool<T> implements Closeable {
  protected GenericObjectPool<T> internalPool;

  public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
    if (this.internalPool != null) {
      try {
        closeInternalPool();
      } catch (Exception e) {
      }
    }
    this.internalPool = new GenericObjectPool<>(factory, poolConfig);
  }
}

class JedisFactory implements PooledObjectFactory<Jedis> {

  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    // 创建Jedis对象
    final HostAndPort hp = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hp.getHost(), hp.getPort(), connectionTimeout, soTimeout,
        infiniteSoTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

    try {
      // Jedis对象连接
      jedis.connect();
      if (user != null) {
        jedis.auth(user, password);
      } else if (password != null) {
        jedis.auth(password);
      }
      if (database != 0) {
        jedis.select(database);
      }
      if (clientName != null) {
        jedis.clientSetname(clientName);
      }
    } catch (JedisException je) {
      jedis.close();
      throw je;
    }
    // 将Jedis对象包装成DefaultPooledObject进行返回
    return new DefaultPooledObject<>(jedis);
  }
}
  • 访问过程(方式)

    JedisCluster访问Redis的过程通过JedisClusterCommand来实现重试机制,最终通过Jedis对象来实现访问。从实现的角度来说JedisCluster是在Jedis之上封装了一层,进行集群节点定位以及重试机制等。JedisClusterCommand的run方法核心主要定位Redis的key所在的Redis节点,然后获取与该节点对应的Jedis对象进行访问。在Jedis对象访问异常后,JedisClusterCommand会进行重试操作并按照一定策略执行renewSlotCache方法进行重集群节点重发现动作。

public abstract class JedisClusterCommand<T> {
  public T run(String key) {
    // 针对key进行槽位的计算
    return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
  }

  private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {

    Jedis connection = null;
    try {

      if (redirect != null) {
        connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
        if (redirect instanceof JedisAskDataException) {
          connection.asking();
        }
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          // 根据slot去获取Jedis对象
          connection = connectionHandler.getConnectionFromSlot(slot);
        }
      }
      // 执行真正的Redis的命令
      return execute(connection);
    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {

      releaseConnection(connection);
      connection = null;

      if (attempts <= 1) {
        // 保证最后两次机会去重新刷新槽位和节点的对应的信息
        this.connectionHandler.renewSlotCache();
      }
      // 按照重试次数进行重试操作
      return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
    } catch (JedisRedirectionException jre) {
      // 针对返回Move命令立即触发重新刷新槽位和节点的对应信息
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }

      releaseConnection(connection);
      connection = null;

      return runWithRetries(slot, attempts - 1, false, jre);
    } finally {
      releaseConnection(connection);
    }
  }
}
  • 以set命令为例,整个访问通过JedisClusterCommand实现如下:
    • 计算key所在的Redis节点。
    • 获取Redis节点对应的Jedis对象。
    • 通过Jedis对象进行set操作。
public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
    MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {

  @Override
  public String set(final String key, final String value, final SetParams params) {
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value, params);
      }
    }.run(key);
  }
}
  • JedisSlotBasedConnectionHandler的cache对象维持了slot和node的映射关系,通过getConnectionFromSlot方法来获取该slot对应的Jedis对象。
public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {

  protected final JedisClusterInfoCache cache;

  @Override
  public Jedis getConnectionFromSlot(int slot) {
    // 获取槽位对应的JedisPool对象
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // 从JedisPool对象中获取Jedis对象
      return connectionPool.getResource();
    } else {
      // 获取失败就重新刷新槽位信息
      renewSlotCache();
      connectionPool = cache.getSlotPool(slot);
      if (connectionPool != null) {
        return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();
      }
    }
  }
}

参考文档

点赞

发表回复

电子邮件地址不会被公开。必填项已用 * 标注