RabbitMQ入门

Stella981
• 阅读 668

[toc]

一:入门

1.安装Erlang

2.安装RabbitMQ

3.配置

激活 RabbitMQ's Management Plugin

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>rabbitmq-plugins.bat enable rabbitmq
Error: The following plugins could not be found:
  rabbitmq


C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>rabbitmq-plugins.bat enable rabbitmq_management
The following plugins have been enabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@DESKTOP-BugYang... started 6 plugins.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>net stop RabbitMQ && net start RabbitMQ
RabbitMQ 服务正在停止......
RabbitMQ 服务已成功停止。

RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。


C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>rabbitmqctl.bat list_users
Listing users
guest   [administrator]

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>

http://localhost:15672/

账号密码:guest

4.下载maven

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.1.0</version>
</dependency>

5.创建发送者

public class Send {

    //队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建一个频道
        Channel channel = connection.createChannel();
        //指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送的消息
        String message = "hello world!";
        //往队列中发出一条消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭频道和连接
        channel.close();
        connection.close();
    }
}

打印

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
 [x] Sent 'hello world!'

Process finished with exit code 0

6.创建接受者

public class Rec {
    //队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }

    }
}

打印

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hello world!'

二:工作队列

1.发送消息

public class NewTask
{
    //队列名称
    private final static String QUEUE_NAME = "workqueue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建连工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("localhost");
        //创建连接
        Connection connection = factory.newConnection();
        //创建队列
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送10条消息,依次在消息后面附加1-10个点
        for (int i = 0; i < 10; i++)
        {
            String dots = "";
            for (int j = 0; j <= i; j++)
            {
                dots += ".";
            }
            //拼数据
            String message = "helloworld" + dots+dots.length();
            //推送到rabbitmq中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            //推送完成,打印结束语句
            System.out.println(" [x] Sent '" + message + "'");
        }
        //关闭队列
        channel.close();
        //关闭消息
        connection.close();

    }


}

 [x] Sent 'helloworld.1'
 [x] Sent 'helloworld..2'
 [x] Sent 'helloworld...3'
 [x] Sent 'helloworld....4'
 [x] Sent 'helloworld.....5'
 [x] Sent 'helloworld......6'
 [x] Sent 'helloworld.......7'
 [x] Sent 'helloworld........8'
 [x] Sent 'helloworld.........9'
 [x] Sent 'helloworld..........10'

2.接收消息

运行两个Work类

public class Work
{
    //队列名称
    private final static String QUEUE_NAME = "workqueue";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {

        //区分不同工作进程的输出
        int hashCode = Work.class.hashCode();

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("localhost");
        //创建连接
        Connection connection = factory.newConnection();
        //创建队列
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(hashCode
                + " [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
       // 指定消费队列
        //关闭应答机制,会丢失消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
        //打开应答机制,不会丢失消息
        channel.basicConsume(QUEUE_NAME, false, consumer);
        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(hashCode + " [x] Received '" + message + "'");
//            doWork(message);
            System.out.println(hashCode + " [x] Done");

        }

    }

    /**
     * 每个点耗时1s
     * @param task
     * @throws InterruptedException
     */
    private static void doWork(String task) throws InterruptedException
    {
        for (char ch : task.toCharArray())
        {
            if (ch == '.')
                Thread.sleep(1000);
        }
    }
}

746292446 [x] Received 'helloworld.1'
746292446 [x] Done
746292446 [x] Received 'helloworld...3'
746292446 [x] Done
746292446 [x] Received 'helloworld.....5'
746292446 [x] Done
746292446 [x] Received 'helloworld.......7'
746292446 [x] Done
746292446 [x] Received 'helloworld.........9'
746292446 [x] Done

242131142 [x] Received 'helloworld..2'
242131142 [x] Done
242131142 [x] Received 'helloworld....4'
242131142 [x] Done
242131142 [x] Received 'helloworld......6'
242131142 [x] Done
242131142 [x] Received 'helloworld........8'
242131142 [x] Done
242131142 [x] Received 'helloworld..........10'
242131142 [x] Done

可以看到,默认的,RabbitMQ会一个一个的发送信息给下一个消费者(consumer),而不考虑每个任务的时长等等,且是一次性分配,并非一个一个分配。平均的每个消费者将会获得相等数量的消息。这样分发消息的方式叫做round-robin。

3.消息应答(message acknowledgments)

我们首先开启两个任务,然后执行发送任务的代码(NewTask.java),然后立即关闭第二个任务,两个加起来打印出来的数据会有缺失

一旦RabbItMQ交付了一个信息给消费者,会马上从内存中移除这个信息。在这种情况下,如果杀死正在执行任务的某个工作者,我们会丢失它正在处理的信息。我们也会丢失已经转发给这个工作者且它还未执行的消息。

为了保证消息永远不会丢失,RabbitMQ支持消息应答(message acknowledgments)。

  • 消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。
  • 如果消费者被杀死而没有发送应答,RabbitMQ会认为该信息没有被完全的处理,然后将会重新转发给别的消费者。通过这种方式,你可以确认信息不会被丢失,即使消者偶尔被杀死。
  • 这种机制并没有超时时间这么一说,RabbitMQ只有在消费者连接断开是重新转发此信息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。

消息应答默认是打开的。上面的代码中我们通过显示的设置autoAsk=true关闭了这种机制。

boolean ack = false ; //打开应答机制  
channel.basicConsume(QUEUE_NAME, ack, consumer);  
//另外需要在每次处理完成一个消息后,手动发送一次应答。  
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

public class Work  
{  
    //队列名称  
    private final static String QUEUE_NAME = "workqueue";  
  
    public static void main(String[] argv) throws java.io.IOException,  
            java.lang.InterruptedException  
    {  
        //区分不同工作进程的输出  
        int hashCode = Work.class.hashCode();  
        //创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        //声明队列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        System.out.println(hashCode  
                + " [*] Waiting for messages. To exit press CTRL+C");  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消费队列  
        boolean ack = false ; //打开应答机制  
        channel.basicConsume(QUEUE_NAME, ack, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
  
            System.out.println(hashCode + " [x] Received '" + message + "'");  
            doWork(message);  
            System.out.println(hashCode + " [x] Done");  
            //发送应答  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
  
        }  
  
    }  
}

4.消息持久化(Message durability)

我们已经学习了即使消费者被杀死,消息也不会被丢失。但是如果此时RabbitMQ服务被停止,我们的消息仍然会丢失

当RabbitMQ退出或者异常退出,将会丢失所有的队列和信息,除非你告诉它不要丢失。

我们需要做两件事来确保信息不会被丢失:我们需要给所有的队列消息设置持久化的标志。

  • 第一, 我们需要确认RabbitMQ永远不会丢失我们的队列。为了这样,我们需要声明它为持久化的。

    boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);

注:RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性。

  • 第二, 我们需要标识我们的信息为持久化的。通过设置MessageProperties(implements BasicProperties)值为PERSISTENT_TEXT_PLAIN。

    channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

现在你可以执行一个发送消息的程序,然后关闭服务,再重新启动服务,运行消费者程序做下实验。

5.公平转发(Fair dispatch)

对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。
造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。

int prefetchCount = 1;  
channel.basicQos(prefetchCount);

public class NewTask  
{  
    // 队列名称  
    private final static String QUEUE_NAME = "workqueue_persistence";  
  
    public static void main(String[] args) throws IOException  
    {  
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 声明队列  
        boolean durable = true;// 1、设置队列持久化  
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);  
        // 发送10条消息,依次在消息后面附加1-10个点  
        for (int i = 5; i > 0; i--)  
        {  
            String dots = "";  
            for (int j = 0; j <= i; j++)  
            {  
                dots += ".";  
            }  
            String message = "helloworld" + dots + dots.length();  
            // MessageProperties 2、设置消息持久化  
            channel.basicPublish("", QUEUE_NAME,  
                    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
        }  
        // 关闭频道和资源  
        channel.close();  
        connection.close();  
  
    }  
  
}

public class Work  
{  
    // 队列名称  
    private final static String QUEUE_NAME = "workqueue_persistence";  
  
    public static void main(String[] argv) throws java.io.IOException,  
            java.lang.InterruptedException  
    {  
        // 区分不同工作进程的输出  
        int hashCode = Work.class.hashCode();  
        // 创建连接和频道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 声明队列  
        boolean durable = true;  
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);  
        System.out.println(hashCode  
                + " [*] Waiting for messages. To exit press CTRL+C");  
        //设置最大服务转发消息数量  
        int prefetchCount = 1;  
        channel.basicQos(prefetchCount);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消费队列  
        boolean ack = false; // 打开应答机制  
        channel.basicConsume(QUEUE_NAME, ack, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
  
            System.out.println(hashCode + " [x] Received '" + message + "'");  
            doWork(message);  
            System.out.println(hashCode + " [x] Done");  
            //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
  
        }  
  
    }  
  
    /** 
     * 每个点耗时1s 
     *  
     * @param task 
     * @throws InterruptedException 
     */  
    private static void doWork(String task) throws InterruptedException  
    {  
        for (char ch : task.toCharArray())  
        {  
            if (ch == '.')  
                Thread.sleep(1000);  
        }  
    }  
}

三:发布/订阅

工作队列中的一个任务只会发给一个工作者

就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。

为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,

一类程序发动日志,另一类程序接收和处理日志。

我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。

1:转发器(Exchanges)

RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。

可用的转发器类型:

  • Direct
  • Topic
  • Headers
  • Fanout

声明转发器类型的代码:

channel.exchangeDeclare("logs","fanout");

fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的

2、匿名转发器(nameless exchange)

前面说到生产者只能发送消息给转发器(Exchange),但是我们前两篇博客中的例子并没有使用到转发器,我们仍然可以发送和接收消息。

这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。

现在我们可以指定消息发送到的转发器:

channel.basicPublish( "logs","", null, message.getBytes());

3、临时队列(Temporary queues)

前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。
不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事:
第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。

String queueName = channel.queueDeclare().getQueue();

一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似

4、绑定(Bindings)

我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。

channel.queueBind(queueName, “logs”, ””)

参数1:队列名称 ;参数2:转发器名称

5、完整的例子

1.创建发送器
public class EmitLog
{
    private final static String EXCHANGE_NAME = "ex_log";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("localhost");
        //创建连接
        Connection connection = factory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        // 声明转发器和类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
        //创建发送的数据
        String message = new Date().toLocaleString()+" : log something";
        // 往转发器上发送消息
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();

    }

}
2.创建接收器,数据写进文件里
public class ReceiveLogsToSave
{
    private final static String EXCHANGE_NAME = "ex_log";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {

        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("localhost");
        //创建连接
        Connection connection = factory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        // 声明转发器和类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 创建一个非持久的、唯一的且自动删除的队列,临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 为转发器指定队列,设置binding,绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 指定接收者,第二个参数为自动应答,无需手动应答
        channel.basicConsume(queueName, true, consumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            print2File(message);
        }

    }

    private static void print2File(String msg)
    {
        try
        {
            String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
            String logFileName = new SimpleDateFormat("yyyy-MM-dd")
                    .format(new Date());
            File file = new File(dir, logFileName+".txt");
            FileOutputStream fos = new FileOutputStream(file, true);
            fos.write((msg + "\r\n").getBytes());
            fos.flush();
            fos.close();
        } catch (FileNotFoundException e)
        {
            e.printStackTrace();
        } catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}
3.创建接收器,打印出信息
public class ReceiveLogsToConsole
{
    private final static String EXCHANGE_NAME = "ex_log";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("localhost");
        //创建连接
        Connection connection = factory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        // 声明转发器和类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 创建一个非持久的、唯一的且自动删除的队列
        String queueName = channel.queueDeclare().getQueue();
        // 为转发器指定队列,设置binding
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 指定接收者,第二个参数为自动应答,无需手动应答
        channel.basicConsume(queueName, true, consumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");

        }

    }

}

四:路由(Routing)

需求:本篇博客我们准备给日志系统添加新的特性,让日志接收者能够订阅部分消息。例如,我们可以仅仅将致命的错误写入日志文件,然而仍然在控制面板上打印出所有的其他类型的日志消息。

1、绑定(Bindings)

在上一篇博客中我们已经使用过绑定

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定表示转发器与队列之间的关系。

我们也可以简单的认为:队列对该转发器上的消息感兴趣。

绑定可以附带一个额外的参数routingKey。为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。下面展示如何使用绑定键(binding key)来创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。

2、直接转发(Direct exchange)

上一篇的日志系统广播所有的消息给所有的消费者。

现在想:可能希望把致命类型的错误写入硬盘,而不把硬盘空间浪费在警告或者消息类型的日志上。

之前我们使用fanout类型的转发器,但是并没有给我们带来更多的灵活性:仅仅可以愚蠢的转发。

我们将会使用direct类型的转发器进行替代。direct类型的转发器背后的路由转发算法很简单:

​ 消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。

图解:

RabbitMQ入门

img

我们可以看到direct类型的转发器与两个队列绑定。

第一个队列与绑定键orange绑定

第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。
这样的话,当一个消息附带一个选择键(routing key) orange发布至转发器将会被导向到队列Q1。消息附带一个选择键(routing key)black或者green将会被导向到Q2.所有的其他的消息将会被丢弃。

3、多重绑定(multiple bindings)

RabbitMQ入门

img

使用一个绑定键(binding key)绑定多个队列是完全合法的。如上图,一个附带选择键(routing key)的消息将会被转发到Q1和Q2。

4、发送日志(Emittinglogs)

我们将消息发送到direct类型的转发器而不是fanout类型。我们将把日志的严重性作为选择键(routing key)。这样的话,接收程序可以根据严重性来选择接收。我们首先关注发送日志的代码:

像以前一样,我们需要先创建一个转发器:

channel.exchangeDeclare(EXCHANGE_NAME,"direct");

然后我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());

为了简化代码,我们假定‘severity’是‘info’,‘warning’,‘error’中的一个。

5、订阅

接收消息的代码和前面的博客的中类似,只有一点不同:我们给我们所感兴趣的严重性类型的日志创建一个绑定。

StringqueueName = channel.queueDeclare().getQueue();
for(Stringseverity : argv){
    channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

6、完整的实例

1.创建发送者

public class EmitLogDirect
{

    private static final String EXCHANGE_NAME = "ex_logs_direct";
    private static final String[] SEVERITIES = { "info", "warning", "error" };

    public static void main(String[] argv) throws java.io.IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("localhost");
        //创建连接
        Connection connection = factory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        // 声明转发器的类型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //发送6条消息
        for (int i = 0; i < 6; i++)
        {
            String severity = getSeverity();
            String message = severity + "_log :" + UUID.randomUUID().toString();
            // 发布消息至转发器,指定routingkey
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }

    /**
     * 随机产生一种日志类型
     *
     * @return
     */
    private static String getSeverity()
    {
        Random random = new Random();
        int ranVal = random.nextInt(3);
        return SEVERITIES[ranVal];
    }
}

2.创建接受者

public class ReceiveLogsDirect
{

    private static final String EXCHANGE_NAME = "ex_logs_direct";
    private static final String[] SEVERITIES = { "info", "warning", "error" };

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置ip
        factory.setHost("localhost");
        //创建连接
        Connection connection = factory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        // 声明direct类型转发器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 创建一个非持久的、唯一的且自动删除的队列,临时队列
        String queueName = channel.queueDeclare().getQueue();

        String severity = getSeverity();

        // 指定binding_key
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
        System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }

    /**
     * 随机产生一种日志类型
     *
     * @return
     */
    private static String getSeverity()
    {
        Random random = new Random();
        int ranVal = random.nextInt(3);
        return SEVERITIES[ranVal];
    }
}

3.总结:

发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。

五:主题

1、 主题转发(Topic Exchange)

发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。

2.

3.完整例子

1.发送

public class EmitLogTopic
{

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception
    {
        // 创建连接和频道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //指定topic的转发器
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String[] routing_keys = new String[] { "kernal.info", "cron.warning","auth.info", "kernel.critical" };

        for (String routing_key : routing_keys)
        {
            String msg = UUID.randomUUID().toString();
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg.getBytes());
            System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
        }

        channel.close();
        connection.close();
    }
}

2.接收1

public class ReceiveLogsTopicForCritical
{

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception
    {
        // 创建连接和频道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明topic转发器
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 随机生成一个队列
        String queueName = channel.queueDeclare().getQueue();

        // 接收所有与kernel相关的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received routingKey = " + routingKey  + ",msg = " + message + ".");
        }
    }
}

2.接收2

public class ReceiveLogsTopicForKernel
{

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception
    {
        // 创建连接和频道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明topic转发器
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 随机生成一个队列
        String queueName = channel.queueDeclare().getQueue();

        //接收所有与kernel相关的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received routingKey = " + routingKey + ",msg = " + message + ".");
        }
    }
}

六:参考

http://blog.csdn.net/lmj623565791/article/details/37706355

http://blog.csdn.net/lmj623565791/article/details/37669573

http://blog.csdn.net/lmj623565791/article/details/37657225

http://blog.csdn.net/lmj623565791/article/details/37620057

http://blog.csdn.net/lmj623565791/article/details/37607165

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
2年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这