ZBLOG

golang kafka 驱动包 生产者 ack 策略

在 Golang 中使用 Kafka 时,生产者的 ack 策略是指生产者等待服务器响应的方式。Kafka 提供了三种 ack 策略:0、1 和 all。

  1. acks=0

acks=0 时,生产者发送消息后不会等待服务器响应,直接返回成功。这种方式虽然速度最快,但也存在风险:如果消息没有被成功写入到任何一个副本中,则无法得知是否发送失败。

以下是使用 sarama 库进行 acks=0 发送的示例代码:

  1. config := sarama.NewConfig()
  2. config.Producer.RequiredAcks = sarama.NoResponse
  3. producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  4. if err != nil {
  5. panic(err)
  6. }
  7. defer producer.Close()
  8. msg := &sarama.ProducerMessage{
  9. Topic: "test-topic",
  10. Value: sarama.StringEncoder("hello world"),
  11. }
  12. partition, offset, err := producer.SendMessage(msg)
  13. if err != nil {
  14. panic(err)
  15. }
  16. fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 NoResponse 表示不需要等待服务器响应。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。

以下是使用 confluent-kafka-go 库进行 acks=0 发送的示例代码:

  1. producer, err := kafka.NewProducer(&kafka.ConfigMap{
  2. "bootstrap.servers": "localhost:9092",
  3. "acks": "0",
  4. })
  5. if err != nil {
  6. panic(err)
  7. }
  8. defer producer.Close()
  9. msg := &kafka.Message{
  10. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  11. Value: []byte("hello world"),
  12. }
  13. deliveryChan := make(chan kafka.Event)
  14. err = producer.Produce(msg, deliveryChan)
  15. if err != nil {
  16. panic(err)
  17. }
  18. e := <-deliveryChan
  19. m := e.(*kafka.Message)
  20. if m.TopicPartition.Error != nil {
  21. fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
  22. } else {
  23. fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
  24. *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
  25. }

以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 0 表示不需要等待服务器响应。如果发送成功,则从 deliveryChan 中接收到消息交付事件并打印相关信息;否则抛出异常。

  1. acks=1

acks=1 时,生产者发送消息后会等待主副本成功写入后返回成功,无需等待所有副本都同步完成。这种方式速度较快且可靠性较高,在大多数情况下都是推荐的方式。

以下是使用 sarama 库进行 acks=1 发送的示例代码:

  1. config := sarama.NewConfig()
  2. config.Producer.RequiredAcks = sarama.WaitForLocal
  3. producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  4. if err != nil {
  5. panic(err)
  6. }
  7. defer producer.Close()
  8. msg := &sarama.ProducerMessage{
  9. Topic: "test-topic",
  10. Value: sarama.StringEncoder("hello world"),
  11. }
  12. partition, offset, err := producer.SendMessage(msg)
  13. if err != nil {
  14. panic(err)
  15. }
  16. fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 WaitForLocal 表示等待主副本成功写入后返回成功。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。

以下是使用 confluent-kafka-go 库进行 acks=1 发送的示例代码:

  1. producer, err := kafka.NewProducer(&kafka.ConfigMap{
  2. "bootstrap.servers": "localhost:9092",
  3. "acks": "1",
  4. })
  5. if err != nil {
  6. panic(err)
  7. }
  8. defer producer.Close()
  9. msg := &kafka.Message{
  10. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  11. Value: []byte("hello world"),
  12. }
  13. deliveryChan := make(chan kafka.Event)
  14. err = producer.Produce(msg, deliveryChan)
  15. if err != nil {
  16. panic(err)
  17. }
  18. e := <-deliveryChan
  19. m := e.(*kafka.Message)
  20. if m.TopicPartition.Error != nil {
  21. fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
  22. } else {
  23. fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
  24. *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
  25. }

以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 1 表示等待主副本成功写入后返回成功。如果发送成功,则从 deliveryChan 中接收到消息交付事件并打印相关信息;否则抛出异常。

  1. acks=all

acks=all 时,生产者发送消息后会等待所有副本都同步完成后才返回成功。这种方式可靠性最高,但速度也最慢。

以下是使用 sarama 库进行 acks=all 发送的示例代码:

  1. config := sarama.NewConfig()
  2. config.Producer.RequiredAcks = sarama.WaitForAll
  3. producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
  4. if err != nil {
  5. panic(err)
  6. }
  7. defer producer.Close()
  8. msg := &sarama.ProducerMessage{
  9. Topic: "test-topic",
  10. Value: sarama.StringEncoder("hello world"),
  11. }
  12. partition, offset, err := producer.SendMessage(msg)
  13. if err != nil {
  14. panic(err)
  15. }
  16. fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 WaitForAll 表示等待所有副本都同步完成后才返回成功。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。

以下是使用 confluent-kafka-go 库进行 acks=all 发送的示例代码:

  1. producer, err := kafka.NewProducer(&kafka.ConfigMap{
  2. "bootstrap.servers": "localhost:9092",
  3. "acks": "all",
  4. })
  5. if err != nil {
  6. panic(err)
  7. }
  8. defer producer.Close()
  9. msg := &kafka.Message{
  10. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  11. Value: []byte("hello world"),
  12. }
  13. deliveryChan := make(chan kafka.Event)
  14. err = producer.Produce(msg, deliveryChan)
  15. if err != nil {
  16. panic(err)
  17. }
  18. e := <-deliveryChan
  19. m := e.(*kafka.Message)
  20. if m.TopicPartition.Error != nil {
  21. fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
  22. } else {
  23. fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
  24. *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
  25. }

以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 all 表示等待所有副本都同步完成后才返回成功。如果发送成功,则从 deliveryChan 中接收到消息交付事件并打印相关信息;否则抛出异常。

需要注意的是,ack 策略越高可靠性越高,但速度也会越慢。在实际使用时需要根据场景需求进行选择。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=6709

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?