英文:
Kafka Consumer not receiving any message from Topic even though group id and client id is set correctly
问题 {#heading}
使用 @InputChannelAdaptor,我正在从主题轮询消息,但如果我从命令行发布 JSON,就不会收到任何消息,但如果我传递任何文本,它会抛出异常。我试图消费 JSON 对象({"name": "foo"})并将其转换为 CreateResponse 类。
发送消息到主题的命令:
C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
>{"name" : "foo"}
>sdwadaw
代码:
package com.cae.egca.connector.config;
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {
@Bean
RecordMessageConverter messageConverter() {
return new StringJsonMessageConverter();
}
@Bean
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
RecordMessageConverter messageConverter) {
KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
new ConsumerProperties("myTopic"));
kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
kafkaMessageSource.setMessageConverter(messageConverter);
kafkaMessageSource.setPayloadType(CreateResponse.class);
return kafkaMessageSource;
}
@Bean
public ConsumerFactory consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10000");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
return new DefaultKafkaConsumerFactory(props);
}
@ServiceActivator(inputChannel = "inputChannel")
void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
log.info("In SERVICE ACTIVATOR ");
MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
fileServices.processFile(cr);
acknowledgment.acknowledge();
log.info("ACKNOWLEDGED: ");
}
@Bean
QueueChannel inputChannel() {
return new QueueChannel();
}
}
异常:
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:214)
... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (String)"sdwadaw"; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
... 21 more
英文:
Using @InputChannelAdaptor , I am polling messages from topic but not receiving any messages if I post json from commandline. but if I pass any text it throws exception. I am trying to consume json object ({"name" : "foo"}) and convert it to CreateResponse class
Command to send message to Topic:
C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
>{"name" : "foo"}
>sdwadaw
Code:
package com.cae.egca.connector.config;
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {
@Bean
RecordMessageConverter messageConverter() {
return new StringJsonMessageConverter();
}
@Bean
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
RecordMessageConverter messageConverter) {
KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
new ConsumerProperties("myTopic"));
kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
kafkaMessageSource.setMessageConverter(messageConverter);
kafkaMessageSource.setPayloadType(CreateResponse.class);
return kafkaMessageSource;
}
@Bean
public ConsumerFactory consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10000");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
return new DefaultKafkaConsumerFactory(props);
}
@ServiceActivator(inputChannel = "inputChannel")
void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
log.info("In SERVICE ACTIVATOR ");
MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
fileServices.processFile(cr);
acknowledgment.acknowledge();
log.info("ACKNOWLEDGED: ");
}
@Bean
QueueChannel inputChannel() {
return new QueueChannel();
}
}
Exception:
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConvertework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (String)"sdwadaw"; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
at ... 21 more```
</details>
# 答案1
**得分**: 0
这对我来说没问题...
```java
@SpringBootApplication
public class So76876015Application {
public static void main(String[] args) {
SpringApplication.run(So76876015Application.class, args);
}
@Bean
RecordMessageConverter messageConverter() {
return new StringJsonMessageConverter();
}
@Bean
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
RecordMessageConverter messageConverter) {
KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
new ConsumerProperties("myTopic"));
kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
kafkaMessageSource.setMessageConverter(messageConverter);
kafkaMessageSource.setPayloadType(CreateResponse.class);
return kafkaMessageSource;
}
@ServiceActivator(inputChannel = "inputChannel")
void consumeIt(CreateResponse cr) {
System.out.println(cr);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("myTopic").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("myTopic", "{\"name\":\"foo\"}");
};
}
public static class CreateResponse {
private String name;
protected String getName() {
return this.name;
}
protected void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "CreateResponse [name=" + this.name + "]";
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
CreateResponse [name=foo]
英文:
This works fine for me...
@SpringBootApplication
public class So76876015Application {
public static void main(String[] args) {
SpringApplication.run(So76876015Application.class, args);
}
@Bean
RecordMessageConverter messageConverter() {
return new StringJsonMessageConverter();
}
@Bean
@InboundChannelAdapter(channel = &quot;inputChannel&quot;, poller = @Poller(fixedDelay = &quot;5000&quot;))
KafkaMessageSource&lt;String, String&gt; consumeMsg(ConsumerFactory&lt;String, String&gt; consumerFactory,
RecordMessageConverter messageConverter) {
KafkaMessageSource&lt;String, String&gt; kafkaMessageSource = new KafkaMessageSource&lt;&gt;(consumerFactory,
new ConsumerProperties(&quot;myTopic&quot;));
kafkaMessageSource.getConsumerProperties().setGroupId(&quot;myGroupId&quot;);
kafkaMessageSource.getConsumerProperties().setClientId(&quot;myClientId&quot;);
kafkaMessageSource.setMessageConverter(messageConverter);
kafkaMessageSource.setPayloadType(CreateResponse.class);
return kafkaMessageSource;
}
@ServiceActivator(inputChannel = &quot;inputChannel&quot;)
void consumeIt(CreateResponse cr) {
System.out.println(cr);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name(&quot;myTopic&quot;).partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaTemplate&lt;String, String&gt; template) {
return args -&gt; {
template.send(&quot;myTopic&quot;, &quot;{\&quot;name\&quot;:\&quot;foo\&quot;}&quot;);
};
}
public static class CreateResponse {
private String name;
protected String getName() {
return this.name;
}
protected void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return &quot;CreateResponse [name=&quot; + this.name + &quot;]&quot;;
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
CreateResponse [name=foo]