在 Golang 中使用 Kafka 时,可以使用第三方驱动包来进行消息生产和消费。其中,常用的驱动包有 sarama 和 confluent-kafka-go。
对于生产者消息生产策略,Kafka 提供了两种方式:同步发送和异步发送。
- 同步发送
在同步发送模式下,当生产者调用 SendMessage() 方法时,会阻塞直到消息被成功写入或发生错误。这种方式虽然简单易用,但是由于需要等待服务器的响应,在高并发场景下可能会降低整体吞吐量。
以下是使用 sarama 库进行同步发送的示例代码:
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("hello world"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
以上代码中创建了一个新的 SyncProducer 实例,并通过 SendMessage() 方法向主题为 test-topic 的分区发送一条字符串消息。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。
以下是使用 confluent-kafka-go 库进行同步发送的示例代码:
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
})
if err != nil {
panic(err)
}
defer producer.Close()
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("hello world"),
}
deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
panic(err)
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
以上代码中创建了一个新的 Producer 实例,并通过 Produce() 方法向主题为 test-topic 的分区发送一条字符串消息。如果发送成功,则从 deliveryChan 中接收到消息交付事件并打印相关信息;否则抛出异常。
- 异步发送
在异步发送模式下,生产者不会等待服务器响应,而是将消息存储在本地缓冲区中,并立即返回。这种方式可以提高整体吞吐量,但需要注意缓冲区溢出的问题。
以下是使用 sarama 库进行异步发送的示例代码:
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("hello world"),
}
producer.Input() <- msg
select {
case success := <-producer.Successes():
fmt.Printf("Message sent to partition %d at offset %d\n", success.Partition, success.Offset)
case err := <-producer.Errors():
panic(err.Err)
}
以上代码中创建了一个新的 AsyncProducer 实例,并通过 Input() 方法将一条字符串消息发送到主题为 test-topic 的分区。然后使用 select 语句接收异步发送结果,如果发送成功则打印相关信息;否则抛出异常。
以下是使用 confluent-kafka-go 库进行异步发送的示例代码:
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
})
if err != nil {
panic(err)
}
defer producer.Close()
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("hello world"),
}
deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
panic(err)
}
select {
case e := <-deliveryChan:
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
case <-time.After(3 * time.Second):
fmt.Println("Delivery timed out")
}
以上代码中创建了一个新的 Producer 实例,并通过 Produce() 方法将一条字符串消息发送到主题为 test-topic 的分区。然后使用 select 语句等待异步发送结果,如果在 3 秒内未收到交付事件,则认为发送失败。
需要注意的是,在异步发送模式下,生产者会将消息缓存在本地缓冲区中,并尝试批量发送以提高效率。因此,当缓冲区已满时可能会发生阻塞或丢失消息的情况。可以通过调整驱动包的配置参数来解决该问题。




