51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

java kafka 测试

java kafka 测试

新建springboot项目 : 连接

源码:

https://github.com/chaoren399/kafka-demo.git
  1. pom.xml文件添加kafka依赖

     	<dependency>
     		<groupId>org.apache.kafka</groupId>
     		<artifactId>kafka-clients</artifactId>
    
     	</dependency>
    

生产消息

新建类:

KafkaProducerDemo

package com.example.sprintbootdemo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

/**
 * Console demo that publishes every line typed on standard input to a Kafka topic.
 *
 * <p>Typing {@code exit}, or closing standard input, ends the program.
 */
public class KafkaProducerDemo {

    /** Topic every console line is published to. */
    private static final String TOPIC = "test";

    /** Console input that ends the read loop. */
    private static final String EXIT_COMMAND = "exit";

    /**
     * Reads lines from standard input and sends each one to {@link #TOPIC}.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for a send to complete
     */
    public static void main(String[] args) throws InterruptedException {
        Scanner sc = new Scanner(System.in);
        // try-with-resources closes the producer on every exit path (exit command,
        // end of input, or an exception) instead of leaking it.
        try (Producer<String, Object> producer = createProducer()) {
            while (true) {
                System.out.print("生产消息(输入exit 退出): ");
                // nextLine() throws at end of input, so check before reading.
                if (!sc.hasNextLine()) {
                    break;
                }
                String mess = sc.nextLine();
                if (EXIT_COMMAND.equals(mess)) {
                    break;
                }
                try {
                    // send() is asynchronous; wait on the returned future so that success
                    // is only reported once the send has actually completed.
                    producer.send(new ProducerRecord<>(TOPIC, mess)).get();
                    System.out.println("send message success....");
                } catch (java.util.concurrent.ExecutionException e) {
                    // The send failed; report the cause and keep accepting input.
                    System.err.println("send message failed: " + e.getCause());
                }
            }
        }
    }

    /**
     * Creates a producer configured by {@link #buildProducerProperties()}.
     *
     * <p>The value type is declared as {@code Object}, but the configured value serializer is
     * {@link StringSerializer}, so only {@code String} values should be sent.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public static Producer<String, Object> createProducer() {
        return new KafkaProducer<>(buildProducerProperties());
    }

    /**
     * Builds the producer configuration.
     *
     * @return the properties used to construct the producer
     */
    public static Properties buildProducerProperties() {
        Properties properties = new Properties();
        // Broker address; change this to point at your own Kafka cluster
        // (for example 127.0.0.1:9092 for a local broker).
        properties.put("bootstrap.servers", "123.57.143.145:31002");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        return properties;
    }
}

消费消息

package com.example.sprintbootdemo;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Console demo that subscribes to a Kafka topic and prints the value of every record it receives.
 *
 * <p>Runs until the process is terminated or polling fails.
 */
public class KafkaConsumerDemo {

    /**
     * Broker address. This must be the same cluster the producer demo writes to,
     * otherwise the produced messages never show up here.
     */
    private static final String BOOTSTRAP_SERVERS = "123.57.143.145:31002";

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "test";

    /**
     * Polls the topic forever and prints each record value.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the consumer if the loop is left by an exception.
        try (Consumer<String, Object> consumer = createConsumer()) {
            while (true) {
                // poll(long) is deprecated; the Duration overload is its replacement.
                ConsumerRecords<String, Object> records =
                        consumer.poll(java.time.Duration.ofMillis(100));
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.println("消费消息: " + record.value());
                }
                // Auto-commit is disabled in the configuration, so offsets must be
                // committed explicitly once the batch has been processed.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }

    /**
     * Creates a consumer configured by {@link #buildConsumerProperties()} and subscribes it
     * to {@link #TOPIC}.
     *
     * @return a subscribed consumer; the caller is responsible for closing it
     */
    public static Consumer<String, Object> createConsumer() {
        Consumer<String, Object> consumer = new KafkaConsumer<>(buildConsumerProperties());
        consumer.subscribe(Arrays.asList(TOPIC));
        return consumer;
    }

    /**
     * Builds the consumer configuration.
     *
     * @return the properties used to construct the consumer
     */
    public static Properties buildConsumerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("group.id", "test");
        // Offsets are committed manually in main(), so auto-commit stays off and
        // no auto-commit interval is configured.
        properties.put("enable.auto.commit", false);
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        return properties;
    }
}

然后分别运行生产者和消费者两个 demo。

在生产者的控制台中输入消息。

正常情况下,消费端会打印出生产者发送的消息。

正常是在消费端可以展示生产者生产的数据

赞(7)
未经允许不得转载:工具盒子 » java kafka 测试