Apache InLong 的 SPI 扩展实践

Apache InLong 的 SPI 扩展实践
2023年01月31日 12:59 DataFunTalk

导读:在 InLong(应龙)上云的过程中,随着数据源端和目标端的类型急剧增多,需要扩展更多类型的数据源和目标端,由此带来维护成本高、存在大量相似代码、扩展困难等问题。通过 InLong Manager(管控服务)的 SPI 扩展实践,不仅降低了维护成本,还提高了代码的复用性、极大增强了代码扩展性等,改造后的收益颇丰。

本文将分享 InLong Manager 的 SPI 改造实践过程,分以下几点展开:

  • Apache InLong 简介

  • InLong Manager 简介

  • InLong Manager 的 SPI 改造实践

分享嘉宾|周康 (前)腾讯⼤数据 ⾼级开发⼯程师

编辑整理|陈沃晨 浪潮

出品社区|DataFun

01

Apache InLong 简介

1. 项目简介

Apache InLong(应龙)是一个一站式的海量数据集成框架,提供自动、安全、可靠和高性能的数据传输能力,方便业务构建基于流式的数据分析、建模和应用。最初于 2019 年 11 月由腾讯大数据团队捐献到 Apache 孵化器,于 2022 年 6 月正式孵化毕业,成为 Apache 顶级项目(TLP)。

2. 适⽤场景

Apache InLong 依托腾讯百万亿级别的数据接入和处理能力,整合了数据采集、汇聚、缓存、分拣全流程,具有简单易用、稳定可靠、灵活扩展等特性。在腾讯内部广泛应用于广告、支付、社交、游戏、运营商、人工智能等领域。

02

InLong Manager 简介

InLong Manager(管控服务)提供完整的数据服务管控能力,包括元数据、任务流、权限,OpenAPI 等。

Apache InLong 支持数据的采集、汇聚、缓存和分拣,只需一些基础配置,就可把数据从源端导入到实时计算引擎或者写入离线存储系统。其中,InLong Manager 统一管理系统和任务的元数据(元数据包括任务的审批信息、集群的配置信息、数据 schema 配置等),并串联起数据的Ingest(采集)Converge(汇聚)Cache(缓存)Sort(分拣)Storage(存储)的全流程。

InLong Manager 的入口是 InLong Dashboard 提供的 Web UI(或Manager Client 提供的命令行工具)。通过 InLong Dashboard 创建数据流任务,任务审批通过后,即可串联起全部流程,这些流程主要包括:

  • 创建目标端的库表结构

  • 创建 MQ 的 Topic 和消费者

  • 启动 Flink 任务,开始从 MQ 消费数据,写入目标端

  • 下发采集任务,向 MQ 生产数据

03

InLong Manager 的 SPI 改造实践

1. 存在的问题

InLong 源于腾讯内网业务,在近 10 年的发展中,主要支持的数据源和数据存储只有这些:

以数据存储端(数据目标端)为例,由于用到的存储类型有限,且考虑到不同的存储类型的参数差异较大,因此不同存储类型使用了独立的数据表,可以看到 CK 配置表、HIVE 配置表和 ES 配置表设计中每张表都存在 10 多个相同的字段:

在 InLong 上云的过程中,数据源端和目标端的类型急剧增多。可以预见的是,随着云上客户规模的增加,还会继续扩展更多类型的数据源和目标端。

2. 扩展的过程中发现的痛点

  • 维护成本高:表多,重复字段多

  • 大量相似代码(if-else / switch-case 处理相似逻辑)

  • 难扩展:要扩展新的存储类型,不仅要添加一张表,还要修改接口中的代码,添加 else / case 语句(不符合开闭原则)

3. 什么是 SPI

SPI,全称 Service Provider Interface,是 Java 提供的一套用来被第三方实现或者扩展的 API,它可以用来启用框架扩展和替换组件。翻译后是“服务提供者接口”,顾名思义,这个接口是给“服务提供者”使用的。

常见示例:

  • 加载数据库驱动 load 接口的实现类

  • SLF4J 加载不同提供商的日志实现类日志门面接口的实现类

  • Spring 中自动类型转换 Type Conversion SPI(Converter SPI、Formatter SPI)等

下面以 Flink JDBC Connector 中对不同 JDBC 方言的处理为例介绍 SPI 实现机制:

① 首先定义一个开放给外部去实现的 JdbcDialectFactory 接口,由不同的 DB Dialect 去实现:

② 在 classpath 的 META-INF/services 目录下创建一个名为此接口全限定名的文件,内容是各个实现类的全限定名:

③ 程序中通过 java.util.ServiceLoder 扫描 META-INF/services 目录下的配置文件,根据实现类的全限定名来动态装载具体的实现类:

只需要调用③中 JdbcDialectLoader 的加载方法,即可根据传入的参数动态选中符合条件的实现类,比如传入的 JDBC URL以jdbc:mysql: 开头,在 JdbcDialectLoader 中就会返回 MySqlDialectFactory:

Java SPI 实际上是一种“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制。

4. SPI 改造过程

精简服务层代码,删除繁琐的 if-else/switch-case。收敛 Service 层的接口,同一领域模型的请求都在同一个 Service 接口中处理,以保存操作为例,代码如下:

代码路径:

org.apache.inlong.manager.service.sink.StramSinkServiceImpl

根据不同的类型,通过执行器的工厂找到具体的配置执行器,该执行器去执行真正的保存方法,代码如下:

代码路径: 

org.apache.inlong.manager.service.sink.SinkOperatorFactory

重构数据库实体模型,一张表支持任意类型 Sink 的配置。表中记录通用字段,对于不同类型 Sink 的特有字段,通过一个扩展字段来存储(KV,JSON),如下:

不同类型 Sink 的特有参数,转换成 JSON 格式存储到 ext_params 字段中,查询时再将其解析成其特有的 DTO。相关代码可以参考:

  • org.apache.inlong.manager.service.sink.AbstractSinkOperator 中 saveOpt 方法调用的 setTargetEntity 方法

  • org.apache.inlong.manager.service.sink.StreamSinkOperator 接口中 getFromEntity 方法的各个实现 

其他类似操作,感兴趣的朋友请检视:

  • org.apache.inlong.manager.service.group.InlongGroupOperator

  • org.apache.inlong.manager.service.sink.StreamSinkOperator

  • org.apache.inlong.manager.service.source.StreamSourceOperator

  • org.apache.inlong.manager.service.resource.sink.SinkResourceOperator

  • org.apache.inlong.manager.service.resource.queue.QueueResourceOperator

5. 改造后的收益

① 代码复用性提高,减少了大量重复/相似逻辑的代码,降低了维护成本。

② 代码扩展性极大增强,增加不同类型的配置,只需要依葫芦画瓢,去实现其特殊逻辑即可,也无需改动已有接口。

③ 表的 DDL 不用频繁变动,降低了维护成本,避免修改 DDL /增加新表引发线上问题。

④ 可以在不侵入修改开源代码的情况下,扩展腾讯内部的配置类型,加入内网特有的业务逻辑。

04

问答环节

Q1: 请问 Apache InLong 项目的应用场景?

A1: Apache InLong 于 2022 年 6 月毕业成为 Apache 基金会顶级项目(TLP),它依托腾讯百万亿级别的数据接入和处理能力,整合了数据采集、汇聚、缓存、分拣全流程,打通了数据集成的所有环节,具有简单易用、稳定可靠、灵活扩展等特性。在腾讯内部广泛应用于广告、支付、社交、游戏、运营商、人工智能等领域。

Q2:Source 侧(采集端)是否引入了 FinkCDC?

A2: 是的,Apache InLong 提供了两种模式,一种是轻量化模式:可以理解为只有一个 Sort(分拣)层,Sort 轻量化版本引入了 FinkCDC 实现不同数据源的采集和落地,并对 FinkCDC 做了大量的扩展和增强;另外一种是标准化模式:具备数据采集层、汇聚层、缓存层和分拣层整个链路。之所以这么划分,是因为我们在内网的大量实践中发现,在数据量特别大的场景中只使用 FinkCDC 这么一层模型是支撑不了的,所以标准模式中在原有基础上增加了汇聚层和缓存层,提高系统的吞吐量和数据的可靠性。

Q3:Sink(目标端)的扩展方面,像 Hudi、Doris 这些 Connector 端的扩展也都是参考您刚才讲的 SPI 的实现类就可以实现吗?

A3:是的。这里分为两个步骤,刚刚讲的是 InLong Manager(管控服务)的 SPI 扩展,Manager 端管理起来之后,还需要下游的分拣层(Sort)支持具体的写入流程。其实在分拣层也使用了大量的 SPI 技术,感兴趣的小伙伴可也看 InLong 的官网,里面有扩展的文档。在分拣层实现了SPI扩展后就可以打通整个链路,实现 Connector 端的扩展。

Q4:DB 的采集支持 DB 的 DDL 同步吗?

A4:Apache InLong 现在的版本还没有这个能力,但是我们(腾讯)内部已经具备这个能力了;在之后的发展路线中我们会把这个能力开放到社区。实现流程大概是基于 Binlog 的事件流去感知 DDL 的变更,然后上报到管控服务(Apache Manager),管控服务再同步给下游环节。

今天的分享就到这里,谢谢大家。

|分享嘉宾|

周康

(前)腾讯⼤数据 ⾼级开发⼯程师

毕业于中国海洋大学,曾任职于腾讯大数据部门,有近5年的大数据平台建设经验。

▌《数据智能知识地图》下载

🔥业内首个数据智能知识地图》已发布!涉及15个领域,133个体系框架,1000个细分知识点,欢迎大家下载!

财经自媒体联盟更多自媒体作者

新浪首页 语音播报 相关新闻 返回顶部