目 录CONTENT

文章目录

apache.flink - 转换算子、聚合算子

FatFish1
2025-06-27 / 0 评论 / 0 点赞 / 2 阅读 / 0 字 / 正在检测是否收录...

转换算子

转换算子的作用是对一条流进行转换、分流合流转换,把流变成新的流

map - 单流转换算子

一一映射,示例如下:

// 从元素种读取数据
DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L),
        new Event("Jerry", "./prod?id=100", 3000L));

// 自定义map提取user字段-lambda表达式写法
SingleOutputStreamOperator<String> result = stream.map(value -> value.user);
result.print();
env.execute();

// 匿名类写法
stream.map(new MapFunction<Event, String>() {
    @Override
    public String map(Event value) throws Exception {
        return value.user;
    }
});

与java.stream中的map方法类似,flink中的map方法也是类型转类型的方法,因此不需要像flatMap中需要Collector对流进行收集

看下里面的源码,实际调用到DataStream#doTransform

// org.apache.flink.streaming.api.datastream.DataStream#doTransform
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {
    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();
    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());
    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    // 把MapFunction封装的StreamOperatorFactory暂存到算子中,在transformation属性中
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}

public void addOperator(Transformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}

注意,这里是流包里面的实现,这里面只是将传入的MapFunction构造的StreamOperatorFactory存入到operator中,即暂存算子,还没有真正执行,因此流式处理下层必须要执行execute,不然算子本质上是不执行的

而批处理并不是这样做,批处理因为是采集一批,执行一批,因此看到批处理下面的map,是直接进行了操作的:

// org.apache.flink.api.java.DataSet#map
public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
    if (mapper == null) {
        throw new NullPointerException("Map function must not be null.");
    }
    String callLocation = Utils.getCallLocationName();
    TypeInformation<R> resultType =
            TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);
    return new MapOperator<>(this, resultType, clean(mapper), callLocation);
}

执行点在TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);

跟进到TypeExtractionUtils#checkAndExtractLambda 方法大致可以看出来,是走的反射执行我们传入的MapFunction

public static LambdaExecutable checkAndExtractLambda(Function function)
        throws TypeExtractionException {
    try {
        ……
                Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
                replaceMethod.setAccessible(true);
                Object serialVersion = replaceMethod.invoke(function);

既然刚刚提到了流式任务必须执行StreamExecutionEnvironment#execute 才能让任务真正跑起来,那就看下里面是什么

public JobExecutionResult execute() throws Exception {
    return execute(getStreamGraph());
}

getStreamGraph() 方法中其实就是把前面封装的transformations包装成流graph

public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}

然后深入execute方法

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // 异步处理
    final JobClient jobClient = executeAsync(streamGraph);
    try {
        final JobExecutionResult jobExecutionResult;
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }
    ……

根据方法名可以看出来,流式任务实际上做了一个异步线程处理,然后下面通过Future#get 进行阻塞等待

这就是流式任务并行度最终产生的底层影响

同样我们得出了一个结论,批处理是启动后直接执行的,因此一个批处理任务发布后执行一次就结束了,需要定时调度启动(spark常用),而flink流式任务则是通过异步线程实现,不断监听上游过来的消息,收到一个消息,就启动一个线程执行算子graph,从而不需要定时调度,做到实时处理

filter - 单流过滤算子

SingleOutputStreamOperator<Event> result1 = stream.filter(value -> "Mary".equals(value.user)).returns(Event.class);

流式的org.apache.flink.streaming.api.datastream.DataStream#filter 包里面逻辑和map差不多,也是先添加算子链,先不执行

org.apache.flink.api.java.DataSet#filter 逻辑和map不太一样,是返回一个FilterOperator对象,其中有write方法,可以输出DataSink

flatMap - 扁平映射

SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = stream.flatMap(new 
FlatMapFunction<Event, String>() {
    @Override
    public void flatMap(Event value, Collector<String> out) throws Exception {
        // todo: 增加转换逻辑
        out.collect(xxx);
    }
});
stringSingleOutputStreamOperator.print();
env.execute();

与java.stream中的flatMap逻辑也类似,是把流转流,因此需要加入一个Collector把流收集起来,转成最终结果

return - 输出类型转换算子

使用return可以进行输出类型数据转换

SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<User> userCollector) -> {
    ObjectMapper objectMapper = new ObjectMapper();
    User user = objectMapper.readValue(line, User.class);
    userCollector.collect(user);
}).returns(User.class);

这里flatMap算子里面对输出类型做限制了,不加return算子其实也可以

return方法实际调用到org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#returns

// org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#returns
public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
    requireNonNull(typeInfo, "TypeInformation must not be null");
    transformation.setOutputType(typeInfo);
    return this;
}

给算子链transformation加了一个输出类型,后面输出的时候可以使用

还记得在execute方法中会转换成StreamGraph

public JobExecutionResult execute() throws Exception {
    return execute(getStreamGraph());
}

跟进getStreamGraph方法,一直找到org.apache.flink.streaming.api.graph.StreamGraphGenerator#generate ,调用org.apache.flink.streaming.api.graph.StreamGraphGenerator#transform

private Collection<Integer> transform(Transformation<?> transform) {
    ……
    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

这里只是获取了一下,防止没有任何输出类型,如果没有的话,直接报错

聚合算子

用于分类、汇总

keyBy/groupBy - 按键分类

keyBy返回一个KeyedStream,用于将流按某个条件分类,要聚合,先分区,效率更高。

实际为逻辑分区,并非物理分区。实际在哪个slot处理跟keyBy无关

keyBy和groupBy的区别是一个是流式处理的算子,一个是批处理的算子

KeyedStream#sum

StreamExecutionEnvironment sEn = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> collector = sEn.fromCollection(list2);
SingleOutputStreamOperator<Tuple2<String, Long>> spitter = collector.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        String[] split = value.trim().split(" ");
        for (String word : split) {
            out.collect(Tuple2.of(word, 1L));
        }
    }
});
SingleOutputStreamOperator<Tuple2<String, Long>> counter = spitter.keyBy(t -> t.f0).sum(1);
counter.print();
sEn.execute();

max/maxBy - 取最大

max、maxBy可以传string表示字段和int表示索引

这两个算子的用途和Math包中的max逻辑不太一样,因为现在的任务是流,并不是说来了一组数据取出其中的最大值,而是流式更新最大值,比如以下场景:

public static void maxFind() throws Exception {
    List<String> list2 = Arrays.asList(
            "A 2",
            "B 6",
            "C 10",
            "D 7",
            "E 2",
            "A 3",
            "A 11",
            "A 6"
    );
    StreamExecutionEnvironment sEn = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> collector = sEn.fromCollection(list2);
    SingleOutputStreamOperator<Tuple2<String, Long>> spitter = collector.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
            String[] split = value.trim().split(" ");
            long l = Long.parseLong(split[1]);
            out.collect(Tuple2.of(split[0], l));
        }
    });
    SingleOutputStreamOperator<Tuple2<String, Long>> find = spitter.keyBy(t -> t.f0).maxBy(1);
    find.print();
    sEn.execute();
}

其中A数据出现了多次,值分别是2、3、11、6,流式输出的时候,每来一个A,输出一次,但A 6的时候输出还是A 11,因为11是A的最大值

max和maxBy在实际应用中的区别是:

  • max让对应字段取最大,其他字段取第一条数据,不随对应字段改变,即认为数据只对对应字段做操作,其他字段都不管了;max适合的场景:实时更新某只股票最高价

  • maxBy让对应字段取最大,其他字段也自然取到最大的对应字段对应的其他内容,即让数据相当于一个整体来取值;maxBy适合的场景:输出某只股票最高价时发生的时间点等场景

reduce - 规约聚合

T reduce(T value1, T value2) throws Exception;

value1和value2是要按某种算法规约的两个数据,输出一个规约的结果

流不断进入时,value1不断变成规约结果,再去跟其他数据继续规约

规约过程中,状态state会被保存

基于上面keyBy的案例,处理:

SingleOutputStreamOperator<Tuple2<String, Long>> find = spitter.keyBy(t -> t.f0).reduce(new ReduceFunction<Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
        return new Tuple2<>(value1.f0 + "+" + value2.f0, value1.f1+ value2.f1);
    }
});

最终A对应的结果是(A+A+A+A,22)

0

评论区