使用 Rust 实现的基础的List 和 Watch 机制

天翼云开发者社区
• 阅读 2

本文分享自天翼云开发者社区《使用 Rust 实现的基础的List 和 Watch 机制》。作者:l****n

介绍

在日常的开发过程中,有一个很重要的任务是能够通过Rust语言实现K8s中的各种生态组件,在这个过程中,既需要能过够了解K8S的工作原理也需要能够知道rust的语言特性。因此,在这个过程中有很多值得探讨的知识点。

在这里,第一步,我们将探索如何使用 Rust 实现一个类似于 Kubernetes 的 list 和 watch 机制。我们将通过 WebSocket 实现实时的消息推送,并使用一些关键的 Rust 异步编程模型来处理事件和连接管理。

我们首先默认大家能够了解rust语言的基本特性。下文中,将针对rust的知识点展开进行探讨。

目标

  • 理解 WebSocket 连接的建立和管理。
  • 学习如何通过 WebSocket 推送消息。
  • 掌握消息缓存和处理的实现方式。
  • 了解如何使用 Rust 实现一个高效的事件分发系统。
  • 理解K8S中的数据一致性保障方法
  • 了解本机制的不足,以及后续如何进行改进

    理解问题

    什么是 list 和 watch?

  • ** List**:列出当前所有资源的状态。
  • Watch:实时监控资源的变化,一旦有资源变化,就会立即通知客户端。

    使用场景

  • 自动化运维:实时监控系统资源状态,触发自动化运维操作。
  • 应用监控:实时获取应用状态,及时处理异常,在很多的系统设计场景中,能够减少耦合。
  • ** K8S中的相应设计**:K8S中,对相应资源的通知的基础即为list and watch机制。本人在学习K8S源码的第一步就是学习这一套设计架构。

    分析问题

    \当然,通过简单的代码仅仅通过http进行主动连接也可实现这个功能。但在目前阶段,我们希望能够设计一个高效的、稳定的、可扩展的list and watch体系,因此我们需要考虑以下几个关键问题。

关键问题

  1. 如何建立和管理 我们服务器和客户端的连接?通过什么方式进行?
  2. 如何实现高效的消息推送机制?
  3. 如何处理消息缓存和订阅管理?

    技术选型

  • 语言:Rust

  • Web 框架:warp框架

  • WebSocket实现和框架:tokio-tungstenite、warp

  • 异步编程:tokio、管道机制

    设计代码结构

    针对以上这个需求,结合目前kunos-system的需求我们阐释如下

  • 有以下几个资源,Node、Task(Task是一个shell命令、镜像运行命令的载体)、Job(Task的上层资源,一个Job包含多个Task,类似于K8s中的replicaset)我们需要对这几个资源的状态进行推送。

  • 能够在服务器建立起来一个watch and list服务器,能够推送各种事件

  • 能够

    组件设计

  1. Broker:管理 WebSocket 订阅者和事件分发。
  2. pub struct Broker<R: Resource + Clone + Serialize + Send + Sync  + 'static> {
     // 下游的订阅者列表,用于发送websocket信息
     subscribers: Arc<RwLock<HashMap<Topic, HashMap<Uuid, WsSender>>>>,
     // 事件的缓冲流
     event_sender: UnboundedSender<(Topic, WatchEvent<R>)>,
    }
  3. Watcher:对不同资源类型进行管理和操作。
    pub struct Watcher {
     // 为不同的事件建立不同的broker
     pub node_broker: Arc<Broker<Node>>,
     pub task_broker: Arc<Broker<Task>>,
     pub job_broker: Arc<Broker<Job>>,
     pub exec_broker: Arc<Broker<TaskExecRequest>>,
    }
  4. WebSocket 客户端:与服务器交互,接收实时事件。

    基本原理

websocket路由入口

let node_subscribe = warp::path!("watch" / "node").and(warp::ws()).map(
    move |ws: warp::ws::Ws| {
        let node_broker_clone = Arc::clone(&node_broker_clone);
        ws.on_upgrade(move |socket| async move {
            node_broker_clone.subscribe("node".to_string(), socket).await;
        })
    },
);
  1. warp::path!("watch" / "node")

*这部分代码定义了一个路径过滤器,用于匹配路径 /watch/node 的 HTTP 请求。warp::path!是 Warp 框架提供的一个宏,用于简化路径定义。这里的"watch" / "node"表示请求路径必须是/watch/node` 才能匹配这个过滤器。

  1. .and(warp::ws())

这一部分代码将路径过滤器与 WebSocket 协议过滤器组合起来。warp::ws() 过滤器会匹配 WebSocket 握手请求并提取一个 warp::ws::Ws 类型,表示 WebSocket 配置。这表示我们的这个路径将为一个websocket接口。

  • warp::ws() 过滤器用于匹配并提取 WebSocket 握手请求,确保该请求是 WebSocket 协议请求。
  1. .map(move |ws: warp::ws::Ws| { ... }) .map 方法用于将前面的过滤器组合结果映射到一个新的处理逻辑中。这里的 move |ws: warp::ws::Ws| { ... } 是一个闭包,用于处理 WebSocket 请求。
  • move 关键字确保闭包捕获其环境中的所有变量的所有权,因为这些变量将在异步操作中使用。
  • ws: warp::ws::Ws 参数是从前面的 warp::ws() 过滤器中提取的 WebSocket 配置。
  1. ws.on_upgrade(move |socket| async move { ... }) ws.on_upgrade 方法用于将 WebSocket 协议升级请求处理为 WebSocket 连接。它接受一个闭包作为参数,当 WebSocket 握手成功后,这个闭包会被调用。在官方定义中,这个方法主要用于自定义一个函数对建立后的websocket连接进行一定的操作,因此我们在这里将建立连接后一切操作,比如保持连接,发送信息等。
    /// Finish the upgrade, passing a function to handle the `WebSocket`.
    ///
    /// The passed function must return a `Future`.
    pub fn on_upgrade<F, U>(self, func: F) -> impl Reply
    where
     F: FnOnce(WebSocket) -> U + Send + 'static,
     U: Future<Output = ()> + Send + 'static,
    {
     WsReply {
         ws: self,
         on_upgrade: func,
     }
    }
  • move |socket| async move { ... } 是一个异步闭包,它将在 WebSocket 连接成功升级后执行。 socket 参数表示已经升级的 WebSocket 连接。

    1. node_broker_clone.subscribe("node".to_string(), socket).await; 在异步闭包内部,调用 node_broker_clone 的subscribe` 方法,将新的 WebSocket 连接订阅到节点(node)主题中。后续我们将展开讲解
  • "node".to_string() 将节点主题名称转换为字符串。

  • socket 参数表示当前的 WebSocket 连接。

  • await 关键字等待异步订阅操作完成。

    websocket连接处理

    上面说到,我们通过 ws.on_upgrade(move |socket| async move { ... })这个方法在连接建立之后进行处理,其中可以知道,我们处理的方法如下所示。

    pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
          let (ws_sender, mut ws_receiver) = socket.split();
          let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
          let subscriber_id = Uuid::new_v4();
    ​
          {
              let mut subs = self.subscribers.write().await;
              subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
          }
    ​
          let subscribers = Arc::clone(&self.subscribers);
          tokio::task::spawn(async move {
              while let Some(result) = ws_receiver.next().await {
                  match result {
                      Ok(message) => {
                          // 处理有效的消息
                          if message.is_text() {
                              println!(
                                  "Received message from client: {}",
                                  message.to_str().unwrap()
                              );
                          }
                      }
                      Err(e) => {
                          // 处理错误
                          eprintln!("WebSocket error: {:?}", e);
                          break;
                      }
                  }
              }
              println!("WebSocket connection closed");
              subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
          });
    ​
          tokio::task::spawn(async move {
              let mut sender = ws_sender;
    ​
              while let Some(msg) = rx.recv().await {
                  let _ = sender.send(msg).await;
              }
          });
      }
  • websocket连接处理 let (ws_sender, mut ws_receiver) = socket.split();这里使用原生的代码,将已经建立起来的socket进行分割,因为websocket是双向连接,因此获得针对这个socket的发送端(ws_sender)和接收端(ws_receiver)。

建立连接并保存

let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let subscriber_id = Uuid::new_v4();
​
{
    let mut subs = self.subscribers.write().await;
    subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
}

在这里,我们建立了个一个管道,并将subscriber的信息进行保存,这里的 mpsc::unbounded_channel::();类似于golang中的channel,他会生成一个发送者、一个接收者,当往发送者发送消息的时候,接收者会受到该消息并进行一定处理。因此我们将subscriber的发送者(tx)保存至内存里。

建立消息发送机制

tokio::task::spawn(async move {
            let mut sender = ws_sender;
​
            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
        });

这个就是很简单了,通过如果rx收到了消息,则向websocket的subscriber进行发送。该任务是以新协程任务的方式启动的,在后台持续运行

建立websocket连接保活机制

let subscribers = Arc::clone(&self.subscribers);
    tokio::task::spawn(async move {
        while let Some(result) = ws_receiver.next().await {
            match result {
                Ok(message) => {
                    // 处理有效的消息
                    if message.is_text() {
                        println!(
                            "Received message from client: {}",
                            message.to_str().unwrap()
                        );
                    }
                }
                Err(e) => {
                    // 处理错误
                    eprintln!("WebSocket error: {:?}", e);
                    break;
                }
            }
        }
        println!("WebSocket connection closed");
        subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
    });

这里我们仍然在后台启动一个守护协程,用于保活websocket连接,一旦发生了连接失效,则注销消息发送机制,删除subscribers缓存中的订阅者。

消息推送机制

事件推送 事件推送时候将允许调用相关事件的推送地址,向推送端发送消息

pub async fn produce_node_event(&self, event: WatchEvent<Node>) {
        self.node_broker.produce("node".to_string(), event).await;
    }

    pub async fn produce_task_event(&self, event: WatchEvent<Task>) {
        self.task_broker.produce("task".to_string(), event).await;
    }

    pub async fn produce_job_event(&self, event: WatchEvent<Job>) {
        self.job_broker.produce("job".to_string(), event).await;
    }

当收到消息的时候,不直接处理消息,而是将放入缓存队列中(一个消息无界流)

pub async fn produce(&self, topic: Topic, event: WatchEvent<R>) {
        if let Err(e) = self.event_sender.send((topic.clone(), event.clone())) {
            eprintln!("Failed to send event: {}", e);
        }
    }

事件分发 同样的。将启动一个协程,用于从和event_sender对应的event_receiver中获取消息,推送给订阅者。

  • 获取订阅者的列表并依次发送

  • 如果发现发送失败,则将这个订阅者从缓存中删除

    fn start_event_dispatcher(broker: Arc<Self>, mut event_receiver: UnboundedReceiver<(Topic, WatchEvent<R>)>) {
          tokio::spawn(async move {
              while let Some((topic, event)) = event_receiver.recv().await {
                  let event_json = serde_json::to_string(&event).unwrap();
                  let subscribers_list;
                  {
                      let subscribers = broker.subscribers.read().await;
                      subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
                  }
    
                  let mut invalid_subscribers = vec![];
                  for (id, ws_sender) in subscribers_list {
                      if ws_sender.send(warp::ws::Message::text(event_json.clone())).is_err() {
                          invalid_subscribers.push(id);
                      }
                  }
    
                  if !invalid_subscribers.is_empty() {
                      let mut subscribers = broker.subscribers.write().await;
                      if let Some(subscribers) = subscribers.get_mut(&topic) {
                          for id in invalid_subscribers {
                              subscribers.remove(&id);
                          }
                      }
                  }
              }
          });
      }
  • 客户端* 客户端的代码就是建立起来一个订阅者关注相关事件的动态。在相应的代码中,可以使用该方法。本方法最终返回的是一个无界流 Stream<Item = WatchEvent>,用于得到服务器推送过来的事件类型

    pub async fn list_and_watch<R>(api_client: &ApiClient, resource_name: &str) -> impl Stream<Item = WatchEvent<R>>
    where
       R: Resource + Clone + DeserializeOwned + 'static + Send,
    {
    
       // 先通过 HTTP 获取资源列表
       let initial_resources = get_resource_list::<R>(api_client).await;
    
       // 解析要连接WebSocket服务器的URL
       let url = Url::parse(&*format!("{}/{}", api_client.watch_url, resource_name)).expect("Invalid URL");
       // 连接到WebSocket服务器
       println!("watch url is {}", url);
       let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    
       println!("Watch client connected");
    
       let (mut write, read) = ws_stream.split();
       let (tx, rx) = mpsc::unbounded_channel();
    
       // 先发送初始资源列表
       match initial_resources {
           Ok(res) => tx.send(WatchEvent::Restarted(res)).unwrap(),
           Err(e) => eprintln!("list resource failed, {}", e),
       };
    
       // 将 WebSocket 读流转换为消息事件流
       tokio::spawn(async move {
           read.for_each(|message| async {
               match message {
                   Ok(msg) => {
                       if msg.is_text() {
                           let text = msg.to_text().unwrap();
                           match serde_json::from_str::<WatchEvent<R>>(text) {
                               Ok(event) => {
                                   tx.send(event).unwrap();
                               }
                               Err(e) => {
                                   eprintln!("Failed to parse message: {:?}", e);
                               }
                           }
                       }
                   }
                   Err(e) => {
                       eprintln!("Error receiving message: {:?}", e);
                   }
               }
           }).await;
       });
    
       // 保持 WebSocket 连接活跃
       tokio::spawn(async move {
           loop {
               if let Err(e) = write.send(WatchMessage::Text(String::new())).await {
                   eprintln!("Error sending ping: {:?}", e);
                   break;
               }
               tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
           }
       });
    
       tokio_stream::wrappers::UnboundedReceiverStream::new(rx)
    }

    使用验证

  • 不足分析* 经过上面的介绍,我们可以看到这个基础的list and watch机制能够正确运行。但是,和K8S、ETCD中广泛使用的list and watch相比仍然缺少一个机制来保证list和watch的一致性。

请考虑这样一种情况我们的服务器中会源源不断地产生数据d1,d2,d3,...,dn。当我们使用list时候,能够感知到d1,d2,d3,此时我们完成list,开始建立watch。加入在开始建立watch这个阶段,即使可能是几毫秒的时间但服务器生成了d4,而在watch建立起来后,只能接收到d5,d6,...。这就导致了数据的遗失。

在 Kubernetes 中,List 和 Watch 操作结合使用时,需要使用一个revision机制以确保资源的变更不会被遗漏。理解 List 和 Watch 操作时 revision(即 resourceVersion)的具体含义和管理方式对于保证一致性至关重要。revision的存在有着如下的意义:

  1. 数据版本控制:revision 是 Etcd 的全局递增计数器,用于标识数据的当前版本。当进行数据的修改、更新操作时候,revision会+1
  2. 一致性视图:确保返回的数据是一致的快照视图,表示在该 revision 之前的所有操作都已完成。

revision 与 List 和 Watch 的关系

  1. List 操作: 返回资源列表和当前的全局 revision,作为 resourceVersion。 确保获取到的资源是该 revision 时刻的一致视图。
  2. Watch 操作: 使用 List 操作返回的resourceVersion作为起点。 从该 resourceVersion 开始监听资源的变化,确保在List 和Watch 之间的变更不会丢失。

List 操作的 revision 当进行 List 操作时,Kubernetes API Server 从 Etcd 获取当前资源的状态及其resourceVersion 。这个 resourceVersion 是 Etcd 当前的全局revision 。它表示在此 revision 之前的所有操作都已经完成,并确保返回的数据是这个revision` 时刻的一致视图。

Watch 操作的 revision Watch 操作使用 List 操作返回的 resourceVersion 作为起点,从该版本开始监听资源的变化。这确保了从 List 到 Watch 之间的变更不会被遗漏。

示例流程

  1. List 操作: API Server 从 Etcd 获取指定资源的当前状态。 Etcd 返回包含所有资源对象的列表和一个全局 revision ,这个 revision 将作为resourceVersion`。
  2. Watch 操作: API Server 使用 List 操作返回的 resourceVersion(revision) 作为起点,开始监听资源的变化。 Etcd 返回从指定 revision` 开始的所有变更事件。

    总结

    revision:标识数据版本,确保数据一致性。 List 和 Watch:List 获取资源和 revision,Watch 从该 revision 开始监听变化,确保变更的连续性和一致性。
点赞
收藏
评论区
推荐文章
happlyfox happlyfox
4年前
now扩展-go的时间工具箱
关于我golang不像C,Java这种高级语言,有丰富的语法糖供开发者很方便的调用。所以这便催生出很多的开源组件,通过使用这些第三方组件能够帮助我们在开发过程中少踩很多的坑。时间处理是所有语言都要面对的一个问题,parse根据字符串转为date类型,tostring()将date类型转为定制化的字符串。在实际使用过程中,parse
esbuild构建工具简介
本文分享自天翼云开发者社区《》,作者:陈冬esbuild是什么esbuild是采用go语言实现的新一代构建工具,其官方文档如下:esbuild.github.io/其速度是目前已知的构建工具中速度最快的,但是esbuild为什么快以及其在实际过程中会遇的问
批量创建云主机的整个过程
本文分享自天翼云开发者社区《》,作者:乐道上次我们讲述了云主机创建的流程,整个过程中并没有详细区分各个组件的基本功能,本章节将会为大家详细讲述批量创建过程中各个组件的处理过程。1、我们通过console或openapi进行批量创建云主机的下单操作,例如批量
RabbitMQ集群部署(一)——单机模式部署
本文分享自天翼云开发者社区《》,作者:芋泥麻薯RabbitMQ是一种开源消息队列系统,是AMQP的标准实现,用erlang语言开发。RabbitMQ具有良好的性能和时效性,同时还能够非常好的支持集群和负载部署,非常适合在较大规模的分布式系统中使用。Rabb
云备份技术解析:云容灾 CT-CDR 关键技术介绍
本文分享自天翼云开发者社区《》,作者:沈军1、CDP存储快照,实现秒级RPO(1)CDP技术:云容灾CTCDR(CloudDisasterRecovery)采用CDP(ContinueDataProtection)技术,能够在IO级别进行复制能极大的提升
ES集群迁移方案总结
本文分享自天翼云开发者社区《ES集群迁移方案总结》,作者:刘鑫ES集群迁移可以通过以下几种方式实现,具体方案的选择,需要根据数据量、索引类型、网络情况等进行方案评估和选择。在实施迁移时,需确保目标集群能够承载迁移的数据量,并考虑到集群的可用性、数据一致性和
Flink 与Flink可视化平台StreamPark教程(CDC功能)
本文分享自天翼云开发者社区《》,作者:ln基本概念flinkCDC功能是面向binlog进行同步、对数据的增删改进行同步的工具,能够实现对数据的动态监听。目前其实现原理主要为监听数据源的binlog对数据的变化有所感知。在这里,我们只需引入相关依赖即可进行
flink on k8s的基本介绍
本文分享自天翼云开发者社区《》,作者:ln一、背景介绍ApacheFlink是一个流处理引擎,具有高效的流处理和批处理能力,以及良好的可伸缩性和容错性。Kubernetes(简称K8s)是一种容器编排系统,用于自动化容器部署、扩展和管理。将Flink部署在
K8s Application模式下的flink任务执行精要
本文分享自天翼云开发者社区《》,作者:ln构键k8s集群1.在这里,我们需要搭建一个K8S环境用于提供flink任务的运行时环境。在这里推荐使用kubeadm或者一些脚本工具搭建,可参考本自动k8s脚本工具。具体过程在这里省略,可以参考上述链接中的文档进行
Rust 中的 Tokio 线程同步机制
本文分享自天翼云开发者社区《》,作者:lnRust中的Tokio线程同步机制在并发编程中,线程同步是一个重要的概念,用于确保多个线程在访问共享资源时能够正确地协调。Tokio是一个强大的异步运行时库,为Rust提供了多种线程同步机制。以下是一些常见的同步机
天翼云开发者社区
天翼云开发者社区
Lv1
天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
文章
929
粉丝
16
获赞
40