ZBLOG

golang kafka 驱动包 生产者 ack 策略

在 Golang 中使用 Kafka 时,生产者的 ack 策略是指生产者等待服务器响应的方式。Kafka 提供了三种 ack 策略:0、1 和 all。

  1. acks=0

acks=0 时,生产者发送消息后不会等待服务器响应,直接返回成功。这种方式虽然速度最快,但也存在风险:如果消息没有被成功写入到任何一个副本中,则无法得知是否发送失败。

以下是使用 sarama 库进行 acks=0 发送的示例代码:

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.NoResponse

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
    Topic: "test-topic",
    Value: sarama.StringEncoder("hello world"),
}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
    panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 NoResponse 表示不需要等待服务器响应。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。

以下是使用 confluent-kafka-go 库进行 acks=0 发送的示例代码:

producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "acks":              "0",
})
if err != nil {
    panic(err)
}
defer producer.Close()

msg := &kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("hello world"),
}

deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
    panic(err)
}

e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 0 表示不需要等待服务器响应。如果发送成功,则从 deliveryChan 中接收到消息交付事件并打印相关信息;否则抛出异常。

  1. acks=1

acks=1 时,生产者发送消息后会等待主副本成功写入后返回成功,无需等待所有副本都同步完成。这种方式速度较快且可靠性较高,在大多数情况下都是推荐的方式。

以下是使用 sarama 库进行 acks=1 发送的示例代码:

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
    Topic: "test-topic",
    Value: sarama.StringEncoder("hello world"),
}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
    panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 WaitForLocal 表示等待主副本成功写入后返回成功。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。

以下是使用 confluent-kafka-go 库进行 acks=1 发送的示例代码:

producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "acks":              "1",
})
if err != nil {
    panic(err)
}
defer producer.Close()

msg := &kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("hello world"),
}

deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
    panic(err)
}

e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 1 表示等待主副本成功写入后返回成功。如果发送成功,则从 deliveryChan 中接收到消息交付事件并打印相关信息;否则抛出异常。

  1. acks=all

acks=all 时,生产者发送消息后会等待所有副本都同步完成后才返回成功。这种方式可靠性最高,但速度也最慢。

以下是使用 sarama 库进行 acks=all 发送的示例代码:

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    panic(err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
    Topic: "test-topic",
    Value: sarama.StringEncoder("hello world"),
}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
    panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

以上代码中创建了一个新的 SyncProducer 实例,并将 RequiredAcks 设置为 WaitForAll 表示等待所有副本都同步完成后才返回成功。如果发送成功,则返回该消息所在分区和偏移量;否则抛出异常。

以下是使用 confluent-kafka-go 库进行 acks=all 发送的示例代码:

producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "acks":              "all",
})
if err != nil {
    panic(err)
}
defer producer.Close()

msg := &kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("hello world"),
}

deliveryChan := make(chan kafka.Event)
err = producer.Produce(msg, deliveryChan)
if err != nil {
    panic(err)
}

e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
    fmt.Printf("Message delivered to topic %s [%d] at offset %v\n",
        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}

以上代码中创建了一个新的 Producer 实例,并将 acks 设置为 all 表示等待所有副本都同步完成后才返回成功。如果发送成功,则从 deliveryChan 中接收到消息交付事件并打印相关信息;否则抛出异常。

需要注意的是,ack 策略越高可靠性越高,但速度也会越慢。在实际使用时需要根据场景需求进行选择。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=6709

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?