在 Golang 中,使用第三方库 sarama 进行 Kafka 异步处理也非常简单。
- 生产者异步发送消息:
// 创建一个生产者对象
producer, err := sarama.NewAsyncProducer([]string{"kafka-broker:9092"}, nil)
if err != nil {
panic(err)
}
defer producer.AsyncClose()
// 构造消息
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("hello, kafka!"),
}
// 发送消息
producer.Input() <- msg
// 等待结果返回
select {
case success := <-producer.Successes():
fmt.Printf("message sent to partition %d at offset %d\n", success.Partition, success.Offset)
case err := <-producer.Errors():
fmt.Printf("failed to send message: %s\n", err.Error())
}
这里使用了 NewAsyncProducer() 方法创建了一个异步生产者实例。在向 Kafka Broker 发送消息时,我们将消息通过 Input() 方法写入到生产者的输入通道中,并且在 select 语句中等待结果返回。如果发送成功,则会从 Successes() 通道中获取到成功信息;如果发送失败,则会从 Errors() 通道中获取到错误信息。
- 消费者异步消费消息:
// 创建一个消费者对象
consumer, err := sarama.NewConsumer([]string{"kafka-broker:9092"}, nil)
if err != nil {
panic(err)
}
defer consumer.Close()
// 订阅主题及其 Partition
partitionList, _ := consumer.Partitions("test-topic")
for _, partition := range partitionList {
pc, _ := consumer.ConsumePartition("test-topic", partition, sarama.OffsetNewest)
// 异步消费消息
go func(pc sarama.PartitionConsumer) {
defer pc.AsyncClose()
for msg := range pc.Messages() {
fmt.Printf("partition:%d\toffset:%d\tvalue:%s\n", msg.Partition, msg.Offset, string(msg.Value))
// 处理完毕后异步提交 Offset
pc.AsyncCommitOffset()
}
}(pc)
}
// 阻塞程序不退出,等待消息到来或错误发生
select {}
这里使用了 ConsumePartition() 方法创建了一个消费者实例,并且在循环遍历每个 Partition 时启动了一个 goroutine 来异步消费消息。当从 Messages() 通道中获取到一条消息时,我们可以进行相应的处理,并且通过 AsyncCommitOffset() 方法异步提交 Offset。这样就可以确保在出现异常情况下也能够正确地提交 Offset。
总之,在 Golang 中使用 sarama 库进行 Kafka 的异步处理非常方便和灵活,可以有效提高应用程序的性能和可靠性。




