以下是一个使用 golang 的异步生产者发送消息的示例:
package main
import (
"fmt"
"github.com/Shopify/sarama" // 导入 sarama 包
)
func main() {
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) // 创建一个异步生产者,连接本地的 kafka broker
if err != nil {
panic(err)
}
defer func() {
if err := producer.Close(); err != nil { // 关闭生产者连接
fmt.Println("Failed to close producer: ", err)
}
}()
message := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("Hello World!")} // 创建一个消息对象
producer.Input() <- message // 发送消息到生产者的输入通道
select {
case success := <-producer.Successes(): // 监听成功发送的消息
fmt.Printf("Produced message to topic %s partition:%d offset:%d\n", success.Topic, success.Partition, success.Offset)
case err := <-producer.Errors(): // 监听发送失败的消息
fmt.Println("Failed to produce message:", err)
}
}
以上代码使用了 sarama 包来创建一个异步生产者并发送一条消息到名为 test
的主题。通过向生产者的输入通道发送 message
对象来触发生产者发送这条消息。同时,我们使用 select
语句监听成功和失败两种情况,并在控制台输出相应信息。
需要注意的是,在使用 sarama 包时,我们需要在代码中引入该包并进行相应配置,如上例中的 config
对象。具体的配置方式和参数可参考 sarama 包的文档。