Reids客户端Jedis的访问模式(上)

Jedis

Jedis是Redis的Java生态的客户端之一,其他还有Redisson、Lettuce

Jedis客户端支持单机模式、分片模式、集群模式的访问模式

  • 单机模式:创建Jedis对象来操作单节点的Redis,只适用于访问单个Redis节点。
  • 分片模式:创建ShardedJedisPool对象来访问分片模式的多个Redis节点,是Redis没有集群功能之前客户端实现的一个数据分布式方案,本质上是客户端通过一致性哈希来实现数据分布式存储。
  • 集群模式:创建JedisCluster对象来访问集群模式下的多个Redis节点,是Redis3.0引入集群模式后客户端实现的集群访问访问,本质上是通过引入槽(slot)概念以及通过CRC16哈希槽算法来实现数据分布式存储。

Jedis客户端支持单命令和Pipeline方式访问Redis集群,通过Pipeline的方式能够提高集群访问的效率

Jedis单机模式的访问

  • Jedis的创建过程核心在于创建Jedis对象以及Jedis内部变量Client对象
  • Jedis访问Redis的过程在于通过Jedis内部的Client对象访问Redis
  • Jedis继承自BinaryJedis类,在BinaryJedis类中存在和Redis对接的Client类对象,Jedis通过父类的BinaryJedis的Client对象实现Redis的读写。
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {

  protected JedisPoolAbstract dataSource = null;

  public Jedis(final String host, final int port) {
    // 创建父类BinaryJedis对象
    super(host, port);
  }
}

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {

  // 访问redis的Client对象
  protected Client client = null;

  public BinaryJedis(final String host, final int port) {
    // 创建Client对象访问redis
    client = new Client(host, port);
  }
}
  • Client对象继承自BinaryClient和Connection类,在BinaryClient类中存在Redis访问密码等相关参数,在Connection类在存在访问Redis的socket对象以及对应的输入输出流。本质上Connection是和Redis进行通信的核心类。Client类在创建过程中初始化核心父类Connection对象,而Connection是负责和Redis直接进行通信。
public class Client extends BinaryClient implements Commands {
  public Client(final String host, final int port) {
    super(host, port);
  }
}

public class BinaryClient extends Connection {
  // 存储和Redis连接的相关信息
  private boolean isInMulti;
  private String user;
  private String password;
  private int db;
  private boolean isInWatch;

  public BinaryClient(final String host, final int port) {
    super(host, port);
  }
}

public class Connection implements Closeable {
  // 管理和Redis连接的socket信息及对应的输入输出流
  private JedisSocketFactory jedisSocketFactory;
  private Socket socket;
  private RedisOutputStream outputStream;
  private RedisInputStream inputStream;
  private int infiniteSoTimeout = 0;
  private boolean broken = false;

  public Connection(final String host, final int port, final boolean ssl,
      SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier) {
    // 构建DefaultJedisSocketFactory来创建和Redis连接的Socket对象
    this(new DefaultJedisSocketFactory(host, port, Protocol.DEFAULT_TIMEOUT,
        Protocol.DEFAULT_TIMEOUT, ssl, sslSocketFactory, sslParameters, hostnameVerifier));
  }
}
  • 访问过程(方式):

    Jedis的set操作是通过Client的set操作来实现的,Client的set操作是通过父类Connection的sendCommand来实现。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
  @Override
  public String set(final String key, final String value) {
    checkIsInMultiOrPipeline();
    // client执行set操作
    client.set(key, value);
    return client.getStatusCodeReply();
  }
}

public class Client extends BinaryClient implements Commands {
  @Override
  public void set(final String key, final String value) {
    // 执行set命令
    set(SafeEncoder.encode(key), SafeEncoder.encode(value));
  }
}

public class BinaryClient extends Connection {
  public void set(final byte[] key, final byte[] value) {
    // 发送set指令
    sendCommand(SET, key, value);
  }
}

public class Connection implements Closeable {
  public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      // socket连接redis
      connect();
      // 按照redis的协议发送命令
      Protocol.sendCommand(outputStream, cmd, args);
    } catch (JedisConnectionException ex) {
    }
  }
}

Jedis分片模式访问

  • 本质上Redis的分片模式跟Redis本身没有任何关系,只是通过客户端来解决单节点数据有限存储的问题。
  • ShardedJedis访问Redis的核心在于构建对象的时候初始化一致性Hash对象,构建一致性Hash经典的Hash值和node的映射关系。构建完映射关系后执行set等操作就是Hash值到node的寻址过程,寻址完成后直接进行单节点的操作。
  • ShardedJedis的创建过程在于父类的Sharded中关于一致性Hash相关的初始化过程,核心在于构建一致性的虚拟节点以及虚拟节点和Redis节点的映射关系。
  • 源码中最核心的部分代码在于根据根据权重映射成未160个虚拟节点,通过虚拟节点来定位到具体的Redis节点。
public class Sharded<R, S extends ShardInfo<R>> {

  public static final int DEFAULT_WEIGHT = 1;
  // 保存虚拟节点和redis的node节点的映射关系
  private TreeMap<Long, S> nodes;
  // hash算法
  private final Hashing algo;
  // 保存redis节点和访问该节点的Jedis的连接信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();

  public Sharded(List<S> shards, Hashing algo) {
    this.algo = algo;
    initialize(shards);
  }

  private void initialize(List<S> shards) {
    nodes = new TreeMap<>();
    // 遍历每个redis的节点并设置hash值到节点的映射关系
    for (int i = 0; i != shards.size(); ++i) {
      final S shardInfo = shards.get(i);
      // 根据权重映射成未160个虚拟节点
      int N =  160 * shardInfo.getWeight();
      if (shardInfo.getName() == null) for (int n = 0; n < N; n++) {
        // 构建hash值和节点映射关系
        nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
      }
      else for (int n = 0; n < N; n++) {
        nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
      }
      // 保存每个节点的访问对象
      resources.put(shardInfo, shardInfo.createResource());
    }
  }
}
  • 访问过程(方式)

    ShardedJedis的访问过程就是一致性Hash的计算过程,核心的逻辑就是:通过Hash算法对访问的key进行Hash计算生成Hash值,根据Hash值获取对应Redis节点,根据对应的Redis节点获取对应的访问对象Jedis。获取访问对象Jedis之后就可以直接进行命令操作。

public class Sharded<R, S extends ShardInfo<R>> {

  public static final int DEFAULT_WEIGHT = 1;
  private TreeMap<Long, S> nodes;
  private final Hashing algo;
  // 保存redis节点和访问该节点的Jedis的连接信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();

  public R getShard(String key) {
    // 根据redis节点找到对应的访问对象Jedis
    return resources.get(getShardInfo(key));
  }

  public S getShardInfo(String key) {
    return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
  }

  public S getShardInfo(byte[] key) {
    // 针对访问的key生成对应的hash值
    // 根据hash值找到对应的redis节点
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
    if (tail.isEmpty()) {
      return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey());
  }
}

Jedis的Pipeline实现

Pipeline的技术核心思想是将多个命令发送到服务器而不用等待回复,最后在一个步骤中读取该答复。这种模式的好处在于节省了请求响应这种模式的网络开销。

Redis的普通命令如set和Pipeline批量操作的核心的差别在于set命令的操作会直接发送请求到Redis并同步等待结果返回,而Pipeline的操作会发送请求但不立即同步等待结果返回,具体的实现可以从Jedis的源码一探究竟。

原生的Pipeline在集群模式下相关的key必须Hash到同一个节点才能生效,原因在于Pipeline下的Client对象只能其中的一个节点建立了连接。

在集群模式下归属于不同节点的key能够使用Pipeline就需要针对每个key保存对应的节点的client对象,在最后执行获取数据的时候一并获取。本质上可以认为在单节点的Pipeline的基础上封装成一个集群式的Pipeline。

  • Pipeline用法分析

Pipeline访问单节点的Redis的时候,通过Jedis对象的Pipeline方法返回Pipeline对象,其他的命令操作通过该Pipeline对象进行访问。

Pipeline从使用角度来分析,会批量发送多个命令并最后统一使用syncAndReturnAll来一次性返回结果。

public void pipeline() {
    jedis = new Jedis(hnp.getHost(), hnp.getPort(), 500);
    Pipeline p = jedis.pipelined();
    // 批量发送命令到redis
    p.set("foo", "bar");
    p.get("foo");
    // 同步等待响应结果
    List<Object> results = p.syncAndReturnAll();

    assertEquals(2, results.size());
    assertEquals("OK", results.get(0));
    assertEquals("bar", results.get(1));
 }

public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {

  @Override
  public Response<String> set(final String key, final String value) {
    // 发送命令
    getClient(key).set(key, value);
    // pipeline的getResponse只是把待响应的请求聚合到pipelinedResponses对象当中
    return getResponse(BuilderFactory.STRING);
  }
}

public class Queable {

  private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<>(builder);
    // 统一保存到响应队列当中
    pipelinedResponses.add(lr);
    return lr;
  }
}

public class Pipeline extends MultiKeyPipelineBase implements Closeable {

  public List<Object> syncAndReturnAll() {
    if (getPipelinedResponseLength() > 0) {
      // 根据批量发送命令的个数即需要批量返回命令的个数,通过client对象进行批量读取
      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
      List<Object> formatted = new ArrayList<>();
      for (Object o : unformatted) {
        try {
          // 格式化每个返回的结果并最终保存在列表中进行返回
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {
          formatted.add(e);
        }
      }
      return formatted;
    } else {
      return java.util.Collections.<Object> emptyList();
    }
  }
}
  • 普通set命令发送请求给Redis后立即通过getStatusCodeReply来获取响应结果,所以这是一种请求响应的模式。getStatusCodeReply在获取响应结果的时候会通过flush()命令强制发送报文到Redis服务端然后通过读取响应结果。
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {

  @Override
  public String set(final byte[] key, final byte[] value) {
    checkIsInMultiOrPipeline();
    // 发送命令
    client.set(key, value);
    // 等待请求响应
    return client.getStatusCodeReply();
  }
}

public class Connection implements Closeable {
  public String getStatusCodeReply() {
    // 通过flush立即发送请求
    flush();
    // 处理响应请求
    final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
    if (null == resp) {
      return null;
    } else {
      return SafeEncoder.encode(resp);
    }
  }
}

public class Connection implements Closeable {
  protected void flush() {
    try {
      // 针对输出流进行flush操作保证报文的发出
      outputStream.flush();
    } catch (IOException ex) {
      broken = true;
      throw new JedisConnectionException(ex);
    }
  }
}

参考文档

点赞

发表评论

电子邮件地址不会被公开。必填项已用 * 标注