项目搭建
同样的,需要我们搭建一个maven
工程,整合非常的简单,需要用到:
org.springframework.kafka
spring-kafka
来一起看下完整的pom.xml
:
4.0.0
org.example
springboot-kafka-all
1.0-SNAPSHOT
1.8
org.springframework.boot
spring-boot-starter-parent
2.1.3.RELEASE
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
org.springframework.kafka
spring-kafka
cn.hutool
hutool-all
5.8.4
com.alibaba
fastjson
1.2.58
org.slf4j
slf4j-api
1.6.4
org.slf4j
slf4j-simple
1.7.25
compile
org.projectlombok
lombok
org.springframework.boot
spring-boot-maven-plugin
2.1.3.RELEASE
配置也很简单application.yml
server:
port:8081
spring:
kafka:
producer:
bootstrap-servers:127.0.0.1:9092
然后新建一个启动类,看下控制台是否成功链接了Kafka
,在启动之前别忘了开启Kafka集群
基本使用
先从一个简单的例子,来快速体验一下Kafka
,新建HelloController
@Slf4j
@RestController
publicclassHelloController{
privatestaticfinalStringtopic="test";
@Autowired
privateKafkaTemplate
我们通过KafkaTemplate
进行消息的发送, 通过@KafkaLi服务器托管网stener
进行消息的消费,我们可以指定消费者ID
以及监听的topic
,请求localhost:8081/hello
观察控制台的变化。请求后,发现消息发送和接收的非常快,我们也可以观察UI
后台的消息详情,同步对比
topic创建
之前我们的topic
是在UI
后台创建的,那么在SpringBoot
中如何创建呢? 下面我们试着发送一个不存在的topic
//当topic不存在时会默认创建一个topic
//num.partitions=1#默认Topic分区数
//num.replica.fetchers=1#默认副本数
@GetMapping("/hello1")
publicStringhello1(){
//发送消息
kafkaTemplate.send("hello1","hello1");
return"hello1";
}
//接收消息
@KafkaListener(id="hello1Group",topics="hello1")
publicvoidlisten1(Stringmsg){
log.info("hello1receivevalue:{}",msg);
//hello1receivevalue:hello1
}
请求之后,观察控制台以及管理后台,发现并没有报错,并且给我们自动创建了一个topic
,在自动创建下,默认的参数是:
num.partitions=1#默认Topic分区数
num.replica.fetchers=1#默认副本数
如果我想手动创建呢?我们可以通过NewTopic
来手动创建:
@Configuration
publicclassKafkaConfig{
@Bean
publicKafkaAdminadmin(KafkaPropertiesproperties){
KafkaAdminadmin=newKafkaAdmin(properties.buildAdminProperties());
//默认False,在Broker不可用时,如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为True
admin.setFatalIfBrokerNotAvailable(true);
//setAutoCreate(false):默认值为True,也就是Kafka实例化后会自动创建已经实例化的NewTopic对象
//initialize():当setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象
returnadmin;
}
/**
*创建指定参数的topic
*@return
*/
@Bean
publicNewTopictopic(){
returnnewNewTopic("hello2",0,(short)0);
}
}
如果要更新呢?也非常的简单
/**
*更新topic
*@return
*/
@Bean
publicNewTopictopicUpdate(){
returnnewNewTopic("hello2",1,(short)1);
}
注意这里的参数只能+
不能-
这种方式太简单了,如果我想在代码逻辑中来创建呢?我们可以通过AdminClient
来手动创建
/**
*AdminClient创建
*/
@Autowired
privateKafkaPropertiesproperties;
@GetMapping("/create/{topicName}")
publicStringcreateTopic(@PathVariableStringtopicName){
AdminClientclient=AdminClient.create(properties.buildAdminProperties());
if(client!=null){
try{
CollectionnewTopics=newArrayList(1);
newTopics.add(newNewTopic(topicName,1,(short)1));
client.createTopics(newTopics);
}catch(Throwablee){
e.printStackTrace();
}finally{
clie服务器托管网nt.close();
}
}
returntopicName;
}
观察下管理后台,发现topic
都创建成功了
获取消息发送的结果
有时候我们发送消息不知道是不是发成功了,需要有一个结果通知。有两种方式,一种是同步
一种是异步
同步获取结果
/**
*获取通知结果
*@return
*/
@GetMapping("/hello2")
publicStringhello2(){
//同步获取结果
ListenableFuture>future=kafkaTemplate.send("hello2","hello2");
try{
SendResult
异步获取
/**
*获取通知结果
*@return
*/
@GetMapping("/hello2")
publicStringhello2(){
//发送消息-异步获取通知结果
kafkaTemplate.send("hello2","asynchello2").addCallback(newListenableFutureCallback>(){
@Override
publicvoidonFailure(Throwablethrowable){
log.error("fail>>>>{}",throwable.getMessage());
}
@Override
publicvoidonSuccess(SendResult
Kafka事务
同样的,消息也会存在事务
,如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息,下面通过一个简单的例子体会一下
@GetMapping("/hello3")
publicStringhello3(){
kafkaTemplate.executeInTransaction(t->{
t.send("hello3","msg1");
if(true)
thrownewRuntimeException("failed");
t.send("hello3","msg2");
returntrue;
});
return"hello3";
}
//接收消息
@KafkaListener(id="hello3Group",topics="hello3")
publicvoidlisten3(Stringmsg){
log.info("hello3receivevalue:{}",msg);
}
默认情况下,Spring-kafka
自动生成的KafkaTemplate
实例,是不具有事务消息发送能力的。我们需要添加transaction-id-prefix
来激活它
spring:
kafka:
producer:
bootstrap-servers:127.0.0.1:9092
transaction-id-prefix:kafka_.
启动之后,观察控制台的变化~ ,除此之外,还可以使用注解的方式@Transactional
来开启事务
//注解方式
@Transactional(rollbackFor=RuntimeException.class)
@GetMapping("/hello4")
publicStringhello4(){
kafkaTemplate.send("hello3","msg1");
if(true)
thrownewRuntimeException("failed");
kafkaTemplate.send("hello3","msg2");
return"hello4";
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
兴趣是最好的老师,HelloGitHub 让你对编程感兴趣! 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 https://github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技…