前言
上篇介绍了flink的入门程序wordcount,在项目开发过程中,最常接触的还是跟各种源头系统打交道,其中消费接收kafka中的数据是最常见的情况,而flink在1.15版本后连接kafka的依赖包发生了变化,之前的flink版本使用的依赖包是flink-connector-kafka_2.1x(后面的数字代表kafka环境的scala版本),从flink1.15版本开始引用的依赖包变为flink-connector-kafka,具体的maven配置信息如下:
提示:以下为flink1.14及以下版本maven配置:
dependency>
groupId>org.apache.flink/groupId>
artifactId>flink-connector-kafka_2.12/artifactId>
version>${flink.vesrion}/version>
/dependency>
提示:以下为flink1.15及以上版本maven配置:
dependency>
groupId>org.apache.flink/groupId>
artifactId>flink-connector-kafka/artifactId>
version>${flink.vesrion}/version>
/dependency>
一、FlinkConsumer消费kafka
FlinkConsumer使用起来感觉和普通的kafka consumer java api差不多
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkConsumerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","cdp1:9092");
properties.setProperty("group.id","tes");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.rest","latest");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("event_topic",new SimpleStringSchema(),properties);
consumer.setStartFromLatest();
DataStreamString> stream = env.addSource(consumer);
stream.print();
env.execute();
}
}
二、KafkaSource消费kafka
FlinkConsumer在flink1.15版本后,已经被弃用,推出了新的消费kafka的KafkaSource,文档地址为https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSourceString> source = KafkaSource.String>builder()
.setBootstrapServers("cdp1:9092")
.setGroupId("my_group")
.setTopics("event_topic")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSourceString> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka source");
kafkaDS.print();
env.execute();
}
}
总结
改用了FLink新版本的KafkaSource后,感觉代码比之前更加简洁清晰了,但具体使用原理都差不多的,在不同版本消费ka服务器托管网fka数据时,需要注意的是,容易出现版本不兼容的问题,最常见的错误:java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.poll,(可通过清理maven依赖、检查端口是否能连接,以及重启等等),今天只是简单聊了下kafkasource,其实新版本的flink中还提供了kafkasink,可以直接将接收的数据流sink到指定的位置,比如hdfs或者另外一个kafka集群,由于篇幅有限,这里就不具体展开了,后续会结合实际场景持续更新。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: Taurus.MVC WebMVC 入门开发教程4:数据列表绑定List
前言: 在本篇 Taurus.MVC WebMVC 入门开发教程的第四篇文章中, 我们将学习如何实现数据列表的绑定,通过使用 List 来展示多个数据项。 我们将继续使用 Taurus.Mvc 命名空间,同时探讨如何在视图中绑定并显示一个 Model 列表。 …