51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Kafka 消费者未从主题接收任何消息,尽管组 ID 和客户端 ID 已正确设置。

英文:

Kafka consumer not receiving any messages from the topic even though the group ID and client ID are set correctly

问题 {#heading}

使用 @InboundChannelAdapter,我正在从主题轮询消息。如果我从命令行发布 JSON,则收不到任何消息;但如果我发送任意非 JSON 文本,则会抛出异常。我想消费 JSON 对象({"name": "foo"})并将其转换为 CreateResponse 类。

发送消息到主题的命令:

C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
>{"name" : "foo"}
>sdwadaw

代码:

package com.cae.egca.connector.config;

/**
 * Spring configuration quoted from the question. It declares a polled Kafka
 * message source on topic "myTopic", a consumer factory, a queue channel and a
 * service activator that handles messages arriving on "inputChannel".
 *
 * NOTE(review): this listing is the code that exhibits the reported problem, so
 * the code itself is left untouched; the comments below only flag
 * inconsistencies that are visible in the listing.
 */
@Configuration @EnableKafka @Slf4j public class KafkaConfig {

/** Exposes a StringJsonMessageConverter as the RecordMessageConverter bean. */
@Bean
RecordMessageConverter messageConverter() {
    return new StringJsonMessageConverter();
}

/**
 * Builds the inbound message source for topic "myTopic". The adapter sends to
 * channel "inputChannel" and is polled with fixedDelay "5000".
 *
 * NOTE(review): the group id set here is "myGroupId", while consumerFactory()
 * configures "myGroupID" (different case) — confirm which one is intended.
 * NOTE(review): the payload type requested here is CreateResponse, but
 * consumeIt() declares its payload as SftpOutboundFilesDetails — confirm the
 * two are meant to differ.
 *
 * @param consumerFactory  factory handed to the KafkaMessageSource constructor
 * @param messageConverter converter installed on the message source
 * @return the configured message source
 */
@Bean
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
                                                     RecordMessageConverter messageConverter) {

    KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
            new ConsumerProperties("myTopic"));
    kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
    kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
    kafkaMessageSource.setMessageConverter(messageConverter);
    kafkaMessageSource.setPayloadType(CreateResponse.class);

    return kafkaMessageSource;
}

/**
 * Creates the consumer factory from a hand-built property map (broker
 * localhost:9092, String key/value deserializers, auto-commit disabled).
 *
 * NOTE(review): the return type and the DefaultKafkaConsumerFactory are raw
 * types, although consumeMsg() asks for ConsumerFactory<String, String>.
 */
@Bean
public ConsumerFactory consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10000");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    // NOTE(review): "myGroupID" differs in case from the "myGroupId" set in consumeMsg().
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
    // NOTE(review): presumably "latest" skips records produced before the consumer's
    // first assignment — verify; the accepted answer runs with "earliest".
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    // NOTE(review): a JsonDeserializer property is set although the value deserializer
    // above is StringDeserializer — presumably unused; confirm.
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);

    return new DefaultKafkaConsumerFactory(props);
}

/**
 * Handles a message from "inputChannel": logs, stores a random UUID in the MDC
 * under "transaction.id", passes the payload to fileServices.processFile and
 * then acknowledges.
 *
 * NOTE(review): fileServices is not declared anywhere in this listing —
 * presumably a field that was omitted from the post.
 * NOTE(review): confirm that messages produced by KafkaMessageSource carry the
 * KafkaHeaders.ACKNOWLEDGMENT header requested here.
 *
 * @param cr             message payload
 * @param acknowledgment acknowledgment taken from the message headers
 */
@ServiceActivator(inputChannel = "inputChannel")
void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
    log.info("In SERVICE ACTIVATOR ");
    MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
    fileServices.processFile(cr);
    acknowledgment.acknowledge();
    log.info("ACKNOWLEDGED: ");
}

/** Declares the QueueChannel bean; the adapter and activator above refer to a channel called "inputChannel". */
@Bean
QueueChannel inputChannel() {
    return new QueueChannel();
}

}

异常:

Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
	at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:214)
	... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"sdwadaw"; line: 1, column: 8]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
	... 21 more

英文:

Using @InboundChannelAdapter, I am polling messages from a topic, but I do not receive any messages if I post JSON from the command line; if I send any plain text instead, it throws an exception. I am trying to consume a JSON object ({"name" : "foo"}) and convert it to the CreateResponse class.

Command to send message to Topic:

C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
>{"name" : "foo"}
>sdwadaw

Code:

package com.cae.egca.connector.config;
/**
 * Spring configuration quoted from the question. It declares a polled Kafka
 * message source on topic "myTopic", a consumer factory, a queue channel and a
 * service activator that handles messages arriving on "inputChannel".
 *
 * NOTE(review): this listing is the code that exhibits the reported problem, so
 * the code itself is left untouched; the comments below only flag
 * inconsistencies that are visible in the listing.
 */
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {
/** Exposes a StringJsonMessageConverter as the RecordMessageConverter bean. */
@Bean
RecordMessageConverter messageConverter() {
return new StringJsonMessageConverter();
}
/**
 * Builds the inbound message source for topic "myTopic". The adapter sends to
 * channel "inputChannel" and is polled with fixedDelay "5000".
 *
 * NOTE(review): the group id set here is "myGroupId", while consumerFactory()
 * configures "myGroupID" (different case) — confirm which one is intended.
 * NOTE(review): the payload type requested here is CreateResponse, but
 * consumeIt() declares its payload as SftpOutboundFilesDetails — confirm the
 * two are meant to differ.
 */
@Bean
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
RecordMessageConverter messageConverter) {
KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
new ConsumerProperties("myTopic"));
kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
kafkaMessageSource.setMessageConverter(messageConverter);
kafkaMessageSource.setPayloadType(CreateResponse.class);
return kafkaMessageSource;
}
/**
 * Creates the consumer factory from a hand-built property map (broker
 * localhost:9092, String key/value deserializers, auto-commit disabled).
 *
 * NOTE(review): the return type and the DefaultKafkaConsumerFactory are raw
 * types, although consumeMsg() asks for ConsumerFactory<String, String>.
 */
@Bean
public ConsumerFactory consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10000");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// NOTE(review): "myGroupID" differs in case from the "myGroupId" set in consumeMsg().
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
// NOTE(review): presumably "latest" skips records produced before the consumer's
// first assignment — verify; the accepted answer runs with "earliest".
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// NOTE(review): a JsonDeserializer property is set although the value deserializer
// above is StringDeserializer — presumably unused; confirm.
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
return new DefaultKafkaConsumerFactory(props);
}
/**
 * Handles a message from "inputChannel": logs, stores a random UUID in the MDC
 * under "transaction.id", passes the payload to fileServices.processFile and
 * then acknowledges.
 *
 * NOTE(review): fileServices is not declared anywhere in this listing —
 * presumably a field that was omitted from the post.
 * NOTE(review): confirm that messages produced by KafkaMessageSource carry the
 * KafkaHeaders.ACKNOWLEDGMENT header requested here.
 */
@ServiceActivator(inputChannel = "inputChannel")
void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
log.info("In SERVICE ACTIVATOR ");
MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
fileServices.processFile(cr);
acknowledgment.acknowledge();
log.info("ACKNOWLEDGED: ");
}
/** Declares the QueueChannel bean; the adapter and activator above refer to a channel called "inputChannel". */
@Bean
QueueChannel inputChannel() {
return new QueueChannel();
}
}

Exception:

Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:214)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (String)"sdwadaw"; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
... 21 more

# 答案1
**得分**: 0
这对我来说没问题...
```java
/**
 * Minimal Spring Boot application from the answer: it creates topic "myTopic",
 * publishes one JSON record to it at startup, and polls the topic through a
 * KafkaMessageSource whose payload is converted to {@link CreateResponse}.
 */
@SpringBootApplication
public class So76876015Application {

    /** Boots the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(So76876015Application.class, args);
    }

    /** Converter bean used to turn String record values into the payload type. */
    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Polled message source for "myTopic"; messages go to "inputChannel".
     *
     * @param consumerFactory  factory passed to the message source
     * @param messageConverter converter installed on the message source
     * @return the configured message source
     */
    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
            RecordMessageConverter messageConverter) {
        KafkaMessageSource<String, String> source =
                new KafkaMessageSource<>(consumerFactory, new ConsumerProperties("myTopic"));

        // Identify the consumer through the source's own consumer properties.
        ConsumerProperties sourceProperties = source.getConsumerProperties();
        sourceProperties.setGroupId("myGroupId");
        sourceProperties.setClientId("myClientId");

        source.setMessageConverter(messageConverter);
        source.setPayloadType(CreateResponse.class);
        return source;
    }

    /** Prints every converted payload that arrives on "inputChannel". */
    @ServiceActivator(inputChannel = "inputChannel")
    void consumeIt(CreateResponse cr) {
        System.out.println(cr);
    }

    /** Declares topic "myTopic" with one partition and one replica. */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("myTopic")
                .partitions(1)
                .replicas(1)
                .build();
    }

    /** Sends a single JSON record to "myTopic" when the application starts. */
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("myTopic", "{\"name\":\"foo\"}");
    }

    /** Payload type with a single {@code name} property. */
    public static class CreateResponse {

        private String name;

        protected String getName() {
            return name;
        }

        protected void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "CreateResponse [name=" + name + "]";
        }
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
CreateResponse [name=foo]

英文:

This works fine for me...

@SpringBootApplication
public class So76876015Application {
public static void main(String[] args) {
	SpringApplication.run(So76876015Application.class, args);
}

@Bean
RecordMessageConverter messageConverter() {
	return new StringJsonMessageConverter();
}

@Bean
@InboundChannelAdapter(channel = &amp;quot;inputChannel&amp;quot;, poller = @Poller(fixedDelay = &amp;quot;5000&amp;quot;))
KafkaMessageSource&amp;lt;String, String&amp;gt; consumeMsg(ConsumerFactory&amp;lt;String, String&amp;gt; consumerFactory,
		RecordMessageConverter messageConverter) {

	KafkaMessageSource&amp;lt;String, String&amp;gt; kafkaMessageSource = new KafkaMessageSource&amp;lt;&amp;gt;(consumerFactory,
			new ConsumerProperties(&amp;quot;myTopic&amp;quot;));
	kafkaMessageSource.getConsumerProperties().setGroupId(&amp;quot;myGroupId&amp;quot;);
	kafkaMessageSource.getConsumerProperties().setClientId(&amp;quot;myClientId&amp;quot;);
	kafkaMessageSource.setMessageConverter(messageConverter);
	kafkaMessageSource.setPayloadType(CreateResponse.class);

	return kafkaMessageSource;
}

@ServiceActivator(inputChannel = &amp;quot;inputChannel&amp;quot;)
void consumeIt(CreateResponse cr) {
	System.out.println(cr);
}

@Bean
public NewTopic topic() {
	return TopicBuilder.name(&amp;quot;myTopic&amp;quot;).partitions(1).replicas(1).build();
}

@Bean
ApplicationRunner runner(KafkaTemplate&amp;lt;String, String&amp;gt; template) {
	return args -&amp;gt; {
		template.send(&amp;quot;myTopic&amp;quot;, &amp;quot;{\&amp;quot;name\&amp;quot;:\&amp;quot;foo\&amp;quot;}&amp;quot;);
	};
}

public static class CreateResponse {

	private String name;

	protected String getName() {
		return this.name;
	}

	protected void setName(String name) {
		this.name = name;
	}

	@Override
	public String toString() {
		return &amp;quot;CreateResponse [name=&amp;quot; + this.name + &amp;quot;]&amp;quot;;
	}

}

}

spring.kafka.consumer.auto-offset-reset=earliest
CreateResponse [name=foo]

赞(3)
未经允许不得转载:工具盒子 » Kafka 消费者未从主题接收任何消息,尽管组 ID 和客户端 ID 已正确设置。