Redis发布订阅:最简单最通俗的文章

我是阿沐
• 阅读 2358

前言

可能小伙伴的工作年限大部分已经超过三年甚至四年五年,不知道是否有一种危机感,我们写了那么多的需求代码没有20w行也有个10w行了吧,但是出去找工作的时候不是笔试被pass掉就是面试被pass,你会发现好多你只是知道但是回答不上来。这个时候你才知道去补习知识点,其实这种做法对自身发展不太友好的。

我去年疫情期间,在大家都不敢跳槽季节我义无反顾选择跳槽,进入大家说的bat一线大厂。最近跟之前老东家的同事聊了聊技术栈、家常;说是面试了很多两三年经验开发者基础太薄弱等等。

所以我也从4月底跟随之前的朋友一起开始了写作之路,我基本上是以面对对象是小白讲解方式开展自己的写作模式,期间也有小伙伴让我写高级点的😭 😭 😭 ,但是确实不敢在那么大佬面前造次;还是坚持从0到1的redis讲解之路,希望能得到小伙伴的支持。

大部分是根据自己以往的面试和同事交流得出来的,有不对的地方还希望小伙伴们多多指正。

那么开启我们新的一轮面试知识点之旅.....

正文

今天要聊的知识点是redis的订阅发布功能,虽然说现在大厂都使用了kafkaRabbitMQActiveMQ, RocketMQ;这几种我大概用了三种,其实实现原理和内部使用方式都大同小异。为什么讲redis的呢?因为轻量、直接使用,而上面几种适合大数据量,对数据准确性要求高的场景,作为第三方组件,在小公司考虑到成本人力是不是太有好的,存在更多风险。

为什么要用发布订阅

其实理论上我们之前的列表场景使用双端链表就可以实现发布与订阅功能,但是这种通过链表来实现的发布与订阅功能有两个局限性:

  • 1、基于链表实现的消息队列,不能支持一对多的消息分发。
  • 2、假如生产者生成的速率远远大于消费者消费消息的速率,可能会导致未消费消息占用大量的内存(需要开启足够多的消费进程)。

我画两张图进行对比,小伙伴们一眼就能看出来区别:

Redis发布订阅:最简单最通俗的文章

Redis发布订阅:最简单最通俗的文章

从上面的图中可以看出普通消息队列:只能有一个多个消费者去消费,却不能将消息分发给其他消费者;redis订阅发布:生产者生产完消息通过频道分发消息给订阅该频道的消费者,这样就可以较少队列数据的积攒,导致内存暴增。

所以为了解决这两个局限性,Redis当中选择了通过其他命令来实现发布与订阅模式。

redis订阅发布的基本命令

psubscribe指令: psubscribe pattern [pattern ...] 订阅一个或多个符合给定模式的频道;时间复杂度O(n),n是订阅的模式的数量。

注意点:每个模式以 * 作为匹配符;例如 mumu*匹配所有以 mumu 开头的频道:mumu.juejin、mumu.zhihu、mumu.csdn

publist指令:publish channel message 把信息message发送到指定的频道channel;时间复杂度O(n+m),n是频道channel的订阅者数量,m则是使用模式订阅(subscribed patterns)的客户端的数量。

注意点:结果集返回是接收到message的订阅者数量,没有订阅者返回0。

pubsub指令:pubsub channels [argument [argument ...]] 查看订阅与发布系统状态;时间复杂度O(n),n为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常数)。

注意:列出当前的活跃频道(指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内),返回一个活跃频道组成的列表。

punsubscribe指令:punsubscribe [pattern [pattern ...]] 退订所有给定模式的频道;时间复杂度O(n+m),其中n是客户端已订阅的模式的数量, m则是系统中所有客户端订阅的模式的数量。

注意:pattern未指定那么订阅的所有模式都会被退订;否则只会退订指定的订阅的模式

subscribe指令:subscribe channel [channel ...] 订阅给定的一个或多个频道的信息;时间复杂度O(n),其中n是订阅的频道的数量。

unsubscribe指令:unsubscribe channel [channel ...] 指退订给定的频道;时间复杂度O(n),其中n是订阅的频道的数量。

注意:若没有指定退订channel,则默认退订所有频道;否则退订指定频道。BSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。

那么在Redis中的发布与订阅也分为两种类型,一种是基于频道来实现,一种是基于模式来实现。

基于频道实现讲解

  • subscribe channe1 channel2 channel3 ... :订阅一个或者多个频道
  • unsubscribe channe1 channel2 channel3 ... :退订订阅的指定频道(关闭客户端终端没用,需要命令退订)
  • publish channe1 message:对指定频道发送消息
  • pubsub numsub channel1 channel2:查看指定频道的订阅数

好记性不如烂笔头,光看不练假把戏:

127.0.0.1:6379> SUBSCRIBE mumu_1 mumu_2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"    -- 返回值的类型:显示订阅成功
2) "mumu_1"       -- 订阅的频道名字
3) (integer) 1    -- 目前已订阅的频道数量

1) "subscribe"
2) "mumu_2"
3) (integer) 2

1) "message"      -- 返回值的类型:信息
2) "mumu_1"       -- 来源(从那个频道发送过来)
3) "\xe6\x88\x91\xe6\x98\xaf\xe9\x98\xbf\xe6\xb2\x90\xe5\x95\x8a" -- 消息内容

Redis发布订阅:最简单最通俗的文章

//获取指定频道的订阅的客户端数量
127.0.0.1:6379> PUBSUB numsub mumu_1 mumu_2
1) "mumu_1"    -- 频道名称
2) (integer) 1 -- 订阅该频道的客户端数量
3) "mumu_2"
4) (integer) 1
127.0.0.1:6379> pubsub channels
1) "mumu_2"  -- 频道名称
2) "mumu_1"  -- 频道名称

Redis发布订阅:最简单最通俗的文章

127.0.0.1:6379> UNSUBSCRIBE mumu_1
1) "unsubscribe"  -- 返回值的类型:显示取消订阅成功
2) "mumu_1"       -- 取消订阅的频道名字
3) (integer) 0

我们看下基于频道的实现原理:

源码路径redis-5.0.7/src/server.h我把redis源码下载到本地查看了;大约1239行

struct redisServer {
    /* General */
    pid_t pid;   

    //省略百十行

    // 百度翻译😜 😜 😜之后意思是: 将频道映射到已订阅客户端的列表(就是保存客户端和订阅的频道信息)
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
}

pubsub_channels定义的属性是一个字典类型,保存着客户端和频道信息,key值保存的就是频道名value是一个链表,链表中保存的是客户端id

Redis发布订阅:最简单最通俗的文章

频道订阅:订阅频道时先检查字段内部是否存在;不存在则为当前频道创建一个字典且创建一个链表存储客户端id;否则直接将客户端id插入到链表中。

取消频道订阅:取消时将客户端id从对应的链表中删除;如果删除之后链表已经是空链表了,则将会把这个频道从字典中删除。

基于模式实现讲解

  • psubscribe pattern1 pattern2 pattern3 ... :订阅一个或多个符合给定模式的频道,每个模式以 * 作为匹配符
  • punsubscribe pattern1 pattern2 pattern3 ... :取消模式的订阅(关闭客户端终端没用,需要命令退订)
  • pubsub numpat pattern1 返回订阅模式的数量,返回的不是订阅模式的客户端的数量,而是客户端订阅的所有模式的数量总和。时间复杂度O(1),(具体为啥,请看下面原来解析结构)

说起时那时快,赶紧动手来实践,眼见为实:

127.0.0.1:6379> PSUBSCRIBE mumu.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"       -- 返回值的类型:显示订阅成功
2) "mumu.*"           -- 订阅的模式
3) (integer) 1        -- 目前已订阅的模式的数量

1) "pmessage"         -- 返回值的类型:信息
2) "mumu.*"           -- 信息匹配的模式
3) "mumu.list"        -- 信息本身的目标频道
4) "i am a mumu"      -- 信息的内容

Redis发布订阅:最简单最通俗的文章

127.0.0.1:6379> PUNSUBSCRIBE mumu.*
1) "punsubscribe"  -- 返回值的类型:显示退订成功
2) "mumu.*"        -- 退订的模式
3) (integer) 1     -- 目前已退订的模式的数量

我们看下基于模式的实现原理:

源码路径redis-5.0.7/src/server.h我把redis源码下载到本地查看了;大约1240行

struct redisServer {
    /* General */
    pid_t pid;   

    //省略百十行

    // 百度翻译😄 😄 😄之后意思是:pubsub订阅的列表信息(大致就是存储订阅模式的信息)
    list *pubsub_patterns;  /* A list of pubsub_patterns */
}

// 1303行订阅模式列表结构:
typedef struct pubsubPattern {
    client *client;  -- 订阅模式客户端
    robj *pattern;   -- 被订阅的模式
} pubsubPattern;

Redis发布订阅:最简单最通俗的文章

模式订阅:新增一个pubsub_pattern数据结构添加到链表的最后尾部,同时保存客户端ID

取消模式订阅:从当前的链表pubsub_patterns结构中删除需要取消的模式订阅。

从上面的一些实际实践结果和结合图形是不是对redis发布订阅进一步了解了呢?

那么我们使用redis发布订阅能做什么?

发布订阅(pub/sub)可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

  • 电商中,用户下单成功之后向指定频道发送消息,下游业务订阅支付结果这个频道处理自己相关业务逻辑
  • 粉丝关注功能
  • 文章推送
  • 等等等等

实践编码

消费者订阅Subscribe.php
<?php
/**
 * Created by 我是阿沐.
 * Date: 2021/05/04
 * Time: 下午16:02
 * QQ: 2511221051@qq.com
 */

// 设置php脚本执行时间
set_time_limit(0);

// 申明测试的平道名称
$channel_names = ['mumu_test1', 'mumu_test2', 'mumu_test3'];
//当前执行时间
$cur_time = time();
try {
    // 实例化redis
    $redis = new Redis();

    // 创建redis链接
    $redis->pconnect('127.0.0.1', 6379);

    //echo "Server is running: " . $redis->ping();
    //阻塞获取消息
    while (true) {
        // 阻塞获取消息 $redis redis的实例  $channel_name 频道名称  $msg 生产者生成的消息体
        $redis->subscribe($channel_names, function ($redis, $channel_name, $msg) {
              switch ($channel_name) {
                case 'mumu_test1':
                    echo "channel:".$channel_name.",message:".$msg."\n";
                    break;
                case 'mumu_test2':

                    break;
                case 'mumu_test3':

                    break;
            }
            if (!$msg) { //当没有收到消息时 就休眠1s钟
                echo "channel:".$channel_name.",message: not appoint channel name"."\n";
                sleep(1);
            }
        });
        // 本地测试 运行超过10分钟 则自动结束 并关闭redis链接
        if (time() - $cur_time > 10*60){
            $redis -> close();
            break;
        }
    }
} catch (Exception $e) {
    echo $e->getMessage();
}
生产者发送消息Publish.php
<?php
/**
 * Created by 我是阿沐.
 * Date: 2021/05/04
 * Time: 下午16:04
 * QQ: 2511221051@qq.com
 */

// 申明测试的平道名称
$channel_names = ['mumu_test1', 'mumu_test2', 'mumu_test3', 'mumu_test4'];

$channel_name = $channel_names[rand(0,3)];

try {
    // 实例化redis类
    $redis = new Redis();
    // 建立redis链接
    $redis->connect('127.0.0.1', 6379);

    for ($i = 0; $i < 10; $i++) {

        $data = array('key' => 'key' . ($i+1), 'msg' => 'I am li a mu !');

        $ret = $redis->publish($channel_name, json_encode($data));

        print_r($ret);
    }
} catch (Exception $e) {
    echo $e->getMessage();
}

🐧 执行结果集

终端执行消费者订阅,开始阻塞获取消息/usr/local/opt/php@7.2/bin/php Subscribe.php;结果集:

➜  publish-subscribe git:(master) ✗ /usr/local/opt/php@7.2/bin/php Subscribe.php
channel:mumu_test1,message:{"key":"key1","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key2","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key3","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key4","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key5","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key6","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key7","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key8","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key9","msg":"I am li a mu !"}
channel:mumu_test1,message:{"key":"key10","msg":"I am li a mu !"}

上面就是生产者生成的消息内容:msg字符串

终端执行生产者生产数据,开始发送消息/usr/local/opt/php@7.2/bin/php Publish.php;结果集:
➜  publish-subscribe git:(master) ✗ /usr/local/opt/php@7.2/bin/php Publish.php
1111111111

// 这种场景是 我模拟发送了没有创建的频道 mumu_test4 导致返回的结果集都是0 说明没有被订阅的channel,消息会被丢弃
➜  publish-subscribe git:(master) ✗ /usr/local/opt/php@7.2/bin/php Publish.php
0000000000

🐮 注意事项

1、订阅的消费者需要一直执行,阻塞获取消息,如果断开则表示退订了。

2、channel只接收publish发送的消息,自身是不存储消息,假如channel没有被订阅,则消息会被丢弃掉。

3、生产者生成消息时,只需要向频道内丢入消息即可。

🙈 当然还有这些命令可以玩耍

$redis->pubsub('channels'); // 获取所有频道
$redis->pubsub('channels', '*pattern*'); // 仅仅获取指定频道
$redis->pubsub('numsub', ['channel1', 'channel2']); // 查看指定频道的订阅数
$redis->pubsub('numpat'); // 返回订阅模式的数量
$redis->unsubscribe(['channel1', 'channel2']); // 客户端退订指定的频道
$redis->punsubscribe(['pattern1', 'pattern2']); // 客户端退订所有指定定模式

小伙伴们本地实践操作起来~,千看不如写一遍。

redis发布订阅的优缺点

小伙伴们从上面的实践操作来看,PubSub生产的消息,如果没有对应的频道或者消费者,消息会被丢弃,直接投递失败返回0状态。假如我们实际生产环境在消费的时候,突然网络波动,导致其中一个消费者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在。也就是说Redis本身是不会存储消息体信息的。

那么在我们生产环境数量不大且想节约成本的时候,redis的发布订阅功能可能比较适合我们公司;轻量级、方便使用配合consul+supervisor+swool可以常驻内存,开多进程消费(消息队列也可以用的)。

🐣 总结

本文主要通过整理PubSub的实际操作指令,然后结合底层的源码分析它们之间的存储结构;再通过实际的客户端操作,来说明返回参数的具体意思;最最最后通过实践写代码运行展示。同时也列出PubSub的优缺点,帮助大家在实际的工作中可以有更好的选择。最后好记性不如多亲自动手实践,唯有实践,才知其本质。

好了,我是阿沐,一个不想30岁就被淘汰的打工人 ⛽️ ⛽️ ⛽️ 。创作不易觉得「阿沐」写的有点料话:👍 关注一下,💖 分享一下,我们下期再见。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Karen110 Karen110
2年前
​一篇文章总结一下Python库中关于时间的常见操作
前言本次来总结一下关于Python时间的相关操作,有一个有趣的问题。如果你的业务用不到时间相关的操作,你的业务基本上会一直用不到。但是如果你的业务一旦用到了时间操作,你就会发现,淦,到处都是时间操作。。。所以思来想去,还是总结一下吧,本次会采用类型注解方式。time包importtime时间戳从1970年1月1日00:00:00标准时区诞生到现在
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Android蓝牙连接汽车OBD设备
//设备连接public class BluetoothConnect implements Runnable {    private static final UUID CONNECT_UUID  UUID.fromString("0000110100001000800000805F9B34FB");
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
我是阿沐
我是阿沐
Lv1
男 · 腾讯音乐后端开发工程师 | 微信搜:我是阿沐
思绪来得快去得也快,偶尔会在这里停留
文章
15
粉丝
3
获赞
5