目 录CONTENT

文章目录

apache.flink - 环境和源算子

FatFish1
2025-06-26 / 0 评论 / 0 点赞 / 3 阅读 / 0 字 / 正在检测是否收录...

获取执行环境

getExecutionEnvironment

// 批处理环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

根据当前环境选择合适的方法,分为批处理和流式处理两种,以StreamExecutionEnvironment为例

// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}

public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}

其中threadLocalContextEnvironmentFactory和contextEnvironmentFactory变量并不做默认的初始化:

public class StreamExecutionEnvironment {
    ……
    /**
     * The environment of the context (local by default, cluster if invoked through command line).
     */
    private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;
    /** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
    private static final ThreadLocal<StreamExecutionEnvironmentFactory>
            threadLocalContextEnvironmentFactory = new ThreadLocal<>();
    ……
}

这样,执行Utils#resolveFactory 时返回的就是Optional.EMPTY

Optional.EMPTY 执行map方法同样以Optional.EMPTY返回,这样就触发了下面orElseGet中的StreamExecutionEnvironment.createLocalEnvironment(configuration) 逻辑

public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
    if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
        return new LocalStreamEnvironment(configuration);
    } else {
        Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
        return new LocalStreamEnvironment(copyOfConfiguration);
    }
}

可以看到这里设置了Local环境的并行度

当然也可以设置Local环境后再另行通过env配置对应的环境信息

env.setParallelism(……);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(……));

如果选择了ExecutionEnvironment创建批处理环境,就无法提交流处理任务了,但如果使用StreamExecutionEnvironment创建流处理环境,还是可以指定批处理模式,可以通过打包提交作业时指定流式还是批处理

bin/flink run -Dexecution.runtime-mode=BATCH/STREAMING …

也可以在env使用代码设置

env.setRuntimeMode(RuntimeExecutionMode.BATCH)

源算子

读取数据的几种源

基于文件

// 读取文件
DataStreamSource<String> dataStreamSource1 = streamExecutionEnvironment.readTextFile("input/word.txt");

基于集合

// 读取集合
ArrayList<Integer> nums = new ArrayList<>();
nums.add(1);
nums.add(2);
nums.add(3);
DataStreamSource<Integer> dataStreamSource2 = streamExecutionEnvironment.fromCollection(nums);

基于实际对象读取

// 从元素读取数据,传入实际对象POJO
DataStreamSource<Object> dataStreamSource3 = streamExecutionEnvironment.fromElements(
    new Object(),
    new Object()
);

基于socket读取

// 从socket读取数据
DataStreamSource<String> dataStreamSource4 = streamExecutionEnvironment.socketTextStream("hadoop102", 1002);

基于kafka读取

// 从kafka读取
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
DataStreamSource<String> kafkaDataStream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<String>("topic", new DeserializationSchema<String>() {
    @Override
    public String deserialize(byte[] message) throws IOException {
        return null;
    }
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return null;
    }
}, properties));

flinkKafakConsumer类三个参数包括topic、反序列化函数schema(实现deserialize、isEndOfStream、getProducedType方法)、属性设置。

自定义数据连接器

如果这些都不够用的,也支持自定义连接器

public class MySourceFunction implements SourceFunction<String> {
    // 声明标志位
    private Boolean running = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob"};
        String[] addresss = {"Nanjing", "Beijing"};

        while (running) {
            String user = users[random.nextInt(users.length)];
            String address = addresss[random.nextInt(addresss.length)];
            ctx.collect(user + address);
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
		running = false;
    }
}

run方法类似循环给数据流,cancel方法接收一个标志位,如果为false,中止数据流

也可以自定义并行流数据源,即实现ParallelSourceFunction作为sourceFunction,仍然要实现run和cancel方法。可以调用.setParallelism方法设置并行度。

Flink支持的数据类型

  • 自定义类型:必须是public的,必须有一个空参构造,属性必须是public的

  • TypeInformation:是Flink中用于表达类型信息的工具,其中有TypeSerializer序列化器

  • TypeHint:捕获泛型类型的工具,解决泛型擦除问题

  • BasicTypeInfo:是在java基本数据类型基础上包装的数据类型

  • java复合类型的包装类:

    • ObjectArrayTypeInfo:对象数组的包装类

    • TupleTypeInfo:二元组类型

    • PojoTypeInfo,自定义的数据类型,即A自定义类型

    • RowTypeInfo,行类型,对应关系型数据库

    • EnumTypeInfo、ListTypeInfo、MapTypeInfo等

这些类型的使用,在map算子中就可以看到:

FlatMapOperator<String, Tuple2<String, Long>> wordAndNum = stringDataSource
        // 2.2 FlatMapFunction的返回值是<T, R>,T代表input数据单元的type,R代表return数据type
        //     在案例中,数据单元即数据行String,输出使用了flink中的元组tuple2类型
        //     但输出时要使用Collector进行数据封装收集得到collector收集器
        .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {

            // 2.3 编写处理逻辑:对String数据行调用split方法切割得到String数组,
            //     遍历数据,让每个单词通过收集器collector收集成tuple2二元组,index0是单词,index1是1L,代表出现一次。
            //     对FlatMap处理完的数据调用returns方法,参数是返回类型,得到的是FlatMapOperator计算器结果对象
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

flatMap算子返回的是Collector封装的数据收集器,然后经过收集,变成最终的类型,而Types类中有一些预置的基本类型的,比如上面这个案例

在这个案例中,因为算子中自定义的方法就是返回Tuple2<String, Long>,因此在return方法中就使用了Types.TUPLE(Types.STRING, Types.LONG)

如果返回的是自定义的类型,就需要自己再写下

public static void stringToDto() throws Exception {
    StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    List<String> list = Arrays.asList(
            "{\"id\":\"111\",\"name\":\"name\",\"email\":\"123@qq.com\",\"phone\":\"129340124\"}",
            "{\"id\":\"112\",\"name\":\"name1\",\"email\":\"123@qq.com1\",\"phone\":\"1293401241\"}",
            "{\"id\":\"113\",\"name\":\"name2\",\"email\":\"123@qq.com2\",\"phone\":\"1293401242\"}"
    );
    DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.fromCollection(list);
    SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<User> userCollector) -> {
        ObjectMapper objectMapper = new ObjectMapper();
        User user = objectMapper.readValue(line, User.class);
        userCollector.collect(user);
    }).returns(User.class);
    operator.print();
    streamExecutionEnvironment.execute();
}

在这个案例中,就是自定义的User对象

SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<BasicTypeInfo<>> userCollector) -> {
    ……
}).returns(new TypeHint<User>() {});

而TypeHint和TypeInformation效果类似,都是对类型的加强,下面这种写法即使用TypeHint包装User:

SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<User> userCollector) -> {
    ……
}).returns(new TypeHint<User>() {});

而BasicTypeInfo等包装类型则是提供对基本类型的封装支持

DataStream<Integer> result = input
    .map(str -> str.length())  // Lambda 表达式
    .returns(BasicTypeInfo.INT_TYPE_INFO);  // 显式指定返回类型

但需要注意的是,Flink不支持泛型,因此使用了例如TypeHint等类型处理泛型问题

0

评论区