Goal: build a simple streaming pipeline that consumes messages from Pulsar topicA, processes them, and forwards the results to topicB, where:
a message in topicA is a list made up of individual data rows
each row produces one HandleResult after processing, and the full list of results is published downstream as a single message
Building the Flink project
Adding the pom dependencies
<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.20.0</flink.version>
    <flink.pulsar.version>4.1.0-1.20.0</flink.pulsar.version>
    <flink.scala.version>1.20.0</flink.scala.version>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>${flink.pulsar.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>flink-connector-base</artifactId>
            <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
            <artifactId>bcpkix-jdk15on</artifactId>
            <groupId>org.bouncycastle</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>${flink.scala.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.20</version>
</dependency>
When picking these dependencies, mind Flink's JDK support: JDK 17 is only supported from Flink 1.18.1 onwards.
Hence flink-core and flink-clients are pinned to 1.20.0, and the Pulsar connector to 4.1.0-1.20.0 (the build matching Flink 1.20.0). For Scala source support, flink-scala is also 1.20.0, with the Scala language itself compiled at 2.12.20.
Developing the Flink job
Main flow
public class PulsarTask {
    private static final String IP = "xx.xx.xx.xx";
    private static final String serviceUrlNew = "pulsar://" + IP + ":6650";
    private static final String adminUrlUrlNew = "http://" + IP + ":8080";
    private static final String JWT = "tokentokentokentoken....";
    private static final String TOPICA = "persistent://public/my_namespace/topicA";
    private static final String TOPICB = "persistent://public/my_namespace/topicB";
    private static final PulsarTask pulsarTask = new PulsarTask();

    static {
        // Topics must be created manually, so check on class initialization
        // whether both topics exist
        init(TOPICA);
        init(TOPICB);
    }

    public static void main(String[] args) throws Exception {
        startPulsarFlinkTask();
    }

    private static void startPulsarFlinkTask() throws Exception {
        // A message in topicA is a batch of rows, so model it as List<String>
        TypeInformation<List<String>> originUdrListTypeInfo = TypeInformation.of(new TypeHint<List<String>>() {
        });
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        PulsarSource<List<String>> pulsarSource = PulsarSource.builder()
                .setAdminUrl(adminUrlUrlNew)
                .setServiceUrl(serviceUrlNew)
                // Authentication: unlike with a raw PulsarClient, first name the auth
                // plugin class (token auth here), then pass the actual token
                .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", JWT)
                .setDeserializationSchema(new OMSerializer()) // deserializer for the source
                .setStartCursor(StartCursor.earliest()) // where a fresh subscription starts reading
                .setTopics(TOPICA) // at least one topic must be subscribed
                .setSubscriptionName("topicA_subscription") // subscription name is mandatory
                .build();
        PulsarSink<List<HandleResult>> pulsarSink = PulsarSink.builder()
                .setServiceUrl(serviceUrlNew)
                .setAdminUrl(adminUrlUrlNew)
                .setTopics(TOPICB) // topic to publish to
                .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", JWT)
                .setSerializationSchema(new OMSerializer()) // serializer for the sink
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        // fromSource requires a WatermarkStrategy; bounded out-of-orderness of 5s here
        DataStreamSource<List<String>> pulsarStream = env.fromSource(pulsarSource,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "flink-pulsar-task");
        // Choose a restart strategy explicitly (no restart: failures surface immediately)
        env.setRestartStrategy(RestartStrategies.noRestart());
        // Process each batch in a map function, producing List<HandleResult>
        SingleOutputStreamOperator<List<HandleResult>> map = pulsarStream.map(new MapFunction<List<String>, List<HandleResult>>() {
            @Override
            public List<HandleResult> map(List<String> origins) throws Exception {
                List<HandleResult> handleResults = pulsarTask.handlePulsarTask(origins);
                return handleResults;
            }
        }).returns(TypeInformation.of(new TypeHint<List<HandleResult>>() {}));
        // Sink the results back to Pulsar
        map.sinkTo(pulsarSink);
        env.execute();
    }
    private static void init(String initTopic) {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(adminUrlUrlNew)
                .authentication(AuthenticationFactory.token(JWT))
                .build()) {
            // Create the namespace if it does not exist yet
            List<String> namespaces = admin.namespaces().getNamespaces("public");
            if (!namespaces.contains("public/my_namespace")) {
                admin.namespaces().createNamespace("public/my_namespace");
            }
            // Partitioned topics are listed as "<topic>-partition-N", hence startsWith
            boolean topicExists = false;
            for (String topic : admin.topics().getList("public/my_namespace")) {
                if (topic.startsWith(initTopic)) {
                    topicExists = true;
                    break;
                }
            }
            if (!topicExists) {
                admin.topics().createPartitionedTopic(initTopic, 3);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
The core idea of this flink-pulsar setup: receive data from Pulsar, process it in pulsarTask#handlePulsarTask, then send it back to Pulsar. Points to watch:
PulsarSource and PulsarSink each need a matching serializer/deserializer. For a plain String payload, SimpleStringSchema is enough; for a complex type like the List<Object> used here, a custom OMSerializer handles both directions.
The topics must be checked for and created explicitly.
Next, the serializer code.
The serializer
public class OMSerializer implements SerializationSchema<List<HandleResult>>, DeserializationSchema<List<String>> {
    // Static so the schema object stays serializable when Flink ships it to the
    // cluster (ObjectMapper itself is not Serializable); ObjectMapper is thread-safe
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Serialization: used by the PulsarSink to publish processed results
    @Override
    public byte[] serialize(List<HandleResult> handleResults) {
        try {
            String jsonStr = MAPPER.writeValueAsString(handleResults);
            // Pin UTF-8 rather than relying on the platform default charset
            return jsonStr.getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    // Deserialization: turns the byte stream from the PulsarSource into the target type
    @Override
    public List<String> deserialize(byte[] bytes) throws IOException {
        String input = new String(bytes, StandardCharsets.UTF_8);
        return MAPPER.readValue(input, new TypeReference<List<String>>() {
        });
    }

    // Rarely relevant for an unbounded stream; return false
    @Override
    public boolean isEndOfStream(List<String> strings) {
        return false;
    }

    // Wrap the generic type with TypeInformation.of(new TypeHint<[type]>() {})
    @Override
    public TypeInformation<List<String>> getProducedType() {
        return TypeInformation.of(new TypeHint<List<String>>() {});
    }
}
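A subtle point in serialize/deserialize: String.getBytes() and new String(byte[]) without an explicit charset use the platform default, which may differ between the machine that produced a message and the one that consumes it. Pinning UTF-8 makes the round trip deterministic; a minimal stdlib sketch (class name illustrative):

```java
import java.nio.charset.StandardCharsets;

public class CharsetRoundTrip {
    // Encode with an explicit charset so the bytes are platform-independent.
    public static byte[] encode(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same explicit charset; defaultCharset() could differ per JVM.
    public static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String payload = "[\"row-1\",\"row-2\",\"中文行\"]";
        // Round-trips losslessly regardless of the JVM's file.encoding setting.
        System.out.println(decode(encode(payload)).equals(payload)); // true
    }
}
```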
flink-pulsar exposes the serializer and deserializer interfaces SerializationSchema<OUT> and DeserializationSchema<IN>.
Implementing both on one class yields a shared serializer/deserializer.
The other key piece is the use of TypeInformation with TypeHint to wrap complex types: TypeHint captures generic parameters, while TypeInformation can be thought of as the container Flink uses to carry such special types around.
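TypeHint works because an anonymous subclass bakes its generic superclass, type arguments included, into class metadata that survives erasure (the "super type token" trick). The mechanism can be reproduced with the JDK alone; TypeToken below is an illustrative stand-in, not Flink's class:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// A minimal "super type token": subclassing freezes the generic argument
// into the class metadata, where reflection can read it back after erasure.
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // getGenericSuperclass() sees "TypeToken<List<String>>", not the erased TypeToken.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    public Type getType() {
        return type;
    }
}

public class TypeTokenDemo {
    // The anonymous subclass ({}) is what preserves List<String> at runtime.
    public static Type captureListOfString() {
        return new TypeToken<List<String>>() {}.getType();
    }

    public static void main(String[] args) {
        System.out.println(captureListOfString()); // java.util.List<java.lang.String>
    }
}
```

This is exactly why both TypeHint and Jackson's TypeReference are instantiated with a trailing `{}`: without the anonymous subclass, the generic argument would be erased and unrecoverable.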
Developing the PulsarTask handler
public class PulsarTask {
    public static final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("springcontext.xml");
    public static final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public List<HandleResult> handlePulsarTask(List<String> originUdrs) throws IOException, ParseException {
        List<Future<HandleResult>> futures = new ArrayList<>();
        ProcessResult processResult = new ProcessResult();
        // Fan out: one pooled task per row
        for (String originUdr : originUdrs) {
            futures.add(executorService.submit(() -> handle(originUdr)));
        }
        // Fan in: collect results and route them by success flag
        for (Future<HandleResult> future : futures) {
            try {
                HandleResult handleResult = future.get();
                if (handleResult.isSuccess()) {
                    processResult.getNormalResults().add(handleResult);
                } else {
                    processResult.getErrorResults().add(handleResult);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("normal results: " + processResult.getNormalResults().size() + ", errors: "
                + processResult.getErrorResults().size());
        return processResult.getNormalResults();
    }

    private static HandleResult handle(String originUdr) {
        // doSomething...
    }
}
To improve throughput, the rows are handled on a thread pool; Futures gather the results, and a success flag decides which bucket each result lands in.
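The fan-out/fan-in shape used here, submit every row to a pool and then walk the Futures in submission order, can be isolated into a plain-JDK sketch; the trim/uppercase "handler" is an illustrative stand-in for the real per-row work:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutFanIn {
    // Submit each row to the pool, then collect results in submission order,
    // keeping only the "successful" (non-empty) ones.
    public static List<String> process(List<String> rows) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String row : rows) {
                futures.add(pool.submit(() -> row.trim().toUpperCase())); // stand-in for handle()
            }
            List<String> ok = new ArrayList<>();
            for (Future<String> f : futures) {
                String result = f.get(); // blocks; order follows submission order
                if (!result.isEmpty()) {
                    ok.add(result);
                }
            }
            return ok;
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process(List.of(" a ", "b", " "))); // [A, B]
    }
}
```

Iterating the Futures in order keeps the output aligned with the input rows even though the actual work completes out of order.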
Extra components
A PulsarClient factory
public static class PulsarClienttFactory {
    private static final String JWT = "tokentokentokentoken....";
    private volatile static PulsarClient client;

    public static PulsarClient createPulsarClient(String url) throws PulsarClientException, PulsarAdminException {
        if (client == null) {
            synchronized (PulsarClienttFactory.class) {
                if (client == null) {
                    client = PulsarClient.builder()
                            .serviceUrl(url)
                            .ioThreads(4)
                            .listenerThreads(10)
                            .authentication(AuthenticationFactory.token(JWT))
                            .build();
                }
            }
        }
        return client;
    }
}
The factory hands out a single shared PulsarClient, lazily created with double-checked locking.
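The pattern the factory relies on deserves a note: the volatile field is what prevents another thread from observing a half-constructed client, and the second null check under the lock prevents duplicate construction. In isolation (Resource is an illustrative stand-in for PulsarClient):

```java
public class LazySingleton {
    static final class Resource {
        final long createdAt = System.nanoTime();
    }

    // volatile is essential: it forbids the reordering that could expose
    // a partially constructed Resource to other threads.
    private static volatile Resource instance;

    public static Resource get() {
        Resource local = instance;       // single volatile read on the fast path
        if (local == null) {
            synchronized (LazySingleton.class) {
                local = instance;
                if (local == null) {     // second check: another thread may have won
                    local = new Resource();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        // All callers observe the same instance.
        System.out.println(LazySingleton.get() == LazySingleton.get()); // true
    }
}
```

Without volatile, the JIT may publish the reference before the constructor's writes are visible, which is the classic double-checked-locking bug.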
A publisher pushing tasks to TOPICA
public class ProducerTask {
    private static final String IP = "xx.xx.xx.xx";
    private static final String serviceUrlNew = "pulsar://" + IP + ":6650";
    private static final String adminUrlUrlNew = "http://" + IP + ":8080";
    private static final String JWT = "tokentokentokentoken....";
    private static final String TOPICA = "persistent://public/my_namespace/topicA";
    private static final String TOPICB = "persistent://public/my_namespace/topicB";

    public static void main(String[] args) throws IOException, ParseException, PulsarAdminException {
        startPublisher();
    }

    private static void startPublisher() throws IOException, ParseException, PulsarAdminException {
        // Build the pool of candidate tasks
        List<String> tasks = TaskConstructUtil.taskConstruct();
        // Create a producer on topicA for publishing tasks
        Producer<String> producer = PulsarTask.PulsarClienttFactory.createPulsarClient(serviceUrlNew)
                .newProducer(Schema.STRING).topic(TOPICA).blockIfQueueFull(true).create();
        ObjectMapper objectMapper = new ObjectMapper();
        Random random = new Random();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Running total of published tasks
                int alreadySent = 0;
                while (true) {
                    // Pick n random tasks (n <= 10) per round to add test variety
                    int count = random.nextInt(10) + 1;
                    List<String> publishTasks = new ArrayList<>();
                    for (int i = 0; i < count; i++) {
                        int index = random.nextInt(tasks.size());
                        publishTasks.add(tasks.get(index));
                    }
                    if (!publishTasks.isEmpty()) {
                        try {
                            producer.send(objectMapper.writeValueAsString(publishTasks));
                            alreadySent += publishTasks.size();
                            System.out.println("producer send nums: " + publishTasks.size());
                            System.out.println("already sent: " + alreadySent);
                            // Throttle the publish rate
                            TimeUnit.SECONDS.sleep(5);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }
        }).start();
    }
}
A consumer reading results from topicB
Finally, a consumer to inspect what the Flink job produced.
public class ReceiverTask {
    private static final String IP = "xx.xx.xx.xx";
    private static final String serviceUrlNew = "pulsar://" + IP + ":6650";
    private static final String adminUrlUrlNew = "http://" + IP + ":8080";
    private static final String JWT = "tokentokentokentoken....";
    private static final String TOPICA = "persistent://public/my_namespace/topicA";
    private static final String TOPICB = "persistent://public/my_namespace/topicB";

    public static void main(String[] args) throws IOException, ParseException, PulsarAdminException {
        startReceiver();
    }

    public static void startReceiver() throws IOException, ParseException, PulsarAdminException {
        Consumer<String> consumer = PulsarTask.PulsarClienttFactory.createPulsarClient(serviceUrlNew)
                .newConsumer(Schema.STRING).topic(TOPICB).subscriptionName("result_check_subscription")
                .subscriptionType(SubscriptionType.Exclusive) // simplest mode: exclusive subscription
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscribe();
        ObjectMapper objectMapper = new ObjectMapper();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Running total of consumed results
                int alreadyReceived = 0;
                while (true) {
                    Message<String> receive = null;
                    try {
                        receive = consumer.receive(100, TimeUnit.MILLISECONDS);
                        if (receive != null) {
                            String value = receive.getValue();
                            List<HandleResult> handleResults = objectMapper.readValue(value, new TypeReference<List<HandleResult>>() {
                            });
                            alreadyReceived += handleResults.size();
                            System.out.println("consumer: received value: " + value);
                            System.out.println("already received: " + alreadyReceived);
                        } else {
                            // Nothing received: sleep to ease the load on the system;
                            // if messages keep arriving there is a backlog, so skip the sleep
                            System.out.println("consumer: received null");
                            System.out.println("already received: " + alreadyReceived);
                            TimeUnit.SECONDS.sleep(2);
                        }
                    } catch (Exception e) {
                        System.out.println("consumer error : " + e.getMessage());
                    } finally {
                        if (receive != null) {
                            try {
                                // Ack in finally so the message counts as consumed whether or
                                // not an exception was thrown; adjust to your business needs
                                consumer.acknowledge(receive);
                            } catch (PulsarClientException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            }
        }).start();
    }
}
With the main flow, publisher, and subscriber in place, start the Flink job, the subscriber, and the publisher in turn, and the processing can be checked along two dimensions: correctness of the data and message counts.
Pitfalls hit while building this
Pitfall 1 - choosing the Flink version
Pick Flink, Pulsar connector, and Scala versions that are all compatible with your JDK.
Pitfall 2 - resources in dependency jars are invisible to the Flink job
Inside the task I initially tried Spring's PathMatchingResourcePatternResolver#getResources
to scan the classpath and pull config files shipped in dependencies, but this did not work.
Debugging showed the cause: PathMatchingResourcePatternResolver#addClassPathManifestEntries
contains this logic:
String javaClassPathProperty = System.getProperty("java.class.path");
for (String path : StringUtils.delimitedListToStringArray(javaClassPathProperty, File.pathSeparator)) {
...
That is, it reads the full classpath from the java.class.path system property.
In a normally launched Java process this property contains the paths of every dependency jar, but when a Flink job runs it only contains the path of the Flink user jar.
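To see what the resolver will see, split java.class.path on the platform path separator, the same way Spring does. Launched with a plain `java -cp`, this lists every jar; inside a Flink task it comes up much shorter:

```java
import java.io.File;

public class ClassPathProbe {
    // Split java.class.path into its entries using the platform path separator
    // (':' on Unix, ';' on Windows), as PathMatchingResourcePatternResolver does.
    public static String[] entries() {
        String cp = System.getProperty("java.class.path", "");
        return cp.split(File.pathSeparator);
    }

    public static void main(String[] args) {
        for (String entry : entries()) {
            System.out.println(entry);
        }
    }
}
```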
I asked DeepSeek, which concluded:
java.class.path containing only a single path inside a Flink cluster is a consequence of Flink's classloading isolation and resource distribution strategy: Flink loads user classes through a dedicated user-code classloader, while the AppClassLoader serves only the runtime environment.
To reach resources packed inside dependency jars, register the jar explicitly and fetch it through the distributed cache:
env.registerCachedFile("hdfs:///libs/secondary-lib.jar", "secondaryLib");
Pitfall 3 - KryoSerializer problems on newer JDKs
When Flink cannot derive a dedicated serializer for a record type (generic types such as List<Object> are the typical case), it falls back to Kryo, which relies heavily on reflection.
On JDK 17 the module system restricts such reflective access: in my tests the job failed with java.lang.reflect.InaccessibleObjectException,
and I found no Kryo/Flink version combination that resolves it.
Hence the recommendation to write your own Jackson-based serializer/deserializer instead.
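The failure mode is easy to reproduce with the JDK alone: attempt the kind of reflective access Kryo performs, opening a private field of a java.base class. A default JDK 16+ rejects it, while older JDKs allow it, so this sketch reports what happened rather than asserting one outcome:

```java
import java.lang.reflect.Field;

public class EncapsulationProbe {
    // Try to open a private field of java.lang.String, which lives in the
    // strongly encapsulated java.base module.
    public static String probe() {
        try {
            Field value = String.class.getDeclaredField("value");
            value.setAccessible(true); // throws InaccessibleObjectException on a default JDK 16+
            return "open";
        } catch (ReflectiveOperationException | RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(probe());
    }
}
```

A commonly cited mitigation is launching the JVM with flags such as `--add-opens java.base/java.lang=ALL-UNNAMED` (one flag per package that needs opening), though I have not verified it against this Flink setup; as noted above, a Jackson-based schema sidesteps the problem entirely.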