51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Kafka 消费者未从主题接收任何消息,尽管组 ID 和客户端 ID 已正确设置。

英文:

Kafka Consumer not receiving any message from Topic even though group id and client id is set correctly

问题 {#heading}

使用 @InputChannelAdaptor,我正在从主题轮询消息,但如果我从命令行发布 JSON,就不会收到任何消息,但如果我传递任何文本,它会抛出异常。我试图消费 JSON 对象({"name": "foo"})并将其转换为 CreateResponse 类。

发送消息到主题的命令:

C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
>{"name" : "foo"}
>sdwadaw

代码:

package com.cae.egca.connector.config;

@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {

    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
                                                         RecordMessageConverter messageConverter) {

        KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
                new ConsumerProperties("myTopic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
        kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
        kafkaMessageSource.setMessageConverter(messageConverter);
        kafkaMessageSource.setPayloadType(CreateResponse.class);

        return kafkaMessageSource;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10000");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);

        return new DefaultKafkaConsumerFactory(props);
    }

    @ServiceActivator(inputChannel = "inputChannel")
    void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
        log.info("In SERVICE ACTIVATOR ");
        MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
        fileServices.processFile(cr);
        acknowledgment.acknowledge();
        log.info("ACKNOWLEDGED: ");
    }

    @Bean
    QueueChannel inputChannel() {
        return new QueueChannel();
    }
}

异常:

Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
	at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:214)
	... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"sdwadaw"; line: 1, column: 8]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
	... 21 more

英文:

Using @InputChannelAdaptor , I am polling messages from topic but not receiving any messages if I post json from commandline. but if I pass any text it throws exception. I am trying to consume json object ({"name" : "foo"}) and convert it to CreateResponse class

Command to send message to Topic:

C:\kafka_2.13-3.5.1\bin\windows&gt;kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
&gt;{&quot;name&quot; : &quot;foo&quot;}
&gt;sdwadaw

Code:

package com.cae.egca.connector.config;
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {
@Bean
RecordMessageConverter messageConverter() {
return new StringJsonMessageConverter();
}
@Bean
@InboundChannelAdapter(channel = &quot;inputChannel&quot;, poller = @Poller(fixedDelay = &quot;5000&quot;))
public KafkaMessageSource&lt;String, String&gt; consumeMsg(ConsumerFactory&lt;String, String&gt; consumerFactory,
RecordMessageConverter messageConverter) {
KafkaMessageSource&lt;String, String&gt; kafkaMessageSource = new KafkaMessageSource&lt;&gt;(consumerFactory,
new ConsumerProperties(&quot;myTopic&quot;));
kafkaMessageSource.getConsumerProperties().setGroupId(&quot;myGroupId&quot;);
kafkaMessageSource.getConsumerProperties().setClientId(&quot;myClientId&quot;);
kafkaMessageSource.setMessageConverter(messageConverter);
kafkaMessageSource.setPayloadType(CreateResponse.class);
return kafkaMessageSource;
}
@Bean
public ConsumerFactory consumerFactory() {
Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;localhost:9092&quot;);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, &quot;10000&quot;);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, &quot;30000&quot;);
props.put(ConsumerConfig.GROUP_ID_CONFIG, &quot;myGroupID&quot;);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;latest&quot;);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
return new DefaultKafkaConsumerFactory(props);
}
@ServiceActivator(inputChannel = &quot;inputChannel&quot;)
void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
log.info(&quot;In SERVICE ACTIVATOR &quot;);
MDC.put(&quot;transaction.id&quot;, String.valueOf(UUID.randomUUID()));
fileServices.processFile(cr);
acknowledgment.acknowledge();
log.info(&quot;ACKNOWLEDGED: &quot;);
}
@Bean
QueueChannel inputChannel() {
return new QueueChannel();
}
}

Exception:

Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConvertework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token &#39;sdwadaw&#39;: was expecting (JSON String, Number, Array, Object or token &#39;null&#39;, &#39;true&#39; or &#39;false&#39;)
at [Source: (String)&quot;sdwadaw&quot;; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
at 	... 21 more```
</details>
# 答案1
**得分**: 0
这对我来说没问题...
```java
@SpringBootApplication
public class So76876015Application {
public static void main(String[] args) {
SpringApplication.run(So76876015Application.class, args);
}
@Bean
RecordMessageConverter messageConverter() {
return new StringJsonMessageConverter();
}
@Bean
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
RecordMessageConverter messageConverter) {
KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
new ConsumerProperties("myTopic"));
kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
kafkaMessageSource.setMessageConverter(messageConverter);
kafkaMessageSource.setPayloadType(CreateResponse.class);
return kafkaMessageSource;
}
@ServiceActivator(inputChannel = "inputChannel")
void consumeIt(CreateResponse cr) {
System.out.println(cr);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("myTopic").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("myTopic", "{\"name\":\"foo\"}");
};
}
public static class CreateResponse {
private String name;
protected String getName() {
return this.name;
}
protected void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "CreateResponse [name=" + this.name + "]";
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
CreateResponse [name=foo]

英文:

This works fine for me...

@SpringBootApplication
public class So76876015Application {

	public static void main(String[] args) {
		SpringApplication.run(So76876015Application.class, args);
	}

	@Bean
	RecordMessageConverter messageConverter() {
		return new StringJsonMessageConverter();
	}

	@Bean
	@InboundChannelAdapter(channel = &quot;inputChannel&quot;, poller = @Poller(fixedDelay = &quot;5000&quot;))
	KafkaMessageSource&lt;String, String&gt; consumeMsg(ConsumerFactory&lt;String, String&gt; consumerFactory,
			RecordMessageConverter messageConverter) {

		KafkaMessageSource&lt;String, String&gt; kafkaMessageSource = new KafkaMessageSource&lt;&gt;(consumerFactory,
				new ConsumerProperties(&quot;myTopic&quot;));
		kafkaMessageSource.getConsumerProperties().setGroupId(&quot;myGroupId&quot;);
		kafkaMessageSource.getConsumerProperties().setClientId(&quot;myClientId&quot;);
		kafkaMessageSource.setMessageConverter(messageConverter);
		kafkaMessageSource.setPayloadType(CreateResponse.class);

		return kafkaMessageSource;
	}

	@ServiceActivator(inputChannel = &quot;inputChannel&quot;)
	void consumeIt(CreateResponse cr) {
		System.out.println(cr);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name(&quot;myTopic&quot;).partitions(1).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(KafkaTemplate&lt;String, String&gt; template) {
		return args -&gt; {
			template.send(&quot;myTopic&quot;, &quot;{\&quot;name\&quot;:\&quot;foo\&quot;}&quot;);
		};
	}

	public static class CreateResponse {

		private String name;

		protected String getName() {
			return this.name;
		}

		protected void setName(String name) {
			this.name = name;
		}

		@Override
		public String toString() {
			return &quot;CreateResponse [name=&quot; + this.name + &quot;]&quot;;
		}

	}

}
spring.kafka.consumer.auto-offset-reset=earliest
CreateResponse [name=foo]

赞(1)
未经允许不得转载:工具盒子 » Kafka 消费者未从主题接收任何消息,尽管组 ID 和客户端 ID 已正确设置。