富含数类
map算子、Filter算子、reduce算子等都可以使用富含数类作为算子,富含数类是增强型函数接口,为数据处理逻辑提供了生命周期管理和状态访问能力。它们是构建复杂流处理应用的关键基础
富函数类一般以抽象类形式存在,例如RichMapFunction、RichFilterFunction、RichReduceFunction,通常可以获取运行环境上下文。
生命周期管理
富含数类生命周期概念:
open()方法:富含数的初始化方法,开启算子的是周期,当算子中的map()方法或filter()方法被调用之前,open()会首先被调用。适合做数据库连接、配置文件读取这类工作
close()方法:生命周期中最后一个被调用的方法,类似于解构方法,适合做一些清理工作
public class MyRichMapFunction extends RichMapFunction<String, Integer> {
// 初始化函数(每个任务执行一次)
@Override
public void open(Configuration parameters) {
// 初始化数据库连接、加载机器学习模型等
}
// 清理资源(任务结束时调用)
@Override
public void close() {
// 关闭数据库连接、释放文件句柄等
}
// 实际处理逻辑(每条记录调用)
@Override
public Integer map(String value) {
return value.length();
}
}
生命周期方法对于一个并行子任务只会调用一次
上下文获取
@Override
public void open(Configuration parameters) {
// 获取当前任务信息
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
// 访问累加器
getRuntimeContext().addAccumulator("errorCount", new IntCounter());
// 访问广播状态
MapStateDescriptor<String, Rule> ruleDescriptor = ...;
BroadcastState<String, Rule> broadcastState =
getRuntimeContext().getBroadcastState(ruleDescriptor);
}
可以直接从Flink上下文获取状态或配置
可以通过withParams方法把参数传递到富含数中:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建配置
Configuration config = new Configuration();
config.setString("model.path", "/models/v1");
// 传递配置到富函数
DataStream<Integer> result = input
.map(new MyRichMapFunction())
.withParameters(config);
Sink算子
Flink的sink算子用于数据输出,支持将数据输出到:
输出到本地文件
输出到elasticsearch
输出到HDFS
sink到kafka
sink到mysql
sink到redis
输出到本地文件
StreamingFileSink<Tuple2<String, Long>> streamingFileSink
= StreamingFileSink.<Tuple2<String, Long>>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(1024 * 1024 * 1024)
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(15)).build()).build();
stream.addSink(streamingFileSink);
在这个方法中使用的就是addSink+SinkFunction输出到文件系统
其中StreamingFileSink实现了SinkFunction,同时还具备检查点的能力
public class StreamingFileSink<IN> extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
输出到kafka
FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>("localhost:9092", "events", new SimpleStringSchema());
stream.addSink(flinkKafkaProducer);
输出到redis
需要借助辅助连接器
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
其中2.11是scala版本,项目中不存在scala要求的话可以忽略
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();
stream.addSink(new RedisSink<>(config, new MyRedisMapper implements RedisTypeMapper<String>) {
// doSomething...
});
再看SinkFunction
public interface SinkFunction<IN> extends Function, Serializable {
@Deprecated
default void invoke(IN value) throws Exception {}
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
@Public
interface Context {
long currentProcessingTime();
long currentWatermark();
Long timestamp();
}
}
一般使用SinkFunction的都是基于SinkFunction的子类RichSinkFunction来重写
因为富含数类擅于生命周期管理,可以在其open、close方法中完成其连接的创建和销毁
而SinkFunction#invoke
方法则决定了SinkFunction处理数据的方式
典型的SinkFunction例如前面用过的FlinkKafkaProducer,其父类就包括RichSinkFunction
评论区