Kafka主题分区时不要丢失消息

关于负载均衡策略的快速介绍。使用 Golang IBM/sarama 在 Kafka 主题上消费新添加的分区中的事件。

译自 Do not miss messages when Kafka Topic Partitioned,作者 Emre Savcı。

简介

在事件驱动通信时代,Kafka是事实上的标准消息代理之一,它具有主题和消费者组的概念。

在Kafka中,一个主题可以有多个分区,因此可以通过这种方式提高消息处理的并行性。这使我们能够将消费者应用程序扩展到多个实例。

使用Kafka时,可能会向主题添加新的分区。如果配置不正确,消费者可能会错过新分区中的消息,因此进行适当的设置非常重要。

在本文中,我将向您展示如何在本地运行Kafka代理,然后配置消费者以从主题消费消息。在消费主题的同时,我们将创建新的分区,并观察我们的消费者如何自动接收来自新分区的消息。

我们将使用Golang作为编程语言,并使用IBM/sarama作为Kafka库。

https://kajalrawal.medium.com/kafka-broker-kafka-topic-consumer-and-record-flow-in-kafka-ec55104977b8

在Docker中运行Kafka

为了测试我们的消费者和生产者,我们将使用Docker快速启动一个Kafka代理。将以下docker compose配置复制并粘贴到docker-compose.yml中:

version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:7.4.4
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka-1:
    image: confluentinc/cp-kafka:7.4.4
    depends_on:
      - zookeeper-1
    ports:
      - 29092:29092
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092,PLAINTEXT_HOST://127.0.0.1:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

运行容器:

> docker-compose up -d

WARN[0000] /Users/emre.savci/Desktop/my-projects/kafka/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion
WARN[0000] Found orphan containers ([kafka-kafka-2-1 kafka-zookeeper-2-1]) for this project. If you removed or renamed this service in your compose file, you can run this command with the --remove-orphans flag to clean it up.
[+] Running 2/2
 ✔ Container kafka-zookeeper-1-1  Started                                                                                                                                  0.3s
 ✔ Container kafka-kafka-1-1      Started     

负载均衡策略

为了处理分区,我们可以根据用例选择不同的负载均衡策略。

我将使用Sarama的源代码注释来解释负载均衡策略。

轮询

此策略返回轮询负载均衡策略,该策略以交替顺序将分区分配给成员。

例如,有两个主题(t0,t1)和两个消费者(m0,m1),每个主题有三个分区(p0,p1,p2):

M0: [t0p0, t0p2, t1p1] M1: [t0p1, t1p0, t1p2]

范围

此策略返回范围负载均衡策略,这是默认策略,它将分区作为范围分配给消费者组成员。

这遵循与https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html相同的逻辑。

两个主题T1和T2,每个主题有六个分区(0..5)和两个成员(M1,M2)的示例:

M1: {T1: [0, 1, 2], T2: [0, 1, 2]} 
M2: {T1: [3, 4, 5], T2: [3, 4, 5]}

粘性

此策略返回粘性负载均衡策略,该策略尝试保留之前的分配,同时保持均衡的分区分配。

主题T有六个分区(0..5)和两个成员(M1,M2)的示例:

M1: {T: [0, 2, 4]} 
M2: {T: [1, 3, 5]}

在重新分配和添加额外消费者后,您可能会得到如下分配计划:

M1: {T: [0, 2]} 
M2: {T: [1, 3]} 
M3: {T: [4, 5]}

演示

示例消费者

type ExampleConsumer struct {
 Ready chan bool
}

// Setup is run before the consumer starts consuming, providing an opportunity to setup things.
func (consumer *ExampleConsumer) Setup(session sarama.ConsumerGroupSession) error {
 // Mark the consumer as ready
 close(consumer.Ready)
 fmt.Println("Setup ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims())

 return nil
}

// Cleanup is run at the end of a session, to cleanup resources.
func (consumer *ExampleConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
 fmt.Println("Cleanup ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims())
 return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *ExampleConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 fmt.Println("ConsumeClaim ", "memberid ", session.MemberID(), "sessionid ", session.GenerationID(), "claims ", session.Claims())

 for message := range claim.Messages() {
  fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partition = %d\n", string(message.Value), message.Timestamp, message.Topic, message.Partition)
  session.MarkMessage(message, "")
 }
 return nil
}

配置和运行

现在让我们配置并运行消费者:

package main

import (
 "context"
 "fmt"
 "log"
 "time"

 "github.com/IBM/sarama"
)

func main() {
 brokers := []string{"localhost:9092"}
 topic := "example-topic"
 groupID := "example-group"

 config := sarama.NewConfig()
 config.Version = sarama.V3_3_0_0 // Set Kafka version
 config.Consumer.Group.Heartbeat.Interval = 1 * time.Second
 config.Consumer.Offsets.Initial = sarama.OffsetOldest
 config.Metadata.RefreshFrequency = time.Minute * 1
 config.Metadata.Full = false

 // Set up group strategies
 config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
  sarama.NewBalanceStrategyRoundRobin(),
 }

 // Create a new consumer group
 consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
 if err != nil {
  log.Fatalf("Error creating consumer group: %v", err)
 }
 defer consumerGroup.Close()

 ctx := context.Background()
 consumer := &ExampleConsumer{Ready: make(chan bool)}

 // Start consumer group session
 go func() {
  for {
   if err := consumerGroup.Consume(ctx, []string{topic}, consumer); err != nil {
    log.Fatalf("Error in consumer group: %v", err)
   }
   // Check if context was canceled, signaling the application to stop
   if ctx.Err() != nil {
    fmt.Println("Context was canceled")
    return
   }
   // Reset consumer ready channel
   consumer.Ready = make(chan bool)
  }
 }()

 // Wait for consumer to be ready
 <-consumer.Ready
 log.Println("Consumer group is ready")

 <-make(chan struct{})
}

请注意,我们选择轮询作为我们的平衡策略

生产者代码

我们将从生产者开始,自动将消息发送到主题中的每个分区。

package main

import (
 "fmt"
 "time"

 "github.com/IBM/sarama"
)

func main() {
 brokers := []string{"localhost:9092"}
 topic := "example-topic"

 config := sarama.NewConfig()
 config.Version = sarama.V3_3_0_0 // Set Kafka version
 config.Metadata.RefreshFrequency = 1 * time.Minute

 config.Producer.Return.Successes = true
 config.Producer.RequiredAcks = sarama.WaitForAll
 config.Producer.Retry.Max = 3
 config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

 producer, err := sarama.NewSyncProducer(brokers, config)
 if err != nil {
  fmt.Println("Error creating producer: ", err)
  return
 }
 defer producer.Close()

 for {
  partition, _, err := producer.SendMessage(&sarama.ProducerMessage{
   Topic: topic,
   Value: sarama.StringEncoder("Hello, World!"),
  })
  if err != nil {
   fmt.Println("Error producing message: ", err)
  } else {
   fmt.Println("Produced message to partition ", partition)
  }
  time.Sleep(2 * time.Second)
 }
}

现在我们可以看到我们的消费者在运行了。

整体运行

我将启动前面提供的消费者和生产者代码。等待几秒钟后,我们将向Kafka主题添加第二个分区,然后是第三个分区。下面将提供消费者日志,以演示消费者如何处理这些更改。

运行代码

.../consumer > go run main.go
.../producer > go run main.go

创建分区

让我们进入我们的Kafka容器,并在我们的消费者和生产者运行时运行以下命令:

> docker exec -it <container-id> /bin/sh
> ./bin/kafka-topics --bootstrap-server localhost:9092 \
--alter --topic example-topic \
--partitions 2

观察日志,您将看到消费者开始监听新的分区。几分钟后,让我们创建另一个分区:

> ./bin/kafka-topics --bootstrap-server localhost:9092 \
--alter --topic example-topic \
--partitions 3

同样,我们的消费者能够消费新的分区。

以下是消费者日志:

Setup  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  1 claims  map[example-topic:[0]]
2024/12/13 22:25:21 Consumer group is ready
ConsumeClaim  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  1 claims  map[example-topic:[0]]
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Cleanup  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  1 claims  map[example-topic:[0]]
Setup  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  2 claims  map[example-topic:[0 1]]
ConsumeClaim  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  2 claims  map[example-topic:[0 1]]
ConsumeClaim  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  2 claims  map[example-topic:[0 1]]
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Cleanup  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  2 claims  map[example-topic:[0 1]]
Setup  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  3 claims  map[example-topic:[0 1 2]]
ConsumeClaim  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  3 claims  map[example-topic:[0 1 2]]
ConsumeClaim  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  3 claims  map[example-topic:[0 1 2]]
ConsumeClaim  memberid  sarama-287eb50b-4f02-49ab-a827-ced69a4fadd6 sessionid  3 claims  map[example-topic:[0 1 2]]
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 2
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1
Hello, World!, example-topic, partition: 2
Hello, World!, example-topic, partition: 0
Hello, World!, example-topic, partition: 1

我们可以很容易地看到我们的消费者被告知新的分区,并开始从这些分区读取消息。

结论

  • ✅ 为您的消费者组配置添加合适的负载均衡策略
  • ✅ 在配置中设置可接受的元数据刷新频率
  • ✅ 根据您的需要选择最早或最新的初始偏移量

感谢您的阅读,希望您觉得这篇文章引人入胜。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注