编写实例
场景一、使用DataSetAPI做批处理
// 1、获取运行环境和数据源
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1.1 DataSource类读取txt,返回的是数据流,流中的每条数据单元就是txt中的一行
DataSource<String> stringDataSource = env.readTextFile("input/word.txt");
// 2、编写transform逻辑
// 2.1 DataSet.flatMap方法是转换算子,入参是FlatMapFunction,案例中使用lambda函数代替
FlatMapOperator<String, Tuple2<String, Long>> wordAndNum = stringDataSource
// 2.2 FlatMapFunction的返回值是<T, R>,T代表input数据单元的type,R代表return数据type
// 在案例中,数据单元即数据行String,输出使用了flink中的元组tuple2类型
// 但输出时要使用Collector进行数据封装收集得到collector收集器
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
// 2.3 编写处理逻辑:对String数据行调用split方法切割得到String数组,
// 遍历数据,让每个单词通过收集器collector收集成tuple2二元组,index0是单词,index1是1L,代表出现一次。
// 对FlatMap处理完的数据调用returns方法,参数是返回类型,得到的是FlatMapOperator计算器结果对象
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 2-1、分组
// 2-1.1 调用FlatMapOperator.groupBy方法可以对计算器结果进行分组,形参是tuple索引,返回值是UnsortedGrouping
// 但分组后的结果还不能直接展示
UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = wordAndNum.groupBy(0);
// 2-2、聚合
// 2-2.1 调用UnsortedGrouping.sum可以对结果进行聚合,返回值是AggregateOperator,形参也是tuple索引,结果可以调用.print展示
AggregateOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);
// 3、sink
sum.print();
场景二、使用DataStreamAPI做流处理
批可以视为有界的流,因此DataStreamAPI既可以做批处理也可以做流处理
// 1、创建流式执行环境和数据源
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.readTextFile("input/word.txt");
// 2、编写transform流程
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = dataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 2-1、分组
KeyedStream<Tuple2<String, Long>, String> keyedStream = singleOutputStreamOperator.keyBy(data -> data.f0);
// 2-2、加和
keyedStream.sum(1);
// 3、挂起处理器等待后续的数据流
streamExecutionEnvironment.execute();
流处理是不中断的持续等待,因此最后是让处理器挂起继续等待
流处理的结果反映的是流的变化,因此结果是不断增长的
流处理前面带任务槽编号,代表分布式集群编号,本地用多线程模拟
并行数称并行度,不配置默认与电脑cpu核心数一致
挂起前面的计算逻辑实际上是模拟,不是真正启动
Flink作业的编码思路和关键点
程序与数据流dataflow
flink任务代码基本可以归结为source、transform、sink
数据流dataflow类似有向无环图DAG,在flink上提交的程序会被映射成dataflows,一个程序可能一个算子,产生一个dataflow
并行度parallelism
每个算子可能包含多个子任务,这些子任务在不同线程或物理机或容器中独立执行,一个特定算子的子任务的个数被成为并行度(切割多少份)。
并行度是针对每个特定算子而言的,因此可以在代码中直接设置
flatMap(xxx).setParallelism(2)
此处要区分哪个是算子,返回operator的是算子,哪些不是算子,例如keyBy
也可以在env后面设置全局并行度。算子级优先级高于全局并行度
注意:设置的并行度还会看算子实际是否可以拆分,如果代码就不能拆分了,那设大了也没有用
数据传输和算子链
one-to-one:stream维护分区和顺序,在同一个分区空间执行前后两个算子的子任务,例如source到flatMap。
本身是one-to-one操作且并行度相同的算子可以合并成一个处理,形成一个大task。这个技术叫算子链技术。
redistributing:stram分区会发生改变,每个算子的子任务一句所选择的transformation发送数据到不同的目标任务
引起重分区的情境:算子间并行度不一致、算子本身之间就有重分区操作,例如keyBy+sum(keyBy不是算子,加上sum就变成了算子),这个算子本身就有重分区的逻辑
重分区方式:hash、rebalance
执行图ExecutionGraph
Flink中有四层执行图:StreamGraph-JobGraph-ExecutionGraph-物理执行图
streamgraph是根据用户api编写的代码生成的最初图,表示程序的拓扑。
jobGraph是streamGraph优化后的图,是提交给jobManager的数据结构
executionGraph是JobGraph并行华版本,是调度层核心数据结构
物理执行图是对job调度后在各个TaskManager上面部署task后形成的图
任务槽TaskSlots
是在TaskManager上面划分的内存块,把任务放在上面运行,它们可能会在独立的线程上执行
任务共享slot:默认条件下,flink允许子任务共享slot,这样的结果是一个slot可以保存作业的整个管道
Flink任务调度逻辑
上面这些并行度、槽等逻辑都可以在算子后面直接编写,例如:
// 禁用算子链(可以在env设置全局)
.map(xx).disableChaining();
// 从当前算子开始新链
.map(xx).startNewChain();
// 设置共享组配置slot共享
.map(xx).slotSharingGroup(“1”);
注意:分到一个共享组的算子才可以共享slot,不设置全都默认是default组,只设置一个,后面的算子默认和前面设置的算子在一个组
DataStream开发思路
获取执行环境(execution environment)
读取数据源(source)
定义基于数据的转换操作(transformations)
定义计算结果的输出位置(sink)
触发程序执行(excute)
评论区