Kafka
是由 Apache
软件基金会开发的一个开源流处理平台,是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据,目的是通过 Hadoop
的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
Kafka
作为一种分布式消息队列被广泛应用于以下场景中:
- 消息通讯:作为一种分布式消息传递的工具,实现高效的异步消息通讯。
- 大数据流处理:通过批量发送消息、消息压缩等机制有效地处理大量的数据流,为大数据处理提供了支持。
- 日志收集与处理:实现高吞吐量、低延迟的数据采集和处理,常被用于日志的采集、传输和处理。
- 数据解耦和异步处理:通过将数据放入 Kafka 队列,不同的系统和应用可以异步地获取和处理数据,降低系统之间的耦合性。
总之,如果需要处理大量的消息数据,实现数据解耦和异步处理,或者需要实现高吞吐量的数据传输和处理等,都可以考虑引入 Kafka
。下面分别介绍如何在 Maven
工程与 Spring
工程中集成 Kafka
实现消息发布订阅。
一、Maven集成
1. 依赖导入
在 maven
项目中导入 kafka
的相关依赖。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
2. 数据编码
Kafka
提供默认字符串编解码,但如果需要发送复杂的对象数据,则需要自定义编解码方式。
在实际开发中最常见的方式即通过序列化将 Java
对象转化为 Json
字符串传输,可以选择 Jackson
等框架实现数据的序列化,这里以自定义序列化实现为例。
(1) 实体类
创建一个实体类,用于模拟后续数据传输。
public class User implements Serializable {
private String name;
private String password;
}
(2) 自定义编码
通过实现 Serializer
类并重写 serialize()
重写数据对象编码方式。
public class EncodeKafka implements Serializer<User> {
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, User user) {
byte[] bytes;
try (
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)
) {
// 写入 ByteArrayOutputStream
oos.writeObject(user);
oos.flush();
// ByteArrayOutputStream 导出为字符数组
bytes = bos.toByteArray();
} catch (Exception e) {
throw new RuntimeException();
}
return bytes;
}
@Override
public void close() {
System.out.println("EncodeKafka is close.");
}
}
(3) 自定义解码
通过实现 Serializer
类并重写 Deserializer()
重写数据对象编码方式。
public class DecodeKafka implements Deserializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public User deserialize(String topic, byte[] bytes) {
User user;
try (
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)
) {
user = ois.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
return user;
}
@Override
public void close() {
System.out.println("DecodeKafka is close.");
}
}
3. 生产者
创建消息生产者 KafkaProducer
用于发送消息,相应的配置如下:
其中参数
acks
的取值范围如下:
- 0 : 表示
Producer
不用等待任何确认信号,会一直发送消息,否则Producer
进入等待状态。- 1 : 表示
Producer
等待leader
将数据写入本地log
(不关心follower
是否写入),然后继续发送。- -1 : 表示
leader
等待所有备份都成功写入日志,可以保证数据一定不会丢失。
注意生产者发送的消息在消费后仍存在并不会消失,可根据需要配置 Kafka
应用服务参数设置持久化策略,如配置 3
天自动过期,此时消息才会被删除。
参数 | 描述 |
---|---|
bootstrap.servers | 配置 kafka 服务地址。 |
acks | 用于通知 broker 接收到 message 后是否向 Producer 发送确认信号。 |
retries | 数据发送失败重试次数。 |
linger.ms | Producer 组将会汇总任何在请求与发送之间到达的消息记录一个单独批量的请求。 |
batch.size | Producer 批处理消息记录,以减少请求次数。 |
timeout.ms | 等待 Followers 的确认时间,如果确认的请求数目在此时间内没有实现则会返回错误。 |
buffer.memory | 可以用于缓存数据的内存大小。 |
compression.type | 数据压缩类型,可选项: none、gzip、snappy。 |
metadata.fetch.timeout.ms | 等待元数据 fetch 成功完成所需要的时间。 |
key.serializer | Key 序列化方式。 |
value.serializer | Value 序列化方式。 |
下面看一个具体的配置示例:
注意示例中编码传输数据时使用的是上述定义的 EncodeKafka
工具类,如使用 Jackson
等工具即可将 VALUE_ENCODE
值替换为 org.apache.kafka.common.serialization.StringSerializer
。
public static void main(String[] args) {
final String HOST = "127.0.0.1:9092";
final String TOPIC = "user-topic";
final String KEY_ENCODE = "org.apache.kafka.common.serialization.StringSerializer";
final String VALUE_ENCODE = "demo.utils.EncodeKafka";
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("acks", "0");
props.put("retries", 3);
props.put("linger.ms", 1000);
props.put("batch.size", 16384);
props.put("timeout.ms", 30000);
props.put("buffer.memory", 33554432);
props.put("compression.type", "gzip");
props.put("metadata.fetch.timeout.ms", 30000);
props.put("key.serializer", KEY_ENCODE);
props.put("value.serializer", VALUE_ENCODE);
// 根据属性创建生产者
try (KafkaProducer<String, User> producer = new KafkaProducer<>(props)) {
// 如果 topic 不存在,则会自动创建,默认 replication-factor 为 1,partitions为0
int i = 0;
while (i < 10) {
String KEY = UUID.randomUUID().toString();
User user = new User("User " + i, "Pwd " + i);
producer.send(new ProducerRecord<>(TOPIC, KEY, user));
i++;
}
System.out.println("Topic: " + TOPIC + ", 发送结束.");
}
}
4. 消费者
创建消费者 KafkaConsumer
并订阅 topic
接收上述 Produce
发送的消息,相应的配置参考下表。
其中中描述的
offset
即偏移量,作用是标记当前接收的消息位置。
- 若
enable.auto.commit
设置为true
则只会接收最新队列消息。- 若
enable.auto.commit
设置为false
则每次启动一个消费组进程都将会从头开始接收消息。
参数 | 描述 |
---|---|
bootstrap.servers | 配置 kafka 服务地址。 |
group.id | 唯一标识 consumer 进程所在组的字符串。 |
enable.auto.commit | Consumer 所 fetch 的消息的 offset 是否自动的同步到 zookeeper。 |
auto.commit.interval.ms | consumer 向 zookeeper 提交 offset 的频率。 |
session.timeout.ms | 设置会话超时时间。 |
key.deserializer | 消息 Key 值解码方式。 |
value.deserializer | 消息 Value 值解码方式. |
下面同样提供一个具体的示例,运行下述消费者实现 Topic
订阅,此时再运行上述的生产者发送消息即可看到在消费者在控制台打印接受的信息。
同理,若使用 Jackson
作为编解码则将 VALUE_DECODE
替换为 org.apache.kafka.common.serialization.StringDeserializer
。
public static void main(String[] args) {
final String HOST = "127.0.0.1:9092";
final String TOPIC = "user-topic";
final String GROUP = "consumer";
final String KEY_DECODE = "org.apache.kafka.common.serialization.StringDeserializer";
final String VALUE_DECODE = "demo.utils.DecodeKafka";
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("group.id", GROUP);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", KEY_DECODE);
props.put("value.deserializer", VALUE_DECODE);
// 根据属性创建消费者
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
// 订阅 Topic
consumer.subscribe(Arrays.asList(TOPIC));
System.out.println("Subscribed to topic: " + TOPIC);
// 接收订阅消息
while (true) {
// 每隔 100 毫秒读取一次消息队列
ConsumerRecords<String, User> records = consumer.poll(100);
for (ConsumerRecord<String, User> record : records) {
User result = record.value();
System.out.println(result.toString());
}
}
}
二、Spring集成
在 Spring Boot
中集成 Kafka
则更为简单,下面介绍配置步骤。
1. 依赖导入
同样在 Spring Boot
项目中引入相关 Maven
依赖。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.5</version>
</dependency>
2. 配置文件
在项目的 yml
配置文件中添加 Kafka
相关配置。
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
topic: test-topic
# 生产者
producer:
acks: 1
retries: 3
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者
consumer:
group-id: comsumer
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 监听器
listener:
concurrency: 6
ack-mode: MANUAL
3. 接收者
创建发送者用于订阅 Topic
接收发送者信息。
public class ConsumerController {
@KafkaListener(topics = "test_topic")
public void listen(ConsumerRecord<?, ?> record) {
System.out.printf("topic = %s, offset = %d, value = %s \n",
record.topic(), record.offset(), record.value());
}
}
4. 发送者
通过调用 KafkaTemplate
即可实现消息发送,若需要发送复杂对象,可配套 Jackson
等工具实现。
@RestController
@RequestMapping("api/kafka")
public class ProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/send")
public void send() {
String msg = "hello world";
String key = UUID.randomUUID().toString();
kafkaTemplate.send("test_topic", key, msg);
}
}