-
Notifications
You must be signed in to change notification settings - Fork 64
Expand file tree
/
Copy pathRabbitMQ-Topics.java
More file actions
132 lines (125 loc) · 4.09 KB
/
Copy pathRabbitMQ-Topics.java
File metadata and controls
132 lines (125 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
----------------------------
RabbitMQ-Topics |
----------------------------
# 通配模式
# 其实跟路由模式(Routing)一个德行
# 区别就是
1,Routing是消息的key和队列的key必须完全一致
2,这里是只需要符合匹配的规则
# 匹配规则
* 通配符有两种
1,#
一个或者多个词儿
2,*
只能是一个词儿
# 图解
- error.* ->
|- info.# -> [][][] C1
P -> X
|- *.error ->
|- #.info -> [][][] Cn
- #warn* ->
p :消息生产者
X :交换机(类型是:direct(路由))
C1 :消费者1
C2 :消费者N
/**
工具
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/kevinblandy");
factory.setUsername("kevin");
factory.setPassword("a12551255");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
/**
生产者
*/
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange(交换机)
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "Hello World!";
/**
绑定消息 key = key.1
*/
channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
/**
消费者1
*/
private final static String QUEUE_NAME = "test_queue_topic_work";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*
绑定队列到交换机
并且指定通配符 = key.*
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
/**
消费者N
*/
private final static String QUEUE_NAME = "test_queue_topic_work2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*
绑定队列到交换机
并且指定通配符 = *.*
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}