如何让JedisCluster支持Pipeline

原创 吴就业 140 0 2019-12-02

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/2273af9150e64289a2d445f65f2c17c6

作者:吴就业
链接:https://wujiuye.com/article/2273af9150e64289a2d445f65f2c17c6
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2019年12月02日,从公众号同步过来(博客搬家),本篇为原创文章。

hmset等批量操作命令与pipeline最大的区别是,前者是原子性命令,比如hmset,如果一次插入的field过多,会导致命令耗时增加;后者非原子性,只是批量的传输要执行命令,减少网络耗时。

pipeline提升性能的关键,一是RTT,节省往返时间,二是I/O系统调用,read系统调用,需要从用户态,切换到内核态。

hmset这类批量操作命令只能指定一个key,在Cluster集群下,不存在跨节点问题。而Pipeline由于支持所有命令的操作,支持多key,在Cluster集群模式下,会出现key映射到不同solt槽,可能会落到不同的节点上。这也是JedisCluster不提供Pipeline支持的原因。

HashTag

HashTag机制可以影响key被分配到的slot,从而可以使用那些被限制在slot中操作,比如rename。

笔者在项目重构阶段就遇到这个问题,代码中为了保证数据更新的原子性,使用了一个临时key写入数据,当所有数据更新完成后,再用rename将临时key替换回原来的key,因redis集群由原来的主从集群改为cluster集群后,rename导致代码抛出异常,原因是rename前的key与rename后的key映射到的槽位不在同一个节点上。

HashTag即是用{}包裹key的一个子串,如{user}1,{user}2。在设置了HashTag的情况下,集群会根据HashTag决定key分配到的slot,两个key拥有相同的HashTag=>{user},它们会被分配到同一个slot。

允许我们使用rename命令:

#           rename
#offers-tmp ---->offers-active
{offers}-tmp ----> {offers}-active

通常情况下,HashTag不支持嵌套,即将第一个’{‘,和第一个’}‘中间的内容作为HashTag。

使用HashTag可能会导致过多的key分配到同一个slot中,造成数据倾斜影响系统的吞吐量,务必谨慎使用。

让JedisCluster支持Pipeline

Pipeline需要客户端和服务端的支持。这是官网对Pipeline的介绍:https://redis.io/topics/pipelining。

对于服务端来说,所需要的是能够处理一个客户端通过同一个 TCP 连接发来的多个命令,但并不是所有命令都接收完才执行,和处理单个命令一样,每读到一条完整的命令就放入命令等待队列等待执行,每处理完一条命令就响应给客户端,直到客户端调用socket.getInputStream()的输入流的read方法读取响应。(Jedis具体实现看RedisInputStream与Connection。)

对于客户端,则是要将多个命令写入缓冲区,缓冲区满了就发送,然后再写入缓冲区buf,最后一次需要调用flush将未满的缓冲区的命令都发送出去,最后才处理 Redis 的应答(即read),缓冲区大小为8192字节。

使用Cluster集群模式,需要客户端缓存每个节点的槽位信息。JedisCluster在发送命令前会根据CRC16(key) %16384计算出key所在的槽位,根据槽位获取对应的节点连接池,再从连接池中获取一个Jedis连接。

JedisClustet是通过JedisSlotBasedConnectionHandler获取连接的,在JedisCluster的<init>方法中,会创建一个JedisSlotBasedConnectionHandler,它有一个字段cache,类型为JedisClusterInfoCache。JedisClusterInfoCache缓存了每个主节点对应的连接池nodes,以及每个槽位对应的连接池。

private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

JedisClusterInfoCache<init>方法中调用discoverClusterNodesAndSlots方法,获取所有节点和槽位信息。即遍历配置的所有节点,只要有一个节点能连接上就可以获取到集群的槽位信息,获取到槽位配置信息后终止遍历。在连接上一个节点后,发送一条cluster slots命令获取槽位分配信息。

172.24.1.1:6379> cluster slots 
1) 1) (integer) 0 // 槽位开始
   2) (integer) 5460 // 槽位结束
   3) 1) "172.24.1.1"  // 节点1(主节点)的ip
      2) (integer) 6380 // 节点1的端口
      3) "c70e6b2122bd336790d7f8c7bbbc88b59ea95ac1" 
   4) 1) "172.24.1.2" // 节点2的ip
      2) (integer) 6379 // 节点2的端口
      3) "c3d94b9de931247446dea98e8afd2ce5059fa377"
.........

cluster slots返回是一个数组,即Cluster中所有小主从集群的信息,数组每个元素又是一个数组,通过遍历数组拿到每段槽位的主节点信息,并创建一个连接池,在源码中有一句注释。

// at this time, we just use master, discard slave information

“此时,我们只使用master,放弃slave信息”。所以并不会为从节点创建连接池。nodes字段缓存的是所有小集群的主节点的连接池。完全就是弃用从节点了,只有当主节点挂掉,连接池中的连接不可用时,才会刷新nodes,配置的从节点才会用到。

正如例子中的,JedisCluster为槽位0~5460的主节点创建一个连接池JedisPool,而slots则会缓存0到5460slot持有这个JedsiPool的引用。

// 0到5460,可看getAssignedSlotArray方法
for (Integer slot : targetSlots) {
     slots.put(slot, targetPool);
}
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
         .intValue(); slot++) {
       slotNums.add(slot);
    } 
    return slotNums;
}

所以,只要能够获取JedisCluster对象的JedisSlotBasedConnectionHandler字段,再拿到JedisSlotBasedConnectionHandler对象的JedisClusterInfoCache字段,我们就能自己实现Pipeline。

 public class JedisClusterPipeline{
     // 采用反射获取部分字段
     private static final Field FIELD_CONNECTION_HANDLER;
     private static final Field FIELD_CACHE;
     static {
         FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
         FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
     }

     private JedisSlotBasedConnectionHandler connectionHandler;
     private JedisClusterInfoCache clusterInfoCache;

     public void setJedisCluster(JedisCluster jedis) {
         connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
         clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
     }

     private JedisClusterPipeline() {
     }
     /**
      * 根据jedisCluster实例生成对应的JedisClusterPipeline
      *
      * @param
      * @return
      */
     public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
         JedisClusterPipeline pipeline = new JedisClusterPipeline();
         pipeline.setJedisCluster(jedisCluster);
         return pipeline;
     }

}

还需继承PipelineBase以获得Pipeline API的支持,实现Closeable接口close方法做连接释放操作。

public class JedisClusterPipeline extends PipelineBase 
              implements Closeable {
    ......
    // 根据顺序存储每个命令对应的Client
    private Queue<Client> clients = new LinkedList<>();
    // 缓存Pipline持有的连接
    private Map<JedisPool, Jedis> jedisMap = new HashMap<>();
}

继承PipelineBase需要实现getClient方法。能够通过key获取一个连接Jedis(Client)。首先通过CRC16计算出key所在的槽位,再根据槽位获取到一个连接。

  @Override
  protected Client getClient(String key) {
         byte[] bKey = SafeEncoder.encode(key);
         return getClient(bKey);
  }
  
  @Override
  protected Client getClient(byte[] key) {
      Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
      Client client = jedis.getClient();
      clients.add(client);
      return client;
  }

根据槽位获取连接就是从JedisClusterInfoCache的slots字段获取槽位对应的连接池,拿到连接池就可以从连接池中获取连接了。

由于使用Pipeline时可能存在多个key落到同一个节点上,所以只需要确保一个节点只从连接池中获取一个连接就可以了,所以使用一个Map(jedisMap)来缓存当前Pipeline持有的Jedis。

   private Jedis getJedis(int slot) {
        // 根据pool从缓存中获取Jedis
        JedisPool pool = clusterInfoCache.getSlotPool(slot);
        Jedis jedis = jedisMap.get(pool);
        if (null == jedis) {
            jedis = pool.getResource();
            jedisMap.put(pool, jedis);
        }
        return jedis;
    }

往Pipeline每写入一条命令,都是往Jedis(Client(Socket))的输出流写入,命令会缓存在输出流缓冲区中,缓冲区满则发送,最后需要调用flush命令将缓冲区剩余数据都传输到远端redis服务器。以输出流为例,看下RedisOutputStream输出流是怎么实现的。

public final class RedisOutputStream extends FilterOutputStream {
  protected final byte[] buf;
  protected int count;
  // out = socket.getOutputStream() 
  // @see: redis.clients.jedis.Connection
  public RedisOutputStream(final OutputStream out) {
    this(out, 8192);
  }
  public RedisOutputStream(final OutputStream out, final int size) {
    super(out);
    if (size <= 0) {
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    buf = new byte[size];
  }
 }

从RedisOutputStream源码可以看出,输出流的缓冲区大小默认为8192字节,接着看下往输出流里面写入命令都做了写什么。

// 将命令字节数组写入到RedisOutputStream
public void write(final byte[] b) throws IOException {
    write(b, 0, b.length);
}  

public void write(final byte[] b, final int off, final int len) throws IOException {
    // 如果字节数组大于缓冲区大小,则先将缓冲区数据写入到输出流,再直接将要写入的数据写入到输出流
    if (len >= buf.length) {
       flushBuffer();
       out.write(b, off, len);
     } else {
       // 如果字节数据大于可写入的缓冲区大小,则将缓冲区数据写入到输出流
       if (len >= buf.length - count) {
         flushBuffer();
       }
       // 再将字节数组写入到缓冲区
       System.arraycopy(b, off, buf, count, len);
       count += len;
    }
}
 // 将缓冲区内容真正写入到输出流
 private void flushBuffer() throws IOException {
    if (count > 0) {
      out.write(buf, 0, count);
      count = 0;
    }
}

所以写入完要执行的全部命令后,需要调用当前Pipeline所持有的所有Clinet的getAll()方法,将Client的输出流缓冲区命令都传输到远端redis执行,并开始从响应的输入流中读取返回结果。

public List<Object> sync() {
        List<Object> responseList = new ArrayList<>();
        try {
            // 遍历获取所有客户端结果
            for (Client client : clients) {
                // 获取所有服务端响应readProtocolWithCheckingBroken【Protocol.read(inputStream);】
                List<Object> unformatted = client.getAll();
                for (Object o : unformatted) {
                    // 从pipelinedResponses队列中弹出一个Response写入数据
                    Response<?> data = generateResponse(o);
                    if (null != responseList) {
                        responseList.add(data.get());
                    }
                }
            }
            return responseList;
        } catch (JedisRedirectionException jre) {
            if (jre instanceof JedisMovedDataException) {
                // 如果发生重定向(槽位重定向),则重建群集的插槽缓存
                refreshCluster();
            }
            throw jre;
        } finally {
            this.close();
       }
}

generateResponse是父类PipelineBase的方法,PipelineBase继承Queable。

使用Pipeline连续的往RedisOutputStream写入命令,每写入一条命令就会返回一个Response对象,同时这个Response被放入一个Queue<Response>队列中,这步一会分析。而这个Response对象就跟Futute功能一样,你可以调用Response的get方法获取返回结果,只是此时调用get会直接抛出JedisDataException异常。

public class Response<T> {
  protected T response = null;
  .....
  private boolean set = false;
  .......
  public void set(Object data) {
    this.data = data;
    set = true;
  }

  public T get() {
    .......
    if (!set) {
      throw new JedisDataException(
          "Please close pipeline or multi block before calling this method.");
    }
   .....
    return response;
  }
}

PipelineBase继承Queable,Queable有一个Queue类型字段pipelinedResponses,所以我们在自己实现的JedisClusterPipeline的sync方法中调用generateResponse方法,就会从队列中弹出一个Response对象并写入结果,此时调用get方法才能获取到返回结果。

public class Queable {
  private Queue<Response<?>> pipelinedResponses = new LinkedList<Response<?>>();
  .......
  // 从Response队列中头部弹出一个Response,并给Response写入结果
  protected Response<?> generateResponse(Object data) {
    Response<?> response = pipelinedResponses.poll();
    if (response != null) {
      response.set(data);
    }
    return response;
  }
  // 根据build new一个Response,并放入队列中
  // builder是用于解析结果的,比如T为Long,则build会将结果字符串解析为Long
  // public static final Builder<Long> LONG = new Builder<Long>() {
  //      public Long build(Object data) {
  //         return (Long)data;
  //     }
  // };
  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<T>(builder);
    pipelinedResponses.add(lr);
    return lr;
  }
  .........
}

Jedis的Pipeline实现利用了Queue的先入先出特性,按命令的执行顺序响应结果。但这种先写入的命令先响应结果,在Cluster下,就会导致结果与命令不对应。因为多个Client,每个Client都执行一些命令,你无法保证获取结果顺序,在JedisCluster下实现Pipeline最好放弃响应结果。或者忽略响应结果的顺序问题。如果强需求获取命令的对应返回结果,那么此Pipeline并不能满足你。JedisCluster为什么不支持Pipeline是有道理的。

最后是close方法的实现,就是将当前pipeline持有的所有Jedis连接释放回连接池,遍历所有Jedis调用其close方法即可。同时flushCachedData方法是调用jedis.getClient().getAll()获取所有返回结果,其实就是清理Client的响应输入流,避免Jedis被复用时读取到错误的结果。

@Override
public void close() {
     super.clean();
     clients.clear();
     for (Jedis jedis : jedisMap.values()) {
           flushCachedData(jedis);
           jedis.close();
      }
      jedisMap.clear();
}

clean方法是父类Queable的方法,调用清空Response队列。

protected void clean() {
    pipelinedResponses.clear();
}

至此,一个简单的JedisClusterPipeline就完成了。

需要注意的地方

由于Cluster集群模式存在节点的动态添加或删除,且client不能实时感知,所以,建议在批量操作之前调用重新获取一遍集群信息。或是发生失败时再重新获取集群信息,毕竟会改变的概率很小,完全不用为这种小概率买单,前提是能容忍偶然的失败。

应用需要保证不论成功还是失败都会调用所有Jedis的close方法释放连接,且释放连接回连接池之前要清理Client。

在使用hmset这类批量命令时,如果field较多可以分批次写入,避免因命令执行耗时导致的阻塞。这点尤其要重视,我们项目中目前也存在这个问题。Pipeline建议命令总和不超过8192字节的缓冲区大小。

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

Dubbo分层架构之服务注册中心层的源码分析(上)

服务注册与发现是Dubbo核心的一个模块,假如没有注册中心,我们要调用远程服务,就必须要预先配置,就像调用第三方http接口一样,需要知道接口的域名或者IP、端口号才能调用。

缓存雪崩、穿透如何解决,如何确保Redis只缓存热点数据?

缓存雪崩如何解决?缓存穿透如何解决?如何确保Redis缓存的都是热点数据?如何更新缓存数据?如何处理请求倾斜?实际业务场景下,如何选择缓存数据结构。

线上RPC远程调用频繁超时问题排查,大功臣Arthas

项目不断新增需求,难免不会出现问题,特别是近期新增的增加请求处理耗时的需求。以及一些配置的修改而忽略掉的问题,如dubbo工作线程数调增时,忽略了redis连接池的修改。由于redis可用连接远小于工作线程数,就会出现多个线程竞争redis连接,影响性能。

Redis性能问题如何排查

并发数上升,到底是哪个服务处理能力到了瓶颈,还是Redis性能到了瓶颈,只有找出是哪里的性能问题,才能对症下药。所以,了解redis的一些运维知识能够帮助我们快速判定是否Redis集群的性能问题。

Dubbo自适应随机负载均衡策略的实现

Dubbo默认使用随机负载均衡策略,据笔者了解,目前Dubbo一共提供了四种可选的负载均衡策略,有关于负载均衡策略的实现,如果不怕阅读源码枯燥的,笔者推荐阅读官网的源码导读部份的文档。

源码分析Dubbo负载均衡策略的权重如何动态修改

dubbo随机负载均衡的权重很少会用到吗?之前我想给随机负载均衡策略配置权重,各种搜索都找不到答案,包括翻阅官方文档。而且我们项目中用的还是最新的Nacos注册中心,非常无奈,最后只能在源码中寻找答案。