Kafka发布订阅实战


Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据,目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka 作为一种分布式消息队列被广泛应用于以下场景中:

  • 消息通讯:作为一种分布式消息传递的工具,实现高效的异步消息通讯。
  • 大数据流处理:通过批量发送消息、消息压缩等机制有效地处理大量的数据流,为大数据处理提供了支持。
  • 日志收集与处理:实现高吞吐量、低延迟的数据采集和处理,常被用于日志的采集、传输和处理。
  • 数据解耦和异步处理:通过将数据放入 Kafka 队列,不同的系统和应用可以异步地获取和处理数据,降低系统之间的耦合性。

总之,如果需要处理大量的消息数据,实现数据解耦和异步处理,或者需要实现高吞吐量的数据传输和处理等,都可以考虑引入 Kafka 。下面分别介绍如何在 Maven 工程与 Spring 工程中集成 Kafka 实现消息发布订阅。

一、Maven集成

1. 依赖导入

maven 项目中导入 kafka 的相关依赖。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.0</version>
</dependency>

2. 数据编码

Kafka 提供默认字符串编解码,但如果需要发送复杂的对象数据,则需要自定义编解码方式。

在实际开发中最常见的方式即通过序列化将 Java 对象转化为 Json 字符串传输,可以选择 Jackson 等框架实现数据的序列化,这里以自定义序列化实现为例。

(1) 实体类

创建一个实体类,用于模拟后续数据传输。

public class User implements Serializable {
    private String name;
    private String password;
}
(2) 自定义编码

通过实现 Serializer 类并重写 serialize() 重写数据对象编码方式。

public class EncodeKafka implements Serializer<User> {

    @Override
    public void configure(Map configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, User user) {
        byte[] bytes;
        try (
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos)
        ) {
            // 写入 ByteArrayOutputStream
            oos.writeObject(user);
            oos.flush();
            // ByteArrayOutputStream 导出为字符数组
            bytes = bos.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException();
        }
        return bytes;
    }

    @Override
    public void close() {
        System.out.println("EncodeKafka is close.");
    }
}
(3) 自定义解码

通过实现 Serializer 类并重写 Deserializer() 重写数据对象编码方式。

public class DecodeKafka implements Deserializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public User deserialize(String topic, byte[] bytes) {
        User user;
        try (
            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bis)
        ) {
            user = ois.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return user;
    }

    @Override
    public void close() {
        System.out.println("DecodeKafka is close.");
    }
}

3. 生产者

创建消息生产者 KafkaProducer 用于发送消息,相应的配置如下:

其中参数 acks 的取值范围如下:

  • 0 : 表示 Producer 不用等待任何确认信号,会一直发送消息,否则 Producer 进入等待状态。
  • 1 : 表示 Producer 等待 leader 将数据写入本地 log (不关心 follower 是否写入),然后继续发送。
  • -1 : 表示 leader 等待所有备份都成功写入日志,可以保证数据一定不会丢失。

注意生产者发送的消息在消费后仍存在并不会消失,可根据需要配置 Kafka 应用服务参数设置持久化策略,如配置 3 天自动过期,此时消息才会被删除。

参数 描述
bootstrap.servers 配置 kafka 服务地址。
acks 用于通知 broker 接收到 message 后是否向 Producer 发送确认信号。
retries 数据发送失败重试次数。
linger.ms Producer 组将会汇总任何在请求与发送之间到达的消息记录一个单独批量的请求。
batch.size Producer 批处理消息记录,以减少请求次数。
timeout.ms 等待 Followers 的确认时间,如果确认的请求数目在此时间内没有实现则会返回错误。
buffer.memory 可以用于缓存数据的内存大小。
compression.type 数据压缩类型,可选项: none、gzip、snappy。
metadata.fetch.timeout.ms 等待元数据 fetch 成功完成所需要的时间。
key.serializer Key 序列化方式。
value.serializer Value 序列化方式。

下面看一个具体的配置示例:

public static void main(String[] args) {
    final String HOST = "127.0.0.1:9092";
    final String TOPIC = "user-topic";
    final String KEY_ENCODE = "org.apache.kafka.common.serialization.StringSerializer";
    final String VALUE_ENCODE = "demo.utils.EncodeKafka";

    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "0");
    props.put("retries", 3);
    props.put("linger.ms", 1000);
    props.put("batch.size", 16384);
    props.put("timeout.ms", 30000);
    props.put("buffer.memory", 33554432);
    props.put("compression.type", "gzip");
    props.put("metadata.fetch.timeout.ms", 30000);
    props.put("key.serializer", KEY_ENCODE);
    props.put("value.serializer", VALUE_ENCODE);

    // 根据属性创建生产者
    try (KafkaProducer<String, User> producer = new KafkaProducer<>(props)) {
        // 如果 topic 不存在,则会自动创建,默认 replication-factor 为 1,partitions为0
        int i = 0;
        while (i < 10) {
            String KEY = UUID.randomUUID().toString();
            User user = new User("User " + i, "Pwd " + i);
            producer.send(new ProducerRecord<>(TOPIC, KEY, user));
            i++;
        }
        System.out.println("Topic: " + TOPIC + ", 发送结束.");
    }
}

4. 消费者

创建消费者 KafkaConsumer 并订阅 topic 接收上述 Produce 发送的消息,相应的配置参考下表。

其中中描述的 offset 即偏移量,作用是标记当前接收的消息位置。

  • enable.auto.commit 设置为 true 则只会接收最新队列消息。
  • enable.auto.commit 设置为 false 则每次启动一个消费组进程都将会从头开始接收消息。
参数 描述
bootstrap.servers 配置 kafka 服务地址。
group.id 唯一标识 consumer 进程所在组的字符串。
enable.auto.commit Consumer 所 fetch 的消息的 offset 是否自动的同步到 zookeeper。
auto.commit.interval.ms consumer 向 zookeeper 提交 offset 的频率。
session.timeout.ms 设置会话超时时间。
key.deserializer 消息 Key 值解码方式。
value.deserializer 消息 Value 值解码方式.

下面同样提供一个具体的示例,运行下述消费者实现 Topic 订阅,此时再运行上述的生产者发送消息即可看到在消费者在控制台打印接受的信息。

public static void main(String[] args) {
    final String HOST = "127.0.0.1:9092";
    final String TOPIC = "user-topic";
    final String GROUP = "consumer";
    final String KEY_DECODE = "org.apache.kafka.common.serialization.StringDeserializer";
    final String VALUE_DECODE = "demo.utils.DecodeKafka";

    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("group.id", GROUP);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", KEY_DECODE);
    props.put("value.deserializer", VALUE_DECODE);

    // 根据属性创建消费者
    KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
    // 订阅 Topic
    consumer.subscribe(Arrays.asList(TOPIC));
    System.out.println("Subscribed to topic: " + TOPIC);
    // 接收订阅消息
    while (true) {
        // 每隔 100 毫秒读取一次消息队列
        ConsumerRecords<String, User> records = consumer.poll(100);
        for (ConsumerRecord<String, User> record : records) {
            User result = record.value();
            System.out.println(result.toString());
        }
    }
}  

二、Spring集成

Spring Boot 中集成 Kafka 则更为简单,下面介绍配置步骤。

1. 依赖导入

同样在 Spring Boot 项目中引入相关 Maven 依赖。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.5</version>
</dependency>

2. 配置文件

在项目的 yml 配置文件中添加 Kafka 相关配置。

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    topic: test-topic
    # 生产者
    producer:
      acks: 1
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者
    consumer:
      group-id: comsumer
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器
    listener:
      concurrency: 6
      ack-mode: MANUAL

3. 接收者

创建发送者用于订阅 Topic 接收发送者信息。

public class ConsumerController {

    @KafkaListener(topics = "test_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic = %s, offset = %d, value = %s \n",
                record.topic(), record.offset(), record.value());
    }
}

4. 发送者

通过调用 KafkaTemplate 即可实现消息发送,若需要发送复杂对象,可配套 Jackson 等工具实现。

@RestController
@RequestMapping("api/kafka")
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public void send() {
        String msg = "hello world";
        String key = UUID.randomUUID().toString();
        kafkaTemplate.send("test_topic", key, msg);
    }
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录