消费方application.yml配置
spring:
application:
name: kafka-consumer
kafka:
consumer:
#kafka地址
bootstrap-servers: localhost:9092
#消费者id(传给Kafka服务器的ID,用于标识自己)
client-id: kafka-consumer
#消费者组id(消费者组的名称,用于标识自己所属的消费者组)
group-id: kafka-consumer-group
#提交偏移量时间隔(自动提交偏移量的时间间隔)
auto-commit-interval: 1000
#自动重置偏移量(当消费者组中没有可消费的消息时,自动重置偏移量)
# earliest:自动重置到最早的偏移量
# latest:自动重置到最新的偏移量
# none:不自动重置偏移量,需要手动提交偏移量
# exception:抛出异常,需要手动处理
auto-offset-reset: earliest
#是否开启自动提交偏移量
enable-auto-commit: true
#是否开启消费者自动重试
enable-retry: true
#消费者每次poll请求时,最多拉取的消息数量
max-poll-records: 1000
#消费者每次poll请求时,等待时间(单位:毫秒)
poll-timeout: 10000
#消费者每次poll请求时,请求超时时间(单位:毫秒)
request-timeout: 40000
#消费者每次poll请求时,请求失败重试次数
retries: 3
#服务器应该为消费者返回的最小字节数(单位:字节)
fetch-min-size: 10000bytes
#服务器应该为消费者返回的最大字节数(单位:字节)
fetch-max-size: 1000000bytes
#消费者的消费者线程数
concurrency: 4
#消费者的消费者线程池大小
pool:
max-size: 100
#消费者的消费者线程池队列大小
queue-size: 1000000
heartbeat:
#消费者心跳时间间隔(单位:毫秒)
interval: 10000
#消费者心跳超时时间(单位:毫秒)
timeout: 10000
#消费者心跳超时时间(单位:毫秒)
max-retries: 3
#消费者心跳超时时间(单位:毫秒)
backoff: 10000
key:
#消费者的key反序列化器
deserializer: org.apache.kafka.common.serialization.StringDeserializer
value:
#消费者的value反序列化器
deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
#消费者的value反序列化器配置
spring:
json:
trusted:
#消费者所在包路径
packages: com.test.kafka.consumer
pom引入Kafka的依赖包
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.client.version}</version>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>
生产者application.yml配置
spring:
kafka:
#生产者配置
producer:
#kafka地址
bootstrap-servers: localhost:9092
#生产者id(传给Kafka服务器的ID,用于标识自己)
client-id: kafka-provider
#生产者组id(生产者组的名称,用于标识自己所属的生产者组)
group-id: kafka-provider-group
#key序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
#value序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#批量发送消息的大小(单位:字节)
batch-size: 131072 #128kb
#生产者缓冲区大小(单位:字节)
buffer-memory: 67108864 #64M
#重试次数
retries: 1
#acks(消息确认机制):
#0表示生产者不等待broker确认消息,只要把消息发送出去就认为消息发送成功
#all表示生产者等待所有ISR副本确认消息才认为消息发送成功 all≈-1
#1表示leader确认消息(leader副本确认消息才认为消息发送成功)
#-1表示所有ISR确认消息(Kafka中所有的副本都确认消息才认为消息发送成功)
acks: 0
#压缩类型(gzip、snappy、lz4、zstd)
compression-type: gzip #提升性能很重要
properties:
#单个请求的最大字节数(单位:字节)消息的最大大小,超过这个大小的消息会被拆分成多个消息发送,以避免单个请求过大导致的网络拥塞
max.request.size: 5242880 #5M
#发送消息的延迟时间(单位:毫秒),超过这个时间就会发送消息,以避免频繁发送消息导致的网络拥塞
linger.ms: 5