Transformation Operators
A transformation operator turns one stream into a new stream: transforming elements, splitting a stream, or merging streams.
map - single-stream transformation operator
A one-to-one mapping. Example:
// read data from elements
DataStreamSource<Event> stream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 2000L),
        new Event("Jerry", "./prod?id=100", 3000L));
// custom map extracting the user field - lambda style
SingleOutputStreamOperator<String> result = stream.map(value -> value.user);
result.print();
env.execute();
// anonymous class style
stream.map(new MapFunction<Event, String>() {
    @Override
    public String map(Event value) throws Exception {
        return value.user;
    }
});
Like map in java.util.stream, Flink's map converts one type into another one element at a time, so unlike flatMap it needs no Collector to gather its output.
Looking at the source, map ultimately calls DataStream#doTransform:
// org.apache.flink.streaming.api.datastream.DataStream#doTransform
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
// stash the transformation (wrapping the StreamOperatorFactory built from the MapFunction) in the environment's transformations list
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addOperator
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
Note that this is the streaming implementation: it only stores the transformation built around the MapFunction, i.e. it registers the operator without executing anything. This is why a streaming program must end with execute(); otherwise the operators never actually run.
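To see this laziness directly, here is a minimal sketch reusing env and stream from the map example above; the print statements are only there to show the ordering:
// nothing runs when the operator is registered
SingleOutputStreamOperator<String> users = stream.map(value -> {
    System.out.println("map invoked for " + value.user); // printed only once the job runs
    return value.user;
});
System.out.println("operator registered, nothing executed yet"); // printed first
env.execute(); // only now does the pipeline run and the map body fire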
Batch processing is not quite the same. Because a batch job collects a batch and then executes it, DataSet#map does eager work as soon as the operator is built, extracting the mapper's return type on the spot:
// org.apache.flink.api.java.DataSet#map
public <R> MapOperator<T, R> map(MapFunction<T, R> mapper) {
if (mapper == null) {
throw new NullPointerException("Map function must not be null.");
}
String callLocation = Utils.getCallLocationName();
TypeInformation<R> resultType =
TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);
return new MapOperator<>(this, resultType, clean(mapper), callLocation);
}
The eager part is the line TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, getType(), callLocation, true);
Following it into TypeExtractionUtils#checkAndExtractLambda,
we can see that reflection is used to inspect the function we passed in and pull out its type information (the MapFunction body itself still runs only when the job executes):
// org.apache.flink.api.java.typeutils.TypeExtractionUtils#checkAndExtractLambda
public static LambdaExecutable checkAndExtractLambda(Function function)
throws TypeExtractionException {
try {
……
Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
replaceMethod.setAccessible(true);
Object serialVersion = replaceMethod.invoke(function);
……
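The same trick can be reproduced in plain Java: every serializable lambda carries a synthetic writeReplace method that returns a java.lang.invoke.SerializedLambda describing the method implementing the lambda body. A minimal self-contained sketch (the SerFunc interface and class name are made up for the demo):
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class LambdaIntrospection {
    // any functional interface extending Serializable will do
    interface SerFunc<T, R> extends Serializable { R apply(T t); }

    public static void main(String[] args) throws Exception {
        SerFunc<String, Integer> f = s -> s.length();
        // the synthetic method the compiler adds to serializable lambdas
        Method writeReplace = f.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        SerializedLambda meta = (SerializedLambda) writeReplace.invoke(f);
        // prints the class and method implementing the lambda body,
        // e.g. LambdaIntrospection#lambda$main$0
        System.out.println(meta.getImplClass() + "#" + meta.getImplMethodName());
    }
}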
Since we just said a streaming job must call StreamExecutionEnvironment#execute
for the job to actually run, let's look at what is inside:
public JobExecutionResult execute() throws Exception {
return execute(getStreamGraph());
}
The getStreamGraph()
method simply wraps the transformations collected earlier into a stream graph:
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
Then, digging into the execute method:
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
// asynchronous submission
final JobClient jobClient = executeAsync(streamGraph);
try {
final JobExecutionResult jobExecutionResult;
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
jobExecutionResult = jobClient.getJobExecutionResult().get();
} else {
jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
……
As the method name suggests, a streaming job is submitted asynchronously, and in attached mode the client then blocks on Future#get
waiting for the result.
This asynchronous submission is the low-level mechanism that lets a streaming job keep running.
This also gives us a conclusion: a batch job runs once after launch and then finishes, so it has to be started by a periodic scheduler (the usual pattern with Spark). A Flink streaming job, by contrast, is submitted asynchronously and runs continuously, listening for upstream records and pushing each one through the operator graph as it arrives; it needs no periodic scheduling and achieves real-time processing.
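The attached/detached distinction can also be driven from user code through the public executeAsync API; a sketch (the no-argument getJobExecutionResult() shown here exists since Flink 1.12, and the job name is made up):
// submit the job without blocking
JobClient jobClient = env.executeAsync("async-demo");
// attached-style: block on the future until the job finishes
JobExecutionResult result = jobClient.getJobExecutionResult().get();
System.out.println("job " + result.getJobID() + " finished");
// detached-style would instead keep only jobClient.getJobID() and return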
filter - single-stream filter operator
SingleOutputStreamOperator<Event> result1 = stream.filter(value -> "Mary".equals(value.user)).returns(Event.class);
The streaming org.apache.flink.streaming.api.datastream.DataStream#filter
works much like map: it registers the operator on the chain first and defers execution.
The batch org.apache.flink.api.java.DataSet#filter
behaves a bit differently: it returns a FilterOperator, which is itself a DataSet, so sink methods such as write/writeAsText can be chained on it to produce a DataSink.
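For example, a sketch of the batch side (the output path is made up):
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> names = batchEnv.fromElements("Mary", "Bob", "Jerry");
// the FilterOperator returned by filter is itself a DataSet,
// so a sink can be chained onto it directly
names.filter(name -> "Mary".equals(name))
     .writeAsText("/tmp/filter-demo"); // hypothetical output path
batchEnv.execute("batch filter demo");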
flatMap - flat mapping
SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = stream.flatMap(new FlatMapFunction<Event, String>() {
    @Override
    public void flatMap(Event value, Collector<String> out) throws Exception {
        // transformation logic: one input element may emit zero or more outputs
        out.collect(value.user);
        out.collect(value.url);
    }
});
stringSingleOutputStreamOperator.print();
env.execute();
Like flatMap in java.util.stream, it turns a stream into a stream; since each input element may produce any number of outputs, a Collector is needed to gather them into the final result.
returns - output type declaration operator
Use returns to declare the output type of an operator:
SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<User> userCollector) -> {
ObjectMapper objectMapper = new ObjectMapper();
User user = objectMapper.readValue(line, User.class);
userCollector.collect(user);
}).returns(User.class);
Here the lambda declares Collector<User>, which hints at the output type, but generic parameters of a lambda are erased at compile time, so Flink generally cannot extract the output type from it; without returns(...) the job would fail with a MissingTypeInfo error (as we will see below).
The returns method calls org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#returns:
// org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#returns
public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
transformation.setOutputType(typeInfo);
return this;
}
This sets an output type on the transformation, which can be used later when producing output.
Recall that the execute method converts the transformations into a StreamGraph:
public JobExecutionResult execute() throws Exception {
return execute(getStreamGraph());
}
Following getStreamGraph all the way down to org.apache.flink.streaming.api.graph.StreamGraphGenerator#generate,
which calls org.apache.flink.streaming.api.graph.StreamGraphGenerator#transform:
private Collection<Integer> transform(Transformation<?> transform) {
……
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
The output type is read here purely so that a missing type surfaces early: if no output type was ever set, this call throws immediately.
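To see the check fire, consider a lambda whose output type is erased; a sketch reusing the Event stream from earlier (Types is org.apache.flink.api.common.typeinfo.Types):
// the Tuple2 type parameters are erased from the lambda, so Flink records
// a MissingTypeInfo for this transformation...
SingleOutputStreamOperator<Tuple2<String, Long>> counts =
        stream.map(e -> Tuple2.of(e.user, 1L));
// ...and the next access to the output type (adding another operator or
// generating the StreamGraph) throws an InvalidTypesException
counts.print();

// the fix: declare the output type explicitly
stream.map(e -> Tuple2.of(e.user, 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .print();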
Aggregation Operators
Used for grouping and summarizing.
keyBy/groupBy - partition by key
keyBy returns a KeyedStream and partitions the stream by a key; to aggregate, partition first, which is more efficient.
This is a logical partitioning, not a physical one: keyBy decides which parallel operator instance handles a key, while the physical slot that instance runs in is decided by the scheduler.
The difference between keyBy and groupBy is simply that keyBy is the streaming operator and groupBy is the batch one.
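To make "logical, not physical" concrete, this is roughly how the runtime maps a key to a parallel operator instance, using the real KeyGroupRangeAssignment utility (the numbers are made up; 128 is the default number of key groups):
// the key is hashed into one of maxParallelism key groups, and the group
// is assigned to one of the parallel instances; which slot runs that
// instance is the scheduler's decision, not keyBy's
int subtaskIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(
        "A",   // the key
        128,   // maxParallelism (number of key groups)
        4);    // operator parallelism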
Example with KeyedStream#sum:
StreamExecutionEnvironment sEn = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> collector = sEn.fromCollection(list2);
SingleOutputStreamOperator<Tuple2<String, Long>> spitter = collector.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] split = value.trim().split(" ");
for (String word : split) {
out.collect(Tuple2.of(word, 1L));
}
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> counter = spitter.keyBy(t -> t.f0).sum(1);
counter.print();
sEn.execute();
max/maxBy - running maximum
Both max and maxBy accept either a String field name or an int field index.
Their behaviour differs from Math.max-style logic: the input is a stream rather than a complete collection, so instead of picking the maximum of a finished data set they keep updating the maximum as records arrive. For example:
public static void maxFind() throws Exception {
List<String> list2 = Arrays.asList(
"A 2",
"B 6",
"C 10",
"D 7",
"E 2",
"A 3",
"A 11",
"A 6"
);
StreamExecutionEnvironment sEn = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> collector = sEn.fromCollection(list2);
SingleOutputStreamOperator<Tuple2<String, Long>> spitter = collector.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] split = value.trim().split(" ");
long l = Long.parseLong(split[1]);
out.collect(Tuple2.of(split[0], l));
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> find = spitter.keyBy(t -> t.f0).maxBy(1);
find.print();
sEn.execute();
}
Key A appears several times, with values 2, 3, 11 and 6. In streaming output, every incoming A produces one output record, but when A 6 arrives the output is still A 11, because 11 is the maximum seen so far for A.
The practical difference between max and maxBy:
max updates only the aggregated field; all other fields keep the values of the first record for that key and do not follow the maximum, i.e. only that one field is aggregated and the rest are ignored. A typical use: continuously updating a stock's highest price.
maxBy keeps the entire record that contains the maximum, so the other fields come from the same record as the maximum value, treating the record as a whole. A typical use: reporting the point in time at which a stock hit its highest price.
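A sketch making the contrast concrete, with (symbol, timestamp, price) tuples (the symbol and numbers are made up):
public static void maxVsMaxBy() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Tuple3<String, Long, Long>> prices = env.fromElements(
            Tuple3.of("AAPL", 1000L, 10L),
            Tuple3.of("AAPL", 2000L, 30L),
            Tuple3.of("AAPL", 3000L, 20L));
    // max: f2 tracks the running maximum, but f1 keeps the first record's
    // value, ending with (AAPL,1000,30)
    prices.keyBy(t -> t.f0).max(2).print("max");
    // maxBy: the whole record containing the maximum f2 is emitted,
    // ending with (AAPL,2000,30)
    prices.keyBy(t -> t.f0).maxBy(2).print("maxBy");
    env.execute();
}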
reduce - reduction aggregation
// org.apache.flink.api.common.functions.ReduceFunction#reduce
T reduce(T value1, T value2) throws Exception;
value1 and value2 are two values to be reduced by some function, producing a single reduced result.
As the stream flows in, value1 carries the running reduction result, which is then reduced with each newly arriving element.
The intermediate result is kept as state during the reduction.
Building on the keyBy example above:
SingleOutputStreamOperator<Tuple2<String, Long>> find = spitter.keyBy(t -> t.f0).reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
return new Tuple2<>(value1.f0 + "+" + value2.f0, value1.f1 + value2.f1);
}
});
The final result for A is (A+A+A+A, 22).
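Since reduce is the general form, the maxBy from the previous section can be expressed as a reduce; a sketch reusing spitter from the keyBy example:
SingleOutputStreamOperator<Tuple2<String, Long>> runningMax =
        spitter.keyBy(t -> t.f0)
                .reduce((v1, v2) -> v1.f1 >= v2.f1 ? v1 : v2);
runningMax.print(); // for key A this emits (A,2) (A,3) (A,11) (A,11)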