小呆呆的生活

  • 首页
  • 分类
    • Linux
    • MySQL
    • SpringBoot
    • SpringCloud
  • 工具
  • 留言
  • 登录
  • 注册
  • 友情链接
    • 咸鱼的窝
    • DIY熙的家
    • Farmer的自习室
    • Dark的小黑屋
  • 关于
人的一生注定会遇到两个人
一个惊艳了时光,一个温柔了岁月
  1. 首页
  2. Redis
  3. 正文

Reids客户端Jedis的访问模式(上)

2021年 12月 22日 920次阅读 0人点赞 0条评论

Jedis

Jedis是Redis的Java生态的客户端之一,其他还有Redisson、Lettuce

Jedis客户端支持单机模式、分片模式、集群模式的访问模式

  • 单机模式:创建Jedis对象来操作单节点的Redis,只适用于访问单个Redis节点。
  • 分片模式:创建ShardedJedisPool对象来访问分片模式的多个Redis节点,是Redis没有集群功能之前客户端实现的一个数据分布式方案,本质上是客户端通过一致性哈希来实现数据分布式存储。
  • 集群模式:创建JedisCluster对象来访问集群模式下的多个Redis节点,是Redis3.0引入集群模式后客户端实现的集群访问访问,本质上是通过引入槽(slot)概念以及通过CRC16哈希槽算法来实现数据分布式存储。

Jedis客户端支持单命令和Pipeline方式访问Redis集群,通过Pipeline的方式能够提高集群访问的效率

Jedis单机模式的访问

  • Jedis的创建过程核心在于创建Jedis对象以及Jedis内部变量Client对象
  • Jedis访问Redis的过程在于通过Jedis内部的Client对象访问Redis
  • Jedis继承自BinaryJedis类,在BinaryJedis类中存在和Redis对接的Client类对象,Jedis通过父类的BinaryJedis的Client对象实现Redis的读写。
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {

  protected JedisPoolAbstract dataSource = null;

  public Jedis(final String host, final int port) {
    // 创建父类BinaryJedis对象
    super(host, port);
  }
}

public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {

  // 访问redis的Client对象
  protected Client client = null;

  public BinaryJedis(final String host, final int port) {
    // 创建Client对象访问redis
    client = new Client(host, port);
  }
}
  • Client对象继承自BinaryClient和Connection类,在BinaryClient类中存在Redis访问密码等相关参数,在Connection类在存在访问Redis的socket对象以及对应的输入输出流。本质上Connection是和Redis进行通信的核心类。Client类在创建过程中初始化核心父类Connection对象,而Connection是负责和Redis直接进行通信。
public class Client extends BinaryClient implements Commands {
  public Client(final String host, final int port) {
    super(host, port);
  }
}

public class BinaryClient extends Connection {
  // 存储和Redis连接的相关信息
  private boolean isInMulti;
  private String user;
  private String password;
  private int db;
  private boolean isInWatch;

  public BinaryClient(final String host, final int port) {
    super(host, port);
  }
}

public class Connection implements Closeable {
  // 管理和Redis连接的socket信息及对应的输入输出流
  private JedisSocketFactory jedisSocketFactory;
  private Socket socket;
  private RedisOutputStream outputStream;
  private RedisInputStream inputStream;
  private int infiniteSoTimeout = 0;
  private boolean broken = false;

  public Connection(final String host, final int port, final boolean ssl,
      SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
      HostnameVerifier hostnameVerifier) {
    // 构建DefaultJedisSocketFactory来创建和Redis连接的Socket对象
    this(new DefaultJedisSocketFactory(host, port, Protocol.DEFAULT_TIMEOUT,
        Protocol.DEFAULT_TIMEOUT, ssl, sslSocketFactory, sslParameters, hostnameVerifier));
  }
}
  • 访问过程(方式):

    Jedis的set操作是通过Client的set操作来实现的,Client的set操作是通过父类Connection的sendCommand来实现。

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
  @Override
  public String set(final String key, final String value) {
    checkIsInMultiOrPipeline();
    // client执行set操作
    client.set(key, value);
    return client.getStatusCodeReply();
  }
}

public class Client extends BinaryClient implements Commands {
  @Override
  public void set(final String key, final String value) {
    // 执行set命令
    set(SafeEncoder.encode(key), SafeEncoder.encode(value));
  }
}

public class BinaryClient extends Connection {
  public void set(final byte[] key, final byte[] value) {
    // 发送set指令
    sendCommand(SET, key, value);
  }
}

public class Connection implements Closeable {
  public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      // socket连接redis
      connect();
      // 按照redis的协议发送命令
      Protocol.sendCommand(outputStream, cmd, args);
    } catch (JedisConnectionException ex) {
    }
  }
}

Jedis分片模式访问

  • 本质上Redis的分片模式跟Redis本身没有任何关系,只是通过客户端来解决单节点数据有限存储的问题。
  • ShardedJedis访问Redis的核心在于构建对象的时候初始化一致性Hash对象,构建一致性Hash经典的Hash值和node的映射关系。构建完映射关系后执行set等操作就是Hash值到node的寻址过程,寻址完成后直接进行单节点的操作。
  • ShardedJedis的创建过程在于父类的Sharded中关于一致性Hash相关的初始化过程,核心在于构建一致性的虚拟节点以及虚拟节点和Redis节点的映射关系。
  • 源码中最核心的部分代码在于根据根据权重映射成未160个虚拟节点,通过虚拟节点来定位到具体的Redis节点。
public class Sharded<R, S extends ShardInfo<R>> {

  public static final int DEFAULT_WEIGHT = 1;
  // 保存虚拟节点和redis的node节点的映射关系
  private TreeMap<Long, S> nodes;
  // hash算法
  private final Hashing algo;
  // 保存redis节点和访问该节点的Jedis的连接信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();

  public Sharded(List<S> shards, Hashing algo) {
    this.algo = algo;
    initialize(shards);
  }

  private void initialize(List<S> shards) {
    nodes = new TreeMap<>();
    // 遍历每个redis的节点并设置hash值到节点的映射关系
    for (int i = 0; i != shards.size(); ++i) {
      final S shardInfo = shards.get(i);
      // 根据权重映射成未160个虚拟节点
      int N =  160 * shardInfo.getWeight();
      if (shardInfo.getName() == null) for (int n = 0; n < N; n++) {
        // 构建hash值和节点映射关系
        nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
      }
      else for (int n = 0; n < N; n++) {
        nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
      }
      // 保存每个节点的访问对象
      resources.put(shardInfo, shardInfo.createResource());
    }
  }
}
  • 访问过程(方式)

    ShardedJedis的访问过程就是一致性Hash的计算过程,核心的逻辑就是:通过Hash算法对访问的key进行Hash计算生成Hash值,根据Hash值获取对应Redis节点,根据对应的Redis节点获取对应的访问对象Jedis。获取访问对象Jedis之后就可以直接进行命令操作。

public class Sharded<R, S extends ShardInfo<R>> {

  public static final int DEFAULT_WEIGHT = 1;
  private TreeMap<Long, S> nodes;
  private final Hashing algo;
  // 保存redis节点和访问该节点的Jedis的连接信息
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();

  public R getShard(String key) {
    // 根据redis节点找到对应的访问对象Jedis
    return resources.get(getShardInfo(key));
  }

  public S getShardInfo(String key) {
    return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
  }

  public S getShardInfo(byte[] key) {
    // 针对访问的key生成对应的hash值
    // 根据hash值找到对应的redis节点
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
    if (tail.isEmpty()) {
      return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey());
  }
}

Jedis的Pipeline实现

Pipeline的技术核心思想是将多个命令发送到服务器而不用等待回复,最后在一个步骤中读取该答复。这种模式的好处在于节省了请求响应这种模式的网络开销。

Redis的普通命令如set和Pipeline批量操作的核心的差别在于set命令的操作会直接发送请求到Redis并同步等待结果返回,而Pipeline的操作会发送请求但不立即同步等待结果返回,具体的实现可以从Jedis的源码一探究竟。

原生的Pipeline在集群模式下相关的key必须Hash到同一个节点才能生效,原因在于Pipeline下的Client对象只能其中的一个节点建立了连接。

在集群模式下归属于不同节点的key能够使用Pipeline就需要针对每个key保存对应的节点的client对象,在最后执行获取数据的时候一并获取。本质上可以认为在单节点的Pipeline的基础上封装成一个集群式的Pipeline。

  • Pipeline用法分析

Pipeline访问单节点的Redis的时候,通过Jedis对象的Pipeline方法返回Pipeline对象,其他的命令操作通过该Pipeline对象进行访问。

Pipeline从使用角度来分析,会批量发送多个命令并最后统一使用syncAndReturnAll来一次性返回结果。

public void pipeline() {
    jedis = new Jedis(hnp.getHost(), hnp.getPort(), 500);
    Pipeline p = jedis.pipelined();
    // 批量发送命令到redis
    p.set("foo", "bar");
    p.get("foo");
    // 同步等待响应结果
    List<Object> results = p.syncAndReturnAll();

    assertEquals(2, results.size());
    assertEquals("OK", results.get(0));
    assertEquals("bar", results.get(1));
 }

public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {

  @Override
  public Response<String> set(final String key, final String value) {
    // 发送命令
    getClient(key).set(key, value);
    // pipeline的getResponse只是把待响应的请求聚合到pipelinedResponses对象当中
    return getResponse(BuilderFactory.STRING);
  }
}

public class Queable {

  private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<>(builder);
    // 统一保存到响应队列当中
    pipelinedResponses.add(lr);
    return lr;
  }
}

public class Pipeline extends MultiKeyPipelineBase implements Closeable {

  public List<Object> syncAndReturnAll() {
    if (getPipelinedResponseLength() > 0) {
      // 根据批量发送命令的个数即需要批量返回命令的个数,通过client对象进行批量读取
      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
      List<Object> formatted = new ArrayList<>();
      for (Object o : unformatted) {
        try {
          // 格式化每个返回的结果并最终保存在列表中进行返回
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {
          formatted.add(e);
        }
      }
      return formatted;
    } else {
      return java.util.Collections.<Object> emptyList();
    }
  }
}
  • 普通set命令发送请求给Redis后立即通过getStatusCodeReply来获取响应结果,所以这是一种请求响应的模式。getStatusCodeReply在获取响应结果的时候会通过flush()命令强制发送报文到Redis服务端然后通过读取响应结果。
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
    AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {

  @Override
  public String set(final byte[] key, final byte[] value) {
    checkIsInMultiOrPipeline();
    // 发送命令
    client.set(key, value);
    // 等待请求响应
    return client.getStatusCodeReply();
  }
}

public class Connection implements Closeable {
  public String getStatusCodeReply() {
    // 通过flush立即发送请求
    flush();
    // 处理响应请求
    final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
    if (null == resp) {
      return null;
    } else {
      return SafeEncoder.encode(resp);
    }
  }
}

public class Connection implements Closeable {
  protected void flush() {
    try {
      // 针对输出流进行flush操作保证报文的发出
      outputStream.flush();
    } catch (IOException ex) {
      broken = true;
      throw new JedisConnectionException(ex);
    }
  }
}

参考文档

  • GitHub - redis/jedis: Redis Java client designed for performance and ease of use.
  • 深入剖析Redis客户端Jedis的特性和原理 - 掘金 (juejin.cn)
本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: Java Redis
最后更新:2021年 12月 29日

小呆呆

知足常乐,就会拥有幸福

点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

小呆呆

知足常乐,就会拥有幸福

最新 热点 随机
最新 热点 随机
本站暂时停止更新,后续文章将在CSDN更新 数据库索引简析 Java多线程的使用场景以及线程的创建方式 Spring事务的使用示例和传播行为以及失效场景 Spring Boot使用JUnit和Mockito进行Service层单元测试 Spring Cloud Zuul和Gateway的简单示例(搭建方式)
SpringCloud项目搭建方式 Java面试高频(五) 本站暂时停止更新,后续文章将在CSDN更新 Spring Boot使用JUnit和Mockito进行Service层单元测试 MySQL练习(二) Spring事务的使用示例和传播行为以及失效场景
标签聚合
面试 干货 Java SpringBoot MySQL SpringCloud Spring 后端
最近评论
我是可是尼古拉斯·爱新觉·罗·G·钰豪啊 发布于 4 年前(04月08日) 我来注水了胜哥 :hehe:
鸟人金 发布于 4 年前(03月03日) v
鸟人金 发布于 4 年前(03月03日) 胜哥yyds
鸟人金 发布于 4 年前(03月03日) 我滴偶像!!!!!!!!!!!!!!!
水军2号 发布于 4 年前(03月03日) 胜哥tql
归档
  • 2024 年 4 月
  • 2024 年 2 月
  • 2024 年 1 月
  • 2023 年 12 月
  • 2023 年 11 月
  • 2023 年 10 月
  • 2023 年 8 月
  • 2023 年 6 月
  • 2022 年 11 月
  • 2022 年 8 月
  • 2022 年 6 月
  • 2022 年 4 月
  • 2022 年 3 月
  • 2022 年 1 月
  • 2021 年 12 月
  • 2021 年 9 月
  • 2021 年 8 月
  • 2021 年 6 月
  • 2021 年 4 月
  • 2020 年 10 月
  • 2020 年 9 月
  • 2020 年 8 月
  • 2020 年 7 月
  • 2020 年 6 月
  • 2020 年 5 月
  • 2020 年 4 月
  • 2020 年 3 月

COPYRIGHT © 2023 小呆呆的生活. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

粤ICP备2020104583号

粤公网安备44011802000463号