DataStream API
- 主要流程:
- 获取执行环境
- 读取数据源
- 转换操作
- 输出数据
- Execute触发执行
- 获取执行环境
- 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)
- 创建本地环境StreamExecutionEnvironment.createLocalEnvironment()
- 创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”)
- 参数1:主机号
- 参数2:端口号
- 参数3:作业jar包的路径
- 获取数据源
- 简单数据源
- 从集合中读取数据env.fromCollection(集合)
- 从元素列表中获取数据env.fromElements()
- 从文件中读取数据,env.readTextFIle(路径), 已废弃
- 从端口读取数据,env.socketTextStream()
- 文件数据源
- kafka数据源
- DataGen数据源
- 自定义数据源
- 简单数据源
文件数据源
使用文件数据源前,需要先添加相关依赖
dependency>
groupId>org.apache.flinkgroupId>
artifactId>flink-connector-filesartifactId>
version>${flink.version}version>
scope>providedscope>
dependency>
public class Flink02_FileSource {
public static void main(String[] args) throw Exception {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
//file source
FileSource.FileSour服务器托管网ceBuilderString> fileSourceBuilder = FileSource
.String>forRecordStreamFormat(new TextLineInputFormat("utf-8"), new Path("input/word.txt"));
FileSourceString> fileSource = fileSourceBuilder.build();
//source 算子
DataStreamSourceString> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
ds.print();
env.execute();
}
}
DataGen数据源
主要用于生成模拟数据,也需要导入相关依赖
dependency>
groupId>org.apache.flinkgroupId>
artifactId>flink-connector-datagenartifactId>
version>${flink.version}version>
scope>compilescope>
dependency>
public class Flink04_DataGenSource {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
DataGeneratorSourceString> dataGeneratorSource = new DataGeneratorSource>(
new GeneratorFunctionLong, String>() {
@Override
public String map(Long value) throws Exception {
return UUID.randomUUID() + "->" + value;
}
},
100,
RateLimiterStrategy.perSecond(1),
Types.STRING
);
DataStreamSourceString> dataGenDs = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGenDs");
dataGenDs.print();
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Kafka消费者
-
消费方式:拉取
-
消费者对象:KafkaConsumenr
-
消费原则:
一个主题的一个分区只能被一个消费者组中的一个消费者消费
一个消费者组中的一个消费者可以消费一个主题中的多个分区 -
消费者相关的参数:
- key.deserializer 反序列化
- value.deserializer
- bootstrap.servers 集群的位置
- group.id 消费者组id (为何分组,方便同一组的消费者进行断点续传)
- auto.commit.interval.ms 自动提交间隔 默认5s
- enable.auto.commit: 开启自动提交offset偏移量
- auto.offset.reset: 当offset不存在时,offset重置,默认是最末尾的位置
- ①新的消费者组,之前没有消费过,没有记录的offset
- ②当前要消费的offset在kafka中已经不存在,可能是因为时间久了,对应的数据清理掉了
- 重置策略:
- earliest: 头,能消费到分区中现有的数据
- latest: 尾,只能消费到分区中新来的数据
- isolation.level:事务隔离级别
- 读未提交
- 读已提交
-
消费数据存在的问题
- 漏消费,导致数据丢失
- 重复消费,导致数据重复服务器托管网
-
shell 创建生产者对象:kafka-console-producer.sh –bootstrap-server hadoop102:9092 –topic first
public class Flink03_KafkaSource {
public static void main(String[] args) {
//1.创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认是最大并行度
env.setParallelism(1);
KafkaSourceString> stringKafkaSource = KafkaSource.String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092")
.setGroupId("flink")
.setTopics("first")
//优先使用消费者组记录的Offset进行消费,如果offset不存在,根据策略进行重置
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
//如果还有别的配置需要指定,统一使用通用方法
// .setProperty("isolation.level", "read_committed")
.build();
DataStreamSourceString> kafkaDS = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "kafkaDS");
kafkaDS.print();
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: 准确!!!在 CentOS 8 上配置 PostgreSQL 14 的主从复制
在 CentOS 8 上配置 PostgreSQL 14 的主从复制,并设置 WAL 归档到特定路径 /home/postgres/archive 的步骤如下: 主服务器配置(主机) 配置 PostgreSQL: 编辑 postgresql.conf 文件: …