【译】RabbitMQ系列(四) - 路由模式

贾瑞
• 阅读 1417

路由模式

在之前的文章中我们建立了一个简单的日志系统。我们可以通过这个系统将日志message广播给很多接收者。

在本篇文章中,我们在这之上,添加一个新的功能,即允许接收者订阅message的一个子集。举个例子,我们将日志分成多个级别,一个接收者接收错误日志将之保存到磁盘,另一个接收者接收所有日志将之打印到控制台。

Bindings

在前面的章节中,我们已经接触过binding了,像下面的代码这样:

channel.queueBind(queueName,EXCHANGE_NAME,"");

binding将exchange和queue关联在了一起。更形象的表示,如:queue对exchange中的message感兴趣。

bindings可以携带一个routingKey参数。为了避免和basic_publish的参数弄混,我们称之它为binding_key.我们像下面这样创建一个binding

channel.queueBind(queueName,EXCHANGE_NAME,"black");

binding key的作用要看exchange的类型,对于fanout类型的exchange,binding key是直接忽略的。

Direct Exchange

在之前的日志系统中,message会推送到所有的消费者去。我们想让系统依据message的日志级别进行过滤。比如一个消费者只接收严重级别的日志。

fanout无法帮我们实现这样的功能,它只是无脑的进行广播。

我们使用direct类型的exchange,它的路由算法是非常简单的 - 只要message的routing_key和bind的binding_key相同即进行转发。

为了进行说明,像下图这么来设置
【译】RabbitMQ系列(四) - 路由模式
如图,可以看到有两个queue绑到了类型为direct的exchange上。第一个queue绑定用了orange这个binding key,第二个则用了black和green两个binding key。

那么结果就是有routing key为orange的message路由到了Q1.而routing key为black和green的message则路由到了Q2,其他的消息则被丢弃了。

Multiple Bindings

【译】RabbitMQ系列(四) - 路由模式
若使用相同的binding key将多个queue绑定到exchange上,就和fanout的行为一样了,message会广播到binding key相同的queue去。如图的设置中,一个routing key为black的message就会同时发送到Q1和Q2。

Emitting logs

我们将在我们的日志系统上应用这个模型,使用direct类型的exchange去替代fanout类型的exchange。提供日志的严重性作为routing key。接收程序可以选择要接收日志的严重性级别。
首先我们创建exchange

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后就是发送message

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

我们先假设severity取值 info | warning | error

Subscribing

接收message和上一章没什么区别,只是需要给各个severity创建新的binding。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

开始执行

【译】RabbitMQ系列(四) - 路由模式

EmitLogDirect.java代码如下

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsDirect.java代码如下:

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

编译代码

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

如果想把warning和error的日志保存到文件去,那么

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果想把所有的日志打印到控制台,那么

java -cp $CP ReceiveLogsDirect info warning error

发送error日志

java -cp $CP EmitLogDirect error "Run.Run. Or it will explode"
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
4年前
java实现23种设计模式之责任链模式
顾名思义,责任链模式(ChainofResponsibilityPattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。pa
Wesley13 Wesley13
4年前
RabbitMQ学习总结(6)——消息的路由分发机制详解
一、Routing(路由)(usingtheJavaclient)在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便
Stella981 Stella981
4年前
RabbitMQ_消息队列基本使用_2
简介RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者也可以从一个队列中接受消息。pika&使用rabbitmq使用的协议是amqp,用于python的推荐客户端是pikapipinstallpika
Stella981 Stella981
4年前
RabbitMQ 日志管理
日志默认输出位置如果我们是通过rpm方式进行安装的,那么系统默认会把日志放在/var/log/rabbitmq/目录下日志说明在日志目录/var/log/rabbitmq目录下,默认会有两类日志文件,一个是rabbit@{hostname}.log,另一个是rabbit@{hostname}sasl.log。
Stella981 Stella981
4年前
Android面试之EventBus
简介众所周知,EventBus是一款用在Android开发中的发布/订阅事件总线框架,基于观察者模式,将事件的接收者和发送者分开,简化了组件之间的通信操作,使用简单、效率高、体积小!EventBus使用了典型的发布/订阅事件模式,下面是EventBus官方给出的原理示意图。!在这里插入图片描述(https://oscimg.o
Stella981 Stella981
4年前
RabbitMQ消息队列(六):使用主题进行消息分发
在上篇文章RabbitMQ消息队列(五):Routing消息路由中,我们实现了一个简单的日志系统。Consumer可以监听不同severity的log。但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定。不支持更多的标准。比如syslogunix的日志工具,它可以通过severity(info/warn/crit…
Stella981 Stella981
4年前
RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)
 在上一章中,我们构建了一个简单的日志系统,我们可以把消息广播给很多的消费者。在本章中我们将增加一个特性:我们可以订阅这些信息中的一些信息。例如,我们希望只将error级别的错误存储到硬盘中,同时可以将所有级别(error、info、warning等)的日志都打印在控制台上。1、绑定(Bindings) 
Wesley13 Wesley13
4年前
JMS介绍
JMS消息传送模型    点对点消息传送模型  在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。点对点消息模型有一些特性,如下:每个消息只有一个接收者;消息发送者和接收者并没有时间依
Wesley13 Wesley13
4年前
Java设计模式之责任链模式
引入责任链模式责任链模式顾名思义,责任链模式(ChainofResponsibilityPattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会
Stella981 Stella981
4年前
Python日志库logging总结
在部署项目时,不可能直接将所有的信息都输出到控制台中,我们可以将这些信息记录到日志文件中,这样不仅方便我们查看程序运行时的情况,也可以在项目出现故障时根据运行时产生的日志快速定位问题出现的位置。1、日志级别Python标准库logging用作记录日志,默认分为六种日志级别(括号为级别对应的数值),NOTSET(0)、DEBUG(10)
贾瑞
贾瑞
Lv1
春眠不觉晓,处处闻啼鸟。夜来风雨声,花落知多少!
文章
4
粉丝
0
获赞
0