简介
Apache的Kafka™是一个分布式流平台(a distributed streaming platform)
有以下几个特性:(来自百度百科)
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
- 支持通过Kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
本文记录的是如何将kafka和SpringBoot整合使用,调用生产者Producer和消费者Consumer的API进行消息发送和消费,更加深入的点并未详细了解。
环境准备
代码整合之后,需要安装zookeeper作为注册中心,还有kafka服务。
我选择使用的是Centos 7服务器上进行环境配置的。
Zookeeper启动
首先去官网下载tar压缩包,选择合适的版本,然后进行解压
|
|
复制一份配置文件,文件名修改成zoo.cfg,由于是单机模式,所以使用默认的配置即可,不需要进行修改
启动zkServer服务
Kafka安装
同样需要先去官网进行下载
|
|
接着解压和修改配置文件,同样也是按照默认即可(有个spring进行连接的坑,最后讲)
|
|
启动kafka
还有一些比较常用的客户端命令,例如查询topic列表,更多可以参考这里
代码进行整合
首先加入依赖,maven与gradle都可以
在application.properties进行配置
发送消息&处理消息
发送消息需要使用到kafkaTemplate,还要指定发送的topic名称
|
|
监听消息,进行处理,使用到@KafkaListener注解
|
|
发送消息测试
正常启动之后,发送消息后,能够看到消息处理的结果
举个🌰,示例代码的执行结果如下所示:
Springboot配置简易化了许多,简单调用API就能进行消息的发送和消息处理,当然Kafka还有更强的功能和作用,可以深入去了解一下
遇到的坑
个人环境
Mac
IDEA 2017.03
SpringBoot 2.0.3.RELEASE
JDK 1.8
Centos 7 安装了 zookeeper-3.4.10 kafka_2.12
启动提示java.lang.NoSuchMethodError: org.springframework.util.Assert.state
kafka版本过高,springboot版本低不支持
原本打算引用最新的kafka依赖2.1.7.RELEASE,后来启动时报错,然后发现2.1.7版本的kafka需要依赖高版本的spring(5.0.6版本)
Group / Artifact | Version | Updates |
---|---|---|
org.springframework » spring-messaging | 5.0.6.RELEASE | 5.0.7.RELEASE |
org.springframework » spring-context | 5.0.6.RELEASE | 5.0.7.RELEASE |
org.springframework » spring-tx | 5.0.6.RELEASE | 5.0.7.RELEASE |
········ | ······· | ······· |
但是SpringBoot-2.0.3版本的spring使用的是spring-context是4.3.14,导致无法启动,所以需要调整依赖的版本号(我选择了kafka降级)
|
|
启动提示Connection to node 0 could not be established. Broker may not be available.
我是用Centos服务器搭建的kafka服务,然后使用本地环境进行连接kafka,消息订阅和发布失败,看到两篇博文,发现是解析服务器的hostname失败所导致的。
解决方法:修改kafka的server.properties配置文件
然后重新启动解决问题