package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.*;
/**
 * Consume-transform-produce example using Kafka transactions:
 * records are read from {@code source-topic} and forwarded to
 * {@code sink-topic}, with the consumed offsets committed inside
 * the same producer transaction (exactly-once forwarding).
 */
public class KafkaTest24 {

    public static final String brokerList = "k8s-master:9092";

    /**
     * Consumer group id. The SAME id must be used both in the consumer
     * configuration and in sendOffsetsToTransaction, otherwise the
     * transactional offset commit lands in the wrong group.
     */
    private static final String GROUP_ID = "groupId";

    /**
     * Builds the consumer configuration.
     *
     * @return consumer {@link Properties} with auto-commit disabled —
     *         offsets are committed through the producer transaction instead
     */
    public static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Manual commit is mandatory for the consume-transform-produce pattern.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        return props;
    }

    /**
     * Builds the producer configuration.
     *
     * @return producer {@link Properties}; setting transactional.id enables
     *         idempotence and the transactional API on the producer
     */
    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
        return props;
    }

    // Consume from source-topic, then produce to sink-topic within one transaction.
    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
             KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties())) {
            consumer.subscribe(Collections.singletonList("source-topic"));
            // Register the transactional.id with the coordinator and fence
            // any previous producer instance with the same id.
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    continue;
                }
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                // Open the transaction that will cover both the forwarded
                // records and the consumed-offset commit.
                producer.beginTransaction();
                try {
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            producer.send(new ProducerRecord<>("sink-topic", record.key(), record.value()));
                            System.out.println("sent :" + record.value());
                        }
                        // Commit position = last consumed offset + 1 (the next offset to fetch).
                        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
                    }
                    // Commit the consumed offsets inside the transaction.
                    // The group id here MUST match the consumer's group.id.
                    producer.sendOffsetsToTransaction(offsets, GROUP_ID);
                    producer.commitTransaction();
                } catch (ProducerFencedException e) {
                    // Another producer with the same transactional.id has taken over.
                    // This producer may no longer commit or abort; stop and let
                    // try-with-resources close both clients.
                    break;
                } catch (KafkaException e) {
                    // Recoverable failure during send/commit: abort so none of
                    // this batch becomes visible, then retry on the next poll.
                    producer.abortTransaction();
                }
            }
        }
    }
}