0%

Kettle(二)数据抽取与插入模型

在 Kettle 中主要有两个概念,一个是 Transformation,还有一个是 Job。

一. 关键术语与基本概念

Transformation 主要用来描述转换过程,它由多个 Step 组成,我们来看一下 Transformation 在 Kettle 中的表现形式,Transformation 本质上描述了数据在程序中的流向。

从上图中可以看到 Transformation 包含多个节点,这些节点称为 Step,用来表示特定的功能。如排序记录这个 Step,它主要对传输过来的数据进行排序,排完序的数据再递交给记录集连接。

如果说 Transformation 是数据流的话,那么 Job 就是工作流,它包含了多个 Transformation,这些 Transformation 按照特定的流程进行连接。
元数据也被称为数据的数据,在 Kettle 中使用的非常广泛,例如 Transformation 也有自己的元数据,它用来描述一个 Transformation 的具体细节,这些元数据可以使用文件来保存也可以存入数据库。

Transformation 文件的后缀为 ktr,Job 文件的后缀为 kjb,他们都使用 xml 格式的文件来保存。

二. 结构体系

在介绍 Kettle 的具体实现之前是先来看看 Kettle 的整体架构,这样在讲解 Kettle 批量插入时我们可以清楚的知道这些数据来自哪里并流向何处。

上图总共描述了四个 Step(粗体表示的),Kettle 首先使用表输入组件从源表中读取数据存入到 RowSet 中,每个源表对应一个 RowSet ,分别存储从对应表中读取的记录。当前两个 RowSet 中有数据之后,表合并操作会分别从两个 RowSet 中读取记录,并根据给出的字段做连接操作。当合并的数据集达到一定的数量之后根据需要输出的字段选择对应的列写入到目标表中。

通过上面的讲解我们大致了解了 Transformation 运行的机制,下面我们来详细分析重要每个 Step 的具体实现细节。

三. 数据抽取

常用的数据抽取步骤在 Kettle 中的组件名叫做“表输入”,它的实现类为 TableInput 。下面我们来看下数据抽取的模型。

上面展示了表输入启动时数据的流向,其中数字表示操作的先后顺序。如果程序是第一次启动则会初始化 thisRow 和 nextRow 这两个变量,然后将 thisRow 指向的记录放入 outputRowSet 中,最后 thisRow 指向 nextRow 指向的变量。如果程序不是第一次执行,程序会直接将取回的数据放入 nextRow 中,跟初始化相比少掉了编号为①的步骤。当程序发现 thisRow 为空时会停止表输入步骤,当然在数据处理的过程中出现任何异常,当前步骤都会通知 Trans 停止所有的转换步骤。因为 Kettle 使用的是 JDBC 连接的数据库,默认情况下 ResultSet 会将数据库中所有的数据加载到内存中,这样就很容易出现内存溢出的情况。在数据抽取的过程中为了防止内存溢出的情况,Kettle 设置了 JDBC 读取数据的方式,采用数据流的方式挨个取数据,不会将所有数据都加载到 ResultSet 中。

四. 记录集排序

在 Kettle 中进行记录集连接操作时默认认为表中的数据是有序的,如果我们不对抽取的数据进行排序,在做 join 操作时会造成一些数据的丢失。在数据量很大的情况下,Kettle 采用的是外部排序,具体的算法为多路归并排序。当然在 Kettle 中使用排序组件会阻塞后续的步骤,如果当前数据没有完全排好序的话数据不会流向下一个步骤。对于排序 Kettle 可以设置缓存的大小,这里的缓存指的是内存中的大小,设置方法如下图所示:

排序的设置参数如上图所示,我们可以看到其中包含了一些重要的参数,排序目录指的是临时文件的目录,“排序缓存大小”指的是在内存中多少条数据排序一次。为了更直观的展现排序的过程,我们结合示例图来讲解。

上面这幅图主要展示了排序的初始流程,通过第一轮的排序,我们可以在 tmpList 中找到 id 号最小的记录,并且我们可以知道该条记录所在的文件标识,这时我们先取出编号最小的记录,然后从 tmp2 中获取下一条记录也就是 id 为7的记录并放入到 tmpList 中,然后对 tmpList 进行排序,继续取出 tmpList 中 id 号最小的记录,通过多次上述的操作,从 tmpList 中取出的记录可以做到全局有序。

五. 批量插入

批量插入的流程比较简单,它的实现原理就是我们平常使用的批量插入功能。批量插入对应的 Step 为表输出,也就是 TableOutput。下面主要分析的是 TableOutput 的 processRow 方法,这是数据批量插入的入口。

我们看到一开始就调用了 getRow 方法,从源码的注释也验证了我们之前的猜想,该方法是一个会阻塞的方法,它的作用是从前一个 step 获取一条记录。
接着我们看该条记录是如何插入到表中的。

我们可以看到 r 被传入 writeToTable 方法中了,我们继续跟进 writeToTable 方法。
该方法代码比较多,我们首先看看该方法中的如下代码:

这段代码的主要作用是根据表的元数据信息构造出 insertStatement,该对象的类型是 PreparedStatement,为后面插入记录做准备。
下面我们来看看真正插入记录的代码:

setValues 方法传入了要插入的记录(insertRowData),后面的 insertRow 中的 false 表示取消 connection 的自动提交功能。既然是批量插入肯定有提交操作的地方,该操作的代码如下:

从代码中我们可以看到 commitCounter 这个变量,该变量的作用是每插入多少条数据提交一次到数据库。当然如果剩下的数据量没有达到这个值在释放这个 Step 的时候也会提交一次,防止数据丢失。

如果觉得我的文章对您有用,请查看广告(Google Ads)或扫码。您的支持将鼓励我继续创作!