Go 语言实现 WebSocket 推送

ByteOrbitX
• 阅读 1417

作者:周慧婷

写在前面

系统开发的过程中,我们经常需要实现消息推送的需求。单端单实例的情况下很好处理(网上有许多教程这里不做展开),但在分布式系统及多端需要推送的情况下该如何处理呢?

在分布式系统中,消息推送服务端是多实例的。某系统中一个服务生成一条消息,这条消息需要实时推送到多个终端,此时该如何进行有效的 WebSocket 推送呢?首先一起看看如下场景:

假设推送消息由消息实例 2 产生,但是终端真正连接的消息实例是实例 1 和实例 3,并没有连接到生产消息的实例 2,系统是如何将实例 2 的消息同步推送到终端 1 和终端 2 的呢?下文将详细描述。

Go 语言实现 WebSocket 推送

基本原理

为了满足需求我们采用 redis 做协同中间件,用于存储用户信息、生成用户连接的唯一性标识以及 pod address,消息的生产者实例通过订阅 redis 获取终端连接的唯一性标识和 pod address,并通知到对应的消息实例,最终由相应连接终端的消息实例通过 WebSocket 将消息发推送到用户终端。具体流程如下图:

Go 语言实现 WebSocket 推送

服务端实现

Client

Client 组件的作用,是当用户与消息服务中某个实例建立连接后,管理这个连接的信息,这里通过一个 Golang 结构体来定义:

type Client struct {
    UUID   string 
    UserID string
    Socket *websocket.Conn
    Send   chan []byte
}

结构体中的数据类型说明如下:

  • UUID:对连接进行唯一性的标识,通过此标识可以查找到连接信息。
  • UserID:用户 ID。
  • Socket:连接对象。
  • Send:消息数据 channel。

我们为 Client 结构体实现了两个方法:Read、Write 来处理消息的接受和发送。

Read 方法

Read 方法比较简单,从终端接收请求消息后,消息实例通过 WebSocket 回应接收消息状态,并不返回请求结果。结果通过 Write 方法返回。

func (c *Client) Read(close, renewal chan *Client) {
    defer func() {
        close <- c
    }()

for {
    _, message, err := c.Socket.ReadMessage()
    if err != nil {
        break
    }
  // ...
     // message logic
}
}

Write 方法

Write 方法将请求结果返回给终端。Client 会监听 send channel,当 channel 有数据时,通过 socket 连接将消息发送给终端。

func (c *Client) Write(close chan *Client) {
    for {
        select {
        case message, ok := <-c.Send:
            if !ok {
                return
            }
            c.Socket.WriteMessage(websocket.TextMessage, message)
        case <-c.Ctx.Done():
      return
        }
    }
}

ClientManger

ClientManager 组件相当于连接池,可以管理所有的终端连接,并提供注册、注销、续期功能。

type ClientManager struct {
    sync.RWMutex
    Clients    map[string]*Client 
    Register   chan *Client
    Unregister chan *Client
    Renewal    chan *Client
}

结构体的数据类型说明如下:

  • Clients:是一个集合,用于存储创建的 Client 对象。
  • Register:注册的 channel。

    • 把连接注册到 Clients 中,并通过 key-value 加入 Client 集合中,key 是连接的唯一性标识 ,value 是连接本身。
    • 把连接的唯一性标识和用户的 ID 以及建立连接的 pod address 信息,存储到 redis 中。
  • Unregister:注销的 channel。

    • 从 ClientManager 组件的 Clients 集合中移除连接对象。
    • 删除 redis 对应的缓存信息。
  • Renewal:续期的 channel,用于对 redis 的键续期。

ClientManager 只提供了一个 Start 方法,Start 方法提供监听注册、注销以及续期的 channel,通过监听这些 channel 来管理创建的连接对象。当这些 channel 有数据时,执行对应的操作。

func (manager *ClientManager) Start(ctx context.Context) {
    for {
        select {
        case conn := <-manager.Register:
            manager.Lock()
            manager.Clients[conn.UUID] = conn
            manager.Unlock()
            _, err := manager.affair.Register(ctx, &RegisterReq{
                UserID: conn.UserID,
                UUID:   conn.UUID,
                IP:     manager.IP,
            })
        case conn := <-manager.Unregister:
            _, err := manager.affair.Unregister(ctx, &UnregisterReq{
                UserID: conn.UserID,
                UUID:   conn.UUID,
            })
            conn.Socket.Close()
            close(conn.Send)
            delete(manager.Clients, conn.UUID)
        case conn := <-manager.Renewal:
                    //...
            // Key renewal to redis
        }
    }
}

消息推送

当一个消息服务实例生产用户的消息,需要推送消息给终端时,推送步骤如下:

  1. 根据 userID 从 redis 读取数据,得到连接唯一性标识和 pod address 地址,这些信息是在终端第一次与服务端建立连接的时候写入 redis 的。
  2. 此时根据 pod address,向对应的服务器发送请求。
  3. 相应的消息服务实例接收到请求。

服务端接收请求的处理逻辑如下:

  1. 根据传递过来连接唯一性标识的参数,找到标识对应的连接。我们为 ClientManager 提供了一个 Write 方法。
func (manager *ClientManager) Write(message *Message) error {
  manager.RLock()
  client, ok := manager.Clients[message.Recipient]
  manager.RUnlock()
  if !ok {
     return errors.New("client miss [" + message.Recipient + "]")
  }
  return client.SendOut(message)
}

此方法用到 ClientManager 组件的 Clients 集合,根据唯一性标识找到对应的 Client。再利用 Client 的 SendOut 方法,写出数据到终端。
2.定义 Client 的 SendOut 方法。此方法只负责:把接收到的消息转换为字节数组后,发送 Client 的 Send Channel 中。

func (c *Client) SendOut(message *Message) error {
    content, err := json.Marshal(message.Content)
    if err != nil {
        return err
    }
    c.Send <- content
    return nil
}
  1. 发送数据给终端。在前文介绍 Client 组件中,已说明 Client 组件的 send channel 有数据时,会读取 channel 产生的数据,通过连接对象发送给对应的终端。

总结

以上是 Web Socket 推送消息给终端的主要思路:通过 redis 把用户的信息以及连接的标识和 pod address 存储起来,当某个消息服务实例产生消息,从 redis 读取信息,通知连接着终端的消息服务实例,再由这些服务实例通过 WebSocket 对象给终端发送消息。全象云低代码平台也集成了消息的实时推送,用户使用平台时能及时获取最新消息状态。

下期我们将为大家带来 Knative Serving 自定义弹性伸缩,请大家持续关注。

关于全象云

全象云平台(https://portal.clouden.io)是青云科技自主研发的低代码平台,是基于云原生、用于辅助构建企业各类数字化应用的工具和集成平台。

平台目前提供云上无代码和低代码两种应用开发模式,屏蔽了技术的复杂度。支持可视化设计器,让开发人员和业务用户能够通过简单的拖拽、参数配置等方式快速完成应用开发。同时集成了 IDaaS 身份认证能力、容器 DevOps 能力,支持企业存量业务与全象云业务融合。平台还包含丰富的开发接口和强大的插件机制,开发者可根据需要不断拓展平台的应用能力。

全象云的愿景是:在企业生产经营的各个象限、各个环节提供软件构件或支持服务。
Go 语言实现 WebSocket 推送

本文由博客一文多发平台 OpenWrite 发布!
点赞
收藏
评论区
推荐文章
Stella981 Stella981
4年前
ReactNative集成个推消息推送
前言最近项目中需要集成消息推送功能,在以往的项目中都是使用的极光推送方案(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fgithub.com%2Fjpush%2Fjpushreactnative),现在的公司安卓端使用的是个推消息推送,所以计划在本次使用RN重构项目的过
Stella981 Stella981
4年前
Http长连接200万尝试及调优
对于一个server,我们一般考虑他所能支撑的qps,但有那么一种应用,我们需要关注的是它能支撑的连接数个数,而并非qps,当然qps也是我们需要考虑的性能点之一。这种应用常见于消息推送系统,也称为comet应用,比如聊天室或即时消息推送系统等。comet应用具体可见我之前的介绍,在此不多讲。对于这类系统,因为很多消息需要到产生时才推送给客户端,所以当没有
Stella981 Stella981
4年前
Netty
nettysocketio概述nettysocketio是一个开源的Socket.io服务器端的一个java的实现,它基于Netty框架,可用于服务端推送消息给客户端。说到服务端推送技术,一般会涉及WebSocket,WebSocket是HTML5最新提出的规范,虽然主流浏览器都已经支持,但仍然可能有不兼容的
Easter79 Easter79
4年前
SwiftUI
简介消息推送相信在很多人的眼里都不陌生了吧?像即时聊天微信,好友发信息给你时会在顶部弹下小窗口提醒你。也像是在影院APP预订了电影票,在开场前一小时你也会收到提醒。这类推送是需要经过后端发送请求的,需要服务器发送推送请求,又或者使用如极光推送等第三方渠道。那么如果我们的APP不需要连网呢?这是不是就不能使用消息推送了?不是的,苹果还提供给我们本
Wesley13 Wesley13
4年前
3分钟了解华为推送服务优势,第一项就让你心动!
消息推送(Pushnotification)指产品运营人员通过自身或三方的“推送服务”向用户主动地推送消息。简单来说,我们在移动设备(例如:手机)的通知中心或锁屏界面看到的消息都属于消息推送。作为消息推送的服务提供商之一,华为推送具有怎样的特点和优势?!在这里插入图片描述(https://imgblog.csdnimg.cn/202012221
Stella981 Stella981
4年前
Knative 实战:基于阿里云 Kafka 实现消息推送
在Knative中已经提供了对Kafka事件源的支持,那么如何在阿里云上基于Kafka实现消息推送,本文给大家解锁这一新的姿势。背景消息队列forApacheKafka是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列forApacheKafka广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等
Stella981 Stella981
4年前
Android 必备进阶之百度推送
写在前边今天给大家推送一篇关于百度推送的文章。我们手机上常用的App都会时不时的推送消息在我们的消息栏显示,常用的是QQ消息推送、微信消息推送、支付宝转账消息推送等。以后再做大大小小的项目都会用到推送,今天就总结了一篇用百度云做推送消息,以后做项目会经常用到的,有时间就学习一下吧!!(https://oscimg.oschin
Stella981 Stella981
4年前
RabbitMQ学习:RabbitMQ的基本概念及RabbitMQ使用场景(二)
1、RabbitMQ的基本概念RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统
Stella981 Stella981
4年前
RabbitMQ学习:安装RabbitMQ及RabbitMQ的初步配置(一)
RabbitMQ基础含义RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
Stella981 Stella981
4年前
Go实现基于WebSocket的弹幕服务
拉模式和推模式拉模式1、数据更新频率低,则大多数请求是无效的2、在线用户量多,则服务端的查询负载高3、定时轮询拉取,实时性低推模式1、仅在数据更新时才需要推送2、需要维护大量的在线长连接3、数据更新后可以立即推送基于webSocket推送1、浏览器支持的socket编
程序员小五 程序员小五
1年前
融云IM干货丨如果用户不在线,推送通知会怎样处理?
如果用户不在线,融云的推送通知会按照以下方式处理:离线消息推送:当用户不在线时,融云会将收到的单聊消息、群聊消息、系统消息、超级群消息通过第三方推送厂商或融云自建的推送服务通知客户端。这意味着即使用户的应用没有运行,他们也能通过系统通知栏接收到消息提醒。服