在 Golang 中,你可以使用 RabbitMQ 客户端库 AMQP 来同时监听多个队列。以下是一个简单的示例代码:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 创建 channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("failed to open a channel: %v", err)
}
defer ch.Close()
// 创建队列并绑定 exchange
q1, err := ch.QueueDeclare(
"queue1",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("failed to declare queue1: %v", err)
}
q2, err := ch.QueueDeclare(
"queue2",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("failed to declare queue2: %v", err)
}
// 绑定 exchange 和队列
if err = ch.QueueBind(q1.Name, "", "exchange1", false, nil); err != nil {
log.Fatalf("failed to bind queue1 to exchange1: %v", err)
}
if err = ch.QueueBind(q2.Name, "", "exchange2", false, nil); err != nil {
log.Fatalf("failed to bind queue2 to exchange2: %v", err)
}
// 消费消息
msgs1, err := ch.Consume(q1.Name, "", true, false, false, false, nil)
if err != nil {
log.Fatalf("failed to consume messages from queue1: %v", err)
}
msgs2, err := ch.Consume(q2.Name, "", true, false, false, false, nil)
if err != nil {
log.Fatalf("failed to consume messages from queue2: %v", err)
}
// 在两个 channel 上循环读取消息
for {
select {
case msg := <-msgs1:
fmt.Printf("received message from queue1: %s\n", string(msg.Body))
case msg := <-msgs2:
fmt.Printf("received message from queue2: %s\n", string(msg.Body))
}
}
}
在这个示例中,我们首先连接到 RabbitMQ 服务器并创建一个 channel。然后,我们使用 QueueDeclare
创建两个队列,并使用 QueueBind
将它们绑定到相应的 exchange 上。
接下来,我们使用 Consume
方法分别从两个队列中消费消息。在一个无限循环中,我们通过对两个 channel 的监听来同时处理来自不同队列的消息。
当有消息到达时,将会通过 select 语句选择合适的 channel 并打印出消息内容。
注意:以上代码只是简单的示例,实际应用需要更加健壮和完善的错误处理和逻辑控制。