python调用zookeeper

隔壁老王 等级 480 0 0

ZooKeeper

1. 简介

ZooKeeper是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。

ZooKeeper通过其简单的架构和API解决了这个问题。ZooKeeper允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。

ZooKeeper框架最初是在“Yahoo!"上构建的,用于以简单而稳健的方式访问他们的应用程序。 后来,Apache ZooKeeper成为Hadoop,HBase和其他分布式框架使用的有组织服务的标准。 例如,Apache HBase使用ZooKeeper跟踪分布式数据的状态。

python调用zookeeper

2. 概念知识

层次命名空间

下图描述了用于内存表示的ZooKeeper文件系统的树结构(ZooKeeper的数据保存形式)。ZooKeeper节点称为 znode 。每个znode由一个名称标识,并用路径(/)序列分隔。

每个znode最多可存储1MB的数据。

python调用zookeeper

Znode的类型

Znode被分为持久(persistent)节点,顺序(sequential)节点和临时(ephemeral)节点。

  • 持久节点 - 即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
  • 临时节点 - 客户端活跃时,临时节点就是有效的。当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。因此,只有临时节点不允许有子节点。如果临时节点被删除,则下一个合适的节点将填充其位置。临时节点在leader选举中起着重要作用。
  • 顺序节点 - 顺序节点可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径。例如,如果将具有路径 /myapp 的znode创建为顺序节点,则ZooKeeper会将路径更改为 /myapp0000000001 ,并将下一个序列号设置为0000000002。如果两个顺序节点是同时创建的,那么ZooKeeper不会对每个znode使用相同的数字。顺序节点在锁定和同步中起重要作用。

Watches(监视)

监视是一种简单的机制,使客户端收到关于ZooKeeper集合中的更改的通知。客户端可以在读取特定znode时设置Watches。Watches会向注册的客户端发送任何znode(客户端注册表)更改的通知。

Znode更改是与znode相关的数据的修改或znode的子项中的更改。只触发一次watches。如果客户端想要再次通知,则必须通过另一个读取操作来完成。当连接会话过期时,客户端将与服务器断开连接,相关的watches也将被删除。

ZooKeeper安装

在安装ZooKeeper之前,请确保你的系统是在以下任一操作系统上运行:

任意Linux OS - 支持开发和部署。适合演示应用程序。

Windows OS - 仅支持开发。

Mac OS - 仅支持开发。

ZooKeeper服务器是用Java创建的,它在JVM上运行。你需要使用JDK 6或更高版本。

现在,按照以下步骤在你的机器上安装ZooKeeper框架。

步骤1:验证Java安装

相信你已经在系统上安装了Java环境。现在只需使用以下命令验证它。

$ java -version

如果你在机器上安装了Java,那么可以看到已安装的Java的版本。否则,请按照以下简单步骤安装最新版本的Java。

步骤1.1:下载JDK

通过访问链接下载最新版本的JDK,并下载最新版本的Java

步骤1.2:提取文件

通常,文件会下载到download文件夹中。验证并使用以下命令提取tar设置。

$ cd /path/to/download/
$ tar -zxvf jdk-8u181-linux-x64.gz  

步骤1.3:移动到/usr/local/jdk目录

要使Java对所有用户可用,请将提取的Java内容移动到“/usr/local/jdk"文件夹。

$ sudo mkdir /usr/local/jdk
$ sudo mv jdk1.8.0_181 /usr/local/jdk

步骤1.4:设置路径

要设置路径和JAVA_HOME变量,请将以下命令添加到〜/.bashrc文件中。

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_181
export PATH=$PATH:$JAVA_HOME/bin

现在,将所有更改应用到当前运行的系统中。

$ source ~/.bashrc

步骤1.5

使用步骤1中说明的验证命令(java -version)验证Java安装。

步骤2:ZooKeeper框架安装

步骤2.1:下载ZooKeeper

要在你的计算机上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper。

http://zookeeper.apache.org/releases.html

到目前为止,最新版本的ZooKeeper是3.4.12(ZooKeeper-3.4.12.tar.gz)。

步骤2.2:提取tar文件

使用以下命令提取tar文件

$ cd /path/to/download/
$ tar -zxvf zookeeper-3.4.12.tar.gz
$ cd zookeeper-3.4.12
$ mkdir data

步骤2.3:创建配置文件

使用命令 vi conf/zoo.cfg 和所有以下参数设置为起点,打开名为 conf/zoo.cfg 的配置文件。

$ vi conf/zoo.cfg

tickTime = 2000
dataDir = /path/to/zookeeper/data
clientPort = 2181

一旦成功保存配置文件,再次返回终端。你现在可以启动zookeeper服务器。

步骤2.4:启动ZooKeeper服务器

执行以下命令

$ bin/zkServer.sh start

执行此命令后,你将收到以下响应

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.12/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

步骤2.5:启动CLI

键入以下命令

$ bin/zkCli.sh

键入上述命令后,将连接到ZooKeeper服务器,你应该得到以下响应。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

停止ZooKeeper服务器

连接服务器并执行所有操作后,可以使用以下命令停止zookeeper服务器。

$ bin/zkServer.sh stop

Kazoo

kazoo是Python连接操作ZooKeeper的客户端库。我们可以通过kazoo来使用ZooKeeper。

1. 安装

pip install kazoo

2. 使用

连接ZooKeeper

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')

# 启动连接
zk.start() 

# 停止连接
zk.stop()

创建节点

# 创建节点路径,但不能设置节点数据值
zk.ensure_path("/my/favorite")

# 创建节点,并设置节点保存数据,ephemeral表示是否是临时节点,sequence表示是否是顺序节点
zk.create("/my/favorite/node", b"a value", ephemeral=True, sequence=True)

读取节点

# 获取子节点列表
children = zk.get_children("/my/favorite")

# 获取节点数据data 和节点状态stat
data, stat = zk.get("/my/favorite")

设置监视

def my_func(event):
    # 检查最新的节点数据

# 当子节点发生变化的时候,调用my_func
children = zk.get_children("/my/favorite/node", watch=my_func)

server端

import threading
from kazoo.client import KazooClient

class ThreadServer(object):
    def __init__(self, host, port, handlers):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.sock.bind((host, port))
        self.handlers = handlers

    def serve(self):
        """
        开始服务
        """
        self.sock.listen(128)
        self.register_zk()
        print("开始监听")
        while True:
            conn, addr = self.sock.accept()
            print("建立链接%s" % str(addr))
            t = threading.Thread(target=self.handle, args=(conn,))
            t.start()

    def handle(self, client):
        stub = ServerStub(client, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            print("客户端关闭连接")

        client.close()

    def register_zk(self):
        """
        注册到zookeeper
        """
        self.zk = KazooClient(hosts='127.0.0.1:2181')
        self.zk.start()
        self.zk.ensure_path('/rpc')  # 创建根节点
        value = json.dumps({'host': self.host, 'port': self.port)
        # 创建服务子节点
        self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)

client端

from services import ThreadServer
from services import InvalidOperation
import sys


class Handlers:
    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1:
        :param num2:
        :return:
        """
        if num2 == 0:
            raise InvalidOperation()
        val = num1 / num2
        return val


if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("usage:python server.py [host] [port]")
        exit(1)
    host = sys.argv[1]
    port = sys.argv[2]
    server = ThreadServer(host, int(port), Handlers)
    server.serve()

server改写

import random
import time

class DistributedChannel(object):
    def __init__(self):
        self._zk = KazooClient(hosts='127.0.0.1:2181')
        self._zk.start()
        self._get_servers()

    def _get_servers(self, event=None):
        """
        从zookeeper获取服务器地址信息列表
        """
        servers = self._zk.get_children('/rpc', watch=self._get_servers)
        print(servers)
        self._servers = []
        for server in servers:
            data = self._zk.get('/rpc/' + server)[0]
            addr = json.loads(data)
            self._servers.append(addr)

    def _get_server(self):
        """
        随机选出一个可用的服务器
        """
        return random.choice(self._servers)

    def get_connection(self):
        """
        提供一个可用的tcp连接
        """
        while True:
            server = self._get_server()
            print(server)
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((server['host'], server['port']))
            except ConnectionRefusedError:
                time.sleep(1)
                continue
            else:
                break
        return sock

client改写

from services import ClientStub
from services import DistributedChannel
from services import InvalidOperation
import time


channel = DistributedChannel()

for i in range(50):
    try:
        stub = ClientStub(channel)
        val = stub.divide(i)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)
    time.sleep(1)

测试完整代码

import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="192.168.218.136:2181")

# 启动连接
zk.start()
# 创建节点路径,但不能设置节点数据值
zk.ensure_path("/rpc")
addr1 = {"host": "127.0.0.1", "port": 8001}
addr1_str = json.dumps(addr1)
# 创建节点,并设置节点保存数据,ephemeral表示是否是临时节点,sequence表示是否是顺序节点
zk.create("/rpc/server", addr1_str.encode(), ephemeral=True, sequence=True)

addr2 = {"host": "127.0.0.1", "port": 8002}
addr2_str = json.dumps(addr2)
zk.create("/rpc/server", addr2_str.encode(), ephemeral=True, sequence=True)


# 获取子节点列表
children = zk.get_children("/rpc")

# 获取节点数据data 和节点状态stat
for i in zk.get_children("/rpc"):
    print(zk.get("/rpc/"+i)[0])


def on_change(event):
    print(event)


# 设置监视, 该监视只触发一次
servers = zk.get_children("/rpc", watch=on_change)

# 停止连接
zk.stop()

service.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Wjy
import struct
from io import BytesIO
import socket
import threading
import json
import random
import time
from kazoo.client import KazooClient


class InvalidOperation(Exception):

    def __init__(self, message=None):
        self.message = message or "invalid operation"


class MethodProtocol(object):
    """
    解读方法名
    """
    def __init__(self, connection):
        self.conn = connection

    def _read_all(self, size):
        """
        帮助我们读取二进制数据
        :param size: 想要读取的二进制数据大小
        :return: 二进制数据 bytes
        """
        # self.conn
        # 读取二进制数据
        # socket.recv(4) => ?4
        # BytesIO.read
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # socket
            have = 0
            buff = b""
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have += l

                if l == 0:
                    # 表示客户端socket关闭了
                    raise EOFError()
            return buff

    def get_method_name(self):
        """
        提供方法名
        :return: str 方法名
        """
        # 读取字符串长度
        buff = self._read_all(4)
        length = struct.unpack("!I", buff)[0]

        # 读取字符串
        buff = self._read_all(length)
        name = buff.decode()
        return name


class DivideProtocol(object):
    """
    divide过程消息协议转换工具
    """

    def args_encode(self, num1, num2=1):
        """
        将原始的调用请求参数转换打包成二进制消息数据
        :param num1: int
        :param num2: int
        :return: bytes 二进制消息shuju
        """
        name = "divide"

        # 处理方法的名字 字符串
        # 处理字符串的长度
        buff = struct.pack("!I", 6)
        # 处理字符
        buff += name.encode()

        # 处理参数1
        # 处理序号
        buff2 = struct.pack("!B", 1)
        # 处理参数值
        buff2 += struct.pack("!i", num1)

        # 处理参数2
        if num2 != 1:
            # 处理序号
            buff2 += struct.pack("!B", 2)
            # 处理参数值
            buff2 += struct.pack("!i", num2)

        # 处理消息长度,边界固定
        length = len(buff2)
        buff += struct.pack("!I", length)

        buff += buff2

        return buff

    def _read_all(self, size):
        """
        帮助我们读取二进制数据
        :param size: 想要读取的二进制数据大小
        :return: 二进制数据 bytes
        """
        # self.conn
        # 读取二进制数据
        # socket.recv(4) => ?4
        # BytesIO.read
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # socket
            have = 0
            buff = b""
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have += l

                if l == 0:
                    # 表示客户端socket关闭了
                    raise EOFError()
            return buff

    def args_decode(self, connection):
        """
        接受调用请求消息数据并进行解析
        :param connection: 连接对象 socket BytesIO
        :return: dict 包含了解析之后的参数
        """
        param_len_map = {
            1: 4,
            2: 4,
        }
        param_fmt_map = {
            1: "!i",
            2: "!i",
        }
        param_name_map = {
            1: "num1",
            2: "num2"
        }

        # 保存用来返回的参数
        # args = {"num1": xxx, "num2": xxx}
        args = {

        }

        self.conn = connection
        # 处理方法的名已经提前被处理(稍后实现)

        # 处理消息边界
        # 读取二进制数据
        # socket.recv(4) => ?4
        # BytesIO.read
        buff = self._read_all(4)
        # 将二进制数据转换为python数据类型
        length = struct.unpack("!I", buff)[0]

        # 已经读取处理的字节数
        have = 0

        # 处理第一个参数
        # 处理参数序号
        buff = self._read_all(1)
        have += 1
        param_seg = struct.unpack("!B", buff)[0]

        # 处理参数值
        param_len = param_len_map[param_seg]
        buff = self._read_all(param_len)
        have += param_len
        param_fmt = param_fmt_map[param_seg]
        param = struct.unpack(param_fmt, buff)[0]

        param_name = param_name_map[param_seg]
        args[param_name] = param

        if have >= length:
            return args

        # 处理第二个参数
        # 处理参数序号
        buff = self._read_all(1)
        param_seg = struct.unpack("!B", buff)[0]

        # 处理参数值
        param_len = param_len_map[param_seg]
        buff = self._read_all(param_len)
        param_fmt = param_fmt_map[param_seg]
        param = struct.unpack(param_fmt, buff)[0]

        param_name = param_name_map[param_seg]
        args[param_name] = param

        return args

    def result_encode(self, result):
        """
        将原始结果数据转换为消息协议二进制数据
        :param result: 原始结果数据 float InvalidOperation
        :return: bytes 消息协议二进制数据
        """
        # 正常
        if isinstance(result, float):
            # 处理返回值类型
            buff = struct.pack("!B", 1)
            buff += struct.pack("!f", result)
            return buff
        # 异常
        else:
            # 处理返回值类型
            buff = struct.pack("!B", 2)
            # 处理返回值
            length = len(result.message)
            # 处理字符串长度
            buff += struct.pack("!I", length)
            # 处理字符
            buff += result.message.encode()
            return buff

    def result_decode(self, connection):
        """
        将返回值消息数据转换为原始返回值
        :param connection: socket BytesIO
        :return: float InvalidOperation对象
        """
        self.conn = connection

        # 处理返回值类型
        buff = self._read_all(1)
        result_type = struct.unpack("!B", buff)[0]

        if result_type == 1:
            # 正常
            # 读取float数量
            buff = self._read_all(4)
            val = struct.unpack("!f", buff)[0]
            return val
        else:
            # 异常
            # 读取字符串的长度
            buff = self._read_all(4)
            length = struct.unpack("!I", buff)[0]

            # 读取字符串
            buff = self._read_all(length)
            message = buff.decode()

            return InvalidOperation(message)


class Channel(object):
    """
    用户客户端建立网络连接
    """
    def __init__(self, host, port):
        """

        :param host: 服务器地址
        :param port: 服务器端口号
        """
        self.host = host
        self.port = port

    def get_connection(self):
        """
        获取连接对象
        :return: 与服务器通讯的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock


class DistributedChannel(object):
    """
    支持分布式的zookeeper的RPC客户端通讯连接工具
    """
    def __init__(self):
        # 创建kazoo对象,用来跟zookeeper连接,获取信息
        self.zk = KazooClient("192.168.218.136:2181")
        self.zk.start()
        self._servers = []
        self._get_servers()

    def _get_servers(self, event=None):
        """
        从zookeeper中获取所有可用的RPC服务器地址信息
        :return:
        """
        self._servers = []
        # 从zookeeper中获取/rpc节点下所有可用的rpc服务器节点
        servers = self.zk.get_children("/rpc", watch=self._get_servers)
        # 遍历节点,获取服务器的地址信息
        for server in servers:
            addr_data = self.zk.get("/rpc/" + server)[0]
            addr = json.loads(addr_data)
            self._servers.append(addr)

    def _get_server(self):
        """
        从可用的服务器列表中选出一台服务器
        :return:
        """
        return random.choice(self._servers)

    def get_connection(self):
        """
        提供一个具体的与RPC服务器的连接socket
        :return:
        """
        while True:
            addr = self._get_server()
            print(addr)
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((addr["host"], addr["port"]))
            except ConnectionRefusedError:
                time.sleep(1)
                continue
            else:
                return sock



class Server(object):
    """
    RPC服务器
    """
    def __init__(self, host, port, handlers):
        self.host = host
        self.port = port
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 设置socket
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 绑定地址
        sock.bind((self.host, self.port))
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)
        print("服务器开始监听")

        # 接收客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print("与客户端%s建立了连接" % str(client_addr))

            # 交给ServerStub,完成客户端的具体的RPC调用请求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                        stub.process()
            except EOFError:
                # 表示客户端关闭了连接
                print("客户端关闭了连接")
                client_sock.close()


class ThreadServer(object):
    """
    多线程RPC服务器
    """
    def __init__(self, host, port, handlers):
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 设置socket
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 绑定地址
        sock.bind((host, port))
        self.host = host
        self.port = port
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)
        print("服务器开始监听")

        # 注册到zookeeper
        self.regoster_zookeeper()

        # 接收客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print("与客户端%s建立了连接" % str(client_addr))

            # 创建子线程处理这个客户端
            t = threading.Thread(target=self.handle, args=(client_sock,))
            # 开启子线程执行
            t.start()

    def regoster_zookeeper(self):
        """
        在zookeeper中心中注册本服务器的地址信息
        :return:
        """
        # 创建kazoo客户端
        zk = KazooClient("192.168.218.136:2181")
        # 建立与zookeeper的连接
        zk.start()
        # 在zookeeper中创建节点保存数据
        zk.ensure_path("/rpc")
        data = json.dumps({"host": self.host, "port": self.port})
        # 在zookeeper上存储服务器地址及端口,设置成临时节点,顺序节点
        zk.create("/rpc/server", data.encode(), ephemeral=True, sequence=True)

    def handle(self, client_sock):
        """
        子线程调用的方法,用来处理一个客户端的请求
        :return:
        """
        # 交给ServerStub,完成客户端的具体的RPC调用请求
        stub = ServerStub(client_sock, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            # 表示客户端关闭了连接
            print("客户端关闭了连接")
            client_sock.close()


class ClientStub(object):
    """
    用来帮助客户端完成远程过程调用 RPC调用

    stub = ClientStub()
    stib.divide(200)
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()

    def divide(self, num1, num2=1):
        # 将调用的参数打包成消息协议的数据
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)

        # 将消息数据通过网络发送给服务器
        self.conn.sendall(args)

        # 接收服务器返回的返回值消息数据,并进行解析
        result = proto.result_decode(self.conn)

        # 将结果值(正常float 或 异常InvalidOperation)返回给客户端
        if isinstance(result, float):
            # 正常
            return result
        else:
            # 异常
            raise result

    def add(self):
        pass


class ServerStub:
    """
    帮助服务端完成远程过程调用
    """
    def __init__(self, connection, handlers):
        """

        :param connection: 与客户端的连接
        :param handlers: 真正本地被调用方法(函数 过程)
        class Handlers:

            @staticmethod
            def divide(num1, num2=1):
                pass

            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = {
            "divide": self._process_divide,
            "add": self._process_add,
        }
        self.handlers = handlers

    def process(self):
        """
        当服务端接收了一个客户端的连接,建立好连接后,完成远端调用处理
        :return:
        """
        # 接收消息数据,并解析方法的名字
        name = self.method_proto.get_method_name()

        # 根据机械获得的方法(过程)名,调用相应的过程协议,接收并解析消息数据
        # self.process_map[name]()
        _process = self.process_map[name]
        _process()

    def _process_divide(self):
        """
        处理除法过程调用
        :return:
        """
        # 创建用于除法过程调用参数协议解析的工具
        proto = DivideProtocol()
        # 解析调用消息参数
        args = proto.args_decode(self.conn)
        # args = {"num1": xxx, "num2": xxx}

        # 进行除法的本地过程调用
        # 将本地调用过程的返回值(包括可能的异常)打包成消息协议数据,通过网络返回给客户端
        try:
            val = self.handlers.divide(**args)
        except InvalidOperation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)

        self.conn.sendall(ret_message)

    def _process_add(self):
        pass


if __name__ == '__main__':
    # 狗贼消息数据
    proto = DivideProtocol()
    # divide(200, 100)
    # message = proto.args_encode(200, 100)
    # divide(200)
    message = proto.args_encode(200)
    conn = BytesIO()
    conn.write(message)
    conn.seek(0)

    # 解析消息数据
    method = MethodProtocol(conn)
    name = method.get_method_name()
    print(name)

    args = proto.args_decode(conn)
    print(args)

client.py

from service import ClientStub
from service import Channel
from service import InvalidOperation
from service import DistributedChannel
import time

# 创建与服务器的连接
# channel = Channel("127.0.0.1", 8000)
channel = DistributedChannel()

# 运行调用
for i in range(50):
    try:
        # 创建用于RPC调用的工具
        stub = ClientStub(channel)

        val = stub.divide(i * 100, 50)
    except InvalidOperation as e:
        print(e.message)
    except Exception as e:
        print(e)
    else:
        print(val)

    time.sleep(1)

使用.py

from service import InvalidOperation
from service import Server, ThreadServer
import sys


class Handlers:

    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1: int
        :param num2: int
        :return:
        """
        if num2 == 0:
            raise InvalidOperation()
        val = num1 / num2
        return val


if __name__ == '__main__':
    # 开启服务器
    # _server = Server("127.0.0.1", 8000, Handlers)
    # _server.serve()

    # 从启动命令中提取服务器运行的ip地址和端口号
    host = sys.argv[1]
    port = sys.argv[2]

    _server = ThreadServer(host, int(port), Handlers)
    _server.serve()
收藏
评论区

相关推荐

Python的环境搭建和下载
Python是一个跨平台、可移植的编程语言,因此可在windows、Linux和Mac OS X系统中安装使用。 安装完成后,你会得到Python解释器环境,可以通过终端输入python命令查看本地是否已经按照python以及python版本。这里有一点需要注意的是,如果没有将python的安装目录添加到环境变量中,会报错(python不是内部命令或外部命
知乎从Python转为Go,是不是代表Go比Python好?
众所周知,知乎早在几年前就将推荐系统从 Python 转为了 Go。于是乎,一部分人就说 Go 比 Python 好,Go 和 Python 两大社区的相关开发人员为此也争论过不少,似乎,谁也没完全说服谁。 知乎从Python转为Go,是不是代表Go比Python好?我认为,各有优点,谁也取代不了谁,会长期共存! “由 Python 语言转向 Go 语言
python调用zookeeper
ZooKeeper 1. 简介ZooKeeper是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper通过其简单的架构和API解决了这个问题。ZooKeeper允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。ZooKeeper框架最初是在“Yahoo"上构建的,用于以简
Python编程基础(快速入门必看
Python编程基础一、Python语言基本语法 Python是一
Java的JDK配置
一、JDK的环境配置 1、在jdk官网下载(https://www.oracle.com/java/technologies/javasejdk14downloads.html)所想要的jdk版本,选择路径安装.(我这边选择的是windowsx64\_bin.exe) 安装好之后,打开电脑控制面板主页,点击选择高级系统设置
聊聊dubbo协议
协议协议通俗易懂地解释就是通信双方需要遵循的约定。我们了解的常见的网络传输协议有tcp、udp、http等。再到我们常用的基础组件,一般来说client端与server端也有相应的协议,如redis、mysql、zookeeper等都是各自约定的私有协议,同样今天标题中的dubbo协议也是一种私有协议,他们都是应用层协议,基于tcp或udp设计。
Zookeeper分布式锁?
客户端A要获取分布式锁的时候首先到locker下创建一个临时顺序节点(node_n),然后立即获取locker下的所有(一级)子节点。此时因为会有多个客户端同一时间争取锁,因此locker下的子节点数量就会大于1。对于顺序节点,特点是节点名称后面自动有一个数字编号,先创建的节点数字编号小于后创建的,因此可以将子节点按照节点名称后缀的数字顺序从小到大排序,这样
python文件的第一行 #!/usr/bin/python3 是什么意思?
python文件的第一行代码通常在脚本语言的第一行会看到: !/usr/bin/env python或 !/usr/bin/python 首先要确定的一点是它不是注释。这两句话的目的都是指出你的python文件用什么可执行程序去运行它。1. !/usr/bin/python 是告诉操作系统执行这个脚本的时候,调用 /usr/bin 下的 python 解释
Python初学者必备书籍《Python入门经典》高清PDF版|百度网盘免费下载|Python初学者,自学Python必读
提取码:1028以及前文提到的学习路线图内容简介Python是一种解释型、面向对象、动态数据类型的高级程序设计语言。Python可以用于很多的领域,从科学计算到游戏开发。《Python入门经典》是面向Python初学者的学习指南,详细介绍了Python编程基础,以及一些高级概念,如面向对象编程。全书分为24章。第1章介绍了Python的背景和安装方法。第2章
zookeeper到nacos的迁移实践
本文已收录 https://github.com/lkxiaolou/lkxiaolou 欢迎star。 技术选型公司的RPC框架是dubbo,配合使用的服务发现组件一直是zookeeper,长久以来也没什么大问题。至于为什么要考虑换掉zookeeper,并不是因为它的性能瓶颈,而是考虑往云原生方向演进。云原生计算基金会(CNCF)对云原生的定义是: 云原生
浅析常用的Python Web的几大框架
在各种语言平台中,python涌现的web框架恐怕是最多的,是一个百花齐放的世界,各种microframework、framework不可胜数;猜想原因应该是在python中构造框架十分简单,使得轮子不断被发明。所 以在Python社区总有关于Python框架孰优孰劣的话题。下面就给大家介绍一下python的几大框架: Django Django 应该是最出
大厂首发!java哨兵模式的作用
引言做了5年开发的我,阿里一直是我心之所向,如今我如愿以偿进入了国内互联网巨头——Alibaba!其实,今年下半年我面试不少互联网企业,像涂鸦智能,百度,京东,腾讯,字节,滴滴,阿里等等都有三井的身影,之后总结出来的针对Java面试的知识点或真题,每个点或题目都是在面试中被问过的,满满干货,诚意分享! 由于整理成了文档,总结的内容比较多,希望大家都能领取一份
JDK13的特性和JDK的历史你知道吗???喂饭式带你学好!!!
1.1 JDK 各版本主要特性回顾 JDK Version 1.019960123 Oak(橡树) 初代版本,伟大的一个里程碑,但是是纯解释运行,使用外挂JIT,性能比较差,运行速度慢。 JDK Version 1.119970219 JDBC(Java DataBase Connectivity); 支持内部类; RMI(Remote Me
一篇文带你了解JDK 13新特性,保姆级教程!!!
JDK 13新特性介绍 1.1 JDK 各版本主要特性回顾 JDK Version 1.019960123 Oak(橡树) 初代版本,伟大的一个里程碑,但是是纯解释运行,使用外挂JIT,性能比较差,运行速度慢。 JDK Version 1.119970219 JDBC(Java DataBase Connectivity); 支持内部类; R
从nacos客户端的TIME_WAIT说起
本文已收录 https://github.com/lkxiaolou/lkxiaolou 欢迎star。 问题起因前段时间调研nacos,用来代替zookeeper当作dubbo的注册中心,使用的是nacos的1.1.4版本。还用了nacosSync,一款nacos提供的迁移工具,可将常见的注册中心上的服务同步到nacos上。这玩意很不好用,至少不是生产级别