RabbitMQ(四):路由(Routing)

前言

先决条件
✔️ 正确安装 RabbitMQ 并将其运行在 localhost:5672 上
✔️ 已经了解了 RabbitMQ 中的一些基础概念

上一个教程中,我们构建了一个简单的 fanout exchange,从而能够向许多消费者广播消息。

在本文中,我们将实现另一个功能——只订阅一部分消息。例如,我们只需要把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中

绑定(Binding)

在之前的例子中,我们已经创建了绑定。可以在我们的Tut3Config文件中回忆一下这样的代码:

1
2
3
4
@Bean
public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

绑定(binding)是指交换器(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换器(exchange)的消息感兴趣。
绑定可以使用一个额外的参数routingKey。我们将交换器和队列传入到BindingBuilder,并将routingKey绑定到交换器,如下所示:

1
2
3
4
5
6
@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(direct)
.with("orange");
}

routingKey含义取决于交换类型。比如我们之前使用的 fanout exchange,会忽略它的值。

Direct exchange

我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。

我们使用的 fanout exchange 没有足够的灵活性——它能做的仅仅是广播。

我们将会使用 direct exchange 来代替。路由的算法很简单——交换器将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。

下图能够很好的描述这个场景:

在这个场景中,我们可以看到 direct exchange X 和两个队列进行了绑定。第一个队列使用 orange 作为绑定键,第二个队列有两个绑定,一个使用 black 作为绑定键,另外一个使用 green。

这样以来,当路由键为 orange 的消息发布到交换器,就会被路由到队列 Q1。路由键为 black 或者 green 的消息就会路由到 Q2。其他的所有消息都将会被丢弃。

多个绑定(Multiple bindings)


多个队列使用相同的绑定键是可以的。这个例子中,我们可以添加一个 X 和 Q1 之间的绑定,使用 black 绑定键。这样一来,direct exchange 就和 fanout exchange 的行为一样,会将消息广播到所有匹配的队列。带有 black 路由键的消息会同时发送到 Q1 和 Q2。

发布消息

我们将使用以上这个模型作为我们的路由系统,将消息发送到 direct exchange 而不是 fanout exchange。我们将使用颜色作为路由键,这样消费者将能通过选择想要接收(或订阅)的颜色来消费对应的消息。

我们在Tut4Config中做一些 Spring 启动配置,需要先建立一个交换器

1
2
3
4
@Bean
public DirectExchange direct() {
return new DirectExchange("tut.direct");
}

接收消息的方式与上一个教程中的一样,但也有一些不同——我们需要为每个感兴趣的颜色创建一个新的绑定。

1
2
3
4
5
@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(direct)
.with("orange");

代码整合

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Tut4Sender {

@Autowired
private AmqpTemplate template;

@Autowired
private DirectExchange direct;

private int index;

private int count;

private final String[] keys = {"orange", "black", "green"};

@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello to ");
if (++this.index == 3) {
this.index = 0;
}
String key = keys[this.index];
builder.append(key).append(' ').append(Integer.toString(++this.count));
String message = builder.toString();
template.convertAndSend(direct.getName(), key, message);
System.out.println(" [x] Sent '" + message + "'");
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Tut4Receiver {

@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receiver1(String in) throws InterruptedException {
receiver(in, 1);
}

@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receiver2(String in) throws InterruptedException {
receiver(in, 2);
}

private void receiver(String in, int instance) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + instance + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + instance + " [x] Done in " +
watch.getTotalTimeSeconds() + "s");
}

private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}

}

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Profile({"tut4", "routing"})
@Configuration
public class Tut4Config {


@Bean
public DirectExchange direct() {
return new DirectExchange("tut.direct");
}

@Profile("receiver")
private static class ReceiverConfig {

@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(direct)
.with("orange");
}

@Bean
public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(direct)
.with("black");
}

@Bean
public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2)
.to(direct)
.with("green");
}

@Bean
public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2)
.to(direct)
.with("black");
}

@Bean
public Tut4Receiver receiver() {
return new Tut4Receiver();
}

}

@Profile("sender")
@Bean
public Tut4Sender sender() {
return new Tut4Sender();
}

}

运行

maven 编译

1
mvn clean package -Dmaven.test.skip=true

运行

1
2
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut4,receiver  --tutorial.client.duration=60000
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut4,sender --tutorial.client.duration=60000

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Sender
Ready ... running for 60000ms
[x] Sent 'Hello to black 1'
[x] Sent 'Hello to green 2'
[x] Sent 'Hello to orange 3'
[x] Sent 'Hello to black 4'

// Receiver
Ready ... running for 60000ms
instance 2 [x] Received 'Hello to black 1'
instance 1 [x] Received 'Hello to black 1'
instance 2 [x] Done in 0.0s
instance 1 [x] Done in 0.0s
instance 2 [x] Received 'Hello to green 2'
instance 2 [x] Done in 0.0s
instance 1 [x] Received 'Hello to orange 3'
instance 1 [x] Done in 0.0s
instance 1 [x] Received 'Hello to black 4'
instance 1 [x] Done in 0.0s
instance 2 [x] Received 'Hello to black 4'
instance 2 [x] Done in 0.0s

代码地址:https://github.com/zhaoyibo/rabbitmq-tutorial
相关文章:

  1. RabbitMQ(零):基础概念
  2. RabbitMQ(一):Hello World
  3. RabbitMQ(二):工作队列(Work queues)
  4. RabbitMQ(三):发布订阅(Publish/Subscribe)
  5. RabbitMQ(四):路由(Routing)
  6. RabbitMQ(五):主题(Topics)
  7. RabbitMQ(六):远程过程调用(RPC)

参考

RabbitMQ Tutorial Four


RabbitMQ(四):路由(Routing)
https://www.haoyizebo.com/posts/4c88b68f/
作者
一博
发布于
2018年4月13日
许可协议