在 Golang 中使用 Kafka 时,生产者的 ack 策略是指生产者等待服务器响应的方式。Kafka 提供了三种 ack 策略:0、1 和 all。
- acks=0
当 acks=0
时,生产者发送消息后不会等待服务器响应,直接返回成功。这种方式虽然速度最快,但也存在风险:如果消息没有被成功写入到任何一个副本中,则无法得知是否发送失败。
以下是使用 sarama 库进行 acks=0
发送的示例代码:
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.NoResponse
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("hello world"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 NoResponse 表示不需要等待服务器响应。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。
以下是使用 confluent-kafka-go 库进行 acks=0
发送的示例代码:
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"acks": "0",
})
if err != nil {
panic(err)
}
defer producer.Close()
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("hello world"),
}
deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
panic(err)
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 0 表示不需要等待服务器响应。如果发送成功,则从 deliveryChan
中接收到消息交付事件并打印相关信息;否则抛出异常。
- acks=1
当 acks=1
时,生产者发送消息后会等待主副本成功写入后返回成功,无需等待所有副本都同步完成。这种方式速度较快且可靠性较高,在大多数情况下都是推荐的方式。
以下是使用 sarama 库进行 acks=1
发送的示例代码:
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("hello world"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 WaitForLocal 表示等待主副本成功写入后返回成功。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。
以下是使用 confluent-kafka-go 库进行 acks=1
发送的示例代码:
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"acks": "1",
})
if err != nil {
panic(err)
}
defer producer.Close()
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("hello world"),
}
deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
panic(err)
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 1 表示等待主副本成功写入后返回成功。如果发送成功,则从 deliveryChan
中接收到消息交付事件并打印相关信息;否则抛出异常。
- acks=all
当 acks=all
时,生产者发送消息后会等待所有副本都同步完成后才返回成功。这种方式可靠性最高,但速度也最慢。
以下是使用 sarama 库进行 acks=all
发送的示例代码:
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("hello world"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 WaitForAll 表示等待所有副本都同步完成后才返回成功。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。
以下是使用 confluent-kafka-go 库进行 acks=all
发送的示例代码:
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"acks": "all",
})
if err != nil {
panic(err)
}
defer producer.Close()
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("hello world"),
}
deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
panic(err)
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 all 表示等待所有副本都同步完成后才返回成功。如果发送成功,则从 deliveryChan
中接收到消息交付事件并打印相关信息;否则抛出异常。
需要注意的是,ack 策略越高可靠性越高,但速度也会越慢。在实际使用时需要根据场景需求进行选择。