目 录CONTENT

文章目录

apache.Flink - prcess函数

FatFish1
2025-08-06 / 0 评论 / 0 点赞 / 1 阅读 / 0 字 / 正在检测是否收录...

Process方法和ProcessFunction

在 Apache Flink 中,ProcessFunction 是一个低层次、功能强大的流处理 API,它提供了对数据流处理的细粒度控制。它是构建更复杂流处理逻辑的基础工具,主要作用包括:

  • 复杂事件处理:例如,需要跨多个事件进行状态跟踪(如会话窗口)

  • 自定义窗口:当内置窗口不能满足需求时,可以用定时器实现自定义窗口逻辑。

  • 超时检测:例如,在订单支付流程中,如果30分钟内未支付则取消订单。

  • 分流:将流拆分成多个流。

分流

在flink1.13之前,使用split方法进行分流,而在后续版本中,使用ProcessFunction的测输出流功能进行分流

假设一个场景:处理数据,判断结果为奇数还是偶数,对奇数和偶数进行分流,做不同的sink

public class SideOutputExample {
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个数据流,包含一些整数
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // 定义两个OutputTag,分别用于奇数和偶数
        OutputTag<Integer> oddOutputTag = new OutputTag<Integer>("odd", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenOutputTag = new OutputTag<Integer>("even", TypeInformation.of(Integer.class));

        // 使用ProcessFunction处理数据,并将数据发送到不同的侧输出
        SingleOutputStreamOperator<Integer> mainDataStream = dataStream.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    // 发送到偶数流
                    ctx.output(evenOutputTag, value);
                } else {
                    // 发送到奇数流
                    ctx.output(oddOutputTag, value);
                }
                // 主输出可以什么都不发送,或者发送一些其他数据
                // 这里我们主输出不发送任何数据,所以主数据流是空的
            }
        });
        // 从主数据流中获取侧输出流
        DataStream<Integer> oddStream = mainDataStream.getSideOutput(oddOutputTag);
        DataStream<Integer> evenStream = mainDataStream.getSideOutput(evenOutputTag);
        // 打印奇数流
        oddStream.print("奇数流");
        // 打印偶数流
        evenStream.print("偶数流");
        env.execute("Side Output Example");
    }
}

这里核心的逻辑是下面这部分

SingleOutputStreamOperator<Integer> mainDataStream = dataStream.process(new ProcessFunction<Integer, Integer>() {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
        if (value % 2 == 0) {
            // 发送到偶数流
            ctx.output(evenOutputTag, value);
        } else {
            // 发送到奇数流
            ctx.output(oddOutputTag, value);
        }
        // 主数据流是空的
    }
});

将SingleOutputStreamOperator执行process方法,经过处理变成多流

基于getSideOutput方法获取多流,分别处理

DataStream<Integer> oddStream = mainDataStream.getSideOutput(oddOutputTag);
DataStream<Integer> evenStream = mainDataStream.getSideOutput(evenOutputTag);

0

评论区