51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Kafka 确保消费者组保持活跃

英文:

Kafka ensuring consumer group stays alive

问题 {#heading}

我有一个在独立的Kubernetes Pod中生成生产者和消费者的过程。我想使用auto.offset.reset "latest",因此需要确保消费者 Pod 在生产者 Pod 之前启动,因为我不希望生产者在消费者 Pod 上线之前开始生产消息。

我的简单方法是在生成这些 Pod 的过程中,在任何 Pod 生成之前创建消费者组。在测试中,我注意到消费者组的状态大约在约 45 秒后从稳定变为空,然后消费者组会在接下来的 30 秒到几分钟内被移除。

我如何确保创建的消费者组保持更长时间?

我的offsets.retention.minutes是默认的 7 天,参考 https://stackoverflow.com/a/65189562/9992341

我正在使用 Python 的 confluent_kafka 包来创建这个组(似乎没有直接的 API 来创建一个组 ID),并且尝试过调整订阅的可调用参数。

from confluent_kafka import Consumer

consumer = Consumer(
    {
        "sasl.username": "***",
        "sasl.password": "***",
        "bootstrap.servers": "***",
        "group.id": "test-group-1",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "auto.offset.reset": "latest",
    },
)
consumer.subscribe([topic])

我运行上述代码,并在一个单独的脚本中使用管理员客户端检查组:

import confluent_kafka.admin

admin_client = confluent_kafka.admin.AdminClient(
    {
        'sasl.username': '***',
        'sasl.password': '***',
        'bootstrap.servers': '***',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
    }
)

def list_groups(admin_client):

    future = admin_client.list_consumer_groups()
    res = future.result()
    lst = [(i.group_id, i.state) for i in res.valid]
    for i in sorted(lst):
        print(i)

list_groups(admin_client)
# ('test-group-1', <ConsumerGroupState.STABLE: 3>)

然而,正如我所说,这个组的状态很快变为"EMPTY"并消失,尽管保留应该是 7 天(这对于我的用例来说有些过度,因为 Pod 很接近地一起启动)。

注意:我在消息被生产到主题和未生产到主题时都进行了测试,但没有观察到任何变化。 英文:

I have a process which spawns a producer and consumer in separate pods (kubernetes). I want to use auto.offset.reset &quot;latest&quot;, and thus need to guarantee that the consumer pod spins up prior to the producer pod, as I do not want the producer to begin producing messages before the consumer pod comes online.

My simple approach is to have the process which spawns these pods to create the consumer group prior to either of the pods spawning. In testing, I noticed that the consumer group state goes from stable to empty after about ~45 seconds and then the consumer group is variably removed after anywhere from another 30 seconds to a few minutes.

How can I guarantee that the consumer group created stays around for longer?

My offsets.retention.minutes is the default of 7 days, as per https://stackoverflow.com/a/65189562/9992341.

I am using python's confluent_kafka package to create the group (there appears to be no direct api to create a group id), and I have tried messing around with the subscribe callable params.

from confluent_kafka import Consumer
`consumer = Consumer(
{
&quot;sasl.username&quot;: &quot;***&quot;,
&quot;sasl.password&quot;: &quot;***&quot;,
&quot;bootstrap.servers&quot;: &quot;***&quot;,
&quot;group.id&quot;: &quot;test-group-1&quot;,
&quot;security.protocol&quot;: &quot;SASL_SSL&quot;,
&quot;sasl.mechanisms&quot;: &quot;PLAIN&quot;,
&quot;auto.offset.reset&quot;: &quot;latest&quot;,
},
)
consumer.subscribe([topic]) #, on_lost=lambda *args: None)
`

I run the above code and check in a separate script with the admin client the group:

import confluent_kafka.admin

admin_client = confluent_kafka.admin.AdminClient(
{
\&#39;sasl.username\&#39;: \&quot;\*\*\*\&quot;,
\&#39;sasl.password\&#39;: \&quot;\*\*\*\&quot;,
\&#39;bootstrap.servers\&#39;: \&quot;\*\*\*\&quot;,
\&#39;security.protocol\&#39;: \&#39;SASL_SSL\&#39;,
\&#39;sasl.mechanisms\&#39;: \&#39;PLAIN\&#39;,
}
)


def list_groups(admin_client):


    future = admin_client.list_consumer_groups()
    res = future.result()
    lst = [(i.group_id, i.state) for i in res.valid]
    for i in sorted(lst):
        print(i) # noqa: T201




list_groups(admin_client)


(\&#39;test-group-1\&#39;, \&lt;ConsumerGroupState.STABLE: 3\&gt;)
==================================================================

However as stated this group's state pretty quickly becomes "EMPTY" and disappears, even though the retention should be 7 days (which is overkill for my use case where pods come up pretty close together).

Note: I have tested this while messages were being produced to the topic and not produced, but no change is observed.

答案1 {#1}

得分: 1

auto.offset.reset 设置仅在 Kafka 没有提交任何偏移信息时才适用。您可以使用 auto.offset.reset = earliest 启动消费者。仅在首次运行消费者时,它将从开头消费。在后续运行中,它将从上次提交的偏移位置开始。

关于消费者组消失的问题。这是因为该组没有提交偏移。 英文:

The settings auto.offset.reset only applies when Kafka does not have any offset information committed. You can start the consumer with auto.offset.reset = earliest. Only on the first run of the consumer it will consume from the beginning. On subsequent runs it will start at the last committed offset positions.

Regarding the disappearing of your consumer group. It's because there is no offset committed for this group.


赞(3)
未经允许不得转载:工具盒子 » Kafka 确保消费者组保持活跃