目录
-
- 一、自动提交offset的相关参数
- 二、消费者(自动提交 offset)代码示例
一、自动提交offset的相关参数
-
官网文档
-
参数解释
参数 描述 enable.auto.commi 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 -
图解分析
二、消费者(自动提交 offset)代码示例
-
消费者自动提交 offset代码
// 自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交时间间隔 1秒 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
-
消费者自动提交 offset代码完整代码
package com.xz.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumerAutoOffset { public static void main(String[] args) { // 配置 Properties properties = new Properties(); // 连接 bootstrap.servers properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prope服务器托管网rties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 配置消费者组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3"); // 自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交时间间隔 1秒 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 1 创建一个消费者 "", "hello" KafkaConsumerString, String> kafkaConsumer = new KafkaConsumer>(properties); // 2 订阅主题 first ArrayListString> topics = new ArrayList>(); topics.add("sevenTopic"); kafkaConsumer.subscribe(topics); // 3 消费数据 while (true){ ConsumerRecordsString, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecordString, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: 编码器-解码器 | 基于 Transformers 的编码器-解码器模型
基于 transformer 的编码器-解码器模型是 表征学习 和 模型架构 这两个领域多年研究成果的结晶。本文简要介绍了神经编码器-解码器模型的历史,更多背景知识,建议读者阅读由 Sebastion Ruder 撰写的这篇精彩 博文。此外,建议读者对 自注意…