zookeeper系列(六)实战分布式队列

网络算
• 阅读 11448

zookeeper系列(一)zookeeper必知
zookeeper系列(二)实战master选举
zookeeper系列(三)实战数据发布订阅
zookeeper系列(四)实战负载均衡
zookeeper系列(五)实战分布式锁
zookeeper系列(六)实战分布式队列
zookeeper系列(七)实战分布式命名服务
zookeeper系列(八)zookeeper运维

分布式队列

在传统的单进程编程中,我们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。

分布式环境下,我们同样需要一个类似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是我们的分布式队列。

zookeeper可以通过顺序节点实现分布式队列。

架构图

zookeeper系列(六)实战分布式队列

图中左侧代表zookeeper集群,右侧代表消费者和生产者。
生产者通过在queue节点下创建顺序节点来存放数据,消费者通过读取顺序节点来消费数据。

流程图

offer核心算法流程

zookeeper系列(六)实战分布式队列

poll核心算法流程

zookeeper系列(六)实战分布式队列

代码实现

/**
 * 简单分布式队列
 */
public class DistributedSimpleQueue<T> {

    protected final ZkClient zkClient;
    // queue节点
    protected final String root;
    // 顺序节点前缀
    protected static final String Node_NAME = "n_";


    public DistributedSimpleQueue(ZkClient zkClient, String root) {
        this.zkClient = zkClient;
        this.root = root;
    }

    // 判断队列大小
    public int size() {
        return zkClient.getChildren(root).size();
    }

    // 判断队列是否为空
    public boolean isEmpty() {
        return zkClient.getChildren(root).size() == 0;
    }

    // 向队列提供数据
    public boolean offer(T element) throws Exception{

        // 创建顺序节点
        String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
        try {
            zkClient.createPersistentSequential(nodeFullPath , element);
        }catch (ZkNoNodeException e) {
            zkClient.createPersistent(root);
            offer(element);
        } catch (Exception e) {
            throw ExceptionUtil.convertToRuntimeException(e);
        }
        return true;
    }


    // 从队列取数据
    public T poll() throws Exception {
        
        try {

            // 获取所有顺序节点
            List<String> list = zkClient.getChildren(root);
            if (list.size() == 0) {
                return null;
            }

            // 排序
            Collections.sort(list, new Comparator<String>() {
                public int compare(String lhs, String rhs) {
                    return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
                }
            });

            // 循环每个顺序节点名
            for ( String nodeName : list ){

                // 构造出顺序节点的完整路径
                String nodeFullPath = root.concat("/").concat(nodeName);    
                try {
                    // 读取顺序节点的内容
                    T node = (T) zkClient.readData(nodeFullPath);
                    // 删除顺序节点
                    zkClient.delete(nodeFullPath);
                    return node;
                } catch (ZkNoNodeException e) {
                    // ignore 由其他客户端把这个顺序节点消费掉了
                }
            }
            
            return null;
            
        } catch (Exception e) {
            throw ExceptionUtil.convertToRuntimeException(e);
        }

    }

    private String getNodeNumber(String str, String nodeName) {
        int index = str.lastIndexOf(nodeName);
        if (index >= 0) {
            index += Node_NAME.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;

    }

}
public class User implements Serializable {
    
    String name;
    String id;
    
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }

}
public class TestDistributedSimpleQueue {

    public static void main(String[] args) {
        
        
        ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
        DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue");
        
        User user1 = new User();
        user1.setId("1");
        user1.setName("xiao wang");
        
        User user2 = new User();
        user2.setId("2");
        user2.setName("xiao wang");        
        
        try {
            queue.offer(user1);
            queue.offer(user2);
            User u1 = (User) queue.poll();
            User u2 = (User) queue.poll();
            
            if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
                System.out.println("Success!");
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }
    
}

上面实现了一个简单分布式队列,在此基础上,我们再扩展一个阻塞分布式队列。代码如下:

/**
 * 阻塞分布式队列
 * 扩展自简单分布式队列,在拿不到队列数据时,进行阻塞直到拿到数据
 */
public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{      
    
    
    public DistributedBlockingQueue(ZkClient zkClient, String root) {
        super(zkClient, root);

    }
    

    @Override
    public T poll() throws Exception {

        while (true){ // 结束在latch上的等待后,再来一次
            
            final CountDownLatch    latch = new CountDownLatch(1);
            final IZkChildListener childListener = new IZkChildListener() {
                public void handleChildChange(String parentPath, List<String> currentChilds)
                        throws Exception {
                    latch.countDown(); // 队列有变化,结束latch上的等待
                }
            };
            zkClient.subscribeChildChanges(root, childListener);
            try{
                T node = super.poll(); // 获取队列数据
                if ( node != null ){
                    return node;
                } else {
                    latch.await(); // 拿不到队列数据,则在latch上await
                }
            } finally {
                zkClient.unsubscribeChildChanges(root, childListener);
            }
            
        }
    }

}
public class TestDistributedBlockingQueue {

    public static void main(String[] args) {
        
        
        ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
        int delayTime = 5;
        
        ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
        final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue");
        
        final User user1 = new User();
        user1.setId("1");
        user1.setName("xiao wang");
        
        final User user2 = new User();
        user2.setId("2");
        user2.setName("xiao wang");        
        
        try {
            
            delayExector.schedule(new Runnable() {
                
                public void run() {
                    try {
                        queue.offer(user1);
                        queue.offer(user2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    
                }
            }, delayTime , TimeUnit.SECONDS);
            
            System.out.println("ready poll!");
            User u1 = (User) queue.poll();
            User u2 = (User) queue.poll();
            
            if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
                System.out.println("Success!");
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally{
            delayExector.shutdown();
            try {
                delayExector.awaitTermination(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            }
            
        }
        
    }
    
}

zookeeper系列(一)zookeeper必知
zookeeper系列(二)实战master选举
zookeeper系列(三)实战数据发布订阅
zookeeper系列(四)实战负载均衡
zookeeper系列(五)实战分布式锁
zookeeper系列(六)实战分布式队列
zookeeper系列(七)实战分布式命名服务
zookeeper系列(八)zookeeper运维

点赞
收藏
评论区
推荐文章
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
胖大海 胖大海
3年前
Linux centos7 安装zookeeper
一:下载zookeeper安装包从官网下载:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper3.5.10/apachezookeeper3.5.10.tar.gz二:上传并配置zookeeper1.放在/usr/local目录下并解压。tarzxvfapachezookeeper3.5.1
隔壁老王 隔壁老王
4年前
python调用zookeeper
ZooKeeper1.简介ZooKeeper是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper通过其简单的架构和API解决了这个问题。ZooKeeper允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。ZooKeeper框架最初是在“Yahoo"上构建的,用于以简
Stella981 Stella981
4年前
CentOS 6.5 部署Kafka集群
在部署前需确保网络正常及已安装JDK和Zookeeper环境JDK安装教程(https://my.oschina.net/linch/blog/1817152)部署Zookeeper(https://my.oschina.net/linch/blog/1816836)zookeeper最好也把IP给绑定上去ClientPortAdd
Stella981 Stella981
4年前
Hadoop 2.6.0 HA高可用集群配置详解(二)
Zookeeper集群安装Zookeeper是一个开源分布式协调服务,其独特的LeaderFollower集群结构,很好的解决了分布式单点问题。目前主要用于诸如:统一命名服务、配置管理、锁服务、集群管理等场景。大数据应用中主要使用Zookeeper的集群管理功能。本集群使用zookeeper3.4.5cdh5.7.1版本。首先在Hado
Stella981 Stella981
4年前
Centos7
1\.下载zookeeper压缩包    \root@localhost(https://my.oschina.net/u/570656)tools\wgethttp://mirrors.shuosc.org/apache/zookeeper/zookeeper3.4.11/zookeeper3.4.11.tar.gz2\.
Stella981 Stella981
4年前
Linux安装zookeeper
安装zookeeper1、解压缩zookeeper3.4.6.tar.gz:    tarzxvfzookeeper3.4.6.tar.gz2、创建/usr/local/zookeeper文件夹:mkdirp/usr/local/zookeeper 3、进入到/usr/local/zookeeper目录
Stella981 Stella981
4年前
Dubbo+Zookeeper+SpringMVC整合实现分布式
目录DubboZookeeperSpringMVC整合实现分布式服务治理框架...1一、分布式服务治理架构原理分析...3二、先决条件...5三、Zookeeper安装与配置...63.1 Zookeeper下载与解压...63.2复制和编辑配置文件...63.3创建myid文件...73.4Zoo
Stella981 Stella981
4年前
Linux下 zookeeper集群安装
准备环境:3台linux虚拟主机,zookeeper安装包,zookeeper版本号是3.4.6,本文zookeeper安装在/usr/local目录下一、首先下载zookeepercd/usr/localwget http://apache.org/dist/zookeeper/zookeeper3.4.6/zookeeper3.4.6
可莉 可莉
4年前
2020年最新ZooKeeper面试题(附答案)
2020年最新ZooKeeper面试题1\.ZooKeeper是什么?ZooKeeper是一个开源的分布式协调服务。它是一个为分布式应用提供一致性服务的软件,分布式应用程序可以基于Zookeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布
Stella981 Stella981
4年前
2020年最新ZooKeeper面试题(附答案)
2020年最新ZooKeeper面试题1\.ZooKeeper是什么?ZooKeeper是一个开源的分布式协调服务。它是一个为分布式应用提供一致性服务的软件,分布式应用程序可以基于Zookeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布