Redis 实战 —— 09. 实现任务队列、消息拉取和文件分发

贾菌
• 阅读 4215

任务队列 P133

通过将待执行任务的相关信息放入队列里面,并在之后对队列进行处理,可以推迟执行那些耗时对操作,这种将工作交给任务处理器来执行对做法被称为任务队列 (task queue) 。 P133

先进先出队列 P133

可以 Redis 的列表结构存储任务的相关信息,并使用 RPUSH 将待执行任务的相关信息推入列表右端,使用阻塞版本的弹出命令 BLPOP 从队列中弹出待执行任务的相关信息(因为任务处理器除了执行任务不需要执行其他工作)。 P134

发送任务

// 将任务参数推入指定任务对应的列表右端
func SendTask(conn redis.Conn, queueName string, param string) (bool, error) {
    count, err := redis.Int(conn.Do("RPUSH", queueName, param))
    if err != nil {
        return false, nil
    }
    // 只有成功推入 1 个才算成功发送
    return count == 1, nil
}

执行任务

// 不断从任务对应的列表中获取任务参数,并执行任务
func RunTask(conn redis.Conn, queueName string, taskHandler func(param string)) {
    for ; ; {
        result, err := redis.Strings(conn.Do("BLPOP", queueName, 10))
        // 如果成功获取任务信息,则执行任务
        if err != nil && len(result) == 2 {
            taskHandler(result[1])
        }
    }
}

以上代码是任务队列与 Redis 交互的通用版本,使用方式简单,只需要将入参信息序列化成字符串传入即可发送一个任务,提供一个处理任务的方法回调即可执行任务。

任务优先级 P136

在此基础上可以讲原有的先进先出任务队列改为具有优先级的任务队列,即高优先级的任务需要在低优先级的任务之前执行。 BLPOP 将弹出第一个非空列表的第一个元素,所以我们只需要将所有任务队列名数组按照优先级降序排序,让任务队列名数组作为 BLPOP 的入参即可实现上述功能(当然这种如果高优先级任务的生成速率大于消费速率,那么低优先级的任务就永远不会执行)。 P136

优先执行高优先级任务

// 不断从任务对应的列表中获取任务参数,并执行任务
// queueNames 从前往后的优先级依次降低
func RunTasks(conn redis.Conn, queueNames []string, queueNameToTaskHandler map[string]func(param string)) {
    // 校验是否所有任务都有对应的处理方法
    for _, queueName := range queueNames {
        if _, exists := queueNameToTaskHandler[queueName]; !exists {
            panic(fmt.Sprintf("queueName(%v) not in queueNameToTaskHandler", queueName))
        }
    }
    // 将所有入参放入同一个数组
    length := len(queueNames)
    args := make([]interface{}, length + 1)
    for i := 0; i < length; i++ {
        args[i] = queueNames[i]
    }
    args[length] = 10
    for ; ; {
        result, err := redis.Strings(conn.Do("BLPOP", args...))
        // 如果成功获取任务信息,则执行任务
        if err != nil && len(result) == 2 {
            // 找到对应的处理方法并执行
            taskHandler := queueNameToTaskHandler[result[0]]
            taskHandler(result[1])
        }
    }
}
延迟任务 P136

实际业务场景中还存在某些任务需要在指定时间进行操作,例如:邮件定时发送等。此时还需要存储任务执行的时间,并将可以执行的任务放入刚刚的任务队列中。可以使用有序集合进行存储,时间戳作为分值,任务相关信息及队列名等信息的 json 串作为键。

发送延迟任务

// 存储延迟任务的相关信息,用于序列化和反序列化
type delayedTaskInfo struct {
    UnixNano  int64  `json:"unixNano"`
    QueueName string `json:"queueName"`
    Param     string `json:"param"`
}
// 发送一个延迟任务
func SendDelayedTask(conn redis.Conn, queueName string, param string, executeAt time.Time) (bool, error) {
    // 如果已到执行时间,则直接发送到任务队列
    if executeAt.UnixNano() <= time.Now().UnixNano() {
        return SendTask(conn, queueName, param)
    }
    // 还未到执行时间,需要放入有序集合
    // 序列化相关信息
    infoJson, err := json.Marshal(delayedTaskInfo{
        UnixNano: time.Now().UnixNano(),
        QueueName:queueName,
        Param:param,
    })
    if err != nil {
        return false, err
    }
    // 放入有序集合
    count, err := redis.Int(conn.Do("ZADD", "delayed_tasks", infoJson, executeAt.UnixNano()))
    if err != nil {
        return false, err
    }
    // 只有成功加入 1 个才算成功
    return count == 1, nil
}

拉取可执行的延迟任务,放入任务队列

// 轮询延迟任务,将可执行的任务放入任务队列
func PollDelayedTask(conn redis.Conn) {
    for ; ; {
        // 获取最早需要执行的任务
        infoMap, err := redis.StringMap(conn.Do("ZRANGE", "delayed_tasks", 0, 0, "WITHSCORES"))
        if err != nil || len(infoMap) != 1 {
            // 睡 1ms 再继续
            time.Sleep(time.Millisecond)
            continue
        }
        for infoJson, unixNano := range infoMap {
            // 已到时间,放入任务队列
            executeAt, err := strconv.Atoi(unixNano)
            if err != nil {
                log.Errorf("#PollDelayedTask -> convert unixNano to int error, infoJson: %v, unixNano: %v", infoJson, unixNano)
                // 做一些后续处理,例如:删除该条信息,防止耽误其他延迟任务
            }
            if int64(executeAt) <= time.Now().UnixNano() {
                // 反序列化
                info := new(delayedTaskInfo)
                err := json.Unmarshal([]byte(infoJson), info)
                if err != nil {
                    log.Errorf("#PollDelayedTask -> infoJson unmarshal error, infoJson: %v, unixNano: %v", infoJson, unixNano)
                    // 做一些后续处理,例如:删除该条信息,防止耽误其他延迟任务
                }
                // 从有序集合删除该信息,并放入任务队列
                count, err := redis.Int(conn.Do("ZREM", "delayed_tasks", infoJson))
                if err != nil && count == 1 {
                    _, _ = SendTask(conn, info.QueueName, info.Param)
                }
            } else {
                // 未到时间,睡 1ms 再继续
                time.Sleep(time.Millisecond)
            }
        }
    }
}

有序集合不具备列表的阻塞弹出机制,所以程序需要不断循环,并尝试从队列中获取要被执行的任务,这一操作会增大网络和处理器的负载。可以通过在函数里面增加一个自适应方法 (adaptive method) ,让函数在一段时间内都没有发现可执行的任务时,自动延长休眠时间,或者根据下一个任务的执行时间来决定休眠的时长,并将休眠时长的最大值限制为 100ms ,从而确保任务可以被及时执行。 P138

消息拉取 P139

两个或多个客户端在互相发送和接收消息的时候,通常会使用以下两种方法来传递信息: P139

  • 消息推送 (push messaging) :即由发送者来确保所有接受者已经成功接收到了消息。 Redis 内置了用于进行消息推送的 PUBLISH 命令和 SUBSCRIBE 命令(05. Redis 其他命令简介 介绍了这两个命令的用法和缺陷)
  • 消息拉取 (pull messaging) :即由接受者自己去获取存储的信息
单个接受者 P140

单个接受者时,只需要将发送的信息保存至每个接收者对应的列表中即可,使用 RPUSH 可以向执行接受者发送消息,使用 LTRIM 可以移除列表中的前几个元素来获取收到的消息。 P140

多个接受者 P141

多个接受者的情况类似群组,即群组内的人发消息,其他人都可以收到。我们可以使用以下几个数据结构存储所需数据,以便实现我们的所需的功能:

  • STRING: 群组的消息自增 id

    • INCR: 实现 id 自增并获取
  • ZSET: 存储该群组中的每一条信息,分值为当前群组内的消息自增 id

    • ZRANGEBYSCORE: 获得未获取的消息
  • ZSET: 存储该群组中每个人获得的最新一条消息的 id ,所有消息均未获取时为 0

    • ZCARD: 获取群组人数
    • ZRANGE: 经过处理后,可实现哪些消息成功被哪些人接收了的功能
    • ZRANGE: 获取 id 最小数据,可实现删除被所有人获取过的消息的功能
  • ZSET: 存储一个人所有群组获得的最新一条消息的 id ,离开群组时自动删除,加入群组时初始化为 0

    • ZCARD: 获取所在的群组个数
    • ZRANGE: 经过处理后,可实现批量拉取所有群组的未获取的消息的功能

文件分发 P145

根据地理位置聚合用户数据 P146

现在拥有每个 ip 每天进行活动的时间和具体操作,现需要计算每天每个城市的人操作数量(类似于统计日活)。

原始数据十分巨大,所以需要分批读入内存进行聚合统计,而聚合后的数据相对来说很小,所以完全可以在内存中进行聚合统计,完成后再将结果写入 Redis 中,可以有效减少程序与 Redis 服务的通信次数,缩短任务时间。

日志分发及处理

现在有一台机器的本地日志需要交给多个日志处理器进行不同的分析。

这种场景类似群组,所以我们可以复用上面提到的支持多个接受者的消息拉取组件。

本地机器:

  1. 将所有日志发送至群组,最后再发送一条结束消息
  2. 等待所有日志处理器处理完(群组对应的完成标识 = 群组内的成员数 - 1)
  3. 清理本次发送的所有日志

日志处理器:

  1. 不断从群组中拉取消息,并进入相关处理,直至拉取到结束消息
  2. 对群组对应的完成标识进行 INCR ,表示当前日志处理器已完成处理
本文首发于公众号:满赋诸机(点击查看原文) 开源在 GitHub :reading-notes/redis-in-action
Redis 实战 —— 09. 实现任务队列、消息拉取和文件分发
点赞
收藏
评论区
推荐文章
九路 九路
4年前
4.1 手写Java PriorityQueue 核心源码
本章先讲解优先级队列和二叉堆的结构。下一篇代码实现从一个需求开始假设有这样一个需求:在一个子线程中,不停的从一个队列中取出一个任务,执行这个任务,直到这个任务处理完毕,再取出下一个任务,再执行。其实和Android的Handler机制中的Looper不停的从MessageQueue中取出一个消息然后处理是一样的。不过这个需
菜园前端 菜园前端
2年前
什么是宏任务与微任务?
原文链接:事件循环机制在事件循环中,每进行一次循环操作称为tick,每一次tick的任务处理是比较复杂的。关键步骤如下:1.执行一个宏任务2.执行过程中如果遇到微任务,就将它添加到微任务的任务队列中3.宏任务执行完毕后,立即执行当前微任务队列中的所有微任务
Stella981 Stella981
3年前
Python 并行分布式框架之 Celery
Celery(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fwww.celeryproject.org%2F) (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。架构设计!(http://s
Stella981 Stella981
3年前
JavaScript:再谈Tasks和Microtasks
JavaScript是单线程,也就是说JS的堆栈中只允许有一类任务在执行,不可以同时执行多类任务。在读js文件时,所有的同步任务是一条task,当然了,每一条task都是一个队列,按顺序执行。而如果在中途遇到了setTimeout这种异步任务,就会将它挂起,放到任务队列中去执行,等执行完毕后,如果有callback,就把callback推入到Tasks中去,
Stella981 Stella981
3年前
RabbitMQ之消息发布订阅与信息持久化技术
信息发布与订阅Rabbit的核心组件包含Queue(消息队列)和Exchanges两部分,Exchange的主要部分就是对信息进行路由,通过将消息队列绑定到Exchange上,则可以实现订阅形式的消息发布及Publish/Subscribe在这种模式下消息发布者只需要将信息发布到相应的Exchange中,而Ex
Wesley13 Wesley13
3年前
IM 消息服务架构
IM消息架构主要有1、消息redis缓存队列及用户信息memcache2、消息的数据落地(入库mysql)3、消息的发送4、离线消息服务5、过期消息服务消息redis缓存队列服务端落地队列1.客户端通过HTTPS
Stella981 Stella981
3年前
Celery简单说明以及在Django中的配置
Celery1.什么是CleleryCelery是一个简单、灵活且可靠的,处理大量消息的分布式系统专注于实时处理的异步任务队列同时也支持任务调度Celery架构Celery的架构由三部分组成,消息中间件(messagebroker),任务执行单元(worker)和任务执行结果存储(taskresu
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Stella981 Stella981
3年前
RabbitMQ指南之二:工作队列(Work Queues)
在上一章的指南中,我们写了一个命名队列:生产者往该命名队列发送消息、消费从从该命名队列中消费消息。在本章中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完
Wesley13 Wesley13
3年前
Java多线程之线程池
 newFixedThreadPool:固定线程池,核心线程数和最大线程数固定相等,而空闲存活时间为0毫秒,说明此参数也无意义,工作队列为最大为Integer.MAX\_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略 newCachedThreadPool:带缓冲
Stella981 Stella981
3年前
Spring Boot与RabbitMQ结合实现延迟队列的示例
背景何为延迟队列?顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。场景二:用户希望通过手机远程遥控