『笔记』可扩展架构设计之消息队列

夜游神
• 阅读 134

前言

众所周知,开发低耦合系统是软件开发的终极目标之一。低耦合的系统更加容易扩展,低耦合的模块更加容易复用,更易于维护和管理。我们知道,消息队列的主要功能就是收发消息,但是它的作用不仅仅只是解决应用之间的通信问题这么简单。消息队列作为常用的中间件,经常被用来对系统解耦,对模块解耦。增强系统的可扩展性和模块的可复用性。

除了对用于对系统、模块解耦,消息队列还有以下几种通途:

  • 服务异步处理
  • 流量控制
  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式
  • 连接流计算任务和数据
  • 用于将消息广播给大量接收者

事物的存在总会有对立的一面,引入消息队列可能会带来延迟问题、产生数据不一致的问题、增加系统复杂度的问题等等。

<!-- more -->

EDA 架构之生产者与消费者模式

事件驱动架构(Event Driven Architecture, EDA)

EDA 架构原理

事件驱动架构由事件发起者和事件使用者组成。事件的发起者检测或感知事件,并以消息的形式来表示事件。它并不知道事件的使用者或事件引起的结果。

检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。

事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。

检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。

事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。

生产者-消费者模型

操作系统中常见的 EDA 架构就是生产者-消费者模型。消息队列常用来作为生产者和消费者之间的缓冲带,平衡生产者和消费者的处理能同时对服务进行解耦。有了这层缓冲带,生产者和消费者可能都不知道对方的存在。

<fancybox>『笔记』可扩展架构设计之消息队列</fancybox>

以下为生产者-消费者模型的简单实现,(内存消息队列)

import time

from queue import Queue
from random import randint
from threading import Thread

class Producer(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            productA = randint(0, 10)
            productB = randint(90, 100)
            print('Produce A「number」: {}, Produce B「number」: {}'.format(productA, productB))
            self.queue.put((productA, productB))
            time.sleep(2)

class Consumer(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            # block=True, if queue is empty, block(阻塞)
            products_tuple = self.queue.get(block=True)
            print(f'Consume products: {products_tuple[0]} & {products_tuple[1]}')
            time.sleep(randint(0, 10))

def main():
    queue = Queue()
    producer = Producer(queue)
    consumer = Consumer(queue)

    producer.start()
    consumer.start()

main()
"""
Produce A「number」: 8, Produce B「number」: 95
Consume products: 8 & 95
Produce A「number」: 4, Produce B「number」: 92
Consume products: 4 & 92
Produce A「number」: 9, Produce B「number」: 90
... """

基于ZeroMQ PubSub模式的观察者模式实例

<fancybox>『笔记』可扩展架构设计之消息队列</fancybox>

<fancybox>『笔记』可扩展架构设计之消息队列</fancybox>

# publisher1.py
import time
import zmq

def publisher1():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")

    while True:
        count = 99

        while True:
            time.sleep(1)
            socket.send_string('publisher1 pushes event %d' % count)
            print('push event %d' % count)
            count += 1

if __name__ == "__main__":
    publisher1()
# publisher2.py

import time
import zmq

def publisher2():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")

    while True:
        count = 1

        while True:
            time.sleep(1)
            socket.send_string('publisher2 pushes event %d' % count)
            print('push event %d' % count)
            count += 1

if __name__ == "__main__":
    publisher2()
# subscriber1.py

import zmq

def subscriber1():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5555')
    socket.connect('tcp://127.0.0.1:5556')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    while True:
        message = socket.recv()
        print('message: %s' % message)

if __name__ == "__main__":
    subscriber1()
# subscriber2.py

import zmq

def subscriber2():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5555')
    socket.connect('tcp://127.0.0.1:5556')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    while True:
        message = socket.recv()
        print('message: %s' % message)

if __name__ == "__main__":
    subscriber2()

秒杀系统的架构设计与消息队列

某秒杀系统的主要处理步骤如下:

  • 风险控制
  • 库存锁定
  • 生成订单
  • 短信通知
  • 更新统计数据

使用消息队列进行异步处理

由于秒杀成功的关键取决于风险控制、库存锁定这两步骤,所以 server 端处理了这两步之后可以给 client 端返回结果了,后续的步骤可放入消息队列中异步执行。不一定要在秒杀请求中完成。集中资源处理关键步骤(同步),碎片时间(全部秒杀请求处理结束)处理次要步骤(异步)。

<fancybox>『笔记』可扩展架构设计之消息队列</fancybox>

使用消息队列进行流量控制(削峰)

秒杀开始后,将超过 server 端处理上限(短时间内)的秒杀请求放入消息队列中,后续有能力处理时再对消息队列中消费请求进行处理。对于超时的请求可以直接丢弃(秒杀失败)。

<fancybox>『笔记』可扩展架构设计之消息队列</fancybox>

参考

本文由博客一文多发平台 OpenWrite 发布!
点赞
收藏
评论区
推荐文章
Centos7安装RabbitMQ详细教程 - 附带软件基本解释 - CSDN博客
MQ引言什么是MQMQ:messageQueue翻译为消息队列,通过典型的生产者和消费者模型不断向消息队列中生产消息,消费者不断从队列中获取消息。因为消息的生产和消费都是一部的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现了系统之间的解耦。别名是消息中间件,通过利用高效的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系
Stella981 Stella981
3年前
Spring Boot(七):RabbitMQ 详解
一、RabbitMQ简介RabbitMQ即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的
Stella981 Stella981
3年前
RabbitMQ 消息中间件搭建详解
1.RabbitMQ简介消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包
Stella981 Stella981
3年前
Message Queue消息队列基本原理
消息队列基本原理📦本文已归档到:「blog」消息队列(MessageQueue,简称MQ)技术是分布式应用间交换信息的一种技术。消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。注意:_为了简便,下文中除了文章标
Stella981 Stella981
3年前
RabbitMQ基础概念详细介绍
RabbitMQ简介AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。R
专为小白打造—Kafka一篇文章从入门到入土 | 京东云技术团队
一、什么是KafkaMQ消息队列作为最常用的中间件之一,其主要特性有:解耦、异步、限流/削峰。Kafka和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka还提供了大多数消息系
微前端无界机制浅析 | 京东物流技术团队
简介随着项目的发展,前端SPA应用的规模不断加大、业务代码耦合、编译慢,导致日常的维护难度日益增加。同时前端技术的发展迅猛,导致功能扩展吃力,重构成本高,稳定性低。为了能够将前端模块解耦,通过相关技术调研,最终选择了无界微前端框架作为物流客服系统解耦支持。
京东云开发者 京东云开发者
8个月前
MQ消息乱序问题解析与实战解决方案
作者:京东物流刘浩1.背景在分布式系统中,消息队列(MQ)是实现系统解耦、异步通信的重要工具。然而,MQ消费时出现的消息乱序问题,经常会对业务逻辑的正确执行和系统稳定性产生不良影响。本文将详细探讨MQ消息乱序问题的根源,并提供一系列在实际应用中可行的解决方