一、前言
在我们当前的互联网应用中,消息驱动已经成为一种不可或缺的模式,Kafka作为一款高性能的分布式消息系统,已经成为很多公司在消息驱动架构选择中很重要的工具。我们使用SpringBoot和Kafka快速构建消息驱动应用,应对高并发的消息处理业务。Kafka是分布式发布-订阅消息系统。主要特点是基于pull模式来处理消息消费,追求高性能、高吞吐量,完全分布式的系统。
二、zookeeper安装
Kafka需要在zookeeper环境下运行,所以我们先进行zookeeper的安装。
下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
我们选择一个版本进行下载,需要下载里面的两个文件。
下载完之后进行解压。然后我们需要把bin中的lib文件夹复制到另一个项目中去。
1.修改配置
将zoo_sample.cfg修改为zoo.cfg
然后修改这个zoo.cfg文件,如修改日志的路径等。
2.启动zookeeper
我们进入cmd到bin目录,启动zkServer.cmd。
启动成功后会出现以下的日志。
最后我们进入到bin目录启动客户端,执行以下命令:
zkCli.cmd 127.0.0.1:2181
三、kafka的安装
下载地址:https://kafka.apache.org/downloads.html
下载之后解压。
进入到config目录修改server.properties文件。
主要也是修改日志保存的位置等。
1.启动kafka
我们cmd进入到bin的windows目录下,执行以下命令。
kafka-server-start.bat ../../config/server.properties
四、SpringBoot集成
1.添加依赖
org.springframework.kafka
spring-kafka
2.application.properties配置
#kafka配置
spring.kafka.bootstrap-servers=127.0.0.1:9092
#发生错误 消息重发的次数
spring.kafka.producer.retries=0
spring.kafka.producer.buffer-memory=33554432
#键的序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#值的序列化方式
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 只要集群的首领节点收到消息 生产者就会收到一个来着服务器的响应
spring.kafka.producer.acks=1
spring.kafka.consumer.auto-commit-interval=1s
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#监听器容器中运行的线程数
spring.kafka.listener.concurrency=5
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false
3.topic初始化
package com.example.nettydemo.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author qx
* @date 2023/12/22
* @des Kafka配置
*/
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topic() {
// 创建一个名为topic.test的Topic并设置分区数为8 分区副本数为2
return new NewTopic("topic.test", 8, (short) 2);
}
}
4.定义一个消息发送端
package com.example.nettydemo.kafka;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author qx
* @date 2023/12/22
* @des kafka消息发送端
*/
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(Object obj) {
String json = JSONObject.toJSONString(obj);
kafkaTemplate.send("topic.test", json).addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable ex) {
log.info("topic:{}生产者发送消息失败:{}", "topic.test", ex.getMessage());
}
@Override
public void onSuccess(SendResult result) {
log.info("topic:{}生产者发送消息成功:{}", "topic.test", result.getProducerRecord().value());
}
});
}
}
5.定义一个消息接收者
package com.example.nettydemo.kafka;
import lombok.extern.slf4j.Slf4j;
im服务器托管网port org.apache.kafka.c服务器托管网lients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @author qx
* @date 2023/12/22
* @des kafka消息接收者
*/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "topic.test", groupId = "topic.group1")
public void topicTest1(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
System.out.println("record->value:" + record.value());
Object value = record.value();
if (value != null) {
log.info("客户端消费了:topic:[{}],message:[{}]", topic, value);
ack.acknowledge();
}
}
}
6.测试
@SpringBootTest
class NettyDemoApplicationTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
void testSend(){
kafkaProducer.send("hello world");
}
}
执行测试方法后,控制台日志显示生产者消息发送成功的日志。
我们也在控制台日志中获取到了客户端消费的日志。
这样我们就使用SpringBoot集成Kafka实现了分布式发布-订阅消息。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: 2023 CCF国际AIOps挑战赛·赛题与赛制解读
本文根据本届挑战赛技术委员会主席、南开大学副教授张圣林在2023 CCF国际AIOps挑战赛宣讲会暨AIOps研讨会上题为《2023 CCF国际AIOps挑战赛·赛题与赛制》的分享整理而成,全文分为挑战赛背景介绍、题目简介、流程说明和评分规则等部分,最后简要介…