中间件 - ZooKeeper应用场景实践

脑机接口
• 阅读 3781

注:该文章用作回顾记录

一、准备工作

预先下载安装 ZooKeeper ,简单配置就能使用了。然后构建 Maven 项目,将下面的代码粘贴到 pom.xml中:

    <dependency>  
        <groupId>org.apache.zookeeper</groupId>  
        <artifactId>zookeeper</artifactId>  
        <version>3.4.5</version>  
    </dependency>  
    <dependency>  
        <groupId>com.101tec</groupId>  
        <artifactId>zkclient</artifactId>  
        <version>0.5</version>  
    </dependency> 

zkclient 是开源的客户端工具,其中封装了很多功能,比如:删除包含子节点的父节点,连接重试,异步回调,偏向 Java 写法的注册监听等,极大地方便了用户使用。

下面不过多介绍客户端操作,只针对应用场景做介绍,该文章会随着本人的学习持续补充。

二、数据发布/订阅

使用 ZooKeeper 节点监听来实现该功能:

ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 连接集群

zkClient.createPersistent("/xxx/xxx"); // 创建持久节点

// 注册子节点变更监听,当子节点改变(比如创建了"/xxx/xxx/1")或当前节点删除等,会触发异步回调
zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() {
    @Override
    public void handleChildChange(String parentPath, List<String> currentChilds) 
        throws Exception {
    }
});

下面为部分源码:

package org.I0Itec.zkclient;

public class ZkClient implements Watcher {

    public List<String> watchForChilds(final String path) {
        return retryUntilConnected(new Callable<List<String>>() {
            @Override
            public List<String> call() throws Exception {
                exists(path, true);
                try {
                    return getChildren(path, true);
                } catch (ZkNoNodeException e) {
                }
                return null;
            }
        });
    }
    
    public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
        final long operationStartTime = System.currentTimeMillis();
        while (true) {
            if (_closed) {
                throw new IllegalStateException("ZkClient already closed!");
            }
            try {
                return callable.call();
            } catch (Exception e) {
                throw ExceptionUtil.convertToRuntimeException(e);
            }
        }
    }
}

基于 ZooKeeper 实现的数据发布/订阅很简单吧,快动手试试。

三、分布式锁

这部分是 ZooKeeper 重要功能,在此基础上实现诸如,分布式协调/通知,负载均衡,Master选举等复杂场景。

1、排它锁

排它锁又称为写锁或独占锁。比如事务 T1 对数据对象 O1 加了排它锁,那么在整个加锁期间,只允许 T1 对 O1 进行读取或更新操作,其它事务都不能对 O1 操作。

1)获取锁

所有客户端都创建临时节点 zkClient.createEphemeral("/xxx/xxx", null);,ZooKeeper 会保证在所有客户端中,最终只有一个客户端能创建成功,那么就认为该客户端获取了锁。同时,所有没获取到锁的客户端需在/xxx/xxx 上注册子节点变更监听,以便实时监听节点变化。如节点发生变化,则未获取到锁的客户端再重新获取锁。

private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
private static final String lockParentPath = "/zk-book/exclusice_lock";

public static void main(String[] args) throws InterruptedException {
    try {
        zkClient.createEphemeral(lockParentPath + "/lock");
        System.out.println("service3 获取锁成功");
    } catch (Exception e) {
        System.out.println("service3获取锁失败");
        zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                System.out.println("service3再次获取锁");
                main(null);
            }
        });
    }
    Thread.sleep(Integer.MAX_VALUE);
}

2)释放锁

"/xxx/xxx" 是临时节点时,以下俩种情况都会释放锁。

  1. 当已获取锁的客户机宕机,导致连接超时断开,那么 ZooKeeper 会将临时节点删除。
  2. 正常执行完逻辑后,客户端主动将临时节点删除。

中间件 - ZooKeeper应用场景实践

2、共享锁

共享锁又称为读锁。如果事务 T1 对数据对象 O1 加了共享锁,那么 T1 只能对 O1 进行读取操作,其它事务只能对 O1 加共享锁,直到 O1 上所有共享锁都被释放。

1)获取锁

所有客户端都创建临时顺序节点 zkClient.createEphemeralSequential("/xxx/xxx", null);,ZooKeeper 会生成类似下面的节点,已保证节点的唯一性。

中间件 - ZooKeeper应用场景实践

2)判断读写顺序

  1. 创建完临时顺序节点后,获取 "/xxx" 下的所有子节点,并对该节点注册子节点变更监听。
  2. 确定创建完的临时顺序节点在所有节点中的顺序。
  3. 对于读节点:
    没有比自己序号小的节点,或比自己序号小的节点都是读节点,则成功获取到共享锁。
    如果比自己序号小的节点中存在写节点,则需进入等待。
    对于写节点:
    如果自己不是序号最小的节点,则需进入等待。
  4. 接受到子节点变更通知后,重复步骤1

中间件 - ZooKeeper应用场景实践

以下为实现代码:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享锁
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLock {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static volatile boolean isExecuted = false;
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null);
        String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1);
        
        List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        if (currentChilds.size() > 0)
            isExecuted = getLockAndExecute(currentChilds, node);
        
        zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                if (currentChilds.size() > 0) {
                    currentChilds = sortNodes(currentChilds);
                    isExecuted = getLockAndExecute(currentChilds, node);
                }
            }
        });
        
        while (!isExecuted) {
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序节点
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List<String> sortNodes(List<String> nodes) {
        Collections.sort(nodes, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号
            }
        });
        return nodes;
    }
    
    /**
     * 获取节点位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @return
     */
    private static Integer getNodePosition(List<String> nodes, String node) {
        for (int i = 0, size = nodes.size(); i < size; i++) {
            if (nodes.get(i).equals(node))
                return i;
        }
        return null; // 无此数据
    }
    
    /**
     * 是否得到锁
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @param nodePosition
     * @return
     */
    private static boolean isGetLock(List<String> nodes, String node, int nodePosition) {
        if (nodePosition == 0) // 没有比此序号更小的节点 
            return true;
        if (node.indexOf("r-") > -1) { // 读节点
            for (int i = 0; i < nodePosition; i++) { // 遍历小于次序号的节点
                String nodeTemp = nodes.get(i);
                if (nodeTemp.indexOf("w-") > -1)  // 存在写节点,则进入等待锁
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 获取锁并执行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLockAndExecute(List<String> currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null) // 子节点为空
            return false;
        System.out.println("子节点:" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition);
        boolean isGetLock = isGetLock(currentChilds, node, nodePosition);
        if (isGetLock) {
            System.out.println(node + " 成功获取到锁,开始执行耗时任务");
            doSomething();
            boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node);
            if (isSuccess)
                System.out.println(node + " 成功执行完任务并删除节点");
        } else {
            System.out.println(node + " 未获取到锁");
        }
        return isGetLock;
    }
    
    private static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

测试以上代码会发现,当获取锁的节点过多时,某一节点变更会通知所有节点,会对 ZooKeeper 服务器造成巨大的性能影响和网络冲击,服务器会发送给客户端大量的事件通知。比如有以下节点,当 w-24 节点变更时,会通知给其余节点。

中间件 - ZooKeeper应用场景实践

因为当获取共享锁时,要判断比自己序号小的节点,所以应该只给 r-25 节点发送通知。针对此情况,改进后判断读写顺序为:

  1. 创建完临时顺序节点后,获取 "/xxx" 下的所有子节点。
  2. 客户端调用 getChildren() 来获取子节点列表,注意,这里不注册任何监听。
  3. 如果未获取到共享锁,那么找到比自己序号小的节点来注册监听,分为以下俩种情况:
    读节点:比自己序号小的最后一个写节点注册监听
    写节点:比自己序号小的最后一个节点注册监听
  4. 等待监听通知,重复步骤2

中间件 - ZooKeeper应用场景实践

改进后的共享锁代码实现:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分布式共享锁最优
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLockOptimal {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null);
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1);
        
        boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node);
        System.out.println("当前所有节点:" + currentChilds.toString() + ", 该" + (isReadNode ? "读" : "写") + "节点:" + node);
        
        if (isGetLock) {
            execute(node);
            System.out.println("退出程序");
            System.exit(1);
        } else {
            String monitorNode = getMonitorNode(currentChilds, node);
            System.out.println(node + " 未获取到锁,注册监听节点:" + monitorNode);
            if (null != monitorNode) {
                zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() {
                    @Override
                    public void handleChildChange(String parentPath, List<String> currentChilds)
                            throws Exception {
                        main(null); // 递归调用
                    }
                });
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序节点
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List<String> sortNodes(List<String> nodes) {
        Collections.sort(nodes, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比较序列号
            }
        });
        return nodes;
    }
    
    /**
     * 获取节点位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static Integer getNodePosition(List<String> currentChilds, String node) {
        for (int i = 0, size = currentChilds.size(); i < size; i++) {
            if (currentChilds.get(i).equals(node))
                return i;
        }
        return null;
    }
    
    /**
     * 获取监听节点
     * @author alexnevsky
     * @date 2018年5月25日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static String getMonitorNode(List<String> currentChilds, String node) {
        String monitorNode = null;
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (0 < nodePosition) { // 非首节点
            if (node.indexOf("r-") > -1) { // 读节点
                // 获取比当前序号小的最后一个写节点
                for (int i = nodePosition - 1; i >= 0; i--) {
                    String tempNode = currentChilds.get(i);
                    if (tempNode.indexOf("w-") > -1) 
                        return tempNode;
                }
            } else {
                // 获取比当前序号小的最后一个节点
                return currentChilds.get(nodePosition - 1);
            }
        }
        return monitorNode;
    }
    
    /**
     * 获取锁
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLock(List<String> currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null)
            return false;
        if (nodePosition == 0) // 无序号更小的节点 
            return true;
        if (node.indexOf("r-") > -1) { // 读节点
            for (int i = 0; i < nodePosition; i++) { // 遍历前面序号的节点
                String tempNode = currentChilds.get(i);
                if (tempNode.indexOf("w-") > -1)  // 存在写节点,返回失败
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 执行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param node
     * @return
     */
    private static void execute(String node) {
        System.out.println(node + " 成功获取到锁,开始执行耗时任务");
        doSomething();
        boolean isDeletedLock = zkClient.delete(nodeFullPath);
        System.out.println(node + " 成功执行完任务,删除节点" + (isDeletedLock ? "成功" : "失败"));
    }
    
    /**
     * 模拟耗时任务
     * @author alexnevsky
     * @date 2018年5月25日
     */
    public static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Easter79 Easter79
3年前
sql注入
反引号是个比较特别的字符,下面记录下怎么利用0x00SQL注入反引号可利用在分隔符及注释作用,不过使用范围只于表名、数据库名、字段名、起别名这些场景,下面具体说下1)表名payload:select\from\users\whereuser\_id1limit0,1;!(https://o
Stella981 Stella981
3年前
Hadoop 2.6.0 HA高可用集群配置详解(二)
Zookeeper集群安装Zookeeper是一个开源分布式协调服务,其独特的LeaderFollower集群结构,很好的解决了分布式单点问题。目前主要用于诸如:统一命名服务、配置管理、锁服务、集群管理等场景。大数据应用中主要使用Zookeeper的集群管理功能。本集群使用zookeeper3.4.5cdh5.7.1版本。首先在Hado
Stella981 Stella981
3年前
Kafka安装教程(详细过程)
安装前期准备:1,准备三个节点(根据自己需求决定)2,三个节点上安装好zookeeper(也可以使用kafka自带的zookeeper)3,关闭防火墙chkconfig iptablesoff一、下载安装包Kafka官网下载安装包http://kafka.apache.org/downloads.html我们下载第二种(已
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Stella981 Stella981
3年前
Bitmap 位图 Java实现
一、结构思想以bit作为存储单位进行0、1存取的数据结构。可用作布尔值存取,比如给定第i位,该bit为1则表示true,为0则表示false。二、使用场景及优点适用于对布尔或0、1值进行(大量)存取的场景。如:记录一个用户365天的签到记录,签了为true,没签为false。若是以普通key/valu
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
京东云开发者 京东云开发者
9个月前
RaftKeeper v2.1.0版本发布,性能大幅提升!
RaftKeeper是一款高新能分布式共识服务,完全兼容Zookeeper但性能更出色,更多关于RaftKeeer参考,我们将RaftKeeper大规模应用到ClickHouse场景中,用于解决ZooKeeper的性能瓶颈问题,同时RaftKeeper也可
脑机接口
脑机接口
Lv1
看不清的挽留,正如你执着地向前走
文章
4
粉丝
0
获赞
0