kafka02-kafka部署及使用
文章目录
一、安装Kafka
下载地址:Apache Kafka 以kafka版本3.3.1为例
-
在windows环境下先解压下载的压缩包,kafka压缩包中包含了需要的zookeeper
-
将kafka解压,如解压后文件名为kafka并放在D盘下
-
编写两个bat,一个用于启动zookeeper,一个用于启动kafka
创建启动zookeeper的bat如zookeeper.startup.bat
|
|
创建启动kafka的bat如kafka.startup.bat
|
|
- 先启动zookeeper,再启动kafka。
二、服务配置
配置文件位于config文件夹下的server.properties,如D:\kafka\config\server.properties
|
|
三、常用命令
下面示例采用的kafka版本是:3.3.1
创建topic
创建一个名字为test的主题, 有三个分区,有两个副本:
|
|
查看topic副本信息
|
|
查看kafka当中存在的topic
|
|
删除topic
|
|
生产者生产数据
|
|
消费者消费数据
|
|
查看消费者组的偏移量
|
|
增加topic分区
|
|
特定偏移量重新消费
- 停止当前的消费者应用程序。
- 使用 Kafka 提供的命令行工具来重置消费者组的偏移量到指定的偏移量位置:
|
|
上述命令将将消费者组 my-group 在名为 test 的主题上的偏移量重置为偏移量 100。
四、Springboot中使用kafka
添加 Maven 依赖
|
|
配置kafka
在 application.yml 配置文件中添加 Kafka 配置
- 生产者配置
|
|
- 消费者配置
|
|
编写代码
- 创建生产者发送消息
|
|
上面代码生产消息分别为简单发送方式、指定partition发送方式、自定义请求头发送方式、同步发送方式(kafka默认为异步发送)以及异步发送消息添加回调的发送方式。
- 创建消费者消费消息
|
|
上面是不同消费模式的示例代码,注意有的需要修改消费者配置。
手动提交消费偏移量分为使用Consumer提交和Acknowledgment提交,它们的区别在于Acknowledgment是消费者监听方法内部调用的,它将确认当前消费的消息并提交相应的偏移量, Consumer是 Kafka 客户端提供的提交偏移量的方法,Consumer提交偏移量的方式又分为同步提交和异步提交。从颗粒度上讲Acknowledgment是每条消息被成功处理后手动调用,因此可以实现每条消息的精确提交。Consumer是提交一批消息的偏移量。你需要在适当的时机选择调用这些方法,例如在处理一批消息后、定期提交或基于特定的业务逻辑来确定提交时机。
消息传递模式分为点对点模式和发布-订阅消息模式,
在点对点消息系统中,消息持久化到一个队列中。此时有一个或多个消费者消费队列中的消息。但是一条消息只能被一个消费者消费。当一个消费者消费了队列中的这条消息后,该条数据则从消息队列中删除。
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。
kafka使用同一个消费者组或不用消费者组消费同一个topic实现了点对点和发布-订阅模式。
五、使用中需要注意的问题
kafka如何防止消息丢失
kafka消息可能会因为网络波动、服务异常等原因造成消息丢失,防止消息丢失可以分为三点
-
生产防止数据丢失:生产者发送消息到kafka服务端可能由于网络问题或kafka服务宕机导致消息发送失败。可以将发送方式改为同步发送(默认为异步发送)获取实时发送结果,也可以使用异步发送添加回调函数监听发送结果来对发送失败的消息进行重试,并且kafka producer本身提供了重试机制,只需要配置retries即可对失败的消息自动进行重试。
-
kafka集群中的broker防止数据丢失:borker需要把producer发送的消息持久化到磁盘,由于kafka为了高性能,采用异步批量刷盘的实现机制,也就是说按照一定的消息量和时间间隔进行刷盘,而最终刷盘的动作是交给系统调度的,所以在刷盘之前系统崩溃会导致消息丢失。然而kafka并未提供同步刷盘的机制,所以我们需要借助kafka的副本机制和acks机制来确保broker写盘成功。例如将acks设置为-1(把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功)。
-
消费防止数据丢失:消费者手动提交偏移量(默认自动提交),这样即使消费者在运行过程中挂掉,也可以再此启动后重新找到该topic的offset接着消费。但是offset的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
kafka如何保证顺序消费
kafka只能保证partition内是有序的,但是partition间的有序是没办法的。让需要保证顺序消费的topic放到同一个分区下。
文章作者 necor 上次更新 2024-01-08