Kettle(二)数据抽取与插入模型

一、 关键术语与基本概念

在Kettle中主要有两个概念,一个是Transformation,还有一个是Job。
Transformation主要用来描述转换过程,它由多个Step组成,我们来看一下Transformation在Kettle中的表现形式,Transformation本质上描述了数据在程序中的流向。

从上图中可以看到Transformation包含多个节点,这些节点称为Step,用来表示特定的功能。如排序记录这个Step,它主要对传输过来的数据进行排序,排完序的数据再递交给记录集连接S。

如果说Transformation是数据流的话,那么Job就是工作流,它包含了多个Transformation,这些Transformation按照特定的流程进行连接。
元数据也被称为数据的数据,在Kettle中使用的非常广泛,例如Transformation也有自己的元数据,它用来描述一个Transformation的具体细节,这些元数据可以使用文件来保存也可以存入数据库。

Transformation文件的后缀为ktr,Job文件的后缀为kjb,他们都使用xml格式的文件来保存。

二、结构体系

在介绍Kettle的具体实现之前是先来看看kettle的整体架构,这样在讲解Kettle批量插入时我们可以清楚的知道这些数据来自哪里并流向何处。

上图总共描述了四个Step(粗体表示的),kettle首先使用表输入组件从源表中读取数据存入到RowSet中,每个源表对应一个RowSet,分别存储从对应表中读取的记录。当前两个RowSet中有数据之后,表合并操作会分别从两个RowSet中读取记录,并根据给出的字段做连接操作。当合并的数据集达到一定的数量之后根据需要输出的字段选择对应的列写入到目标表中。

通过上面的讲解我们大致了解了Transformation运行的机制,下面我们来详细分析重要每个Step的具体实现细节。

三、数据抽取

常用的数据抽取步骤在Kettle中的组件名叫做“表输入”,它的实现类为TableInput。下面我们来看下数据抽取的模型。

上面展示了表输入启动时数据的流向,其中数字表示操作的先后顺序。如果程序是第一次启动则会初始化thisRow和nextRow这两个变量,然后将thisRow指向的记录放入outputRowSet中,最后thisRow指向nextRow指向的变量。如果程序不是第一次执行,程序会直接将取回的数据放入nextRow中,跟初始化相比少掉了编号为①的步骤。当程序发现thisRow为空时会停止表输入步骤,当然在数据处理的过程中出现任何异常,当前步骤都会通知Trans停止所有的转换步骤。因为Kettle使用的是JDBC连接的数据库,默认情况下ResultSet会将数据库中所有的数据加载到内存中,这样就很容易出现内存溢出的情况。在数据抽取的过程中为了防止内存溢出的情况,Kettle设置了JDBC读取数据的方式,采用数据流的方式挨个取数据,不会将所有数据都加载到ResultSet中。

四、记录集排序

在Kettle中进行记录集连接操作时默认认为表中的数据是有序的,如果我们不对抽取的数据进行排序,在做join操作时会造成一些数据的丢失。在数据量很大的情况下,Kettle采用的是外部排序,具体的算法为多路归并排序。当然在Kettle中使用排序组件会阻塞后续的步骤,如果当前数据没有完全排好序的话数据不会流向下一个步骤。对于排序Kettle可以设置缓存的大小,这里的缓存指的是内存中的大小,设置方法如下图所示:

排序的设置参数如上图所示,我们可以看到其中包含了一些重要的参数,排序目录指的是临时文件的目录,“排序缓存大小”指的是在内存中多少条数据排序一次。为了更直观的展现排序的过程,我们结合示例图来讲解。

上面这幅图主要展示了排序的初始流程,通过第一轮的排序,我们可以在tmpList中找到id号最小的记录,并且我们可以知道该条记录所在的文件标识,这时我们先取出编号最小的记录,然后从tmp2中获取下一条记录也就是id为7的记录并放入到tmpList中,然后对tmpList进行排序,继续取出tmpList中id号最小的记录,通过多次上述的操作,从tmpList中取出的记录可以做到全局有序。

五、批量插入

批量插入的流程比较简单,它的实现原理就是我们平常使用的批量插入功能。批量插入对应的Step为表输出,也就是TableOutput。下面主要分析的是TableOutput的processRow方法,这是数据批量插入的入口。

我们看到一开始就调用了getRow方法,从源码的注释也验证了我们之前的猜想,该方法是一个会阻塞的方法,它的作用是从前一个step获取一条记录。
接着我们看该条记录是如何插入到表中的。

我们可以看到r被传入writeToTable方法中了,我们继续跟进writeToTable方法。
该方法代码比较多,我们首先看看该方法中的如下代码:

这段代码的主要作用是根据表的元数据信息构造出insertStatement,该对象的类型是PreparedStatement,为后面插入记录做准备。
下面我们来看看真正插入记录的代码:

setValues方法传入了要插入的记录(insertRowData),后面的insertRow中的false表示取消connection的自动提交功能。既然是批量插入肯定有提交操作的地方,该操作的代码如下:

从代码中我们可以看到commitCounter这个变量,该变量的作用是每插入多少条数据提交一次到数据库。当然,如果剩下的数据量没有达到这个值在释放这个Step的时候也会提交一次,防止数据丢失。

发表评论

电子邮件地址不会被公开。 必填项已用*标注