[Practice] A Pulsar + Flink Streaming Architecture

FatFish1
2025-07-07

Goal: build a simple streaming pipeline that processes the messages in Pulsar's topicA and forwards the processed results to topicB, where:

  • a message in topicA is a list made up of individual lines of data

  • each processed line produces one HandleResult, and the results are sent downstream as a single message (a minimal HandleResult sketch follows below)
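
The article never shows the HandleResult type itself. For the snippets below to line up, a minimal POJO along these lines is assumed (the field names are hypothetical; only isSuccess() is actually required by the later code):

// Hypothetical result type: one HandleResult per processed input line
public class HandleResult {

    private String originLine;   // the raw input line
    private boolean success;     // whether processing succeeded
    private String message;      // optional detail or error description

    public HandleResult() {}     // no-arg constructor so Jackson can deserialize it

    public String getOriginLine() { return originLine; }
    public void setOriginLine(String originLine) { this.originLine = originLine; }
    public boolean isSuccess() { return success; }
    public void setSuccess(boolean success) { this.success = success; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}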

Build the Flink project

Add the pom dependencies

<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.20.0</flink.version>
    <flink.pulsar.version>4.1.0-1.20.0</flink.pulsar.version>
    <flink.scala.version>1.20.0</flink.scala.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>${flink.pulsar.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>flink-connector-base</artifactId>
            <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
            <artifactId>bcpkix-jdk15on</artifactId>
            <groupId>org.bouncycastle</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>${flink.scala.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.20</version>
</dependency>

When adding these dependencies, watch Flink's Java version support: JDK 17 is only supported from Flink 1.18.1 onwards.

So the flink-core/flink-clients artifacts are set to 1.20.0 and flink-connector-pulsar to 4.1.0-1.20.0. In addition, for Scala source support, flink-scala is set to 1.20.0, while the Scala library itself is 2.12.20.

Develop the Flink job

Main flow

public class PulsarTask {

    private static final String IP = "xx.xx.xx.xx";

    private static final String serviceUrlNew = "pulsar://" + IP + ":6650";
    private static final String adminUrlUrlNew = "http://" + IP + ":8080";

    private static final String JWT = "tokentokentokentoken....";

    private static final String TOPICA = "persistent://public/my_namespace/topicA";

    private static final String TOPICB = "persistent://public/my_namespace/topicB";

    private static final PulsarTask pulsarTask = new PulsarTask();

    static {
        // topics have to be created manually, so check at class initialization whether they exist
        init(TOPICA);
        init(TOPICB);
    }

    public static void main(String[] args) throws Exception {
        startPulsarFlinkTask();
    }

    private static void startPulsarFlinkTask() throws Exception {
        
        // a message in topicA is a set of lines, so it can be modeled as List<String>
        TypeInformation<List<String>> originUdrListTypeInfo = TypeInformation.of(new TypeHint<List<String>>() {
        });

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        PulsarSource<List<String>> pulsarSource = PulsarSource.builder()
                .setAdminUrl(adminUrlUrlNew)
                .setServiceUrl(serviceUrlNew)
                // authentication: unlike using PulsarClient directly, pick the auth implementation first (token auth here), then pass the actual token
                .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", JWT)
                .setDeserializationSchema(new OMSerializer())  // deserializer for the PulsarSource
                .setStartCursor(StartCursor.earliest())   // core PulsarSource setting: where a new subscription starts reading
                .setTopics(TOPICA)   // core PulsarSource setting: at least one topic must be subscribed
                .setSubscriptionName("topicA_subscription")  // core PulsarSource setting: the subscription name is mandatory
                .build();

        PulsarSink<List<HandleResult>> pulsarSink = PulsarSink.builder()
                .setServiceUrl(serviceUrlNew)
                .setAdminUrl(adminUrlUrlNew)
                .setTopics(TOPICB) // core PulsarSink setting: the topic to publish to
                .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", JWT)
                .setSerializationSchema(new OMSerializer())  // serializer for the PulsarSink
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // a WatermarkStrategy is required by fromSource; bounded out-of-orderness is used here as a simple default
        DataStreamSource<List<String>> pulsarStream = env.fromSource(pulsarSource, 
            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "flink-pulsar-task");

        // a restart strategy must be specified explicitly
        env.setRestartStrategy(RestartStrategies.noRestart());

        // process each message with a map function, producing a List<HandleResult>
        SingleOutputStreamOperator<List<HandleResult>> map = pulsarStream.map(new MapFunction<List<String>, List<HandleResult>>() {

            @Override
            public List<HandleResult> map(List<String> origins) throws Exception {

                List<HandleResult> handleResults = pulsarTask.handlePulsarTask(origins);
                return handleResults;
            }
        }).returns(TypeInformation.of(new TypeHint<List<HandleResult>>() {}));

        // sink the results back to Pulsar
        map.sinkTo(pulsarSink);
        env.execute();
    }

    private static void init(String initTopic) {
        try {
            PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(adminUrlUrlNew).authentication(AuthenticationFactory.token(JWT)).build();
            List<String> namespaces = admin.namespaces().getNamespaces("public");
            boolean billingNamespaceExists = false;
            for (String namespace : namespaces) {
                if (namespace.equals("public/my_namespace")) {
                    billingNamespaceExists = true;
                }
            }
            if (!billingNamespaceExists) {
                admin.namespaces().createNamespace("public/my_namespace");
            }

            boolean topicExists = false;
            List<String> topics = admin.topics().getList("public/my_namespace");
            for (String topic : topics) {
                if (topic.startsWith(initTopic)) {
                    topicExists = true;
                    break;
                }
            }
            if (!topicExists) {
                admin.topics().createPartitionedTopic(initTopic, 3);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

In this Flink-Pulsar setup, the core idea is to receive data from Pulsar, process it in PulsarTask#handlePulsarTask, and then send the results back to Pulsar. The points to watch are:

  • PulsarSource and PulsarSink each need a matching serializer/deserializer. For plain String payloads you can use SimpleStringSchema directly (see the sketch after this list); for the complex List<Object>-style types used here, the custom OMSerializer handles both serialization and deserialization

  • the required topics must be checked for and created explicitly
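
For the plain-String case mentioned above, a minimal sketch could look like the following (assuming the same builder overloads used in the main example also accept Flink's SimpleStringSchema, which implements both SerializationSchema<String> and DeserializationSchema<String>):

// Sketch only: source and sink for plain String messages
PulsarSource<String> stringSource = PulsarSource.builder()
        .setAdminUrl(adminUrlUrlNew)
        .setServiceUrl(serviceUrlNew)
        .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", JWT)
        .setDeserializationSchema(new SimpleStringSchema())
        .setStartCursor(StartCursor.earliest())
        .setTopics(TOPICA)
        .setSubscriptionName("string_subscription")
        .build();

PulsarSink<String> stringSink = PulsarSink.builder()
        .setServiceUrl(serviceUrlNew)
        .setAdminUrl(adminUrlUrlNew)
        .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", JWT)
        .setSerializationSchema(new SimpleStringSchema())
        .setTopics(TOPICB)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();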

Let's look at the serializer code next.

The serializer

public class OMSerializer implements SerializationSchema<List<HandleResult>>, DeserializationSchema<List<String>> {

    private ObjectMapper mapper = new ObjectMapper();

    // serialization: used by the PulsarSink to publish the processed results to Pulsar
    @Override
    public byte[] serialize(List<HandleResult> handleResults) {
        try {
            String jsonStr = mapper.writeValueAsString(handleResults);
            return jsonStr.getBytes();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    // deserialization: converts the byte payload received by the PulsarSource into the target type
    @Override
    public List<String> deserialize(byte[] bytes) throws IOException {
        String input = new String(bytes, Charset.defaultCharset());
        List<String> results = mapper.readValue(input, new TypeReference<List<String>>() {
        });
        return results;
    }

    // rarely needed in practice; simply return false
    @Override
    public boolean isEndOfStream(List<String> strings) {
        return false;
    }

    // can be implemented directly with TypeInformation.of(new TypeHint<[type]>() {})
    @Override
    public TypeInformation<List<String>> getProducedType() {
        return TypeInformation.of(new TypeHint<List<String>>() {});
    }
}

The serializer and deserializer interfaces used with flink-pulsar are SerializationSchema<OUT> and DeserializationSchema<IN>.

Implementing both interfaces in one class gives a single serializer/deserializer that the source and sink can share.

The other key piece is the use of TypeInformation and TypeHint to wrap complex types: TypeHint handles the generics, while TypeInformation can be thought of as a box that Flink uses to capture such special types.
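
As a small illustration of the difference (plain Flink API usage, not specific to this project):

// the anonymous TypeHint subclass captures the full generic signature...
TypeInformation<List<String>> listOfString =
        TypeInformation.of(new TypeHint<List<String>>() {});

// ...whereas passing only the raw class loses the element type
TypeInformation<List> rawList = TypeInformation.of(List.class);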

Developing the PulsarTask handler class

public class PulsarTask {

    public static final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("springcontext.xml");

    public static final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public List<HandleResult> handlePulsarTask(List<String> originUdrs) throws IOException, ParseException {
        List<Future<HandleResult>> futures = new ArrayList<>();
        ProcessResult processResult = new ProcessResult();
        for (String originUdr : originUdrs) {
            Future<HandleResult> result = executorService.submit(() -> handle(originUdr));
            futures.add(result);
        }

        for (Future<HandleResult> future : futures) {
            try {
                HandleResult handleResult = future.get();
                if (handleResult.isSuccess()) {
                    processResult.getNormalResults().add(handleResult);
                } else {
                    processResult.getErrorResults().add(handleResult);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("normal result: " + processResult.getNormalResults().size() + " error: " 
                + processResult.getErrorResults().size());
        }
        return processResult.getNormalResults();
    }

    private static HandleResult handle(String originUdr) {
        //  doSomething...
    }
}

To improve throughput, the lines are processed by a thread pool, with Futures used to collect and classify the results.
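
ProcessResult is another class the article does not show. A minimal stand-in that matches the calls above could look like this (hypothetical; only the two getters are actually used):

// Hypothetical container separating successful and failed results
public class ProcessResult {

    private final List<HandleResult> normalResults = new ArrayList<>();
    private final List<HandleResult> errorResults = new ArrayList<>();

    public List<HandleResult> getNormalResults() { return normalResults; }
    public List<HandleResult> getErrorResults() { return errorResults; }
}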

Additional components

Add a PulsarClient factory (declared as a static nested class inside PulsarTask, which is why it is referenced later as PulsarTask.PulsarClientFactory)

    public static class PulsarClientFactory {

        private static final String JWT = "tokentokentokentoken....";

        private volatile static PulsarClient client;

        public static PulsarClient createPulsarClient(String url) throws PulsarClientException, PulsarAdminException {
            if (client == null) {
                synchronized (PulsarClient.class) {
                    if (client == null) {
                        client = PulsarClient.builder()
                                .serviceUrl(url)
                                .ioThreads(4)
                                .listenerThreads(10)
                                .authentication(AuthenticationFactory.token(JWT))
                                .build();
                    }
                }
            }
            return client;

        }
    }
} // closing brace of the enclosing PulsarTask class

The factory produces a single shared PulsarClient, created lazily with double-checked locking on a volatile field.

Add a publisher that sends tasks to TOPICA

public class ProducerTask {

    private static final String IP = "xx.xx.xx.xx";

    private static final String serviceUrlNew = "pulsar://" + IP + ":6650";
    private static final String adminUrlUrlNew = "http://" + IP + ":8080";

    private static final String JWT = "tokentokentokentoken....";

    private static final String TOPICA = "persistent://public/my_namespace/topicA";

    private static final String TOPICB = "persistent://public/my_namespace/topicB";

    public static void main(String[] args) throws IOException, ParseException, PulsarAdminException {
        startPublisher();
    }

    private static void startPublisher() throws IOException, ParseException, PulsarAdminException {

        // build a pool of candidate tasks (TaskConstructUtil is the author's utility; a trivial stand-in is sketched after this class)
        List<String> tasks = TaskConstructUtil.taskConstruct();

        // create a producer on topicA for publishing tasks
        Producer<String> producer = PulsarTask.PulsarClientFactory.createPulsarClient(serviceUrlNew)
            .newProducer(Schema.STRING).topic(TOPICA).blockIfQueueFull(true).create();
        ObjectMapper objectMapper = new ObjectMapper();
        Random random = new Random();
        new Thread(new Runnable() {

            @Override
            public void run() {
                // running total of published tasks
                int alreadySent = 0;
                while (true) {
                    // randomly pick n tasks (n <= 10) from the pool for this round, to add some randomness to the test
                    int count = random.nextInt(10) + 1;
                    List<String> publishTasks = new ArrayList<>();
                    for (int i = 0; i < count; i++) {
                        int index = random.nextInt(tasks.size());
                        publishTasks.add(tasks.get(index));
                    }
                    if (publishTasks.size() > 0) {
                        try {
                            producer.send(objectMapper.writeValueAsString(publishTasks));
                            alreadySent += publishTasks.size();
                            System.out.println("producer send nums: " + publishTasks.size());
                            System.out.println("already sent: " + alreadySent);

                            // throttle the publish rate
                            TimeUnit.SECONDS.sleep(5);
                        } catch (Exception e) {
                            throw new RuntimeException(e);

                        }
                    }
                }

            }
        }
        ).start();
    }
}
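
TaskConstructUtil is not shown in the article either; for a runnable test, a trivial stand-in that just fabricates some sample lines would do (entirely hypothetical):

// Hypothetical stand-in for the task-construction utility used by the producer
public class TaskConstructUtil {

    // returns a fixed pool of fake "lines" the producer can sample from
    public static List<String> taskConstruct() {
        List<String> tasks = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            tasks.add("record-" + i + ",fieldA,fieldB");
        }
        return tasks;
    }
}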

Add a consumer that reads the results from topicB

Finally, add a consumer to check the results produced by the Flink job.

public class ReceiverTask {

    private static final String IP = "xx.xx.xx.xx";

    private static final String serviceUrlNew = "pulsar://" + IP + ":6650";
    private static final String adminUrlUrlNew = "http://" + IP + ":8080";

    private static final String JWT = "tokentokentokentoken....";

    private static final String TOPICA = "persistent://public/my_namespace/topicA";

    private static final String TOPICB = "persistent://public/my_namespace/topicB";

    public static void main(String[] args) throws IOException, ParseException, PulsarAdminException {
        startReceiver();
    }

    public static void startReceiver() throws IOException, ParseException, PulsarAdminException {
        Consumer<String> consumer = PulsarTask.PulsarClientFactory.createPulsarClient(serviceUrlNew)
                .newConsumer(Schema.STRING).topic(TOPICB).subscriptionName("result_check_subscription")
                .subscriptionType(SubscriptionType.Exclusive)  // use the simplest subscription type: Exclusive
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscribe();
        ObjectMapper objectMapper = new ObjectMapper();
        new Thread(new Runnable() {

            @Override
            public void run() {
                // running total of received results
                int alreadyReceived = 0;
                while (true) {
                    Message<String> receive = null;
                    try {
                        receive = consumer.receive(100, TimeUnit.MILLISECONDS);
                        if (receive != null) {
                            String value = receive.getValue();
                            List<HandleResult> handleResults= objectMapper.readValue(value, new TypeReference<List<HandleResult>>() {
                            });
                            alreadyReceived += handleResults.size();
                            System.out.println("consumer: received value: " + value);
                            System.out.println("already received: " + alreadyReceived);
                        } else {
                            // nothing received: sleep briefly to avoid busy-looping; if messages keep arriving there is a backlog, so no sleep
                            System.out.println("consumer: received null");
                            System.out.println("already received: " + alreadyReceived);
                            TimeUnit.SECONDS.sleep(2);
                        }

                    } catch (Exception e) {
                        System.out.println("consumer error : " + e.getMessage());
                    } finally {
                        if (receive != null) {
                            try {
                                // ack in finally so the message is consumed whether or not an exception was thrown; adapt this to your real business needs
                                consumer.acknowledge(receive);
                            } catch (PulsarClientException e) {
                                throw new RuntimeException(e);
                            }
                        }

                    }
                }

            }
        }
        ).start();
    }
}

With that, the main flow, publisher, and consumer are all in place. Start the Flink job, then the consumer, then the publisher, and you can check the processing along two dimensions: correctness of the data and the number of records.

Pitfalls hit while building the architecture

Pitfall 1 - choosing the Flink version

Use Flink, Pulsar connector, and Scala versions that match your JDK version.

Pitfall 2 - resources inside dependency jars cannot be read from a Flink job

While implementing the processing task, I tried to use Spring's PathMatchingResourcePatternResolver#getResources to scan the classpath and load configuration files from dependency jars, but unfortunately that attempt failed.

After some debugging, I found the cause in this piece of logic inside PathMatchingResourcePatternResolver#addClassPathManifestEntries:

String javaClassPathProperty = System.getProperty("java.class.path");
for (String path : StringUtils.delimitedListToStringArray(javaClassPathProperty, File.pathSeparator)) {
    ...

That is, it takes the full classpath from the java.class.path system property.

When a Java application is launched normally, this classpath includes every dependency jar; when a Flink job starts, it only contains the path of the Flink user jar.

Asking DeepSeek, the conclusion it gave was:

In a Flink cluster, java.class.path contains only a single path because of Flink's classloader isolation and resource distribution strategy: Flink loads user classes with a dedicated user-code classloader, while the AppClassLoader only serves as the classloader of the runtime environment.

To reach resources inside a dependency, the jar has to be added explicitly and fetched through the distributed cache:

env.registerCachedFile("hdfs:///libs/secondary-lib.jar", "secondaryLib");
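
A sketch of how the registered file could then be read inside an operator (the class and variable names here are illustrative; the lookup key must match the name passed to registerCachedFile):

// Sketch: fetch the cached jar by its registered name inside a rich function
public class SecondaryLibAwareMapper extends RichMapFunction<String, String> {

    private File secondaryLibJar;

    @Override
    public void open(Configuration parameters) throws Exception {
        // "secondaryLib" is the name used in env.registerCachedFile(...)
        secondaryLibJar = getRuntimeContext().getDistributedCache().getFile("secondaryLib");
    }

    @Override
    public String map(String value) throws Exception {
        // read resources out of secondaryLibJar (e.g. via java.util.jar.JarFile) as needed
        return value;
    }
}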

Pitfall 3 - KryoSerializer problems on newer JDKs

Relying on Flink's built-in serialization (for example sticking to default schemas such as SimpleStringSchema and letting Flink handle the generic types) ultimately falls back to Kryo, and Kryo relies on reflection.

On JDK 17, I found in practice that the module system interferes with Kryo's reflection and throws java.lang.reflect.InaccessibleObjectException, and I have not yet found a Kryo/Flink version combination that fixes this.

The recommendation is therefore to write your own Jackson-based serializer/deserializer.
