一、java代码:
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class KafkaTest25 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_服务器托管网BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsBuilder builder = new StreamsBuilder();
KStream source = builder.stream("streams-plaintext-input");
KTable counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
.groupBy((key, value) -> value).count();
counts.toStre服务器托管网am().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
二、启动console producer:
bin/kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic streams-plaintext-input
三、启动console consumer:
./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092
--topic streams-wordcount-output
--from-beginning
--formatter kafka.tools.DefaultMessageFormatter
--property print.key=true
--property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
四、在producer端输入字符串(空格分割),看consumer输出
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
neg指令详细解释 规则: 1. neg reg (对寄存器操作) 2. neg mem(对内存操作) 作用:将目的操作数的所以数据位取反加1 影响的标志:进位标志(CF),零标志(ZF),符合标志(SF),溢出标志(OF),辅助进位标志(AF),奇偶标志(P…