Process方法和ProcessFunction
在 Apache Flink 中,ProcessFunction
是一个低层次、功能强大的流处理 API,它提供了对数据流处理的细粒度控制。它是构建更复杂流处理逻辑的基础工具,主要作用包括:
复杂事件处理:例如,需要跨多个事件进行状态跟踪(如会话窗口)
自定义窗口:当内置窗口不能满足需求时,可以用定时器实现自定义窗口逻辑。
超时检测:例如,在订单支付流程中,如果30分钟内未支付则取消订单。
分流:将流拆分成多个流。
分流
在flink1.13之前,使用split方法进行分流,而在后续版本中,使用ProcessFunction的测输出流功能进行分流
假设一个场景:处理数据,判断结果为奇数还是偶数,对奇数和偶数进行分流,做不同的sink
public class SideOutputExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建一个数据流,包含一些整数
DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 定义两个OutputTag,分别用于奇数和偶数
OutputTag<Integer> oddOutputTag = new OutputTag<Integer>("odd", TypeInformation.of(Integer.class));
OutputTag<Integer> evenOutputTag = new OutputTag<Integer>("even", TypeInformation.of(Integer.class));
// 使用ProcessFunction处理数据,并将数据发送到不同的侧输出
SingleOutputStreamOperator<Integer> mainDataStream = dataStream.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
// 发送到偶数流
ctx.output(evenOutputTag, value);
} else {
// 发送到奇数流
ctx.output(oddOutputTag, value);
}
// 主输出可以什么都不发送,或者发送一些其他数据
// 这里我们主输出不发送任何数据,所以主数据流是空的
}
});
// 从主数据流中获取侧输出流
DataStream<Integer> oddStream = mainDataStream.getSideOutput(oddOutputTag);
DataStream<Integer> evenStream = mainDataStream.getSideOutput(evenOutputTag);
// 打印奇数流
oddStream.print("奇数流");
// 打印偶数流
evenStream.print("偶数流");
env.execute("Side Output Example");
}
}
这里核心的逻辑是下面这部分
SingleOutputStreamOperator<Integer> mainDataStream = dataStream.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
// 发送到偶数流
ctx.output(evenOutputTag, value);
} else {
// 发送到奇数流
ctx.output(oddOutputTag, value);
}
// 主数据流是空的
}
});
将SingleOutputStreamOperator执行process方法,经过处理变成多流
基于getSideOutput方法获取多流,分别处理
DataStream<Integer> oddStream = mainDataStream.getSideOutput(oddOutputTag);
DataStream<Integer> evenStream = mainDataStream.getSideOutput(evenOutputTag);
评论区