Golang Kafka 体系结构:
Kafka 是一个分布式的消息队列系统,其核心思想是基于发布/订阅模式来存储和处理消息。以下是 Kafka 的体系结构:
Broker:Kafka 集群中的每个节点被称为 Broker。它们是通过 Zookeeper 协调器进行协作和管理。
Topic:每条消息都属于一个特定的主题。主题是逻辑上类似于数据表的东西,它们用来分类消息。
Partition:每个主题可以分成多个 Partition,每个 Partition 包含了一部分消息。Partition 中的每条消息都有一个唯一的序号(Offset),并且 Offset 在整个 Partition 中是连续不断的。
Producer:生产者负责向 Kafka Broker 发送消息,并且可以指定要将消息发送到哪个 Partition。
Consumer:消费者从 Kafka Broker 拉取数据,并且可以根据需要订阅一个或多个主题及其相应的 Partition。
Consumer Group:一个消费者组由多个消费者实例组成,它们共同消费同一个主题下的不同 Partition。当某个消费者实例失败时,剩余实例会重新均衡以确保各自接收相等数量的消息。
ZooKeeper:Kafka 使用 Zookeeper 进行元数据管理、Broker 故障检测、领导选举等操作。
Golang Kafka 读写机制:
在 Golang 中,我们可以使用第三方库 sarama 来进行 Kafka 的读写操作。
- 写入数据:
// 创建一个生产者对象
producer, err := sarama.NewSyncProducer([]string{"kafka-broker:9092"}, nil)
if err != nil {
panic(err)
}
defer producer.Close()
// 构造消息
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("hello, kafka!"),
}
// 发送消息
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("message sent to partition %d at offset %d\n", partition, offset)
- 读取数据:
// 创建一个消费者对象
consumer, err := sarama.NewConsumer([]string{"kafka-broker:9092"}, nil)
if err != nil {
panic(err)
}
defer consumer.Close()
// 订阅主题及其 Partition
partitionList, _ := consumer.Partitions("test-topic")
for _, partition := range partitionList {
pc, _ := consumer.ConsumePartition("test-topic", partition, sarama.OffsetNewest)
// 消费消息
go func(pc sarama.PartitionConsumer) {
defer pc.AsyncClose()
for msg := range pc.Messages() {
fmt.Printf("partition:%d\toffset:%d\tvalue:%s\n", msg.Partition, msg.Offset, string(msg.Value))
}
}(pc)
}
// 阻塞程序不退出,等待消息到来或错误发生
select {}
这里的代码实现了从 test-topic 主题中消费消息,并且输出了每个消息所在的 Partition、Offset 以及具体的消息内容。我们可以使用 consumer.Partitions() 方法获取主题下所有的 Partition,然后循环遍历每个 Partition 创建一个消费者实例去消费消息。当然,这里也可以采用 Consumer Group 的方式进行消费。




