目录
- Flink常用方法使用案例
-
- 1. 数据源
-
- 1.1. Socket
- 1.2. 文件
- 1.3. Kafka
- 2. 数据转换
-
- 2.1. Map
- 2.2. FlatMap
- 2.3. Filter
- 2.4. KeyBy
- 3. 数据聚合
-
- 3.1. Reduce
- 3.2. Aggregations
- 4. 数据输出
-
- 4.1. Print
- 4.2. WriteAsText
- 4.3. WriteToSocket
- 5. 执行任务
-
- 5.1. ExecutionEnvironment
- 5.2. StreamExecutionEnvironment
- 5.3. execute
- 6. 总结
Flink常用方法使用案例
本文介绍了Flink中常用的方法,并提供了相应的使用案例。
1. 数据源
1.1. Socket
从Socket中读取数据,示例代码如下:
DataStreamString> text = env.socketTextStream("localhost", 9999);
1.2. 文件
从文件中读取数据,示例代码如下:
DataStreamString> text = env.readTextFile("file:///path/to/file");
1.3. Kafka
从Kafka中读取数据,示例代码如下:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumerString> consumer = new FlinkKafkaConsumer>("topic", new SimpleStringSchema(), properties);
DataStreamString> stream = env.addSource(consumer);
2. 数据转换
2.1. Map
对数据进行Map操作,示例代码如下:
DataStreamString> text = ...;
DataStreamInteger> lengths = text.map(new MapFunctionString, Integer>() {
@Override
public Integer map(String value) throws Exception {
return value.length();
}
});
2.2. FlatMap
对数据进行FlatMap操作,示例代码如下:
DataStreamString> text = ...;
DataStreamString> words = text.flatMap(new FlatMapFunctionString, String>() {
@Override
public void flatMap(String value, CollectorString> out) throws Exception {
for (String word : value.split("s")) {
out.collect(word);
}
}
});
2.3. Filter
对数据进行Filter操作,示例代码如下:
DataStreamString> text = ...;
DataStreamString> filtered = text.filter(new FilterFunctionString>() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("ERROR");
}
});
2.4. KeyBy
对数据进行KeyBy操作,示例代码如下:
DataStreamTuple2String, Integer>> data = ...;
KeyedStreamTuple2String, Integer>, String> keyed = data.keyBy(new KeySelectorTuple2String, Integer>, String>() {
@Override
public String getKey(Tuple2String, Integer> value) throws Exception {
return value.f0;
}
});
3. 数据聚合
3.1. Reduce
对数据进行Reduce操作,示例代码如下:
KeyedStreamTuple2String, Integer>, String> keyed = ...;
DataStreamTuple2String, Integer>> reduced = keyed.reduce(new ReduceFunctionTuple2String, Integer>>() {
@Override
public Tuple2String, Integer> reduce(Tuple2String, Integer> value1, Tuple2String, Integer> value2) throws Exception {
return new Tuple2>(value1.f0, value1.f1 + value2.f1);
}
});
3.2. Aggregations
对数据进行Aggregations操作,示例代码如下:
KeyedStreamTuple2String, Integer>, String> keyed = ...;
DataStreamTuple2String, Integer>> aggregated = keyed.sum(1);
4. 数据输出
4.1. Print
将数据输出到控制台,示例代码如下:
DataStreamString> text = ...;
text.print();
4.2. WriteAsText
将数据输出到文件,示例代码如下:
DataStreamString> text = ...;
text.writeAsText("file:///path/to/output");
4.3. WriteToSocket
将数据输出到Socket,示例代码如下:
DataStreamString> text = ...;
text.writeToSocket("localhost", 9999, new SimpleStringSchema());
5. 执行任务
5.1. ExecutionEnvironment
创建ExecutionEnvironment,示例代码如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
5.2. StreamExecutionEnvironment
创建StreamExecutionEnvironment,示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
5.3. execute
执行任务,示例代码如下:
env.execute("My Flink Job");
6. 总结
本文介绍了Flink中常用的方法,并提供了相应的使用案例。通过本文的学习,读者可以掌握Flink的基本操作,为进一步深入学习Flink打下基础。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
Spring是如何启动的 在SpringBoot出现之前,我们使用Spring需要以配置文件的方式进行启动.如果使用XML文件配置.则通过XmlWebApplicationContext.java进行启动.常应用在Web项目的开发中. 以此为例,通过阅读源码发…