分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
简单实现
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
代码实现:
public class SplitStreamByFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator ds = env.socketTextStream("hadoop102", 7777)
.map(Integer::valueOf);
//将ds 分为两个流 ,一个是奇数流,一个是偶数流
//使用filter 过滤两次
SingleOutputStreamOperator ds1 = ds.filter(x -> x % 2 == 0);
SingleOutputStreamOperator ds2 = ds.filter(x -> x % 2 == 1);
ds1.print("偶数");
ds2.print("奇数");
服务器托管网
env.execute();
}
}
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
使用侧输出流
关于处理函数中侧输出流的用法,我们已经在7.5节做了详细介绍。简单来说,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。
代码实现:将WaterSensor按照Id类型进行分流。
public class SplitStreamByOutputTag {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator ds = env.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());
OutputTag s1 = new OutputTag("s1", Types.POJO(WaterSensor.class)){};
OutputTag s2 = new OutputTag("s2", Types.POJO(WaterSensor.class)){};
//返回的都是主流
SingleOutputStreamOperator ds1 = ds.process(new ProcessFunction()
{
@Override
public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
if ("s1".equals(value.getId())) {
ctx.output(s1, value);
} else if ("s2".equals(value.getId())) {
ctx.output(s2, value);
} else {
//主流
out.collect(value);
}
}
});
ds1.print("主流,非s1,s2的传感器");
服务器托管网 SideOutputDataStream s1DS = ds1.getSideOutput(s1);
SideOutputDataStream s2DS = ds1.getSideOutput(s2);
s1DS.printToErr("s1");
s2DS.printToErr("s2");
env.execute();
}
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net