卡夫卡流:使用相同的`application.id`从多个主题中消费

我有一个应用程序,需要听取多个不同的主题; 每个主题都有独立的消息处理逻辑。 我曾经想过为每个KafkaStreams实例使用相同的kafka属性,但是我得到如下所示的错误。

错误

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic 

代码 (kotlin)

 class KafkaSetup() { companion object { private val LOG = LoggerFactory.getLogger(this::class.java) } fun getProperties(): Properties { val properties = Properties() properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app") return properties } private fun listenOnMyTopic() { val kStreamBuilder = KStreamBuilder() val kStream: KStream = kStreamBuilder.stream("my-topic") kStream.foreach { key, value -> LOG.info("do stuff") } val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) kafkaStreams.start() } private fun listenOnMyOtherTopic() { val kStreamBuilder = KStreamBuilder() val kStream: KStream = kStreamBuilder.stream("my-other-topic") kStream.foreach { key, value -> LOG.info("do other stuff") } val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) kafkaStreams.start() } } 

我发现这个参考 ,建议你不能使用application.id多个主题,但是我发现很难find参考文档来支持。 application.id的文档状态:

流处理应用程序的标识符。 在Kafka集群中必须是唯一的。 它被用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更记录主题前缀。

问题

  1. 这个错误是什么意思,是什么原因造成的。
  2. 鉴于您可以使用相同的ID运行多个应用程序的实例以使用多个主题分区,那么“在Kafka集群中必须是唯一的”是什么意思?
  3. 你可以使用相同的Kafka流application.id来启动两个不同主题上列出的KafkaStreams吗? 如果是的话,怎么样?

详情:卡夫卡0.11.0.2

卡夫卡流通过分区而不是主题来扩展。 因此,如果使用相同的application.id启动多个应用程序,则它们必须与它们所订阅的输入主题及其处理逻辑相同。 应用程序使用application.id作为group.id形成一个消费者组,因此输入主题的不同分区被分配给不同的实例。

如果具有相同逻辑的不同主题,则可以一次订阅所有主题(在每个开始的实例中)。 虽然缩放仍然基于分区。 (这基本上是您的输入主题的“合并”。)

如果要通过主题进行缩放和/或具有不同的处理逻辑,则必须针对不同的Kafka Streams应用程序使用不同的application.id。

Interesting Posts