目 录CONTENT

文章目录

apache.Flink - 富含数类、sink算子

FatFish1
2025-07-07 / 0 评论 / 0 点赞 / 0 阅读 / 0 字 / 正在检测是否收录...

富含数类

map算子、Filter算子、reduce算子等都可以使用富含数类作为算子,富含数类是增强型函数接口,为数据处理逻辑提供了生命周期管理和状态访问能力。它们是构建复杂流处理应用的关键基础

富函数类一般以抽象类形式存在,例如RichMapFunction、RichFilterFunction、RichReduceFunction,通常可以获取运行环境上下文。

生命周期管理

富含数类生命周期概念:

  • open()方法:富含数的初始化方法,开启算子的是周期,当算子中的map()方法或filter()方法被调用之前,open()会首先被调用。适合做数据库连接、配置文件读取这类工作

  • close()方法:生命周期中最后一个被调用的方法,类似于解构方法,适合做一些清理工作

public class MyRichMapFunction extends RichMapFunction<String, Integer> {
    
    // 初始化函数(每个任务执行一次)
    @Override
    public void open(Configuration parameters) {
        // 初始化数据库连接、加载机器学习模型等
    }
    
    // 清理资源(任务结束时调用)
    @Override
    public void close() {
        // 关闭数据库连接、释放文件句柄等
    }
    
    // 实际处理逻辑(每条记录调用)
    @Override
    public Integer map(String value) {
        return value.length();
    }
}

生命周期方法对于一个并行子任务只会调用一次

上下文获取

@Override
public void open(Configuration parameters) {
    // 获取当前任务信息
    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    
    // 访问累加器
    getRuntimeContext().addAccumulator("errorCount", new IntCounter());
    
    // 访问广播状态
    MapStateDescriptor<String, Rule> ruleDescriptor = ...;
    BroadcastState<String, Rule> broadcastState = 
        getRuntimeContext().getBroadcastState(ruleDescriptor);
}

可以直接从Flink上下文获取状态或配置

可以通过withParams方法把参数传递到富含数中:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 创建配置
Configuration config = new Configuration();
config.setString("model.path", "/models/v1");

// 传递配置到富函数
DataStream<Integer> result = input
    .map(new MyRichMapFunction())
    .withParameters(config);

Sink算子

Flink的sink算子用于数据输出,支持将数据输出到:

  • 输出到本地文件

  • 输出到elasticsearch

  • 输出到HDFS

  • sink到kafka

  • sink到mysql

  • sink到redis

输出到本地文件

StreamingFileSink<Tuple2<String, Long>> streamingFileSink 
        = StreamingFileSink.<Tuple2<String, Long>>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withMaxPartSize(1024 * 1024 * 1024)
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(15)).build()).build();
stream.addSink(streamingFileSink);

在这个方法中使用的就是addSink+SinkFunction输出到文件系统

其中StreamingFileSink实现了SinkFunction,同时还具备检查点的能力

public class StreamingFileSink<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {

输出到kafka

FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>("localhost:9092", "events", new SimpleStringSchema());
stream.addSink(flinkKafkaProducer);

输出到redis

需要借助辅助连接器

<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>

其中2.11是scala版本,项目中不存在scala要求的话可以忽略

FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();
stream.addSink(new RedisSink<>(config, new MyRedisMapper implements RedisTypeMapper<String>) {
    // doSomething...
});

再看SinkFunction

public interface SinkFunction<IN> extends Function, Serializable {
    
    @Deprecated
    default void invoke(IN value) throws Exception {}

    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }
    
    @Public
    interface Context {
       
        long currentProcessingTime();
        
        long currentWatermark();
       
        Long timestamp();
    }
}

一般使用SinkFunction的都是基于SinkFunction的子类RichSinkFunction来重写

因为富含数类擅于生命周期管理,可以在其open、close方法中完成其连接的创建和销毁

SinkFunction#invoke方法则决定了SinkFunction处理数据的方式

典型的SinkFunction例如前面用过的FlinkKafkaProducer,其父类就包括RichSinkFunction

0

评论区