卡夫卡流:使用相同的`application.id`从多个主题中消费
我有一个应用程序,需要听取多个不同的主题; 每个主题都有独立的消息处理逻辑。 我曾经想过为每个KafkaStreams实例使用相同的kafka属性,但是我得到如下所示的错误。
错误
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
代码 (kotlin)
class KafkaSetup() { companion object { private val LOG = LoggerFactory.getLogger(this::class.java) } fun getProperties(): Properties { val properties = Properties() properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app") return properties } private fun listenOnMyTopic() { val kStreamBuilder = KStreamBuilder() val kStream: KStream = kStreamBuilder.stream("my-topic") kStream.foreach { key, value -> LOG.info("do stuff") } val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) kafkaStreams.start() } private fun listenOnMyOtherTopic() { val kStreamBuilder = KStreamBuilder() val kStream: KStream = kStreamBuilder.stream("my-other-topic") kStream.foreach { key, value -> LOG.info("do other stuff") } val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties()) kafkaStreams.start() } }
我发现这个参考 ,建议你不能使用application.id
多个主题,但是我发现很难find参考文档来支持。 application.id
的文档状态:
流处理应用程序的标识符。 在Kafka集群中必须是唯一的。 它被用作1)默认的客户端ID前缀,2)用于成员资格管理的组ID,3)变更记录主题前缀。
问题
- 这个错误是什么意思,是什么原因造成的。
- 鉴于您可以使用相同的ID运行多个应用程序的实例以使用多个主题分区,那么“在Kafka集群中必须是唯一的”是什么意思?
- 你可以使用相同的Kafka流
application.id
来启动两个不同主题上列出的KafkaStreams
吗? 如果是的话,怎么样?
详情:卡夫卡0.11.0.2
卡夫卡流通过分区而不是主题来扩展。 因此,如果使用相同的application.id
启动多个应用程序,则它们必须与它们所订阅的输入主题及其处理逻辑相同。 应用程序使用application.id
作为group.id
形成一个消费者组,因此输入主题的不同分区被分配给不同的实例。
如果具有相同逻辑的不同主题,则可以一次订阅所有主题(在每个开始的实例中)。 虽然缩放仍然基于分区。 (这基本上是您的输入主题的“合并”。)
如果要通过主题进行缩放和/或具有不同的处理逻辑,则必须针对不同的Kafka Streams应用程序使用不同的application.id。