Jedis
Jedis是Redis的Java生态的客户端之一,其他还有Redisson、Lettuce
Jedis客户端支持单机模式、分片模式、集群模式的访问模式
- 单机模式:创建Jedis对象来操作单节点的Redis,只适用于访问单个Redis节点。
- 分片模式:创建ShardedJedisPool对象来访问分片模式的多个Redis节点,是Redis没有集群功能之前客户端实现的一个数据分布式方案,本质上是客户端通过一致性哈希来实现数据分布式存储。
- 集群模式:创建JedisCluster对象来访问集群模式下的多个Redis节点,是Redis3.0引入集群模式后客户端实现的集群访问访问,本质上是通过引入槽(slot)概念以及通过CRC16哈希槽算法来实现数据分布式存储。
Jedis客户端支持单命令和Pipeline方式访问Redis集群,通过Pipeline的方式能够提高集群访问的效率
Jedis单机模式的访问
- Jedis的创建过程核心在于创建Jedis对象以及Jedis内部变量Client对象
- Jedis访问Redis的过程在于通过Jedis内部的Client对象访问Redis
- Jedis继承自BinaryJedis类,在BinaryJedis类中存在和Redis对接的Client类对象,Jedis通过父类的BinaryJedis的Client对象实现Redis的读写。
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
protected JedisPoolAbstract dataSource = null;
public Jedis(final String host, final int port) {
// 创建父类BinaryJedis对象
super(host, port);
}
}
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
// 访问redis的Client对象
protected Client client = null;
public BinaryJedis(final String host, final int port) {
// 创建Client对象访问redis
client = new Client(host, port);
}
}
- Client对象继承自BinaryClient和Connection类,在BinaryClient类中存在Redis访问密码等相关参数,在Connection类在存在访问Redis的socket对象以及对应的输入输出流。本质上Connection是和Redis进行通信的核心类。Client类在创建过程中初始化核心父类Connection对象,而Connection是负责和Redis直接进行通信。
public class Client extends BinaryClient implements Commands {
public Client(final String host, final int port) {
super(host, port);
}
}
public class BinaryClient extends Connection {
// 存储和Redis连接的相关信息
private boolean isInMulti;
private String user;
private String password;
private int db;
private boolean isInWatch;
public BinaryClient(final String host, final int port) {
super(host, port);
}
}
public class Connection implements Closeable {
// 管理和Redis连接的socket信息及对应的输入输出流
private JedisSocketFactory jedisSocketFactory;
private Socket socket;
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private int infiniteSoTimeout = 0;
private boolean broken = false;
public Connection(final String host, final int port, final boolean ssl,
SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier) {
// 构建DefaultJedisSocketFactory来创建和Redis连接的Socket对象
this(new DefaultJedisSocketFactory(host, port, Protocol.DEFAULT_TIMEOUT,
Protocol.DEFAULT_TIMEOUT, ssl, sslSocketFactory, sslParameters, hostnameVerifier));
}
}
-
访问过程(方式):
Jedis的set操作是通过Client的set操作来实现的,Client的set操作是通过父类Connection的sendCommand来实现。
public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands, ModuleCommands {
@Override
public String set(final String key, final String value) {
checkIsInMultiOrPipeline();
// client执行set操作
client.set(key, value);
return client.getStatusCodeReply();
}
}
public class Client extends BinaryClient implements Commands {
@Override
public void set(final String key, final String value) {
// 执行set命令
set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
}
public class BinaryClient extends Connection {
public void set(final byte[] key, final byte[] value) {
// 发送set指令
sendCommand(SET, key, value);
}
}
public class Connection implements Closeable {
public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
// socket连接redis
connect();
// 按照redis的协议发送命令
Protocol.sendCommand(outputStream, cmd, args);
} catch (JedisConnectionException ex) {
}
}
}
Jedis分片模式访问
- 本质上Redis的分片模式跟Redis本身没有任何关系,只是通过客户端来解决单节点数据有限存储的问题。
- ShardedJedis访问Redis的核心在于构建对象的时候初始化一致性Hash对象,构建一致性Hash经典的Hash值和node的映射关系。构建完映射关系后执行set等操作就是Hash值到node的寻址过程,寻址完成后直接进行单节点的操作。
- ShardedJedis的创建过程在于父类的Sharded中关于一致性Hash相关的初始化过程,核心在于构建一致性的虚拟节点以及虚拟节点和Redis节点的映射关系。
- 源码中最核心的部分代码在于根据根据权重映射成未160个虚拟节点,通过虚拟节点来定位到具体的Redis节点。
public class Sharded<R, S extends ShardInfo<R>> {
public static final int DEFAULT_WEIGHT = 1;
// 保存虚拟节点和redis的node节点的映射关系
private TreeMap<Long, S> nodes;
// hash算法
private final Hashing algo;
// 保存redis节点和访问该节点的Jedis的连接信息
private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
public Sharded(List<S> shards, Hashing algo) {
this.algo = algo;
initialize(shards);
}
private void initialize(List<S> shards) {
nodes = new TreeMap<>();
// 遍历每个redis的节点并设置hash值到节点的映射关系
for (int i = 0; i != shards.size(); ++i) {
final S shardInfo = shards.get(i);
// 根据权重映射成未160个虚拟节点
int N = 160 * shardInfo.getWeight();
if (shardInfo.getName() == null) for (int n = 0; n < N; n++) {
// 构建hash值和节点映射关系
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
}
else for (int n = 0; n < N; n++) {
nodes.put(this.algo.hash(shardInfo.getName() + "*" + n), shardInfo);
}
// 保存每个节点的访问对象
resources.put(shardInfo, shardInfo.createResource());
}
}
}
-
访问过程(方式)
ShardedJedis的访问过程就是一致性Hash的计算过程,核心的逻辑就是:通过Hash算法对访问的key进行Hash计算生成Hash值,根据Hash值获取对应Redis节点,根据对应的Redis节点获取对应的访问对象Jedis。获取访问对象Jedis之后就可以直接进行命令操作。
public class Sharded<R, S extends ShardInfo<R>> {
public static final int DEFAULT_WEIGHT = 1;
private TreeMap<Long, S> nodes;
private final Hashing algo;
// 保存redis节点和访问该节点的Jedis的连接信息
private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<>();
public R getShard(String key) {
// 根据redis节点找到对应的访问对象Jedis
return resources.get(getShardInfo(key));
}
public S getShardInfo(String key) {
return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
}
public S getShardInfo(byte[] key) {
// 针对访问的key生成对应的hash值
// 根据hash值找到对应的redis节点
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
if (tail.isEmpty()) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey());
}
}
Jedis的Pipeline实现
Pipeline的技术核心思想是将多个命令发送到服务器而不用等待回复,最后在一个步骤中读取该答复。这种模式的好处在于节省了请求响应这种模式的网络开销。
Redis的普通命令如set和Pipeline批量操作的核心的差别在于set命令的操作会直接发送请求到Redis并同步等待结果返回,而Pipeline的操作会发送请求但不立即同步等待结果返回,具体的实现可以从Jedis的源码一探究竟。
原生的Pipeline在集群模式下相关的key必须Hash到同一个节点才能生效,原因在于Pipeline下的Client对象只能其中的一个节点建立了连接。
在集群模式下归属于不同节点的key能够使用Pipeline就需要针对每个key保存对应的节点的client对象,在最后执行获取数据的时候一并获取。本质上可以认为在单节点的Pipeline的基础上封装成一个集群式的Pipeline。
- Pipeline用法分析
Pipeline访问单节点的Redis的时候,通过Jedis对象的Pipeline方法返回Pipeline对象,其他的命令操作通过该Pipeline对象进行访问。
Pipeline从使用角度来分析,会批量发送多个命令并最后统一使用syncAndReturnAll来一次性返回结果。
public void pipeline() {
jedis = new Jedis(hnp.getHost(), hnp.getPort(), 500);
Pipeline p = jedis.pipelined();
// 批量发送命令到redis
p.set("foo", "bar");
p.get("foo");
// 同步等待响应结果
List<Object> results = p.syncAndReturnAll();
assertEquals(2, results.size());
assertEquals("OK", results.get(0));
assertEquals("bar", results.get(1));
}
public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
@Override
public Response<String> set(final String key, final String value) {
// 发送命令
getClient(key).set(key, value);
// pipeline的getResponse只是把待响应的请求聚合到pipelinedResponses对象当中
return getResponse(BuilderFactory.STRING);
}
}
public class Queable {
private Queue<Response<?>> pipelinedResponses = new LinkedList<>();
protected <T> Response<T> getResponse(Builder<T> builder) {
Response<T> lr = new Response<>(builder);
// 统一保存到响应队列当中
pipelinedResponses.add(lr);
return lr;
}
}
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
public List<Object> syncAndReturnAll() {
if (getPipelinedResponseLength() > 0) {
// 根据批量发送命令的个数即需要批量返回命令的个数,通过client对象进行批量读取
List<Object> unformatted = client.getMany(getPipelinedResponseLength());
List<Object> formatted = new ArrayList<>();
for (Object o : unformatted) {
try {
// 格式化每个返回的结果并最终保存在列表中进行返回
formatted.add(generateResponse(o).get());
} catch (JedisDataException e) {
formatted.add(e);
}
}
return formatted;
} else {
return java.util.Collections.<Object> emptyList();
}
}
}
- 普通set命令发送请求给Redis后立即通过getStatusCodeReply来获取响应结果,所以这是一种请求响应的模式。getStatusCodeReply在获取响应结果的时候会通过flush()命令强制发送报文到Redis服务端然后通过读取响应结果。
public class BinaryJedis implements BasicCommands, BinaryJedisCommands, MultiKeyBinaryCommands,
AdvancedBinaryJedisCommands, BinaryScriptingCommands, Closeable {
@Override
public String set(final byte[] key, final byte[] value) {
checkIsInMultiOrPipeline();
// 发送命令
client.set(key, value);
// 等待请求响应
return client.getStatusCodeReply();
}
}
public class Connection implements Closeable {
public String getStatusCodeReply() {
// 通过flush立即发送请求
flush();
// 处理响应请求
final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
if (null == resp) {
return null;
} else {
return SafeEncoder.encode(resp);
}
}
}
public class Connection implements Closeable {
protected void flush() {
try {
// 针对输出流进行flush操作保证报文的发出
outputStream.flush();
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
}
文章评论