# 数据转换
## Transformer 定义
在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。Addax包含了完成的E(Extract)、T(Transformer)、L(Load)支持。
## 运行模型
```mermaid graph LR source(("source")) subgraph fr["Addax Framework"]
direction LR Reader ==> Transformer ==>Writer
end target(("target")) source ==> fr ==> target ```
## UDF 函数
### dx_substr
dx_substr(idx, pos, length) -> str
参数
idx: 字段编号,对应record中第几个字段
pos: 字段值的开始位置
length: 目标字段长度
返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
### dx_pad
dx_pad(idx, flag, length, chr)
参数
idx: 字段编号,对应record中第几个字段
flag: "l","r", 指示是在头进行填充,还是尾进行填充
length: 目标字段长度
chr: 需要填充的字符
返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
举例:
dx_pad(1,"l","4","A"): 如果 column 1 的值为 xyz=> Axyz, 则转换后的值为 xyzzzzz => xyzz
dx_pad(1,"r","4","A"), 如果 column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz
### dx_replace
dx_replace(idx, pos, length, str) -> str
参数
idx: 字段编号,对应record中第几个字段
pos: 字段值的开始位置
length: 需要替换的字段长度
str: 要替换的字符串
返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
举例:
dx_replace(1,"2","4","****"): 如果 column 1 的值为 addaxTest, 则转换为 da****est
dx_replace(1,"5","10","****") 如果 column 1 的值为 addaxTest 则转换为 data****
### dx_filter
dx_filter(idx, operator, expr) -> str
参数:
idx: 字段编号,对应record中第几个字段
operator: 运算符, 支持 like, not like, >, =, <, >=, !=, <=
expr: 正则表达式(java正则表达式)、值
str: 要替换的字符串
返回:
如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于 >, =, `<`都是对字段直接compare的结果.
like , not like 是将字段转换成字符类型,然后和目标正则表达式进行全匹配。
>, =, <, >=, !=, <= ,按照类型进行比较, 数值类型按大小比较,字符及布尔类型按照字典序比较
如果目标字段为空(null),对于 = null 的过滤条件,将满足条件,被过滤。!=null 的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和 not like,字段为null满足条件,被过滤。
举例
dx_filter(1,"like","dataTest")
dx_filter(1,">=","10")
关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。
### dx_groovy
dx_groovy(code, package) -> record
参数
coee: 符合 groovy 编码要求的代码
package: extraPackage, 列表或者为空
返回
Record 数据类型
注意:
dx_groovy 只能调用一次。不能多次调用。
groovy code 中支持 java.lang, java.util 的包,可直接引用的对象有 record ,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。 不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
groovy code 中,返回更新过的 `Record`(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil)
举例:
groovy 实现的 subStr
``` java String code="Column column = record.getColumn(1);n"+
" String oriValue = column.asString();n"+ " String newValue = oriValue.substring(0, 3);n"+ " record.setColumn(1, new StringColumn(newValue));n"+ " return record;"; dx_groovy(record);
groovy 实现的Replace
``` java String code2="Column column = record.getColumn(1);n"+
groovy 实现的Pad
``` java String code3="Column column = record.getColumn(1);n"+
" String oriValue = column.asString();n"+ " String padString = "12345";n"+ " String finalPad = "";n"+ " int NeedLength = 8 - oriValue.length();n"+ " while (NeedLength > 0) {n"+ "n"+ " if (NeedLength >= padString.length()) {n"+ " finalPad += padString;n"+ " NeedLength -= padString.length();n"+ " } else {n"+ " finalPad += padString.substring(0, NeedLength);n"+ " NeedLength = 0;n"+ " }n"+ " }n"+ " String newValue= finalPad + oriValue;n"+ " record.setColumn(1, new StringColumn(newValue));n"+ " return record;";
从 4.1.2 版本开始, dx_groovy 支持从外部文件加载 groovy 代码,读取文件的相对路径为 $ADDAX_HOME 变量所在的目录,也就是 Addax 的安装目录。
以实现 subStr 为例,我们可以创建 job/substr.groovy 文件,内容如下:
`groovy
Column column = record.getColumn(1)
String oriValue = column.asString()
String newValue = oriValue.substring(0, 3)
record.setColumn(1, new StringColumn(newValue))
return record
`
然后在 job 文件中这样去定义:
- "transformer": [
- {
"name": "dx_groovy", "parameter": {
"codeFile": "job/substr.groovy"
}
}
]
}
文件也可以使用绝对路径来指定。
## Job定义
本例中,配置4个UDF。
`json
--8<-- "jobs/udfdemo.json"
`
## 自定义函数
如果自带的函数不满足数据转换要求,我们可以在 transformer 编写满足 groovy 规范要求的代码,下面给出一个完整的例子
`json
--8<-- "jobs/groovy.json"
`
上述 transformer 代码针对每条记录的前面两个字段做了修改,对第一个字段的字符串,在字符串前面增加 Header_ 字符; 第二个整数字段值进行倍增处理。最后执行的结果如下:
` shell
--8<-- "output/groovydemo.txt"
`
## 计量和脏数据
Transform过程涉及到数据的转换,可能造成数据的增加或减少,因此更加需要精确度量,包括:
Transform的入参Record条数、字节数。
Transform的出参Record条数、字节数。
Transform的脏数据Record条数、字节数。
如果是多个Transform,某一个发生脏数据,将不会再进行后面的transform,直接统计为脏数据。
目前只提供了所有Transform的计量(成功,失败,过滤的count,以及transform的消耗时间)。
涉及到运行过程的计量数据展现定义如下:
`shell
Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%
`
注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。
涉及到最终作业的计量数据展现定义如下:
`shell
任务启动时刻 : 2015-03-10 17:34:21
任务结束时刻 : 2015-03-10 17:34:31
任务总计耗时 : 10s
任务平均流量 : 2.10MB/s
记录写入速度 : 100000rec/s
转换输入总数 : 1000000
转换输出总数 : 1000000
读出记录总数 : 1000000
同步失败总数 : 0
`
注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。