获取执行环境
getExecutionEnvironment
// 批处理环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
根据当前环境选择合适的方法,分为批处理和流式处理两种,以StreamExecutionEnvironment为例
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment() {
return getExecutionEnvironment(new Configuration());
}
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
其中threadLocalContextEnvironmentFactory和contextEnvironmentFactory变量并不做默认的初始化:
public class StreamExecutionEnvironment {
……
/**
* The environment of the context (local by default, cluster if invoked through command line).
*/
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;
/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static final ThreadLocal<StreamExecutionEnvironmentFactory>
threadLocalContextEnvironmentFactory = new ThreadLocal<>();
……
}
这样,执行Utils#resolveFactory
时返回的就是Optional.EMPTY
而Optional.EMPTY
执行map方法同样以Optional.EMPTY
返回,这样就触发了下面orElseGet中的StreamExecutionEnvironment.createLocalEnvironment(configuration)
逻辑
public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
return new LocalStreamEnvironment(configuration);
} else {
Configuration copyOfConfiguration = new Configuration();
copyOfConfiguration.addAll(configuration);
copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
return new LocalStreamEnvironment(copyOfConfiguration);
}
}
可以看到这里设置了Local环境的并行度
当然也可以设置Local环境后再另行通过env配置对应的环境信息
env.setParallelism(……);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(……));
如果选择了ExecutionEnvironment创建批处理环境,就无法提交流处理任务了,但如果使用StreamExecutionEnvironment创建流处理环境,还是可以指定批处理模式,可以通过打包提交作业时指定流式还是批处理
bin/flink run -Dexecution.runtime-mode=BATCH/STREAMING …
也可以在env使用代码设置
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
源算子
读取数据的几种源
基于文件
// 读取文件
DataStreamSource<String> dataStreamSource1 = streamExecutionEnvironment.readTextFile("input/word.txt");
基于集合
// 读取集合
ArrayList<Integer> nums = new ArrayList<>();
nums.add(1);
nums.add(2);
nums.add(3);
DataStreamSource<Integer> dataStreamSource2 = streamExecutionEnvironment.fromCollection(nums);
基于实际对象读取
// 从元素读取数据,传入实际对象POJO
DataStreamSource<Object> dataStreamSource3 = streamExecutionEnvironment.fromElements(
new Object(),
new Object()
);
基于socket读取
// 从socket读取数据
DataStreamSource<String> dataStreamSource4 = streamExecutionEnvironment.socketTextStream("hadoop102", 1002);
基于kafka读取
// 从kafka读取
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
DataStreamSource<String> kafkaDataStream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<String>("topic", new DeserializationSchema<String>() {
@Override
public String deserialize(byte[] message) throws IOException {
return null;
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return null;
}
}, properties));
flinkKafakConsumer类三个参数包括topic、反序列化函数schema(实现deserialize、isEndOfStream、getProducedType方法)、属性设置。
自定义数据连接器
如果这些都不够用的,也支持自定义连接器
public class MySourceFunction implements SourceFunction<String> {
// 声明标志位
private Boolean running = true;
@Override
public void run(SourceContext ctx) throws Exception {
Random random = new Random();
String[] users = {"Mary", "Alice", "Bob"};
String[] addresss = {"Nanjing", "Beijing"};
while (running) {
String user = users[random.nextInt(users.length)];
String address = addresss[random.nextInt(addresss.length)];
ctx.collect(user + address);
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
run方法类似循环给数据流,cancel方法接收一个标志位,如果为false,中止数据流
也可以自定义并行流数据源,即实现ParallelSourceFunction作为sourceFunction,仍然要实现run和cancel方法。可以调用.setParallelism方法设置并行度。
Flink支持的数据类型
自定义类型:必须是public的,必须有一个空参构造,属性必须是public的
TypeInformation:是Flink中用于表达类型信息的工具,其中有TypeSerializer序列化器
TypeHint:捕获泛型类型的工具,解决泛型擦除问题
BasicTypeInfo:是在java基本数据类型基础上包装的数据类型
java复合类型的包装类:
ObjectArrayTypeInfo:对象数组的包装类
TupleTypeInfo:二元组类型
PojoTypeInfo,自定义的数据类型,即A自定义类型
RowTypeInfo,行类型,对应关系型数据库
EnumTypeInfo、ListTypeInfo、MapTypeInfo等
这些类型的使用,在map算子中就可以看到:
FlatMapOperator<String, Tuple2<String, Long>> wordAndNum = stringDataSource
// 2.2 FlatMapFunction的返回值是<T, R>,T代表input数据单元的type,R代表return数据type
// 在案例中,数据单元即数据行String,输出使用了flink中的元组tuple2类型
// 但输出时要使用Collector进行数据封装收集得到collector收集器
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
// 2.3 编写处理逻辑:对String数据行调用split方法切割得到String数组,
// 遍历数据,让每个单词通过收集器collector收集成tuple2二元组,index0是单词,index1是1L,代表出现一次。
// 对FlatMap处理完的数据调用returns方法,参数是返回类型,得到的是FlatMapOperator计算器结果对象
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
flatMap算子返回的是Collector封装的数据收集器,然后经过收集,变成最终的类型,而Types类中有一些预置的基本类型的,比如上面这个案例
在这个案例中,因为算子中自定义的方法就是返回Tuple2<String, Long>
,因此在return方法中就使用了Types.TUPLE(Types.STRING, Types.LONG)
如果返回的是自定义的类型,就需要自己再写下
public static void stringToDto() throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> list = Arrays.asList(
"{\"id\":\"111\",\"name\":\"name\",\"email\":\"123@qq.com\",\"phone\":\"129340124\"}",
"{\"id\":\"112\",\"name\":\"name1\",\"email\":\"123@qq.com1\",\"phone\":\"1293401241\"}",
"{\"id\":\"113\",\"name\":\"name2\",\"email\":\"123@qq.com2\",\"phone\":\"1293401242\"}"
);
DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.fromCollection(list);
SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<User> userCollector) -> {
ObjectMapper objectMapper = new ObjectMapper();
User user = objectMapper.readValue(line, User.class);
userCollector.collect(user);
}).returns(User.class);
operator.print();
streamExecutionEnvironment.execute();
}
在这个案例中,就是自定义的User对象
SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<BasicTypeInfo<>> userCollector) -> {
……
}).returns(new TypeHint<User>() {});
而TypeHint和TypeInformation效果类似,都是对类型的加强,下面这种写法即使用TypeHint包装User:
SingleOutputStreamOperator<User> operator = dataStreamSource.flatMap((String line, Collector<User> userCollector) -> {
……
}).returns(new TypeHint<User>() {});
而BasicTypeInfo等包装类型则是提供对基本类型的封装支持
DataStream<Integer> result = input
.map(str -> str.length()) // Lambda 表达式
.returns(BasicTypeInfo.INT_TYPE_INFO); // 显式指定返回类型
但需要注意的是,Flink不支持泛型,因此使用了例如TypeHint等类型处理泛型问题
评论区