在实际应用中,Kafka 经常被用作日志处理的工具。Golang 提供了多个开源库来方便地使用 Kafka 进行日志处理。下面介绍两个常用的库:sarama 和 confluent-kafka-go。
- 使用 sarama 库进行日志处理
sarama 是一个支持生产者和消费者 API 的 Golang 库,可以与 Apache Kafka 版本 0.8 或更高版本兼容。它提供了一组简单而强大的 API 来访问 Kafka 集群,并支持消息批量发送和接收、异步发送和接收以及基于通道的消息读取等功能。
以下是使用 sarama 库进行日志处理的示例代码:
package main
import (
"fmt"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
// 发送一条带有时间戳的消息
timestamp := time.Now().UTC().Format(time.RFC3339Nano)
msg := &sarama.ProducerMessage{
Topic: "logs",
Partition: int32(-1),
Key: sarama.StringEncoder("key"),
Value: sarama.StringEncoder(fmt.Sprintf("[%s] Hello, world!", timestamp)),
Timestamp: time.Now().UTC(),
}
_, _, err = producer.SendMessage(msg)
if err != nil {
panic(err)
}
// 创建一个消费者组,订阅 "logs" 主题的所有分区
consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
if err != nil {
panic(err)
}
defer consumer.Close()
// 处理从 Kafka 中读取的消息
go func() {
for msg := range consumer.Messages() {
fmt.Printf("Received message: %s\n", string(msg.Value))
consumer.MarkOffset(msg, "") // 标记消息已被消费
}
}()
// 捕获中断信号并优雅地退出程序
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, os.Interrupt)
<-sigterm
}
该示例代码中首先创建了一个生产者实例,然后发送一条带有时间戳的消息到名为 logs
的主题。接下来,创建一个消费者组并订阅 logs
主题的所有分区,并在另一个 goroutine 中处理从 Kafka 中读取的消息。最后,通过捕获中断信号来优雅地退出程序。
- 使用 confluent-kafka-go 库进行日志处理
confluent-kafka-go 是另一个用于 Golang 和 Apache Kafka 集成的开源库。它是 Confluent 公司维护的 Kafka 官方客户端之一,支持高级功能如事务、压缩、SSL 和 SASL 等。它还提供了易于使用的 API 来处理 Kafka 日志。
以下是使用 confluent-kafka-go 库进行日志处理的示例代码:
package main
import (
"fmt"
"os"
"os/signal"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
panic(err)
}
defer producer.Close()
// 发送一条带有时间戳的消息
timestamp := time.Now().UTC().Format(time.RFC3339Nano)
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte("key"),
Value: []byte(fmt.Sprintf("[%s] Hello, world!", timestamp)),
Timestamp: time.Now(),
}
err = producer.Produce(msg, nil)
if err != nil {
panic(err)
}
// 创建一个消费者实例,订阅 "logs" 主题的所有分区
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "my-group",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
})
if err != nil {
panic(err)
}
defer consumer.Close()
// 订阅主题并处理从 Kafka 中读取的消息
consumer.SubscribeTopics([]string{"logs"}, nil)
for {
msg, err := consumer.ReadMessage(-1)
if err != nil {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
continue
}
fmt.Printf("Received message: %s\n", string(msg.Value))
consumer.CommitMessage(msg) // 标记消息已被消费
}
// 捕获中断信号并优雅地退出程序
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, os.Interrupt)
<-sigterm
}
该示例代码中首先创建了一个生产者实例,然后发送一条带有时间戳的消息到名为 logs
的主题。接下来,创建一个消费者实例并订阅 logs
主题的所有分区,并在 for 循环中处理从 Kafka 中读取的消息。最后,通过捕获中断信号来优雅地退出程序。
无论是使用 sarama 还是 confluent-kafka-go 库进行日志处理,都需要注意以下几点:
- 配置合理的参数值以满足应用场景需求;
- 处理完每个消息后要及时提交 offset,以确保不会重复消费或漏掉任何消息;
- 使用异步 API 或 goroutine 处理大量的消息记录时需要注意资源占用问题。